Skip to content

Commit ca19836

Browse files
resolve the LinkisJobInfo is null for adding task.
1 parent 20d3d5c commit ca19836

File tree

1 file changed

+12
-11
lines changed
  • streamis-jobmanager/streamis-jobmanager-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/restful/api

1 file changed

+12
-11
lines changed

streamis-jobmanager/streamis-jobmanager-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/restful/api/JobRestfulApi.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -263,14 +263,20 @@ public Message addTask(HttpServletRequest req,
263263
@RequestParam(value = "appId") String appId,
264264
@RequestParam(value = "appUrl") String appUrl) {
265265
String username = SecurityFilter.getLoginUsername(req);
266-
LOG.info("User {} try to add a new task for Streamis job {} with appId: {}, appUrl: {}.", username, jobName, appId, appUrl);
266+
LOG.info("User {} try to add a new task for Streamis job {}.{} with appId: {}, appUrl: {}.", username, projectName, jobName, appId, appUrl);
267267
if(StringUtils.isBlank(appId)) {
268268
return Message.error("appId cannot be empty!");
269269
}
270270
return withStreamJob(req, projectName, jobName, username, streamJob -> {
271271
// 如果存在正在运行的,先将其停止掉
272272
StreamTask streamTask = streamTaskService.getLatestTaskByJobId(streamJob.getId());
273-
if(streamTask == null) {
273+
if(streamTask != null && JobConf.isRunning(streamTask.getStatus())) {
274+
LOG.warn("Streamis Job {} exists running task, update its status from Running to stopped at first.", jobName);
275+
streamTask.setStatus((Integer) JobConf.FLINK_JOB_STATUS_STOPPED().getValue());
276+
streamTask.setErrDesc("stopped by App's new task.");
277+
streamTaskService.updateTask(streamTask);
278+
}
279+
if(streamTask == null || StringUtils.isBlank(streamTask.getLinkisJobInfo())) {
274280
// 这里取个巧,从该工程该用户有权限的Job中找到一个Flink的历史作业,作为这个Spark Streaming作业的jobId和jobInfo
275281
// 替换掉JobInfo中的 yarn 信息,这样我们前端就可以在不修改任何逻辑的情况下正常展示Spark Streaming作业了
276282
PageInfo<QueryJobListVo> jobList = streamJobService.getByProList(streamJob.getProjectName(), username, null, null, null);
@@ -280,9 +286,10 @@ public Message addTask(HttpServletRequest req,
280286
return Message.error("no Flink Job has been submitted, the register to Streamis cannot be succeeded.");
281287
}
282288
int index = 0;
289+
streamTask = null;
283290
while(streamTask == null && index < copyJobs.size()) {
284291
StreamTask copyTask = streamTaskService.getLatestTaskByJobId(copyJobs.get(index).getId());
285-
if(copyTask == null) {
292+
if(copyTask == null || StringUtils.isBlank(copyTask.getLinkisJobInfo())) {
286293
index ++;
287294
} else {
288295
LOG.warn("Streamis Job {} will bind the linkisJobInfo from history Flink Job {} with linkisJobId: {}, linkisJobInfo: {}.",
@@ -296,12 +303,6 @@ public Message addTask(HttpServletRequest req,
296303
return Message.error("no Flink task has been executed, the register to Streamis cannot be succeeded.");
297304
}
298305
} else {
299-
if(JobConf.isRunning(streamTask.getStatus())) {
300-
LOG.warn("Streamis Job {} exists running task, update its status from Running to stopped at first.", jobName);
301-
streamTask.setStatus((Integer) JobConf.FLINK_JOB_STATUS_STOPPED().getValue());
302-
streamTask.setErrDesc("stopped by App's new task.");
303-
streamTaskService.updateTask(streamTask);
304-
}
305306
StreamTask newStreamTask = streamTaskService.createTask(streamJob.getId(), (Integer) JobConf.FLINK_JOB_STATUS_RUNNING().getValue(), username);
306307
streamTask.setId(newStreamTask.getId());
307308
streamTask.setVersion(newStreamTask.getVersion());
@@ -343,7 +344,7 @@ public Message updateTask(HttpServletRequest req,
343344
@RequestParam(value = "appId") String appId,
344345
@RequestParam(value = "metrics") String metrics) {
345346
String username = SecurityFilter.getLoginUsername(req);
346-
LOG.info("User {} try to update task for Streamis job {} with appId: {}, metrics: {}.", username, jobName, appId, metrics);
347+
LOG.info("User {} try to update task for Streamis job {}.{} with appId: {}, metrics: {}.", username, projectName, jobName, appId, metrics);
347348
return withStreamJob(req, projectName, jobName, username, streamJob -> {
348349
StreamTask streamTask = streamTaskService.getLatestTaskByJobId(streamJob.getId());
349350
if (streamTask == null) {
@@ -388,7 +389,7 @@ public Message stopTask(HttpServletRequest req,
388389
@RequestParam(value = "appId") String appId,
389390
@RequestParam(value = "appUrl") String appUrl) {
390391
String username = SecurityFilter.getLoginUsername(req);
391-
LOG.info("User {} try to stop task for Streamis job {} with appId: {}, appUrl: {}.", username, jobName, appId, appUrl);
392+
LOG.info("User {} try to stop task for Streamis job {}.{} with appId: {}, appUrl: {}.", username, projectName, jobName, appId, appUrl);
392393
return withStreamJob(req, projectName, jobName, username,
393394
streamJob -> tryStopTask(streamJob, appId));
394395
}

0 commit comments

Comments
 (0)