Administrator
2023-03-15 68464c679ae5e1ae35e7e67e3b339ba0f939cbd3
server.py
@@ -13,6 +13,7 @@
import alert_util
import client_manager
import code_volumn_manager
import constant
import data_process
import global_data_loader
import global_util
@@ -28,13 +29,14 @@
import ths_industry_util
import ths_util
import tool
from third_data import hot_block_data_process
from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util
import l2_code_operate
from code_data_util import ZYLTGBUtil
import l2.transaction_progress
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record, logger_debug
from trade.trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager
@@ -66,6 +68,7 @@
    tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
    last_time = {}
    first_tick_datas = []
    latest_oringin_data = {}
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -106,24 +109,16 @@
                        __start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data(
                        day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data(
                            _str)
                        l2_log.threadIds[code] = random.randint(0, 100000)
                        if channel == 0:
                            now_time = round(time.time() * 1000)
                            if self.last_time.get(channel) is not None:
                                # print("接受到L2的数据", channel, now_time - self.last_time.get(channel), "解析耗时",now_time - origin_start_time)
                                pass
                            self.last_time[channel] = now_time
                        if True:
                            # 间隔1s保存一条l2的最后一条数据
                            if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
                                code] >= 1000 and len(datas) > 0:
                                code] >= 1000 and len(origin_datas) > 0:
                                self.l2_save_time_dict[code] = origin_start_time
                                logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1])
                                logger_l2_latest_data.info("{}#{}#{}", code, capture_time, origin_datas[-1])
                            # 10ms的网络传输延时
                            capture_timestamp = __start_time - process_time - 10
@@ -137,6 +132,16 @@
                                                               "l2获取代码位置耗时")
                            # 判断目标代码位置是否与上传数据位置一致
                            if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                                # l2.l2_data_util.set_l2_data_latest_count(code, len(origin_datas))
                                l2_data_util.save_l2_latest_data_number(code, origin_datas_count)
                                # 保存l2数据条数
                                if not origin_datas:
                                    #or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)):
                                    raise Exception("无新增数据")
                                # 保存最近的数据
                                self.latest_oringin_data[code] = origin_datas
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                datas = l2.l2_data_util.L2DataUtil.format_l2_data(origin_datas, code, limit_up_price)
                                try:
                                    # 校验客户端代码
                                    l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
@@ -151,11 +156,11 @@
                                                                           "l2数据有效处理外部耗时",
                                                                           False)
                                        # 保存原始数据数量
                                        l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                        if round(time.time() * 1000) - __start_time > 20:
                                            l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                "异步保存原始数据条数耗时",
                                                                False)
                                        # l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                        # if round(time.time() * 1000) - __start_time > 20:
                                        #     l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                        #                         "异步保存原始数据条数耗时",
                                        #                         False)
                                except l2_data_manager.L2DataException as l:
                                    # 单价不符
@@ -184,6 +189,9 @@
                                                            "l2数据处理总耗时",
                                                            True)
                    except Exception as e:
                        if str(e).find("新增数据"):
                            pass
                        else:
                        logger_l2_error.exception(e)
                elif type == 1:
@@ -235,7 +243,6 @@
                            raise Exception('未到接受时间')
                        # 首板代码
                        dataList, is_add = data_process.parseGPCode(_str)
                        # {'code': '605300', 'limitUpPercent': '0009.99', 'price': '0020.14', 'time': '10:44:00', 'volume': '44529', 'volumeUnit': 2, 'zyltMoney': '0011.60', 'zyltMoneyUnit': 0}
                        limit_up_price_dict = {}
                        temp_codes = []
                        codes = []
@@ -250,12 +257,13 @@
                                else:
                                    temp_codes.append(code)
                                # data["price"]
                                tick_datas.append({"code": code, "price": data["price"], "volumn": data["volume"],
                                                   "volumnUnit": data["volumeUnit"]})
                                tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"],
                                                   "volumeUnit": data["volumeUnit"]})
                        # 保存未筛选的首板代码
                        new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)
                        for code in new_add_codes:
                            if (not l2_trade_util.is_in_forbidden_trade_codes(code)) and juejin.JueJinManager.get_lowest_price_rate(code, 15) >= 0.3:
                            if (not l2_trade_util.is_in_forbidden_trade_codes(
                                    code)) and juejin.JueJinManager.get_lowest_price_rate(code, 15) >= 0.3:
                                l2_trade_util.forbidden_trade(code)
                        if new_add_codes:
@@ -268,6 +276,12 @@
                                if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None:
                                    volumes = juejin.get_volumn(code)
                                    code_volumn_manager.set_histry_volumn(code,volumes[0],volumes[1])
                            # 移除代码
                            listen_codes = gpcode_manager.get_listen_codes()
                            for lc in listen_codes:
                                if not gpcode_manager.is_in_gp_pool(lc):
                                    # 移除代码
                                    l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")
                        if temp_codes:
                            # 获取涨停价
@@ -284,7 +298,7 @@
                            if code in global_util.zyltgb_map:
                                continue
                            zyltgb_list.append(
                                {"code": code, "zyltgb": data["zyltMoney"], "zyltgb_unit": data["zyltMoneyUnit"]})
                                {"code": code, "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgbUnit"]})
                        if zyltgb_list:
                            ZYLTGBUtil.save_list(zyltgb_list)
                            global_data_loader.load_zyltgb()
@@ -307,6 +321,9 @@
                            # 纠正数据
                            if is_limit_up and limit_up_time is None:
                                limit_up_time = tool.get_now_time_str()
                            if is_limit_up:
                                # 加入首板涨停
                                gpcode_manager.FirstCodeManager.add_limited_up_record([code])
                            pricePre = gpcode_manager.get_price_pre(code)
                            rate = round((float(price) - pricePre) * 100 / pricePre, 1)
                            prices.append(
@@ -318,8 +335,6 @@
                                        code)
                                    if place_order_count == 0:
                                        trade_data_manager.placeordercountmanager.place_order(code)
                                    # 加入首板涨停
                                    gpcode_manager.FirstCodeManager.add_limited_up_record([code])
                        gpcode_first_screen_manager.process_ticks(prices)
                    except Exception as e:
@@ -400,9 +415,14 @@
                # l2交易队列
                elif type == 10:
                    # 可用金额
                    __start_time = time.time()
                    datas = data_process.parseData(_str)
                    channel = datas["channel"]
                    code = datas["code"]
                    try:
                        if not gpcode_manager.is_in_gp_pool(code) and not gpcode_manager.is_in_first_gp_codes(code):
                            # 没在目标代码中且没有在首板今日历史代码中
                            raise Exception("代码没在监听中")
                    data = datas["data"]
                    buy_time = data["buyTime"]
                    buy_one_price = data["buyOnePrice"]
@@ -412,9 +432,11 @@
                        print('买1价没有,', code)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if limit_up_price is not None:
                        buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price, buy_time,
                            buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price,
                                                                            buy_time,
                                                                        buy_queue)
                        if buy_queue_result_list:
                                raise  Exception("测试中断")
                            # 有数据
                            try:
                                buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
@@ -424,13 +446,16 @@
                                buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                                    code)
                                if buy_exec_index:
                                        # 只有下单过后才获取交易进度
                                    try:
                                        exec_time = l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][
                                            exec_time = \
                                                l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][
                                            "time"]
                                    except:
                                        pass
                                buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, buy_one_price_,
                                        buy_progress_index = self.tradeBuyQueue.compute_traded_index(code,
                                                                                                     buy_one_price_,
                                                                                             buy_queue_result_list,
                                                                                             exec_time)
                                if buy_progress_index is not None:
@@ -452,7 +477,8 @@
                                                                  json.dumps(buy_queue_result_list))
                    # buy_queue是否有变化
                    if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
                        if self.l2_trade_buy_queue_dict.get(
                                code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
                            code):
                        self.l2_trade_buy_queue_dict[code] = buy_queue
                        logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
@@ -472,6 +498,13 @@
                    # print(buy_time, buy_one_price, buy_one_volumn)
                    # print("L2买卖队列",datas)
                    except:
                        pass
                    finally:
                        space = time.time() - __start_time
                        if space > 0.1:
                            logger_debug.info("{}成交队列处理时间:{}", code, space)
                elif type == 20:
                    # 登录
                    data = data_process.parse(_str)["data"]
@@ -484,14 +517,15 @@
                # 现价更新
                elif type == 40:
                    datas = data_process.parse(_str)["data"]
                    print("二板现价")
                    # 获取暂存的二版现价数据
                    if datas and self.first_tick_datas:
                        datas.extend(self.first_tick_datas)
                    if datas is not None:
                        print("现价数量", len(datas))
                        print("二板现价数量", len(datas))
                        for item in datas:
                            volumn = item["volumn"]
                            volumnUnit = item["volumnUnit"]
                            volumn = item["volume"]
                            volumnUnit = item["volumeUnit"]
                            code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                        juejin.accept_prices(datas)
                elif type == 50:
@@ -524,7 +558,6 @@
                            if need_sync:
                                # 同步数据
                                L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息
                    data = data_process.parse(_str)["data"]
@@ -554,11 +587,11 @@
                        codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16)
                        codes = sorted(codes)
                        if client_id == 2:
                            codes = codes[:8]
                            codes = codes[:constant.L2_CODE_COUNT_PER_DEVICE]
                        else:
                            codes = codes[8:]
                            codes = codes[constant.L2_CODE_COUNT_PER_DEVICE:]
                        codes_datas = []
                        for i in range(0, 8):
                        for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
                            if i >= len(codes):
                                break
                            codes_datas.append((i, codes[i]))
@@ -572,12 +605,67 @@
                    else:
                        return_json = {"code": 0, "msg": "开启在线状态"}
                        return_str = json.dumps(return_json)
                elif type == 70:
                    # 选股宝热门概念
                    datas = data_process.parse(_str)["data"]
                    if datas:
                        hot_block_data_process.save_datas(datas)
                    print(datas)
                elif type == 201:
                    # 加入黑名单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.forbidden_trade(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = juejin.JueJinManager.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code,results[code])
                    return_str = json.dumps({"code": 0})
                elif type == 202:
                    # 加入白名单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.WhiteListCodeManager.add_code(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = juejin.JueJinManager.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                    return_str = json.dumps({"code": 0})
                elif type == 203:
                    # 移除黑名单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.remove_from_forbidden_trade_codes(code)
                    return_str = json.dumps({"code": 0})
                elif type == 204:
                    # 移除白名单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.WhiteListCodeManager.remove_code(code)
                    return_str = json.dumps({"code": 0})
                elif type == 301:
                    # 黑名单列表
                    codes = l2_trade_util.BlackListCodeManager.list_codes()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                elif type == 302:
                    # 黑名单列表
                    codes = l2_trade_util.WhiteListCodeManager.list_codes()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                sk.send(return_str.encode())
@@ -644,7 +732,7 @@
    code_list = []
    for code in codes:
        code_list.append(code)
    client = authority._get_client_ids_by_rule("client-industry")
    client = authority._get_client_ids_by_rule("data-maintain")
    result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list})
    return result
@@ -658,9 +746,8 @@
if __name__ == "__main__":
    try:
        a = round(float("0002.90"), 2)
        print(decimal.Decimal(a).quantize(decimal.Decimal("0.00")))
        # repair_ths_main_site(2)
    except Exception as e:
        print(str(e))
    listen_codes = gpcode_manager.get_listen_codes()
    for lc in listen_codes:
        if not gpcode_manager.is_in_gp_pool(lc):
            # 移除代码
            l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")