-
Notifications
You must be signed in to change notification settings - Fork 120
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(zetaclient)!: orchestrator V2 (#3332)
* Minor fixes * Add orchestrator V2. Move context updater to v2 * Fix orchestrator_v2 test cases * Fix flaky test cases during concurrent runs (spoiler: goroutines) * Add V2 to start.go * chain sync skeleton * Move common btc stuff to common/ to fix import cycle * Implement BTC observerSigner * Drop redundant code * Fix ticker concurrency bug * Add scheduler.Tasks() * Add v2 btc observer-signer 101 test cases. Drop redundant tests * Address PR comments * Add issue * fix inbound debug cmd * Add tss graceful shutdown * Update changelog * fix tss tests * Fix IntervalUpdater * Mitigate errors when BTC node is disabled * Implement pkg/fanout * Apply fanout to block subscriber * Fix typo * Minor btc signer improvements * Make V1.Stop() safe to call multiple times * FIX DATA RACE
- Loading branch information
Showing
55 changed files
with
1,672 additions
and
866 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
// Package fanout provides a fan-out pattern implementation. | ||
// It allows one channel to stream data to multiple independent channels. | ||
// Note that context handling is out of the scope of this package. | ||
package fanout | ||
|
||
import "sync" | ||
|
||
const DefaultBuffer = 8 | ||
|
||
// FanOut is a fan-out pattern implementation. | ||
// It is NOT a worker pool, so use it wisely. | ||
type FanOut[T any] struct { | ||
input <-chan T | ||
outputs []chan T | ||
|
||
// outputBuffer chan buffer size for outputs channels. | ||
// This helps with writing to chan in case of slow consumers. | ||
outputBuffer int | ||
|
||
mu sync.RWMutex | ||
} | ||
|
||
// New constructs FanOut | ||
func New[T any](source <-chan T, buf int) *FanOut[T] { | ||
return &FanOut[T]{ | ||
input: source, | ||
outputs: make([]chan T, 0), | ||
outputBuffer: buf, | ||
} | ||
} | ||
|
||
func (f *FanOut[T]) Add() <-chan T { | ||
out := make(chan T, f.outputBuffer) | ||
|
||
f.mu.Lock() | ||
defer f.mu.Unlock() | ||
|
||
f.outputs = append(f.outputs, out) | ||
|
||
return out | ||
} | ||
|
||
// Start starts the fan-out process | ||
func (f *FanOut[T]) Start() { | ||
go func() { | ||
// loop for new data | ||
for data := range f.input { | ||
f.mu.RLock() | ||
for _, output := range f.outputs { | ||
// note that this might spawn lots of goroutines. | ||
// it is a naive approach, but should be more than enough for our use cases. | ||
go func(output chan<- T) { output <- data }(output) | ||
} | ||
f.mu.RUnlock() | ||
} | ||
|
||
// at this point, the input was closed | ||
f.mu.Lock() | ||
defer f.mu.Unlock() | ||
for _, out := range f.outputs { | ||
close(out) | ||
} | ||
|
||
f.outputs = nil | ||
}() | ||
} |
Oops, something went wrong.