RocketMq的简单demo

首先要确定RocketMq安装成功,环境配置ok,并且启动了。

参考:Rocketmq安装与环境配置_小小舍的博客-CSDN博客

依赖:

org.apache.rocketmqrocketmq-client4.3.0

生产者:

package com.xxs.rocketmqdemo.rocket;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.Date;/*** Created by xxs on 2021/12/15 18:16** @Description*/
public class Producer {public static void main(String[] args) throws MQClientException {//创建一个消息生产者,传入的是消息组名称DefaultMQProducer producer = new DefaultMQProducer("rmq-group");//输入nameserver服务的地址producer.setNamesrvAddr("127.0.0.1:9876");producer.setInstanceName("producer");//启动生产者producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);  //每秒发送一次MQ//创建消息Message msg = new Message("Topic-test",// topic"testTag",// tag(new Date() + " RocketMQ test msg " + i).getBytes()// body);//发送,返回结果对象SendResult sendResult = producer.send(msg);System.out.println(sendResult.getMsgId()); //消息idSystem.out.println(sendResult.getMessageQueue()); //队列信息System.out.println(sendResult.getSendStatus());  //发送结果System.out.println(sendResult.getOffsetMsgId()); //下一个要消费的消息的偏移量System.out.println(sendResult.getQueueOffset());  //队列消息偏移量System.out.println();System.out.println("================================================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();}}

消费者:

package com.xxs.rocketmqdemo.rocket;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** Created by xxs on 2021/12/15 18:15** @Description*/
public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.setInstanceName("consumer");//订阅某个主题,然后使用tag过滤消息,不过滤可以用*代表consumer.subscribe("Topic-test", "testTag");//注册监听回调实现类来处理broker推送过来的消息,MessageListenerConcurrently是并发消费consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List messages, ConsumeConcurrentlyContext context) {for (MessageExt msg : messages) {System.out.println(msg.getMsgId() + " ===== " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();//消费者启动完成System.out.println("Consumer Started.");}}

遇到问题:

运行报错:No route info of this topic

解决:依赖的版本要和本地rocketmq一致

 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部