Administrator
2023-08-22 a933bf8e8e7f0c5a5131a95ed5d0f0b58bf9ac34
添加托管交易通道测试
7个文件已修改
131 ■■■■ 已修改文件
constant.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_target_codes_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 52 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -136,4 +136,5 @@
    TRADE_ENABLE = True
# 最大的代码价格
MAX_CODE_PRICE = 10
MAX_CODE_PRICE = 10
MAX_SUBSCRIPT_CODE_PRICE = 40
huaxin_client/command_manager.py
@@ -48,6 +48,10 @@
    def OnMoney(self, client_id, request_id):
        pass
    # 测试
    def OnTest(self, client_id, request_id, data):
        pass
class L2ActionCallback(object):
    # 监听L2数据
@@ -97,6 +101,8 @@
                cls.action_callback.OnDelegateList(client_id, request_id, can_cancel)
            elif _type == CLIENT_TYPE_POSITION_LIST:
                cls.action_callback.OnPositionList(client_id, request_id)
            elif _type == "test":
                cls.action_callback.OnTest(client_id, request_id, data)
        except Exception as e:
            logger_local_huaxin_trade_debug.debug(f"__process_command出错:{result_json}")
            logging.exception(e)
@@ -123,6 +129,7 @@
                        t1 = threading.Thread(target=lambda: cls.__process_command(_type, None, val), daemon=True)
                        t1.start()
            except Exception as e:
                logger_local_huaxin_trade_debug.exception(e)
                logging.exception(e)
    # 维护连接数的稳定
huaxin_client/trade_client.py
@@ -148,6 +148,7 @@
    def cancel_buy(self, code, order_sys_id, sinfo):
        if sinfo in self.__cancel_buy_sinfo_set:
            raise Exception(f'撤单请求已经提交:{sinfo}')
        logger_local_huaxin_trade_debug.info(f"进入撤单方法:code-{code} order_sys_id-{order_sys_id} sinfo-{sinfo}")
        self.__cancel_buy_sinfo_set.add(sinfo)
        self.req_id += 1
        # 请求撤单
@@ -522,24 +523,32 @@
                                                                  "orderStatusMsg": pRspInfoField.ErrorMsg}),
                             daemon=True).start()
    # 撤单响应
    def OnRspOrderAction(self, pInputOrderActionField: "CTORATstpInputOrderActionField",
                         pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        if pRspInfoField.ErrorID == 0:
            logger.info('OnRspOrderAction: OK! [%d]' % nRequestID)
            logger_local_huaxin_trade_debug.info('OnRspOrderAction: OK! [%d]' % nRequestID)
            threading.Thread(target=lambda: self.__data_callback(TYPE_CANCEL_ORDER, nRequestID,
                                                                 {"sinfo": pInputOrderActionField.SInfo,
                                                                  "orderSysID": pInputOrderActionField.OrderSysID,
                                                                  "cancel": 1}), daemon=True).start()
        else:
            logger.info('OnRspOrderAction: Error! [%d] [%d] [%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            logger_local_huaxin_trade_debug.info('OnRspOrderAction: Error! [%d] [%d] [%s]'
                                                 % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            threading.Thread(target=lambda: self.__data_callback(TYPE_CANCEL_ORDER, nRequestID,
                                                                 {"sinfo": pInputOrderActionField.SInfo,
                                                                  "orderSysID": pInputOrderActionField.OrderSysID,
                                                                  "cancel": 0, "errorID": pRspInfoField.ErrorID,
                                                                  "errorMsg": pRspInfoField.ErrorMsg}),
                             daemon=True).start()
    # 撤单错误回报
    def OnErrRtnOrderAction(self, pInputOrderActionField: "CTORATstpInputOrderActionField",
                            pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        logger_local_huaxin_trade_debug.info('OnErrRtnOrderAction: Error! [%d] [%d] [%d] [%s]'
                                             % (nRequestID, pInputOrderActionField.OrderSysID, pRspInfoField.ErrorID,
                                                pRspInfoField.ErrorMsg))
    def OnRspInquiryJZFund(self, pRspInquiryJZFundField: "CTORATstpRspInquiryJZFundField",
                           pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
@@ -565,8 +574,8 @@
               pOrderField.OrderRef, pOrderField.OrderLocalID,
               pOrderField.LimitPrice, pOrderField.VolumeTotalOriginal, pOrderField.OrderSysID,
               pOrderField.OrderStatus))
        OrderIDManager.set_system_order_id(pOrderField.SecurityID, pOrderField.SInfo, pOrderField.OrderSysID)
        if pOrderField.OrderStatus != traderapi.TORA_TSTP_OST_Unknown:
            OrderIDManager.set_system_order_id(pOrderField.SecurityID, pOrderField.SInfo, pOrderField.OrderSysID)
            threading.Thread(target=lambda: self.__data_callback(TYPE_ORDER, 0, {"sinfo": pOrderField.SInfo,
                                                                                 "securityId": pOrderField.SecurityID,
                                                                                 "orderLocalId": pOrderField.OrderLocalID,
@@ -732,7 +741,6 @@
                             daemon=True).start()
            self.__temp_order_list_dict.pop(nRequestID)
# 获取响应发送socket
global req_rid_dict
req_rid_dict = {}
@@ -843,6 +851,12 @@
            send_response(json.dumps({"code": 1, "msg": str(e)}), "common", client_id,
                          request_id)
    def OnTest(self, client_id, request_id, data):
        logger_local_huaxin_trade_debug.info(f"测试通道:client_id-{client_id} request_id-{request_id} data-{data}")
        send_response(
            json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                        "request_id": request_id}), type, client_id, request_id)
def __init_trade_data_server():
    logger.info("初始化交易服务器")
@@ -939,7 +953,9 @@
        if local_order_id:
            if local_order_id not in cls.local_order_id_map and orderSystemId:
                cls.local_order_id_map[local_order_id] = orderSystemId
                logger_local_huaxin_trade_debug.info(f"本地订单号与系统订单号映射,{code}:{local_order_id} {orderSystemId}")
            if local_order_id in cls.not_canceled_local_order_ids:
                logger_local_huaxin_trade_debug.info(f"执行等待撤单,{code}:{local_order_id} {orderSystemId}")
                # 执行撤单
                cls.not_canceled_local_order_ids.discard(local_order_id)
                cls.__TradeSimpleApi.cancel_buy(code, orderSystemId,
l2/huaxin/huaxin_target_codes_manager.py
@@ -72,7 +72,7 @@
            if code in yesterday_codes:
                continue
            # 剔除股价大于40块的票
            if d[1] > constant.MAX_CODE_PRICE:
            if d[1] > constant.MAX_SUBSCRIPT_CODE_PRICE:
                continue
            # 获取自由流通市值
            if code not in global_util.zyltgb_map:
third_data/code_plate_key_manager.py
@@ -21,34 +21,54 @@
class KPLCodeJXBlockManager:
    __redisManager = redis_manager.RedisManager(3)
    __code_blocks = {}
    # 备用
    __code_by_blocks = {}
    def __get_redis(self):
        return self.__redisManager.getRedis()
    def save_jx_blocks(self, code, blocks):
    def save_jx_blocks(self, code, blocks, by=False):
        if blocks is None:
            return
        if len(blocks) >2:
        if len(blocks) > 2:
            blocks = blocks[:2]
        # 保存前2条数据
        RedisUtils.setex(self.__get_redis(), f"kpl_jx_blocks-{code}", tool.get_expire(), json.dumps(blocks))
        self.__code_blocks[code] = blocks
        if by:
            RedisUtils.setex(self.__get_redis(), f"kpl_jx_blocks_by-{code}", tool.get_expire(), json.dumps(blocks))
            self.__code_by_blocks[code] = blocks
        else:
            RedisUtils.setex(self.__get_redis(), f"kpl_jx_blocks-{code}", tool.get_expire(), json.dumps(blocks))
            self.__code_blocks[code] = blocks
    # 获取精选板块
    def get_jx_blocks(self, code):
        if code in self.__code_blocks:
            return self.__code_blocks[code]
        val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks-{code}")
        if val is None:
            return None
    def get_jx_blocks(self, code, by=False):
        if by:
            if code in self.__code_by_blocks:
                return self.__code_by_blocks[code]
            val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks_by-{code}")
            if val is None:
                return None
            else:
                val = json.loads(val)
                self.__code_by_blocks[code] = val
            return self.__code_by_blocks[code]
        else:
            val = json.loads(val)
            self.__code_blocks[code] = val
        return self.__code_blocks[code]
            if code in self.__code_blocks:
                return self.__code_blocks[code]
            val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks-{code}")
            if val is None:
                return None
            else:
                val = json.loads(val)
                self.__code_blocks[code] = val
            return self.__code_blocks[code]
    def get_jx_blocks_cache(self, code):
        return self.__code_blocks.get(code)
    def get_jx_blocks_cache(self, code, by=False):
        if by:
            return self.__code_by_blocks.get(code)
        else:
            return self.__code_blocks.get(code)
# 开盘啦禁止交易板块管理
@@ -367,6 +387,8 @@
        k4 = set()
        jingxuan_blocks = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code)
        if not jingxuan_blocks:
            jingxuan_blocks = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True)
        if jingxuan_blocks:
            jingxuan_blocks = jingxuan_blocks[:2]
            k4 |= set([x[1] for x in jingxuan_blocks])
trade/huaxin/huaxin_trade_api.py
@@ -67,6 +67,14 @@
    t1 = threading.Thread(target=lambda: __run_recv_pipe_trade(), daemon=True)
    t1.start()
# 测试交易通道
def test_trade_channel():
    sid = random.randint(0, 1000000)
    result = __test_trade_channel(sid)
    if result["code"] == 0 and result["data"]["data"]["sid"] == sid:
        return True
    return False
class ClientSocketManager:
    # 客户端类型
@@ -274,7 +282,7 @@
                    if localOrderId and orderSysID:
                        # 移除本地单号,添加系统单号
                        __TradeOrderIdManager.add_order_id(code, accountID, orderSysID)
                        __TradeOrderIdManager.remove_local_order_id(code,localOrderId)
                        __TradeOrderIdManager.remove_local_order_id(code, localOrderId)
        except:
            pass
@@ -379,6 +387,13 @@
    return __read_response(request_id, blocking)
# 设置L2订阅数据
def __test_trade_channel(sid):
    request_id = __request("test",
                           {"type": "test", "data": {"sid": sid}})
    return __read_response(request_id, True)
def parseResponse(data_str):
    if not data_str:
        raise Exception("反馈内容为空")
trade/huaxin/trade_server.py
@@ -272,10 +272,17 @@
                                                                              sell_1_price, sell_1_volume // 100)
                                pre_close_price = round(float(limit_up_price) / 1.1, 2)
                                # 如果涨幅大于7%就读取板块
                                if (buy_1_price - pre_close_price) / pre_close_price > 0.07:
                                    if not self.__KPLCodeJXBlockManager.get_jx_blocks(code):
                                price_rate = (buy_1_price - pre_close_price) / pre_close_price
                                if price_rate > 0.07:
                                    if not self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code):
                                        blocks = kpl_api.getCodeJingXuanBlocks(code)
                                        self.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks)
                                elif price_rate > 0.03:
                                    # 添加备用板块
                                    if not self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True):
                                        blocks = kpl_api.getCodeJingXuanBlocks(code)
                                        self.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks, by=True)
                                # 更新板块信息
                                yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
                                CodePlateKeyBuyManager.update_can_buy_blocks(code,
@@ -316,7 +323,7 @@
                                continue
                            if d["sec_level"] != 1:
                                continue
                            if d["pre_close"] * 1.1 > constant.MAX_CODE_PRICE:
                            if d["pre_close"] * 1.1 > constant.MAX_SUBSCRIPT_CODE_PRICE:
                                continue
                            if (d["listed_date"] + datetime.timedelta(
                                    days=100)).timestamp() > datetime.datetime.now().timestamp():
@@ -337,6 +344,10 @@
                        code = data["code"]
                        order_no = data["data"]
                        hx_logger_l2_upload.info(f"{code}-正在成交的订单撤单,order_no:{order_no}")
                        # buyno_map = l2_data_util.local_today_buyno_map.get(code)
                        # if buyno_map:
                        #     l2_data = buyno_map.get(order_no)
                        #     buyno_map.get(order_no)
                        # 执行撤单
                        l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "G撤撤单", "G撤")
                else:
@@ -654,6 +665,13 @@
            memory_info = psutil.virtual_memory()
            cpu_percent = psutil.cpu_percent(interval=1)
            fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent}
            # 获取交易通道
            try:
                can_access = huaxin_trade_api.test_trade_channel()
                fdata["trade_channel_access"] = 1 if can_access else 0
            except:
                fdata["trade_channel_access"] = 0
            result = {"code": 0, "data": fdata, "msg": ""}
            print("OnGetEnvInfo 成功")
            self.send_response(result, client_id, request_id)