Python多线程(线程池)与多进程(进程池)简单使用

目前Python多线程不能利用CPU多核优势,IO密集型可用多线程,CPU密集型适合用多进程

首先定义一个用来并行跑的函数,加一个随机sleep时间好感受并行的结果,两个参数好理解并行时多参数怎么传递。

1
2
3
4
import random
def Test(a, b):
time.sleep(random.randint(5, 20))
print(str(a) + '_' + str(b) + '\t')

线程池

1
2
3
4
5
6
7
8
9
10
import random
import threadpool
def MultiThreadTest():
pool = threadpool.ThreadPool(20)
li = []
for i in range(1000):
li.append((None, {'a': i, 'b': i + 10}))
requests = threadpool.makeRequests(Test, li)
[pool.putRequest(req) for req in requests]
pool.wait()

threadpool需要安装,pip就可以 多参数用 (None, {....}),当前版本threadpool 1.3.2是这么写的。

进程池

1
2
3
4
5
6
7
import multiprocessing
def MultiProcessTest():
pool = multiprocessing.Pool(processes = 4)
for i in range(1000):
pool.apply_async(Test, (i, i + 10, ))
pool.close()
pool.join()

先close后join。

共享数据

另外,多线程可以用Python的Queue共享数据,多进程要用multiprocessing.Queue。

这里尝试用multiprocessing的dict保存数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
import multiprocessing
def Test(a, b, mpDict):
print(str(a) + "test", b)
mpDict[str(a) + "test"] = b
def MultiProcessTest():
pool = multiprocessing.Pool(processes=4)
mpDict = multiprocessing.Manager().dict()
for i in range(5):
pool.apply_async(Test, (i, i + 10, mpDict, ))
pool.close()
pool.join()
traditionDict = dict(mpDict)
print(traditionDict)

生产者-消费者 模型

Pool 共享 Queue 有个 multiprocessing.Queue() 只支持 Process 出来的进程,不支持 Pool 的,在 Pool 中需要使用 multiprocessing.Manager()

下面代码为 1 个生产者和 4 个消费者的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 生产者
def write(q):
a = np.random.randint(0, 100, (100, 2, 2))
for value in range(a.shape[0]):
print('Put %s to queue...\n' % a[value])
q.put(a[value])
print(q.qsize())


# 消费者:
def read(q):
while True:
# get的参数是 block=True, timeout=None
# block表示队列空时是阻塞等待还是抛出异常
# timeout指等待一定时间抛出异常,还是无限等待。
value = q.get(True)
print('Get %s from queue.\n' % value)
print(q.qsize())
time.sleep(random.random())


def test_pool():
manager = mp.Manager()
q = manager.Queue(2)
pw = Process(target=write, args=(q,))
pw.start()
worker_num = 4
pool = mp.Pool(processes=worker_num)
for i in range(worker_num):
print('start data worker ' + str(i))
pool.apply_async(read, (q, ))
pool.close()
pw.join()
pool.join()