From 1b545d033945e4c85e943c11371d00ab10e57b03 Mon Sep 17 00:00:00 2001 From: Martin Hutchinson Date: Wed, 15 Jan 2025 14:57:21 +0000 Subject: [PATCH] [Hammer] Refactor to allow code reuse with CT (#448) This isn't the full job, as the core of the hammer needs extracting from hammer.go. This is a good start and sets the right direction for making this a general purpose library for use in true tlog-tiles and the static CT variation. --- internal/hammer/hammer.go | 46 +++++++++------- internal/hammer/hammer_test.go | 10 ++-- internal/hammer/{ => loadtest}/client.go | 57 +++++++++++++------- internal/hammer/{ => loadtest}/workerpool.go | 26 ++++----- internal/hammer/{ => loadtest}/workers.go | 28 ++++++---- 5 files changed, 103 insertions(+), 64 deletions(-) rename internal/hammer/{ => loadtest}/client.go (78%) rename internal/hammer/{ => loadtest}/workerpool.go (71%) rename internal/hammer/{ => loadtest}/workers.go (91%) diff --git a/internal/hammer/hammer.go b/internal/hammer/hammer.go index 3b50c47d..a9175137 100644 --- a/internal/hammer/hammer.go +++ b/internal/hammer/hammer.go @@ -30,6 +30,7 @@ import ( movingaverage "github.com/RobinUS2/golang-moving-average" "github.com/transparency-dev/trillian-tessera/client" + "github.com/transparency-dev/trillian-tessera/internal/hammer/loadtest" "golang.org/x/mod/sumdb/note" "golang.org/x/net/http2" @@ -99,7 +100,14 @@ func main() { klog.Exitf("failed to create verifier: %v", err) } - f, w := newLogClientsFromFlags() + f, w, err := loadtest.NewLogClients(logURL, writeLogURL, loadtest.ClientOpts{ + Client: hc, + BearerToken: *bearerToken, + BearerTokenWrite: *bearerTokenWrite, + }) + if err != nil { + klog.Exit(err) + } var cpRaw []byte cons := client.UnilateralConsensus(f.ReadCheckpoint) @@ -118,7 +126,7 @@ func main() { go ha.errorLoop(ctx) gen := newLeafGenerator(tracker.LatestConsistent.Size, *leafMinSize, *dupChance) - hammer := NewHammer(&tracker, f.ReadEntryBundle, w.Write, gen, ha.seqLeafChan, ha.errChan) + hammer := NewHammer(&tracker, f.ReadEntryBundle, w, gen, ha.seqLeafChan, ha.errChan) exitCode := 0 if *leafWriteGoal > 0 { @@ -169,17 +177,19 @@ func main() { os.Exit(exitCode) } -func NewHammer(tracker *client.LogStateTracker, f client.EntryBundleFetcherFunc, w LeafWriter, gen func() []byte, seqLeafChan chan<- leafTime, errChan chan<- error) *Hammer { +func NewHammer(tracker *client.LogStateTracker, f client.EntryBundleFetcherFunc, w loadtest.LeafWriter, gen func() []byte, seqLeafChan chan<- loadtest.LeafTime, errChan chan<- error) *Hammer { readThrottle := NewThrottle(*maxReadOpsPerSecond) writeThrottle := NewThrottle(*maxWriteOpsPerSecond) - randomReaders := newWorkerPool(func() worker { - return NewLeafReader(tracker, f, RandomNextLeaf(), readThrottle.tokenChan, errChan) + randomReaders := loadtest.NewWorkerPool(func() loadtest.Worker { + return loadtest.NewLeafReader(tracker, f, loadtest.RandomNextLeaf(), readThrottle.tokenChan, errChan) + }) + fullReaders := loadtest.NewWorkerPool(func() loadtest.Worker { + return loadtest.NewLeafReader(tracker, f, loadtest.MonotonicallyIncreasingNextLeaf(), readThrottle.tokenChan, errChan) }) - fullReaders := newWorkerPool(func() worker { - return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), readThrottle.tokenChan, errChan) + writers := loadtest.NewWorkerPool(func() loadtest.Worker { + return loadtest.NewLogWriter(w, gen, writeThrottle.tokenChan, errChan, seqLeafChan) }) - writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan, seqLeafChan) }) return &Hammer{ randomReaders: randomReaders, @@ -195,9 +205,9 @@ func NewHammer(tracker *client.LogStateTracker, f client.EntryBundleFetcherFunc, // of write and read operations. The work of analysing the results of hammering should // live outside of this class. type Hammer struct { - randomReaders workerPool - fullReaders workerPool - writers workerPool + randomReaders loadtest.WorkerPool + fullReaders loadtest.WorkerPool + writers loadtest.WorkerPool readThrottle *Throttle writeThrottle *Throttle tracker *client.LogStateTracker @@ -246,7 +256,7 @@ func (h *Hammer) updateCheckpointLoop(ctx context.Context) { } func newHammerAnalyser(treeSizeFn func() uint64) *HammerAnalyser { - leafSampleChan := make(chan leafTime, 100) + leafSampleChan := make(chan loadtest.LeafTime, 100) errChan := make(chan error, 20) return &HammerAnalyser{ treeSizeFn: treeSizeFn, @@ -260,7 +270,7 @@ func newHammerAnalyser(treeSizeFn func() uint64) *HammerAnalyser { // HammerAnalyser is responsible for measuring and interpreting the result of hammering. type HammerAnalyser struct { treeSizeFn func() uint64 - seqLeafChan chan leafTime + seqLeafChan chan loadtest.LeafTime errChan chan error queueTime *movingaverage.ConcurrentMovingAverage @@ -284,7 +294,7 @@ func (a *HammerAnalyser) updateStatsLoop(ctx context.Context) { totalLatency := time.Duration(0) queueLatency := time.Duration(0) numLeaves := 0 - var sample *leafTime + var sample *loadtest.LeafTime ReadLoop: for { if sample == nil { @@ -302,16 +312,16 @@ func (a *HammerAnalyser) updateStatsLoop(ctx context.Context) { // either the current checkpoint or "now": // - leaves with indices beyond the tree size we're considering are not integrated yet, so we can't calculate their TTI // - leaves which were queued before "now", but not assigned by "now" should also be ignored as they don't fall into this epoch (and would contribute a -ve latency if they were included). - if sample.idx >= newSize || sample.assignedAt.After(now) { + if sample.Index >= newSize || sample.AssignedAt.After(now) { break } - queueLatency += sample.assignedAt.Sub(sample.queuedAt) + queueLatency += sample.AssignedAt.Sub(sample.QueuedAt) // totalLatency is skewed towards being higher than perhaps it may technically be by: // - the tick interval of this goroutine, // - the tick interval of the goroutine which updates the LogStateTracker, // - any latency in writes to the log becoming visible for reads. // But it's probably good enough for now. - totalLatency += now.Sub(sample.queuedAt) + totalLatency += now.Sub(sample.QueuedAt) numLeaves++ sample = nil @@ -343,7 +353,7 @@ func (a *HammerAnalyser) errorLoop(ctx context.Context) { } case err := <-a.errChan: - if errors.Is(err, ErrRetry) { + if errors.Is(err, loadtest.ErrRetry) { pbCount++ continue } diff --git a/internal/hammer/hammer_test.go b/internal/hammer/hammer_test.go index 465edb9e..5d238bfc 100644 --- a/internal/hammer/hammer_test.go +++ b/internal/hammer/hammer_test.go @@ -19,6 +19,8 @@ import ( "sync" "testing" "time" + + "github.com/transparency-dev/trillian-tessera/internal/hammer/loadtest" ) func TestLeafGenerator(t *testing.T) { @@ -52,10 +54,10 @@ func TestHammerAnalyser_Stats(t *testing.T) { baseTime := time.Now().Add(-1 * time.Minute) for i := 0; i < 10; i++ { - ha.seqLeafChan <- leafTime{ - idx: uint64(i), - queuedAt: baseTime, - assignedAt: baseTime.Add(time.Duration(i) * time.Second), + ha.seqLeafChan <- loadtest.LeafTime{ + Index: uint64(i), + QueuedAt: baseTime, + AssignedAt: baseTime.Add(time.Duration(i) * time.Second), } } treeSize.setSize(10) diff --git a/internal/hammer/client.go b/internal/hammer/loadtest/client.go similarity index 78% rename from internal/hammer/client.go rename to internal/hammer/loadtest/client.go index 63a04028..50d8810a 100644 --- a/internal/hammer/client.go +++ b/internal/hammer/loadtest/client.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package loadtest import ( "bytes" @@ -39,16 +39,23 @@ type fetcher interface { ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) } -// newLogClientsFromFlags returns a fetcher and a writer that will read +type ClientOpts struct { + BearerToken string + BearerTokenWrite string + + Client *http.Client +} + +// NewLogClients returns a fetcher and a writer that will read // and write leaves to all logs in the `log_url` flag set. -func newLogClientsFromFlags() (*roundRobinFetcher, *roundRobinLeafWriter) { - if len(logURL) == 0 { - klog.Exitf("--log_url must be provided") +func NewLogClients(readLogURLs, writeLogURLs []string, opts ClientOpts) (LogReader, LeafWriter, error) { + if len(readLogURLs) == 0 { + return nil, nil, fmt.Errorf("URL(s) for reading log must be provided") } - if len(writeLogURL) == 0 { + if len(writeLogURLs) == 0 { // If no write_log_url is provided, then default it to log_url - writeLogURL = logURL + writeLogURLs = readLogURLs } rootUrlOrDie := func(s string) *url.URL { @@ -64,30 +71,30 @@ func newLogClientsFromFlags() (*roundRobinFetcher, *roundRobinLeafWriter) { } fetchers := []fetcher{} - for _, s := range logURL { - fetchers = append(fetchers, newFetcher(rootUrlOrDie(s))) + for _, s := range readLogURLs { + fetchers = append(fetchers, newFetcher(rootUrlOrDie(s), opts.BearerToken)) } writers := []httpLeafWriter{} - for _, s := range writeLogURL { + for _, s := range writeLogURLs { addURL, err := rootUrlOrDie(s).Parse("add") if err != nil { - klog.Exitf("Failed to create add URL: %v", err) + return nil, nil, fmt.Errorf("failed to create add URL: %v", err) } - writers = append(writers, httpLeafWriter{u: addURL}) + writers = append(writers, newHTTPLeafWriter(opts.Client, addURL, opts.BearerTokenWrite)) } - return &roundRobinFetcher{f: fetchers}, &roundRobinLeafWriter{ws: writers} + return &roundRobinFetcher{f: fetchers}, (&roundRobinLeafWriter{ws: writers}).Write, nil } // newFetcher creates a Fetcher for the log at the given root location. -func newFetcher(root *url.URL) fetcher { +func newFetcher(root *url.URL, bearerToken string) fetcher { switch root.Scheme { case "http", "https": c, err := client.NewHTTPFetcher(root, nil) if err != nil { klog.Exitf("NewHTTPFetcher: %v", err) } - if *bearerToken != "" { - c.SetAuthorizationHeader(fmt.Sprintf("Bearer %s", *bearerToken)) + if bearerToken != "" { + c.SetAuthorizationHeader(fmt.Sprintf("Bearer %s", bearerToken)) } return c case "file": @@ -130,8 +137,18 @@ func (rr *roundRobinFetcher) next() fetcher { return f } +func newHTTPLeafWriter(hc *http.Client, u *url.URL, bearerToken string) httpLeafWriter { + return httpLeafWriter{ + hc: hc, + u: u, + bearerToken: bearerToken, + } +} + type httpLeafWriter struct { - u *url.URL + hc *http.Client + u *url.URL + bearerToken string } func (w httpLeafWriter) Write(ctx context.Context, newLeaf []byte) (uint64, error) { @@ -139,10 +156,10 @@ func (w httpLeafWriter) Write(ctx context.Context, newLeaf []byte) (uint64, erro if err != nil { return 0, fmt.Errorf("failed to create request: %v", err) } - if *bearerTokenWrite != "" { - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", *bearerTokenWrite)) + if w.bearerToken != "" { + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", w.bearerToken)) } - resp, err := hc.Do(req.WithContext(ctx)) + resp, err := w.hc.Do(req.WithContext(ctx)) if err != nil { return 0, fmt.Errorf("failed to write leaf: %v", err) } diff --git a/internal/hammer/workerpool.go b/internal/hammer/loadtest/workerpool.go similarity index 71% rename from internal/hammer/workerpool.go rename to internal/hammer/loadtest/workerpool.go index 6cf35cdc..9ce804d8 100644 --- a/internal/hammer/workerpool.go +++ b/internal/hammer/loadtest/workerpool.go @@ -12,40 +12,42 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package loadtest import "context" -type worker interface { +type Worker interface { Run(ctx context.Context) Kill() } +// NewWorkerPool creates a simple pool of workers. +// // This works well enough for the simple task we ask of it at the moment. // If we find ourselves adding more features to this, consider swapping it // for a library such as https://github.com/alitto/pond. -func newWorkerPool(factory func() worker) workerPool { - workers := make([]worker, 0) - pool := workerPool{ +func NewWorkerPool(factory func() Worker) WorkerPool { + workers := make([]Worker, 0) + pool := WorkerPool{ workers: workers, factory: factory, } return pool } -// workerPool contains a collection of _running_ workers. -type workerPool struct { - workers []worker - factory func() worker +// WorkerPool contains a collection of _running_ workers. +type WorkerPool struct { + workers []Worker + factory func() Worker } -func (p *workerPool) Grow(ctx context.Context) { +func (p *WorkerPool) Grow(ctx context.Context) { w := p.factory() p.workers = append(p.workers, w) go w.Run(ctx) } -func (p *workerPool) Shrink(ctx context.Context) { +func (p *WorkerPool) Shrink(ctx context.Context) { if len(p.workers) == 0 { return } @@ -54,6 +56,6 @@ func (p *workerPool) Shrink(ctx context.Context) { w.Kill() } -func (p *workerPool) Size() int { +func (p *WorkerPool) Size() int { return len(p.workers) } diff --git a/internal/hammer/workers.go b/internal/hammer/loadtest/workers.go similarity index 91% rename from internal/hammer/workers.go rename to internal/hammer/loadtest/workers.go index 2f17605d..2ba816c9 100644 --- a/internal/hammer/workers.go +++ b/internal/hammer/loadtest/workers.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package loadtest import ( "context" @@ -32,6 +32,14 @@ import ( // number at which this data will be found in the log, or an error. type LeafWriter func(ctx context.Context, data []byte) (uint64, error) +type LogReader interface { + ReadCheckpoint(ctx context.Context) ([]byte, error) + + ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) + + ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) +} + // NewLeafReader creates a LeafReader. // The next function provides a strategy for which leaves will be read. // Custom implementations can be passed, or use RandomNextLeaf or MonotonicallyIncreasingNextLeaf. @@ -153,20 +161,20 @@ func MonotonicallyIncreasingNextLeaf() func(uint64) uint64 { } } -// leafTime records the time at which a leaf was assigned the given index. +// LeafTime records the time at which a leaf was assigned the given index. // // This is used when sampling leaves which are added in order to later calculate // how long it took to for them to become integrated. -type leafTime struct { - idx uint64 - queuedAt time.Time - assignedAt time.Time +type LeafTime struct { + Index uint64 + QueuedAt time.Time + AssignedAt time.Time } // NewLogWriter creates a LogWriter. // u is the URL of the write endpoint for the log. // gen is a function that generates new leaves to add. -func NewLogWriter(writer LeafWriter, gen func() []byte, throttle <-chan bool, errChan chan<- error, leafSampleChan chan<- leafTime) *LogWriter { +func NewLogWriter(writer LeafWriter, gen func() []byte, throttle <-chan bool, errChan chan<- error, leafSampleChan chan<- LeafTime) *LogWriter { return &LogWriter{ writer: writer, gen: gen, @@ -182,7 +190,7 @@ type LogWriter struct { gen func() []byte throttle <-chan bool errChan chan<- error - leafChan chan<- leafTime + leafChan chan<- LeafTime cancel func() } @@ -199,13 +207,13 @@ func (w *LogWriter) Run(ctx context.Context) { case <-w.throttle: } newLeaf := w.gen() - lt := leafTime{queuedAt: time.Now()} + lt := LeafTime{QueuedAt: time.Now()} index, err := w.writer(ctx, newLeaf) if err != nil { w.errChan <- fmt.Errorf("failed to create request: %w", err) continue } - lt.idx, lt.assignedAt = index, time.Now() + lt.Index, lt.AssignedAt = index, time.Now() // See if we can send a leaf sample select { // TODO: we might want to count dropped samples, and/or make sampling a bit more statistical.