Skip to content

Commit b9e40fa

Browse files
authored
GH-942: Fix JDBC Connection.setCatalog() (#943)
## What's Changed Connection.setCatalog() is not silently ignored anymore (through the default implementation in Calcite) but instead it updates the catalog session option in the same way as during the initial connection. Closes #942.
1 parent 385b51e commit b9e40fa

File tree

4 files changed

+131
-31
lines changed

4 files changed

+131
-31
lines changed

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ static Signature newSignature(final String sql, Schema resultSetSchema, Schema p
7979
public void closeStatement(final StatementHandle statementHandle) {
8080
PreparedStatement preparedStatement =
8181
statementHandlePreparedStatementMap.remove(new StatementHandleKey(statementHandle));
82-
// Testing if the prepared statement was created because the statement can be not created until
82+
// Testing if the prepared statement was created because the statement can be
83+
// not created until
8384
// this moment
8485
if (preparedStatement != null) {
8586
preparedStatement.close();
@@ -224,7 +225,8 @@ public ExecuteResult prepareAndExecute(
224225
MetaResultSet.create(handle.connectionId, handle.id, false, handle.signature, null);
225226
return new ExecuteResult(Collections.singletonList(metaResultSet));
226227
} catch (SQLTimeoutException e) {
227-
// So far AvaticaStatement(executeInternal) only handles NoSuchStatement and Runtime
228+
// So far AvaticaStatement(executeInternal) only handles NoSuchStatement and
229+
// Runtime
228230
// Exceptions.
229231
throw new RuntimeException(e);
230232
} catch (SQLException e) {
@@ -253,6 +255,20 @@ public boolean syncResults(
253255
return false;
254256
}
255257

258+
@Override
259+
public ConnectionProperties connectionSync(ConnectionHandle ch, ConnectionProperties connProps) {
260+
final ConnectionProperties result = super.connectionSync(ch, connProps);
261+
final String newCatalog = this.connProps.getCatalog();
262+
if (newCatalog != null) {
263+
try {
264+
((ArrowFlightConnection) connection).getClientHandler().setCatalog(newCatalog);
265+
} catch (SQLException e) {
266+
throw new RuntimeException(e);
267+
}
268+
}
269+
return result;
270+
}
271+
256272
void setDefaultConnectionProperties() {
257273
// TODO Double-check this.
258274
connProps
@@ -268,7 +284,8 @@ PreparedStatement getPreparedStatement(StatementHandle statementHandle) {
268284
return statementHandlePreparedStatementMap.get(new StatementHandleKey(statementHandle));
269285
}
270286

271-
// Helper used to look up prepared statement instances later. Avatica doesn't give us the
287+
// Helper used to look up prepared statement instances later. Avatica doesn't
288+
// give us the
272289
// signature in
273290
// an UPDATE code path so we can't directly use StatementHandle as a map key.
274291
private static final class StatementHandleKey {

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java

Lines changed: 53 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.apache.arrow.flight.FlightStatusCode;
4848
import org.apache.arrow.flight.Location;
4949
import org.apache.arrow.flight.LocationSchemes;
50-
import org.apache.arrow.flight.SessionOptionValue;
5150
import org.apache.arrow.flight.SessionOptionValueFactory;
5251
import org.apache.arrow.flight.SetSessionOptionsRequest;
5352
import org.apache.arrow.flight.SetSessionOptionsResult;
@@ -147,20 +146,26 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
147146
try {
148147
for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
149148
if (endpoint.getLocations().isEmpty()) {
150-
// Create a stream using the current client only and do not close the client at the end.
149+
// Create a stream using the current client only and do not close the client at
150+
// the end.
151151
endpoints.add(
152152
new CloseableEndpointStreamPair(
153153
sqlClient.getStream(endpoint.getTicket(), getOptions()), null));
154154
} else {
155155
// Clone the builder and then set the new endpoint on it.
156156

157-
// GH-38574: Currently a new FlightClient will be made for each partition that returns a
158-
// non-empty Location then disposed of. It may be better to cache clients because a server
159-
// may report the same Locations. It would also be good to identify when the reported
157+
// GH-38574: Currently a new FlightClient will be made for each partition that
158+
// returns a
159+
// non-empty Location then disposed of. It may be better to cache clients
160+
// because a server
161+
// may report the same Locations. It would also be good to identify when the
162+
// reported
160163
// location
161-
// is the same as the original connection's Location and skip creating a FlightClient in
164+
// is the same as the original connection's Location and skip creating a
165+
// FlightClient in
162166
// that scenario.
163-
// Also copy the cache to the client so we can share a cache. Cache needs to cache
167+
// Also copy the cache to the client so we can share a cache. Cache needs to
168+
// cache
164169
// negative attempts too.
165170
List<Exception> exceptions = new ArrayList<>();
166171
CloseableEndpointStreamPair stream = null;
@@ -337,7 +342,8 @@ private boolean isBenignCloseException(FlightRuntimeException fre) {
337342
*/
338343
private void logSuppressedCloseException(
339344
FlightRuntimeException fre, String operationDescription) {
340-
// ARROW-17785 and GH-863: suppress exceptions caused by flaky gRPC layer during shutdown
345+
// ARROW-17785 and GH-863: suppress exceptions caused by flaky gRPC layer during
346+
// shutdown
341347
LOGGER.debug("Suppressed error {}", operationDescription, fre);
342348
}
343349

@@ -388,25 +394,40 @@ public interface PreparedStatement extends AutoCloseable {
388394
/** A connection is created with catalog set as a session option. */
389395
private void setSetCatalogInSessionIfPresent() {
390396
if (catalog.isPresent()) {
391-
final SetSessionOptionsRequest setSessionOptionRequest =
392-
new SetSessionOptionsRequest(
393-
ImmutableMap.<String, SessionOptionValue>builder()
394-
.put(CATALOG, SessionOptionValueFactory.makeSessionOptionValue(catalog.get()))
395-
.build());
396-
final SetSessionOptionsResult result =
397-
sqlClient.setSessionOptions(setSessionOptionRequest, getOptions());
397+
try {
398+
setCatalog(catalog.get());
399+
} catch (SQLException e) {
400+
throw CallStatus.INVALID_ARGUMENT
401+
.withDescription(e.getMessage())
402+
.withCause(e)
403+
.toRuntimeException();
404+
}
405+
}
406+
}
398407

408+
/**
409+
* Sets the catalog for the current session.
410+
*
411+
* @param catalog the catalog to set.
412+
* @throws SQLException if an error occurs while setting the catalog.
413+
*/
414+
public void setCatalog(final String catalog) throws SQLException {
415+
final SetSessionOptionsRequest request =
416+
new SetSessionOptionsRequest(
417+
ImmutableMap.of(CATALOG, SessionOptionValueFactory.makeSessionOptionValue(catalog)));
418+
try {
419+
final SetSessionOptionsResult result = sqlClient.setSessionOptions(request, getOptions());
399420
if (result.hasErrors()) {
400-
Map<String, SetSessionOptionsResult.Error> errors = result.getErrors();
401-
for (Map.Entry<String, SetSessionOptionsResult.Error> error : errors.entrySet()) {
421+
final Map<String, SetSessionOptionsResult.Error> errors = result.getErrors();
422+
for (final Map.Entry<String, SetSessionOptionsResult.Error> error : errors.entrySet()) {
402423
LOGGER.warn(error.toString());
403424
}
404-
throw CallStatus.INVALID_ARGUMENT
405-
.withDescription(
406-
String.format(
407-
"Cannot set session option for catalog = %s. Check log for details.", catalog))
408-
.toRuntimeException();
425+
throw new SQLException(
426+
String.format(
427+
"Cannot set session option for catalog = %s. Check log for details.", catalog));
409428
}
429+
} catch (final FlightRuntimeException e) {
430+
throw new SQLException(e);
410431
}
411432
}
412433

@@ -654,7 +675,8 @@ public static final class Builder {
654675

655676
@VisibleForTesting @Nullable Duration connectTimeout;
656677

657-
// These two middleware are for internal use within build() and should not be exposed by builder
678+
// These two middleware are for internal use within build() and should not be
679+
// exposed by builder
658680
// APIs.
659681
// Note that these middleware may not necessarily be registered.
660682
@VisibleForTesting
@@ -980,15 +1002,17 @@ public Location getLocation() {
9801002
* @throws SQLException on error.
9811003
*/
9821004
public ArrowFlightSqlClientHandler build() throws SQLException {
983-
// Copy middleware so that the build method doesn't change the state of the builder fields
1005+
// Copy middleware so that the build method doesn't change the state of the
1006+
// builder fields
9841007
// itself.
9851008
Set<FlightClientMiddleware.Factory> buildTimeMiddlewareFactories =
9861009
new HashSet<>(this.middlewareFactories);
9871010
FlightClient client = null;
9881011
boolean isUsingUserPasswordAuth = username != null && token == null;
9891012

9901013
try {
991-
// Token should take priority since some apps pass in a username/password even when a token
1014+
// Token should take priority since some apps pass in a username/password even
1015+
// when a token
9921016
// is provided
9931017
if (isUsingUserPasswordAuth) {
9941018
buildTimeMiddlewareFactories.add(authFactory);
@@ -1047,8 +1071,10 @@ public ArrowFlightSqlClientHandler build() throws SQLException {
10471071
allocator, channelBuilder.build(), clientBuilder.middleware());
10481072
final ArrayList<CallOption> credentialOptions = new ArrayList<>();
10491073
if (isUsingUserPasswordAuth) {
1050-
// If the authFactory has already been used for a handshake, use the existing token.
1051-
// This can occur if the authFactory is being re-used for a new connection spawned for
1074+
// If the authFactory has already been used for a handshake, use the existing
1075+
// token.
1076+
// This can occur if the authFactory is being re-used for a new connection
1077+
// spawned for
10521078
// getStream().
10531079
if (authFactory.getCredentialCallOption() != null) {
10541080
credentialOptions.add(authFactory.getCredentialCallOption());

flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.arrow.driver.jdbc;
1818

19+
import static org.junit.jupiter.api.Assertions.assertEquals;
1920
import static org.junit.jupiter.api.Assertions.assertNotNull;
2021
import static org.junit.jupiter.api.Assertions.assertThrows;
2122
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -26,12 +27,15 @@
2627
import java.sql.Driver;
2728
import java.sql.DriverManager;
2829
import java.sql.SQLException;
30+
import java.util.Map;
2931
import java.util.Properties;
3032
import org.apache.arrow.driver.jdbc.authentication.UserPasswordAuthentication;
3133
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
3234
import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty;
3335
import org.apache.arrow.driver.jdbc.utils.MockFlightSqlProducer;
3436
import org.apache.arrow.flight.FlightMethod;
37+
import org.apache.arrow.flight.NoOpSessionOptionValueVisitor;
38+
import org.apache.arrow.flight.SessionOptionValue;
3539
import org.apache.arrow.memory.BufferAllocator;
3640
import org.apache.arrow.memory.RootAllocator;
3741
import org.apache.arrow.util.AutoCloseables;
@@ -614,12 +618,46 @@ public void testJdbcDriverVersionIntegration() throws Exception {
614618

615619
var expectedUserAgent =
616620
"JDBC Flight SQL Driver " + driverVersion.getDriverVersion().versionString;
617-
// Driver appends version to grpc user-agent header. Assert the header starts with the
621+
// Driver appends version to grpc user-agent header. Assert the header starts
622+
// with the
618623
// expected
619624
// value and ignored grpc version.
620625
assertTrue(
621626
actualUserAgent.startsWith(expectedUserAgent),
622627
"Expected: " + expectedUserAgent + " but found: " + actualUserAgent);
623628
}
624629
}
630+
631+
@Test
632+
public void testSetCatalogShouldUpdateSessionOptions() throws Exception {
633+
final Properties properties = new Properties();
634+
properties.put(ArrowFlightConnectionProperty.USER.camelName(), userTest);
635+
properties.put(ArrowFlightConnectionProperty.PASSWORD.camelName(), passTest);
636+
properties.put("useEncryption", false);
637+
638+
try (Connection connection =
639+
DriverManager.getConnection(
640+
"jdbc:arrow-flight-sql://"
641+
+ FLIGHT_SERVER_TEST_EXTENSION.getHost()
642+
+ ":"
643+
+ FLIGHT_SERVER_TEST_EXTENSION.getPort(),
644+
properties)) {
645+
final String catalog = "new_catalog";
646+
connection.setCatalog(catalog);
647+
648+
final Map<String, SessionOptionValue> options = PRODUCER.getSessionOptions();
649+
assertTrue(options.containsKey("catalog"));
650+
String actualCatalog =
651+
options
652+
.get("catalog")
653+
.acceptVisitor(
654+
new NoOpSessionOptionValueVisitor<String>() {
655+
@Override
656+
public String visit(String value) {
657+
return value;
658+
}
659+
});
660+
assertEquals(catalog, actualCatalog);
661+
}
662+
}
625663
}

flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@
5252
import org.apache.arrow.flight.PutResult;
5353
import org.apache.arrow.flight.Result;
5454
import org.apache.arrow.flight.SchemaResult;
55+
import org.apache.arrow.flight.SessionOptionValue;
56+
import org.apache.arrow.flight.SetSessionOptionsRequest;
57+
import org.apache.arrow.flight.SetSessionOptionsResult;
5558
import org.apache.arrow.flight.Ticket;
5659
import org.apache.arrow.flight.sql.FlightSqlProducer;
5760
import org.apache.arrow.flight.sql.SqlInfoBuilder;
@@ -664,6 +667,22 @@ public SqlInfoBuilder getSqlInfoBuilder() {
664667
return sqlInfoBuilder;
665668
}
666669

670+
private final Map<String, SessionOptionValue> sessionOptions = new HashMap<>();
671+
672+
@Override
673+
public void setSessionOptions(
674+
final SetSessionOptionsRequest request,
675+
final CallContext context,
676+
final StreamListener<SetSessionOptionsResult> listener) {
677+
sessionOptions.putAll(request.getSessionOptions());
678+
listener.onNext(new SetSessionOptionsResult(Collections.emptyMap()));
679+
listener.onCompleted();
680+
}
681+
682+
public Map<String, SessionOptionValue> getSessionOptions() {
683+
return sessionOptions;
684+
}
685+
667686
private static final class TicketConversionUtils {
668687
private TicketConversionUtils() {
669688
// Prevent instantiation.

0 commit comments

Comments
 (0)