Skip to content

Commit f370408

Browse files
committed
feat: implement command retrieval with pagination and cursor support
1 parent 5af0179 commit f370408

File tree

10 files changed

+217
-23
lines changed

10 files changed

+217
-23
lines changed

internal/db/command_repository.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"time"
89

10+
"github.com/google/uuid"
911
"github.com/jackc/pgx/v5/pgconn"
1012
"github.com/jackc/pgx/v5/pgxpool"
13+
"github.com/raphico/go-device-telemetry-api/internal/common/pagination"
1114
"github.com/raphico/go-device-telemetry-api/internal/domain/command"
1215
"github.com/raphico/go-device-telemetry-api/internal/domain/device"
1316
)
@@ -57,3 +60,84 @@ func (r *CommandRepository) Create(ctx context.Context, c *command.Command) erro
5760

5861
return nil
5962
}
63+
64+
func (r *CommandRepository) FindCommands(
65+
ctx context.Context,
66+
deviceID device.DeviceID,
67+
limit int,
68+
cursor *pagination.Cursor,
69+
) ([]*command.Command, *pagination.Cursor, error) {
70+
var query string
71+
var args []any
72+
73+
if cursor == nil {
74+
query = `
75+
SELECT id, device_id, command_name, payload, status, executed_at, created_at
76+
FROM commands
77+
WHERE device_id = $1
78+
ORDER BY created_at ASC, id ASC
79+
LIMIT $2
80+
`
81+
args = []any{deviceID, limit + 1}
82+
} else {
83+
query = `
84+
SELECT id, device_id, command_name, payload, status, executed_at, created_at
85+
FROM commands
86+
WHERE device_id = $1
87+
AND (created_at, id) > ($2, $3)
88+
ORDER BY created_at ASC, id ASC
89+
LIMIT $4
90+
`
91+
args = []any{deviceID, cursor.CreatedAt, cursor.ID, limit + 1}
92+
}
93+
94+
rows, err := r.db.Query(ctx, query, args...)
95+
if err != nil {
96+
return nil, nil, fmt.Errorf("failed to query commands: %w", err)
97+
}
98+
99+
var result []*command.Command
100+
for rows.Next() {
101+
var (
102+
id uuid.UUID
103+
deviceID uuid.UUID
104+
commandName string
105+
payload []byte
106+
status string
107+
executedAt *time.Time
108+
createdAt time.Time
109+
)
110+
111+
if err := rows.Scan(&id, &deviceID, &commandName, &payload, &status, &executedAt, &createdAt); err != nil {
112+
return nil, nil, fmt.Errorf("failed to scan commands: %w", err)
113+
}
114+
115+
t, err := command.RehydrateCommand(
116+
id,
117+
deviceID,
118+
commandName,
119+
payload,
120+
status,
121+
executedAt,
122+
createdAt,
123+
)
124+
if err != nil {
125+
return nil, nil, fmt.Errorf("failed to rehydrate command: %w", err)
126+
}
127+
128+
result = append(result, t)
129+
}
130+
131+
if err := rows.Err(); err != nil {
132+
return nil, nil, fmt.Errorf("rows error: %w", err)
133+
}
134+
135+
var nextCur *pagination.Cursor
136+
if len(result) > limit {
137+
lastVisible := result[limit-1]
138+
result = result[:limit]
139+
nextCur = pagination.NewCursor(uuid.UUID(lastVisible.ID), lastVisible.CreatedAt)
140+
}
141+
142+
return result, nextCur, nil
143+
}

internal/domain/command/command.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package command
22

33
import (
4+
"encoding/json"
5+
"fmt"
46
"time"
57

8+
"github.com/google/uuid"
69
"github.com/raphico/go-device-telemetry-api/internal/domain/device"
710
)
811

@@ -36,3 +39,38 @@ func (c *Command) UpdateStatus(value string) error {
3639
c.Status = status
3740
return nil
3841
}
42+
43+
func RehydrateCommand(
44+
id uuid.UUID,
45+
deviceID uuid.UUID,
46+
name string,
47+
payloadBytes []byte,
48+
status string,
49+
executedAt *time.Time,
50+
createdAt time.Time,
51+
) (*Command, error) {
52+
n, err := NewName(name)
53+
if err != nil {
54+
return nil, fmt.Errorf("corrupt command name: %w", err)
55+
}
56+
57+
var payload Payload
58+
if err := json.Unmarshal(payloadBytes, &payload); err != nil {
59+
return nil, fmt.Errorf("corrupt payload: %w", err)
60+
}
61+
62+
s, err := NewStatus(status)
63+
if err != nil {
64+
return nil, fmt.Errorf("corrupt status: %w", err)
65+
}
66+
67+
return &Command{
68+
ID: CommandID(id),
69+
DeviceID: device.DeviceID(deviceID),
70+
Name: n,
71+
Payload: payload,
72+
Status: s,
73+
ExecutedAt: executedAt,
74+
CreatedAt: createdAt,
75+
}, nil
76+
}
Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,18 @@
11
package command
22

3-
import "context"
3+
import (
4+
"context"
5+
6+
"github.com/raphico/go-device-telemetry-api/internal/common/pagination"
7+
"github.com/raphico/go-device-telemetry-api/internal/domain/device"
8+
)
49

510
type Repository interface {
611
Create(ctx context.Context, c *Command) error
12+
FindCommands(
13+
ctx context.Context,
14+
deviceID device.DeviceID,
15+
limit int,
16+
cursor *pagination.Cursor,
17+
) ([]*Command, *pagination.Cursor, error)
718
}

internal/domain/command/service.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package command
33
import (
44
"context"
55

6+
"github.com/raphico/go-device-telemetry-api/internal/common/pagination"
67
"github.com/raphico/go-device-telemetry-api/internal/domain/device"
78
)
89

@@ -29,3 +30,12 @@ func (s *Service) CreateCommand(
2930

3031
return command, nil
3132
}
33+
34+
func (s *Service) ListDeviceCommands(
35+
ctx context.Context,
36+
deviceID device.DeviceID,
37+
limit int,
38+
cursor *pagination.Cursor,
39+
) ([]*Command, *pagination.Cursor, error) {
40+
return s.repo.FindCommands(ctx, deviceID, limit, cursor)
41+
}

internal/domain/telemetry/telemetry.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,9 @@ func RehydrateTelemetry(
4545
return nil, fmt.Errorf("corrupt telemetry type: %w", err)
4646
}
4747

48-
var payload map[string]any
49-
if payloadBytes == nil {
50-
payload = make(map[string]any)
51-
} else {
52-
err := json.Unmarshal(payloadBytes, &payload)
53-
if err != nil {
54-
return nil, fmt.Errorf("corrupt payload: %w", err)
55-
}
48+
var payload Payload
49+
if err := json.Unmarshal(payloadBytes, &payload); err != nil {
50+
return nil, fmt.Errorf("corrupt payload: %w", err)
5651
}
5752

5853
r, err := RecordedAtFromTime(recordedAt)

internal/http/command_handler.go

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"errors"
66
"fmt"
77
"net/http"
8+
"strconv"
89

910
"github.com/go-chi/chi/v5"
11+
"github.com/raphico/go-device-telemetry-api/internal/common/pagination"
1012
"github.com/raphico/go-device-telemetry-api/internal/domain/command"
1113
"github.com/raphico/go-device-telemetry-api/internal/domain/device"
1214
"github.com/raphico/go-device-telemetry-api/internal/logger"
@@ -29,7 +31,7 @@ type createCommandRequest struct {
2931
Payload any `json:"payload"`
3032
}
3133

32-
type createCommandResponse struct {
34+
type commandResponse struct {
3335
ID string `json:"id"`
3436
CommandName string `json:"command_name"`
3537
Payload any `json:"payload"`
@@ -74,7 +76,7 @@ func (h *CommandHandler) HandleCreateCommand(w http.ResponseWriter, r *http.Requ
7476
return
7577
}
7678

77-
res := createCommandResponse{
79+
res := commandResponse{
7880
ID: c.ID.String(),
7981
CommandName: c.Name.String(),
8082
Payload: c.Payload,
@@ -83,3 +85,61 @@ func (h *CommandHandler) HandleCreateCommand(w http.ResponseWriter, r *http.Requ
8385

8486
WriteJSON(w, http.StatusCreated, res, nil)
8587
}
88+
89+
func (h *CommandHandler) HandleGetDeviceCommands(w http.ResponseWriter, r *http.Request) {
90+
deviceIDStr := chi.URLParam(r, "device_id")
91+
deviceID, err := device.NewDeviceID(deviceIDStr)
92+
if err != nil {
93+
WriteJSONError(w, http.StatusBadRequest, invalidRequest, "invalid device id")
94+
return
95+
}
96+
97+
limit := pagination.DefaultLimit
98+
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
99+
if v, err := strconv.Atoi(limitStr); err != nil || v < 0 {
100+
WriteJSONError(w, http.StatusBadRequest, invalidRequest, "limit must be a positive integer")
101+
return
102+
} else {
103+
limit = pagination.ClampLimit(v)
104+
}
105+
}
106+
107+
var cur *pagination.Cursor
108+
if cstr := r.URL.Query().Get("cursor"); cstr != "" {
109+
if decoded, err := pagination.Decode(cstr); err != nil {
110+
WriteJSONError(w, http.StatusBadRequest, invalidRequest, err.Error())
111+
return
112+
} else {
113+
cur = &decoded
114+
}
115+
}
116+
117+
commands, next, err := h.command.ListDeviceCommands(r.Context(), deviceID, limit, cur)
118+
if err != nil {
119+
h.log.Error(fmt.Sprintf("failed to get device commands: %v", err))
120+
WriteInternalError(w)
121+
return
122+
}
123+
124+
out := make([]commandResponse, 0, len(commands))
125+
for _, c := range commands {
126+
out = append(out, commandResponse{
127+
ID: c.ID.String(),
128+
CommandName: c.Name.String(),
129+
Payload: c.Payload,
130+
Status: c.Status.String(),
131+
})
132+
}
133+
134+
var nextStr string
135+
if next != nil {
136+
nextStr = pagination.Encode(*next)
137+
}
138+
139+
meta := pageMeta{
140+
NextCursor: nextStr,
141+
Limit: limit,
142+
}
143+
144+
WriteJSON(w, http.StatusOK, out, meta)
145+
}

internal/http/device_handler.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,6 @@ func (h *DeviceHandler) HandleGetDevice(w http.ResponseWriter, r *http.Request)
141141
WriteJSON(w, http.StatusOK, res, nil)
142142
}
143143

144-
type listDevicesMeta struct {
145-
NextCursor string `json:"next_cursor,omitempty"`
146-
Limit int `json:"limit"`
147-
}
148-
149144
func (h *DeviceHandler) HandleListDevices(w http.ResponseWriter, r *http.Request) {
150145
userId, ok := GetUserID(r.Context())
151146
if !ok {
@@ -197,7 +192,7 @@ func (h *DeviceHandler) HandleListDevices(w http.ResponseWriter, r *http.Request
197192
nextStr = s
198193
}
199194

200-
meta := listDevicesMeta{
195+
meta := pageMeta{
201196
NextCursor: nextStr,
202197
Limit: limit,
203198
}

internal/http/response.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ type responseEnvelope struct {
1717
Error *errorPayload `json:"error,omitempty"`
1818
}
1919

20+
type pageMeta struct {
21+
NextCursor string `json:"next_cursor,omitempty"`
22+
Limit int `json:"limit"`
23+
}
24+
2025
func WriteJSONError(w http.ResponseWriter, status int, code errorCode, msg string) {
2126
w.Header().Set("Content-Type", "application/json")
2227
w.WriteHeader(status)

internal/http/router.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func NewRouter(
5151

5252
r.Route("/{device_id}/commands", func(r chi.Router) {
5353
r.Post("/", commandHandler.HandleCreateCommand)
54+
r.Get("/", commandHandler.HandleGetDeviceCommands)
5455
})
5556
})
5657
})

internal/http/telemetry_handler.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,6 @@ func (h *TelemetryHandler) HandleCreateTelemetry(w http.ResponseWriter, r *http.
9494
WriteJSON(w, http.StatusCreated, res, nil)
9595
}
9696

97-
type getDeviceTelemetryMeta struct {
98-
NextCursor string `json:"next_cursor,omitempty"`
99-
Limit int `json:"limit"`
100-
}
101-
10297
func (h *TelemetryHandler) HandleGetDeviceTelemetry(w http.ResponseWriter, r *http.Request) {
10398
deviceIDStr := chi.URLParam(r, "device_id")
10499
deviceID, err := device.NewDeviceID(deviceIDStr)
@@ -149,7 +144,7 @@ func (h *TelemetryHandler) HandleGetDeviceTelemetry(w http.ResponseWriter, r *ht
149144
nextStr = pagination.Encode(*next)
150145
}
151146

152-
meta := getDeviceTelemetryMeta{
147+
meta := pageMeta{
153148
NextCursor: nextStr,
154149
Limit: limit,
155150
}

0 commit comments

Comments
 (0)