Skip to content

Commit f3cfe41

Browse files
committed
update
1 parent b774c19 commit f3cfe41

File tree

6 files changed

+184
-11
lines changed

6 files changed

+184
-11
lines changed

parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,27 @@ private static class ColumnIOCreatorVisitor implements TypeVisitor {
3939
private final boolean validating;
4040
private final MessageType requestedSchema;
4141
private final String createdBy;
42+
private final boolean strictUnsignedIntegerValidation;
4243
private int currentRequestedIndex;
4344
private Type currentRequestedType;
4445
private boolean strictTypeChecking;
4546

4647
private ColumnIOCreatorVisitor(
47-
boolean validating, MessageType requestedSchema, String createdBy, boolean strictTypeChecking) {
48+
boolean validating,
49+
MessageType requestedSchema,
50+
String createdBy,
51+
boolean strictTypeChecking,
52+
boolean strictUnsignedIntegerValidation) {
4853
this.validating = validating;
4954
this.requestedSchema = requestedSchema;
5055
this.createdBy = createdBy;
5156
this.strictTypeChecking = strictTypeChecking;
57+
this.strictUnsignedIntegerValidation = strictUnsignedIntegerValidation;
5258
}
5359

5460
@Override
5561
public void visit(MessageType messageType) {
56-
columnIO = new MessageColumnIO(requestedSchema, validating, createdBy);
62+
columnIO = new MessageColumnIO(requestedSchema, validating, strictUnsignedIntegerValidation, createdBy);
5763
visitChildren(columnIO, messageType, requestedSchema);
5864
columnIO.setLevels();
5965
columnIO.setLeaves(leaves);
@@ -113,12 +119,13 @@ public MessageColumnIO getColumnIO() {
113119

114120
private final String createdBy;
115121
private final boolean validating;
122+
private final boolean strictUnsignedIntegerValidation;
116123

117124
/**
118125
* validation is off by default
119126
*/
120127
public ColumnIOFactory() {
121-
this(null, false);
128+
this(null, false, false);
122129
}
123130

124131
/**
@@ -127,24 +134,42 @@ public ColumnIOFactory() {
127134
* @param createdBy createdBy string for readers
128135
*/
129136
public ColumnIOFactory(String createdBy) {
130-
this(createdBy, false);
137+
this(createdBy, false, false);
131138
}
132139

133140
/**
134141
* @param validating to turn validation on
135142
*/
136143
public ColumnIOFactory(boolean validating) {
137-
this(null, validating);
144+
this(null, validating, false);
145+
}
146+
147+
/**
148+
* @param validating to turn validation on
149+
* @param strictUnsignedIntegerValidation to turn strict unsigned integer validation on (only applied when validating is true)
150+
*/
151+
public ColumnIOFactory(boolean validating, boolean strictUnsignedIntegerValidation) {
152+
this(null, validating, strictUnsignedIntegerValidation);
138153
}
139154

140155
/**
141156
* @param createdBy createdBy string for readers
142157
* @param validating to turn validation on
143158
*/
144159
public ColumnIOFactory(String createdBy, boolean validating) {
160+
this(createdBy, validating, false);
161+
}
162+
163+
/**
164+
* @param createdBy createdBy string for readers
165+
* @param validating to turn validation on
166+
* @param strictUnsignedIntegerValidation to turn strict unsigned integer validation on (only applied when validating is true)
167+
*/
168+
public ColumnIOFactory(String createdBy, boolean validating, boolean strictUnsignedIntegerValidation) {
145169
super();
146170
this.createdBy = createdBy;
147171
this.validating = validating;
172+
this.strictUnsignedIntegerValidation = strictUnsignedIntegerValidation;
148173
}
149174

150175
/**
@@ -163,7 +188,8 @@ public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType file
163188
* @return the corresponding serializing/deserializing structure
164189
*/
165190
public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema, boolean strict) {
166-
ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema, createdBy, strict);
191+
ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(
192+
validating, requestedSchema, createdBy, strict, strictUnsignedIntegerValidation);
167193
fileSchema.accept(visitor);
168194
return visitor.getColumnIO();
169195
}

parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,18 @@ public class MessageColumnIO extends GroupColumnIO {
6161
private List<PrimitiveColumnIO> leaves;
6262

6363
private final boolean validating;
64+
private final boolean strictUnsignedIntegerValidation;
6465
private final String createdBy;
6566

6667
MessageColumnIO(MessageType messageType, boolean validating, String createdBy) {
68+
this(messageType, validating, false, createdBy);
69+
}
70+
71+
MessageColumnIO(
72+
MessageType messageType, boolean validating, boolean strictUnsignedIntegerValidation, String createdBy) {
6773
super(messageType, null, 0);
6874
this.validating = validating;
75+
this.strictUnsignedIntegerValidation = strictUnsignedIntegerValidation;
6976
this.createdBy = createdBy;
7077
}
7178

@@ -508,7 +515,9 @@ public void flush() {
508515
public RecordConsumer getRecordWriter(ColumnWriteStore columns) {
509516
RecordConsumer recordWriter = new MessageColumnIORecordConsumer(columns);
510517
if (DEBUG) recordWriter = new RecordConsumerLoggingWrapper(recordWriter);
511-
return validating ? new ValidatingRecordConsumer(recordWriter, getType()) : recordWriter;
518+
return validating
519+
? new ValidatingRecordConsumer(recordWriter, getType(), strictUnsignedIntegerValidation)
520+
: recordWriter;
512521
}
513522

514523
void setLevels() {

parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class ValidatingRecordConsumer extends RecordConsumer {
5252
private static final int UINT_16_MAX_VALUE = 65535;
5353

5454
private final RecordConsumer delegate;
55+
private final boolean strictUnsignedIntegerValidation;
5556

5657
private Deque<Type> types = new ArrayDeque<>();
5758
private Deque<Integer> fields = new ArrayDeque<>();
@@ -63,7 +64,18 @@ public class ValidatingRecordConsumer extends RecordConsumer {
6364
* @param schema the schema to validate against
6465
*/
6566
public ValidatingRecordConsumer(RecordConsumer delegate, MessageType schema) {
67+
this(delegate, schema, false);
68+
}
69+
70+
/**
71+
* @param delegate the consumer to pass down the event to
72+
* @param schema the schema to validate against
73+
* @param strictUnsignedIntegerValidation whether addInteger/addLong should also run the unsigned integer checks (validateUnsignedInteger/validateUnsignedLong)
74+
*/
75+
public ValidatingRecordConsumer(
76+
RecordConsumer delegate, MessageType schema, boolean strictUnsignedIntegerValidation) {
6677
this.delegate = delegate;
78+
this.strictUnsignedIntegerValidation = strictUnsignedIntegerValidation;
6779
this.types.push(schema);
6880
}
6981

@@ -207,7 +219,9 @@ private void validate(PrimitiveTypeName... ptypes) {
207219
@Override
208220
public void addInteger(int value) {
209221
validate(INT32);
210-
validateUnsignedInteger(value);
222+
if (strictUnsignedIntegerValidation) {
223+
validateUnsignedInteger(value);
224+
}
211225
delegate.addInteger(value);
212226
}
213227

@@ -217,7 +231,9 @@ public void addInteger(int value) {
217231
@Override
218232
public void addLong(long value) {
219233
validate(INT64);
220-
validateUnsignedLong(value);
234+
if (strictUnsignedIntegerValidation) {
235+
validateUnsignedLong(value);
236+
}
221237
delegate.addLong(value);
222238
}
223239

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class InternalParquetRecordWriter<T> {
5353
private long nextRowGroupSize;
5454
private final BytesInputCompressor compressor;
5555
private final boolean validating;
56+
private final boolean strictUnsignedIntegerValidation;
5657
private final ParquetProperties props;
5758

5859
private boolean closed;
@@ -87,6 +88,28 @@ public InternalParquetRecordWriter(
8788
BytesInputCompressor compressor,
8889
boolean validating,
8990
ParquetProperties props) {
91+
this(
92+
parquetFileWriter,
93+
writeSupport,
94+
schema,
95+
extraMetaData,
96+
rowGroupSize,
97+
compressor,
98+
validating,
99+
false,
100+
props);
101+
}
102+
103+
public InternalParquetRecordWriter(
104+
ParquetFileWriter parquetFileWriter,
105+
WriteSupport<T> writeSupport,
106+
MessageType schema,
107+
Map<String, String> extraMetaData,
108+
long rowGroupSize,
109+
BytesInputCompressor compressor,
110+
boolean validating,
111+
boolean strictUnsignedIntegerValidation,
112+
ParquetProperties props) {
90113
this.parquetFileWriter = parquetFileWriter;
91114
this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null");
92115
this.schema = schema;
@@ -96,6 +119,7 @@ public InternalParquetRecordWriter(
96119
this.nextRowGroupSize = rowGroupSizeThreshold;
97120
this.compressor = compressor;
98121
this.validating = validating;
122+
this.strictUnsignedIntegerValidation = strictUnsignedIntegerValidation;
99123
this.props = props;
100124
this.fileEncryptor = parquetFileWriter.getEncryptor();
101125
this.rowGroupOrdinal = 0;
@@ -120,7 +144,7 @@ private void initStore() {
120144
bloomFilterWriteStore = columnChunkPageWriteStore;
121145

122146
columnStore = props.newColumnWriteStore(schema, pageStore, bloomFilterWriteStore);
123-
MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
147+
MessageColumnIO columnIO = new ColumnIOFactory(validating, strictUnsignedIntegerValidation).getColumnIO(schema);
124148
this.recordConsumer = columnIO.getRecordWriter(columnStore);
125149
writeSupport.prepareForWrite(recordConsumer);
126150
}

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
361361
new CodecFactory(conf, encodingProps.getPageSizeThreshold()),
362362
rowGroupSize,
363363
validating,
364+
false,
364365
conf,
365366
maxPaddingSize,
366367
encodingProps,
@@ -375,6 +376,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
375376
CompressionCodecFactory codecFactory,
376377
long rowGroupSize,
377378
boolean validating,
379+
boolean strictUnsignedIntegerValidation,
378380
ParquetConfiguration conf,
379381
int maxPaddingSize,
380382
ParquetProperties encodingProps,
@@ -417,7 +419,15 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
417419
}
418420

419421
this.writer = new InternalParquetRecordWriter<T>(
420-
fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps);
422+
fileWriter,
423+
writeSupport,
424+
schema,
425+
extraMetadata,
426+
rowGroupSize,
427+
compressor,
428+
validating,
429+
strictUnsignedIntegerValidation,
430+
encodingProps);
421431
}
422432

423433
public void write(T object) throws IOException {
@@ -474,6 +484,7 @@ public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
474484
private long rowGroupSize = DEFAULT_BLOCK_SIZE;
475485
private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT;
476486
private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED;
487+
private boolean strictUnsignedIntegerValidation = false;
477488
private ParquetProperties.Builder encodingPropsBuilder = ParquetProperties.builder();
478489

479490
protected Builder(Path path) {
@@ -715,6 +726,27 @@ public SELF withValidation(boolean enableValidation) {
715726
return self();
716727
}
717728

729+
/**
730+
* Enable strict unsigned integer validation for the constructed writer; it only takes effect when validation itself is enabled (see withValidation).
731+
*
732+
* @return this builder for method chaining.
733+
*/
734+
public SELF enableStrictUnsignedIntegerValidation() {
735+
this.strictUnsignedIntegerValidation = true;
736+
return self();
737+
}
738+
739+
/**
740+
* Enable or disable strict unsigned integer validation for the constructed writer; the setting is ignored unless validation itself is enabled (see withValidation).
741+
*
742+
* @param strictUnsignedIntegerValidation whether strict unsigned integer validation should be enabled
743+
* @return this builder for method chaining.
744+
*/
745+
public SELF withStrictUnsignedIntegerValidation(boolean strictUnsignedIntegerValidation) {
746+
this.strictUnsignedIntegerValidation = strictUnsignedIntegerValidation;
747+
return self();
748+
}
749+
718750
/**
719751
* Set the {@link WriterVersion format version} used by the constructed
720752
* writer.
@@ -978,6 +1010,7 @@ public ParquetWriter<T> build() throws IOException {
9781010
codecFactory,
9791011
rowGroupSize,
9801012
enableValidation,
1013+
strictUnsignedIntegerValidation,
9811014
conf,
9821015
maxPaddingSize,
9831016
encodingProps,

0 commit comments

Comments
 (0)