Skip to content

Commit

Permalink
[DSIP-73] Add dolphinscheduler-task-executor module to unify the task…
Browse files Browse the repository at this point in the history
… execution logic
  • Loading branch information
ruanwenjun committed Nov 12, 2024
1 parent 5e2abd7 commit 1f87d7f
Show file tree
Hide file tree
Showing 261 changed files with 5,435 additions and 7,270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator;
import org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillRequest;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillResponse;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -306,11 +306,11 @@ public Result stopTask(User loginUser, long projectCode, Integer taskInstanceId)
}

// todo: we only support streaming task for now
final TaskInstanceKillResponse taskInstanceKillResponse = Clients
.withService(ITaskInstanceOperator.class)
final TaskExecutorKillResponse taskExecutorKillResponse = Clients
.withService(IPhysicalTaskExecutorOperator.class)
.withHost(taskInstance.getHost())
.killTask(new TaskInstanceKillRequest(taskInstanceId));
log.info("TaskInstance kill response: {}", taskInstanceKillResponse);
.killTask(TaskExecutorKillRequest.of(taskInstanceId));
log.info("TaskInstance kill response: {}", taskExecutorKillResponse);

putMsg(result, Status.SUCCESS);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
Expand Down Expand Up @@ -131,11 +129,6 @@ public TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLog
return new TaskInstanceLogPageQueryResponse();
}

@Override
public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
return new GetAppIdResponse();
}

@Override
public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
@Slf4j
public class ThreadUtils {

/**
* Create a daemon fixed thread pool, the thread name will be formatted with the given name.
*
* @param threadNameFormat the thread name format, e.g. "DemonThread-%d"
* @param threadsNum the number of threads in the pool
*/
public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String threadNameFormat, int threadsNum) {
return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, newDaemonThreadFactory(threadNameFormat));
}
Expand All @@ -43,9 +49,10 @@ public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(S
* Create a daemon scheduler thread pool, the thread name will be formatted with the given name.
*
* @param threadNameFormat the thread name format, e.g. "DemonThread-%d"
* @param threadsNum the number of threads in the pool
* @param threadsNum the number of threads in the pool
*/
public static ScheduledExecutorService newDaemonScheduledExecutorService(String threadNameFormat, int threadsNum) {
public static ScheduledExecutorService newDaemonScheduledExecutorService(final String threadNameFormat,
final int threadsNum) {
return Executors.newScheduledThreadPool(threadsNum, newDaemonThreadFactory(threadNameFormat));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import lombok.experimental.SuperBuilder;

/**
* The abstract class of delay event, the event will be triggered after the delay time.
* <p> You can extend this class to implement your own delay event.
*/
@SuperBuilder
public abstract class AbstractDelayEvent implements IEvent, Delayed {

protected long delayTime;
Expand All @@ -34,7 +37,7 @@ public AbstractDelayEvent() {
this(0);
}

public AbstractDelayEvent(long delayTime) {
public AbstractDelayEvent(final long delayTime) {
if (delayTime == 0) {
this.triggerTimeInMillis = System.currentTimeMillis();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ public Optional<T> poll() {
return Optional.ofNullable(delayEventQueue.poll());
}

@Override
public Optional<T> peek() {
return Optional.ofNullable(delayEventQueue.peek());
}

@Override
public Optional<T> remove() {
return Optional.ofNullable(delayEventQueue.remove());
}

@Override
public boolean isEmpty() {
return delayEventQueue.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ public interface IEventBus<T extends IEvent> {
*/
Optional<T> poll() throws InterruptedException;

/**
* peek the head event from the bus. This method will not block if the event bus is empty will return empty optional.
*/
Optional<T> peek();

/**
* Remove the head event from the bus. This method will not block if the event bus is empty will return empty optional.
*/
Optional<T> remove();

/**
* Whether the bus is empty.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,28 @@
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-meter</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,19 @@

import org.apache.dolphinscheduler.common.utils.JSONUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.TimeZone;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

/**
* json serialize or deserialize
*/
@Slf4j
public class JsonSerializer {

Expand All @@ -60,13 +57,6 @@ private JsonSerializer() {

}

/**
* serialize to byte
*
* @param obj object
* @param <T> object type
* @return byte array
*/
public static <T> byte[] serialize(T obj) {
if (obj == null) {
return null;
Expand All @@ -79,44 +69,14 @@ public static <T> byte[] serialize(T obj) {
}
}

/**
* serialize to string
*
* @param obj object
* @param <T> object type
* @return string
*/
public static <T> String serializeToString(T obj) {
String json = "";
try {
json = objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
log.error("serializeToString exception!", e);
}

return json;
}

/**
* deserialize
*
* @param src byte array
* @param clazz class
* @param <T> deserialize type
* @return deserialize type
*/
@SneakyThrows
public static <T> T deserialize(byte[] src, Class<T> clazz) {
if (src == null) {
return null;
}

String json = new String(src, StandardCharsets.UTF_8);
try {
return objectMapper.readValue(json, clazz);
} catch (IOException e) {
log.error("deserialize exception!", e);
return null;
}
return objectMapper.readValue(json, clazz);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@
<version>dev-SNAPSHOT</version>
</parent>

<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-common</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-base</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
Expand All @@ -35,9 +33,6 @@ public interface ILogService {
@RpcMethod
TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest);

@RpcMethod
GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest);

@RpcMethod
void removeTaskInstanceLog(String taskInstanceLogAbsolutePath);

Expand Down
12 changes: 9 additions & 3 deletions dolphinscheduler-extract/dolphinscheduler-extract-master/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,27 @@
<artifactId>dolphinscheduler-extract-master</artifactId>

<dependencies>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-base</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-executor</artifactId>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.extract.master;

import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.task.executor.ITaskExecutorLifecycleEventReporter;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchRequest;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchResponse;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillRequest;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillResponse;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorPauseRequest;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorPauseResponse;

@RpcService
public interface ILogicTaskExecutorOperator {

@RpcMethod
TaskExecutorDispatchResponse dispatchTask(final TaskExecutorDispatchRequest taskExecutorDispatchRequest);

@RpcMethod
TaskExecutorKillResponse killTask(final TaskExecutorKillRequest taskExecutorKillRequest);

@RpcMethod
TaskExecutorPauseResponse pauseTask(final TaskExecutorPauseRequest taskExecutorPauseRequest);

@RpcMethod
void ackTaskExecutorLifecycleEvent(final ITaskExecutorLifecycleEventReporter.TaskExecutorLifecycleEventAck taskExecutorLifecycleEventAck);

}
Loading

0 comments on commit 1f87d7f

Please sign in to comment.