In the previous post, “Beginners Guide: Apache Spark Machine Learning Scenario With A Large Input Dataset”, we discussed the process of creating a predictive model from 34 gigabytes of input data using Apache Spark. I received a request for the Python code as a solution instead of Scala. That is exactly what I will do in this post.
1. Python and Scala differences
The Python solution looks similar to the Scala solution because, “under the hood”, you have the same Spark library and engine. Because of this, I don't anticipate any significant performance change. Since there aren't many differences between Python and Scala, I will highlight only the major ones; you can refer back to the previous post for the code in its entirety.
2. Input data

The entire 34GB dataset is available at https://archive.org/details/stackexchange; look at the file Posts.xml in the stackoverflow.com folder. A copy of the 34GB Posts.xml file is here (8GB compressed). The data is licensed under the Creative Commons license (cc-by-sa).
3. Python code
In the Python version of the code (source file) I create the correct Label column directly, without the intermediate sqlfunc/myudf function. Otherwise you would have to ship the code of that function to the Spark environment in a separate Python file via the sc.addPyFile() method. For the same reason I do not use XML libraries.
postsRDD = postsXml.map(lambda s: pyspark.sql.Row(
    Id = re.search('Id=".+?"', s).group(0)[4:-1].encode('utf-8'),
    # Label is 1.0 when the post's Tags attribute contains the target tag
    Label = 1.0 if re.search('Tags=".+?"', s) is not None
                and re.search('Tags=".+?"', s).group(0)[6:-1].encode('utf-8').find(targetTag) >= 0
            else 0.0,
    # Text is the concatenation of Title and Body; either may be absent
    Text = (re.search('Title=".+?"', s).group(0)[7:-1]
                if re.search('Title=".+?"', s) is not None else "")
           + " "
           + (re.search('Body=".+?"', s).group(0)[6:-1]
                if re.search('Body=".+?"', s) is not None else "")))
postsLabeled = sqlContext.createDataFrame(postsRDD)
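For comparison, here is a minimal sketch of the UDF route described above; myudf.py, my_label, and the posts DataFrame are hypothetical names for illustration, not part of the original code:

# contents of a hypothetical myudf.py, shipped separately to the workers:
#     def my_label(tags, target):
#         return 1.0 if tags is not None and target in tags else 0.0

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

sc.addPyFile("myudf.py")    # distribute the module to every executor
import myudf

label_udf = udf(lambda t: myudf.my_label(t, targetTag), DoubleType())
# assumes a DataFrame `posts` with a raw string column `Tags`
postsWithLabel = posts.withColumn("Label", label_udf(posts.Tags))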
One issue with the Python version of the code: we do not decode XML meta symbols such as &lt;. Let's keep these symbols as they are for now.
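If you do want to decode them later, here is a minimal sketch that handles the predefined XML entities by hand (decode_entities is a hypothetical helper, not part of the original code):

def decode_entities(text):
    # decode the five predefined XML entities; '&amp;' must come last,
    # otherwise '&amp;lt;' would be double-decoded into '<'
    for entity, char in (("&lt;", "<"), ("&gt;", ">"),
                         ("&quot;", "\""), ("&apos;", "'"),
                         ("&amp;", "&")):
        text = text.replace(entity, char)
    return text

print(decode_entities("&lt;p&gt;Hello&lt;/p&gt;"))  # prints: <p>Hello</p>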
The Python code needs a couple more temporary variables in the data preparation step (negTrainTmp1 and posTrainTmp1).
# 90% of each class goes to training
positiveTrain = positive.sample(False, 0.9)
negativeTrain = negative.sample(False, 0.9)
training = positiveTrain.unionAll(negativeTrain)

# the remaining negative rows become the negative test set: a left outer
# join plus a "Flag is null" filter keeps only rows NOT in the training sample
negTrainTmp1 = negativeTrain.withColumnRenamed("Label", "Flag")
negativeTrainTmp = negTrainTmp1.select(negTrainTmp1.Id, negTrainTmp1.Flag)
negativeTest = negative.join(
        negativeTrainTmp,
        negative.Id == negativeTrainTmp.Id,
        "LeftOuter").\
    filter("Flag is null").\
    select(negative.Id, negative.Text, negative.Label)

# same pattern for the positive rows
posTrainTmp1 = positiveTrain.withColumnRenamed("Label", "Flag")
positiveTrainTmp = posTrainTmp1.select(posTrainTmp1.Id, posTrainTmp1.Flag)
positiveTest = positive.join(
        positiveTrainTmp,
        positive.Id == positiveTrainTmp.Id,
        "LeftOuter").\
    filter("Flag is null").\
    select(positive.Id, positive.Text, positive.Label)

testing = negativeTest.unionAll(positiveTest)
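As a design note, the join-and-filter above is just a set difference, so on Spark 1.3+ the same split can be written more compactly with subtract (a sketch, not from the original post; it relies on exact row equality between the sampled and original rows):

negativeTest = negative.subtract(negativeTrain)
positiveTest = positive.subtract(positiveTrain)
testing = negativeTest.unionAll(positiveTest)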
Small changes in the model validation step:
testText = testTitle + testBody
testDF = sqlContext.createDataFrame([("0", testText, 1.0)], ["Id", "Text", "Label"])
result = model.transform(testDF)
prediction = result.collect()
print("Prediction: ", prediction)
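The collected result is a list of Rows. Assuming the pipeline ends in a logistic regression stage, as in the Scala version, each Row carries the standard prediction and probability output columns, so you can inspect them like this (a sketch):

row = prediction[0]
print("Predicted class: ", row.prediction)    # 0.0 or 1.0
print("Probabilities:   ", row.probability)   # DenseVector of [P(0), P(1)]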
Those are all the changes we need.
Thank you for all the great feedback on the previous post, “Beginners Guide: Apache Spark Machine Learning Scenario With A Large Input Dataset”. The reception helped me see where the needs and demands are in this field. I welcome all suggestions, so keep the feedback coming and I'll try to address as many as I humanly can.