From cb3105931a07053314be062df2b97690cffd2f7e Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Wed, 18 Dec 2013 18:47:33 -0800 Subject: [PATCH 1/5] A basic version of the collapsed Gibbs sampler for LDA which also compiles. --- .../graph/algorithms/TopicModeling.scala | 196 ++++++++++++++++++ 1 file changed, 196 insertions(+) create mode 100644 graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala new file mode 100644 index 000000000..f83778df9 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala @@ -0,0 +1,196 @@ +package org.apache.spark.graph.algorithms + +import util.Random +import org.apache.spark.graph._ +import org.apache.spark.rdd.RDD +import org.apache.spark._ +import org.apache.spark.broadcast._ + +object LDA { + type DocId = Vid + type WordId = Vid + type TopicId = Int + type Count = Int + + class Posterior (docs: VertexRDD[Array[Count]], words: VertexRDD[Array[Count]]) +} // end of LDA singleton + + +class LDA(@transient val tokens: RDD[(LDA.WordId, LDA.DocId)], + val nTopics: Int = 100, + val alpha: Double = 0.1, + val beta: Double = 0.1) { + import LDA._ + + /** + * The Factor class represents a vector of counts that is nTopics long. + * + * The factor class internally stores a single topic assignment (e.g., [0 0 0 1 0 0]) as + * a scalar (i.e., topicId = 3) rather than allocating an array. This optimization is + * needed to efficienlty support adding token assignments to count vectors. + * + * The Factor class is a member of the LDA class because it relies on the nTopics member + * variable. + * + * @param topic + * @param counts + */ + class Factor(private var topic: TopicId = -1, private var counts: Array[Count] = null) { + def this(topic: TopicId) = this(topic, null) + def this(counts: Array[Count]) = this(-1, counts) + def makeDense() { + if (counts == null) { + counts = new Array[Count](nTopics) + if (topic >= 0) counts(topic) = 1 + topic = -1 + } + } + def +=(other: Factor) { + makeDense() + if (other.counts == null) { + counts(topic) += 1 + } else { + var i = 0 + while (i < other.counts.size) { + counts(i) += other.counts(i) + i += 1 + } + } + } + def asCounts(): Array[Count] = { + makeDense() + counts + } + } // end of Factor + + + private val sc = tokens.sparkContext + + /** + * The bipartite terms by document graph. 
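+   * Word vertices keep their original non-negative ids while documents are
+   * remapped to negative ids so the two id spaces never collide; each token
+   * becomes an edge whose attribute is that token's current topic assignment.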
+ */ + @transient var graph: Graph[Factor, TopicId] = { + // To setup a bipartite graph it is necessary to ensure that the document and + // word ids are in a different namespace + val renumbered = tokens.map { case (wordId, docId) => + assert(wordId >= 0) + assert(docId >= 0) + val newDocId: DocId = -(docId + 1) + (wordId, newDocId) + } + Graph.fromEdgeTuples(renumbered, new Factor).mapEdges(e => Random.nextInt(nTopics)).cache + } + + + /** + * The number of unique words in the corpus + */ + val nwords = graph.vertices.filter{ case (vid, c) => vid >= 0 }.count() + + /** + * The number of documents in the corpus + */ + val ndocs = graph.vertices.filter{ case (vid, c) => vid < 0 }.count() + + /** + * The total counts for each topic + */ + var topicC: Broadcast[Array[Count]] = null + + // Execute update counts after initializing all the member + updateCounts() + + + /** + * The update counts function updates the term and document counts in the + * graph as well as the overall topic count based on the current topic + * assignments of each token (the edge attributes). + * + */ + def updateCounts() { + implicit object FactorAccumParam extends AccumulatorParam[Factor] { + def addInPlace(a: Factor, b: Factor): Factor = { a += b; a } + def zero(initialValue: Factor): Factor = new Factor() + } + val accum = sc.accumulator(new Factor()) + + def mapFun(e: EdgeTriplet[Factor, TopicId]): Iterator[(Vid, Factor)] = { + val f = new Factor(e.attr) + accum += f + Iterator((e.srcId, f), (e.dstId, f)) + } + val newCounts = graph.mapReduceTriplets[Factor](mapFun, (a, b) => { a += b; a } ) + graph = graph.outerJoinVertices(newCounts) { (vid, oldFactor, newFactorOpt) => newFactorOpt.get } + // Trigger computation of the topic counts + // TODO: We should uncache the graph at some point. + graph.cache + graph.vertices.foreach(x => ()) + val globalCounts: Factor = accum.value + topicC = sc.broadcast(globalCounts.asCounts()) + } // end of update counts + + /** + * Run the gibbs sampler + * @param nIter + * @return + */ + def iterate(nIter: Int = 1) { + // Run the sampling + for (i <- 0 until nIter) { + // Resample all the tokens + graph = graph.mapTriplets { + triplet => { + val w: Array[Count] = triplet.srcAttr.asCounts() + val d: Array[Count] = triplet.dstAttr.asCounts() + val total: Array[Count] = topicC.value + val oldTopic = triplet.attr + // Subtract out the old assignment from the counts + w(oldTopic) -= 1 + d(oldTopic) -= 1 + total(oldTopic) -= 1 + assert(w(oldTopic) >= 0) + assert(d(oldTopic) >= 0) + assert(total(oldTopic) >= 0) + // Construct the conditional + val conditional = new Array[Double](nTopics) + var t = 0 + var conditionalSum = 0.0 + while (t < conditional.size) { + conditional(t) = (alpha + d(t)) * (beta * w(t)) / (beta * nwords + total(t)) + conditionalSum += conditional(t) + t += 1 + } + assert(conditionalSum > 0.0) + t = 0 + // Generate a random number between [0, conditionalSum) + val u = Random.nextDouble() * conditionalSum + assert(u < conditionalSum) + // Draw the new topic from the multinomial + t = 0 + var cumsum = conditional(t) + while(cumsum < u) { + t += 1 + cumsum += conditional(t) + } + val newTopic = t + // Cheat !!!! 
and modify the vertex and edge attributes in place + w(newTopic) += 1 + d(newTopic) += 1 + total(newTopic) += 1 // <-- This might be dangerous + // Return the new topic + newTopic + } // End of resample edge udf + } // end of map triplets + updateCounts() // <-- This is an action + println("Sampled iteration: " + i.toString) + } + } + + def posterior: Posterior = { + graph.cache() + val words = graph.vertices.filter { case (vid, _) => vid >= 0 }.mapValues(_.asCounts()) + val docs = graph.vertices.filter { case (vid,_) => vid < 0 }.mapValues(_.asCounts()) + new LDA.Posterior(words, docs) + } + +} // end of TopicModeling From b52474963bf05f5e69f49759db6175ccb1ccd87e Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 19 Dec 2013 13:41:13 -1000 Subject: [PATCH 2/5] adding code to compute top k words in each topic --- .../graph/algorithms/TopicModeling.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala index f83778df9..cf50f8d6c 100644 --- a/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala @@ -5,6 +5,7 @@ import org.apache.spark.graph._ import org.apache.spark.rdd.RDD import org.apache.spark._ import org.apache.spark.broadcast._ +import org.apache.spark.util.BoundedPriorityQueue object LDA { type DocId = Vid @@ -129,6 +130,27 @@ class LDA(@transient val tokens: RDD[(LDA.WordId, LDA.DocId)], topicC = sc.broadcast(globalCounts.asCounts()) } // end of update counts + def topWords(k: Int): Array[Array[(Count, WordId)]] = { + graph.vertices.filter { + case (vid, c) => vid >= 0 + }.mapPartitions { items => + val queues = Array.fill(nTopics)(new BoundedPriorityQueue[(Count, WordId)](k)) + for ((wordId, factor) <- items) { + var t = 0 + val counts: Array[Count] = factor.asCounts() + while (t < nTopics) { + val tpl: (Count, WordId) = (counts(t), wordId) + queues(t) += tpl + t += 1 + } + } + Iterator(queues) + }.reduce { (q1, q2) => + q1.zip(q2).foreach { case (a,b) => a ++= b } + q1 + }.map ( q => q.toArray ) + } // end of TopWords + /** * Run the gibbs sampler * @param nIter From b2b21f377f07238e3861bb51682cde8bc9f18021 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Tue, 31 Dec 2013 13:46:35 -0800 Subject: [PATCH 3/5] Addressed several bugs in the implementation and tested successfully on the NIPS corpus. 
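
For reference, the rewritten sampler draws each token's topic from the standard
collapsed Gibbs conditional, with the current token first removed from all three
counts (the cavity offsets in the code):

    p(z = t | rest)  is proportional to  (alpha + n_dt) * (beta + n_wt) / (beta * W + n_t)

where n_dt and n_wt are the per-document and per-word topic counts, n_t is the
global topic count, and W is the vocabulary size (nWords). Among the fixes visible
in the diff below: the word term is now (beta + n_wt) rather than (beta * n_wt),
the count histograms are rebuilt with mapReduceTriplets instead of being mutated
in place, and the per-partition random number generator is seeded deterministically
from the iteration number and partition id.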
--- .../graph/algorithms/TopicModeling.scala | 379 +++++++++++------- 1 file changed, 240 insertions(+), 139 deletions(-) diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala index cf50f8d6c..381bacd95 100644 --- a/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala @@ -1,6 +1,5 @@ package org.apache.spark.graph.algorithms -import util.Random import org.apache.spark.graph._ import org.apache.spark.rdd.RDD import org.apache.spark._ @@ -13,133 +12,187 @@ object LDA { type TopicId = Int type Count = Int - class Posterior (docs: VertexRDD[Array[Count]], words: VertexRDD[Array[Count]]) + type Factor = Array[Count] + + class Posterior (docs: VertexRDD[Factor], words: VertexRDD[Factor]) + + def addEq(a: Factor, b: Factor): Factor = { + assert(a.size == b.size) + var i = 0 + while (i < a.size) { + a(i) += b(i) + i += 1 + } + a + } + + def addEq(a: Factor, t: TopicId): Factor = { a(t) += 1; a } + + def makeFactor(nTopics: Int, topic: TopicId): Factor = { + val f = new Factor(nTopics) + f(topic) += 1 + f + } } // end of LDA singleton + + + class LDA(@transient val tokens: RDD[(LDA.WordId, LDA.DocId)], val nTopics: Int = 100, val alpha: Double = 0.1, val beta: Double = 0.1) { import LDA._ - /** - * The Factor class represents a vector of counts that is nTopics long. - * - * The factor class internally stores a single topic assignment (e.g., [0 0 0 1 0 0]) as - * a scalar (i.e., topicId = 3) rather than allocating an array. This optimization is - * needed to efficienlty support adding token assignments to count vectors. - * - * The Factor class is a member of the LDA class because it relies on the nTopics member - * variable. - * - * @param topic - * @param counts - */ - class Factor(private var topic: TopicId = -1, private var counts: Array[Count] = null) { - def this(topic: TopicId) = this(topic, null) - def this(counts: Array[Count]) = this(-1, counts) - def makeDense() { - if (counts == null) { - counts = new Array[Count](nTopics) - if (topic >= 0) counts(topic) = 1 - topic = -1 - } - } - def +=(other: Factor) { - makeDense() - if (other.counts == null) { - counts(topic) += 1 - } else { - var i = 0 - while (i < other.counts.size) { - counts(i) += other.counts(i) - i += 1 - } - } - } - def asCounts(): Array[Count] = { - makeDense() - counts - } - } // end of Factor - - private val sc = tokens.sparkContext /** * The bipartite terms by document graph. 
*/ - @transient var graph: Graph[Factor, TopicId] = { + var graph: Graph[Factor, TopicId] = { // To setup a bipartite graph it is necessary to ensure that the document and // word ids are in a different namespace val renumbered = tokens.map { case (wordId, docId) => assert(wordId >= 0) assert(docId >= 0) - val newDocId: DocId = -(docId + 1) + val newDocId: DocId = -(docId + 1L) (wordId, newDocId) } - Graph.fromEdgeTuples(renumbered, new Factor).mapEdges(e => Random.nextInt(nTopics)).cache + val nT = nTopics + // Sample the tokens + val gTmp = Graph.fromEdgeTuples(renumbered, false).mapEdges { (pid, iter) => + val gen = new java.util.Random(pid) + iter.map(e => gen.nextInt(nT)) + } + // Compute the topic histograms (factors) for each word and document + val newCounts = gTmp.mapReduceTriplets[Factor]( + e => Iterator((e.srcId, makeFactor(nT, e.attr)), (e.dstId, makeFactor(nT, e.attr))), + (a, b) => addEq(a,b) ) + // Update the graph with the factors + gTmp.outerJoinVertices(newCounts) { (_, _, newFactorOpt) => newFactorOpt.get }.cache + // Trigger computation of the topic counts } - /** * The number of unique words in the corpus */ - val nwords = graph.vertices.filter{ case (vid, c) => vid >= 0 }.count() + val nWords = graph.vertices.filter{ case (vid, c) => vid >= 0 }.count() /** * The number of documents in the corpus */ - val ndocs = graph.vertices.filter{ case (vid, c) => vid < 0 }.count() + val nDocs = graph.vertices.filter{ case (vid, c) => vid < 0 }.count() + + /** + * The number of tokens + */ + val nTokens = graph.edges.count() /** * The total counts for each topic */ - var topicC: Broadcast[Array[Count]] = null + var totalHist = graph.edges.map(e => e.attr) + .aggregate(new Factor(nTopics))(LDA.addEq(_, _), LDA.addEq(_, _)) + assert(totalHist.sum == nTokens) - // Execute update counts after initializing all the member - updateCounts() + /** + * The internal iteration tracks the number of times the random number + * generator was created. In constructing the graph the generated is created + * once and then once for each iteration + */ + private var internalIteration = 1 /** - * The update counts function updates the term and document counts in the - * graph as well as the overall topic count based on the current topic - * assignments of each token (the edge attributes). - * + * Run the gibbs sampler + * @param nIter + * @return */ - def updateCounts() { - implicit object FactorAccumParam extends AccumulatorParam[Factor] { - def addInPlace(a: Factor, b: Factor): Factor = { a += b; a } - def zero(initialValue: Factor): Factor = new Factor() - } - val accum = sc.accumulator(new Factor()) + def iterate(nIter: Int = 1) { + // Run the sampling + for (i <- 0 until nIter) { + println("Starting iteration: " + i.toString) + // Broadcast the topic histogram + val totalHistbcast = sc.broadcast(totalHist) + // Shadowing because scala's closure capture is an abomination + val a = alpha + val b = beta + val nt = nTopics + val nw = nWords + // Define the function to sample a single token + // I had originally used def sampleToken but this leads to closure capture of the LDA class (an abomination). 
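+      // Referencing a method defined on the class would capture `this` (and with it
+      // non-serializable fields such as the SparkContext) in the task closure, hence
+      // a standalone function value over the shadowed locals above.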
+ val sampleToken = (gen: java.util.Random, triplet: EdgeTriplet[Factor, TopicId]) => { + val wHist: Array[Count] = triplet.srcAttr + val dHist: Array[Count] = triplet.dstAttr + val totalHist: Array[Count] = totalHistbcast.value + val oldTopic = triplet.attr + assert(wHist(oldTopic) > 0) + assert(dHist(oldTopic) > 0) + assert(totalHist(oldTopic) > 0) + // Construct the conditional + val conditional = new Array[Double](nt) + var t = 0 + var conditionalSum = 0.0 + while (t < conditional.size) { + val cavityOffset = if (t == oldTopic) 1 else 0 + val w = wHist(t) - cavityOffset + val d = dHist(t) - cavityOffset + val total = totalHist(t) - cavityOffset + conditional(t) = (a + d) * (b + w) / (b * nw + total) + conditionalSum += conditional(t) + t += 1 + } + assert(conditionalSum > 0.0) + // Generate a random number between [0, conditionalSum) + val u = gen.nextDouble() * conditionalSum + assert(u < conditionalSum) + // Draw the new topic from the multinomial + t = 0 + var cumsum = conditional(t) + while(cumsum < u) { + t += 1 + cumsum += conditional(t) + } + val newTopic = t + // Return the new topic + newTopic + } + + // Resample all the tokens + val parts = graph.edges.partitions.size + val interIter = internalIteration + graph = graph.mapTriplets { (pid, iter) => + val gen = new java.util.Random(parts * interIter + pid) + iter.map(token => sampleToken(gen, token)) + } + + // Update the counts + val newCounts = graph.mapReduceTriplets[Factor]( + e => Iterator((e.srcId, makeFactor(nt, e.attr)), (e.dstId, makeFactor(nt, e.attr))), + (a, b) => { addEq(a,b); a } ) + graph = graph.outerJoinVertices(newCounts) { (_, _, newFactorOpt) => newFactorOpt.get }.cache - def mapFun(e: EdgeTriplet[Factor, TopicId]): Iterator[(Vid, Factor)] = { - val f = new Factor(e.attr) - accum += f - Iterator((e.srcId, f), (e.dstId, f)) + // Recompute the global counts (the actual action) + totalHist = graph.edges.map(e => e.attr) + .aggregate(new Factor(nt))(LDA.addEq(_, _), LDA.addEq(_, _)) + assert(totalHist.sum == nTokens) + + internalIteration += 1 + println("Sampled iteration: " + i.toString) } - val newCounts = graph.mapReduceTriplets[Factor](mapFun, (a, b) => { a += b; a } ) - graph = graph.outerJoinVertices(newCounts) { (vid, oldFactor, newFactorOpt) => newFactorOpt.get } - // Trigger computation of the topic counts - // TODO: We should uncache the graph at some point. 
- graph.cache - graph.vertices.foreach(x => ()) - val globalCounts: Factor = accum.value - topicC = sc.broadcast(globalCounts.asCounts()) - } // end of update counts + } // end of iterate def topWords(k: Int): Array[Array[(Count, WordId)]] = { + val nt = nTopics graph.vertices.filter { case (vid, c) => vid >= 0 }.mapPartitions { items => - val queues = Array.fill(nTopics)(new BoundedPriorityQueue[(Count, WordId)](k)) + val queues = Array.fill(nt)(new BoundedPriorityQueue[(Count, WordId)](k)) for ((wordId, factor) <- items) { var t = 0 - val counts: Array[Count] = factor.asCounts() - while (t < nTopics) { - val tpl: (Count, WordId) = (counts(t), wordId) + while (t < nt) { + val tpl: (Count, WordId) = (factor(t), wordId) queues(t) += tpl t += 1 } @@ -151,68 +204,116 @@ class LDA(@transient val tokens: RDD[(LDA.WordId, LDA.DocId)], }.map ( q => q.toArray ) } // end of TopWords - /** - * Run the gibbs sampler - * @param nIter - * @return - */ - def iterate(nIter: Int = 1) { - // Run the sampling - for (i <- 0 until nIter) { - // Resample all the tokens - graph = graph.mapTriplets { - triplet => { - val w: Array[Count] = triplet.srcAttr.asCounts() - val d: Array[Count] = triplet.dstAttr.asCounts() - val total: Array[Count] = topicC.value - val oldTopic = triplet.attr - // Subtract out the old assignment from the counts - w(oldTopic) -= 1 - d(oldTopic) -= 1 - total(oldTopic) -= 1 - assert(w(oldTopic) >= 0) - assert(d(oldTopic) >= 0) - assert(total(oldTopic) >= 0) - // Construct the conditional - val conditional = new Array[Double](nTopics) - var t = 0 - var conditionalSum = 0.0 - while (t < conditional.size) { - conditional(t) = (alpha + d(t)) * (beta * w(t)) / (beta * nwords + total(t)) - conditionalSum += conditional(t) - t += 1 - } - assert(conditionalSum > 0.0) - t = 0 - // Generate a random number between [0, conditionalSum) - val u = Random.nextDouble() * conditionalSum - assert(u < conditionalSum) - // Draw the new topic from the multinomial - t = 0 - var cumsum = conditional(t) - while(cumsum < u) { - t += 1 - cumsum += conditional(t) - } - val newTopic = t - // Cheat !!!! 
and modify the vertex and edge attributes in place - w(newTopic) += 1 - d(newTopic) += 1 - total(newTopic) += 1 // <-- This might be dangerous - // Return the new topic - newTopic - } // End of resample edge udf - } // end of map triplets - updateCounts() // <-- This is an action - println("Sampled iteration: " + i.toString) - } - } - def posterior: Posterior = { graph.cache() - val words = graph.vertices.filter { case (vid, _) => vid >= 0 }.mapValues(_.asCounts()) - val docs = graph.vertices.filter { case (vid,_) => vid < 0 }.mapValues(_.asCounts()) + val words = graph.vertices.filter { case (vid, _) => vid >= 0 } + val docs = graph.vertices.filter { case (vid,_) => vid < 0 } new LDA.Posterior(words, docs) } } // end of TopicModeling + + + +object TopicModeling { + def main(args: Array[String]) { + val host = "local" // args(0) + val options = args.drop(3).map { arg => + arg.dropWhile(_ == '-').split('=') match { + case Array(opt, v) => (opt -> v) + case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + } + } + + var tokensFile = "" + var dictionaryFile = "" + var numVPart = 4 + var numEPart = 4 + var partitionStrategy: Option[PartitionStrategy] = None + var nIter = 50 + var nTopics = 10 + var alpha = 0.1 + var beta = 0.1 + + def pickPartitioner(v: String): PartitionStrategy = v match { + case "RandomVertexCut" => RandomVertexCut + case "EdgePartition1D" => EdgePartition1D + case "EdgePartition2D" => EdgePartition2D + case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut + case _ => throw new IllegalArgumentException("Invalid Partition Strategy: " + v) + } + + options.foreach{ + case ("tokens", v) => tokensFile = v + case ("dictionary", v) => dictionaryFile = v + case ("numVPart", v) => numVPart = v.toInt + case ("numEPart", v) => numEPart = v.toInt + case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) + case ("niter", v) => nIter = v.toInt + case ("ntopics", v) => nTopics = v.toInt + case ("alpha", v) => alpha = v.toDouble + case ("beta", v) => beta = v.toDouble + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + + // def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = { + // loggers.map{ + // loggerName => + // val logger = org.apache.log4j.Logger.getLogger(loggerName) + // val prevLevel = logger.getLevel() + // logger.setLevel(level) + // loggerName -> prevLevel + // }.toMap + // } + // setLogLevels(org.apache.log4j.Level.DEBUG, Seq("org.apache.spark")) + + + val serializer = "org.apache.spark.serializer.KryoSerializer" + System.setProperty("spark.serializer", serializer) + //System.setProperty("spark.shuffle.compress", "false") + System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator") + val sc = new SparkContext(host, "LDA(" + tokensFile + ")") + + val rawTokens: RDD[(LDA.WordId, LDA.DocId)] = + sc.textFile(tokensFile, numEPart).flatMap { line => + val lineArray = line.split("\\s+") + if(lineArray.length != 3) { + println("Invalid line: " + line) + assert(false) + } + val termId = lineArray(0).trim.toLong + val docId = lineArray(1).trim.toLong + assert(termId >= 0) + assert(docId >= 0) + val count = lineArray(2).trim.toInt + assert(count > 0) + //Iterator((termId, docId)) + Iterator.fill(count)((termId, docId)) + } + + val dictionary = + if (!dictionaryFile.isEmpty) { + scala.io.Source.fromFile(dictionaryFile).getLines.toArray + } else { + Array.empty + } + + val model = new LDA(rawTokens, nTopics, alpha, beta) + model.iterate(nIter) + 
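+    // topWords(k) returns one array per topic holding the k (count, wordId)
+    // pairs with the largest counts for that topic.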
+ val topWords = model.topWords(5) + for (queue <- topWords) { + println("word list: ") + if (!dictionary.isEmpty) { + queue.foreach(t => println("\t(" + t._1 + ", " + dictionary(t._2.toInt - 1) + ")")) + } else { + queue.foreach(t => println("\t" + t.toString)) + } + } + + sc.stop() + + } +} // end of TopicModeling object + From 257cc3fa5ca641bdbe96bab80c987a59249988e8 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Sat, 4 Jan 2014 15:40:22 -0800 Subject: [PATCH 4/5] Moving the topK printing to inside the LDA loop to show incremental improvements in topK words. --- .../spark/graph/algorithms/TopicModeling.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala index 381bacd95..f56374766 100644 --- a/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala @@ -300,15 +300,17 @@ object TopicModeling { } val model = new LDA(rawTokens, nTopics, alpha, beta) - model.iterate(nIter) - val topWords = model.topWords(5) - for (queue <- topWords) { - println("word list: ") - if (!dictionary.isEmpty) { - queue.foreach(t => println("\t(" + t._1 + ", " + dictionary(t._2.toInt - 1) + ")")) - } else { - queue.foreach(t => println("\t" + t.toString)) + for(iter <- 0 until nIter) { + model.iterate(1) + val topWords = model.topWords(5) + for (queue <- topWords) { + println("word list: ") + if (!dictionary.isEmpty) { + queue.foreach(t => println("\t(" + t._1 + ", " + dictionary(t._2.toInt - 1) + ")")) + } else { + queue.foreach(t => println("\t" + t.toString)) + } } } From 0143f543a7ffe347ced47bce4034affb7f757228 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Sun, 5 Jan 2014 12:58:36 -0800 Subject: [PATCH 5/5] Addressing minor bugs in standalone main. --- .../org/apache/spark/graph/algorithms/TopicModeling.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala index f56374766..4b9841d9d 100644 --- a/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/TopicModeling.scala @@ -179,7 +179,6 @@ class LDA(@transient val tokens: RDD[(LDA.WordId, LDA.DocId)], assert(totalHist.sum == nTokens) internalIteration += 1 - println("Sampled iteration: " + i.toString) } } // end of iterate @@ -217,8 +216,8 @@ class LDA(@transient val tokens: RDD[(LDA.WordId, LDA.DocId)], object TopicModeling { def main(args: Array[String]) { - val host = "local" // args(0) - val options = args.drop(3).map { arg => + val host = args(0) + val options = args.drop(1).map { arg => arg.dropWhile(_ == '-').split('=') match { case Array(opt, v) => (opt -> v) case _ => throw new IllegalArgumentException("Invalid argument: " + arg) @@ -256,6 +255,8 @@ object TopicModeling { case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) } + println("Tokens: " + tokensFile) + println("Dictionary: " + dictionaryFile) // def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = { // loggers.map{ @@ -312,6 +313,7 @@ object TopicModeling { queue.foreach(t => println("\t" + t.toString)) } } + println("Sampled iteration: " + iter.toString) } sc.stop()
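
For anyone exercising the series end to end without the standalone main, a minimal
driver sketch follows. It is not part of the patches: the file path and parameter
values are illustrative, and an existing SparkContext `sc` is assumed. It builds the
token RDD in the same whitespace-separated "wordId docId count" format the main
expects and prints the top words per topic.

import org.apache.spark.rdd.RDD
import org.apache.spark.graph.algorithms.LDA

// Hypothetical path; one "wordId docId count" triple per line.
val rawTokens: RDD[(LDA.WordId, LDA.DocId)] =
  sc.textFile("/data/nips/tokens.tsv").flatMap { line =>
    val fields = line.split("\\s+")
    val wordId = fields(0).toLong
    val docId = fields(1).toLong
    val count = fields(2).toInt
    // Emit one (word, doc) pair per occurrence, mirroring the standalone main.
    Iterator.fill(count)((wordId, docId))
  }

val model = new LDA(rawTokens, nTopics = 20, alpha = 0.1, beta = 0.1)
model.iterate(10) // ten Gibbs sweeps over all tokens
model.topWords(5).zipWithIndex.foreach { case (words, topic) =>
  println("topic " + topic + ": " + words.mkString(" "))
}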