文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

concurrent.futures模块ThreadPoolExecutor、ProcessPoolExecutor讲解及使用实例

2023-09-15 12:08

关注

导入concurrent.futures.ThreadPoolExecutor

import concurrent.futures

concurrent.futures模块详解

这个模块是python并发执行的标准库,具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。

Executor是抽象类(父类),可以通过子类访问,即线程或进程的 ExecutorPools 。ThreadPoolExecutor是Executor的子类,它使用线程池来异步执行调用。因为,线程或进程的实例是依赖于资源的任务,所以最好以“池”的形式将他们组织在一起,作为可以重用的launcher或executor。

源码函数分析

init

class ThreadPoolExecutor(_base.Executor):    # Used to assign unique thread names when thread_name_prefix is not supplied.    _counter = itertools.count().__next__    def __init__(self, max_workers=None, thread_name_prefix='',                 initializer=None, initargs=()):        # max_workers参数为空时,默认为机器处理器个数+4,最大值为32        # thread_name_prefix  线程可选名称前缀        # initializer  初始化工作线程使,指定的可调用函数        # initargs  传给可调用函数的参数元组        """Initializes a new ThreadPoolExecutor instance.        Args:            max_workers: The maximum number of threads that can be used to                execute the given calls.            thread_name_prefix: An optional name prefix to give our threads.            initializer: A callable used to initialize worker threads.            initargs: A tuple of arguments to pass to the initializer.        """        if max_workers is None:            # ThreadPoolExecutor is often used to:            # * CPU bound task which releases GIL            # * I/O bound task (which releases GIL, of course)            #            # We use cpu_count + 4 for both types of tasks.            # But we limit it to 32 to avoid consuming surprisingly large resource            # on many core machine.            max_workers = min(32, (os.cpu_count() or 1) + 4)        if max_workers <= 0:            raise ValueError("max_workers must be greater than 0")        if initializer is not None and not callable(initializer):            raise TypeError("initializer must be a callable")        self._max_workers = max_workers        self._work_queue = queue.SimpleQueue()        self._idle_semaphore = threading.Semaphore(0)        self._threads = set()        self._broken = False        self._shutdown = False        self._shutdown_lock = threading.Lock()        self._thread_name_prefix = (thread_name_prefix or        ("ThreadPoolExecutor-%d" % self._counter()))        self._initializer = initializer        self._initargs = initargs
# demo1import concurrent.futuresimport urllib.requestURLS = ['http://www.foxnews.com/',        'http://www.cnn.com/',        'http://europe.wsj.com/',        'http://www.bbc.co.uk/',        'http://some-made-up-domain.com/']# Retrieve a single page and report the URL and contentsdef load_url(url, timeout):    with urllib.request.urlopen(url, timeout=timeout) as conn:        return conn.read()# We can use a with statement to ensure threads are cleaned up promptlywith concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:    # Start the load operations and mark each future with its URL    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}    for future in concurrent.futures.as_completed(future_to_url):        url = future_to_url[future]        try:            data = future.result()        except Exception as exc:            print('%r generated an exception: %s' % (url, exc))        else:            print('%r page is %d bytes' % (url, len(data)))
# demo2import osimport randomimport threadingimport requests as rqimport timefrom threading import Thread, Lockfrom queue import Queue  # 用于多线程之间线程安全的数据通信from concurrent.futures import ThreadPoolExecutor, as_completedpool = ThreadPoolExecutor()'''利用线程池对I/O密集型任务进行优化with ThreadPoolExecutor() as pool:        futures = [pool.submit(craw, url) for url in urls]        for future in futures:#as_completed后的结果顺序是不固定的            print(future.result())            html_queue.put(future.result())'''def event_1():    print("event_1 started")    time.sleep(1)    print("event_1 ended")    return 1def event_2():    print("event_2 started")    time.sleep(2)    print("event_2 ended")    return 2def event_3():    print("event_3 started")    time.sleep(3)    print("event_3 ended")    return 3def main():    t0 = time.time()    res1 = pool.submit(event_1)    res2 = pool.submit(event_2)    res3 = pool.submit(event_3)    print(res1.result())    print(res2.result())    print(res3.result())    t1 = time.time()    print(t1 - t0)if __name__ == '__main__':    main()

submit

submit(*args, **kwargs)
安排可调用对象 fn 以 fn(*args, **kwargs) 的形式执行,并返回 Future 对象来表示它的执行。

 with ThreadPoolExecutor(max_workers=1) as executor:    future = executor.submit(pow, 323, 1235)    print(future.result())

map

不管并发任务的执行次序如何,map 总是基于输入顺序来返回值。map 返回的迭代器,在主程序迭代的时候,会等待每一项的响应。

    def xiaotu_receive_thread(self, *phone):        for i in phone:            res = self.number_parameter(i)            userid, nickname, token, sign = res            print(res)            s = f"{token['S']}"            t = f"{token['T']}"            c_list = (s, t, userid)        cooke_list = [c_list] * 10        with concurrent.futures.ThreadPoolExecutor() as pool:            pool.map(AccountNumber().xiaotu_receive, cooke_list)

shutdown(wait=True)

执行器类 Executor 实现了上下文协议,可以用做上下文管理器。它能并发执行任务,等待它们全部完成。当上下文管理器退出时,自动调用 shutdown() 方法。

import shutilwith ThreadPoolExecutor(max_workers=4) as e:    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

ProcessPoolExecutor 进程池执行器

ProcessPoolExecutor使用实例

import concurrent.futuresimport mathPRIMES = [    112272535095293,    112582705942171,    112272535095293,    115280095190773,    115797848077099,    1099726899285419]def is_prime(n):    if n % 2 == 0:        return False    sqrt_n = int(math.floor(math.sqrt(n)))    for i in range(3, sqrt_n + 1, 2):        if n % i == 0:            return False    return Truedef main():    with concurrent.futures.ProcessPoolExecutor() as executor:        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):            print('%d is prime: %s' % (number, prime))if __name__ == '__main__':    main()

来源地址:https://blog.csdn.net/weixin_43587784/article/details/129167145

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯