From c285883d71ef8a362b012983dadc7ce4256b40f6 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: Fri, 23 May 2025 01:52:07 +0800
Subject: [PATCH] bug fix

---
 test/test.py | 140 ++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 120 insertions(+), 20 deletions(-)

diff --git a/test/test.py b/test/test.py
index 8c48fbc..26f1b22 100644
--- a/test/test.py
+++ b/test/test.py
@@ -1,26 +1,126 @@
-import base64
+import json
+import os
+import queue
+import time
 
-from code_attribute import global_data_loader
-from settings.trade_setting import TradeBlockBuyModeManager
-from third_data.history_k_data_util import HistoryKDatasUtils
+import xlwt
+
+from code_attribute import gpcode_manager
+from l2.huaxin import l2_huaxin_util
 from utils import tool
 
 
-def test_active_buy():
-    current_rank = 2
-
-    TIME_STR_RANGES = ["10:00:00", "10:30:00", "11:00:00", "13:00:00", "13:30:00", "14:00:00", "14:30:00",
-                       "15:00:00"]
-    TIME_INT_RANGES = [int(x.replace(':', '')) for x in TIME_STR_RANGES]
-    MAX_RANKS = [3, 3, 2, 2, 1, 0, 0, 0]
-    now_time_str = "09:36:00".replace(':', '')
-    for i in range(len(TIME_INT_RANGES)):
-        if int(now_time_str) <= TIME_INT_RANGES[i]:
-            if MAX_RANKS[i] > current_rank:
-                return True
-            break
-    return False
+class Test:
+    def print_log(self, arg):
+        print(time.time(), f"arg:{arg}")
+        time.sleep(2)
 
 
-if __name__ == "__main__":
-    print( base64.b64decode("WwogICAgWwogICAgICAgICLkuK3lrZflpLQiLAogICAgICAgIDIKICAgIF0KXQ==").decode('utf-8'))
+def parse_big_buy_order_count(path_, limit_up_price_dict=None, max_time=None):
+    """
+    Count the number of big limit-up buy orders per stock for a single day.
+    @param path_: path of that day's big-order log file
+    @param limit_up_price_dict: {code: limit-up price} for that day
+    @return: {code: [big order count, total big order amount]}
+    """
+    with open(path_, mode='r', encoding='utf-8') as f:
+        lines = f.readlines()
+    code_big_order_count_dict = {}
+    for line in lines:
+        data = line[line.find(" - ") + 3:].strip()
+        data = eval(data)
+        if data[1] != 0:
+            continue
+        if data[2][2] < 1000000:
+            continue
+        if max_time and l2_huaxin_util.convert_time(data[2][3]) > max_time:
+            continue
+        # if data[0] not in limit_up_price_dict:
+        #     continue
+        # if abs(data[2][4] - limit_up_price_dict[data[0]]) >= 0.001:
+        #     continue
+        if data[0] not in code_big_order_count_dict:
+            code_big_order_count_dict[data[0]] = [0, 0]
+        code_big_order_count_dict[data[0]][0] += 1
+        code_big_order_count_dict[data[0]][1] += data[2][2]
+    return code_big_order_count_dict
+
+
+def __load_limit_up_price():
+    # Limit-up price of each code, keyed by date
+    limit_up_price_date_code_dict = {}
+    dir_path = r"D:\回测数据\K线数据"
+    k_bars_files = os.listdir(dir_path)
+    for k_bars_file in k_bars_files:
+        code = k_bars_file.split("_")[1][:6]
+        with open(f"{dir_path}\\{k_bars_file}", encoding='utf-8', mode='r') as f:
+            lines = f.readlines()
+        data = lines[0]
+        datas = eval(data)
+        # {"date": limit-up price}
+        date_limit_price = {
+            d["bob"][:10]: gpcode_manager.get_limit_up_price_by_preprice(d["sec_id"], d["pre_close"]) for d in
+            datas}
+        for date in date_limit_price:
+            if date not in limit_up_price_date_code_dict:
+                limit_up_price_date_code_dict[date] = {}
+            limit_up_price_date_code_dict[date][code] = round(float(date_limit_price[date]), 2)
+    return limit_up_price_date_code_dict
+
+
+def export(fdatas, dates, file_name):
+    wb = xlwt.Workbook(encoding="utf-8")
+    ws = wb.add_sheet('sheet1')
+    for i in range(len(dates)):
+        ws.write(0, i + 1, dates[i])
+    index = 0
+    for code in fdatas:
+        date_count_dict = fdatas[code]
+        index += 1
+        ws.write(index, 0, code)
+        for i in range(len(dates)):
+            d = date_count_dict.get(dates[i])
+            average_big_money = d[1] // d[0] if d[0] > 0 else 0
+            if d[0] > 0:
+                ws.write(index, i + 1, f"{d[0]}/{round(average_big_money / 10000, 1)}万")
+            else:
+                ws.write(index, i + 1, f"")
+    wb.save(file_name)
+
+
+if __name__ == '__main__':
+    q = queue.Queue()
+    if not q.empty():
+        data = q.get(block=False)
+        print(data)
+
+if __name__ == '__main__1':  # '__main__1' never matches, so this block is disabled
+    # Load every day's limit-up price for each code
+    limit_up_price_date_code_dict = __load_limit_up_price()
+    print(limit_up_price_date_code_dict["2025-04-22"]["002719"])
+    dir_path = r"D:\回测数据\大单数据"
+    files = os.listdir(dir_path)
+    dates = []
+    big_order_count_date_code_dict = {}
+    for file_name in files:
+        path_ = f"{dir_path}\\{file_name}"
+        date = file_name[-14:-4]
+        dates.append(date)
+        code_big_order_count_dict = parse_big_buy_order_count(path_, limit_up_price_date_code_dict[date])
+        print(code_big_order_count_dict)
+        big_order_count_date_code_dict[date] = code_big_order_count_dict
+    # Reshape into {code: {date: big order stats}}
+    fdata = {}
+    for date in big_order_count_date_code_dict:
+        for code in big_order_count_date_code_dict[date]:
+            if code not in fdata:
+                fdata[code] = {}
+            fdata[code][date] = big_order_count_date_code_dict[date][code]
+
+    for code in fdata:
+        for date in dates:
+            if date not in fdata[code]:
+                # fill with 0 by default
+                fdata[code][date] = [0, 0]
+    dates.sort(reverse=True)
+    export(fdata, dates, "D:/test.xls")
--
Gitblit v1.8.0
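
Reviewer note: the layout of the log records consumed by parse_big_buy_order_count is not documented in this patch. The minimal, self-contained sketch below shows the record shape the filters appear to assume (code, buy/sell flag, then a tuple whose index 2 is the order amount); all field meanings and the sample line are assumptions inferred from the code, not confirmed by the project.

import ast


def filter_big_buy_orders(lines, min_amount=1000000):
    # Replicates the counting logic of parse_big_buy_order_count without project imports.
    result = {}
    for line in lines:
        payload = line[line.find(" - ") + 3:].strip()
        record = ast.literal_eval(payload)      # safer stand-in for the patch's eval()
        code, direction, order = record[0], record[1], record[2]
        if direction != 0:                      # assumption: 0 marks a buy order
            continue
        if order[2] < min_amount:               # assumption: index 2 is the order amount in yuan
            continue
        stat = result.setdefault(code, [0, 0])
        stat[0] += 1                            # number of qualifying big orders
        stat[1] += order[2]                     # accumulated amount
    return result


if __name__ == '__main__':
    # synthetic log line in the assumed "<prefix> - <python-literal>" format
    sample = "2025-04-22 09:31:00 INFO - ('002719', 0, (123456, 1, 1500000, 93100123, 12.34))"
    print(filter_big_buy_orders([sample]))      # -> {'002719': [1, 1500000]}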
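
Reviewer note: each data cell that export() writes is the string "<big order count>/<average big-order amount in 万 (units of 10,000 yuan)>". A small sketch of that formatting, with made-up sample numbers:

def format_cell(stat):
    count, total = stat
    if count <= 0:
        return ""
    average = total // count                    # integer average amount per order, in yuan
    return f"{count}/{round(average / 10000, 1)}万"


if __name__ == '__main__':
    print(format_cell([3, 4500000]))            # -> "3/150.0万"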