Administrator
2023-06-06 751183dcd74207a50834cacc575f0dfccb41658c
交易优化,看盘接口完善
4个文件已添加
20个文件已修改
1449 ■■■■ 已修改文件
constant.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 44 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 99 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_analyse.py 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/code_info_output.py 73 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/output_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 361 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/data_server.py 99 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 52 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_util.py 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/current_price_process_manager.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/first_code_score_manager.py 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin_trade.py 167 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_factor.py 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_juejin.py 191 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/network_util.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -1,7 +1,7 @@
# 是否为测试
TEST = False
# 是否允许交易
TRADE_ENABLE = False
TRADE_ENABLE = True
# 是否需要报警
NEED_ALERT = False
@@ -41,4 +41,6 @@
# 开盘啦
KPL_INVALID_BLOCKS = ["一季报增长", "二季报增长", "三季报增长", "四季报增长", "业绩增长", "中报增长", "年报增长", "年报预增", "无", "次新股", "ST摘帽", "超跌",
                      "股权转让", "并购重组", "再融资", "年报预增", "次新股", " 专精特新", "壳资源"]
                      "股权转让", "并购重组", "再融资", "年报预增", " 专精特新", "壳资源", "行业龙头", "参股金融", "科创板"]
# 是否开启掘金交易
JUEJIN_TRADE_ENABLE = True
data_export_util.py
@@ -207,4 +207,4 @@
if __name__ == "__main__":
    export_l2_excel("002864")
    export_l2_excel("001322")
gui.py
@@ -19,7 +19,7 @@
import settings
from juejin import JueJinManager
from l2_code_operate import L2CodeOperate
from trade import l2_trade_util
from trade import l2_trade_util, trade_juejin
from trade.l2_trade_factor import L2TradeFactorUtil
from ocr import ocr_server
from third_data import data_server, kpl_data_manager, kpl_util
@@ -67,8 +67,6 @@
    t1.setDaemon(True)
    t1.start()
    laddr = "", 9001
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe_juejin=pipe_juejin)  # 注意:参数是MyBaseRequestHandle
    # tcpserver.handle_request()  # 只接受一个客户端连接
@@ -80,10 +78,12 @@
    tcpserver = ocr_server.run("", 9002)
    tcpserver.serve_forever()
def createDataServer():
    """Create the data server via ``data_server.run("", 9004)`` and serve forever.

    Blocks the calling thread/process in ``serve_forever()``.
    """
    # The message previously read "create OCRServer" (apparently copy-pasted
    # from the OCR server launcher), which mislabelled this server in the
    # console output.
    print("create DataServer")
    tcpserver = data_server.run("", 9004)
    tcpserver.serve_forever()
def startJueJin(pipe):
    juejin.JueJinManager(pipe).start()
@@ -97,8 +97,8 @@
        self.serverProcess = multiprocessing.Process(target=createServer, args=(p1, gs_server_pipe,))
        self.jueJinProcess = multiprocessing.Process(target=startJueJin, args=(p2,))
        self.jueJinTradeProcess = multiprocessing.Process(target=trade_juejin.run)
        self.ocrServerProcess = multiprocessing.Process(target=createOCRServer)
        self.p1 = p1
        self.p2 = p2
@@ -134,6 +134,7 @@
        self.jueJinProcess.start()
        self.serverProcess.start()
        self.ocrServerProcess.start()
        self.jueJinTradeProcess.start()
        L2CodeOperate.get_instance()
        # 客户端队列操作
@@ -304,6 +305,7 @@
                # table.model.setValueAt(data["apply_time"], index, 2)
                index += 1
            table.redraw()
        # 刷新开盘啦数据
        def refresh_kpl_data():
            kpl_data_manager.KPLDataManager().get_data(kpl_util.KPLDataType.LIMIT_UP)
@@ -311,7 +313,6 @@
            kpl_data_manager.KPLDataManager().get_data(kpl_util.KPLDataType.BEST_FENG_KOU)
            kpl_data_manager.KPLDataManager().get_data(kpl_util.KPLDataType.FENG_KOU)
            kpl_data_manager.KPLDataManager().get_data(kpl_util.KPLDataType.FENG_XIANG)
        start_y = 225
        btn = FlatButton(frame, text="刷新收盘价", command=refresh_close_price_data)
@@ -323,7 +324,7 @@
        btn = FlatButton(frame, text="重新获取收盘价", command=re_get_close_price)
        btn.place(x=80, y=start_y)
        kpl_data =  Label(text="涨停:\n炸板:\n最强:\n风向:\n风口:",bg="#DDDDDD",fg="#666666")
        kpl_data = Label(text="涨停:\n炸板:\n最强:\n风向:\n风口:", bg="#DDDDDD", fg="#666666")
        kpl_data.place(x=190, y=start_y)
        trade_win_datas = []
@@ -634,7 +635,7 @@
        width = 800
        height = 360
        frame = Frame(root, {"height": height, "width": width, "bg": "#DDDDDD"})
        trade_info= ""
        trade_info = ""
        for_color = "#008000"
        if constant.TEST:
            trade_info += "测试环境"
@@ -648,7 +649,7 @@
            trade_info += "初始禁止交易"
            for_color = "#FF7F27"
        cl = Label(frame, text=f"{trade_info}", bg="#DDDDDD",foreground=f"{for_color}")
        cl = Label(frame, text=f"{trade_info}", bg="#DDDDDD", foreground=f"{for_color}")
        cl.place(x=5, y=5)
        accept_l2 = IntVar()
@@ -684,10 +685,10 @@
        menu.add_command(label="环境检测", command=check_env)
        menu.add_command(label="同花顺测速", command=ths_test_speed)
        device_index =0
        device_index = 0
        for key in self.l2_codes:
            device_index += 1
            client_lb = Label(frame, text="设备:{} ID:{}".format(device_index,key), background="#DDDDDD")
            client_lb = Label(frame, text="设备:{} ID:{}".format(device_index, key), background="#DDDDDD")
            client_lb.place(x=38, y=40 + l2_client_count * 30)
            btn = FlatButton(frame, text="检测", command=key)
            btn.bind('<Button-3>', lambda event: pop_menu(event))
@@ -766,16 +767,17 @@
            index = 0
            for data in datas:
                table_trade.model.addRow()
                table_trade.model.setValueAt(data["code"], index, 0)
                table_trade.model.setValueAt(data["time"], index, 1)
                table_trade.model.setValueAt(data["num"], index, 2)
                table_trade.model.setValueAt(data["price"], index, 3)
                table_trade.model.setValueAt(data["money"], index, 4)
                table_trade.model.setValueAt(data["trade_num"], index, 5)
                if int(data["type"]) > 0:
                    table_trade.model.setValueAt("卖出", index, 6)
                    table_trade.model.setValueAt("卖出", index, 0)
                else:
                    table_trade.model.setValueAt("买入", index, 6)
                    table_trade.model.setValueAt("买入", index, 0)
                table_trade.model.setValueAt(data["code"], index, 1)
                table_trade.model.setValueAt(data["time"], index, 2)
                table_trade.model.setValueAt(data["num"], index, 3)
                table_trade.model.setValueAt(data["price"], index, 4)
                table_trade.model.setValueAt(data["money"], index, 5)
                table_trade.model.setValueAt(data["trade_num"], index, 6)
                index += 1
            table_trade.redraw()
@@ -846,12 +848,12 @@
        cl.place(x=width - 70, y=30)
        trade_datas = {}
        trade_datas["row{}".format(0)] = {'证券代码': '', '成交时间': '', '成交数量': '', '成交均价': '', '成交金额': '',
                                          '合同编号': '', '操作': ''}
        trade_datas["row{}".format(0)] = {'方向': '', '证券代码': '', '成交时间': '', '成交数量': '', '成交均价': '', '成交金额': '',
                                          '合同编号': ''}
        table_frame = Frame(frame, {"height": table_height, "width": table_width, "bg": "#DDDDDD"})
        table_frame.place(x=width / 2, y=52)
        table_trade = create_table(table_frame, trade_datas, 90)
        table_trade = create_table(table_frame, trade_datas, 70)
        table_trade.show()
        frame.grid(row=2, column=1, padx=5, pady=5, rowspan=2)
juejin.py
@@ -95,7 +95,7 @@
    # 交易時間不能做初始化
    if not tool.is_init_time():
        raise Exception("交易时间不能初始化")
    init_data()
    codes = gpcode_manager.get_gp_list()
    logger_system.info("每日初始化")
@@ -118,7 +118,7 @@
        big_money_num_manager.expire(code)
    # 清除涨停时间
    global_util.limit_up_time.clear()
    init_data()
    # 初始化同花顺主站
    l2_clients = client_manager.getValidL2Clients()
    for client in l2_clients:
@@ -293,11 +293,13 @@
        t1.start()
    @classmethod
    def get_gp_latest_info(cls, codes):
    def get_gp_latest_info(cls, codes,fields=None):
        if not codes:
            return []
        account_id, s_id, token = getAccountInfo()
        symbols = gpcode_manager.get_gp_list_with_prefix(codes)
        gmapi.set_token(token)
        data = gmapi.get_instruments(symbols=",".join(symbols))
        data = gmapi.get_instruments(symbols=",".join(symbols),fields=fields)
        print(data)
        return data
@@ -352,6 +354,8 @@
    @classmethod
    def get_gp_current_info(cls, codes):
        if not codes:
            return []
        account_id, s_id, token = getAccountInfo()
        symbols = gpcode_manager.get_gp_list_with_prefix(codes)
        gmapi.set_token(token)
@@ -373,10 +377,7 @@
        account_id, s_id, token = getAccountInfo()
        gmapi.run(strategy_id=s_id,
                  filename='juejin.py',
                  # todo 回测模式
                  mode=gmapi.MODE_LIVE,
                  backtest_start_time='2022-08-18 09:25:00',
                  backtest_end_time='2022-08-18 10:30:00',
                  token=token)
    def stop(self):
l2/cancel_buy_strategy.py
@@ -413,7 +413,7 @@
            logger_l2_h_cancel.info(
                f"code-{code} H级撤单计算结果 范围:{start_index}-{end_index} 处理进度:{process_index} 目标比例:{cancel_rate_threshold} 取消计算结果:{cancel_num}/{total_nums}")
            # H撤已撤订单
            logger_l2_h_cancel.info( f"code-{code} H撤已撤订单:{cls.__get_watch_canceled_index(code)}")
            logger_l2_h_cancel.info(f"code-{code} H撤已撤订单:{cls.__get_watch_canceled_index(code)}")
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, cancel_num)
        return False, None
@@ -546,8 +546,8 @@
        total_count = total_count_old
        # H撤单
        MIN_H_COUNT = cls.__hCancelParamsManager.get_max_watch_count(buy_volume_index)
        for i in range(buy_single_index, total_data[-1]["index"] + 1):
        # 从买入信号位3条数据开始计算
        for i in range(buy_single_index + 3, total_data[-1]["index"] + 1):
            if i <= process_index_old:
                continue
            process_index = i
@@ -583,6 +583,32 @@
        # 删除原来的计算数据
        # cls.__del_compute_data(code)
    # 获取H撤监听的数据索引范围
    # 返回监听范围与已撤单索引
    @classmethod
    def get_watch_index_dict(cls, code):
        origin_progress_index, latest_progress_index = cls.__get_traded_progress(code)
        # 监听的数据
        watch_indexs_dict = {}
        total_nums = 0
        if origin_progress_index is not None:
            # 获取成交位置到执行位置的监控数据
            watch_indexs = cls.__get_watch_index_set(code)[0]
            # 监听的总数
            for indexs in watch_indexs:
                index = indexs[0]
                if index < latest_progress_index:
                    continue
                # 只计算最近的执行位之后的数据
                watch_indexs_dict[index] = indexs
        # 获取到执行位后的监听数据
        datas, process_index, total_count, big_num_count, finished = cls.__get_watch_index_set_after_exec(code)
        if datas:
            for indexs in datas:
                index = indexs[0]
                watch_indexs_dict[index] = indexs
        return watch_indexs_dict, cls.__get_watch_canceled_index(code)
# --------------------------------封单额变化撤------------------------
# 涨停封单额统计
l2/l2_data_manager_new.py
@@ -678,19 +678,31 @@
        if zyltgb is None:
            global_data_loader.load_zyltgb()
            zyltgb = global_util.zyltgb_map.get(code)
        buy1_price = code_price_manager.Buy1PriceManager.get_buy1_price(code)
        if buy1_price is None:
            return False, True, f"尚未获取到买1价"
        # buy1_price = code_price_manager.Buy1PriceManager.get_buy1_price(code)
        # if buy1_price is None:
        #     return False, True, f"尚未获取到买1价"
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        dif = float(limit_up_price) - float(buy1_price)
        if zyltgb >= 80 * 100000000:
            # 大于2档
            if dif > 0.02001:
                return False, True, f"买1剩余档数大于2档,买一({buy1_price})涨停({limit_up_price})"
        else:
            if dif > 0.03001:
                return False, True, f"买1剩余档数大于3档,买一({buy1_price})涨停({limit_up_price})"
        # dif = float(limit_up_price) - float(buy1_price)
        # if zyltgb >= 100 * 100000000:
        #     # 大于2档
        #     if dif > 0.01001:
        #         return False, True, f"买1剩余档数大于1档,买一({buy1_price})涨停({limit_up_price})"
        # elif zyltgb >= 80 * 100000000:
        #     # 大于2档
        #     if dif > 0.02001:
        #         return False, True, f"买1剩余档数大于2档,买一({buy1_price})涨停({limit_up_price})"
        # elif zyltgb >= 60 * 100000000:
        #     # 大于2档
        #     if dif > 0.03001:
        #         return False, True, f"买1剩余档数大于3档,买一({buy1_price})涨停({limit_up_price})"
        # elif zyltgb >= 40 * 100000000:
        #     # 大于2档
        #     if dif > 0.04001:
        #         return False, True, f"买1剩余档数大于4档,买一({buy1_price})涨停({limit_up_price})"
        # else:
        #     if dif > 0.05001:
        #         return False, True, f"买1剩余档数大于5档,买一({buy1_price})涨停({limit_up_price})"
        limit_up_info = code_price_manager.Buy1PriceManager.get_limit_up_info(code)
        if limit_up_info[0] is None and False:
@@ -721,50 +733,33 @@
                                      cls.__l2PlaceOrderParamsManagerDict[code].score_info)
        if not gpcode_manager.WantBuyCodesManager.is_in(code):
            if float(limit_up_price) >= 40:
                return False, True, "股价大于40块"
            # 判断板块
            plate_can_buy, msg = CodePlateKeyBuyManager.can_buy(code)
            if not plate_can_buy:
                return False, True, msg
            # 查看分数等级
            score_index = cls.__l2PlaceOrderParamsManagerDict[code].score_index
            score = cls.__l2PlaceOrderParamsManagerDict[code].score
            score_info = cls.__l2PlaceOrderParamsManagerDict[code].score_info
            # 区分大票,小票
            if zyltgb >= 80 * 100000000:
                if cls.volume_rate_info[code][0] < 0.5:
                    return False, True, f"大市值:量大于50%才下单,量比:{cls.volume_rate_info[code][0]}"
            elif zyltgb <= 35 * 100000000:
                # 获取板块中的票的数量
                if score_info[1][5]["limit_up_codes_count"] < 2:
                    return False, True, f"小市值:板块中必须2个涨停才能买,板块{score_info[1][5]['target_block_info'][0]}-涨停数{score_info[1][5]['limit_up_codes_count']}"
            # 尾盘偷跑票
            limit_up_time = score_info[1][7]
            if limit_up_time is None:
                limit_up_time = tool.get_now_time_str()
            # 尾盘偷跑暂时不要
            # if int(limit_up_time.replace(":", "")) > int("143000"):
            #     if not score_info[1][3][6][0]:
            #         return False, True, f"尾盘偷跑:无任何形态"
            #     if score_info[1][5]["limit_up_codes_count"] > 1:
            #         return False, True, f"尾盘偷跑:板块{score_info[1][5]['target_block_info'][0]}-涨停数{score_info[1][5]['limit_up_codes_count']}"
            #     if float(gpcode_manager.get_limit_up_price(code)) > 30:
            #         return False, True, f"尾盘偷跑:股价大于30"
            if score_index < 0:
                return False, True, f"分值:{score}未达到需要买入的分数线"
            # if score_index >= 3:
            #     return False, True, f"分值:{score}未达到主动买入分数线"
            return True, False, ""
            return cls.can_buy_first(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code])
        else:
            return True, False, "在想买名单中"
    @classmethod
    def can_buy_first(cls, code, limit_up_price, score_index, score, score_info, volume_rate_info):
        if float(limit_up_price) >= 40:
            return False, True, "股价大于40块"
        # 判断板块
        plate_can_buy, msg = CodePlateKeyBuyManager.can_buy(code)
        if not plate_can_buy:
            return False, True, msg
        if volume_rate_info[0] < 0.4:
            return False, True, f"量大于40%才下单,量比:{volume_rate_info[0]}"
        # 是否有K线形态(有K线形态或者天量大阳)
        has_k_format = score_info[1][3][6][0] or score_info[1][3][7][0]
        if not has_k_format:
            return False, True, f"无K线形态"
        if score_index < 0:
            return False, True, f"分值:{score}未达到需要买入的分数线"
        return True, False, ""
    @classmethod
    def __cancel_buy(cls, code):
@@ -1092,8 +1087,8 @@
            data = total_datas[i]
            _val = total_datas[i]["val"]
            trigger_buy = False
            # 必须为连续3秒内的数据
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > max_space_time:
            # 必须为连续2秒内的数据
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds + 1 > max_space_time:
                TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # 数据处理完毕
l2_trade_test.py
@@ -18,6 +18,9 @@
from l2 import l2_log, l2_data_manager, transaction_progress, safe_count_manager
from l2.safe_count_manager import BuyL2SafeCountManager
from l2.transaction_progress import TradeBuyQueue
from third_data import kpl_util
from third_data.code_plate_key_manager import RealTimeKplMarketData, LimitUpCodesPlateKeyManager
from third_data.kpl_data_manager import KPLDataManager
from trade import trade_data_manager, l2_trade_factor
from trade.trade_queue_manager import THSBuy1VolumnManager
import l2.l2_data_manager_new, l2.l2_data_manager, l2.l2_data_util, l2.cancel_buy_strategy
@@ -63,7 +66,7 @@
                        decimal.Decimal("0.00"))
                    # 获取执行位时间
                    exec_time = None
                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set,volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                        code)
                    if buy_exec_index:
                        try:
@@ -87,7 +90,7 @@
    # @unittest.skip("跳过此单元测试")
    def test_trade(self):
        code = "603819"
        code = "002989"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
@@ -129,6 +132,12 @@
        # 获取交易进度
        trade_progress_list, buy_queues = log.get_trade_progress(code)
        jingxuan_ranks = KPLDataManager().get_from_file(kpl_util.KPLDataType.JINGXUAN_RANK, tool.get_now_date_str())
        industry_ranks = KPLDataManager().get_from_file(kpl_util.KPLDataType.INDUSTRY_RANK, tool.get_now_date_str())
        RealTimeKplMarketData().set_top_5_reasons(jingxuan_ranks)
        RealTimeKplMarketData().set_top_5_industry(industry_ranks)
        LimitUpCodesPlateKeyManager().set_today_limit_up(KPLDataManager().get_from_file(kpl_util.KPLDataType.LIMIT_UP, tool.get_now_date_str()))
        for indexs in pos_list:
            l2_log.threadIds[code] = mock.Mock(
@@ -160,7 +169,7 @@
    @unittest.skip("跳过此单元测试")
    def test_h_cancel(self):
        code = "002870"
        code = "600540"
        l2.l2_data_util.load_l2_data(code)
        total_datas = l2.l2_data_util.local_today_datas.get(code)
        total_datas = total_datas[:899]
log.py
@@ -85,6 +85,14 @@
                   filter=lambda record: record["extra"].get("name") == "juejin_tick",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("juejin", "juejin_trade"),
                   filter=lambda record: record["extra"].get("name") == "juejin_trade",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("juejin", "huaxin_trade"),
                   filter=lambda record: record["extra"].get("name") == "huaxin_trade",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("ths", "code_operate"),
                   filter=lambda record: record["extra"].get("name") == "code_operate",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -142,6 +150,14 @@
                   filter=lambda record: record["extra"].get("name") == "kpl_limit_up",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("kpl", "kpl_debug"),
                   filter=lambda record: record["extra"].get("name") == "kpl_debug",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("kpl", "kpl_block_can_buy"),
                   filter=lambda record: record["extra"].get("name") == "kpl_block_can_buy",
                   rotation="00:00", compression="zip", enqueue=True)
    def get_path(self, dir_name, log_name):
        return "D:/logs/gp/{}/{}".format(dir_name, log_name) + ".{time:YYYY-MM-DD}.log"
@@ -171,6 +187,8 @@
logger_l2_big_data = __mylogger.get_logger("l2_big_data")
logger_juejin_tick = __mylogger.get_logger("juejin_tick")
logger_juejin_trade = __mylogger.get_logger("juejin_trade")
logger_huaxin_trade = __mylogger.get_logger("huaxin_trade")
logger_code_operate = __mylogger.get_logger("code_operate")
logger_device = __mylogger.get_logger("device")
logger_system = __mylogger.get_logger("system")
@@ -196,6 +214,10 @@
logger_kpl_limit_up_reason_change = __mylogger.get_logger("kpl_limit_up_reason_change")
logger_kpl_limit_up = __mylogger.get_logger("kpl_limit_up")
logger_kpl_debug = __mylogger.get_logger("kpl_debug")
logger_kpl_block_can_buy = __mylogger.get_logger("kpl_block_can_buy")
class LogUtil:
@@ -383,6 +405,30 @@
    return index_list, buy_queues
# 获取H级撤单计算结果
def get_h_cancel_compute_info(code, date=None, log_dir="D:/logs/gp/l2/cancel"):
    """Return the most recent H-level cancel computation logged for ``code``.

    Scans ``{log_dir}/h_cancel.{date}.log`` for lines that contain both
    ``code-{code}`` and ``H级撤单计算结果`` and parses the last such line.

    Args:
        code: stock code to look for.
        date: log date as ``YYYY-MM-DD``; defaults to today.
        log_dir: directory holding the ``h_cancel`` log files. Defaults to the
            location that used to be hard-coded.

    Returns:
        ``None`` when the log file does not exist or no parsable line matches,
        otherwise ``(target_rate, cancel_rate, cancel_num, total_num)`` where
        ``target_rate``, ``cancel_num`` and ``total_num`` are the strings found
        in the log and ``cancel_rate`` is ``round(cancel_num / total_num, 2)``
        (``0.0`` when ``total_num`` is 0).
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    path_str = f"{log_dir}/h_cancel.{date}.log"
    if not os.path.exists(path_str):
        return None

    code_tag = f"code-{code}"
    latest_info = None
    with open(path_str, mode='r', encoding="utf-8") as f:
        for line in f:
            if code_tag not in line or "H级撤单计算结果" not in line:
                continue
            try:
                target_rate = line.split("目标比例:")[1].split(" ")[0].strip()
                # [1:] drops the colon that follows the label
                result_fields = line.split("取消计算结果")[1][1:].split("/")
                cancel_num = result_fields[0].strip()
                total_num = result_fields[1].split(" ")[0].strip()
                cancel_count = int(cancel_num)
                total_count = int(total_num)
            except (IndexError, ValueError):
                # A truncated or malformed line must not break the caller;
                # keep whatever was parsed from earlier lines.
                continue
            # Nothing watched yet is logged as "0/0"; avoid dividing by zero.
            cancel_rate = round(cancel_count / total_count, 2) if total_count else 0.0
            latest_info = (target_rate, cancel_rate, cancel_num, total_num)
    return latest_info
def export_logs(code):
    code_name = gpcode_manager.get_code_name(code)
    date = datetime.datetime.now().strftime("%Y-%m-%d")
@@ -453,7 +499,7 @@
if __name__ == '__main__':
    print(load_kpl_reason_changes())
    print(get_h_cancel_compute_info("603912"))
    # logger_l2_h_cancel.info("test")
    # logger_l2_process_time.info("test123")
log_analyse.py
New file
@@ -0,0 +1,44 @@
"""
日志分析
"""
# 获取不可以下单的原因
import os
import tool
def get_cant_order_reasons_dict(file_path=None):
    """Map each code to the latest "cannot place order" reason in the trade log.

    Args:
        file_path: log file to scan. Defaults to today's
            ``D:/logs/gp/l2/l2_trade.<date>.log``.

    Returns:
        ``{code: (time, reason)}`` built from every line containing
        ``不可以下单,原因:``. ``code`` is the 6 characters after ``code=``,
        ``time`` is the first 12 characters of the second space-separated
        token before the first ``|``, and ``reason`` is the text after the
        marker. Later lines overwrite earlier ones for the same code. An empty
        dict is returned when the file does not exist.
    """
    if file_path is None:
        file_path = "D:/logs/gp/l2/l2_trade.{}.log".format(tool.get_now_date_str())
    dict_ = {}
    # Today's log may not have been created yet; treat that as "no reasons"
    # instead of raising, like get_kpl_can_buy_reasons_dict does.
    if not os.path.exists(file_path):
        return dict_
    marker = "不可以下单,原因:"
    with open(file_path, encoding="utf-8") as f:
        for line in f:
            if marker not in line:
                continue
            code_parts = line.split("code=")
            time_parts = line.split("|")[0].split(" ")
            if len(code_parts) < 2 or len(time_parts) < 2:
                # Line carries the marker but not the expected layout; skip it
                continue
            code = code_parts[1][:6]
            time_ = time_parts[1][:12]
            reason = line.split(marker)[1].strip()
            dict_[code] = (time_, reason)
    return dict_
def get_kpl_can_buy_reasons_dict(file_path=None):
    """Map each code to its latest entry in the ``kpl_block_can_buy`` log.

    Args:
        file_path: log file to scan. Defaults to today's
            ``D:/logs/gp/kpl/kpl_block_can_buy.<date>.log``.

    Returns:
        ``{code: (time, reason)}``. ``code`` is the 6 characters after
        ``code=``, ``time`` is the first 12 characters of the second
        space-separated token before the first ``|``, and ``reason`` is the
        stripped text after ``code=<code>:`` with every ``可以下单`` removed.
        Later lines overwrite earlier ones for the same code. An empty dict is
        returned when the file does not exist.
    """
    if file_path is None:
        file_path = "D:/logs/gp/kpl/kpl_block_can_buy.{}.log".format(tool.get_now_date_str())
    dict_ = {}
    if not os.path.exists(file_path):
        return dict_
    with open(file_path, encoding="utf-8") as f:
        for line in f:
            code_parts = line.split("code=")
            time_parts = line.split("|")[0].split(" ")
            if len(code_parts) < 2 or len(time_parts) < 2:
                # Blank or continuation lines have no code/time; skip them
                # rather than failing the whole scan with IndexError.
                continue
            code = code_parts[1][:6]
            reason_parts = line.split(f"code={code}:")
            if len(reason_parts) < 2:
                continue
            time_ = time_parts[1][:12]
            reason = reason_parts[1].strip()
            dict_[code] = (time_, reason.replace("可以下单", ""))
    return dict_
if __name__ == "__main__":
    print(get_kpl_can_buy_reasons_dict())
output/code_info_output.py
@@ -20,7 +20,10 @@
import log
import tool
from l2 import l2_data_manager, l2_data_util, transaction_progress, l2_data_manager_new, code_price_manager
from l2.cancel_buy_strategy import HourCancelBigNumComputer
import l2.l2_data_manager_new
from third_data import kpl_data_manager, kpl_util, kpl_api, block_info
from third_data.code_plate_key_manager import RealTimeKplMarketData
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager, KPLDataManager
from trade import first_code_score_manager, l2_trade_factor, trade_manager, l2_trade_util
from trade.l2_trade_factor import L2TradeFactorUtil
@@ -59,13 +62,21 @@
def money_desc(money):
    if money > 100000000:
    if abs(money) > 100000000:
        return f"{round(money / 100000000, 2)}亿"
    else:
        return f"{round(money / 10000, 2)}万"
def get_output_params(code):
def get_output_params(code, jingxuan_cache_dict, industry_cache_dict):
    def format_plate_output(_plat):
        if _plat in jingxuan_cache_dict:
            return _plat, money_desc(jingxuan_cache_dict[_plat][3])
        elif _plat in industry_cache_dict:
            return _plat, money_desc(industry_cache_dict[_plat][3])
        else:
            return _plat, ''
    params = {
        "base_url": "http://192.168.3.252/kp/",
    }
@@ -97,6 +108,7 @@
        volume_rate, volume_info = code_volumn_manager.get_volume_rate(code, True)
        (score, score_list), score_source_list = first_code_score_manager.get_score(code, volume_rate, limit_up_time,
                                                                                    True)
        ################################买前评分################################
        # ["换手量能", "竞价强度", "资金力度", "K线形态", "历史股性", "板块热度", "上板时间", "市值大小","股价大小"]
@@ -246,6 +258,14 @@
                                                                                (
                                                                                    (score, score_list),
                                                                                    score_source_list))
        # 是否可以买入的信息
        can_buy_info = l2.l2_data_manager_new.L2TradeDataProcessor.can_buy_first(code, limit_up_price, __L2PlaceOrderParamsManager.score_index,
                                                          __L2PlaceOrderParamsManager.score,
                                                          __L2PlaceOrderParamsManager.score_info,
                                                          (volume_rate,
                                                           code_volumn_manager.get_volume_rate_index(volume_rate)))
        params["trade_data"]["can_buy_info"] = can_buy_info
        __base_L2PlaceOrderParamsManager = l2_trade_factor.L2PlaceOrderParamsManager(code, False, volume_rate,
                                                                                     code_volumn_manager.get_volume_rate_index(
                                                                                         volume_rate),
@@ -328,6 +348,26 @@
            elif trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                params["trade_data"]["trade_state"]["desc"] = "已成交"
        # H撤监听范围
        if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED or trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS:
            hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer.get_watch_index_dict(code)
            # 根据日志读取实时的计算数据
            h_cancel_latest_compute_info = log.get_h_cancel_compute_info(code)
            if hcancel_datas_dict:
                temp_list = [(k, hcancel_datas_dict[k][0]) for k in hcancel_datas_dict]
                canceled_indexs = set([int(k.split("-")[0]) for k in cancel_indexes_set])
                temp_list.sort(key=lambda x: x[0])
                params["trade_data"]["h_cancel"] = {
                    "computed_info": list(h_cancel_latest_compute_info) if h_cancel_latest_compute_info else None,
                    "datas": []}
                for i in range(0, len(temp_list)):
                    temp = temp_list[i]
                    val = total_datas[temp[0]]["val"]
                    canceled = temp[0] in canceled_indexs
                    params["trade_data"]["h_cancel"]["datas"].append(
                        (val["time"], val["num"], money_desc(val["num"] * float(val["price"]) * 100),
                         (1 if canceled else 0)))
    ##############################主动买,被动买##################################
    # 返回主动买,被动买,不买的列表(代码, 名称, 得分, 是否涨停)
    codes_score = __load_codes_scores()
@@ -340,39 +380,38 @@
    for d in codes_score[1]:
        params["passive_buy_codes"].append(
            {"name": d[1], "code": d[0], "score": d[2], "limit_up": d[3], "open_limit_up": d[4]})
    # 主动买与被动买不能超过11行
    initiative_count = len(params["initiative_buy_codes"])
    passive_count = len(params["passive_buy_codes"])
    buy_row = 0
    buy_row += initiative_count // 3
    if initiative_count % 3 > 0:
        buy_row += 1
    max_passive_count = (10 - buy_row) * 3
    if max_passive_count < 0:
        max_passive_count = 0
    params["passive_buy_codes"] = params["passive_buy_codes"][:max_passive_count]
    params["passive_buy_codes"] = params["passive_buy_codes"]
    trade_info = __load_trade_record(code, total_datas)
    params["trade_record"] = {"open_limit_up": trade_info[0], "records": trade_info[2]}
    ##############################开盘啦相关信息##################################
    params["kpl_code_info"] = {"industry":global_util.code_industry_map.get(code)}
    industry = global_util.code_industry_map.get(code)
    params["kpl_code_info"] = {
        "industry": format_plate_output(industry)}
    # 获取开盘啦板块
    plate_info = kpl_api.getStockIDPlate(code)
    if plate_info:
        plate_info.sort(key=lambda x: x[2])
        plate_info.reverse()
        params["kpl_code_info"]["plate"] = plate_info
        params["kpl_code_info"]["plate"] = [(k[0], k[1], k[2], format_plate_output(k[1])[1]) for k in plate_info]
    # 获取代码的历史涨停数据,(涨停原因,日期,板块)
    params["kpl_code_info"]["code_records"] = KPLLimitUpDataRecordManager.get_latest_infos(code, 4, False)[:2]
    code_records = KPLLimitUpDataRecordManager.get_latest_infos(code, 4, False)[:2]
    if code_records:
        code_records = [(format_plate_output(k[0]), k[1], [format_plate_output(k1) for k1 in k[2].split("、")]) for k in
                        code_records]
    params["kpl_code_info"]["code_records"] = code_records
    if not KPLLimitUpDataRecordManager.total_datas:
        KPLLimitUpDataRecordManager.load_total_datas()
    for d in KPLLimitUpDataRecordManager.total_datas:
        if d[3] == code:
            # 获取今日
            params["kpl_code_info"]["today"] = (d[2], d[1], d[6])
            plates = d[6].split("、")
            plates = [format_plate_output(p) for p in plates]
            params["kpl_code_info"]["today"] = (format_plate_output(d[2]), d[1], plates)
            break
    return params
output/output_util.py
@@ -2,7 +2,7 @@
def money_desc(money):
    """Format a money amount as a short string in 亿 (1e8) or 万 (1e4) units.

    :param money: numeric amount; may be negative (e.g. a net outflow).
    :return: the amount divided by the chosen unit, rounded to 2 decimals,
        followed by the unit character.
    """
    # Pick the unit from the magnitude so that large negative amounts are
    # also expressed in 亿 instead of falling through to 万.
    if abs(money) > 100000000:
        return f"{round(money / 100000000, 2)}亿"
    return f"{round(money / 10000, 2)}万"
server.py
@@ -32,12 +32,12 @@
import ths_util
import tool
from output import code_info_output
from third_data import hot_block_data_process, block_info, kpl_api, kpl_util
from third_data.code_plate_key_manager import TargetCodePlateKeyManager
from third_data import hot_block_data_process, block_info, kpl_api
from third_data.code_plate_key_manager import  CodesHisReasonAndBlocksManager
from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager, KPLLimitUpDataRecordManager
from ths import l2_listen_pos_health_manager
from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util, deal_big_money_manager, \
    first_code_score_manager, current_price_process_manager
    first_code_score_manager, current_price_process_manager, trade_juejin
import l2_code_operate
from code_data_util import ZYLTGBUtil
import l2.transaction_progress
@@ -77,7 +77,7 @@
    latest_oringin_data = {}
    last_l2_listen_health_time = {}
    __KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager()
    __TargetCodePlateKeyManager = TargetCodePlateKeyManager()
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -225,10 +225,10 @@
                        code_list.append(data["code"])
                    # 获取基本信息
                    code_datas = juejin.JueJinManager.get_gp_latest_info(code_list)
                    if is_add:
                        gpcode_manager.add_gp_list(code_datas)
                    else:
                        gpcode_manager.set_gp_list(code_datas)
                    # if is_add:
                    #     gpcode_manager.add_gp_list(code_datas)
                    # else:
                    #     gpcode_manager.set_gp_list(code_datas)
                    if not is_add:
                        # 同步同花顺目标代码
@@ -299,15 +299,15 @@
                        # 板块关键字准备
                        for code in codes:
                            if self.__TargetCodePlateKeyManager.get_history_limit_up_reason(code) is None:
                                self.__TargetCodePlateKeyManager.set_history_limit_up_reason(code,
                                                                                             KPLLimitUpDataRecordManager.get_latest_blocks_set(
                                                                                                 code))
                            if self.__TargetCodePlateKeyManager.get_blocks(code) is None:
                            if not self.__CodesPlateKeysManager.get_history_limit_up_reason(code) is None:
                                self.__CodesPlateKeysManager.set_history_limit_up_reason(code,
                                                                                         KPLLimitUpDataRecordManager.get_latest_blocks_set(
                                                                                             code))
                            if self.__CodesPlateKeysManager.get_blocks(code) is None:
                                try:
                                    results = kpl_api.getStockIDPlate(code)
                                    bs = [r[1] for r in results]
                                    self.__TargetCodePlateKeyManager.set_blocks(code, bs)
                                    self.__CodesPlateKeysManager.set_blocks(code, bs)
                                except Exception as e:
                                    logging.exception(e)
                                    pass
@@ -340,7 +340,7 @@
                                                                     volumes_data)
                        gpcode_manager.FirstCodeManager.add_record(codes)
                        if new_add_codes:
                            gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes))
                            gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes,fields="symbol,sec_name,sec_type,sec_level"))
                            # 加入首板历史记录
                            logger_first_code_record.info("新增首板:{}", new_add_codes)
@@ -914,7 +914,7 @@
def send_msg(client_id, data):
    _ip = client_manager.getActiveClientIP(client_id)
    print("ip", client_id, _ip)
    # print("ip", client_id, _ip)
    if _ip is None or len(_ip) <= 0:
        raise Exception("客户端IP为空")
    socketClient = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -934,7 +934,7 @@
    while True:
        clients = authority.get_l2_clients()
        for client in clients:
            print("心跳", client)
            # print("心跳", client)
            try:
                send_msg(client, {"action": "test"})
            except:
@@ -987,11 +987,23 @@
        l2_listen_pos_health_manager.init_all(client_infos)
if __name__ == "__main__1":
    cid, pid = gpcode_manager.get_listen_code_pos("000070")
    print(cid, pid)
if __name__ == "__main__":
    datas =trade_juejin.get_execution_reports()
    # 上传数据
    fdatas = []
    for d in datas:
        fdatas.append(
            {"code": d[0], "money": d[4], "num": d[2], "price": d[3], "time": d[7], "trade_num": d[5],
             "type": d[1] - 1})
    print(fdatas)
    if fdatas:
        try:
            trade_manager.process_trade_success_data(fdatas)
        except Exception as e:
            logging.exception(e)
        trade_manager.save_trade_success_data(fdatas)
if __name__ == "__main__1":
    codes = gpcode_manager.get_first_gp_codes()
    for code in codes:
        try:
third_data/code_plate_key_manager.py
@@ -6,64 +6,29 @@
import json
import constant
import global_data_loader
import global_util
import log
import tool
from db import redis_manager
from log import logger_kpl_limit_up
from log import logger_kpl_limit_up, logger_kpl_block_can_buy
from third_data.kpl_util import KPLPlatManager
from trade import trade_manager
# 实时开盘啦市场数据
class RealTimeKplMarketData:
    # 精选前5
    top_5_reason_set = set()
    # 行业前5
    top_5_industry_set = set()
# 开盘啦禁止交易板块管理
class KPLPlateForbiddenManager:
    __redisManager = redis_manager.RedisManager(3)
    @classmethod
    def set_top_5_reasons(cls, datas):
        temp_set = set()
        base_count = 5
        for i in range(0, len(datas)):
            if datas[i][1] in constant.KPL_INVALID_BLOCKS:
                base_count += 1
            if i >= base_count:
                break
            if datas[i][3] > 5000 * 10000:
                temp_set.add(datas[i][1])
        cls.top_5_reason_set = temp_set
    def __get_redis(self):
        return self.__redisManager.getRedis()
    @classmethod
    def set_top_5_industry(cls, datas):
        temp_set = set()
        base_count = 5
        for i in range(0, len(datas)):
            if datas[i][1] in constant.KPL_INVALID_BLOCKS:
                base_count += 1
            if i >= base_count:
                break
    def save_plate(self, plate):
        self.__get_redis().sadd("kpl_forbidden_plates", plate)
        self.__get_redis().expire("kpl_forbidden_plates", tool.get_expire())
            if datas[i][2] > 5000 * 10000:
                temp_set.add(datas[i][1])
        cls.top_5_reason_set = temp_set
    # 获取能够买的行业关键字set
    @classmethod
    def get_can_buy_key_set(cls):
        temp_set = cls.top_5_reason_set | cls.top_5_industry_set
        return temp_set
    @classmethod
    def is_in_top(cls, keys):
        reasons = cls.get_can_buy_key_set()
        temp_set = keys & reasons
        if temp_set:
            return True, temp_set
        else:
            return False, None
    def list_all(self):
        return self.__get_redis().smembers("kpl_forbidden_plates")
class LimitUpCodesPlateKeyManager:
@@ -123,14 +88,136 @@
        codes_set.discard(code)
        return codes_set
    # 涨停原因匹配关键字(和涨停列表中的涨停原因做对比),返回:{关键词:代码集合}
    def match_limit_up_reason_keys(self, code, keys):
        fresult = {}
        for k in keys:
            if k in self.total_key_codes_dict:
                codes = set(self.total_key_codes_dict[k])
                codes.discard(code)
                if codes:
                    fresult[k] = codes
        return fresult
# 目标代码关键词管理
class TargetCodePlateKeyManager:
# 实时开盘啦市场数据
class RealTimeKplMarketData:
    """Class-level snapshot of the KPL net-inflow rankings and the buy quota rules built on them.

    All state is stored on the class itself and every method is a classmethod.
    """

    # Curated-plate ("reason") entries kept by set_top_5_reasons:
    # list of (plate name, net inflow, rank) tuples.
    top_5_reason_list = []
    # Industry entries kept by set_top_5_industry, same tuple layout.
    top_5_industry_list = []
    # plate name -> (plate name, net inflow, rank), rebuilt from both lists.
    top_5_key_dict = {}
    # plate name -> raw row, accumulated from every call to the setters.
    total_reason_dict = {}
    total_industry_dict = {}
    __KPLPlateForbiddenManager = KPLPlateForbiddenManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __KPLPlatManager = KPLPlatManager()

    @classmethod
    def __collect_top(cls, datas, money_index):
        """Collect the leading (name, net inflow, rank) tuples from `datas`.

        Rows are visited in the order given; rows whose name (``row[1]``) is
        listed in ``constant.KPL_INVALID_BLOCKS`` are skipped.

        :param datas: sequence of rows; ``row[money_index]`` is the net inflow.
        :param money_index: index of the net-inflow column in each row.
        """
        picked = []
        for row in datas:
            if row[1] in constant.KPL_INVALID_BLOCKS:
                continue
            picked.append((row[1], row[money_index], len(picked)))
            # NOTE(review): the cap is checked after appending, so up to six
            # entries are kept although the attributes are named "top_5".
            if len(picked) > 5:
                break
            # NOTE(review): the first row below 100 million is still kept
            # because the threshold is tested after appending — confirm.
            if row[money_index] < 1 * 10000 * 10000:
                break
        return picked

    @classmethod
    def set_top_5_reasons(cls, datas):
        """Refresh the curated-plate ranking from `datas`.

        :param datas: sequence of rows where ``row[1]`` is the plate name and
            ``row[3]`` is the net inflow.
        """
        for d in datas:
            cls.total_reason_dict[d[1]] = d
        temp_list = cls.__collect_top(datas, 3)
        # Add alias names of each selected plate with the same inflow and rank.
        # The aliases go into a separate list: appending to the list being
        # iterated would expand the aliases again and again.
        alias_list = []
        for plate_name, money, rank in temp_list:
            # NOTE(review): the helper is named "..._by_id" but receives the
            # plate name here, as in the original code — confirm which value
            # it expects.
            names = cls.__KPLPlatManager.get_same_plat_names_by_id(plate_name)
            for name in names:
                # Skip the plate's own name; only real aliases are added.
                if name == plate_name:
                    continue
                alias_list.append((name, money, rank))
        cls.top_5_reason_list = temp_list + alias_list
        cls.__reset_top_5_dict()

    @classmethod
    def set_top_5_industry(cls, datas):
        """Refresh the industry ranking from `datas`.

        :param datas: sequence of rows where ``row[1]`` is the industry name
            and ``row[2]`` is the net inflow.
        """
        for d in datas:
            cls.total_industry_dict[d[1]] = d
        cls.top_5_industry_list = cls.__collect_top(datas, 2)
        cls.__reset_top_5_dict()

    @classmethod
    def __reset_top_5_dict(cls):
        """Rebuild ``top_5_key_dict``; curated entries override industry entries of the same name."""
        temp_dict = {t[0]: t for t in cls.top_5_industry_list}
        temp_dict.update({t[0]: t for t in cls.top_5_reason_list})
        cls.top_5_key_dict = temp_dict

    @classmethod
    def get_can_buy_key_set(cls):
        """Return the keys view of ``top_5_key_dict`` (the plate names currently ranked)."""
        return cls.top_5_key_dict.keys()

    @classmethod
    def get_can_buy_codes_count(cls, code, key):
        """Return how many orders may be placed for plate `key`.

        :param code: the code being evaluated; it is excluded when counting
            the codes already recorded under `key`.
        :param key: plate / limit-up reason keyword.
        :return: tuple ``(count, message)`` where `message` explains the decision.
        """
        # At least one other code must already be recorded under this key.
        temp_codes = LimitUpCodesPlateKeyManager.total_key_codes_dict.get(key)
        temp_codes = set() if temp_codes is None else set(temp_codes)
        temp_codes.discard(code)
        if len(temp_codes) < 1:
            return 0, "身位不为后排"

        forbidden_plates = cls.__KPLPlateForbiddenManager.list_all()
        if key in forbidden_plates:
            return 0, "不买该板块"

        # Before 10:30 two orders are allowed regardless of the ranking.
        # The time string has its ':' removed and is compared as an integer.
        if int(tool.get_now_time_str().replace(':', '')) < int("103000"):
            return 2, "10:30以前可以挂2个单"

        # From 10:30 on the key must be in the ranked set.
        if key not in cls.top_5_key_dict:
            return 0, "净流入没在前5"
        if cls.top_5_key_dict[key][1] > 3 * 10000 * 10000:
            return 2, "净流入在前5且大于3亿"
        return 1, "净流入在前5"

    @classmethod
    def is_in_top(cls, keys):
        """Check whether any of `keys` is a ranked, non-forbidden plate.

        :param keys: set of plate keywords.
        :return: ``(True, matched_set)`` when at least one key matches,
            otherwise ``(False, None)``.
        """
        reasons = cls.get_can_buy_key_set()
        log.logger_kpl_debug.debug("市场流入前5:{}", reasons)
        forbidden_plates = cls.__KPLPlateForbiddenManager.list_all()
        reasons = reasons - forbidden_plates
        temp_set = keys & reasons
        log.logger_kpl_debug.debug("市场流入前5匹配结果:{}", temp_set)
        if temp_set:
            return True, temp_set
        return False, None
#
class CodesHisReasonAndBlocksManager:
    __redisManager = redis_manager.RedisManager(1)
    # 历史涨停原因
    __history_limit_up_reason_dict = {}
    # 二级行业
    __second_industry_dict = {}
    # 板块
    __blocks_dict = {}
@@ -141,7 +228,8 @@
        self.__history_limit_up_reason_dict[code] = set(reasons)
        self.__get_redis().setex(f"kpl_his_limit_up_reason-{code}", tool.get_expire(), json.dumps(list(reasons)))
    # 如果返回值不为None表示已经加载过历史原因了
        # 如果返回值不为None表示已经加载过历史原因了
    def get_history_limit_up_reason(self, code):
        reasons = self.__history_limit_up_reason_dict.get(code)
        if reasons is None:
@@ -150,7 +238,10 @@
            if val is not None:
                val = set(json.loads(val))
                self.__history_limit_up_reason_dict[code] = val
            return self.__history_limit_up_reason_dict.get(code)
            if code in self.__history_limit_up_reason_dict:
                return self.__history_limit_up_reason_dict.get(code)
            else:
                return None
        else:
            return reasons
@@ -166,9 +257,30 @@
            if val is not None:
                val = set(json.loads(val))
                self.__blocks_dict[code] = val
            return self.__blocks_dict.get(code)
            if code in self.__blocks_dict:
                return self.__blocks_dict.get(code)
            else:
                return None
        else:
            return reasons
    def get_total_keys(self, code):
        reasons = self.get_history_limit_up_reason(code)
        if reasons is None:
            reasons = set()
        blocks = self.get_blocks(code)
        if blocks is None:
            blocks = set()
        return reasons | blocks
# 目标代码关键词管理
class TargetCodePlateKeyManager:
    __redisManager = redis_manager.RedisManager(1)
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
    def __get_redis(self):
        """Return the Redis client obtained from the class-level RedisManager."""
        return self.__redisManager.getRedis()
    # 返回key集合(排除无效板块),今日涨停原因,今日历史涨停原因,历史涨停原因,二级,板块
    def get_plate_keys(self, code):
@@ -178,17 +290,17 @@
            k1 = {LimitUpCodesPlateKeyManager.today_total_limit_up_reason_dict[code]}
        # 加载历史原因
        k11 = self.__get_redis().smembers(f"kpl_limit_up_reason_his-{code}")
        k2 = set()
        if code in self.__history_limit_up_reason_dict:
            k2 = self.__history_limit_up_reason_dict[code]
        k2 = self.__CodesPlateKeysManager.get_history_limit_up_reason(code)
        if k2 is None:
            k2 = set()
        k3 = set()
        industry = global_util.code_industry_map.get(code)
        if industry:
            k3 = {industry}
        k4 = set()
        if code in self.__blocks_dict:
            k4 = self.__blocks_dict[code]
        k4 = self.__CodesPlateKeysManager.get_blocks(code)
        if k4 is None:
            k4 = set()
        for k in [k1, k11, k2, k3, k4]:
            keys |= k
@@ -201,40 +313,119 @@
class CodePlateKeyBuyManager:
    __TargetCodePlateKeyManager = TargetCodePlateKeyManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __CodesHisReasonAndBlocksManager = CodesHisReasonAndBlocksManager()
    # 是否可以下单
    @classmethod
    def can_buy(cls, code):
        if constant.TEST:
            return True, ""
        keys, k1, k11, k2, k3, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code)
        # 板块Key是否在市场前5key中
        is_in, valid_keys = RealTimeKplMarketData.is_in_top(keys)
        if not valid_keys:
            return False, "板块未在市场流入前5"
        # 相同板块中是否已经有别的票涨停
        is_back = False, ''
        for key in valid_keys:
            codes = cls.__LimitUpCodesPlateKeyManager.get_codes_by_key_without_mine(key, code)
            if codes and len(codes) > 0:
                is_back = True, key
        log.logger_kpl_debug.info("{}关键词:{},{},{},{},{},{}", code, keys, k1, k11, k2, k3, k4)
        # 涨停列表中匹配关键词
        match_limit_up_result = cls.__LimitUpCodesPlateKeyManager.match_limit_up_reason_keys(code, keys)
        log.logger_kpl_debug.info("{}关键词身位匹配结果:{}", code, match_limit_up_result)
        if not match_limit_up_result:
            return False, "未在涨停列表中未匹配到涨停原因"
        # ---------------------------------判断目标代码的板块-------------------start------------
        # 判断匹配出的涨停原因,判断是否有已经下单的票
        # reason_need_buy_dict = {}
        # for k in match_limit_up_result:
        #     codes = match_limit_up_result[k]
        #     final_codes_keys = [keys]
        #     for code_ in codes:
        #         temp_key_set = set()
        #         temp_key_set |= cls.__CodesHisReasonAndBlocksManager.get_total_keys(code_)
        #         temp = cls.__LimitUpCodesPlateKeyManager.total_code_keys_dict.get(code_)
        #         if temp:
        #             temp_key_set |= temp
        #         # 二级
        #         industry = global_util.code_industry_map.get(code_)
        #         if industry:
        #             temp_key_set.add(industry)
        #
        #         final_codes_keys.append(temp_key_set)
        #     # 求共同的关键词
        #     intersection = set(final_codes_keys[0])
        #     for s in final_codes_keys:
        #         intersection &= s
        #     log.logger_kpl_debug.info("{}的板块求交集:{}-{}", code, k, intersection)
        #
        #     # 求公共的板块是否在流入前5中
        #     is_in, valid_keys = RealTimeKplMarketData.is_in_top(intersection)
        #     if is_in:
        #         reason_need_buy_dict[k] = (is_in, valid_keys)
        # ---------------------------------判断目标代码的板块-------------------end------------
        # 获取板块可以下单的个数
        can_buy_codes_count_dict = {}
        for key__ in match_limit_up_result:
            can_buy_count, msg = RealTimeKplMarketData.get_can_buy_codes_count(code, key__)
            can_buy_codes_count_dict[key__] = can_buy_count
        has_available_key = False
        for key in can_buy_codes_count_dict:
            if can_buy_codes_count_dict[key] > 0:
                has_available_key = True
                break
        if not is_back[0]:
            return False, f"板块中首个涨停:{valid_keys}"
        # 看板块中是否已经有已经下单的或者成交的代码
        codes = trade_manager.get_codes_by_trade_states(
            {trade_manager.TRADE_STATE_BUY_DELEGATED, trade_manager.TRADE_STATE_BUY_PLACE_ORDER,
             trade_manager.TRADE_STATE_BUY_SUCCESS})
        # 遍历已经成交/下单的代码,获取其涨停原因,然后和当前代码涨停原因做比较,有相同代码的不能买
        if not has_available_key:
            return False, f"匹配到的【{','.join(match_limit_up_result.keys())}】没在精选/行业可以买入的板块中"
        # ---------------------------------加载已经下单/成交的代码信息------------start-------------
        match_reasons = match_limit_up_result.keys()
        # 判断匹配到的原因是否已经有下单/买入成功的代码
        codes_delegate = set(trade_manager.get_codes_by_trade_states(
            {trade_manager.TRADE_STATE_BUY_DELEGATED, trade_manager.TRADE_STATE_BUY_PLACE_ORDER}))
        codes_success = set(trade_manager.get_codes_by_trade_states(
            {trade_manager.TRADE_STATE_BUY_SUCCESS}))
        codes = codes_delegate | codes_success
        # 统计成交代码的板块
        trade_codes_blocks_dict = {}
        # 已经成交的板块
        trade_success_blocks = set()
        for c in codes:
            keys_, k1_, k11_, k2_, k3_, k4_ = cls.__TargetCodePlateKeyManager.get_plate_keys(c)
            # 实时涨停原因
            for k_ in k1_:
                # 当前代码已经有挂的或者成交的
                if k_ in valid_keys:
                    return False, f"{k_}板块中的{c}已经下单/买入成功,同一板块中只能买1个票"
        return True, f"涨停原因:{is_back[1]}"
            trade_codes_blocks_dict[c] = k1_
        # 统计板块中的代码
        trade_block_codes_dict = {}
        for c in trade_codes_blocks_dict:
            for b in trade_codes_blocks_dict[c]:
                if c in codes_success:
                    trade_success_blocks.add(b)
                if b not in trade_block_codes_dict:
                    trade_block_codes_dict[b] = set()
                trade_block_codes_dict[b].add(c)
        # ---------------------------------加载已经下单/成交的代码信息------------end-------------
        msg_list = []
        for key in can_buy_codes_count_dict:
            log.logger_kpl_debug.debug(f"{code}:板块可以下单的数量【{key}】-{can_buy_codes_count_dict[key]}")
            if can_buy_codes_count_dict[key] < 1:
                continue
            # 板块中已经有成交的就不下单了
            if key in trade_success_blocks:
                msg_list.append(f"【{key}】中已经有成交代码")
                log.logger_kpl_debug.debug(f"{code}:板块已经有成交【{key}】")
                continue
            # 板块可以下单数量
            if trade_block_codes_dict.get(key) is None or len(trade_block_codes_dict.get(key)) < \
                    can_buy_codes_count_dict[key]:
                order_count = len(trade_block_codes_dict.get(key)) if key in trade_block_codes_dict else 0
                logger_kpl_block_can_buy.info(
                    f"code={code}:【{key}】可以下单,现有数量:{order_count} 最大数量:{can_buy_codes_count_dict[key]}")
                return True, f"可以下单,板块:【{key}】,板块中已经下单的数量:{order_count}"
            else:
                order_count = len(trade_block_codes_dict.get(key))
                msg_list.append(f"【{key}】中下单代码数量{order_count}/允许下单数量{can_buy_codes_count_dict[key]}")
        return False, ",".join(msg_list)
if __name__ == "__main__":
    # Manual replay: feed every logged limit-up reason change back into a
    # LimitUpCodesPlateKeyManager (a new instance per record, as before).
    for change in log.load_kpl_reason_changes():
        LimitUpCodesPlateKeyManager().set_today_limit_up_reason_change(change[0], change[1], change[2])
third_data/data_server.py
@@ -1,4 +1,5 @@
import base64
import http
import json
import logging
import socketserver
@@ -10,14 +11,15 @@
import global_util
import gpcode_manager
import log
import log_analyse
import tool
from l2 import code_price_manager
from output.limit_up_data_filter import IgnoreCodeManager
from third_data import kpl_util, kpl_data_manager, kpl_api
from third_data.code_plate_key_manager import RealTimeKplMarketData
from third_data.kpl_data_manager import KPLDataManager, KPLLimitUpDataRecordManager, KPLPlatManager, \
from third_data.code_plate_key_manager import RealTimeKplMarketData, KPLPlateForbiddenManager
from third_data.kpl_data_manager import KPLDataManager, KPLLimitUpDataRecordManager, \
    KPLCodeLimitUpReasonManager
from third_data.kpl_util import KPLDataType
from third_data.kpl_util import KPLDataType, KPLPlatManager
import urllib.parse as urlparse
from urllib.parse import parse_qs
from output import code_info_output, limit_up_data_filter, output_util
@@ -32,10 +34,14 @@
    __IgnoreCodeManager = IgnoreCodeManager()
    __KPLPlatManager = KPLPlatManager()
    __KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager()
    __KPLPlateForbiddenManager = KPLPlateForbiddenManager()
    # 历史板块
    __history_plates_dict = {}
    # 板块
    __blocks_dict = {}
    # 精选,行业数据缓存
    __jingxuan_cache_dict = {}
    __industry_cache_dict = {}
    def __get_limit_up_list(self):
        # 统计目前为止的代码涨停数量(分涨停原因)
@@ -89,10 +95,13 @@
        for r in reason_changes:
            if r[0] not in reason_changes_dict:
                reason_changes_dict[r[0]] = r[1]
        # 统计最近下单动作反馈
        order_reasons_dict = log_analyse.get_cant_order_reasons_dict()
        kpl_can_buy_reasons_dict = log_analyse.get_kpl_can_buy_reasons_dict()
        for d in total_datas:
            code = d[3]
            # (代码, 名称, 涨停状态(0 - 无状态 1-涨停 2-炸板), 龙几, 首板, 分值, 涨停时间, 原因, 相同原因代码数量, 自由流通, 涨停原因是否变化)
            # (代码, 名称, 涨停状态(0 - 无状态 1-涨停 2-炸板), 龙几, 首板, 分值, 涨停时间, 原因, 相同原因代码数量, 自由流通, 涨停原因是否变化,涨停原因的流入净额,下单简介)
            limit_up_state = 0
            if code in limit_up_dict:
                if limit_up_dict[code][0]:
@@ -104,9 +113,33 @@
                score = score_dict[code]
            if code in ignore_codes:
                continue
            # 涨停原因的净流入金额
            reason = d[2]
            reason_money = ''
            if reason in self.__jingxuan_cache_dict:
                reason_money = output_util.money_desc(self.__jingxuan_cache_dict[reason][3])
            elif reason in self.__industry_cache_dict:
                reason_money = output_util.money_desc(self.__industry_cache_dict[reason][3])
            # 匹配下单反馈
            order_desc = ''
            order_reason = order_reasons_dict.get(code)
            kpl_can_buy_reason = kpl_can_buy_reasons_dict.get(code)
            if order_reason and kpl_can_buy_reason:
                if int(order_reason[0].replace(":", "").replace(".", "")) > int(
                        kpl_can_buy_reason[0].replace(":", "").replace(".", "")):
                    order_desc = f"不:{order_reason[1]}"
                else:
                    order_desc = f"买:{kpl_can_buy_reason[1]}"
            elif order_reason:
                order_desc = f"不:{order_reason[1]}"
            elif kpl_can_buy_reason:
                order_desc = f"买:{kpl_can_buy_reason[1]}"
            fresult.append((code, d[4], limit_up_state, f"龙{rank_dict.get(code)}", d[12], score,
                            output_util.time_format(int(d[5])), d[2], d[10], output_util.money_desc(d[13]),
                            reason_changes_dict.get(code)))
                            reason_changes_dict.get(code), reason_money, order_desc))
        response_data = json.dumps({"code": 0, "data": {"limit_up_count": len(limit_up_codes),
                                                        "open_limit_up_count": len(open_limit_up_codes),
                                                        "limit_up_reason_statistic": limit_up_reason_statistic_info,
@@ -235,12 +268,20 @@
        response_data = ""
        if url.path == "/get_kpl_data":
            best_feng_kou = self.__kplDataManager.get_data(kpl_util.KPLDataType.BEST_FENG_KOU)
            if not best_feng_kou:
                best_feng_kou = []
            best_feng_kou = best_feng_kou[:22]
            feng_kou = self.__kplDataManager.get_data(kpl_util.KPLDataType.FENG_KOU)
            if not feng_kou:
                feng_kou = []
            feng_kou = feng_kou[:22]
            industry_rank = self.__kplDataManager.get_data(kpl_util.KPLDataType.INDUSTRY_RANK)
            if not industry_rank:
                industry_rank = []
            industry_rank = industry_rank[:22]
            feng_xiang = self.__kplDataManager.get_data(kpl_util.KPLDataType.FENG_XIANG)
            if not feng_xiang:
                feng_xiang = []
            feng_xiang = feng_xiang[:22]
            response_data = json.dumps({"code": 0, "data": {"best_feng_kou": best_feng_kou, "feng_kou": feng_kou,
                                                            "industry_rank": industry_rank, "feng_xiang": feng_xiang}})
@@ -249,12 +290,14 @@
            code = ps_dict['code']
            name = ps_dict.get('name')
            data = code_info_output.get_output_params(code)
            data = code_info_output.get_output_params(code,self.__jingxuan_cache_dict,self.__industry_cache_dict)
            if data["code_name"].find("None") > -1 and name:
                data["code_name"] = f"{name} {code}"
            self.__history_plates_dict[code] = (time.time(), data["kpl_code_info"]["code_records"])
            self.__blocks_dict[code] = (time.time(), data["kpl_code_info"]["plate"])
            if "plate" in data["kpl_code_info"]:
                self.__blocks_dict[code] = (time.time(), data["kpl_code_info"]["plate"])
            response_data = json.dumps({"code": 0, "data": data})
            # 获取评分信息
@@ -287,14 +330,47 @@
                # 精选,主力净额顺序
                result = kpl_api.getMarketJingXuanRealRankingInfo(False)
                result = kpl_util.parseMarketJingXuan(result)
            response_data = json.dumps({"code": 0, "data": result})
            forbidden_plates = self.__KPLPlateForbiddenManager.list_all()
            fresult = []
            for d in result:
                if type_ == 2 or type_ == 3:
                    self.__jingxuan_cache_dict[d[1]] = d
                elif type_ == 0 or type_ == 1:
                    self.__industry_cache_dict[d[1]] = d
                d = list(d)
                d.append(1 if d[1] in forbidden_plates else 0)
                fresult.append(d)
            response_data = json.dumps({"code": 0, "data": fresult})
        elif url.path == "/kpl/add_ignore_code":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict['code']
            type_ = ps_dict['type']
            self.__IgnoreCodeManager.ignore_code(type_, code)
            response_data = json.dumps({"code": 0})
        elif url.path == "/kpl/forbidden_plate":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            plate = ps_dict["plate"]
            # 加入禁止
            self.__KPLPlateForbiddenManager.save_plate(plate)
            response_data = json.dumps({"code": 0})
        elif url.path == "/kpl/get_plate_codes":
            # 获取涨停原因下面的代码
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            plate = ps_dict["plate"]
            # 获取板块下的代码
            # 统计目前为止的代码涨停数量(分涨停原因)
            now_limit_up_codes_info = self.__kplDataManager.get_data(KPLDataType.LIMIT_UP)
            codes_info = []
            for d in now_limit_up_codes_info:
                if d[5] != plate:
                    continue
                codes_info.append([d[0],d[1],0])
            # 查询是否为想买单
            want_codes = gpcode_manager.WantBuyCodesManager.list_code()
            for code_info in codes_info:
                code_info[2] = 1 if code_info[0] in want_codes else 0
            response_data = json.dumps({"code": 0,"data":codes_info})
        self.send_response(200)
        # 发给请求客户端的响应数据
@@ -395,10 +471,13 @@
        params = json.loads(_str)
        return params
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTPServer combined with socketserver.ThreadingMixIn.

    The mixin makes the server process each request in its own thread, so one
    slow request does not block the others.
    """
    # NOTE(review): `http.server` is only reachable as an attribute once that
    # submodule has been imported somewhere — confirm this file (or one it
    # imports) does `import http.server`, not just `import http`.
    pass
def run(addr, port):
    """Start the data HTTP server on ``(addr, port)`` and serve requests forever.

    :param addr: address to bind to.
    :param port: TCP port to listen on.
    """
    handler = DataServer
    # Create only the threaded server. Constructing a plain TCPServer on the
    # same address first would bind the address a second time.
    httpd = ThreadedHTTPServer((addr, port), handler)
    print("HTTP server is at: http://%s:%d/" % (addr, port))
    httpd.serve_forever()
third_data/kpl_data_manager.py
@@ -6,13 +6,15 @@
# 开盘啦历史涨停数据管理
from db import mysql_data, redis_manager
from l2 import code_price_manager
from log import logger_kpl_limit_up_reason_change
from third_data import kpl_util
from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager
from third_data import kpl_util, kpl_api
from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager, CodesHisReasonAndBlocksManager
# 代码对应的涨停原因保存
from third_data.kpl_util import KPLPlatManager
class KPLCodeLimitUpReasonManager:
    __redisManager = redis_manager.RedisManager(3)
@@ -31,30 +33,6 @@
        return dict_
class KPLPlatManager:
    def save_plat(self, _id, name):
        if not _id:
            return
        mysqldb = mysql_data.Mysqldb()
        key = f"{_id}-{name}"
        results = mysqldb.select_one(f"select * from kpl_plate where _name='{name}'")
        if not results:
            mysqldb.execute(f"insert into kpl_plate(_id,_name,_key) values({_id},'{name}','{key}')")
    def get_plat(self, name):
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_one(f"select * from kpl_plate where _name='{name}'")
        if results:
            return results[0]
        return None
    def get_same_plat_names(self, name):
        mysqldb = mysql_data.Mysqldb()
        plate = self.get_plat(name)
        if not plate:
            return {name}
        results = mysqldb.select_all(f"select _name from kpl_plate where _id='{plate}'")
        return set([r[0] for r in results])
class KPLLimitUpDataRecordManager:
@@ -62,6 +40,22 @@
    latest_datas = {}
    __kplPlatManager = KPLPlatManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
    @classmethod
    def __load_hist_and_blocks(cls,code):
        # 有数据新增,加载历史原因与板块
        his_reasons = cls.get_latest_infos(code, 10, False)
        his_reasons = set([r[0] for r in his_reasons])
        cls.__CodesPlateKeysManager.set_history_limit_up_reason(code, his_reasons)
        try:
            if not cls.__CodesPlateKeysManager.get_blocks(code):
                results = kpl_api.getStockIDPlate(code)
                bs = [r[1] for r in results]
                cls.__CodesPlateKeysManager.set_blocks(code, bs)
        except Exception as e:
            pass
    @classmethod
    def save_record(cls, day, records):
@@ -96,7 +90,7 @@
            if not result:
                mysqldb.execute(
                    f"insert into kpl_limit_up_record(_id,_day,_hot_block_name,_code,_code_name,_limit_up_time,_blocks,_latest_limit_up_time,_update_time,_create_time,_hot_block_code_count,_limit_up_high_info,_zylt_val) values('{_id}','{day}','{d[5]}','{d[0]}','{d[1]}','{d[2]}','{d[6]}','{d[3]}',now(),now(),{d[10]},'{d[4]}',{d[7]})")
                cls.__load_hist_and_blocks(code)
            else:
                if _id in cls.latest_datas and json.dumps(cls.latest_datas.get(_id)) != json.dumps(d):
                    mysqldb.execute(
@@ -126,6 +120,8 @@
    def load_total_datas(cls):
        cls.total_datas = KPLLimitUpDataRecordManager.list_all(tool.get_now_date_str())
        cls.__LimitUpCodesPlateKeyManager.set_today_total_limit_up([(r[3], r[2]) for r in cls.total_datas])
        for d in cls.total_datas:
            cls.__load_hist_and_blocks(d[3])
    @staticmethod
    def list_all(day):
third_data/kpl_util.py
@@ -1,6 +1,8 @@
import enum
import json
from db import mysql_data
def parse_kpl_datas(results):
    start_y = -1
@@ -233,3 +235,34 @@
        # (代码,名称,强度,主力净额)
        fresult_.append((d[0], d[1], d[2], d[6]))
    return fresult_
class KPLPlatManager:
    """Persistence helper for plate id / plate name pairs kept in the ``kpl_plate`` table.

    NOTE(review): every query is assembled with f-strings; a name containing a
    quote character will break the statement. Switch to parameterized queries
    if ``Mysqldb`` supports them.
    """

    def save_plat(self, _id, name):
        """Insert a row for ``name`` unless one with that name exists.

        Nothing happens when ``_id`` is falsy.
        """
        if _id:
            db = mysql_data.Mysqldb()
            plate_key = f"{_id}-{name}"
            existing = db.select_one(f"select * from kpl_plate where _name='{name}'")
            if not existing:
                db.execute(f"insert into kpl_plate(_id,_name,_key) values({_id},'{name}','{plate_key}')")

    def get_plat(self, name):
        """Return the first column of the row named ``name``, or ``None`` if no row matches."""
        db = mysql_data.Mysqldb()
        row = db.select_one(f"select * from kpl_plate where _name='{name}'")
        return row[0] if row else None

    def get_same_plat_names(self, name):
        """Return all names stored under the id found for ``name``; ``{name}`` when none is found."""
        db = mysql_data.Mysqldb()
        plate_id = self.get_plat(name)
        if plate_id:
            rows = db.select_all(f"select _name from kpl_plate where _id='{plate_id}'")
            return {row[0] for row in rows}
        return {name}

    def get_same_plat_names_by_id(self, id_):
        """Return the set of names stored under plate id ``id_``."""
        db = mysql_data.Mysqldb()
        rows = db.select_all(f"select _name from kpl_plate where _id='{id_}'")
        return {row[0] for row in rows}
trade/current_price_process_manager.py
@@ -20,14 +20,17 @@
    # 获取首板代码
    first_codes = gpcode_manager.get_first_gp_codes()
    print("价格代码数量:", len(prices))
    print("总价格代码数量:", len(prices))
    __actualPriceProcessor.save_current_price_codes_count(len(prices))
    # 采集的代码数量不对
    if len(gpcode_manager.get_gp_list()) - len(prices) > 10:
        print("采集到的代码数量不正确:", len(prices))
        return
    now_str = tool.get_now_time_str()
    now_strs = now_str.split(":")
    # 获取想买单
    want_codes = gpcode_manager.WantBuyCodesManager.list_code()
    if True:
        _code_list = []
        _delete_list = []
@@ -42,10 +45,10 @@
                    rate = rate / 2
                if rate >= 0 and not trade_manager.ForbiddenBuyCodeByScoreManager.is_in(code):
                    # 暂存涨幅为正的代码
                    _code_list.append((rate, code))
                    _code_list.append((rate, code, 1 if code in want_codes else 0))
                else:
                    # 暂存涨幅为负的代码
                    _delete_list.append((rate, code))
                    _delete_list.append((rate, code, 0))
                try:
                    __actualPriceProcessor.process_rate(code, rate, now_str)
                except Exception as e:
@@ -61,7 +64,7 @@
        # -------------------------------处理交易位置分配---------------------------------
        # 排序
        new_code_list = sorted(_code_list, key=lambda e: (e.__getitem__(0), e.__getitem__(1)), reverse=True)
        new_code_list = sorted(_code_list, key=lambda e: (e.__getitem__(2), e.__getitem__(0)), reverse=True)
        # 预填充下单代码
        _buy_win_codes = []
        for d in new_code_list:
@@ -69,7 +72,8 @@
        for d in _delete_list:
            _buy_win_codes.append(d[1])
        try:
            trade_gui.THSBuyWinManagerNew.fill_codes(_buy_win_codes)
            if not constant.JUEJIN_TRADE_ENABLE:
                trade_gui.THSBuyWinManagerNew.fill_codes(_buy_win_codes)
        except Exception as e:
            logging.exception(e)
            pass
trade/first_code_score_manager.py
@@ -152,7 +152,7 @@
        if hot_block["limit_up_index"] <= 0:
            hot_block_score.append(0)
        else:
            hot_block_score.append(max(90 - hot_block["limit_up_index"] * 10, 0))
            hot_block_score.append(max(120 - hot_block["limit_up_index"] * 10, 0))
        # 板块 - 高位板
        high_score = 0
        for high_info in hot_block["high_block_infos"]:
@@ -186,22 +186,22 @@
    if limit_up_time:
        limit_up_time_m = tool.trade_time_sub(limit_up_time, "09:00:00") // 60
    if limit_up_time_m < 240:
    if limit_up_time_m < 240 or True:
        # 14:30之前适用
        score_list.append(min(int(0 - round(limit_up_time_m / 15) + 12), 10))
    elif limit_up_time_m <= 270:
        # 15:00之前加
        score_list.append(100)
    else:
        score_list.append(0)
    # elif limit_up_time_m <= 270:
    #     # 15:00之前加
    #     score_list.append(100)
    # else:
    #     score_list.append(0)
    # 大单成交
    if deal_big_money_rate < 1:
        score_list.append(0)
    else:
        d_score = int(round(10 * deal_big_money_rate, 0))
        d_score = min(d_score, 60)
        score_list.append(d_score)
    # if deal_big_money_rate < 1:
    #     score_list.append(0)
    # else:
    d_score = int(round(5 * deal_big_money_rate + 10, 0))
    d_score = min(d_score, 40)
    score_list.append(d_score)
    score = 0
    for s in score_list:
trade/huaxin_trade.py
New file
@@ -0,0 +1,167 @@
"""
华鑫交易
"""
import hashlib
import json
import random
# 交易API
import socket
import time
# 交易订单号管理
import constant
import tool
from db.redis_manager import RedisManager
from log import logger_huaxin_trade
class TradeOrderIdManager:
    """Keeps the outstanding Huaxin order ids of each stock code in a Redis set.

    Each set member is the JSON encoding of ``(account_id, order_id)``.
    """
    __redisManager = RedisManager(2)

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    @classmethod
    def add_order_id(cls, code, account_id, order_id):
        """Record an order id for ``code`` and refresh the key's expiry."""
        redis_key = f"huaxin_order_id-{code}"
        member = json.dumps((account_id, order_id))
        cls.__get_redis().sadd(redis_key, member)
        cls.__get_redis().expire(redis_key, tool.get_expire())

    @classmethod
    def remove_order_id(cls, code, account_id, order_id):
        """Forget a previously recorded order id of ``code``."""
        member = json.dumps((account_id, order_id))
        cls.__get_redis().srem(f"huaxin_order_id-{code}", member)

    @classmethod
    def list_order_ids(cls, code):
        """Return every recorded member (JSON strings) for ``code``."""
        redis_key = f"huaxin_order_id-{code}"
        return cls.__get_redis().smembers(redis_key)
class TradeApi:
    """Client for the local Huaxin trade service reached over a TCP socket.

    Each request is a JSON object with a ``type`` (0 buy, 1 cancel, 2 list
    delegated orders, 3 list trades), an auto-generated ``req_id`` and an MD5
    ``sign``. Replies are JSON objects with a ``code`` field (0 on success),
    an optional ``msg`` and a ``data`` payload.
    """

    @classmethod
    def __send_msg(cls, data):
        """Sign ``data``, send it to the trade service and return the raw reply text.

        ``data`` is modified in place: ``req_id`` is added when missing and
        ``sign`` is always set.

        :param data: request dict
        :return: reply decoded as GBK text
        :raises OSError: when connecting, sending or receiving fails
        """
        if "req_id" not in data:
            data["req_id"] = f"{round(time.time() * 1000)}_{random.randint(0, 10000)}"
        # Signature: sorted "key=value" pairs joined by "&", plus a fixed suffix, MD5-hashed
        pairs = sorted(f"{k}={v}" for k, v in data.items())
        to_sign = "&".join(pairs) + "JiaBei@!*."
        data["sign"] = hashlib.md5(to_sign.encode(encoding='utf-8')).hexdigest()
        # Server address and port (a port above 10000 would be preferable)
        ip_port = ("127.0.0.1", 9002)
        # The context manager closes the socket even when connect/send/recv raises.
        with socket.socket() as client:
            client.connect(ip_port)
            # sendall: plain send() may transmit only part of the payload
            client.sendall(json.dumps(data).encode("utf-8"))
            # NOTE(review): a single recv is kept because the reply framing is not
            # visible here; a reply split across packets would be truncated.
            rec = client.recv(1024000)
        return rec.decode("gbk")

    @classmethod
    def buy(cls, code, count, price):
        """Place a buy order.

        :return: ``(securityId, accountId, orderSysID)`` taken from the reply
        :raises Exception: when the reply ``code`` is non-zero, or when
            ``orderStatus`` is 7 (rejected by the exchange)
        """
        data = {"type": 0,
                "data": json.dumps({"code": code, "count": count, "price": price})}
        result = cls.__send_msg(data)
        result = json.loads(result)
        print("华鑫下单结果", result)
        if result["code"] != 0:
            raise Exception(result.get("msg"))
        code_data = result["data"]
        securityId = code_data["securityId"]
        orderStatus = int(code_data["orderStatus"])
        accountId = code_data["accountId"]
        # Fix: the order *status* used to be returned as the order id, so the id
        # stored for later cancellation was never a real order id.
        # NOTE(review): the exact key casing in the buy reply is not visible here,
        # so both spellings are accepted; the legacy value is kept only when
        # neither key is present.
        orderSysID = code_data["orderStatus"]
        for id_key in ("orderSysID", "orderSysId"):
            if id_key in code_data:
                orderSysID = code_data[id_key]
                break
        if orderStatus == 7:
            raise Exception("交易所已拒绝")
        return securityId, accountId, orderSysID

    @classmethod
    def cancel_buy(cls, code, order_sys_id):
        """Cancel a buy order.

        :raises Exception: when the reply ``code`` is non-zero, or when the
            reply's ``cancel`` flag is falsy (message taken from ``errorMsg``)
        """
        data = {"type": 1,
                "data": json.dumps({"code": code, "order_sys_id": order_sys_id})}
        result = cls.__send_msg(data)
        result = json.loads(result)
        print("华鑫撤单结果", result)
        if result["code"] != 0:
            raise Exception(result.get("msg"))
        result = result["data"]
        if not result["cancel"]:
            raise Exception(result.get("errorMsg"))

    # Content format (presumably the shape of each returned entry - TODO confirm):
    # {"securityID": pOrderField.SecurityID, "orderLocalID": pOrderField.OrderLocalID,
    #  "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID,
    #  "insertTime": pOrderField.InsertTime, "insertDate": pOrderField.InsertDate,
    #  "acceptTime": pOrderField.AcceptTime, "cancelTime": pOrderField.CancelTime,
    #  "turnover": pOrderField.Turnover,
    #  "volume": pOrderField.VolumeTotalOriginal,
    #  "volumeTraded": pOrderField.VolumeTraded, "orderStatus": pOrderField.OrderStatus,
    #  "orderSubmitStatus": pOrderField.OrderSubmitStatus, "statusMsg": pOrderField.StatusMsg})
    @classmethod
    def list_delegate(cls):
        """Return the ``data`` payload of the delegated-order query (type 2).

        :raises Exception: when the reply ``code`` is non-zero
        """
        data = {"type": 2}
        result_str = cls.__send_msg(data)
        result_json = json.loads(result_str)
        if result_json["code"] != 0:
            raise Exception(result_json.get("msg"))
        return result_json["data"]

    # Content format (presumably the shape of each returned entry - TODO confirm):
    # {"tradeID": pTradeField.TradeID, "securityID": pTradeField.SecurityID,
    #  "orderLocalID": pTradeField.OrderLocalID,
    #  "direction": pTradeField.Direction, "orderSysID": pTradeField.OrderSysID, "price": pTradeField.Price,
    #  "tradeTime": pTradeField.TradeTime,
    #  "volume": pTradeField.Volume, "tradeDate": pTradeField.TradeDate, "tradingDay": pTradeField.TradingDay,
    #  "PbuID": pTradeField.PbuID, "accountID": pTradeField.AccountID}
    @classmethod
    def list_traded(cls):
        """Return the ``data`` payload of the trade query (type 3).

        :raises Exception: when the reply ``code`` is non-zero
        """
        data = {"type": 3}
        result = cls.__send_msg(data)
        result_json = json.loads(result)
        if result_json["code"] != 0:
            raise Exception(result_json.get("msg"))
        return result_json["data"]
# Place a buy order by volume; returns (code, account id, order id)
def order_volume(code, price, count):
    """Buy ``count`` shares of ``code`` at ``price`` through the Huaxin API.

    Returns ``None`` without ordering when ``constant.TRADE_ENABLE`` is false.
    Only codes starting with "00" or "60" are accepted. On success the order id
    is recorded in ``TradeOrderIdManager``. Failures are logged and re-raised.
    """
    if not constant.TRADE_ENABLE:
        return None
    if not code.startswith(("00", "60")):
        raise Exception("只支持00开头与60开头的代码下单")
    began_at = time.time()
    try:
        placed = TradeApi.buy(code, count, price)
        securityId, accountId, orderSysID = placed
        print("华鑫下单耗时", time.time() - began_at)
        logger_huaxin_trade.info(f"{code}:下单耗时{round(time.time() - began_at, 3)}s")
        TradeOrderIdManager.add_order_id(code, accountId, orderSysID)
        logger_huaxin_trade.info(f"{code}:下单成功 security_id:{securityId} ord_sys_id:{orderSysID}, accountId:{accountId}")
        return securityId, accountId, orderSysID
    except Exception as e:
        logger_huaxin_trade.info(f"{code}:下单失败:{str(e)}")
        raise
# Cancel orders
def cancel_order(code):
    """Cancel every recorded Huaxin order of ``code`` and forget its id.

    A cancel failure whose message contains "状态异常" is treated as "already
    cancelled" and ignored; any other failure is re-raised.
    """
    pending = TradeOrderIdManager.list_order_ids(code)
    if not pending:
        return
    logger_huaxin_trade.info(f"{code}:开始执行撤单")
    for raw_order in pending:
        order_info = json.loads(raw_order)
        try:
            TradeApi.cancel_buy(code, order_info[1])
        except Exception as e:
            # "状态异常" means the order was already cancelled, so it is not an invalid cancel
            if "状态异常" not in str(e):
                raise e
        logger_huaxin_trade.info(f"{code}:撤单成功:{order_info}")
        TradeOrderIdManager.remove_order_id(code, order_info[0], order_info[1])
trade/l2_trade_factor.py
@@ -90,9 +90,15 @@
                score_index = i
                break
        self.score_index = score_index
        # 只要加入想买单的,全部执行主动买入一星方案
        if gpcode_manager.WantBuyCodesManager.is_in(code) and self.score_index >= 3:
            self.score_index = 2
        # 只要加入想买单的,全部执行主动买入二星方案
        if gpcode_manager.WantBuyCodesManager.is_in(code):
            self.score_index = 1
            self.is_want_buy = True
        else:
            self.is_want_buy = False
            # 没有加入想买单的,2星/3星将为1星
            if self.score_index == 0 or self.score_index == 1:
                self.score_index = 2
        self.volume_rate = volume_rate
        self.volume_rate_index = volume_rate_index
@@ -124,7 +130,8 @@
    # 获取时间计算范围,返回s
    def get_time_range(self):
        ts = [pow(3, 1), pow(3, 1), pow(3, 1), pow(3, 2), pow(3, 2), pow(3, 3), pow(3, 3), pow(3, 3)]
        # ts = [pow(3, 1), pow(3, 1), pow(3, 1), pow(3, 2), pow(3, 2), pow(3, 3), pow(3, 3), pow(3, 3)]
        ts = [pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1)]
        if -1 < self.score_index < 3:
            return ts[0]
        volume_rate_index = self.volume_rate_index
@@ -174,16 +181,20 @@
    # 获取m值
    def get_m_val(self):
        if self.is_first_code:
            if self.buy_rank == 0:
                return 0, ""
            elif self.buy_rank < 4:
                return 1000 * 10000, ""
        # 获取固定m值
        zyltgb = global_util.zyltgb_map.get(self.code)
        if zyltgb is None:
            global_data_loader.load_zyltgb()
            zyltgb = global_util.zyltgb_map.get(self.code)
        if self.is_first_code:
            if self.buy_rank == 0:
                return 0, ""
            elif self.is_want_buy and zyltgb and zyltgb < 20 * 10000 * 10000:
                # 小于20亿的想买单
                return 500 * 10000, ""
            elif self.buy_rank < 4:
                return 1000 * 10000, ""
        base_m = L2TradeFactorUtil.get_base_safe_val(zyltgb)
        rate = self.get_m_val_rate(self.volume_rate_index)
@@ -213,7 +224,7 @@
    # 获取撤销比例
    @staticmethod
    def get_cancel_rate(volume_rate_index):
        rates = [0.39, 0.49, 0.59, 0.69, 0.69, 0.79, 0.79, 0.79]
        rates = [0.34, 0.44, 0.54, 0.64, 0.74, 0.84, 0.94, 1.04]
        if volume_rate_index >= len(rates):
            volume_rate_index = -1
        return rates[volume_rate_index]
trade/l2_trade_util.py
@@ -92,4 +92,4 @@
if __name__ == "__main__":
    # add_to_forbidden_trade_codes("000977")
    WhiteListCodeManager.add_code("002019")
    WhiteListCodeManager.add_code("002240")
trade/trade_juejin.py
New file
@@ -0,0 +1,191 @@
"""
同花顺交易操作工具
"""
import json
import logging
import time
import gm.api as gmapi
import constant
import gpcode_manager
import tool
from db.redis_manager import RedisManager
from log import logger_juejin_trade
from utils import network_util
# Holds the context object passed to init(), stored under the "init" key
__context_dict = {}
# NOTE(review): the account id and API token below are hard-coded credentials committed to
# source control - consider loading them from configuration/environment and rotating the token.
account_id = "8099a935-a991-4871-977f-206c6d3e04ca"
token = "a2eed2b159e9238dc0353fc3e73734d7677f7baf"
# Runs at import time: hands the token to the gm SDK
gmapi.set_token(token)
# gmapi.set_account_id(account_id)
# Trade order id management
class TradeOrderIdManager:
    """Keeps the outstanding Juejin order ids of each stock code in a Redis set.

    Each set member is the JSON encoding of ``(account_id, order_id)``.
    """
    __redisManager = RedisManager(2)

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    @classmethod
    def add_order_id(cls, code, account_id, order_id):
        """Record an order id for ``code`` and refresh the key's expiry."""
        redis_key = f"juejin_order_id-{code}"
        member = json.dumps((account_id, order_id))
        cls.__get_redis().sadd(redis_key, member)
        cls.__get_redis().expire(redis_key, tool.get_expire())

    @classmethod
    def remove_order_id(cls, code, account_id, order_id):
        """Forget a previously recorded order id of ``code``."""
        member = json.dumps((account_id, order_id))
        cls.__get_redis().srem(f"juejin_order_id-{code}", member)

    @classmethod
    def list_order_ids(cls, code):
        """Return every recorded member (JSON strings) for ``code``."""
        redis_key = f"juejin_order_id-{code}"
        return cls.__get_redis().smembers(redis_key)
def init(context):
    """Remember ``context`` under the "init" key for later account queries."""
    __context_dict.update(init=context)
    print("掘金交易初始化成功")
# Available cash
def get_account_left_money():
    """Return the ``available`` cash of the stored context's account, or ``None`` before ``init``."""
    try:
        context = __context_dict["init"]
    except KeyError:
        return None
    cash = context.account().cash
    return cash["available"]
# Place a buy order by volume; returns (code, account id, order id)
def order_volume(code, price, count):
    """Buy ``count`` shares of ``code`` at limit ``price`` through the gm SDK.

    Returns ``None`` without ordering when ``constant.TRADE_ENABLE`` is false.
    Codes starting with "00" are sent as ``SZSE.<code>``, codes starting with
    "60" as ``SHSE.<code>``; anything else raises. A rejected or empty result
    raises; otherwise the client order id is recorded and returned.
    """
    if not constant.TRADE_ENABLE:
        return None
    if code.startswith("00"):
        symbol = f"SZSE.{code}"
    elif code.startswith("60"):
        symbol = f"SHSE.{code}"
    else:
        raise Exception("只支持00开头与60开头的代码下单")
    began_at = time.time()
    results = gmapi.order_volume(symbol, count, gmapi.OrderSide_Buy, gmapi.OrderType_Limit,
                                 gmapi.PositionEffect_Open, price=price,
                                 order_duration=gmapi.OrderDuration_GFD, account=account_id)
    print("掘金下单耗时", time.time() - began_at)
    logger_juejin_trade.info(f"{code}:下单耗时{round(time.time() - began_at, 3)}s")
    if not results:
        raise Exception("下单失败,无返回")
    print("下单结果", results)
    result = results[0]
    if result["ord_rej_reason_detail"]:
        logger_juejin_trade.info(f"{code}:下单失败:{result['ord_rej_reason_detail']}")
        raise Exception(result["ord_rej_reason_detail"])
    TradeOrderIdManager.add_order_id(code, result["account_id"], result["cl_ord_id"])
    logger_juejin_trade.info(f"{code}:下单成功 ord_id:{result['cl_ord_id']}")
    return result["symbol"].split(".")[1], result["account_id"], result["cl_ord_id"]
# Cancel orders
def cancel_order(code):
    """Cancel every recorded Juejin order of ``code`` and forget the ids.

    The cancel request is issued three times for the whole batch before the
    recorded ids are removed.
    """
    recorded = TradeOrderIdManager.list_order_ids(code)
    orders = [
        {'cl_ord_id': info[1], 'account_id': info[0]}
        for info in map(json.loads, recorded or ())
    ]
    if not orders:
        return
    logger_juejin_trade.info(f"{code}:开始执行撤单")
    # Send the cancel request 3 times
    for _ in range(3):
        gmapi.order_cancel(orders)
    logger_juejin_trade.info(f"{code}:撤单成功,撤单数量:{len(orders)}")
    for entry in orders:
        TradeOrderIdManager.remove_order_id(code, entry["account_id"], entry["cl_ord_id"])
# Cancel a single order
def __cancel_order(account_id, cl_ord_id):
    """Send one cancel request for the order identified by ``account_id`` and ``cl_ord_id``."""
    gmapi.order_cancel([{'cl_ord_id': cl_ord_id, 'account_id': account_id}])
def test():
    """Print the instrument data the gm SDK returns for code 002531."""
    prefixed_codes = gpcode_manager.get_gp_list_with_prefix(["002531"])
    print(gmapi.get_instruments(symbols=",".join(prefixed_codes)))
def run():
    """Poll execution reports forever and push them to the local server.

    While ``tool.is_trade_time()`` is true, each poll converts the merged
    execution reports to dicts and, when there are any, sends them to
    127.0.0.1:9001 as a type-3 message. Errors are logged and the loop goes on.
    """
    print("启动读取掘金交易数据")
    while True:
        try:
            if tool.is_trade_time():
                # Build the upload payload
                fdatas = [
                    {"code": d[0], "money": d[4], "num": d[2], "price": d[3], "time": d[7], "trade_num": d[5],
                     "type": d[1] - 1}
                    for d in get_execution_reports()
                ]
                if fdatas:
                    network_util.send_socket_msg("127.0.0.1", 9001, {"type": 3, "data": fdatas})
        except Exception as e:
            logging.exception(e)
        # Refresh every 1.5 s
        time.sleep(1.5)
# Fetch the execution list; each entry is
# [code, buy(1)/sell(2), volume, price, amount, order id, client order id, time, date]
def get_execution_reports():
    """Return the non-rejected execution reports, merged per order id.

    Reports with a non-empty ``ord_rej_reason_detail`` are skipped. Reports
    sharing an ``order_id`` are folded into the first one seen by summing
    volume and (rounded) amount.
    """
    gmapi.set_account_id(account_id)
    merged = {}
    for report in gmapi.get_execution_reports():
        if report['ord_rej_reason_detail']:
            continue
        created_at = report["created_at"]
        row = [
            report["symbol"].split(".")[1],
            report["side"],
            report["volume"],
            round(report["price"], 2),
            round(report["amount"], 2),
            report["order_id"],
            report["cl_ord_id"],
            created_at.strftime("%H:%M:%S"),
            created_at.strftime("%Y-%m-%d"),
        ]
        # Merge rows that belong to the same order id
        first_row = merged.get(row[5])
        if first_row is None:
            merged[row[5]] = row
        else:
            first_row[2] += row[2]
            first_row[4] += row[4]
    results = list(merged.values())
    print("获取已成交数量:", len(results))
    return results
if __name__ == "__main__":
    # Manual check: fetch the execution reports once and upload them
    datas = get_execution_reports()
    fdatas = [
        {"code": d[0], "money": d[4], "num": d[2], "price": d[3], "time": d[7], "trade_num": d[5],
         "type": d[1] - 1}
        for d in datas
    ]
    print(fdatas)
    network_util.send_socket_msg("127.0.0.1", 9001, {"type": 3, "data": fdatas})
trade/trade_manager.py
@@ -7,8 +7,9 @@
import dask
import constant
from db import mysql_data, redis_manager
from trade import trade_data_manager, l2_trade_util
from trade import trade_data_manager, l2_trade_util, trade_juejin
import trade.trade_gui
import time as t
from l2 import l2_data_manager, l2_data_log
@@ -147,8 +148,7 @@
# 保存交易成功的数据
def save_trade_success_data(datas):
    day = datetime.datetime.now().strftime("%Y%m%d")
def save_trade_success_data(datas, day=datetime.datetime.now().strftime("%Y%m%d")):
    redis = __redis_manager.getRedis()
    time_str = tool.get_now_time_str()
    redis.setex("trade-success-latest-time", tool.get_expire(), time_str)
@@ -312,10 +312,15 @@
# 购买
@tool.async_call
# @tool.async_call
def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index):
    try:
        guiTrade.buy(code, price)
        if constant.JUEJIN_TRADE_ENABLE:
            # 每笔5000元
            count = (5000 // int(round(float(price) * 100))) * 100
            trade_juejin.order_volume(code, price, count)
        else:
            guiTrade.buy(code, price)
        __place_order_success(code, capture_timestamp, last_data, last_data_index)
    except Exception as e:
        __place_order_fail(code, trade_state)
@@ -357,7 +362,10 @@
        logger_trade.info("{}开始撤单".format(code))
        set_trade_state(code, TRADE_STATE_BUY_CANCEL_ING)
        logger_trade.info("{}撤单方法开始".format(code))
        guiTrade.cancel_buy(code)
        if constant.JUEJIN_TRADE_ENABLE:
            trade_juejin.cancel_order(code)
        else:
            guiTrade.cancel_buy(code)
        logger_trade.info("{}撤单方法结束".format(code))
        __cancel_success(code)
        try:
@@ -487,4 +495,6 @@
if __name__ == "__main__":
    set_trade_state("002351", TRADE_STATE_BUY_DELEGATED)
    price = 5
    r = (5000 // int(round(float(price) * 100))) * 100
    print(r)
utils/network_util.py
New file
@@ -0,0 +1,15 @@
import json
import socket
def send_socket_msg(ip, port, data, encoding="gbk"):
    """Send ``data`` as JSON over a one-shot TCP connection and return the reply text.

    :param ip: server address
    :param port: server port
    :param data: JSON-serializable payload; it is sent UTF-8 encoded
    :param encoding: encoding used to decode the reply (the request is not affected)
    :return: up to 1024 bytes of the reply, decoded with ``encoding``
    :raises OSError: when connecting, sending or receiving fails
    """
    # The context manager also closes the socket when connect() itself fails,
    # which the former try/finally (entered only after connect) did not cover.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((ip, port))
        # sendall: plain send() may transmit only part of a large payload
        client.sendall(json.dumps(data).encode())
        reply = client.recv(1024)
    return str(reply, encoding=encoding)