更新時(shí)間:2023年10月13日15時(shí)28分 來源:傳智教育 瀏覽次數(shù):
Paho Java客戶端是用Java編寫的MQTT客戶端庫,用于開發(fā)在JVM或其他Java兼容平臺(例如Android)上運(yùn)行的應(yīng)用程序。
Paho不僅可以對接EMQ X Broker,還可以對接滿足符合MQTT協(xié)議規(guī)范的消息代理服務(wù)端,目前Paho可以支持到MQTT5.0以下版本。MQTT3.3.1協(xié)議版本基本能滿足百分之九十多的接入場景。
Paho Java客戶端提供了兩個(gè)API:
1:MqttAsyncClient提供了一個(gè)完全異步的API,其中活動的完成是通過注冊的回調(diào)通知的。
2:MqttClient是MqttAsyncClient周圍的同步包裝器,在這里,功能似乎與應(yīng)用程序同步。
(1)找到項(xiàng)目:emq-demo,添加坐標(biāo)依賴
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency>
(2)編寫客戶端封裝類的代碼:com.itheima.mqtt.client.EmqClient
/** * Created by 傳智播客*黑馬程序員. */ @Component public class EmqClient { private Logger log = LoggerFactory.getLogger(EmqClient.class); private IMqttClient mqttClient; @Autowired private MqttProperties mqttProperties; @Autowired private MqttCallback mqttCallback; @PostConstruct private void init(){ //MqttClientPersistence是接口 實(shí)現(xiàn)類有:MqttDefaultFilePersistence;MemoryPersistence MqttClientPersistence memoryPersistence = new MemoryPersistence(); try { mqttClient = new MqttClient(mqttProperties.getBrokerUrl(),mqttProperties.getClientId(),memoryPersistence); } catch (MqttException e) { log.error("MqttClient初始化失敗,brokerurl={},clientId= {}",mqttProperties.getBrokerUrl(),mqttProperties.getClientId()); } } /** * 連接broker * @param username * @param password */ public void connect(String username,String password){ //創(chuàng)建MQTT連接選項(xiàng)對象--可配置mqtt連接相關(guān)選項(xiàng) MqttConnectOptions connectOptions = new MqttConnectOptions(); //自動重連 connectOptions.setAutomaticReconnect(true); /** * 設(shè)置為true后意味著:客戶端斷開連接后emq不保留會話保留會話,否則會產(chǎn)生訂閱共享隊(duì)列的存活 客戶端收不到消息的情況 * 因?yàn)閿嚅_的連接還被保留的話,emq會將隊(duì)列中的消息負(fù)載到斷開但還保留的客戶端,導(dǎo)致存活的客戶 端收不到消息 * 解決該問題有兩種方案:1.連接斷開后不要保持;2.保證每個(gè)客戶端有固定的clientId */ connectOptions.setCleanSession(true); connectOptions.setUserName(username); connectOptions.setPassword(password.toCharArray()); //設(shè)置mqtt消息回調(diào) mqttClient.setCallback(mqttCallback); //連接broker try { mqttClient.connect(connectOptions); } catch (MqttException e) { log.error("連接mqtt broker失敗,失敗原因:{}",e.getMessage()); } } /** * 發(fā)布 * @param topic * @param msg */ public void publish(String topic, String msg, QosEnum qos, boolean retain){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos.value()); mqttMessage.setRetained(retain); mqttMessage.setPayload(msg.getBytes()); if(mqttClient.isConnected()){ try { mqttClient.publish(topic,mqttMessage); } catch (MqttException e) { log.error("mqtt消息發(fā)布失敗,topic={},msg={},qos={},retain={},errormsg= {}",topic,msg,qos,retain,e.getMessage()); } } } /** * 訂閱 * @param topicFilter * @return */ public void subscribe(String topicFilter,QosEnum qos){ try { mqttClient.subscribe(topicFilter,qos.value()); } catch (MqttException e) { log.error("訂閱失敗,topicfilter={},qos={},errormsg= {}",topicFilter,qos,e.getMessage()); } } /** * 斷開連接 */ @PreDestroy public void disConnect(){ try { mqttClient.disconnect(); } catch (MqttException e) { log.error("斷開連接出現(xiàn)異常,errormsg={}",e.getMessage()); } } }
需要在application.yml中添加自定義的配置:
mqtt: broker-url: tcp://192.168.200.129:1883 client-id: demo-client username: user password: 123456
同時(shí)需要創(chuàng)建屬性配置類來加載該配置數(shù)據(jù),創(chuàng)建:com.itheima.mqtt.properties.MqttProperties
/** * Created by 傳智播客*黑馬程序員. */ @Configuration @ConfigurationProperties(prefix = "mqtt") public class MqttProperties { private String brokerUrl; private String clientId; private String username; private String password; public String getBrokerUrl() { return brokerUrl; } public void setBrokerUrl(String brokerUrl) { this.brokerUrl = brokerUrl; } public String getClientId() { return clientId; } public void setClientId(String clientId) { this.clientId = clientId; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override public String toString() { return "MqttProperties{" + "brokerUrl='" + brokerUrl + '\'' + ", clientId='" + clientId + '\'' + ", username='" + username + '\'' + ", password='" + password + '\'' + '}'; } }
還需創(chuàng)建QoS服務(wù)之類枚舉:com.itheima.mqtt.enums.QosEnum
/** * Created by 傳智播客*黑馬程序員. */ public enum QosEnum { QoS0(0),QoS1(1),QoS2(2); QosEnum(int qos) { this.value = qos; } private final int value; public int value(){ return this.value; } }
(3)在連接接收到消息之后,我們需要將消息傳入消息回調(diào):com.itheima.mqtt.client.MessageCallback
/** * Created by 傳智播客*黑馬程序員. */ @Component public class MessageCallback implements MqttCallback { private Logger log = LoggerFactory.getLogger(MessageCallback.class); @Override public void connectionLost(Throwable cause) { //丟失對服務(wù)端的連接后觸發(fā)該方法回調(diào),此處可以做一些特殊處理,比如重連 log.info("丟失了對broker的連接"); } /** * 訂閱到消息后的回調(diào) * 該方法由mqtt客戶端同步調(diào)用,在此方法未正確返回之前,不會發(fā)送ack確認(rèn)消息到broker * 一旦該方法向外拋出了異??蛻舳藢惓jP(guān)閉,當(dāng)再次連接時(shí);所有QoS1,QoS2且客戶端未進(jìn)行ack確認(rèn)的 消息都將由 * broker服務(wù)器再次發(fā)送到客戶端 * @param topic * @param message * @throws Exception */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { log.info("訂閱到了消息;topic={},messageid={},qos={},msg={}", topic, message.getId(), message.getQos(), new String(message.getPayload())); } /** * 消息發(fā)布完成且收到ack確認(rèn)后的回調(diào) * QoS0:消息被網(wǎng)絡(luò)發(fā)出后觸發(fā)一次 * QoS1:當(dāng)收到broker的PUBACK消息后觸發(fā) * QoS2:當(dāng)收到broer的PUBCOMP消息后觸發(fā) * @param token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { int messageId = token.getMessageId(); String[] topics = token.getTopics(); log.info("消息發(fā)送完成,messageId={},topics={}", messageId, topics); } }
(4)編寫消息發(fā)布和訂閱的測試,在啟動類中添加如下代碼。
@Autowired private EmqClient emqClient; @Autowired private MqttProperties mqttProperties; @PostConstruct public void init(){ emqClient.connect(mqttProperties.getUsername(),mqttProperties.getPassword()); //訂閱某一主題 emqClient.subscribe("testtopic/#", QosEnum.QoS2); //開啟一個(gè)新的線程向該主題發(fā)送消息 new Thread(()->{ while (true){ emqClient.publish("testtopic/123","mqtt msg:"+ LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME),QosEnum.QoS2,false); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }
(5)測試:在Dashboard中開啟使用username進(jìn)行認(rèn)證的組件,其他組件停止即可,然后啟動項(xiàng)目,查看控制臺輸出即可。
北京校區(qū)