Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

package com.apple.foundationdb.record.lucene.codec;

import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.annotation.SpotBugsSuppressWarnings;
import com.apple.foundationdb.record.RecordCoreException;
import com.apple.foundationdb.record.lucene.LuceneExceptions;
Expand Down Expand Up @@ -65,10 +64,10 @@ public LuceneOptimizedStoredFieldsReader(final FDBDirectory directory, final Seg

public static List<byte[]> getPrimaryKeys(final String segmentName, final FDBDirectory directory) throws IOException {
try {
final List<KeyValue> rawStoredFields = directory.readAllStoredFields(segmentName);
final List<byte[]> rawStoredFields = directory.readAllStoredFields(segmentName);
List<byte[]> primaryKeys = new ArrayList<>();
for (final KeyValue rawStoredField : rawStoredFields) {
final var storedFields = LuceneStoredFieldsProto.LuceneStoredFields.parseFrom(rawStoredField.getValue());
for (final byte[] rawStoredField : rawStoredFields) {
final var storedFields = LuceneStoredFieldsProto.LuceneStoredFields.parseFrom(rawStoredField);
primaryKeys.add(storedFields.getPrimaryKey().toByteArray());
}
return primaryKeys;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.CRC32;

Expand Down Expand Up @@ -338,11 +339,12 @@ public void setFieldInfoId(final String filename, final long id, final ByteStrin
writeFDBLuceneFileReference(filename, reference);
}

void writeFieldInfos(long id, byte[] value) {
void writeFieldInfos(long id, byte[] rawBytes) {
if (id == 0) {
throw new RecordCoreArgumentException("FieldInfo id should never be 0");
}
byte[] key = fieldInfosSubspace.pack(id);
byte[] value = serializer.encode(rawBytes);
agilityContext.recordSize(LuceneEvents.SizeEvents.LUCENE_WRITE, key.length + value.length);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(getLogMessage("Write lucene stored field infos data",
Expand All @@ -357,7 +359,9 @@ Stream<NonnullPair<Long, byte[]>> getAllFieldInfosStream() {
LuceneEvents.Waits.WAIT_LUCENE_READ_FIELD_INFOS,
agilityContext.apply(aContext -> aContext.ensureActive().getRange(fieldInfosSubspace.range()).asList()))
.stream()
.map(keyValue -> NonnullPair.of(fieldInfosSubspace.unpack(keyValue.getKey()).getLong(0), keyValue.getValue()));
.map(keyValue -> NonnullPair.of(
fieldInfosSubspace.unpack(keyValue.getKey()).getLong(0),
serializer.decodePossiblyWithoutPrefix(keyValue.getValue())));
}

public CompletableFuture<Integer> getFieldInfosCount() {
Expand Down Expand Up @@ -444,10 +448,11 @@ public int writeData(final long id, final int block, @Nonnull final byte[] value
* Write stored fields document to the DB.
* @param segmentName the segment name writing to
* @param docID the document ID to write
* @param value the bytes value of the stored fields
* @param rawBytes the bytes value of the stored fields
*/
public void writeStoredFields(@Nonnull String segmentName, int docID, @Nonnull final byte[] value) {
public void writeStoredFields(@Nonnull String segmentName, int docID, @Nonnull final byte[] rawBytes) {
byte[] key = storedFieldsSubspace.pack(Tuple.from(segmentName, docID));
byte[] value = serializer.encode(rawBytes);
agilityContext.recordSize(LuceneEvents.SizeEvents.LUCENE_WRITE_STORED_FIELDS, key.length + value.length);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(getLogMessage("Write lucene stored fields data",
Expand Down Expand Up @@ -542,7 +547,7 @@ private CompletableFuture<byte[]> readData(long id, int block) {
}

@Nonnull
public byte[] readStoredFields(String segmentName, int docId) throws IOException {
public byte[] readStoredFields(String segmentName, int docId) {
final byte[] key = storedFieldsSubspace.pack(Tuple.from(segmentName, docId));
final byte[] rawBytes = asyncToSync(LuceneEvents.Waits.WAIT_LUCENE_GET_STORED_FIELDS,
agilityContext.instrument(LuceneEvents.Events.LUCENE_READ_STORED_FIELDS,
Expand All @@ -553,11 +558,11 @@ public byte[] readStoredFields(String segmentName, int docId) throws IOException
.addLogInfo(LuceneLogMessageKeys.DOC_ID, docId)
.addLogInfo(LogMessageKeys.KEY, ByteArrayUtil2.loggable(key));
}
return rawBytes;
return Objects.requireNonNull(serializer.decodePossiblyWithoutPrefix(rawBytes));
}

@Nonnull
public List<KeyValue> readAllStoredFields(String segmentName) {
public List<byte[]> readAllStoredFields(String segmentName) {
final Range range = storedFieldsSubspace.range(Tuple.from(segmentName));
final List<KeyValue> list = asyncToSync(LuceneEvents.Waits.WAIT_LUCENE_GET_ALL_STORED_FIELDS,
agilityContext.getRange(range.begin, range.end));
Expand All @@ -567,7 +572,7 @@ public List<KeyValue> readAllStoredFields(String segmentName) {
.addLogInfo(LogMessageKeys.RANGE_START, ByteArrayUtil2.loggable(range.begin))
.addLogInfo(LogMessageKeys.RANGE_END, ByteArrayUtil2.loggable(range.end));
}
return list;
return list.stream().map(KeyValue::getValue).map(serializer::decodePossiblyWithoutPrefix).collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public byte[] decode(@Nullable byte[] data) {
return null;
}

if (data.length < 2) {
if (data.length < 1) {
throw new RecordCoreException("Invalid data")
.addLogInfo(LuceneLogMessageKeys.DATA_VALUE, data);
}
Expand Down Expand Up @@ -270,4 +270,36 @@ private void decryptIfNeeded(@Nonnull CompressedAndEncryptedSerializerState stat
}
encodedDataInput.reset(decrypted);
}

@Nullable
public byte[] decodePossiblyWithoutPrefix(@Nullable byte[] bytes) {
    // Data written before encoding was introduced is a bare Protobuf message
    // with no serializer prefix byte; such values are returned untouched,
    // while prefixed values go through the normal decode path.
    if (bytes == null) {
        return null;
    }
    return isProtobufMessageWithoutPrefix(bytes) ? bytes : decode(bytes);
}

// This can be removed once it is guaranteed that all indexes are using the encoded format.
// Only works for Protobuf messages all of whose fields are themselves length-delimited.
private boolean isProtobufMessageWithoutPrefix(@Nonnull byte[] bytes) {
    if (bytes.length == 0) {
        // Too short to carry a prefix; treat as an (empty) bare message.
        return true;
    }
    final int firstByte = bytes[0];
    // Low 3 bits: either a Protobuf wire type or the serializer's prefix flags.
    final int wireTypeOrPrefixFlags = firstByte & 7;
    // High bits: the (partial) Protobuf field number; a serializer prefix
    // (e.g. ENCODING_COMPRESSED) has nothing here, and 0 is never a valid
    // field number, so a non-zero value means a bare Protobuf tag.
    final int fieldNumberOrKeyNumber = firstByte >> 3;
    return wireTypeOrPrefixFlags == 2 && fieldNumberOrKeyNumber != 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,27 @@
import com.apple.test.BooleanSource;
import com.apple.test.Tags;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import java.security.GeneralSecurityException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;

/**
* Test for Lucene data compression/decompression and encryption/decryption validation.
*/
@Tag(Tags.RequiresFDB)
class LuceneSerializerTest {
@Test
void testEncodingWithoutCompression() throws InvalidProtocolBufferException {
final LuceneSerializer serializer = new LuceneSerializer(true, false, null);
void testEncodingWithoutCompression() throws Exception {
final LuceneSerializer serializer = getSerializer(true, false);
final ByteString content = RandomUtil.randomByteString(ThreadLocalRandom.current(), 100);
final LuceneFileSystemProto.LuceneFileReference reference = LuceneFileSystemProto.LuceneFileReference.newBuilder()
.setId(1)
Expand Down Expand Up @@ -73,8 +76,8 @@ void testEncodingWithoutCompression() throws InvalidProtocolBufferException {
}

@Test
void testEncodingWithCompression() throws InvalidProtocolBufferException {
final LuceneSerializer serializer = new LuceneSerializer(true, false, null);
void testEncodingWithCompression() throws Exception {
final LuceneSerializer serializer = getSerializer(true, false);
final String duplicateMsg = "abcdefghijklmnopqrstuvwxyz";
final String content = "content_" + duplicateMsg + "_" + duplicateMsg;
final LuceneFileSystemProto.LuceneFileReference reference = LuceneFileSystemProto.LuceneFileReference.newBuilder()
Expand Down Expand Up @@ -103,11 +106,7 @@ void testEncodingWithCompression() throws InvalidProtocolBufferException {
@ParameterizedTest
@BooleanSource
void testEncodingWithEncryption(boolean compressToo) throws Exception {
KeyGenerator keyGen = KeyGenerator.getInstance("AES");
keyGen.init(128);
SecretKey key = keyGen.generateKey();
final SerializationKeyManager keyManager = new FixedZeroKeyManager(key, null, null);
final LuceneSerializer serializer = new LuceneSerializer(compressToo, true, keyManager);
final LuceneSerializer serializer = getSerializer(compressToo, true);
final ByteString content = RandomUtil.randomByteString(ThreadLocalRandom.current(), 100);
final LuceneFileSystemProto.LuceneFileReference reference = LuceneFileSystemProto.LuceneFileReference.newBuilder()
.setId(1)
Expand All @@ -122,4 +121,52 @@ void testEncodingWithEncryption(boolean compressToo) throws Exception {
final LuceneFileSystemProto.LuceneFileReference decryptedReference = LuceneFileSystemProto.LuceneFileReference.parseFrom(decodedValue);
Assertions.assertEquals(content, decryptedReference.getContent());
}

@ParameterizedTest
@MethodSource("encodedCompressedAndEncrypted")
void testProtobufMessageWithoutPrefix(boolean encode, boolean compress, boolean encrypt) throws Exception {
    // Round-trips a stored-fields message through decodePossiblyWithoutPrefix,
    // both for freshly encoded values and for legacy bare-Protobuf values.
    final LuceneSerializer serializer = getSerializer(compress, encrypt);
    final String fieldText = RandomUtil.randomAlphanumericString(ThreadLocalRandom.current(), 20);
    final LuceneStoredFieldsProto.LuceneStoredFields.Builder messageBuilder =
            LuceneStoredFieldsProto.LuceneStoredFields.newBuilder();
    messageBuilder.addStoredFieldsBuilder().setFieldNumber(5).setStringValue(fieldText);
    final byte[] rawMessage = messageBuilder.build().toByteArray();
    final byte[] stored = encode ? serializer.encode(rawMessage) : rawMessage;
    final byte[] roundTripped = serializer.decodePossiblyWithoutPrefix(stored);
    Assertions.assertArrayEquals(rawMessage, roundTripped);
    final LuceneStoredFieldsProto.LuceneStoredFields parsed =
            LuceneStoredFieldsProto.LuceneStoredFields.parseFrom(roundTripped);
    Assertions.assertEquals(fieldText, parsed.getStoredFields(0).getStringValue());
}

@ParameterizedTest
@MethodSource("encodedCompressedAndEncrypted")
void testEmpty(boolean encode, boolean compress, boolean encrypt) throws Exception {
    // A zero-length payload must survive the decode path unchanged,
    // whether or not it was run through the encoder first.
    final LuceneSerializer serializer = getSerializer(compress, encrypt);
    final byte[] empty = new byte[0];
    final byte[] stored = encode ? serializer.encode(empty) : empty;
    Assertions.assertArrayEquals(empty, serializer.decodePossiblyWithoutPrefix(stored));
}

// Arguments are (encode, compress, encrypt). Compression and encryption only
// apply to values that actually pass through the encoder, so the unencoded
// case is exercised once, without either option.
static Stream<Arguments> encodedCompressedAndEncrypted() {
    final Stream<Arguments> unencoded = Stream.of(Arguments.of(false, false, false));
    final Stream<Arguments> encoded = Stream.of(
            Arguments.of(true, false, false),
            Arguments.of(true, false, true),
            Arguments.of(true, true, false),
            Arguments.of(true, true, true));
    return Stream.concat(unencoded, encoded);
}

// Builds a serializer under test; an AES-128 key manager is only needed when
// encryption is requested.
private LuceneSerializer getSerializer(boolean compress, boolean encrypt) throws GeneralSecurityException {
    SerializationKeyManager keyManager = null;
    if (encrypt) {
        final KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(128);
        final SecretKey aesKey = generator.generateKey();
        keyManager = new FixedZeroKeyManager(aesKey, null, null);
    }
    return new LuceneSerializer(compress, encrypt, keyManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -163,7 +164,7 @@ public IndexInput openInput(@Nonnull final String name, @Nonnull final IOContext
getAgilityContext().getRange(key, ByteArrayUtil.strinc(key))));
final Map<Long, byte[]> storedFields = rawStoredFields.stream().collect(Collectors.toMap(
keyValue -> storedFieldsSubspace.unpack(keyValue.getKey()).getLong(1),
keyValue -> keyValue.getValue()
keyValue -> Objects.requireNonNull(Objects.requireNonNull(getSerializer()).decodePossiblyWithoutPrefix(keyValue.getValue()))
));
final NonnullPair<String, Map<Long, byte[]>> previous = previousStoredFields.getAndSet(NonnullPair.of(name, storedFields));
if (previous != null) {
Expand Down
Loading