diff --git a/.github/workflows/unified-test-suite.yml b/.github/workflows/unified-test-suite.yml new file mode 100644 index 000000000..49abbc971 --- /dev/null +++ b/.github/workflows/unified-test-suite.yml @@ -0,0 +1,40 @@ +name: Unified Test Suite + +on: + pull_request: + push: + branches: + - main + +jobs: + unified-test-suite: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '17' + + - name: Set up Node.js 18 + uses: actions/setup-node@v4 + with: + node-version: '18' + + - name: Build Adapter + working-directory: adapter + run: | + ./gradlew build + JAR_PATH=$(find $(pwd) -name '*standalone.jar' | head -n 1) + echo "JAR_PATH=$JAR_PATH" >> $GITHUB_ENV + + - name: Install uts-pubsub globally + run: npm install -g @ably-labs/uts-pubsub + + - name: Run uts-pubsub with ADAPTER_EXECUTABLE + env: + ADAPTER_EXECUTABLE: java -jar ${{ env.JAR_PATH }} + run: uts-pubsub diff --git a/adapter/build.gradle.kts b/adapter/build.gradle.kts new file mode 100644 index 000000000..8d5b8e7e8 --- /dev/null +++ b/adapter/build.gradle.kts @@ -0,0 +1,51 @@ +val ktor_version: String by project +val kotlin_version: String by project +val logback_version: String by project + +plugins { + kotlin("jvm") version "1.9.22" + id("io.ktor.plugin") version "2.3.7" +} + +group = "com.ably.java" +version = "0.0.1" + +application { + mainClass.set("com.ably.java.ApplicationKt") +} + +tasks { + val fatJar = register("fatJar") { + dependsOn.addAll(listOf("compileJava", "compileKotlin", "processResources")) // We need this for Gradle optimization to work + archiveClassifier.set("standalone") // Naming the jar + duplicatesStrategy = DuplicatesStrategy.EXCLUDE + manifest { attributes(mapOf("Main-Class" to application.mainClass)) } // Provided we set it up in the application plugin configuration + val sourcesMain = sourceSets.main.get() + val contents = configurations.runtimeClasspath.get() + .map { if (it.isDirectory) it else zipTree(it) } + + sourcesMain.output + from(contents) + } + build { + dependsOn(fatJar) + } +} + +repositories { + mavenCentral() +} + +dependencies { + implementation("io.ably:ably-java:1.2.38") + implementation("com.benasher44:uuid:0.8.2") + implementation("io.ktor:ktor-client-core") + implementation("io.ktor:ktor-client-cio") + implementation("io.ktor:ktor-client-logging") + implementation("io.ktor:ktor-client-websockets") + implementation("io.ktor:ktor-serialization-kotlinx-json") + implementation("io.ktor:ktor-client-content-negotiation") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4") + implementation("org.jetbrains.kotlinx:kotlinx-serialization-core:1.5.1") + implementation("com.google.code.gson:gson:2.9.0") + implementation("ch.qos.logback:logback-classic:$logback_version") +} diff --git a/adapter/gradle.properties b/adapter/gradle.properties new file mode 100644 index 000000000..d631891d5 --- /dev/null +++ b/adapter/gradle.properties @@ -0,0 +1,4 @@ +ktor_version=2.3.7 +kotlin_version=1.9.22 +logback_version=1.4.11 +kotlin.code.style=official diff --git a/adapter/gradle/wrapper/gradle-wrapper.jar b/adapter/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 000000000..7454180f2 Binary files /dev/null and b/adapter/gradle/wrapper/gradle-wrapper.jar differ diff --git a/adapter/gradle/wrapper/gradle-wrapper.properties b/adapter/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 000000000..e411586a5 --- /dev/null +++ b/adapter/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/adapter/gradlew b/adapter/gradlew new file mode 100755 index 000000000..1b6c78733 --- /dev/null +++ b/adapter/gradlew @@ -0,0 +1,234 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/adapter/gradlew.bat b/adapter/gradlew.bat new file mode 100644 index 000000000..107acd32c --- /dev/null +++ b/adapter/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/adapter/settings.gradle.kts b/adapter/settings.gradle.kts new file mode 100644 index 000000000..2189f2e6d --- /dev/null +++ b/adapter/settings.gradle.kts @@ -0,0 +1 @@ +rootProject.name = "adapter" diff --git a/adapter/src/main/kotlin/com/ably/java/AblyRealtimeAdapter.kt b/adapter/src/main/kotlin/com/ably/java/AblyRealtimeAdapter.kt new file mode 100644 index 000000000..34600bb81 --- /dev/null +++ b/adapter/src/main/kotlin/com/ably/java/AblyRealtimeAdapter.kt @@ -0,0 +1,718 @@ +package com.ably.java + +import io.ably.lib.realtime.AblyRealtime +import io.ably.lib.realtime.ChannelBase.MessageListener +import io.ably.lib.realtime.Connection +import io.ably.lib.realtime.Presence +import io.ably.lib.rest.AblyRest +import io.ably.lib.rest.Auth +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.json.* + +typealias RealtimeChannels = AblyRealtime.Channels +typealias RealtimeChannel = io.ably.lib.realtime.Channel +typealias RestChannels = io.ably.lib.rest.AblyBase.Channels +typealias RestChannel = io.ably.lib.rest.Channel +typealias RestPresence = io.ably.lib.rest.ChannelBase.Presence +typealias RealtimePresence = Presence + +class AblyRealtimeAdapter( + val sendMessage: MutableSharedFlow, + val authCallbackHandler: AuthCallbackHandler, +) { + private val idToAblyRest: MutableMap = mutableMapOf() + private val idToAblyRealtime: MutableMap = mutableMapOf() + private val idToRestChannels: MutableMap = mutableMapOf() + private val idToRestChannel: MutableMap = mutableMapOf() + private val idToRealtimeChannels: MutableMap = mutableMapOf() + private val idToRealtimeChannel: MutableMap = mutableMapOf() + private val idToAuth: MutableMap = mutableMapOf() + private val idToConnection: MutableMap = mutableMapOf() + private val idToRestPresence: MutableMap = mutableMapOf() + private val idToRealtimePresence: MutableMap = mutableMapOf() + private val idToSubscribeCallback: MutableMap = mutableMapOf() + + suspend fun handleRpcCall(rpcRequest: JsonElement): String? { + if (!rpcRequest.isJsonRpc() || !rpcRequest.hasMethod()) return null + + return when (rpcRequest.methodName) { + "AblyRest" -> { + val options = rpcRequest.getArg("options").asClientOptions(authCallbackHandler) + val refId = generateId() + idToAblyRest[refId] = AblyRest(options) + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", refId) + }) + } + + "AblyRest#auth" -> { + val refId = rpcRequest.getRefId() + val instance = idToAblyRest[refId]!! + val field = instance.auth + val fieldRefId = generateId() + idToAuth[fieldRefId] = field + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", fieldRefId) + }) + } + + "AblyRest#channels" -> { + val refId = rpcRequest.getRefId() + val instance = idToAblyRest[refId]!! + val field = instance.channels + val fieldRefId = generateId() + idToRestChannels[fieldRefId] = field + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", fieldRefId) + }) + } + + "AblyRest.request" -> { + val method = rpcRequest.getArg("method").asString() + val path = rpcRequest.getArg("path").asString() + val params = rpcRequest.getArg("params").asRequestParams() + val body = rpcRequest.getArg("body").asRequestBody() + val headers = rpcRequest.getArg("headers").asRequestHeaders() + val refId = rpcRequest.getRefId() + val instance = idToAblyRest[refId]!! + val result = instance.request(method, path, params, body, headers) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result.toJsonElement()) + }) + } + + "AblyRest.time" -> { + + val refId = rpcRequest.getRefId() + val instance = idToAblyRest[refId]!! + val result = instance.time() + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result) + }) + } + + "AblyRest.close" -> { + + val refId = rpcRequest.getRefId() + val instance = idToAblyRest[refId]!! + instance.close() + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + }) + } + + "AblyRealtime" -> { + val options = rpcRequest.getArg("options").asClientOptions(authCallbackHandler) + val refId = generateId() + idToAblyRealtime[refId] = AblyRealtime(options) + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", refId) + }) + } + + "AblyRealtime#auth" -> { + val refId = rpcRequest.getRefId() + val instance = idToAblyRealtime[refId]!! + val field = instance.auth + val fieldRefId = generateId() + idToAuth[fieldRefId] = field + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", fieldRefId) + }) + } + + "AblyRealtime#channels" -> { + val refId = rpcRequest.getRefId() + val instance = idToAblyRealtime[refId]!! + val field = instance.channels + val fieldRefId = generateId() + idToRealtimeChannels[fieldRefId] = field + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", fieldRefId) + }) + } + + "AblyRealtime#connection" -> { + val refId = rpcRequest.getRefId() + val instance = idToAblyRealtime[refId]!! + val field = instance.connection + val fieldRefId = generateId() + idToConnection[fieldRefId] = field + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", fieldRefId) + }) + } + + "AblyRealtime.request" -> { + val method = rpcRequest.getArg("method").asString() + val path = rpcRequest.getArg("path").asString() + val params = rpcRequest.getArg("params").asRequestParams() + val body = rpcRequest.getArg("body").asRequestBody() + val headers = rpcRequest.getArg("headers").asRequestHeaders() + val refId = rpcRequest.getRefId() + val instance = idToAblyRealtime[refId]!! + val result = instance.request(method, path, params, body, headers) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result.toJsonElement()) + }) + } + + "AblyRealtime.time" -> { + + val refId = rpcRequest.getRefId() + val instance = idToAblyRealtime[refId]!! + val result = instance.time() + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result) + }) + } + + "AblyRealtime.close" -> { + + val refId = rpcRequest.getRefId() + val instance = idToAblyRealtime[refId]!! + instance.close() + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + }) + } + + "Auth#clientId" -> { + val refId = rpcRequest.getRefId() + val instance = idToAuth[refId]!! + val field = instance.clientId + jsonRpcResponse(rpcRequest, buildJsonObject { + put("response", field) + }) + } + + "Auth.authorize" -> { + val tokenParams = rpcRequest.getOptionalArg("tokenParams")?.asTokenParams() + val authOptions = rpcRequest.getOptionalArg("authOptions")?.asAuthOptions() + val refId = rpcRequest.getRefId() + val instance = idToAuth[refId]!! + val result = instance.authorize(tokenParams, authOptions) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result.toJsonElement()) + }) + } + + "Auth.createTokenRequest" -> { + val tokenParams = rpcRequest.getOptionalArg("tokenParams")?.asTokenParams() + val authOptions = rpcRequest.getOptionalArg("authOptions")?.asAuthOptions() + val refId = rpcRequest.getRefId() + val instance = idToAuth[refId]!! + val result = instance.createTokenRequest(tokenParams, authOptions) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result.toJsonElement()) + }) + } + + "Auth.requestToken" -> { + val tokenParams = rpcRequest.getOptionalArg("tokenParams")?.asTokenParams() + val authOptions = rpcRequest.getOptionalArg("authOptions")?.asAuthOptions() + val refId = rpcRequest.getRefId() + val instance = idToAuth[refId]!! + val result = instance.requestToken(tokenParams, authOptions) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result.toJsonElement()) + }) + } + + "RestChannels.get" -> { + val name = rpcRequest.getArg("name").asString() + val channelOptions = rpcRequest.getOptionalArg("channelOptions")?.asChannelOptions() + val refId = rpcRequest.getRefId() + val instance = idToRestChannels[refId]!! + val result = channelOptions?.let { instance.get(name, channelOptions) } ?: instance.get(name) + val resultRefId = generateId() + idToRestChannel[resultRefId] = result + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + + }) + } + + "RestChannels.release" -> { + val name = rpcRequest.getArg("name").asString() + val refId = rpcRequest.getRefId() + val instance = idToRestChannels[refId]!! + instance.release(name) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + + }) + } + + "RestChannel#name" -> { + val refId = rpcRequest.getRefId() + val instance = idToRestChannel[refId]!! + val field = instance.name + jsonRpcResponse(rpcRequest, buildJsonObject { + put("response", field) + }) + } + + "RestChannel.publish" -> { + val messages = rpcRequest.getOptionalArg("messages")?.asArrayOfMessage() + val name = rpcRequest.getOptionalArg("name")?.asString() + val data = rpcRequest.getOptionalArg("data")?.asMessageData() + val refId = rpcRequest.getRefId() + val instance = idToRestChannel[refId]!! + if (messages != null) { + instance.publish(messages) + } else { + instance.publish(name, data) + } + + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + }) + } + + "RestChannel.history" -> { + val params = rpcRequest.getArg("params").asRequestParams() + val refId = rpcRequest.getRefId() + val instance = idToRestChannel[refId]!! + val result = instance.history(params) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result.toJsonElement()) + }) + } + + "RealtimeChannels.get" -> { + val name = rpcRequest.getArg("name").asString() + val channelOptions = rpcRequest.getOptionalArg("channelOptions")?.asChannelOptions() + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannels[refId]!! + val result = instance.get(name, channelOptions) + val resultRefId = generateId() + idToRealtimeChannel[resultRefId] = result + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + + }) + } + + "RealtimeChannels.release" -> { + val name = rpcRequest.getArg("name").asString() + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannels[refId]!! + instance.release(name) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + + }) + } + + "RealtimeChannel#name" -> { + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannel[refId]!! + val field = instance.name + jsonRpcResponse(rpcRequest, buildJsonObject { + put("response", field) + }) + } + + "RealtimeChannel#state" -> { + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannel[refId]!! + val field = instance.state + jsonRpcResponse(rpcRequest, buildJsonObject { + put("response", field.toJsonElement()) + }) + } + + "RealtimeChannel#presence" -> { + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannel[refId]!! + val field = instance.presence + val fieldRefId = generateId() + idToRealtimePresence[fieldRefId] = field + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", fieldRefId) + }) + } + + "RealtimeChannel#errorReason" -> { + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannel[refId]!! + val field = instance.reason + jsonRpcResponse(rpcRequest, buildJsonObject { + put("response", field.toJsonElement()) + }) + } + + "RealtimeChannel#params" -> { + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannel[refId]!! + val field = instance.params + jsonRpcResponse(rpcRequest, buildJsonObject { + put("response", field.toJsonElement()) + }) + } + + "RealtimeChannel#modes" -> { + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannel[refId]!! + val field = instance.modes + jsonRpcResponse(rpcRequest, buildJsonObject { + put("response", field.toJsonElement()) + }) + } + + "RealtimeChannel.publish_0" -> { + val messages = rpcRequest.getArg("messages").asArrayOfMessage() + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannel[refId]!! + instance.publish(messages) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + + }) + } + + "RealtimeChannel.publish_1" -> { + val name = rpcRequest.getArg("name").asString() + val data = rpcRequest.getArg("data").asMessageData() + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannel[refId]!! + instance.publish(name, data) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + + }) + } + + "RealtimeChannel.subscribe_0" -> { + val refId = rpcRequest.getRefId() + val callbackId = rpcRequest.getCallbackId() + val instance = idToRealtimeChannel[refId]!! + val messageListener = MessageListener { message -> + emitCallback(callbackId, buildJsonObject { + put("type", "message") + put("message", message.toJsonElement()) + }) + } + idToSubscribeCallback[callbackId] = messageListener + instance.subscribe(messageListener) + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", refId) + }) + } + + "RealtimeChannel.subscribe_1" -> { + val events = rpcRequest.getArg("events").asArrayOfStings() + val refId = rpcRequest.getRefId() + val callbackId = rpcRequest.getCallbackId() + val instance = idToRealtimeChannel[refId]!! + val messageListener = MessageListener { message -> + emitCallback(callbackId, buildJsonObject { + put("type", "message") + put("message", message.toJsonElement()) + }) + } + idToSubscribeCallback[callbackId] = messageListener + instance.subscribe(events, messageListener) + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", refId) + }) + } + + "RealtimeChannel.unsubscribe_0" -> { + val name = rpcRequest.getArg("name").asString() + val refId = rpcRequest.getRefId() + val callbackId = rpcRequest.getCallbackId() + val instance = idToRealtimeChannel[refId]!! + instance.unsubscribe(name, idToSubscribeCallback[callbackId]!!) + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", refId) + }) + } + + "RealtimeChannel.unsubscribe_1" -> { + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannel[refId]!! + instance.unsubscribe() + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", refId) + }) + } + + "RealtimeChannel.attach" -> { + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannel[refId]!! + instance.attachAsync() + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", refId) + }) + } + + "RealtimeChannel.detach" -> { + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannel[refId]!! + instance.detachAsync() + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", refId) + }) + } + + "RealtimeChannel.history" -> { + val params = rpcRequest.getArg("params").asRequestParams() + val refId = rpcRequest.getRefId() + val instance = idToRealtimeChannel[refId]!! + val result = instance.history(params) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result.toJsonElement()) + }) + } + + "Connection#errorReason" -> { + val refId = rpcRequest.getRefId() + val instance = idToConnection[refId]!! + val field = instance.reason + jsonRpcResponse(rpcRequest, buildJsonObject { + put("response", field.toJsonElement()) + }) + } + + "Connection#id" -> { + val refId = rpcRequest.getRefId() + val instance = idToConnection[refId]!! + val field = instance.id + jsonRpcResponse(rpcRequest, buildJsonObject { + put("response", field) + }) + } + + "Connection#key" -> { + val refId = rpcRequest.getRefId() + val instance = idToConnection[refId]!! + val field = instance.key + jsonRpcResponse(rpcRequest, buildJsonObject { + put("response", field) + }) + } + + "Connection#state" -> { + val refId = rpcRequest.getRefId() + val instance = idToConnection[refId]!! + val field = instance.state + jsonRpcResponse(rpcRequest, buildJsonObject { + put("response", field.toJsonElement()) + }) + } + + "Connection.createRecoveryKey" -> { + + val refId = rpcRequest.getRefId() + val instance = idToConnection[refId]!! + val result = instance.createRecoveryKey() + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result) + }) + } + + "Connection.close" -> { + + val refId = rpcRequest.getRefId() + val instance = idToConnection[refId]!! + instance.close() + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + + }) + } + + "Connection.connect" -> { + + val refId = rpcRequest.getRefId() + val instance = idToConnection[refId]!! + instance.connect() + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + }) + } + + "Connection.on" -> { + val refId = rpcRequest.getRefId() + val callbackId = rpcRequest.getCallbackId() + val events = rpcRequest.getOptionalArg("events")?.jsonArray?.map { it.asConnectionState() } + val instance = idToConnection[refId]!! + if (events != null) { + events.forEach { + instance.on(it.connectionEvent, { stateChange -> + emitCallback(callbackId, buildJsonObject { + put("type", "stateChange") + put("stateChange", stateChange.toJsonElement()) + }) + }) + } + } else { + instance.on({ stateChange -> + emitCallback(callbackId, buildJsonObject { + put("type", "stateChange") + put("stateChange", stateChange.toJsonElement()) + }) + }) + } + + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + }) + } + + "Connection.once" -> { + val refId = rpcRequest.getRefId() + val callbackId = rpcRequest.getCallbackId() + val event = rpcRequest.getOptionalArg("event")?.asConnectionState() + val instance = idToConnection[refId]!! + if (event != null) { + instance.once(event.connectionEvent, { stateChange -> + emitCallback(callbackId, buildJsonObject { + put("type", "stateChange") + put("stateChange", stateChange.toJsonElement()) + }) + }) + } else { + instance.once({ stateChange -> + emitCallback(callbackId, buildJsonObject { + put("type", "stateChange") + put("stateChange", stateChange.toJsonElement()) + }) + }) + } + + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + }) + } + + "RestPresence.get" -> { + val params = rpcRequest.getArg("params").asRestPresenceParams() + val refId = rpcRequest.getRefId() + val instance = idToRestPresence[refId]!! + val result = instance.get(params) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result.toJsonElement()) + }) + } + + "RestPresence.history" -> { + val params = rpcRequest.getArg("params").asRestPresenceParams() + val refId = rpcRequest.getRefId() + val instance = idToRestPresence[refId]!! + val result = instance.history(params) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result.toJsonElement()) + }) + } + + "RealtimePresence.get" -> { + val params = rpcRequest.getArg("params").asPresenceParams() + val refId = rpcRequest.getRefId() + val instance = idToRealtimePresence[refId]!! + val result = instance.get(*params) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result.toJsonElement()) + }) + } + + "RealtimePresence.history" -> { + val params = rpcRequest.getArg("params").asRestPresenceParams() + val refId = rpcRequest.getRefId() + val instance = idToRealtimePresence[refId]!! + val result = instance.history(params) + val resultRefId = generateId() + + jsonRpcResponse(rpcRequest, buildJsonObject { + put("refId", resultRefId) + put("response", result.toJsonElement()) + }) + } + + + else -> null + } + } + + fun emitCallback(callbackId: String, payload: JsonElement? = null) { + val callbackJson = buildJsonObject { + put("jsonrpc", JsonPrimitive("2.0")) + put("id", generateId()) + put("method", "callback") + put("params", buildJsonObject { + put("callbackId", callbackId) + if (payload != null) { + put("payload", payload) + } + }) + } + + runBlocking { + sendMessage.emit(callbackJson.toString()) + } + } +} diff --git a/adapter/src/main/kotlin/com/ably/java/Application.kt b/adapter/src/main/kotlin/com/ably/java/Application.kt new file mode 100644 index 000000000..9249fee72 --- /dev/null +++ b/adapter/src/main/kotlin/com/ably/java/Application.kt @@ -0,0 +1,80 @@ +package com.ably.java + +import io.ably.lib.types.AblyException +import io.ktor.client.* +import io.ktor.client.plugins.logging.* +import io.ktor.client.plugins.websocket.* +import io.ktor.http.* +import io.ktor.websocket.* +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.json.* + +fun main() { + val client = HttpClient { + install(WebSockets) + install(Logging) { + level = LogLevel.ALL + } + } + + val sendMessageFlow = MutableSharedFlow() + val json = Json + val authCallbackHandler = AuthCallbackHandler() + val ablyRealtimeAdapter = AblyRealtimeAdapter(sendMessageFlow, authCallbackHandler) + + runBlocking { + client.webSocket(HttpMethod.Get, host = "localhost", port = 3000) { + val messageReceivedRoutine = launch { + try { + for (message in incoming) { + message as? Frame.Text ?: continue + val messageText = message.readText() + println("Received: $messageText") + val rpcRequest = json.parseToJsonElement(messageText) + try { + ablyRealtimeAdapter.handleRpcCall(rpcRequest)?.let { + sendMessageFlow.emit(it) + } ?: run { + if (rpcRequest.isJsonRpc() && rpcRequest.hasMethod()) { + sendMessageFlow.emit(notImplementedResponse(rpcRequest)) + } + } + } catch (e: Exception) { + e.printStackTrace() + println("Error while handling: $e") + + if (e is AblyException) { + sendMessageFlow.emit(ablyExceptionResponse(rpcRequest, e)) + } else { + sendMessageFlow.emit(unhandledExceptionResponse(rpcRequest, e)) + } + } + } + } catch (e: Exception) { + e.printStackTrace() + println("Error while receiving: $e") + } + } + + val messageSendRoutine = sendMessageFlow.onEach { + try { + println("Sent: $it") + send(it) + } catch (e: Exception) { + println("Error while sending: $e") + } + }.launchIn(this@runBlocking) + + send("{\"role\":\"IMPLEMENTATION\"}") + + messageSendRoutine.join() + messageReceivedRoutine.cancelAndJoin() + } + client.close() + } +} diff --git a/adapter/src/main/kotlin/com/ably/java/AuthCallbackHandler.kt b/adapter/src/main/kotlin/com/ably/java/AuthCallbackHandler.kt new file mode 100644 index 000000000..42eb7392e --- /dev/null +++ b/adapter/src/main/kotlin/com/ably/java/AuthCallbackHandler.kt @@ -0,0 +1,48 @@ +package com.ably.java + +import io.ably.lib.rest.Auth.TokenCallback +import io.ktor.client.* +import io.ktor.client.request.* +import io.ktor.client.statement.* +import io.ktor.http.* +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.json.* + +class AuthCallbackHandler { + + val httpClient: HttpClient = HttpClient {} + val json = Json + + fun createTokenRequest(authCallbackId: String): TokenCallback { + return TokenCallback { tokenParams -> + runBlocking { + val responseBody = httpClient.post("http://localhost:3000/auth-callback") { + contentType(ContentType.Application.Json) + setBody( + buildJsonObject { + put("jsonrpc", JsonPrimitive("2.0")) + put("id", generateId()) + put("method", "authCallback") + put("params", buildJsonObject { + put("authCallbackId", authCallbackId) + if (tokenParams != null) { + put("tokenParams", tokenParams.toJsonElement()) + } + }) + }.toString() + ) + }.bodyAsText() + val responseJson = json.parseToJsonElement(responseBody) + val result = responseJson.jsonObject["result"]!!.jsonObject + val type = result["type"]!!.jsonPrimitive.content + val tokenResponse = result["response"]!! + return@runBlocking when (type) { + "TokenRequest" -> tokenResponse.asTokenRequest() + "TokenDetails" -> tokenResponse.asTokenDetails() + "string" -> tokenResponse.asString() + else -> throw IllegalStateException("Unexpected token type: $type") + } + } + } + } +} diff --git a/adapter/src/main/kotlin/com/ably/java/Decoders.kt b/adapter/src/main/kotlin/com/ably/java/Decoders.kt new file mode 100644 index 000000000..79ac364be --- /dev/null +++ b/adapter/src/main/kotlin/com/ably/java/Decoders.kt @@ -0,0 +1,140 @@ +package com.ably.java + +import io.ably.lib.http.HttpUtils.JsonRequestBody +import io.ably.lib.realtime.ConnectionState +import io.ably.lib.rest.Auth.* +import io.ably.lib.types.ChannelOptions +import io.ably.lib.types.ClientOptions +import io.ably.lib.types.Message +import io.ably.lib.types.Param +import kotlinx.serialization.json.* + +fun JsonElement.asRequestParams(): Array { + return when (this) { + is JsonArray -> jsonArray.map { it.asRequestParam() }.toTypedArray() + else -> arrayOf(asRequestParam()) + } +} + +fun JsonElement.asRequestParam(): Param { + return Param( + jsonObject["key"]?.jsonPrimitive?.content, + jsonObject["value"]?.jsonPrimitive?.content, + ) +} + +fun JsonElement.asRequestBody() = JsonRequestBody(this.toString()) + +fun JsonElement.asClientOptions(authCallbackHandler: AuthCallbackHandler): ClientOptions { + val options = ClientOptions() + + get("clientId")?.let { options.clientId = it.asString() } + get("logLevel")?.let { options.logLevel = it.asInt() } + get("tls")?.let { options.tls = it.asBoolean() } + get("restHost")?.let { options.restHost = it.asString() } + get("realtimeHost")?.let { options.realtimeHost = it.asString() } + get("port")?.let { options.port = it.asInt() } + get("tlsPort")?.let { options.tlsPort = it.asInt() } + get("autoConnect")?.let { options.autoConnect = it.asBoolean() } + get("useBinaryProtocol")?.let { options.useBinaryProtocol = it.asBoolean() } + get("queueMessages")?.let { options.queueMessages = it.asBoolean() } + get("echoMessages")?.let { options.echoMessages = it.asBoolean() } + get("recover")?.let { options.recover = it.asString() } + get("environment")?.let { options.environment = it.asString() } + get("idempotentRestPublishing")?.let { options.idempotentRestPublishing = it.asBoolean() } + get("httpOpenTimeout")?.let { options.httpOpenTimeout = it.asInt() } + get("httpRequestTimeout")?.let { options.httpRequestTimeout = it.asInt() } + get("httpMaxRetryDuration")?.let { options.httpMaxRetryDuration = it.asInt() } + get("httpMaxRetryCount")?.let { options.httpMaxRetryCount = it.asInt() } + get("realtimeRequestTimeout")?.let { options.realtimeRequestTimeout = it.asLong() } + get("disconnectedRetryTimeout")?.let { options.disconnectedRetryTimeout = it.asLong() } + get("suspendedRetryTimeout")?.let { options.suspendedRetryTimeout = it.asLong() } + get("fallbackRetryTimeout")?.let { options.fallbackRetryTimeout = it.asLong() } + get("defaultTokenParams")?.let { options.defaultTokenParams = it.asTokenParams() } + get("channelRetryTimeout")?.let { options.channelRetryTimeout = it.asInt() } + get("asyncHttpThreadpoolSize")?.let { options.asyncHttpThreadpoolSize = it.asInt() } + get("pushFullWait")?.let { options.pushFullWait = it.asBoolean() } + get("addRequestIds")?.let { options.addRequestIds = it.asBoolean() } + get("authUrl")?.let { options.authUrl = it.asString() } + get("authMethod")?.let { options.authMethod = it.asString() } + get("key")?.let { options.key = it.asString() } + get("token")?.let { options.token = if (it is JsonPrimitive) it.asString() else it.jsonObject["token"]?.asString() } + get("queryTime")?.let { options.queryTime = it.asBoolean() } + get("useTokenAuth")?.let { options.useTokenAuth = it.asBoolean() } + get("authCallback")?.let { options.authCallback = authCallbackHandler.createTokenRequest(it.asString()) } + + return options +} + +fun JsonElement.asMessageData(): String { + return jsonPrimitive.content +} + +fun JsonElement.asArrayOfMessage(): Array = jsonArray.map { it.asMessage() }.toTypedArray() + +fun JsonElement.asMessage(): Message { + val message = Message() + get("name")?.let { message.name = it.asString() } + get("connectionKey")?.let { message.connectionKey = it.asString() } + get("id")?.let { message.id = it.asString() } + get("clientId")?.let { message.clientId = it.asString() } + get("connectionId")?.let { message.connectionId = it.asString() } + get("encoding")?.let { message.encoding = it.asString() } + get("timestamp")?.let { message.timestamp = it.asLong() } + get("connectionKey")?.let { message.connectionKey = it.asString() } + return message +} + +fun JsonElement.asTokenParams(): TokenParams { + val tokenParams = TokenParams() + get("ttl")?.let { tokenParams.ttl = it.asLong() } + get("capability")?.let { tokenParams.capability = if (it is JsonObject) it.toString() else it.asString() } + get("clientId")?.let { tokenParams.clientId = it.asString() } + get("timestamp")?.let { tokenParams.timestamp = it.asLong() } + return tokenParams +} + +fun JsonElement.asTokenDetails(): TokenDetails { + val tokenDetails = TokenDetails() + get("token")?.let { tokenDetails.token = it.asString() } + get("expires")?.let { tokenDetails.expires = it.asLong() } + get("issued")?.let { tokenDetails.issued = it.asLong() } + get("capability")?.let { tokenDetails.capability = if (it is JsonObject) it.toString() else it.asString() } + get("clientId")?.let { tokenDetails.clientId = it.asString() } + return tokenDetails +} + +fun JsonElement.asTokenRequest(): TokenRequest { + val tokenRequest = TokenRequest() + get("keyName")?.let { tokenRequest.keyName = it.asString() } + get("nonce")?.let { tokenRequest.nonce = it.asString() } + get("mac")?.let { tokenRequest.mac = it.asString() } + get("timestamp")?.let { tokenRequest.timestamp = it.asLong() } + get("ttl")?.let { tokenRequest.ttl = it.asLong() } + get("capability")?.let { tokenRequest.capability = if (it is JsonObject) it.toString() else it.asString() } + return tokenRequest +} + +fun JsonElement.asAuthOptions(): AuthOptions { + val authOptions = AuthOptions() + get("authUrl")?.let { authOptions.authUrl = it.asString() } + get("authMethod")?.let { authOptions.authMethod = it.asString() } + get("key")?.let { authOptions.key = it.asString() } + get("token")?.let { authOptions.token = it.asString() } + get("authHeaders")?.let { authOptions.authHeaders = it.asRequestHeaders() } + get("authParams")?.let { authOptions.authParams = it.asRequestParams() } + get("queryTime")?.let { authOptions.queryTime = it.asBoolean() } + get("useTokenAuth")?.let { authOptions.useTokenAuth = it.asBoolean() } + return authOptions +} + +fun JsonElement.asChannelOptions(): ChannelOptions { + val channelOptions = ChannelOptions() + get("params")?.let { channelOptions.params = it.asChannelParams() } + get("encrypted")?.let { channelOptions.encrypted = it.asBoolean() } + return channelOptions +} + +fun JsonElement.asConnectionState(): ConnectionState = ConnectionState.valueOf(asString()) + +fun JsonElement.asChannelParams(): Map = buildMap {} diff --git a/adapter/src/main/kotlin/com/ably/java/Encoders.kt b/adapter/src/main/kotlin/com/ably/java/Encoders.kt new file mode 100644 index 000000000..235d5570a --- /dev/null +++ b/adapter/src/main/kotlin/com/ably/java/Encoders.kt @@ -0,0 +1,80 @@ +package com.ably.java + +import io.ably.lib.realtime.ChannelState +import io.ably.lib.realtime.ConnectionState +import io.ably.lib.realtime.ConnectionStateListener.ConnectionStateChange +import io.ably.lib.rest.Auth.* +import io.ably.lib.types.* +import kotlinx.serialization.json.* + +fun ErrorInfo.toJsonElement(): JsonElement = buildJsonObject { + put("code", code) + put("statusCode", statusCode) + put("message", message) + put("href", href) +} + + +fun HttpPaginatedResponse.toJsonElement(): JsonElement = JsonArray(items().map { + it.toKotlin() +}) + +fun TokenDetails.toJsonElement(): JsonElement = buildJsonObject { + put("token", token) + put("expires", expires) + put("issued", issued) + put("capability", capability) + put("clientId", clientId) +} +fun TokenRequest.toJsonElement(): JsonElement = buildJsonObject { + put("keyName", keyName) + put("nonce", nonce) + put("mac", mac) + put("timestamp", timestamp) + if (ttl != 0L) put("ttl", ttl) + if (capability != null) put("capability", capability) +} + +fun PresenceMessage.toJsonElement(): JsonElement = buildJsonObject { + put("action", action.value) + put("id", id) + put("clientId", clientId) + put("connectionId", connectionId) + put("encoding", encoding) + put("timestamp", timestamp) +} + +fun Message.toJsonElement(): JsonElement = buildJsonObject { + put("id", id) + put("name", name) + put("data", data.toString()) + put("connectionKey", connectionKey) + put("clientId", clientId) + put("connectionId", connectionId) + put("encoding", encoding) + put("timestamp", timestamp) +} + +@JvmName("fromPresenceMessageToJsonElement") +fun PaginatedResult.toJsonElement(): JsonElement = JsonArray(items().map { it.toJsonElement() }) +@JvmName("fromMessageToJsonElement") +fun PaginatedResult.toJsonElement(): JsonElement = JsonArray(items().map { it.toJsonElement() }) +fun Array.toJsonElement(): JsonElement = JsonArray(map { it.toJsonElement() }) + +fun Map.toJsonElement(): JsonElement = JsonObject(mapValues { JsonPrimitive(it.value) }) +fun Array.toJsonElement() = JsonArray(map { JsonPrimitive(it.name) }) +fun ChannelState.toJsonElement() = JsonPrimitive(this.name) +fun ConnectionState.toJsonElement() = JsonPrimitive(this.name) +fun ConnectionStateChange.toJsonElement(): JsonElement = buildJsonObject { + put("previous", previous.name) + put("current", current.name) + reason?.let { put("reason", it.toJsonElement()) } + put("retryIn", retryIn) +} + +fun TokenParams.toJsonElement(): JsonElement = buildJsonObject { + if (ttl != 0L) put("ttl", ttl) + if (timestamp != 0L) put("timestamp", timestamp) + capability?.let { put("capability", it) } + clientId?.let { put("clientId", it) } +} diff --git a/adapter/src/main/kotlin/com/ably/java/Utils.kt b/adapter/src/main/kotlin/com/ably/java/Utils.kt new file mode 100644 index 000000000..e0c7d3612 --- /dev/null +++ b/adapter/src/main/kotlin/com/ably/java/Utils.kt @@ -0,0 +1,125 @@ +package com.ably.java + +import io.ably.lib.realtime.AblyRealtime +import io.ably.lib.realtime.ChannelState +import io.ably.lib.realtime.CompletionListener +import io.ably.lib.types.AblyException +import io.ably.lib.types.ErrorInfo +import kotlinx.serialization.json.* +import java.util.* +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +fun generateId() = UUID.randomUUID().toString().replace("-", "") + +fun JsonElement.isJsonRpc() = jsonObject.keys.contains("jsonrpc") +fun JsonElement.hasMethod() = jsonObject.keys.contains("method") +val JsonElement.methodName get() = jsonObject["method"]?.jsonPrimitive?.content + + +fun JsonElement.getParams() = jsonObject["params"]?.jsonObject!! +fun JsonElement.getRefId() = getParams() ["refId"]?.jsonPrimitive?.content!! +fun JsonElement.getCallbackId() = getParams()["callbackId"]?.jsonPrimitive?.content!! +fun JsonElement.getArg(name: String) = getParams()["args"]!!.jsonObject[name]!! +fun JsonElement.getOptionalArg(name: String) = getParams()["args"]?.jsonObject?.get(name)?.takeUnless { it is JsonNull } +fun JsonElement.get(name: String) = jsonObject[name].takeUnless { it is JsonNull } +fun JsonElement.asString() = jsonPrimitive.content +fun JsonElement.asInt() = jsonPrimitive.int +fun JsonElement.asLong() = jsonPrimitive.long +fun JsonElement.asBoolean() = jsonPrimitive.boolean +fun JsonElement.asRequestHeaders() = asRequestParams() +fun JsonElement.asPresenceParams() = asRequestParams() +fun JsonElement.asRestPresenceParams() = asRequestParams() +fun JsonElement.asArrayOfStings() = jsonArray.map { it.asString() }.toTypedArray() + +fun notImplementedResponse(rpcRequest: JsonElement) = buildJsonObject { + put("jsonrpc", JsonPrimitive("2.0")) + put("id", rpcRequest.jsonObject["id"]!!) + put("error", buildJsonObject { + put("data", buildJsonObject { + put("ablyError", false) + }) + put("message", "Not implemented") + }) +}.toString() + +fun unhandledExceptionResponse(rpcRequest: JsonElement, exception: Exception) = buildJsonObject { + put("jsonrpc", JsonPrimitive("2.0")) + put("id", rpcRequest.jsonObject["id"]!!) + put("error", buildJsonObject { + put("data", buildJsonObject { + put("ablyError", false) + }) + put("message", exception.message) + }) +}.toString() + +fun ablyExceptionResponse(rpcRequest: JsonElement, exception: AblyException) = buildJsonObject { + put("jsonrpc", JsonPrimitive("2.0")) + put("id", rpcRequest.jsonObject["id"]!!) + put("error", buildJsonObject { + put("data", buildJsonObject { + put("ablyError", true) + put("errorInfo", exception.errorInfo.toJsonElement()) + }) + put("message", exception.message) + }) +}.toString() + +fun jsonRpcResponse(rpcRequest: JsonElement, result: JsonElement): String = buildJsonObject { + put("jsonrpc", JsonPrimitive("2.0")) + put("id", rpcRequest.jsonObject["id"]!!) + put("result", result) +}.toString() + +/** + * From Gson to Kotlinx + */ +fun com.google.gson.JsonElement.toKotlin(): JsonElement { + return when { + isJsonArray -> JsonArray(asJsonArray.map { it.toKotlin() }) + isJsonObject -> buildJsonObject { + val obj = asJsonObject + obj.keySet().forEach { key -> put(key, obj[key].toKotlin()) } + } + isJsonNull -> JsonNull + isJsonPrimitive -> { + val primitive = asJsonPrimitive + when { + primitive.isString -> JsonPrimitive(asString) + primitive.isNumber -> JsonPrimitive(asInt) + primitive.isBoolean -> JsonPrimitive(asBoolean) + else -> error("Unknown type") + } + } + else -> error("Unknown type") + } +} + + +suspend fun RealtimeChannel.attachAsync() = + suspendCoroutine { cont -> + val callback = object : CompletionListener { + override fun onSuccess() { + cont.resume(Unit) + } + override fun onError(errorInfo: ErrorInfo) { + cont.resumeWithException(AblyException.fromErrorInfo(errorInfo)) + } + } + attach(callback) + } + +suspend fun RealtimeChannel.detachAsync() = + suspendCoroutine { cont -> + val callback = object : CompletionListener { + override fun onSuccess() { + cont.resume(Unit) + } + override fun onError(errorInfo: ErrorInfo) { + cont.resumeWithException(AblyException.fromErrorInfo(errorInfo)) + } + } + detach(callback) + } diff --git a/adapter/src/main/resources/logback.xml b/adapter/src/main/resources/logback.xml new file mode 100644 index 000000000..0490f9d76 --- /dev/null +++ b/adapter/src/main/resources/logback.xml @@ -0,0 +1,10 @@ + + + + %d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + +