Skip to content

Commit

Permalink
Lock initialise to prevent concurrent initialisation (#480)
Browse files Browse the repository at this point in the history
* Lock initialise to prevent concurrent initialisation

If two tasks attempted to initialise an empty checkpoint, and one task
stalled while the other began accepting traffic, then the tree state and
checkpoint would be invalidated by the stalled task. This guarantees
initialisation only once per log.
  • Loading branch information
haydentherapper authored Feb 14, 2025
1 parent 5f7ca09 commit 6d36a7e
Showing 1 changed file with 44 additions and 13 deletions.
57 changes: 44 additions & 13 deletions storage/posix/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (l *logResourceStorage) ReadTile(_ context.Context, level, index uint64, p
func (a *appender) sequenceBatch(ctx context.Context, entries []*tessera.Entry) error {
// Double locking:
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
// - The POSIX `lockForTreeUpdate()` ensures that distinct tasks are serialised.
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
a.s.mu.Lock()
unlock, err := a.s.lockFile("treeState.lock")
if err != nil {
Expand Down Expand Up @@ -307,7 +307,7 @@ func doIntegrate(ctx context.Context, fromSeq uint64, leafHashes [][]byte, ls *l
newSize, newRoot, tiles, err := storage.Integrate(ctx, getTiles, fromSeq, leafHashes)
if err != nil {
klog.Errorf("Integrate: %v", err)
return 0, nil, fmt.Errorf("Integrate: %v", err)
return 0, nil, fmt.Errorf("error in Integrate: %v", err)
}
for k, v := range tiles {
if err := ls.storeTile(ctx, uint64(k.Level), k.Index, newSize, v); err != nil {
Expand Down Expand Up @@ -416,6 +416,25 @@ func (lrs *logResourceStorage) writeBundle(_ context.Context, index uint64, part
// initialise ensures that the storage location is valid by loading the checkpoint from this location, or
// creating a zero-sized one if it doesn't already exist.
func (a *appender) initialise() error {
// Idempotent: If folder exists, nothing happens.
if err := os.MkdirAll(filepath.Join(a.s.path, stateDir), dirPerm); err != nil {
return fmt.Errorf("failed to create log directory: %q", err)
}
// Double locking:
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
a.s.mu.Lock()
unlock, err := a.s.lockFile("treeState.lock")
if err != nil {
panic(err)
}
defer func() {
if err := unlock(); err != nil {
panic(err)
}
a.s.mu.Unlock()
}()

if err := a.s.ensureVersion(compatibilityVersion); err != nil {
return err
}
Expand All @@ -426,9 +445,6 @@ func (a *appender) initialise() error {
}
// Create the directory structure and write out an empty checkpoint
klog.Infof("Initializing directory for POSIX log at %q (this should only happen ONCE per log!)", a.s.path)
if err := os.MkdirAll(filepath.Join(a.s.path, stateDir), dirPerm); err != nil {
return fmt.Errorf("failed to create log directory: %q", err)
}
if err := a.s.writeTreeState(0, rfc6962.DefaultHasher.EmptyRoot()); err != nil {
return fmt.Errorf("failed to write tree-state checkpoint: %v", err)
}
Expand Down Expand Up @@ -483,7 +499,7 @@ func (s *Storage) ensureVersion(version uint16) error {
func (s *Storage) writeTreeState(size uint64, root []byte) error {
raw, err := json.Marshal(treeState{Size: size, Root: root})
if err != nil {
return fmt.Errorf("Marshal: %v", err)
return fmt.Errorf("error in Marshal: %v", err)
}

if err := s.overwrite(filepath.Join(stateDir, "treeState"), raw); err != nil {
Expand All @@ -497,11 +513,11 @@ func (s *Storage) readTreeState() (uint64, []byte, error) {
p := filepath.Join(s.path, stateDir, "treeState")
raw, err := os.ReadFile(p)
if err != nil {
return 0, nil, fmt.Errorf("ReadFile(%q): %w", p, err)
return 0, nil, fmt.Errorf("error in ReadFile(%q): %w", p, err)
}
ts := &treeState{}
if err := json.Unmarshal(raw, ts); err != nil {
return 0, nil, fmt.Errorf("Unmarshal: %v", err)
return 0, nil, fmt.Errorf("error in Unmarshal: %v", err)
}
return ts.Size, ts.Root, nil
}
Expand Down Expand Up @@ -705,6 +721,25 @@ func (m *MigrationStorage) AwaitIntegration(ctx context.Context, sourceSize uint
}

func (m *MigrationStorage) initialise() error {
// Idempotent: If folder exists, nothing happens.
if err := os.MkdirAll(filepath.Join(m.s.path, stateDir), dirPerm); err != nil {
return fmt.Errorf("failed to create log directory: %q", err)
}
// Double locking:
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
m.s.mu.Lock()
unlock, err := m.s.lockFile("treeState.lock")
if err != nil {
panic(err)
}
defer func() {
if err := unlock(); err != nil {
panic(err)
}
m.s.mu.Unlock()
}()

if err := m.s.ensureVersion(compatibilityVersion); err != nil {
return err
}
Expand All @@ -728,7 +763,7 @@ func (m *MigrationStorage) State(_ context.Context) (uint64, []byte, error) {
func (m *MigrationStorage) buildTree(ctx context.Context, targetSize uint64) error {
// Double locking:
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
// - The POSIX `lockForTreeUpdate()` ensures that distinct tasks are serialised.
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
m.s.mu.Lock()
unlock, err := m.s.lockFile("treeState.lock")
if err != nil {
Expand Down Expand Up @@ -760,10 +795,6 @@ func (m *MigrationStorage) buildTree(ctx context.Context, targetSize uint64) err
if err != nil {
return fmt.Errorf("doIntegrate(%d, ...): %v", size, err)
}
if err != nil {
klog.Errorf("Integrate failed: %v", err)
return err
}
if err := m.s.writeTreeState(newSize, newRoot); err != nil {
return fmt.Errorf("failed to write new tree state: %v", err)
}
Expand Down

0 comments on commit 6d36a7e

Please sign in to comment.