Skip to content

Commit bfca002

Browse files
authored
feat(gui): show region layer on workflow editor (#3753)
1 parent 7b8b484 commit bfca002

File tree

15 files changed

+560
-31
lines changed

15 files changed

+560
-31
lines changed

amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package org.apache.amber.engine.architecture.controller
2121

2222
import akka.actor.SupervisorStrategy.Stop
2323
import akka.actor.{AllForOneStrategy, Props, SupervisorStrategy}
24+
import org.apache.texera.web.model.websocket.response.RegionUpdateEvent
2425
import org.apache.amber.config.ApplicationConfig
2526
import org.apache.amber.core.virtualidentity.ChannelIdentity
2627
import org.apache.amber.core.workflow.{PhysicalPlan, WorkflowContext}
@@ -42,6 +43,7 @@ import org.apache.amber.engine.common.ambermessage.{
4243
}
4344
import org.apache.amber.engine.common.virtualidentity.util.{CLIENT, CONTROLLER, SELF}
4445
import org.apache.amber.engine.common.{CheckpointState, SerializedState}
46+
import org.apache.texera.web.SessionState
4547

4648
import scala.concurrent.duration.DurationInt
4749

@@ -111,6 +113,16 @@ class Controller(
111113
override def initState(): Unit = {
112114
attachRuntimeServicesToCPState()
113115
cp.workflowScheduler.updateSchedule(physicalPlan)
116+
117+
val regions: List[List[String]] =
118+
cp.workflowScheduler.getSchedule.getRegions.map { region =>
119+
region.physicalOps.map(_.id.logicalOpId.id).toList
120+
}
121+
122+
SessionState.getAllSessionStates.foreach { state =>
123+
state.send(RegionUpdateEvent(regions))
124+
}
125+
114126
val controllerRestoreConf = controllerConfig.stateRestoreConfOpt
115127
if (controllerRestoreConf.isDefined) {
116128
globalReplayManager.markRecoveryStatus(CONTROLLER, isRecovering = true)

amber/src/main/scala/org/apache/amber/engine/architecture/controller/WorkflowScheduler.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class WorkflowScheduler(
3434
var physicalPlan: PhysicalPlan = _
3535
private var schedule: Schedule = _
3636

37+
def getSchedule: Schedule = schedule
38+
3739
/**
3840
* Update the schedule to be executed, based on the given physicalPlan.
3941
*/

amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/Schedule.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ package org.apache.amber.engine.architecture.scheduling
2222
case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] {
2323
private var currentLevel = levelSets.keys.minOption.getOrElse(0)
2424

25+
def getRegions: List[Region] = levelSets.values.flatten.toList
26+
2527
override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel)
2628

2729
override def next(): Set[Region] = {
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.texera.web.model.websocket.response
21+
22+
import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
23+
24+
case class RegionUpdateEvent(regions: List[List[String]]) extends TexeraWebSocketEvent

frontend/package.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@
4747
"@types/plotly.js-basic-dist-min": "2.12.4",
4848
"ajv": "8.10.0",
4949
"backbone": "1.4.1",
50+
"concaveman": "2.0.0",
5051
"content-disposition": "0.5.4",
52+
"d3": "6.4.0",
5153
"dagre": "0.8.5",
5254
"deep-map": "2.0.0",
5355
"edit-distance": "1.0.4",
@@ -111,7 +113,9 @@
111113
"@nrwl/nx-cloud": "19.1.0",
112114
"@nx/angular": "20.0.3",
113115
"@types/backbone": "1.4.15",
116+
"@types/concaveman": "1.1.6",
114117
"@types/content-disposition": "0",
118+
"@types/d3-shape": "2.1.2",
115119
"@types/dagre": "0.7.47",
116120
"@types/file-saver": "2.0.5",
117121
"@types/graphlib": "2.1.8",

frontend/src/app/workspace/component/menu/menu.component.html

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,14 @@
164164
>Grid</label
165165
>
166166
</li>
167-
</ul>
168-
<ul nz-menu>
167+
<li nz-menu-item>
168+
<label
169+
nz-checkbox
170+
[(ngModel)]="showRegion"
171+
(ngModelChange)="toggleRegion()"
172+
>Region</label
173+
>
174+
</li>
169175
<li nz-menu-item>
170176
<label
171177
nz-checkbox

frontend/src/app/workspace/component/menu/menu.component.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ export class MenuComponent implements OnInit, OnDestroy {
8888
public isWorkflowModifiable: boolean = false;
8989
public workflowId?: number;
9090
public isExportDeactivate: boolean = false;
91+
public showRegion: boolean = false;
9192
public showGrid: boolean = false;
9293
public showNumWorkers: boolean = false;
9394
protected readonly DASHBOARD_USER_WORKFLOW = DASHBOARD_USER_WORKFLOW;
@@ -469,6 +470,10 @@ export class MenuComponent implements OnInit, OnDestroy {
469470
this.workflowActionService.getJointGraphWrapper().mainPaper.setGridSize(this.showGrid ? 2 : 1);
470471
}
471472

473+
public toggleRegion(): void {
474+
this.workflowActionService.getJointGraphWrapper().mainPaper.el.classList.toggle("hide-region", !this.showRegion);
475+
}
476+
472477
/**
473478
* This method will run the autoLayout function
474479
*

frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.scss

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,8 @@
2525
height: 100%;
2626
}
2727

28-
::ng-deep #workflow-editor .connection-wrap {
29-
stroke-width: 0;
30-
}
31-
32-
::ng-deep #workflow-editor .link-tools .tool-remove path {
33-
fill: #d8656a;
34-
transform: translate(-12px, -12px);
35-
}
36-
37-
::ng-deep #workflow-editor .link-tools .tool-remove circle {
38-
fill-opacity: 0;
28+
::ng-deep .hide-region .region {
29+
display: none;
3930
}
4031

4132
::ng-deep .hide-worker-count .operator-worker-count {

frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ import * as _ from "lodash";
4141
import * as joint from "jointjs";
4242
import { isDefined } from "../../../common/util/predicate";
4343
import { GuiConfigService } from "../../../common/service/gui-config.service";
44+
import { line, curveCatmullRomClosed } from "d3-shape";
45+
import concaveman from "concaveman";
4446

4547
// jointjs interactive options for enabling and disabling interactivity
4648
// https://resources.jointjs.com/docs/jointjs/v3.2/joint.html#dia.Paper.prototype.options.interactive
@@ -161,6 +163,7 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy
161163
this.handlePortHighlightEvent();
162164
this.registerPortDisplayNameChangeHandler();
163165
this.handleOperatorStatisticsUpdate();
166+
this.handleRegionUpdate();
164167
this.handleOperatorSuggestionHighlightEvent();
165168
this.handleElementDelete();
166169
this.handleElementSelectAll();
@@ -330,6 +333,65 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy
330333
});
331334
}
332335

336+
private handleRegionUpdate(): void {
337+
this.editor.classList.add("hide-region");
338+
const Region = joint.dia.Element.define(
339+
"region",
340+
{
341+
attrs: {
342+
body: {
343+
fill: "rgba(255,213,79,0.2)",
344+
pointerEvents: "none",
345+
class: "region",
346+
},
347+
},
348+
},
349+
{
350+
markup: [{ tagName: "path", selector: "body" }],
351+
}
352+
);
353+
354+
let regionMap: { regionElement: joint.dia.Element; operators: joint.dia.Cell[] }[] = [];
355+
// update region elements on execution
356+
this.executeWorkflowService
357+
.getRegionUpdateStream()
358+
.pipe(untilDestroyed(this))
359+
.subscribe(regions => {
360+
this.paper.model
361+
.getCells()
362+
.filter(element => element instanceof Region)
363+
.forEach(element => element.remove());
364+
365+
regionMap = regions.map(region => {
366+
const element = new Region();
367+
const ops = region.map(id => this.paper.getModelById(id));
368+
this.paper.model.addCell(element);
369+
this.updateRegionElement(element, ops);
370+
return { regionElement: element, operators: ops };
371+
});
372+
});
373+
374+
this.paper.model.on("change:position", operator => {
375+
regionMap
376+
.filter(region => region.operators.includes(operator))
377+
.forEach(region => this.updateRegionElement(region.regionElement, region.operators));
378+
});
379+
}
380+
381+
private updateRegionElement(regionElement: joint.dia.Element, operators: joint.dia.Cell[]) {
382+
const points = operators.flatMap(op => {
383+
const { x, y, width, height } = op.getBBox(),
384+
padding = 15;
385+
return [
386+
[x - padding, y - padding],
387+
[x + width + padding, y - padding],
388+
[x - padding, y + height + padding + 10],
389+
[x + width + padding, y + height + padding + 10],
390+
];
391+
});
392+
regionElement.attr("body/d", line().curve(curveCatmullRomClosed)(concaveman(points, 2, 0) as [number, number][]));
393+
}
394+
333395
/**
334396
* Handles restore offset default event by translating jointJS paper
335397
* back to original position

frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ export class ExecuteWorkflowService {
8585
current: ExecutionStateInfo;
8686
}>();
8787

88+
private regionUpdateStream = new Subject<readonly string[][]>();
89+
8890
// TODO: move this to another service, or redesign how this
8991
// information is stored on the frontend.
9092
private assignedWorkerIds: Map<string, readonly string[]> = new Map();
@@ -99,6 +101,9 @@ export class ExecuteWorkflowService {
99101
) {
100102
workflowWebsocketService.websocketEvent().subscribe(event => {
101103
switch (event.type) {
104+
case "RegionUpdateEvent":
105+
this.regionUpdateStream.next(event.regions);
106+
break;
102107
case "WorkerAssignmentUpdateEvent":
103108
this.assignedWorkerIds.set(event.operatorId, event.workerIds);
104109
break;
@@ -329,6 +334,10 @@ export class ExecuteWorkflowService {
329334
return this.executionStateStream.asObservable();
330335
}
331336

337+
public getRegionUpdateStream(): Observable<readonly string[][]> {
338+
return this.regionUpdateStream.asObservable();
339+
}
340+
332341
public resetExecutionState(): void {
333342
this.currentState = {
334343
state: ExecutionState.Uninitialized,

0 commit comments

Comments
 (0)