Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
huaxin_client/l2_client_test.py
@@ -4,6 +4,7 @@
import queue
import time
import lev2mdapi
from l2.huaxin import l2_huaxin_util
from log_module import log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system
from utils import tool
@@ -32,7 +33,7 @@
# Module-level switches, all initialised to False here.
g_SubBondMarketData = False
g_SubBondTransaction = False
g_SubBondOrderDetail = False
# NOTE(review): this binding is replaced by the assignment on the next line, so the
# 10240-entry queue created here is never used. It looks like the pre-change line of
# the diff — confirm before dropping it.
set_codes_data_queue = queue.Queue(maxsize=10240)
# Effective binding: a bounded queue holding at most 102400 entries.
set_codes_data_queue = queue.Queue(maxsize=102400)
# Starts empty; filled elsewhere (not visible in this section).
market_code_dict = {}
ENABLE_NGST = True
@@ -49,13 +50,16 @@
        self.__big_buy_orders = []
        # 精确的买单信息,{买单号:订单信息}
        self.__big_accurate_buy_order_dict = {}
        self.__big_accurate_sell_order_dict = {}
        self.__latest_sell_order = None
        self.__big_sell_orders = []
        self.big_accurate_buy_order_queue = queue.Queue(maxsize=10240)
        self.big_buy_order_queue = queue.Queue(maxsize=10240)
        self.big_sell_order_queue = queue.Queue(maxsize=10240)
        self.big_accurate_buy_order_queue = queue.Queue(maxsize=102400)
        self.big_accurate_sell_order_queue = queue.Queue(maxsize=102400)
        self.big_buy_order_queue = queue.Queue(maxsize=102400)
        self.big_sell_order_queue = queue.Queue(maxsize=102400)
        self.accurate_buy = accurate_buy
        self.__last_accurate_buy_count = 0
        self.__last_accurate_sell_count = 0
    def get_big_buy_orders(self):
        return self.__big_buy_orders
@@ -63,9 +67,10 @@
    def get_big_sell_orders(self):
        return self.__big_sell_orders
    def add_transaction_data_for_accurate_buy(self, data):
    def add_transaction_data_for_accurate(self, item, big_order_money_threshold=299e4):
        """
        获取精确的买单信息
        @param big_order_money_threshold: 大单阈值
        @param data:
        @return:
        """
@@ -74,49 +79,75 @@
            time_str = str(timestamp)
            return int(time_str[:5] if time_str[0] == '9' else time_str[:6])
        item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"])
        # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
        #         "TradeVolume": pTransaction['TradeVolume'],
        #         "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
        #         "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
        #         "SellNo": pTransaction['SellNo'],
        #         "ExecType": pTransaction['ExecType'].decode()}
        money = round(item[2] * item[3])
        volume = item[3]
        price = item[2]
        order_time = data["OrderTime"]
        order_time = item[4]
        if item[0] not in self.__big_accurate_buy_order_dict:
            # (买单号, 量, 金额, 时间, 最新成交价格)
            self.__big_accurate_buy_order_dict[item[0]] = [item[0], 0, 0, order_time, price]
            # (买单号, 量, 金额, 时间, 最新成交价格, 开始成交时间, 开始成交价格)
            self.__big_accurate_buy_order_dict[item[0]] = [item[0], 0, 0, order_time, price, order_time, price]
        buy_order_info = self.__big_accurate_buy_order_dict[item[0]]
        buy_order_info[1] += volume
        buy_order_info[2] += money
        buy_order_info[3] = order_time
        buy_order_info[4] = price
        # 将大单写入本地文件
        if self.__latest_buy_order[0] != item[0]:
        if self.__latest_buy_order and self.__latest_buy_order[0] != item[0]:
            # 有可能是大单成交完成, 判断上个订单是否是大单
            last_buy_order = self.__big_accurate_buy_order_dict.get(self.__latest_buy_order[0])
            if last_buy_order[2] > 299e4:
            if last_buy_order[2] > big_order_money_threshold:
                self.big_accurate_buy_order_queue.put_nowait(last_buy_order)
            # 如果数据过多需要移除过长时间的小金额数据
            accurate_buy_count = len(self.__big_accurate_buy_order_dict)
            accurate_buy_count = len(self.__big_accurate_buy_order_dict.keys())
            if accurate_buy_count > 10000 and accurate_buy_count - self.__last_accurate_buy_count > 2000:
                # 超过1w条数据且新增2000条数据
                # 超过1w条数据就要移除30分钟之前的数据
                now_time_int = int(tool.trade_time_add_second(tool.get_now_time_str(), -1800).replace(":", ""))
                now_time_int = int(
                    tool.trade_time_add_second(l2_huaxin_util.convert_time(order_time), -3600).replace(":", ""))
                try:
                    remove_order_nos = [x for x in self.__big_accurate_buy_order_dict if
                                        now_time_int - format_timestamp(
                                            self.__big_accurate_buy_order_dict[x][3]) > 0]
                                        format_timestamp(self.__big_accurate_buy_order_dict[x][3]) < now_time_int]
                    if remove_order_nos:
                        for order_no in remove_order_nos:
                            self.__big_accurate_buy_order_dict.pop(order_no)
                finally:
                    self.__last_accurate_buy_count = len(self.__big_accurate_buy_order_dict)
    def add_transaction_data(self, data):
        item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"])
        # 统计卖单
        if item[1] not in self.__big_accurate_sell_order_dict:
            # (卖单号, 量, 金额, 时间, 最新成交价格, 开始成交时间, 开始成交价格)
            self.__big_accurate_sell_order_dict[item[1]] = [item[1], 0, 0, order_time, price, order_time, price]
        sell_order_info = self.__big_accurate_sell_order_dict[item[1]]
        sell_order_info[1] += volume
        sell_order_info[2] += money
        sell_order_info[3] = order_time
        sell_order_info[4] = price
        if self.__latest_sell_order and self.__latest_sell_order[0] != item[1]:
            # 有可能是大单成交完成, 判断上个订单是否是大单
            last_sell_order = self.__big_accurate_sell_order_dict.get(self.__latest_sell_order[0])
            if last_sell_order[2] > big_order_money_threshold:
                self.big_accurate_sell_order_queue.put_nowait(last_sell_order)
            # 如果数据过多需要移除过长时间的小金额数据
            accurate_sell_count = len(self.__big_accurate_sell_order_dict.keys())
            if accurate_sell_count > 10000 and accurate_sell_count - self.__last_accurate_sell_count > 2000:
                # 超过1w条数据且新增2000条数据
                # 超过1w条数据就要移除30分钟之前的数据
                now_time_int = int(
                    tool.trade_time_add_second(l2_huaxin_util.convert_time(order_time), -3600).replace(":", ""))
                try:
                    remove_order_nos = [x for x in self.__big_accurate_sell_order_dict if
                                        now_time_int > format_timestamp(
                                            self.__big_accurate_sell_order_dict[x][3])]
                    if remove_order_nos:
                        for order_no in remove_order_nos:
                            self.__big_accurate_sell_order_dict.pop(order_no)
                finally:
                    self.__last_accurate_sell_count = len(self.__big_accurate_sell_order_dict.keys())
    def add_transaction_data(self, data, big_order_money_threshold=299e4):
        item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"], data["OrderTime"])
        # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
        #         "TradeVolume": pTransaction['TradeVolume'],
        #         "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
@@ -126,10 +157,10 @@
        money = round(item[2] * item[3])
        volume = item[3]
        price = item[2]
        order_time = data["OrderTime"]
        order_time = item[4]
        if self.accurate_buy:
            self.add_transaction_data_for_accurate_buy(data)
            self.add_transaction_data_for_accurate(item, big_order_money_threshold=100e4)
        if not self.__latest_buy_order:
            # (买单号, 量, 金额, 时间, 最新成交价格)
@@ -294,9 +325,9 @@
                    "SellNo": pTransaction['SellNo'],
                    "ExecType": pTransaction['ExecType'].decode()}
            if item["SecurityID"] not in l2_transaction_data_dict:
                l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
            l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item,
                                                                              item["SecurityID"] in self.special_codes)
                l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"], item[
                    "SecurityID"] in self.special_codes)
            l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
    def OnRtnNGTSTick(self, pTick):
        """
@@ -314,9 +345,9 @@
                        "SellNo": pTick['SellNo'],
                        "ExecType": '1'}
                if item["SecurityID"] not in l2_transaction_data_dict:
                    l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
                l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item, item[
                    "SecurityID"] in self.special_codes)
                    l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"], item[
                        "SecurityID"] in self.special_codes)
                l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
        except Exception as e:
            logger_local_huaxin_l2_subscript.exception(e)
@@ -372,39 +403,46 @@
        logger_system.exception(e)
    while True:
        try:
            # 读取一遍
            for code in l2_transaction_data_dict:
                l2_transaction_data_manager: L2TransactionDataManager = l2_transaction_data_dict[code]
                try:
                    while True:
                while True:
                    if not l2_transaction_data_manager.big_buy_order_queue.empty():
                        result = l2_transaction_data_manager.big_buy_order_queue.get(block=False)
                        if result:
                            _queue.put_nowait((code, 0, result))
                        else:
                            break
                except:
                    pass
                    else:
                        break
                try:
                    while True:
                while True:
                    if not l2_transaction_data_manager.big_accurate_buy_order_queue.empty():
                        result = l2_transaction_data_manager.big_accurate_buy_order_queue.get(block=False)
                        if result:
                            accurate_buy_order_queue.put_nowait((code, 0, result))
                        else:
                            break
                except:
                    pass
                    else:
                        break
                while True:
                    if not l2_transaction_data_manager.big_accurate_sell_order_queue.empty():
                        result = l2_transaction_data_manager.big_accurate_sell_order_queue.get(block=False)
                        if result:
                            accurate_buy_order_queue.put_nowait((code, 1, result))
                    else:
                        break
                try:
                    while True:
                while True:
                    if not l2_transaction_data_manager.big_sell_order_queue.empty():
                        result = l2_transaction_data_manager.big_sell_order_queue.get(block=False)
                        if result:
                            _queue.put_nowait((code, 1, result))
                        else:
                            break
                except:
                    pass
                    else:
                        break
        except:
            pass
        finally:
            time.sleep(1)
# Script entry guard: running this module directly performs no action.
if __name__ == "__main__":
    pass