Administrator
2024-03-28 af7d9b55cad79aa7ade0b02802a26ebeb742c297
trade/trade_strategy.py
@@ -6,12 +6,18 @@
import schedule
import constant
from code_atrribute import gpcode_manager
from code_atrribute.history_k_data_util import HistoryKDatasUtils
from huaxin_client.client_network import SendResponseSkManager
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from l2 import l2_data_util
from l2.huaxin import l2_huaxin_util
from l2.l2_data_util import local_today_datas, local_today_num_operate_map, local_today_buyno_map, \
    local_today_canceled_buyno_map, L2DataUtil
from log_module import async_log_util
from log_module.log import logger_trade, logger_debug, logger_system, logger_local_huaxin_l1_trade_info, \
    logger_trade_position_api_request
    logger_trade_position_api_request, logger_l2_error, hx_logger_l2_transaction
from trade import huaxin_trade_data_update, huaxin_sell_util, huaxin_trade_api
from trade.huaxin_trade_record_manager import PositionManager
from trade.sell_rule_manager import TradeRuleManager, SellRule
@@ -254,6 +260,29 @@
            codes = L1DataProcessor.get_latest_update_codes()
            result = {"code": 0, "data": list(codes)}
            self.send_response(result, client_id, request_id)
        elif ctype == "get_l2_datas":
            # Fetch L2 data for a single code.
            # Request fields read below: "code", "max_time" and "min_money".
            code = data["code"]
            max_time = data["max_time"]
            min_money = data["min_money"]
            # Load the L2 data when the imported in-memory store is empty.
            # NOTE(review): the meaning of the True argument is not visible here - confirm.
            if not local_today_datas:
                l2_data_util.load_l2_data_all(True)
            # NOTE(review): .get() yields None when this code has no entry, in which
            # case the for-loop below raises TypeError - confirm callers guarantee presence.
            total_datas = l2_data_util.local_today_datas.get(code)
            # Only buy and sell records are of interest.
            for d in total_datas:
                val = d['val']
                # Stop at the first record whose time compares greater than max_time
                # (presumably records are time-ordered and trade_time_sub returns a
                # signed difference - TODO confirm).
                if tool.trade_time_sub(d['val']['time'], max_time) > 0:
                    break
                # Skip records whose num * price * 100 is below min_money
                # (the factor 100 is presumably shares per lot - verify).
                if val['num'] * float(val['price']) * 100 < min_money:
                    continue
                # NOTE(review): both branches are no-ops, so the loop collects nothing yet.
                if L2DataUtil.is_buy(val):
                    pass
                if L2DataUtil.is_sell(val):
                    pass
            # NOTE(review): the response carries the latest-update codes, exactly like the
            # preceding branch, rather than the filtered L2 records - looks unfinished.
            codes = L1DataProcessor.get_latest_update_codes()
            result = {"code": 0, "data": list(codes)}
            self.send_response(result, client_id, request_id)
class L1DataProcessor:
@@ -279,8 +308,6 @@
        datas = data["data"]
        cls.__save_l1_current_price(datas)
        cls.process_for_sell(datas)
    @classmethod
    def process_for_sell(cls, datas):
@@ -374,6 +401,37 @@
                logging.exception(e)
class MyL2DataCallback(L2DataCallBack):
    """L2 callback that caches incoming order records and logs transactions."""

    def OnL2Order(self, code, origin_datas, timestamp):
        """Format a batch of raw L2 order records, then cache and persist them.

        The formatted records are appended to ``local_today_datas[code]`` and
        fed into the num-operate, buy-no and canceled-buy-no maps. Any
        exception raised along the way is logged instead of propagated.

        :param code: key under which the records are cached and saved.
        :param origin_datas: raw records; emptied in place in the ``finally``
            clause, after the optional save.
        :param timestamp: accepted for the callback signature; not used here.
        """
        formatted = None
        try:
            # Look up what is already cached before creating the empty bucket.
            cached = local_today_datas.get(code)
            if code not in local_today_datas:
                local_today_datas[code] = []
            # Continue numbering after the last cached record, else start at 0.
            next_index = cached[-1]["index"] + 1 if cached else 0
            limit_up_price = gpcode_manager.get_limit_up_price(code)
            formatted = l2_huaxin_util.get_format_l2_datas(code, origin_datas, limit_up_price, next_index)
            if len(formatted) > 0:
                local_today_datas[code].extend(formatted)
                l2_data_util.load_num_operate_map(local_today_num_operate_map, code, formatted)
                l2_data_util.load_buy_no_map(local_today_buyno_map, code, formatted)
                l2_data_util.load_canceled_buy_no_map(local_today_canceled_buyno_map, code, formatted)
        except Exception as err:
            async_log_util.error(logger_l2_error, f"code:{code}")
            logger_l2_error.exception(err)
        finally:
            # Persist whatever was formatted, even if a later step failed.
            if formatted:
                l2_data_util.save_l2_data(code, None, formatted)
            origin_datas.clear()

    def OnL2Transaction(self, code, datas):
        """Write the transaction payload for ``code`` to the transaction log."""
        async_log_util.info(hx_logger_l2_transaction, f"{code}#{datas}")
# Perform some initialization work
def __init():
    def run_pending():
@@ -398,7 +456,16 @@
    threading.Thread(target=request_position, daemon=True).start()
def run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w, queue_strategy_r_trade_w, queue_strategy_w_trade_r):
# Module-level list of L2 callbacks; init_l2_data_callbacks() appends
# constant.MAX_L2_CHANNEL_COUNT MyL2DataCallback instances to it.
l2_data_callbacks = []
def init_l2_data_callbacks():
    """Append one new MyL2DataCallback per L2 channel to ``l2_data_callbacks``.

    ``constant.MAX_L2_CHANNEL_COUNT`` instances are added on every call;
    entries already in the list are left in place.
    """
    l2_data_callbacks.extend(MyL2DataCallback() for _ in range(constant.MAX_L2_CHANNEL_COUNT))
def run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
        queue_strategy_w_l2_r):
    try:
        # 初始化
        __init()
@@ -409,7 +476,7 @@
        threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start()
        huaxin_trade_data_update.run(queue_l1_trade_r_strategy_w)
        huaxin_trade_data_update.run(queue_l1_trade_r_strategy_w, queue_strategy_w_l2_r)
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r)