文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

简单有效上手Python3异步asyncio问题

2023-01-03 18:01

关注

Python3异步asyncio问题

官方文档:

https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.run

看了一大堆相关的资料和教程,针对的Python版本不同,写法也各不一致,翻了翻官方的文档,发现其实越高版本的Python对异步进行封装的越方便,官方说法叫高层级API,甚至都不用去理解什么Future\task\loop之类的概念了,我现在用的是Python 3.7.5,可以这样很简单的实现阻塞等待\异步并发:如果没有复杂需求的话,用高层级API就可以了。

如果涉及到回调的话貌似还得用低层级的API,后面单独记录。

import asyncio


async def first():
    print('first函数调用开始,下面将会等待3秒模拟函数运行完毕')
    await asyncio.sleep(3)
    print('first函数执行完毕')


async def last():
    print('last函数调用开始')
    await asyncio.sleep(2)
    print('last函数执行完毕')


async def func(delay):
    print('开始异步同时执行的函数+延迟: ' + str(delay))
    await asyncio.sleep(delay)
    print('--异步函数执行完毕+延迟: ' + str(delay))


async def main():
    await first()  # 这里先调用first()函数,并且等它执行完了才会开始
    await asyncio.gather(
        func(1),
        func(2),
        func(3)
    )
    await last()


asyncio.run(main())

上面代码实际执行的过程是:

官方文档中给的建议是只创建一个主入口的main()函数(当然这个函数名可以自定义的),将要调用的其他函数都放在这个函数中,然后再使用asyncio.run()启动,理想情况下应当只被调用一次.

上图:

更新

上面所谓的高阶层API用法最后一行asyncio.run(main())和下面使用低阶层API实现效果是一样的:

loop = asyncio.get_event_loop()
task = loop.create_task(main())
loop.run_until_complete(task)

下面是学习过程中记录的偏低层实现的资料

最基本的定义和应用

import asyncio

# 定义一个可以异步调用的函数,其类型为coroutine
async def func1():
    pass

if __name__ == '__main__':
    loop = asyncio.get_event_loop()	# 定义一个用来循环异步函数的loop对象
    task = asyncio.ensure_future(func1())	# 创建一个调用异步函数的任务,task类型为future
    # task = loop.create_task(func1())	# 使用loop的.create_task()创建任务,效果一样
    loop.run_until_complete(task)	# 开始在loop循环中执行异步函数,直到该函数运行完毕

asyncio.ensure_future(coroutine)loop.create_task(coroutine)都可以创建一个taskrun_until_complete的参数是一个futrue对象。

当传入一个协程,其内部会自动封装成task,task是Future的子类。

isinstance(task, asyncio.Future)将会输出True。


future类型的任务可以在loop.run_until_complete中执行,也可以直接用await+任务变量名阻塞?调用

什么时候使用异步

import asyncio

# 这是一个耗时很少的异步函数
async def msg(text):	
    await asyncio.sleep(0.1)
    print(text)

# 这是一个耗时较长的异步函数
async def long_operation():
    print('long_operation started')
    await asyncio.sleep(3)
    print('long_operation finished')

# 主函数部分,同样需要声明为async异步类型
async def main():
    await msg('first')
    # 现在需要调用一个耗时较长的函数操作,不希望阻塞的等待它执行完毕
    # 希望long_operation在开始执行后,立即调用msg,这里就可以将long_operation封装到task任务中
    task = asyncio.ensure_future(long_operation())
    await msg('second')
    # 开始task中的long_operation函数
    await task
    # task执行完毕后会继续下面的代码
    print('All done.')

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

运行结果:

并发和并行

并发和并行一直是容易混淆的概念。并发通常指有多个任务需要同时进行,并行则是同一时刻有多个任务执行。

用上课来举例就是,并发情况下是一个老师在同一时间段辅助不同的人功课。

并行则是好几个老师分别同时辅助多个学生功课。

简而言之就是一个人同时吃三个馒头还是三个人同时分别吃一个的情况,吃一个馒头算一个任务。

asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。

创建多个协程的列表,然后将这些协程注册到事件循环中。

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task ret: ', task.result())

print('TIME: ', now() - start)

结果如下

Waiting:  1
Waiting:  2
Waiting:  4
Task ret:  Done after 1s
Task ret:  Done after 2s
Task ret:  Done after 4s
TIME:  4.003541946411133

总时间为4s左右。4s的阻塞时间,足够前面两个协程执行完毕。如果是同步顺序的任务,那么至少需要7s。此时我们使用了aysncio实现了并发。

asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一个task列表,后者接收一堆task。

异步结果回调

找了个别人写的例子,大致理解了下实现过程:

import time
import asyncio

now = lambda : time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)

def callback(future):
    print('Callback: ', future.result())

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
#task = asyncio.ensure_future(coroutine)	# 貌似和上面用loop创建任务效果一样
task.add_done_callback(callback)
loop.run_until_complete(task)

print('TIME: ', now() - start)

这里需要注意,在使用低层级API手动创建异步任务的时候,不能同时使用高层级API的简单操作了,比如这里创建task任务对象的时候,就不能用asyncio.create_task(),否则会找不到loop对象,返回下面的错误

RuntimeError: no running event loop

翻了一下asyncio\tasks.py源代码,原来所谓的高层级API就是这么简单的封装啊…

调用的时候在函数内部又定义了一遍loop

def create_task(coro):
    """Schedule the execution of a coroutine object in a spawn task.

    Return a Task object.
    """
    loop = events.get_running_loop()
    return loop.create_task(coro)

我自己写的例子

创建4个异步任务同时开始执行,每个任务执行完成后将结果追加到result_list数组变量中.

import asyncio

result_list = []


async def fun(var):
    return var + 1


def callbackFun(future):
    result_list.append(future.result())


task_list = []

for i in range(1, 5):
    cor = fun(i)
    task = asyncio.ensure_future(cor)
    task.add_done_callback(callbackFun)
    task_list.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(task_list))
print(result_list)

当然,如果不需要使用回调函数,而是等所有提交的异步任务都执行完成后获取它们的结果,可以这样写:

# 前面省略
loop.run_until_complete(asyncio.wait(task_list))
# task_list中的每个任务执行完成后可以调用它的.result()方法来获取结果
for task in task_list:
    print('每个task执行完的结果:', task.result())

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯