| | |
| | | |
| | | # 买入的大单订单号 |
| | | |
| | | def __init__(self, api, l2_data_upload_manager): |
| | | def __init__(self, api, l2_data_upload_manager: L2DataUploadManager): |
| | | lev2mdapi.CTORATstpLev2MdSpi.__init__(self) |
| | | self.__api = api |
| | | self.is_login = False |
| | |
| | | code = d[0] |
| | | codes.add(code) |
| | | self.codes_volume_and_price_dict[code] = (d[1], d[2]) |
| | | self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], d[2]) |
| | | add_codes = codes - self.subscripted_codes |
| | | del_codes = self.subscripted_codes - codes |
| | | print("add del codes", add_codes, del_codes) |
| | |
| | | |
def set_code_special_watch_volume(self, code, volume):
    """Temporarily whitelist an exact order volume for ``code`` in the order filter.

    Looks up the ``(min_volume, limit_up_price)`` pair stored for the code in
    ``self.codes_volume_and_price_dict`` and re-registers it on the upload
    manager together with two special volumes, ``volume`` and
    ``constant.SHADOW_ORDER_VOLUME``, which expire 3 seconds from now.

    :param code: security code whose filter condition is extended.
    :param volume: order volume to watch for.
    :return: None. If no pair is stored for ``code`` nothing is registered
        and the failure is logged.
    """
    volume_and_price = self.codes_volume_and_price_dict.get(code)
    if volume_and_price is None:
        # Unpacking the None returned by dict.get() would raise TypeError in
        # the caller; report the missing entry and skip instead.
        async_log_util.info(logger_local_huaxin_l2_subscript, f"设置下单量监听失败,无过滤条件:{code}-{volume}")
        return
    min_volume, limit_up_price = volume_and_price
    # The special-volume watch is valid for 3 seconds.
    expire_time = time.time() + 3
    self.l2_data_upload_manager.set_order_fileter_condition(
        code, min_volume, limit_up_price, {volume, constant.SHADOW_ORDER_VOLUME}, expire_time)
    async_log_util.info(logger_local_huaxin_l2_subscript, f"设置下单量监听:{code}-{volume}")
| | | |
| | | def OnFrontConnected(self): |
| | |
| | | |
| | | def OnRtnTransaction(self, pTransaction): |
| | | code = str(pTransaction['SecurityID']) |
| | | min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) |
| | | # min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) |
| | | # 输出逐笔成交数据 |
| | | if pTransaction['ExecType'] == b"2": |
| | | if min_volume is None: |
| | | # 默认筛选50w |
| | | if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000: |
| | | return |
| | | elif pTransaction['TradeVolume'] < min_volume: |
| | | return |
| | | # if min_volume is None: |
| | | # # 默认筛选50w |
| | | # if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000: |
| | | # return |
| | | # elif pTransaction['TradeVolume'] < min_volume: |
| | | # return |
| | | # 撤单 |
| | | item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'], |
| | | "Volume": pTransaction['TradeVolume'], |
| | |
| | | item["Side"] = "2" |
| | | self.l2_data_upload_manager.add_l2_order_detail(item, 0, True) |
| | | else: |
| | | if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201: |
| | | # 涨停价 |
| | | # 成交 |
| | | item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], |
| | | "TradeVolume": pTransaction['TradeVolume'], |
| | | "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], |
| | | "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], |
| | | "SellNo": pTransaction['SellNo'], |
| | | "ExecType": pTransaction['ExecType'].decode()} |
| | | # 暂时注释掉同1单号至多上传1次 |
| | | # key = f"{item['SecurityID']}_{item['TradePrice']}_{item['BuyNo']}" |
| | | # if self.__last_transaction_keys_dict.get(code) == key: |
| | | # return |
| | | # self.__last_transaction_keys_dict[code] = key |
| | | # print("逐笔成交", item) |
| | | self.l2_data_upload_manager.add_transaction_detail(item) |
| | | # if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201: |
| | | # 涨停价 |
| | | # 成交 |
| | | item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], |
| | | "TradeVolume": pTransaction['TradeVolume'], |
| | | "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], |
| | | "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], |
| | | "SellNo": pTransaction['SellNo'], |
| | | "ExecType": pTransaction['ExecType'].decode()} |
| | | # 暂时注释掉同1单号至多上传1次 |
| | | # key = f"{item['SecurityID']}_{item['TradePrice']}_{item['BuyNo']}" |
| | | # if self.__last_transaction_keys_dict.get(code) == key: |
| | | # return |
| | | # self.__last_transaction_keys_dict[code] = key |
| | | # print("逐笔成交", item) |
| | | self.l2_data_upload_manager.add_transaction_detail(item) |
| | | |
| | | def OnRtnOrderDetail(self, pOrderDetail): |
| | | can_listen = False |
| | | code = str(pOrderDetail['SecurityID']) |
| | | start_time = 0 |
| | | if code in self.special_code_volume_for_order_dict: |
| | | start_time = time.time() |
| | | if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[ |
| | | 'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']: |
| | | # 监控目标订单与影子订单 |
| | | if self.special_code_volume_for_order_dict[code][1] > time.time(): |
| | | # 特殊量监听 |
| | | can_listen = True |
| | | else: |
| | | self.special_code_volume_for_order_dict.pop(code) |
| | | if not can_listen: |
| | | min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) |
| | | if min_volume is None: |
| | | # 默认筛选50w |
| | | if pOrderDetail['Price'] * pOrderDetail['Volume'] < 500000: |
| | | return |
| | | elif pOrderDetail['Volume'] < min_volume: |
| | | return |
| | | # can_listen = False |
| | | # code = str(pOrderDetail['SecurityID']) |
| | | # start_time = 0 |
| | | # if code in self.special_code_volume_for_order_dict: |
| | | # start_time = time.time() |
| | | # if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[ |
| | | # 'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']: |
| | | # # 监控目标订单与影子订单 |
| | | # if self.special_code_volume_for_order_dict[code][1] > time.time(): |
| | | # # 特殊量监听 |
| | | # can_listen = True |
| | | # else: |
| | | # self.special_code_volume_for_order_dict.pop(code) |
| | | # if not can_listen: |
| | | # min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) |
| | | # if min_volume is None: |
| | | # # 默认筛选50w |
| | | # if pOrderDetail['Price'] * pOrderDetail['Volume'] < 500000: |
| | | # return |
| | | # elif pOrderDetail['Volume'] < min_volume: |
| | | # return |
| | | # 输出逐笔委托数据 |
| | | # 上证OrderStatus=b"D"表示撤单 |
| | | item = {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'], |
| | |
| | | "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'], |
| | | "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], |
| | | "OrderStatus": pOrderDetail['OrderStatus'].decode()} |
| | | self.l2_data_upload_manager.add_l2_order_detail(item, start_time) |
| | | self.l2_data_upload_manager.add_l2_order_detail(item, 0) |
| | | |
| | | def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum, |
| | | FirstLevelSellOrderVolumes): |
| | |
| | | self.temp_transaction_queue_dict = {} |
| | | self.filter_order_condition_dict = {} |
| | | |
# TODO: needs to be called
def set_order_fileter_condition(self, code, min_volume, limit_up_price, special_volumes=None,
                                special_volumes_expire_time=None):
    """Register or refresh the filter condition for ``code``.

    Each entry of ``self.filter_order_condition_dict`` is a 3-item list:
    ``[(min_volume, limit_up_price), special_volumes, special_volumes_expire_time]``.
    Slot 0 is always stored as a pair, because the filter methods index it
    as ``[0][0]`` (minimum volume) and ``[0][1]`` (limit-up price).

    :param code: security code the condition applies to.
    :param min_volume: minimum order volume that passes the filter.
    :param limit_up_price: limit-up price used by the transaction filter.
    :param special_volumes: optional set of exact volumes that also pass;
        ``None`` is stored as an empty set.
    :param special_volumes_expire_time: optional epoch timestamp after which
        the special volumes are no longer honoured.
    """
    threshold = (min_volume, limit_up_price)
    existing = self.filter_order_condition_dict.get(code)
    if existing is not None and not special_volumes and not special_volumes_expire_time:
        # Plain refresh of an existing entry: update only the threshold so a
        # still-active special-volume watch is not wiped out.
        existing[0] = threshold
        return
    self.filter_order_condition_dict[code] = [
        threshold,
        set() if special_volumes is None else special_volumes,
        special_volumes_expire_time,
    ]
| | | |
| | | # 过滤订单 |
| | | def __filter_order(self, item): |
| | | filter_condition = self.filter_order_condition_dict.get(item[0]) |
| | | if filter_condition: |
| | | # item[2]为量 |
| | | if item[2] >= filter_condition[0]: |
| | | if item[2] >= filter_condition[0][0]: |
| | | return item |
| | | if filter_condition[1] and item[2] in filter_condition[1]: |
| | | if filter_condition[2] and time.time() > filter_condition[2]: |
| | |
| | | return None |
| | | return item |
| | | |
def __filter_transaction(self, item):
    """Keep a transaction only when it trades close to the code's limit-up price.

    ``item[0]`` is the security code and ``item[1]`` the trade price. The
    limit-up price is read from slot 0 of the code's entry in
    ``self.filter_order_condition_dict``, expected to be a
    ``(min_volume, limit_up_price)`` pair.

    :param item: transaction tuple as buffered by ``add_transaction_detail``.
    :return: ``item`` when no condition (or no limit-up price) is known for
        the code, or when the trade price is within 0.201 of the limit-up
        price; otherwise ``None``.
    """
    condition = self.filter_order_condition_dict.get(item[0])
    if not condition:
        return item
    threshold = condition[0]
    if not isinstance(threshold, (tuple, list)) or len(threshold) < 2 or threshold[1] is None:
        # Slot 0 holds only a bare minimum volume, so there is no limit-up
        # price to compare against; pass the transaction through rather than
        # fail on subscripting a number.
        return item
    if abs(item[1] - threshold[1]) < 0.201:
        return item
    return None
| | | |
def add_l2_order_detail(self, data, start_time=0, istransaction=False):
    """Buffer one order-detail record in the per-code temporary deque.

    The record is not put on the distributed queue directly; it is appended
    to ``self.temp_order_queue_dict[code]`` as a 12-item tuple:
    ``(SecurityID, Price, Volume, Side, OrderType, OrderTime, MainSeq,
    SubSeq, OrderNO, OrderStatus, <time.time() at buffering>, start_time)``.

    :param data: mapping holding the ten order fields named above.
    :param start_time: value stored as the last tuple item (default 0).
    :param istransaction: accepted for caller compatibility; not used here.
    :return: None. Records for a code without a temporary deque are dropped.
    """
    code = data["SecurityID"]
    pending = self.temp_order_queue_dict.get(code)
    if pending is None:
        # No deque has been created for this code; ignore the record instead
        # of raising AttributeError on None.
        return
    pending.append((
        data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'],
        data['OrderTime'], data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'],
        time.time(), start_time,
    ))
| | | |
def add_transaction_detail(self, data):
    """Buffer one tick-by-tick transaction record in the per-code temporary deque.

    The record is not put on the distributed queue directly; it is appended
    to ``self.temp_transaction_queue_dict[code]`` as a 9-item tuple:
    ``(SecurityID, TradePrice, TradeVolume, OrderTime, MainSeq, SubSeq,
    BuyNo, SellNo, ExecType)``.

    :param data: mapping holding the nine transaction fields named above.
    :return: None. Records for a code without a temporary deque are dropped.
    """
    code = data["SecurityID"]
    pending = self.temp_transaction_queue_dict.get(code)
    if pending is None:
        # No deque has been created for this code; ignore the record instead
        # of raising AttributeError on None.
        return
    pending.append((
        data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        data['OrderTime'], data['MainSeq'], data['SubSeq'],
        data['BuyNo'], data['SellNo'], data['ExecType'],
    ))
| | | |
| | | def add_market_data(self, data): |
| | | # 加入上传队列 |
| | |
| | | self.transaction_queue_distribute_manager.distribute_queue(code) |
| | | self.temp_order_queue_dict[code] = collections.deque() |
| | | self.temp_transaction_queue_dict[code] = collections.deque() |
| | | # threading.Thread(target=lambda: self.__run_upload_order_task(code)).start() |
| | | # threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start() |
| | | threading.Thread(target=lambda: self.__run_upload_order_task(code)).start() |
| | | threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start() |
| | | |
| | | def __upload_l2_data(self, code, _queue, datas): |
| | | _queue.put_nowait((code, datas, time.time())) |
| | |
| | | try: |
| | | if not q: |
| | | data = q.popleft() |
| | | temp_list.append(data) |
| | | data = self.__filter_transaction(data) |
| | | if data: |
| | | temp_list.append(data) |
| | | else: |
| | | if temp_list: |
| | | # 上传数据 |
| | |
| | | __id = id(q) |
| | | count = 0 |
| | | while True: |
| | | datas_dict = {} |
| | | # datas_dict = {} |
| | | try: |
| | | while not q.empty(): |
| | | if not q.empty(): |
| | | item = q.get() |
| | | if item[0] not in datas_dict: |
| | | datas_dict[item[0]] = [] |
| | | datas_dict[item[0]].append(item) |
| | | if datas_dict: |
| | | for c in datas_dict: |
| | | self.my_l2_data_callback.OnL2Order(c, datas_dict[c], datas_dict[c][0][10]) |
| | | self.my_l2_data_callback.OnL2Order(item[0], item[1], item[2]) |
| | | else: |
| | | time.sleep(0.002) |
| | | time.sleep(0.001) |
| | | |
| | | # while not q.empty(): |
| | | # item = q.get() |
| | | # if item[0] not in datas_dict: |
| | | # datas_dict[item[0]] = [] |
| | | # datas_dict[item[0]].append(item) |
| | | # if datas_dict: |
| | | # for c in datas_dict: |
| | | # self.my_l2_data_callback.OnL2Order(c, datas_dict[c], datas_dict[c][0][10]) |
| | | # else: |
| | | # time.sleep(0.002) |
| | | except Exception as e: |
| | | async_log_util.exception(logger_debug, e) |
| | | finally: |
| | | datas_dict.clear() |
| | | # datas_dict.clear() |
| | | count += 1 |
| | | if count > 100: |
| | | count = 0 |
| | |
| | | # 接收L2逐笔成交数据 |
| | | def __recive_transaction_orders(self, q: multiprocessing.Queue): |
| | | __id = id(q) |
| | | datas_dict = {} |
| | | # datas_dict = {} |
| | | count = 0 |
| | | while True: |
| | | try: |
| | | while not q.empty(): |
| | | # while not q.empty(): |
| | | # item = q.get() |
| | | # if item[0] not in datas_dict: |
| | | # datas_dict[item[0]] = [] |
| | | # datas_dict[item[0]].append(item) |
| | | # if datas_dict: |
| | | # for c in datas_dict: |
| | | # self.my_l2_data_callback.OnL2Transaction(c, datas_dict[c]) |
| | | # else: |
| | | # time.sleep(0.01) |
| | | if not q.empty(): |
| | | item = q.get() |
| | | if item[0] not in datas_dict: |
| | | datas_dict[item[0]] = [] |
| | | datas_dict[item[0]].append(item) |
| | | if datas_dict: |
| | | for c in datas_dict: |
| | | self.my_l2_data_callback.OnL2Transaction(c, datas_dict[c]) |
| | | self.my_l2_data_callback.OnL2Transaction(item[0], item[1]) |
| | | else: |
| | | time.sleep(0.01) |
| | | time.sleep(0.005) |
| | | except Exception as e: |
| | | async_log_util.exception(logger_debug, e) |
| | | finally: |
| | | datas_dict.clear() |
| | | # datas_dict.clear() |
| | | count += 1 |
| | | if count > 50: |
| | | count = 0 |