From a7a394e1525cfb85aff1ba02f0961dbb07748bc8 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 10 二月 2023 19:06:09 +0800 Subject: [PATCH] 日志优化,部分大单并行化处理 --- trade/trade_result_manager.py | 74 +++++-- log.py | 8 data_export_util.py | 2 l2/l2_data_util.py | 2 l2_data_util.py | 2 server.py | 16 + l2/l2_data_log.py | 5 juejin.py | 6 tool.py | 4 trade/trade_manager.py | 83 +++++++- l2/l2_log.py | 16 + l2_trade_test.py | 6 l2/cancel_buy_strategy.py | 55 +++-- l2/l2_data_manager_new.py | 241 +++++++++++++------------- 14 files changed, 305 insertions(+), 215 deletions(-) diff --git a/data_export_util.py b/data_export_util.py index 67227fb..0bdef92 100644 --- a/data_export_util.py +++ b/data_export_util.py @@ -193,6 +193,6 @@ if __name__ == "__main__": - codes = ["601890"] + codes = ["603083"] for code in codes: export_l2_excel(code) diff --git a/juejin.py b/juejin.py index e8c0bd2..c51f0fb 100644 --- a/juejin.py +++ b/juejin.py @@ -166,8 +166,8 @@ # 鑾峰彇鏈�鏂扮殑淇℃伅 def get_current_info(): - data = gpcode_manager.get_gp_list(); - results = JueJinManager.get_gp_current_info(data); + data = gpcode_manager.get_gp_list() + results = JueJinManager.get_gp_current_info(data) logger_juejin_tick.debug("瀹氭椂鑾峰彇锛歿}", results) for result in results: price = result["price"] @@ -184,7 +184,7 @@ def re_set_price_pres(codes): - result = JueJinManager.get_gp_latest_info(codes); + result = JueJinManager.get_gp_latest_info(codes) for item in result: symbol = item['symbol'] symbol = symbol.split(".")[1] diff --git a/l2/cancel_buy_strategy.py b/l2/cancel_buy_strategy.py index 35e9753..750ee5c 100644 --- a/l2/cancel_buy_strategy.py +++ b/l2/cancel_buy_strategy.py @@ -114,7 +114,7 @@ # 鍙畧鎶�30s if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30: return False, None - l2_log.cancel_debug(threadId, code, "S绾ф槸鍚﹂渶瑕佹挙鍗曪紝鏁版嵁鑼冨洿锛歿}-{} ", start_index, end_index) + l2_log.cancel_debug(code, "S绾ф槸鍚﹂渶瑕佹挙鍗曪紝鏁版嵁鑼冨洿锛歿}-{} ", start_index, end_index) logger_l2_s_cancel.debug(f"code-{code} S绾ф槸鍚﹂渶瑕佹挙鍗曪紝鏁版嵁鑼冨洿锛歿start_index}-{end_index}") if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30: @@ -202,7 +202,7 @@ return True, total_data[i] finally: - l2_log.cancel_debug(threadId, code, "S绾уぇ鍗� 鑼冨洿锛歿}-{} 鍙栨秷璁$畻缁撴灉:{}/{},姣斾緥锛歿}", start_index, end_index, cancel_num, + l2_log.cancel_debug( code, "S绾уぇ鍗� 鑼冨洿锛歿}-{} 鍙栨秷璁$畻缁撴灉:{}/{},姣斾緥锛歿}", start_index, end_index, cancel_num, buy_num, round(cancel_num / max(buy_num,1), 2)) # 淇濆瓨澶勭悊杩涘害涓庢暟鎹� @@ -288,12 +288,12 @@ total_nums += total_data[indexs[0]]["val"]["num"] * indexs[2] if watch_indexs is None: - l2_log.cancel_debug(threadId, code, "H鎾ゆ病鑾峰彇鍒扮洃鍚寖鍥存暟鎹�") + l2_log.cancel_debug(code, "H鎾ゆ病鑾峰彇鍒扮洃鍚寖鍥存暟鎹�") return False, None processed_index, cancel_num = cls.__get_compute_data(code) - l2_log.cancel_debug(threadId, code, "H绾ф槸鍚﹂渶瑕佹挙鍗曪紝鏁版嵁鑼冨洿锛歿}-{} ", start_index, end_index) + l2_log.cancel_debug( code, "H绾ф槸鍚﹂渶瑕佹挙鍗曪紝鏁版嵁鑼冨洿锛歿}-{} ", start_index, end_index) # 鑾峰彇涓嬪崟娆℃暟 place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code) cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE @@ -323,7 +323,7 @@ if cancel_num / total_nums > cancel_rate_threshold: return True, total_data[i] finally: - l2_log.cancel_debug(threadId, code, "H绾ф挙鍗曡绠楃粨鏋� 鑼冨洿锛歿}-{} 澶勭悊杩涘害锛歿} 鍙栨秷璁$畻缁撴灉:{}/{}", start_index, end_index, + l2_log.cancel_debug(code, "H绾ф挙鍗曡绠楃粨鏋� 鑼冨洿锛歿}-{} 澶勭悊杩涘害锛歿} 鍙栨秷璁$畻缁撴灉:{}/{}", start_index, end_index, process_index, cancel_num, total_nums) logger_l2_h_cancel.info(f"code-{code} H绾ф挙鍗曡绠楃粨鏋� 鑼冨洿锛歿start_index}-{end_index} 澶勭悊杩涘害锛歿process_index} 鐩爣姣斾緥锛歿cancel_rate_threshold} 鍙栨秷璁$畻缁撴灉:{cancel_num}/{total_nums}") @@ -428,7 +428,7 @@ if total_count >= safe_count: # and total_num >= threshold_num finished = True # 鏈�灏�8绗� - l2_log.cancel_debug(0, code, "鑾峰彇鍒癏鎾ょ洃鍚暟鎹細{},璁$畻鎴嚦浣嶇疆锛歿}", json.dumps(list(watch_set)), + l2_log.cancel_debug(code, "鑾峰彇鍒癏鎾ょ洃鍚暟鎹細{},璁$畻鎴嚦浣嶇疆锛歿}", json.dumps(list(watch_set)), total_data[-1]["index"]) break # 璁$畻TOP N澶у崟 @@ -439,7 +439,7 @@ final_watch_set = set.union(watch_set, top_n_watch_set) final_watch_list = list(final_watch_set) final_watch_list.sort(key=lambda x: x[0]) - logger_l2_h_cancel.info(f"code-{code} H鎾ゆ渶缁堢洃鎺уぇ鍗�:{final_watch_list}") + logger_l2_h_cancel.info(f"code-{code} 瀹夊叏绗旀暟锛歿safe_count} H鎾ゆ渶缁堢洃鎺уぇ鍗�:{final_watch_list}") # 淇濆瓨璁$畻鑼冨洿 cls.__save_watch_index_set(code, final_watch_set, process_index, finished) # 鍒犻櫎鍘熸潵鐨勮绠楁暟鎹� @@ -522,16 +522,31 @@ key = None # 鑾峰彇鐭鏃堕棿鍓�1鍒嗛挓鐨勬暟鎹� keys = [] - for i in range(0, 3600): - temp_time = tool.trade_time_add_second(time_str, 0 - i) - # 鍙鐞�9锛�30鍚庣殑鏁版嵁 - if int(temp_time.replace(":", "")) < int("093000"): - break - keys_ = cls.__get_l2_second_money_record_keys(code, temp_time.replace(":", "")) - if len(keys_) > 0: - keys.append(keys_[0]) - if len(keys) >= 1: - break + if not constant.TEST: + for i in range(0, 3600): + temp_time = tool.trade_time_add_second(time_str, 0 - i) + # 鍙鐞�9锛�30鍚庣殑鏁版嵁 + if int(temp_time.replace(":", "")) < int("093000"): + break + keys_ = cls.__get_l2_second_money_record_keys(code, temp_time.replace(":", "")) + if len(keys_) > 0: + keys.append(keys_[0]) + if len(keys) >= 1: + break + else: + keys_ = cls.__get_l2_second_money_record_keys(code, "*") + key_list=[] + for k in keys_: + time__ = k.split("-")[-1] + key_list.append((int(time__),k)) + key_list.sort(key=lambda tup: tup[0]) + for t in key_list: + if t[0] <= int(time_): + keys.append(t[1]) + break + + + keys.sort(key=lambda tup: int(tup.split("-")[-1])) if len(keys) > 0: key = keys[0] @@ -775,11 +790,11 @@ process_end_index = cancel_index # 淇濆瓨鏈�鏂扮疮璁¢噾棰� # cls.__set_l2_latest_money_record(code, process_end_index, total_num) - l2_data_log.l2_time(code, random_key, round(time.time() * 1000) - start_time, + l2_data_log.l2_time(code, round(time.time() * 1000) - start_time, "l2鏁版嵁灏佸崟棰濊绠楁椂闂�", False) if cancel_index: - l2_log.cancel_debug(random_key, code, "鏁版嵁澶勭悊浣嶇疆锛歿}-{}锛寋}锛屾渶缁堜拱1涓猴細{}", start_index, end_index, record_msg, + l2_log.cancel_debug(code, "鏁版嵁澶勭悊浣嶇疆锛歿}-{}锛寋}锛屾渶缁堜拱1涓猴細{}", start_index, end_index, record_msg, total_num) return total_datas[cancel_index], cancel_msg return None, None @@ -864,7 +879,7 @@ process_index = cancel_index else: process_index = end_index - l2_log.cancel_debug(random_key, code, "鏉夸笂鍗栦俊鎭細璁$畻浣嶇疆锛歿}-{} 鏉夸笂鍗栨暟鎹畕}/{}", start_index, end_index, total_num, + l2_log.cancel_debug(code, "鏉夸笂鍗栦俊鎭細璁$畻浣嶇疆锛歿}-{} 鏉夸笂鍗栨暟鎹畕}/{}", start_index, end_index, total_num, threshold_num) cls.__save_process_index(code, process_index) diff --git a/l2/l2_data_log.py b/l2/l2_data_log.py index 4a0745e..1da78cf 100644 --- a/l2/l2_data_log.py +++ b/l2/l2_data_log.py @@ -2,13 +2,14 @@ import time import log +from l2 import l2_log -def l2_time(code, do_id, time_, description, new_line=False, force=False): +def l2_time(code, time_, description, new_line=False, force=False): timestamp = int(time.time() * 1000) # 鍙褰曡�楁椂杈冮暱鐨勪俊鎭� if time_ > 1 or force: - log.logger_l2_process_time.info("{}-{} {}: {}-{}{}", do_id, timestamp, description, code, time_, + log.logger_l2_process_time.info("{}-{} {}: {}-{}{}", l2_log.threadIds.get(code), timestamp, description, code, time_, "\n" if new_line else "") return timestamp diff --git a/l2/l2_data_manager_new.py b/l2/l2_data_manager_new.py index d9cb6be..3d741bc 100644 --- a/l2/l2_data_manager_new.py +++ b/l2/l2_data_manager_new.py @@ -16,7 +16,7 @@ import tool from trade import trade_data_manager, trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \ trade_result_manager -from l2 import safe_count_manager, l2_data_manager, l2_data_log +from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \ L2LimitUpSellStatisticUtil from l2.l2_data_manager import L2DataException, TradePointManager @@ -161,20 +161,6 @@ __buyL2SafeCountManager = safe_count_manager.BuyL2SafeCountManager() @classmethod - def debug(cls, code, content, *args): - logger_l2_trade.debug(("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args)) - - @classmethod - def cancel_debug(cls, code, content, *args): - logger_l2_trade_cancel.debug( - ("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args)) - - @classmethod - def buy_debug(cls, code, content, *args): - logger_l2_trade_buy.debug( - ("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args)) - - @classmethod # 鏁版嵁澶勭悊鍏ュ彛 # datas: 鏈鎴浘鏁版嵁 # capture_timestamp:鎴浘鏃堕棿鎴� @@ -204,7 +190,7 @@ # 淇濆瓨鏁版嵁 __start_time = round(t.time() * 1000) l2.l2_data_util.save_l2_data(code, datas, add_datas, cls.random_key[code]) - __start_time = l2_data_log.l2_time(code, cls.random_key[code], + __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "淇濆瓨鏁版嵁鏃堕棿锛坽}锛�".format(len(add_datas))) finally: @@ -231,7 +217,7 @@ limit_up_time_manager.save_limit_up_time(code, "09:30:00") total_datas = local_today_datas[code] - __start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - __start_time, + __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2鏁版嵁棰勫鐞嗘椂闂�") if len(add_datas) > 0: @@ -254,7 +240,7 @@ logger_l2_process.info("code:{} 澶勭悊鏁版嵁鑼冨洿: {}-{} 澶勭悊鏃堕棿:{} 鎴浘鏃堕棿鎴筹細{}", code, add_datas[0]["index"], add_datas[-1]["index"], round(t.time() * 1000) - __start_time, capture_timestamp) - __start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - __start_time, + __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2鏁版嵁澶勭悊鏃堕棿") # 澶勭悊鏈寕鍗� @@ -264,7 +250,7 @@ # 鑾峰彇闃堝�� threshold_money, msg = cls.__get_threshmoney(code) if round(t.time() * 1000) - __start_time > 10: - __start_time = l2_data_log.l2_time(code, cls.random_key.get(code), round(t.time() * 1000) - __start_time, + __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "鑾峰彇m鍊兼暟鎹�楁椂") cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time) @@ -285,7 +271,7 @@ cls.__buyL2SafeCountManager.compute_left_rate(code, start_index, end_index, total_data, local_today_num_operate_map.get(code)) - l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, + l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-鑾峰彇涔板叆淇℃伅鑰楁椂") return None, "" @@ -296,7 +282,7 @@ # 璁$畻m鍊煎ぇ鍗� cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index, gpcode_manager.get_limit_up_price(code)) - l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, + l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-m鍊煎ぇ鍗曡绠�") return None, "" @@ -309,7 +295,7 @@ end_index, buy_single_index, buy_exec_index) - l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, + l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-涔�1缁熻鑰楁椂") return cancel_data, cancel_msg @@ -328,7 +314,7 @@ except Exception as e: logging.exception(e) finally: - l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, + l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-s绾уぇ鍗曚及绠�") return None, "" @@ -345,7 +331,7 @@ except Exception as e: logging.exception(e) finally: - l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-H鎾ゅぇ鍗曡绠�") + l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-H鎾ゅぇ鍗曡绠�") return None, "" # 鏉夸笂鍗栨挙 @@ -361,7 +347,7 @@ except Exception as e: logging.exception(e) finally: - l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-鏉夸笂鍗栬�楁椂") + l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-鏉夸笂鍗栬�楁椂") return None, "" # 鏄惁闇�瑕佹挙閿� @@ -384,7 +370,7 @@ if end_index < start_index: return total_data = local_today_datas.get(code) - _start_time = round(t.time() * 1000) + _start_time = tool.get_now_timestamp() # 鑾峰彇涔板叆淇″彿璧峰鐐� buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(code) @@ -397,18 +383,18 @@ dask_result = is_need_cancel(f1, f2, f3, f4, f5, f6) cancel_data, cancel_msg = dask_result.compute() - _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, + _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "宸蹭笅鍗�-鎾ゅ崟 鍒ゆ柇鏄惁闇�瑕佹挙鍗�") if cancel_data: - cls.debug(code, "瑙﹀彂鎾ゅ崟锛屾挙鍗曚綅缃細{} 锛屾挙鍗曞師鍥狅細{}", cancel_data["index"], cancel_msg) + l2_log.debug(code, "瑙﹀彂鎾ゅ崟锛屾挙鍗曚綅缃細{} 锛屾挙鍗曞師鍥狅細{}", cancel_data["index"], cancel_msg) # 鎾ゅ崟 if cls.cancel_buy(code, cancel_msg): - _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, + _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "宸蹭笅鍗�-鎾ゅ崟 鑰楁椂") # 鎾ゅ崟鎴愬姛锛岀户缁绠椾笅鍗� cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time) - _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, + _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "澶勭悊鍓╀綑鏁版嵁 鑰楁椂") else: # 鎾ゅ崟灏氭湭鎴愬姛 @@ -417,41 +403,43 @@ # 濡傛灉鏈夎櫄鎷熶笅鍗曢渶瑕佺湡瀹炰笅鍗� unreal_buy_info = cls.unreal_buy_dict.get(code) if unreal_buy_info is not None: - cls.debug(code, "鏈夎櫄鎷熶笅鍗曪紝鏃犱拱鎾や俊鍙凤紝寮�濮嬫墽琛屼拱鍏ワ紝鎵ц浣嶇疆锛歿},鎴浘鏃堕棿锛歿}", unreal_buy_info[0], capture_time) + l2_log.debug(code, "鏈夎櫄鎷熶笅鍗曪紝鏃犱拱鎾や俊鍙凤紝寮�濮嬫墽琛屼拱鍏ワ紝鎵ц浣嶇疆锛歿},鎴浘鏃堕棿锛歿}", unreal_buy_info[0], capture_time) # unreal_buy_info 鐨勫唴瀹规牸寮忎负锛�(瑙︽硶涔版搷浣滀笅鏍�,鎴浘鏃堕棿) # 鐪熷疄涓嬪崟 cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]], unreal_buy_info[0]) - _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, - "宸蹭笅鍗�-鐪熷疄涓嬪崟 鑰楁椂") + _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, + "宸茶櫄鎷熶笅鍗�-鎵ц鐪熷疄涓嬪崟 澶栭儴鑰楁椂") @classmethod def __buy(cls, code, capture_timestamp, last_data, last_data_index): + __start_time = tool.get_now_timestamp() can, reason = cls.__can_buy(code) + __start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - __start_time, "鏈�鍚庡垽鏂槸鍚﹁兘涓嬪崟", force=True) # 鍒犻櫎铏氭嫙涓嬪崟 if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) if not can: - cls.debug(code, "涓嶅彲浠ヤ笅鍗曪紝鍘熷洜锛歿}", reason) + l2_log.debug(code, "涓嶅彲浠ヤ笅鍗曪紝鍘熷洜锛歿}", reason) if not reason.startswith("涔�1浠蜂笉涓烘定鍋滀环"): # 涓柇涔板叆 trade_manager.break_buy(code, reason) return else: - cls.debug(code, "鍙互涓嬪崟锛屽師鍥狅細{}", reason) + l2_log.debug(code, "鍙互涓嬪崟锛屽師鍥狅細{}", reason) try: - cls.debug(code, "寮�濮嬫墽琛屼拱鍏�") + l2_log.debug(code, "寮�濮嬫墽琛屼拱鍏�") trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index) ################涓嬪崟鎴愬姛澶勭悊################ trade_result_manager.real_buy_success(code) - cls.debug(code, "鎵ц涔板叆鎴愬姛") + l2_log.debug(code, "鎵ц涔板叆鎴愬姛") except Exception as e: - cls.debug(code, "鎵ц涔板叆寮傚父:{}", str(e)) + l2_log.debug(code, "鎵ц涔板叆寮傚父:{}", str(e)) pass finally: - cls.debug(code, "m鍊煎奖鍝嶅洜瀛愶細{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code)) + l2_log.debug(code, "m鍊煎奖鍝嶅洜瀛愶細{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code)) # 鏄惁鍙互鍙栨秷 @classmethod @@ -502,7 +490,7 @@ total_datas = local_today_datas[code] try: sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code) - cls.buy_debug(code, "鍗�1淇℃伅涓猴細({},{},{})", sell1_time, sell1_price, sell1_volumn) + l2_log.buy_debug(code, "鍗�1淇℃伅涓猴細({},{},{})", sell1_time, sell1_price, sell1_volumn) if sell1_time is not None and sell1_volumn > 0: # 鑾峰彇鎵ц浣嶄俊鎭� @@ -591,18 +579,18 @@ # 鍙互涓嬪崟 return True, None finally: - l2_data_log.l2_time(code, cls.random_key[code], round((t.time() - __start_time) * 1000), "鏄惁鍙互涓嬪崟璁$畻") + l2_data_log.l2_time(code, round((t.time() - __start_time) * 1000), "鏄惁鍙互涓嬪崟璁$畻") @classmethod def __cancel_buy(cls, code): try: - cls.debug(code, "寮�濮嬫墽琛屾挙鍗�") + l2_log.debug(code, "寮�濮嬫墽琛屾挙鍗�") trade_manager.start_cancel_buy(code) - cls.debug(code, "鎵ц鎾ゅ崟鎴愬姛") + l2_log.debug(code, "鎵ц鎾ゅ崟鎴愬姛") return True except Exception as e: logging.exception(e) - cls.debug(code, "鎵ц鎾ゅ崟寮傚父锛歿}", str(e)) + l2_log.debug(code, "鎵ц鎾ゅ崟寮傚父锛歿}", str(e)) return False @classmethod @@ -626,13 +614,13 @@ can_cancel, reason = cls.__can_cancel(code) if not can_cancel: # 涓嶈兘鍙栨秷 - cls.cancel_debug(code, "鎾ゅ崟涓柇锛屽師鍥狅細{}", reason) - cls.debug(code, "鎾ゅ崟涓柇锛屽師鍥狅細{}", reason) + l2_log.cancel_debug(code, "鎾ゅ崟涓柇锛屽師鍥狅細{}", reason) + l2_log.debug(code, "鎾ゅ崟涓柇锛屽師鍥狅細{}", reason) return False cancel_result = cls.__cancel_buy(code) if cancel_result: trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index, total_datas) - cls.debug(code, "鎵ц鎾ゅ崟缁撴潫锛屽師鍥狅細{}", msg) + l2_log.debug(code, "鎵ц鎾ゅ崟缁撴潫锛屽師鍥狅細{}", msg) return True # 铏氭嫙涓嬪崟 @@ -646,7 +634,7 @@ new_add=True): if compute_end_index < compute_start_index: return - _start_time = round(t.time() * 1000) + _start_time = tool.get_now_timestamp() total_datas = local_today_datas[code] # 澶勭悊瀹夊叏绗旀暟 cls.__buyL2SafeCountManager.compute_left_rate(code, compute_start_index, compute_end_index, total_datas, @@ -657,6 +645,7 @@ code) # 鏄惁涓烘柊鑾峰彇鍒扮殑浣嶇疆 + new_get_single = False if buy_single_index is None: place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code) continue_count = 3 @@ -669,33 +658,46 @@ compute_end_index) buy_single_index = _index if has_single: + new_get_single = True num = 0 count = 0 - cls.debug(code, "鑾峰彇鍒颁拱鍏ヤ俊鍙疯捣濮嬬偣锛歿} ,璁$畻鑼冨洿锛歿}-{} 锛屾暟鎹細{}", buy_single_index, compute_start_index, - compute_end_index, total_datas[buy_single_index]) + l2_log.debug(code, "鑾峰彇鍒颁拱鍏ヤ俊鍙疯捣濮嬬偣锛歿} ,璁$畻鑼冨洿锛歿}-{} 锛屾暟鎹細{}", buy_single_index, compute_start_index, + compute_end_index, total_datas[buy_single_index]) # 濡傛灉鏄粖澶╃涓�娆℃湁涓嬪崟寮�濮嬩俊鍙凤紝闇�瑕佽缃ぇ鍗曡捣濮嬬偣 cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index) - _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "涓嬪崟淇″彿璁$畻鏃堕棿") + _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "涓嬪崟淇″彿璁$畻鏃堕棿") if buy_single_index is None: # 鏈幏鍙栧埌涔板叆淇″彿锛岀粓姝㈢▼搴� return None + # 寮�濮嬭绠楃殑浣嶇疆 + start_process_index = min(buy_single_index, compute_start_index) if new_get_single else max(buy_single_index, + compute_start_index) + # 璁$畻m鍊煎ぇ鍗� - cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index, + cls.l2BigNumForMProcessor.process(code, start_process_index, + compute_end_index, gpcode_manager.get_limit_up_price(code)) - _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "璁$畻m鍊煎ぇ鍗�") + _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "璁$畻m鍊煎ぇ鍗�") threshold_money, msg = cls.__get_threshmoney(code) - # 涔板叆绾拱棰濈粺璁� - compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code, max( - buy_single_index, compute_start_index), compute_end_index, num, count, threshold_money, buy_single_index, - max_num_set) - _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "绾拱棰濈粺璁℃椂闂�") - cls.debug(code, "m鍊�-{} m鍊煎洜瀛�-{}", threshold_money, msg) + _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "m鍊奸槇鍊艰绠�") + + # 涔板叆绾拱棰濈粺璁� + compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code, + start_process_index, + compute_end_index, + num, count, + threshold_money, + buy_single_index, + max_num_set) + _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "绾拱棰濈粺璁℃椂闂�") + + l2_log.debug(code, "m鍊�-{} m鍊煎洜瀛�-{}", threshold_money, msg) # 涔板叆淇″彿浣嶄笌璁$畻浣嶇疆闂撮殧2s鍙婁互涓婁簡 if rebegin_buy_pos is not None: @@ -704,25 +706,37 @@ return if compute_index is not None: - cls.debug(code, "鑾峰彇鍒颁拱鍏ユ墽琛屼綅缃細{} m鍊硷細{} 绾拱鎵嬫暟锛歿} 绾拱鍗曟暟锛歿} 鏁版嵁锛歿}", compute_index, threshold_money, buy_nums, - buy_count, - total_datas[compute_index]) - # 璁板綍涔板叆淇″彿浣嶇疆 - cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count, - max_num_set_new) - # 濡傛灉鏄粖澶╃涓�娆℃湁涓嬪崟鎵ц淇″彿锛屾定鍋滄椂闂达紙涔板叆鎵ц浣嶆椂闂达級 - limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"]) - # 铏氭嫙涓嬪崟 - cls.__virtual_buy(code, buy_single_index, compute_index, capture_time) - # 鍒犻櫎涔嬪墠鐨勬墍鏈夋挙鍗曚俊鍙� - l2_data_manager.TradePointManager.delete_buy_cancel_point(code) + l2_log.debug(code, "鑾峰彇鍒颁拱鍏ユ墽琛屼綅缃細{} m鍊硷細{} 绾拱鎵嬫暟锛歿} 绾拱鍗曟暟锛歿} 鏁版嵁锛歿}", compute_index, threshold_money, buy_nums, + buy_count, total_datas[compute_index]) - # 娑ㄥ仠灏佸崟棰濊绠� - L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, buy_single_index, compute_index, - buy_single_index, - buy_exec_index, False) + f1 = dask.delayed(cls.__save_order_begin_data)(code, buy_single_index, compute_index, compute_index, + buy_nums, buy_count, max_num_set_new) + f2 = dask.delayed(limit_up_time_manager.save_limit_up_time)(code, total_datas[compute_index]["val"]["time"]) + f3 = dask.delayed(cls.__virtual_buy)(code, buy_single_index, compute_index, capture_time) + f4 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code) + f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(cls.random_key[code], code, buy_single_index, + compute_index, + buy_single_index, + buy_exec_index, False) + dask.compute(f1, f2, f3, f4, f5) - _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, + # 宸茶骞惰澶勭悊 + # # 璁板綍涔板叆淇″彿浣嶇疆 + # cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count, + # max_num_set_new) + # # 濡傛灉鏄粖澶╃涓�娆℃湁涓嬪崟鎵ц淇″彿锛屾定鍋滄椂闂达紙涔板叆鎵ц浣嶆椂闂达級 + # limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"]) + # # 铏氭嫙涓嬪崟 + # cls.__virtual_buy(code, buy_single_index, compute_index, capture_time) + # # 鍒犻櫎涔嬪墠鐨勬墍鏈夋挙鍗曚俊鍙� + # l2_data_manager.TradePointManager.delete_buy_cancel_point(code) + # + # # 娑ㄥ仠灏佸崟棰濊绠� + # L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, buy_single_index, compute_index, + # buy_single_index, + # buy_exec_index, False) + + _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "璁板綍鎵ц涔板叆鏁版嵁", force=True) # 鏁版嵁鏄惁澶勭悊瀹屾瘯 @@ -732,9 +746,9 @@ buy_single_index, compute_index, total_datas, cls.random_key[code], True) - _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, + _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "S绾уぇ鍗曞鐞嗚�楁椂", force=True) - cls.debug(code, "鏁版嵁澶勭悊瀹屾瘯锛屼笅鍗�, 鏁版嵁鎴浘鏃堕棿-{}", capture_time) + l2_log.debug(code, "鏁版嵁澶勭悊瀹屾瘯锛屼笅鍗�, 鏁版嵁鎴浘鏃堕棿-{}", capture_time) # 鏁版嵁宸茬粡澶勭悊瀹屾瘯锛屽鏋滆繕娌℃挙鍗曞氨瀹為檯涓嬪崟 if need_cancel: if cls.cancel_buy(code, "S绾уぇ鍗曟挙閿�"): @@ -746,13 +760,13 @@ SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index, compute_index, total_datas, cls.random_key[code], False) - _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, + _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "S绾уぇ鍗曞鐞嗚�楁椂", force=True) # 鏁版嵁灏氭湭澶勭悊瀹屾瘯锛岃繘琛屼笅涓�姝ュ鐞� - cls.debug(code, "鏁版嵁灏氭湭澶勭悊瀹屾瘯锛岃繘琛屼笅涓�姝ュ鐞嗭紝澶勭悊杩涘害锛歿}", compute_index) + l2_log.debug(code, "鏁版嵁灏氭湭澶勭悊瀹屾瘯锛岃繘琛屼笅涓�姝ュ鐞嗭紝澶勭悊杩涘害锛歿}", compute_index) # 澶勭悊鎾ゅ崟姝ラ cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False) - _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, + _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, f"澶勭悊鎾ゅ崟姝ラ鑰楁椂锛岃寖鍥达細{compute_index + 1}-{compute_end_index}", force=True) else: @@ -841,23 +855,11 @@ def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count, threshold_money, buy_single_index, max_num_set): def get_threshold_count(): - count = threshold_count # - sub_threshold_count - # if count < 3: - # count = 3 - # count = round(count * buy1_factor) - # # 鏈�楂�30绗旓紝鏈�浣�8绗� - # if count > 21: - # count = 21 - # if count < 8: - # count = 8 + count = threshold_count return count _start_time = t.time() total_datas = local_today_datas[code] - # 璁$畻浠庝拱鍏ヤ俊鍙峰紑濮嬪埌璁$畻寮�濮嬩綅缃殑澶у崟鏁伴噺 - sub_threshold_count = cls.__compute_big_money_count(total_datas, buy_single_index, compute_start_index - 1) - if sub_threshold_count < 0: - sub_threshold_count = 0 buy_nums = origin_num buy_count = origin_count @@ -868,15 +870,6 @@ # 鐩爣鎵嬫暟 threshold_num = round(threshold_money / (limit_up_price * 100)) - buy1_factor = 1 - # 鑾峰彇涔�1鏄惁涓烘定鍋滀环 - if buy1_price is None: - buy1_factor = 1.3 - elif limit_up_price is None: - buy1_factor = 1.3 - elif abs(float(buy1_price) - float(limit_up_price)) >= 0.01: - print("涔�1浠蜂笉涓烘定鍋滀环锛屼拱1浠�-{} 娑ㄥ仠浠�-{}".format(buy1_price, limit_up_price)) - buy1_factor = 1.3 # 鐩爣璁㈠崟鏁伴噺 threshold_count = cls.__buyL2SafeCountManager.get_safe_count(code) @@ -899,6 +892,9 @@ # 绗竴娆′笅鍗曢渶瑕佸ぇ鍗曟渶灏�2绗旓紝浠ュ悗鍙渶瑕�1绗� big_num_count = 1 + # 杈冨ぇ鍗曠殑鎵嬫暟 + bigger_num = round(5900 / limit_up_price) + for i in range(compute_start_index, compute_end_index + 1): data = total_datas[i] _val = total_datas[i]["val"] @@ -917,22 +913,17 @@ # 娑ㄥ仠涔� if L2DataUtil.is_limit_up_price_buy(_val): if l2_data_util.is_big_money(_val): - # sub_threshold_count += int(total_datas[i]["re"]) max_buy_num_set.add(i) - if round(int(_val["num"]) * float(_val["price"])) >= 5900: + if _val["num"] >= bigger_num: trigger_buy = True # 鍙粺璁�59涓囦互涓婄殑閲戦 buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) buy_count += int(total_datas[i]["re"]) if buy_nums >= threshold_num and buy_count >= get_threshold_count(): - logger_l2_trade_buy.info("{}鑾峰彇鍒颁拱鍏ユ墽琛岀偣锛歿} 缁熻绾拱鎵嬫暟锛歿} 鐩爣绾拱鎵嬫暟锛歿} 缁熻绾拱鍗曟暟锛歿} 鐩爣绾拱鍗曟暟锛歿}, 澶у崟鏁伴噺锛歿}", code, - i, - buy_nums, - threshold_num, buy_count, get_threshold_count(), sub_threshold_count, ) + logger_l2_trade_buy.info( + f"{code}鑾峰彇鍒颁拱鍏ユ墽琛岀偣锛歿i} 缁熻绾拱鎵嬫暟锛歿buy_nums} 鐩爣绾拱鎵嬫暟锛歿threshold_num} 缁熻绾拱鍗曟暟锛歿buy_count} 鐩爣绾拱鍗曟暟锛歿get_threshold_count()}, 澶у崟鏁伴噺锛歿len(max_buy_num_set)}") elif L2DataUtil.is_limit_up_price_buy_cancel(_val): - if l2_data_util.is_big_money(_val): - sub_threshold_count -= int(total_datas[i]["re"]) - if round(int(_val["num"]) * float(_val["price"])) >= 5900: + if _val["num"] >= bigger_num: # 鍙粺璁�59涓囦互涓婄殑閲戦 # 娑ㄥ仠涔版挙 # 鍒ゆ柇涔板叆浣嶇疆鏄惁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓� @@ -944,31 +935,35 @@ if buy_index >= buy_single_index: buy_nums -= int(_val["num"]) * int(data["re"]) buy_count -= int(data["re"]) - cls.buy_debug(code, "{}鏁版嵁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍚� 鎾や拱绾拱鎵嬫暟锛歿} 鐩爣鎵嬫暟锛歿}", i, buy_nums, threshold_num) + # 澶у崟鎾ら攢 + max_buy_num_set.discard(buy_index) + l2_log.buy_debug(code, "{}鏁版嵁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍚� 鎾や拱绾拱鎵嬫暟锛歿} 鐩爣鎵嬫暟锛歿}", i, buy_nums, threshold_num) else: - cls.buy_debug(code, "{}鏁版嵁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓嶏紝涔板叆浣嶏細{}", i, buy_index) + l2_log.buy_debug(code, "{}鏁版嵁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓嶏紝涔板叆浣嶏細{}", i, buy_index) if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]: # 鍚屼竴绉�,褰撲綔涔板叆淇″彿涔嬪悗澶勭悊 buy_nums -= int(_val["num"]) * int(data["re"]) buy_count -= int(data["re"]) - cls.buy_debug(code, "{}鏁版嵁涔板叆浣嶄笌棰勪及涔板叆浣嶅湪鍚屼竴绉�", i) + # 澶у崟鎾ら攢 + max_buy_num_set.discard(buy_index) + l2_log.buy_debug(code, "{}鏁版嵁涔板叆浣嶄笌棰勪及涔板叆浣嶅湪鍚屼竴绉�", i) else: # 鏈壘鍒颁拱鎾ゆ暟鎹殑涔板叆鐐� - cls.buy_debug(code, "鏈壘鍒颁拱鎾ゆ暟鎹殑涔板叆鐐�: 浣嶇疆-{} 鏁版嵁-{}", i, data) + l2_log.buy_debug(code, "鏈壘鍒颁拱鎾ゆ暟鎹殑涔板叆鐐�: 浣嶇疆-{} 鏁版嵁-{}", i, data) buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) buy_count -= int(total_datas[i]["re"]) - cls.buy_debug(code, "浣嶇疆-{}锛屾�绘墜鏁帮細{}锛岀洰鏍囨墜鏁帮細{}", i, - buy_nums, threshold_num) + l2_log.buy_debug(code, "浣嶇疆-{}锛屾�绘墜鏁帮細{}锛岀洰鏍囨墜鏁帮細{}", i, + buy_nums, threshold_num) # 鏈夋挙鍗曚俊鍙凤紝涓斿皬浜庨槇鍊� if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and len( max_buy_num_set) >= big_num_count: return i, buy_nums, buy_count, None, max_buy_num_set - cls.buy_debug(code, "灏氭湭鑾峰彇鍒颁拱鍏ユ墽琛岀偣锛岃捣濮嬭绠椾綅缃細{} 缁熻绾拱鎵嬫暟锛歿} 鐩爣绾拱鎵嬫暟锛歿} 缁熻绾拱鍗曟暟锛歿} 鐩爣绾拱鍗曟暟锛歿} 澶у崟鏁伴噺锛歿} 鐩爣澶у崟鏁伴噺锛歿}", - compute_start_index, - buy_nums, - threshold_num, buy_count, get_threshold_count(), len(max_buy_num_set), big_num_count) + l2_log.buy_debug(code, "灏氭湭鑾峰彇鍒颁拱鍏ユ墽琛岀偣锛岃捣濮嬭绠椾綅缃細{} 缁熻绾拱鎵嬫暟锛歿} 鐩爣绾拱鎵嬫暟锛歿} 缁熻绾拱鍗曟暟锛歿} 鐩爣绾拱鍗曟暟锛歿} 澶у崟鏁伴噺锛歿} 鐩爣澶у崟鏁伴噺锛歿}", + compute_start_index, + buy_nums, + threshold_num, buy_count, get_threshold_count(), len(max_buy_num_set), big_num_count) return None, buy_nums, buy_count, None, max_buy_num_set @@ -1132,6 +1127,6 @@ # local_today_num_operate_map.get( # "600213")) # print(buy_index, buy_data) - dict_={"code":0} + dict_ = {"code": 0} dict_.clear() print(dict_) diff --git a/l2/l2_data_util.py b/l2/l2_data_util.py index 6546e9a..7c60dea 100644 --- a/l2/l2_data_util.py +++ b/l2/l2_data_util.py @@ -122,7 +122,7 @@ # 淇濆瓨鏈�杩戠殑鏁版嵁 __start_time = round(time.time() * 1000) redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas)) - l2_data_log.l2_time(code, randomKey, round(time.time() * 1000) - __start_time, "淇濆瓨鏈�杩憀2鏁版嵁鐢ㄦ椂") + l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "淇濆瓨鏈�杩憀2鏁版嵁鐢ㄦ椂") # 璁剧疆杩涘唴瀛� local_latest_datas[code] = datas __set_l2_data_latest_count(code, len(datas)) diff --git a/l2/l2_log.py b/l2/l2_log.py index c2e4174..48df92e 100644 --- a/l2/l2_log.py +++ b/l2/l2_log.py @@ -1,15 +1,17 @@ from log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_trade - -def debug(random_key, code, content, *args): - logger_l2_trade.debug(("thread-id={} code={} ".format(random_key, code) + content).format(*args)) +threadIds = {} -def buy_debug(random_key, code, content, *args): +def debug( code, content, *args): + logger_l2_trade.debug(("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args)) + + +def buy_debug(code, content, *args): logger_l2_trade_buy.debug( - ("thread-id={} code={} ".format(random_key, code) + content).format(*args)) + ("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args)) -def cancel_debug(random_key, code, content, *args): +def cancel_debug(code, content, *args): logger_l2_trade_cancel.debug( - ("thread-id={} code={} ".format(random_key, code) + content).format(*args)) + ("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args)) diff --git a/l2_data_util.py b/l2_data_util.py index eccea79..2f48827 100644 --- a/l2_data_util.py +++ b/l2_data_util.py @@ -28,7 +28,7 @@ # 鏄惁涓哄ぇ鍗� def is_big_money(val): price = float(val["price"]) - money = price * int(val["num"]) + money = price * val["num"] if price > 3.0: if money >= 30000: return True diff --git a/l2_trade_test.py b/l2_trade_test.py index 65a9873..1c93c3d 100644 --- a/l2_trade_test.py +++ b/l2_trade_test.py @@ -71,9 +71,9 @@ except Exception as e: pass - @unittest.skip("璺宠繃姝ゅ崟鍏冩祴璇�") + # @unittest.skip("璺宠繃姝ゅ崟鍏冩祴璇�") def test_trade(self): - code = "002328" + code = "002131" clear_trade_data(code) l2.l2_data_util.load_l2_data(code) total_datas = deepcopy(l2.l2_data_util.local_today_datas[code]) @@ -85,7 +85,7 @@ total_datas.insert(i, data) pos_list = log.get_l2_process_position(code) - pos_list.insert(108,(375,448)) + # pos_list.insert(108,(375,448)) if pos_list[0][0] > 0: pos_list.insert(0, (0, pos_list[0][0] - 1)) del pos_list[-1] diff --git a/log.py b/log.py index 8b62697..cace136 100644 --- a/log.py +++ b/log.py @@ -323,10 +323,10 @@ if line.find("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{}".format(code)) < 0: continue try: - index = int(line.split("index-")[1].split(" ")[0]) - index_list.append((index, time_)) + index = int(line.split("index-")[1].split(" ")[0]) + index_list.append((index, time_)) except: - pass + pass return index_list, buy_queues @@ -350,7 +350,7 @@ if __name__ == '__main__': # logger_l2_h_cancel.info("test") # logger_l2_process_time.info("test123") - codes = ["002328"] + codes = ["002131", "003035", "002131"] for code in codes: export_logs(code) diff --git a/server.py b/server.py index 5ee8e04..c0d9c50 100644 --- a/server.py +++ b/server.py @@ -19,7 +19,7 @@ import gpcode_manager import authority import juejin -from l2 import l2_data_manager_new, l2_data_manager, l2_data_log +from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log import l2_data_util from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil import l2.l2_data_util @@ -96,12 +96,14 @@ return_str = "OK" if type == 0: try: + origin_start_time = round(time.time() * 1000) __start_time = round(time.time() * 1000) do_id = random.randint(0, 100000) # level2鐩樺彛鏁版嵁 day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data( _str) + l2_log.threadIds[code] = random.randint(0, 100000) if channel == 0: now_time = round(time.time() * 1000) if self.last_time.get(channel) is not None: @@ -120,12 +122,12 @@ # 10ms鐨勭綉缁滀紶杈撳欢鏃� capture_timestamp = __start_time - process_time - 10 # print("鎴浘鏃堕棿锛�", process_time) - __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, + __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "鎴浘鏃堕棿锛歿} 鏁版嵁瑙f瀽鏃堕棿".format(process_time)) cid, pid = gpcode_manager.get_listen_code_pos(code) - __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, + __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "l2鑾峰彇浠g爜浣嶇疆鑰楁椂") # 鍒ゆ柇鐩爣浠g爜浣嶇疆鏄惁涓庝笂浼犳暟鎹綅缃竴鑷� if cid is not None and pid is not None and client == int(cid) and channel == int(pid): @@ -134,19 +136,19 @@ l2_code_operate.verify_with_l2_data_pos_info(code, client, channel) __start_time = round(time.time() * 1000) if gpcode_manager.is_listen(code): - __start_time = l2_data_log.l2_time(code, do_id, + __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "l2澶栭儴鏁版嵁棰勫鐞嗚�楁椂") l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp, do_id) - __start_time = l2_data_log.l2_time(code, do_id, + __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "l2鏁版嵁鏈夋晥澶勭悊澶栭儴鑰楁椂", False) # 淇濆瓨鍘熷鏁版嵁鏁伴噺 l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) if round(time.time() * 1000) - __start_time > 20: - l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, + l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "寮傛淇濆瓨鍘熷鏁版嵁鏉℃暟鑰楁椂", False) @@ -173,7 +175,7 @@ __end_time = round(time.time() * 1000) # 鍙褰曞ぇ浜�40ms鐨勬暟鎹� if __end_time - origin_start_time > 100: - l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time, + l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time, "l2鏁版嵁澶勭悊鎬昏�楁椂", True) except Exception as e: diff --git a/tool.py b/tool.py index d647715..9f911ab 100644 --- a/tool.py +++ b/tool.py @@ -44,6 +44,10 @@ return time_str +def get_now_timestamp(): + return round(time.time() * 1000) + + # 杞负浠锋牸锛屽洓鑸嶄簲鍏ヤ繚鐣�2浣嶅皬鏁� def to_price(_decimal): return _decimal.quantize(decimal.Decimal("0.00"), decimal.ROUND_HALF_UP) diff --git a/trade/trade_manager.py b/trade/trade_manager.py index 0306dea..4a5a7ac 100644 --- a/trade/trade_manager.py +++ b/trade/trade_manager.py @@ -5,11 +5,13 @@ # 浜ゆ槗绠$悊鍣� import time +import dask + from db import mysql_data, redis_manager from trade import trade_data_manager, l2_trade_util from trade.trade_gui import THSBuyWinManagerNew, THSGuiTrade import time as t -from l2 import l2_data_manager +from l2 import l2_data_manager, l2_data_log from log import * @@ -165,26 +167,72 @@ # 寮�濮嬩氦鏄� def start_buy(code, capture_timestamp, last_data, last_data_index): - # 鏄惁绂佹浜ゆ槗 - if l2_trade_util.is_in_forbidden_trade_codes(code): - raise Exception("绂佹浜ゆ槗") - trade_state = get_trade_state(code) - if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING: - raise Exception("浠g爜澶勪簬涓嶅彲浜ゆ槗鐘舵��") - money = get_available_money() - if money is None: - raise Exception("鏈幏鍙栧埌璐︽埛鍙敤璧勯噾") - price = gpcode_manager.get_limit_up_price(code) - if price is None: - raise Exception("灏氭湭鑾峰彇鍒版定鍋滀环") - # 涔颁竴鎵嬬殑璧勯噾鏄惁瓒冲 - if price * 100 > money: - raise Exception("璐︽埛鍙敤璧勯噾涓嶈冻") + @dask.delayed + def is_forbidden(code): + if l2_trade_util.is_in_forbidden_trade_codes(code): + return Exception("绂佹浜ゆ槗") + return None, None + + @dask.delayed + def is_state_right(code): + trade_state = get_trade_state(code) + if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING: + return Exception("浠g爜澶勪簬涓嶅彲浜ゆ槗鐘舵��"), trade_state + return None, trade_state + + @dask.delayed + def is_money_enough(code): + money = get_available_money() + if money is None: + return Exception("鏈幏鍙栧埌璐︽埛鍙敤璧勯噾"), None + price = gpcode_manager.get_limit_up_price(code) + if price is None: + return Exception("灏氭湭鑾峰彇鍒版定鍋滀环"), None + # 涔颁竴鎵嬬殑璧勯噾鏄惁瓒冲 + if price * 100 > money: + return Exception("璐︽埛鍙敤璧勯噾涓嶈冻"), price + return None, price + + @dask.delayed + def can_trade(*args): + for arg in args: + if arg[0] is not None: + return arg[0], None, None + return None, args[1][1], args[2][1] + + _start_time = tool.get_now_timestamp() + + f1 = is_forbidden(code) + f2 = is_state_right(code) + f3 = is_money_enough(code) + dask_result = can_trade(f1, f2, f3) + ex, trade_state, price = dask_result.compute() + if ex is not None: + raise ex + + # 骞惰鏀归�� + # # 鏄惁绂佹浜ゆ槗 + # if l2_trade_util.is_in_forbidden_trade_codes(code): + # raise Exception("绂佹浜ゆ槗") + # trade_state = get_trade_state(code) + # if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING: + # raise Exception("浠g爜澶勪簬涓嶅彲浜ゆ槗鐘舵��") + # money = get_available_money() + # if money is None: + # raise Exception("鏈幏鍙栧埌璐︽埛鍙敤璧勯噾") + # price = gpcode_manager.get_limit_up_price(code) + # if price is None: + # raise Exception("灏氭湭鑾峰彇鍒版定鍋滀环") + # # 涔颁竴鎵嬬殑璧勯噾鏄惁瓒冲 + # if price * 100 > money: + # raise Exception("璐︽埛鍙敤璧勯噾涓嶈冻") print("寮�濮嬩拱鍏�") logger_trade.info("{}寮�濮嬩拱鍏�".format(code)) set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER) + _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "涔板叆鍒ゆ柇鏃堕棿", force=True) __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index) + l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "寮傛涔板叆鏃堕棿", force=True) # 涓柇涔板叆 @@ -269,9 +317,10 @@ break except Exception as e: logger_trade.error("{}鍐嶆鎾ゅ崟寮傚父锛歿}".format(code, str(e))) - time.sleep(0.1+0.05*i) + time.sleep(0.1 + 0.05 * i) pass + # 鍙栨秷濮旀墭鎴愬姛 def __cancel_success(code): trade_data_manager.TradeBuyDataManager.remove_buy_position_info(code) diff --git a/trade/trade_result_manager.py b/trade/trade_result_manager.py index 317f007..c8ede05 100644 --- a/trade/trade_result_manager.py +++ b/trade/trade_result_manager.py @@ -1,6 +1,8 @@ # 铏氭嫙涔版垚鍔� import logging +import dask + from l2 import l2_data_manager from l2.cancel_buy_strategy import HourCancelBigNumComputer, SecondCancelBigNumComputer, L2LimitUpSellStatisticUtil from l2.l2_data_util import local_today_datas, local_today_num_operate_map @@ -22,41 +24,61 @@ # 铏氭嫙鎾ゆ垚鍔� def virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas): - l2_data_manager.TradePointManager.delete_buy_point(code) - l2_data_manager.TradePointManager.delete_buy_cancel_point(code) - l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code) - l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code) + f1 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_point)(code) + f2 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code) + f3 = dask.delayed(l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy)(code) + f4 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code) # 瀹夊叏绗旀暟璁$畻 - __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, total_datas[-1]["index"]) - SecondCancelBigNumComputer.cancel_success(code) + f5 = dask.delayed(__buyL2SafeCountManager.save_place_order_info)(code, buy_single_index, buy_exec_index, + total_datas[-1]["index"]) + f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code) + dask.compute(f1, f2, f3, f4, f5, f6) # 鐪熷疄涔版垚鍔� def real_buy_success(code): - # 涓嬪崟鎴愬姛锛岄渶瑕佸垹闄ゆ渶澶т拱1 - __thsBuy1VolumnManager.clear_max_buy1_volume(code) - # 鑾峰彇涔板叆浣嶇疆淇℃伅 - try: - buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data( - code) - __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, None) - HourCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index, - local_today_datas.get(code), - local_today_num_operate_map.get(code)) - except Exception as e: - logging.exception(e) - logger_l2_error.exception(e) + @dask.delayed + def clear_max_buy1_volume(code): + # 涓嬪崟鎴愬姛锛岄渶瑕佸垹闄ゆ渶澶т拱1 + __thsBuy1VolumnManager.clear_max_buy1_volume(code) + + @dask.delayed + def safe_count(code, buy_single_index, buy_exec_index): + try: + __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, None) + except Exception as e: + logging.exception(e) + logger_l2_error.exception(e) + + @dask.delayed + def h_cancel(code, buy_single_index, buy_exec_index): + try: + HourCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index, + local_today_datas.get(code), + local_today_num_operate_map.get(code)) + except Exception as e: + logging.exception(e) + logger_l2_error.exception(e) + + buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data( + code) + + f1 = clear_max_buy1_volume(code) + f2 = safe_count(code, buy_single_index, buy_exec_index) + f3 = h_cancel(code, buy_single_index, buy_exec_index) + dask.compute(f1, f2, f3) l2_data_manager.TradePointManager.delete_buy_cancel_point(code) # 鐪熷疄鎾ゆ垚鍔� def real_cancel_success(code, buy_single_index, buy_exec_index, total_datas): # 瀹夊叏绗旀暟璁$畻 - __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, total_datas[-1]["index"]) + f1 = dask.delayed(__buyL2SafeCountManager.save_place_order_info)(code, buy_single_index, buy_exec_index, + total_datas[-1]["index"]) # 鍙栨秷涔板叆鏍囪瘑 - l2_data_manager.TradePointManager.delete_buy_point(code) - l2_data_manager.TradePointManager.delete_buy_cancel_point(code) - l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code) - l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code) - - SecondCancelBigNumComputer.cancel_success(code) + f2 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_point)(code) + f3 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code) + f4 = dask.delayed(l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy)(code) + f5 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code) + f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code) + dask.compute(f1, f2, f3, f4, f5, f6) -- Gitblit v1.8.0