diff --git a/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt b/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt index b17993601bef78..63a64c495f77e9 100644 --- a/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt +++ b/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt @@ -13,6 +13,7 @@ import com.intellij.util.application import com.jetbrains.rd.platform.codeWithMe.portForwarding.* import com.jetbrains.rd.util.URI import com.jetbrains.rd.util.lifetime.Lifetime +import com.jetbrains.rd.util.lifetime.LifetimeDefinition import io.gitpod.supervisor.api.Status import io.gitpod.supervisor.api.Status.PortsStatus import io.gitpod.supervisor.api.StatusServiceGrpc @@ -23,18 +24,35 @@ import kotlinx.coroutines.future.asDeferred import org.apache.http.client.utils.URIBuilder import java.util.* import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentHashMap +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withPermit @Suppress("UnstableApiUsage") abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService { companion object { const val FORWARDED_PORT_LABEL = "ForwardedByGitpod" const val EXPOSED_PORT_LABEL = "ExposedByGitpod" + private const val MAX_CONCURRENT_OPERATIONS = 10 + private const val BATCH_SIZE = 10 + private const val BATCH_DELAY = 100L + private const val DEBOUNCE_DELAY = 500L + private const val UI_UPDATE_BATCH_SIZE = 50 // Batch size for UI updates } private val perClientPortForwardingManager = service() private val ignoredPortsForNotificationService = service() private val lifetime = Lifetime.Eternal.createNested() + // Store current observed ports and their lifetime references + private val portLifetimes = ConcurrentHashMap() + + // Debounce job for port updates + private var debounceJob: Job? = null + + // Semaphore to limit concurrent operations + private val operationSemaphore = Semaphore(MAX_CONCURRENT_OPERATIONS) + init { start() } private fun start() { @@ -58,7 +76,6 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService is InterruptedException, is CancellationException -> { cancel("gitpod: Stopped observing ports list due to an expected interruption.") } - else -> { thisLogger().warn( "gitpod: Got an error while trying to get ports list from Supervisor. " + @@ -76,7 +93,6 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService val completableFuture = CompletableFuture() val statusServiceStub = StatusServiceGrpc.newStub(GitpodManager.supervisorChannel) - val portsStatusRequest = Status.PortsStatusRequest.newBuilder().setObserve(true).build() val portsStatusResponseObserver = object : @@ -86,7 +102,17 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService } override fun onNext(response: Status.PortsStatusResponse) { - application.invokeLater { syncPortsListWithClient(response) } + debounceJob?.cancel() + debounceJob = runJob(lifetime) { + try { + delay(DEBOUNCE_DELAY) + syncPortsListWithClient(response) + } catch (e: Exception) { + thisLogger().error("gitpod: Error during port observation", e) + } finally { + debounceJob = null + } + } } override fun onCompleted() { @@ -99,7 +125,6 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService } statusServiceStub.portsStatus(portsStatusRequest, portsStatusResponseObserver) - return completableFuture } @@ -107,13 +132,16 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService return System.getenv("GITPOD_DISABLE_JETBRAINS_LOCAL_PORT_FORWARDING")?.toBoolean() ?: false } - private fun syncPortsListWithClient(response: Status.PortsStatusResponse) { + private suspend fun syncPortsListWithClient(response: Status.PortsStatusResponse) { val ignoredPorts = ignoredPortsForNotificationService.getIgnoredPorts() val portsList = response.portsList.filter { !ignoredPorts.contains(it.localPort) } val portsNumbersFromPortsList = portsList.map { it.localPort } val servedPorts = portsList.filter { it.served } val exposedPorts = servedPorts.filter { it.exposed?.url?.isNotBlank() ?: false } val portsNumbersFromNonServedPorts = portsList.filter { !it.served }.map { it.localPort } + + val allPortsToKeep = mutableSetOf() + val servedPortsToStartForwarding = servedPorts.filter { perClientPortForwardingManager.getPorts(it.localPort).none { p -> p.labels.contains(FORWARDED_PORT_LABEL) } } @@ -127,27 +155,117 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService .map { it.hostPortNumber } .filter { portsNumbersFromNonServedPorts.contains(it) || !portsNumbersFromPortsList.contains(it) } - servedPortsToStartForwarding.forEach { startForwarding(it) } + coroutineScope { + // Stop operations first to free up resources + val stopForwardingJob = launch { + processPortsInBatches(forwardedPortsToStopForwarding) { port -> + operationSemaphore.withPermit { stopForwarding(port) } + } + } + val stopExposingJob = launch { + processPortsInBatches(exposedPortsToStopExposingOnClient) { port -> + operationSemaphore.withPermit { stopExposingOnClient(port) } + } + } - exposedPortsToStartExposingOnClient.forEach { startExposingOnClient(it) } + // Wait for stop operations to complete + stopForwardingJob.join() + stopExposingJob.join() - forwardedPortsToStopForwarding.forEach { stopForwarding(it) } + // Start new operations + val startForwardingJob = launch { + processPortsInBatches(servedPortsToStartForwarding) { port -> + operationSemaphore.withPermit { + startForwarding(port) + allPortsToKeep.add(port.localPort) + } + } + } + val startExposingJob = launch { + processPortsInBatches(exposedPortsToStartExposingOnClient) { port -> + operationSemaphore.withPermit { + startExposingOnClient(port) + allPortsToKeep.add(port.localPort) + } + } + } + + // Wait for start operations to complete + startForwardingJob.join() + startExposingJob.join() + + // Update presentation in batches to avoid UI thread blocking + val updatePresentationJob = launch { + portsList.chunked(UI_UPDATE_BATCH_SIZE).forEach { batch -> + application.invokeLater { + batch.forEach { port -> + updatePortsPresentation(port) + allPortsToKeep.add(port.localPort) + } + } + delay(50) // Small delay between UI updates to prevent overwhelming the EDT + } + } - exposedPortsToStopExposingOnClient.forEach { stopExposingOnClient(it) } + // Wait for UI updates to complete + updatePresentationJob.join() - portsList.forEach { updatePortsPresentation(it) } + // Clean up after all operations are done + cleanupUnusedLifetimes(allPortsToKeep) + } } - private fun startForwarding(portStatus: PortsStatus) { - if (isLocalPortForwardingDisabled()) { - return + private suspend fun processPortsInBatches(ports: List, action: suspend (T) -> Unit) { + ports.chunked(BATCH_SIZE).forEach { batch -> + try { + batch.forEach { port -> + try { + withTimeout(5000) { + action(port) + } + } catch (e: Exception) { + thisLogger().warn("gitpod: Error processing port in batch", e) + } + } + delay(BATCH_DELAY) + } catch (e: Exception) { + thisLogger().error("gitpod: Error processing batch", e) + delay(BATCH_DELAY * 2) + } } + } + + private fun cleanupUnusedLifetimes(portsToKeep: Set) { + portLifetimes.keys.filter { !portsToKeep.contains(it) }.forEach { port -> + portLifetimes[port]?.let { lifetime -> + thisLogger().debug("gitpod: Terminating lifetime for port $port") + lifetime.terminate() + portLifetimes.remove(port) + } + } + } + + private fun startForwarding(portStatus: PortsStatus) { + if (isLocalPortForwardingDisabled()) return + + val portLifetime = getOrCreatePortLifetime(portStatus.localPort) + try { - perClientPortForwardingManager.forwardPort( + thisLogger().debug("gitpod: Starting forwarding for port ${portStatus.localPort}") + val port = perClientPortForwardingManager.forwardPort( portStatus.localPort, PortType.TCP, setOf(FORWARDED_PORT_LABEL), ) + + portLifetime.onTerminationOrNow { + thisLogger().debug("gitpod: Cleaning up port ${portStatus.localPort} due to lifetime termination") + try { + perClientPortForwardingManager.removePort(port) + } catch (e: Exception) { + thisLogger().warn("gitpod: Failed to remove port on lifetime termination", e) + } + } } catch (throwable: Throwable) { if (throwable !is PortAlreadyForwardedException) { thisLogger().warn("gitpod: Caught an exception while forwarding port: ${throwable.message}") @@ -156,62 +274,113 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService } private fun stopForwarding(hostPort: Int) { - perClientPortForwardingManager.getPorts(hostPort) + thisLogger().debug("gitpod: Stopping forwarding for port $hostPort") + val portsToRemove = perClientPortForwardingManager.getPorts(hostPort) .filter { it.labels.contains(FORWARDED_PORT_LABEL) } - .forEach { perClientPortForwardingManager.removePort(it) } + + terminatePortLifetime(hostPort) + + portsToRemove.forEach { + try { + perClientPortForwardingManager.removePort(it) + } catch (e: Exception) { + thisLogger().warn("gitpod: Failed to remove forwarded port $hostPort", e) + } + } } private fun startExposingOnClient(portStatus: PortsStatus) { - perClientPortForwardingManager.exposePort( + val portLifetime = getOrCreatePortLifetime(portStatus.localPort) + + thisLogger().debug("gitpod: Starting exposing for port ${portStatus.localPort}") + val port = perClientPortForwardingManager.exposePort( portStatus.localPort, portStatus.exposed.url, setOf(EXPOSED_PORT_LABEL), ) + + portLifetime.onTerminationOrNow { + thisLogger().debug("gitpod: Cleaning up exposed port ${portStatus.localPort} due to lifetime termination") + try { + perClientPortForwardingManager.removePort(port) + } catch (e: Exception) { + thisLogger().warn("gitpod: Failed to remove exposed port on lifetime termination", e) + } + } } private fun stopExposingOnClient(hostPort: Int) { - perClientPortForwardingManager.getPorts(hostPort) + thisLogger().debug("gitpod: Stopping exposing for port $hostPort") + val portsToRemove = perClientPortForwardingManager.getPorts(hostPort) .filter { it.labels.contains(EXPOSED_PORT_LABEL) } - .forEach { perClientPortForwardingManager.removePort(it) } - } - private fun updatePortsPresentation(portStatus: PortsStatus) { - perClientPortForwardingManager.getPorts(portStatus.localPort).forEach { - if (it.configuration.isForwardedPort()) { - it.presentation.name = portStatus.name - it.presentation.description = portStatus.description - it.presentation.tooltip = "Forwarded" - it.presentation.icon = RowIcon(AllIcons.Actions.Commit) - } else if (it.configuration.isExposedPort()) { - val isPubliclyExposed = (portStatus.exposed.visibility == Status.PortVisibility.public_visibility) - - it.presentation.name = portStatus.name - it.presentation.description = portStatus.description - it.presentation.tooltip = "Exposed (${if (isPubliclyExposed) "Public" else "Private"})" - it.presentation.icon = if (isPubliclyExposed) { - RowIcon(AllIcons.Actions.Commit) - } else { - RowIcon(AllIcons.Actions.Commit, AllIcons.Diff.Lock) - } + terminatePortLifetime(hostPort) + + portsToRemove.forEach { + try { + perClientPortForwardingManager.removePort(it) + } catch (e: Exception) { + thisLogger().warn("gitpod: Failed to remove exposed port $hostPort", e) } } } - override fun getLocalHostUriFromHostPort(hostPort: Int): Optional { - val forwardedPort = perClientPortForwardingManager.getPorts(hostPort).firstOrNull { - it.configuration.isForwardedPort() - } ?: return Optional.empty() + private fun getOrCreatePortLifetime(port: Int): Lifetime = + portLifetimes.computeIfAbsent(port) { + thisLogger().debug("gitpod: Creating new lifetime for port $port") + lifetime.createNested() + } + + private fun terminatePortLifetime(port: Int) { + portLifetimes[port]?.let { portLifetime -> + thisLogger().debug("gitpod: Terminating lifetime for port $port") + portLifetime.terminate() + portLifetimes.remove(port) + } + } - (forwardedPort.configuration as PortConfiguration.PerClientTcpForwarding).clientPortState.let { - return if (it is ClientPortState.Assigned) { - Optional.of(URIBuilder().setScheme("http").setHost(it.clientInterface).setPort(it.clientPort).build()) - } else { - Optional.empty() + private fun updatePortsPresentation(portStatus: PortsStatus) { + perClientPortForwardingManager.getPorts(portStatus.localPort).forEach { + when { + it.configuration.isForwardedPort() -> { + it.presentation.name = portStatus.name + it.presentation.description = portStatus.description + it.presentation.tooltip = "Forwarded" + it.presentation.icon = RowIcon(AllIcons.Actions.Commit) + } + it.configuration.isExposedPort() -> { + val isPubliclyExposed = (portStatus.exposed.visibility == Status.PortVisibility.public_visibility) + it.presentation.name = portStatus.name + it.presentation.description = portStatus.description + it.presentation.tooltip = "Exposed (${if (isPubliclyExposed) "Public" else "Private"})" + it.presentation.icon = if (isPubliclyExposed) { + RowIcon(AllIcons.Actions.Commit) + } else { + RowIcon(AllIcons.Actions.Commit, AllIcons.Diff.Lock) + } + } } } } + override fun getLocalHostUriFromHostPort(hostPort: Int): Optional = + perClientPortForwardingManager.getPorts(hostPort) + .firstOrNull { it.configuration.isForwardedPort() } + ?.let { forwardedPort -> + (forwardedPort.configuration as PortConfiguration.PerClientTcpForwarding) + .clientPortState + .let { + if (it is ClientPortState.Assigned) { + Optional.of(URIBuilder().setScheme("http").setHost(it.clientInterface).setPort(it.clientPort).build()) + } else { + Optional.empty() + } + } + } ?: Optional.empty() + override fun dispose() { + portLifetimes.values.forEach { it.terminate() } + portLifetimes.clear() lifetime.terminate() } }