Administrator
2024-03-21 300cd8f128dd74ae7c3602ae3895f4d34312193e
L2数据插入主动卖单
8个文件已修改
181 ■■■■ 已修改文件
l2/l2_data_manager_new.py 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_log.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_manager.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_processor.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/async_log_util.py 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_export.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/data_server.py 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/data_export_util.py 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py
@@ -316,6 +316,8 @@
            # 转换数据格式
            _start_index = 0
            total_datas = local_today_datas.get(code)
            if code not in local_today_datas:
                local_today_datas[code] = []
            if total_datas:
                _start_index = total_datas[-1]["index"] + 1
            datas = l2_huaxin_util.get_format_l2_datas(code, origin_datas,
@@ -341,7 +343,8 @@
                exec_data = order_info[2]
                order_begin_pos = cls.__get_order_begin_pos(
                    code)
                async_log_util.info(logger_debug, f"下单位矫正:真实下单位-{real_order_index} 订单信息-{order_info}  下单信息-{order_begin_pos}")
                async_log_util.info(logger_debug,
                                    f"下单位矫正:真实下单位-{real_order_index} 订单信息-{order_info}  下单信息-{order_begin_pos}")
                if order_begin_pos and order_begin_pos.buy_exec_index == exec_data["index"]:
                    cls.set_real_place_order_index(code, real_order_index, order_begin_pos)
                    async_log_util.info(logger_real_place_order_position,
@@ -382,7 +385,8 @@
                                    cls.__recompute_real_order_index, code, place_order_index, order_info)
                            except:
                                pass
                            async_log_util.info(logger_l2_process, "code:{} 获取到下单真实位置:{}", code, place_order_index)
                            async_log_util.info(logger_l2_process, f"code:{code} 获取到下单真实位置:{place_order_index}")
                except:
                    async_log_util.error(logger_l2_error, f"{code} 处理真实下单位置出错")
            # 第1条数据是否为09:30:00
@@ -391,7 +395,7 @@
                    price_data = global_util.cuurent_prices.get(code)
                    if price_data[1]:
                        # 当前涨停价,设置涨停时间
                        async_log_util.info(logger_l2_process, "开盘涨停:{}", code)
                        async_log_util.info(logger_l2_process, f"开盘涨停:{code}")
                        # 保存涨停时间
                        cls.__LimitUpTimeManager.save_limit_up_time(code, "09:30:00")
@@ -458,11 +462,10 @@
                    # 未挂单,时间相差不大才能挂单
                    # if tool.trade_time_sub(latest_time, "09:32:00") < 0 or l2.l2_data_util.L2DataUtil.is_same_time(now_time_str, latest_time):
                    cls.__process_not_order(code, start_index, end_index, capture_timestamp, is_first_code)
            async_log_util.info(logger_l2_process, "code:{} 处理数据范围: {}-{} 处理时间:{} 线程ID:{}", code,
                                add_datas[0]["index"],
                                add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                l2_log.threadIds.get(code))
            l2_log.info(code, logger_l2_process, "code:{} 处理数据范围: {}-{} 处理时间:{} 线程ID:{}", code,
                        add_datas[0]["index"],
                        add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                        l2_log.threadIds.get(code))
    # 处理未挂单
    @classmethod
@@ -492,7 +495,8 @@
            _start_time = round(t.time() * 1000)
            # S撤单计算,看秒级大单撤单
            try:
                b_need_cancel, b_cancel_msg = cls.__SCancelBigNumComputer.need_cancel_for_down(code, start_index,end_index)
                b_need_cancel, b_cancel_msg = cls.__SCancelBigNumComputer.need_cancel_for_down(code, start_index,
                                                                                               end_index)
                if b_need_cancel:
                    async_log_util.error(logger_debug, f"{code} S后撤单:{b_cancel_msg}")
                    return total_data[end_index], f"S后撤({b_cancel_msg})"
@@ -862,9 +866,9 @@
        if cls.__PauseBuyCodesManager.is_in_cache(code):
            return False, True, f"该代码被暂停交易"
        now_time_int = int(tool.get_now_time_str().replace(":", ""))
        if  now_time_int>= 145700:
        if now_time_int >= 145700:
            return False, True, f"14:57后不能交易"
        if  130100>=now_time_int>= 112900:
        if 130100 >= now_time_int >= 112900:
            return False, True, f"11:29:00-13:01:00不能交易"
        limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -1730,8 +1734,8 @@
                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count += int(total_datas[i]["re"])
                    if (buy_nums >= threshold_num and buy_count >= threshold_count) or buy_nums >= threshold_max_num:
                        async_log_util.info(logger_l2_trade_buy,
                                            f"{code}获取到买入执行点:{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num}/{threshold_max_num} 统计纯买单数:{buy_count} 目标纯买单数:{threshold_count}, 大单数量:{len(max_buy_num_set)}")
                        l2_log.info(code, logger_l2_trade_buy,
                                    f"{code}获取到买入执行点:{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num}/{threshold_max_num} 统计纯买单数:{buy_count} 目标纯买单数:{threshold_count}, 大单数量:{len(max_buy_num_set)}")
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                if _val["num"] >= bigger_num:
                    # 只统计59万以上的金额
@@ -1863,8 +1867,8 @@
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                buy_count += int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    async_log_util.info(logger_l2_trade_buy,
                                        f"{code}获取到买入执行点(快速买入):{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count}")
                    l2_log.info(code, logger_l2_trade_buy,
                                f"{code}获取到买入执行点(快速买入):{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count}")
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 判断买入位置是否在买入信号之前
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i],
l2/l2_log.py
@@ -53,7 +53,7 @@
    # 运行同步服务
    def run_async(self):
        for m in self.async_log_managers:
            threading.Thread(target=m.run_sync, daemon=True).start()
            threading.Thread(target=lambda: m.run_sync(True), daemon=True).start()
codeLogQueueDistributeManager = CodeLogQueueDistributeManager(constant.HUAXIN_L2_MAX_CODES_COUNT)
l2/l2_transaction_data_manager.py
@@ -120,7 +120,9 @@
                    cls.__latest_sell_order_dict[code][4] = (d[3], d[6])
                else:
                    # 封存数据,计算新起点
                    l2_log.info(code, hx_logger_l2_transaction_sell_order, f"{cls.__latest_sell_order_dict[code]}")
                    # 大于50w的卖单才会保存
                    if cls.__latest_sell_order_dict[code][1] * cls.__latest_sell_order_dict[code][2] > 50*10000:
                        l2_log.info(code, hx_logger_l2_transaction_sell_order, f"{cls.__latest_sell_order_dict[code]}")
                    # 大于50w加入卖单
                    info = cls.__latest_sell_order_dict[code]
                    if info[1] * info[2] >= 500000:
l2/l2_transaction_data_processor.py
@@ -67,6 +67,7 @@
                        order_begin_pos = None
                except Exception as e:
                    async_log_util.error(hx_logger_l2_debug, str(e))
            big_sell_order_info = None
            try:
                # 统计卖单
                big_sell_order_info = HuaXinSellOrderStatisticManager.add_transaction_datas(code, datas)
@@ -78,7 +79,7 @@
                    L2TradeDataProcessor.cancel_buy(code, f"S后撤:{cancel_msg}")
            except Exception as e:
                async_log_util.error(logger_debug, f"卖单统计异常:{str(e)}")
                async_log_util.error(logger_debug, f"卖单统计异常:{big_sell_order_info}")
                logger_debug.exception(e)
            # 计算已经成交的大单
log_module/async_log_util.py
@@ -1,7 +1,9 @@
"""
异步日志管理器
"""
import logging
import queue
import threading
import time
from log_module.log import logger_debug, logger_system
@@ -9,10 +11,15 @@
class AsyncLogManager:
    __log_queue = queue.Queue()
    def __init__(self):
        self.__log_queue = queue.Queue()
    def __add_log(self, logger, method, *args):
        self.__log_queue.put_nowait((logger, time.time(), method, args))
    def add_log(self, data):
        self.__log_queue.put_nowait(data)
    def debug(self, logger, *args):
        self.__add_log(logger, "debug", *args)
@@ -30,56 +37,64 @@
        self.__add_log(logger, "exception", *args)
    # 运行同步日志
    def run_sync(self):
    def run_sync(self, add_to_common_log=False):
        print("run_sync", add_to_common_log)
        logger_system.info(f"run_sync 线程ID:{tool.get_thread_id()}")
        while True:
            try:
                val = self.__log_queue.get()
                time_s = val[1]
                cmd = val[2]
                method = getattr(val[0], cmd)
                d = list(val[3])
                d[0] = f"[{tool.to_time_str(int(time_s))}.{str(time_s).split('.')[1][:6]}] " + d[0]
                d = tuple(d)
                method(*d)
            except:
                pass
                if not add_to_common_log:
                    time_s = val[1]
                    cmd = val[2]
                    method = getattr(val[0], cmd)
                    d = list(val[3])
                    d[0] = f"[{tool.to_time_str(int(time_s))}.{str(time_s).split('.')[1][:6]}] " + d[0]
                    d = tuple(d)
                    method(*d)
                else:
                    _common_log.add_log(val)
            except Exception as e:
                logging.exception(e)
l2_data_log = AsyncLogManager()
huaxin_l2_log = AsyncLogManager()
__common_log = AsyncLogManager()
_common_log = AsyncLogManager()
def debug(logger, *args):
    __common_log.debug(logger, *args)
    _common_log.debug(logger, *args)
def info(logger, *args):
    __common_log.info(logger, *args)
    _common_log.info(logger, *args)
def warning(logger, *args):
    __common_log.warning(logger, *args)
    _common_log.warning(logger, *args)
def error(logger, *args):
    __common_log.error(logger, *args)
    _common_log.error(logger, *args)
def exception(logger, *args):
    __common_log.exception(logger, *args)
    _common_log.exception(logger, *args)
# 运行同步日志
def run_sync():
    logger_system.info(f"async_log 线程ID:{tool.get_thread_id()}")
    __common_log.run_sync()
    _common_log.run_sync()
if __name__ == "__main__":
    # info(logger_debug, "*-{}", "test")
    info(logger_debug, "002375")
    asyncLogManager = AsyncLogManager()
    asyncLogManager.info(logger_debug, "测试123")
    threading.Thread(target=lambda: asyncLogManager.run_sync(), daemon=True).start()
    time.sleep(1)
    # info(logger_debug, "002375")
    run_sync()
log_module/log_export.py
@@ -351,7 +351,7 @@
# 加载买入得分记录
def load_trade_recod(code,date=tool.get_now_date_str()):
def load_trade_recod(code, date=tool.get_now_date_str()):
    path = f"{constant.get_path_prefix()}/logs/gp/trade/trade_record.{date}.log"
    fdatas = []
    lines = __load_file_content(path)
@@ -370,7 +370,7 @@
# 加载l2订单成交数据
def load_huaxin_deal_record(code,date = tool.get_now_date_str()):
def load_huaxin_deal_record(code, date=tool.get_now_date_str()):
    path = f"{constant.get_path_prefix()}/logs/huaxin/l2/transaction_desc.{date}.log"
    # 格式:[(订单号,手数,开始成交时间,成交结束时间,下单手数)]
    fdatas = []
@@ -414,6 +414,7 @@
            fdatas.append((time_str, codes))
    return fdatas
# 加载华鑫本地买入订单号
def load_huaxin_local_buy_no():
    path = f"{constant.get_path_prefix()}/logs/huaxin_local/l2/l2_buy_no.{tool.get_now_date_str()}.log"
@@ -431,6 +432,29 @@
                    if code not in fdatas:
                        fdatas[code] = set()
                    fdatas[code].add(buy_no)
    return fdatas
# Load the sell-order records written to the Huaxin L2 transaction log
def load_huaxin_transaction_sell_no(code=None, date=None):
    """
    Parse ``logs/huaxin/l2/transaction_sell_order.<date>.log`` into a dict.

    :param code: optional 6-character code; when given, lines for other codes are skipped
    :param date: date string used in the log file name; defaults to
                 ``tool.get_now_date_str()`` evaluated at call time
    :return: dict mapping each code to the list of records parsed from its lines,
             in file order; empty dict when the log file does not exist
    """
    # Resolve the default at call time; a default computed in the signature is
    # frozen at import and goes stale in a long-running process.
    if date is None:
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/logs/huaxin/l2/transaction_sell_order.{date}.log"
    fdatas = {}
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            # The payload is the segment after the first " - " separator;
            # lines without it (blank lines, continuation lines) are skipped.
            segments = line.split(" - ")
            if len(segments) < 2:
                continue
            data = segments[1].strip()
            # Drop a leading "[...]" prefix if present
            if data.startswith("["):
                data = data[data.find("]") + 1:].strip()
            code_segments = data.split("code=")
            if len(code_segments) < 2:
                continue
            data = code_segments[1]
            # The 6 characters after "code=" are the code, the rest is the record
            code_ = data[:6]
            if code and code != code_:
                continue
            # NOTE(review): eval() executes whatever text is in the log file. This is
            # only safe while the log is written exclusively by this program; consider
            # ast.literal_eval if the records are always plain literals - confirm.
            fdatas.setdefault(code_, []).append(eval(data[6:].strip()))
    return fdatas
@@ -500,7 +524,7 @@
if __name__ == '__main__':
    fdatas = load_kpl_open_limit_up()
    fdatas = load_huaxin_transaction_sell_no("600990")
    print(len(fdatas))
    # print(get_h_cancel_compute_info("603912"))
third_data/data_server.py
@@ -534,8 +534,15 @@
            statistic_list = [(k, statistic[k]) for k in statistic]
            statistic_list.sort(key=lambda x: x[1], reverse=True)
            fresults = []
            limit_up_records = KPLLimitUpDataRecordManager.list_all_cache(tool.get_now_date_str())
            limit_up_count_dict = {}
            if limit_up_records:
                for d in limit_up_records:
                    limit_up_count_dict[d[3]] = d[12]
            for x in statistic_list:
                fresults.append((x[0], gpcode_manager.get_code_name(x[0]), x[1]))
                fresults.append((x[0], gpcode_manager.get_code_name(x[0]), x[1],limit_up_count_dict.get(x[0])))
            fresults = fresults[:30]
            response_data = json.dumps({"code": 0, "data": fresults})
        elif url.path == "/get_h_cancel_data":
utils/data_export_util.py
@@ -15,6 +15,7 @@
from log_module import log, log_export
from l2 import l2_data_source_util
from trade import deal_big_money_manager
from utils import tool
def export_l2_excel(code, date=None):
@@ -25,6 +26,8 @@
# 获取L2的数据
def get_l2_datas(code, today_datas=None, date=None):
    if date is None:
        date = tool.get_now_date_str()
    datas = today_datas
    if datas is None:
        local_today_datas = log_export.load_l2_from_log(date)
@@ -38,11 +41,15 @@
    deal_list_dict = {}
    for d in deal_list:
        deal_list_dict[str(d[0])] = d
    fdatas = export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict)
    sell_no_dict = log_export.load_huaxin_transaction_sell_no(code=code, date=date)
    sell_nos = sell_no_dict.get(code)
    fdatas = export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict, sell_nos)
    return fdatas
def export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict):
def export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict, sell_nos):
    def find_process_index(index):
        for i in range(0, len(process_indexs)):
            if process_indexs[i][0] <= index <= process_indexs[i][1]:
@@ -180,6 +187,39 @@
        format_data.append(cancel_order_info)
        format_data.append(data["val"].get("orderNo"))
        fdatas.append((style_int, trade_info, format_data))
    # 将订单号索引
    order_no_index_map = {}
    for i in range(len(fdatas)):
        d = fdatas[i][2]
        if d[6].find('撤') >= 0:
            continue
        order_no_index_map[int(d[10])] = i
    order_no_indexes = [(k, order_no_index_map[k]) for k in order_no_index_map]
    order_no_indexes.sort(key=lambda x: x[0])
    if sell_nos:
        for sell_info in sell_nos:
            if sell_info[1] * sell_info[2] < 50 * 10000:
                continue
            for i in range(len(order_no_indexes) - 1):
                if order_no_indexes[i][0] < sell_info[0] < order_no_indexes[i + 1][0]:
                    item = []
                    item.append(order_no_indexes[i + 1][1])
                    item.append(l2_huaxin_util.convert_time(sell_info[3][0], with_ms=True))
                    item.append("")
                    item.append(
                        "{}万".format(round(sell_info[1] * sell_info[2] / 10000, 1)))
                    item.append(sell_info[2])
                    item.append(sell_info[1] // 100)
                    item.append("主动卖")
                    item.append(1)
                    item.append(l2_huaxin_util.convert_time(sell_info[4][0], with_ms=True))
                    item.append(None)
                    item.append(sell_info[0])
                    fdatas.insert(order_no_indexes[i + 1][1], (0,None,item))
                    break
    return fdatas
@@ -283,6 +323,6 @@
if __name__ == "__main__":
    try:
        get_l2_datas("600822", date="2024-03-12")
        get_l2_datas("600990")
    except Exception as e:
        logging.exception(e)