From c0bcfe746b97bc126636a658b1f01fc6a51f9f95 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期一, 25 九月 2023 17:55:33 +0800 Subject: [PATCH] 将华鑫订单交易成功独立出来处理 --- third_data/data_server.py | 82 ++++++++++++++++++++++++++++++++--------- 1 files changed, 64 insertions(+), 18 deletions(-) diff --git a/third_data/data_server.py b/third_data/data_server.py index 046a245..1fa8dad 100644 --- a/third_data/data_server.py +++ b/third_data/data_server.py @@ -1,18 +1,20 @@ import http import json import socketserver +import threading import time from http.server import BaseHTTPRequestHandler import dask +from log_module.log import logger_system, logger_debug from utils import global_util, tool from code_attribute import gpcode_manager from log_module import log, log_analyse, log_export -from l2 import code_price_manager, l2_data_util +from l2 import code_price_manager, l2_data_util, l2_data_manager_new from l2.cancel_buy_strategy import HourCancelBigNumComputer from output.limit_up_data_filter import IgnoreCodeManager -from third_data import kpl_util, kpl_data_manager, kpl_api -from third_data.code_plate_key_manager import RealTimeKplMarketData, KPLPlateForbiddenManager +from third_data import kpl_util, kpl_data_manager, kpl_api, block_info +from third_data.code_plate_key_manager import RealTimeKplMarketData, KPLPlateForbiddenManager, CodePlateKeyBuyManager from third_data.history_k_data_util import HistoryKDatasUtils from third_data.kpl_data_manager import KPLDataManager, KPLLimitUpDataRecordManager, \ KPLCodeLimitUpReasonManager @@ -21,8 +23,9 @@ from urllib.parse import parse_qs from output import code_info_output, limit_up_data_filter, output_util, kp_client_msg_manager -from trade import bidding_money_manager, trade_manager +from trade import bidding_money_manager, trade_manager, l2_trade_util from trade.l2_trade_util import BlackListCodeManager +import concurrent.futures class DataServer(BaseHTTPRequestHandler): @@ -39,7 +42,8 @@ # 绮鹃��,琛屼笟鏁版嵁缂撳瓨 __jingxuan_cache_dict = {} __industry_cache_dict = {} - + __latest_limit_up_codes_set = set() + __data_process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) def __get_limit_up_list(self): # 缁熻鐩墠涓烘鐨勪唬鐮佹定鍋滄暟閲忥紙鍒嗘定鍋滃師鍥狅級 @@ -378,7 +382,8 @@ for code_info in codes_info: code_info[4] = 1 if code_info[0] in want_codes else 0 # 鑾峰彇浠g爜鐘舵�� - if trade_manager.CodesTradeStateManager().get_trade_state_cache(code_info[0]) != trade_manager.TRADE_STATE_NOT_TRADE: + if trade_manager.CodesTradeStateManager().get_trade_state_cache( + code_info[0]) != trade_manager.TRADE_STATE_NOT_TRADE: code_info[5] = 1 response_data = json.dumps({"code": 0, "data": codes_info}) @@ -393,7 +398,7 @@ trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code) if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED or trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS: - hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer.get_watch_index_dict(code) + hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer().get_watch_index_dict(code) # 鏍规嵁鏃ュ織璇诲彇瀹炴椂鐨勮绠楁暟鎹� h_cancel_latest_compute_info = log_export.get_h_cancel_compute_info(code) if hcancel_datas_dict: @@ -476,6 +481,49 @@ self.__send_response(result_str) def __process_kpl_data(self, data): + def do_limit_up(result_list_): + if result_list_: + # 淇濆瓨娑ㄥ仠鏃堕棿 + codes_set = set() + limit_up_reasons = {} + for d in result_list_: + code = d[0] + limit_up_reasons[code] = d[5] + codes_set.add(code) + if code.find("00") == 0 or code.find("60") == 0: + limit_up_time = time.strftime("%H:%M:%S", time.localtime(d[2])) + code_price_manager.Buy1PriceManager().set_limit_up_time(code, limit_up_time) + add_codes = codes_set - self.__latest_limit_up_codes_set + self.__latest_limit_up_codes_set = codes_set + if add_codes: + for code in add_codes: + # 鏍规嵁娑ㄥ仠鍘熷洜鍒ゆ柇鏄惁鍙互涔� + if code.find("00") == 0 or code.find("60") == 0: + try: + # 鍒ゆ柇鏄惁涓嬪崟 + trade_state = trade_manager.CodesTradeStateManager().get_trade_state(code) + if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED: + # 濮旀墭涓殑璁㈠崟锛屽垽鏂槸鍚﹂渶瑕佹挙鍗� + if not gpcode_manager.WantBuyCodesManager().is_in_cache(code): + yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() + current_limit_up_datas, limit_up_record_datas, yesterday_current_limit_up_codes, before_blocks_dict = kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas, kpl_data_manager.KPLLimitUpDataRecordManager.total_datas, yesterday_codes, block_info.get_before_blocks_dict() + if not current_limit_up_datas: + current_limit_up_datas = [] + if not limit_up_record_datas: + limit_up_record_datas=[] + if CodePlateKeyBuyManager.is_need_cancel(code, limit_up_reasons.get(code), + current_limit_up_datas, + limit_up_record_datas, + yesterday_current_limit_up_codes, + before_blocks_dict): + pass + # TODO 娴嬭瘯鏆傛椂娉ㄩ噴 + # l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, f"娑ㄥ仠鍘熷洜锛坽 limit_up_reasons.get(code)}锛変笉鏄�佸ぇ鎾ゅ崟", "鏉垮潡鎾�") + except Exception as e: + logger_debug.exception(e) + kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), result_list_) + self.__kplDataManager.save_data(type_, result_list_) + type_ = data["type"] print("寮�鐩樺暒type:", type_) if type_ == KPLDataType.BIDDING.value: @@ -493,15 +541,7 @@ self.__kplDataManager.save_data(type_, result_list) elif type_ == KPLDataType.LIMIT_UP.value: result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_LIMIT_UP) - if result_list: - # 淇濆瓨娑ㄥ仠鏃堕棿 - for d in result_list: - code = d[0] - if code.find("00") == 0 or code.find("60") == 0: - limit_up_time = time.strftime("%H:%M:%S", time.localtime(d[2])) - code_price_manager.Buy1PriceManager.set_limit_up_time(code, limit_up_time) - self.__kplDataManager.save_data(type_, result_list) - kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), result_list) + self.__data_process_thread_pool.submit(lambda: do_limit_up(result_list)) elif type_ == KPLDataType.OPEN_LIMIT_UP.value: result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_OPEN_LIMIT_UP) if result_list: @@ -566,12 +606,18 @@ def run(addr, port): # 杩愯鐪嬬洏娑堟伅閲囬泦 - kp_client_msg_manager.run_capture() + # kp_client_msg_manager.run_capture() + kpl_data_manager.run_pull_task() + handler = DataServer # httpd = socketserver.TCPServer((addr, port), handler) httpd = ThreadedHTTPServer((addr, port), handler) print("HTTP server is at: http://%s:%d/" % (addr, port)) - httpd.serve_forever() + try: + httpd.serve_forever() + except Exception as e: + logger_system.exception(e) + logger_system.error(f"绔彛鏈嶅姟鍣細{port} 鍚姩澶辫触") if __name__ == "__main__": -- Gitblit v1.8.0