admin
2024-01-08 1110af9cc42cbf6a3ebbb953f18585cb37ba5b8c
bug修复/日志添加
9个文件已修改
1002 ■■■■■ 已修改文件
constant.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_server.py 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_api_server.py 517 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_server.py 303 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
socket_manager.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_api.py 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/hosting_api_util.py 50 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -1,6 +1,6 @@
import platform
# Test-mode switch. An `if TEST:` branch later in this module assigns
# MYSQL_CONFIG when this is true.
TEST = False
IS_A = False
##B类##
@@ -85,6 +85,15 @@
        "passwd": "Yeshi2016@"
    }
# When TEST is enabled, (re)assign MYSQL_CONFIG to the connection settings below.
# NOTE(review): the database host, the "root" user and its password are committed
# here in plain text. Consider loading them from the environment or an untracked
# config file and rotating this password — confirm with the owner.
if TEST:
    MYSQL_CONFIG = {
        "host": "gz-cdb-r13d0yi9.sql.tencentcdb.com",
        "port": 62929,
        "database": "gp",
        "charset": "utf8",
        "user": "root",
        "passwd": "Yeshi2016@"
    }
# 获取根路径
def get_path_prefix():
data_server.py
@@ -1,24 +1,22 @@
import http
import json
import random
import socketserver
import time
from http.server import BaseHTTPRequestHandler
import dask
from code_attribute import gpcode_manager
from log import logger_request_debug
from log_module import log_analyse, log_export
from output import limit_up_data_filter, output_util, code_info_output
from output import limit_up_data_filter, output_util
from output.limit_up_data_filter import IgnoreCodeManager
from third_data import kpl_util, kpl_data_manager, kpl_api
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager, KPLDataManager, KPLCodeLimitUpReasonManager
from third_data.kpl_util import KPLPlatManager, KPLDataType
from trade import trade_manager
from trade.l2_trade_util import BlackListCodeManager
from utils import tool, global_util, kp_client_msg_manager, hosting_api_util
from utils.history_k_data_util import HistoryKDatasUtils
from utils import tool, global_util, hosting_api_util
import urllib.parse as urlparse
from urllib.parse import parse_qs
@@ -45,6 +43,17 @@
        if not total_datas:
            KPLLimitUpDataRecordManager.load_total_datas()
            total_datas = KPLLimitUpDataRecordManager.total_datas
        current_datas_results = hosting_api_util.common_request({"ctype":"get_kpl_limit_up_datas"})
        if type(current_datas_results) == str:
            current_datas_results = json.loads(current_datas_results)
        current_datas = current_datas_results.get("data")  #KPLLimitUpDataRecordManager.latest_origin_datas
        current_block_codes = {}
        for c in current_datas:
            if c[5] not in current_block_codes:
                current_block_codes[c[5]] = set()
            current_block_codes[c[5]].add(c[0])
        # 通过涨停时间排序
        total_datas = list(total_datas)
@@ -64,12 +73,15 @@
                limit_up_reason_want_count_dict[d[2]] = 0
            if d[3] in want_codes:
                limit_up_reason_want_count_dict[d[2]] += 1
        # (板块名称,涨停代码数量,想买单数量,涨停时间)
        # (板块名称,涨停代码数量,炸板数量,想买单数量,涨停时间)
        limit_up_reason_statistic_info = [
            (k, len(limit_up_reason_dict[k]), limit_up_reason_want_count_dict.get(k), limit_up_reason_dict[k][0][5]) for
            (k, len(limit_up_reason_dict[k]),
             len(limit_up_reason_dict[k]) - (len(current_block_codes[k]) if k in current_block_codes else 0),
             limit_up_reason_want_count_dict.get(k), limit_up_reason_dict[k][0][5]) for
            k in
            limit_up_reason_dict]
        limit_up_reason_statistic_info.sort(key=lambda x: int(x[1]))
        limit_up_reason_statistic_info.sort(
            key=lambda x: len(current_block_codes.get(x[0])) if x[0] in current_block_codes else 0)
        limit_up_reason_statistic_info.reverse()
        codes_set = set([d[3] for d in total_datas])
@@ -260,28 +272,40 @@
    def do_GET(self):
        path = self.path
        url = urlparse.urlparse(path)
        if url.path == "/kpl/get_limit_up_list":
            response_data = self.__get_limit_up_list()
            self.send_response(200)
            # 发给请求客户端的响应数据
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(response_data.encode())
        else:
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            result = hosting_api_util.get_from_data_server(url.path, ps_dict)
            self.__send_response(result)
        thread_id = random.randint(0, 1000000)
        logger_request_debug.info(f"GET 请求开始({thread_id}):{url.path}")
        try:
            if url.path == "/kpl/get_limit_up_list":
                response_data = self.__get_limit_up_list()
                self.send_response(200)
                # 发给请求客户端的响应数据
                self.send_header('Content-type', 'application/json')
                self.end_headers()
                self.wfile.write(response_data.encode())
            else:
                ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
                result = hosting_api_util.get_from_data_server(url.path, ps_dict)
                self.__send_response(result)
        finally:
            logger_request_debug.info(f"GET 请求结束({thread_id}):{url.path}")
    def do_POST(self):
        thread_id = random.randint(0, 1000000)
        path = self.path
        url = urlparse.urlparse(path)
        if url.path == "/upload_kpl_data":
            # 接受开盘啦数据
            params = self.__parse_request()
            result_str = self.__process_kpl_data(params)
            self.__send_response(result_str)
        logger_request_debug.info(f"POST 请求开始({thread_id}):{url.path}")
        try:
            if url.path == "/upload_kpl_data":
                # 接受开盘啦数据
                params = self.__parse_request()
                result_str = self.__process_kpl_data(params)
                self.__send_response(result_str)
        finally:
            logger_request_debug.info(f"POST 请求结束({thread_id}):{url.path}")
    def __process_kpl_data(self, data):
        data = json.loads(json.dumps(data).replace("概念", ""))
        type_ = data["type"]
        print("开盘啦type:", type_)
        if type_ == KPLDataType.BIDDING.value:
log.py
@@ -47,6 +47,10 @@
                   filter=lambda record: record["extra"].get("name") == "profile",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("request", "request_debug"),
                   filter=lambda record: record["extra"].get("name") == "request_debug",
                   rotation="00:00", compression="zip", enqueue=True)
    def get_path(self, dir_name, log_name):
        path_str = "{}/logs/gp/{}/{}".format(constant.get_path_prefix(), dir_name, log_name) + ".{time:YYYY-MM-DD}.log"
        # print(path_str)
@@ -73,3 +77,5 @@
logger_redis_debug = __mylogger.get_logger("redis_debug")
logger_profile = __mylogger.get_logger("profile")
# Logger used for request start/end tracing. Its name matches the
# "request_debug" sink filter registered via logger.add, which checks
# record["extra"].get("name").
# NOTE(review): presumably get_logger binds this name into the record's
# extra — confirm against its definition.
logger_request_debug = __mylogger.get_logger("request_debug")
middle_api_server.py
@@ -1,6 +1,7 @@
import hashlib
import json
import logging
import random
import socket
import socketserver
import threading
@@ -11,6 +12,7 @@
import trade_manager
from db import mysql_data, redis_manager
from db.redis_manager import RedisUtils
from log import logger_request_debug
from utils import socket_util, hosting_api_util, huaxin_trade_record_manager, huaxin_util, tool, global_data_cache_util
from utils.history_k_data_util import HistoryKDatasUtils, JueJinApi
from utils.huaxin_trade_record_manager import PositionManager
@@ -63,253 +65,278 @@
                    # print("收到数据------", f"{data_str[:20]}......{data_str[-20:]}")
                    data_json = json.loads(data_str)
                    type_ = data_json['type']
                    if type(type_) == int:
                        # 处理数字型TYPE
                        return_str = self.process_num_type(sk, type_, data_str)
                        break
                    thread_id = random.randint(0, 1000000)
                    try:
                        logger_request_debug.info(f"middle_api_server 请求开始({thread_id}):{type_}")
                        if type(type_) == int:
                            # 处理数字型TYPE
                            return_str = self.process_num_type(sk, type_, data_str)
                            break
                    is_sign_right = socket_util.is_client_params_sign_right(data_json)
                    # ------客户端请求接口-------
                    if type_ == 'buy':
                        # 验证签名
                        if not is_sign_right:
                            raise Exception("签名错误")
                        codes_data = data_json["data"]
                        code = codes_data["code"]
                        volume = codes_data["volume"]
                        price = codes_data["price"]
                        try:
                            if not code:
                                raise Exception("请上传code")
                            if not volume:
                                raise Exception("请上传volume")
                        is_sign_right = socket_util.is_client_params_sign_right(data_json)
                        # ------客户端请求接口-------
                        if type_ == 'buy':
                            # 验证签名
                            if not is_sign_right:
                                raise Exception("签名错误")
                            codes_data = data_json["data"]
                            code = codes_data["code"]
                            volume = codes_data["volume"]
                            price = codes_data["price"]
                            try:
                                if not code:
                                    raise Exception("请上传code")
                                if not volume:
                                    raise Exception("请上传volume")
                            if round(float(price), 2) <= 0:
                                prices = HistoryKDatasUtils.get_now_price([code])
                                if not prices:
                                    raise Exception("现价获取失败")
                                price = prices[0][1]
                            # 下单
                            result = hosting_api_util.trade_order(hosting_api_util.TRADE_DIRECTION_BUY, code, volume,
                                                                  round(float(price), 2))
                            if result:
                                resultJSON = result
                                print("下单结果:", resultJSON)
                                if resultJSON['code'] == 0:
                                if round(float(price), 2) <= 0:
                                    prices = HistoryKDatasUtils.get_now_price([code])
                                    if not prices:
                                        raise Exception("现价获取失败")
                                    price = prices[0][1]
                                # 下单
                                result = hosting_api_util.trade_order(hosting_api_util.TRADE_DIRECTION_BUY, code, volume,
                                                                      round(float(price), 2))
                                if result:
                                    resultJSON = result
                                    print("下单结果:", resultJSON)
                                    if resultJSON['code'] == 0:
                                        return_str = json.dumps({"code": 0})
                                    else:
                                        raise Exception(resultJSON['msg'])
                                break
                            except Exception as e:
                                raise e
                        elif type_ == 'cancel_order':
                            # 验证签名
                            if not is_sign_right:
                                raise Exception("签名错误")
                            codes_data = data_json["data"]
                            code = codes_data["code"]
                            orderSysID = codes_data.get("orderSysID")
                            accountId = codes_data.get("accountId")
                            if code:
                                result = hosting_api_util.trade_cancel_order(hosting_api_util.TRADE_DIRECTION_BUY, code,
                                                                             accountId,
                                                                             orderSysID, True)
                                print("---撤单结果----")
                                print(result)
                                if result["code"] == 0:
                                    return_str = json.dumps({"code": 0})
                                else:
                                    raise Exception(resultJSON['msg'])
                                    raise Exception(result["msg"])
                            else:
                                return_str = json.dumps({"code": 1, "msg": "请上传代码"})
                            break
                        except Exception as e:
                            raise e
                    elif type_ == 'cancel_order':
                        # 验证签名
                        if not is_sign_right:
                            raise Exception("签名错误")
                        codes_data = data_json["data"]
                        code = codes_data["code"]
                        orderSysID = codes_data.get("orderSysID")
                        accountId = codes_data.get("accountId")
                        if code:
                            result = hosting_api_util.trade_cancel_order(hosting_api_util.TRADE_DIRECTION_BUY, code,
                                                                         accountId,
                                                                         orderSysID, True)
                            print("---撤单结果----")
                            print(result)
                        elif type_ == 'sell':
                            # 验证签名
                            if not is_sign_right:
                                raise Exception("签名错误")
                            codes_data = data_json["data"]
                            code = codes_data["code"]
                            volume = codes_data["volume"]
                            price_type = codes_data["price_type"]
                            result = hosting_api_util.trade_order(hosting_api_util.TRADE_DIRECTION_SELL, code, volume,
                                                                  '', price_type=price_type)
                            if result["code"] == 0:
                                return_str = json.dumps({"code": 0})
                                return_str = json.dumps(result)
                            else:
                                raise Exception(result["msg"])
                        else:
                            return_str = json.dumps({"code": 1, "msg": "请上传代码"})
                        break
                    elif type_ == 'sell':
                        # 验证签名
                        if not is_sign_right:
                            raise Exception("签名错误")
                        codes_data = data_json["data"]
                        code = codes_data["code"]
                        volume = codes_data["volume"]
                        price_type = codes_data["price_type"]
                        result = hosting_api_util.trade_order(hosting_api_util.TRADE_DIRECTION_SELL, code, volume,
                                                              '',price_type=price_type)
                        if result["code"] == 0:
                            print("---卖出结果----")
                            print(result)
                            break
                        elif type_ == 'get_code_position_info':
                            # 验证签名
                            if not is_sign_right:
                                raise Exception("签名错误")
                            codes_data = data_json["data"]
                            code = codes_data["code"]
                            result = hosting_api_util.get_code_position_info(code)
                            return_str = json.dumps(result)
                        else:
                            raise Exception(result["msg"])
                        print("---卖出结果----")
                        print(result)
                        break
                    elif type_ == 'get_code_position_info':
                        # 验证签名
                        if not is_sign_right:
                            raise Exception("签名错误")
                        codes_data = data_json["data"]
                        code = codes_data["code"]
                        result = hosting_api_util.get_code_position_info(code)
                        return_str = json.dumps(result)
                        break
                            break
                    elif type_ == 'common':
                        # 验证签名
                        if not is_sign_right:
                            raise Exception("签名错误")
                        params = data_json["data"]
                        result = hosting_api_util.common_request(params)
                        return_str = json.dumps(result)
                        break
                        elif type_ == 'common':
                            # 验证签名
                            if not is_sign_right:
                                raise Exception("签名错误")
                            params = data_json["data"]
                            result = hosting_api_util.common_request(params)
                            return_str = json.dumps(result)
                            break
                    elif type_ == 'get_cost_price':
                        # 获取成本价
                        codes_data = data_json["data"]
                        code = codes_data["code"]
                        try:
                            price = PositionManager.get_cost_price(code)
                            return_str = json.dumps({"code": 0, "data": {"price": price}})
                        except Exception as e:
                            return_str = json.dumps({"code": 1, "msg": str(e)})
                        break
                    elif type_ == 'delegate_list':
                        # 委托列表
                        update_time = data_json["data"]["update_time"]
                        # 是否可撤 0/1
                        can_cancel = data_json["data"]["can_cancel"]
                        results, update_time = None, None
                        if can_cancel:
                            results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                                tool.get_now_date_str("%Y%m%d"), None,
                                [huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded])
                        else:
                            results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                                tool.get_now_date_str("%Y%m%d"), update_time)
                        return_str = json.dumps(
                            {"code": 0, "data": {"list": results, "updateTime": update_time}, "msg": "请上传代码"})
                        break
                    elif type_ == 'deal_list':
                        # 成交列表
                        results = huaxin_trade_record_manager.DealRecordManager.list_by_day(
                            tool.get_now_date_str("%Y%m%d"))
                        return_str = json.dumps(
                            {"code": 0, "data": {"list": results}, "msg": ""})
                    elif type_ == 'position_list':
                        # 持仓股列表
                        results, update_time = huaxin_trade_record_manager.PositionManager.list_by_day(
                            tool.get_now_date_str("%Y%m%d"))
                        return_str = json.dumps(
                            {"code": 0, "data": {"list": results}, "msg": ""})
                    elif type_ == 'money_list':
                        # 资金详情
                        money_data = huaxin_trade_record_manager.MoneyManager.get_data()
                        return_str = json.dumps(
                            {"code": 0, "data": money_data, "msg": ""})
                    elif type_ == 'sync_trade_data':
                        # 同步交易数据
                        sync_type = data_json["data"]["type"]
                        hosting_api_util.refresh_trade_data(sync_type)
                        return_str = json.dumps(
                            {"code": 0, "data": {}, "msg": ""})
                    elif type_ == "get_huaxin_subscript_codes":
                        # 获取华鑫订阅的代码
                        fresults = global_data_cache_util.huaxin_subscript_codes
                        update_time = global_data_cache_util.huaxin_subscript_codes_update_time
                        if update_time is None:
                            update_time = ''
                        return_str = json.dumps(
                            {"code": 0, "data": {"count": len(fresults), "list": fresults, "update_time": update_time},
                             "msg": ""})
                        pass
                    elif type_ == "export_l2_data":
                        # 导出L2数据
                        code = data_json["data"]["code"]
                        hosting_api_util.export_l2_data(code)
                        return_str = json.dumps(
                            {"code": 0, "data": {}, "msg": ""})
                    elif type_ == 'everyday_init':
                        # 每日初始化
                        hosting_api_util.everyday_init()
                        return_str = json.dumps(
                            {"code": 0, "data": {}, "msg": ""})
                    elif type_ == 'huaxin_channel_state':
                        # 华鑫通道状态
                        types = []
                        fdata = {}
                        return_str = json.dumps(
                            {"code": 0, "data": fdata, "msg": ""})
                    elif type_ == 'juejin_is_valid':
                        # 掘金是否可用
                        try:
                            date = JueJinApi.get_previous_trading_date(tool.get_now_date_str())
                            if date:
                        elif type_ == 'get_cost_price':
                            # 获取成本价
                            codes_data = data_json["data"]
                            code = codes_data["code"]
                            try:
                                price = PositionManager.get_cost_price(code)
                                return_str = json.dumps({"code": 0, "data": {"price": price}})
                            except Exception as e:
                                return_str = json.dumps({"code": 1, "msg": str(e)})
                            break
                        elif type_ == 'delegate_list':
                            # 委托列表
                            update_time = data_json["data"]["update_time"]
                            # 是否可撤 0/1
                            can_cancel = data_json["data"]["can_cancel"]
                            results, update_time = None, None
                            if can_cancel:
                                results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                                    tool.get_now_date_str("%Y%m%d"), None,
                                    [huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded])
                            else:
                                results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                                    tool.get_now_date_str("%Y%m%d"), update_time)
                            return_str = json.dumps(
                                {"code": 0, "data": {"list": results, "updateTime": update_time}, "msg": "请上传代码"})
                            break
                        elif type_ == 'deal_list':
                            # 成交列表
                            results = huaxin_trade_record_manager.DealRecordManager.list_by_day(
                                tool.get_now_date_str("%Y%m%d"))
                            return_str = json.dumps(
                                {"code": 0, "data": {"list": results}, "msg": ""})
                        elif type_ == 'position_list':
                            # 持仓股列表
                            results, update_time = huaxin_trade_record_manager.PositionManager.list_by_day(
                                tool.get_now_date_str("%Y%m%d"))
                            return_str = json.dumps(
                                {"code": 0, "data": {"list": results}, "msg": ""})
                        elif type_ == 'money_list':
                            # 资金详情
                            money_data = huaxin_trade_record_manager.MoneyManager.get_data()
                            return_str = json.dumps(
                                {"code": 0, "data": money_data, "msg": ""})
                        elif type_ == 'sync_trade_data':
                            # 同步交易数据
                            sync_type = data_json["data"]["type"]
                            hosting_api_util.refresh_trade_data(sync_type)
                            return_str = json.dumps(
                                {"code": 0, "data": {}, "msg": ""})
                        elif type_ == "get_huaxin_subscript_codes":
                            # 获取华鑫订阅的代码
                            fresults = global_data_cache_util.huaxin_subscript_codes
                            update_time = global_data_cache_util.huaxin_subscript_codes_update_time
                            if update_time is None:
                                update_time = ''
                            return_str = json.dumps(
                                {"code": 0, "data": {"count": len(fresults), "list": fresults, "update_time": update_time},
                                 "msg": ""})
                            pass
                        elif type_ == "export_l2_data":
                            # 导出L2数据
                            code = data_json["data"]["code"]
                            hosting_api_util.export_l2_data(code)
                            return_str = json.dumps(
                                {"code": 0, "data": {}, "msg": ""})
                        elif type_ == 'everyday_init':
                            # 每日初始化
                            hosting_api_util.everyday_init()
                            return_str = json.dumps(
                                {"code": 0, "data": {}, "msg": ""})
                        elif type_ == 'huaxin_channel_state':
                            # 华鑫通道状态
                            types = []
                            fdata = {}
                            return_str = json.dumps(
                                {"code": 0, "data": fdata, "msg": ""})
                        elif type_ == 'juejin_is_valid':
                            # 掘金是否可用
                            try:
                                date = JueJinApi.get_previous_trading_date(tool.get_now_date_str())
                                if date:
                                    return_str = json.dumps(
                                        {"code": 0, "msg": ""})
                            except Exception as e:
                                return_str = json.dumps(
                                    {"code": 0, "msg": ""})
                        except Exception as e:
                            return_str = json.dumps(
                                {"code": 0, "msg": str(e)})
                    elif type_ == 'get_env_info':
                        # 获取环境信息
                        result = hosting_api_util.get_env_info()
                        return_str = json.dumps(result)
                    elif type_ == 'sync_l1_subscript_codes':
                        # 获取环境信息
                        result = hosting_api_util.sync_l1_subscript_codes()
                        return_str = json.dumps(result)
                                    {"code": 0, "msg": str(e)})
                        elif type_ == 'get_env_info':
                            # 获取环境信息
                            result = hosting_api_util.get_env_info()
                            return_str = json.dumps(result)
                        elif type_ == 'sync_l1_subscript_codes':
                            # 获取环境信息
                            result = hosting_api_util.sync_l1_subscript_codes()
                            return_str = json.dumps(result)
                    elif type_ == 'get_system_logs':
                        # 获取环境信息
                        start_index = data_json["data"]["start_index"]
                        count = data_json["data"]["count"]
                        result = hosting_api_util.get_system_logs(start_index, count)
                        return_str = json.dumps(result)
                    elif type_ == 'test_redis':
                        redis = redis_manager.RedisManager(5).getRedisNoPool()
                        try:
                            _start_time = time.time()
                            times = []
                            for i in range(0, 100):
                                RedisUtils.sadd(redis, "test_set", f"000000:{i}", auto_free=False)
                            times.append(time.time() - _start_time)
                            _start_time = time.time()
                            for i in range(0, 20):
                                RedisUtils.smembers(redis, "test_set", auto_free=False)
                            times.append(time.time() - _start_time)
                            return_str = json.dumps(
                                {"code": 0, "data": times, "msg": ""})
                        finally:
                            redis.close()
                    elif type_ == 'get_code_trade_info':
                        # 获取环境信息
                        code = data_json["data"]["code"]
                        result = hosting_api_util.get_code_trade_info(code)
                        return_str = json.dumps(result)
                    elif type_ == 'get_l2_listen_active_count':
                        result = hosting_api_util.get_l2_listen_active_count()
                        return_str = json.dumps(result)
                    elif type_ == "trade_server_channels":
                        channels = socket_manager.ClientSocketManager.list_client()
                        return_str = json.dumps({"code": 0, "data": channels})
                    elif type_ == "save_running_data":
                        result = hosting_api_util.save_running_data()
                        return_str = json.dumps(result)
                    elif type_ == "add_sell_rule":
                        result = hosting_api_util.sell_rule(hosting_api_util.OPERRATE_ADD, data=data_json["data"])
                        return_str = json.dumps(result)
                    elif type_ == "del_sell_rule":
                        id_ = data_json["data"]["id"]
                        result = hosting_api_util.sell_rule(hosting_api_util.OPERRATE_DELETE, data={"id": id_})
                        return_str = json.dumps(result)
                    elif type_ == "list_sell_rule":
                        result = hosting_api_util.sell_rule(hosting_api_util.OPERRATE_GET, data={})
                        return_str = json.dumps(result)
                    elif type_ == "get_code_position_info":
                        code = data_json["data"]["code"]
                        result = hosting_api_util.get_code_position_info(code)
                        return_str = json.dumps(result)
                    elif type_ == "common":
                        params = data_json["data"]
                        result = hosting_api_util.common_request(params)
                        return_str = json.dumps(result)
                        elif type_ == 'get_system_logs':
                            # 获取环境信息
                            start_index = data_json["data"]["start_index"]
                            count = data_json["data"]["count"]
                            result = hosting_api_util.get_system_logs(start_index, count)
                            return_str = json.dumps(result)
                        elif type_ == 'test_redis':
                            redis = redis_manager.RedisManager(5).getRedisNoPool()
                            try:
                                _start_time = time.time()
                                times = []
                                for i in range(0, 100):
                                    RedisUtils.sadd(redis, "test_set", f"000000:{i}", auto_free=False)
                                times.append(time.time() - _start_time)
                                _start_time = time.time()
                                for i in range(0, 20):
                                    RedisUtils.smembers(redis, "test_set", auto_free=False)
                                times.append(time.time() - _start_time)
                                return_str = json.dumps(
                                    {"code": 0, "data": times, "msg": ""})
                            finally:
                                redis.close()
                        elif type_ == 'get_code_trade_info':
                            # 获取环境信息
                            code = data_json["data"]["code"]
                            result = hosting_api_util.get_code_trade_info(code)
                            return_str = json.dumps(result)
                        elif type_ == 'get_l2_listen_active_count':
                            result = hosting_api_util.get_l2_listen_active_count()
                            return_str = json.dumps(result)
                        elif type_ == "trade_server_channels":
                            trade_channels = socket_manager.ClientSocketManager.list_client(
                                socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE)
                            common_channels = socket_manager.ClientSocketManager.list_client(
                                socket_manager.ClientSocketManager.CLIENT_TYPE_COMMON)
                            data = {}
                            available_count = 0
                            active_count = 0
                            now_time_str = tool.get_now_time_str()
                            for t in trade_channels:
                                if not t[1]:
                                    available_count += 1
                                if tool.trade_time_sub(now_time_str, t[2]) < 60:
                                    active_count += 1
                            data["trade"] = (len(trade_channels), available_count, active_count)
                            available_count = 0
                            active_count = 0
                            for t in common_channels:
                                if not t[1]:
                                    available_count += 1
                                if tool.trade_time_sub(now_time_str, t[2]) < 60:
                                    active_count += 1
                            data["common"] = (len(common_channels), available_count, active_count)
                            return_str = json.dumps({"code": 0, "data": data})
                        elif type_ == "save_running_data":
                            result = hosting_api_util.save_running_data()
                            return_str = json.dumps(result)
                        elif type_ == "add_sell_rule":
                            result = hosting_api_util.sell_rule(hosting_api_util.OPERRATE_ADD, data=data_json["data"])
                            return_str = json.dumps(result)
                        elif type_ == "del_sell_rule":
                            id_ = data_json["data"]["id"]
                            result = hosting_api_util.sell_rule(hosting_api_util.OPERRATE_DELETE, data={"id": id_})
                            return_str = json.dumps(result)
                        elif type_ == "list_sell_rule":
                            result = hosting_api_util.sell_rule(hosting_api_util.OPERRATE_GET, data={})
                            return_str = json.dumps(result)
                        elif type_ == "get_code_position_info":
                            code = data_json["data"]["code"]
                            result = hosting_api_util.get_code_position_info(code)
                            return_str = json.dumps(result)
                        elif type_ == "common":
                            params = data_json["data"]
                            result = hosting_api_util.common_request(params)
                            return_str = json.dumps(result)
                    finally:
                        logger_request_debug.info(f"middle_api_server 请求结束({thread_id}):{type}")
                break
                # sk.close()
            except Exception as e:
@@ -420,6 +447,28 @@
                else:
                    return_str = json.dumps({"code": 1, "msg": "不可以取消"})
            elif type == 421:
                # Add codes to the must-buy list (CODE_LIST_MUST_BUY)
                data = json.loads(_str)
                codes = data["data"]["codes"]
                for code in codes:
                    hosting_api_util.add_code_list(code, hosting_api_util.CODE_LIST_MUST_BUY)
                return_str = json.dumps({"code": 0})
            elif type == 422:
                # Remove codes from the must-buy list (CODE_LIST_MUST_BUY)
                data = json.loads(_str)
                codes = data["data"]["codes"]
                for code in codes:
                    hosting_api_util.remove_code_list(code, hosting_api_util.CODE_LIST_MUST_BUY)
                return_str = json.dumps({"code": 0})
            elif type == 423:
                # Fetch the must-buy list (CODE_LIST_MUST_BUY)
                result = hosting_api_util.get_code_list(hosting_api_util.CODE_LIST_MUST_BUY)
                return_str = json.dumps(result)
            elif type == 430:
                # 查询代码属性
                data = json.loads(_str)
middle_server.py
@@ -1,6 +1,4 @@
import datetime
import hashlib
import io
import json
import logging
import queue
@@ -14,7 +12,7 @@
import socket_manager
from db import mysql_data
from db.redis_manager import RedisUtils, RedisManager
from log import logger_debug
from log import logger_debug, logger_request_debug
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool
from utils.juejin_util import JueJinHttpApi
@@ -95,88 +93,67 @@
                            {"code": 100, "msg": f"JSON解析失败"}).encode(
                            encoding='utf-8')))
                        continue
                    if data_json["type"] == 'register':
                        client_type = data_json["data"]["client_type"]
                        rid = data_json["rid"]
                        socket_manager.ClientSocketManager.add_client(client_type, rid, sk)
                        sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8'))
                        try:
                            # print("客户端", ClientSocketManager.socket_client_dict)
                            while True:
                                result, header = self.getRecvData(sk)
                                try:
                                    resultJSON = json.loads(result)
                                    if resultJSON["type"] == 'heart':
                                        # 记录活跃客户端
                                        socket_manager.ClientSocketManager.heart(resultJSON['client_id'])
                                except json.decoder.JSONDecodeError as e:
                                    print("JSON解析出错", result, header)
                                    if not result:
                                        sk.close()
                                        break
                                time.sleep(1)
                        except ConnectionResetError as ee:
                            socket_manager.ClientSocketManager.del_client(rid)
                        except Exception as e:
                            logging.exception(e)
                    elif data_json["type"] == "response":
                        # 主动触发的响应
                        try:
                            client_id = data_json["client_id"]
                            # hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}")
                            # # 设置响应内容
                            hosting_api_util.set_response(client_id, data_json["request_id"], data_json['data'])
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "l2_subscript_codes":
                        # 设置订阅的代码
                        try:
                            data = data_json["data"]
                            datas = data["data"]
                            print("l2_subscript_codes", data_json)
                            global_data_cache_util.huaxin_subscript_codes = datas
                            global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str()
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "redis":
                        try:
                            data = data_json["data"]
                            ctype = data["ctype"]
                            result_str = ''
                            if ctype == "queue_size":
                                # TODO 设置队列大小
                                result_str = json.dumps({"code": 0})
                            elif ctype == "cmd":
                                data = data["data"]
                                db = data["db"]
                                cmd = data["cmd"]
                                key = data["key"]
                                args = data.get("args")
                                redis = RedisManager(db).getRedis()
                                method = getattr(RedisUtils, cmd)
                                args_ = [redis, key]
                                if args is not None:
                                    if type(args) == tuple or type(args) == list:
                                        args = list(args)
                                        for a in args:
                                            args_.append(a)
                                    else:
                                        args_.append(args)
                                args_ = tuple(args_)
                                result = method(*args_)
                                if type(result) == set:
                                    result = list(result)
                                result_str = json.dumps({"code": 0, "data": result})
                            elif ctype == "cmds":
                    thread_id = random.randint(0, 1000000)
                    logger_request_debug.info(f"middle_server 请求开始({thread_id}):{data_json.get('type')}")
                    try:
                        if data_json["type"] == 'register':
                            client_type = data_json["data"]["client_type"]
                            rid = data_json["rid"]
                            socket_manager.ClientSocketManager.add_client(client_type, rid, sk)
                            sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8'))
                            try:
                                # print("客户端", ClientSocketManager.socket_client_dict)
                                while True:
                                    result, header = self.getRecvData(sk)
                                    try:
                                        resultJSON = json.loads(result)
                                        if resultJSON["type"] == 'heart':
                                            # 记录活跃客户端
                                            socket_manager.ClientSocketManager.heart(resultJSON['client_id'])
                                    except json.decoder.JSONDecodeError as e:
                                        print("JSON解析出错", result, header)
                                        if not result:
                                            sk.close()
                                            break
                                    time.sleep(1)
                            except ConnectionResetError as ee:
                                socket_manager.ClientSocketManager.del_client(rid)
                            except Exception as e:
                                logging.exception(e)
                        elif data_json["type"] == "response":
                            # 主动触发的响应
                            try:
                                client_id = data_json["client_id"]
                                # hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}")
                                # # 设置响应内容
                                hosting_api_util.set_response(client_id, data_json["request_id"], data_json['data'])
                            finally:
                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                        elif data_json["type"] == "l2_subscript_codes":
                            # 设置订阅的代码
                            try:
                                data = data_json["data"]
                                datas = data["data"]
                                result_list=[]
                                for d in datas:
                                    db = d["db"]
                                    cmd = d["cmd"]
                                    key = d["key"]
                                    args = d.get("args")
                                print("l2_subscript_codes", data_json)
                                global_data_cache_util.huaxin_subscript_codes = datas
                                global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str()
                            finally:
                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                        elif data_json["type"] == "redis":
                            try:
                                data = data_json["data"]
                                ctype = data["ctype"]
                                result_str = ''
                                if ctype == "queue_size":
                                    # TODO 设置队列大小
                                    result_str = json.dumps({"code": 0})
                                elif ctype == "cmd":
                                    data = data["data"]
                                    db = data["db"]
                                    cmd = data["cmd"]
                                    key = data["key"]
                                    args = data.get("args")
                                    redis = RedisManager(db).getRedis()
                                    method = getattr(RedisUtils, cmd)
                                    args_ = [redis, key]
@@ -191,76 +168,100 @@
                                    result = method(*args_)
                                    if type(result) == set:
                                        result = list(result)
                                    result_list.append(result)
                                result_str = json.dumps({"code": 0, "data": result_list})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        except Exception as e:
                            logger_debug.exception(e)
                            logger_debug.info(f"Redis操作出错:data_json:{data_json}")
                            logging.exception(e)
                            sk.sendall(socket_util.load_header(
                                json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                    elif data_json["type"] == "mysql":
                        try:
                                    result_str = json.dumps({"code": 0, "data": result})
                                elif ctype == "cmds":
                                    datas = data["data"]
                                    result_list=[]
                                    for d in datas:
                                        db = d["db"]
                                        cmd = d["cmd"]
                                        key = d["key"]
                                        args = d.get("args")
                                        redis = RedisManager(db).getRedis()
                                        method = getattr(RedisUtils, cmd)
                                        args_ = [redis, key]
                                        if args is not None:
                                            if type(args) == tuple or type(args) == list:
                                                args = list(args)
                                                for a in args:
                                                    args_.append(a)
                                            else:
                                                args_.append(args)
                                        args_ = tuple(args_)
                                        result = method(*args_)
                                        if type(result) == set:
                                            result = list(result)
                                        result_list.append(result)
                                    result_str = json.dumps({"code": 0, "data": result_list})
                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            except Exception as e:
                                logger_debug.exception(e)
                                logger_debug.info(f"Redis操作出错:data_json:{data_json}")
                                logging.exception(e)
                                sk.sendall(socket_util.load_header(
                                    json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                        elif data_json["type"] == "mysql":
                            try:
                                data = data_json["data"]
                                data = data["data"]
                                db = data["db"]
                                cmd = data["cmd"]
                                args = data.get("args")
                                mysql = mysql_data.Mysqldb()
                                method = getattr(mysql, cmd)
                                args_ = []
                                if args:
                                    if type(args) == tuple or type(args) == list:
                                        args_ = list(args)
                                    else:
                                        args_.append(args)
                                args_ = tuple(args_)
                                result = method(*args_)
                                result_str = json.dumps({"code": 0, "data": result}, default=str)
                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            except Exception as e:
                                logging.exception(e)
                                sk.sendall(socket_util.load_header(
                                    json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                        elif data_json["type"] == "juejin":
                            # 掘金请求
                            try:
                                data = data_json["data"]
                                data = data["data"]
                                path_ = data["path"]
                                params = data.get("params")
                                result = JueJinHttpApi.request(path_, params)
                                result_str = json.dumps({"code": 0, "data": result})
                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            except Exception as e:
                                logging.exception(e)
                                sk.sendall(socket_util.load_header(
                                    json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                        elif data_json["type"] == "kpl":
                            # 开盘啦请求
                            try:
                                data = data_json["data"]
                                data = data["data"]
                                url = data["url"]
                                data_ = data.get("data")
                                result = kpl_api_util.request(url, data_)
                                result_str = json.dumps({"code": 0, "data": result})
                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            except Exception as e:
                                logging.exception(e)
                                sk.sendall(socket_util.load_header(
                                    json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                        elif data_json["type"] == "kp_msg":
                            # 看盘消息
                            data = data_json["data"]
                            data = data["data"]
                            db = data["db"]
                            cmd = data["cmd"]
                            args = data.get("args")
                            mysql = mysql_data.Mysqldb()
                            method = getattr(mysql, cmd)
                            args_ = []
                            if args:
                                if type(args) == tuple or type(args) == list:
                                    args_ = list(args)
                                else:
                                    args_.append(args)
                            args_ = tuple(args_)
                            result = method(*args_)
                            result_str = json.dumps({"code": 0, "data": result}, default=str)
                            msg = data["msg"]
                            kp_client_msg_manager.add_msg(msg)
                            result_str = json.dumps({"code": 0, "data": {}})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        except Exception as e:
                            logging.exception(e)
                            sk.sendall(socket_util.load_header(
                                json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                    elif data_json["type"] == "juejin":
                        # 掘金请求
                        try:
                            data = data_json["data"]
                            data = data["data"]
                            path_ = data["path"]
                            params = data.get("params")
                            result = JueJinHttpApi.request(path_, params)
                            result_str = json.dumps({"code": 0, "data": result})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        except Exception as e:
                            logging.exception(e)
                            sk.sendall(socket_util.load_header(
                                json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                    elif data_json["type"] == "kpl":
                        # 开盘啦请求
                        try:
                            data = data_json["data"]
                            data = data["data"]
                            url = data["url"]
                            data_ = data.get("data")
                            result = kpl_api_util.request(url, data_)
                            result_str = json.dumps({"code": 0, "data": result})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        except Exception as e:
                            logging.exception(e)
                            sk.sendall(socket_util.load_header(
                                json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                    elif data_json["type"] == "kp_msg":
                        # 看盘消息
                        data = data_json["data"]
                        data = data["data"]
                        msg = data["msg"]
                        kp_client_msg_manager.add_msg(msg)
                        result_str = json.dumps({"code": 0, "data": {}})
                        sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        pass
                            pass
                    finally:
                        logger_request_debug.info(f"middle_server 请求结束({thread_id}):{data_json.get('type')}")
                else:
                    # 断开连接
                    break
socket_manager.py
@@ -6,6 +6,7 @@
class ClientSocketManager:
    # 客户端类型
    CLIENT_TYPE_COMMON = "common"
    CLIENT_TYPE_TRADE = "trade"
    socket_client_dict = {}
@@ -14,7 +15,7 @@
    @classmethod
    def add_client(cls, _type, rid, sk):
        if _type == cls.CLIENT_TYPE_TRADE:
        if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE:
            # 交易列表
            if _type not in cls.socket_client_dict:
                cls.socket_client_dict[_type] = []
@@ -26,7 +27,7 @@
    @classmethod
    def acquire_client(cls, _type):
        if _type == cls.CLIENT_TYPE_TRADE:
        if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE:
            if _type in cls.socket_client_dict:
                # 根据排序活跃时间排序
                client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[
@@ -96,8 +97,10 @@
                cls.del_client(k)
    @classmethod
    def list_client(cls):
        _type = cls.CLIENT_TYPE_TRADE
    def list_client(cls, type_=None):
        _type = type_
        if not _type:
            _type = cls.CLIENT_TYPE_TRADE
        client_list = sorted(cls.socket_client_dict[_type],
                             key=lambda x: cls.active_client_dict.get(x[0]) if x[0] in cls.active_client_dict else 0,
                             reverse=True)
@@ -108,6 +111,5 @@
                active_time = 0
            active_time = tool.to_time_str(int(active_time))
            fdata.append(
                (client[0], cls.socket_client_lock_dict[client[0]].locked(),active_time))
                (client[0], cls.socket_client_lock_dict[client[0]].locked(), active_time))
        return fdata
third_data/kpl_api.py
@@ -27,13 +27,29 @@
    return response.text
def daBanList(pidType):
    data = "Order=1&a=DaBanList&st=100&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23" \
           f"&VerSion=5.8.0.2&Index=0&Is_st=1&PidType={pidType}&apiv=w32&Type=4&FilterMotherboard=0&Filter=0&FilterTIB=0" \
def daBanList(pidType, page_size=50, index=0):
    data = f"Order=1&a=DaBanList&st={page_size}&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23" \
           f"&VerSion=5.8.0.2&Index={index}&Is_st=1&PidType={pidType}&apiv=w32&Type=4&FilterMotherboard=0&Filter=0&FilterTIB=0" \
           "&FilterGem=0 "
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    return result
def getLimitUpInfo():
    """Fetch the limit-up list page by page and return it as one JSON string.

    Calls ``daBanList`` repeatedly with ``DABAN_TYPE_LIMIT_UP``, passing the
    number of rows collected so far as ``index``, and concatenates the
    ``"list"`` field of every response.

    Paging stops when a page comes back shorter than ``page_size`` or when
    more than ``MAX_SIZE`` rows have been collected. In both cases the last
    response is returned with its ``"list"`` replaced by the accumulated rows,
    so the other fields of the payload come from the final page.

    Returns:
        A JSON string with the merged ``"list"``, or ``None`` if neither stop
        condition was met within 10 requests.
    """
    list_ = []
    page_size = 50
    MAX_SIZE = 150
    for _ in range(10):
        result_str = daBanList(DABAN_TYPE_LIMIT_UP, page_size=page_size, index=len(list_))
        result = json.loads(result_str)
        temp_list = result["list"]
        list_ += temp_list
        # Both exit paths must hand back the accumulated rows. Previously the
        # size-cap path returned only the last page that was fetched.
        if len(temp_list) < page_size or len(list_) > MAX_SIZE:
            result["list"] = list_
            return json.dumps(result)
    return None
# 市场行情-行业
def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
third_data/kpl_data_manager.py
@@ -7,6 +7,7 @@
import constant
from db.redis_manager import RedisUtils
from log import logger_kpl_limit_up_reason_change
from utils import tool
# 开盘啦历史涨停数据管理
@@ -325,7 +326,7 @@
        while True:
            if tool.is_trade_time():
                try:
                    results = kpl_api.daBanList(kpl_api.DABAN_TYPE_LIMIT_UP)
                    results = kpl_api.getLimitUpInfo()
                    result = json.loads(results)
                    __upload_data("limit_up", result)
                except Exception as e:
@@ -367,7 +368,7 @@
            time.sleep(3)
    threading.Thread(target=get_limit_up, daemon=True).start()
    threading.Thread(target=get_bidding_money, daemon=True).start()
    # threading.Thread(target=get_bidding_money, daemon=True).start()
    threading.Thread(target=get_market_industry, daemon=True).start()
    threading.Thread(target=get_market_jingxuan, daemon=True).start()
utils/hosting_api_util.py
@@ -23,7 +23,7 @@
CODE_LIST_BLACK = "black"
CODE_LIST_WANT = "want"
CODE_LIST_PAUSE_BUY = "pause_buy"
CODE_LIST_MUST_BUY = "must_buy"
# 类型
API_TYPE_TRADE = "trade"  # 交易
API_TYPE_TRADE_STATE = "trade_state"  # 交易状态
@@ -156,7 +156,7 @@
# 设置交易状态
def set_trade_state(state, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_TRADE_STATE, "operate": OPERRATE_SET,
                                    "state": state,
                                    "sinfo": f"cb_{API_TYPE_TRADE_STATE}_{round(time.time() * 1000)}"})
@@ -165,7 +165,7 @@
# 获取交易状态
def get_trade_state(blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_TRADE_STATE, "operate": OPERRATE_GET,
                                    "sinfo": f"cb_{API_TYPE_TRADE_STATE}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
@@ -173,7 +173,7 @@
# 设置交易模式
def set_trade_mode(mode, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_TRADE_MODE, "operate": OPERRATE_SET, "mode": mode,
                                    "sinfo": f"cb_{API_TYPE_TRADE_MODE}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
@@ -181,7 +181,7 @@
# 获取交易模式
def get_trade_mode(blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_TRADE_MODE, "operate": OPERRATE_GET,
                                    "sinfo": f"cb_{API_TYPE_TRADE_MODE}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
@@ -189,7 +189,7 @@
# -----代码名单操作----
def add_code_list(code, code_list_type, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_CODE_LIST, "code_list_type": code_list_type, "code": code,
                                    "operate": OPERRATE_SET,
                                    "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}"})
@@ -197,7 +197,7 @@
def remove_code_list(code, code_list_type, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_CODE_LIST, "code_list_type": code_list_type, "code": code,
                                    "operate": OPERRATE_DELETE,
                                    "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}"})
@@ -205,7 +205,7 @@
def get_code_list(code_list_type, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_CODE_LIST, "code_list_type": code_list_type,
                                    "operate": OPERRATE_GET,
                                    "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}"})
@@ -214,7 +214,7 @@
# -----Export L2 data----
# Sends an API_TYPE_EXPORT_L2 request carrying `code` (under the "code" key)
# through __request and returns the result of __read_response for that
# request id. "sinfo" tags the request as cb_<type>_<current time in ms>.
# `blocking` is forwarded unchanged to __read_response.
def export_l2_data(code, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_EXPORT_L2, "code": code,
                                    "sinfo": f"cb_{API_TYPE_EXPORT_L2}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
@@ -222,7 +222,7 @@
# -----Daily initialization----
# Sends an API_TYPE_INIT request (no extra fields besides "sinfo") through
# __request and returns the result of __read_response for that request id.
# "sinfo" tags the request as cb_<type>_<current time in milliseconds>.
# `blocking` is forwarded unchanged to __read_response.
def everyday_init(blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_INIT,
                                    "sinfo": f"cb_{API_TYPE_INIT}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
@@ -230,7 +230,7 @@
# Refresh trade data.
# Sends an API_TYPE_REFRESH_TRADE_DATA request through __request; the `type`
# argument is transmitted under the "ctype" key because "type" already holds
# the API type. Returns the result of __read_response for that request id.
# "sinfo" tags the request as cb_<type>_<current time in milliseconds>.
# `blocking` is forwarded unchanged to __read_response.
# NOTE(review): the parameter name `type` shadows the builtin inside this
# function; renaming it would break keyword callers, so it is left as-is.
def refresh_trade_data(type, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_REFRESH_TRADE_DATA, "ctype": type,
                                    "sinfo": f"cb_{API_TYPE_REFRESH_TRADE_DATA}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
@@ -238,7 +238,7 @@
# Get the attributes of a code.
# Sends an API_TYPE_CODE_ATRRIBUTE request carrying `code` through __request
# and returns the result of __read_response for that request id.
# "sinfo" tags the request as cb_<type>_<current time in milliseconds>.
# `blocking` is forwarded unchanged to __read_response.
def get_code_attribute(code, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_CODE_ATRRIBUTE, "code": code,
                                    "sinfo": f"cb_{API_TYPE_CODE_ATRRIBUTE}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
@@ -246,7 +246,7 @@
# Get the trade state of a code.
# Sends an API_TYPE_CODE_TRADE_STATE request carrying `code` through __request
# and returns the result of __read_response for that request id.
# `blocking` is forwarded unchanged to __read_response.
# NOTE(review): unlike the other request helpers in this module, the "sinfo"
# value here is <type>_<ms timestamp> without the "cb_" prefix — confirm
# whether the receiver depends on that prefix before changing it.
def get_code_trade_state(code, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_CODE_TRADE_STATE, "code": code,
                                    "sinfo": f"{API_TYPE_CODE_TRADE_STATE}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
@@ -254,7 +254,7 @@
# Get environment information.
# Sends an API_TYPE_GET_ENV request (no extra fields besides "sinfo") through
# __request and returns the result of __read_response for that request id.
# "sinfo" tags the request as cb_<type>_<current time in milliseconds>.
# `blocking` is forwarded unchanged to __read_response.
def get_env_info(blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_GET_ENV,
                                    "sinfo": f"cb_{API_TYPE_GET_ENV}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
@@ -262,7 +262,7 @@
# Sends an API_TYPE_SYNC_L1_TARGET_CODES request (no extra fields besides
# "sinfo") through __request and returns the result of __read_response for
# that request id. "sinfo" tags the request as cb_<type>_<current time in ms>.
# `blocking` is forwarded unchanged to __read_response.
# NOTE(review): the previous header comment said "get environment info",
# apparently copied from get_env_info; judging by the names this presumably
# triggers a sync of the L1 subscribed target codes — confirm with the receiver.
def sync_l1_subscript_codes(blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_SYNC_L1_TARGET_CODES,
                                    "sinfo": f"cb_{API_TYPE_SYNC_L1_TARGET_CODES}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
@@ -270,7 +270,7 @@
# Get system logs.
# Sends an API_TYPE_SYSTEM_LOG request with `start_index` and `count` (passed
# through as-is under keys of the same names) via __request and returns the
# result of __read_response for that request id.
# "sinfo" tags the request as cb_<type>_<current time in milliseconds>.
# `blocking` is forwarded unchanged to __read_response.
def get_system_logs(start_index, count, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_SYSTEM_LOG, "start_index": start_index, "count": count,
                                    "sinfo": f"cb_{API_TYPE_SYSTEM_LOG}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
@@ -278,7 +278,7 @@
# Pull content from data_server.
# Sends an API_TYPE_GET_FROM_DATA_SERVER request with `path` and `params`
# (passed through as-is under keys of the same names) via __request and
# returns the result of __read_response for that request id.
# "sinfo" tags the request as cb_<type>_<current time in milliseconds>.
# `blocking` is forwarded unchanged to __read_response, together with
# timeout=30 (units are defined by __read_response, not visible here).
def get_from_data_server(path, params, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_GET_FROM_DATA_SERVER, "path": path, "params": params,
                                    "sinfo": f"cb_{API_TYPE_GET_FROM_DATA_SERVER}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking, timeout=30)
@@ -286,7 +286,7 @@
# Get the trade information of a code.
# Sends an API_TYPE_CODE_TRADE_INFO request carrying `code` through __request
# and returns the result of __read_response for that request id.
# "sinfo" tags the request as cb_<type>_<current time in milliseconds>.
# `blocking` is forwarded unchanged to __read_response, together with
# timeout=30 (units are defined by __read_response, not visible here).
def get_code_trade_info(code, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_CODE_TRADE_INFO, "code": code,
                                    "sinfo": f"cb_{API_TYPE_CODE_TRADE_INFO}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking, timeout=30)
@@ -294,7 +294,7 @@
# Number of active L2 listeners.
# Sends an API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT request (no extra fields
# besides "sinfo") through __request and returns the result of
# __read_response for that request id.
# "sinfo" tags the request as cb_<type>_<current time in milliseconds>.
# `blocking` is forwarded unchanged to __read_response, together with
# timeout=30 (units are defined by __read_response, not visible here).
def get_l2_listen_active_count(blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT,
                                    "sinfo": f"cb_{API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking, timeout=30)
@@ -302,7 +302,7 @@
# Save the running data.
# Sends an API_TYPE_SAVE_RUNNING_DATA request (no extra fields besides
# "sinfo") through __request and returns the result of __read_response for
# that request id.
# "sinfo" tags the request as cb_<type>_<current time in milliseconds>.
# `blocking` is forwarded unchanged to __read_response, together with
# timeout=30 (units are defined by __read_response, not visible here).
def save_running_data(blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_SAVE_RUNNING_DATA,
                                    "sinfo": f"cb_{API_TYPE_SAVE_RUNNING_DATA}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking, timeout=30)
@@ -310,10 +310,10 @@
# Sell-rule request.
# Sends an API_TYPE_SELL_RULE request with `operate` and `data` (passed
# through as-is under keys of the same names) via __request and returns the
# result of __read_response for that request id.
# "sinfo" tags the request as cb_<type>_<current time in milliseconds>.
# `blocking` is forwarded unchanged to __read_response.
# NOTE(review): the previous header comment said "save running data",
# apparently copied from save_running_data. The accepted `operate` values and
# the schema of `data` are not visible here — confirm with the receiver.
# NOTE(review): `data={}` is a mutable default shared across calls; this
# function only places it into the payload and does not mutate it itself.
def sell_rule(operate, data={}, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
                                   {"type": API_TYPE_SELL_RULE, "operate": operate, "data": data,
                                    "sinfo": f"cb_{API_TYPE_SELL_RULE}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking, timeout=30)
    return __read_response(client, request_id, blocking, timeout=10)
# 获取代码持仓信息
@@ -321,7 +321,7 @@
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                                   {"type": API_TYPE_GET_CODE_POSITION_INFO, "code": code,
                                    "sinfo": f"cb_{API_TYPE_GET_CODE_POSITION_INFO}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking, timeout=30)
    return __read_response(client, request_id, blocking)
def common_request(params={}, blocking=True):
@@ -330,8 +330,8 @@
    if params:
        for k in params:
            data[k] = params[k]
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, data)
    return __read_response(client, request_id, blocking, timeout=30)
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, data)
    return __read_response(client, request_id, blocking, timeout=10)
if __name__ == "__main__":