Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JetBrains] Improve port forwarding logic #20708

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import com.intellij.util.application
import com.jetbrains.rd.platform.codeWithMe.portForwarding.*
import com.jetbrains.rd.util.URI
import com.jetbrains.rd.util.lifetime.Lifetime
import com.jetbrains.rd.util.lifetime.LifetimeDefinition
import io.gitpod.supervisor.api.Status
import io.gitpod.supervisor.api.Status.PortsStatus
import io.gitpod.supervisor.api.StatusServiceGrpc
Expand All @@ -23,18 +24,35 @@ import kotlinx.coroutines.future.asDeferred
import org.apache.http.client.utils.URIBuilder
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

@Suppress("UnstableApiUsage")
abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService {
companion object {
const val FORWARDED_PORT_LABEL = "ForwardedByGitpod"
const val EXPOSED_PORT_LABEL = "ExposedByGitpod"
private const val MAX_CONCURRENT_OPERATIONS = 10
private const val BATCH_SIZE = 10
private const val BATCH_DELAY = 100L
private const val DEBOUNCE_DELAY = 500L
private const val UI_UPDATE_BATCH_SIZE = 50 // Batch size for UI updates
}

private val perClientPortForwardingManager = service<PerClientPortForwardingManager>()
private val ignoredPortsForNotificationService = service<GitpodIgnoredPortsForNotificationService>()
private val lifetime = Lifetime.Eternal.createNested()

// Store current observed ports and their lifetime references
private val portLifetimes = ConcurrentHashMap<Int, LifetimeDefinition>()

// Debounce job for port updates
private var debounceJob: Job? = null

// Semaphore to limit concurrent operations
private val operationSemaphore = Semaphore(MAX_CONCURRENT_OPERATIONS)

init { start() }

private fun start() {
Expand All @@ -58,7 +76,6 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
is InterruptedException, is CancellationException -> {
cancel("gitpod: Stopped observing ports list due to an expected interruption.")
}

else -> {
thisLogger().warn(
"gitpod: Got an error while trying to get ports list from Supervisor. " +
Expand All @@ -76,7 +93,6 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
val completableFuture = CompletableFuture<Void>()

val statusServiceStub = StatusServiceGrpc.newStub(GitpodManager.supervisorChannel)

val portsStatusRequest = Status.PortsStatusRequest.newBuilder().setObserve(true).build()

val portsStatusResponseObserver = object :
Expand All @@ -86,7 +102,17 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
}

override fun onNext(response: Status.PortsStatusResponse) {
application.invokeLater { syncPortsListWithClient(response) }
debounceJob?.cancel()
debounceJob = runJob(lifetime) {
try {
delay(DEBOUNCE_DELAY)
syncPortsListWithClient(response)
} catch (e: Exception) {
thisLogger().error("gitpod: Error during port observation", e)
} finally {
debounceJob = null
}
}
}

override fun onCompleted() {
Expand All @@ -99,21 +125,23 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
}

statusServiceStub.portsStatus(portsStatusRequest, portsStatusResponseObserver)

return completableFuture
}

private fun isLocalPortForwardingDisabled(): Boolean {
return System.getenv("GITPOD_DISABLE_JETBRAINS_LOCAL_PORT_FORWARDING")?.toBoolean() ?: false
}

private fun syncPortsListWithClient(response: Status.PortsStatusResponse) {
private suspend fun syncPortsListWithClient(response: Status.PortsStatusResponse) {
val ignoredPorts = ignoredPortsForNotificationService.getIgnoredPorts()
val portsList = response.portsList.filter { !ignoredPorts.contains(it.localPort) }
val portsNumbersFromPortsList = portsList.map { it.localPort }
val servedPorts = portsList.filter { it.served }
val exposedPorts = servedPorts.filter { it.exposed?.url?.isNotBlank() ?: false }
val portsNumbersFromNonServedPorts = portsList.filter { !it.served }.map { it.localPort }

val allPortsToKeep = mutableSetOf<Int>()

val servedPortsToStartForwarding = servedPorts.filter {
perClientPortForwardingManager.getPorts(it.localPort).none { p -> p.labels.contains(FORWARDED_PORT_LABEL) }
}
Expand All @@ -127,27 +155,117 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
.map { it.hostPortNumber }
.filter { portsNumbersFromNonServedPorts.contains(it) || !portsNumbersFromPortsList.contains(it) }

servedPortsToStartForwarding.forEach { startForwarding(it) }
coroutineScope {
// Stop operations first to free up resources
val stopForwardingJob = launch {
processPortsInBatches(forwardedPortsToStopForwarding) { port ->
operationSemaphore.withPermit { stopForwarding(port) }
}
}
val stopExposingJob = launch {
processPortsInBatches(exposedPortsToStopExposingOnClient) { port ->
operationSemaphore.withPermit { stopExposingOnClient(port) }
}
}

exposedPortsToStartExposingOnClient.forEach { startExposingOnClient(it) }
// Wait for stop operations to complete
stopForwardingJob.join()
stopExposingJob.join()

forwardedPortsToStopForwarding.forEach { stopForwarding(it) }
// Start new operations
val startForwardingJob = launch {
processPortsInBatches(servedPortsToStartForwarding) { port ->
operationSemaphore.withPermit {
startForwarding(port)
allPortsToKeep.add(port.localPort)
}
}
}
val startExposingJob = launch {
processPortsInBatches(exposedPortsToStartExposingOnClient) { port ->
operationSemaphore.withPermit {
startExposingOnClient(port)
allPortsToKeep.add(port.localPort)
}
}
}

// Wait for start operations to complete
startForwardingJob.join()
startExposingJob.join()

// Update presentation in batches to avoid UI thread blocking
val updatePresentationJob = launch {
portsList.chunked(UI_UPDATE_BATCH_SIZE).forEach { batch ->
application.invokeLater {
batch.forEach { port ->
updatePortsPresentation(port)
allPortsToKeep.add(port.localPort)
}
}
delay(50) // Small delay between UI updates to prevent overwhelming the EDT
}
}

exposedPortsToStopExposingOnClient.forEach { stopExposingOnClient(it) }
// Wait for UI updates to complete
updatePresentationJob.join()

portsList.forEach { updatePortsPresentation(it) }
// Clean up after all operations are done
cleanupUnusedLifetimes(allPortsToKeep)
}
}

private fun startForwarding(portStatus: PortsStatus) {
if (isLocalPortForwardingDisabled()) {
return
private suspend fun <T> processPortsInBatches(ports: List<T>, action: suspend (T) -> Unit) {
ports.chunked(BATCH_SIZE).forEach { batch ->
try {
batch.forEach { port ->
try {
withTimeout(5000) {
action(port)
}
} catch (e: Exception) {
thisLogger().warn("gitpod: Error processing port in batch", e)
}
}
delay(BATCH_DELAY)
} catch (e: Exception) {
thisLogger().error("gitpod: Error processing batch", e)
delay(BATCH_DELAY * 2)
}
}
}

private fun cleanupUnusedLifetimes(portsToKeep: Set<Int>) {
portLifetimes.keys.filter { !portsToKeep.contains(it) }.forEach { port ->
portLifetimes[port]?.let { lifetime ->
thisLogger().debug("gitpod: Terminating lifetime for port $port")
lifetime.terminate()
portLifetimes.remove(port)
}
}
}

private fun startForwarding(portStatus: PortsStatus) {
if (isLocalPortForwardingDisabled()) return

val portLifetime = getOrCreatePortLifetime(portStatus.localPort)

try {
perClientPortForwardingManager.forwardPort(
thisLogger().debug("gitpod: Starting forwarding for port ${portStatus.localPort}")
val port = perClientPortForwardingManager.forwardPort(
portStatus.localPort,
PortType.TCP,
setOf(FORWARDED_PORT_LABEL),
)

portLifetime.onTerminationOrNow {
thisLogger().debug("gitpod: Cleaning up port ${portStatus.localPort} due to lifetime termination")
try {
perClientPortForwardingManager.removePort(port)
} catch (e: Exception) {
thisLogger().warn("gitpod: Failed to remove port on lifetime termination", e)
}
}
} catch (throwable: Throwable) {
if (throwable !is PortAlreadyForwardedException) {
thisLogger().warn("gitpod: Caught an exception while forwarding port: ${throwable.message}")
Expand All @@ -156,62 +274,113 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
}

private fun stopForwarding(hostPort: Int) {
perClientPortForwardingManager.getPorts(hostPort)
thisLogger().debug("gitpod: Stopping forwarding for port $hostPort")
val portsToRemove = perClientPortForwardingManager.getPorts(hostPort)
.filter { it.labels.contains(FORWARDED_PORT_LABEL) }
.forEach { perClientPortForwardingManager.removePort(it) }

terminatePortLifetime(hostPort)

portsToRemove.forEach {
try {
perClientPortForwardingManager.removePort(it)
} catch (e: Exception) {
thisLogger().warn("gitpod: Failed to remove forwarded port $hostPort", e)
}
}
}

private fun startExposingOnClient(portStatus: PortsStatus) {
perClientPortForwardingManager.exposePort(
val portLifetime = getOrCreatePortLifetime(portStatus.localPort)

thisLogger().debug("gitpod: Starting exposing for port ${portStatus.localPort}")
val port = perClientPortForwardingManager.exposePort(
portStatus.localPort,
portStatus.exposed.url,
setOf(EXPOSED_PORT_LABEL),
)

portLifetime.onTerminationOrNow {
thisLogger().debug("gitpod: Cleaning up exposed port ${portStatus.localPort} due to lifetime termination")
try {
perClientPortForwardingManager.removePort(port)
} catch (e: Exception) {
thisLogger().warn("gitpod: Failed to remove exposed port on lifetime termination", e)
}
}
}

private fun stopExposingOnClient(hostPort: Int) {
perClientPortForwardingManager.getPorts(hostPort)
thisLogger().debug("gitpod: Stopping exposing for port $hostPort")
val portsToRemove = perClientPortForwardingManager.getPorts(hostPort)
.filter { it.labels.contains(EXPOSED_PORT_LABEL) }
.forEach { perClientPortForwardingManager.removePort(it) }
}

private fun updatePortsPresentation(portStatus: PortsStatus) {
perClientPortForwardingManager.getPorts(portStatus.localPort).forEach {
if (it.configuration.isForwardedPort()) {
it.presentation.name = portStatus.name
it.presentation.description = portStatus.description
it.presentation.tooltip = "Forwarded"
it.presentation.icon = RowIcon(AllIcons.Actions.Commit)
} else if (it.configuration.isExposedPort()) {
val isPubliclyExposed = (portStatus.exposed.visibility == Status.PortVisibility.public_visibility)

it.presentation.name = portStatus.name
it.presentation.description = portStatus.description
it.presentation.tooltip = "Exposed (${if (isPubliclyExposed) "Public" else "Private"})"
it.presentation.icon = if (isPubliclyExposed) {
RowIcon(AllIcons.Actions.Commit)
} else {
RowIcon(AllIcons.Actions.Commit, AllIcons.Diff.Lock)
}
terminatePortLifetime(hostPort)

portsToRemove.forEach {
try {
perClientPortForwardingManager.removePort(it)
} catch (e: Exception) {
thisLogger().warn("gitpod: Failed to remove exposed port $hostPort", e)
}
}
}

override fun getLocalHostUriFromHostPort(hostPort: Int): Optional<URI> {
val forwardedPort = perClientPortForwardingManager.getPorts(hostPort).firstOrNull {
it.configuration.isForwardedPort()
} ?: return Optional.empty()
private fun getOrCreatePortLifetime(port: Int): Lifetime =
portLifetimes.computeIfAbsent(port) {
thisLogger().debug("gitpod: Creating new lifetime for port $port")
lifetime.createNested()
}

private fun terminatePortLifetime(port: Int) {
portLifetimes[port]?.let { portLifetime ->
thisLogger().debug("gitpod: Terminating lifetime for port $port")
portLifetime.terminate()
portLifetimes.remove(port)
}
}

(forwardedPort.configuration as PortConfiguration.PerClientTcpForwarding).clientPortState.let {
return if (it is ClientPortState.Assigned) {
Optional.of(URIBuilder().setScheme("http").setHost(it.clientInterface).setPort(it.clientPort).build())
} else {
Optional.empty()
private fun updatePortsPresentation(portStatus: PortsStatus) {
perClientPortForwardingManager.getPorts(portStatus.localPort).forEach {
when {
it.configuration.isForwardedPort() -> {
it.presentation.name = portStatus.name
it.presentation.description = portStatus.description
it.presentation.tooltip = "Forwarded"
it.presentation.icon = RowIcon(AllIcons.Actions.Commit)
}
it.configuration.isExposedPort() -> {
val isPubliclyExposed = (portStatus.exposed.visibility == Status.PortVisibility.public_visibility)
it.presentation.name = portStatus.name
it.presentation.description = portStatus.description
it.presentation.tooltip = "Exposed (${if (isPubliclyExposed) "Public" else "Private"})"
it.presentation.icon = if (isPubliclyExposed) {
RowIcon(AllIcons.Actions.Commit)
} else {
RowIcon(AllIcons.Actions.Commit, AllIcons.Diff.Lock)
}
}
}
}
}

override fun getLocalHostUriFromHostPort(hostPort: Int): Optional<URI> =
perClientPortForwardingManager.getPorts(hostPort)
.firstOrNull { it.configuration.isForwardedPort() }
?.let { forwardedPort ->
(forwardedPort.configuration as PortConfiguration.PerClientTcpForwarding)
.clientPortState
.let {
if (it is ClientPortState.Assigned) {
Optional.of(URIBuilder().setScheme("http").setHost(it.clientInterface).setPort(it.clientPort).build())
} else {
Optional.empty()
}
}
} ?: Optional.empty()

override fun dispose() {
portLifetimes.values.forEach { it.terminate() }
portLifetimes.clear()
lifetime.terminate()
}
}
Loading