Integrating Spark Structured Streaming with the Confluent Schema Registry

It took me a couple months of reading source code and testing things out. In a nutshell, Spark can only handle String and Binary serialization. You must manually deserialize the data. In spark, create the confluent rest service object to get the schema. Convert the schema string in the response object into an Avro schema … Read more

Kafka producer TimeoutException: Expiring 1 record(s)

There are 3 possibilities: Increase request.timeout.ms – this is the time that Kafka will wait for whole batch to be ready in buffer. So in your case if there are less than 100 000 messages in buffer, timeout will occur. More info here: https://stackoverflow.com/a/34794261/2707179 Decrease batch-size – related to previous point, it will send batches … Read more

What is the best practice for naming kafka topics?

https://cnr.sh/essays/how-paint-bike-shed-kafka-topic-naming-conventions helped us answering that same question. As a summary this article suggest to follow similar best practices to naming databases and tables, and it provides these additional points of advice: Avoid topic names based on things that change Avoid topic names based on information that would be stored in other places Avoid topic names … Read more

Get last message from kafka consumer console script

I’m not aware of any automatism, but using this simple two step approach, it should work. Note that in my case it was a partitioned topic, you can leave the params for it out in case you have a unpartitioned one: 1) Get max offset for your topic (+ their partitions): bin/kafka-run-class.sh kafka.tools.GetOffsetShell –broker-list localhost:9092 … Read more

kafka 8 and memory – There is insufficient memory for the Java Runtime Environment to continue

You can adjust the JVM heap size by editing kafka-server-start.sh, zookeeper-server-start.shand so on: export KAFKA_HEAP_OPTS=”-Xmx1G -Xms1G” The -Xms parameter specifies the minimum heap size. To get your server to at least start up, try changing it to use less memory. Given that you only have 512M, you should change the maximum heap size (-Xmx) too: … Read more

Hata!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)