From 48fb7a00951f91bdc707e5dd2d196e5bccb752c3 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 18 六月 2025 18:41:30 +0800 Subject: [PATCH] 异常保护 --- l2_test.py | 234 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 files changed, 229 insertions(+), 5 deletions(-) diff --git a/l2_test.py b/l2_test.py index bf4333c..d555401 100644 --- a/l2_test.py +++ b/l2_test.py @@ -1,12 +1,163 @@ +import http +import json +import logging import multiprocessing +import queue +import socketserver +import threading import time -import psutil +from http.server import BaseHTTPRequestHandler +import psutil +import requests + +from api import low_suction_data_pusher +from code_attribute import global_data_loader from huaxin_client import l2_client_test, l1_subscript_codes_manager -from log_module.log import logger_local_huaxin_l2_upload +from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system, \ + logger_local_huaxin_l2_transaction_accurate_big_order +from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager +from third_data.history_k_data_manager import HistoryKDataManager +from third_data.history_k_data_util import HistoryKDatasUtils +from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager +from utils import tool, middle_api_protocol, global_util +import urllib.parse as urlparse +from urllib.parse import parse_qs + + +class DataServer(BaseHTTPRequestHandler): + # 绂佺敤鏃ュ織杈撳嚭 + def log_message(self, format, *args): + pass + + def do_GET(self): + path = self.path + url = urlparse.urlparse(path) + response_data = "" + if url.path == "/get_block_codes_money": + # 鑾峰彇鏉垮潡瀵瑰簲鐨勪唬鐮佷笌璇ヤ唬鐮佺殑鍑�娴佸叆 + ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) + block = ps_dict.get('block') + try: + fdatas = BlockInMoneyRankManager().get_block_codes_money(block) + response_data = json.dumps({"code": 0, "data": fdatas}) + except Exception as e: + response_data = json.dumps({"code": 1, "msg": str(e)}) + elif url.path == "/get_big_order_list": + # 鑾疯幏鍙栦唬鐮佺殑澶т拱/鍗栧崟鍒楄〃 + ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) + code = ps_dict.get('code') + try: + buy_datas = CodeInMoneyManager().get_big_buy_money_list(code) + if buy_datas is None: + buy_datas = [] + sell_datas = CodeInMoneyManager().get_big_sell_money_list(code) + if sell_datas is None: + sell_datas = [] + response_data = json.dumps({"code": 0, "data": {"buy": buy_datas, "sell": sell_datas}}) + except Exception as e: + response_data = json.dumps({"code": 1, "msg": str(e)}) + elif url.path == "/get_code_money_info": + # 鑾峰彇浠g爜閲戦 + ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) + code = ps_dict.get('code') + money_info = CodeInMoneyManager().get_money_info(code) + response_data = json.dumps({"code": 0, "data": money_info}) + elif url.path == "/get_codes_money_info": + ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) + codes_str = ps_dict.get('codes') + codes = json.loads(codes_str) + fresults = {} + for code in codes: + money_info = CodeInMoneyManager().get_money_info(code) + fresults[code] = money_info + response_data = json.dumps({"code": 0, "data": fresults}) + self.send_response(200) + # 鍙戠粰璇锋眰瀹㈡埛绔殑鍝嶅簲鏁版嵁 + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(response_data.encode()) + + +class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer): + pass + + +def __run_server(addr, port): + handler = DataServer + try: + httpd = ThreadedHTTPServer((addr, port), handler) + print("L2_IN_MONEY HTTP server is at: http://%s:%d/" % (addr, port)) + httpd.serve_forever() + except Exception as e: + logger_system.exception(e) + logger_system.error(f"绔彛鏈嶅姟鍣細{port} 鍚姩澶辫触") + + +def __run_upload_big_order_task(_queue: queue.Queue): + # 杩愯涓婁紶澶у崟浠诲姟 + while True: + try: + datas = [] + while not _queue.empty(): + datas.append(_queue.get()) + if datas: + # 涓婁紶鏁版嵁 + requests.post("http://192.168.84.71:12881/upload_deal_big_orders", json.dumps(datas)) + except: + pass + finally: + time.sleep(1) + + +def __get_special_codes(): + """ + 鑾峰彇鐗规畩鐨勪唬鐮侊紝闇�瑕佽闃�300w浠ヤ笂鐨勫ぇ鍗� + @return: 浠g爜闆嗗悎 + """ + try: + zylt_volume_map = global_util.zylt_volume_map + codes = set() + last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0] + for code in zylt_volume_map: + if code == '601288': + print("") + volume = zylt_volume_map.get(code) + # 浠婃棩娑ㄥ仠浠疯绐佺牬鏄ㄦ棩鏈�楂樹环 + k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day) + if k_bars and 10e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8: + # 鑷敱娴侀�氬競鍊煎湪10浜�-300浜夸互涓� + limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2) + if limit_up_price > k_bars[0]["high"] or True: + # 浠婃棩娑ㄥ仠浠疯绐佺牬鏄ㄦ棩鏈�楂樹环 + codes.add(code) + # 鑾峰彇杈ㄨ瘑搴︾殑绁� + special_codes = BlockSpecialCodesManager().get_origin_code_blocks_dict().keys() + if special_codes: + codes |= set(special_codes) + return codes + except Exception as e: + logger_system.exception(e) + return set() + + +def __save_accurate_big_order(big_accurate_order_queue): + while True: + try: + datas = [] + while not big_accurate_order_queue.empty(): + data = big_accurate_order_queue.get() + datas.append(data) + if datas: + low_suction_data_pusher.push_big_order(datas) + for data in datas: + logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}") + except: + pass def run(): + special_codes = __get_special_codes() codes_sh, codes_sz = l1_subscript_codes_manager.get_codes() codes = [x.decode() for x in codes_sh] codes.extend([x.decode() for x in codes_sz]) @@ -15,24 +166,97 @@ cpu_count = 16 page_size = int(len(codes) / cpu_count) + 1 - big_order_queue = multiprocessing.Queue() + big_order_queue = multiprocessing.Queue(maxsize=1024) + big_accurate_order_queue = multiprocessing.Queue(maxsize=1024) + # 澶у崟涓婁紶闃熷垪 + big_order_upload_queue = queue.Queue(maxsize=1024) for i in range(cpu_count): process = multiprocessing.Process(target=l2_client_test.run, - args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,)) + args=( + codes[i * page_size:(i + 1) * page_size], big_order_queue, + big_accurate_order_queue, special_codes,)) process.start() # 缁戞牳杩愯 psutil.Process(process.pid).cpu_affinity([i]) + threading.Thread(target=__run_upload_big_order_task, args=(big_order_upload_queue,), daemon=True).start() + threading.Thread(target=__save_accurate_big_order, args=(big_accurate_order_queue,), daemon=True).start() + while True: try: data = big_order_queue.get() - logger_local_huaxin_l2_upload.info(f"{data}") + CodeInMoneyManager().add_data(data) + # 娣诲姞涓婁紶鏁版嵁 + big_order_upload_queue.put_nowait(data) + logger_local_huaxin_l2_transaction_big_order.info(f"{data}") except: pass +def __compute_and_upload(): + def __upload_data(type_, data_): + root_data = { + "type": type_, + "data": data_ + } + requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data)) + + def __upload_codes_in_money(): + """ + 涓婁紶鎵�鏈変唬鐮佺殑娴佸叆 + @param type_: + @param data_: + @return: + """ + + root_data = { + "data": json.dumps(CodeInMoneyManager().get_code_money_dict()) + } + requests.post("http://127.0.0.1:9004/upload_codes_in_money", json.dumps(root_data)) + + while True: + try: + if not tool.is_trade_time(): + continue + BlockInMoneyRankManager().compute() + in_list = BlockInMoneyRankManager().get_in_list() + out_list = BlockInMoneyRankManager().get_out_list() + # (浠g爜,鍚嶇О,寮哄害,涓诲姏鍑�棰�) + fins = [(0, x[0], 0, x[1]) for x in in_list[:100]] + fouts = [(0, x[0], 0, x[1]) for x in out_list[:50]] + # 涓婁紶 + __upload_data("jingxuan_rank", json.dumps(fins)) + __upload_data("jingxuan_rank_out", json.dumps(fouts)) + __upload_codes_in_money() + try: + low_suction_data_pusher.push_block_in(in_list) + except: + pass + except Exception as e: + logging.exception(e) + finally: + time.sleep(3) + + +def __update_today_limit_up_records(): + while True: + try: + BlockInMoneyRankManager().load_today_limit_up_codes() + except: + pass + finally: + time.sleep(3) + + if __name__ == "__main__": + # 杞藉叆鑷敱娴侀�氶噺 + global_data_loader.load_zyltgb_volume_from_db() + threading.Thread(target=__compute_and_upload, daemon=True).start() + # 鍚姩鍐呴儴鎺ュ彛鏈嶅姟 + threading.Thread(target=__run_server, args=("0.0.0.0", 9005,), daemon=True).start() + # 鍚敤瀹氭椂鏇存柊褰撴棩娑ㄥ仠 + threading.Thread(target=__update_today_limit_up_records, daemon=True).start() run() while True: time.sleep(2) -- Gitblit v1.8.0