Administrator
2022-09-20 5ae8b19fdc000fc719f3ad45fa5f7462fdbffbdf
l2数据计算优化
15个文件已修改
398 ■■■■■ 已修改文件
code_volumn_manager.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
global_util.py 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_code_operate.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 247 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_factor.py 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mongo_data.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
redis_manager.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tool.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_gui.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_volumn_manager.py
@@ -47,6 +47,20 @@
    return _volumn
# 保存今日量
def save_today_volumn(code, volumn, volumnUnit):
    _volumn = None
    if volumnUnit == 0:
        _volumn = round(float(volumn) * 100000000)
    elif volumnUnit == 1:
        _volumn = round(float(volumn) * 10000)
    elif volumnUnit == 2:
        _volumn = int(volumn)
    if _volumn is not None:
       set_today_volumn(code,_volumn*100)
# 将量从数据库加入内存
def load():
    redis = __redis_manager.getRedis()
global_util.py
@@ -1,9 +1,12 @@
# 代码行业映射
import pymongo
import code_volumn_manager
import ths_industry_util
import gpcode_manager
import mongo_data
TEST = False
code_industry_map = {}
# 行业代码映射
@@ -24,6 +27,12 @@
big_money_num = {}
# 涨停时间
limit_up_time = {}
def init():
    load_volumn()
    load_zyltgb()
    load_industry()
# 加载行业数据
@@ -50,6 +59,17 @@
                    zyltgb_map[code] = round(float(result["zyltgb"]) * 10000)
# 加载量
def load_volumn():
    codes = gpcode_manager.get_gp_list()
    for code in codes:
        max60, yesterday = code_volumn_manager.get_histry_volumn(code)
        today = code_volumn_manager.get_today_volumn(code)
        max60_volumn[code] = max60
        yesterday_volumn[code] = yesterday
        today_volumn[code] = today
# 添加今日涨停数据
def add_limit_up_codes(datas, clear=False):
    if clear:
gui.py
@@ -16,6 +16,7 @@
import multiprocessing
import l2_code_operate
import l2_trade_factor
import redis_manager
import mongo_data
import server
@@ -28,6 +29,8 @@
def createServer(pipe):
    print("create SocketServer")
    # 初始化参数
    global_util.init()
    laddr = "", 9001
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe=pipe)  # 注意:参数是MyBaseRequestHandle
    # tcpserver.handle_request()  # 只接受一个客户端连接
@@ -79,7 +82,6 @@
        def sync_target_codes():
            server.sync_target_codes_to_ths()
            print(result)
        def click():
            text.delete('1.0', END)
@@ -234,7 +236,6 @@
                time.sleep(1)
        def refresh_data():
            print("refresh-l2-data")
            for client_id in code_sv_map:
                ip = data_process.getActiveClientIP(client_id)
                ths_dead=data_process.getTHSState(client_id)
@@ -627,6 +628,9 @@
            showinfo("提示", "导出完成")
        def compute_m(code):
            m = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
            showinfo("提示", "{}".format(m))
        frame = Frame(root, {"height": 280, "width": 300, "bg": "#DDDDDD"})
        frame.grid(row=2, column=2, rowspan=2, pady=5)
@@ -663,6 +667,9 @@
        btn = Button(frame, text="导出L2原始数据", command=lambda: export_l2_data_origin(code.get()))
        btn.place(x=220, y=100)
        btn = Button(frame, text="获取m值", command=lambda: compute_m(code.get()))
        btn.place(x=10, y=120)
        # 交易按钮
        btn = Button(frame, textvariable=btntext, command=startJueJinGui)
        btn.place(x=10, y=150)
juejin.py
@@ -41,14 +41,20 @@
    return account_id, strategy_id, token
# 每日初始化
def everyday_init():
    codes = gpcode_manager.get_gp_list()
    logger_system.info("每日初始化")
def init_data():
    # 载入行业股票代码
    global_util.load_industry()
    # 载入代码自由流通市值
    global_util.load_zyltgb()
    # 载入量
    global_util.load_volumn()
# 每日初始化
def everyday_init():
    codes = gpcode_manager.get_gp_list()
    logger_system.info("每日初始化")
    # 今日实时涨停
    global_util.add_limit_up_codes([], True)
    # 主要获取收盘价
@@ -68,6 +74,8 @@
        big_money_num_manager.expire(code)
    # 清除涨停时间
    global_util.limit_up_time.clear()
    init_data()
def __run_schedule():
    while True:
@@ -80,8 +88,10 @@
    # 订阅浦发银行, bar频率为一天和一分钟
    # 订阅订阅多个频率的数据,可多次调用subscribe
    # 获取需要监听的股票
    init_data()
    logger_system.info("掘金初始化")
    schedule.every().day.at("09:00:00").do(everyday_init)
    schedule.every().day.at("14:37:00").do(everyday_init)
    t1 = threading.Thread(target=lambda: __run_schedule())
    # 后台运行
    t1.setDaemon(True)
@@ -163,8 +173,7 @@
    start2 = 60 * 60 * 12 + 50 * 60;
    end2 = 60 * 60 * 15 + 5 * 60;
    # TODO 测试
    test = False
    if (start1 < relative_timestamp < end1 or start2 < relative_timestamp < end2) or test:
    if (start1 < relative_timestamp < end1 or start2 < relative_timestamp < end2) or global_util.TEST:
        symbol = tick['symbol']
        price = tick['price']
        # print(symbol,price)
@@ -299,22 +308,21 @@
                            start_time="{:%Y-%m-%d}".format(start),
                            fields="symbol,volume,eob",
                            end_time="{:%Y-%m-%d}".format(end))
    print(len(results))
    _fresult = {}
    for result in results:
        code = result["symbol"].split(".")[1]
        volumn = int(result["volume"])
        day = "{:%Y-%m-%d}".format(result["eob"])
        if _fresult.get(code) is None:
            _fresult[code] = (volumn, volumn)
        if volumn > _fresult[code][0]:
            _fresult[code][0] = volumn;
        _fresult[code][1] = volumn;
            _fresult[code] = (volumn, _fresult[code][1])
        _fresult[code] = (_fresult[code][0], volumn)
    return _fresult
if __name__ == '__main__':
    _fresult = get_volumns(["000333", "002531"])
    print(_fresult)
    everyday_init()
l2_code_operate.py
@@ -69,7 +69,7 @@
        while True:
            try:
                data = redis.lpop("code_operate_queue")
                print("读取操作队列", data, redis.llen("code_operate_queue"))
                # print("读取操作队列", data, redis.llen("code_operate_queue"))
                if data is not None:
                    data = json.loads(data)
                    logger_code_operate.info("读取操作队列:{}", data)
l2_data_manager.py
@@ -1,6 +1,9 @@
import decimal
import json
import logging
import os
import random
import threading
import time as t
from datetime import datetime
@@ -15,7 +18,7 @@
import redis_manager
import tool
import trade_manager
from log import logger_l2_trade
from log import logger_l2_trade, logger_l2_trade_cancel
from trade_data_manager import TradeBuyDataManager
_redisManager = redis_manager.RedisManager(1)
@@ -67,21 +70,26 @@
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        if _data_json is None:
            return None, 0, None
            return None, None, None, 0
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2]
        return _data[0], _data[1], _data[2], _data[3]
    # 设置买入点的值
    # buy_single_index 买入信号位
    # buy_exec_index 买入执行位
    # compute_index 计算位置
    # nums 累计纯买额
    @staticmethod
    def set_buy_compute_start_data(code, nums, compute_index, buy_index):
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        if buy_index is not None:
            redis.setex(_key, expire, json.dumps((buy_index, nums, compute_index)))
        if buy_single_index is not None:
            redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums)))
        else:
            _buy_index, _nums, _compute_index = TradePointManager.get_buy_compute_start_data(code)
            redis.setex(_key, expire, json.dumps((_buy_index, nums, compute_index)))
            _buy_single_index, _buy_exec_index, _compute_index, _nums = TradePointManager.get_buy_compute_start_data(
                code)
            redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums)))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
@@ -161,12 +169,6 @@
            # 计算保留的时间
            expire = tool.get_expire()
            start_index = redis_instance.get("l2-maxindex-{}".format(code))
            if start_index is None:
                start_index = -1
            else:
                start_index = int(start_index)
            max_index = start_index
            i = 0
            for _data in datas:
                i += 1
@@ -174,20 +176,20 @@
                value = redis_instance.get(key)
                if value is None:
                    # 新增
                    max_index = start_index + i
                    value = {"index": start_index + i, "re": _data["re"]}
                    redis_instance.setex(key, expire, json.dumps(value))
                    try:
                        value = {"index": _data["index"], "re": _data["re"]}
                        redis_instance.setex(key, expire, json.dumps(value))
                    except:
                        logging.error("更正L2数据出错:{} key:{}".format(code, key))
                else:
                    json_value = json.loads(value)
                    if json_value["re"] != _data["re"]:
                        json_value["re"] = _data["re"]
                        redis_instance.setex(key, expire, json.dumps(json_value))
            redis_instance.setex("l2-maxindex-{}".format(code), expire, max_index)
    finally:
        redis_instance.delete("l2-save-{}".format(code))
    print("保存新数据用时:", msg, round(t.time() * 1000) - start_time)
    print("保存新数据用时:", msg, "耗时:{}".format(round(t.time() * 1000) - start_time))
    return datas
@@ -231,8 +233,8 @@
    @classmethod
    def is_same_time(cls, time1, time2):
        # TODO 测试
        # if 1 > 0:
        #     return True
        if global_util.TEST:
            return True
        time1_s = time1.split(":")
        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
        time2_s = time2.split(":")
@@ -251,10 +253,11 @@
        __latest_datas = local_latest_datas.get(code)
        if __latest_datas is not None and len(__latest_datas) > 0:
            last_key = __latest_datas[-1]["key"]
        count = 0
        start_index = -1
        # 如果原来没有数据
        # TODO 设置add_data的序号
        # 设置add_data的序号
        for n in reversed(datas):
            count += 1
            if n["key"] == last_key:
@@ -283,7 +286,8 @@
        save_list = []
        for data in _datas:
            for _ldata in latest_data:
                if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]:
                # 新数据条数比旧数据多才保存
                if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]:
                    max_re = max(_ldata["re"], data["re"])
                    _ldata["re"] = max_re
                    data["re"] = max_re
@@ -291,6 +295,7 @@
                    save_list.append(_ldata)
        if len(save_list) > 0:
            saveL2Data(code, save_list, "保存纠正数据")
            local_latest_datas[code] = latest_data
        return _datas
    # 处理l2数据
@@ -330,12 +335,13 @@
        return datas
    @classmethod
    def get_time_as_second(time_str):
    def get_time_as_second(cls, time_str):
        ts = time_str.split(":")
        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    # 是否是涨停价买
    def is_limit_up_price_buy(val):
    @classmethod
    def is_limit_up_price_buy(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
@@ -349,7 +355,8 @@
        return True
    # 是否涨停买撤
    def is_limit_up_price_buy_cancel(val):
    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
@@ -371,14 +378,31 @@
# L2交易数据处理器
# 一些常见的概念:
# 买入信号位置(出现下单信号的第一条数据的位置):buy_single_index
# 买入执行位置(符合下单信号的最后一条数据):buy_exec_index
# 计算位置(当前计算的整个计算的位置):compute_index
#
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    random_key = {}
    @classmethod
    def debug(cls, code, content, *args):
        logger_l2_trade.debug(("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    def cancel_debug(cls, code, content, *args):
        logger_l2_trade_cancel.debug(
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp):
        cls.random_key[code] = random.randint(0, 100000)
        now_time_str = datetime.now().strftime("%H:%M:%S")
        __start_time = round(t.time() * 1000)
        try:
@@ -393,7 +417,7 @@
                datas = L2DataUtil.correct_data(code, datas)
                _start_index = 0
                if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"]
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = L2DataUtil.get_add_data(code, datas, _start_index)
                if len(add_datas) > 0:
                    # 拼接数据
@@ -415,7 +439,7 @@
                        state = trade_manager.get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已挂单
                            cls.__process_order(code, len(total_datas) - len(add_datas) - 3)
                            cls.__process_order(code, len(total_datas) - len(add_datas) - 3, capture_timestamp)
                        else:
                            # 未挂单
                            cls.__process_not_order(code, add_datas, capture_timestamp)
@@ -451,20 +475,30 @@
            start_index = 0
        # 获取之前是否有记录的撤买信号
        cancel_index, buy_num_for_cancel, computed_index = cls.__has_order_cancel_begin_pos(code)
        buy_index, buy_num = cls.__get_order_begin_pos(code)
        buy_single_index, buy_exec_index, buy_compute_index, buy_num = cls.__get_order_begin_pos(code)
        if cancel_index is None:
            # 无撤单信号起始点记录
            cancel_index = cls.__compute_order_cancel_begin_single(code, start_index, 3)
            buy_num_for_cancel = 0
            computed_index = buy_index
            cancel_index = cls.__compute_order_cancel_begin_single(code, max(start_index - 3, 0), 3)
            buy_num_for_cancel = buy_num
            computed_index = buy_single_index
            if cancel_index is not None:
                cls.debug(code, "找到撤单信号,数据处理起始点:{} 数据:{}", start_index, local_today_datas[code][start_index])
        if cancel_index is not None:
            # 获取阈值 有买撤信号,统计撤买纯买额
            threshold_money = cls.__get_threshmoney(code)
            cls.__start_compute_cancel(code, cancel_index, computed_index, buy_num_for_cancel, threshold_money,
            cls.__start_compute_cancel(code, cancel_index, max(computed_index, buy_exec_index + 1), buy_num_for_cancel,
                                       threshold_money,
                                       capture_time)
        else:
            # 无买撤信号,终止执行
            pass
            # 无买撤信号,是否有虚拟下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入")
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
                pass
    # 开始计算撤的信号
    @classmethod
@@ -474,30 +508,27 @@
                                                                                           origin_num, threshold_money)
        total_datas = local_today_datas[code]
        if computed_index is not None:
            cls.debug(code, "获取到撤单执行信号,信号位置:{},m2:{} 数据:{}", computed_index, threshold_money,
                      total_datas[computed_index])
            # 发出撤买信号,需要撤买
            if cls.unreal_buy_dict.get(code) is not None:
                # 有虚拟下单
                cls.debug(code, "之前有虚拟下单,执行虚拟撤买")
                # 删除虚拟下单标记
                cls.unreal_buy_dict.pop(code)
                # 删除下单标记位置
                TradePointManager.delete_buy_point(code)
            else:
                # 无虚拟下单,需要执行撤单
                logger_l2_trade.info(
                    "执行撤销:{} - {}".format(code, json.dumps(total_datas[computed_index])))
                try:
                    trade_manager.start_cancel_buy(code)
                    # 取消买入标识
                    TradePointManager.delete_buy_point(code)
                    TradePointManager.delete_buy_cancel_point(code)
                except Exception as e:
                    pass
                cls.debug(code, "之前无虚拟下单,执行真实撤单")
                cls.__cancel_buy(code)
            if computed_index < len(local_today_datas[code]) - 1:
                # 数据尚未处理完,重新进入下单计算流程
                cls.__start_compute_buy(code, computed_index + 1, 0, threshold_money, capture_time)
                cls.__start_compute_buy(code, computed_index + 1, threshold_money, capture_time)
                pass
        else:
            cls.debug(code, "未获取到撤单执行信号,计算开始位置:{}, 纯买额:{}", compute_start_index, buy_num_for_cancel)
            # 无需撤买,记录撤买信号
            TradePointManager.set_buy_cancel_compute_start_data(code, buy_num_for_cancel, len(total_datas) - 1,
                                                                cancel_index)
@@ -506,6 +537,7 @@
            if unreal_buy_info is not None:
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                cls.debug(code, "无撤单执行信号,有虚拟下单,执行真实下单")
                cls.__buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]],
                          unreal_buy_info[0])
                pass
@@ -515,66 +547,85 @@
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        logger_l2_trade.info(
            "执行买入:{} ".format(code))
        cls.debug(code, "开始执行买入")
        try:
            trade_manager.start_buy(code, capture_timestamp, last_data,
                                    last_data_index)
            TradePointManager.delete_buy_cancel_point(code)
            cls.debug(code, "执行买入成功")
        except Exception as e:
            cls.debug(code, "执行买入异常:{}", str(e))
            pass
    @classmethod
    def __cancel_buy(cls, code):
        try:
            cls.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            # 取消买入标识
            TradePointManager.delete_buy_point(code)
            TradePointManager.delete_buy_cancel_point(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            cls.debug(code, "执行撤单异常:{}", str(e))
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, threshold_money, capture_time):
        total_datas = local_today_datas[code]
        # 获取买入信号计算起始位置
        index, num = cls.__get_order_begin_pos(code)
        # index, num, finish_index = cls.__get_order_begin_pos(code)
        buy_single_index, buy_exec_index, buy_compute_index, num = cls.__get_order_begin_pos(code)
        # 是否为新获取到的位置
        new_get_pos = False
        if index is None:
        if buy_single_index is None:
            # 有买入信号
            has_single, _index = cls.__compute_order_begin_pos(code, len(total_datas) - compute_start_index, 3)
            index = _index
            has_single, _index = cls.__compute_order_begin_pos(code, max(compute_start_index - 3, 0), 3)
            buy_single_index = _index
            if has_single:
                num = 0
                new_get_pos = True
                cls.debug(code, "获取到买入信号起始点:{}  数据:{}", buy_single_index, total_datas[buy_single_index])
        if index is None:
        if buy_single_index is None:
            # 未获取到买入信号,终止程序
            return None
        # 买入纯买额统计
        compute_index, buy_nums = cls.sum_buy_num_for_order(code, compute_start_index, num, threshold_money)
        compute_index, buy_nums = cls.__sum_buy_num_for_order(code, max(buy_single_index, compute_start_index), num,
                                                              threshold_money)
        if compute_index is not None:
            cls.debug(code, "获取到买入执行位置:{} m值:{} 数据:{}", compute_index, threshold_money, total_datas[compute_index])
            # 记录买入信号位置
            cls.__save_order_begin_data(code, compute_index, buy_nums, index)
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums)
            # 虚拟下单
            cls.unreal_buy_dict[code] = (compute_index, capture_time)
            # 删除之前的所有撤单信号
            TradePointManager.delete_buy_cancel_point(code)
            # 数据是否处理完毕
            if L2DataUtil.is_index_end(code, compute_index):
                cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                # 数据已经处理完毕,下单
                cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                # 数据尚未处理完毕,进行下一步处理
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                cls.__process_order(code, compute_index + 1, capture_time)
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
            cls.__save_order_begin_data(code, len(total_datas) - 1, buy_nums, index)
            cls.__save_order_begin_data(code, buy_single_index, -1, len(total_datas) - 1, buy_nums)
        pass
    # 获取下单起始信号
    @classmethod
    def __get_order_begin_pos(cls, code):
        index, num, compute_index = TradePointManager.get_buy_compute_start_data(code)
        return index, num
        buy_single_index, buy_exec_index, compute_index, num = TradePointManager.get_buy_compute_start_data(code)
        return buy_single_index, buy_exec_index, compute_index, num
    @classmethod
    def __save_order_begin_data(self, code, compute_index, num, buy_index=None):
        TradePointManager.set_buy_compute_start_data(code, num, compute_index, buy_index)
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num):
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num)
    # 获取撤单起始位置
    @classmethod
@@ -587,18 +638,13 @@
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
    def __compute_order_begin_pos(self, code, compute_data_count, continue_count):
    @classmethod
    def __compute_order_begin_pos(cls, code, start_index, continue_count):
        # 倒数100条数据查询
        datas = local_today_datas[code]
        __len = len(datas)
        if __len < continue_count:
            return None
        start_index = 0
        if compute_data_count > __len:
            compute_data_count = __len
        if __len > compute_data_count:
            start_index = __len - compute_data_count
        if len(datas) - start_index < continue_count:
            return False, None
        __time = None
        _limit_up_count_1s = 0
        _limit_up_count_1s_start_index = -1
@@ -634,7 +680,7 @@
                #             index_3 = j
                if index_1 - index_0 == 1 and index_2 - index_1 == 1:  # and index_3 - index_2 == 1
                    logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i]))
                    return i
                    return True, i
            # 同1s内有不连续的4个涨停买(如果遇买撤就重新计算,中间可间隔不涨停买)标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
@@ -656,9 +702,9 @@
            if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1:
                logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i]))
                return _limit_up_count_1s_start_index
                return True, _limit_up_count_1s_start_index
        return None
        return False, None
    # 是否有撤销信号
    @classmethod
@@ -701,8 +747,9 @@
    def __unreal_order(self):
        pass
    @classmethod
    def __get_threshmoney(cls, code):
        l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
    # 获取预估挂买位
    @classmethod
@@ -760,6 +807,7 @@
        same_time_property = cls.__get_same_time_property(code)
        # 同一秒,在预估买入位之后的数据之和
        property_buy_num_count = 0
        cls.cancel_debug(code, "撤单纯买额计算位置:{}-{} 预估挂买位:{}", start_index, len(total_datas) - 1, sure_pos)
        for i in range(start_index, len(total_datas)):
            data = total_datas[i]
            _val = data["val"]
@@ -774,22 +822,40 @@
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停撤买
                # 判断买入位置是否在买入信号之前
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, total_datas)
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index < sure_pos:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                    elif sure_data["val"]["time"] == _val["time"]:
                        # 同一秒,而且还在预估买入位之后按概率计算
                        property_buy_num_count -= int(_val["num"]) * int(data["re"])
                        cls.cancel_debug(code, "{}数据在预估买入位之前 撤买纯买额:{}", i, buy_nums * limit_up_price)
                    else:
                        cls.cancel_debug(code, "{}数据在预估买入位之后,买入位:{}", i, buy_index)
                        if sure_data["val"]["time"] == buy_data["val"]["time"]:
                            # 同一秒,而且还在预估买入位之后按概率计算
                            property_buy_num_count -= int(_val["num"]) * int(data["re"])
                            cls.debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # TODO 未找到买撤数据的买入点
                    pass
                    # 未找到买撤数据的买入点
                    cls.cancel_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
            property_buy_num = round(property_buy_num_count * same_time_property)
            cls.cancel_debug(code, "预估买入点之后同一秒买入手数-{},位置-{},总手数:{}", property_buy_num, i, buy_nums + property_buy_num)
            if buy_nums + property_buy_num <= threshold_num:
                return i, buy_nums + property_buy_num, sure_type
        return None, buy_nums + round(property_buy_num_count * same_time_property), sure_type
    @classmethod
    def test(cls):
        code = "002336"
        cls.random_key[code] = random.randint(0, 100000)
        load_l2_data(code)
        try:
            # cls.__sum_buy_num_for_cancel_order(code, 112, 100000, 10000000)
            has_single, _index = cls.__compute_order_begin_pos(code, max(9, 0), 3)
            print(has_single, _index)
        except Exception as e:
            logging.exception(e)
def __get_time_second(time_str):
@@ -1037,25 +1103,4 @@
if __name__ == "__main__":
    code = "000868"
    local_today_datas.setdefault(code, [])
    path = "C:/Users/Administrator/Desktop/demo/000868/"
    for file_name in os.listdir(path):
        p = "{}{}".format(path, file_name)
        f = open(p)
        for line in f.readlines():  # 依次读取每行
            line = line.strip()
            data = json.loads(line)
            result = L2DataUtil.format_l2_data(data, code, 10.00)
            add_datas = L2DataUtil.get_add_data(code, result)
            print("增加的数量:", len(add_datas))
            if len(add_datas) > 0:
                # 拼接数据
                local_today_datas[code].extend(add_datas)
            if code in local_latest_datas:
                local_latest_datas[code] = result
            else:
                local_latest_datas.setdefault(code, result)
        f.close()
    for d in local_today_datas[code]:
        print(d["val"]["time"], d["val"]["num"], d["val"]["operateType"], d["re"])
    L2TradeDataProcessor.test()
l2_data_util.py
@@ -81,13 +81,13 @@
# 根据买撤数据(与今日总的数据)计算买入数据
def get_buy_data_with_cancel_data(cancel_data):
def get_buy_data_with_cancel_data(cancel_data,local_today_num_operate_map):
    # 计算时间区间
    min_space, max_space = __compute_time_space_as_second(cancel_data["val"]["cancelTime"],
                                                          cancel_data["val"]["cancelTimeUnit"])
    max_time = __sub_time(cancel_data["val"]["time"], min_space)
    min_time = __sub_time(cancel_data["val"]["time"], max_space)
    buy_datas = l2_data_manager.local_today_num_operate_map.get("{}-{}".format(cancel_data["val"]["num"], "0"))
    buy_datas = local_today_num_operate_map.get("{}-{}".format(cancel_data["val"]["num"], "0"))
    if buy_datas is None:
        # 无数据
        return None, None
l2_trade_factor.py
@@ -96,24 +96,29 @@
        # 自由流通股本影响比例
        zyltgb_rate = cls.get_zylt_rate(zyltgb)
        # 行业涨停影响比例
        industry_rate=0
        industry_rate = 0
        if total_industry_limit_percent is not None:
            industry_rate = cls.get_industry_rate(total_industry_limit_percent)
        # 量影响比例
        volumn_rate = 0
        if volumn_day60_max is not None and volumn_yest is not None and volumn_today is not None:
            volumn_rate = cls.get_volumn_rate(volumn_day60_max, volumn_yest, volumn_today)
            volumn_rate = cls.get_volumn_rate(int(volumn_day60_max), int(volumn_yest), int(volumn_today))
        # 涨停时间影响比例
        limit_up_time_rate=0
        limit_up_time_rate = 0
        if limit_up_time is not None:
            limit_up_time_rate = cls.get_limit_up_time_rate(limit_up_time)
        # 万手哥影响
        big_money_rate = 0
        if big_money_num is not None:
            big_money_rate = cls.get_big_money_rate(big_money_num)
        print("zyltgb_rate:{} industry_rate:{} volumn_rate:{} limit_up_time_rate:{} big_money_rate:{}",zyltgb_rate,industry_rate,volumn_rate,limit_up_time_rate,big_money_rate)
        print(
            "zyltgb_rate:{} industry_rate:{} volumn_rate:{} limit_up_time_rate:{} big_money_rate:{}".format(zyltgb_rate,
                                                                                                            industry_rate,
                                                                                                            volumn_rate,
                                                                                                            limit_up_time_rate,
                                                                                                            big_money_rate))
        return 1 - (zyltgb_rate + industry_rate + volumn_rate + limit_up_time_rate + big_money_rate);
        return round(1 - (zyltgb_rate + industry_rate + volumn_rate + limit_up_time_rate + big_money_rate), 4)
    @classmethod
    def compute_rate_by_code(cls, code):
@@ -132,9 +137,10 @@
        if zyltgb is None:
            print("没有获取到自由流通市值")
            return 10000000
        zyltgb = cls.get_base_safe_val(zyltgb)
        rate = cls.compute_rate_by_code(code)
        print("m值获取:",code,round(zyltgb*rate))
        return round(zyltgb*rate)
        print("m值获取:", code, round(zyltgb * rate))
        return round(zyltgb * rate)
# l2因子归因数据
log.py
@@ -22,6 +22,10 @@
logger.add(get_path("l2", "l2_trade"), filter=lambda record: record["extra"].get("name") == "l2_trade",
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("l2", "l2_trade_cancel"), filter=lambda record: record["extra"].get("name") == "l2_trade_cancel",
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("l2", "l2_big_data"), filter=lambda record: record["extra"].get("name") == "l2_big_data",
           rotation="00:00", compression="zip", enqueue=True)
@@ -43,6 +47,9 @@
logger_l2_error = logger.bind(name="l2_error")
logger_l2_process = logger.bind(name="l2_process")
logger_l2_trade = logger.bind(name="l2_trade")
logger_l2_trade_cancel = logger.bind(name="l2_trade_cancel")
logger_l2_big_data = logger.bind(name="l2_big_data")
logger_juejin_tick = logger.bind(name="juejin_tick")
logger_code_operate = logger.bind(name="code_operate")
mongo_data.py
@@ -47,7 +47,7 @@
        db = _getdb()
        collections = db[dn_name]
        result = collections.count_documents(where_dict)
        print(result)
        # print(result)
        return result
    except:
        pass
redis_manager.py
@@ -9,9 +9,22 @@
class RedisManager:
    def __init__(self,db=config["db"]):
    def __init__(self, db=config["db"]):
        self.pool = redis.ConnectionPool(host=config["host"], port=config["port"], password=config["pwd"],
                                         db=db, decode_responses=True)
    def getRedis(self):
        return redis.Redis(connection_pool=self.pool)
if __name__ == "__main__":
    _redisManager = RedisManager(1)
    redis = _redisManager.getRedis()
    keys = redis.keys("l2-*")
    for k in keys:
        redis.delete(k)
    keys = redis.keys("l2-data-latest-*")
    for k in keys:
        redis.delete(k)
    pass
server.py
@@ -5,6 +5,7 @@
import threading
import time
import code_volumn_manager
import data_process
import global_util
import gpcode_manager
@@ -212,6 +213,10 @@
                    if data is not None:
                        print("现价数量", len(data))
                        for item in data:
                            volumn = item["volumn"]
                            volumnUnit = item["volumnUnit"]
                            code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                            juejin.accpt_price(item["code"], float(item["price"]))
                elif type == 30:
tool.py
@@ -3,6 +3,8 @@
import time as t
import datetime
import global_util
def get_expire():
    now = int(t.time())
@@ -27,6 +29,10 @@
# 是否为交易时间
def is_trade_time():
    # 测试
    if global_util.TEST:
        return True
    relative_timestamp = t.time() % (24 * 60 * 60) + 8 * 60 * 60
    start1 = 60 * 60 * 9 + 24 * 60;
    end1 = 60 * 60 * 11 + 35 * 60;
trade_gui.py
@@ -5,6 +5,8 @@
import win32gui
import win32api
import win32con
import global_util
from log import *
from threading import Thread
@@ -234,10 +236,11 @@
            # 验证涨停价
            limit_up_price_now = self.getLimitUpPrice(win)
            # TODO 测试,暂时不验证涨停价
            if abs(float(limit_up_price_now) - float(limit_up_price)) >= 0.01:
                error = "涨停价验证出错 {}-{}".format(limit_up_price, limit_up_price_now)
                raise Exception(error)
            # 测试,暂时不验证涨停价
            if not global_util.TEST:
                if abs(float(limit_up_price_now) - float(limit_up_price)) >= 0.01:
                    error = "涨停价验证出错 {}-{}".format(limit_up_price, limit_up_price_now)
                    raise Exception(error)
            # 开始交易,买入按钮ID:0x000003EE
            # buy_hwnd = win32gui.GetDlgItem(win, 0x000003EE)
trade_manager.py
@@ -101,7 +101,7 @@
        data["day"] = day
        data["create_time"] = int(round(t.time() * 1000))
        count = mongo_data.count("ths-trade-delegate-record", {"_id": data["_id"]})
        if count < 1:
        if count is None or count < 1:
            mongo_data.save_one("ths-trade-delegate-record", data)
    # 保存最新的委托数据
    redis = __redis_manager.getRedis()
@@ -246,7 +246,7 @@
# 取消委托成功
def __cancel_success(code):
    TradeBuyDataManager.remove_buy_capture_time(code)
    TradeBuyDataManager.remove_buy_position_info(code)
    # 下单成功,加入固定代码库
    l2_data_manager.remove_from_l2_fixed_codes(code)
    logger_trade.info("{}撤单成功".format(code))