Skip to content

Commit 82dcd04

Browse files
committed
Read input as JSON, enrich DFG with metadata
1 parent a39cc17 commit 82dcd04

File tree

11 files changed

+391
-102
lines changed

11 files changed

+391
-102
lines changed

build.sbt

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ lazy val compilerPipeline= (project in file("."))
1818
"com.lihaoyi" %% "pprint" % "0.5.5",
1919
"com.chuusai" %% "shapeless" % "2.3.3",
2020
"io.circe" %% "circe-core" % "0.11.1",
21-
"io.circe" %% "circe-generic" % "0.11.1"
21+
"io.circe" %% "circe-generic" % "0.11.1",
22+
"io.circe" %% "circe-parser" % "0.11.1"
2223
)
2324
).dependsOn(arc)
2425

@@ -77,7 +78,7 @@ scalacOptions ++= Seq(
7778
"-language:implicitConversions", // Allow definition of implicit functions called views
7879
"-unchecked", // Enable additional warnings where generated code depends on assumptions.
7980
"-Xcheckinit", // Wrap field accessors to throw an exception on uninitialized access.
80-
"-Xdev", // Indicates user is a developer - issue warnings about anything which seems amiss
81+
//"-Xdev", // Indicates user is a developer - issue warnings about anything which seems amiss
8182
"-Xlint:_", // Enable all lint warnings
8283
"-Xfuture", // Turn on future language features.
8384
"-Yno-adapted-args", // Do not adapt an argument list (either by inserting () or creating a tuple) to match the receiver.
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,44 @@
11
package se.kth.cda.compiler
22

3+
import io.circe.Json
34
import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}
45
import se.kth.cda.arc.syntaxtree.PrettyPrint
56
import se.kth.cda.arc.syntaxtree.parser.Translator
67
import se.kth.cda.arc.syntaxtree.transformer.MacroExpansion
78
import se.kth.cda.arc.syntaxtree.typer.TypeInference
89
import se.kth.cda.arc.{ArcLexer, ArcParser}
910
import se.kth.cda.compiler.dataflow.transform.ToDFG._
10-
import se.kth.cda.compiler.dataflow.JsonEncoder.encodeDFG
11+
import se.kth.cda.compiler.dataflow.encode.EncodeDFG.encodeDFG
12+
import se.kth.cda.compiler.dataflow.decode.DecodeMetadata.metadataDecoder
13+
import se.kth.cda.compiler.dataflow.Metadata
1114
import se.kth.cda.compiler.dataflow.optimize.OptimizeDFG._
15+
import se.kth.cda.compiler.dataflow.enrich.EnrichDFG._
1216

1317
object Compiler {
14-
def compile(code: String): String = {
18+
def compile(input: String): String = {
1519

16-
val inputStream = CharStreams.fromString(code)
20+
val metadata = metadataDecoder(io.circe.parser.parse(input).getOrElse(Json.Null).hcursor) match {
21+
//val metadata = metadataDecoder.decodeJson(Json.fromString(input)) match {
22+
case Left(value) => throw value
23+
case Right(value) => value
24+
}
25+
26+
val inputStream = CharStreams.fromString(metadata.arc_code)
1727
val lexer = new ArcLexer(inputStream)
1828
val tokenStream = new CommonTokenStream(lexer)
19-
val parser = new ArcParser(tokenStream)
20-
val translator = Translator(parser)
29+
val arcparser = new ArcParser(tokenStream)
30+
val translator = Translator(arcparser)
2131
val ast = translator.expr()
2232
val expanded = MacroExpansion.expand(ast).get
2333
val typed = TypeInference.solve(expanded).get
2434
val dfg = typed.toDFG
25-
val optimized = dfg.optimize
35+
36+
val enriched_dfg = dfg.enrich(metadata)
37+
val optimized_dfg = enriched_dfg.optimize
2638

2739
//println(PrettyPrint.pretty(typed))
2840
//pprint.pprintln(optimized)
2941

30-
encodeDFG(optimized).noSpaces
42+
encodeDFG(optimized_dfg).noSpaces
3143
}
3244
}

src/main/scala/se/kth/cda/compiler/dataflow/DFG.scala

+70-19
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package se.kth.cda.compiler.dataflow
33
import se.kth.cda.arc.syntaxtree.AST.Expr
44
import se.kth.cda.arc.syntaxtree.Type
55
import se.kth.cda.compiler.dataflow.ChannelStrategy._
6-
import se.kth.cda.compiler.dataflow.DFG.newId
76
import se.kth.cda.compiler.dataflow.SinkKind.Debug
87
import se.kth.cda.compiler.dataflow.SourceKind.Socket
98
import se.kth.cda.compiler.dataflow.TimeKind.Ingestion
@@ -28,17 +27,11 @@ import se.kth.cda.compiler.dataflow.WindowKind.All
2827
// val trace: Option[Trace] = None
2928
// val scope: Option[Scope] = None
3029
//}
30+
trait Trace
3131

32-
object DFG {
33-
var idCounter = 0
34-
def newId: Int = {
35-
val id = idCounter
36-
idCounter += 1
37-
id
38-
}
39-
}
32+
final case class Metadata(nodes: List[Node], timestamp_extractor: Int, arc_code: String)
4033

41-
object Node {
34+
object DFG {
4235
var idCounter = 0
4336
def newId: Int = {
4437
val id = idCounter
@@ -56,23 +49,65 @@ object Channel {
5649
}
5750
}
5851

59-
trait Trace
60-
61-
final case class DFG(id: String = s"dfg${DFG.newId}", var nodes: List[Node], target: String = "x86-64-unknown-linux-gnu")
52+
final case class DFG(id: String = s"dfg${DFG.newId}",
53+
var timestamp_extractor: Int = 1,
54+
var nodes: List[Node],
55+
target: String = "x86-64-unknown-linux-gnu")
6256

6357
//case class Scope(depth: Long, parent: Option[Scope]) extends Id
6458

65-
final case class Node(var id: String = s"node${Node.newId}", parallelism: Long = 1, kind: NodeKind)
59+
final case class Node(var id: String, parallelism: Long = 1, kind: NodeKind)
6660

6761
sealed trait NodeKind
6862

6963
object NodeKind {
70-
final case class Source(sourceType: Type,
64+
object Task {
65+
var idCounter = 0
66+
def newId: Int = {
67+
val id = idCounter
68+
idCounter += 1
69+
id
70+
}
71+
}
72+
73+
object Source {
74+
var idCounter = 0
75+
def newId: Int = {
76+
val id = idCounter
77+
idCounter += 1
78+
id
79+
}
80+
}
81+
82+
object Sink {
83+
var idCounter = 0
84+
def newId: Int = {
85+
val id = idCounter
86+
idCounter += 1
87+
id
88+
}
89+
}
90+
91+
object Window {
92+
var idCounter = 0
93+
def newId: Int = {
94+
val id = idCounter
95+
idCounter += 1
96+
id
97+
}
98+
}
99+
100+
final case class Source(sourceType: Type = null,
101+
var format: Format = null,
71102
channelStrategy: ChannelStrategy = Forward,
72103
var successors: Vector[ChannelKind] = Vector.empty,
73-
kind: SourceKind = Socket("localhost", 1337))
104+
var kind: SourceKind = null)
105+
extends NodeKind
106+
final case class Sink(sinkType: Type = null,
107+
var format: Format = null,
108+
var predecessor: Node = null,
109+
var kind: SinkKind = null)
74110
extends NodeKind
75-
final case class Sink(sinkType: Type, var predecessor: Node = null, kind: SinkKind = Debug) extends NodeKind
76111
final case class Task(var weldFunc: Expr,
77112
inputType: Type,
78113
outputType: Type,
@@ -92,7 +127,12 @@ object NodeKind {
92127
extends NodeKind
93128
}
94129

95-
final case class WindowFunction(inputType: Type, outputType: Type, builderType: Type, init: Expr, lift: Expr, lower: Expr)
130+
final case class WindowFunction(inputType: Type,
131+
outputType: Type,
132+
builderType: Type,
133+
init: Expr,
134+
lift: Expr,
135+
lower: Expr)
96136

97137
sealed trait ChannelKind
98138

@@ -104,13 +144,24 @@ object ChannelKind {
104144
sealed trait SourceKind
105145

106146
object SourceKind {
107-
final case class Socket(host: String, port: Long) extends SourceKind
147+
final case class Socket(var host: String, var port: Long) extends SourceKind
148+
final case class LocalFile(var path: String) extends SourceKind
108149
}
109150

110151
sealed trait SinkKind
111152

112153
object SinkKind {
113154
final case object Debug extends SinkKind
155+
final case class Socket(var host: String, var port: Long) extends SinkKind
156+
final case class LocalFile(var path: String) extends SinkKind
157+
}
158+
159+
sealed trait Format
160+
161+
object Format {
162+
final case object JSON extends Format
163+
final case object CSV extends Format
164+
final case object UTF8 extends Format
114165
}
115166

116167
sealed trait TaskKind
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package se.kth.cda.compiler.dataflow.decode
2+
3+
import se.kth.cda.compiler.dataflow.NodeKind.{Task, Window}
4+
import se.kth.cda.compiler.dataflow._
5+
6+
object DecodeMetadata {
7+
import io.circe._
8+
import io.circe.generic.semiauto._
9+
10+
implicit val metadataDecoder: Decoder[Metadata] =
11+
(cursor: HCursor) =>
12+
for {
13+
nodes <- cursor.get[List[Node]]("nodes")
14+
timestamp_extractor <- cursor.get[Int]("timestamp_extractor")
15+
age <- cursor.get[String]("arc_code")
16+
} yield Metadata(nodes, timestamp_extractor, age)
17+
18+
implicit val nodeDecoder: Decoder[Node] =
19+
(cursor: HCursor) =>
20+
for {
21+
id <- cursor.get[String]("id")
22+
kind <- cursor.get[NodeKind]("kind")
23+
} yield Node(id = id, kind = kind)
24+
implicit val nodeKindDecoder: Decoder[NodeKind] = io.circe.generic.semiauto.deriveDecoder[NodeKind]
25+
26+
implicit val sourceDecoder: Decoder[NodeKind.Source] =
27+
(cursor: HCursor) =>
28+
for {
29+
format <- cursor.get[String]("format").map{
30+
case "CSV" => Format.CSV
31+
case "UTF8" => Format.UTF8
32+
case "JSON" => Format.JSON
33+
}
34+
kind <- cursor.get[SourceKind]("kind")
35+
} yield NodeKind.Source(format = format, kind = kind)
36+
implicit val sourceKindDecoder: Decoder[SourceKind] = deriveDecoder
37+
38+
implicit val localFileSourceDecoder: Decoder[SourceKind.LocalFile] =
39+
(cursor: HCursor) =>
40+
for {
41+
path <- cursor.get[String]("path")
42+
} yield SourceKind.LocalFile(path)
43+
44+
implicit val socketSourceDecoder: Decoder[SourceKind.Socket] =
45+
(cursor: HCursor) =>
46+
for {
47+
host <- cursor.get[String]("host")
48+
port <- cursor.get[Long]("port")
49+
} yield SourceKind.Socket(host, port)
50+
51+
implicit val sinkDecoder: Decoder[NodeKind.Sink] =
52+
(cursor: HCursor) =>
53+
for {
54+
format <- cursor.get[String]("format").map{
55+
case "CSV" => Format.CSV
56+
case "UTF8" => Format.UTF8
57+
case "JSON" => Format.JSON
58+
}
59+
kind <- cursor.get[SinkKind]("kind")
60+
} yield NodeKind.Sink(format = format, kind = kind)
61+
implicit val sinkKindDecoder: Decoder[SinkKind] = deriveDecoder
62+
63+
implicit val localFileSinkDecoder: Decoder[SinkKind.LocalFile] =
64+
(cursor: HCursor) => for { path <- cursor.get[String]("path") } yield SinkKind.LocalFile(path)
65+
66+
implicit val socketSinkDecoder: Decoder[SinkKind.Socket] =
67+
(cursor: HCursor) =>
68+
for {
69+
host <- cursor.get[String]("host")
70+
port <- cursor.get[Long]("port")
71+
} yield SinkKind.Socket(host, port)
72+
73+
implicit val formatDecoder: Decoder[Format] = deriveDecoder
74+
75+
implicit val windowDecoder: Decoder[Window] = (cursor: HCursor) => for {
76+
_ <- cursor.get[String]("name")
77+
} yield Window(null, null, null, null, null, null, null)
78+
79+
implicit val taskDecoder: Decoder[Task] = (cursor: HCursor) => for {
80+
_ <- cursor.get[String]("name")
81+
} yield Task(null, null, null, null)
82+
}

0 commit comments

Comments
 (0)