How do I stop a spark streaming job?

You can stop your streaming context in cluster mode by running the following command without needing to sending a SIGTERM. This will stop the streaming context without you needing to explicitly stop it using a thread hook. $SPARK_HOME_DIR/bin/spark-submit –master $MASTER_REST_URL –kill $DRIVER_ID -$MASTER_REST_URL is the rest url of the spark driver, ie something like spark://localhost:6066 … Read more

Use Spring together with Spark

FROM THE QUESTION ASKER: Added: To interfere the deserialization part directly without modifying your own classes use the following spring-spark project by parapluplu. This projects autowires your bean when it gets deserialized by spring. EDIT: In order to use Spark, you need the following setup (also seen in this repository): Spring Boot + Spark: . … Read more

Queries with streaming sources must be executed with writeStream.start();

You are branching the query plan: from the same ds1 you are trying to: ds1.collect.foreach(…) ds1.writeStream.format(…){…} But you are only calling .start() on the second branch, leaving the other dangling without a termination, which in turn throws the exception you are getting back. The solution is to start both branches and await termination. val ds1 … Read more

The value of “spark.yarn.executor.memoryOverhead” setting?

spark.yarn.executor.memoryOverhead Is just the max value .The goal is to calculate OVERHEAD as a percentage of real executor memory, as used by RDDs and DataFrames –executor-memory/spark.executor.memory controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. The value of the spark.yarn.executor.memoryOverhead property … Read more

Spark DataFrame: does groupBy after orderBy maintain that order?

groupBy after orderBy doesn’t maintain order, as others have pointed out. What you want to do is use a Window function, partitioned on id and ordered by hours. You can collect_list over this and then take the max (largest) of the resulting lists since they go cumulatively (i.e. the first hour will only have itself … Read more

How to know what is the reason for ClosedChannelExceptions with spark-shell in YARN client mode?

Reason is association with yarn cluster may be lost due to the Java 8 excessive memory allocation issue: https://issues.apache.org/jira/browse/YARN-4714 You can force YARN to ignore this by setting up the following properties in yarn-site.xml <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> Thanks to simplejack, Reference from Spark Pi Example in Cluster mode with Yarn: … Read more

How can I update a broadcast variable in spark streaming?

Extending the answer By @Rohan Aletty. Here is a sample code of a BroadcastWrapper that refresh broadcast variable based on some ttl public class BroadcastWrapper { private Broadcast<ReferenceData> broadcastVar; private Date lastUpdatedAt = Calendar.getInstance().getTime(); private static BroadcastWrapper obj = new BroadcastWrapper(); private BroadcastWrapper(){} public static BroadcastWrapper getInstance() { return obj; } public JavaSparkContext getSparkContext(SparkContext sc) … Read more

build.sbt: how to add spark dependencies

The problem is that you are mixing Scala 2.11 and 2.10 artifacts. You have: scalaVersion := “2.11.8” And then: libraryDependencies += “org.apache.spark” % “spark-streaming_2.10” % “1.4.1” Where the 2.10 artifact is being required. You are also mixing Spark versions instead of using a consistent version: // spark 1.6.1 libraryDependencies += “org.apache.spark” %% “spark-core” % “1.6.1” … Read more

Spark using python: How to resolve Stage x contains a task of very large size (xxx KB). The maximum recommended task size is 100 KB

The general idea is that PySpark creates as many java processes than there are executors, and then ships data to each process. If there are too few processes, a memory bottleneck happens on the java heap space. In your case, the specific error is that the RDD that you created with sc.parallelize([…]) did not specify … Read more