December 6, 2016

Weird behavior in class methods vs static methods in PySpark

Note: using Spark 2.0.0 with Python 2.7.

I just found a very weird behavior in PySpark. I will show it with an example. Who knows, maybe this can help someone else.

I am processing a list of text files containing data in JSON Lines format. After some fiddling, I set up a basic class to process the files:

from pyspark.sql import SparkSession


class TestClassProcessor(object):
    def __init__(self):
        self.spark = SparkSession.builder.getOrCreate()

    @staticmethod
    def parse_record(record):
        # ... do something with record ...
        return record_updated

    def process_file(self, fname):
        data = self.spark.read.text(fname)
        data_processed = data.rdd.map(
            lambda r: self.parse_record(r.value)
        )
        df = data_processed.toDF()

The reason I set up parse_record as a static method was simple. Initially this code was a set of plain functions, and when I switched to a class I wanted the existing code to keep working. So in the existing code, I just changed

data_processed = data.rdd.map(
    lambda r: parse_record(r.value)
)

to

data_processed = data.rdd.map(
    lambda r: TestClassProcessor.parse_record(r.value)
)

I mean, that is the purpose of static methods, ain’t it?
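
Just to spell out what I expected: in plain Python, a static method is just a function that lives in the class namespace, so calling it through the class name behaves the same as calling the old free function. A contrived sketch (no Spark involved, the names are made up for illustration):

class Greeter(object):
    @staticmethod
    def shout(text):
        # no self and no instance state: just a plain function in the class namespace
        return text.upper() + "!"

print(Greeter.shout("hello"))    # called on the class, no instance needed
print(Greeter().shout("hello"))  # called on an instance, same result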

Big was my surprise when I spark-submitted (is that a verb?) the script and got one of Spark's wonderful spaghetti traces, ending in the infamous:

py4j.Py4JException: Method __getnewargs__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:272)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:211)
	at java.lang.Thread.run(Thread.java:745)

A trace which, in my experience, can mean fkin anything.

Interestingly though, the old function-based code still worked.
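
For reference, the old function-based version looked roughly like this. This is just a rough sketch: the body of parse_record is a placeholder, and spark is assumed to be a module-level SparkSession.

from pyspark.sql import Row, SparkSession
import json

spark = SparkSession.builder.getOrCreate()

def parse_record(record):
    # placeholder: turn one jsonlines string into a Row
    return Row(**json.loads(record))

def process_file(fname):
    data = spark.read.text(fname)
    data_processed = data.rdd.map(
        lambda r: parse_record(r.value)
    )
    return data_processed.toDF()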

In the end, the fix for this baffling error was easy. I just had to change one line of the class TestClassProcessor:

class TestClassProcessor(object):
    def __init__(self):
        self.spark = SparkSession.builder.getOrCreate()

    @staticmethod
    def parse_record(record):
        # ... do something with record ...
        return record_updated

    def process_file(self, fname):
        data = TestClassProcessor.spark.read.text(fname)  # <============== TADAAA!
        data_processed = data.rdd.map(
            lambda r: self.parse_record(r.value)
        )
        df = data_processed.toDF()

Which I'm pretty sure is not required in regular Python.
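
To be clear about what I mean: in plain Python, reaching an instance attribute through self from another method is completely normal. A contrived example with no Spark in it (the names are made up):

class PlainProcessor(object):
    def __init__(self):
        self.prefix = ">> "  # instance attribute, playing the role of self.spark

    def process(self, line):
        # self.prefix is fine here; no need to write PlainProcessor.prefix
        return self.prefix + line

print(PlainProcessor().process("hello"))  # prints ">> hello"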

Cheers!
