diff --git a/build.sbt b/build.sbt
index c01d73a..ba19e74 100644
--- a/build.sbt
+++ b/build.sbt
@@ -2,11 +2,11 @@ name := "spark-google-spreadsheets"
 
 organization := "com.github.potix2"
 
-scalaVersion := "2.11.12"
+scalaVersion := "2.12.13"
 
-crossScalaVersions := Seq("2.11.12")
+crossScalaVersions := Seq("2.12.13")
 
-version := "0.6.4-SNAPSHOT"
+version := "0.7.1-SNAPSHOT"
 
 spName := "potix2/spark-google-spreadsheets"
@@ -16,7 +16,7 @@ spIncludeMaven := true
 
 spIgnoreProvided := true
 
-sparkVersion := "2.3.3"
+sparkVersion := "3.2.0"
 
 val testSparkVersion = settingKey[String]("The version of Spark to test against.")
@@ -26,7 +26,7 @@ sparkComponents := Seq("sql")
 
 libraryDependencies ++= Seq(
   "org.slf4j" % "slf4j-api" % "1.7.5" % "provided",
-  "org.scalatest" %% "scalatest" % "2.2.1" % "test",
+  "org.scalatest" %% "scalatest" % "3.0.4" % "test",
   ("com.google.api-client" % "google-api-client" % "1.22.0").
     exclude("com.google.guava", "guava-jdk5"),
   "com.google.oauth-client" % "google-oauth-client-jetty" % "1.22.0",
diff --git a/project/plugins.sbt b/project/plugins.sbt
index a2f0172..4c0600d 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,10 +1,10 @@
-resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
+resolvers += Resolver.url("artifactory", url("https://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
 
-resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
+resolvers += "Typesafe Repository" at "https://repo.typesafe.com/typesafe/releases/"
 
 resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"
 
-resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven"
+resolvers += "Spark Package Main Repo" at "https://repos.spark-packages.org/"
 
 addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
diff --git a/src/main/scala/com/github/potix2/spark/google/spreadsheets/DefaultSource.scala b/src/main/scala/com/github/potix2/spark/google/spreadsheets/DefaultSource.scala
index aac0463..e8206bd 100644
--- a/src/main/scala/com/github/potix2/spark/google/spreadsheets/DefaultSource.scala
+++ b/src/main/scala/com/github/potix2/spark/google/spreadsheets/DefaultSource.scala
@@ -46,7 +46,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
     val (spreadsheetName, worksheetName) = pathToSheetNames(parameters)
     implicit val context = createSpreadsheetContext(parameters)
     val spreadsheet = SparkSpreadsheetService.findSpreadsheet(spreadsheetName)
-    if(!spreadsheet.isDefined)
+    if (spreadsheet.isEmpty)
       throw new RuntimeException(s"no such a spreadsheet: $spreadsheetName")
 
     spreadsheet.get.addWorksheet(worksheetName, data.schema, data.collect().toList, Util.toRowData)
@@ -56,7 +56,12 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
   private[spreadsheets] def createSpreadsheetContext(parameters: Map[String, String]) = {
     val serviceAccountIdOption = parameters.get("serviceAccountId")
     val credentialPath = parameters.getOrElse("credentialPath", DEFAULT_CREDENTIAL_PATH)
-    SparkSpreadsheetService(serviceAccountIdOption, new File(credentialPath))
+    val credentialData = parameters.get("credentialData")
+    if (credentialData.exists(_.nonEmpty)) { // parameters.get returns an Option, never null; check for a usable value
+      SparkSpreadsheetService(serviceAccountIdOption, credentialData)
+    } else {
+      SparkSpreadsheetService(serviceAccountIdOption, new File(credentialPath))
+    }
   }
 
   private[spreadsheets] def createRelation(sqlContext: SQLContext,
diff --git a/src/main/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetService.scala b/src/main/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetService.scala
index 26bdd4c..31cc826 100644
--- a/src/main/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetService.scala
+++ b/src/main/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetService.scala
@@ -27,7 +27,14 @@ import org.apache.spark.sql.types.StructType
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
+import com.google.api.services.sheets.v4.SheetsScopes
+import java.io.FileInputStream
+import java.io.IOException
+import java.io.InputStream
+import java.util
+import org.apache.commons.io.IOUtils
 
 object SparkSpreadsheetService {
   private val SPREADSHEET_URL = new URL("https://spreadsheets.google.com/feeds/spreadsheets/private/full")
@@ -36,15 +43,21 @@ object SparkSpreadsheetService {
   private val HTTP_TRANSPORT: NetHttpTransport = GoogleNetHttpTransport.newTrustedTransport()
   private val JSON_FACTORY: JacksonFactory = JacksonFactory.getDefaultInstance()
 
-  case class SparkSpreadsheetContext(serviceAccountIdOption: Option[String], p12File: File) {
-    private val credential = authorize(serviceAccountIdOption, p12File)
+  case class SparkSpreadsheetContext(serviceAccountIdOption: Option[String], credentialData: Option[String], p12File: File) {
+    // A null p12File signals that the key was passed in memory via credentialData
+    private val credential: GoogleCredential =
+      if (p12File != null)
+        authorize(serviceAccountIdOption, p12File)
+      else
+        getCredentialFromStream(IOUtils.toInputStream(credentialData.get, "UTF-8"))
 
     lazy val service = new Sheets.Builder(HTTP_TRANSPORT, JSON_FACTORY, credential)
       .setApplicationName(APP_NAME)
       .build()
 
     private def authorize(serviceAccountIdOption: Option[String], p12File: File): GoogleCredential = {
-      val credential = serviceAccountIdOption
+      if (p12File.getName.endsWith("p12")) { // legacy flow: .p12 key plus serviceAccountId
+      val credential = serviceAccountIdOption
         .map {
           new Builder()
             .setTransport(HTTP_TRANSPORT)
@@ -55,19 +68,95 @@ object SparkSpreadsheetService {
             .build()
         }.getOrElse(GoogleCredential.getApplicationDefault.createScoped(scopes))
 
-      credential.refreshToken()
+      credential.refreshToken()
+      credential
+      } else { // anything else is treated as a JSON service-account key file
+        getCredentials(p12File)
+      }
+    }
+
+    private def getCredentials(jsonFile: File): GoogleCredential = {
+      val in = new FileInputStream(jsonFile)
+      try {
+        getCredentialFromStream(in)
+      } catch {
+        case e: IOException =>
+          throw new RuntimeException(e)
+      } finally in.close() // in cannot be null once the constructor has succeeded
+    }
+
+    private def getCredentialFromStream(in: InputStream): GoogleCredential = {
+      val credential = GoogleCredential.fromStream(in).createScoped(scopes)
+      credential.refreshToken()
       credential
     }
 
     def findSpreadsheet(spreadSheetId: String): SparkSpreadsheet =
       SparkSpreadsheet(this, service.spreadsheets().get(spreadSheetId).execute())
 
-    def query(spreadsheetId: String, range: String): ValueRange =
-      service.spreadsheets().values().get(spreadsheetId, range).execute()
+    def query(spreadsheetId: String, range: String): ValueRange = {
+      // Quote ranges that end in a digit, or the API reads the trailing digit as a cell reference
+      val quotedRange = addSingleQuotesIfEndsWithNumber(range)
+      val result = service.spreadsheets().values().get(spreadsheetId, quotedRange).execute()
+      val rows = result.getValues
+      if (rows != null && !rows.isEmpty) {
+        val head = rows.get(0)
+        // Track the widest row so the header can be padded to cover every column
+        var max = 0
+        for (i <- 0 until rows.size()) {
+          max = math.max(max, rows.get(i).size()) // take the maximum, not the last row's size
+        }
+
+        // Replace blank header cells with generated _colN names
+        var index = 0
+        for (listIndex <- 0 until head.size()) {
+          val fieldName = head.get(listIndex)
+          if (fieldName == null || fieldName.toString.isEmpty) {
+            index += 1
+            head.set(listIndex, "_col" + index)
+          }
+        }
+
+        // Pad the header up to the widest data row
+        for (_ <- head.size() until max) {
+          index += 1
+          head.add("_col" + index)
+        }
+      }
+      result
+    }
+  }
+
-  case class SparkSpreadsheet(context:SparkSpreadsheetContext, private var spreadsheet: Spreadsheet) {
+  // True when the range ends in a digit and therefore needs quoting in A1 notation
+  def endsWithNumber(s: String): Boolean =
+    s.matches(".*\\d$")
+
+  def addSingleQuotesIfEndsWithNumber(s: String): String =
+    if (endsWithNumber(s)) s"'$s'" else s
+
+  case class SparkSpreadsheet(context: SparkSpreadsheetContext, private var spreadsheet: Spreadsheet) {
     def name: String = spreadsheet.getProperties.getTitle
+
     def getWorksheets: Seq[SparkWorksheet] =
       spreadsheet.getSheets.map(new SparkWorksheet(context, spreadsheet, _))
@@ -153,7 +242,7 @@ object SparkSpreadsheetService {
     def deleteWorksheet(worksheetName: String): Unit = {
       val worksheet: Option[SparkWorksheet] = findWorksheet(worksheetName)
-      if(worksheet.isDefined) {
+      if (worksheet.isDefined) {
         val request = new Request()
         val sheetId = worksheet.get.sheet.getProperties.getSheetId
         request.setDeleteSheet(new DeleteSheetRequest()
@@ -169,9 +258,10 @@ object SparkSpreadsheetService {
   case class SparkWorksheet(context: SparkSpreadsheetContext, spreadsheet: Spreadsheet, sheet: Sheet) {
     def name: String = sheet.getProperties.getTitle
+
     lazy val values = {
       val valueRange = context.query(spreadsheet.getSpreadsheetId, name)
-      if ( valueRange.getValues != null )
+      if (valueRange.getValues != null)
         valueRange.getValues
       else
         List[java.util.List[Object]]().asJava
@@ -229,7 +319,7 @@ object SparkSpreadsheetService {
     def rows: Seq[Map[String, String]] =
-      if(values.isEmpty) {
+      if (values.isEmpty) {
         Seq()
       }
       else {
@@ -239,17 +329,20 @@ object SparkSpreadsheetService {
   /**
    * create new context of spreadsheets for spark
-   *
-   * @param serviceAccountId
+   *
+   * @param serviceAccountIdOption
    * @param p12File
    * @return
    */
-  def apply(serviceAccountIdOption: Option[String], p12File: File) = SparkSpreadsheetContext(serviceAccountIdOption, p12File)
+  def apply(serviceAccountIdOption: Option[String], p12File: File) = SparkSpreadsheetContext(serviceAccountIdOption, None, p12File)
+
+  def apply(serviceAccountIdOption: Option[String], credentialData: Option[String]) = SparkSpreadsheetContext(serviceAccountIdOption, credentialData, null) // null p12File selects the in-memory credential
+
   /**
    * find a spreadsheet by name
-   *
-   * @param spreadsheetName
+   *
+   * @param spreadsheetName
    * @param context
    * @return
   */
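
Usage sketch (not part of the patch): the new credentialData option lets a caller pass the service-account JSON key as an in-memory string instead of pointing credentialPath at a key file, which helps on clusters where shipping a key file to every node is awkward. The option names serviceAccountId, credentialPath, and credentialData come from DefaultSource in this diff; the format name and the "<spreadsheetId>/<worksheetName>" load path follow the package's existing conventions; the SparkSession `spark` and the GOOGLE_SERVICE_ACCOUNT_JSON environment variable are assumptions for illustration.

    // Assumed: the JSON key arrives out of band, e.g. through an environment variable
    val jsonKey: String = sys.env("GOOGLE_SERVICE_ACCOUNT_JSON")

    val df = spark.read
      .format("com.github.potix2.spark.google.spreadsheets")
      .option("credentialData", jsonKey)       // in-memory key, added by this diff
      .load("<spreadsheetId>/<worksheetName>") // fill in a real sheet id and worksheet name

    // Omitting credentialData (or passing an empty string) falls back to
    // .option("credentialPath", ...) and the old p12/JSON key-file behavior.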