Skip to content

Commit

Permalink
Use the new API format for pushing logs
Browse files Browse the repository at this point in the history
This change implements the new[1] JSON format to push logs, which is a
bit more concise, and future-proof since it's versioned.

1: https://github.com/grafana/loki/blob/master/docs/api.md#post-lokiapiv1push
  • Loading branch information
lhchavez committed Dec 23, 2019
1 parent 9513b57 commit 696767e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 54 deletions.
2 changes: 1 addition & 1 deletion cmd/omegaup-logslurp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type logslurpConfig struct {
func readLogslurpConfig(configPath string) (*logslurpConfig, error) {
config := logslurpConfig{
Client: clientConfig{
URL: "https://loki.omegaup.com/api/prom/push",
URL: "https://loki.omegaup.com/loki/api/v1/push",
},
OffsetFilePath: "/var/lib/omegaup/logslurp_offsets.json",
}
Expand Down
92 changes: 44 additions & 48 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,14 @@ type StreamConfig struct {
TimestampLayout string `json:"timestamp_layout"`
}

// Time is identital to time.Time, except it implements the json.Marshaler
// interface with a format that's acceptable by Loki.
type Time time.Time

var _ json.Marshaler = (*Time)(nil)

func (t *Time) MarshalJSON() ([]byte, error) {
return []byte(t.Format("\"2006-01-02T15:04:05.000000-07:00\"")), nil
}

func (t Time) Format(layout string) string {
return (time.Time)(t).Format(layout)
}

func (t Time) Before(o Time) bool {
return (time.Time)(t).Before((time.Time)(o))
}

// A PushRequestStreamEntry is a timestamp/log line value pair.
type PushRequestStreamEntry struct {
Timestamp Time `json:"ts"`
Line string `json:"line"`
Timestamp time.Time
Line string
}

var _ fmt.Stringer = (*PushRequestStreamEntry)(nil)
var _ json.Marshaler = (*PushRequestStreamEntry)(nil)

func (e *PushRequestStreamEntry) String() string {
return fmt.Sprintf(
Expand All @@ -52,11 +36,25 @@ func (e *PushRequestStreamEntry) String() string {
)
}

func (e *PushRequestStreamEntry) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(
"[\"%d\", %q]",
e.Timestamp.UnixNano(),
e.Line,
)), nil
}

// A PushRequestStream is a stream of timestamp/log line values in ascending
// timestamp order, plus stream labels in key/value pair format.
type PushRequestStream struct {
Labels string `json:"labels"`
Entries []*PushRequestStreamEntry `json:"entries"`
Stream map[string]string `json:"stream"`
Values []*PushRequestStreamEntry `json:"values"`
}

// PushRequest is a message that can be sent to /loki/api/v1/push.
//
// This is documented at
// https://github.com/grafana/loki/blob/master/docs/api.md#post-lokiapiv1push
type PushRequest struct {
Streams []*PushRequestStream `json:"streams"`
}
Expand Down Expand Up @@ -153,7 +151,7 @@ func (s *LogStream) Read() (*PushRequestStream, error) {
if ts, err := time.Parse(s.config.TimestampLayout, group); err != nil {
return nil, errors.Wrapf(err, "failed to parse timestamp \"%s\"", group)
} else {
entry.Timestamp = Time(ts)
entry.Timestamp = ts
}
} else if label != "" {
labels[label] = group
Expand All @@ -165,45 +163,43 @@ func (s *LogStream) Read() (*PushRequestStream, error) {
s.slop = t.w.String()[groupPairs[len(groupPairs)-1]:]

if time.Time(entry.Timestamp).IsZero() {
entry.Timestamp = Time(time.Now())
}

result := &PushRequestStream{
Entries: []*PushRequestStreamEntry{entry},
}

var serializedLabels []string
for k, v := range labels {
serializedLabels = append(serializedLabels, fmt.Sprintf("%s=%q", k, v))
entry.Timestamp = time.Now()
}
sort.Strings(serializedLabels)

result.Labels = fmt.Sprintf("{%s}", strings.Join(serializedLabels, ","))
return result, nil
return &PushRequestStream{
Values: []*PushRequestStreamEntry{entry},
Stream: labels,
}, nil
}

// NewPushRequest returns a PushRequest given a list of PushRequestStream
// objects. This groups the streams by their labels so that the final payload
// is smaller.
func NewPushRequest(streams []*PushRequestStream) *PushRequest {
mapping := map[string][]*PushRequestStreamEntry{}
mapping := map[string]*PushRequestStream{}

for _, Logstream := range streams {
if _, ok := mapping[Logstream.Labels]; ok {
mapping[Logstream.Labels] = append(mapping[Logstream.Labels], Logstream.Entries...)
for _, logstream := range streams {
var serializedLabels []string
for k, v := range logstream.Stream {
serializedLabels = append(serializedLabels, fmt.Sprintf("%s=%q", k, v))
}
sort.Strings(serializedLabels)

mappingKey := strings.Join(serializedLabels, ",")
if mappedStream, ok := mapping[mappingKey]; ok {
mappedStream.Values = append(mappedStream.Values, logstream.Values...)
} else {
mapping[Logstream.Labels] = append([]*PushRequestStreamEntry{}, Logstream.Entries...)
mapping[mappingKey] = &PushRequestStream{
Stream: logstream.Stream,
Values: logstream.Values,
}
}
}

request := &PushRequest{}
for labels, entries := range mapping {
stream := &PushRequestStream{
Labels: labels,
Entries: entries,
}
sort.Slice(stream.Entries, func(i, j int) bool {
return stream.Entries[i].Timestamp.Before(stream.Entries[j].Timestamp)
for _, stream := range mapping {
sort.Slice(stream.Values, func(i, j int) bool {
return stream.Values[i].Timestamp.Before(stream.Values[j].Timestamp)
})
request.Streams = append(request.Streams, stream)
}
Expand Down
10 changes: 5 additions & 5 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ func TestStream(t *testing.T) {
}

expectedStringEntries := []string{
`{"labels":"{filename=\"log\",lvl=\"ERROR\"}","entries":[{"ts":"2019-05-13T15:13:51.000000+00:00","line":"Message 1"}]}`,
`{"labels":"{filename=\"log\",lvl=\"ERROR\"}","entries":[{"ts":"2019-05-13T15:13:52.000000+00:00","line":"Message 2"}]}`,
`{"labels":"{filename=\"log\",lvl=\"ERROR\"}","entries":[{"ts":"2019-05-13T15:13:53.000000+00:00","line":"Message 3\nin two lines"}]}`,
`{"labels":"{filename=\"log\",lvl=\"ERROR\"}","entries":[{"ts":"2019-05-13T15:13:54.000000+00:00","line":"Message 4\nin three\nlines"}]}`,
`{"stream":{"filename":"log","lvl":"ERROR"},"values":[["1557760431000000000","Message 1"]]}`,
`{"stream":{"filename":"log","lvl":"ERROR"},"values":[["1557760432000000000","Message 2"]]}`,
`{"stream":{"filename":"log","lvl":"ERROR"},"values":[["1557760433000000000","Message 3\nin two lines"]]}`,
`{"stream":{"filename":"log","lvl":"ERROR"},"values":[["1557760434000000000","Message 4\nin three\nlines"]]}`,
}

if !reflect.DeepEqual(stringEntries, expectedStringEntries) {
Expand All @@ -63,7 +63,7 @@ func TestStream(t *testing.T) {
if len(request.Streams) != 1 {
t.Errorf("failed to coalesce stream. got %v, expected 1", request)
}
expectedRequest := `{"streams":[{"labels":"{filename=\"log\",lvl=\"ERROR\"}","entries":[{"ts":"2019-05-13T15:13:51.000000+00:00","line":"Message 1"},{"ts":"2019-05-13T15:13:52.000000+00:00","line":"Message 2"},{"ts":"2019-05-13T15:13:53.000000+00:00","line":"Message 3\nin two lines"},{"ts":"2019-05-13T15:13:54.000000+00:00","line":"Message 4\nin three\nlines"}]}]}`
expectedRequest := `{"streams":[{"stream":{"filename":"log","lvl":"ERROR"},"values":[["1557760431000000000","Message 1"],["1557760432000000000","Message 2"],["1557760433000000000","Message 3\nin two lines"],["1557760434000000000","Message 4\nin three\nlines"]]}]}`
if marshaledRequest, err := json.Marshal(request); err != nil {
t.Fatalf("Failed to marshal: %v", err)
} else if string(marshaledRequest) != expectedRequest {
Expand Down

0 comments on commit 696767e

Please sign in to comment.