问题描述
- 我正在使用 spring boot、嵌入式 kafka 和临时编写集成测试用例。我正在尝试发送关于 kafka 主题的消息。
@SpringBoottest(classes = Application.class)
@RunWith(SpringJUnit4ClassRunner.class)
@ActiveProfiles("test")
@DirtiesContext
@EmbeddedKafka(
partitions = 5,controlledShutdown = true,brokerProperties = {
"listeners=PLAINTEXT://localhost:9092","port=9092"
})
public class OutboundFlowIT {
private final Logger logger = LoggerFactory.getLogger(OutboundFlowIT.class);
private TestWorkflowEnvironment testEnv;
private Worker worker;
private WorkflowClient workflowClient;
@Autowired
private ActivityService activityService;
@Autowired
private EventSender sender;
@Before
public void setUp(){
// some setup code.
}
@Test
public void processOutboundFinancialMessage_shouldTriggerAllSteps_WhenOK() throws IOException,InterruptedException {
// logic for sending message to intended topic.
}
- 但我遇到了以下错误。
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369296129 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:105) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.common.network.Selector.poll(Selector.java:485) ~[kafka-clients-2.5.1.jar:na]
at kafka.network.Processor.poll(SocketServer.scala:861) ~[kafka_2.12-2.5.1.jar:na]
at kafka.network.Processor.run(SocketServer.scala:760) ~[kafka_2.12-2.5.1.jar:na]
- 我还在 kafka.properties 中添加了以下配置,但我遇到了与上述相同的问题。
spring.kafka.producer.properties.max.request.size=569296129
spring.kafka.consumer.properties.max.partition.fetch.bytes=369296129
我是 kafka 的新手,请帮助我。
解决方法
你如何将消息发送给 kafka 代理?您应该使用 kafka 协议而不是使用 HTTP 请求,类似问题说明这是错误来自