Administrator
2022-09-08 e7f8c6013d777dd5ba10b8d548d2d3db6158d37a
'完善'
21个文件已修改
7个文件已添加
433 ■■■■ 已修改文件
__pycache__/data_export_util.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/data_process.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/global_util.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/gpcode_manager.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/juejin.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/l2_code_operate.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/l2_data_manager.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/l2_data_util.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/log.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/mongo_data.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/server.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/ths_industry_util.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/tool.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/trade_gui.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/trade_manager.cpython-37.pyc 补丁 | 查看 | 原始文档 | blame | 历史
data_process.py 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
global_util.py 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gpcode_manager.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 49 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_code_operate.py 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 106 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mongo_data.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths_industry_util.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
__pycache__/data_export_util.cpython-37.pyc
Binary files differ
__pycache__/data_process.cpython-37.pyc
Binary files differ
__pycache__/global_util.cpython-37.pyc
Binary files differ
__pycache__/gpcode_manager.cpython-37.pyc
Binary files differ
__pycache__/juejin.cpython-37.pyc
Binary files differ
__pycache__/l2_code_operate.cpython-37.pyc
Binary files differ
__pycache__/l2_data_manager.cpython-37.pyc
Binary files differ
__pycache__/l2_data_util.cpython-37.pyc
Binary files differ
__pycache__/log.cpython-37.pyc
Binary files differ
__pycache__/mongo_data.cpython-37.pyc
Binary files differ
__pycache__/server.cpython-37.pyc
Binary files differ
__pycache__/ths_industry_util.cpython-37.pyc
Binary files differ
__pycache__/tool.cpython-37.pyc
Binary files differ
__pycache__/trade_gui.cpython-37.pyc
Binary files differ
__pycache__/trade_manager.cpython-37.pyc
Binary files differ
data_process.py
@@ -1,9 +1,12 @@
# 数据处理
import decimal
import json
import logging
import time as t
import random
import datetime
import authority
import mysql
import redis_manager
import gpcode_manager
@@ -49,8 +52,12 @@
def parseType(str):
    try:
    dict = json.loads(str)
    return dict["type"]
    except Exception as e:
        logging.exception(e)
        return -1
def parseGPCode(str):
@@ -164,14 +171,16 @@
    redis.setex("client-active-{}".format(client_id), 10, host)
def getValidClients():
def getValidL2Clients():
    redis = __redisManager.getRedis();
    keys = redis.keys("client-active-*")
    client_ids = []
    for k in keys:
        _id = k.split("client-active-")[1]
        client_ids.append(_id)
    return client_ids
    l2_clients = authority.get_l2_clients()
    return list(set(client_ids).intersection(set(l2_clients)))
def getActiveClientIP(client_id):
global_util.py
New file
@@ -0,0 +1,50 @@
# 代码行业映射
import pymongo
import ths_industry_util
import gpcode_manager
import mongo_data
# Stock code -> industry name (refreshed by load_industry)
code_industry_map = {}
# Industry name -> set of stock codes (refreshed by load_industry)
industry_codes_map = {}
# Stock code -> free-float record read from the "ths-zylt" collection (filled by load_zyltgb)
zyltgb_map = {}
# Stock code -> today's limit-up record (filled by add_limit_up_codes)
today_limit_up_codes = {}
# Load industry data
def load_industry():
    """Refresh both module-level industry maps in place.

    The dict objects themselves are kept (cleared, then refilled) so that
    other modules holding a reference to them observe the new content.
    """
    fresh_code_map, fresh_industry_map = ths_industry_util.get_code_industry_maps()
    refresh_pairs = (
        (code_industry_map, fresh_code_map),
        (industry_codes_map, fresh_industry_map),
    )
    for target, fresh in refresh_pairs:
        target.clear()
        target.update(fresh)
# Load the free-float share capital of the target codes
def load_zyltgb():
    """Fill ``zyltgb_map`` with the first "ths-zylt" document found for each code.

    Codes come from ``gpcode_manager.get_gp_list()``. A code whose lookup
    returns ``None`` or no documents is left untouched in the map.
    """
    for code in gpcode_manager.get_gp_list():
        cursor = mongo_data.find("ths-zylt", {"_id": code})
        if cursor is None:
            continue
        documents = list(cursor)
        if documents:
            zyltgb_map[code] = documents[0]
# Add today's limit-up data
def add_limit_up_codes(datas, clear=False):
    """Merge limit-up records into ``today_limit_up_codes``, keyed by ``data["code"]``.

    :param datas: iterable of records; each must provide a "code" key
    :param clear: when true, empty the map before merging
    """
    if clear:
        today_limit_up_codes.clear()
    # Build the complete mapping first so a malformed record leaves the map unmerged.
    records_by_code = {record["code"]: record for record in datas}
    today_limit_up_codes.update(records_by_code)
# Manual check: running this module directly loads the industry maps once.
if __name__ == "__main__":
    load_industry()
gpcode_manager.py
@@ -1,13 +1,16 @@
import json
import random
import time
import authority
import global_util
import redis_manager
import tool
import juejin
import data_process
import decimal
__redisManager = redis_manager.RedisManager()
__redisManager = redis_manager.RedisManager(0)
def set_gp_list(gpset):
@@ -30,6 +33,8 @@
# Limit-up (涨停) list
def set_limit_up_list(gpset):
    # 保存到内存中
    global_util.add_limit_up_codes(gpset)
    # 获取基本信息
    redis_instance = __redisManager.getRedis()
    # 删除之前的
@@ -84,7 +89,7 @@
# 设置收盘价
def set_price_pre(code, price):
    codes= get_listen_codes()
    codes= get_gp_list()
    if code not in codes:
        return
    redis_instance = __redisManager.getRedis()
@@ -162,14 +167,15 @@
def get_can_listen_pos(client_id=0):
    client_ids = []
    if client_id <= 0:
        client_ids = data_process.getValidClients()
        client_ids = data_process.getValidL2Clients()
    else:
        client_ids.append(client_id)
    random.shuffle(client_ids)
    for client_id in client_ids:
        redis_instance = __redisManager.getRedis()
        k = "listen_code-{}-*".format(client_id)
        keys = redis_instance.keys(k)
        random.shuffle(keys)
        codes = []
        for key in keys:
            result = redis_instance.get(key)
@@ -210,8 +216,9 @@
# 监听是否满了
def is_listen_full():
    clients = data_process.getValidL2Clients()
    codes = get_listen_codes()
    return len(codes) >= 8
    return len(codes) >= 8*len(clients)
# 是否正在操作
gui.py
@@ -159,14 +159,14 @@
            juejin.re_set_price_pres(gpcode_manager.get_gp_list())
        def get_limit_up_codes_win():
            width = 400
            width = 500
            height = 800
            win = Tk()
            win.title("今日涨停")
            win.resizable(height=False, width=False)
            limit_up_datas = {}
            limit_up_datas["row{}".format(0)] = {'代码': '', '首次涨停时间': '', '现价': '', '涨停封单额': ''}
            limit_up_datas["row{}".format(0)] = {'代码': '', '首次涨停时间': '', '现价': '','涨幅':'', '涨停封单额': ''}
            cl = Label(win, text="更新时间:", bg="#DDDDDD", fg="#666666")
            cl.place(x=10, y=10)
@@ -199,8 +199,9 @@
                table_limit_up.model.setValueAt(data["code"], index, 0)
                table_limit_up.model.setValueAt(data["time"], index, 1)
                table_limit_up.model.setValueAt(float(data["price"]), index, 2)
                table_limit_up.model.setValueAt(float(data["limitUpPercent"]), index, 3)
                table_limit_up.model.setValueAt(
                    "{}{}".format(float(data["limitMoney"]), ("亿" if data["limitMoneyUnit"] == 0 else "万")), index, 3)
                    "{}{}".format(float(data["limitMoney"]), ("亿" if data["limitMoneyUnit"] == 0 else "万")), index, 4)
                index += 1
            table_limit_up.redraw()
@@ -649,13 +650,14 @@
        btn.place(x=10, y=100)
        btn = Button(frame, text="修复L2数据", command=lambda: L2CodeOperate.get_instance().repaire_l2_data(code.get()))
        btn.place(x=100, y=100)
        btn.place(x=70
                  , y=100)
        btn = Button(frame, text="导出L2数据", command=lambda: export_l2_data(code.get()))
        btn.place(x=200, y=100)
        btn.place(x=145, y=100)
        btn = Button(frame, text="导出L2原始数据", command=lambda: export_l2_data_origin(code.get()))
        btn.place(x=260, y=100)
        btn.place(x=220, y=100)
        # 交易按钮
        btn = Button(frame, textvariable=btntext, command=startJueJinGui)
juejin.py
@@ -3,17 +3,22 @@
import datetime
import json
import time as t
import schedule
import gm.api as gmapi
import global_util
import gpcode_manager
import threading
import server
import tool
import redis_manager
import authority
import decimal
from l2_code_operate import L2CodeOperate
from log import logger_juejin_tick
from log import logger_juejin_tick, logger_system
redisManager = redis_manager.RedisManager()
@@ -34,13 +39,37 @@
    return account_id, strategy_id, token
# Daily initialisation
def everyday_init():
    """Reset and reload the per-day in-memory state.

    Registered in ``init`` as a ``schedule`` job that runs every day at 09:00:00.
    """
    logger_system.info("每日初始化")
    # Load the industry <-> stock code maps
    global_util.load_industry()
    # Load the free-float data for the codes
    global_util.load_zyltgb()
    # Reset today's live limit-up map (empty list, clear=True)
    global_util.add_limit_up_codes([], True)
    # Mainly to fetch the closing prices (per the original author's note)
    get_latest_info(None)
def __run_schedule():
    """Poll the ``schedule`` job queue forever; meant to be a daemon-thread target.

    The loop sleeps one second between polls. Without the pause it would spin
    at full speed and keep a CPU core busy, while the registered jobs only
    need second-level resolution.
    """
    import time  # local import: keeps this block independent of module-level aliases

    while True:
        schedule.run_pending()
        time.sleep(1)
def init(context):
    # gmapi.subscribe(symbols="SZSE.002529", frequency="1d", count=30)
    # 订阅浦发银行, bar频率为一天和一分钟
    # 订阅订阅多个频率的数据,可多次调用subscribe
    # 获取需要监听的股票
    print("掘金初始化")
    logger_system.info("掘金初始化")
    schedule.every().day.at("09:00:00").do(everyday_init)
    t1 = threading.Thread(target=lambda: __run_schedule())
    # 后台运行
    t1.setDaemon(True)
    t1.start()
    # 多个时间点获取收盘价
    gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='08:30:00')
    gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='08:50:00')
@@ -190,8 +219,6 @@
                print(str(e))
class JueJinManager:
    def __init__(self, pipe):
        self.pipe = pipe
@@ -241,6 +268,7 @@
    print(result)
# 获取近90天的最大量与最近的量
def get_volumns(codes):
    end = datetime.datetime.now()
    # 获取近90天的历史数据
@@ -254,6 +282,7 @@
                            start_time="{:%Y-%m-%d}".format(start),
                            fields="symbol,volume,eob",
                            end_time="{:%Y-%m-%d}".format(end))
    print(len(results))
    _fresult = {}
    for result in results:
@@ -270,13 +299,5 @@
if __name__ == '__main__':
    # trade("SZSE.000521", 100)
    # print("")
    # JueJinManager.get_gp_latest_info(["000592","002808"])
    get_current_info()
    # data_process.saveCodeVolumn(get_volumns(["000333","002911"]))
    # _redis_manager = redis_manager.RedisManager()
    # redis = _redis_manager.getRedis()
    # keys = redis.keys("test-inrec")
    # print(keys)
    _fresult=get_volumns(["000333","002531"])
    print(_fresult)
l2_code_operate.py
@@ -5,6 +5,7 @@
import queue
import threading
import data_process
import gpcode_manager
import l2_data_manager
@@ -105,20 +106,30 @@
                        L2CodeOperate.setGPCode(client_id, pos, code)
                    # 修复l2的数据错误
                    elif type == 3:
                        if tool.is_trade_time():
                        client = data["client"]
                        data=data["data"]
                        result = server.send_msg(client, data)
                        print("L2數據修復結果:",result)
                        else:
                            print("非交易时间,放弃修复L2")
                    elif type == 4:
                        # 清理监听位置
                        client = data["client"]
                        pos = data["pos"]
                        L2CodeOperate.setGPCode(client, pos, "")
                else:
                    time.sleep(1)
            except Exception as e:
                logging.exception(e)
                print("发送操作异常:",str(e))
    def add_operate(self, type, code):
    def add_operate(self, type, code, client=None, pos=None):
        redis = self.redis_manager_.getRedis()
        print("add_operate", type, code)
        redis.rpush("code_operate_queue", json.dumps({"type": type, "code": code}))
        redis.rpush("code_operate_queue", json.dumps({"type": type, "code": code, "client": client, "pos": pos}))
    def repaire_operate(self, client, pos, code):
        # 如果本来该位置代码为空则不用修复
@@ -160,3 +171,41 @@
        if value is not None:
            return int(value)
        return value
# Fetch the codes a client is currently listening to
def get_listen_codes_from_client(client_id):
    """Ask a client for its L2 listening slots.

    Sends ``{"action": "getL2Codes"}`` through ``server.send_msg`` and decodes
    the JSON reply.

    :param client_id: id of the client to query
    :return: dict mapping each reported slot ``index`` to its ``code``
    :raises Exception: when the reply's "code" field is not 0
    """
    reply = json.loads(server.send_msg(client_id, {"action": "getL2Codes"}))
    if reply["code"] != 0:
        raise Exception("获取客户端监听代码出错")
    # The "data" field is itself a JSON string and needs a second decode.
    payload = json.loads(reply["data"])
    return {slot["index"]: slot["code"] for slot in payload["data"]}
# Reconcile client listening slots with the server-side records
def correct_client_codes():
    """Make every valid L2 client listen to the codes recorded on the server.

    For slots 0-7 of each client:
    * if the server has a code and the client reports a different one (or
      none), a repair operation is issued;
    * if the server has no code but the client still reports one, a clear
      operation (type 4) is queued.

    The second case was previously written as ``A or B and C``, which Python
    parses as ``A or (B and C)``. A ``None`` server-side code therefore queued
    a clear operation even when the client slot was already empty. The
    condition is now ``(A or B) and C``.

    Errors for one client are logged and do not stop the other clients.
    """
    client_ids = data_process.getValidL2Clients()
    for client_id in client_ids:
        try:
            index_codes = get_listen_codes_from_client(client_id)
            for index in range(0, 8):
                code = gpcode_manager.get_listen_code_by_pos(client_id, index)
                client_code = index_codes.get(index)
                if code:
                    if client_code != code:
                        # Client slot disagrees with the server record: repair it
                        L2CodeOperate().repaire_operate(client_id, index, code)
                elif client_code is not None:
                    # Server has nothing for this slot but the client does: clear it
                    L2CodeOperate().add_operate(4, "", client_id, index)
        except Exception as e:
            logger_code_operate.error("client:{} msg:{}".format(client_id, str(e)))
# Manual check: run one reconciliation pass over all valid L2 clients.
if __name__ == "__main__":
    correct_client_codes()
l2_data_manager.py
@@ -4,6 +4,7 @@
from datetime import datetime
import data_process
import l2_data_util
import mysql
import gpcode_manager
@@ -19,6 +20,9 @@
local_latest_datas = {}
# 本地今日数据
local_today_datas = {}
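# Temporary per-code map keyed by "<num>-<operateType>" (lot count + operation type)
# Speeds up data processing by trading memory for time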
local_today_num_operate_map = {}
class L2DataException(Exception):
@@ -125,26 +129,15 @@
        datas = []
        keys = redis.keys("l2-{}-*".format(code))
        for k in keys:
            key = k.replace("l2-", "")
            split_data = key.split("-")
            code = split_data[0]
            operateType = split_data[1]
            time = split_data[2]
            num = split_data[3]
            price = split_data[4]
            limitPrice = split_data[5]
            cancelTime = split_data[6]
            cancelTimeUnit = split_data[7]
            item = {"operateType": operateType, "time": time, "num": num, "price": price, "limitPrice": limitPrice,
                    "cancelTime": cancelTime, "cancelTimeUnit": cancelTimeUnit}
            value = redis.get(k)
            json_value = json.loads(value)
            _data = {"key": key, "val": item, "re": json_value["re"], "index": int(json_value["index"])}
            _data = l2_data_util.l2_data_key_2_obj(k, value)
            datas.append(_data)
        # 排序
        new_datas = sorted(datas,
                           key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
        local_today_datas.setdefault(code, new_datas)
        local_today_datas[code] = new_datas
    # 根据今日数据加载
    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
def saveL2Data(code, datas):
@@ -233,6 +226,10 @@
            dataIndexs.setdefault(key, len(datas) - 1)
    for key in same_time_num:
        if same_time_num[key] > 50:
            # 只能保存近3s的数据
            ts1 = l2_data_util.get_time_as_seconds(datas[-1]["val"]["time"])
            ts_now = l2_data_util.get_time_as_seconds(datetime.now().strftime("%H:%M:%S"))
            if abs(ts1 - ts_now) <= 3:
            # TODO 保存数据
            redis = _redisManager.getRedis()
            redis.set("big_data-{}-{}".format(code, int(round(t.time() * 1000))), str)
@@ -317,7 +314,8 @@
        if len(datas) > 0:
            # 判断价格区间是否正确
            if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配")
                raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配 code-{} price-{}".format(code,datas[0]["val"]["price"]))
            # 加载历史数据
            load_l2_data(code)
            # 纠正数据
@@ -326,6 +324,8 @@
            if len(add_datas) > 0:
                # 拼接数据
                local_today_datas[code].extend(add_datas)
                l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
                latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                # 时间差不能太大才能处理
                if __is_same_time(now_time_str, latest_time):
@@ -697,8 +697,17 @@
if __name__ == "__main__":
    print("big_data-{}-{}".format("123", int(round(t.time() * 1000))))
    pass
    # 删除大数据
    redis = redis_manager.RedisManager(1).getRedis()
    keys = redis.keys("big_data*")
    for key in keys:
        redis.delete(key)
    # print("big_data-{}-{}".format("123", int(round(t.time() * 1000))))
    # load_l2_data("002868")
    # keys= local_today_num_operate_map["002868"]
    # for k in keys:
    #     print(len( local_today_num_operate_map["002868"][k]))
    # pass
    # __set_buy_compute_start_data("000000", 100, 1)
    # __set_buy_compute_start_data("000000", 100)
    # __set_l2_data_latest_count("000333", 20)
l2_data_util.py
New file
@@ -0,0 +1,106 @@
# l2数据工具
# 比较时间的大小
import json
def compare_time(time1, time2):
    """Compare two colon-separated time strings.

    Each argument has its first two ":" removed and is read as an integer
    (e.g. "09:30:15" -> 93015); the difference of the two integers is returned.

    :param time1: time string such as "HH:MM:SS"
    :param time2: time string such as "HH:MM:SS"
    :return: for zero-padded inputs, > 0 if time1 is later, < 0 if earlier, 0 if equal
    """
    first, second = (int(stamp.replace(":", "", 2)) for stamp in (time1, time2))
    return first - second
# Convert a stored key/value pair into an L2 data object
def l2_data_key_2_obj(k, value):
    """Build the in-memory L2 record from its storage key and JSON value.

    After "l2-" is removed, the key is split on "-" into
    code, operateType, time, num, price, limitPrice, cancelTime, cancelTimeUnit.

    :param k: storage key, e.g. "l2-<code>-<operateType>-<time>-..."
    :param value: JSON text containing at least "re" and "index"
    :return: dict with "key" (k without "l2-"), "val" (the parsed fields,
             excluding the code), "re" and integer "index"
    """
    stripped_key = k.replace("l2-", "")
    parts = stripped_key.split("-")
    # parts[0] is the stock code; the remaining fields follow in this fixed order.
    field_names = ("operateType", "time", "num", "price", "limitPrice",
                   "cancelTime", "cancelTimeUnit")
    item = {name: parts[position] for position, name in enumerate(field_names, start=1)}
    stored = json.loads(value)
    return {
        "key": stripped_key,
        "val": item,
        "re": stored["re"],
        "index": int(stored["index"]),
    }
# Group records by "<num>-<operateType>"
def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False):
    """Append records to the per-code index keyed by "<num>-<operateType>".

    :param local_today_num_operate_map: dict of code -> {key -> list of records};
        mutated in place
    :param code: stock code whose index is updated
    :param source_datas: iterable of records exposing ``data["val"]["num"]`` and
        ``data["val"]["operateType"]``; ``None`` is treated as empty, because
        callers may pass the result of a ``dict.get`` lookup that found nothing
    :param clear: when true, discard the existing index for ``code`` first
    """
    if clear or local_today_num_operate_map.get(code) is None:
        local_today_num_operate_map[code] = {}
    grouped = local_today_num_operate_map[code]
    for data in source_datas or ():
        key = "{}-{}".format(data["val"]["num"], data["val"]["operateType"])
        grouped.setdefault(key, []).append(data)
# Subtract seconds from a time string
def __sub_time(time_str, seconds):
    """Return ``time_str`` moved back by ``seconds``, formatted as "HH:MM:SS".

    :param time_str: time string accepted by ``get_time_as_seconds``
    :param seconds: number of seconds to subtract
    :return: time string with each component right-aligned and zero-padded to width 2
    """
    remaining = get_time_as_seconds(time_str) - seconds
    hours, within_hour = divmod(remaining, 3600)
    minutes = within_hour // 60
    secs = remaining % 60
    return "{0:0>2}:{1:0>2}:{2:0>2}".format(hours, minutes, secs)
def get_time_as_seconds(time_str):
    """Convert an "H:M:S" string into the number of seconds since 00:00:00.

    Only the first three ":"-separated fields are used.

    :param time_str: time string such as "09:30:15"
    :return: hours * 3600 + minutes * 60 + seconds, as an int
    """
    fields = time_str.split(":")
    weights = (3600, 60, 1)
    return sum(int(fields[position]) * weight for position, weight in enumerate(weights))
# Compute the time interval
def __compute_time_space_as_second(cancel_time, cancel_time_unit):
    """Translate a cancel delay into a (min, max) range in seconds.

    Units: 0 = seconds, 1 = minutes, 2 = hours. A delay of N units maps to
    the range from N units to N + 1 units, expressed in seconds.

    :param cancel_time: delay value (int or numeric string)
    :param cancel_time_unit: unit code (int or numeric string)
    :return: ``(0, 0)`` when the delay is 0; ``(min_seconds, max_seconds)`` for
             a known unit; ``None`` for any other unit
    """
    amount = int(cancel_time)
    if amount == 0:
        return 0, 0
    seconds_per_unit = {0: 1, 1: 60, 2: 3600}.get(int(cancel_time_unit))
    if seconds_per_unit is None:
        # Unknown unit: no interval is produced.
        return None
    return amount * seconds_per_unit, (amount + 1) * seconds_per_unit
# Find the buy record that a buy-cancel record refers to
def get_buy_data_with_cancel_data(cancel_data, today_datas):
    """Locate the buy order matching a cancel record.

    The cancel record's delay (cancelTime / cancelTimeUnit) gives a window
    before its own timestamp. The first record in ``today_datas`` with
    operateType 0, the same ``num`` and a time inside (window start, window end]
    is returned.

    :param cancel_data: cancel record with "val" -> time, num, cancelTime, cancelTimeUnit
    :param today_datas: iterable of today's records, searched in order
    :return: the matching record, or ``None`` when nothing matches
    """
    cancel_val = cancel_data["val"]
    nearest_offset, farthest_offset = __compute_time_space_as_second(cancel_val["cancelTime"],
                                                                     cancel_val["cancelTimeUnit"])
    window_end = __sub_time(cancel_val["time"], nearest_offset)
    window_start = __sub_time(cancel_val["time"], farthest_offset)
    for candidate in today_datas:
        val = candidate["val"]
        if (int(val["operateType"]) == 0
                and int(val["num"]) == int(cancel_val["num"])
                and compare_time(val["time"], window_start) > 0
                and compare_time(val["time"], window_end) <= 0):
            return candidate
    return None
def test(datas):
    """Debug helper: store the string "test" under key "code" of ``datas``, in place."""
    datas["code"] = "test"
# Manual check when the module is run directly.
if __name__ == "__main__":
    # Disabled example: match a cancel record against today's records.
    # cancel_data = {"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 1, "time": "09:32:30"}}
    # today_datas=[{"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 0, "time": "09:32:30"}},{"val": {"operateType": 0, "num": 1520, "cancelTime": 0, "cancelTimeUnit": 0, "time": "09:31:31"}}]
    # result= get_buy_data_with_cancel_data(cancel_data,today_datas)
    # print(result)
    # Shows that test() mutates the dict passed to it.
    __datas = {}
    test(__datas)
    print(__datas)
log.py
@@ -5,32 +5,34 @@
    return "D:/logs/gp/{}/{}".format(dir_name, log_name)+".{time:YYYY-MM-DD}.log"
#   每一天生成一个日志文件,历史日志文件采用zip压缩
#   每一天生成一个日志文件,历史日志文件采用zip压缩,异步写入日志
logger.add(get_path("trade", "trade_gui"), filter=lambda record: record["extra"].get("name") == "trade_gui",
           rotation="00:00", compression="zip")
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("trade", "trade"), filter=lambda record: record["extra"].get("name") == "trade", rotation="00:00",
           compression="zip")
logger.add(get_path("trade", "delegate"), filter=lambda record: record["extra"].get("name") == "delegate", rotation="00:00",
logger.add(get_path("trade", "delegate"), filter=lambda record: record["extra"].get("name") == "delegate",
           rotation="00:00",
           compression="zip")
logger.add(get_path("l2", "l2_error"), filter=lambda record: record["extra"].get("name") == "l2_error",
           rotation="00:00", compression="zip")
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("l2", "l2_process"), filter=lambda record: record["extra"].get("name") == "l2_process",
           rotation="00:00", compression="zip")
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("l2", "l2_trade"), filter=lambda record: record["extra"].get("name") == "l2_trade",
           rotation="00:00", compression="zip")
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("juejin", "juejin_tick"), filter=lambda record: record["extra"].get("name") == "juejin_tick",
           rotation="00:00", compression="zip")
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("ths", "code_operate"), filter=lambda record: record["extra"].get("name") == "code_operate",
           rotation="00:00", compression="zip")
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("device", "device"), filter=lambda record: record["extra"].get("name") == "device",
           rotation="00:00", compression="zip")
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("system", "system"), filter=lambda record: record["extra"].get("name") == "system",
           rotation="00:00", compression="zip", enqueue=True)
logger_trade_gui = logger.bind(name="trade_gui")
logger_trade = logger.bind(name="trade")
@@ -41,6 +43,7 @@
logger_juejin_tick = logger.bind(name="juejin_tick")
logger_code_operate = logger.bind(name="code_operate")
logger_device = logger.bind(name="device")
logger_system = logger.bind(name="system")
if __name__ == '__main__':
    logger_trade_gui.info("测试")
mongo_data.py
@@ -20,6 +20,7 @@
    except:
        pass
def save_one(dn_name, data):
    try:
        db = _getdb()
@@ -35,11 +36,12 @@
        db = _getdb()
        collections = db[dn_name]
        result = collections.find(where_dict)
        print(result)
        # print(result)
        return result
    except:
        pass
def count(dn_name, where_dict):
    try:
        db = _getdb()
server.py
@@ -2,6 +2,7 @@
import logging
import socketserver
import socket
import threading
import time
import data_process
@@ -66,7 +67,9 @@
                        __start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, datas = l2_data_manager.parseL2Data(_str)
                        cid, pid = gpcode_manager.get_listen_code_pos(code)
                        # 判断目标代码位置是否与上传数据位置一致
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                        try:
                            # print("L2数据接受",day,code,len(datas))
                            # 查询
@@ -98,8 +101,12 @@
                            if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                key = "{}-{}-{}".format(client, channel, code)
                                if key not in self.l2_data_error_dict or round(
                                        time.time() * 1000) - self.l2_data_error_dict[key] > 2000:
                                    self.l2CodeOperate.repaire_l2_data(code)
                                            time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
                                        # self.l2CodeOperate.repaire_l2_data(code)
                                        # todo 太敏感移除代码
                                        logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg)
                                        # 单价不一致时需要移除代码重新添加
                                        l2_code_operate.L2CodeOperate().remove_l2_listen(code)
                                    self.l2_data_error_dict[key] = round(time.time() * 1000)
                        except Exception as e:
@@ -114,10 +121,6 @@
                                logger_l2_process.info("l2处理时间:{}-{}".format(code, __end_time - __start_time));
                    except:
                        pass
                elif type == 10:
                    # level2交易队列
                    try:
@@ -126,7 +129,6 @@
                            data_process.saveL2Data(day, code, setData)
                    except:
                        print("异常")
                elif type == 1:
                    # 设置股票代码
                    data_list = data_process.parseGPCode(_str)
@@ -138,7 +140,10 @@
                    gpcode_manager.set_gp_list(code_list)
                    # 重新订阅
                    self.server.pipe.send(json.dumps({"type": "resub"}))
                    sync_target_codes_to_ths()
                    # 同步同花顺目标代码
                    t1 = threading.Thread(target=lambda: sync_target_codes_to_ths())
                    t1.setDaemon(True)
                    t1.start()
                elif type == 2:
                    # 涨停代码
                    codeList = data_process.parseGPCode(_str)
@@ -185,17 +190,18 @@
                            {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}})
                    except Exception as e:
                        return_str = data_process.toJson({"code": 1, "msg": str(e)})
                # 现价更新
                elif type == 40:
                    data = data_process.parse(_str)["data"]
                    if data is not None:
                        print("现价数量", len(data))
                    for item in data:
                        juejin.accpt_price(item["code"], float(item["price"]))
                elif type == 30:
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    if "memery" in data:
                        mem = data["memery"]
                        logger_device.info("({})内存使用率:{}".format(client_id, mem))
                    logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
                    data_process.saveClientActive(int(client_id), host)
                    # print("心跳:", client_id)
@@ -235,7 +241,8 @@
                send_msg(client, {"action": "test"})
            except:
                pass
        # 矫正客户端代码
        l2_code_operate.correct_client_codes()
        time.sleep(5)
ths_industry_util.py
New file
@@ -0,0 +1,22 @@
# 同花顺行业
import mongo_data
# Build the industry mappings
def get_code_industry_maps():
    """Read "ths-industry-codes" and build both lookup directions.

    Each document's "_id" is used as the stock code and its
    "second_industry" as the industry name.

    :return: tuple ``(code -> industry, industry -> set of codes)``
    """
    industry_by_code = {}
    codes_by_industry = {}
    for record in mongo_data.find("ths-industry-codes", {}):
        code = record["_id"]
        industry = record["second_industry"]
        industry_by_code[code] = industry
        codes_by_industry.setdefault(industry, set()).add(code)
    return industry_by_code, codes_by_industry
# Manual check: print both maps built from the "ths-industry-codes" collection.
if __name__ == "__main__":
    _code_map, _industry_map = get_code_industry_maps()
    print(_code_map,_industry_map)
trade_manager.py
@@ -231,9 +231,9 @@
        _time = data["time"]
        if _time == "00:00:00":
            continue
        if code is not None:
            set_trade_state(code, TRADE_STATE_BUY_SUCCESS)
        if code is not None and int(data["type"]) == 0:
            forbidden_trade(code)
            set_trade_state(code, TRADE_STATE_BUY_SUCCESS)
# 处理委托成功数据