"""Exchange JSON-serialisable objects between processes via shared memory.

The payload is stored at the start of the segment as
``<decimal byte length>~<JSON text>``.
"""

import json
import logging
import multiprocessing
import time
from multiprocessing import shared_memory

# Only the helper class is exported by ``from module import *``; the demo
# loops below never return and should not leak into importing namespaces.
__all__ = ["SharedMemoryObj"]

# Separator between the length prefix and the JSON payload.
_SEPARATOR = b"~"
# Number of leading bytes inspected when looking for the separator.
_HEADER_PROBE_BYTES = 20


def _coerce_key(key):
    """Return ``int(key)`` for integer-looking keys, otherwise ``key`` unchanged.

    JSON object keys are always strings, so integer keys written by
    ``write`` come back as text.  Keys that pass the digit check but that
    ``int`` still rejects (for example ``"--1"`` or ``"²"``) are kept as
    strings instead of aborting the whole read.
    """
    if key.lstrip('-').isdigit():
        try:
            return int(key)
        except ValueError:
            return key
    return key


def _restore_int_keys(mapping):
    """``json.loads`` object hook that converts integer-looking keys to int."""
    return {_coerce_key(k): v for k, v in mapping.items()}


class SharedMemoryObj:
    """Small wrapper that stores one JSON document in a shared memory segment."""

    def __init__(self, name='shm_0', size=5*1024*1024):
        """Create the named segment, or attach to it when it already exists.

        Args:
            name: Name of the shared memory segment.
            size: Requested size in bytes when the segment is created.
        """
        try:
            self.shm = shared_memory.SharedMemory(name=name, create=True, size=size)
        except FileExistsError:
            # Another process created the segment first: map the existing one.
            # Any other failure (bad size, permissions, ...) is propagated
            # instead of being hidden behind an attach attempt.
            self.shm = shared_memory.SharedMemory(name=name, create=False)
        self.shm_name = self.shm.name
        self.length = 0
        self.contentbegin = 0

    def write(self, obj) -> bool:
        """Serialise ``obj`` as JSON and store it in the shared memory.

        Stored format: payload length + ``~`` + payload.

        Args:
            obj: JSON-serialisable content to store.

        Returns:
            True when the content was written; False when ``obj`` is None,
            cannot be serialised, or does not fit into the segment.
        """
        if obj is None:
            return False
        try:
            payload = json.dumps(obj).encode()
            frame = str(len(payload)).encode() + _SEPARATOR + payload
            capacity = len(self.shm.buf)
            if len(frame) > capacity:
                # Refuse up front so the previous content stays intact.
                logging.error(
                    "shared memory payload of %d bytes exceeds capacity of %d bytes",
                    len(frame), capacity,
                )
                return False
            self.shm.buf[0:len(frame)] = frame
            return True
        except Exception as e:
            logging.exception(e)
            return False

    def read(self) -> dict:
        """Read and decode the content currently stored in the shared memory.

        Object keys that look like integers are converted back to ``int``.

        Returns:
            The decoded content (normally a dict; other JSON values are
            returned as decoded).  An empty dict is returned when no
            length prefix is found or the content cannot be decoded.
        """
        try:
            header = bytes(self.shm.buf[:_HEADER_PROBE_BYTES])
            sep = header.find(_SEPARATOR)
            if sep == -1:
                return {}
            content_length = int(header[:sep])
            start = sep + 1
            raw = bytes(self.shm.buf[start:start + content_length])
            return json.loads(raw.decode('utf-8'), object_hook=_restore_int_keys)
        except Exception as e:
            logging.exception(e)
            return {}

    def update(self, obj):
        """Merge ``obj`` into the stored dict and write the result back.

        When the stored content is not a dict (for example a list), nothing
        can be merged: the problem is logged and the segment is left
        untouched.

        Args:
            obj: Mapping (or iterable of key/value pairs) to merge in.
        """
        org_obj = self.read()
        if not isinstance(org_obj, dict):
            logging.error(
                "cannot update shared memory content of type %s",
                type(org_obj).__name__,
            )
            return
        org_obj.update(obj)
        self.write(org_obj)

    def close(self, unlink=False):
        """Release this process's mapping of the segment.

        Args:
            unlink: Also request destruction of the underlying segment.
        """
        self.shm.close()
        if unlink:
            self.shm.unlink()


def test_p1():
    """Demo reader: print the shared content once per second, forever."""
    shareObj = SharedMemoryObj()
    while True:
        print("读取到内容:", shareObj.read())
        time.sleep(1)


def test_p2():
    """Demo writer: store an incrementing one-element list once per second."""
    shareObj = SharedMemoryObj()
    index = 0
    while True:
        index += 1
        shareObj.write([index])
        time.sleep(1)


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=test_p1)
    p2 = multiprocessing.Process(target=test_p2)
    p1.start()
    p2.start()
    # Block on the children instead of polling in a sleep loop.
    p1.join()
    p2.join()