Skip to content

Subscription listeners per subscription + support for graphql-transport-ws subprotocol #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
41 changes: 35 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -44,7 +44,8 @@ That being said, this is intended to be a primarily code based package, so keep
| WebGL |||

This should work with all platforms (Mono/IL2CPP) except for subscriptions on WebGL.
It makes use of UnityWebRequest where possible, but C# WebSockets are the main issue, so subscriptions will not properly work. If you do not need
It makes use of UnityWebRequest where possible, but C# WebSockets are the main issue, so subscriptions will not properly
work. If you do not need
subscriptions, WebGL will work just fine. Work may be added to support WebGL in the future, but for now, there is no
support.

@@ -84,10 +85,11 @@ var request = new Request
};
var responseType = new { continent = new { name = "" } };
var response = await client.Send(() => responseType, request);
Debug.Log(response.Result.Data.continent.name);
Debug.Log(response.Data.continent.name);
```

SimpleGraphQL also lets you store queries in .graphql files that you must write yourself. It is up to you to make sure they are valid. Many IDEs support this function natively or through plugins.
SimpleGraphQL also lets you store queries in .graphql files that you must write yourself. It is up to you to make sure
they are valid. Many IDEs support this function natively or through plugins.

## Configuration

@@ -124,6 +126,7 @@ public async void QueryOrMutation()
{"variable", "value"}
}),
null,
null,
"authToken",
"Bearer"
);
@@ -210,7 +213,8 @@ public void OnComplete(string result)

# Authentication and Headers

> Depending on your authentication method, it is up to you to ensure that your authentication data and headers are set correctly.
> Depending on your authentication method, it is up to you to ensure that your authentication data and headers are set
> correctly.
### Custom headers and auth tokens are natively supported in SimpleGraphQL. They can be passed in as parameters when calling `Subscribe` or `Send`.

@@ -256,16 +260,41 @@ mutation UpsertScore($user_id: String!, $level: String!, $score: bigint! $metada
}
}

subscription GetScoresForLevel($level: String!) {
query ListLevelScores($level: String!) {
leaderboards(where: {level: {_eq: $level}}) {
user_id
level
score
metadata
}
}
```

### Subscriptions.graphql

```graphql
subscription OnScoresUpdated($level: String!) {
leaderboards(where: {level: {_eq: $level}}) {
user_id
level
score
metadata
}
}

subscription OnAnyScoresUpdated {
leaderboards {
user_id
level
score
metadata
}
}
```

> NOTE: We recommend putting graphQL subscriptions in a separate file. Mixing queries, mutations, and subscriptions
> together in one file may lead to odd/undocumented behavior on various servers.
# Things to Note

- During testing, we found that Unity's version of .NET occasionally has issues with HttpClient and WebSocket. If you
@@ -275,4 +304,4 @@ subscription GetScoresForLevel($level: String!) {
been fixed in a recent .NET version (but we don't have those fixes yet.)

<!-- ## Auth with Hasura
TBA -->
TBA -->
111 changes: 79 additions & 32 deletions Runtime/SimpleGraphQL/GraphQLClient.cs
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
using System.Threading.Tasks;
using JetBrains.Annotations;
using Newtonsoft.Json;
using UnityEngine;

namespace SimpleGraphQL
{
@@ -20,6 +21,9 @@ public class GraphQLClient
public string Endpoint;
public string AuthScheme;

// track the running subscriptions ids
internal HashSet<string> RunningSubscriptions;

public GraphQLClient(
string endpoint,
IEnumerable<Query> queries = null,
@@ -31,6 +35,7 @@ public GraphQLClient(
AuthScheme = authScheme;
SearchableQueries = queries?.ToList();
CustomHeaders = headers;
RunningSubscriptions = new HashSet<string>();
}

public GraphQLClient(GraphQLConfig config)
@@ -39,18 +44,21 @@ public GraphQLClient(GraphQLConfig config)
SearchableQueries = config.Files.SelectMany(x => x.Queries).ToList();
CustomHeaders = config.CustomHeaders.ToDictionary(header => header.Key, header => header.Value);
AuthScheme = config.AuthScheme;
RunningSubscriptions = new HashSet<string>();
}

/// <summary>
/// Send a query!
/// </summary>
/// <param name="request">The request you are sending.</param>
/// <param name="serializerSettings"></param>
/// <param name="headers">Any headers you want to pass</param>
/// <param name="authToken">The authToken</param>
/// <param name="authScheme">The authScheme to be used.</param>
/// <returns></returns>
public async Task<string> Send(
Request request,
JsonSerializerSettings serializerSettings = null,
Dictionary<string, string> headers = null,
string authToken = null,
string authScheme = null
@@ -74,6 +82,7 @@ public async Task<string> Send(
string postQueryAsync = await HttpUtils.PostRequest(
Endpoint,
request,
serializerSettings,
headers,
authToken,
authScheme
@@ -84,23 +93,25 @@ public async Task<string> Send(

public async Task<Response<TResponse>> Send<TResponse>(
Request request,
JsonSerializerSettings serializerSettings = null,
Dictionary<string, string> headers = null,
string authToken = null,
string authScheme = null
)
{
string json = await Send(request, headers, authToken, authScheme);
string json = await Send(request, serializerSettings, headers, authToken, authScheme);
return JsonConvert.DeserializeObject<Response<TResponse>>(json);
}

public async Task<Response<TResponse>> Send<TResponse>(
Func<TResponse> responseTypeResolver,
Request request,
JsonSerializerSettings serializerSettings = null,
Dictionary<string, string> headers = null,
string authToken = null,
string authScheme = null)
{
return await Send<TResponse>(request, headers, authToken, authScheme);
return await Send<TResponse>(request, serializerSettings, headers, authToken, authScheme);
}

/// <summary>
@@ -112,6 +123,21 @@ public void RegisterListener(Action<string> listener)
HttpUtils.SubscriptionDataReceived += listener;
}

public void RegisterListener(string id, Action<string> listener)
{
if (!HttpUtils.SubscriptionDataReceivedPerChannel.ContainsKey(id))
{
HttpUtils.SubscriptionDataReceivedPerChannel[id] = null;
}

HttpUtils.SubscriptionDataReceivedPerChannel[id] += listener;
}

public void RegisterListener(Request request, Action<string> listener)
{
RegisterListener(request.Query.ToMurmur2Hash().ToString(), listener);
}

/// <summary>
/// Unregisters a listener for subscriptions.
/// </summary>
@@ -121,6 +147,19 @@ public void UnregisterListener(Action<string> listener)
HttpUtils.SubscriptionDataReceived -= listener;
}

public void UnregisterListener(string id, Action<string> listener)
{
if (HttpUtils.SubscriptionDataReceivedPerChannel.ContainsKey(id))
{
HttpUtils.SubscriptionDataReceivedPerChannel[id] -= listener;
}
}

public void UnregisterListener(Request request, Action<string> listener)
{
UnregisterListener(request.Query.ToMurmur2Hash().ToString(), listener);
}

/// <summary>
/// Subscribe to a query in GraphQL.
/// </summary>
@@ -138,28 +177,7 @@ public async Task<bool> Subscribe(
string protocol = "graphql-ws"
)
{
if (CustomHeaders != null)
{
if (headers == null) headers = new Dictionary<string, string>();

foreach (KeyValuePair<string, string> header in CustomHeaders)
{
headers.Add(header.Key, header.Value);
}
}

if (authScheme == null)
{
authScheme = AuthScheme;
}

if (!HttpUtils.IsWebSocketReady())
{
// Prepare the socket before continuing.
await HttpUtils.WebSocketConnect(Endpoint, headers, authToken, authScheme, protocol);
}

return await HttpUtils.WebSocketSubscribe(request.Query.ToMurmur2Hash().ToString(), request);
return await Subscribe(request.Query.ToMurmur2Hash().ToString(), request, headers, authToken, authScheme, protocol);
}

/// <summary>
@@ -198,11 +216,27 @@ public async Task<bool> Subscribe(

if (!HttpUtils.IsWebSocketReady())
{
Debug.Log("websocket not ready: open connection");
// Prepare the socket before continuing.
await HttpUtils.WebSocketConnect(Endpoint, headers, authToken, authScheme, protocol);
}

return await HttpUtils.WebSocketSubscribe(id, request);
bool success = await HttpUtils.WebSocketSubscribe(id, request);
if (success)
{
RunningSubscriptions.Add(id);
}
else
{
// if no other subscriptions exist, close connection again
if (RunningSubscriptions.Count == 0)
{
Debug.Log("No running subscription remain: close connection");
await HttpUtils.WebSocketDisconnect();
}
}
return success;

}


@@ -212,13 +246,7 @@ public async Task<bool> Subscribe(
/// <param name="request"></param>
public async Task Unsubscribe(Request request)
{
if (!HttpUtils.IsWebSocketReady())
{
// Socket is already apparently closed, so this wouldn't work anyways.
return;
}

await HttpUtils.WebSocketUnsubscribe(request.Query.ToMurmur2Hash().ToString());
await Unsubscribe(request.Query.ToMurmur2Hash().ToString());
}

/// <summary>
@@ -233,7 +261,26 @@ public async Task Unsubscribe(string id)
return;
}

// when unsubscribing an unexisting id (or already unsubscribed)
if (!RunningSubscriptions.Contains(id))
{
Debug.LogError("Attempted to unsubscribe to a query without subscribing first!");
return;
}

// TODO: what if this fails?
await HttpUtils.WebSocketUnsubscribe(id);

RunningSubscriptions.Remove(id);

// if no active subscriptions remain, stop the connection
// this will also stop the update loop
if (RunningSubscriptions.Count == 0)
{
Debug.Log("No running subscription remain: close connection");
await HttpUtils.WebSocketDisconnect();
Debug.Log("connection closed");
}
}

/// <summary>
147 changes: 99 additions & 48 deletions Runtime/SimpleGraphQL/HttpUtils.cs
Original file line number Diff line number Diff line change
@@ -22,11 +22,14 @@ public static class HttpUtils
/// </summary>
public static event Action<string> SubscriptionDataReceived;

public static Dictionary<string, Action<string>> SubscriptionDataReceivedPerChannel;

[RuntimeInitializeOnLoadMethod(RuntimeInitializeLoadType.BeforeSceneLoad)]
public static void PreInit()
{
_webSocket?.Dispose();
SubscriptionDataReceived = null;
SubscriptionDataReceivedPerChannel = new Dictionary<string, Action<string>>();
}

/// <summary>
@@ -45,19 +48,21 @@ public static void Dispose()
/// <param name="request">The GraphQL request</param>
/// <param name="authScheme">The authentication scheme to be used.</param>
/// <param name="authToken">The actual auth token.</param>
/// <param name="serializerSettings"></param>
/// <param name="headers">Any headers that should be passed in</param>
/// <returns></returns>
public static async Task<string> PostRequest(
string url,
Request request,
JsonSerializerSettings serializerSettings = null,
Dictionary<string, string> headers = null,
string authToken = null,
string authScheme = null
)
{
var uri = new Uri(url);

byte[] payload = request.ToBytes();
byte[] payload = request.ToBytes(serializerSettings);

using (var webRequest = new UnityWebRequest(uri, "POST")
{
@@ -138,10 +143,30 @@ public static async Task WebSocketConnect(
_webSocket = new ClientWebSocket();
_webSocket.Options.AddSubProtocol(protocol);

var payload = new Dictionary<string, string>();

if (protocol == "graphql-transport-ws")
{
payload["content-type"] = "application/json";
}
else
{
_webSocket.Options.SetRequestHeader("Content-Type", "application/json");
}

if (authToken != null)
_webSocket.Options.SetRequestHeader("Authorization", $"{authScheme} {authToken}");
{
if (protocol == "graphql-transport-ws")
{
// set Authorization as payload
payload["Authorization"] = $"{authScheme} {authToken}";
}
else
{
_webSocket.Options.SetRequestHeader("Authorization", $"{authScheme} {authToken}");
}
}

_webSocket.Options.SetRequestHeader("Content-Type", "application/json");

if (headers != null)
{
@@ -156,12 +181,23 @@ public static async Task WebSocketConnect(
Debug.Log("Websocket is connecting");
await _webSocket.ConnectAsync(uri, CancellationToken.None);

string json = JsonConvert.SerializeObject(
new
{
type = "connection_init",
payload = payload
},
Formatting.None,
new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore
}
);

Debug.Log("Websocket is starting");
// Initialize the socket at the server side
await _webSocket.SendAsync(
new ArraySegment<byte>(
Encoding.UTF8.GetBytes(@"{""type"":""connection_init"",""payload"": {}}")
),
new ArraySegment<byte>(Encoding.UTF8.GetBytes(json)),
WebSocketMessageType.Text,
true,
CancellationToken.None
@@ -190,6 +226,7 @@ public static async Task WebSocketDisconnect()
}

await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Socket closed.", CancellationToken.None);
Dispose();
}

/// <summary>
@@ -210,7 +247,7 @@ public static async Task<bool> WebSocketSubscribe(string id, Request request)
new
{
id,
type = "start",
type = _webSocket.SubProtocol == "graphql-transport-ws" ? "subscribe" : "start",
payload = new
{
query = request.Query,
@@ -248,8 +285,10 @@ public static async Task WebSocketUnsubscribe(string id)
return;
}

string type = _webSocket.SubProtocol == "graphql-transport-ws" ? "complete" : "stop";

await _webSocket.SendAsync(
new ArraySegment<byte>(Encoding.UTF8.GetBytes($@"{{""type"":""stop"",""id"":""{id}""}}")),
new ArraySegment<byte>(Encoding.UTF8.GetBytes($@"{{""type"":""{type}"",""id"":""{id}""}}")),
WebSocketMessageType.Text,
true,
CancellationToken.None
@@ -260,8 +299,14 @@ private static async void WebSocketUpdate()
{
while (true)
{
ArraySegment<byte> buffer;
buffer = WebSocket.CreateClientBuffer(1024, 1024);
// break the loop as soon as the websocket was closed
if (!IsWebSocketReady())
{
Debug.Log("websocket was closed, stop the loop");
break;
}

ArraySegment<byte> buffer = WebSocket.CreateClientBuffer(1024, 1024);

if (buffer.Array == null)
{
@@ -292,57 +337,63 @@ private static async void WebSocketUpdate()
}

var msgType = (string)jsonObj["type"];
var id = (string)jsonObj["id"];
switch (msgType)
{
case "connection_error":
{
throw new WebSocketException("Connection error. Error: " + jsonResult);
}
{
throw new WebSocketException("Connection error. Error: " + jsonResult);
}
case "connection_ack":
{
Debug.Log("Websocket connection acknowledged.");
continue;
}
{
Debug.Log($"Websocket connection acknowledged ({id}).");
continue;
}
case "data":
case "next":
{
JToken jToken = jsonObj["payload"];

if (jToken != null)
{
SubscriptionDataReceived?.Invoke(jToken.ToString());
}
JToken jToken = jsonObj["payload"];

continue;
}
if (jToken != null)
{
SubscriptionDataReceived?.Invoke(jToken.ToString());

if (id != null)
{
SubscriptionDataReceivedPerChannel?[id]?.Invoke(jToken.ToString());
}
}

continue;
}
case "error":
{
throw new WebSocketException("Handshake error. Error: " + jsonResult);
}
{
throw new WebSocketException("Handshake error. Error: " + jsonResult);
}
case "complete":
{
Debug.Log("Server sent complete, it's done sending data.");
break;
}
{
Debug.Log("Server sent complete, it's done sending data.");
continue;
}
case "ka":
{
// stayin' alive, stayin' alive
continue;
}
{
// stayin' alive, stayin' alive
continue;
}
case "subscription_fail":
{
throw new WebSocketException("Subscription failed. Error: " + jsonResult);
}
{
throw new WebSocketException("Subscription failed. Error: " + jsonResult);
}
case "ping":
{
await _webSocket.SendAsync(
new ArraySegment<byte>(Encoding.UTF8.GetBytes($@"{{""type"":""pong""}}")),
WebSocketMessageType.Text,
true,
CancellationToken.None
);
continue;
}
{
await _webSocket.SendAsync(
new ArraySegment<byte>(Encoding.UTF8.GetBytes($@"{{""type"":""pong""}}")),
WebSocketMessageType.Text,
true,
CancellationToken.None
);
continue;
}
}

break;
20 changes: 14 additions & 6 deletions Runtime/SimpleGraphQL/Request.cs
Original file line number Diff line number Diff line change
@@ -26,18 +26,26 @@ public override string ToString()
[PublicAPI]
public static class RequestExtensions
{
public static byte[] ToBytes(this Request request)
private static JsonSerializerSettings defaultSerializerSettings = new JsonSerializerSettings
{ NullValueHandling = NullValueHandling.Ignore };

public static byte[] ToBytes(this Request request, JsonSerializerSettings serializerSettings = null)
{
return Encoding.UTF8.GetBytes(request.ToJson());
return Encoding.UTF8.GetBytes(request.ToJson(false, serializerSettings));
}

public static string ToJson(this Request request,
bool prettyPrint = false)
public static string ToJson(this Request request, bool prettyPrint = false,
JsonSerializerSettings serializerSettings = null)
{
if (serializerSettings == null)
{
serializerSettings = defaultSerializerSettings;
}

return JsonConvert.SerializeObject
( request,
(request,
prettyPrint ? Formatting.Indented : Formatting.None,
new JsonSerializerSettings {NullValueHandling = NullValueHandling.Ignore}
serializerSettings
);
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "com.lastabyss.simplegraphql",
"version": "1.3.2",
"version": "2.1.0",
"displayName": "SimpleGraphQL",
"description": "A simple graphQL client that allows one to use .graphql files (or code) for queries, mutations, and subscriptions with Unity.",
"unity": "2019.4",