Administrator
2023-08-30 c265bb186a6b6e2a31689d599be59b5b6f10cf42
trade/huaxin/trade_server.py
@@ -26,6 +26,7 @@
from db.redis_manager_delegate import RedisUtils
from huaxin_client import l1_subscript_codes_manager, l2_data_transform_protocol
from huaxin_client.client_network import SendResponseSkManager
from huaxin_client.trade_transform_protocol import TradeResponse
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \
    GCancelBigNumComputer, SecondCancelBigNumComputer
@@ -161,24 +162,13 @@
                    elif data_json["type"] == "response":
                        # 主动触发的响应
                        try:
                            client_id = data_json.get("client_id")
                            hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}")
                            # 设置响应内容
                            trade_api.set_response(data_json["request_id"], data_json['data'])
                            my_trade_response.OnTradeResponse(data_json)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "trade_callback":
                        try:
                            # 交易回调
                            data_json = data_json["data"]
                            type_ = data_json["type"]
                            # 记录交易反馈日志
                            hx_logger_trade_callback.info(data_json)
                            # 重新请求委托列表与资金
                            huaxin_trade_data_update.add_delegate_list()
                            huaxin_trade_data_update.add_deal_list()
                            huaxin_trade_data_update.add_money_list()
                            # print("响应结果:", data_json['data'])
                            my_trade_response.OnTradeCallback(data_json)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "l2_order":
@@ -197,71 +187,7 @@
                            data = data_json["data"]
                            code = data["code"]
                            datas = data["data"]
                            hx_logger_l2_transaction.info(f"{code}#{datas}")
                            if datas:
                                # 设置成交价
                                current_price_process_manager.set_trade_price(code, datas[-1][1])
                            try:
                                buyno_map = l2_data_util.local_today_buyno_map.get(code)
                                hx_logger_l2_transaction.info(
                                    f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}")
                                buy_progress_index = None
                                for i in range(len(datas) - 1, -1, -1):
                                    d = datas[i]
                                    buy_no = f"{d[6]}"
                                    if buyno_map and buy_no in buyno_map:
                                        hx_logger_l2_transaction.info(f"{code}成交进度:{buyno_map[buy_no]}")
                                        buy_progress_index = buyno_map[buy_no]["index"]
                                        break
                                # 获取执行位时间
                                buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
                                    code)
                                if True:
                                    if buy_progress_index is not None:
                                        self.__TradeBuyQueue.set_traded_index(code, buy_progress_index)
                                        total_datas = l2_data_util.local_today_datas.get(code)
                                        num_operate_map = l2_data_util.local_today_num_operate_map.get(
                                            code)
                                        logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}", code,
                                                                       buy_progress_index)
                                        buy_time = total_datas[buy_progress_index]["val"]["time"]
                                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                                        if buy_exec_index:
                                            need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
                                                                                                          buy_progress_index,
                                                                                                          buy_exec_index,
                                                                                                          total_datas,
                                                                                                          num_operate_map,
                                                                                                          num * 100 * float(
                                                                                                              limit_up_price),
                                                                                                          limit_up_price)
                                            if need_cancel:
                                                L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
                                        f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time,
                                                                                                         buy_exec_index,
                                                                                                         buy_progress_index,
                                                                                                         total_datas,
                                                                                                         num_operate_map)
                                        f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code,
                                                                                                      buy_progress_index,
                                                                                                      total_datas)
                                        f3 = dask.delayed(
                                            deal_big_money_manager.DealComputeProgressManager().set_trade_progress)(
                                            code,
                                            buy_progress_index,
                                            total_datas,
                                            num_operate_map)
                                        f4 = dask.delayed(
                                            SecondCancelBigNumComputer().set_transaction_index)(
                                            code,
                                            buy_progress_index)
                                        dask.compute(f1, f2, f3, f4)
                            except Exception as e:
                                hx_logger_l2_transaction.exception(e)
                            TradeServerProcessor.l2_transaction(code, datas)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "l2_market_data":
@@ -269,42 +195,7 @@
                            data = data_json["data"]
                            code = data["code"]
                            data = data["data"]
                            time_str = f"{data['dataTimeStamp']}"
                            if time_str.startswith("9"):
                                time_str = "0" + time_str
                            time_str = time_str[:6]
                            time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}"
                            buy_1_price, buy_1_volume = data["buy"][0]
                            sell_1_price, sell_1_volume = data["sell"][0]
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            if limit_up_price is not None:
                                # 处理买1,卖1信息
                                code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str,
                                                                              limit_up_price,
                                                                              sell_1_price, sell_1_volume // 100)
                                pre_close_price = round(float(limit_up_price) / 1.1, 2)
                                # 如果涨幅大于7%就读取板块
                                price_rate = (buy_1_price - pre_close_price) / pre_close_price
                                if price_rate > 0.07:
                                    if not self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code):
                                        blocks = kpl_api.getCodeJingXuanBlocks(code)
                                        self.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks)
                                elif price_rate > 0.03:
                                    # 添加备用板块
                                    if not self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True):
                                        blocks = kpl_api.getCodeJingXuanBlocks(code)
                                        self.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks, by=True)
                                # 更新板块信息
                                yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
                                CodePlateKeyBuyManager.update_can_buy_blocks(code,
                                                                             kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas,
                                                                             kpl_data_manager.KPLLimitUpDataRecordManager.total_datas,
                                                                             yesterday_codes,
                                                                             block_info.get_before_blocks_dict())
                            hx_logger_l2_market_data.info(f"{code}#{data}")
                            TradeServerProcessor.l2_market_data(code, data)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "l2_subscript_codes":
@@ -401,6 +292,9 @@
# 交易服务处理器
class TradeServerProcessor:
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
    __KPLCodeJXBlockManager = KPLCodeJXBlockManager()
    # 设置目标代码
    @classmethod
    def set_target_codes(cls, data_json):
@@ -415,6 +309,113 @@
                            f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{_datas}")
        l2_log.threadIds[code] = random.randint(0, 100000)
        l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas)
    @classmethod
    def l2_transaction(cls, code, datas):
        hx_logger_l2_transaction.info(f"{code}#{datas}")
        if datas:
            # 设置成交价
            current_price_process_manager.set_trade_price(code, datas[-1][1])
        try:
            buyno_map = l2_data_util.local_today_buyno_map.get(code)
            hx_logger_l2_transaction.info(
                f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}")
            buy_progress_index = None
            for i in range(len(datas) - 1, -1, -1):
                d = datas[i]
                buy_no = f"{d[6]}"
                if buyno_map and buy_no in buyno_map:
                    hx_logger_l2_transaction.info(f"{code}成交进度:{buyno_map[buy_no]}")
                    buy_progress_index = buyno_map[buy_no]["index"]
                    break
            # 获取执行位时间
            buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
                code)
            if True:
                if buy_progress_index is not None:
                    cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index)
                    total_datas = l2_data_util.local_today_datas.get(code)
                    num_operate_map = l2_data_util.local_today_num_operate_map.get(
                        code)
                    logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}", code,
                                                   buy_progress_index)
                    buy_time = total_datas[buy_progress_index]["val"]["time"]
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if buy_exec_index:
                        need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
                                                                                      buy_progress_index,
                                                                                      buy_exec_index,
                                                                                      total_datas,
                                                                                      num_operate_map,
                                                                                      num * 100 * float(
                                                                                          limit_up_price),
                                                                                      limit_up_price)
                        if need_cancel:
                            L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
                    f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time,
                                                                                     buy_exec_index,
                                                                                     buy_progress_index,
                                                                                     total_datas,
                                                                                     num_operate_map)
                    f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code,
                                                                                  buy_progress_index,
                                                                                  total_datas)
                    f3 = dask.delayed(
                        deal_big_money_manager.DealComputeProgressManager().set_trade_progress)(
                        code,
                        buy_progress_index,
                        total_datas,
                        num_operate_map)
                    f4 = dask.delayed(
                        SecondCancelBigNumComputer().set_transaction_index)(
                        code,
                        buy_progress_index)
                    dask.compute(f1, f2, f3, f4)
        except Exception as e:
            hx_logger_l2_transaction.exception(e)
    @classmethod
    def l2_market_data(cls, code, data):
        time_str = f"{data['dataTimeStamp']}"
        if time_str.startswith("9"):
            time_str = "0" + time_str
        time_str = time_str[:6]
        time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}"
        buy_1_price, buy_1_volume = data["buy"][0]
        sell_1_price, sell_1_volume = data["sell"][0]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is not None:
            # 处理买1,卖1信息
            code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str,
                                                          limit_up_price,
                                                          sell_1_price, sell_1_volume // 100)
            pre_close_price = round(float(limit_up_price) / 1.1, 2)
            # 如果涨幅大于7%就读取板块
            price_rate = (buy_1_price - pre_close_price) / pre_close_price
            if price_rate > 0.07:
                if not cls.__KPLCodeJXBlockManager.get_jx_blocks_cache(code):
                    blocks = kpl_api.getCodeJingXuanBlocks(code)
                    cls.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks)
            elif price_rate > 0.03:
                # 添加备用板块
                if not cls.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True):
                    blocks = kpl_api.getCodeJingXuanBlocks(code)
                    cls.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks, by=True)
            # 更新板块信息
            yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
            CodePlateKeyBuyManager.update_can_buy_blocks(code,
                                                         kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas,
                                                         kpl_data_manager.KPLLimitUpDataRecordManager.total_datas,
                                                         yesterday_codes,
                                                         block_info.get_before_blocks_dict())
        hx_logger_l2_market_data.info(f"{code}#{data}")
def clear_invalid_client():
@@ -764,18 +765,37 @@
    def OnL2Order(self, code, datas, timestamp):
        """Forward L2 order data for ``code`` to ``TradeServerProcessor.l2_order``."""
        TradeServerProcessor.l2_order(code, datas, timestamp)
    # NOTE(review): two definitions of OnL2Transaction appear back to back here;
    # if both live in the same class body, the second one replaces the first.
    def OnL2Transaction(self, code, datas, timestamp):
        pass
    def OnL2Transaction(self, code, datas):
        """Forward L2 transaction data for ``code`` to ``TradeServerProcessor.l2_transaction``."""
        TradeServerProcessor.l2_transaction(code, datas)
    # NOTE(review): two definitions of OnMarketData appear back to back here;
    # if both live in the same class body, the second one replaces the first.
    def OnMarketData(self, code, datas, timestamp):
        pass
    def OnMarketData(self, code, datas):
        """Forward an L2 market-data snapshot for ``code`` to ``TradeServerProcessor.l2_market_data``."""
        TradeServerProcessor.l2_market_data(code, datas)
class MyTradeResponse(TradeResponse):
    """Handler for messages coming back from the trade channel."""

    def OnTradeCallback(self, data_json):
        """Log a trade callback and re-request the delegate list, deal list and money.

        :param data_json: message whose ``"data"`` entry holds the callback
            payload; the payload must contain a ``"type"`` key
        """
        payload = data_json["data"]
        # The value is not used further; the lookup is kept so that a payload
        # without "type" still raises KeyError before anything else happens.
        _callback_type = payload["type"]
        # Record the trade feedback in the log
        hx_logger_trade_callback.info(payload)
        # Re-request the delegate list, the deal list and the money
        updater = huaxin_trade_data_update
        updater.add_delegate_list()
        updater.add_deal_list()
        updater.add_money_list()

    def OnTradeResponse(self, data_json):
        """Log a response to an earlier request and hand its data to ``trade_api``.

        :param data_json: message providing ``"request_id"`` and ``"data"``
        """
        request_id = data_json["request_id"]
        hx_logger_trade_callback.info(f"response:request_id-{request_id}")
        # Store the response content under its request id
        trade_api.set_response(request_id, data_json["data"])
# Module-level callback handler instances
my_l2_data_callback = MyL2DataCallback()
my_trade_response = MyTradeResponse()
def run(pipe_trade, pipe_l1, pipe_l2):
def run(pipe_trade, pipe_l1, pipe_l2, trade_cmd_callback):
    # 执行一些初始化数据
    block_info.init()
@@ -787,7 +807,7 @@
    manager.run(blocking=False)
    # 启动交易服务
    huaxin_trade_api.run_pipe_trade(pipe_trade)
    huaxin_trade_api.run_pipe_trade(pipe_trade, trade_cmd_callback)
    # 监听l1那边传过来的代码
    t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True)