-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.py
73 lines (71 loc) · 2.41 KB
/
pool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# _*_ encoding=utf-8 _*_
import threading
import psutil
from python线程池.queue import ThreadSafeQueue
from python线程池.task import Task,AsyncTask
#任务处理线程
class ProcessThread(threading.Thread):
def __init__(self,task_queue,*args,**kwargs):
threading.Thread.__init__(self,*args,**kwargs)
#任务线程停止的标记
self.dismiss_flag=threading.Event()
#任务队列(处理线程不断从队列取出元素处理)
self.task_queue = task_queue
self.arge=args
self.kwargs=kwargs
def run(self):
while True:
#判断线程是否被要求停止
if self.dismiss_flag.is_set():
break
task=self.task_queue.pop()
if not isinstance(task,Task):
continue
#执行task实际逻辑是通过函数调用引进来的
result=task.callable(*task.args,**task.kwargs)
if isinstance(task,AsyncTask):
task.set_result(result)
def dismiss(self):
self.dismiss_flag.set()
def stop(self):
self.dismiss()
#线程池
class ThreadPool:
def __init__(self,size=0):
if not size:
#约定线程池的大小为cpu核数的两倍(最佳实践)
size=psutil.cpu_count()*2
#线程池
self.pool=ThreadSafeQueue(size)
#任务队列
self.task_queue=ThreadSafeQueue()
for i in range(size):
#将任务队列传到任务处理线程中,再添加到线程池中。
self.pool.put(ProcessThread(self.task_queue))
#启动线程池
def start(self):
for i in range(self.pool.size()):
thread=self.pool.get(i)
thread.start()
#停止线程池
def join(self):
for i in range(self.pool.size()):
thread = self.pool.get(i)
thread.stop()
while self.pool.size():
thread=self.pool.pop()
thread.join()
#往线程池提交任务
def put(self,item):
if not isinstance(item,Task):
raise TaskTypeErrorException()
self.task_queue.put(item)
def batch_put(self,item_list):
if not isinstance(item_list,list):
item_list=list(item_list)
for item in item_list:
self.put(item)
def size(self):
return self.pool.size()
class TaskTypeErrorException(Exception):
pass