admin
2024-05-16 726e083845c4cfe2d50f381ee09336136be3e01e
可转债接口
7个文件已修改
2个文件已添加
484 ■■■■■ 已修改文件
main.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_api_server.py 40 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_cb_api_server.py 179 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_l1_data_server.py 151 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_server.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
socket_manager.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/global_data_cache_util.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/hosting_api_util.py 80 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/kpl_api_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py
@@ -4,6 +4,7 @@
import data_server
import log
import middle_api_server
import middle_cb_api_server
import middle_server
if __name__ == "__main__":
@@ -15,4 +16,9 @@
    t1.start()
    t1 = threading.Thread(target=lambda: middle_server.run(12880), daemon=True)
    t1.start()
    # 可转债API端口为13008
    t1 = threading.Thread(target=lambda: middle_cb_api_server.run(13008), daemon=True)
    t1.start()
    # t1 = threading.Thread(target=lambda: middle_l1_data_server.run(12881), daemon=True)
    # t1.start()
    middle_server.run()
middle_api_server.py
@@ -14,6 +14,7 @@
from db import mysql_data, redis_manager
from db.redis_manager import RedisUtils
from log import logger_request_debug
from middle_l1_data_server import L1DataManager
from output import push_msg_manager
from utils import socket_util, hosting_api_util, huaxin_trade_record_manager, huaxin_util, tool, global_data_cache_util
from utils.history_k_data_util import HistoryKDatasUtils, JueJinApi
@@ -158,11 +159,14 @@
                            break
                        elif type_ == 'common':
                            # 验证签名
                            # if not is_sign_right:
                            #    raise Exception("签名错误")
                            params = data_json["data"]
                            result = hosting_api_util.common_request(params)
                            ctype = params.get("ctype")
                            trade_sell_types = {"get_current_l1_codes", "get_positions", "get_l2_deal_price"}
                            if ctype in trade_sell_types:
                                result = hosting_api_util.common_request(params,
                                                                         client_type=socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL)
                            else:
                                result = hosting_api_util.common_request(params)
                            return_str = json.dumps(result)
                            break
@@ -213,12 +217,25 @@
                            # 同步交易数据
                            sync_type = data_json["data"]["type"]
                            hosting_api_util.refresh_trade_data(sync_type)
                            hosting_api_util.refresh_trade_data(sync_type, blocking=False,
                                                                client_type=socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL)
                            return_str = json.dumps(
                                {"code": 0, "data": {}, "msg": ""})
                        elif type_ == "get_huaxin_subscript_codes":
                            # 获取华鑫订阅的代码
                            fresults = global_data_cache_util.huaxin_subscript_codes
                            update_time = global_data_cache_util.huaxin_subscript_codes_update_time
                            if update_time is None:
                                update_time = ''
                            return_str = json.dumps(
                                {"code": 0,
                                 "data": {"count": len(fresults), "list": fresults, "update_time": update_time},
                                 "msg": ""})
                            pass
                        elif type_ == "get_huaxin_position_subscript_codes":
                            # 获取华鑫订阅的代码
                            fresults = global_data_cache_util.huaxin_position_subscript_codes
                            update_time = global_data_cache_util.huaxin_position_subscript_codes_update_time
                            if update_time is None:
                                update_time = ''
                            return_str = json.dumps(
@@ -294,7 +311,7 @@
                            return_str = json.dumps(result)
                        elif type_ == "trade_server_channels":
                            trade_channels = socket_manager.ClientSocketManager.list_client(
                                socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE)
                                socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL)
                            common_channels = socket_manager.ClientSocketManager.list_client(
                                socket_manager.ClientSocketManager.CLIENT_TYPE_COMMON)
                            data = {}
@@ -336,10 +353,6 @@
                            code = data_json["data"]["code"]
                            result = hosting_api_util.get_code_position_info(code)
                            return_str = json.dumps(result)
                        elif type_ == "common":
                            params = data_json["data"]
                            result = hosting_api_util.common_request(params)
                            return_str = json.dumps(result)
                        elif type_ == "register_msg_receiver":
                            params = data_json["data"]
                            msg_types = params["types"]
@@ -375,12 +388,21 @@
                                    continue
                                fresults.append(temp)
                                return_str = json.dumps(json.dumps({"code": 0, "data": fresults}))
                        elif type_ == "get_buy1_info":
                            code = data_json["data"]["code"]
                            result = hosting_api_util.common_request({"ctype": "get_buy1_info", "code": code},
                                                                     client_type=socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL)
                            return_str = json.dumps(result)
                        elif type_ == "get_l1_data":
                            results = L1DataManager.get_current_l1_data()
                            return_str = json.dumps({"code": 0, "data": results})
                    finally:
                        log.request_info("middle_api_server", f"请求结束:{type_}")
                break
                # sk.close()
            except Exception as e:
                logging.exception(e)
                logger_request_debug.exception(e)
                return_str = json.dumps({"code": 401, "msg": str(e)})
                break
            finally:
middle_cb_api_server.py
New file
@@ -0,0 +1,179 @@
import hashlib
import json
import logging
import socket
import socketserver
import log
from log import logger_request_debug
from utils import socket_util, hosting_api_util
"""
可转债外部接口
"""
class MyTCPServer(socketserver.TCPServer):
    """TCP server that always binds and activates its socket on construction."""

    def __init__(self, server_address, RequestHandlerClass):
        # Delegate to the stdlib server, requesting bind + activate explicitly.
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)
# For asynchronous (threaded) request handling the threading server has to be
# re-declared on top of MyTCPServer.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """MyTCPServer combined with socketserver.ThreadingMixIn."""
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Request handler for the convertible-bond (CB) external API.

    A request is read with ``socket_util.recv_data`` and parsed as a JSON
    object carrying a ``type`` field (and, for most types, a ``data`` object).
    The handler dispatches on ``type``, forwards the call through
    ``hosting_api_util`` and writes exactly one reply, wrapped by
    ``socket_util.load_header``, before leaving ``handle``.
    """

    __inited = False

    def setup(self):
        """Run the one-time class initialisation before handling a request."""
        self.__init()

    @classmethod
    def __init(cls):
        """Initialise class-level state once; returns True when already done."""
        if cls.__inited:
            return True
        cls.__inited = True
        cls.__req_socket_dict = {}

    def __is_sign_right(self, data_json):
        """Validate the MD5 signature carried in ``data_json["sign"]``.

        The signature is the MD5 hex digest of the sorted ``key=value`` pairs
        (every key except ``sign``) joined with ``&`` plus a fixed suffix.
        The request dict is left untouched.

        :param data_json: request dict containing a ``sign`` entry
        :raises KeyError: if ``sign`` is missing
        :raises Exception: if the signature does not match
        """
        sign = data_json["sign"]
        # Build the pairs without popping "sign" so the caller's dict is not mutated.
        pairs = sorted(f"{k}={v}" for k, v in data_json.items() if k != "sign")
        raw = "&".join(pairs) + "JiaBei@!*."
        md5 = hashlib.md5(raw.encode(encoding='utf-8')).hexdigest()
        if md5 != sign:
            raise Exception("签名出错")

    def handle(self):
        """Serve one request on the connection and send back one reply.

        Any exception raised while processing is logged and reported to the
        client as ``{"code": 401, "msg": <error text>}``.
        """
        super().handle()
        sk: socket.socket = self.request
        while True:
            # Reply payload; stays empty when no branch fills it in.
            return_str = ""
            try:
                data, _header = socket_util.recv_data(sk)
                if data:
                    data_json = json.loads(data)
                    type_ = data_json['type']
                    try:
                        log.request_info("middle_cb_api_server", f"请求开始:{type_}")
                        is_sign_right = socket_util.is_client_params_sign_right(data_json)
                        # ------ client-facing request types ------
                        if type_ == 'buy':
                            # verify signature
                            if not is_sign_right:
                                raise Exception("签名错误")
                            codes_data = data_json["data"]
                            code = codes_data["code"]
                            volume = codes_data["volume"]
                            price = codes_data["price"]
                            price_type = codes_data["price_type"]
                            if not code:
                                raise Exception("请上传code")
                            if not volume:
                                raise Exception("请上传volume")
                            # place the order
                            result = hosting_api_util.trade_order_for_cb(hosting_api_util.TRADE_DIRECTION_BUY, code,
                                                                         volume, price, price_type)
                            # NOTE(review): a falsy result leaves return_str empty, so the
                            # client receives an empty reply — confirm this is intended.
                            if result:
                                print("下单结果:", result)
                                if result['code'] == 0:
                                    return_str = json.dumps({"code": 0})
                                else:
                                    raise Exception(result['msg'])
                            break
                        elif type_ == 'cancel_order':
                            # verify signature
                            if not is_sign_right:
                                raise Exception("签名错误")
                            codes_data = data_json["data"]
                            code = codes_data["code"]
                            orderSysID = codes_data.get("orderSysID")
                            accountId = codes_data.get("accountId")
                            if code:
                                result = hosting_api_util.trade_cancel_order(hosting_api_util.TRADE_DIRECTION_BUY, code,
                                                                             accountId,
                                                                             orderSysID, True)
                                print("---撤单结果----")
                                print(result)
                                if result["code"] == 0:
                                    return_str = json.dumps({"code": 0})
                                else:
                                    raise Exception(result["msg"])
                            else:
                                return_str = json.dumps({"code": 1, "msg": "请上传代码"})
                            break
                        elif type_ == 'sell':
                            # verify signature
                            if not is_sign_right:
                                raise Exception("签名错误")
                            codes_data = data_json["data"]
                            code = codes_data["code"]
                            volume = codes_data["volume"]
                            price_type = codes_data["price_type"]
                            # Sell orders are sent with an empty price string.
                            result = hosting_api_util.trade_order_for_cb(hosting_api_util.TRADE_DIRECTION_SELL, code,
                                                                         volume,
                                                                         '', price_type=price_type)
                            if result["code"] == 0:
                                return_str = json.dumps(result)
                            else:
                                raise Exception(result["msg"])
                            print("---卖出结果----")
                            print(result)
                            break
                        elif type_ == 'get_code_position_info':
                            # verify signature
                            if not is_sign_right:
                                raise Exception("签名错误")
                            codes_data = data_json["data"]
                            code = codes_data["code"]
                            result = hosting_api_util.get_position_for_cb(code)
                            return_str = json.dumps(result)
                            break
                        elif type_ == 'get_account_money':
                            # verify signature
                            if not is_sign_right:
                                raise Exception("签名错误")
                            result = hosting_api_util.get_account_money_for_cb()
                            return_str = json.dumps(result)
                            break
                        elif type_ == 'common':
                            # NOTE(review): this branch does not check is_sign_right.
                            params = data_json["data"]
                            result = hosting_api_util.common_request_for_cb(params)
                            return_str = json.dumps(result)
                            break
                        # NOTE(review): an unrecognised type falls through with an empty reply.
                    finally:
                        log.request_info("middle_cb_api_server", f"请求结束:{type_}")
                break
                # sk.close()
            except Exception as e:
                logging.exception(e)
                logger_request_debug.exception(e)
                return_str = json.dumps({"code": 401, "msg": str(e)})
                break
            finally:
                # Runs on every exit path above, so exactly one reply is always sent.
                sk.sendall(socket_util.load_header(return_str.encode(encoding='utf-8')))

    def finish(self):
        super().finish()
def run(port):
    """Start the convertible-bond API server on all interfaces; blocks forever.

    :param port: TCP port to listen on
    """
    print("create middle_cb_api_server")
    bind_address = ("0.0.0.0", port)
    print("middle_cb_api_server is at: http://%s:%d/" % bind_address)
    # The handler class itself (not an instance) is handed to the server.
    server = MyThreadingTCPServer(bind_address, MyBaseRequestHandle)
    server.serve_forever()
# Standalone launch when this module is executed directly; uses port 9005,
# whereas main.py starts the same server on port 13008.
if __name__ == "__main__":
    run(9005)
middle_l1_data_server.py
New file
@@ -0,0 +1,151 @@
import builtins
import hashlib
import json
import logging
import queue
import random
import socket
import socketserver
import threading
import time
import constant
import log
import socket_manager
from db import mysql_data
from db.redis_manager import RedisUtils, RedisManager
from log import logger_debug, logger_request_debug
from output import push_msg_manager
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool
from utils.juejin_util import JueJinHttpApi
# Module-level queue; nothing in the visible part of this module reads or writes it.
# NOTE(review): possibly a leftover copied from another server module — confirm before removing.
trade_data_request_queue = queue.Queue()
class MyTCPServer(socketserver.TCPServer):
    """TCP server that always binds and activates its socket on construction."""

    def __init__(self, server_address, RequestHandlerClass):
        # Delegate to the stdlib server, requesting bind + activate explicitly.
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)
# For asynchronous (threaded) request handling the threading server has to be
# re-declared on top of MyTCPServer.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """MyTCPServer combined with socketserver.ThreadingMixIn."""
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Request handler that receives L1 data pushes.

    Messages are read with ``getRecvData`` and parsed as JSON; a message whose
    ``type`` is ``l1_data`` has its ``data`` stored via ``L1DataManager``.
    """

    __inited = False

    def setup(self):
        """Run the one-time class initialisation before handling a request."""
        self.__init()

    @classmethod
    def __init(cls):
        """Initialise class-level state once; returns True when already done."""
        if cls.__inited:
            return True
        cls.__inited = True
        cls.__req_socket_dict = {}

    def __is_sign_right(self, data_json):
        """Validate the MD5 signature carried in ``data_json["sign"]``.

        The signature is the MD5 hex digest of the sorted ``key=value`` pairs
        (every key except ``sign``) joined with ``&`` plus a fixed suffix.
        The request dict is left untouched.

        :param data_json: request dict containing a ``sign`` entry
        :raises KeyError: if ``sign`` is missing
        :raises Exception: if the signature does not match
        """
        sign = data_json["sign"]
        # Build the pairs without popping "sign" so the caller's dict is not mutated.
        pairs = sorted(f"{k}={v}" for k, v in data_json.items() if k != "sign")
        raw = "&".join(pairs) + "JiaBei@!*."
        md5 = hashlib.md5(raw.encode(encoding='utf-8')).hexdigest()
        if md5 != sign:
            raise Exception("签名出错")

    @classmethod
    def getRecvData(cls, skk):
        """Read one message from ``skk``.

        A message that starts with ``##`` has a 10-byte header: ``##`` followed
        by 8 characters parsed as the body length in bytes. Exactly that many
        body bytes are then read and decoded as UTF-8 in one go. Anything else
        is treated as unframed: one further ``recv`` of up to 1 MiB is appended
        to the bytes already read.

        :param skk: object exposing ``recv(bufsize) -> bytes``
        :return: ``(text, header_bytes)``; ``("", b"")`` when the peer has
            closed the connection before sending anything
        :raises ConnectionError: if the peer closes in the middle of a framed
            message
        """
        header_size = 10
        header = skk.recv(header_size)
        if not header:
            return "", header
        if header.startswith(b"##"):
            # recv may return fewer bytes than requested; complete the header first.
            while len(header) < header_size:
                more = skk.recv(header_size - len(header))
                if not more:
                    raise ConnectionError("connection closed while reading the message header")
                header += more
            content_length = int(header[2:header_size].decode('utf-8'))
            chunks = []
            remaining = content_length
            while remaining > 0:
                # Never ask for more than is left, so a following message is not consumed.
                chunk = skk.recv(min(remaining, 10240))
                if not chunk:
                    # An empty read means the peer is gone; looping again would spin forever.
                    raise ConnectionError("connection closed while reading the message body")
                chunks.append(chunk)
                remaining -= len(chunk)
            # Decode once over the whole body: a multi-byte character may straddle two reads.
            return b"".join(chunks).decode('utf-8'), header
        rest = skk.recv(1024 * 1024)
        return (header + rest).decode('utf-8'), header

    def handle(self):
        """Process messages until an ``l1_data`` push, a disconnect or an error."""
        super().handle()
        sk: socket.socket = self.request
        while True:
            try:
                data, _header = self.getRecvData(sk)
                if data:
                    try:
                        data_json = json.loads(data)
                    except json.decoder.JSONDecodeError:
                        # Tell the sender the payload was not valid JSON, then keep listening.
                        sk.sendall(socket_util.load_header(json.dumps(
                            {"code": 100, "msg": "JSON解析失败"}).encode(
                            encoding='utf-8')))
                        continue
                    # Looked up outside the inner try so a missing "type" ends the connection.
                    type_ = data_json["type"]
                    started_at = time.time()
                    try:
                        if type_ == 'l1_data':
                            datas = data_json["data"]
                            L1DataManager.add_datas(datas)
                            break
                    except Exception as e:
                        log.logger_tuoguan_request_debug.exception(e)
                    finally:
                        # Log requests that took longer than two seconds.
                        if time.time() - started_at > 2:
                            log.logger_tuoguan_request_debug.info(
                                f"耗时:{int(time.time() - started_at)}s  数据:{data_json}")
                else:
                    # peer disconnected
                    break
                # sk.close()
            except Exception as e:
                # log.logger_tuoguan_request_debug.exception(e)
                logging.exception(e)
                break

    def finish(self):
        super().finish()
# L1 data manager
class L1DataManager:
    """Class-level store of the most recent L1 row per code.

    Each row is a sequence laid out as
    (code, previous close, latest price, total volume, total turnover, update time).
    """

    # code -> latest row received for that code
    __l1_datas_dict = {}

    @classmethod
    def add_datas(cls, datas):
        """Store every row in ``datas``, replacing any earlier row for the same code.

        :param datas: iterable of rows; ``row[0]`` is used as the key
        """
        for data in datas:
            cls.__l1_datas_dict[data[0]] = data

    @classmethod
    def get_current_l1_data(cls):
        """Return a new list holding the stored row of every code, in insertion order."""
        # Take the snapshot in one call instead of looking keys up one by one while
        # iterating; other handler threads may be inserting rows at the same time.
        return list(cls.__l1_datas_dict.values())
def run(port):
    """Start the L1 data server on all interfaces; blocks forever.

    :param port: TCP port to listen on
    """
    print("create MiddleL1DataServer")
    bind_address = ("0.0.0.0", port)
    print("MiddleServer is at: http://%s:%d/" % bind_address)
    # The handler class itself (not an instance) is handed to the server.
    server = MyThreadingTCPServer(bind_address, MyBaseRequestHandle)
    server.serve_forever()
# NOTE(review): running this module directly only prints whether builtins.type("")
# equals str; it does not call run() to start the server.
if __name__ == "__main__":
    print(builtins.type("") == str)
middle_server.py
@@ -15,6 +15,7 @@
from db import mysql_data
from db.redis_manager import RedisUtils, RedisManager
from log import logger_debug, logger_request_debug
from middle_l1_data_server import L1DataManager
from output import push_msg_manager
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool
from utils.juejin_util import JueJinHttpApi
@@ -140,6 +141,16 @@
                                print("l2_subscript_codes", data_json)
                                global_data_cache_util.huaxin_subscript_codes = datas
                                global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str()
                            finally:
                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                        elif data_json["type"] == "l2_position_subscript_codes":
                            # 设置订阅的代码
                            try:
                                data = data_json["data"]
                                datas = data["data"]
                                print("l2_position_subscript_codes", data_json)
                                global_data_cache_util.huaxin_position_subscript_codes = datas
                                global_data_cache_util.huaxin_position_subscript_codes_update_time = tool.get_now_time_str()
                            finally:
                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                        elif data_json["type"] == "redis":
@@ -285,6 +296,10 @@
                            push_msg_manager.push_msg(_type, data)
                            result_str = json.dumps({"code": 0, "data": {}})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        elif data_json["type"] == 'l1_data':
                            datas = data_json["data"]
                            L1DataManager.add_datas(datas)
                            break
                    except Exception as e:
                        log.logger_tuoguan_request_debug.exception(e)
                    finally:
socket_manager.py
@@ -9,6 +9,8 @@
    CLIENT_TYPE_COMMON = "common"
    CLIENT_TYPE_TRADE = "trade"
    CLIENT_TYPE_TRADE_SELL = "trade_sell"
    # 可转债客户端
    CLIENT_TYPE_TRADE_CB = "trade_cb"
    socket_client_dict = {}
    socket_client_lock_dict = {}
@@ -16,7 +18,7 @@
    @classmethod
    def add_client(cls, _type, rid, sk):
        if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE:
        if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE or _type == cls.CLIENT_TYPE_TRADE_SELL:
            # 交易列表
            if _type not in cls.socket_client_dict:
                cls.socket_client_dict[_type] = []
@@ -28,7 +30,7 @@
    @classmethod
    def acquire_client(cls, _type):
        if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE:
        if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE or _type == cls.CLIENT_TYPE_TRADE_SELL:
            if _type in cls.socket_client_dict:
                # 根据排序活跃时间排序
                client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[
utils/global_data_cache_util.py
@@ -2,4 +2,7 @@
全局缓存
"""
huaxin_subscript_codes = []
huaxin_subscript_codes_update_time =None
huaxin_subscript_codes_update_time =None
huaxin_position_subscript_codes = []
huaxin_position_subscript_codes_update_time =None
utils/hosting_api_util.py
@@ -97,6 +97,12 @@
                    raise Exception(f"读取内容超时: request_id={request_id}")
        finally:
            pass
    else:
        try:
            # 如果不是阻塞请求需要释放客户端
            ClientSocketManager.release_client(client[0])
        except:
            pass
    return None
@@ -230,8 +236,8 @@
#  刷新交易数据
def refresh_trade_data(type, blocking=True):
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON,
def refresh_trade_data(type, blocking=True, client_type=ClientSocketManager.CLIENT_TYPE_COMMON):
    request_id, client = __request(client_type,
                                   {"type": API_TYPE_REFRESH_TRADE_DATA, "ctype": type,
                                    "sinfo": f"cb_{API_TYPE_REFRESH_TRADE_DATA}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
@@ -325,16 +331,78 @@
    return __read_response(client, request_id, blocking)
def common_request(params={}, blocking=True):
def common_request(params={}, blocking=True, client_type=ClientSocketManager.CLIENT_TYPE_COMMON):
    data = {"type": API_TYPE_COMMON_REQUEST,
            "sinfo": f"cb_{API_TYPE_COMMON_REQUEST}_{round(time.time() * 1000)}"}
    if params:
        for k in params:
            data[k] = params[k]
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, data)
    request_id, client = __request(client_type, data)
    return __read_response(client, request_id, blocking, timeout=10)
###############可转债#################
def trade_order_for_cb(direction, code, volume, price, price_type=2, blocking=True):
    """
    Send a convertible-bond order through a CLIENT_TYPE_TRADE_CB client.

    :param direction: 1 = buy, 2 = sell
    :param code: security code
    :param volume: order volume
    :param price: order price
    :param price_type: price type, forwarded unchanged
    :param blocking: whether to wait for the response
    :return: whatever __read_response yields for this request
    """
    order_payload = {
        "type": API_TYPE_TRADE,
        "trade_type": 1,
        "direction": direction,
        "code": code,
        "volume": volume,
        "price_type": price_type,
        "price": price,
        # Request tag: code plus the current time in milliseconds.
        "sinfo": f"order_{code}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE_CB, order_payload)
    return __read_response(client, request_id, blocking)
def get_position_for_cb(code=None, blocking=True):
    """
    Query position info through a CLIENT_TYPE_TRADE_CB client.

    :param code: security code to query (forwarded as-is, may be None)
    :param blocking: whether to wait for the response
    :return: whatever __read_response yields for this request
    """
    position_query = {
        "type": API_TYPE_GET_CODE_POSITION_INFO,
        "code": code,
        # Request tag: API type plus the current time in milliseconds.
        "sinfo": f"cb_{API_TYPE_GET_CODE_POSITION_INFO}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE_CB, position_query)
    return __read_response(client, request_id, blocking)
def get_account_money_for_cb(blocking=True):
    """
    Query the account's funds via the generic convertible-bond request.

    :param blocking: whether to wait for the response
    :return: the result of common_request_for_cb for ctype "get_account_money"
    """
    money_query = {"ctype": "get_account_money"}
    return common_request_for_cb(money_query, blocking=blocking)
def common_request_for_cb(params, blocking=True):
    """
    Send a generic request through a CLIENT_TYPE_TRADE_CB client.

    :param params: extra fields merged into the request; they override the
        default "type" / "sinfo" entries when the same keys are present
    :param blocking: whether to wait for the response
    :return: whatever __read_response yields (read with a 10 second timeout)
    """
    payload = {
        "type": API_TYPE_COMMON_REQUEST,
        "sinfo": f"cb_{API_TYPE_COMMON_REQUEST}_{round(time.time() * 1000)}",
    }
    if params:
        # Copy caller-supplied fields on top of the defaults.
        payload.update((key, params[key]) for key in params)
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE_CB, payload)
    return __read_response(client, request_id, blocking, timeout=10)
# Ad-hoc check when the module is run directly: prints the value that
# dict.pop returns for an existing key. It does not exercise any API helper.
if __name__ == "__main__":
    d = {"id": "123123"}
    print(d.pop("id"))
    pass
utils/kpl_api_util.py
@@ -8,7 +8,7 @@
    }
    # proxies={'https': '192.168.3.251:9002'}
    # 禁止代理,不然会走本地代理
    response = requests.post(url, data=data, headers=headers, proxies={"http": None, "https": None})
    response = requests.post(url, data=data, headers=headers, proxies={"http": None, "https": None}, timeout=10)
    if response.status_code != 200:
        raise Exception("请求出错")
    return response.text