Skip to content

Commit aa42ae8

Browse files
committed
v2/logging: add support for network flow logs API
Adds support for the network flow logs API endpoint at /api/v2/tailnet/{tailnet}/logging/network. This endpoint returns network traffic flow data including virtual, subnet, exit, and physical traffic with packet/byte counts. Updates #41 Signed-off-by: rajsinghtech <[email protected]>
1 parent 3b48c21 commit aa42ae8

File tree

2 files changed

+214
-0
lines changed

2 files changed

+214
-0
lines changed

logging.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@ package tailscale
55

66
import (
77
"context"
8+
"encoding/json"
9+
"fmt"
10+
"io"
811
"net/http"
12+
"net/url"
13+
"time"
914
)
1015

1116
// LoggingResource provides access to https://tailscale.com/api#tag/logging.
@@ -147,3 +152,133 @@ func (lr *LoggingResource) ValidateAWSTrustPolicy(ctx context.Context, awsExtern
147152
}
148153
return lr.do(req, nil)
149154
}
155+
156+
// NetworkFlowLog represents a network flow log entry from the Tailscale API.
157+
type NetworkFlowLog struct {
158+
Logged time.Time `json:"logged"` // the time at which this log was captured by the server
159+
NodeID string `json:"nodeId"` // the node ID for which the flow statistics apply
160+
Start time.Time `json:"start"` // the start of the sample period (node's local clock)
161+
End time.Time `json:"end"` // the end of the sample period (node's local clock)
162+
VirtualTraffic []TrafficStats `json:"virtualTraffic,omitempty"` // traffic between Tailscale nodes
163+
SubnetTraffic []TrafficStats `json:"subnetTraffic,omitempty"` // traffic involving subnet routes
164+
ExitTraffic []TrafficStats `json:"exitTraffic,omitempty"` // traffic via exit nodes
165+
PhysicalTraffic []TrafficStats `json:"physicalTraffic,omitempty"` // WireGuard transport-level statistics
166+
}
167+
168+
// TrafficStats represents traffic flow statistics.
169+
// This type is used for all traffic types: virtual, subnet, exit, and physical.
170+
type TrafficStats struct {
171+
Proto int `json:"proto,omitempty"` // IP protocol number (e.g., 6 for TCP, 17 for UDP)
172+
Src string `json:"src,omitempty"` // Source address and port
173+
Dst string `json:"dst,omitempty"` // Destination address and port
174+
TxPkts uint64 `json:"txPkts,omitempty"` // Transmitted packets
175+
TxBytes uint64 `json:"txBytes,omitempty"` // Transmitted bytes
176+
RxPkts uint64 `json:"rxPkts,omitempty"` // Received packets
177+
RxBytes uint64 `json:"rxBytes,omitempty"` // Received bytes
178+
}
179+
180+
// NetworkFlowLogsRequest represents query parameters for fetching network flow logs.
181+
type NetworkFlowLogsRequest struct {
182+
// Start must be set to a non-zero time within the log retention period (last 30 days).
183+
// The server may adjust times that are too old.
184+
Start time.Time
185+
// End must be set to a non-zero time after Start.
186+
End time.Time
187+
}
188+
189+
// NetworkFlowLogHandler is a callback function for processing individual network flow log entries.
190+
// It receives each log entry as it's parsed from the JSON stream.
191+
// Return an error to stop processing and bubble up the error.
192+
type NetworkFlowLogHandler func(log NetworkFlowLog) error
193+
194+
// GetNetworkFlowLogs streams network flow logs for the tailnet, calling the provided
195+
// handler function for each log entry as it's parsed from the JSON response.
196+
// This approach is memory-efficient and handles large datasets without loading all logs into memory.
197+
//
198+
// Both start and end parameters are required by the server.
199+
// Times older than 30 days will be automatically adjusted by the server to the retention limit.
200+
func (lr *LoggingResource) GetNetworkFlowLogs(ctx context.Context, params NetworkFlowLogsRequest, handler NetworkFlowLogHandler) error {
201+
202+
u := lr.buildTailnetURL("logging", "network")
203+
u.RawQuery = url.Values{
204+
"start": {params.Start.Format(time.RFC3339)},
205+
"end": {params.End.Format(time.RFC3339)},
206+
}.Encode()
207+
208+
req, err := lr.buildRequest(ctx, http.MethodGet, u)
209+
if err != nil {
210+
return err
211+
}
212+
213+
return lr.streamNetworkFlowLogs(req, handler)
214+
}
215+
216+
// checkDelim reads and verifies the next JSON delimiter from the decoder
217+
func checkDelim(dec *json.Decoder, want json.Delim, description string) error {
218+
token, err := dec.Token()
219+
if err != nil {
220+
return fmt.Errorf("failed to read %s: %w", description, err)
221+
}
222+
if delim, ok := token.(json.Delim); !ok || delim != want {
223+
return fmt.Errorf("expected %c for %s, got %v", want, description, token)
224+
}
225+
return nil
226+
}
227+
228+
// streamNetworkFlowLogs performs the streaming JSON parsing of network flow logs
229+
func (lr *LoggingResource) streamNetworkFlowLogs(req *http.Request, handler NetworkFlowLogHandler) error {
230+
lr.init()
231+
resp, err := lr.HTTP.Do(req)
232+
if err != nil {
233+
return err
234+
}
235+
defer resp.Body.Close()
236+
237+
if resp.StatusCode != http.StatusOK {
238+
body, _ := io.ReadAll(resp.Body)
239+
return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
240+
}
241+
242+
decoder := json.NewDecoder(resp.Body)
243+
244+
if err := checkDelim(decoder, '{', "opening brace"); err != nil {
245+
return err
246+
}
247+
248+
token, err := decoder.Token()
249+
if err != nil {
250+
return fmt.Errorf("failed to read field name: %w", err)
251+
}
252+
if fieldName, ok := token.(string); !ok || fieldName != "logs" {
253+
return fmt.Errorf("expected 'logs' field, got %v", token)
254+
}
255+
256+
if err := checkDelim(decoder, '[', "logs array start"); err != nil {
257+
return err
258+
}
259+
260+
for decoder.More() {
261+
if err := req.Context().Err(); err != nil {
262+
return err
263+
}
264+
265+
var log NetworkFlowLog
266+
if err := decoder.Decode(&log); err != nil {
267+
return fmt.Errorf("failed to decode log entry: %w", err)
268+
}
269+
270+
if err := handler(log); err != nil {
271+
return fmt.Errorf("handler error: %w", err)
272+
}
273+
}
274+
275+
if err := checkDelim(decoder, ']', "logs array end"); err != nil {
276+
return err
277+
}
278+
279+
if err := checkDelim(decoder, '}', "closing brace"); err != nil {
280+
return err
281+
}
282+
283+
return nil
284+
}

logging_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ package tailscale
66
import (
77
"context"
88
"encoding/json"
9+
"fmt"
910
"net/http"
1011
"testing"
12+
"time"
1113

1214
"github.com/stretchr/testify/assert"
1315
)
@@ -129,3 +131,80 @@ func TestClient_ValidateAWSTrustPolicy(t *testing.T) {
129131
assert.NoError(t, err)
130132
assert.EqualValues(t, gotRequest, map[string]string{"roleArn": roleARN})
131133
}
134+
135+
func TestClient_GetNetworkFlowLogs(t *testing.T) {
136+
t.Parallel()
137+
138+
client, server := NewTestHarness(t)
139+
server.ResponseCode = http.StatusOK
140+
141+
now := time.Now().UTC().Truncate(time.Second)
142+
expectedLogs := []NetworkFlowLog{
143+
{
144+
Logged: now,
145+
NodeID: "node1",
146+
Start: now.Add(-5 * time.Minute),
147+
End: now,
148+
VirtualTraffic: []TrafficStats{
149+
{Proto: 6, Src: "10.0.0.1:80", Dst: "10.0.0.2:1234", TxPkts: 10, TxBytes: 1000},
150+
},
151+
},
152+
{
153+
Logged: now.Add(1 * time.Second),
154+
NodeID: "node2",
155+
Start: now.Add(-4 * time.Minute),
156+
End: now.Add(1 * time.Second),
157+
PhysicalTraffic: []TrafficStats{
158+
{Proto: 17, Src: "192.168.1.1:53", Dst: "8.8.8.8:53", RxPkts: 5, RxBytes: 500},
159+
},
160+
},
161+
}
162+
163+
server.ResponseBody = map[string]any{"logs": expectedLogs}
164+
165+
params := NetworkFlowLogsRequest{
166+
Start: now.Add(-1 * time.Hour),
167+
End: now,
168+
}
169+
170+
var actualLogs []NetworkFlowLog
171+
handler := func(log NetworkFlowLog) error {
172+
actualLogs = append(actualLogs, log)
173+
return nil
174+
}
175+
176+
err := client.Logging().GetNetworkFlowLogs(context.Background(), params, handler)
177+
assert.NoError(t, err)
178+
assert.Equal(t, http.MethodGet, server.Method)
179+
assert.Equal(t, "/api/v2/tailnet/example.com/logging/network", server.Path)
180+
181+
assert.Len(t, actualLogs, 2)
182+
assert.Equal(t, expectedLogs, actualLogs)
183+
}
184+
185+
func TestClient_GetNetworkFlowLogs_HandlerError(t *testing.T) {
186+
t.Parallel()
187+
188+
client, server := NewTestHarness(t)
189+
server.ResponseCode = http.StatusOK
190+
191+
now := time.Now().UTC()
192+
server.ResponseBody = map[string]any{
193+
"logs": []NetworkFlowLog{{
194+
Logged: now, NodeID: "test-node", Start: now.Add(-5 * time.Minute), End: now,
195+
}},
196+
}
197+
198+
params := NetworkFlowLogsRequest{Start: now.Add(-1 * time.Hour), End: now}
199+
200+
handler := func(log NetworkFlowLog) error {
201+
return fmt.Errorf("test handler error")
202+
}
203+
204+
err := client.Logging().GetNetworkFlowLogs(context.Background(), params, handler)
205+
assert.Error(t, err)
206+
assert.Contains(t, err.Error(), "handler error")
207+
assert.Contains(t, err.Error(), "test handler error")
208+
}
209+
210+

0 commit comments

Comments
 (0)