Administrator
2022-10-27 6e71fbcb119e7068ba35380edaa5cc66e7c71f1b
server.py
@@ -10,8 +10,10 @@
import time
import alert_util
import client_manager
import code_volumn_manager
import data_process
import global_data_loader
import global_util
import gpcode_manager
import authority
@@ -19,15 +21,15 @@
import l2_data_log
import l2_data_manager
import l2_data_manager_new
import log
import l2_data_util
import ths_industry_util
import ths_util
import tool
import trade_manager
import l2_code_operate
from code_data_util import ZYLTGBUtil
from log import logger_l2_error, logger_l2_process, logger_device, logger_trade_delegate
from trade_data_manager import TradeCancelDataManager
from log import logger_l2_error, logger_device, logger_trade_delegate
from trade_queue_manager import THSBuy1VolumnManager
@@ -84,7 +86,7 @@
                        _start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas = l2_data_manager.parseL2Data(
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data(
                            _str)
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
@@ -129,6 +131,8 @@
                                                                   "l2数据正确性判断时间")
                                if gpcode_manager.is_listen(code):
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                    # 保存原始数据数量
                                    l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                            except l2_data_manager.L2DataException as l:
                                # 单价不符
                                if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
@@ -148,6 +152,7 @@
                                logger_l2_error.error("出错:{}".format(str(e)))
                                logger_l2_error.error("内容:{}".format(_str))
                            finally:
                                __end_time = round(time.time() * 1000)
                                # 只记录大于40ms的数据
                                if __end_time - __start_time > 40:
@@ -155,23 +160,17 @@
                                                        True)
                    except Exception as e:
                        logging.exception(e)
                elif type == 10:
                    # level2交易队列
                    try:
                        code, setData = data_process.parseL2TradeQueueData(_str)
                        if gpcode_manager.is_listen(code):
                            data_process.saveL2Data(day, code, setData)
                    except:
                        print("异常")
                elif type == 1:
                    # 设置股票代码
                    data_list = data_process.parseGPCode(_str)
                    data_process.saveZYLTSZ(data_list)
                    ZYLTGBUtil.save_list(data_list)
                    code_list = []
                    for data in data_list:
                        code_list.append(data["code"])
                    gpcode_manager.set_gp_list(code_list)
                    # 获取基本信息
                    code_datas = juejin.JueJinManager.get_gp_latest_info(code_list)
                    gpcode_manager.set_gp_list(code_datas)
                    # 重新订阅
                    self.server.pipe_juejin.send(json.dumps({"type": "resub"}))
                    # 同步同花顺目标代码
@@ -183,6 +182,9 @@
                    dataList = data_process.parseGPCode(_str)
                    # 设置涨停时间
                    gpcode_manager.set_limit_up_list(dataList)
                    # 保存到内存中
                    if dataList:
                        global_data_loader.add_limit_up_codes(dataList)
                    ths_industry_util.set_industry_hot_num(dataList)
                elif type == 3:
                    # 交易成功信息
@@ -242,14 +244,23 @@
                        index = data["index"]
                        code_name = data["codeName"]
                        volumn = data["volumn"]
                        price = data["price"]
                        time_ = data["time"]
                        code = global_util.name_codes.get(code_name)
                        if code is None:
                            global_util.load_name_codes()
                            global_data_loader.load_name_codes()
                        code = global_util.name_codes.get(code_name)
                        if code is not None:
                            # 校正时间
                            seconds = tool.get_time_as_second(time_)
                            if seconds % 3 > 0:
                                seconds = seconds - seconds % 3
                            time_ = tool.time_seconds_format(seconds)
                            # 保存数据
                            self.buy1_volumn_manager.save(code, time_, volumn)
                            need_sync = self.buy1_volumn_manager.save(code, time_, volumn,price)
                            if need_sync:
                                # 同步数据
                                l2_data_manager.L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息
@@ -257,7 +268,7 @@
                    client_id = data["client"]
                    thsDead = data.get("thsDead")
                    logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
                    data_process.saveClientActive(int(client_id), host, thsDead)
                    client_manager.saveClientActive(int(client_id), host, thsDead)
                    if ths_util.is_ths_dead(client_id):
                        # TODO 重启同花顺
                        # 报警
@@ -275,7 +286,7 @@
def send_msg(client_id, data):
    _ip = data_process.getActiveClientIP(client_id)
    _ip = client_manager.getActiveClientIP(client_id)
    print("ip", client_id, _ip)
    if _ip is None or len(_ip) <= 0:
        raise Exception("客户端IP为空")