admin
2025-03-26 8c51895ecab1de9c6faf664d389168473951f3bd
日志修改
3个文件已修改
302 ■■■■ 已修改文件
data_server.py 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_export.py 232 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/kpl_api.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_server.py
@@ -11,7 +11,7 @@
from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data
from db.redis_manager_delegate import RedisUtils
from log_module.log import hx_logger_l2_transaction, logger_debug
from log_module.log import hx_logger_l2_transaction, logger_debug, logger_request_api
from strategy import data_cache
from strategy.trade_setting import TradeSetting
from trade import huaxin_trade_api, huaxin_trade_data_update
@@ -219,6 +219,7 @@
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                print("买入", params)
                logger_request_api.info(f"买入:{params}")
                # 买入
                code = params.get("code")  # 代码
                volume = params.get("volume")  # 量
@@ -243,23 +244,26 @@
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                # 卖出
                print("卖出", params)
                code = params.get("code")  # 代码
                volume = params.get("volume")  # 量
                price = params.get("price")
                if not price:
                    # 没有上传价格,就需要获取最近的价格进行买入
                    data = data_cache.latest_code_market_info_dict.get(code)
                    if not data:
                        raise Exception("没有获取到L1数据")
                    pre_price = data[1]
                    current_price = data[2] if data[2] else data[5][0][0]
                    price = tool.get_buy_min_price(current_price)
                    price = max(price, tool.get_limit_down_price(code, pre_price))
                else:
                    price = round(params.get("price"), 2)  # 价格
                result = huaxin_trade_api.order(2, code, volume, price, blocking=True)
                result_str = json.dumps(result)
                try:
                    print("卖出", params)
                    code = params.get("code")  # 代码
                    volume = params.get("volume")  # 量
                    price = params.get("price")
                    if not price:
                        # 没有上传价格,就需要获取最近的价格进行买入
                        data = data_cache.latest_code_market_info_dict.get(code)
                        if not data:
                            raise Exception("没有获取到L1数据")
                        pre_price = data[1]
                        current_price = data[2] if data[2] else data[5][0][0]
                        price = tool.get_buy_min_price(current_price)
                        price = max(price, tool.get_limit_down_price(code, pre_price))
                    else:
                        price = round(params.get("price"), 2)  # 价格
                    result = huaxin_trade_api.order(2, code, volume, price, blocking=True)
                    result_str = json.dumps(result)
                finally:
                    logger_request_api.info(f"卖出:{params}")
            elif url.path == "/set_buy_money":
                # 设置每次买入的金额
log_module/log_export.py
@@ -23,86 +23,6 @@
            fw.close()
# Export the "data processing range" log entries
def __export_l2_pos_range(code, date, dir):
    """Extract one code's processing-range entries from the day's l2_process log.

    Delegates to LogUtil.extract_log_from_key; presumably that helper copies the
    lines containing the key from the source file to the target file (confirm
    against LogUtil).

    :param code: code used to build the search key
    :param date: date string embedded in the source and target file names
    :param dir: directory in which the extracted log file is written
    """
    search_key = f"{code} 处理数据范围"
    source_path = f"{constant.get_path_prefix()}/sell_logs/gp/l2/l2_process.{date}.log"
    target_path = f"{dir}/l2_process_{date}.log"
    LogUtil.extract_log_from_key(search_key, source_path, target_path)
# Export the trade log entries
def __export_l2_trade_log(code, date, dir):
    """Extract one code's entries from the day's l2_trade log.

    The code itself is the search key handed to LogUtil.extract_log_from_key.

    :param code: code used as the search key
    :param date: date string embedded in the source and target file names
    :param dir: directory in which the extracted log file is written
    """
    source_path = f"{constant.get_path_prefix()}/sell_logs/gp/l2/l2_trade.{date}.log"
    target_path = f"{dir}/l2_trade_{date}.log"
    LogUtil.extract_log_from_key(code, source_path, target_path)
# Export the trade-cancel log entries
def __export_l2_trade_cancel_log(code, date, dir):
    """Extract one code's entries from the day's l2_trade_cancel log.

    The code itself is the search key handed to LogUtil.extract_log_from_key.

    :param code: code used as the search key
    :param date: date string embedded in the source and target file names
    :param dir: directory in which the extracted log file is written
    """
    source_path = f"{constant.get_path_prefix()}/sell_logs/gp/l2/l2_trade_cancel.{date}.log"
    target_path = f"{dir}/l2_trade_cancel_{date}.log"
    LogUtil.extract_log_from_key(code, source_path, target_path)
def __analyse_pricess_time():
    """Print today's l2_process log lines whose trailing value exceeds 150.

    The value is whatever follows the last ":" on a line and is parsed as an
    integer; a line whose tail is not an integer raises ValueError.
    """
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = f"{constant.get_path_prefix()}/sell_logs/gp/l2/l2_process.{today}.log"
    with open(log_path, encoding="utf-8") as log_file:
        for entry in log_file:
            trailing_value = int(entry.rsplit(":", 1)[-1])
            if trailing_value > 150:
                printlog(entry)
def export_l2_log(code):
    """Export today's per-code L2 logs into a directory named after the code.

    Codes shorter than 6 characters are ignored. Otherwise the target directory
    ``<prefix>/sell_logs/gp/l2/<code>`` is created if needed and the processing
    range, trade-cancel and trade logs are exported into it, in that order.

    :param code: code whose logs are exported
    :return: None
    """
    if len(code) < 6:
        return
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    dir_ = "{}/sell_logs/gp/l2/{}".format(constant.get_path_prefix(), code)
    # makedirs(exist_ok=True) avoids the exists()/mkdir() race and also creates
    # missing parent directories instead of failing.
    os.makedirs(dir_, exist_ok=True)
    __export_l2_pos_range(code, date, dir_)
    __export_l2_trade_cancel_log(code, date, dir_)
    __export_l2_trade_log(code, date, dir_)
def compute_buy1_real_time(time_):
    """Shift a time back by two seconds, then round down to a multiple of 3 seconds.

    :param time_: time string whose first three ":"-separated fields are
                  hours, minutes and seconds
    :return: whatever tool.time_seconds_format returns for the adjusted
             second-of-day count
    """
    fields = time_.split(":")
    second_of_day = 3600 * int(fields[0]) + 60 * int(fields[1]) + int(fields[2])
    shifted = second_of_day - 2
    return tool.time_seconds_format(shifted - shifted % 3)
def load_l2_from_log(date=None):
    """Load the L2 records written to the day's l2_data log, grouped by code.

    Each log line is expected to carry, after its prefix, a 6-character code,
    one separator character and a Python literal holding that code's records.
    Loading is best-effort: on any failure the error is logged and whatever was
    collected up to that point is returned.

    :param date: date string used in the log file name; defaults to
                 tool.get_now_date_str()
    :return: dict mapping code -> accumulated records
    """
    # Local import keeps this block self-contained regardless of the file's imports.
    import logging

    today_data = {}
    if date is None:
        date = tool.get_now_date_str()
    try:
        path = "{}/sell_logs/gp/l2/l2_data.{}.log".format(constant.get_path_prefix(), date)
        with open(path, mode='r') as f:
            for line in f:
                # Skip the log prefix; async_log_util lines carry an extra "[...]" section.
                index = line.find(' - ') + 2
                if line.find('async_log_util') > 0:
                    index = line.find(']', index) + 1
                record = line[index + 1:].strip()
                code = record[0:6]
                # SECURITY(review): eval executes whatever the log line contains.
                # Acceptable only while the log is written solely by this system;
                # consider ast.literal_eval if the payload is always a plain literal.
                items = eval(record[7:])
                if code in today_data:
                    today_data[code].extend(items)
                else:
                    today_data[code] = items
        for key in today_data:
            printlog(key, len(today_data[key]) - 1, today_data[key][-1]["index"])
    except FileNotFoundError:
        # A missing log file simply means there is nothing to load.
        logging.debug("L2 data log for %s not found", date)
    except Exception:
        # Stay best-effort, but leave a trace instead of swallowing the error silently.
        logging.exception("failed to load L2 data log for %s", date)
    return today_data
# 获取日志时间
def __get_log_time(line):
    time_ = line.split("|")[0].split(" ")[1].split(".")[0]
@@ -113,71 +33,6 @@
    line = line.split(" - ")[1]
    time_str = line[line.find("[") + 1:line.find("[") + 9]
    return time_str
# Get the position range handled by each L2 batch-processing run
def get_l2_process_position(code, date=None):
    """Collect the (start, end) data ranges logged for a code in the l2_process log.

    Only lines containing ``code:<code>`` are considered. A range is kept when
    the list is still empty or its start lies beyond the end of the last kept
    range, and when the line's log time falls within 09:30:00-15:00:00
    inclusive. Ranges that fail to parse at that point are logged and skipped.

    :param code: code to look for
    :param date: date string used in the log file name; defaults to today
    :return: list of (start, end) integer tuples in file order
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = "{}/sell_logs/gp/l2/l2_process.{}.log".format(constant.get_path_prefix(), date)
    code_marker = "code:{}".format(code)
    range_label = "处理数据范围"
    pos_list = []
    with open(log_path, mode='r', encoding="utf-8") as f:
        for line in f:
            if code_marker not in line:
                continue
            time_ = __get_log_time(line)
            # Text between the range label and the processing-time label, e.g. "12-34".
            range_text = line[line.find(range_label) + len(range_label) + 1:line.find("处理时间")].strip()
            if pos_list and not pos_list[-1][1] < int(range_text.split("-")[0]):
                continue
            if not 93000 <= int(time_.replace(":", "")) <= 150000:
                continue
            try:
                bounds = range_text.split("-")
                pos_list.append((int(bounds[0]), int(bounds[1])))
            except Exception as e:
                logging.exception(e)
    return pos_list
# Get the buy-signal, buy-execution and cancel positions recorded in the L2 trade log
def get_l2_trade_position(code, date=None):
    """Collect trade-related positions logged for a code in the l2_trade log.

    Only lines containing ``code=<code>`` whose log time lies within
    09:30:00-15:00:00 inclusive are considered. Each matching line yields one
    tuple ``(kind, index, reason)``:

    * kind 0 - buy signal start point, reason ""
    * kind 1 - buy execution position, reason ""
    * kind 2 - cancel position, reason is the text after the cancel-reason label

    :param code: code to look for
    :param date: date string used in the log file name; defaults to today
    :return: list of (kind, index, reason) tuples in file order
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = "{}/sell_logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), date)
    code_marker = "code={}".format(code)
    # (kind, text probed for in the line, separator preceding the index); checked in order.
    markers = (
        (0, "获取到买入信号起始点", "获取到买入信号起始点:"),
        (1, "获取到买入执行位置", "获取到买入执行位置:"),
        (2, "触发撤单,撤单位置:", "触发撤单,撤单位置:"),
    )
    pos_list = []
    with open(log_path, mode='r', encoding="utf-8") as f:
        for line in f:
            if code_marker not in line:
                continue
            hhmmss = int(__get_log_time(line).replace(":", ""))
            if hhmmss < 93000 or hhmmss > 150000:
                continue
            for kind, probe, separator in markers:
                # A probe found at column 0 is deliberately not a match (find() > 0).
                if line.find(probe) > 0:
                    tail = line.split(separator)[1].strip()
                    position = int(tail[0:tail.find(" ")].strip())
                    reason = line.split("撤单原因:")[1] if kind == 2 else ""
                    pos_list.append((kind, position, reason))
                    break
    return pos_list
__log_file_contents = {}
@@ -198,92 +53,29 @@
    return contents
# Load the L2 order deal records
def load_huaxin_deal_record(code, date=None):
    """Load the deal records logged for a code in the transaction_desc log.

    A line is used when ``<code>#`` occurs in it after column 0. The payload
    after the "]" of the message part is split on "#" and the second field is
    evaluated as a Python literal.

    :param code: code to look for
    :param date: date string used in the log file name; defaults to
                 tool.get_now_date_str() resolved at call time, so a
                 long-running process does not keep using its start-up date
    :return: list of evaluated payloads; per the original note each has the
             form [(order no, lots, deal start time, deal end time, ordered lots)]
    """
    if date is None:
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction_desc.{date}.log"
    marker = f"{code}#"
    fdatas = []
    for line in __load_file_content(path):
        if line.find(marker) <= 0:
            continue
        message = line.split(" - ")[1]
        payload = message[message.find("]") + 1:].strip()
        # The requested code is left untouched: reassigning it from the parsed
        # line would change which later lines match.
        # SECURITY(review): eval executes whatever the log line contains; safe
        # only while the log is produced solely by this system.
        fdatas.append(eval(payload.split("#")[1]))
    return fdatas
# Load the sell orders recorded in the Huaxin transaction_sell_order log
def load_huaxin_transaction_sell_no(code=None, date=None):
    """Load the sell-order entries from the transaction_sell_order log, grouped by code.

    For each line, the text after ``code=`` supplies a 6-character code followed
    by a payload that is evaluated as a Python literal.

    :param code: when given, only entries for this code are returned
    :param date: date string used in the log file name; defaults to
                 tool.get_now_date_str() resolved at call time, so a
                 long-running process does not keep using its start-up date
    :return: dict mapping code -> list of evaluated payloads; empty when the
             log file does not exist
    """
    if date is None:
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction_sell_order.{date}.log"
    fdatas = {}
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            # Lines read from a file keep their "\n", so a plain truthiness
            # check never filters blank lines; strip before testing.
            if not line.strip():
                continue
            data = line.split(" - ")[1].strip()
            if data.startswith("["):
                data = data[data.find("]") + 1:].strip()
            data = data.split("code=")[1]
            code_ = data[:6]
            if code and code != code_:
                continue
            # SECURITY(review): eval executes whatever the log line contains; safe
            # only while the log is produced solely by this system.
            fdatas.setdefault(code_, []).append(eval(data[6:].strip()))
    return fdatas
# 读取系统日志
def load_system_log():
    path = f"{constant.get_path_prefix()}/sell_logs/gp/system/system.{tool.get_now_date_str()}.log"
def load_stock_of_markets_plate(date=tool.get_now_date_str()):
    """
     获取精选流入的成分股
    :param date:
    :return:
    """
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/kpl/stock_of_markets_plate.{date}.log"
    fdatas = []
    if os.path.exists(path):
        with open(path, 'r', encoding="utf-8") as f:
            lines = f.readlines()
            for line in lines:
                if line:
                    try:
                        time_str = line.split("|")[0].strip()
                        level = line.split("|")[1].strip()
                        if level != "INFO" and level != "ERROR":
                            continue
                        data = line.split("|")[2].split(" - ")[1].strip()
                        fdatas.append((time_str, level, data))
                    except:
                        pass
    return fdatas
# 读取系统日志
def load_huaxin_transaction_map(date=tool.get_now_date_str()):
    path = f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction.{date}.log"
    fdatas = {}
    if os.path.exists(path):
        with open(path, 'r', encoding="utf-8") as f:
            lines = f.readlines()
            for line in lines:
                if line:
                    time_str = __get_async_log_time(line)
                    try:
                        data = line.split(" - ")[1].strip()
                        if data.startswith("["):
                            data = data[data.find("]") + 1:].strip()
                        code = data.split("#")[0]
                        l2_data = eval(data.split("#")[1])
                        if code not in fdatas:
                            fdatas[code] = []
                        fdatas[code].append(l2_data)
                        data_dict = eval(data)
                        fdatas.append((time_str, data_dict))
                    except:
                        pass
    return fdatas
if __name__ == '__main__':
    load_stock_of_markets_plate()
strategy/kpl_api.py
@@ -227,6 +227,7 @@
    @dask.delayed
    def request_plate_codes(i):
        plate_name = i[1]
        log_data = None
        # 尝试过滤掉无意义的概念板块(plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革','超跌', '壳资源', '股权转让', '送转填权']) and '增长' in plate_name
        if (plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革', '超跌',
                               '壳资源', '股权转让', '送转填权']) or ('增长' in plate_name):
@@ -236,14 +237,7 @@
            # print(f"======={i[0]}=======")
            its_stock = json.loads(getCodesByPlate(i[0]))
            # 只在盘中时间获取
            now_time = tool.get_now_time_str()
            if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME:
                log_data = {plate_name: its_stock['list']}
                async_log_util.info(logger_stock_of_markets_plate, f"{log_data}")
                # logger.info(f"its_stock['list']  ===》》  {log_data}")
                # logger.info(f"its_stock['list'][0]  ===  {its_stock['list'][0]}")
            # its_stock_list_info = its_stock['list']
            # logger.info(f"its_stock_list_info==={its_stock_list_info}")
@@ -267,7 +261,8 @@
            stock_power_list = []
            for s in its_stock['list']:
                # 过滤掉涨幅大于  and s[6] < 6.5 且小于0%的 和 名称中包含ST的 和 涨速小于等于0%的 和 只要昨日未涨停 和 上证或深证的正股    and s[9] > 0.0025
                if s[6] > 0 and s[1].find("ST") < 0 and s[1].find("XD") < 0 and s[23].find("板") < 0 and s[24].find("板") < 0 and (s[0].startswith('60') or s[0].startswith('00')) and s[9] > 1:
                if s[6] > 0 and s[1].find("ST") < 0 and s[1].find("XD") < 0 and s[23].find("板") < 0 and s[24].find(
                        "板") < 0 and (s[0].startswith('60') or s[0].startswith('00')) and s[9] > 1:
                    # print(f"{s[1]},个股代码:{s[0]},   涨幅:{s[6]}%   涨速:{s[9]}%   概念:{s[4]}   主力资金推测:{s[2]}   领涨次数:{s[40]}  今日第几板:{s[23]} 是否破版{s[24]}")
                    # 对个股强度 主要 属性列表进行装填
                    its_stock_power = [s[1], s[0], s[6], s[9], s[4], s[2], s[40]]
@@ -282,6 +277,7 @@
                stock_power_item = {i[1]: stock_power_list}
                # 并更新到精选板块个股字典中
                market_sift_plate_stock_dict.update(stock_power_item)
        return log_data
    data = (getMarketJingXuanRealRankingInfo())
    market_sift_plate = json.loads(data)
@@ -297,10 +293,19 @@
        for d in market_sift_plate['list']:
            ds.append(request_plate_codes(d))
        dask_result = batch_get_plate_codes(ds)
        dask_result.compute()
        compute_results = dask_result.compute()
        log_datas={}
        for r in compute_results:
            if not r:
                continue
            for b in r:
                log_datas[b] = r[b]
        now_time = tool.get_now_time_str()
        if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME:
            logger.info(f"精选板块股票强度数据更新 == {market_sift_plate_stock_dict}")
            # 只在盘中时间获取
            async_log_util.info(logger_stock_of_markets_plate, f"{(market_sift_plate['list'],log_datas)}")
    return market_sift_plate_stock_dict
@@ -778,7 +783,9 @@
                        if data_cache.BUY_MONEY_PER_CODE < 0:
                            # 根据账户可用金额 计算今日计划下单金额
                            #  ((大盘综合强度分数 + 大盘指数情绪预期分数) * 0.01) * (账户可用金额 * 极端低迷情绪比例 / 今日最大新增持仓票数)
                            data_cache.today_planned_order_amount = ((data_cache.real_time_market_strong + index_trend_expectation_score) * 0.01) * (usefulMoney * low_emotion_mood_ratio / Unfinished_opening_plan_number)
                            data_cache.today_planned_order_amount = ((
                                                                                 data_cache.real_time_market_strong + index_trend_expectation_score) * 0.01) * (
                                                                                usefulMoney * low_emotion_mood_ratio / Unfinished_opening_plan_number)
                        else:
                            data_cache.today_planned_order_amount = data_cache.BUY_MONEY_PER_CODE
@@ -799,4 +806,5 @@
    # start_time = time.time()
    # get_market_sift_plate_its_stock_power()
    # print("耗时:", time.time() - start_time)
    get_market_sift_plate_its_stock_power_process(None)
    get_market_sift_plate_its_stock_power()
    # get_market_sift_plate_its_stock_power_process(None)