python版本: 2.7.14
一 消息生产者代码:
1 # -*- coding: utf-8 -*-
2
3 import json
4 import pika
5 import urllib
6 import urllib2
7 import chardet
8 import sys
9 import json
10 from common import CommonMethod
11 import pika
12 import time
13
14 HOST_NAME = "172.21.204.14"
15 USER_NAME = "xxx"
16 PASSWORD = "xxx"
17
18 # 1."Hello World!"
19 def hello_world():
20 credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
21 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
22 channel = connection.channel()
23
24 channel.queue_declare(queue='hello')
25 channel.basic_publish(exchange='',
26 routing_key='hello', # specify queue name
27 body='Hello World!')
28 print(" [x] Sent 'Hello World!'")
29 connection.close()
30
31 # 2."Work queues"
32 def new_task():
33 credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
34 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
35 channel = connection.channel()
36
37 channel.queue_declare(queue='task_queue', durable=True) # 设置队列持久化
38 message = ' '.join(sys.argv[1:]) or "Hello World!"
39 channel.basic_publish(exchange='',
40 routing_key='task_queue',
41 body=message,
42 properties=pika.BasicProperties(
43 delivery_mode = 2, # 设置消息持久化
44 ))
45 print(" [x] Sent %r" % message)
46 connection.close()
47
48 # 3."Publish/Subscribe"
49 def emit_log(message):
50 credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
51 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
52 channel = connection.channel()
53
54 channel.exchange_declare(exchange='logs', # 申明logs交换机
55 exchange_type='fanout') # 交换机类型: 发布/订阅
56
57 channel.basic_publish(exchange='logs',
58 routing_key='',
59 body=message)
60 print(" [x] Sent %r" % message)
61 connection.close()
62
63 # 4."Routing"
64 def emit_log_direct(log_level,message):
65 credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
66 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
67 channel = connection.channel()
68
69 channel.exchange_declare(exchange='direct_logs', # 申明logs交换机
70 exchange_type='direct') # 交换机类型: 路由(Routing)
71
72 channel.basic_publish(exchange='direct_logs',
73 routing_key=log_level,
74 body=message)
75 print(" [x] Sent %r:%r" % (log_level, message))
76 connection.close()
77
78 emit_log_direct("info", "info log message:...")
79 emit_log_direct("error", "error log message:...")
80
81 # 5."Topic"
82 # 与Routing模式类似,比Routing模式多了routing_key可以使用通配符"*","#"等,使用更加灵活
二 消息消费者代码:
1 # -*- coding: utf-8 -*-
2
3 import json
4 import pika
5 import urllib
6 import urllib2
7 import chardet
8 import sys
9 import json
10 from common import CommonMethod
11 import pika
12 import time
13
14 HOST_NAME = "172.21.204.14"
15 USER_NAME = "xxx"
16 PASSWORD = "xxx"
17
18 # 1."Hello World!"
19 def hello_world():
20 credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
21 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
22 channel = connection.channel()
23
24 channel.queue_declare(queue='hello')
25 channel.basic_publish(exchange='',
26 routing_key='hello', # specify queue name
27 body='Hello World!')
28 print(" [x] Sent 'Hello World!'")
29 connection.close()
30
31 # 2."Work queues"
32 def new_task():
33 credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
34 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
35 channel = connection.channel()
36
37 channel.queue_declare(queue='task_queue', durable=True) # 设置队列持久化
38 message = ' '.join(sys.argv[1:]) or "Hello World!"
39 channel.basic_publish(exchange='',
40 routing_key='task_queue',
41 body=message,
42 properties=pika.BasicProperties(
43 delivery_mode = 2, # 设置消息持久化
44 ))
45 print(" [x] Sent %r" % message)
46 connection.close()
47
48 # 3."Publish/Subscribe"
49 def emit_log(message):
50 credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
51 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
52 channel = connection.channel()
53
54 channel.exchange_declare(exchange='logs', # 申明logs交换机
55 exchange_type='fanout') # 交换机类型: 发布/订阅
56
57 channel.basic_publish(exchange='logs',
58 routing_key='',
59 body=message)
60 print(" [x] Sent %r" % message)
61 connection.close()
62
63 # 4."Routing"
64 def emit_log_direct(log_level,message):
65 credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
66 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
67 channel = connection.channel()
68
69 channel.exchange_declare(exchange='direct_logs', # 申明logs交换机
70 exchange_type='direct') # 交换机类型: 路由(Routing)
71
72 channel.basic_publish(exchange='direct_logs',
73 routing_key=log_level,
74 body=message)
75 print(" [x] Sent %r:%r" % (log_level, message))
76 connection.close()
77
78 emit_log_direct("info", "info log message:...")
79 emit_log_direct("error", "error log message:...")
80
81 # 5."Topic"
82 # 与Routing模式类似,比Routing模式多了routing_key可以使用通配符"*","#"等,使用更加灵活
三 图片
官网参考文档: http://www.rabbitmq.com/getstarted.html