Administrator
2024-05-16 27e49e5782e07566aac42d6363bd5233bf5e396d
可转债仿真交易/print方法替换
29个文件已修改
487 ■■■■ 已修改文件
cb_main.py 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_atrribute/history_k_data_util.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/mysql_data.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/cb/l2_client_for_cb.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/cb/test_trade.py 55 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/cb/trade_client_for_cb.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/cb/trade_manager.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/client_network.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/huaxin_trade_api.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client_for_output.py 36 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client_for_trade.py 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_subscript_codes_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_market_client.py 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/l2_huaxin_util.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_source_util.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/async_log_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_export.py 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin_trade_data_update.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/sell_rule_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_strategy.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/outside_api_command_manager.py 179 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/socket_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/tool.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cb_main.py
@@ -2,10 +2,47 @@
可转债入口函数
"""
import constant
from log_module.log import logger_debug
constant.LOG_DIR = "logs_cb"
from huaxin_client.cb import l2_client_for_cb
from utils import middle_api_protocol
from utils import middle_api_protocol, outside_api_command_manager
def command_callback(client_id, request_id, data):
    """
    命令回调
    :param client_id:
    :param request_id:
    :param data: json格式数据
    :return:
    """
    type_ = data.get('type')
    if type_ == outside_api_command_manager.API_TYPE_TRADE:
        # 交易
        pass
    elif type_ == "get_code_position_info":
        # 查询此仓
        pass
    elif type_ == "get_code_position_info":
        # 查询此仓
        pass
    elif type_ == outside_api_command_manager.API_TYPE_COMMON_REQUEST:
        # 常规接口
        ctype = data['ctype']
        if ctype == 'get_account_money':
            # 获取账户资金
            pass
    logger_debug.info(f"接收到命令:{request_id} - f{client_id} - {data}")
if __name__ == '__main__':
    middle_api_protocol.SERVER_PORT = 10008
    middle_api_protocol.SERVER_HOST = "43.138.167.68"
    # middle_api_protocol.SERVER_HOST = "192.168.3.122"
    manager = outside_api_command_manager.NewApiCommandManager()
    manager.init(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT,
                 command_callback, [("trade_cb", 20)])
    manager.run(blocking=False)
    l2_client_for_cb.run()
code_atrribute/history_k_data_util.py
@@ -10,7 +10,7 @@
import constant
from db.redis_manager_delegate import RedisUtils
from log_module.log import logger_request_api
from log_module.log import logger_request_api, printlog
from utils import tool, middle_api_protocol
from db import redis_manager_delegate as redis_manager
@@ -317,4 +317,4 @@
if __name__ == "__main__":
    results = HistoryKDatasUtils.get_codes_limit_rate(list({"000422", "600610"}))
    print(results)
    printlog(results)
db/mysql_data.py
@@ -5,6 +5,7 @@
# 把连接参数定义成字典
import constant
from log_module.log import printlog
config = constant.MYSQL_CONFIG
@@ -53,7 +54,7 @@
            # 提交
            self.conn.commit()
        except Exception as e:
            print("提交出错\n:", e)
            printlog("提交出错\n:", e)
            logging.exception(e)
            # 如果出错要回滚
            self.conn.rollback()
@@ -66,7 +67,7 @@
            self.conn.commit()
        except Exception as e:
            logging.exception(e)
            print("提交出错\n:", e)
            printlog("提交出错\n:", e)
            # 如果出错要回滚
            self.conn.rollback()
huaxin_client/cb/l2_client_for_cb.py
@@ -20,7 +20,7 @@
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, \
    logger_local_huaxin_l2_transaction, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_error
    logger_local_huaxin_l2_transaction, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_error, printlog
from utils import tool
###B类###
@@ -111,7 +111,7 @@
            time.sleep(delay)
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        printlog("add del codes", add_codes, del_codes)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        if add_codes:
@@ -233,7 +233,7 @@
def __init_l2(codes):
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    printlog(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
    # case 2: 组播方式
@@ -316,7 +316,7 @@
            cb_count_dict[r['underlying_symbol']] = []
        cb_count_dict[r['underlying_symbol']].append(r['symbol'])
        if len(cb_count_dict[r['underlying_symbol']]) > 1:
            print(r['underlying_symbol'], cb_count_dict[r['underlying_symbol']])
            printlog(r['underlying_symbol'], cb_count_dict[r['underlying_symbol']])
            exclude_codes.extend(cb_count_dict[r['underlying_symbol']])
    ffresults = []
    for x in fresults:
huaxin_client/cb/test_trade.py
@@ -2,6 +2,7 @@
# -*- coding: UTF-8 -*-
import traderapi
from log_module.log import printlog
''' 注意: 如果提示找不到_tradeapi 且与已发布的库文件不一致时,可自行重命名为_tradeapi.so (windows下为_tradeapi.pyd)'''
@@ -36,25 +37,25 @@
        self.__session_id = 0
    def OnFrontConnected(self) -> "void":
        print('OnFrontConnected')
        printlog('OnFrontConnected')
        # 获取终端信息
        self.__req_id += 1
        ret = self.__api.ReqGetConnectionInfo(self.__req_id)
        if ret != 0:
            print('ReqGetConnectionInfo fail, ret[%d]' % ret)
            printlog('ReqGetConnectionInfo fail, ret[%d]' % ret)
    def OnFrontDisconnected(self, nReason: "int") -> "void":
        print('OnFrontDisconnected: [%d]' % nReason)
        printlog('OnFrontDisconnected: [%d]' % nReason)
    def OnRspGetConnectionInfo(self, pConnectionInfoField: "CTORATstpConnectionInfoField",
                               pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        if pRspInfoField.ErrorID == 0:
            print('inner_ip_address[%s]' % pConnectionInfoField.InnerIPAddress)
            print('inner_port[%d]' % pConnectionInfoField.InnerPort)
            print('outer_ip_address[%s]' % pConnectionInfoField.OuterIPAddress)
            print('outer_port[%d]' % pConnectionInfoField.OuterPort)
            print('mac_address[%s]' % pConnectionInfoField.MacAddress)
            printlog('inner_ip_address[%s]' % pConnectionInfoField.InnerIPAddress)
            printlog('inner_port[%d]' % pConnectionInfoField.InnerPort)
            printlog('outer_ip_address[%s]' % pConnectionInfoField.OuterIPAddress)
            printlog('outer_port[%d]' % pConnectionInfoField.OuterPort)
            printlog('mac_address[%s]' % pConnectionInfoField.MacAddress)
            # 请求登录
            login_req = traderapi.CTORATstpReqUserLoginField()
@@ -98,16 +99,16 @@
            self.__req_id += 1
            ret = self.__api.ReqUserLogin(login_req, self.__req_id)
            if ret != 0:
                print('ReqUserLogin fail, ret[%d]' % ret)
                printlog('ReqUserLogin fail, ret[%d]' % ret)
        else:
            print('GetConnectionInfo fail, [%d] [%d] [%s]!!!' % (
            printlog('GetConnectionInfo fail, [%d] [%d] [%s]!!!' % (
            nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspUserLogin(self, pRspUserLoginField: "traderapi.CTORATstpRspUserLoginField",
                       pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        if pRspInfoField.ErrorID == 0:
            print('Login success! [%d]' % nRequestID)
            printlog('Login success! [%d]' % nRequestID)
            self.__front_id = pRspUserLoginField.FrontID
            self.__session_id = pRspUserLoginField.SessionID
@@ -122,7 +123,7 @@
                self.__req_id += 1
                ret = self.__api.ReqUserPasswordUpdate(req_field, self.__req_id)
                if ret != 0:
                    print('ReqUserPasswordUpdate fail, ret[%d]' % ret)
                    printlog('ReqUserPasswordUpdate fail, ret[%d]' % ret)
                return
            if 0:
@@ -136,7 +137,7 @@
                self.__req_id += 1
                ret = self.__api.ReqQrySecurity(req_field, self.__req_id)
                if ret != 0:
                    print('ReqQrySecurity fail, ret[%d]' % ret)
                    printlog('ReqQrySecurity fail, ret[%d]' % ret)
            if 1:
                # 查询投资者
@@ -148,7 +149,7 @@
                self.__req_id += 1
                ret = self.__api.ReqQryInvestor(req_field, self.__req_id)
                if ret != 0:
                    print('ReqQryInvestor fail, ret[%d]' % ret)
                    printlog('ReqQryInvestor fail, ret[%d]' % ret)
            if 1:
                # 查询股东账号
@@ -160,7 +161,7 @@
                self.__req_id += 1
                ret = self.__api.ReqQryShareholderAccount(req_field, self.__req_id)
                if ret != 0:
                    print('ReqQryShareholderAccount fail, ret[%d]' % ret)
                    printlog('ReqQryShareholderAccount fail, ret[%d]' % ret)
            if 1:
                # 查询资金账号
@@ -174,7 +175,7 @@
                self.__req_id += 1
                ret = self.__api.ReqQryTradingAccount(req_field, self.__req_id)
                if ret != 0:
                    print('ReqQryTradingAccount fail, ret[%d]' % ret)
                    printlog('ReqQryTradingAccount fail, ret[%d]' % ret)
            if 1:
                # 查询报单
@@ -191,7 +192,7 @@
                self.__req_id += 1
                ret = self.__api.ReqQryOrder(req_field, self.__req_id)
                if ret != 0:
                    print('ReqQryOrder fail, ret[%d]' % ret)
                    printlog('ReqQryOrder fail, ret[%d]' % ret)
            if 1:
                # 查询持仓
@@ -203,7 +204,7 @@
                self.__req_id += 1
                ret = self.__api.ReqQryPosition(req_field, self.__req_id)
                if ret != 0:
                    print('ReqQryPosition fail, ret[%d]' % ret)
                    printlog('ReqQryPosition fail, ret[%d]' % ret)
            if 0:
                # 请求报单
@@ -248,7 +249,7 @@
                self.__req_id += 1
                ret = self.__api.ReqOrderInsert(req_field, self.__req_id)
                if ret != 0:
                    print('ReqOrderInsert fail, ret[%d]' % ret)
                    printlog('ReqOrderInsert fail, ret[%d]' % ret)
            if 0:
                # 请求撤单
@@ -281,7 +282,7 @@
                self.__req_id += 1
                ret = self.__api.ReqOrderAction(req_field, self.__req_id)
                if ret != 0:
                    print('ReqOrderAction fail, ret[%d]' % ret)
                    printlog('ReqOrderAction fail, ret[%d]' % ret)
            if 0:
                # 查询集中交易资金
@@ -294,7 +295,7 @@
                self.__req_id += 1
                ret = self.__api.ReqInquiryJZFund(req_field, self.__req_id)
                if ret != 0:
                    print('ReqInquiryJZFund fail, ret[%d]' % ret)
                    printlog('ReqInquiryJZFund fail, ret[%d]' % ret)
            if 0:
                # 资金转移(包括资金调拨和银证转账)
@@ -336,7 +337,7 @@
                self.__req_id += 1
                ret = self.__api.ReqTransferFund(req_field, self.__req_id)
                if ret != 0:
                    print('ReqTransferFund fail, ret[%d]', ret)
                    printlog('ReqTransferFund fail, ret[%d]', ret)
            if 0:
                '''登出,目前登出成功连接会立即被柜台系统断开,终端不会收到OnRspUserLogout应答
@@ -348,24 +349,24 @@
                self.__req_id += 1
                ret = self.__api.ReqUserLogout(req_field, self.__req_id)
                if ret != 0:
                    print('ReqUserLogout fail, ret[%d]' % ret)
                    printlog('ReqUserLogout fail, ret[%d]' % ret)
        else:
            print('Login fail!!! [%d] [%d] [%s]'
            printlog('Login fail!!! [%d] [%d] [%s]'
                  % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
        return
    def OnRspUserPasswordUpdate(self, pUserPasswordUpdateField: "CTORATstpUserPasswordUpdateField",
                                pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        if pRspInfoField.ErrorID == 0:
            print('OnRspUserPasswordUpdate: OK! [%d]' % nRequestID)
            printlog('OnRspUserPasswordUpdate: OK! [%d]' % nRequestID)
        else:
            print('OnRspUserPasswordUpdate: Error! [%d] [%d] [%s]'
            printlog('OnRspUserPasswordUpdate: Error! [%d] [%d] [%s]'
                  % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspOrderInsert(self, pInputOrderField: "CTORATstpInputOrderField", pRspInfoField: "CTORATstpRspInfoField",
                         nRequestID: "int") -> "void":
        if pRspInfoField.ErrorID == 0:
            print('OnRspOrderInsert: OK! [%d]' % nRequestID)
            printlog('OnRspOrderInsert: OK! [%d]' % nRequestID)
        else:
            print('OnRspOrderInsert: Error! [%d] [%d] [%s]'
                  % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
huaxin_client/cb/trade_client_for_cb.py
@@ -489,7 +489,7 @@
        # TD_TCP_FrontAddress="tcp://210.14.72.16:9500" #24小时环境B套
        # api.RegisterFront(TD_TCP_FrontAddress)
        # 注册多个交易前置服务地址,用逗号隔开 形如: api.RegisterFront("tcp://10.0.1.101:6500,tcp://10.0.1.101:26500")
        # print("TD_TCP_FensAddress[sim or 24H]::%s\n" % TD_TCP_FrontAddress)
        # printlog("TD_TCP_FensAddress[sim or 24H]::%s\n" % TD_TCP_FrontAddress)
    else:  # 模拟环境,FENS名字服务器方式
        TD_TCP_FrontAddress = "tcp://210.14.72.21:4400"  # 仿真交易环境
@@ -497,7 +497,7 @@
        # TD_TCP_FrontAddress="tcp://210.14.72.16:9500" #24小时环境B套
        api.RegisterFront(TD_TCP_FrontAddress)
        # 注册多个交易前置服务地址,用逗号隔开 形如: api.RegisterFront("tcp://10.0.1.101:6500,tcp://10.0.1.101:26500")
        print("TD_TCP_FensAddress[sim or 24H]::%s\n" % TD_TCP_FrontAddress)
        printlog("TD_TCP_FensAddress[sim or 24H]::%s\n" % TD_TCP_FrontAddress)
    # 订阅私有流
    api.SubscribePrivateTopic(traderapi.TORA_TERT_QUICK)
huaxin_client/cb/trade_manager.py
@@ -11,7 +11,7 @@
from huaxin_client.client_network import SendResponseSkManager
from huaxin_client.command_manager import TradeActionCallback
from log_module import async_log_util
from log_module.log import logger_trade, logger_local_huaxin_trade_debug
from log_module.log import logger_trade, logger_local_huaxin_trade_debug, printlog
from utils import tool, socket_util
ENABLE_ORDER = True
@@ -309,7 +309,7 @@
        以下以上交所限价指令为例,其它指令参考开发指南相关说明填写OrderPriceType、TimeCondition和VolumeCondition三个字段:
        '''
        print('卖 price', price, price_type)
        printlog('卖 price', price, price_type)
        if price and price > 0:
            req_field.LimitPrice = price
        if price_type == 1:
@@ -533,7 +533,7 @@
                    if blocking:
                        req_rid_dict[sinfo] = (client_id, request_id, sk)
                    self.__tradeSimpleApi.sell(code, volume, price, price_type, sinfo, order_ref)
                    print("sell", req_rid_dict)
                    printlog("sell", req_rid_dict)
                except Exception as e:
                    logging.exception(e)
                    send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_ORDER, client_id,
@@ -576,7 +576,7 @@
    def OnDealList(self, client_id, request_id, sk):
        async_log_util.info(logger_local_huaxin_trade_debug, f"请求成交列表:client_id-{client_id} request_id-{request_id}")
        try:
            # print("开始请求成交列表")
            # printlog("开始请求成交列表")
            req_id = self.__tradeSimpleApi.list_traded_orders()
            req_rid_dict[req_id] = (client_id, request_id, sk)
        except Exception as e:
huaxin_client/client_network.py
@@ -44,7 +44,7 @@
        msg = cls.format_response(msg)
        sk.send(msg)
        result, header_str = socket_util.recv_data(sk)
        # print("响应", result)
        # printlog("响应", result)
        if result:
            result_json = json.loads(result)
            if result_json.get("code") == 0:
huaxin_client/huaxin_trade_api.py
@@ -10,6 +10,7 @@
import crypt
from huaxin_client import socket_util
from log_module.log import printlog
class ClientSocketManager:
@@ -126,11 +127,11 @@
        str_list.sort()
        str_list.append("%Yeshi2014@#.")
        root_data["sign"] = crypt.md5_encrypt("&".join(str_list))
        print("请求前对象", root_data)
        printlog("请求前对象", root_data)
        # 添加请求头
        client[1].sendall(socket_util.load_header(json.dumps(root_data).encode(encoding='utf-8')))
        result = client[1].recv(1024)
        print("请求发送成功", result.decode(encoding='utf-8'))
        printlog("请求发送成功", result.decode(encoding='utf-8'))
    except BrokenPipeError as e:
        ClientSocketManager.del_client(client[0])
        raise e
@@ -172,7 +173,7 @@
# price:价格(如果是卖时不传价格就按照5挡价卖)
# blocking是否阻塞进程
def order(direction, code, volume, price, price_type=2, blocking=True):
    print("客户端", ClientSocketManager.socket_client_dict)
    printlog("客户端", ClientSocketManager.socket_client_dict)
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                                   {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 1,
                                    "direction": direction,
@@ -240,4 +241,4 @@
if __name__ == "__main__":
    d = {"id": "123123"}
    print(d.pop("id"))
    printlog(d.pop("id"))
huaxin_client/l1_client_for_output.py
@@ -8,7 +8,7 @@
import xmdapi
from huaxin_client import l1_subscript_codes_manager
from log_module.log import logger_system, logger_local_huaxin_l1
from log_module.log import logger_system, logger_local_huaxin_l1, printlog
################B类##################
from utils import socket_util, tool
@@ -44,7 +44,7 @@
        self.__api = api
    def OnFrontConnected(self):
        print("OnFrontConnected")
        printlog("OnFrontConnected")
        # 请求登录,目前未校验登录用户,请求域置空即可
        login_req = xmdapi.CTORATstpReqUserLoginField()
@@ -52,24 +52,24 @@
    def subscribe_codes(self, codes_sh, codes_sz):
        # 重新订阅代码
        print(f"订阅数量:sh-{len(codes_sh)}  sz-{len(codes_sz)}")
        printlog(f"订阅数量:sh-{len(codes_sh)}  sz-{len(codes_sz)}")
        if codes_sh:
            ret = self.__api.SubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
            if ret != 0:
                print('SubscribeMarketData fail, ret[%d]' % ret)
                printlog('SubscribeMarketData fail, ret[%d]' % ret)
            else:
                print('SubscribeMarketData success, ret[%d]' % ret)
                printlog('SubscribeMarketData success, ret[%d]' % ret)
        if codes_sz:
            ret = self.__api.SubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)
            if ret != 0:
                print('SubscribeMarketData fail, ret[%d]' % ret)
                printlog('SubscribeMarketData fail, ret[%d]' % ret)
            else:
                print('SubscribeMarketData success, ret[%d]' % ret)
                printlog('SubscribeMarketData success, ret[%d]' % ret)
    def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID):
        if pRspInfoField.ErrorID == 0:
            print('Login success! [%d]' % nRequestID)
            printlog('Login success! [%d]' % nRequestID)
            '''
            订阅行情
@@ -82,27 +82,27 @@
            # sub_arr = [b'600004']
            # ret = self.__api.UnSubscribeMarketData(sub_arr, xmdapi.TORA_TSTP_EXD_SSE)
            # if ret != 0:
            #     print('UnSubscribeMarketData fail, ret[%d]' % ret)
            #     printlog('UnSubscribeMarketData fail, ret[%d]' % ret)
            # else:
            #     print('SubscribeMarketData success, ret[%d]' % ret)
            #     printlog('SubscribeMarketData success, ret[%d]' % ret)
        else:
            print('Login fail!!! [%d] [%d] [%s]'
            printlog('Login fail!!! [%d] [%d] [%s]'
                  % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        if pRspInfoField.ErrorID == 0:
            print('OnRspSubMarketData: OK!')
            printlog('OnRspSubMarketData: OK!')
        else:
            print('OnRspSubMarketData: Error! [%d] [%s]'
            printlog('OnRspSubMarketData: Error! [%d] [%s]'
                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        if pRspInfoField.ErrorID == 0:
            print('OnRspUnSubMarketData: OK!')
            printlog('OnRspUnSubMarketData: OK!')
        else:
            print('OnRspUnSubMarketData: Error! [%d] [%s]'
            printlog('OnRspUnSubMarketData: Error! [%d] [%s]'
                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRtnMarketData(self, pMarketDataField):
@@ -130,7 +130,7 @@
def __upload_codes_info(datas):
    print("上传数据数量", len(datas))
    printlog("上传数据数量", len(datas))
    # if not tool.is_trade_time():
    #     return
    data_bytes = socket_util.load_header(json.dumps({"type": "l1_data", "data": datas}).encode("utf-8"))
@@ -166,7 +166,7 @@
def test_add_datas():
    while True:
        print("发送测试数据")
        printlog("发送测试数据")
        level1_data_queue.put_nowait(("000948", 12.91, 14.20, int(34.60 * 10000), int(4.9 * 1e8),
                                      [(12.91, 100), (12.90, 100), (12.89, 100), (12.88, 100), (12.87, 100)],
                                      [(12.91, 100), (12.90, 100), (12.89, 100), (12.88, 100), (12.87, 100)],
@@ -188,7 +188,7 @@
            time.sleep(4)
    logger_system.info(f"获取L1订阅目标票数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
    # 打印接口版本号
    print(xmdapi.CTORATstpXMdApi_GetApiVersion())
    printlog(xmdapi.CTORATstpXMdApi_GetApiVersion())
    # 创建接口对象
    api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_MCAST)
huaxin_client/l1_client_for_trade.py
@@ -7,7 +7,7 @@
import xmdapi
from huaxin_client import constant
from log_module.log import logger_system, logger_local_huaxin_l1, logger_local_huaxin_l1_trade_info
from log_module.log import logger_system, logger_local_huaxin_l1, logger_local_huaxin_l1_trade_info, printlog
################B类##################
from utils import socket_util
@@ -40,7 +40,7 @@
        self.__api = api
    def OnFrontConnected(self):
        print("OnFrontConnected")
        printlog("OnFrontConnected")
        # 请求登录,目前未校验登录用户,请求域置空即可
        login_req = xmdapi.CTORATstpReqUserLoginField()
@@ -98,9 +98,9 @@
            # sub_arr = [b'600004']
            # ret = self.__api.UnSubscribeMarketData(sub_arr, xmdapi.TORA_TSTP_EXD_SSE)
            # if ret != 0:
            #     print('UnSubscribeMarketData fail, ret[%d]' % ret)
            #     printlog('UnSubscribeMarketData fail, ret[%d]' % ret)
            # else:
            #     print('SubscribeMarketData success, ret[%d]' % ret)
            #     printlog('SubscribeMarketData success, ret[%d]' % ret)
        else:
@@ -109,16 +109,16 @@
    def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        if pRspInfoField.ErrorID == 0:
            print('OnRspSubMarketData: OK!')
            printlog('OnRspSubMarketData: OK!')
        else:
            logger_local_huaxin_l1.info('OnRspSubMarketData: Error! [%d] [%s]'
                                        % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        if pRspInfoField.ErrorID == 0:
            print('OnRspUnSubMarketData: OK!')
            printlog('OnRspUnSubMarketData: OK!')
        else:
            print('OnRspUnSubMarketData: Error! [%d] [%s]'
            printlog('OnRspUnSubMarketData: Error! [%d] [%s]'
                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRtnMarketData(self, pMarketDataField):
@@ -134,7 +134,7 @@
        self.l1_data_queue.append(item)
        logger_local_huaxin_l1_trade_info.info(f"获取到L1数据:{item}")
        # print(
        # printlog(
        #     "SecurityID[%s] SecurityName[%s] LastPrice[%.2f] Volume[%d] Turnover[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d] UpperLimitPrice[%.2f] LowerLimitPrice[%.2f]"
        #     % (pMarketDataField.SecurityID, pMarketDataField.SecurityName, pMarketDataField.LastPrice,
        #        pMarketDataField.Volume,
@@ -172,7 +172,7 @@
def run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w):
    logger_local_huaxin_l1.info("运行l1_for_trade订阅服务")
    # 打印接口版本号
    print(xmdapi.CTORATstpXMdApi_GetApiVersion())
    printlog(xmdapi.CTORATstpXMdApi_GetApiVersion())
    # 创建接口对象
    api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_MCAST)
@@ -238,7 +238,7 @@
    def read_data():
        while True:
            val = queue_l1_trade_w_strategy_r.get()
            print(val)
            printlog(val)
    queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w = multiprocessing.Queue(), multiprocessing.Queue()
    threading.Thread(target=test_sub, daemon=True).start()
huaxin_client/l1_subscript_codes_manager.py
@@ -9,6 +9,7 @@
# 请求l1订阅的目标代码
from log_module.log import printlog
from utils import socket_util
@@ -35,7 +36,7 @@
                        codes_sz.append(code.encode("utf-8"))
                    else:
                        codes_sh.append(code.encode("utf-8"))
                print("获取订阅目标代数量:", len(codes_sh), len(codes_sz))
                printlog("获取订阅目标代数量:", len(codes_sh), len(codes_sz))
                return codes_sh, codes_sz
        except ConnectionResetError:
            SendResponseSkManager.del_send_response_sk(type_)
huaxin_client/l2_client.py
@@ -20,7 +20,7 @@
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \
    logger_local_huaxin_g_cancel, logger_l2_codes_subscript
    logger_local_huaxin_g_cancel, logger_l2_codes_subscript, printlog
from utils import tool
###B类###
@@ -118,7 +118,7 @@
            time.sleep(delay)
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        printlog("add del codes", add_codes, del_codes)
        try:
            for c in del_codes:
                self.l2_data_upload_manager.release_distributed_upload_queue(c)
@@ -165,7 +165,7 @@
        return []
    def OnFrontConnected(self):
        print("OnFrontConnected")
        printlog("OnFrontConnected")
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
@@ -175,10 +175,10 @@
        self.__api.ReqUserLogin(login_req, 2)
    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
        printlog("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            printlog("----L2行情登录成功----")
            self.is_login = True
            logger_system.info(f"L2行情登录成功")
            # 初始设置值
@@ -187,26 +187,26 @@
                daemon=True).start()
    def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubOrderDetail", pRspInfo)
        printlog("OnRspSubOrderDetail", pRspInfo)
        # try:
        print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
        printlog("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
              pRspInfo["ErrorMsg"])
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            printlog("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            printlog("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
    def OnRspUnSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspUnSubOrderDetail", bIsLast)
        printlog("OnRspUnSubOrderDetail", bIsLast)
        try:
            code = pSpecificSecurity['SecurityID']
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
                printlog("取消订阅响应结束", self.subscripted_codes)
                l2_data_manager.add_subscript_codes(self.subscripted_codes)
        except Exception as e:
            logging.exception(e)
@@ -264,7 +264,7 @@
def __init_l2(l2_data_upload_manager):
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    printlog(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
    # case 2: 组播方式
huaxin_client/l2_market_client.py
@@ -21,7 +21,7 @@
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \
    logger_local_huaxin_g_cancel, logger_l2_codes_subscript, logger_local_huaxin_l2_market
    logger_local_huaxin_g_cancel, logger_l2_codes_subscript, logger_local_huaxin_l2_market, printlog
from utils import tool
###B类###
@@ -90,7 +90,7 @@
            raise Exception("L2尚未登录")
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        printlog("add del codes", add_codes, del_codes)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        # 设置最近的代码列表
@@ -98,7 +98,7 @@
    # 订阅代码,[代码,...]
    def set_codes_data(self, codes):
        print("订阅代码数量:", len(codes))
        printlog("订阅代码数量:", len(codes))
        try:
            self.__process_codes_data(codes)
        except Exception as e:
@@ -125,7 +125,7 @@
        return []
    def OnFrontConnected(self):
        print("OnFrontConnected")
        printlog("OnFrontConnected")
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
@@ -135,10 +135,10 @@
        self.__api.ReqUserLogin(login_req, 2)
    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
        printlog("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            printlog("----L2行情登录成功----")
            self.is_login = True
            logger_system.info(f"L2行情登录成功")
            # 初始设置值
@@ -195,7 +195,7 @@
def __init_l2(l2_data_upload_manager):
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    printlog(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
    # case 2: 组播方式
huaxin_client/trade_client.py
@@ -14,7 +14,7 @@
# 正式账号
from log_module import async_log_util
from log_module.log import logger_local_huaxin_trade_debug as logger, logger_system, logger_trade, \
    logger_local_huaxin_trade_debug
    logger_local_huaxin_trade_debug, printlog
from utils import tool, socket_util, middle_api_protocol
########B类########
@@ -275,7 +275,7 @@
        以下以上交所限价指令为例,其它指令参考开发指南相关说明填写OrderPriceType、TimeCondition和VolumeCondition三个字段:
        '''
        print('卖 price', price, price_type)
        printlog('卖 price', price, price_type)
        if price and price > 0:
            req_field.LimitPrice = price
        if price_type == 1:
@@ -905,7 +905,7 @@
                    if blocking:
                        req_rid_dict[sinfo] = (client_id, request_id, sk)
                    self.__tradeSimpleApi.sell(code, volume, price, price_type, sinfo, order_ref)
                    print("sell", req_rid_dict)
                    printlog("sell", req_rid_dict)
                except Exception as e:
                    logging.exception(e)
                    send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_ORDER, client_id,
@@ -948,7 +948,7 @@
    def OnDealList(self, client_id, request_id, sk):
        async_log_util.info(logger_local_huaxin_trade_debug, f"请求成交列表:client_id-{client_id} request_id-{request_id}")
        try:
            # print("开始请求成交列表")
            # printlog("开始请求成交列表")
            req_id = self.__tradeSimpleApi.list_traded_orders()
            req_rid_dict[req_id] = (client_id, request_id, sk)
        except Exception as e:
@@ -1036,7 +1036,7 @@
        api.RegisterFensUserInfo(fens_user_info_field)
        api.RegisterNameServer(TD_TCP_FensAddress)
        # 注册名字服务器地址,支持多服务地址逗号隔开 形如:api.RegisterNameServer('tcp://10.0.1.101:52370,tcp://10.0.1.101:62370')
        print("TD_TCP_FensAddress[%s]::%s\n" % (fens_user_info_field.FensNodeID, TD_TCP_FensAddress))
        printlog("TD_TCP_FensAddress[%s]::%s\n" % (fens_user_info_field.FensNodeID, TD_TCP_FensAddress))
    # 订阅私有流
    api.SubscribePrivateTopic(traderapi.TORA_TERT_QUICK)
l2/huaxin/l2_huaxin_util.py
@@ -7,6 +7,7 @@
# item逐笔委托
# (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'],
# data['OrderTime'],data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'])
from log_module.log import printlog
def convert_time(time_str, with_ms=False):
@@ -113,4 +114,4 @@
          ]
    for d in ds:
        d = eval(d)
        print(__convert_order(d, 15.55))
        printlog(__convert_order(d, 15.55))
l2/l2_data_source_util.py
@@ -2,7 +2,7 @@
L2数据溯源
"""
import constant
from log_module.log import logger_l2_error
from log_module.log import logger_l2_error, printlog
from utils import tool
@@ -129,7 +129,7 @@
            try:
                cancel_datas.sort(key=lambda t: t["index"])
            except Exception as e:
                print("测试")
                printlog("测试")
            for item in cancel_datas:
                # 提前做计算
                cls.__get_buy_index_with_cancel_data(code, item, local_today_num_operate_map)
@@ -216,4 +216,4 @@
#     cancel_index = 900
#     index = L2DataSourceUtils.get_buy_index_with_cancel_data(code, total_datas[cancel_index],
#                                                              l2_data_util.local_today_num_operate_map.get(code))
#     print("溯源位置:", index)
#     printlog("溯源位置:", index)
l2/l2_data_util.py
@@ -16,6 +16,7 @@
from l2 import l2_data_source_util
from log_module import log, async_log_util, log_export
from db import redis_manager_delegate as redis_manager
from log_module.log import printlog
from utils import tool
__db = 1
@@ -556,4 +557,4 @@
if __name__ == "__main__":
    print(L2DataUtil.get_time_with_ms({"time": "10:00:00", "tms": 490}))
    printlog(L2DataUtil.get_time_with_ms({"time": "10:00:00", "tms": 490}))
l2/l2_transaction_data_manager.py
@@ -9,7 +9,7 @@
from l2.huaxin import l2_huaxin_util
from log_module import async_log_util
from log_module.log import hx_logger_l2_transaction_desc, hx_logger_l2_transaction_sell_order
from log_module.log import hx_logger_l2_transaction_desc, hx_logger_l2_transaction_sell_order, printlog
from utils import tool
@@ -153,7 +153,7 @@
        big_sell_orders = []
        temp_sell_order_ids = set()
        # 统计已经结算出的大单
        print(f"总大单数量:{len(total_big_sell_datas)}")
        printlog(f"总大单数量:{len(total_big_sell_datas)}")
        for i in range(len(total_big_sell_datas) - 1, -1, -1):
            bd = total_big_sell_datas[i]
            if bd[0] != latest_sell_order_info[0]:
log_module/async_log_util.py
@@ -38,7 +38,7 @@
    # 运行同步日志
    def run_sync(self, add_to_common_log=False):
        print("run_sync", add_to_common_log)
        printlog("run_sync", add_to_common_log)
        logger_system.info(f"run_sync 线程ID:{tool.get_thread_id()}")
        while True:
            # val = self.__log_queue.get()
log_module/log.py
@@ -128,8 +128,8 @@
                   filter=lambda record: record["extra"].get("name") == "code_operate",
                   rotation="00:00", compression="zip", enqueue=True)
        # 显示在控制台
        # logger.add(sys.stdout,
        #            filter=lambda record: record["extra"].get("name") == "code_operate", enqueue=True)
        logger.add(sys.stdout,
                   filter=lambda record: record["extra"].get("name") == "print", enqueue=True)
        logger.add(self.get_path("device", "device"), filter=lambda record: record["extra"].get("name") == "device",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -327,6 +327,8 @@
__mylogger = MyLogger()
logger_print = __mylogger.get_logger("print")
logger_info =  __mylogger.get_logger("info")
logger_trade_gui = __mylogger.get_logger("trade_gui")
@@ -440,6 +442,10 @@
        os.open('/dev/null', os.O_WRONLY)
def printlog(*args):
    logger_print.info(args)
if __name__ == "__main__":
    open_limit_up_codes = set({"000333", "000222"})
    logger_kpl_open_limit_up.info(f"炸板代码:{open_limit_up_codes}")
log_module/log_export.py
@@ -5,6 +5,7 @@
import time
import constant
from log_module.log import printlog
from utils import tool
@@ -49,7 +50,7 @@
        while line:
            time_ = line.split(":")[-1]
            if int(time_) > 150:
                print(line)
                printlog(line)
            line = f.readline()
@@ -96,7 +97,7 @@
        for key in today_data:
            # news = sorted(today_data[key], key=lambda x: x["index"])
            # today_data[key] = news
            print(key, len(today_data[key]) - 1, today_data[key][-1]["index"])
            printlog(key, len(today_data[key]) - 1, today_data[key][-1]["index"])
    except:
        pass
    return today_data
@@ -151,7 +152,7 @@
                break
            if line.find("code={}".format(code)) < 0:
                continue
            # print(line)
            # printlog(line)
            time_ = __get_log_time(line)
            if int("093000") > int(time_.replace(":", "")) or int(time_.replace(":", "")) > int("150000"):
                continue
@@ -159,18 +160,18 @@
            if line.find("获取到买入信号起始点") > 0:
                str_ = line.split("获取到买入信号起始点:")[1].strip()
                index = str_[0:str_.find(" ")].strip()
                # print("信号起始位置:", index)
                # printlog("信号起始位置:", index)
                pos_list.append((0, int(index), ""))
            elif line.find("获取到买入执行位置") > 0:
                str_ = line.split("获取到买入执行位置:")[1].strip()
                index = str_[0:str_.find(" ")].strip()
                # print("买入执行位置:", index)
                # printlog("买入执行位置:", index)
                pos_list.append((1, int(index), ""))
            elif line.find("触发撤单,撤单位置:") > 0:
                str_ = line.split("触发撤单,撤单位置:")[1].strip()
                index = str_[0:str_.find(" ")].strip()
                # print("撤单位置:", index)
                # printlog("撤单位置:", index)
                pos_list.append((2, int(index), line.split("撤单原因:")[1]))
                pass
            else:
test/test.py
@@ -3,7 +3,7 @@
import time
from huaxin_client.cb import trade_client_for_cb
from log_module.log import logger_info
from log_module.log import logger_info, printlog
from trade import huaxin_trade_api
if __name__ == "__main__":
@@ -15,10 +15,10 @@
            args=(q_s_w_t_r, q_s_r_t_w))
        tradeProcess.start()
        time.sleep(3)
        print("获取委托结果", huaxin_trade_api.get_delegate_list())
        printlog("获取委托结果", huaxin_trade_api.get_delegate_list())
        time.sleep(2)
        print("获取持仓结果", huaxin_trade_api.get_position_list(True))
        printlog("获取持仓结果", huaxin_trade_api.get_position_list(True))
trade/huaxin_trade_data_update.py
@@ -9,7 +9,7 @@
from code_atrribute import gpcode_manager
from code_atrribute.history_k_data_util import HistoryKDatasUtils
from log_module import async_log_util
from log_module.log import hx_logger_trade_debug, logger_system
from log_module.log import hx_logger_trade_debug, logger_system, printlog
from trade import huaxin_trade_api, huaxin_trade_record_manager
import concurrent.futures
@@ -107,4 +107,4 @@
if __name__ == "__main__":
    print(HistoryKDatasUtils.get_gp_latest_info(["000333"]))
    printlog(HistoryKDatasUtils.get_gp_latest_info(["000333"]))
trade/sell_rule_manager.py
@@ -7,6 +7,7 @@
from db import mysql_data_delegate as mysql_data
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from log_module.log import printlog
from utils import tool
import concurrent.futures
@@ -211,4 +212,4 @@
if __name__ == "__main__":
    print( TradeRuleManager().list_rules("2023-12-01"))
    printlog( TradeRuleManager().list_rules("2023-12-01"))
trade/trade_strategy.py
@@ -18,7 +18,7 @@
    local_today_canceled_buyno_map, L2DataUtil
from log_module import async_log_util
from log_module.log import logger_trade, logger_debug, logger_system, logger_local_huaxin_l1_trade_info, \
    logger_trade_position_api_request, logger_l2_error, hx_logger_l2_transaction
    logger_trade_position_api_request, logger_l2_error, hx_logger_l2_transaction, printlog
from trade import huaxin_trade_data_update, huaxin_sell_util, huaxin_trade_api
from trade.huaxin_trade_record_manager import PositionManager
from trade.sell_rule_manager import TradeRuleManager, SellRule
@@ -50,7 +50,7 @@
        for i in range(3):
            try:
                self.__send_response(data_bytes)
                print("发送数据成功")
                printlog("发送数据成功")
                break
            except Exception as e1:
                logging.exception(e1)
@@ -412,7 +412,7 @@
                if val:
                    async_log_util.info(logger_local_huaxin_l1_trade_info, f"客户端接收:{val}")
                    val = json.loads(val)
                    print("收到来自L1的数据:", val["type"])
                    printlog("收到来自L1的数据:", val["type"])
                    # 处理数据
                    type_ = val["type"]
                    if type_ == "upload_l1_trade_datas":
utils/outside_api_command_manager.py
@@ -10,7 +10,7 @@
# 心跳信息
from huaxin_client.client_network import SendResponseSkManager
from log_module.log import logger_system, logger_request_api
from log_module.log import logger_system, logger_request_api, printlog
from utils import middle_api_protocol, tool, socket_util
MSG_TYPE_HEART = "heart"
@@ -97,7 +97,7 @@
        # 发送心跳
        cls.__heartbeats_thread(type, key, sk)
        cls.__listen_command_thread(type, key, sk)
        print("create_and_run_client success", type, key)
        printlog("create_and_run_client success", type, key)
        return key, sk
    @classmethod
@@ -119,7 +119,7 @@
                if result:
                    start_time = time.time()
                    try:
                        print("接收数据", _type, result)
                        printlog("接收数据", _type, result)
                        result_json = json.loads(result)
                        if result_json["type"] == MSG_TYPE_HEART:
                            # 返回内容
@@ -128,10 +128,10 @@
                        data = result_json["data"]
                        content_type = data["type"]
                        print("接收内容", data)
                        printlog("接收内容", data)
                        request_id = result_json.get('request_id')
                        if not socket_util.is_client_params_sign_right(result_json):
                            print("签名错误")
                            printlog("签名错误")
                            # 签名出错
                            SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                                      {"code": -1, "msg": "签名错误"})
@@ -176,7 +176,7 @@
        while True:
            try:
                sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')))
                # print("心跳信息发送成功", client_id)
                # printlog("心跳信息发送成功", client_id)
            except Exception as e:
                if _type == CLIENT_TYPE_TRADE_SELL:
                    if client_id in cls.trade_client_dict:
@@ -225,6 +225,173 @@
            t1.start()
class NewApiCommandManager:
    """
    新版交易指令管理
    """
    client_dict = {}  # 保存当前的客户端,格式:{client_type:{client_id:socket}}
    client_count_dict = {}  # 每种client的最大个数,格式:{client_type:count}
    action_callback = None
    _instance = None
    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super().__new__(cls, *args, **kwargs)
        return cls._instance
    @classmethod
    def __create_client(cls, client_type, rid):
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 生成socket,连接server
        # client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
        # client.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 60 * 1000, 30 * 1000))
        client.connect(cls.ip_port)
        client.send(SendResponseSkManager.format_response(
            json.dumps({"type": "register", "data": {"client_type": client_type}, "rid": rid}).encode("utf-8")))
        client.recv(1024)
        return client
    @classmethod
    def __create_and_run_client(cls, type, index=None):
        key = f"{type}_{round(time.time() * 1000)}_{random.randint(0, 1000)}"
        if index is not None:
            key += f"_{index}"
        sk = cls.__create_client(type, key)
        # 发送心跳
        cls.__heartbeats_thread(type, key, sk)
        cls.__listen_command_thread(type, key, sk)
        printlog("create_and_run_client success", type, key)
        return key, sk
    @classmethod
    def init(cls, addr, port, action_callback, clients_info):
        """
        初始化
        :param addr: 服务器地址
        :param port: 服务器端口
        :param trade_action_callback: 回调
        :param clients_info: 客户端信息:[(类型,数量)]
        :return:
        """
        cls.client_dict.clear()
        cls.client_count_dict.clear()
        cls.action_callback = action_callback
        cls.ip_port = (addr, port)
        # 初始化
        for client_info in clients_info:
            cls.client_dict[client_info[0]] = {}
            cls.client_count_dict[client_info[0]] = client_info[1]
        # 创建连接客户端
        for client_type in cls.client_count_dict:
            for i in range(cls.client_count_dict[client_type]):
                result = cls.__create_and_run_client(client_type, i)
                cls.client_dict[client_type][result[0]] = result[1]
    # 听取指令
    @classmethod
    def __listen_command(cls, _type, client_id, sk):
        while True:
            try:
                result = socket_util.recv_data(sk)[0]
                if result:
                    start_time = time.time()
                    try:
                        printlog("接收数据", _type, result)
                        result_json = json.loads(result)
                        if result_json["type"] == MSG_TYPE_HEART:
                            # 返回内容
                            sk.send(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))
                            continue
                        data = result_json["data"]
                        content_type = data["type"]
                        printlog("接收内容", data)
                        request_id = result_json.get('request_id')
                        if not socket_util.is_client_params_sign_right(result_json):
                            printlog("签名错误")
                            # 签名出错
                            SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                                      {"code": -1, "msg": "签名错误"})
                            continue
                        cls.action_callback(client_id, request_id, data)
                    except Exception as e:
                        logging.exception(e)
                    finally:
                        use_time = int(time.time() - start_time)
                        if use_time > 5:
                            result_json = json.loads(result)
                            logger_request_api.info(f"超时5s以上:{result_json['data']['type']}")
                        # 发送响应
                        sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8'))
                else:
                    raise Exception("接收的内容为空")
            except Exception as e:
                logging.exception(e)
                if _type in cls.client_dict:
                    if client_id in cls.client_dict[_type]:
                        cls.client_dict[_type].pop(client_id)
                try:
                    sk.close()
                except:
                    pass
                    # 结束当前的消息循环
                break
    @classmethod
    def __heart_beats(cls, _type, client_id, sk):
        while True:
            try:
                sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')))
                # printlog("心跳信息发送成功", client_id)
            except Exception as e:
                if _type in cls.client_dict:
                    if client_id in cls.client_dict[_type]:
                        cls.client_dict[_type].pop(client_id)
                try:
                    sk.close()
                except:
                    pass
                    # 结束当前的消息循环
                break
            time.sleep(HEART_SPACE_TIME)
    @classmethod
    def __listen_command_thread(cls, _type, rid, sk):
        t1 = threading.Thread(target=lambda: cls.__listen_command(_type, rid, sk))
        t1.setDaemon(True)
        t1.start()
    @classmethod
    def __heartbeats_thread(cls, _type, rid, sk):
        t1 = threading.Thread(target=lambda: cls.__heart_beats(_type, rid, sk))
        t1.setDaemon(True)
        t1.start()
    @classmethod
    def __maintain_client(cls):
        logger_system.info(f"outside_api __maintain_client 线程ID:{tool.get_thread_id()}")
        while True:
            try:
                for client_type in cls.client_count_dict:
                    if len(cls.client_dict[client_type]) < cls.client_count_dict[client_type]:
                        for i in range(cls.client_count_dict[client_type] - len(cls.client_dict[client_type])):
                            result = cls.__create_and_run_client(client_type)
                            cls.client_dict[client_type][result[0]] = result[1]
            except:
                pass
            time.sleep(1)
    # 维护连接数的稳定
    def run(self, blocking=True):
        # 维护client
        if blocking:
            self.__maintain_client()
        else:
            t1 = threading.Thread(target=lambda: self.__maintain_client())
            t1.setDaemon(True)
            t1.start()
if __name__ == "__main__":
    manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, None)
    manager.run()
utils/socket_util.py
@@ -75,7 +75,7 @@
    str_list.sort()
    str_list.append("%Yeshi2014@#.")
    new_sign = crypt_util.md5_encrypt("&".join(str_list))
    # print("加密前字符串","&".join(str_list))
    # printlog("加密前字符串","&".join(str_list))
    if sign == new_sign:
        return True
    else:
utils/tool.py
@@ -12,6 +12,7 @@
from threading import Thread
import constant
from log_module.log import printlog
def async_call(fn):
@@ -143,7 +144,7 @@
        def infunc(*args, **kwargs):
            start = round(time.time() * 1000)
            result = func(args, **kwargs)
            print("执行时间", round(time.time() * 1000) - start)
            printlog("执行时间", round(time.time() * 1000) - start)
            return result
        return infunc
@@ -280,7 +281,7 @@
if __name__ == "__main__":
    print(trade_time_sub("13:00:01",
    printlog(trade_time_sub("13:00:01",
                         "11:29:55"))