文章目录
- Producer factory does not support transactions
- Must set retries to non-zero when using the idempotent producer.
- Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
- No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
- a KafkaTemplate is required to support replies
Producer factory does not support transactions
java.lang.IllegalStateException: Producer factory does not support transactions
at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.15.jar:5.3.15]
at org.springframework.kafka.core.KafkaTemplate.executeInTransaction(KafkaTemplate.java:483) ~[spring-kafka-2.8.2.jar:2.8.2]
at site.sunlong.kafkapractice.controller.KafkaController.sendTransactionMsg(KafkaController.java:53) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_152]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_152]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_152]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_152]
at org.springframework.web.method.support.invocableHandlerMethod.doInvoke(invocableHandlerMethod.java:205) ~[spring-web-5.3.15.jar:5.3.15]
at org.springframework.web.method.support.invocableHandlerMethod.invokeForRequest(invocableHandlerMethod.java:150) ~[spring-web-5.3.15.jar:5.3.15]
at org.springframework.web.servlet.mvc.method.annotation.ServletinvocableHandlerMethod.invokeAndHandle(ServletinvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.15.jar:5.3.15]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.15.jar:5.3.15]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.15.jar:5.3.15]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.15.jar:5.3.15]
at org.springframework.web.servlet.dispatcherServlet.dodispatch(dispatcherServlet.java:1067) ~[spring-webmvc-5.3.15.jar:5.3.15]
at org.springframework.web.servlet.dispatcherServlet.doService(dispatcherServlet.java:963) ~[spring-webmvc-5.3.15.jar:5.3.15]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.15.jar:5.3.15]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.15.jar:5.3.15]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:655) ~[tomcat-embed-core-9.0.56.jar:4.0.FR]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.15.jar:5.3.15]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:764) ~[tomcat-embed-core-9.0.56.jar:4.0.FR]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.56.jar:9.0.56]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.15.jar:5.3.15]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.15.jar:5.3.15]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.15.jar:5.3.15]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.15.jar:5.3.15]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.15.jar:5.3.15]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.15.jar:5.3.15]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.StandardContextValve.__invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:41002) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:895) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1732) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.tomcat.util.net.socketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) [tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.56.jar:9.0.56]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
异常原因
使用kafka事务发送消息的时候没有开启事务
解决方案
加事务前缀,自动给producer开启事务,所有加
# 事务前缀
spring.kafka.producer.transaction-id-prefix=tx_
Must set retries to non-zero when using the idempotent producer.
org.apache.kafka.common.config.ConfigException: Must set retries to non-zero when using the idempotent producer.
解决方案
# 重试次数
spring.kafka.producer.retries=3
Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
at org.apache.kafka.clients.producer.ProducerConfig.maybeOverrideAcksAndRetries(ProducerConfig.java:459) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.producer.ProducerConfig.postProcessparsedConfig(ProducerConfig.java:420) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:114) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:490) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:290) ~[kafka-clients-3.0.0.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createrawProducer(DefaultKafkaProducerFactory.java:863) ~[spring-kafka-2.8.2.jar:2.8.2]
解决方案
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=-1
No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.15.jar:5.3.15]
at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:726) ~[spring-kafka-2.8.2.jar:2.8.2]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:638) ~[spring-kafka-2.8.2.jar:2.8.2]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:403) ~[spring-kafka-2.8.2.jar:2.8.2]
解决方案
在方法上加@Transactional注解
@Transactional
@GetMapping("/transaction")
public void sendTransactionMsg(@RequestParam String userId){
String msg = " transaction "+userId;
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {
@Override
public Object doInoperations(KafkaOperations<String, Object> kafkaOperations) {
kafkaOperations.send(KafkaTopicConstants.TEST_TRANSACTION , msg);
// throw new RuntimeException("test transaction");
return "";
}
});
}
a KafkaTemplate is required to support replies
启动报错
java.lang.IllegalStateException: a KafkaTemplate is required to support replies
at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.15.jar:5.3.15]
at org.springframework.kafka.config.MethodKafkaListenerEndpoint.lambda$createMessageListener$1(MethodKafkaListenerEndpoint.java:180) ~[spring-kafka-2.8.2.jar:2.8.2]
at org.springframework.kafka.support.JavaUtils.acceptIfNotNull(JavaUtils.java:71) ~[spring-kafka-2.8.2.jar:2.8.2]
at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListener(MethodKafkaListenerEndpoint.java:179) ~[spring-kafka-2.8.2.jar:2.8.2]
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupMessageListener(AbstractKafkaListenerEndpoint.java:517) ~[spring-kafka-2.8.2.jar:2.8.2]
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupListenerContainer(AbstractKafkaListenerEndpoint.java:500) ~[spring-kafka-2.8.2.jar:2.8.2]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:384) ~[spring-kafka-2.8.2.jar:2.8.2]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:69) ~[spring-kafka-2.8.2.jar:2.8.2]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:228) ~[spring-kafka-2.8.2.jar:2.8
解决方案
ConcurrentKafkaListenerContainerFactory 没有设置replyTemplate
ConcurrentKafkaListenerContainerFactory containerFactory = new ConcurrentKafkaListenerContainerFactory();
containerFactory.setReplyTemplate(kafkaTemplate);