Skip to content

Commit ac4839c

Browse files
author
v_xiangbiaowu
committed
Application initialization is encapsulated into an interface, including configuration and task upload
1 parent 8ce4a9f commit ac4839c

File tree

4 files changed

+118
-5
lines changed
  • docs/zh_CN/0.2.0/使用文档
  • streamis-jobmanager
    • streamis-job-manager
      • streamis-job-manager-base/src/main/java/com/webank/wedatasphere/streamis/jobmanager/manager/util
      • streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service
    • streamis-jobmanager-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/restful/api

4 files changed

+118
-5
lines changed

docs/zh_CN/0.2.0/使用文档/Streamis用户手册.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060

6161
xxx.zip
6262
├── meta.json
63+
├── conf.json #可选文件
6364
├── test.sql
6465
├── test.jar
6566
├── file3
@@ -109,6 +110,34 @@
109110
}
110111
```
111112

113+
其中conf.json是job的配置文件,是一个可选的文件,格式为:
114+
```
115+
{
116+
"wds.linkis.flink.resource": { //资源配置
117+
"wds.linkis.flink.app.parallelism": "4", #parallelism并行度,默认4
118+
"wds.linkis.flink.jobmanager.memory": "1024", #JobManager Memory,默认1024
119+
"wds.linkis.flink.taskmanager.cpus": "2", #TaskManager CPUs,默认2
120+
"wds.linkis.flink.taskmanager.memory": "4096", #TaskManager Memory,默认4096
121+
"wds.linkis.flink.taskmanager.numberOfTaskSlots": "2",#TaskManager Slot数量,默认2
122+
"wds.linkis.rm.yarnqueue": "" #使用Yarn队列
123+
},
124+
"wds.linkis.flink.custom": { #Flink参数,K/V可自定义
125+
"aa": "aaa",
126+
"bb": "bbb",
127+
...
128+
},
129+
"wds.linkis.flink.produce": { #生产配置
130+
"wds.linkis.flink.checkpoint.switch": "OFF", #Checkpoint开关
131+
"wds.linkis.flink.savepoint.path": "", #快照文件位置
132+
"wds.linkis.flink.app.fail-restart.switch": "OFF", #作业失败自动拉起开关
133+
"wds.linkis.flink.app.start-auto-restore.switch": "ON" #作业启动状态自恢复
134+
},
135+
"wds.linkis.flink.authority": { #权限设置
136+
"wds.linkis.flink.authority.visible": "" #可见人员
137+
}
138+
}
139+
```
140+
112141
### 4.2.2 示例
113142

114143
​ streamisjobtest为flinksql文件,meta.json是该任务的元数据信息。

streamis-jobmanager/streamis-job-manager/streamis-job-manager-base/src/main/java/com/webank/wedatasphere/streamis/jobmanager/manager/util/ReaderUtils.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
public class ReaderUtils {
3333
private static final String metaFileName = "meta.txt";
3434
private static final String metaFileJsonName = "meta.json";
35+
private static final String confFileJsonName = "conf.json";
3536
private static final String type = "type";
3637
private static final String fileName = "filename";
3738
private static final String projectName = "projectname";
@@ -86,7 +87,7 @@ public List<String> listFiles(String path) throws FileException {
8687
File[] files = file.listFiles();
8788
if (files != null) {
8889
for (File filePath : files) {
89-
if (!metaFileJsonName.equals(filePath.getName()) && !filePath.getName().endsWith(".zip")) {
90+
if (!metaFileJsonName.equals(filePath.getName()) &&!confFileJsonName.equals(filePath.getName()) && !filePath.getName().endsWith(".zip")) {
9091
if (!checkName(filePath.getName())) {
9192
throw FileExceptionManager.createException(30601,filePath.getName());
9293
}
@@ -151,6 +152,38 @@ private InputStream generateInputStream(String basePath) throws IOException, Fil
151152
return IoUtils.generateInputInputStream(basePath + File.separator + metaFileJsonName);
152153
}
153154

155+
public Map<String, Object> parseConfJson(String dirPath) throws IOException, FileException {
156+
return parseJson(dirPath, confFileJsonName);
157+
}
158+
159+
public Map<String, Object> parseJson(String dirPath,String fimeName) throws IOException, FileException {
160+
getBasePath(dirPath);
161+
try (InputStream inputStream = generateInputStream(basePath,fimeName);
162+
InputStreamReader streamReader = new InputStreamReader(inputStream);
163+
BufferedReader reader = new BufferedReader(streamReader);) {
164+
return readJson(reader,HashMap.class);
165+
}
166+
}
167+
168+
private InputStream generateInputStream(String basePath,String fimeName) throws IOException, FileException {
169+
File metaFile = new File(basePath + File.separator + fimeName);
170+
if (!metaFile.exists()) {
171+
throw new FileException(30603, fimeName);
172+
}
173+
return IoUtils.generateInputInputStream(basePath + File.separator + fimeName);
174+
}
175+
176+
private <T> T readJson(BufferedReader reader,Class<T> valueType) throws IOException {
177+
String line = null;
178+
StringBuilder sb = new StringBuilder();
179+
while ((line = reader.readLine()) != null) {
180+
sb.append(line);
181+
}
182+
ObjectMapper objectMapper = new ObjectMapper();
183+
return objectMapper.readValue(sb.toString(), valueType);
184+
}
185+
186+
154187
private PublishRequestVo read(InputStream inputStream) throws IOException, FileException {
155188
try (InputStreamReader streamReader = new InputStreamReader(inputStream);
156189
BufferedReader reader = new BufferedReader(streamReader);) {

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/DefaultStreamJobService.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
package com.webank.wedatasphere.streamis.jobmanager.manager.service
1717

1818
import java.util
19-
import java.util.Date
19+
import java.util.{Date, Map}
20+
2021
import com.github.pagehelper.PageInfo
2122
import com.webank.wedatasphere.streamis.jobmanager.launcher.conf.JobConfKeyConstants
2223
import com.webank.wedatasphere.streamis.jobmanager.launcher.service.StreamJobConfService
@@ -35,8 +36,9 @@ import org.apache.commons.lang.StringUtils
3536
import org.springframework.beans.factory.annotation.Autowired
3637
import org.springframework.stereotype.Service
3738
import org.springframework.transaction.annotation.Transactional
38-
3939
import javax.annotation.Resource
40+
import org.apache.commons.collections.MapUtils
41+
4042
import scala.collection.JavaConverters._
4143

4244

@@ -178,8 +180,15 @@ class DefaultStreamJobService extends StreamJobService with Logging {
178180
}
179181
validateUpload(metaJsonInfo.getProjectName, metaJsonInfo.getJobName, userName)
180182
// 生成StreamJob,根据StreamJob生成StreamJobVersion
181-
val version = createStreamJob(metaJsonInfo, userName)
182-
// 上传所有非meta.json的文件
183+
val version = createStreamJob(metaJsonInfo, userName);
184+
val configContent = readerUtils.parseConfJson(inputPath).asInstanceOf[util.Map[String, Any]]
185+
if(MapUtils.isNotEmpty(configContent)){
186+
info(s"mapContent ${configContent} parsed according to the conf.json file")
187+
streamJobConfService.saveJobConfig(version.getJobId, configContent)
188+
}else{
189+
logger.warn("the conf.json file does not exist in the ZIP format material package")
190+
}
191+
// 上传所有非meta.json、conf.json、.zip的文件
183192
uploadFiles(metaJsonInfo, version, inputZipPath)
184193
version
185194
}

streamis-jobmanager/streamis-jobmanager-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/restful/api/UploadRestfulApi.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import com.webank.wedatasphere.streamis.jobmanager.manager.util.IoUtils;
2525
import com.webank.wedatasphere.streamis.jobmanager.manager.util.ZipHelper;
2626
import org.apache.commons.io.FileUtils;
27+
import org.apache.commons.io.FilenameUtils;
2728
import org.apache.commons.io.IOUtils;
2829
import org.apache.commons.lang.exception.ExceptionUtils;
30+
import org.apache.commons.lang3.StringUtils;
2931
import org.apache.linkis.server.Message;
3032
import org.apache.linkis.server.security.SecurityFilter;
3133
import org.slf4j.Logger;
@@ -99,4 +101,44 @@ public Message uploadJar(HttpServletRequest request,
99101
IOUtils.closeQuietly(is);
100102
}
101103
}
104+
105+
@RequestMapping(path = "/initJob", method = RequestMethod.POST)
106+
public Message initJob(HttpServletRequest request,
107+
@RequestParam(name = "projectName", required = false) String projectName,
108+
@RequestParam(name = "jobFilePath", required = false) String jobFilePath) throws IOException, JobException {
109+
String userName = SecurityFilter.getLoginUsername(request);
110+
if (StringUtils.isBlank(projectName)){
111+
return Message.error("Project name cannot be empty(项目名不能为空,请指定)");
112+
}
113+
if (StringUtils.isBlank(jobFilePath)){
114+
return Message.error("jobFilePath can not be blank");
115+
}
116+
if(!ZipHelper.isZip(jobFilePath)){
117+
throw JobExceptionManager.createException(30302);
118+
}
119+
if (!projectPrivilegeService.hasEditPrivilege(request, projectName)) return Message.error("the current user has no operation permission");
120+
121+
InputStream is = null;
122+
OutputStream os = null;
123+
try{
124+
LOG.info("user {} start initJob to project {} according to jobFilePath {}.",userName, projectName,jobFilePath);
125+
String fileName = FilenameUtils.getName(jobFilePath);
126+
String inputPath = IoUtils.generateIOPath(userName, "streamis", fileName);
127+
File file = new File(inputPath);
128+
if(file.getParentFile().exists()){
129+
FileUtils.deleteDirectory(file.getParentFile());
130+
}
131+
is = IoUtils.generateInputInputStream(jobFilePath);
132+
os = IoUtils.generateExportOutputStream(inputPath);
133+
IOUtils.copy(is, os);
134+
StreamJobVersion job = streamJobService.uploadJob(projectName, userName, inputPath);
135+
return Message.ok().data("jobId",job.getJobId());
136+
} catch (Exception e){
137+
LOG.error("Failed to init {} to project {} for user {}.", jobFilePath, projectName, userName, e);
138+
return Message.error(ExceptionUtils.getRootCauseMessage(e));
139+
} finally{
140+
IOUtils.closeQuietly(os);
141+
IOUtils.closeQuietly(is);
142+
}
143+
}
102144
}

0 commit comments

Comments
 (0)