Skip to content

Add read only file plugin #1128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: add-auxiliary-command-server-to-agent-config
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
576 changes: 72 additions & 504 deletions internal/file/file_manager_service.go

Large diffs are not rendered by default.

141 changes: 0 additions & 141 deletions internal/file/file_manager_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"fmt"
"os"
"path/filepath"
"sync/atomic"
"testing"

"github.com/nginx/agent/v3/internal/model"
Expand All @@ -28,146 +27,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestFileManagerService_UpdateOverview(t *testing.T) {
ctx := context.Background()

filePath := filepath.Join(t.TempDir(), "nginx.conf")
fileMeta := protos.FileMeta(filePath, "")

fileContent := []byte("location /test {\n return 200 \"Test location\\n\";\n}")
fileHash := files.GenerateHash(fileContent)

fileWriteErr := os.WriteFile(filePath, fileContent, 0o600)
require.NoError(t, fileWriteErr)

overview := protos.FileOverview(filePath, fileHash)

fakeFileServiceClient := &v1fakes.FakeFileServiceClient{}
fakeFileServiceClient.UpdateOverviewReturnsOnCall(0, &mpi.UpdateOverviewResponse{
Overview: overview,
}, nil)

fakeFileServiceClient.UpdateOverviewReturnsOnCall(1, &mpi.UpdateOverviewResponse{}, nil)

fakeFileServiceClient.UpdateFileReturns(&mpi.UpdateFileResponse{}, nil)

fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig())
fileManagerService.SetIsConnected(true)

err := fileManagerService.UpdateOverview(ctx, "123", []*mpi.File{
{
FileMeta: fileMeta,
},
}, 0)

require.NoError(t, err)
assert.Equal(t, 2, fakeFileServiceClient.UpdateOverviewCallCount())
}

func TestFileManagerService_UpdateOverview_MaxIterations(t *testing.T) {
ctx := context.Background()

filePath := filepath.Join(t.TempDir(), "nginx.conf")
fileMeta := protos.FileMeta(filePath, "")

fileContent := []byte("location /test {\n return 200 \"Test location\\n\";\n}")
fileHash := files.GenerateHash(fileContent)

fileWriteErr := os.WriteFile(filePath, fileContent, 0o600)
require.NoError(t, fileWriteErr)

overview := protos.FileOverview(filePath, fileHash)

fakeFileServiceClient := &v1fakes.FakeFileServiceClient{}

// do 5 iterations
for i := 0; i <= 5; i++ {
fakeFileServiceClient.UpdateOverviewReturnsOnCall(i, &mpi.UpdateOverviewResponse{
Overview: overview,
}, nil)
}

fakeFileServiceClient.UpdateFileReturns(&mpi.UpdateFileResponse{}, nil)

fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig())
fileManagerService.SetIsConnected(true)

err := fileManagerService.UpdateOverview(ctx, "123", []*mpi.File{
{
FileMeta: fileMeta,
},
}, 0)

require.Error(t, err)
assert.Equal(t, "too many UpdateOverview attempts", err.Error())
}

func TestFileManagerService_UpdateFile(t *testing.T) {
tests := []struct {
name string
isCert bool
}{
{
name: "non-cert",
isCert: false,
},
{
name: "cert",
isCert: true,
},
}

tempDir := os.TempDir()

for _, test := range tests {
ctx := context.Background()

testFile := helpers.CreateFileWithErrorCheck(t, tempDir, "nginx.conf")

var fileMeta *mpi.FileMeta
if test.isCert {
fileMeta = protos.CertMeta(testFile.Name(), "")
} else {
fileMeta = protos.FileMeta(testFile.Name(), "")
}

fakeFileServiceClient := &v1fakes.FakeFileServiceClient{}
fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig())
fileManagerService.SetIsConnected(true)

err := fileManagerService.UpdateFile(ctx, "123", &mpi.File{FileMeta: fileMeta})

require.NoError(t, err)
assert.Equal(t, 1, fakeFileServiceClient.UpdateFileCallCount())

helpers.RemoveFileWithErrorCheck(t, testFile.Name())
}
}

func TestFileManagerService_UpdateFile_LargeFile(t *testing.T) {
ctx := context.Background()
tempDir := os.TempDir()

testFile := helpers.CreateFileWithErrorCheck(t, tempDir, "nginx.conf")
writeFileError := os.WriteFile(testFile.Name(), []byte("#test content"), 0o600)
require.NoError(t, writeFileError)
fileMeta := protos.FileMetaLargeFile(testFile.Name(), "")

fakeFileServiceClient := &v1fakes.FakeFileServiceClient{}
fakeClientStreamingClient := &FakeClientStreamingClient{sendCount: atomic.Int32{}}
fakeFileServiceClient.UpdateFileStreamReturns(fakeClientStreamingClient, nil)
fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig())
fileManagerService.SetIsConnected(true)

err := fileManagerService.UpdateFile(ctx, "123", &mpi.File{FileMeta: fileMeta})

require.NoError(t, err)
assert.Equal(t, 0, fakeFileServiceClient.UpdateFileCallCount())
assert.Equal(t, 14, int(fakeClientStreamingClient.sendCount.Load()))

helpers.RemoveFileWithErrorCheck(t, testFile.Name())
}

func TestFileManagerService_ConfigApply_Add(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
Expand Down
35 changes: 35 additions & 0 deletions internal/file/file_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ package file
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"path"

"github.com/nginx/agent/v3/internal/model"

"google.golang.org/grpc"

"github.com/nginx/agent/v3/pkg/files"
Expand Down Expand Up @@ -140,3 +143,35 @@ func (fo *FileOperator) ReadChunk(

return chunk, err
}

func (fo *FileOperator) WriteManifestFile(updatedFiles map[string]*model.ManifestFile, manifestDir,
manifestPath string,
) (writeError error) {
manifestJSON, err := json.MarshalIndent(updatedFiles, "", " ")
if err != nil {
return fmt.Errorf("unable to marshal manifest file json: %w", err)
}

// 0755 allows read/execute for all, write for owner
if err = os.MkdirAll(manifestDir, dirPerm); err != nil {
return fmt.Errorf("unable to create directory %s: %w", manifestDir, err)
}

// 0600 ensures only root can read/write
newFile, err := os.OpenFile(manifestPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, filePerm)
if err != nil {
return fmt.Errorf("failed to read manifest file: %w", err)
}
defer func() {
if closeErr := newFile.Close(); closeErr != nil {
writeError = closeErr
}
}()

_, err = newFile.Write(manifestJSON)
if err != nil {
return fmt.Errorf("failed to write manifest file: %w", err)
}

return writeError
}
98 changes: 28 additions & 70 deletions internal/file/file_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ package file

import (
"context"
"fmt"
"log/slog"

"github.com/nginx/agent/v3/internal/command"

"github.com/nginx/agent/v3/pkg/files"
"github.com/nginx/agent/v3/pkg/id"

Expand Down Expand Up @@ -61,27 +62,30 @@ func (fp *FilePlugin) Info() *bus.Info {
}
}

// nolint: cyclop, revive
func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) {
switch msg.Topic {
case bus.ConnectionResetTopic:
fp.handleConnectionReset(ctx, msg)
case bus.ConnectionCreatedTopic:
slog.DebugContext(ctx, "File plugin received connection created message")
fp.fileManagerService.SetIsConnected(true)
case bus.NginxConfigUpdateTopic:
fp.handleNginxConfigUpdate(ctx, msg)
case bus.ConfigUploadRequestTopic:
fp.handleConfigUploadRequest(ctx, msg)
case bus.ConfigApplyRequestTopic:
fp.handleConfigApplyRequest(ctx, msg)
case bus.ConfigApplyCompleteTopic:
fp.handleConfigApplyComplete(ctx, msg)
case bus.ConfigApplySuccessfulTopic:
fp.handleConfigApplySuccess(ctx, msg)
case bus.ConfigApplyFailedTopic:
fp.handleConfigApplyFailedRequest(ctx, msg)
default:
slog.DebugContext(ctx, "File plugin received unknown topic", "topic", msg.Topic)
if logger.ServerType(ctx) == command.Command.String() || logger.ServerType(ctx) == "" {
switch msg.Topic {
case bus.ConnectionResetTopic:
fp.handleConnectionReset(ctx, msg)
case bus.ConnectionCreatedTopic:
slog.DebugContext(ctx, "File plugin received connection created message")
fp.fileManagerService.SetIsConnected(true)
case bus.NginxConfigUpdateTopic:
fp.handleNginxConfigUpdate(ctx, msg)
case bus.ConfigUploadRequestTopic:
fp.handleConfigUploadRequest(ctx, msg)
case bus.ConfigApplyRequestTopic:
fp.handleConfigApplyRequest(ctx, msg)
case bus.ConfigApplyCompleteTopic:
fp.handleConfigApplyComplete(ctx, msg)
case bus.ConfigApplySuccessfulTopic:
fp.handleConfigApplySuccess(ctx, msg)
case bus.ConfigApplyFailedTopic:
fp.handleConfigApplyFailedRequest(ctx, msg)
default:
slog.DebugContext(ctx, "File plugin received unknown topic", "topic", msg.Topic)
}
}
}

Expand Down Expand Up @@ -319,27 +323,10 @@ func (fp *FilePlugin) handleNginxConfigUpdate(ctx context.Context, msg *bus.Mess
return
}

updateError := fp.fileManagerService.UpdateCurrentFilesOnDisk(
ctx,
files.ConvertToMapOfFiles(nginxConfigContext.Files),
true,
)
if updateError != nil {
slog.ErrorContext(ctx, "Unable to update current files on disk", "error", updateError)
}

slog.InfoContext(ctx, "Updating overview after nginx config update")
err := fp.fileManagerService.UpdateOverview(ctx, nginxConfigContext.InstanceID, nginxConfigContext.Files, 0)
if err != nil {
slog.ErrorContext(
ctx,
"Failed to update file overview",
"instance_id", nginxConfigContext.InstanceID,
"error", err,
)
}
fp.fileManagerService.ConfigUpdate(ctx, nginxConfigContext)
}

// nolint: dupl
func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "File plugin received config upload request message")
managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest)
Expand All @@ -357,36 +344,7 @@ func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Me

correlationID := logger.CorrelationID(ctx)

var updatingFilesError error

for _, file := range configUploadRequest.GetOverview().GetFiles() {
err := fp.fileManagerService.UpdateFile(
ctx,
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
file,
)
if err != nil {
slog.ErrorContext(
ctx,
"Failed to update file",
"instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
"file_name", file.GetFileMeta().GetName(),
"error", err,
)

response := fp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_ERROR,
fmt.Sprintf("Failed to update file %s", file.GetFileMeta().GetName()),
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
err.Error(),
)

updatingFilesError = err

fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response})

break
}
}
updatingFilesError := fp.fileManagerService.ConfigUpload(ctx, configUploadRequest)

response := &mpi.DataPlaneResponse{
MessageMeta: &mpi.MessageMeta{
Expand Down
17 changes: 5 additions & 12 deletions internal/file/file_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestFilePlugin_Subscriptions(t *testing.T) {
)
}

// nolint: dupl
func TestFilePlugin_Process_NginxConfigUpdateTopic(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -224,6 +225,7 @@ func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) {
}
}

// nolint: dupl
func TestFilePlugin_Process_ConfigUploadRequestTopic(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -322,29 +324,20 @@ func TestFilePlugin_Process_ConfigUploadRequestTopic_Failure(t *testing.T) {

assert.Eventually(
t,
func() bool { return len(messagePipe.Messages()) == 2 },
func() bool { return len(messagePipe.Messages()) == 1 },
2*time.Second,
10*time.Millisecond,
)

assert.Equal(t, 0, fakeFileServiceClient.UpdateFileCallCount())

messages := messagePipe.Messages()
assert.Len(t, messages, 2)
assert.Len(t, messages, 1)

assert.Equal(t, bus.DataPlaneResponseTopic, messages[0].Topic)

dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse)
assert.True(t, ok)
assert.Equal(
t,
mpi.CommandResponse_COMMAND_STATUS_ERROR,
dataPlaneResponse.GetCommandResponse().GetStatus(),
)

assert.Equal(t, bus.DataPlaneResponseTopic, messages[1].Topic)

dataPlaneResponse, ok = messages[1].Data.(*mpi.DataPlaneResponse)
assert.True(t, ok)
assert.Equal(
t,
mpi.CommandResponse_COMMAND_STATUS_FAILURE,
Expand Down
Loading