Administrator
2024-04-18 30d7773f4b0d8e76000252070abd19f9348d5654
共享内存测试
1个文件已修改
66 ■■■■ 已修改文件
test/test_mmap.py 66 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_mmap.py
@@ -1,30 +1,75 @@
import ctypes
import logging
import mmap
import contextlib
import multiprocessing
import struct
import time
from multiprocessing import Process, Value, Array
import msgpack
from huaxin_client import l2_data_manager
def run_process_1(arr:Array):
def run_process_1(arr: Array):
    while True:
        data_lenth = arr[0]
        data = bytes(arr[1:data_lenth+1]).decode('utf-8')
        if data:
            print(eval(data))
        try:
            time_ = time.time()
            lenth_count = 5
            raw_data = arr.get_obj()
            data_lenth = int(bytes(raw_data[:lenth_count]).decode('utf-8'))
            data = raw_data[lenth_count:data_lenth + lenth_count]
            data = msgpack.unpackb(bytes(data))
            print("解码时间", time.time() - time_)
            if data:
                print("数据长度", len(data))
        except Exception as e:
            logging.exception(e)
        time.sleep(1)
def run_process_2(arr:Array):
    str_=b"['000333',13.89]"
    arr[0] = len(str_)
    arr[1:len(str_)+1] = str_
def run_process_2(arr: Array):
    data = [('000990', 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1')]
    for i in range(0, 100):
        data.append(data[0])
    time_ = time.time()
    str_ = msgpack.packb(data)
    print("编码时间", time.time() - time_)
    print("数据长度", len(str_))
    lenth_count = 5
    str_lenth = "{0:0>5}".format(len(str_))
    lenth_bytes = str_lenth.encode('utf-8')
    arr[0:lenth_count] = lenth_bytes
    arr[lenth_count:len(str_) + lenth_count] = str_
###########################测试List#####################################
def update_value(_shared_list):
    while True:
        _shared_list.append([123, "123123", "54677"])
        time.sleep(1)
def read_value(_shared_list):
    while True:
        print(_shared_list)
        time.sleep(1)
def test_manager():
    manager = multiprocessing.Manager()
    # 使用管理器对象创建一个共享列表
    shared_list = manager.list()
    p1 = multiprocessing.Process(target=update_value, args=(shared_list,))
    p2 = multiprocessing.Process(target=read_value, args=(shared_list,))
    p1.start()
    p2.start()
    while True:
        time.sleep(2)
if __name__ == '__main__':
    arr = Array(ctypes.c_byte, range(1024*1024))
    arr = Array('i', range(1024 * 1024))
    p1, p2 = multiprocessing.Pipe()
    serverProcess = multiprocessing.Process(target=run_process_1, args=(arr,))
    jueJinProcess = multiprocessing.Process(target=run_process_2, args=(arr,))
@@ -33,3 +78,4 @@
    while True:
        time.sleep(2)
    # test_manager()