admin
2024-05-22 4ee1bd5de9ca76a69adac6e17a11bd686c742ef3
融入交易
1个文件已删除
14个文件已修改
12个文件已添加
10629 ■■■■■ 已修改文件
_traderapi.cp37-win32.pyd 补丁 | 查看 | 原始文档 | blame | 历史
_traderapi.cp37-win_amd64.pyd 补丁 | 查看 | 原始文档 | blame | 历史
_traderapi.cpython-37m-x86_64-linux-gnu.so 补丁 | 查看 | 原始文档 | blame | 历史
constant.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_server.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/mysql_data.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/client_network.py 83 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py 164 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/huaxin_trade_client.py 524 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_manager.py 616 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 139 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/async_log_util.py 101 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 464 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/request_log_util.py 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_api_server.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_cb_api_server.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_l1_data_server.py 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_server.py 41 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/push_msg_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
socket_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin_trade_api.py 323 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
traderapi.py 8064 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/hosting_api_util.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/kp_client_msg_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
_traderapi.cp37-win32.pyd
Binary files differ
_traderapi.cp37-win_amd64.pyd
Binary files differ
_traderapi.cpython-37m-x86_64-linux-gnu.so
Binary files differ
constant.py
@@ -95,6 +95,8 @@
        "passwd": "Yeshi2016@"
    }
LOG_DIR = "logs"
# Root path prefix for the current platform
def get_path_prefix():
    """Return ``'D:'`` when ``is_windows()`` is true, otherwise ``'/home'``."""
    if is_windows():
        return 'D:'
    return '/home'
data_server.py
@@ -6,10 +6,8 @@
import dask
import log
from code_attribute import gpcode_manager
from log import logger_request_debug
from log_module import log_analyse, log_export
from log_module import log_analyse, log_export, log, request_log_util
from output import limit_up_data_filter, output_util
from output.limit_up_data_filter import IgnoreCodeManager
from third_data import kpl_util, kpl_data_manager, kpl_api
@@ -273,7 +271,7 @@
    def do_GET(self):
        path = self.path
        url = urlparse.urlparse(path)
        log.request_info("DATA_SERVER_GET", f"GET 请求开始:{url.path}")
        request_log_util.request_info("DATA_SERVER_GET", f"GET 请求开始:{url.path}")
        try:
            if url.path == "/kpl/get_limit_up_list":
                response_data = self.__get_limit_up_list()
@@ -287,12 +285,12 @@
                result = hosting_api_util.get_from_data_server(url.path, ps_dict)
                self.__send_response(result)
        finally:
            log.request_info("DATA_SERVER_GET", f"GET 请求结束")
            request_log_util.request_info("DATA_SERVER_GET", f"GET 请求结束")
    def do_POST(self):
        path = self.path
        url = urlparse.urlparse(path)
        log.request_info("DATA_SERVER_POST", f"POST 请求开始:{url.path}")
        request_log_util.request_info("DATA_SERVER_POST", f"POST 请求开始:{url.path}")
        try:
            if url.path == "/upload_kpl_data":
                # 接受开盘啦数据
@@ -300,7 +298,7 @@
                result_str = self.__process_kpl_data(params)
                self.__send_response(result_str)
        finally:
            log.request_info("DATA_SERVER_POST", f"POST 请求结束")
            request_log_util.request_info("DATA_SERVER_POST", f"POST 请求结束")
    def __process_kpl_data(self, data):
        data = json.loads(json.dumps(data).replace("概念", ""))
db/mysql_data.py
@@ -6,11 +6,11 @@
# 把连接参数定义成字典
import constant
config = constant.MYSQL_CONFIG
class Mysqldb:
    # 初始化方法
    def __init__(self):
    def __init__(self, config=constant.MYSQL_CONFIG):
        self.config = config
        # 初始化方法中调用连接数据库的方法
        self.conn = self.get_conn()
        # 调用获取游标的方法
@@ -22,7 +22,7 @@
    # 连接数据库的方法
    def get_conn(self):
        # **config代表不定长参数
        conn = pymysql.connect(**config)
        conn = pymysql.connect(**self.config)
        return conn
    # 获取游标
@@ -82,4 +82,5 @@
    # 插入单条数据
    mysqldb.execute("insert into clients(account,pwd,rule) values(%s,%s,%s)", ("test", 123456, "\"123"))
    # 插入多条数据
    mysqldb.execute_many("insert into clients(account,pwd,rule) values(%s,%s,%s)", [("test", 123456, "\"123"),("test", 123456, "\"123")])
    mysqldb.execute_many("insert into clients(account,pwd,rule) values(%s,%s,%s)",
                         [("test", 123456, "\"123"), ("test", 123456, "\"123")])
db/redis_manager.py
@@ -6,7 +6,7 @@
import redis
import constant
from log import logger_redis_debug
from log_module.log import logger_redis_debug
config = constant.REDIS_CONFIG
huaxin_client/client_network.py
New file
@@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
import json
import socket
from utils import socket_util
# Default address of the server that responses are sent to
SERVER_IP = "127.0.0.1"
SERVER_PORT = 10008


class SendResponseSkManager:
    """Cache of client sockets used to send framed responses to the server.

    One socket is kept per response ``type``. Sockets are created lazily and
    replaced when the connection is found to be reset or broken.
    """

    # Maps response type -> connected socket
    __send_response_sk_dict = {}

    @classmethod
    def get_send_response_sk(cls, type):
        """Return the cached socket for ``type``, connecting first if needed.

        The parameter keeps its original name (although it shadows the
        builtin) so existing keyword callers keep working.
        """
        sockets = cls.__send_response_sk_dict
        if type not in sockets:
            sockets[type] = cls.create_send_response_sk()
        return sockets[type]

    @classmethod
    def del_send_response_sk(cls, type_):
        """Drop the socket cached for ``type_`` and close it (best effort)."""
        sk = cls.__send_response_sk_dict.pop(type_, None)
        if sk is None:
            return
        try:
            sk.close()
        except OSError:
            # The connection is being discarded anyway; a failed close is harmless.
            pass

    @classmethod
    def create_send_response_sk(cls, addr=SERVER_IP, port=SERVER_PORT):
        """Create a socket connected to ``(addr, port)`` and return it.

        :raises OSError: when the connection cannot be established
        """
        client = socket.socket()
        try:
            client.connect((addr, port))
        except Exception:
            # Do not leak the descriptor when the connection attempt fails.
            client.close()
            raise
        return client

    @classmethod
    def send_error_response(cls, type, request_id, client_id, msg):
        """Send an error payload (``code`` 1 with ``msg``) for the given request."""
        cls.send_normal_response(type, cls.load_response(client_id, request_id, {"code": 1, "msg": msg}))

    @classmethod
    def __send_normal_response(cls, sk, msg):
        """Frame ``msg``, send it over ``sk`` and report whether the reply has ``code`` 0."""
        framed = cls.format_response(msg)
        # sendall, not send: send() may transmit only part of the buffer,
        # which would leave a truncated frame on the wire.
        sk.sendall(framed)
        result, _ = socket_util.recv_data(sk)
        if not result:
            return False
        return json.loads(result).get("code") == 0

    # Send a message
    @classmethod
    def send_normal_response(cls, type, msg):
        """Send ``msg`` (bytes) on the socket for ``type``.

        If the reply is not successful the message is sent once more on the
        same connection. If the connection was reset or the pipe is broken,
        the socket is recreated and the message is sent once more.

        :return: True when the peer answered with ``code`` 0, else False
        """
        try:
            sk = cls.get_send_response_sk(type)
            if cls.__send_normal_response(sk, msg):
                return True
            # Not acknowledged: send again
            return cls.__send_normal_response(cls.get_send_response_sk(type), msg)
        except (ConnectionResetError, BrokenPipeError):
            cls.del_send_response_sk(type)
            sk = cls.get_send_response_sk(type)
            return cls.__send_normal_response(sk, msg)

    @classmethod
    def load_response(cls, client_id, request_id, data_json):
        """Serialize a response envelope to UTF-8 encoded JSON bytes."""
        envelope = {"type": "response", "data": data_json, "client_id": client_id,
                    "request_id": request_id}
        return json.dumps(envelope).encode('utf-8')

    @classmethod
    def format_response(cls, data_bytes):
        """Prefix ``data_bytes`` with the ``##`` + 8-digit zero-padded length header."""
        slen = '##%08d' % len(data_bytes)
        return slen.encode("utf-8") + data_bytes
huaxin_client/command_manager.py
New file
@@ -0,0 +1,164 @@
# -*- coding: utf-8 -*-
"""
命令管理器
"""
import concurrent.futures
import logging
import multiprocessing
import threading
from log_module import async_log_util
from log_module.log import logger_local_huaxin_trade_debug, logger_trade, logger_local_huaxin_contact_debug, logger_info
# Message type: heartbeat
MSG_TYPE_HEART = "heart"
# Message type: command
MSG_TYPE_CMD = "cmd"
# Command type identifiers compared against a message's type when commands are dispatched
CLIENT_TYPE_TRADE = "trade"
CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
CLIENT_TYPE_DEAL_LIST = "deal_list"
CLIENT_TYPE_POSITION_LIST = "position_list"
CLIENT_TYPE_MONEY = "money"
CLIENT_TYPE_DEAL = "deal"
CLIENT_TYPE_CMD_L2 = "l2_cmd"
# Heartbeat interval (NOTE(review): unit is presumably seconds — confirm)
HEART_SPACE_TIME = 3
class TradeActionCallback:
    """Hooks invoked for trade-related commands; every default implementation is a no-op."""

    def OnTrade(self, client_id, request_id, sk, type_, data):
        """Trade command."""

    def OnDelegateList(self, client_id, request_id, sk, can_cancel):
        """Delegate (order) list query."""

    def OnDealList(self, client_id, request_id, sk):
        """Deal list query."""

    def OnPositionList(self, client_id, request_id, sk):
        """Position list query."""

    def OnMoney(self, client_id, request_id, sk):
        """Account money query."""

    def OnTest(self, client_id, request_id, data, sk):
        """Test command."""
class L2ActionCallback:
    """Hooks invoked for L2 commands; the default implementation is a no-op."""

    def OnSetL2Position(self, codes_data):
        """Set the codes whose L2 data should be listened to."""
# Trade command management
class TradeCommandManager:
    """Singleton that reads commands from strategy queues and dispatches them to callbacks.

    ``init`` stores the callback object and the two queues on the class;
    ``run`` then consumes both queues and forwards each command to
    ``process_command``.
    """

    trade_client_dict = {}
    _instance = None
    # Declared up front so that run() before init() is a harmless no-op
    # instead of an AttributeError.
    action_callback = None
    queue_strategy_w_trade_r = None
    queue_strategy_w_trade_r_for_query = None
    process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30)

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if not cls._instance:
            # object.__new__ rejects extra arguments, so they must not be forwarded.
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    def init(cls, trade_action_callback: "TradeActionCallback",
             queue_strategy_trade_read_for_trade: multiprocessing.Queue,
             queue_strategy_w_trade_r_for_query: multiprocessing.Queue):
        """Store the callback object and the command queues on the class.

        :param trade_action_callback: object whose On* methods receive the commands
        :param queue_strategy_trade_read_for_trade: queue read for trade commands
        :param queue_strategy_w_trade_r_for_query: queue read for query commands
        """
        cls.action_callback = trade_action_callback
        cls.queue_strategy_w_trade_r = queue_strategy_trade_read_for_trade
        cls.queue_strategy_w_trade_r_for_query = queue_strategy_w_trade_r_for_query

    @classmethod
    def process_command(cls, _type, client_id, result_json, sk=None):
        """Dispatch one command to the matching callback method.

        :param _type: command type (one of the CLIENT_TYPE_* values or ``"test"``)
        :param client_id: forwarded unchanged to the callback
        :param result_json: command dict; ``data`` is required, ``request_id`` optional
        :param sk: forwarded unchanged to the callback

        Any exception raised while dispatching is logged and not re-raised.
        """
        async_log_util.info(logger_local_huaxin_contact_debug, f"process_command: {result_json}")
        try:
            data = result_json["data"]
            request_id = result_json.get('request_id')
            if _type == CLIENT_TYPE_TRADE:
                # Trade
                ctype = data["trade_type"]
                async_log_util.info(logger_trade, f"交易开始:{request_id}")
                cls.action_callback.OnTrade(client_id, request_id, sk, ctype, data)
                async_log_util.info(logger_trade, f"交易结束:{request_id}")
            elif _type == CLIENT_TYPE_MONEY:
                cls.action_callback.OnMoney(client_id, request_id, sk)
            elif _type == CLIENT_TYPE_DEAL_LIST:
                cls.action_callback.OnDealList(client_id, request_id, sk)
            elif _type == CLIENT_TYPE_DELEGATE_LIST:
                can_cancel = data["can_cancel"]
                cls.action_callback.OnDelegateList(client_id, request_id, sk, can_cancel)
            elif _type == CLIENT_TYPE_POSITION_LIST:
                cls.action_callback.OnPositionList(client_id, request_id, sk)
            elif _type == "test":
                cls.action_callback.OnTest(client_id, request_id, data, sk)
        except Exception as e:
            async_log_util.error(logger_local_huaxin_contact_debug, f"process_command出错: {result_json}")

    @classmethod
    def run_process_command(cls, queue_strategy_trade: multiprocessing.Queue):
        """Consume ``queue_strategy_trade`` forever, processing every truthy item.

        Returns immediately when the queue is ``None``. Errors raised for a
        single item are logged and the loop continues.
        """
        if queue_strategy_trade is None:
            return
        # Receive local commands
        try:
            while True:
                try:
                    val = queue_strategy_trade.get()
                    if val:
                        _type = val["type"]
                        if _type != "test":
                            async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
                            # TODO: test-only logging
                            logger_info.info(f"接受到信息: {val}")
                        cls.process_command(_type, None, val)
                except Exception as e:
                    async_log_util.exception(logger_local_huaxin_trade_debug, e)
                    logging.exception(e)
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)

    def run(self, blocking=True):
        """Start consuming both command queues.

        :param blocking: when True the query queue is consumed on a daemon
            thread and the trade queue on the calling thread (so the call does
            not return while a queue is set); when False both queues are
            consumed on daemon threads and the call returns at once.
        """
        query_worker = threading.Thread(target=self.run_process_command,
                                        args=(self.queue_strategy_w_trade_r_for_query,), daemon=True)
        if blocking:
            query_worker.start()
            self.run_process_command(self.queue_strategy_w_trade_r)
        else:
            # Receive commands in the background
            threading.Thread(target=self.run_process_command,
                             args=(self.queue_strategy_w_trade_r,), daemon=True).start()
            query_worker.start()
# L2 command management
class L2CommandManager:
    """Forwards L2 commands to the registered callback object."""

    action_callback = None

    @classmethod
    def init(cls, l2_action_callback):
        """Register the object whose ``OnSetL2Position`` receives L2 commands."""
        cls.action_callback = l2_action_callback

    @classmethod
    def process_command(cls, _type, client_id, result_json):
        """Pass ``result_json["data"]`` to the callback when the message type is the L2 command type."""
        payload = result_json["data"]
        if result_json["type"] != CLIENT_TYPE_CMD_L2:
            return
        cls.action_callback.OnSetL2Position(payload)
if __name__ == "__main__":
    # Manual smoke run. The manager takes no constructor arguments; it is
    # configured through init(), and must run non-blocking here so that the
    # input() below is actually reached and keeps the process alive.
    TradeCommandManager.init(TradeActionCallback(), multiprocessing.Queue(), multiprocessing.Queue())
    manager = TradeCommandManager()
    manager.run(blocking=False)
    input()
huaxin_client/huaxin_trade_client.py
New file
@@ -0,0 +1,524 @@
# -*- coding: utf-8 -*-
import concurrent.futures
import threading
import time
from loguru import logger
import traderapi
from huaxin_client import trade_manager
from huaxin_client.command_manager import TradeCommandManager
from huaxin_client.trade_manager import TradeSimpleApi
from log_module import async_log_util
from log_module.log import logger_system, printlog, logger_local_huaxin_trade_debug, logger_info
# When True, the account settings below are replaced by the values in the `if TEST_TRADE:` block
# NOTE(review): account identifiers and passwords are hard-coded in source — consider loading them from configuration
TEST_TRADE = True
######## Type B ########
UserID = '388000013349'
# Login password
Password = '110808'
# Investor account
InvestorID = '388000013349'
# Brokerage department code
DepartmentID = '0003'
# Fund account
AccountID = '388000013349'
# Shanghai market shareholder account
SSE_ShareHolderID = 'A641420991'
# Shenzhen market shareholder account
SZSE_ShareHolderID = '0345104949'
# Local IP, embedded in the terminal info sent with the login request
LOCAL_IP = "192.168.84.75"
# Front addresses (NOTE(review): presumably primary and backup trading fronts — confirm)
FRONT_ADDRESS = "tcp://192.168.84.31:6500"
FRONT_ADDRESS1 = "tcp://192.168.84.32:26500"
if TEST_TRADE:
    # # Simulation
    # from mylog import logger_trade_debug
    #
    UserID = '00032047'
    # Login password
    Password = '59009218'
    # Investor account
    InvestorID = '00032047'
    # Brokerage department code
    DepartmentID = '0003'
    # Fund account
    AccountID = '00032047'
    # Shanghai market shareholder account
    SSE_ShareHolderID = 'A00032047'
    # Shenzhen market shareholder account
    SZSE_ShareHolderID = '700032047'
class TraderSpi(traderapi.CTORATstpTraderSpi):
    def __init__(self, api, callback):
        traderapi.CTORATstpTraderSpi.__init__(self)
        self.__api = api
        self.__front_id = 0
        self.__session_id = 0
        self.__data_callback = callback
        self.__temp_order_list_dict = {}
        self.__temp_position_list_dict = {}
        self.__temp_money_account_list_dict = {}
        self.call_back_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    def OnFrontConnected(self) -> "void":
        # 获取终端信息
        TradeSimpleApi.req_id += 1
        ret = self.__api.ReqGetConnectionInfo(TradeSimpleApi.req_id)
        if ret != 0:
            logger_system.info('ReqGetConnectionInfo fail, ret[%d]' % ret)
    def OnFrontDisconnected(self, nReason: "int") -> "void":
        printlog('OnFrontDisconnected: [%d]' % nReason)
    def OnRspGetConnectionInfo(self, pConnectionInfoField: "CTORATstpConnectionInfoField",
                               pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        if pRspInfoField.ErrorID == 0:
            logger.info('inner_ip_address[%s]' % pConnectionInfoField.InnerIPAddress)
            logger.info('inner_port[%d]' % pConnectionInfoField.InnerPort)
            logger.info('outer_ip_address[%s]' % pConnectionInfoField.OuterIPAddress)
            logger.info('outer_port[%d]' % pConnectionInfoField.OuterPort)
            logger.info('mac_address[%s]' % pConnectionInfoField.MacAddress)
            # 请求登录
            login_req = traderapi.CTORATstpReqUserLoginField()
            # 支持以用户代码、资金账号和股东账号方式登录
            # (1)以用户代码方式登录
            login_req.LogInAccount = UserID
            login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_UserID
            # (2)以资金账号方式登录
            # login_req.DepartmentID = DepartmentID
            # login_req.LogInAccount = AccountID
            # login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_AccountID
            # (3)以上海股东账号方式登录
            # login_req.LogInAccount = SSE_ShareHolderID
            # login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_SHAStock
            # (4)以深圳股东账号方式登录
            # login_req.LogInAccount = SZSE_ShareHolderID
            # login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_SZAStock
            # 支持以密码和指纹(移动设备)方式认证
            # (1)密码认证
            # 密码认证时AuthMode可不填
            # login_req.AuthMode = traderapi.TORA_TSTP_AM_Password
            login_req.Password = Password
            # (2)指纹认证
            # 非密码认证时AuthMode必填
            # login_req.AuthMode = traderapi.TORA_TSTP_AM_FingerPrint
            # login_req.DeviceID = '03873902'
            # login_req.CertSerial = '9FAC09383D3920CAEFF039'
            # 终端信息采集
            # UserProductInfo填写终端名称
            login_req.UserProductInfo = 'jiabei'
            # 按照监管要求填写终端信息
            login_req.TerminalInfo = f'PC;IIP=NA;IPORT=NA;LIP={LOCAL_IP};MAC=5C6F69CC2B40;HD=004bc76004aff0882b9052ba0eb00506;@jiabei'
            # 以下内外网IP地址若不填则柜台系统自动采集,若填写则以终端填值为准报送
            # login_req.MacAddress = '5C-87-9C-96-F3-E3'
            # login_req.InnerIPAddress = '10.0.1.102'
            # login_req.OuterIPAddress = '58.246.43.50'
            TradeSimpleApi.req_id += 1
            ret = self.__api.ReqUserLogin(login_req, TradeSimpleApi.req_id)
            if ret != 0:
                printlog('ReqUserLogin fail, ret[%d]' % ret)
        else:
            printlog('GetConnectionInfo fail, [%d] [%d] [%s]!!!' % (
                nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspUserLogin(self, pRspUserLoginField: "CTORATstpRspUserLoginField", pRspInfoField: "CTORATstpRspInfoField",
                       nRequestID: "int") -> "void":
        if pRspInfoField.ErrorID == 0:
            logger_system.info('Login success! [%d]' % nRequestID)
            self.__front_id = pRspUserLoginField.FrontID
            self.__session_id = pRspUserLoginField.SessionID
            # TradeSimpleApi.set_login_info(self.__session_id, self.__front_id)
            # if 1:
            #     # 查询股东账号
            #     req_field = traderapi.CTORATstpQryShareholderAccountField()
            #
            #     # 以下字段不填表示不设过滤条件,即查询所有股东账号
            #     # req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE
            #
            #     TradeSimpleApi.req_id += 1
            #     ret = api.ReqQryShareholderAccount(req_field, TradeSimpleApi.req_id)
            #     if ret != 0:
            #         logger_info.info('ReqQryShareholderAccount fail, ret[%d]' % ret)
        else:
            logger_system.info('Login fail!!! [%d] [%d] [%s]'
                               % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspUserPasswordUpdate(self, pUserPasswordUpdateField: "CTORATstpUserPasswordUpdateField",
                                pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        if pRspInfoField.ErrorID == 0:
            printlog('OnRspUserPasswordUpdate: OK! [%d]' % nRequestID)
        else:
            printlog('OnRspUserPasswordUpdate: Error! [%d] [%d] [%s]'
                     % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspOrderInsert(self, pInputOrderField: "CTORATstpInputOrderField", pRspInfoField: "CTORATstpRspInfoField",
                         nRequestID: "int") -> "void":
        try:
            if pRspInfoField.ErrorID == 0:
                async_log_util.info(logger_local_huaxin_trade_debug,
                                    '[%d] OnRspOrderInsert: OK! [%d]' % (round(time.time() * 1000), nRequestID))
            else:
                async_log_util.error(logger_local_huaxin_trade_debug,
                                     f"OnRspOrderInsert 报单出错:{pRspInfoField.ErrorID}-{pRspInfoField.ErrorMsg}")
                self.call_back_thread_pool.submit(self.__data_callback, trade_manager.TYPE_ORDER, nRequestID,
                                                  {"sinfo": pInputOrderField.SInfo,
                                                   "orderStatus": -1,
                                                   "orderStatusMsg": pRspInfoField.ErrorMsg})
        except:
            pass
    # 撤单响应
    def OnRspOrderAction(self, pInputOrderActionField: "CTORATstpInputOrderActionField",
                         pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        try:
            if pRspInfoField.ErrorID == 0:
                async_log_util.info(logger_local_huaxin_trade_debug, 'OnRspOrderAction: OK! [%d]' % nRequestID)
                self.call_back_thread_pool.submit(self.__data_callback, trade_manager.TYPE_CANCEL_ORDER, nRequestID,
                                                  {"sinfo": pInputOrderActionField.SInfo,
                                                   "orderSysID": pInputOrderActionField.OrderSysID,
                                                   "cancel": 1})
            else:
                async_log_util.info(logger_local_huaxin_trade_debug, 'OnRspOrderAction: Error! [%d] [%d] [%s]'
                                    % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
                self.call_back_thread_pool.submit(self.__data_callback, trade_manager.TYPE_CANCEL_ORDER, nRequestID,
                                                  {"sinfo": pInputOrderActionField.SInfo,
                                                   "orderSysID": pInputOrderActionField.OrderSysID,
                                                   "cancel": 0, "errorID": pRspInfoField.ErrorID,
                                                   "errorMsg": pRspInfoField.ErrorMsg})
        except:
            pass
    # 撤单错误回报
    def OnErrRtnOrderAction(self, pInputOrderActionField: "CTORATstpInputOrderActionField",
                            pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        try:
            if pInputOrderActionField and pRspInfoField:
                async_log_util.info(logger_local_huaxin_trade_debug, 'OnErrRtnOrderAction: Error! [%d] [%d] [%d] [%s]'
                                    % (nRequestID, pInputOrderActionField.OrderSysID,
                                       pRspInfoField.ErrorID,
                                       pRspInfoField.ErrorMsg))
        except:
            async_log_util.info(logger_local_huaxin_trade_debug, "OnErrRtnOrderAction: 撤单出错")
    def OnRspInquiryJZFund(self, pRspInquiryJZFundField: "CTORATstpRspInquiryJZFundField",
                           pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        # try:
        #     if pRspInfoField.ErrorID == 0:
        #         logger.info('OnRspInquiryJZFund: OK! [%d] [%.2f] [%.2f]'
        #                     % (nRequestID, pRspInquiryJZFundField.UsefulMoney, pRspInquiryJZFundField.FetchLimit))
        #     else:
        #         logger.info('OnRspInquiryJZFund: Error! [%d] [%d] [%s]'
        #                     % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
        # except:
        #     pass
        pass
    def OnRspTransferFund(self, pInputTransferFundField: "CTORATstpInputTransferFundField",
                          pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        # try:
        #     if pRspInfoField.ErrorID == 0:
        #         logger.info('OnRspTransferFund: OK! [%d]' % nRequestID)
        #     else:
        #         logger.info('OnRspTransferFund: Error! [%d] [%d] [%s]'
        #                     % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
        # except:
        #     pass
        pass
    def OnRtnOrder(self, pOrderField: "CTORATstpOrderField") -> "void":
        try:
            async_log_util.info(logger_local_huaxin_trade_debug,
                                '[%d] OnRtnOrder: SInfo[%s] InvestorID[%s] SecurityID[%s] OrderRef[%d] OrderLocalID[%s] LimitPrice[%.2f] VolumeTotalOriginal[%d] OrderSysID[%s] OrderStatus[%s] InsertTime[%s]'
                                % (round(time.time() * 1000), pOrderField.SInfo, pOrderField.InvestorID,
                                   pOrderField.SecurityID,
                                   pOrderField.OrderRef, pOrderField.OrderLocalID,
                                   pOrderField.LimitPrice, pOrderField.VolumeTotalOriginal, pOrderField.OrderSysID,
                                   pOrderField.OrderStatus, pOrderField.InsertTime))
            if pOrderField.OrderStatus == traderapi.TORA_TSTP_OST_Unknown:
                pass
            #     if queue_trade_w_l2_r is not None:
            #         queue_trade_w_l2_r.put_nowait(
            #             json.dumps({"type": "listen_volume", "data": {"code": pOrderField.SecurityID,
            #                                                           "volume": pOrderField.VolumeTotalOriginal}}).encode(
            #                 'utf-8'))
            else:
                order_data = {"sinfo": pOrderField.SInfo, "securityID": pOrderField.SecurityID,
                              "orderLocalID": pOrderField.OrderLocalID,
                              "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID,
                              "insertTime": pOrderField.InsertTime, "insertDate": pOrderField.InsertDate,
                              "acceptTime": pOrderField.AcceptTime, "cancelTime": pOrderField.CancelTime,
                              "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID,
                              "orderRef": pOrderField.OrderRef, "turnover": pOrderField.Turnover,
                              "volume": pOrderField.VolumeTotalOriginal, "volumeTraded": pOrderField.VolumeTraded,
                              "orderStatus": pOrderField.OrderStatus,
                              "orderSubmitStatus": pOrderField.OrderSubmitStatus,
                              "statusMsg": pOrderField.StatusMsg}
                self.call_back_thread_pool.submit(self.__data_callback, trade_manager.TYPE_ORDER, 0, order_data)
        except Exception as e:
            async_log_util.error(logger_local_huaxin_trade_debug, "OnRtnOrder 出错")
        except:
            async_log_util.error(logger_local_huaxin_trade_debug, "OnRtnOrder 出错")
    def OnRtnTrade(self, pTradeField: "CTORATstpTradeField") -> "void":
        try:
            async_log_util.info(logger_local_huaxin_trade_debug,
                                'OnRtnTrade: TradeID[%s] InvestorID[%s] SecurityID[%s] OrderRef[%d] OrderLocalID[%s] Price[%.2f] Volume[%d]'
                                % (pTradeField.TradeID, pTradeField.InvestorID, pTradeField.SecurityID,
                                   pTradeField.OrderRef, pTradeField.OrderLocalID, pTradeField.Price,
                                   pTradeField.Volume))
        except:
            pass
    def OnRtnMarketStatus(self, pMarketStatusField: "CTORATstpMarketStatusField") -> "void":
        # TORA_TSTP_MKD_SHA(1): 上海A股
        # TORA_TSTP_MKD_SZA(2): 深圳A股
        # TORA_TSTP_MKD_BJMain(a):北京主板
        # TORA_TSTP_MST_UnKnown(  # ):未知
        # TORA_TSTP_MST_BeforeTrading(0): 开盘前
        # TORA_TSTP_MST_Continous(1): 连续交易
        # TORA_TSTP_MST_Closed(2): 收盘
        # TORA_TSTP_MST_OpenCallAuction(3): 开盘集合竞价
        try:
            logger.info('OnRtnMarketStatus: MarketID[%s] MarketStatus[%s]'
                        % (pMarketStatusField.MarketID, pMarketStatusField.MarketStatus))
        except:
            pass
    def OnRspQrySecurity(self, pSecurityField: "CTORATstpSecurityField", pRspInfoField: "CTORATstpRspInfoField",
                         nRequestID: "int", bIsLast: "bool") -> "void":
        if bIsLast != 1:
            logger.info(
                'OnRspQrySecurity[%d]: SecurityID[%s] SecurityName[%s] MarketID[%s] OrderUnit[%s] OpenDate[%s] UpperLimitPrice[%.2f] LowerLimitPrice[%.2f]'
                % (nRequestID, pSecurityField.SecurityID, pSecurityField.SecurityName, pSecurityField.MarketID,
                   pSecurityField.OrderUnit, pSecurityField.OpenDate, pSecurityField.UpperLimitPrice,
                   pSecurityField.LowerLimitPrice))
        else:
            logger.info('查询合约结束[%d] ErrorID[%d] ErrorMsg[%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspQryInvestor(self, pInvestorField: "CTORATstpInvestorField", pRspInfoField: "CTORATstpRspInfoField",
                         nRequestID: "int", bIsLast: "bool") -> "void":
        if bIsLast != 1:
            logger.info('OnRspQryInvestor[%d]: InvestorID[%s] InvestorName[%s] Operways[%s]'
                        % (nRequestID, pInvestorField.InvestorID, pInvestorField.InvestorName,
                           pInvestorField.Operways))
        else:
            logger.info('查询投资者结束[%d] ErrorID[%d] ErrorMsg[%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspQryShareholderAccount(self, pShareholderAccountField: "CTORATstpShareholderAccountField",
                                   pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int",
                                   bIsLast: "bool") -> "void":
        if bIsLast != 1:
            logger_local_huaxin_trade_debug.info(
                'OnRspQryShareholderAccount[%d]: InvestorID[%s] ExchangeID[%s] ShareholderID[%s]'
                % (nRequestID, pShareholderAccountField.InvestorID, pShareholderAccountField.ExchangeID,
                   pShareholderAccountField.ShareholderID))
        else:
            logger.info('查询股东账户结束[%d] ErrorID[%d] ErrorMsg[%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspQryTradingAccount(self, pTradingAccountField: "CTORATstpTradingAccountField",
                               pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int", bIsLast: "bool") -> "void":
        try:
            if nRequestID not in self.__temp_money_account_list_dict:
                self.__temp_money_account_list_dict[nRequestID] = []
            if bIsLast != 1:
                self.__temp_money_account_list_dict[nRequestID].append(
                    {"departmentID": pTradingAccountField.DepartmentID, "investorID": pTradingAccountField.InvestorID,
                     "accountID": pTradingAccountField.AccountID, "currencyID": pTradingAccountField.CurrencyID,
                     "usefulMoney": round(pTradingAccountField.UsefulMoney, 2),
                     "frozenCash": round(pTradingAccountField.FrozenCash, 2),
                     "fetchLimit": round(pTradingAccountField.FetchLimit, 2),
                     "preDeposit": round(pTradingAccountField.PreDeposit, 2)})
                # logger.info(
                #     'OnRspQryTradingAccount[%d]: DepartmentID[%s] InvestorID[%s] AccountID[%s] CurrencyID[%s] UsefulMoney[%.2f] FetchLimit[%.2f]'
                #     % (nRequestID, pTradingAccountField.DepartmentID, pTradingAccountField.InvestorID,
                #        pTradingAccountField.AccountID, pTradingAccountField.CurrencyID,
                #        pTradingAccountField.UsefulMoney, pTradingAccountField.FetchLimit))
            else:
                results = self.__temp_money_account_list_dict.pop(nRequestID)
                self.call_back_thread_pool.submit(self.__data_callback, trade_manager.TYPE_LIST_MONEY, nRequestID,
                                                  results)
                # logger.info('查询资金账号结束[%d] ErrorID[%d] ErrorMsg[%s]'
                #             % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
        except:
            pass
    def OnRspQryOrder(self, pOrderField: "CTORATstpOrderField", pRspInfoField: "CTORATstpRspInfoField",
                      nRequestID: "int", bIsLast: "bool") -> "void":
        try:
            if nRequestID not in self.__temp_order_list_dict:
                self.__temp_order_list_dict[nRequestID] = []
            if not bIsLast:
                # logger.info(
                #     'OnRspQryOrder[%d]: SecurityID[%s] OrderLocalID[%s] Direction[%s] OrderRef[%d] OrderSysID[%s] VolumeTraded[%d] OrderStatus[%s] OrderSubmitStatus[%s], StatusMsg[%s]'
                #     % (nRequestID, pOrderField.SecurityID, pOrderField.OrderLocalID, pOrderField.Direction,
                #        pOrderField.OrderRef, pOrderField.OrderSysID,
                #        pOrderField.VolumeTraded, pOrderField.OrderStatus, pOrderField.OrderSubmitStatus,
                #        pOrderField.StatusMsg))
                self.__temp_order_list_dict[nRequestID].append(
                    {"securityID": pOrderField.SecurityID, "orderLocalID": pOrderField.OrderLocalID,
                     "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID,
                     "insertTime": pOrderField.InsertTime, "insertDate": pOrderField.InsertDate,
                     "acceptTime": pOrderField.AcceptTime, "cancelTime": pOrderField.CancelTime,
                     "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID,
                     "turnover": pOrderField.Turnover, "orderRef": pOrderField.OrderRef,
                     "volume": pOrderField.VolumeTotalOriginal,
                     "volumeTraded": pOrderField.VolumeTraded, "orderStatus": pOrderField.OrderStatus,
                     "orderSubmitStatus": pOrderField.OrderSubmitStatus, "statusMsg": pOrderField.StatusMsg})
            else:
                # logger.info('查询报单结束[%d] ErrorID[%d] ErrorMsg[%s]'
                #             % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
                self.call_back_thread_pool.submit(self.__data_callback, trade_manager.TYPE_LIST_DELEGATE, nRequestID,
                                                  self.__temp_order_list_dict[nRequestID])
                self.__temp_order_list_dict.pop(nRequestID)
        except:
            pass
    def OnRspQryPosition(self, pPositionField: "CTORATstpPositionField", pRspInfoField: "CTORATstpRspInfoField",
                         nRequestID: "int", bIsLast: "bool") -> "void":
        try:
            if nRequestID not in self.__temp_position_list_dict:
                self.__temp_position_list_dict[nRequestID] = []
            if bIsLast != 1:
                self.__temp_position_list_dict[nRequestID].append(
                    {"investorID": pPositionField.InvestorID, "tradingDay": pPositionField.TradingDay,
                     "securityName": pPositionField.SecurityName,
                     "securityID": pPositionField.SecurityID, "historyPos": pPositionField.HistoryPos,
                     "historyPosFrozen": pPositionField.HistoryPosFrozen,
                     "todayBSPos": pPositionField.TodayBSPos, "todayBSPosFrozen": pPositionField.TodayBSPosFrozen,
                     "historyPosPrice": pPositionField.HistoryPosPrice, "totalPosCost": pPositionField.TotalPosCost,
                     "prePosition": pPositionField.PrePosition, "availablePosition": pPositionField.AvailablePosition,
                     "currentPosition": pPositionField.CurrentPosition, "openPosCost": pPositionField.OpenPosCost,
                     "todayCommission": pPositionField.TodayCommission,
                     "todayTotalBuyAmount": pPositionField.TodayTotalBuyAmount,
                     "todayTotalSellAmount": pPositionField.TodayTotalSellAmount})
            else:
                # logger.info('查询持仓结束[%d] ErrorID[%d] ErrorMsg[%s]'
                #             % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
                self.call_back_thread_pool.submit(self.__data_callback, trade_manager.TYPE_LIST_POSITION, nRequestID,
                                                  self.__temp_position_list_dict[nRequestID])
                self.__temp_position_list_dict.pop(nRequestID)
        except:
            pass
    # 成交回报,参数pTradeField是一个CTORATstpTradeField类对象
    def OnRtnTrade(self, pTradeField: "CTORATstpTradeField") -> "void":
        pass
        # logger.info("OnRtnTrade")
    # 查询成交响应,参数pTradeField是一个CTORATstpTradeField类对象
    def OnRspQryTrade(self, pTradeField: "CTORATstpTradeField", pRspInfoField: "CTORATstpRspInfoField",
                      nRequestID: "int", bIsLast: "bool") -> "void":
        try:
            # logger.info("查询成交响应")
            pass
            if nRequestID not in self.__temp_order_list_dict:
                self.__temp_order_list_dict[nRequestID] = []
            if not bIsLast:
                self.__temp_order_list_dict[nRequestID].append(
                    {"tradeID": pTradeField.TradeID, "securityID": pTradeField.SecurityID,
                     "orderLocalID": pTradeField.OrderLocalID,
                     "direction": pTradeField.Direction, "orderSysID": pTradeField.OrderSysID,
                     "price": pTradeField.Price,
                     "tradeTime": pTradeField.TradeTime,
                     "volume": pTradeField.Volume, "tradeDate": pTradeField.TradeDate,
                     "tradingDay": pTradeField.TradingDay,
                     "pbuID": pTradeField.PbuID, "accountID": pTradeField.AccountID})
            else:
                self.call_back_thread_pool.submit(self.__data_callback, trade_manager.TYPE_LIST_TRADED, nRequestID,
                                                  self.__temp_order_list_dict[nRequestID])
                self.__temp_order_list_dict.pop(nRequestID)
        except:
            pass
def run(queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query, queue_result):
    """
    Initialise the trading environment and keep the calling thread alive.

    Creates the trader API, registers the SPI callback object and the front
    address(es), subscribes to the private and public streams, starts the API,
    starts the async log thread and finally starts the trade command manager.
    This function never returns: it ends in an endless sleep loop.

    :param queue_strategy_w_trade_r: passed to TradeCommandManager.init (first queue argument)
    :param queue_strategy_w_trade_r_for_query: passed to TradeCommandManager.init (second queue argument)
    :param queue_result: handed to trade_manager.set_result_read_queue
    :return: does not return
    """
    logger_system.info("交易初始化")
    # ----------- initialise the trading environment ---------------------
    trade_manager.set_result_read_queue(queue_result)
    api = traderapi.CTORATstpTraderApi.CreateTstpTraderApi('./flow', False)
    # Create the callback (SPI) object
    spi = TraderSpi(api, trade_manager.traderapi_callback)
    # Register the callback interface
    api.RegisterSpi(spi)
    # Several front addresses can be registered at once, comma separated:
    # api.RegisterFront('tcp://10.0.1.101:6500,tcp://10.0.1.101:26500')
    # A name server can be registered instead (comma separated for several):
    # api.RegisterNameServer('tcp://10.0.1.101:52370')
    # api.RegisterNameServer('tcp://10.0.1.101:52370,tcp://10.0.1.101:62370')
    if not TEST_TRADE:  # not in test mode: register the configured front addresses
        # Register the front addresses one by one
        ## Class-B servers ##
        logger.info(f"注册交易地址:{FRONT_ADDRESS}/{FRONT_ADDRESS1}")
        api.RegisterFront(FRONT_ADDRESS)  # production primary address
        api.RegisterFront(FRONT_ADDRESS1)  # production backup address
        ## Class-A servers ##
        # api.RegisterFront("tcp://10.224.123.143:6500")  # production primary address
        # api.RegisterFront("tcp://10.224.123.147:26500")  # production backup address
        # TD_TCP_FrontAddress = "tcp://210.14.72.21:4400"  # simulation trading environment
        # TD_TCP_FrontAddress = "tcp://210.14.72.15:4400"  # 24-hour environment, set A
        # TD_TCP_FrontAddress="tcp://210.14.72.16:9500" # 24-hour environment, set B
        # api.RegisterFront(TD_TCP_FrontAddress)
        # printlog("TD_TCP_FensAddress[sim or 24H]::%s\n" % TD_TCP_FrontAddress)
    else:  # test mode: register the simulation front address
        TD_TCP_FrontAddress = "tcp://210.14.72.21:4400"  # simulation trading environment
        # TD_TCP_FrontAddress="tcp://210.14.72.15:4400" # 24-hour environment, set A
        # TD_TCP_FrontAddress="tcp://210.14.72.16:9500" # 24-hour environment, set B
        api.RegisterFront(TD_TCP_FrontAddress)
        # Several front addresses may be given comma separated, e.g.
        # api.RegisterFront("tcp://10.0.1.101:6500,tcp://10.0.1.101:26500")
        printlog("TD_TCP_FensAddress[sim or 24H]::%s\n" % TD_TCP_FrontAddress)
    # Subscribe to the private stream
    api.SubscribePrivateTopic(traderapi.TORA_TERT_QUICK)
    # Subscribe to the public stream
    api.SubscribePublicTopic(traderapi.TORA_TERT_QUICK)
    # Start the API
    api.Init()
    # Drain the async log queue on a daemon thread
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()
    data_callback = trade_manager.MyTradeActionCallback(UserID, Password, SZSE_ShareHolderID, SSE_ShareHolderID, api)
    # Original note: "no need to run command parsing" — NOTE(review): unclear what this refers to, confirm
    tradeCommandManager = TradeCommandManager()
    tradeCommandManager.init(
        data_callback,
        queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query)
    logger_info.debug("全部初始化完成")
    tradeCommandManager.run(True)
    # Keep this thread alive indefinitely
    while True:
        time.sleep(2)
if __name__ == "__main__":
    # No action is taken when this module is executed directly
    pass
huaxin_client/trade_manager.py
New file
@@ -0,0 +1,616 @@
"""
华鑫交易管理
"""
import concurrent.futures
import json
import logging
import time
import multiprocessing
import traderapi
from huaxin_client.client_network import SendResponseSkManager
from huaxin_client.command_manager import TradeActionCallback
from log_module import async_log_util
from log_module.log import logger_trade, logger_local_huaxin_trade_debug, printlog
from utils import tool, socket_util
# Master switch for buying: when False, TradeSimpleApi.buy returns without placing an order
ENABLE_ORDER = True
# Type codes attached to callbacks / responses
# Order placement
TYPE_ORDER = 0
# Order cancellation
TYPE_CANCEL_ORDER = 1
# Order (delegate) list query result
TYPE_LIST_DELEGATE = 2
# Trade (fill) list query result
TYPE_LIST_TRADED = 3
# Position list query result
TYPE_LIST_POSITION = 4
# Trading-account (money) query result
TYPE_LIST_MONEY = 5
# Deal (trade executed)
TYPE_DEAL = 6
# Queue that send_response() writes to; assigned through set_result_read_queue()
__queue_result: multiprocessing.Queue = None
def set_result_read_queue(queue_result):
    """
    Store the queue that send_response() publishes results to.

    :param queue_result: queue object; it is later used through put_nowait()
    :return: None
    """
    global __queue_result
    __queue_result = queue_result
def __send_response(type, data_bytes):
    """Send one message over a freshly created response socket and check the reply.

    ``data_bytes`` is passed through ``socket_util.load_header`` before being sent.
    The reply is read with ``socket_util.recv_data`` and parsed as JSON; a reply
    whose ``code`` is not 0 raises ``Exception`` carrying the reply's ``msg``.
    The socket is closed in every case.

    :param type: not used by this function
    :param data_bytes: payload to send
    """
    conn = SendResponseSkManager.create_send_response_sk()
    try:
        framed = socket_util.load_header(data_bytes)
        conn.sendall(framed)
        reply_text, _header = socket_util.recv_data(conn)
        reply = json.loads(reply_text)
        if reply["code"] != 0:
            raise Exception(reply['msg'])
    finally:
        conn.close()
def send_response(data, type, _client_id, _request_id, show_log=True):
    """Publish ``data`` on the result queue without blocking.

    :param data: payload to enqueue (callers in this module pass a JSON string)
    :param type: not used by this function
    :param _client_id: not used by this function
    :param _request_id: not used by this function
    :param show_log: when truthy, the payload is written to the debug log first
    """
    if show_log:
        async_log_util.debug(logger_local_huaxin_trade_debug, f"回调返回内容:{data}")
    result_queue = __queue_result
    result_queue.put_nowait(data)
# Trade feedback callback
def __traderapi_callback(type, req_id, data):
    """Route a counter callback either to the pending API request or to the generic channel.

    The pending-request key is ``data["sinfo"]`` for order / cancel callbacks and
    ``req_id`` otherwise. When a pending entry exists in ``req_rid_dict`` it is
    consumed and a ``"response"`` message addressed to that client/request is
    sent; otherwise a ``"trade_callback"`` message is sent.

    :param type: one of the TYPE_* codes
    :param req_id: request id reported by the counter callback
    :param data: callback payload; must contain ``"sinfo"`` for order / cancel types
    """
    async_log_util.info(logger_local_huaxin_trade_debug, "回调:type-{} req_id-{}", type, req_id)
    key = req_id
    if type == TYPE_ORDER or type == TYPE_CANCEL_ORDER:
        key = data["sinfo"]
    try:
        # A single pop() instead of "key in dict" followed by pop(): two callback threads
        # racing for the same key can no longer both pass the check and one fail with KeyError.
        temp_params = req_rid_dict.pop(key, None)
        if temp_params is not None:
            client_id, request_id = temp_params[0], temp_params[1]
            # Local order reference -> system order mapping: echo the order_ref stored with the request
            if len(temp_params) >= 4 and type == TYPE_ORDER:
                order_ref = temp_params[3]
                data["orderRef"] = order_ref
            async_log_util.info(logger_local_huaxin_trade_debug, "API回调 request_id-{}", request_id)
            send_response(
                json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                            "request_id": request_id}), type, client_id, request_id)
            async_log_util.info(logger_local_huaxin_trade_debug, "API回调结束 req_id-{} request_id-{}", req_id, request_id)
        else:
            async_log_util.info(logger_local_huaxin_trade_debug, "非API回调 req_id-{}", req_id)
            send_response(
                json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}),
                type,
                None,
                req_id)
    except Exception as e:
        logging.exception(e)
# Public entry point for counter callbacks (the original note says "asynchronous callback",
# but the handler below is invoked directly on the calling thread)
def traderapi_callback(type, req_id, data):
    """Forward a counter callback to the module-private handler.

    :param type: one of the TYPE_* codes
    :param req_id: request id reported by the callback
    :param data: callback payload
    """
    __traderapi_callback(type, req_id, data)
# Pending requests waiting for a counter callback. Keys are the order sinfo (orders / cancels)
# or the query request id; values are (client_id, request_id, sk) tuples, with order_ref
# appended as a fourth element for blocking buy orders.
req_rid_dict = {}
class TradeSimpleApi:
    """Thin wrapper around the trader API: place / cancel orders, run queries, log in."""

    # Last request id issued. Instance methods increment it through ``self`` (which creates an
    # instance attribute on first use); login() increments the class attribute.
    # NOTE(review): the two counters can drift apart after a re-login — confirm whether that matters.
    req_id = 0
    # sinfo values already submitted per operation; used to reject duplicate submissions.
    # These are class-level sets, i.e. shared by all instances.
    __buy_sinfo_set = set()
    __sell_sinfo_set = set()
    __cancel_buy_sinfo_set = set()
    __cancel_sell_sinfo_set = set()

    def __init__(self, UserID, Password, SZSE_ShareHolderID, SSE_ShareHolderID, api: traderapi.CTORATstpTraderApi):
        """
        :param UserID: login account
        :param Password: login password
        :param SZSE_ShareHolderID: Shenzhen shareholder (investor) code
        :param SSE_ShareHolderID: Shanghai shareholder (investor) code
        :param api: trader API instance
        """
        self.UserID = UserID
        self.Password = Password
        self.SZSE_ShareHolderID = SZSE_ShareHolderID
        self.SSE_ShareHolderID = SSE_ShareHolderID
        self.api = api

    @classmethod
    def set_login_info(cls, session_id, front_id):
        """Remember the session / front ids; cancel_buy needs them to cancel by order reference."""
        cls.__session_id = session_id
        cls.__front_id = front_id

    def __next_req_id(self):
        """Advance the request counter and return the new id, so each call uses the id it allocated."""
        self.req_id += 1
        return self.req_id

    def __fill_exchange(self, req_field, code, with_shareholder=False):
        """Set ExchangeID (and optionally ShareholderID) on ``req_field`` from the code's market type.

        Nothing is set when the market type is neither Shenzhen nor Shanghai.
        """
        market_type = tool.get_market_type(code)
        if market_type == tool.MARKET_TYPE_SZSE:
            req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE
            if with_shareholder:
                req_field.ShareholderID = self.SZSE_ShareHolderID
        elif market_type == tool.MARKET_TYPE_SSE:
            req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE
            if with_shareholder:
                req_field.ShareholderID = self.SSE_ShareHolderID

    # sinfo char(32)
    def buy(self, code, count, price, sinfo, order_ref, shadow_price=None):
        """Place a limit buy order, optionally followed by a 100-share "shadow" order that is cancelled.

        :param code: security code
        :param count: order volume
        :param price: limit price
        :param sinfo: caller-defined id; a sinfo that was already submitted is rejected
        :param order_ref: order reference written to the request
        :param shadow_price: when truthy (and order_ref is truthy) a shadow order at this price is
            placed with order_ref + 1 and then cancelled after a short delay
        :return: return code of the last ReqOrderInsert call; None when ENABLE_ORDER is off
        :raises Exception: on a duplicate sinfo or a non-zero API return code
        """
        if not ENABLE_ORDER:
            return
        if sinfo in self.__buy_sinfo_set:
            raise Exception(f'下单请求已经提交:{sinfo}')
        async_log_util.info(logger_trade, f"{code}华鑫本地真实下单开始")
        async_log_util.info(logger_local_huaxin_trade_debug,
                            f"进入买入方法:code-{code} sinfo-{sinfo} order_ref-{order_ref}")
        self.__buy_sinfo_set.add(sinfo)
        req_id = self.__next_req_id()
        # Order insert request
        req_field = traderapi.CTORATstpInputOrderField()
        self.__fill_exchange(req_field, code, with_shareholder=True)
        req_field.SecurityID = code
        req_field.Direction = traderapi.TORA_TSTP_D_Buy
        req_field.VolumeTotalOriginal = count
        # SInfo is a terminal-defined field; per the original notes it is returned unchanged
        # in order reports and order queries.
        req_field.SInfo = sinfo
        # Per the original notes, a supplied OrderRef must be strictly increasing within one TCP session.
        req_field.OrderRef = order_ref
        # Plain limit order, good for the day, any volume
        req_field.LimitPrice = price
        req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_LimitPrice
        req_field.TimeCondition = traderapi.TORA_TSTP_TC_GFD
        req_field.VolumeCondition = traderapi.TORA_TSTP_VC_AV
        ret = self.api.ReqOrderInsert(req_field, req_id)
        if ret != 0:
            raise Exception('ReqOrderInsert fail, ret[%d]' % ret)
        async_log_util.info(logger_trade, f"{code}华鑫本地真实下单结束")
        # -------------------------------- shadow order --------------------------------
        if shadow_price and order_ref:
            # Reuse the request object for the shadow order
            shadow_order_ref = order_ref + 1
            req_field.LimitPrice = shadow_price
            req_field.SInfo = f"s_b_{order_ref}"
            req_field.OrderRef = shadow_order_ref
            req_field.VolumeTotalOriginal = 100
            ret = self.api.ReqOrderInsert(req_field, self.__next_req_id())
            if ret != 0:
                raise Exception('ReqOrderInsert fail, ret[%d]' % ret)
            # Cancel the shadow order after a pause: 50 ms for codes starting with "00"
            # (Shenzhen per the original note), 200 ms otherwise
            delay_s = 0.05 if code.startswith("00") else 0.2
            self.cancel_buy(code, f"s_c_{shadow_order_ref}", order_sys_id=None,
                            order_ref=shadow_order_ref,
                            order_action_ref=None, delay_s=delay_s)
        return ret

    # Cancel a buy order
    def cancel_buy(self, code, sinfo, order_sys_id=None, order_ref=None, order_action_ref=None, delay_s=0.0):
        """Cancel a buy order, located by system order id or by order reference.

        :param code: security code, used to pick the exchange
        :param sinfo: caller-defined id; a sinfo that was already submitted is rejected
        :param order_sys_id: system order id; takes precedence when truthy
        :param order_ref: order reference, used together with the stored session / front ids
        :param order_action_ref: optional action reference
        :param delay_s: seconds to sleep before doing anything
        :raises Exception: on a duplicate sinfo or a non-zero API return code
        """
        if delay_s > 0:
            time.sleep(delay_s)
        if sinfo in self.__cancel_buy_sinfo_set:
            raise Exception(f'撤单请求已经提交:{sinfo}')
        async_log_util.info(logger_local_huaxin_trade_debug,
                            f"进入撤单方法:code-{code} order_sys_id-{order_sys_id}  order_ref-{order_ref} sinfo-{sinfo}")
        self.__cancel_buy_sinfo_set.add(sinfo)
        req_id = self.__next_req_id()
        # Order action (cancel) request
        req_field = traderapi.CTORATstpInputOrderActionField()
        # Same exchange resolution as buy()/sell() instead of hard-coded "00"/"60" prefixes
        self.__fill_exchange(req_field, code)
        req_field.ActionFlag = traderapi.TORA_TSTP_AF_Delete
        # The original order is located either by system order id or by
        # order reference + session id + front id
        if order_sys_id:
            req_field.OrderSysID = order_sys_id
        elif order_ref is not None:
            req_field.OrderRef = order_ref
            req_field.SessionID = self.__session_id
            req_field.FrontID = self.__front_id
        if order_action_ref:
            req_field.OrderActionRef = order_action_ref
        req_field.SInfo = sinfo
        ret = self.api.ReqOrderAction(req_field, req_id)
        if ret != 0:
            raise Exception('ReqOrderAction fail, ret[%d]' % ret)
        return

    # Sell
    def sell(self, code, count, price, price_type, sinfo, order_ref=None):
        """Place a sell order.

        :param code: security code
        :param count: order volume
        :param price: limit price; only written when truthy and > 0
        :param price_type: 1 any price, 2 limit, 3 best price, 4 fix price, 5 five-level,
            6 home best; any other value leaves OrderPriceType unset
        :param sinfo: caller-defined id; a sinfo that was already submitted is rejected
        :param order_ref: optional order reference
        :raises Exception: on a duplicate sinfo or a non-zero API return code
        """
        if sinfo in self.__sell_sinfo_set:
            raise Exception(f'下单请求已经提交:{sinfo}')
        self.__sell_sinfo_set.add(sinfo)
        req_id = self.__next_req_id()
        # Order insert request
        req_field = traderapi.CTORATstpInputOrderField()
        self.__fill_exchange(req_field, code, with_shareholder=True)
        req_field.SecurityID = code
        req_field.Direction = traderapi.TORA_TSTP_D_Sell
        req_field.VolumeTotalOriginal = count
        req_field.SInfo = sinfo
        printlog('卖 price', price, price_type)
        if price and price > 0:
            req_field.LimitPrice = price
        order_price_types = {
            1: traderapi.TORA_TSTP_OPT_AnyPrice,
            2: traderapi.TORA_TSTP_OPT_LimitPrice,
            3: traderapi.TORA_TSTP_OPT_BestPrice,
            4: traderapi.TORA_TSTP_OPT_FixPrice,
            5: traderapi.TORA_TSTP_OPT_FiveLevelPrice,
            6: traderapi.TORA_TSTP_OPT_HomeBestPrice,
        }
        if price_type in order_price_types:
            req_field.OrderPriceType = order_price_types[price_type]
        req_field.TimeCondition = traderapi.TORA_TSTP_TC_GFD
        req_field.VolumeCondition = traderapi.TORA_TSTP_VC_AV
        if order_ref:
            req_field.OrderRef = order_ref
        ret = self.api.ReqOrderInsert(req_field, req_id)
        if ret != 0:
            raise Exception('ReqOrderInsert fail, ret[%d]' % ret)
        return

    # Cancel a sell order
    def cancel_sell(self, code, order_sys_id, sinfo):
        """Cancel a sell order located by its system order id.

        :param code: security code, used to pick the exchange
        :param order_sys_id: system order id of the order to cancel
        :param sinfo: caller-defined id; a sinfo that was already submitted is rejected
        :raises Exception: on a duplicate sinfo or a non-zero API return code
        """
        if sinfo in self.__cancel_sell_sinfo_set:
            raise Exception(f'撤单请求已经提交:{sinfo}')
        self.__cancel_sell_sinfo_set.add(sinfo)
        req_id = self.__next_req_id()
        # Order action (cancel) request
        req_field = traderapi.CTORATstpInputOrderActionField()
        # Same exchange resolution as buy()/sell() instead of hard-coded "00"/"60" prefixes
        self.__fill_exchange(req_field, code)
        req_field.ActionFlag = traderapi.TORA_TSTP_AF_Delete
        req_field.OrderSysID = order_sys_id
        req_field.SInfo = sinfo
        ret = self.api.ReqOrderAction(req_field, req_id)
        if ret != 0:
            raise Exception('ReqOrderAction fail, ret[%d]' % ret)
        return

    # Query today's orders
    def list_delegate_orders(self, is_cancel):
        """Request the orders inserted between 09:15:00 and 15:00:00.

        :param is_cancel: when truthy, IsCancel is set to 1 (cancellable orders only, per the original note)
        :return: the request id used for the query
        :raises Exception: on a non-zero API return code
        """
        req_id = self.__next_req_id()
        req_field = traderapi.CTORATstpQryOrderField()
        # Per the original note, filter fields left empty mean "no filter"
        req_field.InsertTimeStart = '09:15:00'
        req_field.InsertTimeEnd = '15:00:00'
        if is_cancel:
            req_field.IsCancel = 1
        ret = self.api.ReqQryOrder(req_field, req_id)
        if ret != 0:
            raise Exception('ReqQryOrder fail, ret[%d]' % ret)
        return req_id

    # Query today's trades
    def list_traded_orders(self):
        """Request the trade (fill) list; returns the request id used.

        :raises Exception: on a non-zero API return code
        """
        req_id = self.__next_req_id()
        req_field = traderapi.CTORATstpQryTradeField()
        ret = self.api.ReqQryTrade(req_field, req_id)
        if ret != 0:
            raise Exception('ReqQryTrade fail, ret[%d]' % ret)
        return req_id

    # Query positions
    def list_positions(self):
        """Request the position list; returns the request id used.

        :raises Exception: on a non-zero API return code
        """
        req_id = self.__next_req_id()
        req_field = traderapi.CTORATstpQryPositionField()
        ret = self.api.ReqQryPosition(req_field, req_id)
        if ret != 0:
            raise Exception('ReqQryPosition fail, ret[%d]' % ret)
        return req_id

    # Query the trading (money) account
    def get_money_account(self):
        """Request the CNY trading account; returns the request id used.

        :raises Exception: on a non-zero API return code
        """
        req_id = self.__next_req_id()
        req_field = traderapi.CTORATstpQryTradingAccountField()
        req_field.CurrencyID = traderapi.TORA_TSTP_CID_CNY
        ret = self.api.ReqQryTradingAccount(req_field, req_id)
        if ret != 0:
            raise Exception('ReqQryTradingAccount fail, ret[%d]' % ret)
        return req_id

    def login(self):
        """Send a user-code + password login request.

        :raises Exception: on a non-zero API return code
        """
        login_req = traderapi.CTORATstpReqUserLoginField()
        # Log in by user code. (The original notes list fund-account and Shanghai/Shenzhen
        # shareholder-account login as alternatives via LogInAccountType.)
        login_req.LogInAccount = self.UserID
        login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_UserID
        # Password authentication; per the original notes AuthMode may be left unset in this case
        login_req.Password = self.Password
        # Terminal information: UserProductInfo carries the terminal name
        login_req.UserProductInfo = 'jiabei'
        # Terminal info string (original note: filled in as required by the regulator)
        login_req.TerminalInfo = 'PC;IIP=123.112.154.118;IPORT=50361;LIP=192.168.118.107;MAC=54EE750B1713FCF8AE5CBD58;HD=TF655AY91GHRVL'
        # Uses the class-level counter, unlike the other requests
        TradeSimpleApi.req_id += 1
        ret = self.api.ReqUserLogin(login_req, TradeSimpleApi.req_id)
        if ret != 0:
            raise Exception('ReqUserLogin fail, ret[%d]' % ret)
class MyTradeActionCallback(TradeActionCallback):
    """Turns incoming trade commands into TradeSimpleApi calls and registers pending requests."""

    # Shared pool on which buy orders and buy cancellations run
    trade_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30)

    def __init__(self, UserID, Password, SZSE_ShareHolderID, SSE_ShareHolderID, api: traderapi.CTORATstpTraderApi):
        """Create the TradeSimpleApi wrapper that all handlers delegate to."""
        self.__tradeSimpleApi = TradeSimpleApi(UserID, Password, SZSE_ShareHolderID, SSE_ShareHolderID, api)

    @staticmethod
    def __send_error(response_type, client_id, request_id, error):
        """Publish the standard ``{"code": 1, "msg": ...}`` error response."""
        send_response(json.dumps({"code": 1, "msg": str(error)}), response_type, client_id,
                      request_id)

    @staticmethod
    def __log_async_failure(action, sinfo, future):
        """Log an exception raised by a pool task; without this it would never be observed."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logging.error("%s failed in worker thread, sinfo-%s", action, sinfo, exc_info=error)

    def OnTrade(self, client_id, request_id, sk, type_, data):
        """Handle a trade command.

        :param client_id: id of the requesting client
        :param request_id: id of the request
        :param sk: stored with the pending request in req_rid_dict
        :param type_: 1 places an order, 2 cancels an order; other values are ignored
        :param data: command payload (see the private handlers for the keys read)
        """
        if type_ == 1:
            self.__place_order(client_id, request_id, sk, data)
        elif type_ == 2:
            self.__cancel_order(client_id, request_id, sk, data)

    def __place_order(self, client_id, request_id, sk, data):
        """Place a buy (direction 1, on the thread pool) or a sell (direction 2, inline) order."""
        async_log_util.info(logger_local_huaxin_trade_debug,
                            f"\n---------------------\n请求下单:client_id-{client_id} request_id-{request_id}  data:{data}")
        # direction: 1 = buy, 2 = sell
        direction = data["direction"]
        code = data["code"]
        volume = data["volume"]
        price = data["price"]
        sinfo = data["sinfo"]
        order_ref = data.get("order_ref")
        shadow_price = data.get("shadow_price")
        blocking = data.get("blocking")
        if direction == 1:
            async_log_util.info(logger_trade, f"{code}华鑫本地开始下单")
            try:
                if blocking:
                    # Registered so that the order callback can be answered to this request
                    req_rid_dict[sinfo] = (client_id, request_id, sk, order_ref)
                future = self.trade_thread_pool.submit(self.__tradeSimpleApi.buy, code, volume, price, sinfo,
                                                       order_ref, shadow_price)
                # buy() runs on another thread, so the except below cannot see its errors
                future.add_done_callback(lambda done: self.__log_async_failure("buy", sinfo, done))
                async_log_util.info(logger_trade, f"{code}华鑫本地下单线程结束")
            except Exception as e:
                self.__send_error(TYPE_ORDER, client_id, request_id, e)
            async_log_util.info(logger_local_huaxin_trade_debug,
                                f"买入结束:code-{code} sinfo-{sinfo}")
        elif direction == 2:
            try:
                price_type = data["price_type"]
                if blocking:
                    req_rid_dict[sinfo] = (client_id, request_id, sk)
                self.__tradeSimpleApi.sell(code, volume, price, price_type, sinfo, order_ref)
                printlog("sell", req_rid_dict)
            except Exception as e:
                logging.exception(e)
                self.__send_error(TYPE_ORDER, client_id, request_id, e)

    def __cancel_order(self, client_id, request_id, sk, data):
        """Cancel a buy (direction 1, on the thread pool) or a sell (direction 2, inline) order."""
        async_log_util.info(logger_local_huaxin_trade_debug,
                            f"\n---------------------\n请求撤单:client_id-{client_id} request_id-{request_id} data-{data}")
        direction = data["direction"]
        code = data["code"]
        orderSysID = data.get("orderSysID")
        orderRef = data.get("orderRef")
        orderActionRef = data.get("orderActionRef")
        sinfo = data["sinfo"]
        if direction == 1:
            try:
                # Either a system order id or an order reference is required to locate the order
                if not orderSysID and orderRef is None:
                    raise Exception("没有找到系统订单号或者报单引用")
                req_rid_dict[sinfo] = (client_id, request_id, sk)
                future = self.trade_thread_pool.submit(self.__tradeSimpleApi.cancel_buy, code, sinfo,
                                                       order_sys_id=orderSysID, order_ref=orderRef,
                                                       order_action_ref=orderActionRef)
                # cancel_buy() runs on another thread, so the except below cannot see its errors
                future.add_done_callback(lambda done: self.__log_async_failure("cancel_buy", sinfo, done))
                async_log_util.info(logger_local_huaxin_trade_debug,
                                    f"撤单结束:code-{code} order_sys_id-{orderSysID} sinfo-{sinfo}")
            except Exception as e:
                self.__send_error(TYPE_CANCEL_ORDER, client_id, request_id, e)
        elif direction == 2:
            try:
                req_rid_dict[sinfo] = (client_id, request_id, sk)
                self.__tradeSimpleApi.cancel_sell(code, orderSysID, sinfo)
            except Exception as e:
                self.__send_error(TYPE_CANCEL_ORDER, client_id, request_id, e)

    def OnDealList(self, client_id, request_id, sk):
        """Start a trade (fill) list query and register the pending request under its request id."""
        async_log_util.info(logger_local_huaxin_trade_debug, f"请求成交列表:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.list_traded_orders()
            req_rid_dict[req_id] = (client_id, request_id, sk)
        except Exception as e:
            logging.exception(e)
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))

    def OnDelegateList(self, client_id, request_id, sk, is_cancel):
        """Start an order list query and register the pending request under its request id.

        :param is_cancel: forwarded to TradeSimpleApi.list_delegate_orders
        """
        async_log_util.info(logger_local_huaxin_trade_debug, f"请求委托列表:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.list_delegate_orders(is_cancel)
            req_rid_dict[req_id] = (client_id, request_id, sk)
        except Exception as e:
            self.__send_error("common", client_id, request_id, e)

    def OnMoney(self, client_id, request_id, sk):
        """Start a trading-account query and register the pending request under its request id."""
        async_log_util.info(logger_local_huaxin_trade_debug, f"请求账户:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.get_money_account()
            req_rid_dict[req_id] = (client_id, request_id, sk)
        except Exception as e:
            self.__send_error("common", client_id, request_id, e)

    def OnPositionList(self, client_id, request_id, sk):
        """Start a position query and register the pending request under its request id."""
        async_log_util.info(logger_local_huaxin_trade_debug, f"请求持仓:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.list_positions()
            req_rid_dict[req_id] = (client_id, request_id, sk)
        except Exception as e:
            self.__send_error("common", client_id, request_id, e)

    def OnTest(self, client_id, request_id, data, sk):
        """Echo ``data`` back as a successful response without logging it.

        send_response() has no ``sk`` parameter, so passing ``sk=sk`` raised TypeError on every
        call; it is no longer passed. The builtin ``type`` that was passed as the response type
        is replaced by "common", the value the query handlers use.
        """
        send_response(
            json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                        "request_id": request_id}), "common", client_id, request_id, show_log=False)
log.py
File was deleted
log_module/async_log_util.py
New file
@@ -0,0 +1,101 @@
"""
异步日志管理器
"""
import logging
import queue
import threading
import time
from log_module.log import printlog, logger_system, logger_debug
from utils import tool
class AsyncLogManager:
    """Queue log calls so they can be replayed later by ``run_sync``.

    Producers call ``debug``/``info``/``warning``/``error``/``exception`` with a
    logger object and the arguments for that logger method.  Each call is stored
    as ``(logger, timestamp, method_name, args)``; ``run_sync`` pops the entries
    and invokes ``getattr(logger, method_name)`` with the enqueue time prefixed
    to the first argument.
    """

    def __init__(self):
        self.__log_queue = queue.Queue()

    def __add_log(self, logger, method, *args):
        # The timestamp is taken at enqueue time so the replayed line shows when
        # the event happened rather than when the queue was drained.
        self.__log_queue.put_nowait((logger, time.time(), method, args))

    def add_log(self, data):
        """Enqueue an already-built ``(logger, timestamp, method_name, args)`` entry."""
        self.__log_queue.put_nowait(data)

    def debug(self, logger, *args):
        """Queue a ``debug`` call on ``logger``."""
        self.__add_log(logger, "debug", *args)

    def info(self, logger, *args):
        """Queue an ``info`` call on ``logger``."""
        self.__add_log(logger, "info", *args)

    def warning(self, logger, *args):
        """Queue a ``warning`` call on ``logger``."""
        self.__add_log(logger, "warning", *args)

    def error(self, logger, *args):
        """Queue an ``error`` call on ``logger``."""
        self.__add_log(logger, "error", *args)

    def exception(self, logger, *args):
        """Queue an ``exception`` call on ``logger``."""
        self.__add_log(logger, "exception", *args)

    @staticmethod
    def _prefix_first_arg(prefix, args):
        """Return ``args`` as a tuple with ``prefix`` prepended to its first element.

        The first element is converted with ``str`` first, so a non-string
        message (a number, dict, exception, ...) is still logged instead of
        failing the concatenation.  An empty ``args`` is returned unchanged.
        """
        if not args:
            return tuple(args)
        head, *rest = args
        return (prefix + str(head), *rest)

    def run_sync(self, add_to_common_log=False):
        """Drain the queue forever on the calling thread.

        :param add_to_common_log: when true, entries are not written here but
            handed unchanged to the module-level ``_common_log`` manager.

        Any exception raised while handling one entry is reported through
        ``logging.exception`` and the loop continues with the next entry.
        """
        printlog("run_sync", add_to_common_log)
        logger_system.info(f"run_sync 线程ID:{tool.get_thread_id()}")
        while True:
            try:
                val = self.__log_queue.get()
                if add_to_common_log:
                    _common_log.add_log(val)
                    continue
                time_s = val[1]
                method = getattr(val[0], val[2])
                # Whole seconds come from tool.to_time_str; the fractional part is
                # read from the float's decimal text, capped at six digits.
                stamp = f"[{tool.to_time_str(int(time_s))}.{str(time_s).split('.')[1][:6]}] "
                method(*self._prefix_first_arg(stamp, val[3]))
            except Exception as e:
                logging.exception(e)
# Independent queues; where these two are filled and drained is not visible in this module.
l2_data_log = AsyncLogManager()
huaxin_l2_log = AsyncLogManager()
# Shared queue behind the module-level debug/info/warning/error/exception helpers and run_sync().
_common_log = AsyncLogManager()
def debug(logger, *args):
    """Queue a ``debug`` call for ``logger`` on the shared ``_common_log`` manager."""
    _common_log.debug(logger, *args)
def info(logger, *args):
    """Queue an ``info`` call for ``logger`` on the shared ``_common_log`` manager."""
    _common_log.info(logger, *args)
def warning(logger, *args):
    """Queue a ``warning`` call for ``logger`` on the shared ``_common_log`` manager."""
    _common_log.warning(logger, *args)
def error(logger, *args):
    """Queue an ``error`` call for ``logger`` on the shared ``_common_log`` manager."""
    _common_log.error(logger, *args)
def exception(logger, *args):
    """Queue an ``exception`` call for ``logger`` on the shared ``_common_log`` manager."""
    _common_log.exception(logger, *args)
# Run the log writer loop for the shared queue
def run_sync():
    """Log the current thread id, then drain ``_common_log`` on the calling thread.

    ``AsyncLogManager.run_sync`` loops without a break, so this call does not
    return; callers are expected to run it on a dedicated thread.
    """
    logger_system.info(f"async_log 线程ID:{tool.get_thread_id()}")
    _common_log.run_sync()
if __name__ == "__main__":
    # Manual smoke test: queue one message on a private manager and drain it on a
    # daemon thread, then run the shared writer loop on the main thread (never returns).
    # info(logger_debug, "*-{}", "test")
    asyncLogManager = AsyncLogManager()
    asyncLogManager.info(logger_debug, "测试123")
    threading.Thread(target=lambda: asyncLogManager.run_sync(), daemon=True).start()
    # Give the daemon thread a moment to write the queued message
    time.sleep(1)
    # info(logger_debug, "002375")
    run_sync()
log_module/log.py
New file
@@ -0,0 +1,464 @@
"""
日志
"""
import logging
import os
import sys
from loguru import logger
import constant
class MyLogger:
    """Register every loguru sink used by the application and hand out bound loggers.

    Each sink accepts only records whose ``extra["name"]`` equals the name it
    was registered under, so a logger returned by :meth:`get_logger` reaches
    exactly the sink(s) registered with the same name.
    """

    def __init__(self):
        # Start from a clean slate: drop the handlers loguru currently has
        logger.remove()

        def named(expected):
            # Filter that lets through only records bound with name == expected
            return lambda record: record["extra"].get("name") == expected

        def add_file(path, name):
            # One file per day, rotated files are zip-compressed, writes go through loguru's queue
            logger.add(path, filter=named(name), rotation="00:00", compression="zip", enqueue=True)

        def gp(dir_name, log_name, name=None):
            # General log; the bound name defaults to the file name
            add_file(self.get_path(dir_name, log_name), log_name if name is None else name)

        def hx(dir_name, log_name, name):
            add_file(self.get_hx_path(dir_name, log_name), name)

        def local(dir_name, log_name, name):
            add_file(self.get_local_huaxin_path(dir_name, log_name), name)

        gp("trade", "trade_gui")
        gp("trade", "trade")
        gp("trade", "delegate")
        gp("l2", "l2_error")
        gp("l2", "l2_process")
        gp("l2", "l2_process_time")
        gp("l2", "l2_trade")
        gp("l2", "l2_data")
        gp("l2", "l2_latest_data")
        gp("mysql", "mysql_debug")
        # Shown on the console
        logger.add(sys.stdout, filter=named("info"), enqueue=True)
        gp("l2", "l2_trade_cancel")
        gp("l2", "cancel/s_cancel", "s_cancel")
        gp("l2", "cancel/h_cancel", "h_cancel")
        gp("l2", "cancel/l_cancel", "l_cancel")
        gp("l2", "cancel/g_cancel", "g_cancel")
        gp("l2", "cancel/d_cancel", "d_cancel")
        gp("l2", "cancel/f_cancel", "f_cancel")
        gp("l2", "l2_trade_buy")
        gp("l2", "l2_big_data")
        gp("l2", "l2_trade_queue")
        gp("l2", "l2_trade_buy_queue")
        gp("l2", "l2_trade_buy_progress")
        gp("l2", "l2_real_place_order_position")
        gp("l2", "l2_subscript", "l2_codes_subscript")
        gp("l2", "l2_market_sell")
        gp("l2", "l2_not_buy_reasons")
        gp("juejin", "juejin_tick")
        gp("juejin", "juejin_trade")
        gp("juejin", "huaxin_trade")
        gp("ths", "code_operate")
        # Shown on the console
        logger.add(sys.stdout, filter=named("print"), enqueue=True)
        gp("device", "device")
        gp("system", "system")
        gp("ths", "buy_1_volumn")
        gp("ths", "buy_1_volumn_record")
        gp("ths", "trade_queue_price_info")
        gp("ths", "day_volumn")
        gp("ths", "buy_win_distibute")
        gp("first_code", "first_code_record")
        gp("debug", "debug")
        gp("debug", "request_api")
        gp("trade", "trade_record")
        gp("trade", "position_api_request")
        gp("score", "place_order_score")
        gp("kpl", "kpl_limit_up_reason_change")
        gp("kpl", "kpl_limit_up")
        gp("kpl", "kpl_debug")
        gp("kpl", "kpl_block_can_buy")
        gp("kpl", "kpl_open_limit_up")
        # Market-watch (kp) log
        gp("kp", "kp_msg")
        gp("redis", "redis_debug")
        gp("profile", "profile")

        # ------------------------------ Huaxin logs ------------------------------
        hx("l2", "transaction", "hx_l2_transaction")
        hx("l2", "transaction_sell_order", "hx_l2_transaction_sell_order")
        hx("l2", "transaction_desc", "hx_l2_transaction_desc")
        hx("l2", "orderdetail", "hx_l2_orderdetail")
        hx("l2", "marketdata", "hx_l2_market_data")
        hx("l2", "upload", "hx_l2_upload")
        hx("l2", "debug", "hx_l2_debug")
        hx("contact", "debug", "hx_contact_debug")
        hx("trade", "trade_callback", "hx_trade_callback")
        hx("trade", "debug", "hx_trade_debug")
        hx("trade", "trade_loop", "hx_trade_loop")

        local("l2", "transaction", "local_huaxin_transaction")
        local("l2", "orderdetail", "local_huaxin_orderdetail")
        local("l2", "upload", "local_huaxin_upload")
        local("l2", "error", "local_huaxin_error")
        local("l2", "subscript", "local_huaxin_subscript")
        local("l2", "market", "local_huaxin_l2_market")
        local("contact", "debug", "local_huaxin_debug")
        local("trade", "trade_debug", "local_huaxin_trade_debug")
        # This name goes to the console (registered without enqueue) as well as to a file
        logger.add(sys.stdout, filter=named("local_huaxin_l1_show_info"))
        local("l1", "show_info", "local_huaxin_l1_show_info")
        local("l1", "l1_for_trade", "local_huaxin_l1_trade_info")
        local("l2", "g_cancel", "local_huaxin_g_cancel")
        local("l2", "special_volume", "local_huaxin_l2_special_volume")
        local("l2", "l2_buy_no", "local_huaxin_l2_buy_no")

        gp("request", "request_debug")
        gp("request", "tuoguan_request_debug")

    def get_path(self, dir_name, log_name):
        """Return the sink path template for a general log under ``<prefix>/<LOG_DIR>/gp``.

        The trailing ``{time:YYYY-MM-DD}`` placeholder is left in the string for
        loguru to fill in.
        """
        base = f"{constant.get_path_prefix()}/{constant.LOG_DIR}/gp/{dir_name}/{log_name}"
        return base + ".{time:YYYY-MM-DD}.log"

    def get_hx_path(self, dir_name, log_name):
        """Return the sink path template for a log under ``<prefix>/<LOG_DIR>/huaxin``.

        The trailing ``{time:YYYY-MM-DD}`` placeholder is left in the string for
        loguru to fill in.
        """
        base = f"{constant.get_path_prefix()}/{constant.LOG_DIR}/huaxin/{dir_name}/{log_name}"
        return base + ".{time:YYYY-MM-DD}.log"

    def get_local_huaxin_path(self, dir_name, log_name):
        """Return the sink path template for a log under ``<prefix>/<LOG_DIR>/huaxin_local``.

        The trailing ``{time:YYYY-MM-DD}`` placeholder is left in the string for
        loguru to fill in.
        """
        base = f"{constant.get_path_prefix()}/{constant.LOG_DIR}/huaxin_local/{dir_name}/{log_name}"
        return base + ".{time:YYYY-MM-DD}.log"

    def get_logger(self, log_name):
        """Return the loguru logger bound with ``name=log_name``."""
        return logger.bind(name=log_name)
# Constructing MyLogger registers all sinks; every logger below is bound to the
# name that one of those sinks filters on.
__mylogger = MyLogger()
logger_print = __mylogger.get_logger("print")
logger_info = __mylogger.get_logger("info")
logger_trade_gui = __mylogger.get_logger("trade_gui")
logger_trade = __mylogger.get_logger("trade")
logger_trade_delegate = __mylogger.get_logger("delegate")
logger_l2_error = __mylogger.get_logger("l2_error")
logger_l2_process = __mylogger.get_logger("l2_process")
logger_l2_process_time = __mylogger.get_logger("l2_process_time")
logger_l2_data = __mylogger.get_logger("l2_data")
logger_l2_latest_data = __mylogger.get_logger("l2_latest_data")
logger_l2_market_sell = __mylogger.get_logger("l2_market_sell")
logger_l2_not_buy_reasons = __mylogger.get_logger("l2_not_buy_reasons")
logger_l2_trade = __mylogger.get_logger("l2_trade")
logger_l2_trade_cancel = __mylogger.get_logger("l2_trade_cancel")
logger_l2_s_cancel = __mylogger.get_logger("s_cancel")
logger_l2_h_cancel = __mylogger.get_logger("h_cancel")
logger_l2_d_cancel = __mylogger.get_logger("d_cancel")
logger_l2_f_cancel = __mylogger.get_logger("f_cancel")
logger_l2_l_cancel = __mylogger.get_logger("l_cancel")
logger_l2_g_cancel = __mylogger.get_logger("g_cancel")
logger_l2_trade_buy = __mylogger.get_logger("l2_trade_buy")
logger_l2_trade_queue = __mylogger.get_logger("l2_trade_queue")
logger_l2_trade_buy_queue = __mylogger.get_logger("l2_trade_buy_queue")
logger_l2_trade_buy_progress = __mylogger.get_logger("l2_trade_buy_progress")
logger_real_place_order_position = __mylogger.get_logger("l2_real_place_order_position")
# Code subscription log (its sink writes to the "l2_subscript" file)
logger_l2_codes_subscript = __mylogger.get_logger("l2_codes_subscript")
logger_l2_big_data = __mylogger.get_logger("l2_big_data")
logger_juejin_tick = __mylogger.get_logger("juejin_tick")
logger_juejin_trade = __mylogger.get_logger("juejin_trade")
logger_huaxin_trade = __mylogger.get_logger("huaxin_trade")
logger_code_operate = __mylogger.get_logger("code_operate")
logger_device = __mylogger.get_logger("device")
logger_system = __mylogger.get_logger("system")
logger_buy_1_volumn = __mylogger.get_logger("buy_1_volumn")
logger_buy_1_volumn_record = __mylogger.get_logger("buy_1_volumn_record")
logger_trade_queue_price_info = __mylogger.get_logger("trade_queue_price_info")
logger_day_volumn = __mylogger.get_logger("day_volumn")
logger_buy_win_distibute = __mylogger.get_logger("buy_win_distibute")
logger_first_code_record = __mylogger.get_logger("first_code_record")
logger_debug = __mylogger.get_logger("debug")
logger_request_api = __mylogger.get_logger("request_api")
logger_trade_record = __mylogger.get_logger("trade_record")
logger_trade_position_api_request = __mylogger.get_logger("position_api_request")
logger_place_order_score = __mylogger.get_logger("place_order_score")
logger_kpl_limit_up_reason_change = __mylogger.get_logger("kpl_limit_up_reason_change")
logger_kpl_limit_up = __mylogger.get_logger("kpl_limit_up")
logger_kpl_debug = __mylogger.get_logger("kpl_debug")
logger_kpl_block_can_buy = __mylogger.get_logger("kpl_block_can_buy")
logger_kpl_open_limit_up = __mylogger.get_logger("kpl_open_limit_up")
logger_kp_msg = __mylogger.get_logger("kp_msg")
logger_redis_debug = __mylogger.get_logger("redis_debug")
logger_profile = __mylogger.get_logger("profile")
logger_mysql_debug = __mylogger.get_logger("mysql_debug")
# ------------------------------- Huaxin logs (sinks under the "huaxin" directory) ---------------------------------
hx_logger_l2_orderdetail = __mylogger.get_logger("hx_l2_orderdetail")
hx_logger_l2_transaction = __mylogger.get_logger("hx_l2_transaction")
hx_logger_l2_transaction_sell_order = __mylogger.get_logger("hx_l2_transaction_sell_order")
hx_logger_l2_transaction_desc = __mylogger.get_logger("hx_l2_transaction_desc")
hx_logger_l2_market_data = __mylogger.get_logger("hx_l2_market_data")
hx_logger_l2_upload = __mylogger.get_logger("hx_l2_upload")
hx_logger_l2_debug = __mylogger.get_logger("hx_l2_debug")
hx_logger_contact_debug = __mylogger.get_logger("hx_contact_debug")
hx_logger_trade_callback = __mylogger.get_logger("hx_trade_callback")
hx_logger_trade_debug = __mylogger.get_logger("hx_trade_debug")
hx_logger_trade_loop = __mylogger.get_logger("hx_trade_loop")
# ------------------------------- Huaxin local logs (sinks under the "huaxin_local" directory) ---------------------------------
logger_local_huaxin_l2_transaction = __mylogger.get_logger("local_huaxin_transaction")
logger_local_huaxin_l2_orderdetail = __mylogger.get_logger("local_huaxin_orderdetail")
logger_local_huaxin_l2_upload = __mylogger.get_logger("local_huaxin_upload")
logger_local_huaxin_l2_error = __mylogger.get_logger("local_huaxin_error")
logger_local_huaxin_l2_subscript = __mylogger.get_logger("local_huaxin_subscript")
logger_local_huaxin_l2_market = __mylogger.get_logger("local_huaxin_l2_market")
logger_local_huaxin_contact_debug = __mylogger.get_logger("local_huaxin_debug")
logger_local_huaxin_trade_debug = __mylogger.get_logger("local_huaxin_trade_debug")
# Records bound to this name are written both to stdout and to the l1/show_info file
logger_local_huaxin_l1 = __mylogger.get_logger("local_huaxin_l1_show_info")
logger_local_huaxin_g_cancel = __mylogger.get_logger("local_huaxin_g_cancel")
logger_local_huaxin_l2_buy_no = __mylogger.get_logger("local_huaxin_l2_buy_no")
logger_local_huaxin_l1_trade_info = __mylogger.get_logger("local_huaxin_l1_trade_info")
logger_local_huaxin_l2_special_volume = __mylogger.get_logger("local_huaxin_l2_special_volume")
# Request tracing logs (general "gp" sinks under the "request" directory)
logger_request_debug = __mylogger.get_logger("request_debug")
logger_tuoguan_request_debug = __mylogger.get_logger("tuoguan_request_debug")
def close_print():
    """Silence console output for the current process.

    Configures the stdlib ``logging`` root logger at ERROR level and, when
    ``constant.is_windows()`` is false, redirects file descriptor 1 (stdout)
    to ``/dev/null``.
    """
    logging.basicConfig(level=logging.ERROR)
    if constant.is_windows():
        return
    # Open the target first and duplicate it onto fd 1.  Closing fd 1 and hoping
    # the next open() reuses it is fragile: a lower free descriptor, or another
    # thread opening a file in between, would leave stdout closed.
    devnull_fd = os.open('/dev/null', os.O_WRONLY)
    if devnull_fd == 1:
        # fd 1 was already closed, so the open itself landed on stdout
        return
    try:
        os.dup2(devnull_fd, 1)
    finally:
        os.close(devnull_fd)
def printlog(*args):
    """Log ``args`` through ``logger_print`` at info level.

    The positional arguments are passed as a single tuple, so the tuple itself
    is the log message.
    """
    logger_print.info(args)
if __name__ == "__main__":
    # Manual check: write one sample record through the kpl_open_limit_up logger
    open_limit_up_codes = set({"000333", "000222"})
    logger_kpl_open_limit_up.info(f"炸板代码:{open_limit_up_codes}")
log_module/request_log_util.py
New file
@@ -0,0 +1,10 @@
from log_module import async_log_util
from log_module.log import logger_request_debug
from utils import tool
def request_info(type_name, content, thread_id=None):
    """Queue a request-trace line on the request-debug logger.

    :param type_name: label written inside the second bracket pair
    :param content: text appended after the two labels
    :param thread_id: id written inside the first bracket pair; any falsy value
        is replaced by ``tool.get_thread_id()``
    """
    effective_thread_id = thread_id or tool.get_thread_id()
    line = f"【{effective_thread_id}】【{type_name}】 {content}"
    async_log_util.info(logger_request_debug, line)
main.py
@@ -1,18 +1,23 @@
import multiprocessing
import threading
import constant
import data_server
import log
import middle_api_server
import middle_cb_api_server
import middle_server
# from huaxin_client import huaxin_trade_client
# from trade import huaxin_trade_api
# from huaxin_client import huaxin_trade_client
from log_module import async_log_util
# from trade import huaxin_trade_api
if __name__ == "__main__":
    t1 = threading.Thread(target=lambda: middle_api_server.run(), daemon=True)
    t1.start()
    t1 = threading.Thread(target=lambda: data_server.run("0.0.0.0", constant.DATA_SERVER_PORT), daemon=True)
    t1.start()
    t1 = threading.Thread(target=lambda: log.async_log_util.run_sync(), daemon=True)
    t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True)
    t1.start()
    t1 = threading.Thread(target=lambda: middle_server.run(12880), daemon=True)
    t1.start()
@@ -21,4 +26,15 @@
    t1.start()
    # t1 = threading.Thread(target=lambda: middle_l1_data_server.run(12881), daemon=True)
    # t1.start()
    # 运行仿真交易
    # queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue()
    # huaxin_trade_api.run_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query)
    # ===========运行交易端==========
    # tradeProcess = multiprocessing.Process(
    #     target=huaxin_trade_client.run,
    #     args=(queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query, queue_strategy_r_trade_w,))
    # tradeProcess.start()
    middle_server.run()
middle_api_server.py
@@ -8,12 +8,12 @@
import time
import constant
import log
import socket_manager
import trade_manager
from db import mysql_data, redis_manager
from db.redis_manager import RedisUtils
from log import logger_request_debug
from log_module import log, request_log_util
from log_module.log import logger_request_debug
from middle_l1_data_server import L1DataManager
from output import push_msg_manager
from utils import socket_util, hosting_api_util, huaxin_trade_record_manager, huaxin_util, tool, global_data_cache_util
@@ -69,7 +69,7 @@
                    data_json = json.loads(data_str)
                    type_ = data_json['type']
                    try:
                        log.request_info("middle_api_server", f"请求开始:{type_}")
                        request_log_util.request_info("middle_api_server", f"请求开始:{type_}")
                        if type(type_) == int:
                            # 处理数字型TYPE
                            return_str = self.process_num_type(sk, type_, data_str)
@@ -397,7 +397,7 @@
                            results = L1DataManager.get_current_l1_data()
                            return_str = json.dumps({"code": 0, "data": results})
                    finally:
                        log.request_info("middle_api_server", f"请求结束:{type_}")
                        request_log_util.request_info("middle_api_server", f"请求结束:{type_}")
                break
                # sk.close()
            except Exception as e:
middle_cb_api_server.py
@@ -3,8 +3,9 @@
import logging
import socket
import socketserver
import log
from log import logger_request_debug
from log_module import log, request_log_util
from log_module.log import logger_request_debug
from utils import socket_util, hosting_api_util
"""
@@ -60,7 +61,7 @@
                    data_json = json.loads(data_str)
                    type_ = data_json['type']
                    try:
                        log.request_info("middle_cb_api_server", f"请求开始:{type_}")
                        request_log_util.request_info("middle_cb_api_server", f"请求开始:{type_}")
                        is_sign_right = socket_util.is_client_params_sign_right(data_json)
                        # ------客户端请求接口-------
                        if type_ == 'buy':
@@ -152,7 +153,7 @@
                            return_str = json.dumps(result)
                            break
                    finally:
                        log.request_info("middle_cb_api_server", f"请求结束:{type_}")
                        request_log_util.request_info("middle_cb_api_server", f"请求结束:{type_}")
                break
                # sk.close()
            except Exception as e:
middle_l1_data_server.py
@@ -3,21 +3,13 @@
import json
import logging
import queue
import random
import socket
import socketserver
import threading
import time
import constant
import log
import socket_manager
from db import mysql_data
from db.redis_manager import RedisUtils, RedisManager
from log import logger_debug, logger_request_debug
from output import push_msg_manager
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool
from utils.juejin_util import JueJinHttpApi
from log_module import log
from utils import socket_util
trade_data_request_queue = queue.Queue()
middle_server.py
@@ -1,4 +1,5 @@
import builtins
import copy
import hashlib
import json
import logging
@@ -10,17 +11,34 @@
import time
import constant
import log
import socket_manager
from db import mysql_data
from db.redis_manager import RedisUtils, RedisManager
from log import logger_debug, logger_request_debug
from log_module import log
from log_module.log import logger_debug
from middle_l1_data_server import L1DataManager
from output import push_msg_manager
from trade import huaxin_trade_api
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool
from utils.juejin_util import JueJinHttpApi
trade_data_request_queue = queue.Queue()
__mysql_config_dict = {}
def get_mysql_config(db_name):
    """
    Return the MySQL connection settings for one database.

    The settings are a deep copy of ``constant.MYSQL_CONFIG`` whose
    ``"database"`` entry is set to ``db_name``. The copy is built on the first
    request for a name; later requests get the very same dict object back.

    :param db_name: database name, also used as the cache key
    :return: the cached configuration dict for ``db_name``
    """
    if db_name not in __mysql_config_dict:
        # First request for this database: derive its config from the template.
        db_config = copy.deepcopy(constant.MYSQL_CONFIG)
        db_config["database"] = db_name
        __mysql_config_dict[db_name] = db_config
    return __mysql_config_dict[db_name]
class MyTCPServer(socketserver.TCPServer):
@@ -235,7 +253,8 @@
                                db = data["db"]
                                cmd = data["cmd"]
                                args = data.get("args")
                                mysql = mysql_data.Mysqldb()
                                mysql_config = get_mysql_config(db)
                                mysql = mysql_data.Mysqldb(mysql_config)
                                method = getattr(mysql, cmd)
                                args_ = []
                                if args:
@@ -300,11 +319,21 @@
                            datas = data_json["data"]
                            L1DataManager.add_datas(datas)
                            break
                        elif data_json["type"] == "simulation_trade":
                            datas = data_json["data"]
                            ctype = datas["ctype"]
                            data = datas["data"]
                            result = huaxin_trade_api.request(ctype,data)
                            result_str = json.dumps({"code": 0, "data": result}, default=str)
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                    except Exception as e:
                        log.logger_tuoguan_request_debug.exception(e)
                    finally:
                        if time.time() - __start_time > 2:
                            log.logger_tuoguan_request_debug.info(f"耗时:{int(time.time() - __start_time)}s  数据:{data_json}")
                            log.logger_tuoguan_request_debug.info(
                                f"耗时:{int(time.time() - __start_time)}s  数据:{data_json}")
                else:
                    # 断开连接
                    break
@@ -341,7 +370,7 @@
                pass
def run(port =  constant.MIDDLE_SERVER_PORT):
def run(port=constant.MIDDLE_SERVER_PORT):
    print("create MiddleServer")
    t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
    t1.start()
@@ -353,4 +382,4 @@
if __name__ == "__main__":
    print(builtins.type("")==str)
    pass
output/push_msg_manager.py
@@ -3,7 +3,7 @@
"""
import json
from log import logger_debug
from log_module.log import logger_debug
from utils import socket_util
TYPE_ORDER_ALMOST_DEAL = "order_almost_deal"  # 订单即将成交
socket_manager.py
@@ -18,7 +18,7 @@
    @classmethod
    def add_client(cls, _type, rid, sk):
        if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE or _type == cls.CLIENT_TYPE_TRADE_SELL:
        if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE or _type == cls.CLIENT_TYPE_TRADE_SELL or _type == cls.CLIENT_TYPE_TRADE_CB:
            # 交易列表
            if _type not in cls.socket_client_dict:
                cls.socket_client_dict[_type] = []
@@ -30,7 +30,7 @@
    @classmethod
    def acquire_client(cls, _type):
        if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE or _type == cls.CLIENT_TYPE_TRADE_SELL:
        if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE or _type == cls.CLIENT_TYPE_TRADE_SELL or _type == cls.CLIENT_TYPE_TRADE_CB:
            if _type in cls.socket_client_dict:
                # 根据排序活跃时间排序
                client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[
third_data/kpl_data_manager.py
@@ -7,7 +7,7 @@
import constant
from db.redis_manager import RedisUtils
from log import logger_kpl_limit_up_reason_change
from log_module.log import logger_kpl_limit_up_reason_change
from utils import tool
# 开盘啦历史涨停数据管理
trade/huaxin_trade_api.py
New file
@@ -0,0 +1,323 @@
"""
交易API
"""
import json
import logging
import multiprocessing
import queue
import random
import threading
import time
import concurrent.futures
from utils import socket_util, tool
__response_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=15)
__save_data_queue = queue.Queue()
def __run_recv_queue_trade(queue: multiprocessing.Queue):
    """
    Consume messages coming back from the trade process, forever.

    Every truthy item read from ``queue`` is parsed as JSON and dispatched on
    its ``"type"`` field:

    * ``"response"``: the answer to a request issued by this module. It is
      handed to the response thread pool, which stores ``data_json["data"]``
      under ``data_json["request_id"]`` through ``set_response``.
    * ``"trade_callback"``: accepted but not processed yet.

    Failures while reading or handling a message are logged and the loop keeps
    running, so one malformed message cannot stop the receiver.

    :param queue: queue the trade process writes to; when ``None`` the function
        returns immediately
    :return: ``None`` when ``queue`` is ``None``; otherwise never returns
    """

    def __set_response(data_json):
        # Runs on the response thread pool: publish the payload to the waiter.
        set_response(data_json["request_id"], data_json['data'])

    if queue is None:
        return
    while True:
        try:
            val = queue.get()
            if not val:
                continue
            data_json = json.loads(val)
            type_ = data_json["type"]
            if type_ == "response":
                # A response without a request_id cannot be matched to a waiter.
                if "request_id" in data_json:
                    __response_thread_pool.submit(__set_response, data_json)
            elif type_ == "trade_callback":
                # Trade callbacks are not handled here yet.
                pass
        except Exception as e:
            # Keep the receiver alive, but leave a trace instead of hiding the error.
            logging.exception(e)
# Set up the queues used to communicate with the trade process.
# This function is not expected to be used for the time being.
def run_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_query_):
    """
    Register the trade queues and start the background result reader.

    :param queue_strategy_r_trade_w_: queue from which trade results are received
    :param queue_strategy_w_trade_r_: queue on which trade commands are sent
    :param queue_strategy_w_trade_r_for_query_: queue on which query commands are sent
    :return: None
    """
    global queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query
    queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query = (
        queue_strategy_w_trade_r_,
        queue_strategy_w_trade_r_for_query_,
    )
    # Read trade results on a daemon thread so it never blocks interpreter exit.
    receiver = threading.Thread(target=__run_recv_queue_trade, args=(queue_strategy_r_trade_w_,), daemon=True)
    receiver.start()
# 交易通道的错误次数
trade_pipe_channel_error_count = 0
# Whether the pipe-based trade channel is healthy
def is_pipe_channel_normal():
    """
    Report the health of the pipe trade channel.

    :return: always ``True``; no check is performed
    """
    return True
# Probe the trade channel
def test_trade_channel():
    """
    Send a random ``sid`` through the trade channel and check that it is echoed.

    On success the module-level error counter is reset to 0. On failure it is
    incremented, capped at 100.

    :return: ``True`` when the reply has ``code == 0`` and carries the same
        ``sid``, ``False`` otherwise
    """
    global trade_pipe_channel_error_count
    probe_sid = random.randint(0, 1000000)
    reply = __test_trade_channel(probe_sid)
    echoed = reply["code"] == 0 and reply["data"]["data"]["sid"] == probe_sid
    if not echoed:
        trade_pipe_channel_error_count = min(trade_pipe_channel_error_count + 1, 100)
        return False
    trade_pipe_channel_error_count = 0
    return True
class ClientSocketManager:
    """
    Registry of client sockets grouped by client type, with one lock per client.

    ``CLIENT_TYPE_TRADE`` keeps a list of ``(rid, socket)`` tuples. Every other
    type keeps a single ``(rid, socket)`` tuple, so registering a client of such
    a type replaces the entry previously stored for it.
    """
    # Client types
    CLIENT_TYPE_TRADE = "trade"
    CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
    CLIENT_TYPE_DEAL_LIST = "deal_list"
    CLIENT_TYPE_POSITION_LIST = "position_list"
    CLIENT_TYPE_MONEY = "money"
    CLIENT_TYPE_DEAL = "deal"
    CLIENT_TYPE_CMD_L2 = "l2_cmd"
    # _type -> list of (rid, sk) for the trade type, a single (rid, sk) otherwise
    socket_client_dict = {}
    # rid -> threading.Lock marking the client as in use
    socket_client_lock_dict = {}
    # rid -> time.time() of the most recent heartbeat
    active_client_dict = {}

    @classmethod
    def list_client(cls, _type):
        """
        List the clients registered under ``_type``.

        :param _type: one of the ``CLIENT_TYPE_*`` values
        :return: list of ``(rid, socket)`` tuples; empty when none is registered.
            For the trade type this is the registry's own list, not a copy.
        """
        if _type not in cls.socket_client_dict:
            return []
        entry = cls.socket_client_dict[_type]
        return entry if _type == cls.CLIENT_TYPE_TRADE else [entry]

    @classmethod
    def add_client(cls, _type, rid, sk):
        """
        Register a client socket and create its lock.

        :param _type: one of the ``CLIENT_TYPE_*`` values
        :param rid: client id
        :param sk: the client's socket
        """
        if _type == cls.CLIENT_TYPE_TRADE:
            # Trade clients accumulate in a list
            cls.socket_client_dict.setdefault(_type, []).append((rid, sk))
        else:
            cls.socket_client_dict[_type] = (rid, sk)
        cls.socket_client_lock_dict[rid] = threading.Lock()

    # Whether the client is currently locked
    @classmethod
    def is_client_locked(cls, rid):
        """
        :param rid: client id
        :return: ``True``/``False`` for a known client, ``None`` for an unknown one
        """
        lock = cls.socket_client_lock_dict.get(rid)
        return None if lock is None else lock.locked()

    @classmethod
    def acquire_client(cls, _type):
        """
        Lock and return a free client of the given type.

        Trade clients are tried in order of most recent heartbeat first. The
        lock is taken without blocking: a non-blocking ``Lock.acquire`` reports
        a busy lock by returning ``False``, so no exception handling is needed.

        :param _type: one of the ``CLIENT_TYPE_*`` values
        :return: the ``(rid, socket)`` tuple whose lock was taken, or ``None``
            when no client is registered or all of them are busy
        """
        if _type not in cls.socket_client_dict:
            return None
        if _type == cls.CLIENT_TYPE_TRADE:
            # Most recently active first; clients without a heartbeat sort last
            candidates = sorted(cls.socket_client_dict[_type],
                                key=lambda x: cls.active_client_dict.get(x[0], 0),
                                reverse=True)
        else:
            candidates = [cls.socket_client_dict[_type]]
        for d in candidates:
            lock = cls.socket_client_lock_dict.get(d[0])
            if lock is not None and lock.acquire(blocking=False):
                return d
        return None

    @classmethod
    def release_client(cls, client_id):
        """
        Release the lock of a client if it is currently held.

        :param client_id: client id; unknown ids are ignored
        """
        lock = cls.socket_client_lock_dict.get(client_id)
        if lock is not None and lock.locked():
            lock.release()

    @classmethod
    def del_client(cls, rid):
        """
        Forget a client: drop its lock, close its socket and unregister it.

        :param rid: client id
        """
        # Remove the client's lock
        cls.socket_client_lock_dict.pop(rid, None)
        # Remove the socket; iterate over a snapshot because entries may be popped
        for t, entry in list(cls.socket_client_dict.items()):
            if isinstance(entry, list):
                for d in entry:
                    if d[0] == rid:
                        try:
                            d[1].close()
                        except Exception:
                            # Best effort: the socket may already be unusable
                            pass
                        entry.remove(d)
                        break
            elif isinstance(entry, tuple):
                if entry[0] == rid:
                    try:
                        entry[1].close()
                    except Exception:
                        # Best effort: the socket may already be unusable
                        pass
                    cls.socket_client_dict.pop(t, None)
                    break

    # Heartbeat
    @classmethod
    def heart(cls, rid):
        """Record the current time as the last heartbeat of client ``rid``."""
        cls.active_client_dict[rid] = time.time()

    @classmethod
    def del_invalid_clients(cls):
        """
        Delete every client whose last heartbeat is more than 20 seconds old.

        The heartbeat table is scanned through a snapshot so that a concurrent
        ``heart`` call cannot break the iteration, and the expired heartbeat is
        removed as well so the same client is not deleted again on every sweep.
        """
        now = time.time()
        for rid, last_beat in list(cls.active_client_dict.items()):
            if now - last_beat > 20:
                # No heartbeat for more than 20s: the channel is considered dead
                cls.del_client(rid)
                cls.active_client_dict.pop(rid, None)
# Trade direction codes
TRADE_DIRECTION_BUY = 1
TRADE_DIRECTION_SELL = 2
# Response timeout: 2 seconds
TIMEOUT = 2.0
# Trade delegation flag
TRADE_DELEGATED = True
# Responses keyed by request_id, waiting to be picked up by the requester
__request_response_dict = {}
def __get_request_id(type):
    """
    Build a request id of the form ``r_<type>_<timestamp>_<random>``.

    :param type: request type embedded in the id
    :return: the id string; the timestamp part is ``time.time() * 10000``
        rounded, the random part is an integer in ``[0, 100000]``
    """
    timestamp_part = round(time.time() * 10000)
    random_part = random.randint(0, 100000)
    return f"r_{type}_{timestamp_part}_{random_part}"
# Send a request
def __request(_type, data, request_id=None, log_enable=True, is_trade=False):
    """
    Sign a request and put it on the matching outgoing queue.

    Trade requests (placing/cancelling orders) and query requests (positions,
    available money, delegate list, deal list) travel on separate queues.

    @param _type: request type, sent as the ``"type"`` field
    @param data: request payload, sent as the ``"data"`` field
    @param request_id: id to use; a new one is generated when falsy
    @param log_enable: accepted but not used by this function
    @param is_trade: ``True`` to use the trade queue, ``False`` for the query queue
    @return: the request id under which the response will be stored
    """
    if not request_id:
        request_id = __get_request_id(_type)
    try:
        signed_payload = socket_util.encryp_client_params_sign({
            "type": _type,
            "data": data,
            "request_id": request_id,
            "time": time.time()
        })
        target_queue = queue_strategy_w_trade_r if is_trade else queue_strategy_w_trade_r_for_query
        target_queue.put_nowait(signed_payload)
    except BrokenPipeError:
        # A broken pipe is passed on to the caller without being logged here.
        raise
    except Exception as e:
        logging.exception(e)
        raise
    return request_id
def __read_response(request_id, blocking, timeout=TIMEOUT, log_enable=True):
    """
    Wait for the response stored under ``request_id`` and remove it.

    The response table is polled every 5 ms, starting with a 5 ms sleep.

    :param request_id: id returned when the request was sent
    :param blocking: when falsy nothing is read and ``None`` is returned
    :param timeout: seconds to wait before giving up
    :param log_enable: accepted but not used by this function
    :return: the stored response, or ``None`` when ``blocking`` is falsy
    :raises Exception: when no response shows up within ``timeout`` seconds
    """
    if not blocking:
        return None
    not_ready = object()
    began_at = time.time()
    while True:
        time.sleep(0.005)
        response = __request_response_dict.pop(request_id, not_ready)
        if response is not not_ready:
            # The response has arrived
            return response
        if time.time() - began_at > timeout:
            # Only a timeout ends the wait without a response
            raise Exception(f"读取内容超时: request_id={request_id}")
def set_response(request_id, response):
    """
    Store a response so the waiting requester can pick it up.

    :param request_id: id of the request being answered; when falsy the
        response is dropped
    :param response: payload to hand to the requester
    :return: None
    """
    if not request_id:
        # Unsolicited message: nobody is waiting for it
        return
    # Answer to a request issued by this module
    __request_response_dict[request_id] = response
def request(type_, data):
    """
    Send a request on the query queue and block until its response arrives.

    :param type_: request type
    :param data: request payload
    :return: the response stored for this request
    :raises Exception: when no response arrives within the default timeout
    """
    pending_id = __request(type_, data)
    return __read_response(pending_id, blocking=True)
# Send a "test" probe and wait for its reply
def __test_trade_channel(sid):
    """
    Send a ``"test"`` request carrying ``sid`` and block until it is answered.

    :param sid: value placed in the probe payload
    :return: the response stored for the probe request
    """
    probe = {"type": "test", "data": {"sid": sid}}
    probe_id = __request("test", probe, log_enable=False)
    return __read_response(probe_id, True, log_enable=False)
def parseResponse(data_str):
    """
    Extract the business payload from a trade response.

    The response is expected to look like
    ``{"data": {"code": 0, "data": ..., "msg": ...}}``. It may be passed as an
    already decoded object or as JSON text (``str``, ``bytes`` or ``bytearray``).

    :param data_str: the response, decoded or as JSON text
    :return: the inner ``data`` value when ``code`` is 0
    :raises Exception: when the response is empty, or with the response's
        ``msg`` when ``code`` is not 0
    """
    if not data_str:
        raise Exception("反馈内容为空")
    res = data_str
    # isinstance also covers str subclasses; json.loads accepts bytes as well
    if isinstance(res, (str, bytes, bytearray)):
        res = json.loads(res)
    res = res['data']
    if res['code'] != 0:
        raise Exception(res['msg'])
    return res['data']
if __name__ == "__main__":
    pass
traderapi.py
New file
Diff too large
utils/hosting_api_util.py
@@ -388,6 +388,20 @@
    return common_request_for_cb({"ctype": "get_account_money"}, blocking)
def refresh_trade_data_for_cb(type_, blocking=True):
    """
    Ask a convertible-bond trade client to refresh one kind of trade data.

    :param type_: sent to the client as the ``ctype`` field
    :param blocking: forwarded to ``__read_response``
    :return: whatever ``__read_response`` returns for this request
    """
    sinfo = f"cb_{API_TYPE_REFRESH_TRADE_DATA}_{round(time.time() * 1000)}"
    payload = {"type": API_TYPE_REFRESH_TRADE_DATA, "ctype": type_, "sinfo": sinfo}
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE_CB, payload)
    return __read_response(client, request_id, blocking)
def common_request_for_cb(params, blocking=True):
    """
    通用请求
utils/kp_client_msg_manager.py
@@ -6,7 +6,7 @@
import threading
import time
from log import logger_kp_msg
from log_module.log import logger_kp_msg
from utils import log_export
CLIENT_IDS = ["zjb", "hxh"]