# -*- coding: utf-8 -*-
|
"""
|
可转债正股L2订阅
|
"""
|
import decimal
|
import json
|
import logging
|
import multiprocessing
|
import os
|
import queue
|
import threading
|
import time
|
import concurrent.futures
|
|
from code_attribute import target_codes_manager
|
from code_attribute.history_k_data_util import JueJinApi, JueJinHttpApi
|
from huaxin_client import command_manager
|
from huaxin_client import constant
|
import lev2mdapi
|
from huaxin_client.command_manager import L2ActionCallback
|
from log_module import log, async_log_util
|
from log_module.async_log_util import huaxin_l2_log
|
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, \
|
logger_local_huaxin_l2_transaction, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_error, printlog, \
|
logger_trade, logger_debug, logger_local_huaxin_trade_debug
|
from trade.buy_strategy import BuyStrategyDataManager
|
from utils import tool, l2_huaxin_util
|
|
# --- "Type B" (B类) connection settings, as labelled by the original author ---
Front_Address = "tcp://10.0.1.101:6900"
Multicast_Address = "udp://224.224.2.19:7889"
Multicast_Address2 = "udp://224.224.224.234:7890"
Local_Interface_Address = constant.LOCAL_IP

# Subscription switches. None of them is read in the visible part of this file.
g_SubMarketData = False
g_SubTransaction = False
g_SubOrderDetail = False
g_SubXTSTick = False
g_SubXTSMarketData = False
g_SubNGTSTick = False
g_SubBondMarketData = False
g_SubBondTransaction = False
g_SubBondOrderDetail = False

# Security lists per exchange. Not referenced in the visible part of this file.
SH_Securities = []
SH_XTS_Securities = []

SZ_Securities = []
SZ_Bond_Securities = []
set_codes_data_queue = queue.Queue()
market_code_dict = {}

# Transactions that met the buy condition, produced by the SPI callbacks and
# drained by start_process_transactions().
l2_transaction_price_queue = queue.Queue()
|
|
|
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    """Callback handler (SPI) for the Level-2 market-data API.

    After login it subscribes market snapshots for ``codes``. Every snapshot is
    forwarded to ``market_call_back_queue`` and is used to maintain the set of
    codes with a high rise rate; tick-by-tick transactions are subscribed for
    those codes via :meth:`sub_high_rate_codes`. Transactions for which the buy
    strategy reports a hit are forwarded to ``l2_transaction_price_queue``.

    Every attribute declared directly on the class is shared by all instances.
    """

    special_code_volume_for_order_dict = {}
    # Codes whose subscription has been acknowledged by the server.
    subscripted_market_codes = set()
    subscripted_transaction_codes = set()
    # Per-code unique key of the last transaction (not used in this class body).
    __last_transaction_keys_dict = {}

    buyStrategyDataManager = BuyStrategyDataManager()

    # Codes whose latest snapshot shows a high rise rate.
    __high_rate_codes = set()

    def __init__(self, api, codes):
        """Store the API handle and the codes to subscribe once logged in.

        :param api: Level-2 API object used for all requests
        :param codes: iterable of security codes for market-data subscription
        """
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.codes = codes

    def __split_codes(self, codes):
        """Split codes by exchange and security kind.

        Codes that belong to neither SZSE nor SSE are dropped.

        :param codes: iterable of code strings
        :return: (SSE stock codes, SSE non-stock codes, SZSE codes), each a
                 list of the codes encoded to bytes
        """
        szse_codes = []
        sse_other_codes = []
        sse_stock_codes = []
        for code in codes:
            market_type = tool.get_market_type(code)
            if market_type == tool.MARKET_TYPE_SZSE:
                szse_codes.append(code.encode())
            elif market_type == tool.MARKET_TYPE_SSE:
                if tool.is_stock(code):
                    sse_stock_codes.append(code.encode())
                else:
                    sse_other_codes.append(code.encode())
        return sse_stock_codes, sse_other_codes, szse_codes

    def __unsubscribe_trans(self, _codes):
        """Cancel the tick-by-tick transaction subscription for ``_codes``."""
        sh_stock, sh_other, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh_stock}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh_other}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh_other:
            self.__api.UnSubscribeTransaction(sh_other, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sh_stock:
            # SSE stocks receive their transactions through the NGTS tick stream.
            self.__api.UnSubscribeNGTSTick(sh_stock, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)

    def __subscribe_trans(self, _codes):
        """Subscribe tick-by-tick transactions for ``_codes``."""
        sh_stock, sh_other, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh_stock}")
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh_other}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        if sh_other:
            result = self.__api.SubscribeTransaction(sh_other, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
        if sh_stock:
            # SSE stocks receive their transactions through the NGTS tick stream.
            result = self.__api.SubscribeNGTSTick(sh_stock, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")

    def __unsubscribe_market(self, _codes):
        """Cancel the market-snapshot subscriptions made by __subscribe_market."""
        sh_stock, sh_other, sz = self.__split_codes(_codes)
        if sh_other:
            self.__api.UnSubscribeMarketData(sh_other, lev2mdapi.TORA_TSTP_EXD_SSE)
            self.__api.UnSubscribeXTSMarketData(sh_other, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sh_stock:
            self.__api.UnSubscribeMarketData(sh_stock, lev2mdapi.TORA_TSTP_EXD_SSE)
            # Mirror __subscribe_market: the second stream subscribed there is
            # XTS market data. The NGTS tick stream belongs to the transaction
            # subscription and must not be cancelled from here.
            self.__api.UnSubscribeXTSMarketData(sh_stock, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)

    def __subscribe_market(self, _codes):
        """Subscribe market snapshots (plus XTS snapshots on SSE) for ``_codes``."""
        sh_stock, sh_other, sz = self.__split_codes(_codes)
        if sh_stock:
            result = self.__api.SubscribeMarketData(sh_stock, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
            result = self.__api.SubscribeXTSMarketData(sh_stock, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
        if sh_other:
            result = self.__api.SubscribeMarketData(sh_other, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
            result = self.__api.SubscribeXTSMarketData(sh_other, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")

    def __process_market_codes(self, codes, from_cache=False, delay=0.0):
        """Bring the market-snapshot subscriptions in line with ``codes``.

        :param codes: complete set of codes that should be subscribed
        :param from_cache: unused, kept for interface compatibility
        :param delay: seconds to sleep before sending the requests
        :raises Exception: when not logged in (unless constant.TEST is set)
        """
        codes = set(codes)
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        if delay > 0:
            time.sleep(delay)
        add_codes = codes - self.subscripted_market_codes
        del_codes = self.subscripted_market_codes - codes
        printlog("add del codes", add_codes, del_codes)
        self.__subscribe_market(add_codes)
        self.__unsubscribe_market(del_codes)

    def __process_transaction_codes(self, codes, from_cache=False, delay=0.0):
        """Bring the transaction subscriptions in line with ``codes``.

        :param codes: complete set of codes that should be subscribed
        :param from_cache: unused, kept for interface compatibility
        :param delay: seconds to sleep before sending the requests
        :raises Exception: when not logged in (unless constant.TEST is set)
        """
        codes = set(codes)
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        if delay > 0:
            time.sleep(delay)
        add_codes = codes - self.subscripted_transaction_codes
        del_codes = self.subscripted_transaction_codes - codes
        printlog("add del codes", add_codes, del_codes)
        self.__subscribe_trans(add_codes)
        self.__unsubscribe_trans(del_codes)

    def set_market_codes(self, codes):
        """Subscribe market snapshots for exactly ``codes``; errors are logged, not raised.

        :param codes: iterable of code strings, e.g. ["000001", ...]
        """
        try:
            self.__process_market_codes(codes)
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)

    def sub_high_rate_codes(self):
        """Subscribe transactions for the codes currently flagged as high-rate.

        :raises Exception: when not logged in (unless constant.TEST is set)
        """
        self.__process_transaction_codes(self.__high_rate_codes)

    @classmethod
    def __set_latest_datas(cls, codes_data):
        """Write ``[today, codes_data]`` as JSON to constant.L2_CODES_INFO_PATH."""
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
            f.write(data_str)

    @classmethod
    def __get_latest_datas(cls):
        """Return the codes data saved today, or ``[]`` if none was saved today."""
        if os.path.exists(constant.L2_CODES_INFO_PATH):
            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
                str_ = f.readline()
                data_json = json.loads(str_)
                if data_json[0] == tool.get_now_date_str():
                    return data_json[1]
        return []

    def OnFrontConnected(self):
        """On connection: send a logout request, wait one second, then log in."""
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        """On successful login, subscribe market snapshots for the initial codes."""
        if pRspInfo['ErrorID'] == 0:
            self.is_login = True
            logger_system.info(f"L2行情登录成功")
            self.set_market_codes(self.codes)
        else:
            # Previously a failed login left no trace at all.
            logger_system.error(f"L2行情登录失败:ErrorID={pRspInfo['ErrorID']}")

    def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Record a code whose transaction subscription succeeded."""
        if pRspInfo["ErrorID"] == 0:
            self.subscripted_transaction_codes.add(pSpecificSecurity['SecurityID'])
            logger_local_huaxin_l2_subscript.info(f"成交订阅成功:{pSpecificSecurity['SecurityID']}")

    def OnRspUnSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Forget a code whose transaction subscription was cancelled."""
        try:
            code = pSpecificSecurity['SecurityID']
            self.subscripted_transaction_codes.discard(code)
            logger_local_huaxin_l2_subscript.info(f"成交取消订阅:{pSpecificSecurity['SecurityID']}")
        except Exception as e:
            logging.exception(e)

    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Record a code whose NGTS tick subscription succeeded."""
        if pRspInfo["ErrorID"] == 0:
            self.subscripted_transaction_codes.add(pSpecificSecurity['SecurityID'])
            logger_local_huaxin_l2_subscript.info(f"成交订阅成功:{pSpecificSecurity['SecurityID']}")

    def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Forget a code whose NGTS tick subscription was cancelled."""
        try:
            code = pSpecificSecurity['SecurityID']
            self.subscripted_transaction_codes.discard(code)
            logger_local_huaxin_l2_subscript.info(f"成交取消订阅:{pSpecificSecurity['SecurityID']}")
        except Exception as e:
            logging.exception(e)

    def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Record a code whose market-snapshot subscription succeeded."""
        if pRspInfo["ErrorID"] == 0:
            self.subscripted_market_codes.add(pSpecificSecurity['SecurityID'])
            logger_local_huaxin_l2_subscript.info(f"market订阅成功:{pSpecificSecurity['SecurityID']}")

    def OnRspUnSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Forget a code whose market-snapshot subscription was cancelled."""
        try:
            code = pSpecificSecurity['SecurityID']
            self.subscripted_market_codes.discard(code)
        except Exception as e:
            logging.exception(e)

    def OnRtnTransaction(self, pTransaction):
        """Handle one tick-by-tick transaction record.

        Records whose ExecType is b"2" are ignored (presumably cancellations -
        confirm against the API reference). Other records are logged and passed
        to the buy strategy; on the first positive result the transaction is
        queued on ``l2_transaction_price_queue``.
        """
        try:
            if pTransaction['ExecType'] != b"2":
                item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
                        "TradeVolume": pTransaction['TradeVolume'],
                        "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                        "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                        "SellNo": pTransaction['SellNo'],
                        "ExecType": pTransaction['ExecType'].decode()}
                huaxin_l2_log.info(logger_local_huaxin_l2_transaction, f"{item}")

                results = self.buyStrategyDataManager.add_transaction_info(item)
                for result in results:
                    if result[0]:
                        # Queue once per transaction, carrying the full result list.
                        l2_transaction_price_queue.put_nowait(
                            (pTransaction['SecurityID'], pTransaction['TradePrice'], pTransaction['TradeTime'],
                             results))
                        break
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)

    def OnRtnNGTSTick(self, pTick):
        """Handle one NGTS tick; only ticks with TickType b'T' are processed.

        Matching ticks are logged and passed to the buy strategy; on the first
        positive result the tick is queued on ``l2_transaction_price_queue``.
        """
        try:
            if pTick['TickType'] == b'T':
                item = {"SecurityID": pTick['SecurityID'], "TradePrice": pTick['Price'],
                        "TradeVolume": pTick['Volume'],
                        "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'],
                        "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'],
                        "SellNo": pTick['SellNo'],
                        "ExecType": '1'}
                huaxin_l2_log.info(logger_local_huaxin_l2_transaction, f"{item}")
                results = self.buyStrategyDataManager.add_transaction_info(item)
                for result in results:
                    if result[0]:
                        huaxin_l2_log.info(logger_local_huaxin_trade_debug,
                                           f"达到买入条件:{pTick['SecurityID']} - {results} - {item}")
                        l2_transaction_price_queue.put_nowait(
                            (pTick['SecurityID'], pTick['Price'], pTick['TickTime'], results))
                        break
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)

    @staticmethod
    def __publish_snapshot(data):
        """Queue one market snapshot on ``market_call_back_queue`` and return its rise rate.

        The queued tuple is: code, last price, rate, bid-1 price, bid-1 volume,
        total traded volume, total bid volume, total ask volume, previous close,
        data timestamp.

        :raises ZeroDivisionError: when the previous close price is 0
        """
        pre_close = data['PreClosePrice']
        rate = round((data['LastPrice'] - pre_close) / pre_close, 4)
        market_call_back_queue.put_nowait((data['SecurityID'], data['LastPrice'], rate,
                                           data['BidPrice1'], data['BidVolume1'],
                                           data['TotalVolumeTrade'], data['TotalBidVolume'],
                                           data['TotalAskVolume'], pre_close,
                                           data['DataTimeStamp']))
        return rate

    def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                        FirstLevelSellOrderVolumes):
        """Forward a market snapshot and update the set of high-rate codes.

        Codes starting with 00/60 are flagged at a rise of >= 5%, codes starting
        with 11/12 (convertible bonds) are never flagged, all other codes are
        flagged at >= 6%.
        """
        try:
            rate = self.__publish_snapshot(pDepthMarketData)
            code = pDepthMarketData['SecurityID']
            if code.startswith(("00", "60")):
                threshold = 0.05
            elif code.startswith(("11", "12")):
                # Convertible bonds are filtered out.
                return
            else:
                threshold = 0.06
            if rate >= threshold:
                self.__high_rate_codes.add(code)
            else:
                self.__high_rate_codes.discard(code)
        except Exception as e:
            logger_debug.exception(e)

    def OnRtnXTSMarketData(self, pMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                           FirstLevelSellOrderVolumes):
        """Forward an XTS market snapshot to ``market_call_back_queue``.

        Uses the same 10-field tuple as OnRtnMarketData and never lets an
        exception escape into the native callback.
        """
        try:
            self.__publish_snapshot(pMarketData)
        except Exception as e:
            logger_debug.exception(e)
|
|
|
class MyL2ActionCallback(L2ActionCallback):
    """Command callback that applies a new code list to the global ``spi``."""

    def OnSetL2Position(self, codes):
        """Log how many codes arrived and subscribe market data for them.

        :param codes: code list handed to ``spi.set_market_codes``
        """
        code_count = len(codes)
        huaxin_l2_log.info(logger_l2_codes_subscript, "华鑫L2代码处理队列获取到数据:数量-{}", code_count)
        try:
            spi.set_market_codes(codes)
        except Exception as err:
            # Never let a subscription failure escape into the command manager.
            logging.exception(err)
|
|
|
def __init_l2(codes):
    """Create the Level-2 API object, attach a Lev2MdSpi to it and start it.

    The created handler is published as the module-level ``spi``.

    :param codes: codes passed to Lev2MdSpi for the initial subscription
    """
    global spi

    printlog(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())

    # Multicast is the active mode; lev2mdapi.TORA_TSTP_MST_TCP selects TCP.
    sub_mode = lev2mdapi.TORA_TSTP_MST_MCAST

    # Second argument: True = cached mode, False = non-cached mode
    # (per the original author's notes).
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(sub_mode, True)

    spi = Lev2MdSpi(api, codes)
    api.RegisterSpi(spi)

    if sub_mode == lev2mdapi.TORA_TSTP_MST_MCAST:
        # Receive quotes from a single multicast address. The original notes
        # mention that several addresses can be registered (Multicast_Address2),
        # and an efvi variant that takes extra arguments.
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
    else:
        api.RegisterFront(Front_Address)

    # Start without CPU-core binding.
    api.Init()
|
|
|
# Thread pool (3 workers) on which __receive_from_queue_trade runs L2 commands,
# so that the queue-reading loop itself is not blocked by command processing.
__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)
|
|
|
def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue):
    """Read commands from the trade process forever and dispatch L2 commands.

    Each queue item is a JSON document (str or UTF-8 bytes). Items whose
    ``type`` is ``"l2_cmd"`` are handed to ``l2CommandManager.process_command``
    on the command thread pool; everything else is ignored. Errors are logged
    and the loop continues.

    :param queue_trade_w_l2_r: queue written by the trade side, read here
    """
    logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            value = queue_trade_w_l2_r.get()
            if not value:
                continue
            if isinstance(value, bytes):
                value = value.decode("utf-8")
            data = json.loads(value)
            if data["type"] != "l2_cmd":
                continue
            huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"订阅代码:{data}")
            __start_time = time.time()
            # Pass the arguments to submit() instead of closing over ``data`` in a
            # lambda: the loop rebinds ``data`` on the next message, so a closure
            # that had not started yet would process the wrong command.
            __l2_cmd_thread_pool.submit(l2CommandManager.process_command,
                                        command_manager.CLIENT_TYPE_CMD_L2, None, data)
            use_time = time.time() - __start_time
            if use_time > 0.005:
                huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"l2_cmd耗时:{use_time}s")
        except Exception as e:
            logging.exception(e)
|
|
|
# Module-level placeholder; it is neither assigned nor read anywhere else in the
# visible part of this file.
pipe_strategy = None
|
|
|
def get_pre_price(codes):
    """Fetch the previous close price for each code.

    :param codes: security codes to look up
    :return: dict mapping ``sec_id`` to ``pre_close`` rounded to 2 decimals
    """
    prefixed_symbols = JueJinApi.get_juejin_code_list_with_prefix(codes)
    instruments = JueJinHttpApi.get_instruments(prefixed_symbols, 'pre_close,sec_id')
    pre_close_by_code = {}
    for instrument in instruments:
        sec_id = instrument['sec_id']
        pre_close_by_code[sec_id] = round(instrument['pre_close'], 2)
    return pre_close_by_code
|
|
|
def __init_data():
    """Load the codes to subscribe and seed the strategy with previous closes.

    Both the code list and the previous close prices are fetched with up to
    three attempts, waiting 5 seconds after every attempt that failed or
    returned nothing.

    :return: list of underlying-stock codes followed by convertible-bond codes;
             empty if they could not be loaded
    """
    market_codes = []
    # Load the target codes.
    for _ in range(3):
        try:
            underlying_codes = target_codes_manager.get_subscript_underlying_codes()
            cb_codes = target_codes_manager.get_subscript_cb_codes()
            # Both lists are required before anything is subscribed.
            if underlying_codes and cb_codes:
                market_codes.extend(underlying_codes)
                market_codes.extend(cb_codes)
                break
        except Exception as e:
            logger_system.exception(e)
        time.sleep(5)
    logger_system.info(f'订阅行情数量:{len(market_codes)}')
    if market_codes:
        # Load the previous close price of every target code.
        pre_price_dict = {}
        for _ in range(3):
            try:
                pre_price_dict = get_pre_price(market_codes)
                if pre_price_dict:
                    break
            except Exception as e:
                logger_system.exception(e)
            time.sleep(5)
        # Report the number of prices actually obtained, not the number of codes.
        logger_system.info(f'昨日收盘价数量:{len(pre_price_dict)}')
        for code, pre_close in pre_price_dict.items():
            Lev2MdSpi.buyStrategyDataManager.set_pre_close_price(code, pre_close)
    return market_codes
|
|
|
def start_sub_high_price():
    """Every 3 seconds, subscribe transactions for the current high-rate codes.

    Runs forever. Failures (for example "not logged in yet") are logged and the
    loop keeps going.
    """
    while True:
        try:
            spi.sub_high_rate_codes()
        except Exception as e:
            # Keep the loop alive, but leave a trace instead of hiding the error.
            logger_debug.exception(e)
        time.sleep(3)
|
|
|
# Formerly held a per-code list of (price, time) entries for a "price rose 10
# ticks within 1s" check. That check sat after an unconditional ``continue`` and
# never ran; the name is kept so that the module's attributes stay unchanged.
__latest_transaction_price_dict = {}


def start_process_transactions():
    """Forward every item of ``l2_transaction_price_queue`` to ``trade_call_back_queue``.

    Runs forever. Each item is passed on unchanged; failures are logged and the
    loop continues.
    """
    while True:
        try:
            # Item layout as produced by the SPI callbacks:
            # (code, trade price, trade time, strategy results)
            result = l2_transaction_price_queue.get()
            trade_call_back_queue.put_nowait(result)
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
|
|
|
def run(trade_call_back_queue_: multiprocessing.Queue, market_call_back_queue_: multiprocessing.Queue) -> None:
    """Entry point of the convertible-bond Level-2 subscription process.

    Subscribes Level-2 market snapshots for all target codes, tracks the codes
    with a high rise rate (codes starting with 00/60 at >= 5%, other
    non-convertible-bond codes at >= 6%) and subscribes their tick-by-tick
    transactions. This function never returns: after start-up, or after a
    start-up failure has been logged, it sleeps in an endless loop.

    :param trade_call_back_queue_: queue that receives the transactions
        forwarded by start_process_transactions
    :param market_call_back_queue_: queue that receives the market snapshots
    :return: None
    """
    global trade_call_back_queue, market_call_back_queue, l2CommandManager

    logger_system.info("可转债L2进程ID:{}", os.getpid())
    logger_system.info(f"可转债l2_client 线程ID:{tool.get_thread_id()}")
    try:
        # Publish the queues before any callback can fire.
        trade_call_back_queue = trade_call_back_queue_
        market_call_back_queue = market_call_back_queue_

        __init_l2(__init_data())

        # Background workers: async log flushing, high-rate subscription
        # refresh and transaction forwarding.
        for worker in (huaxin_l2_log.run_sync, start_sub_high_price, start_process_transactions):
            threading.Thread(target=worker, daemon=True).start()

        l2CommandManager = command_manager.L2CommandManager()
        logger_system.info("可转债L2订阅服务启动成功")
    except Exception as err:
        logger_system.exception(err)

    # Keep the process alive for the daemon threads and API callbacks.
    while True:
        time.sleep(2)
|
|
|
if __name__ == "__main__":
    # run() requires both result queues; calling it without arguments raised a
    # TypeError. For a stand-alone start, create local queues.
    run(multiprocessing.Queue(), multiprocessing.Queue())
|