文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Python进程间通讯与进程池超详细讲解

2022-12-23 12:00

关注

在《多进程并发与同步》中介绍了进程创建与信息共享,除此之外python还提供了更方便的进程间通讯方式。

进程间通讯

multiprocessing中提供了Pipe(一对一)和Queue(多对多)用于进程间通讯。

队列Queue

队列是一个可用于进程间共享的Queue(内部使用pipe与锁),其接口与普通队列类似:

put(obj[, block[, timeout]]):插入数据到队列(默认阻塞,且没有超时时间);

get([block[, timeout]]):读取并删除一个元素;

qsize():返回一个近似队列长度(因多进程原因,长度会有误差);

empty()/full():队列空或慢(因多进程原因,会有误差);

close():关闭队列;

当主进程(创建Queue的)关闭队列时,子进程中的队列并没有关闭,所以getElement进程会一直阻塞等待(为保证能正常退出,需要设为后台进程):

def putElement(name, qu: multiprocessing.Queue):
    try:
        for i in range(10):
            qu.put(f"{name}-{i + 1}")
            time.sleep(.1)
    except ValueError:
        print("queue closed")
    print(f"{name}: put complete")
def getElement(name, qu: multiprocessing.Queue):
    try:
        while True:
            r = qu.get()
            print(f"{name} recv: {r}")
    except ValueError:
        print("queue closed")
    print(f"{name}: get complete")
if __name__ == '__main__':
    qu = multiprocessing.Queue(100)
    puts = [multiprocessing.Process(target=putElement, args=(f"send{i}", qu)) for i in range(10)]
    gets = [multiprocessing.Process(target=getElement, args=(f"recv{i}", qu), daemon=True) for i in range(2)]
    list(map(lambda f: f.start(), puts))
    list(map(lambda f: f.start(), gets))
    for f in puts:
        f.join()
    print("To close")
    qu.close() # 只是main中的close了,其他进程中的并没有

管道Pipe

multiprocessing.Pipe([duplex])返回一个连接对象对(conn1, conn2)。若duplex为True(默认),创建的是双向管道;否则conn1只能用于接收消息,conn2只能用于发送消息:

进程间的Pipe基于fork机制建立:

def pipeProc(pipe):
    outPipe, inPipe = pipe
    inPipe.close() # 必须关闭,否则结束时不会收到EOFError异常
    try:
        while True:
            r = outPipe.recv()
            print("Recv:", r)
    except EOFError:
        print("RECV end")
if __name__ == '__main__':
    outPipe, inPipe = multiprocessing.Pipe()
    sub = multiprocessing.Process(target=pipeProc, args=((outPipe, inPipe),))
    sub.start()
    outPipe.close() # 必须在进程成功运行后,才可关闭
    with inPipe:
        for x in range(10):
            inPipe.send(x)
            time.sleep(.1)
    print("send complete")
    sub.join()

进程池Pool

虽然使用多进程能提高效率,但进程的创建与销毁会消耗较长时间;同时,过多进程会引起频繁的调度,也增加了开销。

进程池中有固定数量的进程:

multiprocessing.Pool([processes[, initializer[, initargs]]])

Pool类中主要方法:

def poolWorker():
    print(f"worker in process {os.getpid()}")
    time.sleep(1)
def poolWorkerOne(name):
    print(f"worker one {name} in process {os.getpid()}")
    time.sleep(random.random())
    return name
def poolWorkerTwo(first, second):
    res = first + second
    print(f"worker two {res} in process {os.getpid()}")
    time.sleep(1./(first+1))
    return res
def poolInit():
    print("pool init")
if __name__ == '__main__':
    workers = multiprocessing.Pool(5, poolInit) # poolInit会被调用5次(线程启动时)
    with workers:
        for i in range(5):
            workers.apply_async(poolWorker)
        arg = [(i, i) for i in range(10)]
        workers.map_async(poolWorkerOne, arg)
        results = workers.starmap_async(poolWorkerTwo, arg) # 每个元素(元组)会被拆分为独立的参数
        print("Starmap:", results.get())
        results = workers.imap_unordered(poolWorkerOne, arg)
        for r in results: # r是乱序的(若使用imap,则与输入arg的顺序相同)
            print("Unordered:", r)
    # 必须保证workers已close了
    workers.join()

到此这篇关于Python进程间通讯与进程池超详细讲解的文章就介绍到这了,更多相关Python进程间通讯内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯