文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

python操作RabbitMq的三种工作模式是什么

2023-06-30 00:23

关注

这篇“python操作RabbitMq的三种工作模式是什么”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“python操作RabbitMq的三种工作模式是什么”文章吧。

一、简介:

  RabbitMq 是实现了高级消息队列协议(AMQP)的开源消息代理中间件。消息队列是一种应用程序对应用程序的通行方式,应用程序通过写消息,将消息传递于队列,由另一应用程序读取 完成通信。而作为中间件的 RabbitMq 无疑是目前最流行的消息队列之一。

RabbitMq 应用场景广泛:

二、RabbitMq 生产和消费

生产者(producter):队列消息的产生者,负责生产消息,并将消息传入队列

import pikaimport jsoncredentials = pika.PlainCredentials('shampoo', '123456')  # mq用户名和密码# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))channel=connection.channel()# 声明消息队列,消息将在这个队列传递,如不存在,则创建result = channel.queue_declare(queue = 'python-test')for i in range(10):    message=json.dumps({'OrderId':"1000%s"%i})# 向队列插入数值 routing_key是队列名    channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)    print(message)connection.close()

消费者(consumer):队列消息的接收者,负责 接收并处理 消息队列中的消息

import pikacredentials = pika.PlainCredentials('shampoo', '123456')connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))channel = connection.channel()# 申明消息队列,消息在这个队列传递,如果不存在,则创建队列channel.queue_declare(queue = 'python-test', durable = False)# 定义一个回调函数来处理消息队列中的消息,这里是打印出来def callback(ch, method, properties, body):    ch.basic_ack(delivery_tag = method.delivery_tag)    print(body.decode())# 告诉rabbitmq,用callback来接收消息channel.basic_consume('python-test',callback)# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理channel.start_consuming()

三、RabbitMq 持久化

MQ默认建立的是临时 queue 和 exchange,如果不声明持久化,一旦 rabbitmq 挂掉,queue、exchange 将会全部丢失。所以我们一般在创建 queue 或者 exchange 的时候会声明 持久化。

queue 声明持久化

# 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable = True 代表消息队列持久化存储,False 非持久化存储result = channel.queue_declare(queue = 'python-test',durable = True)

exchange 声明持久化

# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建.durable = True 代表exchange持久化存储,False 非持久化存储channel.exchange_declare(exchange = 'python-test', durable = True)

注意:如果已存在一个非持久化的 queue 或 exchange ,执行上述代码会报错,因为当前状态不能更改 queue 或 exchange 存储属性,需要删除重建。如果 queue 和 exchange 中一个声明了持久化,另一个没有声明持久化,则不允许绑定。

消息持久化

虽然 exchange 和 queue 都申明了持久化,但如果消息只存在内存里,rabbitmq 重启后,内存里的东西还是会丢失。所以必须声明消息也是持久化,从内存转存到硬盘。

# 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化    channel.basic_publish(exchange = '',routing_key = 'python-test',body = message,                          properties=pika.BasicProperties(delivery_mode = 2))

acknowledgement 消息不丢失

消费者(consumer)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息丢失。但是也可以选择消费者处理失败时,将消息回退给 rabbitmq ,重新再被消费者消费,这个时候需要设置确认标识。

channel.basic_consume(callback,queue = 'python-test',# no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉                      no_ack = False)

三、RabbitMq 发布与订阅

rabbitmq 的发布与订阅要借助交换机(Exchange)的原理实现:

python操作RabbitMq的三种工作模式是什么

Exchange 一共有三种工作模式:fanout, direct, topicd

模式一:fanout

这种模式下,传递到 exchange 的消息将会转发到所有与其绑定的 queue 上。

发布者:

import pikaimport jsoncredentials = pika.PlainCredentials('shampoo', '123456')  # mq用户名和密码# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))channel=connection.channel()# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')for i in range(10):    message=json.dumps({'OrderId':"1000%s"%i})# 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置    channel.basic_publish(exchange = 'python-test',routing_key = '',body = message,                          properties=pika.BasicProperties(delivery_mode = 2))    print(message)connection.close()

订阅者:

import pikacredentials = pika.PlainCredentials('shampoo', '123456')connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))channel = connection.channel()# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除result = channel.queue_declare('',exclusive=True)# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')# 绑定exchange和队列  exchange 使我们能够确切地指定消息应该到哪个队列去channel.queue_bind(exchange = 'python-test',queue = result.method.queue)# 定义一个回调函数来处理消息队列中的消息,这里是打印出来def callback(ch, method, properties, body):    ch.basic_ack(delivery_tag = method.delivery_tag)    print(body.decode())channel.basic_consume(result.method.queue,callback,# 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉                      auto_ack = False)channel.start_consuming()

模式二:direct

这种工作模式的原理是 消息发送至 exchange,exchange 根据 路由键(routing_key)转发到相对应的 queue 上。

发布者:

import pikaimport jsoncredentials = pika.PlainCredentials('shampoo', '123456')  # mq用户名和密码# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))channel=connection.channel()# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')for i in range(10):    message=json.dumps({'OrderId':"1000%s"%i})# 指定 routing_key。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化    channel.basic_publish(exchange = 'python-test',routing_key = 'OrderId',body = message,                          properties=pika.BasicProperties(delivery_mode = 2))    print(message)connection.close()

消费者:

import pikacredentials = pika.PlainCredentials('shampoo', '123456')connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))channel = connection.channel()# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除result = channel.queue_declare('',exclusive=True)# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')# 绑定exchange和队列  exchange 使我们能够确切地指定消息应该到哪个队列去channel.queue_bind(exchange = 'python-test',queue = result.method.queue,routing_key='OrderId')# 定义一个回调函数来处理消息队列中的消息,这里是打印出来def callback(ch, method, properties, body):    ch.basic_ack(delivery_tag = method.delivery_tag)    print(body.decode())#channel.basic_qos(prefetch_count=1)# 告诉rabbitmq,用callback来接受消息channel.basic_consume(result.method.queue,callback,# 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉                      auto_ack = False)channel.start_consuming()

模式三:topicd

  这种模式和第二种模式差不多,exchange 也是通过 路由键 routing_key 来转发消息到指定的 queue 。 不同点是 routing_key 使用正则表达式支持模糊匹配,但匹配规则又与常规的正则表达式不同,比如“#”是匹配全部,“*”是匹配一个词。

举例:routing_key =“#orderid#”,意思是将消息转发至所有 routing_key 包含 “orderid” 字符的队列中。代码和模式二 类似,就不贴出来了。

以上就是关于“python操作RabbitMq的三种工作模式是什么”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯