文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Python操作Rabbit MQ的5种

2023-01-30 22:14

关注

python版本:   2.7.14

一 消息生产者代码:

 1 # -*- coding: utf-8 -*-
 2 
 3 import json
 4 import pika
 5 import urllib
 6 import urllib2
 7 import chardet
 8 import sys
 9 import json
10 from common import CommonMethod
11 import pika
12 import time
13 
14 HOST_NAME = "172.21.204.14"
15 USER_NAME = "xxx"
16 PASSWORD = "xxx"
17 
18 # 1."Hello World!"
19 def hello_world():
20     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
21     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
22     channel = connection.channel()
23     
24     channel.queue_declare(queue='hello')
25     channel.basic_publish(exchange='',
26                             routing_key='hello',      # specify queue  name
27                             body='Hello World!')
28     print(" [x] Sent 'Hello World!'")
29     connection.close()
30 
31 # 2."Work queues"
32 def new_task():
33     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
34     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
35     channel = connection.channel()
36 
37     channel.queue_declare(queue='task_queue', durable=True)   # 设置队列持久化
38     message = ' '.join(sys.argv[1:]) or "Hello World!"
39     channel.basic_publish(exchange='',
40                         routing_key='task_queue',
41                         body=message,
42                         properties=pika.BasicProperties(
43                             delivery_mode = 2,                # 设置消息持久化
44                         ))
45     print(" [x] Sent %r" % message) 
46     connection.close()
47 
48 # 3."Publish/Subscribe"
49 def emit_log(message):
50     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
51     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
52     channel = connection.channel()
53 
54     channel.exchange_declare(exchange='logs',        # 申明logs交换机
55                          exchange_type='fanout')     # 交换机类型: 发布/订阅
56 
57     channel.basic_publish(exchange='logs',
58                         routing_key='',
59                         body=message)
60     print(" [x] Sent %r" % message)
61     connection.close()
62 
63 # 4."Routing"
64 def emit_log_direct(log_level,message):
65     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
66     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
67     channel = connection.channel()
68 
69     channel.exchange_declare(exchange='direct_logs', # 申明logs交换机
70                          exchange_type='direct')     # 交换机类型: 路由(Routing)
71 
72     channel.basic_publish(exchange='direct_logs',
73                         routing_key=log_level,
74                         body=message)
75     print(" [x] Sent %r:%r" % (log_level, message))
76     connection.close()
77 
78 emit_log_direct("info", "info log message:...")
79 emit_log_direct("error", "error log message:...")
80 
81 # 5."Topic"
82 # 与Routing模式类似,比Routing模式多了routing_key可以使用通配符"*","#"等,使用更加灵活
View Code

 

二 消息消费者代码:

 1 # -*- coding: utf-8 -*-
 2 
 3 import json
 4 import pika
 5 import urllib
 6 import urllib2
 7 import chardet
 8 import sys
 9 import json
10 from common import CommonMethod
11 import pika
12 import time
13 
14 HOST_NAME = "172.21.204.14"
15 USER_NAME = "xxx"
16 PASSWORD = "xxx"
17 
18 # 1."Hello World!"
19 def hello_world():
20     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
21     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
22     channel = connection.channel()
23     
24     channel.queue_declare(queue='hello')
25     channel.basic_publish(exchange='',
26                             routing_key='hello',      # specify queue  name
27                             body='Hello World!')
28     print(" [x] Sent 'Hello World!'")
29     connection.close()
30 
31 # 2."Work queues"
32 def new_task():
33     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
34     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
35     channel = connection.channel()
36 
37     channel.queue_declare(queue='task_queue', durable=True)   # 设置队列持久化
38     message = ' '.join(sys.argv[1:]) or "Hello World!"
39     channel.basic_publish(exchange='',
40                         routing_key='task_queue',
41                         body=message,
42                         properties=pika.BasicProperties(
43                             delivery_mode = 2,                # 设置消息持久化
44                         ))
45     print(" [x] Sent %r" % message) 
46     connection.close()
47 
48 # 3."Publish/Subscribe"
49 def emit_log(message):
50     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
51     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
52     channel = connection.channel()
53 
54     channel.exchange_declare(exchange='logs',        # 申明logs交换机
55                          exchange_type='fanout')     # 交换机类型: 发布/订阅
56 
57     channel.basic_publish(exchange='logs',
58                         routing_key='',
59                         body=message)
60     print(" [x] Sent %r" % message)
61     connection.close()
62 
63 # 4."Routing"
64 def emit_log_direct(log_level,message):
65     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
66     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
67     channel = connection.channel()
68 
69     channel.exchange_declare(exchange='direct_logs', # 申明logs交换机
70                          exchange_type='direct')     # 交换机类型: 路由(Routing)
71 
72     channel.basic_publish(exchange='direct_logs',
73                         routing_key=log_level,
74                         body=message)
75     print(" [x] Sent %r:%r" % (log_level, message))
76     connection.close()
77 
78 emit_log_direct("info", "info log message:...")
79 emit_log_direct("error", "error log message:...")
80 
81 # 5."Topic"
82 # 与Routing模式类似,比Routing模式多了routing_key可以使用通配符"*","#"等,使用更加灵活
View Code

 

三 图片

 

 

 

官网参考文档: http://www.rabbitmq.com/getstarted.html

 

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯