How can I tear down a SparkSession and create a new one within one application?

Long story short, Spark (including PySpark) is not designed to handle multiple contexts in a single application. If you’re interested in the JVM side of the story, I would recommend reading SPARK-2243 (resolved as Won’t Fix).

A number of design decisions made in PySpark reflect that, including, but not limited to, a singleton Py4J gateway. Effectively, you cannot have multiple SparkContexts in a single application. SparkSession is not only bound to SparkContext but also introduces problems of its own, like handling the local (standalone) Hive metastore if one is used. Moreover, there are functions which use SparkSession.builder.getOrCreate internally and depend on the behavior you see right now. A notable example is UDF registration. Other functions may exhibit unexpected behavior if multiple SQL contexts are present (for example RDD.toDF).
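
To see why that matters in practice, here is a minimal sketch (assuming a local master and nothing more than a plain PySpark installation) of the singleton behavior: every builder call resolves to the one active session, so stopping it pulls the rug out from under anything that relied on getOrCreate internally.

    from pyspark.sql import SparkSession

    # Build one session; the underlying SparkContext is the per-application singleton.
    spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()

    # Any later call to getOrCreate() - including the ones made internally by
    # UDF registration and friends - resolves to the same active session.
    same = SparkSession.builder.getOrCreate()
    assert same.sparkContext is spark.sparkContext

    spark.stop()
    # After stop() the JVM-side context is gone; code that silently relied on
    # getOrCreate() would now build a brand new session, losing cached data,
    # temp views and registered functions along the way.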

Multiple contexts are not only unsupported but also, in my personal opinion, violate the single responsibility principle. Your business logic shouldn’t be concerned with all the setup, cleanup and configuration details.

My personal recommendations are as follows:

  • If the application consists of multiple coherent modules which can be composed and benefit from a single execution environment with caching and a common metastore, initialize all required contexts in the application entry point and pass them down to individual pipelines when necessary:

    • main.py:

      from pyspark.sql import SparkSession
      
      import collect
      import process
      
      if __name__ == "__main__":
          spark: SparkSession = ...
      
          # Pass data between modules
          collected = collect.execute(spark)
          processed = process.execute(spark, data=collected)
          ...
          spark.stop()
      
    • collect.py / process.py:

      from pyspark.sql import SparkSession
      
      def execute(spark: SparkSession, data=None):
          ...
      
  • Otherwise (it seems to be the case here, based on your description) I would design the entry point to execute a single pipeline and use an external workflow manager (like Apache Airflow or Toil) to handle the execution.

    It is not only cleaner but also allows for much more flexible fault recovery and scheduling.

    The same thing can of course be done with builders but, as a smart person once said, explicit is better than implicit (see the sketch after this list).

    • main.py

      import argparse
      
      from pyspark.sql import SparkSession
      
      import collect
      import process
      
      pipelines = {"collect": collect, "process": process}
      
      if __name__ == "__main__":
          parser = argparse.ArgumentParser()
          parser.add_argument('--pipeline')
          args = parser.parse_args()
      
          spark: SparkSession = ...
      
          # Execute a single pipeline only for side effects
          pipelines[args.pipeline].execute(spark)
          spark.stop()
      
    • collect.py / process.py as in the previous point.
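
For completeness, this is roughly what the builder-based variant mentioned above could look like: each pipeline module resolves the session itself instead of receiving it as an argument. It is a hypothetical sketch, not a recommendation, precisely because the coupling becomes implicit.

    # collect.py / process.py, builder-based variant (hypothetical sketch)
    from pyspark.sql import SparkSession

    def execute(data=None):
        # getOrCreate() silently reuses whatever session the entry point built,
        # or creates one on the fly - the implicit coupling that the explicit
        # execute(spark, ...) signature avoids.
        spark = SparkSession.builder.getOrCreate()
        ...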

One way or another I would keep one and only one place where the context is set up, and one and only one place where it is torn down.
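
As a rough illustration of that single setup / teardown site (the `spark: SparkSession = ...` placeholder above), a minimal sketch could look like the following; the application name and the Hive option are placeholders, not recommendations:

    from pyspark.sql import SparkSession

    def build_session() -> SparkSession:
        # The one and only place where the session is configured and created.
        return (
            SparkSession.builder
            .appName("my-application")    # placeholder name
            # .enableHiveSupport()        # only if a Hive metastore is actually used
            .getOrCreate()
        )

    if __name__ == "__main__":
        spark = build_session()
        try:
            ...  # run the selected pipeline(s) for their side effects
        finally:
            spark.stop()                  # the one and only teardown point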
