Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
test/test.py
@@ -1,36 +1,126 @@
import decimal
import json
import os
import queue
import time
import l2_data_util
from code_attribute.code_data_util import ZYLTGBUtil
from code_attribute.gpcode_manager import CodesNameManager
from huaxin_client import l1_subscript_codes_manager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import JueJinHttpApi, HistoryKDatasUtils
from utils import tool, init_data_util
import xlwt
from code_attribute import gpcode_manager
from l2.huaxin import l2_huaxin_util
from utils import tool
def test_active_buy():
    current_rank = 2
    TIME_STR_RANGES = ["10:00:00", "10:30:00", "11:00:00", "13:00:00", "13:30:00", "14:00:00", "14:30:00",
                       "15:00:00"]
    TIME_INT_RANGES = [int(x.replace(':', '')) for x in TIME_STR_RANGES]
    MAX_RANKS = [3, 3, 2, 2, 1, 0, 0, 0]
    now_time_str = "09:36:00".replace(':', '')
    for i in range(len(TIME_INT_RANGES)):
        if int(now_time_str) <= TIME_INT_RANGES[i]:
            if MAX_RANKS[i] > current_rank:
                return True
            break
    return False
class Test:
    def print_log(self, arg):
        print(time.time(), f"arg:{arg}")
        time.sleep(2)
if __name__ == "__main__":
    # codes = ["600859", "000333"]
    # for code in codes:
    #     datas = init_data_util.get_volumns_by_code(code)
    #     HistoryKDataManager().save_history_bars(code, datas[0]['bob'].strftime("%Y-%m-%d"), datas, force=True)
    codes = HistoryKDataManager().get_history_bars_codes("2024-07-02")
    print(codes)
    # print(count)
    # print(HistoryKDataManager().get_history_bars(code, "2024-07-02"))
def parse_big_buy_order_count(path_, limit_up_price_dict=None, max_time=None):
    """
    统计某一天的涨停大单数量分布
    @param path_:
    @param limit_up_price_dict:
    @return:
    """
    with open(path_, mode='r', encoding='utf-8') as f:
        lines = f.readlines()
        code_big_order_count_dict = {}
        for line in lines:
            data = line[line.find(" - ") + 3:].strip()
            data = eval(data)
            if data[1] != 0:
                continue
            if data[2][2] < 1000000:
                continue
            if max_time and l2_huaxin_util.convert_time(data[2][3]) > max_time:
                continue
            # if data[0] not in limit_up_price_dict:
            #     continue
            # if abs(data[2][4] - limit_up_price_dict[data[0]]) >= 0.001:
            #     continue
            if data[0] not in code_big_order_count_dict:
                code_big_order_count_dict[data[0]] = [0, 0]
            code_big_order_count_dict[data[0]][0] += 1
            code_big_order_count_dict[data[0]][1] += data[2][2]
        return code_big_order_count_dict
def __load_limit_up_price():
    # 日期下代码的涨停价
    limit_up_price_date_code_dict = {}
    dir_path = r"D:\回测数据\K线数据"
    k_bars_files = os.listdir(dir_path)
    for k_bars_file in k_bars_files:
        code = k_bars_file.split("_")[1][:6]
        with open(f"{dir_path}\\{k_bars_file}", encoding='utf-8', mode='r') as f:
            lines = f.readlines()
            data = lines[0]
            datas = eval(data)
            # {"日期": 涨停价}
            date_limit_price = {
                d["bob"][:10]: gpcode_manager.get_limit_up_price_by_preprice(d["sec_id"], d["pre_close"]) for d in
                datas}
            for date in date_limit_price:
                if date not in limit_up_price_date_code_dict:
                    limit_up_price_date_code_dict[date] = {}
                limit_up_price_date_code_dict[date][code] = round(float(date_limit_price[date]), 2)
    return limit_up_price_date_code_dict
def export(fdatas, dates, file_name):
    wb = xlwt.Workbook(encoding="utf-8")
    ws = wb.add_sheet('sheet1')
    for i in range(len(dates)):
        ws.write(0, i + 1, dates[i])
    index = 0
    for code in fdatas:
        date_count_dict = fdatas[code]
        index += 1
        ws.write(index, 0, code)
        for i in range(len(dates)):
            d = date_count_dict.get(dates[i])
            average_big_money = d[1] // d[0] if d[0] > 0 else 0
            if d[0] > 0:
                ws.write(index, i + 1, f"{d[0]}/{round(average_big_money / 10000, 1)}万")
            else:
                ws.write(index, i + 1, f"")
    wb.save(file_name)
if __name__ == '__main__':
    q = queue.Queue()
    if not  q.empty():
        data = q.get(block=False)
        print(data)
if __name__ == '__main__1':
    # 加载每一天的代码的涨停价
    limit_up_price_date_code_dict = __load_limit_up_price()
    print(limit_up_price_date_code_dict["2025-04-22"]["002719"])
    dir_path = r"D:\回测数据\大单数据"
    files = os.listdir(dir_path)
    dates = []
    big_order_count_date_code_dict = {}
    for file_name in files:
        path_ = f"{dir_path}\\{file_name}"
        date = file_name[-14:-4]
        dates.append(date)
        code_big_order_count_dict = parse_big_buy_order_count(path_, limit_up_price_date_code_dict[date])
        print(code_big_order_count_dict)
        big_order_count_date_code_dict[date] = code_big_order_count_dict
    # 转为:{代码:{"日期":大单数}}
    fdata = {}
    for date in big_order_count_date_code_dict:
        for code in big_order_count_date_code_dict[date]:
            if code not in fdata:
                fdata[code] = {}
            fdata[code][date] = big_order_count_date_code_dict[date][code]
    for code in fdata:
        for date in dates:
            if date not in fdata[code]:
                # 默认填充0
                fdata[code][date] = [0, 0]
    dates.sort(reverse=True)
    export(fdata, dates, "D:/test.xls")