Administrator
2023-03-23 96dc1a4cc38b588f39387b5a85b9677100e357f1
server.py
@@ -12,6 +12,7 @@
import alert_util
import client_manager
import code_nature_analyse
import code_volumn_manager
import constant
import data_process
@@ -21,7 +22,7 @@
import gpcode_manager
import authority
import juejin
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager
import l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil
import l2.l2_data_util
@@ -30,14 +31,15 @@
import ths_util
import tool
from third_data import hot_block_data_process
from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util
from ths import l2_listen_pos_health_manager
from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util, deal_big_money_manager
import l2_code_operate
from code_data_util import ZYLTGBUtil
import l2.transaction_progress
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record, logger_debug
from trade.trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager
from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager
class MyTCPServer(socketserver.TCPServer):
@@ -61,7 +63,6 @@
    ths_l2_trade_queue_manager = thsl2tradequeuemanager()
    latest_buy1_volumn_dict = {}
    buy1_price_manager = Buy1PriceManager()
    l2_trade_queue_time_dict = {}
    l2_save_time_dict = {}
    l2_trade_buy_queue_dict = {}
@@ -69,6 +70,7 @@
    last_time = {}
    first_tick_datas = []
    latest_oringin_data = {}
    last_l2_listen_health_time = {}
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -111,6 +113,15 @@
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data(
                            _str)
                        last_health_time = self.last_l2_listen_health_time.get((client, channel))
                        # --------------------------------设置L2健康状态--------------------------------
                        if last_health_time is None or __start_time - last_health_time > 1000:
                            self.last_l2_listen_health_time[(client, channel)] = __start_time
                            # 更新监听位健康状态
                            if origin_datas_count == 0:
                                l2_listen_pos_health_manager.set_unhealthy(client, channel)
                            else:
                                l2_listen_pos_health_manager.set_healthy(client, channel)
                        l2_log.threadIds[code] = random.randint(0, 100000)
                        if True:
@@ -136,7 +147,7 @@
                                l2_data_util.save_l2_latest_data_number(code, origin_datas_count)
                                # 保存l2数据条数
                                if not origin_datas:
                                    #or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)):
                                    # or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)):
                                    raise Exception("无新增数据")
                                # 保存最近的数据
                                self.latest_oringin_data[code] = origin_datas
@@ -239,7 +250,7 @@
                        #     limit_up_time_manager.save_limit_up_time(d["code"], d["time"])
                elif type == 22:
                    try:
                        if int(tool.get_now_time_str().replace(":", "")) < int("092600"):
                        if int(tool.get_now_time_str().replace(":", "")) < int("092500"):
                            raise Exception('未到接受时间')
                        # 首板代码
                        dataList, is_add = data_process.parseGPCode(_str)
@@ -251,46 +262,9 @@
                            for data in dataList:
                                code = data["code"]
                                codes.append(code)
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price is not None:
                                    limit_up_price_dict[code] = limit_up_price
                                else:
                                    temp_codes.append(code)
                                # data["price"]
                                tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"],
                                                   "volumeUnit": data["volumeUnit"]})
                        # 保存未筛选的首板代码
                        new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)
                        for code in new_add_codes:
                            if (not l2_trade_util.is_in_forbidden_trade_codes(
                                    code)) and juejin.JueJinManager.get_lowest_price_rate(code, 15) >= 0.3:
                                l2_trade_util.forbidden_trade(code)
                        if new_add_codes:
                            gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes))
                            # 加入首板历史记录
                            gpcode_manager.FirstCodeManager.add_record(new_add_codes)
                            logger_first_code_record.info("新增首板:{}", new_add_codes)
                            # 获取60天最大记录
                            for code in new_add_codes:
                                if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None:
                                    volumes = juejin.get_volumn(code)
                                    code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1])
                            # 移除代码
                            listen_codes = gpcode_manager.get_listen_codes()
                            for lc in listen_codes:
                                if not gpcode_manager.is_in_gp_pool(lc):
                                    # 移除代码
                                    l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")
                        if temp_codes:
                            # 获取涨停价
                            juejin.re_set_price_pres(temp_codes)
                            # 重新获取涨停价
                            for code in temp_codes:
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price is not None:
                                    limit_up_price_dict[code] = limit_up_price
                        # 保存自由流通股本
                        zyltgb_list = []
                        for data in dataList:
@@ -303,7 +277,64 @@
                            ZYLTGBUtil.save_list(zyltgb_list)
                            global_data_loader.load_zyltgb()
                            # 保存现价
                        bad_codes = set()
                        if new_add_codes:
                            gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes))
                            # 加入首板历史记录
                            gpcode_manager.FirstCodeManager.add_record(new_add_codes)
                            logger_first_code_record.info("新增首板:{}", new_add_codes)
                            # 获取60天最大记录
                            for code in codes:
                                if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None:
                                    volumes_data = juejin.get_volumns_by_code(code, 150)
                                    volumes = juejin.parse_max_volume(volumes_data[:60])
                                    logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
                                    code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1])
                                    # 判断K线形态
                                    is_has_k_format, msg = code_nature_analyse.is_has_k_format(
                                        gpcode_manager.get_limit_up_price(code), volumes_data)
                                    if not is_has_k_format:
                                        logger_first_code_record.info("{}首板K线形态不好,{}", code, msg)
                                        # 股性不好,就不要加入
                                        bad_codes.add(code)
                                        # 加入禁止交易代码
                                        l2_trade_util.forbidden_trade(code)
                                        break
                                    else:
                                        code_nature_analyse.set_record_datas(code,
                                                                             gpcode_manager.get_limit_up_price(code),
                                                                             volumes_data)
                            # 移除代码
                            listen_codes = gpcode_manager.get_listen_codes()
                            for lc in listen_codes:
                                if not gpcode_manager.is_in_gp_pool(lc):
                                    # 移除代码
                                    l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")
                        # 保存现价
                        if dataList:
                            for data in dataList:
                                code = data["code"]
                                codes.append(code)
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price is not None:
                                    limit_up_price_dict[code] = limit_up_price
                                else:
                                    temp_codes.append(code)
                                tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"],
                                                   "volumeUnit": data["volumeUnit"]})
                        # 获取涨停价
                        if temp_codes:
                            # 获取涨停价
                            juejin.re_set_price_pres(temp_codes)
                            # 重新获取涨停价
                            for code in temp_codes:
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price is not None:
                                    limit_up_price_dict[code] = limit_up_price
                        # 保存现价
                        self.first_tick_datas.clear()
                        self.first_tick_datas.extend(tick_datas)
@@ -419,10 +450,13 @@
                    datas = data_process.parseData(_str)
                    channel = datas["channel"]
                    code = datas["code"]
                    msg = ""
                    try:
                        if not gpcode_manager.is_in_gp_pool(code) and not gpcode_manager.is_in_first_gp_codes(code):
                            # 没在目标代码中且没有在首板今日历史代码中
                            raise Exception("代码没在监听中")
                        data = datas["data"]
                        buy_time = data["buyTime"]
                        buy_one_price = data["buyOnePrice"]
@@ -431,29 +465,38 @@
                        if buy_one_price is None:
                            print('买1价没有,', code)
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        if limit_up_price is not None:
                            code_price_manager.Buy1PriceManager.process(code, buy_one_price, buy_time, limit_up_price)
                            _start_time = time.time()
                            msg += "买1价格处理:" + f"{_start_time - __start_time} "
                            buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price,
                                                                            buy_time,
                                                                            buy_queue)
                            msg += "买队列保存:" + f"{time.time() - _start_time} "
                            _start_time = time.time()
                            if buy_queue_result_list:
                                raise  Exception("测试中断")
                                # 有数据
                                try:
                                    buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                        decimal.Decimal("0.00"))
                                    # 获取执行位时间
                                    exec_time = None
                                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                                        code)
                                    if buy_exec_index:
                                    if True:
                                        # 只有下单过后才获取交易进度
                                        exec_time = None
                                        try:
                                            exec_time = \
                                                l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][
                                                    "time"]
                                            if buy_exec_index:
                                                exec_time = \
                                                    l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][
                                                        "time"]
                                        except:
                                            pass
                                        buy_progress_index = self.tradeBuyQueue.compute_traded_index(code,
                                                                                                     buy_one_price_,
                                                                                                     buy_queue_result_list,
@@ -468,8 +511,17 @@
                                            logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                                           buy_progress_index,
                                                                           json.dumps(buy_queue_result_list))
                                            # 计算大单成交额
                                            deal_big_money_manager.set_trade_progress(code, buy_progress_index,
                                                                                      l2.l2_data_util.local_today_datas.get(
                                                                                          code),
                                                                                      l2.l2_data_util.local_today_num_operate_map.get(
                                                                                          code))
                                        else:
                                            raise Exception("暂未获取到交易进度")
                                        msg += "计算成交进度:" + f"{time.time() - _start_time} "
                                        _start_time = time.time()
                                except Exception as e:
                                    logging.exception(e)
                                    print("买入队列", code, buy_queue_result_list)
@@ -482,28 +534,31 @@
                            code):
                            self.l2_trade_buy_queue_dict[code] = buy_queue
                            logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
                            msg += "保存记录日志:" + f"{time.time() - _start_time} "
                            _start_time = time.time()
                        # 保存最近的记录
                        if self.ths_l2_trade_queue_manager.save_recod(code, data):
                            if buy_time != "00:00:00":
                                logger_l2_trade_queue.info("{}-{}", code, data)
                                self.buy1_price_manager.save(code, buy_one_price)
                                need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
                                                                                                   int(buy_one_volumn),
                                                                                                   buy_one_price)
                                # if need_cancel:
                                #    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                                if need_sync:
                                    # 同步数据
                                    L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
                                # if need_sync:
                                #     # 同步数据
                                #     s =  time.time()
                                #     L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
                                #     msg += "量校验:"+f"{time.time()-s} "
                        # print(buy_time, buy_one_price, buy_one_volumn)
                        # print("L2买卖队列",datas)
                        msg += "买1处理:" + f"{time.time() - _start_time} "
                        _start_time = time.time()
                    except:
                        pass
                    finally:
                        space = time.time() - __start_time
                        if space > 0.1:
                            logger_debug.info("{}成交队列处理时间:{}", code, space)
                            logger_debug.info("{}成交队列处理时间:{},{}", code, space, msg)
                elif type == 20:
                    # 登录
@@ -517,9 +572,11 @@
                # 现价更新
                elif type == 40:
                    datas = data_process.parse(_str)["data"]
                    if datas is None:
                        datas = []
                    print("二板现价")
                    # 获取暂存的二版现价数据
                    if datas and self.first_tick_datas:
                    if self.first_tick_datas:
                        datas.extend(self.first_tick_datas)
                    if datas is not None:
                        print("二板现价数量", len(datas))
@@ -546,8 +603,6 @@
                                # 记录数据
                                logger_buy_1_volumn_record.info("{}-{}", code, data)
                            self.latest_buy1_volumn_dict[code] = "{}-{}".format(volumn, price)
                            # 保存买1价格
                            self.buy1_price_manager.save(code, price)
                            # 校正时间
                            time_ = tool.compute_buy1_real_time(time_)
                            # 保存数据
@@ -607,10 +662,51 @@
                        return_str = json.dumps(return_json)
                elif type == 70:
                    # 选股宝热门概念
                    datas = data_process.parse(_str)["data"]
                    data_json = data_process.parse(_str)
                    day = data_json["day"]
                    datas = data_json["data"]
                    if datas:
                        hot_block_data_process.save_datas(datas)
                        hot_block_data_process.save_datas(day, datas)
                    print(datas)
                elif type == 71:
                    # 根据代码获取选股宝热门概念
                    day = tool.get_now_date_str()
                    code = data_process.parse(_str)["data"]["code"]
                    todays = hot_block_data_process.XGBHotBlockDataManager.list_by_code(code, day)
                    today_datas = []
                    if todays:
                        for data in todays:
                            block = data[2]
                            block_datas = hot_block_data_process.XGBHotBlockDataManager.list_by_block(block, day)
                            block_datas = list(block_datas)
                            # 根据涨停时间排序
                            block_datas.sort(key=lambda d: (d[4] if len(d[4]) > 6 else '15:00:00'))
                            for i in range(len(block_datas)):
                                if block_datas[i][3] == code:
                                    today_datas.append(
                                        {"block_name": block, "block_size": len(block_datas), "index": i,
                                         "price": block_datas[i][5], "rate": block_datas[i][6]})
                                    break
                    # 获取前一个交易日
                    last_day = juejin.JueJinManager.get_previous_trading_date(day)
                    lasts = hot_block_data_process.XGBHotBlockDataManager.list_by_code(code, last_day)
                    last_datas = []
                    if todays:
                        for data in lasts:
                            block = data[2]
                            block_datas = hot_block_data_process.XGBHotBlockDataManager.list_by_block(block, last_day)
                            block_datas = list(block_datas)
                            # 根据涨停时间排序
                            block_datas.sort(key=lambda d: (d[4] if len(d[4]) > 6 else '15:00:00'))
                            for i in range(len(block_datas)):
                                if block_datas[i][3] == code:
                                    last_datas.append(
                                        {"block_name": block, "block_size": len(block_datas), "index": i,
                                         "price": block_datas[i][5], "rate": block_datas[i][6]})
                                    break
                    final_data = {'code': code, 'today': today_datas, 'last_day': last_datas}
                    return_str = json.dumps({"code": 0, "data": final_data})
                    pass
                elif type == 201:
                    # 加入黑名单
                    data = json.loads(_str)
@@ -621,7 +717,7 @@
                        if not name:
                            results = juejin.JueJinManager.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code,results[code])
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                    return_str = json.dumps({"code": 0})
                elif type == 202:
@@ -661,6 +757,30 @@
                elif type == 302:
                    # 黑名单列表
                    codes = l2_trade_util.WhiteListCodeManager.list_codes()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                elif type == 401:
                    # 加入想要买
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        gpcode_manager.WantBuyCodesManager.add_code(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = juejin.JueJinManager.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif type == 402:
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        gpcode_manager.WantBuyCodesManager.remove_code(code)
                    return_str = json.dumps({"code": 0})
                elif type == 403:
                    codes = gpcode_manager.WantBuyCodesManager.list_code()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
@@ -743,11 +863,27 @@
    result = json.loads(result)
    if result["code"] != 0:
        raise Exception(result["msg"])
    else:
        # 测速成功
        client_infos = []
        for index in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
            client_infos.append((client, index))
        l2_listen_pos_health_manager.init_all(client_infos)
if __name__ == "__main__":
    listen_codes = gpcode_manager.get_listen_codes()
    for lc in listen_codes:
        if not gpcode_manager.is_in_gp_pool(lc):
            # 移除代码
            l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")
    codes = ["601698"]
    for code in codes:
        volumes_data = juejin.get_volumns_by_code(code, 150)
        volumes_data = volumes_data[1:]
        global_data_loader.load_zyltgb()
        limit_up_price = float(gpcode_manager.get_limit_up_price(code))
        # 判断股性
        # is_k_format, msg = code_nature_analyse.is_has_k_format(float(limit_up_price), volumes_data)
        # print(code, is_k_format, msg)
        code_nature_analyse.set_record_datas(code,
                                             limit_up_price,
                                             volumes_data)
        print(code_nature_analyse.get_k_format(float(limit_up_price), volumes_data))