Administrator
2023-06-09 6b77c1709908133c040778f5f775432c4ce7efd7
优化L2卡位分配/增加想买单第一时间加入L2卡位
1 文件已重命名
12个文件已修改
1个文件已添加
603 ■■■■ 已修改文件
data_export_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
gpcode_manager.py 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 50 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_analyse.py 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/code_info_output.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/kp_client_msg_manager.py 92 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 207 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/data_server.py 94 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths/l2_code_operate.py 补丁 | 查看 | 原始文档 | blame | 历史
trade/current_price_process_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py
@@ -207,4 +207,4 @@
if __name__ == "__main__":
    export_l2_excel("001322")
    export_l2_excel("603999")
gpcode_manager.py
@@ -173,6 +173,39 @@
    @classmethod
    def add_code(cls, code):
        cls.__get_redis().sadd(cls.__redis_key, code)
        cls.__get_redis().expire(cls.__redis_key, tool.get_expire())
    @classmethod
    def remove_code(cls, code):
        cls.__get_redis().srem(cls.__redis_key, code)
    @classmethod
    def is_in(cls, code):
        return cls.__get_redis().sismember(cls.__redis_key, code)
    @classmethod
    def list_code(cls):
        return cls.__get_redis().smembers(cls.__redis_key)
# 暂停下单代码管理
# 与黑名单的区别是暂停交易代码只是不交易,不能移除L2监控位
class PauseBuyCodesManager:
    redisManager = redis_manager.RedisManager(0)
    __redis_key = "pause_buy_codes"
    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()
    @classmethod
    def clear(cls):
        cls.__get_redis().delete(cls.__redis_key)
    @classmethod
    def add_code(cls, code):
        cls.__get_redis().sadd(cls.__redis_key, code)
        cls.__get_redis().expire(cls.__redis_key, tool.get_expire())
    @classmethod
    def remove_code(cls, code):
@@ -289,11 +322,21 @@
    return code
# 代码名字缓存
__code_name_dict = {}
# 获取代码的名称
def get_code_name(code):
    """Return the display name for ``code``, using the module-level name cache.

    Lookup order:
      1. the in-memory ``__code_name_dict`` cache;
      2. ``CodesNameManager.get_second_code_name`` (any non-None result is
         cached and returned);
      3. ``CodesNameManager.get_first_code_name`` (cached only when truthy).

    Returns whatever the last lookup produced, which may be falsy when no
    name is known.
    """
    try:
        return __code_name_dict[code]
    except KeyError:
        pass

    second_name = CodesNameManager.get_second_code_name(code)
    if second_name is not None:
        __code_name_dict[code] = second_name
        return second_name

    first_name = CodesNameManager.get_first_code_name(code)
    if first_name:
        # Only remember real names so a later call can retry the lookup
        __code_name_dict[code] = first_name
    return first_name
@@ -541,9 +584,11 @@
    if available_positions:
        # 获取健康状态
        available_positions_health_states = l2_listen_pos_health_manager.list_health_state(available_positions)
        available_positions.sort(key=lambda x: available_positions_health_states[x], reverse=True)
        # 尽量不分配第一个位置
        available_positions_new = sorted(available_positions, key=lambda x: (available_positions_health_states[x], 0 if x[1] ==0 else 1), reverse=True)
        # available_positions.sort(key=lambda x: available_positions_health_states[x], reverse=True)
        # 取第1个数据
        return available_positions[0][0], available_positions[0][1]
        return available_positions_new[0][0], available_positions_new[0][1]
    return None, None
@@ -636,4 +681,4 @@
if __name__ == '__main__':
    set_price_pre("601858", 27.67)
    get_can_listen_pos()
gui.py
@@ -9,7 +9,6 @@
import win32gui
import constant
import data_export_util
import multiprocessing
@@ -18,8 +17,7 @@
import server
import settings
from juejin import JueJinManager
from l2_code_operate import L2CodeOperate
from trade import l2_trade_util, trade_juejin
from ths.l2_code_operate import L2CodeOperate
from trade.l2_trade_factor import L2TradeFactorUtil
from ocr import ocr_server
from third_data import data_server, kpl_data_manager, kpl_util
juejin.py
@@ -88,6 +88,8 @@
        gpcode_manager.WantBuyCodesManager.clear()
        # 清空分数禁止代码
        trade_manager.ForbiddenBuyCodeByScoreManager.clear()
        # 清空暂停交易代码
        gpcode_manager.PauseBuyCodesManager.clear()
# 每日初始化
@@ -135,7 +137,6 @@
def init(context):
    # gmapi.subscribe(symbols="SZSE.002529", frequency="1d", count=30)
    # 订阅浦发银行, bar频率为一天和一分钟
    # 订阅订阅多个频率的数据,可多次调用subscribe
    # 获取需要监听的股票
@@ -148,11 +149,8 @@
    t1.start()
    # 多个时间点获取收盘价
    gmapi.schedule(schedule_func=__get_latest_info, date_rule='1d', time_rule='08:30:00')
    gmapi.schedule(schedule_func=__get_latest_info, date_rule='1d', time_rule='08:50:00')
    gmapi.schedule(schedule_func=__get_latest_info, date_rule='1d', time_rule='09:28:00')
    gmapi.schedule(schedule_func=__get_current_info, date_rule='1d', time_rule='09:25:00')
    gmapi.schedule(schedule_func=__get_current_info, date_rule='1d', time_rule='09:29:00')
    re_subscribe_tick()
    # re_subscribe_bar()
@@ -246,6 +244,7 @@
        # print(tick["created_at"],tick["quotes"][0]["bid_v"])
        __prices_now[symbol] = price
def on_bar(context, bars):
    print("on_bar", bars)
@@ -293,13 +292,13 @@
        t1.start()
    @classmethod
    def get_gp_latest_info(cls, codes,fields=None):
    def get_gp_latest_info(cls, codes, fields=None):
        if not codes:
            return []
        account_id, s_id, token = getAccountInfo()
        symbols = gpcode_manager.get_gp_list_with_prefix(codes)
        gmapi.set_token(token)
        data = gmapi.get_instruments(symbols=",".join(symbols),fields=fields)
        data = gmapi.get_instruments(symbols=",".join(symbols), fields=fields)
        print(data)
        return data
@@ -466,7 +465,9 @@
                max_volume = item["volume"]
                max_volume_date = item["bob"]
    else:
        date = None
        target_volume = None
        for i in range(len(datas)):
            # 查询涨停
            item = datas[i]
@@ -477,6 +478,7 @@
            # 是否有涨停
            limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
            if abs(limit_up_price - item["high"]) < 0.01:
                # 涨停
                next_volume = 0
                if i > 0:
                    next_volume = datas[i - 1]["volume"]
@@ -484,9 +486,37 @@
                if volume < next_volume:
                    volume = next_volume
                    date = datas[i - 1]["bob"]
                return volume, volume, date.strftime("%Y-%m-%d")
    return max_volume, max_volume, max_volume_date.strftime("%Y-%m-%d")
                target_volume = (volume, date)
        if not target_volume:
            target_volume = (max_volume, max_volume_date)
        # --判断近60天无涨停的最大量
        max_60_volume_info = [0, None]
        # 60天内是否有涨停
        has_60_limit_up = False
        for i in range(60):
            if i >= len(datas):
                break
            item = datas[i]
            volume = item["volume"]
            if max_60_volume_info[0] < volume:
                max_60_volume_info = [volume, item["bob"]]
            limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
            if abs(limit_up_price - item["high"]) < 0.01:
                has_60_limit_up = True
                break
        if not has_60_limit_up and target_volume[0] > max_60_volume_info[0] * 3:
            # 60天内无涨停,且60天内最大量小于最大量的1/3,判断为地量,返回近60个交易日的最大量
            return max_60_volume_info[0], max_60_volume_info[0], max_60_volume_info[1].strftime("%Y-%m-%d")
        else:
            return target_volume[0], target_volume[0], target_volume[1].strftime("%Y-%m-%d")
if __name__ == '__main__':
    init_data()
    # init_data()
    code = "600278"
    volumes_data = get_volumns_by_code(code, 150)
    volumes = parse_max_volume(volumes_data[1:91])
    print(volumes)
l2/l2_data_manager_new.py
@@ -32,6 +32,8 @@
import dask
from trade.trade_manager import TradeTargetCodeModeManager
class L2DataManager:
    # 格式化数据
@@ -673,21 +675,24 @@
        if not trade_manager.TradeStateManager.is_can_buy():
            return False, True, f"今日已禁止交易"
        if gpcode_manager.PauseBuyCodesManager.is_in(code):
            return False, True, f"该代码被暂停交易"
        # 判断买1价格档位
        zyltgb = global_util.zyltgb_map.get(code)
        if zyltgb is None:
            global_data_loader.load_zyltgb()
            zyltgb = global_util.zyltgb_map.get(code)
        # buy1_price = code_price_manager.Buy1PriceManager.get_buy1_price(code)
        # if buy1_price is None:
        #     return False, True, f"尚未获取到买1价"
        buy1_price = code_price_manager.Buy1PriceManager.get_buy1_price(code)
        if buy1_price is None:
            return False, True, f"尚未获取到买1价"
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        # dif = float(limit_up_price) - float(buy1_price)
        # if zyltgb >= 100 * 100000000:
        #     # 大于2档
        #     if dif > 0.01001:
        #         return False, True, f"买1剩余档数大于1档,买一({buy1_price})涨停({limit_up_price})"
        dif = float(limit_up_price) - float(buy1_price)
        if zyltgb >= 200 * 100000000:
            # 大于10档
            if dif > 0.10001:
                return False, True, f"自由流通200亿以上,买1剩余档数大于10档,买一({buy1_price})涨停({limit_up_price})"
        # elif zyltgb >= 80 * 100000000:
        #     # 大于2档
        #     if dif > 0.02001:
@@ -733,6 +738,8 @@
                                      cls.__l2PlaceOrderParamsManagerDict[code].score_info)
        if not gpcode_manager.WantBuyCodesManager.is_in(code):
            if TradeTargetCodeModeManager.get_mode() == TradeTargetCodeModeManager.MODE_ONLY_BUY_WANT_CODES:
                return False, True, f"只买想买单中的代码"
            score_index = cls.__l2PlaceOrderParamsManagerDict[code].score_index
            score = cls.__l2PlaceOrderParamsManagerDict[code].score
            score_info = cls.__l2PlaceOrderParamsManagerDict[code].score_info
log.py
@@ -158,6 +158,11 @@
                   filter=lambda record: record["extra"].get("name") == "kpl_block_can_buy",
                   rotation="00:00", compression="zip", enqueue=True)
        # 看盘日志
        logger.add(self.get_path("kp", "kp_msg"),
                   filter=lambda record: record["extra"].get("name") == "kp_msg",
                   rotation="00:00", compression="zip", enqueue=True)
    def get_path(self, dir_name, log_name):
        return "D:/logs/gp/{}/{}".format(dir_name, log_name) + ".{time:YYYY-MM-DD}.log"
@@ -218,6 +223,9 @@
logger_kpl_debug = __mylogger.get_logger("kpl_debug")
logger_kpl_block_can_buy = __mylogger.get_logger("kpl_block_can_buy")
logger_kp_msg = __mylogger.get_logger("kp_msg")
class LogUtil:
@@ -428,6 +436,21 @@
                latest_info = (target_rate, round(int(cancel_num) / int(total_num), 2), cancel_num, total_num,)
    return latest_info
# 读取看盘消息
def get_kp_msg_list(date=None):
    """Read the watch-client ("kp") message log for one day.

    Args:
        date: Day to read, formatted ``YYYY-MM-DD``. Falsy values mean today.

    Returns:
        The raw lines of ``D:/logs/gp/kp/kp_msg.<date>.log`` in file order
        (line endings are kept), or an empty list when the file is missing.
    """
    day = date if date else datetime.datetime.now().strftime("%Y-%m-%d")
    path_str = f"D:/logs/gp/kp/kp_msg.{day}.log"
    if not os.path.exists(path_str):
        return []
    with open(path_str, mode='r', encoding="utf-8") as f:
        return f.readlines()
def export_logs(code):
    code_name = gpcode_manager.get_code_name(code)
log_analyse.py
@@ -10,16 +10,17 @@
def get_cant_order_reasons_dict():
    """Collect the latest "cannot place order" reason per code from today's L2 trade log.

    Reads ``D:/logs/gp/l2/l2_trade.<today>.log`` once. A missing file yields an
    empty dict instead of raising.

    Returns:
        dict mapping the 6 characters that follow ``code=`` to a
        ``(time, reason)`` tuple, where ``time`` is the first 12 characters of
        the second space-separated token before the first ``|`` and ``reason``
        is the stripped text after the marker. Later lines overwrite earlier
        ones for the same code.
    """
    file_path = "D:/logs/gp/l2/l2_trade.{}.log".format(tool.get_now_date_str())
    dict_ = {}
    if not os.path.exists(file_path):
        return dict_

    marker = "不可以下单,原因:"
    with open(file_path, encoding="utf-8") as f:
        # Iterating the file object advances on every pass, so the loop always terminates
        for line in f:
            if marker not in line:
                continue
            code = line.split("code=")[1][:6]
            time_ = line.split("|")[0].split(" ")[1][:12]
            reason = line.split(marker)[1].strip()
            dict_[code] = (time_, reason)
    return dict_
output/code_info_output.py
@@ -91,6 +91,8 @@
    # 获取白名单,黑名单
    if trade_manager.gpcode_manager.WantBuyCodesManager.is_in(code):
        code_extra_infos.append("想买单")
    if trade_manager.gpcode_manager.PauseBuyCodesManager.is_in(code):
        code_extra_infos.append("暂不买")
    params["code"] = code
    params["code_name"] = f"{gpcode_manager.get_code_name(code)} {code}  ({','.join(code_extra_infos)})"
output/kp_client_msg_manager.py
New file
@@ -0,0 +1,92 @@
"""
看盘端消息管理器
"""
import json
import queue
import threading
import time
import gpcode_manager
import log
from db.redis_manager import RedisManager
from log import logger_kp_msg
CLIENT_IDS = ["zjb", "hxh"]
__temp_msg_queue = queue.Queue()
class MsgQueueManager:
    """Per-client message list stored in Redis under ``kp_msg_queue-<client_id>``."""

    __redisManager = RedisManager(3)

    def __get_redis(self):
        return self.__redisManager.getRedis()

    def add_msg(self, client_id, msg):
        """Push ``msg`` for ``client_id`` together with an expiry timestamp 2s from now."""
        expire_at = time.time() + 2
        payload = json.dumps((expire_at, msg))
        self.__get_redis().lpush(f"kp_msg_queue-{client_id}", payload)

    def read_msg(self, client_id):
        """Pop one stored entry for ``client_id``.

        Returns the decoded ``[expire_time, msg]`` pair, or None when nothing
        is stored.
        """
        # NOTE(review): lpush + lpop reads the most recently added entry first — confirm intended
        raw = self.__get_redis().lpop(f"kp_msg_queue-{client_id}")
        return json.loads(raw) if raw else None
# 运行采集器
__MsgQueueManager = MsgQueueManager()
# 添加消息
def add_msg(code, msg):
    """Queue a watch-client message about ``code`` and record it in the kp_msg log.

    The message is prefixed with the code's name and the code itself before it
    is placed on the in-process queue.
    """
    code_name = gpcode_manager.get_code_name(code)
    text = f"【{code_name}({code})】{msg}"
    __temp_msg_queue.put_nowait(text)
    # Also write the message to the log
    logger_kp_msg.info(text)
def read_msg(client_id):
    """Return the next unexpired message for ``client_id``, or None.

    None is returned both when nothing is stored and when the popped entry's
    expiry timestamp is already in the past.
    """
    entry = __MsgQueueManager.read_msg(client_id)
    if not entry:
        return None
    expire_time, msg = entry[0], entry[1]
    # An expired entry is dropped
    return None if expire_time < time.time() else msg
# 读取本地消息列表
def list_msg_from_local():
    """Return the message lines that ``log.get_kp_msg_list()`` reads with its default date."""
    local_messages = log.get_kp_msg_list()
    return local_messages
# 运行采集
def run_capture():
    """Start the background worker that fans queued messages out to every client.

    The worker blocks on the in-process queue filled by ``add_msg`` and, for
    each message taken, stores one copy per entry of ``CLIENT_IDS`` through the
    Redis-backed manager. It runs as a daemon thread, so it does not keep the
    interpreter alive on exit.
    """
    import logging  # local import so this block does not depend on a file-level import

    def capture():
        while True:
            try:
                msg = __temp_msg_queue.get()
                if msg:
                    for c in CLIENT_IDS:
                        __MsgQueueManager.add_msg(c, msg)
            except Exception as e:
                # Best effort: keep the worker alive, but record the failure
                # instead of silently discarding it.
                logging.exception(e)
            time.sleep(0.01)

    # daemon=True replaces the deprecated Thread.setDaemon(True)
    t1 = threading.Thread(target=capture, daemon=True)
    t1.start()
if __name__ == "__main__":
    # Manual smoke test: queue ten identical messages, then start the worker
    for _ in range(10):
        add_msg("600839", "买入成功")
    run_capture()
    # Wait on stdin so the process stays alive while the daemon worker runs
    input()
server.py
@@ -22,7 +22,6 @@
import gpcode_manager
import authority
import juejin
import limit_up_time_manager
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager
import l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil
@@ -33,17 +32,17 @@
import tool
from output import code_info_output
from third_data import hot_block_data_process, block_info, kpl_api
from third_data.code_plate_key_manager import  CodesHisReasonAndBlocksManager
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager, KPLLimitUpDataRecordManager
from ths import l2_listen_pos_health_manager
from ths import l2_listen_pos_health_manager, l2_code_operate
from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util, deal_big_money_manager, \
    first_code_score_manager, current_price_process_manager, trade_juejin
import l2_code_operate
    current_price_process_manager, trade_juejin
from code_data_util import ZYLTGBUtil
import l2.transaction_progress
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record, logger_debug
from trade.trade_manager import TradeTargetCodeModeManager
from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager
@@ -78,6 +77,8 @@
    last_l2_listen_health_time = {}
    __KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager()
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
    # 在L2监控上采集的现价
    __l2_current_price_data = {}
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -274,7 +275,44 @@
                                code = data["code"]
                                codes.append(code)
                        # 保存未筛选的首板代码
                        # ---查询想买单,如果没有在列表中就需要强行加入列表
                        want_codes = gpcode_manager.WantBuyCodesManager.list_code()
                        if want_codes:
                            # 没有在现价采集中的想买代码
                            diff_codes = set(want_codes) - set(codes)
                            if diff_codes:
                                zyltgb_list = []
                                for code in diff_codes:
                                    # 查询是否在L2现价中
                                    if code in self.__l2_current_price_data:
                                        item = self.__l2_current_price_data.get(code)
                                        codes.append(code)
                                        dataList.append(item)
                                        # 保存自由流通股本
                                        zyltgb_list.append({"code": code, "zyltgb": item["zyltgb"],"zyltgb_unit": item["zyltgbUnit"]})
                                    else:
                                        # 获取涨停价
                                        _limit_up_price = gpcode_manager.get_limit_up_price(code)
                                        if not _limit_up_price:
                                            juejin.re_set_price_pres([code],True)
                                            # 再次获取涨停价
                                            _limit_up_price = gpcode_manager.get_limit_up_price(code)
                                        if _limit_up_price:
                                            # 成功获取到了涨停价,构造虚拟的现价信息
                                            codes.append(code)
                                            dataList.append({"code": code, "price": f"{_limit_up_price}", "volume": "0",
                                                             "volumeUnit": 0, "time": "00:00:00", "zyltgb": "100",
                                                             "zyltgbUnit": 0})
                                # 强制更新自由流通股本
                                if zyltgb_list:
                                    ZYLTGBUtil.save_list(zyltgb_list)
                                    # 将保存的数据更新到内存中
                                    for z in zyltgb_list:
                                        val = ZYLTGBUtil.get(z["code"])
                                        if val:
                                            global_util.zyltgb_map[z["code"]] = val
                        # ---保存未筛选的首板代码
                        new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)
                        # 保存自由流通股本
                        if dataList:
@@ -340,7 +378,8 @@
                                                                     volumes_data)
                        gpcode_manager.FirstCodeManager.add_record(codes)
                        if new_add_codes:
                            gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes,fields="symbol,sec_name,sec_type,sec_level"))
                            gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes,
                                                                                                                fields="symbol,sec_name,sec_type,sec_level"))
                            # 加入首板历史记录
                            logger_first_code_record.info("新增首板:{}", new_add_codes)
@@ -351,29 +390,6 @@
                                if not gpcode_manager.is_in_gp_pool(lc):
                                    # 移除代码
                                    l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")
                        if new_add_codes:
                            # 低分值代码禁止交易
                            for code in new_add_codes:
                                try:
                                    score, score_list = first_code_score_manager.get_score(code, 0, None, False)
                                    if score < 0:
                                        trade_manager.ForbiddenBuyCodeByScoreManager.add_code(code)
                                    # elif score >= 200:
                                    #     # 如果没有涨停过
                                    #     limit_up_time = limit_up_time_manager.get_limit_up_time(code)
                                    #     if limit_up_time is None and int(tool.get_now_time_str().replace(":","")) > int("113000"):
                                    #         gpcode_manager.WantBuyCodesManager.add_code(code)
                                except Exception as e:
                                    logging.exception(e)
                        # if dataList and int(tool.get_now_time_str().replace(":","")) > int("113000"):
                        #     for data in dataList:
                        #         code = data["code"]
                        #         if gpcode_manager.WantBuyCodesManager.is_in(code):
                        #             score, score_list = first_code_score_manager.get_score(code, 0, None, False)
                        #             if score < 200:
                        #                 gpcode_manager.WantBuyCodesManager.remove_code(code)
                        # 保存现价
                        if dataList:
@@ -654,6 +670,14 @@
                            volumnUnit = item["volumeUnit"]
                            code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                        current_price_process_manager.accept_prices(datas)
                # L2现价更新
                elif type == 41:
                    datas = data_process.parse(_str)["data"]
                    if datas:
                        for d in datas:
                            code = d["code"]
                            self.__l2_current_price_data[code] = d
                elif type == 50:
                    data = data_process.parse(_str)["data"]
                    if data is not None:
@@ -765,11 +789,15 @@
                    data = json.loads(_str)
                    code = data["data"]["code"]
                    if code:
                        return_str = json.dumps({"code": 0})
                        try:
                            l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
                        except Exception as e:
                            return_str = json.dumps({"code": 2, "msg": str(e)})
                        state = trade_manager.get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_CANCEL_ING:
                            try:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
                                return_str = json.dumps({"code": 0})
                            except Exception as e:
                                return_str = json.dumps({"code": 2, "msg": str(e)})
                        else:
                            return_str = json.dumps({"code": 1, "msg": "未处于可撤单状态"})
                    else:
                        return_str = json.dumps({"code": 1, "msg": "请上传代码"})
                elif type == 201:
@@ -885,6 +913,70 @@
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                elif type == 411:
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        gpcode_manager.PauseBuyCodesManager.add_code(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = juejin.JueJinManager.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                    return_str = json.dumps({"code": 0})
                    # 加入暂停买入列表
                elif type == 412:
                    # 移除暂停买入列表
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        gpcode_manager.PauseBuyCodesManager.remove_code(code)
                    return_str = json.dumps({"code": 0})
                elif type == 413:
                    # 暂停买入列表
                    codes = gpcode_manager.PauseBuyCodesManager.list_code()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                elif type == 420:
                    # 是否可以撤单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    code = codes[0]
                    state = trade_manager.get_trade_state(code)
                    if state != trade_manager.TRADE_STATE_BUY_CANCEL_SUCCESS and state != trade_manager.TRADE_STATE_BUY_SUCCESS:
                        return_str = json.dumps({"code": 0, "msg": "可以取消"})
                    else:
                        return_str = json.dumps({"code": 1, "msg": "不可以取消"})
                elif type == 430:
                    # 查询代码属性
                    data = json.loads(_str)
                    code = data["data"]["code"]
                    # 查询是否想买单/白名单/黑名单/暂不买
                    code_name = gpcode_manager.get_code_name(code)
                    want = gpcode_manager.WantBuyCodesManager.is_in(code)
                    white = l2_trade_util.WhiteListCodeManager.is_in(code)
                    black = l2_trade_util.is_in_forbidden_trade_codes(code)
                    pause_buy = gpcode_manager.PauseBuyCodesManager.is_in(code)
                    desc_list = []
                    if want:
                        desc_list.append("【想买单】")
                    if white:
                        desc_list.append("【白名单】")
                    if black:
                        desc_list.append("【黑名单】")
                    if pause_buy:
                        desc_list.append("【暂不买】")
                    return_str = json.dumps(
                        {"code": 0, "data": {"code_info": (code, code_name), "desc": "".join(desc_list)}})
                elif type == 501:
                    data = json.loads(_str)
                    is_open = data["data"]["open"]
@@ -896,6 +988,19 @@
                elif type == 502:
                    can_buy = trade_manager.TradeStateManager.is_can_buy()
                    return_str = json.dumps({"code": 0, "data": {"can_buy": can_buy}})
                elif type == 503:
                    # 设置交易目标代码的模式
                    data = json.loads(_str)
                    mode = data["data"]["mode"]
                    try:
                        TradeTargetCodeModeManager.set_mode(mode)
                        return_str = json.dumps({"code": 0, "data": {"mode": mode}})
                    except Exception as e:
                        return_str = json.dumps({"code": 1, "msg": str(e)})
                elif type == 504:
                    # 获取交易目标代码模式
                    mode = TradeTargetCodeModeManager.get_mode()
                    return_str = json.dumps({"code": 0, "data": {"mode": mode}})
                elif type == 601:
                    pass
                    # 加自选
@@ -988,20 +1093,26 @@
if __name__ == "__main__":
    datas =trade_juejin.get_execution_reports()
    # 上传数据
    fdatas = []
    for d in datas:
        fdatas.append(
            {"code": d[0], "money": d[4], "num": d[2], "price": d[3], "time": d[7], "trade_num": d[5],
             "type": d[1] - 1})
    print(fdatas)
    if fdatas:
    # 交易成功无法读取时备用
    while True:
        try:
            trade_manager.process_trade_success_data(fdatas)
        except Exception as e:
            logging.exception(e)
        trade_manager.save_trade_success_data(fdatas)
            datas = trade_juejin.get_execution_reports()
            # 上传数据
            fdatas = []
            for d in datas:
                fdatas.append(
                    {"code": d[0], "money": d[4], "num": d[2], "price": d[3], "time": d[7], "trade_num": d[5],
                     "type": d[1] - 1})
            print(fdatas)
            if fdatas:
                try:
                    trade_manager.process_trade_success_data(fdatas)
                except Exception as e:
                    logging.exception(e)
                trade_manager.save_trade_success_data(fdatas)
        except:
            pass
        time.sleep(1.5)
if __name__ == "__main__1":
    codes = gpcode_manager.get_first_gp_codes()
third_data/data_server.py
@@ -10,10 +10,12 @@
import global_util
import gpcode_manager
import juejin
import log
import log_analyse
import tool
from l2 import code_price_manager
from l2 import code_price_manager, l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer
from output.limit_up_data_filter import IgnoreCodeManager
from third_data import kpl_util, kpl_data_manager, kpl_api
from third_data.code_plate_key_manager import RealTimeKplMarketData, KPLPlateForbiddenManager
@@ -22,7 +24,7 @@
from third_data.kpl_util import KPLDataType, KPLPlatManager
import urllib.parse as urlparse
from urllib.parse import parse_qs
from output import code_info_output, limit_up_data_filter, output_util
from output import code_info_output, limit_up_data_filter, output_util, kp_client_msg_manager
from trade import bidding_money_manager, trade_manager
from trade.l2_trade_util import BlackListCodeManager
@@ -290,10 +292,9 @@
            code = ps_dict['code']
            name = ps_dict.get('name')
            data = code_info_output.get_output_params(code,self.__jingxuan_cache_dict,self.__industry_cache_dict)
            data = code_info_output.get_output_params(code, self.__jingxuan_cache_dict, self.__industry_cache_dict)
            if data["code_name"].find("None") > -1 and name:
                data["code_name"] = f"{name} {code}"
            self.__history_plates_dict[code] = (time.time(), data["kpl_code_info"]["code_records"])
            if "plate" in data["kpl_code_info"]:
@@ -364,13 +365,88 @@
            for d in now_limit_up_codes_info:
                if d[5] != plate:
                    continue
                codes_info.append([d[0],d[1],0])
                codes_info.append([d[0], d[1], 0])
            # 查询是否为想买单
            want_codes = gpcode_manager.WantBuyCodesManager.list_code()
            for code_info in codes_info:
                code_info[2] = 1 if code_info[0] in want_codes else 0
            response_data = json.dumps({"code": 0,"data":codes_info})
            response_data = json.dumps({"code": 0, "data": codes_info})
        elif url.path == "/get_h_cancel_data":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict["code"]
            if code:
                total_datas = l2_data_util.local_today_datas.get(code)
                if total_datas is None:
                    l2_data_util.load_l2_data(code)
                    total_datas = l2_data_util.local_today_datas.get(code)
                trade_state = trade_manager.get_trade_state(code)
                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED or trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer.get_watch_index_dict(code)
                    # 根据日志读取实时的计算数据
                    h_cancel_latest_compute_info = log.get_h_cancel_compute_info(code)
                    if hcancel_datas_dict:
                        temp_list = [(k, hcancel_datas_dict[k][0]) for k in hcancel_datas_dict]
                        canceled_indexs = set([int(k.split("-")[0]) for k in cancel_indexes_set])
                        temp_list.sort(key=lambda x: x[0])
                        fdata = {
                            "computed_info": list(
                                h_cancel_latest_compute_info) if h_cancel_latest_compute_info else None,
                            "datas": []}
                        for i in range(0, len(temp_list)):
                            temp = temp_list[i]
                            val = total_datas[temp[0]]["val"]
                            canceled = temp[0] in canceled_indexs
                            fdata["datas"].append(
                                (val["time"], val["num"],
                                 code_info_output.money_desc(val["num"] * float(val["price"]) * 100),
                                 (1 if canceled else 0)))
                        response_data = json.dumps({"code": 0, "data": fdata})
                    else:
                        response_data = json.dumps({"code": 1, "msg": "无H撤数据"})
                else:
                    response_data = json.dumps({"code": 1, "msg": "无H撤数据"})
            else:
                response_data = json.dumps({"code": 1, "msg": "请上传code"})
        elif url.path == "/get_last_trade_day_reasons":
            # 获取上个交易日的相同涨停原因的代码信息
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict["code"]
            day = juejin.JueJinManager.get_previous_trading_date(tool.get_now_date_str())
            # 获取涨停数据
            # 获取代码的原因
            reasons = kpl_data_manager.KPLLimitUpDataRecordManager.list_by_code(code, day)
            if reasons:
                reasons = list(reasons)
                reasons.sort(key=lambda x: x[9])
                reason = reasons[-1][2]
                datas = self.__kplDataManager.get_from_file(kpl_util.KPLDataType.LIMIT_UP, day)
                # (代码,名称,首次涨停时间,最近涨停时间,几板,涨停原因,板块,实际流通,主力净额,涨停原因代码,涨停原因代码数量)
                result_list = []
                for d in datas:
                    if d[5] == reason and d[0] != code:
                        # (代码,名称)
                        result_list.append((d[0], d[1]))
                response_data = json.dumps({"code": 0, "data": {"reason": reason, "data": result_list}})
            else:
                response_data = json.dumps({"code": 1, "msg": "昨日未涨停"})
        elif url.path == "/pull_kp_client_msg":
            # 拉取客户端消息
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            client = ps_dict["client"]
            msg = kp_client_msg_manager.read_msg(client)
            if msg:
                response_data = json.dumps({"code": 0, "data": msg})
            else:
                response_data = json.dumps({"code": 1, "msg": "暂无消息"})
        elif url.path == "/list_kp_client_msg":
            msg_list = kp_client_msg_manager.list_msg_from_local()
            msg_list.reverse()
            msg_list = [f"{msg.split('|')[0]}{msg.split('|')[-1].split('-')[1].strip()}" for msg in msg_list]
            response_data = json.dumps({"code": 0, "data": msg_list})
        self.send_response(200)
        # 发给请求客户端的响应数据
@@ -471,13 +547,17 @@
        params = json.loads(_str)
        return params
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTPServer combined with socketserver.ThreadingMixIn.

    Adds no behaviour of its own; everything comes from the two base classes.
    """
def run(addr, port):
    """Start the KP client message capture, then serve HTTP requests forever.

    :param addr: address/host name the HTTP server binds to
    :param port: TCP port the HTTP server listens on
    """
    # Start collecting the watch-client (kp client) messages before serving.
    kp_client_msg_manager.run_capture()
    handler = DataServer
    # Build the server exactly once. The previous text constructed it twice
    # in a row, so the second construction tried to bind an address that the
    # first instance was still holding.
    # ThreadedHTTPServer is used instead of a plain socketserver.TCPServer.
    httpd = ThreadedHTTPServer((addr, port), handler)
    print("HTTP server is at: http://%s:%d/" % (addr, port))
    httpd.serve_forever()
ths/l2_code_operate.py
trade/current_price_process_manager.py
@@ -9,7 +9,7 @@
import constant
import gpcode_manager
import tool
from l2_code_operate import L2CodeOperate
from ths.l2_code_operate import L2CodeOperate
from trade import trade_manager, trade_gui, l2_trade_util
from trade.trade_data_manager import CodeActualPriceProcessor
trade/trade_manager.py
@@ -9,6 +9,7 @@
import constant
from db import mysql_data, redis_manager
from output import kp_client_msg_manager
from trade import trade_data_manager, l2_trade_util, trade_juejin
import trade.trade_gui
import time as t
@@ -66,6 +67,36 @@
            return True
        else:
            return False
class TradeTargetCodeModeManager:
    """Trade target code mode: which codes are allowed to be bought.

    The current mode is stored under the "trade_buy_mode" key through the
    client returned by ``redisManager.getRedis()``.
    """

    # Only buy codes that are on the want-buy list.
    MODE_ONLY_BUY_WANT_CODES = 1
    # Buy all codes.
    MODE_BUY_ALL = 0

    redisManager = redis_manager.RedisManager(2)

    @classmethod
    def __get_redis(cls):
        """Return the client object handed out by ``redisManager``."""
        client = cls.redisManager.getRedis()
        return client

    @classmethod
    def set_mode(cls, mode):
        """Store the trade target mode.

        :param mode: MODE_ONLY_BUY_WANT_CODES or MODE_BUY_ALL
        :raises Exception: if ``mode`` is neither of the two constants
        """
        allowed_modes = (cls.MODE_ONLY_BUY_WANT_CODES, cls.MODE_BUY_ALL)
        if mode not in allowed_modes:
            raise Exception("mode参数值错误")
        client = cls.__get_redis()
        # The expiry comes from tool.get_expire().
        expire = tool.get_expire()
        client.setex("trade_buy_mode", expire, mode)

    @classmethod
    def get_mode(cls):
        """Return the stored mode as an int, or MODE_BUY_ALL when none is stored."""
        stored = cls.__get_redis().get("trade_buy_mode")
        return cls.MODE_BUY_ALL if stored is None else int(stored)
# 根据分数禁止买的票管理
@@ -341,6 +372,7 @@
    print("买入结束")
    logger_trade.info("{}买入成功".format(code))
    kp_client_msg_manager.add_msg(code, "下单成功")
# 下单失败
@@ -406,6 +438,7 @@
    # 下单成功,加入固定代码库
    l2_data_manager.remove_from_l2_fixed_codes(code)
    logger_trade.info("{}撤单成功".format(code))
    kp_client_msg_manager.add_msg(code, "撤单成功")
# 处理交易成功数据
@@ -424,6 +457,7 @@
            if state != TRADE_STATE_BUY_SUCCESS:
                set_trade_state(code, TRADE_STATE_BUY_SUCCESS)
                # 删除买撤记录的临时信息
                kp_client_msg_manager.add_msg(code, "买入成交")
                l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
                l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
                l2_data_manager.TradePointManager.delete_buy_point(code)