Administrator
2023-03-15 68464c679ae5e1ae35e7e67e3b339ba0f939cbd3
server.py
@@ -13,6 +13,7 @@
import alert_util
import client_manager
import code_volumn_manager
import constant
import data_process
import global_data_loader
import global_util
@@ -28,13 +29,14 @@
import ths_industry_util
import ths_util
import tool
from third_data import hot_block_data_process
from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util
import l2_code_operate
from code_data_util import ZYLTGBUtil
import l2.transaction_progress
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record, logger_debug
from trade.trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager
@@ -66,6 +68,7 @@
    tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
    last_time = {}
    first_tick_datas = []
    latest_oringin_data = {}
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -106,24 +109,16 @@
                        __start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data(
                        day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data(
                            _str)
                        l2_log.threadIds[code] = random.randint(0, 100000)
                        if channel == 0:
                            now_time = round(time.time() * 1000)
                            if self.last_time.get(channel) is not None:
                                # print("接受到L2的数据", channel, now_time - self.last_time.get(channel), "解析耗时",now_time - origin_start_time)
                                pass
                            self.last_time[channel] = now_time
                        if True:
                            # 间隔1s保存一条l2的最后一条数据
                            if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
                                code] >= 1000 and len(datas) > 0:
                                code] >= 1000 and len(origin_datas) > 0:
                                self.l2_save_time_dict[code] = origin_start_time
                                logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1])
                                logger_l2_latest_data.info("{}#{}#{}", code, capture_time, origin_datas[-1])
                            # 10ms的网络传输延时
                            capture_timestamp = __start_time - process_time - 10
@@ -137,6 +132,16 @@
                                                               "l2获取代码位置耗时")
                            # 判断目标代码位置是否与上传数据位置一致
                            if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                                # l2.l2_data_util.set_l2_data_latest_count(code, len(origin_datas))
                                l2_data_util.save_l2_latest_data_number(code, origin_datas_count)
                                # 保存l2数据条数
                                if not origin_datas:
                                    #or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)):
                                    raise Exception("无新增数据")
                                # 保存最近的数据
                                self.latest_oringin_data[code] = origin_datas
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                datas = l2.l2_data_util.L2DataUtil.format_l2_data(origin_datas, code, limit_up_price)
                                try:
                                    # 校验客户端代码
                                    l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
@@ -151,11 +156,11 @@
                                                                           "l2数据有效处理外部耗时",
                                                                           False)
                                        # 保存原始数据数量
                                        l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                        if round(time.time() * 1000) - __start_time > 20:
                                            l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                "异步保存原始数据条数耗时",
                                                                False)
                                        # l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                        # if round(time.time() * 1000) - __start_time > 20:
                                        #     l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                        #                         "异步保存原始数据条数耗时",
                                        #                         False)
                                except l2_data_manager.L2DataException as l:
                                    # 单价不符
@@ -184,7 +189,10 @@
                                                            "l2数据处理总耗时",
                                                            True)
                    except Exception as e:
                        logger_l2_error.exception(e)
                        if str(e).find("新增数据"):
                            pass
                        else:
                            logger_l2_error.exception(e)
                elif type == 1:
                    # 设置股票代码
@@ -235,7 +243,6 @@
                            raise Exception('未到接受时间')
                        # 首板代码
                        dataList, is_add = data_process.parseGPCode(_str)
                        # {'code': '605300', 'limitUpPercent': '0009.99', 'price': '0020.14', 'time': '10:44:00', 'volume': '44529', 'volumeUnit': 2, 'zyltMoney': '0011.60', 'zyltMoneyUnit': 0}
                        limit_up_price_dict = {}
                        temp_codes = []
                        codes = []
@@ -250,24 +257,31 @@
                                else:
                                    temp_codes.append(code)
                                # data["price"]
                                tick_datas.append({"code": code, "price": data["price"], "volumn": data["volume"],
                                                   "volumnUnit": data["volumeUnit"]})
                                tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"],
                                                   "volumeUnit": data["volumeUnit"]})
                        # 保存未筛选的首板代码
                        new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)
                        for code in new_add_codes:
                            if (not l2_trade_util.is_in_forbidden_trade_codes(code)) and juejin.JueJinManager.get_lowest_price_rate(code, 15) >= 0.3:
                            if (not l2_trade_util.is_in_forbidden_trade_codes(
                                    code)) and juejin.JueJinManager.get_lowest_price_rate(code, 15) >= 0.3:
                                l2_trade_util.forbidden_trade(code)
                        if new_add_codes:
                            gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes))
                            # 加入首板历史记录
                            gpcode_manager.FirstCodeManager.add_record(new_add_codes)
                            logger_first_code_record.info("新增首板:{}",new_add_codes)
                            logger_first_code_record.info("新增首板:{}", new_add_codes)
                            # 获取60天最大记录
                            for code in new_add_codes:
                                if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None:
                                    volumes = juejin.get_volumn(code)
                                    code_volumn_manager.set_histry_volumn(code,volumes[0],volumes[1])
                                    code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1])
                            # 移除代码
                            listen_codes = gpcode_manager.get_listen_codes()
                            for lc in listen_codes:
                                if not gpcode_manager.is_in_gp_pool(lc):
                                    # 移除代码
                                    l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")
                        if temp_codes:
                            # 获取涨停价
@@ -284,7 +298,7 @@
                            if code in global_util.zyltgb_map:
                                continue
                            zyltgb_list.append(
                                {"code": code, "zyltgb": data["zyltMoney"], "zyltgb_unit": data["zyltMoneyUnit"]})
                                {"code": code, "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgbUnit"]})
                        if zyltgb_list:
                            ZYLTGBUtil.save_list(zyltgb_list)
                            global_data_loader.load_zyltgb()
@@ -307,6 +321,9 @@
                            # 纠正数据
                            if is_limit_up and limit_up_time is None:
                                limit_up_time = tool.get_now_time_str()
                            if is_limit_up:
                                # 加入首板涨停
                                gpcode_manager.FirstCodeManager.add_limited_up_record([code])
                            pricePre = gpcode_manager.get_price_pre(code)
                            rate = round((float(price) - pricePre) * 100 / pricePre, 1)
                            prices.append(
@@ -318,8 +335,6 @@
                                        code)
                                    if place_order_count == 0:
                                        trade_data_manager.placeordercountmanager.place_order(code)
                                    # 加入首板涨停
                                    gpcode_manager.FirstCodeManager.add_limited_up_record([code])
                        gpcode_first_screen_manager.process_ticks(prices)
                    except Exception as e:
@@ -400,78 +415,96 @@
                # l2交易队列
                elif type == 10:
                    # 可用金额
                    __start_time = time.time()
                    datas = data_process.parseData(_str)
                    channel = datas["channel"]
                    code = datas["code"]
                    data = datas["data"]
                    buy_time = data["buyTime"]
                    buy_one_price = data["buyOnePrice"]
                    buy_one_volumn = data["buyOneVolumn"]
                    buy_queue = data["buyQueue"]
                    if buy_one_price is None:
                        print('买1价没有,', code)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if limit_up_price is not None:
                        buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price, buy_time,
                                                                        buy_queue)
                        if buy_queue_result_list:
                            # 有数据
                            try:
                                buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                    decimal.Decimal("0.00"))
                                # 获取执行位时间
                                exec_time = None
                                buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                                    code)
                                if buy_exec_index:
                                    try:
                                        exec_time = l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][
                                            "time"]
                                    except:
                                        pass
                    try:
                        if not gpcode_manager.is_in_gp_pool(code) and not gpcode_manager.is_in_first_gp_codes(code):
                            # 没在目标代码中且没有在首板今日历史代码中
                            raise Exception("代码没在监听中")
                        data = datas["data"]
                        buy_time = data["buyTime"]
                        buy_one_price = data["buyOnePrice"]
                        buy_one_volumn = data["buyOneVolumn"]
                        buy_queue = data["buyQueue"]
                        if buy_one_price is None:
                            print('买1价没有,', code)
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        if limit_up_price is not None:
                            buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price,
                                                                            buy_time,
                                                                            buy_queue)
                            if buy_queue_result_list:
                                # NOTE(review): this unconditional raise (message reads "test interruption")
                                # makes everything after it in this branch unreachable, and the exception is
                                # caught by the enclosing handler of this type-10 branch. It looks like a
                                # temporary debugging switch - confirm whether it should remain.
                                raise  Exception("测试中断")
                                # Data is available
                                try:
                                    buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                        decimal.Decimal("0.00"))
                                    # 获取执行位时间
                                    exec_time = None
                                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                                        code)
                                    if buy_exec_index:
                                        # 只有下单过后才获取交易进度
                                        try:
                                            exec_time = \
                                                l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][
                                                    "time"]
                                        except:
                                            pass
                                buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, buy_one_price_,
                                                                                             buy_queue_result_list,
                                                                                             exec_time)
                                if buy_progress_index is not None:
                                    HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index,
                                                                                buy_progress_index,
                                                                                l2.l2_data_util.local_today_datas.get(
                                                                                    code),
                                                                                l2.l2_data_util.local_today_num_operate_map.get(
                                                                                    code))
                                    logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                                   buy_progress_index,
                                                                   json.dumps(buy_queue_result_list))
                                else:
                                    raise Exception("暂未获取到交易进度")
                            except Exception as e:
                                logging.exception(e)
                                print("买入队列", code, buy_queue_result_list)
                                logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{}  数据-{}", code, str(e),
                                                                  json.dumps(buy_queue_result_list))
                                        buy_progress_index = self.tradeBuyQueue.compute_traded_index(code,
                                                                                                     buy_one_price_,
                                                                                                     buy_queue_result_list,
                                                                                                     exec_time)
                                        if buy_progress_index is not None:
                                            HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index,
                                                                                        buy_progress_index,
                                                                                        l2.l2_data_util.local_today_datas.get(
                                                                                            code),
                                                                                        l2.l2_data_util.local_today_num_operate_map.get(
                                                                                            code))
                                            logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                                           buy_progress_index,
                                                                           json.dumps(buy_queue_result_list))
                                        else:
                                            raise Exception("暂未获取到交易进度")
                                except Exception as e:
                                    logging.exception(e)
                                    print("买入队列", code, buy_queue_result_list)
                                    logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{}  数据-{}", code, str(e),
                                                                      json.dumps(buy_queue_result_list))
                    # buy_queue是否有变化
                    if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
                        # buy_queue是否有变化
                        if self.l2_trade_buy_queue_dict.get(
                                code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
                            code):
                        self.l2_trade_buy_queue_dict[code] = buy_queue
                        logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
                    # 保存最近的记录
                    if self.ths_l2_trade_queue_manager.save_recod(code, data):
                        if buy_time != "00:00:00":
                            logger_l2_trade_queue.info("{}-{}", code, data)
                            self.buy1_price_manager.save(code, buy_one_price)
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
                                                                                               int(buy_one_volumn),
                                                                                               buy_one_price)
                            #if need_cancel:
                            #    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
                    # print(buy_time, buy_one_price, buy_one_volumn)
                            self.l2_trade_buy_queue_dict[code] = buy_queue
                            logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
                        # 保存最近的记录
                        if self.ths_l2_trade_queue_manager.save_recod(code, data):
                            if buy_time != "00:00:00":
                                logger_l2_trade_queue.info("{}-{}", code, data)
                                self.buy1_price_manager.save(code, buy_one_price)
                                need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
                                                                                                   int(buy_one_volumn),
                                                                                                   buy_one_price)
                                # if need_cancel:
                                #    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                                if need_sync:
                                    # 同步数据
                                    L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
                        # print(buy_time, buy_one_price, buy_one_volumn)
                    # print("L2买卖队列",datas)
                        # print("L2买卖队列",datas)
                    except:
                        pass
                    finally:
                        space = time.time() - __start_time
                        if space > 0.1:
                            logger_debug.info("{}成交队列处理时间:{}", code, space)
                elif type == 20:
                    # 登录
                    data = data_process.parse(_str)["data"]
@@ -484,14 +517,15 @@
                # 现价更新
                elif type == 40:
                    datas = data_process.parse(_str)["data"]
                    print("二板现价")
                    # 获取暂存的二版现价数据
                    if datas and self.first_tick_datas:
                        datas.extend(self.first_tick_datas)
                    if datas is not None:
                        print("现价数量", len(datas))
                        print("二板现价数量", len(datas))
                        for item in datas:
                            volumn = item["volumn"]
                            volumnUnit = item["volumnUnit"]
                            volumn = item["volume"]
                            volumnUnit = item["volumeUnit"]
                            code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                        juejin.accept_prices(datas)
                elif type == 50:
@@ -519,12 +553,11 @@
                            # 保存数据
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn,
                                                                                               price)
                            #if need_cancel:
                            # if need_cancel:
                            #    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息
                    data = data_process.parse(_str)["data"]
@@ -554,11 +587,11 @@
                        codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16)
                        codes = sorted(codes)
                        if client_id == 2:
                            codes = codes[:8]
                            codes = codes[:constant.L2_CODE_COUNT_PER_DEVICE]
                        else:
                            codes = codes[8:]
                            codes = codes[constant.L2_CODE_COUNT_PER_DEVICE:]
                        codes_datas = []
                        for i in range(0, 8):
                        for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
                            if i >= len(codes):
                                break
                            codes_datas.append((i, codes[i]))
@@ -572,12 +605,67 @@
                    else:
                        return_json = {"code": 0, "msg": "开启在线状态"}
                        return_str = json.dumps(return_json)
                elif type == 70:
                    # 选股宝热门概念
                    datas = data_process.parse(_str)["data"]
                    if datas:
                        hot_block_data_process.save_datas(datas)
                    print(datas)
                elif type == 201:
                    # Add codes to the blacklist (forbidden-trade codes)
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.forbidden_trade(code)
                        # When no name is known for the code, look it up and store it
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = juejin.JueJinManager.get_gp_codes_names([code])
                            if results:
                                # NOTE(review): assumes the lookup result is keyed by code - confirm
                                gpcode_manager.CodesNameManager.add_first_code_name(code,results[code])
                    return_str = json.dumps({"code": 0})
                elif type == 202:
                    # Add codes to the whitelist
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.WhiteListCodeManager.add_code(code)
                        # When no name is known for the code, look it up and store it
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = juejin.JueJinManager.get_gp_codes_names([code])
                            if results:
                                # NOTE(review): assumes the lookup result is keyed by code - confirm
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                    return_str = json.dumps({"code": 0})
                elif type == 203:
                    # Remove codes from the blacklist
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.remove_from_forbidden_trade_codes(code)
                    return_str = json.dumps({"code": 0})
                elif type == 204:
                    # Remove codes from the whitelist
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.WhiteListCodeManager.remove_code(code)
                    return_str = json.dumps({"code": 0})
                elif type == 301:
                    # Blacklist listing: reply with one "name:code" string per blacklisted code
                    codes = l2_trade_util.BlackListCodeManager.list_codes()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                elif type == 302:
                    # Whitelist listing: reply with one "name:code" string per whitelisted code
                    # (the previous comment here said "blacklist", but the codes come from
                    # WhiteListCodeManager)
                    codes = l2_trade_util.WhiteListCodeManager.list_codes()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                sk.send(return_str.encode())
@@ -644,7 +732,7 @@
    code_list = []
    for code in codes:
        code_list.append(code)
    client = authority._get_client_ids_by_rule("client-industry")
    client = authority._get_client_ids_by_rule("data-maintain")
    result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list})
    return result
@@ -658,9 +746,8 @@
# Ad-hoc entry point used when this module is run directly as a script.
# NOTE(review): this region comes from a change hunk and may interleave statements from
# before and after the change - confirm which of them are current.
if __name__ == "__main__":
    try:
        # Parse a zero-padded price string and print it with two decimal places.
        a = round(float("0002.90"), 2)
        print(decimal.Decimal(a).quantize(decimal.Decimal("0.00")))
        # repair_ths_main_site(2)
    except Exception as e:
        print(str(e))
    # For every listened code that is no longer in the gp pool, submit operation 0
    # to the L2 code operator.
    listen_codes = gpcode_manager.get_listen_codes()
    for lc in listen_codes:
        if not gpcode_manager.is_in_gp_pool(lc):
            # Remove the code
            l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")