Skip to content

Commit

Permalink
add Netty Thread-local object pool (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
tuhuynh27 authored Dec 16, 2021
1 parent 1a34c94 commit 43f61e0
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 22 deletions.
13 changes: 9 additions & 4 deletions core/src/main/java/dev/keva/core/aof/AOFContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,26 @@ public void sync() throws IOException {
bufferLock.lock();
try {
for (Command command : buffer) {
output.writeObject(command);
output.writeObject(command.getObjects());
}
} finally {
output.flush();
for (Command command : buffer) {
command.recycle();
}
buffer.clear();
bufferLock.unlock();
}
}

public void syncPerMutation(Command command) {
try {
output.writeObject(command);
output.writeObject(command.getObjects());
output.flush();
} catch (IOException e) {
log.error("Error writing AOF file", e);
} finally {
command.recycle();
}
}

Expand All @@ -106,8 +111,8 @@ public List<Command> read() throws IOException {
ObjectInputStream input = new ObjectInputStream(fis);
while (true) {
try {
Command command = (Command) input.readObject();
commands.add(command);
byte[][] objects = (byte[][]) input.readObject();
commands.add(Command.newInstance(objects, false));
} catch (EOFException e) {
fis.close();
return commands;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void executeExpire(byte[] key) {
byte[][] data = new byte[2][];
data[0] = "delete".getBytes();
data[1] = key;
Command command = new Command(data, false);
Command command = Command.newInstance(data, false);
Lock lock = database.getLock();
lock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ public void init() {
}
Object[] objects = new Object[types.length];
command.toArguments(objects, types, ctx);
// If not in AOF mode, then recycle(),
// else, the command will be recycled in the AOF dump
if (!kevaConfig.getAof()) {
command.recycle();
}
return (Reply<?>) method.invoke(instance, objects);
} finally {
lock.unlock();
Expand Down
58 changes: 43 additions & 15 deletions resp-protocol/src/main/java/dev/keva/protocol/resp/Command.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,56 @@
package dev.keva.protocol.resp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Recycler;
import lombok.Getter;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class Command implements Serializable {
private static final byte LOWER_DIFF = 'a' - 'A';

private final Object[] objects;
private static final Recycler<Command> RECYCLER = new Recycler<Command>() {
protected Command newObject(Recycler.Handle<Command> handle) {
return new Command(handle);
}
};

private final Recycler.Handle<Command> handle;

@Getter
private byte[][] objects;

private Command(Recycler.Handle<Command> handle) {
this.handle = handle;
}

public Command(Object[] objects, boolean inline) {
public static Command newInstance(byte[][] objects, boolean inline) {
if (objects == null) {
throw new IllegalArgumentException("objects must not be null");
}
if (objects.length == 0) {
throw new IllegalArgumentException("objects must not be empty");
}

Command command = RECYCLER.get();
if (inline) {
byte[] objs = ByteUtil.getBytes(objects[0]);
byte[] objs = objects[0];
String[] strings = new String(objs, StandardCharsets.UTF_8).trim().split("\\s+");
objects = new Object[strings.length];
objects = new byte[strings.length][];
for (int i = 0; i < strings.length; i++) {
objects[i] = ByteUtil.getBytes(strings[i]);
}
}
this.objects = objects;
command.objects = objects;
// LowerCase bytes
for (int i = 0; i < objects[0].length; i++) {
byte b = objects[0][i];
if (b >= 'A' && b <= 'Z') {
objects[0][i] = (byte) (b + LOWER_DIFF);
}
}
return command;
}

public int getLength() {
Expand All @@ -31,15 +62,7 @@ public int getLength() {
}

public byte[] getName() {
byte[] name = ByteUtil.getBytes(objects[0]);
// LowerCase bytes
for (int i = 0; i < name.length; i++) {
byte b = name[i];
if (b >= 'A' && b <= 'Z') {
name[i] = (byte) (b + LOWER_DIFF);
}
}
return name;
return objects[0];
}

public void toArguments(Object[] arguments, Class<?>[] types, ChannelHandlerContext ctx) {
Expand All @@ -60,11 +83,16 @@ public void toArguments(Object[] arguments, Class<?>[] types, ChannelHandlerCont
int left = isFirstVararg ? (objects.length - position - 1) : (objects.length - 1);
byte[][] lastArgument = new byte[left][];
for (int i = 0; i < left; i++) {
lastArgument[i] = isFirstVararg ? (byte[]) (objects[i + position + 1]) : (byte[]) (objects[i + position]);
lastArgument[i] = isFirstVararg ? objects[i + position + 1] : objects[i + position];
}
arguments[position] = lastArgument;
}
position++;
}
}

public void recycle() {
objects = null;
handle.recycle(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
}
}
try {
out.add(new Command(bytes, false));
out.add(Command.newInstance(bytes, false));
} finally {
bytes = null;
arguments = 0;
Expand All @@ -60,7 +60,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
b[0] = new byte[buf.readableBytes()];
buf.getBytes(0, b[0]);
in.skipBytes(isCRLF ? 2 : 1);
out.add(new Command(b, true));
out.add(Command.newInstance(b, true));
}
}
}

0 comments on commit 43f61e0

Please sign in to comment.