Administrator
2023-08-21 ef0c984a8b7f9e218903222ad01736c441af7f0e
华鑫本地/系统订单号管理独立
7个文件已修改
238 ■■■■■ 已修改文件
test/l2_trade_test.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_data_update.py 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_record_manager.py 92 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_huaxin.py 92 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/huaxin_util.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py
@@ -147,7 +147,7 @@
        current_price_process_manager.set_trade_price(code, round(float(gpcode_manager.get_limit_up_price(code)), 2))
        pss_server, pss_strategy = multiprocessing.Pipe()
        huaxin_trade_api.set_pipe_trade(pss_server)
        huaxin_trade_api.run_pipe_trade(pss_server)
        for indexs in pos_list:
            l2_log.threadIds[code] = mock.Mock(
trade/huaxin/huaxin_trade_api.py
@@ -9,7 +9,8 @@
from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop, hx_logger_trade_callback
from trade.huaxin import huaxin_trade_data_update
from utils import socket_util
from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager
from utils import socket_util, huaxin_util
# 外部传入的交易队列
pipe_trade = None
@@ -60,7 +61,7 @@
# 设置交易通信队列
def set_pipe_trade(pipe_trade_):
def run_pipe_trade(pipe_trade_):
    global pipe_trade
    pipe_trade = pipe_trade_
    t1 = threading.Thread(target=lambda: __run_recv_pipe_trade(), daemon=True)
@@ -252,11 +253,34 @@
    return None
__TradeOrderIdManager = TradeOrderIdManager()
def set_response(request_id, response):
    """Record the response for a request and update the tracked order IDs.

    When ``request_id`` is truthy the response is stored in
    ``__request_response_dict`` under that ID. If the response reports
    ``code == 0`` and its ``data`` carries an accepted order status, the
    system order ID is registered with the order-ID manager and the matching
    local order ID is dropped.

    :param request_id: identifier of the originating request; a falsy value
        takes the (currently empty) passive branch.
    :param response: mapping read via ``.get('code')`` and ``['data']``.
    """
    if request_id:
        hx_logger_trade_loop.info("请求响应: request_id-{}", request_id)
        # Actively triggered: the response answers a request issued locally
        __request_response_dict[request_id] = response
        # Replace the local order ID with the system order ID
        if response.get('code') == 0:
            # NOTE(review): assumes 'data' is present and is a mapping whenever
            # code == 0; a None payload would raise on .get() - confirm.
            data = response['data']
            # Handle order placement
            if data.get('orderStatus') == huaxin_util.TORA_TSTP_OST_Accepted:
                localOrderId = data.get('localOrderId')
                orderSysID = data.get('orderSysID')
                accountID = data.get('accountID')
                # NOTE(review): this key is spelled 'securityId' while the sibling
                # keys use an upper-case 'ID' suffix - verify against the payload.
                code = data.get('securityId')
                if localOrderId and orderSysID:
                    # Register the system order ID, then drop the local order ID
                    __TradeOrderIdManager.add_order_id(code, accountID, orderSysID)
                    __TradeOrderIdManager.remove_local_order_id(code,localOrderId)
    else:
        # Passively triggered: nothing to do
        pass
@@ -294,7 +318,7 @@
def cancel_order(direction, code, orderSysID, localOrderID=None, blocking=False, sinfo=None, request_id=None):
    if not sinfo:
        sinfo = f"cb_{code}_{round(time.time() * 1000)}"
        sinfo = f"cb_{code}_{round(time.time() * 1000)}_{random.randint(0, 10000)}"
    request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                           {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 2,
                            "direction": direction,
trade/huaxin/huaxin_trade_data_update.py
@@ -9,6 +9,7 @@
from log_module.log import hx_logger_trade_debug
from trade import trade_huaxin, trade_manager
from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager
from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager
from utils import huaxin_util
trade_data_request_queue = queue.Queue()
@@ -38,7 +39,11 @@
                                        trade_huaxin.order_success(d['securityID'],
                                                                   d['accountID'],
                                                                   d['orderSysID'])
                                    elif huaxin_util.is_canceled(d["orderStatus"]) or huaxin_util.is_deal(d["orderStatus"]):
                                        # 已经撤单/已经成交,需要处理临时保存的系统订单号
                                        TradeOrderIdManager().remove_order_id(d['securityID'],
                                                                                           d['accountID'],
                                                                                           d['orderSysID'])
                                if codes:
                                    try:
                                        trade_manager.process_trade_delegate_data([{"code": c} for c in codes])
trade/huaxin/huaxin_trade_record_manager.py
@@ -6,7 +6,7 @@
import datetime
import json
from db.redis_manager_delegate import RedisUtils
from db.redis_manager_delegate import RedisUtils, RedisManager
from utils import tool
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from third_data.history_k_data_util import HistoryKDatasUtils
@@ -302,6 +302,96 @@
        return json.loads(val)
# 交易订单号管理
class TradeOrderIdManager:
    """Singleton that tracks Huaxin order IDs per security code.

    Two families of IDs are kept, each as a set per code:

    * local order IDs, under the Redis key ``huaxin_local_order_id-{code}``;
    * system order IDs, stored as the JSON text of ``(account_id, order_id)``
      under the Redis key ``huaxin_order_id-{code}``.

    Every mutation updates the in-memory cache first and then queues the
    matching Redis write through the ``RedisUtils.*_async`` helpers. The caches
    are class attributes, so they are shared by every reference to the
    singleton.
    """

    # DB index handed to the async RedisUtils helpers.
    __db = 2
    __redisManager = RedisManager(2)
    __instance = None
    # code -> system order IDs (JSON text of (account_id, order_id))
    __huaxin_order_id_cache = {}
    # code -> local order IDs
    __huaxin_local_order_id_cache = {}

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, loading the caches on first creation."""
        if cls.__instance is None:
            # object.__new__ rejects extra arguments, so they are accepted for
            # signature compatibility but deliberately not forwarded.
            cls.__instance = super(TradeOrderIdManager, cls).__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        """Return a Redis connection from the class-level manager."""
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Fill both caches from the keys currently stored in Redis."""
        redis_conn = cls.__get_redis()
        try:
            sources = (
                ("huaxin_order_id-*", cls.__huaxin_order_id_cache),
                ("huaxin_local_order_id-*", cls.__huaxin_local_order_id_cache),
            )
            for pattern, cache in sources:
                for key in RedisUtils.keys(redis_conn, pattern):
                    # The code is the text after the last '-' in the key.
                    code = key.split("-")[-1]
                    members = RedisUtils.smembers(redis_conn, key)
                    # NOTE(review): the add/remove/list methods below treat
                    # cache[code] as a plain set; confirm set_cache stores the
                    # value in that shape.
                    tool.CodeDataCacheUtil.set_cache(cache, code, members)
        finally:
            RedisUtils.realse(redis_conn)

    # Add a local order ID
    def add_local_order_id(self, code, local_order_id):
        """Track ``local_order_id`` for ``code`` in the cache and in Redis."""
        key = f"huaxin_local_order_id-{code}"
        self.__huaxin_local_order_id_cache.setdefault(code, set()).add(local_order_id)
        RedisUtils.sadd_async(self.__db, key, local_order_id)
        RedisUtils.expire_async(self.__db, key, tool.get_expire())

    # Remove a local order ID
    def remove_local_order_id(self, code, local_order_id):
        """Stop tracking ``local_order_id`` for ``code``."""
        if code in self.__huaxin_local_order_id_cache:
            self.__huaxin_local_order_id_cache[code].discard(local_order_id)
        # Pass the DB index, exactly like sadd_async/expire_async above; the
        # previous code passed a freshly acquired connection that was never
        # released.
        RedisUtils.srem_async(self.__db, f"huaxin_local_order_id-{code}", local_order_id)

    # List all local order IDs
    def list_local_order_ids(self, code):
        """Read the local order IDs of ``code`` directly from Redis."""
        return RedisUtils.smembers(self.__get_redis(), f"huaxin_local_order_id-{code}")

    def list_local_order_ids_cache(self, code):
        """Return the cached local order IDs of ``code`` (empty set if none)."""
        if code in self.__huaxin_local_order_id_cache:
            return self.__huaxin_local_order_id_cache[code]
        return set()

    # Add a system order ID
    def add_order_id(self, code, account_id, sys_order_id):
        """Track the ``(account_id, sys_order_id)`` pair for ``code``."""
        key = f"huaxin_order_id-{code}"
        val = json.dumps((account_id, sys_order_id))
        self.__huaxin_order_id_cache.setdefault(code, set()).add(val)
        RedisUtils.sadd_async(self.__db, key, val)
        RedisUtils.expire_async(self.__db, key, tool.get_expire())

    # Remove a system order ID
    def remove_order_id(self, code, account_id, order_id):
        """Stop tracking the ``(account_id, order_id)`` pair for ``code``."""
        val = json.dumps((account_id, order_id))
        if code in self.__huaxin_order_id_cache:
            self.__huaxin_order_id_cache[code].discard(val)
        # DB index rather than a connection, consistent with add_order_id.
        RedisUtils.srem_async(self.__db, f"huaxin_order_id-{code}", val)

    # List all system order IDs
    def list_order_ids(self, code):
        """Read the system order IDs of ``code`` directly from Redis."""
        return RedisUtils.smembers(self.__get_redis(), f"huaxin_order_id-{code}")

    def list_order_ids_cache(self, code):
        """Return the cached system order IDs of ``code`` (empty set if none)."""
        if code in self.__huaxin_order_id_cache:
            return self.__huaxin_order_id_cache[code]
        return set()
# Manual check when the module is run directly: list the delegate records
# returned for the given arguments and print them.
if __name__ == "__main__":
    results = DelegateRecordManager.list_by_day('20230704', '1970-01-01')
    print(results)
trade/huaxin/trade_server.py
@@ -666,12 +666,11 @@
    manager.run(blocking=False)
    # 启动交易服务
    huaxin_trade_api.set_pipe_trade(pipe_trade)
    huaxin_trade_api.run_pipe_trade(pipe_trade)
    # 监听l1那边传过来的代码
    t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True)
    t1.start()
    print("create TradeServer")
    t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
trade/trade_huaxin.py
@@ -6,103 +6,13 @@
import time
import constant
from db.redis_manager_delegate import RedisManager, RedisUtils
from log_module.log import logger_juejin_trade, hx_logger_trade_debug
from trade.huaxin import huaxin_trade_api
from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager
from utils import tool, huaxin_util
from l2 import huaxin
__context_dict = {}
# 交易订单号管理
class TradeOrderIdManager:
    """Singleton that tracks Huaxin order IDs per security code.

    Two families of IDs are kept, each as a set per code:

    * local order IDs, under the Redis key ``huaxin_local_order_id-{code}``;
    * system order IDs, stored as the JSON text of ``(account_id, order_id)``
      under the Redis key ``huaxin_order_id-{code}``.

    Every mutation updates the in-memory cache first and then queues the
    matching Redis write through the ``RedisUtils.*_async`` helpers. The caches
    are class attributes, so they are shared by every reference to the
    singleton.
    """

    # DB index handed to the async RedisUtils helpers.
    __db = 2
    __redisManager = RedisManager(2)
    __instance = None
    # code -> system order IDs (JSON text of (account_id, order_id))
    __huaxin_order_id_cache = {}
    # code -> local order IDs
    __huaxin_local_order_id_cache = {}

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, loading the caches on first creation."""
        if cls.__instance is None:
            # object.__new__ rejects extra arguments, so they are accepted for
            # signature compatibility but deliberately not forwarded.
            cls.__instance = super(TradeOrderIdManager, cls).__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        """Return a Redis connection from the class-level manager."""
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Fill both caches from the keys currently stored in Redis."""
        redis_conn = cls.__get_redis()
        try:
            sources = (
                ("huaxin_order_id-*", cls.__huaxin_order_id_cache),
                ("huaxin_local_order_id-*", cls.__huaxin_local_order_id_cache),
            )
            for pattern, cache in sources:
                for key in RedisUtils.keys(redis_conn, pattern):
                    # The code is the text after the last '-' in the key.
                    code = key.split("-")[-1]
                    members = RedisUtils.smembers(redis_conn, key)
                    # NOTE(review): the add/remove/list methods below treat
                    # cache[code] as a plain set; confirm set_cache stores the
                    # value in that shape.
                    tool.CodeDataCacheUtil.set_cache(cache, code, members)
        finally:
            RedisUtils.realse(redis_conn)

    # Add a local order ID
    def add_local_order_id(self, code, local_order_id):
        """Track ``local_order_id`` for ``code`` in the cache and in Redis."""
        key = f"huaxin_local_order_id-{code}"
        self.__huaxin_local_order_id_cache.setdefault(code, set()).add(local_order_id)
        RedisUtils.sadd_async(self.__db, key, local_order_id)
        RedisUtils.expire_async(self.__db, key, tool.get_expire())

    # Remove a local order ID
    def remove_local_order_id(self, code, local_order_id):
        """Stop tracking ``local_order_id`` for ``code``."""
        if code in self.__huaxin_local_order_id_cache:
            self.__huaxin_local_order_id_cache[code].discard(local_order_id)
        # Pass the DB index, exactly like sadd_async/expire_async above; the
        # previous code passed a freshly acquired connection that was never
        # released.
        RedisUtils.srem_async(self.__db, f"huaxin_local_order_id-{code}", local_order_id)

    # List all local order IDs
    def list_local_order_ids(self, code):
        """Read the local order IDs of ``code`` directly from Redis."""
        return RedisUtils.smembers(self.__get_redis(), f"huaxin_local_order_id-{code}")

    def list_local_order_ids_cache(self, code):
        """Return the cached local order IDs of ``code`` (empty set if none)."""
        if code in self.__huaxin_local_order_id_cache:
            return self.__huaxin_local_order_id_cache[code]
        return set()

    # Add a system order ID
    def add_order_id(self, code, account_id, sys_order_id):
        """Track the ``(account_id, sys_order_id)`` pair for ``code``."""
        key = f"huaxin_order_id-{code}"
        val = json.dumps((account_id, sys_order_id))
        self.__huaxin_order_id_cache.setdefault(code, set()).add(val)
        RedisUtils.sadd_async(self.__db, key, val)
        RedisUtils.expire_async(self.__db, key, tool.get_expire())

    # Remove a system order ID
    def remove_order_id(self, code, account_id, order_id):
        """Stop tracking the ``(account_id, order_id)`` pair for ``code``."""
        val = json.dumps((account_id, order_id))
        if code in self.__huaxin_order_id_cache:
            self.__huaxin_order_id_cache[code].discard(val)
        # DB index rather than a connection, consistent with add_order_id.
        RedisUtils.srem_async(self.__db, f"huaxin_order_id-{code}", val)

    # List all system order IDs
    def list_order_ids(self, code):
        """Read the system order IDs of ``code`` directly from Redis."""
        return RedisUtils.smembers(self.__get_redis(), f"huaxin_order_id-{code}")

    def list_order_ids_cache(self, code):
        """Return the cached system order IDs of ``code`` (empty set if none)."""
        if code in self.__huaxin_order_id_cache:
            return self.__huaxin_order_id_cache[code]
        return set()
__TradeOrderIdManager = TradeOrderIdManager()
utils/huaxin_util.py
@@ -36,3 +36,15 @@
    return False
# 是否已经撤单
def is_canceled(state):
    """Return True when ``state`` is one of the cancelled order statuses.

    Matches ``TORA_TSTP_OST_AllCanceled`` and
    ``TORA_TSTP_OST_PartTradeCanceled``; any other value gives False.
    """
    cancelled_states = (
        TORA_TSTP_OST_AllCanceled,
        TORA_TSTP_OST_PartTradeCanceled,
    )
    return state in cancelled_states
# 是否已经成交
def is_deal(state):
    """Return True when ``state`` is one of the traded order statuses.

    Matches ``TORA_TSTP_OST_PartTraded``, ``TORA_TSTP_OST_AllTraded`` and
    ``TORA_TSTP_OST_PartTradeCanceled``; any other value gives False.
    """
    traded_states = (
        TORA_TSTP_OST_PartTraded,
        TORA_TSTP_OST_AllTraded,
        TORA_TSTP_OST_PartTradeCanceled,
    )
    return state in traded_states