Administrator
2023-09-25 c0bcfe746b97bc126636a658b1f01fc6a51f9f95
third_data/data_server.py
@@ -1,19 +1,20 @@
import http
import json
import socketserver
import threading
import time
from http.server import BaseHTTPRequestHandler
import dask
from log_module.log import logger_system
from log_module.log import logger_system, logger_debug
from utils import global_util, tool
from code_attribute import gpcode_manager
from log_module import log, log_analyse, log_export
from l2 import code_price_manager, l2_data_util
from l2 import code_price_manager, l2_data_util, l2_data_manager_new
from l2.cancel_buy_strategy import HourCancelBigNumComputer
from output.limit_up_data_filter import IgnoreCodeManager
from third_data import kpl_util, kpl_data_manager, kpl_api
from third_data.code_plate_key_manager import RealTimeKplMarketData, KPLPlateForbiddenManager
from third_data import kpl_util, kpl_data_manager, kpl_api, block_info
from third_data.code_plate_key_manager import RealTimeKplMarketData, KPLPlateForbiddenManager, CodePlateKeyBuyManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLDataManager, KPLLimitUpDataRecordManager, \
    KPLCodeLimitUpReasonManager
@@ -22,8 +23,9 @@
from urllib.parse import parse_qs
from output import code_info_output, limit_up_data_filter, output_util, kp_client_msg_manager
from trade import bidding_money_manager, trade_manager
from trade import bidding_money_manager, trade_manager, l2_trade_util
from trade.l2_trade_util import BlackListCodeManager
import concurrent.futures
class DataServer(BaseHTTPRequestHandler):
@@ -40,6 +42,8 @@
    # Caches for "jingxuan" (featured) and industry data
    __jingxuan_cache_dict = {}
    __industry_cache_dict = {}
    # Codes contained in the most recent limit-up push; do_limit_up subtracts
    # this set from each new push to find the codes that have just appeared.
    __latest_limit_up_codes_set = set()
    # Shared worker pool (at most 10 threads); the LIMIT_UP branch of
    # __process_kpl_data submits do_limit_up to it.
    __data_process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    def __get_limit_up_list(self):
        # 统计目前为止的代码涨停数量(分涨停原因)
@@ -477,16 +481,48 @@
            self.__send_response(result_str)
    def __process_kpl_data(self, data):
        def do_limit_up(result_list):
            if result_list:
        def do_limit_up(result_list_):
            if result_list_:
                # 保存涨停时间
                for d in result_list:
                codes_set = set()
                limit_up_reasons = {}
                for d in result_list_:
                    code = d[0]
                    limit_up_reasons[code] = d[5]
                    codes_set.add(code)
                    if code.find("00") == 0 or code.find("60") == 0:
                        limit_up_time = time.strftime("%H:%M:%S", time.localtime(d[2]))
                        code_price_manager.Buy1PriceManager().set_limit_up_time(code, limit_up_time)
                kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), result_list)
                self.__kplDataManager.save_data(type_, result_list)
                add_codes = codes_set - self.__latest_limit_up_codes_set
                self.__latest_limit_up_codes_set = codes_set
                if add_codes:
                    for code in add_codes:
                        # 根据涨停原因判断是否可以买
                        if code.find("00") == 0 or code.find("60") == 0:
                            try:
                                # 判断是否下单
                                trade_state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED:
                                    # 委托中的订单,判断是否需要撤单
                                    if not gpcode_manager.WantBuyCodesManager().is_in_cache(code):
                                        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
                                        current_limit_up_datas, limit_up_record_datas, yesterday_current_limit_up_codes, before_blocks_dict = kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas, kpl_data_manager.KPLLimitUpDataRecordManager.total_datas, yesterday_codes, block_info.get_before_blocks_dict()
                                        if not current_limit_up_datas:
                                            current_limit_up_datas = []
                                        if not limit_up_record_datas:
                                            limit_up_record_datas=[]
                                        if CodePlateKeyBuyManager.is_need_cancel(code, limit_up_reasons.get(code),
                                                                                 current_limit_up_datas,
                                                                                 limit_up_record_datas,
                                                                                 yesterday_current_limit_up_codes,
                                                                                 before_blocks_dict):
                                            pass
                                            # TODO 测试暂时注释
                                            # l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, f"涨停原因({ limit_up_reasons.get(code)})不是老大撤单", "板块撤")
                            except Exception as e:
                                logger_debug.exception(e)
                kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), result_list_)
                self.__kplDataManager.save_data(type_, result_list_)
        type_ = data["type"]
        print("开盘啦type:", type_)
@@ -505,8 +541,7 @@
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.LIMIT_UP.value:
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_LIMIT_UP)
            do_limit_up(result_list)
            self.__data_process_thread_pool.submit(lambda: do_limit_up(result_list))
        elif type_ == KPLDataType.OPEN_LIMIT_UP.value:
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_OPEN_LIMIT_UP)
            if result_list: