Administrator
2024-03-28 af7d9b55cad79aa7ade0b02802a26ebeb742c297
持仓L2订阅
4个文件已修改
8个文件已添加
2069 ■■■■■ 已修改文件
constant.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py 397 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 180 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_transform_protocol.py 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/l2_huaxin_util.py 116 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_source_util.py 219 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 559 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_manager.py 191 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_export.py 288 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin_trade_data_update.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_strategy.py 77 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -46,6 +46,7 @@
    "passwd": "Yeshi2016@"
}
MAX_L2_CHANNEL_COUNT = 10
# 获取根路径
def get_path_prefix():
huaxin_client/l2_client.py
New file
@@ -0,0 +1,397 @@
# -*- coding: utf-8 -*-
import json
import logging
import multiprocessing
import os
import queue
import threading
import time
import concurrent.futures
from typing import List
from huaxin_client import command_manager, l2_data_transform_protocol
from huaxin_client import constant
from huaxin_client import l2_data_manager
import lev2mdapi
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager
from huaxin_client.command_manager import L2ActionCallback
from huaxin_client.l2_data_manager import L2DataUploadManager
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \
    logger_local_huaxin_g_cancel, logger_l2_codes_subscript
from utils import tool
###B类###
Front_Address = "tcp://10.0.1.101:6900"
Multicast_Address = "udp://224.224.2.19:7889"
Multicast_Address2 = "udp://224.224.224.234:7890"
Local_Interface_Address = constant.LOCAL_IP
g_SubMarketData = False
g_SubTransaction = False
g_SubOrderDetail = False
g_SubXTSTick = False
g_SubXTSMarketData = False
g_SubNGTSTick = False
g_SubBondMarketData = False
g_SubBondTransaction = False
g_SubBondOrderDetail = False
SH_Securities = [b"603000", b"600225", b"600469", b"600616", b"600059", b"002849", b"605188", b"603630", b"600105",
                 b"603773", b"603915", b"603569", b"603322", b"603798", b"605198", b"603079", b"600415", b"600601"]
SH_XTS_Securities = [b"018003", b"113565"]
SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725",
                 b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
set_codes_data_queue = queue.Queue()
market_code_dict = {}
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    latest_codes_set = set()
    special_code_volume_for_order_dict = {}
    # 已经订阅的代码
    subscripted_codes = set()
    # 代码的上次成交的订单唯一索引
    __last_transaction_keys_dict = {}
    # 买入的大单订单号
    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager):
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.l2_data_upload_manager = l2_data_upload_manager
    def __split_codes(self, codes):
        szse_codes = []
        sse_codes = []
        for code in codes:
            if code.find("00") == 0:
                szse_codes.append(code.encode())
            elif code.find("60") == 0:
                sse_codes.append(code.encode())
        return sse_codes, szse_codes
    # 新增订阅
    # 取消订阅
    def __unsubscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            # 取消订阅逐笔委托
            self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            # 取消订阅逐笔成交
            self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
    def __subscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        if sh:
            # 订阅逐笔委托
            result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh:{result}")
            # 订阅逐笔成交
            result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sz:{result}")
            result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")
    def __process_codes_data(self, codes, from_cache=False, delay=0.0):
        codes = set(codes)
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        if delay > 0:
            time.sleep(delay)
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        try:
            for c in del_codes:
                self.l2_data_upload_manager.release_distributed_upload_queue(c)
                l2_data_manager.del_target_code(c)
            for c in codes:
                self.l2_data_upload_manager.distribute_upload_queue(c)
                l2_data_manager.add_target_code(c)
        except Exception as e:
            logger_system.error(f"L2代码分配上传队列出错:{str(e)}")
            logger_system.exception(e)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        if add_codes:
            logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}")
        logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes))
        # 设置最近的代码列表
        self.latest_codes_set = codes
    # 订阅代码,[代码,...]
    def set_codes_data(self, codes):
        try:
            self.__process_codes_data(codes)
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
        finally:
            # 保存一份最新的数据
            self.__set_latest_datas(codes)
    @classmethod
    def __set_latest_datas(cls, codes_data):
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
            f.write(data_str)
    @classmethod
    def __get_latest_datas(cls):
        if os.path.exists(constant.L2_CODES_INFO_PATH):
            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
                str_ = f.readline()
                data_json = json.loads(str_)
                if data_json[0] == tool.get_now_date_str():
                    return data_json[1]
        return []
    def OnFrontConnected(self):
        print("OnFrontConnected")
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # 请求登录
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)
    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            logger_system.info(f"L2行情登录成功")
            # 初始设置值
            threading.Thread(
                target=lambda: self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=6.0),
                daemon=True).start()
    def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubOrderDetail", pRspInfo)
        # try:
        print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
              pRspInfo["ErrorMsg"])
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
    def OnRspUnSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspUnSubOrderDetail", bIsLast)
        try:
            code = pSpecificSecurity['SecurityID']
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
                l2_data_manager.add_subscript_codes(self.subscripted_codes)
        except Exception as e:
            logging.exception(e)
    def OnRtnTransaction(self, pTransaction):
        # 输出逐笔成交数据
        if pTransaction['ExecType'] == b"2":
            # 撤单
            item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'],
                    "Volume": pTransaction['TradeVolume'],
                    "OrderType": "2",
                    "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'],
                    "OrderStatus": "D"}
            buyNo = pTransaction['BuyNo']
            sellNo = pTransaction['SellNo']
            if buyNo > 0:
                # 买
                item["OrderNO"] = buyNo
                item["Side"] = "1"
            elif sellNo > 0:
                # 卖
                item["OrderNO"] = sellNo
                item["Side"] = "2"
            self.l2_data_upload_manager.add_l2_order_detail(item, 0, True)
        else:
            item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
                    "TradeVolume": pTransaction['TradeVolume'],
                    "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                    "SellNo": pTransaction['SellNo'],
                    "ExecType": pTransaction['ExecType'].decode()}
            self.l2_data_upload_manager.add_transaction_detail(item)
    def OnRtnOrderDetail(self, pOrderDetail):
        # 输出逐笔委托数据
        # 上证OrderStatus=b"D"表示撤单
        item = {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'],
                "Volume": pOrderDetail['Volume'],
                "Side": pOrderDetail['Side'].decode(), "OrderType": pOrderDetail['OrderType'].decode(),
                "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
                "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
                "OrderStatus": pOrderDetail['OrderStatus'].decode()}
        self.l2_data_upload_manager.add_l2_order_detail(item, 0)
class MyL2ActionCallback(L2ActionCallback):
    def OnSetL2Position(self, codes):
        huaxin_l2_log.info(logger_l2_codes_subscript, "华鑫L2代码处理队列获取到数据:数量-{}", len(codes))
        try:
            spi.set_codes_data(codes)
        except Exception as e:
            logging.exception(e)
def __init_l2(l2_data_upload_manager):
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
    # case 2: 组播方式
    g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST
    # case 1缓存模式
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True)
    # case 2非缓存模式
    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    global spi
    spi = Lev2MdSpi(api, l2_data_upload_manager)
    api.RegisterSpi(spi)
    # -------------------正式模式-------------------------------------
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
        api.RegisterFront(Front_Address)
    else:
        # case 1 从一个组播地址收取行情
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
        # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "")
    # case 2:注册多个组播地址同时收行情
    # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "");
    # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "");
    # case 3:efvi模式收行情
    # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "", "enp101s0f0",4096, True);
    # case 1 不绑核运行
    api.Init()
__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)
def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue):
    logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            value = queue_trade_w_l2_r.get()
            if value:
                if type(value) == bytes:
                    value = value.decode("utf-8")
                data = json.loads(value)
                _type = data["type"]
                if _type == "l2_cmd":
                    __start_time = time.time()
                    # 线程池
                    __l2_cmd_thread_pool.submit(
                        lambda: l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data))
                    use_time = time.time() - __start_time
                    if use_time > 0.005:
                        huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"l2_cmd耗时:{use_time}s")
        except Exception as e:
            logging.exception(e)
pipe_strategy = None
def test_add_codes(queue_r):
    time.sleep(10)
    # if value:
    #     if type(value) == bytes:
    #         value = value.decode("utf-8")
    #     data = json.loads(value)
    #     _type = data["type"]
    #     if _type == "listen_volume":
    #         volume = data["data"]["volume"]
    #         code = data["data"]["code"]
    #         spi.set_code_special_watch_volume(code, volume)
    #     elif _type == "l2_cmd":
    #         l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
    time.sleep(2)
    demo_datas = ["603002",
                  "002654",
                  "603701",
                  "002908"]
    queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": demo_datas}))
    time.sleep(10)
    while True:
        try:
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
            # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
            time.sleep(0.1)
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
        except Exception as e:
            logging.exception(e)
        finally:
            time.sleep(10)
def run(queue_r: multiprocessing.Queue, data_callbacks: list) -> None:
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        log.close_print()
        if queue_r is not None:
            t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True)
            t1.start()
        # 初始化
        data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
        l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)
        __init_l2(l2_data_upload_manager)
        # TODO 测试
        # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start()
        global l2CommandManager
        l2CommandManager = command_manager.L2CommandManager()
        l2CommandManager.init(MyL2ActionCallback())
        logger_system.info("L2订阅服务启动成功")
    except Exception as e:
        logger_system.exception(e)
    while True:
        time.sleep(2)
if __name__ == "__main__":
    input()
huaxin_client/l2_data_manager.py
New file
@@ -0,0 +1,180 @@
# -*- coding: utf-8 -*-
import logging
import marshal
import queue
import threading
import time
# 活动时间
from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_error
import collections
order_detail_upload_active_time_dict = {}
transaction_upload_active_time_dict = {}
# 临时数据
tmep_order_detail_queue_dict = {}
tmep_transaction_queue_dict = {}
target_codes = set()
target_codes_add_time = {}
common_queue = queue.Queue()
# L2上传数据管理器
class L2DataUploadManager:
    def __init__(self, data_callback_distribute_manager: CodeDataCallbackDistributeManager):
        self.data_callback_distribute_manager = data_callback_distribute_manager
        # 代码分配的对象
        self.temp_order_queue_dict = {}
        self.temp_transaction_queue_dict = {}
        self.temp_log_queue_dict = {}
        self.filter_order_condition_dict = {}
        self.upload_l2_data_task_dict = {}
        self.l2_order_codes = set()
        self.l2_transaction_codes = set()
    # 过滤订单
    def __filter_order(self, item):
        if item[1] * item[2] < 500000:
            return None
        return item
        # 过滤订单
    def __filter_transaction(self, item):
        return item
    # 添加委托详情
    def add_l2_order_detail(self, data, start_time=0, istransaction=False):
        code = data["SecurityID"]
        q: collections.deque = self.temp_order_queue_dict.get(code)
        q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
                  data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
    # 添加逐笔成交
    def add_transaction_detail(self, data):
        code = data["SecurityID"]
        q: collections.deque = self.temp_transaction_queue_dict.get(code)
        q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                  data['SellNo'], data['ExecType']))
    # 分配上传队列
    def distribute_upload_queue(self, code):
        if not self.data_callback_distribute_manager.get_distributed_callback(code):
            self.data_callback_distribute_manager.distribute_callback(code)
        if code not in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code] = collections.deque()
        if code not in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code] = collections.deque()
        if code not in self.temp_log_queue_dict:
            self.temp_log_queue_dict[code] = queue.Queue()
        if code not in self.upload_l2_data_task_dict:
            t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
            t1.start()
            t2 = threading.Thread(target=lambda: self.__run_upload_transaction_task(code), daemon=True)
            t2.start()
            self.upload_l2_data_task_dict[code] = (t1, t2)
        # 释放已经分配的队列
    def release_distributed_upload_queue(self, code):
        self.data_callback_distribute_manager.release_distribute_callback(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
            self.temp_order_queue_dict.pop(code)
        if code in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code].clear()
            self.temp_transaction_queue_dict.pop(code)
        if code in self.temp_log_queue_dict:
            self.temp_log_queue_dict.pop(code)
        if code in self.upload_l2_data_task_dict:
            self.upload_l2_data_task_dict.pop(code)
    def __upload_l2_data(self, code, _queue, datas):
        _queue.put_nowait(marshal.dumps([code, datas, time.time()]))
    # 处理订单数据并上传
    def __run_upload_order_task(self, code):
        q: collections.deque = self.temp_order_queue_dict.get(code)
        temp_list = []
        while True:
            try:
                while len(q) > 0:
                    data = q.popleft()
                    # 前置数据处理,过滤掉无用的数据
                    data = self.__filter_order(data)
                    if data:
                        temp_list.append(data)
                if temp_list:
                    # 上传数据
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    # self.__upload_l2_order_data(code, temp_list)
                    __start_time = time.time()
                    last_data = temp_list[-1]
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list,
                                                                                                   time.time())
                    use_time = time.time() - __start_time
                    if use_time > 0.01:
                        # 记录10ms以上的数据
                        huaxin_l2_log.info(logger_local_huaxin_l2_error, f"耗时:{use_time}s  结束数据:{last_data}")
                    temp_list = []
                else:
                    if code not in self.temp_order_queue_dict:
                        self.l2_order_codes.discard(code)
                        break
                    self.l2_order_codes.add(code)
                    time.sleep(0.001)
            except Exception as e:
                logging.exception(e)
            finally:
                pass
    # 处理成交数据并上传
    def __run_upload_transaction_task(self, code):
        q: collections.deque = self.temp_transaction_queue_dict.get(code)
        temp_list = []
        while True:
            try:
                while len(q) > 0:
                    data = q.popleft()
                    data = self.__filter_transaction(data)
                    if data:
                        temp_list.append(data)
                if temp_list:
                    # 上传数据
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code,
                                                                                                         temp_list)
                    temp_list = []
                else:
                    if code not in self.temp_transaction_queue_dict:
                        self.l2_transaction_codes.discard(code)
                        break
                    self.l2_transaction_codes.add(code)
                    time.sleep(0.001)
            except:
                pass
            finally:
                pass
def add_target_code(code):
    target_codes.add(code)
    # 记录代码加入时间
    target_codes_add_time[code] = time.time()
def del_target_code(code):
    target_codes.discard(code)
    if code in target_codes_add_time:
        target_codes_add_time.pop(code)
def add_subscript_codes(codes):
    # 加入上传队列
    common_queue.put(('', "l2_subscript_codes", list(codes)))
huaxin_client/l2_data_transform_protocol.py
New file
@@ -0,0 +1,18 @@
"""
L2数据传输协议
"""
class L2DataCallBack:
    # L2委托明细
    def OnL2Order(self, code, datas, timestamp):
        pass
    def OnL2Transaction(self, code, datas):
        pass
    def OnMarketData(self, code, datas):
        pass
    def OnTradingOrderCancel(self, code, buy_no):
        pass
l2/huaxin/l2_huaxin_util.py
New file
@@ -0,0 +1,116 @@
"""
华鑫LV2处理工具类
"""
# 处理逐笔委托
# item逐笔委托
# (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'],
# data['OrderTime'],data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'])
def convert_time(time_str, with_ms=False):
    time_str = str(time_str)
    if time_str.startswith("9"):
        time_str = f"0{time_str}"
    ms = "{:0<3}".format(time_str[6:])
    time_ = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}"
    if with_ms:
        return f"{time_}.{ms}"
    return time_
def __convert_order(item, limit_up_price):
    time_str = f"{item[5]}"
    if time_str.startswith("9"):
        time_str = f"0{time_str}"
    ms = "{:0<3}".format(time_str[6:])
    time_ = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}"
    price = item[1]
    if price <= 0:
        # 深证的买撤无价格数据,需要去查找价格数据,暂时设置为涨停价
        price = limit_up_price
    limitPrice = 1 if abs(limit_up_price - price) < 0.001 else 0
    operateType = 0
    if item[9] == 'D':
        if item[3] == '1':
            # 买撤
            operateType = 1
        else:
            # 卖撤
            operateType = 3
    else:
        if item[3] == '1':
            # 买
            operateType = 0
        else:
            # 卖
            operateType = 2
    return {"time": time_, "tms": ms, "price": price, "num": item[2] // 100, "limitPrice": limitPrice,
            "operateType": operateType, "cancelTime": 0, "cancelTimeUnit": 0, "orderNo": item[8],
            "mainSeq": item[6], "subSeq": item[7]}
# 处理l2数据
# filter_not_limit_up : 过滤掉非涨停数据
def __format_l2_data(origin_datas, code, limit_up_price, filter_not_limit_up=True):
    datas = []
    dataIndexs = {}
    same_time_num = {}
    for i in range(0, len(origin_datas)):
        item = origin_datas[i]
        # 解析数据
        time = item["time"]
        if time in same_time_num:
            same_time_num[time] = same_time_num[time] + 1
        else:
            same_time_num[time] = 1
        price = item["price"]
        num = item["num"]
        limitPrice = item["limitPrice"]
        # 涨停价
        if limit_up_price is not None:
            if abs(price - limit_up_price) < 0.001:
                limitPrice = 1
            else:
                limitPrice = 0
            item["limitPrice"] = "{}".format(limitPrice)
        operateType = item["operateType"]
        # 不需要非涨停买与买撤
        if filter_not_limit_up and int(item["limitPrice"]) != 1 and (
                int(operateType) == 0 or int(operateType) == 1) and num != 1:
            continue
        key = "{}-{}-{}".format(code, item["mainSeq"], item["subSeq"])
        if key in dataIndexs:
            # 数据重复次数+1
            datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1
        else:
            # 数据重复次数默认为1
            datas.append({"key": key, "val": item, "re": 1})
            dataIndexs.setdefault(key, len(datas) - 1)
    return datas
def get_format_l2_datas(code, origin_datas, limit_up_price, start_index):
    # 先转变数据格式
    datas = [__convert_order(x, float(limit_up_price)) for x in origin_datas]
    # 在9:25之前不过滤非涨停金额
    filter_not_limit_up = True
    if int(datas[0]["time"][:5].replace(":", "")) <= 925:
        filter_not_limit_up = False
    fdatas = __format_l2_data(datas, code, float(limit_up_price), filter_not_limit_up=filter_not_limit_up)
    for i in range(0, len(fdatas)):
        fdatas[i]["index"] = start_index + i
    return fdatas
if __name__ == "__main__":
    ds = ["('605167', 10.08, 68500, '1', '0', 9303108, 2, 439438, 436472, 'D', 1695864632451)",
          "('603439', 17.97, 27800, '1', '0', 9304966, 6, 435127, 407524, 'D', 1695864649883)",
          "('002369', 0.0, 100800, '1', '2', 93051880, 2011, 1431910, 1160638, 'D', 1695864651875)"
          ]
    for d in ds:
        d = eval(d)
        print(__convert_order(d, 15.55))
l2/l2_data_source_util.py
New file
@@ -0,0 +1,219 @@
"""
L2数据溯源
"""
import constant
from log_module.log import logger_l2_error
from utils import tool
class L2DataSourceUtils(object):
    __cancel_and_buy_map = {}
    __buy_and_cancel_map = {}
    @classmethod
    def __save_map(cls, code, buy_index, cancel_index):
        if cls.__cancel_and_buy_map.get(code) is None:
            cls.__cancel_and_buy_map[code] = {}
        cls.__cancel_and_buy_map[code][cancel_index] = buy_index
        if cls.__buy_and_cancel_map.get(code) is None:
            cls.__buy_and_cancel_map[code] = {}
        cls.__buy_and_cancel_map[code][buy_index] = cancel_index
    @classmethod
    def __get_cancel_index_cache(cls, code, buy_index):
        if code not in cls.__buy_and_cancel_map:
            return None
        return cls.__buy_and_cancel_map[code].get(buy_index)
    @classmethod
    def __get_buy_index_cache(cls, code, cancel_index):
        if code not in cls.__cancel_and_buy_map:
            return None
        return cls.__cancel_and_buy_map[code].get(cancel_index)
    @classmethod
    def __compare_time(cls, time1, time2):
        result = int(time1.replace(":", "", 2)) - int(time2.replace(":", "", 2))
        return result
    # 计算时间的区间
    @classmethod
    def __compute_time_space_as_second(cls, cancel_time, cancel_time_unit):
        __time = int(cancel_time)
        if int(cancel_time) == 0:
            return 0, 0
        unit = int(cancel_time_unit)
        if unit == 0:
            # 秒
            return __time, (__time + 1)
        elif unit == 1:
            # 分钟
            return __time * 60, (__time + 1) * 60
        elif unit == 2:
            # 小时
            return __time * 3600, (__time + 1) * 3600
    # 华鑫渠道的L2,根据买撤数据查找买入数据
    @classmethod
    def __get_buy_index_with_cancel_data_by_huaxin_l2(cls, code, cancel_data, local_today_num_operate_map):
        buy_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(cancel_data["val"]["num"], "0", cancel_data["val"]["price"]))
        if buy_datas is None:
            # 无数据
            return None
        # orderNo
        for bd in buy_datas:
            # 根据订单号做匹配
            if bd["val"]["orderNo"] == cancel_data["val"]["orderNo"]:
                return bd["index"]
        return None
    # 同花顺渠道的L2,根据买撤数据查找买入数据
    @classmethod
    def __get_buy_index_with_cancel_data_by_ths_l2(cls, code, cancel_data, local_today_num_operate_map):
        buy_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(cancel_data["val"]["num"], "0", cancel_data["val"]["price"]))
        if buy_datas is None:
            # 无数据
            return None
        min_space, max_space = cls.__compute_time_space_as_second(cancel_data["val"]["cancelTime"],
                                                                  cancel_data["val"]["cancelTimeUnit"])
        max_time = tool.trade_time_add_second(cancel_data["val"]["time"], 0 - min_space)
        min_time = tool.trade_time_add_second(cancel_data["val"]["time"], 0 - max_space)
        # 匹配到的index
        suit_indexes = []
        for i in range(0, len(buy_datas)):
            data = buy_datas[i]
            if int(data["val"]["operateType"]) != 0:
                continue
            if int(data["val"]["num"]) != int(cancel_data["val"]["num"]):
                continue
                # 如果能找到对应的买撤就需要返回
            cancel_index = cls.__get_cancel_index_cache(code, data["index"])
            if cancel_index is not None and cancel_index != cancel_data["index"]:
                continue
            if min_space == 0 and max_space == 0:
                if cls.__compare_time(data["val"]["time"], min_time) == 0:
                    suit_indexes.append(data["index"])
            elif cls.__compare_time(data["val"]["time"], min_time) > 0 and cls.__compare_time(data["val"]["time"],
                                                                                              max_time) <= 0:
                suit_indexes.append(data["index"])
        if len(suit_indexes) >= 2:
            # 多个匹配项,优先溯源离取消位置最近的数据
            suit_indexes.sort(key=lambda t: cancel_data["index"] - t)
        if len(suit_indexes) >= 1:
            cls.__save_map(code, suit_indexes[0], cancel_data["index"])
            return suit_indexes[0]
        return None
    @classmethod
    def __get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map):
        buy_index = cls.__get_buy_index_cache(code, cancel_data["index"])
        if buy_index is not None:
            return buy_index
        if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN:
            return cls.__get_buy_index_with_cancel_data_by_huaxin_l2(code, cancel_data, local_today_num_operate_map)
        else:
            return cls.__get_buy_index_with_cancel_data_by_ths_l2(code, cancel_data, local_today_num_operate_map)
    # 根据买撤数据(与今日总的数据)计算买入数据
    @classmethod
    def get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map):
        key = "{}-{}-{}".format(cancel_data["val"]["num"], "1", cancel_data["val"]["price"])
        cancel_datas = local_today_num_operate_map.get(key)
        if cancel_datas:
            try:
                cancel_datas.sort(key=lambda t: t["index"])
            except Exception as e:
                print("测试")
            for item in cancel_datas:
                # 提前做计算
                cls.__get_buy_index_with_cancel_data(code, item, local_today_num_operate_map)
        return cls.__get_buy_index_with_cancel_data(code, cancel_data, local_today_num_operate_map)
    # 根据买撤数据计算买入数据(华鑫原生L2数据适用)
    @classmethod
    def get_buy_index_with_cancel_data_v2(cls, cancel_data, buyno_map):
        order_no = str(cancel_data["val"]["orderNo"])
        buy_data = buyno_map.get(order_no)
        if buy_data:
            return buy_data["index"]
        return None
    # 获取没撤的笔数
    @classmethod
    def get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map):
        data = None
        try:
            data = total_data[index]
        except Exception as e:
            logger_l2_error.error(f"未找到买入索引对应的数据:index-{index} total_data长度-{len(total_data) if total_data else 0} 错误原因:{str(e)}")
        val = data["val"]
        # 判断当前买是否已经买撤
        cancel_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(val["num"], "1", val["price"]))
        canceled = False
        if cancel_datas:
            for cancel_data in cancel_datas:
                buy_index = cls.get_buy_index_with_cancel_data(code, cancel_data, local_today_num_operate_map)
                if buy_index == index:
                    canceled = True
                    count = data["re"] - cancel_data["re"]
                    if count > 0:
                        return count
                    break
        if not canceled:
            count = data["re"]
            return count
        return 0
        # 获取没撤的笔数
    # 获取涨停买没有撤单的数量
    @classmethod
    def get_limit_up_buy_no_canceled_count_v2(cls, code, index, total_data, canceled_buyno_map):
        data = None
        try:
            data = total_data[index]
        except Exception as e:
            logger_l2_error.error(
                f"未找到买入索引对应的数据:index-{index} total_data长度-{len(total_data) if total_data else 0} 错误原因:{str(e)}")
        val = data["val"]
        order_no = str(val["orderNo"])
        canceled_data = canceled_buyno_map.get(order_no)
        if canceled_data:
            return data["re"] - canceled_data["re"]
        else:
            return data["re"]
    @classmethod
    def get_limit_up_buy_canceled_data_v2(cls, code, index, total_data, canceled_buyno_map):
        data = None
        try:
            data = total_data[index]
        except Exception as e:
            logger_l2_error.error(
                f"未找到买入索引对应的数据:index-{index} total_data长度-{len(total_data) if total_data else 0} 错误原因:{str(e)}")
        val = data["val"]
        order_no = str(val["orderNo"])
        canceled_data = canceled_buyno_map.get(order_no)
        if canceled_data:
            return canceled_data
        else:
            return None
# if __name__ == "__main__":
#     code = "000925"
#     l2_data_util.load_l2_data(code)
#     total_datas = l2_data_util.local_today_datas.get(code)
#     local_today_num_operate_map = l2_data_util.local_today_num_operate_map.get(code)
#     cancel_index = 900
#     index = L2DataSourceUtils.get_buy_index_with_cancel_data(code, total_datas[cancel_index],
#                                                              l2_data_util.local_today_num_operate_map.get(code))
#     print("溯源位置:", index)
l2/l2_data_util.py
New file
@@ -0,0 +1,559 @@
"""
L2相关数据处理
"""
# L2交易队列
import datetime
import decimal
import json
import logging
import time
import numpy
import constant
from db.redis_manager_delegate import RedisUtils
from l2 import l2_data_source_util
from log_module import log, async_log_util, log_export
from db import redis_manager_delegate as redis_manager
from utils import tool
__db = 1
_redisManager = redis_manager.RedisManager(1)
# l2数据管理
# 本地最新一次上传的数据
local_latest_datas = {}
# 本地今日数据
local_today_datas = {}
# 本地手数+操作那类型组成的临时变量
# 用于加快数据处理,用空换时间
local_today_num_operate_map = {}
# 买入订单号映射,只有原生的L2数据才有
local_today_buyno_map = {}
# 已经撤单的订单号
local_today_canceled_buyno_map = {}
def load_l2_data(code, load_latest=True, force=False):
    # 加载最近的l2数据
    if load_latest:
        if local_latest_datas.get(code) is None or force:
            # 获取最近的数据
            _data = RedisUtils.get(_redisManager.getRedis(), "l2-data-latest-{}".format(code))
            if _data is not None:
                if code in local_latest_datas:
                    local_latest_datas[code] = json.loads(_data)
                else:
                    local_latest_datas.setdefault(code, json.loads(_data))
        # 获取今日的数据
    if local_today_datas.get(code) is None or force:
        datas = log_export.load_l2_from_log()
        datas = datas.get(code)
        if datas is None:
            datas = []
        local_today_datas[code] = datas
        data_normal = True
        if datas and len(datas) < datas[-1]["index"] + 1:
            data_normal = False
        # 从数据库加载
        # datas = []
        # keys = redis.keys("l2-{}-*".format(code))
        # for k in keys:
        #     value = redis.get(k)
        #     _data = l2_data_util.l2_data_key_2_obj(k, value)
        #     datas.append(_data)
        # # 排序
        # new_datas = sorted(datas,
        #                    key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
        # local_today_datas[code] = new_datas
        # 根据今日数据加载
        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
        load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force)
        load_canceled_buy_no_map(local_today_canceled_buyno_map, code, local_today_datas.get(code), force)
        return data_normal
    return True
# 加载所有的l2数据
def load_l2_data_all(force=False):
    datas = log_export.load_l2_from_log()
    for code in datas:
        if force:
            local_today_datas[code] = datas[code]
        else:
            if code not in local_today_datas:
                local_today_datas[code] = datas[code]
        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
        load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force)
        load_canceled_buy_no_map(local_today_canceled_buyno_map, code, local_today_datas.get(code), force)
# 将数据根据num-operate分类
def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False):
    if local_today_num_operate_map.get(code) is None:
        local_today_num_operate_map[code] = {}
    if clear:
        local_today_num_operate_map[code] = {}
    for data in source_datas:
        key = "{}-{}-{}".format(data["val"]["num"], data["val"]["operateType"], data["val"]["price"])
        if local_today_num_operate_map[code].get(key) is None:
            local_today_num_operate_map[code].setdefault(key, [])
        local_today_num_operate_map[code].get(key).append(data)
# 将数据根据orderNo分类,原生数据才有
def load_buy_no_map(local_today_buyno_map, code, source_datas, clear=False):
    # 只有原生L2数据才会有此操作
    if local_today_buyno_map.get(code) is None:
        local_today_buyno_map[code] = {}
    if clear:
        local_today_buyno_map[code] = {}
    for data in source_datas:
        if data["val"]["operateType"] != 0:
            continue
        # 只填充买入数据
        key = "{}".format(data["val"]["orderNo"])
        if local_today_buyno_map[code].get(key) is None:
            local_today_buyno_map[code].setdefault(key, data)
# 将数据根据orderNo分类已撤订单,原生数据才有
def load_canceled_buy_no_map(local_today_canceled_buyno_map, code, source_datas, clear=False):
    # 只有原生L2数据才会有此操作
    if local_today_canceled_buyno_map.get(code) is None:
        local_today_canceled_buyno_map[code] = {}
    if clear:
        local_today_canceled_buyno_map[code] = {}
    for data in source_datas:
        # 只留下买撤
        if data["val"]["operateType"] != 1:
            continue
        # 只填充买入数据
        key = "{}".format(data["val"]["orderNo"])
        if local_today_canceled_buyno_map[code].get(key) is None:
            local_today_canceled_buyno_map[code].setdefault(key, data)
# 保存l2数据
def save_l2_data(code, datas, add_datas):
    # 只有有新曾数据才需要保存
    if add_datas:
        # 保存最近的数据
        __start_time = round(time.time() * 1000)
        if datas:
            RedisUtils.setex_async(__db, "l2-data-latest-{}".format(code), tool.get_expire(),
                                   json.dumps(datas))
            # l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
            # 设置进内存
            local_latest_datas[code] = datas
            set_l2_data_latest_count(code, len(datas))
        try:
            async_log_util.l2_data_log.info(log.logger_l2_data, f"{code}-{add_datas}")
        except Exception as e:
            logging.exception(e)
# 设置最新的l2数据采集的数量
def set_l2_data_latest_count(code, count):
    key = "latest-l2-count-{}".format(code)
    RedisUtils.setex(_redisManager.getRedis(), key, 2, count)
    pass
# 获取代码最近的l2数据数量
def get_l2_data_latest_count(code):
    if code is None or len(code) < 1:
        return 0
    key = "latest-l2-count-{}".format(code)
    result = RedisUtils.get(_redisManager.getRedis(), key)
    if result is None:
        return 0
    else:
        return int(result)
def parseL2Data(str):
    day = datetime.datetime.now().strftime("%Y%m%d")
    dict = json.loads(str)
    data = dict["data"]
    client = dict["client"]
    code = data["code"]
    channel = data["channel"]
    capture_time = data["captureTime"]
    process_time = data["processTime"]
    count = data["count"]
    data = data["data"]
    # 获取涨停价
    return day, client, channel, code, capture_time, process_time, data, count
# 元数据是否有差异
def is_origin_data_diffrent(data1, data2):
    if data1 is None or data2 is None:
        return True
    if len(data1) != len(data2):
        return True
    # 比较
    data_length = len(data1)
    step = len(data1) // 10
    for i in range(0, data_length, step):
        if json.dumps(data1[i]) != json.dumps(data2[i]):
            return True
    return False
# 是否为大单
def is_big_money(val):
    price = float(val["price"])
    money = price * int(val["num"])
    if price > 3.0:
        if money >= 29900:
            return True
        else:
            return False
    else:
        max_money = price * 10000
        if money >= max_money * 0.95:
            return True
        else:
            return False
class L2DataUtil:
    @classmethod
    def is_same_time(cls, time1, time2):
        if constant.TEST:
            return True
        time1_s = time1.split(":")
        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
        time2_s = time2.split(":")
        time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
        if abs(time2_second - time1_second) < 3:
            return True
        else:
            return False
    # 获取增量数据
    @classmethod
    def get_add_data(cls, code, latest_datas, datas, _start_index):
        if datas is not None and len(datas) < 1:
            return []
        last_data = None
        latest_datas_ = latest_datas
        if latest_datas_ is not None and len(latest_datas_) > 0:
            last_data = latest_datas_[-1]
        count = 0
        start_index = -1
        # 如果原来没有数据
        # 设置add_data的序号
        for n in reversed(datas):
            count += 1
            if n["key"] == (last_data["key"] if last_data is not None else ""):
                start_index = len(datas) - count
                break
        _add_datas = []
        if last_data is not None:
            if start_index < 0:
                if L2DataUtil.get_time_as_second(datas[0]["val"]["time"]) >= L2DataUtil.get_time_as_second(
                        last_data["val"]["time"]):
                    _add_datas = datas
                else:
                    _add_datas = []
            elif start_index + 1 >= len(datas):
                _add_datas = []
            else:
                _add_datas = datas[start_index + 1:]
        else:
            _add_datas = datas[start_index + 1:]
        for i in range(0, len(_add_datas)):
            _add_datas[i]["index"] = _start_index + i
        return _add_datas
    # 纠正数据,将re字段替换为较大值
    @classmethod
    def correct_data(cls, code, latest_datas, _datas):
        latest_data = latest_datas
        if latest_data is None:
            latest_data = []
        save_list = []
        for data in _datas:
            for _ldata in latest_data:
                # 新数据条数比旧数据多才保存
                if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]:
                    max_re = max(_ldata["re"], data["re"])
                    _ldata["re"] = max_re
                    data["re"] = max_re
                    # 保存到数据库,更新re的数据
                    save_list.append(_ldata)
        if len(save_list) > 0:
            # 暂时不将数据保存到redis
            # saveL2Data(code, save_list, "保存纠正数据")
            local_latest_datas[code] = latest_data
        return _datas
    # 处理l2数据
    @classmethod
    def format_l2_data(cls, data, code, limit_up_price):
        datas = []
        dataIndexs = {}
        same_time_num = {}
        for item in data:
            # 解析数据
            time = item["time"]
            if time in same_time_num:
                same_time_num[time] = same_time_num[time] + 1
            else:
                same_time_num[time] = 1
            price = float(item["price"])
            num = item["num"]
            limitPrice = item["limitPrice"]
            # 涨停价
            if limit_up_price is not None:
                if limit_up_price == tool.to_price(decimal.Decimal(price)):
                    limitPrice = 1
                else:
                    limitPrice = 0
                item["limitPrice"] = "{}".format(limitPrice)
            operateType = item["operateType"]
            # 不需要非涨停买与买撤
            if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1):
                continue
            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
                                                   cancelTimeUnit)
            if key in dataIndexs:
                # 数据重复次数+1
                datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1
            else:
                # 数据重复次数默认为1
                datas.append({"key": key, "val": item, "re": 1})
                dataIndexs.setdefault(key, len(datas) - 1)
        # 测试的时候开启,方便记录大单数据
        # l2_data_util.save_big_data(code, same_time_num, data)
        return datas
    @classmethod
    def get_time_as_second(cls, time_str):
        ts = time_str.split(":")
        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    # @classmethod
    # def get_time_as_str(cls, time_seconds):
    #     ts = time_str.split(":")
    #     return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    # 是否是涨停价买
    @classmethod
    def is_limit_up_price_buy(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 0:
            return False
        price = float(val["price"])
        num = int(val["num"])
        # if price * num * 100 < 50 * 10000:
        #     return False
        return True
    # 是否为涨停卖
    @classmethod
    def is_limit_up_price_sell(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 2:
            return False
        return True
    # 涨停卖撤
    @classmethod
    def is_limit_up_price_sell_cancel(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 3:
            return False
        return True
    # 是否涨停买撤
    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 1:
            return False
        return True
    @classmethod
    def is_buy_cancel(cls, val):
        if int(val["operateType"]) != 1:
            return False
        return True
    # 是否卖撤
    @classmethod
    def is_sell_cancel(cls, val):
        if int(val["operateType"]) == 3:
            return True
        return False
    # 是否为卖
    @classmethod
    def is_sell(cls, val):
        if int(val["operateType"]) == 2:
            return True
        return False
    # 是否为买
    @classmethod
    def is_buy(cls, val):
        if int(val["operateType"]) == 0:
            return True
        return False
    # l2时间差值
    @classmethod
    def time_sub_as_ms(cls, val1, val2):
        # 计算时间差值
        sub_s = tool.trade_time_sub(val1["time"], val2["time"])
        sub_ms = int(val1["tms"]) - int(val2["tms"])
        fs = sub_s * 1000 + sub_ms
        return fs
    @classmethod
    def get_time_with_ms(cls, val):
        return val["time"] + "." + "{:0>3}".format(int(val["tms"]))
class L2TradeQueueUtils(object):
    # 买入数据是否已撤
    @classmethod
    def __is_cancel(cls, code, data, total_datas, local_today_num_operate_map):
        val = data["val"]
        cancel_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(val["num"], "1", val["price"]))
        # 是否有买撤数据
        if cancel_datas:
            for cancel_data in cancel_datas:
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(cancel_data,
                                                                                                    local_today_buyno_map.get(
                                                                                                        code))
                if buy_index == data["index"]:
                    return True
        return False
    # 获取成交进度索引
    @classmethod
    def find_traded_progress_index(cls, code, buy_1_price, total_datas, local_today_num_operate_map, queueList,
                                   last_index,
                                   latest_not_limit_up_time=None):
        def find_traded_progress_index_simple(queues):
            index_set = set()
            for num in queues:
                buy_datas = local_today_num_operate_map.get(
                    "{}-{}-{}".format(num, "0", buy_1_price_format))
                if buy_datas is not None and len(buy_datas) > 0:
                    for data in buy_datas:
                        # 在最近一次非涨停买1更新的时间之后才有效
                        if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
                                                                                   latest_not_limit_up_time) >= 0:
                            if data["index"] >= last_index:
                                index_set.add(data["index"])
            index_list = list(index_set)
            index_list.sort()
            num_list = []
            new_index_list = []
            for index in index_list:
                for i in range(0, total_datas[index]["re"]):
                    num_list.append(total_datas[index]["val"]["num"])
                    new_index_list.append(index)
            index_list_str = ",".join(list(map(str, num_list)))
            queue_list_str = ",".join(list(map(str, queues)))
            find_index = index_list_str.find(queue_list_str)
            if find_index >= 0:
                temp_str = index_list_str[0:find_index]
                if temp_str.endswith(","):
                    temp_str = temp_str[:-1]
                if temp_str == "":
                    return new_index_list[0], new_index_list[0:len(queues)]
                start_index = len(temp_str.split(","))
                return new_index_list[start_index], new_index_list[start_index:start_index + len(queues)]
            return None, None
        # 3个数据以上的不需要判断最近的一次未涨停时间
        if len(queueList) >= 3:
            latest_not_limit_up_time = None
        # 判断匹配的位置是否可信
        def is_trust(indexes):
            cha = []
            for i in range(1, len(indexes)):
                cha.append(indexes[i] - indexes[i - 1] - 1)
            if len(cha) <= 1:
                return True
            # 标准差小于1
            std_result = numpy.std(cha)
            if std_result < 10:
                # 绝对可信
                return True
            for i in range(0, len(cha)):
                if abs(cha[i]) > 10:
                    # 有超过10 的需要判断两个相临数据间的未撤的买入数量
                    buy_count = 0
                    for index in range(indexes[i] + 1, indexes[i + 1] - 1):
                        if L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]):
                            if not cls.__is_cancel(code, total_datas[index], total_datas, local_today_num_operate_map):
                                buy_count += total_datas[index]["re"]
                    # 暂定3个误差范围
                    if buy_count >= 3:
                        return False
            return True
        if len(queueList) == 0:
            return None
        # last_index不能撤,如果已撤就清零
        if cls.__is_cancel(code, total_datas[last_index], total_datas, local_today_num_operate_map):
            last_index = 0
        # 补齐整数位5位
        buy_1_price_format = f"{buy_1_price}"
        while buy_1_price_format.find(".") < 4:
            buy_1_price_format = "0" + buy_1_price_format
        # --------因子查找法(因子的窗口最大为:len(queueList) ,最小为:len(queueList)/2)---------
        max_win_len = len(queueList)
        min_win_len = len(queueList) // 2
        if max_win_len == min_win_len:
            min_win_len = max_win_len - 1
        for win_len in range(max_win_len, min_win_len, -1):
            # 窗口移动
            for i in range(0, max_win_len - win_len + 1):
                queues = queueList[i:i + win_len]
                f_start_index, f_indexs = find_traded_progress_index_simple(queues)
                if f_start_index and is_trust(f_indexs):
                    return f_start_index
        raise Exception("尚未找到成交进度")
if __name__ == "__main__":
    print(L2DataUtil.get_time_with_ms({"time": "10:00:00", "tms": 490}))
l2/l2_transaction_data_manager.py
New file
@@ -0,0 +1,191 @@
"""
L2成交数据处理器
"""
import json
from db import redis_manager
from db.redis_manager_delegate import RedisUtils
# from l2 import l2_log
from l2.huaxin import l2_huaxin_util
from log_module import async_log_util
from log_module.log import hx_logger_l2_transaction_desc, hx_logger_l2_transaction_sell_order
from utils import tool
# 成交数据统计
class HuaXinBuyOrderManager:
    __db = 0
    __instance = None
    __redis_manager = redis_manager.RedisManager(0)
    # 正在成交的订单
    __dealing_order_info_dict = {}
    # 最近成交的订单{"code":(订单号,是否成交完成)}
    __latest_deal_order_info_dict = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(HuaXinBuyOrderManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "dealing_order_info-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.__dealing_order_info_dict, code, val)
        finally:
            RedisUtils.realse(__redis)
    # 将数据持久化到数据库
    def sync_dealing_data_to_db(self):
        for code in self.__dealing_order_info_dict:
            RedisUtils.setex(self.__get_redis(), f"dealing_order_info-{code}", tool.get_expire(),
                             json.dumps(self.__dealing_order_info_dict[code]))
    # 获取代码正在成交的信息
    # 返回数据:[订单号,总股数,开始成交时间,结束成交时间, 总买]
    @classmethod
    def get_dealing_order_info(cls, code):
        return cls.__dealing_order_info_dict.get(code)
    # 统计成交的情况
    @classmethod
    def statistic_deal_desc(cls, code, data, total_buy_num):
        if code not in cls.__dealing_order_info_dict:
            # 数据格式[订单号,总股数,开始成交时间,结束成交时间, 总买]
            cls.__dealing_order_info_dict[code] = [data[6], 0, data[3], data[3], total_buy_num]
        if cls.__dealing_order_info_dict[code][0] == data[6]:
            # 成交同一个订单号
            cls.__dealing_order_info_dict[code][1] += data[2]
            cls.__dealing_order_info_dict[code][3] = data[3]
        else:
            # 保存上一条数据
            async_log_util.info(hx_logger_l2_transaction_desc, f"{code}#{cls.__dealing_order_info_dict[code]}")
            # 设置最近成交完成的一条数据
            deal_info = (
                cls.__dealing_order_info_dict[code][0],
                cls.__dealing_order_info_dict[code][4] == cls.__dealing_order_info_dict[code][1])
            cls.__latest_deal_order_info_dict[code] = deal_info
            # 初始化本条数据
            cls.__dealing_order_info_dict[code] = [data[6], data[2], data[3], data[3], total_buy_num]
            return deal_info
        return None
# 卖单统计数据
class HuaXinSellOrderStatisticManager:
    # 最近的大卖单成交,格式:{code:[卖单信息,...]}
    __latest_sell_order_info_list_dict = {}
    # 大单卖单号的集合,格式:{code:{卖单号}}
    __big_sell_order_ids_dict = {}
    # 大卖单的卖单号->卖单信息映射
    __big_sell_order_info_dict = {}
    # 大单列表
    __big_sell_order_info_list_dict = {}
    # 最近的卖单, 格式{code:[卖单号,总手数,价格,('开始时间',买单号),('结束时间',买单号)]}
    __latest_sell_order_dict = {}
    # 返回最近1s的大单卖:(总卖金额,[(卖单号,总手数,价格,('开始时间',买单号),('结束时间',买单号)),...])
    @classmethod
    def add_transaction_datas(cls, code, datas):
        # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #           data['SellNo'], data['ExecType']))
        if code not in cls.__latest_sell_order_info_list_dict:
            cls.__latest_sell_order_info_list_dict[code] = []
        if code not in cls.__big_sell_order_ids_dict:
            cls.__big_sell_order_ids_dict[code] = set()
        if code not in cls.__big_sell_order_info_dict:
            cls.__big_sell_order_info_dict[code] = {}
        if code not in cls.__big_sell_order_info_list_dict:
            cls.__big_sell_order_info_list_dict[code] = []
        for d in datas:
            cls.__latest_sell_order_info_list_dict[code].append(d)
            if code not in cls.__latest_sell_order_dict:
                cls.__latest_sell_order_dict[code] = [d[7], d[2], d[1], (d[3], d[6]), (d[3], d[6])]
            else:
                if cls.__latest_sell_order_dict[code][0] == d[7]:
                    cls.__latest_sell_order_dict[code][1] += d[2]
                    cls.__latest_sell_order_dict[code][2] = d[1]
                    cls.__latest_sell_order_dict[code][4] = (d[3], d[6])
                else:
                    info = cls.__latest_sell_order_dict[code]
                    # 上个卖单成交完成
                    # 封存数据,计算新起点
                    # 大于50w的卖单才会保存
                    # 大于50w加入卖单
                    if info[1] * info[2] >= 500000:
                        async_log_util.info(hx_logger_l2_transaction_sell_order, f"{code}#{cls.__latest_sell_order_dict[code]}")
                        cls.__big_sell_order_ids_dict[code].add(info[0])
                        cls.__big_sell_order_info_dict[code][info[0]] = info
                        cls.__big_sell_order_info_list_dict[code].append(info)
                    cls.__latest_sell_order_dict[code] = [d[7], d[2], d[1], (d[3], d[6]), (d[3], d[6])]
        latest_time = l2_huaxin_util.convert_time(datas[-1][3], with_ms=True)
        min_time = tool.trade_time_add_millionsecond(latest_time, -1000)
        min_time_int = int(min_time.replace(":", "").replace(".", ""))
        # 计算最近1s的大单成交
        total_big_sell_datas = cls.__big_sell_order_info_list_dict.get(code)
        start_index = 0
        total_sell_info = [0, None]  # 总资金,开始成交信息,结束成交信息
        latest_sell_order_info = cls.__latest_sell_order_dict[code]
        big_sell_order_ids = cls.__big_sell_order_ids_dict[code]
        # print("大卖单", big_sell_order_ids)
        big_sell_orders = []
        temp_sell_order_ids = set()
        # 统计已经结算出的大单
        print(f"总大单数量:{len(total_big_sell_datas)}")
        for i in range(len(total_big_sell_datas) - 1, -1, -1):
            bd = total_big_sell_datas[i]
            if bd[0] != latest_sell_order_info[0]:
                # 不是最近的成交且不是大单直接过滤
                if bd[0] not in big_sell_order_ids:
                    continue
                else:
                    if bd[0] not in temp_sell_order_ids:
                        big_sell_orders.append(cls.__big_sell_order_info_dict[code].get(bd[0]))
                        temp_sell_order_ids.add(bd[0])
            else:
                # 是最近的但不是大单需要过滤
                if latest_sell_order_info[1] * latest_sell_order_info[2] < 500000:
                    continue
                else:
                    if latest_sell_order_info[0] not in temp_sell_order_ids:
                        big_sell_orders.append(latest_sell_order_info)
                        temp_sell_order_ids.add(latest_sell_order_info[0])
            if min_time_int > int(
                    l2_huaxin_util.convert_time(bd[3][0], with_ms=True).replace(":", "").replace(".", "")):
                start_index = i
                break
            # 统计最近1s的大卖单数据
            total_sell_info[0] += int(bd[1] * bd[2])
        # 统计最近的大单
        if latest_sell_order_info[1] * latest_sell_order_info[2] >= 500000:
            if latest_sell_order_info[0] not in temp_sell_order_ids:
                big_sell_orders.append(latest_sell_order_info)
                temp_sell_order_ids.add(latest_sell_order_info[0])
        big_sell_orders.reverse()
        total_sell_info[1] = big_sell_orders
        cls.__latest_sell_order_info_list_dict[code] = cls.__latest_sell_order_info_list_dict[code][start_index:]
        return total_sell_info
log_module/log_export.py
New file
@@ -0,0 +1,288 @@
import datetime
import hashlib
import logging
import os
import time
import constant
from utils import tool
class LogUtil:
    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        fw = open(target_path, mode='w', encoding="utf-8")
        try:
            with open(path, 'r', encoding="utf-8") as f:
                lines = f.readlines()
                for line in lines:
                    if line.find("{}".format(key)) > 0:
                        fw.write(line)
        finally:
            fw.close()
# 导出数据处理位置日志
def __export_l2_pos_range(code, date, dir):
    LogUtil.extract_log_from_key("{} 处理数据范围".format(code),
                                 "{}/sell_logs/gp/l2/l2_process.{}.log".format(constant.get_path_prefix(), date),
                                 "{}/l2_process_{}.log".format(dir, date))
# 导出交易日志
def __export_l2_trade_log(code, date, dir):
    LogUtil.extract_log_from_key(code, "{}/sell_logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), date),
                                 "{}/l2_trade_{}.log".format(dir, date))
# 导出交易取消日志
def __export_l2_trade_cancel_log(code, date, dir):
    LogUtil.extract_log_from_key(code, "{}/sell_logs/gp/l2/l2_trade_cancel.{}.log".format(constant.get_path_prefix(), date),
                                 "{}/l2_trade_cancel_{}.log".format(dir, date))
def __analyse_pricess_time():
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    file_path = f"{constant.get_path_prefix()}/sell_logs/gp/l2/l2_process.{date}.log"
    with open(file_path, encoding="utf-8") as f:
        line = f.readline()
        while line:
            time_ = line.split(":")[-1]
            if int(time_) > 150:
                print(line)
            line = f.readline()
def export_l2_log(code):
    if len(code) < 6:
        return
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    dir_ = "{}/sell_logs/gp/l2/{}".format(constant.get_path_prefix(), code)
    if not os.path.exists(dir_):
        os.mkdir(dir_)
    __export_l2_pos_range(code, date, dir_)
    __export_l2_trade_cancel_log(code, date, dir_)
    __export_l2_trade_log(code, date, dir_)
def compute_buy1_real_time(time_):
    ts = time_.split(":")
    s = int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    cha = (s - 2) % 3
    return tool.time_seconds_format(s - 2 - cha)
def load_l2_from_log(date=None):
    today_data = {}
    if date is None:
        date = tool.get_now_date_str()
    try:
        with open("{}/sell_logs/gp/l2/l2_data.{}.log".format(constant.get_path_prefix(), date), mode='r') as f:
            while True:
                data = f.readline()
                if not data:
                    break
                index = data.find(' - ') + 2
                if data.find('async_log_util') > 0:
                    index = data.find(']', index) + 1
                data = data[index + 1:].strip()
                code = data[0:6]
                data = data[7:]
                dict_ = eval(data)
                if code not in today_data:
                    today_data[code] = dict_
                else:
                    today_data[code].extend(dict_)
        for key in today_data:
            # news = sorted(today_data[key], key=lambda x: x["index"])
            # today_data[key] = news
            print(key, len(today_data[key]) - 1, today_data[key][-1]["index"])
    except:
        pass
    return today_data
# 获取日志时间
def __get_log_time(line):
    time_ = line.split("|")[0].split(" ")[1].split(".")[0]
    return time_
def __get_async_log_time(line):
    line = line.split(" - ")[1]
    time_str = line[line.find("[") + 1:line.find("[") + 9]
    return time_str
# 获取L2每次批量处理数据的位置范围
def get_l2_process_position(code, date=None):
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    pos_list = []
    with open("{}/sell_logs/gp/l2/l2_process.{}.log".format(constant.get_path_prefix(), date), mode='r',
              encoding="utf-8") as f:
        while True:
            line = f.readline()
            if not line:
                break
            if line.find("code:{}".format(code)) < 0:
                continue
            time_ = __get_log_time(line)
            line = line[line.find("处理数据范围") + len("处理数据范围") + 1:line.find("处理时间")].strip()
            if len(pos_list) == 0 or pos_list[-1][1] < int(line.split("-")[0]):
                if int("093000") <= int(time_.replace(":", "")) <= int("150000"):
                    try:
                        pos_list.append((int(line.split("-")[0]), int(line.split("-")[1])))
                    except Exception as e:
                        logging.exception(e)
    return pos_list
# 获取L2每次批量处理数据的位置范围
def get_l2_trade_position(code, date=None):
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    pos_list = []
    with open("{}/sell_logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), date), mode='r',
              encoding="utf-8") as f:
        while True:
            line = f.readline()
            if not line:
                break
            if line.find("code={}".format(code)) < 0:
                continue
            # print(line)
            time_ = __get_log_time(line)
            if int("093000") > int(time_.replace(":", "")) or int(time_.replace(":", "")) > int("150000"):
                continue
            if line.find("获取到买入信号起始点") > 0:
                str_ = line.split("获取到买入信号起始点:")[1].strip()
                index = str_[0:str_.find(" ")].strip()
                # print("信号起始位置:", index)
                pos_list.append((0, int(index), ""))
            elif line.find("获取到买入执行位置") > 0:
                str_ = line.split("获取到买入执行位置:")[1].strip()
                index = str_[0:str_.find(" ")].strip()
                # print("买入执行位置:", index)
                pos_list.append((1, int(index), ""))
            elif line.find("触发撤单,撤单位置:") > 0:
                str_ = line.split("触发撤单,撤单位置:")[1].strip()
                index = str_[0:str_.find(" ")].strip()
                # print("撤单位置:", index)
                pos_list.append((2, int(index), line.split("撤单原因:")[1]))
                pass
            else:
                continue
    return pos_list
__log_file_contents = {}
# 加载文件内容
def __load_file_content(path_str, expire_timespace=20):
    md5 = hashlib.md5(path_str.encode(encoding='utf-8')).hexdigest()
    if md5 in __log_file_contents and time.time() - __log_file_contents[md5][0] < expire_timespace:
        return __log_file_contents[md5][1]
    contents = []
    if os.path.exists(path_str):
        with open(path_str, 'r', encoding="utf-8") as f:
            lines = f.readlines()
            for line in lines:
                contents.append(line)
    __log_file_contents[md5] = (time.time(), contents)
    return contents
# 加载l2订单成交数据
def load_huaxin_deal_record(code, date=tool.get_now_date_str()):
    path = f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction_desc.{date}.log"
    # 格式:[(订单号,手数,开始成交时间,成交结束时间,下单手数)]
    fdatas = []
    lines = __load_file_content(path)
    for line in lines:
        data_index = line.find(f"{code}#")
        if data_index > 0:
            line = line.split(" - ")[1]
            time_str = line[line.find("[") + 1:line.find("[") + 9]
            data = line[line.find("]") + 1:].strip()
            code = data.split("#")[0]
            data = data.split("#")[1]
            data = eval(data)
            fdatas.append(data)
    return fdatas
# 加载华鑫成交的卖单
def load_huaxin_transaction_sell_no(code=None, date=tool.get_now_date_str()):
    path = f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction_sell_order.{date}.log"
    fdatas = {}
    if os.path.exists(path):
        with open(path, 'r', encoding="utf-8") as f:
            lines = f.readlines()
            for line in lines:
                if line:
                    data = line.split(" - ")[1].strip()
                    if data.startswith("["):
                        data = data[data.find("]") + 1:].strip()
                    data = data.split("code=")[1]
                    code_ = data[:6]
                    if code and code != code_:
                        continue
                    data = data[6:].strip()
                    if code_ not in fdatas:
                        fdatas[code_] = []
                    fdatas[code_].append(eval(data))
    return fdatas
# 读取系统日志
def load_system_log():
    path = f"{constant.get_path_prefix()}/sell_logs/gp/system/system.{tool.get_now_date_str()}.log"
    fdatas = []
    if os.path.exists(path):
        with open(path, 'r', encoding="utf-8") as f:
            lines = f.readlines()
            for line in lines:
                if line:
                    try:
                        time_str = line.split("|")[0].strip()
                        level = line.split("|")[1].strip()
                        if level != "INFO" and level != "ERROR":
                            continue
                        data = line.split("|")[2].split(" - ")[1].strip()
                        fdatas.append((time_str, level, data))
                    except:
                        pass
    return fdatas
# 读取系统日志
def load_huaxin_transaction_map(date=tool.get_now_date_str()):
    path = f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction.{date}.log"
    fdatas = {}
    if os.path.exists(path):
        with open(path, 'r', encoding="utf-8") as f:
            lines = f.readlines()
            for line in lines:
                if line:
                    try:
                        data = line.split(" - ")[1].strip()
                        if data.startswith("["):
                            data = data[data.find("]") + 1:].strip()
                        code = data.split("#")[0]
                        l2_data = eval(data.split("#")[1])
                        if code not in fdatas:
                            fdatas[code] = []
                        fdatas[code].append(l2_data)
                    except:
                        pass
    return fdatas
main.py
@@ -1,6 +1,7 @@
import multiprocessing
import threading
from huaxin_client import l1_client_for_trade, trade_client
from huaxin_client import l1_client_for_trade, trade_client, l2_client
from trade import trade_strategy
import huaxin_client
@@ -13,6 +14,8 @@
    # 策略读交易写
    queue_strategy_r_trade_w = multiprocessing.Queue()
    queue_strategy_w_l2_r = multiprocessing.Queue()
    # 启动L1
    l1TradeProcess = multiprocessing.Process(target=l1_client_for_trade.run,
                                             args=(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w,))
@@ -24,4 +27,9 @@
        target=trade_client.run,
        args=(queue_strategy_r_trade_w, queue_strategy_w_trade_r))
    tradeProcess.start()
    trade_strategy.run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w, queue_strategy_r_trade_w, queue_strategy_w_trade_r)
    trade_strategy.init_l2_data_callbacks()
    threading.Thread(target=lambda: l2_client.run(queue_strategy_w_l2_r, trade_strategy.l2_data_callbacks),
                     daemon=True).start()
    trade_strategy.run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w, queue_strategy_r_trade_w,
                       queue_strategy_w_trade_r,queue_strategy_w_l2_r)
trade/huaxin_trade_data_update.py
@@ -48,9 +48,10 @@
                                        need_update_codes.append(d["securityID"])
                            if need_update_codes:
                                gpcode_manager.request_price_pre(need_update_codes)
                            queue_l1_trade_r_strategy_w.put_nowait(
                                {"type": "set_target_codes", "data": list(position_codes)})
                            queue_strategy_w_l2_r.put_nowait(
                                {"type": "l2_cmd", "data": list(position_codes)})
                            # 9点25之前需要订阅持仓票
                            __process_thread_pool.submit(huaxin_trade_record_manager.PositionManager.add, datas)
                    async_log_util.info(hx_logger_trade_debug, f"获取交易数据成功:{type_}")
@@ -89,14 +90,16 @@
def test():
    time.sleep(2)
    position_codes =["000333"]
    position_codes = ["000333"]
    queue_l1_trade_r_strategy_w.put_nowait(
        {"type": "set_target_codes", "data": list(position_codes)})
# 运行
def run(queue_l1_trade_r_strategy_w_):
    global queue_l1_trade_r_strategy_w
def run(queue_l1_trade_r_strategy_w_, queue_strategy_w_l2_r_):
    global queue_l1_trade_r_strategy_w, queue_strategy_w_l2_r
    queue_l1_trade_r_strategy_w = queue_l1_trade_r_strategy_w_
    queue_strategy_w_l2_r = queue_strategy_w_l2_r_
    t1 = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True)
    t1.start()
    # threading.Thread(target=lambda: test(), daemon=True).start()
trade/trade_strategy.py
@@ -6,12 +6,18 @@
import schedule
import constant
from code_atrribute import gpcode_manager
from code_atrribute.history_k_data_util import HistoryKDatasUtils
from huaxin_client.client_network import SendResponseSkManager
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from l2 import l2_data_util
from l2.huaxin import l2_huaxin_util
from l2.l2_data_util import local_today_datas, local_today_num_operate_map, local_today_buyno_map, \
    local_today_canceled_buyno_map, L2DataUtil
from log_module import async_log_util
from log_module.log import logger_trade, logger_debug, logger_system, logger_local_huaxin_l1_trade_info, \
    logger_trade_position_api_request
    logger_trade_position_api_request, logger_l2_error, hx_logger_l2_transaction
from trade import huaxin_trade_data_update, huaxin_sell_util, huaxin_trade_api
from trade.huaxin_trade_record_manager import PositionManager
from trade.sell_rule_manager import TradeRuleManager, SellRule
@@ -254,6 +260,29 @@
            codes = L1DataProcessor.get_latest_update_codes()
            result = {"code": 0, "data": list(codes)}
            self.send_response(result, client_id, request_id)
        elif ctype == "get_l2_datas":
            # 获取L2数据
            code = data["code"]
            max_time = data["max_time"]
            min_money = data["min_money"]
            if not local_today_datas:
                l2_data_util.load_l2_data_all(True)
            total_datas = l2_data_util.local_today_datas.get(code)
            # 只获取买与卖
            for d in total_datas:
                val = d['val']
                if tool.trade_time_sub(d['val']['time'], max_time) > 0:
                    break
                if val['num'] * float(val['price']) * 100 < min_money:
                    continue
                if L2DataUtil.is_buy(val):
                    pass
                if L2DataUtil.is_sell(val):
                    pass
            codes = L1DataProcessor.get_latest_update_codes()
            result = {"code": 0, "data": list(codes)}
            self.send_response(result, client_id, request_id)
class L1DataProcessor:
@@ -279,8 +308,6 @@
        datas = data["data"]
        cls.__save_l1_current_price(datas)
        cls.process_for_sell(datas)
    @classmethod
    def process_for_sell(cls, datas):
@@ -374,6 +401,37 @@
                logging.exception(e)
class MyL2DataCallback(L2DataCallBack):
    def OnL2Order(self, code, origin_datas, timestamp):
        # 保存L2数据
        datas = None
        try:
            # 转换数据格式
            _start_index = 0
            total_datas = local_today_datas.get(code)
            if code not in local_today_datas:
                local_today_datas[code] = []
            if total_datas:
                _start_index = total_datas[-1]["index"] + 1
            datas = l2_huaxin_util.get_format_l2_datas(code, origin_datas,
                                                       gpcode_manager.get_limit_up_price(code), _start_index)
            if len(datas) > 0:
                local_today_datas[code].extend(datas)
                l2_data_util.load_num_operate_map(local_today_num_operate_map, code, datas)
                l2_data_util.load_buy_no_map(local_today_buyno_map, code, datas)
                l2_data_util.load_canceled_buy_no_map(local_today_canceled_buyno_map, code, datas)
        except Exception as e:
            async_log_util.error(logger_l2_error, f"code:{code}")
            logger_l2_error.exception(e)
        finally:
            if datas:
                l2_data_util.save_l2_data(code, None, datas)
            origin_datas.clear()
    def OnL2Transaction(self, code, datas):
        async_log_util.info(hx_logger_l2_transaction, f"{code}#{datas}")
# 做一些初始化的操作
def __init():
    def run_pending():
@@ -398,7 +456,16 @@
    threading.Thread(target=request_position, daemon=True).start()
def run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w, queue_strategy_r_trade_w, queue_strategy_w_trade_r):
l2_data_callbacks = []
def init_l2_data_callbacks():
    for i in range(constant.MAX_L2_CHANNEL_COUNT):
        l2_data_callbacks.append(MyL2DataCallback())
def run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
        queue_strategy_w_l2_r):
    try:
        # 初始化
        __init()
@@ -409,7 +476,7 @@
        threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start()
        huaxin_trade_data_update.run(queue_l1_trade_r_strategy_w)
        huaxin_trade_data_update.run(queue_l1_trade_r_strategy_w, queue_strategy_w_l2_r)
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r)