| | |
| | | from huaxin_client.l2_data_manager import L2DataUploadManager |
| | | from log_module import log, async_log_util |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript |
| | | from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug |
| | | from utils import tool |
| | | |
| | | ###B类### |
| | |
| | | SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725", |
| | | b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"] |
| | | SZ_Bond_Securities = [b"100303", b"109559", b"112617"] |
| | | set_codes_data_queue = queue.Queue() |
| | | market_code_dict = {} |
| | | set_codes_data_queue = queue.Queue(maxsize=1000) |
| | | |
| | | ENABLE_NGST = True |
| | | |
| | |
| | | self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | |
| | | def subscribe_codes(self, _codes): |
| | | self.__subscribe(_codes) |
| | | |
| | | def __subscribe(self, _codes): |
| | | sh, sz = self.__split_codes(_codes) |
| | | logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}") |
| | |
| | | for d in codes_data: |
| | | code = d[0] |
| | | codes.add(code) |
| | | self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4]) |
| | | self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4]) |
| | | self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5]) |
| | | self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5]) |
| | | logger_l2_codes_subscript.info("华鑫L2订阅总数:{}", len(codes)) |
| | | add_codes = codes - self.subscripted_codes |
| | | del_codes = self.subscripted_codes - codes |
| | | print("add del codes", add_codes, del_codes) |
| | |
| | | self.l2_data_upload_manager.release_distributed_upload_queue(c) |
| | | l2_data_manager.del_target_code(c) |
| | | for c in codes: |
| | | self.l2_data_upload_manager.distribute_upload_queue(c) |
| | | self.l2_data_upload_manager.distribute_upload_queue(c, codes) |
| | | l2_data_manager.add_target_code(c) |
| | | except Exception as e: # TODO 清除原来还没释放掉的数据 |
| | | logger_system.error(f"L2代码分配上传队列出错:{str(e)}") |
| | |
| | | |
| | | if add_codes: |
| | | logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}") |
| | | for c in add_codes: |
| | | logger_l2_codes_subscript.info(f"l2委托数据过滤条件:{c} - {self.codes_volume_and_price_dict.get(c)}") |
| | | |
| | | logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes)) |
| | | |
| | |
| | | if pRspInfo["ErrorID"] == 0: |
| | | print("订阅成功") |
| | | self.subscripted_codes.add(pSpecificSecurity['SecurityID']) |
| | | # 初始化 |
| | | SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID']) |
| | | if bIsLast == 1: |
| | | print("订阅响应结束", self.subscripted_codes) |
| | | l2_data_manager.add_subscript_codes(self.subscripted_codes) |
| | |
| | | if pRspInfo["ErrorID"] == 0: |
| | | print("订阅成功") |
| | | self.subscripted_codes.add(pSpecificSecurity['SecurityID']) |
| | | # 初始化 |
| | | SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID']) |
| | | if bIsLast == 1: |
| | | print("订阅响应结束", self.subscripted_codes) |
| | | l2_data_manager.add_subscript_codes(self.subscripted_codes) |
| | |
| | | FirstLevelSellOrderVolumes): |
| | | # 传入:时间,现价,成交总量,买1,买2,买3,买4,买5,卖1,卖2,卖3,卖4,卖5 |
| | | try: |
| | | buys = [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']), |
| | | (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']), |
| | | (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']), |
| | | (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']), |
| | | (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])] |
| | | for i in range(6, 11): |
| | | if not pDepthMarketData[f"BidVolume{i}"]: |
| | | break |
| | | buys.append((pDepthMarketData[f'BidPrice{i}'], pDepthMarketData[f'BidVolume{i}'])) |
| | | |
| | | sells = [ |
| | | (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']), |
| | | (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']), |
| | | (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']), |
| | | (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']), |
| | | (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5']) |
| | | ] |
| | | for i in range(6, 11): |
| | | if not pDepthMarketData[f"AskVolume{i}"]: |
| | | break |
| | | sells.append((pDepthMarketData[f'AskPrice{i}'], pDepthMarketData[f'AskVolume{i}'])) |
| | | |
| | | d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": pDepthMarketData['SecurityID'], |
| | | "lastPrice": pDepthMarketData['LastPrice'], |
| | | "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'], |
| | | "totalValueTrade": pDepthMarketData['TotalValueTrade'], |
| | | "totalAskVolume": pDepthMarketData['TotalAskVolume'], |
| | | "avgAskPrice": pDepthMarketData["AvgAskPrice"], |
| | | "buy": [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']), |
| | | (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']), |
| | | (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']), |
| | | (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']), |
| | | (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])], |
| | | "sell": [ |
| | | (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']), |
| | | (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']), |
| | | (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']), |
| | | (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']), |
| | | (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5']) |
| | | ]} |
| | | market_code_dict[pDepthMarketData['SecurityID']] = time.time() |
| | | "buy": buys, |
| | | "sell": sells} |
| | | self.l2_data_upload_manager.add_market_data(d) |
| | | SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID']) |
| | | except: |
| | | pass |
| | | |
| | |
| | | for sell_index in range(0, FirstLevelSellNum): |
| | | print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index])) |
| | | |
| | | |
| | | class SubscriptDefend: |
| | | """ |
| | | 订阅守护 |
| | | 定义:当订阅的代码超过一定时间没有回调数据时重新订阅 |
| | | """ |
| | | __l2_market_update_time = {} |
| | | |
| | | @classmethod |
| | | def set_l2_market_update(cls, code): |
| | | cls.__l2_market_update_time[code] = time.time() |
| | | |
| | | @classmethod |
| | | def run(cls): |
| | | while True: |
| | | try: |
| | | now_time = tool.get_now_time_as_int() |
| | | if now_time < int("093015"): |
| | | continue |
| | | if int("112945") < now_time < int("130015"): |
| | | continue |
| | | if int("145645") < now_time: |
| | | continue |
| | | if spi.subscripted_codes: |
| | | codes = [] |
| | | for code in spi.subscripted_codes: |
| | | # 获取上次更新时间 |
| | | update_time = cls.__l2_market_update_time.get(code) |
| | | if update_time and time.time() - update_time > 15: |
| | | # 需要重新订阅 |
| | | codes.append(code) |
| | | if codes: |
| | | logger_debug.info(f"重新订阅:{codes}") |
| | | spi.subscribe_codes(codes) |
| | | except: |
| | | pass |
| | | finally: |
| | | time.sleep(15) |
| | | |
| | | |
| | | class MyL2ActionCallback(L2ActionCallback): |
| | | |
| | | def OnSetL2Position(self, codes_data): |
| | |
| | | if queue_r is not None: |
| | | t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True) |
| | | t1.start() |
| | | |
| | | # 订阅守护 |
| | | threading.Thread(target=SubscriptDefend.run, daemon=True).start() |
| | | # 初始化 |
| | | data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks) |
| | | l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager) |
| | |
| | | 'OrderTime': '13000015', |
| | | 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | |
| | | queue_r = multiprocessing.Queue() |
| | | queue_r = multiprocessing.Queue(maxsize=1024) |
| | | order_queues = [] |
| | | transaction_queues = [] |
| | | market_queue = multiprocessing.Queue() |
| | | market_queue = multiprocessing.Queue(maxsize=1024) |
| | | for i in range(20): |
| | | order_queues.append(multiprocessing.Queue()) |
| | | transaction_queues.append(multiprocessing.Queue()) |
| | | order_queues.append(multiprocessing.Queue(maxsize=1024)) |
| | | transaction_queues.append(multiprocessing.Queue(maxsize=1024)) |
| | | threading.Thread(target=test_add_codes).start() |
| | | |
| | | run(queue_r, order_queues, transaction_queues, market_queue) |