1、MQTT工具类
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.common.constants.QosConstants;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@Slf4j
public class MqttClientUtil {private MqttClient mqttClient;public MqttClientUtil(MqttClient mqttClient) {this.mqttClient = mqttClient;}public void subscribe(String topic, MqttCallback mqttCallback) {try {mqttClient.setCallback(mqttCallback);mqttClient.subscribe(topic);} catch (MqttException e) {log.error("mqtt订阅消息失败!topic:{}", topic);e.printStackTrace();}}public void publishSimple(String topic, String message) {publish(topic, message, 1, true);}public void publish(String topic, String message, int qos, boolean retained) {try {MqttMessage mqttMessage = new MqttMessage(message.getBytes());mqttMessage.setQos(qos);mqttMessage.setRetained(retained);mqttClient.publish(topic, mqttMessage);} catch (MqttException e) {if ("客户机未连接".equals(e.getMessage())){try {mqttClient.connect();}catch (Exception exception){log.error("客户机重连失败!");}publish(topic, message, qos, retained);return;}log.error("mqtt发布消息失败!topic:{},message:{}", topic, message);e.printStackTrace();}}
}
2、MQTT配置类
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class MqttConfig {@Value("${spring.mqtt.url}")private String brokerUrl; @Value("${spring.mqtt.username}")private String username; @Value("${spring.mqtt.password}")private String password; private final String CLIENT_ID = "mqttHook_client_id"; @Beanpublic MqttClientUtil getMqttClient() throws MqttException {MqttClient client = new MqttClient(brokerUrl, CLIENT_ID);MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());client.connect(options);return new MqttClientUtil(client);}
}
3、回调函数 Callback
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MyMqttCallback implements MqttCallback {@Overridepublic void connectionLost(Throwable cause) {log.error("mqtt连接丢失!");}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String payload = new String(message.getPayload());System.out.println("接收到消息:" + payload);}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("发布消息成功");}
}
4、测试
@RestController
@Api(tags = "测试mqtt")
@RequestMapping("/test")
public class TestController {@Autowired private MqttClientUtil mqttClientUtil;@PostMapping("/test")@ApiOperation("测试")public void test() {String topic = "testtopic/ha";mqttClientUtil.subscribe(topic, new MyMqttCallback());System.out.println("成功");mqttClientUtil.publishSimple(topic, "哈哈哈");System.out.println("成功2");}
}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!