最近在写一个量化框架,从数据到回测、因子、策略、资管、模拟、实盘以及AI模型等全能力构建,本来想不要重复造轮子的,但是实在太闲了,太闲了,从原子开始造感觉有挑战性一点 ,而且能学习到更多更深层次的东西,也不在乎能不能成功,走一步算一步咯....
以上吹那么多,接下来仅仅是描述一个简单的算法而已,原因就是我在做数据清洗的过程中,一般只需要拿原始数据即可,从原始数据再来构建业务数据。典型的就是K线数据的周期转换,服务器拿到每天日线的原始历史数据,继而可以延伸出复权数据,周期数据、指标数据等。复权和指标算法后面会写,这里先讲一个简单的周期转换算法,即日线转周线、月线、年线,分钟周期5分钟、15分钟、30分钟、60分钟转换等。
当然了,仅仅50行左右代码即可实现,实现思路就是基于时间的分组算法,即通过对日期的周期提取分组,同周期K线归类为一组,再合并计算滚动即可得到目标周期K线数据,缺点就是时间复杂度增加了一倍为O(2n)。应该还能优化到O(n),有没有同样很闲的大神指导指导。
算法流程图:
核心算法代码:
# 日线数据 [[date,open,high,low,close,vol,amount],...]
# 分钟数据 [[datetime,open,high,low,close,vol,amount],...]
# k线滚动缓存
last_klines = []
# 分组标识
last_group = None
# 保存转换结果
new_klines = []
# 滚动开始
for item in klines:
# 分组提取
if not m: group_name = get_date_group(item[0])
else: group_name = self.get_date_min_group(item[0],m)
if last_group == None: last_group = group_name
# 分组合并
if group_name==last_group:
# 始终合并上一个周期
last_klines = self.merge_group_klines([last_klines]+[item])
else:
# 保存上个周期
new_klines.append(last_klines)
# 传递新周期数据
last_klines = item
# 传递新周期分组
last_group = group_name
# 终止
if item == klines[-1]:
if group_name==last_group:
# 保存新周期
new_klines.append(last_klines)
return new_klines
完整代码:
目前托管在github中:https://github.com/dsxkline/dsx_base_algorithm/
import datetime
import os
# 周期
class CYCLE:
T='t' # 分时线
T5='t5' # 五日分时线
DAY="day" # 日K
WEEK="week" # 周K
MONTH="month" # 月K
YEAR="year" # 年K
M1="m1" # 1分钟K
M5="m5" # 5分钟K
M15="m15" # 15分钟K
M30="m30" # 30分钟K
M60="m60" # 60分钟K
class DsxBaseConverKlines:
def __init__(self,klines:list) -> None:
# k线历史数据
# 日线数据 [date,open,high,low,close,vol,amount]
# 分钟数据 [datetime,open,high,low,close,vol,amount]
self.klines = klines
pass
def converkline_to(self,cycle:CYCLE):
"""通过时间分组算法转换k线周期
通过对日期的周期提取分组,同周期K线归类为一组,再计算即可得到目标周期K线数据
日线数据 [[date,open,high,low,close,vol,amount],...]
分钟数据 [[datetime,open,high,low,close,vol,amount],...]
Args:
cycle (CYCLE): 周期枚举
Returns:
list: 转换后的k线
"""
klines = self.klines
# 日周月年K线的分组标识提取方法
get_date_group = self.get_date_week_group
if cycle==CYCLE.WEEK : get_date_group = self.get_date_week_group
if cycle==CYCLE.MONTH : get_date_group = self.get_date_month_group
if cycle==CYCLE.YEAR : get_date_group = self.get_date_year_group
# 分钟线分组周期
m = None
if cycle==CYCLE.M5 : m = 5
if cycle==CYCLE.M15 : m = 15
if cycle==CYCLE.M30 : m = 30
if cycle==CYCLE.M60 : m = 60
# k线滚动缓存
last_klines = []
# 分组标识
last_group = None
# 保存转换结果
new_klines = []
# 滚动开始
for item in klines:
# 分组提取
if not m: group_name = get_date_group(item[0])
else: group_name = self.get_date_min_group(item[0],m)
if last_group == None: last_group = group_name
# 分组合并
if group_name==last_group:
# 始终合并上一个周期
last_klines = self.merge_group_klines([last_klines]+[item])
else:
# 保存上个周期
new_klines.append(last_klines)
# 传递新周期数据
last_klines = item
# 传递新周期分组
last_group = group_name
# 终止
if item == klines[-1]:
if group_name==last_group:
# 保存新周期
new_klines.append(last_klines)
return new_klines
def get_date_week_group(self,date:str):
"""提取周分组特征
Args:
date (str): 日期格式 %Y%m%d
Returns:
str: %Y-%week
"""
d = datetime.datetime.strptime(date,'%Y%m%d')
# (2023,45,5) 年 year,周号 week,周几 weekday
week_group = d.isocalendar()
return str(week_group[0])+"-"+str(week_group[1])
def get_date_month_group(self,date:str):
"""提取月分组特征
Args:
date (str): 日期格式 %Y%m%d
Returns:
str: %Y%m
"""
return date[:6]
def get_date_year_group(self,date:str):
"""提取年分组特征
Args:
date (str): 日期格式 %Y%m%d
Returns:
str: %Y
"""
return date[:4]
def get_date_min_group(self,date:str,m:int):
"""提取分钟分组特征
Args:
date (str): 日期格式 %Y%m%d%H%M
Returns:
str: %Y%m%d%Hmin
"""
datemin = date[8:12]
# 分组规则根据N分钟来分组,每隔N分钟一组数据 N=min,所以需要根据当前时间计算相对于开盘时间的顺序编号
d = (datetime.datetime.strptime(datemin,"%H%M") - datetime.datetime.strptime("0930","%H%M"))
sort = d.seconds/60
if int(datemin)>=1300:
# 中午休盘90分钟
sort -= 90
# 换算成所在分组
y = sort % m
# 按N分钟一组
group_name = int(sort/m) + (y>0 and 1 or 0)
return str(group_name)
def merge_group_klines(self,group_klines:list):
"""合并分组k线
Args:
group_klines (list): k线数据
Returns:
_type_: _description_
"""
if group_klines==None : return
open = 0
high = 0
low = 1000000000
close = 0
vol = 0
amount = 0
i = 0
date = ""
for item in group_klines:
if len(item)>=7:
if i==0:open = item[1]
date = item[0]
high = max(high,item[2])
low = min(low,item[3])
close = item[4]
vol += item[5]
amount += item[6]
i+=1
return [date,open,high,low,close,vol,amount]
def converkline_toweek(self):
"""把日线数据转成周线
通过对日期的周期提取归类,同周期K线归类为一组,再计算即可得到周K数据
需要防止日期跨周,例如遇到不连续或停牌一段时间的
Args:
klines (list): 日线数据 [date,open,high,low,close,vol,amount]
"""
return self.converkline_to(CYCLE.WEEK)
def converkline_tomonth(self):
"""把日线数据转成月线
通过对日期的月度提取归类,同月度K线归类为一组,再计算可能得到月K数据
Args:
klines (list): 日线数据 [date,open,high,low,close,vol,amount]
"""
return self.converkline_to(CYCLE.MONTH)
def converkline_toyear(self):
"""把日线数据转成年线
通过对日线的年度提取归类,同年度的K线归位一组,再计算即可得到年K数据
Args:
klines (list): 日线数据 [date,open,high,low,close,vol,amount]
"""
return self.converkline_to(CYCLE.YEAR)
def converkline_tomin(self,cycle:CYCLE):
"""把1分钟线数据转成N分钟线
把时间分成N份,计算每一份的K线数据即可转换
Args:
klines (list): 1分钟数据 [datetime,open,high,low,close,vol,amount]
"""
return self.converkline_to(cycle)
if __name__=="__main__":
with open(os.path.dirname(os.path.abspath(__file__))+"/day.txt") as f:
# 取原始日线数据
datas = f.read()
datas = datas.split("n")
klines = []
for item in datas:
date,op,high,low,close,vol,amount = item.split(",")
klines.append([date,float(op),float(high),float(low),float(close),float(vol),float(amount)])
# 转成周线数据
week_klines = DsxBaseConverKlines(klines).converkline_toweek()
# 转成周线数据
# month_klines = DsxBaseConverKlines(klines).converkline_tomonth()
# # 转成周线数据
# year_klines = DsxBaseConverKlines(klines).converkline_toyear()
print(week_klines)
with open(os.path.dirname(os.path.abspath(__file__))+"/min.txt") as f:
# 取原始一分钟数据
datas = f.read()
datas = datas.split("n")
klines = []
for item in datas:
date,m,op,high,low,close,vol,amount = item.split(",")
klines.append([date+m,float(op),float(high),float(low),float(close),float(vol),float(amount)])
# # 转成5分钟数据
min5_klines = DsxBaseConverKlines(klines).converkline_tomin(CYCLE.M5)
# # 转成15分钟数据
# min15_klines = DsxBaseConverKlines(klines).converkline_tomin(CYCLE.M15)
# # 转成30分钟数据
# min30_klines = DsxBaseConverKlines(klines).converkline_tomin(CYCLE.M30)
# 转成60分钟数据
# min60_klines = DsxBaseConverKlines(klines).converkline_tomin(CYCLE.M60)
print(min5_klines)
代码完整开源可运行,感兴趣可去GitHub下载,有问题可私信,欢迎交流学习。