Administrator
2023-08-14 31823d638d8d824bb6240e5b0fe11c54d97473a7
华鑫托管改造初步设计
13个文件已修改
4个文件已添加
1189 ■■■■■ 已修改文件
db/mysql_data_delegate.py 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager_delegate.py 195 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin/constant.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin/l2_client.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/kp_client_msg_manager.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
outside_api_command_manager.py 265 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/data_server.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/history_k_data_util.py 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_api.py 105 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 68 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 308 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/middle_api_protocol.py 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/socket_util.py 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/mysql_data_delegate.py
New file
@@ -0,0 +1,60 @@
# 网络代理
import logging
import pymysql
# 把连接参数定义成字典
import constant
from utils import middle_api_protocol
config = constant.MYSQL_CONFIG
class Mysqldb:
    # 初始化方法
    def __init__(self):
        pass
    def __request(self, cmd, args):
        data = {
            "db": config["database"],
            "cmd": cmd,
        }
        if args:
            data["args"] = args
        fdata = middle_api_protocol.load_redis_cmd(data)
        result = middle_api_protocol.request(fdata)
        return result
    # 查询sql语句返回的所有数据
    def select_all(self, sql):
        return self.__request("select_all", [sql])
    # 查询sql语句返回的一条数据
    def select_one(self, sql):
        return self.__request("select_one", [sql])
    # 查询sql语句返回的几条数据
    def select_many(self, sql, num):
        return self.__request("select_many", [sql, num])
    # 增删改除了SQL语句不一样其他都是一样的,都需要提交
    def execute(self, sql, args=None):
        return self.__request("execute", [sql, args])
    def execute_many(self, sql, args=None):
        return self.__request("execute_many", [sql, args])
    # 当对象被销毁时,游标要关闭,连接也要关闭
    # 创建时是先创建连接后创建游标,关闭时是先关闭游标后关闭连接
    def __del__(self):
        pass
if __name__ == '__main__':
    mysqldb = Mysqldb()
    # 插入单条数据
    mysqldb.execute("insert into clients(account,pwd,rule) values(%s,%s,%s)", ("test", 123456, "\"123"))
    # 插入多条数据
    mysqldb.execute_many("insert into clients(account,pwd,rule) values(%s,%s,%s)",
                         [("test", 123456, "\"123"), ("test", 123456, "\"123")])
db/redis_manager.py
@@ -4,7 +4,6 @@
import logging
import queue
import time
from threading import Thread
import redis
@@ -172,10 +171,6 @@
if __name__ == "__main__":
    RedisUtils.setex_async(0, "test", tool.get_expire(), "123123")
    print("大小", RedisUtils.get_async_task_count())
    RedisUtils.incrby_async(0, "test_1", 1)
    print("大小", RedisUtils.get_async_task_count())
    RedisUtils.delete_async(0, "test")
    print("大小", RedisUtils.get_async_task_count())
    RedisUtils.run_loop()
    redis = RedisManager(1).getRedis()
    db = redis.connection_pool.connection_kwargs['db']
    print(db)
db/redis_manager_delegate.py
New file
@@ -0,0 +1,195 @@
"""
redis管理器
"""
import logging
import queue
import time
import redis
import constant
from log_module.log import logger_redis_debug
from utils import tool, middle_api_protocol
config = constant.REDIS_CONFIG
pool_caches = [redis.ConnectionPool(host=config["host"], port=config["port"], password=config["pwd"],
                                    db=db, decode_responses=True, max_connections=50) for db in range(16)]
class RedisManager:
    def __init__(self, db=config["db"]):
        self.pool = pool_caches[db]
        self.db = db
    def getRedis(self):
        return redis.Redis(connection_pool=self.pool)
    def getRedisNoPool(self):
        return redis.Redis(host=config["host"], port=config["port"], password=config["pwd"], db=self.db,
                           decode_responses=True)
class RedisUtils:
    __async_task_queue = queue.Queue()
    @classmethod
    def exec(cls, method_name, key, lamada_method):
        __start_time = time.time()
        try:
            return lamada_method()
        finally:
            logger_redis_debug.info("{}({}):{}", method_name, round((time.time() - __start_time) * 1000, 3), key)
    @classmethod
    def __request(cls, db, cmd, key, args=None):
        data = {
            "db": db,
            "cmd": cmd,
            "key": key,
        }
        if args:
            data["args"] = args
        fdata = middle_api_protocol.load_redis_cmd(data)
        result = middle_api_protocol.request(fdata)
        return result
    @classmethod
    def __get_db(cls, redis_):
        return redis_.connection_pool.connection_kwargs['db']
    @classmethod
    def get(cls, redis_, key, auto_free=True):
        return cls.exec("get", key, lambda: cls.__request(cls.__get_db(redis_), "get", key))
    @classmethod
    def scard(cls, redis_, key, auto_free=True):
        return cls.exec("scard", key, lambda: cls.__request(cls.__get_db(redis_), "scard", key))
    @classmethod
    def delete(cls, redis_, key, auto_free=True, _async=False):
        return cls.exec("delete", key, lambda: cls.__request(cls.__get_db(redis_), "delete", key))
    @classmethod
    def delete_async(cls, db, key, auto_free=True):
        cls.add_async_task(db, "delete", (key))
        logger_redis_debug.info("delete_async({}):{}", 0, key)
    @classmethod
    def keys(cls, redis_, key, auto_free=True):
        return cls.exec("keys", key, lambda: cls.__request(cls.__get_db(redis_), "keys", key))
    @classmethod
    def set(cls, redis_, key, val, auto_free=True):
        return cls.exec("set", key, lambda: cls.__request(cls.__get_db(redis_), "set", key, val))
    @classmethod
    def setex(cls, redis_, key, expire, val, auto_free=True, _async=False):
        return cls.exec("setex", key, lambda: cls.__request(cls.__get_db(redis_), "setex", key, [expire, val]))
    @classmethod
    def setex_async(cls, db, key, expire, val, auto_free=True):
        cls.add_async_task(db, "setex", (key, expire, val))
        logger_redis_debug.info("setex_async({}):{}", 0, key)
    @classmethod
    def setnx(cls, redis_, key, val, auto_free=True):
        return cls.exec("setnx", key, lambda: cls.__request(cls.__get_db(redis_), "setnx", key, val))
    @classmethod
    def expire(cls, redis_, key, expire, auto_free=True):
        return cls.exec("expire", key, lambda: cls.__request(cls.__get_db(redis_), "expire", key, expire))
    @classmethod
    def expire_async(cls, db, key, expire, auto_free=True):
        cls.add_async_task(db, "expire", (key, expire))
        logger_redis_debug.info("expire_async({}):{}", 0, key)
    @classmethod
    def sadd(cls, redis_, key, val, auto_free=True):
        return cls.exec("sadd", key, lambda: cls.__request(cls.__get_db(redis_), "sadd", key, val))
    @classmethod
    def sadd_async(cls, db, key, val, auto_free=True):
        cls.add_async_task(db, "sadd", (key, val))
        logger_redis_debug.info("sadd_async({}):{}", 0, key)
    @classmethod
    def sismember(cls, redis_, key, val, auto_free=True):
        return cls.exec("sismember", key, lambda: cls.__request(cls.__get_db(redis_), "sismember", key, val))
    @classmethod
    def smembers(cls, redis_, key, auto_free=True):
        return cls.exec("smembers", key, lambda: cls.__request(cls.__get_db(redis_), "smembers", key))
    @classmethod
    def srem(cls, redis_, key, val, auto_free=True):
        return cls.exec("srem", key, lambda: cls.__request(cls.__get_db(redis_), "srem", key, val))
    @classmethod
    def srem_async(cls, db, key, val, auto_free=True):
        cls.add_async_task(db, "srem", (key, val))
        logger_redis_debug.info("srem_async({}):{}", 0, key)
    @classmethod
    def incrby(cls, redis_, key, num, auto_free=True, _async=False):
        return cls.exec("incrby", key, lambda: cls.__request(cls.__get_db(redis_), "incrby", key, num))
    @classmethod
    def incrby_async(cls, db, key, num, auto_free=True):
        cls.add_async_task(db, "incrby", (key, num))
        logger_redis_debug.info("incrby_async({}):{}", 0, key)
    @classmethod
    def lpush(cls, redis_, key, val, auto_free=True):
        return cls.exec("lpush", key, lambda: cls.__request(cls.__get_db(redis_), "lpush", key, val))
    @classmethod
    def lpop(cls, redis_, key, auto_free=True):
        return cls.exec("lpop", key, lambda: cls.__request(cls.__get_db(redis_), "lpop", key))
    @classmethod
    def rpush(cls, redis_, key, val, auto_free=True):
        return cls.exec("rpush", key, lambda: cls.__request(cls.__get_db(redis_), "rpush", key, val))
    @classmethod
    def realse(cls, redis_):
        pass
    @classmethod
    def add_async_task(cls, db, method, args):
        cls.__async_task_queue.put_nowait((db, method, args))
    @classmethod
    def get_async_task_count(cls):
        return cls.__async_task_queue.qsize()
    # 运行异步任务
    @classmethod
    def run_loop(cls):
        while True:
            try:
                data = cls.__async_task_queue.get()
                if data:
                    db = data[0]
                    method_name = data[1]
                    args = data[2]
                    _redis = RedisManager(db).getRedisNoPool()
                    method = getattr(RedisUtils, method_name)
                    if type(args) == tuple:
                        args = list(args)
                        args.insert(0, _redis)
                        args = tuple(args)
                        result = method(*args)
                    else:
                        args = tuple([_redis, args])
                        result = method(args)
            except Exception as e1:
                logging.exception(e1)
                pass
if __name__ == "__main__":
    pass
huaxin/constant.py
@@ -1,4 +1,5 @@
# addr, port = "111.230.16.67", 10008
SERVER_IP = '43.138.167.68'
# SERVER_IP = '43.138.167.68'
SERVER_IP = "192.168.3.122"
SERVER_PORT = 10008
TEST = True
huaxin/l2_client.py
@@ -471,14 +471,8 @@
        print("接受到命令", client_id, request_id, codes_data)
        try:
            spi.set_codes_data(codes_data)
            SendResponseSkManager.send_normal_response("l2_cmd",
                                                       SendResponseSkManager.load_response(client_id, request_id,
                                                                                           {"code": 0, "msg": "设置成功"}))
        except Exception as e:
            logging.exception(e)
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
def __init_l2():
main.py
@@ -60,7 +60,6 @@
    # l1与trade间的通信
    pl1t_l1, pl1t_trade = multiprocessing.Pipe()
    logger_l2_process_time.info("测试123")
    serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
    serverProcess.start()
    # 将tradeServer作为主进程
@@ -70,4 +69,5 @@
    l2Process = multiprocessing.Process(target=huaxin.l2_client.run, args=(ptl2_l2,))
    tradeProcess.start()
    l2Process.start()
    # L1订阅数据
    huaxin.l1_client.run_async(pl1t_l1)
output/kp_client_msg_manager.py
@@ -10,6 +10,7 @@
from log_module import log, log_export
from db.redis_manager import RedisManager, RedisUtils
from log_module.log import logger_kp_msg
from utils import middle_api_protocol, tool
CLIENT_IDS = ["zjb", "hxh"]
@@ -41,13 +42,17 @@
# 添加消息
@tool.async_call
def add_msg(code, msg):
    # 根据代码获取名称
    name = gpcode_manager.get_code_name(code)
    msg = f"【{name}({code})】{msg}"
    __temp_msg_queue.put_nowait(msg)
    # 添加到日志
    logger_kp_msg.info(msg)
    fdata = middle_api_protocol.load_kp_msg(msg)
    middle_api_protocol.request(fdata)
    #
    # __temp_msg_queue.put_nowait(msg)
    # # 添加到日志
    # logger_kp_msg.info(msg)
def read_msg(client_id):
outside_api_command_manager.py
New file
@@ -0,0 +1,265 @@
"""
外部接口管理
"""
import json
import logging
import random
import socket
import threading
import time
# 心跳信息
from huaxin import socket_util
from huaxin.client_network import SendResponseSkManager
from utils import middle_api_protocol
MSG_TYPE_HEART = "heart"
# 命令信息
MSG_TYPE_CMD = "cmd"
CLIENT_TYPE_TRADE = "trade"
# 心跳时间间隔
HEART_SPACE_TIME = 3
TRADE_DIRECTION_BUY = 1
TRADE_DIRECTION_SELL = 2
TRADE_TYPE_ORDER = 1
TRADE_TYPE_CANCEL_ORDER = 2
# 数据操作
OPERRATE_SET = 1  # 设置
OPERRATE_DELETE = 2  # 删除
OPERRATE_GET = 3  # 获取
# 代码名单类型
CODE_LIST_WHITE = "white"
CODE_LIST_BLACK = "black"
CODE_LIST_WANT = "want"
CODE_LIST_PAUSE_BUY = "pause_buy"
# 类型
API_TYPE_TRADE = "trade"  # 交易
API_TYPE_TRADE_STATE = "trade_state"  # 交易状态
API_TYPE_TRADE_MODE = "trade_mode"  # 交易模式
API_TYPE_CODE_LIST = "code_list"  # 代码名单
API_TYPE_EXPORT_L2 = "export_l2"  # 导出L2数据
API_TYPE_INIT = "init"  # 初始化
API_TYPE_REFRESH_TRADE_DATA = "refresh_trade_data"  # 交易数据刷新
API_TYPE_CODE_ATRRIBUTE = "code_attribute"  # 代码属性
API_TYPE_CODE_TRADE_STATE = "code_trade_state"  # 代码交易状态
API_TYPE_GET_ENV = "get_env"  # 获取环境信息
class ActionCallback(object):
    # 交易
    def OnTrade(self, client_id, request_id, data):
        pass
    # 交易状态
    def OnTradeState(self, client_id, request_id, data):
        pass
    # 交易模式
    def OnTradeMode(self, client_id, request_id, data):
        pass
    # 代码名单
    def OnCodeList(self, client_id, request_id, data):
        pass
    def OnExportL2(self, client_id, request_id, data):
        pass
    def OnEveryDayInit(self, client_id, request_id, data):
        pass
    def OnRefreshTradeData(self, client_id, request_id, data):
        pass
    def OnGetCodeAttribute(self, client_id, request_id, data):
        pass
    def OnGetCodeTradeState(self, client_id, request_id, data):
        pass
    def OnGetEnvInfo(self, client_id, request_id, data):
        pass
# 交易指令管理
# 交易指令管理
class ApiCommandManager:
    trade_client_dict = {}
    _instance = None
    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super().__new__(cls, *args, **kwargs)
        return cls._instance
    @classmethod
    def __create_client(cls, client_type, rid):
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 生成socket,连接server
        client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
        # client.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 60 * 1000, 30 * 1000))
        client.connect(cls.ip_port)
        client.send(SendResponseSkManager.format_response(
            json.dumps({"type": "register", "data": {"client_type": client_type}, "rid": rid}).encode("utf-8")))
        client.recv(1024)
        return client
    @classmethod
    def __create_and_run_client(cls, type, index=None):
        key = f"{type}_{round(time.time() * 1000)}_{random.randint(0, 1000)}"
        if index is not None:
            key += f"_{index}"
        sk = cls.__create_client(type, key)
        # 发送心跳
        cls.__heartbeats_thread(type, key, sk)
        cls.__listen_command_thread(type, key, sk)
        print("create_and_run_client success", type, key)
        return key, sk
    @classmethod
    def init(cls, addr, port, trade_action_callback, trade_client_count=20):
        cls.trade_client_dict = {}
        cls.trade_client_count = trade_client_count
        cls.action_callback = trade_action_callback
        cls.ip_port = (addr, port)
        for i in range(trade_client_count):
            result = cls.__create_and_run_client(CLIENT_TYPE_TRADE, i)
            cls.trade_client_dict[result[0]] = result[1]
    # 听取指令
    @classmethod
    def __listen_command(cls, _type, client_id, sk):
        while True:
            try:
                result = socket_util.recv_data(sk)[0]
                if result:
                    try:
                        print("接收数据", _type, result)
                        result_json = json.loads(result)
                        if result_json["type"] == MSG_TYPE_HEART:
                            # 返回内容
                            sk.send(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))
                            continue
                        data = result_json["data"]
                        content_type = data["type"]
                        print("接收内容", data)
                        request_id = result_json.get('request_id')
                        if not socket_util.is_client_params_sign_right(result_json):
                            print("签名错误")
                            # 签名出错
                            SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                                      {"code": -1, "msg": "签名错误"})
                            continue
                        if content_type == API_TYPE_TRADE:
                            # 交易
                            cls.action_callback.OnTrade(client_id, request_id, data)
                        elif content_type == API_TYPE_TRADE_STATE:
                            cls.action_callback.OnTradeState(client_id, request_id, data)
                        elif content_type == API_TYPE_TRADE_MODE:
                            cls.action_callback.OnTradeMode(client_id, request_id, data)
                        elif content_type == API_TYPE_CODE_LIST:
                            cls.action_callback.OnCodeList(client_id, request_id, data)
                        elif content_type == API_TYPE_EXPORT_L2:
                            cls.action_callback.OnExportL2(client_id, request_id, data)
                        elif content_type == API_TYPE_INIT:
                            cls.action_callback.OnEveryDayInit(client_id, request_id, data)
                        elif content_type == API_TYPE_REFRESH_TRADE_DATA:
                            cls.action_callback.OnRefreshTradeData(client_id, request_id, data)
                        elif content_type == API_TYPE_CODE_ATRRIBUTE:
                            cls.action_callback.OnGetCodeAttribute(client_id, request_id, data)
                        elif content_type == API_TYPE_CODE_TRADE_STATE:
                            cls.action_callback.OnGetCodeTradeState(client_id, request_id, data)
                        elif content_type == API_TYPE_GET_ENV:
                            cls.action_callback.OnGetEnvInfo(client_id, request_id, data)
                    except Exception as e:
                        logging.exception(e)
                        pass
                    finally:
                        # 发送响应
                        sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8'))
                else:
                    raise Exception("接收的内容为空")
            except Exception as e:
                logging.exception(e)
                if _type == CLIENT_TYPE_TRADE:
                    if client_id in cls.trade_client_dict:
                        cls.trade_client_dict.pop(client_id)
                        print("pop trade client", client_id)
                try:
                    sk.close()
                except:
                    pass
                    # 结束当前的消息循环
                break
    @classmethod
    def __heart_beats(cls, _type, client_id, sk):
        while True:
            try:
                sk.send(SendResponseSkManager.format_response(
                    json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')))
                # print("心跳信息发送成功", client_id)
            except Exception as e:
                print("心跳信息发送失败", _type, client_id)
                logging.exception(e)
                if _type == CLIENT_TYPE_TRADE:
                    if client_id in cls.trade_client_dict:
                        cls.trade_client_dict.pop(client_id)
                try:
                    sk.close()
                except:
                    pass
                    # 结束当前的消息循环
                break
            time.sleep(HEART_SPACE_TIME)
    @classmethod
    def __listen_command_thread(cls, _type, rid, sk):
        t1 = threading.Thread(target=lambda: cls.__listen_command(_type, rid, sk))
        t1.setDaemon(True)
        t1.start()
    @classmethod
    def __heartbeats_thread(cls, _type, rid, sk):
        t1 = threading.Thread(target=lambda: cls.__heart_beats(_type, rid, sk))
        t1.setDaemon(True)
        t1.start()
    @classmethod
    def __maintain_client(cls):
        while True:
            try:
                if len(cls.trade_client_dict) < cls.trade_client_count:
                    print("__maintain_client", CLIENT_TYPE_TRADE, cls.trade_client_count - len(cls.trade_client_dict))
                    for i in range(cls.trade_client_count - len(cls.trade_client_dict)):
                        result = cls.__create_and_run_client(CLIENT_TYPE_TRADE)
                        cls.trade_client_dict[result[0]] = result[1]
            except:
                pass
            time.sleep(1)
    # 维护连接数的稳定
    def run(self, blocking=True):
        # 维护client
        if blocking:
            self.__maintain_client()
        else:
            t1 = threading.Thread(target=lambda: self.__maintain_client())
            t1.setDaemon(True)
            t1.start()
if __name__ == "__main__":
    manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, None)
    manager.run()
    input()
third_data/data_server.py
@@ -566,7 +566,9 @@
def run(addr, port):
    # 运行看盘消息采集
    kp_client_msg_manager.run_capture()
    # kp_client_msg_manager.run_capture()
    kpl_data_manager.run_pull_task()
    handler = DataServer
    # httpd = socketserver.TCPServer((addr, port), handler)
    httpd = ThreadedHTTPServer((addr, port), handler)
third_data/history_k_data_util.py
@@ -9,7 +9,7 @@
import constant
from db.redis_manager import RedisUtils
from utils import tool
from utils import tool, middle_api_protocol
from db import redis_manager
import gm.api as gmapi
@@ -25,25 +25,32 @@
                return datetime.datetime.fromisoformat(val)
            return val
        url = f'{cls.__BASE_URL}{path_str}'
        # 发送POST请求
        response = requests.post(url, json=data_json)
        result = response.text
        resultJson = json.loads(result)
        if resultJson['code'] == 0:
            data = resultJson['data']
            if type(data) == list:
                for d in data:
        DELEGATE = True
        fdata = None
        if DELEGATE:
            fdata = middle_api_protocol.load_juejin(path_str, data_json)
            fdata = middle_api_protocol.request(fdata)
        else:
            url = f'{cls.__BASE_URL}{path_str}'
            # 发送POST请求
            response = requests.post(url, json=data_json)
            result = response.text
            resultJson = json.loads(result)
            if resultJson['code'] == 0:
                fdata = resultJson['data']
        if fdata:
            if type(fdata) == list:
                for d in fdata:
                    if type(d) != dict:
                        continue
                    for k in d:
                        d[k] = deformat_date(d[k])
            elif type(data) == dict:
                for k in data:
                    data[k] = deformat_date(data[k])
            return data
        return None
            elif type(fdata) == dict:
                for k in fdata:
                    fdata[k] = deformat_date(fdata[k])
            return fdata
        else:
            return None
    @classmethod
    def get_instruments(cls, symbols, fields):
@@ -87,12 +94,11 @@
        redis = cls.__redisManager.getRedis()
        try:
            account_id = RedisUtils.get(redis, "juejin-account-id", auto_free=False)
            strategy_id =RedisUtils.get(redis, "juejin-strategy-id", auto_free=False)
            strategy_id = RedisUtils.get(redis, "juejin-strategy-id", auto_free=False)
            token = RedisUtils.get(redis, "juejin-token", auto_free=False)
            return account_id, strategy_id, token
        finally:
            RedisUtils.realse(redis)
    @classmethod
    def get_juejin_code_list_with_prefix(cls, codes):
@@ -269,16 +275,4 @@
if __name__ == "__main__":
    constant.JUEJIN_LOCAL_API = False
    list_ = JueJinApi.get_exchanges_codes(["SHSE","SZSE"])
    fdatas = []
    for d in list_:
        if d["sec_id"].find("60") != 0 and d["sec_id"].find("00") != 0:
            continue
        if d["sec_level"] != 1:
            continue
        if d["pre_close"] * 1.1 > 40:
            continue
        if (d["listed_date"] + datetime.timedelta(days=100)).timestamp() > datetime.datetime.now().timestamp():
            continue
        fdatas.append(d)
    print(len(fdatas))
    print(HistoryKDatasUtils.get_lowest_price_rate("000725", 30))
third_data/kpl_api.py
@@ -2,26 +2,67 @@
import requests
from utils import middle_api_protocol
# 竞价
DABAN_TYPE_BIDDING = 8
# 涨停
DABAN_TYPE_LIMIT_UP = 1
# 炸板
DABAN_TYPE_OPEN_LIMIT_UP = 2
# 跌停
DABAN_TYPE_LIMIT_DOWN = 3
# 曾跌停
DABAN_TYPE_EVER_LIMIT_DOWN = 5
def __base_request(url, data):
    headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "User-Agent": "Dalvik / 2.1.0(Linux;U;Android 6.0.1;MuMu Build/V417IR)"
    }
    # proxies={'https': '192.168.3.251:9002'}
    # 禁止代理,不然会走本地代理
    response = requests.post(url, data=data, headers=headers, proxies={"http": None, "https": None})
    return response
    DELEGATE = True
    if not DELEGATE:
        headers = {
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            "User-Agent": "Dalvik / 2.1.0(Linux;U;Android 6.0.1;MuMu Build/V417IR)"
        }
        # proxies={'https': '192.168.3.251:9002'}
        # 禁止代理,不然会走本地代理
        response = requests.post(url, data=data, headers=headers, proxies={"http": None, "https": None})
        if response.status_code != 200:
            raise Exception("请求出错")
        return response.text
    else:
        fdata = middle_api_protocol.load_kpl(url, data)
        return middle_api_protocol.request(fdata)
def daBanList(pidType):
    data = "Order=1&a=DaBanList&st=100&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23" \
           f"&VerSion=5.8.0.2&Index=0&Is_st=1&PidType={pidType}&apiv=w32&Type=4&FilterMotherboard=0&Filter=0&FilterTIB=0" \
           "&FilterGem=0 "
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    return result
# 市场行情-行业
def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                            data=data)
    return result
# 市场行情-精选
def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True):
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&"
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                            data=data)
    return result
# 获取代码的板块
def getStockIDPlate(code):
    data = f"a=GetStockIDPlate_New&apiv=w32&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
    response = __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                              data=data)
    if response.status_code != 200:
        raise Exception("请求出错")
    result = response.text
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    result = json.loads(result)
    if int(result["errcode"]) != 0:
        return None
@@ -31,50 +72,35 @@
# 获取概念代码
def getCodesByPlate(plate_code):
    data = f"Order=1&a=ZhiShuStockList_W8&st=30&c=ZhiShuRanking&PhoneOSNew=1&old=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&IsZZ=0&Token=0&Index=0&apiv=w32&Type=6&IsKZZType=0&UserID=0&PlateID={plate_code}&"
    response = __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                              data=data)
    if response.status_code != 200:
        raise Exception("请求出错")
    return response.text
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                          data=data)
# 获取概念中的板块强度
def getSonPlate(plate_code):
    data = f"a=SonPlate_Info&apiv=w32&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&PlateID={plate_code}&"
    response = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    if response.status_code != 200:
        raise Exception("请求出错")
    return response.text
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
# 市场行情-行业
def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
    response = __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                              data=data)
    if response.status_code != 200:
        raise Exception("请求出错")
    return response.text
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                          data=data)
# 市场行情-精选
def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True):
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&"
    response = __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                              data=data)
    if response.status_code != 200:
        raise Exception("请求出错")
    return response.text
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                          data=data)
# 获取代码的精选板块
# 返回格式:[(板块代码,板块名称,涨幅百分比)]
def getCodeJingXuanBlocks(code):
    data = f"a=GetStockIDPlate&apiv=w32&Type=2&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
    response = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    if response.status_code != 200:
        raise Exception("请求出错")
    result = response.text
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    result = json.loads(result)
    return result.get("ListJX")
@@ -82,10 +108,7 @@
# 获取自由流通市值
def getZYLTAmount(code):
    data = f"a=GetStockPanKou_Narrow&apiv=w32&c=StockL2Data&VerSion=5.8.0.2&State=1&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&StockID={code}&"
    response = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    if response.status_code != 200:
        raise Exception("请求出错")
    result = response.text
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    result = json.loads(result)
    if "real" in result:
        return result["real"].get("actualcirculation_value")
@@ -93,4 +116,4 @@
if __name__ == "__main__":
    print(getZYLTAmount("000333"))
    print(getStockIDPlate("000333"))
third_data/kpl_data_manager.py
@@ -1,6 +1,9 @@
import json
import os
import threading
import time
import requests
import constant
from db.redis_manager import RedisUtils
@@ -23,7 +26,7 @@
        return self.__redisManager.getRedis()
    def save_reason(self, code, reason):
        RedisUtils.setex(self.__get_redis(),f"kpl_limitup_reason-{code}", tool.get_expire(), reason)
        RedisUtils.setex(self.__get_redis(), f"kpl_limitup_reason-{code}", tool.get_expire(), reason)
    def list_all(self):
        keys = RedisUtils.keys(self.__get_redis(), "kpl_limitup_reason-*")
@@ -310,5 +313,66 @@
    return yesterday_codes
# 运行拉取任务
def run_pull_task():
    def __upload_data(type, datas):
        root_data = {
            "type": type,
            "data": datas
        }
        requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data))
    def get_limit_up():
        while True:
            if tool.is_trade_time():
                try:
                    results = kpl_api.daBanList(kpl_api.DABAN_TYPE_LIMIT_UP)
                    result = json.loads(results)
                    __upload_data("limit_up", result)
                except Exception as e:
                    pass
            time.sleep(3)
    def get_bidding_money():
        # 竞价数据上传
        while True:
            if int("092600") < int(tool.get_now_time_str().replace(":", "")) < int("092700"):
                try:
                    results = kpl_api.daBanList(kpl_api.DABAN_TYPE_BIDDING)
                    result = json.loads(results)
                    __upload_data("biddings", result)
                except Exception as e:
                    pass
            time.sleep(3)
    def get_market_industry():
        while True:
            if tool.is_trade_time():
                try:
                    results = kpl_api.getMarketIndustryRealRankingInfo()
                    result = json.loads(results)
                    __upload_data("industry_rank", result)
                except:
                    pass
            time.sleep(3)
    def get_market_jingxuan():
        while True:
            if tool.is_trade_time():
                try:
                    results = kpl_api.getMarketJingXuanRealRankingInfo()
                    result = json.loads(results)
                    __upload_data("jingxuan_rank", result)
                except:
                    pass
            time.sleep(3)
    threading.Thread(target=get_limit_up, daemon=True).start()
    threading.Thread(target=get_bidding_money, daemon=True).start()
    threading.Thread(target=get_market_industry, daemon=True).start()
    threading.Thread(target=get_market_jingxuan, daemon=True).start()
if __name__ == "__main__":
    print(KPLLimitUpDataRecordManager.get_latest_blocks_set("002671"))
    run_pull_task()
    input()
trade/huaxin/huaxin_trade_api.py
@@ -187,8 +187,9 @@
# 网络请求
def __request(_type, data):
    request_id = __get_request_id(_type)
def __request(_type, data, request_id=None):
    if not request_id:
        request_id = __get_request_id(_type)
    try:
        hx_logger_trade_loop.info("请求发送开始:client_id-{} request_id-{}", 0, request_id)
        root_data = {"type": "cmd",
@@ -244,7 +245,9 @@
# volume:交易量
# price:价格(如果是卖时不传价格就按照5挡价卖)
# blocking是否阻塞进程
def order(direction, code, volume, price, price_type=2, blocking=True):
def order(direction, code, volume, price, price_type=2, blocking=True, sinfo=None, request_id=None):
    if not sinfo:
        sinfo = f"b_{code}_{round(time.time() * 1000)}"
    print("客户端", ClientSocketManager.socket_client_dict)
    request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                           {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 1,
@@ -252,7 +255,7 @@
                            "code": code,
                            "volume": volume,
                            "price_type": price_type,
                            "price": price, "sinfo": f"b_{code}_{round(time.time() * 1000)}"})
                            "price": price, "sinfo": sinfo}, request_id=request_id)
    try:
        return __read_response(request_id, blocking)
@@ -261,12 +264,14 @@
        huaxin_trade_data_update.add_money_list()
def cancel_order(direction, code, orderSysID, blocking=True):
def cancel_order(direction, code, orderSysID, blocking=True, sinfo=None, request_id=None):
    if not sinfo:
        sinfo = f"cb_{code}_{round(time.time() * 1000)}"
    request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                           {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 2,
                            "direction": direction,
                            "code": code,
                            "orderSysID": orderSysID, "sinfo": f"cb_{code}_{round(time.time() * 1000)}"})
                            "orderSysID": orderSysID, "sinfo": sinfo},request_id=request_id)
    try:
        return __read_response(request_id, blocking)
    finally:
trade/huaxin/trade_server.py
@@ -11,10 +11,16 @@
import time
import dask
import psutil
from line_profiler import LineProfiler
import constant
import inited_data
import outside_api_command_manager
from code_attribute import gpcode_manager
from db import redis_manager, mysql_data
from db.redis_manager import RedisUtils
from huaxin.client_network import SendResponseSkManager
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer
from l2.huaxin import huaxin_target_codes_manager
@@ -24,11 +30,14 @@
    hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue
from third_data import block_info, kpl_api
from third_data.code_plate_key_manager import KPLCodeJXBlockManager
from third_data.history_k_data_util import JueJinApi
from trade import deal_big_money_manager, current_price_process_manager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
from third_data.kpl_data_manager import KPLDataManager
from third_data.kpl_util import KPLDataType
from trade import deal_big_money_manager, current_price_process_manager, trade_huaxin, trade_manager, l2_trade_util
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update
from utils import socket_util
from trade.trade_manager import TradeTargetCodeModeManager
from utils import socket_util, data_export_util, middle_api_protocol, tool
trade_data_request_queue = queue.Queue()
@@ -277,6 +286,16 @@
                            print("l2_subscript_codes", data_json)
                            # 订阅的代码
                            huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.save_subscript_codes(datas)
                            # 上传数据
                            codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes()
                            fresults = []
                            if codes:
                                for code in codes:
                                    code_name = gpcode_manager.get_code_name(code)
                                    fresults.append((code, code_name))
                            fdata =  middle_api_protocol.load_l2_subscript_codes(fresults)
                            middle_api_protocol.request(fdata)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "get_level1_codes":
@@ -356,9 +375,292 @@
                pass
class OutsideApiCommandCallback(outside_api_command_manager.ActionCallback):
    @classmethod
    def __send_response(cls, data_bytes):
        sk = SendResponseSkManager.create_send_response_sk()
        try:
            data_bytes = socket_util.load_header(data_bytes)
            sk.sendall(data_bytes)
            result, header_str = socket_util.recv_data(sk)
            result = json.loads(result)
            if result["code"] != 0:
                raise Exception(result['msg'])
        finally:
            sk.close()
    def send_response(self, data, _client_id, _request_id):
        data_bytes = json.dumps({"type": "response", "data": data, "client_id": _client_id,
                                 "request_id": _request_id}).encode('utf-8')
        for i in range(3):
            try:
                self.__send_response(data_bytes)
                print("发送数据成功")
                break
            except Exception as e1:
                pass
    # 交易
    def OnTrade(self, client_id, request_id, data):
        try:
            trade_type = data["trade_type"]
            if trade_type == outside_api_command_manager.TRADE_TYPE_ORDER:
                code = data["code"]
                direction = data["direction"]
                volume = data["volume"]
                price_type = data["price_type"]
                price = data["price"]
                sinfo = data["sinfo"]
                result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type, sinfo=sinfo,
                                                blocking=True, request_id=request_id)
                self.send_response({"code": 0, "data": result}, client_id, request_id)
            elif trade_type == outside_api_command_manager.TRADE_TYPE_CANCEL_ORDER:
                code = data["code"]
                direction = data["direction"]
                accountID = data["accountID"]
                orderSysID = data["orderSysID"]
                sinfo = data["sinfo"]
                result = huaxin_trade_api.cancel_order(direction, accountID, orderSysID, sinfo=sinfo,
                                                       blocking=True, request_id=request_id)
                self.send_response({"code": 0, "data": result}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    # 交易状态
    def OnTradeState(self, client_id, request_id, data):
        try:
            operate = data["operate"]
            if operate == outside_api_command_manager.OPERRATE_SET:
                state = data["state"]
                if state:
                    trade_manager.TradeStateManager().open_buy()
                else:
                    trade_manager.TradeStateManager().close_buy()
                self.send_response({"code": 0, "msg": ("开启成功" if state else "关闭成功")}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_GET:
                can_buy = trade_manager.TradeStateManager().is_can_buy_cache()
                self.send_response({"code": 0, "data": {"can_buy": can_buy}}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    # 交易模式
    def OnTradeMode(self, client_id, request_id, data):
        try:
            operate = data["operate"]
            if operate == outside_api_command_manager.OPERRATE_SET:
                mode = data["mode"]
                TradeTargetCodeModeManager().set_mode(mode)
                self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_GET:
                mode = TradeTargetCodeModeManager().get_mode_cache()
                self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    # 代码名单
    def OnCodeList(self, client_id, request_id, data):
        try:
            code_list_type = data["code_list_type"]
            operate = data["operate"]
            code = data.get("code")
            fresult = {"code": 0}
            if code_list_type == outside_api_command_manager.CODE_LIST_WANT:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    gpcode_manager.WantBuyCodesManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    gpcode_manager.WantBuyCodesManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.WantBuyCodesManager().list_code_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_BLACK:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    l2_trade_util.forbidden_trade(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    l2_trade_util.remove_from_forbidden_trade_codes(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = l2_trade_util.BlackListCodeManager().list_codes_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_WHITE:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    l2_trade_util.WhiteListCodeManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    l2_trade_util.WhiteListCodeManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = l2_trade_util.WhiteListCodeManager().list_codes_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_PAUSE_BUY:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    gpcode_manager.PauseBuyCodesManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    gpcode_manager.PauseBuyCodesManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.PauseBuyCodesManager().list_code_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            self.send_response(fresult, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnExportL2(self, client_id, request_id, data):
        try:
            code = data["code"]
            data_export_util.export_l2_excel(code)
            self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnEveryDayInit(self, client_id, request_id, data):
        try:
            inited_data.everyday_init()
            self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnRefreshTradeData(self, client_id, request_id, data):
        try:
            sync_type = data["ctype"]
            if sync_type == "delegate_list":
                huaxin_trade_data_update.add_delegate_list()
            elif sync_type == "deal_list":
                huaxin_trade_data_update.add_deal_list()
            elif sync_type == "money":
                huaxin_trade_data_update.add_money_list()
            elif sync_type == "position_list":
                huaxin_trade_data_update.add_position_list()
            self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnGetCodeAttribute(self, client_id, request_id, data):
        try:
            code = data["code"]
            # 查询是否想买单/白名单/黑名单/暂不买
            code_name = gpcode_manager.get_code_name(code)
            want = gpcode_manager.WantBuyCodesManager().is_in_cache(code)
            white = l2_trade_util.WhiteListCodeManager().is_in_cache(code)
            black = l2_trade_util.is_in_forbidden_trade_codes(code)
            pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code)
            desc_list = []
            if want:
                desc_list.append("【想买单】")
            if white:
                desc_list.append("【白名单】")
            if black:
                desc_list.append("【黑名单】")
            if pause_buy:
                desc_list.append("【暂不买】")
            result = {"code": 0, "data": {"code_info": (code, code_name), "desc": "".join(desc_list)}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnGetCodeTradeState(self, client_id, request_id, data):
        try:
            code = data["code"]
            state = trade_manager.CodesTradeStateManager().get_trade_state(code)
            result = {"code": 0, "data": {"state": state}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnGetEnvInfo(self, client_id, request_id, data):
        try:
            fdata = {}
            try:
                date = JueJinApi.get_previous_trading_date(tool.get_now_date_str())
                if date:
                    fdata["juejin"] = 1
            except Exception as e:
                fdata["juejin"] = 0
            fdata["kpl"] = {}
            # 获取开盘啦数据
            kpl_types = [KPLDataType.LIMIT_UP.value, KPLDataType.JINGXUAN_RANK.value,
                         KPLDataType.INDUSTRY_RANK.value]
            for kpl_type in kpl_types:
                if kpl_type in KPLDataManager.kpl_data_update_info:
                    fdata["kpl"][kpl_type] = KPLDataManager.kpl_data_update_info.get(kpl_type)
            try:
                # 验证redis
                RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test")
                fdata["redis"] = 1
            except:
                fdata["redis"] = 0
            try:
                # 验证mysql
                mysql_data.Mysqldb().select_one("select 1")
                fdata["mysql"] = 1
            except:
                fdata["mysql"] = 0
            try:
                # redis异步任务数量
                fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count()
            except:
                pass
            # 获取CPU与内存适用情况
            memory_info = psutil.virtual_memory()
            cpu_percent = psutil.cpu_percent(interval=1)
            fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent}
            result = {"code": 0, "data": fdata, "msg": ""}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
def run(pipe_trade, pipe_l1):
    # 执行一些初始化数据
    block_info.init()
    # 启动外部接口监听
    manager = outside_api_command_manager.ApiCommandManager()
    manager.init(middle_api_protocol.SERVER_HOST,
                 middle_api_protocol.SERVER_PORT,
                 OutsideApiCommandCallback())
    manager.run(blocking=False)
    # 启动交易服务
    huaxin_trade_api.set_pipe_trade(pipe_trade)
    # 监听l1那边传过来的代码
trade/trade_manager.py
@@ -66,12 +66,12 @@
    # 开启购买入口
    def open_buy(self):
        self.__trade_buy_state_cache = True
        RedisUtils.setex(self.__get_redis(), "trade_buy_state", tool.get_expire(), 1)
        RedisUtils.setex_async(self.__get_redis(), "trade_buy_state", tool.get_expire(), 1)
    # 关闭购买入口
    def close_buy(self):
        self.__trade_buy_state_cache = False
        RedisUtils.setex(self.__get_redis(), "trade_buy_state", tool.get_expire(), 0)
        RedisUtils.setex_async(self.__get_redis(), "trade_buy_state", tool.get_expire(), 0)
    # 是否可以下单
    @classmethod
utils/middle_api_protocol.py
New file
@@ -0,0 +1,60 @@
"""
与中间服务器的通信协议
"""
import json
from utils import socket_util
# SERVER_HOST = '43.138.167.68'
SERVER_HOST = '192.168.3.122'
SERVER_PORT = 10008
def request(data_json):
    data_bytes = socket_util.load_header(json.dumps(data_json).encode('utf-8'))
    sk = socket_util.create_socket(SERVER_HOST, SERVER_PORT)
    try:
        sk.sendall(data_bytes)
        result_str, header_str = socket_util.recv_data(sk)
        result_json = json.loads(result_str)
        if result_json["code"] != 0:
            raise Exception(result_json["msg"])
        return result_json.get("data")
    finally:
        sk.close()
# ------------------------------Redis协议----------------------------------
def load_redis_cmd(data):
    fdata = {"type": "redis", "data": {"ctype": "cmd", "data": data}}
    return fdata
# ------------------------------Mysql协议----------------------------------
def load_mysql_cmd(data):
    fdata = {"type": "mysql", "data": {"ctype": "cmd", "data": data}}
    return fdata
# -------------------------------掘金--------------------------------------
def load_juejin(path_, data_json):
    fdata = {"type": "juejin", "data": {"ctype": "juejin", "data": {"path": path_, "params": data_json}}}
    return fdata
# ------------------------------开盘啦-------------------------------------
def load_kpl(url, data):
    fdata = {"type": "kpl", "data": {"ctype": "kpl", "data": {"url": url, "data": data}}}
    return fdata
# ------------------------------看盘消息------------------------------------
def load_kp_msg(msg):
    fdata = {"type": "kp_msg", "data": {"ctype": "kp_msg", "data": {"msg": msg}}}
    return fdata
# ------------------------------L2订阅列表------------------------------------
def load_l2_subscript_codes(datas):
    fdata = {"type": "l2_subscript_codes", "data": {"ctype": "l2_subscript_codes", "data": datas}}
    return fdata
utils/socket_util.py
@@ -4,8 +4,15 @@
# 添加数据头
import json
import socket
from utils import crypt_util
def create_socket(addr, port):
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 生成socket,连接server
    client.connect((addr, port))
    return client
def load_header(data_bytes):
@@ -44,7 +51,7 @@
    str_list = []
    for k in dataJson:
        if type(dataJson[k]) == dict:
            str_list.append(f"{k}={json.dumps(dataJson[k],separators=(',',':'))}")
            str_list.append(f"{k}={json.dumps(dataJson[k], separators=(',', ':'))}")
        else:
            str_list.append(f"{k}={dataJson[k]}")
    str_list.sort()
@@ -62,15 +69,14 @@
    str_list = []
    for k in dataJson:
        if type(dataJson[k]) == dict:
            str_list.append(f"{k}={json.dumps(dataJson[k],separators=(',',':'))}")
            str_list.append(f"{k}={json.dumps(dataJson[k], separators=(',', ':'))}")
        else:
            str_list.append(f"{k}={dataJson[k]}")
    str_list.sort()
    str_list.append("%Yeshi2014@#.")
    new_sign = crypt_util.md5_encrypt("&".join(str_list))
    #print("加密前字符串","&".join(str_list))
    # print("加密前字符串","&".join(str_list))
    if sign == new_sign:
        return True
    else:
        return False