Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
<dep.alluxio.version>313</dep.alluxio.version>
<dep.slf4j.version>2.0.16</dep.slf4j.version>
<dep.kafka.version>3.9.1</dep.kafka.version>
<dep.pinot.version>1.3.0</dep.pinot.version>
<dep.pinot.version>1.4.0</dep.pinot.version>
<dep.druid.version>30.0.1</dep.druid.version>
<dep.jaxb.version>2.3.1</dep.jaxb.version>
<dep.jaxb.runtime.version>4.0.5</dep.jaxb.runtime.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
import org.apache.pinot.common.utils.grpc.ServerGrpcQueryClient;
import org.apache.pinot.common.utils.grpc.ServerGrpcRequestBuilder;

import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -30,25 +30,25 @@
*/
public class PinotStreamingQueryClient
{
private final Map<String, GrpcQueryClient> grpcQueryClientMap = new HashMap<>();
private final Map<String, ServerGrpcQueryClient> grpcQueryClientMap = new HashMap<>();
private final GrpcConfig config;

public PinotStreamingQueryClient(GrpcConfig config)
{
this.config = config;
}

public Iterator<Server.ServerResponse> submit(String host, int port, GrpcRequestBuilder requestBuilder)
public Iterator<Server.ServerResponse> submit(String host, int port, ServerGrpcRequestBuilder requestBuilder)
{
GrpcQueryClient client = getOrCreateGrpcQueryClient(host, port);
ServerGrpcQueryClient client = getOrCreateGrpcQueryClient(host, port);
return client.submit(requestBuilder.build());
}

private GrpcQueryClient getOrCreateGrpcQueryClient(String host, int port)
private ServerGrpcQueryClient getOrCreateGrpcQueryClient(String host, int port)
{
String key = String.format("%s_%d", host, port);
if (!grpcQueryClientMap.containsKey(key)) {
grpcQueryClientMap.put(key, new GrpcQueryClient(host, port, config));
grpcQueryClientMap.put(key, new ServerGrpcQueryClient(host, port, config));
}
return grpcQueryClientMap.get(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.facebook.presto.pinot.PinotErrorCode;
import com.facebook.presto.pinot.PinotException;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
import org.apache.pinot.common.utils.grpc.ServerGrpcRequestBuilder;
import org.apache.pinot.spi.utils.CommonConstants;

import java.util.HashMap;
Expand All @@ -25,7 +25,7 @@
import java.util.Optional;

public class PinotProxyGrpcRequestBuilder
extends GrpcRequestBuilder
extends ServerGrpcRequestBuilder
{
private static final String KEY_OF_PROXY_GRPC_FORWARD_HOST = "FORWARD_HOST";
private static final String KEY_OF_PROXY_GRPC_FORWARD_PORT = "FORWARD_PORT";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
import org.apache.pinot.common.utils.grpc.ServerGrpcRequestBuilder;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableBuilderV4;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.CommonConstants;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -172,9 +173,6 @@ protected static DataTable createDataTableWithAllTypes()
case STRING:
dataTableBuilder.setColumn(colId, generateRandomStringWithLength(RANDOM.nextInt(20)));
break;
case OBJECT:
dataTableBuilder.setColumn(colId, (Object) RANDOM.nextDouble());
break;
case BOOLEAN_ARRAY:
int length = RANDOM.nextInt(20);
int[] booleanArray = new int[length];
Expand Down Expand Up @@ -233,7 +231,7 @@ protected static DataTable createDataTableWithAllTypes()
case BYTES:
try {
dataTableBuilder.setColumn(colId,
Hex.decodeHex("0DE0B6B3A7640000".toCharArray())); // Hex of BigDecimal.ONE
new ByteArray(Hex.decodeHex("0DE0B6B3A7640000".toCharArray()))); // Hex of BigDecimal.ONE
}
catch (DecoderException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -390,7 +388,6 @@ public void testPinotProxyGrpcRequest()
Assert.assertEquals(grpcRequest.getSql(), "SELECT * FROM myTable");
Assert.assertEquals(grpcRequest.getSegmentsCount(), 1);
Assert.assertEquals(grpcRequest.getSegments(0), "segment1");
Assert.assertEquals(grpcRequest.getMetadataCount(), 9);
Assert.assertEquals(grpcRequest.getMetadataOrThrow("k1"), "v1");
Assert.assertEquals(grpcRequest.getMetadataOrThrow("k2"), "v2");
Assert.assertEquals(grpcRequest.getMetadataOrThrow("FORWARD_HOST"), "localhost");
Expand All @@ -401,6 +398,7 @@ public void testPinotProxyGrpcRequest()
Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.ENABLE_STREAMING), "true");
Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.PAYLOAD_TYPE), "sql");

Assert.assertTrue(grpcRequest.getMetadataCount() >= 9, "Expected at least 9 metadata entries, but got " + grpcRequest.getMetadataCount());
grpcRequest = new PinotProxyGrpcRequestBuilder()
.setSegments(ImmutableList.of("segment1"))
.setEnableStreaming(true)
Expand All @@ -412,20 +410,23 @@ public void testPinotProxyGrpcRequest()
Assert.assertEquals(grpcRequest.getSql(), "SELECT * FROM myTable");
Assert.assertEquals(grpcRequest.getSegmentsCount(), 1);
Assert.assertEquals(grpcRequest.getSegments(0), "segment1");
Assert.assertEquals(grpcRequest.getMetadataCount(), 7);
Assert.assertEquals(grpcRequest.getMetadataOrThrow("k1"), "v1");
Assert.assertEquals(grpcRequest.getMetadataOrThrow("k2"), "v2");
Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID), "121");
Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.BROKER_ID), "presto-coordinator-grpc");
Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.ENABLE_TRACE), "false");
Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.ENABLE_STREAMING), "true");
Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.PAYLOAD_TYPE), "sql");

// Verifying minimum metadata count (7 fields: k1, k2, requestId, brokerId, enableTrace, enableStreaming, payloadType)
Assert.assertTrue(grpcRequest.getMetadataCount() >= 7,
"Expected at least 7 metadata entries, but got " + grpcRequest.getMetadataCount());
}

@Test
public void testPinotGrpcRequest()
{
final Server.ServerRequest grpcRequest = new GrpcRequestBuilder()
final Server.ServerRequest grpcRequest = new ServerGrpcRequestBuilder()
.setSegments(ImmutableList.of("segment1"))
.setEnableStreaming(true)
.setRequestId(121)
Expand All @@ -435,12 +436,14 @@ public void testPinotGrpcRequest()
Assert.assertEquals(grpcRequest.getSql(), "SELECT * FROM myTable");
Assert.assertEquals(grpcRequest.getSegmentsCount(), 1);
Assert.assertEquals(grpcRequest.getSegments(0), "segment1");
Assert.assertEquals(grpcRequest.getMetadataCount(), 5);
Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID), "121");
Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.BROKER_ID), "presto-coordinator-grpc");
Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.ENABLE_TRACE), "false");
Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.ENABLE_STREAMING), "true");
Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.PAYLOAD_TYPE), "sql");
Assert.assertEquals(grpcRequest.getMetadataOrThrow("correlationId"), "121");
Assert.assertTrue(grpcRequest.getMetadataCount() >= 6,
"Expected at least 6 metadata entries, but got " + grpcRequest.getMetadataCount());
}

private static final class TestingPinotStreamingQueryClient
Expand All @@ -455,7 +458,7 @@ private static final class TestingPinotStreamingQueryClient
}

@Override
public Iterator<Server.ServerResponse> submit(String host, int port, GrpcRequestBuilder requestBuilder)
public Iterator<Server.ServerResponse> submit(String host, int port, ServerGrpcRequestBuilder requestBuilder)
{
return new Iterator<Server.ServerResponse>()
{
Expand Down
Loading