Administrator
2023-08-02 b069d586b7ed4fbefd9bac22ae796f185962e7fe
日志优化
8个文件已修改
2个文件已添加
694 ■■■■ 已修改文件
gui.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 320 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_export.py 319 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/code_info_output.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/kp_client_msg_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.txt 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/data_server.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/data_export_util.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py
@@ -13,7 +13,7 @@
from utils import data_export_util
import multiprocessing
from log_module import log
from log_module import log, log_export
from db import mysql_data, redis_manager
import server
from config import settings
@@ -1011,7 +1011,7 @@
        btn = FlatButton(frame, text="获取m值", command=lambda: compute_m(code.get()))
        btn.place(x=10, y=130)
        btn = FlatButton(frame, text="导出交易日志", command=lambda: log.export_l2_log(code.get()))
        btn = FlatButton(frame, text="导出交易日志", command=lambda: log_export.export_l2_log(code.get()))
        btn.place(x=80, y=130)
        btn = FlatButton(frame, text="清空l2数据", command=lambda: clear_l2(code.get()))
l2/l2_data_util.py
@@ -15,7 +15,7 @@
from code_attribute import gpcode_manager
from db.redis_manager import RedisUtils
from l2 import l2_data_log, l2_data_source_util
from log_module import log
from log_module import log, log_export
from db import redis_manager
from utils import tool
@@ -48,7 +48,7 @@
        # 获取今日的数据
    if local_today_datas.get(code) is None or force:
        datas = log.load_l2_from_log()
        datas = log_export.load_l2_from_log()
        datas = datas.get(code)
        if datas is None:
            datas = []
log_module/log.py
@@ -1,16 +1,9 @@
"""
日志
"""
import datetime
import os
import shutil
import sys
from loguru import logger
import constant
from code_attribute import gpcode_manager
from utils import tool
class MyLogger:
@@ -286,315 +279,4 @@
hx_logger_contact_debug = __mylogger.get_logger("hx_contact_debug")
hx_logger_trade_callback = __mylogger.get_logger("hx_trade_callback")
hx_logger_trade_debug = __mylogger.get_logger("hx_trade_debug")
hx_logger_trade_loop = __mylogger.get_logger("hx_trade_loop")
class LogUtil:
    """Helpers for filtering log files."""

    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        """Copy every line of ``path`` that contains ``key`` into ``target_path``.

        :param key: value to look for; it is rendered with ``str.format`` so
            non-string keys are accepted.
        :param path: UTF-8 log file to read.
        :param target_path: file that is (over)written with the matching lines.
        :raises OSError: if either file cannot be opened.
        """
        needle = "{}".format(key)
        # The source is opened first so that a missing log no longer leaves an
        # empty target file behind.
        with open(path, 'r', encoding="utf-8") as src, \
                open(target_path, mode='w', encoding="utf-8") as dst:
            # Stream the log instead of loading it into memory in one go.
            for line in src:
                # ``in`` also matches a key at column 0, which the previous
                # ``find(...) > 0`` test silently skipped.
                if needle in line:
                    dst.write(line)
# Export the "processed data range" log lines of one code
def __export_l2_pos_range(code, date, dir):
    """Copy the l2_process lines containing "<code> 处理数据范围" for ``date`` into ``dir``."""
    source = f"{constant.get_path_prefix()}/logs/gp/l2/l2_process.{date}.log"
    target = f"{dir}/l2_process_{date}.log"
    LogUtil.extract_log_from_key(f"{code} 处理数据范围", source, target)
# Export the trade log lines of one code
def __export_l2_trade_log(code, date, dir):
    """Copy the l2_trade lines containing ``code`` for ``date`` into ``dir``."""
    source = f"{constant.get_path_prefix()}/logs/gp/l2/l2_trade.{date}.log"
    target = f"{dir}/l2_trade_{date}.log"
    LogUtil.extract_log_from_key(code, source, target)
# Export the trade-cancel log lines of one code
def __export_l2_trade_cancel_log(code, date, dir):
    """Copy the l2_trade_cancel lines containing ``code`` for ``date`` into ``dir``."""
    source = f"{constant.get_path_prefix()}/logs/gp/l2/l2_trade_cancel.{date}.log"
    target = f"{dir}/l2_trade_cancel_{date}.log"
    LogUtil.extract_log_from_key(code, source, target)
def __analyse_pricess_time():
    """Print every line of today's l2_process log whose last ':'-separated field exceeds 150."""
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = f"{constant.get_path_prefix()}/logs/gp/l2/l2_process.{today}.log"
    with open(log_path, encoding="utf-8") as f:
        for row in f:
            # int() tolerates the trailing newline left on the last field.
            cost = int(row.split(":")[-1])
            if cost > 150:
                print(row)
def export_l2_log(code):
    """Export today's position-range, trade-cancel and trade log lines of ``code``.

    The files are written to ``<prefix>/logs/gp/l2/<code>``, which is created
    when missing. Codes shorter than 6 characters are ignored.
    """
    if len(code) < 6:
        return
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    out_dir = f"{constant.get_path_prefix()}/logs/gp/l2/{code}"
    if not os.path.exists(out_dir):
        os.mkdir(out_dir)
    exporters = (__export_l2_pos_range, __export_l2_trade_cancel_log, __export_l2_trade_log)
    for exporter in exporters:
        exporter(code, today, out_dir)
def compute_buy1_real_time(time_):
    """Shift an ``H:M:S`` time back two seconds and round down to a multiple of three seconds.

    :param time_: time string whose first three ':'-separated fields are integers.
    :return: the adjusted second count formatted by ``tool.time_seconds_format``.
    """
    parts = time_.split(":")
    total_seconds = int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
    shifted = total_seconds - 2
    remainder = shifted % 3
    return tool.time_seconds_format(shifted - remainder)
def load_l2_from_log(date=None):
    """Load the L2 records stored in the ``l2_data`` log of one day.

    Every line is expected to contain ``save_l2_data:`` followed, after the
    next ``-``, by a 6-character code, one separator character and a Python
    literal holding that code's records.

    :param date: day to load as ``YYYY-MM-DD``; defaults to today.
    :return: dict mapping code -> list of records sorted by their ``"index"``
        key. Loading is best effort: an unreadable file yields ``{}`` and
        lines that cannot be parsed are skipped instead of aborting the load.
    """
    today_data = {}
    if date is None:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    path = "{}/logs/gp/l2/l2_data.{}.log".format(constant.get_path_prefix(), date)
    try:
        with open(path, mode='r') as f:
            for raw in f:
                index = raw.find('save_l2_data:')
                index = raw.find('-', index)
                payload = raw[index + 1:].strip()
                code = payload[0:6]
                try:
                    # NOTE(review): eval() executes whatever the log line
                    # contains; only safe while the log is trusted.
                    records = list(eval(payload[7:]))
                except Exception:
                    # A malformed (e.g. half-written) line must not discard
                    # the rest of the day's data.
                    continue
                today_data.setdefault(code, []).extend(records)
    except (OSError, UnicodeError):
        # Missing or unreadable log: keep whatever was loaded so far.
        pass
    # Sorting happens outside the read loop so partially loaded data is still
    # returned in index order.
    for key in today_data:
        try:
            today_data[key] = sorted(today_data[key], key=lambda x: x["index"])
        except (KeyError, TypeError):
            # Records without a usable "index" are left in file order.
            continue
        if today_data[key]:
            print(key, len(today_data[key]) - 1, today_data[key][-1]["index"])
    return today_data
# Get the time of a log line
def __get_log_time(line):
    """Return the second space-separated token before the first '|', cut at its first '.'."""
    prefix = line.partition("|")[0]
    clock = prefix.split(" ")[1]
    return clock.partition(".")[0]
# Get the position range of every L2 batch that was processed
def get_l2_process_position(code, date=None):
    """Collect the "处理数据范围" ranges logged for ``code`` in the l2_process log.

    :param code: code searched for as ``code:<code>``.
    :param date: day as ``YYYY-MM-DD``; today when falsy.
    :return: list of ``(start, end)`` int tuples. A range is kept only when it
        starts after the end of the previously kept one and its log time lies
        within 09:30:00-15:00:00.
    """
    day = date or datetime.datetime.now().strftime("%Y-%m-%d")
    marker = "处理数据范围"
    pos_list = []
    log_path = "{}/logs/gp/l2/l2_process.{}.log".format(constant.get_path_prefix(), day)
    with open(log_path, mode='r', encoding="utf-8") as f:
        for raw in f:
            if "code:{}".format(code) not in raw:
                continue
            clock = __get_log_time(raw)
            begin = raw.find(marker) + len(marker) + 1
            bounds = raw[begin:raw.find("处理时间")].strip().split("-")
            # Skip ranges that do not start after the last kept range.
            if pos_list and pos_list[-1][1] >= int(bounds[0]):
                continue
            if not (93000 <= int(clock.replace(":", "")) <= 150000):
                continue
            pos_list.append((int(bounds[0]), int(bounds[1])))
    return pos_list
# Get the buy-signal, buy-execution and cancel positions recorded in the trade log
def get_l2_trade_position(code, date=None):
    """Collect the trade positions logged for ``code`` in the l2_trade log.

    :param code: code searched for as ``code=<code>``.
    :param date: day as ``YYYY-MM-DD``; today when falsy.
    :return: list of ``(kind, index, reason)`` tuples for lines logged within
        09:30:00-15:00:00. ``kind`` is 0 for "获取到买入信号起始点", 1 for
        "获取到买入执行位置" and 2 for "触发撤单"; ``reason`` is the text after
        "撤单原因:" for kind 2 and ``""`` otherwise.
    """
    day = date or datetime.datetime.now().strftime("%Y-%m-%d")
    # (kind, marker that selects the line, text the index follows)
    rules = (
        (0, "获取到买入信号起始点", "获取到买入信号起始点:"),
        (1, "获取到买入执行位置", "获取到买入执行位置:"),
        (2, "触发撤单", "触发撤单,撤单位置:"),
    )
    pos_list = []
    log_path = "{}/logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), day)
    with open(log_path, mode='r', encoding="utf-8") as f:
        for raw in f:
            if raw.find("code={}".format(code)) < 0:
                continue
            print(raw)
            stamp = int(__get_log_time(raw).replace(":", ""))
            if not (93000 <= stamp <= 150000):
                continue
            for kind, marker, splitter in rules:
                if raw.find(marker) > 0:
                    tail = raw.split(splitter)[1].strip()
                    # The index is the text up to the first space after the marker.
                    position = int(tail[0:tail.find(" ")].strip())
                    reason = raw.split("撤单原因:")[1] if kind == 2 else ""
                    pos_list.append((kind, position, reason))
                    break
    return pos_list
# Get the trade progress
def get_trade_progress(code, date=None):
    """Read the trade progress and buy queues of ``code`` from the l2_trade_buy_queue log.

    :param code: code to look for.
    :param date: day as ``YYYY-MM-DD``; today when falsy.
    :return: ``(index_list, buy_queues)`` where ``index_list`` holds
        ``(index, time)`` tuples taken from "获取成交位置成功" lines and
        ``buy_queues`` holds ``(queue, time)`` tuples taken from
        ``<code>-[...]`` lines. Lines logged after 15:00:00 are ignored.
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    index_list = []
    buy_queues = []
    with open("{}/logs/gp/l2/l2_trade_buy_queue.{}.log".format(constant.get_path_prefix(), date), mode='r',
              encoding="utf-8") as f:
        for line in f:
            time_ = __get_log_time(line).strip()
            if int(time_.replace(":", "")) > int("150000"):
                continue
            if f"{code}-[" in line:
                # NOTE(review): eval() executes whatever the log line contains;
                # only safe while the log is trusted.
                buy_queues.append((eval(line.split(f"{code}-")[1]), time_))
            if "获取成交位置成功: code-{}".format(code) not in line:
                continue
            try:
                index = int(line.split("index-")[1].split(" ")[0])
            except (IndexError, ValueError):
                # Only a missing or non-numeric index is tolerated; the former
                # bare ``except`` also hid KeyboardInterrupt and real bugs.
                continue
            index_list.append((index, time_))
    return index_list, buy_queues
# Get the H-level cancel computation result
def get_h_cancel_compute_info(code, date=None):
    """Return the last "H级撤单计算结果" entry logged for ``code``.

    :param code: code searched for as ``code-<code>``.
    :param date: day as ``YYYY-MM-DD``; today when falsy.
    :return: ``(target_rate, ratio, cancel_num, total_num)`` of the last
        matching line, where ``ratio`` is ``cancel_num / total_num`` rounded to
        two decimals and the other items are strings; ``None`` when the log
        file does not exist or no line matches.
    """
    day = date or datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = f"{constant.get_path_prefix()}/logs/gp/l2/cancel/h_cancel.{day}.log"
    if not os.path.exists(log_path):
        return None
    latest_info = None
    with open(log_path, mode='r', encoding="utf-8") as f:
        for row in f:
            if f"code-{code}" not in row or "H级撤单计算结果" not in row:
                continue
            target_rate = row.split("目标比例:")[1].split(" ")[0].strip()
            # Text after the marker looks like "<sep><cancel>/<total> ...".
            counts = row.split("取消计算结果")[1][1:].split("/")
            cancel_num = counts[0].strip()
            total_num = counts[1].split(" ")[0].strip()
            ratio = round(int(cancel_num) / int(total_num), 2)
            latest_info = (target_rate, ratio, cancel_num, total_num)
    return latest_info
# Read the kp_msg log messages
def get_kp_msg_list(date=None):
    """Return all lines of the kp_msg log of ``date`` (today when falsy).

    :return: list of raw lines including their line endings; ``[]`` when the
        log file does not exist.
    """
    day = date or datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = f"{constant.get_path_prefix()}/logs/gp/kp/kp_msg.{day}.log"
    if not os.path.exists(log_path):
        return []
    with open(log_path, mode='r', encoding="utf-8") as f:
        return list(f)
def export_logs(code):
    """Export today's log lines of ``code`` from several L2 logs into a fresh folder.

    The folder ``<prefix>/logs/gp/l2/export/<code>_<name>_<date>`` is removed
    when it already exists and recreated; one filtered file is written per
    source log.
    """
    code_name = gpcode_manager.get_code_name(code)
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    target_dir = f"{constant.get_path_prefix()}/logs/gp/l2/export/{code}_{code_name}_{date}"
    if os.path.exists(target_dir):
        shutil.rmtree(target_dir)
    os.makedirs(target_dir)
    # Logs that are filtered by the bare code instead of "code=<code>".
    bare_code_logs = {"l2_process", "l2_process_time", "cancel/h_cancel", "l2_trade_buy_progress"}
    log_names = ["l2_process", "l2_trade", "l2_trade_cancel", "l2_process_time", "l2_trade_buy",
                 "l2_trade_buy_progress", "cancel/h_cancel"]
    # Export the trade logs
    for log_name in log_names:
        key = code if log_name in bare_code_logs else f"code={code}"
        target_path = f"{target_dir}/{log_name}.{code}_{code_name}.{date}.log"
        # Create the parent folder (needed for names such as "cancel/h_cancel")
        parent = target_path.rsplit("/", 1)[0]
        if not os.path.exists(parent):
            os.makedirs(parent)
        source_path = f"{constant.get_path_prefix()}/logs/gp/l2/{log_name}.{date}.log"
        LogUtil.extract_log_from_key(key, source_path, target_path)
def export_trade_progress(code):
    """Return the sorted, de-duplicated progress indexes logged today for ``code``.

    Only lines of the l2_trade_buy_progress log that contain both
    ``code-<code>`` and "确定交易进度成功" are used.
    """
    log_path = f"{constant.get_path_prefix()}/logs/gp/l2/l2_trade_buy_progress.{tool.get_now_date_str()}.log"
    with open(log_path, mode='r', encoding="utf-8") as f:
        found = {
            int(row.split("index-")[1].split(" ")[0])
            for row in f
            if f"code-{code}" in row and "确定交易进度成功" in row
        }
    return sorted(found)
# Load the buy score records
def load_buy_score_recod(code):
    """Load today's trade_record entries of ``code``.

    :return: list of ``(time_str, type, data)`` tuples where ``time_str`` is
        characters 11-18 of the line, ``type`` is the token after ``type=`` and
        ``data`` is the text after ``data=`` evaluated as the body of a dict
        literal; ``[]`` when the log file does not exist.
    """
    log_path = f"{constant.get_path_prefix()}/logs/gp/trade/trade_record.{tool.get_now_date_str()}.log"
    fdatas = []
    if not os.path.exists(log_path):
        return fdatas
    with open(log_path, 'r', encoding="utf-8") as f:
        for row in f:
            if row.find(f"code={code}") <= 0:
                continue
            time_str = row[11:19]
            payload = row[row.find("data=") + 5:]
            type_at = row.find("type=")
            record_type = row[type_at + 5:row.find(" ", type_at)]
            fdatas.append((time_str, record_type, eval("{" + payload + "}")))
    return fdatas
def load_kpl_reason_changes():
    """Load today's kpl_limit_up_reason_change log.

    :return: list of ``(code, from_r, to_r)`` tuples parsed from the text after
        ``code-`` in the form ``<code>:<from_r>-<to_r>``, with ``to_r``
        evaluated as a Python expression; ``[]`` when the log file does not
        exist.
    """
    log_path = f"{constant.get_path_prefix()}/logs/gp/kpl/kpl_limit_up_reason_change.{tool.get_now_date_str()}.log"
    fdatas = []
    if not os.path.exists(log_path):
        return fdatas
    with open(log_path, 'r', encoding="utf-8") as f:
        for row in f:
            fields = row[row.find("code-") + 5:].split(":")
            code = fields[0]
            change = fields[1].split("-")
            from_r = change[0]
            to_r = eval(change[1])
            fdatas.append((code, from_r, to_r))
    return fdatas
if __name__ == '__main__':
    # Manual smoke test: write one record through the l2_process_time logger.
    logger_l2_process_time.info("test123")
    # Disabled ad-hoc experiments:
    # print(get_h_cancel_compute_info("603912"))
    # logger_l2_h_cancel.info("test")
    # logger_l2_process_time.info("test123")
    # logger_buy_score.info("test")
    # codes = ["603083"]
    # for code in codes:
    #     export_logs(code)
    # parse_l2_data()
hx_logger_trade_loop = __mylogger.get_logger("hx_trade_loop")
log_module/log_export.py
New file
@@ -0,0 +1,319 @@
import datetime
import os
import shutil
import constant
from code_attribute import gpcode_manager
from log_module.log import logger_l2_process_time
from utils import tool
class LogUtil:
    """Helpers for filtering log files."""

    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        """Copy every line of ``path`` that contains ``key`` into ``target_path``.

        :param key: value to look for; it is rendered with ``str.format`` so
            non-string keys are accepted.
        :param path: UTF-8 log file to read.
        :param target_path: file that is (over)written with the matching lines.
        :raises OSError: if either file cannot be opened.
        """
        needle = "{}".format(key)
        # The source is opened first so that a missing log no longer leaves an
        # empty target file behind.
        with open(path, 'r', encoding="utf-8") as src, \
                open(target_path, mode='w', encoding="utf-8") as dst:
            # Stream the log instead of loading it into memory in one go.
            for line in src:
                # ``in`` also matches a key at column 0, which the previous
                # ``find(...) > 0`` test silently skipped.
                if needle in line:
                    dst.write(line)
# Export the "processed data range" log lines of one code
def __export_l2_pos_range(code, date, dir):
    """Copy the l2_process lines containing "<code> 处理数据范围" for ``date`` into ``dir``."""
    source = f"{constant.get_path_prefix()}/logs/gp/l2/l2_process.{date}.log"
    target = f"{dir}/l2_process_{date}.log"
    LogUtil.extract_log_from_key(f"{code} 处理数据范围", source, target)
# Export the trade log lines of one code
def __export_l2_trade_log(code, date, dir):
    """Copy the l2_trade lines containing ``code`` for ``date`` into ``dir``."""
    source = f"{constant.get_path_prefix()}/logs/gp/l2/l2_trade.{date}.log"
    target = f"{dir}/l2_trade_{date}.log"
    LogUtil.extract_log_from_key(code, source, target)
# Export the trade-cancel log lines of one code
def __export_l2_trade_cancel_log(code, date, dir):
    """Copy the l2_trade_cancel lines containing ``code`` for ``date`` into ``dir``."""
    source = f"{constant.get_path_prefix()}/logs/gp/l2/l2_trade_cancel.{date}.log"
    target = f"{dir}/l2_trade_cancel_{date}.log"
    LogUtil.extract_log_from_key(code, source, target)
def __analyse_pricess_time():
    """Print every line of today's l2_process log whose last ':'-separated field exceeds 150."""
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = f"{constant.get_path_prefix()}/logs/gp/l2/l2_process.{today}.log"
    with open(log_path, encoding="utf-8") as f:
        for row in f:
            # int() tolerates the trailing newline left on the last field.
            cost = int(row.split(":")[-1])
            if cost > 150:
                print(row)
def export_l2_log(code):
    """Export today's position-range, trade-cancel and trade log lines of ``code``.

    The files are written to ``<prefix>/logs/gp/l2/<code>``, which is created
    when missing. Codes shorter than 6 characters are ignored.
    """
    if len(code) < 6:
        return
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    out_dir = f"{constant.get_path_prefix()}/logs/gp/l2/{code}"
    if not os.path.exists(out_dir):
        os.mkdir(out_dir)
    exporters = (__export_l2_pos_range, __export_l2_trade_cancel_log, __export_l2_trade_log)
    for exporter in exporters:
        exporter(code, today, out_dir)
def compute_buy1_real_time(time_):
    """Shift an ``H:M:S`` time back two seconds and round down to a multiple of three seconds.

    :param time_: time string whose first three ':'-separated fields are integers.
    :return: the adjusted second count formatted by ``tool.time_seconds_format``.
    """
    parts = time_.split(":")
    total_seconds = int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
    shifted = total_seconds - 2
    remainder = shifted % 3
    return tool.time_seconds_format(shifted - remainder)
def load_l2_from_log(date=None):
    """Load the L2 records stored in the ``l2_data`` log of one day.

    Every line is expected to contain ``save_l2_data:`` followed, after the
    next ``-``, by a 6-character code, one separator character and a Python
    literal holding that code's records.

    :param date: day to load as ``YYYY-MM-DD``; defaults to today.
    :return: dict mapping code -> list of records sorted by their ``"index"``
        key. Loading is best effort: an unreadable file yields ``{}`` and
        lines that cannot be parsed are skipped instead of aborting the load.
    """
    today_data = {}
    if date is None:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    path = "{}/logs/gp/l2/l2_data.{}.log".format(constant.get_path_prefix(), date)
    try:
        with open(path, mode='r') as f:
            for raw in f:
                index = raw.find('save_l2_data:')
                index = raw.find('-', index)
                payload = raw[index + 1:].strip()
                code = payload[0:6]
                try:
                    # NOTE(review): eval() executes whatever the log line
                    # contains; only safe while the log is trusted.
                    records = list(eval(payload[7:]))
                except Exception:
                    # A malformed (e.g. half-written) line must not discard
                    # the rest of the day's data.
                    continue
                today_data.setdefault(code, []).extend(records)
    except (OSError, UnicodeError):
        # Missing or unreadable log: keep whatever was loaded so far.
        pass
    # Sorting happens outside the read loop so partially loaded data is still
    # returned in index order.
    for key in today_data:
        try:
            today_data[key] = sorted(today_data[key], key=lambda x: x["index"])
        except (KeyError, TypeError):
            # Records without a usable "index" are left in file order.
            continue
        if today_data[key]:
            print(key, len(today_data[key]) - 1, today_data[key][-1]["index"])
    return today_data
# Get the time of a log line
def __get_log_time(line):
    """Return the second space-separated token before the first '|', cut at its first '.'."""
    prefix = line.partition("|")[0]
    clock = prefix.split(" ")[1]
    return clock.partition(".")[0]
# Get the position range of every L2 batch that was processed
def get_l2_process_position(code, date=None):
    """Collect the "处理数据范围" ranges logged for ``code`` in the l2_process log.

    :param code: code searched for as ``code:<code>``.
    :param date: day as ``YYYY-MM-DD``; today when falsy.
    :return: list of ``(start, end)`` int tuples. A range is kept only when it
        starts after the end of the previously kept one and its log time lies
        within 09:30:00-15:00:00.
    """
    day = date or datetime.datetime.now().strftime("%Y-%m-%d")
    marker = "处理数据范围"
    pos_list = []
    log_path = "{}/logs/gp/l2/l2_process.{}.log".format(constant.get_path_prefix(), day)
    with open(log_path, mode='r', encoding="utf-8") as f:
        for raw in f:
            if "code:{}".format(code) not in raw:
                continue
            clock = __get_log_time(raw)
            begin = raw.find(marker) + len(marker) + 1
            bounds = raw[begin:raw.find("处理时间")].strip().split("-")
            # Skip ranges that do not start after the last kept range.
            if pos_list and pos_list[-1][1] >= int(bounds[0]):
                continue
            if not (93000 <= int(clock.replace(":", "")) <= 150000):
                continue
            pos_list.append((int(bounds[0]), int(bounds[1])))
    return pos_list
# Get the buy-signal, buy-execution and cancel positions recorded in the trade log
def get_l2_trade_position(code, date=None):
    """Collect the trade positions logged for ``code`` in the l2_trade log.

    :param code: code searched for as ``code=<code>``.
    :param date: day as ``YYYY-MM-DD``; today when falsy.
    :return: list of ``(kind, index, reason)`` tuples for lines logged within
        09:30:00-15:00:00. ``kind`` is 0 for "获取到买入信号起始点", 1 for
        "获取到买入执行位置" and 2 for "触发撤单"; ``reason`` is the text after
        "撤单原因:" for kind 2 and ``""`` otherwise.
    """
    day = date or datetime.datetime.now().strftime("%Y-%m-%d")
    # (kind, marker that selects the line, text the index follows)
    rules = (
        (0, "获取到买入信号起始点", "获取到买入信号起始点:"),
        (1, "获取到买入执行位置", "获取到买入执行位置:"),
        (2, "触发撤单", "触发撤单,撤单位置:"),
    )
    pos_list = []
    log_path = "{}/logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), day)
    with open(log_path, mode='r', encoding="utf-8") as f:
        for raw in f:
            if raw.find("code={}".format(code)) < 0:
                continue
            print(raw)
            stamp = int(__get_log_time(raw).replace(":", ""))
            if not (93000 <= stamp <= 150000):
                continue
            for kind, marker, splitter in rules:
                if raw.find(marker) > 0:
                    tail = raw.split(splitter)[1].strip()
                    # The index is the text up to the first space after the marker.
                    position = int(tail[0:tail.find(" ")].strip())
                    reason = raw.split("撤单原因:")[1] if kind == 2 else ""
                    pos_list.append((kind, position, reason))
                    break
    return pos_list
# Get the trade progress
def get_trade_progress(code, date=None):
    """Read the trade progress and buy queues of ``code`` from the l2_trade_buy_queue log.

    :param code: code to look for.
    :param date: day as ``YYYY-MM-DD``; today when falsy.
    :return: ``(index_list, buy_queues)`` where ``index_list`` holds
        ``(index, time)`` tuples taken from "获取成交位置成功" lines and
        ``buy_queues`` holds ``(queue, time)`` tuples taken from
        ``<code>-[...]`` lines. Lines logged after 15:00:00 are ignored.
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    index_list = []
    buy_queues = []
    with open("{}/logs/gp/l2/l2_trade_buy_queue.{}.log".format(constant.get_path_prefix(), date), mode='r',
              encoding="utf-8") as f:
        for line in f:
            time_ = __get_log_time(line).strip()
            if int(time_.replace(":", "")) > int("150000"):
                continue
            if f"{code}-[" in line:
                # NOTE(review): eval() executes whatever the log line contains;
                # only safe while the log is trusted.
                buy_queues.append((eval(line.split(f"{code}-")[1]), time_))
            if "获取成交位置成功: code-{}".format(code) not in line:
                continue
            try:
                index = int(line.split("index-")[1].split(" ")[0])
            except (IndexError, ValueError):
                # Only a missing or non-numeric index is tolerated; the former
                # bare ``except`` also hid KeyboardInterrupt and real bugs.
                continue
            index_list.append((index, time_))
    return index_list, buy_queues
# Get the H-level cancel computation result
def get_h_cancel_compute_info(code, date=None):
    """Return the last "H级撤单计算结果" entry logged for ``code``.

    :param code: code searched for as ``code-<code>``.
    :param date: day as ``YYYY-MM-DD``; today when falsy.
    :return: ``(target_rate, ratio, cancel_num, total_num)`` of the last
        matching line, where ``ratio`` is ``cancel_num / total_num`` rounded to
        two decimals and the other items are strings; ``None`` when the log
        file does not exist or no line matches.
    """
    day = date or datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = f"{constant.get_path_prefix()}/logs/gp/l2/cancel/h_cancel.{day}.log"
    if not os.path.exists(log_path):
        return None
    latest_info = None
    with open(log_path, mode='r', encoding="utf-8") as f:
        for row in f:
            if f"code-{code}" not in row or "H级撤单计算结果" not in row:
                continue
            target_rate = row.split("目标比例:")[1].split(" ")[0].strip()
            # Text after the marker looks like "<sep><cancel>/<total> ...".
            counts = row.split("取消计算结果")[1][1:].split("/")
            cancel_num = counts[0].strip()
            total_num = counts[1].split(" ")[0].strip()
            ratio = round(int(cancel_num) / int(total_num), 2)
            latest_info = (target_rate, ratio, cancel_num, total_num)
    return latest_info
# Read the kp_msg log messages
def get_kp_msg_list(date=None):
    """Return all lines of the kp_msg log of ``date`` (today when falsy).

    :return: list of raw lines including their line endings; ``[]`` when the
        log file does not exist.
    """
    day = date or datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = f"{constant.get_path_prefix()}/logs/gp/kp/kp_msg.{day}.log"
    if not os.path.exists(log_path):
        return []
    with open(log_path, mode='r', encoding="utf-8") as f:
        return list(f)
def export_logs(code):
    """Export today's log lines of ``code`` from several L2 logs into a fresh folder.

    The folder ``<prefix>/logs/gp/l2/export/<code>_<name>_<date>`` is removed
    when it already exists and recreated; one filtered file is written per
    source log.
    """
    code_name = gpcode_manager.get_code_name(code)
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    target_dir = f"{constant.get_path_prefix()}/logs/gp/l2/export/{code}_{code_name}_{date}"
    if os.path.exists(target_dir):
        shutil.rmtree(target_dir)
    os.makedirs(target_dir)
    # Logs that are filtered by the bare code instead of "code=<code>".
    bare_code_logs = {"l2_process", "l2_process_time", "cancel/h_cancel", "l2_trade_buy_progress"}
    log_names = ["l2_process", "l2_trade", "l2_trade_cancel", "l2_process_time", "l2_trade_buy",
                 "l2_trade_buy_progress", "cancel/h_cancel"]
    # Export the trade logs
    for log_name in log_names:
        key = code if log_name in bare_code_logs else f"code={code}"
        target_path = f"{target_dir}/{log_name}.{code}_{code_name}.{date}.log"
        # Create the parent folder (needed for names such as "cancel/h_cancel")
        parent = target_path.rsplit("/", 1)[0]
        if not os.path.exists(parent):
            os.makedirs(parent)
        source_path = f"{constant.get_path_prefix()}/logs/gp/l2/{log_name}.{date}.log"
        LogUtil.extract_log_from_key(key, source_path, target_path)
def export_trade_progress(code):
    """Return the sorted, de-duplicated progress indexes logged today for ``code``.

    Only lines of the l2_trade_buy_progress log that contain both
    ``code-<code>`` and "确定交易进度成功" are used.
    """
    log_path = f"{constant.get_path_prefix()}/logs/gp/l2/l2_trade_buy_progress.{tool.get_now_date_str()}.log"
    with open(log_path, mode='r', encoding="utf-8") as f:
        found = {
            int(row.split("index-")[1].split(" ")[0])
            for row in f
            if f"code-{code}" in row and "确定交易进度成功" in row
        }
    return sorted(found)
# Load the buy score records
def load_buy_score_recod(code):
    """Load today's trade_record entries of ``code``.

    :return: list of ``(time_str, type, data)`` tuples where ``time_str`` is
        characters 11-18 of the line, ``type`` is the token after ``type=`` and
        ``data`` is the text after ``data=`` evaluated as the body of a dict
        literal; ``[]`` when the log file does not exist.
    """
    log_path = f"{constant.get_path_prefix()}/logs/gp/trade/trade_record.{tool.get_now_date_str()}.log"
    fdatas = []
    if not os.path.exists(log_path):
        return fdatas
    with open(log_path, 'r', encoding="utf-8") as f:
        for row in f:
            if row.find(f"code={code}") <= 0:
                continue
            time_str = row[11:19]
            payload = row[row.find("data=") + 5:]
            type_at = row.find("type=")
            record_type = row[type_at + 5:row.find(" ", type_at)]
            fdatas.append((time_str, record_type, eval("{" + payload + "}")))
    return fdatas
def load_kpl_reason_changes():
    """Load today's kpl_limit_up_reason_change log.

    :return: list of ``(code, from_r, to_r)`` tuples parsed from the text after
        ``code-`` in the form ``<code>:<from_r>-<to_r>``, with ``to_r``
        evaluated as a Python expression; ``[]`` when the log file does not
        exist.
    """
    log_path = f"{constant.get_path_prefix()}/logs/gp/kpl/kpl_limit_up_reason_change.{tool.get_now_date_str()}.log"
    fdatas = []
    if not os.path.exists(log_path):
        return fdatas
    with open(log_path, 'r', encoding="utf-8") as f:
        for row in f:
            fields = row[row.find("code-") + 5:].split(":")
            code = fields[0]
            change = fields[1].split("-")
            from_r = change[0]
            to_r = eval(change[1])
            fdatas.append((code, from_r, to_r))
    return fdatas
if __name__ == '__main__':
    # Manual smoke test: write one record through the l2_process_time logger.
    logger_l2_process_time.info("test123")
    # Disabled ad-hoc experiments:
    # print(get_h_cancel_compute_info("603912"))
    # logger_l2_h_cancel.info("test")
    # logger_l2_process_time.info("test123")
    # logger_buy_score.info("test")
    # codes = ["603083"]
    # for code in codes:
    #     export_logs(code)
    # parse_l2_data()
output/code_info_output.py
@@ -12,7 +12,7 @@
from code_attribute import code_volumn_manager, limit_up_time_manager, global_data_loader, gpcode_manager
import constant
from utils import global_util, tool
from log_module import log
from log_module import log, log_export
from l2 import l2_data_manager, l2_data_util, transaction_progress, l2_data_manager_new, code_price_manager
from l2.cancel_buy_strategy import HourCancelBigNumComputer
import l2.l2_data_manager_new
@@ -367,7 +367,7 @@
        if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED or trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS:
            hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer.get_watch_index_dict(code)
            # 根据日志读取实时的计算数据
            h_cancel_latest_compute_info = log.get_h_cancel_compute_info(code)
            h_cancel_latest_compute_info = log_export.get_h_cancel_compute_info(code)
            if hcancel_datas_dict:
                temp_list = [(k, hcancel_datas_dict[k][0]) for k in hcancel_datas_dict]
                canceled_indexs = set([int(k.split("-")[0]) for k in cancel_indexes_set])
@@ -501,7 +501,7 @@
    break_time = limit_up_info[1]
    records = []
    try:
        records = log.load_buy_score_recod(code)
        records = log_export.load_buy_score_recod(code)
    except:
        pass
    records_new = []
output/kp_client_msg_manager.py
@@ -7,7 +7,7 @@
import time
from code_attribute import gpcode_manager
from log_module import log
from log_module import log, log_export
from db.redis_manager import RedisManager, RedisUtils
from log_module.log import logger_kp_msg
@@ -64,7 +64,7 @@
# 读取本地消息列表
def list_msg_from_local():
    return log.get_kp_msg_list()
    return log_export.get_kp_msg_list()
# 运行采集
test/l2_trade_test.py
@@ -10,7 +10,7 @@
from code_attribute import big_money_num_manager, gpcode_manager
from db.redis_manager import RedisUtils
from log_module import log
from log_module import log, log_export
from utils import tool
from db import redis_manager
from l2 import l2_log, l2_data_manager, transaction_progress
@@ -100,7 +100,7 @@
                data["index"] = i
                total_datas.insert(i, data)
        pos_list = log.get_l2_process_position(code)
        pos_list = log_export.get_l2_process_position(code)
        # pos_list.insert(108,(375,448))
        if pos_list[0][0] > 0:
            pos_list.insert(0, (0, pos_list[0][0] - 1))
@@ -130,7 +130,7 @@
        # pos_list.insert(84, (516, 532))
        # 获取交易进度
        trade_progress_list, buy_queues = log.get_trade_progress(code)
        trade_progress_list, buy_queues = log_export.get_trade_progress(code)
        jingxuan_ranks = KPLDataManager().get_from_file(kpl_util.KPLDataType.JINGXUAN_RANK, tool.get_now_date_str())
        industry_ranks = KPLDataManager().get_from_file(kpl_util.KPLDataType.INDUSTRY_RANK, tool.get_now_date_str())
        RealTimeKplMarketData().set_top_5_reasons(jingxuan_ranks)
test/test.txt
New file
@@ -0,0 +1,13 @@
Timer unit: 1e-07 s
Total time: 2.01373 s
File: D:/workspace/trade/test/test_profile.py
Function: test1 at line 7
Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     7                                           def test1(args):
     8         1        114.0    114.0      0.0      print(args)
     9         1   20137134.0 20137134.0    100.0      time.sleep(2)
    10         1         22.0     22.0      0.0      return 1
third_data/data_server.py
@@ -7,7 +7,7 @@
from utils import global_util, tool
from code_attribute import gpcode_manager
from log_module import log, log_analyse
from log_module import log, log_analyse, log_export
from l2 import code_price_manager, l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer
from output.limit_up_data_filter import IgnoreCodeManager
@@ -85,7 +85,7 @@
        total_datas.reverse()
        # 获取涨停原因变化记录
        reason_changes = log.load_kpl_reason_changes()
        reason_changes = log_export.load_kpl_reason_changes()
        reason_changes.reverse()
        reason_changes_dict = {}
        for r in reason_changes:
@@ -395,7 +395,7 @@
                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED or trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer.get_watch_index_dict(code)
                    # 根据日志读取实时的计算数据
                    h_cancel_latest_compute_info = log.get_h_cancel_compute_info(code)
                    h_cancel_latest_compute_info = log_export.get_h_cancel_compute_info(code)
                    if hcancel_datas_dict:
                        temp_list = [(k, hcancel_datas_dict[k][0]) for k in hcancel_datas_dict]
                        canceled_indexs = set([int(k.split("-")[0]) for k in cancel_indexes_set])
utils/data_export_util.py
@@ -10,18 +10,18 @@
import constant
from code_attribute import gpcode_manager
import l2.l2_data_util
from log_module import log
from log_module import log, log_export
from l2 import l2_data_source_util
from trade import deal_big_money_manager
def export_l2_excel(code, date=None):
    # 获取L2的数据
    local_today_datas = log.load_l2_from_log(date)
    local_today_datas = log_export.load_l2_from_log(date)
    datas = local_today_datas[code]
    # 获取L2处理位置信息
    process_indexs = log.get_l2_process_position(code, date)
    trade_indexs = log.get_l2_trade_position(code, date)
    process_indexs = log_export.get_l2_process_position(code, date)
    trade_indexs = log_export.get_l2_trade_position(code, date)
    export_l2_data(code, datas, process_indexs, trade_indexs)
@@ -197,8 +197,8 @@
def test(code):
    progresses = log.export_trade_progress(code)
    local_today_datas = log.load_l2_from_log("2023-04-04")
    progresses = log_export.export_trade_progress(code)
    local_today_datas = log_export.load_l2_from_log("2023-04-04")
    datas = local_today_datas[code]
    num_operate_map = {}
    l2.l2_data_util.load_num_operate_map(num_operate_map, code, datas)