Skip to content

Commit 4656704

Browse files
committed
Correctly handle --first/--last when reading flows from a file/stdin
Signed-off-by: Chance Zibolski <[email protected]>
1 parent ff75d5a commit 4656704

File tree

5 files changed

+331
-16
lines changed

5 files changed

+331
-16
lines changed

cmd/observe/agent_events.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"errors"
99
"fmt"
1010
"io"
11+
"math"
1112
"os"
1213
"os/signal"
1314

@@ -102,7 +103,7 @@ func getAgentEventsRequest() (*observerpb.GetAgentEventsRequest, error) {
102103
switch {
103104
case selectorOpts.all:
104105
// all is an alias for last=uint64_max
105-
selectorOpts.last = ^uint64(0)
106+
selectorOpts.last = math.MaxUint64
106107
case selectorOpts.last == 0:
107108
// no specific parameters were provided, just a vanilla `hubble events agent`
108109
selectorOpts.last = defaults.EventsPrintCount

cmd/observe/io_reader_observer.go

Lines changed: 100 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ package observe
66
import (
77
"bufio"
88
"context"
9+
"fmt"
910
"io"
11+
"math"
1012

1113
observerpb "github.com/cilium/cilium/api/v1/observer"
14+
"github.com/cilium/cilium/pkg/container"
1215
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
1316
"github.com/cilium/cilium/pkg/hubble/filters"
1417
"github.com/cilium/hubble/pkg/logger"
@@ -59,11 +62,18 @@ func (o *IOReaderObserver) ServerStatus(_ context.Context, _ *observerpb.ServerS
5962

6063
// ioReaderClient implements Observer_GetFlowsClient.
6164
type ioReaderClient struct {
65+
grpc.ClientStream
66+
6267
scanner *bufio.Scanner
6368
request *observerpb.GetFlowsRequest
6469
allow filters.FilterFuncs
6570
deny filters.FilterFuncs
66-
grpc.ClientStream
71+
72+
// Used for --last
73+
buffer *container.RingBuffer
74+
resps []*observerpb.GetFlowsResponse
75+
// Used for --first/--last
76+
flowsReturned uint64
6777
}
6878

6979
func newIOReaderClient(ctx context.Context, scanner *bufio.Scanner, request *observerpb.GetFlowsRequest) (*ioReaderClient, error) {
@@ -75,36 +85,111 @@ func newIOReaderClient(ctx context.Context, scanner *bufio.Scanner, request *obs
7585
if err != nil {
7686
return nil, err
7787
}
88+
89+
var buf *container.RingBuffer
90+
// last
91+
if n := request.GetNumber(); !request.GetFirst() && n != 0 && n != math.MaxUint64 {
92+
if n > 1_000_000 {
93+
return nil, fmt.Errorf("--last must be <= 1_000_000, got %d", n)
94+
}
95+
buf = container.NewRingBuffer(int(n))
96+
}
7897
return &ioReaderClient{
7998
scanner: scanner,
8099
request: request,
81100
allow: allow,
82101
deny: deny,
102+
buffer: buf,
83103
}, nil
84104
}
85105

86106
func (c *ioReaderClient) Recv() (*observerpb.GetFlowsResponse, error) {
107+
if c.returnedEnoughFlows() {
108+
return nil, io.EOF
109+
}
110+
87111
for c.scanner.Scan() {
88-
line := c.scanner.Text()
89-
var res observerpb.GetFlowsResponse
90-
err := protojson.Unmarshal(c.scanner.Bytes(), &res)
91-
if err != nil {
92-
logger.Logger.WithError(err).WithField("line", line).Warn("Failed to unmarshal json to flow")
112+
res := c.unmarshalNext()
113+
if res == nil {
93114
continue
94115
}
95-
if c.request.Since != nil && c.request.Since.AsTime().After(res.Time.AsTime()) {
96-
continue
97-
}
98-
if c.request.Until != nil && c.request.Until.AsTime().Before(res.Time.AsTime()) {
99-
continue
100-
}
101-
if !filters.Apply(c.allow, c.deny, &v1.Event{Timestamp: res.Time, Event: res.GetFlow()}) {
102-
continue
116+
117+
switch {
118+
case c.isLast():
119+
// store flows in a FIFO buffer, effectively keeping the last N flows
120+
// until we finish reading from the stream
121+
c.buffer.Add(res)
122+
case c.isFirst():
123+
// track number of flows returned, so we can exit once we've given back N flows
124+
c.flowsReturned++
125+
return res, nil
126+
default: // --all
127+
return res, nil
103128
}
104-
return &res, nil
105129
}
130+
106131
if err := c.scanner.Err(); err != nil {
107132
return nil, err
108133
}
134+
135+
if res := c.popFromLastBuffer(); res != nil {
136+
return res, nil
137+
}
138+
109139
return nil, io.EOF
110140
}
141+
142+
func (c *ioReaderClient) isFirst() bool {
143+
return c.request.GetFirst() && c.request.GetNumber() != 0 && c.request.GetNumber() != math.MaxUint64
144+
}
145+
146+
func (c *ioReaderClient) isLast() bool {
147+
return c.buffer != nil && c.request.GetNumber() != math.MaxUint64
148+
}
149+
150+
func (c *ioReaderClient) returnedEnoughFlows() bool {
151+
return c.request.GetNumber() > 0 && c.flowsReturned >= c.request.GetNumber()
152+
}
153+
154+
func (c *ioReaderClient) popFromLastBuffer() *observerpb.GetFlowsResponse {
155+
// Handle --last by iterating over our FIFO and returning one item each time.
156+
if c.isLast() {
157+
if len(c.resps) == 0 {
158+
// Iterate over the buffer and store them in a slice, because we cannot
159+
// index into the ring buffer itself
160+
// TODO: Add the ability to index into the ring buffer and we could avoid
161+
// this copy.
162+
c.buffer.Iterate(func(i interface{}) {
163+
c.resps = append(c.resps, i.(*observerpb.GetFlowsResponse))
164+
})
165+
}
166+
167+
// return the next element from the buffered results
168+
if len(c.resps) > int(c.flowsReturned) {
169+
resp := c.resps[c.flowsReturned]
170+
c.flowsReturned++
171+
return resp
172+
}
173+
}
174+
return nil
175+
}
176+
177+
func (c *ioReaderClient) unmarshalNext() *observerpb.GetFlowsResponse {
178+
var res observerpb.GetFlowsResponse
179+
err := protojson.Unmarshal(c.scanner.Bytes(), &res)
180+
if err != nil {
181+
line := c.scanner.Text()
182+
logger.Logger.WithError(err).WithField("line", line).Warn("Failed to unmarshal json to flow")
183+
return nil
184+
}
185+
if c.request.Since != nil && c.request.Since.AsTime().After(res.Time.AsTime()) {
186+
return nil
187+
}
188+
if c.request.Until != nil && c.request.Until.AsTime().Before(res.Time.AsTime()) {
189+
return nil
190+
}
191+
if !filters.Apply(c.allow, c.deny, &v1.Event{Timestamp: res.Time, Event: res.GetFlow()}) {
192+
return nil
193+
}
194+
return &res
195+
}

cmd/observe/io_reader_observer_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,82 @@ func Test_getFlowsTimeRange(t *testing.T) {
7070
assert.Equal(t, io.EOF, err)
7171
}
7272

73+
func Test_getFlowsLast(t *testing.T) {
74+
flows := []*observerpb.GetFlowsResponse{
75+
{
76+
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_FORWARDED}},
77+
Time: &timestamppb.Timestamp{Seconds: 0},
78+
},
79+
{
80+
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_DROPPED}},
81+
Time: &timestamppb.Timestamp{Seconds: 100},
82+
},
83+
{
84+
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_ERROR}},
85+
Time: &timestamppb.Timestamp{Seconds: 200},
86+
},
87+
}
88+
var flowStrings []string
89+
for _, f := range flows {
90+
b, err := f.MarshalJSON()
91+
assert.NoError(t, err)
92+
flowStrings = append(flowStrings, string(b))
93+
}
94+
server := NewIOReaderObserver(strings.NewReader(strings.Join(flowStrings, "\n") + "\n"))
95+
req := observerpb.GetFlowsRequest{
96+
Number: 2,
97+
First: false,
98+
}
99+
client, err := server.GetFlows(context.Background(), &req)
100+
assert.NoError(t, err)
101+
res, err := client.Recv()
102+
assert.NoError(t, err)
103+
assert.Equal(t, flows[1], res)
104+
res, err = client.Recv()
105+
assert.NoError(t, err)
106+
assert.Equal(t, flows[2], res)
107+
_, err = client.Recv()
108+
assert.Equal(t, io.EOF, err)
109+
}
110+
111+
func Test_getFlowsFirst(t *testing.T) {
112+
flows := []*observerpb.GetFlowsResponse{
113+
{
114+
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_FORWARDED}},
115+
Time: &timestamppb.Timestamp{Seconds: 0},
116+
},
117+
{
118+
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_DROPPED}},
119+
Time: &timestamppb.Timestamp{Seconds: 100},
120+
},
121+
{
122+
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_ERROR}},
123+
Time: &timestamppb.Timestamp{Seconds: 200},
124+
},
125+
}
126+
var flowStrings []string
127+
for _, f := range flows {
128+
b, err := f.MarshalJSON()
129+
assert.NoError(t, err)
130+
flowStrings = append(flowStrings, string(b))
131+
}
132+
server := NewIOReaderObserver(strings.NewReader(strings.Join(flowStrings, "\n") + "\n"))
133+
req := observerpb.GetFlowsRequest{
134+
Number: 2,
135+
First: true,
136+
}
137+
client, err := server.GetFlows(context.Background(), &req)
138+
assert.NoError(t, err)
139+
res, err := client.Recv()
140+
assert.NoError(t, err)
141+
assert.Equal(t, flows[0], res)
142+
res, err = client.Recv()
143+
assert.NoError(t, err)
144+
assert.Equal(t, flows[1], res)
145+
_, err = client.Recv()
146+
assert.Equal(t, io.EOF, err)
147+
}
148+
73149
func Test_getFlowsFilter(t *testing.T) {
74150
flows := []*observerpb.GetFlowsResponse{
75151
{

0 commit comments

Comments
 (0)