在Spring Boot App中使用Redis Stream通过HTTP长轮询来阻止HTTP响应

问题描述

我有一个Spring Boot Web应用程序,该应用程序具有更新名为StudioLinking的实体的功能。此实体描述了两个IoT设备之间的临时,可变,描述性逻辑链接,我的Web应用程序是它们的云服务。这些设备之间的链接本质上是短暂的,但是StudioLinking实体保留在数据库中以用于报告。 StudioLinking使用Spring Data / Hibernate以常规方式存储到基于SQL的数据存储中。该StudioLinking实体会不时地使用来自Rest API的新信息进行更新。链接更新后,设备需要响应(更改颜色,音量等)。现在,通过每5秒轮询一次来进行处理,但这会造成人类用户向系统中输入更新以及IoT设备实际更新时的延迟。可能只有一毫秒或最多5秒!显然,增加轮询的频率是不可持续的,并且在绝大多数情况下根本没有更新!

因此,我正在尝试使用HTTP Long Polling在同一应用程序上开发另一个Rest API,该API将在给定StudioLinking实体更新或超时后返回。侦听器不支持WebSocket或类似功能,使我无法进行Long Polling。长轮询可能会导致竞争状态,在这种情况下,您必须考虑以下可能性:对于连续消息,一条消息可能会在HTTP请求之间进入时“丢失”(在关闭和打开连接时,可能会出现新的“更新”如果我使用发布/订阅,则不会被“注意到”。

请注意,此“订阅更新” API仅应返回StudioLinking的最新和最新版本,但仅当存在实际更新或自最后签到。 “订阅更新”客户端将首先发布API请求,以设置新的侦听会话并将其传递,以便服务器知道它们是谁。因为可能有多个设备需要监视对同一StudioLinking实体的更新。我相信我可以通过在Redis XREAD中使用单独命名的使用者来完成此操作。 (请在以后的问题中记住这一点)

经过数小时的研究,我相信完成此操作的方法是使用redis流。

我在Spring Data Redis中找到了有关Redis流的以下两个链接:

https://www.vinsguru.com/redis-reactive-stream-real-time-producing-consuming-streams-with-spring-boot/ https://medium.com/@amitptl.in/redis-stream-in-action-using-java-and-spring-data-redis-a73257f9a281

我也已经阅读了有关长轮询的链接,这两个链接在长轮询期间都只有一个睡眠计时器,用于演示,但显然我想做些有用的事情。

https://www.baeldung.com/spring-deferred-result

这两个链接都非常有用。现在,我可以毫无疑问地确定如何将更新发布到Redis流-(这是未经测试的“伪代码”,但我预计实现此目标不会有任何问题)

// In my StudioLinking Entity

@PostUpdate
public void postToRedis() {
    StudioLinking link = this;
    ObjectRecord<String,StudioLinking> record = StreamRecords.newRecord()
            .ofObject(link)
            .withStreamKey(streamKey); //I am creating a stream for each individual linking probably?
    this.redisTemplate
            .opsForStream()
            .add(record)
            .subscribe(System.out::println);
    atomicInteger.incrementAndGet();
}

但是,在订阅上述流时,我会很沮丧:所以基本上我想在这里做-请原谅伪造的伪代码,这仅出于想法目的。我很清楚,该代码绝不表示该语言和框架的实际行为:)

// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
// updateList is a unique token to track individual consumers in Redis
@GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
public DeferredResult<ResponseEntity<?>> subscribeToUpdates(@PathVariable("linkId") Integer linkId,@PathVariable("updatesId") Integer updatesId) {
    LOG.info("Received async-deferredresult request");
    DeferredResult<ResponseEntity<?>> output = new DeferredResult<>(5000l);

    deferredResult.onTimeout(() -> 
      deferredResult.setErrorResult(
        ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
          .body("IT WAS NOT UPDATED!")));
    
    ForkJoinPool.commonPool().submit(() -> {
        //----------------------------------------------
        // Made up stuff... here is where I want to subscribe to a stream and block!
        //----------------------------------------------
        LOG.info("Processing in separate thread");
        try {
            // Subscribe to Redis Stream,get any updates that happened between long-polls
            // then block until/if a new message comes over the stream
            var subscription = listenerContainer.receiveAutoAck(
                Consumer.from(studioLinkingID,updateList),StreamOffset.create(studioLinkingID,ReadOffset.lastConsumed()),streamListener);
            listenerContainer.start();
        } catch (InterruptedException e) {
        }
        output.setResult("IT WAS UPDATED!");
    });
    
    LOG.info("servlet thread freed");
    return output;
}

那么我将如何解决这个问题呢?我认为答案就在https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/ReactiveRedisTemplate.html之内,但是我对Spring的熟练程度还不足以真正理解Java Docs中的术语(Spring文档确实很好,但是JavaDocs是用密集的技术语言编写的,对此我深表感谢。但还不太了解)。

我的实施还有两个障碍:

  1. 我对弹簧的确切了解还不是100%。我还没有到达那一刻,我真的完全理解为什么所有这些bean都在漂浮。我认为这是为什么我不能在这里得到东西的关键... Redis的配置在Spring以太坊中浮动,我不了解如何称呼它。我真的需要继续对此进行调查(这对我来说是一个巨大的障碍)。
  2. 这些StudioLinking寿命很短,因此我也需要进行一些清理。一旦我将整个事情付诸实践,我将稍后实施,我确实知道将需要这样做。

解决方法

为什么不使用阻塞式轮询机制?无需使用spring-data-redis的高级工具。只需使用5秒钟的简单阻塞读取,那么此调用可能需要6秒钟左右的时间。您可以减少或增加阻止超时。

class LinkStatus {
    private final boolean updated;

    LinkStatus(boolean updated) {
      this.updated = updated;
    }
  }



// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
  // updateList is a unique token to track individual consumers in Redis
  @GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
  public LinkStatus subscribeToUpdates(
      @PathVariable("linkId") Integer linkId,@PathVariable("updatesId") Integer updatesId) {
    StreamOperations<String,String,String> op = redisTemplate.opsForStream();
    
    Consumer consumer = Consumer.from("test-group","test-consumer");
    // auto ack block stream read with size 1 with timeout of 5 seconds
    StreamReadOptions readOptions = StreamReadOptions.empty().block(Duration.ofSeconds(5)).count(1);
    List<MapRecord<String,String>> records =
        op.read(consumer,readOptions,StreamOffset.latest("test-stream"));
    return new LinkStatus(!CollectionUtils.isEmpty(records));
  }

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...