Administrator
2023-07-11 022dea6d9f0044e25710638e13280bb23d89f51f
华鑫适配
1个文件已添加
317 ■■■■■ 已修改文件
trade/huaxin/trade_api_server.py 317 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_api_server.py
New file
@@ -0,0 +1,317 @@
import hashlib
import json
import logging
import queue
import random
import socket
import socketserver
import threading
import time
import constant
from l2 import l2_data_manager_new
from l2.huaxin import huaxin_target_codes_manager
from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_trade_debug
from third_data.history_k_data_util import HistoryKDatasUtils
from trade import trade_manager
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_record_manager
from utils import socket_util, tool, huaxin_util
# Queue of pending trade-data refresh requests. The "sync_trade_data" request
# branch enqueues {"type": ...} dicts here and the __read_trade_data_queue
# worker consumes them.
trade_data_request_queue = queue.Queue()
class MyTCPServer(socketserver.TCPServer):
    """TCP server that always binds and activates its socket on construction."""

    def __init__(self, server_address, RequestHandlerClass):
        # Delegate to the stock TCPServer, forcing immediate bind + listen.
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)
# For asynchronous handling, ThreadingTCPServer has to be re-defined as well.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """MyTCPServer combined with ThreadingMixIn (one thread per request)."""
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Per-connection request handler of the trade API TCP server.

    ``handle`` repeatedly reads one JSON request through
    ``socket_util.recv_data`` and routes it by its ``type`` field until the
    peer disconnects or a request raises.

    NOTE(review): several request types build a JSON reply string, but
    nothing in this class transmits it back to the client. The framing helper
    needed for sending is not visible here - confirm where replies are sent.
    """

    # Request types that are rejected unless the client signature is valid.
    _SIGNED_TYPES = ("buy", "cancel_order", "sell")
    # "sync_trade_data" sub-types that are forwarded to the refresh queue.
    _SYNC_TYPES = ("delegate_list", "deal_list", "money", "position_list")

    __inited = False

    def setup(self):
        """Run the one-time class initialisation before handling a request."""
        self.__init()

    @classmethod
    def __init(cls):
        """Initialise class-level state once.

        Returns:
            True when initialisation had already happened, otherwise None.
        """
        if cls.__inited:
            return True
        cls.__inited = True
        cls.__req_socket_dict = {}

    def __is_sign_right(self, data_json):
        """Verify the MD5 signature stored under ``data_json["sign"]``.

        The expected value is the MD5 hex digest of every other entry rendered
        as ``key=value``, sorted, joined with ``&`` and suffixed with the
        shared salt. The input dict is left untouched.

        Returns:
            True when the signature matches.

        Raises:
            KeyError: if ``data_json`` has no ``sign`` entry.
            Exception: if the signature does not match.
        """
        sign = data_json["sign"]
        pairs = sorted(f"{k}={v}" for k, v in data_json.items() if k != "sign")
        # NOTE(review): the salt is hard-coded in source; consider moving it
        # to configuration.
        payload = "&".join(pairs) + "JiaBei@!*."
        md5 = hashlib.md5(payload.encode(encoding='utf-8')).hexdigest()
        if md5 != sign:
            raise Exception("签名出错")
        return True

    def handle(self):
        """Serve requests on this connection until it closes or one fails.

        Any exception raised while reading or processing a request is logged
        and ends the connection loop.
        """
        super().handle()
        sk: socket.socket = self.request
        while True:
            try:
                data, _header = socket_util.recv_data(sk)
                if not data:
                    # Connection closed by the peer.
                    break
                data_json = json.loads(data)
                type_ = data_json['type']
                is_sign_right = socket_util.is_client_params_sign_right(data_json)
                # ------ client request interface ------
                if type_ in self._SIGNED_TYPES and not is_sign_right:
                    raise Exception("签名错误")
                # NOTE(review): return_str is built but never sent to the peer.
                if type_ == 'buy':
                    return_str = self.__handle_buy(data_json)
                elif type_ == 'cancel_order':
                    return_str = self.__handle_cancel_order(data_json)
                elif type_ == 'sell':
                    return_str = self.__handle_sell(data_json)
                elif type_ == 'delegate_list':
                    return_str = self.__handle_delegate_list(data_json)
                elif type_ == 'deal_list':
                    return_str = self.__handle_deal_list()
                elif type_ == 'position_list':
                    return_str = self.__handle_position_list()
                elif type_ == 'money_list':
                    return_str = self.__handle_money_list()
                elif type_ == 'sync_trade_data':
                    return_str = self.__handle_sync_trade_data(data_json)
                elif type_ == 'test':
                    return_str = self.__handle_test()
                elif type_ == 'test_l2':
                    return_str = self.__handle_test_l2(data_json)
                else:
                    # Unknown type: switch to raw echo mode; once the peer
                    # closes the socket there is nothing left to serve.
                    self.__echo_until_closed(sk)
                    break
            except Exception as e:
                logging.exception(e)
                break

    def __handle_buy(self, data_json):
        """Place a buy order from ``data_json["data"]`` (code, volume, price).

        A price that rounds to zero or less is replaced by the value obtained
        from ``HistoryKDatasUtils.get_now_price``.

        Raises:
            Exception: on missing code/volume, when no current price is
                available, or when the parsed order response code is not 0.
        """
        codes_data = data_json["data"]
        code = codes_data["code"]
        volume = codes_data["volume"]
        price = codes_data["price"]
        if not code:
            raise Exception("请上传code")
        if not volume:
            raise Exception("请上传volume")
        if round(float(price), 2) <= 0:
            prices = HistoryKDatasUtils.get_now_price([code])
            if not prices:
                raise Exception("现价获取失败")
            price = prices[0][1]
        # Place the order.
        result = trade_api.order(trade_api.TRADE_DIRECTION_BUY, code, volume,
                                 round(float(price), 2))
        if result:
            resultJSON = trade_api.parseResponse(result)
            # TODO: persist the order result when the order succeeds.
            if resultJSON['code'] != 0:
                # Report the failed order to the caller.
                raise Exception("下单失败")
        return ""

    def __handle_cancel_order(self, data_json):
        """Cancel a buy order, by order id when given, otherwise by code.

        Returns:
            A JSON reply string for the by-code path, "" for the by-id path.
        """
        codes_data = data_json["data"]
        code = codes_data["code"]
        orderSysID = codes_data["orderSysID"]
        if code and orderSysID:
            result = trade_api.cancel_order(trade_api.TRADE_DIRECTION_BUY, code, orderSysID, True)
            print("---撤单结果----")
            print(result)
            return ""
        if not code:
            return json.dumps({"code": 1, "msg": "请上传代码"})
        state = trade_manager.get_trade_state(code)
        cancellable_states = (trade_manager.TRADE_STATE_BUY_PLACE_ORDER,
                              trade_manager.TRADE_STATE_BUY_DELEGATED,
                              trade_manager.TRADE_STATE_BUY_CANCEL_ING)
        if state not in cancellable_states:
            return json.dumps({"code": 1, "msg": "未处于可撤单状态"})
        try:
            l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
            return json.dumps({"code": 0})
        except Exception as e:
            return json.dumps({"code": 2, "msg": str(e)})

    def __handle_sell(self, data_json):
        """Place a sell order from ``data_json["data"]`` and print the result."""
        codes_data = data_json["data"]
        code = codes_data["code"]
        volume = codes_data["volume"]
        price = codes_data["price"]
        # Force-sell flag (0/1). Not used yet, but the field stays mandatory.
        force_sell = codes_data["force"]
        # TODO: implement the force-sell strategy.
        result = trade_api.order(trade_api.TRADE_DIRECTION_SELL, code, volume, price)
        print("---卖出结果----")
        print(result)
        return ""

    def __handle_delegate_list(self, data_json):
        """Return today's delegate (order) records as a JSON reply string.

        With a truthy ``can_cancel`` only records in the Accepted/PartTraded
        states are requested; otherwise the client supplied ``update_time`` is
        passed on to the record manager.
        """
        params = data_json["data"]
        update_time = params["update_time"]
        # Whether only cancellable orders are wanted (0/1).
        can_cancel = params["can_cancel"]
        today = tool.get_now_date_str("%Y%m%d")
        if can_cancel:
            results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                today, None,
                [huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded])
        else:
            # The request's update_time must reach the query; it used to be
            # overwritten with None before this call.
            results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                today, update_time)
        return json.dumps(
            {"code": 0, "data": {"list": results, "updateTime": update_time}, "msg": ""})

    def __handle_deal_list(self):
        """Return today's deal records as a JSON reply string."""
        results, _update_time = huaxin_trade_record_manager.DealRecordManager.list_by_day(
            tool.get_now_date_str("%Y%m%d"))
        return json.dumps({"code": 0, "data": {"list": results}, "msg": ""})

    def __handle_position_list(self):
        """Return today's position records as a JSON reply string."""
        results, _update_time = huaxin_trade_record_manager.PositionManager.list_by_day(
            tool.get_now_date_str("%Y%m%d"))
        return json.dumps({"code": 0, "data": {"list": results}, "msg": ""})

    def __handle_money_list(self):
        """Return the stored account money data as a JSON reply string."""
        money_data = huaxin_trade_record_manager.MoneyManager.get_data()
        return json.dumps({"code": 0, "data": money_data, "msg": ""})

    def __handle_sync_trade_data(self, data_json):
        """Queue a refresh of one kind of trade data; unknown kinds are ignored."""
        sync_type = data_json["data"]["type"]
        if sync_type in self._SYNC_TYPES:
            trade_data_request_queue.put_nowait({"type": sync_type})
        return json.dumps({"code": 0, "data": {}, "msg": ""})

    def __handle_test(self):
        """Debug helper: fetch and print deals, delegates, positions and money."""
        # Lambdas keep every fetch lazy, so they run one by one in this order.
        sections = (
            ("\n\n---成交列表----", lambda: trade_api.get_deal_list()),
            ("\n\n---可撤委托----", lambda: trade_api.get_delegate_list(True)),
            ("\n\n---全部委托----", lambda: trade_api.get_delegate_list(False)),
            ("\n\n---持仓列表----", lambda: trade_api.get_position_list()),
            ("\n\n---账户列表----", lambda: trade_api.get_money()),
        )
        for title, fetch in sections:
            result = fetch()
            print(title)
            for d in result["data"]:
                print(d)
        return ""

    def __handle_test_l2(self, data_json):
        """Debug helper: push L2 code settings and print the result."""
        codes_data = data_json["data"]
        result = trade_api.set_l2_codes_data(codes_data)
        print("\n\n---L2设置结果----")
        print(result)
        return ""

    def __echo_until_closed(self, sk):
        """Print every raw chunk received and answer "123" until the peer closes.

        ``recv`` returning an empty bytes object means the peer closed the
        connection; returning then prevents an endless busy loop.
        """
        while True:
            r = sk.recv(1024 * 100)
            if not r:
                return
            print(r.decode('utf-8'))
            sk.sendall("123".encode('utf-8'))

    def finish(self):
        """Delegate connection teardown to the base class."""
        super().finish()
def __read_trade_data_queue():
    """Worker loop: take refresh requests off the queue and store fresh data.

    Blocks on ``trade_data_request_queue``, fetches the requested kind of
    trade data through ``huaxin_trade_api`` and hands it to the matching
    record manager when the response code is 0. Any failure is logged and the
    loop carries on; every iteration ends with a one second pause.
    """
    while True:
        try:
            request = trade_data_request_queue.get()
            if not request:
                continue
            type_ = request["type"]
            hx_logger_trade_debug.info(f"获取交易数据开始:{type_}")
            if type_ == "delegate_list":
                envelope = json.loads(huaxin_trade_api.get_delegate_list(False))["data"]
                if envelope["code"] == 0:
                    huaxin_trade_record_manager.DelegateRecordManager.add(envelope["data"])
            elif type_ == "money":
                envelope = json.loads(huaxin_trade_api.get_money())["data"]
                if envelope["code"] == 0:
                    huaxin_trade_record_manager.MoneyManager.save_data(envelope["data"])
            elif type_ == "deal_list":
                envelope = json.loads(huaxin_trade_api.get_deal_list())["data"]
                if envelope["code"] == 0:
                    huaxin_trade_record_manager.DealRecordManager.add(envelope["data"])
            elif type_ == "position_list":
                # Positions: this response goes through parseResponse instead
                # of being unwrapped by hand.
                parsed = huaxin_trade_api.parseResponse(huaxin_trade_api.get_position_list())
                if parsed["code"] == 0:
                    huaxin_trade_record_manager.PositionManager.add(parsed["data"])
            hx_logger_trade_debug.info(f"获取交易数据成功:{type_}")
        except Exception as err:
            hx_logger_trade_debug.exception(err)
        finally:
            # Keep a 1s gap between iterations.
            time.sleep(1)
def __set_target_codes():
    """Worker loop: push pending L2 target codes to the trade API every second.

    Pops the next batch from ``huaxin_target_codes_manager`` and, when it is
    non-empty, applies it via ``huaxin_trade_api.set_l2_codes_data`` and
    prints the result. Errors are logged and do not stop the loop.
    """
    while True:
        try:
            pending = huaxin_target_codes_manager.pop()
            if pending:
                print("设置L2代码结果:", huaxin_trade_api.set_l2_codes_data(pending))
        except Exception as err:
            logging.exception(err)
        finally:
            time.sleep(1)
def run(host="0.0.0.0", port=10009):
    """Start the background workers and serve the trade API until stopped.

    Args:
        host: Interface address to bind; the default listens on all interfaces.
        port: TCP port to listen on.

    This call blocks in ``serve_forever``. The listening socket is closed
    when the serving loop exits for any reason.
    """
    print("create TradeApiServer")
    # Background workers: pull trade information and push L2 target codes.
    # They are daemon threads, so they never keep the process alive.
    for worker in (__read_trade_data_queue, __set_target_codes):
        threading.Thread(target=worker, daemon=True).start()
    # Note: the handler argument is the MyBaseRequestHandle class itself.
    with MyThreadingTCPServer((host, port), MyBaseRequestHandle) as tcpserver:
        tcpserver.serve_forever()
# Executing this file directly is a no-op; the server only starts when run()
# is called.
if __name__ == "__main__":
    pass