Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Jan 30, 2025
2 parents cd1f998 + ddb3db9 commit 626dbb8
Show file tree
Hide file tree
Showing 43 changed files with 647 additions and 390 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3141,13 +3141,26 @@ private void configureDataProcessInstanceResolvers(final RuntimeWiring.Builder b
"DataProcessInstance",
typeWiring ->
typeWiring
.dataFetcher("exists", new EntityExistsResolver(entityService))
.dataFetcher(
"platform",
new LoadableTypeResolver<>(
dataPlatformType,
(env) -> {
final DataProcessInstance dataProcessInstance = env.getSource();
return dataProcessInstance != null
&& dataProcessInstance.getPlatform() != null
? dataProcessInstance.getPlatform().getUrn()
: null;
}))
.dataFetcher(
"dataPlatformInstance",
new LoadableTypeResolver<>(
dataPlatformInstanceType,
(env) -> {
final DataProcessInstance dataProcessInstance = env.getSource();
return dataProcessInstance.getDataPlatformInstance() != null
return dataProcessInstance != null
&& dataProcessInstance.getDataPlatformInstance() != null
? dataProcessInstance.getDataPlatformInstance().getUrn()
: null;
}))
Expand All @@ -3160,6 +3173,11 @@ private void configureDataProcessInstanceResolvers(final RuntimeWiring.Builder b
final DataProcessInstance dpi = env.getSource();
return dpi.getContainer() != null ? dpi.getContainer().getUrn() : null;
}))
.dataFetcher(
"parentTemplate",
new EntityTypeResolver(
entityTypes,
(env) -> ((DataProcessInstance) env.getSource()).getParentTemplate()))
.dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient))
.dataFetcher(
"lineage",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public DataPlatformInstance apply(
result.setType(EntityType.DATA_PLATFORM_INSTANCE);
result.setUrn(input.getInstance().toString());
}
// Warning: This often cannot be read properly: overwritten by LoadableTypeResolver
result.setPlatform(
DataPlatform.builder()
.setUrn(input.getPlatform().toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public class DataProcessInstanceType
DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME,
ML_TRAINING_RUN_PROPERTIES_ASPECT_NAME,
SUB_TYPES_ASPECT_NAME,
CONTAINER_ASPECT_NAME);
CONTAINER_ASPECT_NAME,
STATUS_ASPECT_NAME);

private final EntityClient _entityClient;
private final FeatureFlags _featureFlags;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.linkedin.metadata.Constants.*;

import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.Status;
import com.linkedin.common.SubTypes;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
Expand All @@ -13,12 +14,15 @@
import com.linkedin.datahub.graphql.types.common.mappers.AuditStampMapper;
import com.linkedin.datahub.graphql.types.common.mappers.CustomPropertiesMapper;
import com.linkedin.datahub.graphql.types.common.mappers.DataPlatformInstanceAspectMapper;
import com.linkedin.datahub.graphql.types.common.mappers.StatusMapper;
import com.linkedin.datahub.graphql.types.common.mappers.SubTypesMapper;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import com.linkedin.datahub.graphql.types.common.mappers.util.MappingHelper;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import com.linkedin.datahub.graphql.types.mlmodel.mappers.MLHyperParamMapper;
import com.linkedin.datahub.graphql.types.mlmodel.mappers.MLMetricMapper;
import com.linkedin.dataprocess.DataProcessInstanceProperties;
import com.linkedin.dataprocess.DataProcessInstanceRelationships;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.ml.metadata.MLTrainingRunProperties;
Expand Down Expand Up @@ -68,7 +72,7 @@ public DataProcessInstance apply(
mappingHelper.mapToResult(
DATA_PROCESS_INSTANCE_PROPERTIES_ASPECT_NAME,
(dataProcessInstance, dataMap) ->
mapDataProcessProperties(context, dataProcessInstance, dataMap, entityUrn));
mapDataProcessInstanceProperties(context, dataProcessInstance, dataMap, entityUrn));
mappingHelper.mapToResult(
ML_TRAINING_RUN_PROPERTIES_ASPECT_NAME,
(dataProcessInstance, dataMap) ->
Expand All @@ -77,8 +81,10 @@ public DataProcessInstance apply(
DATA_PLATFORM_INSTANCE_ASPECT_NAME,
(dataProcessInstance, dataMap) -> {
DataPlatformInstance dataPlatformInstance = new DataPlatformInstance(dataMap);
dataProcessInstance.setDataPlatformInstance(
DataPlatformInstanceAspectMapper.map(context, dataPlatformInstance));
com.linkedin.datahub.graphql.generated.DataPlatformInstance value =
DataPlatformInstanceAspectMapper.map(context, dataPlatformInstance);
dataProcessInstance.setPlatform(value.getPlatform());
dataProcessInstance.setDataPlatformInstance(value);
});
mappingHelper.mapToResult(
SUB_TYPES_ASPECT_NAME,
Expand All @@ -87,6 +93,14 @@ public DataProcessInstance apply(
mappingHelper.mapToResult(
CONTAINER_ASPECT_NAME,
(dataProcessInstance, dataMap) -> mapContainers(context, dataProcessInstance, dataMap));
mappingHelper.mapToResult(
STATUS_ASPECT_NAME,
(dataProcessInstance, dataMap) ->
dataProcessInstance.setStatus(StatusMapper.map(context, new Status(dataMap))));
mappingHelper.mapToResult(
DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME,
(dataProcessInstance, dataMap) ->
mapDataProcessInstanceRelationships(context, dataProcessInstance, dataMap));

return mappingHelper.getResult();
}
Expand Down Expand Up @@ -124,8 +138,8 @@ private void mapTrainingRunProperties(
dpi.setMlTrainingRunProperties(properties);
}

private void mapDataProcessProperties(
@Nonnull QueryContext context,
private void mapDataProcessInstanceProperties(
@Nullable QueryContext context,
@Nonnull DataProcessInstance dpi,
@Nonnull DataMap dataMap,
@Nonnull Urn entityUrn) {
Expand All @@ -146,9 +160,20 @@ private void mapDataProcessProperties(
CustomPropertiesMapper.map(
dataProcessInstanceProperties.getCustomProperties(), entityUrn));
}
if (dataProcessInstanceProperties.hasCreated()) {
dpi.setCreated(AuditStampMapper.map(context, dataProcessInstanceProperties.getCreated()));
}
dpi.setCreated(AuditStampMapper.map(context, dataProcessInstanceProperties.getCreated()));
properties.setCreated(
AuditStampMapper.map(context, dataProcessInstanceProperties.getCreated()));
dpi.setProperties(properties);
}

private void mapDataProcessInstanceRelationships(
@Nullable QueryContext context, @Nonnull DataProcessInstance dpi, @Nonnull DataMap dataMap) {
DataProcessInstanceRelationships dataProcessInstanceRelationships =
new DataProcessInstanceRelationships(dataMap);

if (dataProcessInstanceRelationships.getParentTemplate() != null) {
dpi.setParentTemplate(
UrnToEntityMapper.map(context, dataProcessInstanceRelationships.getParentTemplate()));
}
}
}
29 changes: 24 additions & 5 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -6733,6 +6733,16 @@ type DataProcessInstance implements EntityWithRelationships & Entity {
"""
type: EntityType!

"""
Whether or not this entity exists on DataHub
"""
exists: Boolean

"""
Status metadata of the data process instance
"""
status: Status

"""
The history of state changes for the run
"""
Expand All @@ -6741,12 +6751,12 @@ type DataProcessInstance implements EntityWithRelationships & Entity {
"""
When the run was kicked off
"""
created: AuditStamp
created: AuditStamp @deprecated(reason: "Use `properties.created`")

"""
The name of the data process
"""
name: String
name: String @deprecated(reason: "Use `properties.name`")

"""
Edges extending from this entity.
Expand Down Expand Up @@ -13154,7 +13164,7 @@ type DataProcessInstanceProperties {
"""
When this process instance was created
"""
created: AuditStamp
created: AuditStamp!

"""
Additional custom properties specific to this process instance
Expand Down Expand Up @@ -13188,9 +13198,8 @@ type MLTrainingRunProperties {
}

extend type DataProcessInstance {

"""
Additional read only properties associated with the Data Job
Additional read only properties associated with the Data Process Instance
"""
properties: DataProcessInstanceProperties

Expand All @@ -13209,6 +13218,11 @@ extend type DataProcessInstance {
"""
container: Container

"""
Standardized platform urn where the data process instance is defined
"""
platform: DataPlatform

"""
Recursively get the lineage of containers for this entity
"""
Expand All @@ -13218,4 +13232,9 @@ extend type DataProcessInstance {
Additional properties when subtype is Training Run
"""
mlTrainingRunProperties: MLTrainingRunProperties

"""
The parent entity whose run instance it is
"""
parentTemplate: Entity
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.FabricType;
import com.linkedin.common.Status;
Expand Down Expand Up @@ -61,7 +62,11 @@ public class DataProcessInstanceTypeTest {
private static final DataProcessInstanceKey TEST_DPI_1_KEY =
new DataProcessInstanceKey().setId("id-1");
private static final DataProcessInstanceProperties TEST_DPI_1_PROPERTIES =
new DataProcessInstanceProperties().setName("Test DPI").setType(DataProcessType.STREAMING);
new DataProcessInstanceProperties()
.setName("Test DPI")
.setType(DataProcessType.STREAMING)
.setCreated(
new AuditStamp().setTime(1234L).setActor(UrnUtils.getUrn("urn:li:corpuser:1")));
private static final DataProcessInstanceInput TEST_DPI_1_DPI_INPUT =
new DataProcessInstanceInput().setInputs(new UrnArray(ImmutableList.of(DATASET_URN)));
private static final DataProcessInstanceOutput TEST_DPI_1_DPI_OUTPUT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.url.Url;
import com.linkedin.common.urn.Urn;
Expand All @@ -12,6 +13,7 @@
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.dataprocess.DataProcessInstanceProperties;
import com.linkedin.dataprocess.DataProcessInstanceRelationships;
import com.linkedin.entity.Aspect;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
Expand All @@ -28,6 +30,7 @@ public class DataProcessInstanceMapperTest {
private static final String TEST_INSTANCE_URN =
"urn:li:dataProcessInstance:(test-workflow,test-instance)";
private static final String TEST_CONTAINER_URN = "urn:li:container:testContainer";
private static final String TEST_USER_URN = "urn:li:corpuser:test";
private static final String TEST_EXTERNAL_URL = "https://example.com/process";
private static final String TEST_NAME = "Test Process Instance";

Expand All @@ -52,11 +55,15 @@ public void testMapBasicFields() throws Exception {
}

@Test
public void testMapDataProcessProperties() throws Exception {
public void testMapDataProcessInstanceProperties() throws Exception {
// Create DataProcessInstanceProperties
DataProcessInstanceProperties properties = new DataProcessInstanceProperties();
properties.setName(TEST_NAME);
properties.setExternalUrl(new Url(TEST_EXTERNAL_URL));
AuditStamp created = new AuditStamp();
created.setTime(123456789L);
created.setActor(Urn.createFromString(TEST_USER_URN));
properties.setCreated(created);

// Add properties aspect
addAspect(Constants.DATA_PROCESS_INSTANCE_PROPERTIES_ASPECT_NAME, properties);
Expand All @@ -66,6 +73,20 @@ public void testMapDataProcessProperties() throws Exception {
assertNotNull(instance.getProperties());
assertEquals(instance.getName(), TEST_NAME);
assertEquals(instance.getExternalUrl(), TEST_EXTERNAL_URL);
assertEquals(instance.getCreated().getTime(), 123456789L);
assertEquals(instance.getCreated().getActor(), TEST_USER_URN);
}

@Test
public void testMapDataProcessInstanceRelationships() throws Exception {
DataProcessInstanceRelationships relationships = new DataProcessInstanceRelationships();
relationships.setParentTemplate(Urn.createFromString(TEST_INSTANCE_URN));

addAspect(Constants.DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME, relationships);

DataProcessInstance instance = DataProcessInstanceMapper.map(null, entityResponse);
assertNotNull(instance.getParentTemplate());
assertEquals(instance.getParentTemplate().getUrn(), TEST_INSTANCE_URN);
}

@Test
Expand Down
Loading

0 comments on commit 626dbb8

Please sign in to comment.