How to shift back the offset of a topic within a stable Kafka consumer group?

The reset-offsets option for kafka-consumer-groups.sh first checks to see if a consumer is active in that group before attempting to shift back your offsets. ‘Stable’ means you have an active consumer running. Use the --describe option to check on your consumer groups:

    bin/kafka-consumer-groups.sh --bootstrap-server $SERVERS --group $GROUP --describe

If you see an entry under ‘CONSUMER-ID/HOST/CLIENT-ID’ … Read more
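As a rough illustration of the check-then-shift workflow, the sketch below only builds the argument lists for kafka-consumer-groups.sh; it does not run anything. The function names, the script path, and the example group and topic are hypothetical. It assumes the tool's --reset-offsets, --shift-by, --dry-run, and --execute options, where a negative --shift-by value moves offsets back.

```python
def describe_group_cmd(servers, group, script="bin/kafka-consumer-groups.sh"):
    """Build the argument list that describes a consumer group."""
    return [script, "--bootstrap-server", servers, "--group", group, "--describe"]


def shift_offsets_cmd(servers, group, topic, shift_by, execute=False,
                      script="bin/kafka-consumer-groups.sh"):
    """Build the argument list that shifts a group's offsets for one topic.

    A negative shift_by moves the offsets back. Unless execute is True,
    --dry-run is passed so the tool only reports the planned offsets.
    """
    cmd = [script, "--bootstrap-server", servers, "--group", group,
           "--topic", topic, "--reset-offsets", "--shift-by", str(shift_by)]
    cmd.append("--execute" if execute else "--dry-run")
    return cmd


if __name__ == "__main__":
    # Hypothetical server, group, and topic names, for illustration only.
    print(" ".join(describe_group_cmd("localhost:9092", "my-group")))
    print(" ".join(shift_offsets_cmd("localhost:9092", "my-group", "my-topic", -100)))
```

The lists can be handed to subprocess.run once the group is no longer active; the dry run is the default here so that a reset is never applied by accident.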

Kafka producer TimeoutException: Expiring 1 record(s)

There are 3 possibilities:

1) Increase request.timeout.ms: this is the time that Kafka will wait for the whole batch to be ready in the buffer. So in your case, if there are fewer than 100 000 messages in the buffer, a timeout will occur. More info here: https://stackoverflow.com/a/34794261/2707179
2) Decrease batch-size: related to the previous point, it will send batches … Read more
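A minimal sketch of the first two options as producer settings follows. It assumes the kafka-python keyword names request_timeout_ms and batch_size, and it assumes that "batch-size" above refers to the producer's batch.size setting. The helper name and the numeric values are purely illustrative, not recommendations.

```python
def producer_overrides(request_timeout_ms=60000, batch_size=8192):
    """Return keyword overrides for a producer (kafka-python style names).

    The default values here are placeholders for illustration only.
    """
    if request_timeout_ms <= 0:
        raise ValueError("request_timeout_ms must be positive")
    if batch_size < 0:
        raise ValueError("batch_size must not be negative")
    return {
        "request_timeout_ms": request_timeout_ms,  # wait longer before expiring
        "batch_size": batch_size,                  # send smaller batches sooner
    }


if __name__ == "__main__":
    print(producer_overrides())
```

With kafka-python installed, the dictionary could be passed as KafkaProducer(bootstrap_servers=..., **producer_overrides()).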

Get last message from kafka consumer console script

I’m not aware of any automated way, but this simple two-step approach should work. Note that in my case it was a partitioned topic; you can leave out the partition parameters if you have an unpartitioned one: 1) Get the max offset for your topic (and its partitions): bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 … Read more
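The idea can be sketched in Python under two assumptions. First, GetOffsetShell is assumed to print one topic:partition:offset line per partition. Second, since the second step is cut off above, the follow-up is assumed to be a kafka-console-consumer.sh call that reads a single message starting at the last offset. All function names are hypothetical.

```python
def parse_get_offset_shell(output):
    """Parse 'topic:partition:offset' lines into {partition: end_offset}."""
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        _topic, partition, offset = line.rsplit(":", 2)
        offsets[int(partition)] = int(offset)
    return offsets


def last_message_offsets(end_offsets):
    """Map each partition to the offset of its last message.

    The end offset points one past the last message. None is returned when
    the end offset is 0; partitions emptied by retention are not detected.
    """
    return {p: (o - 1 if o > 0 else None) for p, o in end_offsets.items()}


def console_consumer_cmd(servers, topic, partition, offset,
                         script="bin/kafka-console-consumer.sh"):
    """Build an argument list that reads exactly one message at an offset."""
    return [script, "--bootstrap-server", servers, "--topic", topic,
            "--partition", str(partition), "--offset", str(offset),
            "--max-messages", "1"]


if __name__ == "__main__":
    sample = "test:0:42\ntest:1:0\n"  # made-up tool output
    print(last_message_offsets(parse_get_offset_shell(sample)))
```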

Kafka console consumer ERROR “Offset commit failed on partition”

Increasing max.poll.interval.ms says “it’s OK to spend time processing a large batch of records”, and you’ll gain throughput if you can process larger batches more efficiently than smaller ones. Decreasing max.poll.records says “take fewer records so there’s enough time to process them” and favors latency over throughput. Also consider that … Read more
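The trade-off can be put into rough numbers. The sketch below assumes a fixed average processing time per record and a hypothetical headroom factor; both are simplifications, and the function names are made up for illustration.

```python
def batch_processing_ms(max_poll_records, per_record_ms):
    """Worst-case time to process one full poll() batch."""
    return max_poll_records * per_record_ms


def max_safe_poll_records(max_poll_interval_ms, per_record_ms, headroom=0.5):
    """Largest max.poll.records that fits in a share of max.poll.interval.ms.

    headroom is the fraction of the interval we allow processing to use.
    """
    if per_record_ms <= 0:
        raise ValueError("per_record_ms must be positive")
    budget_ms = max_poll_interval_ms * headroom
    return max(1, int(budget_ms // per_record_ms))


if __name__ == "__main__":
    # 500 records at 100 ms each need 50 000 ms between two poll() calls.
    print(batch_processing_ms(500, 100))
    # With a 300 000 ms interval and half of it as budget, 1500 records fit.
    print(max_safe_poll_records(300000, 100))
```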

How to get latest offset for a partition for a kafka topic?

Finally, after spending a day on this and several false starts, I was able to find a solution and get it working. Posting it here so that others may refer to it.

    from kafka import SimpleClient
    from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
    from kafka.common import OffsetRequestPayload

    client = SimpleClient(brokers)
    partitions = client.topic_partitions[topic]
    offset_requests = [OffsetRequestPayload(topic, p, … Read more

Docker Kafka w/ Python consumer

Your problem is the networking. In your Kafka config you’re setting KAFKA_ADVERTISED_HOST_NAME: localhost, but this means that any client (including your Python app) will connect to the broker and then be told by the broker to use localhost for any connections. Since localhost from your client machine (e.g. your Python container) is not where the … Read more

Kafka and firewall rules

Kafka and ZooKeeper are different things. If you are running both on the same machine, you need to open the ports for both, of course.

Kafka default port: 9092 (can be changed in server.properties).
ZooKeeper default ports: 2181 for client connections; 2888 for follower (other ZooKeeper nodes) connections; 3888 for leader election between nodes.

That’s it. Kafka also has … Read more
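To confirm that a firewall actually lets these ports through, a simple TCP connect test is often enough. The sketch below uses only Python's standard socket module. The port table mirrors the defaults listed above, and the host name in the usage section is a placeholder. A closed result can also mean that nothing is listening, not only that a firewall blocks the port.

```python
import socket

# Default ports as listed above; adjust if your configuration differs.
DEFAULT_PORTS = {
    "kafka broker": 9092,
    "zookeeper client": 2181,
    "zookeeper follower": 2888,
    "zookeeper election": 3888,
}


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_ports(host, ports=DEFAULT_PORTS, timeout=2.0):
    """Return {name: reachable} for each named port on the host."""
    return {name: port_is_open(host, port, timeout)
            for name, port in ports.items()}


if __name__ == "__main__":
    # "localhost" is a placeholder; use the broker or ZooKeeper host here.
    for name, reachable in check_ports("localhost").items():
        print(f"{name}: {'open' if reachable else 'closed or filtered'}")
```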

Kafka10.1 heartbeat.interval.ms, session.timeout.ms and max.poll.interval.ms

This assumes we are talking about Kafka 0.10.1.0 or later, where each consumer instance uses two threads. One is the user thread, from which poll is called; the other is the heartbeat thread, which takes care of sending heartbeats. session.timeout.ms applies to the heartbeat thread. If the coordinator fails to get any heartbeat from a consumer before … Read more
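A small sanity check for the two heartbeat-related settings might look like the sketch below. It relies on the guideline from Kafka's configuration documentation that heartbeat.interval.ms must be lower than session.timeout.ms and is typically set to no more than one third of it. The function name and the example values are hypothetical.

```python
def check_heartbeat_settings(heartbeat_interval_ms, session_timeout_ms):
    """Return a list of warnings about the heartbeat and session settings."""
    warnings = []
    if heartbeat_interval_ms >= session_timeout_ms:
        warnings.append(
            "heartbeat.interval.ms must be lower than session.timeout.ms")
    elif heartbeat_interval_ms * 3 > session_timeout_ms:
        warnings.append(
            "heartbeat.interval.ms is usually kept at or below one third "
            "of session.timeout.ms")
    return warnings


if __name__ == "__main__":
    # Example values only: 3 s heartbeats against a 10 s session timeout.
    print(check_heartbeat_settings(3000, 10000))
    print(check_heartbeat_settings(5000, 10000))
```

The one-third rule leaves room for a couple of missed heartbeats before the coordinator considers the consumer dead.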