官方网址:https://docs.streamlit.io/
官方网址:https://discuss.streamlit.io/t/streamlit-components-community-tracker/4634
官方网址:https://github.com/streamlit/streamlit
第三方插件网址:https://github.com/arnaudmiribel/streamlit-extras
第三方插件网址:https://github.com/joy13975/streamlit-nested-layout
第三方插件网址:https://github.com/Schluca/streamlit_tree_select
第三方插件网址:https://github.com/blackary/st_pages
第三方插件网址:https://github.com/andfanilo/streamlit-echarts
第三方插件网址:https://github.com/victoryhb/streamlit-option-menu
系统环境:win10
py版本:3.7
安装指令:pip install streamlit
安装后测试运行,提示ImportError: cannot import name 'builder' from 'google.protobuf.internal' (D:\Program Files\Python\Python37\lib\site-packages\google\protobuf\internal\__init__.py)
经查说需要更新protobuf来解决。或者将streamlit退到1.13。
所以我最终使用的版本是:
streamlit : 1.13
protobuf : 3.15.6
20230109 由于需要清除缓存功能,所以更改了使用版本
streamlit : 1.15
protobuf : 3.20.0
初次运行
然后第一次运行时候,会提示需要输入一个邮箱来确定使用。
启动程序后
可以通过pycharm修改代码,然后刷新页面来查看新的页面效果。
下面按照需求,在官方文档中找到对应的功能模块,开发调试。
最终效果
样例代码
主页
主要技术采用st_pages作为分页导航设置
import streamlit as st
from pkg.st_pages.src.st_pages import Page, show_pages, add_page_title
# Optional -- renders the current page's title and icon at the top of the page.
add_page_title()

# Placeholder copy shown on the landing page.
body = '''
这是主页,暂时没有想好写什么,所以先随便写点东西来填充一下
'''
st.markdown(body, unsafe_allow_html=False)

# Sidebar entries, each given as (script path, title, icon).
# Icon short codes are looked up in:
# https://raw.githubusercontent.com/omnidan/node-emoji/master/lib/emoji.json
sidebar_pages = [
    Page("main_app.py", "主页", "🏠"),
    Page("public_pages/page_1.py", "微观数据", ":rabbit2:"),
    Page("public_pages/main_page.py", "宏观数据", ":cow2:"),
    # Disabled entry, kept for reference:
    # Page("public_pages/page_2.py", "数据覆盖度", ":water_buffalo:"),
]
show_pages(sidebar_pages)
st_pages使用注意事项
请求json问题
在使用https://github.com/blackary/st_pages
这个包时候,不建议直接安装。因为这个包的src/st_pages/__init__.py
中有一个功能如下:
@st.experimental_singleton
def get_icons() -> dict[str, str]:
    # Downloads the emoji table as JSON over HTTP, so this call needs network access.
    url = "https://raw.githubusercontent.com/omnidan/node-emoji/master/lib/emoji.json"
    return requests.get(url).json()
这里是请求网页上的json,但是在离线环境或者国内的环境就经常请求不到。这里是建议将那个json下载下来,然后放到项目中,然后将代码改成如下:
@st.experimental_singleton
def get_icons() -> dict[str, str]:
    """Load the emoji table from the JSON copy bundled with the project.

    This replaces the upstream HTTP fetch of
    https://raw.githubusercontent.com/omnidan/node-emoji/master/lib/emoji.json
    so that no network access is needed. The path is relative to the
    current working directory.
    """
    with open('./pkg/templates/emoji.json', 'r', encoding="utf8") as fh:
        return json.load(fh)
这样回避访问网络问题。
菜单刷新问题
在使用2级菜单的时候,由于本身该功能包是采用重新写css的方式(li:nth-child)来实现菜单功能的,所以需要所有的次级页面都添加add_page_title()或者add_indentation()来实现菜单层级功能。
主页修改-20230118
原st_pages不太满足需求,故参考st_pages重新写了份类似的功能包streamlit_pages。
main_app.py:导航页面
# main_app.py -- navigation entry point of the app.
# `st` is used below (st.markdown, st.container, ...), so it must be imported;
# the original snippet omitted this import.
import streamlit as st

from pkg.streamlit_pages import add_indentation, Page, Menu, overwrite_pages
from streamlit.source_util import _on_pages_changed, get_pages

# Sidebar definition. A Menu is a non-clickable header; a Page whose
# `father` names a Menu is indented underneath it.
pages = [
    Page("main_app.py", "主页", "🏠"),
    Menu(name="微观数据", icon=":horse:"),
    # Menu(name="二级菜单", icon=":horse:", father="微观数据"),
    Page("public_pages/page_1.py", "微观数据", ":rabbit2:", father="微观数据"),
    Menu(name="宏观数据", icon=":horse:"),
    Page("public_pages/page_2.py", "重点指标地区覆盖度", ":rabbit2:", father="宏观数据"),
    Page("public_pages/main_page.py", "重点指标数据查询", ":cow2:", father="宏观数据"),
    Page("public_pages/page_test.py", "测试页面", ":cow2:"),
]
overwrite_pages(pages)
add_indentation()

# Placeholder copy shown on the landing page.
body = '''
这是主页,暂时没有想好写什么,所以先随便写点东西来填充一下
'''
st.markdown(body, unsafe_allow_html=False)

# Development aid: dump the dict form of every registered page.
with st.container():
    with st.expander("检查页面信息-测试开发用"):
        for page in pages:
            st.write(page.to_dict())
streamlit_pages/__init__.py:page程序包
# -*- coding:utf-8 -*-
# author: cyz
# time: 2023/1/12 14:02
import os, sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
# os.chdir(os.path.dirname(os.path.abspath(__file__)))
from dataclasses import dataclass
from pathlib import Path
import json
from typing import Union, Dict, Tuple, List
from streamlit.source_util import _on_pages_changed, get_pages
from streamlit.commands.page_config import get_random_emoji
from streamlit.util import calc_md5
import streamlit as st
# Use Streamlit's metrics wrapper when this installation provides one.
try:
    from streamlit import _gather_metrics  # type: ignore
except ImportError:
    # Fallback with the same call shape: ignore the metric name and hand the
    # function back unchanged.
    def _gather_metrics(name, func, *args, **kwargs):
        return func
# Prefer page_icon_and_name; otherwise wrap page_name_and_icon under that name.
try:
    from streamlit.source_util import page_icon_and_name
except ImportError:
    from streamlit.source_util import page_name_and_icon  # type: ignore
    def page_icon_and_name(script_path: Path) -> Tuple[str, str]:
        # Returns the two values of page_name_and_icon in swapped order.
        # NOTE(review): the local names `icon`/`name` only match their contents
        # if page_name_and_icon returns (icon, name); it presumably returns
        # (name, icon), in which case the locals are mislabeled but the
        # returned order is (icon, name) as callers expect -- confirm.
        icon, name = page_name_and_icon(script_path)
        return name, icon
@dataclass
class Page:
    """
    Utility class describing one sidebar entry (a page or a menu header).

    Parameters
    ----------
    path: str
        The path to the page script.
    name: str (optional)
        The name of the page. If not provided, the name is inferred from
        the path.
    icon: str (optional)
        The icon of the page. If not provided, the icon is inferred from
        the path.
    is_menu: bool
        True when the entry is a menu header rather than a real page.
    father: str (optional)
        Name of the menu this entry is nested under, or None for top level.
    """

    path: str
    name: Union[str, None] = None
    icon: Union[str, None] = None
    is_menu: bool = False
    father: Union[str, None] = None

    @property
    def page_path(self) -> Path:
        """The script path as a ``Path`` object."""
        return Path(str(self.path))

    @property
    def page_name(self) -> str:
        """The explicit name if one was given, else a title derived from the path."""
        # An explicit name wins, so the path is only parsed when it is needed.
        if self.name is not None:
            return self.name
        standard_name = page_icon_and_name(self.page_path)[1]
        return standard_name.replace("_", " ").title()

    @property
    def page_icon(self) -> str:
        """The icon character, translated from a ``:short_code:`` where possible."""
        standard_icon = page_icon_and_name(self.page_path)[0]
        icon = self.icon or standard_icon or ""
        return translate_icon(icon)

    @property
    def page_hash(self) -> str:
        """Key under which the entry is stored in Streamlit's page registry."""
        if self.is_menu:
            # Menus share the script path of a real page, so the name is mixed
            # into the hash to keep their keys distinct.
            return calc_md5(f"{self.page_path}_{self.page_name}")
        return calc_md5(str(self.page_path))

    def to_dict(self) -> Dict[str, Union[str, bool, None]]:
        """Serialize to the dict layout stored in the page registry."""
        return {
            "page_script_hash": self.page_hash,
            "page_name": self.page_name,
            "icon": self.page_icon,
            "script_path": str(self.page_path),
            "is_menu": self.is_menu,
            "father": self.father,
        }

    @classmethod
    def from_dict(cls, page_dict: Dict[str, Union[str, bool, None]]):
        """Rebuild a ``Page`` from a dict such as the one ``to_dict`` produces.

        Missing or ``None`` values for ``page_name``, ``icon`` and ``father``
        stay ``None`` (they used to become the literal string ``"None"``), and
        a missing ``is_menu`` defaults to ``False`` so that registry entries
        without the extra keys can be loaded as well.

        Raises
        ------
        KeyError
            If ``script_path`` is missing.
        """

        def _optional_str(value):
            return None if value is None else str(value)

        return cls(
            path=str(page_dict["script_path"]),
            name=_optional_str(page_dict.get("page_name")),
            icon=_optional_str(page_dict.get("icon")),
            is_menu=bool(page_dict.get("is_menu", False)),
            father=_optional_str(page_dict.get("father")),
        )
class Menu(Page):
    """A sidebar menu header: a ``Page`` with an empty path and ``is_menu=True``."""

    def __init__(self, name: str, icon: Union[str, None] = None, father=None):
        menu_fields = dict(path="", name=name, icon=icon, is_menu=True, father=father)
        super().__init__(**menu_fields)
# This is st_pages' show_pages; the name overwrite_pages describes what it does better.
def _overwrite_pages(pages: List[Page]):
    """
    Given a list of Page objects, overwrite whatever pages are currently being
    shown in the sidebar with this new set of pages.

    Menu entries have no script of their own; their ``path`` is set (in place)
    to the path of the first entry that has one.

    NOTE: This changes the list of pages globally, not just for the current user, so
    it is not appropriate for dynamically changing the list of pages.

    Raises
    ------
    ValueError
        If the registry has to be rewritten and no entry in ``pages`` has a path.
    """
    current_pages: Dict[str, Dict[str, Union[str, bool]]] = get_pages("")  # type: ignore

    # Fill in menu paths BEFORE comparing hashes: a menu's hash includes its
    # path, so comparing first could never match the stored keys and the
    # registry was rewritten (and the change signal sent) on every call.
    default_page = next((p.path for p in pages if p.path), None)
    if default_page is not None:
        for page in pages:
            if page.is_menu:
                page.path = default_page

    if set(current_pages.keys()) == {p.page_hash for p in pages}:
        return

    if default_page is None:
        raise ValueError("Must pass at least one page to overwrite_pages")

    current_pages.clear()
    for page in pages:
        current_pages[page.page_hash] = page.to_dict()
    _on_pages_changed.send()


overwrite_pages = _gather_metrics("streamlit_pages.overwrite_pages", _overwrite_pages)
@st.experimental_singleton
def get_icons() -> Dict[str, str]:
    """Load the emoji table from the ``emoji.json`` stored next to this module.

    The file is a local copy of
    https://raw.githubusercontent.com/omnidan/node-emoji/master/lib/emoji.json
    so no network request is made.
    """
    package_dir = os.path.dirname(os.path.abspath(__file__))
    with open(package_dir + '/emoji.json', 'r', encoding="utf8") as fh:
        return json.load(fh)
# Not wrapped in @st.experimental_singleton: the file is re-read on every call.
def get_css() -> str:
    """Return the text of the ``style.css`` stored next to this module.

    The stylesheet is adapted from
    https://github.com/Socvest/streamlit-on-Hover-tabs/blob/main/st_on_hover_tabs/style.css
    """
    package_dir = os.path.dirname(os.path.abspath(__file__))
    with open(package_dir + '/style.css', 'r', encoding="utf8") as fh:
        return fh.read()
def translate_icon(icon: str) -> str:
    """
    Turn an icon short code such as ``:dog:`` into its unicode character.

    ``"random"`` yields a random emoji. A ``:code:`` that is not in the emoji
    table comes back with the colons removed; any other value is returned
    unchanged.
    """
    emoji_table = get_icons()
    if icon == "random":
        return get_random_emoji()
    if not (icon.startswith(":") and icon.endswith(":")):
        return icon
    short_code = icon[1:-1]
    return emoji_table[short_code] if short_code in emoji_table else short_code
def _get_indentation_code(style:str="st-page") -> str:
    """Build the ``<style>`` block that lays out the sidebar navigation.

    With ``style == "st-page"`` the block contains one ``li:nth-child`` rule
    per menu header (not clickable, left margin by nesting depth), one per
    page nested under a known menu (indented), followed by the contents of
    ``style.css``. Any other ``style`` value produces a rule that hides the
    sidebar navigation.
    """
    if style != "st-page":
        css = '''
div[data-testid="stSidebarNav"] {
display: None;
}
'''
    else:
        css = '''
div[data-testid="stSidebarNav"] {
/* background-color:rgb(0, 0, 0); */
/*color:rgb(255, 255, 255);*/
/*padding-top: 20px;*/
}
'''
        # Left margin (rem) of every menu header seen so far, keyed by name.
        menu_margins = {}
        for position, entry in enumerate(get_pages("").values(), start=1):
            parent = entry.get("father")
            has_known_parent = bool(parent) and parent in menu_margins
            if entry.get("is_menu"):
                margin = menu_margins[parent] + 1.5 if has_known_parent else 1
                menu_margins[entry.get("page_name")] = margin
                css += f"""
li:nth-child({position}) a {{
pointer-events: none; /* Disable clicking on section header */
margin-left: {margin}rem;
}}
"""
            elif has_known_parent:
                # Pages nested under a known menu sit slightly right of it.
                margin = menu_margins[parent] + 0.5
                css += f"""
li:nth-child({position}) span:nth-child(1) {{
margin-left: {margin}rem;
}}
"""
        css += get_css()
    return f"""
<style>
{css}
</style>
"""
def _add_indentation(style:str="st-page"):
    """
    Write the sidebar CSS from ``_get_indentation_code(style)`` into the page.

    For an app that declares menus this indents the pages inside each menu
    and makes the menu headers unclickable, so the sidebar looks like:

    - page 1
    - section 1
        - page 2
        - page 3
    - section 2
        - page 4
    """
    st.write(_get_indentation_code(style), unsafe_allow_html=True)


add_indentation = _gather_metrics("streamlit_pages.add_indentation", _add_indentation)
style.css:我有部分注释掉,跟原版的有些不太相同。
/* Base sidebar: drop the fixed width so the rules below control it. */
section[data-testid='stSidebar'] {
/*background-color: #111;*/
min-width:unset !important;
width: unset !important;
flex-shrink: unset !important;
}
/* Header button: transparent background, muted text colour. */
button[kind="header"] {
background-color: transparent;
color:rgb(180, 167, 141)
}
/* Rules applied only on devices that support hover. */
@media(hover){
/* header element to be removed */
header[data-testid="stHeader"] {
display:none;
}
/* The navigation menu specs and size */
section[data-testid='stSidebar'] > div {
/*height: 100%;*/
width: 95px;
position: relative;
z-index: 1;
top: 0;
left: 0;
/*background-color: #111;*/
overflow-x: hidden;
transition: 0.5s ease;
/*padding-top: 60px;*/
white-space: nowrap;
/*color:rgb(255, 255, 255)*/
}
/* The navigation menu open and close on hover and size */
/* section[data-testid='stSidebar'] > div {
height: 100%;
width: 75px; /* Put some width to hover on. */
/* }
/* ON HOVER */
section[data-testid='stSidebar'] > div:hover{
width: 300px;
}
/* The button on the streamlit navigation menu - hidden */
button[kind="header"] {
display: none;
}
}
/* Very narrow viewports: fixed sidebar width. */
@media(max-width: 272px){
section[data-testid='stSidebar'] > div {
width:15rem;
}
}
微观数据页面
这个页面主要是采用直接连接外页面使用的。所以本身代码量不高。
import streamlit as st
import streamlit.components.v1 as components
# Wide layout with the browser tab title set for this page.
st.set_page_config(layout="wide", page_title="微观数据")
# https://docs.streamlit.io/library/components/components-api
# The page body is a scrollable iframe onto a separately served site.
components.iframe("http://192.168.1.1:5000/", width=1680, height=760, scrolling=True)
宏观数据页面
基础设置
import streamlit as st
import streamlit_nested_layout # 布局必须导入
from streamlit_tree_select import tree_select # 树状选择
from streamlit_echarts import st_pyecharts # 画图
from datetime import datetime
import pandas as pd
from pkg.streamlit_pages import add_indentation # 刷新菜单栏使用
# https://docs.streamlit.io/library/api-reference/utilities/st.set_page_config
st.set_page_config(layout="wide", page_title="宏观数据")
# https://docs.streamlit.io/library/advanced-features/session-state
if 'macro_index_data' not in st.session_state:  # session state is shared memory that is not reset when the script reruns
    st.session_state.macro_index_data = pd.DataFrame()
add_indentation()
# Page layout
# https://docs.streamlit.io/library/api-reference/layout/st.columns
col1, col2 = st.columns((4, 1))  # split the page into two regions with a 4:1 width ratio
with col1:
    select_expander=st.expander("详细选项")
    expander_col1, expander_col2, expander_col3, expander_col4 = select_expander.columns((1, 1, 1, 1))
数据缓存
# Caching section
# https://docs.streamlit.io/library/api-reference/performance/st.cache
@st.experimental_singleton()
def aaaaaa():
    # Build the MacroData object used to request DataFrame data from the API.
    md = MacroData()
    return md
# Shared data client and chart builder used by the rest of the page.
md = aaaaaa()
dp = DrawPic()
@st.experimental_memo
def get_index_label():
    # Delegates to md.get_index_label().
    return md.get_index_label()
按键触发查询数据
查询数据功能
def get_macro_index_data(data):
    """Query ``md`` with ``data`` and return the result with nulls replaced by ``None``.

    The frame is cast to ``object`` dtype so that ``None`` can be stored in
    place of missing values.
    """
    frame = md.get_macro_index_data(data)
    as_objects = frame.astype(object)
    return as_objects.where(pd.notnull(frame), None)
def selectData(index_code, area_code, dim_cal, dim_dur, frequency, occur_period_star, occur_period_end, model):
    """Button callback: run the query and store the result in session state.

    The start period is sent only when it is not an empty string. An empty
    end period is replaced by the current year and month (``%Y%m``). The
    resulting frame is stored in ``st.session_state.macro_index_data``.
    """
    data = {
        "index_code": index_code,
        "area_code": area_code,
        "dim_cal": dim_cal,
        "dim_dur": dim_dur,
        "frequency": frequency,
        "model": model
    }
    if occur_period_star != "":
        data["occur_period_star"] = occur_period_star
    data["occur_period_end"] = (
        occur_period_end if occur_period_end != "" else datetime.now().strftime("%Y%m")
    )
    st.session_state.macro_index_data = get_macro_index_data(data)
按键触发
# "Query" button: on click, selectData is called with the current selections.
# The args tuple is built when this line runs, i.e. on every script run.
select_data_flag = col1_1_1.button(label="查询", on_click=selectData,
args=(index_code["index_code"].tolist(),
area_code,
dim_cal["cur_code"].tolist(),
index_code["dim_dur"].tolist(),
index_code["frequency"].tolist(),
occur_period_star,
occur_period_end,
model
)
)
下载数据
下载功能
# jQuery is inlined into the download page from a local copy so that no CDN
# request is needed. The encoding is explicit so reading does not depend on
# the platform's default locale encoding.
with open('./pkg/templates/jquery-3.2.1.min.js', 'r', encoding='utf-8') as f:
    jquery = f.read()


def downloadFile(df, path, **kwargs):
    """Export ``df`` to Excel and make the browser download it immediately.

    https://discuss.streamlit.io/t/automatic-download-select-and-download-file-with-single-button-click/15141/4

    The frame is written to ``path``, read back as bytes, and the temporary
    file is removed. The bytes are embedded as a base64 data URL in a
    zero-height HTML component whose script clicks the link.

    Args:
        df: DataFrame to export.
        path: Where the temporary ``.xlsx`` is written. Its final path
            component is used as the suggested download file name.
        **kwargs: ``index`` (default ``False``) is forwarded to ``to_excel``.
    """
    index = kwargs.get("index", False)
    df.to_excel(path, index=index)
    try:
        with open(path, "rb") as file:
            payload = file.read()
    finally:
        # Always clean up the temporary export, even if reading it failed.
        os.remove(path)
    b64 = base64.b64encode(payload).decode()
    # basename works for any directory depth; the previous path.split("/")[1]
    # was only right for exactly "dir/file.xlsx".
    filename = os.path.basename(path)
    href = f"""
<html>
<head>
<title>Start Auto Download file</title>
<script language="javascript">
{jquery}
</script>
<script>
$('<a href="data:text/xlsx;base64,{b64}" download="{filename}">')[0].click()
</script>
</head>
</html>"""
    components.html(href, height=0)
这里的js是网页连接上的js,由于网络问题导致经常访问不了,所以这里也是改成读取本地js文件的写法。
按键触发
# "Download" button: on click, downloadFile receives the DataFrame that session
# state holds when this line runs, plus the temporary export path.
download_data_flag = col1_1_2.button(label="下载", on_click=downloadFile, args=(st.session_state.macro_index_data, "download/数据表.xlsx",))
选择按钮
单项选择
# Single-choice selector with three options for the kind of result to query.
data_model_select = st.selectbox("查询的数据类型:", ("数据表", "判断表", "图表"))
带全选的多项选择
def multiselectContainer(label:str, item:list):
    """Render a multiselect with a "select all" checkbox underneath it.

    https://discuss.streamlit.io/t/select-all-on-a-streamlit-multiselect/9799/2

    The container is created first so the multiselect appears above the
    checkbox even though the checkbox is created before it. When the
    checkbox is ticked, every entry of ``item`` is pre-selected.

    Returns the multiselect's current selection.
    """
    holder = st.container()
    pick_all = st.checkbox("全选", key="全选" + label)
    if pick_all:
        return holder.multiselect(label + ":", item, item)
    return holder.multiselect(label + ":", item)


multiselectContainer("指标名称", base_dim_index["index_name"].tolist())
树状结构选择
def treeSelectNodes(source):
    # Build nested {"label", "value", "children"} node dicts from the rows of
    # `source` and return the nodes that were never attached to a parent.
    # Reads the columns cur_level, cur_name, cur_code and the six *_code
    # columns listed in detail_col.
    cur_level = sorted(source["cur_level"].unique().tolist(), reverse=True)
    nodes = []
    detail_col = ["country_code", "province_code", "city_code", "area_code", "street_code", "community_code"]
    for index, row in source.iterrows():
        # key: the set of this row's code values, with nulls replaced by "".
        key = set(row[detail_col].fillna("").tolist())
        node = {"label": row.cur_name, "value": row.cur_code, "children": []}
        nodes.append([key, node, row["cur_level"]])
    nodes_df = pd.DataFrame(nodes,columns=["key", "node", "cur_level"])
    # is_use marks rows whose node has already been attached to a parent.
    nodes_df["is_use"] = False
    # Walk the distinct levels from the largest value downwards.
    for i in range(len(cur_level) - 1):
        tmp = nodes_df[(nodes_df["cur_level"] >= cur_level[i]) &
                       (nodes_df["is_use"] == False)
                       ]  # unattached rows with cur_level >= the current level (original note said "less than or equal" -- NOTE(review): confirm intent)
        tmp_1 = nodes_df[nodes_df["cur_level"] == cur_level[i + 1]]  # rows at the next level in the sorted list (candidate parents)
        for index, row in tmp.iterrows():
            for tem_1_key in tmp_1["key"].tolist():
                if tem_1_key.issubset(row["key"]) :  # containment test: the candidate's key set is a subset of the row's
                    node = nodes_df[nodes_df["key"] == row["key"]]["node"].tolist()[0]
                    node_1 = nodes_df[nodes_df["key"] == tem_1_key]["node"].tolist()[0]
                    node_1["children"].append(node)
                    nodes_df.loc[nodes_df["key"] == row["key"], "is_use"] = True
                    # Store the updated parent dict back into the frame.
                    nodes_df.loc[nodes_df["key"] == tem_1_key, "node"] \
                        = nodes_df.loc[nodes_df["key"] == tem_1_key, "node"].apply(lambda x: node_1)
    return nodes_df[nodes_df["is_use"] == False]["node"].tolist()
@st.experimental_memo
def tree_select_nodes(area_code_list:list):
    """Fetch the admin-area rows for ``area_code_list`` and build the tree nodes."""
    return treeSelectNodes(md.get_dim_admin_area(area_code_list))
def tree_select_container(label ,area_code_list, value_node_dict):
    """Render the area tree with a "select all" checkbox and a read-only summary.

    The summary text area is placed in a container created first, so it
    appears above the checkbox and the tree. ``value_node_dict`` maps each
    checked code to the name shown in the summary.

    Returns the list of checked values reported by ``tree_select``.
    """
    summary_slot = st.container()
    nodes = tree_select_nodes(area_code_list)
    # Ticking "select all" pre-checks every code in area_code_list.
    preset = {"checked": area_code_list} if st.checkbox("全选", key="tree_select_all") else {}
    result = tree_select(nodes, check_model="all", no_cascade=True, show_expand_all=True, **preset)
    checked = result["checked"]
    names = ",".join([value_node_dict[code] for code in checked])
    summary_slot.text_area(label, names, disabled=True, height=130)
    return checked


area_code = tree_select_container("已选择行政区划",
                                  base_dim_area["area_code"].tolist(),
                                  base_dim_area.set_index("area_code").to_dict()["area_name"])
勾选后文字划掉
# https://github.com/arnaudmiribel/streamlit-extras/blob/main/src/streamlit_extras/stodo/__init__.py
def to_do(st_commands, checkbox_id:str, value:bool=False):
    """Render a to-do item: a checkbox followed by the given Streamlit commands.

    Args:
        st_commands: Iterable of ``(command, *args)`` tuples, e.g.
            ``(st.write, "text")``.
        checkbox_id: Widget key of the checkbox.
        value: Initial checkbox state.

    While the box is ticked, ``st.write`` text is shown struck through and
    greyed out, and the input widgets listed below are rendered with
    ``disabled=True``. Every other command is called with its args as given.

    Returns:
        The checkbox state.
    """

    def _accepts_disabled(cmd):
        # Widgets that are rendered disabled once the item is done.
        return cmd in (
            st.slider,
            st.button,
            st.checkbox,
            st.time_input,
            st.color_picker,
            st.selectbox,
            st.camera_input,
            st.radio,
            st.date_input,
            st.multiselect,
            st.text_area,
            st.text_input,
        )

    container = st.container()
    cols = container.columns((1, 100))
    done = cols[0].checkbox("", key=checkbox_id, value=value)
    for cmd, *args in st_commands:
        with cols[1]:
            if cmd == st.write:
                if done:
                    text = args[0]
                    cols[1].write(
                        "<s style='color: rgba(49, 51, 63, 0.4)'>" f" {text} </s>",
                        unsafe_allow_html=True,
                    )
                else:
                    st.write(*args, unsafe_allow_html=True)
            elif done and _accepts_disabled(cmd):
                cmd(*args, disabled=True)
            else:
                cmd(*args)
    st.write("")
    return done


to_do(
    [(st.write, "☕ Take my coffee"), (st.write, "🥞 Have a nice breakfast")],
    "coffee",
)
to_do(
    [(st.write, "☕ Take my coffee")],
    "coffee1",
)
页面跳转
# =============================== page switch =========================
def switch_page(page_name: str):
    """Switch to the registered page whose name matches ``page_name``.

    https://github.com/arnaudmiribel/streamlit-extras/blob/main/src/streamlit_extras/switch_page_button/__init__.py

    Names are compared lower-cased with underscores replaced by spaces. On a
    match a ``RerunException`` carrying the page's script hash is raised;
    otherwise a ``ValueError`` listing the known page names is raised.
    """
    from streamlit.runtime.scriptrunner import RerunData, RerunException
    from streamlit.source_util import get_pages

    def standardize_name(name: str) -> str:
        return name.lower().replace("_", " ")

    page_name = standardize_name(page_name)
    pages = get_pages("")
    known_names = []
    for page_hash, config in pages.items():
        candidate = standardize_name(config["page_name"])
        if candidate == page_name:
            rerun_request = RerunData(
                page_script_hash=page_hash,
                page_name=page_name,
            )
            raise RerunException(rerun_request)
        known_names.append(candidate)
    page_names = known_names
    raise ValueError(f"Could not find page {page_name}. Must be one of {page_names}")


want_to_contribute = st.button("重点指标地区覆盖度!")
if want_to_contribute:
    switch_page("重点指标地区覆盖度")
# =============================== page switch =========================
# ===============================页面跳转1=========================
from streamlit.components.v1 import html
import urllib.parse
def nav_page(page_name, timeout_secs=3):
    """Navigate to another page by clicking its link from injected JavaScript.

    https://github.com/streamlit/streamlit/issues/4832

    ``page_name`` is URL-quoted and inserted into a script that looks in the
    parent document for an ``<a>`` whose href ends with ``/<page_name>``,
    retrying every 100 ms. After ``timeout_secs`` seconds without a match it
    shows an alert.
    """
    quoted_name = urllib.parse.quote(page_name.encode('utf8'))
    script_template = """
<script type="text/javascript">
function attempt_nav_page(page_name, start_time, timeout_secs) {
var links = window.parent.document.getElementsByTagName("a");
for (var i = 0; i < links.length; i++) {
if (links[i].href.toLowerCase().endsWith("/" + page_name.toLowerCase())) {
links[i].click();
return;
}
}
var elasped = new Date() - start_time;
if (elasped < timeout_secs * 1000) {
setTimeout(attempt_nav_page, 100, page_name, start_time, timeout_secs);
} else {
alert("Unable to navigate to page '" + page_name + "' after " + timeout_secs + " second(s).");
}
}
window.addEventListener("load", function() {
attempt_nav_page("%s", new Date(), %d);
});
</script>
"""
    nav_script = script_template % (quoted_name, timeout_secs)
    html(nav_script)


if st.button("重点指标数据查询"):
    nav_page("重点指标数据查询")
# ===============================页面跳转1=========================
数据展示
无数据时提醒
# Warning banner with an icon, shown when the query returned no data.
st.warning('warning: 没有查询到数据', icon="⚠️")
表格展示-带筛选器
def dataframe_explorer(df: pd.DataFrame) -> pd.DataFrame:
    """
    Adds a UI on top of a dataframe to let viewers filter columns.

    https://github.com/arnaudmiribel/streamlit-extras/blob/main/src/streamlit_extras/dataframe_explorer/__init__.py

    The widget shown for each chosen column depends on its data:
    categorical or fewer than 10 distinct values -> multiselect of values;
    numeric -> range slider; datetime -> date range; anything else -> a text
    pattern matched with ``str.contains``.

    Args:
        df (pd.DataFrame): Original dataframe
    Returns:
        pd.DataFrame: Filtered dataframe
    """
    # Widget keys are derived from the frame's content hash.
    random_key_base = pd.util.hash_pandas_object(df)
    df = df.copy()
    # Automatic conversion of object columns to timezone-naive datetimes is
    # intentionally not performed here.
    modification_container = st.container()
    with modification_container:
        to_filter_columns = st.multiselect(
            "数据表筛选列",  # "Filter dataframe on"
            df.columns,
            key=f"{random_key_base}_multiselect",
        )
        filters: Dict[str, Any] = dict()
        for column in to_filter_columns:
            left, right = st.columns((1, 20))
            left.write("↳")
            # Treat columns with < 10 unique values as categorical
            if is_categorical_dtype(df[column]) or df[column].nunique() < 10:
                filters[column] = right.multiselect(
                    f"{column}的数据值",  # f"Values for {column}"
                    df[column].unique(),
                    default=df[column].unique().tolist(),
                    key=f"{random_key_base}_{column}",
                )
                df = df[df[column].isin(filters[column])]
            elif is_numeric_dtype(df[column]):
                _min = float(df[column].min())
                _max = float(df[column].max())
                step = (_max - _min) / 100
                filters[column] = right.slider(
                    f"{column}的数据值",  # f"Values for {column}"
                    _min,
                    _max,
                    (_min, _max),
                    step=step,
                    key=f"{random_key_base}_{column}",
                )
                df = df[df[column].between(*filters[column])]
            elif is_datetime64_any_dtype(df[column]):
                filters[column] = right.date_input(
                    f"{column}的数据值",  # f"Values for {column}"
                    value=(
                        df[column].min(),
                        df[column].max(),
                    ),
                    key=f"{random_key_base}_{column}",
                )
                # Filter only once both ends of the range have been picked.
                if len(filters[column]) == 2:
                    filters[column] = tuple(map(pd.to_datetime, filters[column]))
                    start_date, end_date = filters[column]
                    df = df.loc[df[column].between(start_date, end_date)]
            else:
                filters[column] = right.text_input(
                    f"{column}的样例值",  # f"Pattern in {column}"
                    key=f"{random_key_base}_{column}",
                )
                if filters[column]:
                    # na=False: missing values count as "no match". Without it
                    # the mask contains NaN and boolean indexing raises.
                    df = df[df[column].str.contains(filters[column], na=False)]
    return df
# https://pandas.pydata.org/docs/reference/api/pandas.io.formats.style.Styler.format.html
try:
    source = dataframe_explorer(st.session_state.macro_index_data)
except Exception:
    # Best effort: if the filter UI cannot be built for this frame, show the
    # data unfiltered. `except Exception` (not a bare `except`) lets
    # BaseException-derived control-flow exceptions pass through.
    source = st.session_state.macro_index_data.copy()
# Numeric cells are shown with two decimals; everything else is left as is.
func = lambda s: "%.2f" % float(s) if isNumber(s) else s
source = source.style.format(func)
st.dataframe(source)  # show the table
表格展示
def _collapse_list_cell(x):
    """Unwrap one-element lists, show two-element lists as a check mark."""
    if isinstance(x, list):
        if len(x) == 1:
            return x[0]
        if len(x) == 2:
            return "✔️"
    return x


source = st.session_state.macro_index_data.copy().applymap(_collapse_list_cell)
st.dataframe(source)  # show the table
图表展示
class DrawPic():
    """Builders for the pyecharts charts used on the macro-data page.

    Every public method returns a configured pyecharts chart object.
    ``line`` and ``bar`` accept the same keyword options: ``title``,
    ``subtitle``, ``yaxis_name``, ``yaxis_ext_col`` (columns drawn against a
    second y axis) and ``yaxis_ext_name``.
    """

    @staticmethod
    def _hidden_label():
        """Label options used for all line/bar series: labels are not shown."""
        return opts.LabelOpts(is_show=False, formatter="{value}")

    @staticmethod
    def _value_axis(name, **extra):
        """A numeric axis with ticks and split lines; ``extra`` is passed to AxisOpts."""
        return opts.AxisOpts(
            name=name,
            type_="value",
            axistick_opts=opts.AxisTickOpts(is_show=True),
            splitline_opts=opts.SplitLineOpts(is_show=True),
            **extra,
        )

    def _series_chart(self, chart, data, xaxis_col, yaxis_col, **kwargs):
        """Fill ``chart`` with the series and options shared by ``line`` and ``bar``."""
        title = kwargs.get("title", "")
        subtitle = kwargs.get("subtitle", "")
        yaxis_name = kwargs.get("yaxis_name", "")
        yaxis_ext_col = kwargs.get("yaxis_ext_col", [])
        yaxis_ext_name = kwargs.get("yaxis_ext_name", "")

        chart.add_xaxis(data[xaxis_col].tolist())
        for column in yaxis_col:
            chart.add_yaxis(
                column,
                data[column].tolist(),
                label_opts=self._hidden_label(),
            )
        if yaxis_ext_col:
            # Extra columns are drawn against a second y axis (index 1).
            for column in yaxis_ext_col:
                chart.add_yaxis(
                    column,
                    data[column].tolist(),
                    yaxis_index=1,
                    label_opts=self._hidden_label(),
                )
            chart.extend_axis(
                yaxis=self._value_axis(
                    yaxis_ext_name, name_location="end", is_inverse=False
                )
            )
        chart.set_global_opts(
            title_opts=opts.TitleOpts(title=title, subtitle=subtitle),
            tooltip_opts=opts.TooltipOpts(trigger="axis", axis_pointer_type="cross"),
            yaxis_opts=self._value_axis(yaxis_name),
            datazoom_opts=opts.DataZoomOpts(is_show=True, range_start=0, range_end=100),
            legend_opts=opts.LegendOpts(orient="horizontal", pos_top="top", type_="scroll"),
        )
        return chart

    def line(self, data:pd.DataFrame, xaxis_col:str, yaxis_col:List[str], **kwargs):
        """Line chart of ``yaxis_col`` columns over ``xaxis_col``."""
        chart = Line(init_opts=opts.InitOpts())
        return self._series_chart(chart, data, xaxis_col, yaxis_col, **kwargs)

    def bar(self, data:pd.DataFrame, xaxis_col:str, yaxis_col:List[str], **kwargs):
        """Bar chart of ``yaxis_col`` columns over ``xaxis_col``."""
        chart = Bar(init_opts=opts.InitOpts())
        return self._series_chart(chart, data, xaxis_col, yaxis_col, **kwargs)

    def mixBarLine(self, data:pd.DataFrame, xaxis_col:str,
                   yaxis_col:List[str], yaxis_ext_col:List[str],
                   **kwargs):
        """Bars for ``yaxis_col`` overlaid with lines for ``yaxis_ext_col``.

        The lines use a second y axis on the right. Keyword options:
        ``title``, ``subtitle``, ``yaxis_name``, ``yaxis_ext_name``.
        """
        title = kwargs.get("title", "")
        subtitle = kwargs.get("subtitle", "")
        yaxis_name = kwargs.get("yaxis_name", "")
        yaxis_ext_name = kwargs.get("yaxis_ext_name", "")

        chart = Bar(init_opts=opts.InitOpts())
        chart1 = Line(init_opts=opts.InitOpts())
        xaxis_data = data[xaxis_col].tolist()
        chart.add_xaxis(xaxis_data)
        chart1.add_xaxis(xaxis_data)

        for column in yaxis_col:
            chart.add_yaxis(
                column,
                data[column].tolist(),
                label_opts=self._hidden_label(),
            )
        chart.extend_axis(
            yaxis=self._value_axis(
                yaxis_ext_name,
                name_location="end",
                is_inverse=False,
                position="right",
            )
        )
        chart.set_global_opts(
            title_opts=opts.TitleOpts(title=title, subtitle=subtitle),
            tooltip_opts=opts.TooltipOpts(trigger="axis", axis_pointer_type="cross"),
            yaxis_opts=self._value_axis(yaxis_name),
            xaxis_opts=opts.AxisOpts(
                type_="category",
                axispointer_opts=opts.AxisPointerOpts(is_show=True, type_="shadow"),
            ),
            datazoom_opts=opts.DataZoomOpts(is_show=True, range_start=0, range_end=100),
            legend_opts=opts.LegendOpts(orient="horizontal", pos_top="top", type_="scroll"),
        )

        for column in yaxis_ext_col:
            chart1.add_yaxis(
                column,
                data[column].tolist(),
                yaxis_index=1,
                label_opts=self._hidden_label(),
                z_level=1,  # higher level so the lines are drawn on top of the bars
            )
        return chart.overlap(chart1)

    def scatter(self, x_data, y_data, xaxis_name, yaxis_name, **kwargs):
        """Scatter plot of ``y_data`` against ``x_data`` on two value axes.

        Keyword options: ``title``, ``subtitle``.
        """
        title = kwargs.get("title", "")
        subtitle = kwargs.get("subtitle", "")

        chart = Scatter(init_opts=opts.InitOpts())
        chart.add_xaxis(xaxis_data=x_data)
        chart.add_yaxis(
            series_name="",
            y_axis=y_data,
            label_opts=opts.LabelOpts(is_show=False),
        )
        chart.set_global_opts(
            title_opts=opts.TitleOpts(title=title, subtitle=subtitle),
            tooltip_opts=opts.TooltipOpts(trigger="item", axis_pointer_type="cross"),
            xaxis_opts=opts.AxisOpts(
                name=xaxis_name,
                type_="value",
                splitline_opts=opts.SplitLineOpts(is_show=True),
            ),
            yaxis_opts=self._value_axis(yaxis_name),
            datazoom_opts=opts.DataZoomOpts(is_show=True, range_start=0, range_end=100),
        )
        return chart

    def heatmap(self, data:pd.DataFrame, **kwargs):
        """Heat map of a matrix-like frame: columns on x, index on y.

        Keyword options: ``visualmap_opts_min`` (default 0) and
        ``visualmap_opts_max`` (default 1) bound the colour scale.
        """
        visualmap_opts_min = kwargs.get("visualmap_opts_min", 0)
        visualmap_opts_max = kwargs.get("visualmap_opts_max", 1)

        xaxis_data = data.columns.tolist()
        yaxis_data = data.index.tolist()
        n_rows, n_cols = data.shape
        # Each point is [x index, y index, value]. x follows the columns and y
        # the rows, so the column index comes first. The previous [row, col]
        # order was only correct for symmetric matrices.
        value = [
            [col, row, round(data.iloc[row, col], 4)]
            for row in range(n_rows)
            for col in range(n_cols)
        ]
        # Width grows with the number of columns, height with the number of rows.
        width = max(100 * len(xaxis_data), 900)
        height = max(50 * len(yaxis_data), 600)

        chart = HeatMap(init_opts=opts.InitOpts(width=f"{width}px", height=f"{height}px"))
        chart.add_xaxis(xaxis_data=xaxis_data)
        chart.add_yaxis(
            series_name="Punch Card",
            yaxis_data=yaxis_data,
            value=value,
        )
        chart.set_series_opts()

        def _category_axis():
            return opts.AxisOpts(
                type_="category",
                splitarea_opts=opts.SplitAreaOpts(
                    is_show=True, areastyle_opts=opts.AreaStyleOpts(opacity=1)
                ),
            )

        chart.set_global_opts(
            legend_opts=opts.LegendOpts(is_show=False),
            xaxis_opts=_category_axis(),
            yaxis_opts=_category_axis(),
            visualmap_opts=opts.VisualMapOpts(
                min_=visualmap_opts_min, max_=visualmap_opts_max, is_calculable=True, orient="vertical", pos_left="right"
            ),
            tooltip_opts=opts.TooltipOpts(trigger="item", axis_pointer_type="cross"),
            toolbox_opts=opts.ToolboxOpts(is_show=True),
        )
        return chart
def _first_unit(unit_frame, columns):
    """Return the first non-null unit found in ``columns``, or "" if there is none."""
    for column in columns:
        units = unit_frame[column].dropna().unique().tolist()
        if units:
            return units[0]
    return ""


source = st.session_state.macro_index_data.copy()
if ("tag" not in source.columns) or (len(cal_label_select) == 0) or (len(base_dim_index_select) == 0):
    # Nothing to chart: show the raw table only.
    st.dataframe(source)
else:
    # Rows tagged "index_value" hold the numbers, rows tagged "unit_use" the units.
    chart_data = source[source["tag"] == "index_value"]
    unit_data = source[source["tag"] == "unit_use"]
    tab1, tab2, tab3 = col1.tabs(["📈 图表", "🗃 数据值", "🎀 分析"])
    with tab1:
        item = [i for i in chart_data.columns if i not in ["occur_period", "tag"]]
        tab1_select = tab1.selectbox("选择数据" + ":", (base_dim_index_select))

        # Primary axis: columns matching the selected index and the first label.
        yaxis_col = [i for i in chart_data.columns if (tab1_select in i) and (cal_label_select[0] in i)]
        if yaxis_col:
            # _first_unit yields "" when no unit is recorded; previously the
            # unit variable was unbound in that case and this line raised
            # NameError.
            yaxis_name = cal_label_select[0] + "(" + _first_unit(unit_data, yaxis_col) + ")"
        else:
            yaxis_name = ""

        # Optional second axis: columns matching the second label.
        if len(cal_label_select) == 2:
            yaxis_ext_col = [i for i in chart_data.columns if (tab1_select in i) and (cal_label_select[1] in i)]
            if yaxis_ext_col:
                yaxis_ext_name = cal_label_select[1] + "(" + _first_unit(unit_data, yaxis_ext_col) + ")"
            else:
                yaxis_ext_name = ""
        else:
            yaxis_ext_col = []
            yaxis_ext_name = ""

        if len(cal_label_select) == 2:
            c = dp.mixBarLine(chart_data,
                              "occur_period",
                              yaxis_col,
                              yaxis_ext_col,
                              yaxis_name=yaxis_name,
                              yaxis_ext_name=yaxis_ext_name
                              )
        else:
            c = dp.line(chart_data,
                        "occur_period",
                        yaxis_col,
                        yaxis_name=yaxis_name)
        st_pyecharts(c, height="500px")
    with tab2:
        st.dataframe(source)
清除缓存
def clearCache():
    """Clear both experimental caches (memo and singleton).

    https://docs.streamlit.io/library/advanced-features/experimental-cache-primitives
    """
    for cache_api in (st.experimental_memo, st.experimental_singleton):
        cache_api.clear()
查询配置信息
# Show the current values of two config options on the page.
st.write("theme base: ",st.get_option("theme.base"))
st.write("server port: ",st.get_option("server.port"))
启动项
streamlit run script.py --theme.base light --server.port 80