SpringBoot 集成 Emqx 发布/订阅数据
最近项目中用到了 EMQX 的发布/订阅功能，特此记录，便于日后查阅。
ThingsboardEmqxTransportApplication
/**
 * Copyright © 2016-2023 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.thingsboard.server.emqx;import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.integration.config.EnableIntegration;import java.util.Arrays;@SpringBootConfiguration
@EnableConfigurationProperties
@EnableIntegration
@ComponentScan({"org.thingsboard.server.transport.emqx"})
public class ThingsboardEmqxTransportApplication {
    // Entry point of the EMQX transport service. Guarantees that a Spring
    // configuration name is always passed to the application on startup.

    /** Prefix of the command-line argument that selects the Spring configuration name. */
    private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";

    /** Argument appended when the caller did not pass a configuration name. */
    private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=tb-emqx-transport";

    /**
     * Starts the Spring application with the (possibly extended) argument list.
     *
     * @param args command-line arguments as received from the JVM
     */
    public static void main(String[] args) {
        String[] effectiveArgs = updateArguments(args);
        SpringApplication.run(ThingsboardEmqxTransportApplication.class, effectiveArgs);
    }

    /**
     * Returns {@code args} untouched when one of its entries already starts with
     * {@link #SPRING_CONFIG_NAME_KEY}; otherwise returns a copy that is one element
     * longer and ends with {@link #DEFAULT_SPRING_CONFIG_PARAM}.
     *
     * @param args original command-line arguments
     * @return the arguments to hand to Spring
     */
    private static String[] updateArguments(String[] args) {
        for (String candidate : args) {
            if (candidate.startsWith(SPRING_CONFIG_NAME_KEY)) {
                // The caller chose a configuration name explicitly; respect it.
                return args;
            }
        }
        String[] extended = Arrays.copyOf(args, args.length + 1);
        extended[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
        return extended;
    }
}
GMqttPahoMessageDrivenChannelAdapter
package org.thingsboard.server.transport.emqx.adpter;import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;/*** @author zhangzhixiang on 2023/4/12*/
@Slf4j
public class GMqttPahoMessageDrivenChannelAdapter extends MqttPahoMessageDrivenChannelAdapter {public GMqttPahoMessageDrivenChannelAdapter(String url, String clientId, MqttPahoClientFactory clientFactory,String... topic) {super(url, clientId, clientFactory, topic);}/*** Fix Bug.** 死锁描述:* Found one Java-level deadlock:* =============================* "MQTT Rec: iot-shadow-restapi_sub_hqxzgpcy":* waiting for ownable synchronizer 0x00000000d73e9d70, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),* which is held by "main"* "main":* waiting to lock monitor 0x00007f5840008bf8 (object 0x00000000d73d2480, a org.springframework.integration* .mqtt.inbound.MqttPahoMessageDrivenChannelAdapter),* which is held by "MQTT Rec: iot-shadow-restapi_sub_hqxzgpcy"**
原因分析:* main主线程* AbstractEndpoint.start()获取到了ReentrantLock锁* MqttPahoMessageDrivenChannelAdapter.scheduleReconnect()但是需要MqttPahoMessageDrivenChannelAdapter对象锁* MQTT Rec线程* 获取到了MqttPahoMessageDrivenChannelAdapter对象锁,但是需要ReentrantLock锁** @param cause*/@Overridepublic void connectionLost(Throwable cause) {try {this.lifecycleLock.lock();} catch (Exception e) {log.error("Stack Trace: {}", e);} finally {this.lifecycleLock.unlock();}super.connectionLost(cause);}
}
MqttConfig
package org.thingsboard.server.transport.emqx.config;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.thingsboard.server.transport.emqx.DefaultMqttSubMessageHandler;
import org.thingsboard.server.transport.emqx.MqttSubMessageHandler;
import org.thingsboard.server.transport.emqx.adpter.GMqttPahoMessageDrivenChannelAdapter;
import org.thingsboard.server.transport.emqx.constant.Qos;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.net.ssl.SSLSocketFactory;/*** @author zhangzhixiang on 2023/4/12*/
@Data
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "emqx")
@IntegrationComponentScan
public class MqttConfig {private String url;private String username;private String password;private int timeout;private int keepalive;private String enabled;private String subClientId;private String pubClientId;private MqttSubMessageHandler messageHandler;private MqttPahoMessageDrivenChannelAdapter adapter;private boolean dataVerifyEnabled;private boolean sslVerifyEnabled;private String caCertFile;private String clientCertFile;private String clientKeyFile;private String[] defaultTopics;///// mqtt subscribe///@PostConstructpublic void init() {inbound();addTopic(defaultTopics);setMessageHandler(new DefaultMqttSubMessageHandler());log.info("EMQX transport started!");}@PreDestroypublic void shutdown() {log.info("Stopping EMQX transport!");removeTopic(defaultTopics);log.info("EMQX transport stopped!");}@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}@Beanpublic MessageProducer inbound() {this.subClientId = createMqttClientId(false);adapter = new GMqttPahoMessageDrivenChannelAdapter(url, subClientId, mqttClientFactory());adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(Qos.DEFAULT.getValue());adapter.setOutputChannel(mqttInputChannel());log.info("Success to initialize Mqtt channel adapter for subscribe, " +"url=[{}],subClientId=[{}],sslVerifyEnabled=[{}]", url, subClientId, sslVerifyEnabled);return adapter;}@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message> message) throws MessagingException {messageHandler.receiveMessage(message);}};}@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new 
MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);// automaticReconnect 为 true 表示断线自动重连,但仅仅只是重新连接,并不订阅主题;在 connectComplete 回调函数重新订阅options.setAutomaticReconnect(true);options.setServerURIs(new String[]{url});if (sslVerifyEnabled) {try {SSLSocketFactory sslSocketFactory = SSLFellow.createSSLSocketFactory(caCertFile, clientCertFile, clientKeyFile);options.setSocketFactory(sslSocketFactory);} catch (Exception e) {log.error("Stack Trace: {}", e);}}factory.setConnectionOptions(options);return factory;}///// mqtt publish///@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {this.pubClientId = createMqttClientId(true);MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler(pubClientId, mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultQos(Qos.DEFAULT.getValue());log.info("Success to initialize Mqtt channel adapter for publish, " +"url=[{}], pubClientId=[{}]", url, pubClientId);return messageHandler;}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}///// public functions///public void addTopic(String... topics) {this.adapter.addTopic(topics);}public void removeTopic(String... topics) {this.adapter.removeTopic(topics);}///// private functions///private String createMqttClientId(boolean publish) {String moduleName = Module.getName();if (StringUtils.isBlank(moduleName)) {moduleName = "iot_mqtt";}moduleName += publish ? "_pub_" : "_sub_";return moduleName + RandomStringUtils.randomAlphanumeric(8).toLowerCase();}
}
MqttSender
package org.thingsboard.server.transport.emqx.config;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @author zhangzhixiang on 2023/4/12*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttSender {
    // Messaging gateway for publishing to MQTT; each call becomes a message whose
    // payload is the trailing String argument.

    /**
     * Sends a payload without supplying a topic or QoS header.
     *
     * <p>NOTE(review): no topic header is set by this overload, and the outbound handler
     * shown in this article configures no default topic - confirm how the topic is resolved.
     *
     * @param data the message payload
     */
    void sendToMqtt(String data);

    /**
     * Sends a payload with the topic carried in the {@link MqttHeaders#TOPIC} header.
     *
     * @param topic   value of the topic header
     * @param payload the message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends a payload with both the {@link MqttHeaders#TOPIC} and {@link MqttHeaders#QOS}
     * headers set.
     *
     * @param topic   value of the topic header
     * @param qos     value of the QoS header
     * @param payload the message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
至此，Spring Boot 集成 EMQX 发布/订阅数据的介绍就完成了。
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
