Python并发编程
简介
引入并发,就是为了提升程序运行速度
单线程串行:不加改造的程序
多线程并发:threading
- 多线程:thhreadding,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴地等待IO完成
多cpu并行:multiprocessing
- 多进程:multiprocessing,利用CPU的多核原理,让CPU可以同时运行多个进程
多机器并行:hadoop/hive/spark
IO:读取内存、磁盘、网络
异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行
使用Lock对资源加锁,防止冲突访问
使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式
使用线程池Pool/进程池Pool,实现线程/进程的任务提交、等待结束、获取结果
使用subprocess启动外部程序的进程,并进行输入输出的交互
怎样选择多线程、多进程和多协程
Thread Process Coroutine
什么事cpu密集型计算、io密集型计算?
- cpu密集型(cpu-bound)
- CPU密集型也叫计算密集型,是指I/O在很短的时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高
- 例如:压缩解压缩、加密解密、正则表达式搜索
- IO密集型(io-bound)
- IO密集型指的是系统运作大部分都状况是CPU在等I/O(硬盘/内存)的读/写操作,cpu占用率仍然比较低
- 例如:文件处理程序、网络爬虫程序、读写数据库程序
- cpu密集型(cpu-bound)
多线程、多进程、多协程多对比
- 多进程 Process (multiprocessing)
- 一个进程中可以启动N个线程
- 优点:可以利用多核CPU并行计算
- 缺点:占用资源最多、可启动数目比线程少
- 适用于:CPU密集型计算
- 多线程 Thread (threading)
- 一个线程中可以启动N个协程
- 优点:相比进程,更轻量级、占用资源少
- 缺点:
- 相比进程:多线程只能并发执行,不能利用多CPU(GIL)
- 相比协程:启动数目有限制,占用内存资源,有线程切换开销
- 适用于:IO密集计算、同时运行的任务数目要求不多
- 多协程 Coroutine (asyncio)
- 优点:内存开销最少、启动协程数量最多
- 缺点:支持的库有限制、代码实现复杂
- 适用于:IO密集型计算、需要超多任务运行、但有现成库支持的场景
- 并发和并行
- 并发(Concurrency):指在同一时间段内,有多个任务在执行,但在某一时刻,只有一个任务在执行。多个任务交替执行,给人一种”同时”执行的感觉。
- 并行(Parallelism):指在同一时刻,有多个任务同时执行。这需要多核CPU的支持,每个核心同时执行一个任务。
- 区别:并发是”看起来同时”,并行是”真正同时”。多线程在单核CPU上是并发的,在多核CPU上可以是并行的;多进程在多核CPU上是并行的。
- 多进程 Process (multiprocessing)
怎样根据任务选择对应技术?
- 如上
全局解释器锁GIL
- python速度慢的两大原因
- 动态类型语言、边解释边执行
- GIL 无法利用多核CPU并发执行
- GIL是什么?
- 全局解释器锁(Global Interpreter Lock)
- 是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。
- 即便在多核处理器上,使用GIL的解释器也只允许同一时间执行一个线程。
- 为什么有GIL这个东西?
- 简而言之:Python设计初期,为了规避并发问题引入了GIL,现在想去除却去不掉了
- 为了解决多线程之间数据完整性和状态同步问题
- Python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象
- 怎样规避GIL带来的限制?
- 多线程 threading机制依然是有用的,用于IO密集型计算 业务再I/O期间,线程会释放GIL,实现CPU和IO的并行因此多线程用于IO密集型计算依然可以大幅提升速度 但是多线程用于CPU密集型计算时,只会更加拖慢速度
- 使用mutilprocessing模块,利用多核CPU并发执行,多进程用于CPU密集型计算
使用多线程
1.准备一个函数
def my_func(a,b):
do_crwa(a,b)
2.创建一个线程
import threading
t = threading.Thread(target=my_func,args=(1,2))
3.启动线程
t.start()
4.等待线程结束
t.join()
生产者消费者模式
多组件的Pipeline技术架构
- 复杂的事情都不会一下子做完,而是会分很多中间步骤一步步完成
多线程数据通信的queue模块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20# 导入队列模块
import queue
# 创建一个先进先出队列
q = queue.Queue()
# 将第一个URL放入队列
q.put(urls[0])
# 从队列中获取一个元素
item = q.get()
# 获取队列中的元素数量
q.qsize()
# 检查队列是否为空
q.empty()
# 检查队列是否已满
q.full()producer_consumer_spider
1 | import threading |
线程安全问题以及Lock解决方案
- 线程安全概念介绍
- 线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。
- 由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全
- Lock用于解决线程安全问题
1 | # 用法一 |
- 示例代码演示问题 以及 解决方案
好用的线程池ThreadPoolExecutor
- 线程池的原理
线程的生命周期
新建线程系统需要分配资源、终止线程系统需要回收资源,如果可以重用线程,则可以减去新建/终止的开销
使用线程池的好处
- 提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源
- 适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短
- 防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢等问题
ThreadPoolExecutor的使用
1
2
3
4
5
6
7
8
9
10
11
12
13- from concurrent.futures import ThreadPoolExecutor,as_completed
- 用法1:map函数,很简单 注意map的结果和入参是顺序对应的
- with ThreadPoolExecutor() as executor:
results = executor.map(func, args)
for result in results:
print(result)
- 用法2:future模式,更强大 注意如果用as_completed顺序是不定的
- with ThreadPoolExecutor() as executor:
futures = [executor.submit(func, arg) for arg in args]
for future in as_completed(futures):
print(future.result())
for future in futures:
print(future.result())使用线程池改造爬虫程序
在web服务中使用线程池加速
- Web服务的架构以及特点
- 使用线程池ThreadPoolExecutor加速
- 使用线程池ThreadPoolExecutor的好处
- 方便的将磁盘文件、数据库、远程API的IO调用并发执行
- 线程池的线程数目不会无限创建(导致系统挂掉),具有防御功能
- 代码用Flask实现Web服务并实现加速
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38# flask_thread_pool
import flask
import json
import time
from concurrent.futures import ThreadPoolExecutor
# 使用线程池进行加速
pool = ThreadPoolExecutor()
app = flask.Flask(__name__)
def read_file():
time.sleep(0.1)
return 'file'
def read_db():
time.sleep(0.2)
return 'db'
def read_api():
time.sleep(0.3)
return 'api'
@app.route('/')
def index():
result_file = pool.submit(read_file)
result_db = pool.submit(read_db)
result_api = pool.submit(read_api)
return json.dumps({
'result_file': result_file.result(),
'result_db': result_db.result(),
'result_api': result_api.result()
})
if __name__ == '__main__':
app.run(debug=True)
使用多进程multiprocessing加速程序的运行
- 有了多线程threading,为什么还要多进程multiprocessing
- 多进程multiprocessing知识梳理
- 代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45# 使用更大的数字和更多的测试数据
numbers = [15485863, 32452843, 49979687, 67867967, 86028121] * 100
def is_prime(n):
if n <= 1:
return False
if n <= 3:
return True
if n % 2 == 0 or n % 3 == 0:
return False
# 添加主要的判断逻辑
i = 5
while i * i <= n:
if n % i == 0 or n % (i + 2) == 0:
return False
i += 6
return True
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def single_thread():
for number in numbers:
is_prime(number)
def multi_thread():
with ThreadPoolExecutor(max_workers=10) as executor:
executor.map(is_prime, numbers)
def multi_process():
with ProcessPoolExecutor(max_workers=10) as executor:
executor.map(is_prime, numbers)
def benchmark(func, name):
start = time.time()
func()
end = time.time()
print(f'{name}: {end - start:.2f} seconds')
if __name__ == '__main__':
import time
benchmark(single_thread, 'Single Thread')
benchmark(multi_thread, 'Multi Thread')
benchmark(multi_process, 'Multi Process')
在Flask服务中使用进程池加速
1 | import flask |
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 廾匸!