Skip to content

Commit

Permalink
Merge pull request #16 from jpbnetley/feature/library_updates
Browse files Browse the repository at this point in the history
database cleanup
  • Loading branch information
jpbnetley authored Apr 19, 2020
2 parents 716302a + b6adcc4 commit c375d4f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 25 deletions.
38 changes: 20 additions & 18 deletions src/main/scala/Util/DataBuilder/Processing.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import org.bson.Document

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.reflect.io.File
import Util.Logging.log
import Util.ErrorHandler._
import Util.Models.OrderedFile

Expand All @@ -25,22 +23,26 @@ object Processing {
*/
def csvFiles(csvFiles: List[OrderedFile])
(implicit database: Database.type): Task[Either[Exception, Unit]] = {
Task.wander(csvFiles) { orderedFile =>
println(s"Processing file ${orderedFile.index + 1} of ${csvFiles.length} file name: ${orderedFile.file.name}")
(for {
fileLines <- EitherT(FileHelper.extractCsvFileLines(orderedFile.file))
headers = fileLines.headOption.map(_.split(',').toList)
lineItems = fileLines.drop(1)
collectionName = orderedFile.file.name.replace(".csv", "").toLowerCase
documentResult <- EitherT(buildMongoDocuments(headers, lineItems))
db <- EitherT.right[Exception](database.getDatabase)
dbInsert <- EitherT.rightT[Task, Exception](db.getCollection[Document](collectionName).insertMany(documentResult))
} yield {
println(s"Inserting into db: $dbInsert")
Await.result(dbInsert.toFuture(), Duration.Inf)
println(s"Done processing file ${orderedFile.index + 1}")
}).value
}.map { result =>
val insertion = {
Task.wander(csvFiles) { orderedFile =>
println(s"Processing file ${orderedFile.index + 1} of ${csvFiles.length} file name: ${orderedFile.file.name}")
(for {
fileLines <- EitherT(FileHelper.extractCsvFileLines(orderedFile.file))
headers = fileLines.headOption.map(_.split(',').toList)
lineItems = fileLines.drop(1)
collectionName = orderedFile.file.name.replace(".csv", "").toLowerCase
documentResult <- EitherT(buildMongoDocuments(headers, lineItems))
db <- EitherT(database.getDatabase)
dbInsert <- EitherT.rightT[Task, Exception](db.getCollection[Document](collectionName).insertMany(documentResult))
} yield {
println(s"Inserting into db: $dbInsert")
Await.result(dbInsert.toFuture(), Duration.Inf)
println(s"Done processing file ${orderedFile.index + 1}")
}).value
}
}

insertion.map { result =>
val (errors, _) = result.separate
errors.headOption.map(error).toLeft(())
}
Expand Down
22 changes: 15 additions & 7 deletions src/main/scala/Util/Database/Database.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ import com.mongodb.{MongoClientSettings, MongoCredential, ServerAddress}
import monix.eval.Task
import org.mongodb.scala.{MongoClient, MongoDatabase}
import Util.ErrorHandler._
import cats.implicits._

import scala.jdk.CollectionConverters._

case object Database {

private var mongoClient: Option[MongoClient] = None

/** builds mongo settings with auth
*
* @param authUser on the db
Expand Down Expand Up @@ -54,20 +57,25 @@ case object Database {
}
}

private val databaseName: String = sys.env.get("mongo_db_name").fold(throw error("environment variables not set for db name"))(identity)
private val settings: MongoClientSettings.Builder = settingsBuilder().fold(throw error("environment variables not set for db"))(identity)
private val mongoClient: MongoClient = MongoClient(settings.build())
private val database: MongoDatabase = mongoClient.getDatabase(databaseName)

/** Returns thee database instance
*
* @return
*/
def getDatabase: Task[MongoDatabase] = Task(database)
def getDatabase: Task[Either[Exception, MongoDatabase]] = Task.eval {
for {
databaseName <- Either.fromOption(sys.env.get("mongo_db_name"), error("environment variables not set for db name"))
settings <- Either.fromOption(settingsBuilder(), error("environment variables not set for db"))
mongoClient = MongoClient(settings.build()).some
mClient <- Either.fromOption(mongoClient, error("mongo client could not be defined"))
} yield mClient.getDatabase(databaseName)
}

/** Closes the mongo connection
*
*/
def close(): Unit = mongoClient.close()
def close(): Unit = mongoClient match {
case Some(db) => db.close()
case None => ()
}

}

0 comments on commit c375d4f

Please sign in to comment.