• <strike id="fdgpu"><input id="fdgpu"></input></strike>
    <label id="fdgpu"></label>
    <s id="fdgpu"><code id="fdgpu"></code></s>

  • <label id="fdgpu"></label>
  • <span id="fdgpu"><u id="fdgpu"></u></span>

    <s id="fdgpu"><sub id="fdgpu"></sub></s>
    您當前的位置是:  首頁(yè) > 資訊 > 國內 >
     首頁(yè) > 資訊 > 國內 >

    開(kāi)源項目:用環(huán)信MQTT實(shí)現"世界頻道"只需5分鐘【附源碼】

    2022-04-27 10:37:43   作者:   來(lái)源:CTI論壇   評論:0  點(diǎn)擊:


      說(shuō)到“世界頻道”想必大家都不陌生,常見(jiàn)的如王者榮耀的世界廣播搖人組隊以及最近興起的Discord社區交友等等。究其目的就是在應用內讓海量用戶(hù)可以實(shí)時(shí)互動(dòng)。有些開(kāi)發(fā)者為了實(shí)現這種場(chǎng)景會(huì )選擇聊天室方案來(lái)實(shí)現,但是這種方式存在一定的局限性,比如聊天室人數上限、海量消息處理等各種情況。
      當然如果有錢(qián)有顏,可以直接選擇云廠(chǎng)商產(chǎn)品(比如環(huán)信的聊天室方案和超級社區),如果有才有time,也可以選擇平替版MQTT實(shí)現方案。今天小猿將介紹用環(huán)信MQTT消息云實(shí)現應用內的世界頻道,滿(mǎn)滿(mǎn)干貨,不要錯過(guò)~~
        使用MQTT實(shí)現世界頻道-Demo效果演示
      協(xié)議優(yōu)勢:
      在介紹具體方案之前,我們先嘮一嘮為啥選擇MQTT協(xié)議。
    • 輕量級:MQTT本身是物聯(lián)網(wǎng)的連接協(xié)議,專(zhuān)為受限設備和低帶寬場(chǎng)景使用。所以其代碼占用空間較小,同樣適用于注重SDK大小的移動(dòng)應用領(lǐng)域(比如:游戲領(lǐng)域)。
    • 易集成:MQTT作為標準開(kāi)放的消息協(xié)議,經(jīng)過(guò)多年演進(jìn),已支持30多種開(kāi)發(fā)語(yǔ)言,10余種SDK,無(wú)論何種開(kāi)發(fā)環(huán)境,都可以快速找到開(kāi)源SDK。
    • 高并發(fā):MQTT是輕量級的消息傳輸協(xié)議,2字節心跳報文,最小化傳輸和連接成本,云廠(chǎng)商broker產(chǎn)品都可支持千萬(wàn)級并發(fā)接入,適用于高并發(fā)連接場(chǎng)景。
    • 低成本:MQTT是基于客戶(hù)端-服務(wù)器的訂閱/發(fā)布模型,通過(guò)服務(wù)器中間件實(shí)現消息分發(fā),減少消息復制成本,快速實(shí)現一對多在線(xiàn)推送。
    • 靈活性:MQTT協(xié)議支持多種消息特性,包括:topic主題層級、消息分級(QoS0,1,2)、遺囑消息、保留消息等,可以靈活實(shí)現多種業(yè)務(wù)場(chǎng)景。
    • 衍生功能:隨著(zhù)MQTT云服務(wù)的發(fā)展,部分服務(wù)器廠(chǎng)商已支持消息存儲、獲取在線(xiàn)設備列表、查看歷史消息等衍生功能,降低開(kāi)發(fā)工作量與消息存儲成本。
      實(shí)現方案:
      言歸正傳,上干貨。本次技術(shù)實(shí)現方案包含:移動(dòng)客戶(hù)端(Android)、后端服務(wù)(Java)以及MQTT服務(wù)器。這里提一下,MQTT服務(wù)器使用環(huán)信MQTT消息云,使用三方云服務(wù)比較省心,既節省開(kāi)發(fā)時(shí)間,產(chǎn)品性能也不需要擔心,現在注冊可以直接使用環(huán)信MQTT消息云超高額度的免費版:每月100并發(fā)連接、300萬(wàn)消息,完全滿(mǎn)足功能開(kāi)發(fā)使用。
      客戶(hù)端實(shí)現:
      客戶(hù)端實(shí)現主要包含以下兩部分:
    • 底層MQTT業(yè)務(wù)集成:包含引入SDK、MQTT方法封裝、業(yè)務(wù)交互(消息收發(fā))。
    • APP上層交互:在A(yíng)PP首頁(yè)提供世界頻道入口,實(shí)現心情彈幕飄窗(接收)和發(fā)送。
      接下來(lái)上底層MQTT業(yè)務(wù)集成代碼。
      引入SDK:
      這一步環(huán)信官方文檔比較明確,就是根據自己的平臺引入相應的mqtt客戶(hù)端sdk,這里簡(jiǎn)單貼一下AndroidStudio的引入配置
      1// 在根目錄 build.gradle repositories 下加入配置
      2maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }
      3...
      4// 然后加入 MQTT 依賴(lài)
      5// MQTT sdk https://docs-im.easemob.com/mqtt/qsandroidsdk
      6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
      7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
      方法封裝
      這里貼一下對mqtt相關(guān)方法的簡(jiǎn)單封裝,代碼在vmmqtt模塊兒的MQTTHelper類(lèi)下:
      1 /**
      2 * Create by lzan13 on 2022/3/22
      3 * 描述:MQTT 幫助類(lèi)
      4 */
      5 object MQTTHelper {
      6
      7    private var mqttClient: MqttAndroidClient? = null
      8
      9    // 緩存主題集合
      10    private val topicList = mutableListOf()
      11
      12    /**
      13     * 鏈接MQTT
      14     * @param id 用戶(hù) Id
      15     * @param token 用戶(hù)鏈接 MQTT 的 Token
      16     * @param topic 需要訂閱的主題,不為空就會(huì )在連接成功后進(jìn)行訂閱
      17     */
      18    fun connect(id: String, token: String, topic: String = "") {
      19        // 處理訂閱主題
      20        if (topic.isNotEmpty()) topicList.add(topic)
      21
      22        // 拼接鏈接地址
      23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"
      24        // 拼接 clientId
      25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"
      26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)
      27
      28        //連接參數
      29        val options = MqttConnectOptions()
      30        options.isAutomaticReconnect = true //設置自動(dòng)重連
      31        options.isCleanSession = true // 緩存
      32        options.connectionTimeout = CConstants.timeMinute.toInt() // 設置超時(shí)時(shí)間,單位:秒
      33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發(fā)送間隔,單位:秒
      34        options.userName = id // 用戶(hù)名
      35        options.password = token.toCharArray() // 密碼
      36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
      37        // 設置MQTT監聽(tīng)
      38        mqttClient?.setCallback(object : MqttCallback {
      39            override fun connectionLost(t: Throwable) {
      40                // 通知鏈接斷開(kāi)
      41                VMLog.d("MQTT 鏈接斷開(kāi) $t")
      42            }
      43
      44            @Throws(Exception::class)
      45            override fun messageArrived(topic: String, message: MqttMessage) {
      46                // 通知收到消息
      47                VMLog.d("MQTT 收到消息:$message")
      48                // 如果未訂閱則直接丟棄
      49                if (!topicList.contains(topic)) return
      50                notifyEvent(topic, String(message.payload))
      51            }
      52
      53            override fun deliveryComplete(token: IMqttDeliveryToken) {}
      54        })
      55        //進(jìn)行連接
      56        mqttClient?.connect(options, null, object : IMqttActionListener {
      57            override fun onSuccess(token: IMqttToken) {
      58                VMLog.d("MQTT 鏈接成功")
      59                // 鏈接成功,循環(huán)訂閱緩存的主題
      60                topicList.forEach { subscribe(it) }
      61            }
      62
      63            override fun onFailure(token: IMqttToken, t: Throwable) {
      64                VMLog.d("MQTT 鏈接失敗 $t")
      65            }
      66        })
      67    }
      68
      69    /**
      70     * 訂閱主題
      71     * @param topic 主題
      72     */
      73    fun subscribe(topic: String) {
      74        if (!topicList.contains(topic)) {
      75            topicList.add(topic)
      76        }
      77        try {
      78            //連接成功后訂閱主題
      79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {
      80                override fun onSuccess(token: IMqttToken) {
      81                    VMLog.d("MQTT 訂閱成功 $topic")
      82                }
      83
      84                override fun onFailure(token: IMqttToken, t: Throwable) {
      85                    VMLog.d("MQTT 訂閱失敗 $topic $t")
      86                }
      87            })
      88        } catch (e: MqttException) {
      89            e.printStackTrace()
      90        }
      91    }
      92
      93    /**
      94     * 取消訂閱
      95     * @param topic 主題
      96     */
      97    fun unsubscribe(topic: String) {
      98        if (topicList.contains(topic)) {
      99            topicList.remove(topic)
      100        }
      101        try {
      102            mqttClient?.unsubscribe(topic)
      103        } catch (e: MqttException) {
      104            e.printStackTrace()
      105        }
      106    }
      107
      108    /**
      109     * 發(fā)送 MQTT 消息
      110     * @param topic 主題
      111     * @param content 內容
      112     */
      113    fun sendMsg(topic: String, content: String) {
      114        val msg = MqttMessage()
      115        msg.payload = content.encodeToByteArray() // 設置消息內容
      116        msg.qos = 0 //設置消息發(fā)送質(zhì)量,可為0,1,2.
      117        // 設置消息的topic,并發(fā)送。
      118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {
      119            override fun onSuccess(asyncActionToken: IMqttToken) {
      120                VMLog.d("MQTT 消息發(fā)送成功")
      121            }
      122
      123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
      124                VMLog.d("MQTT 消息發(fā)送失敗 ${exception.message}")
      125            }
      126        })
      127    }
      128
      129    /**
      130     * 通知 MQTT 事件
      131     */
      132    private fun notifyEvent(topic: String, data: String) {
      133        LDEventBus.post(topic, data)
      134    }
      135 }
      業(yè)務(wù)交互
      和業(yè)務(wù)相關(guān)的就是在啟動(dòng)APP后,使用后端服務(wù)器返回的鑒權token信息及連接封裝接口登錄環(huán)信通MQTT服務(wù)器,登錄成功后訂閱主題并監聽(tīng)消息。
      1// 請求 token 成功后,調用MQTTHelper.connect()鏈接 MQTT 服務(wù)器,這里會(huì )同時(shí)傳遞監聽(tīng)的主題
      2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo)
      3
      4/**
      5 * 發(fā)送匹配信息
      6 */
      7private fun sendMatchInfo() {
      8    if (selfMatch.user.nickname.isEmpty()) return
      9    // 提交自己的匹配信息到服務(wù)器
      10    mViewModel.submitMatch(selfMatch)
      11    val json = JSONObject()
      12    json.put("content", selfMatch.content)
      13    json.put("emotion", selfMatch.emotion)
      14    json.put("gender", selfMatch.gender)
      15    json.put("type", selfMatch.type)
      16    val jsonUser = JSONObject()
      17    jsonUser.put("avatar", mUser.avatar)
      18    jsonUser.put("id", mUser.id)
      19    jsonUser.put("nickname", mUser.nickname)
      20    jsonUser.put("username", mUser.username)
      21    json.put("user", jsonUser)
      22    MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString())
      23}
      24
      25// 監聽(tīng)消息這里使用了一個(gè)事件總線(xiàn)進(jìn)行通知,在上邊封裝 MQTTHelper 發(fā)送消息也使用了這個(gè),
      26// 訂閱 MQTT 事件
      27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) {
      28        val match = JsonUtils.fromJson(it, Match::class.java)
      29        // 這里收到匹配信息之后就增加一條彈幕
      30    addBarrage(match)
      31}
      后端服務(wù)實(shí)現
      接下來(lái)介紹后端服務(wù)實(shí)現,主要包含以下兩部分:
    • 配置連接信息:配置環(huán)信MQTT消息云連接信息。
    • 獲取鑒權信息:獲取客戶(hù)端連接需要的鑒權信息。
      配置連接信息
      配置部分只需要按照環(huán)信后臺配置信息進(jìn)行替換就好,配置在config目錄下的config.xxx.json文件內
      1/**
      2 * Easemob MQTT 配置 https://console.easemob.com/app/generalizeMsg/overviewService
      3 */
      4config.mqtt = {
      5    host: 'mqtt host', // MQTT 鏈接地址
      6  appId: 'appId', // MQTT AppId
      7  port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss)
      8  restHost: 'https://api.cn1.mqtt.chat/app/8igtc0', // MQTT 服務(wù) API 地址
      9  clientId: 'client id', // 替換環(huán)信后臺 clientId
      10  clientSecret: 'client secret', // 替換環(huán)信后臺 clientSecret
      11};
      獲取鑒權信息
      這里主要是獲取客戶(hù)端連接所需要的鑒權信息token,為了安全token肯定是要放在服務(wù)器端生成的,廢話(huà)不多說(shuō),上代碼:
      1/**
      2 * Create by lzan13 on 2022/3/22
      3 * 描述:MQTT 幫助類(lèi)
      4 */
      5object MQTTHelper {
      6
      7    private var mqttClient: MqttAndroidClient? = null
      8
      9    // 緩存主題集合
      10    private val topicList = mutableListOf()
      11
      12    /**
      13     * 鏈接MQTT
      14     * @param id 用戶(hù) Id
      15     * @param token 用戶(hù)鏈接 MQTT 的 Token
      16     * @param topic 需要訂閱的主題,不為空就會(huì )在連接成功后進(jìn)行訂閱
      17     */
      18    fun connect(id: String, token: String, topic: String = "") {
      19        // 處理訂閱主題
      20        if (topic.isNotEmpty()) topicList.add(topic)
      21
      22        // 拼接鏈接地址
      23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"
      24        // 拼接 clientId
      25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"
      26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)
      27
      28        //連接參數
      29        val options = MqttConnectOptions()
      30        options.isAutomaticReconnect = true //設置自動(dòng)重連
      31        options.isCleanSession = true // 緩存
      32        options.connectionTimeout = CConstants.timeMinute.toInt() // 設置超時(shí)時(shí)間,單位:秒
      33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發(fā)送間隔,單位:秒
      34        options.userName = id // 用戶(hù)名
      35        options.password = token.toCharArray() // 密碼
      36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
      37        // 設置MQTT監聽(tīng)
      38        mqttClient?.setCallback(object : MqttCallback {
      39            override fun connectionLost(t: Throwable) {
      40                // 通知鏈接斷開(kāi)
      41                VMLog.d("MQTT 鏈接斷開(kāi) $t")
      42            }
      43
      44            @Throws(Exception::class)
      45            override fun messageArrived(topic: String, message: MqttMessage) {
      46                // 通知收到消息
      47                VMLog.d("MQTT 收到消息:$message")
      48                // 如果未訂閱則直接丟棄
      49                if (!topicList.contains(topic)) return
      50                notifyEvent(topic, String(message.payload))
      51            }
      52
      53            override fun deliveryComplete(token: IMqttDeliveryToken) {}
      54        })
      55        //進(jìn)行連接
      56        mqttClient?.connect(options, null, object : IMqttActionListener {
      57            override fun onSuccess(token: IMqttToken) {
      58                VMLog.d("MQTT 鏈接成功")
      59                // 鏈接成功,循環(huán)訂閱緩存的主題
      60                topicList.forEach { subscribe(it) }
      61            }
      62
      63            override fun onFailure(token: IMqttToken, t: Throwable) {
      64                VMLog.d("MQTT 鏈接失敗 $t")
      65            }
      66        })
      67    }
      68
      69    /**
      70     * 訂閱主題
      71     * @param topic 主題
      72     */
      73    fun subscribe(topic: String) {
      74        if (!topicList.contains(topic)) {
      75            topicList.add(topic)
      76        }
      77        try {
      78            //連接成功后訂閱主題
      79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {
      80                override fun onSuccess(token: IMqttToken) {
      81                    VMLog.d("MQTT 訂閱成功 $topic")
      82                }
      83
      84                override fun onFailure(token: IMqttToken, t: Throwable) {
      85                    VMLog.d("MQTT 訂閱失敗 $topic $t")
      86                }
      87            })
      88        } catch (e: MqttException) {
      89            e.printStackTrace()
      90        }
      91    }
      92
      93    /**
      94     * 取消訂閱
      95     * @param topic 主題
      96     */
      97    fun unsubscribe(topic: String) {
      98        if (topicList.contains(topic)) {
      99            topicList.remove(topic)
      100        }
      101        try {
      102            mqttClient?.unsubscribe(topic)
      103        } catch (e: MqttException) {
      104            e.printStackTrace()
      105        }
      106    }
      107
      108    /**
      109     * 發(fā)送 MQTT 消息
      110     * @param topic 主題
      111     * @param content 內容
      112     */
      113    fun sendMsg(topic: String, content: String) {
      114        val msg = MqttMessage()
      115        msg.payload = content.encodeToByteArray() // 設置消息內容
      116        msg.qos = 0 //設置消息發(fā)送質(zhì)量,可為0,1,2.
      117        // 設置消息的topic,并發(fā)送。
      118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {
      119            override fun onSuccess(asyncActionToken: IMqttToken) {
      120                VMLog.d("MQTT 消息發(fā)送成功")
      121            }
      122
      123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
      124                VMLog.d("MQTT 消息發(fā)送失敗 ${exception.message}")
      125            }
      126        })
      127    }
      128
      129    /**
      130     * 通知 MQTT 事件
      131     */
      132    private fun notifyEvent(topic: String, data: String) {
      133        LDEventBus.post(topic, data)
      134    }
      135}
      源碼地址
      核心代碼就這么多,不超過(guò)500行,這里沒(méi)有直接調用環(huán)信歷史消息接口獲取消息存儲記錄,后續可以在進(jìn)行改良,簡(jiǎn)化實(shí)現流程。源碼鏈接附上,配合使用效果更佳。
      服務(wù)端github源碼:
      https://github.com/lzan13/vmtemplateserver
      客戶(hù)端github源碼:
      https://gitee.com/lzan13/VMTemplateAndroid
      寫(xiě)在最后
      MQTT協(xié)議資源占用小,并發(fā)連接高,集成簡(jiǎn)單,特別適用于高頻數據交互場(chǎng)景,比如:游戲的世界廣場(chǎng)、視頻平臺彈幕等等等等,歡迎各位小伙伴集思廣益,基于MQTT服務(wù)實(shí)現更多的業(yè)務(wù)場(chǎng)景,享受技術(shù)帶來(lái)的便利與快樂(lè )。
    【免責聲明】本文僅代表作者本人觀(guān)點(diǎn),與CTI論壇無(wú)關(guān)。CTI論壇對文中陳述、觀(guān)點(diǎn)判斷保持中立,不對所包含內容的準確性、可靠性或完整性提供任何明示或暗示的保證。請讀者僅作參考,并請自行承擔全部責任。

    相關(guān)熱詞搜索: 環(huán)信 MQTT

    上一篇:CPaaS市場(chǎng), 騰訊云穩居第一!

    下一篇:最后一頁(yè)

    專(zhuān)題

    CTI論壇會(huì )員企業(yè)

    亚洲精品网站在线观看不卡无广告,国产a不卡片精品免费观看,欧美亚洲一区二区三区在线,国产一区二区三区日韩 汽车| 安阳县| 田阳县| 长治县| 隆子县| 德州市| 油尖旺区| 威远县| 翁源县| 祁连县| 咸阳市| 德保县| 根河市| 句容市| 任丘市| 四川省| 洮南市| 永寿县| 赤峰市| 鹤山市| 都匀市| 包头市| 金山区| 常德市| 长武县| 定边县| 朔州市| 句容市| 康保县| 汤原县| 云霄县| 邓州市| 嵊泗县| 临猗县| 固安县| 巴青县| 延庆县| 宜宾县| 洪泽县| 景谷| 江口县| http://444 http://444 http://444 http://444 http://444 http://444