49 changes: 45 additions & 4 deletions README.md
@@ -20,19 +20,61 @@ The currently available algorithms are:

Clone this repository and run `./sbt assembly` in the project directory to create an assembly jar.

## Quick Start (local)
1. Get `Spark` version 1.3.0. A pre-built version can be downloaded from [here](http://www.apache.org/dyn/closer.cgi/spark/spark-1.3.0/spark-1.3.0-bin-cdh4.tgz "SparkDownload")
1a. Untar it and set `$SPARK_HOME` to the extracted directory, for example:
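A minimal sketch of this step. The archive name assumes the `spark-1.3.0-bin-cdh4.tgz` build linked above and an arbitrary install location; adjust both to match your download.
```
# Extract the pre-built Spark distribution and point SPARK_HOME at it.
tar -xzf spark-1.3.0-bin-cdh4.tgz
export SPARK_HOME=$(pwd)/spark-1.3.0-bin-cdh4
```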
2. Clone this repo:
`git clone https://github.com/AlpineNow/SparkML2.git`
3. Assemble the jar:
`./sbt assembly`
4. Submit the training job:
```
rm -rf /tmp/ModelOutputs/mnist && \
$SPARK_HOME/bin/spark-submit \
--class spark_ml.sequoia_forest.SequoiaForestRunner \
--name SequoiaForestRunner \
--driver-memory 4G \
--executor-memory 4G \
--num-executors 10 \
target/scala-2.10/spark_ml-assembly-0.1.jar \
--inputPath data/mnist.tsv.gz \
--outputPath /tmp/ModelOutputs/mnist \
--numTrees 100 \
--numPartitions 10 \
--labelIndex 780 \
--checkpointDir /tmp/tree
```
5. Submit the prediction job:
```
rm -rf /tmp/ModelOutputs/mnistpredictions && \
$SPARK_HOME/bin/spark-submit \
--class spark_ml.sequoia_forest.SequoiaForestPredictor \
--name SequoiaForestPredictor \
--driver-memory 4G \
--executor-memory 4G \
--num-executors 4 \
target/scala-2.10/spark_ml-assembly-0.1.jar \
--inputPath data/mnist.t.tsv.gz \
--forestPath /tmp/ModelOutputs/mnist \
--outputPath /tmp/ModelOutputs/mnistpredictions \
--labelIndex 780 \
--outputFieldIndices 780 \
--pauseDuration 100
```
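After both jobs finish, the trained forest and the predictions should be under the output paths given above (typically on the local filesystem in this local setup). A quick sanity check, assuming the default paths from the commands above:
```
# List the saved forest and the prediction output.
ls /tmp/ModelOutputs/mnist
ls /tmp/ModelOutputs/mnistpredictions
```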

## Quick Start (for YARN and Linux variants)

1. Get `Spark` version 1.0.1. A pre-built version can be downloaded from [here](https://spark.apache.org/downloads.html "SparkDownload") for some of Hadoop variants. For different Hadoop versions, you'll have to build it after cloning it from github. E.g., to build `Spark` for Apache Hadoop 2.0.5-alpha with `YARN` support, you could do the following.
1. Get `Spark` version 1.3.0. A pre-built version can be downloaded from [here](https://spark.apache.org/downloads.html "SparkDownload") for some Hadoop variants. For other Hadoop versions, you'll have to build it yourself after cloning it from GitHub. E.g., to build `Spark` for Apache Hadoop 2.0.5-alpha with `YARN` support, you could do the following.
1. `git clone https://github.com/apache/spark.git`
2. `git checkout tags/v1.0.1`
2. `git checkout tags/v1.3.0`
3. `SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly` or `SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt -Dsbt.override.build.repos=true assembly`
4. If you want to run this against a different `Spark` version, you should modify `project/build.scala` and change versions of `spark-core` and `spark-mllib` to appropriate versions. Of course, you'll also need to build a matching version of `Spark`.
5. Additionally, by default, this package builds against `hadoop-client` version `1.0.4`. This will have to change if, for instance, you want to build against a Hadoop version that is not protocol-compatible with `1.0.4`. Refer to this `Spark` [page](http://spark.apache.org/docs/latest/building-with-maven.html "SparkMaven") to find out about different Hadoop versions.
2. Clone this repository, and run `./sbt assembly`.
3. In order to connect to Hadoop clusters, you should have the Hadoop configuration files stored somewhere. E.g., if your Hadoop configuration is stored in `/home/me/hd-config`, then make sure the following environment variables are set:
* `export HADOOP_CONF_DIR=/home/me/hd-config`
* `export YARN_CONF_DIR=/home/me/hd-config`
4. Find the location of the `Spark` assembly jar. E.g., it might be `assembly/target/scala-2.10/spark-assembly-1.0.1-hadoop2.0.5-alpha.jar` under the `Spark` directory. Run `export SPARK_JAR=jar_location`.
4. Find the location of the `Spark` assembly jar. E.g., it might be `assembly/target/scala-2.10/spark-assembly-1.3.0-hadoop2.0.5-alpha.jar` under the `Spark` directory. Run `export SPARK_JAR=jar_location`.
5. Have some data you want to train on in HDFS. A couple of data sets are provided in this package under the `data` directory for quick testing. E.g., copy `mnist.tsv.gz` and `mnist.t.tsv.gz` to an HDFS directory such as `/Datasets/` (see the first sketch after this list).
6. To train a classifier using `YARN`, run the following. `SPARK_DIR` should be replaced with the directory of `Spark` and `SPARK_ML_DIR` should be replaced with the directory where this package resides.
* `SPARK_DIR/bin/spark-submit --master yarn --deploy-mode cluster --class spark_ml.sequoia_forest.SequoiaForestRunner --name SequoiaForestRunner --driver-memory 4G --executor-memory 4G --num-executors 10 SPARK_ML_DIR/target/scala-2.10/spark_ml-assembly-0.1.jar --inputPath /Datasets/mnist.tsv.gz --outputPath /ModelOutputs/mnist --numTrees 100 --numPartitions 10 --labelIndex 780`
@@ -42,4 +84,3 @@ Clone this repository. And run `./sbt assembly` at the project directory. To cre
* `SPARK_DIR/bin/spark-submit --master yarn --deploy-mode cluster --class spark_ml.sequoia_forest.SequoiaForestPredictor --name SequoiaForestPredictor --driver-memory 4G --executor-memory 4G --num-executors 4 SPARK_ML_DIR/target/scala-2.10/spark_ml-assembly-0.1.jar --inputPath /Datasets/mnist.t.tsv.gz --forestPath /ModelOutputs/mnist --outputPath /ModelOutputs/mnistpredictions --labelIndex 780 --outputFieldIndices 780 --pauseDuration 100`
* The above command would predict on `mnist.t.tsv.gz` using the previously trained model in `/ModelOutputs/mnist` and write predictions under `/ModelOutputs/mnistpredictions`. It'll also write the value of column 780 (which happens to be the label in this case) along with the predicted value. In the driver's standard output log, you should also be able to see the computed accuracy, since the label is given in this case.
9. Training a regression model requires adding the argument `--forestType Variance`. Likewise, using categorical features requires adding an argument like `--categoricalFeatureIndices 5,6`, which means that columns 5 and 6 are to be treated as categorical features. For other options, refer to the command line arguments described below (an illustrative command follows this list).
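As referenced in step 5, a minimal sketch for copying the bundled sample data into HDFS; the `/Datasets/` target directory is only an example.
```
# Copy the sample MNIST files shipped under data/ into HDFS.
hadoop fs -mkdir /Datasets
hadoop fs -put data/mnist.tsv.gz data/mnist.t.tsv.gz /Datasets/
```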
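The following is an illustrative sketch of item 9: the training command from step 6, switched to regression and with two columns treated as categorical. The input/output paths, label index, and feature indices are hypothetical; the bundled MNIST data is a classification set, so substitute a dataset whose label column is numeric.
```
# Illustrative only: paths and column indices are placeholders.
SPARK_DIR/bin/spark-submit --master yarn --deploy-mode cluster \
  --class spark_ml.sequoia_forest.SequoiaForestRunner \
  --name SequoiaForestRunner \
  --driver-memory 4G --executor-memory 4G --num-executors 10 \
  SPARK_ML_DIR/target/scala-2.10/spark_ml-assembly-0.1.jar \
  --inputPath /Datasets/my_regression_data.tsv.gz \
  --outputPath /ModelOutputs/my_regression_model \
  --numTrees 100 --numPartitions 10 \
  --labelIndex 0 \
  --forestType Variance \
  --categoricalFeatureIndices 5,6
```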

2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version=0.12.4
sbt.version=0.13.7
16 changes: 12 additions & 4 deletions project/build.scala
@@ -8,11 +8,17 @@ import sbtassembly.Plugin._
import AssemblyKeys._

object spark_ml extends Build {

object V {
val Spark = "1.3.0"
val Hadoop = "1.0.4"
}

lazy val sharedLibraryDependencies = Seq(
"org.scalatest" %% "scalatest" % "2.1.5" % "test",
"org.apache.spark" %% "spark-core" % "1.0.1" % "provided",
"org.apache.spark" %% "spark-mllib" % "1.0.1" % "provided",
"org.apache.hadoop" % "hadoop-client" % "1.0.4" % "provided",
"org.apache.spark" %% "spark-core" % V.Spark % "provided",
"org.apache.spark" %% "spark-mllib" % V.Spark % "provided",
"org.apache.hadoop" % "hadoop-client" % V.Hadoop % "provided",
"org.spire-math" %% "spire" % "0.8.2",
"org.scalanlp" %% "breeze" % "0.8" % "provided",
"com.github.scopt" %% "scopt" % "3.2.0"
@@ -39,7 +45,9 @@ object spark_ml extends Build {
"-deprecation"
),

libraryDependencies ++= sharedLibraryDependencies
libraryDependencies ++= sharedLibraryDependencies,
fork := true,
fork in Test := true
) ++ scalariformSettings ++ ScalastylePlugin.Settings ++ assemblySettings ++ extraAssemblySettings

def buildSettings =
8 changes: 4 additions & 4 deletions project/plugins.sbt
@@ -1,15 +1,15 @@
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.4.0")
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0")

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "3.0.0")

resolvers += Classpaths.typesafeResolver

addSbtPlugin("com.github.retronym" % "sbt-onejar" % "0.8")

addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.3.2")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// adding support for source code formatting using Scalariform
addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.0.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")

3 changes: 2 additions & 1 deletion src/main/scala/spark_ml/sequoia_forest/ForestStorage.scala
@@ -20,6 +20,7 @@ package spark_ml.sequoia_forest
import java.io.File
import java.io.FileOutputStream
import java.io.OutputStream
import java.net.URI

import spire.implicits._

@@ -130,7 +131,7 @@ abstract class FSForestStorage extends ForestStorage {
* This stores the trees in an HDFS directory.
*/
class HDFSForestStorage(hadoopConf: Configuration, path: String) extends FSForestStorage {
private val hdfs = FileSystem.get(hadoopConf)
private val hdfs = FileSystem.get(URI.create(path), hadoopConf)
forestPath = path

/**
@@ -106,30 +106,32 @@ object SequoiaForestTrainer {

val rng = new Random() // For generating seeds for the bagger.

val labelIsCategorical: Boolean = treeType == TreeType.Classification_InfoGain

val (maxLabelValue: Double, featureBins: Array[Bins]) = discretizationType match {
case DiscretizationType.EqualWidth => EqualWidthDiscretizer.discretizeFeatures(
input,
categoricalFeatureIndices,
labelIsCategorical = true,
labelIsCategorical = labelIsCategorical,
Map[String, String](
StringConstants.NumBins_Numeric -> maxNumNumericBins.toString,
StringConstants.MaxCardinality_Categoric -> maxNumCategoricalBins.toString))

case DiscretizationType.EqualFrequency => EqualFrequencyDiscretizer.discretizeFeatures(
input,
categoricalFeatureIndices,
labelIsCategorical = true,
labelIsCategorical = labelIsCategorical,
Map[String, String](
StringConstants.NumBins_Numeric -> maxNumNumericBins.toString,
StringConstants.SubSampleCount_Numeric -> "50000", // TODO: Using 50000 samples to find numeric bins but should make this configurable.
StringConstants.SubSampleCount_Numeric -> "10000", // TODO: Using 10000 samples to find numeric bins but should make this configurable.
StringConstants.MaxCardinality_Categoric -> maxNumCategoricalBins.toString,
StringConstants.RandomSeed -> rng.nextInt().toString))

case _ => throw new UnsupportedOperationException("Currently, only equal-width or equal-frequency discretizations are supported.")
}

// If this is classification, the label has to be a non-negative integer.
if (treeType == TreeType.Classification_InfoGain && (maxLabelValue < 0.0 || maxLabelValue.toInt.toDouble != maxLabelValue)) {
if (labelIsCategorical && (maxLabelValue < 0.0 || maxLabelValue.toInt.toDouble != maxLabelValue)) {
throw new InvalidCategoricalValueException(maxLabelValue + " is not a valid target class value.")
}

@@ -163,7 +165,7 @@ object SequoiaForestTrainer {
discretizedBaggedInput.setCheckpointDir(checkpointDir)
discretizedBaggedInput.setCheckpointInterval(checkpointInterval)

notifiee.newStatusMessage("Finished transforming the input data into propert training data...")
notifiee.newStatusMessage("Finished transforming the input data into proper training data...")

// Determine certain parameters automatically.
val numFeatures = featureBins.length