From dd9652f6cf6e6beb3c0455ff5eaa584013305ae6 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Wed, 22 Jan 2025 17:22:32 +0100 Subject: [PATCH 1/4] step 1 --- docker/jdk21/Dockerfile | 2 +- .../ssz/sos/SszByteArrayChunksWriter.java | 75 +++++++++++++++++ .../server/kvstore/KvStoreAccessor.java | 22 +++-- .../dataaccess/CombinedKvStoreDao.java | 34 +++++++- .../dataaccess/KvStoreCombinedDaoAdapter.java | 7 +- .../dataaccess/V4FinalizedKvStoreDao.java | 6 +- .../kvstore/dataaccess/V4HotKvStoreDao.java | 5 +- .../dataaccess/V4MigratableSourceDao.java | 7 +- .../schema/KvStoreChunkedVariable.java | 73 +++++++++++++++++ .../kvstore/schema/KvStoreVariable.java | 47 ++--------- .../server/kvstore/schema/SchemaCombined.java | 24 +++--- .../schema/SchemaCombinedSnapshotState.java | 4 +- .../schema/SchemaCombinedTreeState.java | 4 +- .../SchemaFinalizedSnapshotStateAdapter.java | 10 +-- .../kvstore/schema/SchemaHotAdapter.java | 22 ++--- .../kvstore/schema/V6SchemaCombined.java | 82 ++++++++++--------- .../schema/V6SchemaCombinedSnapshot.java | 2 +- .../schema/V6SchemaCombinedTreeState.java | 6 +- .../BeaconStateChunkingSerializer.java | 61 ++++++++++++++ .../ChunkedVariableKeySerializer.java | 57 +++++++++++++ .../KvStoreChunkingSerializer.java | 29 +++++++ .../serialization/KvStoreSerializer.java | 2 + .../server/leveldb/LevelDbInstance.java | 20 ++++- .../server/leveldb/LevelDbTransaction.java | 14 +++- .../storage/server/leveldb/LevelDbUtils.java | 4 +- .../server/rocksdb/RocksDbInstance.java | 6 +- .../server/rocksdb/RocksDbTransaction.java | 8 +- .../BeaconStateChunkingSerializerTest.java | 39 +++++++++ .../server/kvstore/MockKvStoreInstance.java | 29 +++---- 29 files changed, 536 insertions(+), 165 deletions(-) create mode 100644 infrastructure/ssz/src/main/java/tech/pegasys/teku/infrastructure/ssz/sos/SszByteArrayChunksWriter.java create mode 100644 storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreChunkedVariable.java create mode 100644 storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java create mode 100644 storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/ChunkedVariableKeySerializer.java create mode 100644 storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreChunkingSerializer.java create mode 100644 storage/src/test/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializerTest.java diff --git a/docker/jdk21/Dockerfile b/docker/jdk21/Dockerfile index 996efa66fcf..71bb2229c92 100644 --- a/docker/jdk21/Dockerfile +++ b/docker/jdk21/Dockerfile @@ -1,7 +1,7 @@ FROM eclipse-temurin:21 as jre-build # Create a custom Java runtime -RUN JAVA_TOOL_OPTIONS="-Djdk.lang.Process.launchMechanism=vfork" $JAVA_HOME/bin/jlink \ +RUN JAVA_TOOL_OPTIONS="-Djdk.lang.Process.launchMechanism=vfork -XX:UseSVE=0" $JAVA_HOME/bin/jlink \ --add-modules ALL-MODULE-PATH \ --strip-debug \ --no-man-pages \ diff --git a/infrastructure/ssz/src/main/java/tech/pegasys/teku/infrastructure/ssz/sos/SszByteArrayChunksWriter.java b/infrastructure/ssz/src/main/java/tech/pegasys/teku/infrastructure/ssz/sos/SszByteArrayChunksWriter.java new file mode 100644 index 00000000000..caee3163a9a --- /dev/null +++ b/infrastructure/ssz/src/main/java/tech/pegasys/teku/infrastructure/ssz/sos/SszByteArrayChunksWriter.java @@ -0,0 +1,75 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.ssz.sos; + +import java.util.Arrays; +import java.util.List; + +public class SszByteArrayChunksWriter implements SszWriter { + private final byte[][] chunks; + private final int maxChunkSize; + private final int maxChunks; + + private int lastChunkIndex; + private int size = 0; + + public SszByteArrayChunksWriter(final int maxSize, final int maxChunkSize) { + this.maxChunks = (maxSize + maxChunkSize - 1) / maxChunkSize; + this.chunks = new byte[maxChunks][]; + this.maxChunkSize = maxChunkSize; + } + + @Override + public void write(final byte[] bytes, final int offset, final int length) { + final int chunk = this.size / maxChunkSize; + final int chunkOffset = this.size % maxChunkSize; + final byte[] chunkData = getChunk(chunk); + + if (chunkOffset + length > maxChunkSize) { + final int lengthToFillCurrentChunk = maxChunkSize - chunkOffset; + System.arraycopy(bytes, offset, chunkData, chunkOffset, lengthToFillCurrentChunk); + this.size += lengthToFillCurrentChunk; + + write(bytes, offset + lengthToFillCurrentChunk, length - lengthToFillCurrentChunk); + } else { + System.arraycopy(bytes, offset, chunkData, chunkOffset, length); + this.size += length; + } + } + + private byte[] getChunk(final int chunk) { + if (chunk >= maxChunks) { + throw new IndexOutOfBoundsException( + "Chunk index out of bounds: " + chunk + ", max chunks: " + maxChunks); + } + byte[] chunkData = chunks[chunk]; + if (chunkData == null) { + chunkData = new byte[maxChunkSize]; + chunks[chunk] = chunkData; + lastChunkIndex = chunk; + } + return chunkData; + } + + public List getChunks() { + int lastChunkSize = size % maxChunkSize; + if (lastChunkSize != 0) { + byte[] lastChunk = chunks[lastChunkIndex]; + byte[] lastChunkTrimmed = Arrays.copyOf(lastChunk, lastChunkSize); + chunks[lastChunkIndex] = lastChunkTrimmed; + } + + return Arrays.stream(chunks).limit(lastChunkIndex + 1).toList(); + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreAccessor.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreAccessor.java index 9779986b9a5..0c0843e0868 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreAccessor.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreAccessor.java @@ -14,18 +14,24 @@ package tech.pegasys.teku.storage.server.kvstore; import com.google.errorprone.annotations.MustBeClosed; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; public interface KvStoreAccessor extends AutoCloseable { - Optional get(KvStoreVariable variable); + Optional get(KvStoreUnchunckedVariable variable); - Optional getRaw(KvStoreVariable variable); + Optional get(KvStoreChunkedVariable variable); + + Optional getRaw(KvStoreUnchunckedVariable variable); + + Optional> getRaw(KvStoreChunkedVariable variable); Optional get(KvStoreColumn column, K key); @@ -112,14 +118,18 @@ , V> Stream> stream( interface KvStoreTransaction extends AutoCloseable { - void put(KvStoreVariable variable, T value); + void put(KvStoreUnchunckedVariable variable, T value); + + void put(KvStoreChunkedVariable variable, T value); /** * Write raw bytes to a specified variable. * *

WARNING: should only be used to migrate data between database instances */ - void putRaw(KvStoreVariable variable, Bytes value); + void putRaw(KvStoreUnchunckedVariable variable, Bytes value); + + void putRaw(KvStoreChunkedVariable variable, List value); void put(KvStoreColumn column, K key, V value); @@ -134,7 +144,7 @@ interface KvStoreTransaction extends AutoCloseable { void delete(KvStoreColumn column, K key); - void delete(KvStoreVariable variable); + void delete(KvStoreUnchunckedVariable variable); void commit(); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/CombinedKvStoreDao.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/CombinedKvStoreDao.java index 83bda83c9a8..f66b7a3aa0e 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/CombinedKvStoreDao.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/CombinedKvStoreDao.java @@ -45,7 +45,9 @@ import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; import tech.pegasys.teku.storage.server.kvstore.dataaccess.V4FinalizedStateStorageLogic.FinalizedStateUpdater; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; import tech.pegasys.teku.storage.server.kvstore.schema.SchemaCombined; @@ -205,8 +207,7 @@ public void ingest( try (final KvStoreTransaction transaction = db.startTransaction()) { for (String key : newVariables.keySet()) { logger.accept(String.format("Copy variable %s", key)); - dao.getRawVariable(oldVariables.get(key)) - .ifPresent(value -> transaction.putRaw(newVariables.get(key), value)); + handleVariableMigration(dao, newVariables.get(key), transaction); } transaction.commit(); } @@ -228,6 +229,28 @@ public void ingest( } } + private void handleVariableMigration( + final V4MigratableSourceDao dao, + final KvStoreVariable variable, + final KvStoreTransaction transaction) { + if (variable.toChunkedVariable().isPresent()) { + final KvStoreChunkedVariable chunkedVariable = variable.toChunkedVariable().orElseThrow(); + dao.getRawVariable(chunkedVariable) + .ifPresent(chunks -> transaction.putRaw(chunkedVariable, chunks)); + return; + } + + if (variable.toUnchunkedVariable().isPresent()) { + final KvStoreUnchunckedVariable unchunckedVariable = + variable.toUnchunkedVariable().orElseThrow(); + dao.getRawVariable(unchunckedVariable) + .ifPresent(value -> transaction.putRaw(unchunckedVariable, value)); + return; + } + + throw new IllegalStateException("Variable must be chunked or unchunked"); + } + @Override public Map> getColumnMap() { return schema.getColumnMap(); @@ -239,7 +262,12 @@ public Map> getVariableMap() { } @Override - public Optional getRawVariable(final KvStoreVariable var) { + public Optional getRawVariable(final KvStoreUnchunckedVariable var) { + return db.getRaw(var); + } + + @Override + public Optional> getRawVariable(final KvStoreChunkedVariable var) { return db.getRaw(var); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/KvStoreCombinedDaoAdapter.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/KvStoreCombinedDaoAdapter.java index b9d918f98dc..0d223d11064 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/KvStoreCombinedDaoAdapter.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/KvStoreCombinedDaoAdapter.java @@ -41,6 +41,7 @@ import tech.pegasys.teku.storage.server.kvstore.dataaccess.V4FinalizedKvStoreDao.V4FinalizedUpdater; import tech.pegasys.teku.storage.server.kvstore.dataaccess.V4HotKvStoreDao.V4HotUpdater; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; public class KvStoreCombinedDaoAdapter implements KvStoreCombinedDao, V4MigratableSourceDao { @@ -359,15 +360,15 @@ public CombinedUpdater combinedUpdater() { } @Override - public Map> getVariableMap() { - final Map> allVariables = new HashMap<>(); + public Map getVariableMap() { + final Map allVariables = new HashMap<>(); allVariables.putAll(hotDao.getVariableMap()); allVariables.putAll(finalizedDao.getVariableMap()); return allVariables; } @Override - public Optional getRawVariable(final KvStoreVariable var) { + public Optional getRawVariable(final KvStoreUnchunckedVariable var) { if (hotDao.getVariableMap().containsValue(var)) { return hotDao.getRawVariable(var); } else { diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4FinalizedKvStoreDao.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4FinalizedKvStoreDao.java index e4eae58e694..80c80c6a7fa 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4FinalizedKvStoreDao.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4FinalizedKvStoreDao.java @@ -39,7 +39,7 @@ import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDao.FinalizedUpdater; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.SchemaFinalizedSnapshotStateAdapter; public class V4FinalizedKvStoreDao { @@ -195,7 +195,7 @@ public Optional getEarliestBlobSidecarSlot() { return db.get(schema.getVariableEarliestBlobSidecarSlot()); } - public Optional getRawVariable(final KvStoreVariable var) { + public Optional getRawVariable(final KvStoreUnchunckedVariable var) { return db.getRaw(var); } @@ -219,7 +219,7 @@ public V4FinalizedUpdater finalizedUpdater() { return schema.getColumnMap(); } - public Map> getVariableMap() { + public Map> getVariableMap() { return schema.getVariableMap(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4HotKvStoreDao.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4HotKvStoreDao.java index 567ad9120e5..6305fa33142 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4HotKvStoreDao.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4HotKvStoreDao.java @@ -38,6 +38,7 @@ import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDao.HotUpdater; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; import tech.pegasys.teku.storage.server.kvstore.schema.SchemaHotAdapter; @@ -142,7 +143,7 @@ public V4HotUpdater hotUpdater() { return new V4HotUpdater(db, schema); } - public Optional getRawVariable(final KvStoreVariable var) { + public Optional getRawVariable(final KvStoreUnchunckedVariable var) { return db.getRaw(var); } @@ -164,7 +165,7 @@ public void close() throws Exception { return schema.getColumnMap(); } - public Map> getVariableMap() { + public Map getVariableMap() { return schema.getVariableMap(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4MigratableSourceDao.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4MigratableSourceDao.java index 8e5a9ab2d99..33a792e4bfc 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4MigratableSourceDao.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4MigratableSourceDao.java @@ -14,12 +14,15 @@ package tech.pegasys.teku.storage.server.kvstore.dataaccess; import com.google.errorprone.annotations.MustBeClosed; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes; import tech.pegasys.teku.storage.server.kvstore.ColumnEntry; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; public interface V4MigratableSourceDao { @@ -27,7 +30,9 @@ public interface V4MigratableSourceDao { Map> getVariableMap(); - Optional getRawVariable(final KvStoreVariable var); + Optional getRawVariable(final KvStoreUnchunckedVariable var); + + Optional> getRawVariable(final KvStoreChunkedVariable var); @MustBeClosed Stream> streamRawColumn(final KvStoreColumn kvStoreColumn); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreChunkedVariable.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreChunkedVariable.java new file mode 100644 index 00000000000..a28e5564ceb --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreChunkedVariable.java @@ -0,0 +1,73 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server.kvstore.schema; + +import static tech.pegasys.teku.infrastructure.unsigned.ByteUtil.toByteExact; + +import java.util.Objects; +import java.util.Optional; +import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.storage.server.kvstore.serialization.KvStoreChunkingSerializer; + +public class KvStoreChunkedVariable implements KvStoreVariable { + private final Bytes id; + private final KvStoreChunkingSerializer serializer; + + private KvStoreChunkedVariable( + final byte[] id, final KvStoreChunkingSerializer serializer) { + this.id = Bytes.wrap(id); + this.serializer = serializer; + } + + public static KvStoreChunkedVariable create( + final int id, final KvStoreChunkingSerializer serializer) { + final byte byteId = toByteExact(id); + return new KvStoreChunkedVariable(new byte[] {byteId}, serializer); + } + + public Bytes getId() { + return id; + } + + public KvStoreChunkingSerializer getSerializer() { + return serializer; + } + + @Override + public Optional> toChunkedVariable() { + return Optional.of(this); + } + + @Override + public Optional> toUnchunkedVariable() { + return Optional.empty(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final KvStoreChunkedVariable that = (KvStoreChunkedVariable) o; + return Objects.equals(id, that.id) && Objects.equals(serializer, that.serializer); + } + + @Override + public int hashCode() { + return Objects.hash(id, serializer); + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreVariable.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreVariable.java index 8e9c96b5c0e..eac85d861ef 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreVariable.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreVariable.java @@ -1,5 +1,5 @@ /* - * Copyright Consensys Software Inc., 2022 + * Copyright Consensys Software Inc., 2025 * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -13,48 +13,11 @@ package tech.pegasys.teku.storage.server.kvstore.schema; -import static tech.pegasys.teku.infrastructure.unsigned.ByteUtil.toByteExact; +import java.util.Optional; -import java.util.Objects; -import org.apache.tuweni.bytes.Bytes; -import tech.pegasys.teku.storage.server.kvstore.serialization.KvStoreSerializer; +public interface KvStoreVariable { -public class KvStoreVariable { - private final Bytes id; - private final KvStoreSerializer serializer; + Optional> toChunkedVariable(); - private KvStoreVariable(final byte[] id, final KvStoreSerializer serializer) { - this.id = Bytes.wrap(id); - this.serializer = serializer; - } - - public static KvStoreVariable create(final int id, final KvStoreSerializer serializer) { - final byte byteId = toByteExact(id); - return new KvStoreVariable(new byte[] {byteId}, serializer); - } - - public Bytes getId() { - return id; - } - - public KvStoreSerializer getSerializer() { - return serializer; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final KvStoreVariable that = (KvStoreVariable) o; - return Objects.equals(id, that.id) && Objects.equals(serializer, that.serializer); - } - - @Override - public int hashCode() { - return Objects.hash(id, serializer); - } + Optional> toUnchunkedVariable(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombined.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombined.java index c74190ae832..3fa8f501308 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombined.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombined.java @@ -63,29 +63,29 @@ public interface SchemaCombined extends Schema { getColumnNonCanonicalBlobSidecarBySlotRootBlobIndex(); // Variables - KvStoreVariable getVariableGenesisTime(); + KvStoreUnchunckedVariable getVariableGenesisTime(); - KvStoreVariable getVariableJustifiedCheckpoint(); + KvStoreUnchunckedVariable getVariableJustifiedCheckpoint(); - KvStoreVariable getVariableBestJustifiedCheckpoint(); + KvStoreUnchunckedVariable getVariableBestJustifiedCheckpoint(); - KvStoreVariable getVariableFinalizedCheckpoint(); + KvStoreUnchunckedVariable getVariableFinalizedCheckpoint(); - KvStoreVariable getVariableLatestFinalizedState(); + KvStoreChunkedVariable getVariableLatestFinalizedState(); - KvStoreVariable getVariableMinGenesisTimeBlock(); + KvStoreUnchunckedVariable getVariableMinGenesisTimeBlock(); - KvStoreVariable getVariableWeakSubjectivityCheckpoint(); + KvStoreUnchunckedVariable getVariableWeakSubjectivityCheckpoint(); - KvStoreVariable getVariableAnchorCheckpoint(); + KvStoreUnchunckedVariable getVariableAnchorCheckpoint(); - KvStoreVariable getOptimisticTransitionBlockSlot(); + KvStoreUnchunckedVariable getOptimisticTransitionBlockSlot(); - KvStoreVariable getVariableEarliestBlobSidecarSlot(); + KvStoreUnchunckedVariable getVariableEarliestBlobSidecarSlot(); - KvStoreVariable getVariableEarliestBlockSlot(); + KvStoreUnchunckedVariable getVariableEarliestBlockSlot(); - KvStoreVariable getVariableFinalizedDepositSnapshot(); + KvStoreUnchunckedVariable getVariableFinalizedDepositSnapshot(); Map> getColumnMap(); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedSnapshotState.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedSnapshotState.java index 84c43de472a..8439a112e5f 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedSnapshotState.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedSnapshotState.java @@ -22,7 +22,7 @@ public interface SchemaCombinedSnapshotState extends SchemaCombined, SchemaFinal Map> getColumnMap(); @Override - Map> getVariableMap(); + Map> getVariableMap(); default SchemaFinalizedSnapshotStateAdapter asSchemaFinalized() { return new SchemaFinalizedSnapshotStateAdapter(this); @@ -34,7 +34,7 @@ default SchemaFinalizedSnapshotStateAdapter asSchemaFinalized() { } @Override - default Collection> getAllVariables() { + default Collection> getAllVariables() { return getVariableMap().values(); } } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedTreeState.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedTreeState.java index 49215949bcd..393c52ed02d 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedTreeState.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedTreeState.java @@ -26,7 +26,7 @@ public interface SchemaCombinedTreeState extends SchemaCombined { Map> getColumnMap(); @Override - Map> getVariableMap(); + Map> getVariableMap(); @Override default Collection> getAllColumns() { @@ -34,7 +34,7 @@ public interface SchemaCombinedTreeState extends SchemaCombined { } @Override - default Collection> getAllVariables() { + default Collection> getAllVariables() { return getVariableMap().values(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaFinalizedSnapshotStateAdapter.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaFinalizedSnapshotStateAdapter.java index a98a89dcbc6..2d7b5a1caea 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaFinalizedSnapshotStateAdapter.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaFinalizedSnapshotStateAdapter.java @@ -70,7 +70,7 @@ public KvStoreColumn getColumnFinalizedStatesBySlot() { return getColumnMap().values(); } - public Collection> getAllVariables() { + public Collection> getAllVariables() { return getVariableMap().values(); } @@ -98,19 +98,19 @@ public KvStoreColumn> getColumnNonCanonicalRootsBySlot() { return delegate.getColumnNonCanonicalRootsBySlot(); } - public KvStoreVariable getOptimisticTransitionBlockSlot() { + public KvStoreUnchunckedVariable getOptimisticTransitionBlockSlot() { return delegate.getOptimisticTransitionBlockSlot(); } - public KvStoreVariable getVariableEarliestBlobSidecarSlot() { + public KvStoreUnchunckedVariable getVariableEarliestBlobSidecarSlot() { return delegate.getVariableEarliestBlobSidecarSlot(); } - public KvStoreVariable getVariableEarliestBlockSlot() { + public KvStoreUnchunckedVariable getVariableEarliestBlockSlot() { return delegate.getVariableEarliestBlockSlot(); } - public Map> getVariableMap() { + public Map> getVariableMap() { return Map.of( "OPTIMISTIC_TRANSITION_BLOCK_SLOT", getOptimisticTransitionBlockSlot(), diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaHotAdapter.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaHotAdapter.java index 4d4547e0982..cd71c84e21a 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaHotAdapter.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaHotAdapter.java @@ -70,39 +70,39 @@ public KvStoreColumn getColumnHotStatesByRoot() { return delegate.getColumnBlobSidecarBySlotRootBlobIndex(); } - public KvStoreVariable getVariableGenesisTime() { + public KvStoreUnchunckedVariable getVariableGenesisTime() { return delegate.getVariableGenesisTime(); } - public KvStoreVariable getVariableJustifiedCheckpoint() { + public KvStoreUnchunckedVariable getVariableJustifiedCheckpoint() { return delegate.getVariableJustifiedCheckpoint(); } - public KvStoreVariable getVariableBestJustifiedCheckpoint() { + public KvStoreUnchunckedVariable getVariableBestJustifiedCheckpoint() { return delegate.getVariableBestJustifiedCheckpoint(); } - public KvStoreVariable getVariableFinalizedCheckpoint() { + public KvStoreUnchunckedVariable getVariableFinalizedCheckpoint() { return delegate.getVariableFinalizedCheckpoint(); } - public KvStoreVariable getVariableLatestFinalizedState() { + public KvStoreChunkedVariable getVariableLatestFinalizedState() { return delegate.getVariableLatestFinalizedState(); } - public KvStoreVariable getVariableMinGenesisTimeBlock() { + public KvStoreUnchunckedVariable getVariableMinGenesisTimeBlock() { return delegate.getVariableMinGenesisTimeBlock(); } - public KvStoreVariable getVariableWeakSubjectivityCheckpoint() { + public KvStoreUnchunckedVariable getVariableWeakSubjectivityCheckpoint() { return delegate.getVariableWeakSubjectivityCheckpoint(); } - public KvStoreVariable getVariableAnchorCheckpoint() { + public KvStoreUnchunckedVariable getVariableAnchorCheckpoint() { return delegate.getVariableAnchorCheckpoint(); } - public KvStoreVariable getVariableFinalizedDepositSnapshot() { + public KvStoreUnchunckedVariable getVariableFinalizedDepositSnapshot() { return delegate.getVariableFinalizedDepositSnapshot(); } @@ -126,7 +126,7 @@ public KvStoreVariable getVariableFinalizedDepositSnapshot( getColumnBlobSidecarBySlotRootBlobIndex()); } - public Map> getVariableMap() { + public Map getVariableMap() { return Map.of( "GENESIS_TIME", getVariableGenesisTime(), @@ -154,7 +154,7 @@ public Map> getVariableMap() { } @Override - public Collection> getAllVariables() { + public Collection getAllVariables() { return getVariableMap().values(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombined.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombined.java index 15630607eb5..44c2dfd70c0 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombined.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombined.java @@ -37,6 +37,7 @@ import tech.pegasys.teku.spec.datastructures.forkchoice.VoteTracker; import tech.pegasys.teku.spec.datastructures.state.Checkpoint; import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.storage.server.kvstore.serialization.KvStoreChunkingSerializer; import tech.pegasys.teku.storage.server.kvstore.serialization.KvStoreSerializer; public abstract class V6SchemaCombined implements SchemaCombined { @@ -63,28 +64,28 @@ public abstract class V6SchemaCombined implements SchemaCombined { KvStoreColumn.create(7, BYTES32_SERIALIZER, CHECKPOINT_EPOCHS_SERIALIZER); // Variables - private static final KvStoreVariable GENESIS_TIME = - KvStoreVariable.create(1, UINT64_SERIALIZER); - private static final KvStoreVariable JUSTIFIED_CHECKPOINT = - KvStoreVariable.create(2, CHECKPOINT_SERIALIZER); - private static final KvStoreVariable BEST_JUSTIFIED_CHECKPOINT = - KvStoreVariable.create(3, CHECKPOINT_SERIALIZER); - private static final KvStoreVariable FINALIZED_CHECKPOINT = - KvStoreVariable.create(4, CHECKPOINT_SERIALIZER); - private final KvStoreVariable latestFinalizedState; - private static final KvStoreVariable MIN_GENESIS_TIME_BLOCK = - KvStoreVariable.create(6, MIN_GENESIS_TIME_BLOCK_EVENT_SERIALIZER); + private static final KvStoreUnchunckedVariable GENESIS_TIME = + KvStoreUnchunckedVariable.create(1, UINT64_SERIALIZER); + private static final KvStoreUnchunckedVariable JUSTIFIED_CHECKPOINT = + KvStoreUnchunckedVariable.create(2, CHECKPOINT_SERIALIZER); + private static final KvStoreUnchunckedVariable BEST_JUSTIFIED_CHECKPOINT = + KvStoreUnchunckedVariable.create(3, CHECKPOINT_SERIALIZER); + private static final KvStoreUnchunckedVariable FINALIZED_CHECKPOINT = + KvStoreUnchunckedVariable.create(4, CHECKPOINT_SERIALIZER); + private final KvStoreChunkedVariable latestFinalizedState; + private static final KvStoreUnchunckedVariable MIN_GENESIS_TIME_BLOCK = + KvStoreUnchunckedVariable.create(6, MIN_GENESIS_TIME_BLOCK_EVENT_SERIALIZER); // 7 was the protoarray snapshot variable but is no longer used. - private static final KvStoreVariable WEAK_SUBJECTIVITY_CHECKPOINT = - KvStoreVariable.create(8, CHECKPOINT_SERIALIZER); - private static final KvStoreVariable ANCHOR_CHECKPOINT = - KvStoreVariable.create(9, CHECKPOINT_SERIALIZER); - private static final KvStoreVariable FINALIZED_DEPOSIT_SNAPSHOT = - KvStoreVariable.create(10, DEPOSIT_SNAPSHOT_SERIALIZER); + private static final KvStoreUnchunckedVariable WEAK_SUBJECTIVITY_CHECKPOINT = + KvStoreUnchunckedVariable.create(8, CHECKPOINT_SERIALIZER); + private static final KvStoreUnchunckedVariable ANCHOR_CHECKPOINT = + KvStoreUnchunckedVariable.create(9, CHECKPOINT_SERIALIZER); + private static final KvStoreUnchunckedVariable FINALIZED_DEPOSIT_SNAPSHOT = + KvStoreUnchunckedVariable.create(10, DEPOSIT_SNAPSHOT_SERIALIZER); - private final KvStoreVariable optimisticTransitionBlockSlot; - private final KvStoreVariable earliestBlobSidecarSlot; - private final KvStoreVariable earliestBlockSlot; + private final KvStoreUnchunckedVariable optimisticTransitionBlockSlot; + private final KvStoreUnchunckedVariable earliestBlobSidecarSlot; + private final KvStoreUnchunckedVariable earliestBlockSlot; protected V6SchemaCombined(final Spec spec, final int finalizedOffset) { this.finalizedOffset = finalizedOffset; @@ -95,13 +96,18 @@ protected V6SchemaCombined(final Spec spec, final int finalizedOffset) { KvStoreSerializer.createStateSerializer(spec); checkpointStates = KvStoreColumn.create(2, CHECKPOINT_SERIALIZER, stateSerializer); hotStatesByRoot = KvStoreColumn.create(6, BYTES32_SERIALIZER, stateSerializer); - latestFinalizedState = KvStoreVariable.create(5, stateSerializer); + final KvStoreChunkingSerializer stateChunkingSerializer = + KvStoreChunkingSerializer.createStateSerializer(spec); + + latestFinalizedState = KvStoreChunkedVariable.create(5, stateSerializer); votes = KvStoreColumn.create(3, UINT64_SERIALIZER, VOTE_TRACKER_SERIALIZER); - optimisticTransitionBlockSlot = KvStoreVariable.create(finalizedOffset + 1, UINT64_SERIALIZER); - earliestBlobSidecarSlot = KvStoreVariable.create(finalizedOffset + 2, UINT64_SERIALIZER); - earliestBlockSlot = KvStoreVariable.create(finalizedOffset + 3, UINT64_SERIALIZER); + optimisticTransitionBlockSlot = + KvStoreUnchunckedVariable.create(finalizedOffset + 1, UINT64_SERIALIZER); + earliestBlobSidecarSlot = + KvStoreUnchunckedVariable.create(finalizedOffset + 2, UINT64_SERIALIZER); + earliestBlockSlot = KvStoreUnchunckedVariable.create(finalizedOffset + 3, UINT64_SERIALIZER); } @Override @@ -140,62 +146,62 @@ public KvStoreColumn getColumnHotStatesByRoot() { } @Override - public KvStoreVariable getVariableGenesisTime() { + public KvStoreUnchunckedVariable getVariableGenesisTime() { return GENESIS_TIME; } @Override - public KvStoreVariable getVariableJustifiedCheckpoint() { + public KvStoreUnchunckedVariable getVariableJustifiedCheckpoint() { return JUSTIFIED_CHECKPOINT; } @Override - public KvStoreVariable getVariableBestJustifiedCheckpoint() { + public KvStoreUnchunckedVariable getVariableBestJustifiedCheckpoint() { return BEST_JUSTIFIED_CHECKPOINT; } @Override - public KvStoreVariable getVariableFinalizedCheckpoint() { + public KvStoreUnchunckedVariable getVariableFinalizedCheckpoint() { return FINALIZED_CHECKPOINT; } @Override - public KvStoreVariable getVariableLatestFinalizedState() { + public KvStoreChunkedVariable getVariableLatestFinalizedState() { return latestFinalizedState; } @Override - public KvStoreVariable getVariableMinGenesisTimeBlock() { + public KvStoreUnchunckedVariable getVariableMinGenesisTimeBlock() { return MIN_GENESIS_TIME_BLOCK; } @Override - public KvStoreVariable getVariableWeakSubjectivityCheckpoint() { + public KvStoreUnchunckedVariable getVariableWeakSubjectivityCheckpoint() { return WEAK_SUBJECTIVITY_CHECKPOINT; } @Override - public KvStoreVariable getVariableAnchorCheckpoint() { + public KvStoreUnchunckedVariable getVariableAnchorCheckpoint() { return ANCHOR_CHECKPOINT; } @Override - public KvStoreVariable getOptimisticTransitionBlockSlot() { + public KvStoreUnchunckedVariable getOptimisticTransitionBlockSlot() { return optimisticTransitionBlockSlot; } @Override - public KvStoreVariable getVariableFinalizedDepositSnapshot() { + public KvStoreUnchunckedVariable getVariableFinalizedDepositSnapshot() { return FINALIZED_DEPOSIT_SNAPSHOT; } @Override - public KvStoreVariable getVariableEarliestBlobSidecarSlot() { + public KvStoreUnchunckedVariable getVariableEarliestBlobSidecarSlot() { return earliestBlobSidecarSlot; } @Override - public KvStoreVariable getVariableEarliestBlockSlot() { + public KvStoreUnchunckedVariable getVariableEarliestBlockSlot() { return earliestBlockSlot; } @@ -221,8 +227,8 @@ public KvStoreVariable getVariableEarliestBlockSlot() { } @Override - public Map> getVariableMap() { - return ImmutableMap.>builder() + public Map> getVariableMap() { + return ImmutableMap.>builder() .put("GENESIS_TIME", getVariableGenesisTime()) .put("JUSTIFIED_CHECKPOINT", getVariableJustifiedCheckpoint()) .put("BEST_JUSTIFIED_CHECKPOINT", getVariableBestJustifiedCheckpoint()) diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedSnapshot.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedSnapshot.java index cff71d1b931..9aa200257a0 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedSnapshot.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedSnapshot.java @@ -172,7 +172,7 @@ public KvStoreColumn> getColumnNonCanonicalRootsBySlot() { } @Override - public Collection> getAllVariables() { + public Collection> getAllVariables() { return getVariableMap().values(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedTreeState.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedTreeState.java index eed62f47bdd..8ae186c5955 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedTreeState.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedTreeState.java @@ -150,8 +150,8 @@ public KvStoreColumn> getColumnNonCanonicalRootsBySlot() { } @Override - public Map> getVariableMap() { - return ImmutableMap.>builder() + public Map> getVariableMap() { + return ImmutableMap.>builder() .put("GENESIS_TIME", getVariableGenesisTime()) .put("JUSTIFIED_CHECKPOINT", getVariableJustifiedCheckpoint()) .put("BEST_JUSTIFIED_CHECKPOINT", getVariableBestJustifiedCheckpoint()) @@ -200,7 +200,7 @@ public Map> getVariableMap() { } @Override - public Collection> getAllVariables() { + public Collection> getAllVariables() { return getVariableMap().values(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java new file mode 100644 index 00000000000..58cb94a7b62 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java @@ -0,0 +1,61 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server.kvstore.serialization; + +import java.util.List; +import java.util.Objects; +import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.infrastructure.ssz.sos.SszByteArrayChunksWriter; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; + +class BeaconStateChunkingSerializer implements KvStoreChunkingSerializer { + + private final Spec spec; + + BeaconStateChunkingSerializer(final Spec spec) { + this.spec = spec; + } + + @Override + public BeaconState deserialize(final List data) { + return spec.deserializeBeaconState(Bytes.wrap(data.stream().map(Bytes::wrap).toList())); + } + + @Override + public List serialize(final BeaconState value) { + final SszByteArrayChunksWriter sszByteArrayChunksWriter = + new SszByteArrayChunksWriter( + value.getBeaconStateSchema().getSszSize(value.getBackingNode()), 10240); + value.sszSerialize(sszByteArrayChunksWriter); + return sszByteArrayChunksWriter.getChunks(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final BeaconStateChunkingSerializer that = (BeaconStateChunkingSerializer) o; + return Objects.equals(spec, that.spec); + } + + @Override + public int hashCode() { + return Objects.hash(spec); + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/ChunkedVariableKeySerializer.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/ChunkedVariableKeySerializer.java new file mode 100644 index 00000000000..48fd7f82231 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/ChunkedVariableKeySerializer.java @@ -0,0 +1,57 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server.kvstore.serialization; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.tuweni.bytes.Bytes; + +/** + * This serializer is intended to be used as a Key so that it preserve slot ordering when we stream + * data. This is useful for values that are always looked up by root and slot, giving us the ability + * to quickly lookup most recent\oldest values by slot as well as perform pruning based on slot + */ +public class ChunkedVariableKeySerializer + implements KvStoreSerializer { + static final int ID_KEY_SIZE = 1; + + // we support only 256 chunks + static final int CHUNK_KEY_SIZE = 1; + + static final int ID_KEY_OFFSET = 0; + static final int CHUNK_KEY_OFFSET = ID_KEY_OFFSET + ID_KEY_SIZE; + static final int DATA_SIZE = CHUNK_KEY_OFFSET + CHUNK_KEY_SIZE; + + @Override + public IdAndChunkKey deserialize(final byte[] data) { + checkArgument(data.length == DATA_SIZE); + return IdAndChunkKey.fromBytes(Bytes.wrap(data)); + } + + @Override + public byte[] serialize(final IdAndChunkKey value) { + return value.toBytes().toArrayUnsafe(); + } + + public record IdAndChunkKey(Bytes id, Bytes chunkKey) { + public static IdAndChunkKey fromBytes(Bytes bytes) { + return new IdAndChunkKey( + bytes.slice(0, ID_KEY_SIZE), bytes.slice(ID_KEY_SIZE, CHUNK_KEY_SIZE)); + } + + public Bytes toBytes() { + return Bytes.concatenate(id, chunkKey); + } + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreChunkingSerializer.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreChunkingSerializer.java new file mode 100644 index 00000000000..44f476ab380 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreChunkingSerializer.java @@ -0,0 +1,29 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server.kvstore.serialization; + +import java.util.List; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; + +public interface KvStoreChunkingSerializer { + + static KvStoreChunkingSerializer createStateSerializer(final Spec spec) { + return new BeaconStateChunkingSerializer(spec); + } + + T deserialize(List data); + + List serialize(T value); +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreSerializer.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreSerializer.java index ecaa64c3b50..ea464391af4 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreSerializer.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/KvStoreSerializer.java @@ -57,6 +57,8 @@ public interface KvStoreSerializer { SLOT_AND_BLOCK_ROOT_AND_BLOB_INDEX_KEY_SERIALIZER = new SlotAndBlockRootAndBlobIndexKeySerializer(); + ChunkedVariableKeySerializer CHUNKED_VARIABLE_KEY_SERIALIZER = new ChunkedVariableKeySerializer(); + static KvStoreSerializer createStateSerializer(final Spec spec) { return new BeaconStateSerializer(spec); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbInstance.java b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbInstance.java index 634bfac9bf2..a536cefeddf 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbInstance.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbInstance.java @@ -49,8 +49,9 @@ import tech.pegasys.teku.storage.server.ShuttingDownException; import tech.pegasys.teku.storage.server.kvstore.ColumnEntry; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; /** * Implements {@link KvStoreAccessor} using LevelDB to store the data. @@ -92,17 +93,30 @@ public LevelDbInstance( } @Override - public Optional get(final KvStoreVariable variable) { + public Optional get(final KvStoreUnchunckedVariable variable) { return getRaw(variable) .map(value -> variable.getSerializer().deserialize(value.toArrayUnsafe())); } @Override - public Optional getRaw(final KvStoreVariable variable) { + public Optional get(final KvStoreChunkedVariable variable) { + return getRaw(variable) + .map(value -> variable.getSerializer().deserialize(value.toArrayUnsafe())); + } + + @Override + public Optional getRaw(final KvStoreUnchunckedVariable variable) { assertOpen(); return Optional.ofNullable(db.get(getVariableKey(variable))).map(Bytes::wrap); } + @Override + public Optional> getRaw(final KvStoreChunkedVariable variable) { + assertOpen(); + // TODO implement the lookup to get number of chunks and then get all the chunks + return Optional.empty(); + } + @Override public Optional get(final KvStoreColumn column, final K key) { assertOpen(); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbTransaction.java b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbTransaction.java index 1235aaac991..f6c27eaef26 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbTransaction.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbTransaction.java @@ -26,8 +26,9 @@ import org.iq80.leveldb.WriteBatch; import tech.pegasys.teku.storage.server.ShuttingDownException; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; public class LevelDbTransaction implements KvStoreTransaction { @@ -46,12 +47,17 @@ public LevelDbTransaction( } @Override - public void put(final KvStoreVariable variable, final T value) { + public void put(final KvStoreUnchunckedVariable variable, final T value) { putRaw(variable, Bytes.wrap(variable.getSerializer().serialize(value))); } @Override - public void putRaw(final KvStoreVariable variable, final Bytes value) { + public void put(final KvStoreChunkedVariable variable, final T value) { + putRaw(variable, Bytes.wrap(variable.getValueSerializer().serialize(value))); + } + + @Override + public void putRaw(final KvStoreUnchunckedVariable variable, final Bytes value) { applyUpdate(() -> writeBatch.put(getVariableKey(variable), value.toArrayUnsafe())); } @@ -84,7 +90,7 @@ public void delete(final KvStoreColumn column, final K key) { } @Override - public void delete(final KvStoreVariable variable) { + public void delete(final KvStoreUnchunckedVariable variable) { applyUpdate(() -> writeBatch.delete(getVariableKey(variable))); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbUtils.java b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbUtils.java index cc17103b12d..aa069aa24a6 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbUtils.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbUtils.java @@ -20,7 +20,7 @@ import java.util.Optional; import tech.pegasys.teku.storage.server.kvstore.ColumnEntry; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; class LevelDbUtils { @@ -33,7 +33,7 @@ static byte[] getKeyAfterColumn(final KvStoreColumn column) { return keyAfterColumn; } - static byte[] getVariableKey(final KvStoreVariable variable) { + static byte[] getVariableKey(final KvStoreUnchunckedVariable variable) { final byte[] suffix = variable.getId().toArrayUnsafe(); final byte[] key = new byte[suffix.length + 1]; // All 1s in binary so right at the end of the index. diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbInstance.java b/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbInstance.java index 8ed14e3ab5d..dca7578864e 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbInstance.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbInstance.java @@ -35,7 +35,7 @@ import tech.pegasys.teku.storage.server.kvstore.ColumnEntry; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; public class RocksDbInstance implements KvStoreAccessor { @@ -59,12 +59,12 @@ public class RocksDbInstance implements KvStoreAccessor { } @Override - public Optional get(final KvStoreVariable variable) { + public Optional get(final KvStoreUnchunckedVariable variable) { return getRaw(variable).map(data -> variable.getSerializer().deserialize(data.toArrayUnsafe())); } @Override - public Optional getRaw(final KvStoreVariable variable) { + public Optional getRaw(final KvStoreUnchunckedVariable variable) { assertOpen(); try { return Optional.ofNullable(db.get(defaultHandle, variable.getId().toArrayUnsafe())) diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbTransaction.java b/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbTransaction.java index c3b1b3c969d..30b23229665 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbTransaction.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbTransaction.java @@ -27,7 +27,7 @@ import tech.pegasys.teku.storage.server.ShuttingDownException; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; public class RocksDbTransaction implements KvStoreTransaction { private final ColumnFamilyHandle defaultHandle; @@ -53,13 +53,13 @@ public class RocksDbTransaction implements KvStoreTransaction { } @Override - public void put(final KvStoreVariable variable, final T value) { + public void put(final KvStoreUnchunckedVariable variable, final T value) { final Bytes serialized = Bytes.wrap(variable.getSerializer().serialize(value)); putRaw(variable, serialized); } @Override - public void putRaw(final KvStoreVariable variable, final Bytes value) { + public void putRaw(final KvStoreUnchunckedVariable variable, final Bytes value) { applyUpdate( () -> { try { @@ -122,7 +122,7 @@ public void delete(final KvStoreColumn column, final K key) { } @Override - public void delete(final KvStoreVariable variable) { + public void delete(final KvStoreUnchunckedVariable variable) { applyUpdate( () -> { try { diff --git a/storage/src/test/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializerTest.java b/storage/src/test/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializerTest.java new file mode 100644 index 00000000000..0dd1289fdc2 --- /dev/null +++ b/storage/src/test/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializerTest.java @@ -0,0 +1,39 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server.kvstore.serialization; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.spec.util.DataStructureUtil; + +public class BeaconStateChunkingSerializerTest { + private final Spec spec = TestSpecFactory.createMinimalPhase0(); + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); + + private final KvStoreChunkingSerializer stateSerializer = + new BeaconStateChunkingSerializer(spec); + + @Test + public void roundTrip_state() { + final BeaconState value = dataStructureUtil.randomBeaconState(11); + final List bytes = stateSerializer.serialize(value); + final BeaconState deserialized = stateSerializer.deserialize(bytes); + assertThat(deserialized).isEqualTo(value); + } +} diff --git a/storage/src/testFixtures/java/tech/pegasys/teku/storage/server/kvstore/MockKvStoreInstance.java b/storage/src/testFixtures/java/tech/pegasys/teku/storage/server/kvstore/MockKvStoreInstance.java index 5c54c615a65..f700dd69c44 100644 --- a/storage/src/testFixtures/java/tech/pegasys/teku/storage/server/kvstore/MockKvStoreInstance.java +++ b/storage/src/testFixtures/java/tech/pegasys/teku/storage/server/kvstore/MockKvStoreInstance.java @@ -31,22 +31,22 @@ import org.apache.tuweni.bytes.Bytes; import tech.pegasys.teku.storage.server.ShuttingDownException; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; public class MockKvStoreInstance implements KvStoreAccessor { private final Set> columns; - private final Set> variables; + private final Set> variables; private final Map, NavigableMap> columnData; - private final Map, Bytes> variableData; + private final Map, Bytes> variableData; private final AtomicBoolean closed = new AtomicBoolean(false); public MockKvStoreInstance( final Collection> columns, - final Collection> variables, + final Collection> variables, final Map, NavigableMap> columnData, - final Map, Bytes> variableData) { + final Map, Bytes> variableData) { this.columns = new HashSet<>(columns); this.variables = new HashSet<>(variables); this.columnData = columnData; @@ -63,23 +63,23 @@ public MockKvStoreInstance reopen() { public static MockKvStoreInstance createEmpty( final Collection> columns, - final Collection> variables) { + final Collection> variables) { checkArgument(columns.size() > 0, "No columns attached to schema"); final Map, NavigableMap> columnData = columns.stream() .collect(Collectors.toConcurrentMap(col -> col, __ -> new ConcurrentSkipListMap<>())); - final Map, Bytes> variableData = new ConcurrentHashMap<>(); + final Map, Bytes> variableData = new ConcurrentHashMap<>(); return new MockKvStoreInstance(columns, variables, columnData, variableData); } @Override - public Optional get(final KvStoreVariable variable) { + public Optional get(final KvStoreUnchunckedVariable variable) { return getRaw(variable).map(Bytes::toArrayUnsafe).map(variable.getSerializer()::deserialize); } @Override - public Optional getRaw(final KvStoreVariable variable) { + public Optional getRaw(final KvStoreUnchunckedVariable variable) { assertOpen(); assertValidVariable(variable); return Optional.ofNullable(variableData.get(variable)); @@ -242,7 +242,7 @@ public void close() { closed.set(true); } - private void assertValidVariable(final KvStoreVariable variable) { + private void assertValidVariable(final KvStoreUnchunckedVariable variable) { checkArgument(variables.contains(variable), "Unknown RocksDbVariable supplied"); } @@ -261,7 +261,8 @@ private static class MockKvStoreTransaction implements KvStoreTransaction { private final MockKvStoreInstance dbInstance; private final Map, Map> columnUpdates = new HashMap<>(); private final Map, Set> deletedColumnKeys = new HashMap<>(); - private final Map, Optional> variableUpdates = new HashMap<>(); + private final Map, Optional> variableUpdates = + new HashMap<>(); private boolean closed = false; public MockKvStoreTransaction(final MockKvStoreInstance mockRocksDbInstance) { @@ -269,13 +270,13 @@ public MockKvStoreTransaction(final MockKvStoreInstance mockRocksDbInstance) { } @Override - public void put(final KvStoreVariable variable, final T value) { + public void put(final KvStoreUnchunckedVariable variable, final T value) { final Bytes valueBytes = Bytes.wrap(variable.getSerializer().serialize(value)); putRaw(variable, valueBytes); } @Override - public void putRaw(final KvStoreVariable variable, final Bytes value) { + public void putRaw(final KvStoreUnchunckedVariable variable, final Bytes value) { assertOpen(); dbInstance.assertValidVariable(variable); variableUpdates.put(variable, Optional.of(value)); @@ -321,7 +322,7 @@ public void delete(final KvStoreColumn column, final K key) { } @Override - public void delete(final KvStoreVariable variable) { + public void delete(final KvStoreUnchunckedVariable variable) { assertOpen(); dbInstance.assertValidVariable(variable); variableUpdates.put(variable, Optional.empty()); From c6ef262c3337e1fc8f1b34affd6848859badd79b Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Wed, 22 Jan 2025 17:51:30 +0100 Subject: [PATCH 2/4] minimal levelDB --- docker/jdk21/Dockerfile | 2 +- .../server/kvstore/KvStoreAccessor.java | 14 ++-- .../dataaccess/CombinedKvStoreDao.java | 6 +- .../dataaccess/KvStoreCombinedDaoAdapter.java | 18 +++-- .../dataaccess/V4FinalizedKvStoreDao.java | 12 +++- .../kvstore/dataaccess/V4HotKvStoreDao.java | 11 ++- .../dataaccess/V4MigratableSourceDao.java | 4 +- .../schema/KvStoreChunkedVariable.java | 2 +- .../kvstore/schema/KvStoreVariable.java | 2 +- .../server/kvstore/schema/SchemaCombined.java | 22 +++--- .../schema/SchemaCombinedSnapshotState.java | 4 +- .../schema/SchemaCombinedTreeState.java | 4 +- .../SchemaFinalizedSnapshotStateAdapter.java | 10 +-- .../kvstore/schema/SchemaHotAdapter.java | 20 +++--- .../kvstore/schema/V6SchemaCombined.java | 72 +++++++++---------- .../schema/V6SchemaCombinedSnapshot.java | 2 +- .../schema/V6SchemaCombinedTreeState.java | 6 +- .../BeaconStateChunkingSerializer.java | 2 +- .../ChunkedVariableKeySerializer.java | 2 +- .../server/leveldb/LevelDbInstance.java | 33 +++++++-- .../server/leveldb/LevelDbTransaction.java | 27 +++++-- .../storage/server/leveldb/LevelDbUtils.java | 26 ++++++- .../server/rocksdb/RocksDbInstance.java | 17 ++++- .../server/rocksdb/RocksDbTransaction.java | 20 ++++-- .../server/kvstore/MockKvStoreInstance.java | 52 ++++++++++---- 25 files changed, 259 insertions(+), 131 deletions(-) diff --git a/docker/jdk21/Dockerfile b/docker/jdk21/Dockerfile index 71bb2229c92..996efa66fcf 100644 --- a/docker/jdk21/Dockerfile +++ b/docker/jdk21/Dockerfile @@ -1,7 +1,7 @@ FROM eclipse-temurin:21 as jre-build # Create a custom Java runtime -RUN JAVA_TOOL_OPTIONS="-Djdk.lang.Process.launchMechanism=vfork -XX:UseSVE=0" $JAVA_HOME/bin/jlink \ +RUN JAVA_TOOL_OPTIONS="-Djdk.lang.Process.launchMechanism=vfork" $JAVA_HOME/bin/jlink \ --add-modules ALL-MODULE-PATH \ --strip-debug \ --no-man-pages \ diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreAccessor.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreAccessor.java index 0c0843e0868..d389c10b238 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreAccessor.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreAccessor.java @@ -21,15 +21,15 @@ import org.apache.tuweni.bytes.Bytes; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; public interface KvStoreAccessor extends AutoCloseable { - Optional get(KvStoreUnchunckedVariable variable); + Optional get(KvStoreUnchunkedVariable variable); Optional get(KvStoreChunkedVariable variable); - Optional getRaw(KvStoreUnchunckedVariable variable); + Optional getRaw(KvStoreUnchunkedVariable variable); Optional> getRaw(KvStoreChunkedVariable variable); @@ -118,7 +118,7 @@ , V> Stream> stream( interface KvStoreTransaction extends AutoCloseable { - void put(KvStoreUnchunckedVariable variable, T value); + void put(KvStoreUnchunkedVariable variable, T value); void put(KvStoreChunkedVariable variable, T value); @@ -127,9 +127,9 @@ interface KvStoreTransaction extends AutoCloseable { * *

WARNING: should only be used to migrate data between database instances */ - void putRaw(KvStoreUnchunckedVariable variable, Bytes value); + void putRaw(KvStoreUnchunkedVariable variable, Bytes value); - void putRaw(KvStoreChunkedVariable variable, List value); + void putRaw(KvStoreChunkedVariable variable, List valueChunks); void put(KvStoreColumn column, K key, V value); @@ -144,7 +144,7 @@ interface KvStoreTransaction extends AutoCloseable { void delete(KvStoreColumn column, K key); - void delete(KvStoreUnchunckedVariable variable); + void delete(KvStoreUnchunkedVariable variable); void commit(); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/CombinedKvStoreDao.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/CombinedKvStoreDao.java index f66b7a3aa0e..e21fe756ed0 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/CombinedKvStoreDao.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/CombinedKvStoreDao.java @@ -47,7 +47,7 @@ import tech.pegasys.teku.storage.server.kvstore.dataaccess.V4FinalizedStateStorageLogic.FinalizedStateUpdater; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; import tech.pegasys.teku.storage.server.kvstore.schema.SchemaCombined; @@ -241,7 +241,7 @@ private void handleVariableMigration( } if (variable.toUnchunkedVariable().isPresent()) { - final KvStoreUnchunckedVariable unchunckedVariable = + final KvStoreUnchunkedVariable unchunckedVariable = variable.toUnchunkedVariable().orElseThrow(); dao.getRawVariable(unchunckedVariable) .ifPresent(value -> transaction.putRaw(unchunckedVariable, value)); @@ -262,7 +262,7 @@ public Map> getVariableMap() { } @Override - public Optional getRawVariable(final KvStoreUnchunckedVariable var) { + public Optional getRawVariable(final KvStoreUnchunkedVariable var) { return db.getRaw(var); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/KvStoreCombinedDaoAdapter.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/KvStoreCombinedDaoAdapter.java index 0d223d11064..41dad1d16c1 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/KvStoreCombinedDaoAdapter.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/KvStoreCombinedDaoAdapter.java @@ -40,8 +40,9 @@ import tech.pegasys.teku.storage.server.kvstore.ColumnEntry; import tech.pegasys.teku.storage.server.kvstore.dataaccess.V4FinalizedKvStoreDao.V4FinalizedUpdater; import tech.pegasys.teku.storage.server.kvstore.dataaccess.V4HotKvStoreDao.V4HotUpdater; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; public class KvStoreCombinedDaoAdapter implements KvStoreCombinedDao, V4MigratableSourceDao { @@ -360,15 +361,24 @@ public CombinedUpdater combinedUpdater() { } @Override - public Map getVariableMap() { - final Map allVariables = new HashMap<>(); + public Map> getVariableMap() { + final Map> allVariables = new HashMap<>(); allVariables.putAll(hotDao.getVariableMap()); allVariables.putAll(finalizedDao.getVariableMap()); return allVariables; } @Override - public Optional getRawVariable(final KvStoreUnchunckedVariable var) { + public Optional getRawVariable(final KvStoreUnchunkedVariable var) { + if (hotDao.getVariableMap().containsValue(var)) { + return hotDao.getRawVariable(var); + } else { + return finalizedDao.getRawVariable(var); + } + } + + @Override + public Optional> getRawVariable(final KvStoreChunkedVariable var) { if (hotDao.getVariableMap().containsValue(var)) { return hotDao.getRawVariable(var); } else { diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4FinalizedKvStoreDao.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4FinalizedKvStoreDao.java index 80c80c6a7fa..cf02709a5d1 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4FinalizedKvStoreDao.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4FinalizedKvStoreDao.java @@ -38,8 +38,10 @@ import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDao.FinalizedUpdater; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; import tech.pegasys.teku.storage.server.kvstore.schema.SchemaFinalizedSnapshotStateAdapter; public class V4FinalizedKvStoreDao { @@ -195,7 +197,11 @@ public Optional getEarliestBlobSidecarSlot() { return db.get(schema.getVariableEarliestBlobSidecarSlot()); } - public Optional getRawVariable(final KvStoreUnchunckedVariable var) { + public Optional getRawVariable(final KvStoreUnchunkedVariable var) { + return db.getRaw(var); + } + + public Optional> getRawVariable(final KvStoreChunkedVariable var) { return db.getRaw(var); } @@ -219,7 +225,7 @@ public V4FinalizedUpdater finalizedUpdater() { return schema.getColumnMap(); } - public Map> getVariableMap() { + public Map> getVariableMap() { return schema.getVariableMap(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4HotKvStoreDao.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4HotKvStoreDao.java index 6305fa33142..ec9814d6566 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4HotKvStoreDao.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4HotKvStoreDao.java @@ -37,8 +37,9 @@ import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDao.HotUpdater; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; import tech.pegasys.teku.storage.server.kvstore.schema.SchemaHotAdapter; @@ -143,7 +144,11 @@ public V4HotUpdater hotUpdater() { return new V4HotUpdater(db, schema); } - public Optional getRawVariable(final KvStoreUnchunckedVariable var) { + public Optional getRawVariable(final KvStoreUnchunkedVariable var) { + return db.getRaw(var); + } + + public Optional> getRawVariable(final KvStoreChunkedVariable var) { return db.getRaw(var); } @@ -165,7 +170,7 @@ public void close() throws Exception { return schema.getColumnMap(); } - public Map getVariableMap() { + public Map> getVariableMap() { return schema.getVariableMap(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4MigratableSourceDao.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4MigratableSourceDao.java index 33a792e4bfc..fe988cbebaa 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4MigratableSourceDao.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/dataaccess/V4MigratableSourceDao.java @@ -22,7 +22,7 @@ import tech.pegasys.teku.storage.server.kvstore.ColumnEntry; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; public interface V4MigratableSourceDao { @@ -30,7 +30,7 @@ public interface V4MigratableSourceDao { Map> getVariableMap(); - Optional getRawVariable(final KvStoreUnchunckedVariable var); + Optional getRawVariable(final KvStoreUnchunkedVariable var); Optional> getRawVariable(final KvStoreChunkedVariable var); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreChunkedVariable.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreChunkedVariable.java index a28e5564ceb..5ec8c8d4435 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreChunkedVariable.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreChunkedVariable.java @@ -50,7 +50,7 @@ public Optional> toChunkedVariable() { } @Override - public Optional> toUnchunkedVariable() { + public Optional> toUnchunkedVariable() { return Optional.empty(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreVariable.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreVariable.java index eac85d861ef..c602785a467 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreVariable.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreVariable.java @@ -19,5 +19,5 @@ public interface KvStoreVariable { Optional> toChunkedVariable(); - Optional> toUnchunkedVariable(); + Optional> toUnchunkedVariable(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombined.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombined.java index 3fa8f501308..a6b2e17b178 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombined.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombined.java @@ -63,29 +63,29 @@ public interface SchemaCombined extends Schema { getColumnNonCanonicalBlobSidecarBySlotRootBlobIndex(); // Variables - KvStoreUnchunckedVariable getVariableGenesisTime(); + KvStoreUnchunkedVariable getVariableGenesisTime(); - KvStoreUnchunckedVariable getVariableJustifiedCheckpoint(); + KvStoreUnchunkedVariable getVariableJustifiedCheckpoint(); - KvStoreUnchunckedVariable getVariableBestJustifiedCheckpoint(); + KvStoreUnchunkedVariable getVariableBestJustifiedCheckpoint(); - KvStoreUnchunckedVariable getVariableFinalizedCheckpoint(); + KvStoreUnchunkedVariable getVariableFinalizedCheckpoint(); KvStoreChunkedVariable getVariableLatestFinalizedState(); - KvStoreUnchunckedVariable getVariableMinGenesisTimeBlock(); + KvStoreUnchunkedVariable getVariableMinGenesisTimeBlock(); - KvStoreUnchunckedVariable getVariableWeakSubjectivityCheckpoint(); + KvStoreUnchunkedVariable getVariableWeakSubjectivityCheckpoint(); - KvStoreUnchunckedVariable getVariableAnchorCheckpoint(); + KvStoreUnchunkedVariable getVariableAnchorCheckpoint(); - KvStoreUnchunckedVariable getOptimisticTransitionBlockSlot(); + KvStoreUnchunkedVariable getOptimisticTransitionBlockSlot(); - KvStoreUnchunckedVariable getVariableEarliestBlobSidecarSlot(); + KvStoreUnchunkedVariable getVariableEarliestBlobSidecarSlot(); - KvStoreUnchunckedVariable getVariableEarliestBlockSlot(); + KvStoreUnchunkedVariable getVariableEarliestBlockSlot(); - KvStoreUnchunckedVariable getVariableFinalizedDepositSnapshot(); + KvStoreUnchunkedVariable getVariableFinalizedDepositSnapshot(); Map> getColumnMap(); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedSnapshotState.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedSnapshotState.java index 8439a112e5f..84c43de472a 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedSnapshotState.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedSnapshotState.java @@ -22,7 +22,7 @@ public interface SchemaCombinedSnapshotState extends SchemaCombined, SchemaFinal Map> getColumnMap(); @Override - Map> getVariableMap(); + Map> getVariableMap(); default SchemaFinalizedSnapshotStateAdapter asSchemaFinalized() { return new SchemaFinalizedSnapshotStateAdapter(this); @@ -34,7 +34,7 @@ default SchemaFinalizedSnapshotStateAdapter asSchemaFinalized() { } @Override - default Collection> getAllVariables() { + default Collection> getAllVariables() { return getVariableMap().values(); } } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedTreeState.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedTreeState.java index 393c52ed02d..49215949bcd 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedTreeState.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaCombinedTreeState.java @@ -26,7 +26,7 @@ public interface SchemaCombinedTreeState extends SchemaCombined { Map> getColumnMap(); @Override - Map> getVariableMap(); + Map> getVariableMap(); @Override default Collection> getAllColumns() { @@ -34,7 +34,7 @@ public interface SchemaCombinedTreeState extends SchemaCombined { } @Override - default Collection> getAllVariables() { + default Collection> getAllVariables() { return getVariableMap().values(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaFinalizedSnapshotStateAdapter.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaFinalizedSnapshotStateAdapter.java index 2d7b5a1caea..c92498a6a64 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaFinalizedSnapshotStateAdapter.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaFinalizedSnapshotStateAdapter.java @@ -70,7 +70,7 @@ public KvStoreColumn getColumnFinalizedStatesBySlot() { return getColumnMap().values(); } - public Collection> getAllVariables() { + public Collection> getAllVariables() { return getVariableMap().values(); } @@ -98,19 +98,19 @@ public KvStoreColumn> getColumnNonCanonicalRootsBySlot() { return delegate.getColumnNonCanonicalRootsBySlot(); } - public KvStoreUnchunckedVariable getOptimisticTransitionBlockSlot() { + public KvStoreUnchunkedVariable getOptimisticTransitionBlockSlot() { return delegate.getOptimisticTransitionBlockSlot(); } - public KvStoreUnchunckedVariable getVariableEarliestBlobSidecarSlot() { + public KvStoreUnchunkedVariable getVariableEarliestBlobSidecarSlot() { return delegate.getVariableEarliestBlobSidecarSlot(); } - public KvStoreUnchunckedVariable getVariableEarliestBlockSlot() { + public KvStoreUnchunkedVariable getVariableEarliestBlockSlot() { return delegate.getVariableEarliestBlockSlot(); } - public Map> getVariableMap() { + public Map> getVariableMap() { return Map.of( "OPTIMISTIC_TRANSITION_BLOCK_SLOT", getOptimisticTransitionBlockSlot(), diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaHotAdapter.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaHotAdapter.java index cd71c84e21a..1b5255e1589 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaHotAdapter.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/SchemaHotAdapter.java @@ -70,19 +70,19 @@ public KvStoreColumn getColumnHotStatesByRoot() { return delegate.getColumnBlobSidecarBySlotRootBlobIndex(); } - public KvStoreUnchunckedVariable getVariableGenesisTime() { + public KvStoreUnchunkedVariable getVariableGenesisTime() { return delegate.getVariableGenesisTime(); } - public KvStoreUnchunckedVariable getVariableJustifiedCheckpoint() { + public KvStoreUnchunkedVariable getVariableJustifiedCheckpoint() { return delegate.getVariableJustifiedCheckpoint(); } - public KvStoreUnchunckedVariable getVariableBestJustifiedCheckpoint() { + public KvStoreUnchunkedVariable getVariableBestJustifiedCheckpoint() { return delegate.getVariableBestJustifiedCheckpoint(); } - public KvStoreUnchunckedVariable getVariableFinalizedCheckpoint() { + public KvStoreUnchunkedVariable getVariableFinalizedCheckpoint() { return delegate.getVariableFinalizedCheckpoint(); } @@ -90,19 +90,19 @@ public KvStoreChunkedVariable getVariableLatestFinalizedState() { return delegate.getVariableLatestFinalizedState(); } - public KvStoreUnchunckedVariable getVariableMinGenesisTimeBlock() { + public KvStoreUnchunkedVariable getVariableMinGenesisTimeBlock() { return delegate.getVariableMinGenesisTimeBlock(); } - public KvStoreUnchunckedVariable getVariableWeakSubjectivityCheckpoint() { + public KvStoreUnchunkedVariable getVariableWeakSubjectivityCheckpoint() { return delegate.getVariableWeakSubjectivityCheckpoint(); } - public KvStoreUnchunckedVariable getVariableAnchorCheckpoint() { + public KvStoreUnchunkedVariable getVariableAnchorCheckpoint() { return delegate.getVariableAnchorCheckpoint(); } - public KvStoreUnchunckedVariable getVariableFinalizedDepositSnapshot() { + public KvStoreUnchunkedVariable getVariableFinalizedDepositSnapshot() { return delegate.getVariableFinalizedDepositSnapshot(); } @@ -126,7 +126,7 @@ public KvStoreUnchunckedVariable getVariableFinalizedDeposi getColumnBlobSidecarBySlotRootBlobIndex()); } - public Map getVariableMap() { + public Map> getVariableMap() { return Map.of( "GENESIS_TIME", getVariableGenesisTime(), @@ -154,7 +154,7 @@ public Map getVariableMap() { } @Override - public Collection getAllVariables() { + public Collection> getAllVariables() { return getVariableMap().values(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombined.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombined.java index 44c2dfd70c0..5d304d8fdc8 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombined.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombined.java @@ -64,28 +64,28 @@ public abstract class V6SchemaCombined implements SchemaCombined { KvStoreColumn.create(7, BYTES32_SERIALIZER, CHECKPOINT_EPOCHS_SERIALIZER); // Variables - private static final KvStoreUnchunckedVariable GENESIS_TIME = - KvStoreUnchunckedVariable.create(1, UINT64_SERIALIZER); - private static final KvStoreUnchunckedVariable JUSTIFIED_CHECKPOINT = - KvStoreUnchunckedVariable.create(2, CHECKPOINT_SERIALIZER); - private static final KvStoreUnchunckedVariable BEST_JUSTIFIED_CHECKPOINT = - KvStoreUnchunckedVariable.create(3, CHECKPOINT_SERIALIZER); - private static final KvStoreUnchunckedVariable FINALIZED_CHECKPOINT = - KvStoreUnchunckedVariable.create(4, CHECKPOINT_SERIALIZER); + private static final KvStoreUnchunkedVariable GENESIS_TIME = + KvStoreUnchunkedVariable.create(1, UINT64_SERIALIZER); + private static final KvStoreUnchunkedVariable JUSTIFIED_CHECKPOINT = + KvStoreUnchunkedVariable.create(2, CHECKPOINT_SERIALIZER); + private static final KvStoreUnchunkedVariable BEST_JUSTIFIED_CHECKPOINT = + KvStoreUnchunkedVariable.create(3, CHECKPOINT_SERIALIZER); + private static final KvStoreUnchunkedVariable FINALIZED_CHECKPOINT = + KvStoreUnchunkedVariable.create(4, CHECKPOINT_SERIALIZER); private final KvStoreChunkedVariable latestFinalizedState; - private static final KvStoreUnchunckedVariable MIN_GENESIS_TIME_BLOCK = - KvStoreUnchunckedVariable.create(6, MIN_GENESIS_TIME_BLOCK_EVENT_SERIALIZER); + private static final KvStoreUnchunkedVariable MIN_GENESIS_TIME_BLOCK = + KvStoreUnchunkedVariable.create(6, MIN_GENESIS_TIME_BLOCK_EVENT_SERIALIZER); // 7 was the protoarray snapshot variable but is no longer used. - private static final KvStoreUnchunckedVariable WEAK_SUBJECTIVITY_CHECKPOINT = - KvStoreUnchunckedVariable.create(8, CHECKPOINT_SERIALIZER); - private static final KvStoreUnchunckedVariable ANCHOR_CHECKPOINT = - KvStoreUnchunckedVariable.create(9, CHECKPOINT_SERIALIZER); - private static final KvStoreUnchunckedVariable FINALIZED_DEPOSIT_SNAPSHOT = - KvStoreUnchunckedVariable.create(10, DEPOSIT_SNAPSHOT_SERIALIZER); + private static final KvStoreUnchunkedVariable WEAK_SUBJECTIVITY_CHECKPOINT = + KvStoreUnchunkedVariable.create(8, CHECKPOINT_SERIALIZER); + private static final KvStoreUnchunkedVariable ANCHOR_CHECKPOINT = + KvStoreUnchunkedVariable.create(9, CHECKPOINT_SERIALIZER); + private static final KvStoreUnchunkedVariable FINALIZED_DEPOSIT_SNAPSHOT = + KvStoreUnchunkedVariable.create(10, DEPOSIT_SNAPSHOT_SERIALIZER); - private final KvStoreUnchunckedVariable optimisticTransitionBlockSlot; - private final KvStoreUnchunckedVariable earliestBlobSidecarSlot; - private final KvStoreUnchunckedVariable earliestBlockSlot; + private final KvStoreUnchunkedVariable optimisticTransitionBlockSlot; + private final KvStoreUnchunkedVariable earliestBlobSidecarSlot; + private final KvStoreUnchunkedVariable earliestBlockSlot; protected V6SchemaCombined(final Spec spec, final int finalizedOffset) { this.finalizedOffset = finalizedOffset; @@ -99,15 +99,15 @@ protected V6SchemaCombined(final Spec spec, final int finalizedOffset) { final KvStoreChunkingSerializer stateChunkingSerializer = KvStoreChunkingSerializer.createStateSerializer(spec); - latestFinalizedState = KvStoreChunkedVariable.create(5, stateSerializer); + latestFinalizedState = KvStoreChunkedVariable.create(5, stateChunkingSerializer); votes = KvStoreColumn.create(3, UINT64_SERIALIZER, VOTE_TRACKER_SERIALIZER); optimisticTransitionBlockSlot = - KvStoreUnchunckedVariable.create(finalizedOffset + 1, UINT64_SERIALIZER); + KvStoreUnchunkedVariable.create(finalizedOffset + 1, UINT64_SERIALIZER); earliestBlobSidecarSlot = - KvStoreUnchunckedVariable.create(finalizedOffset + 2, UINT64_SERIALIZER); - earliestBlockSlot = KvStoreUnchunckedVariable.create(finalizedOffset + 3, UINT64_SERIALIZER); + KvStoreUnchunkedVariable.create(finalizedOffset + 2, UINT64_SERIALIZER); + earliestBlockSlot = KvStoreUnchunkedVariable.create(finalizedOffset + 3, UINT64_SERIALIZER); } @Override @@ -146,22 +146,22 @@ public KvStoreColumn getColumnHotStatesByRoot() { } @Override - public KvStoreUnchunckedVariable getVariableGenesisTime() { + public KvStoreUnchunkedVariable getVariableGenesisTime() { return GENESIS_TIME; } @Override - public KvStoreUnchunckedVariable getVariableJustifiedCheckpoint() { + public KvStoreUnchunkedVariable getVariableJustifiedCheckpoint() { return JUSTIFIED_CHECKPOINT; } @Override - public KvStoreUnchunckedVariable getVariableBestJustifiedCheckpoint() { + public KvStoreUnchunkedVariable getVariableBestJustifiedCheckpoint() { return BEST_JUSTIFIED_CHECKPOINT; } @Override - public KvStoreUnchunckedVariable getVariableFinalizedCheckpoint() { + public KvStoreUnchunkedVariable getVariableFinalizedCheckpoint() { return FINALIZED_CHECKPOINT; } @@ -171,37 +171,37 @@ public KvStoreChunkedVariable getVariableLatestFinalizedState() { } @Override - public KvStoreUnchunckedVariable getVariableMinGenesisTimeBlock() { + public KvStoreUnchunkedVariable getVariableMinGenesisTimeBlock() { return MIN_GENESIS_TIME_BLOCK; } @Override - public KvStoreUnchunckedVariable getVariableWeakSubjectivityCheckpoint() { + public KvStoreUnchunkedVariable getVariableWeakSubjectivityCheckpoint() { return WEAK_SUBJECTIVITY_CHECKPOINT; } @Override - public KvStoreUnchunckedVariable getVariableAnchorCheckpoint() { + public KvStoreUnchunkedVariable getVariableAnchorCheckpoint() { return ANCHOR_CHECKPOINT; } @Override - public KvStoreUnchunckedVariable getOptimisticTransitionBlockSlot() { + public KvStoreUnchunkedVariable getOptimisticTransitionBlockSlot() { return optimisticTransitionBlockSlot; } @Override - public KvStoreUnchunckedVariable getVariableFinalizedDepositSnapshot() { + public KvStoreUnchunkedVariable getVariableFinalizedDepositSnapshot() { return FINALIZED_DEPOSIT_SNAPSHOT; } @Override - public KvStoreUnchunckedVariable getVariableEarliestBlobSidecarSlot() { + public KvStoreUnchunkedVariable getVariableEarliestBlobSidecarSlot() { return earliestBlobSidecarSlot; } @Override - public KvStoreUnchunckedVariable getVariableEarliestBlockSlot() { + public KvStoreUnchunkedVariable getVariableEarliestBlockSlot() { return earliestBlockSlot; } @@ -227,8 +227,8 @@ public KvStoreUnchunckedVariable getVariableEarliestBlockSlot() { } @Override - public Map> getVariableMap() { - return ImmutableMap.>builder() + public Map> getVariableMap() { + return ImmutableMap.>builder() .put("GENESIS_TIME", getVariableGenesisTime()) .put("JUSTIFIED_CHECKPOINT", getVariableJustifiedCheckpoint()) .put("BEST_JUSTIFIED_CHECKPOINT", getVariableBestJustifiedCheckpoint()) diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedSnapshot.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedSnapshot.java index 9aa200257a0..cff71d1b931 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedSnapshot.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedSnapshot.java @@ -172,7 +172,7 @@ public KvStoreColumn> getColumnNonCanonicalRootsBySlot() { } @Override - public Collection> getAllVariables() { + public Collection> getAllVariables() { return getVariableMap().values(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedTreeState.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedTreeState.java index 8ae186c5955..eed62f47bdd 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedTreeState.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/V6SchemaCombinedTreeState.java @@ -150,8 +150,8 @@ public KvStoreColumn> getColumnNonCanonicalRootsBySlot() { } @Override - public Map> getVariableMap() { - return ImmutableMap.>builder() + public Map> getVariableMap() { + return ImmutableMap.>builder() .put("GENESIS_TIME", getVariableGenesisTime()) .put("JUSTIFIED_CHECKPOINT", getVariableJustifiedCheckpoint()) .put("BEST_JUSTIFIED_CHECKPOINT", getVariableBestJustifiedCheckpoint()) @@ -200,7 +200,7 @@ public Map> getVariableMap() { } @Override - public Collection> getAllVariables() { + public Collection> getAllVariables() { return getVariableMap().values(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java index 58cb94a7b62..4a32509e25d 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java @@ -37,7 +37,7 @@ public BeaconState deserialize(final List data) { public List serialize(final BeaconState value) { final SszByteArrayChunksWriter sszByteArrayChunksWriter = new SszByteArrayChunksWriter( - value.getBeaconStateSchema().getSszSize(value.getBackingNode()), 10240); + value.getBeaconStateSchema().getSszSize(value.getBackingNode()), 1024_000); value.sszSerialize(sszByteArrayChunksWriter); return sszByteArrayChunksWriter.getChunks(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/ChunkedVariableKeySerializer.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/ChunkedVariableKeySerializer.java index 48fd7f82231..5d7253ecc69 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/ChunkedVariableKeySerializer.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/ChunkedVariableKeySerializer.java @@ -45,7 +45,7 @@ public byte[] serialize(final IdAndChunkKey value) { } public record IdAndChunkKey(Bytes id, Bytes chunkKey) { - public static IdAndChunkKey fromBytes(Bytes bytes) { + public static IdAndChunkKey fromBytes(final Bytes bytes) { return new IdAndChunkKey( bytes.slice(0, ID_KEY_SIZE), bytes.slice(ID_KEY_SIZE, CHUNK_KEY_SIZE)); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbInstance.java b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbInstance.java index a536cefeddf..f672efc037f 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbInstance.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbInstance.java @@ -16,6 +16,8 @@ import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.asColumnEntry; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.asOptionalColumnEntry; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.deserializeKey; +import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getChunkedVariableChunksKey; +import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getChunkedVariableKey; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getColumnKey; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getKeyAfterColumn; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getVariableKey; @@ -29,10 +31,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -51,7 +55,7 @@ import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; /** * Implements {@link KvStoreAccessor} using LevelDB to store the data. @@ -93,7 +97,7 @@ public LevelDbInstance( } @Override - public Optional get(final KvStoreUnchunckedVariable variable) { + public Optional get(final KvStoreUnchunkedVariable variable) { return getRaw(variable) .map(value -> variable.getSerializer().deserialize(value.toArrayUnsafe())); } @@ -101,11 +105,15 @@ public Optional get(final KvStoreUnchunckedVariable variable) { @Override public Optional get(final KvStoreChunkedVariable variable) { return getRaw(variable) - .map(value -> variable.getSerializer().deserialize(value.toArrayUnsafe())); + .map( + value -> + variable + .getSerializer() + .deserialize(value.stream().map(Bytes::toArrayUnsafe).toList())); } @Override - public Optional getRaw(final KvStoreUnchunckedVariable variable) { + public Optional getRaw(final KvStoreUnchunkedVariable variable) { assertOpen(); return Optional.ofNullable(db.get(getVariableKey(variable))).map(Bytes::wrap); } @@ -113,8 +121,21 @@ public Optional getRaw(final KvStoreUnchunckedVariable variable) { @Override public Optional> getRaw(final KvStoreChunkedVariable variable) { assertOpen(); - // TODO implement the lookup to get number of chunks and then get all the chunks - return Optional.empty(); + return Optional.ofNullable(db.get(getChunkedVariableChunksKey(variable))) + .map(Bytes::wrap) + .map( + chunksNumber -> { + if (chunksNumber.size() > 1) { + // this is a non chunked variable to be migrated + return List.of(Bytes.wrap(chunksNumber)); + } + // TODO implement with an iterator + return IntStream.range(0, Byte.toUnsignedInt(chunksNumber.get(0))) + .mapToObj(i -> db.get(getChunkedVariableKey(variable, i))) + .peek(Objects::requireNonNull) + .map(Bytes::wrap) + .toList(); + }); } @Override diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbTransaction.java b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbTransaction.java index f6c27eaef26..747e1dd0b9a 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbTransaction.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbTransaction.java @@ -14,11 +14,15 @@ package tech.pegasys.teku.storage.server.leveldb; import static tech.pegasys.teku.infrastructure.logging.DbLogger.DB_LOGGER; +import static tech.pegasys.teku.infrastructure.unsigned.ByteUtil.toByteExact; +import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getChunkedVariableChunksKey; +import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getChunkedVariableKey; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getColumnKey; import static tech.pegasys.teku.storage.server.leveldb.LevelDbUtils.getVariableKey; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; import org.apache.tuweni.bytes.Bytes; @@ -28,7 +32,7 @@ import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; public class LevelDbTransaction implements KvStoreTransaction { @@ -47,20 +51,33 @@ public LevelDbTransaction( } @Override - public void put(final KvStoreUnchunckedVariable variable, final T value) { + public void put(final KvStoreUnchunkedVariable variable, final T value) { putRaw(variable, Bytes.wrap(variable.getSerializer().serialize(value))); } @Override public void put(final KvStoreChunkedVariable variable, final T value) { - putRaw(variable, Bytes.wrap(variable.getValueSerializer().serialize(value))); + putRaw(variable, variable.getSerializer().serialize(value).stream().map(Bytes::wrap).toList()); } @Override - public void putRaw(final KvStoreUnchunckedVariable variable, final Bytes value) { + public void putRaw(final KvStoreUnchunkedVariable variable, final Bytes value) { applyUpdate(() -> writeBatch.put(getVariableKey(variable), value.toArrayUnsafe())); } + @Override + public void putRaw(final KvStoreChunkedVariable variable, final List valueChunks) { + applyUpdate( + () -> { + writeBatch.put( + getChunkedVariableChunksKey(variable), new byte[] {toByteExact(valueChunks.size())}); + for (int index = 0; index < valueChunks.size(); index++) { + writeBatch.put( + getChunkedVariableKey(variable, index), valueChunks.get(index).toArrayUnsafe()); + } + }); + } + @Override public void put(final KvStoreColumn column, final K key, final V value) { putRaw( @@ -90,7 +107,7 @@ public void delete(final KvStoreColumn column, final K key) { } @Override - public void delete(final KvStoreUnchunckedVariable variable) { + public void delete(final KvStoreUnchunkedVariable variable) { applyUpdate(() -> writeBatch.delete(getVariableKey(variable))); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbUtils.java b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbUtils.java index aa069aa24a6..60da943ae32 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbUtils.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/leveldb/LevelDbUtils.java @@ -14,13 +14,15 @@ package tech.pegasys.teku.storage.server.leveldb; import static com.google.common.base.Preconditions.checkArgument; +import static tech.pegasys.teku.infrastructure.unsigned.ByteUtil.toByteExact; import java.util.Arrays; import java.util.Map; import java.util.Optional; import tech.pegasys.teku.storage.server.kvstore.ColumnEntry; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; class LevelDbUtils { @@ -33,7 +35,27 @@ static byte[] getKeyAfterColumn(final KvStoreColumn column) { return keyAfterColumn; } - static byte[] getVariableKey(final KvStoreUnchunckedVariable variable) { + static byte[] getVariableKey(final KvStoreUnchunkedVariable variable) { + final byte[] suffix = variable.getId().toArrayUnsafe(); + final byte[] key = new byte[suffix.length + 1]; + // All 1s in binary so right at the end of the index. + key[0] = VARIABLE_COLUMN_PREFIX; + System.arraycopy(suffix, 0, key, 1, suffix.length); + return key; + } + + static byte[] getChunkedVariableKey(final KvStoreChunkedVariable variable, final int chunkId) { + final byte[] idSuffix = variable.getId().toArrayUnsafe(); + final byte[] key = new byte[idSuffix.length + 2]; + // All 1s in binary so right at the end of the index. + key[0] = VARIABLE_COLUMN_PREFIX; + System.arraycopy(idSuffix, 0, key, 1, idSuffix.length); + // last is chunk id + key[key.length - 1] = toByteExact(chunkId); + return key; + } + + static byte[] getChunkedVariableChunksKey(final KvStoreChunkedVariable variable) { final byte[] suffix = variable.getId().toArrayUnsafe(); final byte[] key = new byte[suffix.length + 1]; // All 1s in binary so right at the end of the index. diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbInstance.java b/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbInstance.java index dca7578864e..c963d5066f8 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbInstance.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbInstance.java @@ -34,8 +34,9 @@ import tech.pegasys.teku.storage.server.ShuttingDownException; import tech.pegasys.teku.storage.server.kvstore.ColumnEntry; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; public class RocksDbInstance implements KvStoreAccessor { @@ -59,12 +60,17 @@ public class RocksDbInstance implements KvStoreAccessor { } @Override - public Optional get(final KvStoreUnchunckedVariable variable) { + public Optional get(final KvStoreUnchunkedVariable variable) { return getRaw(variable).map(data -> variable.getSerializer().deserialize(data.toArrayUnsafe())); } @Override - public Optional getRaw(final KvStoreUnchunckedVariable variable) { + public Optional get(final KvStoreChunkedVariable variable) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + @Override + public Optional getRaw(final KvStoreUnchunkedVariable variable) { assertOpen(); try { return Optional.ofNullable(db.get(defaultHandle, variable.getId().toArrayUnsafe())) @@ -74,6 +80,11 @@ public Optional getRaw(final KvStoreUnchunckedVariable variable) { } } + @Override + public Optional> getRaw(final KvStoreChunkedVariable variable) { + throw new UnsupportedOperationException("Not yet implemented"); + } + @Override public Optional get(final KvStoreColumn column, final K key) { assertOpen(); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbTransaction.java b/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbTransaction.java index 30b23229665..ac3ae40ba42 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbTransaction.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/rocksdb/RocksDbTransaction.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.storage.server.rocksdb; import com.google.common.collect.ImmutableMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; @@ -26,8 +27,9 @@ import org.rocksdb.WriteOptions; import tech.pegasys.teku.storage.server.ShuttingDownException; import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; public class RocksDbTransaction implements KvStoreTransaction { private final ColumnFamilyHandle defaultHandle; @@ -53,13 +55,18 @@ public class RocksDbTransaction implements KvStoreTransaction { } @Override - public void put(final KvStoreUnchunckedVariable variable, final T value) { + public void put(final KvStoreUnchunkedVariable variable, final T value) { final Bytes serialized = Bytes.wrap(variable.getSerializer().serialize(value)); putRaw(variable, serialized); } @Override - public void putRaw(final KvStoreUnchunckedVariable variable, final Bytes value) { + public void put(final KvStoreChunkedVariable variable, final T value) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + @Override + public void putRaw(final KvStoreUnchunkedVariable variable, final Bytes value) { applyUpdate( () -> { try { @@ -70,6 +77,11 @@ public void putRaw(final KvStoreUnchunckedVariable variable, final Bytes }); } + @Override + public void putRaw(final KvStoreChunkedVariable variable, final List valueChunks) { + throw new UnsupportedOperationException("Not yet implemented"); + } + @Override public void put(final KvStoreColumn column, final K key, final V value) { final Bytes keyBytes = Bytes.wrap(column.getKeySerializer().serialize(key)); @@ -122,7 +134,7 @@ public void delete(final KvStoreColumn column, final K key) { } @Override - public void delete(final KvStoreUnchunckedVariable variable) { + public void delete(final KvStoreUnchunkedVariable variable) { applyUpdate( () -> { try { diff --git a/storage/src/testFixtures/java/tech/pegasys/teku/storage/server/kvstore/MockKvStoreInstance.java b/storage/src/testFixtures/java/tech/pegasys/teku/storage/server/kvstore/MockKvStoreInstance.java index f700dd69c44..8cbc0e74ae1 100644 --- a/storage/src/testFixtures/java/tech/pegasys/teku/storage/server/kvstore/MockKvStoreInstance.java +++ b/storage/src/testFixtures/java/tech/pegasys/teku/storage/server/kvstore/MockKvStoreInstance.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Optional; @@ -30,23 +31,25 @@ import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes; import tech.pegasys.teku.storage.server.ShuttingDownException; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable; import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn; -import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunckedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable; +import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable; public class MockKvStoreInstance implements KvStoreAccessor { private final Set> columns; - private final Set> variables; + private final Set> variables; private final Map, NavigableMap> columnData; - private final Map, Bytes> variableData; + private final Map, Bytes> variableData; private final AtomicBoolean closed = new AtomicBoolean(false); public MockKvStoreInstance( final Collection> columns, - final Collection> variables, + final Collection> variables, final Map, NavigableMap> columnData, - final Map, Bytes> variableData) { + final Map, Bytes> variableData) { this.columns = new HashSet<>(columns); this.variables = new HashSet<>(variables); this.columnData = columnData; @@ -63,28 +66,38 @@ public MockKvStoreInstance reopen() { public static MockKvStoreInstance createEmpty( final Collection> columns, - final Collection> variables) { + final Collection> variables) { checkArgument(columns.size() > 0, "No columns attached to schema"); final Map, NavigableMap> columnData = columns.stream() .collect(Collectors.toConcurrentMap(col -> col, __ -> new ConcurrentSkipListMap<>())); - final Map, Bytes> variableData = new ConcurrentHashMap<>(); + final Map, Bytes> variableData = new ConcurrentHashMap<>(); return new MockKvStoreInstance(columns, variables, columnData, variableData); } @Override - public Optional get(final KvStoreUnchunckedVariable variable) { + public Optional get(final KvStoreUnchunkedVariable variable) { return getRaw(variable).map(Bytes::toArrayUnsafe).map(variable.getSerializer()::deserialize); } @Override - public Optional getRaw(final KvStoreUnchunckedVariable variable) { + public Optional get(final KvStoreChunkedVariable variable) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + @Override + public Optional getRaw(final KvStoreUnchunkedVariable variable) { assertOpen(); assertValidVariable(variable); return Optional.ofNullable(variableData.get(variable)); } + @Override + public Optional> getRaw(final KvStoreChunkedVariable variable) { + throw new UnsupportedOperationException("Not yet implemented"); + } + @Override public Optional get(final KvStoreColumn column, final K key) { assertOpen(); @@ -242,7 +255,7 @@ public void close() { closed.set(true); } - private void assertValidVariable(final KvStoreUnchunckedVariable variable) { + private void assertValidVariable(final KvStoreUnchunkedVariable variable) { checkArgument(variables.contains(variable), "Unknown RocksDbVariable supplied"); } @@ -261,7 +274,7 @@ private static class MockKvStoreTransaction implements KvStoreTransaction { private final MockKvStoreInstance dbInstance; private final Map, Map> columnUpdates = new HashMap<>(); private final Map, Set> deletedColumnKeys = new HashMap<>(); - private final Map, Optional> variableUpdates = + private final Map, Optional> variableUpdates = new HashMap<>(); private boolean closed = false; @@ -270,18 +283,29 @@ public MockKvStoreTransaction(final MockKvStoreInstance mockRocksDbInstance) { } @Override - public void put(final KvStoreUnchunckedVariable variable, final T value) { + public void put(final KvStoreUnchunkedVariable variable, final T value) { final Bytes valueBytes = Bytes.wrap(variable.getSerializer().serialize(value)); putRaw(variable, valueBytes); } @Override - public void putRaw(final KvStoreUnchunckedVariable variable, final Bytes value) { + public void put(final KvStoreChunkedVariable variable, final T value) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + @Override + public void putRaw(final KvStoreUnchunkedVariable variable, final Bytes value) { assertOpen(); dbInstance.assertValidVariable(variable); variableUpdates.put(variable, Optional.of(value)); } + @Override + public void putRaw( + final KvStoreChunkedVariable variable, final List valueChunks) { + throw new UnsupportedOperationException("Not yet implemented"); + } + @Override public void put(final KvStoreColumn column, final K key, final V value) { final Bytes keyBytes = dbInstance.keyToBytes(column, key); @@ -322,7 +346,7 @@ public void delete(final KvStoreColumn column, final K key) { } @Override - public void delete(final KvStoreUnchunckedVariable variable) { + public void delete(final KvStoreUnchunkedVariable variable) { assertOpen(); dbInstance.assertValidVariable(variable); variableUpdates.put(variable, Optional.empty()); From 152ba5e0746a93b3bfaedec6f73efd918fa4f71c Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Wed, 22 Jan 2025 21:29:07 +0100 Subject: [PATCH 3/4] rise to 8mb chunks --- .../kvstore/serialization/BeaconStateChunkingSerializer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java index 4a32509e25d..5b0f82d28dd 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/serialization/BeaconStateChunkingSerializer.java @@ -37,7 +37,7 @@ public BeaconState deserialize(final List data) { public List serialize(final BeaconState value) { final SszByteArrayChunksWriter sszByteArrayChunksWriter = new SszByteArrayChunksWriter( - value.getBeaconStateSchema().getSszSize(value.getBackingNode()), 1024_000); + value.getBeaconStateSchema().getSszSize(value.getBackingNode()), 1 << 23); value.sszSerialize(sszByteArrayChunksWriter); return sszByteArrayChunksWriter.getChunks(); } From 16e0b332f64980d08e77a919a139bdd582211216 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Thu, 23 Jan 2025 12:30:35 +0100 Subject: [PATCH 4/4] add missing class --- .../schema/KvStoreUnchunkedVariable.java | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreUnchunkedVariable.java diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreUnchunkedVariable.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreUnchunkedVariable.java new file mode 100644 index 00000000000..859ae2f87a5 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/schema/KvStoreUnchunkedVariable.java @@ -0,0 +1,72 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server.kvstore.schema; + +import static tech.pegasys.teku.infrastructure.unsigned.ByteUtil.toByteExact; + +import java.util.Objects; +import java.util.Optional; +import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.storage.server.kvstore.serialization.KvStoreSerializer; + +public class KvStoreUnchunkedVariable implements KvStoreVariable { + private final Bytes id; + private final KvStoreSerializer serializer; + + private KvStoreUnchunkedVariable(final byte[] id, final KvStoreSerializer serializer) { + this.id = Bytes.wrap(id); + this.serializer = serializer; + } + + public static KvStoreUnchunkedVariable create( + final int id, final KvStoreSerializer serializer) { + final byte byteId = toByteExact(id); + return new KvStoreUnchunkedVariable(new byte[] {byteId}, serializer); + } + + public Bytes getId() { + return id; + } + + public KvStoreSerializer getSerializer() { + return serializer; + } + + @Override + public Optional> toChunkedVariable() { + return Optional.empty(); + } + + @Override + public Optional> toUnchunkedVariable() { + return Optional.of(this); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final KvStoreUnchunkedVariable that = (KvStoreUnchunkedVariable) o; + return Objects.equals(id, that.id) && Objects.equals(serializer, that.serializer); + } + + @Override + public int hashCode() { + return Objects.hash(id, serializer); + } +}