想要为兔子MQ使用vert.x创建断路器

问题描述

我想使用vert.x实现断路器mq的断路器。如果Rabbit mq断开,则根据配置断开电路。

我为vert.x断路器单独设置了POC,并且还可以与Rabbit MQ客户端连接。使用vert.x。

现在,我想构建一个断路器,如果电路断开,那么客户端将不将消息推送到队列中,而是保留在数据库中,一旦电路关闭,它将开始在mq中推送数据库消息。请帮忙。

我已经使用下面的链接来创建有效的poc。

用于断路器的代码段

     CircuitBreakerOptions options = new CircuitBreakerOptions()
        .setMaxFailures(0)
        .setTimeout(5000)
        .setMaxRetries(3)
        .setFallbackOnFailure(true);

    CircuitBreaker breaker =
        CircuitBreaker.create("my-circuit-breaker",vertx,options)
        .openHandler(v -> {
          System.out.println("Circuit opened");
        }).closeHandler(v -> {
          System.out.println("Circuit closed");
        }).retryPolicy(retryCount -> retryCount * 100L);

    breaker.executeWithFallback(promise -> {
      vertx.createHttpClient().getNow(8080,"localhost","/",response -> {
        if (response.statusCode() != 200) {
          promise.fail("HTTP error");
        } else {
          response.exceptionHandler(promise::fail).bodyHandler(buffer -> {
            promise.complete(buffer.toString());
          });
        }
      });
    },v -> {
      // Executed when the circuit is opened
      return "Hello (fallback)";
    },ar -> {
      // Do something with the result
      System.out.println("Result: " + ar.result());
    });
  }

用于RAbbit MQ客户端的代码段

public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    RabbitMQOptions config = new RabbitMQOptions();
    // Each parameter is optional
    // The default parameter with be used if the parameter is not set
    config.setUser("guest");
    config.setPassword("guest");
    config.setHost("localhost");
    config.setPort(5672);
    // config.setVirtualHost("vhost1");
    // config.setConnectionTimeout(6000); // in milliseconds
    config.setRequestedHeartbeat(60); // in seconds
    // config.setHandshakeTimeout(6000); // in milliseconds
    // config.setRequestedChannelMax(5);
    // config.setNetworkRecoveryInterval(500); // in milliseconds
    // config.setAutomaticRecoveryEnabled(true);
    // vertx.

    RabbitMQClient client = RabbitMQClient.create(vertx,config);
    CircuitBreakerOptions options = new CircuitBreakerOptions().setMaxFailures(0).setTimeout(5000).setMaxRetries(0)
            .setFallbackOnFailure(true);

    CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker",options).openHandler(v -> {
        System.out.println("Circuit opened");
    }).closeHandler(v -> {
        System.out.println("Circuit closed");
    }).retryPolicy(retryCount -> retryCount * 100L);

    
    breaker.executeWithFallback(promise -> {
          vertx.createHttpClient().getNow(8080,response -> {
            if (response.statusCode() != 200) {
              promise.fail("HTTP error");
            } else {
              response.exceptionHandler(promise::fail).bodyHandler(buffer -> {
                promise.complete(buffer.toString());
              });
            }
          });
        },v -> {
          // Executed when the circuit is opened
          return "Hello (fallback)";
        },ar -> {
          // Do something with the result
          System.out.println("Result: " + ar.result());
        });
    
    client.start(rh -> {

        if (rh.failed()) {
            System.out.println("failed");
        } else {
            for (int i = 0; i > 5; i++) {
                System.out.println(rh.succeeded());
                System.out.println("client : " + client.isConnected());
                breaker.executeWithFallback(promise -> {
                    if(!client.isConnected()) {
                        promise.fail("MQ client is not connected");
                    }
                },v -> {
                      // Executed when the circuit is opened
                      return "Hello (fallback)";
                    },ar -> {
                      // Do something with the result
                      System.out.println("Result: " + ar.result());
                    });
                /* RabbitMQClient client2 = RabbitMQClient.create(vertx); */
                JsonObject message = new JsonObject().put("body","Hello RabbitMQ,from Vert.x !");
                client.basicPublish("eventExchange","order.created",message,pubResult -> {
                    if (pubResult.succeeded()) {
                        System.out.println("Message published !");
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    } else {
                        pubResult.cause().printStackTrace();
                    }
                });
            }
        }
    });

}

尽管其工作正常,但如果兔子mq掉下来,电路就会断开。但是,它看起来不是用vert.x进行格式化的标准方法。

请您提出建议,我可以如何实现它。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...