Skip to content

Commit 970405d

Browse files
committed
Improve tracing of filesystem calls in CacheFileSystem
Use a tracing wrapper on external FS to track any FS calls made by cache implementations. Removed redundant tracking of external FS calls from cache implementations. Updated tests for tracing FS calls to be closer to reality.
1 parent a810a18 commit 970405d

File tree

13 files changed

+450
-281
lines changed

13 files changed

+450
-281
lines changed

lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInput.java

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,13 @@
1616
import alluxio.client.file.URIStatus;
1717
import alluxio.client.file.cache.CacheManager;
1818
import alluxio.conf.AlluxioConfiguration;
19-
import io.opentelemetry.api.trace.Span;
2019
import io.opentelemetry.api.trace.Tracer;
2120
import io.trino.filesystem.TrinoInput;
2221
import io.trino.filesystem.TrinoInputFile;
2322

2423
import java.io.EOFException;
2524
import java.io.IOException;
2625

27-
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION;
28-
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION;
29-
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE;
30-
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_KEY;
31-
import static io.trino.filesystem.tracing.Tracing.withTracing;
3226
import static java.lang.Math.min;
3327
import static java.util.Objects.checkFromIndexSize;
3428
import static java.util.Objects.requireNonNull;
@@ -39,9 +33,7 @@ public class AlluxioInput
3933
private final TrinoInputFile inputFile;
4034
private final long fileLength;
4135
private final AlluxioCacheStats statistics;
42-
private final String cacheKey;
4336
private final AlluxioInputHelper helper;
44-
private final Tracer tracer;
4537

4638
private TrinoInput input;
4739
private boolean closed;
@@ -55,11 +47,9 @@ public AlluxioInput(
5547
AlluxioConfiguration configuration,
5648
AlluxioCacheStats statistics)
5749
{
58-
this.tracer = requireNonNull(tracer, "tracer is null");
5950
this.inputFile = requireNonNull(inputFile, "inputFile is null");
6051
this.fileLength = requireNonNull(status, "status is null").getLength();
6152
this.statistics = requireNonNull(statistics, "statistics is null");
62-
this.cacheKey = requireNonNull(cacheKey, "cacheKey is null");
6353
this.helper = new AlluxioInputHelper(tracer, inputFile.location(), cacheKey, status, cacheManager, configuration, statistics);
6454
}
6555

@@ -90,22 +80,13 @@ private int doExternalRead(long position, byte[] buffer, int offset, int length)
9080
return 0;
9181
}
9282

93-
Span span = tracer.spanBuilder("Alluxio.readExternal")
94-
.setAttribute(CACHE_KEY, cacheKey)
95-
.setAttribute(CACHE_FILE_LOCATION, inputFile.location().toString())
96-
.setAttribute(CACHE_FILE_READ_SIZE, (long) length)
97-
.setAttribute(CACHE_FILE_READ_POSITION, position)
98-
.startSpan();
99-
100-
return withTracing(span, () -> {
101-
AlluxioInputHelper.PageAlignedRead aligned = helper.alignRead(position, length);
102-
byte[] readBuffer = new byte[aligned.length()];
103-
getInput().readFully(aligned.pageStart(), readBuffer, 0, readBuffer.length);
104-
helper.putCache(aligned.pageStart(), aligned.pageEnd(), readBuffer, aligned.length());
105-
System.arraycopy(readBuffer, aligned.pageOffset(), buffer, offset, length);
106-
statistics.recordExternalRead(readBuffer.length);
107-
return length;
108-
});
83+
AlluxioInputHelper.PageAlignedRead aligned = helper.alignRead(position, length);
84+
byte[] readBuffer = new byte[aligned.length()];
85+
getInput().readFully(aligned.pageStart(), readBuffer, 0, readBuffer.length);
86+
helper.putCache(aligned.pageStart(), aligned.pageEnd(), readBuffer, aligned.length());
87+
System.arraycopy(readBuffer, aligned.pageOffset(), buffer, offset, length);
88+
statistics.recordExternalRead(readBuffer.length);
89+
return length;
10990
}
11091

11192
private TrinoInput getInput()

lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInputStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ private int doExternalRead0(long readPosition, byte[] buffer, int offset, int le
139139
return 0;
140140
}
141141

142-
Span span = tracer.spanBuilder("Alluxio.readExternal")
142+
Span span = tracer.spanBuilder("Alluxio.readExternalStream")
143143
.setAttribute(CACHE_KEY, key)
144144
.setAttribute(CACHE_FILE_LOCATION, inputFile.location().toString())
145145
.setAttribute(CACHE_FILE_READ_SIZE, (long) length)

lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioCacheFileSystemAccessOperations.java

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,11 @@
5959
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_POSITION;
6060
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_SIZE;
6161
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_KEY;
62+
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_LOCATION;
63+
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_POSITION;
64+
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_SIZE;
6265
import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
66+
import static java.util.Objects.requireNonNull;
6367
import static java.util.stream.Collectors.toCollection;
6468
import static org.assertj.core.api.Assertions.assertThat;
6569

@@ -128,7 +132,7 @@ public void testCache()
128132
.addCopies(new CacheOperationSpan("AlluxioCacheManager.get", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, 11), readTimes)
129133
.add(new CacheOperationSpan("AlluxioCacheManager.put", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, 11))
130134
.add(new CacheOperationSpan("Alluxio.writeCache", location.toString(), 11))
131-
.add(new CacheOperationSpan("Alluxio.readExternal", location.toString(), 11))
135+
.add(new CacheOperationSpan("Input.readFully", location.toString(), 11))
132136
.build());
133137

134138
byte[] modifiedContent = "modified content".getBytes(StandardCharsets.UTF_8);
@@ -140,7 +144,7 @@ public void testCache()
140144
readTimes = 7;
141145
assertCacheOperations(location, modifiedContent, readTimes,
142146
ImmutableMultiset.<CacheOperationSpan>builder()
143-
.add(new CacheOperationSpan("Alluxio.readExternal", location.toString(), 16))
147+
.add(new CacheOperationSpan("Input.readFully", location.toString(), 16))
144148
.add(new CacheOperationSpan("Alluxio.writeCache", location.toString(), 16))
145149
.add(new CacheOperationSpan("AlluxioCacheManager.put", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, 16))
146150
.addCopies(new CacheOperationSpan("AlluxioCacheManager.get", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, 16), readTimes)
@@ -165,7 +169,7 @@ public void testPartialCacheHits()
165169
ImmutableMultiset.<CacheOperationSpan>builder()
166170
.add(new CacheOperationSpan("Alluxio.readCached", "memory:///partial", 0, PAGE_SIZE))
167171
.add(new CacheOperationSpan("AlluxioCacheManager.get", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, PAGE_SIZE))
168-
.add(new CacheOperationSpan("Alluxio.readExternal", location.toString(), 0, PAGE_SIZE))
172+
.add(new CacheOperationSpan("Input.readFully", location.toString(), 0, PAGE_SIZE))
169173
.add(new CacheOperationSpan("Alluxio.writeCache", location.toString(), 0, PAGE_SIZE))
170174
.add(new CacheOperationSpan("AlluxioCacheManager.put", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, PAGE_SIZE))
171175
.build());
@@ -175,7 +179,7 @@ public void testPartialCacheHits()
175179
.add(new CacheOperationSpan("Alluxio.readCached", location.toString(), 0, PAGE_SIZE + 10))
176180
.add(new CacheOperationSpan("AlluxioCacheManager.get", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, PAGE_SIZE))
177181
.add(new CacheOperationSpan("AlluxioCacheManager.get", cacheKey(location, cacheKeyProvider.currentCacheVersion()), PAGE_SIZE, PAGE_SIZE))
178-
.add(new CacheOperationSpan("Alluxio.readExternal", location.toString(), PAGE_SIZE, 10))
182+
.add(new CacheOperationSpan("Input.readFully", location.toString(), PAGE_SIZE, PAGE_SIZE))
179183
.add(new CacheOperationSpan("Alluxio.writeCache", location.toString(), PAGE_SIZE, PAGE_SIZE))
180184
.add(new CacheOperationSpan("AlluxioCacheManager.put", cacheKey(location, cacheKeyProvider.currentCacheVersion()), PAGE_SIZE, PAGE_SIZE))
181185
.build());
@@ -214,7 +218,7 @@ public void testMultiPageExternalsReads()
214218
.add(new CacheOperationSpan("AlluxioCacheManager.put", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, PAGE_SIZE))
215219
.add(new CacheOperationSpan("AlluxioCacheManager.put", cacheKey(location, cacheKeyProvider.currentCacheVersion()), PAGE_SIZE, PAGE_SIZE))
216220
.add(new CacheOperationSpan("Alluxio.readCached", location.toString(), PAGE_SIZE + 1))
217-
.add(new CacheOperationSpan("Alluxio.readExternal", location.toString(), PAGE_SIZE + 1))
221+
.add(new CacheOperationSpan("Input.readFully", location.toString(), PAGE_SIZE * 2))
218222
.add(new CacheOperationSpan("Alluxio.writeCache", location.toString(), PAGE_SIZE * 2))
219223
.build());
220224
cacheKeyProvider.increaseCacheVersion();
@@ -223,7 +227,7 @@ public void testMultiPageExternalsReads()
223227
.add(new CacheOperationSpan("AlluxioCacheManager.get", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, PAGE_SIZE))
224228
.add(new CacheOperationSpan("AlluxioCacheManager.put", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, PAGE_SIZE))
225229
.add(new CacheOperationSpan("AlluxioCacheManager.put", cacheKey(location, cacheKeyProvider.currentCacheVersion()), PAGE_SIZE, PAGE_SIZE))
226-
.add(new CacheOperationSpan("Alluxio.readExternal", location.toString(), 2 * PAGE_SIZE))
230+
.add(new CacheOperationSpan("Input.readFully", location.toString(), 2 * PAGE_SIZE))
227231
.add(new CacheOperationSpan("Alluxio.readCached", location.toString(), 2 * PAGE_SIZE))
228232
.add(new CacheOperationSpan("Alluxio.writeCache", location.toString(), 2 * PAGE_SIZE))
229233
.build());
@@ -276,7 +280,7 @@ public void testCacheWithMissingPage()
276280
.addCopies(new CacheOperationSpan("AlluxioCacheManager.get", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, 12), readTimes)
277281
.add(new CacheOperationSpan("AlluxioCacheManager.put", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, 12))
278282
.add(new CacheOperationSpan("Alluxio.writeCache", location.toString(), 12))
279-
.add(new CacheOperationSpan("Alluxio.readExternal", location.toString(), 12))
283+
.add(new CacheOperationSpan("Input.readFully", location.toString(), 12))
280284
.build());
281285

282286
TrinoInputFile inputFile = fileSystem.newInputFile(location);
@@ -289,7 +293,7 @@ public void testCacheWithMissingPage()
289293
ImmutableMultiset.<CacheOperationSpan>builder()
290294
.add(new CacheOperationSpan("Alluxio.readCached", location.toString(), 12))
291295
.add(new CacheOperationSpan("AlluxioCacheManager.get", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 12))
292-
.add(new CacheOperationSpan("Alluxio.readExternal", location.toString(), 12))
296+
.add(new CacheOperationSpan("Input.readFully", location.toString(), 12))
293297
.add(new CacheOperationSpan("Alluxio.writeCache", location.toString(), 12))
294298
.add(new CacheOperationSpan("AlluxioCacheManager.put", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, 12))
295299
.build());
@@ -313,7 +317,7 @@ public void testCacheWithCorruptedPage()
313317
.addCopies(new CacheOperationSpan("AlluxioCacheManager.get", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, 14), readTimes)
314318
.add(new CacheOperationSpan("AlluxioCacheManager.put", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, 14))
315319
.add(new CacheOperationSpan("Alluxio.writeCache", location.toString(), 14))
316-
.add(new CacheOperationSpan("Alluxio.readExternal", location.toString(), 14))
320+
.add(new CacheOperationSpan("Input.readFully", location.toString(), 14))
317321
.build());
318322

319323
TrinoInputFile inputFile = fileSystem.newInputFile(location);
@@ -326,7 +330,7 @@ public void testCacheWithCorruptedPage()
326330
ImmutableMultiset.<CacheOperationSpan>builder()
327331
.add(new CacheOperationSpan("Alluxio.readCached", location.toString(), 14))
328332
.add(new CacheOperationSpan("AlluxioCacheManager.get", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 14))
329-
.add(new CacheOperationSpan("Alluxio.readExternal", location.toString(), 14))
333+
.add(new CacheOperationSpan("Input.readFully", location.toString(), 14))
330334
.add(new CacheOperationSpan("Alluxio.writeCache", location.toString(), 14))
331335
.add(new CacheOperationSpan("AlluxioCacheManager.put", cacheKey(location, cacheKeyProvider.currentCacheVersion()), 0, 14))
332336
.build());
@@ -366,7 +370,7 @@ private void assertUnCachedRead(Location location, int fileSize)
366370
ImmutableMultiset.Builder<CacheOperationSpan> builder = ImmutableMultiset.<CacheOperationSpan>builder()
367371
.add(new CacheOperationSpan("Alluxio.readCached", location.toString(), fileSize))
368372
.add(new CacheOperationSpan("Alluxio.writeCache", location.toString(), fileSize))
369-
.add(new CacheOperationSpan("Alluxio.readExternal", location.toString(), fileSize));
373+
.add(new CacheOperationSpan("Input.readFully", location.toString(), fileSize));
370374

371375
for (int offset = 0; offset < fileSize; offset = offset + PAGE_SIZE) {
372376
builder.add(new CacheOperationSpan("AlluxioCacheManager.put", cacheKey(location, cacheKeyProvider.currentCacheVersion()), offset, PAGE_SIZE));
@@ -412,8 +416,9 @@ private void assertCacheOperations(Location location, byte[] content, int readTi
412416

413417
private Multiset<CacheOperationSpan> getCacheOperations(List<SpanData> spans)
414418
{
415-
return spans.stream().filter(span -> span.getName().startsWith("Alluxio"))
416-
.map(CacheOperationSpan::create)
419+
return spans.stream()
420+
.filter(span -> span.getName().startsWith("Input.") || span.getName().startsWith("Alluxio"))
421+
.map(CacheOperationSpan::create)
417422
.collect(toCollection(HashMultiset::create));
418423
}
419424

@@ -429,18 +434,20 @@ public static CacheOperationSpan create(SpanData span)
429434
Attributes attributes = span.getAttributes();
430435

431436
long length = switch (span.getName()) {
432-
case "Alluxio.readCached", "Alluxio.readExternal", "AlluxioCacheManager.get" -> attributes.get(CACHE_FILE_READ_SIZE);
437+
case "Alluxio.readCached", "Alluxio.readExternalStream", "AlluxioCacheManager.get" -> attributes.get(CACHE_FILE_READ_SIZE);
433438
case "Alluxio.writeCache", "AlluxioCacheManager.put" -> attributes.get(CACHE_FILE_WRITE_SIZE);
439+
case "Input.readFully" -> attributes.get(FILE_READ_SIZE);
434440
default -> throw new IllegalArgumentException("Unrecognized span " + span.getName() + " [" + span.getAttributes() + "]");
435441
};
436442

437443
long position = switch (span.getName()) {
438-
case "Alluxio.readCached", "Alluxio.readExternal", "AlluxioCacheManager.get" -> attributes.get(CACHE_FILE_READ_POSITION);
444+
case "Alluxio.readCached", "Alluxio.readExternalStream", "AlluxioCacheManager.get" -> attributes.get(CACHE_FILE_READ_POSITION);
439445
case "Alluxio.writeCache", "AlluxioCacheManager.put" -> attributes.get(CACHE_FILE_WRITE_POSITION);
446+
case "Input.readFully" -> attributes.get(FILE_READ_POSITION);
440447
default -> throw new IllegalArgumentException("Unrecognized span " + span.getName() + " [" + span.getAttributes() + "]");
441448
};
442449

443-
return new CacheOperationSpan(span.getName(), firstNonNull(attributes.get(CACHE_FILE_LOCATION), attributes.get(CACHE_KEY)), position, length);
450+
return new CacheOperationSpan(span.getName(), getLocation(span), position, length);
444451
}
445452

446453
@Override
@@ -454,4 +461,12 @@ private static String cacheKey(Location location, int cacheVersion)
454461
{
455462
return testingCacheKeyForLocation(location, cacheVersion);
456463
}
464+
465+
private static String getLocation(SpanData span)
466+
{
467+
if (span.getName().startsWith("Input.")) {
468+
return requireNonNull(span.getAttributes().get(FILE_LOCATION));
469+
}
470+
return firstNonNull(span.getAttributes().get(CACHE_FILE_LOCATION), span.getAttributes().get(CACHE_KEY));
471+
}
457472
}

lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,14 @@ static TrinoFileSystemFactory createFileSystemFactory(
140140
.orElseThrow(() -> new IllegalArgumentException("No factory for location: " + location));
141141

142142
TrinoFileSystemFactory delegate = new SwitchingFileSystemFactory(loader);
143+
delegate = new TracingFileSystemFactory(tracer, delegate);
143144
if (fileSystemCache.isPresent()) {
144-
delegate = new CacheFileSystemFactory(tracer, delegate, fileSystemCache.orElseThrow(), keyProvider.orElseThrow());
145+
return new CacheFileSystemFactory(tracer, delegate, fileSystemCache.orElseThrow(), keyProvider.orElseThrow());
145146
}
146147
// use MemoryFileSystemCache only when no other TrinoFileSystemCache is configured
147-
else if (memoryFileSystemCache.isPresent()) {
148-
delegate = new CacheFileSystemFactory(tracer, delegate, memoryFileSystemCache.orElseThrow(), keyProvider.orElseThrow());
148+
if (memoryFileSystemCache.isPresent()) {
149+
return new CacheFileSystemFactory(tracer, delegate, memoryFileSystemCache.orElseThrow(), keyProvider.orElseThrow());
149150
}
150-
return new TracingFileSystemFactory(tracer, delegate);
151+
return delegate;
151152
}
152153
}

0 commit comments

Comments
 (0)