简介

  • 引入并发,就是为了提升程序运行速度

  • 单线程串行:不加改造的程序

  • 多线程并发:threading

    • 多线程:thhreadding,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴地等待IO完成
  • 多cpu并行:multiprocessing

    • 多进程:multiprocessing,利用CPU的多核原理,让CPU可以同时运行多个进程
  • 多机器并行:hadoop/hive/spark

  • IO:读取内存、磁盘、网络

  • 异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行

  • 使用Lock对资源加锁,防止冲突访问

  • 使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式

  • 使用线程池Pool/进程池Pool,实现线程/进程的任务提交、等待结束、获取结果

  • 使用subprocess启动外部程序的进程,并进行输入输出的交互

怎样选择多线程、多进程和多协程

  • Thread Process Coroutine

  • 什么事cpu密集型计算、io密集型计算?

    • cpu密集型(cpu-bound)
      • CPU密集型也叫计算密集型,是指I/O在很短的时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高
      • 例如:压缩解压缩、加密解密、正则表达式搜索
    • IO密集型(io-bound)
      • IO密集型指的是系统运作大部分都状况是CPU在等I/O(硬盘/内存)的读/写操作,cpu占用率仍然比较低
      • 例如:文件处理程序、网络爬虫程序、读写数据库程序
  • 多线程、多进程、多协程多对比

    • 多进程 Process (multiprocessing)
      • 一个进程中可以启动N个线程
      • 优点:可以利用多核CPU并行计算
      • 缺点:占用资源最多、可启动数目比线程少
      • 适用于:CPU密集型计算
    • 多线程 Thread (threading)
      • 一个线程中可以启动N个协程
      • 优点:相比进程,更轻量级、占用资源少
      • 缺点:
        • 相比进程:多线程只能并发执行,不能利用多CPU(GIL)
        • 相比协程:启动数目有限制,占用内存资源,有线程切换开销
      • 适用于:IO密集计算、同时运行的任务数目要求不多
    • 多协程 Coroutine (asyncio)
      • 优点:内存开销最少、启动协程数量最多
      • 缺点:支持的库有限制、代码实现复杂
      • 适用于:IO密集型计算、需要超多任务运行、但有现成库支持的场景
    • 并发和并行
      • 并发(Concurrency):指在同一时间段内,有多个任务在执行,但在某一时刻,只有一个任务在执行。多个任务交替执行,给人一种”同时”执行的感觉。
      • 并行(Parallelism):指在同一时刻,有多个任务同时执行。这需要多核CPU的支持,每个核心同时执行一个任务。
      • 区别:并发是”看起来同时”,并行是”真正同时”。多线程在单核CPU上是并发的,在多核CPU上可以是并行的;多进程在多核CPU上是并行的。
  • 怎样根据任务选择对应技术?

    • 如上

全局解释器锁GIL

  • python速度慢的两大原因
    • 动态类型语言、边解释边执行
    • GIL 无法利用多核CPU并发执行
  • GIL是什么?
    • 全局解释器锁(Global Interpreter Lock)
    • 是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。
    • 即便在多核处理器上,使用GIL的解释器也只允许同一时间执行一个线程。
  • 为什么有GIL这个东西?
    • 简而言之:Python设计初期,为了规避并发问题引入了GIL,现在想去除却去不掉了
    • 为了解决多线程之间数据完整性和状态同步问题
    • Python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象
  • 怎样规避GIL带来的限制?
    • 多线程 threading机制依然是有用的,用于IO密集型计算 业务再I/O期间,线程会释放GIL,实现CPU和IO的并行因此多线程用于IO密集型计算依然可以大幅提升速度 但是多线程用于CPU密集型计算时,只会更加拖慢速度
    • 使用mutilprocessing模块,利用多核CPU并发执行,多进程用于CPU密集型计算

使用多线程

1.准备一个函数
def my_func(a,b):
do_crwa(a,b)

2.创建一个线程
import threading
t = threading.Thread(target=my_func,args=(1,2))

3.启动线程
t.start()

4.等待线程结束
t.join()

生产者消费者模式

  • 多组件的Pipeline技术架构

    • 复杂的事情都不会一下子做完,而是会分很多中间步骤一步步完成
  • 多线程数据通信的queue模块

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    # 导入队列模块
    import queue

    # 创建一个先进先出队列
    q = queue.Queue()

    # 将第一个URL放入队列
    q.put(urls[0])

    # 从队列中获取一个元素
    item = q.get()

    # 获取队列中的元素数量
    q.qsize()

    # 检查队列是否为空
    q.empty()

    # 检查队列是否已满
    q.full()
  • producer_consumer_spider

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import threading
import queue

def do_crawl(url_queue, html_queue):
while not url_queue.empty():
url = url_queue.get()
html = crawl(url)
html_queue.put(html)
print(f'crawl {url} success')

def do_parse(html_queue, item_queue,fout):
while True:
html = html_queue.get()
results = parse(html)
for result in results:
fout.write(str(result) + '\n')
print(f'parse {url} success')
time.sleep(1)


url_queue = queue.Queue()
html_queue = queue.Queue()
item_queue = queue.Queue()

for url in urls:
url_queue.put(url)

for idx in range(3):
t = threading.Thread(target=do_crawl, args=(url_queue, html_queue),name=f'crawl_thread_{idx}')
t.start()
fout = open('results.txt', 'w')
for idx in range(2):
t = threading.Thread(target=do_parse, args=(html_queue, item_queue, fout), name=f'parse_thread_{idx}')
t.start()

线程安全问题以及Lock解决方案

  • 线程安全概念介绍
    • 线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。
    • 由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全
  • Lock用于解决线程安全问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 用法一

import threading

lock = threading.Lock()

lock.acquire()
try:
# do something
pass
finally:
lock.release()

# 用法二

import threading

lock = threading.Lock()

with lock:
# do something
pass
  • 示例代码演示问题 以及 解决方案

好用的线程池ThreadPoolExecutor

  • 线程池的原理

线程的生命周期
alt text
新建线程系统需要分配资源、终止线程系统需要回收资源,如果可以重用线程,则可以减去新建/终止的开销

  • 使用线程池的好处

    • 提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源
    • 适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短
    • 防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢等问题
  • ThreadPoolExecutor的使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    - from concurrent.futures import ThreadPoolExecutor,as_completed
    - 用法1:map函数,很简单 注意map的结果和入参是顺序对应的
    - with ThreadPoolExecutor() as executor:
    results = executor.map(func, args)
    for result in results:
    print(result)
    - 用法2:future模式,更强大 注意如果用as_completed顺序是不定的
    - with ThreadPoolExecutor() as executor:
    futures = [executor.submit(func, arg) for arg in args]
    for future in as_completed(futures):
    print(future.result())
    for future in futures:
    print(future.result())
  • 使用线程池改造爬虫程序

在web服务中使用线程池加速

  • Web服务的架构以及特点
    • alt text
  • 使用线程池ThreadPoolExecutor加速
    • 使用线程池ThreadPoolExecutor的好处
    • 方便的将磁盘文件、数据库、远程API的IO调用并发执行
    • 线程池的线程数目不会无限创建(导致系统挂掉),具有防御功能
  • 代码用Flask实现Web服务并实现加速
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    # flask_thread_pool

    import flask
    import json
    import time
    from concurrent.futures import ThreadPoolExecutor

    # 使用线程池进行加速
    pool = ThreadPoolExecutor()

    app = flask.Flask(__name__)

    def read_file():
    time.sleep(0.1)
    return 'file'

    def read_db():
    time.sleep(0.2)
    return 'db'

    def read_api():
    time.sleep(0.3)
    return 'api'

    @app.route('/')
    def index():
    result_file = pool.submit(read_file)
    result_db = pool.submit(read_db)
    result_api = pool.submit(read_api)
    return json.dumps({
    'result_file': result_file.result(),
    'result_db': result_db.result(),
    'result_api': result_api.result()
    })

    if __name__ == '__main__':
    app.run(debug=True)

使用多进程multiprocessing加速程序的运行

  • 有了多线程threading,为什么还要多进程multiprocessing
    • alt text
  • 多进程multiprocessing知识梳理
    • alt text
  • 代码实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
      # 使用更大的数字和更多的测试数据
    numbers = [15485863, 32452843, 49979687, 67867967, 86028121] * 100

    def is_prime(n):
    if n <= 1:
    return False
    if n <= 3:
    return True
    if n % 2 == 0 or n % 3 == 0:
    return False

    # 添加主要的判断逻辑
    i = 5
    while i * i <= n:
    if n % i == 0 or n % (i + 2) == 0:
    return False
    i += 6
    return True

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

    def single_thread():
    for number in numbers:
    is_prime(number)

    def multi_thread():
    with ThreadPoolExecutor(max_workers=10) as executor:
    executor.map(is_prime, numbers)

    def multi_process():
    with ProcessPoolExecutor(max_workers=10) as executor:
    executor.map(is_prime, numbers)

    def benchmark(func, name):
    start = time.time()
    func()
    end = time.time()
    print(f'{name}: {end - start:.2f} seconds')

    if __name__ == '__main__':
    import time

    benchmark(single_thread, 'Single Thread')
    benchmark(multi_thread, 'Multi Thread')
    benchmark(multi_process, 'Multi Process')

在Flask服务中使用进程池加速

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import flask
from concurrent.futures import ProcessPoolExecutor

process_pool = ProcessPoolExecutor()

app = flask.Flask(__name__)


import json
import time

def is_prime(n):
"""判断一个数是否为质数的CPU密集型函数"""
if n <= 1:
return False
if n <= 3:
return True
if n % 2 == 0 or n % 3 == 0:
return False

i = 5
while i * i <= n:
if n % i == 0 or n % (i + 2) == 0:
return False
i += 6
return True

def calculate_primes(numbers):
"""计算一组数中的质数"""
results = {}
for num in numbers:
results[num] = is_prime(num)
return results

@app.route('/cpu_test')
def cpu_test():
# 使用较大的数字来确保CPU密集型计算
numbers = [15485863, 32452843, 49979687, 67867967, 86028121]

# 提交到进程池执行
future = process_pool.submit(calculate_primes, numbers)
result = future.result()

return json.dumps({
'result': result,
'timestamp': time.time()
})
if __name__ == '__main__':
app.run(debug=True)