导入concurrent.futures.ThreadPoolExecutor
import concurrent.futures
concurrent.futures模块详解
这个模块是python并发执行的标准库,具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。
- 模块组成
1、concurrent.futures.Executor: 这是一个虚拟基类,提供了异步执行的方法。
2、submit(function, argument): 调度函数(可调用的对象)的执行,将 argument 作为参数传入。
3、map(function, argument): 将 argument 作为参数执行函数,以 异步 的方式。
4、shutdown(Wait=True): 发出让执行者释放所有资源的信号。
5、concurrent.futures.Future: 其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例。
Executor是抽象类(父类),可以通过子类访问,即线程或进程的 ExecutorPools 。ThreadPoolExecutor是Executor的子类,它使用线程池来异步执行调用。因为,线程或进程的实例是依赖于资源的任务,所以最好以“池”的形式将他们组织在一起,作为可以重用的launcher或executor。
源码函数分析
init
class ThreadPoolExecutor(_base.Executor): # Used to assign unique thread names when thread_name_prefix is not supplied. _counter = itertools.count().__next__ def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()): # max_workers参数为空时,默认为机器处理器个数+4,最大值为32 # thread_name_prefix 线程可选名称前缀 # initializer 初始化工作线程使,指定的可调用函数 # initargs 传给可调用函数的参数元组 """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. initializer: A callable used to initialize worker threads. initargs: A tuple of arguments to pass to the initializer. """ if max_workers is None: # ThreadPoolExecutor is often used to: # * CPU bound task which releases GIL # * I/O bound task (which releases GIL, of course) # # We use cpu_count + 4 for both types of tasks. # But we limit it to 32 to avoid consuming surprisingly large resource # on many core machine. max_workers = min(32, (os.cpu_count() or 1) + 4) if max_workers <= 0: raise ValueError("max_workers must be greater than 0") if initializer is not None and not callable(initializer): raise TypeError("initializer must be a callable") self._max_workers = max_workers self._work_queue = queue.SimpleQueue() self._idle_semaphore = threading.Semaphore(0) self._threads = set() self._broken = False self._shutdown = False self._shutdown_lock = threading.Lock() self._thread_name_prefix = (thread_name_prefix or ("ThreadPoolExecutor-%d" % self._counter())) self._initializer = initializer self._initargs = initargs
- 举例
# demo1import concurrent.futuresimport urllib.requestURLS = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.com/']# Retrieve a single page and report the URL and contentsdef load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read()# We can use a with statement to ensure threads are cleaned up promptlywith concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
# demo2import osimport randomimport threadingimport requests as rqimport timefrom threading import Thread, Lockfrom queue import Queue # 用于多线程之间线程安全的数据通信from concurrent.futures import ThreadPoolExecutor, as_completedpool = ThreadPoolExecutor()'''利用线程池对I/O密集型任务进行优化with ThreadPoolExecutor() as pool: futures = [pool.submit(craw, url) for url in urls] for future in futures:#as_completed后的结果顺序是不固定的 print(future.result()) html_queue.put(future.result())'''def event_1(): print("event_1 started") time.sleep(1) print("event_1 ended") return 1def event_2(): print("event_2 started") time.sleep(2) print("event_2 ended") return 2def event_3(): print("event_3 started") time.sleep(3) print("event_3 ended") return 3def main(): t0 = time.time() res1 = pool.submit(event_1) res2 = pool.submit(event_2) res3 = pool.submit(event_3) print(res1.result()) print(res2.result()) print(res3.result()) t1 = time.time() print(t1 - t0)if __name__ == '__main__': main()
submit
submit(*args, **kwargs)
安排可调用对象 fn 以 fn(*args, **kwargs) 的形式执行,并返回 Future 对象来表示它的执行。
with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
map
-
map(func, *iterables, timeout=None, chunksize=1)
-
类似内置函数 map(func, *iterables),但是有两点不同:
1、立即获取 iterables 而不会惰性获取;
2、异步执行 func,并支持多次并发调用。
它返回一个迭代器。 -
从调用 Executor.map() 开始的 timeout 秒之后,如果在迭代器上调用了 next() 并且无可用结果的话,迭代器会抛出 concurrent.futures.TimeoutError 异常。
-
timeout 秒数可以是浮点数或者整数,如果设置为 None 或者不指定,则不限制等待时间。
-
如果 func 调用抛出了异常,那么该异常会在从迭代器获取值的时候抛出。
-
当使用 ProcessPoolExecutor 的时候,这个方法会把 iterables 划分成多个块,作为独立的任务提交到进程池。这些块的近似大小可以通过给 chunksize 指定一个正整数。对于很长的 iterables,使用较大的 chunksize 而不是采用默认值 1,可以显著提高性能。对于 ThreadPoolExecutor,chunksize 不起作用。
不管并发任务的执行次序如何,map 总是基于输入顺序来返回值。map 返回的迭代器,在主程序迭代的时候,会等待每一项的响应。
def xiaotu_receive_thread(self, *phone): for i in phone: res = self.number_parameter(i) userid, nickname, token, sign = res print(res) s = f"{token['S']}" t = f"{token['T']}" c_list = (s, t, userid) cooke_list = [c_list] * 10 with concurrent.futures.ThreadPoolExecutor() as pool: pool.map(AccountNumber().xiaotu_receive, cooke_list)
shutdown(wait=True)
- 告诉执行器 executor 在当前所有等待的 future 对象运行完毕后,应该释放执行器用到的所有资源。
- 在 shutdown 之后再调用 Executor.submit() 和 Executor.map() 会报运行时错误 RuntimeError。
- 如果 wait 为 True,那么这个方法会在所有等待的 future 都执行完毕,并且属于执行器 executor 的资源都释放完之后才会返回。
- 如果 wait 为 False,本方法会立即返回。属于执行器的资源会在所有等待的 future 执行完毕之后释放。
- 不管 wait 取值如何,整个 Python 程序在等待的 future 执行完毕之前不会退出。
- 你可以通过 with 语句来避免显式调用本方法。with 语句会用 wait=True 的默认参数调用 Executor.shutdown() 方法。
执行器类 Executor 实现了上下文协议,可以用做上下文管理器。它能并发执行任务,等待它们全部完成。当上下文管理器退出时,自动调用 shutdown() 方法。
import shutilwith ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
ProcessPoolExecutor 进程池执行器
-
ProcessPoolExecutor 使用了 multiprocessing 模块,这允许它可以规避 Global Interpreter Lock,但是也意味着只能执行和返回可序列化的(picklable)对象。
-
main 模块必须被 worker 子进程导入,这意味着 ProcessPoolExecutor 在交互解释器中无法工作。
-
在已经被提交到 ProcessPoolExecutor 中的可调用对象内使用 Executor 或者 Future 方法会导致死锁。
-
concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
1、这个 Executor 子类最多用 max_workers 个进程来异步执行调用。
2、如果不指定 max_workers 或者为 None,它默认为本机的处理器数量。
3、如果 max_workers 小于等于 0,会抛出 ValueError 异常。
4、mp_context 是多进程上下文(multiprocessing context)或者 None,它会被用来启动 workers。
5、如果不指定 mp_context 或者为 None,会使用默认的多进程上下文环境。 -
initializer 是一个可选的可调用对象,会在每个 worker 进程启动之前调用。
-
initargs 是传递给 initializer 的参数元组。
-
如果 initializer 抛出了异常,那么当前所有等待的任务都会抛出 BrokenProcessPool 异常,继续提交 submit 任务也会抛出此异常。
ProcessPoolExecutor使用实例
import concurrent.futuresimport mathPRIMES = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419]def is_prime(n): if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return Truedef main(): with concurrent.futures.ProcessPoolExecutor() as executor: for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): print('%d is prime: %s' % (number, prime))if __name__ == '__main__': main()
来源地址:https://blog.csdn.net/weixin_43587784/article/details/129167145