该项目分为三个部分:一是数据计算,二是可视化,三是MACD策略
一、计算MACD
1、数据部分
数据来源:tushare
数据字段包含:日期,开盘价,收盘价,最低价,最高价,成交量,涨跌
需要计算的数据:macd,dif,dea
2、MACD的计算
(1)计算指数移动平均值(EMA)
12日EMA的算式为
EMA(12)=前一日EMA(12)×11/13+今日收盘价×2/13
26日EMA的算式为
EMA(26)=前一日EMA(26)×25/27+今日收盘价×2/27
(2)计算离差值(DIF)
DIF=今日EMA(12)-今日EMA(26)
(3)计算DIF的9日EMA
根据离差值计算其9日的EMA,即离差平均值,是所求的MACD值。为了不与指标原名相混淆,此值又名
DEA或DEM。
今日DEA(MACD)=前一日DEA×8/10+今日DIF×2/10。
计算出的DIF和DEA的数值均为正值或负值。
用(DIF-DEA)×2即为MACD柱状图。
二、可视化
1、可视化工具:pyecharts
pyecharts官网
三、MACD策略
代码实现如下:
from typing import List, Sequence, Union
from pyecharts import options as opts
from pyecharts.commons.utils import JsCode
from pyecharts.charts import Kline, Line, Bar, Grid
# 数据
#时间,
#ema重构版
import pandas as pd
import numpy as np
import tushare as ts
def macd(code, start_date, end_date):
    """Fetch daily bars from tushare and compute the MACD indicator rows.

    Args:
        code: Security code passed to ``pro.daily`` as ``ts_code``
            (the caller in this file uses ``'000001.SZ'``).
        start_date: First trade date, passed to ``pro.daily`` unchanged
            (the caller in this file uses ``'YYYYMMDD'`` strings).
        end_date: Last trade date, same format as ``start_date``.

    Returns:
        ``DataFrame.values`` with one row per trade date, sorted by
        ``trade_date`` ascending, and the columns
        ``trade_date, open, close, low, high, vol, x, macd, dif, dea``.
        ``vol`` is the reported volume divided by 10000, ``x`` is 1 when
        ``close >= open`` and 0 otherwise, ``macd`` is ``(dif - dea) * 2``.
        Numeric columns are rounded to 2 decimals.
    """
    pro = ts.pro_api()
    df = pro.daily(ts_code=code, start_date=start_date, end_date=end_date)

    # Drop the columns the charts never use.
    df = df.drop(columns=['ts_code', 'pre_close', 'change', 'pct_chg', 'amount'])

    # sort_values keeps the old index labels, so the index must be reset:
    # otherwise label-based lookups (and the index join below) follow the
    # API's original row order and the EMA recursion is not chronological.
    df = df.sort_values(by='trade_date').reset_index(drop=True)

    # Scale the volume down by 10000 and keep 2 decimals.
    df['vol'] = (df['vol'] / 10000).round(2)

    def smooth(values, num):
        """Return the ``num``-period EMA of ``values``, seeded with values[0].

        Each step is ``prev * (num-1)/(num+1) + value * 2/(num+1)``.
        """
        if len(values) == 0:
            return np.array([], dtype=float)
        out = [values[0]] * len(values)
        for i in range(1, len(values)):
            out[i] = out[i - 1] * (num - 1) / (num + 1) + values[i] * 2 / (num + 1)
        return np.array(out)

    # Positional array, so smoothing always walks the rows in date order.
    close = df['close'].to_numpy()
    dif = smooth(close, 12) - smooth(close, 26)
    dea_line = smooth(dif, 9)
    histogram = (dif - dea_line) * 2

    # Both frames now share a 0..n-1 index, so the join is row-by-row.
    indicators = pd.DataFrame({'macd': histogram, 'dif': dif, 'dea': dea_line})
    df = df.join(indicators).round(2)
    print(df)

    # Up/down flag computed on the rounded prices: 1 = up (or flat), 0 = down.
    rise_flag = (df['close'] - df['open']).apply(
        lambda delta: 1 if delta >= 0 else 0
    )

    # Arrange the columns in the order the chart code indexes them.
    df = df.loc[:, ['trade_date', 'open', 'close', 'low', 'high', 'vol',
                    'macd', 'dif', 'dea']]
    df.insert(loc=6, column='x', value=rise_flag)
    return df.values
# Fetch the indicator rows for 000001.SZ; this runs whenever the module is loaded.
# NOTE(review): this rebinds the name `macd` from the function to its return
# value, so the function can no longer be called after this line.
macd=macd(code='000001.SZ',start_date='20230101',end_date='20230320')
# Convert the value returned by macd() into nested Python lists for the charts.
echarts_data =macd.tolist()
# Debug output: the container type and the rows that will be charted.
print(type(echarts_data))
print(echarts_data)
# The bare string below has no runtime effect; it illustrates the row layout
# produced above: [date, open, close, low, high, vol, up/down flag, macd, dif, dea].
'''
echarts_data = [
["2015-10-16", 18.4, 18.58, 18.33, 18.79, 67.00, 1, 0.04, 0.11, 0.09],
["2015-10-19", 18.56, 18.25, 18.19, 18.56, 55.00, 0, -0.00, 0.08, 0.09],
["2016-12-30", 17.53, 17.6, 17.47, 17.61, 22.00, 0, -0.05, -0.03, -0.01],
["2017-01-03", 17.6, 17.92, 17.57, 17.98, 28.00, 1, 0.00, 0.00, 0.00],
]
'''
def split_data(origin_data) -> dict:
    """Split indicator rows into the per-series lists the charts read.

    Every row must hold at least ten items. Item 0 becomes the entry in
    ``times``, items 1.. become the entry in ``datas``, item 5 is
    truncated to ``int`` for ``vols``, and items 7, 8 and 9 feed
    ``macds``, ``difs`` and ``deas`` respectively.
    """
    rows = list(origin_data)

    candles = [row[1:] for row in rows]
    dates = [row[0] for row in rows]
    raw_volumes = [row[5] for row in rows]
    macd_bars = [row[7] for row in rows]
    dif_line = [row[8] for row in rows]
    dea_line = [row[9] for row in rows]

    # Integer conversion happens only after every row has been read.
    volumes = list(map(int, raw_volumes))

    return {
        "datas": candles,
        "times": dates,
        "vols": volumes,
        "macds": macd_bars,
        "difs": dif_line,
        "deas": dea_line,
    }
def split_data_part() -> Sequence:
    """Build the ``[start, end]`` segment pairs passed to the K-line marks.

    Reads the module-level ``data`` mapping (``times`` and ``datas``).
    Within a ``datas`` row, index 0/1 are open/close, 2/3 are low/high,
    4 is the volume and 5 is the up/down flag (row layout as produced by
    ``split_data`` from the ``macd`` output).

    Returns:
        A list of two-element lists. Each element is a dict with
        ``xAxis``/``yAxis`` keys; the first dict of a pair also carries
        a ``value``.
    """
    mark_line_data = []
    idx = 0  # position of the most recent marker row
    tag = 0  # 0: no flagged row seen yet, 1: first flagged row just found, 2: a segment has been emitted
    vols = 0  # volume accumulated since the most recent marker
    for i in range(len(data["times"])):
        # First row whose flag is non-zero: start accumulating from here.
        if data["datas"][i][5] != 0 and tag == 0:
            idx = i
            vols = data["datas"][i][4]
            tag = 1
        if tag == 1:
            vols += data["datas"][i][4]
        # Emit a segment from the previous marker to this row; its label is the raw accumulated volume.
        if data["datas"][i][5] != 0 or tag == 1:
            mark_line_data.append(
                [
                    {
                        "xAxis": idx,
                        # high when the marker row closed above its open, else its low
                        "yAxis": float("%.2f" % data["datas"][idx][3])
                        if data["datas"][idx][1] > data["datas"][idx][0]
                        else float("%.2f" % data["datas"][idx][2]),
                        "value": vols,
                    },
                    {
                        "xAxis": i,
                        "yAxis": float("%.2f" % data["datas"][i][3])
                        if data["datas"][i][1] > data["datas"][i][0]
                        else float("%.2f" % data["datas"][i][2]),
                    },
                ]
            )
            idx = i
            vols = data["datas"][i][4]
            tag = 2
        if tag == 2:
            vols += data["datas"][i][4]
        # Emit a second segment whose label is the average volume over the span, suffixed with " M".
        if data["datas"][i][5] != 0 and tag == 2:
            mark_line_data.append(
                [
                    {
                        "xAxis": idx,
                        # NOTE(review): the condition and the else-branch use row i while the
                        # true-branch uses row idx, unlike the first segment above, which uses
                        # idx throughout -- confirm whether idx was intended here as well.
                        "yAxis": float("%.2f" % data["datas"][idx][3])
                        if data["datas"][i][1] > data["datas"][i][0]
                        else float("%.2f" % data["datas"][i][2]),
                        "value": str(float("%.2f" % (vols / (i - idx + 1)))) + " M",
                    },
                    {
                        "xAxis": i,
                        "yAxis": float("%.2f" % data["datas"][i][3])
                        if data["datas"][i][1] > data["datas"][i][0]
                        else float("%.2f" % data["datas"][i][2]),
                    },
                ]
            )
            idx = i
            vols = data["datas"][i][4]
    return mark_line_data
def calculate_ma(day_count: int):
    """Return the ``day_count``-row moving average of the close price.

    Reads the module-level ``data`` mapping: one output entry is produced
    per item of ``data["times"]``, and closes are taken from index 1 of
    each ``data["datas"]`` row. The first ``day_count`` entries are the
    placeholder ``"-"``; every later entry is the absolute value of the
    average of the current row and the ``day_count - 1`` rows before it,
    formatted to 2 decimals.
    """
    rows = data["datas"]
    averages: List[Union[float, str]] = []
    for pos in range(len(data["times"])):
        if pos >= day_count:
            window_total = 0.0
            # Add newest-first, walking back from the current row.
            for back in range(day_count):
                window_total += float(rows[pos - back][1])
            mean_text = format(window_total / day_count, ".2f")
            averages.append(abs(float(mean_text)))
        else:
            averages.append("-")
    return averages
def draw_chart():
    """Render the K-line, volume and MACD panels into one HTML page.

    Reads the module-level ``data`` mapping (``times``, ``datas``,
    ``vols``, ``macds``, ``difs``, ``deas``) and writes
    ``professional_kline_chart.html`` through ``Grid.render``.
    Returns ``None``.
    """
    # Built once and shared by the mark-line and mark-area options.
    mark_segments = split_data_part()

    kline = (
        Kline()
        .add_xaxis(xaxis_data=data["times"])
        .add_yaxis(
            series_name="",
            y_axis=data["datas"],
            itemstyle_opts=opts.ItemStyleOpts(
                color="#ef232a",
                color0="#14b143",
                border_color="#ef232a",
                border_color0="#14b143",
            ),
            markpoint_opts=opts.MarkPointOpts(
                data=[
                    opts.MarkPointItem(type_="max", name="最大值"),
                    opts.MarkPointItem(type_="min", name="最小值"),
                ]
            ),
            markline_opts=opts.MarkLineOpts(
                label_opts=opts.LabelOpts(
                    position="middle", color="blue", font_size=15
                ),
                data=mark_segments,
                symbol=["circle", "none"],
            ),
        )
        .set_series_opts(
            markarea_opts=opts.MarkAreaOpts(is_silent=True, data=mark_segments)
        )
        .set_global_opts(
            title_opts=opts.TitleOpts(title="K线周期图表", pos_left="0"),
            xaxis_opts=opts.AxisOpts(
                type_="category",
                is_scale=True,
                boundary_gap=False,
                axisline_opts=opts.AxisLineOpts(is_on_zero=False),
                splitline_opts=opts.SplitLineOpts(is_show=False),
                split_number=20,
                min_="dataMin",
                max_="dataMax",
            ),
            yaxis_opts=opts.AxisOpts(
                is_scale=True, splitline_opts=opts.SplitLineOpts(is_show=True)
            ),
            tooltip_opts=opts.TooltipOpts(trigger="axis", axis_pointer_type="line"),
            datazoom_opts=[
                opts.DataZoomOpts(
                    is_show=False, type_="inside", xaxis_index=[0, 0], range_end=100
                ),
                opts.DataZoomOpts(
                    is_show=True, xaxis_index=[0, 1], pos_top="97%", range_end=100
                ),
                opts.DataZoomOpts(is_show=False, xaxis_index=[0, 2], range_end=100),
            ],
            # Optional: link the axis pointers of all three panels.
            # axispointer_opts=opts.AxisPointerOpts(
            #     is_show=True,
            #     link=[{"xAxisIndex": "all"}],
            #     label=opts.LabelOpts(background_color="#777"),
            # ),
        )
    )

    kline_line = (
        Line()
        .add_xaxis(xaxis_data=data["times"])
        .add_yaxis(
            series_name="MA5",
            y_axis=calculate_ma(day_count=5),
            is_smooth=True,
            linestyle_opts=opts.LineStyleOpts(opacity=0.5),
            label_opts=opts.LabelOpts(is_show=False),
        )
        .set_global_opts(
            xaxis_opts=opts.AxisOpts(
                type_="category",
                grid_index=1,
                axislabel_opts=opts.LabelOpts(is_show=False),
            ),
            yaxis_opts=opts.AxisOpts(
                grid_index=1,
                split_number=3,
                axisline_opts=opts.AxisLineOpts(is_on_zero=False),
                axistick_opts=opts.AxisTickOpts(is_show=False),
                splitline_opts=opts.SplitLineOpts(is_show=False),
                axislabel_opts=opts.LabelOpts(is_show=True),
            ),
        )
    )

    # Overlap Kline + Line
    overlap_kline_line = kline.overlap(kline_line)

    # Bar-1: volume bars, coloured by whether the candle closed above its open.
    bar_1 = (
        Bar()
        .add_xaxis(xaxis_data=data["times"])
        .add_yaxis(
            series_name="Volumn",
            y_axis=data["vols"],
            xaxis_index=1,
            yaxis_index=1,
            label_opts=opts.LabelOpts(is_show=False),
            # The colour callback reads the candle rows from the page-level
            # JS variable `barData`, which add_js_funcs injects further down.
            itemstyle_opts=opts.ItemStyleOpts(
                color=JsCode(
                    """
                function(params) {
                    var colorList;
                    if (barData[params.dataIndex][1] > barData[params.dataIndex][0]) {
                        colorList = '#ef232a';
                    } else {
                        colorList = '#14b143';
                    }
                    return colorList;
                }
                """
                )
            ),
        )
        .set_global_opts(
            xaxis_opts=opts.AxisOpts(
                type_="category",
                grid_index=1,
                axislabel_opts=opts.LabelOpts(is_show=False),
            ),
            legend_opts=opts.LegendOpts(is_show=False),
        )
    )

    # Bar-2 (Overlap Bar + Line): MACD histogram, red when >= 0, green otherwise.
    bar_2 = (
        Bar()
        .add_xaxis(xaxis_data=data["times"])
        .add_yaxis(
            series_name="MACD",
            y_axis=data["macds"],
            xaxis_index=2,
            yaxis_index=2,
            label_opts=opts.LabelOpts(is_show=False),
            itemstyle_opts=opts.ItemStyleOpts(
                color=JsCode(
                    """
                function(params) {
                    var colorList;
                    if (params.data >= 0) {
                        colorList = '#ef232a';
                    } else {
                        colorList = '#14b143';
                    }
                    return colorList;
                }
                """
                )
            ),
        )
        .set_global_opts(
            xaxis_opts=opts.AxisOpts(
                type_="category",
                grid_index=2,
                axislabel_opts=opts.LabelOpts(is_show=False),
            ),
            yaxis_opts=opts.AxisOpts(
                grid_index=2,
                split_number=4,
                axisline_opts=opts.AxisLineOpts(is_on_zero=False),
                axistick_opts=opts.AxisTickOpts(is_show=False),
                splitline_opts=opts.SplitLineOpts(is_show=False),
                axislabel_opts=opts.LabelOpts(is_show=True),
            ),
            legend_opts=opts.LegendOpts(is_show=False),
        )
    )

    line_2 = (
        Line()
        .add_xaxis(xaxis_data=data["times"])
        .add_yaxis(
            series_name="DIF",
            y_axis=data["difs"],
            xaxis_index=2,
            yaxis_index=2,
            label_opts=opts.LabelOpts(is_show=False),
        )
        .add_yaxis(
            # This series plots data["deas"]; it was previously also named
            # "DIF", which made the two lines indistinguishable in the tooltip.
            series_name="DEA",
            y_axis=data["deas"],
            xaxis_index=2,
            yaxis_index=2,
            label_opts=opts.LabelOpts(is_show=False),
        )
        .set_global_opts(legend_opts=opts.LegendOpts(is_show=False))
    )

    # Bottom panel: MACD bars overlaid with the DIF/DEA lines.
    overlap_bar_line = bar_2.overlap(line_2)

    # Final Grid that stacks the three panels.
    grid_chart = Grid()

    # Write data["datas"] into the HTML as a global JS variable so the volume
    # colour callback can read it (values are passed across series this way).
    grid_chart.add_js_funcs("var barData = {}".format(data["datas"]))

    # K-line with the MA5 line
    grid_chart.add(
        overlap_kline_line,
        grid_opts=opts.GridOpts(pos_left="3%", pos_right="1%", height="60%"),
    )
    # Volume bars
    grid_chart.add(
        bar_1,
        grid_opts=opts.GridOpts(
            pos_left="3%", pos_right="1%", pos_top="71%", height="10%"
        ),
    )
    # MACD / DIF / DEA
    grid_chart.add(
        overlap_bar_line,
        grid_opts=opts.GridOpts(
            pos_left="3%", pos_right="1%", pos_top="82%", height="14%"
        ),
    )
    grid_chart.render("professional_kline_chart.html")
if __name__ == "__main__":
    # Bind the module-level `data` mapping that split_data_part, calculate_ma
    # and draw_chart read as a global.
    data = split_data(origin_data=echarts_data)
    # Build the charts and write professional_kline_chart.html.
    draw_chart()