New file |
| | |
| | | # -*- coding: utf-8 -*- |
| | | """ |
| | | 可转债正股L2订阅 |
| | | """ |
| | | import decimal |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import os |
| | | import queue |
| | | import threading |
| | | import time |
| | | import concurrent.futures |
| | | |
| | | from code_atrribute.history_k_data_util import JueJinHttpApi, JueJinApi |
| | | from huaxin_client import command_manager |
| | | from huaxin_client import constant |
| | | from huaxin_client import l2_data_manager |
| | | import lev2mdapi |
| | | from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager |
| | | from huaxin_client.command_manager import L2ActionCallback |
| | | from huaxin_client.l2_data_manager import L2DataUploadManager |
| | | from log_module import log, async_log_util |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, \ |
| | | logger_local_huaxin_l2_transaction |
| | | from utils import tool |
| | | |
| | | ###B类### |
| | | Front_Address = "tcp://10.0.1.101:6900" |
| | | Multicast_Address = "udp://224.224.2.19:7889" |
| | | Multicast_Address2 = "udp://224.224.224.234:7890" |
| | | Local_Interface_Address = constant.LOCAL_IP |
| | | |
| | | g_SubMarketData = False |
| | | g_SubTransaction = False |
| | | g_SubOrderDetail = False |
| | | g_SubXTSTick = False |
| | | g_SubXTSMarketData = False |
| | | g_SubNGTSTick = False |
| | | g_SubBondMarketData = False |
| | | g_SubBondTransaction = False |
| | | g_SubBondOrderDetail = False |
| | | |
| | | SH_Securities = [] |
| | | SH_XTS_Securities = [] |
| | | |
| | | SZ_Securities = [] |
| | | SZ_Bond_Securities = [] |
| | | set_codes_data_queue = queue.Queue() |
| | | market_code_dict = {} |
| | | |
| | | |
| | | class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi): |
| | | latest_codes_set = set() |
| | | |
| | | special_code_volume_for_order_dict = {} |
| | | # 已经订阅的代码 |
| | | subscripted_codes = set() |
| | | # 代码的上次成交的订单唯一索引 |
| | | __last_transaction_keys_dict = {} |
| | | |
| | | limit_up_price_dict = {} |
| | | |
| | | # 买入的大单订单号 |
| | | |
| | | def __init__(self, api, ): |
| | | lev2mdapi.CTORATstpLev2MdSpi.__init__(self) |
| | | self.__api = api |
| | | self.is_login = False |
| | | |
| | | def __split_codes(self, codes): |
| | | szse_codes = [] |
| | | sse_codes = [] |
| | | for code in codes: |
| | | if code.find("00") == 0: |
| | | szse_codes.append(code.encode()) |
| | | elif code.find("60") == 0: |
| | | sse_codes.append(code.encode()) |
| | | return sse_codes, szse_codes |
| | | |
| | | # 新增订阅 |
| | | |
| | | # 取消订阅 |
| | | def __unsubscribe(self, _codes): |
| | | sh, sz = self.__split_codes(_codes) |
| | | logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}") |
| | | logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}") |
| | | if sh: |
| | | # 取消订阅逐笔成交 |
| | | self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | if sz: |
| | | self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | |
| | | def __subscribe(self, _codes): |
| | | sh, sz = self.__split_codes(_codes) |
| | | logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}") |
| | | logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}") |
| | | if sh: |
| | | # 订阅逐笔成交 |
| | | result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}") |
| | | if sz: |
| | | result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}") |
| | | |
| | | def __process_codes_data(self, codes, from_cache=False, delay=0.0): |
| | | codes = set(codes) |
| | | if not self.is_login and not constant.TEST: |
| | | raise Exception("L2尚未登录") |
| | | if delay > 0: |
| | | time.sleep(delay) |
| | | add_codes = codes - self.subscripted_codes |
| | | del_codes = self.subscripted_codes - codes |
| | | print("add del codes", add_codes, del_codes) |
| | | try: |
| | | for c in del_codes: |
| | | l2_data_manager.del_target_code(c) |
| | | for c in codes: |
| | | l2_data_manager.add_target_code(c) |
| | | except Exception as e: |
| | | logger_system.error(f"L2代码分配上传队列出错:{str(e)}") |
| | | logger_system.exception(e) |
| | | self.__subscribe(add_codes) |
| | | self.__unsubscribe(del_codes) |
| | | if add_codes: |
| | | logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}") |
| | | logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes)) |
| | | # 设置最近的代码列表 |
| | | self.latest_codes_set = codes |
| | | |
| | | # 订阅代码,[代码,...] |
| | | def set_codes_data(self, codes): |
| | | try: |
| | | self.__process_codes_data(codes) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | logger_l2_codes_subscript.exception(e) |
| | | finally: |
| | | # 保存一份最新的数据 |
| | | self.__set_latest_datas(codes) |
| | | |
| | | @classmethod |
| | | def __set_latest_datas(cls, codes_data): |
| | | data_str = json.dumps([tool.get_now_date_str(), codes_data]) |
| | | with open(constant.L2_CODES_INFO_PATH, mode='w') as f: |
| | | f.write(data_str) |
| | | |
| | | @classmethod |
| | | def __get_latest_datas(cls): |
| | | if os.path.exists(constant.L2_CODES_INFO_PATH): |
| | | with open(constant.L2_CODES_INFO_PATH, mode='r') as f: |
| | | str_ = f.readline() |
| | | data_json = json.loads(str_) |
| | | if data_json[0] == tool.get_now_date_str(): |
| | | return data_json[1] |
| | | return [] |
| | | |
| | | def OnFrontConnected(self): |
| | | logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}") |
| | | logout_req = lev2mdapi.CTORATstpUserLogoutField() |
| | | self.__api.ReqUserLogout(logout_req, 1) |
| | | time.sleep(1) |
| | | # 请求登录 |
| | | login_req = lev2mdapi.CTORATstpReqUserLoginField() |
| | | self.__api.ReqUserLogin(login_req, 2) |
| | | |
| | | def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast): |
| | | if pRspInfo['ErrorID'] == 0: |
| | | print("----L2行情登录成功----") |
| | | self.is_login = True |
| | | logger_system.info(f"L2行情登录成功") |
| | | # 初始设置值 |
| | | threading.Thread( |
| | | target=lambda: self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=6.0), |
| | | daemon=True).start() |
| | | |
| | | def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | async_log_util.info(logger_local_huaxin_l2_subscript, |
| | | f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") |
| | | if pRspInfo["ErrorID"] == 0: |
| | | self.subscripted_codes.add(pSpecificSecurity['SecurityID']) |
| | | if bIsLast == 1: |
| | | l2_data_manager.add_subscript_codes(self.subscripted_codes) |
| | | |
| | | def OnRspUnSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | try: |
| | | code = pSpecificSecurity['SecurityID'] |
| | | self.subscripted_codes.discard(code) |
| | | if bIsLast == 1: |
| | | l2_data_manager.add_subscript_codes(self.subscripted_codes) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | def OnRtnTransaction(self, pTransaction): |
| | | # 输出逐笔成交数据 |
| | | if pTransaction['ExecType'] == b"2": |
| | | # 撤单 |
| | | item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'], |
| | | "Volume": pTransaction['TradeVolume'], |
| | | "OrderType": "2", |
| | | "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], |
| | | "SubSeq": pTransaction['SubSeq'], |
| | | "OrderStatus": "D"} |
| | | buyNo = pTransaction['BuyNo'] |
| | | sellNo = pTransaction['SellNo'] |
| | | if buyNo > 0: |
| | | # 买 |
| | | item["OrderNO"] = buyNo |
| | | item["Side"] = "1" |
| | | elif sellNo > 0: |
| | | # 卖 |
| | | item["OrderNO"] = sellNo |
| | | item["Side"] = "2" |
| | | else: |
| | | item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], |
| | | "TradeVolume": pTransaction['TradeVolume'], |
| | | "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], |
| | | "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], |
| | | "SellNo": pTransaction['SellNo'], |
| | | "ExecType": pTransaction['ExecType'].decode()} |
| | | |
| | | if pTransaction['TradePrice'] == self.limit_up_price_dict.get(pTransaction['SecurityID']): |
| | | # TODO 成交价是涨停价才输出 |
| | | huaxin_l2_log.info(logger_local_huaxin_l2_transaction, f"{item}") |
| | | else: |
| | | pass |
| | | |
| | | |
| | | class MyL2ActionCallback(L2ActionCallback): |
| | | |
| | | def OnSetL2Position(self, codes): |
| | | huaxin_l2_log.info(logger_l2_codes_subscript, "华鑫L2代码处理队列获取到数据:数量-{}", len(codes)) |
| | | try: |
| | | spi.set_codes_data(codes) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | |
| | | def __init_l2(): |
| | | print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion()) |
| | | # case 1: Tcp方式 |
| | | # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP |
| | | # case 2: 组播方式 |
| | | g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST |
| | | |
| | | # case 1缓存模式 |
| | | api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True) |
| | | # case 2非缓存模式 |
| | | # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) |
| | | global spi |
| | | spi = Lev2MdSpi(api) |
| | | api.RegisterSpi(spi) |
| | | # -------------------正式模式------------------------------------- |
| | | if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST: |
| | | api.RegisterFront(Front_Address) |
| | | else: |
| | | # case 1 从一个组播地址收取行情 |
| | | api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "") |
| | | # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "") |
| | | |
| | | # case 2:注册多个组播地址同时收行情 |
| | | # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, ""); |
| | | # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, ""); |
| | | |
| | | # case 3:efvi模式收行情 |
| | | # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "", "enp101s0f0",4096, True); |
| | | |
| | | # case 1 不绑核运行 |
| | | api.Init() |
| | | |
| | | |
| | | __l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3) |
| | | |
| | | |
| | | def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue): |
| | | logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | | value = queue_trade_w_l2_r.get() |
| | | if value: |
| | | if type(value) == bytes: |
| | | value = value.decode("utf-8") |
| | | data = json.loads(value) |
| | | _type = data["type"] |
| | | if _type == "l2_cmd": |
| | | huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"订阅代码:{data}") |
| | | __start_time = time.time() |
| | | # 线程池 |
| | | __l2_cmd_thread_pool.submit( |
| | | lambda: l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)) |
| | | use_time = time.time() - __start_time |
| | | if use_time > 0.005: |
| | | huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"l2_cmd耗时:{use_time}s") |
| | | |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | |
| | | pipe_strategy = None |
| | | |
| | | |
| | | def test_add_codes(queue_r): |
| | | time.sleep(10) |
| | | # if value: |
| | | # if type(value) == bytes: |
| | | # value = value.decode("utf-8") |
| | | # data = json.loads(value) |
| | | # _type = data["type"] |
| | | # if _type == "listen_volume": |
| | | # volume = data["data"]["volume"] |
| | | # code = data["data"]["code"] |
| | | # spi.set_code_special_watch_volume(code, volume) |
| | | # elif _type == "l2_cmd": |
| | | # l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) |
| | | |
| | | time.sleep(2) |
| | | demo_datas = ["603002", |
| | | "002654", |
| | | "603701", |
| | | "002908"] |
| | | queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": demo_datas})) |
| | | time.sleep(10) |
| | | while True: |
| | | try: |
| | | spi.l2_data_upload_manager.add_l2_order_detail( |
| | | {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', |
| | | 'OrderTime': '13000015', |
| | | 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) |
| | | spi.l2_data_upload_manager.add_l2_order_detail( |
| | | {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', |
| | | 'OrderTime': '13000015', |
| | | 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) |
| | | time.sleep(0.1) |
| | | spi.l2_data_upload_manager.add_l2_order_detail( |
| | | {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', |
| | | 'OrderTime': '13000015', |
| | | 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | finally: |
| | | time.sleep(10) |
| | | |
| | | |
| | | def get_subscript_codes(): |
| | | """ |
| | | 获取需要订阅的代码 |
| | | :return: |
| | | """ |
| | | results = JueJinHttpApi.get_exchanges_codes("SHSE,SZSE", sec_types=[8], skip_suspended=True, skip_st=True, |
| | | fields="symbol, sec_type, sec_id,sec_name, underlying_symbol, delisted_date") |
| | | fresults = [] |
| | | for r in results: |
| | | tool.get_now_date_str() |
| | | if int(tool.get_now_date_str('%Y%m%d')) >= int(r['delisted_date'].strftime('%Y%m%d')): |
| | | continue |
| | | fresults.append(r) |
| | | print(len(fresults)) |
| | | return [x['underlying_symbol'].split('.')[1] for x in fresults] |
| | | |
| | | |
| | | def get_pre_price(codes): |
| | | """ |
| | | 获取昨日收盘价 |
| | | :param codes: |
| | | :return: |
| | | """ |
| | | symbols = JueJinApi.get_juejin_code_list_with_prefix(codes) |
| | | results = JueJinHttpApi.get_instruments(symbols, 'pre_close,sec_id') |
| | | return {x['sec_id']: round(x['pre_close'], 2) for x in results} |
| | | |
| | | |
| | | def __init_data(): |
| | | """ |
| | | 初始化数据 |
| | | :return: |
| | | """ |
| | | codes = None |
| | | # 获取目标代码 |
| | | for i in range(3): |
| | | try: |
| | | codes = get_subscript_codes() |
| | | if codes: |
| | | break |
| | | except: |
| | | time.sleep(5) |
| | | if codes: |
| | | # 设置订阅代码 |
| | | spi.set_codes_data(codes) |
| | | # 获取目标代码的收盘价 |
| | | pre_price_dict = None |
| | | for i in range(3): |
| | | try: |
| | | pre_price_dict = get_pre_price(codes) |
| | | if pre_price_dict: |
| | | break |
| | | except: |
| | | time.sleep(5) |
| | | if pre_price_dict: |
| | | for k in pre_price_dict: |
| | | limit_up_price = tool.to_price(decimal.Decimal(str(pre_price_dict[k])) * decimal.Decimal("1.1")) |
| | | Lev2MdSpi.limit_up_price_dict[k] = round(float(limit_up_price), 2) |
| | | |
| | | |
| | | def run() -> None: |
| | | logger_system.info("可转债L2进程ID:{}", os.getpid()) |
| | | logger_system.info(f"可转债l2_client 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | | log.close_print() |
| | | # 初始化 |
| | | __init_l2() |
| | | __init_data() |
| | | |
| | | threading.Thread(target=huaxin_l2_log.run_sync, daemon=True).start() |
| | | # TODO 测试 |
| | | # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start() |
| | | global l2CommandManager |
| | | l2CommandManager = command_manager.L2CommandManager() |
| | | logger_system.info("可转债L2订阅服务启动成功") |
| | | except Exception as e: |
| | | logger_system.exception(e) |
| | | while True: |
| | | time.sleep(2) |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | run() |