如何在 Python 3 中使用 ThreadPoolExecutor

作者选择了COVID-19 救济基金来接受捐赠,作为Write for DOnations计划的一部分。

介绍

Python 线程是一种并行形式,它允许您的程序一次运行多个过程。Python 中的并行也可以使用多个进程来实现,但线程特别适合加速涉及大量 I/O(输入/输出)的应用程序。

示例I/O 绑定操作包括发出 Web 请求和从文件读取数据。与受 I/O 限制的操作相比,受CPU 限制的操作(例如使用 Python 标准库执行数学运算)不会从 Python 线程中受益太多。

Python 3 包含ThreadPoolExecutor用于在线程中执行代码实用程序。

在本教程中,我们将使用ThreadPoolExecutor方便地发出网络请求。我们将定义一个非常适合在线程内调用的ThreadPoolExecutor函数,用于执行该函数,并处理这些执行的结果。

在本教程中,我们将发出网络请求以检查维基百科页面的存在

注意: I/O 密集型操作比 CPU 密集型操作从线程中获益更多的事实是由 Python 中称为全局解释器锁的特性引起的如果您愿意,可以在官方 Python 文档 中了解有关 Python 全局解释器锁的更多信息

先决条件

为了充分利用本教程,建议您熟悉 Python 编程并requests安装本地 Python 编程环境

您可以查看这些教程以获取必要的背景信息:

  • pip install --user requests==2.23.0

第 1 步 – 定义要在线程中执行的函数

让我们首先定义一个我们希望在线程的帮助下执行的函数。

使用nano或您喜欢的文本编辑器/开发环境,您可以打开此文件:

  • nano wiki_page_function.py

在本教程中,我们将编写一个函数来确定维基百科页面是否存在:

wiki_page_function.py
import requests

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

get_wiki_page_existence 函数接受两个参数:一个指向维基百科页面的 URL ( wiki_page_url),以及timeout等待该 URL 响应的秒数。

get_wiki_page_existence使用该requests包向该 URL 发出 Web 请求。根据HTTP状态代码response返回一个描述页面是否存在的字符串。不同的状态码代表 HTTP 请求的不同结果。此过程假设200“成功”状态代码表示维基百科页面存在,404“未找到”状态代码表示维基百科页面不存在。

如先决条件部分所述,您需要requests安装软件包才能运行此功能。

让我们尝试通过在url函数后面添加和函数调用来运行get_wiki_page_existence函数:

wiki_page_function.py
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))

添加代码后,保存并关闭文件。

如果我们运行这段代码:

  • python wiki_page_function.py

我们将看到如下输出:

Output
https://en.wikipedia.org/wiki/Ocean - exists

get_wiki_page_existence使用有效的维基百科页面调用该函数会返回一个字符串,该字符串确认该页面确实存在。

警告:通常,在不特别注意避免并发错误的情况下,在线程之间共享 Python 对象或状态是不安全的。在定义要在线程中执行的函数时,最好定义一个执行单个作业且不与其他线程共享或发布状态的函数。get_wiki_page_existence是这种函数的一个例子。

步骤 2 — 使用 ThreadPoolExecutor 在线程中执行函数

现在我们有一个非常适合线程ThreadPoolExecutor调用的函数,我们可以方便地使用该函数执行多次调用。

让我们将以下突出显示的代码添加到您的程序中wiki_page_function.py

wiki_page_function.py
import requests
import concurrent.futures

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

让我们来看看这段代码是如何工作的:

  • concurrent.futures导入是为了让我们可以访问ThreadPoolExecutor.
  • 一个with语句用于创建一个ThreadPoolExecutor实例executor,将及时完成后清理线程。
  • 四个作业submittedexecutor:每个在URL的wiki_page_urls列表。
  • 每次调用都会submit返回一个存储在列表中Future实例futures
  • as_completed函数等待每个Future get_wiki_page_existence调用完成,以便我们可以打印其结果。

如果我们再次运行这个程序,使用以下命令:

  • python wiki_page_function.py

我们将看到如下输出:

Output
https://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists

这个输出是有道理的:3 个 URL 是有效的 Wikipedia 页面,其中一个this_page_does_not_exist不是。请注意,您的输出可能与此输出的排序不同。concurrent.futures.as_completed此示例中函数会在结果可用时立即返回结果,而不管作业以何种顺序提交。

第 3 步 – 处理线程中运行的函数的异常

在上一步中,get_wiki_page_existence成功地为我们所有的调用返回了一个值。在这一步中,我们将看到 也ThreadPoolExecutor可以引发线程函数调用中生成的异常。

让我们考虑以下示例代码块:

wiki_page_function.py
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status


wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(
            executor.submit(
                get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
            )
        )
    for future in concurrent.futures.as_completed(futures):
        try:
            print(future.result())
        except requests.ConnectTimeout:
            print("ConnectTimeout.")

此代码块与我们在第 2 步中使用的代码块几乎相同,但有两个主要区别:

  • 我们现在传递timeout=0.00001get_wiki_page_existence. 由于该requests包将无法在0.00001几秒钟内完成其对 Wikipedia 的 Web 请求,因此将引发ConnectTimeout异常。
  • 每次我们捕获ConnectTimeout由 引发的异常future.result()并打印出一个字符串。

如果我们再次运行该程序,我们将看到以下输出:

Output
ConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.

ConnectTimeout打印了4 条消息——我们的 4 条消息中的每一条都打印了一条wiki_page_urls,因为它们都无法在0.00001几秒钟内完成,而且 4 个get_wiki_page_existence调用中的每一个都引发了ConnectTimeout异常。

您现在已经看到,如果提交给 a 的函数调用ThreadPoolExecutor引发异常,则可以通过调用Future.result. 调用Future.result所有提交的调用可确保您的程序不会错过从线程函数引发的任何异常。

第 4 步 – 比较有和没有线程的执行时间

现在让我们验证使用ThreadPoolExecutor实际上使您的程序更快。

首先,让我们计时,get_wiki_page_existence如果我们在没有线程的情况下运行它:

wiki_page_function.py
import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
    print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)

在代码示例中,我们get_wiki_page_existence使用 50 个不同的 Wikipedia 页面 URL 一一调用我们的函数。我们使用该time.time()函数打印出运行我们的程序所需的秒数。

如果我们像以前一样再次运行此代码,我们将看到如下输出:

Output
Running without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182

为简洁起见,此输出中的条目 2-47 已被省略。

Without threads time当你在你的机器上运行它时,打印出来的秒数会有所不同——没关系,你只是得到一个基线数字来与使用ThreadPoolExecutor. 在这种情况下,它是~5.803几秒钟。

让我们通过 运行相同的 50 个维基百科 URL get_wiki_page_existence,但这次使用ThreadPoolExecutor

wiki_page_function.py
import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
print("Threaded time:", time.time() - threaded_start)

该代码与我们在第 2 步中创建的代码相同,只是添加了一些打印语句,用于显示执行代码所需的秒数。

如果我们再次运行该程序,我们将看到以下内容:

Output
Running threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543

同样,之后打印的秒数Threaded time在您的计算机上会有所不同(输出顺序也是如此)。

您现在可以比较使用和不使用线程获取 50 个 Wikipedia 页面 URL 的执行时间。

在本教程中使用的机器上,没有线程需要~5.803几秒钟,有线程需要~1.220几秒钟。我们的程序使用线程运行得更快。

结论

在本教程中,您学习了如何使用ThreadPoolExecutorPython 3 中实用程序高效地运行受 I/O 限制的代码。您创建了一个非常适合在线程内调用的函数,学习了如何从该函数的线程执行中检索输出和异常,并观察到使用线程获得的性能提升。

从这里您可以了解有关该concurrent.futures模块提供的其他并发功能的更多信息

觉得文章有用?

点个广告表达一下你的爱意吧 !😁