# -*- coding: utf-8 -*-
|
import json
|
import logging
|
import multiprocessing
|
import os
|
import threading
|
import time
|
|
from huaxin_client import socket_util, l1_subscript_codes_manager
|
import xmdapi
|
from huaxin_client import tool, constant
|
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript, logger_debug
|
from third_data import custom_block_in_money_manager
|
from third_data.custom_block_in_money_manager import BlockInMoneyRankManager, CodeInMoneyManager
|
from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi
|
from utils import tool as out_tool
|
|
# Multicast address handed to RegisterMulticast() in run().
# The first literal is the "class B" address and the second the "class A"
# address (labels taken from the original comments); constant.IS_A selects
# which one is used.
ADDRESS = "udp://224.224.1.9:7880" if constant.IS_A else "udp://224.224.1.19:7880"

# Latest L1 snapshot per security, keyed by SecurityID.
# Populated by MdSpi.OnRtnMarketData and read by the upload loop in run().
level1_data_dict = {}
|
|
|
def __send_response(sk, msg):
    """
    Send a message over a socket and report whether the peer acknowledged it.

    The message is passed through socket_util.load_header() before being
    sent, then one reply is read with socket_util.recv_data().

    @param sk: connected socket-like object exposing sendall()
    @param msg: message to send (format expected by socket_util.load_header)
    @return: True only when a non-empty reply parses as JSON whose "code"
             field equals 0; False otherwise
    """
    sk.sendall(socket_util.load_header(msg))
    reply, _header = socket_util.recv_data(sk)
    if not reply:
        return False
    return json.loads(reply).get("code") == 0
|
|
|
class MdSpi(xmdapi.CTORATstpXMdSpi):
    """
    Callback handler for the xmdapi market-data API.

    Logs in once the front is connected, subscribes to the configured
    Shanghai/Shenzhen codes after a successful login, and stores every
    received tick in the module-level ``level1_data_dict``.
    """

    def __init__(self, api, codes_sh, codes_sz):
        """
        @param api: xmdapi API object used for login and subscription requests
        @param codes_sh: codes to subscribe on the SSE exchange
        @param codes_sz: codes to subscribe on the SZSE exchange
        """
        # Plain attribute assignment cannot fail, so no retry loop is needed.
        self.codes_sh, self.codes_sz = codes_sh, codes_sz
        xmdapi.CTORATstpXMdSpi.__init__(self)
        self.__api = api

    def OnFrontConnected(self):
        """Request login as soon as the front connection is established."""
        print("OnFrontConnected")
        # The login user is currently not verified, so the request fields
        # are left empty.
        login_req = xmdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 1)

    def __subscribe(self, codes, exchange_id):
        """Subscribe ``codes`` on one exchange and log a non-zero return code."""
        if not codes:
            return
        ret = self.__api.SubscribeMarketData(codes, exchange_id)
        if ret != 0:
            logger_local_huaxin_l1.info(f"SubscribeMarketData fail, ret[{ret}] exchange[{exchange_id}]")

    def subscribe_codes(self, codes_sh, codes_sz):
        """
        (Re)subscribe market data for the given codes.

        @param codes_sh: codes for the SSE exchange; skipped when empty
        @param codes_sz: codes for the SZSE exchange; skipped when empty
        """
        self.__subscribe(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
        self.__subscribe(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)

    def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID):
        """
        Subscribe to the configured codes after a successful login.

        Subscription rules (from the original vendor note):
        - a single "00000000" contract with ExchangeID TORA_TSTP_EXD_SSE or
          TORA_TSTP_EXD_SZSE subscribes to every contract of that market;
        - a single "00000000" contract with ExchangeID TORA_TSTP_EXD_COMM
          subscribes to every contract of all markets;
        - otherwise only the listed contracts are subscribed.
        """
        if pRspInfoField.ErrorID == 0:
            self.subscribe_codes(self.codes_sh, self.codes_sz)
        else:
            logger_local_huaxin_l1.info(
                f"Login fail!!! [{nRequestID}] [{pRspInfoField.ErrorID}] [{pRspInfoField.ErrorMsg}]")

    def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        """Subscription response; intentionally ignored."""
        pass

    def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        """Unsubscription response; intentionally ignored."""
        pass

    def OnRtnMarketData(self, pMarketDataField):
        """
        Store the latest tick of a security in ``level1_data_dict``.

        Securities whose name starts with "S" or contains "ST" are skipped.
        The rate is computed against the previous close from the bid-1 price
        when it is non-zero, otherwise from the last price.
        """
        security_name = pMarketDataField.SecurityName
        if security_name.startswith("S") or "ST" in security_name:
            return
        close_price = pMarketDataField.PreClosePrice
        if not close_price:
            # Without a previous close the rate is undefined; skipping avoids
            # a ZeroDivisionError escaping from this native callback.
            return
        lastPrice = pMarketDataField.LastPrice
        if pMarketDataField.BidPrice1:
            lastPrice = pMarketDataField.BidPrice1
        rate = round((lastPrice - close_price) * 100 / close_price, 2)
        if out_tool.get_limit_up_rate(pMarketDataField.SecurityID) > 1.1001:
            # Securities with a larger limit-up multiplier (the original note
            # says "20% boards and above") have their rate halved.
            rate = rate / 2
        # (code, last price, rate, volume, receive time, bid1 price,
        #  bid1 volume, bid2 price, bid2 volume, update time)
        level1_data_dict[pMarketDataField.SecurityID] = (
            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(),
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2,
            pMarketDataField.BidVolume2, pMarketDataField.UpdateTime)
        try:
            custom_block_in_money_manager.CodeInMoneyManager.set_market_info(
                pMarketDataField.SecurityID, lastPrice, close_price, pMarketDataField.Turnover)
        except Exception:
            # Best effort: the money-flow bookkeeping must never break tick
            # handling. Not logged because this runs once per tick.
            pass
|
|
|
# Codes contained in the most recent upload; used to detect newly added codes.
__latest_subscript_codes = set()


def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas):
    """
    Push the selected L1 rows to the strategy side as a "set_target_codes" message.

    Does nothing outside trade time and pre-trade time (as reported by
    tool.is_trade_time / tool.is_pre_trade_time). After the push, codes that
    were not part of the previous upload are logged.

    @param queue_l1_w_strategy_r: queue written by L1 and read by the strategy;
                                  the push is skipped when it is None
    @param datas: rows whose first element is the security code
    @return:
    """
    if not (tool.is_trade_time() or tool.is_pre_trade_time()):
        return

    request_id = f"sb_{int(time.time() * 1000)}"
    payload = {
        "type": "set_target_codes",
        "data": {"data": datas},
        "request_id": request_id,
        "time": round(time.time() * 1000, 0),
    }
    message = json.dumps(payload)
    if queue_l1_w_strategy_r is not None:
        queue_l1_w_strategy_r.put_nowait(message)

    # Remember this upload's codes and report the ones that are new.
    current_codes = {row[0] for row in datas}
    new_codes = current_codes - __latest_subscript_codes
    __latest_subscript_codes.clear()
    __latest_subscript_codes.update(current_codes)
    if new_codes:
        logger_local_huaxin_l1.info(f"({request_id})新增加订阅的代码:{new_codes}")
|
|
|
def re_subscript(spi: MdSpi):
    """
    Request the current L1 target codes and subscribe to them again.

    The subscription is only issued when both the Shanghai and the Shenzhen
    list contain more than 100 codes. Failures are logged and never
    propagated, so callers can treat this as best effort.

    @param spi: callback object whose subscribe_codes() performs the subscription
    @return:
    """
    try:
        codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes()
        if len(codes_sh) > 100 and len(codes_sz) > 100:
            logger_local_huaxin_l1.info(f"重新订阅 sh-{len(codes_sh)} sz-{len(codes_sz)}")
            spi.subscribe_codes(codes_sh, codes_sz)
    except Exception as e:
        # Previously swallowed silently; keep the best-effort contract but
        # leave a trace so a failed re-subscription can be diagnosed.
        logger_local_huaxin_l1.exception(e)
|
|
|
# Position codes most recently announced by the strategy side.
__position_codes = set()


def __read_from_strategy(queue_l1_r_strategy_w: multiprocessing.Queue):
    """
    Consume messages sent by the strategy process, forever.

    Each message is either a dict or a JSON string. A message of type
    "set_position_codes" replaces the module-level ``__position_codes`` set
    with the codes in its "data" field. Other message types are ignored.

    @param queue_l1_r_strategy_w: queue read by L1 and written by the strategy;
                                  the function returns immediately when it is None
    @return:
    """
    global __position_codes
    if queue_l1_r_strategy_w is None:
        # Nothing to read from; avoids an endless loop of failing get() calls.
        return
    while True:
        try:
            data = queue_l1_r_strategy_w.get()
            if isinstance(data, str):
                data = json.loads(data)
            if data["type"] == "set_position_codes":
                __position_codes = set(data["data"])
                logger_local_huaxin_l1.info(f"收到策略消息:{data}", )
        except Exception as e:
            # A malformed message must not stop the reader, but it should be visible.
            logger_local_huaxin_l1.exception(e)
        finally:
            time.sleep(1)
|
|
|
def __run_subscript_task(spi):
    """
    Poll every 3 seconds until pre-trade time is reached, re-subscribe once, then stop.

    The original note gives the window as 9:19 to 9:29; the actual window is
    whatever tool.is_pre_trade_time() reports.

    @param spi: callback object handed to re_subscript()
    @return:
    """
    subscribed = False
    while not subscribed:
        try:
            if tool.is_pre_trade_time():
                re_subscript(spi)
                subscribed = True
        except BaseException:
            # Same reach as the former bare ``except``: nothing may end the poll.
            pass
        finally:
            # Runs after every attempt, including the successful last one.
            time.sleep(3)
|
|
def __test_block_in_money():
    """
    Test helper: seed CodeInMoneyManager with current market information.

    Fetches the codes known to BlockInMoneyRankManager and, in pages of 200,
    looks up each code's previous close and current price / cumulative
    amount, then forwards them to CodeInMoneyManager.set_market_info().
    Codes without a previous close are skipped.

    @return:
    """
    codes = BlockInMoneyRankManager().get_codes()
    logger_debug.info("获取到测试净流入代码数量:{}", len(codes))
    page_size = 200
    # Stepping by page_size never yields an empty page. The former
    # ``len // page_size + 1`` page count issued a request with an empty code
    # list whenever the count was zero or an exact multiple of the page size.
    for start in range(0, len(codes), page_size):
        temp_codes = codes[start:start + page_size]
        print(temp_codes)
        # Previous close per code.
        latest_infos = HistoryKDatasUtils.get_gp_latest_info(temp_codes, "sec_id,pre_close")
        pre_close_dict = {x["sec_id"]: x["pre_close"] for x in latest_infos}

        # Current price and cumulative amount, keyed by the part of the
        # symbol after the first ".".
        current_infos = JueJinApi.get_gp_current_info(temp_codes, "symbol,price,cum_amount")
        current_infos = {x["symbol"].split(".")[1]: (x["price"], x["cum_amount"]) for x in current_infos}

        for code, (price, cum_amount) in current_infos.items():
            if code not in pre_close_dict:
                continue
            CodeInMoneyManager.set_market_info(code, price, pre_close_dict[code], cum_amount)
|
|
|
def _max_upload_count(now_time_int):
    """Return the per-cycle upload cap for a time given as an HHMMSS integer."""
    # Fewer rows are processed before regular trading starts, otherwise the
    # long processing time causes data congestion.
    if now_time_int < 92600:
        return 200
    if now_time_int < 92800:
        return 300
    if now_time_int < 92900:
        return 400
    return 500


def _select_upload_datas(snapshot, fixed_codes, threshold_rate, max_count):
    """Return fixed-code rows first, then rows at or above the threshold by rate descending, capped at max_count."""
    ranked = [row for row in snapshot.values()
              if row[2] >= threshold_rate and row[0] not in fixed_codes]
    ranked.sort(key=lambda x: x[2], reverse=True)
    # Fixed codes are always included and appear exactly once, at the front.
    pinned = [snapshot[code] for code in fixed_codes if code in snapshot]
    # Keeps the front ordering produced by the former repeated insert(0, ...).
    pinned.reverse()
    return (pinned + ranked)[:max_count]


def _publish_block_in_money(queue_custom_block_in_money):
    """Recompute block money in/out ranks and push them as JSON to the queue, if one is given."""
    # The manager is constructed for every call exactly as before;
    # presumably it is a singleton - TODO confirm.
    manager_cls = custom_block_in_money_manager.BlockInMoneyRankManager
    manager_cls().compute()
    if queue_custom_block_in_money is None:
        return
    val = json.dumps((manager_cls().get_in_list(), manager_cls().get_out_list()))
    queue_custom_block_in_money.put_nowait(val)


def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w=None, queue_custom_block_in_money=None, fixed_codes=None):
    """
    Run the L1 subscription service.

    Loads the target codes (up to 15 attempts), creates the multicast market
    data API, registers the callback object and then loops forever: every
    3 seconds it publishes the block money ranks and uploads the selected
    L1 rows to the strategy side.

    @param queue_l1_w_strategy_r: written by L1, read by the strategy
    @param queue_l1_r_strategy_w: read by L1, written by the strategy; the reader
                                  thread is not started when it is None
    @param queue_custom_block_in_money: receives the block in/out money results;
                                        publishing is skipped when it is None
    @param fixed_codes: codes whose data must always be returned
    @return:
    """
    if fixed_codes is None:
        fixed_codes = set()
    logger_local_huaxin_l1.info("运行l1订阅服务")
    codes_sh = []
    codes_sz = []
    for _ in range(15):
        try:
            logger_local_huaxin_l1.info("开始获取目标代码")
            codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
            logger_local_huaxin_l1.info(f"获取上证,深证代码数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
            break
        except Exception as e:
            logger_local_huaxin_l1.exception(e)
            time.sleep(4)
    logger_system.info(f"获取L1订阅目标票数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")

    # Print the API version.
    print(xmdapi.CTORATstpXMdApi_GetApiVersion())

    # Create the API object, create the callback object and register it.
    api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_MCAST)
    spi = MdSpi(api, codes_sh, codes_sz)
    api.RegisterSpi(spi)

    # Market data is received via multicast. The original file also kept
    # commented-out alternatives: api.RegisterFront(...) with one or several
    # comma-separated front addresses and api.RegisterNameServer(...).
    api.RegisterMulticast(ADDRESS, None, "")

    # Start the API.
    api.Init()
    logger_system.info("L1订阅服务启动成功")

    if queue_l1_r_strategy_w is not None:
        threading.Thread(target=__read_from_strategy, args=(queue_l1_r_strategy_w,), daemon=True).start()
    threading.Thread(target=__run_subscript_task, args=(spi,), daemon=True).start()

    try:
        while True:
            try:
                # Compute and publish money in/out. A failure here is logged
                # but must not prevent the L1 upload below.
                try:
                    _publish_block_in_money(queue_custom_block_in_money)
                except Exception as e:
                    logger_debug.exception(e)

                # Work on a copy: the market-data callback keeps writing to
                # level1_data_dict while this loop reads it.
                snapshot = dict(level1_data_dict)
                if not snapshot:
                    continue
                now_time_int = int(tool.get_now_time_str().replace(":", ""))
                threshold_rate = constant.L1_MIN_RATE_PRE if now_time_int < 94000 else constant.L1_MIN_RATE
                datas = _select_upload_datas(snapshot, fixed_codes, threshold_rate,
                                             _max_upload_count(now_time_int))
                if datas:
                    logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas))
                    __upload_codes_info(queue_l1_w_strategy_r, datas)
            except Exception as e:
                logging.exception(e)
                logger_debug.exception(e)
            finally:
                time.sleep(3)
    finally:
        # Release the API object when the loop is left through an exception
        # that is not handled above (e.g. KeyboardInterrupt).
        api.Release()
|
|
|
def run_async(pipe_l2):
    """
    Log the current process id and start run() in a daemon thread.

    @param pipe_l2: forwarded to run() as its first positional argument
    @return:
    """
    logger_system.info("L1进程ID:{}", os.getpid())

    def _target():
        # NOTE(review): run() is invoked with a single argument here - confirm
        # that its remaining queue parameters are optional in the current signature.
        run(pipe_l2)

    worker = threading.Thread(target=_target, daemon=True)
    worker.start()
|
|
|
# Script entry point: intentionally does nothing; the service is started via run().
if __name__ == "__main__":
    pass
|