Skip to content

Commit

Permalink
fix: multipart replication with single part objects (minio#20895)
Browse files Browse the repository at this point in the history
x-amz-checksum-algorithm is not set, causing all multipart single-part objects
to fail to replicate going via sftp/FTP uploads.
  • Loading branch information
klauspost authored Feb 5, 2025
1 parent 7fa3e39 commit b8dde47
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 49 deletions.
2 changes: 1 addition & 1 deletion cmd/api-response.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ func generateInitiateMultipartUploadResponse(bucket, key, uploadID string) Initi

// generates CompleteMultipartUploadResponse for given bucket, key, location and ETag.
func generateCompleteMultipartUploadResponse(bucket, key, location string, oi ObjectInfo, h http.Header) CompleteMultipartUploadResponse {
cs := oi.decryptChecksums(0, h)
cs, _ := oi.decryptChecksums(0, h)
c := CompleteMultipartUploadResponse{
Location: location,
Bucket: bucket,
Expand Down
16 changes: 8 additions & 8 deletions cmd/batch-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,12 +622,12 @@ func (r BatchJobReplicateV1) writeAsArchive(ctx context.Context, objAPI ObjectLa
},
}

opts, err := batchReplicationOpts(ctx, "", gr.ObjInfo)
opts, _, err := batchReplicationOpts(ctx, "", gr.ObjInfo)
if err != nil {
batchLogIf(ctx, err)
continue
}

// TODO: I am not sure we read it back, but we aren't sending whether checksums are single/multipart.
for k, vals := range opts.Header() {
for _, v := range vals {
snowballObj.Headers.Add(k, v)
Expand Down Expand Up @@ -712,14 +712,14 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL
return err
}

putOpts, err := batchReplicationOpts(ctx, "", objInfo)
putOpts, isMP, err := batchReplicationOpts(ctx, "", objInfo)
if err != nil {
return err
}
if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 {
putOpts.Internal = miniogo.AdvancedPutOptions{}
}
if objInfo.isMultipart() {
if isMP {
if err := replicateObjectWithMultipart(ctx, c, tgtBucket, pathJoin(tgtPrefix, objInfo.Name), rd, objInfo, putOpts); err != nil {
return err
}
Expand Down Expand Up @@ -1576,19 +1576,19 @@ func (j *BatchJobRequest) load(ctx context.Context, api ObjectLayer, name string
return err
}

func batchReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) {
func batchReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, isMP bool, err error) {
// TODO: support custom storage class for remote replication
putOpts, err = putReplicationOpts(ctx, "", objInfo, 0)
putOpts, isMP, err = putReplicationOpts(ctx, "", objInfo)
if err != nil {
return putOpts, err
return putOpts, isMP, err
}
putOpts.Internal = miniogo.AdvancedPutOptions{
SourceVersionID: objInfo.VersionID,
SourceMTime: objInfo.ModTime,
SourceETag: objInfo.ETag,
ReplicationRequest: true,
}
return putOpts, nil
return putOpts, isMP, nil
}

// ListBatchJobs - lists all currently active batch jobs, optionally takes {jobType}
Expand Down
63 changes: 33 additions & 30 deletions cmd/bucket-replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ func (m caseInsensitiveMap) Lookup(key string) (string, bool) {
return "", false
}

func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, partNum int) (putOpts minio.PutObjectOptions, err error) {
func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts minio.PutObjectOptions, isMP bool, err error) {
meta := make(map[string]string)
isSSEC := crypto.SSEC.IsEncrypted(objInfo.UserDefined)

Expand All @@ -794,23 +794,22 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part
meta[k] = v
}
}
isMP = objInfo.isMultipart()
if len(objInfo.Checksum) > 0 {
// Add encrypted CRC to metadata for SSE-C objects.
if isSSEC {
meta[ReplicationSsecChecksumHeader] = base64.StdEncoding.EncodeToString(objInfo.Checksum)
} else {
if objInfo.isMultipart() && partNum > 0 {
for _, pi := range objInfo.Parts {
if pi.Number == partNum {
for k, v := range pi.Checksums { // for PutObjectPart
meta[k] = v
}
}
}
} else {
for k, v := range getCRCMeta(objInfo, 0, nil) { // for PutObject/NewMultipartUpload
meta[k] = v
}
cs, mp := getCRCMeta(objInfo, 0, nil)
// Set object checksum.
for k, v := range cs {
meta[k] = v
}
isMP = mp
if !objInfo.isMultipart() && cs[xhttp.AmzChecksumType] == xhttp.AmzChecksumTypeFullObject {
// For objects where checksum is full object, it will be the same.
// Therefore, we use the cheaper PutObject replication.
isMP = false
}
}
}
Expand Down Expand Up @@ -841,7 +840,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part
if tagTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp]; ok {
tagTimestamp, err = time.Parse(time.RFC3339Nano, tagTmstampStr)
if err != nil {
return putOpts, err
return putOpts, false, err
}
}
putOpts.Internal.TaggingTimestamp = tagTimestamp
Expand All @@ -865,15 +864,15 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part
if retainDateStr, ok := lkMap.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok {
rdate, err := amztime.ISO8601Parse(retainDateStr)
if err != nil {
return putOpts, err
return putOpts, false, err
}
putOpts.RetainUntilDate = rdate
// set retention timestamp in opts
retTimestamp := objInfo.ModTime
if retainTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp]; ok {
retTimestamp, err = time.Parse(time.RFC3339Nano, retainTmstampStr)
if err != nil {
return putOpts, err
return putOpts, false, err
}
}
putOpts.Internal.RetentionTimestamp = retTimestamp
Expand All @@ -885,7 +884,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part
if lholdTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp]; ok {
lholdTimestamp, err = time.Parse(time.RFC3339Nano, lholdTmstampStr)
if err != nil {
return putOpts, err
return putOpts, false, err
}
}
putOpts.Internal.LegalholdTimestamp = lholdTimestamp
Expand All @@ -897,7 +896,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part
if crypto.S3KMS.IsEncrypted(objInfo.UserDefined) {
sseEnc, err := encrypt.NewSSEKMS(objInfo.KMSKeyID(), nil)
if err != nil {
return putOpts, err
return putOpts, false, err
}
putOpts.ServerSideEncryption = sseEnc
}
Expand Down Expand Up @@ -1290,7 +1289,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
// use core client to avoid doing multipart on PUT
c := &minio.Core{Client: tgt.Client}

putOpts, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo, 0)
putOpts, isMP, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo)
if err != nil {
replLogIf(ctx, fmt.Errorf("failure setting options for replication bucket:%s err:%w", bucket, err))
sendEvent(eventArgs{
Expand Down Expand Up @@ -1322,7 +1321,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
defer cancel()
}
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
if objInfo.isMultipart() {
if isMP {
rinfo.Err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, r, objInfo, putOpts)
} else {
_, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts)
Expand Down Expand Up @@ -1577,8 +1576,7 @@ applyAction:
replLogIf(ctx, fmt.Errorf("unable to replicate metadata for object %s/%s(%s) to target %s: %w", bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err))
}
} else {
var putOpts minio.PutObjectOptions
putOpts, err = putReplicationOpts(ctx, tgt.StorageClass, objInfo, 0)
putOpts, isMP, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo)
if err != nil {
replLogIf(ctx, fmt.Errorf("failed to set replicate options for object %s/%s(%s) (target %s) err:%w", bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), err))
sendEvent(eventArgs{
Expand Down Expand Up @@ -1609,7 +1607,7 @@ applyAction:
defer cancel()
}
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
if objInfo.isMultipart() {
if isMP {
rinfo.Err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, r, objInfo, putOpts)
} else {
_, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts)
Expand Down Expand Up @@ -1687,7 +1685,8 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
cHeader := http.Header{}
cHeader.Add(xhttp.MinIOSourceReplicationRequest, "true")
if !isSSEC {
for k, v := range getCRCMeta(objInfo, partInfo.Number, nil) {
cs, _ := getCRCMeta(objInfo, partInfo.Number, nil)
for k, v := range cs {
cHeader.Add(k, v)
}
}
Expand Down Expand Up @@ -1732,8 +1731,9 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
defer ccancel()

if len(objInfo.Checksum) > 0 {
for k, v := range getCRCMeta(objInfo, 0, nil) {
userMeta[k] = v
cs, _ := getCRCMeta(objInfo, 0, nil)
for k, v := range cs {
userMeta[k] = strings.Split(v, "-")[0]
}
}
_, err = c.CompleteMultipartUpload(cctx, bucket, object, uploadID, uploadedParts, minio.PutObjectOptions{
Expand Down Expand Up @@ -3782,9 +3782,9 @@ type validateReplicationDestinationOptions struct {
checkReadyErr sync.Map
}

func getCRCMeta(oi ObjectInfo, partNum int, h http.Header) map[string]string {
func getCRCMeta(oi ObjectInfo, partNum int, h http.Header) (cs map[string]string, isMP bool) {
meta := make(map[string]string)
cs := oi.decryptChecksums(partNum, h)
cs, isMP = oi.decryptChecksums(partNum, h)
for k, v := range cs {
cksum := hash.NewChecksumString(k, v)
if cksum == nil {
Expand All @@ -3793,7 +3793,10 @@ func getCRCMeta(oi ObjectInfo, partNum int, h http.Header) map[string]string {
if cksum.Valid() {
meta[cksum.Type.Key()] = v
}
meta[xhttp.AmzChecksumType] = cksum.Type.ObjType()
if isMP && partNum == 0 {
meta[xhttp.AmzChecksumType] = cksum.Type.ObjType()
meta[xhttp.AmzChecksumAlgo] = cksum.Type.String()
}
}
return meta
return meta, isMP
}
9 changes: 5 additions & 4 deletions cmd/encryption-v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,16 +1155,17 @@ func (o *ObjectInfo) metadataEncryptFn(headers http.Header) (objectMetaEncryptFn

// decryptChecksums will attempt to decode checksums and return it/them if set.
// if part > 0, and we have the checksum for the part that will be returned.
func (o *ObjectInfo) decryptChecksums(part int, h http.Header) map[string]string {
// Returns whether the checksum (main part 0) is a multipart checksum.
func (o *ObjectInfo) decryptChecksums(part int, h http.Header) (cs map[string]string, isMP bool) {
data := o.Checksum
if len(data) == 0 {
return nil
return nil, false
}
if part > 0 && !crypto.SSEC.IsEncrypted(o.UserDefined) {
// already decrypted in ToObjectInfo for multipart objects
for _, pi := range o.Parts {
if pi.Number == part {
return pi.Checksums
return pi.Checksums, true
}
}
}
Expand All @@ -1174,7 +1175,7 @@ func (o *ObjectInfo) decryptChecksums(part int, h http.Header) map[string]string
if err != crypto.ErrSecretKeyMismatch {
encLogIf(GlobalContext, err)
}
return nil
return nil, part > 0
}
data = decrypted
}
Expand Down
1 change: 1 addition & 0 deletions cmd/ftp-server-driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ func (driver *ftpDriver) PutFile(ctx *ftp.Context, objPath string, data io.Reade
info, err := clnt.PutObject(context.Background(), bucket, object, data, -1, minio.PutObjectOptions{
ContentType: mimedb.TypeByExtension(path.Ext(object)),
DisableContentSha256: true,
Checksum: minio.ChecksumFullObjectCRC32C,
})
n = info.Size
return n, err
Expand Down
3 changes: 2 additions & 1 deletion cmd/object-handlers-common.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ func setPutObjHeaders(w http.ResponseWriter, objInfo ObjectInfo, del bool, h htt
lc.SetPredictionHeaders(w, objInfo.ToLifecycleOpts())
}
}
hash.AddChecksumHeader(w, objInfo.decryptChecksums(0, h))
cs, _ := objInfo.decryptChecksums(0, h)
hash.AddChecksumHeader(w, cs)
}

func deleteObjectVersions(ctx context.Context, o ObjectLayer, bucket string, toDel []ObjectToDelete, lcEvent lifecycle.Event) {
Expand Down
8 changes: 5 additions & 3 deletions cmd/object-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,8 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj

if r.Header.Get(xhttp.AmzChecksumMode) == "ENABLED" && rs == nil {
// AWS S3 silently drops checksums on range requests.
hash.AddChecksumHeader(w, objInfo.decryptChecksums(opts.PartNumber, r.Header))
cs, _ := objInfo.decryptChecksums(opts.PartNumber, r.Header)
hash.AddChecksumHeader(w, cs)
}

if err = setObjectHeaders(ctx, w, objInfo, rs, opts); err != nil {
Expand Down Expand Up @@ -632,7 +633,7 @@ func (api objectAPIHandlers) getObjectAttributesHandler(ctx context.Context, obj
w.Header().Del(xhttp.ContentType)

if _, ok := opts.ObjectAttributes[xhttp.Checksum]; ok {
chkSums := objInfo.decryptChecksums(0, r.Header)
chkSums, _ := objInfo.decryptChecksums(0, r.Header)
// AWS does not appear to append part number on this API call.
if len(chkSums) > 0 {
OA.Checksum = &objectAttributesChecksum{
Expand Down Expand Up @@ -945,7 +946,8 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob

if r.Header.Get(xhttp.AmzChecksumMode) == "ENABLED" && rs == nil {
// AWS S3 silently drops checksums on range requests.
hash.AddChecksumHeader(w, objInfo.decryptChecksums(opts.PartNumber, r.Header))
cs, _ := objInfo.decryptChecksums(opts.PartNumber, r.Header)
hash.AddChecksumHeader(w, cs)
}

// Set standard object headers.
Expand Down
1 change: 1 addition & 0 deletions cmd/sftp-server-driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) {
oi, err := clnt.PutObject(r.Context(), bucket, object, pr, -1, minio.PutObjectOptions{
ContentType: mimedb.TypeByExtension(path.Ext(object)),
DisableContentSha256: true,
Checksum: minio.ChecksumFullObjectCRC32C,
})
stopFn(oi.Size, err)
pr.CloseWithError(err)
Expand Down
25 changes: 23 additions & 2 deletions internal/hash/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,24 @@ func (c ChecksumType) String() string {
return "invalid"
}

// StringFull returns the type and all flags as a string.
func (c ChecksumType) StringFull() string {
out := []string{c.String()}
if c.Is(ChecksumMultipart) {
out = append(out, "MULTIPART")
}
if c.Is(ChecksumIncludesMultipart) {
out = append(out, "INCLUDESMP")
}
if c.Is(ChecksumTrailing) {
out = append(out, "TRAILING")
}
if c.Is(ChecksumFullObject) {
out = append(out, "FULLOBJ")
}
return strings.Join(out, "|")
}

// FullObjectRequested will return if the checksum type indicates full object checksum was requested.
func (c ChecksumType) FullObjectRequested() bool {
return c&(ChecksumFullObject) == ChecksumFullObject || c.Is(ChecksumCRC64NVME)
Expand Down Expand Up @@ -263,7 +281,8 @@ func NewChecksumFromData(t ChecksumType, data []byte) *Checksum {
}

// ReadCheckSums will read checksums from b and return them.
func ReadCheckSums(b []byte, part int) map[string]string {
// Returns whether this is (part of) a multipart checksum.
func ReadCheckSums(b []byte, part int) (cs map[string]string, isMP bool) {
res := make(map[string]string, 1)
for len(b) > 0 {
t, n := binary.Uvarint(b)
Expand All @@ -277,9 +296,11 @@ func ReadCheckSums(b []byte, part int) map[string]string {
if length == 0 || len(b) < length {
break
}

cs := base64.StdEncoding.EncodeToString(b[:length])
b = b[length:]
if typ.Is(ChecksumMultipart) {
isMP = true
t, n := binary.Uvarint(b)
if n < 0 {
break
Expand Down Expand Up @@ -317,7 +338,7 @@ func ReadCheckSums(b []byte, part int) map[string]string {
if len(res) == 0 {
res = nil
}
return res
return res, isMP
}

// ReadPartCheckSums will read all part checksums from b and return them.
Expand Down

0 comments on commit b8dde47

Please sign in to comment.