From a7a394e1525cfb85aff1ba02f0961dbb07748bc8 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期五, 10 二月 2023 19:06:09 +0800
Subject: [PATCH] 日志优化,部分大单并行化处理

---
 trade/trade_result_manager.py |   74 +++++--
 log.py                        |    8 
 data_export_util.py           |    2 
 l2/l2_data_util.py            |    2 
 l2_data_util.py               |    2 
 server.py                     |   16 +
 l2/l2_data_log.py             |    5 
 juejin.py                     |    6 
 tool.py                       |    4 
 trade/trade_manager.py        |   83 +++++++-
 l2/l2_log.py                  |   16 +
 l2_trade_test.py              |    6 
 l2/cancel_buy_strategy.py     |   55 +++--
 l2/l2_data_manager_new.py     |  241 +++++++++++++-------------
 14 files changed, 305 insertions(+), 215 deletions(-)

diff --git a/data_export_util.py b/data_export_util.py
index 67227fb..0bdef92 100644
--- a/data_export_util.py
+++ b/data_export_util.py
@@ -193,6 +193,6 @@
 
 
 if __name__ == "__main__":
-    codes = ["601890"]
+    codes = ["603083"]
     for code in codes:
         export_l2_excel(code)
diff --git a/juejin.py b/juejin.py
index e8c0bd2..c51f0fb 100644
--- a/juejin.py
+++ b/juejin.py
@@ -166,8 +166,8 @@
 
 # 鑾峰彇鏈�鏂扮殑淇℃伅
 def get_current_info():
-    data = gpcode_manager.get_gp_list();
-    results = JueJinManager.get_gp_current_info(data);
+    data = gpcode_manager.get_gp_list()
+    results = JueJinManager.get_gp_current_info(data)
     logger_juejin_tick.debug("瀹氭椂鑾峰彇锛歿}", results)
     for result in results:
         price = result["price"]
@@ -184,7 +184,7 @@
 
 
 def re_set_price_pres(codes):
-    result = JueJinManager.get_gp_latest_info(codes);
+    result = JueJinManager.get_gp_latest_info(codes)
     for item in result:
         symbol = item['symbol']
         symbol = symbol.split(".")[1]
diff --git a/l2/cancel_buy_strategy.py b/l2/cancel_buy_strategy.py
index 35e9753..750ee5c 100644
--- a/l2/cancel_buy_strategy.py
+++ b/l2/cancel_buy_strategy.py
@@ -114,7 +114,7 @@
         # 鍙畧鎶�30s
         if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
             return False, None
-        l2_log.cancel_debug(threadId, code, "S绾ф槸鍚﹂渶瑕佹挙鍗曪紝鏁版嵁鑼冨洿锛歿}-{} ", start_index, end_index)
+        l2_log.cancel_debug(code, "S绾ф槸鍚﹂渶瑕佹挙鍗曪紝鏁版嵁鑼冨洿锛歿}-{} ", start_index, end_index)
         logger_l2_s_cancel.debug(f"code-{code} S绾ф槸鍚﹂渶瑕佹挙鍗曪紝鏁版嵁鑼冨洿锛歿start_index}-{end_index}")
 
         if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
@@ -202,7 +202,7 @@
                             return True, total_data[i]
         finally:
 
-            l2_log.cancel_debug(threadId, code, "S绾уぇ鍗� 鑼冨洿锛歿}-{} 鍙栨秷璁$畻缁撴灉:{}/{},姣斾緥锛歿}", start_index, end_index, cancel_num,
+            l2_log.cancel_debug( code, "S绾уぇ鍗� 鑼冨洿锛歿}-{} 鍙栨秷璁$畻缁撴灉:{}/{},姣斾緥锛歿}", start_index, end_index, cancel_num,
                                 buy_num, round(cancel_num / max(buy_num,1), 2))
 
             # 淇濆瓨澶勭悊杩涘害涓庢暟鎹�
@@ -288,12 +288,12 @@
             total_nums += total_data[indexs[0]]["val"]["num"] * indexs[2]
 
         if watch_indexs is None:
-            l2_log.cancel_debug(threadId, code, "H鎾ゆ病鑾峰彇鍒扮洃鍚寖鍥存暟鎹�")
+            l2_log.cancel_debug(code, "H鎾ゆ病鑾峰彇鍒扮洃鍚寖鍥存暟鎹�")
             return False, None
 
         processed_index, cancel_num = cls.__get_compute_data(code)
 
-        l2_log.cancel_debug(threadId, code, "H绾ф槸鍚﹂渶瑕佹挙鍗曪紝鏁版嵁鑼冨洿锛歿}-{} ", start_index, end_index)
+        l2_log.cancel_debug( code, "H绾ф槸鍚﹂渶瑕佹挙鍗曪紝鏁版嵁鑼冨洿锛歿}-{} ", start_index, end_index)
         # 鑾峰彇涓嬪崟娆℃暟
         place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
         cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
@@ -323,7 +323,7 @@
                         if cancel_num / total_nums > cancel_rate_threshold:
                             return True, total_data[i]
         finally:
-            l2_log.cancel_debug(threadId, code, "H绾ф挙鍗曡绠楃粨鏋� 鑼冨洿锛歿}-{} 澶勭悊杩涘害锛歿} 鍙栨秷璁$畻缁撴灉:{}/{}", start_index, end_index,
+            l2_log.cancel_debug(code, "H绾ф挙鍗曡绠楃粨鏋� 鑼冨洿锛歿}-{} 澶勭悊杩涘害锛歿} 鍙栨秷璁$畻缁撴灉:{}/{}", start_index, end_index,
                                 process_index, cancel_num,
                                 total_nums)
             logger_l2_h_cancel.info(f"code-{code} H绾ф挙鍗曡绠楃粨鏋� 鑼冨洿锛歿start_index}-{end_index} 澶勭悊杩涘害锛歿process_index} 鐩爣姣斾緥锛歿cancel_rate_threshold} 鍙栨秷璁$畻缁撴灉:{cancel_num}/{total_nums}")
@@ -428,7 +428,7 @@
                 if total_count >= safe_count:  # and total_num >= threshold_num
                     finished = True
                     # 鏈�灏�8绗�
-                    l2_log.cancel_debug(0, code, "鑾峰彇鍒癏鎾ょ洃鍚暟鎹細{},璁$畻鎴嚦浣嶇疆锛歿}", json.dumps(list(watch_set)),
+                    l2_log.cancel_debug(code, "鑾峰彇鍒癏鎾ょ洃鍚暟鎹細{},璁$畻鎴嚦浣嶇疆锛歿}", json.dumps(list(watch_set)),
                                         total_data[-1]["index"])
                     break
         # 璁$畻TOP N澶у崟
@@ -439,7 +439,7 @@
         final_watch_set = set.union(watch_set, top_n_watch_set)
         final_watch_list = list(final_watch_set)
         final_watch_list.sort(key=lambda x: x[0])
-        logger_l2_h_cancel.info(f"code-{code}  H鎾ゆ渶缁堢洃鎺уぇ鍗�:{final_watch_list}")
+        logger_l2_h_cancel.info(f"code-{code} 瀹夊叏绗旀暟锛歿safe_count}  H鎾ゆ渶缁堢洃鎺уぇ鍗�:{final_watch_list}")
         # 淇濆瓨璁$畻鑼冨洿
         cls.__save_watch_index_set(code, final_watch_set, process_index, finished)
         # 鍒犻櫎鍘熸潵鐨勮绠楁暟鎹�
@@ -522,16 +522,31 @@
         key = None
         # 鑾峰彇鐭鏃堕棿鍓�1鍒嗛挓鐨勬暟鎹�
         keys = []
-        for i in range(0, 3600):
-            temp_time = tool.trade_time_add_second(time_str, 0 - i)
-            # 鍙鐞�9锛�30鍚庣殑鏁版嵁
-            if int(temp_time.replace(":", "")) < int("093000"):
-                break
-            keys_ = cls.__get_l2_second_money_record_keys(code, temp_time.replace(":", ""))
-            if len(keys_) > 0:
-                keys.append(keys_[0])
-            if len(keys) >= 1:
-                break
+        if not constant.TEST:
+            for i in range(0, 3600):
+                temp_time = tool.trade_time_add_second(time_str, 0 - i)
+                # 鍙鐞�9锛�30鍚庣殑鏁版嵁
+                if int(temp_time.replace(":", "")) < int("093000"):
+                    break
+                keys_ = cls.__get_l2_second_money_record_keys(code, temp_time.replace(":", ""))
+                if len(keys_) > 0:
+                    keys.append(keys_[0])
+                if len(keys) >= 1:
+                    break
+        else:
+            keys_ = cls.__get_l2_second_money_record_keys(code, "*")
+            key_list=[]
+            for k in keys_:
+                time__ = k.split("-")[-1]
+                key_list.append((int(time__),k))
+            key_list.sort(key=lambda tup: tup[0])
+            for t in key_list:
+                if t[0] <= int(time_):
+                    keys.append(t[1])
+                    break
+
+
+
         keys.sort(key=lambda tup: int(tup.split("-")[-1]))
         if len(keys) > 0:
             key = keys[0]
@@ -775,11 +790,11 @@
             process_end_index = cancel_index
         # 淇濆瓨鏈�鏂扮疮璁¢噾棰�
         # cls.__set_l2_latest_money_record(code, process_end_index, total_num)
-        l2_data_log.l2_time(code, random_key, round(time.time() * 1000) - start_time,
+        l2_data_log.l2_time(code, round(time.time() * 1000) - start_time,
                             "l2鏁版嵁灏佸崟棰濊绠楁椂闂�",
                             False)
         if cancel_index:
-            l2_log.cancel_debug(random_key, code, "鏁版嵁澶勭悊浣嶇疆锛歿}-{}锛寋}锛屾渶缁堜拱1涓猴細{}", start_index, end_index, record_msg,
+            l2_log.cancel_debug(code, "鏁版嵁澶勭悊浣嶇疆锛歿}-{}锛寋}锛屾渶缁堜拱1涓猴細{}", start_index, end_index, record_msg,
                                 total_num)
             return total_datas[cancel_index], cancel_msg
         return None, None
@@ -864,7 +879,7 @@
             process_index = cancel_index
         else:
             process_index = end_index
-        l2_log.cancel_debug(random_key, code, "鏉夸笂鍗栦俊鎭細璁$畻浣嶇疆锛歿}-{} 鏉夸笂鍗栨暟鎹畕}/{}", start_index, end_index, total_num,
+        l2_log.cancel_debug(code, "鏉夸笂鍗栦俊鎭細璁$畻浣嶇疆锛歿}-{} 鏉夸笂鍗栨暟鎹畕}/{}", start_index, end_index, total_num,
                             threshold_num)
 
         cls.__save_process_index(code, process_index)
diff --git a/l2/l2_data_log.py b/l2/l2_data_log.py
index 4a0745e..1da78cf 100644
--- a/l2/l2_data_log.py
+++ b/l2/l2_data_log.py
@@ -2,13 +2,14 @@
 import time
 
 import log
+from l2 import l2_log
 
 
-def l2_time(code, do_id, time_, description, new_line=False, force=False):
+def l2_time(code, time_, description, new_line=False, force=False):
     timestamp = int(time.time() * 1000)
     # 鍙褰曡�楁椂杈冮暱鐨勪俊鎭�
     if time_ > 1 or force:
-        log.logger_l2_process_time.info("{}-{} {}: {}-{}{}", do_id, timestamp, description, code, time_,
+        log.logger_l2_process_time.info("{}-{} {}: {}-{}{}", l2_log.threadIds.get(code), timestamp, description, code, time_,
                                         "\n" if new_line else "")
     return timestamp
 
diff --git a/l2/l2_data_manager_new.py b/l2/l2_data_manager_new.py
index d9cb6be..3d741bc 100644
--- a/l2/l2_data_manager_new.py
+++ b/l2/l2_data_manager_new.py
@@ -16,7 +16,7 @@
 import tool
 from trade import trade_data_manager, trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \
     trade_result_manager
-from l2 import safe_count_manager, l2_data_manager, l2_data_log
+from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log
 from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \
     L2LimitUpSellStatisticUtil
 from l2.l2_data_manager import L2DataException, TradePointManager
@@ -161,20 +161,6 @@
     __buyL2SafeCountManager = safe_count_manager.BuyL2SafeCountManager()
 
     @classmethod
-    def debug(cls, code, content, *args):
-        logger_l2_trade.debug(("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
-
-    @classmethod
-    def cancel_debug(cls, code, content, *args):
-        logger_l2_trade_cancel.debug(
-            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
-
-    @classmethod
-    def buy_debug(cls, code, content, *args):
-        logger_l2_trade_buy.debug(
-            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
-
-    @classmethod
     # 鏁版嵁澶勭悊鍏ュ彛
     # datas: 鏈鎴浘鏁版嵁
     # capture_timestamp:鎴浘鏃堕棿鎴�
@@ -204,7 +190,7 @@
                     # 淇濆瓨鏁版嵁
                     __start_time = round(t.time() * 1000)
                     l2.l2_data_util.save_l2_data(code, datas, add_datas, cls.random_key[code])
-                    __start_time = l2_data_log.l2_time(code, cls.random_key[code],
+                    __start_time = l2_data_log.l2_time(code,
                                                        round(t.time() * 1000) - __start_time,
                                                        "淇濆瓨鏁版嵁鏃堕棿锛坽}锛�".format(len(add_datas)))
         finally:
@@ -231,7 +217,7 @@
                         limit_up_time_manager.save_limit_up_time(code, "09:30:00")
 
         total_datas = local_today_datas[code]
-        __start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - __start_time,
+        __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                            "l2鏁版嵁棰勫鐞嗘椂闂�")
 
         if len(add_datas) > 0:
@@ -254,7 +240,7 @@
             logger_l2_process.info("code:{} 澶勭悊鏁版嵁鑼冨洿: {}-{} 澶勭悊鏃堕棿:{} 鎴浘鏃堕棿鎴筹細{}", code, add_datas[0]["index"],
                                    add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                    capture_timestamp)
-            __start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - __start_time,
+            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                                "l2鏁版嵁澶勭悊鏃堕棿")
 
     # 澶勭悊鏈寕鍗�
@@ -264,7 +250,7 @@
         # 鑾峰彇闃堝��
         threshold_money, msg = cls.__get_threshmoney(code)
         if round(t.time() * 1000) - __start_time > 10:
-            __start_time = l2_data_log.l2_time(code, cls.random_key.get(code), round(t.time() * 1000) - __start_time,
+            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                                "鑾峰彇m鍊兼暟鎹�楁椂")
 
         cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
@@ -285,7 +271,7 @@
             cls.__buyL2SafeCountManager.compute_left_rate(code, start_index, end_index, total_data,
                                                           local_today_num_operate_map.get(code))
 
-            l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
+            l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                 "宸蹭笅鍗�-鑾峰彇涔板叆淇℃伅鑰楁椂")
             return None, ""
 
@@ -296,7 +282,7 @@
             # 璁$畻m鍊煎ぇ鍗�
             cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                               gpcode_manager.get_limit_up_price(code))
-            l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
+            l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                 "宸蹭笅鍗�-m鍊煎ぇ鍗曡绠�")
             return None, ""
 
@@ -309,7 +295,7 @@
                                                                                end_index,
                                                                                buy_single_index, buy_exec_index)
 
-            l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
+            l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                 "宸蹭笅鍗�-涔�1缁熻鑰楁椂")
             return cancel_data, cancel_msg
 
@@ -328,7 +314,7 @@
             except Exception as e:
                 logging.exception(e)
             finally:
-                l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
+                l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                     "宸蹭笅鍗�-s绾уぇ鍗曚及绠�")
             return None, ""
 
@@ -345,7 +331,7 @@
             except Exception as e:
                 logging.exception(e)
             finally:
-                l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-H鎾ゅぇ鍗曡绠�")
+                l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-H鎾ゅぇ鍗曡绠�")
             return None, ""
 
         # 鏉夸笂鍗栨挙
@@ -361,7 +347,7 @@
             except Exception as e:
                 logging.exception(e)
             finally:
-                l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-鏉夸笂鍗栬�楁椂")
+                l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "宸蹭笅鍗�-鏉夸笂鍗栬�楁椂")
             return None, ""
 
         # 鏄惁闇�瑕佹挙閿�
@@ -384,7 +370,7 @@
         if end_index < start_index:
             return
         total_data = local_today_datas.get(code)
-        _start_time = round(t.time() * 1000)
+        _start_time = tool.get_now_timestamp()
         # 鑾峰彇涔板叆淇″彿璧峰鐐�
         buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(code)
 
@@ -397,18 +383,18 @@
         dask_result = is_need_cancel(f1, f2, f3, f4, f5, f6)
         cancel_data, cancel_msg = dask_result.compute()
 
-        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
+        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                           "宸蹭笅鍗�-鎾ゅ崟 鍒ゆ柇鏄惁闇�瑕佹挙鍗�")
 
         if cancel_data:
-            cls.debug(code, "瑙﹀彂鎾ゅ崟锛屾挙鍗曚綅缃細{} 锛屾挙鍗曞師鍥狅細{}", cancel_data["index"], cancel_msg)
+            l2_log.debug(code, "瑙﹀彂鎾ゅ崟锛屾挙鍗曚綅缃細{} 锛屾挙鍗曞師鍥狅細{}", cancel_data["index"], cancel_msg)
             # 鎾ゅ崟
             if cls.cancel_buy(code, cancel_msg):
-                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
+                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                   "宸蹭笅鍗�-鎾ゅ崟 鑰楁椂")
                 # 鎾ゅ崟鎴愬姛锛岀户缁绠椾笅鍗�
                 cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
-                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
+                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                   "澶勭悊鍓╀綑鏁版嵁 鑰楁椂")
             else:
                 # 鎾ゅ崟灏氭湭鎴愬姛
@@ -417,41 +403,43 @@
             # 濡傛灉鏈夎櫄鎷熶笅鍗曢渶瑕佺湡瀹炰笅鍗�
             unreal_buy_info = cls.unreal_buy_dict.get(code)
             if unreal_buy_info is not None:
-                cls.debug(code, "鏈夎櫄鎷熶笅鍗曪紝鏃犱拱鎾や俊鍙凤紝寮�濮嬫墽琛屼拱鍏ワ紝鎵ц浣嶇疆锛歿},鎴浘鏃堕棿锛歿}", unreal_buy_info[0], capture_time)
+                l2_log.debug(code, "鏈夎櫄鎷熶笅鍗曪紝鏃犱拱鎾や俊鍙凤紝寮�濮嬫墽琛屼拱鍏ワ紝鎵ц浣嶇疆锛歿},鎴浘鏃堕棿锛歿}", unreal_buy_info[0], capture_time)
                 # unreal_buy_info 鐨勫唴瀹规牸寮忎负锛�(瑙︽硶涔版搷浣滀笅鏍�,鎴浘鏃堕棿)
                 # 鐪熷疄涓嬪崟
                 cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                           unreal_buy_info[0])
-                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
-                                                  "宸蹭笅鍗�-鐪熷疄涓嬪崟 鑰楁椂")
+                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
+                                                  "宸茶櫄鎷熶笅鍗�-鎵ц鐪熷疄涓嬪崟 澶栭儴鑰楁椂")
 
     @classmethod
     def __buy(cls, code, capture_timestamp, last_data, last_data_index):
+        __start_time = tool.get_now_timestamp()
         can, reason = cls.__can_buy(code)
+        __start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - __start_time, "鏈�鍚庡垽鏂槸鍚﹁兘涓嬪崟", force=True)
         # 鍒犻櫎铏氭嫙涓嬪崟
         if code in cls.unreal_buy_dict:
             cls.unreal_buy_dict.pop(code)
 
         if not can:
-            cls.debug(code, "涓嶅彲浠ヤ笅鍗曪紝鍘熷洜锛歿}", reason)
+            l2_log.debug(code, "涓嶅彲浠ヤ笅鍗曪紝鍘熷洜锛歿}", reason)
             if not reason.startswith("涔�1浠蜂笉涓烘定鍋滀环"):
                 # 涓柇涔板叆
                 trade_manager.break_buy(code, reason)
             return
         else:
-            cls.debug(code, "鍙互涓嬪崟锛屽師鍥狅細{}", reason)
+            l2_log.debug(code, "鍙互涓嬪崟锛屽師鍥狅細{}", reason)
             try:
-                cls.debug(code, "寮�濮嬫墽琛屼拱鍏�")
+                l2_log.debug(code, "寮�濮嬫墽琛屼拱鍏�")
                 trade_manager.start_buy(code, capture_timestamp, last_data,
                                         last_data_index)
                 ################涓嬪崟鎴愬姛澶勭悊################
                 trade_result_manager.real_buy_success(code)
-                cls.debug(code, "鎵ц涔板叆鎴愬姛")
+                l2_log.debug(code, "鎵ц涔板叆鎴愬姛")
             except Exception as e:
-                cls.debug(code, "鎵ц涔板叆寮傚父:{}", str(e))
+                l2_log.debug(code, "鎵ц涔板叆寮傚父:{}", str(e))
                 pass
             finally:
-                cls.debug(code, "m鍊煎奖鍝嶅洜瀛愶細{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
+                l2_log.debug(code, "m鍊煎奖鍝嶅洜瀛愶細{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
 
     # 鏄惁鍙互鍙栨秷
     @classmethod
@@ -502,7 +490,7 @@
             total_datas = local_today_datas[code]
             try:
                 sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code)
-                cls.buy_debug(code, "鍗�1淇℃伅涓猴細({},{},{})", sell1_time, sell1_price, sell1_volumn)
+                l2_log.buy_debug(code, "鍗�1淇℃伅涓猴細({},{},{})", sell1_time, sell1_price, sell1_volumn)
                 if sell1_time is not None and sell1_volumn > 0:
                     # 鑾峰彇鎵ц浣嶄俊鎭�
 
@@ -591,18 +579,18 @@
             # 鍙互涓嬪崟
             return True, None
         finally:
-            l2_data_log.l2_time(code, cls.random_key[code], round((t.time() - __start_time) * 1000), "鏄惁鍙互涓嬪崟璁$畻")
+            l2_data_log.l2_time(code, round((t.time() - __start_time) * 1000), "鏄惁鍙互涓嬪崟璁$畻")
 
     @classmethod
     def __cancel_buy(cls, code):
         try:
-            cls.debug(code, "寮�濮嬫墽琛屾挙鍗�")
+            l2_log.debug(code, "寮�濮嬫墽琛屾挙鍗�")
             trade_manager.start_cancel_buy(code)
-            cls.debug(code, "鎵ц鎾ゅ崟鎴愬姛")
+            l2_log.debug(code, "鎵ц鎾ゅ崟鎴愬姛")
             return True
         except Exception as e:
             logging.exception(e)
-            cls.debug(code, "鎵ц鎾ゅ崟寮傚父锛歿}", str(e))
+            l2_log.debug(code, "鎵ц鎾ゅ崟寮傚父锛歿}", str(e))
             return False
 
     @classmethod
@@ -626,13 +614,13 @@
             can_cancel, reason = cls.__can_cancel(code)
             if not can_cancel:
                 # 涓嶈兘鍙栨秷
-                cls.cancel_debug(code, "鎾ゅ崟涓柇锛屽師鍥狅細{}", reason)
-                cls.debug(code, "鎾ゅ崟涓柇锛屽師鍥狅細{}", reason)
+                l2_log.cancel_debug(code, "鎾ゅ崟涓柇锛屽師鍥狅細{}", reason)
+                l2_log.debug(code, "鎾ゅ崟涓柇锛屽師鍥狅細{}", reason)
                 return False
             cancel_result = cls.__cancel_buy(code)
             if cancel_result:
                 trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index, total_datas)
-        cls.debug(code, "鎵ц鎾ゅ崟缁撴潫锛屽師鍥狅細{}", msg)
+        l2_log.debug(code, "鎵ц鎾ゅ崟缁撴潫锛屽師鍥狅細{}", msg)
         return True
 
     # 铏氭嫙涓嬪崟
@@ -646,7 +634,7 @@
                             new_add=True):
         if compute_end_index < compute_start_index:
             return
-        _start_time = round(t.time() * 1000)
+        _start_time = tool.get_now_timestamp()
         total_datas = local_today_datas[code]
         # 澶勭悊瀹夊叏绗旀暟
         cls.__buyL2SafeCountManager.compute_left_rate(code, compute_start_index, compute_end_index, total_datas,
@@ -657,6 +645,7 @@
             code)
 
         # 鏄惁涓烘柊鑾峰彇鍒扮殑浣嶇疆
+        new_get_single = False
         if buy_single_index is None:
             place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
             continue_count = 3
@@ -669,33 +658,46 @@
                                                                compute_end_index)
             buy_single_index = _index
             if has_single:
+                new_get_single = True
                 num = 0
                 count = 0
-                cls.debug(code, "鑾峰彇鍒颁拱鍏ヤ俊鍙疯捣濮嬬偣锛歿} ,璁$畻鑼冨洿锛歿}-{} 锛屾暟鎹細{}", buy_single_index, compute_start_index,
-                          compute_end_index, total_datas[buy_single_index])
+                l2_log.debug(code, "鑾峰彇鍒颁拱鍏ヤ俊鍙疯捣濮嬬偣锛歿} ,璁$畻鑼冨洿锛歿}-{} 锛屾暟鎹細{}", buy_single_index, compute_start_index,
+                             compute_end_index, total_datas[buy_single_index])
                 # 濡傛灉鏄粖澶╃涓�娆℃湁涓嬪崟寮�濮嬩俊鍙凤紝闇�瑕佽缃ぇ鍗曡捣濮嬬偣
                 cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index)
 
-        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "涓嬪崟淇″彿璁$畻鏃堕棿")
+        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "涓嬪崟淇″彿璁$畻鏃堕棿")
 
         if buy_single_index is None:
             # 鏈幏鍙栧埌涔板叆淇″彿锛岀粓姝㈢▼搴�
             return None
 
+        # 寮�濮嬭绠楃殑浣嶇疆
+        start_process_index = min(buy_single_index, compute_start_index) if new_get_single else max(buy_single_index,
+                                                                                                    compute_start_index)
+
         # 璁$畻m鍊煎ぇ鍗�
-        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index,
+        cls.l2BigNumForMProcessor.process(code, start_process_index,
+                                          compute_end_index,
                                           gpcode_manager.get_limit_up_price(code))
 
-        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "璁$畻m鍊煎ぇ鍗�")
+        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "璁$畻m鍊煎ぇ鍗�")
 
         threshold_money, msg = cls.__get_threshmoney(code)
-        # 涔板叆绾拱棰濈粺璁�
-        compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code, max(
-            buy_single_index, compute_start_index), compute_end_index, num, count, threshold_money, buy_single_index,
-                                                                                                             max_num_set)
-        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "绾拱棰濈粺璁℃椂闂�")
 
-        cls.debug(code, "m鍊�-{} m鍊煎洜瀛�-{}", threshold_money, msg)
+        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "m鍊奸槇鍊艰绠�")
+
+        # 涔板叆绾拱棰濈粺璁�
+        compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code,
+                                                                                                             start_process_index,
+                                                                                                             compute_end_index,
+                                                                                                             num, count,
+                                                                                                             threshold_money,
+                                                                                                             buy_single_index,
+                                                                                                             max_num_set)
+        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "绾拱棰濈粺璁℃椂闂�")
+
+        l2_log.debug(code, "m鍊�-{} m鍊煎洜瀛�-{}", threshold_money, msg)
 
         # 涔板叆淇″彿浣嶄笌璁$畻浣嶇疆闂撮殧2s鍙婁互涓婁簡
         if rebegin_buy_pos is not None:
@@ -704,25 +706,37 @@
             return
 
         if compute_index is not None:
-            cls.debug(code, "鑾峰彇鍒颁拱鍏ユ墽琛屼綅缃細{} m鍊硷細{} 绾拱鎵嬫暟锛歿} 绾拱鍗曟暟锛歿} 鏁版嵁锛歿}", compute_index, threshold_money, buy_nums,
-                      buy_count,
-                      total_datas[compute_index])
-            # 璁板綍涔板叆淇″彿浣嶇疆
-            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count,
-                                        max_num_set_new)
-            # 濡傛灉鏄粖澶╃涓�娆℃湁涓嬪崟鎵ц淇″彿锛屾定鍋滄椂闂达紙涔板叆鎵ц浣嶆椂闂达級
-            limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"])
-            # 铏氭嫙涓嬪崟
-            cls.__virtual_buy(code, buy_single_index, compute_index, capture_time)
-            # 鍒犻櫎涔嬪墠鐨勬墍鏈夋挙鍗曚俊鍙�
-            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
+            l2_log.debug(code, "鑾峰彇鍒颁拱鍏ユ墽琛屼綅缃細{} m鍊硷細{} 绾拱鎵嬫暟锛歿} 绾拱鍗曟暟锛歿} 鏁版嵁锛歿}", compute_index, threshold_money, buy_nums,
+                         buy_count, total_datas[compute_index])
 
-            # 娑ㄥ仠灏佸崟棰濊绠�
-            L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, buy_single_index, compute_index,
-                                                     buy_single_index,
-                                                     buy_exec_index, False)
+            f1 = dask.delayed(cls.__save_order_begin_data)(code, buy_single_index, compute_index, compute_index,
+                                                           buy_nums, buy_count, max_num_set_new)
+            f2 = dask.delayed(limit_up_time_manager.save_limit_up_time)(code, total_datas[compute_index]["val"]["time"])
+            f3 = dask.delayed(cls.__virtual_buy)(code, buy_single_index, compute_index, capture_time)
+            f4 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
+            f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(cls.random_key[code], code, buy_single_index,
+                                                                        compute_index,
+                                                                        buy_single_index,
+                                                                        buy_exec_index, False)
+            dask.compute(f1, f2, f3, f4, f5)
 
-            _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
+            # 宸茶骞惰澶勭悊
+            # # 璁板綍涔板叆淇″彿浣嶇疆
+            # cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count,
+            #                             max_num_set_new)
+            # # 濡傛灉鏄粖澶╃涓�娆℃湁涓嬪崟鎵ц淇″彿锛屾定鍋滄椂闂达紙涔板叆鎵ц浣嶆椂闂达級
+            # limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"])
+            # # 铏氭嫙涓嬪崟
+            # cls.__virtual_buy(code, buy_single_index, compute_index, capture_time)
+            # # 鍒犻櫎涔嬪墠鐨勬墍鏈夋挙鍗曚俊鍙�
+            # l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
+            #
+            # # 娑ㄥ仠灏佸崟棰濊绠�
+            # L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, buy_single_index, compute_index,
+            #                                          buy_single_index,
+            #                                          buy_exec_index, False)
+
+            _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                               "璁板綍鎵ц涔板叆鏁版嵁", force=True)
 
             # 鏁版嵁鏄惁澶勭悊瀹屾瘯
@@ -732,9 +746,9 @@
                                                                                   buy_single_index, compute_index,
                                                                                   total_datas, cls.random_key[code],
                                                                                   True)
-                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
+                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                   "S绾уぇ鍗曞鐞嗚�楁椂", force=True)
-                cls.debug(code, "鏁版嵁澶勭悊瀹屾瘯锛屼笅鍗�, 鏁版嵁鎴浘鏃堕棿-{}", capture_time)
+                l2_log.debug(code, "鏁版嵁澶勭悊瀹屾瘯锛屼笅鍗�, 鏁版嵁鎴浘鏃堕棿-{}", capture_time)
                 # 鏁版嵁宸茬粡澶勭悊瀹屾瘯锛屽鏋滆繕娌℃挙鍗曞氨瀹為檯涓嬪崟
                 if need_cancel:
                     if cls.cancel_buy(code, "S绾уぇ鍗曟挙閿�"):
@@ -746,13 +760,13 @@
                 SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                        compute_index, total_datas, cls.random_key[code], False)
 
-                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
+                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                   "S绾уぇ鍗曞鐞嗚�楁椂", force=True)
                 # 鏁版嵁灏氭湭澶勭悊瀹屾瘯锛岃繘琛屼笅涓�姝ュ鐞�
-                cls.debug(code, "鏁版嵁灏氭湭澶勭悊瀹屾瘯锛岃繘琛屼笅涓�姝ュ鐞嗭紝澶勭悊杩涘害锛歿}", compute_index)
+                l2_log.debug(code, "鏁版嵁灏氭湭澶勭悊瀹屾瘯锛岃繘琛屼笅涓�姝ュ鐞嗭紝澶勭悊杩涘害锛歿}", compute_index)
                 # 澶勭悊鎾ゅ崟姝ラ
                 cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
-                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
+                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                   f"澶勭悊鎾ゅ崟姝ラ鑰楁椂锛岃寖鍥达細{compute_index + 1}-{compute_end_index}", force=True)
 
         else:
@@ -841,23 +855,11 @@
     def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                                   threshold_money, buy_single_index, max_num_set):
         def get_threshold_count():
-            count = threshold_count  # - sub_threshold_count
-            # if count < 3:
-            #     count = 3
-            # count = round(count * buy1_factor)
-            # # 鏈�楂�30绗旓紝鏈�浣�8绗�
-            # if count > 21:
-            #     count = 21
-            # if count < 8:
-            #     count = 8
+            count = threshold_count
             return count
 
         _start_time = t.time()
         total_datas = local_today_datas[code]
-        # 璁$畻浠庝拱鍏ヤ俊鍙峰紑濮嬪埌璁$畻寮�濮嬩綅缃殑澶у崟鏁伴噺
-        sub_threshold_count = cls.__compute_big_money_count(total_datas, buy_single_index, compute_start_index - 1)
-        if sub_threshold_count < 0:
-            sub_threshold_count = 0
 
         buy_nums = origin_num
         buy_count = origin_count
@@ -868,15 +870,6 @@
         # 鐩爣鎵嬫暟
         threshold_num = round(threshold_money / (limit_up_price * 100))
 
-        buy1_factor = 1
-        # 鑾峰彇涔�1鏄惁涓烘定鍋滀环
-        if buy1_price is None:
-            buy1_factor = 1.3
-        elif limit_up_price is None:
-            buy1_factor = 1.3
-        elif abs(float(buy1_price) - float(limit_up_price)) >= 0.01:
-            print("涔�1浠蜂笉涓烘定鍋滀环锛屼拱1浠�-{} 娑ㄥ仠浠�-{}".format(buy1_price, limit_up_price))
-            buy1_factor = 1.3
         # 鐩爣璁㈠崟鏁伴噺
         threshold_count = cls.__buyL2SafeCountManager.get_safe_count(code)
 
@@ -899,6 +892,9 @@
             # 绗竴娆′笅鍗曢渶瑕佸ぇ鍗曟渶灏�2绗旓紝浠ュ悗鍙渶瑕�1绗�
             big_num_count = 1
 
+        # 杈冨ぇ鍗曠殑鎵嬫暟
+        bigger_num = round(5900 / limit_up_price)
+
         for i in range(compute_start_index, compute_end_index + 1):
             data = total_datas[i]
             _val = total_datas[i]["val"]
@@ -917,22 +913,17 @@
             # 娑ㄥ仠涔�
             if L2DataUtil.is_limit_up_price_buy(_val):
                 if l2_data_util.is_big_money(_val):
-                    # sub_threshold_count += int(total_datas[i]["re"])
                     max_buy_num_set.add(i)
-                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
+                if _val["num"] >= bigger_num:
                     trigger_buy = True
                     # 鍙粺璁�59涓囦互涓婄殑閲戦
                     buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                     buy_count += int(total_datas[i]["re"])
                     if buy_nums >= threshold_num and buy_count >= get_threshold_count():
-                        logger_l2_trade_buy.info("{}鑾峰彇鍒颁拱鍏ユ墽琛岀偣锛歿} 缁熻绾拱鎵嬫暟锛歿} 鐩爣绾拱鎵嬫暟锛歿} 缁熻绾拱鍗曟暟锛歿} 鐩爣绾拱鍗曟暟锛歿}, 澶у崟鏁伴噺锛歿}", code,
-                                                 i,
-                                                 buy_nums,
-                                                 threshold_num, buy_count, get_threshold_count(), sub_threshold_count, )
+                        logger_l2_trade_buy.info(
+                            f"{code}鑾峰彇鍒颁拱鍏ユ墽琛岀偣锛歿i} 缁熻绾拱鎵嬫暟锛歿buy_nums} 鐩爣绾拱鎵嬫暟锛歿threshold_num} 缁熻绾拱鍗曟暟锛歿buy_count} 鐩爣绾拱鍗曟暟锛歿get_threshold_count()}, 澶у崟鏁伴噺锛歿len(max_buy_num_set)}")
             elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
-                if l2_data_util.is_big_money(_val):
-                    sub_threshold_count -= int(total_datas[i]["re"])
-                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
+                if _val["num"] >= bigger_num:
                     # 鍙粺璁�59涓囦互涓婄殑閲戦
                     # 娑ㄥ仠涔版挙
                     # 鍒ゆ柇涔板叆浣嶇疆鏄惁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓�
@@ -944,31 +935,35 @@
                         if buy_index >= buy_single_index:
                             buy_nums -= int(_val["num"]) * int(data["re"])
                             buy_count -= int(data["re"])
-                            cls.buy_debug(code, "{}鏁版嵁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍚� 鎾や拱绾拱鎵嬫暟锛歿} 鐩爣鎵嬫暟锛歿}", i, buy_nums, threshold_num)
+                            # 澶у崟鎾ら攢
+                            max_buy_num_set.discard(buy_index)
+                            l2_log.buy_debug(code, "{}鏁版嵁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍚� 鎾や拱绾拱鎵嬫暟锛歿} 鐩爣鎵嬫暟锛歿}", i, buy_nums, threshold_num)
                         else:
-                            cls.buy_debug(code, "{}鏁版嵁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓嶏紝涔板叆浣嶏細{}", i, buy_index)
+                            l2_log.buy_debug(code, "{}鏁版嵁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓嶏紝涔板叆浣嶏細{}", i, buy_index)
                             if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                                 # 鍚屼竴绉�,褰撲綔涔板叆淇″彿涔嬪悗澶勭悊
                                 buy_nums -= int(_val["num"]) * int(data["re"])
                                 buy_count -= int(data["re"])
-                                cls.buy_debug(code, "{}鏁版嵁涔板叆浣嶄笌棰勪及涔板叆浣嶅湪鍚屼竴绉�", i)
+                                # 澶у崟鎾ら攢
+                                max_buy_num_set.discard(buy_index)
+                                l2_log.buy_debug(code, "{}鏁版嵁涔板叆浣嶄笌棰勪及涔板叆浣嶅湪鍚屼竴绉�", i)
                     else:
                         # 鏈壘鍒颁拱鎾ゆ暟鎹殑涔板叆鐐�
-                        cls.buy_debug(code, "鏈壘鍒颁拱鎾ゆ暟鎹殑涔板叆鐐�: 浣嶇疆-{} 鏁版嵁-{}", i, data)
+                        l2_log.buy_debug(code, "鏈壘鍒颁拱鎾ゆ暟鎹殑涔板叆鐐�: 浣嶇疆-{} 鏁版嵁-{}", i, data)
                         buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                         buy_count -= int(total_datas[i]["re"])
-            cls.buy_debug(code, "浣嶇疆-{}锛屾�绘墜鏁帮細{}锛岀洰鏍囨墜鏁帮細{}", i,
-                          buy_nums, threshold_num)
+            l2_log.buy_debug(code, "浣嶇疆-{}锛屾�绘墜鏁帮細{}锛岀洰鏍囨墜鏁帮細{}", i,
+                             buy_nums, threshold_num)
 
             # 鏈夋挙鍗曚俊鍙凤紝涓斿皬浜庨槇鍊�
             if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and len(
                     max_buy_num_set) >= big_num_count:
                 return i, buy_nums, buy_count, None, max_buy_num_set
 
-        cls.buy_debug(code, "灏氭湭鑾峰彇鍒颁拱鍏ユ墽琛岀偣锛岃捣濮嬭绠椾綅缃細{} 缁熻绾拱鎵嬫暟锛歿} 鐩爣绾拱鎵嬫暟锛歿}  缁熻绾拱鍗曟暟锛歿} 鐩爣绾拱鍗曟暟锛歿} 澶у崟鏁伴噺锛歿} 鐩爣澶у崟鏁伴噺锛歿}",
-                      compute_start_index,
-                      buy_nums,
-                      threshold_num, buy_count, get_threshold_count(), len(max_buy_num_set), big_num_count)
+        l2_log.buy_debug(code, "灏氭湭鑾峰彇鍒颁拱鍏ユ墽琛岀偣锛岃捣濮嬭绠椾綅缃細{} 缁熻绾拱鎵嬫暟锛歿} 鐩爣绾拱鎵嬫暟锛歿}  缁熻绾拱鍗曟暟锛歿} 鐩爣绾拱鍗曟暟锛歿} 澶у崟鏁伴噺锛歿} 鐩爣澶у崟鏁伴噺锛歿}",
+                         compute_start_index,
+                         buy_nums,
+                         threshold_num, buy_count, get_threshold_count(), len(max_buy_num_set), big_num_count)
 
         return None, buy_nums, buy_count, None, max_buy_num_set
 
@@ -1132,6 +1127,6 @@
     #                                                                  local_today_num_operate_map.get(
     #                                                                      "600213"))
     # print(buy_index, buy_data)
-    dict_={"code":0}
+    dict_ = {"code": 0}
     dict_.clear()
     print(dict_)
diff --git a/l2/l2_data_util.py b/l2/l2_data_util.py
index 6546e9a..7c60dea 100644
--- a/l2/l2_data_util.py
+++ b/l2/l2_data_util.py
@@ -122,7 +122,7 @@
         # 淇濆瓨鏈�杩戠殑鏁版嵁
         __start_time = round(time.time() * 1000)
         redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
-        l2_data_log.l2_time(code, randomKey, round(time.time() * 1000) - __start_time, "淇濆瓨鏈�杩憀2鏁版嵁鐢ㄦ椂")
+        l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "淇濆瓨鏈�杩憀2鏁版嵁鐢ㄦ椂")
         # 璁剧疆杩涘唴瀛�
         local_latest_datas[code] = datas
         __set_l2_data_latest_count(code, len(datas))
diff --git a/l2/l2_log.py b/l2/l2_log.py
index c2e4174..48df92e 100644
--- a/l2/l2_log.py
+++ b/l2/l2_log.py
@@ -1,15 +1,17 @@
 from log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_trade
 
-
-def debug(random_key, code, content, *args):
-    logger_l2_trade.debug(("thread-id={} code={}  ".format(random_key, code) + content).format(*args))
+threadIds = {}
 
 
-def buy_debug(random_key, code, content, *args):
+def debug( code, content, *args):
+    logger_l2_trade.debug(("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args))
+
+
+def buy_debug(code, content, *args):
     logger_l2_trade_buy.debug(
-        ("thread-id={} code={}  ".format(random_key, code) + content).format(*args))
+        ("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args))
 
 
-def cancel_debug(random_key, code, content, *args):
+def cancel_debug(code, content, *args):
     logger_l2_trade_cancel.debug(
-        ("thread-id={} code={}  ".format(random_key, code) + content).format(*args))
+        ("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args))
diff --git a/l2_data_util.py b/l2_data_util.py
index eccea79..2f48827 100644
--- a/l2_data_util.py
+++ b/l2_data_util.py
@@ -28,7 +28,7 @@
 # 鏄惁涓哄ぇ鍗�
 def is_big_money(val):
     price = float(val["price"])
-    money = price * int(val["num"])
+    money = price * val["num"]
     if price > 3.0:
         if money >= 30000:
             return True
diff --git a/l2_trade_test.py b/l2_trade_test.py
index 65a9873..1c93c3d 100644
--- a/l2_trade_test.py
+++ b/l2_trade_test.py
@@ -71,9 +71,9 @@
                 except Exception as e:
                     pass
 
-    @unittest.skip("璺宠繃姝ゅ崟鍏冩祴璇�")
+    # @unittest.skip("璺宠繃姝ゅ崟鍏冩祴璇�")
     def test_trade(self):
-        code = "002328"
+        code = "002131"
         clear_trade_data(code)
         l2.l2_data_util.load_l2_data(code)
         total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
@@ -85,7 +85,7 @@
                 total_datas.insert(i, data)
 
         pos_list = log.get_l2_process_position(code)
-        pos_list.insert(108,(375,448))
+        # pos_list.insert(108,(375,448))
         if pos_list[0][0] > 0:
             pos_list.insert(0, (0, pos_list[0][0] - 1))
         del pos_list[-1]
diff --git a/log.py b/log.py
index 8b62697..cace136 100644
--- a/log.py
+++ b/log.py
@@ -323,10 +323,10 @@
             if line.find("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{}".format(code)) < 0:
                 continue
             try:
-               index = int(line.split("index-")[1].split(" ")[0])
-               index_list.append((index, time_))
+                index = int(line.split("index-")[1].split(" ")[0])
+                index_list.append((index, time_))
             except:
-               pass
+                pass
     return index_list, buy_queues
 
 
@@ -350,7 +350,7 @@
 if __name__ == '__main__':
     # logger_l2_h_cancel.info("test")
     # logger_l2_process_time.info("test123")
-    codes = ["002328"]
+    codes = ["002131", "003035", "002131"]
     for code in codes:
         export_logs(code)
 
diff --git a/server.py b/server.py
index 5ee8e04..c0d9c50 100644
--- a/server.py
+++ b/server.py
@@ -19,7 +19,7 @@
 import gpcode_manager
 import authority
 import juejin
-from l2 import l2_data_manager_new, l2_data_manager, l2_data_log
+from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log
 import l2_data_util
 from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil
 import l2.l2_data_util
@@ -96,12 +96,14 @@
                 return_str = "OK"
                 if type == 0:
                     try:
+
                         origin_start_time = round(time.time() * 1000)
                         __start_time = round(time.time() * 1000)
                         do_id = random.randint(0, 100000)
                         # level2鐩樺彛鏁版嵁
                         day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data(
                             _str)
+                        l2_log.threadIds[code] = random.randint(0, 100000)
                         if channel == 0:
                             now_time = round(time.time() * 1000)
                             if self.last_time.get(channel) is not None:
@@ -120,12 +122,12 @@
                             # 10ms鐨勭綉缁滀紶杈撳欢鏃�
                             capture_timestamp = __start_time - process_time - 10
                             # print("鎴浘鏃堕棿锛�", process_time)
-                            __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
+                            __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                "鎴浘鏃堕棿锛歿} 鏁版嵁瑙f瀽鏃堕棿".format(process_time))
 
                             cid, pid = gpcode_manager.get_listen_code_pos(code)
 
-                            __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
+                            __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                "l2鑾峰彇浠g爜浣嶇疆鑰楁椂")
                             # 鍒ゆ柇鐩爣浠g爜浣嶇疆鏄惁涓庝笂浼犳暟鎹綅缃竴鑷�
                             if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
@@ -134,19 +136,19 @@
                                     l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                     __start_time = round(time.time() * 1000)
                                     if gpcode_manager.is_listen(code):
-                                        __start_time = l2_data_log.l2_time(code, do_id,
+                                        __start_time = l2_data_log.l2_time(code,
                                                                            round(time.time() * 1000) - __start_time,
                                                                            "l2澶栭儴鏁版嵁棰勫鐞嗚�楁椂")
                                         l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp,
                                                                                          do_id)
-                                        __start_time = l2_data_log.l2_time(code, do_id,
+                                        __start_time = l2_data_log.l2_time(code,
                                                                            round(time.time() * 1000) - __start_time,
                                                                            "l2鏁版嵁鏈夋晥澶勭悊澶栭儴鑰楁椂",
                                                                            False)
                                         # 淇濆瓨鍘熷鏁版嵁鏁伴噺
                                         l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                         if round(time.time() * 1000) - __start_time > 20:
-                                            l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
+                                            l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                 "寮傛淇濆瓨鍘熷鏁版嵁鏉℃暟鑰楁椂",
                                                                 False)
 
@@ -173,7 +175,7 @@
                                     __end_time = round(time.time() * 1000)
                                     # 鍙褰曞ぇ浜�40ms鐨勬暟鎹�
                                     if __end_time - origin_start_time > 100:
-                                        l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time,
+                                        l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time,
                                                             "l2鏁版嵁澶勭悊鎬昏�楁椂",
                                                             True)
                     except Exception as e:
diff --git a/tool.py b/tool.py
index d647715..9f911ab 100644
--- a/tool.py
+++ b/tool.py
@@ -44,6 +44,10 @@
     return time_str
 
 
+def get_now_timestamp():
+    return round(time.time() * 1000)
+
+
 # 杞负浠锋牸锛屽洓鑸嶄簲鍏ヤ繚鐣�2浣嶅皬鏁�
 def to_price(_decimal):
     return _decimal.quantize(decimal.Decimal("0.00"), decimal.ROUND_HALF_UP)
diff --git a/trade/trade_manager.py b/trade/trade_manager.py
index 0306dea..4a5a7ac 100644
--- a/trade/trade_manager.py
+++ b/trade/trade_manager.py
@@ -5,11 +5,13 @@
 # 浜ゆ槗绠$悊鍣�
 import time
 
+import dask
+
 from db import mysql_data, redis_manager
 from trade import trade_data_manager, l2_trade_util
 from trade.trade_gui import THSBuyWinManagerNew, THSGuiTrade
 import time as t
-from l2 import l2_data_manager
+from l2 import l2_data_manager, l2_data_log
 
 from log import *
 
@@ -165,26 +167,72 @@
 
 # 寮�濮嬩氦鏄�
 def start_buy(code, capture_timestamp, last_data, last_data_index):
-    # 鏄惁绂佹浜ゆ槗
-    if l2_trade_util.is_in_forbidden_trade_codes(code):
-        raise Exception("绂佹浜ゆ槗")
-    trade_state = get_trade_state(code)
-    if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING:
-        raise Exception("浠g爜澶勪簬涓嶅彲浜ゆ槗鐘舵��")
-    money = get_available_money()
-    if money is None:
-        raise Exception("鏈幏鍙栧埌璐︽埛鍙敤璧勯噾")
-    price = gpcode_manager.get_limit_up_price(code)
-    if price is None:
-        raise Exception("灏氭湭鑾峰彇鍒版定鍋滀环")
-    # 涔颁竴鎵嬬殑璧勯噾鏄惁瓒冲
-    if price * 100 > money:
-        raise Exception("璐︽埛鍙敤璧勯噾涓嶈冻")
+    @dask.delayed
+    def is_forbidden(code):
+        if l2_trade_util.is_in_forbidden_trade_codes(code):
+            return Exception("绂佹浜ゆ槗")
+        return None, None
+
+    @dask.delayed
+    def is_state_right(code):
+        trade_state = get_trade_state(code)
+        if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING:
+            return Exception("浠g爜澶勪簬涓嶅彲浜ゆ槗鐘舵��"), trade_state
+        return None, trade_state
+
+    @dask.delayed
+    def is_money_enough(code):
+        money = get_available_money()
+        if money is None:
+            return Exception("鏈幏鍙栧埌璐︽埛鍙敤璧勯噾"), None
+        price = gpcode_manager.get_limit_up_price(code)
+        if price is None:
+            return Exception("灏氭湭鑾峰彇鍒版定鍋滀环"), None
+        # 涔颁竴鎵嬬殑璧勯噾鏄惁瓒冲
+        if price * 100 > money:
+            return Exception("璐︽埛鍙敤璧勯噾涓嶈冻"), price
+        return None, price
+
+    @dask.delayed
+    def can_trade(*args):
+        for arg in args:
+            if arg[0] is not None:
+                return arg[0], None, None
+        return None, args[1][1], args[2][1]
+
+    _start_time = tool.get_now_timestamp()
+
+    f1 = is_forbidden(code)
+    f2 = is_state_right(code)
+    f3 = is_money_enough(code)
+    dask_result = can_trade(f1, f2, f3)
+    ex, trade_state, price = dask_result.compute()
+    if ex is not None:
+        raise ex
+
+    # 骞惰鏀归��
+    # # 鏄惁绂佹浜ゆ槗
+    # if l2_trade_util.is_in_forbidden_trade_codes(code):
+    #     raise Exception("绂佹浜ゆ槗")
+    # trade_state = get_trade_state(code)
+    # if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING:
+    #     raise Exception("浠g爜澶勪簬涓嶅彲浜ゆ槗鐘舵��")
+    # money = get_available_money()
+    # if money is None:
+    #     raise Exception("鏈幏鍙栧埌璐︽埛鍙敤璧勯噾")
+    # price = gpcode_manager.get_limit_up_price(code)
+    # if price is None:
+    #     raise Exception("灏氭湭鑾峰彇鍒版定鍋滀环")
+    # # 涔颁竴鎵嬬殑璧勯噾鏄惁瓒冲
+    # if price * 100 > money:
+    #     raise Exception("璐︽埛鍙敤璧勯噾涓嶈冻")
 
     print("寮�濮嬩拱鍏�")
     logger_trade.info("{}寮�濮嬩拱鍏�".format(code))
     set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER)
+    _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "涔板叆鍒ゆ柇鏃堕棿", force=True)
     __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index)
+    l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "寮傛涔板叆鏃堕棿", force=True)
 
 
 # 涓柇涔板叆
@@ -269,9 +317,10 @@
             break
         except Exception as e:
             logger_trade.error("{}鍐嶆鎾ゅ崟寮傚父锛歿}".format(code, str(e)))
-            time.sleep(0.1+0.05*i)
+            time.sleep(0.1 + 0.05 * i)
             pass
 
+
 # 鍙栨秷濮旀墭鎴愬姛
 def __cancel_success(code):
     trade_data_manager.TradeBuyDataManager.remove_buy_position_info(code)
diff --git a/trade/trade_result_manager.py b/trade/trade_result_manager.py
index 317f007..c8ede05 100644
--- a/trade/trade_result_manager.py
+++ b/trade/trade_result_manager.py
@@ -1,6 +1,8 @@
 # 铏氭嫙涔版垚鍔�
 import logging
 
+import dask
+
 from l2 import l2_data_manager
 from l2.cancel_buy_strategy import HourCancelBigNumComputer, SecondCancelBigNumComputer, L2LimitUpSellStatisticUtil
 from l2.l2_data_util import local_today_datas, local_today_num_operate_map
@@ -22,41 +24,61 @@
 
 # 铏氭嫙鎾ゆ垚鍔�
 def virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas):
-    l2_data_manager.TradePointManager.delete_buy_point(code)
-    l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
-    l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
-    l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
+    f1 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_point)(code)
+    f2 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
+    f3 = dask.delayed(l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy)(code)
+    f4 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code)
     # 瀹夊叏绗旀暟璁$畻
-    __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, total_datas[-1]["index"])
-    SecondCancelBigNumComputer.cancel_success(code)
+    f5 = dask.delayed(__buyL2SafeCountManager.save_place_order_info)(code, buy_single_index, buy_exec_index,
+                                                                     total_datas[-1]["index"])
+    f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
+    dask.compute(f1, f2, f3, f4, f5, f6)
 
 
 # 鐪熷疄涔版垚鍔�
 def real_buy_success(code):
-    # 涓嬪崟鎴愬姛锛岄渶瑕佸垹闄ゆ渶澶т拱1
-    __thsBuy1VolumnManager.clear_max_buy1_volume(code)
-    # 鑾峰彇涔板叆浣嶇疆淇℃伅
-    try:
-        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
-            code)
-        __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, None)
-        HourCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index,
-                                                     local_today_datas.get(code),
-                                                     local_today_num_operate_map.get(code))
-    except Exception as e:
-        logging.exception(e)
-        logger_l2_error.exception(e)
+    @dask.delayed
+    def clear_max_buy1_volume(code):
+        # 涓嬪崟鎴愬姛锛岄渶瑕佸垹闄ゆ渶澶т拱1
+        __thsBuy1VolumnManager.clear_max_buy1_volume(code)
+
+    @dask.delayed
+    def safe_count(code, buy_single_index, buy_exec_index):
+        try:
+            __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, None)
+        except Exception as e:
+            logging.exception(e)
+            logger_l2_error.exception(e)
+
+    @dask.delayed
+    def h_cancel(code, buy_single_index, buy_exec_index):
+        try:
+            HourCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index,
+                                                         local_today_datas.get(code),
+                                                         local_today_num_operate_map.get(code))
+        except Exception as e:
+            logging.exception(e)
+            logger_l2_error.exception(e)
+
+    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
+        code)
+
+    f1 = clear_max_buy1_volume(code)
+    f2 = safe_count(code, buy_single_index, buy_exec_index)
+    f3 = h_cancel(code, buy_single_index, buy_exec_index)
+    dask.compute(f1, f2, f3)
     l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
 
 
 # 鐪熷疄鎾ゆ垚鍔�
 def real_cancel_success(code, buy_single_index, buy_exec_index, total_datas):
     # 瀹夊叏绗旀暟璁$畻
-    __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, total_datas[-1]["index"])
+    f1 = dask.delayed(__buyL2SafeCountManager.save_place_order_info)(code, buy_single_index, buy_exec_index,
+                                                                     total_datas[-1]["index"])
     # 鍙栨秷涔板叆鏍囪瘑
-    l2_data_manager.TradePointManager.delete_buy_point(code)
-    l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
-    l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
-    l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
-
-    SecondCancelBigNumComputer.cancel_success(code)
+    f2 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_point)(code)
+    f3 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
+    f4 = dask.delayed(l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy)(code)
+    f5 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code)
+    f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
+    dask.compute(f1, f2, f3, f4, f5, f6)

--
Gitblit v1.8.0