diff --git a/infrastructure/ssz/src/main/java/tech/pegasys/teku/infrastructure/ssz/sos/SszByteArrayChunksWriter.java b/infrastructure/ssz/src/main/java/tech/pegasys/teku/infrastructure/ssz/sos/SszByteArrayChunksWriter.java new file mode 100644 index 00000000000..caee3163a9a --- /dev/null +++ b/infrastructure/ssz/src/main/java/tech/pegasys/teku/infrastructure/ssz/sos/SszByteArrayChunksWriter.java @@ -0,0 +1,75 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.ssz.sos; + +import java.util.Arrays; +import java.util.List; + +public class SszByteArrayChunksWriter implements SszWriter { + private final byte[][] chunks; + private final int maxChunkSize; + private final int maxChunks; + + private int lastChunkIndex; + private int size = 0; + + public SszByteArrayChunksWriter(final int maxSize, final int maxChunkSize) { + this.maxChunks = (maxSize + maxChunkSize - 1) / maxChunkSize; + this.chunks = new byte[maxChunks][]; + this.maxChunkSize = maxChunkSize; + } + + @Override + public void write(final byte[] bytes, final int offset, final int length) { + final int chunk = this.size / maxChunkSize; + final int chunkOffset = this.size % maxChunkSize; + final byte[] chunkData = getChunk(chunk); + + if (chunkOffset + length > maxChunkSize) { + final int lengthToFillCurrentChunk = maxChunkSize - chunkOffset; + System.arraycopy(bytes, offset, chunkData, chunkOffset, lengthToFillCurrentChunk); + this.size += lengthToFillCurrentChunk; + + write(bytes, offset + lengthToFillCurrentChunk, length - lengthToFillCurrentChunk); + } else { + System.arraycopy(bytes, offset, chunkData, chunkOffset, length); + this.size += length; + } + } + + private byte[] getChunk(final int chunk) { + if (chunk >= maxChunks) { + throw new IndexOutOfBoundsException( + "Chunk index out of bounds: " + chunk + ", max chunks: " + maxChunks); + } + byte[] chunkData = chunks[chunk]; + if (chunkData == null) { + chunkData = new byte[maxChunkSize]; + chunks[chunk] = chunkData; + lastChunkIndex = chunk; + } + return chunkData; + } + + public List getChunks() { + int lastChunkSize = size % maxChunkSize; + if (lastChunkSize != 0) { + byte[] lastChunk = chunks[lastChunkIndex]; + byte[] lastChunkTrimmed = Arrays.copyOf(lastChunk, lastChunkSize); + chunks[lastChunkIndex] = lastChunkTrimmed; + } + + return Arrays.stream(chunks).limit(lastChunkIndex + 1).toList(); + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreAccessor.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreAccessor.java index 9779986b9a5..d389c10b238 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreAccessor.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreAccessor.java @@ -14,18 +14,24 @@ package tech.pegasys.teku.storage.server.kvstore; import com.google.errorprone.annotations.MustBeClosed; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; public interface KvStoreAccessor extends AutoCloseable { - Optional get(KvStoreVariable variable); + Optional get(KvStoreUnchunkedVariable variable); - Optional getRaw(KvStoreVariable variable); + Optional get(KvStoreChunkedVariable variable); + + Optional getRaw(KvStoreUnchunkedVariable variable); + + Optional> getRaw(KvStoreChunkedVariable variable); Optional get(KvStoreColumn column, K key); @@ -112,14 +118,18 @@ , V> Stream> stream( interface KvStoreTransaction extends AutoCloseable { - void put(KvStoreVariable variable, T value); + void put(KvStoreUnchunkedVariable variable, T value); + + void put(KvStoreChunkedVariable variable, T value); /** * Write raw bytes to a specified variable. * *

WARNING: should only be used to migrate data between database instances */ - void putRaw(KvStoreVariable variable, Bytes value); + void putRaw(KvStoreUnchunkedVariable variable, Bytes value); + + void putRaw(KvStoreChunkedVariable variable, List valueChunks); void put(KvStoreColumn column, K key, V value); @@ -134,7 +144,7 @@ interface KvStoreTransaction extends AutoCloseable { void delete(KvStoreColumn column, K key); - void delete(KvStoreVariable variable); + void delete(KvStoreUnchunkedVariable variable); void commit(); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/CombinedKvStoreDao.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/CombinedKvStoreDao.java index 83bda83c9a8..e21fe756ed0 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/CombinedKvStoreDao.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/CombinedKvStoreDao.java @@ -45,7 +45,9 @@ import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; import tech.pegasys.teku.storage.server.kvstore.dataaccess.V4FinalizedStateStorageLogic.FinalizedStateUpdater; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; import tech.pegasys.teku.storage.server.kvstore.schema.SchemaCombined; @@ -205,8 +207,7 @@ public void ingest( try (final KvStoreTransaction transaction = db.startTransaction()) { for (String key : newVariables.keySet()) { logger.accept(String.format("Copy variable %s", key)); - dao.getRawVariable(oldVariables.get(key)) - .ifPresent(value -> transaction.putRaw(newVariables.get(key), value)); + handleVariableMigration(dao, newVariables.get(key), transaction); } transaction.commit(); } @@ -228,6 +229,28 @@ public void ingest( } } + private void handleVariableMigration( + final V4MigratableSourceDao dao, + final KvStoreVariable variable, + final KvStoreTransaction transaction) { + if (variable.toChunkedVariable().isPresent()) { + final KvStoreChunkedVariable chunkedVariable = variable.toChunkedVariable().orElseThrow(); + dao.getRawVariable(chunkedVariable) + .ifPresent(chunks -> transaction.putRaw(chunkedVariable, chunks)); + return; + } + + if (variable.toUnchunkedVariable().isPresent()) { + final KvStoreUnchunkedVariable unchunckedVariable = + variable.toUnchunkedVariable().orElseThrow(); + dao.getRawVariable(unchunckedVariable) + .ifPresent(value -> transaction.putRaw(unchunckedVariable, value)); + return; + } + + throw new IllegalStateException("Variable must be chunked or unchunked"); + } + @Override public Map> getColumnMap() { return schema.getColumnMap(); @@ -239,7 +262,12 @@ public Map> getVariableMap() { } @Override - public Optional getRawVariable(final KvStoreVariable var) { + public Optional getRawVariable(final KvStoreUnchunkedVariable var) { + return db.getRaw(var); + } + + @Override + public Optional> getRawVariable(final KvStoreChunkedVariable var) { return db.getRaw(var); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/KvStoreCombinedDaoAdapter.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/KvStoreCombinedDaoAdapter.java index b9d918f98dc..41dad1d16c1 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/KvStoreCombinedDaoAdapter.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/KvStoreCombinedDaoAdapter.java @@ -40,7 +40,9 @@ import tech.pegasys.teku.storage.server.kvstore.ColumnEntry; import tech.pegasys.teku.storage.server.kvstore.dataaccess.V4FinalizedKvStoreDao.V4FinalizedUpdater; import tech.pegasys.teku.storage.server.kvstore.dataaccess.V4HotKvStoreDao.V4HotUpdater; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; public class KvStoreCombinedDaoAdapter implements KvStoreCombinedDao, V4MigratableSourceDao { @@ -367,7 +369,16 @@ public Map> getVariableMap() { } @Override - public Optional getRawVariable(final KvStoreVariable var) { + public Optional getRawVariable(final KvStoreUnchunkedVariable var) { + if (hotDao.getVariableMap().containsValue(var)) { + return hotDao.getRawVariable(var); + } else { + return finalizedDao.getRawVariable(var); + } + } + + @Override + public Optional> getRawVariable(final KvStoreChunkedVariable var) { if (hotDao.getVariableMap().containsValue(var)) { return hotDao.getRawVariable(var); } else { diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4FinalizedKvStoreDao.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4FinalizedKvStoreDao.java index e4eae58e694..cf02709a5d1 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4FinalizedKvStoreDao.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4FinalizedKvStoreDao.java @@ -38,7 +38,9 @@ import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDao.FinalizedUpdater; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; import tech.pegasys.teku.storage.server.kvstore.schema.SchemaFinalizedSnapshotStateAdapter; @@ -195,7 +197,11 @@ public Optional getEarliestBlobSidecarSlot() { return db.get(schema.getVariableEarliestBlobSidecarSlot()); } - public Optional getRawVariable(final KvStoreVariable var) { + public Optional getRawVariable(final KvStoreUnchunkedVariable var) { + return db.getRaw(var); + } + + public Optional> getRawVariable(final KvStoreChunkedVariable var) { return db.getRaw(var); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4HotKvStoreDao.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4HotKvStoreDao.java index 567ad9120e5..ec9814d6566 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4HotKvStoreDao.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4HotKvStoreDao.java @@ -37,7 +37,9 @@ import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDao.HotUpdater; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; import tech.pegasys.teku.storage.server.kvstore.schema.SchemaHotAdapter; @@ -142,7 +144,11 @@ public V4HotUpdater hotUpdater() { return new V4HotUpdater(db, schema); } - public Optional getRawVariable(final KvStoreVariable var) { + public Optional getRawVariable(final KvStoreUnchunkedVariable var) { + return db.getRaw(var); + } + + public Optional> getRawVariable(final KvStoreChunkedVariable var) { return db.getRaw(var); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4MigratableSourceDao.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4MigratableSourceDao.java index 8e5a9ab2d99..fe988cbebaa 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4MigratableSourceDao.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4MigratableSourceDao.java @@ -14,12 +14,15 @@ package tech.pegasys.teku.storage.server.kvstore.dataaccess; import com.google.errorprone.annotations.MustBeClosed; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes; import tech.pegasys.teku.storage.server.kvstore.ColumnEntry; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; public interface V4MigratableSourceDao { @@ -27,7 +30,9 @@ public interface V4MigratableSourceDao { Map> getVariableMap(); - Optional getRawVariable(final KvStoreVariable var); + Optional getRawVariable(final KvStoreUnchunkedVariable var); + + Optional> getRawVariable(final KvStoreChunkedVariable var); @MustBeClosed Stream> streamRawColumn(final KvStoreColumn kvStoreColumn); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreChunkedVariable.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreChunkedVariable.java new file mode 100644 index 00000000000..5ec8c8d4435 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreChunkedVariable.java @@ -0,0 +1,73 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server.kvstore.schema; + +import static tech.pegasys.teku.infrastructure.unsigned.ByteUtil.toByteExact; + +import java.util.Objects; +import java.util.Optional; +import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.storage.server.kvstore.serialization.KvStoreChunkingSerializer; + +public class KvStoreChunkedVariable implements KvStoreVariable { + private final Bytes id; + private final KvStoreChunkingSerializer serializer; + + private KvStoreChunkedVariable( + final byte[] id, final KvStoreChunkingSerializer serializer) { + this.id = Bytes.wrap(id); + this.serializer = serializer; + } + + public static KvStoreChunkedVariable create( + final int id, final KvStoreChunkingSerializer serializer) { + final byte byteId = toByteExact(id); + return new KvStoreChunkedVariable(new byte[] {byteId}, serializer); + } + + public Bytes getId() { + return id; + } + + public KvStoreChunkingSerializer getSerializer() { + return serializer; + } + + @Override + public Optional> toChunkedVariable() { + return Optional.of(this); + } + + @Override + public Optional> toUnchunkedVariable() { + return Optional.empty(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final KvStoreChunkedVariable that = (KvStoreChunkedVariable) o; + return Objects.equals(id, that.id) && Objects.equals(serializer, that.serializer); + } + + @Override + public int hashCode() { + return Objects.hash(id, serializer); + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreUnchunkedVariable.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreUnchunkedVariable.java new file mode 100644 index 00000000000..859ae2f87a5 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreUnchunkedVariable.java @@ -0,0 +1,72 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server.kvstore.schema; + +import static tech.pegasys.teku.infrastructure.unsigned.ByteUtil.toByteExact; + +import java.util.Objects; +import java.util.Optional; +import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.storage.server.kvstore.serialization.KvStoreSerializer; + +public class KvStoreUnchunkedVariable implements KvStoreVariable { + private final Bytes id; + private final KvStoreSerializer serializer; + + private KvStoreUnchunkedVariable(final byte[] id, final KvStoreSerializer serializer) { + this.id = Bytes.wrap(id); + this.serializer = serializer; + } + + public static KvStoreUnchunkedVariable create( + final int id, final KvStoreSerializer serializer) { + final byte byteId = toByteExact(id); + return new KvStoreUnchunkedVariable(new byte[] {byteId}, serializer); + } + + public Bytes getId() { + return id; + } + + public KvStoreSerializer getSerializer() { + return serializer; + } + + @Override + public Optional> toChunkedVariable() { + return Optional.empty(); + } + + @Override + public Optional> toUnchunkedVariable() { + return Optional.of(this); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final KvStoreUnchunkedVariable that = (KvStoreUnchunkedVariable) o; + return Objects.equals(id, that.id) && Objects.equals(serializer, that.serializer); + } + + @Override + public int hashCode() { + return Objects.hash(id, serializer); + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreVariable.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreVariable.java index 8e9c96b5c0e..c602785a467 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreVariable.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreVariable.java @@ -1,5 +1,5 @@ /* - * Copyright Consensys Software Inc., 2022 + * Copyright Consensys Software Inc., 2025 * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -13,48 +13,11 @@ package tech.pegasys.teku.storage.server.kvstore.schema; -import static tech.pegasys.teku.infrastructure.unsigned.ByteUtil.toByteExact; +import java.util.Optional; -import java.util.Objects; -import org.apache.tuweni.bytes.Bytes; -import tech.pegasys.teku.storage.server.kvstore.serialization.KvStoreSerializer; +public interface KvStoreVariable { -public class KvStoreVariable { - private final Bytes id; - private final KvStoreSerializer serializer; + Optional> toChunkedVariable(); - private KvStoreVariable(final byte[] id, final KvStoreSerializer serializer) { - this.id = Bytes.wrap(id); - this.serializer = serializer; - } - - public static KvStoreVariable create(final int id, final KvStoreSerializer serializer) { - final byte byteId = toByteExact(id); - return new KvStoreVariable(new byte[] {byteId}, serializer); - } - - public Bytes getId() { - return id; - } - - public KvStoreSerializer getSerializer() { - return serializer; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final KvStoreVariable that = (KvStoreVariable) o; - return Objects.equals(id, that.id) && Objects.equals(serializer, that.serializer); - } - - @Override - public int hashCode() { - return Objects.hash(id, serializer); - } + Optional> toUnchunkedVariable(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombined.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombined.java index c74190ae832..a6b2e17b178 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombined.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombined.java @@ -63,29 +63,29 @@ public interface SchemaCombined extends Schema { getColumnNonCanonicalBlobSidecarBySlotRootBlobIndex(); // Variables - KvStoreVariable getVariableGenesisTime(); + KvStoreUnchunkedVariable getVariableGenesisTime(); - KvStoreVariable getVariableJustifiedCheckpoint(); + KvStoreUnchunkedVariable getVariableJustifiedCheckpoint(); - KvStoreVariable getVariableBestJustifiedCheckpoint(); + KvStoreUnchunkedVariable getVariableBestJustifiedCheckpoint(); - KvStoreVariable getVariableFinalizedCheckpoint(); + KvStoreUnchunkedVariable getVariableFinalizedCheckpoint(); - KvStoreVariable getVariableLatestFinalizedState(); + KvStoreChunkedVariable getVariableLatestFinalizedState(); - KvStoreVariable getVariableMinGenesisTimeBlock(); + KvStoreUnchunkedVariable getVariableMinGenesisTimeBlock(); - KvStoreVariable getVariableWeakSubjectivityCheckpoint(); + KvStoreUnchunkedVariable getVariableWeakSubjectivityCheckpoint(); - KvStoreVariable getVariableAnchorCheckpoint(); + KvStoreUnchunkedVariable getVariableAnchorCheckpoint(); - KvStoreVariable getOptimisticTransitionBlockSlot(); + KvStoreUnchunkedVariable getOptimisticTransitionBlockSlot(); - KvStoreVariable getVariableEarliestBlobSidecarSlot(); + KvStoreUnchunkedVariable getVariableEarliestBlobSidecarSlot(); - KvStoreVariable getVariableEarliestBlockSlot(); + KvStoreUnchunkedVariable getVariableEarliestBlockSlot(); - KvStoreVariable getVariableFinalizedDepositSnapshot(); + KvStoreUnchunkedVariable getVariableFinalizedDepositSnapshot(); Map> getColumnMap(); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaFinalizedSnapshotStateAdapter.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaFinalizedSnapshotStateAdapter.java index a98a89dcbc6..c92498a6a64 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaFinalizedSnapshotStateAdapter.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaFinalizedSnapshotStateAdapter.java @@ -98,15 +98,15 @@ public KvStoreColumn> getColumnNonCanonicalRootsBySlot() { return delegate.getColumnNonCanonicalRootsBySlot(); } - public KvStoreVariable getOptimisticTransitionBlockSlot() { + public KvStoreUnchunkedVariable getOptimisticTransitionBlockSlot() { return delegate.getOptimisticTransitionBlockSlot(); } - public KvStoreVariable getVariableEarliestBlobSidecarSlot() { + public KvStoreUnchunkedVariable getVariableEarliestBlobSidecarSlot() { return delegate.getVariableEarliestBlobSidecarSlot(); } - public KvStoreVariable getVariableEarliestBlockSlot() { + public KvStoreUnchunkedVariable getVariableEarliestBlockSlot() { return delegate.getVariableEarliestBlockSlot(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaHotAdapter.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaHotAdapter.java index 4d4547e0982..1b5255e1589 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaHotAdapter.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaHotAdapter.java @@ -70,39 +70,39 @@ public KvStoreColumn getColumnHotStatesByRoot() { return delegate.getColumnBlobSidecarBySlotRootBlobIndex(); } - public KvStoreVariable getVariableGenesisTime() { + public KvStoreUnchunkedVariable getVariableGenesisTime() { return delegate.getVariableGenesisTime(); } - public KvStoreVariable getVariableJustifiedCheckpoint() { + public KvStoreUnchunkedVariable getVariableJustifiedCheckpoint() { return delegate.getVariableJustifiedCheckpoint(); } - public KvStoreVariable getVariableBestJustifiedCheckpoint() { + public KvStoreUnchunkedVariable getVariableBestJustifiedCheckpoint() { return delegate.getVariableBestJustifiedCheckpoint(); } - public KvStoreVariable getVariableFinalizedCheckpoint() { + public KvStoreUnchunkedVariable getVariableFinalizedCheckpoint() { return delegate.getVariableFinalizedCheckpoint(); } - public KvStoreVariable getVariableLatestFinalizedState() { + public KvStoreChunkedVariable getVariableLatestFinalizedState() { return delegate.getVariableLatestFinalizedState(); } - public KvStoreVariable getVariableMinGenesisTimeBlock() { + public KvStoreUnchunkedVariable getVariableMinGenesisTimeBlock() { return delegate.getVariableMinGenesisTimeBlock(); } - public KvStoreVariable getVariableWeakSubjectivityCheckpoint() { + public KvStoreUnchunkedVariable getVariableWeakSubjectivityCheckpoint() { return delegate.getVariableWeakSubjectivityCheckpoint(); } - public KvStoreVariable getVariableAnchorCheckpoint() { + public KvStoreUnchunkedVariable getVariableAnchorCheckpoint() { return delegate.getVariableAnchorCheckpoint(); } - public KvStoreVariable getVariableFinalizedDepositSnapshot() { + public KvStoreUnchunkedVariable getVariableFinalizedDepositSnapshot() { return delegate.getVariableFinalizedDepositSnapshot(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombined.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombined.java index 15630607eb5..5d304d8fdc8 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombined.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombined.java @@ -37,6 +37,7 @@ import tech.pegasys.teku.spec.datastructures.forkchoice.VoteTracker; import tech.pegasys.teku.spec.datastructures.state.Checkpoint; import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.storage.server.kvstore.serialization.KvStoreChunkingSerializer; import tech.pegasys.teku.storage.server.kvstore.serialization.KvStoreSerializer; public abstract class V6SchemaCombined implements SchemaCombined { @@ -63,28 +64,28 @@ public abstract class V6SchemaCombined implements SchemaCombined { KvStoreColumn.create(7, BYTES32_SERIALIZER, CHECKPOINT_EPOCHS_SERIALIZER); // Variables - private static final KvStoreVariable GENESIS_TIME = - KvStoreVariable.create(1, UINT64_SERIALIZER); - private static final KvStoreVariable JUSTIFIED_CHECKPOINT = - KvStoreVariable.create(2, CHECKPOINT_SERIALIZER); - private static final KvStoreVariable BEST_JUSTIFIED_CHECKPOINT = - KvStoreVariable.create(3, CHECKPOINT_SERIALIZER); - private static final KvStoreVariable FINALIZED_CHECKPOINT = - KvStoreVariable.create(4, CHECKPOINT_SERIALIZER); - private final KvStoreVariable latestFinalizedState; - private static final KvStoreVariable MIN_GENESIS_TIME_BLOCK = - KvStoreVariable.create(6, MIN_GENESIS_TIME_BLOCK_EVENT_SERIALIZER); + private static final KvStoreUnchunkedVariable GENESIS_TIME = + KvStoreUnchunkedVariable.create(1, UINT64_SERIALIZER); + private static final KvStoreUnchunkedVariable JUSTIFIED_CHECKPOINT = + KvStoreUnchunkedVariable.create(2, CHECKPOINT_SERIALIZER); + private static final KvStoreUnchunkedVariable BEST_JUSTIFIED_CHECKPOINT = + KvStoreUnchunkedVariable.create(3, CHECKPOINT_SERIALIZER); + private static final KvStoreUnchunkedVariable FINALIZED_CHECKPOINT = + KvStoreUnchunkedVariable.create(4, CHECKPOINT_SERIALIZER); + private final KvStoreChunkedVariable latestFinalizedState; + private static final KvStoreUnchunkedVariable MIN_GENESIS_TIME_BLOCK = + KvStoreUnchunkedVariable.create(6, MIN_GENESIS_TIME_BLOCK_EVENT_SERIALIZER); // 7 was the protoarray snapshot variable but is no longer used. - private static final KvStoreVariable WEAK_SUBJECTIVITY_CHECKPOINT = - KvStoreVariable.create(8, CHECKPOINT_SERIALIZER); - private static final KvStoreVariable ANCHOR_CHECKPOINT = - KvStoreVariable.create(9, CHECKPOINT_SERIALIZER); - private static final KvStoreVariable FINALIZED_DEPOSIT_SNAPSHOT = - KvStoreVariable.create(10, DEPOSIT_SNAPSHOT_SERIALIZER); + private static final KvStoreUnchunkedVariable WEAK_SUBJECTIVITY_CHECKPOINT = + KvStoreUnchunkedVariable.create(8, CHECKPOINT_SERIALIZER); + private static final KvStoreUnchunkedVariable ANCHOR_CHECKPOINT = + KvStoreUnchunkedVariable.create(9, CHECKPOINT_SERIALIZER); + private static final KvStoreUnchunkedVariable FINALIZED_DEPOSIT_SNAPSHOT = + KvStoreUnchunkedVariable.create(10, DEPOSIT_SNAPSHOT_SERIALIZER); - private final KvStoreVariable optimisticTransitionBlockSlot; - private final KvStoreVariable earliestBlobSidecarSlot; - private final KvStoreVariable earliestBlockSlot; + private final KvStoreUnchunkedVariable optimisticTransitionBlockSlot; + private final KvStoreUnchunkedVariable earliestBlobSidecarSlot; + private final KvStoreUnchunkedVariable earliestBlockSlot; protected V6SchemaCombined(final Spec spec, final int finalizedOffset) { this.finalizedOffset = finalizedOffset; @@ -95,13 +96,18 @@ protected V6SchemaCombined(final Spec spec, final int finalizedOffset) { KvStoreSerializer.createStateSerializer(spec); checkpointStates = KvStoreColumn.create(2, CHECKPOINT_SERIALIZER, stateSerializer); hotStatesByRoot = KvStoreColumn.create(6, BYTES32_SERIALIZER, stateSerializer); - latestFinalizedState = KvStoreVariable.create(5, stateSerializer); + final KvStoreChunkingSerializer stateChunkingSerializer = + KvStoreChunkingSerializer.createStateSerializer(spec); + + latestFinalizedState = KvStoreChunkedVariable.create(5, stateChunkingSerializer); votes = KvStoreColumn.create(3, UINT64_SERIALIZER, VOTE_TRACKER_SERIALIZER); - optimisticTransitionBlockSlot = KvStoreVariable.create(finalizedOffset + 1, UINT64_SERIALIZER); - earliestBlobSidecarSlot = KvStoreVariable.create(finalizedOffset + 2, UINT64_SERIALIZER); - earliestBlockSlot = KvStoreVariable.create(finalizedOffset + 3, UINT64_SERIALIZER); + optimisticTransitionBlockSlot = + KvStoreUnchunkedVariable.create(finalizedOffset + 1, UINT64_SERIALIZER); + earliestBlobSidecarSlot = + KvStoreUnchunkedVariable.create(finalizedOffset + 2, UINT64_SERIALIZER); + earliestBlockSlot = KvStoreUnchunkedVariable.create(finalizedOffset + 3, UINT64_SERIALIZER); } @Override @@ -140,62 +146,62 @@ public KvStoreColumn getColumnHotStatesByRoot() { } @Override - public KvStoreVariable getVariableGenesisTime() { + public KvStoreUnchunkedVariable getVariableGenesisTime() { return GENESIS_TIME; } @Override - public KvStoreVariable getVariableJustifiedCheckpoint() { + public KvStoreUnchunkedVariable getVariableJustifiedCheckpoint() { return JUSTIFIED_CHECKPOINT; } @Override - public KvStoreVariable getVariableBestJustifiedCheckpoint() { + public KvStoreUnchunkedVariable getVariableBestJustifiedCheckpoint() { return BEST_JUSTIFIED_CHECKPOINT; } @Override - public KvStoreVariable getVariableFinalizedCheckpoint() { + public KvStoreUnchunkedVariable getVariableFinalizedCheckpoint() { return FINALIZED_CHECKPOINT; } @Override - public KvStoreVariable getVariableLatestFinalizedState() { + public KvStoreChunkedVariable getVariableLatestFinalizedState() { return latestFinalizedState; } @Override - public KvStoreVariable getVariableMinGenesisTimeBlock() { + public KvStoreUnchunkedVariable getVariableMinGenesisTimeBlock() { return MIN_GENESIS_TIME_BLOCK; } @Override - public KvStoreVariable getVariableWeakSubjectivityCheckpoint() { + public KvStoreUnchunkedVariable getVariableWeakSubjectivityCheckpoint() { return WEAK_SUBJECTIVITY_CHECKPOINT; } @Override - public KvStoreVariable getVariableAnchorCheckpoint() { + public KvStoreUnchunkedVariable getVariableAnchorCheckpoint() { return ANCHOR_CHECKPOINT; } @Override - public KvStoreVariable getOptimisticTransitionBlockSlot() { + public KvStoreUnchunkedVariable getOptimisticTransitionBlockSlot() { return optimisticTransitionBlockSlot; } @Override - public KvStoreVariable getVariableFinalizedDepositSnapshot() { + public KvStoreUnchunkedVariable getVariableFinalizedDepositSnapshot() { return FINALIZED_DEPOSIT_SNAPSHOT; } @Override - public KvStoreVariable getVariableEarliestBlobSidecarSlot() { + public KvStoreUnchunkedVariable getVariableEarliestBlobSidecarSlot() { return earliestBlobSidecarSlot; } @Override - public KvStoreVariable getVariableEarliestBlockSlot() { + public KvStoreUnchunkedVariable getVariableEarliestBlockSlot() { return earliestBlockSlot; } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java new file mode 100644 index 00000000000..5b0f82d28dd --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java @@ -0,0 +1,61 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server.kvstore.serialization; + +import java.util.List; +import java.util.Objects; +import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.infrastructure.ssz.sos.SszByteArrayChunksWriter; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; + +class BeaconStateChunkingSerializer implements KvStoreChunkingSerializer { + + private final Spec spec; + + BeaconStateChunkingSerializer(final Spec spec) { + this.spec = spec; + } + + @Override + public BeaconState deserialize(final List data) { + return spec.deserializeBeaconState(Bytes.wrap(data.stream().map(Bytes::wrap).toList())); + } + + @Override + public List serialize(final BeaconState value) { + final SszByteArrayChunksWriter sszByteArrayChunksWriter = + new SszByteArrayChunksWriter( + value.getBeaconStateSchema().getSszSize(value.getBackingNode()), 1 << 23); + value.sszSerialize(sszByteArrayChunksWriter); + return sszByteArrayChunksWriter.getChunks(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final BeaconStateChunkingSerializer that = (BeaconStateChunkingSerializer) o; + return Objects.equals(spec, that.spec); + } + + @Override + public int hashCode() { + return Objects.hash(spec); + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/ChunkedVariableKeySerializer.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/ChunkedVariableKeySerializer.java new file mode 100644 index 00000000000..5d7253ecc69 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/ChunkedVariableKeySerializer.java @@ -0,0 +1,57 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server.kvstore.serialization; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.tuweni.bytes.Bytes; + +/** + * This serializer is intended to be used as a Key so that it preserve slot ordering when we stream + * data. This is useful for values that are always looked up by root and slot, giving us the ability + * to quickly lookup most recent\oldest values by slot as well as perform pruning based on slot + */ +public class ChunkedVariableKeySerializer + implements KvStoreSerializer { + static final int ID_KEY_SIZE = 1; + + // we support only 256 chunks + static final int CHUNK_KEY_SIZE = 1; + + static final int ID_KEY_OFFSET = 0; + static final int CHUNK_KEY_OFFSET = ID_KEY_OFFSET + ID_KEY_SIZE; + static final int DATA_SIZE = CHUNK_KEY_OFFSET + CHUNK_KEY_SIZE; + + @Override + public IdAndChunkKey deserialize(final byte[] data) { + checkArgument(data.length == DATA_SIZE); + return IdAndChunkKey.fromBytes(Bytes.wrap(data)); + } + + @Override + public byte[] serialize(final IdAndChunkKey value) { + return value.toBytes().toArrayUnsafe(); + } + + public record IdAndChunkKey(Bytes id, Bytes chunkKey) { + public static IdAndChunkKey fromBytes(final Bytes bytes) { + return new IdAndChunkKey( + bytes.slice(0, ID_KEY_SIZE), bytes.slice(ID_KEY_SIZE, CHUNK_KEY_SIZE)); + } + + public Bytes toBytes() { + return Bytes.concatenate(id, chunkKey); + } + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreChunkingSerializer.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreChunkingSerializer.java new file mode 100644 index 00000000000..44f476ab380 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreChunkingSerializer.java @@ -0,0 +1,29 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server.kvstore.serialization; + +import java.util.List; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; + +public interface KvStoreChunkingSerializer { + + static KvStoreChunkingSerializer createStateSerializer(final Spec spec) { + return new BeaconStateChunkingSerializer(spec); + } + + T deserialize(List data); + + List serialize(T value); +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreSerializer.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreSerializer.java index ecaa64c3b50..ea464391af4 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreSerializer.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreSerializer.java @@ -57,6 +57,8 @@ public interface KvStoreSerializer { SLOT_AND_BLOCK_ROOT_AND_BLOB_INDEX_KEY_SERIALIZER = new SlotAndBlockRootAndBlobIndexKeySerializer(); + ChunkedVariableKeySerializer CHUNKED_VARIABLE_KEY_SERIALIZER = new ChunkedVariableKeySerializer(); + static KvStoreSerializer createStateSerializer(final Spec spec) { return new BeaconStateSerializer(spec); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbInstance.java b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbInstance.java index 634bfac9bf2..f672efc037f 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbInstance.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbInstance.java @@ -16,6 +16,8 @@ import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.asColumnEntry; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.asOptionalColumnEntry; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.deserializeKey; +import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getChunkedVariableChunksKey; +import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getChunkedVariableKey; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getColumnKey; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getKeyAfterColumn; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getVariableKey; @@ -29,10 +31,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -49,8 +53,9 @@ import tech.pegasys.teku.storage.server.ShuttingDownException; import tech.pegasys.teku.storage.server.kvstore.ColumnEntry; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; /** * Implements {@link KvStoreAccessor} using LevelDB to store the data. @@ -92,17 +97,47 @@ public LevelDbInstance( } @Override - public Optional get(final KvStoreVariable variable) { + public Optional get(final KvStoreUnchunkedVariable variable) { return getRaw(variable) .map(value -> variable.getSerializer().deserialize(value.toArrayUnsafe())); } @Override - public Optional getRaw(final KvStoreVariable variable) { + public Optional get(final KvStoreChunkedVariable variable) { + return getRaw(variable) + .map( + value -> + variable + .getSerializer() + .deserialize(value.stream().map(Bytes::toArrayUnsafe).toList())); + } + + @Override + public Optional getRaw(final KvStoreUnchunkedVariable variable) { assertOpen(); return Optional.ofNullable(db.get(getVariableKey(variable))).map(Bytes::wrap); } + @Override + public Optional> getRaw(final KvStoreChunkedVariable variable) { + assertOpen(); + return Optional.ofNullable(db.get(getChunkedVariableChunksKey(variable))) + .map(Bytes::wrap) + .map( + chunksNumber -> { + if (chunksNumber.size() > 1) { + // this is a non chunked variable to be migrated + return List.of(Bytes.wrap(chunksNumber)); + } + // TODO implement with an iterator + return IntStream.range(0, Byte.toUnsignedInt(chunksNumber.get(0))) + .mapToObj(i -> db.get(getChunkedVariableKey(variable, i))) + .peek(Objects::requireNonNull) + .map(Bytes::wrap) + .toList(); + }); + } + @Override public Optional get(final KvStoreColumn column, final K key) { assertOpen(); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbTransaction.java b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbTransaction.java index 1235aaac991..747e1dd0b9a 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbTransaction.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbTransaction.java @@ -14,11 +14,15 @@ package tech.pegasys.teku.storage.server.leveldb; import static tech.pegasys.teku.infrastructure.logging.DbLogger.DB_LOGGER; +import static tech.pegasys.teku.infrastructure.unsigned.ByteUtil.toByteExact; +import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getChunkedVariableChunksKey; +import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getChunkedVariableKey; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getColumnKey; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getVariableKey; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; import org.apache.tuweni.bytes.Bytes; @@ -26,8 +30,9 @@ import org.iq80.leveldb.WriteBatch; import tech.pegasys.teku.storage.server.ShuttingDownException; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; public class LevelDbTransaction implements KvStoreTransaction { @@ -46,15 +51,33 @@ public LevelDbTransaction( } @Override - public void put(final KvStoreVariable variable, final T value) { + public void put(final KvStoreUnchunkedVariable variable, final T value) { putRaw(variable, Bytes.wrap(variable.getSerializer().serialize(value))); } @Override - public void putRaw(final KvStoreVariable variable, final Bytes value) { + public void put(final KvStoreChunkedVariable variable, final T value) { + putRaw(variable, variable.getSerializer().serialize(value).stream().map(Bytes::wrap).toList()); + } + + @Override + public void putRaw(final KvStoreUnchunkedVariable variable, final Bytes value) { applyUpdate(() -> writeBatch.put(getVariableKey(variable), value.toArrayUnsafe())); } + @Override + public void putRaw(final KvStoreChunkedVariable variable, final List valueChunks) { + applyUpdate( + () -> { + writeBatch.put( + getChunkedVariableChunksKey(variable), new byte[] {toByteExact(valueChunks.size())}); + for (int index = 0; index < valueChunks.size(); index++) { + writeBatch.put( + getChunkedVariableKey(variable, index), valueChunks.get(index).toArrayUnsafe()); + } + }); + } + @Override public void put(final KvStoreColumn column, final K key, final V value) { putRaw( @@ -84,7 +107,7 @@ public void delete(final KvStoreColumn column, final K key) { } @Override - public void delete(final KvStoreVariable variable) { + public void delete(final KvStoreUnchunkedVariable variable) { applyUpdate(() -> writeBatch.delete(getVariableKey(variable))); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbUtils.java b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbUtils.java index cc17103b12d..60da943ae32 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbUtils.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbUtils.java @@ -14,13 +14,15 @@ package tech.pegasys.teku.storage.server.leveldb; import static com.google.common.base.Preconditions.checkArgument; +import static tech.pegasys.teku.infrastructure.unsigned.ByteUtil.toByteExact; import java.util.Arrays; import java.util.Map; import java.util.Optional; import tech.pegasys.teku.storage.server.kvstore.ColumnEntry; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; class LevelDbUtils { @@ -33,7 +35,27 @@ static byte[] getKeyAfterColumn(final KvStoreColumn column) { return keyAfterColumn; } - static byte[] getVariableKey(final KvStoreVariable variable) { + static byte[] getVariableKey(final KvStoreUnchunkedVariable variable) { + final byte[] suffix = variable.getId().toArrayUnsafe(); + final byte[] key = new byte[suffix.length + 1]; + // All 1s in binary so right at the end of the index. + key[0] = VARIABLE_COLUMN_PREFIX; + System.arraycopy(suffix, 0, key, 1, suffix.length); + return key; + } + + static byte[] getChunkedVariableKey(final KvStoreChunkedVariable variable, final int chunkId) { + final byte[] idSuffix = variable.getId().toArrayUnsafe(); + final byte[] key = new byte[idSuffix.length + 2]; + // All 1s in binary so right at the end of the index. + key[0] = VARIABLE_COLUMN_PREFIX; + System.arraycopy(idSuffix, 0, key, 1, idSuffix.length); + // last is chunk id + key[key.length - 1] = toByteExact(chunkId); + return key; + } + + static byte[] getChunkedVariableChunksKey(final KvStoreChunkedVariable variable) { final byte[] suffix = variable.getId().toArrayUnsafe(); final byte[] key = new byte[suffix.length + 1]; // All 1s in binary so right at the end of the index. diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbInstance.java b/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbInstance.java index 8ed14e3ab5d..c963d5066f8 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbInstance.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbInstance.java @@ -34,8 +34,9 @@ import tech.pegasys.teku.storage.server.ShuttingDownException; import tech.pegasys.teku.storage.server.kvstore.ColumnEntry; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; public class RocksDbInstance implements KvStoreAccessor { @@ -59,12 +60,17 @@ public class RocksDbInstance implements KvStoreAccessor { } @Override - public Optional get(final KvStoreVariable variable) { + public Optional get(final KvStoreUnchunkedVariable variable) { return getRaw(variable).map(data -> variable.getSerializer().deserialize(data.toArrayUnsafe())); } @Override - public Optional getRaw(final KvStoreVariable variable) { + public Optional get(final KvStoreChunkedVariable variable) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + @Override + public Optional getRaw(final KvStoreUnchunkedVariable variable) { assertOpen(); try { return Optional.ofNullable(db.get(defaultHandle, variable.getId().toArrayUnsafe())) @@ -74,6 +80,11 @@ public Optional getRaw(final KvStoreVariable variable) { } } + @Override + public Optional> getRaw(final KvStoreChunkedVariable variable) { + throw new UnsupportedOperationException("Not yet implemented"); + } + @Override public Optional get(final KvStoreColumn column, final K key) { assertOpen(); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbTransaction.java b/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbTransaction.java index c3b1b3c969d..ac3ae40ba42 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbTransaction.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbTransaction.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.storage.server.rocksdb; import com.google.common.collect.ImmutableMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; @@ -26,8 +27,9 @@ import org.rocksdb.WriteOptions; import tech.pegasys.teku.storage.server.ShuttingDownException; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; public class RocksDbTransaction implements KvStoreTransaction { private final ColumnFamilyHandle defaultHandle; @@ -53,13 +55,18 @@ public class RocksDbTransaction implements KvStoreTransaction { } @Override - public void put(final KvStoreVariable variable, final T value) { + public void put(final KvStoreUnchunkedVariable variable, final T value) { final Bytes serialized = Bytes.wrap(variable.getSerializer().serialize(value)); putRaw(variable, serialized); } @Override - public void putRaw(final KvStoreVariable variable, final Bytes value) { + public void put(final KvStoreChunkedVariable variable, final T value) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + @Override + public void putRaw(final KvStoreUnchunkedVariable variable, final Bytes value) { applyUpdate( () -> { try { @@ -70,6 +77,11 @@ public void putRaw(final KvStoreVariable variable, final Bytes value) { }); } + @Override + public void putRaw(final KvStoreChunkedVariable variable, final List valueChunks) { + throw new UnsupportedOperationException("Not yet implemented"); + } + @Override public void put(final KvStoreColumn column, final K key, final V value) { final Bytes keyBytes = Bytes.wrap(column.getKeySerializer().serialize(key)); @@ -122,7 +134,7 @@ public void delete(final KvStoreColumn column, final K key) { } @Override - public void delete(final KvStoreVariable variable) { + public void delete(final KvStoreUnchunkedVariable variable) { applyUpdate( () -> { try { diff --git a/storage/src/test/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializerTest.java b/storage/src/test/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializerTest.java new file mode 100644 index 00000000000..0dd1289fdc2 --- /dev/null +++ b/storage/src/test/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializerTest.java @@ -0,0 +1,39 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server.kvstore.serialization; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.spec.util.DataStructureUtil; + +public class BeaconStateChunkingSerializerTest { + private final Spec spec = TestSpecFactory.createMinimalPhase0(); + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); + + private final KvStoreChunkingSerializer stateSerializer = + new BeaconStateChunkingSerializer(spec); + + @Test + public void roundTrip_state() { + final BeaconState value = dataStructureUtil.randomBeaconState(11); + final List bytes = stateSerializer.serialize(value); + final BeaconState deserialized = stateSerializer.deserialize(bytes); + assertThat(deserialized).isEqualTo(value); + } +} diff --git a/storage/src/testFixtures/java/tech/pegasys/teku/storage/server/kvstore/MockKvStoreInstance.java b/storage/src/testFixtures/java/tech/pegasys/teku/storage/server/kvstore/MockKvStoreInstance.java index 5c54c615a65..8cbc0e74ae1 100644 --- a/storage/src/testFixtures/java/tech/pegasys/teku/storage/server/kvstore/MockKvStoreInstance.java +++ b/storage/src/testFixtures/java/tech/pegasys/teku/storage/server/kvstore/MockKvStoreInstance.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Optional; @@ -30,7 +31,9 @@ import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes; import tech.pegasys.teku.storage.server.ShuttingDownException; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; public class MockKvStoreInstance implements KvStoreAccessor { @@ -74,17 +77,27 @@ public static MockKvStoreInstance createEmpty( } @Override - public Optional get(final KvStoreVariable variable) { + public Optional get(final KvStoreUnchunkedVariable variable) { return getRaw(variable).map(Bytes::toArrayUnsafe).map(variable.getSerializer()::deserialize); } @Override - public Optional getRaw(final KvStoreVariable variable) { + public Optional get(final KvStoreChunkedVariable variable) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + @Override + public Optional getRaw(final KvStoreUnchunkedVariable variable) { assertOpen(); assertValidVariable(variable); return Optional.ofNullable(variableData.get(variable)); } + @Override + public Optional> getRaw(final KvStoreChunkedVariable variable) { + throw new UnsupportedOperationException("Not yet implemented"); + } + @Override public Optional get(final KvStoreColumn column, final K key) { assertOpen(); @@ -242,7 +255,7 @@ public void close() { closed.set(true); } - private void assertValidVariable(final KvStoreVariable variable) { + private void assertValidVariable(final KvStoreUnchunkedVariable variable) { checkArgument(variables.contains(variable), "Unknown RocksDbVariable supplied"); } @@ -261,7 +274,8 @@ private static class MockKvStoreTransaction implements KvStoreTransaction { private final MockKvStoreInstance dbInstance; private final Map, Map> columnUpdates = new HashMap<>(); private final Map, Set> deletedColumnKeys = new HashMap<>(); - private final Map, Optional> variableUpdates = new HashMap<>(); + private final Map, Optional> variableUpdates = + new HashMap<>(); private boolean closed = false; public MockKvStoreTransaction(final MockKvStoreInstance mockRocksDbInstance) { @@ -269,18 +283,29 @@ public MockKvStoreTransaction(final MockKvStoreInstance mockRocksDbInstance) { } @Override - public void put(final KvStoreVariable variable, final T value) { + public void put(final KvStoreUnchunkedVariable variable, final T value) { final Bytes valueBytes = Bytes.wrap(variable.getSerializer().serialize(value)); putRaw(variable, valueBytes); } @Override - public void putRaw(final KvStoreVariable variable, final Bytes value) { + public void put(final KvStoreChunkedVariable variable, final T value) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + @Override + public void putRaw(final KvStoreUnchunkedVariable variable, final Bytes value) { assertOpen(); dbInstance.assertValidVariable(variable); variableUpdates.put(variable, Optional.of(value)); } + @Override + public void putRaw( + final KvStoreChunkedVariable variable, final List valueChunks) { + throw new UnsupportedOperationException("Not yet implemented"); + } + @Override public void put(final KvStoreColumn column, final K key, final V value) { final Bytes keyBytes = dbInstance.keyToBytes(column, key); @@ -321,7 +346,7 @@ public void delete(final KvStoreColumn column, final K key) { } @Override - public void delete(final KvStoreVariable variable) { + public void delete(final KvStoreUnchunkedVariable variable) { assertOpen(); dbInstance.assertValidVariable(variable); variableUpdates.put(variable, Optional.empty());