问题描述
我仍然对 vert.x 比较熟悉。来自 Spring boot 和 Spring webflux 背景,我想尝试一些我以前在 Spring 生态系统中做过的基本东西。
所以我的想法是编写一个通过控制器公开的 api,它将实际工作委托给服务。我能想到在 vert.x 世界中实现这一点的唯一方法是利用事件总线。在这里,我的 keyvalueServiceVerticle
的 getkeyvalues
方法应该从发布者 (keyvalueRepository.findAllItems().items()
) 获取键值列表,并通过事件总线将它们发送回原始事件发布者 API。我确实得到了预期的结果(键值列表),但不知何故我对性能不满意。我在 spring webflux 和 vert.x 的等效代码中放置了一些负载,我的 webflux 实现总是表现得更好(更高的 RPS)。相关存储库:https://github.com/tahniat-ashraf/spring-boot-webflux-vert.x-comparison
我是否在某处阻止了代码?有没有更好的 vert.x 方法来实现我想要实现的目标?
相关代码:
公共类 keyvalueController 扩展 AbstractVerticle {
@Override
public void start() throws Exception {
Router router = Router.router(vertx);
router
.route()
.handler(BodyHandler.create());
router.route()
.handler(LoggerHandler.create(LoggerFormat.DEFAULT));
router
.route(HttpMethod.GET,"/keyvalues")
.handler(this::getkeyvalues);
vertx
.createHttpServer()
.requestHandler(router)
.listen(6678);
}
private void getkeyvalues(RoutingContext routingContext) {
vertx
.eventBus()
.request(keyvalueServiceVerticle.GET_LIST_ADDRESS,new JsonObject(),messageAsyncResult ->
routingContext.response()
.putHeader("content-type","application/json")
.end((String) messageAsyncResult.result().body())
);
}
}
和
public class keyvalueServiceVerticle extends AbstractVerticle {
public static final String GET_LIST_ADDRESS = "GET_LIST_KEY_VAL";
private keyvalueRepository keyvalueRepository;
private DynamoConfiguration dynamoConfiguration;
@Override
public void start() throws Exception {
dynamoConfiguration = new DynamoConfiguration();
keyvalueRepository = new keyvalueRepository("dev-paybill-key-value",dynamoConfiguration.getDynamoDBEnhancedClient());
var eventBus = vertx.eventBus();
eventBus
.consumer(keyvalueServiceVerticle.GET_LIST_ADDRESS,this::getkeyvalues);
}
private <T> void getkeyvalues(Message<T> tMessage) {
Observable.frompublisher(keyvalueRepository.findAllItems().items())
.toList()
.subscribe(tList -> {
JsonArray jsonArray=new JsonArray(tList);
tMessage.reply(jsonArray.encodePrettily());
});
}
}
解决方法
您确定 Observable.fromPublisher(keyValueRepository.findAllItems().items())
的发布者是非阻塞发布者吗?
Rxjava2 的默认设置实际上是以阻塞方式处理事件。确保以非阻塞方式处理事件的一种方法是使用 .subscribeOn(RxHelper.scheduler(vertx))
运算符或使用以下命令将 RxJava2 配置为默认使用 vertx 事件循环而不是标准的 RxJava2 线程:
RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));
为了使这更容易,请尝试以下操作
.end((String) messageAsyncResult.result().body())
// to
.end(messageAsyncResult.result().body().encode())