Administrator
2024-03-17 15199f8e93fe48e6261c99eadf6673d788db3a80
L2进程与策略进程合并
8个文件已修改
212 ■■■■■ 已修改文件
huaxin_client/code_queue_distribute_manager.py 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py 54 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_delegate_postion_manager.py 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_listen_manager.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 40 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/code_queue_distribute_manager.py
@@ -43,3 +43,48 @@
    # 获取空闲的位置数量
    def get_free_queue_count(self):
        return len(self.queue_list) - len(self.distibuted_code_queue_dict.keys())
# 回调对象分配
class CodeDataCallbackDistributeManager:
    # queue_list
    def __init__(self, callback_list: list):
        flist = []
        for callback in callback_list:
            flist.append((id(callback), callback))
        self.callback_list = flist
        self.distibuted_code_callback_dict = {}
    # 获取可用的队列
    def get_available_callback(self):
        distibuted_callbacks_ids = set()
        for code in self.distibuted_code_callback_dict:
            distibuted_callbacks_ids.add(self.distibuted_code_callback_dict[code][0])
        for callback in self.callback_list:
            if callback[0] not in distibuted_callbacks_ids:
                return callback
        return None
    # 为代码分配队列
    def distribute_callback(self, code):
        if code in self.distibuted_code_callback_dict:
            return self.distibuted_code_callback_dict.get(code)
        callback_info = self.get_available_callback()
        if not callback_info:
            raise Exception("无可用的回调对象")
        self.distibuted_code_callback_dict[code] = callback_info
        return callback_info
    # 获取代码分配的队列
    def get_distributed_callback(self, code):
        if code in self.distibuted_code_callback_dict:
            return self.distibuted_code_callback_dict.get(code)[1]
        else:
            return None
    def release_distribute_callback(self, code):
        if code in self.distibuted_code_callback_dict:
            self.distibuted_code_callback_dict.pop(code)
    # 获取空闲的位置数量
    def get_free_queue_count(self):
        return len(self.callback_list) - len(self.distibuted_code_callback_dict.keys())
huaxin_client/l2_client.py
@@ -13,9 +13,10 @@
from huaxin_client import constant
from huaxin_client import l2_data_manager
import lev2mdapi
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager
from huaxin_client.command_manager import L2ActionCallback
from huaxin_client.l2_data_manager import L2DataUploadManager
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \
@@ -546,7 +547,7 @@
def test_add_codes(queue_r):
    time.sleep(5)
    time.sleep(10)
    # if value:
    #     if type(value) == bytes:
    #         value = value.decode("utf-8")
@@ -565,8 +566,9 @@
                  ("002908", int(50 * 10000 / 12.78), 12.78, 12.00, 200)]
    queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
    time.sleep(1)
    time.sleep(10)
    while True:
        try:
        spi.l2_data_upload_manager.add_l2_order_detail(
            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
             'OrderTime': '13000015',
@@ -575,51 +577,20 @@
            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
        queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
            # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
        time.sleep(0.1)
        spi.l2_data_upload_manager.add_l2_order_detail(
            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
        except Exception as e:
            logging.exception(e)
        finally:
        time.sleep(10)
def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, order_ipc_hosts: list) -> None:
    # def test_add_codes():
    #     time.sleep(5)
    #     # if value:
    #     #     if type(value) == bytes:
    #     #         value = value.decode("utf-8")
    #     #     data = json.loads(value)
    #     #     _type = data["type"]
    #     #     if _type == "listen_volume":
    #     #         volume = data["data"]["volume"]
    #     #         code = data["data"]["code"]
    #     #         spi.set_code_special_watch_volume(code, volume)
    #     #     elif _type == "l2_cmd":
    #     #         l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
    #
    #     demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59),
    #                   ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)]
    #
    #     queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
    #     time.sleep(1)
    #
    #     spi.l2_data_upload_manager.add_l2_order_detail(
    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
    #          'OrderTime': '13000015',
    #          'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
    #     spi.l2_data_upload_manager.add_l2_order_detail(
    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
    #          'OrderTime': '13000015',
    #          'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
    #     queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
    #     time.sleep(0.1)
    #     spi.l2_data_upload_manager.add_l2_order_detail(
    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
    #          'OrderTime': '13000015',
    #          'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, order_ipc_hosts: list, data_callbacks:list) -> None:
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
@@ -632,12 +603,13 @@
        # 初始化
        order_queue_distribute_manager = CodeQueueDistributeManager(order_queues)
        transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues)
        data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
        l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager,
                                                     transaction_queue_distribute_manager, market_queue, order_ipc_hosts)
                                                     transaction_queue_distribute_manager, market_queue, order_ipc_hosts, data_callback_distribute_manager)
        __init_l2(l2_data_upload_manager)
        l2_data_manager.run_upload_common()
        l2_data_manager.run_log()
        # 测试
        # TODO 测试
        # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start()
        global l2CommandManager
        l2CommandManager = command_manager.L2CommandManager()
huaxin_client/l2_data_manager.py
@@ -6,12 +6,15 @@
import queue
import threading
import time
import constant
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
# 活动时间
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module import async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \
@@ -34,13 +37,17 @@
class L2DataUploadManager:
    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue, order_ipc_hosts):
                 market_data_queue: multiprocessing.Queue, order_ipc_hosts, data_callback_distribute_manager:CodeDataCallbackDistributeManager):
        self.order_queue_distribute_manager = order_queue_distribute_manager
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
        self.market_data_queue = market_data_queue
        self.data_callback_distribute_manager = data_callback_distribute_manager
        # 代码分配的对象
        self.temp_order_queue_dict = {}
        self.temp_transaction_queue_dict = {}
        self.temp_log_queue_dict = {}
        self.filter_order_condition_dict = {}
        self.upload_l2_data_task_dict = {}
        self.l2_order_codes = set()
@@ -119,7 +126,11 @@
    def add_market_data(self, data):
        # 加入上传队列
        self.market_data_queue.put_nowait(data)
        # self.market_data_queue.put_nowait(data)
        code = data['securityID']
        callback = self.data_callback_distribute_manager.get_distributed_callback(code)
        if callback:
            callback.OnMarketData(code, data)
    # 分配上传队列
    def distribute_upload_queue(self, code):
@@ -127,6 +138,9 @@
            self.order_queue_distribute_manager.distribute_queue(code)
        if not self.transaction_queue_distribute_manager.get_distributed_queue(code):
            self.transaction_queue_distribute_manager.distribute_queue(code)
        if not self.data_callback_distribute_manager.get_distributed_callback(code):
            self.data_callback_distribute_manager.distribute_callback(code)
        if code not in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code] = collections.deque()
@@ -135,6 +149,7 @@
        if code not in self.temp_log_queue_dict:
            self.temp_log_queue_dict[code] = queue.Queue()
        # 分配订单上传协议
        if not constant.is_windows():
        self.l2_order_upload_protocol.distribute_upload_host(code)
        if code not in self.upload_l2_data_task_dict:
@@ -151,6 +166,7 @@
        self.order_queue_distribute_manager.release_distribute_queue(code)
        self.transaction_queue_distribute_manager.release_distribute_queue(code)
        self.l2_order_upload_protocol.release_distributed_upload_host(code)
        self.data_callback_distribute_manager.release_distribute_callback(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
            self.temp_order_queue_dict.pop(code)
@@ -187,8 +203,10 @@
                if temp_list:
                    # 上传数据
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    self.__upload_l2_order_data(code, temp_list)
                    # self.__upload_l2_order_data(code, temp_list)
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list, time.time())
                    temp_list = []
                else:
                    if code not in self.temp_order_queue_dict:
                        self.l2_order_codes.discard(code)
@@ -216,7 +234,8 @@
                        temp_list.append(data)
                if temp_list:
                    # 上传数据
                    self.__upload_l2_data(code, upload_queue, temp_list)
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code, temp_list)
                    temp_list = []
                else:
                    if code not in self.temp_transaction_queue_dict:
@@ -254,6 +273,8 @@
        self.code_socket_client_dict = {}
        self.rlock = threading.RLock()
        context = zmq.Context()
        if constant.is_windows():
            return
        for host in self.ipchosts:
            socket = context.socket(zmq.REQ)
            socket.connect(host)
l2/huaxin/huaxin_delegate_postion_manager.py
@@ -86,16 +86,13 @@
    return None
__re_compute_threading_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
# L2数据列表
# 范围(位置信息,可信的类型)
def get_l2_place_order_position(code, limit_up_price, datas):
    order_info = get_order_info(code)
    if not order_info:
        # 暂无下单信息
        return None
        return None, order_info
    price = order_info[0]
    volume = order_info[1]
    exec_data = order_info[2]
@@ -178,18 +175,12 @@
        if code in _place_order_info_dict:
            _place_order_info_dict.pop(code)
        __place_order_position[code] = real_place_index_info[0]
        try:
            __re_compute_threading_pool.submit(__recompute_for_slow_time, code, order_info, real_place_index_info[0])
        except:
            pass
        return real_place_index_info[0]
        return real_place_index_info[0], order_info
    else:
        return None
        return None, order_info
# TODO: 需要测试该方法
# 因为L2数据慢的问题而重新计算
def __recompute_for_slow_time(code, order_info, real_place_index):
def recompute_for_slow_time(code, order_info, real_place_index):
    try:
        # 计算当前时间是否满足时间条件
        now_time_str = tool.get_now_time_str()
@@ -283,15 +274,16 @@
                            findexes_info.sort(key=lambda x: x[0] - x[1])
                            # 获取成功
                            real_place_index = findexes_info[0][1]
                            async_log_util.info(logger_real_place_order_position,
                                                f"真实下单位矫正:{code}-{real_place_index}  下单数据:{order_info}")
                            return real_place_index
                finally:
                    pass
        else:
            return
            return None
        pass
    except Exception as e:
        logger_real_place_order_position.exception(e)
    return None
# 获取真实下单位置
l2/l2_data_listen_manager.py
@@ -8,6 +8,7 @@
import zmq
import constant
from log_module import async_log_util
from log_module.log import logger_debug
@@ -144,6 +145,7 @@
        t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True)
        t3.start()
        # 接收订单hosts
        if not constant.is_windows():
        self.__create_ipc_server_hosts(order_ipc_hosts)
    def get_active_count(self, type_):
l2/l2_data_manager_new.py
@@ -29,11 +29,13 @@
    local_latest_datas, local_today_canceled_buyno_map
import l2.l2_data_util
from log_module.log import logger_l2_trade_buy, logger_l2_process, logger_l2_error, logger_debug, \
    logger_l2_not_buy_reasons
    logger_l2_not_buy_reasons, logger_real_place_order_position
from trade.trade_data_manager import CodeActualPriceProcessor, PlaceOrderCountManager
from trade.trade_manager import TradeTargetCodeModeManager, AccountAvailableMoneyManager, MarketSituationManager
import concurrent.futures
class L2DataManager:
@@ -242,6 +244,7 @@
    __PlaceOrderCountManager = PlaceOrderCountManager()
    __CodeNatureRecordManager = code_nature_analyse.CodeNatureRecordManager()
    __MarketSituationManager = MarketSituationManager()
    __re_compute_threading_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    # 获取代码评分
    @classmethod
@@ -334,6 +337,19 @@
            origin_datas.clear()
    @classmethod
    def __recompute_real_order_index(cls, code, pre_real_order_index, order_info):
        real_order_index = huaxin_delegate_postion_manager.recompute_for_slow_time(code, order_info,
                                                                                   pre_real_order_index)
        if real_order_index:
            exec_data = order_info[2]
            order_begin_pos = cls.__get_order_begin_pos(
                code)
            if order_begin_pos and order_begin_pos.buy_exec_index == exec_data["index"]:
                cls.set_real_place_order_index(code, real_order_index, order_begin_pos)
                async_log_util.info(logger_real_place_order_position,
                                    f"真实下单位矫正:{code}-{real_order_index}  下单数据:{order_info}")
    @classmethod
    def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time):
        now_time_str = tool.get_now_time_str()
        if len(add_datas) > 0:
@@ -355,12 +371,18 @@
                        #     cls.set_real_place_order_index(code, place_order_index, order_begin_pos.buy_single_index)
                    else:
                        # 获取下单位置
                        place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, float(
                        place_order_index, order_info = huaxin_delegate_postion_manager.get_l2_place_order_position(
                            code, float(
                            gpcode_manager.get_limit_up_price(code)), add_datas)
                        if place_order_index:
                            order_begin_pos = cls.__get_order_begin_pos(
                                code)
                            cls.set_real_place_order_index(code, place_order_index, order_begin_pos)
                            try:
                                cls.__re_compute_threading_pool.submit(
                                    cls.__recompute_real_order_index, code, place_order_index, order_info)
                            except:
                                pass
                            async_log_util.info(logger_l2_process, "code:{} 获取到下单真实位置:{}", code, place_order_index)
                except:
                    async_log_util.error(logger_l2_error, f"{code} 处理真实下单位置出错")
@@ -847,11 +869,11 @@
        if cls.__PauseBuyCodesManager.is_in_cache(code):
            return False, True, f"该代码被暂停交易"
        if int(tool.get_now_time_str().replace(":", "")) >= 145700:
        now_time_int = int(tool.get_now_time_str().replace(":", ""))
        if  now_time_int>= 145700:
            return False, True, f"14:57后不能交易"
        if  130100>=now_time_int>= 112900:
            return False, True, f"11:29:00-13:01:00不能交易"
        limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -868,8 +890,6 @@
        average_rate = cls.__Buy1PriceManager.get_average_rate(code)
        if average_rate and average_rate <= 0.01 and tool.trade_time_sub(tool.get_now_time_str(), "10:30:00") >= 0:
            return False, True, f"均价涨幅({average_rate})小于1%"
        total_data = local_today_datas.get(code)
        order_begin_pos = cls.__get_order_begin_pos(
@@ -1665,8 +1685,6 @@
                        max_space_time_ms = 9*1000
                        break
        if not threshold_num:
            # 目标手数
            threshold_num = round(threshold_money / (limit_up_price * 100))
@@ -1675,12 +1693,10 @@
        # place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code)
        # buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        # 可以触发买,当有涨停买信号时才会触发买
        trigger_buy = True
        # 如果大单含有率大于50%,则时间囊括范围提高到3s
        if max_num_set and origin_count:
main.py
@@ -140,11 +140,16 @@
        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
            order_ipc_hosts.append(f"ipc://l2order{i}.ipc")
        # 此处将L2的进程与策略进程合并
        # L2
        l2Process = multiprocessing.Process(
            target=huaxin_client.l2_client.run,
            args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts))
        l2Process.start()
        # l2Process = multiprocessing.Process(
        #     target=huaxin_client.l2_client.run,
        #     args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts,huaxin_trade_server.my_l2_data_callback))
        # l2Process.start()
        # 将L2的进程改为进程执行
        threading.Thread(target=huaxin_client.l2_client.run, args=(
            queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts,
            huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
@@ -154,7 +159,7 @@
        # 将tradeServer作为主进程
        l1Process.join()
        l2Process.join()
        # l2Process.join()
        tradeProcess.join()
    except Exception as e:
        logging.exception(e)
trade/huaxin/huaxin_trade_server.py
@@ -1658,6 +1658,7 @@
# 回调
my_l2_data_callback = MyL2DataCallback()
my_l2_data_callbacks = [MyL2DataCallback() for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT)]
my_trade_response = MyTradeResponse()
l2DataListenManager: L2DataListenManager = None