diff --git a/components/camera/server.go b/components/camera/server.go index 3f7805057db..6051a5b9175 100644 --- a/components/camera/server.go +++ b/components/camera/server.go @@ -180,6 +180,10 @@ func (s *serviceServer) GetPointCloud( return nil, err } + if camClient, ok := camera.(*client); ok { + return camClient.client.GetPointCloud(ctx, req) + } + pc, err := camera.NextPointCloud(ctx, req.Extra.AsMap()) if err != nil { return nil, err diff --git a/module/frame_system.go b/module/frame_system.go new file mode 100644 index 00000000000..4cf06340738 --- /dev/null +++ b/module/frame_system.go @@ -0,0 +1,64 @@ +package module + +import ( + "context" + + "go.viam.com/rdk/pointcloud" + "go.viam.com/rdk/referenceframe" + "go.viam.com/rdk/resource" + "go.viam.com/rdk/robot/client" + "go.viam.com/rdk/robot/framesystem" +) + +type frameSystemClient struct { + robotClient *client.RobotClient + resource.TriviallyCloseable + resource.TriviallyReconfigurable +} + +// NewFrameSystemClient provides access to only the framesystem.Service functions contained inside RobotClient. +func NewFrameSystemClient(robotClient *client.RobotClient) framesystem.Service { + return &frameSystemClient{robotClient: robotClient} +} + +func (f *frameSystemClient) Name() resource.Name { + return framesystem.PublicServiceName +} + +func (f *frameSystemClient) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { + return nil, resource.ErrDoUnimplemented +} + +func (f *frameSystemClient) FrameSystemConfig(ctx context.Context) (*framesystem.Config, error) { + return f.robotClient.FrameSystemConfig(ctx) +} + +func (f *frameSystemClient) GetPose(ctx context.Context, + componentName, destinationFrame string, + supplementalTransforms []*referenceframe.LinkInFrame, + extra map[string]interface{}, +) (*referenceframe.PoseInFrame, error) { + return f.robotClient.GetPose(ctx, componentName, destinationFrame, supplementalTransforms, extra) +} + +func (f *frameSystemClient) TransformPose( + ctx context.Context, + pose *referenceframe.PoseInFrame, + dst string, + supplementalTransforms []*referenceframe.LinkInFrame, +) (*referenceframe.PoseInFrame, error) { + return f.robotClient.TransformPose(ctx, pose, dst, supplementalTransforms) +} + +func (f *frameSystemClient) TransformPointCloud( + ctx context.Context, + srcpc pointcloud.PointCloud, + srcName, + dstName string, +) (pointcloud.PointCloud, error) { + return f.robotClient.TransformPointCloud(ctx, srcpc, srcName, dstName) +} + +func (f *frameSystemClient) CurrentInputs(ctx context.Context) (referenceframe.FrameSystemInputs, error) { + return f.robotClient.CurrentInputs(ctx) +} diff --git a/module/module.go b/module/module.go index 6a256821c28..a5ed50718a9 100644 --- a/module/module.go +++ b/module/module.go @@ -4,45 +4,30 @@ import ( "context" "crypto/sha256" "encoding/base32" + "errors" "fmt" "net" "os" "path/filepath" "sync" - "time" - "github.com/fullstorydev/grpcurl" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - "github.com/jhump/protoreflect/desc" - "github.com/jhump/protoreflect/grpcreflect" - "github.com/pion/rtp" - "github.com/pkg/errors" "github.com/viamrobotics/webrtc/v3" - "go.opencensus.io/trace" - "go.uber.org/multierr" pb "go.viam.com/api/module/v1" robotpb "go.viam.com/api/robot/v1" streampb "go.viam.com/api/stream/v1" "go.viam.com/utils" "go.viam.com/utils/rpc" - "golang.org/x/exp/maps" "google.golang.org/grpc" - reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" 
"go.viam.com/rdk/components/camera/rtppassthrough" // Register component APIs. _ "go.viam.com/rdk/components/register_apis" - "go.viam.com/rdk/config" rgrpc "go.viam.com/rdk/grpc" "go.viam.com/rdk/logging" "go.viam.com/rdk/operation" - "go.viam.com/rdk/pointcloud" - "go.viam.com/rdk/protoutils" - "go.viam.com/rdk/referenceframe" "go.viam.com/rdk/resource" "go.viam.com/rdk/robot/client" - "go.viam.com/rdk/robot/framesystem" - "go.viam.com/rdk/services/discovery" // Register service APIs. _ "go.viam.com/rdk/services/register_apis" rutils "go.viam.com/rdk/utils" @@ -65,9 +50,6 @@ const ( NoModuleParentEnvVar = "VIAM_NO_MODULE_PARENT" ) -// errMaxSupportedWebRTCTrackLimit is the error returned when the MaxSupportedWebRTCTRacks limit is reached. -var errMaxSupportedWebRTCTrackLimit = fmt.Errorf("only %d WebRTC tracks are supported per peer connection", maxSupportedWebRTCTRacks) - // CreateSocketAddress returns a socket address of the form parentDir/desiredName.sock // if it is shorter than the socketMaxAddressLength. If this path would be too long, this function // truncates desiredName and returns parentDir/truncatedName-hashOfDesiredName.sock. @@ -80,7 +62,7 @@ func CreateSocketAddress(parentDir, desiredName string) (string, error) { len(socketSuffix) - 1 // `/` between baseAddr and name if numRemainingChars < len(desiredName) && numRemainingChars < socketHashSuffixLength+1 { - return "", errors.Errorf("module socket base path would result in a path greater than the OS limit of %d characters: %s", + return "", fmt.Errorf("module socket base path would result in a path greater than the OS limit of %d characters: %s", socketMaxAddressLength, baseAddr) } // If possible, early-exit with a non-truncated socket path @@ -91,12 +73,12 @@ func CreateSocketAddress(parentDir, desiredName string) (string, error) { desiredNameHashCreator := sha256.New() _, err := desiredNameHashCreator.Write([]byte(desiredName)) if err != nil { - return "", errors.Errorf("failed to calculate a hash for %q while creating a truncated socket address", desiredName) + return "", fmt.Errorf("failed to calculate a hash for %q while creating a truncated socket address", desiredName) } desiredNameHash := base32.StdEncoding.EncodeToString(desiredNameHashCreator.Sum(nil)) if len(desiredNameHash) < socketHashSuffixLength { // sha256.Sum() should return 32 bytes so this shouldn't occur, but good to check instead of panicing - return "", errors.Errorf("the encoded hash %q for %q is shorter than the minimum socket suffix length %v", + return "", fmt.Errorf("the encoded hash %q for %q is shorter than the minimum socket suffix length %v", desiredNameHash, desiredName, socketHashSuffixLength) } // Assemble the truncated socket address @@ -105,109 +87,57 @@ func CreateSocketAddress(parentDir, desiredName string) (string, error) { return filepath.Join(baseAddr, fmt.Sprintf("%s-%s%s", truncatedName, socketHashSuffix, socketSuffix)), nil } -// HandlerMap is the format for api->model pairs that the module will service. -// Ex: mymap["rdk:component:motor"] = ["acme:marine:thruster", "acme:marine:outboard"]. -type HandlerMap map[resource.RPCAPI][]resource.Model - -// ToProto converts the HandlerMap to a protobuf representation. 
-func (h HandlerMap) ToProto() *pb.HandlerMap {
-	pMap := &pb.HandlerMap{}
-	for s, models := range h {
-		subtype := &robotpb.ResourceRPCSubtype{
-			Subtype: protoutils.ResourceNameToProto(resource.Name{
-				API:  s.API,
-				Name: "",
-			}),
-			ProtoService: s.ProtoSvcName,
-		}
-
-		handler := &pb.HandlerDefinition{Subtype: subtype}
-		for _, m := range models {
-			handler.Models = append(handler.Models, m.String())
-		}
-		pMap.Handlers = append(pMap.Handlers, handler)
-	}
-	return pMap
-}
-
-// NewHandlerMapFromProto converts protobuf to HandlerMap.
-func NewHandlerMapFromProto(ctx context.Context, pMap *pb.HandlerMap, conn rpc.ClientConn) (HandlerMap, error) {
-	hMap := make(HandlerMap)
-	refClient := grpcreflect.NewClientV1Alpha(ctx, reflectpb.NewServerReflectionClient(conn))
-	defer refClient.Reset()
-	reflSource := grpcurl.DescriptorSourceFromServer(ctx, refClient)
-
-	var errs error
-	for _, h := range pMap.GetHandlers() {
-		api := protoutils.ResourceNameFromProto(h.Subtype.Subtype).API
-		rpcAPI := &resource.RPCAPI{
-			API: api,
-		}
-		// due to how tagger is setup in the api we cannot use reflection on the discovery service currently
-		// for now we will skip the reflection step for discovery until the issue is resolved.
-		// TODO(RSDK-9718) - remove the skip.
-		if api != discovery.API {
-			symDesc, err := reflSource.FindSymbol(h.Subtype.ProtoService)
-			if err != nil {
-				errs = multierr.Combine(errs, err)
-				if errors.Is(err, grpcurl.ErrReflectionNotSupported) {
-					return nil, errs
-				}
-				continue
-			}
-			svcDesc, ok := symDesc.(*desc.ServiceDescriptor)
-			if !ok {
-				return nil, errors.Errorf("expected descriptor to be service descriptor but got %T", symDesc)
-			}
-			rpcAPI.Desc = svcDesc
-		}
-		for _, m := range h.Models {
-			model, err := resource.NewModelFromString(m)
-			if err != nil {
-				return nil, err
-			}
-			hMap[*rpcAPI] = append(hMap[*rpcAPI], model)
-		}
-	}
-	return hMap, errs
-}
-
-type peerResourceState struct {
-	// NOTE As I'm only suppporting video to start this will always be a single element
-	// once we add audio we will need to make this a slice / map
-	subID rtppassthrough.SubscriptionID
-}
-
 // Module represents an external resource module that services components/services.
 type Module struct {
 	// The name of the module as per the robot config. This value is communicated via the
 	// `VIAM_MODULE_NAME` env var.
 	name string
 
+	// mu protects high-level operations: reconfiguring resources, removing resources, and shutdown.
+	mu sync.Mutex
+
+	// registerMu protects the maps immediately below as resources/streams come in and out of existence.
+	registerMu sync.Mutex
+	collections map[resource.API]resource.APIResourceCollection[resource.Resource]
+	// internalDeps is keyed by a "child" resource; its values describe the "internal" resources
+	// that depend on that child, together with the construction args needed to reconfigure or
+	// rebuild those dependents when the child changes. Entries are re-keyed to the new resource
+	// object whenever a resource is rebuilt, so stale handles are never left behind.
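+	// Ex: after the functional test in this PR adds "branch" with a dependency on "leaf",
+	// internalDeps[leaf] holds the resConfigureArgs needed to rebuild "branch" whenever
+	// "leaf" is reconfigured or rebuilt (see reconfigureResource in module/resources.go).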
+ internalDeps map[resource.Resource][]resConfigureArgs + resLoggers map[resource.Resource]logging.Logger + activeResourceStreams map[resource.Name]peerResourceState + streamSourceByName map[resource.Name]rtppassthrough.Source + ready bool shutdownCtx context.Context shutdownFn context.CancelFunc - parent *client.RobotClient - server rpc.Server - logger logging.Logger - mu sync.Mutex - activeResourceStreams map[resource.Name]peerResourceState - streamSourceByName map[resource.Name]rtppassthrough.Source - operations *operation.Manager - ready bool - addr string - parentAddr string activeBackgroundWorkers sync.WaitGroup - handlers HandlerMap - collections map[resource.API]resource.APIResourceCollection[resource.Resource] - resLoggers map[resource.Resource]logging.Logger closeOnce sync.Once - pc *webrtc.PeerConnection - pcReady <-chan struct{} - pcClosed <-chan struct{} - pcFailed <-chan struct{} + + // operations is expected to manage concurrency internally. + operations *operation.Manager + server rpc.Server + handlers HandlerMap pb.UnimplementedModuleServiceServer streampb.UnimplementedStreamServiceServer robotpb.UnimplementedRobotServiceServer + + addr string + parent *client.RobotClient + parentAddr string + pc *webrtc.PeerConnection + pcReady <-chan struct{} + pcClosed <-chan struct{} + pcFailed <-chan struct{} + + logger logging.Logger +} + +// NewModuleFromArgs directly parses the command line argument to get its address. +func NewModuleFromArgs(ctx context.Context) (*Module, error) { + if len(os.Args) < 2 { + return nil, errors.New("need socket path as command line argument") + } + return NewModule(ctx, os.Args[1], NewLoggerFromArgs("")) } // NewModule returns the basic module framework/structure. Use ModularMain and NewModuleFromArgs unless @@ -249,6 +179,7 @@ func NewModule(ctx context.Context, address string, logger logging.Logger) (*Mod handlers: HandlerMap{}, collections: map[resource.API]resource.APIResourceCollection[resource.Resource]{}, resLoggers: map[resource.Resource]logging.Logger{}, + internalDeps: map[resource.Resource][]resConfigureArgs{}, } if err := m.server.RegisterServiceServer(ctx, &pb.ModuleService_ServiceDesc, m); err != nil { return nil, err @@ -264,16 +195,15 @@ func NewModule(ctx context.Context, address string, logger logging.Logger) (*Mod // attempt to construct a PeerConnection pc, err := rgrpc.NewLocalPeerConnection(logger) if err != nil { - logger.Debugw("Unable to create optional peer connection for module. Skipping WebRTC for module...", "err", err) + logger.Debugw("Unable to create optional peer connection for module. Skipping WebRTC for module.", "err", err) return m, nil } // attempt to configure PeerConnection pcReady, pcClosed, err := rpc.ConfigureForRenegotiation(pc, rpc.PeerRoleServer, logger) if err != nil { - msg := "Error creating renegotiation channel for module. Unable to " + - "create optional peer connection for module. Skipping WebRTC for module..." - logger.Debugw(msg, "err", err) + logger.Debugw("Error creating renegotiation channel for module. Unable to create optional peer connection "+ + "for module. Skipping WebRTC for module.", "err", err) return m, nil } @@ -284,19 +214,13 @@ func NewModule(ctx context.Context, address string, logger logging.Logger) (*Mod return m, nil } -// NewModuleFromArgs directly parses the command line argument to get its address. 
-func NewModuleFromArgs(ctx context.Context) (*Module, error) { - if len(os.Args) < 2 { - return nil, errors.New("need socket path as command line argument") - } - return NewModule(ctx, os.Args[1], NewLoggerFromArgs("")) +// OperationManager returns the operation manager for the module. +func (m *Module) OperationManager() *operation.Manager { + return m.operations } // Start starts the module service and grpc server. func (m *Module) Start(ctx context.Context) error { - m.mu.Lock() - defer m.mu.Unlock() - var lis net.Listener prot := "unix" if rutils.TCPRegex.MatchString(m.addr) { @@ -306,7 +230,7 @@ func (m *Module) Start(ctx context.Context) error { var err error lis, err = net.Listen(prot, m.addr) if err != nil { - return errors.WithMessage(err, "failed to listen") + return fmt.Errorf("failed to listen: %w", err) } return nil }); err != nil { @@ -328,6 +252,10 @@ func (m *Module) Start(ctx context.Context) error { // Close shuts down the module and grpc server. func (m *Module) Close(ctx context.Context) { + // Dan: This feels unnecessary. The PR comment regarding windows and process shut down does not + // give any insight in how module management code that calls `Close` a single time somehow + // happens twice. I expect if this was real, it's because `Close` was called directly in process + // signal handling that no longer exists. m.closeOnce.Do(func() { m.shutdownFn() m.mu.Lock() @@ -351,16 +279,6 @@ func (m *Module) Close(ctx context.Context) { }) } -// GetParentResource returns a resource from the parent robot by name. -func (m *Module) GetParentResource(ctx context.Context, name resource.Name) (resource.Resource, error) { - // Refresh parent to ensure it has the most up-to-date resources before calling - // ResourceByName. - if err := m.parent.Refresh(ctx); err != nil { - return nil, err - } - return m.parent.ResourceByName(name) -} - func (m *Module) connectParent(ctx context.Context) error { // If parent connection has already been made, do not make another one. Some // tests send two ReadyRequests sequentially, and if an rdk were to retry @@ -421,6 +339,11 @@ func (m *Module) PeerConnect(encodedOffer string) (string, error) { return "", errors.New("no PeerConnection object") } + if encodedOffer == "" { + //nolint + return "", errors.New("Server not running with WebRTC enabled.") + } + offer := webrtc.SessionDescription{} if err := rpc.DecodeSDP(encodedOffer, &offer); err != nil { return "", err @@ -450,7 +373,7 @@ func (m *Module) Ready(ctx context.Context, req *pb.ReadyRequest) (*pb.ReadyResp if err == nil { resp.WebrtcAnswer = encodedAnswer } else { - m.logger.Debugw("Unable to create optional peer connection for module. Skipping WebRTC for module...", "err", err) + m.logger.Debugw("Unable to create optional peer connection for module. Skipping WebRTC for module.", "err", err) pcFailed := make(chan struct{}) close(pcFailed) m.pcFailed = pcFailed @@ -479,544 +402,3 @@ func (m *Module) Ready(ctx context.Context, req *pb.ReadyRequest) (*pb.ReadyResp resp.Handlermap = m.handlers.ToProto() return resp, nil } - -type frameSystemClient struct { - robotClient *client.RobotClient - resource.TriviallyCloseable - resource.TriviallyReconfigurable -} - -// NewFrameSystemClient provides access to only the framesystem.Service functions contained inside RobotClient. 
-func NewFrameSystemClient(robotClient *client.RobotClient) framesystem.Service { - return &frameSystemClient{robotClient: robotClient} -} - -func (f *frameSystemClient) Name() resource.Name { - return framesystem.PublicServiceName -} - -func (f *frameSystemClient) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { - return nil, resource.ErrDoUnimplemented -} - -func (f *frameSystemClient) FrameSystemConfig(ctx context.Context) (*framesystem.Config, error) { - return f.robotClient.FrameSystemConfig(ctx) -} - -func (f *frameSystemClient) GetPose(ctx context.Context, - componentName, destinationFrame string, - supplementalTransforms []*referenceframe.LinkInFrame, - extra map[string]interface{}, -) (*referenceframe.PoseInFrame, error) { - return f.robotClient.GetPose(ctx, componentName, destinationFrame, supplementalTransforms, extra) -} - -func (f *frameSystemClient) TransformPose( - ctx context.Context, - pose *referenceframe.PoseInFrame, - dst string, - supplementalTransforms []*referenceframe.LinkInFrame, -) (*referenceframe.PoseInFrame, error) { - return f.robotClient.TransformPose(ctx, pose, dst, supplementalTransforms) -} - -func (f *frameSystemClient) TransformPointCloud( - ctx context.Context, - srcpc pointcloud.PointCloud, - srcName, - dstName string, -) (pointcloud.PointCloud, error) { - return f.robotClient.TransformPointCloud(ctx, srcpc, srcName, dstName) -} - -func (f *frameSystemClient) CurrentInputs(ctx context.Context) (referenceframe.FrameSystemInputs, error) { - return f.robotClient.CurrentInputs(ctx) -} - -// AddResource receives the component/service configuration from the parent. -func (m *Module) AddResource(ctx context.Context, req *pb.AddResourceRequest) (*pb.AddResourceResponse, error) { - select { - case <-m.pcReady: - case <-m.pcFailed: - } - - deps := make(resource.Dependencies) - for _, c := range req.Dependencies { - name, err := resource.NewFromString(c) - if err != nil { - return nil, err - } - c, err := m.GetParentResource(ctx, name) - if err != nil { - return nil, err - } - deps[name] = c - } - - // let modules access RobotFrameSystem (name $framesystem) without needing entire RobotClient - deps[framesystem.PublicServiceName] = NewFrameSystemClient(m.parent) - - conf, err := config.ComponentConfigFromProto(req.Config, m.logger) - if err != nil { - return nil, err - } - - if err := addConvertedAttributes(conf); err != nil { - return nil, errors.Wrapf(err, "unable to convert attributes when adding resource") - } - - resInfo, ok := resource.LookupRegistration(conf.API, conf.Model) - if !ok { - return nil, errors.Errorf("do not know how to construct %q", conf.API) - } - if resInfo.Constructor == nil { - return nil, errors.Errorf("invariant: no constructor for %q", conf.API) - } - resLogger := m.logger.Sublogger(conf.ResourceName().String()) - levelStr := req.Config.GetLogConfiguration().GetLevel() - // An unset LogConfiguration will materialize as an empty string. - if levelStr != "" { - if level, err := logging.LevelFromString(levelStr); err == nil { - resLogger.SetLevel(level) - } else { - m.logger.Warnw("LogConfiguration does not contain a valid level.", "resource", conf.ResourceName().Name, "level", levelStr) - } - } - - res, err := resInfo.Constructor(ctx, deps, *conf, resLogger) - if err != nil { - return nil, err - } - - // If context has errored, even if construction succeeded we should close the resource and return the context error. 
- // Use shutdownCtx because otherwise any Close operations that rely on the context will immediately fail. - // The deadline associated with the context passed in to this function is rutils.GetResourceConfigurationTimeout, - // which is propagated to AddResource through gRPC. - if ctx.Err() != nil { - m.logger.CDebugw(ctx, "resource successfully constructed but context is done, closing constructed resource", "err", ctx.Err().Error()) - return nil, multierr.Combine(ctx.Err(), res.Close(m.shutdownCtx)) - } - - var passthroughSource rtppassthrough.Source - if p, ok := res.(rtppassthrough.Source); ok { - passthroughSource = p - } - - m.mu.Lock() - defer m.mu.Unlock() - coll, ok := m.collections[conf.API] - if !ok { - return nil, errors.Errorf("module cannot service api: %s", conf.API) - } - - // If adding the resource name to the collection fails, close the resource - // and return an error - if err := coll.Add(conf.ResourceName(), res); err != nil { - return nil, multierr.Combine(err, res.Close(ctx)) - } - - m.resLoggers[res] = resLogger - - // add the video stream resources upon creation - if passthroughSource != nil { - m.streamSourceByName[res.Name()] = passthroughSource - } - return &pb.AddResourceResponse{}, nil -} - -// ReconfigureResource receives the component/service configuration from the parent. -func (m *Module) ReconfigureResource(ctx context.Context, req *pb.ReconfigureResourceRequest) (*pb.ReconfigureResourceResponse, error) { - var res resource.Resource - deps := make(resource.Dependencies) - for _, c := range req.Dependencies { - name, err := resource.NewFromString(c) - if err != nil { - return nil, err - } - c, err := m.GetParentResource(ctx, name) - if err != nil { - return nil, err - } - deps[name] = c - } - - // let modules access RobotFrameSystem (name $framesystem) without needing entire RobotClient - deps[framesystem.PublicServiceName] = NewFrameSystemClient(m.parent) - - // it is assumed the caller robot has handled model differences - conf, err := config.ComponentConfigFromProto(req.Config, m.logger) - if err != nil { - return nil, err - } - - if err := addConvertedAttributes(conf); err != nil { - return nil, errors.Wrapf(err, "unable to convert attributes when reconfiguring resource") - } - - m.mu.Lock() - defer m.mu.Unlock() - - coll, ok := m.collections[conf.API] - if !ok { - return nil, errors.Errorf("no rpc service for %+v", conf) - } - res, err = coll.Resource(conf.ResourceName().Name) - if err != nil { - return nil, err - } - - if logger, ok := m.resLoggers[res]; ok { - levelStr := req.GetConfig().GetLogConfiguration().GetLevel() - // An unset LogConfiguration will materialize as an empty string. 
- if levelStr != "" { - if level, err := logging.LevelFromString(levelStr); err == nil { - logger.SetLevel(level) - } else { - m.logger.Warnw("LogConfiguration does not contain a valid level.", "resource", res.Name().Name, "level", levelStr) - } - } - } - - err = res.Reconfigure(ctx, deps, *conf) - if err == nil { - return &pb.ReconfigureResourceResponse{}, nil - } - - if !resource.IsMustRebuildError(err) { - return nil, err - } - - m.logger.Debugw("rebuilding", "name", conf.ResourceName().String()) - if err := res.Close(ctx); err != nil { - m.logger.Error(err) - } - - delete(m.activeResourceStreams, res.Name()) - resInfo, ok := resource.LookupRegistration(conf.API, conf.Model) - if !ok { - return nil, errors.Errorf("do not know how to construct %q", conf.API) - } - if resInfo.Constructor == nil { - return nil, errors.Errorf("invariant: no constructor for %q", conf.API) - } - - newRes, err := resInfo.Constructor(ctx, deps, *conf, m.logger) - if err != nil { - return nil, err - } - var passthroughSource rtppassthrough.Source - if p, ok := newRes.(rtppassthrough.Source); ok { - passthroughSource = p - } - - if passthroughSource != nil { - m.streamSourceByName[res.Name()] = passthroughSource - } - return &pb.ReconfigureResourceResponse{}, coll.ReplaceOne(conf.ResourceName(), newRes) -} - -// ValidateConfig receives the validation request for a resource from the parent. -func (m *Module) ValidateConfig(ctx context.Context, - req *pb.ValidateConfigRequest, -) (*pb.ValidateConfigResponse, error) { - c, err := config.ComponentConfigFromProto(req.Config, m.logger) - if err != nil { - return nil, err - } - - if err := addConvertedAttributes(c); err != nil { - return nil, errors.Wrapf(err, "unable to convert attributes for validation") - } - - if c.ConvertedAttributes != nil { - implicitRequiredDeps, implicitOptionalDeps, err := c.ConvertedAttributes.Validate(c.Name) - if err != nil { - return nil, errors.Wrapf(err, "error validating resource") - } - resp := &pb.ValidateConfigResponse{ - Dependencies: implicitRequiredDeps, - OptionalDependencies: implicitOptionalDeps, - } - return resp, nil - } - - // Resource configuration object does not implement Validate, but return an - // empty response and no error to maintain backward compatibility. - return &pb.ValidateConfigResponse{}, nil -} - -// RemoveResource receives the request for resource removal. -func (m *Module) RemoveResource(ctx context.Context, req *pb.RemoveResourceRequest) (*pb.RemoveResourceResponse, error) { - slowWatcher, slowWatcherCancel := utils.SlowGoroutineWatcher( - 30*time.Second, fmt.Sprintf("module resource %q is taking a while to remove", req.Name), m.logger) - defer func() { - slowWatcherCancel() - <-slowWatcher - }() - m.mu.Lock() - defer m.mu.Unlock() - - name, err := resource.NewFromString(req.Name) - if err != nil { - return nil, err - } - - coll, ok := m.collections[name.API] - if !ok { - return nil, errors.Errorf("no grpc service for %+v", name) - } - res, err := coll.Resource(name.Name) - if err != nil { - return nil, err - } - - if err := res.Close(ctx); err != nil { - m.logger.Error(err) - } - - delete(m.streamSourceByName, res.Name()) - delete(m.activeResourceStreams, res.Name()) - - return &pb.RemoveResourceResponse{}, coll.Remove(name) -} - -// addAPIFromRegistry adds a preregistered API (rpc API) to the module's services. 
-func (m *Module) addAPIFromRegistry(ctx context.Context, api resource.API) error { - m.mu.Lock() - defer m.mu.Unlock() - _, ok := m.collections[api] - if ok { - return nil - } - - apiInfo, ok := resource.LookupGenericAPIRegistration(api) - if !ok { - return errors.Errorf("invariant: registration does not exist for %q", api) - } - - newColl := apiInfo.MakeEmptyCollection() - m.collections[api] = newColl - - if !ok { - return nil - } - return apiInfo.RegisterRPCService(ctx, m.server, newColl) -} - -// AddModelFromRegistry adds a preregistered component or service model to the module's services. -func (m *Module) AddModelFromRegistry(ctx context.Context, api resource.API, model resource.Model) error { - err := validateRegistered(api, model) - if err != nil { - return err - } - - m.mu.Lock() - _, ok := m.collections[api] - m.mu.Unlock() - if !ok { - if err := m.addAPIFromRegistry(ctx, api); err != nil { - return err - } - } - - apiInfo, ok := resource.LookupGenericAPIRegistration(api) - if !ok { - return errors.Errorf("invariant: registration does not exist for %q", api) - } - if apiInfo.ReflectRPCServiceDesc == nil { - m.logger.Errorf("rpc subtype %s doesn't contain a valid ReflectRPCServiceDesc", api) - } - rpcAPI := resource.RPCAPI{ - API: api, - ProtoSvcName: apiInfo.RPCServiceDesc.ServiceName, - Desc: apiInfo.ReflectRPCServiceDesc, - } - - m.mu.Lock() - defer m.mu.Unlock() - m.handlers[rpcAPI] = append(m.handlers[rpcAPI], model) - return nil -} - -// OperationManager returns the operation manager for the module. -func (m *Module) OperationManager() *operation.Manager { - return m.operations -} - -// ListStreams lists the streams. -func (m *Module) ListStreams(ctx context.Context, req *streampb.ListStreamsRequest) (*streampb.ListStreamsResponse, error) { - _, span := trace.StartSpan(ctx, "module::module::ListStreams") - defer span.End() - names := make([]string, 0, len(m.streamSourceByName)) - for _, n := range maps.Keys(m.streamSourceByName) { - names = append(names, n.String()) - } - return &streampb.ListStreamsResponse{Names: names}, nil -} - -// AddStream adds a stream. -// Returns an error if: -// 1. there is no WebRTC peer connection with viam-sever -// 2. resource doesn't exist -// 3. the resource doesn't implement rtppassthrough.Source, -// 4. there are already the max number of supported tracks on the peer connection -// 5. SubscribeRTP returns an error -// 6. A webrtc track is unable to be created -// 7. Adding the track to the peer connection fails. -func (m *Module) AddStream(ctx context.Context, req *streampb.AddStreamRequest) (*streampb.AddStreamResponse, error) { - ctx, span := trace.StartSpan(ctx, "module::module::AddStream") - defer span.End() - name, err := resource.NewFromString(req.GetName()) - if err != nil { - return nil, err - } - m.mu.Lock() - defer m.mu.Unlock() - if m.pc == nil { - return nil, errors.New("module has no peer connection") - } - vcss, ok := m.streamSourceByName[name] - if !ok { - err := errors.New("unknown stream for resource") - m.logger.CWarnw(ctx, err.Error(), "name", name.String(), "streamSourceByName", fmt.Sprintf("%#v", m.streamSourceByName)) - return nil, err - } - - if _, ok = m.activeResourceStreams[name]; ok { - m.logger.CWarnw(ctx, "AddStream called with when there is already a stream for peer connection. 
NoOp", "name", name) - return &streampb.AddStreamResponse{}, nil - } - - if len(m.activeResourceStreams) >= maxSupportedWebRTCTRacks { - return nil, errMaxSupportedWebRTCTrackLimit - } - - tlsRTP, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: "video/H264"}, "video", name.String()) - if err != nil { - return nil, errors.Wrap(err, "error creating a new TrackLocalStaticRTP") - } - - sub, err := vcss.SubscribeRTP(ctx, rtpBufferSize, func(pkts []*rtp.Packet) { - for _, pkt := range pkts { - if err := tlsRTP.WriteRTP(pkt); err != nil { - m.logger.CWarnw(ctx, "SubscribeRTP callback function WriteRTP", "err", err) - } - } - }) - if err != nil { - return nil, errors.Wrap(err, "error setting up stream subscription") - } - - m.logger.CDebugw(ctx, "AddStream calling AddTrack", "name", name.String(), "subID", sub.ID.String()) - sender, err := m.pc.AddTrack(tlsRTP) - if err != nil { - err = errors.Wrap(err, "error adding track") - if unsubErr := vcss.Unsubscribe(ctx, sub.ID); unsubErr != nil { - return nil, multierr.Combine(err, unsubErr) - } - return nil, err - } - - removeTrackOnSubTerminate := func() { - defer m.logger.Debugw("RemoveTrack called on ", "name", name.String(), "subID", sub.ID.String()) - // wait until either the module is shutting down, or the subscription terminates - var msg string - select { - case <-sub.Terminated.Done(): - msg = "rtp_passthrough subscription expired, calling RemoveTrack" - case <-m.shutdownCtx.Done(): - msg = "module closing calling RemoveTrack" - } - // remove the track from the peer connection so that viam-server clients know that the stream has terminated - m.mu.Lock() - defer m.mu.Unlock() - m.logger.Debugw(msg, "name", name.String(), "subID", sub.ID.String()) - delete(m.activeResourceStreams, name) - if err := m.pc.RemoveTrack(sender); err != nil { - m.logger.Warnf("RemoveTrack returned error", "name", name.String(), "subID", sub.ID.String(), "err", err) - } - } - m.activeBackgroundWorkers.Add(1) - utils.ManagedGo(removeTrackOnSubTerminate, m.activeBackgroundWorkers.Done) - - m.activeResourceStreams[name] = peerResourceState{subID: sub.ID} - return &streampb.AddStreamResponse{}, nil -} - -// RemoveStream removes a stream. -func (m *Module) RemoveStream(ctx context.Context, req *streampb.RemoveStreamRequest) (*streampb.RemoveStreamResponse, error) { - ctx, span := trace.StartSpan(ctx, "module::module::RemoveStream") - defer span.End() - name, err := resource.NewFromString(req.GetName()) - if err != nil { - return nil, err - } - m.mu.Lock() - defer m.mu.Unlock() - if m.pc == nil { - return nil, errors.New("module has no peer connection") - } - vcss, ok := m.streamSourceByName[name] - if !ok { - return nil, errors.Errorf("unknown stream for resource %s", name) - } - - prs, ok := m.activeResourceStreams[name] - if !ok { - return nil, errors.Errorf("stream %s is not active", name) - } - - if err := vcss.Unsubscribe(ctx, prs.subID); err != nil { - m.logger.CWarnw(ctx, "RemoveStream > Unsubscribe", "name", name.String(), "subID", prs.subID.String(), "err", err) - return nil, err - } - - delete(m.activeResourceStreams, name) - return &streampb.RemoveStreamResponse{}, nil -} - -// addConvertedAttributesToConfig uses the MapAttributeConverter to fill in the -// ConvertedAttributes field from the Attributes and AssociatedResourceConfigs. -func addConvertedAttributes(cfg *resource.Config) error { - // Try to find map converter for a resource. 
-	reg, ok := resource.LookupRegistration(cfg.API, cfg.Model)
-	if ok && reg.AttributeMapConverter != nil {
-		converted, err := reg.AttributeMapConverter(cfg.Attributes)
-		if err != nil {
-			return errors.Wrapf(err, "error converting attributes for resource")
-		}
-		cfg.ConvertedAttributes = converted
-	}
-
-	// Also try for associated configs (will only succeed if module itself registers the associated config API).
-	for subIdx, associatedConf := range cfg.AssociatedResourceConfigs {
-		conv, ok := resource.LookupAssociatedConfigRegistration(associatedConf.API)
-		if !ok {
-			continue
-		}
-		if conv.AttributeMapConverter != nil {
-			converted, err := conv.AttributeMapConverter(associatedConf.Attributes)
-			if err != nil {
-				return errors.Wrap(err, "error converting associated resource config attributes")
-			}
-			// associated resource configs for resources might be missing a resource name
-			// which can be inferred from its resource config.
-			converted.UpdateResourceNames(func(oldName resource.Name) resource.Name {
-				return cfg.ResourceName()
-			})
-			cfg.AssociatedResourceConfigs[subIdx].ConvertedAttributes = converted
-		}
-	}
-	return nil
-}
-
-// validateRegistered returns an error if the passed-in api and model have not
-// yet been registered.
-func validateRegistered(api resource.API, model resource.Model) error {
-	resInfo, ok := resource.LookupRegistration(api, model)
-	if ok && resInfo.Constructor != nil {
-		return nil
-	}
-
-	return errors.Errorf("resource with API %s and model %s not yet registered", api, model)
-}
diff --git a/module/module_functional_test.go b/module/module_functional_test.go
new file mode 100644
index 00000000000..b22f0bab671
--- /dev/null
+++ b/module/module_functional_test.go
@@ -0,0 +1,316 @@
+package module
+
+import (
+	"context"
+	"errors"
+	"os"
+	"path/filepath"
+	"testing"
+
+	v1 "go.viam.com/api/app/v1"
+	pb "go.viam.com/api/module/v1"
+	"go.viam.com/test"
+	"go.viam.com/utils"
+	"go.viam.com/utils/protoutils"
+
+	"go.viam.com/rdk/components/generic"
+	"go.viam.com/rdk/logging"
+	"go.viam.com/rdk/resource"
+)
+
+// setupLocalModule sets up a module without a parent connection.
+func setupLocalModule(t *testing.T, ctx context.Context, logger logging.Logger) *Module {
+	t.Helper()
+
+	// Use 'foo.sock' for arbitrary module to test AddModelFromRegistry.
+	m, err := NewModule(ctx, filepath.Join(t.TempDir(), "foo.sock"), logger)
+	test.That(t, err, test.ShouldBeNil)
+	t.Cleanup(func() {
+		m.Close(ctx)
+	})
+
+	// Hit Ready as a way to close m.pcFailed, so that AddResource can proceed. Set
+	// NoModuleParentEnvVar so that parent connection will not be attempted.
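+	// (AddResource blocks on a select over m.pcReady/m.pcFailed before constructing anything,
+	// so the Ready call below is what unblocks it.)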
+ test.That(t, os.Setenv(NoModuleParentEnvVar, "true"), test.ShouldBeNil) + t.Cleanup(func() { + test.That(t, os.Unsetenv(NoModuleParentEnvVar), test.ShouldBeNil) + }) + + _, err = m.Ready(ctx, &pb.ReadyRequest{}) + test.That(t, err, test.ShouldBeNil) + return m +} + +type doCommandDependerConfig struct { + DependsOn []string +} + +func (dcdc *doCommandDependerConfig) Validate(path string) ([]string, []string, error) { + if len(dcdc.DependsOn) > 0 { + return []string{}, dcdc.DependsOn, nil + } + + return []string{}, []string{}, nil +} + +type doCommandDepender struct { + resource.Named + resource.AlwaysRebuild + resource.TriviallyCloseable + + dependsOn []resource.Resource + closed bool + + logger logging.Logger +} + +func (dcd *doCommandDepender) Close(ctx context.Context) error { + dcd.logger.Infof("Closing %v %p, depends: %p", dcd.Name(), dcd, dcd.dependsOn) + dcd.closed = true + return nil +} + +func (dcd *doCommandDepender) DoCommand(ctx context.Context, cmd map[string]any) (map[string]any, error) { + if dcd.closed { + return nil, errors.New("was closed") + } + + // Dan: I do some fancy stuff here to chain the API calls through to dependencies. But the + // values are not asserted on. They were just deemed to be some way to summarize the path a + // request took. + switch { + case len(dcd.dependsOn) == 0: + return map[string]any{ + "outgoing": 0, + "incoming": 1, + }, nil + + case len(dcd.dependsOn) == 1: + if val, exists := cmd["outgoing"]; exists { + cmd["outgoing"] = val.(int) + 1 + } else { + cmd["outgoing"] = 1 + } + + resp, err := dcd.dependsOn[0].DoCommand(ctx, cmd) + if err != nil { + return nil, err + } + + resp["incoming"] = resp["incoming"].(int) + 1 + return resp, nil + default: + // > 1 + for _, dep := range dcd.dependsOn { + _, err := dep.DoCommand(ctx, cmd) + if err != nil { + return nil, err + } + } + + return map[string]any{ + "outgoing": len(dcd.dependsOn), + "incoming": 1, + }, nil + } +} + +// testLocal gets a fresh local resource for each input string and asserts its `DoCommand` succeeds. +func testLocal(ctx context.Context, t *testing.T, mod *Module, resStrings ...string) { + t.Helper() + for _, resStr := range resStrings { + res, err := mod.getLocalResource(ctx, generic.Named(resStr)) + test.That(t, err, test.ShouldBeNil) + + _, err = res.DoCommand(ctx, map[string]any{}) + test.That(t, err, test.ShouldBeNil) + } +} + +func TestOptimizedModuleCommunication(t *testing.T) { + ctx := t.Context() + logger := logging.NewTestLogger(t) + + // We expose one model that satisfies the (component) generic API and can have a variable number + // of dependents. The test will create multiple of these components that may or may not depend + // on others. + modelName := utils.RandomAlphaString(20) + model := resource.DefaultModelFamily.WithModel(modelName) + logger.Info("Randomized model name:", modelName) + + // We will use this to count how often a particular resource gets constructed. 
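+	// ("super" is added later in the test; every cascading rebuild reruns this constructor, and
+	// the final assertion demonstrates the resulting construction count.)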
+	superConstructCount := 0
+	resource.RegisterComponent(generic.API, model, resource.Registration[resource.Resource, *doCommandDependerConfig]{
+		Constructor: func(
+			ctx context.Context, deps resource.Dependencies, rcfg resource.Config, logger logging.Logger,
+		) (resource.Resource, error) {
+			if rcfg.Name == "super" {
+				superConstructCount++
+			}
+
+			cfg, err := resource.NativeConfig[*doCommandDependerConfig](rcfg)
+			if err != nil {
+				return nil, err
+			}
+
+			ret := &doCommandDepender{
+				Named:  rcfg.ResourceName().AsNamed(),
+				logger: logger,
+			}
+
+			for _, depStr := range cfg.DependsOn {
+				dep, err := generic.FromProvider(deps, depStr)
+				if err != nil {
+					return nil, err
+				}
+
+				ret.dependsOn = append(ret.dependsOn, dep)
+			}
+			logger.Infof("I %v (%p) depend on %p Config.DependsOn: %v",
+				rcfg.Name, ret, ret.dependsOn, cfg.DependsOn)
+
+			return ret, nil
+		},
+	})
+	t.Cleanup(func() {
+		resource.Deregister(generic.API, model)
+	})
+
+	module := setupLocalModule(t, ctx, logging.NewTestLogger(t))
+	test.That(t, module.AddModelFromRegistry(ctx, generic.API, model), test.ShouldBeNil)
+
+	// This test will ultimately create three resources:
+	// 1) A leaf that depends on nothing
+	// 2) A branch that depends on the leaf
+	// 3) A trunk that depends on the branch
+	//
+	// We create these resources in proper dependency order.
+	//
+	// Add a leaf resource that depends on nothing.
+	_, err := module.AddResource(ctx, &pb.AddResourceRequest{Config: &v1.ComponentConfig{
+		Name: "leaf", Api: generic.API.String(), Model: model.String(),
+	}})
+	test.That(t, err, test.ShouldBeNil)
+
+	// Assert leaf can be used.
+	testLocal(ctx, t, module, "leaf")
+
+	// Build up and add a branch resource that depends on the leaf resource.
+	attrsBuf, err := protoutils.StructToStructPb(&doCommandDependerConfig{
+		DependsOn: []string{"leaf"},
+	})
+	test.That(t, err, test.ShouldBeNil)
+
+	_, err = module.AddResource(ctx, &pb.AddResourceRequest{
+		Dependencies: []string{generic.Named("leaf").String()},
+		Config: &v1.ComponentConfig{
+			Name: "branch", Api: generic.API.String(), Model: model.String(),
+			DependsOn:  []string{"leaf"}, // unnecessary but mimics reality?
+			Attributes: attrsBuf,
+		},
+	})
+	test.That(t, err, test.ShouldBeNil)
+
+	// Assert branch can use its dependency.
+	testLocal(ctx, t, module, "branch")
+
+	// Get a handle on the branch resource that we are going to invalidate.
+	staleBranchRes, err := module.getLocalResource(ctx, generic.Named("branch"))
+	test.That(t, err, test.ShouldBeNil)
+
+	// Reconfigure the leaf. This results in a `Close` -> `Constructor`. Invalidating the above
+	// `staleBranchRes`.
+	logger.Info("Reconfiguring leaf first time")
+	_, err = module.ReconfigureResource(ctx, &pb.ReconfigureResourceRequest{
+		Config: &v1.ComponentConfig{ // Same config.
+			Name: "leaf", Api: generic.API.String(), Model: model.String(),
+		},
+	})
+	test.That(t, err, test.ShouldBeNil)
+
+	// Assert that the original `staleBranchRes` has its dependency invalidated.
+	_, err = staleBranchRes.DoCommand(ctx, map[string]any{})
+	test.That(t, err, test.ShouldNotBeNil)
+
+	// Assert that fetching a fresh `branch` resource succeeds in using its dependency.
+	testLocal(ctx, t, module, "branch")
+
+	// Build up the attributes for a trunk resource that depends on the branch resource.
+	trunkImplicitDependsOn, err := protoutils.StructToStructPb(&doCommandDependerConfig{
+		DependsOn: []string{"branch"},
+	})
+	test.That(t, err, test.ShouldBeNil)
+	// Add a trunk that depends on the branch resource.
+	_, err = module.AddResource(ctx, &pb.AddResourceRequest{
+		Dependencies: []string{generic.Named("branch").String()},
+		Config: &v1.ComponentConfig{
+			Name: "trunk", Api: generic.API.String(), Model: model.String(),
+			Attributes: trunkImplicitDependsOn,
+		},
+	})
+	test.That(t, err, test.ShouldBeNil)
+
+	testLocal(ctx, t, module, "trunk")
+
+	logger.Info("Reconfiguring leaf second time")
+
+	// Reconfigure the leaf again. This results in a `Close` -> `Constructor` on the leaf
+	// _and_ the branch _and_ the trunk. Refetch the branch and trunk and assert they can
+	// both continue to respond to `DoCommand`s that require dependencies to be valid.
+	_, err = module.ReconfigureResource(ctx, &pb.ReconfigureResourceRequest{
+		Config: &v1.ComponentConfig{ // Same config.
+			Name: "leaf", Api: generic.API.String(), Model: model.String(),
+		},
+	})
+	test.That(t, err, test.ShouldBeNil)
+
+	testLocal(ctx, t, module, "branch", "trunk")
+
+	// To play a trick, we add a `super` resource that will depend on each of `leaf`, `branch` and
+	// `trunk`. Reconfiguring leaf ought to reconfigure `super` three times: once for each of its
+	// dependencies.
+	superImplicitDependsOn, err := protoutils.StructToStructPb(&doCommandDependerConfig{
+		DependsOn: []string{"leaf", "branch", "trunk"},
+	})
+	test.That(t, err, test.ShouldBeNil)
+
+	_, err = module.AddResource(ctx, &pb.AddResourceRequest{
+		Dependencies: []string{
+			generic.Named("leaf").String(),
+			generic.Named("branch").String(),
+			generic.Named("trunk").String(),
+		},
+		Config: &v1.ComponentConfig{
+			Name: "super", Api: generic.API.String(), Model: model.String(),
+			Attributes: superImplicitDependsOn,
+		},
+	})
+	test.That(t, err, test.ShouldBeNil)
+	testLocal(ctx, t, module, "super")
+
+	// Get a handle on the `super` resource that we are going to invalidate.
+	staleSuperRes, err := module.getLocalResource(ctx, generic.Named("super"))
+	test.That(t, err, test.ShouldBeNil)
+
+	// Reconfigure the leaf _yet_ again. Assert that the `staleSuperRes` does not work.
+	logger.Info("Reconfiguring leaf. Super resource should reconfigure 3 times.")
+	_, err = module.ReconfigureResource(ctx, &pb.ReconfigureResourceRequest{
+		Config: &v1.ComponentConfig{ // Same config.
+			Name: "leaf", Api: generic.API.String(), Model: model.String(),
+		},
+	})
+	test.That(t, err, test.ShouldBeNil)
+
+	_, err = staleSuperRes.DoCommand(ctx, map[string]any{})
+	test.That(t, err, test.ShouldNotBeNil)
+
+	// Assert that super continues to work with all of its dependencies. Along with all the others
+	// for good measure.
+	testLocal(ctx, t, module, "super", "trunk", "branch", "leaf")
+
+	// Assert that `super` was constructed once initially plus three times due to cascading
+	// reconfigures. This is not a correctness assertion, but just a demonstration of current
+	// behavior. By all means optimize this away with some topological sorting.
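+	// (1 initial construction + 3 cascaded rebuilds: one each via the leaf, branch, and trunk
+	// dependency edges.)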
+ test.That(t, superConstructCount, test.ShouldEqual, 4) +} diff --git a/module/module_test.go b/module/module_test.go index 955e674c4df..c9c4ccbd1f0 100644 --- a/module/module_test.go +++ b/module/module_test.go @@ -70,7 +70,7 @@ func TestAddModelFromRegistry(t *testing.T) { validServiceModel := mysum.Model validComponentModel := mygizmo.Model - resourceError := "resource with API %s and model %s not yet registered" + resourceError := "resource with API %q and model %q not yet registered" testCases := []struct { api resource.API model resource.Model diff --git a/module/resources.go b/module/resources.go new file mode 100644 index 00000000000..d62d51fb955 --- /dev/null +++ b/module/resources.go @@ -0,0 +1,550 @@ +package module + +import ( + "context" + "errors" + "fmt" + "time" + + "go.uber.org/multierr" + pb "go.viam.com/api/module/v1" + "go.viam.com/utils" + + "go.viam.com/rdk/components/camera/rtppassthrough" + "go.viam.com/rdk/config" + "go.viam.com/rdk/logging" + "go.viam.com/rdk/resource" + "go.viam.com/rdk/robot/framesystem" +) + +type resConfigureArgs struct { + // Keep the resource object around in case we can simply reconfigure. + toReconfig resource.Resource + + // Otherwise keep the configuration and its dependencies around to close -> reconstruct. + conf *resource.Config + depStrings []string +} + +// AddResource receives the component/service configuration from the viam-server. +func (m *Module) AddResource(ctx context.Context, req *pb.AddResourceRequest) (*pb.AddResourceResponse, error) { + select { + case <-m.pcReady: + case <-m.pcFailed: + } + + conf, err := config.ComponentConfigFromProto(req.Config, m.logger) + if err != nil { + return nil, err + } + + if err := addConvertedAttributes(conf); err != nil { + return nil, fmt.Errorf("unable to convert attributes when adding resource: %w", err) + } + + resInfo, ok := resource.LookupRegistration(conf.API, conf.Model) + if !ok { + return nil, fmt.Errorf("resource with API %q and model %q not yet registered", conf.API, conf.Model) + } + if resInfo.Constructor == nil { + return nil, fmt.Errorf("invariant: no constructor for %q", conf.Model) + } + + resLogger := m.logger.Sublogger(conf.ResourceName().String()) + levelStr := req.Config.GetLogConfiguration().GetLevel() + // An unset LogConfiguration will materialize as an empty string. + if levelStr != "" { + if level, err := logging.LevelFromString(levelStr); err == nil { + resLogger.SetLevel(level) + } else { + m.logger.Warnw("LogConfiguration does not contain a valid level.", + "resource", conf.Name, "level", levelStr) + } + } + + err = m.addResource(ctx, req.Dependencies, conf, resLogger) + if err != nil { + return nil, err + } + + return &pb.AddResourceResponse{}, nil +} + +// ReconfigureResource receives the component/service configuration from the viam-server. 
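+// The resource is reconfigured in place when possible; if its model returns a MustRebuild error,
+// the resource is closed and reconstructed, and any local dependents recorded in internalDeps are
+// rebuilt as well (see reconfigureResource).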
+func (m *Module) ReconfigureResource(ctx context.Context, req *pb.ReconfigureResourceRequest) (*pb.ReconfigureResourceResponse, error) {
+	// it is assumed the caller robot has handled model differences
+	conf, err := config.ComponentConfigFromProto(req.Config, m.logger)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := addConvertedAttributes(conf); err != nil {
+		return nil, fmt.Errorf("unable to convert attributes when reconfiguring resource: %w", err)
+	}
+
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	m.registerMu.Lock()
+	deps, err := m.getDependenciesForConstruction(ctx, req.Dependencies)
+	m.registerMu.Unlock()
+	if err != nil {
+		return nil, err
+	}
+
+	var logLevel *logging.Level
+	logLevelStr := req.GetConfig().GetLogConfiguration().GetLevel()
+	if level, err := logging.LevelFromString(logLevelStr); err == nil {
+		// Dan: If `Reconfigure` fails, we do not undo this change. I feel it's reasonable
+		// to partially reconfigure in this way.
+		logLevel = &level
+	} else if logLevelStr != "" {
+		m.logger.Warnw("LogConfiguration does not contain a valid level",
+			"resource", conf.Name, "level", logLevelStr)
+	}
+
+	if _, err = m.reconfigureResource(ctx, deps, conf, logLevel); err != nil {
+		return nil, err
+	}
+
+	if errors.Is(ctx.Err(), context.Canceled) {
+		m.logger.Errorw(
+			"Context was canceled before returning. Viam-server will not know the state of this resource. Module must be restarted.",
+			"res", conf.Name,
+		)
+	}
+
+	return &pb.ReconfigureResourceResponse{}, nil
+}
+
+// ValidateConfig receives the validation request for a resource from the viam-server.
+func (m *Module) ValidateConfig(ctx context.Context,
+	req *pb.ValidateConfigRequest,
+) (*pb.ValidateConfigResponse, error) {
+	c, err := config.ComponentConfigFromProto(req.Config, m.logger)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := addConvertedAttributes(c); err != nil {
+		return nil, fmt.Errorf("unable to convert attributes for validation: %w", err)
+	}
+
+	if c.ConvertedAttributes != nil {
+		implicitRequiredDeps, implicitOptionalDeps, err := c.ConvertedAttributes.Validate(c.Name)
+		if err != nil {
+			return nil, fmt.Errorf("error validating resource: %w", err)
+		}
+		resp := &pb.ValidateConfigResponse{
+			Dependencies:         implicitRequiredDeps,
+			OptionalDependencies: implicitOptionalDeps,
+		}
+		return resp, nil
+	}
+
+	// Resource configuration object does not implement Validate, but return an
+	// empty response and no error to maintain backward compatibility.
+	return &pb.ValidateConfigResponse{}, nil
+}
+
+// RemoveResource receives the request for resource removal.
+func (m *Module) RemoveResource(ctx context.Context, req *pb.RemoveResourceRequest) (*pb.RemoveResourceResponse, error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	name, err := resource.NewFromString(req.Name)
+	if err != nil {
+		return nil, err
+	}
+
+	err = m.removeResource(ctx, name)
+	if err != nil {
+		return nil, err
+	}
+
+	return &pb.RemoveResourceResponse{}, nil
+}
+
+// GetParentResource returns a resource from the viam-server by name.
+func (m *Module) GetParentResource(ctx context.Context, name resource.Name) (resource.Resource, error) {
+	// Refresh parent to ensure it has the most up-to-date resources before calling
+	// ResourceByName.
+	if err := m.parent.Refresh(ctx); err != nil {
+		return nil, err
+	}
+	return m.parent.ResourceByName(name)
+}
+
+// getLocalResource returns a resource from within the module by name. `getLocalResource` must be
+// called while holding the `registerMu`.
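+// It performs a linear scan over the resLoggers map, which tracks every resource constructed by
+// this module.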
+func (m *Module) getLocalResource(_ context.Context, name resource.Name) (resource.Resource, error) {
+	for res := range m.resLoggers {
+		if res.Name() == name {
+			return res, nil
+		}
+	}
+
+	return nil, resource.NewNotFoundError(name)
+}
+
+// addConvertedAttributesToConfig uses the MapAttributeConverter to fill in the
+// ConvertedAttributes field from the Attributes and AssociatedResourceConfigs.
+func addConvertedAttributes(cfg *resource.Config) error {
+	// Try to find map converter for a resource.
+	reg, ok := resource.LookupRegistration(cfg.API, cfg.Model)
+	if ok && reg.AttributeMapConverter != nil {
+		converted, err := reg.AttributeMapConverter(cfg.Attributes)
+		if err != nil {
+			return fmt.Errorf("error converting attributes for resource: %w", err)
+		}
+		cfg.ConvertedAttributes = converted
+	}
+
+	// Also try for associated configs (will only succeed if module itself registers the associated config API).
+	for subIdx, associatedConf := range cfg.AssociatedResourceConfigs {
+		conv, ok := resource.LookupAssociatedConfigRegistration(associatedConf.API)
+		if !ok {
+			continue
+		}
+		if conv.AttributeMapConverter != nil {
+			converted, err := conv.AttributeMapConverter(associatedConf.Attributes)
+			if err != nil {
+				return fmt.Errorf("error converting associated resource config attributes: %w", err)
+			}
+			// associated resource configs for resources might be missing a resource name
+			// which can be inferred from its resource config.
+			converted.UpdateResourceNames(func(oldName resource.Name) resource.Name {
+				return cfg.ResourceName()
+			})
+			cfg.AssociatedResourceConfigs[subIdx].ConvertedAttributes = converted
+		}
+	}
+	return nil
+}
+
+// addAPIFromRegistry adds a preregistered API (rpc API) to the module's services.
+func (m *Module) addAPIFromRegistry(ctx context.Context, api resource.API) error {
+	m.registerMu.Lock()
+	defer m.registerMu.Unlock()
+	_, ok := m.collections[api]
+	if ok {
+		return nil
+	}
+
+	apiInfo, ok := resource.LookupGenericAPIRegistration(api)
+	if !ok {
+		return fmt.Errorf("invariant: registration does not exist for %q", api)
+	}
+
+	newColl := apiInfo.MakeEmptyCollection()
+	m.collections[api] = newColl
+
+	return apiInfo.RegisterRPCService(ctx, m.server, newColl)
+}
+
+// AddModelFromRegistry adds a preregistered component or service model to the module's services.
+func (m *Module) AddModelFromRegistry(ctx context.Context, api resource.API, model resource.Model) error {
+	resInfo, ok := resource.LookupRegistration(api, model)
+	if !ok {
+		return fmt.Errorf("resource with API %q and model %q not yet registered", api, model)
+	}
+	if resInfo.Constructor == nil {
+		return fmt.Errorf("invariant: no constructor for %q", model)
+	}
+
+	m.registerMu.Lock()
+	_, ok = m.collections[api]
+	m.registerMu.Unlock()
+	if !ok {
+		if err := m.addAPIFromRegistry(ctx, api); err != nil {
+			return err
+		}
+	}
+
+	apiInfo, ok := resource.LookupGenericAPIRegistration(api)
+	if !ok {
+		return fmt.Errorf("invariant: registration does not exist for %q", api)
+	}
+	if apiInfo.ReflectRPCServiceDesc == nil {
+		m.logger.Errorf("rpc subtype %s doesn't contain a valid ReflectRPCServiceDesc", api)
+	}
+	rpcAPI := resource.RPCAPI{
+		API:          api,
+		ProtoSvcName: apiInfo.RPCServiceDesc.ServiceName,
+		Desc:         apiInfo.ReflectRPCServiceDesc,
+	}
+
+	m.registerMu.Lock()
+	m.handlers[rpcAPI] = append(m.handlers[rpcAPI], model)
+	m.registerMu.Unlock()
+	return nil
+}
+
+// getDependenciesForConstruction must be called while holding the `registerMu`.
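+// Dependencies are resolved local-first: a resource served by this module is wired in directly,
+// while anything else is fetched from the parent viam-server as a client object. A frame system
+// client is always injected under framesystem.PublicServiceName.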
+func (m *Module) getDependenciesForConstruction(ctx context.Context, depStrings []string,
+) (resource.Dependencies, error) {
+	// Let modules access RobotFrameSystem (name $framesystem) without needing the entire RobotClient.
+	deps := resource.Dependencies{framesystem.PublicServiceName: NewFrameSystemClient(m.parent)}
+	for _, c := range depStrings {
+		depName, err := resource.NewFromString(c)
+		if err != nil {
+			return nil, err
+		}
+
+		// If the dependency is local to this module, add the resource object directly, rather than
+		// a client object that talks with the viam-server.
+		localRes, err := m.getLocalResource(ctx, depName)
+		if err == nil {
+			deps[depName] = localRes
+			continue
+		}
+
+		// Get a viam-server client object that can access the dependency.
+		clientRes, err := m.GetParentResource(ctx, depName)
+		if err != nil {
+			return nil, err
+		}
+		deps[depName] = clientRes
+	}
+
+	return deps, nil
+}
+
+func (m *Module) addResource(
+	ctx context.Context, depStrings []string, conf *resource.Config, resLogger logging.Logger,
+) error {
+	m.registerMu.Lock()
+	deps, err := m.getDependenciesForConstruction(ctx, depStrings)
+	m.registerMu.Unlock()
+	if err != nil {
+		return err
+	}
+
+	resInfo, ok := resource.LookupRegistration(conf.API, conf.Model)
+	if !ok {
+		return fmt.Errorf("resource with API %q and model %q not yet registered", conf.API, conf.Model)
+	}
+	if resInfo.Constructor == nil {
+		return fmt.Errorf("invariant: no constructor for %q", conf.Model)
+	}
+
+	res, err := resInfo.Constructor(ctx, deps, *conf, resLogger)
+	if err != nil {
+		return err
+	}
+
+	// If context has errored, even if construction succeeded we should close the resource and
+	// return the context error. Use shutdownCtx because otherwise any Close operations that rely
+	// on the context will immediately fail. The deadline associated with the context passed in to
+	// this function is rutils.GetResourceConfigurationTimeout, which is propagated to AddResource
+	// through gRPC.
+	if ctx.Err() != nil {
+		m.logger.CDebugw(ctx, "resource successfully constructed but context is done, closing constructed resource",
+			"err", ctx.Err().Error())
+		return multierr.Combine(ctx.Err(), res.Close(m.shutdownCtx))
+	}
+
+	m.registerMu.Lock()
+	defer m.registerMu.Unlock()
+
+	coll, ok := m.collections[conf.API]
+	if !ok {
+		return fmt.Errorf("module cannot service api: %s", conf.API)
+	}
+
+	// If adding the resource name to the collection fails, close the resource and return an error.
+	if err := coll.Add(conf.ResourceName(), res); err != nil {
+		return multierr.Combine(err, res.Close(ctx))
+	}
+
+	m.resLoggers[res] = resLogger
+	// add the video stream resources upon creation
+	if p, ok := res.(rtppassthrough.Source); ok {
+		m.streamSourceByName[res.Name()] = p
+	}
+
+	for _, dep := range deps {
+		// If the dependency is in `resLoggers`, it is a "local resource", and we must track
+		// reconfigures of our dependencies since a rebuild invalidates resource handles.
+		//
+		// Dan: We could call `m.getLocalResource(dep.Name())` but that's just a linear scan over
+		// resLoggers.
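+		// Record enough information (resource handle, config, and dependency strings) to rebuild
+		// `res` if `dep` is ever rebuilt out from under it.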
+func (m *Module) addResource(
+	ctx context.Context, depStrings []string, conf *resource.Config, resLogger logging.Logger,
+) error {
+	m.registerMu.Lock()
+	deps, err := m.getDependenciesForConstruction(ctx, depStrings)
+	m.registerMu.Unlock()
+	if err != nil {
+		return err
+	}
+
+	resInfo, ok := resource.LookupRegistration(conf.API, conf.Model)
+	if !ok {
+		return fmt.Errorf("resource with API %q and model %q not yet registered", conf.API, conf.Model)
+	}
+	if resInfo.Constructor == nil {
+		return fmt.Errorf("invariant: no constructor for %q", conf.Model)
+	}
+
+	res, err := resInfo.Constructor(ctx, deps, *conf, resLogger)
+	if err != nil {
+		return err
+	}
+
+	// If the context has errored, close the resource and return the context error even if
+	// construction succeeded. Use shutdownCtx because otherwise any Close operations that rely
+	// on the context will immediately fail. The deadline associated with the context passed in to
+	// this function is rutils.GetResourceConfigurationTimeout, which is propagated to AddResource
+	// through gRPC.
+	if ctx.Err() != nil {
+		m.logger.CDebugw(ctx, "resource successfully constructed but context is done, closing constructed resource",
+			"err", ctx.Err().Error())
+		return multierr.Combine(ctx.Err(), res.Close(m.shutdownCtx))
+	}
+
+	m.registerMu.Lock()
+	defer m.registerMu.Unlock()
+
+	coll, ok := m.collections[conf.API]
+	if !ok {
+		return fmt.Errorf("module cannot service api: %s", conf.API)
+	}
+
+	// If adding the resource name to the collection fails, close the resource and return an error.
+	if err := coll.Add(conf.ResourceName(), res); err != nil {
+		return multierr.Combine(err, res.Close(ctx))
+	}
+
+	m.resLoggers[res] = resLogger
+	// Add the video stream resources upon creation.
+	if p, ok := res.(rtppassthrough.Source); ok {
+		m.streamSourceByName[res.Name()] = p
+	}
+
+	for _, dep := range deps {
+		// If the dependency is in `resLoggers`, it is a "local resource", and we must track
+		// reconfigures on our dependencies as that invalidates resource handles.
+		//
+		// Dan: We could call `m.getLocalResource(dep.Name())` but that's just a linear scan over
+		// resLoggers.
+		if _, exists := m.resLoggers[dep]; exists {
+			m.internalDeps[dep] = append(m.internalDeps[dep], resConfigureArgs{
+				toReconfig: res,
+				conf:       conf,
+				depStrings: depStrings,
+			})
+		}
+	}
+
+	return nil
+}
+
+func (m *Module) removeResource(ctx context.Context, resName resource.Name) error {
+	slowWatcher, slowWatcherCancel := utils.SlowGoroutineWatcher(
+		30*time.Second, fmt.Sprintf("module resource %q is taking a while to remove", resName), m.logger)
+	defer func() {
+		slowWatcherCancel()
+		<-slowWatcher
+	}()
+
+	m.registerMu.Lock()
+	coll, ok := m.collections[resName.API]
+	if !ok {
+		m.registerMu.Unlock()
+		return fmt.Errorf("no grpc service for %+v", resName)
+	}
+
+	res, err := coll.Resource(resName.Name)
+	if err != nil {
+		m.registerMu.Unlock()
+		return err
+	}
+	m.registerMu.Unlock()
+
+	if err := res.Close(ctx); err != nil {
+		m.logger.Error(err)
+	}
+
+	m.registerMu.Lock()
+	defer m.registerMu.Unlock()
+	delete(m.streamSourceByName, res.Name())
+	delete(m.activeResourceStreams, res.Name())
+	delete(m.resLoggers, res)
+
+	// The viam-server forbids removing a resource until dependents are first closed/removed. Hence
+	// it's safe to assume the value in the map for `res` is empty and simply remove the map entry.
+	delete(m.internalDeps, res)
+
+	for dep, chainReconfigures := range m.internalDeps {
+		// Clear the removed resource from any chain of reconfigures it appears in. Filter into a
+		// reused slice rather than deleting by index while iterating.
+		filtered := chainReconfigures[:0]
+		for _, chainRes := range chainReconfigures {
+			if res != chainRes.toReconfig {
+				filtered = append(filtered, chainRes)
+			}
+		}
+		m.internalDeps[dep] = filtered
+	}
+
+	return coll.Remove(resName)
+}
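
reconfigureResource below first attempts an in-place Reconfigure and only falls back to close-and-rebuild when the model signals it cannot reconfigure. A sketch of how a model opts into that rebuild path; `gizmo` is a placeholder type, and `resource.NewMustRebuildError` is assumed to be the counterpart of the `resource.IsMustRebuildError` check used below:

```go
// Hypothetical model that cannot reconfigure in place: returning a MustRebuild
// error causes the module to Close this instance and invoke the registered
// Constructor with the new config.
func (g *gizmo) Reconfigure(ctx context.Context, deps resource.Dependencies, conf resource.Config) error {
	return resource.NewMustRebuildError(conf.ResourceName())
}
```
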
+// reconfigureResource will reconfigure a resource and, if successful, return the new resource
+// pointer/interface object.
+func (m *Module) reconfigureResource(
+	ctx context.Context, deps resource.Dependencies, conf *resource.Config, logLevel *logging.Level,
+) (resource.Resource, error) {
+	m.registerMu.Lock()
+	coll, ok := m.collections[conf.API]
+	if !ok {
+		m.registerMu.Unlock()
+		return nil, fmt.Errorf("no rpc service for %+v", conf)
+	}
+
+	res, err := coll.Resource(conf.ResourceName().Name)
+	if err != nil {
+		m.registerMu.Unlock()
+		return nil, err
+	}
+
+	resLogger, hasLogger := m.resLoggers[res]
+	m.registerMu.Unlock()
+	if hasLogger && logLevel != nil {
+		resLogger.SetLevel(*logLevel)
+	}
+
+	err = res.Reconfigure(ctx, deps, *conf)
+	if err == nil {
+		return res, nil
+	}
+
+	if !resource.IsMustRebuildError(err) {
+		return nil, err
+	}
+
+	if err := res.Close(ctx); err != nil {
+		m.logger.Error(err)
+	}
+
+	m.registerMu.Lock()
+	delete(m.activeResourceStreams, res.Name())
+	m.registerMu.Unlock()
+
+	resInfo, ok := resource.LookupRegistration(conf.API, conf.Model)
+	if !ok {
+		return nil, fmt.Errorf("resource with API %q and model %q not yet registered", conf.API, conf.Model)
+	}
+	if resInfo.Constructor == nil {
+		return nil, fmt.Errorf("invariant: no constructor for %q", conf.Model)
+	}
+
+	newRes, err := resInfo.Constructor(ctx, deps, *conf, m.logger)
+	if err != nil {
+		return nil, err
+	}
+
+	// The mutex is not held here, so the error path must not unlock it.
+	if err := coll.ReplaceOne(conf.ResourceName(), newRes); err != nil {
+		return nil, err
+	}
+
+	m.registerMu.Lock()
+	// We're modifying internal module maps now. We must not error out at this point without rolling
+	// back module state mutations.
+	delete(m.resLoggers, res)
+	m.resLoggers[newRes] = resLogger
+
+	if p, ok := newRes.(rtppassthrough.Source); ok {
+		m.streamSourceByName[newRes.Name()] = p
+	}
+
+	depsToReconfigure := m.internalDeps[res]
+	// Build up a new slice to map `m.internalDeps[newRes]` to.
+	newDepsToReconfigure := make([]resConfigureArgs, 0, len(depsToReconfigure))
+	for _, depToReconfig := range depsToReconfigure {
+		// Rebuild each dependent and record the rebuilt resource handle in the slice that will be
+		// stored under `newRes`.
+		deps, err := m.getDependenciesForConstruction(ctx, depToReconfig.depStrings)
+		if err != nil {
+			m.logger.Warnw("Failed to get dependencies for cascading dependent reconfigure",
+				"changedResource", conf.Name,
+				"dependent", depToReconfig.conf.Name,
+				"dependentDeps", depToReconfig.depStrings,
+				"err", err)
+			continue
+		}
+
+		// We release the `registerMu` to let other resource query/acquisition methods make
+		// progress. We do not assume `reconfigureResource` is fast.
+		//
+		// We also release the mutex as the recursive call to `reconfigureResource` will reacquire
+		// it. And the mutex is not reentrant.
+		m.registerMu.Unlock()
+
+		var nilLogLevel *logging.Level // pass in nil to avoid changing the log level
+		rebuiltRes, err := m.reconfigureResource(ctx, deps, depToReconfig.conf, nilLogLevel)
+		m.registerMu.Lock()
+		if err != nil {
+			m.logger.Warnw("Failed to cascade dependent reconfigure",
+				"changedResource", conf.Name,
+				"dependent", depToReconfig.conf.Name,
+				"err", err)
+			// Drop the broken dependent from the cascade list rather than tracking a nil handle.
+			continue
+		}
+
+		newDepsToReconfigure = append(newDepsToReconfigure, resConfigureArgs{
+			toReconfig: rebuiltRes,
+			conf:       depToReconfig.conf,
+			depStrings: depToReconfig.depStrings,
+		})
+	}
+
+	m.internalDeps[newRes] = newDepsToReconfigure
+	delete(m.internalDeps, res)
+	m.registerMu.Unlock()
+
+	return newRes, nil
+}
diff --git a/module/server.go b/module/server.go
index 9da5c68595a..b1fc10f828a 100644
--- a/module/server.go
+++ b/module/server.go
@@ -3,14 +3,26 @@ package module
 import (
 	"context"
 	"crypto/tls"
+	"fmt"
 	"net"
 	"net/http"
 	"sync"
 
+	"github.com/fullstorydev/grpcurl"
+	"github.com/jhump/protoreflect/desc"
+	"github.com/jhump/protoreflect/grpcreflect"
 	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+	pb "go.viam.com/api/module/v1"
+	robotpb "go.viam.com/api/robot/v1"
 	"go.viam.com/utils/rpc"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/reflection"
+	reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
+
+	"go.viam.com/rdk/protoutils"
+	"go.viam.com/rdk/resource"
+	"go.viam.com/rdk/services/discovery"
 )
 
 // NewServer returns a new (module specific) rpc.Server.
@@ -110,3 +122,70 @@ type httpHandler struct{}
 func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	http.Error(w, "http unsupported", http.StatusInternalServerError)
 }
+
+// HandlerMap is the format for api->model pairs that the module will service.
+// Ex: mymap["rdk:component:motor"] = ["acme:marine:thruster", "acme:marine:outboard"].
+type HandlerMap map[resource.RPCAPI][]resource.Model
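
Each HandlerMap key carries both the resource API and its proto service name, which is what lets the receiving side rebuild service descriptors over gRPC reflection. A sketch of the round trip; the model value, proto service name, and `conn` are illustrative:

```go
// Hypothetical round trip: a module advertises one motor model, and the other
// side reconstructs the map, resolving descriptors via reflection.
thrusterModel, err := resource.NewModelFromString("acme:marine:thruster")
if err != nil {
	panic(err)
}
hm := module.HandlerMap{
	resource.RPCAPI{API: motor.API, ProtoSvcName: "viam.component.motor.v1.MotorService"}: {thrusterModel},
}
pbMap := hm.ToProto()
// conn is assumed to be an established rpc.ClientConn to a server with reflection enabled.
hm2, err := module.NewHandlerMapFromProto(ctx, pbMap, conn)
```
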
+
+// ToProto converts the HandlerMap to a protobuf representation.
+func (h HandlerMap) ToProto() *pb.HandlerMap {
+	pMap := &pb.HandlerMap{}
+	for s, models := range h {
+		subtype := &robotpb.ResourceRPCSubtype{
+			Subtype: protoutils.ResourceNameToProto(resource.Name{
+				API:  s.API,
+				Name: "",
+			}),
+			ProtoService: s.ProtoSvcName,
+		}
+
+		handler := &pb.HandlerDefinition{Subtype: subtype}
+		for _, m := range models {
+			handler.Models = append(handler.Models, m.String())
+		}
+		pMap.Handlers = append(pMap.Handlers, handler)
+	}
+	return pMap
+}
+
+// NewHandlerMapFromProto converts protobuf to HandlerMap.
+func NewHandlerMapFromProto(ctx context.Context, pMap *pb.HandlerMap, conn rpc.ClientConn) (HandlerMap, error) {
+	hMap := make(HandlerMap)
+	refClient := grpcreflect.NewClientV1Alpha(ctx, reflectpb.NewServerReflectionClient(conn))
+	defer refClient.Reset()
+	reflSource := grpcurl.DescriptorSourceFromServer(ctx, refClient)
+
+	var errs error
+	for _, h := range pMap.GetHandlers() {
+		api := protoutils.ResourceNameFromProto(h.Subtype.Subtype).API
+		rpcAPI := &resource.RPCAPI{
+			API: api,
+		}
+		// Due to how the tagger is set up in the API, we cannot currently use reflection on the
+		// discovery service, so skip the reflection step for discovery until the issue is resolved.
+		// TODO(RSDK-9718) - remove the skip.
+		if api != discovery.API {
+			symDesc, err := reflSource.FindSymbol(h.Subtype.ProtoService)
+			if err != nil {
+				errs = multierr.Combine(errs, err)
+				if errors.Is(err, grpcurl.ErrReflectionNotSupported) {
+					return nil, errs
+				}
+				continue
+			}
+			svcDesc, ok := symDesc.(*desc.ServiceDescriptor)
+			if !ok {
+				return nil, fmt.Errorf("expected descriptor to be service descriptor but got %T", symDesc)
+			}
+			rpcAPI.Desc = svcDesc
+		}
+		for _, m := range h.Models {
+			model, err := resource.NewModelFromString(m)
+			if err != nil {
+				return nil, err
+			}
+			hMap[*rpcAPI] = append(hMap[*rpcAPI], model)
+		}
+	}
+	return hMap, errs
+}
diff --git a/module/streams.go b/module/streams.go
new file mode 100644
index 00000000000..672d6fd230e
--- /dev/null
+++ b/module/streams.go
@@ -0,0 +1,161 @@
+package module
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/pion/rtp"
+	"github.com/viamrobotics/webrtc/v3"
+	"go.opencensus.io/trace"
+	"go.uber.org/multierr"
+	streampb "go.viam.com/api/stream/v1"
+	"go.viam.com/utils"
+	"golang.org/x/exp/maps"
+
+	"go.viam.com/rdk/components/camera/rtppassthrough"
+	"go.viam.com/rdk/resource"
+)
+
+// errMaxSupportedWebRTCTrackLimit is the error returned when the MaxSupportedWebRTCTRacks limit is
+// reached.
+var errMaxSupportedWebRTCTrackLimit = fmt.Errorf("only %d WebRTC tracks are supported per peer connection",
+	maxSupportedWebRTCTRacks)
+
+type peerResourceState struct {
+	// NOTE: as we only support video to start, this will always be a single element;
+	// once we add audio we will need to make this a slice / map.
+	subID rtppassthrough.SubscriptionID
+}
+
+// ListStreams lists the streams.
+func (m *Module) ListStreams(ctx context.Context, req *streampb.ListStreamsRequest) (*streampb.ListStreamsResponse, error) {
+	_, span := trace.StartSpan(ctx, "module::module::ListStreams")
+	defer span.End()
+	names := make([]string, 0, len(m.streamSourceByName))
+	for _, n := range maps.Keys(m.streamSourceByName) {
+		names = append(names, n.String())
+	}
+	return &streampb.ListStreamsResponse{Names: names}, nil
+}
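
AddStream below only succeeds for resources that implement rtppassthrough.Source, which addResource detects with a type assertion at construction time. For orientation, the shape of that interface as this diff exercises it; this is a sketch derived from the calls below, not the authoritative definition in the rtppassthrough package:

```go
// SubscribeRTP delivers RTP packets to a callback and returns a Subscription
// whose Terminated context signals teardown; Unsubscribe ends the subscription.
type Source interface {
	SubscribeRTP(ctx context.Context, bufferSize int, packetsCB func([]*rtp.Packet)) (rtppassthrough.Subscription, error)
	Unsubscribe(ctx context.Context, id rtppassthrough.SubscriptionID) error
}
```
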
+
+// AddStream adds a stream.
+// Returns an error if:
+// 1. there is no WebRTC peer connection with the viam-server
+// 2. the resource doesn't exist
+// 3. the resource doesn't implement rtppassthrough.Source
+// 4. there are already the max number of supported tracks on the peer connection
+// 5. SubscribeRTP returns an error
+// 6. a WebRTC track is unable to be created
+// 7. adding the track to the peer connection fails
+func (m *Module) AddStream(ctx context.Context, req *streampb.AddStreamRequest) (*streampb.AddStreamResponse, error) {
+	ctx, span := trace.StartSpan(ctx, "module::module::AddStream")
+	defer span.End()
+	name, err := resource.NewFromString(req.GetName())
+	if err != nil {
+		return nil, err
+	}
+	m.registerMu.Lock()
+	defer m.registerMu.Unlock()
+	if m.pc == nil {
+		return nil, errors.New("module has no peer connection")
+	}
+	vcss, ok := m.streamSourceByName[name]
+	if !ok {
+		err := errors.New("unknown stream for resource")
+		m.logger.CWarnw(ctx, err.Error(), "name", name.String(), "streamSourceByName", fmt.Sprintf("%#v", m.streamSourceByName))
+		return nil, err
+	}
+
+	if _, ok = m.activeResourceStreams[name]; ok {
+		m.logger.CWarnw(ctx, "AddStream called when there is already a stream for the peer connection; no-op", "name", name)
+		return &streampb.AddStreamResponse{}, nil
+	}
+
+	if len(m.activeResourceStreams) >= maxSupportedWebRTCTRacks {
+		return nil, errMaxSupportedWebRTCTrackLimit
+	}
+
+	tlsRTP, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: "video/H264"}, "video", name.String())
+	if err != nil {
+		return nil, fmt.Errorf("error creating a new TrackLocalStaticRTP: %w", err)
+	}
+
+	sub, err := vcss.SubscribeRTP(ctx, rtpBufferSize, func(pkts []*rtp.Packet) {
+		for _, pkt := range pkts {
+			if err := tlsRTP.WriteRTP(pkt); err != nil {
+				m.logger.CWarnw(ctx, "SubscribeRTP callback function WriteRTP", "err", err)
+			}
+		}
+	})
+	if err != nil {
+		return nil, fmt.Errorf("error setting up stream subscription: %w", err)
+	}
+
+	m.logger.CDebugw(ctx, "AddStream calling AddTrack", "name", name.String(), "subID", sub.ID.String())
+	sender, err := m.pc.AddTrack(tlsRTP)
+	if err != nil {
+		err = fmt.Errorf("error adding track: %w", err)
+		if unsubErr := vcss.Unsubscribe(ctx, sub.ID); unsubErr != nil {
+			return nil, multierr.Combine(err, unsubErr)
+		}
+		return nil, err
+	}
+
+	removeTrackOnSubTerminate := func() {
+		defer m.logger.Debugw("RemoveTrack called", "name", name.String(), "subID", sub.ID.String())
+		// Wait until either the module is shutting down or the subscription terminates.
+		var msg string
+		select {
+		case <-sub.Terminated.Done():
+			msg = "rtp_passthrough subscription expired, calling RemoveTrack"
+		case <-m.shutdownCtx.Done():
+			msg = "module closing, calling RemoveTrack"
+		}
+		// Remove the track from the peer connection so that viam-server clients know that the stream has terminated.
+		m.registerMu.Lock()
+		defer m.registerMu.Unlock()
+		m.logger.Debugw(msg, "name", name.String(), "subID", sub.ID.String())
+		delete(m.activeResourceStreams, name)
+		if err := m.pc.RemoveTrack(sender); err != nil {
+			m.logger.Warnw("RemoveTrack returned error", "name", name.String(), "subID", sub.ID.String(), "err", err)
+		}
+	}
+	m.activeBackgroundWorkers.Add(1)
+	utils.ManagedGo(removeTrackOnSubTerminate, m.activeBackgroundWorkers.Done)
+
+	m.activeResourceStreams[name] = peerResourceState{subID: sub.ID}
+	return &streampb.AddStreamResponse{}, nil
+}
+
+// RemoveStream removes a stream.
+func (m *Module) RemoveStream(ctx context.Context, req *streampb.RemoveStreamRequest) (*streampb.RemoveStreamResponse, error) { + ctx, span := trace.StartSpan(ctx, "module::module::RemoveStream") + defer span.End() + name, err := resource.NewFromString(req.GetName()) + if err != nil { + return nil, err + } + m.registerMu.Lock() + defer m.registerMu.Unlock() + if m.pc == nil { + return nil, errors.New("module has no peer connection") + } + vcss, ok := m.streamSourceByName[name] + if !ok { + return nil, fmt.Errorf("unknown stream for resource %s", name) + } + + prs, ok := m.activeResourceStreams[name] + if !ok { + return nil, fmt.Errorf("stream %s is not active", name) + } + + if err := vcss.Unsubscribe(ctx, prs.subID); err != nil { + m.logger.CWarnw(ctx, "RemoveStream > Unsubscribe", "name", name.String(), "subID", prs.subID.String(), "err", err) + return nil, err + } + + delete(m.activeResourceStreams, name) + return &streampb.RemoveStreamResponse{}, nil +} diff --git a/utils/env.go b/utils/env.go index aa9c4bc2f84..01dffc221bb 100644 --- a/utils/env.go +++ b/utils/env.go @@ -17,7 +17,7 @@ import ( const ( // DefaultResourceConfigurationTimeout is the default resource configuration // timeout. - DefaultResourceConfigurationTimeout = time.Minute + DefaultResourceConfigurationTimeout = 2 * time.Minute // ResourceConfigurationTimeoutEnvVar is the environment variable that can // be set to override DefaultResourceConfigurationTimeout as the duration
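
The accompanying utils/env.go change doubles the default resource configuration timeout from one minute to two. Deployments needing more headroom can still override it through the environment variable named by ResourceConfigurationTimeoutEnvVar; a sketch of the resolution semantics implied by the surrounding comment (the actual helper in this package may differ):

```go
// Resolve the effective timeout: use the env override if it parses as a
// duration, otherwise fall back to the new 2m default.
timeout := DefaultResourceConfigurationTimeout
if raw := os.Getenv(ResourceConfigurationTimeoutEnvVar); raw != "" {
	if d, err := time.ParseDuration(raw); err == nil {
		timeout = d
	}
}
```
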