从Android通过MQTT发布到RabbitMQ的消息未被收到

问题描述

我已经搭建好RabbitMQ(Docker),启用了用于管理的Web UI和MQTT插件(mqtt_plugin),并开放了端口1883、8883、5672和15672。我正在开发的Android应用使用Paho MqttClient向MQ代理发布消息。连接是正常的,但无论在Web UI还是CLI中检查,都看不到任何收到的消息。

rabbitmqctl list_queues

连接页面:

active publisher

通道(Channel)页面:

Channel page

交换机(Exchange)页面:

active exchanges

队列页面:

available queues

下面是我正在处理的代码。

<!--
  Google Mobile Ads banner view (adSize BANNER) with wrap_content dimensions,
  centered horizontally and aligned to the bottom of its parent.
  NOTE(review): the layout_centerHorizontal / layout_alignParentBottom attributes
  presumably assume a RelativeLayout parent - confirm against the enclosing layout.
  NOTE(review): this snippet contains no MQTT code; it looks unrelated to the
  question text around it - confirm it belongs here.
-->
<com.google.android.gms.ads.AdView
      xmlns:ads="http://schemas.android.com/apk/res-auto"
      android:id="@+id/adView"
      android:layout_width="wrap_content"
      android:layout_height="wrap_content"
      android:layout_centerHorizontal="true"
      android:layout_alignParentBottom="true"
      ads:adSize="BANNER"
      ads:adUnitId="ca-app-pub-3940256099942544/6300978111">
  </com.google.android.gms.ads.AdView>

我一直在寻找答案,并尝试重新安装MQ,但是仍然得到相同的结果。

解决方法

下面是一个通用的connectMq扩展函数,用于连接MQ并从中接收消息。(MqConnectionExtention.kt)

    /**
     * Connects to the MQTT broker and subscribes to [publishTopicChannelName].
     *
     * A new [MqttAndroidClient] with a timestamp-suffixed client id is created on every call.
     * The topic is subscribed once after the initial connect succeeds and again after every
     * automatic reconnect. Each message delivered on that subscription is forwarded to
     * [onConnectionSuccess].
     *
     * @param publishTopicChannelName topic filter to subscribe to.
     * @param onConnectionSuccess invoked with the topic and message of every delivery on the
     *   subscription (despite its name, it is a per-message callback).
     */
    fun Context.connectMq(
        publishTopicChannelName: String,
        onConnectionSuccess: (topic: String?, message: MqttMessage?) -> Unit
    ) {
        val serverUri = "tcp://34.212.00.188:1883"
        val mClientId = BuildConfig.CLIENT_ID + System.currentTimeMillis()
        val mqttAndroidClient = MqttAndroidClient(this, serverUri, mClientId)

        // MqttAndroidClient only offers subscribe overloads that take an explicit QoS.
        // NOTE(review): 0 mirrors the Paho Android sample - confirm the QoS this app needs.
        val subscribeQos = 0

        Timber.e("ChannelName:$publishTopicChannelName")

        // Single place that performs the subscription, shared by the first connect and reconnects.
        fun subscribeToTopic() {
            try {
                mqttAndroidClient.subscribe(
                    publishTopicChannelName,
                    subscribeQos,
                    object : IMqttMessageListener {
                        override fun messageArrived(topic: String?, message: MqttMessage?) {
                            onConnectionSuccess(topic, message)
                        }
                    }
                )
            } catch (ex: MqttException) {
                System.err.println("Exception whilst subscribing")
                ex.printStackTrace()
            }
        }

        mqttAndroidClient.setCallback(object : MqttCallbackExtended {
            override fun connectComplete(reconnect: Boolean, serverURI: String) {
                if (reconnect) {
                    Log.e("TAG", "Reconnected to : $serverURI")
                    // Clean session is disabled below, so the broker may still hold this
                    // subscription; re-subscribing is kept from the original flow and is
                    // presumably harmless - TODO confirm.
                    subscribeToTopic()
                } else {
                    Log.e("TAG", "Connected to: $serverURI")
                }
            }

            // cause is declared nullable: a non-null parameter would crash if the library
            // reports a client-initiated disconnect without a cause.
            override fun connectionLost(cause: Throwable?) {
                Log.e("TAG", "The Connection was lost.", cause)
            }

            override fun messageArrived(topic: String, message: MqttMessage) {
                // Decode the bytes; ByteArray.toString() would only print the array identity.
                Log.e("TAG", "Incoming message: " + String(message.payload))
            }

            override fun deliveryComplete(token: IMqttDeliveryToken) {}
        })

        val mqttConnectOptions = setUpConnectionOptions("MQ_CONNECTION_USERNAME", "MQ_CONNECTION_PASSWORD")
        mqttConnectOptions.isAutomaticReconnect = true
        mqttConnectOptions.isCleanSession = false

        try {
            mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    // Buffer up to 100 outgoing messages in memory while disconnected.
                    val disconnectedBufferOptions = DisconnectedBufferOptions()
                    disconnectedBufferOptions.isBufferEnabled = true
                    disconnectedBufferOptions.bufferSize = 100
                    disconnectedBufferOptions.isPersistBuffer = false
                    disconnectedBufferOptions.isDeleteOldestMessages = false
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions)
                    subscribeToTopic()
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    // Surface connection failures instead of swallowing them silently.
                    Log.e("TAG", "Failed to connect to: $serverUri", exception)
                }
            })
        } catch (ex: MqttException) {
            ex.printStackTrace()
        }
    }

/**
 * Builds the [MqttConnectOptions] used to authenticate against the broker.
 *
 * The returned options have clean session enabled and carry the given credentials.
 *
 * @param username user name sent to the broker.
 * @param password password, stored on the options as a char array.
 * @return a freshly created options object.
 */
private fun setUpConnectionOptions(username: String, password: String): MqttConnectOptions =
    MqttConnectOptions().also { options ->
        options.isCleanSession = true
        options.userName = username
        options.password = password.toCharArray()
    }

从Java类中,我像下面这样调用它并成功获取消息:

/**
 * Subscribes to the "mq_video_channel_name" topic through the Kotlin connectMq
 * extension and logs every message delivered on it.
 */
private void subscribeMQForVideo() {
    MqConnectionExtentionKt.connectMq(mContext, "mq_video_channel_name", (receivedTopic, receivedMessage) -> {
        // Message arrived: decode the raw payload bytes for logging.
        final String payloadText = new String(receivedMessage.getPayload());
        Log.e("TAG", "Message Video: " + receivedTopic + " : " + payloadText);

        // The Kotlin callback is declared as returning Unit; from Java a null return satisfies it.
        return null;
    });
}

为了发布消息,我创建了一个类似的扩展,与上面的几乎没有区别。(MqConnectionPublishExtention.kt)

    /**
     * Connects to the MQTT broker and hands the connected client to [onConnectionSuccess]
     * so the caller can publish with it.
     *
     * A new [MqttAndroidClient] with a timestamp-suffixed client id is created on every call.
     *
     * NOTE(review): [onConnectionSuccess] runs after the initial connect AND after every
     * automatic reconnect, so a caller that publishes inside it will publish again on
     * reconnect - confirm this is intended.
     *
     * @param onConnectionSuccess invoked with the client once a connection is established.
     */
    fun Context.connectMq(onConnectionSuccess: (mqttAndroidClient: MqttAndroidClient?) -> Unit) {
        val mClientId = BuildConfig.CLIENT_ID + System.currentTimeMillis()
        val mqttAndroidClient = MqttAndroidClient(this, BuildConfig.MQ_SERVER_URI, mClientId)

        mqttAndroidClient.setCallback(object : MqttCallbackExtended {
            override fun connectComplete(reconnect: Boolean, serverURI: String) {
                if (reconnect) {
                    Log.e("TAG", "Reconnected to : $serverURI")
                    onConnectionSuccess(mqttAndroidClient)
                } else {
                    Log.e("TAG", "Connected to: $serverURI")
                }
            }

            // cause is declared nullable: a non-null parameter would crash if the library
            // reports a client-initiated disconnect without a cause.
            override fun connectionLost(cause: Throwable?) {
                Log.e("TAG", "The Connection was lost.", cause)
            }

            override fun messageArrived(topic: String, message: MqttMessage) {
                // Decode the bytes; ByteArray.toString() would only print the array identity.
                Log.e("TAG", "Incoming message: " + String(message.payload))
            }

            override fun deliveryComplete(token: IMqttDeliveryToken) {}
        })

        val mqttConnectOptions = setUpConnectionOptions(
            BuildConfig.MQ_CONNECTION_USERNAME,
            BuildConfig.MQ_CONNECTION_PASSWORD
        )
        mqttConnectOptions.isAutomaticReconnect = true
        mqttConnectOptions.isCleanSession = false

        try {
            // Use the three-argument overload: with only (options, listener) the call binds to
            // connect(userContext, callback) and the connect options are never applied.
            mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    // Buffer up to 100 outgoing messages in memory while disconnected.
                    val disconnectedBufferOptions = DisconnectedBufferOptions()
                    disconnectedBufferOptions.isBufferEnabled = true
                    disconnectedBufferOptions.bufferSize = 100
                    disconnectedBufferOptions.isPersistBuffer = false
                    disconnectedBufferOptions.isDeleteOldestMessages = false
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions)
                    onConnectionSuccess(mqttAndroidClient)
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    // Surface connection failures instead of swallowing them silently.
                    Log.e("TAG", "Failed to connect to: " + BuildConfig.MQ_SERVER_URI, exception)
                }
            })
        } catch (ex: MqttException) {
            ex.printStackTrace()
        }
    }

    /**
     * Builds the [MqttConnectOptions] carrying the broker credentials.
     *
     * Clean session is enabled here; the caller above switches it off again afterwards.
     *
     * @param username user name sent to the broker.
     * @param password password, stored on the options as a char array.
     * @return a freshly created options object.
     */
    private fun setUpConnectionOptions(username: String, password: String): MqttConnectOptions {
        val connOpts = MqttConnectOptions()
        connOpts.isCleanSession = true
        connOpts.userName = username
        connOpts.password = password.toCharArray()
        return connOpts
    }

发布来自Java类的消息

/**
 * Connects through the Kotlin publish extension and, once the client is handed back,
 * publishes the current exercise data as a JSON payload to the "Channel_name" topic.
 */
private void publishExerciseDataToMQChannel() {
    MqConnectionPublishExtentionKt.connectMq(mContext, (client) -> {
        try {
            // Assemble the JSON body from the current workout state.
            final JSONObject body = new JSONObject();
            body.put("params", mlParams);
            body.put("workoutid", workoutId);
            body.put("userid", model.getUserIdFromPrefs());
            body.put("stream_id", streamDataModel.getStreamId());

            final MqttMessage outgoing = new MqttMessage();
            outgoing.setPayload(body.toString().getBytes());
            client.publish("Channel_name", outgoing);

            Log.e("TAG", outgoing.getQos() + "");
            // When offline, report how many messages are waiting in the client's buffer.
            if (!client.isConnected()) {
                Log.e("TAG", client.getBufferedMessageCount() + " messages in buffer.");
            }
        } catch (MqttException | JSONException e) {
            // Both failure kinds are reported the same way.
            System.err.println("Error Publishing: " + e.getMessage());
            e.printStackTrace();
        }
        // The Kotlin callback is declared as returning Unit; from Java a null return satisfies it.
        return null;
    });
}

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...