Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 30 additions & 23 deletions src/main/scala/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import cats.effect._
import cats.implicits.catsSyntaxTuple4Parallel
import configuration.{Configuration, ConfigurationUtils, FileAndSystemPropertyReader}
import http.HttpClient
import org.http4s.client.Client
import org.http4s.client.middleware.FollowRedirect
import org.http4s.ember.client.EmberClientBuilder
import org.slf4j.LoggerFactory

import java.nio.channels.ClosedChannelException
Expand All @@ -18,58 +21,62 @@ object Server extends IOApp {

def run(args: List[String]): IO[ExitCode] = {
val configReader = FileAndSystemPropertyReader
val httpClient = new HttpClient
val httpClientResource = EmberClientBuilder.default[IO].build.map(FollowRedirect(5))

for {
initialConfig <- ConfigurationUtils.create(configReader, httpClient)
configRef <- Ref.of[IO, Configuration](initialConfig)
result <- (
pingTokenSync(configRef, httpClient),
plexRssSync(configRef, httpClient),
plexTokenDeleteSync(configRef, httpClient),
plexFullSync(configRef, httpClient)
).parTupled.as(ExitCode.Success)
} yield result
httpClientResource.use { implicit resolvedHttpClient: Client[IO] =>

val appHttpClient = new http.HttpClient(resolvedHttpClient)
for {
initialConfig <- ConfigurationUtils.create(configReader, appHttpClient)
configRef <- Ref.of[IO, Configuration](initialConfig)
result <- (
pingTokenSync(configRef, appHttpClient),
plexRssSync(configRef, appHttpClient),
plexTokenDeleteSync(configRef, appHttpClient),
plexFullSync(configRef, appHttpClient)
).parTupled.as(ExitCode.Success)
} yield result
}
}

private def fetchLatestConfig(configRef: Ref[IO, Configuration]): IO[Configuration] =
configRef.get

private def pingTokenSync(configRef: Ref[IO, Configuration], httpClient: HttpClient): IO[Unit] =
private def pingTokenSync(configRef: Ref[IO, Configuration], appHttpClient: HttpClient): IO[Unit] =
for {
config <- fetchLatestConfig(configRef)
_ <- PingTokenSync.run(config, httpClient)
_ <- PingTokenSync.run(config, appHttpClient)
_ <- IO.sleep(24.hours)
_ <- pingTokenSync(configRef, httpClient)
_ <- pingTokenSync(configRef, appHttpClient)
} yield ()

private def plexRssSync(
configRef: Ref[IO, Configuration],
httpClient: HttpClient
appHttpClient: HttpClient
): IO[Unit] =
for {
config <- fetchLatestConfig(configRef)
_ <- PlexTokenSync.run(config, httpClient, runFullSync = false)
_ <- PlexTokenSync.run(config, appHttpClient, runFullSync = false)
_ <- IO.sleep(config.refreshInterval)
_ <- plexRssSync(configRef, httpClient)
_ <- plexRssSync(configRef, appHttpClient)
} yield ()

private def plexFullSync(
configRef: Ref[IO, Configuration],
httpClient: HttpClient
appHttpClient: HttpClient
): IO[Unit] =
for {
config <- fetchLatestConfig(configRef)
_ <- PlexTokenSync.run(config, httpClient, runFullSync = true)
_ <- PlexTokenSync.run(config, appHttpClient, runFullSync = true)
_ <- IO.sleep(19.minutes)
_ <- plexFullSync(configRef, httpClient)
_ <- plexFullSync(configRef, appHttpClient)
} yield ()

private def plexTokenDeleteSync(configRef: Ref[IO, Configuration], httpClient: HttpClient): IO[Unit] =
private def plexTokenDeleteSync(configRef: Ref[IO, Configuration], appHttpClient: HttpClient): IO[Unit] =
for {
config <- fetchLatestConfig(configRef)
_ <- PlexTokenDeleteSync.run(config, httpClient)
_ <- PlexTokenDeleteSync.run(config, appHttpClient)
_ <- IO.sleep(config.deleteConfiguration.deleteInterval)
_ <- plexTokenDeleteSync(configRef, httpClient)
_ <- plexTokenDeleteSync(configRef, appHttpClient)
} yield ()
}
11 changes: 3 additions & 8 deletions src/main/scala/http/HttpClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,13 @@ import org.http4s.{Header, Method, Request, Uri}
import org.typelevel.ci.CIString
import com.github.blemale.scaffeine.{AsyncLoadingCache, Scaffeine}
import org.slf4j.LoggerFactory

import scala.concurrent.duration._
import org.http4s.client.Client // This import is fine, but the class signature will use FQN

class HttpClient {
class HttpClient(httpClient: org.http4s.client.Client[IO]) {

private val logger = LoggerFactory.getLogger(getClass)

private val client = EmberClientBuilder
.default[IO]
.build
.map(FollowRedirect(5))

private val cacheTtl = 5.seconds

private val cache: AsyncLoadingCache[(Method, Uri, Option[String], Option[Json]), Either[Throwable, Json]] =
Expand Down Expand Up @@ -66,7 +61,7 @@ class HttpClient {

logger.debug(s"HTTP Request: ${requestWithPayload.toString()}")

val responseIO = client.use(_.expect[Json](requestWithPayload).attempt)
val responseIO = httpClient.expect[Json](requestWithPayload).attempt
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify direct client usage aligns with resource lifecycle.

The direct usage of the injected httpClient is correct and eliminates the internal resource management that was causing memory leaks. However, ensure that the external resource management (in Server.scala) properly handles the client lifecycle.


🏁 Script executed:

#!/bin/bash
# Description: Verify that the httpClient parameter is being used consistently and no old EmberClientBuilder usage remains
# Expected: Only the new constructor parameter usage, no internal client creation

echo "=== Checking for any remaining EmberClientBuilder usage in HttpClient ==="
rg -A 3 -B 3 "EmberClientBuilder" src/main/scala/http/HttpClient.scala

echo "=== Verifying httpClient parameter usage ==="
rg -A 2 -B 2 "httpClient\." src/main/scala/http/HttpClient.scala

echo "=== Checking for any FollowRedirect usage that should be externalized ==="
rg -A 2 -B 2 "FollowRedirect" src/main/scala/http/HttpClient.scala

Length of output: 1287


Let’s verify that the HTTP client is being managed as a Resource in your server bootstrap. We’ll locate the Server.scala file and check for any EmberClientBuilder usage as well as Resource and .use calls:


🏁 Script executed:

#!/bin/bash
# 1. Find Server.scala
echo "=== Locating Server.scala ==="
fd -e scala Server.scala

# 2. Check for EmberClientBuilder usage in Server.scala
echo "=== Checking EmberClientBuilder usage in Server.scala ==="
fd -e scala Server.scala --exec rg -A 3 -B 3 "EmberClientBuilder" {}

# 3. Verify Resource allocations
echo "=== Checking for Resource in Server.scala ==="
fd -e scala Server.scala --exec rg -n "Resource" {}

# 4. Ensure .use is invoked on the client Resource
echo "=== Checking for .use calls in Server.scala ==="
fd -e scala Server.scala --exec rg -n "\.use" {}

Length of output: 1436


Remove unused EmberClientBuilder and FollowRedirect imports in HttpClient.scala

The direct use of the injected httpClient in HttpClient.scala is correct, but there are leftover imports that should be cleaned up:

• In src/main/scala/http/HttpClient.scala, remove:
import org.http4s.ember.client.EmberClientBuilder
import org.http4s.client.middleware.FollowRedirect

No other references to EmberClientBuilder or FollowRedirect remain in that file, and your calls to httpClient.expect[Json](…) are already using the injected client.

External resource management in Server.scala is correctly handled:

• Builds the client as a Resource:
val httpClientResource = EmberClientBuilder.default[IO].build.map(FollowRedirect(5))
• Invokes .use { implicit resolvedHttpClient => … }

🤖 Prompt for AI Agents
In src/main/scala/http/HttpClient.scala at line 65, the injected httpClient is
used correctly, but the imports for EmberClientBuilder and FollowRedirect are no
longer needed. Remove the imports "org.http4s.ember.client.EmberClientBuilder"
and "org.http4s.client.middleware.FollowRedirect" from the top of the file to
clean up unused dependencies, since the client lifecycle is managed externally
in Server.scala.


responseIO.map { response =>
logger.debug(s"HTTP Response: $response")
Expand Down
71 changes: 71 additions & 0 deletions src/test/scala/http/HttpClientSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package http

import cats.effect.{IO, Resource}
import cats.syntax.all._
import com.comcast.ip4s._
import io.circe.Json
import org.http4s._
import org.http4s.circe._
import org.http4s.client.Client
import org.http4s.dsl.io._
import org.http4s.ember.client.EmberClientBuilder
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.server.Server
// import org.scalatest.wordspec.AsyncWordSpec // No longer needed
import cats.effect.testing.scalatest.AsyncIOSpec
import org.scalatest.matchers.should.Matchers // Ensure Matchers is imported and used

class HttpClientSpec extends AsyncIOSpec with Matchers {

// Define the service for the test server
val testService: HttpRoutes[IO] = HttpRoutes.of[IO] {
case GET -> Root / "test" / id =>
Ok(Json.obj("id" -> Json.fromString(id), "message" -> Json.fromString("Ok")))
}

// Resource for the test server
val serverResource: Resource[IO, Server] = EmberServerBuilder
.default[IO]
.withHost(ipv4"0.0.0.0")
.withPort(port"0") // Use port 0 to let the system pick an available port
.withHttpApp(testService.orNotFound)
.build

// Resource for the http4s client
val clientResource: Resource[IO, Client[IO]] = EmberClientBuilder
.default[IO]
.build

"HttpClient" should {
"stress test HttpClient with multiple requests" in {
val numRequests = 500
val uniqueRequests = numRequests / 2

(serverResource, clientResource).tupled.use { case (server, liveHttpClient) =>
val httpClient = new HttpClient(liveHttpClient) // Use the live client from the resource

val baseUri = s"http://${server.address.getHostString}:${server.address.getPort}/test"

// Generate 250 unique URIs and 250 duplicates
val uris = (0 until uniqueRequests).map(i => Uri.unsafeFromString(s"$baseUri/$i")).toList ++
(0 until uniqueRequests).map(i => Uri.unsafeFromString(s"$baseUri/$i")).toList

uris.length shouldBe numRequests // Quick check

val resultsIO: IO[List[Either[Throwable, Json]]] =
Traverse[List].traverse(uris)(uri => httpClient.httpRequest(Method.GET, uri))

resultsIO.asserting { results =>
results.foreach { result =>
result shouldBe a[Right[_, _]] // Uses Matchers
result.foreach { json =>
json.hcursor.downField("message").as[String] shouldBe Right("Ok") // Uses Matchers
}
}
results.length shouldBe numRequests // Uses Matchers
results.count(_.isRight) shouldBe numRequests // Uses Matchers
}
}
}
}
}
Loading