Skip to content

Commit d1b83ce

Browse files
authored
Metrics refactor (#221)
Added the following: * Basic metrics interfaces, a metrics factory, and an implementation of those interfaces for OpenTelemetry * A MetricsEngine that handles creation and registration of metrics * Creation and initialization helper methods for the existing replication, storage and API metrics * MetricsEngine added to the service context Updates and fixes: * Updated service runners to initialize the MetricsEngine and the existing metrics * Updated replication modules to use the MetricsEngine instead of the old singleton metrics instance * Moved metrics registration to service-core * Removed unused dependencies in the service package * Bugfix: Only set up the data replication metrics recorder for PostgresModule when it is started in a replication context. * Moved the Prometheus metrics port from an env variable to the collected powersync config
1 parent d166170 commit d1b83ce

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1014
-812
lines changed

.changeset/beige-camels-reply.md

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
'@powersync/service-module-postgres-storage': minor
3+
'@powersync/service-module-mongodb-storage': minor
4+
'@powersync/service-core-tests': minor
5+
'@powersync/service-module-postgres': minor
6+
'@powersync/service-module-mongodb': minor
7+
'@powersync/service-core': minor
8+
'@powersync/service-module-mysql': minor
9+
'@powersync/service-types': minor
10+
---
11+
12+
Refactored Metrics to use a MetricsEngine, which is telemetry-framework agnostic.
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
import { container } from '@powersync/lib-services-framework';
2-
import { test_utils } from '@powersync/service-core-tests';
32
import { beforeAll, beforeEach } from 'vitest';
3+
import { METRICS_HELPER } from '@powersync/service-core-tests';
44

55
beforeAll(async () => {
66
// Executes for every test file
77
container.registerDefaults();
8-
await test_utils.initMetrics();
98
});
109

1110
beforeEach(async () => {
12-
await test_utils.resetMetrics();
11+
METRICS_HELPER.resetMetrics();
1312
});

modules/module-mongodb/src/module/MongoModule.ts

+3
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,14 @@ export class MongoModule extends replication.ReplicationModule<types.MongoConnec
3737
id: this.getDefaultId(normalisedConfig.database ?? ''),
3838
syncRuleProvider: syncRuleProvider,
3939
storageEngine: context.storageEngine,
40+
metricsEngine: context.metricsEngine,
4041
connectionFactory: connectionFactory,
4142
rateLimiter: new MongoErrorRateLimiter()
4243
});
4344
}
4445

46+
async onInitialized(context: system.ServiceContextContainer): Promise<void> {}
47+
4548
/**
4649
* Combines base config with normalized connection settings
4750
*/

modules/module-mongodb/src/replication/ChangeStream.ts

+7-10
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,20 @@ import {
88
ReplicationAssertionError,
99
ServiceError
1010
} from '@powersync/lib-services-framework';
11-
import {
12-
BSON_DESERIALIZE_DATA_OPTIONS,
13-
Metrics,
14-
SaveOperationTag,
15-
SourceEntityDescriptor,
16-
SourceTable,
17-
storage
18-
} from '@powersync/service-core';
11+
import { MetricsEngine, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
1912
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
2013
import { MongoLSN } from '../common/MongoLSN.js';
2114
import { PostImagesOption } from '../types/types.js';
2215
import { escapeRegExp } from '../utils.js';
2316
import { MongoManager } from './MongoManager.js';
2417
import { constructAfterRecord, createCheckpoint, getCacheIdentifier, getMongoRelation } from './MongoRelation.js';
2518
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
19+
import { ReplicationMetric } from '@powersync/service-types';
2620

2721
export interface ChangeStreamOptions {
2822
connections: MongoManager;
2923
storage: storage.SyncRulesBucketStorage;
24+
metrics: MetricsEngine;
3025
abort_signal: AbortSignal;
3126
}
3227

@@ -59,13 +54,15 @@ export class ChangeStream {
5954
private connections: MongoManager;
6055
private readonly client: mongo.MongoClient;
6156
private readonly defaultDb: mongo.Db;
57+
private readonly metrics: MetricsEngine;
6258

6359
private abort_signal: AbortSignal;
6460

6561
private relation_cache = new Map<string | number, storage.SourceTable>();
6662

6763
constructor(options: ChangeStreamOptions) {
6864
this.storage = options.storage;
65+
this.metrics = options.metrics;
6966
this.group_id = options.storage.group_id;
7067
this.connections = options.connections;
7168
this.client = this.connections.client;
@@ -318,7 +315,7 @@ export class ChangeStream {
318315
}
319316

320317
at += docBatch.length;
321-
Metrics.getInstance().rows_replicated_total.add(docBatch.length);
318+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(docBatch.length);
322319
const duration = performance.now() - lastBatch;
323320
lastBatch = performance.now();
324321
logger.info(
@@ -446,7 +443,7 @@ export class ChangeStream {
446443
return null;
447444
}
448445

449-
Metrics.getInstance().rows_replicated_total.add(1);
446+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
450447
if (change.operationType == 'insert') {
451448
const baseRecord = constructAfterRecord(change.fullDocument);
452449
return await batch.save({

modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
7171
const stream = new ChangeStream({
7272
abort_signal: this.abortController.signal,
7373
storage: this.options.storage,
74+
metrics: this.options.metrics,
7475
connections: connectionManager
7576
});
7677
await stream.replicate();

modules/module-mongodb/src/replication/ChangeStreamReplicator.ts

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export class ChangeStreamReplicator extends replication.AbstractReplicator<Chang
2020
return new ChangeStreamReplicationJob({
2121
id: this.createJobId(options.storage.group_id),
2222
storage: options.storage,
23+
metrics: this.metrics,
2324
connectionFactory: this.connectionFactory,
2425
lock: options.lock,
2526
rateLimiter: new MongoErrorRateLimiter()

modules/module-mongodb/test/src/change_stream_utils.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import { mongo } from '@powersync/lib-service-mongodb';
22
import {
33
BucketStorageFactory,
4+
createCoreReplicationMetrics,
5+
initializeCoreReplicationMetrics,
46
InternalOpId,
57
ProtocolOpId,
68
ReplicationCheckpoint,
79
SyncRulesBucketStorage
810
} from '@powersync/service-core';
9-
import { test_utils } from '@powersync/service-core-tests';
11+
import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests';
1012

1113
import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js';
1214
import { MongoManager } from '@module/replication/MongoManager.js';
@@ -38,7 +40,10 @@ export class ChangeStreamTestContext {
3840
constructor(
3941
public factory: BucketStorageFactory,
4042
public connectionManager: MongoManager
41-
) {}
43+
) {
44+
createCoreReplicationMetrics(METRICS_HELPER.metricsEngine);
45+
initializeCoreReplicationMetrics(METRICS_HELPER.metricsEngine);
46+
}
4247

4348
async dispose() {
4449
this.abortController.abort();
@@ -78,6 +83,7 @@ export class ChangeStreamTestContext {
7883
}
7984
const options: ChangeStreamOptions = {
8085
storage: this.storage,
86+
metrics: METRICS_HELPER.metricsEngine,
8187
connections: this.connectionManager,
8288
abort_signal: this.abortController.signal
8389
};
+2-4
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
import { container } from '@powersync/lib-services-framework';
2-
import { test_utils } from '@powersync/service-core-tests';
2+
import { METRICS_HELPER } from '@powersync/service-core-tests';
33
import { beforeEach } from 'node:test';
44
import { beforeAll } from 'vitest';
55

66
beforeAll(async () => {
77
// Executes for every test file
88
container.registerDefaults();
9-
10-
await test_utils.initMetrics();
119
});
1210

1311
beforeEach(async () => {
14-
await test_utils.resetMetrics();
12+
METRICS_HELPER.resetMetrics();
1513
});

modules/module-mysql/src/module/MySQLModule.ts

+2-3
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ export class MySQLModule extends replication.ReplicationModule<types.MySQLConnec
2525
});
2626
}
2727

28-
async initialize(context: system.ServiceContextContainer): Promise<void> {
29-
await super.initialize(context);
30-
}
28+
async onInitialized(context: system.ServiceContextContainer): Promise<void> {}
3129

3230
protected createRouteAPIAdapter(): api.RouteAPI {
3331
return new MySQLRouteAPIAdapter(this.resolveConfig(this.decodedConfig!));
@@ -42,6 +40,7 @@ export class MySQLModule extends replication.ReplicationModule<types.MySQLConnec
4240
id: this.getDefaultId(normalisedConfig.database),
4341
syncRuleProvider: syncRuleProvider,
4442
storageEngine: context.storageEngine,
43+
metricsEngine: context.metricsEngine,
4544
connectionFactory: connectionFactory,
4645
rateLimiter: new MySQLErrorRateLimiter()
4746
});

modules/module-mysql/src/replication/BinLogReplicationJob.ts

+1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob {
6363
const stream = new BinLogStream({
6464
abortSignal: this.abortController.signal,
6565
storage: this.options.storage,
66+
metrics: this.options.metrics,
6667
connections: connectionManager
6768
});
6869
await stream.replicate();

modules/module-mysql/src/replication/BinLogReplicator.ts

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export class BinLogReplicator extends replication.AbstractReplicator<BinLogRepli
1919
return new BinLogReplicationJob({
2020
id: this.createJobId(options.storage.group_id),
2121
storage: options.storage,
22+
metrics: this.metrics,
2223
lock: options.lock,
2324
connectionFactory: this.connectionFactory,
2425
rateLimiter: this.rateLimiter

modules/module-mysql/src/replication/BinLogStream.ts

+20-6
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,13 @@ import { logger, ReplicationAbortedError, ReplicationAssertionError } from '@pow
22
import * as sync_rules from '@powersync/service-sync-rules';
33
import async from 'async';
44

5-
import { ColumnDescriptor, framework, getUuidReplicaIdentityBson, Metrics, storage } from '@powersync/service-core';
5+
import {
6+
ColumnDescriptor,
7+
framework,
8+
getUuidReplicaIdentityBson,
9+
MetricsEngine,
10+
storage
11+
} from '@powersync/service-core';
612
import mysql, { FieldPacket } from 'mysql2';
713

814
import { BinLogEvent, StartOptions, TableMapEntry } from '@powersync/mysql-zongji';
@@ -12,10 +18,12 @@ import { isBinlogStillAvailable, ReplicatedGTID, toColumnDescriptors } from '../
1218
import { createRandomServerId, escapeMysqlTableName } from '../utils/mysql-utils.js';
1319
import { MySQLConnectionManager } from './MySQLConnectionManager.js';
1420
import * as zongji_utils from './zongji/zongji-utils.js';
21+
import { ReplicationMetric } from '@powersync/service-types';
1522

1623
export interface BinLogStreamOptions {
1724
connections: MySQLConnectionManager;
1825
storage: storage.SyncRulesBucketStorage;
26+
metrics: MetricsEngine;
1927
abortSignal: AbortSignal;
2028
}
2129

@@ -62,7 +70,7 @@ export class BinLogStream {
6270

6371
private tableCache = new Map<string | number, storage.SourceTable>();
6472

65-
constructor(protected options: BinLogStreamOptions) {
73+
constructor(private options: BinLogStreamOptions) {
6674
this.storage = options.storage;
6775
this.connections = options.connections;
6876
this.syncRules = options.storage.getParsedSyncRules({ defaultSchema: this.defaultSchema });
@@ -74,6 +82,10 @@ export class BinLogStream {
7482
return this.connections.connectionTag;
7583
}
7684

85+
private get metrics() {
86+
return this.options.metrics;
87+
}
88+
7789
get connectionId() {
7890
const { connectionId } = this.connections;
7991
// Default to 1 if not set
@@ -335,6 +347,8 @@ AND table_type = 'BASE TABLE';`,
335347
after: record,
336348
afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
337349
});
350+
351+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
338352
}
339353
await batch.flush();
340354
}
@@ -461,7 +475,7 @@ AND table_type = 'BASE TABLE';`,
461475
});
462476
break;
463477
case zongji_utils.eventIsXid(evt):
464-
Metrics.getInstance().transactions_replicated_total.add(1);
478+
this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED).add(1);
465479
// Need to commit with a replicated GTID with updated next position
466480
await batch.commit(
467481
new common.ReplicatedGTID({
@@ -606,7 +620,7 @@ AND table_type = 'BASE TABLE';`,
606620
): Promise<storage.FlushedResult | null> {
607621
switch (payload.type) {
608622
case storage.SaveOperationTag.INSERT:
609-
Metrics.getInstance().rows_replicated_total.add(1);
623+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
610624
const record = common.toSQLiteRow(payload.data, payload.columns);
611625
return await batch.save({
612626
tag: storage.SaveOperationTag.INSERT,
@@ -617,7 +631,7 @@ AND table_type = 'BASE TABLE';`,
617631
afterReplicaId: getUuidReplicaIdentityBson(record, payload.sourceTable.replicaIdColumns)
618632
});
619633
case storage.SaveOperationTag.UPDATE:
620-
Metrics.getInstance().rows_replicated_total.add(1);
634+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
621635
// "before" may be null if the replica id columns are unchanged
622636
// It's fine to treat that the same as an insert.
623637
const beforeUpdated = payload.previous_data
@@ -637,7 +651,7 @@ AND table_type = 'BASE TABLE';`,
637651
});
638652

639653
case storage.SaveOperationTag.DELETE:
640-
Metrics.getInstance().rows_replicated_total.add(1);
654+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
641655
const beforeDeleted = common.toSQLiteRow(payload.data, payload.columns);
642656

643657
return await batch.save({

0 commit comments

Comments
 (0)