admin
2023-08-04 ca310f014336d93eba73ed5010c1c5645424a1e0
trade_client.py
@@ -548,7 +548,7 @@
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRtnOrder(self, pOrderField: "CTORATstpOrderField") -> "void":
        logger.info(
        logger_trade_debug.info(
            '[%d] OnRtnOrder: SInfo[%s] InvestorID[%s] SecurityID[%s] OrderRef[%d] OrderLocalID[%s] LimitPrice[%.2f] VolumeTotalOriginal[%d] OrderSysID[%s] OrderStatus[%s]'
            % (round(time.time() * 1000), pOrderField.SInfo, pOrderField.InvestorID, pOrderField.SecurityID,
               pOrderField.OrderRef, pOrderField.OrderLocalID,
@@ -563,7 +563,7 @@
                                                 "accountID": pOrderField.AccountID})
    def OnRtnTrade(self, pTradeField: "CTORATstpTradeField") -> "void":
        logger.info(
        logger_trade_debug.info(
            'OnRtnTrade: TradeID[%s] InvestorID[%s] SecurityID[%s] OrderRef[%d] OrderLocalID[%s] Price[%.2f] Volume[%d]'
            % (pTradeField.TradeID, pTradeField.InvestorID, pTradeField.SecurityID,
               pTradeField.OrderRef, pTradeField.OrderLocalID, pTradeField.Price, pTradeField.Volume))
@@ -724,7 +724,7 @@
    def OnTrade(self, client_id, request_id, type_, data):
        if type_ == 1:
            logger_trade_debug.info(f"请求下单:client_id-{client_id} request_id-{request_id} data-{data}")
            logger_trade_debug.info(f"---------------------\n请求下单:client_id-{client_id} request_id-{request_id}")
            # 下单
            # 1-买 2-卖
            direction = data["direction"]
@@ -738,8 +738,8 @@
                    req_rid_dict[sinfo] = (client_id, request_id)
                    self.__tradeSimpleApi.buy(code, volume, price, sinfo)
                except Exception as e:
                    SendResponseSkManager.get_send_response_sk(TYPE_ORDER).sendall(
                        json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"))
                    send_response(json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"), TYPE_ORDER, client_id,
                                  request_id)
            elif direction == 2:
                try:
@@ -749,11 +749,12 @@
                    print("sell", req_rid_dict)
                except Exception as e:
                    logging.exception(e)
                    SendResponseSkManager.get_send_response_sk(TYPE_ORDER).sendall(
                        json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"))
                    send_response(json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"), TYPE_ORDER, client_id,
                                  request_id)
        elif type_ == 2:
            logger_trade_debug.info(f"请求撤单:client_id-{client_id} request_id-{request_id} data-{data}")
            logger_trade_debug.info(
                f"---------------------\n请求撤单:client_id-{client_id} request_id-{request_id} data-{data}")
            # 撤单
            direction = data["direction"]
            code = data["code"]
@@ -765,8 +766,8 @@
                    req_rid_dict[sinfo] = (client_id, request_id)
                    self.__tradeSimpleApi.cancel_buy(code, orderSysID, sinfo)
                except Exception as e:
                    SendResponseSkManager.get_send_response_sk(TYPE_CANCEL_ORDER).sendall(
                        json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"))
                    send_response(json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"), TYPE_CANCEL_ORDER, client_id,
                                  request_id)
            elif direction == 2:
                # 撤卖
@@ -774,8 +775,8 @@
                    req_rid_dict[sinfo] = (client_id, request_id)
                    self.__tradeSimpleApi.cancel_sell(code, orderSysID, sinfo)
                except Exception as e:
                    SendResponseSkManager.get_send_response_sk(TYPE_CANCEL_ORDER).sendall(
                        json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"))
                    send_response(json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"), TYPE_CANCEL_ORDER, client_id,
                                  request_id)
    def OnDealList(self, client_id, request_id):
        logger_trade_debug.info(f"请求成交列表:client_id-{client_id} request_id-{request_id}")
@@ -793,7 +794,8 @@
            req_id = self.__tradeSimpleApi.list_delegate_orders(is_cancel)
            req_rid_dict[req_id] = (client_id, request_id)
        except Exception as e:
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
            send_response(json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"), "common", client_id,
                          request_id)
    def OnMoney(self, client_id, request_id):
        logger_trade_debug.info(f"请求账户:client_id-{client_id} request_id-{request_id}")
@@ -801,7 +803,8 @@
            req_id = self.__tradeSimpleApi.get_money_account()
            req_rid_dict[req_id] = (client_id, request_id)
        except Exception as e:
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
            send_response(json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"), "common", client_id,
                          request_id)
    def OnPositionList(self, client_id, request_id):
        logger_trade_debug.info(f"请求持仓:client_id-{client_id} request_id-{request_id}")
@@ -809,7 +812,8 @@
            req_id = self.__tradeSimpleApi.list_positions()
            req_rid_dict[req_id] = (client_id, request_id)
        except Exception as e:
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
            send_response(json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"), "common", client_id,
                          request_id)
def __init_trade_data_server():
@@ -866,35 +870,44 @@
def __send_response(type, data_bytes):
    """Send one payload over a freshly created response socket and check the reply.

    The payload is passed through ``socket_util.load_header`` exactly once,
    written to the socket, and the peer's reply is parsed as JSON. The socket
    is closed whether or not the exchange succeeds.

    Args:
        type: Response channel key. Retained so existing callers keep working;
            the socket is created per call, so no cached connection is looked
            up with it here. (The name shadows the builtin but is part of the
            existing signature.)
        data_bytes: Encoded payload to send.

    Raises:
        Exception: If the reply's ``code`` field is non-zero; the message is
            the reply's ``msg`` field.
    """
    sk = SendResponseSkManager.create_send_response_sk()
    try:
        # Wrap the payload once; wrapping it a second time would corrupt the frame.
        framed = socket_util.load_header(data_bytes)
        sk.sendall(framed)
        result, _header_str = socket_util.recv_data(sk)
        result = json.loads(result)
        if result["code"] != 0:
            raise Exception(result['msg'])
    finally:
        # Always release the per-call socket, including on send/recv errors.
        sk.close()
def send_response(data_bytes, type, _client_id, _request_id):
    """Send a response to a client, making at most three attempts.

    Each attempt calls ``__send_response`` with the channel key
    ``"{type}#{_client_id}"``. The loop stops at the first success. Failures
    are logged and never propagated to the caller.

    Args:
        data_bytes: Encoded payload to deliver.
        type: Response type used to build the channel key and in log lines.
            (Shadows the builtin; kept because it is part of the signature.)
        _client_id: Client identifier appended to the channel key.
        _request_id: Request identifier, used only in log lines.

    Returns:
        None. Delivery failure is reported through ``logger_trade_debug`` only.
    """
    # The channel key does not change between attempts, so build it once.
    response_client_type = f"{type}#{_client_id}"
    for i in range(3):
        try:
            __send_response(response_client_type, data_bytes)
        except (ConnectionResetError, BrokenPipeError, TimeoutError) as e:
            # `type` is shadowed by the parameter, so read the class name
            # through __class__ instead of calling type(e).
            logger_trade_debug.error(
                f"第{i}次发送数据失败:type-{type},request_id-{_request_id} 原因:{e.__class__.__name__}")
            SendResponseSkManager.del_send_response_sk(response_client_type)
        except Exception as e1:
            logger_trade_debug.error(f"第{i}次发送数据失败:type-{type},request_id-{_request_id} 原因:{e1}")
            logger_trade_debug.exception(e1)
        else:
            # Success reporting lives outside the try so that a logging/print
            # failure cannot be mistaken for a send failure and cause a resend.
            print("发送数据成功")
            logger_trade_debug.info(f"第{i}次发送数据成功:type-{type},request_id-{_request_id}")
            break
    else:
        # Every attempt failed; record it so the loss is not silent.
        logger_trade_debug.error(f"发送数据失败,已达最大重试次数:type-{type},request_id-{_request_id}")
# 交易反馈回调
def __traderapi_callback(type, req_id, data):
    def send_response(data_str, _client_id, _request_id):
        for i in range(3):
            try:
                __send_response(f"{type}#{_client_id}", data_str)
                print("发送数据成功")
                logger_trade_debug.info(f"第{i}次发送数据成功:type-{type},request_id-{_request_id}")
                break
            except ConnectionResetError as e:
                SendResponseSkManager.del_send_response_sk(type)
            except BrokenPipeError as e:
                SendResponseSkManager.del_send_response_sk(type)
            except Exception as e:
                logger_trade_debug.info(f"第{i}次发送数据失败:type-{type},request_id-{_request_id}")
                logger_trade_debug.exception(e)
                pass
    print("回调", type, req_id, data)
    print("进程ID", os.getpid())
    logger_trade_debug.info("回调:type-{} req_id-{}", type, req_id)
    key = req_id
    if type == TYPE_ORDER or type == TYPE_CANCEL_ORDER:
        key = data["sinfo"]
@@ -903,16 +916,20 @@
        if req_rid_dict and key in req_rid_dict:
            print("API回调")
            client_id, request_id = req_rid_dict.pop(key)
            logger_trade_debug.info("API回调 request_id-{}", request_id)
            # 测试
            send_response(
                json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                            "request_id": request_id}).encode('utf-8'), client_id, request_id)
                            "request_id": request_id}).encode('utf-8'), type, client_id, request_id)
            logger_trade_debug.info("API回调结束 req_id-{} request_id-{}", req_id, request_id)
            print("API回调结束")
        else:
            logger_trade_debug.info("非API回调 req_id-{}", req_id)
            print("非API回调")
            # 非API回调
            send_response(
                json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}).encode('utf-8'),
                type,
                None,
                req_id)
            print("非API结束")