python 消息队列如何接收处理_python使用消息队列RabbitMq(进阶)

b9a7737b25d6eacbe60e5edf6426e28b892.jpg

0075cd5e482f09147d5a5a9fe20a4fd51e1.jpg

importpika

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()#声明queue

channel.queue_declare(queue='hello')#RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.

channel.basic_publish(exchange='',

routing_key='hello',

body='Hello World!')print("[x] Sent 'Hello World!'")

connection.close()

发送

f07e230758a85b73e9959725ec370f48812.jpg

58bfce4cb8f6dded676e27e7b4e932ce40e.jpg

__author__ = 'hardy'

importpika

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()#You may ask why we declare the queue again ‒ we have already declared it in our previous code.#We could avoid that if we were sure that the queue already exists. For example if send.py program#was run before. But we're not yet sure which program to run first. In such cases it's a good#practice to repeat declaring the queue in both programs.

channel.queue_declare(queue='hello')defcallback(ch, method, properties, body):print("[x] Received %r" %body)

channel.basic_consume(callback,

queue='hello',

no_ack=True)print('[*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

接收

消息队列的发送端流程

1、连接

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

2、声明queue

channel.queue_declare(queue='hello')

队列持久化

channel.queue_declare(queue='hello', durable=True)

3、发送消息

channel.basic_publish(exchange='',

routing_key='hello',

body='Hello World!')

消息持久化(必须队列持久化)

channel.basic_publish(exchange='',

routing_key="hello",

body=message,

properties=pika.BasicProperties(

delivery_mode = 2, # make message persistent

))

4、关闭

connection.close()

消息队列接收端流程

1、连接

connection = pika.BlockingConnection(pika.ConnectionParameters(

'localhost'))

channel = connection.channel()

2、声明queue

channel.queue_declare(queue='hello')

3、创建回调函数(处理数据)

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

4、设置

channel.basic_consume(callback,

queue='hello',

no_ack=True)

5、开始接收数据

channel.start_consuming()

6、确认消息被消费

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

time.sleep(body.count(b'.'))

print(" [x] Done")

ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,

queue='task_queue',

no_ack=True #no_ack=True消息不需要确认,默认no_ack=false,消息需要确认

)


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部