-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathBinLogReplicationJob.ts
95 lines (85 loc) · 2.75 KB
/
BinLogReplicationJob.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import { container } from '@powersync/lib-services-framework';
import { replication } from '@powersync/service-core';
import { BinlogConfigurationError, BinLogStream } from './BinLogStream.js';
import { MySQLConnectionManagerFactory } from './MySQLConnectionManagerFactory.js';
/**
 * Construction options for {@link BinLogReplicationJob}: the base replication job
 * options plus the factory the job uses to obtain MySQL connection managers.
 */
export interface BinLogReplicationJobOptions extends replication.AbstractReplicationJobOptions {
/** Factory the job calls to create a fresh connection manager for each replication attempt. */
connectionFactory: MySQLConnectionManagerFactory;
}
/**
 * Replication job that runs a {@link BinLogStream} in a retry loop until the job is stopped.
 *
 * Every attempt creates its own connection manager through the configured factory and
 * closes it again when the attempt ends, whether it succeeded or failed.
 */
export class BinLogReplicationJob extends replication.AbstractReplicationJob {
  private readonly connectionFactory: MySQLConnectionManagerFactory;

  /**
   * @param options Base replication job options plus the connection manager factory.
   */
  constructor(options: BinLogReplicationJobOptions) {
    super(options);
    this.connectionFactory = options.connectionFactory;
  }

  /** Slot name taken from the job's storage; used to tag log lines and error reports. */
  get slot_name() {
    return this.options.storage.slot_name;
  }

  /** Intentionally empty: this job performs no keep-alive work. */
  async keepAlive(): Promise<void> {}

  /**
   * Runs the replication loop until it ends or fails.
   *
   * An error escaping the loop is treated as fatal: it is reported and logged rather than
   * rethrown. The job's abort controller is always aborted on the way out.
   */
  async replicate(): Promise<void> {
    try {
      await this.replicateLoop();
    } catch (e) {
      // Fatal exception: anything that escaped the retry loop,
      // e.g. a BinlogConfigurationError rethrown by replicateOnce().
      container.reporter.captureException(e, {
        metadata: {
          replication_slot: this.slot_name
        }
      });
      this.logger.error(`Replication failed on ${this.slot_name}`, e);
    } finally {
      this.abortController.abort();
    }
  }

  /**
   * Calls {@link replicateOnce} repeatedly until the job is stopped, pausing between
   * attempts. The pause ends early when the job's abort signal fires, so stopping the
   * job does not have to wait out the remaining delay.
   */
  async replicateLoop(): Promise<void> {
    const retryDelayMs = 5000;

    while (!this.isStopped) {
      await this.replicateOnce();

      if (this.isStopped) {
        break;
      }

      const signal = this.abortController.signal;
      if (signal.aborted) {
        continue;
      }

      await new Promise<void>((resolve) => {
        const onAbort = () => {
          // Do not leave a pending timer behind once the job has been aborted.
          clearTimeout(timer);
          resolve();
        };
        const timer = setTimeout(() => {
          signal.removeEventListener('abort', onAbort);
          resolve();
        }, retryDelayMs);
        signal.addEventListener('abort', onAbort, { once: true });
      });
    }
  }

  /**
   * Performs a single replication attempt.
   *
   * Errors raised after the job was aborted are ignored. A {@link BinlogConfigurationError}
   * is rethrown to the caller; any other error is logged, reported, and passed to the rate
   * limiter so the attempt can be retried. The connection manager is always closed.
   *
   * @throws BinlogConfigurationError when the stream reports a configuration problem.
   */
  async replicateOnce(): Promise<void> {
    // New connections on every iteration (every error with retry),
    // otherwise we risk repeating errors that are tied to a specific connection.
    const connectionManager = this.connectionFactory.create({
      // Pool connections are only used intermittently.
      idleTimeout: 30_000
    });

    try {
      await this.rateLimiter?.waitUntilAllowed({ signal: this.abortController.signal });
      if (this.isStopped) {
        return;
      }

      const stream = new BinLogStream({
        abortSignal: this.abortController.signal,
        storage: this.options.storage,
        metrics: this.options.metrics,
        connections: connectionManager
      });
      await stream.replicate();
    } catch (e) {
      if (this.abortController.signal.aborted) {
        // Failures during shutdown are expected and not worth reporting.
        return;
      }

      this.logger.error(`Sync rules ${this.id} Replication error`, e);
      // Optional chaining: the thrown value is not guaranteed to be an object.
      if (e?.cause != null) {
        this.logger.error(`cause`, e.cause);
      }

      if (e instanceof BinlogConfigurationError) {
        // Not retried here; propagates out of the loop to replicate().
        throw e;
      } else {
        // Report the error if relevant, before retrying
        container.reporter.captureException(e, {
          metadata: {
            replication_slot: this.slot_name
          }
        });
        // This sets the retry delay
        this.rateLimiter?.reportError(e);
      }
    } finally {
      await connectionManager.end();
    }
  }
}