手把手使用netty4+Springboot2.x实现mqtt协议之客户端与服务器端
大家好,由于我想去北京找工作,但是发现北京人才济济,面试要求各种五花八门的卷,很多岗位都能胜任,但是因为专科学历,连面试机会都没有,不过今天我给大家带来的教程,是完全可以让大家应用到实际工作中快速使用的干货,并给大家准备了一套纯净版springboot+netty 4.x的源码实现方案,有支持qos1、qos2、qos3具体的示例,并有日志说明
码云地址:mqtt-netty: 基于springboot2.1.5,netty4.1.53.Final 实现mqtt服务,实现功能1.服务端发布主题消息,所用订阅该主题客户端可接收到数据2.qos类型级别校验,使用qos1级别3.账号密码校验,客户端id校验。4.发布主题是否一致校验 - Gitee.com
首先了解一个很重要的知识点,Qos类型分为3种,而服务端与客户端ack应答包类型也不同,
详细讲解:MQTT - 随笔分类 - 胖达利亚 - 博客园
下列简单描述(B为服务端需要返回的包类型)
- 对于订阅时要求
Qos = 0的客户端 C0:B 将该消息转发给 C0(不加确认是否收到) - 对于订阅时要求
Qos = 1的客户端 C1:B 将该消息转发给 C1,C1 收到后向 B 发送确认PUBACK,B 收到确认则将消息从队列中删除,否则确认超时重发;
C1 只要收到消息,就会将其转发给 C1 的应用,所以 C1 的应用可能收到重复的消息。 - 对于订阅时要求
Qos = 2的客户端 C2:B 将该消息转发给 C2,C2 收到后向 B 发送确认PUBREC,B 收到确认后向 C2 发送确认收到确认PUBREL;C2收到PUBREL后向 B 发送PUBCOMP发布完成包
C2 只有收到消息并发出PUBREC且收到对应的PUBREL,才会将消息转发给 C2 的应用,所以 C2 的应用不会收到重复的消息
第一步.准备maven环境:springboot + netty环境 +mqtt客户端
mqtt客户端安装包:MQTT X:跨平台 MQTT 5.0 桌面客户端工具
org.springframework.boot spring-boot-starter-parent 2.1.5.RELEASE
io.netty netty-all 4.1.53.Final
第二步.构建netty服务
因为mqtt为TCP协议,所以我们需要创建两组线程组,(如果是UDP协议创建一组就可以,以后我会写一篇UDP协议的netty服务搭建,用UDP协议操作树莓派小车也是挺有意思的)
需要一个netty服务启动类,
一个实现ChannelInboundHandlerAdapter的实体类用于通道解析
一个用于拼装mqtt包结构请求体的工具类
package com.my.netty;import com.my.netty.mqtt.handler.TestMqttHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class TestServer {public void run(){// 监听端口号int port = 8888;// 构建主线程-用于分发socket请求EventLoopGroup boosGroup = new NioEventLoopGroup(1);// 构建工作线程-用于处理请求处理EventLoopGroup workGroup = new NioEventLoopGroup(4);try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(boosGroup,workGroup).channel(NioServerSocketChannel.class).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.TCP_NODELAY, true)
// .childOption(ChannelOption.SO_BACKLOG,1024) //等待队列.childOption(ChannelOption.SO_REUSEADDR,true) //快速复用.childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 这个地方注意,如果客户端发送请求体超过此设置值,会抛异常socketChannel.pipeline().addLast(new MqttDecoder(1024*1024));socketChannel.pipeline().addLast( MqttEncoder.INSTANCE);// 加载MQTT编解码协议,包含业务逻辑对象socketChannel.pipeline().addLast(new TestMqttHandler());}});serverBootstrap.bind(port).addListener(future -> {log.info("服务端成功绑定端口号={}",port);});}catch (Exception e){boosGroup.shutdownGracefully();workGroup.shutdownGracefully();log.error("mqttServer启动失败:{}",e);}}
}
package com.my.netty.mqtt.handler;import com.my.netty.core.dto.ClientDTO;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;/**
* @author: liyang
* @date: 2020/7/29 13:22
* @description: MQTT业务类
*/
@Slf4j
@ChannelHandler.Sharable
public class TestMqttHandler extends ChannelInboundHandlerAdapter {private static final Collection clientList = new HashSet();private static final Map msgMap = new HashMap<>();@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof MqttMessage) {Channel channel = ctx.channel();MqttMessage message = (MqttMessage) msg;MqttMessageType messageType = message.fixedHeader().messageType();log.info("MQTT接收到的发送类型===》{}",messageType);switch (messageType) {// 建立连接case CONNECT:try {this.connect(channel, (MqttConnectMessage) message);}catch (Exception e){//如果用户密码,客户端ID校验不成功,会二次建立CONNECT类型连接//但是没有实际意义}break;// 发布消息case PUBLISH:this.publish(channel, (MqttPublishMessage) message);break;// 订阅主题case SUBSCRIBE:this.subscribe(channel, (MqttSubscribeMessage) message);break;// 退订主题case UNSUBSCRIBE:this.unSubscribe(channel, (MqttUnsubscribeMessage) message);break;// 心跳包case PINGREQ:this.pingReq(channel, message);break;// 断开连接case DISCONNECT:this.disConnect(channel, message);break;// 确认收到响应报文,用于服务器向客户端推送qos1/qos2后,客户端返回服务器的响应case PUBACK:this.puback(channel, message);break;// qos2类型,发布收到case PUBREC:this.pubrec(channel, message);break;// qos2类型,发布释放响应case PUBREL:this.pubrel(channel, message);break;// qos2类型,发布完成case PUBCOMP:this.pubcomp(channel, message);break;default:if (log.isDebugEnabled()) {log.debug("Nonsupport server message type of '{}'.", messageType);}break;}}}/*** 创建连接时,需要响应对应的ACK包** @param channel:* @param msg:* @author liyang* @since 2022/3/13 23:12*/public void connect(Channel channel, MqttConnectMessage msg) {//连接需要答复MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK, false, AT_LEAST_ONCE, false, 0),new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, true), null);channel.writeAndFlush(okResp);clientList.add(channel);}/*** 响应ping心跳ACK包** @param channel:* @param msg:* @author liyang* @since 2022/3/13 23:13*/public void pingReq(Channel channel, MqttMessage msg) {if (log.isDebugEnabled()) {log.debug("MQTT pingReq received.");}MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false,AT_LEAST_ONCE, false, 0));channel.writeAndFlush(pingResp);}/*** 服务端主动断开连接** @param channel:* @param msg:* @author liyang* @since 2022/3/13 23:13*/public void disConnect(Channel channel, MqttMessage msg) {clientList.remove(channel);if (channel.isActive()) {channel.close();if (log.isDebugEnabled()) {log.debug("MQTT channel '{}' was closed.", channel.id().asShortText());}}}/*** qos2中使用,发布确认** @param channel:* @param msg:* @author liyang* @since 2022/3/13 23:14*/public void puback(Channel channel, MqttMessage msg){// MqttMessageIdVariableHeader mqttMessageIdVariableHeader = msg.variableHeader();}/*** qos2中发布释放ACK包** @param channel:* @param msg:* @author liyang* @since 2022/3/13 23:13*/public void pubrel(Channel channel, MqttMessage msg){Object mqttMessageIdVariableHeader = msg.variableHeader();if (mqttMessageIdVariableHeader instanceof MqttPubReplyMessageVariableHeader) {// qos2类型,接收发布者消息log.info("qos2客户端返回确认的消息包:{}",msg.payload());MqttPubReplyMessageVariableHeader header = (MqttPubReplyMessageVariableHeader) mqttMessageIdVariableHeader;MqttMessage mqttMessage = MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.EXACTLY_ONCE, false, 0),MqttMessageIdVariableHeader.from(header.messageId()),0);channel.writeAndFlush(mqttMessage);for (Channel channel1 : clientList) {try {send(channel1,"aaa",MqttQoS.EXACTLY_ONCE,"我收到那");} catch (InterruptedException e) {log.error("该通道推送消息失败,可加入容错机制,channel:{}",channel1);}}}}/*** qos2:发布收到ACK包** @param channel:* @param msg:* @author liyang* @since 2022/3/14 0:09*/public void pubrec(Channel channel, MqttMessage msg) {Object mqttMessageIdVariableHeader = msg.variableHeader();if (mqttMessageIdVariableHeader instanceof MqttPubReplyMessageVariableHeader) {// qos2类型,接收发布者消息log.info("qos2客户端返回确认的消息包:{}",msg.payload());MqttPubReplyMessageVariableHeader header = (MqttPubReplyMessageVariableHeader) mqttMessageIdVariableHeader;MqttMessage mqttMessage = MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.EXACTLY_ONCE, false, 0),MqttMessageIdVariableHeader.from(header.messageId()),0);channel.writeAndFlush(mqttMessage);}}/*** qos2发布完成** @param channel:* @param msg:* @author liyang* @since 2022/3/14 0:11*/public void pubcomp(Channel channel, MqttMessage msg) {Object mqttMessageIdVariableHeader = msg.variableHeader();if (mqttMessageIdVariableHeader instanceof MqttPubReplyMessageVariableHeader) {// qos2类型,接收发布者消息
// log.info("qos2客户端返回确认的消息包:{}",msg.payload());
// MqttPubReplyMessageVariableHeader header = (MqttPubReplyMessageVariableHeader) mqttMessageIdVariableHeader;
// MqttMessage mqttMessage = MqttMessageFactory.newMessage(
// new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.EXACTLY_ONCE, false, 0),
// MqttMessageIdVariableHeader.from(header.messageId()),
// 0);
// channel.writeAndFlush(mqttMessage);}}/*** 客户端发布消息时使用** @param channel:* @param msg:* @author liyang* @since 2022/3/13 23:14*/public void publish(Channel channel, MqttPublishMessage msg) {log.info("qos类型是{}",msg.fixedHeader().qosLevel());String topic = msg.variableHeader().topicName();log.info("订阅主题:{}",topic);ByteBuf buf = msg.content().duplicate();byte[] tmp = new byte[buf.readableBytes()];buf.readBytes(tmp);String content = null;try {content = new String(tmp,"UTF-8");} catch (UnsupportedEncodingException e) {e.printStackTrace();}//校验传入的数据是否符合要求if(StringUtils.isBlank(content)){log.error("MQTT接收到的数据包为空===》{}",content);puback(channel,msg,"MQTT接收到的数据包为空");return;}log.info("MQTT读取到的客户端发送信息===>{}",content);// 如果是qos1或者qos2类型都需要响应puback(channel,msg,content);// 推送主题消息log.info("推送客户端客户端消息:{}",content);if (AT_LEAST_ONCE == msg.fixedHeader().qosLevel() || AT_MOST_ONCE == msg.fixedHeader().qosLevel()) {for (Channel channel1 : clientList) {try {send(channel1,topic,msg.fixedHeader().qosLevel(),content);} catch (InterruptedException e) {log.error("该通道推送消息失败,可加入容错机制,channel:{}",channel1);}}}}/*** 客户端订阅消息ACK包** @param channel:* @param msg:* @author liyang* @since 2022/3/13 23:14*/public void subscribe(Channel channel, MqttSubscribeMessage msg) {MqttQoS mqttQoS = msg.fixedHeader().qosLevel();
// mqttQoS = MqttQoS.EXACTLY_ONCE;MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.SUBACK, false, mqttQoS, false, 0),MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),new MqttSubAckPayload(0));channel.writeAndFlush(subAckMessage);}/*** 客户端取消订阅ACK包** @param channel:* @param msg:* @author liyang* @since 2022/3/13 23:15*/public void unSubscribe(Channel channel, MqttUnsubscribeMessage msg) {MqttUnsubAckMessage unSubAckMessage = (MqttUnsubAckMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_LEAST_ONCE, false, 0),MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()), null);channel.writeAndFlush(unSubAckMessage);disConnect(channel,msg);}/*** 捕获异常状态,客户端断开钩子函数** @param ctx:* @param cause:* @author liyang* @since 2022/3/13 23:15*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("MQTT客户端被强制关闭:{}:{}",ctx.channel().id().asShortText(),cause);if (ctx.channel().isActive()) {ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);ctx.channel().close();}}/*** qos1中响应客户端ACK包** @param channel:* @param msg:* @param payLoad:* @author liyang* @since 2022/3/13 23:16*/// 客户端QOS1消息类型( MqttQoS.AT_LEAST_ONCE = qos1),需要服务器响应包private void puback(Channel channel, MqttPublishMessage msg, String payLoad){if (MqttQoS.AT_MOST_ONCE == msg.fixedHeader().qosLevel()) {// qos0消息类型,不需要ACK客户端return;}if (MqttQoS.AT_LEAST_ONCE == msg.fixedHeader().qosLevel()) {// qos1消息类型,需要向客户端返回MqttMessageType.PUBACK 类型ACK应答MqttPubAckMessage sendMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0),MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()),payLoad);channel.writeAndFlush(sendMessage);return;}if (MqttQoS.EXACTLY_ONCE == msg.fixedHeader().qosLevel()) {// qos2消息类型MqttMessage mqttMessage = MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.EXACTLY_ONCE, false, 0),MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()),payLoad);channel.writeAndFlush(mqttMessage);}}/*** 向客户端发布订阅主题消息** @param channel:* @param topic:* @param qos:* @param sendMessage:* @return* @author liyang* @since 2022/3/13 23:16*/public ChannelFuture send(Channel channel, String topic,MqttQoS qos ,String sendMessage ) throws InterruptedException {MqttRequest request = new MqttRequest((sendMessage.getBytes()));MqttPublishMessage pubMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBLISH,request.isDup(),qos,request.isRetained(),0),new MqttPublishVariableHeader(topic, 0),Unpooled.buffer().writeBytes(request.getPayload()));msgMap.put(pubMessage.variableHeader().messageId()+"",pubMessage.variableHeader().messageId()+"");// 超过高水位,则采取同步模式if (channel.isWritable()) {return channel.writeAndFlush(pubMessage);}return channel.writeAndFlush(pubMessage).sync();}
}
package com.my.netty.mqtt.handler;import io.netty.handler.codec.mqtt.MqttQoS;/*** @author: liyang* @date: 2020-11-04 15:58* @description: 请求消息体**/
public class MqttRequest {private boolean mutable = true;private byte[] payload;private MqttQoS qos = MqttQoS.AT_LEAST_ONCE;private boolean retained = false;private boolean dup = false;private int messageId;public MqttRequest() {this.setPayload(new byte[0]);}public MqttRequest(byte[] payload) {this.setPayload(payload);}public MqttRequest(byte[] payload,MqttQoS qos) {this.setPayload(payload);this.setQos(qos);}public byte[] getPayload() {return this.payload;}public void clearPayload() {this.checkMutable();this.payload = new byte[0];}public void setPayload(byte[] payload) {this.checkMutable();if (payload == null) {throw new NullPointerException();} else {this.payload = payload;}}public boolean isRetained() {return this.retained;}public void setRetained(boolean retained) {this.checkMutable();this.retained = retained;}public MqttQoS getQos() {return qos;}public void setQos(MqttQoS qos) {this.qos = qos;}public boolean isMutable() {return mutable;}public void setMutable(boolean mutable) {this.mutable = mutable;}protected void checkMutable() throws IllegalStateException {if (!this.mutable) {throw new IllegalStateException();}}public boolean isDup() {return dup;}public void setDup(boolean dup) {this.dup = dup;}public int getMessageId() {return messageId;}public void setMessageId(int messageId) {this.messageId = messageId;}@Overridepublic String toString() {return new String(this.payload);}
}
第三步.发布netty服务
由于咱们是基于springboot架构搭建,所以可以利用
@Component + @PostConstruct 标签在springboot项目初始化时运行netty服务, 但是要注意springBean注入对象需要再创建netty服务是,将单例bean对象传入使用
package com.my.netty;//import com.my.netty.core.cache.ClientCache;
//import com.my.netty.core.dto.ClientDTO;
//import com.my.netty.core.dto.InitParamDTO;
//import com.my.netty.core.server.IServer;
//import com.my.netty.core.service.IDataService;
//import com.my.netty.mqtt.MqttServer;
//import com.my.netty.websocket.WebSocketServer;
//import com.my.netty.websocket.service.WebSocketDataServiceImpl;
//import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
//import javax.annotation.Resource;
//import java.util.Collection;/*** @author: liyang* @date: 2020-11-04 14:28* @description: netty启动服务**/
@Slf4j
@Component
public class NettyServerInit {
// @Resource(name = "mqttDataServiceImpl")
// private IDataService mqttService;
// @Resource(name = "webSocketDataServiceImpl")
// private WebSocketDataServiceImpl webSocketDataService;@PostConstructpublic void init(){// 简单版测试服务TestServer testServer = new TestServer();testServer.run();// // 创建mqtt服务
// MqttServer mqttServer = new MqttServer(InitParamDTO.base(9001, 1, 4, mqttService,new ClientCache()));
// // 创建websocket服务
// WebSocketServer wsServer = new WebSocketServer(InitParamDTO.base(9010, 1, 4, webSocketDataService, new ClientCache()));
// // 启动服务
// IServer mqttRun = mqttServer.run();
// IServer wsRun = wsServer.run();
//
// // todo 测试代码-测试监听服务客户端状态
// Thread thread = new Thread(new Runnable() {
// @SneakyThrows
// @Override
// public void run() {
// while (true) {
// Thread.sleep(1500);
// Collection clientList = mqttRun.getClientAllList();
// log.info("mqtt客户端连接数:{}:{}", clientList.size());
//
// Collection clientAllList = wsRun.getClientAllList();
// log.info("ws客户端连接数:{}:{}", clientAllList.size());
// }
// }
// });
// thread.run();}
}
第四步.netty客户端测试
win客户端安装包方式,双击打开






本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
