Administrator
2023-02-05 1252c9489b631905fbce608109260760537b224f
server.py
@@ -1,13 +1,16 @@
"""
接受客户端数据的服务器
"""
import datetime
import decimal
import json
import logging
import random
import socketserver
import socket
import threading
import time
import cv2
import alert_util
import client_manager
@@ -22,7 +25,9 @@
import l2_data_manager
import l2_data_manager_new
import l2_data_util
import limit_up_time_manager
from l2.cancel_buy_strategy import HourCancelBigNumComputer
from ocr import ocr_util
import ths_industry_util
import ths_util
import tool
@@ -31,9 +36,10 @@
import trade_manager
import l2_code_operate
from code_data_util import ZYLTGBUtil
import l2.transaction_progress
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue
from trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager
@@ -58,6 +64,8 @@
    buy1_price_manager = Buy1PriceManager()
    l2_trade_queue_time_dict = {}
    l2_save_time_dict = {}
    l2_trade_buy_queue_dict = {}
    tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -78,7 +86,7 @@
        # print("- " * 30)
        sk: socket.socket = self.request
        while True:
            data = sk.recv(1024000)
            data = sk.recv(1024 * 1024 * 20)
            if len(data) == 0:
                # print("客户端断开连接")
                break
@@ -92,6 +100,7 @@
                    try:
                        origin_start_time = round(time.time() * 1000)
                        __start_time = round(time.time() * 1000)
                        do_id = random.randint(0, 100000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data(
@@ -105,12 +114,12 @@
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
                        # print("截图时间:", process_time)
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                        __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                           "截图时间:{} 数据解析时间".format(process_time))
                        cid, pid = gpcode_manager.get_listen_code_pos(code)
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                        __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                           "l2获取代码位置耗时")
                        # 判断目标代码位置是否与上传数据位置一致
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
@@ -119,16 +128,19 @@
                                l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                __start_time = round(time.time() * 1000)
                                if gpcode_manager.is_listen(code):
                                    __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                    __start_time = l2_data_log.l2_time(code, do_id,
                                                                       round(time.time() * 1000) - __start_time,
                                                                       "l2外部数据预处理耗时")
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                    __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp,
                                                                                     do_id)
                                    __start_time = l2_data_log.l2_time(code, do_id,
                                                                       round(time.time() * 1000) - __start_time,
                                                                       "l2数据有效处理外部耗时",
                                                                       False)
                                    # 保存原始数据数量
                                    l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                    if round(time.time() * 1000) - __start_time > 20:
                                        l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                        l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                            "异步保存原始数据条数耗时",
                                                            False)
@@ -155,7 +167,7 @@
                                __end_time = round(time.time() * 1000)
                                # 只记录大于40ms的数据
                                if __end_time - origin_start_time > 100:
                                    l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time,
                                    l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time,
                                                        "l2数据处理总耗时",
                                                        True)
                    except Exception as e:
@@ -238,7 +250,6 @@
                                        apply_time = tool.trade_time_add_second(apply_time, 1)
                                    print(apply_time)
                                    l2_data_manager_new.SecondAverageBigNumComputer.set_apply_time(code, apply_time)
                    except Exception as e:
                        logging.exception(e)
@@ -271,6 +282,29 @@
                    buy_time = data["buyTime"]
                    buy_one_price = data["buyOnePrice"]
                    buy_one_volumn = data["buyOneVolumn"]
                    buy_queue = data["buyQueue"]
                    buy_queue_result_list = self.tradeBuyQueue.save(code, gpcode_manager.get_limit_up_price(code),
                                                                    buy_queue)
                    if buy_queue_result_list:
                        # 有数据
                        try:
                            buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(decimal.Decimal("0.00"))
                            buy_progress_index = self.tradeBuyQueue.save_traded_index(code,buy_one_price_,
                                                                                      buy_queue_result_list)
                            if buy_progress_index is not None:
                                HourCancelBigNumComputer.set_trade_progress(code,buy_progress_index)
                            logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                           buy_progress_index,
                                                           json.loads(buy_queue_result_list))
                        except Exception as e:
                            logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{}  数据-{}", code, str(e),
                                                              json.loads(buy_queue_result_list))
                    # buy_queue是否有变化
                    if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
                            code):
                        self.l2_trade_buy_queue_dict[code] = buy_queue
                        logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
                    # 保存最近的记录
                    if self.ths_l2_trade_queue_manager.save_recod(code, data):
                        if buy_time != "00:00:00":
@@ -352,14 +386,14 @@
                        if client_id in l2_clients:
                            alert_util.alarm()
                elif type == 60:
                    # 心跳信息
                    # L2自启动成功
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    print("L2自启动成功", client_id)
                    now_str = tool.get_now_time_str()
                    ts = tool.get_time_as_second(now_str)
                    # 9点25到9点28之间的自启动就需要批量设置代码
                    if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00"):
                    # 9点25到9点28之间的自启动就需要批量设置代码,目前永远不执行
                    if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00") and False:
                        # 准备批量设置代码
                        return_json = {"code": 1, "msg": "等待批量设置代码"}
                        return_str = json.dumps(return_json)
@@ -382,17 +416,10 @@
                                break
                            else:
                                time.sleep(3)
                    else:
                        return_json = {"code": 0, "msg": "开启在线状态"}
                        return_str = json.dumps(return_json)
                    # print("心跳:", client_id)
                elif type == 100:
                    # 图像识别
                    return_str = data_process.toJson({"code": 0, "data": {"datas": []}})
                    pass
                sk.send(return_str.encode())
        # print("----------handler end ----------")
@@ -473,7 +500,8 @@
if __name__ == "__main__":
    try:
        thsl2tradequeuemanager().test()
        a=round(float("0002.90"),2)
        print(decimal.Decimal(a).quantize(decimal.Decimal("0.00")))
        # repair_ths_main_site(2)
    except Exception as e:
        print(str(e))