Administrator
2023-08-14 31823d638d8d824bb6240e5b0fe11c54d97473a7
trade/huaxin/trade_server.py
@@ -11,10 +11,16 @@
import time
import dask
import psutil
from line_profiler import LineProfiler
import constant
import inited_data
import outside_api_command_manager
from code_attribute import gpcode_manager
from db import redis_manager, mysql_data
from db.redis_manager import RedisUtils
from huaxin.client_network import SendResponseSkManager
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer
from l2.huaxin import huaxin_target_codes_manager
@@ -24,11 +30,14 @@
    hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue
from third_data import block_info, kpl_api
from third_data.code_plate_key_manager import KPLCodeJXBlockManager
from third_data.history_k_data_util import JueJinApi
from trade import deal_big_money_manager, current_price_process_manager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
from third_data.kpl_data_manager import KPLDataManager
from third_data.kpl_util import KPLDataType
from trade import deal_big_money_manager, current_price_process_manager, trade_huaxin, trade_manager, l2_trade_util
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update
from utils import socket_util
from trade.trade_manager import TradeTargetCodeModeManager
from utils import socket_util, data_export_util, middle_api_protocol, tool
trade_data_request_queue = queue.Queue()
@@ -277,6 +286,16 @@
                            print("l2_subscript_codes", data_json)
                            # 订阅的代码
                            huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.save_subscript_codes(datas)
                            # 上传数据
                            codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes()
                            fresults = []
                            if codes:
                                for code in codes:
                                    code_name = gpcode_manager.get_code_name(code)
                                    fresults.append((code, code_name))
                            fdata =  middle_api_protocol.load_l2_subscript_codes(fresults)
                            middle_api_protocol.request(fdata)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "get_level1_codes":
@@ -356,9 +375,292 @@
                pass
class OutsideApiCommandCallback(outside_api_command_manager.ActionCallback):
    @classmethod
    def __send_response(cls, data_bytes):
        sk = SendResponseSkManager.create_send_response_sk()
        try:
            data_bytes = socket_util.load_header(data_bytes)
            sk.sendall(data_bytes)
            result, header_str = socket_util.recv_data(sk)
            result = json.loads(result)
            if result["code"] != 0:
                raise Exception(result['msg'])
        finally:
            sk.close()
    def send_response(self, data, _client_id, _request_id):
        data_bytes = json.dumps({"type": "response", "data": data, "client_id": _client_id,
                                 "request_id": _request_id}).encode('utf-8')
        for i in range(3):
            try:
                self.__send_response(data_bytes)
                print("发送数据成功")
                break
            except Exception as e1:
                pass
    # 交易
    def OnTrade(self, client_id, request_id, data):
        try:
            trade_type = data["trade_type"]
            if trade_type == outside_api_command_manager.TRADE_TYPE_ORDER:
                code = data["code"]
                direction = data["direction"]
                volume = data["volume"]
                price_type = data["price_type"]
                price = data["price"]
                sinfo = data["sinfo"]
                result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type, sinfo=sinfo,
                                                blocking=True, request_id=request_id)
                self.send_response({"code": 0, "data": result}, client_id, request_id)
            elif trade_type == outside_api_command_manager.TRADE_TYPE_CANCEL_ORDER:
                code = data["code"]
                direction = data["direction"]
                accountID = data["accountID"]
                orderSysID = data["orderSysID"]
                sinfo = data["sinfo"]
                result = huaxin_trade_api.cancel_order(direction, accountID, orderSysID, sinfo=sinfo,
                                                       blocking=True, request_id=request_id)
                self.send_response({"code": 0, "data": result}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    # 交易状态
    def OnTradeState(self, client_id, request_id, data):
        try:
            operate = data["operate"]
            if operate == outside_api_command_manager.OPERRATE_SET:
                state = data["state"]
                if state:
                    trade_manager.TradeStateManager().open_buy()
                else:
                    trade_manager.TradeStateManager().close_buy()
                self.send_response({"code": 0, "msg": ("开启成功" if state else "关闭成功")}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_GET:
                can_buy = trade_manager.TradeStateManager().is_can_buy_cache()
                self.send_response({"code": 0, "data": {"can_buy": can_buy}}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    # 交易模式
    def OnTradeMode(self, client_id, request_id, data):
        try:
            operate = data["operate"]
            if operate == outside_api_command_manager.OPERRATE_SET:
                mode = data["mode"]
                TradeTargetCodeModeManager().set_mode(mode)
                self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_GET:
                mode = TradeTargetCodeModeManager().get_mode_cache()
                self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    # 代码名单
    def OnCodeList(self, client_id, request_id, data):
        try:
            code_list_type = data["code_list_type"]
            operate = data["operate"]
            code = data.get("code")
            fresult = {"code": 0}
            if code_list_type == outside_api_command_manager.CODE_LIST_WANT:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    gpcode_manager.WantBuyCodesManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    gpcode_manager.WantBuyCodesManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.WantBuyCodesManager().list_code_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_BLACK:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    l2_trade_util.forbidden_trade(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    l2_trade_util.remove_from_forbidden_trade_codes(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = l2_trade_util.BlackListCodeManager().list_codes_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_WHITE:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    l2_trade_util.WhiteListCodeManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    l2_trade_util.WhiteListCodeManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = l2_trade_util.WhiteListCodeManager().list_codes_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_PAUSE_BUY:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    gpcode_manager.PauseBuyCodesManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    gpcode_manager.PauseBuyCodesManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.PauseBuyCodesManager().list_code_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            self.send_response(fresult, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnExportL2(self, client_id, request_id, data):
        try:
            code = data["code"]
            data_export_util.export_l2_excel(code)
            self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnEveryDayInit(self, client_id, request_id, data):
        try:
            inited_data.everyday_init()
            self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnRefreshTradeData(self, client_id, request_id, data):
        try:
            sync_type = data["ctype"]
            if sync_type == "delegate_list":
                huaxin_trade_data_update.add_delegate_list()
            elif sync_type == "deal_list":
                huaxin_trade_data_update.add_deal_list()
            elif sync_type == "money":
                huaxin_trade_data_update.add_money_list()
            elif sync_type == "position_list":
                huaxin_trade_data_update.add_position_list()
            self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnGetCodeAttribute(self, client_id, request_id, data):
        try:
            code = data["code"]
            # 查询是否想买单/白名单/黑名单/暂不买
            code_name = gpcode_manager.get_code_name(code)
            want = gpcode_manager.WantBuyCodesManager().is_in_cache(code)
            white = l2_trade_util.WhiteListCodeManager().is_in_cache(code)
            black = l2_trade_util.is_in_forbidden_trade_codes(code)
            pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code)
            desc_list = []
            if want:
                desc_list.append("【想买单】")
            if white:
                desc_list.append("【白名单】")
            if black:
                desc_list.append("【黑名单】")
            if pause_buy:
                desc_list.append("【暂不买】")
            result = {"code": 0, "data": {"code_info": (code, code_name), "desc": "".join(desc_list)}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnGetCodeTradeState(self, client_id, request_id, data):
        try:
            code = data["code"]
            state = trade_manager.CodesTradeStateManager().get_trade_state(code)
            result = {"code": 0, "data": {"state": state}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnGetEnvInfo(self, client_id, request_id, data):
        try:
            fdata = {}
            try:
                date = JueJinApi.get_previous_trading_date(tool.get_now_date_str())
                if date:
                    fdata["juejin"] = 1
            except Exception as e:
                fdata["juejin"] = 0
            fdata["kpl"] = {}
            # 获取开盘啦数据
            kpl_types = [KPLDataType.LIMIT_UP.value, KPLDataType.JINGXUAN_RANK.value,
                         KPLDataType.INDUSTRY_RANK.value]
            for kpl_type in kpl_types:
                if kpl_type in KPLDataManager.kpl_data_update_info:
                    fdata["kpl"][kpl_type] = KPLDataManager.kpl_data_update_info.get(kpl_type)
            try:
                # 验证redis
                RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test")
                fdata["redis"] = 1
            except:
                fdata["redis"] = 0
            try:
                # 验证mysql
                mysql_data.Mysqldb().select_one("select 1")
                fdata["mysql"] = 1
            except:
                fdata["mysql"] = 0
            try:
                # redis异步任务数量
                fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count()
            except:
                pass
            # 获取CPU与内存适用情况
            memory_info = psutil.virtual_memory()
            cpu_percent = psutil.cpu_percent(interval=1)
            fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent}
            result = {"code": 0, "data": fdata, "msg": ""}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
def run(pipe_trade, pipe_l1):
    # 执行一些初始化数据
    block_info.init()
    # 启动外部接口监听
    manager = outside_api_command_manager.ApiCommandManager()
    manager.init(middle_api_protocol.SERVER_HOST,
                 middle_api_protocol.SERVER_PORT,
                 OutsideApiCommandCallback())
    manager.run(blocking=False)
    # 启动交易服务
    huaxin_trade_api.set_pipe_trade(pipe_trade)
    # 监听l1那边传过来的代码