Skip to content

Commit 9ed79cd

Browse files
idear1203 and Dongwei Wang authored
[Synapse] - Enhance SparkBatchOperation polling logic to support both job submission and job execution scenarios and fix SparkSessionOperation and SparkStatementOperation (Azure#23706)
* [Synapse] - SparkBatchOperation should not return when state is starting * Support both job submission and job execution scenarios * Update export api * Fix a few comments * Save current status * Add test cases * Add file comment * Refresh session records * Resolve conflicts * update exported API * Regenerate code * Update session records * Add public constructors for operations * Update Azure.Analytics.Synapse.Spark.netstandard2.0 Co-authored-by: Dongwei Wang <[email protected]>
1 parent b2c8470 commit 9ed79cd

33 files changed

+1529834
-3253
lines changed

sdk/synapse/Azure.Analytics.Synapse.Spark/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## 1.0.0-preview.8 (Unreleased)
44

55
### Features Added
6+
- Enhance Long Running Operation (LRO) logic for `SparkBatchClient` to support both scenarios of job submission and job execution.
67

78
### Breaking Changes
89

sdk/synapse/Azure.Analytics.Synapse.Spark/api/Azure.Analytics.Synapse.Spark.netstandard2.0.cs

Lines changed: 66 additions & 6 deletions
Large diffs are not rendered by default.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;

namespace Azure.Analytics.Synapse.Spark.Models
{
    /// <summary> Options used to configure the creation of a Spark batch job. </summary>
    public partial class SparkBatchJobOptions
    {
        /// <summary> Initializes a new instance of SparkBatchJobOptions. </summary>
        /// <param name="name"> The name of the Spark batch job. </param>
        /// <param name="file"> The file to execute for the Spark batch job. </param>
        /// <param name="creationCompletionType">
        /// Determines when the job creation operation is considered complete.
        /// Defaults to <see cref="SparkBatchOperationCompletionType.JobSubmission"/>.
        /// </param>
        /// <exception cref="ArgumentNullException"> <paramref name="name"/> or <paramref name="file"/> is null. </exception>
        public SparkBatchJobOptions(string name, string file, SparkBatchOperationCompletionType creationCompletionType = SparkBatchOperationCompletionType.JobSubmission) : this(name, file)
        {
            CreationCompletionType = creationCompletionType;
        }

        /// <summary>
        /// Describes the different ways a Spark batch job operation could complete.
        /// If <see cref="SparkBatchOperationCompletionType.JobSubmission"/> is used, the operation is considered complete when the Livy state is starting/running/error/dead/success/killed.
        /// If <see cref="SparkBatchOperationCompletionType.JobExecution"/> is used, the operation is considered complete when the Livy state is error/dead/success/killed.
        /// </summary>
        public SparkBatchOperationCompletionType CreationCompletionType { get; set; }
    }
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

namespace Azure.Analytics.Synapse.Spark.Models
{
    /// <summary>
    /// Describes the different ways a Spark batch job operation could complete.
    /// If <see cref="JobSubmission"/> is used, the operation is considered complete when the Livy state is starting/running/error/dead/success/killed.
    /// If <see cref="JobExecution"/> is used, the operation is considered complete when the Livy state is error/dead/success/killed.
    /// </summary>
    public enum SparkBatchOperationCompletionType
    {
        /// <summary> The operation completes once the job has been submitted (Livy state starting/running/error/dead/success/killed). </summary>
        JobSubmission = 0,

        /// <summary> The operation completes once the job has finished running (Livy state error/dead/success/killed). </summary>
        JobExecution = 1
    }
}

sdk/synapse/Azure.Analytics.Synapse.Spark/src/Customization/SparkBatchClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ private async Task<SparkBatchOperation> StartCreateSparkBatchJobInternal (bool a
3535
{
3636
batchSession = RestClient.CreateSparkBatchJob(sparkBatchJobOptions, detailed, cancellationToken);
3737
}
38-
return new SparkBatchOperation(this, _clientDiagnostics, batchSession);
38+
return new SparkBatchOperation(this, _clientDiagnostics, batchSession, sparkBatchJobOptions.CreationCompletionType);
3939
}
4040
catch (Exception e)
4141
{

sdk/synapse/Azure.Analytics.Synapse.Spark/src/Customization/SparkBatchOperation.cs

Lines changed: 98 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// Licensed under the MIT License.
33

44
using System;
5-
using System.Collections.Generic;
65
using System.Globalization;
76
using System.Threading;
87
using System.Threading.Tasks;
@@ -21,26 +20,56 @@ public class SparkBatchOperation : Operation<SparkBatchJob>
2120
{
2221
private static readonly TimeSpan s_defaultPollingInterval = TimeSpan.FromSeconds(5);
2322

23+
/// <summary>
24+
/// Provides tools for exception creation in case of failure.
25+
/// </summary>
2426
private readonly ClientDiagnostics _diagnostics;
27+
28+
/// <summary>
29+
/// Get the completion type of Spark batch job operation.
30+
/// </summary>
31+
private readonly SparkBatchOperationCompletionType _completionType;
32+
33+
/// <summary>
34+
/// The client used to check for completion.
35+
/// </summary>
2536
private readonly SparkBatchClient _client;
26-
private readonly SparkBatchJob _value;
27-
private Response<SparkBatchJob> _response;
28-
private bool _completed;
29-
private RequestFailedException _requestFailedException;
3037

31-
internal SparkBatchOperation(SparkBatchClient client, ClientDiagnostics diagnostics, Response<SparkBatchJob> response)
32-
{
33-
_client = client;
34-
_value = response.Value ?? throw new InvalidOperationException("The response does not contain a value.");
35-
_response = response;
36-
_diagnostics = diagnostics;
37-
}
38+
/// <summary>
39+
/// Whether the operation has completed.
40+
/// </summary>
41+
private bool _hasCompleted;
3842

39-
/// <summary> Initializes a new instance of <see cref="SparkBatchOperation" /> for mocking. </summary>
40-
protected SparkBatchOperation() {}
43+
/// <summary>
44+
/// Gets the created Spark batch job.
45+
/// </summary>
46+
private SparkBatchJob _value;
47+
48+
/// <summary>
49+
/// Raw HTTP response.
50+
/// </summary>
51+
private Response _rawResponse;
52+
53+
/// <summary>
54+
/// <c>true</c> if the long-running operation has a value. Otherwise, <c>false</c>.
55+
/// </summary>
56+
private bool _hasValue;
57+
58+
/// <summary>
59+
/// Gets the Id of the created Spark batch job.
60+
/// </summary>
61+
private int _batchId;
62+
63+
/// <summary>
64+
/// Gets a value indicating whether the operation has completed.
65+
/// </summary>
66+
public override bool HasCompleted => _hasCompleted;
67+
68+
/// <inheritdoc/>
69+
public override bool HasValue => _hasValue;
4170

4271
/// <inheritdoc/>
43-
public override string Id => _value.Id.ToString(CultureInfo.InvariantCulture);
72+
public override string Id => _batchId.ToString(CultureInfo.InvariantCulture);
4473

4574
/// <summary>
4675
/// Gets the <see cref="SparkBatchJob"/>.
@@ -49,34 +78,38 @@ protected SparkBatchOperation() {}
4978
/// <remarks>
5079
/// Azure Synapse will return a <see cref="SparkBatchJob"/> immediately but may take time to the session to be ready.
5180
/// </remarks>
52-
public override SparkBatchJob Value
81+
public override SparkBatchJob Value => OperationHelpers.GetValue(ref _value);
82+
83+
/// <summary>
84+
/// Get the completion type of Spark batch job operation.
85+
/// </summary>
86+
public SparkBatchOperationCompletionType CompletionType => _completionType;
87+
88+
/// <summary>
89+
/// Initializes a new instance of the <see cref="SparkBatchOperation"/> class.
90+
/// </summary>
91+
/// <param name="batchId">The ID of the Spark batch job.</param>
92+
/// <param name="client">The client used to check for completion.</param>
93+
/// <param name="completionType">The operation completion type.</param>
94+
public SparkBatchOperation(int batchId, SparkBatchClient client, SparkBatchOperationCompletionType completionType = SparkBatchOperationCompletionType.JobSubmission)
5395
{
54-
get
55-
{
56-
#pragma warning disable CA1065 // Do not raise exceptions in unexpected locations
57-
if (!HasCompleted)
58-
{
59-
throw new InvalidOperationException("The operation is not complete.");
60-
}
61-
if (_requestFailedException != null)
62-
{
63-
throw _requestFailedException;
64-
}
65-
#pragma warning restore CA1065 // Do not raise exceptions in unexpected locations
66-
return _value;
67-
}
96+
_batchId = batchId;
97+
_client = client;
98+
_completionType = completionType;
6899
}
69100

70-
/// <inheritdoc/>
71-
public override bool HasCompleted => _completed;
72-
73-
/// <inheritdoc/>
74-
public override bool HasValue => !_responseHasError && HasCompleted;
101+
internal SparkBatchOperation(SparkBatchClient client, ClientDiagnostics diagnostics, Response<SparkBatchJob> response, SparkBatchOperationCompletionType completionType)
102+
: this(response.Value.Id, client, completionType)
103+
{
104+
_diagnostics = diagnostics;
105+
_rawResponse = response.GetRawResponse();
106+
}
75107

76-
private bool _responseHasError => StringComparer.OrdinalIgnoreCase.Equals ("error", _response?.Value?.State);
108+
/// <summary> Initializes a new instance of <see cref="SparkBatchOperation" /> for mocking. </summary>
109+
protected SparkBatchOperation() {}
77110

78111
/// <inheritdoc/>
79-
public override Response GetRawResponse() => _response.GetRawResponse();
112+
public override Response GetRawResponse() => _rawResponse;
80113

81114
/// <inheritdoc/>
82115
public override Response UpdateStatus(CancellationToken cancellationToken = default) => UpdateStatusAsync(false, cancellationToken).EnsureCompleted();
@@ -94,67 +127,53 @@ public override ValueTask<Response<SparkBatchJob>> WaitForCompletionAsync(TimeSp
94127

95128
private async ValueTask<Response> UpdateStatusAsync(bool async, CancellationToken cancellationToken)
96129
{
97-
if (!_completed)
130+
if (!_hasCompleted)
98131
{
99-
using DiagnosticScope scope = _diagnostics.CreateScope($"{nameof(SparkSessionOperation)}.{nameof(UpdateStatus)}");
100-
scope.Start();
132+
using DiagnosticScope? scope = _diagnostics?.CreateScope($"{nameof(SparkSessionOperation)}.{nameof(UpdateStatus)}");
133+
scope?.Start();
101134

102135
try
103136
{
104-
if (async)
137+
// Get the latest status
138+
Response<SparkBatchJob> update = async
139+
? await _client.GetSparkBatchJobAsync(_batchId, true, cancellationToken).ConfigureAwait(false)
140+
: _client.GetSparkBatchJob(_batchId, true, cancellationToken);
141+
142+
// Check if the operation is no longer running
143+
_hasCompleted = IsJobComplete(update.Value.Result ?? SparkBatchJobResultType.Uncertain, update.Value.State.Value, _completionType);
144+
if (_hasCompleted)
105145
{
106-
_response = await _client.RestClient.GetSparkBatchJobAsync(_value.Id, true, cancellationToken).ConfigureAwait(false);
146+
_hasValue = true;
147+
_value = update.Value;
107148
}
108-
else
109-
{
110-
_response = _client.RestClient.GetSparkBatchJob(_value.Id, true, cancellationToken);
111-
}
112-
_completed = IsJobComplete(_response.Value.Result.ToString(), _response.Value.State);
113-
}
114-
catch (RequestFailedException e)
115-
{
116-
_requestFailedException = e;
117-
scope.Failed(e);
118-
throw;
149+
150+
// Update raw response
151+
_rawResponse = update.GetRawResponse();
119152
}
120153
catch (Exception e)
121154
{
122-
_requestFailedException = new RequestFailedException("Unexpected failure", e);
123-
scope.Failed(e);
124-
throw _requestFailedException;
125-
}
126-
if (_responseHasError)
127-
{
128-
_requestFailedException = new RequestFailedException("SparkBatchOperation ended in state: 'error'");
129-
scope.Failed(_requestFailedException);
130-
throw _requestFailedException;
155+
scope?.Failed(e);
156+
throw;
131157
}
132158
}
133159

134160
return GetRawResponse();
135161
}
136162

137-
private static bool IsJobComplete(string jobState, string livyState)
163+
private static bool IsJobComplete(SparkBatchJobResultType jobState, LivyStates livyState, SparkBatchOperationCompletionType creationCompletionType)
138164
{
139-
switch (jobState)
140-
{
141-
case "succeeded":
142-
case "failed":
143-
case "cancelled":
144-
return true;
145-
}
146-
147-
switch (livyState)
165+
if (jobState == SparkBatchJobResultType.Succeeded || jobState == SparkBatchJobResultType.Failed || jobState == SparkBatchJobResultType.Cancelled)
148166
{
149-
case "starting":
150-
case "error":
151-
case "dead":
152-
case "success":
153-
case "killed":
154-
return true;
167+
return true;
155168
}
156169

157-
return false;
170+
return creationCompletionType == SparkBatchOperationCompletionType.JobSubmission
171+
&& (livyState == LivyStates.Starting
172+
|| livyState == LivyStates.Running
173+
|| livyState == LivyStates.Error
174+
|| livyState == LivyStates.Dead
175+
|| livyState == LivyStates.Success
176+
|| livyState == LivyStates.Killed);
158177
}
159178
}
160179
}

0 commit comments

Comments (0)