Merge remote-tracking branch 'origin/develop' into feature/#2170-Use-ErrorMessage-type-from-spark-commons
TebaleloS committed Mar 2, 2023
2 parents cfd669b + 03c01d5 commit dba19a7
Showing 13 changed files with 27 additions and 86 deletions.
10 changes: 3 additions & 7 deletions README.md
@@ -97,12 +97,8 @@ The coverage reports are written in each module's `target` directory.
#### REST API requirements:
- [**Tomcat 8.5/9.0** installation](https://tomcat.apache.org/download-90.cgi)
- [**MongoDB 4.0** installation](https://docs.mongodb.com/manual/administration/install-community/)
- [**Spline UI deployment**](https://absaoss.github.io/spline/) - place the [spline.war](https://search.maven.org/remotecontent?filepath=za/co/absa/spline/spline-web/0.3.9/spline-web-0.3.9.war)
in your Tomcat webapps directory (rename after downloading to _spline.war_); NB! don't forget to set up the `spline.mongodb.url` configuration for the _war_
- `HADOOP_CONF_DIR` environment variable, pointing to the location of your Hadoop configuration (i.e. within a Hadoop installation)

The _Spline UI_ can be omitted; in that case the **REST API** `spline.urlTemplate` setting should be set to an empty string.

#### Deploying REST API
Simply copy the `rest-api.war` file produced when building the project into Tomcat's webapps directory.
Alternatively, build the Docker image from the existing Dockerfile and deploy it as a container, as sketched below.
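
A minimal sketch of that Docker route, assuming the Dockerfile sits at the repository root and choosing an illustrative image tag and port mapping (neither is fixed by the project):

```bash
# Build the image from the existing Dockerfile (run from the directory containing it).
docker build -t enceladus-rest-api:latest .

# Run the container; mapping Tomcat's conventional port 8080 is an assumption here.
docker run -d --name enceladus-rest-api -p 8080:8080 enceladus-rest-api:latest
```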
@@ -142,7 +138,7 @@ password=changeme
--deploy-mode <client/cluster> \
--driver-cores <num> \
--driver-memory <num>G \
--conf "spark.driver.extraJavaOptions=-Denceladus.rest.uri=<rest_api_uri:port> -Dstandardized.hdfs.path=<path_for_standardized_output>-{0}-{1}-{2}-{3} -Dspline.mongodb.url=<mongo_url_for_spline> -Dspline.mongodb.name=<spline_database_name> -Dhdp.version=<hadoop_version>" \
--conf "spark.driver.extraJavaOptions=-Denceladus.rest.uri=<rest_api_uri:port> -Dstandardized.hdfs.path=<path_for_standardized_output>-{0}-{1}-{2}-{3} -Dhdp.version=<hadoop_version>" \
--class za.co.absa.enceladus.standardization.StandardizationJob \
<spark-jobs_<build_version>.jar> \
--rest-api-auth-keytab <path_to_keytab_file> \
@@ -166,7 +162,7 @@ password=changeme
--driver-cores <num> \
--driver-memory <num>G \
--conf 'spark.ui.port=29000' \
--conf "spark.driver.extraJavaOptions=-Denceladus.rest.uri=<rest_api_uri:port> -Dstandardized.hdfs.path=<path_of_standardized_input>-{0}-{1}-{2}-{3} -Dconformance.mappingtable.pattern=reportDate={0}-{1}-{2} -Dspline.mongodb.url=<mongo_url_for_spline> -Dspline.mongodb.name=<spline_database_name>" -Dhdp.version=<hadoop_version> \
--conf "spark.driver.extraJavaOptions=-Denceladus.rest.uri=<rest_api_uri:port> -Dstandardized.hdfs.path=<path_of_standardized_input>-{0}-{1}-{2}-{3} -Dconformance.mappingtable.pattern=reportDate={0}-{1}-{2} -Dhdp.version=<hadoop_version>" \
--packages za.co.absa:enceladus-parent:<version>,za.co.absa:enceladus-conformance:<version> \
--class za.co.absa.enceladus.conformance.DynamicConformanceJob \
<spark-jobs_<build_version>.jar> \
@@ -186,7 +182,7 @@ password=changeme
--deploy-mode <client/cluster> \
--driver-cores <num> \
--driver-memory <num>G \
--conf "spark.driver.extraJavaOptions=-Denceladus.rest.uri=<rest_api_uri:port> -Dstandardized.hdfs.path=<path_for_standardized_output>-{0}-{1}-{2}-{3} -Dspline.mongodb.url=<mongo_url_for_spline> -Dspline.mongodb.name=<spline_database_name> -Dhdp.version=<hadoop_version>" \
--conf "spark.driver.extraJavaOptions=-Denceladus.rest.uri=<rest_api_uri:port> -Dstandardized.hdfs.path=<path_for_standardized_output>-{0}-{1}-{2}-{3} -Dhdp.version=<hadoop_version>" \
--class za.co.absa.enceladus.standardization_conformance.StandardizationAndConformanceJob \
<spark-jobs_<build_version>.jar> \
--rest-api-auth-keytab <path_to_keytab_file> \
12 changes: 10 additions & 2 deletions menas/ui/components/run/runDetail.view.xml
@@ -19,6 +19,7 @@
xmlns="sap.m"
xmlns:form="sap.ui.layout.form"
xmlns:table="sap.ui.table"
xmlns:l="sap.ui.layout"
xmlns:cust="http://schemas.sap.com/sapui5/extension/sap.ui.core.CustomData/1">
<Page id="detailPage" enableScrolling="true">
<customHeader>
@@ -137,8 +138,15 @@
</Table>
</IconTabFilter>
<IconTabFilter id="Lineage" icon="sap-icon://org-chart" key="lineage" text="Lineage" visible="{= ${/lineageExecutionIdApiTemplate} !== ''}">
<Label class="lineageErrorLabel" id="LineageErrorLabel" text="" visible="false"/>
<core:HTML content="&lt;iframe id=&quot;lineage_iframe&quot; height=&quot;100%&quot; width=&quot;100%&quot; style=&quot;border: Ridge;&quot; src=&quot;{run>/lineageUrl}&quot;&gt;&lt;/iframe&gt;" />
<l:VerticalLayout class="sapUiContentPadding" width="80%">
<Text text="Spline architecture changed as is not intended as an embedded system anymore.&#13;&#10;"/>
<Text text="Spline is a standalone application available at the link bellow&#13;&#10;"/>
<Link
text="Lineage URL&#13;&#10;"
target="_blank"
href="{run>/lineageUrl}" />
<Text text="Use appId {run>/splineRef/sparkApplicationId} and &#13;&#10;destination {run>/splineRef/outputPath} to search for your run.&#13;&#10;"/>
</l:VerticalLayout>
</IconTabFilter>
</items>
</IconTabBar>
1 change: 0 additions & 1 deletion menas/ui/index.html
@@ -48,7 +48,6 @@

$.getJSON('package.json', function(json) {
window.apiUrl = json.config.apiUrl + "/api";
window.lineageConsumerApiUrl = json.config.lineageConsumerApiUrl;
window.lineageUiCdn = json.config.lineageUiCdn;

_menasLoadElem("link", "css/style.css", [
1 change: 0 additions & 1 deletion menas/ui/package.json
@@ -10,7 +10,6 @@
},
"config": {
"apiUrl": "${API_URL}",
"lineageConsumerApiUrl": "${LINEAGE_CONSUMER_URL}",
"lineageUiCdn": "${LINEAGE_UI_CDN}"
},
"dependencies": {
20 changes: 0 additions & 20 deletions menas/ui/service/RunRestDAO.js
@@ -43,24 +43,4 @@ class RunRestDAO {
return RestClient.get(`/runs/${encodeURI(datasetName)}/latestrun`)
}

getLineageId(urlTemplate, outputPath, applicationId) {
const url = urlTemplate
.replace("%s", applicationId)
.replace("%s", outputPath);

RestClient.getSync(url,false,true).then((response) => {
this._totalCount = response.totalCount;
if (this._totalCount > 0) {
this._executionEventId = response.items[0].executionEventId;
} else {
this._executionEventId = undefined
}
});

return {
totalCount: this._totalCount,
executionEventId: this._executionEventId
}
}

}
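
The deleted helper documents the lookup shape: a lineage event was resolved from the Spline consumer API by Spark application id and data-source URI. The same query can still be issued by hand (a sketch; the consumer base URL and both parameter values are placeholders):

```bash
# Mirror of the query the removed getLineageId() issued; fill in the <...> placeholders.
curl "<lineage_consumer_api>/execution-events?applicationId=<spark_application_id>&dataSourceUri=<output_path>"
```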
33 changes: 1 addition & 32 deletions menas/ui/service/RunService.js
@@ -118,46 +118,15 @@ var RunService = new function () {
this._preprocessRun = function (oRun, aCheckpoints) {
let info = oRun.controlMeasure.metadata.additionalInfo;
oRun.controlMeasure.metadata.additionalInfo = this._mapAdditionalInfo(info);

oRun.status = Formatters.statusToPrettyString(oRun.runStatus.status);
let lineageInfo = this._buildLineageUrl(oRun.splineRef.outputPath, oRun.splineRef.sparkApplicationId);
oRun.lineageUrl = lineageInfo.lineageUrl;
oRun.lineageError = lineageInfo.lineageError;
oRun.lineageUrl = window.lineageUiCdn;

const sStdName = this._nameExists(aCheckpoints, "Standardization Finish") ? "Standardization Finish" : "Standardization - End";

oRun.stdTime = this._getTimeSummary(aCheckpoints, sStdName, sStdName);
oRun.cfmTime = this._getTimeSummary(aCheckpoints, "Conformance - Start", "Conformance - End");
};

this._buildLineageUrl = function(outputPath, applicationId) {
const urlTemplate = "%s?_splineConsumerApiUrl=%s&_isEmbeddedMode=true&_targetUrl=/events/overview/%s/graph";
if (window.lineageConsumerApiUrl) {
let lineageExecutionIdApiTemplate = window.lineageConsumerApiUrl + "/execution-events?applicationId=%s&dataSourceUri=%s";
const lineageIdInfo = new RunRestDAO().getLineageId(lineageExecutionIdApiTemplate, outputPath, applicationId);

if (lineageIdInfo.totalCount === 1) {
return {
lineageUrl: urlTemplate
.replace("%s", window.lineageUiCdn)
.replace("%s", window.lineageConsumerApiUrl)
.replace("%s", lineageIdInfo.executionEventId),
lineageError: ""
};
} else {
return {
lineageUrl: "",
lineageError: !!lineageIdInfo.totalCount ? "Multiple lineage records found" : "No lineage found"
};
}
} else {
return {
lineageUrl: "",
lineageError: "Lineage service not configured"
};
}
};

this._mapAdditionalInfo = function (info) {
return Object.keys(info).map(key => {
return {"infoKey": key, "infoValue": info[key]}
1 change: 0 additions & 1 deletion pom.xml
@@ -202,7 +202,6 @@
<spark.version>3.2.2</spark.version>
<spark.xml.version>0.5.0</spark.xml.version>
<specs.version>3.8.6</specs.version>
<spline.agent.version>0.7.8</spline.agent.version>
<spray.json.version>1.3.5</spray.json.version>
<spring.kerberos.version>1.0.1.RELEASE</spring.kerberos.version>
<spring.ldap.version>5.0.14.RELEASE</spring.ldap.version>
10 changes: 10 additions & 0 deletions scripts/bash/enceladus_env.template.sh
@@ -99,6 +99,12 @@ ENCELADUS_FILES="/absolute/path/application.conf#application.conf"
# ADDITIONAL_SPARK_CONF="--conf spark.yarn.principal=<principal_name> --conf spark.yarn.keytab=<path_to_keytab>"
ADDITIONAL_SPARK_CONF=""

# Spline Config for a codeless integration with Spline
# Intentionally adding the `--conf` here so that run_enceladus.sh does not break if there is no Spline conf.
SPLINE_QUERY_LISTENER="spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener"
SPLINE_PRODUCER_URL="spark.spline.lineageDispatcher.http.producer.url=http://localhost:9090/producer"
SPLINE_CONF="--conf '$SPLINE_QUERY_LISTENER' --conf '$SPLINE_PRODUCER_URL'"

# Additional JVM options
# Example: ADDITIONAL_JVM_CONF="-Dtimezone=UTC -Dfoo=bar"
# for deployment mode: client
@@ -115,3 +121,7 @@ ADDITIONAL_JVM_EXECUTOR_CONF_CLUSTER="$KRB5_CONF_CLUSTER $TRUST_STORE_CLUSTER $T
# Switch that tells the script if it should exit when it encounters unrecognized options.
# On true it prints an error and exits with 127; on false it only prints a warning
EXIT_ON_UNRECOGNIZED_OPTIONS="true"

# Additional JAR files
# Intentionally adding the `--jars` here so that run_enceladus.sh does not break if there are no extra jars.
ADDITIONAL_JARS="--jars /path/to/spline-agent.jar"
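
For orientation, `run_enceladus.sh` (next file) splices `${SPLINE_CONF}` and `${ADDITIONAL_JARS}` verbatim into the assembled spark-submit command, which is why both variables carry their own `--conf`/`--jars` prefixes. A runnable sketch of that expansion using the template's defaults (the agent jar path is a placeholder, not a value shipped by Enceladus):

```bash
# Template defaults; the jar path is a placeholder to fill in.
SPLINE_QUERY_LISTENER="spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener"
SPLINE_PRODUCER_URL="spark.spline.lineageDispatcher.http.producer.url=http://localhost:9090/producer"
SPLINE_CONF="--conf '$SPLINE_QUERY_LISTENER' --conf '$SPLINE_PRODUCER_URL'"
ADDITIONAL_JARS="--jars /path/to/spline-agent.jar"

# run_enceladus.sh appends both fragments as-is to the spark-submit command line:
echo "spark-submit ... ${SPLINE_CONF} ${ADDITIONAL_JARS} --class <job_class> <spark_jobs_jar>"
```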
3 changes: 2 additions & 1 deletion scripts/bash/run_enceladus.sh
@@ -517,9 +517,10 @@ else
add_spark_conf_cmd "spark.yarn.submit.waitAppCompletion" "false"
fi

CMD_LINE="${CMD_LINE} ${ADDITIONAL_SPARK_CONF} ${SPARK_CONF}"
CMD_LINE="${CMD_LINE} ${ADDITIONAL_SPARK_CONF} ${SPARK_CONF} ${SPLINE_CONF}"
CMD_LINE="${CMD_LINE} --conf \"${JVM_CONF} ${ADDITIONAL_JVM_CONF}\""
CMD_LINE="${CMD_LINE} --conf \"spark.executor.extraJavaOptions=${ADDITIONAL_JVM_EXECUTOR_CONF}\""
CMD_LINE="${CMD_LINE} ${ADDITIONAL_JARS}"
CMD_LINE="${CMD_LINE} --class ${CLASS} ${JAR}"

# Adding command line parameters that go AFTER the jar file
6 changes: 0 additions & 6 deletions spark-jobs/pom.xml
@@ -120,12 +120,6 @@
<version>${spark.fast.tests.version}</version>
<scope>test</scope>
</dependency>
<!-- Lineage tracking -->
<dependency>
<groupId>za.co.absa.spline.agent.spark</groupId>
<artifactId>agent-core_${scala.compat.version}</artifactId>
<version>${spline.agent.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
10 changes: 0 additions & 10 deletions spark-jobs/src/main/resources/reference.conf
@@ -83,16 +83,6 @@ control.info.dataset.properties.prefix=""
# system-wide time zone
timezone="UTC"

# Spline mode - how Spline is integrated. For details see the Spline documentation
# possible values (default is BEST_EFFORT):
# DISABLED - no Spline integration (no lineage will be recorded)
# REQUIRED - Spline service has to be running on the spline.producer.url address; if not, job exits without execution
# BEST_EFFORT - job tries to connect to the provided Spline service (spline.producer.url address); but if that fails, job will still execute
spline.mode=BEST_EFFORT
#
#
spline.producer.url="http://localhost:8085/producer"

#possible values: plan, dataFrame, sample

partition.strategy="plan"
Expand Down
@@ -139,10 +139,6 @@ trait CommonJobExecution extends ProjectMetadata {

validatePaths(pathCfg)

// Enable Spline
import za.co.absa.spline.harvester.SparkLineageInitializer._
spark.enableLineageTracking()

// Enable non-default persistence storage level if provided in the command line
cmd.persistStorageLevel.foreach(Atum.setCachingStorageLevel)

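
The removed `spark.enableLineageTracking()` call was the embedded-agent initialization; under the codeless setup the listener is registered through Spark configuration alone, so no Enceladus code is needed. A sketch of the equivalent submit-time flags, assuming a placeholder agent jar path and reusing the template's localhost producer URL:

```bash
spark-submit \
  --jars /path/to/spline-agent.jar \
  --conf "spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener" \
  --conf "spark.spline.lineageDispatcher.http.producer.url=http://localhost:9090/producer" \
  --class za.co.absa.enceladus.standardization.StandardizationJob \
  <spark-jobs_<build_version>.jar>
```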
@@ -59,7 +59,7 @@ case class MineConfd(id: Int, myOutputCol: Double, errCol: Seq[ErrorMessage])
class CustomRuleSuite extends AnyFunSuite with TZNormalizedSparkTestBase with HadoopFsTestBase {
import spark.implicits._

// we may WANT to enable control framework & spline here
// we may WANT to enable control framework here

implicit val progArgs: ConformanceConfig = ConformanceConfig() // here we may need to specify some parameters (for certain rules)
implicit val dao: EnceladusDAO = mock(classOf[EnceladusDAO]) // you may have to hard-code your own implementation here
