Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
test/test.py
@@ -1,37 +1,126 @@
import multiprocessing
import json
import os
import queue
import time
from l2.cancel_buy_strategy import LCancelRateManager
import xlwt
# Module-level FIFO queue; add_data() appends {"msg": ...} entries to it.
__queue = queue.Queue()
from code_attribute import gpcode_manager
from l2.huaxin import l2_huaxin_util
from utils import tool
def add_data(msg):
    """
    Pause for one second, then enqueue ``msg`` on the module-level queue.

    The message is wrapped as ``{"msg": msg}``. Only the enqueue itself is
    timed; a notice is printed when it took longer than 2 ms.
    @param msg: payload to enqueue
    @return: None
    """
    time.sleep(1)
    began_at = time.time()
    __queue.put({"msg": msg})
    elapsed = time.time() - began_at
    if elapsed > 0.002:
        print("加入日志耗时")
class Test:
    """Tiny helper whose single method prints a timestamped value and then blocks."""

    def print_log(self, arg):
        """
        Print the current epoch time together with ``arg``, then sleep 2 seconds.
        @param arg: value to display
        @return: None
        """
        now = time.time()
        print(now, f"arg:{arg}")
        time.sleep(2)
def test_process_1(pipe):
    """
    Producer side of a pipe demo; never returns.

    Repeatedly sends the UTF-8 encoded messages ``hello world:0`` through
    ``hello world:9`` via ``pipe.send_bytes``, sleeping one second after each.
    @param pipe: object exposing ``send_bytes(bytes)``
    """
    while True:
        for sequence_no in range(10):
            payload = f"hello world:{sequence_no}".encode("utf-8")
            pipe.send_bytes(payload)
            time.sleep(1)
def parse_big_buy_order_count(path_, limit_up_price_dict=None, max_time=None):
    """
    Count the big orders recorded in one day's log file, grouped by code.

    Every useful line is expected to look like ``<prefix> - <python literal>``.
    The literal is indexed as follows (field meanings are inferred from how
    they are used here; confirm against the log producer):
      * ``data[0]``    - key the statistics are grouped by (presumably the stock code)
      * ``data[1]``    - only records where this equals ``0`` are counted
      * ``data[2][2]`` - order amount; records below 1,000,000 are ignored
      * ``data[2][3]`` - time value handed to ``l2_huaxin_util.convert_time``

    Blank lines and lines without the ``" - "`` separator are skipped.

    @param path_: path of the UTF-8 encoded log file
    @param limit_up_price_dict: currently unused; the limit-up price filter
        that consumed it is disabled
    @param max_time: optional upper bound; records whose converted time is
        greater than this value are ignored. Falsy values disable the check.
    @return: ``{code: [order_count, total_amount]}``
    """
    code_big_order_count_dict = {}
    with open(path_, mode='r', encoding='utf-8') as f:
        # Iterate the file lazily instead of loading every line up front.
        for line in f:
            _, separator, payload = line.partition(" - ")
            payload = payload.strip()
            if not separator or not payload:
                # Nothing parseable on this line (blank or malformed).
                continue
            # SECURITY NOTE(review): eval() executes whatever the log line
            # contains. Only feed this function trusted, locally produced
            # logs; consider ast.literal_eval if the payload is always a
            # plain literal.
            data = eval(payload)
            if data[1] != 0:
                continue
            detail = data[2]
            if detail[2] < 1000000:
                continue
            if max_time and l2_huaxin_util.convert_time(detail[3]) > max_time:
                continue
            # Disabled filter: keep only orders placed at the limit-up price,
            # i.e. abs(detail[4] - limit_up_price_dict[data[0]]) < 0.001.
            stats = code_big_order_count_dict.setdefault(data[0], [0, 0])
            stats[0] += 1
            stats[1] += detail[2]
    return code_big_order_count_dict
def test_process_2(pipe):
    """
    Consumer side of a pipe demo; never returns.

    Blocks on ``pipe.recv_bytes()`` and prints every non-empty payload.
    @param pipe: object exposing ``recv_bytes()``
    """
    while True:
        received = pipe.recv_bytes()
        if not received:
            continue
        print("接受到内容:", received)
def __load_limit_up_price(dir_path=r"D:\回测数据\K线数据"):
    """
    Build a ``{date: {code: limit_up_price}}`` table from K-bar files.

    Every file in ``dir_path`` is expected to be named ``<prefix>_<code>...``
    (the 6 characters after the first underscore are used as the code) and to
    hold, on its first line, a Python literal list of dicts with at least the
    keys ``"bob"``, ``"sec_id"`` and ``"pre_close"``. The first 10 characters
    of ``"bob"`` are used as the date key. Files whose first line is blank
    are skipped.

    @param dir_path: directory containing the K-bar files; defaults to the
        original hard-coded location
    @return: ``{date: {code: limit-up price rounded to 2 decimals}}``
    """
    # Limit-up price of every code, grouped by date.
    limit_up_price_date_code_dict = {}
    for k_bars_file in os.listdir(dir_path):
        code = k_bars_file.split("_")[1][:6]
        # os.path.join keeps the path valid regardless of the OS separator.
        with open(os.path.join(dir_path, k_bars_file), encoding='utf-8', mode='r') as f:
            # Only the first line carries the data; no need to read the rest.
            first_line = f.readline()
        if not first_line.strip():
            continue
        # SECURITY NOTE(review): eval() executes the file content. Only use
        # with trusted, locally generated data files.
        datas = eval(first_line)
        for d in datas:
            date = d["bob"][:10]
            limit_up_price = gpcode_manager.get_limit_up_price_by_preprice(d["sec_id"], d["pre_close"])
            # A later bar for the same date overrides an earlier one.
            limit_up_price_date_code_dict.setdefault(date, {})[code] = round(float(limit_up_price), 2)
    return limit_up_price_date_code_dict
if __name__ == "__main__":
    # Manual smoke run: configure LCancelRateManager for one code, then print
    # the cancel rate it reports for that code.
    code = "000333"
    LCancelRateManager.set_block_limit_up_count(code, 100)
    LCancelRateManager.set_big_num_deal_rate(code, 10)
    cancel_rate = LCancelRateManager.get_cancel_rate(code)
    print(cancel_rate)
def export(fdatas, dates, file_name):
    """
    Write per-code, per-date big-order statistics to an ``.xls`` workbook.

    Sheet layout (single sheet named ``sheet1``):
      * row 0, columns 1..n - the entries of ``dates`` in the given order
      * rows 1..m, column 0 - one key of ``fdatas`` per row
      * remaining cells     - ``"<count>/<average>万"`` where the average is
        ``total // count`` divided by 10000 and rounded to one decimal, or an
        empty string when the count is not positive

    @param fdatas: ``{code: {date: [count, total_amount]}}``
    @param dates: sequence of date keys defining the column order
    @param file_name: destination passed to ``Workbook.save``
    @return: None
    """
    wb = xlwt.Workbook(encoding="utf-8")
    ws = wb.add_sheet('sheet1')
    for column, date in enumerate(dates, start=1):
        ws.write(0, column, date)
    for row, (code, date_count_dict) in enumerate(fdatas.items(), start=1):
        ws.write(row, 0, code)
        for column, date in enumerate(dates, start=1):
            # A code may have no entry for some date; treat that as "no
            # orders" instead of failing when indexing None.
            d = date_count_dict.get(date) or (0, 0)
            if d[0] > 0:
                average_big_money = d[1] // d[0]
                ws.write(row, column, f"{d[0]}/{round(average_big_money / 10000, 1)}万")
            else:
                ws.write(row, column, "")
    wb.save(file_name)
if __name__ == '__main__':
    # Probe a fresh queue: fetch and print one item without blocking, but
    # only when the queue reports that it holds something.
    q = queue.Queue()
    if q.empty():
        pass
    else:
        data = q.get_nowait()
        print(data)
if __name__ == '__main__1':
    # NOTE: the guard compares against '__main__1', so this batch export does
    # not run when the file is executed as a script.

    # Limit-up price of every code for every day.
    limit_up_price_date_code_dict = __load_limit_up_price()
    print(limit_up_price_date_code_dict["2025-04-22"]["002719"])

    dir_path = r"D:\回测数据\大单数据"
    files = os.listdir(dir_path)
    dates = []
    big_order_count_date_code_dict = {}
    for file_name in files:
        # The date is the 10 characters right before the 4-character suffix.
        date = file_name[-14:-4]
        dates.append(date)
        path_ = f"{dir_path}\\{file_name}"
        code_big_order_count_dict = parse_big_buy_order_count(path_, limit_up_price_date_code_dict[date])
        print(code_big_order_count_dict)
        big_order_count_date_code_dict[date] = code_big_order_count_dict

    # Pivot into {code: {date: [count, total]}}.
    fdata = {}
    for date, per_code_stats in big_order_count_date_code_dict.items():
        for code, stats in per_code_stats.items():
            fdata.setdefault(code, {})[date] = stats

    # Fill the days on which a code has no record with zeros.
    for code in fdata:
        for date in dates:
            fdata[code].setdefault(date, [0, 0])

    dates.sort(reverse=True)
    export(fdata, dates, "D:/test.xls")