Skip to content

test: snp server integration tests #84

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eslint.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export default tseslint.config(
},
},
{
ignores: ['dist/**', 'coverage/**', 'testing/**', 'chunk-parser/**'],
ignores: ['dist/**', 'coverage/**', 'testing/**', 'chunk-parser/**', 'src/vendored/**'],
},
{
rules: {
Expand Down
25 changes: 13 additions & 12 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"migrate": "ts-node node_modules/.bin/node-pg-migrate -j ts",
"lint": "eslint .",
"lint:fix": "eslint . --fix",
"vendor-secp256k1": "mkdir -p src/vendored/@noble/secp256k1 && cp node_modules/@noble/secp256k1/index.ts src/vendored/@noble/secp256k1/index.ts",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This library is esm-only, which jest is unable to handle. Luckily we can simply copy the single ts source file of this library into our repo and it is compiled to cjs. This a hacky workaround, and the approach is unlikely to work with most other other esm-only dependencies. The long term solution is to move away from jest.

"generate:openapi": "rimraf ./tmp && node -r ts-node/register ./util/openapi-generator.ts",
"generate:docs": "redocly build-docs ./tmp/openapi.yaml --output ./tmp/index.html",
"generate:git-info": "rimraf .git-info && node_modules/.bin/api-toolkit-git-info",
Expand Down Expand Up @@ -55,7 +56,7 @@
"@fastify/cors": "^9.0.1",
"@fastify/swagger": "^8.15.0",
"@fastify/type-provider-typebox": "^4.1.0",
"@hirosystems/api-toolkit": "^1.8.0",
"@hirosystems/api-toolkit": "^1.9.0",
"@hirosystems/salt-n-pepper-client": "^1.0.4-beta.1",
"@noble/secp256k1": "^2.2.3",
"@sinclair/typebox": "^0.28.17",
Expand Down
8 changes: 7 additions & 1 deletion src/event-stream/event-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
import { SignerMessagesEventPayload } from '../pg/types';
import { ThreadedParser } from './threaded-parser';
import { SERVER_VERSION } from '@hirosystems/api-toolkit';
import { EventEmitter } from 'node:events';

// TODO: move this into the @hirosystems/salt-n-pepper-client lib
function sanitizeRedisClientName(value: string): string {
Expand All @@ -30,6 +31,10 @@ export class EventStreamHandler {
eventStream: StacksEventStream;
threadedParser: ThreadedParser;

readonly events = new EventEmitter<{
processedMessage: [{ msgId: string }];
}>();

constructor(opts: { db: PgStore; lastMessageId: string }) {
this.db = opts.db;
const appName = sanitizeRedisClientName(
Expand All @@ -53,7 +58,7 @@ export class EventStreamHandler {
}

async handleMsg(messageId: string, timestamp: string, path: string, body: any) {
this.logger.info(`${path}: received Stacks stream event`);
this.logger.info(`${path}: received Stacks stream event, msgId: ${messageId}`);
switch (path) {
case '/new_block': {
const blockMsg = body as CoreNodeBlockMessage;
Expand Down Expand Up @@ -99,6 +104,7 @@ export class EventStreamHandler {
this.logger.warn(`Unhandled stacks stream event: ${path}`);
break;
}
this.events.emit('processedMessage', { msgId: messageId });
}

async stop(): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/msg-parsing.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as crypto from 'node:crypto';
import * as secp from '@noble/secp256k1';
import * as secp from '../vendored/@noble/secp256k1';
import { CoreNodeNakamotoBlockMessage, ModifiedSlot, StackerDbChunk } from './core-node-message';
import { BufferCursor } from './buffer-cursor';
import { BufferWriter } from './buffer-writer';
Expand Down
16 changes: 13 additions & 3 deletions src/event-stream/threaded-parser.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as WorkerThreads from 'node:worker_threads';
import * as path from 'node:path';
import { waiter, Waiter, logger as defaultLogger } from '@hirosystems/api-toolkit';
import { CoreNodeNakamotoBlockMessage, StackerDbChunk } from './core-node-message';
import { ParsedNakamotoBlock, ParsedStackerDbChunk } from './msg-parsing';
Expand All @@ -22,12 +23,21 @@
if (!WorkerThreads.isMainThread) {
throw new Error('ThreadedParser must be instantiated in the main thread');
}
this.worker = new WorkerThreads.Worker(workerFile);
const workerOpt: WorkerThreads.WorkerOptions = {};
if (path.extname(workerFile) === '.ts') {
if (process.env.NODE_ENV !== 'test') {
throw new Error(
'Worker threads are being created with ts-node outside of a test environment'
);
}

Check warning on line 32 in src/event-stream/threaded-parser.ts

View check run for this annotation

Codecov / codecov/patch

src/event-stream/threaded-parser.ts#L29-L32

Added lines #L29 - L32 were not covered by tests
workerOpt.execArgv = ['-r', 'ts-node/register/transpile-only'];
}
this.worker = new WorkerThreads.Worker(workerFile, workerOpt);
this.worker.on('error', err => {
this.logger.error('Worker error', err);
this.logger.error(err, 'Worker error');

Check warning on line 37 in src/event-stream/threaded-parser.ts

View check run for this annotation

Codecov / codecov/patch

src/event-stream/threaded-parser.ts#L37

Added line #L37 was not covered by tests
});
this.worker.on('messageerror', err => {
this.logger.error('Worker message error', err);
this.logger.error(err, 'Worker message error');

Check warning on line 40 in src/event-stream/threaded-parser.ts

View check run for this annotation

Codecov / codecov/patch

src/event-stream/threaded-parser.ts#L40

Added line #L40 was not covered by tests
});
this.worker.on('message', (msg: ThreadedParserMsgReply) => {
const waiter = this.msgRequests.get(msg.msgId);
Expand Down
33 changes: 2 additions & 31 deletions src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { addAbortListener, EventEmitter } from 'node:events';
import { addAbortListener } from 'node:events';
import { parseISO, sub, isValid, Duration } from 'date-fns';

export const isDevEnv = process.env.NODE_ENV === 'development';
Expand All @@ -24,6 +24,7 @@ export function normalizeHexString(hexString: string): string {
return hexString.startsWith('0x') ? hexString : '0x' + hexString;
}

// This is a workaround for Node.js versions that do not support Symbol.dispose
const DisposeSymbol: typeof Symbol.dispose = Symbol.dispose ?? Symbol.for('nodejs.dispose');

export function sleep(ms: number, signal?: AbortSignal): Promise<void> {
Expand Down Expand Up @@ -86,33 +87,3 @@ export type BlockIdParam =
| { type: 'height'; height: number }
| { type: 'hash'; hash: string }
| { type: 'latest'; latest: true };

/**
* Similar to `node:events.once` but with a predicate to filter events and supports typed EventEmitters
*/
export function waitForEvent<T extends Record<string, any[]>, K extends keyof T>(
emitter: EventEmitter<T>,
event: K,
predicate: (...args: T[K]) => boolean,
signal?: AbortSignal
): Promise<T[K]> {
return new Promise((resolve, reject) => {
if (signal?.aborted) {
reject(signal.reason as Error);
return;
}
const disposable = signal ? addAbortListener(signal, onAbort) : undefined;
const handler = (...args: T[K]) => {
if (predicate(...args)) {
disposable?.[DisposeSymbol]();
(emitter as EventEmitter).off(event as string, handler);
resolve(args);
}
};
(emitter as EventEmitter).on(event as string, handler);
function onAbort() {
(emitter as EventEmitter).off(event as string, handler);
reject((signal?.reason as Error) ?? new Error('Aborted'));
}
});
}
Loading