From 63d86ed15b7ce30670b839702611ff7711b5c28c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Rod=C3=A1k?= Date: Thu, 25 Jun 2026 14:36:16 +0200 Subject: [PATCH] Use gvproxy notify socket instead of polling for readiness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes: https://redhat.atlassian.net/browse/RUN-4471 Signed-off-by: Jan Rodák --- pkg/machine/apple/apple.go | 11 +- pkg/machine/config.go | 5 +- pkg/machine/gvproxy.go | 14 ++- pkg/machine/gvproxy_notify.go | 165 +++++++++++++++++++++++++++++ pkg/machine/gvproxy_notify_test.go | 95 +++++++++++++++++ pkg/machine/qemu/stubber.go | 15 --- pkg/machine/shim/host.go | 20 +++- pkg/machine/shim/networking.go | 37 ++++--- 8 files changed, 319 insertions(+), 43 deletions(-) create mode 100644 pkg/machine/gvproxy_notify.go create mode 100644 pkg/machine/gvproxy_notify_test.go diff --git a/pkg/machine/apple/apple.go b/pkg/machine/apple/apple.go index 329dc160755..0bb9a26541a 100644 --- a/pkg/machine/apple/apple.go +++ b/pkg/machine/apple/apple.go @@ -24,11 +24,7 @@ import ( const applehvMACAddress = "5a:94:ef:e4:0c:ee" -var ( - gvProxyWaitBackoff = 500 * time.Millisecond - gvProxyMaxBackoffAttempts = 6 - ignitionSocketName = "ignition.sock" -) +var ignitionSocketName = "ignition.sock" // ResizeDisk uses os truncate to resize (only larger) a raw disk. 
the input size // is assumed GiB @@ -78,11 +74,6 @@ func StartGenericAppleVM(mc *vmconfigs.MachineConfig, cmdBinary string, bootload return nil, nil, err } - // Wait on gvproxy to be running and aware - if err := sockets.WaitForSocketWithBackoffs(gvProxyMaxBackoffAttempts, gvProxyWaitBackoff, gvproxySocket.GetPath(), "gvproxy"); err != nil { - return nil, nil, err - } - netDevice.SetUnixSocketPath(gvproxySocket.GetPath()) // create a one-time virtual machine for starting because we dont want all this information in the diff --git a/pkg/machine/config.go b/pkg/machine/config.go index e0e32cda728..669aa60b07f 100644 --- a/pkg/machine/config.go +++ b/pkg/machine/config.go @@ -19,7 +19,10 @@ import ( "go.podman.io/podman/v6/pkg/machine/vmconfigs" ) -const apiUpTimeout = 20 * time.Second +const ( + apiUpTimeout = 20 * time.Second + GvProxyReadyTimeout = 30 * time.Second +) var ForwarderBinaryName = "gvproxy" diff --git a/pkg/machine/gvproxy.go b/pkg/machine/gvproxy.go index c7a948e866c..6ab699f5c21 100644 --- a/pkg/machine/gvproxy.go +++ b/pkg/machine/gvproxy.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io/fs" + "path/filepath" "strconv" "time" @@ -33,7 +34,8 @@ func readPIDFileWithRetry(f define.VMFile) ([]byte, error) { return f.Read() } -// CleanupGVProxy reads the --pid-file for gvproxy attempts to stop it +// CleanupGVProxy reads the --pid-file for gvproxy, stops the process, +// removes the PID file, and cleans up the notification socket. 
func CleanupGVProxy(f define.VMFile) error { gvPid, err := readPIDFileWithRetry(f) if err != nil { @@ -51,5 +53,13 @@ func CleanupGVProxy(f define.VMFile) error { if err := waitOnProcess(proxyPid); err != nil { return err } - return removeGVProxyPIDFile(f) + if err := removeGVProxyPIDFile(f); err != nil { + return err + } + + // Remove notification socket after gvproxy has exited + runtimeDir := filepath.Dir(f.GetPath()) + CleanupGvProxyNotifySocket(runtimeDir) + + return nil } diff --git a/pkg/machine/gvproxy_notify.go b/pkg/machine/gvproxy_notify.go new file mode 100644 index 00000000000..89bd15a8b65 --- /dev/null +++ b/pkg/machine/gvproxy_notify.go @@ -0,0 +1,165 @@ +package machine + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net" + "os" + "path/filepath" + + gvProxyTypes "github.com/containers/gvisor-tap-vsock/pkg/types" + "github.com/sirupsen/logrus" +) + +const notifySocketName = "gvproxy-notify.sock" + +// GvProxyNotifier listens on a unix socket for JSON notification messages +// from gvproxy. +type GvProxyNotifier struct { + listener net.Listener + socketPath string + readyCh chan struct{} + connectedCh chan string + errorCh chan error +} + +// SocketPath returns the path to the notification unix socket. +func (n *GvProxyNotifier) SocketPath() string { + return n.socketPath +} + +// NewGvProxyNotifier creates a notification listener socket in the given runtime directory. +// +// The socket begins accepting connections at the kernel level immediately upon creation +// (via net.Listen), so gvproxy can be started and connect before Start() is called without +// losing messages. The kernel backlog buffers both the connection and any data sent on it +// until Accept()/Read() consume them. +// +// The notifier is only used during machine start. Close() stops the listener but +// intentionally leaves the socket file on disk because gvproxy remains running after +// start completes and may still dial the socket for later notifications. 
The socket +// file is removed by CleanupGvProxyNotifySocket, which is called from CleanupGVProxy +// during machine stop after gvproxy has exited. Any stale socket from a previous run +// is removed at the top of this constructor before creating a new listener. +func NewGvProxyNotifier(runtimeDir string) (*GvProxyNotifier, error) { + socketPath := filepath.Join(runtimeDir, notifySocketName) + + if err := os.Remove(socketPath); err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("removing old notification socket: %w", err) + } + + listener, err := net.Listen("unix", socketPath) + if err != nil { + return nil, fmt.Errorf("creating notification listener: %w", err) + } + + // Prevent Close() from removing the socket file. gvproxy may still + // dial it for notifications after the listener is closed. The file is + // removed by CleanupGvProxyNotifySocket during machine stop. + listener.(*net.UnixListener).SetUnlinkOnClose(false) + + return &GvProxyNotifier{ + listener: listener, + socketPath: socketPath, + readyCh: make(chan struct{}, 1), + connectedCh: make(chan string, 1), + errorCh: make(chan error, 1), + }, nil +} + +// CleanupGvProxyNotifySocket removes the notification socket file from the given runtime directory. +// +// It is called from CleanupGVProxy during machine stop, by which point the notifier (used only +// during machine start) has closed its listener, and from startHostForwarder if gvproxy fails to start. +func CleanupGvProxyNotifySocket(runtimeDir string) { + socketPath := filepath.Join(runtimeDir, notifySocketName) + if err := os.Remove(socketPath); err != nil && !errors.Is(err, os.ErrNotExist) { + logrus.Debugf("failed to remove gvproxy notification socket: %v", err) + } +} + +// Close stops the notifier listener. +// +// The socket file is intentionally left on disk because gvproxy may still dial it for later notifications +// (connection_established, connection_closed). +// The file is cleaned up by CleanupGvProxyNotifySocket after gvproxy exits. 
+func (n *GvProxyNotifier) Close() { + n.listener.Close() +} + +// Start begins accepting connections and processing notifications. +// +// It blocks until Accept fails, which normally means the listener was closed via Close; +// canceling ctx alone does not interrupt a pending Accept. Intended to be run as a goroutine. +func (n *GvProxyNotifier) Start(ctx context.Context) { + for { + conn, err := n.listener.Accept() + if err != nil { + if ctx.Err() != nil { + return + } + if errors.Is(err, net.ErrClosed) { + return + } + logrus.Debugf("notification listener accept error: %v", err) + return + } + go n.handleConnection(ctx, conn) + } +} + +func (n *GvProxyNotifier) handleConnection(ctx context.Context, conn net.Conn) { + defer conn.Close() + decoder := json.NewDecoder(conn) + + for { + if ctx.Err() != nil { + return + } + + var msg gvProxyTypes.NotificationMessage + if err := decoder.Decode(&msg); err != nil { + if ctx.Err() != nil { + return + } + logrus.Debugf("notification decode error: %v", err) + return + } + + logrus.Debugf("gvproxy notification received: type=%s mac=%s", msg.NotificationType, msg.MacAddress) + + switch msg.NotificationType { + case gvProxyTypes.Ready: + select { + case n.readyCh <- struct{}{}: + default: + } + case gvProxyTypes.ConnectionEstablished: + select { + case n.connectedCh <- msg.MacAddress: + default: + } + case gvProxyTypes.HypervisorError: + select { + case n.errorCh <- fmt.Errorf("gvproxy reported hypervisor error"): + default: + } + case gvProxyTypes.ConnectionClosed: + logrus.Debugf("gvproxy: VM disconnected (mac=%s)", msg.MacAddress) + } + } +} + +// WaitReady blocks until the "ready" notification is received, gvproxy reports a hypervisor error (returned as an error), or the context expires. 
+func (n *GvProxyNotifier) WaitReady(ctx context.Context) error { + select { + case <-n.readyCh: + return nil + case err := <-n.errorCh: + return err + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for gvproxy ready notification: %w", ctx.Err()) + } +} diff --git a/pkg/machine/gvproxy_notify_test.go b/pkg/machine/gvproxy_notify_test.go new file mode 100644 index 00000000000..b68e8880e17 --- /dev/null +++ b/pkg/machine/gvproxy_notify_test.go @@ -0,0 +1,95 @@ +//go:build amd64 || arm64 + +package machine + +import ( + "context" + "encoding/json" + "errors" + "net" + "os" + "testing" + "time" + + gvproxyTypes "github.com/containers/gvisor-tap-vsock/pkg/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGvProxyNotifier_Ready(t *testing.T) { + dir := t.TempDir() + notifier, err := NewGvProxyNotifier(dir) + require.NoError(t, err) + defer notifier.Close() + + go notifier.Start(t.Context()) + + conn, err := net.Dial("unix", notifier.SocketPath()) + require.NoError(t, err) + defer conn.Close() + + err = json.NewEncoder(conn).Encode(gvproxyTypes.NotificationMessage{ + NotificationType: gvproxyTypes.Ready, + }) + require.NoError(t, err) + + waitCtx, waitCancel := context.WithTimeout(t.Context(), 2*time.Second) + defer waitCancel() + err = notifier.WaitReady(waitCtx) + assert.NoError(t, err) +} + +func TestGvProxyNotifier_HypervisorError(t *testing.T) { + dir := t.TempDir() + notifier, err := NewGvProxyNotifier(dir) + require.NoError(t, err) + defer notifier.Close() + + go notifier.Start(t.Context()) + + conn, err := net.Dial("unix", notifier.SocketPath()) + require.NoError(t, err) + defer conn.Close() + + err = json.NewEncoder(conn).Encode(gvproxyTypes.NotificationMessage{ + NotificationType: gvproxyTypes.HypervisorError, + }) + require.NoError(t, err) + + waitCtx, waitCancel := context.WithTimeout(t.Context(), 2*time.Second) + defer waitCancel() + err = notifier.WaitReady(waitCtx) + assert.Error(t, err) + 
assert.Contains(t, err.Error(), "hypervisor error") +} + +func TestGvProxyNotifier_Timeout(t *testing.T) { + dir := t.TempDir() + notifier, err := NewGvProxyNotifier(dir) + require.NoError(t, err) + defer notifier.Close() + + go notifier.Start(t.Context()) + + waitCtx, waitCancel := context.WithTimeout(t.Context(), 100*time.Millisecond) + defer waitCancel() + err = notifier.WaitReady(waitCtx) + assert.Error(t, err) + assert.ErrorIs(t, err, context.DeadlineExceeded) +} + +func TestCleanupGvProxyNotifySocket(t *testing.T) { + dir := t.TempDir() + + notifier, err := NewGvProxyNotifier(dir) + require.NoError(t, err) + socketPath := notifier.SocketPath() + notifier.Close() + + _, err = os.Stat(socketPath) + assert.NoError(t, err, "socket file should still exist after Close()") + + CleanupGvProxyNotifySocket(dir) + _, err = os.Stat(socketPath) + assert.True(t, errors.Is(err, os.ErrNotExist), "socket file should be gone after cleanup") +} diff --git a/pkg/machine/qemu/stubber.go b/pkg/machine/qemu/stubber.go index 17b872101e5..6289b753dee 100644 --- a/pkg/machine/qemu/stubber.go +++ b/pkg/machine/qemu/stubber.go @@ -33,11 +33,6 @@ type QEMUStubber struct { virtiofsHelpers []virtiofsdHelperCmd } -var ( - gvProxyWaitBackoff = 500 * time.Millisecond - gvProxyMaxBackoffAttempts = 6 -) - func (q *QEMUStubber) UserModeNetworkEnabled(*vmconfigs.MachineConfig) bool { return true } @@ -155,16 +150,6 @@ func (q *QEMUStubber) StartVM(mc *vmconfigs.MachineConfig) (func() error, func() return nil, nil, err } - gvProxySock, err := mc.GVProxySocket() - if err != nil { - return nil, nil, err - } - - // Wait on gvproxy to be running and aware - if err := sockets.WaitForSocketWithBackoffs(gvProxyMaxBackoffAttempts, gvProxyWaitBackoff, gvProxySock.GetPath(), "gvproxy"); err != nil { - return nil, nil, err - } - dnr, dnw, err := machine.GetDevNullFiles() if err != nil { return nil, nil, err diff --git a/pkg/machine/shim/host.go b/pkg/machine/shim/host.go index d1db58b47fc..65fa9377aa6 100644 
--- a/pkg/machine/shim/host.go +++ b/pkg/machine/shim/host.go @@ -2,6 +2,7 @@ package shim import ( "bufio" + "context" "errors" "fmt" "io" @@ -583,7 +584,7 @@ func Start(mc *vmconfigs.MachineConfig, mp vmconfigs.VMProvider, opts machine.St } // start gvproxy and set up the API socket forwarding - forwardSocketPath, forwardingState, err := startNetworking(mc, mp) + forwardSocketPath, forwardingState, gvProxyNotifier, err := startNetworking(mc, mp) if err != nil { return err } @@ -598,6 +599,23 @@ func Start(mc *vmconfigs.MachineConfig, mp vmconfigs.VMProvider, opts machine.St } callBackFuncs.Add(cleanGV) + // Wait for gvproxy to signal readiness via the notification socket. + if gvProxyNotifier != nil { + notifyCtx, notifyCancel := context.WithCancel(context.Background()) + go gvProxyNotifier.Start(notifyCtx) + defer func() { + notifyCancel() + gvProxyNotifier.Close() + }() + + gvProxyReadyCtx, gvProxyReadyCancel := context.WithTimeout(context.Background(), machine.GvProxyReadyTimeout) + defer gvProxyReadyCancel() + if err := gvProxyNotifier.WaitReady(gvProxyReadyCtx); err != nil { + return fmt.Errorf("waiting for gvproxy readiness: %w", err) + } + logrus.Debug("gvproxy ready notification received") + } + // if there are generic things that need to be done, a preStart function could be added here // should it be extensive diff --git a/pkg/machine/shim/networking.go b/pkg/machine/shim/networking.go index 7281f8fc94c..861a10b4564 100644 --- a/pkg/machine/shim/networking.go +++ b/pkg/machine/shim/networking.go @@ -30,7 +30,7 @@ var ( ErrSSHNotListening = errors.New("machine is not listening on ssh port") ) -func startHostForwarder(mc *vmconfigs.MachineConfig, provider vmconfigs.VMProvider, dirs *define.MachineDirs, hostSocks []string) error { +func startHostForwarder(mc *vmconfigs.MachineConfig, provider vmconfigs.VMProvider, dirs *define.MachineDirs, hostSocks []string) (*machine.GvProxyNotifier, error) { forwardUser := mc.SSH.RemoteUsername // TODO should this go up 
the stack higher or @@ -43,12 +43,12 @@ func startHostForwarder(mc *vmconfigs.MachineConfig, provider vmconfigs.VMProvid cfg, err := config.Default() if err != nil { - return err + return nil, err } binary, err := cfg.FindHelperBinary(machine.ForwarderBinaryName, false) if err != nil { - return err + return nil, err } cmd := gvproxy.NewGvproxyCommand() @@ -77,48 +77,57 @@ func startHostForwarder(mc *vmconfigs.MachineConfig, provider vmconfigs.VMProvid // This allows a provider to perform additional setup as well as // add in any provider specific options for gvproxy if err := provider.StartNetworking(mc, &cmd); err != nil { - return err + return nil, err + } + + gvProxyNotifier, err := machine.NewGvProxyNotifier(runDir.GetPath()) + if err != nil { + return nil, fmt.Errorf("setting up gvproxy notification listener: %w", err) } c := cmd.Cmd(binary) + c.Args = append(c.Args, "-notification", "unix://"+gvProxyNotifier.SocketPath()) logrus.Debugf("gvproxy command-line: %s %s", binary, strings.Join(cmd.ToCmdline(), " ")) if err := c.Start(); err != nil { - return fmt.Errorf("unable to execute: %q: %w", cmd.ToCmdline(), err) + gvProxyNotifier.Close() + machine.CleanupGvProxyNotifySocket(runDir.GetPath()) + return nil, fmt.Errorf("unable to execute: %q: %w", cmd.ToCmdline(), err) } - return nil + return gvProxyNotifier, nil } -func startNetworking(mc *vmconfigs.MachineConfig, provider vmconfigs.VMProvider) (string, machine.APIForwardingState, error) { +func startNetworking(mc *vmconfigs.MachineConfig, provider vmconfigs.VMProvider) (string, machine.APIForwardingState, *machine.GvProxyNotifier, error) { // Check if SSH port is in use, and reassign if necessary if !ports.IsLocalPortAvailable(mc.SSH.Port) { logrus.Warnf("detected port conflict on machine ssh port [%d], reassigning", mc.SSH.Port) if err := reassignSSHPort(mc, provider); err != nil { - return "", 0, err + return "", 0, nil, err } } // Provider has its own networking code path (e.g. 
WSL) if provider.UseProviderNetworkSetup() { - return "", 0, provider.StartNetworking(mc, nil) + return "", 0, nil, provider.StartNetworking(mc, nil) } dirs, err := env.GetMachineDirs(provider.VMType()) if err != nil { - return "", 0, err + return "", 0, nil, err } hostSocks, forwardSock, forwardingState, err := setupMachineSockets(mc, dirs) if err != nil { - return "", 0, err + return "", 0, nil, err } - if err := startHostForwarder(mc, provider, dirs, hostSocks); err != nil { - return "", 0, err + gvProxyNotifier, err := startHostForwarder(mc, provider, dirs, hostSocks) + if err != nil { + return "", 0, nil, err } - return forwardSock, forwardingState, nil + return forwardSock, forwardingState, gvProxyNotifier, nil } // conductVMReadinessCheck checks to make sure the machine is in the proper state