Skip to content

Commit

Permalink
refactor(auth): send caddy access logs directly to metric-drain
Browse files Browse the repository at this point in the history
This change removes the hop from `container-drain` to `metric-drain`.
Instead, access logs are now processed directly inside the `metric-drain`
subscriber (`metricDrainSub`).
  • Loading branch information
neurosnap committed Dec 17, 2024
1 parent 931fca5 commit 9b9e1db
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 86 deletions.
114 changes: 28 additions & 86 deletions auth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/picosh/pico/db/postgres"
"github.com/picosh/pico/shared"
"github.com/picosh/utils"
"github.com/picosh/utils/pipe"
"github.com/picosh/utils/pipe/metrics"
)

Expand Down Expand Up @@ -583,35 +582,32 @@ func checkoutHandler() http.HandlerFunc {
}
}

type AccessLogReq struct {
RemoteIP string `json:"remote_ip"`
RemotePort string `json:"remote_port"`
ClientIP string `json:"client_ip"`
Method string `json:"method"`
Host string `json:"host"`
Uri string `json:"uri"`
Headers struct {
UserAgent []string `json:"User-Agent"`
Referer []string `json:"Referer"`
} `json:"headers"`
Tls struct {
ServerName string `json:"server_name"`
} `json:"tls"`
// AccessLog is the subset of one JSON access-log entry that is decoded
// for analytics. NOTE(review): presumably this mirrors Caddy's
// `http.log.access` entry format — confirm against the Caddy version in use.
type AccessLog struct {
// Status is decoded from the "status" key.
Status int `json:"status"`
// ServerID is decoded from the "server_id" key. deserializeCaddyAccessLog
// splits it on the first "." and uses the leading part as the space.
ServerID string `json:"server_id"`
// Request is decoded from the nested "request" object.
Request AccessLogReq `json:"request"`
// RespHeaders is decoded from the nested "resp_headers" object.
RespHeaders AccessRespHeaders `json:"resp_headers"`
}

type RespHeaders struct {
ContentType []string `json:"Content-Type"`
// AccessLogReqHeaders holds the request headers that are decoded from an
// access-log entry. Each header is a list of strings because the JSON
// value for a header key is an array.
type AccessLogReqHeaders struct {
// UserAgent is decoded from the "User-Agent" key.
UserAgent []string `json:"User-Agent"`
// Referer is decoded from the "Referer" key.
Referer []string `json:"Referer"`
}

// AccessLogReq is the "request" object of an access-log entry, reduced to
// the fields that are decoded here.
type AccessLogReq struct {
// ClientIP is decoded from the "client_ip" key.
ClientIP string `json:"client_ip"`
// Method is decoded from the "method" key.
Method string `json:"method"`
// Host is decoded from the "host" key; deserializeCaddyAccessLog reads
// it as the visit's host.
Host string `json:"host"`
// Uri is decoded from the "uri" key; deserializeCaddyAccessLog reads it
// as the visit's path.
Uri string `json:"uri"`
// Headers is decoded from the nested "headers" object.
Headers AccessLogReqHeaders `json:"headers"`
}

type CaddyAccessLog struct {
Request AccessLogReq `json:"request"`
Status int `json:"status"`
RespHeaders RespHeaders `json:"resp_headers"`
ServiceID string `json:"server_id"`
// AccessRespHeaders holds the response headers that are decoded from an
// access-log entry.
type AccessRespHeaders struct {
// ContentType is decoded from the "Content-Type" key; the JSON value is
// an array of strings.
ContentType []string `json:"Content-Type"`
}

func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
spaceRaw := strings.SplitN(access.ServiceID, ".", 2)
func deserializeCaddyAccessLog(dbpool db.DB, access *AccessLog) (*db.AnalyticsVisits, error) {
spaceRaw := strings.SplitN(access.ServerID, ".", 2)
space := spaceRaw[0]
host := access.Request.Host
path := access.Request.Uri
Expand Down Expand Up @@ -676,62 +672,8 @@ func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.Analyt
}, nil
}

// containerDrainSub reads lines from the `container-drain` pipe, keeps only
// those containing "http.log.access", converts each one to an analytics
// visit with accessLogToVisit, and republishes the visit as a single
// newline-terminated JSON document on `metric-drain`.
//
// This feels redundant: we take container-drain, filter it, and send it to
// metric-drain, where metricDrainSub listens and saves it. So why not just
// call the necessary functions to save the visit directly? Because we want
// to be able to use pipe as a debugging tool, which means we can manually
// sub to `metric-drain` and have a nice clean output to view.
//
// Lines that cannot be converted or marshalled are logged and skipped. The
// function loops until ctx is cancelled, so callers should run it in its own
// goroutine.
func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
	info := shared.NewPicoPipeClient()
	drain := pipe.NewReconnectReadWriteCloser(
		ctx,
		logger,
		info,
		"container drain",
		"sub container-drain -k",
		100,
		-1,
	)

	send := pipe.NewReconnectReadWriteCloser(
		ctx,
		logger,
		info,
		"from container drain to metric drain",
		"pub metric-drain -b=false",
		100,
		-1,
	)

	for {
		// Without this check a cancelled context would leave the loop
		// re-creating scanners forever once the reader stops yielding data.
		if ctx.Err() != nil {
			return
		}

		scanner := bufio.NewScanner(drain)
		for scanner.Scan() {
			line := scanner.Text()
			if !strings.Contains(line, "http.log.access") {
				continue
			}

			clean := strings.TrimSpace(line)
			visit, err := accessLogToVisit(dbpool, clean)
			if err != nil {
				logger.Debug("could not convert access log to a visit", "err", err)
				continue
			}

			jso, err := json.Marshal(visit)
			if err != nil {
				logger.Error("could not marshal json of a visit", "err", err)
				continue
			}

			// One JSON document per line so subscribers can scan line by line.
			jso = append(jso, '\n')
			if _, err := send.Write(jso); err != nil {
				logger.Error("could not write to metric-drain", "err", err)
			}
		}

		// Scan returning false may hide a read error or an over-long line;
		// surface it instead of silently starting a new scanner.
		if err := scanner.Err(); err != nil {
			logger.Error("container drain scanner stopped", "err", err)
		}
	}
}

func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
accessLog := CaddyAccessLog{}
accessLog := AccessLog{}
err := json.Unmarshal([]byte(line), &accessLog)
if err != nil {
return nil, err
Expand All @@ -753,14 +695,16 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
scanner := bufio.NewScanner(drain)
for scanner.Scan() {
line := scanner.Text()
visit := db.AnalyticsVisits{}
err := json.Unmarshal([]byte(line), &visit)
clean := strings.TrimSpace(line)

visit, err := accessLogToVisit(dbpool, clean)
if err != nil {
logger.Info("could not unmarshal json", "err", err, "line", line)
logger.Error("could not convert access log to a visit", "err", err)
continue
}

logger.Info("received visit", "visit", visit)
err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
if err != nil {
logger.Info("could not record analytics visit", "err", err)
continue
Expand All @@ -772,7 +716,7 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
}

logger.Info("inserting visit", "visit", visit)
err = dbpool.InsertVisit(&visit)
err = dbpool.InsertVisit(visit)
if err != nil {
logger.Error("could not insert visit record", "err", err)
}
Expand Down Expand Up @@ -846,8 +790,6 @@ func StartApiServer() {

ctx := context.Background()

// convert container logs to access logs
go containerDrainSub(ctx, db, logger)
// gather metrics in the auth service
go metricDrainSub(ctx, db, logger, cfg.Secret)

Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ services:
- all
pipemgr:
image: ghcr.io/picosh/pipemgr:latest
command: /pipemgr -command "pub metric-drain -b=false"
restart: always
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
Expand Down

0 comments on commit 9b9e1db

Please sign in to comment.