Mind blown: RDD.zip() method

It is not true that RDDs are always unordered. An RDD has a guaranteed order if it is the result of a sortBy operation, for example. An RDD is not a set; it can contain duplicates. Partitioning is not opaque to the caller, and can be controlled and queried. Many operations do preserve both partitioning … Read more

Determining optimal number of Spark partitions based on workers, cores and DataFrame size

Yes, a spark application has one and only Driver. What is the relationship between numWorkerNodes and numExecutors? A worker can host multiple executors, you can think of it like the worker to be the machine/node of your cluster and the executor to be a process (executing in a core) that runs on that worker. So … Read more

Apache Spark does not delete temporary directories

Three SPARK_WORKER_OPTS exists to support the worker application folder cleanup, copied here for further reference: from Spark Doc spark.worker.cleanup.enabled, default value is false, Enable periodic cleanup of worker / application directories. Note that this only affects standalone mode, as YARN works differently. Only the directories of stopped applications are cleaned up. spark.worker.cleanup.interval, default is 1800, … Read more

How to bin in PySpark?

You can use Bucketizer feature transfrom from ml library in spark. values = [(“a”, 23), (“b”, 45), (“c”, 10), (“d”, 60), (“e”, 56), (“f”, 2), (“g”, 25), (“h”, 40), (“j”, 33)] df = spark.createDataFrame(values, [“name”, “ages”]) from pyspark.ml.feature import Bucketizer bucketizer = Bucketizer(splits=[ 0, 6, 18, 60, float(‘Inf’) ],inputCol=”ages”, outputCol=”buckets”) df_buck = bucketizer.setHandleInvalid(“keep”).transform(df) df_buck.show() output … Read more

Spark SQL – difference between gzip vs snappy vs lzo compression formats

Compression Ratio : GZIP compression uses more CPU resources than Snappy or LZO, but provides a higher compression ratio. General Usage : GZip is often a good choice for cold data, which is accessed infrequently. Snappy or LZO are a better choice for hot data, which is accessed frequently. Snappy often performs better than LZO. … Read more

Create spark dataframe schema from json schema representation

There are two steps for this: Creating the json from an existing dataframe and creating the schema from the previously saved json string. Creating the string from an existing dataframe val schema = df.schema val jsonString = schema.json create a schema from json import org.apache.spark.sql.types.{DataType, StructType} val newSchema = DataType.fromJson(jsonString).asInstanceOf[StructType]

pyspark: Efficiently have partitionBy write to same number of total partitions as original table

You’ve got several options. In my code below I’ll assume you want to write in parquet, but of course you can change that. (1) df.repartition(numPartitions, *cols).write.partitionBy(*cols).parquet(writePath) This will first use hash-based partitioning to ensure that a limited number of values from COL make their way into each partition. Depending on the value you choose for … Read more

What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?

If you’re running out of memory on the shuffle, try setting spark.sql.shuffle.partitions to 2001. Spark uses a different data structure for shuffle book-keeping when the number of partitions is greater than 2000: private[spark] object MapStatus { def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = { if (uncompressedSizes.length > 2000) { HighlyCompressedMapStatus(loc, uncompressedSizes) } else { new … Read more

Difference between na().drop() and filter(col.isNotNull) (Apache Spark)

With df.na.drop() you drop the rows containing any null or NaN values. With df.filter(df.col(“onlyColumnInOneColumnDataFrame”).isNotNull()) you drop those rows which have null only in the column onlyColumnInOneColumnDataFrame. If you would want to achieve the same thing, that would be df.na.drop([“onlyColumnInOneColumnDataFrame”]).

Hata!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)