Skip to content

Commit d27f5e8

Browse files
committed
Add EmptyMatchTreatment support for SMB reads (#5759)
Allows SMB reads to handle empty directories by specifying EmptyMatchTreatment.ALLOW, enabling pipelines to continue execution when no files match the input pattern. Closes #5759
1 parent 34bd46d commit d27f5e8

File tree

10 files changed

+321
-19
lines changed

10 files changed

+321
-19
lines changed

scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.beam.sdk.extensions.smb.SortedBucketSource.Predicate;
3939
import org.apache.beam.sdk.extensions.smb.SortedBucketTransform.NewBucketMetadataFn;
4040
import org.apache.beam.sdk.io.FileSystems;
41+
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
4142
import org.apache.beam.sdk.io.fs.ResourceId;
4243
import org.apache.beam.sdk.values.TupleTag;
4344
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -218,6 +219,9 @@ public abstract static class Read<T extends IndexedRecord> extends SortedBucketI
218219
@Nullable
219220
abstract Predicate<T> getPredicate();
220221

222+
@Nullable
223+
abstract EmptyMatchTreatment getEmptyMatchTreatment();
224+
221225
abstract Builder<T> toBuilder();
222226

223227
@AutoValue.Builder
@@ -236,6 +240,8 @@ abstract static class Builder<T extends IndexedRecord> {
236240

237241
abstract Builder<T> setPredicate(Predicate<T> predicate);
238242

243+
abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
244+
239245
abstract Read<T> build();
240246
}
241247

@@ -264,6 +270,11 @@ public Read<T> withPredicate(Predicate<T> predicate) {
264270
return toBuilder().setPredicate(predicate).build();
265271
}
266272

273+
/** Specifies how to handle empty file matches when reading input directories. */
274+
public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment) {
275+
return toBuilder().setEmptyMatchTreatment(emptyMatchTreatment).build();
276+
}
277+
267278
@SuppressWarnings("unchecked")
268279
FileOperations<T> getFileOperations() {
269280
return AvroFileOperations.of(getDatumFactory(), getSchema());
@@ -278,7 +289,8 @@ public SortedBucketSource.BucketedInput<T> toBucketedInput(
278289
getInputDirectories(),
279290
getFilenameSuffix(),
280291
getFileOperations(),
281-
getPredicate());
292+
getPredicate(),
293+
getEmptyMatchTreatment());
282294
}
283295
}
284296

scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonSortedBucketIO.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.beam.sdk.extensions.smb.SortedBucketTransform.NewBucketMetadataFn;
3232
import org.apache.beam.sdk.io.Compression;
3333
import org.apache.beam.sdk.io.FileSystems;
34+
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
3435
import org.apache.beam.sdk.io.fs.ResourceId;
3536
import org.apache.beam.sdk.values.TupleTag;
3637
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -118,6 +119,9 @@ public abstract static class Read extends SortedBucketIO.Read<TableRow> {
118119
@Nullable
119120
abstract Predicate<TableRow> getPredicate();
120121

122+
@Nullable
123+
abstract EmptyMatchTreatment getEmptyMatchTreatment();
124+
121125
abstract Builder toBuilder();
122126

123127
@AutoValue.Builder
@@ -134,6 +138,8 @@ abstract static class Builder {
134138

135139
abstract Builder setPredicate(Predicate<TableRow> predicate);
136140

141+
abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
142+
137143
abstract Read build();
138144
}
139145

@@ -156,6 +162,19 @@ public Read withPredicate(Predicate<TableRow> predicate) {
156162
return toBuilder().setPredicate(predicate).build();
157163
}
158164

165+
/**
166+
* Specifies how to handle empty file matches when reading input directories.
167+
*
168+
* @param emptyMatchTreatment the behavior when file patterns don't match any files. Use
169+
* {@link EmptyMatchTreatment#ALLOW} to proceed with empty inputs, {@link
170+
* EmptyMatchTreatment#DISALLOW} to fail on empty matches, or {@link
171+
* EmptyMatchTreatment#ALLOW_IF_WILDCARD} to allow empty matches only for wildcard patterns.
172+
* @return a new {@link Read} instance with the specified empty match treatment
173+
*/
174+
public Read withEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment) {
175+
return toBuilder().setEmptyMatchTreatment(emptyMatchTreatment).build();
176+
}
177+
159178
FileOperations<TableRow> getFileOperations() {
160179
return JsonFileOperations.of(getCompression());
161180
}
@@ -168,7 +187,8 @@ public BucketedInput<TableRow> toBucketedInput(final SortedBucketSource.Keying k
168187
getInputDirectories(),
169188
getFilenameSuffix(),
170189
getFileOperations(),
171-
getPredicate());
190+
getPredicate(),
191+
getEmptyMatchTreatment());
172192
}
173193
}
174194

scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIO.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.beam.sdk.extensions.smb.SortedBucketSource.Predicate;
3535
import org.apache.beam.sdk.extensions.smb.SortedBucketTransform.NewBucketMetadataFn;
3636
import org.apache.beam.sdk.io.FileSystems;
37+
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
3738
import org.apache.beam.sdk.io.fs.ResourceId;
3839
import org.apache.beam.sdk.values.TupleTag;
3940
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -222,6 +223,9 @@ public abstract static class Read<T extends IndexedRecord> extends SortedBucketI
222223
@Nullable
223224
abstract Schema getProjection();
224225

226+
@Nullable
227+
abstract EmptyMatchTreatment getEmptyMatchTreatment();
228+
225229
abstract Builder<T> toBuilder();
226230

227231
@AutoValue.Builder
@@ -246,6 +250,8 @@ abstract static class Builder<T extends IndexedRecord> {
246250

247251
abstract Builder<T> setProjection(Schema projection);
248252

253+
abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
254+
249255
abstract Read<T> build();
250256
}
251257

@@ -283,6 +289,19 @@ public Read<T> withProjection(Schema projection) {
283289
return toBuilder().setProjection(projection).build();
284290
}
285291

292+
/**
293+
* Specifies how to handle empty file matches when reading input directories.
294+
*
295+
* @param emptyMatchTreatment the behavior when file patterns don't match any files. Use
296+
* {@link EmptyMatchTreatment#ALLOW} to proceed with empty inputs, {@link
297+
* EmptyMatchTreatment#DISALLOW} to fail on empty matches, or {@link
298+
* EmptyMatchTreatment#ALLOW_IF_WILDCARD} to allow empty matches only for wildcard patterns.
299+
* @return a new {@link Read} instance with the specified empty match treatment
300+
*/
301+
public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment) {
302+
return toBuilder().setEmptyMatchTreatment(emptyMatchTreatment).build();
303+
}
304+
286305
@Override
287306
public BucketedInput<T> toBucketedInput(final SortedBucketSource.Keying keying) {
288307
return BucketedInput.of(
@@ -291,7 +310,8 @@ public BucketedInput<T> toBucketedInput(final SortedBucketSource.Keying keying)
291310
getInputDirectories(),
292311
getFilenameSuffix(),
293312
getFileOperations(),
294-
getPredicate());
313+
getPredicate(),
314+
getEmptyMatchTreatment());
295315
}
296316

297317
FileOperations<T> getFileOperations() {

scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java

Lines changed: 91 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,13 @@
4141
import java.util.regex.Pattern;
4242
import java.util.stream.Collectors;
4343
import java.util.stream.IntStream;
44+
import javax.annotation.Nullable;
4445
import org.apache.beam.sdk.coders.Coder;
4546
import org.apache.beam.sdk.coders.KvCoder;
4647
import org.apache.beam.sdk.extensions.smb.BucketMetadataUtil.SourceMetadata;
4748
import org.apache.beam.sdk.io.BoundedSource;
4849
import org.apache.beam.sdk.io.FileSystems;
50+
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
4951
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
5052
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
5153
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -348,20 +350,39 @@ public PrimaryKeyedBucketedInput(
348350
String filenameSuffix,
349351
FileOperations<V> fileOperations,
350352
Predicate<V> predicate) {
353+
this(tupleTag, inputDirectories, filenameSuffix, fileOperations, predicate, EmptyMatchTreatment.DISALLOW);
354+
}
355+
356+
public PrimaryKeyedBucketedInput(
357+
TupleTag<V> tupleTag,
358+
List<String> inputDirectories,
359+
String filenameSuffix,
360+
FileOperations<V> fileOperations,
361+
Predicate<V> predicate,
362+
EmptyMatchTreatment emptyMatchTreatment) {
351363
this(
352364
tupleTag,
353365
inputDirectories.stream()
354366
.collect(
355367
Collectors.toMap(
356368
Functions.identity(), dir -> KV.of(filenameSuffix, fileOperations))),
357-
predicate);
369+
predicate,
370+
emptyMatchTreatment);
358371
}
359372

360373
public PrimaryKeyedBucketedInput(
361374
TupleTag<V> tupleTag,
362375
Map<String, KV<String, FileOperations<V>>> directories,
363376
Predicate<V> predicate) {
364-
super(Keying.PRIMARY, tupleTag, directories, predicate);
377+
this(tupleTag, directories, predicate, EmptyMatchTreatment.DISALLOW);
378+
}
379+
380+
public PrimaryKeyedBucketedInput(
381+
TupleTag<V> tupleTag,
382+
Map<String, KV<String, FileOperations<V>>> directories,
383+
Predicate<V> predicate,
384+
EmptyMatchTreatment emptyMatchTreatment) {
385+
super(Keying.PRIMARY, tupleTag, directories, predicate, emptyMatchTreatment);
365386
}
366387

367388
public SourceMetadata<V> getSourceMetadata() {
@@ -378,20 +399,39 @@ public PrimaryAndSecondaryKeyedBucktedInput(
378399
String filenameSuffix,
379400
FileOperations<V> fileOperations,
380401
Predicate<V> predicate) {
402+
this(tupleTag, inputDirectories, filenameSuffix, fileOperations, predicate, EmptyMatchTreatment.DISALLOW);
403+
}
404+
405+
public PrimaryAndSecondaryKeyedBucktedInput(
406+
TupleTag<V> tupleTag,
407+
List<String> inputDirectories,
408+
String filenameSuffix,
409+
FileOperations<V> fileOperations,
410+
Predicate<V> predicate,
411+
EmptyMatchTreatment emptyMatchTreatment) {
381412
this(
382413
tupleTag,
383414
inputDirectories.stream()
384415
.collect(
385416
Collectors.toMap(
386417
Functions.identity(), dir -> KV.of(filenameSuffix, fileOperations))),
387-
predicate);
418+
predicate,
419+
emptyMatchTreatment);
388420
}
389421

390422
public PrimaryAndSecondaryKeyedBucktedInput(
391423
TupleTag<V> tupleTag,
392424
Map<String, KV<String, FileOperations<V>>> directories,
393425
Predicate<V> predicate) {
394-
super(Keying.PRIMARY_AND_SECONDARY, tupleTag, directories, predicate);
426+
this(tupleTag, directories, predicate, EmptyMatchTreatment.DISALLOW);
427+
}
428+
429+
public PrimaryAndSecondaryKeyedBucktedInput(
430+
TupleTag<V> tupleTag,
431+
Map<String, KV<String, FileOperations<V>>> directories,
432+
Predicate<V> predicate,
433+
EmptyMatchTreatment emptyMatchTreatment) {
434+
super(Keying.PRIMARY_AND_SECONDARY, tupleTag, directories, predicate, emptyMatchTreatment);
395435
}
396436

397437
public SourceMetadata<V> getSourceMetadata() {
@@ -413,6 +453,7 @@ public abstract static class BucketedInput<V> implements Serializable {
413453
protected TupleTag<V> tupleTag;
414454
protected Predicate<V> predicate;
415455
protected Keying keying;
456+
protected EmptyMatchTreatment emptyMatchTreatment;
416457
// lazy, internal checks depend on what kind of iteration is requested
417458
protected transient SourceMetadata<V> sourceMetadata = null; // lazy
418459

@@ -427,21 +468,43 @@ public static <V> BucketedInput<V> of(
427468
String filenameSuffix,
428469
FileOperations<V> fileOperations,
429470
Predicate<V> predicate) {
471+
return of(keying, tupleTag, inputDirectories, filenameSuffix, fileOperations, predicate, null);
472+
}
473+
474+
public static <V> BucketedInput<V> of(
475+
Keying keying,
476+
TupleTag<V> tupleTag,
477+
List<String> inputDirectories,
478+
String filenameSuffix,
479+
FileOperations<V> fileOperations,
480+
Predicate<V> predicate,
481+
@Nullable EmptyMatchTreatment emptyMatchTreatment) {
482+
EmptyMatchTreatment treatment = emptyMatchTreatment != null ? emptyMatchTreatment : EmptyMatchTreatment.DISALLOW;
430483
if (keying == Keying.PRIMARY)
431484
return new PrimaryKeyedBucketedInput<>(
432-
tupleTag, inputDirectories, filenameSuffix, fileOperations, predicate);
485+
tupleTag, inputDirectories, filenameSuffix, fileOperations, predicate, treatment);
433486
return new PrimaryAndSecondaryKeyedBucktedInput<>(
434-
tupleTag, inputDirectories, filenameSuffix, fileOperations, predicate);
487+
tupleTag, inputDirectories, filenameSuffix, fileOperations, predicate, treatment);
435488
}
436489

437490
public static <V> BucketedInput<V> of(
438491
Keying keying,
439492
TupleTag<V> tupleTag,
440493
Map<String, KV<String, FileOperations<V>>> directories,
441494
Predicate<V> predicate) {
495+
return of(keying, tupleTag, directories, predicate, null);
496+
}
497+
498+
public static <V> BucketedInput<V> of(
499+
Keying keying,
500+
TupleTag<V> tupleTag,
501+
Map<String, KV<String, FileOperations<V>>> directories,
502+
Predicate<V> predicate,
503+
@Nullable EmptyMatchTreatment emptyMatchTreatment) {
504+
EmptyMatchTreatment treatment = emptyMatchTreatment != null ? emptyMatchTreatment : EmptyMatchTreatment.DISALLOW;
442505
if (keying == Keying.PRIMARY)
443-
return new PrimaryKeyedBucketedInput<>(tupleTag, directories, predicate);
444-
return new PrimaryAndSecondaryKeyedBucktedInput<>(tupleTag, directories, predicate);
506+
return new PrimaryKeyedBucketedInput<>(tupleTag, directories, predicate, treatment);
507+
return new PrimaryAndSecondaryKeyedBucktedInput<>(tupleTag, directories, predicate, treatment);
445508
}
446509

447510
public BucketedInput(
@@ -458,16 +521,27 @@ public BucketedInput(
458521
.collect(
459522
Collectors.toMap(
460523
Functions.identity(), dir -> KV.of(filenameSuffix, fileOperations))),
461-
predicate);
524+
predicate,
525+
EmptyMatchTreatment.DISALLOW);
462526
}
463527

464528
public BucketedInput(
465529
Keying keying,
466530
TupleTag<V> tupleTag,
467531
Map<String, KV<String, FileOperations<V>>> directories,
468532
Predicate<V> predicate) {
533+
this(keying, tupleTag, directories, predicate, EmptyMatchTreatment.DISALLOW);
534+
}
535+
536+
public BucketedInput(
537+
Keying keying,
538+
TupleTag<V> tupleTag,
539+
Map<String, KV<String, FileOperations<V>>> directories,
540+
Predicate<V> predicate,
541+
EmptyMatchTreatment emptyMatchTreatment) {
469542
this.keying = keying;
470543
this.tupleTag = tupleTag;
544+
this.emptyMatchTreatment = emptyMatchTreatment;
471545
this.inputs =
472546
directories.entrySet().stream()
473547
.collect(
@@ -537,9 +611,14 @@ static CoGbkResultSchema schemaOf(List<BucketedInput<?>> sources) {
537611
}
538612

539613
private static List<Metadata> sampleDirectory(ResourceId directory, String filepattern) {
614+
return sampleDirectory(directory, filepattern, EmptyMatchTreatment.DISALLOW);
615+
}
616+
617+
private static List<Metadata> sampleDirectory(ResourceId directory, String filepattern, EmptyMatchTreatment emptyMatchTreatment) {
540618
try {
541619
return FileSystems.match(
542-
directory.resolve(filepattern, StandardResolveOptions.RESOLVE_FILE).toString())
620+
directory.resolve(filepattern, StandardResolveOptions.RESOLVE_FILE).toString(),
621+
emptyMatchTreatment)
543622
.metadata();
544623
} catch (FileNotFoundException e) {
545624
return Collections.emptyList();
@@ -559,11 +638,11 @@ long getOrSampleByteSize() {
559638
final String filenameSuffix = entry.getValue().getKey();
560639
final ResourceId directory = entry.getKey();
561640
List<Metadata> sampledFiles =
562-
sampleDirectory(directory, "*-0000?-of-?????" + filenameSuffix);
641+
sampleDirectory(directory, "*-0000?-of-?????" + filenameSuffix, emptyMatchTreatment);
563642
if (sampledFiles.isEmpty()) {
564643
sampledFiles =
565644
sampleDirectory(
566-
directory, "*-0000?-of-*-shard-00000-of-?????" + filenameSuffix);
645+
directory, "*-0000?-of-*-shard-00000-of-?????" + filenameSuffix, emptyMatchTreatment);
567646
}
568647

569648
int numBuckets = 0;

0 commit comments

Comments
 (0)