admin
2025-06-04 287c506725b2d970f721f80169f83c2418cb0991
添加新版低吸中间服务器
8个文件已修改
1个文件已添加
149 ■■■■■ 已修改文件
low_suction_proxy_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_api_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_ls_api_server.py 93 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_server.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
socket_manager.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/global_data_cache_util.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/hosting_api_util.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/huaxin_trade_record_manager.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
low_suction_proxy_server.py
@@ -19,8 +19,8 @@
logger = logging.getLogger("http.server")
logger.setLevel(logging.CRITICAL)
# 183.234.94.164/125.93.72.196
REAL_HOST, REAL_PORT = "183.234.94.164", 12881
# 183.234.94.163/125.93.72.195
REAL_HOST, REAL_PORT = "183.234.94.163", 12881
class DataServer(BaseHTTPRequestHandler):
main.py
@@ -5,9 +5,12 @@
import data_server
import middle_api_server
import middle_cb_api_server
import middle_ls_api_server
import middle_server
from log_module import async_log_util
# cd /usr/local/middle_server_source/gp-server
# PYTHONPATH=../../gp_server_source/lib python3 main.py
if __name__ == "__main__":
    t1 = threading.Thread(target=lambda: middle_api_server.run(), daemon=True)
    t1.start()
@@ -20,6 +23,8 @@
    # 可转债API端口为13008
    t1 = threading.Thread(target=lambda: middle_cb_api_server.run(13008), daemon=True)
    t1.start()
    t1 = threading.Thread(target=lambda: middle_ls_api_server.run(14008), daemon=True)
    t1.start()
    # t1 = threading.Thread(target=lambda: middle_l1_data_server.run(12881), daemon=True)
    # t1.start()
middle_api_server.py
@@ -200,7 +200,7 @@
                            params = data_json["data"]
                            ctype = params.get("ctype")
                            trade_sell_types = {"get_current_l1_codes", "get_positions", "get_l2_deal_price",
                                                "buy_cb_for_commission", "sell_cb_for_commission", "get_deal_queue"}
                                                "buy_cb_for_commission", "sell_cb_for_commission", "get_deal_queue", "auto_cancel_sell_mode"}
                            if ctype in trade_sell_types:
                                result = hosting_api_util.common_request(params,
                                                                         client_type=socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL)
@@ -266,7 +266,7 @@
                            fdata = []
                            try:
                                # 获取当前涨停比例
                                rate_results_dict = CodesLimitRateManager.get_price_rates(set([r[0] for r in fresults]))
                                rate_results_dict = global_data_cache_util.huaxin_subscript_codes_rate
                                for r in fresults:
                                    fdata.append(
                                        (r[0], r[1], rate_results_dict.get(r[0]) if r[0] in rate_results_dict else 0,
middle_ls_api_server.py
New file
@@ -0,0 +1,93 @@
import hashlib
import json
import logging
import socket
import socketserver
from log_module import request_log_util
from log_module.log import logger_request_debug
from utils import socket_util, hosting_api_util
"""
低吸外部接口
"""
class MyTCPServer(socketserver.TCPServer):
    """TCP server that binds and activates its socket as soon as it is constructed."""

    def __init__(self, server_address, RequestHandlerClass):
        # Hand straight over to the stock TCPServer, explicitly asking it to
        # bind and activate during construction.
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)
# For the asynchronous (threaded) form the ThreadingTCPServer has to be
# re-declared on top of our own server class as well.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """MyTCPServer combined with ``socketserver.ThreadingMixIn``."""
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Request handler for the low-suction external API.

    For each connection it reads one framed message from the client socket,
    forwards ``common`` requests to
    ``hosting_api_util.common_request_for_low_suction`` and always writes one
    framed reply before returning.
    """

    # Set to True by ``__init`` once the class-level state has been created.
    __inited = False

    def setup(self):
        """Make sure the class-level state exists before the request is handled."""
        self.__init()

    @classmethod
    def __init(cls):
        """Create the class-level state on first use.

        :return: ``True`` if the state had already been created, otherwise ``None``.
        """
        if not cls.__inited:
            cls.__inited = True
            cls.__req_socket_dict = {}
            return None
        return True

    def __is_sign_right(self, data_json):
        """Validate the MD5 signature carried in ``data_json``.

        The ``sign`` entry is removed from ``data_json`` (the dict is mutated).
        The remaining entries are rendered as ``key=value``, sorted, joined
        with ``&``, suffixed with a fixed secret string and MD5-hashed; the
        hex digest must equal the removed ``sign`` value.

        :param data_json: request dict that contains a ``sign`` entry
        :raises KeyError: if ``data_json`` has no ``sign`` entry
        :raises Exception: if the computed digest differs from ``sign``
        """
        # NOTE(review): nothing in this class calls this method; handle() uses
        # socket_util.is_client_params_sign_right instead.
        # NOTE(review): the signing secret is hard-coded in source.
        expected_sign = data_json.pop("sign")
        pairs = sorted(f"{key}={value}" for key, value in data_json.items())
        digest_input = "&".join(pairs) + "JiaBei@!*."
        actual_sign = hashlib.md5(digest_input.encode(encoding='utf-8')).hexdigest()
        if actual_sign != expected_sign:
            raise Exception("签名出错")

    def handle(self):
        """Serve one request on this connection and send back one framed reply.

        The reply body is:
        * the JSON-encoded result of the forwarded call for ``type == 'common'``;
        * ``{"code": 401, "msg": ...}`` when any ``Exception`` is raised while
          reading, parsing or forwarding the request;
        * an empty string otherwise (no data received, or an unhandled type).
        """
        # Peer address is read here but not used further yet.
        client_host = self.client_address[0]
        super().handle()
        conn: socket.socket = self.request
        reply = ""
        try:
            payload, _header = socket_util.recv_data(conn)
            if payload:
                request = json.loads(payload)
                request_type = request['type']
                try:
                    request_log_util.request_info("middle_ls_api_server", f"请求开始:{request_type}")
                    # NOTE(review): the returned value is not inspected here, so
                    # a False result would not reject the request - confirm
                    # whether the helper raises on a bad signature.
                    sign_ok = socket_util.is_client_params_sign_right(request)
                    # ------ client request interface ------
                    if request_type == 'common':
                        forwarded = hosting_api_util.common_request_for_low_suction(request["data"])
                        reply = json.dumps(forwarded)
                finally:
                    # The end-of-request log line is written on success and failure alike.
                    request_log_util.request_info("middle_ls_api_server", f"请求结束:{request_type}")
        except Exception as e:
            logging.exception(e)
            logger_request_debug.exception(e)
            reply = json.dumps({"code": 401, "msg": str(e)})
        finally:
            # Exactly one framed reply goes out, whatever happened above.
            conn.sendall(socket_util.load_header(reply.encode(encoding='utf-8')))

    def finish(self):
        """Defer to the base class clean-up."""
        super().finish()
def run(port):
    """Start the low-suction API server on ``0.0.0.0:port`` and serve until stopped.

    :param port: TCP port to listen on
    """
    print("create middle_ls_api_server")
    bind_address = ("0.0.0.0", port)
    print("middle_ls_api_server is at: http://%s:%d/" % bind_address)
    # Note: the handler argument is the MyBaseRequestHandle class itself.
    server = MyThreadingTCPServer(bind_address, MyBaseRequestHandle)
    server.serve_forever()
middle_server.py
@@ -161,6 +161,15 @@
                                global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str()
                            finally:
                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                        elif data_json["type"] == "l2_subscript_codes_rate":
                            # 设置订阅的代码的涨幅
                            try:
                                data = data_json["data"]
                                datas = data["data"]
                                # print("l2_subscript_codes", data_json)
                                global_data_cache_util.huaxin_subscript_codes_rate = datas
                            finally:
                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                        elif data_json["type"] == "l2_position_subscript_codes":
                            # 设置订阅的代码
                            try:
@@ -360,6 +369,13 @@
                                    result_str = json.dumps({"code": 1, "msg": str(e)})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            break
                        elif data_json["type"] == 'low_suction':
                            # TODO 低吸通道
                            datas = data_json["data"]
                            pass
                    except Exception as e:
                        log.logger_tuoguan_request_debug.exception(e)
                    finally:
socket_manager.py
@@ -9,6 +9,7 @@
    CLIENT_TYPE_COMMON = "common"
    CLIENT_TYPE_TRADE = "trade"
    CLIENT_TYPE_TRADE_SELL = "trade_sell"
    CLIENT_TYPE_TRADE_LOW_SUCTION = "trade_low_suction"
    # 可转债客户端
    CLIENT_TYPE_TRADE_CB = "trade_cb"
@@ -18,7 +19,8 @@
    @classmethod
    def add_client(cls, _type, rid, sk):
        if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE or _type == cls.CLIENT_TYPE_TRADE_SELL or _type == cls.CLIENT_TYPE_TRADE_CB:
        if _type in {cls.CLIENT_TYPE_COMMON, cls.CLIENT_TYPE_TRADE, cls.CLIENT_TYPE_TRADE_SELL,
                     cls.CLIENT_TYPE_TRADE_CB, cls.CLIENT_TYPE_TRADE_LOW_SUCTION}:
            # 交易列表
            if _type not in cls.socket_client_dict:
                cls.socket_client_dict[_type] = []
utils/global_data_cache_util.py
@@ -3,6 +3,7 @@
"""
huaxin_subscript_codes = []
huaxin_subscript_codes_update_time =None
huaxin_subscript_codes_rate = {}
huaxin_position_subscript_codes = []
huaxin_position_subscript_codes_update_time =None
utils/hosting_api_util.py
@@ -432,5 +432,21 @@
    return __read_response(client, request_id, blocking, timeout=10)
def common_request_for_low_suction(params, blocking=True):
    """
    Send a generic request through the low-suction trade client.

    :param params: optional mapping; every entry is copied onto the request
                   payload, overriding the default "type"/"sinfo" entries
                   when the same key is supplied
    :param blocking: passed through to the response reader
    :return: the value returned by the response reader (called with timeout=10)
    """
    stamp_ms = round(time.time() * 1000)
    payload = {
        "type": API_TYPE_COMMON_REQUEST,
        # NOTE(review): the "cb_" prefix looks borrowed from the
        # convertible-bond naming - confirm it is intended for low-suction.
        "sinfo": f"cb_{API_TYPE_COMMON_REQUEST}_{stamp_ms}",
    }
    # Caller-supplied entries are applied last so they take precedence.
    for key in params or ():
        payload[key] = params[key]
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE_LOW_SUCTION, payload)
    return __read_response(client, request_id, blocking, timeout=10)
if __name__ == "__main__":
    pass
utils/huaxin_trade_record_manager.py
@@ -17,7 +17,7 @@
class DelegateRecordManager:
    key_list = ["id", "orderLocalID", "securityID", "securityName", "direction", "orderSysID", "insertTime",
                "insertDate", "acceptTime", "cancelTime", "limitPrice", "turnover", "volume", "volumeTraded",
                "orderStatus", "orderSubmitStatus", "statusMsg", "createTime", "updateTime", "accountID", "orderRef"]
                "orderStatus", "orderSubmitStatus", "statusMsg", "createTime", "updateTime", "accountID", "orderRef", "sinfo"]
    @classmethod
    def add(cls, datas):
@@ -34,12 +34,12 @@
                        nameDict = HistoryKDatasUtils.get_gp_codes_names([d['securityID']])
                        name = nameDict.get(d['securityID'])
                        mysqldb.execute(
                            "insert into hx_trade_delegate_record values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" % (
                            "insert into hx_trade_delegate_record values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" % (
                                _id, d["orderLocalID"], d["securityID"], name, d["direction"],
                                d["orderSysID"], d["insertTime"], d["insertDate"], d["acceptTime"], d["cancelTime"],
                                d["limitPrice"], d["turnover"], d["volume"], d["volumeTraded"], d["orderStatus"],
                                d["orderSubmitStatus"], d["statusMsg"], tool.get_now_datetime_str(),
                                tool.get_now_datetime_str(), d["accountID"]))
                                tool.get_now_datetime_str(), d["accountID"], d["orderRef"], d["sinfo"]))
                    else:
                        # 修改数据
                        updateDict = {}