Skip to content

Commit 454a93a

Browse files
committed
TC-1800 Manage CompleteMultipartUpload events
Signed-off-by: mrizzi <[email protected]>
1 parent d03019f commit 454a93a

File tree

5 files changed

+44
-5
lines changed

5 files changed

+44
-5
lines changed

pkg/handler/collector/s3/messaging/kafka.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ type KafkaMessage struct {
4646
func (m *KafkaMessage) GetEvent() (EventName, error) {
4747
if m.EventName == "s3:ObjectCreated:Put" {
4848
return PUT, nil
49+
} else if m.EventName == "s3:ObjectCreated:CompleteMultipartUpload" {
50+
return CompleteMultipartUpload, nil
4951
}
5052
return "", nil
5153
}

pkg/handler/collector/s3/messaging/message.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ import "context"
2020
type EventName string
2121

2222
const (
23-
PUT EventName = "PUT"
23+
PUT EventName = "PUT"
24+
CompleteMultipartUpload EventName = "CompleteMultipartUpload"
2425
)
2526

2627
// Message A generic message related to an S3 bucket and item

pkg/handler/collector/s3/messaging/sqs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ func (m *SqsMessage) GetEvent() (EventName, error) {
6363

6464
if m.Records[0].EventName == "ObjectCreated:Put" {
6565
return PUT, nil
66+
} else if m.Records[0].EventName == "ObjectCreated:CompleteMultipartUpload" {
67+
return CompleteMultipartUpload, nil
6668
}
6769

6870
return "", nil

pkg/handler/collector/s3/s3.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,10 +210,11 @@ func retrieveWithPoll(s S3Collector, ctx context.Context, docChannel chan<- *pro
210210
continue
211211
}
212212

213-
if e, er := m.GetEvent(); e != messaging.PUT {
213+
if e, er := m.GetEvent(); e != messaging.PUT && e != messaging.CompleteMultipartUpload {
214214
if er != nil {
215215
logger.Debugf("skipping message: %v\n", er)
216216
}
217+
logger.Infof("skipping event: %v\n", e)
217218
continue
218219
}
219220
bucketName, err := m.GetBucket()

pkg/handler/collector/s3/s3_test.go

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,29 @@ func (t *TestProvider) Close(ctx context.Context) error {
7272
return nil
7373
}
7474

75+
// Test Multipart Message Provider
76+
type TestMultipartProvider struct {
77+
queue string
78+
}
79+
80+
func NewTestMultipartProvider(queue string) TestMultipartProvider {
81+
return TestMultipartProvider{queue}
82+
}
83+
84+
func (t *TestMultipartProvider) ReceiveMessage(context.Context) (messaging.Message, error) {
85+
time.Sleep(2 * time.Second)
86+
87+
return &TestMessage{
88+
item: "test-message",
89+
bucket: t.queue,
90+
event: messaging.CompleteMultipartUpload,
91+
}, nil
92+
}
93+
94+
func (t *TestMultipartProvider) Close(ctx context.Context) error {
95+
return nil
96+
}
97+
7598
// Test Message Provider builder
7699
type TestMpBuilder struct {
77100
}
@@ -81,6 +104,15 @@ func (tb *TestMpBuilder) GetMessageProvider(config messaging.MessageProviderConf
81104
return &provider, nil
82105
}
83106

107+
// Test Message Provider builder
108+
type TestMpMultipartBuilder struct {
109+
}
110+
111+
func (tb *TestMpMultipartBuilder) GetMessageProvider(config messaging.MessageProviderConfig) (messaging.MessageProvider, error) {
112+
provider := NewTestMultipartProvider(config.Queue)
113+
return &provider, nil
114+
}
115+
84116
// Test Bucket
85117
type TestBucket struct {
86118
}
@@ -108,13 +140,14 @@ func TestS3Collector(t *testing.T) {
108140
ctx := context.Background()
109141

110142
t.Run("no polling", func(t *testing.T) { testNoPolling(t, ctx) })
111-
t.Run("queues split polling", func(t *testing.T) { testQueuesSplitPolling(t, ctx) })
143+
t.Run("queues split polling", func(t *testing.T) { testQueuesSplitPolling(t, ctx, &TestMpBuilder{}) })
144+
t.Run("multipart queues split polling", func(t *testing.T) { testQueuesSplitPolling(t, ctx, &TestMpMultipartBuilder{}) })
112145
}
113146

114-
func testQueuesSplitPolling(t *testing.T, ctx context.Context) {
147+
func testQueuesSplitPolling(t *testing.T, ctx context.Context, mpBuilder messaging.MessageProviderBuilder) {
115148
s3Collector := NewS3Collector(S3CollectorConfig{
116149
Queues: "q1,q2",
117-
MpBuilder: &TestMpBuilder{},
150+
MpBuilder: mpBuilder,
118151
BucketBuilder: &TestBucketBuilder{},
119152
Poll: true,
120153
})

0 commit comments

Comments
 (0)