Skip to content

Commit

Permalink
[Hammer] Refactor to allow code reuse with CT (#448)
Browse files Browse the repository at this point in the history
This isn't the full job, as the core of the hammer needs extracting from
hammer.go. This is a good start and sets the right direction for making
this a general purpose library for use in true tlog-tiles and the static
CT variation.
  • Loading branch information
mhutchinson authored Jan 15, 2025
1 parent 770517a commit 1b545d0
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 64 deletions.
46 changes: 28 additions & 18 deletions internal/hammer/hammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

movingaverage "github.com/RobinUS2/golang-moving-average"
"github.com/transparency-dev/trillian-tessera/client"
"github.com/transparency-dev/trillian-tessera/internal/hammer/loadtest"
"golang.org/x/mod/sumdb/note"
"golang.org/x/net/http2"

Expand Down Expand Up @@ -99,7 +100,14 @@ func main() {
klog.Exitf("failed to create verifier: %v", err)
}

f, w := newLogClientsFromFlags()
f, w, err := loadtest.NewLogClients(logURL, writeLogURL, loadtest.ClientOpts{
Client: hc,
BearerToken: *bearerToken,
BearerTokenWrite: *bearerTokenWrite,
})
if err != nil {
klog.Exit(err)
}

var cpRaw []byte
cons := client.UnilateralConsensus(f.ReadCheckpoint)
Expand All @@ -118,7 +126,7 @@ func main() {
go ha.errorLoop(ctx)

gen := newLeafGenerator(tracker.LatestConsistent.Size, *leafMinSize, *dupChance)
hammer := NewHammer(&tracker, f.ReadEntryBundle, w.Write, gen, ha.seqLeafChan, ha.errChan)
hammer := NewHammer(&tracker, f.ReadEntryBundle, w, gen, ha.seqLeafChan, ha.errChan)

exitCode := 0
if *leafWriteGoal > 0 {
Expand Down Expand Up @@ -169,17 +177,19 @@ func main() {
os.Exit(exitCode)
}

func NewHammer(tracker *client.LogStateTracker, f client.EntryBundleFetcherFunc, w LeafWriter, gen func() []byte, seqLeafChan chan<- leafTime, errChan chan<- error) *Hammer {
func NewHammer(tracker *client.LogStateTracker, f client.EntryBundleFetcherFunc, w loadtest.LeafWriter, gen func() []byte, seqLeafChan chan<- loadtest.LeafTime, errChan chan<- error) *Hammer {
readThrottle := NewThrottle(*maxReadOpsPerSecond)
writeThrottle := NewThrottle(*maxWriteOpsPerSecond)

randomReaders := newWorkerPool(func() worker {
return NewLeafReader(tracker, f, RandomNextLeaf(), readThrottle.tokenChan, errChan)
randomReaders := loadtest.NewWorkerPool(func() loadtest.Worker {
return loadtest.NewLeafReader(tracker, f, loadtest.RandomNextLeaf(), readThrottle.tokenChan, errChan)
})
fullReaders := loadtest.NewWorkerPool(func() loadtest.Worker {
return loadtest.NewLeafReader(tracker, f, loadtest.MonotonicallyIncreasingNextLeaf(), readThrottle.tokenChan, errChan)
})
fullReaders := newWorkerPool(func() worker {
return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), readThrottle.tokenChan, errChan)
writers := loadtest.NewWorkerPool(func() loadtest.Worker {
return loadtest.NewLogWriter(w, gen, writeThrottle.tokenChan, errChan, seqLeafChan)
})
writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan, seqLeafChan) })

return &Hammer{
randomReaders: randomReaders,
Expand All @@ -195,9 +205,9 @@ func NewHammer(tracker *client.LogStateTracker, f client.EntryBundleFetcherFunc,
// of write and read operations. The work of analysing the results of hammering should
// live outside of this class.
type Hammer struct {
randomReaders workerPool
fullReaders workerPool
writers workerPool
randomReaders loadtest.WorkerPool
fullReaders loadtest.WorkerPool
writers loadtest.WorkerPool
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
Expand Down Expand Up @@ -246,7 +256,7 @@ func (h *Hammer) updateCheckpointLoop(ctx context.Context) {
}

func newHammerAnalyser(treeSizeFn func() uint64) *HammerAnalyser {
leafSampleChan := make(chan leafTime, 100)
leafSampleChan := make(chan loadtest.LeafTime, 100)
errChan := make(chan error, 20)
return &HammerAnalyser{
treeSizeFn: treeSizeFn,
Expand All @@ -260,7 +270,7 @@ func newHammerAnalyser(treeSizeFn func() uint64) *HammerAnalyser {
// HammerAnalyser is responsible for measuring and interpreting the result of hammering.
type HammerAnalyser struct {
treeSizeFn func() uint64
seqLeafChan chan leafTime
seqLeafChan chan loadtest.LeafTime
errChan chan error

queueTime *movingaverage.ConcurrentMovingAverage
Expand All @@ -284,7 +294,7 @@ func (a *HammerAnalyser) updateStatsLoop(ctx context.Context) {
totalLatency := time.Duration(0)
queueLatency := time.Duration(0)
numLeaves := 0
var sample *leafTime
var sample *loadtest.LeafTime
ReadLoop:
for {
if sample == nil {
Expand All @@ -302,16 +312,16 @@ func (a *HammerAnalyser) updateStatsLoop(ctx context.Context) {
// either the current checkpoint or "now":
// - leaves with indices beyond the tree size we're considering are not integrated yet, so we can't calculate their TTI
// - leaves which were queued before "now", but not assigned by "now" should also be ignored as they don't fall into this epoch (and would contribute a -ve latency if they were included).
if sample.idx >= newSize || sample.assignedAt.After(now) {
if sample.Index >= newSize || sample.AssignedAt.After(now) {
break
}
queueLatency += sample.assignedAt.Sub(sample.queuedAt)
queueLatency += sample.AssignedAt.Sub(sample.QueuedAt)
// totalLatency is skewed towards being higher than perhaps it may technically be by:
// - the tick interval of this goroutine,
// - the tick interval of the goroutine which updates the LogStateTracker,
// - any latency in writes to the log becoming visible for reads.
// But it's probably good enough for now.
totalLatency += now.Sub(sample.queuedAt)
totalLatency += now.Sub(sample.QueuedAt)

numLeaves++
sample = nil
Expand Down Expand Up @@ -343,7 +353,7 @@ func (a *HammerAnalyser) errorLoop(ctx context.Context) {

}
case err := <-a.errChan:
if errors.Is(err, ErrRetry) {
if errors.Is(err, loadtest.ErrRetry) {
pbCount++
continue
}
Expand Down
10 changes: 6 additions & 4 deletions internal/hammer/hammer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"sync"
"testing"
"time"

"github.com/transparency-dev/trillian-tessera/internal/hammer/loadtest"
)

func TestLeafGenerator(t *testing.T) {
Expand Down Expand Up @@ -52,10 +54,10 @@ func TestHammerAnalyser_Stats(t *testing.T) {

baseTime := time.Now().Add(-1 * time.Minute)
for i := 0; i < 10; i++ {
ha.seqLeafChan <- leafTime{
idx: uint64(i),
queuedAt: baseTime,
assignedAt: baseTime.Add(time.Duration(i) * time.Second),
ha.seqLeafChan <- loadtest.LeafTime{
Index: uint64(i),
QueuedAt: baseTime,
AssignedAt: baseTime.Add(time.Duration(i) * time.Second),
}
}
treeSize.setSize(10)
Expand Down
57 changes: 37 additions & 20 deletions internal/hammer/client.go → internal/hammer/loadtest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package main
package loadtest

import (
"bytes"
Expand All @@ -39,16 +39,23 @@ type fetcher interface {
ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error)
}

// newLogClientsFromFlags returns a fetcher and a writer that will read
type ClientOpts struct {
BearerToken string
BearerTokenWrite string

Client *http.Client
}

// NewLogClients returns a fetcher and a writer that will read
// and write leaves to all logs in the `log_url` flag set.
func newLogClientsFromFlags() (*roundRobinFetcher, *roundRobinLeafWriter) {
if len(logURL) == 0 {
klog.Exitf("--log_url must be provided")
func NewLogClients(readLogURLs, writeLogURLs []string, opts ClientOpts) (LogReader, LeafWriter, error) {
if len(readLogURLs) == 0 {
return nil, nil, fmt.Errorf("URL(s) for reading log must be provided")
}

if len(writeLogURL) == 0 {
if len(writeLogURLs) == 0 {
// If no write_log_url is provided, then default it to log_url
writeLogURL = logURL
writeLogURLs = readLogURLs
}

rootUrlOrDie := func(s string) *url.URL {
Expand All @@ -64,30 +71,30 @@ func newLogClientsFromFlags() (*roundRobinFetcher, *roundRobinLeafWriter) {
}

fetchers := []fetcher{}
for _, s := range logURL {
fetchers = append(fetchers, newFetcher(rootUrlOrDie(s)))
for _, s := range readLogURLs {
fetchers = append(fetchers, newFetcher(rootUrlOrDie(s), opts.BearerToken))
}
writers := []httpLeafWriter{}
for _, s := range writeLogURL {
for _, s := range writeLogURLs {
addURL, err := rootUrlOrDie(s).Parse("add")
if err != nil {
klog.Exitf("Failed to create add URL: %v", err)
return nil, nil, fmt.Errorf("failed to create add URL: %v", err)
}
writers = append(writers, httpLeafWriter{u: addURL})
writers = append(writers, newHTTPLeafWriter(opts.Client, addURL, opts.BearerTokenWrite))
}
return &roundRobinFetcher{f: fetchers}, &roundRobinLeafWriter{ws: writers}
return &roundRobinFetcher{f: fetchers}, (&roundRobinLeafWriter{ws: writers}).Write, nil
}

// newFetcher creates a Fetcher for the log at the given root location.
func newFetcher(root *url.URL) fetcher {
func newFetcher(root *url.URL, bearerToken string) fetcher {
switch root.Scheme {
case "http", "https":
c, err := client.NewHTTPFetcher(root, nil)
if err != nil {
klog.Exitf("NewHTTPFetcher: %v", err)
}
if *bearerToken != "" {
c.SetAuthorizationHeader(fmt.Sprintf("Bearer %s", *bearerToken))
if bearerToken != "" {
c.SetAuthorizationHeader(fmt.Sprintf("Bearer %s", bearerToken))
}
return c
case "file":
Expand Down Expand Up @@ -130,19 +137,29 @@ func (rr *roundRobinFetcher) next() fetcher {
return f
}

func newHTTPLeafWriter(hc *http.Client, u *url.URL, bearerToken string) httpLeafWriter {
return httpLeafWriter{
hc: hc,
u: u,
bearerToken: bearerToken,
}
}

type httpLeafWriter struct {
u *url.URL
hc *http.Client
u *url.URL
bearerToken string
}

func (w httpLeafWriter) Write(ctx context.Context, newLeaf []byte) (uint64, error) {
req, err := http.NewRequest(http.MethodPost, w.u.String(), bytes.NewReader(newLeaf))
if err != nil {
return 0, fmt.Errorf("failed to create request: %v", err)
}
if *bearerTokenWrite != "" {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", *bearerTokenWrite))
if w.bearerToken != "" {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", w.bearerToken))
}
resp, err := hc.Do(req.WithContext(ctx))
resp, err := w.hc.Do(req.WithContext(ctx))
if err != nil {
return 0, fmt.Errorf("failed to write leaf: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,42 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package main
package loadtest

import "context"

type worker interface {
type Worker interface {
Run(ctx context.Context)
Kill()
}

// NewWorkerPool creates a simple pool of workers.
//
// This works well enough for the simple task we ask of it at the moment.
// If we find ourselves adding more features to this, consider swapping it
// for a library such as https://github.com/alitto/pond.
func newWorkerPool(factory func() worker) workerPool {
workers := make([]worker, 0)
pool := workerPool{
func NewWorkerPool(factory func() Worker) WorkerPool {
workers := make([]Worker, 0)
pool := WorkerPool{
workers: workers,
factory: factory,
}
return pool
}

// workerPool contains a collection of _running_ workers.
type workerPool struct {
workers []worker
factory func() worker
// WorkerPool contains a collection of _running_ workers.
type WorkerPool struct {
workers []Worker
factory func() Worker
}

func (p *workerPool) Grow(ctx context.Context) {
func (p *WorkerPool) Grow(ctx context.Context) {
w := p.factory()
p.workers = append(p.workers, w)
go w.Run(ctx)
}

func (p *workerPool) Shrink(ctx context.Context) {
func (p *WorkerPool) Shrink(ctx context.Context) {
if len(p.workers) == 0 {
return
}
Expand All @@ -54,6 +56,6 @@ func (p *workerPool) Shrink(ctx context.Context) {
w.Kill()
}

func (p *workerPool) Size() int {
func (p *WorkerPool) Size() int {
return len(p.workers)
}
Loading

0 comments on commit 1b545d0

Please sign in to comment.