Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object Dependencies {
object V {
// Snowplow
val snowplowStreamCollector = "2.8.1"
val snowplowCommonEnrich = "3.6.1"
val snowplowCommonEnrich = "3.8.0"

// circe
val circe = "0.14.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package com.snowplowanalytics.snowplow.micro

import cats.Id
import cats.effect.Clock
import cats.implicits._
import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.Resolver
Expand All @@ -24,6 +23,7 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.model.{CollectorCon
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.utils.JsonUtils
import com.snowplowanalytics.snowplow.micro.IdImplicits._
import com.typesafe.config.{Config, ConfigFactory}
import io.circe.Json
import io.circe.parser.parse
Expand All @@ -36,7 +36,6 @@ import java.io.File
import java.net.URI
import java.nio.file.{Path, Paths}
import java.security.{KeyStore, SecureRandom}
import java.util.concurrent.TimeUnit
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import scala.io.Source

Expand All @@ -54,17 +53,8 @@ private[micro] object ConfigHelper {
implicit def hint[T] =
ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase))

implicit val sinkConfigHint = new FieldCoproductHint[SinkConfig]("enabled")

// Copied from Enrich - necessary for parsing enrichment configs
implicit val clockProvider: Clock[Id] = new Clock[Id] {
final def realTime(unit: TimeUnit): Id[Long] =
unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)

final def monotonic(unit: TimeUnit): Id[Long] =
unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS)
}

implicit val sinkConfigHint = new FieldCoproductHint[SinkConfig]("enabled")
type EitherS[A] = Either[String, A]

case class MicroConfig(
Expand Down
21 changes: 14 additions & 7 deletions src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved.
* Copyright (c) 2019-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
Expand All @@ -12,18 +12,25 @@
*/
package com.snowplowanalytics.snowplow.micro

import java.io.File

import org.slf4j.LoggerFactory

import scala.sys.process._

import akka.actor.ActorSystem
import akka.http.scaladsl.{ConnectionContext, Http}

import cats.Id

import com.snowplowanalytics.snowplow.collectors.scalastream.model.CollectorSinks

import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf}
import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF
import com.snowplowanalytics.snowplow.micro.ConfigHelper.MicroConfig
import org.slf4j.LoggerFactory
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, ShiftExecution}

import java.io.File
import scala.sys.process._
import com.snowplowanalytics.snowplow.micro.ConfigHelper.MicroConfig
import com.snowplowanalytics.snowplow.micro.IdImplicits._

/** Read the configuration and instantiate Snowplow Micro,
* which acts as a `Collector` and has an in-memory sink
Expand All @@ -44,7 +51,7 @@ object Main {
uri.toURL #> new File(location) !!
}

val enrichmentRegistry = EnrichmentRegistry.build[Id](configs, BlockerF.noop).value match {
val enrichmentRegistry = EnrichmentRegistry.build[Id](configs, BlockerF.noop, ShiftExecution.noop).value match {
case Right(ok) => ok
case Left(e) =>
throw new IllegalArgumentException(s"Error while enabling enrichments: $e.")
Expand Down