import logging
|
import multiprocessing
|
import threading
|
|
import concurrent.futures
|
import time
|
|
from utils import tool
|
|
|
def task_function(task_param, j, delay=2):
    """Simulate a slow unit of work and return a result string.

    Prints the current timestamp, both arguments and the ``id()`` of the
    current thread object, then blocks for ``delay`` seconds as a stand-in
    for an expensive computation or I/O call.

    Args:
        task_param: Value embedded in the returned result string.
        j: Extra value that is only echoed in the trace line.
        delay: Seconds to sleep before returning. Defaults to 2, the
            duration that used to be hard-coded.

    Returns:
        The string ``"result:<task_param>"``.
    """
    # Trace which thread picked the task up and when.
    print(time.time(), task_param, j, id(threading.current_thread()))

    # Placeholder for the real workload (heavy computation, I/O, ...).
    time.sleep(delay)

    # Hand the task's result back to the caller.
    return f"result:{task_param}"
|
|
|
def call_back(result):
    """Print a timestamped ``call_back`` trace line for the current thread.

    Args:
        result: Accepted for callback compatibility; not used.
    """
    stamp = time.time()
    thread_ident = id(threading.current_thread())
    print(stamp, "call_back", thread_ident)
|
|
|
def run_process_1(pipe):
    """Receive values from ``pipe`` and dispatch each one to a thread pool.

    A daemon thread blocks on ``pipe.recv()`` and submits every truthy value
    to a 20-worker ``ThreadPoolExecutor``. The calling thread then sleeps in
    an endless loop so the process (and its daemon receiver) stays alive.

    Args:
        pipe: Object exposing a blocking ``recv()`` method; presumably one
            end of a ``multiprocessing.Pipe`` (confirm against the caller).

    Returns:
        Never returns; the final loop runs until the process is terminated.
    """
    thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)

    def process_data(val):
        print("处理任务", val)
        # Deliberate failure used to exercise error handling in the pool.
        raise Exception("测试异常")

    def log_failure(future):
        # An exception raised inside a pool task is stored on its Future and
        # is otherwise never reported, so surface it through logging here.
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logging.error("process_data failed", exc_info=error)

    def recv_data():
        while True:
            try:
                val = pipe.recv()
                if val:
                    future = thread_pool.submit(process_data, val)
                    future.add_done_callback(log_failure)
            except (EOFError, OSError) as e:
                # The other end is gone or the connection is closed: every
                # further recv() would fail immediately, so stop instead of
                # spinning in a tight logging loop.
                logging.exception(e)
                break
            except Exception as e:
                # Per-message problems are logged and the loop carries on.
                logging.exception(e)

    threading.Thread(target=recv_data, daemon=True).start()

    # Keep the process alive; the receiver is a daemon thread.
    while True:
        time.sleep(2)
|
|
|
def run_process_2(pipe):
    """Send a timestamped test message over ``pipe`` from two threads.

    Starts two daemon threads that each sleep for two seconds and then send
    ``"test: " + tool.get_now_time_str()`` through ``pipe``, repeating
    indefinitely. The calling thread then sleeps in an endless loop so the
    process stays alive.

    Args:
        pipe: Object exposing ``send()``; presumably one end of a
            ``multiprocessing.Pipe`` (confirm against the caller).

    Returns:
        Never returns; the final loop runs until the process is terminated.
    """
    # Both sender threads write to the same connection end. Concurrent
    # writes to one end of a pipe can corrupt the data, so serialise them.
    send_lock = threading.Lock()

    def send_data():
        while True:
            time.sleep(2)
            message = "test: " + tool.get_now_time_str()
            with send_lock:
                pipe.send(message)

    for _ in range(2):
        threading.Thread(target=send_data, daemon=True).start()

    # Keep the process alive; the senders are daemon threads.
    while True:
        time.sleep(2)
|
|
|
def test1(params):
    """Sleep for one second, then print a completion line with ``params``.

    Args:
        params: Value echoed after the completion marker.
    """
    pause_seconds = 1
    time.sleep(pause_seconds)

    done_marker = "执行结束"
    print(done_marker, params)
|
|
|
if __name__ == '__main__':
    # Demo: submit four one-second tasks to a thread pool, then print once
    # per second how many of them are currently executing.
    thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)

    # Keep the Future objects so task state can be queried through the
    # public API instead of the executor's private ``_threads`` set.
    futures = [thread_pool.submit(test1, "123123") for _ in range(4)]

    # Polls until the script is interrupted, as before.
    while True:
        # Worker threads stay alive (idle) after their task finishes, so
        # counting alive threads would keep reporting 4 forever;
        # Future.running() reflects tasks that are actually executing.
        running_count = sum(1 for future in futures if future.running())
        print(running_count)
        time.sleep(1)
|