Administrator
2023-09-15 1ff185866bcf0796d2367699bc000abb326360d5
l2/l2_data_manager_new.py
@@ -18,7 +18,7 @@
from trade import trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \
    trade_result_manager, current_price_process_manager, trade_data_manager, trade_huaxin
from l2 import safe_count_manager, l2_data_manager, l2_log, l2_data_source_util, code_price_manager, \
    transaction_progress, cancel_buy_strategy
    transaction_progress, cancel_buy_strategy, l2_data_log
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, DCancelBigNumComputer, \
    LCancelBigNumComputer
from l2.l2_data_manager import L2DataException
@@ -294,47 +294,32 @@
    # 处理华鑫L2数据
    @classmethod
    def process_huaxin(cls, code, origin_datas):
        print("process_huaxin", code, len(origin_datas))
        datas = None
        origin_start_time = round(t.time() * 1000)
        try:
            l2_data_log.l2_time_log(code, "开始加载历史数据")
            # 加载历史的L2数据
            is_normal = l2.l2_data_util.load_l2_data(code, load_latest=False)
            if not is_normal:
                print("历史数据异常:", code)
                # 数据不正常需要禁止交易
                l2_trade_util.forbidden_trade(code, msg="L2历史数据异常")
            origin_start_time = round(t.time() * 1000)
            # 转换数据格式
            _start_index = 0
            total_datas = local_today_datas.get(code)
            if total_datas:
                _start_index = total_datas[-1]["index"] + 1
            l2_data_log.l2_time_log(code, "开始格式化原始数据")
            datas = l2_huaxin_util.get_format_l2_datas(code, origin_datas,
                                                       gpcode_manager.get_limit_up_price(code), _start_index)
            __start_time = round(t.time() * 1000)
            l2_data_log.l2_time_log(code, "开始处理数据")
            if len(datas) > 0:
                cls.process_add_datas(code, datas, 0, __start_time)
            else:
                pass
                # lp = LineProfiler()
                # lp.enable()
                # lp_wrap = lp(cls.process_add_datas)
                # lp_wrap(code, datas, 0, __start_time)
                # output = io.StringIO()
                # lp.print_stats(stream=output)
                # lp.disable()
                # with open(f"/home/logs/profile/{code}_{datas[0]['index']}_{datas[-1]['index']}.txt", 'w') as f:
                #     f.write(output.getvalue())
            # lp.dump_stats(f"/home/logs/profile/{code}_{round(t.time() * 1000)}.txt")
        except Exception as e:
            async_log_util.error(logger_l2_error,f"code:{code}")
            async_log_util.exception(logger_l2_error, e)
        finally:
            # l2_data_log.l2_time(code, round(t.time() * 1000) - origin_start_time,
            #                     "l2数据处理总耗时",
            #                     True)
            if datas:
                l2_data_log.l2_time_log(code, "开始保存数据")
                l2.l2_data_util.save_l2_data(code, None, datas)
    @classmethod
@@ -402,21 +387,7 @@
                end_index = len(total_datas) - 1
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # 已挂单
                    if True:  # len(add_datas) < 10:
                        cls.__process_order(code, start_index, end_index, capture_timestamp, is_first_code)
                    else:
                        pass
                        # lp = LineProfiler()
                        # lp.enable()
                        # lp_wrap = lp(cls.__process_order)
                        # lp_wrap(code, start_index, end_index, capture_timestamp, is_first_code)
                        # output = io.StringIO()
                        # lp.print_stats(stream=output)
                        # lp.disable()
                        # with open(
                        #         f"/home/logs/profile/{code}_process_order_{add_datas[0]['index']}_{add_datas[-1]['index']}.txt",
                        #         'w') as f:
                        #     f.write(output.getvalue())
                else:
                    # 未挂单,时间相差不大才能挂单
                    if l2.l2_data_util.L2DataUtil.is_same_time(now_time_str, latest_time):
@@ -426,8 +397,6 @@
                                add_datas[0]["index"],
                                add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                capture_timestamp)
            # __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
            #                                    "l2数据处理时间")
    # 处理未挂单
    @classmethod
@@ -484,7 +453,8 @@
                    return b_cancel_data, "S大单撤销比例触发阈值"
            except Exception as e:
                logging.exception(e)
                async_log_util.error(logger_l2_error, f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.error(logger_l2_error,
                                     f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.exception(logger_l2_error, e)
            finally:
                # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
@@ -508,7 +478,8 @@
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "H撤销比例触发阈值"
            except Exception as e:
                async_log_util.error(logger_l2_error, f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.error(logger_l2_error,
                                     f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.exception(logger_l2_error, e)
            finally:
                # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-H撤大单计算")
@@ -527,7 +498,8 @@
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "L撤销比例触发阈值"
            except Exception as e:
                async_log_util.error(logger_l2_error, f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.error(logger_l2_error,
                                     f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.exception(logger_l2_error, e)
            finally:
                # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-L撤大单计算")
@@ -975,7 +947,8 @@
        unique_key = f"{compute_start_index}-{compute_end_index}"
        if cls.__latest_process_not_order_unique_keys.get(code) == unique_key:
            async_log_util.error(logger_l2_error, f"重复处理数据:code-{code} start_index-{compute_start_index} end_index-{compute_end_index}")
            async_log_util.error(logger_l2_error,
                                 f"重复处理数据:code-{code} start_index-{compute_start_index} end_index-{compute_end_index}")
            return
        cls.__latest_process_not_order_unique_keys[code] = unique_key