Administrator
2023-08-25 dbc2ec056e478704cc352586a22e9e07755b15f0
共享内存测试
3个文件已修改
3个文件已添加
189 ■■■■ 已修改文件
huaxin_client/command_manager.py 41 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client_server.py 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_mmap.py 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_threadpool.py 69 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_factor.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py
@@ -78,7 +78,7 @@
        cls.pipe_l2 = pipe_l2
    @classmethod
    def __process_command(cls, _type, client_id, result_json):
    def process_command(cls, _type, client_id, result_json):
        try:
            data = result_json["data"]
            print("接收内容", result_json)
@@ -106,7 +106,7 @@
            elif _type == "test":
                cls.action_callback.OnTest(client_id, request_id, data)
        except Exception as e:
            logger_local_huaxin_trade_debug.debug(f"__process_command出错:{result_json}")
            logger_local_huaxin_trade_debug.debug(f"process_command出错:{result_json}")
            logging.exception(e)
            logging.error(result_json)
@@ -115,23 +115,26 @@
        if pipe_strategy is None:
            return
        # 本地命令接收
        while True:
            try:
                val = pipe_strategy.recv()
                if val:
                    val = json.loads(val)
                    print("run_process_command", val)
                    _type = val["type"]
                    _data = val["data"]
                    # 查看是否是设置L2的代码
                    if _type == CLIENT_TYPE_CMD_L2:
                        cls.pipe_l2.send(
                            json.dumps({"type": "set_l2_codes", "data": _data}))
                    else:
                        threading.Thread(target=lambda: cls.__process_command(_type, None, val), daemon=True).start()
            except Exception as e:
                logger_local_huaxin_trade_debug.exception(e)
                logging.exception(e)
        try:
            while True:
                try:
                    val = pipe_strategy.recv()
                    if val:
                        val = json.loads(val)
                        print("run_process_command", val)
                        _type = val["type"]
                        _data = val["data"]
                        # 查看是否是设置L2的代码
                        if _type == CLIENT_TYPE_CMD_L2:
                            cls.pipe_l2.send(
                                json.dumps({"type": "set_l2_codes", "data": _data}))
                        else:
                            threading.Thread(target=lambda: cls.process_command(_type, None, val), daemon=True).start()
                except Exception as e:
                    logger_local_huaxin_trade_debug.exception(e)
                    logging.exception(e)
        except Exception as e:
            logger_local_huaxin_trade_debug.exception(e)
    # 维护连接数的稳定
    def run(self, blocking=True):
huaxin_client/trade_client_server.py
New file
@@ -0,0 +1,37 @@
import json
import socket
import socketserver
from huaxin_client.command_manager import TradeCommandManager
from utils import socket_util
class MyTCPServer(socketserver.TCPServer):
    """TCP server that is always bound and activated on construction."""

    def __init__(self, server_address, RequestHandlerClass):
        # bind_and_activate is pinned to True, so callers only supply the
        # address and the handler class.
        super().__init__(
            server_address,
            RequestHandlerClass,
            bind_and_activate=True,
        )
# To serve requests asynchronously, ThreadingTCPServer has to be re-declared
# on top of MyTCPServer.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """MyTCPServer combined with socketserver.ThreadingMixIn."""
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Request handler that forwards received JSON messages to TradeCommandManager."""

    def setup(self):
        """No per-connection setup is performed."""

    def handle(self):
        """Read messages from the connection in a loop and dispatch each one.

        Every non-empty payload returned by ``socket_util.recv_data`` is
        parsed as JSON and passed to ``TradeCommandManager.process_command``
        together with its ``type`` field; the client id is always ``None``.

        NOTE(review): the loop contains no ``break``/``return``; it only ends
        when an exception propagates out of this method. Confirm what
        ``recv_data`` does when the peer disconnects.
        """
        peer_host = self.client_address[0]  # currently unused
        super().handle()
        sk: socket.socket = self.request
        while True:
            data, _header = socket_util.recv_data(sk)
            if not data:
                continue
            # TODO: process the data
            message = json.loads(data)
            TradeCommandManager.process_command(message['type'], None, message)
test/test_mmap.py
New file
@@ -0,0 +1,34 @@
import mmap
import contextlib
import multiprocessing
import time
def run_process_1(pipe):
    """Writer side of the shared-memory test.

    Opens the anonymous mapping tagged 'l2-000333' (1000*100 bytes) and, once
    per second, overwrites its start with "msg <n>" for n = 1..10000.
    ``pipe`` is accepted but not used.
    """
    shared = mmap.mmap(-1, 1000*100, tagname='l2-000333', access=mmap.ACCESS_WRITE)
    with contextlib.closing(shared):
        seq = 1
        while seq <= 10000:
            payload = ("msg " + str(seq)).encode("utf-8")
            # Always rewrite from offset 0 so the latest message is at the start.
            shared.seek(0)
            shared.write(payload)
            shared.flush()
            time.sleep(1)
            seq += 1
def run_process_2(pipe):
    """Reader side of the shared-memory test.

    Once per second, opens the mapping tagged 'l2-000333' read-only, reads the
    first 1024 bytes, strips NUL padding and prints the text if any remains.
    ``pipe`` is accepted but not used.

    The mapping is opened with the same length the writer uses (1000*100).
    Mapping only 6 bytes capped every read at 6 bytes, so messages longer than
    "msg 99" were printed truncated (e.g. "msg 100" appeared as "msg 10").
    """
    # Must match the length used by the writer process for this tag.
    shared_size = 1000 * 100
    while True:
        with contextlib.closing(
                mmap.mmap(-1, shared_size, tagname='l2-000333', access=mmap.ACCESS_READ)) as m:
            s = m.read(1024)
            # Unwritten bytes of the mapping are NULs; drop them before printing.
            s = s.decode('utf-8').replace('\x00', '')
            if s:
                print(s)
            time.sleep(1)
if __name__ == '__main__':
    # Start the writer and the reader as separate processes, each receiving
    # one end of a pipe, then keep the parent process alive.
    p1, p2 = multiprocessing.Pipe()
    serverProcess = multiprocessing.Process(target=run_process_1, args=(p1,))
    jueJinProcess = multiprocessing.Process(target=run_process_2, args=(p2,))
    for child in (serverProcess, jueJinProcess):
        child.start()
    while True:
        time.sleep(2)
test/test_threadpool.py
New file
@@ -0,0 +1,69 @@
import logging
import multiprocessing
import threading
import concurrent.futures
import time
from utils import tool
def task_function(task_param, j):
    """Sample task: print a trace line, sleep 2 seconds, return a result string.

    This is where the task's real logic would go (long computations, IO, ...);
    ``task_param`` is the task's argument.

    Returns:
        The string "result:<task_param>".
    """
    started = time.time()
    thread_id = id(threading.current_thread())
    print(started, task_param, j, thread_id)
    time.sleep(2)
    # Hand back the task's result.
    return "result:{}".format(task_param)
def call_back(result):
    """Print a timestamped "call_back" trace line with the current thread's id.

    ``result`` is accepted but not used.
    """
    stamp = time.time()
    thread_id = id(threading.current_thread())
    print(stamp, "call_back", thread_id)
def run_process_1(pipe):
    """Receiver process: hand every message read from ``pipe`` to a thread pool.

    A daemon thread blocks on ``pipe.recv()`` and submits each truthy value to
    a 20-worker pool; the calling thread then just sleeps forever.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)

    def process_data(val):
        # Deliberately fails after printing. The future returned by submit()
        # is discarded, so nothing in this function inspects the exception.
        print("处理任务", val)
        raise Exception("测试异常")

    def recv_data():
        while True:
            try:
                message = pipe.recv()
                if not message:
                    continue
                executor.submit(process_data, message)
            except Exception as e:
                logging.exception(e)

    receiver = threading.Thread(target=recv_data, daemon=True)
    receiver.start()
    # Keep the process alive; the receiver is a daemon thread.
    while True:
        time.sleep(2)
def run_process_2(pipe):
    """Sender process: two daemon threads each send a message every 2 seconds.

    Each message is "test: " followed by ``tool.get_now_time_str()``. The
    calling thread then just sleeps forever.
    """
    def send_data():
        while True:
            time.sleep(2)
            pipe.send("test: " + tool.get_now_time_str())

    # Two independent senders share the same pipe end.
    for _ in range(2):
        threading.Thread(target=send_data, daemon=True).start()
    while True:
        time.sleep(2)
if __name__ == '__main__':
    # Start the receiver and the sender as separate processes, each receiving
    # one end of a pipe, then keep the parent process alive.
    p1, p2 = multiprocessing.Pipe()
    serverProcess = multiprocessing.Process(target=run_process_1, args=(p1,))
    jueJinProcess = multiprocessing.Process(target=run_process_2, args=(p2,))
    for child in (serverProcess, jueJinProcess):
        child.start()
    while True:
        time.sleep(2)
trade/huaxin/trade_server.py
@@ -185,8 +185,9 @@
                            code = data["code"]
                            timestamp = data.get("time")
                            datas = data["data"]
                            now_timestamp = int(time.time() * 1000)
                            async_log_util.info(hx_logger_l2_orderdetail,
                                                f"{code}#耗时:{int(time.time() * 1000) - timestamp}#{datas}")
                                                f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{datas}")
                            l2_log.threadIds[code] = random.randint(0, 100000)
                            l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, datas)
                        finally:
@@ -380,11 +381,8 @@
                                    logger_l2_g_cancel.info(f"{code}-需要撤单:{msg}")
                                else:
                                    logger_l2_g_cancel.info(f"{code}-不需要撤单:{msg}")
                            except Exception as e:
                                logger_l2_g_cancel.error(f"{code}-撤单异常:{str(e)}")
                                logger_l2_g_cancel.exception(e)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
trade/l2_trade_factor.py
@@ -140,7 +140,7 @@
    # 获取时间计算范围,返回s
    def get_time_range(self):
        # ts = [pow(3, 1), pow(3, 1), pow(3, 1), pow(3, 2), pow(3, 2), pow(3, 3), pow(3, 3), pow(3, 3)]
        ts = [pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1)]
        ts = [pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 2), pow(2, 2), pow(2, 2), pow(2, 2)]
        # 暂时去除分的影响
        # if -1 < self.score_index < 3:
        #     return ts[0]