Skip to content

Commit 7588fcf

Browse files
authored
Restart Subscribe on Connection Reset (#1261)
1 parent b06c66b commit 7588fcf

File tree

8 files changed

+321
-27
lines changed

8 files changed

+321
-27
lines changed

internal/command/command_plugin.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,18 +228,36 @@ func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.
228228
}
229229

230230
// processConnectionReset handles a connection reset message.
//
// When msg.Data holds a grpc.GrpcConnectionInterface it, while holding subscribeMutex:
// cancels the current subscribe (if a cancel function is set), closes the old connection,
// switches cp.conn to the new connection, updates the command service client, and starts
// Subscribe again in a new goroutine with a fresh cancelable context.
// For any other msg.Data type only the initial debug message is logged.
func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Message) {
231+
// Declared up front so the multi-assignment below can also assign the cp.subscribeCancel field.
var subscribeCtx context.Context
231232
slog.DebugContext(ctx, "Command plugin received connection reset message")
233+
232234
if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok {
235+
slog.DebugContext(ctx, "Canceling Subscribe after connection reset")
236+
// Context returned by NewContextWithLabels; it is the parent of the new subscribe context.
ctxWithMetadata := cp.config.NewContextWithLabels(ctx)
237+
cp.subscribeMutex.Lock()
238+
// The lock is held until this function returns, covering the cancel, the connection swap
// and the assignment of the new cancel function.
defer cp.subscribeMutex.Unlock()
239+
240+
if cp.subscribeCancel != nil {
241+
cp.subscribeCancel()
242+
slog.DebugContext(ctxWithMetadata, "Successfully canceled subscribe after connection reset")
243+
}
244+
233245
// A failure to close the old connection is logged but does not stop the reset.
connectionErr := cp.conn.Close(ctx)
234246
if connectionErr != nil {
235247
slog.ErrorContext(ctx, "Command plugin: unable to close connection", "error", connectionErr)
236248
}
249+
237250
cp.conn = newConnection
238251
err := cp.commandService.UpdateClient(ctx, cp.conn.CommandServiceClient())
239252
if err != nil {
240253
slog.ErrorContext(ctx, "Failed to reset connection", "error", err)
241254
// Returning here means no new subscribe is started, even though any previous
// subscribe was already canceled above.
return
242255
}
256+
257+
slog.DebugContext(ctxWithMetadata, "Starting new subscribe after connection reset")
258+
// Store the new cancel function so a later reset can cancel this subscribe.
subscribeCtx, cp.subscribeCancel = context.WithCancel(ctxWithMetadata)
259+
go cp.commandService.Subscribe(subscribeCtx)
260+
243261
slog.DebugContext(ctx, "Command service client reset successfully")
244262
}
245263
}

internal/file/file_manager_service.go

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type (
5252
reader *bufio.Reader,
5353
chunkID uint32,
5454
) (mpi.FileDataChunk_Content, error)
55-
WriteManifestFile(updatedFiles map[string]*model.ManifestFile,
55+
WriteManifestFile(ctx context.Context, updatedFiles map[string]*model.ManifestFile,
5656
manifestDir, manifestPath string) (writeError error)
5757
}
5858

@@ -68,6 +68,7 @@ type (
6868
fileToUpdate *mpi.File,
6969
) error
7070
SetIsConnected(isConnected bool)
71+
UpdateClient(ctx context.Context, fileServiceClient mpi.FileServiceClient)
7172
}
7273

7374
fileManagerServiceInterface interface {
@@ -85,6 +86,7 @@ type (
8586
) (map[string]*model.FileCache, map[string][]byte, error)
8687
IsConnected() bool
8788
SetIsConnected(isConnected bool)
89+
ResetClient(ctx context.Context, fileServiceClient mpi.FileServiceClient)
8890
}
8991
)
9092

@@ -101,6 +103,7 @@ type FileManagerService struct {
101103
currentFilesOnDisk map[string]*mpi.File // key is file path
102104
previousManifestFiles map[string]*model.ManifestFile
103105
manifestFilePath string
106+
rollbackManifest bool
104107
filesMutex sync.RWMutex
105108
}
106109

@@ -116,10 +119,16 @@ func NewFileManagerService(fileServiceClient mpi.FileServiceClient, agentConfig
116119
currentFilesOnDisk: make(map[string]*mpi.File),
117120
previousManifestFiles: make(map[string]*model.ManifestFile),
118121
manifestFilePath: agentConfig.ManifestDir + "/manifest.json",
122+
rollbackManifest: true,
119123
manifestLock: manifestLock,
120124
}
121125
}
122126

127+
// ResetClient passes fileServiceClient to the file service operator's UpdateClient and
// logs a debug message once that call returns.
func (fms *FileManagerService) ResetClient(ctx context.Context, fileServiceClient mpi.FileServiceClient) {
128+
fms.fileServiceOperator.UpdateClient(ctx, fileServiceClient)
129+
slog.DebugContext(ctx, "File manager service reset client successfully")
130+
}
131+
123132
// IsConnected returns the value reported by the file service operator's IsConnected.
func (fms *FileManagerService) IsConnected() bool {
124133
return fms.fileServiceOperator.IsConnected()
125134
}
@@ -131,6 +140,7 @@ func (fms *FileManagerService) SetIsConnected(isConnected bool) {
131140
func (fms *FileManagerService) ConfigApply(ctx context.Context,
132141
configApplyRequest *mpi.ConfigApplyRequest,
133142
) (status model.WriteStatus, err error) {
143+
fms.rollbackManifest = true
134144
fileOverview := configApplyRequest.GetOverview()
135145

136146
if fileOverview == nil {
@@ -161,6 +171,7 @@ func (fms *FileManagerService) ConfigApply(ctx context.Context,
161171

162172
fileErr := fms.executeFileActions(ctx)
163173
if fileErr != nil {
174+
fms.rollbackManifest = false
164175
return model.RollbackRequired, fileErr
165176
}
166177
fileOverviewFiles := files.ConvertToMapOfFiles(fileOverview.GetFiles())
@@ -179,6 +190,7 @@ func (fms *FileManagerService) ClearCache() {
179190
clear(fms.previousManifestFiles)
180191
}
181192

193+
//nolint:revive // cognitive-complexity is 13 (max is 12); the loop is needed and can't be broken up
182194
func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) error {
183195
slog.InfoContext(ctx, "Rolling back config for instance", "instance_id", instanceID)
184196

@@ -212,10 +224,13 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string)
212224
}
213225
}
214226

215-
manifestFileErr := fms.fileOperator.WriteManifestFile(fms.previousManifestFiles,
216-
fms.agentConfig.ManifestDir, fms.manifestFilePath)
217-
if manifestFileErr != nil {
218-
return manifestFileErr
227+
if fms.rollbackManifest {
228+
slog.DebugContext(ctx, "Rolling back manifest file", "manifest_previous", fms.previousManifestFiles)
229+
manifestFileErr := fms.fileOperator.WriteManifestFile(ctx, fms.previousManifestFiles,
230+
fms.agentConfig.ManifestDir, fms.manifestFilePath)
231+
if manifestFileErr != nil {
232+
return manifestFileErr
233+
}
219234
}
220235

221236
return nil
@@ -374,7 +389,7 @@ func (fms *FileManagerService) UpdateCurrentFilesOnDisk(
374389
fms.currentFilesOnDisk[currentFile.GetFileMeta().GetName()] = currentFile
375390
}
376391

377-
err := fms.UpdateManifestFile(currentFiles, referenced)
392+
err := fms.UpdateManifestFile(ctx, currentFiles, referenced)
378393
if err != nil {
379394
return fmt.Errorf("failed to update manifest file: %w", err)
380395
}
@@ -385,12 +400,24 @@ func (fms *FileManagerService) UpdateCurrentFilesOnDisk(
385400
// seems to be a control flag, avoid control coupling
386401
//
387402
//nolint:revive // referenced is a required flag
388-
func (fms *FileManagerService) UpdateManifestFile(currentFiles map[string]*mpi.File, referenced bool) (err error) {
389-
slog.Debug("Updating manifest file", "current_files", currentFiles, "referenced", referenced)
403+
func (fms *FileManagerService) UpdateManifestFile(ctx context.Context,
404+
currentFiles map[string]*mpi.File, referenced bool,
405+
) (err error) {
406+
slog.DebugContext(ctx, "Updating manifest file", "current_files", currentFiles, "referenced", referenced)
390407
currentManifestFiles, _, readError := fms.manifestFile()
408+
409+
// When the agent first starts, the manifest is updated as soon as an NGINX instance is found, but the manifest
410+
// file is still empty, so previousManifestFiles ends up empty too. If the first config apply then failed,
411+
// the manifest file was rolled back to an empty file.
412+
// If currentManifestFiles is empty, we can assume the agent has just started and this is the first
413+
// write of the manifest file, so previousManifestFiles is set from currentFiles instead.
414+
if len(currentManifestFiles) == 0 {
415+
currentManifestFiles = fms.convertToManifestFileMap(currentFiles, referenced)
416+
}
417+
391418
fms.previousManifestFiles = currentManifestFiles
392419
if readError != nil && !errors.Is(readError, os.ErrNotExist) {
393-
slog.Debug("Error reading manifest file", "current_manifest_files",
420+
slog.DebugContext(ctx, "Error reading manifest file", "current_manifest_files",
394421
currentManifestFiles, "updated_files", currentFiles, "referenced", referenced)
395422

396423
return fmt.Errorf("unable to read manifest file: %w", readError)
@@ -416,7 +443,7 @@ func (fms *FileManagerService) UpdateManifestFile(currentFiles map[string]*mpi.F
416443
updatedFiles = manifestFiles
417444
}
418445

419-
return fms.fileOperator.WriteManifestFile(updatedFiles, fms.agentConfig.ManifestDir, fms.manifestFilePath)
446+
return fms.fileOperator.WriteManifestFile(ctx, updatedFiles, fms.agentConfig.ManifestDir, fms.manifestFilePath)
420447
}
421448

422449
func (fms *FileManagerService) manifestFile() (map[string]*model.ManifestFile, map[string]*mpi.File, error) {

0 commit comments

Comments
 (0)