diff --git a/cache/manager.go b/cache/manager.go index 1c35de8a04d4..d522151659eb 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -21,6 +21,7 @@ import ( "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" + "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/util/bklog" "github.com/moby/buildkit/util/flightcontrol" "github.com/moby/buildkit/util/progress" @@ -49,6 +50,11 @@ type ManagerOpt struct { Differ diff.Comparer MetadataStore *metadata.Store MountPoolRoot string + + // dagger-specific, see manager_dagger.go + VolumeSnapshotter CtdVolumeSnapshotter + VolumeSourceContentHasher func(context.Context, ImmutableRef, session.Group) (digest.Digest, error) + SeenVolumes *sync.Map } type Accessor interface { @@ -62,6 +68,9 @@ type Accessor interface { IdentityMapping() *idtools.IdentityMapping Merge(ctx context.Context, parents []ImmutableRef, pg progress.Controller, opts ...RefOption) (ImmutableRef, error) Diff(ctx context.Context, lower, upper ImmutableRef, pg progress.Controller, opts ...RefOption) (ImmutableRef, error) + + // Dagger-specific, see manager_dagger.go + GetOrInitVolume(context.Context, string, ImmutableRef, pb.CacheSharingOpt, session.Group) (MutableRef, error) } type Controller interface { @@ -97,6 +106,11 @@ type cacheManager struct { muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results unlazyG flightcontrol.Group[struct{}] + + // dagger-specific, see manager_dagger.go + volumeSnapshotter VolumeSnapshotter + volumeSourceContentHasher func(context.Context, ImmutableRef, session.Group) (digest.Digest, error) + seenVolumes *sync.Map } func NewManager(opt ManagerOpt) (Manager, error) { @@ -110,6 +124,10 @@ func NewManager(opt ManagerOpt) (Manager, error) { Differ: opt.Differ, MetadataStore: opt.MetadataStore, records: make(map[string]*cacheRecord), + + volumeSnapshotter: newVolumeSnapshotter(context.TODO(), opt.VolumeSnapshotter, opt.LeaseManager), + volumeSourceContentHasher: opt.VolumeSourceContentHasher, + seenVolumes: opt.SeenVolumes, } if err := cm.init(context.TODO()); err != nil { @@ -444,7 +462,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt return rec, nil } else if IsNotFound(err) { // The equal mutable for this ref is not found, check to see if our snapshot exists - if _, statErr := cm.Snapshotter.Stat(ctx, md.getSnapshotID()); statErr != nil { + if _, statErr := cm.snapshotterFor(md).Stat(ctx, md.getSnapshotID()); statErr != nil { // this ref's snapshot also doesn't exist, just remove this record cm.MetadataStore.Clear(id) return nil, errors.Wrap(errNotFound, id) @@ -484,7 +502,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt if rec.mutable { // If the record is mutable, then the snapshot must exist - if _, err := cm.Snapshotter.Stat(ctx, rec.ID()); err != nil { + if _, err := cm.snapshotterFor(md).Stat(ctx, rec.ID()); err != nil { if !cerrdefs.IsNotFound(err) { return nil, errors.Wrap(err, "failed to check mutable ref snapshot") } diff --git a/cache/manager_dagger.go b/cache/manager_dagger.go new file mode 100644 index 000000000000..63ebc38b546c --- /dev/null +++ b/cache/manager_dagger.go @@ -0,0 +1,300 @@ +package cache + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/containerd/containerd/leases" + ctdsnapshots "github.com/containerd/containerd/snapshots" + "github.com/containerd/continuity/fs" + cerrdefs 
"github.com/containerd/errdefs" + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/identity" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/snapshot" + "github.com/moby/buildkit/snapshot/containerd" + "github.com/moby/buildkit/solver/pb" + "github.com/moby/buildkit/util/bklog" + "github.com/moby/buildkit/util/leaseutil" + "github.com/opencontainers/go-digest" +) + +type AcquireSnapshotter interface { + Acquire(ctx context.Context, key string, sharingMode pb.CacheSharingOpt) (func() error, error) +} + +type CtdVolumeSnapshotter interface { + ctdsnapshots.Snapshotter + Name() string + AcquireSnapshotter +} + +type VolumeSnapshotter interface { + snapshot.MergeSnapshotter + AcquireSnapshotter +} + +func newVolumeSnapshotter(ctx context.Context, ctdSnapshoter CtdVolumeSnapshotter, leaseManager leases.Manager) VolumeSnapshotter { + return volumeSnapshotterAdapter{ + MergeSnapshotter: snapshot.NewMergeSnapshotter(ctx, containerd.NewSnapshotter( + ctdSnapshoter.Name(), + ctdSnapshoter, + "buildkit", + nil, // no idmapping + ), leaseManager), + base: ctdSnapshoter, + } +} + +type volumeSnapshotterAdapter struct { + snapshot.MergeSnapshotter + base CtdVolumeSnapshotter +} + +var _ VolumeSnapshotter = (*volumeSnapshotterAdapter)(nil) + +func (sn volumeSnapshotterAdapter) Acquire(ctx context.Context, key string, sharingMode pb.CacheSharingOpt) (func() error, error) { + return sn.base.Acquire(ctx, key, sharingMode) +} + +func (cm *cacheManager) GetOrInitVolume(ctx context.Context, key string, source ImmutableRef, sharingMode pb.CacheSharingOpt, sess session.Group) (_ MutableRef, rerr error) { + // figure out the unique definition-based ID of the volume. + idParts := []string{key} + + sourceChecksum, err := cm.volumeSourceContentHasher(ctx, source, sess) + if err != nil { + return nil, fmt.Errorf("failed to calculate sourceChecksum: %w", err) + } + idParts = append(idParts, sourceChecksum.String()) + + id := digest.FromString(strings.Join(idParts, "\x00")).Encoded() + + var parent *immutableRef + if source != nil { + if _, ok := source.(*immutableRef); ok { + parent = source.Clone().(*immutableRef) + } else { + p, err := cm.Get(ctx, source.ID(), nil, NoUpdateLastUsed) + if err != nil { + return nil, err + } + parent = p.(*immutableRef) + } + if err := parent.Finalize(ctx); err != nil { + return nil, err + } + if err := parent.Extract(ctx, sess); err != nil { + return nil, err + } + } + defer func() { + if parent != nil { + parent.Release(context.WithoutCancel(ctx)) + } + }() + + rec, err := func() (_ *cacheRecord, rerr error) { + cm.mu.Lock() + defer cm.mu.Unlock() + + rec, err := cm.getRecord(ctx, id) + switch { + case err == nil: + return rec, nil + + case errors.Is(err, errNotFound): + md, _ := cm.getMetadata(id) + + rec = &cacheRecord{ + mu: &sync.Mutex{}, + mutable: true, + cm: cm, + refs: make(map[ref]struct{}), + cacheMetadata: md, + } + + opts := []RefOption{ + WithRecordType(client.UsageRecordTypeCacheMount), + WithDescription(fmt.Sprintf("cache mount %s (%s)", key, id)), // TODO: rest of metadata? 
+ CachePolicyRetain, + withSnapshotID(id), + } + if err := initializeMetadata(rec.cacheMetadata, rec.parentRefs, opts...); err != nil { + return nil, err + } + // this is needed because for some reason snapshotID is an imageRefOption + if err := setImageRefMetadata(rec.cacheMetadata, opts...); err != nil { + return nil, fmt.Errorf("failed to append image ref metadata to ref %s: %w", id, err) + } + + cm.records[id] = rec + return rec, nil + + default: + return nil, fmt.Errorf("failed to get volume cache record: %w", err) + } + }() + if err != nil { + return nil, err + } + + // TODO: race condition here if someone grabs the record somehow before the lock below + + rec.mu.Lock() + + _, err = cm.volumeSnapshotter.Stat(ctx, id) + exists := err == nil + + if !exists { + l, err := cm.LeaseManager.Create(ctx, func(l *leases.Lease) error { + l.ID = id + l.Labels = map[string]string{ + "containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano), + } + return nil + }) + if err != nil && !cerrdefs.IsAlreadyExists(err) { + rec.mu.Unlock() + return nil, fmt.Errorf("failed to create lease: %w", err) + } + if cerrdefs.IsAlreadyExists(err) { + l = leases.Lease{ID: id} + } + defer func() { + if rerr != nil { + ctx := context.WithoutCancel(ctx) + if err := cm.LeaseManager.Delete(ctx, leases.Lease{ + ID: id, + }); err != nil { + bklog.G(ctx).Errorf("failed to remove lease: %+v", err) + } + } + }() + if err := cm.LeaseManager.AddResource(ctx, l, leases.Resource{ + ID: id, + Type: "snapshots/" + cm.volumeSnapshotter.Name(), + }); err != nil && !cerrdefs.IsAlreadyExists(err) { + rec.mu.Unlock() + return nil, fmt.Errorf("failed to add snapshot %s resource to lease: %w", id, err) + } + + var sourceSnapshotID string + if parent != nil { + sourceSnapshotID = sourceChecksum.Encoded() + _, err := cm.volumeSnapshotter.Stat(ctx, sourceSnapshotID) + sourceExists := err == nil + + if !sourceExists { + if err := cm.LeaseManager.AddResource(ctx, l, leases.Resource{ + ID: sourceSnapshotID, + Type: "snapshots/" + cm.volumeSnapshotter.Name(), + }); err != nil && !cerrdefs.IsAlreadyExists(err) { + return nil, fmt.Errorf("failed to add source snapshot resource to lease: %w", err) + } + + tmpActiveSnapshotID := identity.NewID() + if _, err := cm.LeaseManager.Create(ctx, func(l *leases.Lease) error { + l.ID = tmpActiveSnapshotID + l.Labels = map[string]string{ + "containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano), + } + return nil + }, leaseutil.MakeTemporary); err != nil && !cerrdefs.IsAlreadyExists(err) { + return nil, fmt.Errorf("failed to create lease for tmp active source snapshot: %w", err) + } + defer func() { + ctx := context.WithoutCancel(ctx) + if err := cm.LeaseManager.Delete(ctx, leases.Lease{ + ID: tmpActiveSnapshotID, + }); err != nil { + bklog.G(ctx).Errorf("failed to remove lease: %+v", err) + } + }() + if err := cm.LeaseManager.AddResource(ctx, leases.Lease{ + ID: tmpActiveSnapshotID, + }, leases.Resource{ + ID: tmpActiveSnapshotID, + Type: "snapshots/" + cm.volumeSnapshotter.Name(), + }); err != nil && !cerrdefs.IsAlreadyExists(err) { + return nil, fmt.Errorf("failed to add source snapshot resource to lease: %w", err) + } + + if err := cm.volumeSnapshotter.Prepare(ctx, tmpActiveSnapshotID, ""); err != nil && !cerrdefs.IsAlreadyExists(err) { + return nil, fmt.Errorf("failed to prepare source snapshot: %w", err) + } + newMntable, err := cm.volumeSnapshotter.Mounts(ctx, tmpActiveSnapshotID) + if err != nil { + return nil, fmt.Errorf("failed to get source mounts: %w", err) + } + newMnter := 
snapshot.LocalMounter(newMntable) + newMntpoint, err := newMnter.Mount() + if err != nil { + return nil, fmt.Errorf("failed to mount new source snapshot: %w", err) + } + + oldMntable, err := source.Mount(ctx, true, sess) + if err != nil { + newMnter.Unmount() + return nil, fmt.Errorf("failed to get old source mounts: %w", err) + } + oldMnter := snapshot.LocalMounter(oldMntable) + oldMntpoint, err := oldMnter.Mount() + if err != nil { + newMnter.Unmount() + return nil, fmt.Errorf("failed to mount old source snapshot: %w", err) + } + + if err := fs.CopyDir(newMntpoint, oldMntpoint, fs.WithAllowXAttrErrors()); err != nil { + newMnter.Unmount() + oldMnter.Unmount() + return nil, fmt.Errorf("failed to copy source snapshot: %w", err) + } + + newMnter.Unmount() + oldMnter.Unmount() + + if err := cm.volumeSnapshotter.Commit(ctx, sourceSnapshotID, tmpActiveSnapshotID); err != nil { + return nil, fmt.Errorf("failed to commit source snapshot: %w", err) + } + } + } + + if err := cm.volumeSnapshotter.Prepare(ctx, id, sourceSnapshotID); err != nil && !cerrdefs.IsAlreadyExists(err) { + rec.mu.Unlock() + return nil, fmt.Errorf("failed to prepare volume: %w", err) + } + } + rec.mu.Unlock() + + releaseFunc, err := cm.volumeSnapshotter.Acquire(ctx, id, sharingMode) + if err != nil { + return nil, fmt.Errorf("failed to acquire volume: %w", err) + } + defer func() { + if rerr != nil { + rerr = errors.Join(rerr, releaseFunc()) + } + }() + + rec.mu.Lock() + defer rec.mu.Unlock() + + // TODO: note about how we are creating multiple mutable refs on a cacheRecord but it is safe to do so it turns out + ref := rec.mref(true, DescHandlers{}) + ref.releaseFunc = releaseFunc + + cm.seenVolumes.Store(id, struct{}{}) + + return ref, nil +} + +func (cm *cacheManager) snapshotterFor(md *cacheMetadata) snapshot.MergeSnapshotter { + if md.GetRecordType() == client.UsageRecordTypeCacheMount { + return cm.volumeSnapshotter + } + return cm.Snapshotter +} diff --git a/cache/refs.go b/cache/refs.go index 08a38818a795..82235cc88e14 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -300,7 +300,7 @@ func (cr *cacheRecord) isLazy(ctx context.Context) (bool, error) { } // If the snapshot is a remote snapshot, this layer is lazy. - if info, err := cr.cm.Snapshotter.Stat(ctx, cr.getSnapshotID()); err == nil { + if info, err := cr.cm.snapshotterFor(cr.cacheMetadata).Stat(ctx, cr.getSnapshotID()); err == nil { if _, ok := info.Labels["containerd.io/snapshot/remote"]; ok { return true, nil } @@ -335,14 +335,16 @@ func (cr *cacheRecord) size(ctx context.Context) (int64, error) { return s, nil } driverID := cr.getSnapshotID() + snapshotter := cr.cm.snapshotterFor(cr.cacheMetadata) if cr.equalMutable != nil { driverID = cr.equalMutable.getSnapshotID() + snapshotter = cr.cm.snapshotterFor(cr.equalMutable.cacheMetadata) } cr.mu.Unlock() var usage snapshots.Usage if !cr.getBlobOnly() { var err error - usage, err = cr.cm.Snapshotter.Usage(ctx, driverID) + usage, err = snapshotter.Usage(ctx, driverID) if err != nil { cr.mu.Lock() isDead := cr.isDead() @@ -412,14 +414,14 @@ func (cr *cacheRecord) mount(ctx context.Context) (_ snapshot.Mountable, rerr er }() if err := cr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: cr.viewLeaseID()}, leases.Resource{ ID: mountSnapshotID, - Type: "snapshots/" + cr.cm.Snapshotter.Name(), + Type: "snapshots/" + cr.cm.snapshotterFor(cr.cacheMetadata).Name(), }); err != nil && !cerrdefs.IsAlreadyExists(err) { return nil, err } // Return the mount direct from View rather than setting it using the Mounts call below. 
// The two are equivalent for containerd snapshotters but the moby snapshotter requires // the use of the mountable returned by View in this case. - mnts, err := cr.cm.Snapshotter.View(ctx, mountSnapshotID, cr.getSnapshotID()) + mnts, err := cr.cm.snapshotterFor(cr.cacheMetadata).View(ctx, mountSnapshotID, cr.getSnapshotID()) if err != nil && !cerrdefs.IsAlreadyExists(err) { return nil, err } @@ -429,8 +431,7 @@ func (cr *cacheRecord) mount(ctx context.Context) (_ snapshot.Mountable, rerr er if cr.mountCache != nil { return cr.mountCache, nil } - - mnts, err := cr.cm.Snapshotter.Mounts(ctx, mountSnapshotID) + mnts, err := cr.cm.snapshotterFor(cr.cacheMetadata).Mounts(ctx, mountSnapshotID) if err != nil { return nil, err } @@ -620,6 +621,9 @@ type mutableRef struct { *cacheRecord triggerLastUsed bool descHandlers DescHandlers + + // dagger-specific + releaseFunc func() error } // hold ref lock before calling @@ -972,7 +976,7 @@ func (sr *immutableRef) Mount(ctx context.Context, readonly bool, s session.Grou } var mnt snapshot.Mountable - if sr.cm.Snapshotter.Name() == "stargz" { + if sr.cm.snapshotterFor(sr.cacheMetadata).Name() == "stargz" { if err := sr.withRemoteSnapshotLabelsStargzMode(ctx, s, func() { mnt, rerr = sr.mount(ctx) }); err != nil { @@ -1004,7 +1008,7 @@ func (sr *immutableRef) Extract(ctx context.Context, s session.Group) (rerr erro return nil } - if sr.cm.Snapshotter.Name() == "stargz" { + if sr.cm.snapshotterFor(sr.cacheMetadata).Name() == "stargz" { if err := sr.withRemoteSnapshotLabelsStargzMode(ctx, s, func() { if rerr = sr.prepareRemoteSnapshotsStargzMode(ctx, s); rerr != nil { return @@ -1023,7 +1027,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context, dhs := sr.descHandlers for _, r := range sr.layerChain() { r := r - info, err := r.cm.Snapshotter.Stat(ctx, r.getSnapshotID()) + info, err := r.cm.snapshotterFor(sr.cacheMetadata).Stat(ctx, r.getSnapshotID()) if err != nil && !cerrdefs.IsNotFound(err) { return err } else if cerrdefs.IsNotFound(err) { @@ -1040,7 +1044,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context, // For avoiding collosion among calls, keys of these tmp labels contain an unique ID. 
flds, labels := makeTmpLabelsStargzMode(snapshots.FilterInheritedLabels(dh.SnapshotLabels), s) info.Labels = labels - if _, err := r.cm.Snapshotter.Update(ctx, info, flds...); err != nil { + if _, err := r.cm.snapshotterFor(sr.cacheMetadata).Update(ctx, info, flds...); err != nil { return errors.Wrapf(err, "failed to add tmp remote labels for remote snapshot") } defer func() { @@ -1048,7 +1052,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context, for k := range info.Labels { info.Labels[k] = "" // Remove labels appended in this call } - if _, err := r.cm.Snapshotter.Update(ctx, info, flds...); err != nil { + if _, err := r.cm.snapshotterFor(sr.cacheMetadata).Update(ctx, info, flds...); err != nil { bklog.G(ctx).Warn(errors.Wrapf(err, "failed to remove tmp remote labels")) } }() @@ -1067,7 +1071,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s for _, r := range sr.layerChain() { r := r snapshotID := r.getSnapshotID() - if _, err := r.cm.Snapshotter.Stat(ctx, snapshotID); err == nil { + if _, err := r.cm.snapshotterFor(sr.cacheMetadata).Stat(ctx, snapshotID); err == nil { continue } @@ -1099,11 +1103,11 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s if r.layerParent != nil { parentID = r.layerParent.getSnapshotID() } - if err := r.cm.Snapshotter.Prepare(ctx, key, parentID, opts...); err != nil { + if err := r.cm.snapshotterFor(sr.cacheMetadata).Prepare(ctx, key, parentID, opts...); err != nil { if cerrdefs.IsAlreadyExists(err) { // Check if the targeting snapshot ID has been prepared as // a remote snapshot in the snapshotter. - info, err := r.cm.Snapshotter.Stat(ctx, snapshotID) + info, err := r.cm.snapshotterFor(sr.cacheMetadata).Stat(ctx, snapshotID) if err == nil { // usable as remote snapshot without unlazying. defer func() { ctx := context.WithoutCancel(ctx) @@ -1120,7 +1124,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s WithField("name", info.Name). 
Debug("snapshots exist but labels are nil") } - if _, err := r.cm.Snapshotter.Update(ctx, info, tmpFields...); err != nil { + if _, err := r.cm.snapshotterFor(sr.cacheMetadata).Update(ctx, info, tmpFields...); err != nil { bklog.G(ctx).Warn(errors.Wrapf(err, "failed to remove tmp remote labels after prepare")) } @@ -1160,7 +1164,7 @@ func makeTmpLabelsStargzMode(labels map[string]string, s session.Group) (fields func (sr *immutableRef) unlazy(ctx context.Context, dhs DescHandlers, pg progress.Controller, s session.Group, topLevel bool, ensureContentStore bool) error { _, err := g.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ *leaseutil.LeaseRef, rerr error) { - if _, err := sr.cm.Snapshotter.Stat(ctx, sr.getSnapshotID()); err == nil { + if _, err := sr.cm.snapshotterFor(sr.cacheMetadata).Stat(ctx, sr.getSnapshotID()); err == nil { if !ensureContentStore { return nil, nil } @@ -1239,7 +1243,7 @@ func (sr *immutableRef) unlazyDiffMerge(ctx context.Context, dhs DescHandlers, p defer statusDone() } - return sr.cm.Snapshotter.Merge(ctx, sr.getSnapshotID(), diffs) + return sr.cm.snapshotterFor(sr.cacheMetadata).Merge(ctx, sr.getSnapshotID(), diffs) } // should be called within sizeG.Do call for this ref's ID @@ -1310,12 +1314,12 @@ func (sr *immutableRef) unlazyLayer(ctx context.Context, dhs DescHandlers, pg pr key := fmt.Sprintf("extract-%s %s", identity.NewID(), sr.getChainID()) - err = sr.cm.Snapshotter.Prepare(ctx, key, parentID) + err = sr.cm.snapshotterFor(sr.cacheMetadata).Prepare(ctx, key, parentID) if err != nil { return err } - mountable, err := sr.cm.Snapshotter.Mounts(ctx, key) + mountable, err := sr.cm.snapshotterFor(sr.cacheMetadata).Mounts(ctx, key) if err != nil { return err } @@ -1332,7 +1336,7 @@ func (sr *immutableRef) unlazyLayer(ctx context.Context, dhs DescHandlers, pg pr if err := unmount(); err != nil { return err } - if err := sr.cm.Snapshotter.Commit(ctx, sr.getSnapshotID(), key); err != nil { + if err := sr.cm.snapshotterFor(sr.cacheMetadata).Commit(ctx, sr.getSnapshotID(), key); err != nil { if !errors.Is(err, cerrdefs.ErrAlreadyExists) { return err } @@ -1430,13 +1434,13 @@ func (cr *cacheRecord) finalize(ctx context.Context) error { if err := cr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: cr.ID()}, leases.Resource{ ID: cr.getSnapshotID(), - Type: "snapshots/" + cr.cm.Snapshotter.Name(), + Type: "snapshots/" + cr.cm.snapshotterFor(cr.cacheMetadata).Name(), }); err != nil { cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: cr.ID()}) return errors.Wrapf(err, "failed to add snapshot %s to lease", cr.getSnapshotID()) } - if err := cr.cm.Snapshotter.Commit(ctx, cr.getSnapshotID(), mutable.getSnapshotID()); err != nil { + if err := cr.cm.snapshotterFor(cr.cacheMetadata).Commit(ctx, cr.getSnapshotID(), mutable.getSnapshotID()); err != nil { cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: cr.ID()}) return errors.Wrapf(err, "failed to commit %s to %s during finalize", mutable.getSnapshotID(), cr.getSnapshotID()) } @@ -1517,7 +1521,7 @@ func (sr *mutableRef) Mount(ctx context.Context, readonly bool, s session.Group) } var mnt snapshot.Mountable - if sr.cm.Snapshotter.Name() == "stargz" && sr.layerParent != nil { + if sr.cm.snapshotterFor(sr.cacheMetadata).Name() == "stargz" && sr.layerParent != nil { if err := sr.layerParent.withRemoteSnapshotLabelsStargzMode(ctx, s, func() { mnt, rerr = sr.mount(ctx) }); err != nil { @@ -1589,6 +1593,11 @@ func (sr *mutableRef) release(ctx context.Context) (rerr error) { sr.updateLastUsed() 
sr.triggerLastUsed = false } + + if sr.releaseFunc != nil { + return sr.releaseFunc() + } + return nil } diff --git a/solver/llbsolver/mounts/mount.go b/solver/llbsolver/mounts/mount.go index 90acbd44bc12..e6a65da89b8c 100644 --- a/solver/llbsolver/mounts/mount.go +++ b/solver/llbsolver/mounts/mount.go @@ -381,7 +381,7 @@ func (mm *MountManager) MountableCache(ctx context.Context, m *pb.Mount, ref cac if m.CacheOpt == nil { return nil, errors.Errorf("missing cache mount options") } - return mm.getRefCacheDir(ctx, ref, m.CacheOpt.ID, m, m.CacheOpt.Sharing, g) + return mm.cm.GetOrInitVolume(ctx, m.CacheOpt.ID, ref, m.CacheOpt.Sharing, g) } func (mm *MountManager) MountableTmpFS(m *pb.Mount) cache.Mountable { diff --git a/worker/base/worker.go b/worker/base/worker.go index 8121c74c6530..1b5811dbc552 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "time" "github.com/containerd/containerd/content" @@ -16,6 +17,7 @@ import ( "github.com/docker/docker/pkg/idtools" "github.com/hashicorp/go-multierror" "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/cache/contenthash" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" "github.com/moby/buildkit/client/llb/sourceresolver" @@ -81,6 +83,10 @@ type WorkerOpt struct { MetadataStore *metadata.Store MountPoolRoot string ResourceMonitor *resources.Monitor + + // dagger-specific + VolumeSnapshotter cache.CtdVolumeSnapshotter + SeenVolumes *sync.Map } // Worker is a local worker instance with dedicated snapshotter, cache, and so on. @@ -111,6 +117,13 @@ func NewWorker(ctx context.Context, opt WorkerOpt) (*Worker, error) { Differ: opt.Differ, MetadataStore: opt.MetadataStore, MountPoolRoot: opt.MountPoolRoot, + + // dagger-specific + VolumeSnapshotter: opt.VolumeSnapshotter, + VolumeSourceContentHasher: func(ctx context.Context, source cache.ImmutableRef, sess session.Group) (digest.Digest, error) { + return contenthash.Checksum(ctx, source, "/", contenthash.ChecksumOpts{}, sess) + }, + SeenVolumes: opt.SeenVolumes, }) if err != nil { return nil, err
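// Note (not part of the diff above): a minimal, hypothetical sketch of how a snapshotter
// could satisfy the AcquireSnapshotter contract introduced in cache/manager_dagger.go.
// The package, type, and field names (volumesketch, volumeLocks) are illustrative
// assumptions, not Dagger's actual implementation, and PRIVATE sharing is simplified to
// exclusive locking here, whereas a real implementation would hand out a fresh private
// instance instead.
package volumesketch

import (
	"context"
	"sync"

	"github.com/moby/buildkit/solver/pb"
)

// volumeLocks serializes access to volume snapshots by cache key.
type volumeLocks struct {
	mu    sync.Mutex
	locks map[string]*sync.RWMutex
}

// Acquire blocks until the volume identified by key may be used under the requested
// sharing mode and returns a release func, mirroring the AcquireSnapshotter interface.
func (v *volumeLocks) Acquire(ctx context.Context, key string, sharing pb.CacheSharingOpt) (func() error, error) {
	v.mu.Lock()
	if v.locks == nil {
		v.locks = map[string]*sync.RWMutex{}
	}
	l, ok := v.locks[key]
	if !ok {
		l = &sync.RWMutex{}
		v.locks[key] = l
	}
	v.mu.Unlock()

	switch sharing {
	case pb.CacheSharingOpt_SHARED:
		// SHARED: any number of concurrent users of the same volume.
		l.RLock()
		return func() error { l.RUnlock(); return nil }, nil
	default:
		// LOCKED (and, simplified here, PRIVATE): one user of the volume at a time.
		l.Lock()
		return func() error { l.Unlock(); return nil }, nil
	}
}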