问题描述
我正在尝试使用反应性mongodb和EventSource进行有效的Spring Boot。 但是,由于服务器已关闭该连接,我面临着重复重新打开该连接的问题。我什至怀疑这是否真的可行,因为我没有找到任何带有响应式数据库和事件源的有效示例。
这是代码的主要部分:
pom.xml
<properties>
<java.version>1.8</java.version>
<junit-jupiter.version>5.3.2</junit-jupiter.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.5.RELEASE</version>
</parent>
<dependencies>
<!-- webflux reactive -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- thymeleaf -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
<!-- exclude junit 4,prefer junit 5 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- junit 5 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
正如您在pom中看到的那样,我正在使用嵌入式tomcat(我已经尝试使用Netty(默认的Spring Boot服务器...)。 另外,我正在将应用程序部署到任何远程服务器上,但只是尝试在本地(Windows 10)上进行。
网络:
let source = new EventSource("/comment/stream");
source.addEventListener("message",function (event) {
// These events are JSON,so parsing and DOM fiddling are needed
var comment = JSON.parse(event.data);
console.log(comment );
});
source.addEventListener("error",function (event) {
console.log("error",event);
this.close();
});
RestController:
@RestController
public class CommentController {
@Autowired
private CommentRepository commentRepository;
@PostMapping(path = "/comment")
public Mono<Comment> comment(@RequestBody Comment comment) {
return this.commentRepository.save(comment);
}
@GetMapping(path = "/comment/stream",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Comment> Feed() {
return this.commentRepository.findAll();
}
}
数据库存储库:
@Repository
public interface CommentRepository extends ReactiveSortingRepository<Comment,String> {
Flux<Comment> findAll();
}
同样,使用EventSource的Web客户端由于连接已被服务器关闭而保持每秒重新连接。
谢谢!
解决方法
我不确定,您是否向我们提供了有关您的连接为何关闭的信息太少。没有日志,也没有透露有关部署位置的任何信息。
我只会根据个人经验回答这个问题。我在Heroku上部署了一个使用事件流的应用程序,并且每个应用程序的前端都有一个代理/负载平衡器,它将终止长达60秒的任何不发送任何内容的连接。
如此处Why are event sources closed after 30-60 sec所述,它证实了我一直在注意的事情。
要解决此问题,可以使用websockets来执行ping / pong消息,或者像我一样使用ServerSentEvents
来实现保持活动的消息。
.GET("",accept(TEXT_EVENT_STREAM),request -> ok()
.contentType(TEXT_EVENT_STREAM)
.header("Cache-Control","no-transform")
.body(Flux.merge(myHandler.getEvents()),Flux.interval(Duration.ofSeconds(15))
.map(aLong -> ServerSentEvent.builder()
.comment("keep alive")
.build())),new ParameterizedTypeReference<List<MyClass>>() {}))
我从我的一个项目中获得了此代码段。在这里,您可以看到我与当前流合并后的磁通量,该磁通量在给定的时间间隔(15秒)内将发出ServerSentEvent
,且仅带有活动注释。由于它是注释,因此它将被客户端忽略。
仅需提及,常规流myHandler.getEvents
返回包裹在ServerSentEvent
中的数据。