Administrator
2024-03-18 5c5a34e67334e53e3ac9da5adc85ccc8ec0529c4
删除多余的策略-L2的通信方式
5个文件已修改
72 ■■■■ 已修改文件
huaxin_client/l2_client.py 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -589,9 +589,7 @@
            time.sleep(10)
def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, data_callbacks:list) -> None:
def run(queue_r: multiprocessing.Queue, data_callbacks: list) -> None:
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
@@ -601,11 +599,8 @@
            t1.start()
        # 初始化
        order_queue_distribute_manager = CodeQueueDistributeManager(order_queues)
        transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues)
        data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
        l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager,
                                                     transaction_queue_distribute_manager, market_queue, data_callback_distribute_manager)
        l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)
        __init_l2(l2_data_upload_manager)
        l2_data_manager.run_upload_common()
        l2_data_manager.run_log()
huaxin_client/l2_data_manager.py
@@ -2,7 +2,6 @@
import json
import logging
import marshal
import multiprocessing
import queue
import threading
import time
@@ -13,7 +12,7 @@
from huaxin_client.client_network import SendResponseSkManager
# 活动时间
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager
from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager
from log_module import async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \
@@ -34,14 +33,7 @@
# L2上传数据管理器
class L2DataUploadManager:
    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue,
                 data_callback_distribute_manager: CodeDataCallbackDistributeManager):
        self.order_queue_distribute_manager = order_queue_distribute_manager
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
        self.market_data_queue = market_data_queue
    def __init__(self, data_callback_distribute_manager: CodeDataCallbackDistributeManager):
        self.data_callback_distribute_manager = data_callback_distribute_manager
        # 代码分配的对象
        self.temp_order_queue_dict = {}
@@ -133,10 +125,6 @@
    # 分配上传队列
    def distribute_upload_queue(self, code):
        if not self.order_queue_distribute_manager.get_distributed_queue(code):
            self.order_queue_distribute_manager.distribute_queue(code)
        if not self.transaction_queue_distribute_manager.get_distributed_queue(code):
            self.transaction_queue_distribute_manager.distribute_queue(code)
        if not self.data_callback_distribute_manager.get_distributed_callback(code):
            self.data_callback_distribute_manager.distribute_callback(code)
@@ -157,8 +145,6 @@
        # 释放已经分配的队列
    def release_distributed_upload_queue(self, code):
        self.order_queue_distribute_manager.release_distribute_queue(code)
        self.transaction_queue_distribute_manager.release_distribute_queue(code)
        self.data_callback_distribute_manager.release_distribute_callback(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
@@ -179,8 +165,6 @@
    def __run_upload_order_task(self, code):
        q: collections.deque = self.temp_order_queue_dict.get(code)
        temp_list = []
        queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
        upload_queue = queue_info[1]
        while True:
            try:
                while len(q) > 0:
@@ -217,8 +201,6 @@
    # 处理成交数据并上传
    def __run_upload_transaction_task(self, code):
        q: collections.deque = self.temp_transaction_queue_dict.get(code)
        queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
        upload_queue = queue_info[1]
        temp_list = []
        while True:
            try:
main.py
@@ -28,8 +28,7 @@
def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue,
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_,
                      market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_):
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -51,8 +50,7 @@
    #
    # 启动华鑫交易服务
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_, order_queues_,
                            transaction_queues_, market_queue_,
                            queue_strategy_w_trade_r_for_read_,
                            queue_l1_trade_w_strategy_r_)
@@ -127,16 +125,6 @@
            args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
                  queue_strategy_w_trade_r_for_read))
        tradeProcess.start()
        # 创建L2通信队列
        order_queues = []
        transaction_queues = []
        market_queue = multiprocessing.Queue()
        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
            order_queues.append(multiprocessing.Queue())
        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
            transaction_queues.append(multiprocessing.Queue())
        # 此处将L2的进程与策略进程合并
        # L2
        # l2Process = multiprocessing.Process(
@@ -145,13 +133,11 @@
        # l2Process.start()
        # 将L2的进程改为进程执行
        threading.Thread(target=huaxin_client.l2_client.run, args=(
            queue_other_w_l2_r, order_queues, transaction_queues, market_queue,
            huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()
            queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read,
                          order_queues, transaction_queues, market_queue, queue_l1_trade_r_strategy_w,
                          queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r)
        # 将tradeServer作为主进程
third_data/kpl_data_manager.py
@@ -417,7 +417,7 @@
    def get_limit_up():
        while True:
            if (tool.is_trade_time() and int(tool.get_now_time_str().replace(':', '')) > int("092530")) or True:
            if (tool.is_trade_time() and int(tool.get_now_time_str().replace(':', '')) > int("092530")):
                try:
                    results = kpl_api.getLimitUpInfoNew()
                    result = json.loads(results)
trade/huaxin/huaxin_trade_server.py
@@ -36,7 +36,6 @@
from l2.code_price_manager import Buy1PriceManager
from l2.huaxin import huaxin_target_codes_manager
from l2.huaxin.huaxin_target_codes_manager import HuaXinL1TargetCodesManager
from l2.l2_data_listen_manager import L2DataListenManager
from l2.l2_data_manager import TradePointManager
from l2.l2_data_util import L2DataUtil
from l2.l2_sell_manager import L2MarketSellManager
@@ -1199,9 +1198,9 @@
    def OnGetActiveListenCount(self, client_id, request_id):
        try:
            order = l2DataListenManager.get_active_count(L2DataListenManager.TYPE_ORDER)
            transaction = l2DataListenManager.get_active_count(L2DataListenManager.TYPE_TRANSACTION)
            market = l2DataListenManager.get_active_count(L2DataListenManager.TYPE_MARKET)
            order = 0#l2DataListenManager.get_active_count(L2DataListenManager.TYPE_ORDER)
            transaction = 0#l2DataListenManager.get_active_count(L2DataListenManager.TYPE_TRANSACTION)
            market = 0#l2DataListenManager.get_active_count(L2DataListenManager.TYPE_MARKET)
            result = {"code": 0, "data": {"order": order, "transaction": transaction, "market": market}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
@@ -1662,7 +1661,6 @@
my_l2_data_callback = MyL2DataCallback()
my_l2_data_callbacks = [MyL2DataCallback() for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT)]
my_trade_response = MyTradeResponse()
l2DataListenManager: L2DataListenManager = None
# 做一些初始化的操作
@@ -1683,9 +1681,7 @@
    threading.Thread(target=run_pending, daemon=True).start()
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        order_queues, transaction_queues,
        market_queue, queue_l1_trade_w_strategy_r):
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, queue_l1_trade_w_strategy_r):
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
        # 执行一些初始化数据
@@ -1697,11 +1693,6 @@
                     middle_api_protocol.SERVER_PORT,
                     OutsideApiCommandCallback(), common_client_count=50)
        manager.run(blocking=False)
        # 监听L2数据
        global l2DataListenManager
        l2DataListenManager = L2DataListenManager(my_l2_data_callback)
        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue)
        # 启动交易服务
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,