Golang 向 IBM MQ 发送消息错误 MQRC_CONNECTION_BROKEN

问题描述

我目前正在使用 Golang 连接 IBM WebSphere MQ-ESB,用来与 MQ-ESB 通信的库如下:

https://github.com/ibm-messaging/mq-golang
https://github.com/ibm-messaging/mq-golang-jms20

通常情况下,它可以正常地将消息发送到 MQ-ESB;但一旦发生错误,就会出现 MQ 连接中断(MQRC_CONNECTION_BROKEN)的错误,导致我的应用程序无法再向 MQ-ESB 发送消息。重启服务可以暂时恢复,但这只是权宜之计,并不是真正的解决方案。有人有什么想法吗?谢谢

这是创建 mq 连接的代码

// NewIBMMQConnection creates a JMS context from the given connection factory.
//
// When the "mq.openconnection" setting is false, no connection is attempted and
// an empty ContextImpl is returned instead.
//
// Any failure reported by CreateContext is treated as fatal. Previously a
// failure without a linked error was silently ignored and the (unusable)
// context was handed back to the caller.
func NewIBMMQConnection(mqConnConfig *mqjms.ConnectionFactoryImpl) jms20subset.JMSContext {
    if !viper.GetBool("mq.openconnection") {
        return &mqjms.ContextImpl{}
    }

    logx.WithFields(logrus.Fields{
        "queue manager": viper.GetString("mq.qManager"),
        "host":          viper.GetString("mq.host"),
        "port":          viper.GetInt("mq.port"),
        "channel":       viper.GetString("mq.qChannel"),
    }).Infof("[CONfig] [MDM IBMMQ]")

    conn, exception := mqConnConfig.CreateContext()
    if exception != nil {
        if linked := exception.GetLinkedError(); linked != nil {
            logx.Fatalf("new mdm mq error: %s", linked)
        } else {
            // No underlying error attached: report the code and reason instead
            // of dropping the failure.
            logx.Fatalf("new mdm mq error: %s", exception.GetErrorCode()+"-"+exception.GetReason())
        }
    }
    return conn
}

// NewIBMMQConfig builds a connection factory populated from the "mq.*"
// configuration keys (queue manager, host, port, channel, login and password).
func NewIBMMQConfig() *mqjms.ConnectionFactoryImpl {
    factory := new(mqjms.ConnectionFactoryImpl)

    factory.QMName = viper.GetString("mq.qManager")
    factory.Hostname = viper.GetString("mq.host")
    factory.PortNumber = viper.GetInt("mq.port")
    factory.ChannelName = viper.GetString("mq.qChannel")
    factory.UserName = viper.GetString("mq.login")
    factory.Password = viper.GetString("mq.pass")

    return factory
}

这是main.go中实例化连接的代码

// main opens the database and the MQ context, starts the server in a
// goroutine and then calls shutdown(). Parts of the body are elided ("...")
// in this excerpt.
func main() {
    db := newGormDB()
    defer closeDB(db)

    // The MQ context is created once here and closed when main returns.
    // NOTE(review): nothing visible in this excerpt re-creates it if the
    // connection breaks later — confirm against the elided code.
    mq := ibmmq.NewIBMMQConnection(ibmmq.NewIBMMQConfig())
    defer mq.Close()
    // NOTE(review): this local variable shadows the ibmmq package name for
    // the remainder of main.
    ibmmq := ibmmq.New(mq)

    ...

    ... 

    go startServer()

    shutdown()
}

这是发送(生产)消息的代码

// ProduceMSGToMQ sends msg as a string message to the queue held in
// i.queueCDDEMoeny using i.producer.
//
// On failure it logs the problem and returns an error whose text is
// "Send msg to mq error: <code>-<reason>[-<linked error>]". The linked error
// part is only appended when the exception actually carries one; the previous
// version called Error() on it unconditionally and would panic on nil.
//
// This function does not attempt to reconnect; the caller only sees the error.
func (i *IBMMQ) ProduceMSGToMQ(ctx context.Context, msg string) error {
    logx.WithContext(ctx).Infof("Producing Message queueName: message: %s", msg)

    err := i.producer.SendString(i.queueCDDEMoeny, msg)
    if err == nil {
        return nil
    }

    // Build the description once and reuse it for both the log and the error.
    detail := err.GetErrorCode() + "-" + err.GetReason()
    if linked := err.GetLinkedError(); linked != nil {
        detail += "-" + linked.Error()
    }

    logx.WithSeverityError(ctx).Errorf("Send msg to mq error: %s", detail)
    return errors.New("Send msg to mq error: " + detail)
}

解决方法

我不是 Go 语言专家,只是一名 IBM MQ 专家,不过还是试着回答一下(这里的 “go” 并非有意双关!)。

我没有看到你在哪里调用 ProduceMSGToMQ 函数,但我猜想可以在调用处检查返回的错误,并在连接中断时重新建立连接,大致如下:

// Sketch only — not compilable as written ("..." stands for the real arguments).
// NOTE(review): ProduceMSGToMQ as shown in the question returns a plain error
// built with errors.New, which has no GetReason() method; the reason would have
// to be exposed some other way for this check to work.
error := ibmmq.ProduceMSGToMQ(...)
if error != nil && (error.GetReason() == "MQRC_CONNECTION_BROKEN") {
    // NOTE(review): this mq is local to the if block; presumably the producer
    // wrapper must also be rebuilt from the new context — confirm.
    mq := ibmmq.NewIBMMQConnection(ibmmq.NewIBMMQConfig())
}

希望能有所帮助。

,

我们还必须重新启动服务以恢复 MQ 客户端上的连接。 但是在设置了重新连接选项后问题解决了。

/**
 * Android [Service] that requests location updates from the fused location provider,
 * shows a notification while doing so, and broadcasts each new [Location] through
 * LocalBroadcastManager. Clients bind to it via [LocalBinder].
 */
class LocationService : Service() {
// Set to this service instance in onCreate().
private var context: Context? = null
// Assigned in createLocationRequest().
private var settingsClient: SettingsClient? = null
// Built in createLocationRequest() from locationRequest.
private var locationSettingsRequest: LocationSettingsRequest? = null
// Assigned in createLocationRequest().
private var locationManager: LocationManager? = null
// Request parameters passed to the fused client in requestLocationUpdates().
private var locationRequest: LocationRequest? = null
// Obtained in onCreate(); used to create the channel and to re-post the notification.
private var notificationManager: NotificationManager? = null
// Obtained in onCreate(); source of location updates.
private var fusedLocationClient: FusedLocationProviderClient? = null
// Binder instance returned from onBind().
private val binder: IBinder = LocalBinder()
// Created in onCreate(); forwards every received location to onNewLocation().
private var locationCallback: LocationCallback? = null
// Most recent location stored by onNewLocation().
private var location: Location? = null

/**
 * Called when a client binds to this service.
 *
 * This implementation only logs the call and hands back the shared [binder];
 * it does not change the service's foreground state.
 */
override fun onBind(intent: Intent?): IBinder {
    Log.i(TAG, "in onBind()")
    val clientBinder = binder
    return clientBinder
}

/**
 * Initializes the service: obtains the fused location client, prepares the
 * location request, installs the callback that forwards every received location
 * to [onNewLocation], and (on Android O and later) registers the notification channel.
 *
 * The previous version also started a `HandlerThread` that was never used and
 * never quit, leaking a thread per service instance; it has been removed.
 */
override fun onCreate() {
    super.onCreate()

    context = this
    fusedLocationClient = LocationServices.getFusedLocationProviderClient(this)

    // Must run after `context` is assigned: createLocationRequest() reads it.
    createLocationRequest()

    locationCallback = object : LocationCallback() {
        @RequiresApi(Build.VERSION_CODES.O)
        override fun onLocationResult(locationResult: LocationResult) {
            super.onLocationResult(locationResult)

            // A single result may carry several locations; handle each in order.
            for (location in locationResult.locations) {
                onNewLocation(location)
            }
        }
    }

    notificationManager = getSystemService(NOTIFICATION_SERVICE) as NotificationManager?

    // Android O requires a Notification Channel.
    if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
        val name: CharSequence = "service"
        val channel = NotificationChannel(CHANNEL_ID, name, NotificationManager.IMPORTANCE_DEFAULT)

        // Set the Notification Channel for the Notification Manager.
        notificationManager?.createNotificationChannel(channel)
    }
}

/**
 * Handles a start request. When the start came from the notification's
 * "remove" action (flagged by [EXTRA_STARTED_FROM_NOTIFICATION]), location
 * updates are removed and the service stops itself.
 *
 * @return [START_NOT_STICKY], so the system does not recreate the service after killing it.
 */
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
    Log.i(TAG, "Service started")

    // A null intent is treated the same as "not started from the notification".
    val cameFromNotification =
        intent?.getBooleanExtra(EXTRA_STARTED_FROM_NOTIFICATION, false) ?: false

    if (cameFromNotification) {
        // The user chose to remove location updates from the notification.
        removeLocationUpdates()
        stopSelf()
    }

    return START_NOT_STICKY
}

/**
 * Builds the ongoing [Notification] used for the foreground service.
 *
 * It carries two actions: one that opens [MainActivity] and one that restarts
 * this service with [EXTRA_STARTED_FROM_NOTIFICATION] set, so that
 * [onStartCommand] can tell the start came from the notification.
 *
 * Fixes over the previous version:
 * - `PendingIntent.getService` / `getActivity` now receive the mandatory
 *   `requestCode` argument (it was missing, which does not compile).
 * - `FLAG_IMMUTABLE` is added where available; apps targeting API 31+ must
 *   specify the mutability of every PendingIntent.
 * - The channel id is supplied through the two-argument builder constructor
 *   instead of the deprecated one-argument form; it is ignored before Android O.
 */
private val notification: Notification
    get() {
        // FLAG_IMMUTABLE was introduced in API 23.
        val immutableFlag =
            if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) PendingIntent.FLAG_IMMUTABLE else 0

        // Extra to help us figure out if we arrived in onStartCommand via the notification or not.
        val serviceIntent = Intent(this, LocationService::class.java)
            .putExtra(EXTRA_STARTED_FROM_NOTIFICATION, true)

        // The PendingIntent that leads to a call to onStartCommand() in this service.
        val servicePendingIntent = PendingIntent.getService(
            this,
            0,
            serviceIntent,
            PendingIntent.FLAG_UPDATE_CURRENT or immutableFlag,
        )

        // The PendingIntent to launch activity.
        val activityPendingIntent = PendingIntent.getActivity(
            this,
            0,
            Intent(this, MainActivity::class.java),
            immutableFlag,
        )

        return NotificationCompat.Builder(this, CHANNEL_ID)
            .addAction(R.drawable.ic_delete, "title", activityPendingIntent)
            .addAction(R.drawable.ic_delete, "remove", servicePendingIntent)
            .setContentTitle("location title")
            .setOngoing(true)
            .setPriority(NotificationCompat.PRIORITY_HIGH)
            .setSmallIcon(R.drawable.btn_dialog)
            .setWhen(System.currentTimeMillis())
            .build()
    }

/**
 * Promotes the service to the foreground and asks the fused location client
 * to start delivering updates to [locationCallback].
 *
 * A [SecurityException] raised by the request is only logged.
 */
fun requestLocationUpdates() {
    Log.i(TAG, "Requesting location updates")

    startForeground(NOTIFICATION_ID, notification)

    // Nothing to request from when the client has not been created.
    val client = fusedLocationClient ?: return
    try {
        client.requestLocationUpdates(locationRequest, locationCallback, null)
    } catch (denied: SecurityException) {
        Log.e(TAG, "Lost location permission. Could not request updates. $denied")
    }
}

/**
 * Records the latest [location], broadcasts it locally under [ACTION_BROADCAST]
 * (extra key [EXTRA_LOCATION]) and, when the service is currently in the
 * foreground, re-posts the notification.
 */
@RequiresApi(Build.VERSION_CODES.O)
private fun onNewLocation(location: Location) {
    Log.i(TAG, "New location ${LocalDateTime.now()}: $location")
    this.location = location

    // Notify anyone listening for broadcasts about the new location.
    val broadcast = Intent(ACTION_BROADCAST).also { it.putExtra(EXTRA_LOCATION, location) }
    LocalBroadcastManager.getInstance(applicationContext).sendBroadcast(broadcast)

    // Only a foreground service has a notification worth refreshing.
    if (!serviceIsRunningInForeground(this)) return
    notificationManager?.notify(NOTIFICATION_ID, notification)
}


/**
 * Sets the location request parameters and builds the matching
 * [LocationSettingsRequest].
 *
 * Fixes over the previous version:
 * - `setAlwaysShow(true)` is now applied before `build()`. It used to be called
 *   after the request had already been built, so it had no effect on the
 *   stored [locationSettingsRequest].
 * - A null [context] no longer triggers a failing `null as LocationManager`
 *   cast; the dependent fields are simply left null in that case.
 */
private fun createLocationRequest() {
    locationManager = context?.getSystemService(LOCATION_SERVICE) as? LocationManager
    settingsClient = context?.let { LocationServices.getSettingsClient(it) }

    val request = LocationRequest.create().apply {
        priority = LocationRequest.PRIORITY_HIGH_ACCURACY
        interval = 1000
        fastestInterval = 1000
    }
    locationRequest = request

    locationSettingsRequest = LocationSettingsRequest.Builder()
        .addLocationRequest(request)
        .setAlwaysShow(true) // this is the key ingredient; must precede build()
        .build()
}

/**
 * Stops location delivery to [locationCallback] and then stops the service.
 *
 * A [SecurityException] raised along the way is only logged; in that case
 * the service is not stopped by this method.
 */
fun removeLocationUpdates() {
    Log.i(TAG, "Removing location updates")
    try {
        fusedLocationClient?.let { client -> client.removeLocationUpdates(locationCallback) }
        stopSelf()
    } catch (denied: SecurityException) {
        Log.e(TAG, "Lost location permission. Could not remove updates. $denied")
    }
}

/**
 * Binder handed to clients that bind to this service. It exposes the
 * enclosing service instance directly; per the original note, clients are
 * expected to live in the same process, so no IPC handling is involved.
 */
inner class LocalBinder : Binder() {
    /** The [LocationService] instance that created this binder. */
    val service: LocationService get() = this@LocationService
}

/**
 * Reports whether a running service with this class's name is currently
 * marked as a foreground service.
 *
 * @param context The [Context] used to look up the [ActivityManager].
 * @return true if such a service entry exists and is in the foreground.
 */
fun serviceIsRunningInForeground(context: Context): Boolean {
    val activityManager = context.getSystemService(Context.ACTIVITY_SERVICE) as ActivityManager
    val ownClassName = javaClass.name
    return activityManager.getRunningServices(Int.MAX_VALUE).any { info ->
        ownClassName == info.service.className && info.foreground
    }
}

companion object {
    private const val PACKAGE_NAME = "com.example.locationforegroundservice"
    private const val TAG = "TEST"

    /** Id of the notification channel used by this service. */
    private const val CHANNEL_ID = "channel_01"

    /** Action of the local broadcast sent for every new location. */
    const val ACTION_BROADCAST = "$PACKAGE_NAME.broadcast"

    /** Extra key under which the location is attached to the broadcast. */
    const val EXTRA_LOCATION = "$PACKAGE_NAME.location"

    /** Extra marking a service start that was triggered from the notification. */
    private const val EXTRA_STARTED_FROM_NOTIFICATION = "$PACKAGE_NAME.started_from_notification"

    /**
     * The desired interval for location updates, in milliseconds. Inexact.
     * Updates may be more or less frequent.
     */
    private const val UPDATE_INTERVAL_IN_MILLISECONDS: Long = 1000

    /**
     * The fastest rate for active location updates, in milliseconds: half of
     * [UPDATE_INTERVAL_IN_MILLISECONDS].
     */
    private const val FASTEST_UPDATE_INTERVAL_IN_MILLISECONDS =
        UPDATE_INTERVAL_IN_MILLISECONDS / 2

    /** Identifier of the notification displayed for the foreground service. */
    private const val NOTIFICATION_ID = 12345678
}