Skip to content

Commit

Permalink
prevent 'this' from leaking
Browse files Browse the repository at this point in the history
  • Loading branch information
shangm2 committed Jan 22, 2025
1 parent 84955a8 commit c9a62af
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public final class HttpRemoteTask

private final EventLoop taskEventLoop;

public HttpRemoteTask(
public static HttpRemoteTask createHttpRemoteTask(
Session session,
TaskId taskId,
String nodeId,
Expand Down Expand Up @@ -268,6 +268,81 @@ public HttpRemoteTask(
ConnectorTypeSerdeManager connectorTypeSerdeManager,
SchedulerStatsTracker schedulerStatsTracker,
EventLoop taskEventLoop)
{
HttpRemoteTask task = new HttpRemoteTask(session,
taskId,
nodeId,
location,
remoteLocation,
planFragment,
initialSplits,
outputBuffers,
httpClient,
maxErrorDuration,
taskStatusRefreshMaxWait,
taskInfoRefreshMaxWait,
taskInfoUpdateInterval,
summarizeTaskInfo,
taskStatusCodec,
taskInfoCodec,
taskInfoJsonCodec,
taskUpdateRequestCodec,
planFragmentCodec,
metadataUpdatesCodec,
nodeStatsTracker,
stats,
binaryTransportEnabled,
thriftTransportEnabled,
taskInfoThriftTransportEnabled,
thriftProtocol,
tableWriteInfo,
maxTaskUpdateSizeInBytes,
metadataManager,
queryManager,
taskUpdateRequestSize,
handleResolver,
connectorTypeSerdeManager,
schedulerStatsTracker,
taskEventLoop);
task.initialize();
return task;
}

private HttpRemoteTask(Session session,
TaskId taskId,
String nodeId,
URI location,
URI remoteLocation,
PlanFragment planFragment,
Multimap<PlanNodeId, Split> initialSplits,
OutputBuffers outputBuffers,
HttpClient httpClient,
Duration maxErrorDuration,
Duration taskStatusRefreshMaxWait,
Duration taskInfoRefreshMaxWait,
Duration taskInfoUpdateInterval,
boolean summarizeTaskInfo,
Codec<TaskStatus> taskStatusCodec,
Codec<TaskInfo> taskInfoCodec,
Codec<TaskInfo> taskInfoJsonCodec,
Codec<TaskUpdateRequest> taskUpdateRequestCodec,
Codec<PlanFragment> planFragmentCodec,
Codec<MetadataUpdates> metadataUpdatesCodec,
NodeStatsTracker nodeStatsTracker,
RemoteTaskStats stats,
boolean binaryTransportEnabled,
boolean thriftTransportEnabled,
boolean taskInfoThriftTransportEnabled,
Protocol thriftProtocol,
TableWriteInfo tableWriteInfo,
int maxTaskUpdateSizeInBytes,
MetadataManager metadataManager,
QueryManager queryManager,
DecayCounter taskUpdateRequestSize,
HandleResolver handleResolver,
ConnectorTypeSerdeManager connectorTypeSerdeManager,
SchedulerStatsTracker schedulerStatsTracker,
EventLoop taskEventLoop)
{
requireNonNull(session, "session is null");
requireNonNull(taskId, "taskId is null");
Expand Down Expand Up @@ -390,6 +465,13 @@ public HttpRemoteTask(
connectorTypeSerdeManager,
thriftProtocol);

updateTaskStats();
taskEventLoop.execute(this::updateSplitQueueSpace);
}

// this is a separate method to ensure that the `this` reference is not leaked during construction
private void initialize()
{
taskStatusFetcher.addStateChangeListener(newStatus -> {
verify(taskEventLoop.inEventLoop());

Expand All @@ -402,9 +484,6 @@ public HttpRemoteTask(
updateSplitQueueSpace();
}
});

updateTaskStats();
taskEventLoop.execute(this::updateSplitQueueSpace);
}

public PlanFragment getPlanFragment()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public RemoteTask createRemoteTask(
TableWriteInfo tableWriteInfo,
SchedulerStatsTracker schedulerStatsTracker)
{
return new HttpRemoteTask(
return HttpRemoteTask.createHttpRemoteTask(
session,
taskId,
node.getNodeIdentifier(),
Expand Down

0 comments on commit c9a62af

Please sign in to comment.