From 48fb7a00951f91bdc707e5dd2d196e5bccb752c3 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期三, 18 六月 2025 18:41:30 +0800
Subject: [PATCH] 异常保护

---
 l2_test.py |  128 ++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 122 insertions(+), 6 deletions(-)

diff --git a/l2_test.py b/l2_test.py
index 6daa626..d555401 100644
--- a/l2_test.py
+++ b/l2_test.py
@@ -2,6 +2,7 @@
 import json
 import logging
 import multiprocessing
+import queue
 import socketserver
 import threading
 import time
@@ -10,10 +11,16 @@
 import psutil
 import requests
 
+from api import low_suction_data_pusher
+from code_attribute import global_data_loader
 from huaxin_client import l2_client_test, l1_subscript_codes_manager
-from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system
+from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system, \
+    logger_local_huaxin_l2_transaction_accurate_big_order
 from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager
-from utils import tool
+from third_data.history_k_data_manager import HistoryKDataManager
+from third_data.history_k_data_util import HistoryKDatasUtils
+from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager
+from utils import tool, middle_api_protocol, global_util
 import urllib.parse as urlparse
 from urllib.parse import parse_qs
 
@@ -28,13 +35,43 @@
         url = urlparse.urlparse(path)
         response_data = ""
         if url.path == "/get_block_codes_money":
+            # 鑾峰彇鏉垮潡瀵瑰簲鐨勪唬鐮佷笌璇ヤ唬鐮佺殑鍑�娴佸叆
             ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
-            block = ps_dict['block']
+            block = ps_dict.get('block')
             try:
                 fdatas = BlockInMoneyRankManager().get_block_codes_money(block)
                 response_data = json.dumps({"code": 0, "data": fdatas})
             except Exception as e:
                 response_data = json.dumps({"code": 1, "msg": str(e)})
+        elif url.path == "/get_big_order_list":
+            # 鑾疯幏鍙栦唬鐮佺殑澶т拱/鍗栧崟鍒楄〃
+            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
+            code = ps_dict.get('code')
+            try:
+                buy_datas = CodeInMoneyManager().get_big_buy_money_list(code)
+                if buy_datas is None:
+                    buy_datas = []
+                sell_datas = CodeInMoneyManager().get_big_sell_money_list(code)
+                if sell_datas is None:
+                    sell_datas = []
+                response_data = json.dumps({"code": 0, "data": {"buy": buy_datas, "sell": sell_datas}})
+            except Exception as e:
+                response_data = json.dumps({"code": 1, "msg": str(e)})
+        elif url.path == "/get_code_money_info":
+            # 鑾峰彇浠g爜閲戦
+            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
+            code = ps_dict.get('code')
+            money_info = CodeInMoneyManager().get_money_info(code)
+            response_data = json.dumps({"code": 0, "data": money_info})
+        elif url.path == "/get_codes_money_info":
+            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
+            codes_str = ps_dict.get('codes')
+            codes = json.loads(codes_str)
+            fresults = {}
+            for code in codes:
+                money_info = CodeInMoneyManager().get_money_info(code)
+                fresults[code] = money_info
+            response_data = json.dumps({"code": 0, "data": fresults})
         self.send_response(200)
         # 鍙戠粰璇锋眰瀹㈡埛绔殑鍝嶅簲鏁版嵁
         self.send_header('Content-type', 'application/json')
@@ -57,7 +94,70 @@
         logger_system.error(f"绔彛鏈嶅姟鍣細{port} 鍚姩澶辫触")
 
 
+def __run_upload_big_order_task(_queue: queue.Queue):
+    # 杩愯涓婁紶澶у崟浠诲姟
+    while True:
+        try:
+            datas = []
+            while not _queue.empty():
+                datas.append(_queue.get())
+            if datas:
+                # 涓婁紶鏁版嵁
+                requests.post("http://192.168.84.71:12881/upload_deal_big_orders", json.dumps(datas))
+        except:
+            pass
+        finally:
+            time.sleep(1)
+
+
+def __get_special_codes():
+    """
+    鑾峰彇鐗规畩鐨勪唬鐮侊紝闇�瑕佽闃�300w浠ヤ笂鐨勫ぇ鍗�
+    @return: 浠g爜闆嗗悎
+    """
+    try:
+        zylt_volume_map = global_util.zylt_volume_map
+        codes = set()
+        last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0]
+        for code in zylt_volume_map:
+            if code == '601288':
+                print("")
+            volume = zylt_volume_map.get(code)
+            # 浠婃棩娑ㄥ仠浠疯绐佺牬鏄ㄦ棩鏈�楂樹环
+            k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
+            if k_bars and 10e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8:
+                # 鑷敱娴侀�氬競鍊煎湪10浜�-300浜夸互涓�
+                limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2)
+                if limit_up_price > k_bars[0]["high"] or True:
+                    # 浠婃棩娑ㄥ仠浠疯绐佺牬鏄ㄦ棩鏈�楂樹环
+                    codes.add(code)
+        # 鑾峰彇杈ㄨ瘑搴︾殑绁�
+        special_codes = BlockSpecialCodesManager().get_origin_code_blocks_dict().keys()
+        if special_codes:
+            codes |= set(special_codes)
+        return codes
+    except Exception as e:
+        logger_system.exception(e)
+        return set()
+
+
+def __save_accurate_big_order(big_accurate_order_queue):
+    while True:
+        try:
+            datas = []
+            while not big_accurate_order_queue.empty():
+                data = big_accurate_order_queue.get()
+                datas.append(data)
+            if datas:
+                low_suction_data_pusher.push_big_order(datas)
+                for data in datas:
+                    logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}")
+        except:
+            pass
+
+
 def run():
+    special_codes = __get_special_codes()
     codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
     codes = [x.decode() for x in codes_sh]
     codes.extend([x.decode() for x in codes_sz])
@@ -66,19 +166,29 @@
     cpu_count = 16
     page_size = int(len(codes) / cpu_count) + 1
 
-    big_order_queue = multiprocessing.Queue()
+    big_order_queue = multiprocessing.Queue(maxsize=1024)
+    big_accurate_order_queue = multiprocessing.Queue(maxsize=1024)
+    # 澶у崟涓婁紶闃熷垪
+    big_order_upload_queue = queue.Queue(maxsize=1024)
 
     for i in range(cpu_count):
         process = multiprocessing.Process(target=l2_client_test.run,
-                                          args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,))
+                                          args=(
+                                              codes[i * page_size:(i + 1) * page_size], big_order_queue,
+                                              big_accurate_order_queue, special_codes,))
 
         process.start()
         # 缁戞牳杩愯
         psutil.Process(process.pid).cpu_affinity([i])
+    threading.Thread(target=__run_upload_big_order_task, args=(big_order_upload_queue,), daemon=True).start()
+    threading.Thread(target=__save_accurate_big_order, args=(big_accurate_order_queue,), daemon=True).start()
+
     while True:
         try:
             data = big_order_queue.get()
             CodeInMoneyManager().add_data(data)
+            # 娣诲姞涓婁紶鏁版嵁
+            big_order_upload_queue.put_nowait(data)
             logger_local_huaxin_l2_transaction_big_order.info(f"{data}")
         except:
             pass
@@ -113,12 +223,16 @@
             in_list = BlockInMoneyRankManager().get_in_list()
             out_list = BlockInMoneyRankManager().get_out_list()
             # (浠g爜,鍚嶇О,寮哄害,涓诲姏鍑�棰�)
-            fins = [(0, x[0], 0, x[1]) for x in in_list[:50]]
+            fins = [(0, x[0], 0, x[1]) for x in in_list[:100]]
             fouts = [(0, x[0], 0, x[1]) for x in out_list[:50]]
             # 涓婁紶
             __upload_data("jingxuan_rank", json.dumps(fins))
             __upload_data("jingxuan_rank_out", json.dumps(fouts))
             __upload_codes_in_money()
+            try:
+                low_suction_data_pusher.push_block_in(in_list)
+            except:
+                pass
         except Exception as e:
             logging.exception(e)
         finally:
@@ -136,6 +250,8 @@
 
 
 if __name__ == "__main__":
+    # 杞藉叆鑷敱娴侀�氶噺
+    global_data_loader.load_zyltgb_volume_from_db()
     threading.Thread(target=__compute_and_upload, daemon=True).start()
     # 鍚姩鍐呴儴鎺ュ彛鏈嶅姟
     threading.Thread(target=__run_server, args=("0.0.0.0", 9005,), daemon=True).start()

--
Gitblit v1.8.0