Administrator
2023-08-25 d6da023297772c9df90c93c50a327786f5ae782d
交易通道备用
5个文件已修改
109 ■■■■■ 已修改文件
huaxin_client/command_manager.py 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 55 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client_server.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_mmap.py 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py
@@ -30,27 +30,27 @@
class TradeActionCallback(object):
    # 交易
    def OnTrade(self, client_id, request_id, type_, data):
    def OnTrade(self, client_id, request_id, sk, type_, data):
        """Callback for a trade command; this base implementation does nothing.

        :param client_id: id of the requesting client
        :param request_id: id of the request being handled
        :param sk: socket the command arrived on, or None when no socket was supplied
        :param type_: the command's ``trade_type`` value
        :param data: the command's ``data`` payload
        """
        pass
    # 委托列表
    def OnDelegateList(self, client_id, request_id):
    def OnDelegateList(self, client_id, request_id, sk, can_cancel):
        """Callback for a delegate (order) list request; this base implementation does nothing.

        :param client_id: id of the requesting client
        :param request_id: id of the request being handled
        :param sk: socket the command arrived on, or None when no socket was supplied
        :param can_cancel: the command's ``can_cancel`` value (presumably restricts
            the list to cancellable orders -- confirm against the implementation)
        """
        pass
    # 成交列表
    def OnDealList(self, client_id, request_id):
    def OnDealList(self, client_id, request_id, sk):
        """Callback for a deal list request; this base implementation does nothing.

        :param client_id: id of the requesting client
        :param request_id: id of the request being handled
        :param sk: socket the command arrived on, or None when no socket was supplied
        """
        pass
    # Position list
    def OnPositionList(self, client_id, request_id):
    def OnPositionList(self, client_id, request_id, sk):
        """Callback for a position list request; this base implementation does nothing.

        :param client_id: id of the requesting client
        :param request_id: id of the request being handled
        :param sk: socket the command arrived on, or None when no socket was supplied
        """
        pass
    # 获取资金信息
    def OnMoney(self, client_id, request_id):
    def OnMoney(self, client_id, request_id, sk):
        """Callback for a fund/account information request; this base implementation does nothing.

        :param client_id: id of the requesting client
        :param request_id: id of the request being handled
        :param sk: socket the command arrived on, or None when no socket was supplied
        """
        pass
    # 测试
    def OnTest(self, client_id, request_id, data):
    def OnTest(self, client_id, request_id, data, sk):
        """Callback for a ``"test"`` command; this base implementation does nothing.

        Note that ``sk`` comes last here, unlike the other callbacks.

        :param client_id: id of the requesting client
        :param request_id: id of the request being handled
        :param data: the command's ``data`` payload
        :param sk: socket the command arrived on, or None when no socket was supplied
        """
        pass
@@ -78,7 +78,7 @@
        cls.pipe_l2 = pipe_l2
    @classmethod
    def process_command(cls, _type, client_id, result_json):
    def process_command(cls, _type, client_id, result_json, sk=None):
        try:
            data = result_json["data"]
            print("接收内容", result_json)
@@ -93,18 +93,18 @@
            if _type == CLIENT_TYPE_TRADE:
                # 交易
                ctype = data["trade_type"]
                cls.action_callback.OnTrade(client_id, request_id, ctype, data)
                cls.action_callback.OnTrade(client_id, request_id, sk, ctype, data)
            elif _type == CLIENT_TYPE_MONEY:
                cls.action_callback.OnMoney(client_id, request_id)
                cls.action_callback.OnMoney(client_id, request_id, sk)
            elif _type == CLIENT_TYPE_DEAL_LIST:
                cls.action_callback.OnDealList(client_id, request_id)
                cls.action_callback.OnDealList(client_id, request_id, sk)
            elif _type == CLIENT_TYPE_DELEGATE_LIST:
                can_cancel = data["can_cancel"]
                cls.action_callback.OnDelegateList(client_id, request_id, can_cancel)
                cls.action_callback.OnDelegateList(client_id, request_id, sk, can_cancel)
            elif _type == CLIENT_TYPE_POSITION_LIST:
                cls.action_callback.OnPositionList(client_id, request_id)
                cls.action_callback.OnPositionList(client_id, request_id, sk)
            elif _type == "test":
                cls.action_callback.OnTest(client_id, request_id, data)
                cls.action_callback.OnTest(client_id, request_id, data, sk)
        except Exception as e:
            logger_local_huaxin_trade_debug.debug(f"process_command出错:{result_json}")
            logging.exception(e)
huaxin_client/trade_client.py
@@ -5,7 +5,7 @@
import threading
import time
from huaxin_client import command_manager
from huaxin_client import command_manager, trade_client_server
from huaxin_client import constant
from huaxin_client import socket_util
import traderapi
@@ -307,8 +307,8 @@
        req_field = traderapi.CTORATstpQryOrderField()
        # 以下字段不填表示不设过滤条件,即查询所有报单
        # req_field.SecurityID = '600000'
        # req_field.InsertTimeStart = '09:35:00'
        # req_field.InsertTimeEnd = '10:00:00'
        req_field.InsertTimeStart = '09:15:00'
        req_field.InsertTimeEnd = '15:00:00'
        # IsCancel字段填1表示只查询可撤报单
        if is_cancel:
            req_field.IsCancel = 1
@@ -750,7 +750,7 @@
class MyTradeActionCallback(command_manager.TradeActionCallback):
    __tradeSimpleApi = TradeSimpleApi()
    def OnTrade(self, client_id, request_id, type_, data):
    def OnTrade(self, client_id, request_id, sk, type_, data):
        if type_ == 1:
            logger_local_huaxin_trade_debug.info(
                f"---------------------\n请求下单:client_id-{client_id} request_id-{request_id}")
@@ -765,7 +765,7 @@
            if direction == 1:
                # 买
                try:
                    req_rid_dict[sinfo] = (client_id, request_id, local_order_id)
                    req_rid_dict[sinfo] = (client_id, request_id, sk, local_order_id)
                    self.__tradeSimpleApi.buy(code, volume, price, sinfo)
                except Exception as e:
                    send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_ORDER, client_id,
@@ -774,7 +774,7 @@
            elif direction == 2:
                try:
                    price_type = data["price_type"]
                    req_rid_dict[sinfo] = (client_id, request_id)
                    req_rid_dict[sinfo] = (client_id, request_id, sk)
                    self.__tradeSimpleApi.sell(code, volume, price, price_type, sinfo)
                    print("sell", req_rid_dict)
                except Exception as e:
@@ -800,7 +800,7 @@
                        if localOrderID:
                            OrderIDManager.add_need_cancel_local_order_id(localOrderID)
                        raise Exception("没有找到系统订单号")
                    req_rid_dict[sinfo] = (client_id, request_id)
                    req_rid_dict[sinfo] = (client_id, request_id, sk)
                    self.__tradeSimpleApi.cancel_buy(code, orderSysID, sinfo)
                except Exception as e:
                    send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_CANCEL_ORDER, client_id,
@@ -809,53 +809,53 @@
            elif direction == 2:
                # 撤卖
                try:
                    req_rid_dict[sinfo] = (client_id, request_id)
                    req_rid_dict[sinfo] = (client_id, request_id, sk)
                    self.__tradeSimpleApi.cancel_sell(code, orderSysID, sinfo)
                except Exception as e:
                    send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_CANCEL_ORDER, client_id,
                                  request_id)
    def OnDealList(self, client_id, request_id):
    def OnDealList(self, client_id, request_id, sk):
        logger_local_huaxin_trade_debug.info(f"请求成交列表:client_id-{client_id} request_id-{request_id}")
        try:
            print("开始请求成交列表")
            req_id = self.__tradeSimpleApi.list_traded_orders()
            req_rid_dict[req_id] = (client_id, request_id)
            req_rid_dict[req_id] = (client_id, request_id, sk)
        except Exception as e:
            logging.exception(e)
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
    def OnDelegateList(self, client_id, request_id, is_cancel):
    def OnDelegateList(self, client_id, request_id, sk, is_cancel):
        logger_local_huaxin_trade_debug.info(f"请求委托列表:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.list_delegate_orders(is_cancel)
            req_rid_dict[req_id] = (client_id, request_id)
            req_rid_dict[req_id] = (client_id, request_id, sk)
        except Exception as e:
            send_response(json.dumps({"code": 1, "msg": str(e)}), "common", client_id,
                          request_id)
    def OnMoney(self, client_id, request_id):
    def OnMoney(self, client_id, request_id, sk):
        logger_local_huaxin_trade_debug.info(f"请求账户:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.get_money_account()
            req_rid_dict[req_id] = (client_id, request_id)
            req_rid_dict[req_id] = (client_id, request_id, sk)
        except Exception as e:
            send_response(json.dumps({"code": 1, "msg": str(e)}), "common", client_id,
                          request_id)
    def OnPositionList(self, client_id, request_id):
    def OnPositionList(self, client_id, request_id, sk):
        logger_local_huaxin_trade_debug.info(f"请求持仓:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.list_positions()
            req_rid_dict[req_id] = (client_id, request_id)
            req_rid_dict[req_id] = (client_id, request_id, sk)
        except Exception as e:
            send_response(json.dumps({"code": 1, "msg": str(e)}), "common", client_id,
                          request_id)
    def OnTest(self, client_id, request_id, data):
    def OnTest(self, client_id, request_id, data, sk):
        send_response(
            json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                        "request_id": request_id}), type, client_id, request_id, show_log=False)
                        "request_id": request_id}), type, client_id, request_id, show_log=False, sk=sk)
def __init_trade_data_server():
@@ -924,9 +924,13 @@
        sk.close()
def send_response(data, type, _client_id, _request_id, show_log=True):
def send_response(data, type, _client_id, _request_id, show_log=True, sk=None):
    """Send a response payload back to the requester.

    When ``sk`` is supplied the payload goes out over that socket; otherwise
    it is sent through the module-level ``strategy_pipe``.

    :param data: response text; it is UTF-8 encoded on the socket path and
        sent unchanged on the pipe path
    :param type: response type tag; not used inside this function
    :param _client_id: requesting client id; not used inside this function
    :param _request_id: request id; not used inside this function
    :param show_log: when truthy, log the payload at debug level before sending
    :param sk: optional object exposing ``sendall``; a falsy value selects the pipe
    """
    if show_log:
        logger_local_huaxin_trade_debug.debug(f"回调返回内容:{data}")
    if sk:
        # Socket transport: the encoded payload is wrapped by socket_util.load_header.
        sk.sendall(socket_util.load_header(data.encode('utf-8')))
    else:
        # No socket supplied: fall back to the strategy pipe.
        strategy_pipe.send(data)
@@ -949,8 +953,8 @@
        local_order_id = None
        if req_rid_dict and sinfo in req_rid_dict:
            temp_params = req_rid_dict.get(sinfo)
            if len(temp_params) > 2:
                local_order_id = temp_params[2]
            if len(temp_params) > 3:
                local_order_id = temp_params[3]
        if local_order_id:
            if local_order_id not in cls.local_order_id_map and orderSystemId:
                cls.local_order_id_map[local_order_id] = orderSystemId
@@ -980,15 +984,15 @@
            temp_params = req_rid_dict.pop(key)
            client_id, request_id = temp_params[0], temp_params[1]
            # 本地订单号-系统订单号映射
            if len(temp_params) >= 3 and type == TYPE_ORDER:
                local_order_id = temp_params[2]
            if len(temp_params) >= 4 and type == TYPE_ORDER:
                local_order_id = temp_params[3]
                data["localOrderId"] = local_order_id
            logger_local_huaxin_trade_debug.info("API回调 request_id-{}", request_id)
            # 测试
            send_response(
                json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                            "request_id": request_id}), type, client_id, request_id)
                            "request_id": request_id}), type, client_id, request_id, temp_params[2])
            logger_local_huaxin_trade_debug.info("API回调结束 req_id-{} request_id-{}", req_id, request_id)
            print("API回调结束")
        else:
@@ -1023,6 +1027,9 @@
    global strategy_pipe
    strategy_pipe = pipe_strategy
    t1 = threading.Thread(target=lambda: trade_client_server.run(), daemon=True)
    t1.start()
    global tradeCommandManager
    tradeCommandManager = command_manager.TradeCommandManager()
    tradeCommandManager.init(MyTradeActionCallback(), l2pipe, pipe_strategy)
huaxin_client/trade_client_server.py
@@ -5,6 +5,8 @@
from huaxin_client.command_manager import TradeCommandManager
from utils import socket_util
SERVER_PORT = 10088
class MyTCPServer(socketserver.TCPServer):
    def __init__(self, server_address, RequestHandlerClass):
@@ -21,7 +23,6 @@
        pass
    def handle(self):
        host = self.client_address[0]
        super().handle()
        sk: socket.socket = self.request
        while True:
@@ -29,9 +30,14 @@
                # data = sk.recv(1024*1024, socket.MSG_WAITALL)
                data, header = socket_util.recv_data(sk)
                if data:
                    # TODO 处理数据
                    data_json = json.loads(data)
                    type_ = data_json['type']
                    TradeCommandManager.process_command(type_, None, data_json)
                    TradeCommandManager.process_command(type_, None, data_json, sk)
            finally:
                pass
def run():
    """Create a MyThreadingTCPServer on all interfaces at SERVER_PORT and call serve_forever()."""
    # Note: the second argument is the handler class MyBaseRequestHandle itself.
    server = MyThreadingTCPServer(("0.0.0.0", SERVER_PORT), MyBaseRequestHandle)
    server.serve_forever()
test/test_mmap.py
@@ -7,19 +7,21 @@
def run_process_1(pipe):
    with contextlib.closing(mmap.mmap(-1, 1000*100, tagname='l2-000333', access=mmap.ACCESS_WRITE)) as m:
        for i in range(1, 10001):
            start = time.time()
            m.seek(0)
            m.write(("msg " + str(i)).encode("utf-8"))
            m.write((f"msg {i} " * 10000).encode("utf-8"))
            m.flush()
            print("耗时", time.time() - start)
            time.sleep(1)
def run_process_2(pipe):
    while True:
        with contextlib.closing(mmap.mmap(-1, 6, tagname='l2-000333', access=mmap.ACCESS_READ)) as m:
            s = m.read(1024)
        with contextlib.closing(mmap.mmap(-1, 1000 * 100, tagname='l2-000333', access=mmap.ACCESS_READ)) as m:
            s = m.read(1000 * 100)
            s = s.decode('utf-8').replace('\x00', '')
            if s:
                print(s)
                print(len(s), s)
            time.sleep(1)
trade/huaxin/huaxin_trade_api.py
@@ -7,6 +7,7 @@
import threading
import time
from huaxin_client import trade_client_server
from log_module import async_log_util
from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop, hx_logger_trade_callback, logger_trade
from trade.huaxin import huaxin_trade_data_update
@@ -230,6 +231,11 @@
                     "data": data,
                     "request_id": request_id}
        root_data = socket_util.encryp_client_params_sign(root_data)
        # TODO 测试
        sk = socket_util.create_socket("127.0.0.1", trade_client_server.SERVER_PORT)
        if True:
            sk.sendall(socket_util.load_header(json.dumps(root_data).encode("utf-8")))
        else:
        pipe_trade.send(json.dumps(root_data).encode("utf-8"))
        async_log_util.info(hx_logger_trade_loop, "请求发送成功:request_id-{}", request_id)
    except BrokenPipeError as e: