@@ -16,74 +16,73 @@ import scala.concurrent.Future
16
16
17
17
object Tapir extends Endpoints {
18
18
import sttp .tapir ._
19
- def route (nRoutes : Int , withServerLog : Boolean = false ): Vertx => Router => Route = { vertx =>
20
- router =>
21
- val serverOptions = buildOptions(VertxFutureServerOptions .customiseInterceptors, withServerLog)
22
- val interpreter = VertxFutureServerInterpreter (serverOptions)
23
- val wsEndpoint = wsBaseEndpoint
24
- .out(
25
- webSocketBody[Long , CodecFormat .TextPlain , Long , CodecFormat .TextPlain ](VertxStreams )
26
- .concatenateFragmentedFrames(false )
27
- )
28
-
29
- val laggedTimestampPipe : ReadStream [Long ] => ReadStream [Long ] = { inputStream =>
30
- new ReadStream [Long ] {
31
-
32
- override def fetch (amount : Long ): ReadStream [Long ] = this
33
-
34
- private var dataHandler : Handler [Long ] = _
35
- private var endHandler : Handler [Void ] = _
36
- private var exceptionHandler : Handler [Throwable ] = _
37
-
38
- inputStream.handler(new Handler [Long ] {
39
- override def handle (event : Long ): Unit = {
40
- vertx.setTimer(
41
- WebSocketSingleResponseLag .toMillis,
42
- _ => {
43
- if (dataHandler != null ) dataHandler.handle(System .currentTimeMillis())
44
- }
45
- ): Unit
46
- }
47
- })
48
-
49
- inputStream.endHandler(new Handler [Void ] {
50
- override def handle (e : Void ): Unit = {
51
- if (endHandler != null ) endHandler.handle(e)
52
- }
53
- })
54
-
55
- inputStream.exceptionHandler(new Handler [Throwable ] {
56
- override def handle (e : Throwable ): Unit = {
57
- if (exceptionHandler != null ) exceptionHandler.handle(e)
58
- }
59
- })
60
-
61
- override def handler (handler : Handler [Long ]): ReadStream [Long ] = {
62
- this .dataHandler = handler
63
- this
19
+ def route (nRoutes : Int , withServerLog : Boolean = false ): Vertx => Router => Route = { vertx => router =>
20
+ val serverOptions = buildOptions(VertxFutureServerOptions .customiseInterceptors, withServerLog)
21
+ val interpreter = VertxFutureServerInterpreter (serverOptions)
22
+ val wsEndpoint = wsBaseEndpoint
23
+ .out(
24
+ webSocketBody[Long , CodecFormat .TextPlain , Long , CodecFormat .TextPlain ](VertxStreams )
25
+ .concatenateFragmentedFrames(false )
26
+ )
27
+
28
+ val laggedTimestampPipe : ReadStream [Long ] => ReadStream [Long ] = { inputStream =>
29
+ new ReadStream [Long ] {
30
+
31
+ override def fetch (amount : Long ): ReadStream [Long ] = this
32
+
33
+ private var dataHandler : Handler [Long ] = _
34
+ private var endHandler : Handler [Void ] = _
35
+ private var exceptionHandler : Handler [Throwable ] = _
36
+
37
+ inputStream.handler(new Handler [Long ] {
38
+ override def handle (event : Long ): Unit = {
39
+ vertx.setTimer(
40
+ WebSocketSingleResponseLag .toMillis,
41
+ _ => {
42
+ if (dataHandler != null ) dataHandler.handle(System .currentTimeMillis())
43
+ }
44
+ ): Unit
64
45
}
46
+ })
65
47
66
- override def pause (): ReadStream [Long ] = this
67
- override def resume (): ReadStream [Long ] = this
68
-
69
- override def endHandler (endHandler : Handler [Void ]): ReadStream [Long ] = {
70
- this .endHandler = endHandler
71
- this
48
+ inputStream.endHandler(new Handler [Void ] {
49
+ override def handle (e : Void ): Unit = {
50
+ if (endHandler != null ) endHandler.handle(e)
72
51
}
52
+ })
73
53
74
- override def exceptionHandler (exceptionHandler : Handler [Throwable ]) : ReadStream [ Long ] = {
75
- this .exceptionHandler = exceptionHandler
76
- this
54
+ inputStream. exceptionHandler(new Handler [Throwable ] {
55
+ override def handle ( e : Throwable ) : Unit = {
56
+ if (exceptionHandler != null ) exceptionHandler.handle(e)
77
57
}
58
+ })
59
+
60
+ override def handler (handler : Handler [Long ]): ReadStream [Long ] = {
61
+ this .dataHandler = handler
62
+ this
78
63
}
79
64
80
- }
65
+ override def pause (): ReadStream [Long ] = this
66
+ override def resume (): ReadStream [Long ] = this
67
+
68
+ override def endHandler (endHandler : Handler [Void ]): ReadStream [Long ] = {
69
+ this .endHandler = endHandler
70
+ this
71
+ }
81
72
82
- val wsServerEndpoint = wsEndpoint.serverLogicSuccess[ Future ] { _ =>
83
- Future .successful {
84
- laggedTimestampPipe
73
+ override def exceptionHandler ( exceptionHandler : Handler [ Throwable ]) : ReadStream [ Long ] = {
74
+ this .exceptionHandler = exceptionHandler
75
+ this
85
76
}
86
77
}
78
+
79
+ }
80
+
81
+ val wsServerEndpoint = wsEndpoint.serverLogicSuccess[Future ] { _ =>
82
+ Future .successful {
83
+ laggedTimestampPipe
84
+ }
85
+ }
87
86
(wsServerEndpoint :: genEndpointsFuture(nRoutes)).map(interpreter.route(_)(router)).last
88
87
}
89
88
}
@@ -93,65 +92,60 @@ object Vanilla extends Endpoints {
93
92
def webSocketHandler (vertx : Vertx ): Router => Route = { router =>
94
93
router.get(" /ws/ts" ).handler { ctx =>
95
94
val wss = ctx.request().toWebSocket()
96
- wss.map {
97
- ws : ServerWebSocket =>
98
- ws.textMessageHandler(_ => ())
99
-
100
- // Set a periodic timer to send timestamps every 100 milliseconds
101
- val timerId = vertx.setPeriodic(
102
- WebSocketSingleResponseLag .toMillis,
103
- { _ =>
104
- ws.writeTextMessage(System .currentTimeMillis().toString): Unit
105
- }
106
- )
95
+ wss.map { ws : ServerWebSocket =>
96
+ ws.textMessageHandler(_ => ())
97
+
98
+ // Set a periodic timer to send timestamps every 100 milliseconds
99
+ val timerId = vertx.setPeriodic(
100
+ WebSocketSingleResponseLag .toMillis,
101
+ { _ =>
102
+ ws.writeTextMessage(System .currentTimeMillis().toString): Unit
103
+ }
104
+ )
107
105
108
- // Close the timer when the WebSocket is closed
109
- ws.closeHandler(_ => vertx.cancelTimer(timerId): Unit )
106
+ // Close the timer when the WebSocket is closed
107
+ ws.closeHandler(_ => vertx.cancelTimer(timerId): Unit )
110
108
}: Unit
111
109
}
112
110
}
113
111
def route : Int => Vertx => Router => Route = { (nRoutes : Int ) => _ => router =>
114
112
(0 until nRoutes).map { n =>
115
- router.get(s " /path $n/:id " ).handler {
116
- ctx : RoutingContext =>
117
- val id = ctx.request().getParam(" id" ).toInt
118
- val _ = ctx
119
- .response()
120
- .putHeader(" content-type" , " text/plain" )
121
- .end(s " ${id + n}" )
113
+ router.get(s " /path $n/:id " ).handler { ctx : RoutingContext =>
114
+ val id = ctx.request().getParam(" id" ).toInt
115
+ val _ = ctx
116
+ .response()
117
+ .putHeader(" content-type" , " text/plain" )
118
+ .end(s " ${id + n}" )
122
119
}
123
120
124
- router.post(s " /path $n" ).handler(bodyHandler).handler {
125
- ctx : RoutingContext =>
126
- val body = ctx.body.asString()
127
- val _ = ctx
128
- .response()
129
- .putHeader(" content-type" , " text/plain" )
130
- .end(s " Ok [ $n], string length = ${body.length}" )
121
+ router.post(s " /path $n" ).handler(bodyHandler).handler { ctx : RoutingContext =>
122
+ val body = ctx.body.asString()
123
+ val _ = ctx
124
+ .response()
125
+ .putHeader(" content-type" , " text/plain" )
126
+ .end(s " Ok [ $n], string length = ${body.length}" )
131
127
}
132
128
133
- router.post(s " /pathBytes $n" ).handler(bodyHandler).handler {
134
- ctx : RoutingContext =>
135
- val bytes = ctx.body().asString()
136
- val _ = ctx
137
- .response()
138
- .putHeader(" content-type" , " text/plain" )
139
- .end(s " Ok [ $n], bytes length = ${bytes.length}" )
129
+ router.post(s " /pathBytes $n" ).handler(bodyHandler).handler { ctx : RoutingContext =>
130
+ val bytes = ctx.body().asString()
131
+ val _ = ctx
132
+ .response()
133
+ .putHeader(" content-type" , " text/plain" )
134
+ .end(s " Ok [ $n], bytes length = ${bytes.length}" )
140
135
}
141
136
142
- router.post(s " /pathFile $n" ).handler(bodyHandler).handler {
143
- ctx : RoutingContext =>
144
- val filePath = newTempFilePath()
145
- val fs = ctx.vertx.fileSystem
146
- val _ = fs
147
- .createFile(filePath.toString)
148
- .flatMap(_ => fs.writeFile(filePath.toString, ctx.body().buffer()))
149
- .flatMap(_ =>
150
- ctx
151
- .response()
152
- .putHeader(" content-type" , " text/plain" )
153
- .end(s " Ok [ $n], file saved to $filePath" )
154
- )
137
+ router.post(s " /pathFile $n" ).handler(bodyHandler).handler { ctx : RoutingContext =>
138
+ val filePath = newTempFilePath()
139
+ val fs = ctx.vertx.fileSystem
140
+ val _ = fs
141
+ .createFile(filePath.toString)
142
+ .flatMap(_ => fs.writeFile(filePath.toString, ctx.body().buffer()))
143
+ .flatMap(_ =>
144
+ ctx
145
+ .response()
146
+ .putHeader(" content-type" , " text/plain" )
147
+ .end(s " Ok [ $n], file saved to $filePath" )
148
+ )
155
149
}
156
150
}.last
157
151
}
0 commit comments