admin
2023-07-21 f51466f1d4563f97b1ec620b70a1c94f01a6a2e1
trade_client.py
@@ -2,43 +2,47 @@
import logging
import os
import socket
import threading
import time
import command_manager
import constant
import socket_util
import traderapi
from client_network import SendResponseSkManager
from log import logger
# Production (live) account.
# NOTE(review): these credentials are hard-coded in plain text; consider
# loading them from the environment or an untracked config file instead.
# NOTE(review): the same names are assigned again further down in this file
# with simulation values, so these appear to be overridden at import time —
# confirm which set is meant to be active.
UserID = '388000013349'
# Login password
Password = '110808'
# Investor account
InvestorID = '388000013349'
# Brokerage department code
DepartmentID = '0003'
# Fund account
AccountID = '388000013349'
# Shanghai Stock Exchange (SSE) shareholder account
SSE_ShareHolderID = 'A641420991'
# Shenzhen Stock Exchange (SZSE) shareholder account
SZSE_ShareHolderID = '0345104949'
# # 仿真
# UserID = '00043201'
# UserID = '388000013349'
# # 登陆密码
# Password = '45249973'
# Password = '110808'
# # 投资者账户
# InvestorID = '11160150'
# InvestorID = '388000013349'
# # 经济公司部门代码
# DepartmentID = '0003'
# # 资金账户
# AccountID = '00043201'
# AccountID = '388000013349'
# # 沪市股东账号
# SSE_ShareHolderID = 'A00043201'
# SSE_ShareHolderID = 'A641420991'
# # 深市股东账号
# SZSE_ShareHolderID = '700043201'
# SZSE_ShareHolderID = '0345104949'
# 仿真
from mylog import logger_trade_debug
# Simulation account (login user).
# NOTE(review): credentials are hard-coded in plain text; consider loading
# them from the environment or an untracked config file instead.
UserID = '00043201'
# Login password
Password = '45249973'
# Investor account
InvestorID = '11160150'
# Brokerage department code
DepartmentID = '0003'
# Fund account
AccountID = '00043201'
# Shanghai Stock Exchange (SSE) shareholder account
SSE_ShareHolderID = 'A00043201'
# Shenzhen Stock Exchange (SZSE) shareholder account
SZSE_ShareHolderID = '700043201'
# # 登录用户
# UserID = '00572083'
@@ -719,8 +723,8 @@
    __tradeSimpleApi = TradeSimpleApi()
    def OnTrade(self, client_id, request_id, type_, data):
        print("请求进程ID", os.getpid())
        if type_ == 1:
            logger_trade_debug.info(f"请求下单:client_id-{client_id} request_id-{request_id} data-{data}")
            # 下单
            # 1-买 2-卖
            direction = data["direction"]
@@ -749,6 +753,7 @@
                        json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"))
        elif type_ == 2:
            logger_trade_debug.info(f"请求撤单:client_id-{client_id} request_id-{request_id} data-{data}")
            # 撤单
            direction = data["direction"]
            code = data["code"]
@@ -773,6 +778,7 @@
                        json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"))
    def OnDealList(self, client_id, request_id):
        logger_trade_debug.info(f"请求成交列表:client_id-{client_id} request_id-{request_id}")
        try:
            print("开始请求成交列表")
            req_id = self.__tradeSimpleApi.list_traded_orders()
@@ -782,6 +788,7 @@
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
    def OnDelegateList(self, client_id, request_id, is_cancel):
        logger_trade_debug.info(f"请求委托列表:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.list_delegate_orders(is_cancel)
            req_rid_dict[req_id] = (client_id, request_id)
@@ -789,6 +796,7 @@
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
    def OnMoney(self, client_id, request_id):
        logger_trade_debug.info(f"请求账户:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.get_money_account()
            req_rid_dict[req_id] = (client_id, request_id)
@@ -796,6 +804,7 @@
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
    def OnPositionList(self, client_id, request_id):
        logger_trade_debug.info(f"请求持仓:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.list_positions()
            req_rid_dict[req_id] = (client_id, request_id)
@@ -822,13 +831,13 @@
    if 1:  # 模拟环境,TCP 直连Front方式
        # 注册单个交易前置服务地址
        api.RegisterFront("tcp://192.168.84.31:6500")  # 正式环境主地址
        api.RegisterFront("tcp://192.168.84.32:26500")  # 正式环境备用地址
        # api.RegisterFront("tcp://192.168.84.31:6500")  # 正式环境主地址
        # api.RegisterFront("tcp://192.168.84.32:26500")  # 正式环境备用地址
        # TD_TCP_FrontAddress = "tcp://210.14.72.21:4400"  # 仿真交易环境
        TD_TCP_FrontAddress = "tcp://210.14.72.21:4400"  # 仿真交易环境
        # TD_TCP_FrontAddress = "tcp://210.14.72.15:4400"  # 24小时环境A套
        # TD_TCP_FrontAddress="tcp://210.14.72.16:9500" #24小时环境B套
        # api.RegisterFront(TD_TCP_FrontAddress)
        api.RegisterFront(TD_TCP_FrontAddress)
        # 注册多个交易前置服务地址,用逗号隔开 形如: api.RegisterFront("tcp://10.0.1.101:6500,tcp://10.0.1.101:26500")
        # print("TD_TCP_FensAddress[sim or 24H]::%s\n" % TD_TCP_FrontAddress)
@@ -856,17 +865,33 @@
    api.Init()
def __send_response(type, data_bytes):
    """Send one payload on the response socket and check the peer's reply.

    The socket is looked up through
    ``SendResponseSkManager.get_send_response_sk(type)``. The payload is
    passed through ``socket_util.load_header`` before being sent
    (presumably this adds a framing header — confirm in socket_util).
    A reply is then read with ``socket_util.recv_data`` and parsed as JSON.

    Args:
        type: Key used to look up the response socket. The name shadows the
            builtin but is kept because it is part of the signature.
        data_bytes: Encoded payload to send.

    Raises:
        Exception: With the reply's ``msg`` value when the reply's ``code``
            field is not 0. Errors from the socket or from JSON parsing
            propagate unchanged.
    """
    channel = SendResponseSkManager.get_send_response_sk(type)
    channel.sendall(socket_util.load_header(data_bytes))
    # recv_data returns a (body, header) pair; only the body is needed here.
    reply_raw, _header = socket_util.recv_data(channel)
    reply = json.loads(reply_raw)
    if reply["code"] == 0:
        return
    raise Exception(reply['msg'])
# 交易反馈回调
def traderapi_callback(type, req_id, data):
    def send_response(data_str):
def __traderapi_callback(type, req_id, data):
    def send_response(data_str, _client_id, _request_id):
        for i in range(3):
        try:
            SendResponseSkManager.get_send_response_sk(type).sendall(data_str)
                __send_response(f"{type}#{_client_id}", data_str)
                print("发送数据成功")
                logger_trade_debug.info(f"第{i}次发送数据成功:type-{type},request_id-{_request_id}")
                break
        except ConnectionResetError as e:
            SendResponseSkManager.del_send_response_sk(type)
            SendResponseSkManager.get_send_response_sk(type).sendall(data_str)
        except BrokenPipeError as e:
            SendResponseSkManager.del_send_response_sk(type)
            SendResponseSkManager.get_send_response_sk(type).sendall(data_str)
            except Exception as e:
                logger_trade_debug.info(f"第{i}次发送数据失败:type-{type},request_id-{_request_id}")
                logger_trade_debug.exception(e)
                pass
    print("回调", type, req_id, data)
    print("进程ID", os.getpid())
@@ -876,20 +901,31 @@
    try:
        print("traderapi_callback", req_rid_dict)
        if req_rid_dict and key in req_rid_dict:
            print("API回调")
            client_id, request_id = req_rid_dict.pop(key)
            # 测试
            send_response(
                json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                            "request_id": request_id}).encode('utf-8'))
            print("结果发送完毕")
                            "request_id": request_id}).encode('utf-8'), client_id, request_id)
            print("API回调结束")
        else:
            print("非API回调")
            # 非API回调
            send_response(
                json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}).encode('utf-8'))
                json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}).encode('utf-8'),
                None,
                req_id)
            print("非API结束")
    except Exception as e:
        logging.exception(e)
# Callbacks are handled asynchronously
def traderapi_callback(type, req_id, data):
    """Run ``__traderapi_callback`` for this event on a new daemon thread.

    The arguments are forwarded unchanged. The call returns as soon as the
    thread has been started; it does not wait for the handler to finish.

    Args:
        type: Callback type, forwarded as-is (name shadows the builtin but
            is part of the signature).
        req_id: Request id, forwarded as-is.
        data: Callback payload, forwarded as-is.
    """
    def _run():
        __traderapi_callback(type, req_id, data)

    worker = threading.Thread(target=_run, daemon=True)
    worker.start()
addr, port = constant.SERVER_IP, constant.SERVER_PORT