Administrator
2023-03-08 3cfa1332c0807a74b4ac5a2150500841f5299147
首板加入,安全笔数与H撤笔数优化
20个文件已修改
3个文件已添加
1562 ■■■■ 已修改文件
constant.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
gpcode_first_screen_manager.py 139 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gpcode_manager.py 261 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 110 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 116 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 67 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/safe_count_manager.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
limit_up_time_manager.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ocr/ocr_server.py 135 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ocr/ocr_util.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 159 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/hot_block_data_process.py 64 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths_industry_util.py 40 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_factor.py 78 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_gui.py 277 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/win32_util.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -14,15 +14,17 @@
S_CANCEL_SECOND_RATE = 0.59
S_CANCEL_THIRD_RATE = 0.49
# s撤守护时间
S_CANCEL_EXPIRE_TIME = 30
S_CANCEL_EXPIRE_TIME = 60
# H撤比例
H_CANCEL_FIRST_RATE = 0.69
H_CANCEL_SECOND_RATE = 0.59
H_CANCEL_THIRD_RATE = 0.49
H_CANCEL_MIN_MONEY = 10000000
H_CANCEL_MIN_COUNT = 30
H_CANCEL_MIN_COUNT = 40
H_CANCEL_MIN_BIG_NUM_COUNT = 3
# L2监控的最低金额
L2_MIN_MONEY = 500000
# 每个L2设备的代码数量
L2_CODE_COUNT_PER_DEVICE = 8
data_export_util.py
@@ -196,6 +196,6 @@
if __name__ == "__main__":
    codes = ["002970"]
    codes = ["600647"]
    for code in codes:
        export_l2_excel(code)
gpcode_first_screen_manager.py
New file
@@ -0,0 +1,139 @@
"""
首板筛票机制
"""
# 设置首板未筛选的目标票
import json
import tool
from db import redis_manager
from third_data import hot_block_data_process
__redisManager = redis_manager.RedisManager(0)
def __get_redis():
    return __redisManager.getRedis()
# 保存首板票的数据
# 1.首次涨停时间
# 2.最近涨停时间
# 3.首次炸开时间
# 4.最近炸开时间
# 5.是否已经涨停
def __save_first_code_data(code, data):
    __redisManager.getRedis().setex(f"first_code_data-{code}", tool.get_expire(), json.dumps(data))
def __get_first_code_data(code):
    val = __get_redis().get(f"first_code_data-{code}")
    if val is None:
        return None
    return json.loads(val)
# 添加进首板未筛选票
def __add_first_no_screen_codes(codes):
    redis = __redisManager.getRedis()
    if codes:
        for code in codes:
            redis.sadd("first_no_screen_codes", code)
        redis.expire("first_no_screen_codes", tool.get_expire())
def __remove_first_no_screen_codes(codes):
    redis = __redisManager.getRedis()
    if codes:
        for code in codes:
            redis.srem("first_no_screen_codes", code)
def __get_first_no_screen_codes():
    codes = __get_redis().smembers("first_no_screen_codes")
    if not codes:
        return set()
    return codes
# 处理ticks数据
def process_ticks(prices):
    for price in prices:
        code = price["code"]
        time_ = price["time"]
        old_data = __get_first_code_data(code)
        if old_data is None:
            continue
        limit_up = price["limit_up"]
        last_limit_up = old_data[4]
        if not last_limit_up and limit_up:
            # 涨停
            old_data[1] = time_
        elif last_limit_up and not limit_up:
            # 炸开
            if not old_data[2]:
                old_data[2] = time_
            old_data[3] = time_
        old_data[4] = limit_up
        __save_first_code_data(code, old_data)
def set_target_no_screen_codes(codes):
    old_codes = get_target_no_screen_codes()
    if old_codes is None:
        old_codes = set()
    codes_set = set(codes)
    if old_codes == codes_set:
        return set()
    del_codes = old_codes - codes_set
    add_codes = codes_set - old_codes
    if del_codes:
        __remove_first_no_screen_codes(del_codes)
    if add_codes:
        # 添加进首板未选票
        __add_first_no_screen_codes(add_codes)
    return add_codes
# 获取首板未筛选的目标票
def get_target_no_screen_codes():
    return __get_first_no_screen_codes()
# 是否需要加入首板
def is_need_add_to_first(code):
    # 被禁止交易的票不能加入首板
    # 15个交易日的最低价与当前价比较,涨幅大于等于50%的需要剔除
    #
    pass
# 是否需要从L2监控卡位移除
# now_rate 当前涨幅
# is_limit_up 是否涨停
# limit_up_time 首次涨停时间
# latest_open_time 最近一次开板时间
def need_remove_from_l2_watch(code, now_rate, is_limit_up, limit_up_time, latest_open_time):
    if is_limit_up:
        # 处理当前是涨停状态
        if tool.trade_time_sub(tool.get_now_time_str(), "11:30:00") > 0:
            # 11:30 过后,10:30就涨停了的票,中途还未炸过的就需要剔除
            if latest_open_time is None and tool.trade_time_sub(limit_up_time, "10:30:00") <= 0:
                return True, "早上10点30前就涨停了,且11点30前未炸板"
    else:
        # 处理当前不是涨停状态
        if latest_open_time is not None:
            open_time_seconds = tool.trade_time_sub(tool.get_now_time_str(), latest_open_time)
            if open_time_seconds > 60 * 60:
                return True, "炸板后,60分钟内都未回封"
        if now_rate <= 6:
            return True, "炸板后,涨幅小于6%"
    # TODO 是否有同概念的票已经买入成功
    blocks = hot_block_data_process.get_code_blocks(code)
    if blocks and len(blocks) == 1:
        codes = hot_block_data_process.get_block_codes(blocks[0])
        if codes:
            pass
    return False, "首板炸开后,涨幅≤6%"
gpcode_manager.py
@@ -6,11 +6,144 @@
import time
import client_manager
import constant
from db import redis_manager
import tool
import decimal
__redisManager = redis_manager.RedisManager(0)
class CodesNameManager:
    redisManager = redis_manager.RedisManager(0)
    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()
    @classmethod
    def list_code_name_dict(cls):
        dict_ = {}
        val = cls.list_first_code_name_dict()
        if val is not None:
            for k in val:
                dict_[k] = val[k]
        val = cls.list_second_code_name_dict()
        if val is not None:
            for k in val:
                dict_[k] = val[k]
        return dict_
    @classmethod
    def list_first_code_name_dict(cls):
        val = cls.__get_redis().get("gp_list_names_first")
        if val is not None:
            val = json.loads(val)
            return val
        return None
    @classmethod
    def list_second_code_name_dict(cls):
        val = cls.__get_redis().get("gp_list_names")
        if val is not None:
            val = json.loads(val)
            return val
        return None
    @classmethod
    def get_first_code_name(cls, code):
        val = cls.__get_redis().get("gp_list_names_first")
        if not val:
            return None
        val = json.loads(val)
        for k in val:
            if val[k] == code:
                return k
        return None
    @classmethod
    def get_second_code_name(cls, code):
        val = cls.__get_redis().get("gp_list_names")
        if not val:
            return None
        val = json.loads(val)
        for k in val:
            if val[k] == code:
                return k
    @classmethod
    def get_first_name_code(cls, name):
        val = cls.__get_redis().get("gp_list_names_first")
        if not val:
            return None
        val = json.loads(val)
        return val.get(name)
    @classmethod
    def get_second_name_code(cls, name):
        val = cls.__get_redis().get("gp_list_names")
        if not val:
            return None
        val = json.loads(val)
        return val.get(name)
    # 设置首板代码名称
    @classmethod
    def set_first_code_names(cls, datas):
        cls.__get_redis().set("gp_list_names_first", json.dumps(datas))
    # 设置二板代码名称
    @classmethod
    def set_second_code_names(cls, datas):
        cls.__get_redis().set("gp_list_names", json.dumps(datas))
    # 删除首板代码名称
    @classmethod
    def clear_first_code_names(cls):
        cls.__get_redis().delete("gp_list_names_first")
    # 设置二板代码名称
    @classmethod
    def clear_second_code_names(cls):
        cls.__get_redis().delete("gp_list_names")
# 首板代码管理
class FirstCodeManager:
    redisManager = redis_manager.RedisManager(0)
    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()
    # 加入首板历史记录
    @classmethod
    def add_record(cls, codes):
        for code in codes:
            cls.__get_redis().sadd("first_code_record", code)
        cls.__get_redis().expire("first_code_record", tool.get_expire())
    @classmethod
    def is_in_first_record(cls, code):
        if cls.__get_redis().sismember("first_code_record", code):
            return True
        else:
            return False
    # 加入首板涨停过代码集合
    @classmethod
    def add_limited_up_record(cls, codes):
        for code in codes:
            cls.__get_redis().sadd("first_code_limited_up_record", code)
        cls.__get_redis().expire("first_code_limited_up_record", tool.get_expire())
    # 是否涨停过
    @classmethod
    def is_limited_up(cls, code):
        if cls.__get_redis().sismember("first_code_limited_up_record", code):
            return True
        else:
            return False
def __parse_codes_data(code_datas):
@@ -28,64 +161,106 @@
    return codes, name_codes
# -------------------------------二板代码管理---------------------------------
def set_gp_list(code_datas):
    codes, name_codes = __parse_codes_data(code_datas)
    redis_instance = __redisManager.getRedis()
    # 删除之前的
    redis_instance.delete("gp_list")
    redis_instance.delete("gp_list_names")
    CodesNameManager.clear_second_code_names()
    for d in codes:
        redis_instance.sadd("gp_list", d)
    redis_instance.set("gp_list_names", json.dumps(name_codes))
    CodesNameManager.set_second_code_names(name_codes)
# 新增代码
def add_gp_list(code_datas):
    if len(code_datas) > 200:
        raise Exception("不能超过200个数据")
    redis_instance = __redisManager.getRedis()
    codes, name_codes = __parse_codes_data(code_datas)
    for d in codes:
        redis_instance.sadd("gp_list", d)
    old_name_codes = get_name_codes()
    old_name_codes = CodesNameManager.list_second_code_name_dict()
    if old_name_codes is None:
        old_name_codes = name_codes
    else:
        for key in name_codes:
            old_name_codes[key] = name_codes[key]
    redis_instance.set("gp_list_names", json.dumps(old_name_codes))
    CodesNameManager.set_second_code_names(old_name_codes)
# -------------------------------首板代码管理-------------------------------
# 添加首板代码
# code_datas 掘金返回的数据
def set_first_gp_codes_with_data(code_datas):
    redis_instance = __redisManager.getRedis()
    codes, name_codes = __parse_codes_data(code_datas)
    codes_set = set()
    for code in codes:
        codes_set.add(code)
    old_codes_set = redis_instance.smembers("gp_list_first")
    if old_codes_set is None:
        old_codes_set = set()
    del_set = old_codes_set - codes_set
    add_codes = codes_set - old_codes_set
    for code in add_codes:
        redis_instance.sadd("gp_list_first", code)
    for code in del_set:
        redis_instance.srem("gp_list_first", code)
    redis_instance.expire("gp_list_first", tool.get_expire())
    old_name_codes = CodesNameManager.list_first_code_name_dict()
    if old_name_codes is None:
        old_name_codes = name_codes
    else:
        for key in name_codes:
            old_name_codes[key] = name_codes[key]
    CodesNameManager.set_first_code_names(old_name_codes)
# 移除首板代码
def remove_first_gp_code(codes):
    redis_instance = __redisManager.getRedis()
    for code in codes:
        redis_instance.srem("gp_list_first", code)
# 获取首板代码
def get_first_gp_codes():
    redis_instance = __redisManager.getRedis()
    return redis_instance.smembers("gp_list_first")
# 是否在首板里面
def is_in_first_gp_codes(code):
    redis_instance = __redisManager.getRedis()
    return redis_instance.sismember("gp_list_first", code)
# 获取名称对应的代码
def get_name_code(name):
    redis_instance = __redisManager.getRedis()
    val = redis_instance.get("gp_list_names")
    if not val:
        return None
    val = json.loads(val)
    return val.get(name)
    code = CodesNameManager.get_second_name_code(name)
    if code is not None:
        return code
    code = CodesNameManager.get_first_name_code(name)
    return code
def get_code_name(code):
    redis_instance = __redisManager.getRedis()
    val = redis_instance.get("gp_list_names")
    if not val:
        return None
    val = json.loads(val)
    for key in val:
        if val[key] == code:
            return key
    return None
    name = CodesNameManager.get_second_code_name(code)
    if name is not None:
        return name
    name = CodesNameManager.get_first_code_name(code)
    return name
def get_name_codes():
    redis_instance = __redisManager.getRedis()
    val = redis_instance.get("gp_list_names")
    if not val:
        return None
    val = json.loads(val)
    return val
    return CodesNameManager.list_code_name_dict()
# 涨停犁碑坳
# 涨停数据保存
def set_limit_up_list(gpset):
    if gpset is None:
        return
@@ -108,16 +283,26 @@
def rm_gp(code):
    redis_instance = __redisManager.getRedis()
    redis_instance.srem("gp_list", code)
    remove_first_gp_code([code])
def is_in_gp_pool(code):
    redis_instance = __redisManager.getRedis()
    return redis_instance.sismember("gp_list", code)
    return redis_instance.sismember("gp_list", code) or is_in_first_gp_codes(code)
def get_gp_list():
    redis_instance = __redisManager.getRedis()
    return redis_instance.smembers("gp_list")
    codes = redis_instance.smembers("gp_list")
    first_codes = get_first_gp_codes()
    return set.union(codes, first_codes)
# 获取二板代码
def get_second_gp_list():
    redis_instance = __redisManager.getRedis()
    codes = redis_instance.smembers("gp_list")
    return codes
def get_gp_list_with_prefix(data=None):
@@ -263,6 +448,14 @@
    redis_instance.expire(key, tool.get_expire())
# 清除所有监听代码
def clear_listen_codes():
    redis_instance = __redisManager.getRedis()
    keys = redis_instance.keys("listen_code-*-*")
    for key in keys:
        redis_instance.setex(key, tool.get_expire(), "")
# 获取可以操作的位置
def get_can_listen_pos(client_id=0):
    client_ids = []
@@ -278,8 +471,10 @@
        random.shuffle(keys)
        codes = []
        for key in keys:
            index = key.split("-")[-1]
            if int(index) + 1 > constant.L2_CODE_COUNT_PER_DEVICE:
                continue
            result = redis_instance.get(key)
            if result is None or len(result) == 0:
                return client_id, int(key.replace("listen_code-{}-".format(client_id), ""))
            else:
@@ -364,8 +559,6 @@
if __name__ == '__main__':
    _start = time.time()
    redis_instance = __redisManager.getRedis()
    val = redis_instance.get("code_listen_pos-{}".format("603786"))
    print(json.loads(val))
    print((time.time() - _start) * 1000)
    print(get_code_name("603042"))
    print(get_name_code("华脉科技"))
    print(get_name_codes())
gui.py
@@ -16,6 +16,7 @@
import settings
from juejin import JueJinManager
from l2_code_operate import L2CodeOperate
from trade import l2_trade_util
from trade.l2_trade_factor import L2TradeFactorUtil
from ocr import ocr_server
@@ -64,8 +65,7 @@
def createOCRServer():
    print("create OCRServer")
    laddr = "", 9002
    tcpserver = ocr_server.MyThreadingTCPServer(laddr, ocr_server.MyBaseRequestHandle)
    tcpserver = ocr_server.run("", 9002)
    tcpserver.serve_forever()
@@ -90,6 +90,7 @@
        self.codeActualPriceProcessor = CodeActualPriceProcessor()
        # L2显示
        self.l2_codes = {}
        self.selected_client = {}
        # 获取l2的客户端列表
        clients = authority.get_l2_clients()
        for client_id in clients:
@@ -292,36 +293,36 @@
                index += 1
            table.redraw()
        start_y = 160
        btn = Button(frame, text="刷新收盘价", command=refresh_close_price_data)
        btn.place(x=5, y=150)
        btn.place(x=5, y=start_y)
        sv_num = StringVar(value="获取到收盘价数量:未知")
        cl = Label(frame, textvariable=sv_num, bg="#DDDDDD", fg="#666666")
        cl.place(x=5, y=180)
        cl.place(x=5, y=start_y + 30)
        btn = Button(frame, text="重新获取收盘价", command=re_get_close_price)
        btn.place(x=150, y=150)
        btn.place(x=130, y=start_y)
        btn = Button(frame, text="今日涨停", command=get_limit_up_codes_win)
        btn.place(x=300, y=150)
        btn.place(x=250, y=start_y)
        trade_win_datas = []
        # draw_trade_buy_win(360, 140)
        table_width = 300
        table_height = 90
        table_height = 95
        _frame = Frame(frame, {"height": table_height, "width": table_width, "bg": "#DDDDDD"})
        table = tkintertable.TableCanvas(_frame, data={"row0": {'代码': '', '涨幅': '', '窗口句柄': ''}}, read_only=True,
                                         width=table_width, height=table_height, thefont=('微软雅黑', 9), cellwidth=100,
                                         rowheaderwidth=20)
        table.show()
        _frame.place(x=380-12, y=130)
        _frame.place(x=450, y=start_y)
        refresh_trade_buy_win_data()
        refresh_close_price_data()
        btn = Button(frame, text="刷新", command=refresh_trade_buy_win_data,height=1)
        btn.place(x=730-12, y=130)
        btn = Button(frame, text="刷新", command=refresh_trade_buy_win_data, height=1)
        btn.place(x=450 - 35, y=start_y)
        def re_distribute_buy_win():
            try:
@@ -332,7 +333,7 @@
                showerror("分配出错", str(e))
        btn = Button(frame, text="重新分配窗口", command=re_distribute_buy_win, height=1)
        btn.place(x=730-12, y=165)
        btn.place(x=450 - 83, y=start_y + 30)
    # 绘制交易状态
    def __draw_trade_state(self, frame):
@@ -356,6 +357,20 @@
                cl_codes.configure(text="{}/{}".format(juejin_length, codes_length), foreground="#008000")
            except Exception as e:
                pass
            # 获取板块状态
            try:
                ths_dead = client_manager.getTHSState(7)
                if ths_dead is None:
                    cl_block.configure(text="离线", foreground="#FF7F27")
                elif ths_dead:
                    normal = False
                    cl_block.configure(text="离线", foreground="#FF7F27")
                else:
                    cl_block.configure(text="在线", foreground="#008000")
            except:
                pass
            try:
                codes = self.thsBuy1VolumnManager.get_current_codes()
@@ -384,6 +399,9 @@
            except:
                pass
            # 获取有效的L2客户端数量
            l2_client_count = client_manager.getValidL2Clients()
            if len(l2_client_count) < 2:
@@ -403,7 +421,7 @@
                    pass
                time.sleep(2)
        start_y = 230
        start_y = 225
        btn = Button(frame, text="刷新状态", command=refresh_data)
        btn.place(x=10, y=start_y)
@@ -422,24 +440,29 @@
        cl_queue.place(x=100, y=y_)
        cl = Label(frame, text="交易窗口状态:", bg="#DDDDDD")
        cl.place(x=200, y=y_)
        cl.place(x=170, y=y_)
        cl_win = Label(frame, text="未知", bg="#DDDDDD")
        cl_win.place(x=300, y=y_)
        cl_win.place(x=270, y=y_)
        cl = Label(frame, text="板块状态:", bg="#DDDDDD")
        cl.place(x=320, y=y_)
        cl_block = Label(frame, text="未知", bg="#DDDDDD")
        cl_block.place(x=380, y=y_)
        cl = Label(frame, text="掘金代码回调数量:", bg="#DDDDDD")
        cl.place(x=350, y=y_)
        cl.place(x=300, y=y_ + 20)
        cl_codes = Label(frame, text="未知", bg="#DDDDDD")
        cl_codes.place(x=450, y=y_)
        cl_codes.place(x=410, y=y_ + 20)
        cl = Label(frame, text="买1代码数量:", bg="#DDDDDD")
        cl.place(x=500, y=y_)
        cl.place(x=10, y=y_ + 20)
        cl_buy_1 = Label(frame, text="未知", bg="#DDDDDD")
        cl_buy_1.place(x=580, y=y_)
        cl_buy_1.place(x=10 + 90, y=y_ + 20)
        cl = Label(frame, text="现价代码数量:", bg="#DDDDDD")
        cl.place(x=620, y=y_)
        cl.place(x=170, y=y_ + 20)
        cl_price_count = Label(frame, text="未知", bg="#DDDDDD")
        cl_price_count.place(x=700, y=y_)
        cl_price_count.place(x=170 + 100, y=y_ + 20)
        refresh_data()
        # 添加更新线程
@@ -487,8 +510,7 @@
                    else:
                        code_labels[client_id][i].configure(foreground="#999999")
        def check(event):
            client = (event.widget["command"])
        def check(client):
            msg_list = []
            try:
                result = get_client_env_state(client)
@@ -568,6 +590,26 @@
        def set_accept_l2():
            settings.set_accept_l2(accept_l2.get())
        def pop_menu(event):
            self.selected_client = event.widget["command"]
            menu.post(event.x_root, event.y_root)
        def ths_test_speed():
            if self.selected_client is None:
                showwarning("警告", "未选中客户端")
                return
            try:
                server.repair_ths_main_site(self.selected_client)
                showinfo("提示", "同花顺测速完成")
            except Exception as e:
                showerror("错误", str(e))
        def check_env():
            if self.selected_client is None:
                showwarning("警告", "未选中客户端")
                return
            check(self.selected_client)
        width = 800
        height = 290
        frame = Frame(root, {"height": height, "width": width, "bg": "#DDDDDD"})
@@ -598,18 +640,26 @@
        code_sv_map = {}
        code_labels = {}
        client_state = {}
        # 右键菜单
        menu = Menu(frame,
                    tearoff=False,
                    # bg="black",
                    )
        menu.add_command(label="环境检测", command=check_env)
        menu.add_command(label="同花顺测速", command=ths_test_speed)
        for key in self.l2_codes:
            client_lb = Label(frame, text="设备:{}".format(key), background="#DDDDDD")
            client_lb.place(x=38, y=40 + l2_client_count * 30)
            btn = Button(frame, text="检测", command=key)
            btn.bind('<Button-1>', check)
            btn.bind('<Button-3>', lambda event: pop_menu(event))
            btn.place(x=5, y=35 + l2_client_count * 30)
            client_state_lb = Label(frame, text="(未知)", background="#DDDDDD", font=('微软雅黑', 8))
            client_state_lb = Label(frame, text="(未知)", padx=0, pady=0, background="#DDDDDD", font=('微软雅黑', 8))
            client_state_lb.place(x=80, y=40 + l2_client_count * 30)
            client_state[key] = client_state_lb
            code_sv_map[key] = []
            code_labels[key] = []
            for i in range(0, 8):
@@ -883,6 +933,11 @@
            self.gs_gui_pipe.send(json.dumps({"type": "clear_l2", "data": {"code": code}}))
            pass
        # 禁止代码
        def forbidden_code(code_):
            l2_trade_util.forbidden_trade(code_)
            showinfo("提示","禁止成功")
        frame = Frame(root, {"height": 280, "width": 300, "bg": "#DDDDDD"})
        frame.grid(row=2, column=2, rowspan=2, pady=5)
        btntext = StringVar()
@@ -927,6 +982,9 @@
        btn = Button(frame, text="清空l2数据", command=lambda: clear_l2(code.get()))
        btn.place(x=150, y=130)
        btn = Button(frame, text="禁止交易", command=lambda: forbidden_code(code.get()))
        btn.place(x=230, y=130)
        # 交易按钮
        btn = Button(frame, textvariable=btntext, command=startJueJinGui)
        btn.place(x=10, y=160)
juejin.py
@@ -72,6 +72,12 @@
    # 载入量
    global_data_loader.load_volumn()
    # 9点25之前删除所有代码
    if tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") <= 0:
        gpcode_manager.clear_listen_codes()
    # TODO 删除所有首板代码
# 每日初始化
def everyday_init():
@@ -152,8 +158,8 @@
    for c in clients:
        for i in range(0, 8):
            gpcode_manager.init_listen_code_by_pos(int(c), i)
    codes = gpcode_manager.get_gp_list();
    result = JueJinManager.get_gp_latest_info(codes);
    codes = gpcode_manager.get_gp_list()
    result = JueJinManager.get_gp_latest_info(codes)
    for item in result:
        sec_level = item['sec_level']
        symbol = item['symbol']
@@ -255,7 +261,11 @@
# 获取到现价
def accept_prices(prices):
    # 获取首板代码
    first_codes = gpcode_manager.get_first_gp_codes()
    print("价格代码数量:", len(prices))
    __actualPriceProcessor.save_current_price_codes_count(len(prices))
    # 采集的代码数量不对
    if len(gpcode_manager.get_gp_list()) - len(prices) > 2:
@@ -278,6 +288,8 @@
            pricePre = gpcode_manager.get_price_pre(code)
            if pricePre is not None:
                rate = round((price - pricePre) * 100 / pricePre, 2)
                if first_codes and code in first_codes:
                    rate = rate / 2
                if rate >= 0:
                    # 暂存涨幅为正的代码
                    _code_list.append((rate, code))
@@ -297,8 +309,9 @@
                except Exception as e:
                    logging.exception(e)
        # -------------------------------处理交易位置分配---------------------------------
        # 排序
        new_code_list = sorted(_code_list, key=lambda e: e.__getitem__(0), reverse=True)
        new_code_list = sorted(_code_list, key=lambda e: (e.__getitem__(0), e.__getitem__(1)), reverse=True)
        # 预填充下单代码
        _buy_win_codes = []
        for d in new_code_list:
@@ -311,40 +324,46 @@
            logging.exception(e)
            pass
        # -------------------------------处理L2监听---------------------------------
        client_ids = client_manager.getValidL2Clients()
        # 最多填充的代码数量
        max_count = len(client_ids) * 8
        max_count = len(client_ids) * constant.L2_CODE_COUNT_PER_DEVICE
        if max_count == 0:
            max_count = 8
            max_count = constant.L2_CODE_COUNT_PER_DEVICE
        _delete_list = []
        for item in new_code_list:
            if l2_trade_util.is_in_forbidden_trade_codes(item[1]) or item[0] < 0:
                _delete_list.append(item)
        for item in _delete_list:
            new_code_list.remove(item)
        # 截取前几个代码填充
        add_list = new_code_list[:max_count]
        # 后面的代码全部删除
        _delete_list.extend(new_code_list[max_count:])
        add_code_list = []
        del_list = []
        del_code_list = []
        for d in add_list:
            add_code_list.append(d[1])
        for d in _delete_list:
            del_list.append(d[1])
            del_code_list.append(d[1])
        # 后面的代码数量
        # 先删除应该删除的代码
        for code in del_list:
        for code in del_code_list:
            if gpcode_manager.is_listen_old(code):
                # 判断是否在监听里面
                L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
        # 增加应该增加的代码
        for code in add_code_list:
            if not gpcode_manager.is_listen_old(code):
                if not l2_trade_util.is_in_forbidden_trade_codes(code):
                    L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
            else:
                if l2_trade_util.is_in_forbidden_trade_codes(code):
                    L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
                L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
        print(add_code_list, del_list)
        print(add_code_list, del_code_list)
def on_bar(context, bars):
@@ -393,8 +412,8 @@
        t1.setDaemon(True)
        t1.start()
    @staticmethod
    def get_gp_latest_info(codes):
    @classmethod
    def get_gp_latest_info(cls, codes):
        account_id, s_id, token = getAccountInfo()
        symbols = gpcode_manager.get_gp_list_with_prefix(codes)
        gmapi.set_token(token)
@@ -402,8 +421,8 @@
        print(data)
        return data
    @staticmethod
    def get_now_price(codes):
    @classmethod
    def get_now_price(cls, codes):
        data = JueJinManager.get_gp_current_info(codes)
        prices = []
        for item in data:
@@ -413,8 +432,8 @@
        return prices
    # 获取代码的涨幅
    @staticmethod
    def get_codes_limit_rate(codes):
    @classmethod
    def get_codes_limit_rate(cls, codes):
        datas = JueJinManager.get_gp_latest_info(codes)
        pre_price_dict = {}
        for data in datas:
@@ -434,14 +453,41 @@
        f_results.reverse()
        return f_results
    @staticmethod
    def get_gp_current_info(codes):
    @classmethod
    def get_history_tick_n(cls, code, count):
        account_id, s_id, token = getAccountInfo()
        symbols = gpcode_manager.get_gp_list_with_prefix([code])
        gmapi.set_token(token)
        results = gmapi.history_n(symbol=symbols[0], frequency="1d", count=count)
        return results
    @classmethod
    def get_lowest_price_rate(cls, code, count):
        datas = cls.get_history_tick_n(code, count)
        low_price = datas[0]["close"]
        for data in datas:
            if low_price > data["close"]:
                low_price = data["close"]
        return (datas[-1]["close"] - low_price) / low_price
    @classmethod
    def get_gp_current_info(cls, codes):
        account_id, s_id, token = getAccountInfo()
        symbols = gpcode_manager.get_gp_list_with_prefix(codes)
        gmapi.set_token(token)
        data = gmapi.current(symbols=",".join(symbols))
        print(data)
        return data
    @classmethod
    def get_gp_codes_names(cls, codes):
        datas = cls.get_gp_latest_info(codes)
        results = {}
        for data in datas:
            code = data["symbol"].split(".")[1]
            code_name = data['sec_name']
            results[code] = code_name
        return results
    def start(self):
        account_id, s_id, token = getAccountInfo()
@@ -507,9 +553,30 @@
    return _fresult
# 获取近90天的最大量与最近的量
def get_volumn(code) -> object:
    end = datetime.datetime.now()
    account_id, s_id, token = getAccountInfo()
    gmapi.set_token(token)
    gmapi.set_account_id(account_id)
    results = gmapi.history_n(symbol=gpcode_manager.get_gp_list_with_prefix([code])[0], frequency="1d",
                              count=60,
                              fields="volume",
                              end_time=end)
    if not results:
        return None
    yes_volume = results[-1]["volume"]
    max_volume = results[0]["volume"]
    for result in results:
        volumn = int(result["volume"])
        if volumn > max_volume:
            max_volume = volumn
    return (max_volume, yes_volume)
# 根据涨幅高低分配交易窗口
def distribute_buy_win():
    if tool.trade_time_sub(tool.get_now_time_str(),"09:30:00") > 0:
    if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") > 0:
        raise Exception("只能9:30之前重新分配窗口")
    datas = JueJinManager.get_codes_limit_rate(gpcode_manager.get_gp_list())
@@ -519,4 +586,5 @@
if __name__ == '__main__':
    distribute_buy_win()
    print(get_volumn("002115"))
    print(JueJinManager.get_lowest_price_rate("002713", 15))
l2/cancel_buy_strategy.py
@@ -109,15 +109,17 @@
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
                    need_cancel=True):
        if start_index == 375:
        if start_index >= 217:
            print("进入调试")
        # 只守护30s
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > constant.S_CANCEL_EXPIRE_TIME:
        if tool.trade_time_sub(total_data[start_index]["val"]["time"],
                               total_data[buy_exec_index]["val"]["time"]) > constant.S_CANCEL_EXPIRE_TIME:
            return False, None
        l2_log.cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        logger_l2_s_cancel.debug(f"code-{code} S级是否需要撤单,数据范围:{start_index}-{end_index}")
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) >  constant.S_CANCEL_EXPIRE_TIME:
        if tool.trade_time_sub(total_data[end_index]["val"]["time"],
                               total_data[buy_exec_index]["val"]["time"]) > constant.S_CANCEL_EXPIRE_TIME:
            # 结束位置超过了执行位置30s,需要重新确认结束位置
            for i in range(end_index, start_index - 1, -1):
                if total_data[end_index]["val"]["time"] != total_data[i]["val"]["time"]:
@@ -258,8 +260,6 @@
        val = json.loads(val)
        return val[0], val[1], val[2], val[3], val[4]
    # 保存成交进度
    @classmethod
    def __save_traded_progress(cls, code, origin_process_index, latest_process_index):
@@ -298,7 +298,7 @@
    @classmethod
    def __clear_data(cls, code):
        ks = ["h_cancel_compute_data-{}".format(code), f"h_cancel_watch_indexs_exec-{code}",
              f"h_cancel_watch_indexs-{code}",f"h_cancel_traded_progress-{code}"]
              f"h_cancel_watch_indexs-{code}", f"h_cancel_traded_progress-{code}"]
        for key in ks:
            cls.__getRedis().delete(key)
@@ -306,12 +306,12 @@
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map):
        time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"],
                                         total_data[buy_exec_index]["val"]["time"])
        if time_space >=  constant.S_CANCEL_EXPIRE_TIME - 1:
        if time_space >= constant.S_CANCEL_EXPIRE_TIME - 1:
            # 开始计算需要监控的单
            cls.__compute_watch_indexs_after_exec(code, buy_exec_index, total_data, local_today_num_operate_map)
        # 守护30s以外的数据
        if time_space <=  constant.S_CANCEL_EXPIRE_TIME:
        if time_space <= constant.S_CANCEL_EXPIRE_TIME:
            return False, None
            # 获取成交进度
        origin_progress_index, latest_progress_index = cls.__get_traded_progress(code)
@@ -386,7 +386,9 @@
    def set_trade_progress(cls, code, data_time, buy_exec_index, index, total_data, local_today_num_operate_map):
        cls.__tradeBuyQueue.set_traded_index(code, index)
        # 如果获取时间与执行时间小于29则不需要处理
        if buy_exec_index is None or buy_exec_index < 0 or tool.trade_time_sub(data_time, total_data[buy_exec_index]["val"]["time"]) < constant.S_CANCEL_EXPIRE_TIME - 1:
        if buy_exec_index is None or buy_exec_index < 0 or tool.trade_time_sub(data_time,
                                                                               total_data[buy_exec_index]["val"][
                                                                                   "time"]) < constant.S_CANCEL_EXPIRE_TIME - 1:
            return
        # 保存成交进度
        origin_index, latest_index = cls.__get_traded_progress(code)
@@ -494,6 +496,9 @@
        finished = False
        big_num_count = big_num_count_old
        total_count = total_count_old
        # H撤单
        MIN_H_COUNT = l2_trade_factor.L2TradeFactorUtil.get_h_cancel_min_count(code)
        for i in range(buy_exec_index, total_data[-1]["index"] + 1):
            if i <= process_index_old:
                continue
@@ -511,7 +516,7 @@
                    big_num_count += data["re"]
                # 判断是否达到阈值
                if total_count >= constant.H_CANCEL_MIN_COUNT and big_num_count >= constant.H_CANCEL_MIN_BIG_NUM_COUNT:  # and total_num >= threshold_num
                if total_count >= MIN_H_COUNT and big_num_count >= constant.H_CANCEL_MIN_BIG_NUM_COUNT:  # and total_num >= threshold_num
                    finished = True
                    l2_log.cancel_debug(code, "获取到H撤监听数据:{},计算截至位置:{}", json.dumps(list(watch_set)),
                                        total_data[-1]["index"])
@@ -814,7 +819,7 @@
                # 如果是减小项
                if val < 0:
                    # 当前量小于最大量的24%则需要取消
                    if exec_time_offset >=  constant.S_CANCEL_EXPIRE_TIME:
                    if exec_time_offset >= constant.S_CANCEL_EXPIRE_TIME:
                        if total_num <= min_volumn_big and max_buy1_volume * 0.24 > total_num:
                            cancel_index = i
                            cancel_msg = "封板额小于最高封板额的24% {}/{}".format(total_num, max_buy1_volume)
l2/l2_data_manager_new.py
@@ -172,8 +172,12 @@
                if not code_data_util.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # 加载历史数据
                l2.l2_data_util.load_l2_data(code)
                # 加载历史数据,返回数据是否正常
                is_normal = l2.l2_data_util.load_l2_data(code)
                if not is_normal:
                    print("历史数据异常:",code)
                    # 数据不正常需要禁止交易
                    l2_trade_util.forbidden_trade(code)
                # 纠正数据
                datas = l2.l2_data_util.L2DataUtil.correct_data(code, local_latest_datas.get(code), datas)
                _start_index = 0
@@ -296,9 +300,9 @@
            l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                "已下单-买1统计耗时")
            if constant.TEST:
                return None, ""
            return cancel_data, cancel_msg
            # 买1不会触发撤单
            return None, ""
            # return cancel_data, cancel_msg
        # S撤
        @dask.delayed
@@ -324,7 +328,9 @@
            _start_time = round(t.time() * 1000)
            try:
                b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(code, buy_exec_index, start_index,
                                                                                    end_index, total_data, local_today_num_operate_map.get(code))
                                                                                    end_index, total_data,
                                                                                    local_today_num_operate_map.get(
                                                                                        code))
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "H撤销比例触发阈值"
            except Exception as e:
@@ -413,7 +419,7 @@
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        __start_time = tool.get_now_timestamp()
        can, reason = cls.__can_buy(code)
        can, need_clear_data, reason = cls.__can_buy(code)
        __start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - __start_time, "最后判断是否能下单", force=True)
        # 删除虚拟下单
        if code in cls.unreal_buy_dict:
@@ -421,7 +427,7 @@
        if not can:
            l2_log.debug(code, "不可以下单,原因:{}", reason)
            if not reason.startswith("买1价不为涨停价"):
            if need_clear_data:
                # 中断买入
                trade_manager.break_buy(code, reason)
            return
@@ -472,9 +478,18 @@
        return True, ""
    # 是否可以买
    # 返回是否可以买,是否需要清除之前的买入信息,原因
    @classmethod
    def __can_buy(cls, code):
        __start_time = t.time()
        # 判断是否为首板代码
        is_first = gpcode_manager.FirstCodeManager.is_in_first_record(code)
        if is_first:
            # 首板代码且尚未涨停过的不能下单
            is_limited_up = gpcode_manager.FirstCodeManager.is_limited_up(code)
            if not is_limited_up:
                return False, True, "首板代码,且尚未涨停过"
        try:
            # 买1价格必须为涨停价才能买
            # buy1_price = cls.buy1PriceManager.get_price(code)
@@ -487,6 +502,10 @@
            #     return False, "买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price)
            # 从买入信号起始点到当前数据末尾的纯买手数与当前的卖1做比较,如果比卖1小则不能买入
            total_datas = local_today_datas[code]
            if total_datas[-1]["index"] + 1 > len(total_datas):
                return False, True, "L2数据错误"
            try:
                sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code)
                l2_log.buy_debug(code, "卖1信息为:({},{},{})", sell1_time, sell1_price, sell1_volumn)
@@ -505,30 +524,30 @@
                        elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                            buy_nums -= _val["num"] * total_datas[i]["re"]
                    if buy_nums < sell1_volumn * 0.49:
                        return False, "纯买量({})小于卖1量的49%{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time)
                        return False, True, "纯买量({})小于卖1量的49%{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time)
            except Exception as e:
                logging.exception(e)
            # 量比超过1.3的不能买
            volumn_rate = l2_trade_factor.L2TradeFactorUtil.get_volumn_rate_by_code(code)
            if volumn_rate >= 1.3:
                return False, "最大量比超过1.3不能买"
                return False, True, "最大量比超过1.3不能买"
            limit_up_time = limit_up_time_manager.get_limit_up_time(code)
            if limit_up_time is not None and l2.l2_data_util.L2DataUtil.get_time_as_second(
                    limit_up_time) >= l2.l2_data_util.L2DataUtil.get_time_as_second(
                "14:30:00"):
                return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
                return False, True, "14:55后涨停的不能买,涨停时间为{}".format(limit_up_time)
            # 同一板块中老二后面的不能买
            industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
            if industry is None:
                return True, "没有获取到行业"
                return True, True, "没有获取到行业"
            codes_index = industry_codes_sort.sort_codes(codes, code)
            if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
                # 当老大老二当前没涨停
                return False, "同一板块中老三,老四,...不能买"
                return False, True, "同一板块中老三,老四,...不能买"
            if cls.__codeActualPriceProcessor.is_under_water(code, total_datas[-1]["val"]["time"]):
                # 水下捞且板块中的票小于16不能买
@@ -540,7 +559,7 @@
                    # 获取老大的市值
                    for c in codes_index:
                        if codes_index.get(c) == 0 and global_util.zyltgb_map.get(code) > global_util.zyltgb_map.get(c):
                            return False, "水下捞,不是老大,且自由流通市值大于老大"
                            return False, True, "水下捞,不是老大,且自由流通市值大于老大"
            # 13:30后涨停,本板块中涨停票数<29不能买
            # if limit_up_time is not None:
@@ -576,7 +595,7 @@
            #     if global_util.industry_hot_num.get(industry) < 29:
            #         return False, "老二,本板块中涨停票数<29不能买"
            # 可以下单
            return True, None
            return True, False, None
        finally:
            l2_data_log.l2_time(code, round((t.time() - __start_time) * 1000), "是否可以下单计算")
@@ -672,8 +691,9 @@
            return None
        # 开始计算的位置
        start_process_index = min(buy_single_index, compute_start_index) if new_get_single else max(buy_single_index,
                                                                                                    compute_start_index)
        start_process_index = max(buy_single_index, compute_start_index)
        if new_get_single:
            start_process_index = buy_single_index
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, start_process_index,
@@ -859,6 +879,7 @@
        _start_time = t.time()
        total_datas = local_today_datas[code]
        is_first_code = gpcode_manager.FirstCodeManager.is_in_first_record(code)
        buy_nums = origin_num
        buy_count = origin_count
@@ -869,14 +890,15 @@
        # 目标手数
        threshold_num = round(threshold_money / (limit_up_price * 100))
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        # 目标订单数量
        threshold_count = cls.__buyL2SafeCountManager.get_safe_count(code)
        threshold_count = cls.__buyL2SafeCountManager.get_safe_count(code,is_first_code, place_order_count)
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        # 可以触发买,当有涨停买信号时才会触发买
        trigger_buy = True
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        if place_order_count > 3:
            place_order_count = 3
        # 间隔最大时间依次为:3,9,27,81
@@ -926,7 +948,8 @@
                    # 只统计59万以上的金额
                    # 涨停买撤
                    # 判断买入位置是否在买入信号之前
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, total_datas[i],
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code,
                                                                                                     total_datas[i],
                                                                                                     local_today_num_operate_map.get(
                                                                                                         code))
                    if buy_index is not None:
@@ -1039,8 +1062,8 @@
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2.l2_data_util.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2.l2_data_util.L2DataUtil.get_time_as_second(
            "14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
            "14:55:00"):
            return False, "14:55后涨停的不能买,涨停时间为{}".format(limit_up_time)
        # 同一板块中老二后面的不能买
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
l2/l2_data_util.py
@@ -49,6 +49,9 @@
        if datas is None:
            datas = []
        local_today_datas[code] = datas
        data_normal = True
        if datas and len(datas) < datas[-1]["index"] + 1:
            data_normal = False
        # 从数据库加载
        # datas = []
@@ -63,6 +66,8 @@
        # local_today_datas[code] = new_datas
        # 根据今日数据加载
        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
        return data_normal
    return True
# 将数据根据num-operate分类
@@ -488,6 +493,4 @@
if __name__ == "__main__":
    cha = [0, 2, 4]
    std_result = numpy.std(cha)
    print(std_result)
    print(load_l2_data("002235"))
l2/safe_count_manager.py
@@ -69,21 +69,24 @@
        self.__getRedis().delete(key)
    # 获取基础的安全笔数
    def __get_base_save_count(self, code):
        return l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code)
    def __get_base_save_count(self, code, is_first):
        return l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code, is_first)
    # 获取最后的安全笔数
    def get_safe_count(self, code):
    def get_safe_count(self, code, is_first_code, place_order_count=None):
        rate = self.__get_rate(code)
        # 第4次下单按第一次算
        if place_order_count and place_order_count >= 3:
            rate = 1
        print("--------------------------------")
        print("安全笔数比例:", rate)
        print("--------------------------------")
        count = self.__get_base_save_count(code)
        count, min_count, max_count = self.__get_base_save_count(code, is_first_code)
        count = round(count * rate)
        if count < 8:
            count = 8
        if count > 21:
            count = 21
        if count < min_count:
            count = min_count
        if count > max_count:
            count = max_count
        return count
    # 计算留下来的比例
l2_data_util.py
@@ -200,7 +200,8 @@
# 保存l2最新数据的大小
@async_call
# TODO 测试数据
# @async_call
def save_l2_latest_data_number(code, num):
    redis = l2_data_manager._redisManager.getRedis()
    redis.setex("l2_latest_data_num-{}".format(code), 3, num)
l2_trade_test.py
@@ -86,7 +86,7 @@
    @unittest.skip("跳过此单元测试")
    def test_trade(self):
        code = "002117"
        code = "002235"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
limit_up_time_manager.py
@@ -9,9 +9,6 @@
_redisManager = redis_manager.RedisManager(0)
def save_limit_up_time(code, time):
    _time = get_limit_up_time(code)
    if _time is None:
log.py
@@ -114,6 +114,10 @@
                   filter=lambda record: record["extra"].get("name") == "buy_win_distibute",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("first_code", "first_code_record"),
                   filter=lambda record: record["extra"].get("name") == "first_code_record",
                   rotation="00:00", compression="zip", enqueue=True)
    def get_path(self, dir_name, log_name):
        return "D:/logs/gp/{}/{}".format(dir_name, log_name) + ".{time:YYYY-MM-DD}.log"
@@ -155,7 +159,7 @@
logger_buy_win_distibute = __mylogger.get_logger("buy_win_distibute")
logger_first_code_record = __mylogger.get_logger("first_code_record")
class LogUtil:
@@ -363,7 +367,7 @@
if __name__ == '__main__':
    # logger_l2_h_cancel.info("test")
    # logger_l2_process_time.info("test123")
    codes = ["002946"]
    codes = ["002757"]
    for code in codes:
        export_logs(code)
ocr/ocr_server.py
@@ -1,56 +1,34 @@
import json
import logging
import socketserver
import socket
from http.server import BaseHTTPRequestHandler
import cv2
import ths_industry_util
from ocr import ocr_util
from ocr.ocr_util import OcrUtil
class MyTCPServer(socketserver.TCPServer):
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_juejin=None, pipe_ui=None):
        self.pipe_juejin = pipe_juejin  # 增加的参数
        self.pipe_ui = pipe_ui
        socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)
# 如果使用异步的形式则需要再重写ThreadingTCPServer
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
class OCRServer(BaseHTTPRequestHandler):
    ocr_temp_data = {}
    def setup(self):
        super().setup()
    def do_GET(self):
        path = self.path
        self.send_response(200)
        # 发给请求客户端的响应数据
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write("".encode())
    def handle(self):
        host = self.client_address[0]
        super().handle()  # 可以不调用父类的handler(),方法,父类的handler方法什么都没做
        # print("-------handler方法被执行----")
        # print(self.server)
        # print(self.request)  # 服务
        # print("客户端地址:", self.client_address)  # 客户端地址
        # print(self.__dict__)
        # print("- " * 30)
        # print(self.server.__dict__)
        # print("- " * 30)
        sk: socket.socket = self.request
        # 设置非阻塞
        sk.setblocking(False)
        data = bytes()
        while True:
            try:
                temp_data = sk.recv(1024)
                if not temp_data:
                    break
                data += temp_data
            except Exception as e:
                break
        _str = str(data, encoding="gbk")
        # print("OCR SERVER 内容:", _str[0:20], "......", _str[-150:-1])
        return_str = "OK"
    def do_POST(self):
        path = self.path
        params = self.__parse_request()
        params = params["data"]
        result_str = self.__process(params)
        self.__send_response(result_str)
    def __process(self, _str):
        return_str = ""
        try:
            data = ""
            try:
@@ -81,17 +59,84 @@
                                mat[r][c] = [datas[r * cols + c]]
                        # cv2.imwrite("D:/test.png", mat)
                        ocr_results = ocr_util.OcrUtil.ocr_with_key(mat, key)
                        if not ocr_results:
                            # 多重识别,防止识别出错
                            ocr_results = ocr_util.OcrUtil.ocr_num(mat, key)
                        # 图像识别
                        return_str = json.dumps({"code": 0, "data": {"datas": ocr_results}})
                    else:
                        return_str = json.dumps({"code": 2, "msg": "数据出错"})
                else:
                    return_str = json.dumps({"code": 1, "msg": "数据尚未上传完"})
            elif type == 101:
                data = data["data"]
                matId = data["matId"]
                index = data["index"]
                maxIndex = data["maxIndex"]
                cols = data["width"]
                rows = data["height"]
                datas = data["data"]
                if self.ocr_temp_data.get(matId) is None:
                    self.ocr_temp_data[matId] = []
                self.ocr_temp_data[matId].extend(datas)
                if maxIndex == index:
                    # 数据传输完成
                    datas = self.ocr_temp_data[matId]
                    if rows * cols == len(datas):
                        self.ocr_temp_data.pop(matId)
                        mat = cv2.numpy.zeros((rows, cols, 1), cv2.numpy.uint8)
                        for r in range(0, rows):
                            for c in range(0, cols):
                                mat[r][c] = [datas[r * cols + c]]
                        # cv2.imwrite("D:/test.png", mat)
                        ocr_results = ocr_util.OcrUtil.ocr_with_key(mat, ".")
                        code_name = ""
                        for res in ocr_results:
                            code_name += res[0]
                        # TODO 根据代码名称获取代码
                        code = ths_industry_util.get_code_by_name(code_name)
                        # 图像识别
                        return_str = json.dumps({"code": 0, "data": {"code": code}})
                    else:
                        return_str = json.dumps({"code": 2, "msg": "数据出错"})
                else:
                    return_str = json.dumps({"code": 1, "msg": "数据尚未上传完"})
        except Exception as e:
            logging.exception(e)
            if str(e).__contains__("json解析失败"):
                logging.error("OCR数据JSON解析解析失败")
                return_str = json.dumps({"code": -1, "msg": str(e)})
        sk.send(return_str.encode())
        return return_str
    def finish(self):
        super().finish()
    def __send_response(self, data):
        # 发给请求客户端的响应数据
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())
    def __parse_request(self):
        params = {}
        datas = self.rfile.read(int(self.headers['content-length']))
        _str = str(datas, encoding="gbk")
        # print(_str)
        start = 0
        while True:
            start = _str.find("Content-Disposition: form-data;", start + 1)
            if start <= 0:
                break
            name_start = start + len("Content-Disposition: form-data;")
            name_end = _str.find("\r\n\r\n", start)
            val_end = _str.find("------", name_end)
            key = _str[name_start:name_end].strip()[6:-1]
            val = _str[name_end:val_end].strip()
            params[key] = val
        return params
def run(addr, port):
    handler = OCRServer
    httpd = socketserver.TCPServer((addr, port), handler)
    print("HTTP server is at: http://%s:%d/" % (addr, port))
    httpd.serve_forever()
ocr/ocr_util.py
@@ -2,12 +2,14 @@
import time
import cv2
import easyocr
from cnocr import CnOcr
# 图像识别类
class OcrUtil:
    __ocr = CnOcr()
    reader = easyocr.Reader(['en'], gpu=False)
    @classmethod
    def ocr(cls, mat):
@@ -28,3 +30,23 @@
                                         (int(ps[2][0]), int(ps[2][1])), (int(ps[3][0]), int(ps[3][1]))]))
        print("识别时间", time.time() - start)
        return res_final
    @classmethod
    def ocr_num(cls, mat, key):
        start = time.time()
        res = cls.reader.readtext(mat, detail=1, text_threshold=0.6, mag_ratio=1.5)
        res_final = []
        if res:
            for r in res:
                text = r[1]
                if re.match(key, text):
                    ps = r[0]
                    res_final.append((text, [(int(ps[0][0]), int(ps[0][1])), (int(ps[1][0]), int(ps[1][1])),
                                             (int(ps[2][0]), int(ps[2][1])), (int(ps[3][0]), int(ps[3][1]))]))
        print("数字识别时间", time.time() - start)
        return res_final
if __name__ == "__main__":
    result = OcrUtil.ocr_num("D:/test1.png", "000977")
    print(result)
server.py
@@ -16,6 +16,7 @@
import data_process
import global_data_loader
import global_util
import gpcode_first_screen_manager
import gpcode_manager
import authority
import juejin
@@ -27,13 +28,13 @@
import ths_industry_util
import ths_util
import tool
from trade import trade_gui, trade_data_manager, trade_manager
from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util
import l2_code_operate
from code_data_util import ZYLTGBUtil
import l2.transaction_progress
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record
from trade.trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager
@@ -46,6 +47,9 @@
# 如果使用异步的形式则需要再重写ThreadingTCPServer
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass
# 首板tick级数据
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
@@ -61,6 +65,7 @@
    l2_trade_buy_queue_dict = {}
    tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
    last_time = {}
    first_tick_datas = []
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -104,10 +109,11 @@
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data(
                            _str)
                        l2_log.threadIds[code] = random.randint(0, 100000)
                        if channel == 0:
                            now_time = round(time.time() * 1000)
                            if self.last_time.get(channel) is not None:
                                #print("接受到L2的数据", channel, now_time - self.last_time.get(channel), "解析耗时",now_time - origin_start_time)
                                # print("接受到L2的数据", channel, now_time - self.last_time.get(channel), "解析耗时",now_time - origin_start_time)
                                pass
                            self.last_time[channel] = now_time
@@ -179,6 +185,7 @@
                                                            True)
                    except Exception as e:
                        logger_l2_error.exception(e)
                elif type == 1:
                    # 设置股票代码
                    data_list, is_add = data_process.parseGPCode(_str)
@@ -222,6 +229,101 @@
                        # 获取是否有涨停时间
                        # if limit_up_time_manager.get_limit_up_time(d["code"]) is None:
                        #     limit_up_time_manager.save_limit_up_time(d["code"], d["time"])
                elif type == 22:
                    try:
                        if int(tool.get_now_time_str().replace(":", "")) < int("092600"):
                            raise Exception('未到接受时间')
                        # 首板代码
                        dataList, is_add = data_process.parseGPCode(_str)
                        # {'code': '605300', 'limitUpPercent': '0009.99', 'price': '0020.14', 'time': '10:44:00', 'volume': '44529', 'volumeUnit': 2, 'zyltMoney': '0011.60', 'zyltMoneyUnit': 0}
                        limit_up_price_dict = {}
                        temp_codes = []
                        codes = []
                        tick_datas = []
                        if dataList:
                            for data in dataList:
                                code = data["code"]
                                codes.append(code)
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price is not None:
                                    limit_up_price_dict[code] = limit_up_price
                                else:
                                    temp_codes.append(code)
                                # data["price"]
                                tick_datas.append({"code": code, "price": data["price"], "volumn": data["volume"],
                                                   "volumnUnit": data["volumeUnit"]})
                        # 保存未筛选的首板代码
                        new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)
                        for code in new_add_codes:
                            if (not l2_trade_util.is_in_forbidden_trade_codes(code)) and juejin.JueJinManager.get_lowest_price_rate(code, 15) >= 0.3:
                                l2_trade_util.forbidden_trade(code)
                        if new_add_codes:
                            gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes))
                            # 加入首板历史记录
                            gpcode_manager.FirstCodeManager.add_record(new_add_codes)
                            logger_first_code_record.info("新增首板:{}",new_add_codes)
                            # 获取60天最大记录
                            for code in new_add_codes:
                                if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None:
                                    volumes = juejin.get_volumn(code)
                                    code_volumn_manager.set_histry_volumn(code,volumes[0],volumes[1])
                        if temp_codes:
                            # 获取涨停价
                            juejin.re_set_price_pres(temp_codes)
                            # 重新获取涨停价
                            for code in temp_codes:
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price is not None:
                                    limit_up_price_dict[code] = limit_up_price
                        # 保存自由流通股本
                        zyltgb_list = []
                        for data in dataList:
                            code = data["code"]
                            if code in global_util.zyltgb_map:
                                continue
                            zyltgb_list.append(
                                {"code": code, "zyltgb": data["zyltMoney"], "zyltgb_unit": data["zyltMoneyUnit"]})
                        if zyltgb_list:
                            ZYLTGBUtil.save_list(zyltgb_list)
                            global_data_loader.load_zyltgb()
                            # 保存现价
                        self.first_tick_datas.clear()
                        self.first_tick_datas.extend(tick_datas)
                        # 首板数据加工
                        prices = []
                        for data in dataList:
                            code = data["code"]
                            price = data["price"]
                            limit_up_time = data["time"]
                            if limit_up_time == "00:00:00":
                                limit_up_time = None
                            if code not in limit_up_price_dict:
                                continue
                            is_limit_up = abs(float(limit_up_price_dict[code]) - float(price)) < 0.01
                            # 纠正数据
                            if is_limit_up and limit_up_time is None:
                                limit_up_time = tool.get_now_time_str()
                            pricePre = gpcode_manager.get_price_pre(code)
                            rate = round((float(price) - pricePre) * 100 / pricePre, 1)
                            prices.append(
                                {"code": code, "time": limit_up_time, "rate": rate,
                                 "limit_up": is_limit_up})
                            if code in new_add_codes:
                                if is_limit_up:
                                    place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(
                                        code)
                                    if place_order_count == 0:
                                        trade_data_manager.placeordercountmanager.place_order(code)
                                    # 加入首板涨停
                                    gpcode_manager.FirstCodeManager.add_limited_up_record([code])
                        gpcode_first_screen_manager.process_ticks(prices)
                    except Exception as e:
                        pass
                elif type == 3:
                    # 交易成功信息
@@ -277,7 +379,17 @@
                elif type == 4:
                    # 行业代码信息
                    dataList = data_process.parseList(_str)
                    ths_industry_util.save_industry_code(dataList)
                    codes = []
                    for datas in dataList:
                        for d in datas:
                            name = ths_industry_util.get_name_by_code(d['code'])
                            if not name or name == 'None':
                                codes.append(d["code"])
                    # 根据代码获取代码名称
                    codes_name = {}
                    if codes:
                        codes_name = juejin.JueJinManager.get_gp_codes_names(codes)
                    ths_industry_util.save_industry_code(dataList, codes_name)
                elif type == 6:
                    # 可用金额
                    datas = data_process.parseData(_str)
@@ -313,13 +425,17 @@
                                    code)
                                if buy_exec_index:
                                    try:
                                        exec_time = l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"]["time"]
                                        exec_time = l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][
                                            "time"]
                                    except:
                                        pass
                                buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, buy_one_price_, buy_queue_result_list,exec_time)
                                buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, buy_one_price_,
                                                                                             buy_queue_result_list,
                                                                                             exec_time)
                                if buy_progress_index is not None:
                                    HourCancelBigNumComputer.set_trade_progress(code,buy_time,buy_exec_index, buy_progress_index,
                                    HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index,
                                                                                buy_progress_index,
                                                                                l2.l2_data_util.local_today_datas.get(
                                                                                    code),
                                                                                l2.l2_data_util.local_today_num_operate_map.get(
@@ -348,8 +464,8 @@
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
                                                                                               int(buy_one_volumn),
                                                                                               buy_one_price)
                            if need_cancel:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            #if need_cancel:
                            #    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
@@ -367,14 +483,17 @@
                        return_str = data_process.toJson({"code": 1, "msg": str(e)})
                # 现价更新
                elif type == 40:
                    data = data_process.parse(_str)["data"]
                    if data is not None:
                        print("现价数量", len(data))
                        for item in data:
                    datas = data_process.parse(_str)["data"]
                    # 获取暂存的二版现价数据
                    if datas and self.first_tick_datas:
                        datas.extend(self.first_tick_datas)
                    if datas is not None:
                        print("现价数量", len(datas))
                        for item in datas:
                            volumn = item["volumn"]
                            volumnUnit = item["volumnUnit"]
                            code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                        juejin.accept_prices(data)
                        juejin.accept_prices(datas)
                elif type == 50:
                    data = data_process.parse(_str)["data"]
                    if data is not None:
@@ -400,8 +519,8 @@
                            # 保存数据
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn,
                                                                                               price)
                            if need_cancel:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            #if need_cancel:
                            #    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
@@ -453,6 +572,12 @@
                    else:
                        return_json = {"code": 0, "msg": "开启在线状态"}
                        return_str = json.dumps(return_json)
                elif type == 201:
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.forbidden_trade(code)
                    return_str = json.dumps({"code": 0})
                sk.send(return_str.encode())
@@ -515,7 +640,7 @@
# 同步目标标的到同花顺
def sync_target_codes_to_ths():
    codes = gpcode_manager.get_gp_list()
    codes = gpcode_manager.get_second_gp_list()
    code_list = []
    for code in codes:
        code_list.append(code)
third_data/hot_block_data_process.py
New file
@@ -0,0 +1,64 @@
"""
热门板块数据处理
"""
import json
import tool
from db import redis_manager
__redisManager = redis_manager.RedisManager(0)
def __get_redis():
    return __redisManager.getRedis()
# 保存数据
def save_datas(datas):
    code_block_dict = {}
    block_codes_dict = {}
    for block in datas:
        codes = []
        for code_data in block[2]:
            code = code_data[0]
            if code not in code_block_dict:
                code_block_dict[code] = set()
            code_block_dict[code].add(block[0])
            codes.append(code)
        block_codes_dict[block] = codes
    __save_block_codes(block_codes_dict)
    for key in code_block_dict:
        __save_code_block(key, code_block_dict[key])
# 保存代码所属板块
def __save_code_block(code, blocks):
    __get_redis().setex(f"code_blocks-{code}", tool.get_expire(), json.dumps(blocks))
# 保存板块下的代码
def __save_block_codes(block_codes):
    __get_redis().setex(f"blocks_codes", tool.get_expire(), json.dumps(block_codes))
def __get_block_codes():
    val = __get_redis().get(f"blocks_codes")
    if val is None:
        return None
    return json.loads(val)
# 获取代码板块
def get_code_blocks(code):
    val = __get_redis().get(f"code_blocks-{code}")
    if val is None:
        return None
    return json.loads(val)
# 获取板块下的所有代码
def get_block_codes(block):
    block_codes = __get_block_codes()
    if block_codes:
        block_codes.get(block)
    return None
ths_industry_util.py
@@ -107,30 +107,54 @@
# 保存单个代码的行业
def __save_code_industry(code, industry_name, zyltgb, zyltgb_unit):
def __save_code_industry(code, code_name, industry_name, zyltgb, zyltgb_unit):
    mysqldb = mysql_data.Mysqldb()
    result = mysqldb.select_one("select * from ths_industry_codes where _id={}".format(code))
    if result is None:
        mysqldb.execute(
            "insert into ths_industry_codes(_id,second_industry,zyltgb,zyltgb_unit) values('{}','{}','{}',{})".format(
                code, industry_name, zyltgb, zyltgb_unit, round(time.time() * 1000)))
            "insert into ths_industry_codes(_id,_name, second_industry,zyltgb,zyltgb_unit) values('{}','{}','{}','{}',{})".format(
                code, code_name, industry_name, zyltgb, zyltgb_unit, round(time.time() * 1000)))
    else:
        mysqldb.execute(
            "update ths_industry_codes set second_industry='{}',zyltgb='{}',zyltgb_unit={} where _id='{}'".format(
                industry_name, zyltgb, zyltgb_unit, code))
        if code_name:
            mysqldb.execute(
                "update ths_industry_codes set _name='{}', second_industry='{}',zyltgb='{}',zyltgb_unit={} where _id='{}'".format(
                    code_name, industry_name, zyltgb, zyltgb_unit, code))
        else:
            mysqldb.execute(
                "update ths_industry_codes set second_industry='{}',zyltgb='{}',zyltgb_unit={} where _id='{}'".format(
                     industry_name, zyltgb, zyltgb_unit, code))
# 保存行业代码
def save_industry_code(datasList):
def save_industry_code(datasList, code_names):
    for datas in datasList:
        # 查询这批数据所属行业
        industry_name = __get_industry(datas)
        _list = []
        for data in datas:
            # 保存
            __save_code_industry(data["code"], industry_name, data["zyltgb"], data["zyltgb_unit"])
            code = data["code"]
            __save_code_industry(code, code_names.get(code), industry_name, data["zyltgb"], data["zyltgb_unit"])
# 根据名称获取代码
def get_code_by_name(name):
    mysqldb = mysql_data.Mysqldb()
    result = mysqldb.select_one("select * from ths_industry_codes where _name='{}'".format(name))
    if result is not None:
        return result[0]
    else:
        return None
def get_name_by_code(code):
    mysqldb = mysql_data.Mysqldb()
    result = mysqldb.select_one("select * from ths_industry_codes where _id={}".format(code))
    if result is not None:
        return result[1]
    else:
        return None
if __name__ == "__main__":
    _code_map, _industry_map = get_code_industry_maps()
    print(_code_map, _industry_map)
trade/l2_trade_factor.py
@@ -5,6 +5,7 @@
# l2交易因子
import big_money_num_manager
import constant
import global_data_loader
import global_util
import limit_up_time_manager
@@ -210,36 +211,66 @@
            zyltgb = global_util.zyltgb_map.get(code)
            if zyltgb is None:
                print("没有获取到自由流通市值")
                return 10000000
                return 10000000, ""
        zyltgb = cls.get_base_safe_val(zyltgb)
        rate, msg = cls.compute_rate_by_code(code)
        # print("m值获取:", code, round(zyltgb * rate))
        return round(zyltgb * rate), msg
    @classmethod
    def get_h_cancel_min_count(cls, code):
        volumn_day60_max, volumn_yest, volumn_today = cls.__get_volumns(code)
        if volumn_day60_max is None or volumn_yest is None or volumn_today is None:
            return constant.H_CANCEL_MIN_COUNT
        rate = round(int(volumn_today) / max(int(volumn_day60_max), int(volumn_yest)), 2)
        counts = [40, 36, 32, 28, 24, 20, 16]
        rates = [0.3, 0.55, 0.8, 1.05, 1.3, 1.55, 10]
        for index in range(0,len(rates)):
            if rate < rates[index]:
                return counts[index]
        return counts[0]
    # 获取安全笔数
    @classmethod
    def get_safe_buy_count(cls, code):
    def get_safe_buy_count(cls, code, is_first=False):
        gb = cls.get_zyltgb(code)
        return cls.get_safe_buy_count_by_gp(gb, is_first)
    @classmethod
    def get_safe_buy_count_by_gp(cls, gb, is_first=False):
        MIN_VAL = 8
        MAX_VAL = 16
        if is_first:
            MIN_VAL = 5
            MAX_VAL = 13
        if not gb:
            # 默认8笔
            return 8
            return MIN_VAL
        count = gb // 100000000
        if count <= 6:
            count = 8
        elif count < 32:
            count = round(8 + 0.5 * (count - 6))
        else:
            count = 21
        # volumn_day60_max, volumn_yest, volumn_today = cls.__get_volumns(code)
        # rate = cls.get_volumn_rate(volumn_day60_max, volumn_yest, volumn_today)
        # 取大单影响值与行业影响值的较大值
        # count = round(count * (1 - rate))
        if count < 8:
            count = 8
        elif count > 21:
            count = 21
        return count
        if True:
            if count < 8:
                count = MIN_VAL
            elif count < 11:
                count = MIN_VAL + 1
            elif count < 14:
                count = MIN_VAL + 2
            elif count < 17:
                count = MIN_VAL + 3
            elif count < 20:
                count = MIN_VAL + 4
            elif count < 23:
                count = MIN_VAL + 5
            elif count < 26:
                count = MIN_VAL + 6
            elif count < 29:
                count = MIN_VAL + 7
            else:
                count = MAX_VAL
        if count < MIN_VAL:
            count = MIN_VAL
        elif count > MAX_VAL:
            count = MAX_VAL
        return count, MIN_VAL, MAX_VAL
# l2因子归因数据
@@ -259,10 +290,15 @@
if __name__ == "__main__":
    print(L2TradeFactorUtil.get_safe_buy_count("003005"))
    # print(L2TradeFactorUtil.get_safe_buy_count("003005"))
    # print(L2TradeFactorUtil.get_rate_factors("003004"))
    # print(L2TradeFactorUtil.factors_to_string("003004"))
    print(L2TradeFactorUtil.get_safe_buy_count("002864"))
    print(L2TradeFactorUtil.get_safe_buy_count_by_gp(100000000 * 4))
    print(L2TradeFactorUtil.get_safe_buy_count_by_gp(100000000 * 7))
    print(L2TradeFactorUtil.get_safe_buy_count_by_gp(100000000 * 10))
    print(L2TradeFactorUtil.get_safe_buy_count_by_gp(100000000 * 16))
    print(L2TradeFactorUtil.get_safe_buy_count_by_gp(100000000 * 22))
    print(L2TradeFactorUtil.get_safe_buy_count_by_gp(100000000 * 31))
    # print(L2TradeFactorUtil.get_limit_up_time_rate("11:30:00"))
    # print(L2TradeFactorUtil.get_limit_up_time_rate("13:00:00"))
    # print(L2TradeFactorUtil.get_limit_up_time_rate("13:48:00"))
trade/l2_trade_util.py
@@ -45,4 +45,4 @@
if __name__ == "__main__":
    add_to_forbidden_trade_codes("605133")
    add_to_forbidden_trade_codes("000977")
trade/trade_gui.py
@@ -16,6 +16,7 @@
from db import redis_manager
from log import *
from tool import async_call
from utils import win32_util
class THSGuiTrade(object):
@@ -28,16 +29,16 @@
            # 初始化设置
            # 获取交易窗口的锁
            cls.__instance.buy_lock = threading.RLock()
            cls.__instance.buy_cancel_lock = threading.RLock()
            cls.__instance.buy_cancel_locks = {}
            cls.__instance.buy_win_list = cls.get_buy_wins()
            print("交易窗口", cls.__instance.buy_win_list)
            cls.__instance.using_buy_wins = set()
            cls.__instance.cancel_win = cls.__instance.getCancelBuyWin()
            cls.__instance.cancel_wins = cls.__instance.getCancelBuyWins()
        return cls.__instance
    # 刷新窗口句柄
    def refresh_hwnds(self):
        self.cancel_win = self.__instance.getCancelBuyWin()
        self.cancel_wins = self.__instance.getCancelBuyWins()
        self.buy_win_list = self.get_buy_wins()
    # 打开交易环境
@@ -67,11 +68,11 @@
            raise Exception("下单窗口最低需要10个")
        # 检测撤单窗口
        cancel_trade_win = cls.getCancelBuyWin()
        if cancel_trade_win <= 0:
        cancel_trade_wins = cls.getCancelBuyWins()
        if len(cancel_trade_wins) <= 0:
            raise Exception("委托撤销窗口未打开")
        else:
            pos = win32gui.GetWindowRect(cancel_trade_win)
            pos = win32gui.GetWindowRect(cancel_trade_wins[0])
            width = pos[2] - pos[0]
            height = pos[3] - pos[1]
            if width <= 0 or height <= 0:
@@ -109,7 +110,24 @@
    # 获取撤单窗口
    @classmethod
    def getCancelBuyWin(cls):
    def getCancelBuyWin(cls, code=None):
        # 获取代码位分配的下单窗口
        if code:
            trade_win = THSBuyWinManagerNew.get_distributed_code_win(code)
            if trade_win and win32gui.IsWindowVisible(trade_win):
                top_win = win32_util.get_top_window(trade_win)
                if cls.getText(top_win) == '专业版下单' and win32gui.IsWindowVisible(top_win):
                    return top_win
        wins = cls.getCancelBuyWins()
        if wins:
            return wins[0]
        else:
            return 0
    @classmethod
    def getCancelBuyWins(cls):
        cancel_buy_wins = set()
        hWndList = []
        win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
        for hwnd in hWndList:
@@ -121,10 +139,10 @@
                        width = pos[2] - pos[0]
                        height = pos[3] - pos[1]
                        if width > 200 and height > 200:
                            return hwnd
                            cancel_buy_wins.add(hwnd)
                except:
                    pass
        return 0
        return list(cancel_buy_wins)
    def input_number(self, hwnd, num_str):
        for i in range(10):
@@ -327,13 +345,10 @@
                        pass
        return 0
    def __get_code_input(self):
        win = self.cancel_win
        if win <= 0 or not win32gui.IsWindowVisible(win):
            self.cancel_win = self.getCancelBuyWin()
            win = self.cancel_win
            if win <= 0:
                raise Exception("无法找到取消委托窗口")
    def __get_code_input(self, code=None):
        win = self.getCancelBuyWin(code)
        if win <= 0:
            raise Exception("无法找到取消委托窗口")
        t = time.time()
        print(t)
        start = int(round(t * 1000))
@@ -352,9 +367,15 @@
    def cancel_buy(self, code):
        if not constant.TRADE_ENABLE:
            return
        self.buy_cancel_lock.acquire()
        main_win = self.getCancelBuyWin(code)
        if main_win <= 0:
            raise Exception("尚未找到交易窗口")
        if main_win not in self.buy_cancel_locks:
            self.buy_cancel_locks[main_win] = threading.RLock()
        self.buy_cancel_locks[main_win].acquire()
        logger_trade_gui.info("开始获取撤单控件:code-{}".format(code))
        code_input, win = self.__get_code_input()
        code_input, win = self.__get_code_input(code)
        try:
            logger_trade_gui.info("开始撤单:code-{}".format(code))
            start = int(round(time.time() * 1000))
@@ -390,7 +411,7 @@
            logger_trade_gui.info("撤单成功:code-{} 耗时:{}".format(code, end - start))
            time.sleep(0.03)
        finally:
            self.buy_cancel_lock.release()
            self.buy_cancel_locks[main_win].release()
            # 清空代码框
            self.input_number(code_input, "")
            # 再次清除代码框
@@ -415,32 +436,43 @@
    @async_call
    def refresh_data(self):
        # 获取到专业下单页面
        win = self.getCancelBuyWin()
        child_win = None
        refresh_btn = None
        for i in range(0, 20):
            child_win = win32gui.FindWindowEx(win, child_win, "#32770", None)
            if not child_win:
                break
            if not win32gui.IsWindowVisible(child_win):
                continue
            temp = win32gui.FindWindowEx(child_win, None, "Button", "还原")
            if temp:
                refresh_btn = win32gui.GetDlgItem(child_win, 0x00000457)
                break
        if refresh_btn:
            # 点击刷新
            THSGuiUtil.click(refresh_btn)
        wins = self.getCancelBuyWins()
        if wins:
            for win in wins:
                refresh_btn = None
                child_win = None
                for i in range(0, 20):
                    child_win = win32gui.FindWindowEx(win, child_win, "#32770", None)
                    if not child_win:
                        break
                    if not win32gui.IsWindowVisible(child_win):
                        continue
                    temp = win32gui.FindWindowEx(child_win, None, "Button", "还原")
                    if temp:
                        refresh_btn = win32gui.GetDlgItem(child_win, 0x00000457)
                        break
                if refresh_btn:
                    # 点击刷新
                    THSGuiUtil.click(refresh_btn)
            for w in wins:
                if w not in self.buy_cancel_locks:
                    self.buy_cancel_locks[w] = threading.RLock()
class THSGuiUtil:
    @classmethod
    def getText(cls, hwnd):
        bufSize = win32gui.SendMessage(hwnd, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
        buffer = array.array('b', b'\x00\x00' * bufSize)
        win32gui.SendMessage(hwnd, win32con.WM_GETTEXT, bufSize, buffer)
        text = win32gui.PyGetString(buffer.buffer_info()[0], bufSize - 1)
        return text.replace("\x00", "").strip()
        try:
            buffer = array.array('b', b'\x00\x00' * bufSize)
            win32gui.SendMessage(hwnd, win32con.WM_GETTEXT, bufSize, buffer)
            text = win32gui.PyGetString(buffer.buffer_info()[0], bufSize - 1)
            return text.replace("\x00", "").strip()
        except:
            return ""
    # 添加下单窗口
    @classmethod
@@ -525,131 +557,6 @@
        THSGuiTrade().input_number(hwnd1, code)
# 过时 同花顺买入窗口管理器
class __THSBuyWinManager:
    redisManager = redis_manager.RedisManager(2)
    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()
    # 保存窗口代码分配
    @classmethod
    def __save_code_win(cls, code, win):
        key = "buywin_distribute-{}".format(code)
        cls.__get_redis().setex(key, tool.get_expire(), win)
    # 获取窗口分配的代码
    @classmethod
    def __get_code_win(cls, code):
        key = "buywin_distribute-{}".format(code)
        win = cls.__get_redis().get(key)
        if win is not None:
            return int(win)
        return None
    # 删除代码窗口分配
    @classmethod
    def __del_code_win(cls, code):
        key = "buywin_distribute-{}".format(code)
        cls.__get_redis().delete(key)
    # 获取所有已经分配窗口的代码
    @classmethod
    def __get_distributed_win_codes(cls):
        key = "buywin_distribute-*"
        keys = cls.__get_redis().keys(key)
        codes = []
        for k in keys:
            codes.append(k.replace("buywin_distribute-", ""))
        return codes
    # 获取可用的窗口
    @classmethod
    def __get_available_win(cls):
        # 是否有可用的还未分配的窗口
        key = "buywin_distribute-*"
        keys = cls.__get_redis().keys(key)
        win_list = THSGuiTrade().get_buy_wins()
        if len(win_list) < 1:
            raise Exception("必须要有一个买入窗口")
        win_set = set(win_list)
        for k in keys:
            win = int(cls.__get_redis().get(k))
            if win in win_set:
                win_set.remove(win)
        if len(win_set) > 0:
            return win_set.pop()
        # 没有剩余的窗口,新增加窗口
        win = THSGuiUtil.add_buy_win()
        if win:
            return win
        else:
            raise Exception("新增窗口失败")
    # 为代码分配窗口
    @classmethod
    def distribute_win_for_code(cls, code):
        # 获取是否已经分配
        win = cls.__get_code_win(code)
        if win is not None:
            # 已经分配的窗口是否有效
            if THSGuiUtil.is_win_exist(win):
                # 填充代码
                THSGuiUtil.set_buy_window_code(win, code)
                return win
        # 获取可用的窗口
        win = cls.__get_available_win()
        if win is None:
            raise Exception("窗口已经分配完毕,无可用窗口")
        # 保存窗口分配信息
        cls.__save_code_win(code, win)
        THSGuiUtil.set_buy_window_code(win, code)
        return win
    # 删除代码窗口分配
    @classmethod
    def cancel_distribute_win_for_code(cls, code):
        win = cls.__get_code_win(code)
        if win is not None:
            # 清除代码
            THSGuiUtil.clear_buy_window_code(win)
        cls.__del_code_win(code)
    # 获取代码已经分配的窗口
    @classmethod
    def get_distributed_code_win(cls, code):
        win = cls.__get_code_win(code)
        if not THSGuiUtil.is_win_exist(win):
            # 删除不存在的窗口
            cls.__del_code_win(code)
            return None
        return win
    @classmethod
    def fill_codes(cls, codes):
        codes_ = gpcode_manager.get_gp_list()
        # 先删除没有的代码
        old_codes = cls.__get_distributed_win_codes()
        for code in old_codes:
            if code not in codes_:
                cls.cancel_distribute_win_for_code(code)
        add_codes = codes[0:10]
        del_codes = codes[10:]
        for code in del_codes:
            cls.cancel_distribute_win_for_code(code)
        for code in add_codes:
            # 已经加入进去的不做操作
            if code in old_codes:
                continue
            win = cls.distribute_win_for_code(code)
            print("分配的窗口:", win, THSGuiUtil.is_win_exist(win))
# 同花顺买入窗口管理器
class THSBuyWinManagerNew:
    redisManager = redis_manager.RedisManager(2)
@@ -658,24 +565,24 @@
    def get_buy_wins(cls):
        buy_win_list = []
        hWndList = []
        main_hwnd = None
        main_hwnds = []
        win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
        for hwnd in hWndList:
            if THSGuiUtil.getText(hwnd) == "专业版下单":
                main_hwnd = hwnd
                break
        if not main_hwnd:
                main_hwnds.append(hwnd)
        if not main_hwnds:
            raise Exception("专业版下单未打开")
        child_win = None
        for i in range(0, 20):
            child_win = win32gui.FindWindowEx(main_hwnd, child_win, "#32770", None)
            if not child_win:
                break
            if not win32gui.IsWindowVisible(child_win):
                continue
            temp = win32gui.FindWindowEx(child_win, None, "Button", "撤单")
            if temp:
                buy_win_list.append(child_win)
        for main_hwnd in main_hwnds:
            for i in range(0, 20):
                child_win = win32gui.FindWindowEx(main_hwnd, child_win, "#32770", None)
                if not child_win:
                    break
                if not win32gui.IsWindowVisible(child_win):
                    continue
                temp = win32gui.FindWindowEx(child_win, None, "Button", "撤单")
                if temp:
                    buy_win_list.append(child_win)
        return buy_win_list
    @classmethod
@@ -846,8 +753,10 @@
            else:
                new_delete_codes.append(code)
        add_codes = new_codes[0:10]
        del_codes = new_codes[10:]
        cancel_wins = THSGuiTrade.getCancelBuyWins()
        add_codes_num = len(cancel_wins)*10
        add_codes = new_codes[0:add_codes_num]
        del_codes = new_codes[add_codes_num:]
        del_codes.extend(new_delete_codes)
        for code in del_codes:
@@ -900,12 +809,10 @@
if __name__ == '__main__':
    try:
        THSGuiTrade().cancel_buy_again("000637")
    except Exception as e:
        print(e)
    # GUITest().test_distribute()
    pass
    # try:
    #     THSGuiUtil.set_buy_window_code(0x000112D0, "000333")
    #     THSGuiTrade().cancel_buy_again("000637")
    # except Exception as e:
    #     print(e)
    print(THSGuiTrade().cancel_buy("000582"))
    # THSGuiTrade().buy("600613", 10.29)
utils/win32_util.py
New file
@@ -0,0 +1,13 @@
import win32gui
# 获取顶部窗口句柄
def get_top_window(hwnd):
    child = hwnd
    count = 50
    while True and count >= 0:
        count -= 1
        pa = win32gui.GetParent(child)
        if pa <= 0:
            return child
        child = pa