From 3ec79004bd769828c8dc18ed35280f81cfb473ff Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期三, 08 二月 2023 19:30:45 +0800
Subject: [PATCH] 交易结果整理

---
 server.py |  262 ++++++++++++++++++++++++++++------------------------
 1 files changed, 142 insertions(+), 120 deletions(-)

diff --git a/server.py b/server.py
index acb687c..cdec9e6 100644
--- a/server.py
+++ b/server.py
@@ -19,10 +19,10 @@
 import gpcode_manager
 import authority
 import juejin
-import l2_data_log
-from l2 import l2_data_manager_new, l2_data_manager
+from l2 import l2_data_manager_new, l2_data_manager, l2_data_log
 import l2_data_util
 from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil
+import l2.l2_data_util
 
 import ths_industry_util
 import ths_util
@@ -60,6 +60,7 @@
     l2_save_time_dict = {}
     l2_trade_buy_queue_dict = {}
     tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
+    last_time = {}
 
     def setup(self):
         super().setup()  # 鍙互涓嶈皟鐢ㄧ埗绫荤殑setup()鏂规硶锛岀埗绫荤殑setup鏂规硶浠�涔堥兘娌″仛
@@ -80,90 +81,101 @@
         # print("- " * 30)
         sk: socket.socket = self.request
         while True:
-            data = sk.recv(1024 * 1024 * 20)
+            data = sk.recv(1024 * 100)
             if len(data) == 0:
                 # print("瀹㈡埛绔柇寮�杩炴帴")
                 break
             _str = str(data, encoding="gbk")
             if len(_str) > 0:
                 # print("缁撴灉锛�",_str)
-                type = data_process.parseType(_str)
+                type = -1
+                try:
+                    type = data_process.parseType(_str)
+                except:
+                    print(_str)
                 return_str = "OK"
                 if type == 0:
-
                     try:
                         origin_start_time = round(time.time() * 1000)
                         __start_time = round(time.time() * 1000)
                         do_id = random.randint(0, 100000)
-
                         # level2鐩樺彛鏁版嵁
-                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data(
+                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data(
                             _str)
-                        # 闂撮殧1s淇濆瓨涓�鏉2鐨勬渶鍚庝竴鏉℃暟鎹�
-                        if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
-                            code] >= 1000 and len(datas) > 0:
-                            self.l2_save_time_dict[code] = origin_start_time
-                            logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1])
+                        if channel == 0:
+                            now_time = round(time.time() * 1000)
+                            if self.last_time.get(channel) is not None:
+                                #print("鎺ュ彈鍒癓2鐨勬暟鎹�", channel, now_time - self.last_time.get(channel), "瑙f瀽鑰楁椂",now_time - origin_start_time)
+                                pass
 
-                        # 10ms鐨勭綉缁滀紶杈撳欢鏃�
-                        capture_timestamp = __start_time - process_time - 10
-                        # print("鎴浘鏃堕棿锛�", process_time)
-                        __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
-                                                           "鎴浘鏃堕棿锛歿} 鏁版嵁瑙f瀽鏃堕棿".format(process_time))
+                            self.last_time[channel] = now_time
 
-                        cid, pid = gpcode_manager.get_listen_code_pos(code)
+                        if True:
+                            # 闂撮殧1s淇濆瓨涓�鏉2鐨勬渶鍚庝竴鏉℃暟鎹�
+                            if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
+                                code] >= 1000 and len(datas) > 0:
+                                self.l2_save_time_dict[code] = origin_start_time
+                                logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1])
 
-                        __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
-                                                           "l2鑾峰彇浠g爜浣嶇疆鑰楁椂")
-                        # 鍒ゆ柇鐩爣浠g爜浣嶇疆鏄惁涓庝笂浼犳暟鎹綅缃竴鑷�
-                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
-                            try:
-                                # 鏍¢獙瀹㈡埛绔唬鐮�
-                                l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
-                                __start_time = round(time.time() * 1000)
-                                if gpcode_manager.is_listen(code):
-                                    __start_time = l2_data_log.l2_time(code, do_id,
-                                                                       round(time.time() * 1000) - __start_time,
-                                                                       "l2澶栭儴鏁版嵁棰勫鐞嗚�楁椂")
-                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp,
-                                                                                     do_id)
-                                    __start_time = l2_data_log.l2_time(code, do_id,
-                                                                       round(time.time() * 1000) - __start_time,
-                                                                       "l2鏁版嵁鏈夋晥澶勭悊澶栭儴鑰楁椂",
-                                                                       False)
-                                    # 淇濆瓨鍘熷鏁版嵁鏁伴噺
-                                    l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
-                                    if round(time.time() * 1000) - __start_time > 20:
-                                        l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
-                                                            "寮傛淇濆瓨鍘熷鏁版嵁鏉℃暟鑰楁椂",
-                                                            False)
+                            # 10ms鐨勭綉缁滀紶杈撳欢鏃�
+                            capture_timestamp = __start_time - process_time - 10
+                            # print("鎴浘鏃堕棿锛�", process_time)
+                            __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
+                                                               "鎴浘鏃堕棿锛歿} 鏁版嵁瑙f瀽鏃堕棿".format(process_time))
 
-                            except l2_data_manager.L2DataException as l:
-                                # 鍗曚环涓嶇
-                                if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
-                                    key = "{}-{}-{}".format(client, channel, code)
-                                    if key not in self.l2_data_error_dict or round(
-                                            time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
-                                        # self.l2CodeOperate.repaire_l2_data(code)
-                                        # todo 澶晱鎰熺Щ闄や唬鐮�
-                                        logger_l2_error.warning("code-{} l2鍗曚环閿欒:{}", code, l.msg)
-                                        # 鍗曚环涓嶄竴鑷存椂闇�瑕佺Щ闄や唬鐮侀噸鏂版坊鍔�
-                                        l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2鐩戝惉鍗曚环閿欒")
-                                        self.l2_data_error_dict[key] = round(time.time() * 1000)
+                            cid, pid = gpcode_manager.get_listen_code_pos(code)
 
-                            except Exception as e:
-                                print("寮傚父", str(e), code)
-                                logging.exception(e)
-                                logger_l2_error.error("鍑洪敊锛歿}".format(str(e)))
-                                logger_l2_error.error("鍐呭锛歿}".format(_str))
-                            finally:
+                            __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
+                                                               "l2鑾峰彇浠g爜浣嶇疆鑰楁椂")
+                            # 鍒ゆ柇鐩爣浠g爜浣嶇疆鏄惁涓庝笂浼犳暟鎹綅缃竴鑷�
+                            if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
+                                try:
+                                    # 鏍¢獙瀹㈡埛绔唬鐮�
+                                    l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
+                                    __start_time = round(time.time() * 1000)
+                                    if gpcode_manager.is_listen(code):
+                                        __start_time = l2_data_log.l2_time(code, do_id,
+                                                                           round(time.time() * 1000) - __start_time,
+                                                                           "l2澶栭儴鏁版嵁棰勫鐞嗚�楁椂")
+                                        l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp,
+                                                                                         do_id)
+                                        __start_time = l2_data_log.l2_time(code, do_id,
+                                                                           round(time.time() * 1000) - __start_time,
+                                                                           "l2鏁版嵁鏈夋晥澶勭悊澶栭儴鑰楁椂",
+                                                                           False)
+                                        # 淇濆瓨鍘熷鏁版嵁鏁伴噺
+                                        l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
+                                        if round(time.time() * 1000) - __start_time > 20:
+                                            l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
+                                                                "寮傛淇濆瓨鍘熷鏁版嵁鏉℃暟鑰楁椂",
+                                                                False)
 
-                                __end_time = round(time.time() * 1000)
-                                # 鍙褰曞ぇ浜�40ms鐨勬暟鎹�
-                                if __end_time - origin_start_time > 100:
-                                    l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time,
-                                                        "l2鏁版嵁澶勭悊鎬昏�楁椂",
-                                                        True)
+                                except l2_data_manager.L2DataException as l:
+                                    # 鍗曚环涓嶇
+                                    if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
+                                        key = "{}-{}-{}".format(client, channel, code)
+                                        if key not in self.l2_data_error_dict or round(
+                                                time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
+                                            # self.l2CodeOperate.repaire_l2_data(code)
+                                            # todo 澶晱鎰熺Щ闄や唬鐮�
+                                            logger_l2_error.warning("code-{} l2鍗曚环閿欒:{}", code, l.msg)
+                                            # 鍗曚环涓嶄竴鑷存椂闇�瑕佺Щ闄や唬鐮侀噸鏂版坊鍔�
+                                            l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2鐩戝惉鍗曚环閿欒")
+                                            self.l2_data_error_dict[key] = round(time.time() * 1000)
+
+                                except Exception as e:
+                                    print("寮傚父", str(e), code)
+                                    logging.exception(e)
+                                    logger_l2_error.error("鍑洪敊锛歿}".format(str(e)))
+                                    logger_l2_error.error("鍐呭锛歿}".format(_str))
+                                finally:
+
+                                    __end_time = round(time.time() * 1000)
+                                    # 鍙褰曞ぇ浜�40ms鐨勬暟鎹�
+                                    if __end_time - origin_start_time > 100:
+                                        l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time,
+                                                            "l2鏁版嵁澶勭悊鎬昏�楁椂",
+                                                            True)
                     except Exception as e:
                         logger_l2_error.exception(e)
                 elif type == 1:
@@ -221,40 +233,45 @@
 
                 elif type == 5:
                     logger_trade_delegate.debug("鎺ユ敹鍒板鎵樹俊鎭�")
-                    # 浜ゆ槗濮旀墭淇℃伅
-                    dataList = data_process.parseList(_str)
-                    if self.last_trade_delegate_data != _str:
-                        self.last_trade_delegate_data = _str
-                        # 淇濆瓨濮旀墭淇℃伅
-                        logger_trade_delegate.info(dataList)
+                    __start_time = round(time.time() * 1000)
                     try:
-                        # 璁剧疆鐢虫姤鏃堕棿
-                        for item in dataList:
-                            apply_time = item["apply_time"]
-                            if apply_time and len(apply_time) >= 8:
-                                code = item["code"]
-                                trade_state = trade_manager.get_trade_state(code)
-                                # 璁剧疆涓嬪崟鐘舵�佺殑浠g爜涓哄凡濮旀墭
-                                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
-                                    origin_apply_time = apply_time
-                                    apply_time = apply_time[0:6]
-                                    apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], apply_time[4:6])
-                                    ms = origin_apply_time[6:9]
-                                    if int(ms) > 500:
-                                        # 鏃堕棿+1s
-                                        apply_time = tool.trade_time_add_second(apply_time, 1)
+                        # 浜ゆ槗濮旀墭淇℃伅
+                        dataList = data_process.parseList(_str)
+                        if self.last_trade_delegate_data != _str:
+                            self.last_trade_delegate_data = _str
+                            # 淇濆瓨濮旀墭淇℃伅
+                            logger_trade_delegate.info(dataList)
+                        try:
+                            # 璁剧疆鐢虫姤鏃堕棿
+                            for item in dataList:
+                                apply_time = item["apply_time"]
+                                if apply_time and len(apply_time) >= 8:
+                                    code = item["code"]
+                                    trade_state = trade_manager.get_trade_state(code)
+                                    # 璁剧疆涓嬪崟鐘舵�佺殑浠g爜涓哄凡濮旀墭
+                                    if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
+                                        origin_apply_time = apply_time
+                                        apply_time = apply_time[0:6]
+                                        apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4],
+                                                                       apply_time[4:6])
+                                        ms = origin_apply_time[6:9]
+                                        if int(ms) > 500:
+                                            # 鏃堕棿+1s
+                                            apply_time = tool.trade_time_add_second(apply_time, 1)
 
-                                    print(apply_time)
-                    except Exception as e:
-                        logging.exception(e)
+                                        print(apply_time)
+                        except Exception as e:
+                            logging.exception(e)
 
-                    try:
-                        trade_manager.process_trade_delegate_data(dataList)
-                    except Exception as e:
-                        logging.exception(e)
-                    trade_manager.save_trade_delegate_data(dataList)
-                    # 鍒锋柊浜ゆ槗鐣岄潰
-                    trade_gui.THSGuiTrade().refresh_data()
+                        try:
+                            trade_manager.process_trade_delegate_data(dataList)
+                        except Exception as e:
+                            logging.exception(e)
+                        trade_manager.save_trade_delegate_data(dataList)
+                        # 鍒锋柊浜ゆ槗鐣岄潰
+                        trade_gui.THSGuiTrade().refresh_data()
+                    finally:
+                        pass
 
                 elif type == 4:
                     # 琛屼笟浠g爜淇℃伅
@@ -278,28 +295,33 @@
                     buy_one_price = data["buyOnePrice"]
                     buy_one_volumn = data["buyOneVolumn"]
                     buy_queue = data["buyQueue"]
-                    buy_queue_result_list = self.tradeBuyQueue.save(code, gpcode_manager.get_limit_up_price(code),
-                                                                    buy_one_price, buy_time,
-                                                                    buy_queue)
-                    if buy_queue_result_list:
-                        # 鏈夋暟鎹�
-                        try:
-                            buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
-                                decimal.Decimal("0.00"))
-                            buy_progress_index = self.tradeBuyQueue.save_traded_index(code, buy_one_price_,
-                                                                                      buy_queue_result_list)
-                            if buy_progress_index is not None:
-                                HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
-                                                                            l2_data_manager.local_today_datas.get(code),
-                                                                            l2_data_manager.local_today_num_operate_map.get(
-                                                                                code))
-                            logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}  鏁版嵁-{}", code,
-                                                           buy_progress_index,
-                                                           json.dumps(buy_queue_result_list))
-                        except Exception as e:
-                            print("涔板叆闃熷垪", code, buy_queue_result_list)
-                            logger_l2_trade_buy_queue.warning("鑾峰彇鎴愪氦浣嶇疆澶辫触锛� code-{} 鍘熷洜-{}  鏁版嵁-{}", code, str(e),
-                                                              json.dumps(buy_queue_result_list))
+                    if buy_one_price is None:
+                        print('涔�1浠锋病鏈夛紝', code)
+                    limit_up_price = gpcode_manager.get_limit_up_price(code)
+                    if limit_up_price is not None:
+                        buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price, buy_time,
+                                                                        buy_queue)
+                        if buy_queue_result_list:
+                            # 鏈夋暟鎹�
+                            try:
+                                buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
+                                    decimal.Decimal("0.00"))
+                                buy_progress_index = self.tradeBuyQueue.save_traded_index(code, buy_one_price_,
+                                                                                          buy_queue_result_list)
+                                if buy_progress_index is not None:
+                                    HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
+                                                                                l2.l2_data_util.local_today_datas.get(
+                                                                                    code),
+                                                                                l2.l2_data_util.local_today_num_operate_map.get(
+                                                                                    code))
+                                logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}  鏁版嵁-{}", code,
+                                                               buy_progress_index,
+                                                               json.dumps(buy_queue_result_list))
+                            except Exception as e:
+                                logging.exception(e)
+                                print("涔板叆闃熷垪", code, buy_queue_result_list)
+                                logger_l2_trade_buy_queue.warning("鑾峰彇鎴愪氦浣嶇疆澶辫触锛� code-{} 鍘熷洜-{}  鏁版嵁-{}", code, str(e),
+                                                                  json.dumps(buy_queue_result_list))
 
                     # buy_queue鏄惁鏈夊彉鍖�
                     if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
@@ -318,7 +340,7 @@
                                 l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                             if need_sync:
                                 # 鍚屾鏁版嵁
-                                L2LimitUpMoneyStatisticUtil.verify_num(0, code, int(buy_one_volumn), buy_time)
+                                L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
                     # print(buy_time, buy_one_price, buy_one_volumn)
 
                     # print("L2涔板崠闃熷垪",datas)

--
Gitblit v1.8.0