物联网IOT与Java结合之MQTT协议:EMQT

准备:下载EMQX的windows版本

https://download.csdn.net/download/qq_39246466/87201696

1.pom文件:引入mqtt


4.0.0io.github.pnokerdc3-driver2022.1.0dc3-driver-mqttjarIOT DC3 平台 Mqtt 驱动。org.springframework.integrationspring-integration-streamorg.springframework.integrationspring-integration-mqttorg.eclipse.pahoorg.eclipse.paho.client.mqttv3

2.配置文件:主要节点driver.mqtt

driver:name: EdgeGatewaytype: gatewayproject: @project.artifactId@description: @project.description@schedule:status:enable: truecorn: '0/10 * * * * ?'read:enable: falsecorn: '0/30 * * * * ?'custom:enable: truecorn:  '0/5 * * * * ?'point-attribute:- displayName: 指令Topicname: commandTopictype: stringvalue: commandTopicdescription: 测点/设备接收下行指令的Mqtt主题option:type: input\select\checkox\switch\time...required: truedata-type: static/urldata: jsonString- displayName: 指令Qosname: commandQostype: intvalue: 2description: 测点/设备接收下行指令的Mqtt主题的Qosoption:type: input\select\checkox\switch\time...required: truedata-type: static/urldata: jsonStringmqtt:url: ssl://dc3-emqx:8883auth-type: X509username: dc3password: dc3ca-crt: classpath:/certs/ca.crtclient-key-pass: dc3-clientclient-key: classpath:/certs/client.keyclient-crt: classpath:/certs/client.crtclient: ${spring.application.name}receive-topics:- qos: 0name: mqtt/group/device/#default-send-topic:qos: 1name: dc3-mqtt-topickeep-alive: 15completion-timeout: 3000

3.config引入配置:

        MqttProperties配置类
package io.github.pnoker.common.sdk.bean.mqtt;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;import javax.validation.constraints.*;
import java.util.List;/*** @author pnoker*/
@Data
@Validated
@NoArgsConstructor
@AllArgsConstructor
@ConfigurationProperties(prefix = "driver.mqtt")
public class MqttProperties {@NotBlank(message = "url can't be empty,ssl://host:port")private String url;@NotNull(message = "auth type can't be empty")private AuthTypeEnum authType = AuthTypeEnum.NONE;private String username;private String password;private String caCrt = "classpath:/certs/ca.crt";private String clientKeyPass = "dc3-client";private String clientKey = "classpath:/certs/client.key";private String clientCrt = "classpath:/certs/client.crt";@NotBlank(message = "client name can't be empty")private String client;@NotNull(message = "default topic can't be empty")private Topic defaultSendTopic = new Topic("dc3/d/v/dc3-driver-mqtt_default", 2);@Size(min = 1, message = "receive topic at least one topic")private List receiveTopics;@NotNull(message = "keep alive interval can't be empty")private Integer keepAlive = 15;@NotNull(message = "completion timeout can't be empty")private Integer completionTimeout = 3000;/*** Mqtt 权限认证类型枚举*/@NoArgsConstructorpublic enum AuthTypeEnum {NONE, CLIENT_ID, USERNAME, X509}@Data@NoArgsConstructor@AllArgsConstructorpublic static class Topic {@NotBlank(message = "topic name can't be empty")private String name;@Min(0)@Max(2)private Integer qos;}}
package io.github.pnoker.driver.config;import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import io.github.pnoker.common.constant.CommonConstant;
import io.github.pnoker.common.sdk.bean.mqtt.MqttProperties;
import io.github.pnoker.common.sdk.utils.X509Util;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;import javax.annotation.Resource;
import java.util.ArrayList;/*** @author pnoker*/
@Slf4j
@Configuration
@IntegrationComponentScan
@EnableConfigurationProperties({MqttProperties.class})
public class MqttConfig {private static final String RANDOM_ID = CommonConstant.Symbol.UNDERSCORE + RandomUtil.randomString(8);@Resourceprivate MqttProperties mqttProperties;@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}@Beanpublic MessageChannel mqttOutputChannel() {return new DirectChannel();}@Beanpublic MessageProducer mqttInbound() {// set default receive topicString topicName = "dc3/mc/" + mqttProperties.getClient();if (null == mqttProperties.getReceiveTopics()) {mqttProperties.setReceiveTopics(new ArrayList<>());}boolean match = mqttProperties.getReceiveTopics().stream().anyMatch(topic -> topic.getName().equals(topicName));if (!match) {mqttProperties.getReceiveTopics().add(new MqttProperties.Topic(topicName, 2));}MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClient() + RANDOM_ID + "_in",mqttClientFactory(),mqttProperties.getReceiveTopics().stream().map(MqttProperties.Topic::getName).toArray(String[]::new));adapter.setQos(mqttProperties.getReceiveTopics().stream().mapToInt(MqttProperties.Topic::getQos).toArray());adapter.setOutputChannel(mqttInputChannel());adapter.setConverter(new DefaultPahoMessageConverter());adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());return adapter;}@Bean@ServiceActivator(inputChannel = "mqttOutputChannel")public MessageHandler outbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClient() + "_out",mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultQos(mqttProperties.getDefaultSendTopic().getQos());messageHandler.setDefaultTopic(mqttProperties.getDefaultSendTopic().getName());return messageHandler;}@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}@Beanpublic MqttConnectOptions getMqttConnectOptions() {MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();// username & passwordif (mqttProperties.getAuthType().equals(MqttProperties.AuthTypeEnum.USERNAME)) {mqttConnectOptions.setUserName(mqttProperties.getUsername());mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());}// tls x509if (mqttProperties.getAuthType().equals(MqttProperties.AuthTypeEnum.X509)) {mqttConnectOptions.setSocketFactory(X509Util.getSSLSocketFactory(mqttProperties.getCaCrt(),mqttProperties.getClientCrt(),mqttProperties.getClientKey(),StrUtil.isBlank(mqttProperties.getClientKeyPass()) ? "" : mqttProperties.getClientKeyPass()));if (!StrUtil.isBlank(mqttProperties.getUsername()) && !StrUtil.isBlank(mqttProperties.getPassword())) {mqttConnectOptions.setUserName(mqttProperties.getUsername());mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());}}// disable https hostname verificationmqttConnectOptions.setHttpsHostnameVerificationEnabled(false);mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()});mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());return mqttConnectOptions;}}

4.通过mqtt发送数据:@MessagingGateway(defaultRequestChannel = "mqttOutputChannel") 将定义的config中方法引入即可

package io.github.pnoker.driver.mqtt.handler;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;/*** @author pnoker*/
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttSendHandler {/*** 使用 Default Topic & Default Qos 发送数据** @param data string*/void sendToMqtt(String data);/*** 使用 Default Topic & 自定义 Qos 发送数据** @param qos  自定义 Qos* @param data string*/void sendToMqtt(@Header(MqttHeaders.QOS) Integer qos, String data);/*** 使用 自定义 Topic & Default Qos 发送数据** @param topic 自定义 Topic* @param data  string*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);/*** 使用 自定义 Topic & 自定义 Qos 发送数据** @param topic 自定义 Topic* @param qos   自定义 Qos* @param data  string*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer qos, String data);
}

5.通过mqtt接收数据:@Resource
    private MqttReceiveService mqttReceiveService;注入service代码 @ServiceActivator(inputChannel = "mqttInputChannel")引入到方法上表示喝config中定义方法联通,统一处理接收的数据

package io.github.pnoker.driver.mqtt.handler;import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import io.github.pnoker.common.sdk.bean.mqtt.MessageHeader;
import io.github.pnoker.common.sdk.bean.mqtt.MessagePayload;
import io.github.pnoker.common.sdk.bean.mqtt.MessageType;
import io.github.pnoker.common.sdk.bean.mqtt.MqttMessage;
import io.github.pnoker.common.utils.JsonUtil;
import io.github.pnoker.driver.mqtt.job.MqttScheduleJob;
import io.github.pnoker.driver.mqtt.service.MqttReceiveService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;import javax.annotation.Resource;
import java.util.concurrent.ThreadPoolExecutor;/*** @author pnoker*/
@Slf4j
@Configuration
public class MqttReceiveHandler {@Value("${driver.mqtt.batch.speed}")private Integer batchSpeed;@Resourceprivate MqttReceiveService mqttReceiveService;@Resourceprivate ThreadPoolExecutor threadPoolExecutor;/*** 此处用于接收 MQTT 发送过来的数据,订阅的主题为 application.yml 中 mqtt.receive-topics 配置的 Topic 列表* +(加号):可以(只能)匹配一个单词* #(井号):可以匹配多个单词(或者零个)** @return MessageHandler*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handlerValue() {return message -> {MessagePayload messagePayload = JsonUtil.parseObject(message.getPayload().toString(), MessagePayload.class);// 处理空字段// 当类型为空时,使用默认类型// 当消息载荷为空时,使用其 String 内容if (ObjectUtil.isNull(messagePayload)) {messagePayload = new MessagePayload(message.getPayload(), MessageType.DEFAULT);} else {if (StrUtil.isEmpty(messagePayload.getPayload())) messagePayload.setPayload(message.getPayload().toString());if (ObjectUtil.isNull(messagePayload.getMessageType())) messagePayload.setMessageType(MessageType.DEFAULT);}MessageHeader messageHeader = new MessageHeader(message.getHeaders());MqttMessage mqttMessage = new MqttMessage(messageHeader, messagePayload);// Judge whether to process data in batch according to the data transmission speedif (MqttScheduleJob.messageSpeed.get() < batchSpeed) {threadPoolExecutor.execute(() -> {// Receive single mqtt messagemqttReceiveService.receiveValue(mqttMessage);});} else {// Save point value to scheduleMqttScheduleJob.messageLock.writeLock().lock();MqttScheduleJob.mqttMessages.add(mqttMessage);MqttScheduleJob.messageLock.writeLock().unlock();}};}
}

6.使用发送数据在service的实现层用法:引入直接使用

 7.使用接收数据在service的实现层用法:引入直接使用,

package io.github.pnoker.driver.mqtt.service.impl;import io.github.pnoker.common.sdk.bean.mqtt.MqttMessage;
import io.github.pnoker.driver.mqtt.service.MqttReceiveService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import java.util.List;/*** @author pnoker*/
@Slf4j
@Service
public class MqttReceiveServiceImpl implements MqttReceiveService {@Overridepublic void receiveValue(MqttMessage mqttMessage) {}@Overridepublic void receiveValues(List mqttMessageList) {}
}

8.使用的消息体:

package io.github.pnoker.common.sdk.bean.mqtt;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;import java.io.Serializable;/*** @author pnoker*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class MqttMessage implements Serializable {private MessageHeader messageHeader;private MessagePayload messagePayload;
}

9.其他辅助类:

package io.github.pnoker.common.sdk.bean.mqtt;import lombok.NoArgsConstructor;/*** @author pnoker*/
@NoArgsConstructor
public enum MessageType {OPC_UA,OPC_DA,MODBUS,PLC,SERIAL,SOCKET,HEARTBEAT,DEFAULT
}
package io.github.pnoker.common.sdk.bean.mqtt;import io.github.pnoker.common.utils.JsonUtil;
import lombok.Data;
import lombok.experimental.Accessors;/*** @author pnoker*/
@Data
@Accessors(chain = true)
public class MessagePayload {private String payload;private MessageType messageType;public MessagePayload() {this.messageType = MessageType.DEFAULT;}public MessagePayload(Object payload, MessageType messageType) {this.payload = JsonUtil.toJsonString(payload);this.messageType = messageType;}
}
package io.github.pnoker.common.sdk.bean.mqtt;import cn.hutool.core.util.ObjectUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.springframework.messaging.MessageHeaders;import java.io.Serializable;
import java.util.UUID;/*** @author pnoker*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class MessageHeader implements Serializable {private String id;private Integer mqttId;private Integer mqttReceivedQos;private String mqttReceivedTopic;private Boolean mqttDuplicate;private Boolean mqttReceivedRetained;private Long timestamp;public MessageHeader(MessageHeaders messageHeaders) {if (ObjectUtil.isNotNull(messageHeaders)) {try {UUID id = messageHeaders.get("id", UUID.class);if (ObjectUtil.isNotNull(id)) {this.id = id.toString();}} catch (Exception ignored) {}try {this.mqttId = messageHeaders.get("mqtt_id", Integer.class);} catch (Exception ignored) {}try {this.mqttReceivedQos = messageHeaders.get("mqtt_receivedQos", Integer.class);} catch (Exception ignored) {}try {this.mqttReceivedTopic = messageHeaders.get("mqtt_receivedTopic", String.class);} catch (Exception ignored) {}try {this.mqttDuplicate = messageHeaders.get("mqtt_duplicate", Boolean.class);} catch (Exception ignored) {}try {this.mqttReceivedRetained = messageHeaders.get("mqtt_receivedRetained", Boolean.class);} catch (Exception ignored) {}try {this.timestamp = messageHeaders.get("timestamp", Long.class);} catch (Exception ignored) {}}}
}


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部