Skip to content

Commit

Permalink
chore(): misc cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
tiagosiebler committed Jan 21, 2025
1 parent fcffd85 commit 39ce4c4
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 116 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 21 additions & 58 deletions src/util/BaseWSClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ export abstract class BaseWebsocketClient<

private timeOffsetMs: number = 0;

// private pendingTopicsSubscriptionsOld: TopicsPendingSubscriptions[] = [];

private pendingTopicSubscriptionRequests: {
[key in TWSKey]?: {
[requestKey: string]:
Expand Down Expand Up @@ -259,52 +257,32 @@ export abstract class BaseWebsocketClient<
params: any,
): Promise<unknown>;

protected getTimeOffsetMs() {
public getTimeOffsetMs() {
return this.timeOffsetMs;
}

protected setTimeOffsetMs(newOffset: number) {
public setTimeOffsetMs(newOffset: number) {
this.timeOffsetMs = newOffset;
}

// protected upsertPendingTopicsSubscriptionsOld(
// wsKey: string,
// topicKey: string,
// resolver: TopicsPendingSubscriptionsResolver,
// rejector: TopicsPendingSubscriptionsRejector,
// ) {
// const existingWsKeyPendingSubscriptions =
// this.pendingTopicsSubscriptionsOld.find((s) => s.wsKey === wsKey);

// if (!existingWsKeyPendingSubscriptions) {
// this.pendingTopicsSubscriptionsOld.push({
// wsKey,
// resolver,
// rejector,
// failedTopicsSubscriptions: new Set(),
// pendingTopicsSubscriptions: new Set([topicKey]),
// });
// return;
// }

// existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.add(topicKey);
// }

protected upsertPendingTopicSubscribeRequests(
wsKey: TWSKey,
requestData: MidflightWsRequestEvent<TWSRequestEvent>,
) {
private getWsKeyPendingSubscriptionStore(wsKey: TWSKey) {
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
this.pendingTopicSubscriptionRequests[wsKey] = {};
}

const existingWsKeyPendingRequests =
this.pendingTopicSubscriptionRequests[wsKey]!;
return this.pendingTopicSubscriptionRequests[wsKey]!;
}

protected upsertPendingTopicSubscribeRequests(
wsKey: TWSKey,
requestData: MidflightWsRequestEvent<TWSRequestEvent>,
) {
// a unique identifier for this subscription request (e.g. csv of topics, or request id, etc)
const requestKey = requestData.requestKey;

// Should not be possible to see a requestKey collision in the current design, since the req ID increments automatically with every request, so this should never be true, but just in case a future mistake happens...
const existingWsKeyPendingRequests =
this.getWsKeyPendingSubscriptionStore(wsKey);
if (existingWsKeyPendingRequests[requestKey]) {
throw new Error(
'Implementation error: attempted to upsert pending topics with duplicate request ID!',
Expand All @@ -316,10 +294,8 @@ export abstract class BaseWebsocketClient<
resolver: TopicsPendingSubscriptionsResolver<TWSRequestEvent>,
rejector: TopicsPendingSubscriptionsRejector<TWSRequestEvent>,
) => {
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
this.pendingTopicSubscriptionRequests[wsKey] = {};
}
this.pendingTopicSubscriptionRequests[wsKey][requestKey] = {
const store = this.getWsKeyPendingSubscriptionStore(wsKey);
store[requestKey] = {
requestData: requestData.requestEvent,
resolver,
rejector,
Expand All @@ -329,11 +305,8 @@ export abstract class BaseWebsocketClient<
}

protected removeTopicPendingSubscription(wsKey: TWSKey, requestKey: string) {
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
this.pendingTopicSubscriptionRequests[wsKey] = {};
}

delete this.pendingTopicSubscriptionRequests[wsKey][requestKey];
const store = this.getWsKeyPendingSubscriptionStore(wsKey);
delete store[requestKey];
}

private clearTopicsPendingSubscriptions(
Expand All @@ -342,13 +315,9 @@ export abstract class BaseWebsocketClient<
rejectReason: string,
) {
if (rejectAll) {
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
this.pendingTopicSubscriptionRequests[wsKey] = {};
}

const requests = this.pendingTopicSubscriptionRequests[wsKey]!;
for (const requestKey in requests) {
const request = requests[requestKey];
const wsKeyPendingRequests = this.getWsKeyPendingSubscriptionStore(wsKey);
for (const requestKey in wsKeyPendingRequests) {
const request = wsKeyPendingRequests[requestKey];
request?.rejector(request.requestData, rejectReason);
}
}
Expand All @@ -367,21 +336,16 @@ export abstract class BaseWebsocketClient<
msg: object,
isTopicSubscriptionSuccessEvent: boolean,
) {
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
const wsKeyPendingRequests = this.getWsKeyPendingSubscriptionStore(wsKey);
if (!wsKeyPendingRequests) {
return;
}

const pendingSubscriptionRequest =
this.pendingTopicSubscriptionRequests[wsKey][requestKey];
const pendingSubscriptionRequest = wsKeyPendingRequests[requestKey];
if (!pendingSubscriptionRequest) {
return;
}

console.log('updatePendingTopicSubscriptionStatus', {
isTopicSubscriptionSuccessEvent,
msg,
});

if (isTopicSubscriptionSuccessEvent) {
pendingSubscriptionRequest.resolver(
pendingSubscriptionRequest.requestData,
Expand Down Expand Up @@ -413,7 +377,6 @@ export abstract class BaseWebsocketClient<
wsTopicRequests: WsTopicRequestOrStringTopic<string>[],
wsKey: TWSKey,
) {
console.log('subscribeTopicsForWsKey: ', { wsTopicRequests, wsKey });
const normalisedTopicRequests = getNormalisedTopicRequests(wsTopicRequests);

// Store topics, so future automation (post-auth, post-reconnect) has everything needed to resubscribe automatically
Expand Down
103 changes: 48 additions & 55 deletions src/websocket-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,46 @@ import {

const WS_LOGGER_CATEGORY = { category: 'bybit-ws' };

/**
* Groups topics in request into per-wsKey groups
* @param normalisedTopicRequests
* @param wsKey
* @param isPrivateTopic
* @returns
*/
function getTopicsPerWSKey(
normalisedTopicRequests: WsTopicRequest[],
wsKey?: WsKey,
isPrivateTopic?: boolean,
): {
[key in WsKey]?: WsTopicRequest<WsTopic>[];
} {
const perWsKeyTopics: { [key in WsKey]?: WsTopicRequest<WsTopic>[] } = {};

// Sort into per wsKey arrays, in case topics are mixed together for different wsKeys
for (const topicRequest of normalisedTopicRequests) {
const derivedWsKey =
wsKey ||
getWsKeyForTopic(
this.options.market,
topicRequest.topic,
isPrivateTopic,
topicRequest.category,
);

if (
!perWsKeyTopics[derivedWsKey] ||
!Array.isArray(perWsKeyTopics[derivedWsKey])
) {
perWsKeyTopics[derivedWsKey] = [];
}

perWsKeyTopics[derivedWsKey]!.push(topicRequest);
}

return perWsKeyTopics;
}

// export class WebsocketClient extends EventEmitter {
export class WebsocketClient extends BaseWebsocketClient<
WsKey,
Expand Down Expand Up @@ -183,12 +223,12 @@ export class WebsocketClient extends BaseWebsocketClient<
category: CategoryV5,
isPrivateTopic?: boolean,
): Promise<unknown>[] {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
const topicRequests = Array.isArray(wsTopics) ? wsTopics : [wsTopics];

const perWsKeyTopics: { [key in WsKey]?: WsTopicRequest<WsTopic>[] } = {};

// Sort into per-WsKey batches, in case there is a mix of topics here
for (const topic of topics) {
for (const topic of topicRequests) {
const derivedWsKey = getWsKeyForTopic(
this.options.market,
topic,
Expand All @@ -208,7 +248,7 @@ export class WebsocketClient extends BaseWebsocketClient<
perWsKeyTopics[derivedWsKey] = [];
}

perWsKeyTopics[derivedWsKey].push(wsRequest);
perWsKeyTopics[derivedWsKey]!.push(wsRequest);
}

const promises: Promise<unknown>[] = [];
Expand Down Expand Up @@ -245,12 +285,12 @@ export class WebsocketClient extends BaseWebsocketClient<
category: CategoryV5,
isPrivateTopic?: boolean,
): Promise<unknown>[] {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
const topicRequests = Array.isArray(wsTopics) ? wsTopics : [wsTopics];

const perWsKeyTopics: { [key in WsKey]?: WsTopicRequest<WsTopic>[] } = {};

// Sort into per-WsKey batches, in case there is a mix of topics here
for (const topic of topics) {
for (const topic of topicRequests) {
const derivedWsKey = getWsKeyForTopic(
this.options.market,
topic,
Expand All @@ -270,7 +310,7 @@ export class WebsocketClient extends BaseWebsocketClient<
perWsKeyTopics[derivedWsKey] = [];
}

perWsKeyTopics[derivedWsKey].push(wsRequest);
perWsKeyTopics[derivedWsKey]!.push(wsRequest);
}

const promises: Promise<unknown>[] = [];
Expand Down Expand Up @@ -315,30 +355,7 @@ export class WebsocketClient extends BaseWebsocketClient<
const topicRequests = Array.isArray(requests) ? requests : [requests];
const normalisedTopicRequests = getNormalisedTopicRequests(topicRequests);

const isPrivateTopic = undefined;

const perWsKeyTopics: { [key in WsKey]?: WsTopicRequest<WsTopic>[] } = {};

// Sort into per wsKey arrays, in case topics are mixed together for different wsKeys
for (const topicRequest of normalisedTopicRequests) {
const derivedWsKey =
wsKey ||
getWsKeyForTopic(
this.options.market,
topicRequest.topic,
isPrivateTopic,
topicRequest.category,
);

if (
!perWsKeyTopics[derivedWsKey] ||
!Array.isArray(perWsKeyTopics[derivedWsKey])
) {
perWsKeyTopics[derivedWsKey] = [];
}

perWsKeyTopics[derivedWsKey].push(topicRequest);
}
const perWsKeyTopics = getTopicsPerWSKey(normalisedTopicRequests, wsKey);

// Batch sub topics per ws key
for (const wsKey in perWsKeyTopics) {
Expand All @@ -364,30 +381,7 @@ export class WebsocketClient extends BaseWebsocketClient<
const topicRequests = Array.isArray(requests) ? requests : [requests];
const normalisedTopicRequests = getNormalisedTopicRequests(topicRequests);

const isPrivateTopic = undefined;

const perWsKeyTopics: { [key in WsKey]?: WsTopicRequest<WsTopic>[] } = {};

// Sort into per wsKey arrays, in case topics are mixed together for different wsKeys
for (const topicRequest of normalisedTopicRequests) {
const derivedWsKey =
wsKey ||
getWsKeyForTopic(
this.options.market,
topicRequest.topic,
isPrivateTopic,
topicRequest.category,
);

if (
!perWsKeyTopics[derivedWsKey] ||
!Array.isArray(perWsKeyTopics[derivedWsKey])
) {
perWsKeyTopics[derivedWsKey] = [];
}

perWsKeyTopics[derivedWsKey].push(topicRequest);
}
const perWsKeyTopics = getTopicsPerWSKey(normalisedTopicRequests, wsKey);

// Batch sub topics per ws key
for (const wsKey in perWsKeyTopics) {
Expand Down Expand Up @@ -811,7 +805,6 @@ export class WebsocketClient extends BaseWebsocketClient<

// WS API Exception
if (isError) {
// console.log('wsAPI error: ', parsed);
try {
this.getWsStore().rejectDeferredPromise(
wsKey,
Expand Down
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@
"baseUrl": ".",
"outDir": "./lib"
},
"include": ["src/**/*", "src/.ts"],
"include": ["src/**/*"],
"exclude": ["node_modules", "**/node_modules/*", "coverage", "doc"]
}

0 comments on commit 39ce4c4

Please sign in to comment.