Administrator
2023-08-08 c20c3c10635ce78db4a86ce9c0bb1d02e90f525d
server.py
@@ -10,42 +10,52 @@
import threading
import time
import alert_util
import client_manager
import code_volumn_manager
import data_process
import global_data_loader
import global_util
import gpcode_manager
import authority
import juejin
import l2_data_log
from l2 import l2_data_manager_new, l2_data_manager
from utils import alert_util, data_process, global_util, ths_industry_util, tool, import_util, socket_util
from code_attribute import code_volumn_manager, code_nature_analyse, global_data_loader, gpcode_manager, \
    gpcode_first_screen_manager, first_target_code_data_processor
import constant
from user import authority
import inited_data
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager
import l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer
import l2.l2_data_util
import ths_industry_util
import ths_util
import tool
from trade import trade_gui, trade_data_manager, trade_manager
import l2_code_operate
from code_data_util import ZYLTGBUtil
from output import code_info_output
from third_data import block_info, kpl_api
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager, KPLLimitUpDataRecordManager
from ths import l2_listen_pos_health_manager, l2_code_operate, client_manager
from trade import trade_data_manager, trade_manager, l2_trade_util, deal_big_money_manager, \
    current_price_process_manager, trade_juejin
from code_attribute.code_data_util import ZYLTGBUtil
import l2.transaction_progress
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue
from trade.trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager
from log_module.log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record, logger_debug
from trade.huaxin import huaxin_trade_record_manager
from trade.trade_manager import TradeTargetCodeModeManager
from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager
ths_util = import_util.import_lib("ths.ths_util")
trade_gui = import_util.import_lib("trade.trade_gui")
class MyTCPServer(socketserver.TCPServer):
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_juejin=None, pipe_ui=None):
        self.pipe_juejin = pipe_juejin  # 增加的参数
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_trade=None, pipe_ui=None):
        self.pipe_trade = pipe_trade  # 增加的参数
        self.pipe_ui = pipe_ui
        # 初始化数据
        block_info.init()
        socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)
# 如果使用异步的形式则需要再重写ThreadingTCPServer
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass
# 首板tick级数据
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
@@ -55,17 +65,28 @@
    ths_l2_trade_queue_manager = thsl2tradequeuemanager()
    latest_buy1_volumn_dict = {}
    buy1_price_manager = Buy1PriceManager()
    l2_trade_queue_time_dict = {}
    l2_save_time_dict = {}
    l2_trade_buy_queue_dict = {}
    tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
    last_time = {}
    first_tick_datas = []
    latest_oringin_data = {}
    last_l2_listen_health_time = {}
    __KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager()
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
    # 在L2监控上采集的现价
    __l2_current_price_data = {}
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
        # print("----setup方法被执行-----")
        # print("打印传入的参数:", self.server.pipe)
        # print("打印传入的参数:", self.server.pipe_trade)
        self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance()
    def __notify_trade(self, type_):
        if self.server.pipe_trade:
            self.server.pipe_trade.send(json.dumps({"type": type_}))
    def handle(self):
        host = self.client_address[0]
@@ -80,92 +101,134 @@
        # print("- " * 30)
        sk: socket.socket = self.request
        while True:
            data = sk.recv(1024 * 1024 * 20)
            data = sk.recv(1024 * 100)
            if len(data) == 0:
                # print("客户端断开连接")
                break
            _str = str(data, encoding="gbk")
            if len(_str) > 0:
                # print("结果:",_str)
                type = data_process.parseType(_str)
                type = -1
                try:
                    # 如果带有头
                    if _str.startswith("##"):
                        total_length = int(_str[2:10])
                        _str = _str[10:]
                        # 防止socket数据发生粘连
                        while total_length > len(_str):
                            d = sk.recv(1024 * 100)
                            if d:
                                _str += d.decode(encoding='gbk')
                    type = data_process.parseType(_str)
                except Exception as e:
                    print("接受到的异常数据:", f"{_str[:10]}...{_str[-10:]}")
                    if str(e).find("Unterminated string starting") > -1:
                        _str = _str.replace("\n", "")
                        type = data_process.parseType(_str)
                    else:
                        print(_str)
                return_str = "OK"
                if type == 0:
                    try:
                        origin_start_time = round(time.time() * 1000)
                        __start_time = round(time.time() * 1000)
                        do_id = random.randint(0, 100000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data(
                        day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data(
                            _str)
                        # 间隔1s保存一条l2的最后一条数据
                        if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
                            code] >= 1000 and len(datas) > 0:
                            self.l2_save_time_dict[code] = origin_start_time
                            logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1])
                        last_health_time = self.last_l2_listen_health_time.get((client, channel))
                        # --------------------------------设置L2健康状态--------------------------------
                        if last_health_time is None or __start_time - last_health_time > 1000:
                            self.last_l2_listen_health_time[(client, channel)] = __start_time
                            # 更新监听位健康状态
                            if origin_datas_count == 0:
                                l2_listen_pos_health_manager.set_unhealthy(client, channel)
                            else:
                                l2_listen_pos_health_manager.set_healthy(client, channel)
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
                        # print("截图时间:", process_time)
                        __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                           "截图时间:{} 数据解析时间".format(process_time))
                        l2_log.threadIds[code] = random.randint(0, 100000)
                        if True:
                            # 间隔1s保存一条l2的最后一条数据
                            if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
                                code] >= 1000 and len(origin_datas) > 0:
                                self.l2_save_time_dict[code] = origin_start_time
                                logger_l2_latest_data.info("{}#{}#{}", code, capture_time, origin_datas[-1])
                        cid, pid = gpcode_manager.get_listen_code_pos(code)
                            # 10ms的网络传输延时
                            capture_timestamp = __start_time - process_time - 10
                            # print("截图时间:", process_time)
                            __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                               "截图时间:{} 数据解析时间".format(process_time))
                        __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                           "l2获取代码位置耗时")
                        # 判断目标代码位置是否与上传数据位置一致
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                            try:
                                # 校验客户端代码
                                l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                __start_time = round(time.time() * 1000)
                                if gpcode_manager.is_listen(code):
                                    __start_time = l2_data_log.l2_time(code, do_id,
                                                                       round(time.time() * 1000) - __start_time,
                                                                       "l2外部数据预处理耗时")
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp,
                                                                                     do_id)
                                    __start_time = l2_data_log.l2_time(code, do_id,
                                                                       round(time.time() * 1000) - __start_time,
                                                                       "l2数据有效处理外部耗时",
                                                                       False)
                                    # 保存原始数据数量
                                    l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                    if round(time.time() * 1000) - __start_time > 20:
                                        l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                            "异步保存原始数据条数耗时",
                                                            False)
                            cid, pid = gpcode_manager.get_listen_code_pos(code)
                            except l2_data_manager.L2DataException as l:
                                # 单价不符
                                if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                    key = "{}-{}-{}".format(client, channel, code)
                                    if key not in self.l2_data_error_dict or round(
                                            time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
                                        # self.l2CodeOperate.repaire_l2_data(code)
                                        # todo 太敏感移除代码
                                        logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg)
                                        # 单价不一致时需要移除代码重新添加
                                        l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误")
                                        self.l2_data_error_dict[key] = round(time.time() * 1000)
                            __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                               "l2获取代码位置耗时")
                            # 判断目标代码位置是否与上传数据位置一致
                            if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                                # l2.l2_data_util.set_l2_data_latest_count(code, len(origin_datas))
                                l2_data_util.save_l2_latest_data_number(code, origin_datas_count)
                                # 保存l2数据条数
                                if not origin_datas:
                                    # or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)):
                                    raise Exception("无新增数据")
                                # 保存最近的数据
                                self.latest_oringin_data[code] = origin_datas
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                datas = l2.l2_data_util.L2DataUtil.format_l2_data(origin_datas, code, limit_up_price)
                                try:
                                    # 校验客户端代码
                                    l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                    __start_time = round(time.time() * 1000)
                                    if gpcode_manager.is_listen(code):
                                        __start_time = l2_data_log.l2_time(code,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2外部数据预处理耗时")
                                        l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                        __start_time = l2_data_log.l2_time(code,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2数据有效处理外部耗时",
                                                                           False)
                                        # 保存原始数据数量
                                        # l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                        # if round(time.time() * 1000) - __start_time > 20:
                                        #     l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                        #                         "异步保存原始数据条数耗时",
                                        #                         False)
                            except Exception as e:
                                print("异常", str(e), code)
                                logging.exception(e)
                                logger_l2_error.error("出错:{}".format(str(e)))
                                logger_l2_error.error("内容:{}".format(_str))
                            finally:
                                except l2_data_manager.L2DataException as l:
                                    # 单价不符
                                    if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                        key = "{}-{}-{}".format(client, channel, code)
                                        if key not in self.l2_data_error_dict or round(
                                                time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
                                            # self.l2CodeOperate.repaire_l2_data(code)
                                            # todo 太敏感移除代码
                                            logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg)
                                            # 单价不一致时需要移除代码重新添加
                                            l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误")
                                            self.l2_data_error_dict[key] = round(time.time() * 1000)
                                __end_time = round(time.time() * 1000)
                                # 只记录大于40ms的数据
                                if __end_time - origin_start_time > 100:
                                    l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time,
                                                        "l2数据处理总耗时",
                                                        True)
                                except Exception as e:
                                    print("异常", str(e), code)
                                    logging.exception(e)
                                    logger_l2_error.error("出错:{}".format(str(e)))
                                    logger_l2_error.error("内容:{}".format(_str))
                                finally:
                                    __end_time = round(time.time() * 1000)
                                    # 只记录大于40ms的数据
                                    if __end_time - origin_start_time > 100:
                                        l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time,
                                                            "l2数据处理总耗时",
                                                            True)
                    except Exception as e:
                        logger_l2_error.exception(e)
                        if str(e).find("新增数据"):
                            pass
                        else:
                            logger_l2_error.exception(e)
                elif type == 1:
                    # 设置股票代码
                    data_list, is_add = data_process.parseGPCode(_str)
@@ -174,11 +237,11 @@
                    for data in data_list:
                        code_list.append(data["code"])
                    # 获取基本信息
                    code_datas = juejin.JueJinManager.get_gp_latest_info(code_list)
                    if is_add:
                        gpcode_manager.add_gp_list(code_datas)
                    else:
                        gpcode_manager.set_gp_list(code_datas)
                    code_datas = HistoryKDatasUtils.get_gp_latest_info(code_list)
                    # if is_add:
                    #     gpcode_manager.add_gp_list(code_datas)
                    # else:
                    #     gpcode_manager.set_gp_list(code_datas)
                    if not is_add:
                        # 同步同花顺目标代码
@@ -209,6 +272,22 @@
                        # 获取是否有涨停时间
                        # if limit_up_time_manager.get_limit_up_time(d["code"]) is None:
                        #     limit_up_time_manager.save_limit_up_time(d["code"], d["time"])
                elif type == 22:
                    print("---接受到首板代码")
                    try:
                        if int(tool.get_now_time_str().replace(":", "")) < int("092500"):
                            raise Exception('未到接受时间')
                        # 首板代码
                        dataList, is_add = data_process.parseGPCode(_str)
                        tick_datas = first_target_code_data_processor.process_first_codes_datas(dataList)
                        # 保存现价
                        self.first_tick_datas.clear()
                        self.first_tick_datas.extend(tick_datas)
                    except Exception as e:
                        logging.exception(e)
                    finally:
                        print("首板代码处理完毕:")
                        return_str = socket_util.load_header(json.dumps({"code": 0}).encode("utf-8")).decode("utf-8")
                elif type == 3:
                    # 交易成功信息
@@ -221,107 +300,194 @@
                elif type == 5:
                    logger_trade_delegate.debug("接收到委托信息")
                    # 交易委托信息
                    dataList = data_process.parseList(_str)
                    if self.last_trade_delegate_data != _str:
                        self.last_trade_delegate_data = _str
                        # 保存委托信息
                        logger_trade_delegate.info(dataList)
                    __start_time = round(time.time() * 1000)
                    try:
                        # 设置申报时间
                        for item in dataList:
                            apply_time = item["apply_time"]
                            if apply_time and len(apply_time) >= 8:
                                code = item["code"]
                                trade_state = trade_manager.get_trade_state(code)
                                # 设置下单状态的代码为已委托
                                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                                    origin_apply_time = apply_time
                                    apply_time = apply_time[0:6]
                                    apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], apply_time[4:6])
                                    ms = origin_apply_time[6:9]
                                    if int(ms) > 500:
                                        # 时间+1s
                                        apply_time = tool.trade_time_add_second(apply_time, 1)
                        # 交易委托信息
                        dataList = data_process.parseList(_str)
                        if self.last_trade_delegate_data != _str:
                            self.last_trade_delegate_data = _str
                            # 保存委托信息
                            logger_trade_delegate.info(dataList)
                        try:
                            # 设置申报时间
                            for item in dataList:
                                apply_time = item["apply_time"]
                                if apply_time and len(apply_time) >= 8:
                                    code = item["code"]
                                    trade_state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                                    # 设置下单状态的代码为已委托
                                    if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                                        origin_apply_time = apply_time
                                        apply_time = apply_time[0:6]
                                        apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4],
                                                                       apply_time[4:6])
                                        ms = origin_apply_time[6:9]
                                        if int(ms) > 500:
                                            # 时间+1s
                                            apply_time = tool.trade_time_add_second(apply_time, 1)
                                    print(apply_time)
                    except Exception as e:
                        logging.exception(e)
                                        print(apply_time)
                        except Exception as e:
                            logging.exception(e)
                    try:
                        trade_manager.process_trade_delegate_data(dataList)
                    except Exception as e:
                        logging.exception(e)
                    trade_manager.save_trade_delegate_data(dataList)
                    # 刷新交易界面
                    trade_gui.THSGuiTrade().refresh_data()
                        try:
                            trade_manager.process_trade_delegate_data(dataList)
                        except Exception as e:
                            logging.exception(e)
                        trade_manager.save_trade_delegate_data(dataList)
                        # 刷新交易界面
                        if trade_gui is not None:
                            trade_gui.THSGuiTrade().refresh_data()
                    finally:
                        pass
                elif type == 4:
                    # 行业代码信息
                    dataList = data_process.parseList(_str)
                    ths_industry_util.save_industry_code(dataList)
                    codes = []
                    for datas in dataList:
                        for d in datas:
                            name = ths_industry_util.get_name_by_code(d['code'])
                            if not name or name == 'None':
                                codes.append(d["code"])
                    # 根据代码获取代码名称
                    codes_name = {}
                    if codes:
                        codes_name = HistoryKDatasUtils.get_gp_codes_names(codes)
                    ths_industry_util.save_industry_code(dataList, codes_name)
                elif type == 6:
                    # 可用金额
                    datas = data_process.parseData(_str)
                    client = datas["client"]
                    money = datas["money"]
                    # TODO存入缓存文件
                    trade_manager.set_available_money(client, money)
                    trade_manager.AccountAvailableMoneyManager().set_available_money(client, money)
                # l2交易队列
                elif type == 10:
                    # 可用金额
                    __start_time = time.time()
                    datas = data_process.parseData(_str)
                    channel = datas["channel"]
                    code = datas["code"]
                    data = datas["data"]
                    buy_time = data["buyTime"]
                    buy_one_price = data["buyOnePrice"]
                    buy_one_volumn = data["buyOneVolumn"]
                    buy_queue = data["buyQueue"]
                    buy_queue_result_list = self.tradeBuyQueue.save(code, gpcode_manager.get_limit_up_price(code),
                                                                    buy_one_price, buy_time,
                                                                    buy_queue)
                    if buy_queue_result_list:
                        # 有数据
                        try:
                            buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                decimal.Decimal("0.00"))
                            buy_progress_index = self.tradeBuyQueue.save_traded_index(code, buy_one_price_,
                                                                                      buy_queue_result_list)
                            if buy_progress_index is not None:
                                HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                                            l2_data_manager.local_today_datas.get(code),
                                                                            l2_data_manager.local_today_num_operate_map.get(
                                                                                code))
                            logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                           buy_progress_index,
                                                           json.dumps(buy_queue_result_list))
                        except Exception as e:
                            print("买入队列", code, buy_queue_result_list)
                            logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{}  数据-{}", code, str(e),
                                                              json.dumps(buy_queue_result_list))
                    msg = ""
                    try:
                    # buy_queue是否有变化
                    if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
                        if not gpcode_manager.is_in_gp_pool(code) and not gpcode_manager.is_in_first_gp_codes(code):
                            # 没在目标代码中且没有在首板今日历史代码中
                            raise Exception("代码没在监听中")
                        data = datas["data"]
                        buy_time = data["buyTime"]
                        buy_one_price = data["buyOnePrice"]
                        buy_one_volumn = data["buyOneVolumn"]
                        sell_one_price = data["sellOnePrice"]
                        sell_one_volumn = data["sellOneVolumn"]
                        buy_queue = data["buyQueue"]
                        if buy_one_price is None:
                            print('买1价没有,', code)
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        if limit_up_price is not None:
                            code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_time, limit_up_price,
                                                                          sell_one_price, sell_one_volumn)
                            _start_time = time.time()
                            msg += "买1价格处理:" + f"{_start_time - __start_time} "
                            buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price,
                                                                            buy_time,
                                                                            buy_queue)
                            msg += "买队列保存:" + f"{time.time() - _start_time} "
                            _start_time = time.time()
                            if buy_queue_result_list:
                                # 有数据
                                try:
                                    buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                        decimal.Decimal("0.00"))
                                    # 获取执行位时间
                                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data(
                                        code)
                                    if True:
                                        # 只有下单过后才获取交易进度
                                        exec_time = None
                                        try:
                                            if buy_exec_index:
                                                exec_time = \
                                                    l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][
                                                        "time"]
                                        except:
                                            pass
                                        buy_progress_index = self.tradeBuyQueue.compute_traded_index(code,
                                                                                                     buy_one_price_,
                                                                                                     buy_queue_result_list,
                                                                                                     exec_time)
                                        if buy_progress_index is not None:
                                            HourCancelBigNumComputer().set_trade_progress(code, buy_time,
                                                                                          buy_exec_index,
                                                                                          buy_progress_index,
                                                                                          l2.l2_data_util.local_today_datas.get(
                                                                                              code),
                                                                                          l2.l2_data_util.local_today_num_operate_map.get(
                                                                                              code))
                                            LCancelBigNumComputer().set_trade_progress(code, buy_progress_index,
                                                                                       l2.l2_data_util.local_today_datas.get(
                                                                                           code))
                                            logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                                           buy_progress_index,
                                                                           json.dumps(buy_queue_result_list))
                                            # 计算大单成交额
                                            deal_big_money_manager.set_trade_progress(code, buy_progress_index,
                                                                                      l2.l2_data_util.local_today_datas.get(
                                                                                          code),
                                                                                      l2.l2_data_util.local_today_num_operate_map.get(
                                                                                          code))
                                        else:
                                            raise Exception("暂未获取到交易进度")
                                        msg += "计算成交进度:" + f"{time.time() - _start_time} "
                                        _start_time = time.time()
                                except Exception as e:
                                    logging.exception(e)
                                    print("买入队列", code, buy_queue_result_list)
                                    logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{}  数据-{}", code, str(e),
                                                                      json.dumps(buy_queue_result_list))
                        # buy_queue是否有变化
                        if self.l2_trade_buy_queue_dict.get(
                                code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
                            code):
                        self.l2_trade_buy_queue_dict[code] = buy_queue
                        logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
                    # 保存最近的记录
                    if self.ths_l2_trade_queue_manager.save_recod(code, data):
                        if buy_time != "00:00:00":
                            logger_l2_trade_queue.info("{}-{}", code, data)
                            self.buy1_price_manager.save(code, buy_one_price)
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
                                                                                               int(buy_one_volumn),
                                                                                               buy_one_price)
                            if need_cancel:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                L2LimitUpMoneyStatisticUtil.verify_num(0, code, int(buy_one_volumn), buy_time)
                    # print(buy_time, buy_one_price, buy_one_volumn)
                            self.l2_trade_buy_queue_dict[code] = buy_queue
                            logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
                            msg += "保存记录日志:" + f"{time.time() - _start_time} "
                            _start_time = time.time()
                        # 保存最近的记录
                        if self.ths_l2_trade_queue_manager.save_recod(code, data):
                            if buy_time != "00:00:00":
                                logger_l2_trade_queue.info("{}-{}", code, data)
                                need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
                                                                                                   int(buy_one_volumn),
                                                                                                   buy_one_price)
                                # if need_sync:
                                #     # 同步数据
                                #     s =  time.time()
                                #     L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
                                #     msg += "量校验:"+f"{time.time()-s} "
                        # print(buy_time, buy_one_price, buy_one_volumn)
                    # print("L2买卖队列",datas)
                        # print("L2买卖队列",datas)
                        msg += "买1处理:" + f"{time.time() - _start_time} "
                        _start_time = time.time()
                    except:
                        pass
                    finally:
                        space = time.time() - __start_time
                        if space > 0.1:
                            logger_debug.info("{}成交队列处理时间:{},{}", code, space, msg)
                elif type == 20:
                    # 登录
                    data = data_process.parse(_str)["data"]
@@ -333,14 +499,28 @@
                        return_str = data_process.toJson({"code": 1, "msg": str(e)})
                # 现价更新
                elif type == 40:
                    data = data_process.parse(_str)["data"]
                    if data is not None:
                        print("现价数量", len(data))
                        for item in data:
                            volumn = item["volumn"]
                            volumnUnit = item["volumnUnit"]
                    datas = data_process.parse(_str)["data"]
                    if datas is None:
                        datas = []
                    print("二板现价")
                    # 获取暂存的二版现价数据
                    if self.first_tick_datas:
                        datas.extend(self.first_tick_datas)
                    if datas is not None:
                        print("二板现价数量", len(datas))
                        for item in datas:
                            volumn = item["volume"]
                            volumnUnit = item["volumeUnit"]
                            code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                        juejin.accpt_prices(data)
                        current_price_process_manager.accept_prices(datas)
                # L2现价更新
                elif type == 41:
                    datas = data_process.parse(_str)["data"]
                    if datas:
                        for d in datas:
                            code = d["code"]
                            self.__l2_current_price_data[code] = d
                elif type == 50:
                    data = data_process.parse(_str)["data"]
                    if data is not None:
@@ -359,19 +539,16 @@
                                # 记录数据
                                logger_buy_1_volumn_record.info("{}-{}", code, data)
                            self.latest_buy1_volumn_dict[code] = "{}-{}".format(volumn, price)
                            # 保存买1价格
                            self.buy1_price_manager.save(code, price)
                            # 校正时间
                            time_ = tool.compute_buy1_real_time(time_)
                            # 保存数据
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn,
                                                                                               price)
                            if need_cancel:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                            # if need_cancel:
                            #    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            # if need_sync:
                            #     # 同步数据
                            #     L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息
                    data = data_process.parse(_str)["data"]
@@ -379,12 +556,15 @@
                    thsDead = data.get("thsDead")
                    logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
                    client_manager.saveClientActive(int(client_id), host, thsDead)
                    if ths_util.is_ths_dead(client_id):
                        # TODO 重启同花顺
                        # 报警
                        l2_clients = authority.get_l2_clients()
                        if client_id in l2_clients:
                            alert_util.alarm()
                    if constant.is_windows():
                        # 动态导入
                        if ths_util.is_ths_dead(client_id):
                            # TODO 重启同花顺
                            # 报警
                            l2_clients = authority.get_l2_clients()
                            if client_id in l2_clients:
                                alert_util.alarm()
                elif type == 60:
                    # L2自启动成功
                    data = data_process.parse(_str)["data"]
@@ -401,11 +581,11 @@
                        codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16)
                        codes = sorted(codes)
                        if client_id == 2:
                            codes = codes[:8]
                            codes = codes[:constant.L2_CODE_COUNT_PER_DEVICE]
                        else:
                            codes = codes[8:]
                            codes = codes[constant.L2_CODE_COUNT_PER_DEVICE:]
                        codes_datas = []
                        for i in range(0, 8):
                        for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
                            if i >= len(codes):
                                break
                            codes_datas.append((i, codes[i]))
@@ -419,6 +599,281 @@
                    else:
                        return_json = {"code": 0, "msg": "开启在线状态"}
                        return_str = json.dumps(return_json)
                elif type == 70:
                    # 选股宝热门概念
                    data_json = data_process.parse(_str)
                    day = data_json["day"]
                    datas = data_json["data"]
                    # if datas:
                    #     hot_block_data_process.save_datas(day, datas)
                    print(datas)
                elif type == 71:
                    # 根据代码获取选股宝热门概念
                    day = tool.get_now_date_str()
                    code = data_process.parse(_str)["data"]["code"]
                    __start_time = time.time()
                    final_data = {'code': code, 'data': code_info_output.get_output_html(code)}
                    return_str = json.dumps({"code": 0, "data": final_data})
                    print("代码信息获取时间", code, round((time.time() - __start_time) * 1000))
                    pass
                # 获取最近2个交易日涨停代码
                elif type == 72:
                    day = tool.get_now_date_str()
                    data_dict = {}
                    for i in range(0, 2):
                        day = HistoryKDatasUtils.get_previous_trading_date(day)
                        data_list = list(block_info.KPLLimitUpDataRecordManager.list_all(day))
                        codes_set = set()
                        if data_list:
                            for d in data_list:
                                if len(d[4]) > 6:
                                    codes_set.add(d[3])
                        data_dict[day] = list(codes_set)
                    return_str = json.dumps({"code": 0, "data": data_dict})
                elif type == 80:
                    # 撤单
                    data = json.loads(_str)
                    code = data["data"]["code"]
                    if code:
                        state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_CANCEL_ING:
                            try:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
                                return_str = json.dumps({"code": 0})
                            except Exception as e:
                                return_str = json.dumps({"code": 2, "msg": str(e)})
                        else:
                            return_str = json.dumps({"code": 1, "msg": "未处于可撤单状态"})
                    else:
                        return_str = json.dumps({"code": 1, "msg": "请上传代码"})
                elif type == 82:
                    # 获取委托列表
                    data = json.loads(_str)
                    update_time = data["data"]["update_time"]
                    results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                        tool.get_now_date_str("%Y%m%d"), update_time)
                    return_str = json.dumps(
                        {"code": 0, "data": {"list": results, "updateTime": update_time}, "msg": "请上传代码"})
                elif type == 201:
                    # 加入黑名单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.forbidden_trade(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = HistoryKDatasUtils.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("black_list")
                elif type == 202:
                    # 加入白名单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    try:
                        for code in codes:
                            # 自由流通市值>50亿,股价高于30块的不能加白名单
                            # limit_up_price = gpcode_manager.get_limit_up_price(code)
                            # if float(limit_up_price) > 30:
                            #     raise Exception("股价高于30元")
                            # zyltgb = global_util.zyltgb_map.get(code)
                            # if zyltgb is None:
                            #     global_data_loader.load_zyltgb()
                            #     zyltgb = global_util.zyltgb_map.get(code)
                            # if zyltgb > 50 * 100000000:
                            #     raise Exception("自由流通股本大于50亿")
                            l2_trade_util.WhiteListCodeManager().add_code(code)
                            name = gpcode_manager.get_code_name(code)
                            if not name:
                                results = HistoryKDatasUtils.get_gp_codes_names([code])
                                if results:
                                    gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                        return_str = json.dumps({"code": 0})
                    except Exception as e:
                        return_str = json.dumps({"code": 1, "msg": str(e)})
                    self.__notify_trade("white_list")
                elif type == 203:
                    # 移除黑名单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.remove_from_forbidden_trade_codes(code)
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("black_list")
                elif type == 204:
                    # 移除白名单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.WhiteListCodeManager().remove_code(code)
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("white_list")
                elif type == 301:
                    # 黑名单列表
                    codes = l2_trade_util.BlackListCodeManager().list_codes()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                elif type == 302:
                    # 黑名单列表
                    codes = l2_trade_util.WhiteListCodeManager().list_codes()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                elif type == 401:
                    # 加入想要买
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        gpcode_manager.WantBuyCodesManager().add_code(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = HistoryKDatasUtils.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                    if "plates" in data["data"]:
                        for i in range(len(data["data"]["plates"])):
                            self.__KPLCodeLimitUpReasonManager.save_reason(codes[i], data["data"]["plates"][i])
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("want_list")
                elif type == 402:
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        gpcode_manager.WantBuyCodesManager().remove_code(code)
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("want_list")
                elif type == 403:
                    plate = None
                    include_codes = set()
                    if _str:
                        data = json.loads(_str)
                        plate = data.get("plate")
                        if plate:
                            code_map = self.__KPLCodeLimitUpReasonManager.list_all()
                            for k in code_map:
                                if code_map[k] == plate:
                                    include_codes.add(k)
                    codes = gpcode_manager.WantBuyCodesManager().list_code_cache()
                    datas = []
                    for code in codes:
                        if plate and plate != '其他' and code not in include_codes:
                            continue
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                elif type == 411:
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        gpcode_manager.PauseBuyCodesManager().add_code(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = HistoryKDatasUtils.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("pause_buy_list")
                    # 加入暂停买入列表
                elif type == 412:
                    # 移除暂停买入列表
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        gpcode_manager.PauseBuyCodesManager().remove_code(code)
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("pause_buy_list")
                elif type == 413:
                    # 暂停买入列表
                    codes = gpcode_manager.PauseBuyCodesManager().list_code()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                elif type == 420:
                    # 是否可以撤单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    code = codes[0]
                    state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                    if state != trade_manager.TRADE_STATE_BUY_CANCEL_SUCCESS and state != trade_manager.TRADE_STATE_BUY_SUCCESS:
                        return_str = json.dumps({"code": 0, "msg": "可以取消"})
                    else:
                        return_str = json.dumps({"code": 1, "msg": "不可以取消"})
                elif type == 430:
                    # 查询代码属性
                    data = json.loads(_str)
                    code = data["data"]["code"]
                    # 查询是否想买单/白名单/黑名单/暂不买
                    code_name = gpcode_manager.get_code_name(code)
                    want = gpcode_manager.WantBuyCodesManager().is_in_cache(code)
                    white = l2_trade_util.WhiteListCodeManager().is_in_cache(code)
                    black = l2_trade_util.is_in_forbidden_trade_codes(code)
                    pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code)
                    desc_list = []
                    if want:
                        desc_list.append("【想买单】")
                    if white:
                        desc_list.append("【白名单】")
                    if black:
                        desc_list.append("【黑名单】")
                    if pause_buy:
                        desc_list.append("【暂不买】")
                    return_str = json.dumps(
                        {"code": 0, "data": {"code_info": (code, code_name), "desc": "".join(desc_list)}})
                elif type == 501:
                    data = json.loads(_str)
                    is_open = data["data"]["open"]
                    if is_open:
                        trade_manager.TradeStateManager().open_buy()
                    else:
                        trade_manager.TradeStateManager().close_buy()
                    return_str = json.dumps({"code": 0, "msg": ("开启成功" if is_open else "关闭成功")})
                    self.__notify_trade("trade_state")
                elif type == 502:
                    can_buy = trade_manager.TradeStateManager().is_can_buy_cache()
                    return_str = json.dumps({"code": 0, "data": {"can_buy": can_buy}})
                elif type == 503:
                    # 设置交易目标代码的模式
                    data = json.loads(_str)
                    mode = data["data"]["mode"]
                    try:
                        TradeTargetCodeModeManager().set_mode(mode)
                        return_str = json.dumps({"code": 0, "data": {"mode": mode}})
                    except Exception as e:
                        return_str = json.dumps({"code": 1, "msg": str(e)})
                    self.__notify_trade("trade_mode")
                elif type == 504:
                    # 获取交易目标代码模式
                    mode = TradeTargetCodeModeManager().get_mode_cache()
                    return_str = json.dumps({"code": 0, "data": {"mode": mode}})
                elif type == 601:
                    pass
                    # 加自选
                elif type == 602:
                    pass
                    # 移除自选
                sk.send(return_str.encode())
@@ -431,7 +886,7 @@
def send_msg(client_id, data):
    _ip = client_manager.getActiveClientIP(client_id)
    print("ip", client_id, _ip)
    # print("ip", client_id, _ip)
    if _ip is None or len(_ip) <= 0:
        raise Exception("客户端IP为空")
    socketClient = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -451,7 +906,7 @@
    while True:
        clients = authority.get_l2_clients()
        for client in clients:
            print("心跳", client)
            # print("心跳", client)
            try:
                send_msg(client, {"action": "test"})
            except:
@@ -481,11 +936,11 @@
# 同步目标标的到同花顺
def sync_target_codes_to_ths():
    codes = gpcode_manager.get_gp_list()
    codes = gpcode_manager.get_second_gp_list()
    code_list = []
    for code in codes:
        code_list.append(code)
    client = authority._get_client_ids_by_rule("client-industry")
    client = authority._get_client_ids_by_rule("data-maintain")
    result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list})
    return result
@@ -496,12 +951,61 @@
    result = json.loads(result)
    if result["code"] != 0:
        raise Exception(result["msg"])
    else:
        # 测速成功
        client_infos = []
        for index in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
            client_infos.append((client, index))
        l2_listen_pos_health_manager.init_all(client_infos)
if __name__ == "__main__":
    try:
        a = round(float("0002.90"), 2)
        print(decimal.Decimal(a).quantize(decimal.Decimal("0.00")))
        # repair_ths_main_site(2)
    except Exception as e:
        print(str(e))
    # 交易成功无法读取时备用
    while True:
        try:
            datas = trade_juejin.get_execution_reports()
            # 上传数据
            fdatas = []
            for d in datas:
                fdatas.append(
                    {"code": d[0], "money": d[4], "num": d[2], "price": d[3], "time": d[7], "trade_num": d[5],
                     "type": d[1] - 1})
            print(fdatas)
            if fdatas:
                try:
                    trade_manager.process_trade_success_data(fdatas)
                except Exception as e:
                    logging.exception(e)
                trade_manager.save_trade_success_data(fdatas)
        except:
            pass
        time.sleep(1.5)
if __name__ == "__main__1":
    codes = gpcode_manager.get_first_gp_codes()
    for code in codes:
        try:
            global_data_loader.load_zyltgb()
            limit_up_price = float(gpcode_manager.get_limit_up_price(code))
            volumes_data = inited_data.get_volumns_by_code(code, 150)
            volumes_data = volumes_data[1:]
            volumes = inited_data.parse_max_volume(volumes_data[:60],
                                                   code_nature_analyse.is_new_top(limit_up_price,
                                                                                  volumes_data[:60]))
            logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
            code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2])
            # 判断K线形态
            k_format = code_nature_analyse.get_k_format(
                limit_up_price, volumes_data)
            print(k_format)
            code_nature_analyse.set_record_datas(code,
                                                 gpcode_manager.get_limit_up_price(code),
                                                 volumes_data)
        except:
            pass
        # code_nature_analyse.set_record_datas(code,
        #                                      limit_up_price,
        #                                      volumes_data)