Skip to content

Commit 7ec4835

Browse files
authored
Merge pull request #54 from WeDataSphere/dev-0.2.3-log-collector
New log collector module
2 parents c5ded9b + 0344ec4 commit 7ec4835

File tree

106 files changed

+5660
-147
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

106 files changed

+5660
-147
lines changed

pom.xml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,15 @@
4545
</modules>
4646

4747
<properties>
48-
<linkis.version>1.1.1</linkis.version>
48+
<linkis.version>1.1.3</linkis.version>
49+
<junit.version>4.12</junit.version>
4950
<dss.version>1.1.0</dss.version>
5051
<streamis.version>0.2.0</streamis.version>
5152
<scala.version>2.11.12</scala.version>
5253
<jdk.compile.version>1.8</jdk.compile.version>
5354
<maven.version>3.3.3</maven.version>
5455
<gson.version>2.8.5</gson.version>
55-
<fasterxml.jackson.version>2.11.3</fasterxml.jackson.version>
56+
<fasterxml.jackson.version>2.13.2</fasterxml.jackson.version>
5657
<math3.version>3.1.1</math3.version>
5758
<httpclient.version>4.5.4</httpclient.version>
5859
<httpmime.version>4.5.4</httpmime.version>

streamis-jobmanager/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
<module>streamis-job-manager</module>
3333
<module>streamis-jobmanager-server</module>
3434
<module>streamis-projectmanager-server</module>
35+
<module>streamis-job-log</module>
3536
</modules>
3637

3738

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/java/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/url/LinkisURLStreamHandlerFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
package com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.url;
1717

18-
import org.apache.commons.lang.StringUtils;
1918

2019
import java.net.URLStreamHandler;
2120
import java.net.URLStreamHandlerFactory;

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/core/FlinkLogIterator.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ trait FlinkLogIterator extends Iterator[String] with Closeable {
1818
val engineConnLogOperator: EngineConnLogOperator
1919
def init(): Unit
2020
def getLogPath: String
21+
def getLogDirSuffix: String
2122
def getLogs: util.ArrayList[String]
2223
def getEndLine: Long
2324
}
@@ -28,6 +29,7 @@ class SimpleFlinkJobLogIterator(override val requestPayload: LogRequestPayload,
2829
private var logs: util.ArrayList[String] = _
2930
private var index = 0
3031
private var logPath: String = _
32+
private var logDirSuffix: String = _
3133
private var isClosed = true
3234
private var endLine = 0
3335

@@ -69,4 +71,8 @@ class SimpleFlinkJobLogIterator(override val requestPayload: LogRequestPayload,
6971
override def getLogs: util.ArrayList[String] = logs
7072

7173
override def getEndLine: Long = endLine
74+
75+
def setLogDirSuffix(logDirSuffix: String) : Unit = this.logDirSuffix = logDirSuffix
76+
77+
override def getLogDirSuffix: String = logDirSuffix
7278
}

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/entity/LogRequestPayload.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class LogRequestPayload {
1414
private var onlyKeywords: String = _
1515
private var lastRows = 0
1616
private var logType: String = _
17+
private var logHistory: Boolean = false
1718
def getPageSize: Int = pageSize
1819
def setPageSize(pageSize: Int): Unit = this.pageSize = pageSize
1920

@@ -32,4 +33,8 @@ class LogRequestPayload {
3233
def getLogType: String = logType
3334

3435
def setLogType(logType: String): Unit = this.logType = logType
36+
37+
def isLogHistory: Boolean = logHistory
38+
39+
def setLogHistory(logHistory: Boolean): Unit = this.logHistory = logHistory
3540
}

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/FlinkJobClient.scala

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,17 @@ import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.core.{FlinkLo
2323
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.entity.LogRequestPayload
2424
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.exception.{FlinkJobLaunchErrorException, FlinkJobStateFetchException, FlinkSavePointException}
2525
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.manager.FlinkJobLaunchManager
26-
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.operator.{FlinkTriggerSavepointOperator, FlinkYarnLogOperator}
26+
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.operator.{FlinkClientLogOperator, FlinkTriggerSavepointOperator, FlinkYarnLogOperator}
2727
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.state.{Checkpoint, Savepoint}
28+
import org.apache.commons.lang3.StringUtils
2829
import org.apache.linkis.common.utils.{Logging, Utils}
29-
import org.apache.linkis.computation.client.once.OnceJob
30-
import org.apache.linkis.computation.client.once.simple.SimpleOnceJob
30+
import org.apache.linkis.computation.client.once.action.ECResourceInfoAction
31+
import org.apache.linkis.computation.client.once.result.ECResourceInfoResult
32+
import org.apache.linkis.computation.client.once.{LinkisManagerClient, LinkisManagerClientImpl, OnceJob}
33+
import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder}
3134
import org.apache.linkis.computation.client.operator.impl.EngineConnLogOperator
32-
35+
import org.apache.linkis.httpclient.dws.DWSHttpClient
36+
import java.util
3337
import java.net.URI
3438

3539
class FlinkJobClient(onceJob: OnceJob, var jobInfo: FlinkJobInfo, stateManager: JobStateManager)
@@ -39,9 +43,13 @@ class FlinkJobClient(onceJob: OnceJob, var jobInfo: FlinkJobInfo, stateManager:
3943
* Log operator
4044
*/
4145
private var logOperatorMap = Map(
42-
"client" -> EngineConnLogOperator.OPERATOR_NAME,
46+
"client" -> FlinkClientLogOperator.OPERATOR_NAME,
4347
"yarn" -> FlinkYarnLogOperator.OPERATOR_NAME
4448
)
49+
/**
50+
* The linkis client in onceJob
51+
*/
52+
private var linkisClient: DWSHttpClient = _
4553

4654
override def getJobInfo: FlinkJobInfo = {
4755
getJobInfo(false)
@@ -99,13 +107,38 @@ class FlinkJobClient(onceJob: OnceJob, var jobInfo: FlinkJobInfo, stateManager:
99107
case Some(operator) =>
100108
onceJob.getOperator(operator) match {
101109
case engineConnLogOperator: EngineConnLogOperator =>
110+
val logIterator = new SimpleFlinkJobLogIterator(requestPayload, engineConnLogOperator)
111+
engineConnLogOperator match {
112+
case clientLogOperator: FlinkClientLogOperator =>
113+
var logDirSuffix = this.jobInfo.getLogDirSuffix
114+
if (StringUtils.isBlank(logDirSuffix) && requestPayload.isLogHistory){
115+
// If want to fetch the history log, must get the log directory suffix first
116+
getLinkisClient match {
117+
case client: DWSHttpClient =>
118+
Option(Utils.tryCatch{
119+
client.execute(ECResourceInfoAction.newBuilder().setUser(jobInfo.getUser)
120+
.setTicketid(clientLogOperator.getTicketId).build()).asInstanceOf[ECResourceInfoResult]
121+
}{
122+
case e: Exception =>
123+
warn("Fail to query the engine conn resource info from linkis", e)
124+
null
125+
}) match {
126+
case Some(result) => logDirSuffix = Utils.tryAndWarn{result.getData.getOrDefault("ecResourceInfoRecord", new util.HashMap[String, Any]).asInstanceOf[util.Map[String, Any]]
127+
.getOrDefault("logDirSuffix", "").asInstanceOf[String]}
128+
case _ =>
129+
}
130+
}
131+
}
132+
clientLogOperator.setLogDirSuffix(logDirSuffix)
133+
logIterator.setLogDirSuffix(logDirSuffix)
134+
case _ =>
135+
}
102136
engineConnLogOperator match {
103137
case yarnLogOperator: FlinkYarnLogOperator => yarnLogOperator.setApplicationId(jobInfo.getApplicationId)
104138
case _ =>
105139
}
106140
engineConnLogOperator.setECMServiceInstance(jobInfo.getECMInstance)
107141
engineConnLogOperator.setEngineConnType(FlinkJobLaunchManager.FLINK_ENGINE_CONN_TYPE)
108-
val logIterator = new SimpleFlinkJobLogIterator(requestPayload, engineConnLogOperator)
109142
logIterator.init()
110143
jobInfo match {
111144
case jobInfo: FlinkJobInfo => jobInfo.setLogPath(logIterator.getLogPath)
@@ -161,5 +194,27 @@ class FlinkJobClient(onceJob: OnceJob, var jobInfo: FlinkJobInfo, stateManager:
161194
triggerSavepoint(savepointURI.toString, JobLauncherConfiguration.FLINK_TRIGGER_SAVEPOINT_MODE.getValue)
162195
}
163196

197+
/**
198+
* Get linkis client
199+
* @return
200+
*/
201+
def getLinkisClient: DWSHttpClient = {
202+
Utils.tryAndWarn{
203+
if (null == this.linkisClient){
204+
this.synchronized{
205+
if (null == this.linkisClient){
206+
this.linkisClient = SimpleOnceJobBuilder.getLinkisManagerClient match {
207+
case client: LinkisManagerClient =>
208+
val dwsClientField = classOf[LinkisManagerClientImpl].getDeclaredField("dwsHttpClient")
209+
dwsClientField.setAccessible(true)
210+
dwsClientField.get(client).asInstanceOf[DWSHttpClient]
211+
case _ => null
212+
}
164213

214+
}
215+
}
216+
}
217+
this.linkisClient
218+
}
219+
}
165220
}

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/FlinkJobInfo.scala

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,41 +34,53 @@ class FlinkJobInfo extends YarnJobInfo {
3434
private var applicationUrl: String = _
3535
private var status: String = _
3636
private var logPath: String = _
37+
private var logDirSuffix: String = _
3738
private var resources: java.util.Map[String, Object] = _
3839
private var completedMsg: String = _
3940
private var jobStates: Array[JobStateInfo] = _
41+
4042
override def getApplicationId: String = applicationId
43+
4144
def setApplicationId(applicationId: String): Unit = this.applicationId = applicationId
4245

4346
override def getApplicationUrl: String = applicationUrl
47+
4448
def setApplicationUrl(applicationUrl: String): Unit = this.applicationUrl = applicationUrl
4549

4650
override def getId: String = id
47-
def setId(id: String): Unit = this.id = id
4851

52+
def setId(id: String): Unit = this.id = id
4953

5054
override def getECMInstance: ServiceInstance = ecmInstance
55+
5156
def setECMInstance(ecmInstance: ServiceInstance): Unit = this.ecmInstance = ecmInstance
5257

5358
override def getUser: String = user
59+
5460
def setUser(user: String): Unit = this.user = user
5561

5662
override def getStatus: String = status
63+
5764
override def setStatus(status: String): Unit = this.status = status
5865

5966
override def getLogPath: String = logPath
67+
6068
def setLogPath(logPath: String): Unit = this.logPath = logPath
6169

6270
override def getResources: util.Map[String, Object] = resources
71+
6372
def setResources(resources: java.util.Map[String, Object]): Unit = this.resources = resources
6473

6574
def getSavepoint: String = savepoint
75+
6676
def setSavepoint(savepoint: String): Unit = this.savepoint = savepoint
6777

6878
def getCheckpoint: String = checkpoint
79+
6980
def setCheckpoint(checkpoint: String): Unit = this.checkpoint = checkpoint
7081

7182
override def getCompletedMsg: String = completedMsg
83+
7284
def setCompletedMsg(completedMsg: String): Unit = this.completedMsg = completedMsg
7385

7486
override def toString: String = s"FlinkJobInfo(id: $id, status: $status, applicationId: $applicationId, applicationUrl: $applicationUrl, logPath: $logPath)"
@@ -85,6 +97,7 @@ class FlinkJobInfo extends YarnJobInfo {
8597
def setJobStates(jobStates: Array[JobStateInfo]): Unit = {
8698
this.jobStates = jobStates
8799
}
100+
88101
/**
89102
* Job name
90103
*
@@ -95,11 +108,16 @@ class FlinkJobInfo extends YarnJobInfo {
95108
def setName(name: String): Unit = {
96109
this.name = name
97110
}
98-
}
99111

100-
object FlinkJobInfo{
101-
def main(args: Array[String]): Unit = {
102-
val jobInfo = "{\"jobStates:\":{\"location\":\"xx\"}"
103-
DWSHttpClient.jacksonJson.readValue(jobInfo, classOf[FlinkJobInfo])
112+
/**
113+
* Job log directory suffix
114+
*
115+
* @return
116+
*/
117+
override def getLogDirSuffix: String = this.logDirSuffix
118+
119+
override def setLogDirSuffix(logDirSuffix: String): Unit = {
120+
this.logDirSuffix = logDirSuffix
104121
}
105122
}
123+

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/LinkisJobInfo.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,11 @@ trait LinkisJobInfo extends JobInfo {
1111
*/
1212
def getECMInstance: ServiceInstance
1313

14+
/**
15+
* Job log directory suffix
16+
* @return
17+
*/
18+
def getLogDirSuffix: String
1419

20+
def setLogDirSuffix(logDirSuffix: String): Unit
1521
}

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/SimpleFlinkJobLaunchManager.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@
1515

1616
package com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.manager
1717

18-
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.state.{JobState, JobStateInfo}
18+
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.state.JobState
1919
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.{JobClient, LaunchJob}
2020
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.conf.JobLauncherConfiguration
21-
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.{FlinkJobClient, FlinkJobInfo, LinkisJobInfo}
2221
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.manager.SimpleFlinkJobLaunchManager.INSTANCE_NAME
23-
import org.apache.commons.lang.StringEscapeUtils
22+
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.{FlinkJobClient, FlinkJobInfo, LinkisJobInfo}
23+
import org.apache.commons.lang3.StringEscapeUtils
2424
import org.apache.linkis.common.utils.{RetryHandler, Utils}
2525
import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SubmittableSimpleOnceJob}
2626
import org.apache.linkis.computation.client.once.{OnceJob, SubmittableOnceJob}
27-
import org.apache.linkis.computation.client.operator.impl.{EngineConnApplicationInfoOperator, EngineConnLogOperator}
27+
import org.apache.linkis.computation.client.operator.impl.EngineConnApplicationInfoOperator
2828
import org.apache.linkis.httpclient.dws.DWSHttpClient
2929
import org.apache.linkis.ujes.client.exception.UJESJobException
3030

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.operator
2+
3+
import org.apache.commons.lang3.StringUtils
4+
import org.apache.linkis.computation.client.once.action.EngineConnOperateAction
5+
import org.apache.linkis.computation.client.operator.impl.{EngineConnLogOperator, EngineConnLogs}
6+
7+
/**
8+
* Append "logDirSuffix" parameter
9+
*/
10+
class FlinkClientLogOperator extends EngineConnLogOperator{
11+
12+
private var logDirSuffix: String = _
13+
14+
def setLogDirSuffix(logDirSuffix: String): Unit = {
15+
this.logDirSuffix = logDirSuffix
16+
}
17+
18+
protected override def addParameters(builder: EngineConnOperateAction.Builder): Unit = {
19+
builder.operatorName(EngineConnLogOperator.OPERATOR_NAME)
20+
if (StringUtils.isNotBlank(this.logDirSuffix)) {
21+
builder.addParameter("logDirSuffix", logDirSuffix)
22+
}
23+
super.addParameters(builder)
24+
}
25+
26+
27+
override def getTicketId: String = super.getTicketId
28+
29+
override def getName: String = FlinkClientLogOperator.OPERATOR_NAME
30+
}
31+
32+
object FlinkClientLogOperator {
33+
val OPERATOR_NAME = "engineConnLog_flink"
34+
}

0 commit comments

Comments
 (0)