Administrator
2022-10-21 892b50e242e3c59a738b92dfdfee1bf1ff8932f2
新策略修改
20个文件已修改
5个文件已添加
2745 ■■■■ 已修改文件
alert_util.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
authority.py 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
big_money_num_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_process.py 69 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
global_util.py 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gpcode_manager.py 40 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 93 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_log.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 395 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager_new.py 629 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 168 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_factor.py 162 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
limit_up_time_manager.py 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 66 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mysql_data.py 85 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 61 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths_data.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths_industry_util.py 66 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tool.py 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_gui.py 504 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 50 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_queue_manager.py 134 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
alert_util.py
@@ -4,9 +4,15 @@
# 报警
import tool
def alarm():
    AlertUtil().stop_audio()
    AlertUtil().play_audio()
    if not tool.is_trade_time():
        return
    # TODO 暂时关闭报警
    # AlertUtil().stop_audio()
    # AlertUtil().play_audio()
class AlertUtil:
authority.py
@@ -2,25 +2,39 @@
用户验证
"""
import mongo_data
import mysql_data
# 新增用户
def add_user(id, account, pwd, rule):
    """Create or update a client account.

    The account is saved to the Mongo ``clients`` collection and then
    written to the MySQL ``clients`` table: inserted when no row with this
    ``_id`` exists, updated otherwise.

    :param id: client id; formatted unquoted into the SQL, so presumably
        numeric -- TODO confirm against the table schema.
    :param account: login name.
    :param pwd: password, stored exactly as given.
    :param rule: name of the rule (role) assigned to the client.
    """
    _dict = {"_id": id, "account": account, "pwd": pwd, "rule": rule}
    mongo_data.save_one("clients", _dict)
    mysqldb = mysql_data.Mysqldb()
    # Look the row up first to choose between insert and update.
    # NOTE(review): every statement below is built with str.format, so the
    # values are not escaped -- switch to bound parameters if Mysqldb
    # supports them.
    result = mysqldb.select_one("select * from clients where _id={}".format(id))
    if result is None:
        mysqldb.execute(
            "insert into clients(_id,account,pwd,rule) values({},'{}','{}','{}')".format(id, account, pwd, rule))
    else:
        mysqldb.execute("update clients set account='{}', pwd='{}', rule='{}' where _id={}".format(account, pwd, rule,id))
def add_rule(rule, authritys):
    """Create or update the permissions attached to a rule (role).

    The record is saved to the Mongo ``clients-authritys`` collection and
    then written to the MySQL ``clients_authritys`` table: inserted when no
    row with this ``_id`` exists, updated otherwise.

    :param rule: rule name; used as the string ``_id`` of the record.
    :param authritys: permissions of the rule. The update branch calls
        ``.replace`` on it, so it presumably arrives as a (JSON) string --
        TODO confirm with callers.
    """
    _dict = {"_id": rule, "authritys": authritys}
    mongo_data.save_one("clients-authritys", _dict)
    mysqldb = mysql_data.Mysqldb()
    # NOTE(review): the statements below are built with str.format, so the
    # values are not escaped -- switch to bound parameters if Mysqldb
    # supports them.
    result = mysqldb.select_one("select * from clients_authritys where _id='{}'".format(rule))
    if result is None:
        # The key is the rule name. The previous code formatted the builtin
        # ``id`` function here, storing its repr instead of the rule.
        mysqldb.execute(
            "insert into clients_authritys(_id,authritys) values('{}','{}')".format(rule, authritys))
    else:
        # ``_id`` is a string key, so it must be quoted exactly like in the
        # select/insert above; unquoted it is parsed as a column name.
        mysqldb.execute(
            "update clients_authritys set authritys='{}' where _id='{}'".format(authritys.replace("\"", "\\" + "\""),
                                                                                rule))
def _get_client_ids_by_rule(rule):
    results = mongo_data.find("clients", {"rule": rule})
    mysqldb = mysql_data.Mysqldb()
    results = mysqldb.select_all("select * from clients where rule='{}'".format(rule))
    _ids = []
    for result in results:
        _ids.append(result["_id"])
        _ids.append(result[0])
    return _ids
@@ -31,15 +45,16 @@
# 登录,返回用户ID与权限
def login(account, pwd):
    results = mongo_data.find("clients", {"account": account})
    if mongo_data.count("clients", {"account": account}) == 0:
    mysqldb = mysql_data.Mysqldb()
    results = mysqldb.select_all("select * from clients where account='{}'".format(account))
    if len(results) <= 0:
        raise Exception("账号不存在")
    result = results[0]
    if result["pwd"] != pwd:
    if result[2] != pwd:
        raise Exception("密码错误")
    results = mongo_data.find("clients-authritys", {"_id": result["rule"]})
    return result["_id"], results[0]["authritys"]
    results_ = mysqldb.select_one("select * from clients_authritys where _id='{}'".format(result[3]))
    return result[0], results_[1]
if __name__ == '__main__':
@@ -49,15 +64,17 @@
    # add_rule("uploader", ["code_upload"])
    #
    #
    # add_user(1, "super", "123456", "super")
    #add_user(10, "super1", "123456", "super")
    # add_user(2, "client1", "123456", "client-l2")
    # add_user(3, "client2", "123456", "client-l2")
    # add_user(4, "client3", "123456", "client-l2")
    # add_user(5, "client2", "123456", "client-industry")
    # add_user(6, "admin", "123456", "uploader")
    get_l2_clients()
    try:
        print(login("client1", "12345"))
        print(login("client1", "1234567"))
    except Exception as e:
        print()
        print(str(e))
    print(get_l2_clients())
big_money_num_manager.py
@@ -30,7 +30,7 @@
    num = redis.get("big_money-{}".format(code))
    if num is None:
        return 0
    return int(num)
    return round(int(num)/1000/4)
if __name__ == "__main__":
data_export_util.py
@@ -1,6 +1,7 @@
"""
数据导出工具
"""
import json
import os
import time
@@ -10,6 +11,13 @@
def export_l2_data(code, datas, dest_dir="D:/export/l2"):
    local_time = time.strftime("%Y%m%dT%H%M%S", time.localtime(time.time()))
    file_name = "{}/{}_{}.xls".format(dest_dir, code, local_time)
    file_name_txt = "{}/{}_{}.txt".format(dest_dir, code, local_time)
    openfile = open(file_name_txt,'w')
    try:
        for data in datas:
            openfile.write(json.dumps(data)+"\n")
    finally:
        openfile.close()
    wb = xlwt.Workbook()
    ws = wb.add_sheet('sheet1')
    ws.write(0, 0, '序号')
@@ -37,7 +45,10 @@
        ws.write(index, 2, cancel_time)
        ws.write(index, 3, data["val"]["price"])
        ws.write(index, 4, data["val"]["num"])
        if int(data["val"]["operateType"]) == 1 or int(data["val"]["operateType"]) == 2:
            ws.write(index, 4, 0-int(data["val"]["num"]))
        else:
            ws.write(index, 4, int(data["val"]["num"]))
        limit_price=""
        if int(data["val"]["limitPrice"]) == 1:
@@ -55,6 +66,16 @@
                ws.write(index, 5, '买撤 ({})'.format(limit_price))
            else:
                ws.write(index, 5, '买撤')
        elif int(data["val"]["operateType"]) == 2:
            if len(limit_price) > 0:
                ws.write(index, 5, '卖 ({})'.format(limit_price))
            else:
                ws.write(index, 5, '卖')
        elif int(data["val"]["operateType"]) == 3:
            if len(limit_price) > 0:
                ws.write(index, 5, '卖撤 ({})'.format(limit_price))
            else:
                ws.write(index, 5, '卖撤')
        ws.write(index, 6, data["re"])
    wb.save(file_name)
    return file_name
data_process.py
@@ -5,9 +5,9 @@
import time as t
import authority
import mysql_data
import redis_manager
import gpcode_manager
import mongo_data
# 统计今日卖出
# 统计今日买入
@@ -17,27 +17,6 @@
__redisManager = redis_manager.RedisManager(0)
def _mysql_insert_data(day, code, item, conn):
    try:
        with conn.cursor() as cursor:
            sql = f"insert into level2_data(day,code,time,price,num,limit_price,operate_type,cancel_time,cancel_time_unit, md5,create_time) values ('{day}','{code}','{item['time']}','{item['price']}',{item['num']},{item['limitPrice']},{item['operateType']},{item['cancelTime']},{item['cancelTimeUnit']},'{item['md5']}',now())"
            print(sql)
            cursor.execute(sql)
            conn.commit()
    except Exception as e:
        conn.rollback()
def _mysql_update_data(item, conn):
    try:
        with conn.cursor() as cursor:
            sql = "update level2_data set re = {}, update_time=now() where md5='{}'".format(item['re'], item['md5'])
            print(sql)
            cursor.execute(sql)
            conn.commit()
    except Exception as e:
        conn.rollback()
def parse(str):
@@ -117,53 +96,21 @@
    return json.loads(data_str)
def _getIndustry(datas):
    ors = []
    codes = set()
    for data in datas:
        codes.add(data["code"])
    for code in codes:
        ors.append({'first_code': code})
    result = mongo_data.find("ths-industry", {'$or': ors})
    _fname = None
    for a in result:
        _fname = a["_id"]
        break
    print("最终的二级行业名称为:", _fname)
    return _fname
def saveIndustryCode(datasList):
    for datas in datasList:
        # 查询这批数据所属行业
        industry_name = _getIndustry(datas);
        _list = []
        for data in datas:
            # 保存
            _dict = {"_id": data["code"]}
            _dict["second_industry"] = industry_name
            _dict["zyltgb"] = data["zyltgb"]
            _dict["zyltgb_unit"] = data["zyltgb_unit"]
            _dict["update_time"] = int(round(t.time() * 1000))
            _list.append(_dict)
        mongo_data.save("ths-industry-codes", _list)
# 保存自由流通市值
def saveZYLTSZ(datasList):
    redis = __redisManager.getRedis()
    _list = []
    mysqldb = mysql_data.Mysqldb()
    for data in datasList:
        # 保存
        _dict = {"_id": data["code"], "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgb_unit"],
                 "update_time": int(round(t.time() * 1000))}
        if float(data["zyltgb"]) > 0:
            _list.append(_dict)
            # 保存10天
            ZYLTGBUtil.save(data["code"], data["zyltgb"], data["zyltgb_unit"])
    mongo_data.save("ths-zylt", _list)
            result = mysqldb.select_one("select * from ths_zylt where _id='{}'".format(data["code"]))
            if result is None:
                mysqldb.execute("insert into ths_zylt(_id,zyltgb,zyltgb_unit,update_time) values ('{}','{}',{},{})".format(data["code"],data["zyltgb"],data["zyltgb_unit"],round(t.time()*1000)))
            else:
                mysqldb.execute("update ths_zylt set zyltgb='{}',zyltgb_unit={},update_time={} where _id='{}'".format(data["zyltgb"],data["zyltgb_unit"],round(t.time()*1000),data["code"]))
def saveClientActive(client_id, host, thsDead):
@@ -182,7 +129,7 @@
        _id = k.split("client-active-")[1]
        # 客户端同花顺没卡死才能加入
        if not ths_util.is_ths_dead(_id):
            client_ids.append(_id)
            client_ids.append(int(_id))
    l2_clients = authority.get_l2_clients()
    return list(set(client_ids).intersection(set(l2_clients)))
global_util.py
@@ -3,16 +3,14 @@
"""
# 代码行业映射
import pymongo
import mongo_data
import code_volumn_manager
import gpcode_manager
import ths_industry_util
from code_data_util import ZYLTGBUtil
TEST = False
TEST = True
code_industry_map = {}
# 行业代码映射
@@ -23,6 +21,12 @@
today_limit_up_codes = {}
# 行业热度指数
industry_hot_num = {}
# 涨停股票的涨幅
limit_up_codes_percent = {}
# 名称代码映射
name_codes = {}
# 今日量
today_volumn = {}
# 60日最大量
@@ -39,6 +43,7 @@
    load_volumn()
    load_zyltgb()
    load_industry()
    load_name_codes()
# 加载行业数据
@@ -59,6 +64,14 @@
            zyltgb_map[code] = result
# 加载名称代码映射
def load_name_codes():
    """Copy the stored name -> code mapping into the module-level ``name_codes``.

    Does nothing when ``gpcode_manager.get_name_codes()`` returns nothing.
    """
    mapping = gpcode_manager.get_name_codes()
    if not mapping:
        return
    name_codes.update(mapping)
# 加载量
def load_volumn():
    codes = gpcode_manager.get_gp_list()
@@ -72,6 +85,8 @@
# 添加今日涨停数据
def add_limit_up_codes(datas, clear=False):
    if datas is None:
        return
    if clear:
        today_limit_up_codes.clear()
    # 涨停数量
gpcode_manager.py
@@ -20,22 +20,49 @@
    # 获取基本信息
    code_datas = juejin.JueJinManager.get_gp_latest_info(gpset)
    codes = []
    name_codes = {}
    for _data in code_datas:
        # 正常的股票
        if _data["sec_type"] == 1 and _data["sec_level"] == 1:
            code = _data["symbol"].split(".")[1]
            if code.find("30") != 0 and code.find("68") != 0:
                name = _data["sec_name"]
                codes.append(code)
                # 保存代码对应的名称
                name_codes[name] = code
    redis_instance = __redisManager.getRedis()
    # 删除之前的
    redis_instance.delete("gp_list")
    redis_instance.delete("gp_list_names")
    for d in codes:
        redis_instance.sadd("gp_list", d)
    redis_instance.set("gp_list_names", json.dumps(name_codes))
# 获取名称对应的代码
def get_name_code(name):
    """Return the code stored for ``name``, or ``None``.

    Reads the JSON mapping kept under the redis key ``gp_list_names``.
    ``None`` is returned when the key is missing or empty, or when the
    mapping has no entry for ``name``.
    """
    raw = __redisManager.getRedis().get("gp_list_names")
    if raw:
        return json.loads(raw).get(name)
    return None
def get_name_codes():
    """Return the whole name -> code mapping stored in redis.

    The mapping is the decoded JSON kept under the key ``gp_list_names``;
    ``None`` is returned when the key is missing or empty.
    """
    raw = __redisManager.getRedis().get("gp_list_names")
    return json.loads(raw) if raw else None
# 设置涨停列表
def set_limit_up_list(gpset):
    if gpset is None:
        return
    # 保存到内存中
    global_util.add_limit_up_codes(gpset)
    # 获取基本信息
@@ -45,7 +72,7 @@
    for d in gpset:
        redis_instance.sadd("gp_limit_up_list", json.dumps(d))
    redis_instance.expire("gp_limit_up_list", tool.get_expire())
    redis_instance.setex("gp_limit_up_list_update_time",tool.get_expire(),round( time.time()*1000))
    redis_instance.setex("gp_limit_up_list_update_time", tool.get_expire(), round(time.time() * 1000))
# 获取涨停列表
@@ -92,7 +119,7 @@
# 设置收盘价
def set_price_pre(code, price):
    codes= get_gp_list()
    codes = get_gp_list()
    if code not in codes:
        return
    redis_instance = __redisManager.getRedis()
@@ -145,11 +172,10 @@
# 根据位置获取正在监听的代码
def get_listen_code_by_pos(client_id, pos):
    redis_instance = __redisManager.getRedis()
    key="listen_code-{}-{}".format(client_id, pos)
    key = "listen_code-{}-{}".format(client_id, pos)
    value = redis_instance.get(key)
    # print("redis:", key,value)
    return value
# 设置位置的监听代码
@@ -221,7 +247,7 @@
def is_listen_full():
    clients = data_process.getValidL2Clients()
    codes = get_listen_codes()
    return len(codes) >= 8*len(clients)
    return len(codes) >= 8 * len(clients)
# 是否正在操作
@@ -248,5 +274,5 @@
    # print(is_listen_full())
    # print(is_listen("002271"))
    # print(get_listen_code_pos("002272"))
    code= get_listen_code_by_pos(2, 7)
    print(code)
    code = get_listen_code_by_pos(2, 7)
    print(code)
gui.py
@@ -12,8 +12,9 @@
import multiprocessing
import global_util
import log
import mysql_data
import redis_manager
import mongo_data
import server
import trade_gui
from l2_code_operate import L2CodeOperate
@@ -22,12 +23,39 @@
from server import *
def createServer(pipe):
# 读取server进程的消息
def __read_server_pipe(pipe):
    """Handle messages received on ``pipe`` forever.

    Each message is a JSON string. For a ``clear_l2`` message carrying a
    6-character code, the stored L2 data of that code is cleared through
    ``l2_data_manager.clear_l2_data`` and the code is dropped from the
    in-memory ``local_today_datas`` / ``local_latest_datas`` caches.
    Other message types are ignored.

    :param pipe: object whose ``recv()`` returns the next message.
    """
    while True:
        raw = pipe.recv()
        if raw is not None:
            message = json.loads(raw)
            if message.get("type") == "clear_l2":
                code = message["data"]["code"]
                print("清除l2数据", code)
                if len(code) != 6:
                    # Invalid code: go straight back to recv() without sleeping.
                    continue
                l2_data_manager.clear_l2_data(code)
                # Drop the cached level2 data of this code as well.
                for cache_name in ("local_today_datas", "local_latest_datas"):
                    cache = getattr(l2_data_manager, cache_name)
                    if cache and code in cache:
                        cache.pop(code)
        time.sleep(0.1)
def createServer(pipe_juejin, pipe_gui):
    print("create SocketServer")
    # 初始化参数
    global_util.init()
    t1 = threading.Thread(target=lambda: __read_server_pipe(pipe_gui))
    # 后台运行
    t1.setDaemon(True)
    t1.start()
    laddr = "", 9001
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe=pipe)  # 注意:参数是MyBaseRequestHandle
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe_juejin=pipe_juejin)  # 注意:参数是MyBaseRequestHandle
    # tcpserver.handle_request()  # 只接受一个客户端连接
    tcpserver.serve_forever()  # 永久循环执行,可以接受多个客户端连接
@@ -37,11 +65,17 @@
class GUI:
    def __init__(self):
        p1, p2 = multiprocessing.Pipe()
        gs_gui_pipe, gs_server_pipe = multiprocessing.Pipe()
        self.serverProcess = multiprocessing.Process(target=createServer, args=(p1,))
        self.serverProcess = multiprocessing.Process(target=createServer, args=(p1, gs_server_pipe,))
        self.jueJinProcess = multiprocessing.Process(target=startJueJin, args=(p2,))
        self.p1 = p1
        self.p2 = p2
        self.gs_gui_pipe = gs_gui_pipe
        # L2显示
        self.l2_codes = {}
        # 获取l2的客户端列表
@@ -51,6 +85,19 @@
            for i in range(0, 8):
                code = gpcode_manager.get_listen_code_by_pos(client_id, i)
                self.l2_codes[client_id].append(code)
    # 读取server进程的消息
    def __read_gui_server_pipe(self,pipe):
        while True:
            value = pipe.recv()
            if value is not None:
                value = json.loads(value)
                if value.get("type") == "l2_data_notify":
                    code = value["data"]["code"]
                    count =value["data"]["count"]
                    print("l2数据通知:{}-{}", code,count)
            time.sleep(0.1)
    def run(self):
        # TODO
@@ -63,6 +110,11 @@
        # 客户端server连接
        t1 = threading.Thread(target=lambda: server.test_client_server())
        # 后台运行
        t1.setDaemon(True)
        t1.start()
        t1 = threading.Thread(target=lambda: self.__read_gui_server_pipe(self.gs_gui_pipe))
        # 后台运行
        t1.setDaemon(True)
        t1.start()
@@ -93,12 +145,12 @@
                _set_error_color(text, 1, error)
            # 验证mongodb
            try:
                count = mongo_data.count("clients", {})
                if count < 1:
                counts = mysql_data.Mysqldb().select_one("select count(*) from clients")
                if counts[0] < 1:
                    raise Exception("")
                text.insert(END, "mongodb连接成功!\n")
                text.insert(END, "mysql连接成功!\n")
            except:
                error = "mongodb连接失败...\n"
                error = "mysql连接失败...\n"
                text.insert(END, error)
                _set_error_color(text, 2, error)
                pass
@@ -221,18 +273,18 @@
    # 绘制交易状态
    def __draw_trade_state(self, frame):
        def refresh_data():
            normal=True
            normal = True
            if l2_code_operate.L2CodeOperate.is_read_queue_valid():
                cl_queue.configure(text="正常", foreground="#008000")
            else:
                cl_queue.configure(text="异常", foreground="#FF7F27")
                normal=False
                normal = False
            try:
                trade_gui.THSGuiTrade.checkEnv()
                cl_win.configure(text="正常", foreground="#008000")
            except Exception as e:
                normal = False
                cl_win.configure(text="异常:{}".format(str(e)),foreground="#FF7F27")
                cl_win.configure(text="异常:{}".format(str(e)), foreground="#FF7F27")
            # 状态有问题,需要报警
            if not normal:
                alert_util.alarm()
@@ -247,11 +299,10 @@
                    pass
                time.sleep(2)
        start_y=230
        start_y = 230
        btn = Button(frame, text="刷新状态", command=refresh_data)
        btn.place(x=10, y=start_y)
        auo_refresh = IntVar()
        ch1 = Checkbutton(frame, text='自动刷新', variable=auo_refresh, onvalue=1, offvalue=0, background="#DDDDDD",
@@ -260,7 +311,7 @@
        auo_refresh.set(1)
        ch1.place(x=100, y=start_y)
        y_=start_y+30
        y_ = start_y + 30
        cl = Label(frame, text="操作队列状态:", bg="#DDDDDD")
        cl.place(x=10, y=y_)
        cl_queue = Label(frame, text="未知", bg="#DDDDDD")
@@ -687,6 +738,10 @@
            m = L2TradeFactorUtil.compute_m_value(code)
            showinfo("提示", "{}".format(m))
        def clear_l2(code):
            self.gs_gui_pipe.send(json.dumps({"type": "clear_l2", "data": {"code": code}}))
            pass
        frame = Frame(root, {"height": 280, "width": 300, "bg": "#DDDDDD"})
        frame.grid(row=2, column=2, rowspan=2, pady=5)
        btntext = StringVar()
@@ -723,11 +778,17 @@
        btn.place(x=220, y=100)
        btn = Button(frame, text="获取m值", command=lambda: compute_m(code.get()))
        btn.place(x=10, y=120)
        btn.place(x=10, y=130)
        btn = Button(frame, text="导出交易日志", command=lambda: log.export_l2_log(code.get()))
        btn.place(x=80, y=130)
        btn = Button(frame, text="清空l2数据", command=lambda: clear_l2(code.get()))
        btn.place(x=150, y=130)
        # 交易按钮
        btn = Button(frame, textvariable=btntext, command=startJueJinGui)
        btn.place(x=10, y=150)
        btn.place(x=10, y=160)
        btntext.set("启动掘金")
        btn = Button(frame, text="重新订阅行情", command=resub)
juejin.py
@@ -6,6 +6,7 @@
import datetime
import json
import logging
import time as t
import schedule
@@ -24,10 +25,15 @@
import redis_manager
import authority
import decimal
import trade_gui
from l2_code_operate import L2CodeOperate
from l2_data_manager import L2LimitUpMoneyStatisticUtil
from log import logger_juejin_tick, logger_system
from trade_queue_manager import JueJinBuy1VolumnManager
redisManager = redis_manager.RedisManager()
__jueJinBuy1VolumnManager = JueJinBuy1VolumnManager()
# 设置账户信息
@@ -173,8 +179,6 @@
def on_tick(context, tick):
    if global_util.TEST:
        return
    # print(tick["created_at"])
    relative_timestamp = t.time() % (24 * 60 * 60) + 8 * 60 * 60
    # 9点20-15:05接受数据
@@ -193,6 +197,16 @@
        # 保存最新价
        symbol = symbol.split(".")[1]
        time_ = tick["created_at"].strftime("%H:%M:%S")
        data_=(symbol,time_,tick["quotes"][0]["bid_v"], tick["quotes"][0]["bid_p"])
        logger_juejin_tick.info("买1量 {},{},{},{}", data_[1], data_[0], data_[2],
                                data_[3])
        need_sync = __jueJinBuy1VolumnManager.save(data_[0], data_[1], data_[2],data_[3])
        if need_sync:
            # 同步数据
            L2LimitUpMoneyStatisticUtil.verify_num(data_[0], data_[2], data_[1])
        # print(tick["created_at"],tick["quotes"][0]["bid_v"])
        accpt_price(symbol, price)
        __prices_now[symbol] = price
@@ -220,6 +234,7 @@
# 获取到现价
def accpt_prices(prices):
    print("价格代码数量:", len(prices))
    now_str = datetime.datetime.now().strftime("%H:%M:%S")
    now_strs = now_str.split(":")
    now_second = int(now_strs[0]) * 60 * 60 + int(now_strs[1]) * 60 + int(now_strs[2])
@@ -237,7 +252,7 @@
            # 获取收盘价
            pricePre = gpcode_manager.get_price_pre(code)
            if pricePre is not None:
                rate = round((price - pricePre) * 100 / pricePre, 1)
                rate = round((price - pricePre) * 100 / pricePre, 2)
                if rate >= 0:
                    # 暂存涨幅为正的代码
                    _code_list.append((rate, code))
@@ -246,9 +261,23 @@
                    _delete_list.append((rate, code))
        # 排序
        new_code_list = sorted(_code_list, key=lambda e: e.__getitem__(0), reverse=True)
        # 预填充下单代码
        _buy_win_codes = []
        for d in new_code_list:
            _buy_win_codes.append(d[1])
        for d in _delete_list:
            _buy_win_codes.append(d[1])
        try:
            trade_gui.THSBuyWinManagerNew.fill_codes(_buy_win_codes)
        except Exception as e:
            logging.exception(e)
            pass
        client_ids = data_process.getValidL2Clients()
        # 最多填充的代码数量
        max_count = len(client_ids) * 8
        if max_count == 0:
            max_count = 8
        # 截取前几个代码填充
        add_list = new_code_list[:max_count]
        # 后面的代码全部删除
@@ -272,8 +301,6 @@
        for code in add_code_list:
            if not gpcode_manager.is_listen(code):
                L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
        print(add_code_list, del_list)
l2_data_log.py
New file
@@ -0,0 +1,9 @@
# l2数据的日志
import time
import log
def l2_time(code, time_, description, new_line=False):
    """Log one L2 processing-time entry and return the current time.

    :param code: code the entry belongs to.
    :param time_: value logged after the code.
    :param description: label written at the start of the entry.
    :param new_line: when true, a newline is appended to the entry.
    :return: current time in whole milliseconds.
    """
    suffix = "\n" if new_line else ""
    log.logger_l2_process_time.info("{}: {}-{}{}", description, code, time_, suffix)
    now_ms = int(time.time() * 1000)
    return now_ms
l2_data_manager.py
@@ -73,9 +73,9 @@
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        if _data_json is None:
            return None, None, None, 0
            return None, None, None, 0, 0
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3]
        return _data[0], _data[1], _data[2], _data[3], _data[4]
    # 设置买入点的值
    # buy_single_index 买入信号位
@@ -83,16 +83,16 @@
    # compute_index 计算位置
    # nums 累计纯买额
    @staticmethod
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums):
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        if buy_single_index is not None:
            redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums)))
            redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count)))
        else:
            _buy_single_index, _buy_exec_index, _compute_index, _nums = TradePointManager.get_buy_compute_start_data(
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count = TradePointManager.get_buy_compute_start_data(
                code)
            redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums)))
            redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count)))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
@@ -265,6 +265,16 @@
        saveL2Data(code, add_datas)
# 清除l2数据
def clear_l2_data(code):
    """Delete the L2 data stored in redis for ``code``.

    Removes every key matching ``l2-<code>-*`` one by one, then the
    ``l2-data-latest-<code>`` key.
    """
    client = redis_manager.RedisManager(1).getRedis()
    for key in client.keys("l2-{}-*".format(code)):
        client.delete(key)
    client.delete("l2-data-latest-{}".format(code))
class L2DataUtil:
    @classmethod
    def is_same_time(cls, time1, time2):
@@ -363,10 +373,11 @@
                else:
                    limitPrice = 0
                item["limitPrice"] = "{}".format(limitPrice)
            # 不需要非涨停数据/非跌停数据
            if int(item["limitPrice"]) == 0:
                continue
            operateType = item["operateType"]
            # 不需要非涨停买与买撤
            if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1):
                continue
            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
@@ -380,6 +391,8 @@
                dataIndexs.setdefault(key, len(datas) - 1)
        l2_data_util.save_big_data(code, same_time_num, data)
        return datas
    @classmethod
    def get_time_as_second(cls, time_str):
@@ -406,6 +419,20 @@
            return False
        return True
    @classmethod
    def is_limit_up_price_sell(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 2:
            return False
        price = float(val["price"])
        num = int(val["num"])
        if price * num * 100 < 50 * 10000:
            return False
        return True
    # 是否涨停买撤
    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
@@ -420,6 +447,20 @@
        if price * num * 100 < 50 * 10000:
            return False
        return True
    # 是否卖撤
    @classmethod
    def is_sell_cancel(cls, val):
        if int(val["operateType"]) == 3:
            return True
        return False
    # 是否为卖
    @classmethod
    def is_sell(cls, val):
        if int(val["operateType"]) == 2:
            return True
        return False
# L2交易数据处理器
@@ -484,17 +525,17 @@
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
                    # TODO 暂时关闭处理
                    if L2DataUtil.is_same_time(now_time_str, latest_time):
                        # 判断是否已经挂单
                        state = trade_manager.get_trade_state(code)
                        start_index = len(total_datas) - len(add_datas)
                        end_index = len(total_datas) - 1
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已挂单
                            cls.__process_order(code, start_index, end_index, capture_timestamp)
                        else:
                            # 未挂单
                            cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    # if L2DataUtil.is_same_time(now_time_str, latest_time):
                    #     # 判断是否已经挂单
                    #     state = trade_manager.get_trade_state(code)
                    #     start_index = len(total_datas) - len(add_datas)
                    #     end_index = len(total_datas) - 1
                    #     if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    #         # 已挂单
                    #         cls.__process_order(code, start_index, end_index, capture_timestamp)
                    #     else:
                    #         # 未挂单
                    #         cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time)
                # 保存数据
@@ -722,6 +763,8 @@
        except Exception as e:
            cls.debug(code, "执行买入异常:{}", str(e))
            pass
        finally:
            cls.debug(code, "m值影响因子:", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
    # 是否可以买
    @classmethod
@@ -774,10 +817,19 @@
        # 删除大群撤事件的大单
        L2BetchCancelBigNumProcessor.del_recod(code)
        L2ContinueLimitUpCountManager.del_data(code)
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            # 取消买入标识
            TradePointManager.delete_buy_point(code)
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            # 删除大群撤事件的大单
            L2BetchCancelBigNumProcessor.del_recod(code)
        else:
            cls.__cancel_buy(code)
        L2BigNumProcessor.del_big_num_pos(code)
    @classmethod
@@ -905,7 +957,7 @@
                count += datas[i]["re"]
                if count >= continue_count:
                    return True, start
            else:
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                last_index = None
                count = 0
                start = None
@@ -931,7 +983,7 @@
                    start = i
                    start_time = L2DataUtil.get_time_as_second(_val["time"])
                count += datas[i]["re"]
            else:
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                if count >= continue_count:
                    return start, i - 1
                start = -1
@@ -967,7 +1019,7 @@
                    start = i
                    start_time = L2DataUtil.get_time_as_second(_val["time"])
                count += int(datas[i]["re"])
            else:
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                if count >= continue_count:
                    return start, i - 1
                start = -1
@@ -1323,7 +1375,7 @@
    @classmethod
    def test_can_order(cls):
        code = "002393"
        code = "000948"
        global_util.load_industry()
        limit_up_time_manager.load_limit_up_time()
@@ -1618,8 +1670,9 @@
                if need_cancel:
                    # 需要撤单
                    # 撤单
                    cls.__cancel_buy(code, max_num_data["index"])
                    L2TradeDataProcessor.cancel_debug(code, "跟踪到大单无撤买信号-{},新跟踪的大单需要撤买-{}", index, max_num_data["index"])
                    cls.__cancel_buy(code, max_num_data["index"] if cancel_data is None else cancel_data)
                    L2TradeDataProcessor.cancel_debug(code, "原来跟踪到大单无撤买信号-{},新跟踪的大单需要撤买-{}", index,
                                                      max_num_data["index"])
                    return True, cancel_data
                else:
                    # 无需撤单
@@ -1695,8 +1748,8 @@
            if i <= latest_buy_index:
                total_count += total_datas[i]["re"]
        L2TradeDataProcessor.debug(code, "大群撤大单数量:{}/{}", count, total_count)
        # 大单小于5笔无脑撤
        if total_count <= 5:
        # 大单小于5笔无脑撤,后修改为无大单无脑撤
        if total_count <= 0:
            return True
        # 大单撤单笔数大于总大单笔数的1/5就撤单
@@ -1788,6 +1841,287 @@
                    index_set.add(d[1])
                    big_nums_info_new.append(d)
            cls.__save_recod(code, max_big_num_info, big_nums_info_new)
# 卖跟踪
class L2SellProcessor:
    """Accumulates the volume of limit-up-price sell records for a stock code.

    Progress is persisted in redis under ``sell_num-<code>`` as the JSON pair
    ``(process_index, count)``: the last record index already accounted for
    and the accumulated ``num`` of limit-up sells.
    """

    @classmethod
    def __get_recod(cls, code):
        """Return ``(process_index, count)`` for ``code``, or ``(None, None)`` if nothing is stored."""
        redis = _redisManager.getRedis()
        _val = redis.get("sell_num-{}".format(code))
        if _val is None:
            return None, None
        datas = json.loads(_val)
        return datas[0], datas[1]

    @classmethod
    def del_recod(cls, code):
        """Delete the stored sell statistics of ``code``."""
        redis = _redisManager.getRedis()
        key = "sell_num-{}".format(code)
        redis.delete(key)

    @classmethod
    def __save_recod(cls, code, process_index, count):
        """Persist ``(process_index, count)`` for ``code`` with the expiry given by ``tool.get_expire()``."""
        redis = _redisManager.getRedis()
        key = "sell_num-{}".format(code)
        redis.setex(key, tool.get_expire(), json.dumps((process_index, count)))

    # Temporarily unused
    @classmethod
    def need_cancel(cls, code, start_index, end_index):
        """Tell whether the accumulated limit-up sells are large enough to cancel the buy order.

        ``start_index`` and ``end_index`` are accepted for interface
        compatibility but are not read.

        Returns ``False`` when there is no stored sell information or no
        limit-up price; otherwise compares ``limit_up_price * count * 100``
        with the base safe value computed from ``global_util.zyltgb_map[code]``.
        """
        process_index, count = cls.__get_recod(code)
        if process_index is None:
            # No sell information recorded yet
            return False
        if count is None:
            count = 0
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            return False
        safe_val = l2_trade_factor.L2TradeFactorUtil.get_base_safe_val(global_util.zyltgb_map[code])
        return float(limit_up_price) * count * 100 >= safe_val

    @classmethod
    def process(cls, code, start_index, end_index):
        """Add the limit-up sells found in ``[start_index, end_index]`` to the stored total.

        Records at or before the stored ``process_index`` are skipped so that
        a range is never counted twice.
        """
        total_datas = local_today_datas[code]
        process_index, count = cls.__get_recod(code)
        if count is None:
            count = 0
        for index in range(start_index, end_index + 1):
            # Only limit-up-price sells are counted
            if not L2DataUtil.is_limit_up_price_sell(total_datas[index]["val"]):
                continue
            # Skip records that were already processed
            if process_index is not None and process_index >= index:
                continue
            count += int(total_datas[index]["val"]["num"])
        # Always advance the processed position. Previously it was only set on
        # the very first call, so a repeated range was counted again.
        if process_index is None or process_index < end_index:
            process_index = end_index
        cls.__save_recod(code, process_index, count)
# 涨停封单额统计
class L2LimitUpMoneyStatisticUtil:
    """Statistics of the limit-up sealing volume derived from L2 records.

    Two kinds of redis records are maintained:

    * ``l2_limit_up_second_money-<code>-<HHMMSS>``: JSON ``(num, from_index, to_index)``
      with the net volume change accumulated for one second of data;
    * ``l2_limit_up_money-<code>``: JSON ``(num, index)``, the latest corrected
      total and the record index it refers to.
    """

    _redisManager = redis_manager.RedisManager(1)

    @classmethod
    def __get_redis(cls):
        return cls._redisManager.getRedis()

    # Store the per-second limit-up sealing data of the L2 stream
    @classmethod
    def __set_l2_second_money_record(cls, code, time, num, from_index, to_index):
        """Merge ``num`` and the index range into the record of second ``time``."""
        old_num, old_from, old_to = cls.__get_l2_second_money_record(code, time)
        if old_num is None:
            old_num = num
            old_from = from_index
            old_to = to_index
        else:
            # Existing record: accumulate the volume and extend the end index
            old_num += num
            old_to = to_index
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        cls.__get_redis().setex(key, tool.get_expire(), json.dumps((old_num, old_from, old_to)))

    @classmethod
    def __get_l2_second_money_record(cls, code, time):
        """Return ``(num, from_index, to_index)`` of second ``time`` or ``(None, None, None)``."""
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        val = cls.__get_redis().get(key)
        return cls.__format_second_money_record_val(val)

    @classmethod
    def __format_second_money_record_val(cls, val):
        """Decode a raw per-second record; ``None`` yields ``(None, None, None)``."""
        if val is None:
            return None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2]

    @classmethod
    def __get_l2_second_money_record_keys(cls, code, time_regex):
        """Return the redis keys of per-second records whose time part matches ``time_regex``."""
        key = "l2_limit_up_second_money-{}-{}".format(code, time_regex)
        keys = cls.__get_redis().keys(key)
        return keys

    # Store the latest sealing data of the L2 stream
    @classmethod
    def __set_l2_latest_money_record(cls, code, index, num):
        key = "l2_limit_up_money-{}".format(code)
        cls.__get_redis().setex(key, tool.get_expire(), json.dumps((num, index)))

    # Returns (volume, index)
    @classmethod
    def __get_l2_latest_money_record(cls, code):
        """Return the latest ``(num, index)``; ``(0, -1)`` when nothing is stored."""
        key = "l2_limit_up_money-{}".format(code)
        result = cls.__get_redis().get(key)
        if result:
            result = json.loads(result)
            return result[0], result[1]
        else:
            return 0, -1

    # Correct the data.
    # The per-second records on both sides of the correction time are used to
    # determine the record index at which the corrected value applies.
    @classmethod
    def verify_num(cls, code, num, time_str):
        """Store ``num`` as the corrected sealing volume as of ``time_str`` ("HH:MM:SS").

        The per-second records are searched within the same minute, then the
        same hour, then the whole day, for two adjacent seconds that enclose
        ``time_str``. The ``to_index`` of the chosen record becomes the index
        saved with ``num``. Nothing is saved when no enclosing pair is found.
        """
        time_ = time_str.replace(":", "")
        key = None
        for i in range(4, -2, -2):
            # Per-second records within this minute / hour / day
            time_regex = "{}*".format(time_[:i])
            keys_ = cls.__get_l2_second_money_record_keys(code, time_regex)
            if keys_ and len(keys_) > 1:
                # Sort by the time suffix of the key
                # NOTE(review): assumes redis returns str keys (decode_responses) - confirm
                keys = sorted(keys_, key=lambda tup: int(tup.split("-")[-1]))
                for index in range(0, len(keys) - 1):
                    time_1 = keys[index].split("-")[-1]
                    time_2 = keys[index + 1].split("-")[-1]
                    if int(time_1) <= int(time_) <= int(time_2):
                        # The correction time lies within this pair
                        if time_ == time_2:
                            key = keys[index + 1]
                        else:
                            key = keys[index]
                        break
            if key:
                val = cls.__get_redis().get(key)
                old_num, old_from, old_to = cls.__format_second_money_record_val(val)
                end_index = old_to
                # Save the corrected value as the latest data
                cls.__set_l2_latest_money_record(code, end_index, num)
                break

    # Signed volume of one record, used to compute the sealing volume
    @classmethod
    def __compute_num(cls, code, data, buy_single_data):
        """Return the signed contribution (``num * re``) of ``data`` to the sealing volume.

        Limit-up buy cancels and sells are negative. A sell cancel counts as 0
        when ``l2_data_util.is_sell_index_before_target`` reports its sell as
        being before the buy-signal record. Everything else is positive.
        """
        if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) or L2DataUtil.is_sell(data["val"]):
            # Limit-up buy cancel, or sell
            return 0 - int(data["val"]["num"]) * data["re"]
        else:
            # Sell cancel
            if L2DataUtil.is_sell_cancel(data["val"]):
                # Ignore the sell cancel if its sell happened before the buy signal
                if l2_data_util.is_sell_index_before_target(data, buy_single_data,
                                                            local_today_num_operate_map.get(code)):
                    return 0
            return int(data["val"]["num"]) * data["re"]

    @classmethod
    def clear(cls, code):
        """Delete the latest sealing record of ``code``."""
        key = "l2_limit_up_money-{}".format(code)
        cls.__get_redis().delete(key)

    # Returns the record that triggers a cancel.
    # with_cancel: whether a cancel decision is wanted at all
    @classmethod
    def process_data(cls, code, start_index, end_index, buy_single_begin_index, with_cancel=True):
        """Update the per-second statistics for ``[start_index, end_index]`` and look for a cancel trigger.

        Returns ``(cancel_record, message)`` when a decreasing record inside the
        range drives the running total below the 10-million threshold or makes
        it drop by at least 50% relative to the total at the start of that
        second. Returns ``(None, None)`` otherwise, or when ``with_cancel`` is
        false.
        """
        start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        time_dict_num = {}
        # First/last record index seen for each second
        time_dict_num_index = {}
        num_dict = {}
        for i in range(start_index, end_index + 1):
            data = total_datas[i]
            time_ = data["val"]["time"]
            if time_ not in time_dict_num:
                time_dict_num[time_] = 0
                time_dict_num_index[time_] = {"s": i, "e": i}
            time_dict_num_index[time_]["e"] = i
            num = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
            num_dict[i] = num
            time_dict_num[time_] = time_dict_num[time_] + num
        for t_ in time_dict_num:
            cls.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"],
                                             time_dict_num_index[t_]["e"])

        print("保存涨停封单额时间:", round(t.time() * 1000) - start_time)

        # Running total, starting from the latest corrected value
        total_num, index = cls.__get_l2_latest_money_record(code)
        if index == -1:
            # No corrected value available: start from the buy-signal record
            index = buy_single_begin_index - 1
            total_num = 0
        # TODO optimise this computation
        cancel_index = None
        cancel_msg = None
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        # Volume equivalent to 10,000,000 at the limit-up price (100 shares per unit)
        min_volumn = round(10000000 / (limit_up_price * 100))
        # Index at which each second starts
        time_start_index_dict = {}
        # Seconds in order of appearance
        time_list = []
        # Running total at the moment each second starts
        time_total_num_dict = {}
        for i in range(index + 1, end_index + 1):
            data = total_datas[i]
            time_ = data["val"]["time"]
            if time_ not in time_start_index_dict:
                time_start_index_dict[time_] = i
                time_list.append(time_)
                # Total accumulated before this second
                time_total_num_dict[time_] = total_num
            val = num_dict.get(i)
            if val is None:
                val = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
            total_num += val
            # Only a decreasing record inside the processed range can trigger a cancel
            if val < 0 and start_index <= i <= end_index:
                # Running total below the 10-million threshold
                if total_num < min_volumn:
                    cancel_index = i
                    cancel_msg = "封单金额小于1000万"
                    break
                # Total at the start of the current second
                last_second_total_volumn = time_total_num_dict.get(time_list[-1])
                if last_second_total_volumn > 0 and (
                        last_second_total_volumn - total_num) / last_second_total_volumn >= 0.5:
                    # Dropped by at least 50% relative to that total
                    cancel_index = i
                    cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn,
                                                                     total_num)
                    break
        if not with_cancel:
            cancel_index = None

        print("封单额计算时间:", round(t.time() * 1000) - start_time)
        # NOTE: the running total is not persisted here; within this class the
        # latest record is only written by verify_num.
        # Compare against None: index 0 is a valid cancel position but is falsy.
        if cancel_index is not None:
            return total_datas[cancel_index], cancel_msg
        return None, None
def __get_time_second(time_str):
@@ -2035,4 +2369,7 @@
if __name__ == "__main__":
    # Manual debugging entry point
    L2TradeDataProcessor.test_can_order()
    # Process data: load the L2 data of one code and apply a corrected sealing volume
    code = "002898"
    load_l2_data(code)
    L2LimitUpMoneyStatisticUtil.verify_num(code, 70582, "09:42:00")
l2_data_manager_new.py
New file
@@ -0,0 +1,629 @@
import datetime
import logging
import random
import time as t
import big_money_num_manager
import data_process
import global_util
import gpcode_manager
import l2_data_log
import l2_data_manager
import l2_data_util
import l2_trade_factor
import l2_trade_test
import limit_up_time_manager
import log
import redis_manager
import ths_industry_util
import tool
import trade_manager
from l2_data_manager import L2DataException, TradePointManager, local_today_datas, L2DataUtil, load_l2_data, \
    local_today_num_operate_map, L2LimitUpMoneyStatisticUtil
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process
# TODO l2数据管理
class L2DataManager:
    """Placeholder for L2 data management; only ``format_data`` is implemented."""

    # Format data
    def format_data(self, datas):
        """Wrap every raw item as ``{"val": item, "re": 1}`` and return the new list."""
        return [{"val": item, "re": 1} for item in datas]

    # Get newly added data
    def get_add_datas(self, format_datas):
        """Not implemented yet; returns ``None``."""

    # Load data from the database
    def load_data(self, code=None, force=False):
        """Not implemented yet; returns ``None``."""

    # Save data
    def save_datas(self, add_datas, datas):
        """Not implemented yet; returns ``None``."""
# m值大单处理
class L2BigNumForMProcessor:
    """Counts weighted big orders (limit-up buys and buy cancels) used for the m value.

    State is kept in redis:

    * ``m_big_money_begin-<code>``: record index where counting starts;
    * ``m_big_money_process_index-<code>``: last record index already processed.
    """

    def __init__(self):
        self._redis_manager = redis_manager.RedisManager(1)

    def __get_redis(self):
        return self._redis_manager.getRedis()

    # Save the position where counting starts
    def set_begin_pos(self, code, index):
        """Store ``index`` as the start position unless one is already stored."""
        if self.__get_begin_pos(code) is None:
            key = "m_big_money_begin-{}".format(code)
            self.__get_redis().setex(key, tool.get_expire(), index)

    # Get the position where counting starts
    def __get_begin_pos(self, code):
        key = "m_big_money_begin-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None
        return int(val)

    # Forget how far the data has been processed
    def clear_processed_end_index(self, code):
        key = "m_big_money_process_index-{}".format(code)
        self.__get_redis().delete(key)

    # Remember the last processed record index
    def __set_processed_end_index(self, code, index):
        key = "m_big_money_process_index-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), index)

    # Last processed record index, or None
    def __get_processed_end_index(self, code):
        key = "m_big_money_process_index-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None
        return int(val)

    # Process big orders
    def process(self, code, start_index, end_index, limit_up_price):
        """Add the weighted big-order count of ``[start_index, end_index]`` to ``big_money_num_manager``.

        Nothing happens when no start position is stored or when the range
        was already processed. Records up to and including the stored
        processed index are never counted again.

        Each limit-up buy contributes ``int(weight * re * 1000)`` and each
        limit-up buy cancel the negative of that. The weight is 0, 1, 4/3, 2
        or 4 depending on which of the thresholds 5000, 10000, 20000, 30000
        (each divided by ``limit_up_price``) ``num`` reaches.
        """
        begin_pos = self.__get_begin_pos(code)
        if begin_pos is None:
            # No buy signal has been seen yet
            return
        # Index processed by the previous call (inclusive)
        processed_index = self.__get_processed_end_index(code)
        if processed_index is not None and processed_index >= end_index:
            return
        # Resume strictly after the last processed record. Starting at
        # processed_index itself counted that record twice, and defaulting a
        # missing value to 0 skipped record 0.
        if processed_index is None:
            first_index = max(start_index, 0)
        else:
            first_index = max(start_index, processed_index + 1)

        start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]

        num_splites = [round(5000 / limit_up_price), round(10000 / limit_up_price), round(20000 / limit_up_price),
                       round(30000 / limit_up_price)]
        total_num = 0
        for i in range(first_index, end_index + 1):
            data = total_datas[i]
            is_buy_cancel = L2DataUtil.is_limit_up_price_buy_cancel(data["val"])
            if not is_buy_cancel and not L2DataUtil.is_limit_up_price_buy(data["val"]):
                continue
            if is_buy_cancel:
                # A cancel whose buy was placed before the start position is ignored
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None and buy_index < begin_pos:
                    continue

            # Weight of the order by size
            num = int(data["val"]["num"])
            temp = 0
            if num < num_splites[0]:
                pass
            elif num < num_splites[1]:
                temp = 1
            elif num < num_splites[2]:
                temp = round(4 / 3, 3)
            elif num < num_splites[3]:
                temp = 2
            else:
                temp = 4
            count = int(temp * data["re"] * 1000)
            if is_buy_cancel:
                count = 0 - count
            total_num += count
        self.__set_processed_end_index(code, end_index)
        big_money_num_manager.add_num(code, total_num)

        print("m值大单计算范围:{}-{}  时间:{}".format(first_index, end_index,
                                             round(t.time() * 1000) - start_time))
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    random_key = {}
    l2BigNumForMProcessor = L2BigNumForMProcessor()
    @classmethod
    def debug(cls, code, content, *args):
        logger_l2_trade.debug(("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    def cancel_debug(cls, code, content, *args):
        logger_l2_trade_cancel.debug(
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    def buy_debug(cls, code, content, *args):
        logger_l2_trade_buy.debug(
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp):
        cls.random_key[code] = random.randint(0, 100000)
        now_time_str = datetime.datetime.now().strftime("%H:%M:%S")
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
                # 判断价格区间是否正确
                if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # 加载历史数据
                l2_data_manager.load_l2_data(code)
                # 纠正数据
                datas = l2_data_manager.L2DataUtil.correct_data(code, datas)
                _start_index = 0
                if local_today_datas.get(code) is not None and len(
                        local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, _start_index)
                if len(add_datas) > 0:
                    # 拼接数据
                    local_today_datas[code].extend(add_datas)
                    l2_data_util.load_num_operate_map(l2_data_manager.local_today_num_operate_map, code, add_datas)
                total_datas = local_today_datas[code]
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据预处理时间")
                if len(add_datas) > 0:
                    _start_time = round(t.time() * 1000)
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
                    # TODO 暂时关闭处理
                    if l2_data_manager.L2DataUtil.is_same_time(now_time_str, latest_time):
                        # 判断是否已经挂单
                        state = trade_manager.get_trade_state(code)
                        start_index = len(total_datas) - len(add_datas)
                        end_index = len(total_datas) - 1
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已挂单
                            cls.__process_order(code, start_index, end_index, capture_timestamp)
                        else:
                            # 未挂单
                            cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time)
                    __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间")
                # 保存数据
                l2_data_manager.save_l2_data(code, datas, add_datas)
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "保存数据时间")
        finally:
            if code in cls.unreal_buy_dict:
                cls.unreal_buy_dict.pop(code)
    # 处理未挂单
    @classmethod
    def __process_not_order(cls, code, start_index, end_index, capture_time):
        # 获取阈值
        threshold_money = cls.__get_threshmoney(code)
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    @classmethod
    def __statistic_count_l2_data_for_cancel(cls, code, start_index, end_index, has_cancel_single=False):
        index, old_buy_count, old_cancel_count = l2_data_manager.TradePointManager.get_count_info_for_cancel_buy(code)
        for i in range(start_index, end_index + 1):
            buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, i, i)
            old_buy_count += buy_count
            old_cancel_count += buy_cancel_count
            if old_buy_count > 0 and (old_buy_count - old_cancel_count) / old_buy_count < 0.3 and has_cancel_single:
                return i, True
        l2_data_manager.TradePointManager.set_count_info_for_cancel_buy(code, end_index, old_buy_count,
                                                                        old_cancel_count)
        return end_index, False
    # 处理已挂单
    @classmethod
    def __process_order(cls, code, start_index, end_index, capture_time, new_add=True):
        if start_index < 0:
            start_index = 0
        if end_index < start_index:
            return
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
        # 撤单计算,只看买1
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                                           buy_single_index)
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                          gpcode_manager.get_limit_up_price(code))
        if cancel_data:
            cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            # 撤单
            cls.cancel_buy(code)
            # 继续计算下单
            cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
        else:
            # 如果有虚拟下单需要真实下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}", unreal_buy_info[0], capture_time)
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        can, reason = cls.__can_buy(code)
        # 不能购买
        if not can:
            cls.debug(code, "不可以下单,原因:{}", reason)
            return
        else:
            cls.debug(code, "可以下单,原因:{}", reason)
        # 删除虚拟下单
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
        cls.debug(code, "开始执行买入")
        try:
            trade_manager.start_buy(code, capture_timestamp, last_data,
                                    last_data_index)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            cls.debug(code, "执行买入成功")
        except Exception as e:
            cls.debug(code, "执行买入异常:{}", str(e))
            pass
        finally:
            cls.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
    # 是否可以买
    @classmethod
    def __can_buy(cls, code):
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_manager.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2_data_manager.L2DataUtil.get_time_as_second(
            "14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
        # 同一板块中老二后面的不能买
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
        # 13:00后涨停,本板块中涨停票数<29不能买
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None:
            if int(limit_up_time.replace(":", "")) >= 130000 and global_util.industry_hot_num.get(industry) is not None:
                if global_util.industry_hot_num.get(industry) < 29:
                    return False, "13:00后涨停,本板块中涨停票数<29不能买"
        # 老二,本板块中涨停票数<29 不能买
        if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
                industry) is not None:
            if global_util.industry_hot_num.get(industry) < 29:
                return False, "老二,本板块中涨停票数<29不能买"
        # 可以下单
        return True, None
    @classmethod
    def __cancel_buy(cls, code):
        try:
            cls.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            # 取消买入标识
            l2_data_manager.TradePointManager.delete_buy_point(code)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
            l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
            # 删除大群撤事件的大单
            l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            cls.debug(code, "执行撤单异常:{}", str(e))
    @classmethod
    def cancel_buy(cls, code):
        # 删除大群撤事件的大单
        l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code)
        l2_data_manager.L2ContinueLimitUpCountManager.del_data(code)
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            # 取消买入标识
            l2_data_manager.TradePointManager.delete_buy_point(code)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
            l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
            # 删除大群撤事件的大单
            l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code)
        else:
            cls.__cancel_buy(code)
        l2_data_manager.L2BigNumProcessor.del_big_num_pos(code)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
                            new_add=True):
        """Search for a buy signal and a buy execution point in the given record range.

        ``threshold_money`` is accepted but replaced by a freshly computed m
        value before it is used. When ``new_add`` is true the signal search
        starts two records before ``compute_start_index`` (never below 0).

        When an execution point is found a virtual order is registered; it
        becomes a real order immediately if the range is exhausted, otherwise
        the remaining records go through ``__process_order``. Returns ``None``
        on every path.
        """
        if compute_end_index < compute_start_index:
            return
        total_datas = local_today_datas[code]
        # Stored start position of the buy-signal computation
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
        # No stored position: try to find a new buy signal
        if buy_single_index is None:
            # Look for a buy signal (3 consecutive records required)
            has_single, _index = cls.__compute_order_begin_pos(code, max(
                compute_start_index - 2 if new_add else compute_start_index, 0), 3, compute_end_index)
            buy_single_index = _index
            if has_single:
                num = 0
                count = 0
                cls.debug(code, "获取到买入信号起始点:{}  数据:{}", buy_single_index, total_datas[buy_single_index])
                # First order-start signal of the day: set the big-order start position
                # (set_begin_pos only stores it when none is stored yet)
                cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index)
        if buy_single_index is None:
            # No buy signal found, stop here
            return None
        # Big orders for the m value
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index,
                                          gpcode_manager.get_limit_up_price(code))
        # The threshold_money argument is overwritten with the current m value
        threshold_money = cls.__get_threshmoney(code)
        # Accumulate the net buy amount
        compute_index, buy_nums, buy_count, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,
                                                                                                      compute_start_index),
                                                                                            compute_end_index, num,
                                                                                            count, threshold_money,
                                                                                            buy_single_index,
                                                                                            capture_time)
        # The buy signal and the computed position are 2s or more apart
        if rebegin_buy_pos is not None:
            # Restart the net-buy computation from the returned position
            cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, False)
            return
        if compute_index is not None:
            cls.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 纯买单数:{} 数据:{}", compute_index, threshold_money, buy_nums,
                      buy_count,
                      total_datas[compute_index])
            # Record the buy-signal position
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count)
            # Save the limit-up time (time of the buy execution record)
            limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"])
            # Virtual order: (execution index, capture time)
            cls.unreal_buy_dict[code] = (compute_index, capture_time)
            # Remove all earlier cancel signals
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            # Limit-up sealing statistics, without cancel detection (with_cancel=False)
            L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index, False)
            # Has all data been processed?
            if compute_index >= compute_end_index:
                cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                # All data processed and no cancel so far: place the real order
                cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                # Data left: continue with the next step
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                # Run the cancel handling on the remaining records
                cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
        else:
            # Buy condition not reached: save the accumulated net buy values
            # together with the buy-signal position (execution index -1)
            cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count)
        pass
    # 获取下单起始信号
    @classmethod
    def __get_order_begin_pos(cls, code):
        buy_single_index, buy_exec_index, compute_index, num, count = l2_data_manager.TradePointManager.get_buy_compute_start_data(
            code)
        return buy_single_index, buy_exec_index, compute_index, num, count
    # 保存下单起始信号
    @classmethod
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num, count):
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, count)
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
    @classmethod
    def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index):
        second_930 = 9 * 3600 + 30 * 60 + 0
        # 倒数100条数据查询
        datas = local_today_datas[code]
        if end_index - start_index + 1 < continue_count:
            return False, None
        __time = None
        last_index = None
        count = 0
        start = None
        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            # 时间要>=09:30:00
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            if L2DataUtil.is_limit_up_price_buy(_val):
                if last_index is None or (datas[last_index]["val"]["time"] == datas[i]["val"]["time"]):
                    if start is None:
                        start = i
                    last_index = i
                    count += datas[i]["re"]
                    if count >= continue_count:
                        return True, start
                else:
                    # 本条数据作为起点
                    last_index = i
                    count = datas[i]["re"]
                    start = i
            elif not L2DataUtil.is_sell(_val) and not L2DataUtil.is_sell_cancel(_val):
                # 剔除卖与卖撤
                last_index = None
                count = 0
                start = None
        return False, None
    @classmethod
    def __get_threshmoney(cls, code):
        return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
    # Sum the net buy volume; buy cancels whose buy order precedes the buy signal are not counted
    @classmethod
    def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                                  threshold_money, buy_single_index, capture_time):
        """Accumulate net limit-up buy volume and order count over a range of L2 records.

        Starting from ``origin_num`` / ``origin_count``, walks
        ``local_today_datas[code][compute_start_index:compute_end_index + 1]``,
        adding limit-up buys and subtracting limit-up buy cancels, until both the
        volume threshold (``threshold_money / (limit_up_price * 100)``) and the
        order-count threshold (``get_safe_buy_count``) are met.

        ``capture_time`` is not used in this method body.

        Returns:
            A 4-tuple ``(exec_index, buy_nums, buy_count, restart_index)``.
            ``exec_index`` is the record index at which both thresholds were
            reached, otherwise ``None``. ``restart_index`` is only set when a
            record more than one second after the buy-signal record is met; it
            is then the first index after ``buy_single_index`` whose time differs
            from the buy-signal record.

        Raises:
            Exception: when no limit-up price is available for ``code``.
        """
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        buy_count = origin_count
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        # Target volume (in lots)
        threshold_num = threshold_money / (limit_up_price * 100)
        # Target number of orders
        threshold_count = l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code)
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
            # This record is more than 1 second after the buy-signal record: drop the stored buy point
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > 1:
                TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # All data has been processed
                    return None, buy_nums, buy_count, None
                else:
                    # A new buy signal must not be searched starting within the same timestamp
                    for ii in range(buy_single_index + 1, compute_end_index + 1):
                        if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]:
                            return None, buy_nums, buy_count, ii
                    # NOTE(review): if no record with a different time exists, execution falls
                    # through and this record is still processed below - confirm this is intended.
            # Limit-up buy
            if L2DataUtil.is_limit_up_price_buy(_val):
                # Limit-up buy: add volume (num * repeat count) and order count
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                buy_count += int(total_datas[i]["re"])
                if buy_nums >= threshold_num and buy_count >= threshold_count:
                    # Only logs here; the actual return happens at the end of the loop body
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}", code, i, buy_nums,
                                             threshold_num, buy_count, threshold_count)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # Limit-up buy cancel
                # Determine whether the cancelled buy was placed before the buy signal
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    # Found the buy record that this cancel refers to
                    if buy_index >= buy_single_index:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        buy_count -= int(data["re"])
                        cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                            # Same second as the buy signal: treat as if it came after the signal
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            buy_count -= int(data["re"])
                            cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # The buy record for this cancel was not found: subtract it anyway
                    cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count -= int(total_datas[i]["re"])
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            # Both thresholds reached: this index is reported as the buy execution point
            if buy_nums >= threshold_num and buy_count >= threshold_count:
                return i, buy_nums, buy_count, None
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{}", compute_start_index,
                      buy_nums,
                      threshold_num, buy_count, threshold_count)
        return None, buy_nums, buy_count, None
    @classmethod
    def test(cls):
        """Replay locally stored L2 data for a hard-coded code through the processor.

        Clears the stored trade data for the code, loads its L2 data and then
        dispatches to ``__process_order`` or ``__process_not_order`` depending on
        the current trade state. Exceptions are logged, not propagated.
        """
        code = "002898"
        l2_trade_test.clear_trade_data(code)
        load_l2_data(code, True)
        # This branch always runs and returns, so everything after it is unreachable as written.
        if True:
            state = trade_manager.get_trade_state(code)
            cls.random_key[code] = random.randint(0, 100000)
            capture_timestamp = 1999988888
            try:
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    # An order is already placed
                    cls.__process_order(code, 0, 140, capture_timestamp)
                else:
                    # No order placed yet
                    cls.__process_not_order(code, 0, 140, capture_timestamp)
            except Exception as e:
                logging.exception(e)
            return
        # ---- unreachable below: per-second batched replay of the whole day's data ----
        _start = t.time()
        # Batch the data by second
        total_datas = local_today_datas[code]
        start_time = total_datas[0]["val"]["time"]
        start_index = 0
        for i in range(0, len(total_datas)):
            if total_datas[i]["val"]["time"] != start_time:
                cls.random_key[code] = random.randint(0, 100000)
                # Process the batch [start_index, i - 1], i.e. all records sharing start_time
                start = start_index
                # if start != 201:
                #     continue
                end = i - 1
                print("处理进度:{},{}".format(start, end))
                capture_timestamp = 1999999999
                state = trade_manager.get_trade_state(code)
                try:
                    if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                        # An order is already placed
                        cls.__process_order(code, start, end, capture_timestamp)
                    else:
                        # No order placed yet
                        cls.__process_not_order(code, start, end, capture_timestamp)
                except Exception as e:
                    logging.exception(e)
                # t.sleep(1)
                start_index = i
                start_time = total_datas[i]["val"]["time"]
        print("时间花费:", round((t.time() - _start) * 1000))
# Script entry point: run the replay helper L2TradeDataProcessor.test().
if __name__ == "__main__":
    L2TradeDataProcessor.test()
l2_data_util.py
@@ -13,6 +13,19 @@
from trade_gui import async_call
def run_time():
    """Build a decorator that prints the wrapped function's run time in milliseconds.

    Usage::

        @run_time()
        def work(a, b=0): ...

    The wrapped function receives its positional and keyword arguments
    unchanged and its return value is passed through.
    """
    import functools

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def infunc(*args, **kwargs):
            start = round(time.time() * 1000)
            # Unpack the positional arguments; passing ``args`` itself would
            # hand the callee a single tuple instead of its real arguments.
            result = func(*args, **kwargs)
            print("执行时间", round(time.time() * 1000) - start)
            return result

        return infunc

    return decorator
def compare_time(time1, time2):
    """Compare two ``HH:MM:SS`` strings.

    The first two colons of each string are removed and the remainder is read
    as an integer; the difference ``time1 - time2`` of those integers is
    returned (negative when ``time1`` is earlier, 0 when equal).
    """
    left, right = (int(stamp.replace(":", "", 2)) for stamp in (time1, time2))
    return left - right
@@ -110,6 +123,33 @@
    return None, None
# Decide whether the sell order behind a sell-cancel record precedes the target record
def is_sell_index_before_target(sell_cancel_data, target_data, local_today_num_operate_map):
    """Return True when the sell matching ``sell_cancel_data`` lies before ``target_data``.

    The cancel record's ``cancelTime`` / ``cancelTimeUnit`` are turned into a
    time window (via ``compute_time_space_as_second`` and ``__sub_time``) in
    which the original sell must have happened. If the target's time is later
    than the end of that window the answer is True immediately. Otherwise the
    candidate records stored under the key ``"<num>-2-<price>"`` are scanned
    and the first one inside the window decides the result by comparing its
    ``index`` with the target's ``index``.

    Returns False when no candidate is found, including when the map has no
    entry for the key (previously that case raised ``TypeError``).
    """
    cancel_val = sell_cancel_data["val"]
    min_space, max_space = compute_time_space_as_second(cancel_val["cancelTime"],
                                                        cancel_val["cancelTimeUnit"])
    max_time = __sub_time(cancel_val["time"], min_space)
    min_time = __sub_time(cancel_val["time"], max_space)
    # If even the latest possible sell time is before the target, the sell is certainly before it
    if int(target_data["val"]["time"].replace(":", "")) > int(max_time.replace(":", "")):
        return True
    sell_datas = local_today_num_operate_map.get(
        "{}-{}-{}".format(cancel_val["num"], "2", cancel_val["price"]))
    if not sell_datas:
        # No record with this num/price was captured today
        return False
    same_second = min_space == 0 and max_space == 0
    for data in sell_datas:
        val = data["val"]
        if int(val["operateType"]) != 2:
            continue
        if int(val["num"]) != int(cancel_val["num"]):
            continue
        if same_second:
            # The cancel happened within the same second as the sell
            if compare_time(val["time"], min_time) == 0:
                return data["index"] < target_data["index"]
        # The record lies inside the window (min_time, max_time]
        elif compare_time(val["time"], min_time) > 0 and compare_time(val["time"], max_time) <= 0:
            return data["index"] < target_data["index"]
    return False
__last_big_data = {}
@@ -140,17 +180,127 @@
            break
# L2 data concatenation helper
class L2DataConcatUtil:
    """Work out which rows of a freshly captured L2 batch are new.

    ``last_datas`` is the previously stored list and ``datas`` the new batch;
    both are lists of dicts with a ``"val"`` dict. The tail of ``last_datas``
    is used as a "feature" that is searched for in ``datas``; everything after
    the matched feature is considered new.
    """

    # Initialise
    def __init__(self, code, last_datas, datas):
        self.last_datas = last_datas
        self.datas = datas
        self.code = code

    def __get_data_identity(self, data_):
        """Build the string used to compare two rows for equality."""
        data = data_["val"]
        return "{}-{}-{}-{}-{}-{}".format(data.get("time"), data.get("num"), data.get("price"), data.get("operateType"),
                                          data.get("cancelTime"), data.get("cancelTimeUnit"))

    # Get the concatenation feature, taken from the tail of last_datas
    def __get_concat_feature(self):
        """Return ``(start_index, end_index)`` of the feature inside ``last_datas``.

        The feature grows backwards from the last row until it holds at least
        3 rows containing at least 2 distinct identities, or until the list is
        exhausted. ``start_index`` is -1 when ``last_datas`` is empty.
        """
        min_identity = 2
        min_count = 3

        identity_set = set()
        count = 0
        start_index = -1
        for i in range(len(self.last_datas) - 1, -1, -1):
            identity_set.add(self.__get_data_identity(self.last_datas[i]))
            count += 1
            start_index = i
            if count >= min_count and len(identity_set) >= min_identity:
                break
        return start_index, len(self.last_datas) - 1

    # Get the newly added rows
    def get_add_datas(self):
        """Return the rows of ``datas`` that come after the matched feature.

        Returns ``[]`` when the new batch ends earlier than the stored data or
        when the feature is found at the very end of ``datas``. Returns all of
        ``datas`` when there is no stored data or the feature is not found.
        """
        # A batch whose last row is older than the stored last row carries nothing new
        if self.last_datas and self.datas:
            if int(self.datas[-1]["val"]["time"].replace(":", "")) - int(
                    self.last_datas[-1]["val"]["time"].replace(":", "")) < 0:
                return []

        # Locate the feature
        start_index, end_index = self.__get_concat_feature()
        if start_index < 0:
            return self.datas
        print("特征位置:", start_index, end_index)
        # Identities of the feature rows
        identity_list = [self.__get_data_identity(self.last_datas[i]) for i in range(start_index, end_index + 1)]
        feature_len = len(identity_list)
        new_identities = [self.__get_data_identity(row) for row in self.datas]

        for n in range(0, feature_len):
            # Each pass drops one more leading feature row.
            # "+ 1": the last start position at which the remaining feature
            # still fits must be tested too, otherwise a batch that ends
            # exactly with the feature (no new rows) is returned as all-new.
            for i in range(0, len(new_identities) - feature_len + n + 1):
                if new_identities[i] != identity_list[n]:
                    continue
                # n == 0 is a full match; a partial match is only accepted
                # when it starts at the first row of the new batch
                if n != 0 and i != 0:
                    continue
                if all(identity_list[j] == new_identities[i + j - n] for j in range(n + 1, feature_len)):
                    return self.datas[i + feature_len - n:]
        print("新数据中未找到特征标识")
        return self.datas
def test_add_datas():
    """Print the result of ``L2DataConcatUtil.get_add_datas`` for six sample inputs."""

    def load_data(datas):
        # Wrap each time string in the row structure the util expects
        return [{"val": {"time": data}} for data in datas]

    common_new = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"]
    # (stored times, newly captured times), in the order they are printed
    cases = (
        ([], common_new),
        (["10:00:02"], common_new),
        (["10:00:00", "10:00:01", "10:00:02", "10:00:03"], common_new),
        (["10:00:00", "10:00:01", "10:00:02", "10:00:03"],
         ["10:00:01", "10:00:02", "10:00:03", "10:00:04", "10:00:05"]),
        (["10:00:00", "10:00:01", "10:00:02", "10:00:02"], common_new),
        (["10:00:00", "10:00:01", "10:00:02", "10:00:02"],
         ["10:00:02", "10:00:02", "10:00:00", "10:00:01", "10:00:02", "10:00:02", "10:00:04", "10:00:05"]),
    )
    for latest_times, new_times in cases:
        latest_datas = load_data(latest_times)
        datas = load_data(new_times)
        print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas())
def test(datas):
    """Set ``datas["code"]`` to ``"test"`` in place; nothing is returned."""
    datas["code"] = "test"
# Manual check: look up the buy record for one stored cancel record, then run test_add_datas().
if __name__ == "__main__":
    # cancel_data = {"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 1, "time": "09:32:30"}}
    # today_datas=[{"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 0, "time": "09:32:30"}},{"val": {"operateType": 0, "num": 1520, "cancelTime": 0, "cancelTimeUnit": 0, "time": "09:31:31"}}]
    # result= get_buy_data_with_cancel_data(cancel_data,today_datas)
    # print(result)
    code = "001209"
    # Load the stored L2 data for the code into l2_data_manager's module-level caches
    l2_data_manager.load_l2_data(code)
    total_datas = l2_data_manager.local_today_datas[code]
    # Row 118 is a hard-coded sample index
    index, data = get_buy_data_with_cancel_data(total_datas[118], l2_data_manager.local_today_num_operate_map.get(code))
    print(index, data)
    test_add_datas()
l2_trade_factor.py
@@ -18,53 +18,57 @@
            yi = 1
        return 5000000 + (yi - 1) * 500000
    # 自由流通市值影响比例
    @classmethod
    def get_zylt_rate(cls, zyltgb):
        yi = round(zyltgb / 100000000)
        if yi < 1:
            yi = 1
        if yi <= 30:
            rate = -0.04 + 0.01 * (yi - 1)
            if rate > 0.1:
                rate = 0.1
        else:
            rate = 0.09 - (yi - 31) * 0.002
            if rate < -0.1:
                rate = -0.1
        return round(rate, 4)
    # Get the industry influence rate
    # total_limit_percent is the sum of the counted percentages multiplied by 100
    @classmethod
    def get_industry_rate(cls, total_limit_percent):
        # NOTE(review): this body reads like two revisions of the function placed one after
        # the other (linear formula, then a step table). As written only the linear formula
        # runs - confirm which version is intended before changing anything.
        t = total_limit_percent / 10
        # Linear formula: 0.26 plus 0.02 per 0.5 of t, capped at 0.52
        rate = t / 0.5 * 0.02 + 0.26
        if rate > 0.52:
            rate = 0.52
        return round(rate, 2)
        # Unreachable as written (follows an unconditional return): step table keyed on t
        if t < 0.9:
            return 0
        elif t <= 1.1:
            return 0.2
        elif t <= 1.6:
            return 0
        elif t <= 2.1:
            return 0.03
        elif t <= 2.6:
            return 0.06
        elif t <= 3.1:
            return 0.09
        elif t <= 3.6:
            return 0.12
        elif t <= 4.1:
            return 0.15
        elif t <= 4.6:
            return 0.18
        elif t <= 5.1:
            return 0.21
        elif t <= 5.6:
            return 0.24
        elif t <= 6.1:
            return 0.27
        else:
            return 0.30
    # Get the volume influence rate
    @classmethod
    def get_volumn_rate(cls, day60_max, yest, today):
        # NOTE(review): two if/elif chains follow each other below; it looks like two
        # revisions of the same mapping. As written the second chain always overwrites
        # the result of the first - confirm which one is intended.
        # Reference volume: the larger of the 60-day maximum and yesterday's volume
        old_volumn = yest
        base_rate = 0.49
        if day60_max > yest:
            old_volumn = day60_max
            base_rate = 0.50
        # Ratio of today's volume to the reference volume, at least 0.01
        r = round(today / old_volumn, 2)
        if r < 0.01:
            r = 0.01
        print("比例:", r)
        rate = 0
        # First chain (its result is overwritten by the second chain)
        if r <= 0.25:
            rate = base_rate - (r - 0.01) * 2
        elif r <= 0.5:
            rate = 0.25 - r + (0.01 if day60_max > yest else 0)
        elif r < 0.75:
            rate = r - 0.75 + (0.01 if day60_max > yest else 0)
        elif r < 1.74:
            rate = base_rate - (r - 0.75)
        # Second chain: covers every value of r, so this is the value that is returned
        if r < 0.5:
            rate = 0.3 - (r - 0.01)
        elif r <= 0.75:
            rate = -0.2 + (r - 0.5) * 2
        elif r <= 1.35:
            rate = 0.3 - (r - 0.75)
        else:
            # The first assignment is immediately overwritten by the second
            rate = base_rate - 0.99
            rate = -0.3
        return round(rate, 4)
    # 当前股票首次涨停时间的影响比例
@@ -74,36 +78,33 @@
        start_m = 9 * 60 + 30
        m = int(times[0]) * 60 + int(times[1])
        dif = m - start_m
        base_rate = 0.5
        base_rate = 0.3
        rate = 0
        if dif < 1:
            rate = base_rate
        elif dif <= 5:
            rate = base_rate - dif * 0.02
        elif dif <= 120:
            # 11:30之前
            rate = 0.39 - (dif - 6) * 0.004
            rate = base_rate - dif * 0.0035
        else:
            rate = 0.39 - (120 - 6) * 0.004 - (dif - 210 + 1) * 0.004
            if rate < -0.5:
                rate = -0.5
            rate = base_rate - (dif - 89) * 0.0035
            if rate < -0.3020:
                rate = -0.3020
        return round(rate, 4)
    # Influence rate of pure big orders (lots >= 9000 OR amount >= 5,000,000)
    @classmethod
    def get_big_money_rate(cls, num):
        # NOTE(review): this body reads like two revisions placed one after the other.
        # As written the first if/else always returns, so the formula below it never runs -
        # confirm which version is intended.
        if num < 0:
            num = 0
        # 0.05 per big order, 0.5 from 10 orders upwards
        if num >= 10:
            return 0.5
        else:
            return round(num * 0.05, 2)
        # Unreachable as written: 0 below 4 orders, then 0.06 plus 0.035 per 4 extra orders, capped at 0.9
        if num < 4:
            return 0
        rate = (num - 4) * 0.035 / 4 + 0.06
        if rate > 0.9:
            rate = 0.9
        return round(rate, 4)
    @classmethod
    def compute_rate(cls, zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today,
                     limit_up_time, big_money_num):
        # 自由流通股本影响比例
        zyltgb_rate = cls.get_zylt_rate(zyltgb)
        # 行业涨停影响比例
        industry_rate = 0
        if total_industry_limit_percent is not None:
@@ -121,16 +122,23 @@
        if big_money_num is not None:
            big_money_rate = cls.get_big_money_rate(big_money_num)
        print(
            "zyltgb_rate:{} industry_rate:{} volumn_rate:{} limit_up_time_rate:{} big_money_rate:{}".format(zyltgb_rate,
                                                                                                            industry_rate,
                                                                                                            volumn_rate,
                                                                                                            limit_up_time_rate,
                                                                                                            big_money_rate))
            "industry_rate:{} volumn_rate:{} limit_up_time_rate:{} big_money_rate:{}".format(industry_rate,
                                                                                             volumn_rate,
                                                                                             limit_up_time_rate,
                                                                                             big_money_rate))
        return round(1 - (zyltgb_rate + industry_rate + volumn_rate + limit_up_time_rate + big_money_rate), 4)
        final_rate = round(1 - (industry_rate + volumn_rate + limit_up_time_rate + big_money_rate), 4)
        if final_rate < 0.1:
            final_rate = 0.1
        return final_rate
    @classmethod
    def compute_rate_by_code(cls, code):
        factors = cls.__get_rate_factors(code)
        return cls.compute_rate(factors[0], factors[1], factors[2], factors[3], factors[4], factors[5], factors[6])
    @classmethod
    def __get_rate_factors(cls, code):
        zyltgb = global_util.zyltgb_map.get(code)
        # 获取行业热度
        industry = global_util.code_industry_map.get(code)
@@ -139,6 +147,11 @@
            industry = global_util.code_industry_map.get(code)
        total_industry_limit_percent = global_util.industry_hot_num.get(industry) if industry is not None else None
        # 当前票是否涨停
        if total_industry_limit_percent is not None:
            if code in global_util.limit_up_codes_percent:
                total_industry_limit_percent -= global_util.limit_up_codes_percent[code]
        # 获取量
        volumn_day60_max, volumn_yest, volumn_today = global_util.max60_volumn.get(
            code), global_util.yesterday_volumn.get(code), global_util.today_volumn.get(code)
@@ -154,8 +167,22 @@
        big_money_num = global_util.big_money_num.get(code)
        if big_money_num is None:
            big_money_num = big_money_num_manager.get_num(code)
        return cls.compute_rate(zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today,
                                limit_up_time, big_money_num)
        return (
            zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today, limit_up_time,
            big_money_num)
    @classmethod
    def factors_to_string(cls, code):
        vals = cls.__get_rate_factors(code)
        return "zyltgb:%s, total_industry_limit_percent:%s, volumn_day60_max:%s, volumn_yest:%s, volumn_today:%s,limit_up_time:%s, big_money_num:%s" % vals
    @classmethod
    def __get_zyltgb(cls, code):
        zyltgb = global_util.zyltgb_map.get(code)
        if zyltgb is None:
            global_util.load_zyltgb()
            zyltgb = global_util.zyltgb_map.get(code)
        return zyltgb
    @classmethod
    def compute_m_value(cls, code):
@@ -166,12 +193,24 @@
            if zyltgb is None:
                print("没有获取到自由流通市值")
                return 10000000
        if code == '002476':
            print("")
        zyltgb = cls.get_base_safe_val(zyltgb)
        rate = cls.compute_rate_by_code(code)
        # print("m值获取:", code, round(zyltgb * rate))
        return round(zyltgb * rate)
    # 获取安全笔数
    @classmethod
    def get_safe_buy_count(cls, code):
        gb = cls.__get_zyltgb(code)
        if not gb:
            # 默认10笔
            return 8
        count = gb // 100000000
        if count > 30:
            return 30
        if count < 5:
            return 5
        return count
# l2因子归因数据
@@ -191,7 +230,14 @@
# Manual check: compute the m value for one code and print the limit-up-time rate for sample times.
if __name__ == "__main__":
    L2TradeFactorUtil.compute_m_value("000036")
    # print(L2TradeFactorUtil.get_big_money_rate(1))
    # print(L2TradeFactorUtil.get_rate_factors("003004"))
    # print(L2TradeFactorUtil.factors_to_string("003004"))
    # Sample times passed to get_limit_up_time_rate, from 09:30:30 to 14:23:23
    print(L2TradeFactorUtil.get_limit_up_time_rate("09:30:30"))
    print(L2TradeFactorUtil.get_limit_up_time_rate("11:30:00"))
    print(L2TradeFactorUtil.get_limit_up_time_rate("13:00:00"))
    print(L2TradeFactorUtil.get_limit_up_time_rate("13:48:00"))
    print(L2TradeFactorUtil.get_limit_up_time_rate("13:53:23"))
    print(L2TradeFactorUtil.get_limit_up_time_rate("14:23:23"))
    # print(L2TradeFactorUtil.get_big_money_rate(2))
    # print(L2TradeFactorUtil.get_big_money_rate(3))
l2_trade_test.py
New file
@@ -0,0 +1,29 @@
# 交易测试
# 清除交易数据
import big_money_num_manager
import redis_manager
from l2_data_manager import TradePointManager
def clear_trade_data(code):
    """Remove the stored trade data of ``code`` so a replay can start clean.

    Deletes the per-code keys in redis DB 1, the stored buy point, the
    big-money counter and the trade state in redis DB 2. In redis DB 0 every
    key containing ``code`` is deleted, except keys whose name contains
    ``"pre"`` or ``"zyltgb"``.
    """
    redis_l2 = redis_manager.RedisManager(1).getRedis()
    for pattern in ("buy1_volumn_latest_info-{}", "m_big_money_begin-{}", "m_big_money_process_index-{}"):
        redis_l2.delete(pattern.format(code))
    TradePointManager.delete_buy_point(code)
    big_money_num_manager.reset(code)
    redis_trade = redis_manager.RedisManager(2).getRedis()
    redis_trade.delete("trade-state-{}".format(code))
    redis_info = redis_manager.RedisManager(0).getRedis()
    for k in redis_info.keys("*{}*".format(code)):
        # str.find() returns -1 (never None) when nothing is found, so the old
        # "find(...) is not None" test skipped every key; use a substring test.
        if "pre" in k or "zyltgb" in k:
            continue
        redis_info.delete(k)
limit_up_time_manager.py
@@ -37,6 +37,7 @@
        global_util.limit_up_time[code] = redis.get(key)
# 板块强度排序
def sort_code_by_limit_time(codes):
    if not global_util.limit_up_time:
        load_limit_up_time()
@@ -47,10 +48,23 @@
            list.append((code, limit_up_time))
    new_s = sorted(list, key=lambda e: int(e[1].replace(":", "")))
    dict_ = {}
    # 相同值为同一排序
    sort_index = 0
    for i in range(0, len(new_s)):
        dict_[new_s[i][0]] = i
        if new_s[i - 1][1] != new_s[i][1] and i > 0:
            sort_index += 1
        dict_[new_s[i][0]] = sort_index
    return dict_
# Manual check: print the ranking for real codes, then replay the ranking loop on sample data.
if __name__ == "__main__":
    print(sort_code_by_limit_time(["002393", "002476", "002614", "002750", "600082", "002751"]))
    list = [("1234578", "09:00:03"), ("12345", "09:00:00"), ("123456", "09:00:00"), ("123457", "09:00:03")]
    new_s = sorted(list, key=lambda e: int(e[1].replace(":", "")))
    dict_ = {}
    # Entries with the same time share one rank
    sort_index = 0
    for i in range(len(new_s)):
        # The rank only advances when the time differs from the previous entry
        if i > 0 and new_s[i - 1][1] != new_s[i][1]:
            sort_index = sort_index + 1
        dict_[new_s[i][0]] = sort_index
    print(dict_)
log.py
@@ -1,15 +1,20 @@
"""
日志
"""
import datetime
import os
from loguru import logger
def get_path(dir_name, log_name):
    """Build the log file path template for ``dir_name`` / ``log_name``.

    The returned string keeps the literal ``{time:YYYY-MM-DD}`` placeholder so
    the logging library can fill in the date.
    """
    base = "D:/logs/gp/{}/{}".format(dir_name, log_name)
    date_suffix = ".{time:YYYY-MM-DD}.log"
    return base + date_suffix
# Remove the handlers registered so far FIRST. Calling logger.remove() after
# logger.add() would also drop the sink that was just added, leaving
# logger_l2_process_time without any output.
logger.remove(handler_id=None)

# Sink for records bound with name "l2_process_time": rotated at 00:00, old files zipped,
# writes go through a queue (enqueue=True).
logger.add(get_path("l2", "l2_process_time"), filter=lambda record: record["extra"].get("name") == "l2_process_time",
           rotation="00:00", compression="zip", enqueue=True)
logger_l2_process_time = logger.bind(name="l2_process_time")
#   每一天生成一个日志文件,历史日志文件采用zip压缩,异步写入日志
@@ -26,9 +31,9 @@
# One sink per bound name ("l2_process", "l2_trade", "l2_trade_cancel"); each sink only
# accepts records whose extra["name"] equals its name.
logger.add(get_path("l2", "l2_process"), filter=lambda record: record["extra"].get("name") == "l2_process",
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("l2", "l2_trade"), filter=lambda record: record["extra"].get("name") == "l2_trade",
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("l2", "l2_trade_cancel"), filter=lambda record: record["extra"].get("name") == "l2_trade_cancel",
           rotation="00:00", compression="zip", enqueue=True)
@@ -50,15 +55,17 @@
# Sink for records bound with name "system"
logger.add(get_path("system", "system"), filter=lambda record: record["extra"].get("name") == "system",
           rotation="00:00", compression="zip", enqueue=True)
# Named loggers: each bind() attaches extra["name"], which the sink filters match on
logger_trade_gui = logger.bind(name="trade_gui")
logger_trade = logger.bind(name="trade")
logger_trade_delegate = logger.bind(name="delegate")
logger_l2_error = logger.bind(name="l2_error")
logger_l2_process = logger.bind(name="l2_process")
logger_l2_trade = logger.bind(name="l2_trade")
logger_l2_trade_cancel = logger.bind(name="l2_trade_cancel")
logger_l2_trade_buy = logger.bind(name="l2_trade_buy")
logger_l2_big_data = logger.bind(name="l2_big_data")
logger_juejin_tick = logger.bind(name="juejin_tick")
@@ -66,5 +73,52 @@
# Named loggers for device and system messages
logger_device = logger.bind(name="device")
logger_system = logger.bind(name="system")
class LogUtil:
    """Helpers for post-processing log files."""

    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        """Copy every line of ``path`` that contains ``key`` into ``target_path``.

        Args:
            key: value to search for; it is converted to text with ``format``.
            path: UTF-8 log file to read.
            target_path: UTF-8 file to (over)write with the matching lines.

        Raises:
            OSError: when either file cannot be opened. ``target_path`` is
                opened first, so it is created or truncated even when ``path``
                is missing.
        """
        needle = "{}".format(key)
        with open(target_path, mode='w', encoding="utf-8") as fw, \
                open(path, 'r', encoding="utf-8") as f:
            # Stream the file instead of loading it whole. A substring test
            # also keeps lines that START with the key, which the former
            # "find(...) > 0" check dropped.
            fw.writelines(line for line in f if needle in line)
# Export the "processed data range" log lines
def __export_l2_pos_range(code, date, dir):
    """Write the l2_process lines of ``date`` containing "<code> 处理数据范围" into ``dir``."""
    source = "D:/logs/gp/l2/l2_process.{}.log".format(date)
    target = "{}/l2_process_{}.log".format(dir, date)
    LogUtil.extract_log_from_key("{} 处理数据范围".format(code), source, target)
# Export the trade log lines
def __export_l2_trade_log(code, date, dir):
    """Write the l2_trade lines of ``date`` that contain ``code`` into ``dir``."""
    source = "D:/logs/gp/l2/l2_trade.{}.log".format(date)
    target = "{}/l2_trade_{}.log".format(dir, date)
    LogUtil.extract_log_from_key(code, source, target)
# Export the trade-cancel log lines
def __export_l2_trade_cancel_log(code, date, dir):
    """Write the l2_trade_cancel lines of ``date`` that contain ``code`` into ``dir``."""
    source = "D:/logs/gp/l2/l2_trade_cancel.{}.log".format(date)
    target = "{}/l2_trade_cancel_{}.log".format(dir, date)
    LogUtil.extract_log_from_key(code, source, target)
def export_l2_log(code):
    """Export today's L2 log lines for ``code`` into ``D:/logs/gp/l2/<code>``.

    Does nothing when ``code`` has fewer than 6 characters. Three files are
    written: the processed-range lines, the trade-cancel lines and the trade
    lines.
    """
    if len(code) < 6:
        return
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    dir_ = "D:/logs/gp/l2/{}".format(code)
    # makedirs also creates missing parent directories and, with exist_ok,
    # does not fail when the directory already exists (no exists/mkdir race).
    os.makedirs(dir_, exist_ok=True)
    __export_l2_pos_range(code, date, dir_)
    __export_l2_trade_cancel_log(code, date, dir_)
    __export_l2_trade_log(code, date, dir_)
# Manual check: emit one test record, then extract today's l2_process_time lines for one code.
if __name__ == '__main__':
    logger_trade_gui.info("测试")
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    LogUtil.extract_log_from_key(
        key="003005",
        path="D:/logs/gp/l2/l2_process_time.{}.log".format(date),
        target_path="D:/logs/gp/l2/l2_process_time{}.{}.log".format("003005", date),
    )
mysql_data.py
New file
@@ -0,0 +1,85 @@
# 先要导入pymysql
import pymysql
# Connection parameters, kept in a dict and passed to pymysql.connect(**config)
# NOTE(review): the credentials are hard-coded in source - consider loading them from
# configuration or the environment.
config = {
    "host": "127.0.0.1",
    "port": 3306,
    "database": "gp",
    "charset": "utf8",
    "user": "root",
    "passwd": "123456"
}
class Mysqldb:
    """Thin wrapper around one pymysql connection and one cursor.

    The ``select_*`` helpers accept an optional ``args`` value that is handed
    to ``cursor.execute`` so callers can use ``%s`` placeholders instead of
    building SQL strings by hand. Calling them with only ``sql`` behaves as
    before.
    """

    # Initialise: open the connection, then create the cursor
    def __init__(self):
        self.conn = self.get_conn()
        self.cursor = self.get_cursor()

    # Open the database connection
    def get_conn(self):
        """Connect using the module-level ``config`` dict."""
        return pymysql.connect(**config)

    # Create the cursor
    def get_cursor(self):
        """Return a new cursor on ``self.conn``."""
        return self.conn.cursor()

    # Return all rows of a query
    def select_all(self, sql, args=None):
        self.cursor.execute(sql, args)
        return self.cursor.fetchall()

    # Return one row of a query
    def select_one(self, sql, args=None):
        self.cursor.execute(sql, args)
        return self.cursor.fetchone()

    # Return up to ``num`` rows of a query
    def select_many(self, sql, num, args=None):
        self.cursor.execute(sql, args)
        return self.cursor.fetchmany(num)

    # Insert / update / delete: same flow apart from the SQL, always committed
    def execute(self, sql, args=None):
        """Run one statement and commit; on error print it and roll back.

        The error is intentionally not re-raised (best-effort behaviour kept
        for existing callers).
        """
        try:
            self.cursor.execute(sql, args)
            self.conn.commit()
        except Exception as e:
            print("提交出错\n:", e)
            # Roll back on failure
            self.conn.rollback()

    def execute_many(self, sql, args=None):
        """Run one statement for every parameter set in ``args`` and commit.

        On error the exception is printed and the transaction rolled back.
        """
        try:
            self.cursor.executemany(sql, args)
            self.conn.commit()
        except Exception as e:
            print("提交出错\n:", e)
            # Roll back on failure
            self.conn.rollback()

    # On destruction close the cursor first, then the connection
    # (reverse of the creation order)
    def __del__(self):
        # Either attribute may be missing when __init__ failed part-way
        # (e.g. the connection could not be opened), so look them up safely.
        cursor = getattr(self, "cursor", None)
        conn = getattr(self, "conn", None)
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
# Manual check: insert one row, then the same row twice in a batch.
if __name__ == '__main__':
    mysqldb = Mysqldb()
    # Insert a single row
    mysqldb.execute(
        "insert into clients(account,pwd,rule) values(%s,%s,%s)",
        ("test", 123456, "\"123"),
    )
    # Insert several rows
    mysqldb.execute_many(
        "insert into clients(account,pwd,rule) values(%s,%s,%s)",
        [("test", 123456, "\"123"), ("test", 123456, "\"123")],
    )
server.py
@@ -12,10 +12,14 @@
import alert_util
import code_volumn_manager
import data_process
import global_util
import gpcode_manager
import authority
import juejin
import l2_data_log
import l2_data_manager
import l2_data_manager_new
import log
import ths_industry_util
import ths_util
import tool
@@ -24,11 +28,13 @@
from log import logger_l2_error, logger_l2_process, logger_device, logger_trade_delegate
from trade_data_manager import TradeCancelDataManager
from trade_queue_manager import THSBuy1VolumnManager
class MyTCPServer(socketserver.TCPServer):
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe=None):
        self.pipe = pipe  # 增加的参数
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_juejin=None, pipe_ui=None):
        self.pipe_juejin = pipe_juejin  # 增加的参数
        self.pipe_ui = pipe_ui
        socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)
@@ -41,6 +47,7 @@
    set_operate_code_state_dict = {}
    l2_data_error_dict = {}
    last_trade_delegate_data = None
    buy1_volumn_manager = THSBuy1VolumnManager()
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -65,7 +72,7 @@
            if len(data) == 0:
                # print("客户端断开连接")
                break;
            _str = str(data, encoding="gb2312")
            _str = str(data, encoding="gbk")
            if len(_str) > 0:
                # print("结果:",_str)
                type = data_process.parseType(_str)
@@ -74,14 +81,23 @@
                    try:
                        __start_time = round(time.time() * 1000)
                        _start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas = l2_data_manager.parseL2Data(
                            _str)
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
                        # 保存l2截图时间
                        TradeCancelDataManager.save_l2_capture_time(client, channel, code, capture_time)
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "数据解析时间")
                        # try:
                        #     self.pipe_ui.send(
                        #         json.dumps({"type": "l2_data_notify", "data": {"count": len(datas), "code": code}}))
                        # except:
                        #     pass
                        # 过时 保存l2截图时间
                        # TradeCancelDataManager.save_l2_capture_time(client, channel, code, capture_time)
                        cid, pid = gpcode_manager.get_listen_code_pos(code)
                        # 判断目标代码位置是否与上传数据位置一致
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
@@ -109,8 +125,10 @@
                                        self.set_operate_code_state_dict[key] = round(time.time() * 1000)
                                        self.l2CodeOperate.set_operate_code_state(client, channel, 1)
                                __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                   "l2数据正确性判断时间")
                                if gpcode_manager.is_listen(code):
                                    l2_data_manager.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                            except l2_data_manager.L2DataException as l:
                                # 单价不符
                                if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
@@ -133,9 +151,10 @@
                                __end_time = round(time.time() * 1000)
                                # 只记录大于40ms的数据
                                if __end_time - __start_time > 40:
                                    logger_l2_process.info("l2处理时间:{}-{}".format(code, __end_time - __start_time));
                    except:
                        pass
                                    l2_data_log.l2_time(code, round(time.time() * 1000) - _start_time, "l2数据处理总耗时",
                                                        True)
                    except Exception as e:
                        logging.exception(e)
                elif type == 10:
                    # level2交易队列
                    try:
@@ -154,7 +173,7 @@
                    gpcode_manager.set_gp_list(code_list)
                    # 重新订阅
                    self.server.pipe.send(json.dumps({"type": "resub"}))
                    self.server.pipe_juejin.send(json.dumps({"type": "resub"}))
                    # 同步同花顺目标代码
                    t1 = threading.Thread(target=lambda: sync_target_codes_to_ths())
                    t1.setDaemon(True)
@@ -190,7 +209,7 @@
                elif type == 4:
                    # 行业代码信息
                    dataList = data_process.parseList(_str)
                    data_process.saveIndustryCode(dataList)
                    ths_industry_util.save_industry_code(dataList)
                elif type == 6:
                    # 可用金额
                    datas = data_process.parseData(_str)
@@ -217,6 +236,20 @@
                            volumnUnit = item["volumnUnit"]
                            code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                        juejin.accpt_prices(data)
                elif type == 50:
                    data = data_process.parse(_str)["data"]
                    if data is not None:
                        index = data["index"]
                        code_name = data["codeName"]
                        volumn = data["volumn"]
                        time_ = data["time"]
                        code = global_util.name_codes.get(code_name)
                        if code is None:
                            global_util.load_name_codes()
                        code = global_util.name_codes.get(code_name)
                        if code is not None:
                            # 保存数据
                            self.buy1_volumn_manager.save(code, time_, volumn)
                elif type == 30:
                    # 心跳信息
@@ -228,7 +261,9 @@
                    if ths_util.is_ths_dead(client_id):
                        # TODO 重启同花顺
                        # 报警
                        alert_util.alarm()
                        l2_clients = authority.get_l2_clients()
                        if client_id in l2_clients:
                            alert_util.alarm()
                    # print("心跳:", client_id)
                sk.send(return_str.encode())
@@ -250,7 +285,7 @@
    try:
        socketClient.send(json.dumps(data).encode())
        recv = socketClient.recv(1024)
        result = recv.decode().lstrip()
        result = str(recv, encoding="gbk")
        return result
    finally:
        socketClient.close()
ths_data.py
@@ -13,7 +13,7 @@
from scrapy import cmdline
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
import mongo_data
import mysql_data
def save(dn_name, datas):
@@ -251,4 +251,4 @@
            code = str.split(":")[1].strip()
            _list.append({"_id": name, "first_code": code})
    mongo_data.save("ths-industry", _list)
    #mongo_data.save("ths-industry", _list)
ths_industry_util.py
@@ -3,18 +3,21 @@
"""
# 同花顺行业
import time
import global_util
import mongo_data
import mysql_data
# 获取行业映射
def get_code_industry_maps():
    __code_map = {}
    __industry_map = {}
    results = mongo_data.find("ths-industry-codes", {})
    mysqldb = mysql_data.Mysqldb()
    results = mysqldb.select_all("select * from ths_industry_codes")
    for r in results:
        code = r["_id"]
        industry = r["second_industry"]
        code = r[0]
        industry = r[1]
        __code_map[code] = industry
        if __industry_map.get(industry) is None:
            __industry_map[industry] = set()
@@ -24,6 +27,8 @@
# 设置行业热度
def set_industry_hot_num(limit_up_datas):
    if limit_up_datas is None:
        return
    industry_hot_dict = {}
    code_industry_map = global_util.code_industry_map
    if code_industry_map is None or len(code_industry_map) == 0:
@@ -44,6 +49,10 @@
        percent = float(data["limitUpPercent"])
        if percent > 21:
            percent = 21
        percent = round(percent, 2)
        # 保存涨幅
        global_util.limit_up_codes_percent[code] = percent
        industry_hot_dict[industry] = round(industry_hot_dict[industry] + percent, 2)
    global_util.industry_hot_num = industry_hot_dict
@@ -57,7 +66,7 @@
        global_util.load_industry()
        industry = global_util.code_industry_map.get(code)
    if industry is None:
        return None,None
        return None, None
    codes_ = set()
    for code_ in codes:
        if global_util.code_industry_map.get(code_) == industry:
@@ -66,6 +75,53 @@
    return industry, codes_
# 获取这一批数据的行业
def __get_industry(datas):
    """Return the industry name recorded for one batch of stock records.

    Args:
        datas: iterable of dicts, each carrying a ``"code"`` key.

    Returns:
        The first column of the first ``ths_industry`` row whose
        ``first_code`` equals one of the batch's codes, or ``None`` when the
        batch is empty or no row matches.
    """
    codes = {data["code"] for data in datas}
    if not codes:
        # An empty batch would otherwise produce "... where " with no
        # condition, which is not valid SQL.
        return None
    # NOTE(review): the codes are interpolated into the SQL text. If they can
    # come from an untrusted client this should become a parameterized query.
    conditions = " or ".join("first_code='{}'".format(code) for code in codes)
    mysqldb = mysql_data.Mysqldb()
    results = mysqldb.select_all("select * from ths_industry where {}".format(conditions))
    # Only the first returned row is used.
    first_row = next(iter(results or ()), None)
    _fname = first_row[0] if first_row is not None else None
    print("最终的二级行业名称为:", _fname)
    return _fname
# 保存单个代码的行业
def __save_code_industry(code, industry_name, zyltgb, zyltgb_unit):
    """Insert or update the ``ths_industry_codes`` row of a single stock code.

    Args:
        code: stock code used to look the row up.
        industry_name: industry name to store.
        zyltgb: value stored in the ``zyltgb`` column (written as a quoted string).
        zyltgb_unit: value stored in the ``zyltgb_unit`` column (written unquoted).
    """
    mysqldb = mysql_data.Mysqldb()
    # The id is quoted here exactly as in the update statement below, so both
    # statements compare `_id` the same way.
    result = mysqldb.select_one("select * from ths_industry_codes where _id='{}'".format(code))
    if result is None:
        # NOTE(review): the lookup and the update filter on `_id`, while this
        # insert writes the `code` column - confirm against the table schema.
        mysqldb.insert(
            "insert into ths_industry_codes(code,industry_name,zyltgb,zyltgb_unit) values('{}','{}','{}',{})".format(
                code, industry_name, zyltgb, zyltgb_unit))
    else:
        mysqldb.update(
            "update ths_industry_codes set industry_name='{}',zyltgb='{}',zyltgb_unit={} where _id='{}'".format(
                industry_name, zyltgb, zyltgb_unit, code))
# 保存行业代码
def save_industry_code(datasList):
    """Store the industry of every stock code in every batch.

    Args:
        datasList: iterable of batches; each batch is an iterable of dicts
            with the keys ``"code"``, ``"zyltgb"`` and ``"zyltgb_unit"``.
            One industry name is resolved per batch and applied to all of
            its records.
    """
    for datas in datasList:
        # Resolve the industry this batch belongs to.
        industry_name = __get_industry(datas)
        for data in datas:
            __save_code_industry(data["code"], industry_name, data["zyltgb"], data["zyltgb_unit"])
if __name__ == "__main__":
    # Manual check: load both mappings and print them.
    code_to_industry, industry_to_codes = get_code_industry_maps()
    print(code_to_industry, industry_to_codes)
tool.py
@@ -3,6 +3,7 @@
"""
import decimal
import random
import time
import time as t
import datetime
@@ -47,9 +48,21 @@
        return False
if __name__=="__main__":
     d1 = decimal.Decimal("0.12")
     d2 = decimal.Decimal("0.12")
     if d1==d2:
         print("123")
def run_time():
    """Build a decorator that prints how many milliseconds each call took.

    Usage::

        @run_time()
        def work(a, b=0): ...

    Returns:
        A decorator. The wrapped function receives the caller's positional
        and keyword arguments unchanged and its return value is passed through.
    """
    import functools  # local import so the module's import block stays untouched

    def decorator(func):
        @functools.wraps(func)
        def infunc(*args, **kwargs):
            start = round(time.time() * 1000)
            # Unpack the positional arguments: passing `args` itself would
            # hand the wrapped function one tuple instead of its arguments.
            result = func(*args, **kwargs)
            print("执行时间", round(time.time() * 1000) - start)
            return result

        return infunc

    return decorator
if __name__ == "__main__":
    # Two Decimals built from the same literal compare equal.
    if decimal.Decimal("0.12") == decimal.Decimal("0.12"):
        print("123")
trade_gui.py
@@ -5,12 +5,16 @@
import array
import threading
import time
import random
import win32gui
import win32api
import win32con
import global_util
import gpcode_manager
import redis_manager
import tool
from log import *
from threading import Thread
@@ -66,9 +70,9 @@
    @classmethod
    def checkEnv(cls):
        # 检测交易窗口
        buy_wins = cls.get_buy_wins()
        if len(buy_wins) < 3:
            raise Exception("闪电买入窗口最低需要3个")
        buy_wins = THSBuyWinManagerNew.get_buy_wins()
        if len(buy_wins) < 10:
            raise Exception("下单窗口最低需要10个")
        # 检测撤单窗口
        cancel_trade_win = cls.getCancelBuyWin()
@@ -169,7 +173,8 @@
    def getLimitUpPrice(self, win):
        hwnd = win32gui.GetDlgItem(win, 0x000006C8)
        return self.getText(hwnd)
        text_ = self.getText(hwnd)
        return text_.replace("涨停:", "")
    # 获取交易结果
    def getTradeResultWin(self):
@@ -217,36 +222,40 @@
        try:
            logger_trade_gui.info("开始买入:code-{}".format(code))
            if win < 1:
                win = self.get_available_buy_win()
                if win < 1:
                win = THSBuyWinManagerNew.get_distributed_code_win(code)  # self.get_available_buy_win()
                if win is None or win < 1:
                    raise Exception("无可用的交易窗口")
            print("使用窗口", win)
            t = time.time()
            print(t)
            start = int(round(t * 1000))
            # 输入代码
            # 代码输入框的控件ID:0x00000408
            hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
            # 名称 名称的控件ID:0x0000040C
            hwnd_name = win32gui.GetDlgItem(win, 0x0000040C)
            self.input_number(hwnd1, code)
            # 最多等待2s钟
            data_fill = False
            for i in range(0, 500):
                bufSize = win32gui.SendMessage(hwnd_name, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
                print(i, bufSize)
                if bufSize > 1:
                    data_fill = True
                    break;
                time.sleep(0.004)
            if not data_fill:
                raise Exception("代码输入填充出错")
            time.sleep(0.001)
            # # 输入代码
            # # 代码输入框的控件ID:0x00000408
            # hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
            # # 名称 名称的控件ID:0x0000040C
            # hwnd_name = win32gui.GetDlgItem(win, 0x0000040C)
            # self.input_number(hwnd1, code)
            # # 最多等待2s钟
            # data_fill = False
            # for i in range(0, 500):
            #     bufSize = win32gui.SendMessage(hwnd_name, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
            #     print(i, bufSize)
            #     if bufSize > 1:
            #         data_fill = True
            #         break;
            #     time.sleep(0.004)
            #
            # if not data_fill:
            #     raise Exception("代码输入填充出错")
            # time.sleep(0.001)
            # 验证涨停价
            limit_up_price_now = self.getLimitUpPrice(win)
            trade_win = THSBuyWinManagerNew.get_trade_win(win)
            # if not trade_win:
            #     error = "交易子窗口查找失败 {}".format(code)
            #     raise Exception(error)
            # 测试,暂时不验证涨停价
            # TODO 暂时不验证涨停价
            if not global_util.TEST:
                if abs(float(limit_up_price_now) - float(limit_up_price)) >= 0.01:
                    error = "涨停价验证出错 {}-{}".format(limit_up_price, limit_up_price_now)
@@ -258,6 +267,7 @@
            # win32gui.SendMessage(buy_hwnd, win32con.WM_LBUTTONUP, 0, 0);
            # 买入 快捷键B
            # 获取交易win
            win32gui.PostMessage(win, win32con.WM_KEYDOWN, 66, 0);
            logger_trade_gui.info("执行买入结束:code-{} 耗时:{}".format(code, int(round(time.time() * 1000)) - start))
@@ -380,18 +390,432 @@
            self.input_number(code_input, "")
class THSGuiUtil:
    """Helpers for driving THS order windows through win32 messages.

    All methods are classmethods and take raw window handles.
    """

    @classmethod
    def getText(cls, hwnd):
        """Return the text of window/control ``hwnd`` with NULs and outer whitespace removed."""
        bufSize = win32gui.SendMessage(hwnd, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
        # Two bytes are reserved per character.
        buffer = array.array('b', b'\x00\x00' * bufSize)
        win32gui.SendMessage(hwnd, win32con.WM_GETTEXT, bufSize, buffer)
        text = win32gui.PyGetString(buffer.buffer_info()[0], bufSize - 1)
        return text.replace("\x00", "").strip()

    # Add an order window
    @classmethod
    def add_buy_win(cls):
        """Click the "add" button of the last buy window and return the new window's handle.

        Raises:
            Exception: when there is no buy window, when 10 windows already
                exist, when the add button (control id 0x000005ED) is not
                found, or when no new window shows up within 30 polls.
        """
        buy_wins = THSGuiTrade().get_buy_wins()
        if len(buy_wins) < 1:
            raise Exception("没有买入窗口")
        if len(buy_wins) >= 10:
            raise Exception("最多只能添加10个下单框")
        # Control id of the add-window button: 0x000005ED
        win = buy_wins[-1]
        add_btn = win32gui.GetDlgItem(win, 0x000005ED)
        if add_btn <= 0:
            raise Exception("没有找到添加按钮")
        try:
            win32gui.SetForegroundWindow(win)
        except Exception:
            # Bringing the window to the front is best effort only.
            pass
        cls.click(add_btn)
        for _ in range(30):
            new_buy_wins = THSGuiTrade().get_buy_wins()
            if len(new_buy_wins) - len(buy_wins) >= 1:
                # The new window is the handle that was not present before.
                added = list(set(new_buy_wins).difference(set(buy_wins)))
                return added[0]
            time.sleep(0.01)
        raise Exception("未添加成功")

    # Does the window exist
    @classmethod
    def is_win_exist(cls, win):
        """Return True when ``IsWindowVisible`` reports ``win`` as visible; any failure yields False."""
        try:
            return bool(win32gui.IsWindowVisible(win))
        except Exception:
            return False

    # Is the window currently shown
    @classmethod
    def is_win_show(cls, win):
        """Return True when the window rectangle of ``win`` has a positive width and height."""
        try:
            rect = win32gui.GetWindowRect(win)
            return rect[2] - rect[0] > 0 and rect[3] - rect[1] > 0
        except Exception:
            return False

    @classmethod
    def click(cls, control):
        """Send a left-button down/up message pair to ``control``."""
        win32gui.SendMessage(control, win32con.WM_LBUTTONDOWN, 0, 0)
        win32gui.SendMessage(control, win32con.WM_LBUTTONUP, 0, 0)

    # Clear the code of a buy window
    @classmethod
    def clear_buy_window_code(cls, win):
        """Empty the code edit control (id 0x00000408) of ``win``.

        Raises:
            Exception: when the window does not exist or the edit control is missing.
        """
        if not cls.is_win_exist(win):
            raise Exception("窗口不存在")
        hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
        if hwnd1 <= 0:
            raise Exception("编辑控件没找到")
        THSGuiTrade().input_number(hwnd1, "")

    # Set the code of a buy window
    @classmethod
    def set_buy_window_code(cls, win, code):
        """Type ``code`` into the code edit control (id 0x00000408) of ``win``.

        Raises:
            Exception: when the window does not exist or the edit control is missing.
        """
        if not cls.is_win_exist(win):
            raise Exception("窗口不存在")
        try:
            win32gui.SetForegroundWindow(win)
        except Exception:
            # Focus is best effort; the code is entered through the control handle anyway.
            pass
        hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
        if hwnd1 <= 0:
            raise Exception("编辑控件没找到")
        THSGuiTrade().input_number(hwnd1, code)
# Deprecated: THS buy-window manager
class __THSBuyWinManager:
    """Deprecated manager mapping stock codes to buy windows.

    The mapping is stored in redis under ``buywin_distribute-<code>`` with the
    window handle as value.
    """

    redisManager = redis_manager.RedisManager(2)

    @classmethod
    def __get_redis(cls):
        """Return the redis client of the shared manager."""
        return cls.redisManager.getRedis()

    # Store the window assigned to a code
    @classmethod
    def __save_code_win(cls, code, win):
        redis_key = "buywin_distribute-{}".format(code)
        cls.__get_redis().setex(redis_key, tool.get_expire(), win)

    # Read the window assigned to a code
    @classmethod
    def __get_code_win(cls, code):
        stored = cls.__get_redis().get("buywin_distribute-{}".format(code))
        if stored is None:
            return None
        return int(stored)

    # Remove the window assignment of a code
    @classmethod
    def __del_code_win(cls, code):
        cls.__get_redis().delete("buywin_distribute-{}".format(code))

    # List every code that currently has a window assigned
    @classmethod
    def __get_distributed_win_codes(cls):
        assigned_keys = cls.__get_redis().keys("buywin_distribute-*")
        return [k.replace("buywin_distribute-", "") for k in assigned_keys]

    # Find a usable window
    @classmethod
    def __get_available_win(cls):
        """Return a buy window not assigned to any code, adding a new window if all are taken."""
        assigned_keys = cls.__get_redis().keys("buywin_distribute-*")
        win_list = THSGuiTrade().get_buy_wins()
        if len(win_list) < 1:
            raise Exception("必须要有一个买入窗口")
        free_wins = set(win_list)
        # Drop every window that is already assigned to some code.
        for k in assigned_keys:
            free_wins.discard(int(cls.__get_redis().get(k)))
        if len(free_wins) > 0:
            return free_wins.pop()
        # Nothing left: create an additional window.
        win = THSGuiUtil.add_buy_win()
        if not win:
            raise Exception("新增窗口失败")
        return win

    # Assign a window to a code
    @classmethod
    def distribute_win_for_code(cls, code):
        """Return the window of ``code``, assigning and filling one when needed."""
        win = cls.__get_code_win(code)
        # Reuse the stored window only while it still exists.
        if win is not None and THSGuiUtil.is_win_exist(win):
            THSGuiUtil.set_buy_window_code(win, code)
            return win
        win = cls.__get_available_win()
        if win is None:
            raise Exception("窗口已经分配完毕,无可用窗口")
        # Record the assignment, then fill the code into the window.
        cls.__save_code_win(code, win)
        THSGuiUtil.set_buy_window_code(win, code)
        return win

    # Remove the window assignment of a code
    @classmethod
    def cancel_distribute_win_for_code(cls, code):
        """Clear the code from its window (if any) and delete the stored assignment."""
        win = cls.__get_code_win(code)
        if win is not None:
            THSGuiUtil.clear_buy_window_code(win)
        cls.__del_code_win(code)

    # Get the window already assigned to a code
    @classmethod
    def get_distributed_code_win(cls, code):
        """Return the assigned window of ``code``, or None (dropping the record) when it is gone."""
        win = cls.__get_code_win(code)
        if THSGuiUtil.is_win_exist(win):
            return win
        # The window no longer exists: forget the assignment.
        cls.__del_code_win(code)
        return None

    @classmethod
    def fill_codes(cls, codes):
        """Synchronise window assignments with ``codes``; only the first 10 codes get a window."""
        codes_ = gpcode_manager.get_gp_list()
        # First release codes that are no longer in the code list.
        old_codes = cls.__get_distributed_win_codes()
        for stale in old_codes:
            if stale not in codes_:
                cls.cancel_distribute_win_for_code(stale)

        # Codes beyond the first 10 lose their window.
        for extra in codes[10:]:
            cls.cancel_distribute_win_for_code(extra)

        for code in codes[0:10]:
            # Codes that already had a window are left alone.
            if code in old_codes:
                continue
            win = cls.distribute_win_for_code(code)
            print("分配的窗口:", win, THSGuiUtil.is_win_exist(win))
# THS buy-window manager
class THSBuyWinManagerNew:
    """Maps stock codes to order windows of the "专业版下单" application.

    Assignments are stored in redis under ``buywin_distribute-<code>`` with the
    outer order-window handle as value. The stock code itself is typed into
    the trade child dialog returned by :meth:`get_trade_win`.
    """

    redisManager = redis_manager.RedisManager(2)

    @classmethod
    def get_buy_wins(cls):
        """Return the handles of all visible order windows.

        The top-level window titled "专业版下单" is located first. Its
        "#32770" children (at most 20 are inspected) that are visible and
        contain a "撤单" button are returned.

        Raises:
            Exception: when the "专业版下单" window is not open.
        """
        buy_win_list = []
        hWndList = []
        main_hwnd = None
        win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
        for hwnd in hWndList:
            if THSGuiUtil.getText(hwnd) == "专业版下单":
                main_hwnd = hwnd
                break
        if not main_hwnd:
            raise Exception("专业版下单未打开")
        child_win = None
        for _ in range(0, 20):
            child_win = win32gui.FindWindowEx(main_hwnd, child_win, "#32770", None)
            if not child_win:
                break
            if not win32gui.IsWindowVisible(child_win):
                continue
            temp = win32gui.FindWindowEx(child_win, None, "Button", "撤单")
            if temp:
                buy_win_list.append(child_win)
        return buy_win_list

    @classmethod
    def get_trade_win(cls, win):
        """Return the first "#32770" child of ``win`` that contains an Edit control, or None.

        At most 10 children are inspected.
        """
        child_child_win = None
        for _ in range(0, 10):
            child_child_win = win32gui.FindWindowEx(win, child_child_win, "#32770", None)
            if not child_child_win:
                break
            temp = win32gui.FindWindowEx(child_child_win, None, "Edit", None)
            if temp:
                return child_child_win
        return None

    @classmethod
    def __get_redis(cls):
        """Return the redis client of the shared manager."""
        return cls.redisManager.getRedis()

    # Store the window assigned to a code
    @classmethod
    def __save_code_win(cls, code, win):
        key = "buywin_distribute-{}".format(code)
        cls.__get_redis().setex(key, tool.get_expire(), win)

    # Read the window assigned to a code
    @classmethod
    def __get_code_win(cls, code):
        key = "buywin_distribute-{}".format(code)
        win = cls.__get_redis().get(key)
        if win is not None:
            return int(win)
        return None

    # Remove the window assignment of a code
    @classmethod
    def __del_code_win(cls, code):
        key = "buywin_distribute-{}".format(code)
        cls.__get_redis().delete(key)

    # List every code that currently has a window assigned
    @classmethod
    def __get_distributed_win_codes(cls):
        key = "buywin_distribute-*"
        keys = cls.__get_redis().keys(key)
        # NOTE(review): assumes the redis client returns str keys - confirm.
        return [k.replace("buywin_distribute-", "") for k in keys]

    # Find a usable window
    @classmethod
    def __get_available_win(cls):
        """Return an order window that is not assigned to any code.

        Raises:
            Exception: when there is no order window at all or every window
                is already assigned.
        """
        key = "buywin_distribute-*"
        keys = cls.__get_redis().keys(key)
        win_list = cls.get_buy_wins()
        if len(win_list) < 1:
            raise Exception("必须要有一个买入窗口")
        win_set = set(win_list)
        for k in keys:
            assigned = cls.__get_redis().get(k)
            if assigned is None:
                # The key expired or was deleted between keys() and get().
                continue
            win_set.discard(int(assigned))
        if len(win_set) > 0:
            return win_set.pop()
        # Every window is taken; this manager does not create new ones.
        raise Exception("没有剩余窗口")

    # Assign a window to a code
    @classmethod
    def distribute_win_for_code(cls, code):
        """Return the order window of ``code``, assigning one and filling in the code when needed.

        Raises:
            Exception: when no window is available or the code cannot be
                entered into the trade dialog.
        """
        # Reuse an existing assignment while its window still exists.
        win = cls.__get_code_win(code)
        if win is not None:
            if THSGuiUtil.is_win_exist(win):
                THSGuiUtil.set_buy_window_code(cls.get_trade_win(win), code)
                return win
        win = cls.__get_available_win()
        if win is None:
            raise Exception("窗口已经分配完毕,无可用窗口")
        # Record the assignment, then fill the code into the trade dialog.
        cls.__save_code_win(code, win)
        THSGuiUtil.set_buy_window_code(cls.get_trade_win(win), code)
        return win

    # Remove the window assignment of a code
    @classmethod
    def cancel_distribute_win_for_code(cls, code):
        """Clear ``code`` from its window (best effort) and delete the stored assignment."""
        win = cls.__get_code_win(code)
        if win is not None:
            try:
                # The code is typed into the trade child dialog (see
                # distribute_win_for_code), so it must be cleared there too,
                # not on the outer order window.
                trade_win = cls.get_trade_win(win)
                if trade_win is not None:
                    THSGuiUtil.clear_buy_window_code(trade_win)
            except Exception:
                # Best effort: the window may be gone; the assignment is removed anyway.
                pass
        cls.__del_code_win(code)

    # Get the window already assigned to a code
    @classmethod
    def get_distributed_code_win(cls, code):
        """Return the assigned window of ``code``, or None (dropping the record) when it is gone."""
        win = cls.__get_code_win(code)
        if win is None or not THSGuiUtil.is_win_exist(win):
            # Forget assignments whose window no longer exists.
            cls.__del_code_win(code)
            return None
        return win

    # Get the code name shown in a window
    @classmethod
    def __get_code_name(cls, win):
        """Return the text of control 0x000005C2 in the trade dialog of ``win``, or None without a dialog."""
        trade_win = cls.get_trade_win(win)
        if trade_win is None:
            return None
        code_name_win = win32gui.GetDlgItem(trade_win, 0x000005C2)
        return THSGuiUtil.getText(code_name_win)

    @classmethod
    def fill_codes(cls, codes):
        """Synchronise window assignments with ``codes``; only the first 10 codes get a window.

        Existing assignments are verified: when the window is gone or shows
        the name of a different code, the assignment is cancelled. It is not
        re-created within the same call.
        """
        name_codes = gpcode_manager.get_name_codes()
        codes_ = gpcode_manager.get_gp_list()
        # First release codes that are no longer in the code list.
        old_codes = cls.__get_distributed_win_codes()
        for code in old_codes:
            if code not in codes_:
                cls.cancel_distribute_win_for_code(code)

        add_codes = codes[0:10]
        del_codes = codes[10:]

        for code in del_codes:
            cls.cancel_distribute_win_for_code(code)

        for code in add_codes:
            if code in old_codes:
                # Already assigned: only verify that the window shows this code.
                win = cls.__get_code_win(code)
                if not THSGuiUtil.is_win_exist(win):
                    cls.cancel_distribute_win_for_code(code)
                else:
                    code_name = cls.__get_code_name(win)
                    if name_codes.get(code_name) != code:
                        cls.cancel_distribute_win_for_code(code)
                continue
            win = cls.distribute_win_for_code(code)
            print("分配的窗口:", win, THSGuiUtil.is_win_exist(win))
class GUITest:
    """Manual exercise of THSBuyWinManagerNew window distribution."""

    def test_distribute(self):
        """Distribute windows for the first 10 codes, then shuffle the list and repeat."""
        codes = ["300396", "688656", "688029", "688787", "688016", "002659", "002777", "603318", "000333", "003004",
                 "002882", "300014", "688981", "002531"]

        for shuffle_first in (False, True):
            if shuffle_first:
                # Second round: same procedure on a reshuffled list.
                random.shuffle(codes)
                print(codes[0:10])
            # Codes beyond the first 10 give their window back...
            for dropped in codes[10:]:
                THSBuyWinManagerNew.cancel_distribute_win_for_code(dropped)
            # ...and the first 10 each get one.
            for kept in codes[0:10]:
                win = THSBuyWinManagerNew.distribute_win_for_code(kept)
                time.sleep(1)
                print("分配的窗口:", win, THSGuiUtil.is_win_exist(win))

        # THSBuyWinManager.cancel_distribute_win_for_code("600125")
if __name__ == '__main__':
    try:
        # THSGuiTrade.checkEnv();
        # print("环境正常")
        trade = THSGuiTrade();
        print(id(trade))
        # win = trade.get_available_buy_win()
        # if win < 1:
        #     raise Exception("无可用的交易窗口")
        # result = trade.buy("002564", "7.26")
        # # print("交易成功")
        # time.sleep(0.2)
        trade.cancel_buy("000716")
    except Exception as e:
        print(e)
    THSGuiTrade().buy("002853", "18.98", THSBuyWinManagerNew.get_buy_wins()[5])
    # GUITest().test_distribute()
    # try:
    #     THSGuiUtil.set_buy_window_code(0x000112D0, "000333")
    # except Exception as e:
    #     print(e)
trade_manager.py
@@ -9,7 +9,7 @@
import gpcode_manager
import l2_code_operate
import mongo_data
import mysql_data
import tool
from trade_data_manager import TradeBuyDataManager
from trade_gui import THSGuiTrade, async_call
@@ -85,6 +85,7 @@
    redis = __redis_manager.getRedis()
    time_str = datetime.datetime.now().strftime("%H:%M:%S")
    redis.setex("trade-success-latest-time", tool.get_expire(), time_str)
    mysqldb = mysql_data.Mysqldb()
    for data in datas:
        _time = data["time"]
        # 过滤错误数据
@@ -93,22 +94,35 @@
        data["_id"] = data["trade_num"]
        data["day"] = day
        data["create_time"] = int(round(t.time() * 1000))
        count = mongo_data.count("ths-trade-success-record", {"_id": data["_id"]})
        if count is None or count < 1:
            mongo_data.save_one("ths-trade-success-record", data)
        counts = mysqldb.select_one("select count(*) from ths_trade_success_record where _id='{}'".format(data["_id"]))
        if counts[0] < 1:
            mysqldb.execute(
                "insert into ths_trade_success_record(_id,code,money,num,price,time,trade_num,type,day,create_time) values('{}','{}','{}','{}','{}','{}','{}',{},'{}',{})".format(
                    data["_id"], data["code"], data["money"], data["num"], data["price"], data["time"],
                    data["trade_num"], data["type"], data["day"], round(t.time() * 1000)))
        else:
            mysqldb.execute(
                "update ths_trade_success_record set money=%s, num=%s, price=%s,time=%s,trade_num=%s,type=%s where _id=%s",(
                    data["money"], data["num"], data["price"], data["time"], data["trade_num"], data["type"],data["_id"]))
# 保存交易委托数据
def save_trade_delegate_data(datas):
    day = datetime.datetime.now().strftime("%Y%m%d")
    time_str = datetime.datetime.now().strftime("%H:%M:%S")
    mysqldb = mysql_data.Mysqldb()
    for data in datas:
        data["_id"] = "{}-{}-{}".format(day, data["code"], data["time"])
        data["day"] = day
        data["create_time"] = int(round(t.time() * 1000))
        count = mongo_data.count("ths-trade-delegate-record", {"_id": data["_id"]})
        if count is None or count < 1:
            mongo_data.save_one("ths-trade-delegate-record", data)
        counts = mysqldb.select_one("select count(*) from ths_trade_delegate_record where _id='{}'".format(data["_id"]))
        if counts[0] < 1:
            mysqldb.execute(
                "insert into ths_trade_delegate_record(_id,code,num,price,time,trade_num,trade_price,type,day,create_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%d)",
                (
                    data["_id"], data["code"],  data["num"], data["price"], data["time"],
                    data["trade_num"],data["trade_price"], data["type"], data["day"], round(t.time() * 1000)))
    # 保存最新的委托数据
    redis = __redis_manager.getRedis()
    redis.setex("trade-delegate-latest", tool.get_expire(), json.dumps(datas))
@@ -119,7 +133,15 @@
def get_trade_success_data():
    redis = __redis_manager.getRedis()
    day = datetime.datetime.now().strftime("%Y%m%d")
    return mongo_data.find("ths-trade-success-record", {"day": day}), redis.get("trade-success-latest-time")
    mysqldb = mysql_data.Mysqldb()
    results = mysqldb.select_all("select * from ths_trade_success_record where day='{}'".format(day))
    datas = []
    for result in results:
        data = {"_id": result[0], "code": result[1], "money": result[2], "num": result[3], "price": result[4],
                "time": result[5], "trade_num": result[6], "type": result[7], "day": result[8],
                "create_time": result[9]}
        datas.append(data)
    return datas, redis.get("trade-success-latest-time")
# 获取交易委托数据
@@ -316,8 +338,12 @@
    redis_info = redis_manager.RedisManager(0).getRedis()
    keys = redis_info.keys("*{}*".format(code))
    for k in keys:
        if k.find("pre") is None or k.find("pre") or k.find("zyltgb") < 0:
            redis_info.delete(k)
        if k.find("pre") is not None:
            continue
        if k.find("zyltgb") is not None:
            continue
        redis_info.delete(k)
def __clear_big_data():
@@ -330,6 +356,6 @@
if __name__ == "__main__":
    # time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # print(time_str)
    # __clear_data("002093")
    __clear_big_data()
    __clear_data("002388")
    # __clear_big_data()
    pass
trade_queue_manager.py
New file
@@ -0,0 +1,134 @@
# 买1量管理
import decimal
import json
import gpcode_manager
import redis_manager
import tool
class THSBuy1VolumnManager:
    """Record the buy-1 volume per stock code, in process memory and in Redis.

    NOTE(review): "THS" presumably names the TongHuaShun data source — confirm.
    All state below is stored on the class, so every instance shares it.
    """
    __redisManager = redis_manager.RedisManager(1)
    # code -> volume most recently accepted by save()
    __last_data = {}
    # code -> {time_str: volume}; save() trims each inner dict to its two
    # newest timestamps
    __code_time_volumn_dict = {}

    def __get_redis(self):
        """Return the Redis client handed out by the shared manager."""
        return self.__redisManager.getRedis()

    def __read_pair(self, key):
        """Load a JSON-encoded (time_str, volume) pair stored under ``key``.

        Returns ``(None, None)`` when the key is absent.
        """
        raw = self.__get_redis().get(key)
        if raw is None:
            return None, None
        pair = json.loads(raw)
        return pair[0], pair[1]

    def __save_recod(self, code, time_str, volumn):
        """Persist the current sample, both per timestamp and as the latest."""
        # One key per (code, time) sample holding the raw volume.
        sample_key = "buy1_volumn-{}-{}".format(code, time_str)
        self.__get_redis().setex(sample_key, tool.get_expire(), volumn)
        # One key per code holding the newest (time, volume) pair as JSON.
        latest_key = "buy1_volumn_latest_info-{}".format(code)
        self.__get_redis().setex(latest_key, tool.get_expire(), json.dumps((time_str, volumn)))

    def __save_last_recod(self, code, time_str, volumn):
        """Persist the sample that precedes the current one as JSON."""
        last_key = "buy1_volumn_last_info-{}".format(code)
        self.__get_redis().setex(last_key, tool.get_expire(), json.dumps((time_str, volumn)))

    def __get_last_record(self, code):
        """Return the stored previous (time_str, volume), or (None, None)."""
        return self.__read_pair("buy1_volumn_last_info-{}".format(code))

    def __get_latest_record(self, code):
        """Return the stored newest (time_str, volume), or (None, None)."""
        return self.__read_pair("buy1_volumn_latest_info-{}".format(code))

    def save(self, code, time_str, volumn):
        """Record a buy-1 volume sample.

        Args:
            code: stock code the sample belongs to.
            time_str: sample time; colons are stripped and the rest parsed as
                an int for ordering (e.g. "15:00:00").
            volumn: buy-1 volume; values below 1 are ignored.

        Returns:
            bool: True when the sample was stored, False when it was skipped
            (volume below 1, or equal to the last stored volume for ``code``).
        """
        if volumn < 1:
            return False
        # Skip a sample whose volume repeats the previous one for this code.
        if self.__last_data.get(code) == volumn:
            return False
        self.__last_data[code] = volumn

        history = self.__code_time_volumn_dict.setdefault(code, {})
        history[time_str] = volumn

        # Order timestamps chronologically and drop all but the newest two.
        ordered = sorted(history, key=lambda stamp: int(stamp.replace(":", "")))
        for stale in ordered[:-2]:
            history.pop(stale)
        newest_two = ordered[-2:]

        # With two samples available, the older one becomes the "last" record.
        if len(newest_two) == 2:
            previous = newest_two[0]
            self.__save_last_recod(code, previous, history[previous])

        self.__save_recod(code, time_str, volumn)
        return True

    def get_verify_data(self, code):
        """Return the (time_str, volume) pair to verify against.

        Prefers the previous sample; falls back to the newest sample when no
        previous one is stored. Either element may be None when nothing is
        stored for ``code``.
        """
        time_str, volumn = self.__get_last_record(code)
        if time_str is None:
            time_str, volumn = self.__get_latest_record(code)
        return time_str, volumn
class JueJinBuy1VolumnManager:
    """Record the buy-1 volume per stock code for the JueJin feed in Redis.

    NOTE(review): "JueJin" presumably names the JueJin quant data source —
    confirm. State is stored on the class, so every instance shares it.
    """
    __redisManager = redis_manager.RedisManager(1)
    # code -> volume most recently accepted by save()
    __last_data = {}

    def __get_redis(self):
        """Return the Redis client handed out by the shared manager."""
        return self.__redisManager.getRedis()

    def __save_recod(self, code, time_str, volumn):
        """Persist the current sample, both per timestamp and as the latest."""
        # One key per (code, time) sample holding the raw volume.
        key = "buy1_volumn_juejin-{}-{}".format(code, time_str)
        self.__get_redis().setex(key, tool.get_expire(), volumn)
        # The latest entry must be a JSON (time_str, volume) pair because
        # __get_latest_record unpacks two elements. Storing the bare volume
        # made every read fail with "TypeError: 'int' object is not
        # subscriptable".
        key = "buy1_volumn_juejin_latest_info-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), json.dumps((time_str, volumn)))

    def __get_latest_record(self, code):
        """Return the stored newest (time_str, volume), or (None, None)."""
        key = "buy1_volumn_juejin_latest_info-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        if not isinstance(val, list) or len(val) < 2:
            # Entry written in the old bare-volume format: it carries no
            # timestamp, so it is treated as "no record" instead of crashing.
            return None, None
        return val[0], val[1]

    def save(self, code, time_str, volumn, price):
        """Record a buy-1 volume sample taken at the limit-up price.

        Args:
            code: stock code the sample belongs to.
            time_str: sample time string (e.g. "15:00:00").
            volumn: buy-1 volume; values below 1 are ignored.
            price: buy-1 price; the sample is kept only when it equals the
                limit-up price reported by ``gpcode_manager``.

        Returns:
            bool: True when the sample was stored, False when it was skipped
            (not at the limit-up price, volume below 1, or volume equal to
            the last stored volume for ``code``).
        """
        # Only samples quoted at the limit-up price are of interest.
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price != tool.to_price(decimal.Decimal(price)):
            return False
        if volumn < 1:
            return False
        # Skip a sample whose volume repeats the previous one for this code.
        if code in self.__last_data and self.__last_data[code] == volumn:
            return False
        self.__last_data[code] = volumn
        self.__save_recod(code, time_str, volumn)
        return True

    def get_verify_data(self, code):
        """Return the newest stored (time_str, volume) pair for ``code``.

        Both elements are None when no usable record is stored.
        """
        time_str, volumn = self.__get_latest_record(code)
        return time_str, volumn
if __name__ == '__main__':
    # Manual smoke run: store one sample through the JueJin manager when the
    # module is executed directly as a script.
    manager = JueJinBuy1VolumnManager()
    manager.save("001203", "15:00:00", 40586553, 12.12)