What is the best practice for naming Kafka topics?

https://cnr.sh/essays/how-paint-bike-shed-kafka-topic-naming-conventions helped us answer that same question. In summary, the article suggests following the same best practices you would use for naming databases and tables, and it adds these points of advice: avoid topic names based on things that change; avoid topic names based on information that would be stored in other places; avoid topic names …

Get last message from Kafka consumer console script

I'm not aware of any built-in way to do this, but this simple two-step approach should work. Note that in my case it was a partitioned topic; you can leave out the partition parameters if yours is unpartitioned.

1) Get the max offset for your topic (and its partitions):

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 …
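To make the first step concrete, here is a minimal Python sketch that parses GetOffsetShell output, which prints one `topic:partition:offset` line per partition. The second step is cut off in the excerpt above, so treating "last message offset = max offset minus one" is my assumption about what it needs, and the function names `parse_offsets` and `last_message_offsets` are made up for illustration.

```python
from typing import Dict


def parse_offsets(output: str) -> Dict[int, int]:
    """Map partition number -> reported max offset from GetOffsetShell output."""
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        # Each line looks like "my_topic:0:35"; split from the right so the
        # partition and offset are always the last two fields.
        _topic, partition, offset = line.rsplit(":", 2)
        offsets[int(partition)] = int(offset)
    return offsets


def last_message_offsets(max_offsets: Dict[int, int]) -> Dict[int, int]:
    """Assumption: the last message sits at (max offset - 1); skip empty partitions."""
    return {p: o - 1 for p, o in max_offsets.items() if o > 0}


if __name__ == "__main__":
    sample = "my_topic:0:35\nmy_topic:1:0\nmy_topic:2:12\n"
    print(last_message_offsets(parse_offsets(sample)))
```

The resulting per-partition offsets are what you would hand to a consumer that can start reading at a given partition and offset. On compacted or transactional topics the record at max offset minus one may not be a regular message, so treat this as a starting point only.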

Does Kafka support priority for topic or message?

Kafka is, by design, a fast, scalable, distributed, partitioned, and replicated commit log service, so it has no notion of priority for topics or messages. I faced the same problem, and the solution is simple. Create separate topics in Kafka, say: high_priority_queue, medium_priority_queue, low_priority_queue. Publish high-priority messages to high_priority_queue and medium-priority messages to …
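The excerpt stops before it reaches the consumer side, so the following is only a sketch of one way a consumer might drain such topics, not the answer's own method. It uses in-memory deques as stand-ins for the three topics (no Kafka client is involved), and `poll_by_priority` and `drain` are hypothetical helper names.

```python
from collections import deque

# Topic names from the answer above, highest priority first.
PRIORITY_ORDER = ["high_priority_queue", "medium_priority_queue", "low_priority_queue"]


def poll_by_priority(topics):
    """Return (topic, message) from the highest-priority non-empty topic, else None."""
    for name in PRIORITY_ORDER:
        queue = topics.get(name)
        if queue:  # an empty deque is falsy
            return name, queue.popleft()
    return None


def drain(topics):
    """Consume everything, always preferring higher-priority topics."""
    consumed = []
    while True:
        item = poll_by_priority(topics)
        if item is None:
            return consumed
        consumed.append(item)


if __name__ == "__main__":
    topics = {
        "high_priority_queue": deque(["h1", "h2"]),
        "medium_priority_queue": deque(["m1"]),
        "low_priority_queue": deque(["l1"]),
    }
    for topic, message in drain(topics):
        print(topic, message)
```

A strict ordering like this can starve the low-priority topic while high-priority traffic keeps arriving, so a real consumer may want a weighted or time-bounded variant.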

How to view Kafka headers

You can use the excellent kafkacat tool. Sample command:

kafkacat -b kafka-broker:9092 -t my_topic_name -C \
  -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

Sample output:

Key (-1 bytes):
  Value (13 bytes): {foo:"bar 5"}
  Timestamp: 1548350164096
  Partition: 0
  Offset: 34
  Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALUE_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Converting byte[] to …
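If you want to post-process that Headers line, a small Python sketch like the one below can split it into key/value pairs. It assumes the comma-separated `key=value` layout seen in the sample output, and `parse_headers` is a hypothetical helper, not part of kafkacat.

```python
from typing import List, Optional, Tuple


def parse_headers(raw: str) -> List[Tuple[str, Optional[str]]]:
    """Split a 'k1=v1,k2=v2' header string into (key, value) pairs.

    A list of pairs is used instead of a dict because Kafka allows the
    same header key to appear more than once.
    """
    headers = []
    for pair in raw.split(","):
        if not pair:
            continue
        # partition() splits on the first '=' only, so values may contain '='.
        key, sep, value = pair.partition("=")
        headers.append((key, value if sep else None))
    return headers


if __name__ == "__main__":
    line = "__connect.errors.topic=test_topic_json,__connect.errors.partition=0"
    for key, value in parse_headers(line):
        print(key, "->", value)
```

This naive split breaks when a header value itself contains a comma, which free-text values such as exception messages easily can, so it is only suitable for quick inspection.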

Kafka schema registry not compatible in the same topic

Fields cannot be renamed in BACKWARD compatibility mode. As a workaround, you can change the compatibility rules for the schema registry. According to the docs: The schema registry server can enforce certain compatibility rules when new schemas are registered in a subject. Currently, we support the following compatibility rules. Backward compatibility (default): A new schema …

How to expose a headless Kafka service for a StatefulSet externally in Kubernetes

We solved this in 1.7 by changing the headless service to Type=NodePort and setting externalTrafficPolicy=Local. This bypasses the internal load balancing of a Service, so traffic sent to a specific node on that node port only works if a Kafka pod is running on that node.

apiVersion: v1
kind: Service
metadata:
  name: broker
…

Adding Custom Headers in Kafka Message

Kafka v0.11.0.0 adds support for custom headers. You can add them when creating a ProducerRecord like this: new ProducerRecord(topic, partition, key, value, headers), where headers is of type Iterable<Header>. For more details see:
https://issues.apache.org/jira/browse/KAFKA-4208
https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers