作者选择了COVID-19 救济基金来接受捐赠,作为Write for DOnations计划的一部分。
介绍
Python 线程是一种并行形式,它允许您的程序一次运行多个过程。Python 中的并行也可以使用多个进程来实现,但线程特别适合加速涉及大量 I/O(输入/输出)的应用程序。
示例I/O 绑定操作包括发出 Web 请求和从文件读取数据。与受 I/O 限制的操作相比,受CPU 限制的操作(例如使用 Python 标准库执行数学运算)不会从 Python 线程中受益太多。
Python 3 包含ThreadPoolExecutor
用于在线程中执行代码的实用程序。
在本教程中,我们将使用ThreadPoolExecutor
方便地发出网络请求。我们将定义一个非常适合在线程内调用的ThreadPoolExecutor
函数,用于执行该函数,并处理这些执行的结果。
在本教程中,我们将发出网络请求以检查维基百科页面的存在。
注意: I/O 密集型操作比 CPU 密集型操作从线程中获益更多的事实是由 Python 中称为全局解释器锁的特性引起的。如果您愿意,可以在官方 Python 文档 中了解有关 Python 全局解释器锁的更多信息。
先决条件
为了充分利用本教程,建议您熟悉 Python 编程并requests
安装本地 Python 编程环境。
您可以查看这些教程以获取必要的背景信息:
- 如何在 Python 3 中编码
-
要将
requests
软件包安装到本地 Python 编程环境中,您可以运行以下命令:
- pip install --user requests==2.23.0
第 1 步 – 定义要在线程中执行的函数
让我们首先定义一个我们希望在线程的帮助下执行的函数。
使用nano
或您喜欢的文本编辑器/开发环境,您可以打开此文件:
- nano wiki_page_function.py
在本教程中,我们将编写一个函数来确定维基百科页面是否存在:
import requests
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
该get_wiki_page_existence
函数接受两个参数:一个指向维基百科页面的 URL ( wiki_page_url
),以及timeout
等待该 URL 响应的秒数。
get_wiki_page_existence
使用该requests
包向该 URL 发出 Web 请求。根据HTTP的状态代码,response
返回一个描述页面是否存在的字符串。不同的状态码代表 HTTP 请求的不同结果。此过程假设200
“成功”状态代码表示维基百科页面存在,404
“未找到”状态代码表示维基百科页面不存在。
如先决条件部分所述,您需要requests
安装该软件包才能运行此功能。
让我们尝试通过在url
函数后面添加和函数调用来运行该get_wiki_page_existence
函数:
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))
添加代码后,保存并关闭文件。
如果我们运行这段代码:
- python wiki_page_function.py
我们将看到如下输出:
Outputhttps://en.wikipedia.org/wiki/Ocean - exists
get_wiki_page_existence
使用有效的维基百科页面调用该函数会返回一个字符串,该字符串确认该页面确实存在。
警告:通常,在不特别注意避免并发错误的情况下,在线程之间共享 Python 对象或状态是不安全的。在定义要在线程中执行的函数时,最好定义一个执行单个作业且不与其他线程共享或发布状态的函数。get_wiki_page_existence
是这种函数的一个例子。
步骤 2 — 使用 ThreadPoolExecutor 在线程中执行函数
现在我们有一个非常适合线程ThreadPoolExecutor
调用的函数,我们可以方便地使用该函数执行多次调用。
让我们将以下突出显示的代码添加到您的程序中wiki_page_function.py
:
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
让我们来看看这段代码是如何工作的:
concurrent.futures
导入是为了让我们可以访问ThreadPoolExecutor
.- 一个
with
语句用于创建一个ThreadPoolExecutor
实例executor
,将及时完成后清理线程。 - 四个作业
submitted
的executor
:每个在URL的wiki_page_urls
列表。 - 每次调用都会
submit
返回一个存储在列表中的Future
实例futures
。 - 该
as_completed
函数等待每个Future
get_wiki_page_existence
调用完成,以便我们可以打印其结果。
如果我们再次运行这个程序,使用以下命令:
- python wiki_page_function.py
我们将看到如下输出:
Outputhttps://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Shark - exists
这个输出是有道理的:3 个 URL 是有效的 Wikipedia 页面,其中一个this_page_does_not_exist
不是。请注意,您的输出可能与此输出的排序不同。concurrent.futures.as_completed
此示例中的函数会在结果可用时立即返回结果,而不管作业以何种顺序提交。
第 3 步 – 处理线程中运行的函数的异常
在上一步中,get_wiki_page_existence
成功地为我们所有的调用返回了一个值。在这一步中,我们将看到 也ThreadPoolExecutor
可以引发线程函数调用中生成的异常。
让我们考虑以下示例代码块:
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(
executor.submit(
get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
)
)
for future in concurrent.futures.as_completed(futures):
try:
print(future.result())
except requests.ConnectTimeout:
print("ConnectTimeout.")
此代码块与我们在第 2 步中使用的代码块几乎相同,但有两个主要区别:
- 我们现在传递
timeout=0.00001
到get_wiki_page_existence
. 由于该requests
包将无法在0.00001
几秒钟内完成其对 Wikipedia 的 Web 请求,因此将引发ConnectTimeout
异常。 - 每次我们捕获
ConnectTimeout
由 引发的异常future.result()
并打印出一个字符串。
如果我们再次运行该程序,我们将看到以下输出:
OutputConnectTimeout.
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.
ConnectTimeout
打印了4 条消息——我们的 4 条消息中的每一条都打印了一条wiki_page_urls
,因为它们都无法在0.00001
几秒钟内完成,而且 4 个get_wiki_page_existence
调用中的每一个都引发了ConnectTimeout
异常。
您现在已经看到,如果提交给 a 的函数调用ThreadPoolExecutor
引发异常,则可以通过调用Future.result
. 调用Future.result
所有提交的调用可确保您的程序不会错过从线程函数引发的任何异常。
第 4 步 – 比较有和没有线程的执行时间
现在让我们验证使用ThreadPoolExecutor
实际上使您的程序更快。
首先,让我们计时,get_wiki_page_existence
如果我们在没有线程的情况下运行它:
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)
在代码示例中,我们get_wiki_page_existence
使用 50 个不同的 Wikipedia 页面 URL 一一调用我们的函数。我们使用该time.time()
函数打印出运行我们的程序所需的秒数。
如果我们像以前一样再次运行此代码,我们将看到如下输出:
OutputRunning without threads:
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/1 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Without threads time: 5.803015232086182
为简洁起见,此输出中的条目 2-47 已被省略。
Without threads time
当你在你的机器上运行它时,打印出来的秒数会有所不同——没关系,你只是得到一个基线数字来与使用ThreadPoolExecutor
. 在这种情况下,它是~5.803
几秒钟。
让我们通过 运行相同的 50 个维基百科 URL get_wiki_page_existence
,但这次使用ThreadPoolExecutor
:
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
print("Threaded time:", time.time() - threaded_start)
该代码与我们在第 2 步中创建的代码相同,只是添加了一些打印语句,用于显示执行代码所需的秒数。
如果我们再次运行该程序,我们将看到以下内容:
OutputRunning threaded:
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/0 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Threaded time: 1.2201685905456543
同样,之后打印的秒数Threaded time
在您的计算机上会有所不同(输出顺序也是如此)。
您现在可以比较使用和不使用线程获取 50 个 Wikipedia 页面 URL 的执行时间。
在本教程中使用的机器上,没有线程需要~5.803
几秒钟,有线程需要~1.220
几秒钟。我们的程序使用线程运行得更快。
结论
在本教程中,您学习了如何使用ThreadPoolExecutor
Python 3 中的实用程序高效地运行受 I/O 限制的代码。您创建了一个非常适合在线程内调用的函数,学习了如何从该函数的线程执行中检索输出和异常,并观察到使用线程获得的性能提升。
从这里您可以了解有关该concurrent.futures
模块提供的其他并发功能的更多信息。