Administrator
2022-12-05 8218790ab15e752d982ee9c0df156ceea849c9a9
策略优化,增加买1价格的影响
7个文件已修改
229 ■■■■ 已修改文件
l2_data_manager.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager_new.py 111 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tool.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_queue_manager.py 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py
@@ -25,7 +25,7 @@
import tool
import trade_manager
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process
from trade_data_manager import TradeBuyDataManager
import trade_data_manager
import limit_up_time_manager
_redisManager = redis_manager.RedisManager(1)
@@ -888,7 +888,7 @@
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            TradeBuyDataManager.remove_buy_position_info(code)
            trade_data_manager.TradeBuyDataManager.remove_buy_position_info(code)
            # 已过时 为买撤保存基础纯买额
            # TradePointManager.set_compute_info_for_cancel_buy(code, compute_index, buy_nums)
            b_buy_count, b_buy_cancel_count = cls.__count_l2_data_before_for_cancel(code, buy_single_index)
@@ -1047,7 +1047,7 @@
    # 获取预估挂买位
    @classmethod
    def __get_sure_order_pos(cls, code):
        index, data = TradeBuyDataManager.get_buy_sure_position(code)
        index, data = trade_data_manager.TradeBuyDataManager.get_buy_sure_position(code)
        if index is None:
            return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1]
        else:
l2_data_manager_new.py
@@ -17,7 +17,9 @@
import redis_manager
import ths_industry_util
import tool
import trade_data_manager
import trade_manager
import trade_queue_manager
from l2_data_manager import L2DataException, TradePointManager, local_today_datas, L2DataUtil, load_l2_data, \
    local_today_num_operate_map
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_buy_1_volumn
@@ -151,6 +153,7 @@
    random_key = {}
    l2BigNumForMProcessor = L2BigNumForMProcessor()
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    buy1PriceManager = trade_queue_manager.Buy1PriceManager()
    @classmethod
    def debug(cls, code, content, *args):
@@ -280,31 +283,70 @@
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        can, reason = cls.__can_buy(code)
        # 不能购买
        if not can:
            cls.debug(code, "不可以下单,原因:{}", reason)
            return
        else:
            cls.debug(code, "可以下单,原因:{}", reason)
        # 删除虚拟下单
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
        cls.debug(code, "开始执行买入")
        try:
            trade_manager.start_buy(code, capture_timestamp, last_data,
                                    last_data_index)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            cls.debug(code, "执行买入成功")
        except Exception as e:
            cls.debug(code, "执行买入异常:{}", str(e))
            pass
        finally:
            cls.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
        if not can:
            cls.debug(code, "不可以下单,原因:{}", reason)
            if not reason.startswith("买1价不为涨停价"):
                # 中断买入
                trade_manager.break_buy(code, reason)
            return
        else:
            cls.debug(code, "可以下单,原因:{}", reason)
            try:
                cls.debug(code, "开始执行买入")
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
                l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
                cls.debug(code, "执行买入成功")
            except Exception as e:
                cls.debug(code, "执行买入异常:{}", str(e))
                pass
            finally:
                cls.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
    # 是否可以取消
    @classmethod
    def __can_cancel(cls, code):
        # 14点后如果是板块老大就不需要取消了
        now_time_str = tool.get_now_time_str()
        if int(now_time_str.replace(":", "")) >= 140000:
            industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
            if industry is None:
                return True, "没有获取到行业"
            codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
            if codes_index is not None and codes_index.get(code) is not None:
                # 同一板块中老二后面的不能买
                if codes_index.get(code) == 0:
                    return False, "14:00后老大不能撤单"
                elif codes_index.get(code) == 1:
                    # 判断老大是否都是09:30:00涨停的
                    # 同1板块老大是09:30:00涨停,老二14:00砸开的不撤
                    first_count = 0
                    for key in codes_index:
                        if codes_index[key] == 0:
                            first_count += 1
                            if limit_up_time_manager.get_limit_up_time(key) == "09:30:00":
                                first_count -= 1
                    if first_count == 0:
                        return False, "14:00后老大都开盘涨停,老二不能撤单"
        return True, ""
    # 是否可以买
    @classmethod
    def __can_buy(cls, code):
        # 买1价格必须为涨停价才能买
        # buy1_price = cls.buy1PriceManager.get_price(code)
        # if buy1_price is None:
        #     return False, "买1价尚未获取到"
        # limit_up_price = gpcode_manager.get_limit_up_price(code)
        # if limit_up_price is None:
        #     return False, "尚未获取到涨停价"
        # if abs(float(buy1_price) - float(limit_up_price)) >= 0.01:
        #     return False, "买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price)
        # 量比超过1.3的不能买
        volumn_rate = l2_trade_factor.L2TradeFactorUtil.get_volumn_rate_by_code(code)
@@ -389,6 +431,12 @@
    @classmethod
    def cancel_buy(cls, code, msg=None):
        can_cancel, reason = cls.__can_cancel(code)
        if not can_cancel:
            # 不能取消
            cls.cancel_debug(code, "撤单中断,原因:{}", reason)
            return
        l2_data_manager.L2ContinueLimitUpCountManager.del_data(code)
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
@@ -580,7 +628,8 @@
            count = threshold_count - sub_threshold_count
            if count < 3:
                count = 3
            return count
            return round(count*buy1_factor)
        _start_time = t.time()
        total_datas = local_today_datas[code]
        # 计算从买入信号开始到计算开始位置的大单数量
@@ -591,10 +640,22 @@
        buy_nums = origin_num
        buy_count = origin_count
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        buy1_price = cls.buy1PriceManager.get_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        # 目标手数
        threshold_num = threshold_money / (limit_up_price * 100)
        buy1_factor = 1
        # 获取买1是否为涨停价
        if buy1_price is None:
            buy1_factor = 1.3
        elif limit_up_price is None:
            buy1_factor = 1.3
        elif abs(float(buy1_price) - float(limit_up_price)) >= 0.01:
            print("买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price))
            buy1_factor = 1.3
        # 目标订单数量
        threshold_count = l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code)
@@ -616,14 +677,15 @@
                            return None, buy_nums, buy_count, ii
            # 涨停买
            if L2DataUtil.is_limit_up_price_buy(_val):
                if cls.__is_big_money(limit_up_price,_val):
                if cls.__is_big_money(limit_up_price, _val):
                    sub_threshold_count += int(total_datas[i]["re"])
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                buy_count += int(total_datas[i]["re"])
                if buy_nums >= threshold_num and buy_count >= get_threshold_count():
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}, 大单数量:{}", code, i, buy_nums,
                                             threshold_num, buy_count, get_threshold_count(),sub_threshold_count)
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}, 大单数量:{}", code, i,
                                             buy_nums,
                                             threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                if cls.__is_big_money(limit_up_price, _val):
                    sub_threshold_count -= int(total_datas[i]["re"])
@@ -655,9 +717,10 @@
            if buy_nums >= threshold_num and buy_count >= get_threshold_count():
                return i, buy_nums, buy_count, None
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{}", compute_start_index,
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{}",
                      compute_start_index,
                      buy_nums,
                      threshold_num, buy_count, get_threshold_count(),sub_threshold_count)
                      threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
        return None, buy_nums, buy_count, None
log.py
@@ -2,10 +2,13 @@
日志
"""
import datetime
import json
import os
import sys
from loguru import logger
import tool
class MyLogger:
@@ -64,7 +67,12 @@
        logger.add(self.get_path("system", "system"), filter=lambda record: record["extra"].get("name") == "system",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("ths", "buy_1_volumn"), filter=lambda record: record["extra"].get("name") == "buy_1_volumn",
        logger.add(self.get_path("ths", "buy_1_volumn"),
                   filter=lambda record: record["extra"].get("name") == "buy_1_volumn",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("ths", "buy_1_volumn_record"),
                   filter=lambda record: record["extra"].get("name") == "buy_1_volumn_record",
                   rotation="00:00", compression="zip", enqueue=True)
    def get_path(self, dir_name, log_name):
@@ -94,6 +102,8 @@
logger_system = __mylogger.get_logger("system")
logger_buy_1_volumn = __mylogger.get_logger("buy_1_volumn")
logger_buy_1_volumn_record = __mylogger.get_logger("buy_1_volumn_record")
class LogUtil:
@@ -152,10 +162,28 @@
    __export_l2_trade_log(code, date, dir_)
def compute_buy1_real_time(time_):
    """Shift an "HH:MM:SS" time back by 2 seconds and align it down to a 3-second step.

    The time is converted to seconds since midnight, 2 is subtracted, and the
    remainder modulo 3 is removed. The resulting second count is handed to
    ``tool.time_seconds_format`` and its result is returned unchanged.
    """
    parts = time_.split(":")
    total_seconds = 3600 * int(parts[0]) + 60 * int(parts[1]) + int(parts[2])
    shifted = total_seconds - 2
    return tool.time_seconds_format(shifted - shifted % 3)
if __name__ == '__main__':
    # logger_l2_process_time.info("test123")
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    LogUtil.extract_log_from_key("002383", "D:/logs/gp/l2/l2_process_time.{}.log".format(date),
                                 "D:/logs/gp/l2/l2_process_time{}.{}.log".format("002383", date))
    LogUtil.extract_log_from_key("000666", "D:/logs/gp/ths/buy_1_volumn_record.{}.log".format(date),
                                 "D:/logs/gp/ths/buy_1_volumn_record{}.{}.log".format("000666", date))
    # __analyse_pricess_time()
    # with open("D:\\logs\\gp\\ths\\buy_1_volumn_record002911.2022-12-01.log",encoding="utf-8") as f:
    #     line = "1"
    #     while line:
    #         line = f.readline()
    #         line = (line.split("-")[-1].replace("'","\""))
    #         data = json.loads(line)
    #         print(compute_space_time(data["time"]),data["volumn"])
    #
    # print( compute_space_time("10:00:06"))
server.py
@@ -30,8 +30,8 @@
import l2_code_operate
from code_data_util import ZYLTGBUtil
from log import logger_l2_error, logger_device, logger_trade_delegate
from trade_queue_manager import THSBuy1VolumnManager
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record
from trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager
class MyTCPServer(socketserver.TCPServer):
@@ -49,6 +49,8 @@
    l2_data_error_dict = {}
    last_trade_delegate_data = None
    buy1_volumn_manager = THSBuy1VolumnManager()
    latest_buy1_volumn_dict={}
    buy1_price_manager = Buy1PriceManager()
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -250,11 +252,15 @@
                            global_data_loader.load_name_codes()
                        code = global_util.name_codes.get(code_name)
                        if code is not None:
                            # 记录日志
                            if self.latest_buy1_volumn_dict.get(code) != "{}-{}".format(volumn,price):
                                # 记录数据
                                logger_buy_1_volumn_record.info("{}-{}",code,data)
                            self.latest_buy1_volumn_dict[code] = "{}-{}".format(volumn,price)
                            # 保存买1价格
                            self.buy1_price_manager.save(code,price)
                            # 校正时间
                            seconds = tool.get_time_as_second(time_)
                            if seconds % 3 > 0:
                                seconds = seconds - seconds % 3
                            time_ = tool.time_seconds_format(seconds)
                            time_ = tool.compute_buy1_real_time(time_)
                            # 保存数据
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn,
                                                                                               price)
tool.py
@@ -34,6 +34,16 @@
    return date
def get_now_date_str():
    """Return the current local date formatted as ``YYYY-MM-DD``."""
    return "{:%Y-%m-%d}".format(datetime.datetime.now())
def get_now_time_str():
    """Return the current local time of day formatted as ``HH:MM:SS``."""
    return "{:%H:%M:%S}".format(datetime.datetime.now())
def to_price(_decimal):
    """Convert a ``decimal.Decimal`` to a price: two decimal places, rounded half-up."""
    two_places = decimal.Decimal("0.00")
    return _decimal.quantize(two_places, rounding=decimal.ROUND_HALF_UP)
@@ -54,6 +64,7 @@
        return True
    else:
        return False
# 是否为报警时间
def is_alert_time():
@@ -81,7 +92,6 @@
        return False
def is_set_code_time():
    # 测试
    if constant.TEST:
@@ -96,6 +106,7 @@
        return True
    else:
        return False
def run_time():
    def decorator(func):
@@ -135,6 +146,12 @@
    return time_1 - time_2
def compute_buy1_real_time(time_):
    """Shift an "HH:MM:SS" time back by 2 seconds and align it down to a 3-second step.

    The input is split on ":" and turned into seconds since midnight. After
    subtracting 2, the value is floored to a multiple of 3 and passed to
    ``time_seconds_format``, whose result is returned as-is.
    """
    fields = time_.split(":")
    since_midnight = int(fields[0]) * 3600 + int(fields[1]) * 60 + int(fields[2])
    # Floor division keeps the same result as subtracting the modulo-3 remainder.
    aligned = (since_midnight - 2) // 3 * 3
    return time_seconds_format(aligned)
if __name__ == "__main__":
    print(trade_time_sub("11:29:59", "13:00:00"))
    print(trade_time_sub("11:29:59", "14:00:00"))
trade_manager.py
@@ -35,8 +35,6 @@
# 买成功
TRADE_STATE_BUY_SUCCESS = 12
guiTrade = THSGuiTrade()
@@ -159,7 +157,6 @@
        return json.loads(result), time_str
# 开始交易
def start_buy(code, capture_timestamp, last_data, last_data_index):
    # 是否禁止交易
@@ -184,6 +181,11 @@
    __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index)
def break_buy(code, reason):
    """Interrupt a buy for ``code``.

    Calls ``TradeBuyDataManager.remove_buy_position_info`` for the code.
    ``reason`` is accepted from the caller but is not used in this function.
    """
    TradeBuyDataManager.remove_buy_position_info(code)
# 购买
@tool.async_call
def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index):
trade_queue_manager.py
@@ -169,5 +169,36 @@
        return time_str, volumn
# Manager for the real-time buy-1 price
class Buy1PriceManager:
    """Keep the most recent buy-1 price per stock code.

    Prices are written to Redis under the key ``buy1_price-<code>`` with the
    expiry returned by ``tool.get_expire()``. Each instance also remembers the
    last price it was given per code so that an unchanged price is not written
    again.
    """

    # Shared by all instances; constructed with argument 0
    # (presumably the Redis db index - confirm against RedisManager).
    __redisManager = redis_manager.RedisManager(0)

    def __init__(self):
        # code -> last price passed to save() on this instance
        self.latest_prices = {}

    def __get_redis(self):
        """Return the Redis client obtained from the shared manager."""
        return self.__redisManager.getRedis()

    @staticmethod
    def __record_key(code):
        """Build the Redis key under which the price of ``code`` is stored."""
        return "buy1_price-{}".format(code)

    def __save_record(self, code, price):
        """Write ``price`` for ``code`` to Redis on every call."""
        self.__get_redis().setex(self.__record_key(code), tool.get_expire(), price)

    def __get_record(self, code):
        """Read the stored value for ``code`` from Redis."""
        return self.__get_redis().get(self.__record_key(code))

    # Save data
    def save(self, code, price):
        """Store ``price`` for ``code`` unless it equals the last price saved by this instance."""
        if self.latest_prices.get(code) != price:
            self.latest_prices[code] = price
            self.__save_record(code, price)

    def get_price(self, code):
        """Return the value Redis holds for ``code``, exactly as the client returns it."""
        return self.__get_record(code)
if __name__ == '__main__':
    # Manual run: save one buy-1 volume sample, then print a stored buy-1 price.
    volumn_manager = JueJinBuy1VolumnManager()
    volumn_manager.save("001203", "15:00:00", 40586553, 12.12)
    price_manager = Buy1PriceManager()
    print(price_manager.get_price("002644"))