Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics refactor #221

Merged
merged 28 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6a16c3d
Added basic metrics interfaces, a metrics factory and an implementati…
Rentacookie Mar 3, 2025
82385bd
Added creation and initialization helper methods for the existing rep…
Rentacookie Mar 3, 2025
c655147
Updated service runners to initialize the MetricsEngine and the exist…
Rentacookie Mar 3, 2025
437bd7a
Updated replication modules to use the MetricsEngine instead of the o…
Rentacookie Mar 3, 2025
4ab4112
Updated tests
Rentacookie Mar 3, 2025
8e9f75b
Updated Opentelemetry dependencies
Rentacookie Mar 3, 2025
5dc7599
Remove unused import
Rentacookie Mar 3, 2025
816081a
Merge branch 'main' into metrics-refactor
Rentacookie Mar 3, 2025
9495d49
Branch update fixes
Rentacookie Mar 3, 2025
419e08b
Merge branch 'main' into metrics-refactor
Rentacookie Mar 4, 2025
d239244
Added changeset
Rentacookie Mar 4, 2025
d7fab45
Moved metric enums to types package
Rentacookie Mar 6, 2025
bf0e399
Updated changeset
Rentacookie Mar 6, 2025
0398ec8
Merge branch 'main' into metrics-refactor
Rentacookie Mar 6, 2025
04b3d4f
Export metric types as named export
Rentacookie Mar 7, 2025
60801d9
Keep original metrics type export as well
Rentacookie Mar 7, 2025
ec896f3
Made metric enum names consistent
Rentacookie Mar 7, 2025
247ff3a
Merge branch 'main' into metrics-refactor
Rentacookie Mar 10, 2025
58a021a
Merge branch 'main' into metrics-refactor
Rentacookie Mar 20, 2025
e0bea96
Merge branch 'main' into metrics-refactor
Rentacookie Mar 20, 2025
38744b3
Merge branch 'main' into metrics-refactor
Rentacookie Mar 25, 2025
c1ccf73
Merge branch 'main' into metrics-refactor
Rentacookie Mar 25, 2025
9c8ed97
Move metrics registration to service-core
Rentacookie Mar 26, 2025
ed8e76e
Removed unused dependencies in service package
Rentacookie Mar 26, 2025
233b8b2
Merge branch 'main' into metrics-refactor
Rentacookie Mar 26, 2025
af146f6
Only set up data replication metrics recorder for PostgresModule when…
Rentacookie Mar 26, 2025
511ff94
Merge branch 'main' into metrics-refactor
Rentacookie Apr 1, 2025
7c358b8
Moved the prometheus metrics port from an env variable to the collect…
Rentacookie Apr 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changeset/beige-camels-reply.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-module-postgres': minor
'@powersync/service-module-mongodb': minor
'@powersync/service-core': minor
'@powersync/service-module-mysql': minor
'@powersync/service-types': minor
---

Refactored Metrics to use a MetricsEngine which is telemetry framework agnostic.
5 changes: 2 additions & 3 deletions modules/module-mongodb-storage/test/src/setup.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { container } from '@powersync/lib-services-framework';
import { test_utils } from '@powersync/service-core-tests';
import { beforeAll, beforeEach } from 'vitest';
import { METRICS_HELPER } from '@powersync/service-core-tests';

beforeAll(async () => {
// Executes for every test file
container.registerDefaults();
await test_utils.initMetrics();
});

beforeEach(async () => {
await test_utils.resetMetrics();
METRICS_HELPER.resetMetrics();
});
3 changes: 3 additions & 0 deletions modules/module-mongodb/src/module/MongoModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ export class MongoModule extends replication.ReplicationModule<types.MongoConnec
id: this.getDefaultId(normalisedConfig.database ?? ''),
syncRuleProvider: syncRuleProvider,
storageEngine: context.storageEngine,
metricsEngine: context.metricsEngine,
connectionFactory: connectionFactory,
rateLimiter: new MongoErrorRateLimiter()
});
}

async onInitialized(context: system.ServiceContextContainer): Promise<void> {}

/**
* Combines base config with normalized connection settings
*/
Expand Down
17 changes: 7 additions & 10 deletions modules/module-mongodb/src/replication/ChangeStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,20 @@ import {
ReplicationAssertionError,
ServiceError
} from '@powersync/lib-services-framework';
import {
BSON_DESERIALIZE_DATA_OPTIONS,
Metrics,
SaveOperationTag,
SourceEntityDescriptor,
SourceTable,
storage
} from '@powersync/service-core';
import { MetricsEngine, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
import { MongoLSN } from '../common/MongoLSN.js';
import { PostImagesOption } from '../types/types.js';
import { escapeRegExp } from '../utils.js';
import { MongoManager } from './MongoManager.js';
import { constructAfterRecord, createCheckpoint, getCacheIdentifier, getMongoRelation } from './MongoRelation.js';
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
import { ReplicationMetric } from '@powersync/service-types';

export interface ChangeStreamOptions {
connections: MongoManager;
storage: storage.SyncRulesBucketStorage;
metrics: MetricsEngine;
abort_signal: AbortSignal;
}

Expand Down Expand Up @@ -59,13 +54,15 @@ export class ChangeStream {
private connections: MongoManager;
private readonly client: mongo.MongoClient;
private readonly defaultDb: mongo.Db;
private readonly metrics: MetricsEngine;

private abort_signal: AbortSignal;

private relation_cache = new Map<string | number, storage.SourceTable>();

constructor(options: ChangeStreamOptions) {
this.storage = options.storage;
this.metrics = options.metrics;
this.group_id = options.storage.group_id;
this.connections = options.connections;
this.client = this.connections.client;
Expand Down Expand Up @@ -318,7 +315,7 @@ export class ChangeStream {
}

at += docBatch.length;
Metrics.getInstance().rows_replicated_total.add(docBatch.length);
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(docBatch.length);
const duration = performance.now() - lastBatch;
lastBatch = performance.now();
logger.info(
Expand Down Expand Up @@ -446,7 +443,7 @@ export class ChangeStream {
return null;
}

Metrics.getInstance().rows_replicated_total.add(1);
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
if (change.operationType == 'insert') {
const baseRecord = constructAfterRecord(change.fullDocument);
return await batch.save({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
const stream = new ChangeStream({
abort_signal: this.abortController.signal,
storage: this.options.storage,
metrics: this.options.metrics,
connections: connectionManager
});
await stream.replicate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class ChangeStreamReplicator extends replication.AbstractReplicator<Chang
return new ChangeStreamReplicationJob({
id: this.createJobId(options.storage.group_id),
storage: options.storage,
metrics: this.metrics,
connectionFactory: this.connectionFactory,
lock: options.lock,
rateLimiter: new MongoErrorRateLimiter()
Expand Down
10 changes: 8 additions & 2 deletions modules/module-mongodb/test/src/change_stream_utils.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { mongo } from '@powersync/lib-service-mongodb';
import {
BucketStorageFactory,
createCoreReplicationMetrics,
initializeCoreReplicationMetrics,
InternalOpId,
ProtocolOpId,
ReplicationCheckpoint,
SyncRulesBucketStorage
} from '@powersync/service-core';
import { test_utils } from '@powersync/service-core-tests';
import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests';

import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js';
import { MongoManager } from '@module/replication/MongoManager.js';
Expand Down Expand Up @@ -38,7 +40,10 @@ export class ChangeStreamTestContext {
constructor(
public factory: BucketStorageFactory,
public connectionManager: MongoManager
) {}
) {
createCoreReplicationMetrics(METRICS_HELPER.metricsEngine);
initializeCoreReplicationMetrics(METRICS_HELPER.metricsEngine);
}

async dispose() {
this.abortController.abort();
Expand Down Expand Up @@ -78,6 +83,7 @@ export class ChangeStreamTestContext {
}
const options: ChangeStreamOptions = {
storage: this.storage,
metrics: METRICS_HELPER.metricsEngine,
connections: this.connectionManager,
abort_signal: this.abortController.signal
};
Expand Down
6 changes: 2 additions & 4 deletions modules/module-mongodb/test/src/setup.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import { container } from '@powersync/lib-services-framework';
import { test_utils } from '@powersync/service-core-tests';
import { METRICS_HELPER } from '@powersync/service-core-tests';
import { beforeEach } from 'node:test';
import { beforeAll } from 'vitest';

beforeAll(async () => {
// Executes for every test file
container.registerDefaults();

await test_utils.initMetrics();
});

beforeEach(async () => {
await test_utils.resetMetrics();
METRICS_HELPER.resetMetrics();
});
5 changes: 2 additions & 3 deletions modules/module-mysql/src/module/MySQLModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ export class MySQLModule extends replication.ReplicationModule<types.MySQLConnec
});
}

async initialize(context: system.ServiceContextContainer): Promise<void> {
await super.initialize(context);
}
async onInitialized(context: system.ServiceContextContainer): Promise<void> {}

protected createRouteAPIAdapter(): api.RouteAPI {
return new MySQLRouteAPIAdapter(this.resolveConfig(this.decodedConfig!));
Expand All @@ -42,6 +40,7 @@ export class MySQLModule extends replication.ReplicationModule<types.MySQLConnec
id: this.getDefaultId(normalisedConfig.database),
syncRuleProvider: syncRuleProvider,
storageEngine: context.storageEngine,
metricsEngine: context.metricsEngine,
connectionFactory: connectionFactory,
rateLimiter: new MySQLErrorRateLimiter()
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob {
const stream = new BinLogStream({
abortSignal: this.abortController.signal,
storage: this.options.storage,
metrics: this.options.metrics,
connections: connectionManager
});
await stream.replicate();
Expand Down
1 change: 1 addition & 0 deletions modules/module-mysql/src/replication/BinLogReplicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export class BinLogReplicator extends replication.AbstractReplicator<BinLogRepli
return new BinLogReplicationJob({
id: this.createJobId(options.storage.group_id),
storage: options.storage,
metrics: this.metrics,
lock: options.lock,
connectionFactory: this.connectionFactory,
rateLimiter: this.rateLimiter
Expand Down
26 changes: 20 additions & 6 deletions modules/module-mysql/src/replication/BinLogStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import { logger, ReplicationAbortedError, ReplicationAssertionError } from '@pow
import * as sync_rules from '@powersync/service-sync-rules';
import async from 'async';

import { ColumnDescriptor, framework, getUuidReplicaIdentityBson, Metrics, storage } from '@powersync/service-core';
import {
ColumnDescriptor,
framework,
getUuidReplicaIdentityBson,
MetricsEngine,
storage
} from '@powersync/service-core';
import mysql, { FieldPacket } from 'mysql2';

import { BinLogEvent, StartOptions, TableMapEntry } from '@powersync/mysql-zongji';
Expand All @@ -12,10 +18,12 @@ import { isBinlogStillAvailable, ReplicatedGTID, toColumnDescriptors } from '../
import { createRandomServerId, escapeMysqlTableName } from '../utils/mysql-utils.js';
import { MySQLConnectionManager } from './MySQLConnectionManager.js';
import * as zongji_utils from './zongji/zongji-utils.js';
import { ReplicationMetric } from '@powersync/service-types';

export interface BinLogStreamOptions {
connections: MySQLConnectionManager;
storage: storage.SyncRulesBucketStorage;
metrics: MetricsEngine;
abortSignal: AbortSignal;
}

Expand Down Expand Up @@ -62,7 +70,7 @@ export class BinLogStream {

private tableCache = new Map<string | number, storage.SourceTable>();

constructor(protected options: BinLogStreamOptions) {
constructor(private options: BinLogStreamOptions) {
this.storage = options.storage;
this.connections = options.connections;
this.syncRules = options.storage.getParsedSyncRules({ defaultSchema: this.defaultSchema });
Expand All @@ -74,6 +82,10 @@ export class BinLogStream {
return this.connections.connectionTag;
}

private get metrics() {
return this.options.metrics;
}

get connectionId() {
const { connectionId } = this.connections;
// Default to 1 if not set
Expand Down Expand Up @@ -335,6 +347,8 @@ AND table_type = 'BASE TABLE';`,
after: record,
afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
});

this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
}
await batch.flush();
}
Expand Down Expand Up @@ -461,7 +475,7 @@ AND table_type = 'BASE TABLE';`,
});
break;
case zongji_utils.eventIsXid(evt):
Metrics.getInstance().transactions_replicated_total.add(1);
this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED).add(1);
// Need to commit with a replicated GTID with updated next position
await batch.commit(
new common.ReplicatedGTID({
Expand Down Expand Up @@ -606,7 +620,7 @@ AND table_type = 'BASE TABLE';`,
): Promise<storage.FlushedResult | null> {
switch (payload.type) {
case storage.SaveOperationTag.INSERT:
Metrics.getInstance().rows_replicated_total.add(1);
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
const record = common.toSQLiteRow(payload.data, payload.columns);
return await batch.save({
tag: storage.SaveOperationTag.INSERT,
Expand All @@ -617,7 +631,7 @@ AND table_type = 'BASE TABLE';`,
afterReplicaId: getUuidReplicaIdentityBson(record, payload.sourceTable.replicaIdColumns)
});
case storage.SaveOperationTag.UPDATE:
Metrics.getInstance().rows_replicated_total.add(1);
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
// "before" may be null if the replica id columns are unchanged
// It's fine to treat that the same as an insert.
const beforeUpdated = payload.previous_data
Expand All @@ -637,7 +651,7 @@ AND table_type = 'BASE TABLE';`,
});

case storage.SaveOperationTag.DELETE:
Metrics.getInstance().rows_replicated_total.add(1);
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
const beforeDeleted = common.toSQLiteRow(payload.data, payload.columns);

return await batch.save({
Expand Down
Loading