Merge pull request moby#49117 from dmcgowan/archive-remove-pools
Remove use of `pkg/pools` in archive
thaJeztah authored Dec 28, 2024
2 parents a72026a + be4eac7 commit f5af46d
Showing 7 changed files with 102 additions and 65 deletions.
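For context: the `pkg/pools` helpers removed here wrapped streams in pooled 32 KiB `bufio` readers/writers and exposed a pooled `pools.Copy`. The commit replaces them with two local equivalents: a `sync.Pool` of `*bufio.Reader` in `pkg/archive/archive.go` (used only to `Peek` at compression magic bytes), and a pooled-buffer `copyWithBuffer` in `pkg/archive/copy.go`. A minimal self-contained sketch of that copy pattern (names here are illustrative, not from the diff):

```go
package main

import (
	"io"
	"strings"
	"sync"
)

// bufPool hands out *[]byte rather than []byte so that Put does not
// allocate: storing a plain slice in an interface{} copies the slice
// header to the heap on every round trip.
var bufPool = sync.Pool{
	New: func() interface{} { b := make([]byte, 32*1024); return &b },
}

// copyPooled has the same shape as the diff's copyWithBuffer: borrow a
// buffer, let io.CopyBuffer drive the loop, then return the buffer.
func copyPooled(dst io.Writer, src io.Reader) (int64, error) {
	buf := bufPool.Get().(*[]byte)
	defer bufPool.Put(buf)
	return io.CopyBuffer(dst, src, *buf)
}

func main() {
	var sb strings.Builder
	if _, err := copyPooled(&sb, strings.NewReader("hello")); err != nil {
		panic(err)
	}
	println(sb.String()) // hello
}
```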
7 changes: 3 additions & 4 deletions integration-cli/docker_cli_import_test.go
@@ -1,7 +1,6 @@
 package main

 import (
-	"bufio"
 	"compress/gzip"
 	"context"
 	"os"
@@ -66,7 +65,7 @@ func (s *DockerCLIImportSuite) TestImportFile(c *testing.T) {

 	icmd.RunCmd(icmd.Cmd{
 		Command: []string{dockerBinary, "export", "test-import"},
-		Stdout:  bufio.NewWriter(temporaryFile),
+		Stdout:  temporaryFile,
 	}).Assert(c, icmd.Success)

 	out := cli.DockerCmd(c, "import", temporaryFile.Name()).Combined()
@@ -110,7 +109,7 @@ func (s *DockerCLIImportSuite) TestImportFileWithMessage(c *testing.T) {

 	icmd.RunCmd(icmd.Cmd{
 		Command: []string{dockerBinary, "export", "test-import"},
-		Stdout:  bufio.NewWriter(temporaryFile),
+		Stdout:  temporaryFile,
 	}).Assert(c, icmd.Success)

 	message := "Testing commit message"
@@ -144,7 +143,7 @@ func (s *DockerCLIImportSuite) TestImportWithQuotedChanges(c *testing.T) {
 	assert.Assert(c, err == nil, "failed to create temporary file")
 	defer os.Remove(temporaryFile.Name())

-	cli.Docker(cli.Args("export", "test-import"), cli.WithStdout(bufio.NewWriter(temporaryFile))).Assert(c, icmd.Success)
+	cli.Docker(cli.Args("export", "test-import"), cli.WithStdout(temporaryFile)).Assert(c, icmd.Success)

 	result := cli.DockerCmd(c, "import", "-c", `ENTRYPOINT ["/bin/sh", "-c"]`, temporaryFile.Name())
 	imgRef := strings.TrimSpace(result.Stdout())
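Dropping `bufio.NewWriter(temporaryFile)` in these tests is more than cleanup: nothing ever called `Flush`, so up to 4 KiB of trailing export data could stay in the buffer and never reach the file. A small sketch of the hazard (hypothetical temp file, not from the test suite):

```go
package main

import (
	"bufio"
	"fmt"
	"os"
)

func main() {
	f, _ := os.CreateTemp("", "export-*.tar") // stand-in for the test's temp file
	defer os.Remove(f.Name())

	w := bufio.NewWriter(f) // default 4096-byte buffer
	fmt.Fprint(w, "tar stream bytes")
	// Missing w.Flush(): the payload is still sitting in the bufio buffer.
	f.Close()

	data, _ := os.ReadFile(f.Name())
	fmt.Printf("%d bytes on disk\n", len(data)) // prints: 0 bytes on disk
}
```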
134 changes: 83 additions & 51 deletions pkg/archive/archive.go
@@ -19,13 +19,13 @@ import (
 	"runtime/debug"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"syscall"
 	"time"

 	"github.com/containerd/log"
 	"github.com/docker/docker/pkg/idtools"
-	"github.com/docker/docker/pkg/pools"
 	"github.com/klauspost/compress/zstd"
 	"github.com/moby/patternmatcher"
 	"github.com/moby/sys/sequential"
@@ -230,13 +230,51 @@ func (r *readCloserWrapper) Close() error {

 		return nil
 	}
-	return r.closer()
+	if r.closer != nil {
+		return r.closer()
+	}
+	return nil
 }

+var (
+	bufioReader32KPool = &sync.Pool{
+		New: func() interface{} { return bufio.NewReaderSize(nil, 32*1024) },
+	}
+)
+
+type bufferedReader struct {
+	buf *bufio.Reader
+}
+
+func newBufferedReader(r io.Reader) *bufferedReader {
+	buf := bufioReader32KPool.Get().(*bufio.Reader)
+	buf.Reset(r)
+	return &bufferedReader{buf}
+}
+
+func (r *bufferedReader) Read(p []byte) (n int, err error) {
+	if r.buf == nil {
+		return 0, io.EOF
+	}
+	n, err = r.buf.Read(p)
+	if err == io.EOF {
+		r.buf.Reset(nil)
+		bufioReader32KPool.Put(r.buf)
+		r.buf = nil
+	}
+	return
+}
+
+func (r *bufferedReader) Peek(n int) ([]byte, error) {
+	if r.buf == nil {
+		return nil, io.EOF
+	}
+	return r.buf.Peek(n)
+}
+
 // DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
 func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
-	p := pools.BufioReader32KPool
-	buf := p.Get(archive)
+	buf := newBufferedReader(archive)
 	bs, err := buf.Peek(10)
 	if err != nil && err != io.EOF {
 		// Note: we'll ignore any io.EOF error because there are some odd
@@ -248,26 +286,12 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
 		return nil, err
 	}

-	wrapReader := func(r io.Reader, cancel context.CancelFunc) io.ReadCloser {
-		return &readCloserWrapper{
-			Reader: r,
-			closer: func() error {
-				if cancel != nil {
-					cancel()
-				}
-				if readCloser, ok := r.(io.ReadCloser); ok {
-					readCloser.Close()
-				}
-				p.Put(buf)
-				return nil
-			},
-		}
-	}
-
 	compression := DetectCompression(bs)
 	switch compression {
 	case Uncompressed:
-		return wrapReader(buf, nil), nil
+		return &readCloserWrapper{
+			Reader: buf,
+		}, nil
 	case Gzip:
 		ctx, cancel := context.WithCancel(context.Background())

@@ -276,10 +300,18 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
 			cancel()
 			return nil, err
 		}
-		return wrapReader(gzReader, cancel), nil
+		return &readCloserWrapper{
+			Reader: gzReader,
+			closer: func() error {
+				cancel()
+				return gzReader.Close()
+			},
+		}, nil
 	case Bzip2:
 		bz2Reader := bzip2.NewReader(buf)
-		return wrapReader(bz2Reader, nil), nil
+		return &readCloserWrapper{
+			Reader: bz2Reader,
+		}, nil
 	case Xz:
 		ctx, cancel := context.WithCancel(context.Background())

@@ -288,30 +320,44 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
 			cancel()
 			return nil, err
 		}
-		return wrapReader(xzReader, cancel), nil
+
+		return &readCloserWrapper{
+			Reader: xzReader,
+			closer: func() error {
+				cancel()
+				return xzReader.Close()
+			},
+		}, nil
 	case Zstd:
 		zstdReader, err := zstd.NewReader(buf)
 		if err != nil {
 			return nil, err
 		}
-		return wrapReader(zstdReader, nil), nil
+		return &readCloserWrapper{
+			Reader: zstdReader,
+			closer: func() error {
+				zstdReader.Close()
+				return nil
+			},
+		}, nil
 	default:
 		return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension())
 	}
 }

+type nopWriteCloser struct {
+	io.Writer
+}
+
+func (nopWriteCloser) Close() error { return nil }
+
 // CompressStream compresses the dest with specified compression algorithm.
 func CompressStream(dest io.Writer, compression Compression) (io.WriteCloser, error) {
-	p := pools.BufioWriter32KPool
-	buf := p.Get(dest)
 	switch compression {
 	case Uncompressed:
-		writeBufWrapper := p.NewWriteCloserWrapper(buf, buf)
-		return writeBufWrapper, nil
+		return nopWriteCloser{dest}, nil
 	case Gzip:
-		gzWriter := gzip.NewWriter(dest)
-		writeBufWrapper := p.NewWriteCloserWrapper(buf, gzWriter)
-		return writeBufWrapper, nil
+		return gzip.NewWriter(dest), nil
 	case Bzip2, Xz:
 		// archive/bzip2 does not support writing, and there is no xz support at all
 		// However, this is not a problem as docker only currently generates gzipped tars
@@ -382,7 +428,7 @@ func ReplaceFileTarWrapper(inputTarStream io.ReadCloser, mods map[string]TarModifierFunc) io.ReadCloser {
 			pipeWriter.CloseWithError(err)
 			return
 		}
-		if _, err := pools.Copy(tarWriter, tarReader); err != nil {
+		if _, err := copyWithBuffer(tarWriter, tarReader); err != nil {
 			pipeWriter.CloseWithError(err)
 			return
 		}
@@ -529,7 +575,6 @@ type tarWhiteoutConverter interface {

 type tarAppender struct {
 	TarWriter *tar.Writer
-	Buffer    *bufio.Writer

 	// for hardlink mapping
 	SeenFiles map[uint64]string
@@ -547,7 +592,6 @@ func newTarAppender(idMapping idtools.IdentityMapping, writer io.Writer, chownOpts *idtools.Identity) *tarAppender {
 	return &tarAppender{
 		SeenFiles:       make(map[uint64]string),
 		TarWriter:       tar.NewWriter(writer),
-		Buffer:          pools.BufioWriter32KPool.Get(nil),
 		IdentityMapping: idMapping,
 		ChownOpts:       chownOpts,
 	}
@@ -665,17 +709,11 @@ func (ta *tarAppender) addTarFile(path, name string) error {
 			return err
 		}

-		ta.Buffer.Reset(ta.TarWriter)
-		defer ta.Buffer.Reset(nil)
-		_, err = pools.Copy(ta.Buffer, file)
+		_, err = copyWithBuffer(ta.TarWriter, file)
 		file.Close()
 		if err != nil {
 			return err
 		}
-		err = ta.Buffer.Flush()
-		if err != nil {
-			return err
-		}
 	}

 	return nil
@@ -718,7 +756,7 @@ func createTarFile(path, extractDir string, hdr *tar.Header, reader io.Reader, o
 		if err != nil {
 			return err
 		}
-		if _, err := pools.Copy(file, reader); err != nil {
+		if _, err := copyWithBuffer(file, reader); err != nil {
 			file.Close()
 			return err
 		}
@@ -929,9 +967,6 @@ func (t *Tarballer) Do() {
 		}
 	}()

-	// this buffer is needed for the duration of this piped stream
-	defer pools.BufioWriter32KPool.Put(ta.Buffer)
-
 	// In general we log errors here but ignore them because
 	// during e.g. a diff operation the container can continue
 	// mutating the filesystem and we can see transient errors
@@ -1087,8 +1122,6 @@ func (t *Tarballer) Do() {
 // Unpack unpacks the decompressedArchive to dest with options.
 func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) error {
 	tr := tar.NewReader(decompressedArchive)
-	trBuf := pools.BufioReader32KPool.Get(nil)
-	defer pools.BufioReader32KPool.Put(trBuf)

 	var dirs []*tar.Header
 	whiteoutConverter := getWhiteoutConverter(options.WhiteoutFormat)
@@ -1165,7 +1198,6 @@ loop:
 				}
 			}
 		}
-		trBuf.Reset(tr)

 		if err := remapIDs(options.IDMap, hdr); err != nil {
 			return err
@@ -1181,7 +1213,7 @@ loop:
 			}
 		}

-		if err := createTarFile(path, dest, hdr, trBuf, options); err != nil {
+		if err := createTarFile(path, dest, hdr, tr, options); err != nil {
 			return err
 		}

@@ -1384,7 +1416,7 @@ func (archiver *Archiver) CopyFileWithTar(src, dst string) (err error) {
 		if err := tw.WriteHeader(hdr); err != nil {
 			return err
 		}
-		if _, err := pools.Copy(tw, srcF); err != nil {
+		if _, err := copyWithBuffer(tw, srcF); err != nil {
 			return err
 		}
 		return nil
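The exported signatures of `DecompressStream` and `CompressStream` are unchanged by the rewrite, so a round trip still looks the same from the caller's side. A sketch against the public API (import path assumed from this repository):

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/docker/docker/pkg/archive"
)

func main() {
	var compressed bytes.Buffer

	// For Gzip, CompressStream now returns gzip.NewWriter(dest) directly;
	// Close still flushes the gzip footer, but no longer touches a shared pool.
	w, err := archive.CompressStream(&compressed, archive.Gzip)
	if err != nil {
		panic(err)
	}
	io.WriteString(w, "layer contents")
	w.Close()

	// DecompressStream peeks at the leading bytes through the pooled
	// bufio.Reader to detect the format, then wraps a gzip reader.
	r, err := archive.DecompressStream(&compressed)
	if err != nil {
		panic(err)
	}
	defer r.Close()

	out, _ := io.ReadAll(r)
	fmt.Println(string(out)) // layer contents
}
```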
2 changes: 1 addition & 1 deletion pkg/archive/archive_test.go
@@ -693,7 +693,7 @@ func tarUntar(t *testing.T, origin string, options *TarOptions) ([]Change, error) {
 	defer archive.Close()

 	buf := make([]byte, 10)
-	if _, err := archive.Read(buf); err != nil {
+	if _, err := io.ReadFull(archive, buf); err != nil {
 		return nil, err
 	}
 	wrap := io.MultiReader(bytes.NewReader(buf), archive)
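The `io.ReadFull` change guards against short reads: a single `Read` may legally return fewer than `len(buf)` bytes, and with the pooled `bufferedReader` now in front of the stream there is no guarantee the first call yields all 10. A standalone illustration using `testing/iotest`:

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"testing/iotest"
)

func main() {
	buf := make([]byte, 10)

	// OneByteReader returns at most one byte per Read call -- legal
	// io.Reader behavior that a bare Read must tolerate.
	r := iotest.OneByteReader(strings.NewReader("0123456789"))

	n, _ := r.Read(buf)
	fmt.Println("single Read:", n) // single Read: 1

	r = iotest.OneByteReader(strings.NewReader("0123456789"))
	n, _ = io.ReadFull(r, buf) // loops until buf is full or the stream ends
	fmt.Println("io.ReadFull:", n) // io.ReadFull: 10
}
```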
4 changes: 0 additions & 4 deletions pkg/archive/changes.go
@@ -15,7 +15,6 @@ import (

 	"github.com/containerd/log"
 	"github.com/docker/docker/pkg/idtools"
-	"github.com/docker/docker/pkg/pools"
 )

 // ChangeType represents the change type.
@@ -389,9 +388,6 @@ func ExportChanges(dir string, changes []Change, idMap idtools.IdentityMapping) (io.ReadCloser, error) {
 	go func() {
 		ta := newTarAppender(idMap, writer, nil)

-		// this buffer is needed for the duration of this piped stream
-		defer pools.BufioWriter32KPool.Put(ta.Buffer)
-
 		sort.Sort(changesByPath(changes))

 		// In general we log errors here but ignore them because
12 changes: 12 additions & 0 deletions pkg/archive/copy.go
@@ -8,6 +8,7 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"

 	"github.com/containerd/log"
 )
@@ -20,6 +21,17 @@ var (
 	ErrInvalidCopySource = errors.New("invalid copy source content")
 )

+var copyPool = sync.Pool{
+	New: func() interface{} { s := make([]byte, 32*1024); return &s },
+}
+
+func copyWithBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
+	buf := copyPool.Get().(*[]byte)
+	written, err = io.CopyBuffer(dst, src, *buf)
+	copyPool.Put(buf)
+	return
+}
+
 // PreserveTrailingDotOrSeparator returns the given cleaned path (after
 // processing using any utility functions from the path or filepath stdlib
 // packages) and appends a trailing `/.` or `/` if its corresponding original
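Note that `copyPool` stores `*[]byte` rather than `[]byte`. Converting a slice to `interface{}` in `Put` heap-allocates a fresh slice header on every round trip (the pattern staticcheck flags as SA6002), while a pointer fits in the interface word without allocating. A sketch that makes the difference visible (the printed allocation counts are typical, not guaranteed):

```go
package main

import (
	"fmt"
	"sync"
	"testing"
)

func main() {
	direct := sync.Pool{New: func() interface{} { return make([]byte, 32*1024) }}
	indirect := sync.Pool{New: func() interface{} { b := make([]byte, 32*1024); return &b }}

	// Putting a []byte into the pool converts it to interface{}, which
	// copies the slice header to the heap on every call.
	fmt.Println(testing.AllocsPerRun(100, func() {
		direct.Put(direct.Get().([]byte))
	})) // typically 1

	// A *[]byte is a single pointer word and round-trips allocation-free.
	fmt.Println(testing.AllocsPerRun(100, func() {
		indirect.Put(indirect.Get().(*[]byte))
	})) // typically 0
}
```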
6 changes: 1 addition & 5 deletions pkg/archive/diff.go
@@ -11,16 +11,13 @@ import (
 	"strings"

 	"github.com/containerd/log"
-	"github.com/docker/docker/pkg/pools"
 )

 // UnpackLayer unpack `layer` to a `dest`. The stream `layer` can be
 // compressed or uncompressed.
 // Returns the size in bytes of the contents of the layer.
 func UnpackLayer(dest string, layer io.Reader, options *TarOptions) (size int64, err error) {
 	tr := tar.NewReader(layer)
-	trBuf := pools.BufioReader32KPool.Get(tr)
-	defer pools.BufioReader32KPool.Put(trBuf)

 	var dirs []*tar.Header
 	unpackedPaths := make(map[string]struct{})
@@ -159,8 +156,7 @@ func UnpackLayer(dest string, layer io.Reader, options *TarOptions) (size int64, err error) {
 			}
 		}

-		trBuf.Reset(tr)
-		srcData := io.Reader(trBuf)
+		srcData := io.Reader(tr)
 		srcHdr := hdr

 		// Hard links into /.wh..wh.plnk don't work, as we don't extract that directory, so
2 changes: 2 additions & 0 deletions pkg/chrootarchive/archive_unix_test.go
@@ -67,6 +67,8 @@ func TestUntarWithMaliciousSymlinks(t *testing.T) {
 	assert.NilError(t, err)
 	assert.Equal(t, string(hostData), "I am a host file")

+	io.Copy(io.Discard, tee)
+
 	// Now test by chrooting to an attacker controlled path
 	// This should succeed as is and overwrite a "host" file
 	// Note that this would be a mis-use of this function.
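The added `io.Copy(io.Discard, tee)` drains the `TeeReader`: a tee writes to its side writer only as bytes are read through it, so draining it here presumably ensures the full stream has reached the tee'd copy before the test reuses it. A minimal demonstration of that property:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	var copied bytes.Buffer
	tee := io.TeeReader(strings.NewReader("0123456789"), &copied)

	// Read only half the stream: the tee has only copied that half so far.
	half := make([]byte, 5)
	io.ReadFull(tee, half)
	fmt.Println(copied.String()) // 01234

	// Draining the tee pushes the remainder through to the writer.
	io.Copy(io.Discard, tee)
	fmt.Println(copied.String()) // 0123456789
}
```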
