
CASSJAVA97: Let users inject an ID for each request and write to the custom payload #2037

Open
wants to merge 6 commits into
base: 4.x

SiyaoIsHiding
Contributor

@SiyaoIsHiding SiyaoIsHiding commented Apr 16, 2025

No description provided.

@SiyaoIsHiding
Contributor Author

I ran integration tests against C* OSS 5.0.2. @lukasz-antoniak helped me add a LoggingQueryHandler and set it as the cassandra.custom_query_handler_class.
I then wrote a client app using this Java driver with the following config:

datastax-java-driver.advanced = {
  distributed-tracing.id-generator.class = W3CContextDistributedTraceIdGenerator
  distributed-tracing.custom-payload-with-key = "traceparent"
}
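
For context, the `traceparent` key follows the W3C Trace Context layout: a version, a 16-byte trace ID, an 8-byte parent ID, and a flags byte, each hex encoded and joined with dashes. The sketch below only produces a value with that shape. The class name `TraceparentSketch` and its methods are hypothetical and are not the PR's `W3CContextDistributedTraceIdGenerator`.

```java
import java.security.SecureRandom;

// Hypothetical helper, for illustration only; not part of the driver.
final class TraceparentSketch {
  private static final SecureRandom RANDOM = new SecureRandom();
  private static final char[] HEX = "0123456789abcdef".toCharArray();

  // Returns numBytes random bytes as a lowercase hex string.
  static String randomHex(int numBytes) {
    byte[] bytes = new byte[numBytes];
    RANDOM.nextBytes(bytes);
    StringBuilder sb = new StringBuilder(numBytes * 2);
    for (byte b : bytes) {
      sb.append(HEX[(b >> 4) & 0xF]).append(HEX[b & 0xF]);
    }
    return sb.toString();
  }

  // version "00", 16-byte trace ID, 8-byte parent ID, flags "00" (not sampled).
  // The spec treats all-zero IDs as invalid; this sketch does not check for them.
  static String newTraceparent() {
    return "00-" + randomHex(16) + "-" + randomHex(8) + "-00";
  }

  public static void main(String[] args) {
    System.out.println(newTraceparent());
  }
}
```

The IDs in the logs in this comment have the same four-part shape.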

Running this client app produced the following driver log:

17:03:20.860 [s0-io-5] TRACE InFlightHandler - [s0|id: 0xeefaab65, L:/127.0.0.2:52389 - R:/127.0.0.2:9042] Writing 00-d51ed2012c1f31b4434f409e50294da9-25da248d0a23981c-00 on stream id 0
17:03:20.863 [s0-io-5] TRACE CqlRequestHandler$NodeResponseCallback - [00-d51ed2012c1f31b4434f409e50294da9-25da248d0a23981c-00] Request sent on [id: 0xeefaab65, L:/127.0.0.2:52389 - R:/127.0.0.2:9042]
17:03:20.864 [s0-io-5] TRACE CqlRequestHandler$NodeResponseCallback - [00-d51ed2012c1f31b4434f409e50294da9-25da248d0a23981c-00] Speculative execution policy returned -1, no next execution
17:03:20.877 [s0-io-5] DEBUG InFlightHandler - [s0|id: 0xeefaab65, L:/127.0.0.2:52389 - R:/127.0.0.2:9042] Got last response on in-flight stream id 0, completing and releasing
17:03:20.877 [s0-io-5] TRACE InFlightHandler - [s0|id: 0xeefaab65, L:/127.0.0.2:52389 - R:/127.0.0.2:9042] Releasing stream id 0
17:03:20.877 [s0-io-5] TRACE CqlRequestHandler$NodeResponseCallback - [00-d51ed2012c1f31b4434f409e50294da9-25da248d0a23981c-00] Got result, completing

The server-side debug.log contained:

DEBUG [Native-Transport-Requests-1] 2025-04-15 17:03:20,870 LoggingQueryHandler.java:44 - Processing CQL statement SelectStatement[aggregationSpecFactory=,bindVariables=[],isReversed=false,limit=,orderingComparator=,parameters=org.apache.cassandra.cql3.statements.SelectStatement$Parameters@5d46ec82,perPartitionLimit=,restrictions=StatementRestrictions[clusteringColumnsRestrictions=ClusteringColumnRestrictions[allowFiltering=false,comparator=comparator(),restrictions=RestrictionSet[hasAnn=false,hasContains=false,hasIn=false,hasMultiColumnRestrictions=false,hasOnlyEqualityRestrictions=true,hasSlice=false,restrictions={}]],filterRestrictions=IndexRestrictions[customExpressions=[],regularRestrictions=[]],hasRegularColumnsRestrictions=false,isKeyRange=true,nonPrimaryKeyRestrictions=RestrictionSet[hasAnn=false,hasContains=false,hasIn=false,hasMultiColumnRestrictions=false,hasOnlyEqualityRestrictions=true,hasSlice=false,restrictions={}],notNullColumns=[],partitionKeyRestrictions=PartitionKeySingleRestrictionSet[comparator=comparator(org.apache.cassandra.db.marshal.UTF8Type),restrictions=RestrictionSet[hasAnn=false,hasContains=false,hasIn=false,hasMultiColumnRestrictions=false,hasOnlyEqualityRestrictions=true,hasSlice=false,restrictions={}]],table=system.local,type=SELECT,usesSecondaryIndexing=false],selection=SimpleSelection{columns=[key, bootstrapped, broadcast_address, broadcast_port, cluster_name, cql_version, data_center, gossip_generation, host_id, listen_address, listen_port, native_protocol_version, partitioner, rack, release_version, rpc_address, rpc_port, schema_version, tokens, truncated_at], columnMapping={ Columns:[key, bootstrapped, broadcast_address, broadcast_port, cluster_name, cql_version, data_center, gossip_generation, host_id, listen_address, listen_port, native_protocol_version, partitioner, rack, release_version, rpc_address, rpc_port, schema_version, tokens, truncated_at], Mappings:{rack:[rack], cql_version:[cql_version], 
listen_address:[listen_address], release_version:[release_version], data_center:[data_center], broadcast_port:[broadcast_port], broadcast_address:[broadcast_address], partitioner:[partitioner], host_id:[host_id], gossip_generation:[gossip_generation], listen_port:[listen_port], rpc_address:[rpc_address], schema_version:[schema_version], rpc_port:[rpc_port], truncated_at:[truncated_at], cluster_name:[cluster_name], native_protocol_version:[native_protocol_version], tokens:[tokens], key:[key], bootstrapped:[bootstrapped]} }, metadata=[key(system, local), org.apache.cassandra.db.marshal.UTF8Type][bootstrapped(system, local), org.apache.cassandra.db.marshal.UTF8Type][broadcast_address(system, local), org.apache.cassandra.db.marshal.InetAddressType][broadcast_port(system, local), org.apache.cassandra.db.marshal.Int32Type][cluster_name(system, local), org.apache.cassandra.db.marshal.UTF8Type][cql_version(system, local), org.apache.cassandra.db.marshal.UTF8Type][data_center(system, local), org.apache.cassandra.db.marshal.UTF8Type][gossip_generation(system, local), org.apache.cassandra.db.marshal.Int32Type][host_id(system, local), org.apache.cassandra.db.marshal.UUIDType][listen_address(system, local), org.apache.cassandra.db.marshal.InetAddressType][listen_port(system, local), org.apache.cassandra.db.marshal.Int32Type][native_protocol_version(system, local), org.apache.cassandra.db.marshal.UTF8Type][partitioner(system, local), org.apache.cassandra.db.marshal.UTF8Type][rack(system, local), org.apache.cassandra.db.marshal.UTF8Type][release_version(system, local), org.apache.cassandra.db.marshal.UTF8Type][rpc_address(system, local), org.apache.cassandra.db.marshal.InetAddressType][rpc_port(system, local), org.apache.cassandra.db.marshal.Int32Type][schema_version(system, local), org.apache.cassandra.db.marshal.UUIDType][tokens(system, local), org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type)][truncated_at(system, local), 
org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UUIDType,org.apache.cassandra.db.marshal.BytesType)]},table=system.local] with custom payload {traceparent=30302d64353165643230313263316633316234343334663430396535303239346461392d323564613234386430613233393831632d3030}

The payload value 30302d64353165643230313263316633316234343334663430396535303239346461392d323564613234386430613233393831632d3030 is the hex encoding of the UTF-8 bytes of the ID 00-d51ed2012c1f31b4434f409e50294da9-25da248d0a23981c-00 that appears in the driver log. This shows that a request can be traced across client and server with the same ID.
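
The correspondence between the logged payload and the ID can be checked with a few lines of plain Java. This is a minimal sketch; the class name `PayloadHexSketch` and its methods are hypothetical helpers, not driver or Cassandra APIs.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only.
final class PayloadHexSketch {
  // Encodes the UTF-8 bytes of text as lowercase hex.
  static String toHex(String text) {
    StringBuilder sb = new StringBuilder();
    for (byte b : text.getBytes(StandardCharsets.UTF_8)) {
      sb.append(String.format("%02x", b & 0xFF));
    }
    return sb.toString();
  }

  // Decodes a hex string back into the UTF-8 text it represents.
  static String fromHex(String hex) {
    byte[] bytes = new byte[hex.length() / 2];
    for (int i = 0; i < bytes.length; i++) {
      bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
    }
    return new String(bytes, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    String id = "00-d51ed2012c1f31b4434f409e50294da9-25da248d0a23981c-00";
    // Compare the result with the payload printed in the server-side debug.log.
    System.out.println(toHex(id));
  }
}
```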

// We cannot call statement.getCustomPayload().put() because the default empty map is immutable
// (put() is not supported). Copying the map creates a new Statement instance for every request,
// which we might want to optimize.
Map<String, ByteBuffer> existingMap = new HashMap<>(statement.getCustomPayload());
Member


Statement is immutable by design. A nicer way might be to add a method StatementBuilder.from(Statement) that creates a builder from an existing statement. The code would then look like: StatementBuilder.from(statement).addCustomPayload(...).build().
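
To illustrate the shape of that suggestion, here is a self-contained sketch of the "builder from an existing immutable object" pattern. `Stmt` and `StmtBuilder` are hypothetical stand-ins, not the driver's `Statement` or `StatementBuilder`, and `from(...)` is the proposed method rather than an existing one.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; not the driver's classes.
final class BuilderFromSketch {

  // Immutable statement: the payload is defensively copied and wrapped.
  static final class Stmt {
    final String query;
    final Map<String, ByteBuffer> customPayload;

    Stmt(String query, Map<String, ByteBuffer> customPayload) {
      this.query = query;
      this.customPayload = Collections.unmodifiableMap(new HashMap<>(customPayload));
    }
  }

  static final class StmtBuilder {
    private final String query;
    private final Map<String, ByteBuffer> payload = new HashMap<>();

    private StmtBuilder(String query) {
      this.query = query;
    }

    // Seeds a new builder with the state of an existing statement.
    static StmtBuilder from(Stmt statement) {
      StmtBuilder builder = new StmtBuilder(statement.query);
      builder.payload.putAll(statement.customPayload);
      return builder;
    }

    StmtBuilder addCustomPayload(String key, ByteBuffer value) {
      payload.put(key, value);
      return this;
    }

    Stmt build() {
      return new Stmt(query, payload);
    }
  }

  public static void main(String[] args) {
    Stmt original = new Stmt("SELECT * FROM system.local", Collections.emptyMap());
    ByteBuffer id = ByteBuffer.wrap("some-request-id".getBytes(StandardCharsets.UTF_8));
    Stmt traced = StmtBuilder.from(original).addCustomPayload("traceparent", id).build();
    System.out.println(original.customPayload.size() + " -> " + traced.customPayload.size());
  }
}
```

The original statement is never modified, so callers that still hold it see the same payload as before.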

Map<String, ByteBuffer> existingMap = new HashMap<>(statement.getCustomPayload());
existingMap.put(
this.customPayloadKey, ByteBuffer.wrap(nodeRequestId.getBytes(StandardCharsets.UTF_8)));
statement = statement.setCustomPayload(existingMap);
Member


Overriding the custom payload here is not thread-safe. If the client application executes the same statement instance multiple times concurrently (not a good use case, but still possible), we cannot guarantee how this map will be changed. Maybe there is indeed no other way than to make a shallow copy of the statement. I will think about it.
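
As a rough illustration of the copy-per-request idea, the sketch below derives a fresh, unmodifiable payload map for each execution and never writes to the shared one, so concurrent executions cannot interfere with each other. `PerRequestPayloadSketch` and its methods are hypothetical, and the thread pool only simulates an application executing the same statement concurrently.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, for illustration only.
final class PerRequestPayloadSketch {

  // Copies the base payload and adds the request ID; the base map is only read.
  static Map<String, ByteBuffer> withRequestId(
      Map<String, ByteBuffer> base, String key, String requestId) {
    Map<String, ByteBuffer> copy = new HashMap<>(base);
    copy.put(key, ByteBuffer.wrap(requestId.getBytes(StandardCharsets.UTF_8)));
    return Collections.unmodifiableMap(copy);
  }

  // Simulates n concurrent executions that share one base payload.
  static List<String> runConcurrently(Map<String, ByteBuffer> shared, int n) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Future<String>> futures = new ArrayList<>();
      for (int i = 0; i < n; i++) {
        String id = "request-" + i;
        futures.add(pool.submit(() -> {
          Map<String, ByteBuffer> payload = withRequestId(shared, "traceparent", id);
          return StandardCharsets.UTF_8.decode(payload.get("traceparent").duplicate()).toString();
        }));
      }
      List<String> seen = new ArrayList<>();
      for (Future<String> future : futures) {
        seen.add(future.get());
      }
      return seen;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(runConcurrently(Collections.emptyMap(), 4));
  }
}
```

The cost is one extra map (and, in the driver, one extra statement instance) per request, which is the trade-off raised in the code comment under review.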

  /**
   * Sets the custom payload to use for execution.
   *
   * <p>All the driver's built-in statement implementations are immutable, and return a new instance
   * from this method. However custom implementations may choose to be mutable and return the same
   * instance.
   *
   * <p>Note that it's your responsibility to provide a thread-safe map. This can be achieved with a
   * concurrent or immutable implementation, or by making it effectively immutable (meaning that
   * it's never modified after being set on the statement).
   */
  @NonNull
  @CheckReturnValue
  SelfT setCustomPayload(@NonNull Map<String, ByteBuffer> newCustomPayload);

@SiyaoIsHiding SiyaoIsHiding marked this pull request as ready for review April 17, 2025 08:01
2 participants