| | |
| | | # -*- coding: utf-8 -*- |
| | | import logging |
| | | import multiprocessing |
| | | import queue |
| | | import time |
| | | import lev2mdapi |
| | | from l2.huaxin import l2_huaxin_util |
| | | from log_module import log |
| | | from log_module.log import logger_local_huaxin_l2_subscript, logger_system |
| | | from utils import tool |
| | | |
| | | IS_TEST = True |
| | | IS_TEST = False |
| | | |
| | | ###B类### |
| | | Front_Address = "tcp://10.0.1.101:6900" |
| | |
| | | g_SubBondMarketData = False |
| | | g_SubBondTransaction = False |
| | | g_SubBondOrderDetail = False |
| | | set_codes_data_queue = queue.Queue() |
| | | set_codes_data_queue = queue.Queue(maxsize=102400) |
| | | market_code_dict = {} |
| | | |
| | | ENABLE_NGST = True |
| | | |
| | | |
| | | class L2TransactionDataManager: |
| | | def __init__(self, code): |
| | | def __init__(self, code, accurate_buy=False): |
| | | """ |
| | | @param code: |
| | | @param accurate_buy: 是否需要精确的买单信息 |
| | | """ |
| | | self.code = code |
| | | self.__latest_buy_order = None |
| | | self.__big_buy_orders = [] |
| | | # 精确的买单信息,{买单号:订单信息} |
| | | self.__big_accurate_buy_order_dict = {} |
| | | self.__big_accurate_sell_order_dict = {} |
| | | self.__latest_sell_order = None |
| | | self.__big_sell_orders = [] |
| | | self.big_accurate_buy_order_queue = queue.Queue(maxsize=102400) |
| | | self.big_accurate_sell_order_queue = queue.Queue(maxsize=102400) |
| | | self.big_buy_order_queue = queue.Queue(maxsize=102400) |
| | | self.big_sell_order_queue = queue.Queue(maxsize=102400) |
| | | self.accurate_buy = accurate_buy |
| | | self.__last_accurate_buy_count = 0 |
| | | self.__last_accurate_sell_count = 0 |
| | | |
| | | def get_big_buy_orders(self): |
| | | return self.__big_buy_orders |
| | |
| | | def get_big_sell_orders(self): |
| | | return self.__big_sell_orders |
| | | |
| | | def add_transaction_data(self, data): |
| | | item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"]) |
| | | def add_transaction_data_for_accurate(self, item, big_order_money_threshold=299e4): |
| | | """ |
| | | 获取精确的买单信息 |
| | | @param big_order_money_threshold: 大单阈值 |
| | | @param data: |
| | | @return: |
| | | """ |
| | | |
| | | def format_timestamp(timestamp): |
| | | time_str = str(timestamp) |
| | | return int(time_str[:5] if time_str[0] == '9' else time_str[:6]) |
| | | |
| | | money = round(item[2] * item[3]) |
| | | volume = item[3] |
| | | price = item[2] |
| | | order_time = item[4] |
| | | if item[0] not in self.__big_accurate_buy_order_dict: |
| | | # (买单号, 量, 金额, 时间, 最新成交价格, 开始成交时间, 开始成交价格) |
| | | self.__big_accurate_buy_order_dict[item[0]] = [item[0], 0, 0, order_time, price, order_time, price] |
| | | buy_order_info = self.__big_accurate_buy_order_dict[item[0]] |
| | | buy_order_info[1] += volume |
| | | buy_order_info[2] += money |
| | | buy_order_info[3] = order_time |
| | | buy_order_info[4] = price |
| | | # 将大单写入本地文件 |
| | | if self.__latest_buy_order and self.__latest_buy_order[0] != item[0]: |
| | | # 有可能是大单成交完成, 判断上个订单是否是大单 |
| | | last_buy_order = self.__big_accurate_buy_order_dict.get(self.__latest_buy_order[0]) |
| | | |
| | | if last_buy_order[2] > big_order_money_threshold: |
| | | self.big_accurate_buy_order_queue.put_nowait(last_buy_order) |
| | | |
| | | # 如果数据过多需要移除过长时间的小金额数据 |
| | | accurate_buy_count = len(self.__big_accurate_buy_order_dict.keys()) |
| | | if accurate_buy_count > 10000 and accurate_buy_count - self.__last_accurate_buy_count > 2000: |
| | | # 超过1w条数据且新增2000条数据 |
| | | # 超过1w条数据就要移除30分钟之前的数据 |
| | | now_time_int = int( |
| | | tool.trade_time_add_second(l2_huaxin_util.convert_time(order_time), -3600).replace(":", "")) |
| | | try: |
| | | remove_order_nos = [x for x in self.__big_accurate_buy_order_dict if |
| | | format_timestamp(self.__big_accurate_buy_order_dict[x][3]) < now_time_int] |
| | | if remove_order_nos: |
| | | for order_no in remove_order_nos: |
| | | self.__big_accurate_buy_order_dict.pop(order_no) |
| | | finally: |
| | | self.__last_accurate_buy_count = len(self.__big_accurate_buy_order_dict) |
| | | |
| | | # 统计卖单 |
| | | if item[1] not in self.__big_accurate_sell_order_dict: |
| | | # (卖单号, 量, 金额, 时间, 最新成交价格, 开始成交时间, 开始成交价格) |
| | | self.__big_accurate_sell_order_dict[item[1]] = [item[1], 0, 0, order_time, price, order_time, price] |
| | | sell_order_info = self.__big_accurate_sell_order_dict[item[1]] |
| | | sell_order_info[1] += volume |
| | | sell_order_info[2] += money |
| | | sell_order_info[3] = order_time |
| | | sell_order_info[4] = price |
| | | if self.__latest_sell_order and self.__latest_sell_order[0] != item[1]: |
| | | # 有可能是大单成交完成, 判断上个订单是否是大单 |
| | | last_sell_order = self.__big_accurate_sell_order_dict.get(self.__latest_sell_order[0]) |
| | | if last_sell_order[2] > big_order_money_threshold: |
| | | self.big_accurate_sell_order_queue.put_nowait(last_sell_order) |
| | | # 如果数据过多需要移除过长时间的小金额数据 |
| | | accurate_sell_count = len(self.__big_accurate_sell_order_dict.keys()) |
| | | if accurate_sell_count > 10000 and accurate_sell_count - self.__last_accurate_sell_count > 2000: |
| | | # 超过1w条数据且新增2000条数据 |
| | | # 超过1w条数据就要移除30分钟之前的数据 |
| | | now_time_int = int( |
| | | tool.trade_time_add_second(l2_huaxin_util.convert_time(order_time), -3600).replace(":", "")) |
| | | try: |
| | | remove_order_nos = [x for x in self.__big_accurate_sell_order_dict if |
| | | now_time_int > format_timestamp( |
| | | self.__big_accurate_sell_order_dict[x][3])] |
| | | if remove_order_nos: |
| | | for order_no in remove_order_nos: |
| | | self.__big_accurate_sell_order_dict.pop(order_no) |
| | | finally: |
| | | self.__last_accurate_sell_count = len(self.__big_accurate_sell_order_dict.keys()) |
| | | |
| | | def add_transaction_data(self, data, big_order_money_threshold=299e4): |
| | | item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"], data["OrderTime"]) |
| | | # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], |
| | | # "TradeVolume": pTransaction['TradeVolume'], |
| | | # "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], |
| | |
| | | # "ExecType": pTransaction['ExecType'].decode()} |
| | | money = round(item[2] * item[3]) |
| | | volume = item[3] |
| | | price = item[2] |
| | | order_time = item[4] |
| | | |
| | | if self.accurate_buy: |
| | | self.add_transaction_data_for_accurate(item, big_order_money_threshold=100e4) |
| | | |
| | | if not self.__latest_buy_order: |
| | | self.__latest_buy_order = [item[0], 0, 0] |
| | | # (买单号, 量, 金额, 时间, 最新成交价格) |
| | | self.__latest_buy_order = [item[0], 0, 0, order_time, price] |
| | | if self.__latest_buy_order[0] == item[0]: |
| | | self.__latest_buy_order[1] += volume |
| | | self.__latest_buy_order[2] += money |
| | | self.__latest_buy_order[3] = order_time |
| | | self.__latest_buy_order[4] = price |
| | | else: |
| | | if self.__latest_buy_order[2] > 1e6: |
| | | self.__big_buy_orders.append((self.__latest_buy_order[0],self.__latest_buy_order[1], self.__latest_buy_order[2])) |
| | | self.__latest_buy_order = [item[0],volume, money] |
| | | d = (self.__latest_buy_order[0], self.__latest_buy_order[1], self.__latest_buy_order[2], |
| | | self.__latest_buy_order[3], self.__latest_buy_order[4]) |
| | | self.__big_buy_orders.append(d) |
| | | self.big_buy_order_queue.put_nowait(d) |
| | | |
| | | self.__latest_buy_order = [item[0], volume, money, order_time, price] |
| | | |
| | | if not self.__latest_sell_order: |
| | | self.__latest_sell_order = [item[1], 0, 0] |
| | | self.__latest_sell_order = [item[1], 0, 0, order_time, price] |
| | | if self.__latest_sell_order[0] == item[1]: |
| | | self.__latest_sell_order[1] += volume |
| | | self.__latest_sell_order[2] += money |
| | | self.__latest_sell_order[3] = order_time |
| | | self.__latest_sell_order[4] = price |
| | | else: |
| | | if self.__latest_sell_order[2] > 1e6: |
| | | self.__big_sell_orders.append((self.__latest_sell_order[0],self.__latest_sell_order[1], self.__latest_sell_order[2])) |
| | | self.__latest_sell_order = [item[1], volume, money] |
| | | d = (self.__latest_sell_order[0], self.__latest_sell_order[1], self.__latest_sell_order[2], |
| | | self.__latest_sell_order[3], self.__latest_sell_order[4]) |
| | | self.__big_sell_orders.append(d) |
| | | self.big_sell_order_queue.put_nowait(d) |
| | | self.__latest_sell_order = [item[1], volume, money, order_time, price] |
| | | |
| | | |
| | | # 买入的大单订单号 |
| | | l2_transaction_data_dict = {} |
| | | |
| | | |
| | | class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi): |
| | |
| | | # 代码的上次成交的订单唯一索引 |
| | | __last_transaction_keys_dict = {} |
| | | |
| | | # 买入的大单订单号 |
| | | __l2_transaction_data_dict = {} |
| | | |
| | | def __init__(self, api, codes): |
| | | def __init__(self, api, codes, special_codes): |
| | | lev2mdapi.CTORATstpLev2MdSpi.__init__(self) |
| | | self.__api = api |
| | | self.is_login = False |
| | | self.codes = codes |
| | | self.codes_volume_and_price_dict = {} |
| | | self.special_codes = special_codes |
| | | |
| | | def __split_codes(self, codes): |
| | | szse_codes = [] |
| | |
| | | "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], |
| | | "SellNo": pTransaction['SellNo'], |
| | | "ExecType": pTransaction['ExecType'].decode()} |
| | | if item["SecurityID"] not in self.__l2_transaction_data_dict: |
| | | self.__l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"]) |
| | | self.__l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item) |
| | | if item["SecurityID"] not in l2_transaction_data_dict: |
| | | l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"], item[ |
| | | "SecurityID"] in self.special_codes) |
| | | l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item) |
| | | |
| | | def OnRtnNGTSTick(self, pTick): |
| | | """ |
| | |
| | | "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'], |
| | | "SellNo": pTick['SellNo'], |
| | | "ExecType": '1'} |
| | | if item["SecurityID"] not in self.__l2_transaction_data_dict: |
| | | self.__l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"]) |
| | | self.__l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item) |
| | | if item["SecurityID"] not in l2_transaction_data_dict: |
| | | l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"], item[ |
| | | "SecurityID"] in self.special_codes) |
| | | l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item) |
| | | except Exception as e: |
| | | logger_local_huaxin_l2_subscript.exception(e) |
| | | |
| | | |
| | | def __init_l2(codes): |
| | | def __init_l2(codes, special_codes): |
| | | print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion()) |
| | | # case 1: Tcp方式 |
| | | # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP |
| | |
| | | # case 2非缓存模式 |
| | | # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) |
| | | global spi |
| | | spi = Lev2MdSpi(api, codes) |
| | | spi = Lev2MdSpi(api, codes, special_codes) |
| | | api.RegisterSpi(spi) |
| | | # -------------------正式模式------------------------------------- |
| | | if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST: |
| | |
| | | api.Init() |
| | | |
| | | |
| | | def run(codes) -> None: |
| | | def run(codes, _queue: multiprocessing.Queue, accurate_buy_order_queue: multiprocessing.Queue, special_codes) -> None: |
| | | """ |
| | | 运行订阅 |
| | | @param accurate_buy_order_queue: 精确大单队列 |
| | | @param codes: 订阅的代码 |
| | | @param _queue: 数据传输的队列 |
| | | @param special_codes: 需要确定完整大单的代码 |
| | | @return: |
| | | """ |
| | | try: |
| | | log.close_print() |
| | | __init_l2(codes) |
| | | __init_l2(codes, special_codes) |
| | | logger_system.info(f"L2订阅服务启动成功:") |
| | | except Exception as e: |
| | | logger_system.exception(e) |
| | | while True: |
| | | time.sleep(2) |
| | | try: |
| | | # 读取一遍 |
| | | for code in l2_transaction_data_dict: |
| | | l2_transaction_data_manager: L2TransactionDataManager = l2_transaction_data_dict[code] |
| | | |
| | | while True: |
| | | if not l2_transaction_data_manager.big_buy_order_queue.empty(): |
| | | result = l2_transaction_data_manager.big_buy_order_queue.get(block=False) |
| | | if result: |
| | | _queue.put_nowait((code, 0, result)) |
| | | else: |
| | | break |
| | | |
| | | while True: |
| | | if not l2_transaction_data_manager.big_accurate_buy_order_queue.empty(): |
| | | result = l2_transaction_data_manager.big_accurate_buy_order_queue.get(block=False) |
| | | if result: |
| | | accurate_buy_order_queue.put_nowait((code, 0, result)) |
| | | else: |
| | | break |
| | | |
| | | while True: |
| | | if not l2_transaction_data_manager.big_accurate_sell_order_queue.empty(): |
| | | result = l2_transaction_data_manager.big_accurate_sell_order_queue.get(block=False) |
| | | if result: |
| | | accurate_buy_order_queue.put_nowait((code, 1, result)) |
| | | else: |
| | | break |
| | | |
| | | while True: |
| | | if not l2_transaction_data_manager.big_sell_order_queue.empty(): |
| | | result = l2_transaction_data_manager.big_sell_order_queue.get(block=False) |
| | | if result: |
| | | _queue.put_nowait((code, 1, result)) |
| | | else: |
| | | break |
| | | except: |
| | | pass |
| | | finally: |
| | | time.sleep(1) |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | run({"000009", "601618"}) |
| | | input() |
| | | pass |