import json
|
import os
|
import queue
|
import time
|
|
import xlwt
|
|
from code_attribute import gpcode_manager
|
from l2.huaxin import l2_huaxin_util
|
from utils import tool
|
|
|
class Test:
    """Scratch helper class used for manual experiments."""

    def print_log(self, arg):
        """
        Print the current Unix timestamp together with ``arg``, then block for two seconds.

        @param arg: any value; it is interpolated into the printed message.
        @return: None
        """
        timestamp = time.time()
        message = f"arg:{arg}"
        print(timestamp, message)
        # Deliberate pause after each message.
        time.sleep(2)
|
|
|
def parse_big_buy_order_count(path_, limit_up_price_dict=None, max_time=None):
    """
    Collect the per-code distribution of large orders recorded in one day's log file.

    Every line is expected to look like ``<prefix> - <record>``, where ``<record>`` is a
    Python expression that evaluates to an indexable ``(code, flag, detail)`` value.
    A record is counted only when ``record[1] == 0`` and ``detail[2] >= 1000000``
    (presumably "buy order" and "order amount" respectively -- confirm against the log writer).
    Lines whose payload is empty (for example blank lines) are skipped.

    @param path_: path of the UTF-8 encoded log file.
    @param limit_up_price_dict: currently unused; the limit-up price filter that
        consumed it is disabled (see the commented-out code in the loop).
    @param max_time: optional upper bound; when truthy, records for which
        ``l2_huaxin_util.convert_time(detail[3])`` is greater than it are skipped.
    @return: dict mapping ``code -> [number of counted records, sum of detail[2]]``.
    """
    code_big_order_count_dict = {}
    # Stream the file line by line instead of loading it completely into memory.
    with open(path_, mode='r', encoding='utf-8') as f:
        for line in f:
            # The record is whatever follows the first " - " separator.
            # NOTE(review): when the separator is missing, find() returns -1 and the
            # slice starts at index 2, exactly as in the previous implementation.
            payload = line[line.find(" - ") + 3:].strip()
            if not payload:
                # Blank line or empty record: nothing to evaluate.
                continue
            # NOTE(security): eval() executes arbitrary expressions. Only feed this
            # function log files produced by trusted code; consider ast.literal_eval
            # once it is confirmed that all records are plain literals.
            data = eval(payload)
            if data[1] != 0:
                continue
            if data[2][2] < 1000000:
                continue
            if max_time and l2_huaxin_util.convert_time(data[2][3]) > max_time:
                continue
            # Disabled filter: keep only orders placed at the limit-up price.
            # if data[0] not in limit_up_price_dict:
            #     continue
            # if abs(data[2][4] - limit_up_price_dict[data[0]]) >= 0.001:
            #     continue
            stats = code_big_order_count_dict.setdefault(data[0], [0, 0])
            stats[0] += 1
            stats[1] += data[2][2]
    return code_big_order_count_dict
|
|
|
def __load_limit_up_price():
    """
    Build a lookup of limit-up prices keyed first by date and then by code.

    Every file in the K-line data directory is opened and its first line is evaluated
    into an iterable of bar dicts. For each bar the limit-up price is derived from
    ``sec_id`` and ``pre_close`` via ``gpcode_manager.get_limit_up_price_by_preprice``
    and stored under the first ten characters of the bar's ``bob`` field.

    @return: dict shaped ``{date: {code: limit-up price rounded to 2 decimals}}``.
    """
    dir_path = r"D:\回测数据\K线数据"
    # Limit-up price of each code, grouped by date.
    prices_by_date = {}
    for k_bars_file in os.listdir(dir_path):
        # The code is the first six characters of the second "_"-separated name part.
        code = k_bars_file.split("_")[1][:6]
        with open(f"{dir_path}\\{k_bars_file}", encoding='utf-8', mode='r') as f:
            first_line = f.readlines()[0]
        # NOTE(review): eval() runs whatever the file contains; the files must be trusted.
        bars = eval(first_line)
        # {"date": limit-up price} for this code; a later bar with the same date wins.
        date_limit_price = {}
        for bar in bars:
            day = bar["bob"][:10]
            date_limit_price[day] = gpcode_manager.get_limit_up_price_by_preprice(bar["sec_id"], bar["pre_close"])
        for day, price in date_limit_price.items():
            prices_by_date.setdefault(day, {})[code] = round(float(price), 2)
    return prices_by_date
|
|
|
def export(fdatas, dates, file_name):
    """
    Write the per-code, per-date large-order statistics to an Excel workbook.

    The workbook contains a single sheet named ``sheet1``. Row 0 holds the dates
    starting at column 1. Each following row holds a code in column 0 and one cell
    per date: ``"<count>/<average>万"`` when the count is positive (the average is
    ``total // count`` divided by 10000 and rounded to one decimal), otherwise an
    empty string.

    @param fdatas: dict shaped ``{code: {date: [count, total]}}``.
    @param dates: sequence of date keys; defines the column order.
    @param file_name: destination passed to ``Workbook.save``.
    @return: None
    """
    wb = xlwt.Workbook(encoding="utf-8")
    ws = wb.add_sheet('sheet1')
    # Header row: one column per date, leaving column 0 for the codes.
    for col, date in enumerate(dates, start=1):
        ws.write(0, col, date)
    for row, (code, date_count_dict) in enumerate(fdatas.items(), start=1):
        ws.write(row, 0, code)
        for col, date in enumerate(dates, start=1):
            d = date_count_dict.get(date)
            if d is None:
                # A code without an entry for this date is rendered like a zero count
                # instead of failing when None is subscripted.
                d = (0, 0)
            if d[0] > 0:
                average_big_money = d[1] // d[0]
                ws.write(row, col, f"{d[0]}/{round(average_big_money / 10000, 1)}万")
            else:
                ws.write(row, col, "")
    wb.save(file_name)
|
|
|
if __name__ == '__main__':
    # Scratch check: take one item from a freshly created queue without blocking
    # and print it if there is one.
    q = queue.Queue()
    try:
        data = q.get_nowait()
    except queue.Empty:
        # Nothing queued, so nothing to print.
        pass
    else:
        print(data)
|
|
# Disabled driver: the guard compares against '__main__1', so this block does not run
# when the file is executed as a script (where __name__ is '__main__').
if __name__ == '__main__1':
    # Load the limit-up price of every code for every day.
    limit_up_price_date_code_dict = __load_limit_up_price()
    print(limit_up_price_date_code_dict["2025-04-22"]["002719"])

    dir_path = r"D:\回测数据\大单数据"
    dates = []
    big_order_count_date_code_dict = {}
    for file_name in os.listdir(dir_path):
        # The date is the ten characters that precede the 4-character file extension.
        date = file_name[-14:-4]
        dates.append(date)
        path_ = f"{dir_path}\\{file_name}"
        code_big_order_count_dict = parse_big_buy_order_count(path_, limit_up_price_date_code_dict[date])
        print(code_big_order_count_dict)
        big_order_count_date_code_dict[date] = code_big_order_count_dict

    # Pivot into {code: {"date": large-order stats}}.
    fdata = {}
    for date, stats_by_code in big_order_count_date_code_dict.items():
        for code, stats in stats_by_code.items():
            fdata.setdefault(code, {})[date] = stats

    # Fill every (code, date) pair that has no data with zeros.
    for stats_by_date in fdata.values():
        for date in dates:
            stats_by_date.setdefault(date, [0, 0])

    # Newest date first.
    dates.sort(reverse=True)
    export(fdata, dates, "D:/test.xls")
|