Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ws proxy server and client #75

Merged
merged 8 commits into from
Dec 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDATCCAekCFANi/vgvNCE/QMwmMvTqYzJ8KUdSMA0GCSqGSIb3DQEBCwUAMD0x
CzAJBgNVBAYTAlVTMQswCQYDVQQIDAJXQTEQMA4GA1UEBwwHU2VhdHRsZTEPMA0G
A1UECgwGSHVza2x5MB4XDTIzMTIwNzAxMzQyMVoXDTI0MDEwNjAxMzQyMVowPTEL
MAkGA1UEBhMCVVMxCzAJBgNVBAgMAldBMRAwDgYDVQQHDAdTZWF0dGxlMQ8wDQYD
VQQKDAZIdXNrbHkwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDaRoG5
iMYVNQBLQtyc/98xisbWT7XEbRhWkxSgGep/G/1by/Y0YiI79fFK9Bw7mlabSmmj
VnW5fI/iSGXp7WO1CWSwT5QNXpUnJ3igal38YhR1H3OCgi9VlJOiKLUi0omsKy48
fPp7mGUOxR4dSGdXMEobrOPH0PxpZpOUXmK6KKe+/iOxhw2uqN7B88vcSd9dwOEP
JgSrXU5iTrcNqqHEr+/Dj8tKXKETNly5Gi6233owqDUggcZ0wjXeysR2uM0GV25c
l7Hpar1csx6sYWfccmHoXdhMmAdyCS4dAGwN+J/jxqtb97jWqUSe8R4o/sIzu/fq
bpTB8T/Dx1fJqxXrAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAALXmKSzQNzuS9mj
FyUf9duuRPGcnRG95BBup0WQyyh8oQY8yEGPd1GnKmvBYIrLbdcuGA84IEUUC1ih
MuWrfT1KZ+7KKC6cRD7AsXHZl1hh4xlssiz6muBrhJzSvs6jhBaHnQM72FVizMjC
ET/B8GukZh2ba9nbg0JPn+IxgrDbMfBBtOPXIfZDpHsGHzoBPjrZjkHAXV8PaWWA
k57FDqOjnl/h26vujBQgOZScTvLfvcIG+h/GQwd5VRAH9wJdo6WFQlBZ9SUvWOH1
nWLbx0ohVFAOE7RV1d50bQjoASBz6zHIhR3Odd2oovtqb+cHAlhbv+I9Mkr6U6QW
VdXNs7M=
-----END CERTIFICATE-----
28 changes: 28 additions & 0 deletions key.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDaRoG5iMYVNQBL
Qtyc/98xisbWT7XEbRhWkxSgGep/G/1by/Y0YiI79fFK9Bw7mlabSmmjVnW5fI/i
SGXp7WO1CWSwT5QNXpUnJ3igal38YhR1H3OCgi9VlJOiKLUi0omsKy48fPp7mGUO
xR4dSGdXMEobrOPH0PxpZpOUXmK6KKe+/iOxhw2uqN7B88vcSd9dwOEPJgSrXU5i
TrcNqqHEr+/Dj8tKXKETNly5Gi6233owqDUggcZ0wjXeysR2uM0GV25cl7Hpar1c
sx6sYWfccmHoXdhMmAdyCS4dAGwN+J/jxqtb97jWqUSe8R4o/sIzu/fqbpTB8T/D
x1fJqxXrAgMBAAECggEAI6btjHD3LcKU9DYNE8XFXnG07Y9ieJ17IrTuYwIop69a
MDK92auHvPR8f4okzGV2rPG4FHpMS0o5tDOwFcf1B75riFLPM2nWAem0Dbuh81XP
0pubAd+ivJ9Ch/OPNotd+lWpPS8KuMJZC1MOCqlnW7ni+OdB40LS36JmC49pH8+z
u2+pGq7H77nPkuk72Ma1sAqcyZmqungmpoa0q2x3EBOqq//njk74PJb+V7qcvR9g
lUwdM4Sh4HdhvmZ/C01YPC/4UcztJgowTBbKK6WdpsSwjhP60I3yPBloJr+lYMAX
YQVTrePK4JWqltvL+Bdjlz2VTd8YiXrwW7sknuXRWQKBgQD+kzw0nym9Rpg68Fa1
McWnfxXaFEh/50XY1cK3fPO5J7pdrHnZLHIBi/1q0nCo0YKb/P/8OjObhTnCa4J9
NtGhmju7QU1yTWGgNG+o2NlGrNrAvVwcWUmRyuyNczN3QCak86oteZk3jrDKL8CW
j48buonk7MP5oMp6U6Wj5PGbYwKBgQDbf0KMeSSJX05eqar9MjjHV8eAm4EvrCrZ
/Q8rGYq81dwklB/YZyf26bALSyqznbvp9sCdo58x4XPnHwwe+zWWKrsthZxsF7Fi
jW9BM9TRhvhe5IQWHywxtWvyv5eZNhpkbNLyogm7NOWwUIuE4z3j8bxi883GgoK9
rGjp56fV2QKBgQCPafKo0mF5N6Pa0DqIqRloWre8u2B8bZVzqjiflczXqgHbc6bR
KbCwHmUNILBG6oBh0A2F0mPwYQVA+b/xOkiueWzc+NTgZ6dv0Rp2THNa1VYG7qZN
ch93+pF4vkVoEMO0eXCNXctq+P+vZ2dfalB8loHIbXmZz3NBpo3R3tAdcQKBgHjb
MFRSW5i7/lXHDBwPrA2uum2IsfAC1zFh0hlEHgztoCIP4RzxZ6Lfdwww3hk6D366
W8IwlnYLkhq/EJh6bz3410kwWTl3Ljd6crivBk48B8OQBV582YOhRgfKEHnOvWdw
OBJawArxDEsxfjC0Qp6gur6tSS81KzGunbG02Me5AoGAKwTP+5uB0kdE24AvEDND
1ozkss9Dl4wjF0ucTzqwOT2iPlyM6sYEQzbjeMkpyGLVJbIWpNTSvzHj389mImRt
i2URj3r/EIrdgSLKXzxMyJc+ToM705v4OgZYAuSLQSTaSBH6C1Kuw+l8UCuWK6/p
i4X9qUqQUd9bT/Zh9nGs9G0=
-----END PRIVATE KEY-----
15 changes: 11 additions & 4 deletions src/client/realWsJsonClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
RawPayloadResponse,
WsJsonRawMessage,
} from "./tdaWsJsonTypes";
import { Constructor, debugLog, findByTypeOrThrow } from "./util";
import { Constructor, debugLog, findByTypeOrThrow, throwError } from "./util";
import MulticastIterator from "obgen/multicastIterator";
import BufferedIterator from "obgen/bufferedIterator";
import { isConnectionResponse, isLoginResponse } from "./messageTypeHelpers";
Expand Down Expand Up @@ -92,7 +92,7 @@ export enum ChannelState {
ERROR,
}

const logger = debug("ws");
const logger = debug("realWsJsonClient");

const messageHandlers: WebSocketApiMessageHandler<never, any>[] = [
new CancelAlertMessageHandler(),
Expand Down Expand Up @@ -121,9 +121,9 @@ export default class RealWsJsonClient implements WsJsonClient {
private buffer = new BufferedIterator<ParsedWebSocketResponse>();
private iterator = new MulticastIterator(this.buffer);
private state = ChannelState.DISCONNECTED;
private accessToken?: string;

constructor(
private readonly accessToken: string,
private readonly socket = new WebSocket(
"wss://services.thinkorswim.com/Services/WsJson",
{
Expand All @@ -148,7 +148,10 @@ export default class RealWsJsonClient implements WsJsonClient {
)
) {}

async authenticate(): Promise<RawLoginResponseBody | null> {
async authenticate(
accessToken: string
): Promise<RawLoginResponseBody | null> {
this.accessToken = accessToken;
const { state } = this;
switch (state) {
case ChannelState.DISCONNECTED:
Expand Down Expand Up @@ -189,6 +192,9 @@ export default class RealWsJsonClient implements WsJsonClient {
logger("⬅️\treceived %O", message);
if (isConnectionResponse(message)) {
const handler = findByTypeOrThrow(messageHandlers, LoginMessageHandler);
if (!accessToken) {
throwError("access token is required, cannot authenticate");
}
this.sendMessage(handler.buildRequest(accessToken));
} else if (isLoginResponse(message)) {
this.handleLoginResponse(message, resolve, reject);
Expand Down Expand Up @@ -356,6 +362,7 @@ export default class RealWsJsonClient implements WsJsonClient {
} else {
this.state = ChannelState.ERROR;
reject(`Login failed: ${body.message}`);
this.disconnect();
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/client/wsJsonClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ import {
} from "./types/alertTypes";
import { MarketDepthResponse } from "./services/marketDepthMessageHandler";
import { GetWatchlistResponse } from "./services/getWatchlistMessageHandler";
import { Disposable } from "../server/disposable";

export interface WsJsonClient {
authenticate(): Promise<RawLoginResponseBody | null>;
export interface WsJsonClient extends Disposable {
authenticate(accessToken: string): Promise<RawLoginResponseBody | null>;

isConnected(): boolean;

Expand Down Expand Up @@ -81,8 +82,6 @@ export interface WsJsonClient {

userProperties(): Promise<UserPropertiesResponse>;

disconnect(): void;

marketDepth(symbol: string): AsyncIterable<MarketDepthResponse>;

watchlist(watchlistId: number): Promise<GetWatchlistResponse>;
Expand Down
14 changes: 9 additions & 5 deletions src/client/wsJsonClientAuth.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { OAuth2Client, OAuth2Token } from "@badgateway/oauth2-client";
import { WsJsonClient } from "./wsJsonClient";
import debug from "debug";

const logger = debug("wsJsonClientAuth");

export default class WsJsonClientAuth {
private readonly oauthClient: OAuth2Client;

constructor(
private readonly wsJsonClientFactory: (accessToken: string) => WsJsonClient,
private readonly wsJsonClientFactory: () => WsJsonClient,
clientId: string,
originalFetch: typeof fetch
) {
Expand All @@ -22,21 +25,22 @@ export default class WsJsonClientAuth {
}

async authenticateWithRetry(token: OAuth2Token): Promise<AuthResult> {
const client = this.wsJsonClientFactory(token.accessToken);
const client = this.wsJsonClientFactory();
try {
await client.authenticate();
await client.authenticate(token.accessToken);
return { token, client };
} catch (e) {
return await this.refreshToken(token);
}
}

async refreshToken(token: OAuth2Token): Promise<AuthResult> {
logger("attempting token refresh");
const { oauthClient } = this;
try {
const newToken = await oauthClient.refreshToken(token);
const client = this.wsJsonClientFactory(newToken.accessToken);
await client.authenticate();
const client = this.wsJsonClientFactory();
await client.authenticate(newToken.accessToken);
// oauthClient.refreshToken() doesn't return the refresh token so we need to re-add it
const refreshedToken = { ...newToken, refreshToken: token.refreshToken };
return { token: refreshedToken, client };
Expand Down
226 changes: 226 additions & 0 deletions src/client/wsJsonClientProxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
import { WsJsonClient } from "./wsJsonClient";
import { PositionsResponse } from "./services/positionsMessageHandler";
import { RawLoginResponseBody } from "./services/loginMessageHandler";
import {
CancelAlertResponse,
CreateAlertResponse,
LookupAlertsResponse,
} from "./types/alertTypes";
import { CancelOrderResponse } from "./services/cancelOrderMessageHandler";
import {
ChartRequestParams,
ChartResponse,
} from "./services/chartMessageHandler";
import { CreateAlertRequestParams } from "./services/createAlertMessageHandler";
import { MarketDepthResponse } from "./services/marketDepthMessageHandler";
import { OptionChainResponse } from "./services/optionSeriesMessageHandler";
import {
OptionChainDetailsRequest,
OptionChainDetailsResponse,
} from "./services/optionChainDetailsMessageHandler";
import { OptionSeriesQuotesResponse } from "./services/optionSeriesQuotesMessageHandler";
import {
OptionQuotesRequestParams,
OptionQuotesResponse,
} from "./services/optionQuotesMessageHandler";
import {
PlaceLimitOrderRequestParams,
PlaceOrderSnapshotResponse,
} from "./services/placeOrderMessageHandler";
import { QuotesResponse } from "./services/quotesMessageHandler";
import { OrderEventsResponse } from "./services/orderEventsMessageHandler";
import { InstrumentSearchResponse } from "./services/instrumentSearchMessageHandler";
import { UserPropertiesResponse } from "./services/userPropertiesMessageHandler";
import { GetWatchlistResponse } from "./services/getWatchlistMessageHandler";
import WebSocket from "isomorphic-ws";
import MulticastIterator from "obgen/multicastIterator";
import BufferedIterator from "obgen/bufferedIterator";
import { deferredWrap } from "obgen";
import { throwError } from "./util";
import debug from "debug";
import { ChannelState } from "./realWsJsonClient";
import { isString } from "lodash";

const logger = debug("wsClientProxy");

export const ALL_REQUESTS = [
"authenticate",
"optionChainQuotes",
"disconnect",
] as const;
type RequestType = typeof ALL_REQUESTS;
type Request = RequestType[number];

export type ProxiedRequest = {
request: Request;
args?: any[];
};

export type ProxiedResponse = ProxiedRequest & { response: unknown };

// A WsJsonClient proxy implementation that proxies requests to a WebSocket server using the provided `proxyUrl`.
export default class WsJsonClientProxy implements WsJsonClient {
private state = ChannelState.DISCONNECTED;
private buffer = new BufferedIterator<ProxiedResponse>();
private iterator = new MulticastIterator(this.buffer);
private socket?: WebSocket;

constructor(
private readonly proxyUrl: string,
private readonly options?: any
) {}

accountPositions(_: string): AsyncIterable<PositionsResponse> {
throwError("not implemented");
}

async authenticate(
accessToken: string
): Promise<RawLoginResponseBody | null> {
this.socket = new WebSocket(this.proxyUrl, this.options);
this.state = ChannelState.CONNECTING;
this.buffer = new BufferedIterator<ProxiedResponse>();
this.iterator = new MulticastIterator(this.buffer);
const { buffer, socket } = this;
return new Promise((resolve, reject) => {
socket.onmessage = ({ data }) => {
buffer.emit(JSON.parse(data as string) as ProxiedResponse);
};
socket.onopen = () => {
logger("proxy ws connection opened");
this.state = ChannelState.CONNECTED;
this.doAuthenticate(accessToken).then((res) => {
logger("proxy ws authentication response: %O", res);
if (isString(res) && res.includes("NOT_AUTHORIZED")) {
reject(res);
} else {
resolve(res);
}
});
};
socket.onclose = (event) => {
this.state = ChannelState.DISCONNECTED;
logger("proxy ws connection closed: ", event?.reason);
reject(event?.reason);
};
socket.onerror = (err) => {
this.state = ChannelState.ERROR;
logger("proxy ws socket error: %O", err);
reject(err);
};
});
}

private doAuthenticate(
accessToken: string
): Promise<RawLoginResponseBody | null> {
this.sendMessage({ request: "authenticate", args: [accessToken] });
return deferredWrap(() => this.iterator)
.filter(({ request }) => request === "authenticate")
.map(({ response }) => response)
.promise() as Promise<RawLoginResponseBody | null>;
}

cancelAlert(_: number): Promise<CancelAlertResponse> {
throwError("not implemented");
}

cancelOrder(_: number): Promise<CancelOrderResponse> {
throwError("not implemented");
}

chart(_: ChartRequestParams): AsyncIterable<ChartResponse> {
throwError("not implemented");
}

createAlert(_: CreateAlertRequestParams): Promise<CreateAlertResponse> {
throwError("not implemented");
}

disconnect(): void {
this.sendMessage({ request: "disconnect" });
}

ensureConnected(): void {
if (this.state !== ChannelState.CONNECTED) {
throw new Error("Not connected");
}
}

isConnected(): boolean {
return this.state === ChannelState.CONNECTED;
}

isConnecting(): boolean {
return this.state === ChannelState.CONNECTING;
}

lookupAlerts(): AsyncIterable<LookupAlertsResponse> {
throwError("not implemented");
}

marketDepth(_: string): AsyncIterable<MarketDepthResponse> {
throwError("not implemented");
}

optionChain(_: string): Promise<OptionChainResponse> {
throwError("not implemented");
}

optionChainDetails(
_: OptionChainDetailsRequest
): Promise<OptionChainDetailsResponse> {
throwError("not implemented");
}

optionChainQuotes(symbol: string): AsyncIterable<OptionSeriesQuotesResponse> {
this.sendMessage({ request: "optionChainQuotes", args: [symbol] });
return deferredWrap(() => this.iterator)
.filter(({ request }) => request === "optionChainQuotes")
.map(({ response }) => response)
.iterable() as AsyncIterable<OptionSeriesQuotesResponse>;
}

optionQuotes(
_: OptionQuotesRequestParams
): AsyncIterable<OptionQuotesResponse> {
throwError("not implemented");
}

placeOrder(
_: PlaceLimitOrderRequestParams
): Promise<PlaceOrderSnapshotResponse> {
throwError("not implemented");
}

quotes(_: string[]): AsyncIterable<QuotesResponse> {
throwError("not implemented");
}

replaceOrder(
_: Required<PlaceLimitOrderRequestParams>
): Promise<OrderEventsResponse> {
throwError("not implemented");
}

searchInstruments(_: string): Promise<InstrumentSearchResponse> {
throwError("not implemented");
}

userProperties(): Promise<UserPropertiesResponse> {
throwError("not implemented");
}

watchlist(_: number): Promise<GetWatchlistResponse> {
throwError("not implemented");
}

workingOrders(_: string): AsyncIterable<OrderEventsResponse> {
throwError("not implemented");
}

private sendMessage(request: ProxiedRequest) {
this.ensureConnected();
this.socket!.send(JSON.stringify(request));
}
}
Loading