Python并发编程

TrystanLei2022年10月2日
约 1985 字大约 7 分钟...

多进程程序例子

import argparse
import redis
from tqdm import tqdm
from multiprocessing import Pool, Process

parser = argparse.ArgumentParser(description='PyTorch MM Training')
parser.add_argument('--port', default=6379, type=str, help="port id")
parser.add_argument('--file', default=None, type=str, help="paths for train instances")
args = parser.parse_args()

host = '127.0.0.1'
port = args.port

# r = redis.Redis(host=host, port=port)

train_file = args.file

def worker_i(train_file, i, nums=16):
    r = redis.Redis(host=host, port=port)
    k = 0

    if i == 0:
        iter_ = tqdm(open(train_file, 'r'))
    else:
        iter_ = open(train_file, 'r')

    for info in iter_:
        if k % nums == i: # 将多个任务分成nums份
            r.set(k, info.strip())
        k += 1
process_list = []

nums = 8
for i in range(nums):
    p = Process(target=worker_i, args=(train_file, i, nums))
    p.start()
    process_list.append(p)

for i in process_list:
    p.join()

print('写入完成')

Python 中的三种并发编程方式

tch30Z

三种方式:多线程(Thread)、多进程(Process)、协程(Coroutine)

什么是 CPU 密集型计算、IO 密集型计算

多线程、多进程、协程的对比

怎样根据任务选择对应技术

UsvTjR

全局解释器锁 GIL(Global Interpreter Lock)

Python 速度慢的两大原因:

全局解释器锁:

怎样规避 GIL 带来的限制:

Python 多线程代码

直接创建线程

import threading

def craw(url):  # 1. 准备一个函数
	r = requests.get(url)
	print(len(r.text))

def multi_thread():
	threads = []  # 2. 准备一个线程list
	for url in urls:
		threads.append(  # 3. 创建线程
			threading.Thread(target=craw, args=(url,))
		)

	for thread in threads:
		thread.start()  # 4. 启动线程

	for thread in threads:
		thread.join()  # 5. 等待结束

使用线程池ThreadPoolExecutor

用法 1:使用 map 函数(所有任务一起提交)

from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as pool:
		result = pool.map(craw, urls)
		# map的结果和入参的顺序对应的
		for result in results:
				print(result)

用法 2:future 模式,更强大(一个任务一个任务提交)

from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor() as pool:
		# 使用 dict 可以知道future对应的入参
		futures = {pool.submit(craw, url): url
								for url in urls}
		# 方式1: 结果仍然按顺序
		for future, url in futures.items():
				print(url, future.result())

		# 方式2: 使用 as_completed 顺序是不定的
		for future in as_completed(futures):  # 注:字典的遍历是遍历key
				url = futures[future]
				print(url, future.result())

线程池原理

Python 多进程代码

多线程 threading 与多进程 multiprocessing 的代码对比

oMERIi

直接改个类名就能运行!

所以代码去看 Python 多线程代码

多进程优雅退出

import ctypes
import time
from multiprocessing import Process, RawValue

class CountdownTask:
    def __init__(self):
        self._running = RawValue(ctypes.c_bool, True)
    def terminate(self):
        self._running.value = False
    def run(self, n):
        while self._running.value and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(5)

c = CountdownTask()
t = Process(target=c.run, args=(10,))
t.start()
c.terminate() # Signal termination
t.join() # Wait for actual termination (if needed)

线程池原理

线程池的原理

YPT9vT

特点

优点

协程的原理以及代码

【2021 最新版】Python 并发编程实战,用多线程、多进程、多协程加速程序运行_哔哩哔哩_bilibiliopen in new window

协程的原理

协程:在单线程内实现并发

单线程爬虫执行路径

dFUEmJ

协程爬虫执行路径

OSigeS

协程的使用以及异步 IO

import asyncio

# 获取事件循环(超级循环)
loop = asyncio.get_event_loop()

# 定义协程
async def myfunc(url):
		await get_url(url)

# 创建task列表
tasks = [loop.create_task(myfunc(url))
					for url in urls]

# 执行爬虫事件列表
loop.run_until_complete(asyncio.wait(tasks))

关键字async:表示定义一个协程。

关键字await:表示后面的函数是一个异步 IO,并且不进行阻塞,而是直接在超级循环直接进入下一个任务的执行(当前任务放弃 CPU,下一个任务获得 CPU)。

注意

异步 IO 中依赖的库必须支持异步 IO 特性(这要求库在 IO 时不能阻塞,否则切换不到下一个任务了)

注意

requests 库不支持异步 IO,需要使用 aiohttp 库

例子:

import asyncio, aiohtttp

async def async_craw(url):
		async with aiohttp.ClientSession() as sess:
				async with sess.get(url) as resp:
						result = await resp.text()
						print(url, len(result))

loop = asyncio.get_event_loop()
tasks = [loop.create_task(async_craw(url))
					for url in urls]
loop.run_until_complete(asyncio.wait(tasks))

所有的异步对象也需要使用 async 关键字来标注。

相关信息

协程与普通函数运行的不同点在于协程需要使用超级循环来进行调度。

协程并发度的控制

可以使用 信号量 Semaphore 来进行控制:

import asyncio, aiohtttp

semaphore = asyncio.Semaphore(10)
async def async_craw(url):
		async with semaphore:  # here
				async with aiohttp.ClientSession() as sess:
						async with sess.get(url) as resp:
								result = await resp.text()
								print(url, len(result))

loop = asyncio.get_event_loop()
tasks = [loop.create_task(async_craw(url))
					for url in urls]
loop.run_until_complete(asyncio.wait(tasks))

信号量 Semaphore

相关信息

信号量一般用于访问有限数量的共享资源。

信号量是一个同步对象,用于保持在 0 至指定最大值之间的一个计数器。

# 方法一:使用with
sem = asyncio.Semaphore(10)

# ...later
async with sem:
		# work with shared resource
# 方法二:手动acquire、release
sem = asyncio.Semaphore(10)

# ...later
await sem.acquire()
try:
		# work with shared resource
finally:
		sem.release()
评论
Powered by Waline v2.6.3