Administrator
2023-09-11 324a5e115559d5432f04146b4e788d5bcb2237e0
数据编号重复处理
7个文件已修改
61 ■■■■■ 已修改文件
huaxin_client/l2_data_manager.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_export.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/data_export_util.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py
@@ -13,7 +13,7 @@
# 活动时间
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module import log_export
from log_module import log_export, async_log_util
from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \
    logger_local_huaxin_g_cancel, hx_logger_contact_debug
@@ -197,8 +197,7 @@
                    # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas)
                    use_time = int((time.time() - start_time) * 1000)
                    if use_time > 20:
                        logger_local_huaxin_l2_upload.info(f"{code}-上传代码耗时:{use_time}ms")
                        async_log_util.info(logger_local_huaxin_l2_upload, f"{code}-上传代码耗时:{use_time}ms")
                time.sleep(0.01)
            except Exception as e:
l2/l2_data_manager_new.py
@@ -312,13 +312,17 @@
                _start_index = total_datas[-1]["index"] + 1
            datas = l2_huaxin_util.get_format_l2_datas(code, origin_datas,
                                                       gpcode_manager.get_limit_up_price(code), _start_index)
            # 获取下单位置
            place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, datas)
            if place_order_index:
                buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
                    code)
                cls.set_real_place_order_index(code, place_order_index, buy_single_index)
                async_log_util.info(logger_l2_process, "code:{} 获取到下单真实位置:{}", code, place_order_index)
            try:
                # 获取下单位置
                place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, datas)
                if place_order_index:
                    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
                        code)
                    cls.set_real_place_order_index(code, place_order_index, buy_single_index)
                    async_log_util.info(logger_l2_process, "code:{} 获取到下单真实位置:{}", code, place_order_index)
            except:
                logger_l2_error.exception(f"{code} 处理真实下单位置出错")
            __start_time = round(t.time() * 1000)
            if len(datas) > 0:
                cls.process_add_datas(code, datas, 0, __start_time)
l2/l2_data_util.py
@@ -15,7 +15,7 @@
from code_attribute import gpcode_manager
from db.redis_manager_delegate import RedisUtils
from l2 import l2_data_log, l2_data_source_util
from log_module import log, log_export
from log_module import log, log_export, async_log_util
from db import redis_manager_delegate as redis_manager
from utils import tool
@@ -152,13 +152,14 @@
        # 保存最近的数据
        __start_time = round(time.time() * 1000)
        if datas:
            RedisUtils.setex_async(_redisManager.getRedis(), "l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
            RedisUtils.setex_async(_redisManager.getRedis(), "l2-data-latest-{}".format(code), tool.get_expire(),
                                   json.dumps(datas))
            # l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
            # 设置进内存
            local_latest_datas[code] = datas
            set_l2_data_latest_count(code, len(datas))
        try:
            log.logger_l2_data.info("{}-{}", code, add_datas)
            async_log_util.info(log.logger_l2_data, "{}-{}", code, add_datas)
        except Exception as e:
            logging.exception(e)
        # 暂时不将数据保存到redis
log_module/log_export.py
@@ -82,8 +82,9 @@
                data = f.readline()
                if not data:
                    break
                index = data.find('save_l2_data:')
                index = data.find('-', index)
                index = data.find(' - ') + 2
                if data.find('async_log_util') > 0:
                    index = data.find(']', index) + 1
                data = data[index + 1:].strip()
                code = data[0:6]
                data = data[7:]
@@ -93,8 +94,8 @@
                else:
                    today_data[code].extend(dict_)
        for key in today_data:
            news = sorted(today_data[key], key=lambda x: x["index"])
            today_data[key] = news
            # news = sorted(today_data[key], key=lambda x: x["index"])
            # today_data[key] = news
            print(key, len(today_data[key]) - 1, today_data[key][-1]["index"])
    except:
        pass
trade/huaxin/trade_server.py
@@ -286,8 +286,13 @@
        now_timestamp = int(time.time() * 1000)
        async_log_util.info(hx_logger_l2_orderdetail,
                            f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{_datas}")
        l2_log.threadIds[code] = random.randint(0, 100000)
        l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas)
        thread_id = random.randint(0, 100000)
        l2_log.threadIds[code] = thread_id
        async_log_util.info(hx_logger_l2_upload, f"{code}数据处理开始:{thread_id}")
        try:
            l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas)
        finally:
            async_log_util.info(hx_logger_l2_upload, f"{code}数据处理结束:{thread_id}")
    @classmethod
    def l2_transaction(cls, code, datas):
trade/trade_manager.py
@@ -562,7 +562,8 @@
    if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS:
        l2_data_manager.remove_from_l2_fixed_codes(code)
    async_log_util.info(logger_trade, "{}撤单成功".format(code))
    kp_client_msg_manager.add_msg(code, "撤单成功")
    # 暂时注释掉,有可能拖慢进程
    # kp_client_msg_manager.add_msg(code, "撤单成功")
# 处理交易成功数据
utils/data_export_util.py
@@ -2,6 +2,7 @@
数据导出工具
"""
import json
import logging
import os
import time
@@ -147,8 +148,11 @@
                                                                                                     num_operate_map[
                                                                                                         code])
                    if buy_index == data["index"]:
                        ws.write(index, 8, "{}-{}".format(d["index"], d["val"]["time"]), cancel_style)
                        break
                        try:
                            ws.write(index, 8, "{}-{}".format(d["index"], d["val"]["time"]), cancel_style)
                            break
                        except Exception as e:
                            logging.exception(e)
        ws.write(index, 3, "{}万".format(round(int(data["val"]["num"]) * float(data["val"]["price"]) / 100, 2)), style)
    wb.save(file_name)
@@ -206,4 +210,4 @@
if __name__ == "__main__":
    export_l2_excel("002178")
    export_l2_excel("002207")