Android開發(fā)MQTT協(xié)議的模型及通信淺析
前言
為什么要講MQTT協(xié)議?因?yàn)楝F(xiàn)在越來越多的領(lǐng)域會(huì)使用到這個(gè)協(xié)議,無論是做M2M,還是做Iot,或是想實(shí)現(xiàn)推送功能,MQTT都是一個(gè)不錯(cuò)的選擇。
什么是MQTT協(xié)議
MQTT協(xié)議又稱為消息隊(duì)列要測傳輸協(xié)議,他是一種基于發(fā)布/訂閱范式的消息協(xié)議,并且它是一種基于TCP/IP協(xié)議族的應(yīng)用層協(xié)議。
可以看出的它的特點(diǎn):輕量、簡單、基于發(fā)布/訂閱范式、基于TCP/IP、是一種應(yīng)用層協(xié)議。
如果還是不明白,我們可以簡單拿它和我們常用的http協(xié)議做個(gè)比較。
HTTP協(xié)議 | MQTT協(xié)議 |
---|---|
基于TCP或UDP | 基于TCP |
基于 請(qǐng)求/響應(yīng) 模型 | 基于 發(fā)布/訂閱 模型 |
http1.x是傳數(shù)據(jù)包 | 傳輸二進(jìn)制數(shù)據(jù) |
MQTT協(xié)議的模型
我們得知道它是一個(gè)怎樣的模型才好去了解它的一個(gè)工作方式。比如說HTTP協(xié)議簡單分為兩個(gè)角色,一個(gè)Client代表客戶端,一個(gè)Server代表服務(wù)端。
而MQTT簡單來看分為3個(gè)角色,publisher表示發(fā)布者,subscriber表示訂閱者,它們兩個(gè)都是Client,所以任何一個(gè)Client客戶端既能充當(dāng)publisher,也能充當(dāng)subscriber。還有一個(gè)角色是broker表示代理,它是Server服務(wù)端??梢钥闯鯩QTT也是基于C/S的通信架構(gòu),只不過分為3種角色。
如果理解了這個(gè)模型之后,你就會(huì)有個(gè)疑問,發(fā)布和訂閱什么呢?這就需要引入一個(gè)新的東西叫主題topic(如果不理解主題這個(gè)概念的話也沒關(guān)系,后面用代碼就很容易理解主題是什么)
所以它的工作流程就是:
- subscriber訂閱者連接broker代理,并訂閱主題topic
- publisher發(fā)布者連接broker代理(當(dāng)然如何訂閱者和發(fā)布者是同一個(gè)Client的話就不需要重復(fù)連接),并發(fā)布消息到相應(yīng)的主題
- broker代理會(huì)把消息發(fā)給對(duì)應(yīng)訂閱的主題的subscriber訂閱者
開發(fā)MQTT通信
1. 處理客戶端和服務(wù)端
前面我們說了MQTT是繼續(xù)C/S的結(jié)構(gòu),那我們就需要有一個(gè)客戶端和一個(gè)服務(wù)端。
(1)服務(wù)端開發(fā)
很不幸我是開發(fā)前端的,后臺(tái)的開發(fā)我并不熟悉,所以這里的演示中我選擇用云服務(wù)EMQX,想嘗試的朋友可以上這個(gè)網(wǎng)頁去部署自己的云服務(wù),流程很簡單 cloud.emqx.com/ ,免費(fèi)試用14天。
(2)客戶端開發(fā)
因?yàn)槲沂亲鯝ndroid開發(fā)的,所以這里我用Android來舉例子。正常來說可以在TCP的基礎(chǔ)上開發(fā),自己去封裝,但我這只是淺談,所以我用第三方框架進(jìn)行演示,用Paho的mqtt
2. 客戶端開發(fā)
先導(dǎo)入Paho的mqtt
dependencies { ...... implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' }
在manifest中注冊(cè)Paho的MqttService
<application android:allowBackup="true" android:icon="@mipmap/ic_launcher" android:label="@string/app_name" android:roundIcon="@mipmap/ic_launcher_round" android:supportsRtl="true" android:theme="@style/Theme.MyApplication"> <activity android:name=".MainActivity" android:exported="true"> <intent-filter> <action android:name="android.intent.action.MAIN" /> <category android:name="android.intent.category.LAUNCHER" /> </intent-filter> </activity> <service android:name="org.eclipse.paho.android.service.MqttService"/> <service android:name=".MqttActionService"/> </application>
我這邊為了用一個(gè)項(xiàng)目來演示Mqtt通信,所有把MainActivity當(dāng)成publisher發(fā)布者,把MqttActionService當(dāng)成subscriber訂閱者。
所以整體的流程是這樣的,我們先開啟MqttActionService,然后在MqttActionService中進(jìn)行連接和訂閱。再在MainActivity進(jìn)行連接和發(fā)送消息。
先把Mqtt的Client給封裝起來(我這里防止有些朋友看不懂Kotlin,我就用了Java,后面不重要的地方我直接用Kotlin,一般也比較容易看懂)。
public class MyMqttClient { private MqttAndroidClient mClient; private MqttConnectOptions mOptions; private OnMqttConnectListener mOnMqttConnectListener; private final String mClientId; private MqttCallbackExtended mExtended = new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { if (mOnMqttConnectListener != null){ mOnMqttConnectListener.onConnectComplete(serverURI); } } @Override public void connectionLost(Throwable cause) { if (mOnMqttConnectListener != null){ mOnMqttConnectListener.onConnectFailure(cause); } } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }; private IMqttActionListener mConnectAction = new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { if (mOnMqttConnectListener != null){ mOnMqttConnectListener.onConnectFailure(exception); } exception.printStackTrace(); } }; private IMqttMessageListener messageListener = new IMqttMessageListener() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { if (mOnMqttConnectListener != null){ mOnMqttConnectListener.onMessageArrived(topic, message); } } }; public MyMqttClient(Context context){ this(context, null); } public MyMqttClient(Context context, String clientId){ if (!TextUtils.isEmpty(clientId)) { this.mClientId = clientId; }else { this.mClientId = MqttConfig.clientId; } init(context); } public void init(Context context){ mClient = new MqttAndroidClient(context, MqttConfig.mqttUrl, mClientId); mClient.setCallback(mExtended); mOptions = new MqttConnectOptions(); mOptions.setConnectionTimeout(4000); mOptions.setKeepAliveInterval(30); mOptions.setUserName(MqttConfig.username); mOptions.setPassword(MqttConfig.password.toCharArray()); } public void setOnMqttConnectListener(OnMqttConnectListener onMqttConnectListener) { this.mOnMqttConnectListener = onMqttConnectListener; } /** * 連接 */ public void connect(){ try { if (!mClient.isConnected()){ mClient.connect(mOptions, null, mConnectAction); } }catch (Exception e){ e.printStackTrace(); } } /** * 訂閱 */ public void subscribeToTopic(String mTopic){ this.subscribeToTopic(mTopic, 0); } public void subscribeToTopic(String mTopic, int qos){ try { mClient.subscribe(mTopic, qos, null,null, messageListener); }catch (Exception e){ e.printStackTrace(); } } /** * 發(fā)送消息 */ public void sendMessage(String mTopic, byte[] data){ try { MqttMessage message = new MqttMessage(); message.setPayload(data); mClient.publish(mTopic, message); }catch (Exception e){ e.printStackTrace(); } } public void onDestroy(){ try { mClient.disconnect(); mExtended = null; mConnectAction = null; messageListener = null; }catch (Exception e){ e.printStackTrace(); } } /** * 提供給外層的回調(diào),更方便進(jìn)行使用 */ public interface OnMqttConnectListener{ void onConnectComplete(String serverURI); void onConnectFailure(Throwable e); void onMessageArrived(String topic, MqttMessage message); } }
當(dāng)中有些配置我直接抽出來
public interface MqttConfig { String mqttUrl = "tcp://r0c36017.cn-shenzhen.emqx.cloud:11005"; String clientId = "deployment-r0c36017"; String username = "yeshuaishizhenshuai"; String password = "123456"; String oneTopic = "kylin/topic/one"; }
可以講一下這些參數(shù):
(1) mqttUrl: 連接代理的連接,可以看到我上面云服務(wù)那張截圖里面的“連接地址”和“連接端口” (2) clientId: 客戶端ID,無論是subscriber還是publisher都屬于客戶端,這個(gè)在上面說過,所以都有一個(gè)對(duì)應(yīng)的ID標(biāo)識(shí)他們是屬于哪個(gè)客戶端。我下面的Demo中MqttActionService用的ClienId是deployment-r0c36017,MainActivity用的ClienId是deployment-r0c36018,不同的,所以是兩個(gè)客戶端。 (3) username和password: 這兩個(gè)參數(shù)都是一個(gè)標(biāo)識(shí),會(huì)和后臺(tái)記錄,如果你沒有的話,那你就連不上代理,也就是連不上服務(wù)端。 (4) oneTopic: 就是主題,你訂閱和發(fā)送消息都要對(duì)應(yīng)是哪個(gè)主題。
然后subscriber連接并訂閱主題
class MqttActionService : Service() { private var mqttClient : MyMqttClient ?= null override fun onCreate() { super.onCreate() mqttClient = MyMqttClient(this) mqttClient?.setOnMqttConnectListener(object : MyMqttClient.OnMqttConnectListener{ override fun onConnectComplete(serverURI: String?) { mqttClient?.subscribeToTopic(MqttConfig.oneTopic) } override fun onConnectFailure(e: Throwable?) { } override fun onMessageArrived(topic: String?, message: MqttMessage?) { val h = Handler(Looper.getMainLooper()) h.post { Toast.makeText(this@MqttActionService.applicationContext, message.toString(), Toast.LENGTH_SHORT).show(); } } }) } override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { val handler = Handler() handler.postDelayed({ mqttClient?.connect() }, 1000) return START_STICKY } override fun onBind(intent: Intent?): IBinder? { return null } override fun onDestroy() { super.onDestroy() mqttClient?.onDestroy() } }
然后publisher連接并發(fā)送消息
class MainActivity : AppCompatActivity() { private var clinet : MyMqttClient ?= null private var isConnect = false override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) init() val btn : Button = findViewById(R.id.btn_connect) val send : Button = findViewById(R.id.btn_send) val open : Button = findViewById(R.id.open) open.setOnClickListener { val intent = Intent() intent.setClass(this, MqttActionService::class.java) startService(intent) } btn.setOnClickListener { clinet?.connect() } send.setOnClickListener { clinet?.sendMessage(MqttConfig.oneTopic, "你干嘛啊~哎呦~".toByteArray()) } } private fun init(){ clinet = MyMqttClient(this, "deployment-r0c36018") clinet?.setOnMqttConnectListener(object : MyMqttClient.OnMqttConnectListener{ override fun onConnectComplete(serverURI: String?) { isConnect = true } override fun onConnectFailure(e: Throwable?) { e?.printStackTrace() isConnect = false } override fun onMessageArrived(topic: String?, message: MqttMessage?) { } }) } }
我這定了3個(gè)按鈕,第一個(gè)按鈕open會(huì)跳轉(zhuǎn)Service然后subscriber連接并訂閱主題,第二個(gè)按鈕btn會(huì)連接代理,第三個(gè)按鈕send發(fā)送消息??碝qttActionService的代碼可以看出,我這里發(fā)送消息后,會(huì)彈出Toast。
Paho的mqtt的BUG
這庫我也是第一次用,我們那用的都是自己擼的(這邊肯定沒法放上來),然后我用的時(shí)候發(fā)現(xiàn)一個(gè)問題。我想給Service去開一條進(jìn)程去處理訂閱的操作的,這樣能更真實(shí)的去模擬,結(jié)果就在連接時(shí)出問題了
經(jīng)檢查,連接的context的進(jìn)程要和org.eclipse.paho.android.service.MqttService的進(jìn)程一致。我去看他源碼是怎么回事。
發(fā)現(xiàn)它內(nèi)部的Binder竟然做了強(qiáng)轉(zhuǎn),這里因?yàn)椴皇谴矶鴷?huì)出現(xiàn)報(bào)錯(cuò)。如果使用這個(gè)庫的話就小心點(diǎn)你要做的夸進(jìn)程的操作。
總結(jié)
今天只是淺談一些MQTT的一些原理和流程,其實(shí)還有更深的功能,比如Qos啊這些還沒說,我覺得一次說太多可能會(huì)讓第一次接觸的人混亂。先簡單的了解MQTT是什么,主要使用的場景,內(nèi)部的原理大致是怎樣的。當(dāng)了解這些之后再去深入的看,會(huì)能夠更好的去理解。
以上就是Android開發(fā)MQTT協(xié)議的模型及通信淺析的詳細(xì)內(nèi)容,更多關(guān)于Android MQTT協(xié)議模型通信的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
android使用Ultra-PullToRefresh實(shí)現(xiàn)下拉刷新自定義代碼
本篇文章主要介紹了android使用Ultra-PullToRefresh實(shí)現(xiàn)下拉刷新新自定義,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-02-02flutter實(shí)現(xiàn)更新彈窗內(nèi)容例子(親測有效)
Flutter是一款移動(dòng)應(yīng)用程序SDK,包含框架、widget和工具,這篇文章給大家介紹flutter實(shí)現(xiàn)更新彈窗內(nèi)容例子,親測可以使用,需要的朋友參考下吧2021-04-04Android設(shè)計(jì)登錄界面、找回密碼、注冊(cè)功能
這篇文章主要為大家詳細(xì)介紹了Android設(shè)計(jì)登錄界面的方法,Android實(shí)現(xiàn)找回密碼、注冊(cè)功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-05-05Android OKHttp框架的分發(fā)器與攔截器源碼刨析
okhttp是一個(gè)第三方類庫,用于android中請(qǐng)求網(wǎng)絡(luò)。這是一個(gè)開源項(xiàng)目,是安卓端最火熱的輕量級(jí)框架,由移動(dòng)支付Square公司貢獻(xiàn)(該公司還貢獻(xiàn)了Picasso和LeakCanary) 。用于替代HttpUrlConnection和Apache HttpClient2022-11-11OpenGL Shader實(shí)現(xiàn)物件材料效果詳解
在一些主流app上有一些比較特殊的濾鏡效果,例如灰塵、塑料封面、光影效果等,這些其實(shí)是紋理疊加的效果。本文將用OpenGL Shader實(shí)現(xiàn)這些效果,需要的可以參考一下2022-02-02Android獲得當(dāng)前正在顯示的activity類名的方法
這篇文章主要介紹了Android獲得當(dāng)前正在顯示的activity類名的方法,分析了權(quán)限的修改與Java代碼的實(shí)現(xiàn)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-01-01