Administrator
2023-09-14 165042052bbc089787eba9fb71f72659135a34e9
下单采用线程池
3个文件已修改
1个文件已添加
118 ■■■■■ 已修改文件
huaxin_client/l2_client.py 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 76 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_asyncio.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_threadpool.py 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -147,8 +147,11 @@
            logger_l2_codes_subscript.exception(e)
    def set_code_special_watch_volume(self, code, volume):
        # 有效期为3s
        self.special_code_volume_for_order_dict[code] = (volume, time.time() + 3)
        # 有效期为2s
        self.special_code_volume_for_order_dict[code] = (volume, time.time() + 2)
        async_log_util.info(logger_local_huaxin_l2_subscript,f"设置下单量监听:{code}-{volume}")
    def OnFrontConnected(self):
        print("OnFrontConnected")
huaxin_client/trade_client.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import concurrent.futures
import json
import logging
import os
@@ -149,12 +150,12 @@
        '''
        其它字段置空
        '''
        ret = api.ReqOrderInsert(req_field, self.req_id)
        if ret != 0:
            raise Exception('ReqOrderInsert fail, ret[%d]' % ret)
        # 给L2发送消息
        if l2pipe is not None:
            l2pipe.send(json.dumps({"type": "listen_volume", "data": {"code": code, "volume": count}}).encode('utf-8'))
        ret = api.ReqOrderInsert(req_field, self.req_id)
        if ret != 0:
            raise Exception('ReqOrderInsert fail, ret[%d]' % ret)
        async_log_util.info(logger_trade, f"{code}华鑫本地真实下单结束")
        return
@@ -424,6 +425,7 @@
        self.__temp_order_list_dict = {}
        self.__temp_position_list_dict = {}
        self.__temp_money_account_list_dict = {}
        self.call_back_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    def OnFrontConnected(self) -> "void":
        logger.info('OnFrontConnected')
@@ -541,11 +543,10 @@
            else:
                # logger.info('OnRspOrderInsert: Error! [%d] [%d] [%s]'
                #             % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
                threading.Thread(target=lambda: self.__data_callback(TYPE_ORDER, nRequestID,
                                                                     {"sinfo": pInputOrderField.SInfo,
                                                                      "orderStatus": -1,
                                                                      "orderStatusMsg": pRspInfoField.ErrorMsg}),
                                 daemon=True).start()
                self.call_back_thread_pool.submit(self.__data_callback, TYPE_ORDER, nRequestID,
                                                  {"sinfo": pInputOrderField.SInfo,
                                                   "orderStatus": -1,
                                                   "orderStatusMsg": pRspInfoField.ErrorMsg})
        except:
            pass
@@ -555,20 +556,19 @@
        try:
            if pRspInfoField.ErrorID == 0:
                async_log_util.info(logger_local_huaxin_trade_debug, 'OnRspOrderAction: OK! [%d]' % nRequestID)
                threading.Thread(target=lambda: self.__data_callback(TYPE_CANCEL_ORDER, nRequestID,
                                                                     {"sinfo": pInputOrderActionField.SInfo,
                                                                      "orderSysID": pInputOrderActionField.OrderSysID,
                                                                      "cancel": 1}), daemon=True).start()
                self.call_back_thread_pool.submit(self.__data_callback, TYPE_CANCEL_ORDER, nRequestID,
                                                  {"sinfo": pInputOrderActionField.SInfo,
                                                   "orderSysID": pInputOrderActionField.OrderSysID,
                                                   "cancel": 1})
            else:
                async_log_util.info(logger_local_huaxin_trade_debug, 'OnRspOrderAction: Error! [%d] [%d] [%s]'
                                    % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
                threading.Thread(target=lambda: self.__data_callback(TYPE_CANCEL_ORDER, nRequestID,
                                                                     {"sinfo": pInputOrderActionField.SInfo,
                                                                      "orderSysID": pInputOrderActionField.OrderSysID,
                                                                      "cancel": 0, "errorID": pRspInfoField.ErrorID,
                                                                      "errorMsg": pRspInfoField.ErrorMsg}),
                                 daemon=True).start()
                self.call_back_thread_pool.submit(self.__data_callback, TYPE_CANCEL_ORDER, nRequestID,
                                                  {"sinfo": pInputOrderActionField.SInfo,
                                                   "orderSysID": pInputOrderActionField.OrderSysID,
                                                   "cancel": 0, "errorID": pRspInfoField.ErrorID,
                                                   "errorMsg": pRspInfoField.ErrorMsg})
        except:
            pass
@@ -629,8 +629,7 @@
                              "volume": pOrderField.VolumeTotalOriginal,
                              "volumeTraded": pOrderField.VolumeTraded, "orderStatus": pOrderField.OrderStatus,
                              "orderSubmitStatus": pOrderField.OrderSubmitStatus, "statusMsg": pOrderField.StatusMsg}
                threading.Thread(target=lambda: self.__data_callback(TYPE_ORDER, 0, order_data),
                                 daemon=True).start()
                self.call_back_thread_pool.submit(self.__data_callback, TYPE_ORDER, 0, order_data)
        except Exception as e:
            async_log_util.error(logger_local_huaxin_trade_debug, "OnRtnOrder 出错")
        except:
@@ -715,8 +714,7 @@
                #        pTradingAccountField.UsefulMoney, pTradingAccountField.FetchLimit))
            else:
                results = self.__temp_money_account_list_dict.pop(nRequestID)
                threading.Thread(target=lambda: self.__data_callback(TYPE_LIST_MONEY, nRequestID, results),
                                 daemon=True).start()
                self.call_back_thread_pool.submit(self.__data_callback, TYPE_LIST_MONEY, nRequestID, results)
                # logger.info('查询资金账号结束[%d] ErrorID[%d] ErrorMsg[%s]'
                #             % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
        except:
@@ -747,9 +745,9 @@
            else:
                # logger.info('查询报单结束[%d] ErrorID[%d] ErrorMsg[%s]'
                #             % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
                threading.Thread(target=lambda: self.__data_callback(TYPE_LIST_DELEGATE, nRequestID,
                                                                     self.__temp_order_list_dict[nRequestID]),
                                 daemon=True).start()
                self.call_back_thread_pool.submit(self.__data_callback, TYPE_LIST_DELEGATE, nRequestID,
                                                  self.__temp_order_list_dict[nRequestID])
                self.__temp_order_list_dict.pop(nRequestID)
        except:
            pass
@@ -775,9 +773,9 @@
            else:
                # logger.info('查询持仓结束[%d] ErrorID[%d] ErrorMsg[%s]'
                #             % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
                threading.Thread(target=lambda: self.__data_callback(TYPE_LIST_POSITION, nRequestID,
                                                                     self.__temp_position_list_dict[nRequestID]),
                                 daemon=True).start()
                self.call_back_thread_pool.submit(self.__data_callback, TYPE_LIST_POSITION, nRequestID,
                                                  self.__temp_position_list_dict[nRequestID])
                self.__temp_position_list_dict.pop(nRequestID)
        except:
            pass
@@ -806,9 +804,8 @@
                     "tradingDay": pTradeField.TradingDay,
                     "pbuID": pTradeField.PbuID, "accountID": pTradeField.AccountID})
            else:
                threading.Thread(target=lambda: self.__data_callback(TYPE_LIST_TRADED, nRequestID,
                                                                     self.__temp_order_list_dict[nRequestID]),
                                 daemon=True).start()
                self.call_back_thread_pool.submit(self.__data_callback, TYPE_LIST_TRADED, nRequestID,
                                                  self.__temp_order_list_dict[nRequestID])
                self.__temp_order_list_dict.pop(nRequestID)
        except:
            pass
@@ -821,6 +818,7 @@
class MyTradeActionCallback(command_manager.TradeActionCallback):
    __tradeSimpleApi = TradeSimpleApi()
    trade_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    def OnTrade(self, client_id, request_id, sk, type_, data):
        if type_ == 1:
@@ -840,8 +838,9 @@
                # 买
                try:
                    req_rid_dict[sinfo] = (client_id, request_id, sk, order_ref)
                    threading.Thread(target=lambda: self.__tradeSimpleApi.buy(code, volume, price, sinfo, order_ref),
                                     daemon=True).start()
                    # threading.Thread(target=lambda: self.__tradeSimpleApi.buy(code, volume, price, sinfo, order_ref),
                    #                  daemon=True).start()
                    self.trade_thread_pool.submit(self.__tradeSimpleApi.buy, code, volume, price, sinfo, order_ref)
                    async_log_util.info(logger_trade, f"{code}华鑫本地下单线程结束")
                except Exception as e:
                    send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_ORDER, client_id,
@@ -874,7 +873,13 @@
                    if not orderSysID and orderRef is None:
                        raise Exception("没有找到系统订单号或者报单引用")
                    req_rid_dict[sinfo] = (client_id, request_id, sk)
                    threading.Thread(target=lambda : self.__tradeSimpleApi.cancel_buy(code, sinfo, order_sys_id=orderSysID, order_ref=orderRef), daemon=True).start()
                    threading.Thread(
                        target=lambda: self.__tradeSimpleApi.cancel_buy(code, sinfo, order_sys_id=orderSysID,
                                                                        order_ref=orderRef), daemon=True).start()
                    self.trade_thread_pool.submit(
                        lambda: self.__tradeSimpleApi.cancel_buy(code, sinfo, order_sys_id=orderSysID,
                                                                 order_ref=orderRef))
                    async_log_util.info(logger_local_huaxin_trade_debug,
                                        f"撤单结束:code-{code} order_sys_id-{orderSysID} sinfo-{sinfo}")
                except Exception as e:
@@ -1050,8 +1055,7 @@
# 采用异步回调
def traderapi_callback(type, req_id, data):
    t1 = threading.Thread(target=lambda: __traderapi_callback(type, req_id, data), daemon=True)
    t1.start()
    __traderapi_callback(type, req_id, data)
addr, port = constant.SERVER_IP, constant.SERVER_PORT
test/test_asyncio.py
New file
@@ -0,0 +1,19 @@
import asyncio
import time
async def coroutine_function():
    print("Start coroutine function")
    time.sleep(1)
    print("Coroutine function completed")
async def main():
    print("Start main")
    asyncio.gather(
        coroutine_function()
    )
    print("Main completed")
main()
test/test_threadpool.py
@@ -58,12 +58,12 @@
        time.sleep(2)
if __name__ == '__main__':
    p1, p2 = multiprocessing.Pipe()
    serverProcess = multiprocessing.Process(target=run_process_1, args=(p1,))
    jueJinProcess = multiprocessing.Process(target=run_process_2, args=(p2,))
    serverProcess.start()
    jueJinProcess.start()
def test1(params):
    time.sleep(1)
    print("执行结束", params)
    while True:
        time.sleep(2)
if __name__ == '__main__':
    thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)
    thread_pool.submit(lambda: test1("123123"))
    input()