Administrator
2023-08-30 87a68117b9957219f17dc7830cb2b33b88a9d1d8
L2进程与策略进程合并
1个文件已删除
5个文件已修改
1个文件已添加
228 ■■■■■ 已修改文件
huaxin_client/l2_client.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_transaction_protocol.py 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_transform_protocol.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 62 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -6,7 +6,7 @@
import threading
import time
from huaxin_client import command_manager, l2_data_transaction_protocol
from huaxin_client import command_manager, l2_data_transform_protocol
from huaxin_client import constant
from huaxin_client import l2_data_manager
import lev2mdapi
@@ -127,7 +127,7 @@
        for c in del_codes:
            l2_data_manager.target_codes.discard(c)
        for c in add_codes:
            l2_data_manager.run_upload_task(c, pipe_strategy)
            l2_data_manager.run_upload_task(c, l2_data_callback)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
@@ -565,8 +565,6 @@
                data = json.loads(val)
                if data["data"]["type"] == "l2_cmd":
                    l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
                else:
                    l2_data_transaction_protocol.set_write_rece_data(data)
        except Exception as e:
            logging.exception(e)
@@ -574,7 +572,7 @@
pipe_strategy = None
def run(pipe_trade, _pipe_strategy):
def run(pipe_trade, _pipe_strategy, _l2_data_callback: l2_data_transform_protocol.L2DataCallBack)->None:
    logger_system.info("L2进程ID:{}", os.getpid())
    log.close_print()
    if pipe_trade is not None:
@@ -586,10 +584,14 @@
        t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(_pipe_strategy,), daemon=True)
        t1.start()
    __init_l2()
    global l2_data_callback
    l2_data_callback = _l2_data_callback
    l2_data_manager.run_upload_common()
    l2_data_manager.run_upload_trading_canceled()
    l2_data_manager.run_log()
    # l2_data_manager.run_test(_pipe_strategy)
    l2_data_manager.run_test(l2_data_callback)
    global l2CommandManager
    l2CommandManager = command_manager.L2CommandManager()
    l2CommandManager.init(MyL2ActionCallback())
huaxin_client/l2_data_manager.py
@@ -7,11 +7,12 @@
import random
import threading
import time
from huaxin_client import socket_util, l2_data_transaction_protocol
from huaxin_client import socket_util, l2_data_transform_protocol
from huaxin_client.client_network import SendResponseSkManager
# 活动时间
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module import log_export
from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \
    logger_local_huaxin_g_cancel, hx_logger_contact_debug
@@ -161,13 +162,9 @@
# 循环读取上传数据
def __run_upload_order(code, pipe):
def __run_upload_order(code: str, l2_data_callback: L2DataCallBack) -> None:
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    tag = l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code)
    # with contextlib.closing(
    #         mmap.mmap(-1, 1000 * 100, tag,
    #                   access=mmap.ACCESS_WRITE)) as _mmap:
    if True:
        while True:
            # print("order task")
@@ -182,7 +179,8 @@
                    udatas.append(temp)
                if udatas:
                    start_time = time.time()
                    upload_data(code, "l2_order", udatas)
                    # upload_data(code, "l2_order", udatas)
                    l2_data_callback.OnL2Order(code,  udatas, int(time.time() * 1000))
                    # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas)
                    use_time = int((time.time() - start_time) * 1000)
                    if use_time > 20:
@@ -255,13 +253,13 @@
# 运行上传任务
def run_upload_task(code, pipe_strategy):
def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None:
    # 如果代码没有在目标代码中就不需要运行
    if code not in target_codes:
        return
    # 如果最近的活动时间小于2s就不需要运行
    if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2:
        t = threading.Thread(target=lambda: __run_upload_order(code, pipe_strategy), daemon=True)
        t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True)
        t.start()
    if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2:
@@ -287,12 +285,12 @@
    t.start()
def __test(pipe_strategy):
def __test(_l2_data_callback):
    code = "002073"
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    target_codes.add(code)
    t = threading.Thread(target=lambda: __run_upload_order(code, pipe_strategy), daemon=True)
    t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True)
    t.start()
    while True:
        try:
@@ -303,15 +301,10 @@
            pass
def run_test(pipe_strage):
    t = threading.Thread(target=lambda: __test(pipe_strage), daemon=True)
def run_test(_l2_data_callback):
    t = threading.Thread(target=lambda: __test(_l2_data_callback), daemon=True)
    t.start()
def test():
    # upload_data("000798", "trading_order_canceled", 30997688, new_sk=True)
    code = "000333"
    tag = l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code)
    with contextlib.closing(
            mmap.mmap(-1, 1000 * 100, tag,
                      access=mmap.ACCESS_WRITE)) as _mmap:
        pass
huaxin_client/l2_data_transaction_protocol.py
File was deleted
huaxin_client/l2_data_transform_protocol.py
New file
@@ -0,0 +1,15 @@
"""
L2数据传输协议
"""
class L2DataCallBack:
    # L2委托明细
    def OnL2Order(self, code, datas, timestamp):
        pass
    def OnL2Transaction(self, code, datas, timestamp):
        pass
    def OnMarketData(self, code, datas, timestamp):
        pass
main.py
@@ -23,7 +23,7 @@
# from huaxin_api import trade_client, l2_client, l1_client
def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2):
def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2,ptl2_l2,psl2_l2):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -39,6 +39,10 @@
    # redis后台服务
    t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, daemon=True)
    t1.start()
    # 启动L2订阅服务
    t1 = threading.Thread(target=huaxin_client.l2_client.run, args=(ptl2_l2, psl2_l2, trade_server.my_l2_data_callback), daemon=True)
    t1.start()
    # 交易服务
@@ -90,13 +94,9 @@
    logger_system.info("主进程ID:{}", os.getpid())
    tradeServerProcess = multiprocessing.Process(target=createTradeServer,
                                                 args=(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy))
                                                 args=(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy,ptl2_l2, psl2_l2))
    tradeServerProcess.start()
    # 交易进程与L2进程
    l2Process = multiprocessing.Process(target=huaxin_client.l2_client.run, args=(ptl2_l2, psl2_l2,))
    l2Process.start()
    #
    tradeProcess = multiprocessing.Process(target=huaxin_client.trade_client.run, args=(ptl2_trade, pst_trade,))
    tradeProcess.start()
@@ -105,4 +105,3 @@
    # 将tradeServer作为主进程
    tradeServerProcess.join()
    tradeProcess.join()
    l2Process.join()
test/test.py
@@ -1,6 +1,27 @@
import multiprocessing
import threading
import time
from huaxin_client import l2_data_manager
from log_module import async_log_util
from log_module.log import logger_debug
def read(pipe):
    while True:
        val = pipe.recv()
        if val:
            print("read:", val)
def write(pipe):
    while True:
        pipe.send("test")
        time.sleep(1)
if __name__ == "__main__":
    l2_data_manager.upload_data("000798", "trading_order_canceled", 30997688, new_sk=True)
    p1, p2 = multiprocessing.Pipe()
    threading.Thread(target=lambda: write(p1), daemon=True).start()
    threading.Thread(target=lambda: read(p2), daemon=True).start()
    input()
trade/huaxin/trade_server.py
@@ -24,7 +24,7 @@
from code_attribute import gpcode_manager, code_volumn_manager
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from huaxin_client import l1_subscript_codes_manager, l2_data_transaction_protocol
from huaxin_client import l1_subscript_codes_manager, l2_data_transform_protocol
from huaxin_client.client_network import SendResponseSkManager
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \
@@ -443,51 +443,6 @@
                logging.exception(e)
l2_order_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)
def __recv_pipe_l2(pipe_l2):
    def process_l2_order(code, request_id):
        # 读取共享内存中的数据
        with contextlib.closing(
                mmap.mmap(-1, 1000 * 100, l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code),
                          access=mmap.ACCESS_READ)) as m:
            s = m.read(1000 * 100)
            s = s.decode('utf-8').replace('\x00', '')
            hx_logger_contact_debug.info("策略客户端(code-{} request_id-{}):读取到共享内存数据", code, request_id)
            if s:
                print(len(s), s)
                data = json.loads(s)
                code = data["code"]
                timestamp = data.get("time")
                datas = data["data"]
                try:
                    # TradeServerProcessor.l2_order(code, datas, timestamp)
                    pass
                finally:
                    hx_logger_contact_debug.info("策略客户端(code-{} request_id-{}):数据处理完毕", code, request_id)
                    l2_data_transaction_protocol.set_read_l2_order(pipe_l2, request_id)
    if pipe_l2 is not None:
        while True:
            try:
                val = pipe_l2.recv()
                if val:
                    val = json.loads(val)
                    print("收到来自L2的数据:", val["type"])
                    # 处理数据
                    type_ = val["type"]
                    if type_ == l2_data_transaction_protocol.TYPE_L2_ORDER:
                        request_id = val["request_id"]
                        # 处理l2数据
                        code = val["data"]["code"]
                        hx_logger_contact_debug.info("策略客户端(code-{} request_id-{}):接受到来自L2客户端的数据", code, request_id)
                        l2_order_thread_pool.submit(process_l2_order, code, request_id)
            except Exception as e:
                logging.exception(e)
class OutsideApiCommandCallback(outside_api_command_manager.ActionCallback):
    @classmethod
    def __send_response(cls, data_bytes):
@@ -805,6 +760,21 @@
            logger_debug.error(e)
class MyL2DataCallback(l2_data_transform_protocol.L2DataCallBack):
    def OnL2Order(self, code, datas, timestamp):
        TradeServerProcessor.l2_order(code, datas, timestamp)
    def OnL2Transaction(self, code, datas, timestamp):
        pass
    def OnMarketData(self, code, datas, timestamp):
        pass
# 回调
my_l2_data_callback = MyL2DataCallback()
def run(pipe_trade, pipe_l1, pipe_l2):
    # 执行一些初始化数据
    block_info.init()