文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

concurrent.futures进行

2023-01-30 23:19

关注

Python中进行并发编程一般使用threading和multiprocessing模块,不过大部分的并发编程任务都是派生一系列线程,从队列中收集资源,然后用队列收集结果。在这些任务中,往往需要生成线程池,concurrent.futures模块对threading和multiprocessing模块进行了进一步的包装,可以很方便地实现池的功能。

下载

python3中concurrent.futures是标准库,在python2中还需要自己安装futures:

pip install futures

Executor与Future

concurrent.futures供了ThreadPoolExecutor和ProcessPoolExecutor两个类,都继承自Executor,分别被用来创建线程池和进程池,接受max_workers参数,代表创建的线程数或者进程数。ProcessPoolExecutor的max_workers参数可以为空,程序会自动创建基于电脑cpu数目的进程数。

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests

def load_url(url):
    return requests.get(url)

url = 'http://httpbin.org'
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(load_url, url)

Executor中定义了submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future能够使用done()方法判断该任务是否结束,done()方法是不阻塞的,使用result()方法可以获取任务的返回值,这个方法是阻塞的。

print future.done()
print future.result().status_code

Future类似于js中的Promise,可以添加回调函数:

future.add_done_callback(fn)

回调函数fn在future取消或者完成后运行,参数是future本身。

submit()方法只能进行单个任务,用并发多个任务,需要使用map与as_completed。

map

URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url):
    return requests.get(url)

with ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page status_code %s' % (url, data.status_code))

 结果:

'http://httpbin.org' page status_code 200
'http://example.com/' page status_code 200
'https://api.github.com/' page status_code 200

 map方法接收两个参数,第一个为要执行的函数,第二个为一个序列,会对序列中的每个元素都执行这个函数,返回值为执行结果组成的生成器。

   由上面可以看出返回结果与序列结果的顺序是一致的

as_completed

  as_completed()方法返回一个Future组成的生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield这个任务,直到所有的任务结束。

def load_url(url):
    return url, requests.get(url).status_code

with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = [executor.submit(load_url, url) for url in URLS]
    for future in as_completed(tasks):
        print future.result()

 结果:

('http://example.com/', 200)
('http://httpbin.org', 200)
('https://api.github.com/', 200)

 可以看出,结果与序列顺序不一致,先完成的任务会先通知主线程。

wait

   wait方法可以让主线程阻塞,直到满足设定的要求。有三种条件ALL_COMPLETED, FIRST_COMPLETED,FIRST_EXCEPTION。

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
from concurrent.futures import as_completed
import requests

URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url):
    requests.get(url)
    print url

with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = [executor.submit(load_url, url) for url in URLS]
    wait(tasks, return_when=ALL_COMPLETED)
    print 'all_cone'

 返回:

http://example.com/
http://httpbin.org
https://api.github.com/
all_cone

 可以看出阻塞到任务全部完成。

ProcessPoolExecutor

使用ProcessPoolExecutor与ThreadPoolExecutor方法基本一致,注意文档中有一句:

The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.

需要__main__模块。

def main():
    with ProcessPoolExecutor() as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        for f in as_completed(tasks):
            ret = f.done()
            if ret:
                print f.result().status_code

if __name__ == '__main__':
    main()

  

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯