From fb47d36048e94b9a506d5c153e3dd19a01e37df1 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期一, 30 十月 2023 16:30:27 +0800 Subject: [PATCH] bug修复 --- l2/l2_data_manager_new.py | 1269 +++++++++++++++++++++++++++++++++----------------------- 1 files changed, 744 insertions(+), 525 deletions(-) diff --git a/l2/l2_data_manager_new.py b/l2/l2_data_manager_new.py index 8c708e8..943967d 100644 --- a/l2/l2_data_manager_new.py +++ b/l2/l2_data_manager_new.py @@ -1,38 +1,34 @@ -import io import logging +import random import time as t from code_attribute import big_money_num_manager, code_volumn_manager, code_data_util, industry_codes_sort, \ - limit_up_time_manager, global_data_loader, gpcode_manager + limit_up_time_manager, global_data_loader, gpcode_manager, code_nature_analyse import constant -from db.redis_manager import RedisUtils +from db.redis_manager_delegate import RedisUtils from l2.huaxin import l2_huaxin_util, huaxin_delegate_postion_manager +from l2.l2_sell_manager import L2MarketSellManager +from log_module import async_log_util, log_export from third_data import kpl_data_manager, block_info from utils import global_util, ths_industry_util, tool import l2_data_util -from db import redis_manager -from third_data.code_plate_key_manager import CodePlateKeyBuyManager +from db import redis_manager_delegate as redis_manager +from third_data.code_plate_key_manager import CodePlateKeyBuyManager, KPLCodeJXBlockManager from trade import trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \ - trade_result_manager, first_code_score_manager, current_price_process_manager -from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log, l2_data_source_util, code_price_manager, \ - transaction_progress -from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \ - L2LimitUpSellStatisticUtil, DCancelBigNumComputer, LCancelBigNumComputer -from l2.l2_data_manager import L2DataException, TradePointManager + trade_result_manager, current_price_process_manager, trade_data_manager, trade_huaxin, trade_record_log_util +from l2 import l2_data_manager, l2_log, l2_data_source_util, code_price_manager, \ + transaction_progress, cancel_buy_strategy, l2_data_log +from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, DCancelBigNumComputer, \ + LCancelBigNumComputer, LatestCancelIndexManager, FastCancelBigNumComputer +from l2.l2_data_manager import L2DataException, OrderBeginPosInfo from l2.l2_data_util import local_today_datas, L2DataUtil, local_today_num_operate_map, local_today_buyno_map, \ - local_latest_datas + local_latest_datas, local_today_canceled_buyno_map import l2.l2_data_util -from log_module.log import logger_l2_trade_buy, logger_l2_process, \ - logger_place_order_score, logger_l2_error +from log_module.log import logger_l2_trade_buy, logger_l2_process, logger_l2_error, logger_debug -# TODO l2鏁版嵁绠$悊 from trade.trade_data_manager import CodeActualPriceProcessor -from line_profiler import LineProfiler - -import dask - -from trade.trade_manager import TradeTargetCodeModeManager +from trade.trade_manager import TradeTargetCodeModeManager, AccountAvailableMoneyManager class L2DataManager: @@ -57,38 +53,79 @@ # m鍊煎ぇ鍗曞鐞� + + class L2BigNumForMProcessor: + _db = 1 + _redis_manager = redis_manager.RedisManager(1) + m_big_money_begin_cache = {} + m_big_money_process_index_cache = {} + __instance = None - def __init__(self): - self._redis_manager = redis_manager.RedisManager(1) + def __new__(cls, *args, **kwargs): + if not cls.__instance: + cls.__instance = super(L2BigNumForMProcessor, cls).__new__(cls, *args, **kwargs) + cls.__load_datas() + return cls.__instance - def __get_redis(self): - return self._redis_manager.getRedis() + @classmethod + def __get_redis(cls): + return cls._redis_manager.getRedis() + + @classmethod + def __load_datas(cls): + _redis = cls._redis_manager.getRedis() + try: + keys = RedisUtils.keys(_redis, "m_big_money_begin-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(_redis, k) + tool.CodeDataCacheUtil.set_cache(cls.m_big_money_begin_cache, code, int(val)) + + keys = RedisUtils.keys(_redis, "m_big_money_process_index-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(_redis, k) + tool.CodeDataCacheUtil.set_cache(cls.m_big_money_process_index_cache, code, int(val)) + + + + finally: + RedisUtils.realse(_redis) # 淇濆瓨璁$畻寮�濮嬩綅缃� def set_begin_pos(self, code, index): - if self.__get_begin_pos(code) is None: + if self.__get_begin_pos_cache(code) is None: + tool.CodeDataCacheUtil.set_cache(self.m_big_money_begin_cache, code, index) # 淇濆瓨浣嶇疆 key = "m_big_money_begin-{}".format(code) - RedisUtils.setex( self.__get_redis(), key, tool.get_expire(), index) + RedisUtils.setex_async(self._db, key, tool.get_expire(), index) # 鑾峰彇璁$畻寮�濮嬩綅缃� def __get_begin_pos(self, code): key = "m_big_money_begin-{}".format(code) - val = RedisUtils.get(self.__get_redis(),key) + val = RedisUtils.get(self.__get_redis(), key) if val is None: return None return int(val) + def __get_begin_pos_cache(self, code): + cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_begin_cache, code) + if cache_result[0]: + return cache_result[1] + return None + # 娓呴櫎宸茬粡澶勭悊鐨勬暟鎹� def clear_processed_end_index(self, code): + tool.CodeDataCacheUtil.clear_cache(self.m_big_money_process_index_cache, code) key = "m_big_money_process_index-{}".format(code) - RedisUtils.delete(self.__get_redis(), key) + RedisUtils.delete_async(self._db, key) # 娣诲姞宸茬粡澶勭悊杩囩殑鍗� def __set_processed_end_index(self, code, index): + tool.CodeDataCacheUtil.set_cache(self.m_big_money_process_index_cache, code, index) key = "m_big_money_process_index-{}".format(code) - RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), index) + RedisUtils.setex_async(self._db, key, tool.get_expire(), index) # 鏄惁宸茬粡澶勭悊杩� def __get_processed_end_index(self, code): @@ -98,15 +135,21 @@ return None return int(val) + def __get_processed_end_index_cache(self, code): + cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_process_index_cache, code) + if cache_result[0]: + return cache_result[1] + return None + # 澶勭悊澶у崟 def process(self, code, start_index, end_index, limit_up_price): - begin_pos = self.__get_begin_pos(code) + begin_pos = self.__get_begin_pos_cache(code) if begin_pos is None: # 娌℃湁鑾峰彇鍒板紑濮嬩拱鍏ヤ俊鍙� return # 涓婃澶勭悊鍒扮殑鍧愭爣 - processed_index = self.__get_processed_end_index(code) + processed_index = self.__get_processed_end_index_cache(code) if processed_index is None: processed_index = 0 if processed_index >= end_index: @@ -126,9 +169,9 @@ # 濡傛灉鏄定鍋滀拱鎾や俊鍙烽渶瑕佺湅鏁版嵁浣嶇疆鏄惁姣斿紑濮嬪鐞嗘椂闂存棭 if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]): # 鑾峰彇涔板叆淇″彿 - buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, total_datas[i], - local_today_num_operate_map.get( - code)) + buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i], + local_today_buyno_map.get( + code)) if buy_index is not None and buy_index < begin_pos: continue @@ -159,14 +202,34 @@ class L2TradeDataProcessor: unreal_buy_dict = {} volume_rate_info = {} - l2BigNumForMProcessor = L2BigNumForMProcessor() __codeActualPriceProcessor = CodeActualPriceProcessor() __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager() __thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager() - __buyL2SafeCountManager = safe_count_manager.BuyL2SafeCountManager() __l2PlaceOrderParamsManagerDict = {} __last_buy_single_dict = {} __TradeBuyQueue = transaction_progress.TradeBuyQueue() + __latest_process_order_unique_keys = {} + __latest_process_not_order_unique_keys_count = {} + __trade_log_placr_order_info_dict = {} # 涓嬪崟淇℃伅淇濆瓨 + # 鍒濆鍖� + __TradePointManager = l2_data_manager.TradePointManager() + __SecondCancelBigNumComputer = SecondCancelBigNumComputer() + __HourCancelBigNumComputer = HourCancelBigNumComputer() + __LCancelBigNumComputer = LCancelBigNumComputer() + __TradeStateManager = trade_manager.TradeStateManager() + __CodesTradeStateManager = trade_manager.CodesTradeStateManager() + __PauseBuyCodesManager = gpcode_manager.PauseBuyCodesManager() + __Buy1PriceManager = code_price_manager.Buy1PriceManager() + __AccountAvailableMoneyManager = AccountAvailableMoneyManager() + __TradeBuyDataManager = trade_data_manager.TradeBuyDataManager() + __LimitUpTimeManager = limit_up_time_manager.LimitUpTimeManager() + __BlackListCodeManager = l2_trade_util.BlackListCodeManager() + __WhiteListCodeManager = l2_trade_util.WhiteListCodeManager() + __WantBuyCodesManager = gpcode_manager.WantBuyCodesManager() + __TradeTargetCodeModeManager = TradeTargetCodeModeManager() + __TradeOrderIdManager = trade_huaxin.TradeOrderIdManager() + __LatestCancelIndexManager = LatestCancelIndexManager() + __L2MarketSellManager = L2MarketSellManager() # 鑾峰彇浠g爜璇勫垎 @classmethod @@ -194,7 +257,7 @@ if not is_normal: print("鍘嗗彶鏁版嵁寮傚父:", code) # 鏁版嵁涓嶆甯搁渶瑕佺姝氦鏄� - l2_trade_util.forbidden_trade(code) + l2_trade_util.forbidden_trade(code, msg="L2鍘嗗彶鏁版嵁寮傚父") # 绾犳鏁版嵁 if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS: # 鍚岃姳椤洪渶瑕佺籂姝f暟鎹紝鍏朵粬娓犻亾涓嶉渶瑕� @@ -212,86 +275,98 @@ # 淇濆瓨鏁版嵁 __start_time = round(t.time() * 1000) l2.l2_data_util.save_l2_data(code, datas, add_datas) - __start_time = l2_data_log.l2_time(code, - round(t.time() * 1000) - __start_time, - "淇濆瓨鏁版嵁鏃堕棿锛坽}锛�".format(len(add_datas))) + # __start_time = l2_data_log.l2_time(code, + # round(t.time() * 1000) - __start_time, + # "淇濆瓨鏁版嵁鏃堕棿锛坽}锛�".format(len(add_datas))) finally: if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) + @classmethod + def set_real_place_order_index(cls, code, index, order_begin_pos:OrderBeginPosInfo): + trade_record_log_util.add_real_place_order_position_log(code, index, order_begin_pos.buy_single_index) + if order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST: + need_cancel = FastCancelBigNumComputer().set_real_order_index(code, index) + if need_cancel: + cls.cancel_buy(code, msg="F鎾や笉澶�2绗旇Е鍙戞挙鍗�") + return + cancel_buy_strategy.set_real_place_position(code, index, order_begin_pos.buy_single_index) + # 澶勭悊鍗庨懌L2鏁版嵁 @classmethod def process_huaxin(cls, code, origin_datas): - print("process_huaxin", code, len(origin_datas)) - origin_start_time = round(t.time() * 1000) datas = None try: + l2_data_log.l2_time_log(code, "寮�濮嬪姞杞藉巻鍙叉暟鎹�") # 鍔犺浇鍘嗗彶鐨凩2鏁版嵁 is_normal = l2.l2_data_util.load_l2_data(code, load_latest=False) if not is_normal: - print("鍘嗗彶鏁版嵁寮傚父:", code) # 鏁版嵁涓嶆甯搁渶瑕佺姝氦鏄� - l2_trade_util.forbidden_trade(code) + l2_trade_util.forbidden_trade(code, msg="L2鍘嗗彶鏁版嵁寮傚父") # 杞崲鏁版嵁鏍煎紡 _start_index = 0 total_datas = local_today_datas.get(code) if total_datas: _start_index = total_datas[-1]["index"] + 1 + l2_data_log.l2_time_log(code, "寮�濮嬫牸寮忓寲鍘熷鏁版嵁") datas = l2_huaxin_util.get_format_l2_datas(code, origin_datas, gpcode_manager.get_limit_up_price(code), _start_index) - # 鑾峰彇涓嬪崟浣嶇疆 - place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, datas) - if place_order_index: - logger_l2_process.info("code:{} 鑾峰彇鍒颁笅鍗曠湡瀹炰綅缃細{}", code, place_order_index) - DCancelBigNumComputer.set_real_order_index(code, place_order_index) __start_time = round(t.time() * 1000) + l2_data_log.l2_time_log(code, "寮�濮嬪鐞嗘暟鎹�") if len(datas) > 0: cls.process_add_datas(code, datas, 0, __start_time) - else: - pass - # lp = LineProfiler() - # lp.enable() - # lp_wrap = lp(cls.process_add_datas) - # lp_wrap(code, datas, 0, __start_time) - # output = io.StringIO() - # lp.print_stats(stream=output) - # lp.disable() - # with open(f"/home/logs/profile/{code}_{datas[0]['index']}_{datas[-1]['index']}.txt", 'w') as f: - # f.write(output.getvalue()) - # lp.dump_stats(f"/home/logs/profile/{code}_{round(t.time() * 1000)}.txt") except Exception as e: - print("huaxin L2鏁版嵁澶勭悊寮傚父", code, str(e)) - logging.exception(e) - logger_l2_error.exception(e) + async_log_util.error(logger_l2_error, f"code:{code}") + async_log_util.exception(logger_l2_error, e) finally: - l2_data_log.l2_time(code, round(t.time() * 1000) - origin_start_time, - "l2鏁版嵁澶勭悊鎬昏�楁椂", - True) if datas: + l2_data_log.l2_time_log(code, "寮�濮嬩繚瀛樻暟鎹�") l2.l2_data_util.save_l2_data(code, None, datas) @classmethod def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time): now_time_str = tool.get_now_time_str() if len(add_datas) > 0: + if code not in cls.__trade_log_placr_order_info_dict: + cls.__trade_log_placr_order_info_dict[code] = trade_record_log_util.PlaceOrderInfo() # 鎷兼帴鏁版嵁 local_today_datas[code].extend(add_datas) l2.l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas) l2.l2_data_util.load_buy_no_map(local_today_buyno_map, code, add_datas) - + l2.l2_data_util.load_canceled_buy_no_map(local_today_canceled_buyno_map, code, add_datas) + l2_data_log.l2_time_log(code, "process_add_datas 鍔犺浇瀹屾暟鎹�") + if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN: + try: + if constant.TEST: + pass + # order_begin_pos = cls.__get_order_begin_pos(code) + # if order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index>=0: + # place_order_index = add_datas[-1]["index"] + # cls.set_real_place_order_index(code, place_order_index, order_begin_pos.buy_single_index) + else: + # 鑾峰彇涓嬪崟浣嶇疆 + place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, float( + gpcode_manager.get_limit_up_price(code)), add_datas) + if place_order_index: + order_begin_pos = cls.__get_order_begin_pos( + code) + cls.set_real_place_order_index(code, place_order_index, order_begin_pos) + async_log_util.info(logger_l2_process, "code:{} 鑾峰彇鍒颁笅鍗曠湡瀹炰綅缃細{}", code, place_order_index) + except: + async_log_util.error(logger_l2_error, f"{code} 澶勭悊鐪熷疄涓嬪崟浣嶇疆鍑洪敊") # 绗�1鏉℃暟鎹槸鍚︿负09:30:00 if add_datas[0]["val"]["time"] == "09:30:00": if global_util.cuurent_prices.get(code): price_data = global_util.cuurent_prices.get(code) if price_data[1]: # 褰撳墠娑ㄥ仠浠凤紝璁剧疆娑ㄥ仠鏃堕棿 - logger_l2_process.info("寮�鐩樻定鍋滐細{}", code) + async_log_util.info(logger_l2_process, "寮�鐩樻定鍋滐細{}", code) # 淇濆瓨娑ㄥ仠鏃堕棿 - limit_up_time_manager.save_limit_up_time(code, "09:30:00") + cls.__LimitUpTimeManager.save_limit_up_time(code, "09:30:00") total_datas = local_today_datas[code] - __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, - "l2鏁版嵁棰勫鐞嗘椂闂�") + # __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, + # "l2鏁版嵁棰勫鐞嗘椂闂�") if len(add_datas) > 0: # 鏄惁涓洪鏉夸唬鐮� @@ -300,11 +375,11 @@ volume_rate = code_volumn_manager.get_volume_rate(code) volume_rate_index = code_volumn_manager.get_volume_rate_index(volume_rate) # 璁$畻鍒嗗�� - limit_up_time = limit_up_time_manager.get_limit_up_time(code) + limit_up_time = cls.__LimitUpTimeManager.get_limit_up_time_cache(code) if limit_up_time is None: limit_up_time = tool.get_now_time_str() - score = first_code_score_manager.get_score(code, volume_rate, limit_up_time, True) - + # score = first_code_score_manager.get_score(code, volume_rate, limit_up_time, True) + score = None cls.__l2PlaceOrderParamsManagerDict[code] = l2_trade_factor.L2PlaceOrderParamsManager(code, is_first_code, volume_rate, volume_rate_index, @@ -315,41 +390,27 @@ latest_time = add_datas[len(add_datas) - 1]["val"]["time"] - __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, - "l2鏁版嵁鍑嗗鏃堕棿") + # __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, + # "l2鏁版嵁鍑嗗鏃堕棿") # 鏃堕棿宸笉鑳藉お澶ф墠鑳藉鐞� if not l2_trade_util.is_in_forbidden_trade_codes(code): # 鍒ゆ柇鏄惁宸茬粡鎸傚崟 - state = trade_manager.get_trade_state(code) + state = cls.__CodesTradeStateManager.get_trade_state_cache(code) start_index = len(total_datas) - len(add_datas) end_index = len(total_datas) - 1 + l2_data_log.l2_time_log(code, "process_add_datas 寮�濮嬪鐞�") if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS: # 宸叉寕鍗� - if True: # len(add_datas) < 10: - cls.__process_order(code, start_index, end_index, capture_timestamp, is_first_code) - else: - pass - # lp = LineProfiler() - # lp.enable() - # lp_wrap = lp(cls.__process_order) - # lp_wrap(code, start_index, end_index, capture_timestamp, is_first_code) - # output = io.StringIO() - # lp.print_stats(stream=output) - # lp.disable() - # with open( - # f"/home/logs/profile/{code}_process_order_{add_datas[0]['index']}_{add_datas[-1]['index']}.txt", - # 'w') as f: - # f.write(output.getvalue()) + cls.__process_order(code, start_index, end_index, capture_timestamp, is_first_code) else: # 鏈寕鍗�,鏃堕棿鐩稿樊涓嶅ぇ鎵嶈兘鎸傚崟 if l2.l2_data_util.L2DataUtil.is_same_time(now_time_str, latest_time): cls.__process_not_order(code, start_index, end_index, capture_timestamp, is_first_code) - logger_l2_process.info("code:{} 澶勭悊鏁版嵁鑼冨洿: {}-{} 澶勭悊鏃堕棿:{} 鎴浘鏃堕棿鎴筹細{}", code, add_datas[0]["index"], - add_datas[-1]["index"], round(t.time() * 1000) - __start_time, - capture_timestamp) - __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, - "l2鏁版嵁澶勭悊鏃堕棿") + async_log_util.info(logger_l2_process, "code:{} 澶勭悊鏁版嵁鑼冨洿: {}-{} 澶勭悊鏃堕棿:{} 鎴浘鏃堕棿鎴筹細{}", code, + add_datas[0]["index"], + add_datas[-1]["index"], round(t.time() * 1000) - __start_time, + capture_timestamp) # 澶勭悊鏈寕鍗� @classmethod @@ -357,24 +418,7 @@ __start_time = round(t.time() * 1000) # 鑾峰彇闃堝�� threshold_money, msg = cls.__get_threshmoney(code) - if round(t.time() * 1000) - __start_time > 10: - __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, - "鑾峰彇m鍊兼暟鎹�楁椂") - if True: # end_index - start_index < 10: - cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time, is_first_code) - else: - pass - # lp = LineProfiler() - # lp.enable() - # lp_wrap = lp(cls.__start_compute_buy) - # lp_wrap(code, start_index, end_index, threshold_money, capture_time, is_first_code) - # output = io.StringIO() - # lp.print_stats(stream=output) - # lp.disable() - # with open( - # f"/home/logs/profile/{code}_start_compute_buy_{start_index}_{end_index}.txt", - # 'w') as f: - # f.write(output.getvalue()) + cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time, is_first_code) # 娴嬭瘯涓撶敤 @classmethod @@ -384,133 +428,97 @@ # 澶勭悊宸叉寕鍗� @classmethod def __process_order(cls, code, start_index, end_index, capture_time, is_first_code, new_add=True): - # 璁$畻瀹夊叏绗旀暟 - @dask.delayed - def compute_safe_count(): - _start_time = round(t.time() * 1000) - # 澶勭悊瀹夊叏绗旀暟 - cls.__buyL2SafeCountManager.compute_left_rate(code, start_index, end_index, total_data, - local_today_num_operate_map.get(code)) - - l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, - "宸蹭笅鍗�-鑾峰彇涔板叆淇℃伅鑰楁椂") - return None, "" - - @dask.delayed - # m鍊煎ぇ鍗曡绠� - def compute_m_big_num(): - _start_time = round(t.time() * 1000) - # 璁$畻m鍊煎ぇ鍗� - cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index, - gpcode_manager.get_limit_up_price(code)) - l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, - "宸蹭笅鍗�-m鍊煎ぇ鍗曡绠�") - return None, "" - - # 涔�1鎾よ绠� - @dask.delayed - def buy_1_cancel(): - _start_time = round(t.time() * 1000) - # 鎾ゅ崟璁$畻,鍙湅涔�1 - cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, - end_index, - buy_single_index, buy_exec_index) - - l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, - "宸蹭笅鍗�-涔�1缁熻鑰楁椂") - # 涔�1涓嶄細瑙﹀彂鎾ゅ崟 - return None, "" - # return cancel_data, cancel_msg + # 澧炲姞鎺ㄥ嚭鏈哄埗 + unique_key = f"{start_index}-{end_index}" + if cls.__latest_process_order_unique_keys.get(code) == unique_key: + async_log_util.error(logger_l2_error, f"閲嶅澶勭悊鏁版嵁锛歝ode-{code} start_index-{start_index} end_index-{end_index}") + return + cls.__latest_process_order_unique_keys[code] = unique_key # S鎾� - @dask.delayed - def s_cancel(): + def s_cancel(_buy_single_index, _buy_exec_index): _start_time = round(t.time() * 1000) # S鎾ゅ崟璁$畻锛岀湅绉掔骇澶у崟鎾ゅ崟 try: - b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index, - buy_exec_index, start_index, - end_index, total_data, - code_volumn_manager.get_volume_rate_index( - buy_volume_rate), - cls.volume_rate_info[code][1], - is_first_code) + b_need_cancel, b_cancel_data = cls.__SecondCancelBigNumComputer.need_cancel(code, _buy_single_index, + _buy_exec_index, + start_index, + end_index, total_data, + code_volumn_manager.get_volume_rate_index( + buy_volume_rate), + cls.volume_rate_info[code][ + 1], + is_first_code) if b_need_cancel: return b_cancel_data, "S澶у崟鎾ら攢姣斾緥瑙﹀彂闃堝��" except Exception as e: logging.exception(e) + async_log_util.error(logger_l2_error, + f"S鎾ゅ嚭閿� 鍙傛暟锛歜uy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}") + async_log_util.exception(logger_l2_error, e) finally: - l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, - "宸蹭笅鍗�-s绾уぇ鍗曚及绠�") + # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, + # "宸蹭笅鍗�-s绾уぇ鍗曚及绠�") + pass return None, "" # H鎾� - @dask.delayed - def h_cancel(): + def h_cancel(_buy_single_index, _buy_exec_index): _start_time = round(t.time() * 1000) try: - b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(code, buy_single_index, - buy_exec_index, start_index, - end_index, total_data, - local_today_num_operate_map.get( - code), - code_volumn_manager.get_volume_rate_index( - buy_volume_rate), - cls.volume_rate_info[code][1], - is_first_code) + b_need_cancel, b_cancel_data = cls.__HourCancelBigNumComputer.need_cancel(code, _buy_single_index, + _buy_exec_index, start_index, + end_index, total_data, + code_volumn_manager.get_volume_rate_index( + order_begin_pos.buy_volume_rate), + cls.volume_rate_info[code][1], + is_first_code) if b_need_cancel and b_cancel_data: return b_cancel_data, "H鎾ら攢姣斾緥瑙﹀彂闃堝��" except Exception as e: - logging.exception(e) + if constant.TEST: + logging.exception(e) + async_log_util.error(logger_l2_error, + f"H鎾ゅ嚭閿� 鍙傛暟锛歜uy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} {str(e)}") + async_log_util.exception(logger_l2_error, e) finally: - l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-H鎾ゅぇ鍗曡绠�") + # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-H鎾ゅぇ鍗曡绠�") + pass + return None, "" + + # F鎾� + def f_cancel(_buy_single_index, _buy_exec_index): + try: + b_need_cancel, b_cancel_data = FastCancelBigNumComputer().need_cancel(code, start_index, end_index, + order_begin_pos) + if b_need_cancel and b_cancel_data: + return b_cancel_data, f"F鎾�" + except Exception as e: + if constant.TEST: + logging.exception(e) + async_log_util.error(logger_l2_error, + f"F鎾ゅ嚭閿� 鍙傛暟锛歜uy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} 閿欒鍘熷洜锛歿str(e)}") + async_log_util.exception(logger_l2_error, e) return None, "" # L鎾� - @dask.delayed - def l_cancel(): + def l_cancel(_buy_single_index, _buy_exec_index): _start_time = round(t.time() * 1000) try: - b_need_cancel, b_cancel_data = LCancelBigNumComputer.need_cancel(code, - buy_exec_index, start_index, - end_index, total_data, - local_today_num_operate_map.get( - code), is_first_code) + b_need_cancel, b_cancel_data, extra_msg = cls.__LCancelBigNumComputer.need_cancel(code, + _buy_exec_index, + start_index, + end_index, total_data, + is_first_code) if b_need_cancel and b_cancel_data: - return b_cancel_data, "L鎾ら攢姣斾緥瑙﹀彂闃堝��" + return b_cancel_data, f"L鎾ら攢姣斾緥瑙﹀彂闃堝��({extra_msg})" except Exception as e: - logging.exception(e) + async_log_util.error(logger_l2_error, + f"L鎾ゅ嚭閿� 鍙傛暟锛歜uy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} 閿欒鍘熷洜锛歿str(e)}") + logger_l2_error.exception(e) + async_log_util.exception(logger_l2_error, e) finally: - l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-L鎾ゅぇ鍗曡绠�") - return None, "" - - # 鏉夸笂鍗栨挙 - @dask.delayed - def sell_cancel(): - _start_time = round(t.time() * 1000) - # 缁熻鏉夸笂鍗� - try: - cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(code, start_index, - end_index, - buy_exec_index) - return cancel_data, cancel_msg - except Exception as e: - logging.exception(e) - finally: - l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-鏉夸笂鍗栬�楁椂") - return None, "" - - # 鏄惁闇�瑕佹挙閿� - @dask.delayed - def is_need_cancel(*args): - try: - for i in range(0, len(args)): - _cancel_data, _cancel_msg = args[i] - if _cancel_data: - return _cancel_data, _cancel_msg - except Exception as e: - logging.exception(e) - finally: + # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-L鎾ゅぇ鍗曡绠�") pass return None, "" @@ -522,50 +530,32 @@ total_data = local_today_datas.get(code) _start_time = tool.get_now_timestamp() # 鑾峰彇涔板叆淇″彿璧峰鐐� - buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos( + order_begin_pos = cls.__get_order_begin_pos( code) + # 榛樿閲忎负0.2 + if order_begin_pos.buy_volume_rate is None: + buy_volume_rate = 0.2 + cancel_data, cancel_msg = None, "" + if order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST: + cancel_data, cancel_msg = f_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) - f1 = compute_safe_count() - f2 = compute_m_big_num() - f3 = s_cancel() - f4 = h_cancel() - f5 = buy_1_cancel() - f6 = sell_cancel() - f7 = l_cancel() - dask_result = is_need_cancel(f1, f2, f3, f4, f5, f6, f7) - if is_first_code: - dask_result = is_need_cancel(f3, f4, f7) - - cancel_data, cancel_msg = dask_result.compute() - - _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, - "宸蹭笅鍗�-鎾ゅ崟 鍒ゆ柇鏄惁闇�瑕佹挙鍗�") - + # 渚濇澶勭悊 + if not cancel_data: + cancel_data, cancel_msg = l_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) + # 鏆傛椂鍙栨秷S鎾� + # if not cancel_data: + # cancel_data, cancel_msg = s_cancel(buy_single_index, buy_exec_index) + if not cancel_data: + cancel_data, cancel_msg = h_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) if cancel_data: l2_log.debug(code, "瑙﹀彂鎾ゅ崟锛屾挙鍗曚綅缃細{} 锛屾挙鍗曞師鍥狅細{}", cancel_data["index"], cancel_msg) - l2_log.trade_record(code, "鎾ゅ崟", "'index':{} , 'msg':'{}'", cancel_data["index"], cancel_msg) + # 鎾ゅ崟 - if cls.cancel_buy(code, cancel_msg): - _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, - "宸蹭笅鍗�-鎾ゅ崟 鑰楁椂") - # 鎾ゅ崟鎴愬姛锛岀户缁绠椾笅鍗� - cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time, is_first_code) - _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, - "澶勭悊鍓╀綑鏁版嵁 鑰楁椂") - else: - # 鎾ゅ崟灏氭湭鎴愬姛 - pass + cls.cancel_buy(code, cancel_msg, cancel_index=cancel_data["index"]) + # 鎾ゅ崟鎴愬姛锛岀户缁绠椾笅鍗� + cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time, is_first_code) else: - # 濡傛灉鏈夎櫄鎷熶笅鍗曢渶瑕佺湡瀹炰笅鍗� - unreal_buy_info = cls.unreal_buy_dict.get(code) - if unreal_buy_info is not None: - l2_log.debug(code, "鏈夎櫄鎷熶笅鍗曪紝鏃犱拱鎾や俊鍙凤紝寮�濮嬫墽琛屼拱鍏ワ紝鎵ц浣嶇疆锛歿},鎴浘鏃堕棿锛歿}", unreal_buy_info[0], capture_time) - # unreal_buy_info 鐨勫唴瀹规牸寮忎负锛�(瑙︽硶涔版搷浣滀笅鏍�,鎴浘鏃堕棿) - # 鐪熷疄涓嬪崟 - cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]], - unreal_buy_info[0], is_first_code) - _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, - "宸茶櫄鎷熶笅鍗�-鎵ц鐪熷疄涓嬪崟 澶栭儴鑰楁椂") + pass @classmethod def __buy(cls, code, capture_timestamp, last_data, last_data_index, is_first_code): @@ -576,48 +566,70 @@ else: can, need_clear_data, reason = cls.__can_buy_first(code) - __start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - __start_time, "鏈�鍚庡垽鏂槸鍚﹁兘涓嬪崟", force=True) + # __start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - __start_time, "鏈�鍚庡垽鏂槸鍚﹁兘涓嬪崟", force=True) # 鍒犻櫎铏氭嫙涓嬪崟 if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) - buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos( + order_begin_pos = cls.__get_order_begin_pos( code) if not can: l2_log.debug(code, "涓嶅彲浠ヤ笅鍗曪紝鍘熷洜锛歿}", reason) + trade_record_log_util.add_cant_place_order_log(code, reason) if need_clear_data: - trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index, + trade_result_manager.real_cancel_success(code, order_begin_pos.buy_single_index, + order_begin_pos.buy_exec_index, local_today_datas.get(code)) - return + return False else: l2_log.debug(code, "鍙互涓嬪崟锛屽師鍥狅細{}", reason) - try: l2_log.debug(code, "寮�濮嬫墽琛屼拱鍏�") trade_manager.start_buy(code, capture_timestamp, last_data, - last_data_index) - ################涓嬪崟鎴愬姛澶勭悊################ - trade_result_manager.real_buy_success(code) + last_data_index, order_begin_pos.mode) l2_log.debug(code, "鎵ц涔板叆鎴愬姛") + ################涓嬪崟鎴愬姛澶勭悊################ + trade_result_manager.real_buy_success(code, cls.__TradePointManager) + cancel_buy_strategy.set_real_place_position(code, local_today_datas.get(code)[-1]["index"], + order_begin_pos.buy_single_index) + l2_log.debug(code, "澶勭悊涔板叆鎴愬姛") params_desc = cls.__l2PlaceOrderParamsManagerDict[code].get_buy_rank_desc() l2_log.debug(code, params_desc) - l2_log.trade_record(code, "涓嬪崟", - "'buy_start_index':{} ,'buy_exec_index':{},'volume_reate':{},'score':{},'desc':'{}'", - buy_single_index, buy_exec_index, cls.volume_rate_info[code][0], - cls.__l2PlaceOrderParamsManagerDict[code].score, params_desc) + ############璁板綍涓嬪崟鏃剁殑鏁版嵁############ + try: + jx_blocks, jx_blocks_by = KPLCodeJXBlockManager().get_jx_blocks_cache( + code), KPLCodeJXBlockManager().get_jx_blocks_cache(code, by=True) + info = cls.__trade_log_placr_order_info_dict[code] + info.mode = order_begin_pos.mode + info.set_buy_index(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) + if jx_blocks: + info.set_kpl_blocks(list(jx_blocks)) + elif jx_blocks_by: + info.set_kpl_blocks(list(jx_blocks_by)) + else: + info.set_kpl_blocks([]) + + trade_record_log_util.add_place_order_log(code, info) + except Exception as e: + async_log_util.error(logger_l2_error, f"鍔犲叆涔板叆璁板綍鏃ュ織鍑洪敊锛歿str(e)}") + + + except Exception as e: - logger_l2_error.exception(e) + async_log_util.exception(logger_l2_error, e) l2_log.debug(code, "鎵ц涔板叆寮傚父:{}", str(e)) pass finally: - l2_log.debug(code, "m鍊煎奖鍝嶅洜瀛愶細{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code)) + # l2_log.debug(code, "m鍊煎奖鍝嶅洜瀛愶細{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code)) + pass + return True # 鏄惁鍙互鍙栨秷 @classmethod def __can_cancel(cls, code): if constant.TEST: return True, "" - if l2_trade_util.WhiteListCodeManager.is_in(code): + if cls.__WhiteListCodeManager.is_in_cache(code): return False, "浠g爜鍦ㄧ櫧鍚嶅崟涓�" # 鏆傛椂娉ㄩ噴鎺� @@ -651,22 +663,22 @@ @classmethod def __can_buy(cls, code): __start_time = t.time() - if not trade_manager.TradeStateManager.is_can_buy(): + if not cls.__TradeStateManager.is_can_buy_cache(): return False, True, f"浠婃棩宸茬姝氦鏄�" # 涔嬪墠鐨勪唬鐮� # 棣栨澘浠g爜涓斿皻鏈定鍋滆繃鐨勪笉鑳戒笅鍗� # is_limited_up = gpcode_manager.FirstCodeManager().is_limited_up(code) # if not is_limited_up: # gpcode_manager.FirstCodeManager().add_limited_up_record([code]) - # place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count( + # place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count( # code) # if place_order_count == 0: - # trade_data_manager.placeordercountmanager.place_order(code) + # trade_data_manager.PlaceOrderCountManager().place_order(code) # return False, True, "棣栨澘浠g爜锛屼笖灏氭湭娑ㄥ仠杩�" try: # 涔�1浠锋牸蹇呴』涓烘定鍋滀环鎵嶈兘涔� - # buy1_price = cls.buy1PriceManager.get_price(code) + # buy1_price = cls.buy1PriceManager().get_price(code) # if buy1_price is None: # return False, "涔�1浠峰皻鏈幏鍙栧埌" # limit_up_price = gpcode_manager.get_limit_up_price(code) @@ -685,10 +697,9 @@ if sell1_time is not None and sell1_volumn > 0: # 鑾峰彇鎵ц浣嶄俊鎭� - buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos( - code) - buy_nums = num - for i in range(buy_exec_index + 1, total_datas[-1]["index"] + 1): + order_begin_pos = cls.__get_order_begin_pos(code) + buy_nums = order_begin_pos.num + for i in range(order_begin_pos.buy_exec_index + 1, total_datas[-1]["index"] + 1): _val = total_datas[i]["val"] # 娑ㄥ仠涔� if L2DataUtil.is_limit_up_price_buy(_val): @@ -706,7 +717,7 @@ if volumn_rate >= 1.3: return False, False, "鏈�澶ч噺姣旇秴杩�1.3涓嶈兘涔�" - limit_up_time = limit_up_time_manager.get_limit_up_time(code) + limit_up_time = cls.__LimitUpTimeManager.get_limit_up_time_cache(code) if limit_up_time is not None: limit_up_time_seconds = l2.l2_data_util.L2DataUtil.get_time_as_second( limit_up_time) @@ -774,45 +785,74 @@ # 鍙互涓嬪崟 return True, False, None finally: - l2_data_log.l2_time(code, round((t.time() - __start_time) * 1000), "鏄惁鍙互涓嬪崟璁$畻") + # l2_data_log.l2_time(code, round((t.time() - __start_time) * 1000), "鏄惁鍙互涓嬪崟璁$畻") + pass @classmethod def __can_buy_first(cls, code): - if not trade_manager.TradeStateManager.is_can_buy(): + if not cls.__TradeStateManager.is_can_buy_cache(): return False, True, f"浠婃棩宸茬姝氦鏄�" - if gpcode_manager.PauseBuyCodesManager.is_in(code): + if cls.__PauseBuyCodesManager.is_in_cache(code): return False, True, f"璇ヤ唬鐮佽鏆傚仠浜ゆ槗" limit_up_price = gpcode_manager.get_limit_up_price(code) - if float(limit_up_price) >= 40: - return False, True, "鑲′环澶т簬40鍧�" + if float(limit_up_price) >= constant.MAX_CODE_PRICE: + return False, True, f"鑲′环澶т簬{constant.MAX_CODE_PRICE}鍧�" + + if code_nature_analyse.LatestMaxVolumeManager().is_latest_max_volume(code): + # 鏈�杩戝嚑澶╂湁鏈�澶ч噺锛屽垽鏂噺姣旀槸鍚﹀ぇ浜�60% + if cls.volume_rate_info[code][0] < 0.6: + return False, True, f"杩戞棩鍑虹幇鏈�澶ч噺锛屽綋鏃ラ噺姣�({cls.volume_rate_info[code][0]})灏忎簬0.6" if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN: + total_data = local_today_datas.get(code) trade_price = current_price_process_manager.get_trade_price(code) if trade_price is None: return False, True, f"灏氭湭鑾峰彇鍒板綋鍓嶆垚浜や环" - if float(limit_up_price) - float(trade_price) > 0.02001: - return False, False, f"褰撳墠鎴愪氦浠凤紙{trade_price}锛夊皻鏈湪2妗e強浠ュ唴" - + if float(limit_up_price) - float(trade_price) > 0.00001: + # 璁$畻淇″彿璧峰浣嶇疆鍒板綋鍓嶇殑鎵嬫暟 + order_begin_pos = cls.__get_order_begin_pos( + code) + num_operate_map = local_today_num_operate_map.get(code) + total_num = 0 + for i in range(order_begin_pos.buy_single_index, total_data[-1]["index"] + 1): + data = total_data[i] + val = data["val"] + if not L2DataUtil.is_limit_up_price_buy(val): + continue + left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, + data[ + "index"], + total_data, + local_today_canceled_buyno_map.get( + code)) + total_num += left_count * val["num"] + m_base_val = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code) + thresh_hold_num = m_base_val // (float(gpcode_manager.get_limit_up_price(code)) * 100) + if total_num < thresh_hold_num * 2: + return False, False, f"褰撳墠鎴愪氦浠凤紙{trade_price}锛夊皻鏈湪0妗e強浠ュ唴 涓� 绾拱棰�({total_num})灏忎簬2鍊峂鍊�({thresh_hold_num * 2})" # 鍒ゆ柇鎴愪氦杩涘害鏄惁璺濈鎴戜滑鐨勪綅缃緢杩� - total_data = local_today_datas.get(code) trade_index, is_default = cls.__TradeBuyQueue.get_traded_index(code) - if not is_default and trade_index: - buy_index_set = set() + if False and not is_default and trade_index: + not_cancel_num = 0 num_operate_map = local_today_num_operate_map.get(code) for i in range(trade_index + 1, total_data[-1]["index"] + 1): if L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]): - left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code, - total_data[i]["index"], - total_data, - num_operate_map) + left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, + total_data[ + i][ + "index"], + total_data, + local_today_canceled_buyno_map.get( + code)) if left_count > 0: - buy_index_set.add(total_data[i]["index"]) - - if len(buy_index_set) < 5: - return False, False, f"鎴愪氦浣嶇疆璺濈褰撳墠浣嶇疆灏忎簬5绗�" + not_cancel_num += total_data[i]["val"]["num"] + m_base_val = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code) + not_cancel_money = not_cancel_num * 100 * float(gpcode_manager.get_limit_up_price(code)) + if m_base_val > not_cancel_money: + return False, False, f"鎴愪氦浣嶇疆璺濈褰撳墠浣嶇疆绾拱棰�({not_cancel_money})灏忎簬m鍊�({m_base_val})" else: # 鍒ゆ柇涔�1浠锋牸妗d綅 zyltgb = global_util.zyltgb_map.get(code) @@ -821,7 +861,7 @@ zyltgb = global_util.zyltgb_map.get(code) if zyltgb >= 200 * 100000000: - buy1_price = code_price_manager.Buy1PriceManager.get_buy1_price(code) + buy1_price = cls.__Buy1PriceManager.get_buy1_price(code) if buy1_price is None: return False, True, f"灏氭湭鑾峰彇鍒颁拱1浠�" dif = float(limit_up_price) - float(buy1_price) @@ -829,106 +869,83 @@ if dif > 0.10001: return False, True, f"鑷敱娴侀��200浜夸互涓婏紝涔�1鍓╀綑妗f暟澶т簬10妗o紝涔颁竴锛坽buy1_price}锛夋定鍋滐紙{limit_up_price}锛�" - open_limit_up_lowest_price = code_price_manager.Buy1PriceManager.get_open_limit_up_lowest_price(code) + open_limit_up_lowest_price = cls.__Buy1PriceManager.get_open_limit_up_lowest_price(code) price_pre_close = gpcode_manager.CodePrePriceManager.get_price_pre_cache(code) if open_limit_up_lowest_price and ( float(open_limit_up_lowest_price) - price_pre_close) / price_pre_close < 0.05: return False, True, f"鐐告澘鍚庢渶浣庝环璺岃嚦5%浠ヤ笅" - limit_up_info = code_price_manager.Buy1PriceManager.get_limit_up_info(code) - if limit_up_info[0] is None and False: - total_data = local_today_datas.get(code) - buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos( - code) - # 涔嬪墠娌℃湁娑ㄥ仠杩� - # 缁熻涔板叆淇″彿浣嶅埌褰撳墠浣嶇疆娌℃湁鎾ょ殑澶у崟閲戦 - min_money_w = l2_data_util.get_big_money_val(float(total_data[buy_single_index]["val"]["price"])) // 10000 - left_big_num = l2.cancel_buy_strategy.SecondCancelBigNumComputer.compute_left_big_num(code, - buy_single_index, - buy_exec_index, - total_data[-1][ - "index"], - total_data, - 0, min_money_w) - if left_big_num > 0: - # 閲嶆柊鑾峰彇鍒嗘暟涓庡垎鏁扮储寮� - limit_up_time = limit_up_time_manager.get_limit_up_time(code) - if limit_up_time is None: - limit_up_time = tool.get_now_time_str() - score = first_code_score_manager.get_score(code, cls.volume_rate_info[code][0], limit_up_time, True, - left_big_num) - cls.__l2PlaceOrderParamsManagerDict[code].set_score(score) + # limit_up_info = cls.__Buy1PriceManager.get_limit_up_info(code) + # if limit_up_info[0] is None and False: + # total_data = local_today_datas.get(code) + # buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos( + # code) + # # 涔嬪墠娌℃湁娑ㄥ仠杩� + # # 缁熻涔板叆淇″彿浣嶅埌褰撳墠浣嶇疆娌℃湁鎾ょ殑澶у崟閲戦 + # min_money_w = l2_data_util.get_big_money_val(float(total_data[buy_single_index]["val"]["price"])) // 10000 + # left_big_num = cls.__SecondCancelBigNumComputer.compute_left_big_num(code, + # buy_single_index, + # buy_exec_index, + # total_data[-1][ + # "index"], + # total_data, + # 0, min_money_w) + # if left_big_num > 0: + # # 閲嶆柊鑾峰彇鍒嗘暟涓庡垎鏁扮储寮� + # limit_up_time = cls.__LimitUpTimeManager.get_limit_up_time_cache(code) + # if limit_up_time is None: + # limit_up_time = tool.get_now_time_str() + # score = first_code_score_manager.get_score(code, cls.volume_rate_info[code][0], limit_up_time, True, + # left_big_num) + # cls.__l2PlaceOrderParamsManagerDict[code].set_score(score) - logger_place_order_score.info("code={},data='score_index':{},'score_info':{}", code, - cls.__l2PlaceOrderParamsManagerDict[code].score_index, - cls.__l2PlaceOrderParamsManagerDict[code].score_info) + # logger_place_order_score.info("code={},data='score_index':{},'score_info':{}", code, + # cls.__l2PlaceOrderParamsManagerDict[code].score_index, + # cls.__l2PlaceOrderParamsManagerDict[code].score_info) - if not gpcode_manager.WantBuyCodesManager.is_in_cache(code): - if TradeTargetCodeModeManager.get_mode() == TradeTargetCodeModeManager.MODE_ONLY_BUY_WANT_CODES: + if not cls.__WantBuyCodesManager.is_in_cache(code): + if cls.__TradeTargetCodeModeManager.get_mode_cache() == TradeTargetCodeModeManager.MODE_ONLY_BUY_WANT_CODES: return False, True, f"鍙拱鎯充拱鍗曚腑鐨勪唬鐮�" - score_index = cls.__l2PlaceOrderParamsManagerDict[code].score_index - score = cls.__l2PlaceOrderParamsManagerDict[code].score - score_info = cls.__l2PlaceOrderParamsManagerDict[code].score_info + score_index = None # cls.__l2PlaceOrderParamsManagerDict[code].score_index + score = None # cls.__l2PlaceOrderParamsManagerDict[code].score + score_info = None # cls.__l2PlaceOrderParamsManagerDict[code].score_info - lp = LineProfiler() - lp.enable() - lp_wrap = lp(cls.can_buy_first) - results = lp_wrap(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code]) - output = io.StringIO() - lp.print_stats(stream=output) - lp.disable() - with open(f"{constant.get_path_prefix()}/logs/profile/{code}_can_buy_first.txt", 'w') as f: - f.write(output.getvalue()) - # return cls.can_buy_first(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code]) - return results + # lp = LineProfiler() + # lp.enable() + # lp_wrap = lp(cls.can_buy_first) + # results = lp_wrap(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code]) + # output = io.StringIO() + # lp.print_stats(stream=output) + # lp.disable() + # with open(f"{constant.get_path_prefix()}/logs/profile/{code}_can_buy_first.txt", 'w') as f: + # f.write(output.getvalue()) + # return results + return cls.can_buy_first(code, limit_up_price) else: return True, False, "鍦ㄦ兂涔板悕鍗曚腑" @classmethod - def can_buy_first(cls, code, limit_up_price, score_index, score, score_info, volume_rate_info): - def is_has_k_format(score_info): - # (15涓氦鏄撴棩娑ㄥ箙鏄惁澶т簬24.9%,鏄惁鐮村墠楂橈紝鏄惁瓒呰穼锛屾槸鍚︽帴杩戝墠楂橈紝鏄惁N,鏄惁V,鏄惁鏈夊舰鎬�,澶╅噺澶ч槼淇℃伅,鏄惁鍏锋湁杈ㄨ瘑搴�) - - if score_info[1][3][6][0] and not score_info[1][3][3][0]: - return True - if score_info[1][3][7][0]: - return True - return False - - if float(limit_up_price) >= 40: - return False, True, "鑲′环澶т簬40鍧�" - - # 9:35涔嬪墠涔板ぇ甯傚��(>=80浜�)绁� - if int(tool.get_now_date_str("%Y%m%d")) < int("093500"): - zyltgb = global_util.zyltgb_map.get(code) - if zyltgb is None: - global_data_loader.load_zyltgb() - zyltgb = global_util.zyltgb_map.get(code) - if zyltgb >= 80 * 100000000: - return True, False, "{9:30:00-9:35:00}鑷敱甯傚�尖墺80浜�" + def can_buy_first(cls, code, limit_up_price): # 鍒ゆ柇鏉垮潡 - yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() - plate_can_buy, msg = CodePlateKeyBuyManager.can_buy(code, - kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas, - kpl_data_manager.KPLLimitUpDataRecordManager.total_datas, - yesterday_codes, - block_info.get_before_blocks_dict()) - if not plate_can_buy: - return False, True, msg - return True, False, msg + can_buy_result = CodePlateKeyBuyManager.can_buy(code) + if can_buy_result is None: + async_log_util.warning(logger_debug, "娌℃湁鑾峰彇鍒版澘鍧楃紦瀛橈紝灏嗚幏鍙栨澘鍧�") + yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() + CodePlateKeyBuyManager.update_can_buy_blocks(code, + kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas, + kpl_data_manager.KPLLimitUpDataRecordManager.total_datas, + yesterday_codes, + block_info.get_before_blocks_dict()) + can_buy_result = CodePlateKeyBuyManager.can_buy(code) - # if volume_rate_info[0] < 0.4: - # return False, True, f"閲忓ぇ浜�40%鎵嶄笅鍗�,閲忔瘮锛歿volume_rate_info[0]}" + if can_buy_result is None: + return False, True, "灏氭湭鑾峰彇鍒版澘鍧椾俊鎭�" - # 鏄惁鏈塊绾垮舰鎬�(鏈塊绾垮舰鎬佹垨鑰呭ぉ閲忓ぇ闃�),10鐐瑰悗鎵嶉渶瑕佸垽鏂槸鍚︽湁K绾垮舰鎬佷笌鍒嗗�� - if int(tool.get_now_time_str().replace(":", "")) > int("100000"): - has_k_format = score_info[1][3][6][0] or score_info[1][3][7][0] - if not has_k_format: - return False, True, f"鏃燢绾垮舰鎬�" - - if score_index < 0: - return False, True, f"鍒嗗�硷細{score}鏈揪鍒伴渶瑕佷拱鍏ョ殑鍒嗘暟绾�" - return True, False, "" + # 鏉垮潡涓嶅彲涔帮紝涓旀病鏈夋寕杩囧崟鐨勫氨涓嶈兘涔� + if not can_buy_result[0] and trade_manager.CodesTradeStateManager().get_trade_state_cache( + code) == trade_manager.TRADE_STATE_NOT_TRADE: + return False, True, can_buy_result[1] + return True, False, can_buy_result[1] @classmethod def __cancel_buy(cls, code): @@ -941,24 +958,27 @@ logging.exception(e) l2_log.debug(code, "鎵ц鎾ゅ崟寮傚父锛歿}", str(e)) return False + finally: + pass @classmethod - def cancel_buy(cls, code, msg=None, source="l2"): + def cancel_buy(cls, code, msg=None, source="l2", cancel_index=None): # 鏄惁鏄氦鏄撻槦鍒楄Е鍙� - buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos( + order_begin_pos = cls.__get_order_begin_pos( code) total_datas = local_today_datas[code] if source == "trade_queue": # 浜ゆ槗闃熷垪瑙﹀彂鐨勯渶瑕佷笅鍗曞悗5s - if buy_exec_index is not None and buy_exec_index > 0: + if order_begin_pos.buy_exec_index is not None and order_begin_pos.buy_exec_index > 0: now_time_str = tool.get_now_time_str() - if tool.trade_time_sub(now_time_str, total_datas[buy_exec_index]["val"]["time"]) < 5: + if tool.trade_time_sub(now_time_str, total_datas[order_begin_pos.buy_exec_index]["val"]["time"]) < 5: return False if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) # 鍙栨秷涔板叆鏍囪瘑 - trade_result_manager.virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas) + trade_result_manager.virtual_cancel_success(code, order_begin_pos.buy_single_index, + order_begin_pos.buy_exec_index, total_datas) else: can_cancel, reason = cls.__can_cancel(code) if not can_cancel: @@ -966,9 +986,15 @@ l2_log.cancel_debug(code, "鎾ゅ崟涓柇锛屽師鍥狅細{}", reason) l2_log.debug(code, "鎾ゅ崟涓柇锛屽師鍥狅細{}", reason) return False + if cancel_index is None: + cancel_index = total_datas[-1]["index"] + cls.__LatestCancelIndexManager.set_latest_cancel_index(code, cancel_index) + # 娣诲姞鎾ゅ崟鏃ュ織璁板綍 + trade_record_log_util.add_cancel_msg_log(code, msg) cancel_result = cls.__cancel_buy(code) if cancel_result: - trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index, total_datas) + trade_result_manager.real_cancel_success(code, order_begin_pos.buy_single_index, + order_begin_pos.buy_exec_index, total_datas) l2_log.debug(code, "鎵ц鎾ゅ崟缁撴潫锛屽師鍥狅細{}", msg) return True @@ -984,73 +1010,104 @@ new_add=True): if compute_end_index < compute_start_index: return + + unique_key = f"{code}-{compute_start_index}-{compute_end_index}" + if cls.__latest_process_not_order_unique_keys_count.get( + unique_key) and cls.__latest_process_not_order_unique_keys_count.get(unique_key) > 2: + async_log_util.error(logger_l2_error, + f"閲嶅澶勭悊鏁版嵁锛歝ode-{code} start_index-{compute_start_index} end_index-{compute_end_index}") + return + if unique_key not in cls.__latest_process_not_order_unique_keys_count: + cls.__latest_process_not_order_unique_keys_count[unique_key] = 0 + cls.__latest_process_not_order_unique_keys_count[unique_key] += 1 + _start_time = tool.get_now_timestamp() total_datas = local_today_datas[code] - # 澶勭悊瀹夊叏绗旀暟 - cls.__buyL2SafeCountManager.compute_left_rate(code, compute_start_index, compute_end_index, total_datas, - local_today_num_operate_map.get(code)) # 鑾峰彇涔板叆淇″彿璁$畻璧峰浣嶇疆 - buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos( + order_begin_pos = cls.__get_order_begin_pos( code) # 鏄惁涓烘柊鑾峰彇鍒扮殑浣嶇疆 new_get_single = False + buy_single_index = order_begin_pos.buy_single_index if buy_single_index is None: - continue_count = cls.__l2PlaceOrderParamsManagerDict[code].get_begin_continue_buy_count() - # 鏈変拱鍏ヤ俊鍙� - has_single, _index = cls.__compute_order_begin_pos(code, max( - (compute_start_index - continue_count - 1) if new_add else compute_start_index, 0), continue_count, - compute_end_index) + # 灏濊瘯璁$畻蹇�熸垚浜や俊鍙� + has_single, _index, sell_info = cls.__compute_fast_order_begin_pos(code, compute_start_index, + compute_end_index) + if has_single: + order_begin_pos.mode = OrderBeginPosInfo.MODE_FAST + order_begin_pos.sell_info = sell_info + elif _index is not None and _index < 0: + continue_count = cls.__l2PlaceOrderParamsManagerDict[code].get_begin_continue_buy_count() + # 鏈変拱鍏ヤ俊鍙� + has_single, _index = cls.__compute_order_begin_pos(code, max( + (compute_start_index - continue_count - 1) if new_add else compute_start_index, 0), continue_count, + compute_end_index) + order_begin_pos.mode = OrderBeginPosInfo.MODE_NORMAL # 濡傛灉涔板叆淇″彿涓庝笂娆$殑涔板叆淇″彿涓�鏍峰氨涓嶈兘绠楁柊鐨勪俊鍙� if cls.__last_buy_single_dict.get(code) == _index: has_single = None _index = None - buy_single_index = _index if has_single: cls.__last_buy_single_dict[code] = buy_single_index new_get_single = True - num = 0 - count = 0 - l2_log.debug(code, "鑾峰彇鍒颁拱鍏ヤ俊鍙疯捣濮嬬偣锛歿} ,璁$畻鑼冨洿锛歿}-{} 锛岄噺姣旓細{}锛屾暟鎹細{}", buy_single_index, compute_start_index, - compute_end_index, cls.volume_rate_info[code], total_datas[buy_single_index]) - # 濡傛灉鏄粖澶╃涓�娆℃湁涓嬪崟寮�濮嬩俊鍙凤紝闇�瑕佽缃ぇ鍗曡捣濮嬬偣 - cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index) + order_begin_pos.num = 0 + order_begin_pos.count = 0 + order_begin_pos.buy_single_index = buy_single_index - _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "涓嬪崟淇″彿璁$畻鏃堕棿") + if sell_info: + order_begin_pos.threshold_money = sell_info[1] + l2_log.debug(code, "鑾峰彇鍒颁拱鍏ヤ俊鍙疯捣濮嬬偣锛歿} ,璁$畻鑼冨洿锛歿}-{} 锛岄噺姣旓細{}锛屾暟鎹細{} 妯″紡锛歿}", buy_single_index, + compute_start_index, + compute_end_index, cls.volume_rate_info[code], total_datas[buy_single_index], + order_begin_pos.mode) - if buy_single_index is None: + # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "涓嬪崟淇″彿璁$畻鏃堕棿") + + if order_begin_pos.buy_single_index is None: # 鏈幏鍙栧埌涔板叆淇″彿锛岀粓姝㈢▼搴� return None # 寮�濮嬭绠楃殑浣嶇疆 - start_process_index = max(buy_single_index, compute_start_index) + start_process_index = max(order_begin_pos.buy_single_index, compute_start_index) if new_get_single: - start_process_index = buy_single_index + start_process_index = order_begin_pos.buy_single_index - # 璁$畻m鍊煎ぇ鍗� - cls.l2BigNumForMProcessor.process(code, start_process_index, - compute_end_index, - gpcode_manager.get_limit_up_price(code)) - - _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "璁$畻m鍊煎ぇ鍗�") + # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "璁$畻m鍊煎ぇ鍗�") threshold_money, msg = cls.__get_threshmoney(code) - _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "m鍊奸槇鍊艰绠�") + # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "m鍊奸槇鍊艰绠�") # 涔板叆绾拱棰濈粺璁� - compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code, - start_process_index, - compute_end_index, - num, count, - threshold_money, - buy_single_index, - max_num_set) - _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "绾拱棰濈粺璁℃椂闂�") + new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = None, None, None, None, [] + if order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST: - l2_log.debug(code, "m鍊�-{} 閲忔瘮:{}", threshold_money, cls.volume_rate_info[code][0]) + threshold_money = order_begin_pos.threshold_money + new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, threshold_money_new = cls.__sum_buy_num_for_order_fast( + code, + start_process_index, + compute_end_index, + order_begin_pos.num, + order_begin_pos.count, + threshold_money, + order_begin_pos.buy_single_index) + threshold_money = threshold_money_new + order_begin_pos.threshold_money = threshold_money + else: + new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3( + code, + start_process_index, + compute_end_index, + order_begin_pos.num, + order_begin_pos.count, + threshold_money, + order_begin_pos.buy_single_index, + order_begin_pos.max_num_set) + l2_log.debug(code, "m鍊�-{} 閲忔瘮:{} rebegin_buy_pos:{}", threshold_money, cls.volume_rate_info[code][0], + rebegin_buy_pos) # 涔板叆淇″彿浣嶄笌璁$畻浣嶇疆闂撮殧2s鍙婁互涓婁簡 if rebegin_buy_pos is not None: @@ -1059,103 +1116,63 @@ is_first_code, False) return - if compute_index is not None: - l2_log.debug(code, "鑾峰彇鍒颁拱鍏ユ墽琛屼綅缃細{} m鍊硷細{} 绾拱鎵嬫暟锛歿} 绾拱鍗曟暟锛歿} 鏁版嵁锛歿} ,閲忔瘮:{} ", compute_index, threshold_money, + if new_buy_exec_index is not None: + l2_log.debug(code, "鑾峰彇鍒颁拱鍏ユ墽琛屼綅缃細{} m鍊硷細{} 绾拱鎵嬫暟锛歿} 绾拱鍗曟暟锛歿} 鏁版嵁锛歿} ,閲忔瘮:{} ,涓嬪崟妯″紡锛歿}", new_buy_exec_index, + threshold_money, buy_nums, - buy_count, total_datas[compute_index], cls.volume_rate_info[code]) - - f1 = dask.delayed(cls.__save_order_begin_data)(code, buy_single_index, compute_index, compute_index, - buy_nums, buy_count, max_num_set_new, - cls.volume_rate_info[code][0]) - f2 = dask.delayed(limit_up_time_manager.save_limit_up_time)(code, total_datas[compute_index]["val"]["time"]) - f3 = dask.delayed(cls.__virtual_buy)(code, buy_single_index, compute_index, capture_time) - f4 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code) - f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(code, buy_single_index, - compute_index, - buy_single_index, - buy_exec_index, False) - dask.compute(f1, f2, f3, f4, f5) - - # 宸茶骞惰澶勭悊 - # # 璁板綍涔板叆淇″彿浣嶇疆 - # cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count, - # max_num_set_new) - # # 濡傛灉鏄粖澶╃涓�娆℃湁涓嬪崟鎵ц淇″彿锛屾定鍋滄椂闂达紙涔板叆鎵ц浣嶆椂闂达級 - # limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"]) - # # 铏氭嫙涓嬪崟 - # cls.__virtual_buy(code, buy_single_index, compute_index, capture_time) - # # 鍒犻櫎涔嬪墠鐨勬墍鏈夋挙鍗曚俊鍙� - # l2_data_manager.TradePointManager.delete_buy_cancel_point(code) - # - # # 娑ㄥ仠灏佸崟棰濊绠� - # L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, buy_single_index, compute_index, - # buy_single_index, - # buy_exec_index, False) - - _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, - "璁板綍鎵ц涔板叆鏁版嵁", force=True) + buy_count, total_datas[new_buy_exec_index], cls.volume_rate_info[code], order_begin_pos.mode) + cls.__save_order_begin_data(code, OrderBeginPosInfo(buy_single_index=buy_single_index, + buy_exec_index=new_buy_exec_index, + buy_compute_index=new_buy_exec_index, + num=buy_nums, count=buy_count, + max_num_set=max_num_set_new, + buy_volume_rate=cls.volume_rate_info[code][0], + mode=order_begin_pos.mode, + sell_info=order_begin_pos.sell_info, + threshold_money=threshold_money)) + cls.__LimitUpTimeManager.save_limit_up_time(code, total_datas[new_buy_exec_index]["val"]["time"]) + l2_log.debug(code, "delete_buy_cancel_point") + # 鐩存帴涓嬪崟 + ordered = cls.__buy(code, capture_time, total_datas[-1], total_datas[-1]["index"], is_first_code) # 鏁版嵁鏄惁澶勭悊瀹屾瘯 - if compute_index >= compute_end_index: - need_cancel, cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index, - compute_index, - buy_single_index, compute_index, - total_datas, is_first_code, - cls.volume_rate_info[code][1], - cls.volume_rate_info[code][1], - True) - _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, - "S绾уぇ鍗曞鐞嗚�楁椂", force=True) - l2_log.debug(code, "鏁版嵁澶勭悊瀹屾瘯锛屼笅鍗�, 鏁版嵁鎴浘鏃堕棿-{}", capture_time) - # 鏁版嵁宸茬粡澶勭悊瀹屾瘯锛屽鏋滆繕娌℃挙鍗曞氨瀹為檯涓嬪崟 - if need_cancel: - if cls.cancel_buy(code, "S绾уぇ鍗曟挙閿�"): - # 鎵ц鎾ゅ崟鎴愬姛 - pass + if new_buy_exec_index < compute_end_index: + if ordered: + cls.__process_order(code, new_buy_exec_index + 1, compute_end_index, capture_time, is_first_code, + False) else: - cls.__buy(code, capture_time, total_datas[compute_index], compute_index, is_first_code) - else: - SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index, - compute_index, total_datas, is_first_code, - cls.volume_rate_info[code][1], - cls.volume_rate_info[code][1], False) - - _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, - "S绾уぇ鍗曞鐞嗚�楁椂", force=True) - # 鏁版嵁灏氭湭澶勭悊瀹屾瘯锛岃繘琛屼笅涓�姝ュ鐞� - l2_log.debug(code, "鏁版嵁灏氭湭澶勭悊瀹屾瘯锛岃繘琛屼笅涓�姝ュ鐞嗭紝澶勭悊杩涘害锛歿}", compute_index) - # 澶勭悊鎾ゅ崟姝ラ - cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, is_first_code, False) - _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, - f"澶勭悊鎾ゅ崟姝ラ鑰楁椂锛岃寖鍥达細{compute_index + 1}-{compute_end_index}", force=True) - + cls.__start_compute_buy(code, new_buy_exec_index + 1, compute_end_index, threshold_money, + capture_time, + is_first_code, False) else: # 鏈揪鍒颁笅鍗曟潯浠讹紝淇濆瓨绾拱棰濓紝璁剧疆绾拱棰� # 璁板綍涔板叆淇″彿浣嶇疆 - cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count, - max_num_set_new, None) - print("淇濆瓨澶у崟鏃堕棿", round((t.time() - _start_time) * 1000)) + cls.__save_order_begin_data(code, OrderBeginPosInfo(buy_single_index=buy_single_index, buy_exec_index=-1, + buy_compute_index=compute_end_index, num=buy_nums, + count=buy_count, + max_num_set=max_num_set_new, mode=order_begin_pos.mode, + sell_info=order_begin_pos.sell_info, + threshold_money=threshold_money)) _start_time = t.time() - pass + l2_data_log.l2_time_log(code, "__start_compute_buy 缁撴潫") # 鑾峰彇涓嬪崟璧峰淇″彿 @classmethod - def __get_order_begin_pos(cls, code): - buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data_cache( + def __get_order_begin_pos(cls, code) -> OrderBeginPosInfo: + order_begin_pos = cls.__TradePointManager.get_buy_compute_start_data_cache( code) - return buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate + return order_begin_pos # 淇濆瓨涓嬪崟璧峰淇″彿 @classmethod - def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, - volume_rate): - TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, count, - max_num_set, volume_rate) + def __save_order_begin_data(cls, code, info: OrderBeginPosInfo): + cls.__TradePointManager.set_buy_compute_start_data_v2(code, info) # 璁$畻涓嬪崟璧峰淇″彿 # compute_data_count 鐢ㄤ簬璁$畻鐨刲2鏁版嵁鏁伴噺 @classmethod def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index): + second_930 = 9 * 3600 + 30 * 60 + 0 # 鍊掓暟100鏉℃暟鎹煡璇� datas = local_today_datas[code] @@ -1180,7 +1197,12 @@ continue if L2DataUtil.is_limit_up_price_buy(_val): - + # 閲戦瑕佸ぇ浜�50涓� + if _val["num"] * float(_val["price"]) < 5000: + continue + # 瀵绘壘鍓嶉潰continue_count-1涓定鍋滀拱 + # for j in range(start_index - 1, -1, -1): + # if datas[j]["val"] if last_index is None or (datas[last_index]["val"]["time"] == datas[i]["val"]["time"]): if start is None: start = i @@ -1202,9 +1224,70 @@ return False, None + # 蹇�熶拱鍏ユ硶鐨勪俊鍙蜂綅缃煡鎵� + @classmethod + def __compute_fast_order_begin_pos(cls, code, start_index, end_index): + limit_up_price = gpcode_manager.get_limit_up_price(code) + # if float(limit_up_price) >= 10: + # return False, -1, "鑲′环澶т簬10鍧�" + total_datas = local_today_datas[code] + start_time_str = total_datas[start_index]["val"]["time"] + # if tool.trade_time_sub(start_time_str, "13:00:00") > 0: + # return False, -1, "瓒呰繃瑙勫畾鏃堕棿" + refer_sell_data = cls.__L2MarketSellManager.get_refer_sell_data(code, start_time_str) + if refer_sell_data is None: + return False, -1, "鎬诲崠涓虹┖" + if cls.__L2MarketSellManager.is_refer_sell_time_used(code, refer_sell_data[0]): + return False, -1, "鎬诲崠缁熻鏃堕棿宸茶浣跨敤" + # 鏄惁澶т簬500涓� + if refer_sell_data[1] <= 500 * 10000: + return False, -1, "鎬诲崠灏忎簬鎸囧畾閲戦" + # 缁熻涔嬪墠鐨勫崠 + threshold_money = refer_sell_data[1] + for i in range(start_index - 1, -1, -1): + val = total_datas[i]["val"] + if tool.compare_time(val["time"], refer_sell_data[0]) <= 0: + break + if L2DataUtil.is_sell(val): + threshold_money += val["num"] * int(float(val["price"]) * 100) + elif L2DataUtil.is_sell_cancel(val): + threshold_money -= val["num"] * int(float(val["price"]) * 100) + # 鏄惁涓烘湰绉掔殑绗竴涓定鍋滀拱 + for i in range(start_index, end_index + 1): + data = total_datas[i] + val = data['val'] + if not L2DataUtil.is_limit_up_price_buy(val): + # 瑕佺粺璁″崠涓庡崠鎾� + if L2DataUtil.is_sell(val): + threshold_money += val["num"] * int(float(val["price"]) * 100) + elif L2DataUtil.is_sell_cancel(val): + threshold_money -= val["num"] * int(float(val["price"]) * 100) + continue + # 50 涓囦互涓嬬殑涓嶉渶瑕� + if val["num"] * float(val["price"]) < 5000: + continue + # 鏄惁涓烘湰s鐨勭涓�娆℃定鍋� + is_first_limit_up = True + for j in range(i - 1, -1, -1): + temp_val = total_datas[j]["val"] + if temp_val["time"] == val["time"]: + if L2DataUtil.is_limit_up_price_buy(temp_val) and temp_val["num"] * float( + temp_val["price"]) >= 5000: + is_first_limit_up = True + break + else: + break + if is_first_limit_up: + return True, i, [refer_sell_data[0], threshold_money] + return False, None, None + @classmethod def __get_threshmoney(cls, code): - return cls.__l2PlaceOrderParamsManagerDict[code].get_m_val() + m, msg = cls.__l2PlaceOrderParamsManagerDict[code].get_m_val() + if trade_manager.CodesTradeStateManager().get_trade_state_cache(code) == trade_manager.TRADE_STATE_NOT_TRADE: + # 棣栨涓嬪崟m鍊兼墿澶�1.5鍊� + m = int(m * 1.5) + return m, msg # 璁$畻涓囨墜鍝ョ瑪鏁� @classmethod @@ -1221,13 +1304,9 @@ @classmethod def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count, threshold_money, buy_single_index, max_num_set): - def get_threshold_count(): - count = threshold_count - return count - _start_time = t.time() total_datas = local_today_datas[code] - is_first_code = gpcode_manager.FirstCodeManager().is_in_first_record_cache(code) + # is_first_code = gpcode_manager.FirstCodeManager().is_in_first_record_cache(code) buy_nums = origin_num buy_count = origin_count @@ -1236,8 +1315,10 @@ raise Exception("娑ㄥ仠浠锋棤娉曡幏鍙�") # 鐩爣鎵嬫暟 threshold_num = round(threshold_money / (limit_up_price * 100)) + # 澶х洰鏍囨墜鏁帮紙婊¤冻杩欎釜灏变笉闇�瑕佺湅瀹夊叏绗旀暟锛� + threshold_max_num = int(threshold_num * 1.2) - # place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code) + # place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code) # 鐩爣璁㈠崟鏁伴噺 threshold_count = cls.__l2PlaceOrderParamsManagerDict[code].get_safe_count() @@ -1256,7 +1337,7 @@ big_num_count = cls.__l2PlaceOrderParamsManagerDict[code].get_big_num_count() # 杈冨ぇ鍗曠殑鎵嬫暟 - bigger_num = round(5900 / limit_up_price) + bigger_num = round(5000 / limit_up_price) for i in range(compute_start_index, compute_end_index + 1): data = total_datas[i] @@ -1264,7 +1345,7 @@ trigger_buy = False # 蹇呴』涓鸿繛缁�2绉掑唴鐨勬暟鎹� if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds + 1 > max_space_time: - TradePointManager.delete_buy_point(code) + cls.__TradePointManager.delete_buy_point(code) if i == compute_end_index: # 鏁版嵁澶勭悊瀹屾瘯 return None, buy_nums, buy_count, None, max_buy_num_set @@ -1282,18 +1363,17 @@ # 鍙粺璁�59涓囦互涓婄殑閲戦 buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) buy_count += int(total_datas[i]["re"]) - if buy_nums >= threshold_num and buy_count >= get_threshold_count(): - logger_l2_trade_buy.info( - f"{code}鑾峰彇鍒颁拱鍏ユ墽琛岀偣锛歿i} 缁熻绾拱鎵嬫暟锛歿buy_nums} 鐩爣绾拱鎵嬫暟锛歿threshold_num} 缁熻绾拱鍗曟暟锛歿buy_count} 鐩爣绾拱鍗曟暟锛歿get_threshold_count()}, 澶у崟鏁伴噺锛歿len(max_buy_num_set)}") + if (buy_nums >= threshold_num and buy_count >= threshold_count) or buy_nums >= threshold_max_num: + async_log_util.info(logger_l2_trade_buy, + f"{code}鑾峰彇鍒颁拱鍏ユ墽琛岀偣锛歿i} 缁熻绾拱鎵嬫暟锛歿buy_nums} 鐩爣绾拱鎵嬫暟锛歿threshold_num}/{threshold_max_num} 缁熻绾拱鍗曟暟锛歿buy_count} 鐩爣绾拱鍗曟暟锛歿threshold_count}, 澶у崟鏁伴噺锛歿len(max_buy_num_set)}") elif L2DataUtil.is_limit_up_price_buy_cancel(_val): if _val["num"] >= bigger_num: # 鍙粺璁�59涓囦互涓婄殑閲戦 # 娑ㄥ仠涔版挙 # 鍒ゆ柇涔板叆浣嶇疆鏄惁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓� - buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, - total_datas[i], - local_today_num_operate_map.get( - code)) + buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i], + local_today_buyno_map.get( + code)) if buy_index is not None: # 鎵惧埌涔版挙鏁版嵁鐨勪拱鍏ョ偣 if buy_index >= buy_single_index: @@ -1320,21 +1400,160 @@ buy_nums, threshold_num) max_buy_num_set_count = 0 - for i in max_buy_num_set: - max_buy_num_set_count += total_datas[i]["re"] + for i1 in max_buy_num_set: + max_buy_num_set_count += total_datas[i1]["re"] # 鏈夋挙鍗曚俊鍙凤紝涓斿皬浜庨槇鍊� - if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and max_buy_num_set_count >= big_num_count: + if buy_nums >= threshold_num and buy_count >= threshold_count and trigger_buy and max_buy_num_set_count >= big_num_count: + try: + info = cls.__trade_log_placr_order_info_dict[code] + info.set_trade_factor(threshold_money, threshold_count, list(max_buy_num_set)) + except Exception as e: + async_log_util.error(logger_l2_error, f"璁板綍浜ゆ槗鍥犲瓙鍑洪敊锛歿str(e)}") + return i, buy_nums, buy_count, None, max_buy_num_set l2_log.buy_debug(code, "灏氭湭鑾峰彇鍒颁拱鍏ユ墽琛岀偣锛岃捣濮嬭绠椾綅缃細{} 缁熻绾拱鎵嬫暟锛歿} 鐩爣绾拱鎵嬫暟锛歿} 缁熻绾拱鍗曟暟锛歿} 鐩爣绾拱鍗曟暟锛歿} 澶у崟鏁伴噺锛歿} 鐩爣澶у崟鏁伴噺锛歿}", compute_start_index, buy_nums, - threshold_num, buy_count, get_threshold_count(), max_buy_num_set_count, big_num_count) + threshold_num, buy_count, threshold_count, max_buy_num_set_count, big_num_count) return None, buy_nums, buy_count, None, max_buy_num_set + # 杩斿洖(涔板叆鎵ц鐐�, 鎬绘墜, 鎬荤瑪鏁�, 浠庢柊璁$畻璧风偣, 绾拱棰濋槇鍊�) + # 璁$畻蹇�熶拱鍏� + @classmethod + def __sum_buy_num_for_order_fast(cls, code, compute_start_index, compute_end_index, origin_num, origin_count, + threshold_money_origin, buy_single_index): + _start_time = t.time() + total_datas = local_today_datas[code] + # is_first_code = gpcode_manager.FirstCodeManager().is_in_first_record_cache(code) + + buy_nums = origin_num + buy_count = origin_count + limit_up_price = gpcode_manager.get_limit_up_price(code) + if limit_up_price is None: + raise Exception("娑ㄥ仠浠锋棤娉曡幏鍙�") + limit_up_price = float(limit_up_price) + + threshold_money = threshold_money_origin + # 鐩爣鎵嬫暟 + threshold_num = round(threshold_money / (limit_up_price * 100)) + + buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"]) + + # 鍙互瑙﹀彂涔帮紝褰撴湁娑ㄥ仠涔颁俊鍙锋椂鎵嶄細瑙﹀彂涔� + trigger_buy = True + # 闂撮殧鏈�澶ф椂闂翠负3s + max_space_time = 3 + for i in range(compute_start_index, compute_end_index + 1): + data = total_datas[i] + _val = total_datas[i]["val"] + trigger_buy = False + # 蹇呴』涓鸿繛缁�2绉掑唴鐨勬暟鎹� + if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds + 1 > max_space_time: + cls.__TradePointManager.delete_buy_point(code) + if i == compute_end_index: + # 鏁版嵁澶勭悊瀹屾瘯 + return None, buy_nums, buy_count, None, threshold_money + else: + # 璁$畻涔板叆淇″彿锛屼笉鑳藉悓涓�鏃堕棿寮�濮嬭绠� + for ii in range(buy_single_index + 1, compute_end_index + 1): + if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]: + return None, buy_nums, buy_count, ii, threshold_money + if L2DataUtil.is_sell(_val): + threshold_money += _val["num"] * int(float(_val["price"]) * 100) + threshold_num = round(threshold_money / (limit_up_price * 100)) + elif L2DataUtil.is_sell_cancel(_val): + threshold_money -= _val["num"] * int(float(_val["price"]) * 100) + threshold_num = round(threshold_money / (limit_up_price * 100)) + # 娑ㄥ仠涔� + elif L2DataUtil.is_limit_up_price_buy(_val): + trigger_buy = True + # 鍙粺璁�59涓囦互涓婄殑閲戦 + buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) + buy_count += int(total_datas[i]["re"]) + if buy_nums >= threshold_num: + async_log_util.info(logger_l2_trade_buy, + f"{code}鑾峰彇鍒颁拱鍏ユ墽琛岀偣(蹇�熶拱鍏�)锛歿i} 缁熻绾拱鎵嬫暟锛歿buy_nums} 鐩爣绾拱鎵嬫暟锛歿threshold_num} 缁熻绾拱鍗曟暟锛歿buy_count}") + elif L2DataUtil.is_limit_up_price_buy_cancel(_val): + # 鍒ゆ柇涔板叆浣嶇疆鏄惁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓� + buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i], + local_today_buyno_map.get( + code)) + if buy_index is not None: + # 鎵惧埌涔版挙鏁版嵁鐨勪拱鍏ョ偣 + if buy_index >= buy_single_index: + buy_nums -= int(_val["num"]) * int(data["re"]) + buy_count -= int(data["re"]) + l2_log.buy_debug(code, "{}鏁版嵁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍚� 鎾や拱绾拱鎵嬫暟锛歿} 鐩爣鎵嬫暟锛歿}", i, buy_nums, threshold_num) + else: + l2_log.buy_debug(code, "{}鏁版嵁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓嶏紝涔板叆浣嶏細{}", i, buy_index) + if total_datas[buy_single_index]["val"]["time"] == total_datas[buy_index]["val"]["time"]: + # 鍚屼竴绉�,褰撲綔涔板叆淇″彿涔嬪悗澶勭悊 + buy_nums -= int(_val["num"]) * int(data["re"]) + buy_count -= int(data["re"]) + # 澶у崟鎾ら攢 + l2_log.buy_debug(code, "{}鏁版嵁涔板叆浣嶄笌棰勪及涔板叆浣嶅湪鍚屼竴绉�", i) + else: + # 鏈壘鍒颁拱鎾ゆ暟鎹殑涔板叆鐐� + l2_log.buy_debug(code, "鏈壘鍒颁拱鎾ゆ暟鎹殑涔板叆鐐�: 浣嶇疆-{} 鏁版嵁-{}", i, data) + buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) + buy_count -= int(total_datas[i]["re"]) + l2_log.buy_debug(code, "浣嶇疆-{}锛屾�绘墜鏁帮細{}锛岀洰鏍囨墜鏁帮細{}", i, + buy_nums, threshold_num) + # 鏈夋挙鍗曚俊鍙凤紝涓斿皬浜庨槇鍊� + if buy_nums >= threshold_num and trigger_buy: + try: + info = cls.__trade_log_placr_order_info_dict[code] + info.set_trade_factor(threshold_money, 0, []) + except Exception as e: + async_log_util.error(logger_l2_error, f"璁板綍浜ゆ槗鍥犲瓙鍑洪敊锛歿str(e)}") + + return i, buy_nums, buy_count, None, threshold_money + + l2_log.buy_debug(code, "灏氭湭鑾峰彇鍒颁拱鍏ユ墽琛岀偣(蹇�熶拱鍏�)锛岃捣濮嬭绠椾綅缃細{} 缁熻绾拱鎵嬫暟锛歿} 鐩爣绾拱鎵嬫暟锛歿} 缁熻绾拱鍗曟暟锛歿}", + compute_start_index, + buy_nums, + threshold_num, buy_count) + + return None, buy_nums, buy_count, None, threshold_money + + +def test_trade_record(): + code = "000333" + __trade_log_placr_order_info_dict = {code: trade_record_log_util.PlaceOrderInfo()} + try: + jx_blocks, jx_blocks_by = KPLCodeJXBlockManager().get_jx_blocks_cache( + code), KPLCodeJXBlockManager().get_jx_blocks_cache(code, by=True) + info = __trade_log_placr_order_info_dict[code] + info.set_buy_index(0, 1) + if jx_blocks: + info.set_kpl_blocks(list(jx_blocks)) + elif jx_blocks_by: + info.set_kpl_blocks(list(jx_blocks_by)) + else: + info.set_kpl_blocks([]) + + trade_record_log_util.add_place_order_log(code, info) + except: + pass + if __name__ == "__main__": - yesterday_limit_up_data_records = kpl_data_manager.get_current_limit_up_data_records(1)[0][1] - yesterday_codes = set([x[0] for x in yesterday_limit_up_data_records]) - print(yesterday_codes) + # test_trade_record() + # yesterday_limit_up_data_records = kpl_data_manager.get_current_limit_up_data_records(1)[0][1] + # yesterday_codes = set([x[0] for x in yesterday_limit_up_data_records]) + # print(yesterday_codes) + code = "603003" + datas = log_export.load_l2_from_log() + datas = datas.get(code) + if datas is None: + datas = [] + l2.l2_data_util.local_today_datas[code] = datas[:191] + l2.l2_data_util.load_buy_no_map(l2.l2_data_util.local_today_buyno_map, code, + l2.l2_data_util.local_today_datas[code]) + l2.l2_data_util.load_canceled_buy_no_map(l2.l2_data_util.local_today_canceled_buyno_map, code, + l2.l2_data_util.local_today_datas[code]) + start_index = 73 + end_index = 190 + LCancelBigNumComputer().compute_watch_index(code, start_index, end_index) -- Gitblit v1.8.0