From 498bab064603baf527015bb1516d2680eeb0d7fc Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Fri, 5 May 2023 09:32:55 +0200 Subject: [PATCH] [WIP] Use IO instead of Id --- .../ConfigHelper.scala | 60 +++++----- .../IdImplicits.scala | 29 ----- .../IgluService.scala | 8 +- .../Main.scala | 107 ++++++++++-------- .../MemorySink.scala | 43 +++++-- 5 files changed, 128 insertions(+), 119 deletions(-) delete mode 100644 src/main/scala/com.snowplowanalytics.snowplow.micro/IdImplicits.scala diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/ConfigHelper.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/ConfigHelper.scala index 898ae8b..6dbe4de 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/ConfigHelper.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/ConfigHelper.scala @@ -12,33 +12,40 @@ */ package com.snowplowanalytics.snowplow.micro -import cats.Id +import java.io.File +import java.net.URI +import java.nio.file.{Path, Paths} +import java.security.{KeyStore, SecureRandom} +import java.util.concurrent.TimeUnit +import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory} + +import pureconfig.generic.auto._ +import pureconfig.generic.{FieldCoproductHint, ProductHint} +import pureconfig.{CamelCase, ConfigFieldMapping, ConfigSource} + +import scala.io.Source + +import com.typesafe.config.{Config, ConfigFactory} + import cats.effect.Clock import cats.implicits._ + +import io.circe.Json +import io.circe.parser.parse +import io.circe.syntax._ + import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.Resolver import com.snowplowanalytics.iglu.client.resolver.registries.Registry + import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} + import com.snowplowanalytics.snowplow.collectors.scalastream.model.{CollectorConfig, SinkConfig} + import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf import com.snowplowanalytics.snowplow.enrich.common.utils.JsonUtils -import com.typesafe.config.{Config, ConfigFactory} -import io.circe.Json -import io.circe.parser.parse -import io.circe.syntax._ -import pureconfig.generic.auto._ -import pureconfig.generic.{FieldCoproductHint, ProductHint} -import pureconfig.{CamelCase, ConfigFieldMapping, ConfigSource} - -import java.io.File -import java.net.URI -import java.nio.file.{Path, Paths} -import java.security.{KeyStore, SecureRandom} -import java.util.concurrent.TimeUnit -import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory} -import scala.io.Source /** Contain functions to parse the command line arguments, * to parse the configuration for the collector, Akka HTTP and Iglu @@ -56,21 +63,12 @@ private[micro] object ConfigHelper { implicit val sinkConfigHint = new FieldCoproductHint[SinkConfig]("enabled") - // Copied from Enrich - necessary for parsing enrichment configs - implicit val clockProvider: Clock[Id] = new Clock[Id] { - final def realTime(unit: TimeUnit): Id[Long] = - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) - - final def monotonic(unit: TimeUnit): Id[Long] = - unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS) - } - type EitherS[A] = Either[String, A] case class MicroConfig( collectorConfig: CollectorConfig, - igluResolver: Resolver[Id], - igluClient: IgluCirceClient[Id], + igluResolver: Resolver[IO], + igluClient: IgluCirceClient[IO], enrichmentConfigs: List[EnrichmentConf], akkaConfig: Config, sslContext: Option[SSLContext], @@ -208,16 +206,16 @@ private[micro] object ConfigHelper { } /** Instantiate an Iglu client from its configuration file. */ - def getIgluClientFromSource(igluConfigSource: Source, extraRegistry: Option[Registry]): Either[String, (Resolver[Id], IgluCirceClient[Id])] = + def getIgluClientFromSource(igluConfigSource: Source, extraRegistry: Option[Registry]): Either[String, (Resolver[IO], IgluCirceClient[IO])] = for { text <- Either.catchNonFatal(igluConfigSource.mkString).leftMap(_.getMessage) json <- parse(text).leftMap(_.show) config <- Resolver.parseConfig(json).leftMap(_.show) - resolver <- Resolver.fromConfig[Id](config).leftMap(_.show).value + resolver <- Resolver.fromConfig[IO](config).leftMap(_.show).value completeResolver = resolver.copy(repos = resolver.repos ++ extraRegistry) - } yield (completeResolver, IgluCirceClient.fromResolver[Id](completeResolver, config.cacheSize)) + } yield (completeResolver, IgluCirceClient.fromResolver[IO](completeResolver, config.cacheSize)) - def getEnrichmentRegistryFromPath(path: Path, igluClient: IgluCirceClient[Id]) = { + def getEnrichmentRegistryFromPath(path: Path, igluClient: IgluCirceClient[IO]) = { val schemaKey = SchemaKey( "com.snowplowanalytics.snowplow", "enrichments", diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/IdImplicits.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/IdImplicits.scala deleted file mode 100644 index cdcc992..0000000 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/IdImplicits.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ - -package com.snowplowanalytics.snowplow.micro - -import cats.Id -import cats.effect.Clock -import java.util.concurrent.TimeUnit - -object IdImplicits { - - implicit val clockProvider: Clock[Id] = new Clock[Id] { - final def realTime(unit: TimeUnit): Id[Long] = - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) - final def monotonic(unit: TimeUnit): Id[Long] = - unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS) - } - -} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/IgluService.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/IgluService.scala index c3fbff1..d2d3fb5 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/IgluService.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/IgluService.scala @@ -16,16 +16,18 @@ package com.snowplowanalytics.snowplow.micro import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route import akka.http.scaladsl.model.StatusCodes.NotFound -import cats.Id + +import cats.effect.IO + import io.circe.generic.auto._ import com.snowplowanalytics.iglu.client.resolver.Resolver + import com.snowplowanalytics.iglu.core.{SchemaVer, SchemaKey} -import IdImplicits._ import CirceSupport._ -class IgluService(resolver: Resolver[Id]) { +class IgluService(resolver: Resolver[IO]) { def get(vendor: String, name: String, versionStr: String): Route = SchemaVer.parseFull(versionStr) match { diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala index 174e426..aea4e42 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala @@ -12,65 +12,77 @@ */ package com.snowplowanalytics.snowplow.micro +import java.io.File + +import scala.sys.process._ + +import org.slf4j.LoggerFactory + import akka.actor.ActorSystem import akka.http.scaladsl.{ConnectionContext, Http} -import cats.Id + +import cats.implicits._ + +import cats.effect.{Blocker, ContextShift, ExitCode, IO, IOApp, Sync} + import com.snowplowanalytics.snowplow.collectors.scalastream.model.CollectorSinks + +import com.snowplowanalytics.forex.ZonedClock + import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf} import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF -import com.snowplowanalytics.snowplow.micro.ConfigHelper.MicroConfig -import org.slf4j.LoggerFactory -import java.io.File -import scala.sys.process._ +import com.snowplowanalytics.snowplow.micro.ConfigHelper.MicroConfig /** Read the configuration and instantiate Snowplow Micro, * which acts as a `Collector` and has an in-memory sink * holding the valid and invalid events. * It offers an HTTP endpoint to query this sink. */ -object Main { +object Main extends IOApp { lazy val logger = LoggerFactory.getLogger(getClass()) - def main(args: Array[String]): Unit = { - val config = ConfigHelper.parseConfig(args) + def run(args: List[String]): IO[ExitCode] = { + val config = ConfigHelper.parseConfig(args.toArray) run(config) } - def setupEnrichments(configs: List[EnrichmentConf]): EnrichmentRegistry[Id] = { - configs.flatMap(_.filesToCache).foreach { case (uri, location) => - logger.info(s"Downloading ${uri}...") - uri.toURL #> new File(location) !! - } - - val enrichmentRegistry = EnrichmentRegistry.build[Id](configs, BlockerF.noop).value match { - case Right(ok) => ok - case Left(e) => - throw new IllegalArgumentException(s"Error while enabling enrichments: $e.") - } - - val loadedEnrichments = enrichmentRegistry.productIterator.toList.collect { - case Some(e: Enrichment) => e.getClass.getSimpleName - } - if (loadedEnrichments.nonEmpty) { - logger.info(s"Enabled enrichments: ${loadedEnrichments.mkString(", ")}") - } else { - logger.info(s"No enrichments enabled.") - } - - enrichmentRegistry + def setupEnrichments( + blocker: Blocker, + configs: List[EnrichmentConf] + ): IO[EnrichmentRegistry[IO]] = { + val maybeRegistry = for { + registry <- EnrichmentRegistry.build[IO](configs, BlockerF.ofBlocker[IO](blocker)) + _ <- configs.flatMap(_.filesToCache).traverse_ { case (uri, location) => + IO(logger.info(s"Downloading ${uri}...")) >> + blocker.blockOn(IO(uri.toURL #> new File(location) !!)) + } + enabledEnrichments = registry.productIterator.toList.collect { + case Some(e: Enrichment) => e.getClass.getSimpleName + } + ///_ <- log + } yield registry + + + //if (loadedEnrichments.nonEmpty) { + // logger.info(s"Enabled enrichments: ${loadedEnrichments.mkString(", ")}") + //} else { + // logger.info(s"No enrichments enabled.") + //} + + maybeRegistry } /** Create the in-memory sink, * get the endpoints for both the collector and to query Snowplow Micro, * and start the HTTP server. */ - def run(config: MicroConfig): Unit = { + def run(config: MicroConfig): IO[ExitCode] = Blocker[IO].use { blocker => implicit val system = ActorSystem.create("snowplow-micro", config.akkaConfig) implicit val executionContext = system.dispatcher - val enrichmentRegistry = setupEnrichments(config.enrichmentConfigs) + val enrichmentRegistry = setupEnrichments[IO](blocker, config.enrichmentConfigs) val sinks = CollectorSinks( MemorySink(config.igluClient, enrichmentRegistry, config.outputEnrichedTsv), MemorySink(config.igluClient, enrichmentRegistry, config.outputEnrichedTsv) @@ -79,21 +91,26 @@ object Main { val routes = Routing.getMicroRoutes(config.collectorConfig, sinks, igluService) - Http() - .newServerAt(config.collectorConfig.interface, config.collectorConfig.port) - .bind(routes) - .foreach { binding => - logger.info(s"REST interface bound to ${binding.localAddress}") - } - - config.sslContext.foreach { sslContext => + val http = IO( Http() - .newServerAt(config.collectorConfig.interface, config.collectorConfig.ssl.port) - .enableHttps(ConnectionContext.httpsServer(sslContext)) + .newServerAt(config.collectorConfig.interface, config.collectorConfig.port) .bind(routes) .foreach { binding => - logger.info(s"HTTPS REST interface bound to ${binding.localAddress}") + logger.info(s"REST interface bound to ${binding.localAddress}") } - } - } + ) + + val https = IO( + config.sslContext.foreach { sslContext => + Http() + .newServerAt(config.collectorConfig.interface, config.collectorConfig.ssl.port) + .enableHttps(ConnectionContext.httpsServer(sslContext)) + .bind(routes) + .foreach { binding => + logger.info(s"HTTPS REST interface bound to ${binding.localAddress}") + } + } + ) + + }.as(ExitCode.Success) } diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala index a78efd5..c6824bd 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala @@ -12,24 +12,33 @@ */ package com.snowplowanalytics.snowplow.micro +import org.joda.time.DateTime + +import org.slf4j.LoggerFactory + import cats.implicits._ -import cats.Id import cats.data.Validated + import io.circe.syntax._ -import org.joda.time.DateTime -import org.slf4j.LoggerFactory + +import cats.effect.IO + import com.snowplowanalytics.iglu.client.IgluCirceClient + import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, EventConverter} + import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload, Processor} + import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.Sink + import com.snowplowanalytics.snowplow.enrich.common.adapters.{AdapterRegistry, RawEvent} import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EnrichmentManager, EnrichmentRegistry} import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils -import IdImplicits._ -import com.snowplowanalytics.snowplow.badrows.BadRow.{EnrichmentFailures, SchemaViolations, TrackerProtocolViolations} import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline +import com.snowplowanalytics.snowplow.badrows.BadRow.{EnrichmentFailures, SchemaViolations, TrackerProtocolViolations} + /** Sink of the collector that Snowplow Micro is. * Contains the functions that are called for each tracking event sent * to the collector endpoint. @@ -37,7 +46,11 @@ import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline * For each event it tries to validate it using Common Enrich, * and then stores the results in-memory in [[ValidationCache]]. */ -private[micro] final case class MemorySink(igluClient: IgluCirceClient[Id], enrichmentRegistry: EnrichmentRegistry[Id], outputEnrichedTsv: Boolean) extends Sink { +private[micro] final case class MemorySink( + igluClient: IgluCirceClient[IO], + enrichmentRegistry: EnrichmentRegistry[IO], + outputEnrichedTsv: Boolean +) extends Sink { val MaxBytes = Int.MaxValue private val processor = Processor(buildinfo.BuildInfo.name, buildinfo.BuildInfo.version) private lazy val logger = LoggerFactory.getLogger("EventLog") @@ -69,8 +82,8 @@ private[micro] final case class MemorySink(igluClient: IgluCirceClient[Id], enri */ private[micro] def processThriftBytes( thriftBytes: Array[Byte], - igluClient: IgluCirceClient[Id], - enrichmentRegistry: EnrichmentRegistry[Id], + igluClient: IgluCirceClient[IO], + enrichmentRegistry: EnrichmentRegistry[IO], processor: Processor ): Unit = ThriftLoader.toCollectorPayload(thriftBytes, processor) match { @@ -126,11 +139,19 @@ private[micro] final case class MemorySink(igluClient: IgluCirceClient[Id], enri */ private[micro] def validateEvent( rawEvent: RawEvent, - igluClient: IgluCirceClient[Id], - enrichmentRegistry: EnrichmentRegistry[Id], + igluClient: IgluCirceClient[IO], + enrichmentRegistry: EnrichmentRegistry[IO], processor: Processor ): Either[(List[String], BadRow), GoodEvent] = - EnrichmentManager.enrichEvent[Id](enrichmentRegistry, igluClient, processor, DateTime.now(), rawEvent, EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false), ()) + EnrichmentManager.enrichEvent[IO]( + enrichmentRegistry, + igluClient, + processor, + DateTime.now(), + rawEvent, + EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false), + IO.unit + ) .subflatMap { enriched => EventConverter.fromEnriched(enriched) .leftMap { failure =>