import json
import os
import queue
import time

import xlwt

from code_attribute import gpcode_manager
from l2.huaxin import l2_huaxin_util
from utils import tool


class Test:
    """Small scratch helper used for manual experiments."""

    def print_log(self, arg):
        """Print the current timestamp together with *arg*, then sleep 2 seconds."""
        print(time.time(), f"arg:{arg}")
        time.sleep(2)


def parse_big_buy_order_count(path_, limit_up_price_dict=None, max_time=None):
    """
    Aggregate the big-order records of one log file per key.

    Every non-empty line is expected to look like ``<prefix> - <python literal>``;
    the part after the first ``" - "`` is evaluated into ``data``.  A record is
    counted only when ``data[1] == 0``, ``data[2][2] >= 1000000`` and, when
    ``max_time`` is given, the converted ``data[2][3]`` is not later than it.
    (Presumably ``data[0]`` is the stock code, ``data[1] == 0`` marks a buy
    order and ``data[2][2]`` is the order amount -- confirm against the writer
    of the log.)

    @param path_: path of the big-order log file (read as UTF-8)
    @param limit_up_price_dict: currently unused; the limit-up price filter
        that consumed it is disabled (see the commented block below)
    @param max_time: optional upper bound compared against
        ``l2_huaxin_util.convert_time(data[2][3])``; falsy means no bound
    @return: ``{data[0]: [record_count, sum_of_data[2][2]]}``
    """
    code_big_order_count_dict = {}
    with open(path_, mode='r', encoding='utf-8') as f:
        # Stream the file instead of materialising every line up front.
        for line in f:
            payload = line[line.find(" - ") + 3:].strip()
            if not payload:
                # Blank / truncated lines carry no record; eval("") would raise.
                continue
            # NOTE(security): eval() executes arbitrary code. Only feed this
            # function log files produced by a trusted source.
            data = eval(payload)
            if data[1] != 0:
                continue
            if data[2][2] < 1000000:
                continue
            if max_time and l2_huaxin_util.convert_time(data[2][3]) > max_time:
                continue
            # Disabled filter: keep only orders placed at the limit-up price.
            # if data[0] not in limit_up_price_dict:
            #     continue
            # if abs(data[2][4] - limit_up_price_dict[data[0]]) >= 0.001:
            #     continue
            stats = code_big_order_count_dict.setdefault(data[0], [0, 0])
            stats[0] += 1
            stats[1] += data[2][2]
    return code_big_order_count_dict


def __load_limit_up_price():
    """
    Load the limit-up price of every code for every date found in the K-bar files.

    Each file in the K-bar directory is expected to be named so that the part
    after the first ``_`` starts with the 6-character code, and to hold on its
    first line a Python literal list of dicts with the keys ``"bob"``,
    ``"sec_id"`` and ``"pre_close"``.  Files whose first line is empty are
    skipped.

    @return: ``{date: {code: limit_up_price}}`` where ``date`` is the first 10
        characters of ``"bob"`` and the price is rounded to 2 decimals
    """
    # limit-up price of each code, grouped by date
    limit_up_price_date_code_dict = {}
    dir_path = r"D:\回测数据\K线数据"
    for k_bars_file in os.listdir(dir_path):
        code = k_bars_file.split("_")[1][:6]
        with open(f"{dir_path}\\{k_bars_file}", encoding='utf-8', mode='r') as f:
            # Only the first line is used, so do not read the rest of the file.
            first_line = f.readline()
        if not first_line.strip():
            # Empty file: nothing to load (indexing the line list used to crash here).
            continue
        # NOTE(security): eval() executes arbitrary code. Only use with
        # trusted K-bar dumps.
        datas = eval(first_line)
        # {"date": limit-up price}
        date_limit_price = {
            d["bob"][:10]: gpcode_manager.get_limit_up_price_by_preprice(d["sec_id"], d["pre_close"])
            for d in datas
        }
        for date, price in date_limit_price.items():
            limit_up_price_date_code_dict.setdefault(date, {})[code] = round(float(price), 2)
    return limit_up_price_date_code_dict


def export(fdatas, dates, file_name):
    """
    Write the per-code / per-date big-order statistics to an ``.xls`` workbook.

    Layout of ``sheet1``: row 0 holds the dates starting at column 1; each
    following row holds one code in column 0 and, per date, the text
    ``"<count>/<average amount in units of 10000>万"``.  Cells whose count is 0,
    or whose date is missing from the code's mapping, are written empty.

    @param fdatas: ``{code: {date: [count, total_amount]}}``
    @param dates: ordered list of dates, one column each
    @param file_name: destination path handed to ``Workbook.save``
    """
    wb = xlwt.Workbook(encoding="utf-8")
    ws = wb.add_sheet('sheet1')
    for col, date in enumerate(dates, start=1):
        ws.write(0, col, date)
    for row, (code, date_count_dict) in enumerate(fdatas.items(), start=1):
        ws.write(row, 0, code)
        for col, date in enumerate(dates, start=1):
            # A date absent from this code's mapping counts as "no big orders"
            # instead of crashing on None.
            stats = date_count_dict.get(date) or (0, 0)
            count, total = stats[0], stats[1]
            if count > 0:
                average_big_money = total // count
                ws.write(row, col, f"{count}/{round(average_big_money / 10000, 1)}万")
            else:
                ws.write(row, col, "")
    wb.save(file_name)


if __name__ == '__main__':
    # Scratch check: a fresh queue is empty, so nothing is fetched or printed.
    q = queue.Queue()
    if not q.empty():
        data = q.get(block=False)
        print(data)

# Never equal to __name__, so this driver is disabled; it is kept for reference.
if __name__ == '__main__1':
    # Load the limit-up price of every code for every day.
    limit_up_price_date_code_dict = __load_limit_up_price()
    print(limit_up_price_date_code_dict["2025-04-22"]["002719"])
    dir_path = r"D:\回测数据\大单数据"
    files = os.listdir(dir_path)
    dates = []
    big_order_count_date_code_dict = {}
    for file_name in files:
        path_ = f"{dir_path}\\{file_name}"
        # The 10 characters before the 4-character extension are taken as the date.
        date = file_name[-14:-4]
        dates.append(date)
        code_big_order_count_dict = parse_big_buy_order_count(path_, limit_up_price_date_code_dict[date])
        print(code_big_order_count_dict)
        big_order_count_date_code_dict[date] = code_big_order_count_dict
    # Pivot into {code: {"date": big-order stats}}.
    fdata = {}
    for date in big_order_count_date_code_dict:
        for code in big_order_count_date_code_dict[date]:
            if code not in fdata:
                fdata[code] = {}
            fdata[code][date] = big_order_count_date_code_dict[date][code]
    for code in fdata:
        for date in dates:
            if date not in fdata[code]:
                # Fill missing dates with zeros by default.
                fdata[code][date] = [0, 0]
    dates.sort(reverse=True)
    export(fdata, dates, "D:/test.xls")