Beginners Guide: Apache Spark Machine Learning Scenario With A Large Input Dataset

What if you want to create a machine learning model but realize that your input dataset doesn’t fit into your computer’s memory? Usually you would turn to distributed computing tools like Hadoop and Apache Spark and run the computation on a cluster with many machines. However, Apache Spark can also process your data on a local machine in standalone mode, and even build models when the input dataset is larger than the amount of memory your computer has. In this blog post, I’ll show you an end-to-end scenario with Apache Spark in which we create a binary classification model from a 34.6 gigabyte input dataset. Run this scenario on your laptop (yes, yours, with its 4-8 gigabytes of memory and 50+ gigabytes of disk space) to test this.

Choose dataset

1. Input data and expected results

In the previous post we discussed “How To Find Simple And Interesting Multi-Gigabytes Data Set”. The Posts.xml file from that dataset will be used in the current post. The file size is 34.6 gigabytes. This XML file contains the stackoverflow.com posts data as XML attributes (an illustrative row is shown after the list below):

  1. Title – post title
  2. Body – post text
  3. Tags – list of tags for post
  4. 10+ more xml-attributes that we won’t use.
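
For orientation, each post in Posts.xml is stored as a single row element, with all of its data in XML attributes. The row below is made up purely for illustration (the real file contains many more attributes):

<row Id="123" Title="How do I merge two lists?" Body="&lt;p&gt;Some question text&lt;/p&gt;" Tags="&lt;java&gt;&lt;list&gt;" />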

The full dataset with the stackoverflow.com Posts.xml file is available here at https://archive.org/details/stackexchange. Additionally, I created a smaller version of this file with only 10 items/posts in it. It contains just a tiny sample of the original dataset. This data is licensed under the Creative Commons license (cc-by-sa).

As you might expect, this small file is not the best choice for model training. This file is only good for experimenting with your data preparation code. However, the end-to-end Spark scenario from this article works with this small file as well. Please download the file from here.

Our goal is to create a predictive model which predicts post Tags based on Body and Title. To simplify the task and reduce the amount of code, we are going to concatenate Title and Body and use that as a single text column.

It is easy to imagine how this model could work on the stackoverflow.com web site: the user types a question and the web site automatically suggests tags.

Assume that we want as many correct tags as possible and that the user will remove the unnecessary ones. Because of this assumption we choose recall (the fraction of relevant tags that the model actually finds) as the high-priority metric for our model.

2. Binary and multi-label classification

The problem of stackoverflow tag prediction is a multi-label classification one, because the model should predict many classes, which are not mutually exclusive. The same text might be classified as both “Java” and “Multithreading”. Note that multi-label classification is a generalization of the multi-class classification problem, in which only one class is predicted from a set of classes.

To keep this first Apache Spark example small and reduce the amount of code, let’s simplify the problem. Instead of training a multi-label classifier, we will train a simple binary classifier for a given tag. For instance, for the tag “Java” one classifier will be created which predicts whether a post is about the Java language.

Using this simple approach, a classifier can be created for almost every frequent label (Java, C++, Python, multithreading, etc.). This approach is simple and good for learning. However, it is not ideal in practice, because by splitting the predictive model into separate classifiers you ignore the correlations between classes. Another reason is that training many classifiers might be computationally expensive. A rough sketch of this per-tag approach is shown below.
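
To make this idea concrete, here is a rough sketch (not part of the original scenario) of how such per-tag classifiers could be trained in a loop. It assumes the postsDf data frame and the pipeline that are defined later in this post, and the list of tags is arbitrary:

val frequentTags = Seq("java", "c++", "python", "multithreading")
val modelPerTag = frequentTags.map { tag =>
  // Label posts that contain the current tag with 1.0, everything else with 0.0
  val labelUdf = udf((tags: String) => if (tags.contains(tag)) 1.0 else 0.0)
  val labeled = postsDf.withColumn("Label", labelUdf(col("Tags")))
  // Fit one binary classifier per tag, reusing the same pipeline definition
  tag -> pipeline.fit(labeled)
}.toMap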

3. Setup and Run Apache Spark in a standalone mode

If you don’t have Apache Spark in your machine you can simply download it from the Spark web page http://spark.apache.org/. Please use version 1.5.1. Direct link to a pre-built version – http://d3kbcqa49mib13.cloudfront.net/spark-1.5.1-bin-hadoop2.6.tgz

You are ready to run Spark in standalone mode if Java is installed on your computer. If not, install Java first.

For Unix systems and Macs, uncompress the file and copy it to any directory. That directory is now your Spark home.

Run spark master:

sbin/start-master.sh

Run spark slave:

sbin/start-slaves.sh

Run Spark shell:

bin/spark-shell

The Spark shell runs your Scala commands in interactive mode.
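
Note that bin/spark-shell started this way uses a local master by default. If you want the shell to connect to the standalone master and worker you just launched, pass the master URL (it is shown on the master web UI, usually at port 8080). The host name and memory setting below are just an example and depend on your machine:

bin/spark-shell --master spark://localhost:7077 --driver-memory 2g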

Windows users can find the instructions here: http://nishutayaltech.blogspot.in/2015/04/how-to-run-apache-spark-on-windows7-in.html

If you are working in cluster mode in a Hadoop environment, I’m assuming you already know how to run the Spark shell.

4. Importing libraries

For this end-to-end scenario we are going to use Scala, the primary language for Apache Spark.

// General purpose library
import scala.xml._

// Spark data manipulation libraries
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Spark machine learning libraries
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.Pipeline

5. Parsing XML

We need to extract the Id, Title, Body and Tags from the input XML file and create a single data frame with these columns. First, let’s remove the XML header and footer. I assume that the input file is located in the same directory where you run the spark-shell command.

val fileName = "Posts.small.xml"
val textFile = sc.textFile(fileName)
val postsXml = textFile.map(_.trim).
                    filter(!_.startsWith("<?xml version=")).
                    filter(_ != "<posts>").
                    filter(_ != "</posts>")

Spark has good built-in support for parsing JSON and CSV formats. For XML we need to write several additional lines of code to create a data frame by specifying the schema programmatically.
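
For comparison, a JSON file could be loaded into a data frame with a single line (the file name here is just an illustration; JSON is not used in this scenario):

val jsonDf = sqlContext.read.json("posts.json")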

Note that Scala’s XML parser automatically converts escaped entities such as “&lt;a&gt;” back into actual tags such as “<a>”. We also concatenate the title and the body and remove all remaining HTML tags, newline characters and duplicated spaces from the text.

val postsRDD = postsXml.map { s =>
            val xml = XML.loadString(s)

            val id = (xml \ "@Id").text
            val tags = (xml \ "@Tags").text

            val title = (xml \ "@Title").text
            val body = (xml \ "@Body").text
            val bodyPlain = ("<\\S+>".r).replaceAllIn(body, " ")
            val text = (title + " " + bodyPlain).replaceAll("\n", " ").replaceAll("( )+", " ");

            Row(id, tags, text)
        }

To create a data frame, a schema should be applied to the RDD.

val schemaString = "Id Tags Text"
val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

val postsDf = sqlContext.createDataFrame(postsRDD, schema)

Now you can take a look at your data frame.

postsDf.show()

6. Preparing training and testing datasets

The next step is creating binary labels for the binary classifier. For these code examples we use “java” as the tag that we would like to predict. All rows with the “java” tag should be labeled as “1” and rows without it as “0”. Let’s identify our target tag “java” and create binary labels based on it.

val targetTag = "java"
val myudf: (String => Double) = (str: String) => {if (str.contains(targetTag)) 1.0 else 0.0}
val sqlfunc = udf(myudf)
val postsLabeled = postsDf.withColumn("Label", sqlfunc(col("Tags")) )

The dataset can be split into negative and positive subsets by using the new label.

val positive = postsLabeled.filter('Label > 0.0)
val negative = postsLabeled.filter('Label < 1.0)
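
Before sampling, it can be useful to check how imbalanced the two classes are. A quick way to do that (not part of the original code) is to count the rows per label:

postsLabeled.groupBy("Label").count().show()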

We are going to use 90% of our data for the model training and 10% as a testing dataset. Let’s create a training dataset by sampling the positive and negative datasets separately.

val positiveTrain = positive.sample(false, 0.9)
val negativeTrain = negative.sample(false, 0.9)
val training = positiveTrain.unionAll(negativeTrain)

The testing dataset should include all rows which are not included in the training datasets. And again – positive and negative examples separately.

val negativeTrainTmp = negativeTrain.withColumnRenamed("Label", "Flag").select('Id, 'Flag)
val negativeTest = negative.join( negativeTrainTmp, negative("Id") === negativeTrainTmp("Id"), "LeftOuter").
                            filter("Flag is null").select(negative("Id"), 'Tags, 'Text, 'Label)
val positiveTrainTmp = positiveTrain.withColumnRenamed("Label", "Flag").select('Id, 'Flag)
val positiveTest = positive.join( positiveTrainTmp, positive("Id") === positiveTrainTmp("Id"), "LeftOuter").
                            filter("Flag is null").select(positive("Id"), 'Tags, 'Text, 'Label)
val testing = negativeTest.unionAll(positiveTest)
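
As a side note, if you do not need to sample the positive and negative subsets separately, DataFrame.randomSplit produces an approximate 90%/10% split in one line. This is only an alternative sketch, not the approach used above:

val Array(trainingAlt, testingAlt) = postsLabeled.randomSplit(Array(0.9, 0.1), 12345L)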

7. Training a model

Let’s identify training parameters:

  1. Number of features
  2. Regularization parameter
  3. Number of epochs for gradient descent

Spark API creates a model based on columns from the data-frame and the training parameters:

val numFeatures = 64000
val numEpochs = 30
val regParam = 0.02

val tokenizer = new Tokenizer().setInputCol("Text").setOutputCol("Words")
val hashingTF = new HashingTF().setNumFeatures(numFeatures).
          setInputCol(tokenizer.getOutputCol).setOutputCol("Features")
val lr = new LogisticRegression().setMaxIter(numEpochs).setRegParam(regParam).
                                    setFeaturesCol("Features").setLabelCol("Label").
                                    setRawPredictionCol("Score").setPredictionCol("Prediction")
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

val model = pipeline.fit(training)
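
Once the pipeline is fitted, you can look inside the trained stages if you are curious. The sketch below assumes that the logistic regression model is the third stage; the available fields (such as weights and intercept) may differ between Spark versions:

import org.apache.spark.ml.classification.LogisticRegressionModel

// The fitted pipeline keeps its stages in the same order they were declared
val lrModel = model.stages(2).asInstanceOf[LogisticRegressionModel]
print("Intercept: " + lrModel.intercept)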

8. Testing a model

This is our final code for the binary “Java” classifier which returns a prediction (0.0 or 1.0):

val testTitle = "Easiest way to merge a release into one JAR file"
val testBody = """Is there a tool or script which easily merges a bunch of 
                   href="http://en.wikipedia.org/wiki/JAR_%28file_format%29"
                   JAR files into one JAR file? A bonus would be to easily set the main-file manifest 
                   and make it executable. I would like to run it with something like:
                  As far as I can tell, it has no dependencies which indicates that it shouldn't be an easy 
                  single-file tool, but the downloaded ZIP file contains a lot of libraries."""
val testText = testTitle + " " + testBody
val testDF = sqlContext.createDataFrame(Seq( (99.0, testText))).toDF("Label", "Text")
val result = model.transform(testDF)
val prediction = result.collect()(0)(6).asInstanceOf[Double]
print("Prediction: "+ prediction)
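
Getting the prediction by its positional index (column 6) works, but it depends on the exact order of the pipeline output columns. A slightly more robust variant, assuming the column names used above, is to select the Prediction column by name:

val predictionByName = result.select("Prediction").first().getDouble(0)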

Let’s evaluate the quality of the model on the testing dataset.

val testingResult = model.transform(testing)
val testingResultScores = testingResult.select("Prediction", "Label").rdd.
                                    map(r => (r(0).asInstanceOf[Double], r(1).asInstanceOf[Double]))
val bc = new BinaryClassificationMetrics(testingResultScores)
val roc = bc.areaUnderROC
print("Area under the ROC:" + roc)
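
Since recall was chosen as the high-priority metric in section 1, it is also worth computing it directly from the (prediction, label) pairs. This is a small additional sketch, not part of the original code:

// Recall = true positives / (true positives + false negatives)
val truePositives = testingResultScores.filter { case (p, l) => p == 1.0 && l == 1.0 }.count()
val falseNegatives = testingResultScores.filter { case (p, l) => p == 0.0 && l == 1.0 }.count()
val recall = truePositives.toDouble / (truePositives + falseNegatives)
print("Recall: " + recall)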

If you use the small dataset, the quality of your model will probably not be great. The area under the ROC curve will be very low (close to 50%), which indicates poor model quality. With the entire Posts.xml dataset the quality is not so bad: the area under the ROC is 0.64. You could probably improve this result by playing with different transformations such as TF-IDF and normalization, but not in this blog post.
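
For reference, here is one way a TF-IDF stage could be wired into the existing pipeline. It reuses the variables defined above and is only a sketch of the idea; it was not evaluated in this post:

import org.apache.spark.ml.feature.IDF

// Re-weight the raw term frequencies produced by HashingTF with inverse document frequencies
val idf = new IDF().setInputCol("Features").setOutputCol("TfIdfFeatures")
val lrTfIdf = new LogisticRegression().setMaxIter(numEpochs).setRegParam(regParam).
                                    setFeaturesCol("TfIdfFeatures").setLabelCol("Label").
                                    setRawPredictionCol("Score").setPredictionCol("Prediction")
val tfIdfPipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf, lrTfIdf))
val tfIdfModel = tfIdfPipeline.fit(training)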

Conclusion

Apache Spark can be a great option for data processing and machine learning scenarios when your dataset is larger than your computer’s memory can hold. It might not be easy to use Spark in cluster mode within a Hadoop YARN environment. However, in local (standalone) mode, Spark is as simple to use as any other analytical tool.

Please let me know if you encounter any problems or have further questions. I would really like to hear your feedback.

The complete source code of this program can be found here.

22 thoughts on “Beginners Guide: Apache Spark Machine Learning Scenario With A Large Input Dataset”

  1. Very interesting post! May I ask two questions:

    1. When you create a DataFrame using sqlContext.createDataFrame, will Spark crash because the memory cannot hold 34.6 GB of data? Or does Spark automatically partition the data such that the partitions fit into memory when performing an action?

    2. I am confused about how Spark handles data that is larger than memory. Does it partition the data by itself during the transform-action steps, or do I have to manually partition the data into chunks that fit in memory and then load them into Spark for the transform-action steps?


    • Spark won’t crash. Right. It uses a concept similar to partitions in MapReduce. Less memory means more partitions and slower I/O operations. This partitioning works both for transform actions and for model creation.

      This partitioning works without any manual steps. However, you can control the partitioning (and “data locality” in a cluster mode) in code level or config level. See http://eugenezhulenev.com/blog/2015/09/16/spark-ml-for-big-and-small-data/

      Spark was able to create a model on top of 34 gigabytes on my old laptop with only 4 GB of RAM. It uses less than 1.5 GB by default. It took a couple of hours. I’ll measure more precisely and will publish the actual time.


      • The result is ready.

        I did the measurements with 34 GB of input data for two sets of parameters:
        1) 64K features in a hash (numFeatures variable) and maximum number of iterations (numEpochs) is equal to 30.
        2) numFeatures=20K and numEpochs=20

        My old 4 GB laptop prepared the input data and created the model in 8 hours and 15 minutes for the first set of parameters, and in 6 hours 50 minutes for the second set. See the Scala code from line 1 to line 85.

        Model validation takes 2 hours and 40 minutes for both parameter sets. See the Scala code from line 87 to line 109.

        I did not change any Apache Spark settings. And as I said before Apache Spark uses only 1.5 GB of memory by default in my 4GB machine.



  2. It’s a great write-up. As a Java developer it would be easier for me to understand if it were in Java. Are you considering converting it into Java?


  3. Thank you for this post. Very helpful from a beginner’s perspective. Will surely check out the rest of the series soon.
    Also, some notes I would like to add here: I installed Spark 2.0, in which “sqlContext” has been replaced with the “SparkSession” class (http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-16-to-20).

    I replaced this line of code in Point 4:
    >val postsDf = sqlContext.createDataFrame(postsRDD, schema)

    with:
    >val sparkSession = SparkSession.builder.
    master("local").
    appName("spark session example").
    getOrCreate()
    >val postsDf = sparkSession.createDataFrame(postsRDD, schema)

    and similarly in Point 8 for testDF.
    The code runs fine until I come across a NullPointerException in the end (for “val bc = new BinaryClassificationMetrics(testingResultScores)”), which I’ve yet to figure out.

    PS: I noticed a typo in Point 8 (I guess you meant testBody instead of testBoby, or the next line of code wouldn’t run) 🙂

    Thanks!


      • Hi Dmitry,

        Here’s the code that I ran:

        // General purpose library
        import scala.xml._

        // Spark data manipulation libraries
        import org.apache.spark.sql.catalyst.plans._
        import org.apache.spark.sql._
        import org.apache.spark.sql.types._
        import org.apache.spark.sql.functions._

        // Spark machine learning libraries
        import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
        import org.apache.spark.ml.classification.LogisticRegression
        import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
        import org.apache.spark.ml.Pipeline

        val fileName = "Posts.small.xml"
        val textFile = sc.textFile(fileName)
        val postsXml = textFile.map(_.trim).
        filter(!_.startsWith("<?xml version=")).
        filter(_ != "<posts>").
        filter(_ != "</posts>")

        val postsRDD = postsXml.map { s =>
        val xml = XML.loadString(s)

        val id = (xml \ "@Id").text
        val tags = (xml \ "@Tags").text

        val title = (xml \ "@Title").text
        val body = (xml \ "@Body").text
        val bodyPlain = ("<\\S+>".r).replaceAllIn(body, " ")
        val text = (title + " " + bodyPlain).replaceAll("\n", " ").replaceAll("( )+", " ");

        Row(id, tags, text)
        }

        val schemaString = "Id Tags Text"
        val schema = StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

        // Spark 2.0
        val sparkSession = SparkSession.builder.
        master("local").
        appName("spark session example").
        getOrCreate()

        val postsDf = sparkSession.createDataFrame(postsRDD, schema)

        // Spark 1.0
        // val postsDf = sqlContext.createDataFrame(postsRDD, schema)

        val targetTag = "java"
        val myudf: (String => Double) = (str: String) => {if (str.contains(targetTag)) 1.0 else 0.0}
        val sqlfunc = udf(myudf)
        val postsLabeled = postsDf.withColumn("Label", sqlfunc(col("Tags")) )

        val positive = postsLabeled.filter('Label > 0.0)
        val negative = postsLabeled.filter('Label < 1.0)

        // ... (training, model and testing code from sections 6-8, with testDF created via sparkSession) ...

        val testingResult = model.transform(testing)
        val testingResultScores = testingResult.select("Prediction", "Label").rdd.
        map(r => (r(0).asInstanceOf[Double], r(1).asInstanceOf[Double]))
        val bc = new BinaryClassificationMetrics(testingResultScores)
        val roc = bc.areaUnderROC
        print("Area under the ROC:" + roc)

        The Null Pointer exception is received for variable ‘testingResultScores’. There were a few warnings received for ‘val model = pipeline.fit(training)’ as well:

        16/11/12 20:14:06 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
        16/11/12 20:14:06 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
        16/11/12 20:14:19 WARN LogisticRegression: LogisticRegression training finished but the result is not converged because: max iterations reached
        model: org.apache.spark.ml.PipelineModel = pipeline_fc1efbb88c28

        Thanks,
        Sejal

