@@ -28,59 +28,73 @@ object EncodeDFG {
28
28
Json .obj(
29
29
(" id" , node.id.asJson),
30
30
(" parallelism" , node.parallelism.asJson),
31
- (" kind" , node.kind.asJson(node.id )),
31
+ (" kind" , node.kind.asJson(node)),
32
32
)
33
33
34
- implicit def encodeNodeKind (id : String ): Encoder [NodeKind ] = {
34
+ implicit def encodeNodeKind (node : Node ): Encoder [NodeKind ] = {
35
35
case source : Source =>
36
+ val outputId = source.successors(0 ) match {
37
+ case Local (succ) => StructId .from(node.id, succ.id)
38
+ case Remote (succ, _) => StructId .from(node.id, succ.id)
39
+ }
36
40
Json .obj(
37
41
(" Source" ,
38
- Json .obj(
39
- (" source_type" , source.sourceType.asJson(encodeType(source.successors(0 ) match {
40
- case Local (node) => StructId .from(id, node.id)
41
- case Remote (node, _) => StructId .from(id, node.id)
42
- }))),
43
- (" format" , source.format.asJson),
44
- (" channel_strategy" , source.channelStrategy.asJson),
45
- (" successors" , source.successors.asJson),
46
- (" kind" , source.kind.asJson),
47
- )))
42
+ Json .obj(
43
+ (" source_type" , source.sourceType.asJson(encodeType(outputId))),
44
+ (" format" , source.format.asJson),
45
+ (" channel_strategy" , source.channelStrategy.asJson),
46
+ (" successors" , source.successors.asJson),
47
+ (" kind" , source.kind.asJson),
48
+ )))
48
49
case task : Task =>
50
+ val (a, b) = getInputId(task.predecessor, node.id)
51
+ val inputId = StructId .from(a, b)
52
+ val outputId = task.kind match {
53
+ case TaskKind .Filter => inputId
54
+ case _ => task.successors(0 ) match {
55
+ case Local (succ) => StructId .from(node.id, succ.id)
56
+ case Remote (succ, _) => StructId .from(node.id, succ.id)
57
+ }
58
+ }
49
59
Json .obj(
50
60
(" Task" ,
51
- Json .obj(
52
- (" weld_code" , pretty(task.weldFunc).asJson),
53
- (" input_type" , task.inputType.asJson(encodeType(StructId .from(task.predecessor.id, id)))),
54
- (" output_type" , task.outputType.asJson(encodeType(task.successors(0 ) match {
55
- case Local (node) => StructId .from(id, node.id)
56
- case Remote (node, _) => StructId .from(id, node.id)
57
- }))),
58
- (" channel_strategy" , task.channelStrategy.asJson),
59
- (" predecessor" , task.predecessor.id.asJson),
60
- (" successors" , task.successors.asJson),
61
- (" kind" , task.kind.asJson),
62
- )))
61
+ Json .obj(
62
+ (" weld_code" , pretty(task.weldFunc).asJson),
63
+ (" input_type" , task.inputType.asJson(encodeType(inputId))),
64
+ (" output_type" , task.outputType.asJson(encodeType(outputId))),
65
+ (" channel_strategy" , task.channelStrategy.asJson),
66
+ (" predecessor" , task.predecessor.id.asJson),
67
+ (" successors" , task.successors.asJson),
68
+ (" kind" , task.kind.asJson),
69
+ )))
63
70
case sink : Sink =>
71
+ val inputId = StructId .from(sink.predecessor.id, node.id)
64
72
Json .obj(
65
73
(" Sink" ,
66
- Json .obj(
67
- (" sink_type" , sink.sinkType.asJson(encodeType(StructId .from(sink.predecessor.id, id) ))),
68
- (" format" , sink.format.asJson),
69
- (" predecessor" , sink.predecessor.id.asJson),
70
- (" kind" , sink.kind.asJson),
71
- )))
74
+ Json .obj(
75
+ (" sink_type" , sink.sinkType.asJson(encodeType(inputId ))),
76
+ (" format" , sink.format.asJson),
77
+ (" predecessor" , sink.predecessor.id.asJson),
78
+ (" kind" , sink.kind.asJson),
79
+ )))
72
80
case window : Window =>
81
+ val (a, b) = getInputId(window.predecessor, node.id)
82
+ val inputId = StructId .from(a, b)
83
+ val outputId = window.successors(0 ) match {
84
+ case Local (succ) => StructId .from(node.id, succ.id)
85
+ case Remote (succ, _) => StructId .from(node.id, succ.id)
86
+ }
73
87
Json .obj(
74
88
(" Window" ,
75
- Json .obj(
76
- (" channel_strategy" , window.channelStrategy.asJson),
77
- (" predecessor" , window.predecessor.id.asJson),
78
- (" successors" , window.successors.asJson),
79
- (" assigner" , window.assigner.asJson),
80
- (" window_function" , window.function.asJson),
81
- (" time_kind" , window.time.asJson),
82
- (" window_kind" , window.kind.asJson),
83
- )))
89
+ Json .obj(
90
+ (" channel_strategy" , window.channelStrategy.asJson),
91
+ (" predecessor" , window.predecessor.id.asJson),
92
+ (" successors" , window.successors.asJson),
93
+ (" assigner" , window.assigner.asJson),
94
+ (" window_function" , window.function.asJson(encodeWindowFunction(inputId, outputId)) ),
95
+ (" time_kind" , window.time.asJson),
96
+ (" window_kind" , window.kind.asJson),
97
+ )))
84
98
}
85
99
86
100
implicit val encodeSourceKind : Encoder [SourceKind ] = {
@@ -151,15 +165,16 @@ object EncodeDFG {
151
165
case Broadcast => " Broadcast" .asJson
152
166
}
153
167
154
- implicit val encodeWindowFunction : Encoder [WindowFunction ] = function =>
155
- Json .obj(
156
- (" input_type" , function.inputType.asJson(encodeType())),
157
- (" output_type" , function.outputType.asJson(encodeType())),
158
- (" builder_type" , function.builderType.asJson(encodeType())),
159
- (" builder" , pretty(function.init).asJson),
160
- (" udf" , pretty(function.lift).asJson),
161
- (" materialiser" , pretty(function.lower).asJson),
162
- )
168
+ implicit def encodeWindowFunction (inputId : String , outputId : String ): Encoder [WindowFunction ] =
169
+ function =>
170
+ Json .obj(
171
+ (" input_type" , function.inputType.asJson(encodeType(inputId))),
172
+ (" output_type" , function.outputType.asJson(encodeType(outputId))),
173
+ (" builder_type" , function.builderType.asJson(encodeType())),
174
+ (" builder" , pretty(function.init).asJson),
175
+ (" udf" , pretty(function.lift).asJson),
176
+ (" materialiser" , pretty(function.lower).asJson),
177
+ )
163
178
164
179
implicit val encodeWindowAssigner : Encoder [WindowAssigner ] = {
165
180
case tumbling : Tumbling =>
@@ -179,13 +194,12 @@ object EncodeDFG {
179
194
case All => " All" .asJson
180
195
}
181
196
182
- // implicit val encodeType: Encoder[Type] = {
183
- // case _: Appender => "Appender".asJson
184
- // case _: Merger => "Merger".asJson
185
- // case _: VecMerger => "VecMerger".asJson
186
- // case _: DictMerger => "DictMerger".asJson
187
- // case _: GroupMerger => "GroupMerger".asJson
188
- // case _ => ???
189
- // }
197
+ // Walks backwards and finds the first output coming from a non-filter
198
+ private def getInputId (node : Node , succ_id : String ): (String , String ) = {
199
+ node.kind match {
200
+ case Task (_, _, _, pred, _, _, Filter , _) => getInputId(pred, node.id)
201
+ case _ : Source | _ : Task | _ : Window | _ : Sink => (node.id, succ_id)
202
+ }
203
+ }
190
204
191
205
}
0 commit comments