Skip to content

WIP Controlling EmuHawk through Websocket Server and passthrough for Lua #4156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 24 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1fb1be1
rename WebSocketServer to WebSocketClient
austinmilt Dec 1, 2024
278116f
add minimal websocket server with cli configs
austinmilt Dec 1, 2024
2d02269
websocket message types with topics echo, error, registration
austinmilt Dec 2, 2024
7050cb5
null enable flags
austinmilt Dec 3, 2024
f8d7ed3
fix serialization of error enum
austinmilt Dec 4, 2024
a04072f
revert formatting changes
austinmilt Dec 5, 2024
c3b1cce
revert formatting changes
austinmilt Dec 5, 2024
c998719
get input options request
austinmilt Dec 6, 2024
c16c51c
minimal inputs
austinmilt Dec 6, 2024
423f6e9
add request ID to single-client messages
austinmilt Dec 26, 2024
331ed0d
toggle sticky state in StickyController
austinmilt Dec 26, 2024
bb19d92
emulator speed change message
austinmilt Dec 26, 2024
fa19719
reboot core message
austinmilt Dec 26, 2024
fb77cf8
fix DecreaseSpeed
austinmilt Dec 27, 2024
b83673e
support setting button toggles
austinmilt Dec 27, 2024
c56c809
add save state command
austinmilt Dec 27, 2024
0f7a58f
improve input processing
austinmilt Dec 29, 2024
61fb786
fix sending request ID in emulator command response
austinmilt Dec 29, 2024
b96e99c
lock concurrent stick access
austinmilt Dec 30, 2024
c1bde68
custom topic messaging
austinmilt Jan 1, 2025
14ba2d2
hooking up handlers to lua
austinmilt Jan 1, 2025
a2fd4d0
add optional request ID to custom topics
austinmilt Jan 1, 2025
e447bdf
cleanup websockets a little
austinmilt Jan 1, 2025
e04103e
add custom topic broadcast registration
austinmilt Jan 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/BizHawk.Client.Common/Api/Classes/CommApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ namespace BizHawk.Client.Common
{
public sealed class CommApi : ICommApi
{
private static readonly WebSocketServer _wsServer = new WebSocketServer();
private static readonly WebSocketClient _wsClient = new();

private readonly (HttpCommunication? HTTP, MemoryMappedFiles MMF, SocketServer? Sockets) _networkingHelpers;
private readonly (HttpCommunication? HTTP, MemoryMappedFiles MMF, SocketServer? Sockets, WebSocketServer? WebSocketServer) _networkingHelpers;

public HttpCommunication? HTTP => _networkingHelpers.HTTP;

public MemoryMappedFiles MMF => _networkingHelpers.MMF;

public SocketServer? Sockets => _networkingHelpers.Sockets;

public WebSocketServer WebSockets => _wsServer;
public WebSocketClient WebSockets => _wsClient;

public WebSocketServer? WebSocketServer => _networkingHelpers.WebSocketServer;

public CommApi(IMainFormForApi mainForm) => _networkingHelpers = mainForm.NetworkingHelpers;

Expand Down
4 changes: 1 addition & 3 deletions src/BizHawk.Client.Common/Api/Interfaces/ICommApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ public interface ICommApi : IExternalApi

SocketServer? Sockets { get; }

#if ENABLE_WEBSOCKETS
WebSocketServer WebSockets { get; }
#endif
WebSocketServer? WebSocketServer { get; }

string? HttpTest();

Expand Down
9 changes: 9 additions & 0 deletions src/BizHawk.Client.Common/Api/WebSocketClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#nullable enable

namespace BizHawk.Client.Common
{
public sealed class WebSocketClient
{
public ClientWebSocketWrapper Open(Uri uri) => new(uri);
}
}
337 changes: 333 additions & 4 deletions src/BizHawk.Client.Common/Api/WebSocketServer.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,343 @@
#nullable enable

using System.Net;
using System.Threading.Tasks;
using System.Net.WebSockets;
using System.Threading;
using System.Text;
using System.Collections.Generic;
using BizHawk.Client.Common.Websocket.Messages;
using BizHawk.Common.CollectionExtensions;
using BizHawk.Client.Common.Websocket;

namespace BizHawk.Client.Common
{
public sealed class WebSocketServer
{
public ClientWebSocketWrapper Open(
Uri uri,
CancellationToken cancellationToken = default/* == CancellationToken.None */)
=> new(uri, cancellationToken);
private static readonly HashSet<Topic> forcedRegistrationTopics = [ Topic.Error, Topic.Registration, Topic.GetInputOptions ];
private readonly HttpListener clientRegistrationListener;
private CancellationToken _cancellationToken = default;
private bool _running = false;
private readonly Dictionary<string, WebSocket> clients = [ ];
private readonly Dictionary<Topic, HashSet<string>> topicRegistrations = [ ];
private readonly Dictionary<string, HashSet<string>> customTopicRegistrations = [ ];
private readonly Dictionary<Topic, HashSet<Func<RequestMessageWrapper, Task<ResponseMessageWrapper?>>>> handlers = [ ];

private readonly Dictionary<string, HashSet<Func<RequestMessageWrapper, Task<ResponseMessageWrapper?>>>> customTopicHandlers = [ ];

/// <param name="host">
/// host address to register for listening to connections, defaults to <see cref="IPAddress.Loopback"/>>
/// </param>
/// <param name="port">port to register for listening to connections</param>
public WebSocketServer(IPAddress? host = null, int port = 3333)
{
clientRegistrationListener = new();
clientRegistrationListener.Prefixes.Add($"http://{host}:{port}/");
}

/// <summary>
/// Stops the server. Alternatively, use the cancellation token passed into <see cref="Start"/>.
/// The server can be restarted by calling <see cref="Start"/> again.
/// </summary>
public void Stop()
{
var cancellationTokenSource = new CancellationTokenSource();
_cancellationToken = cancellationTokenSource.Token;
cancellationTokenSource.Cancel();
_running = false;
}

/// <summary>
/// Starts the websocket server at the configured address and registers clients.
/// </summary>
/// <param name="cancellationToken">optional cancellation token to stop the server</param>
/// <returns>async task for the server loop</returns>
public async Task Start(CancellationToken cancellationToken = default)
{
if (_running)
{
throw new InvalidOperationException("Server has already been started");
}
_cancellationToken = cancellationToken;
_running = true;

clientRegistrationListener.Start();
await ListenForAndRegisterClients();
}

private async Task ListenForAndRegisterClients()
{
while (_running && !_cancellationToken.IsCancellationRequested)
{
var context = await clientRegistrationListener.GetContextAsync();
if (context is null) return;

if (!context.Request.IsWebSocketRequest)
{
context.Response.Abort();
return;
}

var webSocketContext = await context.AcceptWebSocketAsync(subProtocol: null);
if (webSocketContext is null) return;
RegisterClient(webSocketContext.WebSocket);
}
}

private void RegisterClient(WebSocket newClient)
{
string clientId = Guid.NewGuid().ToString();
clients.Add(clientId, newClient);
_ = Task.Run(() => ClientMessageReceiveLoop(clientId), _cancellationToken);
}

private async Task ClientMessageReceiveLoop(string clientId)
{
byte[] buffer = new byte[1024];
var messageStringBuilder = new StringBuilder(2048);
var client = clients[clientId];
while (client.State == WebSocketState.Open && !_cancellationToken.IsCancellationRequested)
{
ArraySegment<byte> messageBuffer = new(buffer);
var receiveResult = await client.ReceiveAsync(messageBuffer, _cancellationToken);
if (receiveResult.Count == 0)
return;

messageStringBuilder.Append(Encoding.ASCII.GetString(buffer, 0, receiveResult.Count));
if (receiveResult.EndOfMessage)
{
string messageString = messageStringBuilder.ToString();
messageStringBuilder = new StringBuilder(2048);

try
{
var request = JsonSerde.Deserialize<RequestMessageWrapper>(messageString);
await HandleRequest(clientId, request);
}
catch (Exception e)
{
// TODO proper logging
Console.WriteLine("Error deserializing message {0} produced error {1}", messageString, e);
await SendClientGenericError(clientId);
}
}
}
}

/// <summary>
/// Broadcasts a message to all clients registered on the message's topic.
/// </summary>
/// <param name="message"> message to broadcast</param>
/// <returns>task that will complete when all clients have been sent the message</returns>
public async Task BroadcastMessage(ResponseMessageWrapper message)
{
if (message.Topic == Topic.Custom) {
await BroadcastCustomMessage(message);
} else {
var registeredClients = topicRegistrations[message.Topic];
if (registeredClients != null) {
var tasks = new Task[registeredClients.Count];
int i = 0;
foreach(string clientId in registeredClients)
{
tasks[i++] = SendClientMessage(clientId, message);
}
await Task.WhenAll(tasks);
}
}
}

private async Task BroadcastCustomMessage(ResponseMessageWrapper message)
{
var customRegistrants = new HashSet<string>(topicRegistrations[message.Topic]);
if (message.Custom == null)
{
throw new ArgumentNullException(null, "message.Custom");
}
var subtopicRegistrants = customTopicRegistrations[message.Custom.Value.SubTopic];
if ((customRegistrants != null) && (subtopicRegistrants != null)) {
customRegistrants.IntersectWith(subtopicRegistrants);
var tasks = new Task[customRegistrants.Count];
int i = 0;
foreach (string clientId in customRegistrants) {
tasks[i++] = SendClientMessage(clientId, message);
}
await Task.WhenAll(tasks);
}
}

private async Task HandleRequest(string clientId, RequestMessageWrapper request)
{
try
{
switch (request.Topic)
{
case Topic.Error:
// clients arent allowed to publish to this topic
await SendClientGenericError(clientId);
break;

case Topic.Registration:
await HandleRegistrationRequest(clientId, request.Registration!.Value);
break;

case Topic.Echo:
await HandleEchoRequest(clientId, request.Echo!.Value);
break;

case Topic.GetInputOptions:
await HandleInputOptionsRequest(clientId, request);
break;

case Topic.Input:
await HandleInputRequest(clientId, request);
break;

case Topic.EmulatorCommand:
await HandleEmulatorCommandRequest(clientId, request);
break;

case Topic.Custom:
await HandleCustomRequest(clientId, request);
break;
}

}
catch (Exception e)
{
// this could happen if, for instance, the client sent a registration request to the echo topic, such
// that we tried to access the wrong field of the wrapper
// TODO proper logging
Console.WriteLine("Error handling message {0}", e);
await SendClientGenericError(clientId);
}
}

private async Task HandleRegistrationRequest(string clientId, RegistrationRequestMessage request)
{
foreach (Topic topic in Enum.GetValues(typeof(Topic)))
{
if (forcedRegistrationTopics.Contains(topic))
{
// we dont need to keep track of topics that clients must be registered for.
continue;
}
else if (request.Topics.Contains(topic))
{
_ = topicRegistrations.GetValueOrPut(topic, (_) => [ ]).Add(clientId);
}
else
{
_ = topicRegistrations.GetValueOrDefault(topic, [ ])?.Remove(clientId);
}
}

// limiting registrations to those topics that handlers have registered under the custom
// topics prevents clients from blowing up the custom topics, and also requires them
// to subscribe after the custom handlers have started.
foreach (string topic in customTopicRegistrations.Keys)
{
if (request.CustomTopics.Contains(topic))
{
_ = customTopicRegistrations.GetValueOrPut(topic, (_) => [ ]).Add(clientId);
}
else
{
_ = customTopicRegistrations.GetValueOrDefault(topic, [ ])?.Remove(clientId);
}
}

var registeredTopics = request.Topics;
var registeredCustomTopics = request.CustomTopics;
registeredTopics.AddRange(forcedRegistrationTopics);
var responseMessage = new ResponseMessageWrapper(new RegistrationResponseMessage(
request.RequestId,
registeredTopics, registeredCustomTopics
));
await SendClientMessage(clientId, responseMessage);
}

private async Task HandleEchoRequest(string clientId, EchoRequestMessage request)
{
if (topicRegistrations.GetValueOrDefault(Topic.Echo, [ ])?.Contains(clientId) ?? false)
{
await SendClientMessage(
clientId,
new ResponseMessageWrapper(new EchoResponseMessage(request.RequestId, request.Message))
);
}
}

private async Task HandleInputOptionsRequest(string clientId, RequestMessageWrapper request)
{
foreach (var handler in handlers.GetValueOrDefault(Topic.GetInputOptions, [ ])!)
{
var response = await handler(request);
if (response is not null) {
await SendClientMessage(clientId, response.Value);
}
}
}

private async Task HandleInputRequest(string clientId, RequestMessageWrapper request)
{
foreach (var handler in handlers.GetValueOrDefault(Topic.Input, [ ])!)
{
var response = await handler(request);
if (response is not null) {
await SendClientMessage(clientId, response.Value);
}
}
}

private async Task HandleEmulatorCommandRequest(string clientId, RequestMessageWrapper request)
{
foreach (var handler in handlers.GetValueOrDefault(Topic.EmulatorCommand, [ ])!)
{
var response = await handler(request);
if (response is not null) {
await SendClientMessage(clientId, response.Value);
}
}
}

private async Task HandleCustomRequest(string clientId, RequestMessageWrapper request)
{
string? customTopic = (request.Custom?.SubTopic) ?? throw new ArgumentNullException(null, "message.Custom.SubTopic");
foreach (var handler in customTopicHandlers.GetValueOrDefault(customTopic, [ ])!)
{
var response = await handler(request);
if (response is not null) {
await SendClientMessage(clientId, response.Value);
}
}
}

// clients always get error topics
private async Task SendClientGenericError(string clientId) => await SendClientMessage(
clientId, new ResponseMessageWrapper(new ErrorMessage(ErrorType.UnknownRequest))
);

private async Task SendClientMessage(string clientId, ResponseMessageWrapper message)
{
await clients[clientId].SendAsync(
JsonSerde.Serialize(message),
WebSocketMessageType.Text,
endOfMessage: true,
_cancellationToken
);
}

public void RegisterHandler(Topic topic, Func<RequestMessageWrapper, Task<ResponseMessageWrapper?>> handler) =>
_ = handlers.GetValueOrPut(topic, (_) => [ ]).Add(handler);

public void RegisterCustomHandler(string customTopic, Func<RequestMessageWrapper, Task<ResponseMessageWrapper?>> handler)
{
_ = customTopicHandlers.GetValueOrPut(customTopic, (_) => [ ]).Add(handler);
_ = customTopicRegistrations.GetValueOrPut(customTopic, (_) => [ ]);
}

public void RegisterCustomBroadcastTopic(string customTopic) =>
_ = customTopicRegistrations.GetValueOrPut(customTopic, (_) => [ ]);
}
}
Loading