问题描述
我想使用vert.x实现断路器mq的断路器。如果Rabbit mq断开,则根据配置断开电路。
我为vert.x断路器单独设置了POC,并且还可以与Rabbit MQ客户端连接。使用vert.x。
现在,我想构建一个断路器,如果电路断开,那么客户端将不将消息推送到队列中,而是保留在数据库中,一旦电路关闭,它将开始在mq中推送数据库消息。请帮忙。
我已经使用下面的链接来创建有效的poc。
用于断路器的代码段
CircuitBreakerOptions options = new CircuitBreakerOptions()
.setMaxFailures(0)
.setTimeout(5000)
.setMaxRetries(3)
.setFallbackOnFailure(true);
CircuitBreaker breaker =
CircuitBreaker.create("my-circuit-breaker",vertx,options)
.openHandler(v -> {
System.out.println("Circuit opened");
}).closeHandler(v -> {
System.out.println("Circuit closed");
}).retryPolicy(retryCount -> retryCount * 100L);
breaker.executeWithFallback(promise -> {
vertx.createHttpClient().getNow(8080,"localhost","/",response -> {
if (response.statusCode() != 200) {
promise.fail("HTTP error");
} else {
response.exceptionHandler(promise::fail).bodyHandler(buffer -> {
promise.complete(buffer.toString());
});
}
});
},v -> {
// Executed when the circuit is opened
return "Hello (fallback)";
},ar -> {
// Do something with the result
System.out.println("Result: " + ar.result());
});
}
用于RAbbit MQ客户端的代码段
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
RabbitMQOptions config = new RabbitMQOptions();
// Each parameter is optional
// The default parameter with be used if the parameter is not set
config.setUser("guest");
config.setPassword("guest");
config.setHost("localhost");
config.setPort(5672);
// config.setVirtualHost("vhost1");
// config.setConnectionTimeout(6000); // in milliseconds
config.setRequestedHeartbeat(60); // in seconds
// config.setHandshakeTimeout(6000); // in milliseconds
// config.setRequestedChannelMax(5);
// config.setNetworkRecoveryInterval(500); // in milliseconds
// config.setAutomaticRecoveryEnabled(true);
// vertx.
RabbitMQClient client = RabbitMQClient.create(vertx,config);
CircuitBreakerOptions options = new CircuitBreakerOptions().setMaxFailures(0).setTimeout(5000).setMaxRetries(0)
.setFallbackOnFailure(true);
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker",options).openHandler(v -> {
System.out.println("Circuit opened");
}).closeHandler(v -> {
System.out.println("Circuit closed");
}).retryPolicy(retryCount -> retryCount * 100L);
breaker.executeWithFallback(promise -> {
vertx.createHttpClient().getNow(8080,response -> {
if (response.statusCode() != 200) {
promise.fail("HTTP error");
} else {
response.exceptionHandler(promise::fail).bodyHandler(buffer -> {
promise.complete(buffer.toString());
});
}
});
},v -> {
// Executed when the circuit is opened
return "Hello (fallback)";
},ar -> {
// Do something with the result
System.out.println("Result: " + ar.result());
});
client.start(rh -> {
if (rh.failed()) {
System.out.println("failed");
} else {
for (int i = 0; i > 5; i++) {
System.out.println(rh.succeeded());
System.out.println("client : " + client.isConnected());
breaker.executeWithFallback(promise -> {
if(!client.isConnected()) {
promise.fail("MQ client is not connected");
}
},v -> {
// Executed when the circuit is opened
return "Hello (fallback)";
},ar -> {
// Do something with the result
System.out.println("Result: " + ar.result());
});
/* RabbitMQClient client2 = RabbitMQClient.create(vertx); */
JsonObject message = new JsonObject().put("body","Hello RabbitMQ,from Vert.x !");
client.basicPublish("eventExchange","order.created",message,pubResult -> {
if (pubResult.succeeded()) {
System.out.println("Message published !");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
pubResult.cause().printStackTrace();
}
});
}
}
});
}
尽管其工作正常,但如果兔子mq掉下来,电路就会断开。但是,它看起来不是用vert.x进行格式化的标准方法。
请您提出建议,我可以如何实现它。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)