Administrator
2024-04-15 d9cc42e05b16984b9b8760661e0f6c3abaaa1f7e
添加L1数据采集
4个文件已添加
393 ■■■■■ 已修改文件
huaxin_client/l1_client_for_output.py 239 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_subscript_codes_manager.py 100 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l1_main.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l1_main.spec 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client_for_output.py
New file
@@ -0,0 +1,239 @@
# -*- coding: utf-8 -*-
import json
import logging
import os
import queue
import threading
import time
import xmdapi
from huaxin_client import l1_subscript_codes_manager
from log_module.log import logger_system, logger_local_huaxin_l1
################B类##################
from utils import socket_util, tool
ADDRESS = "udp://224.224.1.19:7880"
SERVER_HOST = '43.138.167.68'
SERVER_PORT = 12881
level1_data_queue = queue.Queue()
def __send_response(sk, msg):
    msg = socket_util.load_header(msg)
    sk.sendall(msg)
    result, header_str = socket_util.recv_data(sk)
    if result:
        result_json = json.loads(result)
        if result_json.get("code") == 0:
            return True
    return False
class MdSpi(xmdapi.CTORATstpXMdSpi):
    def __init__(self, api, codes_sh, codes_sz):
        for i in range(3):
            try:
                self.codes_sh, self.codes_sz = codes_sh, codes_sz
                break
            except:
                time.sleep(2)
        xmdapi.CTORATstpXMdSpi.__init__(self)
        self.__api = api
    def OnFrontConnected(self):
        print("OnFrontConnected")
        # 请求登录,目前未校验登录用户,请求域置空即可
        login_req = xmdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 1)
    def subscribe_codes(self, codes_sh, codes_sz):
        # 重新订阅代码
        print(f"订阅数量:sh-{len(codes_sh)}  sz-{len(codes_sz)}")
        if codes_sh:
            ret = self.__api.SubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
            if ret != 0:
                print('SubscribeMarketData fail, ret[%d]' % ret)
            else:
                print('SubscribeMarketData success, ret[%d]' % ret)
        if codes_sz:
            ret = self.__api.SubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)
            if ret != 0:
                print('SubscribeMarketData fail, ret[%d]' % ret)
            else:
                print('SubscribeMarketData success, ret[%d]' % ret)
    def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID):
        if pRspInfoField.ErrorID == 0:
            print('Login success! [%d]' % nRequestID)
            '''
            订阅行情
            当sub_arr中只有一个"00000000"的合约且ExchangeID填TORA_TSTP_EXD_SSE或TORA_TSTP_EXD_SZSE时,订阅单市场所有合约行情
            当sub_arr中只有一个"00000000"的合约且ExchangeID填TORA_TSTP_EXD_COMM时,订阅全市场所有合约行情
            其它情况,订阅sub_arr集合中的合约行情
            '''
            self.subscribe_codes(self.codes_sh, self.codes_sz)
            # sub_arr = [b'600004']
            # ret = self.__api.UnSubscribeMarketData(sub_arr, xmdapi.TORA_TSTP_EXD_SSE)
            # if ret != 0:
            #     print('UnSubscribeMarketData fail, ret[%d]' % ret)
            # else:
            #     print('SubscribeMarketData success, ret[%d]' % ret)
        else:
            print('Login fail!!! [%d] [%d] [%s]'
                  % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        if pRspInfoField.ErrorID == 0:
            print('OnRspSubMarketData: OK!')
        else:
            print('OnRspSubMarketData: Error! [%d] [%s]'
                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        if pRspInfoField.ErrorID == 0:
            print('OnRspUnSubMarketData: OK!')
        else:
            print('OnRspUnSubMarketData: Error! [%d] [%s]'
                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRtnMarketData(self, pMarketDataField):
        if pMarketDataField.SecurityName.find("S") == 0:
            return
        if pMarketDataField.SecurityName.find("ST") >= 0:
            return
        """
        (代码,昨日收盘价,最新价,总成交量,总成交额,更新时间)
        """
        level1_data_queue.put_nowait((
            pMarketDataField.SecurityID, pMarketDataField.PreClosePrice, pMarketDataField.LastPrice,
            pMarketDataField.Volume, pMarketDataField.Turnover, pMarketDataField.UpdateTime))
def __upload_codes_info(datas):
    if not tool.is_trade_time():
        return
    data_bytes = socket_util.load_header(json.dumps({"type": "l1_data", "data": datas}))
    # 上传数据
    sk = socket_util.create_socket(SERVER_HOST, SERVER_PORT)
    try:
        sk.sendall(data_bytes)
    finally:
        sk.close()
    # 上传数据
is_re_subscript = False
# 重新订阅代码
def re_subscript(spi: MdSpi):
    try:
        global is_re_subscript
        if is_re_subscript:
            return
        is_re_subscript = True
        codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes()
        if len(codes_sh) > 100 and len(codes_sz) > 100:
            logger_local_huaxin_l1.info(f"重新订阅 sh-{len(codes_sh)} sz-{len(codes_sz)}")
            spi.subscribe_codes(codes_sh, codes_sz)
    except:
        pass
__position_codes = set()
def test_add_datas():
    while True:
        level1_data_queue.put_nowait(("000948", 12.91, 14.20, int(34.60 * 10000), 4.9 * 1e8, tool.get_now_time_str()))
        time.sleep(3)
def run():
    logger_local_huaxin_l1.info("运行l1订阅服务")
    codes_sh = []
    codes_sz = []
    for i in range(15):
        try:
            codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
            logger_local_huaxin_l1.info(f"获取上证,深证代码数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
            break
        except Exception as e:
            logger_local_huaxin_l1.exception(e)
            time.sleep(4)
    logger_system.info(f"获取L1订阅目标票数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
    # 打印接口版本号
    print(xmdapi.CTORATstpXMdApi_GetApiVersion())
    # 创建接口对象
    api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_MCAST)
    # 创建回调对象
    spi = MdSpi(api, codes_sh, codes_sz)
    # 注册回调接口
    api.RegisterSpi(spi)
    # 注册单个行情前置服务地址
    # api.RegisterFront("tcp://210.14.72.16:9402")
    # 注册多个行情前置服务地址,用逗号隔开
    # api.RegisterFront("tcp://10.0.1.101:6402,tcp://10.0.1.101:16402")
    # 注册名字服务器地址,支持多服务地址逗号隔开
    # api.RegisterNameServer('tcp://224.224.3.19:7888')
    # api.RegisterNameServer('tcp://10.0.1.101:52370,tcp://10.0.1.101:62370')
    # -------------------------正式地址B类-------------------------------
    api.RegisterMulticast(ADDRESS, None, "")
    # -------------------------正式地址A类-------------------------------
    # api.RegisterMulticast("udp://224.224.1.9:7880", None, "")
    # 启动接口
    api.Init()
    logger_system.info("L1订阅服务启动成功")
    # 测试链路
    # level1_data_dict["000969"] = (
    #     "000969", 9.46, 9.11, 771000*100, time.time())
    # level1_data_dict["002292"] = (
    #     "002292", 8.06, 9.96, 969500 * 100, time.time())
    # TODO:测试
    threading.Thread(target= lambda: test_add_datas(), daemon=True).start()
    # 等待程序结束
    while True:
        try:
            # (代码,现价,涨幅,量,时间)
            datas = []
            while True:
                if not level1_data_queue.empty():
                    data = level1_data_queue.get()
                    datas.append(data)
                else:
                    break
            if datas:
                __upload_codes_info(datas)
        except Exception as e:
            logging.exception(e)
        finally:
            time.sleep(0.5)
    # 释放接口对象
    api.Release()
def run_async():
    logger_system.info("L1进程ID:{}", os.getpid())
    t1 = threading.Thread(target=lambda: run(), daemon=True)
    t1.start()
if __name__ == "__main__":
    pass
huaxin_client/l1_subscript_codes_manager.py
New file
@@ -0,0 +1,100 @@
"""
L1需要订阅的代码管理
"""
import json
import os
import constant
from huaxin_client.client_network import SendResponseSkManager
# 请求l1订阅的目标代码
from utils import socket_util
def request_l1_subscript_target_codes():
    type_ = "get_level1_codes"
    fdata = json.dumps(
        {"type": type_, "data": {}})
    msg = fdata.encode("utf-8")
    # 发送消息
    for i in range(3):
        try:
            sk = SendResponseSkManager.create_send_response_sk()
            msg = socket_util.load_header(msg)
            sk.sendall(msg)
            result, header_str = socket_util.recv_data(sk)
            # 读取代码
            result_json = json.loads(result)
            if result_json["code"] == 0:
                codes = result_json["data"]
                codes_sh = []
                codes_sz = []
                for code in codes:
                    if code.find("00") == 0:
                        codes_sz.append(code.encode("utf-8"))
                    else:
                        codes_sh.append(code.encode("utf-8"))
                print("获取订阅目标代数量:", len(codes_sh), len(codes_sz))
                return codes_sh, codes_sz
        except ConnectionResetError:
            SendResponseSkManager.del_send_response_sk(type_)
        except BrokenPipeError:
            SendResponseSkManager.del_send_response_sk(type_)
    return None, None
__DIR_PATH = f"{constant.get_path_prefix()}/codes"
__CODE_SH_PATH = f"{__DIR_PATH}/codes_sh.text"
__CODE_SZ_PATH = f"{__DIR_PATH}/codes_sz.text"
# 保存目标代码
def save_codes(codes_sh, codes_sz):
    if not os.path.exists(__DIR_PATH):
        os.mkdir(__DIR_PATH)
    with open(__CODE_SH_PATH, 'w') as f:
        for c in codes_sh:
            if type(c) == bytes:
                f.write(c.decode('utf-8'))
            else:
                f.write(c)
            f.write("\n")
    with open(__CODE_SZ_PATH, 'w') as f:
        for c in codes_sz:
            if type(c) == bytes:
                f.write(c.decode('utf-8'))
            else:
                f.write(c)
            f.write("\n")
def get_codes_from_file():
    codes_sh, codes_sz = [], []
    if os.path.exists(__CODE_SH_PATH):
        with open(__CODE_SH_PATH, 'r') as f:
            line = f.readline()
            while line:
                if line.strip():
                    codes_sh.append(line.strip().encode('utf-8'))
                line = f.readline()
    if os.path.exists(__CODE_SZ_PATH):
        with open(__CODE_SZ_PATH, 'r') as f:
            line = f.readline()
            while line:
                if line.strip():
                    codes_sz.append(line.strip().encode('utf-8'))
                line = f.readline()
    return codes_sh, codes_sz
def get_codes():
    codes_sh, codes_sz = get_codes_from_file()
    if not codes_sh or not codes_sz:
        return request_l1_subscript_target_codes()
    return codes_sh, codes_sz
if __name__ == '__main__':
    pass
l1_main.py
New file
@@ -0,0 +1,4 @@
from huaxin_client import l1_client_for_output
if __name__ == "__main__":
    l1_client_for_output.run()
l1_main.spec
New file
@@ -0,0 +1,50 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
    ['l1_main.py'],
    pathex=[],
    binaries=[],
    datas=[],
    hiddenimports=[],
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    noarchive=False,
    cipher=block_cipher,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
    pyz,
    a.scripts,
    [],
    exclude_binaries=True,
    name='l1_data_server',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    console=True,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
)
coll = COLLECT(
    exe,
    a.binaries,
    a.zipfiles,
    a.datas,
    strip=False,
    upx=True,
    upx_exclude=[],
    name='l1_data_server',
)