add support for optional progress updates channel #661

29 changes: 27 additions & 2 deletions copy.go
@@ -112,6 +112,25 @@
// source storage to fetch large blobs.
// If FindSuccessors is nil, content.Successors will be used.
FindSuccessors func(ctx context.Context, fetcher content.Fetcher, desc ocispec.Descriptor) ([]ocispec.Descriptor, error)

// UpdateChannel is an optional channel to receive progress updates.
// Each update will include the number of bytes copied for a particular blob
// or manifest, the expected total size, and the descriptor of the blob or
// manifest. It is up to the consumer of the channel to differentiate
// between updates among different blobs and manifests; no mechanism is
// provided for distinguishing between them, other than the descriptor
// passed with each update. The total size of downloads of all blobs and
// manifests is not provided, as it is not known. You can calculate the
// percentage downloaded for a particular blob in an individual update
// based on the total size of that blob, which is provided in the
// descriptor, and the number of bytes copied, which is provided in the
// update.
Comment on lines +116 to +127
Contributor

Presenting a progress bar of an operation isn't an easy job. Eventually, we will end up with an MVC pattern if we continue iterating.

Therefore, we need to define data models first for oras operations like Copy, PushBytes, etc.

Contributor

In fact, the oras CLI already has a data model, and so does BuildKit.

Contributor

/cc @qweeah

Contributor Author

> Presenting a progress bar of an operation isn't an easy job

True, but that is not my concern here. The oras CLI might care, but library calls such as Copy() should not. They should only worry about a library-centric way of publishing updates. A CLI (like oras) or any other consumer can do what it wants with them.

> we need to define data models

The data model is simple: a stream of a defined struct (or interface, if you prefer), each instance of which contains the number of bytes transferred and the descriptor to which it applies. Anything higher level would be outside the scope of this type of option, I think.
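To make the consumer side of that data model concrete, here is a minimal sketch of how a caller might turn the update stream into per-blob percentages. It is not part of this PR: `descriptor` and `copyUpdate` are local stand-ins for `ocispec.Descriptor` and the proposed `CopyUpdate`, and `tracker` is a hypothetical helper. It assumes each update carries the bytes read in a single call rather than a running total, which is what the `Read` wrapper in this diff sends.

```go
package main

import "fmt"

// descriptor is a stand-in for ocispec.Descriptor (assumption: only the
// digest and size matter for progress tracking).
type descriptor struct {
	Digest string
	Size   int64
}

// copyUpdate mirrors the CopyUpdate struct proposed in this PR.
type copyUpdate struct {
	Copied     int64
	Descriptor descriptor
}

// tracker is a hypothetical consumer-side helper that accumulates the
// bytes copied per digest.
type tracker struct {
	copied map[string]int64
}

func newTracker() *tracker {
	return &tracker{copied: make(map[string]int64)}
}

// apply records one update and returns the percentage complete for the
// blob or manifest that the update refers to.
func (t *tracker) apply(u copyUpdate) float64 {
	t.copied[u.Descriptor.Digest] += u.Copied
	if u.Descriptor.Size <= 0 {
		return 100 // nothing to copy for an empty blob
	}
	return 100 * float64(t.copied[u.Descriptor.Digest]) / float64(u.Descriptor.Size)
}

func main() {
	updates := make(chan copyUpdate, 4)
	blob := descriptor{Digest: "sha256:aaaa", Size: 200}

	// Simulated producer; in the PR this role is played by Copy().
	go func() {
		defer close(updates)
		updates <- copyUpdate{Copied: 50, Descriptor: blob}
		updates <- copyUpdate{Copied: 150, Descriptor: blob}
	}()

	t := newTracker()
	for u := range updates {
		fmt.Printf("%s: %.0f%%\n", u.Descriptor.Digest, t.apply(u))
	}
}
```

Keying on the digest is one way to do the differentiation that the doc comment leaves to the consumer; a caller that copies the same blob more than once would need a different key.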

// Updates are sent each time a block is copied. The number of bytes copied
// depends upon whatever calls the io.ReadCloser of the source Target.
// This may be io.Copy, which, by default, is 32KB, or it may be some other
// implementation.
// The caller is responsible for closing the channel.
UpdateChannel chan<- CopyUpdate
Contributor

Channel sends block when no receiver is ready (or, for a buffered channel, when the buffer is full). If someone sets opts.UpdateChannel = make(chan<- CopyUpdate) and never consumes the updates, Copy() will block forever.

Contributor Author

Yeah, that is a good point. I was thinking about that and wasn't quite sure how to handle it. Here are some possibilities:

  • Send to the channel only from within a goroutine. That could lead to a lot of goroutines if the send blocks. See below.
  • Instead of passing a channel, pass a function. Each call would run in its own goroutine, but that has the same issue. See below.

For either of the above, maybe we have one goroutine and one channel that we own. The main routine publishes to our channel (we control it, so we can ensure there are no blocking issues), and our goroutine then either publishes to the passed channel (option 1 above) or calls the passed function (option 2). At least we don't block our main routine, although if their function blocks, our goroutine is still blocked.

Is a func call better than a goroutine? In theory, they could use a channel in there. Or have a slow func call.
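The single-goroutine relay described above might look roughly like the following. This is a minimal sketch, not code from this PR: `relay`, `newRelay`, `publish`, and `stop` are hypothetical names, and dropping updates when the internal buffer is full is an added assumption, since the comment does not say what should happen in that case.

```go
package main

import "fmt"

// update is a stand-in for the proposed CopyUpdate.
type update struct {
	Copied int64
}

// relay owns an internal buffered channel so that the copy path never
// waits on the consumer.
type relay struct {
	in   chan update
	done chan struct{}
}

// newRelay starts one goroutine that forwards queued updates to sink.
// sink may be slow or may block; only the relay goroutine waits on it.
func newRelay(buffer int, sink func(update)) *relay {
	r := &relay{in: make(chan update, buffer), done: make(chan struct{})}
	go func() {
		defer close(r.done)
		for u := range r.in {
			sink(u)
		}
	}()
	return r
}

// publish attempts a non-blocking send and reports whether the update
// was queued. When the buffer is full the update is dropped.
func (r *relay) publish(u update) bool {
	select {
	case r.in <- u:
		return true
	default:
		return false
	}
}

// stop closes the internal channel and waits until every queued update
// has been handed to sink. It still blocks if sink never returns.
func (r *relay) stop() {
	close(r.in)
	<-r.done
}

func main() {
	var total int64
	r := newRelay(8, func(u update) { total += u.Copied })
	for i := 0; i < 5; i++ {
		r.publish(update{Copied: 32 * 1024})
	}
	r.stop() // reading total afterwards is safe: stop waits for the goroutine
	fmt.Println("bytes reported:", total)
}
```

The sink here is a function, which covers both options: a caller who prefers a channel can pass a function that sends on it. The trade-off is that a consumer that falls behind sees gaps in the byte counts rather than stalling the copy.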

}

// Copy copies a rooted directed acyclic graph (DAG) with the tagged root node
@@ -266,11 +285,17 @@
}

// doCopyNode copies a single content from the source CAS to the destination CAS.
-func doCopyNode(ctx context.Context, src content.ReadOnlyStorage, dst content.Storage, desc ocispec.Descriptor) error {
+func doCopyNode(ctx context.Context, src content.ReadOnlyStorage, dst content.Storage, desc ocispec.Descriptor, ch chan<- CopyUpdate) error {
rc, err := src.Fetch(ctx, desc)
if err != nil {
return err
}
if ch != nil {
rc = &progressReader{
c: ch,
r: rc,
}
}

Codecov / codecov/patch warning on copy.go#L294-L298: added lines #L294 - L298 were not covered by tests.
Comment on lines +293 to +298
Contributor

Wrapping a ReadCloser may carry a performance penalty: some types implement io.WriterTo, which lets io.Copy() take an optimized path, and the wrapper hides that interface.

Contributor Author

Are you saying that wrapping the Read() call itself has a performance impact? Or that, because the wrapper does not implement io.WriterTo even when the original ReadCloser does, we lose the ability to use WriteTo()? I think the second of these.

What do you suggest? That we check for io.WriterTo, like io.Copy() does, and, if the source implements it, have the wrapper implement WriteTo() as well?

How would we then capture the updates?

defer rc.Close()
err = dst.Push(ctx, desc, rc)
if err != nil && !errors.Is(err, errdef.ErrAlreadyExists) {
@@ -291,7 +316,7 @@
}
}

-if err := doCopyNode(ctx, src, dst, desc); err != nil {
+if err := doCopyNode(ctx, src, dst, desc, opts.UpdateChannel); err != nil {
return err
}

45 changes: 45 additions & 0 deletions progress_reader.go
@@ -0,0 +1,45 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package oras

import (
"io"

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

type CopyUpdate struct {
Copied int64
Descriptor ocispec.Descriptor
}

type progressReader struct {
desc ocispec.Descriptor
r io.ReadCloser
c chan<- CopyUpdate
}

func (p *progressReader) Close() error {
return p.r.Close()
}

Codecov / codecov/patch warning on progress_reader.go#L35-L36: added lines #L35 - L36 were not covered by tests.

func (p *progressReader) Read(buf []byte) (int, error) {
n, err := p.r.Read(buf)
if n > 0 {
p.c <- CopyUpdate{Copied: int64(n), Descriptor: p.desc}
}
return n, err
}

Codecov / codecov/patch warning on progress_reader.go#L39-L44: added lines #L39 - L44 were not covered by tests.
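Since the coverage check flags both methods as untested, here is a rough sketch of how the reader could be exercised in isolation. It is not part of this PR: `descriptor` and `copyUpdate` are local stand-ins for `ocispec.Descriptor` and `CopyUpdate` so the example has no external dependencies, and `drain` is a hypothetical helper. Note that `doCopyNode` in the diff above builds the `progressReader` without setting `desc`, so as written each update would carry a zero-value descriptor; the sketch sets the field explicitly.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// descriptor stands in for ocispec.Descriptor (assumption for this sketch).
type descriptor struct {
	Digest string
	Size   int64
}

// copyUpdate mirrors the CopyUpdate struct from progress_reader.go.
type copyUpdate struct {
	Copied     int64
	Descriptor descriptor
}

// progressReader mirrors the type added in this PR.
type progressReader struct {
	desc descriptor
	r    io.ReadCloser
	c    chan<- copyUpdate
}

func (p *progressReader) Close() error { return p.r.Close() }

func (p *progressReader) Read(buf []byte) (int, error) {
	n, err := p.r.Read(buf)
	if n > 0 {
		p.c <- copyUpdate{Copied: int64(n), Descriptor: p.desc}
	}
	return n, err
}

// drain reads payload through a progressReader using a buffer of bufSize
// bytes (which must be positive) and returns the updates that were sent.
func drain(payload string, bufSize int) ([]copyUpdate, error) {
	desc := descriptor{Digest: "sha256:example", Size: int64(len(payload))}
	// Buffered generously so Read never blocks in this single-goroutine sketch.
	ch := make(chan copyUpdate, len(payload)+1)
	pr := &progressReader{desc: desc, r: io.NopCloser(strings.NewReader(payload)), c: ch}

	buf := make([]byte, bufSize)
	for {
		_, err := pr.Read(buf)
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
	}
	if err := pr.Close(); err != nil {
		return nil, err
	}
	close(ch) // the caller owns the channel, as the doc comment requires

	var got []copyUpdate
	for u := range ch {
		got = append(got, u)
	}
	return got, nil
}

func main() {
	updates, err := drain("0123456789", 4)
	if err != nil {
		panic(err)
	}
	for _, u := range updates {
		fmt.Println(u.Descriptor.Digest, u.Copied, "of", u.Descriptor.Size)
	}
}
```

A real test would use the package's own types and an unbuffered channel with a consumer goroutine to check the blocking behavior discussed in the review.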