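Problem description
I am trying to build a non-blocking API with reactive Spring Boot that stores and retrieves simple messages from a Redis cluster. It is a very simple application. I use the spring-boot-starter-data-redis-reactive dependency for the reactive Redis client and blockhound to detect blocking calls. The application ran fine at first, but as soon as I install BlockHound to detect blocking calls, it throws an exception.
Here is my pom.xml: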
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.tigerit.example</groupId>
    <artifactId>reactivespring</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>reactivespring</name>
    <description>Demo project for Reactive Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>javax.xml.bind</groupId>
            <artifactId>jaxb-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.tools</groupId>
            <artifactId>blockhound</artifactId>
            <version>1.0.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-tools</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Here is my bean configuration class RedisConfiguration:
package com.tigerit.example.reactivespring.configuration;

import com.tigerit.example.reactivespring.model.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.List;

@Configuration
public class RedisConfiguration {

    // Cluster node addresses, read from the application properties.
    @Value("${spring.redis.cluster.nodes}")
    private List<String> redisClusterNodes;

    @Bean
    public RedisClusterConfiguration getRedisClusterConfiguration() {
        return new RedisClusterConfiguration(this.redisClusterNodes);
    }

    // Lettuce-backed connection factory for the cluster.
    @Bean
    @Primary
    public ReactiveRedisConnectionFactory getReactiveRedisConnectionFactory(@Autowired RedisClusterConfiguration configuration) {
        return new LettuceConnectionFactory(configuration);
    }

    // Template with String keys and JSON-serialized User values.
    @Bean
    @Primary
    public ReactiveRedisTemplate<String, User> getReactiveRedisTemplate(@Autowired ReactiveRedisConnectionFactory factory) {
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<User> valueSerializer = new Jackson2JsonRedisSerializer<>(User.class);
        RedisSerializationContext.RedisSerializationContextBuilder<String, User> contextBuilder =
                RedisSerializationContext.newSerializationContext(keySerializer);
        RedisSerializationContext<String, User> context = contextBuilder.value(valueSerializer).build();
        return new ReactiveRedisTemplate<>(factory, context);
    }
}
Here is my REST controller class UserController:
package com.tigerit.example.reactivespring.restcontroller;

import com.tigerit.example.reactivespring.model.User;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveValueOperations;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/rest/user")
@RequiredArgsConstructor
@Log4j2
public class UserController {

    private final ReactiveRedisTemplate<String, User> redisTemplate;

    // Stores a new User keyed by its id (the creation timestamp).
    @PostMapping("/add")
    public Mono<Boolean> saveUser(@RequestParam(value = "message", defaultValue = "EMPTY_MESSAGE") String message) {
        User user = new User(System.currentTimeMillis(), message);
        log.info(user);
        ReactiveValueOperations<String, User> operations = this.redisTemplate.opsForValue();
        return operations.set(user.getId().toString(), user);
    }

    // Looks up a User by id.
    @GetMapping("/{id}")
    public Mono<User> getById(@PathVariable(value = "id", required = true) String id) {
        ReactiveValueOperations<String, User> operations = this.redisTemplate.opsForValue();
        return operations.get(id);
    }
}
Here is my main class Main:
package com.tigerit.example.reactivespring;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.blockhound.BlockHound;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        BlockHound.install();
        SpringApplication.run(Main.class, args);
    }
}
Note: the application works when I run it without installing BlockHound, but when I install BlockHound in Main it throws the following exception.
reactor.blockhound.BlockingOperationError: Blocking call! jdk.internal.misc.Unsafe#park
at java.base/jdk.internal.misc.Unsafe.park(Unsafe.java) ~[na:na]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxUsingWhen] :
reactor.core.publisher.Flux.usingWhen
org.springframework.data.redis.core.ReactiveRedisTemplate.doInConnection(ReactiveRedisTemplate.java:195)
Error has been observed at the following site(s):
|_ Flux.usingWhen ⇢ at org.springframework.data.redis.core.ReactiveRedisTemplate.doInConnection(ReactiveRedisTemplate.java:195)
|_ Mono.from ⇢ at org.springframework.data.redis.core.ReactiveRedisTemplate.createMono(ReactiveRedisTemplate.java:179)
|_ ⇢ at org.springframework.data.redis.core.DefaultReactiveValueOperations.createMono(DefaultReactiveValueOperations.java:349)
|_ ⇢ at org.springframework.data.redis.core.DefaultReactiveValueOperations.get(DefaultReactiveValueOperations.java:171)
|_ ⇢ at com.tigerit.example.reactivespring.restcontroller.UserController.getById(UserController.java:31)
|_ Mono.from ⇢ at org.springframework.http.codec.json.AbstractJackson2Encoder.encode(AbstractJackson2Encoder.java:136)
|_ Mono.map ⇢ at org.springframework.http.codec.json.AbstractJackson2Encoder.encode(AbstractJackson2Encoder.java:137)
|_ Mono.flux ⇢ at org.springframework.http.codec.json.AbstractJackson2Encoder.encode(AbstractJackson2Encoder.java:138)
|_ Flux.singleOrEmpty ⇢ at org.springframework.http.codec.EncoderHttpMessageWriter.write(EncoderHttpMessageWriter.java:122)
|_ Mono.switchIfEmpty ⇢ at org.springframework.http.codec.EncoderHttpMessageWriter.write(EncoderHttpMessageWriter.java:123)
|_ Mono.flatMap ⇢ at org.springframework.http.codec.EncoderHttpMessageWriter.write(EncoderHttpMessageWriter.java:127)
|_ ⇢ at org.springframework.http.codec.EncoderHttpMessageWriter.write(EncoderHttpMessageWriter.java:203)
|_ ⇢ at org.springframework.web.reactive.result.method.annotation.AbstractMessageWriterResultHandler.writeBody(AbstractMessageWriterResultHandler.java:107)
|_ ⇢ at org.springframework.web.reactive.result.method.annotation.ResponseBodyResultHandler.handleResult(ResponseBodyResultHandler.java:86)
|_ checkpoint ⇢ Handler com.tigerit.example.reactivespring.restcontroller.UserController#getById(String) [dispatcherHandler]
|_ ⇢ at org.springframework.web.server.handler.DefaultWebFilterChain.lambda$filter$0(DefaultWebFilterChain.java:120)
|_ Mono.defer ⇢ at org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119)
|_ ⇢ at org.springframework.web.server.handler.FilteringWebHandler.handle(FilteringWebHandler.java:59)
|_ ⇢ at org.springframework.web.server.handler.WebHandlerDecorator.handle(WebHandlerDecorator.java:56)
|_ Mono.error ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler$CheckpointInsertingHandler.handle(ExceptionHandlingWebHandler.java:98)
|_ checkpoint ⇢ HTTP GET "/rest/user/add" [ExceptionHandlingWebHandler]
|_ ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.lambda$handle$0(ExceptionHandlingWebHandler.java:77)
|_ Mono.onErrorResume ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:77)
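As background on the first line of the trace: jdk.internal.misc.Unsafe#park is the JDK primitive behind LockSupport.park, which is what a thread ends up calling when it waits on a lock or an incomplete future. The JDK-only sketch below only illustrates that kind of wait. It does not use BlockHound, Lettuce, or Spring, and the names ParkDemo and stateWhileWaiting are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class ParkDemo {

    /**
     * Starts a thread that waits on an incomplete future and returns the
     * state that thread is observed in while it waits.
     */
    static Thread.State stateWhileWaiting() {
        CompletableFuture<String> pending = new CompletableFuture<>();

        // join() parks the calling thread until the future completes.
        Thread waiter = new Thread(() -> pending.join(), "waiter");
        waiter.setDaemon(true);
        waiter.start();

        try {
            // Poll for at most 5 seconds until the waiter has parked.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (waiter.getState() != Thread.State.WAITING
                    && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return waiter.getState();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while observing the waiter", e);
        } finally {
            pending.complete("done"); // release the waiter
        }
    }

    public static void main(String[] args) {
        System.out.println("state while waiting: " + stateWhileWaiting());
    }
}
```

BlockHound reports a wait like this when it happens on a thread that is marked as non-blocking, which is why the error names Unsafe#park rather than any Redis class.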
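Solution
Try this. It registers a BlockHound integration that allows the blocking call made inside Lettuce's RedisHandshakeHandler#channelRegistered, and it is the full solution. :D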
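import com.google.auto.service.AutoService;
import reactor.blockhound.BlockHound;
import reactor.blockhound.integration.BlockHoundIntegration;

// @AutoService (Google auto-service) generates the META-INF/services entry BlockHound uses to find this class.
@AutoService(BlockHoundIntegration.class)
public class BlockHoundConfig implements BlockHoundIntegration {
    @Override
    public void applyTo(BlockHound.Builder builder) {
        // Allow blocking calls made inside Lettuce's handshake handler.
        builder.allowBlockingCallsInside("io.lettuce.core.protocol.RedisHandshakeHandler", "channelRegistered");
    }
}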
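If you would rather not add the auto-service dependency, the same allowance can probably be passed inline when installing BlockHound. This is only a sketch of the question's Main class, assuming that BlockHound.install accepts BlockHoundIntegration instances and that the interface can be written as a lambda in the BlockHound version in use. It needs Spring Boot and BlockHound on the classpath.

```java
package com.tigerit.example.reactivespring;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.blockhound.BlockHound;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        // Same allowance as BlockHoundConfig, passed directly to install()
        // instead of being discovered through ServiceLoader.
        BlockHound.install(builder -> builder.allowBlockingCallsInside(
                "io.lettuce.core.protocol.RedisHandshakeHandler", "channelRegistered"));
        SpringApplication.run(Main.class, args);
    }
}
```

The inline form keeps the allowance next to the install call. The @AutoService class is the better fit when several modules each contribute their own integrations.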