Skip to content

[APP-7355]: Remove TCP socket support for Windows, and replace it with existing unix socket support for all OS. #4714

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 15 additions & 32 deletions module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -43,9 +42,6 @@ import (
rutils "go.viam.com/rdk/utils"
)

// tcpPortRange is the beginning of the port range. Only used when ViamTCPSockets() = true.
const tcpPortRange = 13500

var (
validateConfigTimeout = 5 * time.Second
errMessageExitStatus143 = "exit status 143"
Expand Down Expand Up @@ -75,7 +71,6 @@ func NewManager(
packagesDir: options.PackagesDir,
ftdc: options.FTDC,
}
ret.nextPort.Store(tcpPortRange)
return ret
}

Expand Down Expand Up @@ -109,8 +104,6 @@ type module struct {
inRecoveryLock sync.Mutex
logger logging.Logger
ftdc *ftdc.FTDC
// port stores the listen port of this module when ViamTCPSockets() = true.
port int
}

type addedResource struct {
Expand Down Expand Up @@ -191,8 +184,6 @@ type Manager struct {
restartCtx context.Context
restartCtxCancel context.CancelFunc
ftdc *ftdc.FTDC
// nextPort manages ports when ViamTCPSockets() = true.
nextPort atomic.Int32
}

// Close terminates module connections and processes.
Expand Down Expand Up @@ -350,7 +341,6 @@ func (mgr *Manager) add(ctx context.Context, conf config.Module, moduleLogger lo
resources: map[resource.Name]*addedResource{},
logger: moduleLogger,
ftdc: mgr.ftdc,
port: int(mgr.nextPort.Add(1)),
}

if err := mgr.startModule(ctx, mod); err != nil {
Expand Down Expand Up @@ -1025,10 +1015,10 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.
func (m *module) dial() error {
// TODO(PRODUCT-343): session support probably means interceptors here
var err error
addrToDial := m.addr
if !rutils.TCPRegex.MatchString(addrToDial) {
addrToDial = "unix://" + m.addr
}
// Unix socket paths on Windows are formatted with "C:" and backslashes (\), which are incompatible
// with url.Parse(). Reformat the path to use forward slashes (/) and remove the "C:" prefix for compatibility.
addrToDial := "unix://" + m.addr

conn, err := grpc.Dial( //nolint:staticcheck
addrToDial,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(rpc.MaxMessageSize)),
Expand Down Expand Up @@ -1139,15 +1129,11 @@ func (m *module) startProcess(
) error {
var err error

if rutils.ViamTCPSockets() {
m.addr = "127.0.0.1:" + strconv.Itoa(m.port)
} else {
// append a random alpha string to the module name while creating a socket address to avoid conflicts
// with old versions of the module.
if m.addr, err = modlib.CreateSocketAddress(
filepath.Dir(parentAddr), fmt.Sprintf("%s-%s", m.cfg.Name, utils.RandomAlphaString(5))); err != nil {
return err
}
// append a random alpha string to the module name while creating a socket address to avoid conflicts
// with old versions of the module.
if m.addr, err = modlib.CreateSocketAddress(
filepath.Dir(parentAddr), fmt.Sprintf("%s-%s", m.cfg.Name, utils.RandomAlphaString(5))); err != nil {
return err
}

// We evaluate the Module's ExePath absolutely in the viam-server process so that
Expand Down Expand Up @@ -1215,15 +1201,12 @@ func (m *module) startProcess(
)
}
}
if !rutils.TCPRegex.MatchString(m.addr) {
// note: we don't do this check in TCP mode because TCP addresses are not file paths and will fail check.
err = modlib.CheckSocketOwner(m.addr)
if errors.Is(err, fs.ErrNotExist) {
continue
}
if err != nil {
return errors.WithMessage(err, "module startup failed")
}
err = modlib.CheckSocketOwner(m.addr)
if errors.Is(err, fs.ErrNotExist) {
continue
}
if err != nil {
return errors.WithMessage(err, "module startup failed")
}
break
}
Expand Down
11 changes: 2 additions & 9 deletions module/modmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"os/exec"
"path/filepath"
"runtime"
"strconv"
"sync/atomic"
"syscall"
"testing"
Expand All @@ -37,7 +36,6 @@ import (
modmanageroptions "go.viam.com/rdk/module/modmanager/options"
"go.viam.com/rdk/module/modmaninterface"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot/web"
rtestutils "go.viam.com/rdk/testutils"
rutils "go.viam.com/rdk/utils"
)
Expand All @@ -49,12 +47,8 @@ func setupSocketWithRobot(t *testing.T) string {

var socketAddress string
var err error
if rutils.ViamTCPSockets() {
socketAddress = "127.0.0.1:" + strconv.Itoa(web.TestTCPParentPort)
} else {
socketAddress, err = modlib.CreateSocketAddress(t.TempDir(), "parent")
test.That(t, err, test.ShouldBeNil)
}
socketAddress, err = modlib.CreateSocketAddress(t.TempDir(), "parent")
test.That(t, err, test.ShouldBeNil)

rtestutils.MakeRobotForModuleLogging(t, socketAddress)
return socketAddress
Expand Down Expand Up @@ -136,7 +130,6 @@ func TestModManagerFunctions(t *testing.T) {
},
dataDir: "module-data-dir",
logger: logger,
port: tcpPortRange,
}

err = mod.startProcess(ctx, parentAddr, nil, viamHomeTemp, filepath.Join(viamHomeTemp, "packages"))
Expand Down
25 changes: 15 additions & 10 deletions module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,18 @@ func CreateSocketAddress(parentDir, desiredName string) (string, error) {
// Assemble the truncated socket address
socketHashSuffix := desiredNameHash[:socketHashSuffixLength]
truncatedName := desiredName[:(numRemainingChars - socketHashSuffixLength - 1)]
return filepath.Join(baseAddr, fmt.Sprintf("%s-%s%s", truncatedName, socketHashSuffix, socketSuffix)), nil
addr := strings.Replace(
strings.ReplaceAll(
filepath.Join(
baseAddr,
fmt.Sprintf("%s-%s%s", truncatedName, socketHashSuffix, socketSuffix)),
`\`,
`/`),
"C:",
"",
1,
)
return addr, nil
}

// HandlerMap is the format for api->model pairs that the module will service.
Expand Down Expand Up @@ -283,9 +294,6 @@ func (m *Module) Start(ctx context.Context) error {

var lis net.Listener
prot := "unix"
if rutils.TCPRegex.MatchString(m.addr) {
prot = "tcp"
}
if err := MakeSelfOwnedFilesFunc(func() error {
var err error
lis, err = net.Listen(prot, m.addr)
Expand Down Expand Up @@ -355,13 +363,10 @@ func (m *Module) connectParent(ctx context.Context) error {
return nil
}

fullAddr := m.parentAddr
if !rutils.TCPRegex.MatchString(m.parentAddr) {
if err := CheckSocketOwner(m.parentAddr); err != nil {
return err
}
fullAddr = "unix://" + m.parentAddr
if err := CheckSocketOwner(m.parentAddr); err != nil {
return err
}
fullAddr := "unix://" + m.parentAddr

// moduleLoggers may be creating the client connection below, so use a
// different logger here to avoid a deadlock where the client connection
Expand Down
15 changes: 4 additions & 11 deletions robot/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -168,17 +167,11 @@ func (svc *webService) StartModule(ctx context.Context) error {
if err != nil {
return errors.WithMessage(err, "module startup failed")
}

if rutils.ViamTCPSockets() {
addr = "127.0.0.1:" + strconv.Itoa(TCPParentPort)
lis, err = net.Listen("tcp", addr)
} else {
addr, err = module.CreateSocketAddress(dir, "parent")
if err != nil {
return errors.WithMessage(err, "module startup failed")
}
lis, err = net.Listen("unix", addr)
addr, err = module.CreateSocketAddress(dir, "parent")
if err != nil {
return errors.WithMessage(err, "module startup failed")
}
lis, err = net.Listen("unix", addr)
if err != nil {
return errors.WithMessage(err, "failed to listen")
}
Expand Down
4 changes: 0 additions & 4 deletions testutils/module_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"go.viam.com/utils/rpc"

"go.viam.com/rdk/logging"
"go.viam.com/rdk/utils"
)

type mockRobotService struct {
Expand All @@ -27,9 +26,6 @@ func (ms *mockRobotService) Log(ctx context.Context, req *robotpb.LogRequest) (*
func MakeRobotForModuleLogging(t *testing.T, parentAddr string) rpc.Server {
logger := logging.NewTestLogger(t)
prot := "unix"
if utils.TCPRegex.MatchString(parentAddr) {
prot = "tcp"
}
listener, err := net.Listen(prot, parentAddr)
test.That(t, err, test.ShouldBeNil)
rpcServer, err := rpc.NewServer(logger, rpc.WithUnauthenticated())
Expand Down
14 changes: 0 additions & 14 deletions utils/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package utils

import (
"os"
"regexp"
"runtime"
"slices"
"time"

"go.viam.com/rdk/logging"
Expand Down Expand Up @@ -35,9 +33,6 @@ const (
// EnvTrueValues contains strings that we interpret as boolean true in env vars.
var EnvTrueValues = []string{"true", "yes", "1", "TRUE", "YES"}

// TCPRegex tests whether a module address is TCP (vs unix sockets). See also ViamTCPSockets().
var TCPRegex = regexp.MustCompile(`:\d+$`)

// GetResourceConfigurationTimeout calculates the resource configuration
// timeout (env variable value if set, DefaultResourceConfigurationTimeout
// otherwise).
Expand Down Expand Up @@ -86,12 +81,3 @@ func PlatformMkdirTemp(dir, pattern string) (string, error) {
}
return os.MkdirTemp(dir, pattern)
}

// ViamTCPSockets returns true if an env is set or if the platform requires it.
func ViamTCPSockets() bool {
// note: unix sockets have been supported on windows for a while, but go-grpc does not support them.
// 2017 support announcement: https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/
// go grpc client bug on win: https://github.com/dotnet/aspnetcore/issues/47043
return runtime.GOOS == "windows" ||
slices.Contains(EnvTrueValues, os.Getenv("VIAM_TCP_SOCKETS"))
}
Loading