Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements Netty Thread-local object pool for Command #72

Merged
merged 1 commit into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions core/src/main/java/dev/keva/core/aof/AOFContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,26 @@ public void sync() throws IOException {
bufferLock.lock();
try {
for (Command command : buffer) {
output.writeObject(command);
output.writeObject(command.getObjects());
}
} finally {
output.flush();
for (Command command : buffer) {
command.recycle();
}
buffer.clear();
bufferLock.unlock();
}
}

public void syncPerMutation(Command command) {
try {
output.writeObject(command);
output.writeObject(command.getObjects());
output.flush();
} catch (IOException e) {
log.error("Error writing AOF file", e);
} finally {
command.recycle();
}
}

Expand All @@ -106,8 +111,8 @@ public List<Command> read() throws IOException {
ObjectInputStream input = new ObjectInputStream(fis);
while (true) {
try {
Command command = (Command) input.readObject();
commands.add(command);
byte[][] objects = (byte[][]) input.readObject();
commands.add(Command.newInstance(objects, false));
} catch (EOFException e) {
fis.close();
return commands;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void executeExpire(byte[] key) {
byte[][] data = new byte[2][];
data[0] = "delete".getBytes();
data[1] = key;
Command command = new Command(data, false);
Command command = Command.newInstance(data, false);
Lock lock = database.getLock();
lock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ public void init() {
}
Object[] objects = new Object[types.length];
command.toArguments(objects, types, ctx);
// If not in AOF mode, then recycle(),
// else, the command will be recycled in the AOF dump
if (!kevaConfig.getAof()) {
command.recycle();
}
return (Reply<?>) method.invoke(instance, objects);
} finally {
lock.unlock();
Expand Down
58 changes: 43 additions & 15 deletions resp-protocol/src/main/java/dev/keva/protocol/resp/Command.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,56 @@
package dev.keva.protocol.resp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Recycler;
import lombok.Getter;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class Command implements Serializable {
private static final byte LOWER_DIFF = 'a' - 'A';

private final Object[] objects;
private static final Recycler<Command> RECYCLER = new Recycler<Command>() {
protected Command newObject(Recycler.Handle<Command> handle) {
return new Command(handle);
}
};

private final Recycler.Handle<Command> handle;

@Getter
private byte[][] objects;

private Command(Recycler.Handle<Command> handle) {
this.handle = handle;
}

public Command(Object[] objects, boolean inline) {
public static Command newInstance(byte[][] objects, boolean inline) {
if (objects == null) {
throw new IllegalArgumentException("objects must not be null");
}
if (objects.length == 0) {
throw new IllegalArgumentException("objects must not be empty");
}

Command command = RECYCLER.get();
if (inline) {
byte[] objs = ByteUtil.getBytes(objects[0]);
byte[] objs = objects[0];
String[] strings = new String(objs, StandardCharsets.UTF_8).trim().split("\\s+");
objects = new Object[strings.length];
objects = new byte[strings.length][];
for (int i = 0; i < strings.length; i++) {
objects[i] = ByteUtil.getBytes(strings[i]);
}
}
this.objects = objects;
command.objects = objects;
// LowerCase bytes
for (int i = 0; i < objects[0].length; i++) {
byte b = objects[0][i];
if (b >= 'A' && b <= 'Z') {
objects[0][i] = (byte) (b + LOWER_DIFF);
}
}
return command;
}

public int getLength() {
Expand All @@ -31,15 +62,7 @@ public int getLength() {
}

public byte[] getName() {
byte[] name = ByteUtil.getBytes(objects[0]);
// LowerCase bytes
for (int i = 0; i < name.length; i++) {
byte b = name[i];
if (b >= 'A' && b <= 'Z') {
name[i] = (byte) (b + LOWER_DIFF);
}
}
return name;
return objects[0];
}

public void toArguments(Object[] arguments, Class<?>[] types, ChannelHandlerContext ctx) {
Expand All @@ -60,11 +83,16 @@ public void toArguments(Object[] arguments, Class<?>[] types, ChannelHandlerCont
int left = isFirstVararg ? (objects.length - position - 1) : (objects.length - 1);
byte[][] lastArgument = new byte[left][];
for (int i = 0; i < left; i++) {
lastArgument[i] = isFirstVararg ? (byte[]) (objects[i + position + 1]) : (byte[]) (objects[i + position]);
lastArgument[i] = isFirstVararg ? objects[i + position + 1] : objects[i + position];
}
arguments[position] = lastArgument;
}
position++;
}
}

public void recycle() {
objects = null;
handle.recycle(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
}
}
try {
out.add(new Command(bytes, false));
out.add(Command.newInstance(bytes, false));
} finally {
bytes = null;
arguments = 0;
Expand All @@ -60,7 +60,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
b[0] = new byte[buf.readableBytes()];
buf.getBytes(0, b[0]);
in.skipBytes(isCRLF ? 2 : 1);
out.add(new Command(b, true));
out.add(Command.newInstance(b, true));
}
}
}