Skip to content

Commit

Permalink
Add protected method to allow overriding the computation of the size …
Browse files Browse the repository at this point in the history
…of a cache file region (elastic#105570)

This change introduces a protected method in SharedBlobCacheService 
that allows to initialize all cache file regions with the full region size. It 
causes the underlying SparseFileTracker to always track a full region, 
and therefore it makes it possible to write more bytes to a region that 
has its initial cache file length changed.
  • Loading branch information
tlrx authored Feb 21, 2024
1 parent 0dca71e commit d1ec0d2
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.apache.lucene.store.IndexInput;
import org.elasticsearch.blobcache.common.ByteRange;
import org.elasticsearch.blobcache.shared.SharedBytes;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.core.Streams;

Expand All @@ -31,6 +32,17 @@ public static int toIntBytes(long l) {
return ByteSizeUnit.BYTES.toIntBytes(l);
}

/**
* Rounds the length up so that it is aligned on the next page size (defined by SharedBytes.PAGE_SIZE). For example
*/
public static long toPageAlignedSize(long length) {
int remainder = (int) length % SharedBytes.PAGE_SIZE;
if (remainder > 0L) {
return length + (SharedBytes.PAGE_SIZE - remainder);
}
return length;
}

public static void throwEOF(long channelPos, long len) throws EOFException {
throw new EOFException(format("unexpected EOF reading [%d-%d]", channelPos, channelPos + len));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,14 @@ private ByteRange mapSubRangeToRegion(ByteRange range, int region) {
);
}

private int getRegionSize(long fileLength, int region) {
/**
* Compute the size of a cache file region.
*
* @param fileLength the length of the file/blob to cache
* @param region the region number
* @return a size in bytes of the cache file region
*/
protected int computeCacheFileRegionSize(long fileLength, int region) {
assert fileLength > 0;
final int maxRegion = getEndingRegion(fileLength);
assert region >= 0 && region <= maxRegion : region + " - " + maxRegion;
Expand Down Expand Up @@ -1209,7 +1216,7 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) {
// if we did not find an entry
var entry = keyMapping.get(regionKey);
if (entry == null) {
final int effectiveRegionSize = getRegionSize(fileLength, region);
final int effectiveRegionSize = computeCacheFileRegionSize(fileLength, region);
entry = keyMapping.computeIfAbsent(regionKey, key -> new LFUCacheEntry(new CacheFileRegion(key, effectiveRegionSize), now));
}
// io is volatile, double locking is fine, as long as we assign it last.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.blobcache.BlobCacheMetrics;
import org.elasticsearch.blobcache.BlobCacheUtils;
import org.elasticsearch.blobcache.common.ByteRange;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -1052,7 +1053,6 @@ public void testPopulate() throws Exception {
.put("path.home", createTempDir())
.build();

final AtomicLong relativeTimeInMillis = new AtomicLong(0L);
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
Expand Down Expand Up @@ -1136,4 +1136,47 @@ public void testNonPositiveRecoveryRangeSizeRejected() {
assertThatNonPositiveRecoveryRangeSizeRejected(SharedBlobCacheService.SHARED_CACHE_RECOVERY_RANGE_SIZE_SETTING);
}

public void testUseFullRegionSize() throws IOException {
final long regionSize = size(randomIntBetween(1, 100));
final long cacheSize = regionSize * randomIntBetween(1, 10);

Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
.put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(regionSize).getStringRep())
.put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(cacheSize).getStringRep())
.put("path.home", createTempDir())
.build();
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
var cacheService = new SharedBlobCacheService<>(
environment,
settings,
taskQueue.getThreadPool(),
ThreadPool.Names.GENERIC,
ThreadPool.Names.GENERIC,
BlobCacheMetrics.NOOP
) {
@Override
protected int computeCacheFileRegionSize(long fileLength, int region) {
// use full region
return super.getRegionSize();
}
}
) {
final var cacheKey = generateCacheKey();
final var blobLength = randomLongBetween(1L, cacheSize);

int regions = Math.toIntExact(blobLength / regionSize);
regions += (blobLength % regionSize == 0L ? 0L : 1L);
assertThat(
cacheService.computeCacheFileRegionSize(blobLength, randomFrom(regions)),
equalTo(BlobCacheUtils.toIntBytes(regionSize))
);
for (int region = 0; region < regions; region++) {
var cacheFileRegion = cacheService.get(cacheKey, blobLength, region);
assertThat(cacheFileRegion.tracker.getLength(), equalTo(regionSize));
}
}
}
}

0 comments on commit d1ec0d2

Please sign in to comment.