Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
servers/server.py
@@ -4,33 +4,26 @@
import decimal
import json
import logging
import random
import socketserver
import socket
import threading
import time
from cancel_strategy.s_l_h_cancel_strategy import LCancelBigNumComputer
from utils import alert_util, data_process, global_util, ths_industry_util, tool, import_util, socket_util
from utils import data_process, global_util, ths_industry_util, tool, import_util, socket_util
from code_attribute import code_volumn_manager, global_data_loader, gpcode_manager, first_target_code_data_processor
import constant
from user import authority
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager
import l2_data_util
from l2 import l2_data_manager_new, l2_data_manager, code_price_manager
import l2.l2_data_util
from third_data import block_info
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager
from ths import l2_listen_pos_health_manager, l2_code_operate, client_manager
from trade import trade_data_manager, trade_manager, l2_trade_util, \
    current_price_process_manager, trade_juejin, trade_constant
from code_attribute.code_data_util import ZYLTGBUtil
import l2.transaction_progress
from log_module.log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_debug
from log_module.log import logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_trade_buy_queue, logger_debug
from trade.huaxin import huaxin_trade_record_manager
from trade.trade_manager import TradeTargetCodeModeManager
from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager
@@ -79,7 +72,6 @@
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
        # print("----setup方法被执行-----")
        # print("打印传入的参数:", self.server.pipe_trade)
        self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance()
    def __notify_trade(self, type_):
        if self.server.pipe_trade:
@@ -124,107 +116,7 @@
                    else:
                        print(_str)
                return_str = "OK"
                if type == 0:
                    try:
                        origin_start_time = round(time.time() * 1000)
                        __start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data(
                            _str)
                        last_health_time = self.last_l2_listen_health_time.get((client, channel))
                        # --------------------------------设置L2健康状态--------------------------------
                        if last_health_time is None or __start_time - last_health_time > 1000:
                            self.last_l2_listen_health_time[(client, channel)] = __start_time
                            # 更新监听位健康状态
                            if origin_datas_count == 0:
                                l2_listen_pos_health_manager.set_unhealthy(client, channel)
                            else:
                                l2_listen_pos_health_manager.set_healthy(client, channel)
                        l2_log.threadIds[code] = random.randint(0, 100000)
                        if True:
                            # 间隔1s保存一条l2的最后一条数据
                            if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
                                code] >= 1000 and len(origin_datas) > 0:
                                self.l2_save_time_dict[code] = origin_start_time
                                logger_l2_latest_data.info("{}#{}#{}", code, capture_time, origin_datas[-1])
                            # 10ms的网络传输延时
                            capture_timestamp = __start_time - process_time - 10
                            # print("截图时间:", process_time)
                            __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                               "截图时间:{} 数据解析时间".format(process_time))
                            cid, pid = gpcode_manager.get_listen_code_pos(code)
                            __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                               "l2获取代码位置耗时")
                            # 判断目标代码位置是否与上传数据位置一致
                            if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                                # l2.l2_data_util.set_l2_data_latest_count(code, len(origin_datas))
                                l2_data_util.save_l2_latest_data_number(code, origin_datas_count)
                                # 保存l2数据条数
                                if not origin_datas:
                                    # or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)):
                                    raise Exception("无新增数据")
                                # 保存最近的数据
                                self.latest_oringin_data[code] = origin_datas
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                datas = l2.l2_data_util.L2DataUtil.format_l2_data(origin_datas, code, limit_up_price)
                                try:
                                    # 校验客户端代码
                                    l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                    __start_time = round(time.time() * 1000)
                                    if gpcode_manager.is_listen(code):
                                        __start_time = l2_data_log.l2_time(code,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2外部数据预处理耗时")
                                        l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                        __start_time = l2_data_log.l2_time(code,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2数据有效处理外部耗时",
                                                                           False)
                                        # 保存原始数据数量
                                        # l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                        # if round(time.time() * 1000) - __start_time > 20:
                                        #     l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                        #                         "异步保存原始数据条数耗时",
                                        #                         False)
                                except l2_data_manager.L2DataException as l:
                                    # 单价不符
                                    if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                        key = "{}-{}-{}".format(client, channel, code)
                                        if key not in self.l2_data_error_dict or round(
                                                time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
                                            # self.l2CodeOperate.repaire_l2_data(code)
                                            logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg)
                                            # 单价不一致时需要移除代码重新添加
                                            l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误")
                                            self.l2_data_error_dict[key] = round(time.time() * 1000)
                                except Exception as e:
                                    print("异常", str(e), code)
                                    logging.exception(e)
                                    logger_l2_error.error("出错:{}".format(str(e)))
                                    logger_l2_error.error("内容:{}".format(_str))
                                finally:
                                    __end_time = round(time.time() * 1000)
                                    # 只记录大于40ms的数据
                                    if __end_time - origin_start_time > 100:
                                        l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time,
                                                            "l2数据处理总耗时",
                                                            True)
                    except Exception as e:
                        if str(e).find("新增数据"):
                            pass
                        else:
                            logger_l2_error.exception(e)
                elif type == 2:
                if type == 2:
                    # 涨停代码
                    dataList, is_add = data_process.parseGPCode(_str)
                    # 设置涨停时间
@@ -366,7 +258,8 @@
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        if limit_up_price is not None:
                            code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_one_volumn, buy_time, limit_up_price,
                            code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_one_volumn, buy_time,
                                                                          limit_up_price,
                                                                          sell_one_price, sell_one_volumn)
                            _start_time = time.time()
                            msg += "买1价格处理:" + f"{_start_time - __start_time} "
@@ -384,14 +277,16 @@
                                    buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                        decimal.Decimal("0.00"))
                                    # 获取执行位时间
                                    order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(code)
                                    order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
                                        code)
                                    if True:
                                        # 只有下单过后才获取交易进度
                                        exec_time = None
                                        try:
                                            if order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index > -1:
                                                exec_time = \
                                                    l2.l2_data_util.local_today_datas.get(code)[order_begin_pos.buy_exec_index]["val"]["time"]
                                                    l2.l2_data_util.local_today_datas.get(code)[
                                                        order_begin_pos.buy_exec_index]["val"]["time"]
                                        except:
                                            pass
                                        buy_progress_index = self.tradeBuyQueue.compute_traded_index(code,
@@ -399,7 +294,9 @@
                                                                                                     buy_queue_result_list,
                                                                                                     exec_time)
                                        if buy_progress_index is not None:
                                            LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index, buy_progress_index,
                                            LCancelBigNumComputer().set_trade_progress(code,
                                                                                       order_begin_pos.buy_single_index,
                                                                                       buy_progress_index,
                                                                                       l2.l2_data_util.local_today_datas.get(
                                                                                           code))
@@ -447,16 +344,6 @@
                        space = time.time() - __start_time
                        if space > 0.1:
                            logger_debug.info("{}成交队列处理时间:{},{}", code, space, msg)
                elif type == 20:
                    # 登录
                    data = data_process.parse(_str)["data"]
                    try:
                        client_id, _authoritys = authority.login(data["account"], data["pwd"])
                        return_str = data_process.toJson(
                            {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}})
                    except Exception as e:
                        return_str = data_process.toJson({"code": 1, "msg": str(e)})
                # 现价更新
                elif type == 40:
                    datas = data_process.parse(_str)["data"]
@@ -471,7 +358,7 @@
                        for item in datas:
                            volumn = item["volume"]
                            volumnUnit = item["volumeUnit"]
                            code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                            code_volumn_manager.CodeVolumeManager().save_today_volumn(item["code"], volumn, volumnUnit)
                        current_price_process_manager.accept_prices(datas)
                # L2现价更新
                elif type == 41:
@@ -509,56 +396,6 @@
                            # if need_sync:
                            #     # 同步数据
                            #     L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    thsDead = data.get("thsDead")
                    logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
                    client_manager.saveClientActive(int(client_id), host, thsDead)
                    if constant.is_windows():
                        # 动态导入
                        if ths_util.is_ths_dead(client_id):
                            # TODO 重启同花顺
                            # 报警
                            l2_clients = authority.get_l2_clients()
                            if client_id in l2_clients:
                                alert_util.alarm()
                elif type == 60:
                    # L2自启动成功
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    print("L2自启动成功", client_id)
                    now_str = tool.get_now_time_str()
                    ts = tool.get_time_as_second(now_str)
                    # 9点25到9点28之间的自启动就需要批量设置代码,目前永远不执行
                    if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00") and False:
                        # 准备批量设置代码
                        return_json = {"code": 1, "msg": "等待批量设置代码"}
                        return_str = json.dumps(return_json)
                        # 获取排名前16位的代码
                        codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16)
                        codes = sorted(codes)
                        if client_id == 2:
                            codes = codes[:constant.L2_CODE_COUNT_PER_DEVICE]
                        else:
                            codes = codes[constant.L2_CODE_COUNT_PER_DEVICE:]
                        codes_datas = []
                        for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
                            if i >= len(codes):
                                break
                            codes_datas.append((i, codes[i]))
                        # 如果设置失败需要重试2次
                        for i in range(0, 3):
                            set_success = l2_code_operate.betch_set_client_codes(client_id, codes_datas)
                            if set_success:
                                break
                            else:
                                time.sleep(3)
                    else:
                        return_json = {"code": 0, "msg": "开启在线状态"}
                        return_str = json.dumps(return_json)
                elif type == 70:
                    # 选股宝热门概念
                    data_json = data_process.parse(_str)
@@ -589,7 +426,8 @@
                        state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                        if state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or state == trade_constant.TRADE_STATE_BUY_DELEGATED or state == trade_constant.TRADE_STATE_BUY_CANCEL_ING:
                            try:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销", cancel_type=trade_constant.CANCEL_TYPE_HUMAN)
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销",
                                                                                    cancel_type=trade_constant.CANCEL_TYPE_HUMAN)
                                return_str = json.dumps({"code": 0})
                            except Exception as e:
                                return_str = json.dumps({"code": 2, "msg": str(e)})
@@ -612,7 +450,7 @@
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.forbidden_trade(code, msg="手动加入")
                        l2_trade_util.forbidden_trade(code, msg="手动加入", force=True)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = HistoryKDatasUtils.get_gp_codes_names([code])
@@ -835,81 +673,6 @@
        # print("--------finish方法被执行---")
def send_msg(client_id, data):
    _ip = client_manager.getActiveClientIP(client_id)
    # print("ip", client_id, _ip)
    if _ip is None or len(_ip) <= 0:
        raise Exception("客户端IP为空")
    socketClient = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socketClient.connect((_ip, 9006))
    # 连接socket
    try:
        socketClient.send(json.dumps(data).encode())
        recv = socketClient.recv(1024)
        result = str(recv, encoding="gbk")
        return result
    finally:
        socketClient.close()
# 客户端心跳机制
def test_client_server():
    while True:
        clients = authority.get_l2_clients()
        for client in clients:
            # print("心跳", client)
            try:
                send_msg(client, {"action": "test"})
            except:
                pass
        # 矫正客户端代码
        l2_code_operate.correct_client_codes()
        time.sleep(5)
# 获取采集客户端的状态
def get_client_env_state(client):
    result = send_msg(client, {"action": "getEnvState"})
    result = json.loads(result)
    if result["code"] == 0:
        return json.loads(result["data"])
    else:
        raise Exception(result["msg"])
# 修复采集客户端
def repair_client_env(client):
    result = send_msg(client, {"action": "repairEnv"})
    result = json.loads(result)
    if result["code"] != 0:
        raise Exception(result["msg"])
# 同步目标标的到同花顺
def sync_target_codes_to_ths():
    codes = gpcode_manager.get_second_gp_list()
    code_list = []
    for code in codes:
        code_list.append(code)
    client = authority._get_client_ids_by_rule("data-maintain")
    result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list})
    return result
# 修复同花顺主站
def repair_ths_main_site(client):
    result = send_msg(client, {"action": "updateTHSSite"})
    result = json.loads(result)
    if result["code"] != 0:
        raise Exception(result["msg"])
    else:
        # 测速成功
        client_infos = []
        for index in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
            client_infos.append((client, index))
        l2_listen_pos_health_manager.init_all(client_infos)
if __name__ == "__main__1":
    # 交易成功无法读取时备用
@@ -931,4 +694,4 @@
                trade_manager.save_trade_success_data(fdatas)
        except:
            pass
        time.sleep(1.5)
        time.sleep(1.5)