Administrator
8 天以前 b8ddd4d8026118fd13e01ba751b0f4cc2f0ae363
虚拟账户测试
1个文件已添加
4个文件已修改
493 ■■■■■ 已修改文件
huaxin_client/constant.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 76 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_export.py 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/virtual_trade_account_manager.py 364 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/constant.py
@@ -10,3 +10,5 @@
# 影子订单的量
SHADOW_ORDER_VOLUME = 100
ENABLE_VIRTUAL_ACCOUNT = True
huaxin_client/trade_client.py
@@ -6,7 +6,7 @@
import threading
import time
from huaxin_client import command_manager
from huaxin_client import command_manager, huaxin_sinfo_util
from huaxin_client import constant
import traderapi
from huaxin_client.client_network import SendResponseSkManager
@@ -14,7 +14,8 @@
# 正式账号
from log_module import async_log_util
from log_module.log import logger_local_huaxin_trade_debug as logger, logger_system, logger_trade, \
    logger_local_huaxin_trade_debug, printlog
    logger_local_huaxin_trade_debug, printlog, logger_debug, logger_local_huaxin_trade_deal
from trade import virtual_trade_account_manager
from utils import tool, socket_util, middle_api_protocol
########B类########
@@ -468,6 +469,8 @@
        self.__temp_position_list_dict = {}
        self.__temp_money_account_list_dict = {}
        self.call_back_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
        # 自己的订单号
        self.own_order_local_ids = set()
    def OnFrontConnected(self) -> "void":
        logger.info('Trader OnFrontConnected')
@@ -654,6 +657,10 @@
    def OnRtnOrder(self, pOrderField: "CTORATstpOrderField") -> "void":
        try:
            if constant.ENABLE_VIRTUAL_ACCOUNT and not huaxin_sinfo_util.is_own_channel(pOrderField.SInfo):
                # 非自有渠道的单
                return
            async_log_util.info(logger_local_huaxin_trade_debug,
                                '[%d] OnRtnOrder: SInfo[%s] InvestorID[%s] SecurityID[%s] OrderRef[%d] OrderLocalID[%s] LimitPrice[%.2f] VolumeTotalOriginal[%d] OrderSysID[%s] OrderStatus[%s] InsertTime[%s]'
                                % (round(time.time() * 1000), pOrderField.SInfo, pOrderField.InvestorID,
@@ -661,25 +668,37 @@
                                   pOrderField.OrderRef, pOrderField.OrderLocalID,
                                   pOrderField.LimitPrice, pOrderField.VolumeTotalOriginal, pOrderField.OrderSysID,
                                   pOrderField.OrderStatus, pOrderField.InsertTime))
            order_data = {"sinfo": pOrderField.SInfo, "securityID": pOrderField.SecurityID,
                          "orderLocalID": pOrderField.OrderLocalID,
                          "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID,
                          "insertTime": pOrderField.InsertTime, "insertDate": pOrderField.InsertDate,
                          "acceptTime": pOrderField.AcceptTime, "cancelTime": pOrderField.CancelTime,
                          "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID,
                          "orderRef": pOrderField.OrderRef, "turnover": pOrderField.Turnover,
                          "volume": pOrderField.VolumeTotalOriginal, "volumeTraded": pOrderField.VolumeTraded,
                          "orderStatus": pOrderField.OrderStatus,
                          "orderSubmitStatus": pOrderField.OrderSubmitStatus,
                          "statusMsg": pOrderField.StatusMsg}
            # 如果全部成交完或者部成部撤需要拉取手续费
            try:
                # if order_data['orderStatus'] in {'4', '5'}:
                #     queue_query_order_detail.put_nowait(
                #         (order_data['securityID'], order_data['orderLocalID'], order_data['orderSysID']))
                if huaxin_sinfo_util.is_own_channel(pOrderField.SInfo):
                    self.own_order_local_ids.add(pOrderField.OrderLocalID)
                    async_log_util.info(logger_local_huaxin_trade_debug, f"自有订单号:{self.own_order_local_ids}")
                if order_data["orderLocalID"] in self.own_order_local_ids:
                    virtual_trade_account_manager.VirtualAccountOrderProcessUtil.set_order_status(order_data)
                    virtual_trade_account_manager.VirtualAccountPositionManager().set_order_status(order_data)
            except Exception as e:
                logger_debug.exception(e)
            if pOrderField.OrderStatus == traderapi.TORA_TSTP_OST_Unknown:
                pass
            #     if queue_trade_w_l2_r is not None:
            #         queue_trade_w_l2_r.put_nowait(
            #             json.dumps({"type": "listen_volume", "data": {"code": pOrderField.SecurityID,
            #                                                           "volume": pOrderField.VolumeTotalOriginal}}).encode(
            #                 'utf-8'))
            else:
                order_data = {"sinfo": pOrderField.SInfo, "securityID": pOrderField.SecurityID,
                              "orderLocalID": pOrderField.OrderLocalID,
                              "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID,
                              "insertTime": pOrderField.InsertTime, "insertDate": pOrderField.InsertDate,
                              "acceptTime": pOrderField.AcceptTime, "cancelTime": pOrderField.CancelTime,
                              "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID,
                              "orderRef": pOrderField.OrderRef, "turnover": pOrderField.Turnover,
                              "volume": pOrderField.VolumeTotalOriginal, "volumeTraded": pOrderField.VolumeTraded,
                              "orderStatus": pOrderField.OrderStatus,
                              "orderSubmitStatus": pOrderField.OrderSubmitStatus,
                              "statusMsg": pOrderField.StatusMsg}
                self.call_back_thread_pool.submit(self.__data_callback, TYPE_ORDER, 0, order_data)
        except Exception as e:
            async_log_util.error(logger_local_huaxin_trade_debug, "OnRtnOrder 出错")
@@ -688,11 +707,27 @@
    def OnRtnTrade(self, pTradeField: "CTORATstpTradeField") -> "void":
        try:
            if constant.ENABLE_VIRTUAL_ACCOUNT and pTradeField.OrderLocalID not in self.own_order_local_ids:
                # 非自有渠道的单
                return
            async_log_util.info(logger_local_huaxin_trade_debug,
                                'OnRtnTrade: TradeID[%s] InvestorID[%s] SecurityID[%s] OrderRef[%d] OrderLocalID[%s] Price[%.2f] Volume[%d]'
                                % (pTradeField.TradeID, pTradeField.InvestorID, pTradeField.SecurityID,
                                   pTradeField.OrderRef, pTradeField.OrderLocalID, pTradeField.Price,
                                   pTradeField.Volume))
            trade_data = {
                "direction": pTradeField.Direction, "orderSysID": pTradeField.OrderSysID,
                "securityID": pTradeField.SecurityID,
                "tradeID": pTradeField.TradeID, "orderLocalID": pTradeField.OrderLocalID, "price": pTradeField.Price,
                "volume": pTradeField.Volume, "tradeDate": pTradeField.TradeDate, "tradeTime": pTradeField.TradeTime
            }
            try:
                if trade_data["orderLocalID"] in self.own_order_local_ids:
                    virtual_trade_account_manager.VirtualAccountOrderProcessUtil.set_order_deal(trade_data)
                    virtual_trade_account_manager.VirtualAccountPositionManager().add_deal_info(trade_data)
                    async_log_util.info(logger_local_huaxin_trade_deal, f"{trade_data}")
            except Exception as e:
                logger_debug.exception(e)
        except:
            pass
@@ -784,6 +819,9 @@
                #        pOrderField.OrderRef, pOrderField.OrderSysID,
                #        pOrderField.VolumeTraded, pOrderField.OrderStatus, pOrderField.OrderSubmitStatus,
                #        pOrderField.StatusMsg))
                if huaxin_sinfo_util.is_own_channel(pOrderField.SInfo):
                    self.own_order_local_ids.add(pOrderField.OrderLocalID)
                self.__temp_order_list_dict[nRequestID].append(
                    {"securityID": pOrderField.SecurityID, "orderLocalID": pOrderField.OrderLocalID,
                     "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID,
@@ -791,7 +829,7 @@
                     "acceptTime": pOrderField.AcceptTime, "cancelTime": pOrderField.CancelTime,
                     "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID,
                     "turnover": pOrderField.Turnover, "orderRef": pOrderField.OrderRef,
                     "volume": pOrderField.VolumeTotalOriginal, "sinfo":pOrderField.SInfo,
                     "volume": pOrderField.VolumeTotalOriginal, "sinfo": pOrderField.SInfo,
                     "volumeTraded": pOrderField.VolumeTraded, "orderStatus": pOrderField.OrderStatus,
                     "orderSubmitStatus": pOrderField.OrderSubmitStatus, "statusMsg": pOrderField.StatusMsg})
            else:
log_module/log.py
@@ -214,6 +214,12 @@
                   filter=lambda record: record["extra"].get("name") == "profile",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("virtual_account", "virtual_account_money_records"),
                   filter=lambda record: record["extra"].get("name") == "virtual_account_money_records",
                   rotation="00:00", compression="zip", enqueue=True)
        ################################华鑫日志################################
        logger.add(self.get_hx_path("l2", "transaction"),
                   filter=lambda record: record["extra"].get("name") == "hx_l2_transaction",
@@ -309,6 +315,10 @@
        logger.add(self.get_local_huaxin_path("l2", "l2_buy_no"),
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_l2_buy_no",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_local_huaxin_path("trade", "deal"),
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_trade_deal",
                   rotation="00:00", compression="zip", enqueue=True)
    def get_path(self, dir_name, log_name):
@@ -414,6 +424,8 @@
logger_mysql_debug = __mylogger.get_logger("mysql_debug")
logger_virtual_account_money_records= __mylogger.get_logger("virtual_account_money_records")
# -------------------------------华鑫日志---------------------------------
hx_logger_l2_orderdetail = __mylogger.get_logger("hx_l2_orderdetail")
hx_logger_l2_transaction = __mylogger.get_logger("hx_l2_transaction")
@@ -445,6 +457,7 @@
logger_local_huaxin_l2_buy_no = __mylogger.get_logger("local_huaxin_l2_buy_no")
logger_local_huaxin_l1_trade_info = __mylogger.get_logger("local_huaxin_l1_trade_info")
logger_local_huaxin_l2_special_volume = __mylogger.get_logger("local_huaxin_l2_special_volume")
logger_local_huaxin_trade_deal = __mylogger.get_logger("local_huaxin_trade_deal")
def close_print():
log_module/log_export.py
@@ -328,3 +328,41 @@
                    except:
                        pass
    return fdatas
def load_virtual_trade_account(date=None):
    """
    Load the virtual-account money records written to the log of one day.

    Every non-blank line is split at its first ``]``: the text after it is
    evaluated back into a Python object and paired with the timestamp that
    ``__get_async_log_time`` extracts from the line.

    :param date: day whose log file is read, in the format produced by
                 ``tool.get_now_date_str()``; ``None`` (default) means today,
                 resolved at call time
    :return: list of ``(time_str, record)`` tuples in file order; an empty list
             when the log file does not exist
    """
    if date is None:
        # A default written as `date=tool.get_now_date_str()` is evaluated only
        # once, at import time, and would point at the wrong day as soon as the
        # process runs past midnight.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/sell_logs/gp/virtual_account/virtual_account_money_records.{date}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                # A blank line carries no record and cannot be evaluated.
                continue
            time_str = __get_async_log_time(line)
            data = line[line.find("]") + 1:].strip()
            # NOTE(review): eval() executes whatever text is in the log file.
            # That is only acceptable while the file is written exclusively by
            # this program; consider ast.literal_eval if that cannot be guaranteed.
            fdatas.append((time_str, eval(data)))
    return fdatas
def load_deal_list(date=None):
    """
    Load the deal (fill) records written to the local Huaxin deal log of one day.

    Every non-blank line is split at its first ``]`` and the text after it is
    evaluated back into a Python object.

    :param date: day whose log file is read, in the format produced by
                 ``tool.get_now_date_str()``; ``None`` (default) means today,
                 resolved at call time
    :return: list of deal records in file order; an empty list when the log
             file does not exist
    """
    if date is None:
        # A default written as `date=tool.get_now_date_str()` is evaluated only
        # once, at import time, and would point at the wrong day as soon as the
        # process runs past midnight.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/sell_logs/huaxin_local/trade/deal.{date}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                # A blank line carries no record and cannot be evaluated.
                continue
            data = line[line.find("]") + 1:].strip()
            # NOTE(review): eval() executes whatever text is in the log file.
            # That is only acceptable while the file is written exclusively by
            # this program; consider ast.literal_eval if that cannot be guaranteed.
            fdatas.append(eval(data))
    return fdatas
trade/virtual_trade_account_manager.py
New file
@@ -0,0 +1,364 @@
"""
虚拟账户管理
"""
from log_module import async_log_util, log_export
from log_module.log import logger_virtual_account_money_records
from utils import tool, huaxin_util
EVENT_CHARGE = "charge"  # deposit (top-up)
EVENT_BUY_DEAL = "buy_deal"  # buy order filled
EVENT_SELL_DEAL = "sell_deal"  # sell order filled
EVENT_BALANCE_ACCOUNT = "balance_account"  # account balancing adjustment
EVENT_FEE = "fee"  # transaction fee
@tool.singleton
class VirtualAccountMoneyManager:
    """
    Money ledger of the virtual account.

    Keeps the total money, the available money and the amount frozen per
    unique id. The state is rebuilt on construction by replaying the
    money-record logs, and every change is appended to that log again
    (unless ``enable_log`` is False, which the replay uses).
    """

    def __init__(self):
        self.all_money = 0
        self.available_money = 0
        # Amount currently frozen per unique id: {unique_id: money}
        self.frozen_money_record_dict = {}
        self.__load_data()

    def __load_data(self):
        """
        Restore the ledger from the logs of today and of earlier days.

        The most recent earlier day (up to 59 days back) that contains a
        balance record provides the opening balance; today's records are then
        replayed on top of it without being logged again.
        :return:
        """
        all_money = None
        available_money = None
        now_day = tool.get_now_date_str()
        # Opening balance: walk backwards, yesterday first.
        for i in range(1, 60):
            day = tool.date_sub(now_day, i)
            results = log_export.load_virtual_trade_account(day)
            if not results:
                continue
            for r in results:
                if r[1]["type"] == "资金变化":
                    all_money = r[1]["data"]["all_money"]
                    available_money = r[1]["data"]["available_money"]
            # Compare with None, not truthiness: a balance of exactly 0 is a
            # valid closing balance and must stop the search, otherwise an
            # older, stale balance would overwrite it.
            if all_money is not None:
                if day != now_day:
                    # Not today: the whole balance counts as available again.
                    available_money = all_money
                break
        if all_money is not None:
            self.all_money = all_money
            self.available_money = available_money
        # Replay today's records.
        results = log_export.load_virtual_trade_account(now_day)
        if results:
            for r in results:
                _type = r[1]["type"]
                _data = r[1]["data"]
                if _type == '资金变化':
                    self.add_money(_data["amount"], _data["event"], _data["unique_id"], enable_log=False)
                elif _type == '冻结金额':
                    self.frozen_money(_data["amount"], _data["unique_id"], enable_log=False)
                elif _type == '解冻金额':
                    self.un_frozen_money(_data["amount"], _data["unique_id"], enable_log=False)
                elif _type == '消耗冻结金额':
                    self.consume_frozen_money(_data["money"], _data["unique_id"], enable_log=False)

    def __format_log(self, log_type, data):
        """
        Build the text of one log record.

        :param log_type: record type tag, read back by ``__load_data``
        :param data: payload dict of the record
        :return: the ``str`` form of ``{"type": ..., "data": ...}``
        """
        fdata = {"type": log_type, "data": data}
        return f"{fdata}"

    def add_money(self, money, event, unique_id, enable_log=True):
        """
        Add money to the account (pass a negative amount to deduct).

        :param enable_log: whether to append the change to the money-record log
        :param unique_id: unique id of the change
        :param money: amount
        :param event: event that caused the change (one of the EVENT_* values)
        :return:
        """
        self.all_money += money
        if event != EVENT_BUY_DEAL:
            # A buy fill does not change the available money here; it is
            # reduced by frozen_money() instead.
            self.available_money += money
        if enable_log:
            async_log_util.info(logger_virtual_account_money_records, self.__format_log("资金变化",
                                                                                        {"event": event,
                                                                                         'amount': money,
                                                                                         'all_money': self.all_money,
                                                                                         'available_money': self.available_money,
                                                                                         "unique_id": unique_id}))

    def frozen_money(self, money, unique_id, enable_log=True):
        """
        Freeze money; done when a buy order is placed.

        Does nothing when ``unique_id`` already has a frozen amount.

        :param enable_log: whether to append the change to the money-record log
        :param money: amount
        :param unique_id: unique id the frozen amount is stored under
        :return:
        """
        if unique_id in self.frozen_money_record_dict:
            # Already frozen.
            return
        self.available_money -= money
        self.frozen_money_record_dict[unique_id] = money
        if enable_log:
            async_log_util.info(logger_virtual_account_money_records, self.__format_log("冻结金额",
                                                                                        {"unique_id": unique_id,
                                                                                         'amount': money,
                                                                                         'available_money': self.available_money}))

    def un_frozen_money(self, money, unique_id, enable_log=True):
        """
        Release frozen money back to the available money, e.g. when a buy order
        is cancelled.

        Does nothing when ``unique_id`` has no frozen amount. Without this check
        a repeated notification, or a call for an order that never froze money,
        would credit money that was never taken out of the available balance.

        :param enable_log: whether to append the change to the money-record log
        :param money: amount to return to the available money
        :param unique_id: unique id the amount was frozen under
        :return:
        """
        if unique_id not in self.frozen_money_record_dict:
            return
        self.frozen_money_record_dict.pop(unique_id)
        self.available_money += money
        if enable_log:
            async_log_util.info(logger_virtual_account_money_records, self.__format_log("解冻金额",
                                                                                        {"unique_id": unique_id,
                                                                                         'amount': money,
                                                                                         'available_money': self.available_money}))

    def consume_frozen_money(self, money, unique_id, enable_log=True):
        """
        Consume frozen money; done when a buy order is filled.

        Does nothing when ``unique_id`` has no frozen amount.

        :param money: amount consumed
        :param enable_log: whether to append the change to the money-record log
        :param unique_id: unique id the amount was frozen under
        :return:
        """
        if unique_id not in self.frozen_money_record_dict:
            return
        # Round to cents and compare with <= 0: repeated float subtraction can
        # leave a residue such as 1e-13, so an exact `== 0` test would keep a
        # fully consumed record alive.
        remaining = round(self.frozen_money_record_dict[unique_id] - money, 2)
        if remaining <= 0:
            self.frozen_money_record_dict.pop(unique_id)
        else:
            self.frozen_money_record_dict[unique_id] = remaining
        if enable_log:
            async_log_util.info(logger_virtual_account_money_records, self.__format_log("消耗冻结金额",
                                                                                        {"unique_id": unique_id,
                                                                                         "money": money,
                                                                                         'available_money': self.available_money}))

    def get_available_money(self):
        """
        Get the available money.
        :return:
        """
        return self.available_money

    def get_total_money(self):
        """
        Get the total money.
        :return:
        """
        return self.all_money
class VirtualAccountOrderProcessUtil:
    """
    Helper that translates order and trade notifications into money changes of
    the virtual account.
    """

    @staticmethod
    def _order_amount(price, volume):
        """Return ``price * volume`` with the price and the result rounded to 2 decimals."""
        return round(round(float(price), 2) * volume, 2)

    @classmethod
    def set_order_status(cls, order_info):
        """
        Apply an order status change to the frozen money of the account.

        Only buy orders are handled: placing one freezes ``limitPrice * volume``,
        cancelling or rejecting it releases the part that is no longer needed.

        :param order_info: order dict with the keys "orderStatus", "direction",
                           "orderLocalID", "volume", "volumeTraded", "limitPrice"
        :return:
        """
        status = order_info["orderStatus"]
        direction = int(order_info["direction"])
        orderLocalID = order_info["orderLocalID"]
        volume = order_info["volume"]
        volumeTraded = order_info["volumeTraded"]
        limitPrice = order_info["limitPrice"]
        if direction != huaxin_util.TORA_TSTP_D_Buy:
            # Only buy orders freeze money. The cancel branches used to run for
            # sell orders as well and credited money that was never frozen.
            return
        if status in (huaxin_util.TORA_TSTP_OST_Cached, huaxin_util.TORA_TSTP_OST_Unknown,
                      huaxin_util.TORA_TSTP_OST_Accepted):
            # Cached / unknown / accepted by the exchange: freeze the full order
            # amount (frozen_money ignores ids that are already frozen).
            VirtualAccountMoneyManager().frozen_money(cls._order_amount(limitPrice, volume), orderLocalID)
        elif status == huaxin_util.TORA_TSTP_OST_PartTradeCanceled:
            # Partially filled, rest cancelled: release the unfilled part.
            VirtualAccountMoneyManager().un_frozen_money(
                cls._order_amount(limitPrice, volume - volumeTraded), orderLocalID)
        elif status == huaxin_util.TORA_TSTP_OST_AllCanceled:
            # Fully cancelled: release the full order amount.
            VirtualAccountMoneyManager().un_frozen_money(cls._order_amount(limitPrice, volume), orderLocalID)
        elif status == huaxin_util.TORA_TSTP_OST_Rejected:
            # Rejected: the order is dead, so money frozen by an earlier status
            # must be released. Skip it when nothing was frozen for this order.
            money_manager = VirtualAccountMoneyManager()
            if orderLocalID in money_manager.frozen_money_record_dict:
                money_manager.un_frozen_money(cls._order_amount(limitPrice, volume), orderLocalID)

    @classmethod
    def set_order_deal(cls, deal_info):
        """
        Apply a fill to the money of the account.

        A buy fill consumes frozen money and lowers the total money; a sell
        fill adds the proceeds.

        :param deal_info: trade dict with the keys "direction", "tradeID",
                          "orderLocalID", "volume", "price"
        :return:
        """
        direction = deal_info["direction"]
        tradeID = deal_info["tradeID"]
        orderLocalID = deal_info["orderLocalID"]
        volume = deal_info["volume"]
        price = round(float(deal_info["price"]), 2)
        money = round(volume * price, 2)
        if int(direction) == huaxin_util.TORA_TSTP_D_Buy:
            # Buy fill.
            VirtualAccountMoneyManager().consume_frozen_money(money, orderLocalID)
            VirtualAccountMoneyManager().add_money(0 - money, event=EVENT_BUY_DEAL, unique_id=tradeID)
        else:
            # Sell fill.
            VirtualAccountMoneyManager().add_money(money, event=EVENT_SELL_DEAL, unique_id=tradeID)

    @classmethod
    def set_order_detail_info(cls, order_info):
        """
        Deduct the total fee of an order from the account.

        :param order_info: dict with the keys "orderSysID" and "totalFee"
        :return:
        """
        orderSysID, totalFee = order_info['orderSysID'], order_info['totalFee']
        VirtualAccountMoneyManager().add_money(0 - totalFee, EVENT_FEE, orderSysID)
@tool.singleton
class VirtualAccountPositionManager:
    """
    Positions of the virtual account.

    Positions are rebuilt on construction from the deal logs of the last 30
    days and then kept up to date from trade and order notifications.
    """

    def __init__(self):
        # {code: [total volume, volume from earlier days, available volume, cost amount]}
        self.position_dict = {}
        # Local ids of sell orders that reserved available volume today.
        self.today_delegate_local_order_ids = set()
        # Local ids of sell orders whose reserved volume was already given back.
        self.released_local_order_ids = set()
        self.trade_id_set = set()
        self.__load_data()

    def __load_data(self):
        """
        Rebuild the positions from the deal logs of the last 30 days (today included).

        Volume bought on earlier days is available; volume bought today is not.
        Codes whose total volume is not positive are dropped.
        :return:
        """
        now_day = tool.get_now_date_str()
        # Oldest day first, so the deals are applied in chronological order.
        days = sorted(tool.date_sub(now_day, i) for i in range(0, 30))
        position_dict = {}
        for day in days:
            datas = log_export.load_deal_list(day)
            if not datas:
                continue
            is_history = day != now_day
            for d in datas:
                code = d['securityID']
                volume = d['volume']
                tradeId = d['tradeID']
                price = d['price']
                # NOTE(review): deals are de-duplicated by tradeID across all
                # loaded days — this assumes a tradeID is never reused on a
                # different day; confirm against the trading API.
                if tradeId in self.trade_id_set:
                    continue
                self.trade_id_set.add(tradeId)
                if code not in position_dict:
                    position_dict[code] = [0, 0, 0, 0]
                position = position_dict[code]
                cost = round(volume * price, 2)
                if d['direction'] == '0':
                    position[0] += volume
                    if is_history:
                        position[1] += volume
                        position[2] += volume
                    position[3] += cost
                elif d['direction'] == '1':
                    position[0] -= volume
                    if is_history:
                        position[1] -= volume
                    # A sell always lowers the available volume, today's too.
                    position[2] -= volume
                    position[3] -= cost
        self.position_dict = {c: v for c, v in position_dict.items() if v[0] > 0}

    def add_deal_info(self, d):
        """
        Apply one fill to the positions. A tradeID that was already seen is ignored.

        :param d: trade dict with the keys "securityID", "volume", "tradeID",
                  "price", "direction"
        :return:
        """
        code = d['securityID']
        volume = d['volume']
        tradeId = d['tradeID']
        price = d['price']
        if tradeId in self.trade_id_set:
            return
        self.trade_id_set.add(tradeId)
        if code not in self.position_dict:
            self.position_dict[code] = [0, 0, 0, 0]
        position = self.position_dict[code]
        # Today's available volume is not changed by fills; set_order_status()
        # adjusts it when sell orders are placed or cancelled.
        if d['direction'] == '0':
            position[0] += volume
            position[3] += round(volume * price, 2)
        elif d['direction'] == '1':
            position[0] -= volume
            position[3] -= round(volume * price, 2)

    def set_order_status(self, order_info):
        """
        Apply an order status change to the available volume of a position.

        Only sell orders on codes that are currently held are handled: placing
        one reserves its volume, cancelling it gives the unfilled volume back.

        :param order_info: order dict with the keys "orderStatus", "direction",
                           "orderLocalID", "volume", "volumeTraded", "securityID"
        :return:
        """
        status = order_info["orderStatus"]
        direction = int(order_info["direction"])
        orderLocalID = order_info["orderLocalID"]
        volume = order_info["volume"]
        volumeTraded = order_info["volumeTraded"]
        code = order_info['securityID']
        if direction == huaxin_util.TORA_TSTP_D_Buy:
            return
        if code not in self.position_dict:
            return
        if status in (huaxin_util.TORA_TSTP_OST_Cached, huaxin_util.TORA_TSTP_OST_Unknown,
                      huaxin_util.TORA_TSTP_OST_Accepted):
            # Cached / unknown / accepted: reserve the volume once per order.
            if orderLocalID in self.today_delegate_local_order_ids:
                return
            self.today_delegate_local_order_ids.add(orderLocalID)
            self.position_dict[code][2] -= volume
        elif status in (huaxin_util.TORA_TSTP_OST_PartTradeCanceled, huaxin_util.TORA_TSTP_OST_AllCanceled):
            # Give volume back only for an order that reserved it in this
            # process, and only once. Otherwise a repeated notification, or a
            # cancel for an order that never reserved volume here (e.g. placed
            # before a restart), would push the available volume above what is held.
            if orderLocalID not in self.today_delegate_local_order_ids:
                return
            if orderLocalID in self.released_local_order_ids:
                return
            self.released_local_order_ids.add(orderLocalID)
            if status == huaxin_util.TORA_TSTP_OST_PartTradeCanceled:
                # Partially filled, rest cancelled: only the unfilled volume returns.
                self.position_dict[code][2] += (volume - volumeTraded)
            else:
                # Fully cancelled.
                self.position_dict[code][2] += volume

    def get_position_dict(self):
        """
        Get all positions.
        :return: {code: [total volume, volume from earlier days, available volume, cost amount]}
        """
        return self.position_dict

    def get_position_of_code(self, code):
        """
        Get the position of one code.
        :return: [total volume, volume from earlier days, available volume, cost amount],
                 or None when the code is not held
        """
        return self.position_dict.get(code)
if __name__ == "__main__":
    # Manual check: print the restored positions and balances, then flush the async log.
    positions = VirtualAccountPositionManager().get_position_dict()
    print(positions)
    total_money = VirtualAccountMoneyManager().get_total_money()
    available_money = VirtualAccountMoneyManager().get_available_money()
    # The original kept a commented-out `.add_money(20000, EVENT_CHARGE, '')` call on this line.
    print(total_money, available_money)
    async_log_util.run_sync()