diff --git a/src/main/scala/Server.scala b/src/main/scala/Server.scala index 56938c7..354c4b7 100644 --- a/src/main/scala/Server.scala +++ b/src/main/scala/Server.scala @@ -2,6 +2,9 @@ import cats.effect._ import cats.implicits.catsSyntaxTuple4Parallel import configuration.{Configuration, ConfigurationUtils, FileAndSystemPropertyReader} import http.HttpClient +import org.http4s.client.Client +import org.http4s.client.middleware.FollowRedirect +import org.http4s.ember.client.EmberClientBuilder import org.slf4j.LoggerFactory import java.nio.channels.ClosedChannelException @@ -18,58 +21,62 @@ object Server extends IOApp { def run(args: List[String]): IO[ExitCode] = { val configReader = FileAndSystemPropertyReader - val httpClient = new HttpClient + val httpClientResource = EmberClientBuilder.default[IO].build.map(FollowRedirect(5)) - for { - initialConfig <- ConfigurationUtils.create(configReader, httpClient) - configRef <- Ref.of[IO, Configuration](initialConfig) - result <- ( - pingTokenSync(configRef, httpClient), - plexRssSync(configRef, httpClient), - plexTokenDeleteSync(configRef, httpClient), - plexFullSync(configRef, httpClient) - ).parTupled.as(ExitCode.Success) - } yield result + httpClientResource.use { implicit resolvedHttpClient: Client[IO] => + + val appHttpClient = new http.HttpClient(resolvedHttpClient) + for { + initialConfig <- ConfigurationUtils.create(configReader, appHttpClient) + configRef <- Ref.of[IO, Configuration](initialConfig) + result <- ( + pingTokenSync(configRef, appHttpClient), + plexRssSync(configRef, appHttpClient), + plexTokenDeleteSync(configRef, appHttpClient), + plexFullSync(configRef, appHttpClient) + ).parTupled.as(ExitCode.Success) + } yield result + } } private def fetchLatestConfig(configRef: Ref[IO, Configuration]): IO[Configuration] = configRef.get - private def pingTokenSync(configRef: Ref[IO, Configuration], httpClient: HttpClient): IO[Unit] = + private def pingTokenSync(configRef: Ref[IO, Configuration], appHttpClient: HttpClient): IO[Unit] = for { config <- fetchLatestConfig(configRef) - _ <- PingTokenSync.run(config, httpClient) + _ <- PingTokenSync.run(config, appHttpClient) _ <- IO.sleep(24.hours) - _ <- pingTokenSync(configRef, httpClient) + _ <- pingTokenSync(configRef, appHttpClient) } yield () private def plexRssSync( configRef: Ref[IO, Configuration], - httpClient: HttpClient + appHttpClient: HttpClient ): IO[Unit] = for { config <- fetchLatestConfig(configRef) - _ <- PlexTokenSync.run(config, httpClient, runFullSync = false) + _ <- PlexTokenSync.run(config, appHttpClient, runFullSync = false) _ <- IO.sleep(config.refreshInterval) - _ <- plexRssSync(configRef, httpClient) + _ <- plexRssSync(configRef, appHttpClient) } yield () private def plexFullSync( configRef: Ref[IO, Configuration], - httpClient: HttpClient + appHttpClient: HttpClient ): IO[Unit] = for { config <- fetchLatestConfig(configRef) - _ <- PlexTokenSync.run(config, httpClient, runFullSync = true) + _ <- PlexTokenSync.run(config, appHttpClient, runFullSync = true) _ <- IO.sleep(19.minutes) - _ <- plexFullSync(configRef, httpClient) + _ <- plexFullSync(configRef, appHttpClient) } yield () - private def plexTokenDeleteSync(configRef: Ref[IO, Configuration], httpClient: HttpClient): IO[Unit] = + private def plexTokenDeleteSync(configRef: Ref[IO, Configuration], appHttpClient: HttpClient): IO[Unit] = for { config <- fetchLatestConfig(configRef) - _ <- PlexTokenDeleteSync.run(config, httpClient) + _ <- PlexTokenDeleteSync.run(config, appHttpClient) _ <- IO.sleep(config.deleteConfiguration.deleteInterval) - _ <- plexTokenDeleteSync(configRef, httpClient) + _ <- plexTokenDeleteSync(configRef, appHttpClient) } yield () } diff --git a/src/main/scala/http/HttpClient.scala b/src/main/scala/http/HttpClient.scala index 75078e6..85c8403 100644 --- a/src/main/scala/http/HttpClient.scala +++ b/src/main/scala/http/HttpClient.scala @@ -10,18 +10,13 @@ import org.http4s.{Header, Method, Request, Uri} import org.typelevel.ci.CIString import com.github.blemale.scaffeine.{AsyncLoadingCache, Scaffeine} import org.slf4j.LoggerFactory - import scala.concurrent.duration._ +import org.http4s.client.Client // This import is fine, but the class signature will use FQN -class HttpClient { +class HttpClient(httpClient: org.http4s.client.Client[IO]) { private val logger = LoggerFactory.getLogger(getClass) - private val client = EmberClientBuilder - .default[IO] - .build - .map(FollowRedirect(5)) - private val cacheTtl = 5.seconds private val cache: AsyncLoadingCache[(Method, Uri, Option[String], Option[Json]), Either[Throwable, Json]] = @@ -66,7 +61,7 @@ class HttpClient { logger.debug(s"HTTP Request: ${requestWithPayload.toString()}") - val responseIO = client.use(_.expect[Json](requestWithPayload).attempt) + val responseIO = httpClient.expect[Json](requestWithPayload).attempt responseIO.map { response => logger.debug(s"HTTP Response: $response") diff --git a/src/test/scala/http/HttpClientSpec.scala b/src/test/scala/http/HttpClientSpec.scala new file mode 100644 index 0000000..88f7e1c --- /dev/null +++ b/src/test/scala/http/HttpClientSpec.scala @@ -0,0 +1,71 @@ +package http + +import cats.effect.{IO, Resource} +import cats.syntax.all._ +import com.comcast.ip4s._ +import io.circe.Json +import org.http4s._ +import org.http4s.circe._ +import org.http4s.client.Client +import org.http4s.dsl.io._ +import org.http4s.ember.client.EmberClientBuilder +import org.http4s.ember.server.EmberServerBuilder +import org.http4s.server.Server +// import org.scalatest.wordspec.AsyncWordSpec // No longer needed +import cats.effect.testing.scalatest.AsyncIOSpec +import org.scalatest.matchers.should.Matchers // Ensure Matchers is imported and used + +class HttpClientSpec extends AsyncIOSpec with Matchers { + + // Define the service for the test server + val testService: HttpRoutes[IO] = HttpRoutes.of[IO] { + case GET -> Root / "test" / id => + Ok(Json.obj("id" -> Json.fromString(id), "message" -> Json.fromString("Ok"))) + } + + // Resource for the test server + val serverResource: Resource[IO, Server] = EmberServerBuilder + .default[IO] + .withHost(ipv4"0.0.0.0") + .withPort(port"0") // Use port 0 to let the system pick an available port + .withHttpApp(testService.orNotFound) + .build + + // Resource for the http4s client + val clientResource: Resource[IO, Client[IO]] = EmberClientBuilder + .default[IO] + .build + + "HttpClient" should { + "stress test HttpClient with multiple requests" in { + val numRequests = 500 + val uniqueRequests = numRequests / 2 + + (serverResource, clientResource).tupled.use { case (server, liveHttpClient) => + val httpClient = new HttpClient(liveHttpClient) // Use the live client from the resource + + val baseUri = s"http://${server.address.getHostString}:${server.address.getPort}/test" + + // Generate 250 unique URIs and 250 duplicates + val uris = (0 until uniqueRequests).map(i => Uri.unsafeFromString(s"$baseUri/$i")).toList ++ + (0 until uniqueRequests).map(i => Uri.unsafeFromString(s"$baseUri/$i")).toList + + uris.length shouldBe numRequests // Quick check + + val resultsIO: IO[List[Either[Throwable, Json]]] = + Traverse[List].traverse(uris)(uri => httpClient.httpRequest(Method.GET, uri)) + + resultsIO.asserting { results => + results.foreach { result => + result shouldBe a[Right[_, _]] // Uses Matchers + result.foreach { json => + json.hcursor.downField("message").as[String] shouldBe Right("Ok") // Uses Matchers + } + } + results.length shouldBe numRequests // Uses Matchers + results.count(_.isRight) shouldBe numRequests // Uses Matchers + } + } + } + } +}