Skip to content

Commit

Permalink
Merge pull request #23 from lujiajing1126/blocking
Browse files Browse the repository at this point in the history
Support Blocking Mode
  • Loading branch information
lujiajing1126 committed Dec 3, 2019
2 parents b239aed + 7b6a3bc commit 98cc475
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 63 deletions.
116 changes: 116 additions & 0 deletions lib/executor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
const colors = require('colors');
const util = require('util');

const INT_PREFIX = "(integer)";
const BLOCKING_CMDS = ["subscribe", "monitor", "psubscribe"];

class Executor {

constructor(client, commands) {
this._client = client;
this.commands = commands;

const CMD = this.commands.shift().toLowerCase();
this.blockingMode = BLOCKING_CMDS.includes(CMD);

this._executor = this._client.client[`${CMD}Async`];

if (typeof this._executor !== "function") {
this._executor = this._client.client[`send_commandAsync`];
// recombine commands
this.commands = [CMD, this.commands];
}
}

writeResult(result) {
if (Array.isArray(result)) {
this._client.next = result.map((item, index) => {
return util.format("%d) %s", index + 1, item);
});
} else if (result === null) {
this._client.next = "(nil)";
} else if (typeof result === 'object') {
this._client.next = Object.entries(result).flat().map((item, index) => {
return util.format("%d) %s", index + 1, item);
});
} else {
// number or string
// default to print it as `string`
this._client.next = util.format(Number.isInteger(result) ? `${INT_PREFIX} ${result}` : result);
}
}

run() {
return this._executor.bind(this._client.client)(...this.commands)
.then((result) => {
this.writeResult(result);
return this.blockingMode;
}).catch((e) => {
this._client.next = colors.red(`(error) ${e.message}`);
});
}

shutdown() {
// do nothing
}
}

class SubscribeExecutor extends Executor {
constructor(client, commands) {
super(client, commands);
}

run() {
this._client.client.on("subscribe", (channel, count) => {});

this._client.client.on("message", (channel, message) => {
this.writeResult(message);
});
return super.run();
}

shutdown() {
this._client.client.unsubscribe();
}
}

class PatternSubscribeExecutor extends SubscribeExecutor {
constructor(client, commands) {
super(client, commands);
}

run() {
this._client.client.on("psubscribe", (pattern, count) => {});

this._client.client.on("pmessage", (pattern, channel, message) => {
this.writeResult(message);
});
return super.run();
}
}

class MonitorExecutor extends Executor {
constructor(client, commands) {
super(client, commands);
}

run() {
this._client.client.on("monitor", (time, args, raw_reply) => {
this.writeResult(raw_reply);
});
return super.run();
}
}

module.exports = function (client, commands) {
const CMD = commands[0].toLowerCase();
if (CMD === 'subscribe') {
return new SubscribeExecutor(client, commands);
} else if (CMD === 'psubscribe') {
return new PatternSubscribeExecutor(client, commands);
} else if (CMD === 'monitor') {
return new MonitorExecutor(client, commands);
} else {
return new Executor(client, commands);
}
}
64 changes: 15 additions & 49 deletions lib/redis.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
const redis = require('redis');
const readline = require('readline');
const Promise = require('bluebird');
const util = require('util');
const splitargs = require('splitargs');
const colors = require('colors');
const InputBuffer = require('./buf');
const Subject = require('rxjs').Subject;
const createExecutor = require('./executor');
require('core-js/features/array/flat');
require('core-js/features/object/entries');

const INT_PREFIX = "(integer)";

class PromptResult {}
class ExitResult {
constructor(code) {
Expand Down Expand Up @@ -51,21 +48,6 @@ class RedisClient {
this.rl.setPrompt(`${this._host}> `);
}
this.rl.prompt();

this._subject = new Subject();
this._subject.subscribe({
next: (v) => {
if (typeof v === 'string') {
console.log(v);
} else if (Array.isArray(v)) {
console.log(v.join("\n"));
} else if (v instanceof PromptResult) {
this.rl.prompt();
} else if (v instanceof ExitResult) {
process.exit(v.code);
}
}
});
}

_attachRedisEvent() {
Expand All @@ -83,33 +65,8 @@ class RedisClient {
}

execute(commands) {
const CMD = commands.shift().toLowerCase();
let func = this._redis_client[`${CMD}Async`];
if (typeof func !== "function") {
func = this._redis_client[`send_commandAsync`];
// recombine commands
commands = [CMD, commands];
}
return func.bind(this._redis_client)(...commands)
.then((result) => {
if (Array.isArray(result)) {
this.next = result.map((item, index) => {
return util.format("%d) %s", index + 1, item);
});
} else if (result === null) {
this.next = "(nil)";
} else if (typeof result === 'object') {
this.next = Object.entries(result).flat().map((item, index) => {
return util.format("%d) %s", index + 1, item);
});
} else {
// number or string
// default to print it as `string`
this.next = util.format(Number.isInteger(result) ? `${INT_PREFIX} ${result}` : result);
}
}).catch((e) => {
this.next = colors.red(`(error) ${e.message}`);
});
this.executor = createExecutor(this, commands);
return this.executor.run();
}

attachEvent() {
Expand Down Expand Up @@ -137,15 +94,16 @@ class RedisClient {
if (CMD === 'exit') {
// all connections will be closed after `RedisClient` quit
// and an `end` event will be emitted to exit process.
if (this.executor) this.executor.shutdown();
this._redis_client.quit();
} else if (CMD === 'clear') {
this.next = '\x1b[0f'; /* ANSI clear screen code */
readline.cursorTo(process.stdout, 0, 0);
readline.clearScreenDown(process.stdout);
this.next = __PR__;
} else {
return this.execute([CMD, ...commands]).then(() => {
this.next = __PR__;
return this.execute([CMD, ...commands]).then((blocking) => {
if (!blocking) this.next = __PR__;
});
}
}
Expand All @@ -156,7 +114,15 @@ class RedisClient {
}

set next(v) {
this._subject.next(v);
if (typeof v === 'string') {
console.log(v);
} else if (Array.isArray(v)) {
console.log(v.join("\n"));
} else if (v instanceof PromptResult) {
this.rl.prompt();
} else if (v instanceof ExitResult) {
process.exit(v.code);
}
}

get client() {
Expand Down
19 changes: 6 additions & 13 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
"commander": "^2.19.0",
"core-js": "^3.0.0-beta.8",
"redis": "^2.8.0",
"rxjs": "^6.3.3",
"splitargs": "0.0.7"
},
"devDependencies": {
Expand Down
47 changes: 47 additions & 0 deletions test/executor.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
const RedisClient = require('../lib/redis').RedisClient;
const __PR__ = require('../lib/redis').__PR__;
const _log = global.console.log;
const colors = require('colors');
const readline = require('readline');
const createExecutor = require('../lib/executor')

let spy = {};

let redisClient = {};

beforeAll(() => {
redisClient = new RedisClient("127.0.0.1", 6379);
/**
* Call unref() on the underlying socket connection to the Redis server, allowing the program to exit once no more commands are pending.
* This is an experimental feature, as documented in the following site:
* https://github.com/NodeRedis/node_redis#clientunref
*/
redisClient.client.unref();
// remove all listener to avoid async callback
redisClient.client.removeAllListeners();
// mock `console.log`
spy.next = jest.spyOn(redisClient, 'next', 'set').mockImplementation(() => {});
spy.exit = jest.spyOn(process, 'exit').mockImplementation(() => {});
return redisClient.execute(['flushall']);
});

afterAll(() => {
redisClient.client.quit();
redisClient.rl.close();
spy.next.mockRestore();
spy.exit.mockRestore();
});

describe('subscribe executor', () => {
let pub = {};
beforeAll(() => {
pub = new RedisClient("127.0.0.1", 6379);
pub.client.removeAllListeners();
});
it('subscribe to a channel', () => {
const sub = createExecutor(redisClient, ['subscribe', 'channel0']);
return sub.run().then(() => {
expect(spy.next).toBeCalledWith('channel0');
});
})
})

0 comments on commit 98cc475

Please sign in to comment.