Administrator
2023-08-08 c20c3c10635ce78db4a86ce9c0bb1d02e90f525d
server.py
@@ -10,19 +10,19 @@
import threading
import time
from utils import alert_util, data_process, global_util, ths_industry_util, tool, import_util
from utils import alert_util, data_process, global_util, ths_industry_util, tool, import_util, socket_util
from code_attribute import code_volumn_manager, code_nature_analyse, global_data_loader, gpcode_manager, \
    gpcode_first_screen_manager
    gpcode_first_screen_manager, first_target_code_data_processor
import constant
from user import authority
import inited_data
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager
import l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, LCancelBigNumComputer
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer
import l2.l2_data_util
from output import code_info_output
from third_data import  block_info, kpl_api
from third_data import block_info, kpl_api
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager, KPLLimitUpDataRecordManager
@@ -43,8 +43,8 @@
class MyTCPServer(socketserver.TCPServer):
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_juejin=None, pipe_ui=None):
        self.pipe_juejin = pipe_juejin  # 增加的参数
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_trade=None, pipe_ui=None):
        self.pipe_trade = pipe_trade  # 增加的参数
        self.pipe_ui = pipe_ui
        # 初始化数据
        block_info.init()
@@ -81,8 +81,12 @@
    def setup(self):
        """Run the parent class's setup, then store the L2CodeOperate instance on this handler.

        After this call ``self.l2CodeOperate`` holds whatever
        ``l2_code_operate.L2CodeOperate.get_instance()`` returned.
        """
        super().setup()  # Optional: per the original note, the parent's setup() does nothing
        # print("----setup() executed-----")
        # print("Passed-in argument:", self.server.pipe)
        # print("Passed-in argument:", self.server.pipe_trade)
        self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance()
    def __notify_trade(self, type_):
        if self.server.pipe_trade:
            self.server.pipe_trade.send(json.dumps({"type": type_}))
    def handle(self):
        host = self.client_address[0]
@@ -108,7 +112,6 @@
                try:
                    # 如果带有头
                    if _str.startswith("##"):
                        total_length = int(_str[2:10])
                        _str = _str[10:]
                        # 防止socket数据发生粘连
@@ -270,203 +273,21 @@
                        # if limit_up_time_manager.get_limit_up_time(d["code"]) is None:
                        #     limit_up_time_manager.save_limit_up_time(d["code"], d["time"])
                elif type == 22:
                    print("---接受到首板代码")
                    try:
                        if int(tool.get_now_time_str().replace(":", "")) < int("092500"):
                            raise Exception('未到接受时间')
                        # 首板代码
                        dataList, is_add = data_process.parseGPCode(_str)
                        limit_up_price_dict = {}
                        temp_codes = []
                        codes = []
                        tick_datas = []
                        if dataList:
                            for data in dataList:
                                code = data["code"]
                                codes.append(code)
                        # ---查询想买单,如果没有在列表中就需要强行加入列表
                        want_codes = gpcode_manager.WantBuyCodesManager.list_code()
                        if want_codes:
                            # 没有在现价采集中的想买代码
                            diff_codes = set(want_codes) - set(codes)
                            if diff_codes:
                                zyltgb_list = []
                                for code in diff_codes:
                                    # 查询是否在L2现价中
                                    if code in self.__l2_current_price_data:
                                        item = self.__l2_current_price_data.get(code)
                                        codes.append(code)
                                        dataList.append(item)
                                        # 保存自由流通股本
                                        zyltgb_list.append(
                                            {"code": code, "zyltgb": item["zyltgb"], "zyltgb_unit": item["zyltgbUnit"]})
                                    else:
                                        # 获取涨停价
                                        _limit_up_price = gpcode_manager.get_limit_up_price(code)
                                        if not _limit_up_price:
                                            inited_data.re_set_price_pres([code], True)
                                            # 再次获取涨停价
                                            _limit_up_price = gpcode_manager.get_limit_up_price(code)
                                        if _limit_up_price:
                                            # 成功获取到了涨停价,构造虚拟的现价信息
                                            codes.append(code)
                                            dataList.append({"code": code, "price": f"{_limit_up_price}", "volume": "0",
                                                             "volumeUnit": 0, "time": "00:00:00", "zyltgb": "100",
                                                             "zyltgbUnit": 0})
                                # 强制更新自由流通股本
                                if zyltgb_list:
                                    ZYLTGBUtil.save_list(zyltgb_list)
                                    # 将保存的数据更新到内存中
                                    for z in zyltgb_list:
                                        val = ZYLTGBUtil.get(z["code"])
                                        if val:
                                            global_util.zyltgb_map[z["code"]] = val
                        # ---保存未筛选的首板代码
                        new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)
                        # 保存自由流通股本
                        if dataList:
                            zyltgb_list = []
                            for data in dataList:
                                code = data["code"]
                                if code in global_util.zyltgb_map:
                                    continue
                                zyltgb_list.append(
                                    {"code": code, "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgbUnit"]})
                            if zyltgb_list:
                                ZYLTGBUtil.save_list(zyltgb_list)
                                global_data_loader.load_zyltgb()
                        bad_codes = set()
                        # 获取昨日收盘价
                        for code in codes:
                            # 如果涨停价是空值就需要设置昨日收盘价格
                            if gpcode_manager.get_limit_up_price(code) is None:
                                inited_data.re_set_price_pres([code], True)
                        # 板块关键字准备
                        for code in codes:
                            if not self.__CodesPlateKeysManager.get_history_limit_up_reason(code) is None:
                                self.__CodesPlateKeysManager.set_history_limit_up_reason(code,
                                                                                         KPLLimitUpDataRecordManager.get_latest_blocks_set(
                                                                                             code))
                            if self.__CodesPlateKeysManager.get_blocks(code) is None:
                                try:
                                    results = kpl_api.getStockIDPlate(code)
                                    bs = [r[1] for r in results]
                                    self.__CodesPlateKeysManager.set_blocks(code, bs)
                                except Exception as e:
                                    logging.exception(e)
                                    pass
                        # 获取60天最大记录
                        for code in codes:
                            need_get_volumn = False
                            if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None:
                                need_get_volumn = True
                            if not need_get_volumn and code_nature_analyse.CodeNatureRecordManager.get_nature(
                                    code) is None:
                                need_get_volumn = True
                            if need_get_volumn:
                                volumes_data = inited_data.get_volumns_by_code(code, 150)
                                volumes = inited_data.parse_max_volume(volumes_data[:90],
                                                                       code_nature_analyse.is_new_top(
                                                                           gpcode_manager.get_limit_up_price(code),
                                                                           volumes_data[:90]))
                                logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
                                code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2])
                                # 判断K线形态
                                is_has_k_format, msg = code_nature_analyse.is_has_k_format(
                                    gpcode_manager.get_limit_up_price(code), volumes_data)
                                if not is_has_k_format:
                                    logger_first_code_record.info("{}首板K线形态不好,{}", code, msg)
                                    # 股性不好,就不要加入
                                    bad_codes.add(code)
                                    # 加入禁止交易代码
                                    l2_trade_util.forbidden_trade(code)
                                code_nature_analyse.set_record_datas(code,
                                                                     gpcode_manager.get_limit_up_price(code),
                                                                     volumes_data)
                        gpcode_manager.FirstCodeManager.add_record(codes)
                        # 初始化板块信息
                        for code in codes:
                            block_info.init_code(code)
                        if new_add_codes:
                            gpcode_manager.set_first_gp_codes_with_data(HistoryKDatasUtils.get_gp_latest_info(codes,
                                                                                                              fields="symbol,sec_name,sec_type,sec_level"))
                            # 加入首板历史记录
                            logger_first_code_record.info("新增首板:{}", new_add_codes)
                            # 移除代码
                            listen_codes = gpcode_manager.get_listen_codes()
                            for lc in listen_codes:
                                if not gpcode_manager.is_in_gp_pool(lc):
                                    # 移除代码
                                    l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")
                        # 保存现价
                        if dataList:
                            for data in dataList:
                                code = data["code"]
                                codes.append(code)
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price is not None:
                                    limit_up_price_dict[code] = limit_up_price
                                else:
                                    temp_codes.append(code)
                                tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"],
                                                   "volumeUnit": data["volumeUnit"]})
                        # 获取涨停价
                        if temp_codes:
                            # 获取涨停价
                            inited_data.re_set_price_pres(temp_codes)
                            # 重新获取涨停价
                            for code in temp_codes:
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price is not None:
                                    limit_up_price_dict[code] = limit_up_price
                        tick_datas = first_target_code_data_processor.process_first_codes_datas(dataList)
                        # 保存现价
                        self.first_tick_datas.clear()
                        self.first_tick_datas.extend(tick_datas)
                        # 首板数据加工
                        prices = []
                        for data in dataList:
                            code = data["code"]
                            price = data["price"]
                            limit_up_time = data["time"]
                            if limit_up_time == "00:00:00":
                                limit_up_time = None
                            if code not in limit_up_price_dict:
                                continue
                            is_limit_up = abs(float(limit_up_price_dict[code]) - float(price)) < 0.01
                            # 纠正数据
                            if is_limit_up and limit_up_time is None:
                                limit_up_time = tool.get_now_time_str()
                            if is_limit_up:
                                # 加入首板涨停
                                gpcode_manager.FirstCodeManager.add_limited_up_record([code])
                            pricePre = gpcode_manager.get_price_pre(code)
                            if pricePre is None:
                                inited_data.re_set_price_pres([code])
                            rate = round((float(price) - pricePre) * 100 / pricePre, 1)
                            prices.append(
                                {"code": code, "time": limit_up_time, "rate": rate,
                                 "limit_up": is_limit_up})
                            if code in new_add_codes:
                                if is_limit_up:
                                    place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(
                                        code)
                                    if place_order_count == 0:
                                        trade_data_manager.placeordercountmanager.place_order(code)
                        gpcode_first_screen_manager.process_ticks(prices)
                    except Exception as e:
                        logging.exception(e)
                    finally:
                        print("首板代码处理完毕:")
                        return_str = socket_util.load_header(json.dumps({"code": 0}).encode("utf-8")).decode("utf-8")
                elif type == 3:
                    # 交易成功信息
@@ -493,7 +314,7 @@
                                apply_time = item["apply_time"]
                                if apply_time and len(apply_time) >= 8:
                                    code = item["code"]
                                    trade_state = trade_manager.get_trade_state(code)
                                    trade_state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                                    # 设置下单状态的代码为已委托
                                    if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                                        origin_apply_time = apply_time
@@ -540,7 +361,7 @@
                    client = datas["client"]
                    money = datas["money"]
                    # TODO存入缓存文件
                    trade_manager.set_available_money(client, money)
                    trade_manager.AccountAvailableMoneyManager().set_available_money(client, money)
                # l2交易队列
                elif type == 10:
                    # 可用金额
@@ -568,8 +389,8 @@
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        if limit_up_price is not None:
                            code_price_manager.Buy1PriceManager.process(code, buy_one_price, buy_time, limit_up_price,
                                                                        sell_one_price, sell_one_volumn)
                            code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_time, limit_up_price,
                                                                          sell_one_price, sell_one_volumn)
                            _start_time = time.time()
                            msg += "买1价格处理:" + f"{_start_time - __start_time} "
@@ -587,7 +408,7 @@
                                        decimal.Decimal("0.00"))
                                    # 获取执行位时间
                                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data(
                                        code)
                                    if True:
                                        # 只有下单过后才获取交易进度
@@ -604,15 +425,16 @@
                                                                                                     buy_queue_result_list,
                                                                                                     exec_time)
                                        if buy_progress_index is not None:
                                            HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index,
                                                                                        buy_progress_index,
                                                                                        l2.l2_data_util.local_today_datas.get(
                                                                                            code),
                                                                                        l2.l2_data_util.local_today_num_operate_map.get(
                                                                                            code))
                                            LCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                                                     l2.l2_data_util.local_today_datas.get(
                                                                                         code))
                                            HourCancelBigNumComputer().set_trade_progress(code, buy_time,
                                                                                          buy_exec_index,
                                                                                          buy_progress_index,
                                                                                          l2.l2_data_util.local_today_datas.get(
                                                                                              code),
                                                                                          l2.l2_data_util.local_today_num_operate_map.get(
                                                                                              code))
                                            LCancelBigNumComputer().set_trade_progress(code, buy_progress_index,
                                                                                       l2.l2_data_util.local_today_datas.get(
                                                                                           code))
                                            logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                                           buy_progress_index,
@@ -724,9 +546,9 @@
                                                                                               price)
                            # if need_cancel:
                            #    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                            # if need_sync:
                            #     # 同步数据
                            #     L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息
                    data = data_process.parse(_str)["data"]
@@ -813,7 +635,7 @@
                    data = json.loads(_str)
                    code = data["data"]["code"]
                    if code:
                        state = trade_manager.get_trade_state(code)
                        state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_CANCEL_ING:
                            try:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
@@ -847,6 +669,7 @@
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("black_list")
                elif type == 202:
                    # 加入白名单
                    data = json.loads(_str)
@@ -854,9 +677,9 @@
                    try:
                        for code in codes:
                            # 自由流通市值>50亿,股价高于30块的不能加白名单
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            if float(limit_up_price) > 30:
                                raise Exception("股价高于30元")
                            # limit_up_price = gpcode_manager.get_limit_up_price(code)
                            # if float(limit_up_price) > 30:
                            #     raise Exception("股价高于30元")
                            # zyltgb = global_util.zyltgb_map.get(code)
                            # if zyltgb is None:
                            #     global_data_loader.load_zyltgb()
@@ -864,7 +687,7 @@
                            # if zyltgb > 50 * 100000000:
                            #     raise Exception("自由流通股本大于50亿")
                            l2_trade_util.WhiteListCodeManager.add_code(code)
                            l2_trade_util.WhiteListCodeManager().add_code(code)
                            name = gpcode_manager.get_code_name(code)
                            if not name:
                                results = HistoryKDatasUtils.get_gp_codes_names([code])
@@ -873,6 +696,7 @@
                        return_str = json.dumps({"code": 0})
                    except Exception as e:
                        return_str = json.dumps({"code": 1, "msg": str(e)})
                    self.__notify_trade("white_list")
                elif type == 203:
                    # 移除黑名单
@@ -881,16 +705,18 @@
                    for code in codes:
                        l2_trade_util.remove_from_forbidden_trade_codes(code)
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("black_list")
                elif type == 204:
                    # 移除白名单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.WhiteListCodeManager.remove_code(code)
                        l2_trade_util.WhiteListCodeManager().remove_code(code)
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("white_list")
                elif type == 301:
                    # 黑名单列表
                    codes = l2_trade_util.BlackListCodeManager.list_codes()
                    codes = l2_trade_util.BlackListCodeManager().list_codes()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
@@ -898,18 +724,19 @@
                    return_str = json.dumps({"code": 0, "data": datas})
                elif type == 302:
                    # Whitelist listing (this branch reads WhiteListCodeManager)
                    codes = l2_trade_util.WhiteListCodeManager.list_codes()
                    codes = l2_trade_util.WhiteListCodeManager().list_codes()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                elif type == 401:
                    # 加入想要买
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        gpcode_manager.WantBuyCodesManager.add_code(code)
                        gpcode_manager.WantBuyCodesManager().add_code(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = HistoryKDatasUtils.get_gp_codes_names([code])
@@ -920,12 +747,14 @@
                            self.__KPLCodeLimitUpReasonManager.save_reason(codes[i], data["data"]["plates"][i])
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("want_list")
                elif type == 402:
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        gpcode_manager.WantBuyCodesManager.remove_code(code)
                        gpcode_manager.WantBuyCodesManager().remove_code(code)
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("want_list")
                elif type == 403:
                    plate = None
                    include_codes = set()
@@ -938,7 +767,7 @@
                                if code_map[k] == plate:
                                    include_codes.add(k)
                    codes = gpcode_manager.WantBuyCodesManager.list_code()
                    codes = gpcode_manager.WantBuyCodesManager().list_code_cache()
                    datas = []
                    for code in codes:
                        if plate and plate != '其他' and code not in include_codes:
@@ -951,25 +780,27 @@
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        gpcode_manager.PauseBuyCodesManager.add_code(code)
                        gpcode_manager.PauseBuyCodesManager().add_code(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = HistoryKDatasUtils.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("pause_buy_list")
                    # 加入暂停买入列表
                elif type == 412:
                    # 移除暂停买入列表
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        gpcode_manager.PauseBuyCodesManager.remove_code(code)
                        gpcode_manager.PauseBuyCodesManager().remove_code(code)
                    return_str = json.dumps({"code": 0})
                    self.__notify_trade("pause_buy_list")
                elif type == 413:
                    # 暂停买入列表
                    codes = gpcode_manager.PauseBuyCodesManager.list_code()
                    codes = gpcode_manager.PauseBuyCodesManager().list_code()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
@@ -981,7 +812,7 @@
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    code = codes[0]
                    state = trade_manager.get_trade_state(code)
                    state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                    if state != trade_manager.TRADE_STATE_BUY_CANCEL_SUCCESS and state != trade_manager.TRADE_STATE_BUY_SUCCESS:
                        return_str = json.dumps({"code": 0, "msg": "可以取消"})
                    else:
@@ -993,10 +824,10 @@
                    code = data["data"]["code"]
                    # 查询是否想买单/白名单/黑名单/暂不买
                    code_name = gpcode_manager.get_code_name(code)
                    want = gpcode_manager.WantBuyCodesManager.is_in(code)
                    white = l2_trade_util.WhiteListCodeManager.is_in(code)
                    want = gpcode_manager.WantBuyCodesManager().is_in_cache(code)
                    white = l2_trade_util.WhiteListCodeManager().is_in_cache(code)
                    black = l2_trade_util.is_in_forbidden_trade_codes(code)
                    pause_buy = gpcode_manager.PauseBuyCodesManager.is_in(code)
                    pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code)
                    desc_list = []
                    if want:
@@ -1015,25 +846,27 @@
                    data = json.loads(_str)
                    is_open = data["data"]["open"]
                    if is_open:
                        trade_manager.TradeStateManager.open_buy()
                        trade_manager.TradeStateManager().open_buy()
                    else:
                        trade_manager.TradeStateManager.close_buy()
                        trade_manager.TradeStateManager().close_buy()
                    return_str = json.dumps({"code": 0, "msg": ("开启成功" if is_open else "关闭成功")})
                    self.__notify_trade("trade_state")
                elif type == 502:
                    can_buy = trade_manager.TradeStateManager.is_can_buy()
                    can_buy = trade_manager.TradeStateManager().is_can_buy_cache()
                    return_str = json.dumps({"code": 0, "data": {"can_buy": can_buy}})
                elif type == 503:
                    # 设置交易目标代码的模式
                    data = json.loads(_str)
                    mode = data["data"]["mode"]
                    try:
                        TradeTargetCodeModeManager.set_mode(mode)
                        TradeTargetCodeModeManager().set_mode(mode)
                        return_str = json.dumps({"code": 0, "data": {"mode": mode}})
                    except Exception as e:
                        return_str = json.dumps({"code": 1, "msg": str(e)})
                    self.__notify_trade("trade_mode")
                elif type == 504:
                    # 获取交易目标代码模式
                    mode = TradeTargetCodeModeManager.get_mode()
                    mode = TradeTargetCodeModeManager().get_mode_cache()
                    return_str = json.dumps({"code": 0, "data": {"mode": mode}})
                elif type == 601:
                    pass