Kafka is a fast, scalable, distributed in nature by its design, partitioned and replicated commit log service.So there is no priority on topic or message.
I also faced same problem that you have.Solution is very simple.Create topics in kafka queue,Let say:
-
high_priority_queue
-
medium_priority_queue
-
low_priority_queue
Publish high priority message in high_priority_queue and medium priority message in medium_priority_queue.
Now you can create kafka consumer and open stream for all topic.
// this is scala code
val props = new Properties()
props.put("group.id", groupId)
props.put("zookeeper.connect", zookeeperConnect)
val config = new ConsumerConfig(props)
val connector = Consumer.create(config)
val topicWithStreamCount = Map(
"high_priority_queue" -> 1,
"medium_priority_queue" -> 1,
"low_priority_queue" -> 1
)
val streamsMap = connector.createMessageStreams(topicWithStreamCount)
You get stream of each topic.Now you can first read high_priority topic if topic does not have any message then fallback on medium_priority_queue topic. if medium_priority_queue is empty then read low_priority queue.
This trick is working fine for me.May be helpful for you!!.