Administrator
2023-03-15 68464c679ae5e1ae35e7e67e3b339ba0f939cbd3
选股宝板块优化
19个文件已修改
2个文件已添加
960 ■■■■■ 已修改文件
constant.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
gpcode_first_screen_manager.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gpcode_manager.py 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 235 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 25 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/transaction_progress.py 27 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ocr/ocr_util.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 303 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/hot_block.py 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/hot_block.spec 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/hot_block_data_process.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/res/chromedriver.exe 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_util.py 83 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_result_manager.py 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -1,7 +1,7 @@
# 是否为测试
TEST = False
# 是否允许交易
TRADE_ENABLE = True
TRADE_ENABLE = False
# 水下捞累计连续水下时间最小值
UNDER_WATER_PRICE_TIME_AS_SECONDS = 1200
# 大单金额(单位为百)
@@ -27,4 +27,4 @@
# L2监控的最低金额
L2_MIN_MONEY = 500000
# 每个L2设备的代码数量
L2_CODE_COUNT_PER_DEVICE = 8
L2_CODE_COUNT_PER_DEVICE = 6
data_export_util.py
@@ -196,6 +196,6 @@
if __name__ == "__main__":
    codes = ["600647"]
    codes = ["002119"]
    for code in codes:
        export_l2_excel(code)
gpcode_first_screen_manager.py
@@ -42,6 +42,11 @@
        redis.expire("first_no_screen_codes", tool.get_expire())
def clear_first_no_screen_codes():
    """Delete the Redis key ``first_no_screen_codes``.

    The key is removed with a single ``delete`` call on the connection
    obtained from the module-level Redis manager. Returns ``None``.
    """
    __redisManager.getRedis().delete("first_no_screen_codes")
def __remove_first_no_screen_codes(codes):
    redis = __redisManager.getRedis()
    if codes:
gpcode_manager.py
@@ -80,6 +80,15 @@
        return val.get(name)
    @classmethod
    def add_first_code_name(cls, code,name):
        val = cls.__get_redis().get("gp_list_names_first")
        if not val:
            return None
        val = json.loads(val)
        val[name] = code
        cls.set_first_code_names(val)
    @classmethod
    def get_second_name_code(cls, name):
        val = cls.__get_redis().get("gp_list_names")
        if not val:
@@ -456,6 +465,14 @@
        redis_instance.setex(key, tool.get_expire(), "")
def clear_first_codes():
    """Delete the four Redis keys that hold the "first" code data.

    The keys are removed one at a time, in this order: ``gp_list_first``,
    ``gp_list_names_first``, ``first_code_record`` and
    ``first_code_limited_up_record``. Returns ``None``.
    """
    conn = __redisManager.getRedis()
    for key in (
        "gp_list_first",
        "gp_list_names_first",
        "first_code_record",
        "first_code_limited_up_record",
    ):
        conn.delete(key)
# 获取可以操作的位置
def get_can_listen_pos(client_id=0):
    client_ids = []
@@ -492,6 +509,21 @@
    return None, None
# Count the listen positions that currently hold no code (free slots)
def get_free_listen_pos_count():
    """Return how many listen slots of the valid L2 clients hold no code.

    For every client id returned by ``client_manager.getValidL2Clients()``
    the Redis keys matching ``listen_code-<client_id>-*`` are looked up; a
    slot counts as free when the value stored under its key is empty or
    missing.
    """
    total_free = 0
    for cid in client_manager.getValidL2Clients():
        conn = __redisManager.getRedis()
        for slot_key in conn.keys("listen_code-{}-*".format(cid)):
            if not conn.get(slot_key):
                total_free += 1
    return total_free
# 获取正在监听的代码的位置
def get_listen_code_pos(code):
    redis_instance = __redisManager.getRedis()
@@ -523,7 +555,7 @@
def is_listen_full():
    clients = client_manager.getValidL2Clients()
    codes = get_listen_codes()
    return len(codes) >= 8 * len(clients)
    return len(codes) >= constant.L2_CODE_COUNT_PER_DEVICE * len(clients)
# 是否正在操作
@@ -559,6 +591,6 @@
if __name__ == '__main__':
    print(get_code_name("603042"))
    print(get_free_listen_pos_count())
    print(get_name_code("华脉科技"))
    print(get_name_codes())
gui.py
@@ -7,6 +7,7 @@
import win32gui
import constant
import data_export_util
import multiprocessing
@@ -95,7 +96,7 @@
        clients = authority.get_l2_clients()
        for client_id in clients:
            self.l2_codes[client_id] = []
            for i in range(0, 8):
            for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
                code = gpcode_manager.get_listen_code_by_pos(client_id, i)
                self.l2_codes[client_id].append(code)
@@ -293,7 +294,7 @@
                index += 1
            table.redraw()
        start_y = 160
        start_y = 225
        btn = Button(frame, text="刷新收盘价", command=refresh_close_price_data)
        btn.place(x=5, y=start_y)
@@ -371,7 +372,6 @@
            except:
                pass
            try:
                codes = self.thsBuy1VolumnManager.get_current_codes()
                count = 0
@@ -399,9 +399,6 @@
            except:
                pass
            # 获取有效的L2客户端数量
            l2_client_count = client_manager.getValidL2Clients()
            if len(l2_client_count) < 2:
@@ -421,7 +418,7 @@
                    pass
                time.sleep(2)
        start_y = 225
        start_y = 285
        btn = Button(frame, text="刷新状态", command=refresh_data)
        btn.place(x=10, y=start_y)
@@ -495,7 +492,7 @@
                else:
                    client_state[client_id].configure(text="(离线:未知IP)", foreground="#999999")
                for i in range(0, 8):
                for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
                    code = gpcode_manager.get_listen_code_by_pos(client_id, i)
                    data_count = l2_data_util.get_l2_latest_data_number(code)
                    if data_count is None:
@@ -611,7 +608,7 @@
            check(self.selected_client)
        width = 800
        height = 290
        height = 360
        frame = Frame(root, {"height": height, "width": width, "bg": "#DDDDDD"})
        cl = Label(frame, text="L2采集状态", bg="#DDDDDD")
        cl.place(x=5, y=5)
@@ -658,19 +655,19 @@
            btn.place(x=5, y=35 + l2_client_count * 30)
            client_state_lb = Label(frame, text="(未知)", padx=0, pady=0, background="#DDDDDD", font=('微软雅黑', 8))
            client_state_lb.place(x=80, y=40 + l2_client_count * 30)
            client_state_lb.place(x=82, y=40 + l2_client_count * 30)
            client_state[key] = client_state_lb
            code_sv_map[key] = []
            code_labels[key] = []
            for i in range(0, 8):
            for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
                sv = StringVar(value=self.l2_codes[key][i])
                code_sv_map[key].append(sv)
                cframe = Frame(frame, {"height": 23, "width": 70, "bg": "#FFFFFF"})
                cframe = Frame(frame, {"height": 23, "width": 80, "bg": "#FFFFFF"})
                code_label = Label(cframe, textvariable=sv, background="#FFFFFF", foreground="#FF0000")
                code_labels[key].append(code_label)
                code_label.place(x=0, y=0)
                cframe.place(x=200 + i * 75, y=40 + l2_client_count * 30)
                cframe.place(x=200 + i * 85, y=40 + l2_client_count * 30)
            l2_client_count += 1
        # 添加更新线程
        t1 = threading.Thread(target=lambda: update_data())
@@ -934,9 +931,13 @@
            pass
        # 禁止代码
        def forbidden_code(code_):
            l2_trade_util.forbidden_trade(code_)
            showinfo("提示","禁止成功")
        def cancel_order(code_):
            try:
                l2.l2_data_util.load_l2_data(code_, True)
                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code_, "手动撤销")
                showinfo("提示", "撤单成功")
            except Exception as e:
                showwarning("提示", "撤单成功异常" + str(e))
        frame = Frame(root, {"height": 280, "width": 300, "bg": "#DDDDDD"})
        frame.grid(row=2, column=2, rowspan=2, pady=5)
@@ -982,7 +983,7 @@
        btn = Button(frame, text="清空l2数据", command=lambda: clear_l2(code.get()))
        btn.place(x=150, y=130)
        btn = Button(frame, text="禁止交易", command=lambda: forbidden_code(code.get()))
        btn = Button(frame, text="撤销挂单", command=lambda: cancel_order(code.get()))
        btn.place(x=230, y=130)
        # 交易按钮
@@ -1009,7 +1010,7 @@
        self.__draw_trade_data(root)
        self.__draw_test(root)
        root.geometry("1120x600")
        root.geometry("1120x660")
        root.mainloop()
juejin.py
@@ -20,6 +20,7 @@
import constant
import global_data_loader
import global_util
import gpcode_first_screen_manager
import gpcode_manager
import threading
@@ -74,7 +75,16 @@
    # 9点25之前删除所有代码
    if tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") <= 0:
        # 删除L2监听代码
        gpcode_manager.clear_listen_codes()
        # 删除首板代码
        gpcode_manager.clear_first_codes()
        # 删除首板未筛选代码
        gpcode_first_screen_manager.clear_first_no_screen_codes()
        # 删除禁止代码
        l2_trade_util.init_forbidden_trade_codes()
        # 清空白名单
        l2_trade_util.WhiteListCodeManager.clear()
    # TODO 删除所有首板代码
@@ -148,7 +158,7 @@
    # 初始化内容
    clients = authority.get_l2_clients()
    for client in clients:
        for i in range(0, 8):
        for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
            gpcode_manager.init_listen_code_by_pos(client, i)
@@ -156,7 +166,7 @@
    # 初始化内容
    clients = authority.get_l2_clients()
    for c in clients:
        for i in range(0, 8):
        for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
            gpcode_manager.init_listen_code_by_pos(int(c), i)
    codes = gpcode_manager.get_gp_list()
    result = JueJinManager.get_gp_latest_info(codes)
@@ -268,7 +278,7 @@
    __actualPriceProcessor.save_current_price_codes_count(len(prices))
    # 采集的代码数量不对
    if len(gpcode_manager.get_gp_list()) - len(prices) > 2:
    if len(gpcode_manager.get_gp_list()) - len(prices) > 10:
        return
    now_str = tool.get_now_time_str()
    now_strs = now_str.split(":")
@@ -362,6 +372,19 @@
        for code in add_code_list:
            if not gpcode_manager.is_listen_old(code):
                L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
        # 获取卡位数量
        free_count = gpcode_manager.get_free_listen_pos_count()
        if free_count < 2:
            # 空闲位置不足
            listen_codes = gpcode_manager.get_listen_codes()
            for code in listen_codes:
                if not gpcode_manager.is_in_gp_pool(code):
                    client_id, pos = gpcode_manager.get_listen_code_pos(code)
                    gpcode_manager.set_listen_code_by_pos(client_id, pos, "")
                    free_count += 1
                    if free_count > 2:
                        break
        print(add_code_list, del_code_list)
@@ -586,5 +609,14 @@
if __name__ == '__main__':
    # Ad-hoc manual checks; this only runs when the module is executed as a script.
    print(get_volumn("002115"))
    print(JueJinManager.get_lowest_price_rate("002713", 15))
    # free_count is hard-coded to 0 here, so the branch below always executes.
    free_count = 0
    if free_count < 2:
        # Not enough free listen positions
        listen_codes = gpcode_manager.get_listen_codes()
        for code in listen_codes:
            # Only codes for which is_in_gp_pool() is false are released.
            if not gpcode_manager.is_in_gp_pool(code):
                client_id, pos = gpcode_manager.get_listen_code_pos(code)
                # NOTE(review): this writes an empty code into the slot, so running
                # the module as a script appears to change the stored listen
                # state - confirm before running against a live environment.
                gpcode_manager.set_listen_code_by_pos(client_id, pos, "")
                free_count += 1
                # Stop once more than two slots have been released.
                if free_count > 2:
                    break
l2/cancel_buy_strategy.py
@@ -107,7 +107,7 @@
        return left_big_num
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, is_first_code,
                    need_cancel=True):
        if start_index >= 217:
            print("进入调试")
@@ -200,6 +200,9 @@
                            cancel_rate_threshold = constant.S_CANCEL_SECOND_RATE
                        else:
                            cancel_rate_threshold = constant.S_CANCEL_THIRD_RATE
                        if is_first_code:
                            cancel_rate_threshold += 0.1
                            cancel_rate_threshold = round(cancel_rate_threshold, 2)
                        if cancel_num / max(buy_num, 1) > cancel_rate_threshold:
                            return True, total_data[i]
        finally:
@@ -303,7 +306,8 @@
            cls.__getRedis().delete(key)
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map):
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map,
                    is_first_code):
        time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"],
                                         total_data[buy_exec_index]["val"]["time"])
        if time_space >= constant.S_CANCEL_EXPIRE_TIME - 1:
@@ -349,6 +353,9 @@
            cancel_rate_threshold = constant.H_CANCEL_SECOND_RATE
        else:
            cancel_rate_threshold = constant.H_CANCEL_THIRD_RATE
        if is_first_code:
            cancel_rate_threshold += 0.1
            cancel_rate_threshold = round(cancel_rate_threshold,2)
        process_index = start_index
        try:
            for i in range(start_index, end_index + 1):
@@ -403,7 +410,8 @@
    # 涨停买是否撤单
    @classmethod
    def __get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map):
    def __get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map,
                                             MAX_EXPIRE_CANCEL_TIME=None):
        data = None
        try:
            data = total_data[index]
@@ -421,10 +429,14 @@
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
                                                                                                     local_today_num_operate_map)
                    if buy_index == index:
                        if MAX_EXPIRE_CANCEL_TIME and tool.trade_time_sub(cancel_data["val"]["time"],
                                                                          MAX_EXPIRE_CANCEL_TIME) > 0:
                            continue
                        canceled = True
                        count = data["re"] - cancel_data["re"]
                        if count > 0:
                            return count
                        cancel_index = cancel_data["index"]
                        break
            if not canceled:
                count = data["re"]
@@ -503,7 +515,11 @@
            if i <= process_index_old:
                continue
            process_index = i
            left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map)
            left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                  local_today_num_operate_map,
                                                                  tool.trade_time_add_second(
                                                                      total_data[buy_exec_index]["val"]["time"],
                                                                      constant.S_CANCEL_EXPIRE_TIME))
            if left_count > 0:
                data = total_data[i]
                val = data["val"]
l2/l2_data_manager_new.py
@@ -175,7 +175,7 @@
                # 加载历史数据,返回数据是否正常
                is_normal = l2.l2_data_util.load_l2_data(code)
                if not is_normal:
                    print("历史数据异常:",code)
                    print("历史数据异常:", code)
                    # 数据不正常需要禁止交易
                    l2_trade_util.forbidden_trade(code)
                # 纠正数据
@@ -224,6 +224,8 @@
                                           "l2数据预处理时间")
        if len(add_datas) > 0:
            # 是否为首板代码
            is_first_code = gpcode_manager.FirstCodeManager.is_in_first_record(code)
            latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
            # 时间差不能太大才能处理
            if l2.l2_data_util.L2DataUtil.is_same_time(now_time_str,
@@ -235,10 +237,10 @@
                end_index = len(total_datas) - 1
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # 已挂单
                    cls.__process_order(code, start_index, end_index, capture_timestamp)
                    cls.__process_order(code, start_index, end_index, capture_timestamp, is_first_code)
                else:
                    # 未挂单
                    cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    cls.__process_not_order(code, start_index, end_index, capture_timestamp, is_first_code)
            logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                   add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
@@ -248,7 +250,7 @@
    # 处理未挂单
    @classmethod
    def __process_not_order(cls, code, start_index, end_index, capture_time):
    def __process_not_order(cls, code, start_index, end_index, capture_time, is_first_code):
        __start_time = round(t.time() * 1000)
        # 获取阈值
        threshold_money, msg = cls.__get_threshmoney(code)
@@ -256,16 +258,16 @@
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                               "获取m值数据耗时")
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time, is_first_code)
    # 测试专用
    @classmethod
    def process_order(cls, code, start_index, end_index, capture_time, new_add=True):
        cls.__process_order(code, start_index, end_index, capture_time, new_add)
    def process_order(cls, code, start_index, end_index, capture_time, is_first_code, new_add=True):
        cls.__process_order(code, start_index, end_index, capture_time, is_first_code, new_add)
    # 处理已挂单
    @classmethod
    def __process_order(cls, code, start_index, end_index, capture_time, new_add=True):
    def __process_order(cls, code, start_index, end_index, capture_time, is_first_code, new_add=True):
        # 计算安全笔数
        @dask.delayed
        def compute_safe_count():
@@ -312,7 +314,8 @@
            try:
                b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                      buy_exec_index, start_index,
                                                                                      end_index, total_data)
                                                                                      end_index, total_data,
                                                                                      is_first_code)
                if b_need_cancel:
                    return b_cancel_data, "S大单撤销比例触发阈值"
            except Exception as e:
@@ -330,7 +333,7 @@
                b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(code, buy_exec_index, start_index,
                                                                                    end_index, total_data,
                                                                                    local_today_num_operate_map.get(
                                                                                        code))
                                                                                        code), is_first_code)
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "H撤销比例触发阈值"
            except Exception as e:
@@ -398,7 +401,7 @@
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "已下单-撤单 耗时")
                # 撤单成功,继续计算下单
                cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
                cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time, is_first_code)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "处理剩余数据 耗时")
            else:
@@ -412,14 +415,14 @@
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
                          unreal_buy_info[0], is_first_code)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "已虚拟下单-执行真实下单 外部耗时")
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
    def __buy(cls, code, capture_timestamp, last_data, last_data_index, is_first_code):
        __start_time = tool.get_now_timestamp()
        can, need_clear_data, reason = cls.__can_buy(code)
        can, need_clear_data, reason = cls.__can_buy(code, is_first_code)
        __start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - __start_time, "最后判断是否能下单", force=True)
        # 删除虚拟下单
        if code in cls.unreal_buy_dict:
@@ -428,8 +431,10 @@
        if not can:
            l2_log.debug(code, "不可以下单,原因:{}", reason)
            if need_clear_data:
                # 中断买入
                trade_manager.break_buy(code, reason)
                buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
                    code)
                trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index,
                                                         local_today_datas.get(code))
            return
        else:
            l2_log.debug(code, "可以下单,原因:{}", reason)
@@ -451,6 +456,9 @@
    def __can_cancel(cls, code):
        if constant.TEST:
            return True, ""
        if l2_trade_util.WhiteListCodeManager.is_in(code):
            return False, "代码在白名单中"
        # 暂时注释掉
        # 14点后如果是板块老大就不需要取消了
        # now_time_str = tool.get_now_time_str()
@@ -480,14 +488,18 @@
    # 是否可以买
    # 返回是否可以买,是否需要清除之前的买入信息,原因
    @classmethod
    def __can_buy(cls, code):
    def __can_buy(cls, code, is_first_code):
        __start_time = t.time()
        # 判断是否为首板代码
        is_first = gpcode_manager.FirstCodeManager.is_in_first_record(code)
        if is_first:
        if is_first_code:
            # 首板代码且尚未涨停过的不能下单
            is_limited_up = gpcode_manager.FirstCodeManager.is_limited_up(code)
            if not is_limited_up:
                gpcode_manager.FirstCodeManager.add_limited_up_record([code])
                place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(
                    code)
                if place_order_count == 0:
                    trade_data_manager.placeordercountmanager.place_order(code)
                return False, True, "首板代码,且尚未涨停过"
        try:
@@ -504,7 +516,6 @@
            total_datas = local_today_datas[code]
            if total_datas[-1]["index"] + 1 > len(total_datas):
                return False, True, "L2数据错误"
            try:
                sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code)
@@ -524,30 +535,34 @@
                        elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                            buy_nums -= _val["num"] * total_datas[i]["re"]
                    if buy_nums < sell1_volumn * 0.49:
                        return False, True, "纯买量({})小于卖1量的49%{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time)
                        return False, False, "纯买量({})小于卖1量的49%{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time)
            except Exception as e:
                logging.exception(e)
            # 量比超过1.3的不能买
            volumn_rate = l2_trade_factor.L2TradeFactorUtil.get_volumn_rate_by_code(code)
            if volumn_rate >= 1.3:
                return False, True, "最大量比超过1.3不能买"
                return False, False, "最大量比超过1.3不能买"
            limit_up_time = limit_up_time_manager.get_limit_up_time(code)
            if limit_up_time is not None and l2.l2_data_util.L2DataUtil.get_time_as_second(
                    limit_up_time) >= l2.l2_data_util.L2DataUtil.get_time_as_second(
                "14:30:00"):
                return False, True, "14:55后涨停的不能买,涨停时间为{}".format(limit_up_time)
            if limit_up_time is not None:
                limit_up_time_seconds = l2.l2_data_util.L2DataUtil.get_time_as_second(
                    limit_up_time)
                if not is_first_code and limit_up_time_seconds >= l2.l2_data_util.L2DataUtil.get_time_as_second(
                        "13:00:00"):
                    return False, False, "二板下午涨停的不能买,涨停时间为{}".format(limit_up_time)
                if limit_up_time_seconds >= l2.l2_data_util.L2DataUtil.get_time_as_second("14:55:00"):
                    return False, False, "14:55后涨停的不能买,涨停时间为{}".format(limit_up_time)
            # 同一板块中老二后面的不能买
            industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
            if industry is None:
                return True, True, "没有获取到行业"
                return True, False, "没有获取到行业"
            codes_index = industry_codes_sort.sort_codes(codes, code)
            if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
                # 当老大老二当前没涨停
                return False, True, "同一板块中老三,老四,...不能买"
                return False, False, "同一板块中老三,老四,...不能买"
            if cls.__codeActualPriceProcessor.is_under_water(code, total_datas[-1]["val"]["time"]):
                # 水下捞且板块中的票小于16不能买
@@ -559,7 +574,7 @@
                    # 获取老大的市值
                    for c in codes_index:
                        if codes_index.get(c) == 0 and global_util.zyltgb_map.get(code) > global_util.zyltgb_map.get(c):
                            return False, True, "水下捞,不是老大,且自由流通市值大于老大"
                            return False, False, "水下捞,不是老大,且自由流通市值大于老大"
            # 13:30后涨停,本板块中涨停票数<29不能买
            # if limit_up_time is not None:
@@ -649,6 +664,7 @@
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
                            is_first_code,
                            new_add=True):
        if compute_end_index < compute_start_index:
            return
@@ -721,7 +737,8 @@
        # 买入信号位与计算位置间隔2s及以上了
        if rebegin_buy_pos is not None:
            # 需要重新计算纯买额
            cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, False)
            cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time,
                                    is_first_code, False)
            return
        if compute_index is not None:
@@ -763,7 +780,7 @@
                need_cancel, cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                  compute_index,
                                                                                  buy_single_index, compute_index,
                                                                                  total_datas,
                                                                                  total_datas, is_first_code,
                                                                                  True)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
@@ -774,17 +791,17 @@
                        # 执行撤单成功
                        pass
                else:
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index, is_first_code)
            else:
                SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                       compute_index, total_datas, False)
                                                       compute_index, total_datas, is_first_code, False)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
                # 数据尚未处理完毕,进行下一步处理
                l2_log.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                # 处理撤单步骤
                cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
                cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, is_first_code, False)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  f"处理撤单步骤耗时,范围:{compute_index + 1}-{compute_end_index}", force=True)
@@ -892,7 +909,7 @@
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        # 目标订单数量
        threshold_count = cls.__buyL2SafeCountManager.get_safe_count(code,is_first_code, place_order_count)
        threshold_count = cls.__buyL2SafeCountManager.get_safe_count(code, is_first_code, place_order_count)
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
@@ -988,154 +1005,6 @@
                         threshold_num, buy_count, get_threshold_count(), len(max_buy_num_set), big_num_count)
        return None, buy_nums, buy_count, None, max_buy_num_set
    @classmethod
    def test(cls):
        """Ad-hoc debug driver that replays stored L2 data for code "002556".

        Clears the trade data for the code, loads its L2 data, then dispatches
        to ``__process_order`` or ``__process_not_order`` depending on the
        current trade state, printing the elapsed time in milliseconds.

        NOTE(review): the ``if True:`` branch always returns, so the
        per-timestamp batching loop below it is currently unreachable.
        """
        code = "002556"
        l2_trade_test.clear_trade_data(code)
        load_l2_data(code, True)
        _start = t.time()
        if True:
            state = trade_manager.get_trade_state(code)
            cls.random_key[code] = random.randint(0, 100000)
            capture_timestamp = 1999988888
            try:
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    # An order has already been placed
                    cls.__process_order(code, 1552, 1641, capture_timestamp)
                else:
                    # No order has been placed yet
                    cls.__process_not_order(code, 1552, 1641, capture_timestamp)
            except Exception as e:
                logging.exception(e)
            print("处理时间", round((t.time() - _start) * 1000))
            return
        # Batch the data by its "time" field (presumably one batch per second - TODO confirm)
        total_datas = local_today_datas.get(code)
        start_time = total_datas[0]["val"]["time"]
        start_index = 0
        for i in range(0, len(total_datas)):
            if total_datas[i]["val"]["time"] != start_time:
                cls.random_key[code] = random.randint(0, 100000)
                # Process the batch [start_index, i - 1] that shares one time value
                start = start_index
                # if start != 201:
                #     continue
                end = i - 1
                print("处理进度:{},{}".format(start, end))
                capture_timestamp = 1999999999
                state = trade_manager.get_trade_state(code)
                try:
                    if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                        # An order has already been placed
                        cls.__process_order(code, start, end, capture_timestamp)
                    else:
                        # No order has been placed yet
                        cls.__process_not_order(code, start, end, capture_timestamp)
                except Exception as e:
                    logging.exception(e)
                # t.sleep(1)
                # The next batch starts at the first row with a new time value
                start_index = i
                start_time = total_datas[i]["val"]["time"]
        print("时间花费:", round((t.time() - _start) * 1000))
    @classmethod
    def test1(cls):
        code = "002556"
        l2_trade_test.clear_trade_data(code)
        local_latest_datas[code] = []
        load_l2_data(code, True)
        _start = t.time()
        capture_timestamp = 1999999999
        cls.process(code, local_today_datas[code][1552:1641], capture_timestamp)
        print("时间花费:", round((t.time() - _start) * 1000))
        pass
    @classmethod
    def test2(cls):
        """Debug driver that evaluates a chain of buy-eligibility rules for code "002864".

        Returns a ``(bool, str)`` tuple on the branches that reach a ``return``
        statement; when none of them match, the method falls off the end and
        implicitly returns ``None``.

        NOTE(review): ``codes_index`` is None-checked only in the first rank
        test; the later ``codes_index.get(...)`` calls assume it is not None.
        """
        code = "002864"
        load_l2_data(code)
        limit_up_time_manager.load_limit_up_time()
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2.l2_data_util.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2.l2_data_util.L2DataUtil.get_time_as_second(
            "14:55:00"):
            return False, "14:55后涨停的不能买,涨停时间为{}".format(limit_up_time)
        # Codes ranked after second place within the same industry block cannot be bought
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = industry_codes_sort.sort_codes(codes, code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
        if cls.__codeActualPriceProcessor.is_under_water(code):
            # Under water and too few hot codes in the block: cannot buy.
            # NOTE(review): the original comment said "fewer than 21" and the message says
            # "fewer than 2", but the code compares with <= 16 - confirm the intended threshold.
            if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get(
                    industry) <= 16:
                return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry))
            if codes_index.get(code) != 0:
                return False, "水下捞,不是老大,是老{}".format(codes_index.get(code))
        # Limit-up at or after 13:30 with too few hot codes in the block: cannot buy.
        # NOTE(review): the original comment said "< 29" but the code and message use < 16.
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None:
            if int(limit_up_time.replace(":", "")) >= 133000 and global_util.industry_hot_num.get(industry) is not None:
                if global_util.industry_hot_num.get(industry) < 16:
                    return False, "13:30后涨停,本板块中涨停票数<16不能买"
        if codes_index.get(code) is not None and codes_index.get(code) == 1:
            # ---- rule disabled (commented out) ----
            # If the first-ranked code has already been bought, the second one need not be bought
            # first_codes = []
            # for key in codes_index:
            #     if codes_index.get(key) == 0:
            #         first_codes.append(key)
            #
            # for key in first_codes:
            #     state = trade_manager.get_trade_state(key)
            #     if state == trade_manager.TRADE_STATE_BUY_SUCCESS:
            #         # the first-ranked code has already been bought
            #         return False, "老大{}已经买成功,老二无需购买".format(key)
            # ---- rule disabled (commented out) ----
            # ---- rule disabled (commented out) ----
            # Only buy the second-ranked code when the first-ranked one hit limit-up at 09:30
            # Look up the limit-up time of the first-ranked code
            # for key in first_codes:
            #     # found the first-ranked code
            #     time_ = limit_up_time_manager.get_limit_up_time(key)
            #     if time_ == "09:30:00":
            #         return True, "9:30涨停的老大,老二可以下单"
            # return False, "老大非9:30涨停,老二不能下单"
            # ---- rule disabled (commented out) ----
            return True, "老二可以下单"
    @classmethod
    def test3(cls):
        code = "002094"
        load_l2_data(code, True)
        cls.random_key[code] = random.randint(0, 100000)
        buy_single_begin_index, buy_exec_index = 426, 479
        L2LimitUpMoneyStatisticUtil.process_data(code, 480, 519,
                                                 buy_single_begin_index, buy_exec_index, False)
        L2LimitUpMoneyStatisticUtil.process_data(code, 480, 519,
                                                 buy_single_begin_index, buy_exec_index, False)
    @classmethod
    def test_can_buy(cls):
        code = "002923"
        load_l2_data(code, True)
        limit_up_time_manager.load_limit_up_time()
        can, msg = cls.__can_buy(code)
        print(can, msg)
if __name__ == "__main__":
l2/l2_data_util.py
@@ -133,7 +133,7 @@
        l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
        # 设置进内存
        local_latest_datas[code] = datas
        __set_l2_data_latest_count(code, len(datas))
        set_l2_data_latest_count(code, len(datas))
        try:
            log.logger_l2_data.info("{}-{}", code, add_datas)
        except Exception as e:
@@ -142,7 +142,7 @@
# 设置最新的l2数据采集的数量
def __set_l2_data_latest_count(code, count):
def set_l2_data_latest_count(code, count):
    redis = _redisManager.getRedis()
    key = "latest-l2-count-{}".format(code)
    redis.setex(key, 2, count)
@@ -172,12 +172,25 @@
    channel = data["channel"]
    capture_time = data["captureTime"]
    process_time = data["processTime"]
    count = data["count"]
    data = data["data"]
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
    # 获取涨停价
    return day, client, channel, code, capture_time, process_time, datas, data
    return day, client, channel, code, capture_time, process_time, data,count
# 元数据是否有差异
def is_origin_data_diffrent(data1, data2):
    """Return True when two raw data sequences appear to differ.

    This is a cheap sampled comparison, not a full equality check: after the
    None and length checks, only about ten evenly spaced positions are
    compared (by their JSON serialisation). Differences at positions that are
    not sampled go undetected.

    :param data1: first sequence of JSON-serialisable items, or None
    :param data2: second sequence of JSON-serialisable items, or None
    :return: True if either argument is None, the lengths differ, or any
             sampled element differs; False otherwise
    """
    if data1 is None or data2 is None:
        return True
    data_length = len(data1)
    if data_length != len(data2):
        return True
    # Clamp the stride to 1: with fewer than 10 items (or none) the integer
    # division yields 0 and range() raises ValueError on a zero step.
    step = max(1, data_length // 10)
    for i in range(0, data_length, step):
        if json.dumps(data1[i]) != json.dumps(data2[i]):
            return True
    return False
# 是否为大单
l2/transaction_progress.py
@@ -61,15 +61,23 @@
    # 保存数据,返回保存数据的条数
    def save(self, code, limit_up_price, buy_1_price, buy_1_time, queues):
        # 如果买1不为涨停价就不需要保存
        if queues == self.last_buy_queue_data.get(code):
        # 2个以上的数据才有处理价值
        if not queues or len(queues) < 2:
            return None
        # 如果买1不为涨停价就不需要保存
        old_queues = self.last_buy_queue_data.get(code)
        if old_queues and len(old_queues) == len(queues):
            # 元素相同就不需要再次处理
            old_str = ",".join([str(k) for k in old_queues[1:]])
            new_str = ",".join([str(k) for k in queues[1:]])
            if old_str == new_str:
                return None
        self.last_buy_queue_data[code] = queues
        if abs(float(buy_1_price) - float(limit_up_price)) >= 0.01:
            # 保存最近的涨停起始时间
            self.__save_latest_not_limit_up_time(code, buy_1_time)
            return None
        self.last_buy_queue_data[code] = queues
        min_num = round(constant.L2_MIN_MONEY / (limit_up_price * 100))
        num_list = []
        # 忽略第一条数据
@@ -87,7 +95,8 @@
        today_num_operate_map = l2.l2_data_util.local_today_num_operate_map.get(code)
        index = None
        if True:
            buyQueueBigTemp = buyQueueBig
            # 最多5个数据
            buyQueueBigTemp = buyQueueBig[:5]
            last_index, is_default = self.get_traded_index(code)
            c_last_index = 0
            if not is_default and last_index is not None:
@@ -143,5 +152,9 @@
        self.__save_buy_progress_index(code, index, False)
if __name__ == '__main':
    pass
if __name__ == '__main__':
    a = [1, 2, 3, 4]
    results = [str(k) for k in a]
    b = [1, 2, 3]
    result = (",".join([str(k) for k in a]) == ",".join([str(k) for k in b]))
    print(result)
l2_trade_test.py
@@ -15,10 +15,10 @@
import log
import tool
from db import redis_manager
from l2 import l2_log, l2_data_manager, transaction_progress
from l2 import l2_log, l2_data_manager, transaction_progress, safe_count_manager
from l2.safe_count_manager import BuyL2SafeCountManager
from l2.transaction_progress import TradeBuyQueue
from trade import trade_data_manager
from trade import trade_data_manager, l2_trade_factor
from trade.trade_queue_manager import THSBuy1VolumnManager
import l2.l2_data_manager_new, l2.l2_data_manager, l2.l2_data_util, l2.cancel_buy_strategy
@@ -73,7 +73,8 @@
                    buy_progress_index = TradeBuyQueue().compute_traded_index(code, buy_one_price_,
                                                                              buy_queue_result_list, exec_time)
                    if buy_progress_index is not None:
                        l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, time_, buy_exec_index, buy_progress_index,
                        l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, time_, buy_exec_index,
                                                                                           buy_progress_index,
                                                                                           l2.l2_data_util.local_today_datas.get(
                                                                                               code),
                                                                                           l2.l2_data_util.local_today_num_operate_map.get(
@@ -84,9 +85,9 @@
                except Exception as e:
                    pass
    @unittest.skip("跳过此单元测试")
    # @unittest.skip("跳过此单元测试")
    def test_trade(self):
        code = "002235"
        code = "000892"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
@@ -118,7 +119,8 @@
        l2.l2_data_util.local_today_num_operate_map[code].clear()
        print("id:", id(l2.l2_data_util.local_today_datas))
        # l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=12)
        safe_count_manager.BuyL2SafeCountManager.get_safe_count = mock.Mock(return_value=16)
        l2_trade_factor.L2TradeFactorUtil.compute_m_value = mock.Mock(return_value=(14699952, ""))
        # pos_list.insert(41,(225,306))
        # pos_list.insert(63, (345, 423))
        # pos_list.insert(66, (440, 447))
log.py
@@ -118,6 +118,10 @@
                   filter=lambda record: record["extra"].get("name") == "first_code_record",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("debug", "debug"),
                   filter=lambda record: record["extra"].get("name") == "debug",
                   rotation="00:00", compression="zip", enqueue=True)
    def get_path(self, dir_name, log_name):
        return "D:/logs/gp/{}/{}".format(dir_name, log_name) + ".{time:YYYY-MM-DD}.log"
@@ -160,6 +164,8 @@
logger_buy_win_distibute = __mylogger.get_logger("buy_win_distibute")
logger_first_code_record = __mylogger.get_logger("first_code_record")
logger_debug = __mylogger.get_logger("debug")
class LogUtil:
@@ -367,7 +373,7 @@
if __name__ == '__main__':
    # logger_l2_h_cancel.info("test")
    # logger_l2_process_time.info("test123")
    codes = ["002757"]
    codes = ["603388"]
    for code in codes:
        export_logs(code)
ocr/ocr_util.py
@@ -9,7 +9,7 @@
# 图像识别类
class OcrUtil:
    __ocr = CnOcr()
    reader = easyocr.Reader(['en'], gpu=False)
    reader = easyocr.Reader(['ch_sim','en'], gpu=False)
    @classmethod
    def ocr(cls, mat):
@@ -48,5 +48,4 @@
if __name__ == "__main__":
    result = OcrUtil.ocr_num("D:/test1.png", "000977")
    print(result)
    print(re.match("首..注", "首版关注"))
server.py
@@ -13,6 +13,7 @@
import alert_util
import client_manager
import code_volumn_manager
import constant
import data_process
import global_data_loader
import global_util
@@ -28,13 +29,14 @@
import ths_industry_util
import ths_util
import tool
from third_data import hot_block_data_process
from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util
import l2_code_operate
from code_data_util import ZYLTGBUtil
import l2.transaction_progress
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record, logger_debug
from trade.trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager
@@ -66,6 +68,7 @@
    tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
    last_time = {}
    first_tick_datas = []
    latest_oringin_data = {}
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -106,24 +109,16 @@
                        __start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data(
                        day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data(
                            _str)
                        l2_log.threadIds[code] = random.randint(0, 100000)
                        if channel == 0:
                            now_time = round(time.time() * 1000)
                            if self.last_time.get(channel) is not None:
                                # print("接受到L2的数据", channel, now_time - self.last_time.get(channel), "解析耗时",now_time - origin_start_time)
                                pass
                            self.last_time[channel] = now_time
                        if True:
                            # 间隔1s保存一条l2的最后一条数据
                            if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
                                code] >= 1000 and len(datas) > 0:
                                code] >= 1000 and len(origin_datas) > 0:
                                self.l2_save_time_dict[code] = origin_start_time
                                logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1])
                                logger_l2_latest_data.info("{}#{}#{}", code, capture_time, origin_datas[-1])
                            # 10ms的网络传输延时
                            capture_timestamp = __start_time - process_time - 10
@@ -137,6 +132,16 @@
                                                               "l2获取代码位置耗时")
                            # 判断目标代码位置是否与上传数据位置一致
                            if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                                # l2.l2_data_util.set_l2_data_latest_count(code, len(origin_datas))
                                l2_data_util.save_l2_latest_data_number(code, origin_datas_count)
                                # 保存l2数据条数
                                if not origin_datas:
                                    #or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)):
                                    raise Exception("无新增数据")
                                # 保存最近的数据
                                self.latest_oringin_data[code] = origin_datas
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                datas = l2.l2_data_util.L2DataUtil.format_l2_data(origin_datas, code, limit_up_price)
                                try:
                                    # 校验客户端代码
                                    l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
@@ -151,11 +156,11 @@
                                                                           "l2数据有效处理外部耗时",
                                                                           False)
                                        # 保存原始数据数量
                                        l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                        if round(time.time() * 1000) - __start_time > 20:
                                            l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                "异步保存原始数据条数耗时",
                                                                False)
                                        # l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                        # if round(time.time() * 1000) - __start_time > 20:
                                        #     l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                        #                         "异步保存原始数据条数耗时",
                                        #                         False)
                                except l2_data_manager.L2DataException as l:
                                    # 单价不符
@@ -184,7 +189,10 @@
                                                            "l2数据处理总耗时",
                                                            True)
                    except Exception as e:
                        logger_l2_error.exception(e)
                        if str(e).find("新增数据"):
                            pass
                        else:
                            logger_l2_error.exception(e)
                elif type == 1:
                    # 设置股票代码
@@ -235,7 +243,6 @@
                            raise Exception('未到接受时间')
                        # 首板代码
                        dataList, is_add = data_process.parseGPCode(_str)
                        # {'code': '605300', 'limitUpPercent': '0009.99', 'price': '0020.14', 'time': '10:44:00', 'volume': '44529', 'volumeUnit': 2, 'zyltMoney': '0011.60', 'zyltMoneyUnit': 0}
                        limit_up_price_dict = {}
                        temp_codes = []
                        codes = []
@@ -250,24 +257,31 @@
                                else:
                                    temp_codes.append(code)
                                # data["price"]
                                tick_datas.append({"code": code, "price": data["price"], "volumn": data["volume"],
                                                   "volumnUnit": data["volumeUnit"]})
                                tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"],
                                                   "volumeUnit": data["volumeUnit"]})
                        # 保存未筛选的首板代码
                        new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)
                        for code in new_add_codes:
                            if (not l2_trade_util.is_in_forbidden_trade_codes(code)) and juejin.JueJinManager.get_lowest_price_rate(code, 15) >= 0.3:
                            if (not l2_trade_util.is_in_forbidden_trade_codes(
                                    code)) and juejin.JueJinManager.get_lowest_price_rate(code, 15) >= 0.3:
                                l2_trade_util.forbidden_trade(code)
                        if new_add_codes:
                            gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes))
                            # 加入首板历史记录
                            gpcode_manager.FirstCodeManager.add_record(new_add_codes)
                            logger_first_code_record.info("新增首板:{}",new_add_codes)
                            logger_first_code_record.info("新增首板:{}", new_add_codes)
                            # 获取60天最大记录
                            for code in new_add_codes:
                                if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None:
                                    volumes = juejin.get_volumn(code)
                                    code_volumn_manager.set_histry_volumn(code,volumes[0],volumes[1])
                                    code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1])
                            # 移除代码
                            listen_codes = gpcode_manager.get_listen_codes()
                            for lc in listen_codes:
                                if not gpcode_manager.is_in_gp_pool(lc):
                                    # 移除代码
                                    l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")
                        if temp_codes:
                            # 获取涨停价
@@ -284,7 +298,7 @@
                            if code in global_util.zyltgb_map:
                                continue
                            zyltgb_list.append(
                                {"code": code, "zyltgb": data["zyltMoney"], "zyltgb_unit": data["zyltMoneyUnit"]})
                                {"code": code, "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgbUnit"]})
                        if zyltgb_list:
                            ZYLTGBUtil.save_list(zyltgb_list)
                            global_data_loader.load_zyltgb()
@@ -307,6 +321,9 @@
                            # 纠正数据
                            if is_limit_up and limit_up_time is None:
                                limit_up_time = tool.get_now_time_str()
                            if is_limit_up:
                                # 加入首板涨停
                                gpcode_manager.FirstCodeManager.add_limited_up_record([code])
                            pricePre = gpcode_manager.get_price_pre(code)
                            rate = round((float(price) - pricePre) * 100 / pricePre, 1)
                            prices.append(
@@ -318,8 +335,6 @@
                                        code)
                                    if place_order_count == 0:
                                        trade_data_manager.placeordercountmanager.place_order(code)
                                    # 加入首板涨停
                                    gpcode_manager.FirstCodeManager.add_limited_up_record([code])
                        gpcode_first_screen_manager.process_ticks(prices)
                    except Exception as e:
@@ -400,78 +415,96 @@
                # l2交易队列
                elif type == 10:
                    # 可用金额
                    __start_time = time.time()
                    datas = data_process.parseData(_str)
                    channel = datas["channel"]
                    code = datas["code"]
                    data = datas["data"]
                    buy_time = data["buyTime"]
                    buy_one_price = data["buyOnePrice"]
                    buy_one_volumn = data["buyOneVolumn"]
                    buy_queue = data["buyQueue"]
                    if buy_one_price is None:
                        print('买1价没有,', code)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if limit_up_price is not None:
                        buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price, buy_time,
                                                                        buy_queue)
                        if buy_queue_result_list:
                            # 有数据
                            try:
                                buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                    decimal.Decimal("0.00"))
                                # 获取执行位时间
                                exec_time = None
                                buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                                    code)
                                if buy_exec_index:
                                    try:
                                        exec_time = l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][
                                            "time"]
                                    except:
                                        pass
                    try:
                        if not gpcode_manager.is_in_gp_pool(code) and not gpcode_manager.is_in_first_gp_codes(code):
                            # 没在目标代码中且没有在首板今日历史代码中
                            raise Exception("代码没在监听中")
                        data = datas["data"]
                        buy_time = data["buyTime"]
                        buy_one_price = data["buyOnePrice"]
                        buy_one_volumn = data["buyOneVolumn"]
                        buy_queue = data["buyQueue"]
                        if buy_one_price is None:
                            print('买1价没有,', code)
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        if limit_up_price is not None:
                            buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price,
                                                                            buy_time,
                                                                            buy_queue)
                            if buy_queue_result_list:
                                raise  Exception("测试中断")
                                # 有数据
                                try:
                                    buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                        decimal.Decimal("0.00"))
                                    # 获取执行位时间
                                    exec_time = None
                                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                                        code)
                                    if buy_exec_index:
                                        # 只有下单过后才获取交易进度
                                        try:
                                            exec_time = \
                                                l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][
                                                    "time"]
                                        except:
                                            pass
                                buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, buy_one_price_,
                                                                                             buy_queue_result_list,
                                                                                             exec_time)
                                if buy_progress_index is not None:
                                    HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index,
                                                                                buy_progress_index,
                                                                                l2.l2_data_util.local_today_datas.get(
                                                                                    code),
                                                                                l2.l2_data_util.local_today_num_operate_map.get(
                                                                                    code))
                                    logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                                   buy_progress_index,
                                                                   json.dumps(buy_queue_result_list))
                                else:
                                    raise Exception("暂未获取到交易进度")
                            except Exception as e:
                                logging.exception(e)
                                print("买入队列", code, buy_queue_result_list)
                                logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{}  数据-{}", code, str(e),
                                                                  json.dumps(buy_queue_result_list))
                                        buy_progress_index = self.tradeBuyQueue.compute_traded_index(code,
                                                                                                     buy_one_price_,
                                                                                                     buy_queue_result_list,
                                                                                                     exec_time)
                                        if buy_progress_index is not None:
                                            HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index,
                                                                                        buy_progress_index,
                                                                                        l2.l2_data_util.local_today_datas.get(
                                                                                            code),
                                                                                        l2.l2_data_util.local_today_num_operate_map.get(
                                                                                            code))
                                            logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                                           buy_progress_index,
                                                                           json.dumps(buy_queue_result_list))
                                        else:
                                            raise Exception("暂未获取到交易进度")
                                except Exception as e:
                                    logging.exception(e)
                                    print("买入队列", code, buy_queue_result_list)
                                    logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{}  数据-{}", code, str(e),
                                                                      json.dumps(buy_queue_result_list))
                    # buy_queue是否有变化
                    if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
                        # buy_queue是否有变化
                        if self.l2_trade_buy_queue_dict.get(
                                code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
                            code):
                        self.l2_trade_buy_queue_dict[code] = buy_queue
                        logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
                    # 保存最近的记录
                    if self.ths_l2_trade_queue_manager.save_recod(code, data):
                        if buy_time != "00:00:00":
                            logger_l2_trade_queue.info("{}-{}", code, data)
                            self.buy1_price_manager.save(code, buy_one_price)
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
                                                                                               int(buy_one_volumn),
                                                                                               buy_one_price)
                            #if need_cancel:
                            #    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
                    # print(buy_time, buy_one_price, buy_one_volumn)
                            self.l2_trade_buy_queue_dict[code] = buy_queue
                            logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
                        # 保存最近的记录
                        if self.ths_l2_trade_queue_manager.save_recod(code, data):
                            if buy_time != "00:00:00":
                                logger_l2_trade_queue.info("{}-{}", code, data)
                                self.buy1_price_manager.save(code, buy_one_price)
                                need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
                                                                                                   int(buy_one_volumn),
                                                                                                   buy_one_price)
                                # if need_cancel:
                                #    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                                if need_sync:
                                    # 同步数据
                                    L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
                        # print(buy_time, buy_one_price, buy_one_volumn)
                    # print("L2买卖队列",datas)
                        # print("L2买卖队列",datas)
                    except:
                        pass
                    finally:
                        space = time.time() - __start_time
                        if space > 0.1:
                            logger_debug.info("{}成交队列处理时间:{}", code, space)
                elif type == 20:
                    # 登录
                    data = data_process.parse(_str)["data"]
@@ -484,14 +517,15 @@
                # 现价更新
                elif type == 40:
                    datas = data_process.parse(_str)["data"]
                    print("二板现价")
                    # 获取暂存的二版现价数据
                    if datas and self.first_tick_datas:
                        datas.extend(self.first_tick_datas)
                    if datas is not None:
                        print("现价数量", len(datas))
                        print("二板现价数量", len(datas))
                        for item in datas:
                            volumn = item["volumn"]
                            volumnUnit = item["volumnUnit"]
                            volumn = item["volume"]
                            volumnUnit = item["volumeUnit"]
                            code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                        juejin.accept_prices(datas)
                elif type == 50:
@@ -519,12 +553,11 @@
                            # 保存数据
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn,
                                                                                               price)
                            #if need_cancel:
                            # if need_cancel:
                            #    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息
                    data = data_process.parse(_str)["data"]
@@ -554,11 +587,11 @@
                        codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16)
                        codes = sorted(codes)
                        if client_id == 2:
                            codes = codes[:8]
                            codes = codes[:constant.L2_CODE_COUNT_PER_DEVICE]
                        else:
                            codes = codes[8:]
                            codes = codes[constant.L2_CODE_COUNT_PER_DEVICE:]
                        codes_datas = []
                        for i in range(0, 8):
                        for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
                            if i >= len(codes):
                                break
                            codes_datas.append((i, codes[i]))
@@ -572,12 +605,67 @@
                    else:
                        return_json = {"code": 0, "msg": "开启在线状态"}
                        return_str = json.dumps(return_json)
                elif type == 70:
                    # 选股宝热门概念
                    datas = data_process.parse(_str)["data"]
                    if datas:
                        hot_block_data_process.save_datas(datas)
                    print(datas)
                elif type == 201:
                    # 加入黑名单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.forbidden_trade(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = juejin.JueJinManager.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code,results[code])
                    return_str = json.dumps({"code": 0})
                elif type == 202:
                    # 加入白名单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.WhiteListCodeManager.add_code(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = juejin.JueJinManager.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                    return_str = json.dumps({"code": 0})
                elif type == 203:
                    # 移除黑名单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.remove_from_forbidden_trade_codes(code)
                    return_str = json.dumps({"code": 0})
                elif type == 204:
                    # 移除白名单
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.WhiteListCodeManager.remove_code(code)
                    return_str = json.dumps({"code": 0})
                elif type == 301:
                    # 黑名单列表
                    codes = l2_trade_util.BlackListCodeManager.list_codes()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                elif type == 302:
                    # Whitelist listing (this branch returns the codes held by WhiteListCodeManager)
                    codes = l2_trade_util.WhiteListCodeManager.list_codes()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    return_str = json.dumps({"code": 0, "data": datas})
                sk.send(return_str.encode())
@@ -644,7 +732,7 @@
    code_list = []
    for code in codes:
        code_list.append(code)
    client = authority._get_client_ids_by_rule("client-industry")
    client = authority._get_client_ids_by_rule("data-maintain")
    result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list})
    return result
@@ -658,9 +746,8 @@
if __name__ == "__main__":
    # Ad-hoc manual checks; they only run when this module is executed directly.
    # NOTE(review): this hunk appears to interleave the removed and the added
    # lines of the commit - confirm which statements are still live.
    try:
        # Parse a zero-padded numeric string and print it quantized to two decimals.
        a = round(float("0002.90"), 2)
        print(decimal.Decimal(a).quantize(decimal.Decimal("0.00")))
        # repair_ths_main_site(2)
    except Exception as e:
        print(str(e))
    # For every listened code that is no longer in the pool, queue an operate
    # request with first argument 0 (presumably "stop listening" - confirm).
    listen_codes = gpcode_manager.get_listen_codes()
    for lc in listen_codes:
        if not gpcode_manager.is_in_gp_pool(lc):
            # Remove the code
            l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")
third_data/hot_block.py
@@ -1,7 +1,10 @@
"""
热门板块监听
"""
import datetime
import json
import logging
import socket
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
@@ -18,7 +21,11 @@
        print("----------------------")
        header = item.find_element(by=By.TAG_NAME, value="section").find_element(by=By.TAG_NAME, value="header")
        title = header.find_element(by=By.TAG_NAME, value="h3").text
        total_rate = header.find_element(by=By.TAG_NAME, value="span").text
        total_rate = None
        try:
            total_rate = header.find_element(by=By.TAG_NAME, value="span").text
        except:
            pass
        print(title, total_rate)
        contents = item.find_element(by=By.TAG_NAME, value="div").find_element(by=By.TAG_NAME,
                                                                               value="tbody").find_elements(
@@ -44,15 +51,43 @@
def get_hot_block(callback):
    """Poll the xuangubao top-gainer page and pass each parsed result to ``callback``.

    Starts Chrome through Selenium, loads the page once, then loops forever.
    Every 3 seconds, if the local clock is within 09:25:00-11:30:00 or
    13:00:00-15:00:00, the page is parsed with ``__parseData`` and the result
    is handed to ``callback``. Exceptions raised while parsing or inside the
    callback are logged and the loop continues, so this function never returns.

    :param callback: callable invoked with the value returned by ``__parseData``.
    """
    # Start the browser first
    options = Options()
    chrome_path = "res/chromedriver.exe"
    options.add_argument("--disable-blink-features")
    options.add_argument("--disable-blink-features=AutomationControlled")
    # NOTE(review): ``driver`` is assigned twice in a row; the first Chrome
    # instance is overwritten without being quit. This looks like the removed
    # and the added diff line both being present - confirm which one is live.
    driver = webdriver.Chrome(options=options)
    driver = webdriver.Chrome(chrome_path, options=options)
    driver.get("https://xuangubao.cn/top-gainer")
    # Give the page time to finish its initial load before polling.
    time.sleep(5)
    while True:
        time.sleep(3)
        # Only parse during trading hours
        time_str = datetime.datetime.now().strftime("%H%M%S")
        # HHMMSS compared as integers: skip before 09:25:00 and after 15:00:00.
        if int(time_str) < int("092500") or int(time_str) > int("150000"):
            continue
        # Skip the window strictly between 11:30:00 and 13:00:00.
        if int("113000") < int(time_str) < int("130000"):
            continue
        try:
            result = __parseData(driver)
            callback(result)
        except Exception as e:
            # Log with traceback and keep polling.
            logging.exception(e)
def upload_data(datas, ip_port=("192.168.3.252", 9001)):
    """Send ``datas`` to the trade server as a type-70 message.

    The message is the JSON document ``{"type": 70, "data": datas}`` encoded
    as GBK and written over a fresh TCP connection that is closed afterwards.

    :param datas: JSON-serializable payload placed under the ``"data"`` key.
    :param ip_port: ``(host, port)`` of the server; defaults to the address
        that was previously hard-coded (the original note recommends a port
        above 10000).
    :raises OSError: if the connection cannot be established or the write fails.
    :raises TypeError: if ``datas`` cannot be serialized to JSON.
    """
    # Serialize before opening the connection so bad input never touches the network.
    payload = json.dumps({"type": 70, "data": datas}).encode("gbk")
    # The context manager closes the socket even when connect/sendall raises.
    with socket.socket() as client:
        client.connect(ip_port)
        # sendall() keeps writing until the whole payload is sent; send() may stop early.
        client.sendall(payload)
# 打包命令
# cd D:\workspace\trade\third_data
# C:\Users\Administrator\AppData\Roaming\Python\Python37\Scripts\pyinstaller.exe hot_block.spec
if __name__ == "__main__":
    # Script entry point: every parsed page snapshot is uploaded to the server.
    def _forward(snapshot):
        upload_data(snapshot)

    get_hot_block(_forward)
third_data/hot_block.spec
New file
@@ -0,0 +1,50 @@
# -*- mode: python ; coding: utf-8 -*-
# PyInstaller build spec for hot_block.py.
# Analysis, PYZ, EXE and COLLECT are not imported in this file; presumably
# PyInstaller injects them when it executes the spec - confirm against the
# PyInstaller documentation.

# No bytecode encryption is configured.
block_cipher = None

# Analysis step for the single entry script.
a = Analysis(
    ['hot_block.py'],
    pathex=[],
    binaries=[],
    # Ships the local ``res`` directory (it holds chromedriver.exe) with the build.
    # NOTE(review): assumed to be a (source, destination) pair - confirm.
    datas=[('res','res')],
    hiddenimports=[],
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    cipher=block_cipher,
    noarchive=False,
)

# Archive built from the pure-Python modules found by the analysis.
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)

# Executable named ``hot_block``; binaries are excluded here and passed to
# COLLECT below instead.
exe = EXE(
    pyz,
    a.scripts,
    [],
    exclude_binaries=True,
    name='hot_block',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    # Console application (the script reports progress with print()).
    console=True,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
)

# Gathers the executable, binaries, zip files and data files under the name
# ``hot_block``.
coll = COLLECT(
    exe,
    a.binaries,
    a.zipfiles,
    a.datas,
    strip=False,
    upx=True,
    upx_exclude=[],
    name='hot_block',
)
third_data/hot_block_data_process.py
@@ -20,12 +20,12 @@
    for block in datas:
        codes = []
        for code_data in block[2]:
            code = code_data[0]
            code = code_data[0].split(".")[0]
            if code not in code_block_dict:
                code_block_dict[code] = set()
            code_block_dict[code].add(block[0])
            codes.append(code)
        block_codes_dict[block] = codes
        block_codes_dict[block[0]] = codes
    __save_block_codes(block_codes_dict)
    for key in code_block_dict:
        __save_code_block(key, code_block_dict[key])
@@ -33,7 +33,7 @@
# Save the blocks a code belongs to
def __save_code_block(code, blocks):
    # Stores ``blocks`` as JSON under the Redis key "code_blocks-<code>" with
    # the expiry returned by tool.get_expire().
    # NOTE(review): the key is written twice. The first call passes ``blocks``
    # straight to json.dumps, but save_datas builds these values with set(),
    # which json.dumps cannot serialize; the second call converts to a list
    # first. This looks like the removed and the added diff line both being
    # present - confirm which one is live.
    __get_redis().setex(f"code_blocks-{code}", tool.get_expire(), json.dumps(blocks))
    __get_redis().setex(f"code_blocks-{code}", tool.get_expire(), json.dumps(list(blocks)))
# 保存板块下的代码
@@ -62,3 +62,6 @@
    if block_codes:
        block_codes.get(block)
    return None
if __name__ == "__main__":
    # Manual check: print whatever get_code_blocks returns for one sample code.
    sample_blocks = get_code_blocks("600468")
    print(sample_blocks)
third_data/res/chromedriver.exe
Binary files differ
trade/l2_trade_util.py
@@ -5,30 +5,78 @@
__redis_manager = redis_manager.RedisManager(2)
class WhiteListCodeManager:
    """Manage the whitelist of codes, kept in the Redis set ``white_list_codes``."""

    __redis_manager = redis_manager.RedisManager(2)
    # Name of the Redis set that holds the whitelisted codes.
    __KEY = "white_list_codes"

    @classmethod
    def __get_redis(cls):
        """Return a Redis client obtained from the class-level manager."""
        manager = cls.__redis_manager
        return manager.getRedis()

    @classmethod
    def add_code(cls, code):
        """Add ``code`` to the whitelist, then refresh the expiry of the set."""
        cls.__get_redis().sadd(cls.__KEY, code)
        cls.__get_redis().expire(cls.__KEY, tool.get_expire())

    @classmethod
    def remove_code(cls, code):
        """Remove ``code`` from the whitelist."""
        cls.__get_redis().srem(cls.__KEY, code)

    @classmethod
    def is_in(cls, code):
        """Return the result of the Redis membership test for ``code``."""
        client = cls.__get_redis()
        return client.sismember(cls.__KEY, code)

    @classmethod
    def list_codes(cls):
        """Return all members of the whitelist set as reported by Redis."""
        client = cls.__get_redis()
        return client.smembers(cls.__KEY)

    @classmethod
    def clear(cls):
        """Delete the whole whitelist set."""
        cls.__get_redis().delete(cls.__KEY)
class BlackListCodeManager:
    """Manage the blacklist of codes, kept in the Redis set ``forbidden-trade-codes``."""

    __redis_manager = redis_manager.RedisManager(2)
    # Name of the Redis set that holds the codes forbidden from trading.
    __KEY = "forbidden-trade-codes"

    @classmethod
    def __get_redis(cls):
        """Return a Redis client obtained from the class-level manager."""
        manager = cls.__redis_manager
        return manager.getRedis()

    @classmethod
    def add_code(cls, code):
        """Add ``code`` to the blacklist, then refresh the expiry of the set."""
        cls.__get_redis().sadd(cls.__KEY, code)
        cls.__get_redis().expire(cls.__KEY, tool.get_expire())

    @classmethod
    def remove_code(cls, code):
        """Remove ``code`` from the blacklist."""
        cls.__get_redis().srem(cls.__KEY, code)

    @classmethod
    def is_in(cls, code):
        """Return the result of the Redis membership test for ``code``."""
        client = cls.__get_redis()
        return client.sismember(cls.__KEY, code)

    @classmethod
    def list_codes(cls):
        """Return all members of the blacklist set as reported by Redis."""
        client = cls.__get_redis()
        return client.smembers(cls.__KEY)

    @classmethod
    def clear(cls):
        """Delete the whole blacklist set."""
        cls.__get_redis().delete(cls.__KEY)
# Initialise the set of codes that are forbidden from trading
def init_forbidden_trade_codes():
    # NOTE(review): the set is reset twice - first through the raw Redis client,
    # then through BlackListCodeManager, which works on the same key. This looks
    # like the removed and the added diff lines both being present - confirm
    # which variant is live.
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    # Drop any existing members before seeding the set again.
    count = redis.scard(key)
    if count > 0:
        redis.delete(key)
    # Seed the set with the placeholder code "000000" and set its expiry.
    redis.sadd(key, "000000")
    redis.expire(key, tool.get_expire())
    # Same reset expressed through the manager class.
    BlackListCodeManager.clear()
    BlackListCodeManager.add_code("000000")
# Remove a code from the forbidden-trade set
def remove_from_forbidden_trade_codes(code):
    # NOTE(review): the removal is performed twice - directly on Redis and then
    # through BlackListCodeManager (same key). Looks like the removed and the
    # added diff lines are both present - confirm which variant is live.
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    redis.srem(key, code)
    BlackListCodeManager.remove_code(code)
# Add a code to the forbidden-trade set
def add_to_forbidden_trade_codes(code):
    # NOTE(review): the code is added twice - directly on Redis and then through
    # BlackListCodeManager (same key). Looks like the removed and the added diff
    # lines are both present - confirm which variant is live.
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    redis.sadd(key, code)
    # Refresh the expiry of the set after every insertion.
    redis.expire(key, tool.get_expire())
    BlackListCodeManager.add_code(code)
# 禁止代码交易
@@ -39,10 +87,9 @@
def is_in_forbidden_trade_codes(code):
    # Returns the Redis membership test of ``code`` in "forbidden-trade-codes".
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    return redis.sismember(key, code)
    # NOTE(review): unreachable as written - the function has already returned
    # above. Looks like the removed and the added diff lines are both present;
    # confirm which return is live.
    return BlackListCodeManager.is_in(code)
if __name__ == "__main__":
    # Manual check: writes sample codes to Redis when the module is run directly.
    # NOTE(review): the first call and its commented-out copy are both present,
    # which looks like interleaved removed/added diff lines - confirm which is live.
    add_to_forbidden_trade_codes("000977")
    # add_to_forbidden_trade_codes("000977")
    WhiteListCodeManager.add_code("002019")
trade/trade_manager.py
@@ -124,7 +124,7 @@
    time_str = tool.get_now_time_str()
    mysqldb = mysql_data.Mysqldb()
    for data in datas:
        data["_id"] = "{}-{}-{}".format(day, data["code"], data["time"])
        data["_id"] = "{}-{}-{}".format(day, data["code"], data["apply_time"][:6])
        data["day"] = day
        data["create_time"] = int(round(t.time() * 1000))
        counts = mysqldb.select_one("select count(*) from ths_trade_delegate_record where _id='{}'".format(data["_id"]))
trade/trade_result_manager.py
@@ -82,3 +82,13 @@
    f5 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code)
    f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
    dask.compute(f1, f2, f3, f4, f5, f6)
if __name__ == "__main__":
    # Manual run: build the cancel-buy cleanup steps for one sample code as
    # dask delayed tasks and compute them together.
    code = "600246"
    cleanup_steps = (
        l2_data_manager.TradePointManager.delete_buy_point,
        l2_data_manager.TradePointManager.delete_buy_cancel_point,
        l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy,
        l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy,
        SecondCancelBigNumComputer.cancel_success,
    )
    pending = [dask.delayed(step)(code) for step in cleanup_steps]
    dask.compute(*pending)