Administrator
2024-11-19 4a96c448feb8caf33c8b252309b30cc8d87672e9
L2测试
2个文件已修改
87 ■■■■ 已修改文件
huaxin_client/l2_client_test.py 75 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_test.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client_test.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import logging
import multiprocessing
import queue
import time
import lev2mdapi
@@ -44,6 +45,8 @@
        self.__big_buy_orders = []
        self.__latest_sell_order = None
        self.__big_sell_orders = []
        self.big_buy_order_queue = queue.Queue()
        self.big_sell_order_queue = queue.Queue()
    def get_big_buy_orders(self) -> list:
        """Return the list of big buy orders collected so far.

        The internal list object itself is returned (not a copy), so changes
        made by the caller are visible to this manager.
        """
        return self.__big_buy_orders
@@ -61,25 +64,37 @@
        #         "ExecType": pTransaction['ExecType'].decode()}
        money = round(item[2] * item[3])
        volume = item[3]
        order_time = data["OrderTime"]
        if not self.__latest_buy_order:
            self.__latest_buy_order = [item[0], 0, 0]
            self.__latest_buy_order = [item[0], 0, 0, order_time]
        if self.__latest_buy_order[0] == item[0]:
            self.__latest_buy_order[1] += volume
            self.__latest_buy_order[2] += money
            self.__latest_buy_order[3] = order_time
        else:
            if self.__latest_buy_order[2] > 1e6:
                self.__big_buy_orders.append((self.__latest_buy_order[0],self.__latest_buy_order[1], self.__latest_buy_order[2]))
            self.__latest_buy_order = [item[0],volume,  money]
                d = (self.__latest_buy_order[0], self.__latest_buy_order[1], self.__latest_buy_order[2], self.__latest_buy_order[3])
                self.__big_buy_orders.append(d)
                self.big_buy_order_queue.put_nowait(d)
            self.__latest_buy_order = [item[0], volume, money, order_time]
        if not self.__latest_sell_order:
            self.__latest_sell_order = [item[1], 0, 0]
            self.__latest_sell_order = [item[1], 0, 0, order_time]
        if self.__latest_sell_order[0] == item[1]:
            self.__latest_sell_order[1] += volume
            self.__latest_sell_order[2] += money
            self.__latest_sell_order[3] = order_time
        else:
            if self.__latest_sell_order[2] > 1e6:
                self.__big_sell_orders.append((self.__latest_sell_order[0],self.__latest_sell_order[1], self.__latest_sell_order[2]))
            self.__latest_sell_order = [item[1], volume,  money]
                d = (self.__latest_sell_order[0], self.__latest_sell_order[1], self.__latest_sell_order[2], self.__latest_sell_order[3])
                self.__big_sell_orders.append(d)
                self.big_sell_order_queue.put_nowait(d)
            self.__latest_sell_order = [item[1], volume, money, order_time]
# Per-security transaction aggregators, keyed by SecurityID; each value is an
# L2TransactionDataManager. Kept at module level so that run() can read the
# managers that the Lev2MdSpi transaction callbacks create and feed.
l2_transaction_data_dict = {}
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
@@ -90,9 +105,6 @@
    subscripted_codes = set()
    # 代码的上次成交的订单唯一索引
    __last_transaction_keys_dict = {}
    # 买入的大单订单号
    __l2_transaction_data_dict = {}
    def __init__(self, api, codes):
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
@@ -209,9 +221,9 @@
                    "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                    "SellNo": pTransaction['SellNo'],
                    "ExecType": pTransaction['ExecType'].decode()}
            if item["SecurityID"] not in self.__l2_transaction_data_dict:
                self.__l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
            self.__l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
            if item["SecurityID"] not in l2_transaction_data_dict:
                l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
            l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
    def OnRtnNGTSTick(self, pTick):
        """
@@ -228,9 +240,9 @@
                        "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'],
                        "SellNo": pTick['SellNo'],
                        "ExecType": '1'}
                if item["SecurityID"] not in self.__l2_transaction_data_dict:
                    self.__l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
                self.__l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
                if item["SecurityID"] not in l2_transaction_data_dict:
                    l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
                l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
        except Exception as e:
            logger_local_huaxin_l2_subscript.exception(e)
@@ -269,17 +281,38 @@
    api.Init()
def run(codes) -> None:
def run(codes, _queue: multiprocessing.Queue) -> None:
    """Start the L2 subscription and forward big orders to ``_queue`` forever.

    After initialising the subscription for ``codes`` the function loops
    without returning. Each round it drains the big-buy and big-sell queues of
    every manager in ``l2_transaction_data_dict`` and republishes each entry on
    ``_queue`` as ``(code, side, order)``, where ``side`` is 0 for a buy order
    and 1 for a sell order.

    :param codes: security codes handed to ``__init_l2``.
    :param _queue: queue that receives the ``(code, side, order)`` tuples.
    """
    try:
        log.close_print()
        __init_l2(codes)
        logger_system.info(f"L2订阅服务启动成功:")
    except Exception as e:
        logger_system.exception(e)
    while True:
        time.sleep(2)
        try:
            # Iterate over a snapshot: the transaction callbacks insert new
            # securities into this dict (presumably from the API's own
            # thread), and a dict that changes size during iteration raises
            # RuntimeError.
            for code, manager in list(l2_transaction_data_dict.items()):
                _forward_big_orders(code, 0, manager.big_buy_order_queue, _queue)
                _forward_big_orders(code, 1, manager.big_sell_order_queue, _queue)
        except Exception as e:
            # Keep the forwarding loop alive, but record the failure instead
            # of discarding it silently. KeyboardInterrupt/SystemExit are not
            # caught, so the process can still be stopped.
            logger_system.exception(e)
        finally:
            time.sleep(1)


def _forward_big_orders(code, side, source, target) -> None:
    """Move every pending entry of ``source`` to ``target`` as ``(code, side, entry)``."""
    while True:
        try:
            order = source.get_nowait()
        except queue.Empty:
            # Nothing left to forward for this security and side.
            return
        target.put_nowait((code, side, order))
if __name__ == "__main__":
    # Standalone smoke run. run() requires the output queue as its second
    # argument, so supply a local one; nothing consumes it in this mode.
    run({"000009", "601618"}, multiprocessing.Queue())
    # NOTE(review): run() loops forever, so this line is not expected to be
    # reached.
    input()
l2_test.py
@@ -2,6 +2,7 @@
import time
from huaxin_client import l2_client_test, l1_subscript_codes_manager
from log_module.log import logger_local_huaxin_l2_upload
def run():
@@ -12,11 +13,20 @@
    cpu_count = 16
    page_size = int(len(codes) / cpu_count) + 1
    big_order_queue = multiprocessing.Queue()
    for i in range(cpu_count):
        process = multiprocessing.Process(target=l2_client_test.run,
                                          args=(codes[i * page_size:(i + 1) * page_size],))
                                          args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,))
        process.start()
    while True:
        try:
            data = big_order_queue.get()
            logger_local_huaxin_l2_upload.info(f"{data}")
        except:
            pass
# Script entry point. run() starts multiprocessing worker processes, so the
# call stays behind the __main__ guard: child processes that import this module
# (as happens with the "spawn" start method) must not start workers again.
if __name__ == "__main__":
    run()