"""Scratch experiments with thread pools and pipe-based message passing.

``run_process_1`` / ``run_process_2`` are a receiver/sender pair that talk
over a connection-like object exposing ``recv()`` / ``send()`` (presumably one
end of a ``multiprocessing.Pipe`` each -- confirm against the caller).  The
``__main__`` section is an independent thread-pool experiment.
"""
import logging
import multiprocessing
import threading
import concurrent.futures
import time

from utils import tool


def task_function(task_param, j):
    """Simulate a slow task.

    Prints the current timestamp, both arguments and the ``id`` of the
    current thread object, sleeps for two seconds, then returns a result
    string.

    Args:
        task_param: Task parameter; echoed in the returned string.
        j: Extra value that is only printed.

    Returns:
        The string ``"result:<task_param>"``.
    """
    # Task logic goes here; this is the place for slow computation or I/O.
    print(time.time(), task_param, j, id(threading.current_thread()))
    time.sleep(2)
    # Hand the task result back to the caller.
    return f"result:{task_param}"


def call_back(result):
    """Print a timestamped ``call_back`` marker with the current thread id.

    Args:
        result: Accepted for callback-style call sites; not used.
    """
    print(time.time(), "call_back", id(threading.current_thread()))


def run_process_1(pipe):
    """Receive messages from ``pipe`` and process them on a thread pool.

    A daemon thread blocks on ``pipe.recv()`` and submits every truthy
    message to a 20-worker pool.  The calling thread then sleeps forever, so
    this function never returns.

    Args:
        pipe: Object exposing a blocking ``recv()`` method.
    """
    thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)

    def process_data(val):
        print("处理任务", val)
        raise Exception("测试异常")

    def log_task_failure(future):
        # An exception raised inside a pool task is stored on its Future and
        # never reaches the submitting code; without this callback it would
        # be silently discarded.
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logging.error("worker task failed", exc_info=error)

    def recv_data():
        while True:
            try:
                val = pipe.recv()
                if val:
                    task = thread_pool.submit(process_data, val)
                    task.add_done_callback(log_task_failure)
            except (EOFError, OSError) as e:
                # The peer closed the pipe or the handle is unusable.  Every
                # further recv() would fail immediately, so retrying would
                # only spin and flood the log.
                logging.exception(e)
                return
            except Exception as e:
                logging.exception(e)

    threading.Thread(target=recv_data, daemon=True).start()
    # Keep the caller alive; the receiver is a daemon thread.
    while True:
        time.sleep(2)


def run_process_2(pipe):
    """Send a timestamped test message over ``pipe`` every two seconds.

    Two daemon threads run the same send loop concurrently.  The calling
    thread then sleeps forever, so this function never returns.

    Args:
        pipe: Object exposing a ``send()`` method.
    """
    # Both sender threads share one connection end; concurrent writes to the
    # same end may corrupt the stream, so sends are serialized.
    send_lock = threading.Lock()

    def send_data():
        while True:
            time.sleep(2)
            message = "test: " + tool.get_now_time_str()
            with send_lock:
                pipe.send(message)

    for _ in range(2):
        threading.Thread(target=send_data, daemon=True).start()
    # Keep the caller alive; the senders are daemon threads.
    while True:
        time.sleep(2)


def test1(params):
    """Sleep for one second, then print a completion message with ``params``."""
    time.sleep(1)
    print("执行结束", params)


if __name__ == '__main__':
    thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)
    for _ in range(4):
        thread_pool.submit(test1, "123123")
    while True:
        # NOTE: this counts live worker *threads* via the executor's private
        # ``_threads`` set, not running tasks; idle workers stay alive after
        # their task has finished.
        running_count = sum(
            1 for worker in thread_pool._threads if worker.is_alive()
        )
        print(running_count)
        time.sleep(1)