Skip to content

Commit 3d8227b

Browse files
committed
Now using zapx.HumanDuration for all our Duration logging needs
This prints durations as human-readable strings with a unit suffix, making them easier for users to read.
1 parent 557a32c commit 3d8227b

15 files changed

Lines changed: 56 additions & 37 deletions

File tree

cmd/substreams-sink-sql/inject_csv.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ import (
1919
. "github.com/streamingfast/cli"
2020
"github.com/streamingfast/cli/sflags"
2121
"github.com/streamingfast/dstore"
22+
"github.com/streamingfast/logging/zapx"
2223
db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db"
2324
"go.uber.org/zap"
2425
)
@@ -79,7 +80,7 @@ func injectCSVE(cmd *cobra.Command, args []string) error {
7980
return fmt.Errorf("table filler %q: %w", tableName, err)
8081
}
8182

82-
zlog.Info("table done", zap.Duration("total", time.Since(t0)))
83+
zlog.Info("table done", zapx.HumanDuration("total", time.Since(t0)))
8384
return nil
8485
}
8586

@@ -232,7 +233,7 @@ func (t *TableFiller) injectCSVFromReader(ctx context.Context, fl io.Reader, sou
232233
zap.String("source", source),
233234
zap.String("table_name", t.tblName),
234235
zap.Int64("rows_affected", count),
235-
zap.Duration("elapsed", elapsed),
236+
zapx.HumanDuration("elapsed", elapsed),
236237
)
237238

238239
return nil

cmd/substreams-sink-sql/main.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@ import (
1212
. "github.com/streamingfast/cli"
1313
"github.com/streamingfast/cli/sflags"
1414
"github.com/streamingfast/dmetrics"
15+
"github.com/streamingfast/logging/zapx"
1516
"go.uber.org/zap"
1617
)
1718

@@ -50,7 +51,7 @@ func preStart(cmd *cobra.Command, _ []string) {
5051

5152
delay := sflags.MustGetDuration(cmd, "delay-before-start")
5253
if delay > 0 {
53-
zlog.Info("sleeping to respect delay before start setting", zap.Duration("delay", delay))
54+
zlog.Info("sleeping to respect delay before start setting", zapx.HumanDuration("delay", delay))
5455
time.Sleep(delay)
5556
}
5657

db_changes/bundler/stats.go

Lines changed: 10 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/streamingfast/bstream"
77
"github.com/streamingfast/dmetrics"
8+
"github.com/streamingfast/logging/zapx"
89
"go.uber.org/zap"
910
)
1011

@@ -61,14 +62,14 @@ func (s *boundaryStats) Log() []zap.Field {
6162
return []zap.Field{
6263
zap.Uint64("file_count", s.totalBoundaryCount),
6364
zap.Stringer("boundary", s.boundary),
64-
zap.Duration("boundary_process_duration", s.boundaryProcessTime),
65-
zap.Duration("upload_duration", s.uploadedDuration),
66-
zap.Duration("data_process_duration", s.procesingDataTime),
67-
zap.Duration("avg_upload_duration", s.avgUploadDuration.Average()),
68-
zap.Duration("total_upload_duration", s.avgUploadDuration.Total()),
69-
zap.Duration("avg_boundary_process_duration", s.avgBoundaryProcessDuration.Average()),
70-
zap.Duration("total_boundary_process_duration", s.avgBoundaryProcessDuration.Total()),
71-
zap.Duration("avg_data_process_duration", s.avgDataProcessDuration.Average()),
72-
zap.Duration("total_data_process_duration", s.avgDataProcessDuration.Total()),
65+
zapx.HumanDuration("boundary_process_duration", s.boundaryProcessTime),
66+
zapx.HumanDuration("upload_duration", s.uploadedDuration),
67+
zapx.HumanDuration("data_process_duration", s.procesingDataTime),
68+
zapx.HumanDuration("avg_upload_duration", s.avgUploadDuration.Average()),
69+
zapx.HumanDuration("total_upload_duration", s.avgUploadDuration.Total()),
70+
zapx.HumanDuration("avg_boundary_process_duration", s.avgBoundaryProcessDuration.Average()),
71+
zapx.HumanDuration("total_boundary_process_duration", s.avgBoundaryProcessDuration.Total()),
72+
zapx.HumanDuration("avg_data_process_duration", s.avgDataProcessDuration.Average()),
73+
zapx.HumanDuration("total_data_process_duration", s.avgDataProcessDuration.Total()),
7374
}
7475
}

db_changes/db/flush.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/ClickHouse/clickhouse-go/v2"
9+
"github.com/streamingfast/logging/zapx"
910
sink "github.com/streamingfast/substreams-sink"
1011
"go.uber.org/zap"
1112
)
@@ -42,7 +43,7 @@ func (l *Loader) Flush(ctx context.Context, outputModuleHash string, cursor *sin
4243
l.reset()
4344

4445
// We add + 1 to the table count because the `cursors` table is an implicit table
45-
l.logger.Debug("flushed table(s) rows to database", zap.Int("table_count", l.entries.Len()+1), zap.Int("row_count", rowFlushedCount), zap.Duration("took", time.Since(startAt)))
46+
l.logger.Debug("flushed table(s) rows to database", zap.Int("table_count", l.entries.Len()+1), zap.Int("row_count", rowFlushedCount), zapx.HumanDuration("took", time.Since(startAt)))
4647
return rowFlushedCount, nil
4748
}
4849

db_changes/sinker/generate_csv_sinker.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/streamingfast/dstore"
1717
"github.com/streamingfast/logging"
18+
"github.com/streamingfast/logging/zapx"
1819
"github.com/streamingfast/shutter"
1920
sink "github.com/streamingfast/substreams-sink"
2021
pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1"
@@ -154,7 +155,7 @@ func (s *GenerateCSVSinker) Run(ctx context.Context) {
154155
s.stats.Start(logEach, cursor)
155156

156157
s.logger.Info("starting sql generate CSV sink",
157-
zap.Duration("stats_refresh_each", logEach),
158+
zapx.HumanDuration("stats_refresh_each", logEach),
158159
zap.Stringer("restarting_at", cursor.Block()),
159160
zap.String("loader", s.loader.GetIdentifier()),
160161
)

db_changes/sinker/sinker.go

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/streamingfast/logging"
11+
"github.com/streamingfast/logging/zapx"
1112
"github.com/streamingfast/shutter"
1213
sink "github.com/streamingfast/substreams-sink"
1314
pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1"
@@ -109,7 +110,7 @@ func (s *SQLSinker) Run(ctx context.Context) {
109110
s.stats.Start(logEach, cursor)
110111

111112
s.logger.Info("starting sql sink",
112-
zap.Duration("stats_refresh_each", logEach),
113+
zapx.HumanDuration("stats_refresh_each", logEach),
113114
zap.Stringer("restarting_at", cursor.Block()),
114115
zap.String("loader", s.loader.GetIdentifier()),
115116
)
@@ -129,7 +130,7 @@ func (s *SQLSinker) flushWithRetry(ctx context.Context, moduleHash string, curso
129130
s.logger.Warn("retrying flush after error",
130131
zap.Int("attempt", attempt),
131132
zap.Int("max_retries", retries),
132-
zap.Duration("delay", delay),
133+
zapx.HumanDuration("delay", delay),
133134
zap.Error(lastErr))
134135

135136
select {
@@ -194,7 +195,7 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
194195
zap.Bool("block_flush_needed_before_timing_check", blockFlushNeeded))
195196

196197
if blockFlushNeeded && isLive != nil && *isLive && s.stats.AverageFlushDuration() > data.Clock.Timestamp.AsTime().Sub(s.lastAppliedBlockTime) {
197-
s.logger.Debug("skipping a flush because we are LIVE and flush average duration is above time between blocks", zap.Duration("flush_duration_average", s.stats.AverageFlushDuration()), zap.Time("last_block_time", s.lastAppliedBlockTime), zap.Time("block_time", data.Clock.Timestamp.AsTime()))
198+
s.logger.Debug("skipping a flush because we are LIVE and flush average duration is above time between blocks", zapx.HumanDuration("flush_duration_average", s.stats.AverageFlushDuration()), zap.Time("last_block_time", s.lastAppliedBlockTime), zap.Time("block_time", data.Clock.Timestamp.AsTime()))
198199
blockFlushNeeded = false
199200
}
200201

@@ -225,7 +226,7 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
225226
level = zap.WarnLevel
226227
}
227228

228-
s.logger.Check(level, "flush to database took a long time to complete, could cause long sync time along the road").Write(zap.Duration("took", flushDuration))
229+
s.logger.Check(level, "flush to database took a long time to complete, could cause long sync time along the road").Write(zapx.HumanDuration("took", flushDuration))
229230
}
230231

231232
FlushCount.Inc()

db_proto/sinker.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ import (
66
"sync"
77
"time"
88

9+
"github.com/streamingfast/logging/zapx"
910
sink "github.com/streamingfast/substreams-sink"
1011
sql "github.com/streamingfast/substreams-sink-sql/db_proto/sql"
1112
"github.com/streamingfast/substreams-sink-sql/db_proto/stats"
@@ -120,7 +121,7 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp
120121
holding = append(holding, holder)
121122
if data.Clock.Number > (s.lastAppliedBlockNum+s.blockBatchSize) || s.blockBatchSize == 1 || (isLive != nil && *isLive) {
122123
if isLive != nil && *isLive && s.stats.FlushDuration.Average() > data.Clock.Timestamp.AsTime().Sub(s.lastAppliedBlockTime) {
123-
s.logger.Debug("skipping a flush because we are LIVE and flush average duration is above time between blocks", zap.Duration("flush_duration_average", s.stats.FlushDuration.Average()), zap.Time("last_block_time", s.lastAppliedBlockTime), zap.Time("block_time", data.Clock.Timestamp.AsTime()))
124+
s.logger.Debug("skipping a flush because we are LIVE and flush average duration is above time between blocks", zapx.HumanDuration("flush_duration_average", s.stats.FlushDuration.Average()), zap.Time("last_block_time", s.lastAppliedBlockTime), zap.Time("block_time", data.Clock.Timestamp.AsTime()))
124125
return nil
125126
}
126127

db_proto/sql/click_house/accumulator_inserter.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -608,7 +608,7 @@ func (i *AccumulatorInserter) flush(database *Database) error {
608608
}
609609
i.accumulators = accs
610610

611-
i.logger.Debug("flushing done", zap.Duration("duration", time.Since(start)), zap.Int("rows", rowCount))
611+
i.logger.Debug("flushing done", zapx.HumanDuration("duration", time.Since(start)), zap.Int("rows", rowCount))
612612

613613
return nil
614614
}

db_proto/sql/click_house/database.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/ClickHouse/ch-go"
1414
"github.com/streamingfast/logging"
15+
"github.com/streamingfast/logging/zapx"
1516
sink "github.com/streamingfast/substreams-sink"
1617
"github.com/streamingfast/substreams-sink-sql/bytes"
1718
"github.com/streamingfast/substreams-sink-sql/db_changes/db"
@@ -415,7 +416,7 @@ func (d *Database) HandleBlocksUndo(lastValidBlockNum uint64) error {
415416
// return fmt.Errorf("optimizing table: %w", err)
416417
//}
417418

418-
d.logger.Info("undo completed", zap.String("table", "_block_"), zap.Duration("duration", time.Since(start)))
419+
d.logger.Info("undo completed", zap.String("table", "_block_"), zapx.HumanDuration("duration", time.Since(start)))
419420

420421
for _, table := range tables {
421422
d.logger.Info("undoing blocks", zap.String("table", table.Name), zap.Uint64("last_valid_block_num", lastValidBlockNum))
@@ -467,7 +468,7 @@ func (d *Database) HandleBlocksUndo(lastValidBlockNum uint64) error {
467468
// Body: fmt.Sprintf("OPTIMIZE TABLE %s FINAL CLEANUP;", tableFullName),
468469
//})
469470

470-
d.logger.Info("undo completed", zap.String("table", table.Name), zap.Duration("duration", time.Since(start)))
471+
d.logger.Info("undo completed", zap.String("table", table.Name), zapx.HumanDuration("duration", time.Since(start)))
471472
}
472473
err = d.CommitTransaction()
473474
if err != nil {

db_proto/sql/postgres/database.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ import (
77
"hash/fnv"
88
"time"
99

10+
"github.com/streamingfast/logging/zapx"
1011
sink "github.com/streamingfast/substreams-sink"
1112
"github.com/streamingfast/substreams-sink-sql/bytes"
1213
"github.com/streamingfast/substreams-sink-sql/db_changes/db"
@@ -148,7 +149,7 @@ func (d *Database) applyConstraints() error {
148149
return fmt.Errorf("executing fk constraint statement: %w %s", err, constraint.Sql)
149150
}
150151
}
151-
d.logger.Info("applying constraints", zap.Duration("duration", time.Since(startAt)))
152+
d.logger.Info("applying constraints", zapx.HumanDuration("duration", time.Since(startAt)))
152153
return nil
153154
}
154155

0 commit comments

Comments (0)