Kafka producer TimeoutException: Expiring 1 record(s)

There are three possibilities:

Increase request.timeout.ms: this is the time that Kafka will wait for the whole batch to be ready in the buffer. So in your case, if there are fewer than 100,000 messages in the buffer, a timeout will occur. More info here: https://stackoverflow.com/a/34794261/2707179

Decrease batch-size: related to the previous point, it will send batches …
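As a rough sketch of the first two options, the settings can be written as plain producer properties. The keys request.timeout.ms and batch.size are the standard Kafka producer configuration names; the class name and the concrete values below are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

// Hypothetical helper: collects producer settings for the two options named above.
final class ProducerTimeoutSketch {

    static Properties producerProps() {
        Properties props = new Properties();
        // Option 1: allow more time before a request is treated as timed out.
        props.setProperty("request.timeout.ms", "60000"); // assumed value, in milliseconds
        // Option 2: use smaller batches (batch.size is measured in bytes).
        props.setProperty("batch.size", "8192"); // assumed value
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting settings so they can be checked by eye.
        producerProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real application these properties would be passed to the producer together with the bootstrap servers and serializers, which are omitted here.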

How to acknowledge current offset in spring kafka for manual commit

Set the enable-auto-commit property to false:

    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Set the ack-mode to MANUAL_IMMEDIATE:

    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

Then, in your consumer/listener code, you can commit the offset manually, like this:

    @KafkaListener(topics = "testKafka")
    public void receive(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment) {
        System.out.println("Received message: ");
        System.out.println(consumerRecord.value().toString());
        acknowledgment.acknowledge();
    }

Update: I created a small POC for this. Check it out …

Spring Kafka The class is not in the trusted packages

See the documentation. Starting with version 2.1, type information can be conveyed in record Headers, allowing the handling of multiple types. In addition, the serializer/deserializer can be configured using Kafka properties.

JsonSerializer.ADD_TYPE_INFO_HEADERS (default true); set to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).

JsonDeserializer.KEY_DEFAULT_TYPE; fallback type for deserialization of keys …
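A minimal sketch of how such properties might be assembled, using the string keys behind the Spring Kafka constants so that the example compiles without the library. The trusted-packages key corresponds to JsonDeserializer.TRUSTED_PACKAGES, which the excerpt above does not reach but the question title refers to; the class name, the package com.example.events and the chosen key type are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds property maps for the JSON serializer and deserializer.
final class JsonSerdeConfigSketch {

    static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        // String value of JsonSerializer.ADD_TYPE_INFO_HEADERS: stop adding type headers.
        props.put("spring.json.add.type.headers", false);
        return props;
    }

    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        // String value of JsonDeserializer.KEY_DEFAULT_TYPE: fallback type for keys.
        props.put("spring.json.key.default.type", "java.lang.String"); // assumed key type
        // String value of JsonDeserializer.TRUSTED_PACKAGES: packages allowed for deserialization.
        props.put("spring.json.trusted.packages", "com.example.events"); // hypothetical package
        return props;
    }

    public static void main(String[] args) {
        System.out.println("producer: " + producerProps());
        System.out.println("consumer: " + consumerProps());
    }
}
```

These maps would be merged into the producer and consumer factory configuration alongside the usual broker and serializer settings.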

Simple embedded Kafka test example with spring boot

Embedded Kafka tests work for me with the configs below.

Annotation on the test class:

    @EnableKafka
    @SpringBootTest(classes = {KafkaController.class}) // Specify the @KafkaListener class if it's not the same class, or not loaded with the test config
    @EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false,
        brokerProperties = {
            "listeners=PLAINTEXT://localhost:3333",
            "port=3333"
        })
    public class KafkaConsumerTest {

        @Autowired
        KafkaEmbedded kafkaEmbeded;

        @Autowired
        KafkaListenerEndpointRegistry …

Spring-Kafka vs. Spring-Cloud-Stream (Kafka)

Spring Cloud Stream with the Kafka binder relies on Spring Kafka, so the former supports everything the latter does, but it is also more heavyweight. Below are some points to help you make the choice: If you might replace Kafka with another message middleware in the future, then Spring Cloud Stream should be your choice since …

Synchronising transactions between database and Kafka producer

I’d suggest using a slightly altered variant of approach 2. Write into your database only, but in addition to the actual table writes, also write “events” into a special table within that same database; these event records would contain the aggregations you need. In the simplest case, you’d just insert another entity, e.g. mapped …
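The write side of this idea can be sketched with in-memory lists standing in for two tables of the same database. This only illustrates that the business row and its event record are committed together or not at all. The class, the "orders" table and the "OrderCreated" event name are hypothetical, and a real implementation would rely on a database transaction rather than list copies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one database holding two tables.
final class EventTableSketch {
    private final List<String> orders = new ArrayList<>();
    private final List<String> events = new ArrayList<>();

    // Stages the business row and its event record, then commits both together.
    // If a failure occurs before the commit, neither table changes.
    void saveOrder(String order, boolean failBeforeCommit) {
        List<String> stagedOrders = new ArrayList<>(orders);
        List<String> stagedEvents = new ArrayList<>(events);
        stagedOrders.add(order);
        stagedEvents.add("OrderCreated:" + order);
        if (failBeforeCommit) {
            throw new IllegalStateException("simulated failure, nothing committed");
        }
        // "Commit": make both staged writes visible at once.
        orders.clear();
        orders.addAll(stagedOrders);
        events.clear();
        events.addAll(stagedEvents);
    }

    List<String> orders() { return new ArrayList<>(orders); }

    List<String> events() { return new ArrayList<>(events); }

    public static void main(String[] args) {
        EventTableSketch db = new EventTableSketch();
        db.saveOrder("order-1", false);
        try {
            db.saveOrder("order-2", true);
        } catch (IllegalStateException e) {
            System.out.println("rolled back: " + e.getMessage());
        }
        System.out.println("orders=" + db.orders() + " events=" + db.events());
    }
}
```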