-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdbHandler.js
More file actions
159 lines (143 loc) · 5.04 KB
/
dbHandler.js
File metadata and controls
159 lines (143 loc) · 5.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/**
* @notice This module handles the SQLite database connection and initialization for the job queue system of queueCTL.
* @module dbHandler
* @license GPL-3.0
* @author Dave Meshak J
*/
const sqlite3 = require('sqlite3').verbose();
const path = require('path');
const { open } = require('sqlite');
const fs = require('fs');
const { argv } = require('process');
const DB_PATH = path.resolve(process.cwd(), "job-queue.db");
/**
* Open and return a connection to the job queue database.
*
* This returns the Promise-based connection returned by `sqlite.open` with
* `sqlite3.Database` as the driver. Callers are responsible for closing the
* connection when finished (db.close()).
*
* @async
* @returns {Promise<import('sqlite').Database>} Promise resolving to an open DB instance
*/
async function getDBConnection() {
return open({
filename: DB_PATH,
driver: sqlite3.Database
});
}
/**
* Initialize the database if it doesn't already exist.
*
* This will create the database file at `DB_PATH`, create the `jobs`,
* `config`, and `workers` tables, create helpful indexes, and insert a set of
* default configuration values. If the DB already exists, the function
* returns immediately.
*
* @async
* @param {boolean} [silent=false] - When true, suppress non-error console output
* @returns {Promise<void>} Resolves when initialization is complete
*/
async function initializeDB(silent = false) {
if (!fs.existsSync(DB_PATH)) {
const db = await getDBConnection();
if (!silent) {
console.log("[*] Database file not found. Creating new database at:", DB_PATH);
console.log("[*] Initializing database schema...");
console.log("[*] Creating 'jobs' table...");
}
await db.exec(`
CREATE TABLE IF NOT EXISTS jobs(
id TEXT PRIMARY KEY,
command TEXT NOT NULL,
state TEXT NOT NULL CHECK(state IN ('pending', 'processing', 'completed', 'failed', 'dead')) default 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
next_run_at TIMESTAMP NOT NULL,
last_error TEXT
);
`);
if (!silent) {
console.log("[*] Creating indexes on 'jobs' table...");
}
await db.exec(`
CREATE INDEX IF NOT EXISTS idx_jobs_pending ON jobs(state, next_run_at);
`);
if (!silent) {
console.log("[*] Creating index on 'updated_at' column...");
}
await db.exec(`
CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at);
`);
if (!silent) {
console.log("[*] Creating 'config' table...");
}
await db.exec(`
CREATE TABLE IF NOT EXISTS config(
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
`);
if (!silent) {
console.log("[*] Creating 'workers' table...");
}
await db.exec(`
CREATE TABLE IF NOT EXISTS workers (
pid INTEGER PRIMARY KEY,
hostname TEXT NOT NULL,
started_at TIMESTAMP NOT NULL,
last_heartbeat TIMESTAMP NOT NULL
);
`);
// @notice Insert default configuration values
if (!silent) {
console.log("[*] Inserting default configuration values...");
}
const defaultConfigs = [
{ key: 'max_concurrent_jobs', value: '5' },
{ key: 'job_retry_delay', value: '60' },
{ key: 'worker_heartbeat_interval', value: '30' },
{ key: 'job_timeout', value: '30000' }, // in milliseconds
{ key: "backoff_strategy", value: "exponential" },
{ key: "backoff_base", value: "2" },
{ key: "default_max_tries", value: "3" }
];
for (const config of defaultConfigs) {
await db.run(`
INSERT INTO config (key, value)
VALUES (?, ?)
ON CONFLICT(key) DO UPDATE SET value=excluded.value;
`, [config.key, config.value]);
}
if (!silent) {
console.log("[*] Database initialized successfully.");
}
await db.close();
}
else{
if (!silent) {
console.log("[*] Database already exists at:", DB_PATH);
}
return;
}
}
if (require.main === module) {
const args = process.argv.slice(2);
if(args.includes('--init')) {
initializeDB(args.includes('--silent') ? true : false).then(() => {
console.log("[+] Database initialization complete.");
process.exit(0);
}).catch(err => {
console.error("[-] Error during database initialization:", err);
process.exit(1);
});
} else {
console.log("Usage: node dbHandler.js --init");
}
}
module.exports = {
getDBConnection,
initializeDB
};