Vert.x 事件总线性能问题设计问题

问题描述

我仍然对 vert.x 比较熟悉。来自 Spring boot 和 Spring webflux 背景,我想尝试一些我以前在 Spring 生态系统中做过的基本东西。

所以我的想法是编写一个通过控制器公开的 api,它将实际工作委托给服务。我能想到在 vert.x 世界中实现这一点的唯一方法是利用事件总线。在这里,我的 keyvalueServiceVerticlegetkeyvalues 方法应该从发布者 (keyvalueRepository.findAllItems().items()) 获取键值列表,并通过事件总线将它们发送回原始事件发布者 API。我确实得到了预期的结果(键值列表),但不知何故我对性能不满意。我在 spring webflux 和 vert.x 的等效代码中放置了一些负载,我的 webflux 实现总是表现得更好(更高的 RPS)。相关存储库:https://github.com/tahniat-ashraf/spring-boot-webflux-vert.x-comparison

我是否在某处阻止了代码?有没有更好的 vert.x 方法来实现我想要实现的目标?

相关代码

公共类 keyvalueController 扩展 AbstractVerticle {

  @Override
  public void start() throws Exception {
    Router router = Router.router(vertx);
    router
      .route()
      .handler(BodyHandler.create());
    router.route()
      .handler(LoggerHandler.create(LoggerFormat.DEFAULT));
    router
      .route(HttpMethod.GET,"/keyvalues")
      .handler(this::getkeyvalues);

    vertx
      .createHttpServer()
      .requestHandler(router)
      .listen(6678);
  }

  private void getkeyvalues(RoutingContext routingContext) {
    vertx
      .eventBus()
      .request(keyvalueServiceVerticle.GET_LIST_ADDRESS,new JsonObject(),messageAsyncResult ->
        routingContext.response()
          .putHeader("content-type","application/json")
          .end((String) messageAsyncResult.result().body())
      );
  }
}

public class keyvalueServiceVerticle extends AbstractVerticle {

  public static final String GET_LIST_ADDRESS = "GET_LIST_KEY_VAL";
  private keyvalueRepository keyvalueRepository;
  private DynamoConfiguration dynamoConfiguration;

  @Override
  public void start() throws Exception {
    dynamoConfiguration = new DynamoConfiguration();
    keyvalueRepository = new keyvalueRepository("dev-paybill-key-value",dynamoConfiguration.getDynamoDBEnhancedClient());
    var eventBus = vertx.eventBus();
    eventBus
      .consumer(keyvalueServiceVerticle.GET_LIST_ADDRESS,this::getkeyvalues);

  }

  private <T> void getkeyvalues(Message<T> tMessage) {

    Observable.frompublisher(keyvalueRepository.findAllItems().items())
      .toList()
      .subscribe(tList -> {
        JsonArray jsonArray=new JsonArray(tList);
        tMessage.reply(jsonArray.encodePrettily());
      });
  }
}

解决方法

您确定 Observable.fromPublisher(keyValueRepository.findAllItems().items()) 的发布者是非阻塞发布者吗?

Rxjava2 的默认设置实际上是以阻塞方式处理事件。确保以非阻塞方式处理事件的一种方法是使用 .subscribeOn(RxHelper.scheduler(vertx)) 运算符或使用以下命令将 RxJava2 配置为默认使用 vertx 事件循环而不是标准的 RxJava2 线程:

RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));

为了使这更容易,请尝试以下操作

.end((String) messageAsyncResult.result().body())
// to
.end(messageAsyncResult.result().body().encode())