Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 46 additions & 18 deletions src/AzurePipelines/PipelineCachingCacheClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ string s when int.TryParse(s, out int i) => i,
_startupTask = Task.Run(() => QueryPipelineCaching(rootContext, new VisualStudio.Services.PipelineCache.WebApi.Fingerprint("init"), CancellationToken.None));
}

protected override async Task AddNodeAsync(
protected override async Task<AddNodeResult> AddNodeAsync(
Context context,
StrongFingerprint fingerprint,
IReadOnlyDictionary<string, ContentHash> outputs,
Expand All @@ -195,10 +195,11 @@ protected override async Task AddNodeAsync(
{
if (_remoteCacheIsReadOnly)
{
return;
return AddNodeResult.Skipped;
}

// write the SFP -> manifest
bool sfpAddded;
List<string> tempFilePaths = new();
try
{
Expand Down Expand Up @@ -268,7 +269,7 @@ protected override async Task AddNodeAsync(
infos = outputs.Keys.Select(f => new FileInfo(f)).ToArray();
}

var result = await WithHttpRetries(
PublishResult result = await WithHttpRetries(
() => _manifestClient.PublishAsync(RepoRoot, infos, extras, new ArtifactPublishOptions(), manifestFileOutputPath: null, cancellationToken),
cacheContext: context,
message: $"Publishing content for {fingerprint}",
Expand All @@ -292,12 +293,24 @@ protected override async Task AddNodeAsync(
result.ProofNodes,
ContentFormatConstants.Files);

CreateResult createResult = await WithHttpRetries(
() => _cacheClient.CreatePipelineCacheArtifactAsync(entry, null, cancellationToken),
sfpAddded = await WithHttpRetries(
async () =>
{
try
{
CreateResult createResult = await _cacheClient.CreatePipelineCacheArtifactAsync(entry, null, cancellationToken);
Tracer.Debug(context, $"Cache entry for {fingerprint} stored in scope `{createResult.ScopeUsed}`");
return true;
}
catch (PipelineCacheItemAlreadyExistsException)
{
Tracer.Debug(context, $"Cache entry for {fingerprint} already exists.");
return false;
}
},
cacheContext: context,
message: $"Storing cache key for {fingerprint}",
cancellationToken);
Tracer.Debug(context, $"Cache entry stored in scope `{createResult.ScopeUsed}`");
}
finally
{
Expand All @@ -310,6 +323,7 @@ protected override async Task AddNodeAsync(
}

// add the WFP -> Selector mapping
bool wfpAddded;
List<TempFile> pathSetTempFiles = new();
try
{
Expand Down Expand Up @@ -363,27 +377,41 @@ protected override async Task AddNodeAsync(
}
}

var result = await WithHttpRetries(
PublishResult result = await WithHttpRetries(
() => _manifestClient.PublishAsync(TempFolder, infos, extras, new ArtifactPublishOptions(), manifestFileOutputPath: null, cancellationToken),
cacheContext: context,
message: $"Publishing content for {fingerprint}",
cancellationToken);

var entry = new CreatePipelineCacheArtifactContract(
CreatePipelineCacheArtifactContract entry = new(
DomainId,
new VisualStudio.Services.PipelineCache.WebApi.Fingerprint(key.Split(KeySegmentSeperator)),
result.ManifestId,
result.RootId,
result.ProofNodes,
ContentFormatConstants.Files);

CreateResult createResult = await WithHttpRetries(
() => _cacheClient.CreatePipelineCacheArtifactAsync(entry, null, cancellationToken),
wfpAddded = await WithHttpRetries(
async () =>
{
try
{
CreateResult createResult = await _cacheClient.CreatePipelineCacheArtifactAsync(entry, null, cancellationToken);
Tracer.Debug(context, $"SFP `{fingerprint}` stored in scope `{createResult.ScopeUsed}`");
return true;
}
catch (PipelineCacheItemAlreadyExistsException)
{
return false;
}
},
cacheContext: context,
message: $"Storing cache key for {fingerprint}",
cancellationToken);

Tracer.Debug(context, $"SFP `{fingerprint}` stored in scope `{createResult.ScopeUsed}`");
return wfpAddded || sfpAddded
? AddNodeResult.Added
: AddNodeResult.AlreadyExists;
}
finally
{
Expand Down Expand Up @@ -733,13 +761,13 @@ private string ComputeSelectorsKey(BuildXL.Cache.MemoizationStore.Interfaces.Ses
private Task<T> WithHttpRetries<T>(Func<Task<T>> taskFactory, Context cacheContext, string message, CancellationToken token)
{
return AsyncHttpRetryHelper<T>.InvokeAsync(
taskFactory,
maxRetries: 10,
tracer: _azureDevopsTracer,
canRetryDelegate: _ => true, // retry on any exception
cancellationToken: token,
continueOnCapturedContext: false,
context: EmbedCacheContext(cacheContext, message));
taskFactory,
maxRetries: 10,
tracer: _azureDevopsTracer,
canRetryDelegate: _ => true, // retry on any exception
cancellationToken: token,
continueOnCapturedContext: false,
context: EmbedCacheContext(cacheContext, message));
}

public override async ValueTask DisposeAsync()
Expand Down
16 changes: 12 additions & 4 deletions src/Common/Caching/CacheClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@

namespace Microsoft.MSBuildCache.Caching;

public enum AddNodeResult
{
Added,
AlreadyExists,
Skipped
}

public abstract class CacheClient : ICacheClient
{
private static readonly byte[] EmptySelectorOutput = new byte[1];
Expand Down Expand Up @@ -122,7 +129,8 @@ protected CacheClient(
/* abstract methods for subclasses to implement */
protected abstract Task<OpenStreamResult> OpenStreamAsync(Context context, ContentHash contentHash, CancellationToken cancellationToken);

protected abstract Task AddNodeAsync(
/// <returns>True if added, false if already exists. Otherwise throws.</returns>
protected abstract Task<AddNodeResult> AddNodeAsync(
Context context,
StrongFingerprint fingerprint,
IReadOnlyDictionary<string, ContentHash> outputs,
Expand Down Expand Up @@ -342,16 +350,16 @@ public async Task AddNodeInternalAsync(

StrongFingerprint cacheStrongFingerprint = new(cacheWeakFingerprint, selector);

Tracer.Debug(context, $"StrongFingerprint is {cacheStrongFingerprint} for {nodeContext.Id}");

await AddNodeAsync(
AddNodeResult addresult = await AddNodeAsync(
context,
cacheStrongFingerprint,
outputsToCache,
(nodeBuildResultHash, nodeBuildResultBytes),
pathSetBytes,
cancellationToken);

Tracer.Debug(context, $"Adding StrongFingerprint {cacheStrongFingerprint} for {nodeContext.Id} resulted in {addresult}.");

if (_localCacheStateManager is not null)
{
await _localCacheStateManager.WriteStateFileAsync(nodeContext, nodeBuildResult);
Expand Down
6 changes: 5 additions & 1 deletion src/Common/Caching/CasCacheClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected override async IAsyncEnumerable<Selector> GetSelectors(Context context
protected override Task<OpenStreamResult> OpenStreamAsync(Context context, ContentHash contentHash, CancellationToken cancellationToken)
=> _twoLevelCacheSession.OpenStreamAsync(context, contentHash, cancellationToken);

protected override async Task AddNodeAsync(
protected override async Task<AddNodeResult> AddNodeAsync(
Context context,
StrongFingerprint fingerprint,
IReadOnlyDictionary<string, ContentHash> outputs,
Expand Down Expand Up @@ -253,6 +253,10 @@ static async Task checkUploadResultsAsync(List<Task<PutFileOperation>> uploadTas
throw new CacheException($"{nameof(_twoLevelCacheSession.AddOrGetContentHashListAsync)} failed for {fingerprint}.");
}

return contentHashList.Equals(addResult?.ContentHashListWithDeterminism.ContentHashList)
? AddNodeResult.Added
: AddNodeResult.AlreadyExists;

// TODO dfederm: Handle CHL races
}

Expand Down
Loading