Administrator
2023-11-14 25a69761838501a775896eb8a2234cb8ee0a446d
bug修复
3个文件已修改
170 ■■■■ 已修改文件
huaxin_client/l2_client.py 103 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -56,7 +56,7 @@
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    latest_codes_set = set()
    codes_volume_and_price_dict = {}
    special_code_volume_for_order_dict = {}
    # 已经订阅的代码
    subscripted_codes = set()
@@ -70,6 +70,7 @@
        self.__api = api
        self.is_login = False
        self.l2_data_upload_manager = l2_data_upload_manager
        self.codes_volume_and_price_dict = {}
    def __split_codes(self, codes):
        szse_codes = []
@@ -131,7 +132,7 @@
            code = d[0]
            codes.add(code)
            self.codes_volume_and_price_dict[code] = (d[1], d[2])
            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], d[2])
            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], float(d[2]))
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
@@ -185,9 +186,12 @@
    def set_code_special_watch_volume(self, code, volume):
        # 有效期为3s
        # self.special_code_volume_for_order_dict[code] = (volume, time.time() + 3)
        min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
        d = self.codes_volume_and_price_dict.get(code)
        if d:
            min_volume, limit_up_price = d[0], d[1]
        self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume, limit_up_price,
                                                                {volume, constant.SHADOW_ORDER_VOLUME}, time.time() + 3)
                                                                    {volume, constant.SHADOW_ORDER_VOLUME},
                                                                    time.time() + 3)
        async_log_util.info(logger_local_huaxin_l2_subscript, f"设置下单量监听:{code}-{volume}")
@@ -551,6 +555,42 @@
def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None:
    # def test_add_codes():
    #     time.sleep(5)
    #     # if value:
    #     #     if type(value) == bytes:
    #     #         value = value.decode("utf-8")
    #     #     data = json.loads(value)
    #     #     _type = data["type"]
    #     #     if _type == "listen_volume":
    #     #         volume = data["data"]["volume"]
    #     #         code = data["data"]["code"]
    #     #         spi.set_code_special_watch_volume(code, volume)
    #     #     elif _type == "l2_cmd":
    #     #         l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
    #
    #     demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59),
    #                   ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)]
    #
    #     queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
    #     time.sleep(1)
    #
    #     spi.l2_data_upload_manager.add_l2_order_detail(
    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
    #          'OrderTime': '13000015',
    #          'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
    #     spi.l2_data_upload_manager.add_l2_order_detail(
    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
    #          'OrderTime': '13000015',
    #          'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
    #     queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
    #     time.sleep(0.1)
    #     spi.l2_data_upload_manager.add_l2_order_detail(
    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
    #          'OrderTime': '13000015',
    #          'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
@@ -567,6 +607,8 @@
        __init_l2(l2_data_upload_manager)
        l2_data_manager.run_upload_common()
        l2_data_manager.run_log()
        # 测试
        # threading.Thread(target=lambda: test_add_codes(),daemon=True).start()
        global l2CommandManager
        l2CommandManager = command_manager.L2CommandManager()
        l2CommandManager.init(MyL2ActionCallback())
@@ -577,7 +619,56 @@
        time.sleep(2)
def test():
    def test_add_codes():
        time.sleep(5)
        # if value:
        #     if type(value) == bytes:
        #         value = value.decode("utf-8")
        #     data = json.loads(value)
        #     _type = data["type"]
        #     if _type == "listen_volume":
        #         volume = data["data"]["volume"]
        #         code = data["data"]["code"]
        #         spi.set_code_special_watch_volume(code, volume)
        #     elif _type == "l2_cmd":
        #         l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
        demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59),
                      ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)]
        queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
        time.sleep(1)
        spi.l2_data_upload_manager.add_l2_order_detail(
            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
        spi.l2_data_upload_manager.add_l2_order_detail(
            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
        queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
        time.sleep(0.1)
        spi.l2_data_upload_manager.add_l2_order_detail(
            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
    queue_r = multiprocessing.Queue()
    order_queues = []
    transaction_queues = []
    market_queue = multiprocessing.Queue()
    for i in range(20):
        order_queues.append(multiprocessing.Queue())
        transaction_queues.append(multiprocessing.Queue())
    threading.Thread(target=test_add_codes).start()
    run(queue_r, order_queues, transaction_queues, market_queue)
if __name__ == "__main__":
    # run(None, None, None)
    # spi.set_codes_data([("000333", 12000)])
    input()
huaxin_client/l2_data_manager.py
@@ -37,6 +37,9 @@
        self.temp_order_queue_dict = {}
        self.temp_transaction_queue_dict = {}
        self.filter_order_condition_dict = {}
        self.upload_l2_data_task_dict = {}
        self.l2_order_codes = set()
        self.l2_transaction_codes = set()
    # 设置订单过滤条件
    def set_order_fileter_condition(self, code, min_volume, limit_up_price, special_volumes=None,
@@ -46,7 +49,8 @@
        if code in self.filter_order_condition_dict and not special_volumes and not special_volumes_expire_time:
            self.filter_order_condition_dict[code][0] = (min_volume, limit_up_price)
        else:
            self.filter_order_condition_dict[code] = [(min_volume, limit_up_price), special_volumes, special_volumes_expire_time]
            self.filter_order_condition_dict[code] = [(min_volume, limit_up_price), special_volumes,
                                                      special_volumes_expire_time]
    # 过滤订单
    def __filter_order(self, item):
@@ -114,12 +118,34 @@
    # 分配上传队列
    def distribute_upload_queue(self, code):
        if not self.order_queue_distribute_manager.get_distributed_queue(code):
        self.order_queue_distribute_manager.distribute_queue(code)
        if not self.transaction_queue_distribute_manager.get_distributed_queue(code):
        self.transaction_queue_distribute_manager.distribute_queue(code)
        if code not in self.temp_order_queue_dict:
        self.temp_order_queue_dict[code] = collections.deque()
        if code not in self.temp_transaction_queue_dict:
        self.temp_transaction_queue_dict[code] = collections.deque()
        threading.Thread(target=lambda: self.__run_upload_order_task(code)).start()
        threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start()
        if code not in self.upload_l2_data_task_dict:
            t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
            t1.start()
            t2 = threading.Thread(target=lambda: self.__run_upload_transaction_task(code), daemon=True)
            t2.start()
            self.upload_l2_data_task_dict[code] = (t1, t2)
        # 释放已经分配的队列
    def release_distributed_upload_queue(self, code):
        self.order_queue_distribute_manager.release_distribute_queue(code)
        self.transaction_queue_distribute_manager.release_distribute_queue(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
            self.temp_order_queue_dict.pop(code)
        if code in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code].clear()
            self.temp_transaction_queue_dict.pop(code)
        if code in self.upload_l2_data_task_dict:
            self.upload_l2_data_task_dict.pop(code)
    def __upload_l2_data(self, code, _queue, datas):
        _queue.put_nowait((code, datas, time.time()))
@@ -132,7 +158,7 @@
        upload_queue = queue_info[1]
        while True:
            try:
                if not q:
                if len(q) > 0:
                    data = q.popleft()
                    # 前置数据处理,过滤掉无用的数据
                    data = self.__filter_order(data)
@@ -142,14 +168,16 @@
                    if temp_list:
                        # 上传数据
                        self.__upload_l2_data(code, upload_queue, temp_list)
                        temp_list.clear()
                        temp_list = []
                    else:
                        if code not in self.temp_order_queue_dict:
                            self.l2_order_codes.discard(code)
                            break
                        self.l2_order_codes.add(code)
                        time.sleep(0.001)
            except:
                pass
            except Exception as e:
                logging.exception(e)
            finally:
                pass
@@ -161,7 +189,7 @@
        temp_list = []
        while True:
            try:
                if not q:
                if len(q) > 0:
                    data = q.popleft()
                    data = self.__filter_transaction(data)
                    if data:
@@ -170,26 +198,17 @@
                    if temp_list:
                        # 上传数据
                        self.__upload_l2_data(code, upload_queue, temp_list)
                        temp_list.clear()
                        temp_list = []
                    else:
                        if code not in self.temp_transaction_queue_dict:
                            self.l2_transaction_codes.discard(code)
                            break
                        self.l2_transaction_codes.add(code)
                        time.sleep(0.002)
            except:
                pass
            finally:
                pass
    # 释放已经分配的队列
    def release_distributed_upload_queue(self, code):
        self.order_queue_distribute_manager.release_distribute_queue(code)
        self.transaction_queue_distribute_manager.release_distribute_queue(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
            self.temp_order_queue_dict.pop(code)
        if code in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code].clear()
            self.temp_transaction_queue_dict.pop(code)
def add_target_code(code):
@@ -293,7 +312,7 @@
def __test():
    code = "002073"
    # 分配数据
    pass
main.py
@@ -24,7 +24,8 @@
def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue,
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue, order_queues_, transaction_queues_, market_queue_):
                      queue_strategy_w_trade_r_: multiprocessing.Queue, order_queues_, transaction_queues_,
                      market_queue_):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -73,6 +74,9 @@
        logger_system.error(f"端口服务器:{9004} 启动失败")
if __name__ == '__main__1':
    huaxin_client.l2_client.test()
if __name__ == '__main__':
    try:
        logger_l2_trade.info("启动程序")