| | |
| | | import queue |
| | | import threading |
| | | import time |
| | | from typing import List |
| | | |
| | | from huaxin_client import command_manager, l2_data_transform_protocol |
| | | from huaxin_client import constant |
| | | from huaxin_client import l2_data_manager |
| | | import lev2mdapi |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager |
| | | from huaxin_client.command_manager import L2ActionCallback |
| | | from huaxin_client.l2_data_manager import L2DataUploadManager |
| | | from log_module import log, async_log_util |
| | | from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \ |
| | | logger_local_huaxin_g_cancel, logger_l2_codes_subscript |
| | |
| | | SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725", |
| | | b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"] |
| | | SZ_Bond_Securities = [b"100303", b"109559", b"112617"] |
| | | spi = None |
| | | set_codes_data_queue = queue.Queue() |
| | | market_code_dict = {} |
| | | |
| | |
| | | |
| | | # 买入的大单订单号 |
| | | |
| | | def __init__(self, api): |
| | | def __init__(self, api, l2_data_upload_manager): |
| | | lev2mdapi.CTORATstpLev2MdSpi.__init__(self) |
| | | self.__api = api |
| | | self.is_login = False |
| | | self.l2_data_upload_manager = l2_data_upload_manager |
| | | |
| | | def __split_codes(self, codes): |
| | | szse_codes = [] |
| | |
| | | add_codes = codes - self.subscripted_codes |
| | | del_codes = self.subscripted_codes - codes |
| | | print("add del codes", add_codes, del_codes) |
| | | for c in codes: |
| | | l2_data_manager.add_target_code(c) |
| | | for c in del_codes: |
| | | l2_data_manager.del_target_code(c) |
| | | for c in add_codes: |
| | | l2_data_manager.run_upload_task(c, l2_data_callback) |
| | | try: |
| | | for c in del_codes: |
| | | self.l2_data_upload_manager.release_distributed_upload_queue(c) |
| | | l2_data_manager.del_target_code(c) |
| | | for c in codes: |
| | | self.l2_data_upload_manager.distribute_upload_queue(c) |
| | | l2_data_manager.add_target_code(c) |
| | | except Exception as e: |
| | | logger_system.error(f"L2代码分配上传队列出错:{str(e)}") |
| | | logger_system.exception(e) |
| | | self.__subscribe(add_codes) |
| | | self.__unsubscribe(del_codes) |
| | | |
| | |
| | | (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5']) |
| | | ]} |
| | | market_code_dict[pDepthMarketData['SecurityID']] = time.time() |
| | | |
| | | l2_data_manager.add_market_data(d) |
| | | self.l2_data_upload_manager.add_market_data(d) |
| | | except: |
| | | pass |
| | | |
| | |
| | | min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) |
| | | # 输出逐笔成交数据 |
| | | if pTransaction['ExecType'] == b"2": |
| | | # G撤数据暂时注释 |
| | | # transaction_big_order_nos = l2_data_manager.get_latest_transaction_order_nos(code) |
| | | # if transaction_big_order_nos and pTransaction['BuyNo'] in transaction_big_order_nos: |
| | | # # 正在成交的订单撤单了 |
| | | # l2_data_manager.trading_order_canceled(code, pTransaction['BuyNo']) |
| | | # async_log_util.info(logger_local_huaxin_g_cancel, f"G撤撤单:{code} - {pTransaction['BuyNo']}") |
| | | if min_volume is None: |
| | | # 默认筛选50w |
| | | if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000: |
| | |
| | | item["Side"] = "2" |
| | | # 深证撤单 |
| | | print("逐笔委托", item) |
| | | |
| | | l2_data_manager.add_l2_order_detail(item, 0, True) |
| | | self.l2_data_upload_manager.add_l2_order_detail(item, 0, True) |
| | | else: |
| | | if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201: |
| | | # 涨停价 |
| | |
| | | # return |
| | | # self.__last_transaction_keys_dict[code] = key |
| | | # print("逐笔成交", item) |
| | | l2_data_manager.add_transaction_detail(item) |
| | | self.l2_data_upload_manager.add_transaction_detail(item) |
| | | |
| | | def OnRtnOrderDetail(self, pOrderDetail): |
| | | can_listen = False |
| | |
| | | start_time = 0 |
| | | if code in self.special_code_volume_for_order_dict: |
| | | start_time = time.time() |
| | | if self.special_code_volume_for_order_dict[code][0] == pOrderDetail['Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']: |
| | | if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[ |
| | | 'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']: |
| | | # 监控目标订单与影子订单 |
| | | if self.special_code_volume_for_order_dict[code][1] > time.time(): |
| | | # 特殊量监听 |
| | |
| | | else: |
| | | self.special_code_volume_for_order_dict.pop(code) |
| | | if not can_listen: |
| | | # 暂时注释掉G撤相关数据产生 |
| | | # if pOrderDetail['OrderStatus'] == b'D': |
| | | # transaction_big_order_nos = l2_data_manager.get_latest_transaction_order_nos(code) |
| | | # if transaction_big_order_nos and pOrderDetail['OrderNO'] in transaction_big_order_nos: |
| | | # # 正在成交的订单撤单了 |
| | | # l2_data_manager.trading_order_canceled(code, pOrderDetail['OrderNO']) |
| | | # async_log_util.info(logger_local_huaxin_g_cancel, f"G撤撤单:{code} - {pOrderDetail['OrderNO']}") |
| | | min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) |
| | | if min_volume is None: |
| | | # 默认筛选50w |
| | |
| | | "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'], |
| | | "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], |
| | | "OrderStatus": pOrderDetail['OrderStatus'].decode()} |
| | | l2_data_manager.add_l2_order_detail(item, start_time) |
| | | self.l2_data_upload_manager.add_l2_order_detail(item, start_time) |
| | | |
| | | def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum, |
| | | FirstLevelSellOrderVolumes): |
| | |
| | | |
| | | class MyL2ActionCallback(L2ActionCallback): |
| | | |
| | | def OnSetL2Position(self, client_id, request_id, codes_data): |
| | | def OnSetL2Position(self, codes_data): |
| | | print("L2订阅数量:", len(codes_data)) |
| | | logger_l2_codes_subscript.info("华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data)) |
| | | try: |
| | |
| | | logging.exception(e) |
| | | |
| | | |
| | | def __init_l2(): |
| | | def __init_l2(l2_data_upload_manager): |
| | | print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion()) |
| | | # case 1: Tcp方式 |
| | | # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP |
| | |
| | | # case 2非缓存模式 |
| | | api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) |
| | | global spi |
| | | spi = Lev2MdSpi(api) |
| | | spi = Lev2MdSpi(api, l2_data_upload_manager) |
| | | api.RegisterSpi(spi) |
| | | # -------------------正式模式------------------------------------- |
| | | if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST: |
| | |
| | | api.Init() |
| | | |
| | | |
| | | def __receive_from_pipe_trade(queue_trade_w_l2_r: multiprocessing.Queue): |
| | | def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue): |
| | | logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | | value = queue_trade_w_l2_r.get() |
| | | if value: |
| | | value = value.decode("utf-8") |
| | | if type(value) == bytes: |
| | | value = value.decode("utf-8") |
| | | data = json.loads(value) |
| | | if data["type"] == "listen_volume": |
| | | _type = data["type"] |
| | | if _type == "listen_volume": |
| | | volume = data["data"]["volume"] |
| | | code = data["data"]["code"] |
| | | spi.set_code_special_watch_volume(code, volume) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | |
| | | def __receive_from_pipe_strategy(pipe_): |
| | | logger_system.info(f"l2_client __receive_from_pipe_strategy 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | # print("__receive_from_pipe_strategy") |
| | | try: |
| | | val = pipe_.recv() |
| | | if val: |
| | | print("L2客户端接受到数据") |
| | | data = json.loads(val) |
| | | if data["data"]["type"] == "l2_cmd": |
| | | elif _type == "l2_cmd": |
| | | l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | |
| | | pipe_strategy = None |
| | | |
| | | |
| | | def run(queue_trade_w_l2_r: multiprocessing.Queue, _pipe_strategy, |
| | | _l2_data_callback: l2_data_transform_protocol.L2DataCallBack) -> None: |
| | | def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue], |
| | | transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None: |
| | | logger_system.info("L2进程ID:{}", os.getpid()) |
| | | logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | | log.close_print() |
| | | if queue_trade_w_l2_r is not None: |
| | | t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(queue_trade_w_l2_r), daemon=True) |
| | | if queue_r is not None: |
| | | t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True) |
| | | t1.start() |
| | | if _pipe_strategy is not None: |
| | | global pipe_strategy |
| | | pipe_strategy = _pipe_strategy |
| | | t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(_pipe_strategy,), daemon=True) |
| | | t1.start() |
| | | __init_l2() |
| | | global l2_data_callback |
| | | l2_data_callback = _l2_data_callback |
| | | l2_data_manager.run_upload_common(l2_data_callback) |
| | | l2_data_manager.run_upload_trading_canceled(l2_data_callback) |
| | | |
| | | # 初始化 |
| | | order_queue_distribute_manager = CodeQueueDistributeManager(order_queues) |
| | | transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues) |
| | | l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager, |
| | | transaction_queue_distribute_manager, market_queue) |
| | | __init_l2(l2_data_upload_manager) |
| | | l2_data_manager.run_upload_common() |
| | | l2_data_manager.run_log() |
| | | l2_data_manager.run_upload_daemon(l2_data_callback) |
| | | # l2_data_manager.run_test(l2_data_callback) |
| | | global l2CommandManager |
| | | l2CommandManager = command_manager.L2CommandManager() |
| | | l2CommandManager.init(MyL2ActionCallback()) |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | run(None, None, None) |
| | | # run(None, None, None) |
| | | # spi.set_codes_data([("000333", 12000)]) |
| | | input() |