diff --git a/README.md b/README.md
index 6383354..0bb324f 100644
--- a/README.md
+++ b/README.md
@@ -20,11 +20,53 @@ The currently available algorithms are:
 Clone this repository. And run `./sbt assembly` at the project directory. To create an assembly.
 
+## Quick Start (local)
+1. Get `Spark` version 1.3.0. A pre-built version can be downloaded from [here](http://www.apache.org/dyn/closer.cgi/spark/spark-1.3.0/spark-1.3.0-bin-cdh4.tgz "SparkDownload").
+   * Untar it and set `$SPARK_HOME` to the extracted directory.
+2. Clone this repo:
+`git clone https://github.com/AlpineNow/SparkML2.git`
+3. Assemble the jar:
+`sbt assembly`
+4. Submit the training job:
+```
+rm -rf /tmp/ModelOutputs/mnist && \
+$SPARK_HOME/bin/spark-submit \
+  --class spark_ml.sequoia_forest.SequoiaForestRunner \
+  --name SequoiaForestRunner \
+  --driver-memory 4G \
+  --executor-memory 4G \
+  --num-executors 10 \
+  target/scala-2.10/spark_ml-assembly-0.1.jar \
+  --inputPath data/mnist.tsv.gz \
+  --outputPath /tmp/ModelOutputs/mnist \
+  --numTrees 100 \
+  --numPartitions 10 \
+  --labelIndex 780 \
+  --checkpointDir /tmp/tree
+```
+5. Submit the prediction job:
+```
+rm -rf /tmp/ModelOutputs/mnistpredictions && \
+$SPARK_HOME/bin/spark-submit \
+  --class spark_ml.sequoia_forest.SequoiaForestPredictor \
+  --name SequoiaForestPredictor \
+  --driver-memory 4G \
+  --executor-memory 4G \
+  --num-executors 4 \
+  target/scala-2.10/spark_ml-assembly-0.1.jar \
+  --inputPath data/mnist.t.tsv.gz \
+  --forestPath /tmp/ModelOutputs/mnist \
+  --outputPath /tmp/ModelOutputs/mnistpredictions \
+  --labelIndex 780 \
+  --outputFieldIndices 780 \
+  --pauseDuration 100
+```
+
 ## Quick Start (for YARN and Linux variants)
-1. Get `Spark` version 1.0.1. A pre-built version can be downloaded from [here](https://spark.apache.org/downloads.html "SparkDownload") for some of Hadoop variants. For different Hadoop versions, you'll have to build it after cloning it from github. E.g., to build `Spark` for Apache Hadoop 2.0.5-alpha with `YARN` support, you could do the following.
+1. Get `Spark` version 1.3.0. A pre-built version can be downloaded from [here](https://spark.apache.org/downloads.html "SparkDownload") for some Hadoop variants. For other Hadoop versions, you'll have to build it after cloning it from GitHub. E.g., to build `Spark` for Apache Hadoop 2.0.5-alpha with `YARN` support, you could do the following.
   1. `git clone https://github.com/apache/spark.git`
-  2. `git checkout tags/v1.0.1`
+  2. `git checkout tags/v1.3.0`
   3. `SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly` or `SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt -Dsbt.override.build.repos=true assembly`
   4. If you want to run this against a different `Spark` version, you should modify `project/build.scala` and change versions of `spark-core` and `spark-mllib` to appropriate versions. Of course, you'll also need to build a matching version of `Spark`.
   5. Additionally, by default, this package builds against `hadoop-client` version `1.0.4`. This will have to change, for instance if you want to build this against different Hadoop versions that are not protocol-compatible with this version. Refer to this `Spark` [page](http://spark.apache.org/docs/latest/building-with-maven.html "SparkMaven") to find out about different Hadoop versions.
@@ -32,7 +74,7 @@ Clone this repository. And run `./sbt assembly` at the project directory. To cre
 3. In order to connect to Hadoop clusters, you should have Hadoop configurations stored somewhere. E.g., if your Hadoop configurations are stored in `/home/me/hd-config`, then make sure to have the following environment variables.
   * `export HADOOP_CONF_DIR=/home/me/hd-config`
   * `export YARN_CONF_DIR=/home/me/hd-config`
-4. Find the location of the `Spark` assembly jar. E.g., it might be `assembly/target/scala-2.10/spark-assembly-1.0.1-hadoop2.0.5-alpha.jar` under the `Spark` directory. Run `export SPARK_JAR=jar_location`.
+4. Find the location of the `Spark` assembly jar. E.g., it might be `assembly/target/scala-2.10/spark-assembly-1.3.0-hadoop2.0.5-alpha.jar` under the `Spark` directory. Run `export SPARK_JAR=jar_location`.
 5. Have some data you want to train on in HDFS. A couple of data sets are provided in this package under the `data` directory for quick testing. E.g., copy `mnist.tsv.gz` and `mnist.t.tsv.gz` to a HDFS directory (E.g. `/Datasets/`).
 6. To train a classifier using `YARN`, run the following. `SPARK_DIR` should be replaced with the directory of `Spark` and `SPARK_ML_DIR` should be replaced with the directory where this package resides.
   * `SPARK_DIR/bin/spark-submit --master yarn --deploy-mode cluster --class spark_ml.sequoia_forest.SequoiaForestRunner --name SequoiaForestRunner --driver-memory 4G --executor-memory 4G --num-executors 10 SPARK_ML_DIR/target/scala-2.10/spark_ml-assembly-0.1.jar --inputPath /Datasets/mnist.tsv.gz --outputPath /ModelOutputs/mnist --numTrees 100 --numPartitions 10 --labelIndex 780`
@@ -42,4 +84,3 @@ Clone this repository. And run `./sbt assembly` at the project directory. To cre
   * `SPARK_DIR/bin/spark-submit --master yarn --deploy-mode cluster --class spark_ml.sequoia_forest.SequoiaForestPredictor --name SequoiaForestPredictor --driver-memory 4G --executor-memory 4G --num-executors 4 SPARK_ML_DIR/target/scala-2.10/spark_ml-assembly-0.1.jar --inputPath /Datasets/mnist.t.tsv.gz --forestPath /ModelOutputs/mnist --outputPath /ModelOutputs/mnistpredictions --labelIndex 780 --outputFieldIndices 780 --pauseDuration 100`
   * The above command would predict on `mnist.t.tsv.gz` using the previously trained model in `/ModelOutputs/mnist` and write predictions under `/ModelOutputs/mnistpredictions`. It'll also write the value of the column 780 (which happens to be the label in this case) along with the predicted value. In the standard output log of the driver, you should also be able to see computed accuracy since the label is given in this case.
 9. Training regression requires adding an argument `--forestType Variance`. Likewise, using categorical features requires adding an argument like `--categoricalFeatureIndices 5,6`. This would mean that columns 5 and 6 are to be treated as categorical features. For other options, refer to the command line arguments described below.
-
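The README's build note above (sub-step 4 under the YARN quick start) says to edit `project/build.scala` when targeting a different `Spark` release; the `project/build.scala` change below centralizes those versions in a new `V` object, so that edit becomes a one-liner. A minimal sketch of such an edit, with placeholder version strings (the surrounding `Build` definition is elided):

```
// project/build.scala (sketch): only the centralized versions need to change
// before re-running `sbt assembly`; the dependency list reads V.Spark / V.Hadoop.
object V {
  val Spark = "1.3.0"   // swap in the Spark release you built your cluster against
  val Hadoop = "1.0.4"  // must stay protocol-compatible with your Hadoop cluster
}
```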
diff --git a/project/build.properties b/project/build.properties
index 5e96e96..748703f 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=0.12.4
+sbt.version=0.13.7
diff --git a/project/build.scala b/project/build.scala
index d0c1d06..70d9fd3 100644
--- a/project/build.scala
+++ b/project/build.scala
@@ -8,11 +8,17 @@ import sbtassembly.Plugin._
 import AssemblyKeys._
 
 object spark_ml extends Build {
+
+  object V {
+    val Spark = "1.3.0"
+    val Hadoop = "1.0.4"
+  }
+
   lazy val sharedLibraryDependencies = Seq(
     "org.scalatest" %% "scalatest" % "2.1.5" % "test",
-    "org.apache.spark" %% "spark-core" % "1.0.1" % "provided",
-    "org.apache.spark" %% "spark-mllib" % "1.0.1" % "provided",
-    "org.apache.hadoop" % "hadoop-client" % "1.0.4" % "provided",
+    "org.apache.spark" %% "spark-core" % V.Spark % "provided",
+    "org.apache.spark" %% "spark-mllib" % V.Spark % "provided",
+    "org.apache.hadoop" % "hadoop-client" % V.Hadoop % "provided",
     "org.spire-math" %% "spire" % "0.8.2",
     "org.scalanlp" %% "breeze" % "0.8" % "provided",
     "com.github.scopt" %% "scopt" % "3.2.0"
@@ -39,7 +45,9 @@ object spark_ml extends Build {
       "-deprecation"
     ),
 
-    libraryDependencies ++= sharedLibraryDependencies
+    libraryDependencies ++= sharedLibraryDependencies,
+    fork := true,
+    fork in Test := true
   ) ++ scalariformSettings ++ ScalastylePlugin.Settings ++ assemblySettings ++ extraAssemblySettings
 
   def buildSettings =
diff --git a/project/plugins.sbt b/project/plugins.sbt
index ea9651f..1c696be 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,6 +1,6 @@
-addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.4.0")
+addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0")
 
-addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1")
+addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "3.0.0")
 
 resolvers += Classpaths.typesafeResolver
 
@@ -8,8 +8,8 @@
 addSbtPlugin("com.github.retronym" % "sbt-onejar" % "0.8")
 
 addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.3.2")
 
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
 
 // adding support for source code formatting using Scalariform
-addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.0.1")
+addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")
diff --git a/src/main/scala/spark_ml/sequoia_forest/ForestStorage.scala b/src/main/scala/spark_ml/sequoia_forest/ForestStorage.scala
index d638aeb..81cd9e1 100644
--- a/src/main/scala/spark_ml/sequoia_forest/ForestStorage.scala
+++ b/src/main/scala/spark_ml/sequoia_forest/ForestStorage.scala
@@ -20,6 +20,7 @@ package spark_ml.sequoia_forest
 import java.io.File
 import java.io.FileOutputStream
 import java.io.OutputStream
+import java.net.URI
 
 import spire.implicits._
 
@@ -130,7 +131,7 @@ abstract class FSForestStorage extends ForestStorage {
  * This stores the trees in an HDFS directory.
  */
 class HDFSForestStorage(hadoopConf: Configuration, path: String) extends FSForestStorage {
-  private val hdfs = FileSystem.get(hadoopConf)
+  private val hdfs = FileSystem.get(URI.create(path), hadoopConf)
   forestPath = path
 
   /**
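In the `ForestStorage.scala` change above, `HDFSForestStorage` now resolves the `FileSystem` from the forest path's own URI instead of always using the configured default filesystem, so fully qualified paths such as `hdfs://host/...` or `file:///...` are honored. A small standalone sketch of the same idiom; the `FsExample` object and the example path are illustrative only, not part of the package:

```
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FsExample {
  // Resolve the FileSystem from the path's own scheme/authority
  // instead of always using the default filesystem from fs.defaultFS.
  def fileSystemFor(path: String, conf: Configuration = new Configuration()): FileSystem =
    FileSystem.get(URI.create(path), conf)

  def main(args: Array[String]): Unit = {
    val outputPath = "hdfs://namenode:8020/ModelOutputs/mnist" // placeholder; use your cluster's URI
    fileSystemFor(outputPath).mkdirs(new Path(outputPath))
  }
}
```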
diff --git a/src/main/scala/spark_ml/sequoia_forest/SequoiaForestTrainer.scala b/src/main/scala/spark_ml/sequoia_forest/SequoiaForestTrainer.scala
index a71a305..4cf9804 100644
--- a/src/main/scala/spark_ml/sequoia_forest/SequoiaForestTrainer.scala
+++ b/src/main/scala/spark_ml/sequoia_forest/SequoiaForestTrainer.scala
@@ -106,11 +106,13 @@ object SequoiaForestTrainer {
 
     val rng = new Random() // For generating seeds for the bagger.
 
+    val labelIsCategorical: Boolean = treeType == TreeType.Classification_InfoGain
+
     val (maxLabelValue: Double, featureBins: Array[Bins]) = discretizationType match {
       case DiscretizationType.EqualWidth => EqualWidthDiscretizer.discretizeFeatures(
         input,
         categoricalFeatureIndices,
-        labelIsCategorical = true,
+        labelIsCategorical = labelIsCategorical,
         Map[String, String](
           StringConstants.NumBins_Numeric -> maxNumNumericBins.toString,
           StringConstants.MaxCardinality_Categoric -> maxNumCategoricalBins.toString))
@@ -118,10 +120,10 @@
       case DiscretizationType.EqualFrequency => EqualFrequencyDiscretizer.discretizeFeatures(
         input,
         categoricalFeatureIndices,
-        labelIsCategorical = true,
+        labelIsCategorical = labelIsCategorical,
         Map[String, String](
           StringConstants.NumBins_Numeric -> maxNumNumericBins.toString,
-          StringConstants.SubSampleCount_Numeric -> "50000", // TODO: Using 50000 samples to find numeric bins but should make this configurable.
+          StringConstants.SubSampleCount_Numeric -> "10000", // TODO: Using 10000 samples to find numeric bins but should make this configurable.
           StringConstants.MaxCardinality_Categoric -> maxNumCategoricalBins.toString,
           StringConstants.RandomSeed -> rng.nextInt().toString))
 
@@ -129,7 +131,7 @@
     }
 
     // If this is classification, the label has to be a non-negative integer.
-    if (treeType == TreeType.Classification_InfoGain && (maxLabelValue < 0.0 || maxLabelValue.toInt.toDouble != maxLabelValue)) {
+    if (labelIsCategorical && (maxLabelValue < 0.0 || maxLabelValue.toInt.toDouble != maxLabelValue)) {
       throw new InvalidCategoricalValueException(maxLabelValue + " is not a valid target class value.")
     }
 
@@ -163,7 +165,7 @@
     discretizedBaggedInput.setCheckpointDir(checkpointDir)
     discretizedBaggedInput.setCheckpointInterval(checkpointInterval)
 
-    notifiee.newStatusMessage("Finished transforming the input data into propert training data...")
+    notifiee.newStatusMessage("Finished transforming the input data into proper training data...")
 
     // Determine certain parameters automatically.
     val numFeatures = featureBins.length
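The `SequoiaForestTrainer.scala` change above computes `labelIsCategorical` once from the tree type, so the integer-label requirement is only enforced for classification (InfoGain) forests and regression (Variance) labels are no longer discretized as categorical. A standalone sketch of that check; the package's `InvalidCategoricalValueException` is stood in for by a plain `IllegalArgumentException` here:

```
object LabelCheckSketch {
  // Classification labels must be non-negative integers encoded as doubles;
  // regression (Variance) labels skip this validation entirely.
  def validateClassificationLabel(maxLabelValue: Double): Unit =
    if (maxLabelValue < 0.0 || maxLabelValue.toInt.toDouble != maxLabelValue) {
      throw new IllegalArgumentException(maxLabelValue + " is not a valid target class value.")
    }

  def main(args: Array[String]): Unit = {
    validateClassificationLabel(9.0) // fine: MNIST class labels are 0 through 9
    validateClassificationLabel(2.5) // throws: not an integer-valued class label
  }
}
```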