What if you want to create a machine learning model but realize that your input dataset doesn’t fit in your computer’s memory? Usually you would use distributed computing tools like Hadoop and Apache Spark to run that computation on a cluster with many machines. However, Apache Spark can also process your data on a single machine in standalone mode, and even build models, when the input dataset is larger than the amount of memory your computer has. In this blog post, I’ll show you an end-to-end scenario with Apache Spark in which we create a binary classification model from a 34.6-gigabyte input dataset. Run this scenario on your laptop (yes, yours, with its 4-8 gigabytes of memory and 50+ gigabytes of disk space) to test this.
1. Input data and expected results
In the previous post we discussed “How To Find Simple And Interesting Multi-Gigabytes Data Set”. The Posts.xml file from this dataset will be used in the current post. The file size is 34.6 gigabytes. This xml file contains the stackoverflow.com posts data as xml attributes:
- Title – post title
- Body – post text
- Tags – list of tags for post
- 10+ more xml-attributes that we won’t use.
The full dataset with the stackoverflow.com Posts.xml file is available at https://archive.org/details/stackexchange. Additionally, I created a smaller version of this file with only 10 items\posts in it. This file contains a small slice of the original dataset. This data is licensed under the Creative Commons license (cc-by-sa).
As you might expect, this small file is not the best choice for model training. This file is only good for experimenting with your data preparation code. However, the end-to-end Spark scenario from this article works with this small file as well. Please download the file from here.
Our goal is to create a predictive model which predicts post Tags based on Body and Title. To simplify the task and reduce the amount of code, we are going to concatenate Title and Body and use that as a single text column.
It is easy to imagine how this model would work on the stackoverflow.com web site: the user types a question and the web site automatically suggests tags.
Assume that we need as many correct tags as possible and that the user will remove the unnecessary ones. Because of this assumption, we choose recall as the high-priority target for our model.
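To make the recall target concrete, here is a minimal sketch in plain Scala (no Spark required). The `Counts` case class and the numbers in it are hypothetical and chosen only for illustration; they are not measurements from the real dataset.

```scala
// Hypothetical confusion-matrix counts for one tag; not taken from the real dataset.
final case class Counts(truePositives: Long, falsePositives: Long, falseNegatives: Long)

// Recall: the share of posts that really carry the tag and for which it was suggested.
def recall(c: Counts): Double =
  c.truePositives.toDouble / (c.truePositives + c.falseNegatives)

// Precision: the share of suggested tags that were actually correct.
def precision(c: Counts): Double =
  c.truePositives.toDouble / (c.truePositives + c.falsePositives)

val example = Counts(truePositives = 80, falsePositives = 40, falseNegatives = 20)
println(s"recall = ${recall(example)}, precision = ${precision(example)}")
```

With these made-up counts, recall is 80 / 100 and precision is 80 / 120. Favoring recall means we accept the 40 extra suggestions (which the user can delete) in order to miss as few correct tags as possible.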
2. Binary and multi-label classification
The problem of stackoverflow tag prediction is a multi-label classification problem because the model should predict many classes, which are not exclusive. The same text might be classified as “Java” and “Multithreading”. Note that multi-label classification is a generalization of a different problem, multi-class classification, which predicts only one class from a set of classes.
To keep our first Apache Spark scenario simple and reduce the amount of code, let’s simplify the problem. Instead of training a multi-label classifier, let’s train a simple binary classifier for a given tag. For instance, for the tag “Java” one classifier will be created which predicts whether a post is about the Java language.
By using this simple approach, many classifiers might be created, one for almost every frequent label (Java, C++, Python, multi-threading etc…). This approach is simple and good for studying. However, it is not perfect in practice because, by splitting the predictive model into separate classifiers, you are ignoring the correlations between classes. Another reason: training many classifiers might be computationally expensive.
3. Setup and Run Apache Spark in a standalone mode
If you don’t have Apache Spark on your machine, you can simply download it from the Spark web page http://spark.apache.org/. Please use version 1.5.1. Direct link to a pre-built version – http://d3kbcqa49mib13.cloudfront.net/spark-1.5.1-bin-hadoop2.6.tgz
You are ready to run Spark in standalone mode if Java is installed on your computer. If not, install Java.
For Unix systems and Macs, uncompress the file and copy it to any directory. This is your Spark directory now.
Run spark master:
sbin/start-master.sh
Run spark slave:
sbin/start-slaves.sh
Run Spark shell:
bin/spark-shell
The Spark shell runs your Scala commands in interactive mode.
Windows users can find the instruction here: http://nishutayaltech.blogspot.in/2015/04/how-to-run-apache-spark-on-windows7-in.html
If you are working in cluster mode in a Hadoop environment, I’m assuming you already know how to run the Spark shell.
4. Importing libraries
For this end-to-end scenario we are going to use Scala, the primary language for Apache Spark.
// General purpose library
import scala.xml._
// Spark data manipulation libraries
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
// Spark machine learning libraries
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.Pipeline
5. Parsing XML
We need to extract Title, Body and Tags from the input xml file and create a single data-frame with these columns. First, let’s remove the xml header and footer. I assume that the input file is located in the same directory where you run the spark shell command.
val fileName = "Posts.small.xml"
val textFile = sc.textFile(fileName)
val postsXml = textFile.map(_.trim).
  filter(!_.startsWith("<?xml version=")).
  filter(_ != "<posts>").
  filter(_ != "</posts>")
Spark has good functions for parsing the json and csv formats. For xml we need to write several additional lines of code to create a data frame by specifying the schema programmatically.
Note that Scala’s XML library automatically converts XML escape codes like “&lt;a&gt;” to actual tags “<a>”. We are also going to concatenate the title and body, remove all unnecessary tags and new line characters from the body, and collapse all repeated spaces.
val postsRDD = postsXml.map { s =>
  val xml = XML.loadString(s)
  val id = (xml \ "@Id").text
  val tags = (xml \ "@Tags").text
  val title = (xml \ "@Title").text
  val body = (xml \ "@Body").text
  // Replace html tags with a space
  val bodyPlain = ("<\\S+>".r).replaceAllIn(body, " ")
  // Join title and body, drop new lines and collapse repeated spaces
  val text = (title + " " + bodyPlain).replaceAll("\n", " ").replaceAll("( )+", " ")
  Row(id, tags, text)
}
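It can help to try the cleaning step on a single string before running it over the whole file. The sketch below is plain Scala and applies the same regular expression and whitespace cleanup to a made-up body. The `clean` helper, the sample string, and the alternative pattern `<[^>]+>` are my assumptions for illustration and are not part of the original program.

```scala
// Hypothetical helper: applies a tag-stripping regex, then the same whitespace cleanup as above.
def clean(body: String, tagPattern: String): String =
  tagPattern.r.replaceAllIn(body, " ").replaceAll("\n", " ").replaceAll("( )+", " ")

// Made-up post body, for illustration only.
val sampleBody = "<p>Merge <b>all</b> JAR files, see <a href=\"http://example.com\">this page</a></p>"

// Pattern from the post: "<", one or more non-whitespace characters, ">".
val withPostPattern = clean(sampleBody, "<\\S+>")

// Alternative pattern (an assumption, not from the post): "<", anything except ">", then ">".
val withAltPattern = clean(sampleBody, "<[^>]+>")

println(withPostPattern)
println(withAltPattern)
```

With the pattern from the post, `<b>all</b>` contains no whitespace and is therefore matched as one piece, so the word “all” disappears, while the opening `<a href=...>` tag survives because of the space inside it. The alternative pattern removes each tag separately. Keep in mind that switching patterns changes the training text, so the results reported later in this post would no longer apply exactly.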
To create a data-frame, a schema should be applied to the RDD.
val schemaString = "Id Tags Text"
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val postsDf = sqlContext.createDataFrame(postsRDD, schema)
Now you can take a look at your data frame.
postsDf.show()
6. Preparing training and testing datasets
The next step is creating binary labels for a binary classifier. In these code examples, we are using “java” as the label that we would like to predict with a binary classifier. All rows with the “java” label should be marked as “1” and rows without “java” as “0”. Let’s identify our target tag “java” and create binary labels based on this tag.
val targetTag = "java"
val myudf: (String => Double) = (str: String) => {if (str.contains(targetTag)) 1.0 else 0.0}
val sqlfunc = udf(myudf)
val postsLabeled = postsDf.withColumn("Label", sqlfunc(col("Tags")))
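One thing to be aware of: a substring check with `contains` also fires for tags that merely include the target text, for example “javascript”. Below is a hedged sketch of exact tag matching in plain Scala. It assumes the Tags attribute stores tags in the form `<java><multithreading>`; please verify this against your copy of the file. The names `parseTags` and `labelFor` are hypothetical.

```scala
// Assumed format of the Tags attribute: "<tag1><tag2>...". Verify against your data.
def parseTags(tags: String): Set[String] =
  "<([^<>]+)>".r.findAllMatchIn(tags).map(_.group(1)).toSet

// 1.0 when the target tag is present as a whole tag, 0.0 otherwise.
def labelFor(targetTag: String, tags: String): Double =
  if (parseTags(tags).contains(targetTag)) 1.0 else 0.0

println(labelFor("java", "<java><multithreading>"))  // exact tag is present
println(labelFor("java", "<javascript><jquery>"))    // only a longer tag is present
println("<javascript><jquery>".contains("java"))     // the substring check still matches
```

In the Spark shell, a function like this could be wrapped with `udf` in the same way as `myudf`. Doing so changes which rows are labeled positive, so the numbers reported in this post would not carry over unchanged.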
The dataset can be split into negative and positive subsets by using the new label.
val positive = postsLabeled.filter('Label > 0.0)
val negative = postsLabeled.filter('Label < 1.0)
We are going to use 90% of our data for the model training and 10% as a testing dataset. Let’s create a training dataset by sampling the positive and negative datasets separately.
val positiveTrain = positive.sample(false, 0.9)
val negativeTrain = negative.sample(false, 0.9)
val training = positiveTrain.unionAll(negativeTrain)
The testing dataset should include all rows which are not included in the training dataset. And again, positive and negative examples are handled separately.
val negativeTrainTmp = negativeTrain.withColumnRenamed("Label", "Flag").select('Id, 'Flag)
val negativeTest = negative.join(negativeTrainTmp, negative("Id") === negativeTrainTmp("Id"), "LeftOuter").
  filter("Flag is null").select(negative("Id"), 'Tags, 'Text, 'Label)
val positiveTrainTmp = positiveTrain.withColumnRenamed("Label", "Flag").select('Id, 'Flag)
val positiveTest = positive.join(positiveTrainTmp, positive("Id") === positiveTrainTmp("Id"), "LeftOuter").
  filter("Flag is null").select(positive("Id"), 'Tags, 'Text, 'Label)
val testing = negativeTest.unionAll(positiveTest)
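The split logic is easier to follow on a small in-memory collection: sample about 90% of each class for training, then put every row whose Id was not sampled into the testing set. This is a plain Scala sketch of that idea, not the Spark code itself. The `Post` case class, the generated data, the `sampleRows` helper, and the fixed seed are all assumptions made for illustration.

```scala
import scala.util.Random

// Hypothetical in-memory stand-in for a labeled row.
final case class Post(id: Int, label: Double)

// Made-up data: every tenth post is positive.
val allPosts = (1 to 1000).map(i => Post(i, if (i % 10 == 0) 1.0 else 0.0))

val rng = new Random(42L) // fixed seed (an assumption) so the split is repeatable

// Keep each row independently with probability `fraction`.
def sampleRows(rows: Seq[Post], fraction: Double): Seq[Post] =
  rows.filter(_ => rng.nextDouble() < fraction)

// Sample each class separately so both classes keep roughly the same proportion.
val (positivePosts, negativePosts) = allPosts.partition(_.label > 0.0)
val trainingSample = sampleRows(positivePosts, 0.9) ++ sampleRows(negativePosts, 0.9)

// Testing rows are exactly those whose id is absent from training,
// which is what the left outer join plus the "Flag is null" filter achieves.
val trainingIds = trainingSample.map(_.id).toSet
val testingSample = allPosts.filterNot(p => trainingIds.contains(p.id))

println(s"training: ${trainingSample.size}, testing: ${testingSample.size}")
```

Because the sampling is random, the sizes are only approximately 90% and 10%, but the two sets never overlap and together they cover every row.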
7. Training a model
Let’s identify training parameters:
- Number of features
- Regularization parameter
- Number of epochs (the maximum number of training iterations)
Spark API creates a model based on columns from the data-frame and the training parameters:
val numFeatures = 64000
val numEpochs = 30
val regParam = 0.02
val tokenizer = new Tokenizer().setInputCol("Text").setOutputCol("Words")
val hashingTF = new HashingTF().setNumFeatures(numFeatures).
  setInputCol(tokenizer.getOutputCol).setOutputCol("Features")
val lr = new LogisticRegression().setMaxIter(numEpochs).setRegParam(regParam).
  setFeaturesCol("Features").setLabelCol("Label").
  setRawPredictionCol("Score").setPredictionCol("Prediction")
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(training)
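If the role of numFeatures is not obvious, the following plain Scala sketch shows the general idea behind the hashing step: each word is mapped to one of numFeatures buckets and the occurrences per bucket are counted. This is a simplified illustration only. Spark’s HashingTF uses its own hash function and sparse vectors, and the `hashTokens` name and the sample sentence are my assumptions.

```scala
// Simplified hashing trick: map each token to a bucket index and count occurrences.
// hashCode is used here only for illustration; it is not Spark's hash function.
def hashTokens(tokens: Seq[String], numFeatures: Int): Map[Int, Double] =
  tokens
    .map(t => ((t.hashCode % numFeatures) + numFeatures) % numFeatures) // non-negative bucket
    .groupBy(identity)
    .map { case (index, hits) => index -> hits.size.toDouble }

// Mimic the tokenizer step: lower-case the text and split it on whitespace.
val words = "Easiest way to merge a JAR file into one JAR file".toLowerCase.split("\\s+").toSeq

val features = hashTokens(words, numFeatures = 64000)
println(features)
```

A smaller numFeatures means more words collide in the same bucket but the feature vectors get smaller. A larger value reduces collisions at the cost of a bigger model.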
8. Testing a model
This is our final code for the binary “Java” classifier which returns a prediction (0.0 or 1.0):
val testTitle = "Easiest way to merge a release into one JAR file"
val testBody = """Is there a tool or script which easily merges a bunch of href="http://en.wikipedia.org/wiki/JAR_%28file_format%29" JAR files into one JAR file? A bonus would be to easily set the main-file manifest and make it executable. I would like to run it with something like: As far as I can tell, it has no dependencies which indicates that it shouldn't be an easy single-file tool, but the downloaded ZIP file contains a lot of libraries."""
val testText = testTitle + " " + testBody
val testDF = sqlContext.createDataFrame(Seq(
  (99.0, testText))).toDF("Label", "Text")
val result = model.transform(testDF)
// Read the Prediction column (index 6) from the first result row
val prediction = result.collect()(0)(6).asInstanceOf[Double]
print("Prediction: " + prediction)
Let’s evaluate the quality of the model based on the testing dataset.
val testingResult = model.transform(testing)
val testingResultScores = testingResult.select("Prediction", "Label").rdd.
  map(r => (r(0).asInstanceOf[Double], r(1).asInstanceOf[Double]))
val bc = new BinaryClassificationMetrics(testingResultScores)
val roc = bc.areaUnderROC
print("Area under the ROC:" + roc)
If you use the small dataset then the quality of your model is probably not the best. The area under the ROC will be very low (close to 50%), which indicates a poor quality of the model. With the entire Posts.xml dataset, the quality is not so bad: the area under the ROC is 0.64. You can probably improve this result by playing with different transformations such as TF-IDF and normalization, but that is outside the scope of this blog post.
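To get a feeling for what the area under the ROC means, it can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as one half. Here is a brute-force sketch of that definition in plain Scala. The `auc` function and the scores are hypothetical; BinaryClassificationMetrics computes the value from the curve instead of comparing all pairs.

```scala
// Pairs are (score, label), the same shape that BinaryClassificationMetrics takes.
def auc(scoreAndLabels: Seq[(Double, Double)]): Double = {
  val positives = scoreAndLabels.collect { case (s, l) if l > 0.5 => s }
  val negatives = scoreAndLabels.collect { case (s, l) if l <= 0.5 => s }
  // Compare every positive score with every negative score.
  val wins = for (p <- positives; n <- negatives)
    yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  wins.sum / (positives.size.toLong * negatives.size)
}

// Made-up scores, for illustration only.
val scored = Seq((0.9, 1.0), (0.8, 0.0), (0.7, 1.0), (0.3, 0.0), (0.2, 0.0))
println(auc(scored))
```

In this made-up example, five of the six positive-negative pairs are ranked correctly, so the result is 5/6. A value of 0.5 corresponds to random ranking, which is why a result close to 50% indicates a poor model. One thing to keep in mind: the evaluation code above passes the thresholded Prediction column (0.0 or 1.0) rather than a continuous score, so the resulting ROC curve has a single intermediate point.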
Conclusion
Apache Spark can be a great option for data processing and for machine learning scenarios if your dataset is larger than your computer’s memory can hold. It might not be easy to use Spark in cluster mode within the Hadoop YARN environment. However, in local (or standalone) mode, Spark is as simple as any other analytical tool.
Please let me know if you encounter any problems or have further questions. I would really like to hear your feedback.
The complete source code of this program can be found here.
Very interesting post! May I ask two questions:
1. When you create DataFrame using sqlContext.createDataFrame, will spark crash as the memory could not hold 34.6 gb of data? Or does spark automatically partition the data such that the partitions can fit into the memory, when performing an action?
2. I am confused about how spark handles data that is larger than memory. Does it partition the data by itself during the transform-action steps, or do I have to manually partition the data into chunks that can hold in memory, and then load them into sparks for the transform-action steps?
Right, Spark won’t crash. It uses a concept similar to partitions in MapReduce: less memory means more partitions and more slow I/O operations. This partitioning works for both the transform-action steps and the model creation.
This partitioning works without any manual steps. However, you can control the partitioning (and “data locality” in cluster mode) at the code level or the config level. See http://eugenezhulenev.com/blog/2015/09/16/spark-ml-for-big-and-small-data/
Spark was able to create a model on top of 34 gigabytes on my old laptop with only 4GB RAM. It uses less than 1.5 GB by default. It took a couple of hours. I’ll measure more precisely and publish the actual time.
The result is ready.
I did the measurements with 34 GB of input data for two sets of parameters:
1) 64K features in a hash (numFeatures variable) and maximum number of iterations (numEpochs) is equal to 30.
2) numFeatures=20K and numEpochs=20
My old 4GB laptop prepared the input data and created the model in 8 hours and 15 minutes for the first set of parameters, and in 6 hours and 50 minutes for the second set. See the Scala code from line 1 to line 85.
Model validation takes 2 hours and 40 minutes for both parameter sets. See the Scala code from line 87 to line 109.
I did not change any Apache Spark settings. And as I said before, Apache Spark uses only 1.5 GB of memory by default on my 4GB machine.
Very interesting and useful post to head start with Apache Spark.
It would be great, if you can post python code too, if available.
I’m working on a version of the code in python right now, I’ll post a pastebin link in the comments!
I’ll publish Python code example in my next blog post later this week.
Another request that I got from readers – it is not easy to download the full dataset and my data slice (https://www.dropbox.com/s/n2skgloqoadpa30/Posts.small.xml?dl=0) is relatively small (120Mb). In the next blog post I’ll share a bigger slice of data (~3Gb uncompressed).
@adam106, I can publish your version of code as well.
Python guide is published: http://fullstackml.com/2015/11/10/beginners-guide-apache-spark-python-machine-learning-scenario-with-a-large-input-dataset/
@adam1016 Thanks
I have a 3 GB data set that I want to apply machine learning algorithms on but I have only a laptop. How can I analyze the data efficiently?
You can try Spark in standalone mode. http://fullstackml.com/2015/10/29/beginners-guide-apache-spark-machine-learning-scenario-with-a-large-input-dataset/ But with 4GB of RAM it might be a bit slow, as not much caching can be done.
It’s a great use case. As a Java developer, it would be easier for me to understand if it were in Java. Are you considering converting it to Java?
Hi Karam, Java implementation is a good idea. I’ll do that later.
Thank you for this post. Very helpful from a beginner perspective. Will surely checkout the rest of the series soon.
Also, some notes I would like to add here: since I’ve installed Spark 2.0 version, “sqlContext” has been replaced with the “SparkSession” class in that version (http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-16-to-20).
I replaced this line of code in Point 4:
>val postsDf = sqlContext.createDataFrame(postsRDD, schema)
with:
>val sparkSession = SparkSession.builder.
master("local").
appName("spark session example").
getOrCreate()
>val postsDf = sparkSession.createDataFrame(postsRDD, schema)
and similarly in Point 8 for testDF.
The code runs fine until I come across a NullPointerException in the end (for “val bc = new BinaryClassificationMetrics(testingResultScores)”), which I’ve yet to figure out.
PS: I noticed a typo in Point 8 (I guess you meant testBody instead of testBoby, or the next line of code wouldn’t run) 🙂
Thanks!
Hi Sejal, you might have a problem in a different place. Could you please share full listing of your code?
Hi Dmitry,
Here’s the code that I ran:
// General purpose library
import scala.xml._
// Spark data manipulation libraries
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
// Spark machine learning libraries
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.Pipeline
val fileName = "Posts.small.xml"
val textFile = sc.textFile(fileName)
val postsXml = textFile.map(_.trim).
  filter(!_.startsWith("<?xml version=")).
  filter(_ != "<posts>").
  filter(_ != "</posts>")
val postsRDD = postsXml.map { s =>
  val xml = XML.loadString(s)
  val id = (xml \ "@Id").text
  val tags = (xml \ "@Tags").text
  val title = (xml \ "@Title").text
  val body = (xml \ "@Body").text
  val bodyPlain = ("<\\S+>".r).replaceAllIn(body, " ")
  val text = (title + " " + bodyPlain).replaceAll("\n", " ").replaceAll("( )+", " ");
  Row(id, tags, text)
}
val schemaString = "Id Tags Text"
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Spark 2.0
val sparkSession = SparkSession.builder.
  master("local").
  appName("spark session example").
  getOrCreate()
val postsDf = sparkSession.createDataFrame(postsRDD, schema)
// Spark 1.0
// val postsDf = sqlContext.createDataFrame(postsRDD, schema)
val targetTag = "java"
val myudf: (String => Double) = (str: String) => {if (str.contains(targetTag)) 1.0 else 0.0}
val sqlfunc = udf(myudf)
val postsLabeled = postsDf.withColumn("Label", sqlfunc(col("Tags")) )
val positive = postsLabeled.filter('Label > 0.0)
val negative = postsLabeled.filter(‘Label (r(0).asInstanceOf[Double], r(1).asInstanceOf[Double]))
val bc = new BinaryClassificationMetrics(testingResultScores)
val roc = bc.areaUnderROC
print("Area under the ROC:" + roc)
The Null Pointer exception is received for variable ‘testingResultScores’. There were a few warnings received for ‘val model = pipeline.fit(training)’ as well:
16/11/12 20:14:06 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
16/11/12 20:14:06 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
16/11/12 20:14:19 WARN LogisticRegression: LogisticRegression training finished but the result is not converged because: max iterations reached
model: org.apache.spark.ml.PipelineModel = pipeline_fc1efbb88c28
Thanks,
Sejal