diff --git a/driver-core/src/main/java/com/datastax/driver/core/ArrayBackedResultSet.java b/driver-core/src/main/java/com/datastax/driver/core/ArrayBackedResultSet.java index 3337ec31d45..759b293ff79 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/ArrayBackedResultSet.java +++ b/driver-core/src/main/java/com/datastax/driver/core/ArrayBackedResultSet.java @@ -68,7 +68,8 @@ static ArrayBackedResultSet fromMessage( SessionManager session, ProtocolVersion protocolVersion, ExecutionInfo info, - Statement statement) { + Statement statement, + ProtocolFeatureStore featureStore) { switch (msg.kind) { case ROWS: @@ -94,7 +95,8 @@ static ArrayBackedResultSet fromMessage( // CASSANDRA-10786). MD5Digest newMetadataId = r.metadata.metadataId; assert !(actualStatement instanceof BoundStatement) - || ProtocolFeature.PREPARED_METADATA_CHANGES.isSupportedBy(protocolVersion) + || ProtocolFeatures.PREPARED_METADATA_CHANGES.isSupportedBy( + protocolVersion, featureStore) || newMetadataId == null; if (newMetadataId != null) { BoundStatement bs = ((BoundStatement) actualStatement); @@ -441,6 +443,9 @@ public void onSet( bs.preparedStatement().getPreparedId().resultSetMetadata = new PreparedId.PreparedMetadata( rows.metadata.metadataId, rows.metadata.columns); + } else if (rows.metadata.columns != null + && rows.metadata.columns.size() > 0) { + newMetadata = rows.metadata.columns; } MultiPage.this.nextPages.offer(new NextPage(newMetadata, rows.data)); MultiPage.this.fetchState = diff --git a/driver-core/src/main/java/com/datastax/driver/core/BatchStatement.java b/driver-core/src/main/java/com/datastax/driver/core/BatchStatement.java index e2b7a805483..bda6a6b8d91 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/BatchStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/BatchStatement.java @@ -228,10 +228,10 @@ public int requestSizeInBytes(ProtocolVersion protocolVersion, CodecRegistry cod // overestimate by a // few bytes. size += CBUtil.sizeOfConsistencyLevel(getSerialConsistencyLevel()); - if (ProtocolFeature.CLIENT_TIMESTAMPS.isSupportedBy(protocolVersion)) { + if (ProtocolFeatures.CLIENT_TIMESTAMPS.isSupportedBy(protocolVersion)) { size += 8; // timestamp } - if (ProtocolFeature.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion) + if (ProtocolFeatures.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion) && getOutgoingPayload() != null) { size += CBUtil.sizeOfBytesMap(getOutgoingPayload()); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java b/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java index 9317bd0a58b..42513f8518f 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java @@ -325,7 +325,8 @@ public int requestSizeInBytes(ProtocolVersion protocolVersion, CodecRegistry cod try { size += CBUtil.sizeOfShortBytes(preparedStatement().getPreparedId().boundValuesMetadata.id.bytes); - if (ProtocolFeature.PREPARED_METADATA_CHANGES.isSupportedBy(protocolVersion)) { + ProtocolFeatureStore featureStore = getHost().getProtocolFeatureStore(); + if (ProtocolFeatures.PREPARED_METADATA_CHANGES.isSupportedBy(protocolVersion, featureStore)) { size += CBUtil.sizeOfShortBytes(preparedStatement().getPreparedId().resultSetMetadata.id.bytes); } @@ -353,10 +354,10 @@ public int requestSizeInBytes(ProtocolVersion protocolVersion, CodecRegistry cod size += CBUtil.sizeOfValue(getPagingState()); } size += CBUtil.sizeOfConsistencyLevel(getSerialConsistencyLevel()); - if (ProtocolFeature.CLIENT_TIMESTAMPS.isSupportedBy(protocolVersion)) { + if (ProtocolFeatures.CLIENT_TIMESTAMPS.isSupportedBy(protocolVersion)) { size += 8; // timestamp } - if (ProtocolFeature.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion) + if (ProtocolFeatures.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion) && getOutgoingPayload() != null) { size += CBUtil.sizeOfBytesMap(getOutgoingPayload()); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/Connection.java b/driver-core/src/main/java/com/datastax/driver/core/Connection.java index e3a886972ce..f0a98d2c4ce 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Connection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Connection.java @@ -155,6 +155,7 @@ enum State { private final AtomicReference ownerRef = new AtomicReference(); private final ApplicationInfo applicationInfo; + private ProtocolFeatureStore protocolFeatureStore; /** * Create a new connection to a Cassandra node and associate it with the given pool. @@ -449,35 +450,31 @@ private AsyncFunction onOptionsResponse( final ProtocolVersion protocolVersion, final Executor initExecutor) { return new AsyncFunction() { @Override - public ListenableFuture apply(Message.Response response) throws Exception { + public ListenableFuture apply(Message.Response response) { switch (response.type) { case SUPPORTED: - Responses.Supported msg = (Supported) response; - ShardingInfo.ConnectionShardingInfo sharding = - ShardingInfo.parseShardingInfo(msg.supported); - if (sharding != null) { - getHost().setShardingInfo(sharding.shardingInfo); - Connection.this.shardId = sharding.shardId; + Supported supported = (Supported) response; + protocolFeatureStore = ProtocolFeatureStore.parseSupportedOptions(supported.supported); + protocolFeatureStore.storeInChannel(channel); + getHost().setProtocolFeatureStore(protocolFeatureStore); + + ShardingInfo.ConnectionShardingInfo shardingInfo = + protocolFeatureStore.getConnectionShardingInfo(); + if (protocolFeatureStore.getConnectionShardingInfo() != null) { + Connection.this.shardId = shardingInfo.shardId; if (Connection.this.requestedShardId != -1 - && Connection.this.requestedShardId != sharding.shardId) { + && Connection.this.requestedShardId != shardingInfo.shardId) { logger.warn( "Advanced shard awareness: requested connection to shard {}, but connected to {}. Is there a NAT between client and server?", Connection.this.requestedShardId, - sharding.shardId); + shardingInfo.shardId); // Owner is a HostConnectionPool if we are using adv. shard awareness ((HostConnectionPool) Connection.this.ownerRef.get()) .tempBlockAdvShardAwareness(ADV_SHARD_AWARENESS_BLOCK_ON_NAT); } } else { - getHost().setShardingInfo(null); Connection.this.shardId = 0; } - LwtInfo lwt = LwtInfo.parseLwtInfo(msg.supported); - if (lwt != null) { - getHost().setLwtInfo(lwt); - } - TabletInfo tabletInfo = TabletInfo.parseTabletInfo(msg.supported); - getHost().setTabletInfo(tabletInfo); return MoreFutures.VOID_SUCCESS; case ERROR: Responses.Error error = (Responses.Error) response; @@ -506,20 +503,13 @@ private AsyncFunction onOptionsReady( @Override public ListenableFuture apply(Void input) throws Exception { ProtocolOptions protocolOptions = factory.configuration.getProtocolOptions(); - Map extraOptions = new HashMap(); + Map extraOptions = new HashMap<>(); if (applicationInfo != null) { applicationInfo.addOption(extraOptions); } - LwtInfo lwtInfo = getHost().getLwtInfo(); - if (lwtInfo != null) { - lwtInfo.addOption(extraOptions); - } - TabletInfo tabletInfo = getHost().getTabletInfo(); - if (tabletInfo != null - && tabletInfo.isEnabled() - && ProtocolFeature.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion)) { - logger.debug("Enabling tablet support in OPTIONS message"); - TabletInfo.addOption(extraOptions); + + if (protocolFeatureStore != null) { + protocolFeatureStore.populateStartupOptions(protocolVersion, extraOptions); } Future startupResponseFuture = @@ -1065,6 +1055,10 @@ public int shardId() { return shardId == null ? 0 : shardId; } + public ProtocolFeatureStore getProtocolFeatureStore() { + return protocolFeatureStore; + } + /** * If the connection is part of a pool, return it to the pool. The connection should generally not * be reused after that. @@ -1955,21 +1949,6 @@ interface DefaultResponseHandler { } private static class Initializer extends ChannelInitializer { - // Stateless handlers - private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder(); - private static final Message.ProtocolEncoder messageEncoderV1 = - new Message.ProtocolEncoder(ProtocolVersion.V1); - private static final Message.ProtocolEncoder messageEncoderV2 = - new Message.ProtocolEncoder(ProtocolVersion.V2); - private static final Message.ProtocolEncoder messageEncoderV3 = - new Message.ProtocolEncoder(ProtocolVersion.V3); - private static final Message.ProtocolEncoder messageEncoderV4 = - new Message.ProtocolEncoder(ProtocolVersion.V4); - private static final Message.ProtocolEncoder messageEncoderV5 = - new Message.ProtocolEncoder(ProtocolVersion.V5); - private static final Message.ProtocolEncoder messageEncoderV6 = - new Message.ProtocolEncoder(ProtocolVersion.V6); - private static final Frame.Encoder frameEncoder = new Frame.Encoder(); private final ProtocolVersion protocolVersion; private final Connection connection; @@ -2033,7 +2012,7 @@ protected void initChannel(SocketChannel channel) throws Exception { } pipeline.addLast("frameDecoder", new Frame.Decoder()); - pipeline.addLast("frameEncoder", frameEncoder); + pipeline.addLast("frameEncoder", new Frame.Encoder()); pipeline.addLast("framingFormatHandler", new FramingFormatHandler(connection.factory)); @@ -2046,7 +2025,7 @@ protected void initChannel(SocketChannel channel) throws Exception { pipeline.addLast("frameCompressor", new Frame.Compressor(compressor)); } - pipeline.addLast("messageDecoder", messageDecoder); + pipeline.addLast("messageDecoder", new Message.ProtocolDecoder(null)); pipeline.addLast("messageEncoder", messageEncoderFor(protocolVersion)); pipeline.addLast("idleStateHandler", idleStateHandler); @@ -2056,23 +2035,11 @@ protected void initChannel(SocketChannel channel) throws Exception { nettyOptions.afterChannelInitialized(channel); } - private Message.ProtocolEncoder messageEncoderFor(ProtocolVersion version) { - switch (version) { - case V1: - return messageEncoderV1; - case V2: - return messageEncoderV2; - case V3: - return messageEncoderV3; - case V4: - return messageEncoderV4; - case V5: - return messageEncoderV5; - case V6: - return messageEncoderV6; - default: - throw new DriverInternalError("Unsupported protocol version " + protocolVersion); + private static Message.ProtocolEncoder messageEncoderFor(ProtocolVersion version) { + if (version.toInt() > ProtocolVersion.V6.toInt()) { + throw new DriverInternalError("Unsupported protocol version " + version); } + return new Message.ProtocolEncoder(version, null); } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/DefaultResultSetFuture.java b/driver-core/src/main/java/com/datastax/driver/core/DefaultResultSetFuture.java index 5132a06aa92..364215a0eaa 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/DefaultResultSetFuture.java +++ b/driver-core/src/main/java/com/datastax/driver/core/DefaultResultSetFuture.java @@ -87,15 +87,29 @@ public void onSet( table, rm.getCustomPayload().get(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)); } + switch (rm.kind) { case SET_KEYSPACE: // propagate the keyspace change to other connections session.poolsState.setKeyspace(((Responses.Result.SetKeyspace) rm).keyspace); - set(ArrayBackedResultSet.fromMessage(rm, session, protocolVersion, info, statement)); + set( + ArrayBackedResultSet.fromMessage( + rm, + session, + protocolVersion, + info, + statement, + connection.getProtocolFeatureStore())); break; case SCHEMA_CHANGE: ResultSet rs = - ArrayBackedResultSet.fromMessage(rm, session, protocolVersion, info, statement); + ArrayBackedResultSet.fromMessage( + rm, + session, + protocolVersion, + info, + statement, + connection.getProtocolFeatureStore()); final Cluster.Manager cluster = session.cluster.manager; if (!cluster.configuration.getQueryOptions().isMetadataEnabled()) { cluster.waitForSchemaAgreementAndSignal(connection, this, rs); @@ -224,7 +238,14 @@ public void run() { } break; default: - set(ArrayBackedResultSet.fromMessage(rm, session, protocolVersion, info, statement)); + set( + ArrayBackedResultSet.fromMessage( + rm, + session, + protocolVersion, + info, + statement, + connection.getProtocolFeatureStore())); break; } break; diff --git a/driver-core/src/main/java/com/datastax/driver/core/FramingFormatHandler.java b/driver-core/src/main/java/com/datastax/driver/core/FramingFormatHandler.java index 91459a1ab34..547ac574ca4 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/FramingFormatHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/FramingFormatHandler.java @@ -16,6 +16,7 @@ package com.datastax.driver.core; import com.datastax.driver.core.Message.Response.Type; +import com.datastax.driver.core.exceptions.DriverInternalError; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.MessageToMessageDecoder; @@ -43,8 +44,11 @@ protected void decode(ChannelHandlerContext ctx, Frame frame, List out) // By default, the pipeline is configured for legacy framing since this is the format used // by all protocol versions until handshake; after handshake however, we need to switch to // modern framing for protocol v5 and higher. + ProtocolFeatureStore featureStore = ProtocolFeatureStore.loadFromChannel(ctx.channel()); if (frame.header.version.compareTo(ProtocolVersion.V5) >= 0) { switchToModernFraming(ctx); + } else if (featureStore != null && featureStore.isUseMetadataId()) { + switchToCQL4MetadataId(ctx, frame.header.version, featureStore); } // once the handshake is successful, the framing format cannot change anymore; // we can safely remove ourselves from the pipeline. @@ -53,6 +57,16 @@ protected void decode(ChannelHandlerContext ctx, Frame frame, List out) out.add(frame); } + private void switchToCQL4MetadataId( + ChannelHandlerContext ctx, + ProtocolVersion protocolVersion, + ProtocolFeatureStore featureStore) { + ChannelPipeline pipeline = ctx.pipeline(); + pipeline.replace("messageDecoder", "messageDecoder", new Message.ProtocolDecoder(featureStore)); + pipeline.replace( + "messageEncoder", "messageEncoder", messageEncoderFor(protocolVersion, featureStore)); + } + private void switchToModernFraming(ChannelHandlerContext ctx) { ChannelPipeline pipeline = ctx.pipeline(); SegmentCodec segmentCodec = @@ -75,4 +89,12 @@ private void switchToModernFraming(ChannelHandlerContext ctx) { pipeline.addAfter( "bytesToSegmentDecoder", "segmentToFrameDecoder", new SegmentToFrameDecoder()); } + + private static Message.ProtocolEncoder messageEncoderFor( + ProtocolVersion version, ProtocolFeatureStore protocolFeatureStore) { + if (version.toInt() > ProtocolVersion.V6.toInt()) { + throw new DriverInternalError("Unsupported protocol version " + version); + } + return new Message.ProtocolEncoder(version, protocolFeatureStore); + } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/Host.java b/driver-core/src/main/java/com/datastax/driver/core/Host.java index 844b4db6005..1e88dff328f 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Host.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Host.java @@ -64,26 +64,21 @@ public class Host { private volatile UUID schemaVersion; - // Can be set concurrently but the value should always be the same. - private volatile ShardingInfo shardingInfo = null; - - // Can be set concurrently but the value should always be the same. - private volatile LwtInfo lwtInfo = null; - - // Whether host supports TABLETS_ROUTING_V1 - private volatile TabletInfo tabletInfo = null; + private volatile ProtocolFeatureStore protocolFeatureStore; enum State { ADDED, DOWN, - UP + UP; } volatile State state; + /** Ensures state change notifications for that host are handled serially */ final ReentrantLock notificationsLock = new ReentrantLock(true); final ConvictionPolicy convictionPolicy; + private final Cluster.Manager manager; // Tracks later reconnection attempts to that host so we avoid adding multiple tasks. @@ -433,29 +428,41 @@ void setTokens(Set tokens) { this.tokens = tokens; } - public ShardingInfo getShardingInfo() { - return shardingInfo; + public ProtocolFeatureStore getProtocolFeatureStore() { + return protocolFeatureStore; } - public void setShardingInfo(ShardingInfo shardingInfo) { - this.shardingInfo = shardingInfo; + protected void setProtocolFeatureStore(ProtocolFeatureStore protocolFeatureStore) { + this.protocolFeatureStore = protocolFeatureStore; } - public LwtInfo getLwtInfo() { - return lwtInfo; + public ShardingInfo getShardingInfo() { + return protocolFeatureStore != null && protocolFeatureStore.getConnectionShardingInfo() != null + ? protocolFeatureStore.getConnectionShardingInfo().shardingInfo + : null; } - public void setLwtInfo(LwtInfo lwtInfo) { - this.lwtInfo = lwtInfo; - } + /** + * Does nothing. shardingInfo should not be manipulated. + * + * @param shardingInfo a {@code ShardingInfo} instance. + * @deprecated shardingInfo should not be manipulated. + */ + @Deprecated + public void setShardingInfo(ShardingInfo shardingInfo) {} - public TabletInfo getTabletInfo() { - return tabletInfo; + public LwtInfo getLwtInfo() { + return protocolFeatureStore != null ? protocolFeatureStore.getLwtInfo() : null; } - public void setTabletInfo(TabletInfo tabletInfo) { - this.tabletInfo = tabletInfo; - } + /** + * Does nothing, lwtInfo should not be manipulated. + * + * @param lwtInfo a {@code LwtInfo} instance. + * @deprecated lwtInfo should not be manipulated. + */ + @Deprecated + public void setLwtInfo(LwtInfo lwtInfo) {} /** * Returns whether the host is considered up by the driver. diff --git a/driver-core/src/main/java/com/datastax/driver/core/Message.java b/driver-core/src/main/java/com/datastax/driver/core/Message.java index 84c214dd22d..f01908f65ae 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Message.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Message.java @@ -42,13 +42,18 @@ abstract class Message { AttributeKey.valueOf("com.datastax.driver.core.CodecRegistry"); interface Coder { - void encode(R request, ByteBuf dest, ProtocolVersion version); + void encode( + R request, ByteBuf dest, ProtocolVersion version, ProtocolFeatureStore featureStore); - int encodedSize(R request, ProtocolVersion version); + int encodedSize(R request, ProtocolVersion version, ProtocolFeatureStore featureStore); } interface Decoder { - R decode(ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry); + R decode( + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore); } private volatile int streamId = -1; @@ -268,6 +273,12 @@ Response setWarnings(List warnings) { @ChannelHandler.Sharable static class ProtocolDecoder extends MessageToMessageDecoder { + private final ProtocolFeatureStore protocolFeatureStore; + + ProtocolDecoder(ProtocolFeatureStore protocolFeatureStore) { + this.protocolFeatureStore = protocolFeatureStore; + } + @Override protected void decode(ChannelHandlerContext ctx, Frame frame, List out) throws Exception { @@ -294,7 +305,7 @@ protected void decode(ChannelHandlerContext ctx, Frame frame, List out) Response response = Response.Type.fromOpcode(frame.header.opcode) .decoder - .decode(frame.body, frame.header.version, codecRegistry); + .decode(frame.body, frame.header.version, codecRegistry, protocolFeatureStore); response .setTracingId(tracingId) .setWarnings(warnings) @@ -311,9 +322,11 @@ protected void decode(ChannelHandlerContext ctx, Frame frame, List out) static class ProtocolEncoder extends MessageToMessageEncoder { final ProtocolVersion protocolVersion; + final ProtocolFeatureStore protocolFeatureStore; - ProtocolEncoder(ProtocolVersion version) { + ProtocolEncoder(ProtocolVersion version, ProtocolFeatureStore protocolFeatureStore) { this.protocolVersion = version; + this.protocolFeatureStore = protocolFeatureStore; } @Override @@ -353,7 +366,7 @@ EnumSet computeFlags(Request request) { int encodedSize(Request request) { @SuppressWarnings("unchecked") Coder coder = (Coder) request.type.coder; - int messageSize = coder.encodedSize(request, protocolVersion); + int messageSize = coder.encodedSize(request, protocolVersion, protocolFeatureStore); int payloadLength = -1; if (request.getCustomPayload() != null) { payloadLength = CBUtil.sizeOfBytesMap(request.getCustomPayload()); @@ -377,7 +390,7 @@ void encode(Request request, ByteBuf destination) { } } - coder.encode(request, destination, protocolVersion); + coder.encode(request, destination, protocolVersion, protocolFeatureStore); } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/MetadataIdInfo.java b/driver-core/src/main/java/com/datastax/driver/core/MetadataIdInfo.java new file mode 100644 index 00000000000..79370a7e978 --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/MetadataIdInfo.java @@ -0,0 +1,24 @@ +package com.datastax.driver.core; + +import java.util.List; +import java.util.Map; + +public class MetadataIdInfo { + private static final String SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY = "SCYLLA_USE_METADATA_ID"; + private static final String SCYLLA_USE_METADATA_ID_STARTUP_OPTION_VALUE = ""; + + public static boolean parseUseMetadataId(Map> supported) { + if (!supported.containsKey(SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY)) { + return false; + } + List values = supported.get(SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY); + return values != null + && values.size() == 1 + && values.get(0).equals(SCYLLA_USE_METADATA_ID_STARTUP_OPTION_VALUE); + } + + public static void addOption(Map options) { + options.put( + SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY, SCYLLA_USE_METADATA_ID_STARTUP_OPTION_VALUE); + } +} diff --git a/driver-core/src/main/java/com/datastax/driver/core/ProtocolFeature.java b/driver-core/src/main/java/com/datastax/driver/core/ProtocolFeature.java deleted file mode 100644 index fdbdbbe6ef6..00000000000 --- a/driver-core/src/main/java/com/datastax/driver/core/ProtocolFeature.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright DataStax, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.datastax.driver.core; - -/** A listing of features that may or not apply to a given {@link ProtocolVersion}. */ -enum ProtocolFeature { - - /** - * The capability of updating a prepared statement if the result's metadata changes at runtime - * (for example, if the query is a {@code SELECT *} and the table is altered). - */ - PREPARED_METADATA_CHANGES, - - /** The capability of sending or receiving custom payloads. */ - CUSTOM_PAYLOADS, - - /** The capability of assigning client-generated timestamps to write requests. */ - CLIENT_TIMESTAMPS, - -// -; - - /** - * Determines whether or not the input version supports ths feature. - * - * @param version the version to test against. - * @return true if supported, false otherwise. - */ - boolean isSupportedBy(ProtocolVersion version) { - switch (this) { - case PREPARED_METADATA_CHANGES: - return version.compareTo(ProtocolVersion.V5) >= 0; - case CUSTOM_PAYLOADS: - return version.compareTo(ProtocolVersion.V4) >= 0; - case CLIENT_TIMESTAMPS: - return version.compareTo(ProtocolVersion.V3) >= 0; - default: - return false; - } - } -} diff --git a/driver-core/src/main/java/com/datastax/driver/core/ProtocolFeatureStore.java b/driver-core/src/main/java/com/datastax/driver/core/ProtocolFeatureStore.java new file mode 100644 index 00000000000..5b2b4d30937 --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/ProtocolFeatureStore.java @@ -0,0 +1,97 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.driver.core; + +import io.netty.channel.Channel; +import io.netty.util.AttributeKey; +import java.util.List; +import java.util.Map; + +public class ProtocolFeatureStore { + + public static final AttributeKey CHANNEL_KEY = + AttributeKey.valueOf("com.datastax.driver.core.ProtocolFeatureStore"); + + /** Instance of {@link ProtocolFeatureStore} initialized with default values. */ + public static final ProtocolFeatureStore EMPTY = + new ProtocolFeatureStore(null, null, null, false); + + private final ShardingInfo.ConnectionShardingInfo connectionShardingInfo; + private final LwtInfo lwtInfo; + private final TabletInfo tabletInfo; + private final boolean useMetadataId; + + public ProtocolFeatureStore( + ShardingInfo.ConnectionShardingInfo connectionShardingInfo, + LwtInfo lwtInfo, + TabletInfo tabletInfo, + boolean useMetadataId) { + this.connectionShardingInfo = connectionShardingInfo; + this.lwtInfo = lwtInfo; + this.tabletInfo = tabletInfo; + this.useMetadataId = useMetadataId; + } + + public static ProtocolFeatureStore parseSupportedOptions(Map> supported) { + ShardingInfo.ConnectionShardingInfo connectionShardingInfo = + ShardingInfo.parseShardingInfo(supported); + LwtInfo lwtInfo = LwtInfo.parseLwtInfo(supported); + TabletInfo tabletInfo = TabletInfo.parseTabletInfo(supported); + boolean metadataIdSupported = MetadataIdInfo.parseUseMetadataId(supported); + return new ProtocolFeatureStore( + connectionShardingInfo, lwtInfo, tabletInfo, metadataIdSupported); + } + + public void populateStartupOptions(ProtocolVersion protocolVersion, Map options) { + if (lwtInfo != null) { + lwtInfo.addOption(options); + } + if (tabletInfo != null + && tabletInfo.isEnabled() + && ProtocolFeatures.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion)) { + TabletInfo.addOption(options); + } + if (useMetadataId) { + MetadataIdInfo.addOption(options); + } + } + + /** + * Stores features in a {@link Host}. + * + * @param channel an instance of {@link Channel} + */ + public void storeInChannel(Channel channel) { + + channel.attr(ProtocolFeatureStore.CHANNEL_KEY).set(this); + } + + public static ProtocolFeatureStore loadFromChannel(Channel channel) { + return channel.attr(ProtocolFeatureStore.CHANNEL_KEY).get(); + } + + public ShardingInfo.ConnectionShardingInfo getConnectionShardingInfo() { + return connectionShardingInfo; + } + + public LwtInfo getLwtInfo() { + return lwtInfo; + } + + public boolean isUseMetadataId() { + return useMetadataId; + } +} diff --git a/driver-core/src/main/java/com/datastax/driver/core/ProtocolFeatures.java b/driver-core/src/main/java/com/datastax/driver/core/ProtocolFeatures.java new file mode 100644 index 00000000000..77f34a1cbf9 --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/ProtocolFeatures.java @@ -0,0 +1,74 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.driver.core; + +/** A listing of features that may or not apply to a given {@link ProtocolVersion}. */ +public class ProtocolFeatures { + + /** + * An abstract implementation of feature that may or not apply to a given {@link ProtocolVersion}. + */ + public static class Feature { + private final ProtocolVersion minSupportVersion; + + private Feature(ProtocolVersion minSupportVersion) { + this.minSupportVersion = minSupportVersion; + } + + /** + * Determines whether the input version supports ths feature. Does not take optional features + * from {@link ProtocolFeatureStore} into account. + * + * @param version the version to test against. + * @return true if supported, false otherwise. + * @see Feature#isSupportedBy(ProtocolVersion, ProtocolFeatureStore) + */ + public boolean isSupportedBy(ProtocolVersion version) { + return this.isSupportedBy(version, ProtocolFeatureStore.EMPTY); + } + + /** + * Determines whether the input version supports ths feature. Take optional features from {@link + * ProtocolFeatureStore} into account if applicable. + * + * @param version the version to test against. + * @param featureStore a feature store containing optional features. + * @return rue if supported, false otherwise. + */ + public boolean isSupportedBy(ProtocolVersion version, ProtocolFeatureStore featureStore) { + return version.compareTo(minSupportVersion) >= 0; + } + } + + /** + * The capability of updating a prepared statement if the result's metadata changes at runtime + * (for example, if the query is a {@code SELECT *} and the table is altered). + */ + public static final Feature PREPARED_METADATA_CHANGES = + new Feature(ProtocolVersion.V5) { + @Override + public boolean isSupportedBy(ProtocolVersion version, ProtocolFeatureStore featureStore) { + return super.isSupportedBy(version, featureStore) + || (featureStore != null && featureStore.isUseMetadataId()); + } + }; + + /** The capability of sending or receiving custom payloads. */ + public static final Feature CUSTOM_PAYLOADS = new Feature(ProtocolVersion.V4); + + /** The capability of assigning client-generated timestamps to write requests. */ + public static final Feature CLIENT_TIMESTAMPS = new Feature(ProtocolVersion.V3); +} diff --git a/driver-core/src/main/java/com/datastax/driver/core/RegularStatement.java b/driver-core/src/main/java/com/datastax/driver/core/RegularStatement.java index 3a9e8fe90d7..2a71568a615 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RegularStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RegularStatement.java @@ -214,10 +214,10 @@ public int requestSizeInBytes(ProtocolVersion protocolVersion, CodecRegistry cod size += CBUtil.sizeOfValue(getPagingState()); } size += CBUtil.sizeOfConsistencyLevel(getSerialConsistencyLevel()); - if (ProtocolFeature.CLIENT_TIMESTAMPS.isSupportedBy(protocolVersion)) { + if (ProtocolFeatures.CLIENT_TIMESTAMPS.isSupportedBy(protocolVersion)) { size += 8; // timestamp } - if (ProtocolFeature.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion) + if (ProtocolFeatures.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion) && getOutgoingPayload() != null) { size += CBUtil.sizeOfBytesMap(getOutgoingPayload()); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/Requests.java b/driver-core/src/main/java/com/datastax/driver/core/Requests.java index caaa303f020..fb27c7ac079 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Requests.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Requests.java @@ -50,12 +50,17 @@ static class Startup extends Message.Request { static final Message.Coder coder = new Message.Coder() { @Override - public void encode(Startup msg, ByteBuf dest, ProtocolVersion version) { + public void encode( + Startup msg, + ByteBuf dest, + ProtocolVersion version, + ProtocolFeatureStore featureStore) { CBUtil.writeStringMap(msg.options, dest); } @Override - public int encodedSize(Startup msg, ProtocolVersion version) { + public int encodedSize( + Startup msg, ProtocolVersion version, ProtocolFeatureStore featureStore) { return CBUtil.sizeOfStringMap(msg.options); } }; @@ -107,13 +112,18 @@ static class Credentials extends Message.Request { new Message.Coder() { @Override - public void encode(Credentials msg, ByteBuf dest, ProtocolVersion version) { + public void encode( + Credentials msg, + ByteBuf dest, + ProtocolVersion version, + ProtocolFeatureStore featureStore) { assert version == ProtocolVersion.V1; CBUtil.writeStringMap(msg.credentials, dest); } @Override - public int encodedSize(Credentials msg, ProtocolVersion version) { + public int encodedSize( + Credentials msg, ProtocolVersion version, ProtocolFeatureStore featureStore) { assert version == ProtocolVersion.V1; return CBUtil.sizeOfStringMap(msg.credentials); } @@ -137,10 +147,15 @@ static class Options extends Message.Request { static final Message.Coder coder = new Message.Coder() { @Override - public void encode(Options msg, ByteBuf dest, ProtocolVersion version) {} + public void encode( + Options msg, + ByteBuf dest, + ProtocolVersion version, + ProtocolFeatureStore featureStore) {} @Override - public int encodedSize(Options msg, ProtocolVersion version) { + public int encodedSize( + Options msg, ProtocolVersion version, ProtocolFeatureStore featureStore) { return 0; } }; @@ -165,13 +180,15 @@ static class Query extends Message.Request { static final Message.Coder coder = new Message.Coder() { @Override - public void encode(Query msg, ByteBuf dest, ProtocolVersion version) { + public void encode( + Query msg, ByteBuf dest, ProtocolVersion version, ProtocolFeatureStore featureStore) { CBUtil.writeLongString(msg.query, dest); msg.options.encode(dest, version); } @Override - public int encodedSize(Query msg, ProtocolVersion version) { + public int encodedSize( + Query msg, ProtocolVersion version, ProtocolFeatureStore featureStore) { return CBUtil.sizeOfLongString(msg.query) + msg.options.encodedSize(version); } }; @@ -210,17 +227,22 @@ static class Execute extends Message.Request { static final Message.Coder coder = new Message.Coder() { @Override - public void encode(Execute msg, ByteBuf dest, ProtocolVersion version) { + public void encode( + Execute msg, + ByteBuf dest, + ProtocolVersion version, + ProtocolFeatureStore featureStore) { CBUtil.writeShortBytes(msg.statementId.bytes, dest); - if (ProtocolFeature.PREPARED_METADATA_CHANGES.isSupportedBy(version)) + if (ProtocolFeatures.PREPARED_METADATA_CHANGES.isSupportedBy(version, featureStore)) CBUtil.writeShortBytes(msg.resultMetadataId.bytes, dest); msg.options.encode(dest, version); } @Override - public int encodedSize(Execute msg, ProtocolVersion version) { + public int encodedSize( + Execute msg, ProtocolVersion version, ProtocolFeatureStore featureStore) { int size = CBUtil.sizeOfShortBytes(msg.statementId.bytes); - if (ProtocolFeature.PREPARED_METADATA_CHANGES.isSupportedBy(version)) + if (ProtocolFeatures.PREPARED_METADATA_CHANGES.isSupportedBy(version, featureStore)) size += CBUtil.sizeOfShortBytes(msg.resultMetadataId.bytes); size += msg.options.encodedSize(version); return size; @@ -485,7 +507,8 @@ static class Batch extends Message.Request { static final Message.Coder coder = new Message.Coder() { @Override - public void encode(Batch msg, ByteBuf dest, ProtocolVersion version) { + public void encode( + Batch msg, ByteBuf dest, ProtocolVersion version, ProtocolFeatureStore featureStore) { int queries = msg.queryOrIdList.size(); assert queries <= 0xFFFF; @@ -505,7 +528,8 @@ public void encode(Batch msg, ByteBuf dest, ProtocolVersion version) { } @Override - public int encodedSize(Batch msg, ProtocolVersion version) { + public int encodedSize( + Batch msg, ProtocolVersion version, ProtocolFeatureStore featureStore) { int size = 3; // type + nb queries for (int i = 0; i < msg.queryOrIdList.size(); i++) { Object q = msg.queryOrIdList.get(i); @@ -661,7 +685,11 @@ static class Prepare extends Message.Request { new Message.Coder() { @Override - public void encode(Prepare msg, ByteBuf dest, ProtocolVersion version) { + public void encode( + Prepare msg, + ByteBuf dest, + ProtocolVersion version, + ProtocolFeatureStore featureStore) { CBUtil.writeLongString(msg.query, dest); if (version.compareTo(ProtocolVersion.V5) >= 0) { @@ -671,7 +699,8 @@ public void encode(Prepare msg, ByteBuf dest, ProtocolVersion version) { } @Override - public int encodedSize(Prepare msg, ProtocolVersion version) { + public int encodedSize( + Prepare msg, ProtocolVersion version, ProtocolFeatureStore featureStore) { int size = CBUtil.sizeOfLongString(msg.query); if (version.compareTo(ProtocolVersion.V5) >= 0) { @@ -704,13 +733,18 @@ static class Register extends Message.Request { static final Message.Coder coder = new Message.Coder() { @Override - public void encode(Register msg, ByteBuf dest, ProtocolVersion version) { + public void encode( + Register msg, + ByteBuf dest, + ProtocolVersion version, + ProtocolFeatureStore featureStore) { dest.writeShort(msg.eventTypes.size()); for (ProtocolEvent.Type type : msg.eventTypes) CBUtil.writeEnumValue(type, dest); } @Override - public int encodedSize(Register msg, ProtocolVersion version) { + public int encodedSize( + Register msg, ProtocolVersion version, ProtocolFeatureStore featureStore) { int size = 2; for (ProtocolEvent.Type type : msg.eventTypes) size += CBUtil.sizeOfEnumValue(type); return size; @@ -741,12 +775,17 @@ static class AuthResponse extends Message.Request { new Message.Coder() { @Override - public void encode(AuthResponse response, ByteBuf dest, ProtocolVersion version) { + public void encode( + AuthResponse response, + ByteBuf dest, + ProtocolVersion version, + ProtocolFeatureStore featureStore) { CBUtil.writeValue(response.token, dest); } @Override - public int encodedSize(AuthResponse response, ProtocolVersion version) { + public int encodedSize( + AuthResponse response, ProtocolVersion version, ProtocolFeatureStore featureStore) { return CBUtil.sizeOfValue(response.token); } }; diff --git a/driver-core/src/main/java/com/datastax/driver/core/Responses.java b/driver-core/src/main/java/com/datastax/driver/core/Responses.java index 72aae8e3b54..e014a3a2902 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Responses.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Responses.java @@ -72,7 +72,11 @@ static class Error extends Message.Response { static final Message.Decoder decoder = new Message.Decoder() { @Override - public Error decode(ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { + public Error decode( + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { ExceptionCode code = ExceptionCode.fromValue(body.readInt()); String msg = CBUtil.readString(body); Object infos = null; @@ -223,7 +227,11 @@ static class Ready extends Message.Response { static final Message.Decoder decoder = new Message.Decoder() { @Override - public Ready decode(ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { + public Ready decode( + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { // TODO: Would it be cool to return a singleton? Check we don't need to // set the streamId or something return new Ready(); @@ -246,7 +254,10 @@ static class Authenticate extends Message.Response { new Message.Decoder() { @Override public Authenticate decode( - ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { String authenticator = CBUtil.readString(body); return new Authenticate(authenticator); } @@ -271,7 +282,10 @@ static class Supported extends Message.Response { new Message.Decoder() { @Override public Supported decode( - ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { return new Supported(CBUtil.readStringToStringListMap(body)); } }; @@ -306,12 +320,9 @@ public String toString() { abstract static class Result extends Message.Response { static final Message.Decoder decoder = - new Message.Decoder() { - @Override - public Result decode(ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { - Kind kind = Kind.fromId(body.readInt()); - return kind.subDecoder.decode(body, version, codecRegistry); - } + (body, version, codecRegistry, protocolFeatureStore) -> { + Kind kind = Kind.fromId(body.readInt()); + return kind.subDecoder.decode(body, version, codecRegistry, protocolFeatureStore); }; enum Kind { @@ -367,7 +378,10 @@ static class Void extends Result { new Message.Decoder() { @Override public Result decode( - ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { return new Void(); } }; @@ -390,7 +404,10 @@ private SetKeyspace(String keyspace) { new Message.Decoder() { @Override public Result decode( - ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { return new SetKeyspace(CBUtil.readString(body)); } }; @@ -454,15 +471,19 @@ private Metadata( } static Metadata decode( - ByteBuf body, ProtocolVersion protocolVersion, CodecRegistry codecRegistry) { - return decode(body, false, protocolVersion, codecRegistry); + ByteBuf body, + ProtocolVersion protocolVersion, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { + return decode(body, false, protocolVersion, codecRegistry, protocolFeatureStore); } static Metadata decode( ByteBuf body, boolean withPkIndices, ProtocolVersion protocolVersion, - CodecRegistry codecRegistry) { + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { // flags & column count int flagsInt = body.readInt(); @@ -474,7 +495,8 @@ static Metadata decode( MD5Digest resultMetadataId = null; if (flags.contains(Flag.METADATA_CHANGED)) { - assert ProtocolFeature.PREPARED_METADATA_CHANGES.isSupportedBy(protocolVersion) + assert ProtocolFeatures.PREPARED_METADATA_CHANGES.isSupportedBy( + protocolVersion, protocolFeatureStore) : "METADATA_CHANGED flag is not supported in protocol version " + protocolVersion; assert !flags.contains(Flag.NO_METADATA) : "METADATA_CHANGED and NO_METADATA are mutually exclusive flags"; @@ -540,9 +562,13 @@ public String toString() { new Message.Decoder() { @Override public Result decode( - ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { - Metadata metadata = Metadata.decode(body, version, codecRegistry); + Metadata metadata = + Metadata.decode(body, version, codecRegistry, protocolFeatureStore); int rowCount = body.readInt(); int columnCount = metadata.columnCount; @@ -607,20 +633,29 @@ static class Prepared extends Result { new Message.Decoder() { @Override public Result decode( - ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { MD5Digest id = MD5Digest.wrap(CBUtil.readBytes(body)); MD5Digest resultMetadataId = null; - if (ProtocolFeature.PREPARED_METADATA_CHANGES.isSupportedBy(version)) + if (ProtocolFeatures.PREPARED_METADATA_CHANGES.isSupportedBy( + version, protocolFeatureStore)) resultMetadataId = MD5Digest.wrap(CBUtil.readBytes(body)); boolean withPkIndices = version.compareTo(ProtocolVersion.V4) >= 0; Rows.Metadata metadata = - Rows.Metadata.decode(body, withPkIndices, version, codecRegistry); - Rows.Metadata resultMetadata = decodeResultMetadata(body, version, codecRegistry); + Rows.Metadata.decode( + body, withPkIndices, version, codecRegistry, protocolFeatureStore); + Rows.Metadata resultMetadata = + decodeResultMetadata(body, version, codecRegistry, protocolFeatureStore); return new Prepared(id, resultMetadataId, metadata, resultMetadata); } private Metadata decodeResultMetadata( - ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { switch (version) { case V1: return Rows.Metadata.EMPTY; @@ -629,7 +664,7 @@ private Metadata decodeResultMetadata( case V4: case V5: case V6: - return Rows.Metadata.decode(body, version, codecRegistry); + return Rows.Metadata.decode(body, version, codecRegistry, protocolFeatureStore); default: throw version.unsupported(); } @@ -683,7 +718,10 @@ enum Change { new Message.Decoder() { @Override public Result decode( - ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { // Note: the CREATE KEYSPACE/TABLE/TYPE SCHEMA_CHANGE response is different from the // SCHEMA_CHANGE EVENT type Change change; @@ -752,7 +790,11 @@ static class Event extends Message.Response { static final Message.Decoder decoder = new Message.Decoder() { @Override - public Event decode(ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { + public Event decode( + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { return new Event(ProtocolEvent.deserialize(body, version)); } }; @@ -776,7 +818,10 @@ static class AuthChallenge extends Message.Response { new Message.Decoder() { @Override public AuthChallenge decode( - ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { ByteBuffer b = CBUtil.readValue(body); if (b == null) return new AuthChallenge(null); @@ -800,7 +845,10 @@ static class AuthSuccess extends Message.Response { new Message.Decoder() { @Override public AuthSuccess decode( - ByteBuf body, ProtocolVersion version, CodecRegistry codecRegistry) { + ByteBuf body, + ProtocolVersion version, + CodecRegistry codecRegistry, + ProtocolFeatureStore protocolFeatureStore) { ByteBuffer b = CBUtil.readValue(body); if (b == null) return new AuthSuccess(null); diff --git a/driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java b/driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java index 94a64adc2a7..c2527ccfc33 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java @@ -61,7 +61,7 @@ public void sleepIf12() { @Test(groups = "short") public void should_connect_with_credentials() { PlainTextAuthProvider authProvider = spy(new PlainTextAuthProvider("cassandra", "cassandra")); - Cluster cluster = createClusterBuilder().withAuthProvider(authProvider).build(); + Cluster cluster = register(createClusterBuilder().withAuthProvider(authProvider).build()); cluster.connect(); verify(authProvider, atLeastOnce()) .newAuthenticator( @@ -116,10 +116,11 @@ public void should_fail_to_connect_without_credentials() { @CCMConfig(dirtiesContext = true) public void should_connect_with_slow_server() { Cluster cluster = - createClusterBuilder() - .withAuthProvider(new SlowAuthProvider()) - .withPoolingOptions(new PoolingOptions().setHeartbeatIntervalSeconds(1)) - .build(); + register( + createClusterBuilder() + .withAuthProvider(new SlowAuthProvider()) + .withPoolingOptions(new PoolingOptions().setHeartbeatIntervalSeconds(1)) + .build()); cluster.connect(); } diff --git a/driver-core/src/test/java/com/datastax/driver/core/PreparedStatementInvalidationTest.java b/driver-core/src/test/java/com/datastax/driver/core/PreparedStatementInvalidationTest.java index 55d045cd647..25bb3267cf5 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/PreparedStatementInvalidationTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/PreparedStatementInvalidationTest.java @@ -37,6 +37,8 @@ import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.core.utils.CassandraVersion; +import com.datastax.driver.core.utils.ScyllaVersion; +import java.util.Objects; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -44,6 +46,8 @@ public class PreparedStatementInvalidationTest extends CCMTestsSupport { + private final VersionNumber SCYLLA_METADATA_ID_SUPPORT_VERSION = VersionNumber.parse("2025.3"); + @BeforeMethod(groups = "short", alwaysRun = true) public void setup() throws Exception { execute("CREATE TABLE prepared_statement_invalidation_test (a int PRIMARY KEY, b int, c int);"); @@ -59,39 +63,64 @@ public void teardown() throws Exception { } @CassandraVersion("4.0") + @ScyllaVersion() @Test(groups = "short") public void should_update_statement_id_when_metadata_changed_across_executions() { // given + boolean supportsUseMetadataId = isSupportsUseMetadataId(); PreparedStatement ps = session().prepare("SELECT * FROM prepared_statement_invalidation_test WHERE a = ?"); MD5Digest idBefore = ps.getPreparedId().resultSetMetadata.id; + // when session().execute("ALTER TABLE prepared_statement_invalidation_test ADD d int"); BoundStatement bs = ps.bind(1); ResultSet rows = session().execute(bs); + // then - MD5Digest idAfter = ps.getPreparedId().resultSetMetadata.id; - assertThat(idBefore).isNotEqualTo(idAfter); - assertThat(ps.getPreparedId().resultSetMetadata.variables) - .hasSize(4) - .containsVariable("d", DataType.cint()); - assertThat(bs.preparedStatement().getPreparedId().resultSetMetadata.variables) - .hasSize(4) - .containsVariable("d", DataType.cint()); + if (supportsUseMetadataId) { + MD5Digest idAfter = ps.getPreparedId().resultSetMetadata.id; + assertThat(idBefore).isNotEqualTo(idAfter); + assertThat(ps.getPreparedId().resultSetMetadata.variables) + .hasSize(4) + .containsVariable("d", DataType.cint()); + assertThat(bs.preparedStatement().getPreparedId().resultSetMetadata.variables) + .hasSize(4) + .containsVariable("d", DataType.cint()); + } else { + assertThat(idBefore).isNull(); // Scylla does not support CQL5 extensions and metadata id + MD5Digest idAfter = ps.getPreparedId().resultSetMetadata.id; + assertThat(idAfter).isNull(); + assertThat(ps.getPreparedId().resultSetMetadata.variables) + .hasSize(3) + .doesNotContainVariable("d"); + assertThat(bs.preparedStatement().getPreparedId().resultSetMetadata.variables) + .hasSize(3) + .doesNotContainVariable("d"); + } assertThat(rows.getColumnDefinitions()).hasSize(4).containsVariable("d", DataType.cint()); } @CassandraVersion("4.0") + @ScyllaVersion() @Test(groups = "short") - public void should_update_statement_id_when_metadata_changed_across_pages() throws Exception { + public void should_update_statement_id_when_metadata_changed_across_pages() { // given + boolean supportsUseMetadataId = isSupportsUseMetadataId(); PreparedStatement ps = session().prepare("SELECT * FROM prepared_statement_invalidation_test"); ResultSet rows = session().execute(ps.bind().setFetchSize(2)); assertThat(rows.isFullyFetched()).isFalse(); MD5Digest idBefore = ps.getPreparedId().resultSetMetadata.id; + if (!supportsUseMetadataId) { + assertThat(idBefore).isNull(); + } ColumnDefinitions definitionsBefore = rows.getColumnDefinitions(); assertThat(definitionsBefore).hasSize(3).doesNotContainVariable("d"); - // consume the first page + + // when + session().execute("ALTER TABLE prepared_statement_invalidation_test ADD d int"); + + // consume the first page only after altering table to prevent early second page prefetching int remaining = rows.getAvailableWithoutFetching(); while (remaining-- > 0) { try { @@ -102,24 +131,27 @@ public void should_update_statement_id_when_metadata_changed_across_pages() thro } } - // when - session().execute("ALTER TABLE prepared_statement_invalidation_test ADD d int"); - // then - // this should trigger a background fetch of the second page, and therefore update the - // definitions + // since `prepareNextRow` (which in turn calls `fetchMoreResults`) is called for each call of + // `rows.one()`, new page will be already fetched at this point for (Row row : rows) { assertThat(row.isNull("d")).isTrue(); } MD5Digest idAfter = ps.getPreparedId().resultSetMetadata.id; ColumnDefinitions definitionsAfter = rows.getColumnDefinitions(); - assertThat(idBefore).isNotEqualTo(idAfter); + if (supportsUseMetadataId) { + assertThat(idBefore).isNotEqualTo(idAfter); + } else { + assertThat(idAfter).isNull(); + } assertThat(definitionsAfter).hasSize(4).containsVariable("d", DataType.cint()); } @CassandraVersion("4.0") + @ScyllaVersion() @Test(groups = "short") public void should_update_statement_id_when_metadata_changed_across_sessions() { + boolean supportsUseMetadataId = isSupportsUseMetadataId(); Session session1 = cluster().connect(); useKeyspace(session1, keyspace); Session session2 = cluster().connect(); @@ -131,7 +163,15 @@ public void should_update_statement_id_when_metadata_changed_across_sessions() { session2.prepare("SELECT * FROM prepared_statement_invalidation_test WHERE a = ?"); MD5Digest id1a = ps1.getPreparedId().resultSetMetadata.id; + MD5Digest id2a = ps2.getPreparedId().resultSetMetadata.id; + if (supportsUseMetadataId) { + assertThat(id1a).isNotNull(); + assertThat(id2a).isNotNull(); + } else { + assertThat(id1a).isNull(); + assertThat(id2a).isNull(); + } ResultSet rows1 = session1.execute(ps1.bind(1)); ResultSet rows2 = session2.execute(ps2.bind(1)); @@ -154,16 +194,27 @@ public void should_update_statement_id_when_metadata_changed_across_sessions() { MD5Digest id1b = ps1.getPreparedId().resultSetMetadata.id; MD5Digest id2b = ps2.getPreparedId().resultSetMetadata.id; - - assertThat(id1a).isNotEqualTo(id1b); - assertThat(id2a).isNotEqualTo(id2b); - - assertThat(ps1.getPreparedId().resultSetMetadata.variables) - .hasSize(4) - .containsVariable("d", DataType.cint()); - assertThat(ps2.getPreparedId().resultSetMetadata.variables) - .hasSize(4) - .containsVariable("d", DataType.cint()); + if (supportsUseMetadataId) { + assertThat(id1a).isNotEqualTo(id1b); + assertThat(id2a).isNotEqualTo(id2b); + + assertThat(ps1.getPreparedId().resultSetMetadata.variables) + .hasSize(4) + .containsVariable("d", DataType.cint()); + assertThat(ps2.getPreparedId().resultSetMetadata.variables) + .hasSize(4) + .containsVariable("d", DataType.cint()); + } else { + assertThat(id1b).isNull(); + assertThat(id2b).isNull(); + + assertThat(ps1.getPreparedId().resultSetMetadata.variables) + .hasSize(3) + .doesNotContainVariable("d"); + assertThat(ps2.getPreparedId().resultSetMetadata.variables) + .hasSize(3) + .doesNotContainVariable("d"); + } assertThat(rows1.getColumnDefinitions()).hasSize(4).containsVariable("d", DataType.cint()); assertThat(rows2.getColumnDefinitions()).hasSize(4).containsVariable("d", DataType.cint()); } @@ -182,21 +233,42 @@ public void should_not_reprepare_invalid_statements() { } @CassandraVersion("4.0") - @Test(groups = "short") + @ScyllaVersion() + @Test() public void should_never_update_statement_id_for_conditional_updates_in_modern_protocol() { should_never_update_statement_id_for_conditional_updates(session()); } + @CassandraVersion("4.0") + @ScyllaVersion() + @Test() + public void should_never_update_statement_for_conditional_updates_in_legacy_protocols() { + Cluster cluster = + register( + Cluster.builder() + .addContactPoints(getContactPoints()) + .withPort(ccm().getBinaryPort()) + .withProtocolVersion(ccm().getProtocolVersion(V4)) + .build()); + Session session = cluster.connect(keyspace); + should_never_update_statement_id_for_conditional_updates(session); + } + private void should_never_update_statement_id_for_conditional_updates(Session session) { // Given + boolean isScylla = Objects.nonNull(ccm().getScyllaVersion()); + boolean supportsUseMetadataId = isSupportsUseMetadataId(); PreparedStatement ps = session.prepare( "INSERT INTO prepared_statement_invalidation_test (a, b, c) VALUES (?, ?, ?) IF NOT EXISTS"); // Never store metadata in the prepared statement for conditional updates, since the result set - // can change - // depending on the outcome. - assertThat(ps.getPreparedId().resultSetMetadata.variables).isNull(); + // can change depending on the outcome. + if (isScylla) { + assertThat(ps.getPreparedId().resultSetMetadata.variables).hasSize(4); + } else { + assertThat(ps.getPreparedId().resultSetMetadata.variables).isNull(); + } MD5Digest idBefore = ps.getPreparedId().resultSetMetadata.id; // When @@ -205,11 +277,21 @@ private void should_never_update_statement_id_for_conditional_updates(Session se // Then // Successful conditional update => only contains the [applied] column assertThat(rs.wasApplied()).isTrue(); - assertThat(rs.getColumnDefinitions()) - .hasSize(1) - .containsVariable("[applied]", DataType.cboolean()); + if (isScylla) { + assertThat(rs.getColumnDefinitions()) + .hasSize(4) + .containsVariable("[applied]", DataType.cboolean()); + } else { + assertThat(rs.getColumnDefinitions()) + .hasSize(1) + .containsVariable("[applied]", DataType.cboolean()); + } // However the prepared statement shouldn't have changed - assertThat(ps.getPreparedId().resultSetMetadata.variables).isNull(); + if (isScylla) { + assertThat(ps.getPreparedId().resultSetMetadata.variables).hasSize(4); + } else { + assertThat(ps.getPreparedId().resultSetMetadata.variables).isNull(); + } assertThat(ps.getPreparedId().resultSetMetadata.id).isEqualTo(idBefore); // When @@ -225,7 +307,11 @@ private void should_never_update_statement_id_for_conditional_updates(Session se assertThat(row.getInt("b")).isEqualTo(5); assertThat(row.getInt("c")).isEqualTo(5); // The prepared statement still shouldn't have changed - assertThat(ps.getPreparedId().resultSetMetadata.variables).isNull(); + if (isScylla) { + assertThat(ps.getPreparedId().resultSetMetadata.variables).hasSize(4); + } else { + assertThat(ps.getPreparedId().resultSetMetadata.variables).isNull(); + } assertThat(ps.getPreparedId().resultSetMetadata.id).isEqualTo(idBefore); // When @@ -235,30 +321,28 @@ private void should_never_update_statement_id_for_conditional_updates(Session se // Then // Failed conditional update => regular metadata that should also contain the new column assertThat(rs.wasApplied()).isFalse(); - assertThat(rs.getColumnDefinitions()).hasSize(5); + if (isScylla && !supportsUseMetadataId) { + assertThat(rs.getColumnDefinitions()).hasSize(4); + } else { + assertThat(rs.getColumnDefinitions()).hasSize(5); + } row = rs.one(); assertThat(row.getBool("[applied]")).isFalse(); assertThat(row.getInt("a")).isEqualTo(5); assertThat(row.getInt("b")).isEqualTo(5); assertThat(row.getInt("c")).isEqualTo(5); - assertThat(row.isNull("d")).isTrue(); - assertThat(ps.getPreparedId().resultSetMetadata.variables).isNull(); - assertThat(ps.getPreparedId().resultSetMetadata.id).isEqualTo(idBefore); - } - - @CassandraVersion("4.0") - @Test(groups = "short") - public void should_never_update_statement_for_conditional_updates_in_legacy_protocols() { - // Given - Cluster cluster = - register( - Cluster.builder() - .addContactPoints(getContactPoints()) - .withPort(ccm().getBinaryPort()) - .withProtocolVersion(ccm().getProtocolVersion(V4)) - .build()); - Session session = cluster.connect(keyspace); - should_never_update_statement_id_for_conditional_updates(session); + if (!isScylla) { + assertThat(row.isNull("d")).isTrue(); + assertThat(ps.getPreparedId().resultSetMetadata.variables).isNull(); + assertThat(ps.getPreparedId().resultSetMetadata.id).isEqualTo(idBefore); + } else if (supportsUseMetadataId) { + assertThat(row.isNull("d")).isTrue(); + assertThat(ps.getPreparedId().resultSetMetadata.variables).hasSize(5); + assertThat(ps.getPreparedId().resultSetMetadata.id).isNotEqualTo(idBefore); + } else { + assertThat(row.getColumnDefinitions()).doesNotContainVariable("d"); + assertThat(ps.getPreparedId().resultSetMetadata.variables).hasSize(4); + } } @DataProvider(name = "resolverName") @@ -402,4 +486,9 @@ private Session sessionWithSkipCQL4MetadataResolveMethod( session.execute("USE cql4_loopholes_test"); return session; } + + private boolean isSupportsUseMetadataId() { + return Objects.isNull(ccm().getScyllaVersion()) + || ccm().getScyllaVersion().compareTo(SCYLLA_METADATA_ID_SUPPORT_VERSION) > 0; + } } diff --git a/driver-core/src/test/java/com/datastax/driver/core/SegmentBuilderTest.java b/driver-core/src/test/java/com/datastax/driver/core/SegmentBuilderTest.java index 40adfb66dc3..27f75888fb9 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/SegmentBuilderTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/SegmentBuilderTest.java @@ -35,7 +35,7 @@ public class SegmentBuilderTest { private static final Message.ProtocolEncoder REQUEST_ENCODER = - new Message.ProtocolEncoder(ProtocolVersion.V5); + new Message.ProtocolEncoder(ProtocolVersion.V5, ProtocolFeatureStore.EMPTY); // The constant names denote the total encoded size, including the frame header private static final Message.Request _38B_REQUEST = new Requests.Query("SELECT * FROM table"); @@ -71,6 +71,8 @@ public ChannelPromise answer(InvocationOnMock invocation) { return MOCK_CHANNEL.newPromise(); } }); + + when(CONTEXT.channel()).thenReturn(MOCK_CHANNEL); } @Test(groups = "unit") diff --git a/driver-core/src/test/java/com/datastax/driver/core/StatementSizeTest.java b/driver-core/src/test/java/com/datastax/driver/core/StatementSizeTest.java index 7c9a61ebc0a..de760864287 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/StatementSizeTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/StatementSizeTest.java @@ -116,7 +116,11 @@ public void should_measure_size_of_simple_statement() { @Test(groups = "unit") public void should_measure_size_of_bound_statement() { + Host host = Mockito.mock(Host.class); + Mockito.when(host.getProtocolFeatureStore()).thenReturn(ProtocolFeatureStore.EMPTY); BoundStatement statement = new BoundStatement(preparedStatement); + statement.setHost(host); + int expectedSize = 9 // header size + (2 + PREPARED_ID.length)