Administrator
2022-10-27 6e71fbcb119e7068ba35380edaa5cc66e7c71f1b
交易体系完善
21个文件已修改
4个文件已添加
1033 ■■■■■ 已修改文件
authority.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
client_manager.py 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_data_util.py 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_process.py 119 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
global_data_loader.py 63 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
global_util.py 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gpcode_manager.py 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 57 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_code_operate.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager_new.py 69 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_factor.py 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_util.py 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
limit_up_time_manager.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mysql_data.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths_industry_util.py 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tool.py 45 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_data_manager.py 84 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_gui.py 60 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 67 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_queue_manager.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
authority.py
@@ -6,6 +6,8 @@
# 新增用户
def add_user(id, account, pwd, rule):
    mysqldb = mysql_data.Mysqldb()
@@ -57,6 +59,11 @@
    return result[0], results_[1]
if __name__ == '__main__':
    # add_rule("super", ["l2", "limit_up", "industry", "trade_success", "trade_delegate", "code_upload"])
    # add_rule("client-l2", ["l2", "limit_up", "trade_success", "trade_delegate"])
client_manager.py
New file
@@ -0,0 +1,51 @@
# 客户端管理器
import json
import authority
import redis_manager
import ths_util
__redisManager = redis_manager.RedisManager(0)
def getValidL2Clients():
    redis = __redisManager.getRedis()
    keys = redis.keys("client-active-*")
    client_ids = []
    for k in keys:
        _id = k.split("client-active-")[1]
        # 客户端同花顺没卡死才能加入
        if not ths_util.is_ths_dead(_id):
            client_ids.append(int(_id))
    l2_clients = authority.get_l2_clients()
    return list(set(client_ids).intersection(set(l2_clients)))
# 获取客户端IP
def getActiveClientIP(client_id):
    redis = __redisManager.getRedis()
    val = redis.get("client-active-{}".format(client_id))
    if val is None:
        return None
    val = json.loads(val)
    return val[0]
def saveClientActive(client_id, host, thsDead):
    if client_id <= 0:
        return
    redis = __redisManager.getRedis()
    redis.setex("client-active-{}".format(client_id), 10, json.dumps((host, thsDead)))
    ths_util.set_ths_dead_state(client_id, thsDead)
# 获取客户端同花顺状态
def getTHSState(client_id):
    redis = __redisManager.getRedis();
    val = redis.get("client-active-{}".format(client_id))
    if val is None:
        return None
    val = json.loads(val)
    return val[1]
code_data_util.py
@@ -3,10 +3,26 @@
"""
# 股票代码相关的参数
import decimal
import time
import gpcode_manager
import mysql_data
import redis_manager
import tool
_redisManager = redis_manager.RedisManager(0)
# 代码对应的价格是否正确
def is_same_code_with_price(code, price):
    # 昨日收盘价
    price_close = gpcode_manager.get_price_pre(code)
    max_price = tool.to_price(decimal.Decimal(str(price_close)) * decimal.Decimal("1.1"))
    min_price = tool.to_price(decimal.Decimal(str(price_close)) * decimal.Decimal("0.9"))
    if min_price <= decimal.Decimal(str(price)) <= max_price:
        return True
    return False
# 自由流通股本工具类
class ZYLTGBUtil:
@@ -24,3 +40,24 @@
        if val is not None:
            return int(val)
        return None
    @classmethod
    def save_list(self, datasList):
        # 保存自由流通市值
        mysqldb = mysql_data.Mysqldb()
        for data in datasList:
            # 保存
            _dict = {"_id": data["code"], "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgb_unit"],
                     "update_time": int(round(time.time() * 1000))}
            if float(data["zyltgb"]) > 0:
                # 保存10天
                ZYLTGBUtil.save(data["code"], data["zyltgb"], data["zyltgb_unit"])
                result = mysqldb.select_one("select * from ths_zylt where _id='{}'".format(data["code"]))
                if result is None:
                    mysqldb.execute(
                        "insert into ths_zylt(_id,zyltgb,zyltgb_unit,update_time) values ('{}','{}',{},{})".format(
                            data["code"], data["zyltgb"], data["zyltgb_unit"], round(time.time() * 1000)))
                else:
                    mysqldb.execute(
                        "update ths_zylt set zyltgb='{}',zyltgb_unit={},update_time={} where _id='{}'".format(
                            data["zyltgb"], data["zyltgb_unit"], round(time.time() * 1000), data["code"]))
constant.py
New file
@@ -0,0 +1,4 @@
# 是否为测试
TEST = False
# 水下捞累计连续水下时间最小值
UNDER_WATER_PRICE_TIME_AS_SECONDS = 600
data_process.py
@@ -1,19 +1,8 @@
# 数据处理
import decimal
import json
import logging
import time as t
import authority
import mysql_data
import redis_manager
import gpcode_manager
# 统计今日卖出
# 统计今日买入
import ths_util
import tool
from code_data_util import ZYLTGBUtil
__redisManager = redis_manager.RedisManager(0)
@@ -58,112 +47,6 @@
    return data
def parseL2TradeQueueData(str):
    dict = json.loads(str)
    data = dict["data"]
    code = data["code"]
    trade_data = data["data"]
    return code, trade_data
# 代码对应的价格是否正确
def is_same_code_with_price(code, price):
    # 昨日收盘价
    price_close = gpcode_manager.get_price_pre(code)
    max_price = tool.to_price(decimal.Decimal(str(price_close)) * decimal.Decimal("1.1"))
    min_price = tool.to_price(decimal.Decimal(str(price_close)) * decimal.Decimal("0.9"))
    if min_price <= decimal.Decimal(str(price)) <= max_price:
        return True
    return False
# 保存L2交易队列
def saveL2TradeQueueData(code, data):
    redis = __redisManager.getRedis()
    data_str = json.dumps(data)
    key = "trade-queue-{}".format(code)
    # 保存5s的数据
    redis.setex(key, 5, data_str)
# 获取L2交易队列
def getL2TradeQueueData(code):
    redis = __redisManager.getRedis()
    key = "trade-queue-{}".format(code)
    data_str = redis.get(key)
    if data_str is None or len(data_str) <= 0:
        return None
    return json.loads(data_str)
# 保存自由流通市值
def saveZYLTSZ(datasList):
    mysqldb = mysql_data.Mysqldb()
    for data in datasList:
        # 保存
        _dict = {"_id": data["code"], "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgb_unit"],
                 "update_time": int(round(t.time() * 1000))}
        if float(data["zyltgb"]) > 0:
            # 保存10天
            ZYLTGBUtil.save(data["code"], data["zyltgb"], data["zyltgb_unit"])
            result = mysqldb.select_one("select * from ths_zylt where _id='{}'".format(data["code"]))
            if result is None:
                mysqldb.execute("insert into ths_zylt(_id,zyltgb,zyltgb_unit,update_time) values ('{}','{}',{},{})".format(data["code"],data["zyltgb"],data["zyltgb_unit"],round(t.time()*1000)))
            else:
                mysqldb.execute("update ths_zylt set zyltgb='{}',zyltgb_unit={},update_time={} where _id='{}'".format(data["zyltgb"],data["zyltgb_unit"],round(t.time()*1000),data["code"]))
def saveClientActive(client_id, host, thsDead):
    if client_id <= 0:
        return
    redis = __redisManager.getRedis();
    redis.setex("client-active-{}".format(client_id), 10, json.dumps((host, thsDead)))
    ths_util.set_ths_dead_state(client_id, thsDead)
def getValidL2Clients():
    redis = __redisManager.getRedis();
    keys = redis.keys("client-active-*")
    client_ids = []
    for k in keys:
        _id = k.split("client-active-")[1]
        # 客户端同花顺没卡死才能加入
        if not ths_util.is_ths_dead(_id):
            client_ids.append(int(_id))
    l2_clients = authority.get_l2_clients()
    return list(set(client_ids).intersection(set(l2_clients)))
# 获取客户端IP
def getActiveClientIP(client_id):
    redis = __redisManager.getRedis();
    val = redis.get("client-active-{}".format(client_id))
    if val is None:
        return None
    val = json.loads(val)
    return val[0]
# 获取客户端同花顺状态
def getTHSState(client_id):
    redis = __redisManager.getRedis();
    val = redis.get("client-active-{}".format(client_id))
    if val is None:
        return None
    val = json.loads(val)
    return val[1]
# 保存量能
def saveCodeVolumn(datas):
    redis = __redisManager.getRedis()
    for key in datas:
        k = "volumn-max-{}".format(key)
        redis.setex(k, tool.get_expire(), datas[key]["max_volumn"])
        k = "volumn-latest-{}".format(key)
        redis.setex(k, tool.get_expire(), datas[key]["latest_volumn"])
if __name__ == '__main__':
    print(getActiveClientIP(3))
    pass
global_data_loader.py
New file
@@ -0,0 +1,63 @@
import code_volumn_manager
import global_util
import gpcode_manager
import ths_industry_util
from code_data_util import ZYLTGBUtil
def init():
    load_volumn()
    load_zyltgb()
    load_industry()
    load_name_codes()
# 加载行业数据
def load_industry():
    _code_industry_map, _industry_codes_map = ths_industry_util.get_code_industry_maps()
    global_util.code_industry_map.clear()
    global_util.code_industry_map.update(_code_industry_map)
    global_util.industry_codes_map.clear()
    global_util.industry_codes_map.update(_industry_codes_map)
# 加载目标标的的自由流通股本
def load_zyltgb():
    codes = gpcode_manager.get_gp_list()
    for code in codes:
        result = ZYLTGBUtil.get(code)
        if result is not None:
            global_util.zyltgb_map[code] = result
# 加载名称代码隐射
def load_name_codes():
    dict_ = gpcode_manager.get_name_codes()
    if dict_:
        for key in dict_:
            global_util.name_codes[key] = dict_[key]
# 加载量
def load_volumn():
    codes = gpcode_manager.get_gp_list()
    for code in codes:
        max60, yesterday = code_volumn_manager.get_histry_volumn(code)
        today = code_volumn_manager.get_today_volumn(code)
        global_util.max60_volumn[code] = max60
        global_util.yesterday_volumn[code] = yesterday
        global_util.today_volumn[code] = today
# 添加今日涨停数据
def add_limit_up_codes(datas, clear=False):
    if datas is None:
        return
    if clear:
        global_util.today_limit_up_codes.clear()
    # 涨停数量
    __dict = {}
    for data in datas:
        __dict[data["code"]] = data
    # print(__dict)
    global_util.today_limit_up_codes.update(__dict)
global_util.py
@@ -1,16 +1,7 @@
"""
全局临时变量
"""
# 代码行业映射
import code_volumn_manager
import gpcode_manager
import ths_industry_util
from code_data_util import ZYLTGBUtil
TEST = True
code_industry_map = {}
# 行业代码映射
@@ -37,66 +28,5 @@
big_money_num = {}
# 涨停时间
limit_up_time = {}
def init():
    load_volumn()
    load_zyltgb()
    load_industry()
    load_name_codes()
# 加载行业数据
def load_industry():
    _code_industry_map, _industry_codes_map = ths_industry_util.get_code_industry_maps()
    code_industry_map.clear()
    code_industry_map.update(_code_industry_map)
    industry_codes_map.clear()
    industry_codes_map.update(_industry_codes_map)
# 加载目标标的的自由流通股本
def load_zyltgb():
    codes = gpcode_manager.get_gp_list()
    for code in codes:
        result = ZYLTGBUtil.get(code)
        if result is not None:
            zyltgb_map[code] = result
# 加载名称代码隐射
def load_name_codes():
    dict_ = gpcode_manager.get_name_codes()
    if dict_:
        for key in dict_:
            name_codes[key] = dict_[key]
# 加载量
def load_volumn():
    codes = gpcode_manager.get_gp_list()
    for code in codes:
        max60, yesterday = code_volumn_manager.get_histry_volumn(code)
        today = code_volumn_manager.get_today_volumn(code)
        max60_volumn[code] = max60
        yesterday_volumn[code] = yesterday
        today_volumn[code] = today
# 添加今日涨停数据
def add_limit_up_codes(datas, clear=False):
    if datas is None:
        return
    if clear:
        today_limit_up_codes.clear()
    # 涨停数量
    __dict = {}
    for data in datas:
        __dict[data["code"]] = data
    # print(__dict)
    today_limit_up_codes.update(__dict)
if __name__ == "__main__":
    load_zyltgb()
    print(zyltgb_map["002819"])
# 现价
cuurent_prices = {}
gpcode_manager.py
@@ -5,20 +5,15 @@
import random
import time
import authority
import global_util
import client_manager
import redis_manager
import tool
import juejin
import data_process
import decimal
__redisManager = redis_manager.RedisManager(0)
def set_gp_list(gpset):
    # 获取基本信息
    code_datas = juejin.JueJinManager.get_gp_latest_info(gpset)
def set_gp_list(code_datas):
    codes = []
    name_codes = {}
    for _data in code_datas:
@@ -63,8 +58,6 @@
def set_limit_up_list(gpset):
    if gpset is None:
        return
    # 保存到内存中
    global_util.add_limit_up_codes(gpset)
    # 获取基本信息
    redis_instance = __redisManager.getRedis()
    # 删除之前的
@@ -134,6 +127,11 @@
    return tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal("1.1"))
def get_limit_up_price_by_preprice(price):
    if price is None:
        return None
    return tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal("1.1"))
# 获取跌停价
def get_limit_down_price(code):
    price = get_price_pre(code)
@@ -196,7 +194,7 @@
def get_can_listen_pos(client_id=0):
    client_ids = []
    if client_id <= 0:
        client_ids = data_process.getValidL2Clients()
        client_ids = client_manager.getValidL2Clients()
    else:
        client_ids.append(client_id)
    random.shuffle(client_ids)
@@ -245,7 +243,7 @@
# 监听是否满了
def is_listen_full():
    clients = data_process.getValidL2Clients()
    clients = client_manager.getValidL2Clients()
    codes = get_listen_codes()
    return len(codes) >= 8 * len(clients)
gui.py
@@ -7,16 +7,14 @@
import win32gui
import alert_util
import data_export_util
import multiprocessing
import global_util
import log
import mysql_data
import redis_manager
import server
import trade_gui
from juejin import JueJinManager
from l2_code_operate import L2CodeOperate
from l2_trade_factor import L2TradeFactorUtil
@@ -47,7 +45,7 @@
def createServer(pipe_juejin, pipe_gui):
    print("create SocketServer")
    # 初始化参数
    global_util.init()
    global_data_loader.init()
    t1 = threading.Thread(target=lambda: __read_server_pipe(pipe_gui))
    # 后台运行
@@ -285,6 +283,16 @@
            except Exception as e:
                normal = False
                cl_win.configure(text="异常:{}".format(str(e)), foreground="#FF7F27")
            try:
                juejin_length= JueJinManager.get_listen_codes_lenth()
                codes_length = len(gpcode_manager.get_gp_list())
                cl_codes.configure(text="{}/{}".format(juejin_length,codes_length), foreground="#008000")
            except Exception as e:
                pass
            # 状态有问题,需要报警
            if not normal:
                alert_util.alarm()
@@ -322,6 +330,11 @@
        cl_win = Label(frame, text="未知", bg="#DDDDDD")
        cl_win.place(x=300, y=y_)
        cl = Label(frame, text="掘金代码回调数量:", bg="#DDDDDD")
        cl.place(x=350, y=y_)
        cl_codes = Label(frame, text="未知", bg="#DDDDDD")
        cl_codes.place(x=450, y=y_)
        refresh_data()
        # 添加更新线程
        t1 = threading.Thread(target=lambda: update_data())
@@ -343,8 +356,8 @@
        def refresh_data():
            for client_id in code_sv_map:
                ip = data_process.getActiveClientIP(client_id)
                ths_dead = data_process.getTHSState(client_id)
                ip = client_manager.getActiveClientIP(client_id)
                ths_dead = client_manager.getTHSState(client_id)
                if ip is not None and len(ip) > 0:
                    if ths_dead:
                        client_state[client_id].configure(text="(在线:{})".format(ip), foreground="#FF7F27")
@@ -355,7 +368,9 @@
                for i in range(0, 8):
                    code = gpcode_manager.get_listen_code_by_pos(client_id, i)
                    data_count = l2_data_manager.get_l2_data_latest_count(code)
                    data_count = l2_data_util.get_l2_latest_data_number(code)
                    if data_count is None:
                        data_count=0
                    if code is not None and len(code) > 0:
                        code_sv_map[client_id][i].set(code + "({})".format(data_count))
                    else:
@@ -735,7 +750,7 @@
            showinfo("提示", "导出完成")
        def compute_m(code):
            m = L2TradeFactorUtil.compute_m_value(code)
            m,msg = L2TradeFactorUtil.compute_m_value(code)
            showinfo("提示", "{}".format(m))
        def clear_l2(code):
juejin.py
@@ -13,12 +13,15 @@
import gm.api as gmapi
import big_money_num_manager
import client_manager
import code_volumn_manager
import data_process
import constant
import global_data_loader
import global_util
import gpcode_manager
import threading
import l2_trade_util
import server
import tool
@@ -28,12 +31,14 @@
import trade_gui
from l2_code_operate import L2CodeOperate
from l2_data_manager import L2LimitUpMoneyStatisticUtil
from l2_data_manager import L2LimitUpMoneyStatisticUtil, L2DataUtil
from log import logger_juejin_tick, logger_system
from trade_data_manager import CodeActualPriceProcessor
from trade_queue_manager import JueJinBuy1VolumnManager
redisManager = redis_manager.RedisManager()
redisManager = redis_manager.RedisManager(0)
__jueJinBuy1VolumnManager = JueJinBuy1VolumnManager()
__actualPriceProcessor = CodeActualPriceProcessor()
# 设置账户信息
@@ -54,11 +59,11 @@
def init_data():
    # 载入行业股票代码
    global_util.load_industry()
    global_data_loader.load_industry()
    # 载入代码自由流通市值
    global_util.load_zyltgb()
    global_data_loader.load_zyltgb()
    # 载入量
    global_util.load_volumn()
    global_data_loader.load_volumn()
# 每日初始化
@@ -67,7 +72,7 @@
    logger_system.info("每日初始化")
    # 今日实时涨停
    global_util.add_limit_up_codes([], True)
    global_data_loader.add_limit_up_codes([], True)
    # 主要获取收盘价
    get_latest_info(None)
    # 获取60天最大量与昨日量
@@ -87,7 +92,7 @@
    global_util.limit_up_time.clear()
    init_data()
    # 初始化同花顺主站
    l2_clients = data_process.getValidL2Clients()
    l2_clients = client_manager.getValidL2Clients()
    for client in l2_clients:
        server.repair_ths_main_site(client)
@@ -187,7 +192,7 @@
    start2 = 60 * 60 * 12 + 50 * 60
    end2 = 60 * 60 * 15 + 5 * 60
    # TODO 测试
    if (start1 < relative_timestamp < end1 or start2 < relative_timestamp < end2) or global_util.TEST:
    if (start1 < relative_timestamp < end1 or start2 < relative_timestamp < end2) or constant.TEST:
        symbol = tick['symbol']
        price = tick['price']
        # print(symbol,price)
@@ -197,11 +202,12 @@
        # 保存最新价
        symbol = symbol.split(".")[1]
        JueJinManager.add_listen_code(symbol)
        time_ = tick["created_at"].strftime("%H:%M:%S")
        data_=(symbol,time_,tick["quotes"][0]["bid_v"], tick["quotes"][0]["bid_p"])
        data_ = (symbol, time_, tick["quotes"][0]["bid_v"], tick["quotes"][0]["bid_p"])
        logger_juejin_tick.info("买1量 {},{},{},{}", data_[1], data_[0], data_[2],
                                data_[3])
        need_sync = __jueJinBuy1VolumnManager.save(data_[0], data_[1], data_[2],data_[3])
        need_sync = __jueJinBuy1VolumnManager.save(data_[0], data_[1], data_[2], data_[3])
        if need_sync:
            # 同步数据
            L2LimitUpMoneyStatisticUtil.verify_num(data_[0], data_[2], data_[1])
@@ -259,6 +265,16 @@
                else:
                    # 暂存涨幅为负的代码
                    _delete_list.append((rate, code))
                try:
                    __actualPriceProcessor.process_rate(code, rate, now_str)
                except Exception as e:
                    logging.exception(e)
                try:
                    __actualPriceProcessor.save_current_price(code, price, gpcode_manager.get_limit_up_price_by_preprice(pricePre) == tool.to_price(decimal.Decimal(d["price"])))
                except Exception as e:
                    logging.exception(e)
        # 排序
        new_code_list = sorted(_code_list, key=lambda e: e.__getitem__(0), reverse=True)
        # 预填充下单代码
@@ -273,7 +289,7 @@
            logging.exception(e)
            pass
        client_ids = data_process.getValidL2Clients()
        client_ids = client_manager.getValidL2Clients()
        # 最多填充的代码数量
        max_count = len(client_ids) * 8
        if max_count == 0:
@@ -300,7 +316,11 @@
        # 增加应该增加的代码
        for code in add_code_list:
            if not gpcode_manager.is_listen(code):
                L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
                if not l2_trade_util.is_in_forbidden_trade_codes(code):
                    L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
            else:
                if l2_trade_util.is_in_forbidden_trade_codes(code):
                    L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
        print(add_code_list, del_list)
@@ -382,6 +402,17 @@
    def stop(self):
        gmapi.stop()
    @classmethod
    def add_listen_code(cls, code):
        redis = redisManager.getRedis()
        redis.setex("juejin_listen_code-{}".format(code), 20, "1")
    @classmethod
    def get_listen_codes_lenth(cls):
        redis = redisManager.getRedis()
        keys = redis.keys("juejin_listen_code-*")
        return len(keys)
def trade(code, volume):
    account_id, s_id, token = getAccountInfo()
l2_code_operate.py
@@ -7,13 +7,13 @@
import logging
import threading
import data_process
import client_manager
import gpcode_manager
import l2_data_manager
import l2_trade_util
import server
import tool
import trade_manager
import time
import redis_manager
from log import logger_code_operate
@@ -93,7 +93,7 @@
                            if client_id is not None and pos is not None:
                                L2CodeOperate.setGPCode(client_id, pos, "")
                    elif type == 1:
                        if trade_manager.is_in_forbidden_trade_codes(code):
                        if l2_trade_util.is_in_forbidden_trade_codes(code):
                            continue
                        if not gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(
@@ -215,7 +215,7 @@
# 矫正客户端代码
def correct_client_codes():
    client_ids = data_process.getValidL2Clients()
    client_ids = client_manager.getValidL2Clients()
    for client_id in client_ids:
        try:
            index_codes = get_listen_codes_from_client(client_id)
l2_data_manager.py
@@ -9,7 +9,10 @@
from datetime import datetime
import big_money_num_manager
import code_data_util
import constant
import data_process
import global_data_loader
import global_util
import l2_data_util
@@ -245,9 +248,11 @@
    process_time = data["processTime"]
    data = data["data"]
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
    # 获取涨停价
    return day, client, channel, code, capture_time, process_time, datas
    return day, client, channel, code, capture_time, process_time, datas,data
# 保存l2数据
@@ -278,7 +283,7 @@
class L2DataUtil:
    @classmethod
    def is_same_time(cls, time1, time2):
        if global_util.TEST:
        if constant.TEST:
            return True
        time1_s = time1.split(":")
        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
@@ -500,7 +505,7 @@
            if len(datas) > 0:
                # 判断价格区间是否正确
                if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                if not code_data_util.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # 加载历史数据
@@ -1036,7 +1041,8 @@
    @classmethod
    def __get_threshmoney(cls, code):
        return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        money,msg = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        return money
    # 获取预估挂买位
    @classmethod
@@ -1377,7 +1383,7 @@
    def test_can_order(cls):
        code = "000948"
        global_util.load_industry()
        global_data_loader.load_industry()
        limit_up_time_manager.load_limit_up_time()
        print(cls.__can_buy(code))
l2_data_manager_new.py
@@ -4,7 +4,7 @@
import time as t
import big_money_num_manager
import data_process
import code_data_util
import global_util
import gpcode_manager
import l2_data_log
@@ -13,7 +13,6 @@
import l2_trade_factor
import l2_trade_test
import limit_up_time_manager
import log
import redis_manager
import ths_industry_util
import tool
@@ -22,8 +21,10 @@
    local_today_num_operate_map, L2LimitUpMoneyStatisticUtil
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process
# TODO l2数据管理
from trade_data_manager import CodeActualPriceProcessor
class L2DataManager:
    # 格式化数据
    def format_data(self, datas):
@@ -148,6 +149,7 @@
    unreal_buy_dict = {}
    random_key = {}
    l2BigNumForMProcessor = L2BigNumForMProcessor()
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    @classmethod
    def debug(cls, code, content, *args):
@@ -174,7 +176,7 @@
        try:
            if len(datas) > 0:
                # 判断价格区间是否正确
                if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                if not code_data_util.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # 加载历史数据
@@ -190,9 +192,20 @@
                    # 拼接数据
                    local_today_datas[code].extend(add_datas)
                    l2_data_util.load_num_operate_map(l2_data_manager.local_today_num_operate_map, code, add_datas)
                    # 第1条数据是否为09:30:00
                    if add_datas[0]["val"]["time"] == "09:30:00":
                        if global_util.cuurent_prices.get(code):
                            price_data = global_util.cuurent_prices.get(code)
                            if price_data[1]:
                                # 当前涨停价,设置涨停时间
                                logger_l2_process.info("开盘涨停:{}", code)
                                # 保存涨停时间
                                limit_up_time_manager.save_limit_up_time(code, "09:30:00")
                total_datas = local_today_datas[code]
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据预处理时间")
                if len(add_datas) > 0:
                    _start_time = round(t.time() * 1000)
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
@@ -223,7 +236,7 @@
    @classmethod
    def __process_not_order(cls, code, start_index, end_index, capture_time):
        # 获取阈值
        threshold_money = cls.__get_threshmoney(code)
        threshold_money, msg = cls.__get_threshmoney(code)
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    @classmethod
@@ -302,6 +315,7 @@
    # 是否可以买
    @classmethod
    def __can_buy(cls, code):
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_manager.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2_data_manager.L2DataUtil.get_time_as_second(
@@ -316,17 +330,48 @@
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
        if cls.__codeActualPriceProcessor.is_under_water(code):
            # 水下捞且板块中的票小于21不能买
            if global_util.industry_hot_num.get(industry) <= 16:
                return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry))
            if codes_index.get(code) != 0:
                return False, "水下捞,不是老大,是老{}".format(codes_index.get(code))
        # 13:00后涨停,本板块中涨停票数<29不能买
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None:
            if int(limit_up_time.replace(":", "")) >= 130000 and global_util.industry_hot_num.get(industry) is not None:
                if global_util.industry_hot_num.get(industry) < 29:
                    return False, "13:00后涨停,本板块中涨停票数<29不能买"
        # 老二,本板块中涨停票数<29 不能买
        if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
                industry) is not None:
            if global_util.industry_hot_num.get(industry) < 29:
                return False, "老二,本板块中涨停票数<29不能买"
        if codes_index.get(code) is not None and codes_index.get(code) == 1:
            # 如果老大已经买成功了,老二就不需要买了
            first_codes = []
            for key in codes_index:
                if codes_index.get(key) == 0:
                    first_codes.append(key)
            for key in first_codes:
                state = trade_manager.get_trade_state(key)
                if state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # 老大已经买成功了
                    return False, "老大{}已经买成功,老二无需购买".format(key)
            # 有9点半涨停的老大才能买老二,不然不能买
            # 获取老大的涨停时间
            for key in first_codes:
                # 找到了老大
                time_ = limit_up_time_manager.get_limit_up_time(key)
                if time_ == "09:30:00":
                    return True, "9:30涨停的老大,老二可以下单"
            return False, "老大非9:30涨停,老二不能下单"
        # 过时  老二,本板块中涨停票数<29 不能买
        # if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
        #         industry) is not None:
        #     if global_util.industry_hot_num.get(industry) < 29:
        #         return False, "老二,本板块中涨停票数<29不能买"
        # 可以下单
        return True, None
@@ -394,7 +439,7 @@
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index,
                                          gpcode_manager.get_limit_up_price(code))
        threshold_money = cls.__get_threshmoney(code)
        threshold_money, msg = cls.__get_threshmoney(code)
        # 买入纯买额统计
        compute_index, buy_nums, buy_count, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,
                                                                                                      compute_start_index),
@@ -402,6 +447,8 @@
                                                                                            count, threshold_money,
                                                                                            buy_single_index,
                                                                                            capture_time)
        cls.debug(code, "m值-{} m值因子-{}", threshold_money, msg)
        # 买入信号位与计算位置间隔2s及以上了
        if rebegin_buy_pos is not None:
            # 需要重新计算纯买额
l2_data_util.py
@@ -7,10 +7,10 @@
import datetime
import json
import time
from tool import async_call
import l2_data_manager
import tool
from trade_gui import async_call
def run_time():
@@ -134,19 +134,20 @@
        return True
    sell_datas = local_today_num_operate_map.get(
        "{}-{}-{}".format(sell_cancel_data["val"]["num"], "2", sell_cancel_data["val"]["price"]))
    for i in range(0, len(sell_datas)):
        data = sell_datas[i]
        if int(data["val"]["operateType"]) != 2:
            continue
        if int(data["val"]["num"]) != int(sell_cancel_data["val"]["num"]):
            continue
        if min_space == 0 and max_space == 0:
            # 本秒内
            if compare_time(data["val"]["time"], min_time) == 0:
    if sell_datas:
        for i in range(0, len(sell_datas)):
            data = sell_datas[i]
            if int(data["val"]["operateType"]) != 2:
                continue
            if int(data["val"]["num"]) != int(sell_cancel_data["val"]["num"]):
                continue
            if min_space == 0 and max_space == 0:
                # 本秒内
                if compare_time(data["val"]["time"], min_time) == 0:
                    return data["index"] < target_data["index"]
            # 数据在正确的区间
            elif compare_time(data["val"]["time"], min_time) > 0 and compare_time(data["val"]["time"], max_time) <= 0:
                return data["index"] < target_data["index"]
        # 数据在正确的区间
        elif compare_time(data["val"]["time"], min_time) > 0 and compare_time(data["val"]["time"], max_time) <= 0:
            return data["index"] < target_data["index"]
    return False
@@ -180,6 +181,22 @@
            break
# 保存l2最新数据的大小
@async_call
def save_l2_latest_data_number(code, num):
    redis = l2_data_manager._redisManager.getRedis()
    redis.setex("l2_latest_data_num-{}".format(code), 3, num)
# 获取最新数据条数
def get_l2_latest_data_number(code):
    redis = l2_data_manager._redisManager.getRedis()
    num = redis.get("l2_latest_data_num-{}".format(code))
    if num is not None:
        return int(num)
    return None
# l2数据拼接工具
class L2DataConcatUtil:
@@ -190,7 +207,7 @@
        self.code = code
    def __get_data_identity(self, data_):
        data=data_["val"]
        data = data_["val"]
        return "{}-{}-{}-{}-{}-{}".format(data.get("time"), data.get("num"), data.get("price"), data.get("operateType"),
                                          data.get("cancelTime"), data.get("cancelTimeUnit"))
@@ -215,7 +232,8 @@
    def get_add_datas(self):
        # 查询当前数据是否在最近一次数据之后
        if self.last_datas and self.datas:
            if int(self.datas[-1]["val"]["time"].replace(":", "")) - int(self.last_datas[-1]["val"]["time"].replace(":", "")) < 0:
            if int(self.datas[-1]["val"]["time"].replace(":", "")) - int(
                    self.last_datas[-1]["val"]["time"].replace(":", "")) < 0:
                return []
        # 获取拼接点
@@ -254,7 +272,7 @@
    def load_data(datas):
        data_list = []
        for data in datas:
            data_list.append({"val":{"time": data}})
            data_list.append({"val": {"time": data}})
        return data_list
    # 不匹配
l2_trade_factor.py
@@ -4,9 +4,9 @@
# l2交易因子
import big_money_num_manager
import global_data_loader
import global_util
import limit_up_time_manager
import log
class L2TradeFactorUtil:
@@ -121,16 +121,17 @@
        big_money_rate = 0
        if big_money_num is not None:
            big_money_rate = cls.get_big_money_rate(big_money_num)
        print(
            "industry_rate:{} volumn_rate:{} limit_up_time_rate:{} big_money_rate:{}".format(industry_rate,
                                                                                             volumn_rate,
                                                                                             limit_up_time_rate,
                                                                                             big_money_rate))
        msg = "zyltgb:{} industry_rate:{} volumn_rate:{} limit_up_time_rate:{} big_money_rate:{}".format(zyltgb,
                                                                                                         industry_rate,
                                                                                                         volumn_rate,
                                                                                                         limit_up_time_rate,
                                                                                                         big_money_rate)
        final_rate = round(1 - (industry_rate + volumn_rate + limit_up_time_rate + big_money_rate), 4)
        if final_rate < 0.1:
            final_rate = 0.1
        return final_rate
        return final_rate, msg
    @classmethod
    def compute_rate_by_code(cls, code):
@@ -143,7 +144,7 @@
        # 获取行业热度
        industry = global_util.code_industry_map.get(code)
        if industry is None:
            global_util.load_industry()
            global_data_loader.load_industry()
            industry = global_util.code_industry_map.get(code)
        total_industry_limit_percent = global_util.industry_hot_num.get(industry) if industry is not None else None
@@ -156,7 +157,7 @@
        volumn_day60_max, volumn_yest, volumn_today = global_util.max60_volumn.get(
            code), global_util.yesterday_volumn.get(code), global_util.today_volumn.get(code)
        if volumn_day60_max is None or volumn_yest is None:
            global_util.load_volumn()
            global_data_loader.load_volumn()
            volumn_day60_max, volumn_yest, volumn_today = global_util.max60_volumn.get(
                code), global_util.yesterday_volumn.get(code), global_util.today_volumn.get(code)
        # 首次涨停时间
@@ -180,7 +181,7 @@
    def __get_zyltgb(cls, code):
        zyltgb = global_util.zyltgb_map.get(code)
        if zyltgb is None:
            global_util.load_zyltgb()
            global_data_loader.load_zyltgb()
            zyltgb = global_util.zyltgb_map.get(code)
        return zyltgb
@@ -188,15 +189,15 @@
    def compute_m_value(cls, code):
        zyltgb = global_util.zyltgb_map.get(code)
        if zyltgb is None:
            global_util.load_zyltgb()
            global_data_loader.load_zyltgb()
            zyltgb = global_util.zyltgb_map.get(code)
            if zyltgb is None:
                print("没有获取到自由流通市值")
                return 10000000
        zyltgb = cls.get_base_safe_val(zyltgb)
        rate = cls.compute_rate_by_code(code)
        rate, msg = cls.compute_rate_by_code(code)
        # print("m值获取:", code, round(zyltgb * rate))
        return round(zyltgb * rate)
        return round(zyltgb * rate), msg
    # 获取安全笔数
    @classmethod
@@ -210,7 +211,15 @@
            return 30
        if count < 5:
            return 5
        return count
        big_money_num = global_util.big_money_num.get(code)
        if big_money_num is None:
            big_money_num = big_money_num_manager.get_num(code)
        rate = 0
        if big_money_num is not None:
            rate = cls.get_big_money_rate(big_money_num)
        return round(count*(1-rate/2))
# l2因子归因数据
l2_trade_util.py
New file
@@ -0,0 +1,42 @@
# 是否在禁止交易代码中
import redis_manager
import tool
__redis_manager = redis_manager.RedisManager(2)
#  初始化禁止交易代码库
def init_forbidden_trade_codes():
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    count = redis.scard(key)
    if count > 0:
        redis.delete(key)
    redis.sadd(key, "000000")
    redis.expire(key, tool.get_expire())
# 移除禁止交易代码
def remove_from_forbidden_trade_codes(code):
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    redis.srem(key, code)
# 添加代码到禁止交易
def add_to_forbidden_trade_codes(code):
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    redis.sadd(key, code)
    redis.expire(key, tool.get_expire())
# 禁止代码交易
def forbidden_trade(code):
    add_to_forbidden_trade_codes(code)
    # l2_data_manager.remove_from_l2_fixed_codes(code)
    # l2_code_operate.L2CodeOperate.get_instance().remove_l2_listen(code, "禁止代码交易")
def is_in_forbidden_trade_codes(code):
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    return redis.sismember(key, code)
limit_up_time_manager.py
@@ -2,7 +2,6 @@
"""
涨停时间管理器
"""
import l2_data_util
import redis_manager
import tool
import global_util
mysql_data.py
@@ -1,4 +1,6 @@
# 先要导入pymysql
import logging
import pymysql
# 把连接参数定义成字典
@@ -56,6 +58,7 @@
            self.conn.commit()
        except Exception as e:
            print("提交出错\n:", e)
            logging.exception(e)
            # 如果出错要回滚
            self.conn.rollback()
@@ -66,6 +69,7 @@
            # 提交
            self.conn.commit()
        except Exception as e:
            logging.exception(e)
            print("提交出错\n:", e)
            # 如果出错要回滚
            self.conn.rollback()
server.py
@@ -10,8 +10,10 @@
import time
import alert_util
import client_manager
import code_volumn_manager
import data_process
import global_data_loader
import global_util
import gpcode_manager
import authority
@@ -19,15 +21,15 @@
import l2_data_log
import l2_data_manager
import l2_data_manager_new
import log
import l2_data_util
import ths_industry_util
import ths_util
import tool
import trade_manager
import l2_code_operate
from code_data_util import ZYLTGBUtil
from log import logger_l2_error, logger_l2_process, logger_device, logger_trade_delegate
from trade_data_manager import TradeCancelDataManager
from log import logger_l2_error, logger_device, logger_trade_delegate
from trade_queue_manager import THSBuy1VolumnManager
@@ -84,7 +86,7 @@
                        _start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas = l2_data_manager.parseL2Data(
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data(
                            _str)
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
@@ -129,6 +131,8 @@
                                                                   "l2数据正确性判断时间")
                                if gpcode_manager.is_listen(code):
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                    # 保存原始数据数量
                                    l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                            except l2_data_manager.L2DataException as l:
                                # 单价不符
                                if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
@@ -148,6 +152,7 @@
                                logger_l2_error.error("出错:{}".format(str(e)))
                                logger_l2_error.error("内容:{}".format(_str))
                            finally:
                                __end_time = round(time.time() * 1000)
                                # 只记录大于40ms的数据
                                if __end_time - __start_time > 40:
@@ -155,23 +160,17 @@
                                                        True)
                    except Exception as e:
                        logging.exception(e)
                elif type == 10:
                    # level2交易队列
                    try:
                        code, setData = data_process.parseL2TradeQueueData(_str)
                        if gpcode_manager.is_listen(code):
                            data_process.saveL2Data(day, code, setData)
                    except:
                        print("异常")
                elif type == 1:
                    # 设置股票代码
                    data_list = data_process.parseGPCode(_str)
                    data_process.saveZYLTSZ(data_list)
                    ZYLTGBUtil.save_list(data_list)
                    code_list = []
                    for data in data_list:
                        code_list.append(data["code"])
                    gpcode_manager.set_gp_list(code_list)
                    # 获取基本信息
                    code_datas = juejin.JueJinManager.get_gp_latest_info(code_list)
                    gpcode_manager.set_gp_list(code_datas)
                    # 重新订阅
                    self.server.pipe_juejin.send(json.dumps({"type": "resub"}))
                    # 同步同花顺目标代码
@@ -183,6 +182,9 @@
                    dataList = data_process.parseGPCode(_str)
                    # 设置涨停时间
                    gpcode_manager.set_limit_up_list(dataList)
                    # 保存到内存中
                    if dataList:
                        global_data_loader.add_limit_up_codes(dataList)
                    ths_industry_util.set_industry_hot_num(dataList)
                elif type == 3:
                    # 交易成功信息
@@ -242,14 +244,23 @@
                        index = data["index"]
                        code_name = data["codeName"]
                        volumn = data["volumn"]
                        price = data["price"]
                        time_ = data["time"]
                        code = global_util.name_codes.get(code_name)
                        if code is None:
                            global_util.load_name_codes()
                            global_data_loader.load_name_codes()
                        code = global_util.name_codes.get(code_name)
                        if code is not None:
                            # 校正时间
                            seconds = tool.get_time_as_second(time_)
                            if seconds % 3 > 0:
                                seconds = seconds - seconds % 3
                            time_ = tool.time_seconds_format(seconds)
                            # 保存数据
                            self.buy1_volumn_manager.save(code, time_, volumn)
                            need_sync = self.buy1_volumn_manager.save(code, time_, volumn,price)
                            if need_sync:
                                # 同步数据
                                l2_data_manager.L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息
@@ -257,7 +268,7 @@
                    client_id = data["client"]
                    thsDead = data.get("thsDead")
                    logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
                    data_process.saveClientActive(int(client_id), host, thsDead)
                    client_manager.saveClientActive(int(client_id), host, thsDead)
                    if ths_util.is_ths_dead(client_id):
                        # TODO 重启同花顺
                        # 报警
@@ -275,7 +286,7 @@
def send_msg(client_id, data):
    _ip = data_process.getActiveClientIP(client_id)
    _ip = client_manager.getActiveClientIP(client_id)
    print("ip", client_id, _ip)
    if _ip is None or len(_ip) <= 0:
        raise Exception("客户端IP为空")
ths_industry_util.py
@@ -3,13 +3,17 @@
"""
# 同花顺行业
import datetime
import time
import global_data_loader
import global_util
import mysql_data
# 获取行业映射
import tool
def get_code_industry_maps():
    __code_map = {}
    __industry_map = {}
@@ -32,12 +36,18 @@
    industry_hot_dict = {}
    code_industry_map = global_util.code_industry_map
    if code_industry_map is None or len(code_industry_map) == 0:
        global_util.load_industry();
        global_data_loader.load_industry()
        code_industry_map = global_util.code_industry_map
    if code_industry_map is None:
        raise Exception("获取代码对应的行业出错")
    now_str = datetime.datetime.now().strftime("%H:%M:%S")
    for data in limit_up_datas:
        # 时间比现在早的时间才算数
        if data["time"] != "00:00:00" and tool.get_time_as_second(now_str) < tool.get_time_as_second(
                data["time"]):
            continue
        code = data["code"]
        industry = code_industry_map.get(code)
        if industry is None:
@@ -63,7 +73,7 @@
def get_same_industry_codes(code, codes):
    industry = global_util.code_industry_map.get(code)
    if industry is None:
        global_util.load_industry()
        global_data_loader.load_industry()
        industry = global_util.code_industry_map.get(code)
    if industry is None:
        return None, None
@@ -102,12 +112,12 @@
    mysqldb = mysql_data.Mysqldb()
    result = mysqldb.select_one("select * from ths_industry_codes where _id={}".format(code))
    if result is None:
        mysqldb.insert(
            "insert into ths_industry_codes(code,industry_name,zyltgb,zyltgb_unit) values('{}','{}','{}',{})".format(
        mysqldb.execute(
            "insert into ths_industry_codes(_id,second_industry,zyltgb,zyltgb_unit) values('{}','{}','{}',{})".format(
                code, industry_name, zyltgb, zyltgb_unit, round(time.time() * 1000)))
    else:
        mysqldb.update(
            "update ths_industry_codes set industry_name='{}',zyltgb='{}',zyltgb_unit={} where _id='{}'".format(
        mysqldb.execute(
            "update ths_industry_codes set second_industry='{}',zyltgb='{}',zyltgb_unit={} where _id='{}'".format(
                industry_name, zyltgb, zyltgb_unit, code))
@@ -119,7 +129,7 @@
        _list = []
        for data in datas:
            # 保存
            __save_code_industry(data["code"],industry_name,data["zyltgb"],data["zyltgb_unit"])
            __save_code_industry(data["code"], industry_name, data["zyltgb"], data["zyltgb_unit"])
if __name__ == "__main__":
tool.py
@@ -6,8 +6,16 @@
import time
import time as t
import datetime
from threading import Thread
import global_util
import constant
def async_call(fn):
    def wrapper(*args, **kwargs):
        Thread(target=fn, args=args, kwargs=kwargs).start()
    return wrapper
def get_expire():
@@ -34,7 +42,7 @@
# 是否为交易时间
def is_trade_time():
    # 测试
    if global_util.TEST:
    if constant.TEST:
        return True
    relative_timestamp = t.time() % (24 * 60 * 60) + 8 * 60 * 60
@@ -61,8 +69,33 @@
    return decorator
def get_time_as_second(time_str):
    ts = time_str.split(":")
    return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
# 将秒数格式化为时间
def time_seconds_format(seconds):
    h = seconds // 3600
    m = seconds % 3600 // 60
    s = seconds % 60
    return "{0:0>2}:{1:0>2}:{2:0>2}".format(h, m, s)
# 交易時間的差值
# 如11:29:59 与 13:00:00只相差1s
def trade_time_sub(time_str_1, time_str_2):
    split_time = get_time_as_second("11:30:00")
    time_1 = get_time_as_second(time_str_1)
    time_2 = get_time_as_second(time_str_2)
    if time_1 < split_time < time_2:
        time_2 = time_2 - 90 * 60
    return time_1 - time_2
if __name__ == "__main__":
    d1 = decimal.Decimal("0.12")
    d2 = decimal.Decimal("0.12")
    if d1 == d2:
        print("123")
    print(trade_time_sub("11:29:59", "13:00:00"))
    print(trade_time_sub("11:29:59", "14:00:00"))
    print(trade_time_sub("10:29:59", "11:29:59"))
    print(trade_time_sub("13:29:59", "14:29:59"))
trade_data_manager.py
@@ -2,11 +2,13 @@
交易数据股那里器
用于对交易临时数据(交易状态,代码状态等)进行管理
"""
import datetime
import json
import time
# 交易撤销数据管理器
import constant
import global_util
import l2_data_util
import redis_manager
import tool
@@ -70,7 +72,7 @@
        redis = cls.redisManager.getRedis()
        val_str = redis.get("buy_position_info-{}".format(code))
        if val_str is None:
            return None, None, None,None
            return None, None, None, None
        else:
            val = json.loads(val_str)
            return val[0], val[1], val[2], val[3]
@@ -137,7 +139,7 @@
            # 间隔2s及其以上表示数据异常
            # 间隔2s以上的就以下单时间下一秒末尾作为确认点
            start_index = l2_data_index
            if len(l2_today_datas)-1 > start_index:
            if len(l2_today_datas) - 1 > start_index:
                for i in range(start_index + 1, len(l2_today_datas)):
                    _time = l2_today_datas[i]["val"]["time"]
                    if l2_data_util.get_time_as_seconds(_time) - old_time_int >= 2:
@@ -149,7 +151,7 @@
                cls.__set_buy_sure_position(code, l2_data_index, l2_data)
        elif new_time_int - old_time_int >= 0:
            # 间隔2s内表示数据正常,将其位置设置为新增数据的中间位置
            index = len(l2_today_datas)-1 - (len(l2_add_datas)) // 2
            index = len(l2_today_datas) - 1 - (len(l2_add_datas)) // 2
            data = l2_today_datas[index]
            cls.__set_buy_sure_position(code, index, data)
        else:
@@ -158,6 +160,76 @@
            pass
# 代码实时价格管理器
class CodeActualPriceProcessor:
    __redisManager = redis_manager.RedisManager(0)
    def __get_redis(self):
        return self.__redisManager.getRedis()
    # 保存跌价的时间
    def __save_down_price_time(self, code, time_str):
        key = "under_water_last_time-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), time_str)
    def __remove_down_price_time(self, code):
        key = "under_water_last_time-{}".format(code)
        self.__get_redis().delete(key)
    def __get_last_down_price_time(self, code):
        key = "under_water_last_time-{}".format(code)
        return self.__get_redis().get(key)
    def __increment_down_price_time(self, code, seconds):
        key = "under_water_seconds-{}".format(code)
        self.__get_redis().incrby(key, seconds)
    def __get_down_price_time_as_seconds(self, code):
        key = "under_water_seconds-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None
        else:
            return int(val)
    def process_rate(self, code, rate, time_str):
        # 9点半之前的数据不处理
        if int(time_str.replace(":", "")) < int("093000"):
            return
        # now_str = datetime.datetime.now().strftime("%H:%M:%S")
        if rate >= 0:
            down_start_time = self.__get_last_down_price_time(code)
            if down_start_time is None:
                return
            else:
                # 累计增加时间
                time_second = tool.trade_time_sub(time_str, down_start_time)
                self.__increment_down_price_time(code, time_second)
                # 删除起始时间
                self.__remove_down_price_time(code)
        else:
            # 记录开始值
            if self.__get_last_down_price_time(code) is None:
                self.__save_down_price_time(code, time_str)
    # 保存现价
    def save_current_price(self, code, price, is_limit_up):
        global_util.cuurent_prices[code] = (price, is_limit_up, round(time.time()))
        pass
    # 是否为水下捞
    def is_under_water(self, code):
        time_seconds = self.__get_down_price_time_as_seconds(code)
        if time_seconds is None:
            return False
        else:
            return time_seconds >= constant.UNDER_WATER_PRICE_TIME_AS_SECONDS
if __name__ == "__main__":
    TradeBuyDataManager.set_buy_capture_time("123456", 178938828, 1232)
    print(TradeBuyDataManager.get_buy_capture_time("123456"))
    processor = CodeActualPriceProcessor()
    processor.process_rate("123456", -0.2, "09:30:00")
    processor.process_rate("123456", -0.3, "09:40:00")
    processor.process_rate("123456", 0.3, "09:50:00")
    processor.is_under_water("123456")
trade_gui.py
@@ -8,22 +8,15 @@
import random
import win32gui
import win32api
import win32con
import global_util
import constant
import gpcode_manager
import l2_trade_util
import redis_manager
import tool
from log import *
from threading import Thread
def async_call(fn):
    def wrapper(*args, **kwargs):
        Thread(target=fn, args=args, kwargs=kwargs).start()
    return wrapper
from tool import async_call
class THSGuiTrade(object):
@@ -256,7 +249,7 @@
            #     raise Exception(error)
            # TODO 暂时不验证涨停价
            if not global_util.TEST:
            if not constant.TEST:
                if abs(float(limit_up_price_now) - float(limit_up_price)) >= 0.01:
                    error = "涨停价验证出错 {}-{}".format(limit_up_price, limit_up_price_now)
                    raise Exception(error)
@@ -271,7 +264,11 @@
            win32gui.PostMessage(win, win32con.WM_KEYDOWN, 66, 0);
            logger_trade_gui.info("执行买入结束:code-{} 耗时:{}".format(code, int(round(time.time() * 1000)) - start))
            self.close_delegate_success_dialog()
            # 过时
            # self.close_delegate_success_dialog()
            self.refresh_data()
            # 循环读取下单结果,最多等待10s
            # for i in range(0, 50):
            #     hwnd = self.getTradeResultWin()
@@ -388,6 +385,27 @@
            self.buy_cancel_lock.release()
            # 清空代码框
            self.input_number(code_input, "")
    # 刷新交易窗口数据
    @async_call
    def refresh_data(self):
        # 获取到专业下单页面
        win = self.getCancelBuyWin()
        child_win = None
        refresh_btn = None
        for i in range(0, 20):
            child_win = win32gui.FindWindowEx(win, child_win, "#32770", None)
            if not child_win:
                break
            if not win32gui.IsWindowVisible(child_win):
                continue
            temp = win32gui.FindWindowEx(child_win, None, "Button", "还原")
            if temp:
                refresh_btn = win32gui.GetDlgItem(child_win, 0x00000457)
                break
        if refresh_btn:
            # 点击刷新
            THSGuiUtil.click(refresh_btn)
class THSGuiUtil:
@@ -764,8 +782,20 @@
        for code in old_codes:
            if code not in codes_:
                cls.cancel_distribute_win_for_code(code)
        add_codes = codes[0:10]
        del_codes = codes[10:]
        # 删除禁止的代码
        new_codes = []
        new_delete_codes = []
        for code in codes:
            if not l2_trade_util.is_in_forbidden_trade_codes(code):
                # 取消分配
                new_codes.append(code)
            else:
                new_delete_codes.append(code)
        add_codes = new_codes[0:10]
        del_codes = new_codes[10:]
        del_codes.extend(new_delete_codes)
        for code in del_codes:
            cls.cancel_distribute_win_for_code(code)
@@ -813,7 +843,7 @@
if __name__ == '__main__':
    THSGuiTrade().buy("002853", "18.98", THSBuyWinManagerNew.get_buy_wins()[5])
    THSGuiTrade().buy("002900", "16.18")
    # GUITest().test_distribute()
    # try:
    #     THSGuiUtil.set_buy_window_code(0x000112D0, "000333")
trade_manager.py
@@ -3,16 +3,16 @@
对一系列的代码交易变量,下单,撤单进行管理
"""
# 交易管理器
import datetime
import json
import time
import gpcode_manager
import l2_code_operate
import l2_trade_util
import mysql_data
import tool
from trade_data_manager import TradeBuyDataManager
from trade_gui import THSGuiTrade, async_call
from trade_gui import THSGuiTrade, THSBuyWinManagerNew
import time as t
import l2_data_manager
@@ -34,6 +34,8 @@
TRADE_STATE_BUY_CANCEL_SUCCESS = 14
# 买成功
TRADE_STATE_BUY_SUCCESS = 12
guiTrade = THSGuiTrade()
@@ -102,8 +104,10 @@
                    data["trade_num"], data["type"], data["day"], round(t.time() * 1000)))
        else:
            mysqldb.execute(
                "update ths_trade_success_record set money=%s, num=%s, price=%s,time=%s,trade_num=%s,type=%s where _id=%s",(
                    data["money"], data["num"], data["price"], data["time"], data["trade_num"], data["type"],data["_id"]))
                "update ths_trade_success_record set money=%s, num=%s, price=%s,time=%s,trade_num=%s,type=%s where _id=%s",
                (
                    data["money"], data["num"], data["price"], data["time"], data["trade_num"], data["type"],
                    data["_id"]))
# 保存交易委托数据
@@ -118,10 +122,10 @@
        counts = mysqldb.select_one("select count(*) from ths_trade_delegate_record where _id='{}'".format(data["_id"]))
        if counts[0] < 1:
            mysqldb.execute(
                "insert into ths_trade_delegate_record(_id,code,num,price,time,trade_num,trade_price,type,day,create_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%d)",
                "insert into ths_trade_delegate_record(_id,code,num,price,time,trade_num,trade_price,type,day,create_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                (
                    data["_id"], data["code"],  data["num"], data["price"], data["time"],
                    data["trade_num"],data["trade_price"], data["type"], data["day"], round(t.time() * 1000)))
                    data["_id"], data["code"], data["num"], data["price"], data["time"],
                    data["trade_num"], data["trade_price"], data["type"], data["day"], round(t.time() * 1000)))
    # 保存最新的委托数据
    redis = __redis_manager.getRedis()
@@ -155,50 +159,11 @@
        return json.loads(result), time_str
#  初始化禁止交易代码库
def init_forbidden_trade_codes():
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    count = redis.scard(key)
    if count > 0:
        redis.delete(key)
    redis.sadd(key, "000000")
    redis.expire(key, tool.get_expire())
# 移除禁止交易代码
def remove_from_forbidden_trade_codes(code):
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    redis.srem(key, code)
# 添加代码到禁止交易
def add_to_forbidden_trade_codes(code):
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    redis.sadd(key, code)
    redis.expire(key, tool.get_expire())
# 禁止代码交易
def forbidden_trade(code):
    add_to_forbidden_trade_codes(code)
    l2_data_manager.remove_from_l2_fixed_codes(code)
    l2_code_operate.L2CodeOperate.get_instance().remove_l2_listen(code, "禁止代码交易")
# 是否在禁止交易代码中
def is_in_forbidden_trade_codes(code):
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    return redis.sismember(key, code)
# 开始交易
def start_buy(code, capture_timestamp, last_data, last_data_index):
    # 是否禁止交易
    if is_in_forbidden_trade_codes(code):
    if l2_trade_util.is_in_forbidden_trade_codes(code):
        raise Exception("禁止交易")
    trade_state = get_trade_state(code)
    if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING:
@@ -220,7 +185,7 @@
# 购买
@async_call
@tool.async_call
def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index):
    try:
        guiTrade.buy(code, price)
@@ -290,7 +255,7 @@
        if _time == "00:00:00":
            continue
        if code is not None and int(data["type"]) == 0:
            forbidden_trade(code)
            l2_trade_util.forbidden_trade(code)
            state = get_trade_state(code)
            if state != TRADE_STATE_BUY_SUCCESS:
                set_trade_state(code, TRADE_STATE_BUY_SUCCESS)
@@ -298,6 +263,8 @@
                l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
                l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
                l2_data_manager.TradePointManager.delete_buy_point(code)
                # 移除交易窗口分配
                THSBuyWinManagerNew.cancel_distribute_win_for_code(code)
# 处理委托成功数据
trade_queue_manager.py
@@ -47,9 +47,16 @@
        return val[0], val[1]
    # 返回是否需要更新数据
    def save(self, code, time_str, volumn):
    def save(self, code, time_str, volumn,price):
        # 客户端数据未加载出来过滤
        if volumn < 1:
            return False
        # 判断是否为涨停价
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price != tool.to_price(decimal.Decimal(price)):
            # 非涨停价
            volumn = 0
        # 不保存和上一次相同的数据
        if code in self.__last_data and self.__last_data[code] == volumn:
            return False
@@ -112,10 +119,7 @@
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price != tool.to_price(decimal.Decimal(price)):
            # 非涨停价
            return False
        if volumn < 1:
            return False
            volumn = 0
        # 不保存和上一次相同的数据
        if code in self.__last_data and self.__last_data[code] == volumn:
            return False