Python Concurrency

The Global Interpreter Lock

A program can generally be thought of as CPU computation plus I/O operations: the CPU part is compute-bound, the I/O part is I/O-bound.
Multithreading speeds a program up by letting CPU computation and I/O proceed at the same time; it suits I/O-bound workloads.
Multiprocessing uses multiple CPU cores to execute at the same time, achieving true parallelism; it suits CPU-bound workloads.

GIL (Global Interpreter Lock): a mutex in the CPython interpreter that lets only one thread execute Python bytecode at any moment, which is why threads cannot run Python code in parallel. A quick sketch of its effect follows.
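A minimal sketch (not from the original notes; cpu_work and the loop size are illustrative choices) that makes the GIL's effect visible: a pure-Python CPU-bound loop takes roughly the same wall-clock time whether it runs in one thread or is split across two, because only one thread holds the GIL at a time.

import threading
import time

def cpu_work(n):
    # pure Python computation: the thread holds the GIL the whole time
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":
    N = 5_000_000

    start = time.time()
    cpu_work(N)
    print(f"single thread: {time.time() - start:.2f} s")

    start = time.time()
    t1 = threading.Thread(target=cpu_work, args=(N // 2,))
    t2 = threading.Thread(target=cpu_work, args=(N // 2,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    # roughly the same as the single-threaded run: the GIL prevents
    # the two threads from computing in parallel
    print(f"two threads:   {time.time() - start:.2f} s")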

Multithreading

The threading module

import threading

def task(a, b):
    print("Thread is running")

t = threading.Thread(target=task, args=(1, 2))  # create the thread object
t.start()  # start the thread
t.join()   # wait for the thread to finish

example:

import threading
import time

def task(a, b):
    print(f"Thread{a}{b} is running")
    time.sleep(2)

t1 = threading.Thread(target=task, args=(1, 2))
t2 = threading.Thread(target=task, args=("a", "b"))


if __name__ == "__main__":
    start_time = time.time()
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    end_time = time.time()
    # the two sleeps overlap, so the total is about 2 seconds, not 4
    print(f"Total execution time: {end_time - start_time} seconds")

Thread pool: ThreadPoolExecutor

The map approach

from concurrent.futures import ThreadPoolExecutor
import time

def task(a, b):
    print(f"Thread{a}{b} is running")
    time.sleep(2)
    return str(a + b)

if __name__ == "__main__":
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=2) as executor:
        # submit the tasks in batch with map
        rets = executor.map(task, [1, "a"], [2, "b"])  # arguments are zipped positionally
        for ret in rets:
            print(ret)
    end_time = time.time()
    print(f"Total execution time: {end_time - start_time} seconds")

The submit approach

from concurrent.futures import ThreadPoolExecutor
import time

def task(a, b):
    print(f"Thread{a}{b} is running")
    time.sleep(2)
    return str(a + b)

if __name__ == "__main__":
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=2) as executor:
        # submit multiple tasks with a list comprehension
        futures = [executor.submit(task, *args) for args in [(1, 2), ("a", "b")]]

        # wait for all tasks to finish
        for future in futures:
            future.result()  # block until the task finishes and fetch its result

    end_time = time.time()
    print(f"Total execution time: {end_time - start_time} seconds")

Multiprocessing

The multiprocessing module

import time
from multiprocessing import Process

def count_primes(start, end):
    count = 0
    for i in range(start, end):
        if i > 1:
            for j in range(2, i):
                if (i % j) == 0:
                    break
            else:
                count += 1
    return count


if __name__ == "__main__":
    start_time = time.time()
    num_primes = count_primes(1, 100000)
    end_time = time.time()
    print(f"execution time {end_time - start_time} second")  # execution time 28.518683433532715 second

    start_time = time.time()
    p1 = Process(target=count_primes, args=(1, 50000))
    p1.start()
    p1.join()
    p2 = Process(target=count_primes, args=(50000, 100000))
    p2.start()
    p2.join()
    end_time = time.time()
    print(f"execution time {end_time - start_time} second")
    # execution time 33.34045934677124 second
    # Why is this not faster? p1.join() is called before p2.start(), so the two
    # processes actually run one after the other, and checking larger numbers for
    # primality takes longer than checking small ones, so the split is uneven anyway.
    # This mainly demonstrates the API; a process pool (below) is the recommended way.
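For comparison, a variant (a sketch reusing count_primes, Process, and time from the block above; the timing is not the author's measurement) that starts both processes before joining either one, so the two halves actually overlap:

if __name__ == "__main__":
    start_time = time.time()
    p1 = Process(target=count_primes, args=(1, 50000))
    p2 = Process(target=count_primes, args=(50000, 100000))
    p1.start()
    p2.start()  # both processes are now running at the same time
    p1.join()
    p2.join()
    end_time = time.time()
    print(f"execution time {end_time - start_time} second")
    # still bounded by the slower half (50000-100000), because larger numbers
    # take longer to check, but the two halves now run concurrently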

Process pool: ProcessPoolExecutor

The map approach

from concurrent.futures import ProcessPoolExecutor
import time

# reuses the count_primes function defined above
if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        start_time = time.time()
        # map approach
        num_primes = list(executor.map(count_primes, [1, 50000], [50000, 100000]))
        end_time = time.time()
        print(f"execution time {end_time - start_time} second")

The submit approach

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

# reuses the count_primes function defined above
if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        start_time = time.time()
        futures = [executor.submit(count_primes, 1, 50000), executor.submit(count_primes, 50000, 100000)]
        for future in as_completed(futures):
            num_primes = future.result()
        end_time = time.time()
        print(f"execution time {end_time - start_time} second")

Note that the work has to be divided evenly across the tasks; if the split is unbalanced, one worker ends up doing most of the work and the parallelism does not improve efficiency. One way to balance the load is sketched below.
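A sketch (reusing count_primes from above; the chunk size of 5000 is an arbitrary choice) that cuts the range into many small chunks and lets the pool hand them to whichever worker is free:

from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    # cut 1..100000 into 20 chunks of roughly 5000 numbers each
    bounds = list(range(1, 100000, 5000)) + [100000]
    starts, ends = bounds[:-1], bounds[1:]
    with ProcessPoolExecutor() as executor:
        # the pool assigns each chunk to a free worker, evening out the load
        # even though large numbers take longer to check than small ones
        total = sum(executor.map(count_primes, starts, ends))
    print(f"primes found: {total}")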