rocketmq安装使用教程

rocketmq安装使用教程

本文假设您已经完成了下列前序步骤:

  • docker安装配置

一、docker安装rokcetmq

1.1 安装namesrv

docker pull rocketmqinc/rocketmq:latest
/**- 安装namesrv- -d   # 后台运行- -p   #设置默认端口,这里rocketmq默认9876端口- -v  #设置映射本地目录到容器内的目录,这个注意我都是把本地	的/usr/local/docker/rocketmq/**映射到容器内的对应目录的,这个可以改成你本地的linux目录,当然也可以和我一样。我理解的就是MQ的数据和日志什么的不能放在容器中啊,因为容器毕竟占用的空间有限,就映射一下放在本地目录中。
*/
docker run -d -p 9876:9876 -v /usr/local/docker/rocketmq/data/namesrv/logs:/root/logs -v /usr/local/docker/rocketmq/data/namesrv/store:/root/store --name rmqnamesrv -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq:latest sh mqnamesrv

1.2 安装 broker

创建broker配置文件(路径不存在就创建路径):
mkdir -p /usr/local/docker/rocketmq/conf
vi /usr/local/docker/rocketmq/conf/broker.conf
------------------------------------------------------------------
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 39.96.46.193
------------------------------------------------------------------
  • 用broker.conf配置启动容器
docker run -d -p 10911:10911 -p 10909:10909 -v  /usr/local/docker/rocketmq/data/broker/logs:/root/logs -v  /usr/local/docker/rocketmq/data/broker/store:/root/store -v  /usr/local/docker/rocketmq/conf/broker.conf:/opt/rocketmq-latest/conf/broker.conf --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq:latest sh mqbroker -c /opt/rocketmq-latest/conf/broker.conf

1.3 安装 rocketmq 控制台

  • 拉取镜像
docker pull pangliang/rocketmq-console-ng
  • 后台启动
docker run -d -e "JAVA_OPTS=-Drocketmq.namesrv.addr=39.96.46.193:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t pangliang/rocketmq-console-ng
  • 检查镜像启动情况
docker ps -a

在这里插入图片描述
注意注意:一定要开启防火墙的相应端口
通过看上面的配置需要开启9876、10911、10909、18080四个端口号

1.4 查看web控制台

打开浏览器访问39.96.46.193:8080 注意访问你linux的ip和上面rocketmq控制台映射的8080端口。
在这里插入图片描述
这里就不过多介绍控制台的使用了,想了解的可以自己查找资料。

二、rocketmq代码使用

2.1 springboot整合rocketmq

  • pom.xml
        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency>
  • application.yml
#name-server
rocketmq:name-server: 39.96.46.193:9876producer:group: mygroupmyrocketmq-config:my-topic: my_topicmy-consumer-group: my_consumer_group
  • 生产者
package com.yss.dataMiddle.controller;import lombok.extern.slf4j.Slf4j;
import org.apache.derby.iapi.util.IdUtil;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.io.UnsupportedEncodingException;
import java.util.List;/*** @description:* @author: Han LiDong* @create: 2020/10/13 15:07* @update: 2020/10/13 15:07*/
@RestController
@RequestMapping("/mq")
@Slf4j
public class RocketMqController {@Value("${myrocketmq-config.my-topic}")private String my_topic;@Value("${myrocketmq-config.my-consumer-group}")private String my_consumer_group;@Autowiredprivate RocketMQTemplate rocketMQTemplate;//同步发送@GetMapping("/sync")public void sync(@RequestParam("msg")String msg) throws UnsupportedEncodingException {//生产消息 topic:tag ,消息//SendResult sendResult = rocketMQTemplate.syncSend(my_topic+ ":hld-tag", msg);/***生产数据到指定队列中*/rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {log.info("MessageQueue:{} message:{},o:{} ",list,message,o);//long index = a % list.size();  这里可以根据订单号取模,计算放到哪个队列中return list.get(0); //所有数据都放入第一个队列中}});SendResult sendResult = rocketMQTemplate.syncSendOrderly(my_topic+ ":hld-tag", msg,"hld");//SendResult sendResult = rocketMQTemplate.syncSend(my_topic, msg);System.out.println("同步发送字符串: " + msg + "至topic: "+my_topic+",发送结果: " + sendResult);}//异步发送@GetMapping("/async")public void async(){//生产消息 topic:tag ,消息内容 ,回调函数rocketMQTemplate.asyncSend("test", "Hello world!", new SendCallback() {@Overridepublic void onSuccess(SendResult var1) {System.out.println("异步发送成功: "+ var1);}@Overridepublic void onException(Throwable var1) {System.out.println("异步发送失败: "+ var1);}});}//单向发送@GetMapping("/oneway")public void oneway(){//生产消息 topic:tag ,消息内容  没有返回结果rocketMQTemplate.sendOneWay("test", "Hello world!");System.out.println("单向发送");}
}
  • 消费者
/*** consumerGroup:消费组(一个ConsumerGroup中的Consumer实例平均分摊消费消息)* topic :主题* selectorExpression:tag(消费哪个tag的消息)*/
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "${myrocketmq-config.my-consumer-group}", topic = "${myrocketmq-config.my-topic}",selectorExpression= "hld-tag")
public class MqConsumer implements  RocketMQListener<String> {@Value("${server.port}")private String port;@Overridepublic void onMessage(String message) {log.info("port:{} consume收到消息:{} ",port,message);}
}

如果想要启动多个消费者可以idea配置不同端口启动多个服务:
在这里插入图片描述


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部