
Commit d4138d4

Default NAP security-violation logs to be gzipped individually via custom processor (#1125)
1 parent ce964d4 commit d4138d4

File tree

13 files changed (+582, -2 lines)


internal/collector/factories.go

Lines changed: 2 additions & 0 deletions

```
@@ -7,6 +7,7 @@ package collector
 
 import (
 	"github.com/nginx/agent/v3/internal/collector/containermetricsreceiver"
+	"github.com/nginx/agent/v3/internal/collector/logsgzipprocessor"
 	nginxreceiver "github.com/nginx/agent/v3/internal/collector/nginxossreceiver"
 	"github.com/nginx/agent/v3/internal/collector/nginxplusreceiver"
 
@@ -104,6 +105,7 @@ func createProcessorFactories() map[component.Type]processor.Factory {
 		redactionprocessor.NewFactory(),
 		resourceprocessor.NewFactory(),
 		transformprocessor.NewFactory(),
+		logsgzipprocessor.NewFactory(),
 	}
 
 	processors := make(map[component.Type]processor.Factory)
```

internal/collector/factories_test.go

Lines changed: 1 addition & 1 deletion

```
@@ -19,7 +19,7 @@ func TestOTelComponentFactoriesDefault(t *testing.T) {
 	assert.NotNil(t, factories, "factories should not be nil")
 
 	assert.Len(t, factories.Receivers, 6)
-	assert.Len(t, factories.Processors, 8)
+	assert.Len(t, factories.Processors, 9)
 	assert.Len(t, factories.Exporters, 4)
 	assert.Len(t, factories.Extensions, 3)
 	assert.Empty(t, factories.Connectors)
```
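
A companion check could pin this registration by component type rather than by factory count. The sketch below is hypothetical (not part of this commit); it assumes the test file shares the `collector` package so the unexported `createProcessorFactories()` from the diff above is reachable:

```
package collector

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"go.opentelemetry.io/collector/component"
)

// TestLogsGzipProcessorFactoryRegistered checks that the "logsgzip" factory
// added by this commit is present in the processor factory map.
func TestLogsGzipProcessorFactoryRegistered(t *testing.T) {
	processors := createProcessorFactories()

	factory, ok := processors[component.MustNewType("logsgzip")]
	assert.True(t, ok, "logsgzip processor factory should be registered")
	assert.NotNil(t, factory)
}
```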
Lines changed: 81 additions & 0 deletions

# Logs gzip processor

The Logs gzip processor gzips each input log record body, updating the log record in place.

For metrics and traces this processor is a pass-through, since it does not implement the related interfaces.

## Configuration

No configuration is needed.
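
The sketch below is illustrative only (it is not part of this package): it compresses a string body the same way the processor does, stores it back on the record as bytes, and shows how a downstream consumer can decompress it again with the standard `compress/gzip` reader. The sample payload is a made-up NAP-style record.

```
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"log"

	"go.opentelemetry.io/collector/pdata/plog"
)

func main() {
	record := plog.NewLogRecord()
	record.Body().SetStr(`{"violation":"sql_injection","outcome":"blocked"}`)

	// Compress the body in place, mirroring what the processor does per record.
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write([]byte(record.Body().Str())); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
	if err := record.Body().FromRaw(buf.Bytes()); err != nil {
		log.Fatal(err)
	}

	// Downstream, the gzipped bytes decompress back to the original payload.
	r, err := gzip.NewReader(bytes.NewReader(record.Body().Bytes().AsRaw()))
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	original, err := io.ReadAll(r)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(original))
}
```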

## Benchmarking

We benchmarked serial and concurrent (more representative of real pipeline load) operation of this processor, with and without the `sync.Pool`. Here are the results:

```
Concurrent Run: Without Sync Pool

goos: darwin
goarch: arm64
pkg: github.com/nginx/agent/v3/internal/collector/logsgzipprocessor
cpu: Apple M2 Pro
BenchmarkGzipProcessor_Concurrent-12    24    45279866 ns/op    817791582 B/op    24727 allocs/op
PASS
ok    github.com/nginx/agent/v3/internal/collector/logsgzipprocessor    1.939s

Concurrent Run: With Sync Pool

goos: darwin
goarch: arm64
pkg: github.com/nginx/agent/v3/internal/collector/logsgzipprocessor
cpu: Apple M2 Pro
BenchmarkGzipProcessor_Concurrent-12    147    9383213 ns/op    10948640 B/op    7820 allocs/op
PASS
ok    github.com/nginx/agent/v3/internal/collector/logsgzipprocessor    2.026s

————

Serial Run: Without Sync Pool

goos: darwin
goarch: arm64
pkg: github.com/nginx/agent/v3/internal/collector/logsgzipprocessor
cpu: Apple M2 Pro
BenchmarkGzipProcessor/SmallRecords-12          100    12048268 ns/op     81898890 B/op       2537 allocs/op
BenchmarkGzipProcessor/MediumRecords-12         100    13143269 ns/op     82027307 B/op       2541 allocs/op
BenchmarkGzipProcessor/LargeRecords-12           91    15912399 ns/op     83198992 B/op       2580 allocs/op
BenchmarkGzipProcessor/ManySmallRecords-12        2    807707542 ns/op    8143237656 B/op    243348 allocs/op

Serial Run: With Sync Pool

goos: darwin
goarch: arm64
pkg: github.com/nginx/agent/v3/internal/collector/logsgzipprocessor
cpu: Apple M2 Pro
BenchmarkGzipProcessor/SmallRecords-12          205    7304839 ns/op      1027942 B/op      783 allocs/op
BenchmarkGzipProcessor/MediumRecords-12         182    7336266 ns/op      1078050 B/op      784 allocs/op
BenchmarkGzipProcessor/LargeRecords-12          132    9646940 ns/op      2057059 B/op      815 allocs/op
BenchmarkGzipProcessor/ManySmallRecords-12        5    239726258 ns/op    6883977 B/op    73679 allocs/op
PASS
```
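
Taken together, the concurrent runs show the pooled implementation finishing an operation roughly 4–5x faster while allocating roughly 75x fewer bytes per operation, which is why the `sync.Pool` variant is the one shipped in `processor.go`.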

To run this benchmark yourself with the `sync.Pool` implementation, run the tests in `processor_benchmark_test.go` as-is; the pooled `gzipCompress` is the default.
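
For example, assuming a standard Go toolchain and running from the repository root, something like `go test -bench=. -benchmem ./internal/collector/logsgzipprocessor/` runs both benchmarks with allocation statistics.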

To compare against the benchmark without `sync.Pool`, replace the existing `gzipCompress` function in `processor.go` with the code block below (commenting out the pooled version) and rerun `processor_benchmark_test.go`:
```
func (p *logsGzipProcessor) gzipCompress(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	_, err := w.Write(data)
	if err != nil {
		return nil, err
	}
	if err = w.Close(); err != nil {
		return nil, err
	}

	return buf.Bytes(), nil
}
```
Lines changed: 182 additions & 0 deletions

```
// Copyright (c) F5, Inc.
//
// This source code is licensed under the Apache License, Version 2.0 license found in the
// LICENSE file in the root directory of this source tree.
package logsgzipprocessor

import (
	"bytes"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"sync"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/processor"
	"go.uber.org/multierr"
	"go.uber.org/zap"
)

// nolint: ireturn
func NewFactory() processor.Factory {
	return processor.NewFactory(
		component.MustNewType("logsgzip"),
		func() component.Config {
			return &struct{}{}
		},
		processor.WithLogs(createLogsGzipProcessor, component.StabilityLevelBeta),
	)
}

// nolint: ireturn
func createLogsGzipProcessor(_ context.Context,
	settings processor.Settings,
	cfg component.Config,
	logs consumer.Logs,
) (processor.Logs, error) {
	logger := settings.Logger
	logger.Info("Creating logs gzip processor")

	return newLogsGzipProcessor(logs, settings), nil
}

// logsGzipProcessor is a custom-processor implementation for compressing individual log records into
// gzip format. This can be used to reduce the size of log records and improve performance when processing
// large log volumes. This processor will be used by default for agent interacting with NGINX One
// console (https://docs.nginx.com/nginx-one/about/).
type logsGzipProcessor struct {
	nextConsumer consumer.Logs
	// We use sync.Pool to efficiently manage and reuse gzip.Writer instances within this processor.
	// Otherwise, creating a new compressor for every log record would result in frequent memory allocations
	// and increased garbage collection overhead, especially under high-throughput workload like this one.
	// By pooling these objects, we minimize allocation churn, reduce GC pressure, and improve overall performance.
	pool     *sync.Pool
	settings processor.Settings
}

type GzipWriter interface {
	Write(p []byte) (int, error)
	Close() error
	Reset(w io.Writer)
}

func newLogsGzipProcessor(logs consumer.Logs, settings processor.Settings) *logsGzipProcessor {
	return &logsGzipProcessor{
		nextConsumer: logs,
		pool: &sync.Pool{
			New: func() any {
				return gzip.NewWriter(nil)
			},
		},
		settings: settings,
	}
}

func (p *logsGzipProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
	var errs error
	resourceLogs := ld.ResourceLogs()
	for i := range resourceLogs.Len() {
		scopeLogs := resourceLogs.At(i).ScopeLogs()
		for j := range scopeLogs.Len() {
			err := p.processLogRecords(scopeLogs.At(j).LogRecords())
			if err != nil {
				errs = multierr.Append(errs, err)
			}
		}
	}
	if errs != nil {
		return fmt.Errorf("failed processing log records: %w", errs)
	}

	return p.nextConsumer.ConsumeLogs(ctx, ld)
}

func (p *logsGzipProcessor) processLogRecords(logRecords plog.LogRecordSlice) error {
	var errs error
	// Filter out unsupported data types in the log before processing
	logRecords.RemoveIf(func(lr plog.LogRecord) bool {
		body := lr.Body()
		// Keep only STRING or BYTES types
		if body.Type() != pcommon.ValueTypeStr &&
			body.Type() != pcommon.ValueTypeBytes {
			p.settings.Logger.Debug("Skipping log record with unsupported body type", zap.Any("type", body.Type()))
			return true
		}

		return false
	})
	// Process remaining valid records
	for k := range logRecords.Len() {
		record := logRecords.At(k)
		body := record.Body()
		var data []byte
		//nolint:exhaustive // Already filtered out other types with RemoveIf
		switch body.Type() {
		case pcommon.ValueTypeStr:
			data = []byte(body.Str())
		case pcommon.ValueTypeBytes:
			data = body.Bytes().AsRaw()
		}
		gzipped, err := p.gzipCompress(data)
		if err != nil {
			errs = multierr.Append(errs, fmt.Errorf("failed to compress log record: %w", err))

			continue
		}
		err = record.Body().FromRaw(gzipped)
		if err != nil {
			errs = multierr.Append(errs, fmt.Errorf("failed to set gzipped data to log record body: %w", err))

			continue
		}
	}

	return errs
}

func (p *logsGzipProcessor) gzipCompress(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	var err error
	wIface := p.pool.Get()
	w, ok := wIface.(GzipWriter)
	if !ok {
		return nil, fmt.Errorf("writer of type %T not supported", wIface)
	}
	w.Reset(&buf)
	defer func() {
		if err = w.Close(); err != nil {
			p.settings.Logger.Error("Failed to close gzip writer", zap.Error(err))
		}
		p.pool.Put(w)
	}()

	_, err = w.Write(data)
	if err != nil {
		return nil, err
	}
	if err = w.Close(); err != nil {
		return nil, err
	}

	return buf.Bytes(), nil
}

func (p *logsGzipProcessor) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{
		MutatesData: true,
	}
}

func (p *logsGzipProcessor) Start(ctx context.Context, _ component.Host) error {
	p.settings.Logger.Info("Starting logs gzip processor")
	return nil
}

func (p *logsGzipProcessor) Shutdown(ctx context.Context) error {
	p.settings.Logger.Info("Shutting down logs gzip processor")
	return nil
}
```
Lines changed: 89 additions & 0 deletions

```
// Copyright (c) F5, Inc.
//
// This source code is licensed under the Apache License, Version 2.0 license found in the
// LICENSE file in the root directory of this source tree.
package logsgzipprocessor

import (
	"context"
	"crypto/rand"
	"math/big"
	"testing"

	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/processor"
)

// Helper to generate logs with variable size and content
func generateLogs(numRecords, recordSize int) plog.Logs {
	logs := plog.NewLogs()
	rl := logs.ResourceLogs().AppendEmpty()
	sl := rl.ScopeLogs().AppendEmpty()
	for i := 0; i < numRecords; i++ {
		lr := sl.LogRecords().AppendEmpty()
		content, _ := randomString(recordSize)
		lr.Body().SetStr(content)
	}

	return logs
}

func randomString(n int) (string, error) {
	const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
	b := make([]byte, n)
	lettersSize := big.NewInt(int64(len(letters)))
	for i := range b {
		num, err := rand.Int(rand.Reader, lettersSize)
		if err != nil {
			return "", err
		}
		b[i] = letters[num.Int64()]
	}

	return string(b), nil
}

func BenchmarkGzipProcessor(b *testing.B) {
	benchmarks := []struct {
		name       string
		numRecords int
		recordSize int
	}{
		{"SmallRecords", 100, 50},
		{"MediumRecords", 100, 500},
		{"LargeRecords", 100, 5000},
		{"ManySmallRecords", 10000, 50},
	}

	for _, bm := range benchmarks {
		b.Run(bm.name, func(b *testing.B) {
			b.ReportAllocs()
			consumer := &consumertest.LogsSink{}
			p := newLogsGzipProcessor(consumer, processor.Settings{})
			logs := generateLogs(bm.numRecords, bm.recordSize)

			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				_ = p.ConsumeLogs(context.Background(), logs)
			}
		})
	}
}

// Optional: Benchmark with concurrency to simulate real pipeline load
func BenchmarkGzipProcessor_Concurrent(b *testing.B) {
	// nolint:unused // concurrent runs require total parallel workers to be specified
	const workers = 8
	logs := generateLogs(1000, 1000)
	consumer := &consumertest.LogsSink{}
	p := newLogsGzipProcessor(consumer, processor.Settings{})

	b.ReportAllocs()
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			_ = p.ConsumeLogs(context.Background(), logs)
		}
	})
}
```
