Administrator
2024-01-30 6e16ef632b6a2df5c1514011e4495949cd8a26bd
根据L1数据来卖的规则单独封装
6个文件已修改
1个文件已添加
342 ■■■■■ 已修改文件
huaxin_client/l1_client_for_trade.py 250 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_data_update.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 45 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client_for_trade.py
New file
@@ -0,0 +1,250 @@
# -*- coding: utf-8 -*-
import collections
import json
import logging
import multiprocessing
import os
import threading
import time
from huaxin_client import socket_util, l1_subscript_codes_manager
import xmdapi
from huaxin_client import tool, constant
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript
################B类##################
ADDRESS = "udp://224.224.1.19:7880"
################A类##################
if constant.IS_A:
    ADDRESS = "udp://224.224.1.9:7880"
level1_data_dict = {
}
def __send_response(sk, msg):
    msg = socket_util.load_header(msg)
    sk.sendall(msg)
    result, header_str = socket_util.recv_data(sk)
    if result:
        result_json = json.loads(result)
        if result_json.get("code") == 0:
            return True
    return False
class MdSpi(xmdapi.CTORATstpXMdSpi):
    l1_data_queue = collections.deque()
    __subscribed_codes = set()
    def __init__(self, api, codes_sh, codes_sz):
        for i in range(3):
            try:
                self.codes_sh, self.codes_sz = codes_sh, codes_sz
                break
            except:
                time.sleep(2)
        xmdapi.CTORATstpXMdSpi.__init__(self)
        self.__api = api
    def OnFrontConnected(self):
        print("OnFrontConnected")
        # 请求登录,目前未校验登录用户,请求域置空即可
        login_req = xmdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 1)
    # 重新订阅代码
    def subscribe_codes(self, codes_sh, codes_sz):
        print(f"订阅数量:sh-{len(codes_sh)}  sz-{len(codes_sz)}")
        if codes_sh:
            ret = self.__api.SubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
            if ret != 0:
                print('SubscribeMarketData fail, ret[%d]' % ret)
            else:
                print('SubscribeMarketData success, ret[%d]' % ret)
        if codes_sz:
            ret = self.__api.SubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)
            if ret != 0:
                print('SubscribeMarketData fail, ret[%d]' % ret)
            else:
                print('SubscribeMarketData success, ret[%d]' % ret)
    def __seperate_codes(self, codes):
        codes_sh = []
        codes_sz = []
        for code in codes:
            if code.find("60") == 0:
                codes_sh.append(code.encode())
            elif code.find("00") == 0:
                codes_sz.append(code.encode())
        return codes_sh, codes_sz
    # 订阅代码
    def subscribe(self, codes: set):
        del_codes = self.__subscribed_codes - codes
        add_codes = codes - self.__subscribed_codes
        if add_codes:
            codes_sh, codes_sz = self.__seperate_codes(add_codes)
            if codes_sh:
                self.__api.SubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
            if codes_sz:
                self.__api.SubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)
        if del_codes:
            codes_sh, codes_sz = self.__seperate_codes(del_codes)
            if codes_sh:
                self.__api.UnSubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
            if codes_sz:
                self.__api.UnSubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)
    def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID):
        if pRspInfoField.ErrorID == 0:
            print('Login success! [%d]' % nRequestID)
            '''
            订阅行情
            当sub_arr中只有一个"00000000"的合约且ExchangeID填TORA_TSTP_EXD_SSE或TORA_TSTP_EXD_SZSE时,订阅单市场所有合约行情
            当sub_arr中只有一个"00000000"的合约且ExchangeID填TORA_TSTP_EXD_COMM时,订阅全市场所有合约行情
            其它情况,订阅sub_arr集合中的合约行情
            '''
            self.subscribe_codes(self.codes_sh, self.codes_sz)
            # sub_arr = [b'600004']
            # ret = self.__api.UnSubscribeMarketData(sub_arr, xmdapi.TORA_TSTP_EXD_SSE)
            # if ret != 0:
            #     print('UnSubscribeMarketData fail, ret[%d]' % ret)
            # else:
            #     print('SubscribeMarketData success, ret[%d]' % ret)
        else:
            print('Login fail!!! [%d] [%d] [%s]'
                  % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        if pRspInfoField.ErrorID == 0:
            print('OnRspSubMarketData: OK!')
            self.__subscribed_codes.add(pSpecificSecurityField["SecurityID"])
        else:
            print('OnRspSubMarketData: Error! [%d] [%s]'
                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        if pRspInfoField.ErrorID == 0:
            print('OnRspUnSubMarketData: OK!')
            self.__subscribed_codes.discard(pSpecificSecurityField["SecurityID"])
        else:
            print('OnRspUnSubMarketData: Error! [%d] [%s]'
                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRtnMarketData(self, pMarketDataField):
        if pMarketDataField.SecurityName.find("S") == 0:
            return
        if pMarketDataField.SecurityName.find("ST") >= 0:
            return
        rate = 0
        self.l1_data_queue.append((
            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(),
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1))
        # print(
        #     "SecurityID[%s] SecurityName[%s] LastPrice[%.2f] Volume[%d] Turnover[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d] UpperLimitPrice[%.2f] LowerLimitPrice[%.2f]"
        #     % (pMarketDataField.SecurityID, pMarketDataField.SecurityName, pMarketDataField.LastPrice,
        #        pMarketDataField.Volume,
        #        pMarketDataField.Turnover, pMarketDataField.BidPrice1, pMarketDataField.BidVolume1,
        #        pMarketDataField.AskPrice1,
        #        pMarketDataField.AskVolume1, pMarketDataField.UpperLimitPrice, pMarketDataField.LowerLimitPrice))
def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas):
    # 上传数据
    type_ = "upload_l1_trade_datas"
    request_id = f"sb_{int(time.time() * 1000)}"
    fdata = json.dumps(
        {"type": type_, "data": {"data": datas}, "request_id": request_id, "time": round(time.time() * 1000, 0)})
    if queue_l1_w_strategy_r is not None:
        queue_l1_w_strategy_r.put_nowait(fdata)
def __read_from_strategy(queue_l1_r_strategy_w: multiprocessing.Queue):
    while True:
        try:
            data = queue_l1_r_strategy_w.get()
            if type(data) == str:
                data = json.loads(data)
            if data["type"] == "set_target_codes":
                codes = set(data["data"])
                spi.subscribe(codes)
            logger_local_huaxin_l1.info(f"收到策略消息:{data}", )
        except:
            pass
        finally:
            time.sleep(1)
def run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w):
    logger_local_huaxin_l1.info("运行l1_for_trade订阅服务")
    codes_sh = []
    codes_sz = []
    # 打印接口版本号
    print(xmdapi.CTORATstpXMdApi_GetApiVersion())
    # 创建接口对象
    api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_MCAST)
    # 创建回调对象
    global spi
    spi = MdSpi(api, codes_sh, codes_sz)
    # 注册回调接口
    api.RegisterSpi(spi)
    # 注册单个行情前置服务地址
    # api.RegisterFront("tcp://210.14.72.16:9402")
    # 注册多个行情前置服务地址,用逗号隔开
    # api.RegisterFront("tcp://10.0.1.101:6402,tcp://10.0.1.101:16402")
    # 注册名字服务器地址,支持多服务地址逗号隔开
    # api.RegisterNameServer('tcp://224.224.3.19:7888')
    # api.RegisterNameServer('tcp://10.0.1.101:52370,tcp://10.0.1.101:62370')
    # -------------------------正式地址B类-------------------------------
    api.RegisterMulticast(ADDRESS, None, "")
    # -------------------------正式地址A类-------------------------------
    # api.RegisterMulticast("udp://224.224.1.9:7880", None, "")
    # 启动接口
    api.Init()
    logger_system.info("L1订阅服务启动成功")
    # 测试链路
    # level1_data_dict["000969"] = (
    #     "000969", 9.46, 9.11, 771000*100, time.time())
    # level1_data_dict["002292"] = (
    #     "002292", 8.06, 9.96, 969500 * 100, time.time())
    threading.Thread(target=__read_from_strategy, args=(queue_l1_trade_r_strategy_w,), daemon=True).start()
    # 等待程序结束
    while True:
        try:
            temp_datas = []
            while len(spi.l1_data_queue) > 0:
                data = spi.l1_data_queue.popleft()
                temp_datas.append(data)
            if temp_datas:
                # 上传代码数据
                __upload_codes_info(queue_l1_trade_w_strategy_r, temp_datas)
        except Exception as e:
            logging.exception(e)
        finally:
            time.sleep(0.01)
    # 释放接口对象
    api.Release()
if __name__ == "__main__":
    pass
log_module/log.py
@@ -273,6 +273,10 @@
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_l1_show_info",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_local_huaxin_path("l1", "l1_trade"),
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_l1_trade_info",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_local_huaxin_path("l2", "g_cancel"),
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_g_cancel",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -403,6 +407,9 @@
logger_local_huaxin_l1 = __mylogger.get_logger("local_huaxin_l1_show_info")
logger_local_huaxin_g_cancel = __mylogger.get_logger("local_huaxin_g_cancel")
logger_local_huaxin_l2_buy_no = __mylogger.get_logger("local_huaxin_l2_buy_no")
logger_local_huaxin_l1_trade_info = __mylogger.get_logger("local_huaxin_l1_trade_info")
def close_print():
main.py
@@ -11,6 +11,7 @@
import huaxin_client.trade_client
import huaxin_client.l2_client
import huaxin_client.l1_client
import huaxin_client.l1_client_for_trade
from log_module import log
from log_module.log import logger_l2_trade, logger_system, logger_local_huaxin_l1
@@ -20,7 +21,6 @@
from third_data import data_server
from trade.huaxin import huaxin_trade_server, huaxin_trade_api_server
# from huaxin_api import trade_client, l2_client, l1_client
from utils import tool
@@ -29,7 +29,7 @@
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_,
                      market_queue_,queue_l1_r_strategy_w):
                      market_queue_, queue_l1_trade_r_strategy_w, queue_l1_trade_w_strategy_r):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -41,7 +41,7 @@
    #
    # 交易接口服务
    t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server",
                          args=(pipe_server, queue_other_w_l2_r, queue_l1_r_strategy_w),
                          args=(pipe_server, queue_other_w_l2_r, queue_l1_trade_r_strategy_w),
                          daemon=True)
    t1.start()
    #
@@ -52,7 +52,8 @@
    # 启动华鑫交易服务
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_, order_queues_,
                            transaction_queues_, market_queue_)
                            transaction_queues_, market_queue_,
                            queue_l1_trade_w_strategy_r)
# 主服务
@@ -60,7 +61,8 @@
    logger_system.info("create Server")
    laddr = "", 9001
    try:
        tcpserver = server.MyThreadingTCPServer(laddr, server.MyBaseRequestHandle, pipe_trade=pipe)  # 注意:参数是MyBaseRequestHandle
        tcpserver = server.MyThreadingTCPServer(laddr, server.MyBaseRequestHandle,
                                                pipe_trade=pipe)  # 注意:参数是MyBaseRequestHandle
        tcpserver.serve_forever()
    except Exception as e:
        logger_system.exception(e)
@@ -92,9 +94,12 @@
        # L2读其他写
        queue_other_w_l2_r = multiprocessing.Queue()
        #
        # l1
        queue_l1_w_strategy_r = multiprocessing.Queue()
        queue_l1_r_strategy_w = multiprocessing.Queue()
        # l1交易
        queue_l1_trade_w_strategy_r = multiprocessing.Queue()
        queue_l1_trade_r_strategy_w = multiprocessing.Queue()
        # 交易读策略写
        queue_strategy_w_trade_r = multiprocessing.Queue()
@@ -111,6 +116,10 @@
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run,
                                            args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,))
        l1Process.start()
        l1TradeProcess = multiprocessing.Process(target=huaxin_client.l1_client_for_trade.run,
                                                 args=(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w,))
        l1TradeProcess.start()
        # 交易进程
        tradeProcess = multiprocessing.Process(
@@ -137,7 +146,8 @@
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read,
                          order_queues, transaction_queues, market_queue,queue_l1_r_strategy_w)
                          order_queues, transaction_queues, market_queue, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r)
        # 将tradeServer作为主进程
        l1Process.join()
trade/huaxin/huaxin_trade_api.py
@@ -133,7 +133,7 @@
# 设置交易通信队列
# 暂时不会使用该方法
def run_pipe_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_):
    global queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read
    global queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,queue_l1_trade_r_strategy_w
    queue_strategy_w_trade_r = queue_strategy_w_trade_r_
    queue_strategy_w_trade_r_for_read = queue_strategy_w_trade_r_for_read_
trade/huaxin/huaxin_trade_api_server.py
@@ -534,11 +534,11 @@
            time.sleep(1)
def run(pipe_server, queue_other_w_l2_r, queue_l1_r_strategy_w):
def run(pipe_server, queue_other_w_l2_r, queue_l1_trade_r_strategy_w):
    logger_system.info("create TradeApiServer")
    logger_system.info(f"trade_api_server 线程ID:{tool.get_thread_id()}")
    # 拉取交易信息
    huaxin_trade_data_update.run(queue_l1_r_strategy_w, queue_other_w_l2_r)
    huaxin_trade_data_update.run(queue_l1_trade_r_strategy_w, queue_other_w_l2_r)
    #
    t1 = threading.Thread(target=lambda: __set_target_codes(queue_other_w_l2_r), daemon=True)
    t1.start()
trade/huaxin/huaxin_trade_data_update.py
@@ -116,8 +116,8 @@
                                    init_data_util.re_set_price_pre(d["securityID"], force=True)
                                if d["prePosition"] > 0:
                                    position_codes.add(d["securityID"])
                            queue_l1_r_strategy_w.put_nowait(
                                {"type": "set_position_codes", "data": list(position_codes)})
                            queue_l1_trade_r_strategy_w.put_nowait(
                                {"type": "set_target_codes", "data": list(position_codes)})
                            # 9点25之前需要订阅持仓票
                            if position_codes and tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") < 0:
                                try:
@@ -168,9 +168,9 @@
# 运行
def run(queue_l1_r_strategy_w_, queue_other_w_l2_r_):
    global queue_l1_r_strategy_w, queue_other_w_l2_r
    queue_l1_r_strategy_w = queue_l1_r_strategy_w_
def run(queue_l1_trade_r_strategy_w_, queue_other_w_l2_r_):
    global queue_l1_trade_r_strategy_w, queue_other_w_l2_r
    queue_l1_trade_r_strategy_w = queue_l1_trade_r_strategy_w_
    queue_other_w_l2_r = queue_other_w_l2_r_
    t1 = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True)
    t1.start()
trade/huaxin/huaxin_trade_server.py
@@ -44,7 +44,8 @@
from log_module import async_log_util, log_export
from log_module.log import hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_g_cancel, logger_debug, \
    logger_system, logger_trade, logger_trade_position_api_request, logger_request_api
    logger_system, logger_trade, logger_trade_position_api_request, logger_request_api, \
    logger_local_huaxin_l1_trade_info
from third_data import block_info, kpl_data_manager, kpl_util
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
@@ -361,9 +362,16 @@
        request_id = data_json["request_id"]
        datas = data["data"]
        cls.__save_l1_current_price(datas)
        cls.__sell_thread_pool.submit(lambda: cls.__sell(datas))
        cls.__process_l1_data_thread_pool.submit(
            lambda: HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id))
    @classmethod
    def set_l1_trade_codes_info(cls, data_json):
        data = data_json["data"]
        request_id = data_json["request_id"]
        datas = data["data"]
        cls.__save_l1_current_price(datas)
        cls.__sell(datas)
    @classmethod
    def l2_order(cls, code, _datas, timestamp):
@@ -505,6 +513,27 @@
                            continue
                        TradeServerProcessor.set_target_codes(val)
            except Exception as e:
                logging.exception(e)
def __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r: multiprocessing.Queue):
    logger_system.info(f"trade_server __recv_pipe_l1_trade 线程ID:{tool.get_thread_id()}")
    if queue_l1_trade_w_strategy_r is not None:
        while True:
            try:
                val = queue_l1_trade_w_strategy_r.get()
                if val:
                    val = json.loads(val)
                    print("收到来自L1的数据:", val["type"])
                    # 处理数据
                    type_ = val["type"]
                    if type_ == "upload_l1_trade_datas":
                        # 处理专为交易提供的L1数据
                        TradeServerProcessor.set_l1_trade_codes_info(val)
                        async_log_util.info(logger_local_huaxin_l1_trade_info, val)
            except Exception as e:
                logger_local_huaxin_l1_trade_info.exception(e)
                logging.exception(e)
@@ -663,7 +692,8 @@
                    buy1_price = gpcode_manager.get_limit_up_price(code)
                    if not buy1_price:
                        raise Exception("尚未获取到涨停价")
                rule = SellRule(id_= data["id"], code=data["code"], buy1_volume=data["buy1_volume"], buy1_price=buy1_price,
                rule = SellRule(id_=data["id"], code=data["code"], buy1_volume=data["buy1_volume"],
                                buy1_price=buy1_price,
                                sell_volume=data["sell_volume"], sell_price_type=data["sell_price_type"],
                                end_time=data["end_time"])
                SellRuleManager().update_rule(rule)
@@ -1286,7 +1316,8 @@
                if order_begin_pos is None or order_begin_pos.buy_exec_index is None or order_begin_pos.buy_exec_index < 0:
                    raise Exception("尚未下单")
                cancel_buy_strategy.set_real_place_position(code, real_order_index,
                                                            buy_single_index=order_begin_pos.buy_single_index,is_default=False)
                                                            buy_single_index=order_begin_pos.buy_single_index,
                                                            is_default=False)
                result = {"code": 0, "data": {}}
                self.send_response(result, client_id, request_id)
            elif ctype == "get_positions":
@@ -1388,7 +1419,7 @@
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        order_queues, transaction_queues,
        market_queue):
        market_queue, queue_l1_trade_w_strategy_r):
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
        # 执行一些初始化数据
@@ -1414,6 +1445,10 @@
        t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True)
        t1.start()
        # 监听l1交易那边传过来的代码
        t1 = threading.Thread(target=lambda: __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r), daemon=True)
        t1.start()
        # 同步异步日志
        t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True)
        t1.start()