Administrator
2022-12-18 86e0061f9cf211b98252a9e6b71d6c9801e4a16b
server.py
@@ -31,8 +31,9 @@
import l2_code_operate
from code_data_util import ZYLTGBUtil
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record
from trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data
from trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager
class MyTCPServer(socketserver.TCPServer):
@@ -50,8 +51,12 @@
    l2_data_error_dict = {}
    last_trade_delegate_data = None
    buy1_volumn_manager = THSBuy1VolumnManager()
    ths_l2_trade_queue_manager = thsl2tradequeuemanager()
    latest_buy1_volumn_dict = {}
    buy1_price_manager = Buy1PriceManager()
    l2_trade_queue_time_dict = {}
    l2_save_time_dict = {}
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -90,8 +95,14 @@
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data(
                            _str)
                        # 间隔1s保存一条l2的最后一条数据
                        if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[code] >= 1000 and len(datas) > 0:
                            self.l2_save_time_dict[code] = origin_start_time
                            logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1])
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
                        # print("截图时间:", process_time)
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                           "截图时间:{} 数据解析时间".format(process_time))
@@ -149,23 +160,26 @@
                        logging.exception(e)
                elif type == 1:
                    # 设置股票代码
                    data_list = data_process.parseGPCode(_str)
                    data_list, is_add = data_process.parseGPCode(_str)
                    ZYLTGBUtil.save_list(data_list)
                    code_list = []
                    for data in data_list:
                        code_list.append(data["code"])
                    # 获取基本信息
                    code_datas = juejin.JueJinManager.get_gp_latest_info(code_list)
                    if is_add:
                        gpcode_manager.add_gp_list(code_datas)
                    else:
                    gpcode_manager.set_gp_list(code_datas)
                    if not is_add:
                    # 同步同花顺目标代码
                    t1 = threading.Thread(target=lambda: sync_target_codes_to_ths())
                    t1.setDaemon(True)
                    t1.start()
                elif type == 2:
                    # 涨停代码
                    dataList = data_process.parseGPCode(_str)
                    dataList, is_add = data_process.parseGPCode(_str)
                    # 设置涨停时间
                    gpcode_manager.set_limit_up_list(dataList)
                    # 保存到内存中
@@ -184,8 +198,8 @@
                            continue
                        # 获取是否有涨停时间
                        if limit_up_time_manager.get_limit_up_time(d["code"]) is None:
                            limit_up_time_manager.save_limit_up_time(d["code"], d["time"])
                        # if limit_up_time_manager.get_limit_up_time(d["code"]) is None:
                        #     limit_up_time_manager.save_limit_up_time(d["code"], d["time"])
                elif type == 3:
                    # 交易成功信息
@@ -223,6 +237,32 @@
                    money = datas["money"]
                    # TODO存入缓存文件
                    trade_manager.set_available_money(client, money)
                # l2交易队列
                elif type == 10:
                    # Parse the L2 trade-queue payload (buy-one time, price and volume)
                    datas = data_process.parseData(_str)
                    channel = datas["channel"]
                    code = datas["code"]
                    data = datas["data"]
                    buy_time = data["buyTime"]
                    buy_one_price = data["buyOnePrice"]
                    buy_one_volumn = data["buyOneVolumn"]
                    # 保存最近的记录
                    if self.ths_l2_trade_queue_manager.save_recod(code, data):
                        if buy_time != "00:00:00":
                            logger_l2_trade_queue.info("{}-{}", code, data)
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
                                                                                               int(buy_one_volumn),
                                                                                               buy_one_price)
                            if need_cancel:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg)
                            if need_sync:
                                # 同步数据
                                l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn),
                                                                                           buy_time)
                    # print(buy_time, buy_one_price, buy_one_volumn)
                    # print("L2买卖队列",datas)
                elif type == 20:
                    # 登录
                    data = data_process.parse(_str)["data"]
@@ -287,6 +327,12 @@
                        l2_clients = authority.get_l2_clients()
                        if client_id in l2_clients:
                            alert_util.alarm()
                elif type == 60:
                    # 心跳信息
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    print("L2自启动成功", client_id)
                    # print("心跳:", client_id)
                sk.send(return_str.encode())
@@ -368,6 +414,7 @@
if __name__ == "__main__":
    try:
        repair_ths_main_site(2)
        thsl2tradequeuemanager().test()
        # repair_ths_main_site(2)
    except Exception as e:
        print(str(e))