From a784dacb7de48be3255fa5ed64b8414cc6cf4e19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Barbosa=20Sampaio?= Date: Tue, 15 Jul 2025 14:21:33 -0400 Subject: [PATCH 1/3] mirror hbase 2 update --- .../bigtable-hbase-2.x-shaded/pom.xml | 4 + .../bigtable/hbase2_x/BigtableAsyncAdmin.java | 3 + .../it/verify-shaded-jar-entries-ok/pom.xml | 6 + .../hbase1_x/MirroringConnection.java | 19 ++ .../mirroring/hbase1_x/MirroringTable.java | 80 ++++++ .../hbase1_x/TestMirroringConnection.java | 45 ++- .../TestMirroringConnectionClosing.java | 268 ++++++++++++++++++ .../java/hbase1_x/TestMirroringTable.java | 112 ++++++++ .../pom.xml | 5 + .../pom.xml | 19 ++ .../hbase2_x/MirroringAsyncConnection.java | 10 + .../hbase2_x/MirroringAsyncTable.java | 18 +- .../hbase2_x/MirroringConnection.java | 18 ++ .../mirroring/hbase2_x/MirroringTable.java | 62 ++++ .../mirroring/hbase2_x/TestConnection.java | 47 +++ .../hbase2_x/TestMirroringAsyncTable.java | 8 +- .../hbase2_x}/TestMirroringConnection.java | 31 +- .../TestMirroringConnectionClosing.java | 5 +- .../hbase2_x/TestMirroringTable.java | 114 ++++++++ .../mirroring/core/MirroringConnection.java | 19 +- .../mirroring/core/MirroringTable.java | 63 +--- .../mirroring/core/TestMirroringTable.java | 11 - .../mirroring-client/pom.xml | 3 - pom.xml | 2 +- 24 files changed, 863 insertions(+), 109 deletions(-) create mode 100644 hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java create mode 100644 hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnectionClosing.java create mode 100644 hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringTable.java create mode 100644 hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestConnection.java rename hbase-migration-tools/mirroring-client/{bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core => bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x}/TestMirroringConnection.java (80%) rename hbase-migration-tools/mirroring-client/{bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core => bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x}/TestMirroringConnectionClosing.java (97%) create mode 100644 hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringTable.java diff --git a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml index 661c34de58..a5be6d570b 100644 --- a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml +++ b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml @@ -291,6 +291,10 @@ limitations under the License. io.opentelemetry com.google.bigtable.repackaged.io.opentelemetry + + javax.activation + com.google.bigtable.repackaged.javax.activation + META-INF/versions/9/io/opentelemetry META-INF/versions/9/com/google/cloud/bigtable/repackaged/io/opentelemetry diff --git a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java index 0a7b948026..43b7e124ac 100644 --- a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java +++ b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java @@ -695,6 +695,9 @@ public int getStoreFileCount() { public Size getStoreFileSize() { return new Size(size, Unit.BYTE); } + + @Override + public Size getMemStoreSize() { return new Size(size, Unit.BYTE); } } /** Handler for unsupported operations for generating Admin class at runtime. */ public static class UnsupportedOperationsHandler implements InvocationHandler { diff --git a/bigtable-test/bigtable-build-helper/src/it/verify-shaded-jar-entries-ok/pom.xml b/bigtable-test/bigtable-build-helper/src/it/verify-shaded-jar-entries-ok/pom.xml index 0c29be6832..3de383b0ba 100644 --- a/bigtable-test/bigtable-build-helper/src/it/verify-shaded-jar-entries-ok/pom.xml +++ b/bigtable-test/bigtable-build-helper/src/it/verify-shaded-jar-entries-ok/pom.xml @@ -57,6 +57,12 @@ limitations under the License. com.google.bigtable.repackaged.javax.annotation + + javax.activation + + com.google.bigtable.repackaged.javax.activation + + org.checkerframework diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java index a91652823a..1623afdaa3 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java @@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.security.User; public class MirroringConnection @@ -41,4 +42,22 @@ public MirroringConnection(MirroringConfiguration mirroringConfiguration, Execut throws IOException { super(mirroringConfiguration, pool); } + + @Override + protected Table getMirroringTable(Table primaryTable, Table secondaryTable) { + return new MirroringTable( + primaryTable, + secondaryTable, + executorService, + this.mismatchDetector, + this.flowController, + this.secondaryWriteErrorConsumer, + this.readSampler, + this.timestamper, + this.performWritesConcurrently, + this.waitForSecondaryWrites, + this.mirroringTracer, + this.referenceCounter, + this.configuration.mirroringOptions.maxLoggedBinaryValueLength); + } } diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java new file mode 100644 index 0000000000..8af7776ff2 --- /dev/null +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java @@ -0,0 +1,80 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x; + +import com.google.api.core.InternalApi; +import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler; +import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumer; +import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController; +import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; +import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper; +import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector; +import io.opencensus.common.Scope; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Table; + +@InternalApi("For internal usage only") +public class MirroringTable extends com.google.cloud.bigtable.mirroring.core.MirroringTable + implements Table { + /** + * @param executorService ExecutorService is used to perform operations on secondaryTable and + * verification tasks. + * @param mismatchDetector Detects mismatches in results from operations preformed on both + * databases. + * @param secondaryWriteErrorConsumer Consumer secondary write errors. + */ + public MirroringTable( + Table primaryTable, + Table secondaryTable, + ExecutorService executorService, + MismatchDetector mismatchDetector, + FlowController flowController, + SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, + ReadSampler readSampler, + Timestamper timestamper, + boolean performWritesConcurrently, + boolean waitForSecondaryWrites, + MirroringTracer mirroringTracer, + ReferenceCounter parentReferenceCounter, + int resultScannerBufferedMismatchedResults) { + super( + primaryTable, + secondaryTable, + executorService, + mismatchDetector, + flowController, + secondaryWriteErrorConsumer, + readSampler, + timestamper, + performWritesConcurrently, + waitForSecondaryWrites, + mirroringTracer, + parentReferenceCounter, + resultScannerBufferedMismatchedResults); + } + + @Override + public void mutateRow(final RowMutations rowMutations) throws IOException { + try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.MUTATE_ROW)) { + Log.trace("[%s] mutateRow(rowMutations=%s)", this.getName(), rowMutations); + this.batcher.batchSingleWriteOperation(rowMutations); + } + } +} diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnection.java index 86216141cb..a313912a17 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnection.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnection.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONFIG_PREFIX_KEY; import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONNECTION_CLASS_KEY; import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import com.google.cloud.bigtable.mirroring.core.TestConnection; import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringConnection; @@ -27,12 +29,23 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class TestMirroringConnection { + private Connection connection; + + @Before + public void setUp() throws IOException { + TestConnection.reset(); + Configuration configuration = createConfiguration(); + connection = ConnectionFactory.createConnection(configuration); + assertThat(TestConnection.connectionMocks.size()).isEqualTo(2); + } + private Configuration createConfiguration() { Configuration configuration = new Configuration(); configuration.set("hbase.client.connection.impl", MirroringConnection.class.getCanonicalName()); @@ -40,8 +53,10 @@ private Configuration createConfiguration() { MIRRORING_PRIMARY_CONNECTION_CLASS_KEY, TestConnection.class.getCanonicalName()); configuration.set( MIRRORING_SECONDARY_CONNECTION_CLASS_KEY, TestConnection.class.getCanonicalName()); - configuration.set(MIRRORING_PRIMARY_CONFIG_PREFIX_KEY, "1"); - configuration.set(MIRRORING_SECONDARY_CONFIG_PREFIX_KEY, "2"); + // Prefix keys have to be set because we are using the same class as primary and secondary + // connection class. + configuration.set(MIRRORING_PRIMARY_CONFIG_PREFIX_KEY, "primary-connection"); + configuration.set(MIRRORING_SECONDARY_CONFIG_PREFIX_KEY, "secondary-connection"); configuration.set( "google.bigtable.mirroring.write-error-log.appender.prefix-path", "/tmp/test-"); configuration.set("google.bigtable.mirroring.write-error-log.appender.max-buffer-size", "1024"); @@ -52,12 +67,30 @@ private Configuration createConfiguration() { @Test public void testConnectionFactoryCreatesMirroringConnection() throws IOException { - Configuration configuration = createConfiguration(); - Connection connection = ConnectionFactory.createConnection(configuration); - assertThat(connection).isInstanceOf(MirroringConnection.class); + assertThat(connection).isInstanceOf( + MirroringConnection.class); assertThat(((MirroringConnection) connection).getPrimaryConnection()) .isInstanceOf(TestConnection.class); assertThat(((MirroringConnection) connection).getSecondaryConnection()) .isInstanceOf(TestConnection.class); } + + @Test + public void testCloseClosesUnderlyingConnections() throws IOException { + connection.close(); + assertThat(connection.isClosed()).isTrue(); + verify(TestConnection.connectionMocks.get(0), times(1)).close(); + verify(TestConnection.connectionMocks.get(1), times(1)).close(); + } + + @Test + public void testAbortAbortsUnderlyingConnections() throws IOException { + String expectedString = "expected"; + Throwable expectedThrowable = new Exception(); + connection.abort(expectedString, expectedThrowable); + verify(TestConnection.connectionMocks.get(0), times(1)) + .abort(expectedString, expectedThrowable); + verify(TestConnection.connectionMocks.get(1), times(1)) + .abort(expectedString, expectedThrowable); + } } diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnectionClosing.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnectionClosing.java new file mode 100644 index 0000000000..83e3ffcaad --- /dev/null +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnectionClosing.java @@ -0,0 +1,268 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package hbase1_x; + +import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONFIG_PREFIX_KEY; +import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONNECTION_CLASS_KEY; +import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONFIG_PREFIX_KEY; +import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONNECTION_CLASS_KEY; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.cloud.bigtable.mirroring.core.ExecutorServiceRule; +import com.google.cloud.bigtable.mirroring.core.MirroringResultScanner; +import com.google.cloud.bigtable.mirroring.core.TestConnection; +import com.google.cloud.bigtable.mirroring.core.TestHelpers; +import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringConnection; +import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringTable; +import com.google.common.util.concurrent.SettableFuture; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Scan; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.InOrder; +import org.mockito.Mockito; + +@RunWith(JUnit4.class) +public class TestMirroringConnectionClosing { + @Rule + public final ExecutorServiceRule executorServiceRule = + ExecutorServiceRule.spyedSingleThreadedExecutor(); + + private Configuration createConfiguration() { + Configuration configuration = new Configuration(); + configuration.set("hbase.client.connection.impl", MirroringConnection.class.getCanonicalName()); + configuration.set( + MIRRORING_PRIMARY_CONNECTION_CLASS_KEY, TestConnection.class.getCanonicalName()); + configuration.set( + MIRRORING_SECONDARY_CONNECTION_CLASS_KEY, TestConnection.class.getCanonicalName()); + // Prefix keys have to be set because we are using the same class as primary and secondary + // connection class. + configuration.set(MIRRORING_PRIMARY_CONFIG_PREFIX_KEY, "primary-connection"); + configuration.set(MIRRORING_SECONDARY_CONFIG_PREFIX_KEY, "secondary-connection"); + configuration.set( + "google.bigtable.mirroring.write-error-log.appender.prefix-path", "/tmp/test-"); + configuration.set("google.bigtable.mirroring.write-error-log.appender.max-buffer-size", "1024"); + configuration.set( + "google.bigtable.mirroring.write-error-log.appender.drop-on-overflow", "false"); + return configuration; + } + + MirroringConnection mirroringConnection; + MirroringTable mirroringTable; + MirroringResultScanner mirroringScanner; + + @Before + public void setUp() throws IOException { + TestConnection.reset(); + Configuration configuration = createConfiguration(); + + mirroringConnection = + spy( + (MirroringConnection) + ConnectionFactory.createConnection( + configuration, executorServiceRule.executorService)); + assertThat(TestConnection.connectionMocks.size()).isEqualTo(2); + + mirroringTable = (MirroringTable) mirroringConnection.getTable(TableName.valueOf("test")); + mirroringScanner = (MirroringResultScanner) mirroringTable.getScanner(new Scan()); + } + + @Test + public void testUnderlingObjectsAreClosedInCorrectOrder() + throws IOException, InterruptedException, ExecutionException, TimeoutException { + final SettableFuture unblockSecondaryScanner = SettableFuture.create(); + final SettableFuture scannerAndTableClosed = SettableFuture.create(); + final SettableFuture closeFinished = SettableFuture.create(); + TestHelpers.blockMethodCall(TestConnection.scannerMocks.get(1), unblockSecondaryScanner).next(); + + // We expect secondary objects to be closed in correct order - from the innermost to the + // outermost. + // TestConnection object is created for each both primary and secondary, that connections, + // tables and scanners created using those connections are stored in static *Mocks field of + // TestConnection, in order of creation. + // Thus, `TestConnection.connectionMocks.get(1)` is secondary connection mock, + // `TestConnection.tableMocks.get(1)` is a table created using this connection, + // and `TestConnection.scannerMocks.get(1)` is a scanner created using this table. + InOrder inOrder = + Mockito.inOrder( + TestConnection.scannerMocks.get(1), + TestConnection.tableMocks.get(1), + TestConnection.connectionMocks.get(1)); + + Thread t = + new Thread( + new Runnable() { + @Override + public void run() { + try { + mirroringScanner.next(); + + mirroringScanner.close(); + mirroringTable.close(); + + scannerAndTableClosed.set(null); + mirroringConnection.close(); + closeFinished.set(null); + } catch (Exception e) { + closeFinished.setException(e); + } + } + }); + t.start(); + + // Wait until secondary request is scheduled. + scannerAndTableClosed.get(5, TimeUnit.SECONDS); + // Give mirroringConnection.close() some time to run + Thread.sleep(3000); + // and verify that it was called. + verify(mirroringConnection).close(); + + // Finish async call. + unblockSecondaryScanner.set(null); + // The close() should finish. + closeFinished.get(5, TimeUnit.SECONDS); + t.join(); + + executorServiceRule.waitForExecutor(); + + inOrder.verify(TestConnection.scannerMocks.get(1)).close(); + inOrder.verify(TestConnection.tableMocks.get(1)).close(); + inOrder.verify(TestConnection.connectionMocks.get(1)).close(); + + assertThat(mirroringConnection.isClosed()).isTrue(); + verify(TestConnection.connectionMocks.get(0), times(1)).close(); + verify(TestConnection.tableMocks.get(0), times(1)).close(); + verify(TestConnection.scannerMocks.get(0), times(1)).close(); + } + + @Test(timeout = 5000) + public void testClosingConnectionWithoutClosingUnderlyingObjectsShouldntBlock() + throws IOException { + // We have created a connection, table and scanner. + // They are not use asynchronously now, thus connection should be closed without delay. + mirroringConnection.close(); + verify(TestConnection.connectionMocks.get(0)).close(); + verify(TestConnection.connectionMocks.get(1)).close(); + } + + @Test + public void testInFlightRequestBlockClosingConnection() + throws IOException, InterruptedException, TimeoutException, ExecutionException { + final SettableFuture unblockSecondaryScanner = SettableFuture.create(); + final SettableFuture asyncScheduled = SettableFuture.create(); + final SettableFuture closeFinished = SettableFuture.create(); + TestHelpers.blockMethodCall(TestConnection.scannerMocks.get(1), unblockSecondaryScanner).next(); + + Thread t = + new Thread( + new Runnable() { + @Override + public void run() { + try { + mirroringScanner.next(); + + // Not calling close on scanner nor on table. + asyncScheduled.set(null); + + mirroringConnection.close(); + closeFinished.set(null); + } catch (Exception e) { + closeFinished.setException(e); + } + } + }); + t.start(); + + // Wait until secondary request is scheduled. + asyncScheduled.get(5, TimeUnit.SECONDS); + // Give mirroringConnection.close() some time to run + Thread.sleep(3000); + // and verify that it was called. + verify(mirroringConnection).close(); + + // Finish async call. + unblockSecondaryScanner.set(null); + // The close() should finish even though we didn't close scanner nor table. + closeFinished.get(5, TimeUnit.SECONDS); + t.join(); + } + + @Test + public void testConnectionWaitsForAsynchronousClose() + throws IOException, InterruptedException, TimeoutException, ExecutionException { + final SettableFuture unblockScannerNext = SettableFuture.create(); + final SettableFuture unblockScannerClose = SettableFuture.create(); + final SettableFuture asyncScheduled = SettableFuture.create(); + final SettableFuture closeFinished = SettableFuture.create(); + TestHelpers.blockMethodCall(TestConnection.scannerMocks.get(1), unblockScannerNext).next(); + TestHelpers.blockMethodCall(TestConnection.scannerMocks.get(1), unblockScannerClose).close(); + + Thread t = + new Thread( + new Runnable() { + @Override + public void run() { + try { + mirroringScanner.next(); + mirroringScanner.close(); + asyncScheduled.set(null); + mirroringConnection.close(); + closeFinished.set(null); + } catch (Exception e) { + closeFinished.setException(e); + } + } + }); + t.start(); + + // Wait until secondary request is scheduled. + asyncScheduled.get(5, TimeUnit.SECONDS); + // Unblock scanner next. + unblockScannerNext.set(null); + // Give mirroringConnection.close() and secondaryScanner.close() some time to run + Thread.sleep(1000); + // and verify that they were called. + verify(mirroringConnection).close(); + verify(TestConnection.scannerMocks.get(1)).close(); + + // secondary.close() was not yet finished, close should be blocked. + try { + closeFinished.get(2, TimeUnit.SECONDS); + fail(); + } catch (TimeoutException expected) { + // async operation has not finished - close should block. + } + + // Finish secondaryScanner.close(). + unblockScannerClose.set(null); + // And now connection.close() should unblock. + closeFinished.get(5, TimeUnit.SECONDS); + t.join(); + } +} diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringTable.java new file mode 100644 index 0000000000..d904934c12 --- /dev/null +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringTable.java @@ -0,0 +1,112 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package hbase1_x; + +import static com.google.cloud.bigtable.mirroring.core.TestHelpers.mockBatch; +import static com.google.cloud.bigtable.mirroring.core.TestHelpers.setupFlowControllerMock; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.cloud.bigtable.mirroring.core.ExecutorServiceRule; +import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler; +import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumerWithMetrics; +import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController; +import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringMetricsRecorder; +import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringSpanFactory; +import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.core.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper; +import com.google.cloud.bigtable.mirroring.core.verification.DefaultMismatchDetector; +import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector; +import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringTable; +import io.opencensus.trace.Tracing; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Table; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class TestMirroringTable { + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Rule + public final ExecutorServiceRule executorServiceRule = + ExecutorServiceRule.singleThreadedExecutor(); + + @Mock Table primaryTable; + @Mock Table secondaryTable; + @Mock FlowController flowController; + @Mock MirroringMetricsRecorder mirroringMetricsRecorder; + @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; + @Mock ReferenceCounter referenceCounter; + + Timestamper timestamper = new NoopTimestamper(); + MismatchDetector mismatchDetector; + MirroringTable mirroringTable; + MirroringTracer mirroringTracer; + + @Before + public void setUp() { + setupFlowControllerMock(flowController); + this.mirroringTracer = + new MirroringTracer( + new MirroringSpanFactory(Tracing.getTracer(), mirroringMetricsRecorder), + mirroringMetricsRecorder); + this.mismatchDetector = spy(new DefaultMismatchDetector(this.mirroringTracer, 100)); + this.mirroringTable = + spy( + new MirroringTable( + primaryTable, + secondaryTable, + this.executorServiceRule.executorService, + mismatchDetector, + flowController, + secondaryWriteErrorConsumer, + new ReadSampler(100), + this.timestamper, + false, + false, + this.mirroringTracer, + this.referenceCounter, + 1000)); + } + + @Test + public void testMutateRow() throws IOException, InterruptedException { + RowMutations mutations = new RowMutations("r1".getBytes()); + List listOfMutations = Arrays.asList(mutations); + mockBatch(primaryTable, secondaryTable, mutations, new Result()); + mirroringTable.mutateRow(mutations); + executorServiceRule.waitForExecutor(); + verify(primaryTable, times(1)).batch(eq(listOfMutations), any(Object[].class)); + verify(secondaryTable, times(1)).batch(eq(listOfMutations), any(Object[].class)); + } +} diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-hadoop/pom.xml b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-hadoop/pom.xml index a9d66c0b2b..9066e808f3 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-hadoop/pom.xml +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-hadoop/pom.xml @@ -42,6 +42,10 @@ limitations under the License. log4j log4j + + ch.qos.reload4j + reload4j + @@ -202,6 +206,7 @@ limitations under the License. commons-logging:commons-logging log4j:log4j + ch.qos.reload4j:reload4j diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-shaded/pom.xml b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-shaded/pom.xml index 9293d1123e..3cdc16de8c 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-shaded/pom.xml +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-shaded/pom.xml @@ -126,6 +126,18 @@ limitations under the License. io.opencensus ${shading-prefix}.io.opencensus + + io.opentelemetry + com.google.cloud.bigtable.mirroring.repackaged.io.opentelemetry + + + META-INF/versions/9/io/opentelemetry + META-INF/versions/9/com/google/cloud/bigtable/mirroring/repackaged/io/opentelemetry + + + javax.activation + com.google.cloud.bigtable.mirroring.repackaged.javax.activation + io.grpc @@ -198,6 +210,13 @@ limitations under the License. org.apache.hbase:hbase-shaded-client + + + io.opentelemetry:opentelemetry-api + io.opentelemetry:opentelemetry-context + io.opentelemetry:opentelemetry-semconv + diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java index e0e4a7e1ba..6f8b7840de 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java @@ -370,6 +370,16 @@ public AsyncTableBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit) { return this; } + @Override + public AsyncTableBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit) { + setTimeParameter( + pause, + unit, + this.primaryTableBuilder::setRetryPauseForServerOverloaded, + this.secondaryTableBuilder::setRetryPauseForServerOverloaded); + return this; + } + @Override public AsyncTableBuilder setMaxAttempts(int maxAttempts) { setIntegerParameter( diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java index d92fe51911..a3157f6759 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java @@ -63,6 +63,8 @@ import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.AsyncTableRegionLocator; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -202,9 +204,9 @@ public CompletableFuture increment(Increment increment) { } @Override - public CompletableFuture mutateRow(RowMutations rowMutations) { + public CompletableFuture mutateRow(RowMutations rowMutations) { this.timestamper.fillTimestamp(rowMutations); - CompletableFuture primaryFuture = this.primaryTable.mutateRow(rowMutations); + CompletableFuture primaryFuture = this.primaryTable.mutateRow(rowMutations); return writeWithFlowControl( new WriteOperationInfo(rowMutations), primaryFuture, @@ -534,6 +536,16 @@ public CheckAndMutateWithFilterBuilder checkAndMutate( throw new UnsupportedOperationException("not implemented"); } + @Override + public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public List> checkAndMutate(List list) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public void scan(Scan scan, C consumer) { this.primaryTable.scan(scan, consumer); @@ -561,7 +573,7 @@ public MirroringCheckAndMutateBuilder(CheckAndMutateBuilder primaryBuilder) { private OperationStages> checkAndMutate( WriteOperationInfo writeOperationInfo, CompletableFuture primary, - Supplier> secondary) { + Supplier> secondary) { OperationStages> returnedValue = new OperationStages<>(new CompletableFuture<>()); primary diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java index ffe21d4871..c6cabb0294 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java @@ -30,6 +30,24 @@ public MirroringConnection(Configuration conf, boolean managed, ExecutorService super(conf, managed, pool, user); } + @Override + protected Table getMirroringTable(Table primaryTable, Table secondaryTable) { + return new MirroringTable( + primaryTable, + secondaryTable, + executorService, + this.mismatchDetector, + this.flowController, + this.secondaryWriteErrorConsumer, + this.readSampler, + this.timestamper, + this.performWritesConcurrently, + this.waitForSecondaryWrites, + this.mirroringTracer, + this.referenceCounter, + this.configuration.mirroringOptions.maxLoggedBinaryValueLength); + } + public MirroringConnection(Configuration conf, ExecutorService pool, User user) throws Throwable { this(conf, false, pool, user); } diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java index aad985884a..47c38a7e27 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java @@ -15,26 +15,39 @@ */ package com.google.cloud.bigtable.mirroring.hbase2_x; +import com.google.cloud.bigtable.mirroring.core.utils.CallableThrowingIOException; +import com.google.cloud.bigtable.mirroring.core.utils.OperationUtils; import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler; import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumer; import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController; +import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.WriteOperationInfo; +import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter; import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector; +import io.opencensus.common.Scope; import java.io.IOException; import java.util.List; import java.util.concurrent.ExecutorService; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; public class MirroringTable extends com.google.cloud.bigtable.mirroring.core.MirroringTable implements Table { + // We keep a reference to the secondary table to get around the + // api change for mutate rows (used to return void, not returns Result) + private Table secondaryTable; + public MirroringTable( Table primaryTable, Table secondaryTable, @@ -63,6 +76,7 @@ public MirroringTable( mirroringTracer, referenceCounter, resultScannerBufferedMismatchedResults); + this.secondaryTable = secondaryTable; } @Override @@ -80,6 +94,54 @@ public boolean[] exists(List gets) throws IOException { return existsAll(gets); } + @Override + public Result mutateRow(final RowMutations rowMutations) throws IOException { + try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.MUTATE_ROW)) { + Log.trace("[%s] mutateRow(mutateRow=%s)", this.getName(), rowMutations); + + if (rowMutations.getMutations().isEmpty()) { + return Result.EMPTY_RESULT; + } + + Result result = + this.mirroringTracer.spanFactory.wrapPrimaryOperation( + new CallableThrowingIOException() { + @Override + public Result call() throws IOException { + return primaryTable.mutateRow(rowMutations); + } + }, + HBaseOperation.MUTATE_ROW); + + Mutation firstMutation = rowMutations.getMutations().get(0); + + // If it is either Append or Increment, the underlying operation is a rmw and we need + // the result of that operation to apply on the secondary table + if (firstMutation instanceof Append || firstMutation instanceof Increment) { + Put put = OperationUtils.makePutFromResult(result); + + scheduleSequentialWriteOperation( + new WriteOperationInfo(put), this.secondaryAsyncWrapper.put(put)); + } else { + // Async wrapper mutateRow implementation returns void. HBase 2.4+ returns result + // so we skip the syntatic sugar + scheduleSequentialWriteOperation( + new WriteOperationInfo(rowMutations), + this.secondaryAsyncWrapper.createSubmitTaskSupplier( + new CallableThrowingIOException() { + @Override + public Result call() throws IOException { + Log.trace("mutateRow(RowMutations)"); + return secondaryTable.mutateRow(rowMutations); + } + }, + HBaseOperation.MUTATE_ROW)); + } + + return result; + } + } + /** * HBase 1.x's {@link Table#append} returns {@code null} when {@link Append#isReturnResults} is * {@code false} diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestConnection.java new file mode 100644 index 0000000000..269db8e9b7 --- /dev/null +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestConnection.java @@ -0,0 +1,47 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase2_x; + +import java.util.concurrent.ExecutorService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.TableBuilder; +import org.apache.hadoop.hbase.security.User; + +public class TestConnection extends com.google.cloud.bigtable.mirroring.core.TestConnection implements + Connection { + + public TestConnection(Configuration conf, boolean managed, + ExecutorService pool, User user) { + super(conf, managed, pool, user); + } + + public TestConnection(Configuration conf, + ExecutorService pool, User user) { + super(conf, false, pool, user); + } + + @Override + public void clearRegionLocationCache() { + + } + + @Override + public TableBuilder getTableBuilder(TableName tableName, ExecutorService executorService) { + return null; + } +} diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java index 4d2c5839d0..5252729080 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java @@ -757,7 +757,7 @@ public void testConditionalWriteHappensWhenSecondaryErred() when(primaryBuilder.thenMutate(mutations)).thenReturn(primaryFuture); IOException ioe = new IOException("expected"); - CompletableFuture exceptionalFuture = new CompletableFuture<>(); + CompletableFuture exceptionalFuture = new CompletableFuture<>(); exceptionalFuture.completeExceptionally(ioe); when(secondaryTable.mutateRow(mutations)).thenReturn(exceptionalFuture); @@ -841,12 +841,12 @@ public void testDelete() throws InterruptedException, ExecutionException { @Test public void testMutateRow() throws ExecutionException, InterruptedException { RowMutations mutations = new RowMutations("r1".getBytes()); - CompletableFuture primaryFuture = new CompletableFuture<>(); - CompletableFuture secondaryFuture = new CompletableFuture<>(); + CompletableFuture primaryFuture = new CompletableFuture<>(); + CompletableFuture secondaryFuture = new CompletableFuture<>(); when(primaryTable.mutateRow(mutations)).thenReturn(primaryFuture); when(secondaryTable.mutateRow(mutations)).thenReturn(secondaryFuture); - CompletableFuture resultFuture = mirroringTable.mutateRow(mutations); + CompletableFuture resultFuture = mirroringTable.mutateRow(mutations); primaryFuture.complete(null); secondaryFuture.complete(null); resultFuture.get(); diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnection.java similarity index 80% rename from hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringConnection.java rename to hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnection.java index 9c75fd0e1b..c901422f8c 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringConnection.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnection.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Google LLC + * Copyright 2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,20 +13,39 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.google.cloud.bigtable.mirroring.core; +package com.google.cloud.bigtable.mirroring.hbase2_x; + import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONFIG_PREFIX_KEY; import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONNECTION_CLASS_KEY; import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONFIG_PREFIX_KEY; import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONNECTION_CLASS_KEY; import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorParams; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableBuilder; +import org.apache.hadoop.hbase.security.User; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -65,7 +84,8 @@ private Configuration createConfiguration() { @Test public void testConnectionFactoryCreatesMirroringConnection() throws IOException { - assertThat(connection).isInstanceOf(MirroringConnection.class); + assertThat(connection).isInstanceOf( + MirroringConnection.class); assertThat(((MirroringConnection) connection).getPrimaryConnection()) .isInstanceOf(TestConnection.class); assertThat(((MirroringConnection) connection).getSecondaryConnection()) @@ -90,9 +110,4 @@ public void testAbortAbortsUnderlyingConnections() throws IOException { verify(TestConnection.connectionMocks.get(1), times(1)) .abort(expectedString, expectedThrowable); } - - @Test - public void testConstructorTakingMirroringConfiguration() throws IOException { - new MirroringConnection(new MirroringConfiguration(createConfiguration()), null); - } } diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringConnectionClosing.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnectionClosing.java similarity index 97% rename from hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringConnectionClosing.java rename to hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnectionClosing.java index 1a82fe38f1..525d16347d 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringConnectionClosing.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnectionClosing.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.google.cloud.bigtable.mirroring.core; +package com.google.cloud.bigtable.mirroring.hbase2_x; import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONFIG_PREFIX_KEY; import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONNECTION_CLASS_KEY; @@ -25,6 +25,9 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import com.google.cloud.bigtable.mirroring.core.ExecutorServiceRule; +import com.google.cloud.bigtable.mirroring.core.MirroringResultScanner; +import com.google.cloud.bigtable.mirroring.core.TestHelpers; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; import java.util.concurrent.ExecutionException; diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringTable.java new file mode 100644 index 0000000000..6c77e18106 --- /dev/null +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringTable.java @@ -0,0 +1,114 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase2_x; + +import static com.google.cloud.bigtable.mirroring.core.TestHelpers.setupFlowControllerMock; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.cloud.bigtable.mirroring.core.ExecutorServiceRule; +import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler; +import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumerWithMetrics; +import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController; +import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringMetricsRecorder; +import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringSpanFactory; +import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.core.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper; +import com.google.cloud.bigtable.mirroring.core.verification.DefaultMismatchDetector; +import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector; +import io.opencensus.trace.Tracing; +import java.io.IOException; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Table; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class TestMirroringTable { + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Rule + public final ExecutorServiceRule executorServiceRule = + ExecutorServiceRule.singleThreadedExecutor(); + + @Mock + Table primaryTable; + @Mock Table secondaryTable; + @Mock + FlowController flowController; + @Mock + MirroringMetricsRecorder mirroringMetricsRecorder; + @Mock + SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; + @Mock + ReferenceCounter referenceCounter; + + Timestamper timestamper = new NoopTimestamper(); + MismatchDetector mismatchDetector; + MirroringTable mirroringTable; + MirroringTracer mirroringTracer; + + @Before + public void setUp() { + setupFlowControllerMock(flowController); + this.mirroringTracer = + new MirroringTracer( + new MirroringSpanFactory(Tracing.getTracer(), mirroringMetricsRecorder), + mirroringMetricsRecorder); + this.mismatchDetector = spy(new DefaultMismatchDetector(this.mirroringTracer, 100)); + this.mirroringTable = + spy( + new MirroringTable( + primaryTable, + secondaryTable, + this.executorServiceRule.executorService, + mismatchDetector, + flowController, + secondaryWriteErrorConsumer, + new ReadSampler(100), + this.timestamper, + false, + false, + this.mirroringTracer, + this.referenceCounter, + 1000)); + } + + @Test + public void testMutateRow() throws IOException, InterruptedException { + RowMutations mutations = new RowMutations("r1".getBytes()); + mutations.add(new Put("r1".getBytes())); + when(primaryTable.mutateRow(any(RowMutations.class))) + .thenReturn(Result.EMPTY_RESULT); + mirroringTable.mutateRow(mutations); + executorServiceRule.waitForExecutor(); + verify(primaryTable, times(1)).mutateRow(mutations); + verify(secondaryTable, times(1)).mutateRow(mutations); + } +} diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringConnection.java index aee0e36800..daef27e929 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringConnection.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringConnection.java @@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.security.User; -public class MirroringConnection implements Connection { +public abstract class MirroringConnection implements Connection { private static final com.google.cloud.bigtable.mirroring.core.utils.Logger Log = new com.google.cloud.bigtable.mirroring.core.utils.Logger(MirroringConnection.class); protected final FlowController flowController; @@ -231,20 +231,7 @@ public Table call() throws IOException { }, HBaseOperation.GET_TABLE); Table secondaryTable = this.secondaryConnection.getTable(tableName); - return new MirroringTable( - primaryTable, - secondaryTable, - executorService, - this.mismatchDetector, - this.flowController, - this.secondaryWriteErrorConsumer, - this.readSampler, - this.timestamper, - this.performWritesConcurrently, - this.waitForSecondaryWrites, - this.mirroringTracer, - this.referenceCounter, - this.configuration.mirroringOptions.maxLoggedBinaryValueLength); + return getMirroringTable(primaryTable, secondaryTable); } } @@ -416,4 +403,6 @@ public void run() { // This error is not reported to the user. } } + + protected abstract Table getMirroringTable(Table primaryTable, Table secondaryTable); } diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java index 31a7abc88f..542b50ded3 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java @@ -89,9 +89,9 @@ * asynchronously. Read operations are mirrored to verify that content of both databases matches. */ @InternalApi("For internal usage only") -public class MirroringTable implements Table { +public class MirroringTable { - private static final Logger Log = new Logger(MirroringTable.class); + protected static final Logger Log = new Logger(MirroringTable.class); private static final Predicate resultIsFaultyPredicate = new Predicate() { @Override @@ -100,16 +100,16 @@ public boolean apply(Object o) { } }; protected final Table primaryTable; - private final AsyncTableWrapper secondaryAsyncWrapper; + protected final AsyncTableWrapper secondaryAsyncWrapper; private final VerificationContinuationFactory verificationContinuationFactory; /** Counter for MirroringConnection and MirroringTable. */ private final HierarchicalReferenceCounter referenceCounter; private final SecondaryWriteErrorConsumer secondaryWriteErrorConsumer; - private final MirroringTracer mirroringTracer; + protected final MirroringTracer mirroringTracer; private final ReadSampler readSampler; private final RequestScheduler requestScheduler; - private final Batcher batcher; + protected final Batcher batcher; private final AtomicBoolean closed = new AtomicBoolean(false); private final SettableFuture closedFuture = SettableFuture.create(); private final int resultScannerBufferedMismatchedResults; @@ -166,7 +166,6 @@ public MirroringTable( this.resultScannerBufferedMismatchedResults = resultScannerBufferedMismatchedResults; } - @Override public boolean exists(final Get get) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.EXISTS)) { Log.trace("[%s] exists(get=%s)", this.getName(), get); @@ -189,7 +188,6 @@ public Boolean call() throws IOException { } } - @Override public boolean[] existsAll(final List inputList) throws IOException { final List list = new ArrayList<>(inputList); try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.EXISTS_ALL)) { @@ -213,7 +211,6 @@ public boolean[] call() throws IOException { } } - @Override public Result get(final Get get) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.GET)) { Log.trace("[%s] get(get=%s)", this.getName(), get); @@ -236,7 +233,6 @@ public Result call() throws IOException { } } - @Override public Result[] get(final List inputList) throws IOException { final List list = new ArrayList<>(inputList); try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.GET_LIST)) { @@ -260,7 +256,6 @@ public Result[] call() throws IOException { } } - @Override public ResultScanner getScanner(Scan scan) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.GET_SCANNER)) { @@ -280,17 +275,14 @@ public ResultScanner getScanner(Scan scan) throws IOException { } } - @Override public ResultScanner getScanner(byte[] family) throws IOException { return getScanner(new Scan().addFamily(family)); } - @Override public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { return getScanner(new Scan().addColumn(family, qualifier)); } - @Override public void put(final Put put) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.PUT)) { Log.trace("[%s] put(put=%s)", this.getName(), put); @@ -298,7 +290,6 @@ public void put(final Put put) throws IOException { } } - @Override public void put(List puts) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.PUT_LIST)) { Log.trace("[%s] put(puts=%s)", this.getName(), puts); @@ -313,7 +304,6 @@ public void put(List puts) throws IOException { } } - @Override public void delete(final Delete delete) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.DELETE)) { Log.trace("[%s] delete(delete=%s)", this.getName(), delete); @@ -321,7 +311,6 @@ public void delete(final Delete delete) throws IOException { } } - @Override public void delete(List deletes) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.DELETE_LIST)) { @@ -345,15 +334,6 @@ public void delete(List deletes) throws IOException { } } - @Override - public void mutateRow(final RowMutations rowMutations) throws IOException { - try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.MUTATE_ROW)) { - Log.trace("[%s] mutateRow(rowMutations=%s)", this.getName(), rowMutations); - this.batcher.batchSingleWriteOperation(rowMutations); - } - } - - @Override public Result append(final Append append) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.APPEND)) { Log.trace("[%s] append(append=%s)", this.getName(), append); @@ -380,7 +360,6 @@ public Result call() throws IOException { } } - @Override public Result increment(final Increment increment) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.INCREMENT)) { Log.trace("[%s] increment(increment=%s)", this.getName(), increment); @@ -405,7 +384,6 @@ public Result call() throws IOException { } } - @Override public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException { Log.trace( @@ -417,7 +395,6 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo return Bytes.toLong(CellUtil.cloneValue(cell)); } - @Override public long incrementColumnValue( byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException { @@ -432,7 +409,6 @@ public long incrementColumnValue( return Bytes.toLong(CellUtil.cloneValue(cell)); } - @Override public void batch(List operations, Object[] results) throws IOException, InterruptedException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.BATCH)) { @@ -440,7 +416,6 @@ public void batch(List operations, Object[] results) } } - @Override public Object[] batch(List operations) throws IOException, InterruptedException { Log.trace("[%s] batch(operations=%s)", this.getName(), operations); Object[] results = new Object[operations.size()]; @@ -448,7 +423,6 @@ public Object[] batch(List operations) throws IOException, Interr return results; } - @Override public void batchCallback( List inputOperations, Object[] results, final Callback callback) throws IOException, InterruptedException { @@ -463,7 +437,6 @@ public void batchCallback( } } - @Override public Object[] batchCallback(List operations, Callback callback) throws IOException, InterruptedException { Log.trace( @@ -473,7 +446,6 @@ public Object[] batchCallback(List operations, Callback ca return results; } - @Override public boolean checkAndMutate( byte[] row, byte[] family, @@ -492,7 +464,6 @@ public boolean checkAndMutate( } } - @Override public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException { Log.trace( @@ -501,7 +472,6 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] v return this.checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put); } - @Override public boolean checkAndPut( byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, Put put) throws IOException { @@ -516,7 +486,6 @@ public boolean checkAndPut( } } - @Override public boolean checkAndDelete( byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException { Log.trace( @@ -525,7 +494,6 @@ public boolean checkAndDelete( return this.checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete); } - @Override public boolean checkAndDelete( byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, Delete delete) throws IOException { @@ -572,7 +540,6 @@ public Boolean call() throws IOException { * after finishing all secondary requests that are yet in-flight ({@link * AsyncTableWrapper#close()}). */ - @Override public void close() throws IOException { this.closePrimaryAndScheduleSecondaryClose(); } @@ -648,7 +615,7 @@ private void scheduleSequentialReadOperationWithVerification( this.mirroringTracer.spanFactory.wrapReadVerificationCallback(verificationCallback)); } - private void scheduleSequentialWriteOperation( + protected void scheduleSequentialWriteOperation( final WriteOperationInfo writeOperationInfo, final Supplier> secondaryOperationSupplier) { WriteOperationFutureCallback writeErrorCallback = @@ -680,57 +647,47 @@ public Void apply(Throwable throwable) { flowControlReservationErrorConsumer); } - @Override public TableName getName() { return this.primaryTable.getName(); } - @Override public Configuration getConfiguration() { return this.primaryTable.getConfiguration(); } - @Override public HTableDescriptor getTableDescriptor() throws IOException { return this.primaryTable.getTableDescriptor(); } - @Override public CoprocessorRpcChannel coprocessorService(byte[] bytes) { throw new UnsupportedOperationException(); } - @Override public Map coprocessorService( Class aClass, byte[] bytes, byte[] bytes1, Call call) throws Throwable { throw new UnsupportedOperationException(); } - @Override public void coprocessorService( Class aClass, byte[] bytes, byte[] bytes1, Call call, Callback callback) throws Throwable { throw new UnsupportedOperationException(); } - @Override public long getWriteBufferSize() { throw new UnsupportedOperationException(); } - @Override public void setWriteBufferSize(long l) throws IOException { throw new UnsupportedOperationException(); } - @Override public Map batchCoprocessorService( MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r) throws Throwable { throw new UnsupportedOperationException(); } - @Override public void batchCoprocessorService( MethodDescriptor methodDescriptor, Message message, @@ -742,42 +699,34 @@ public void batchCoprocessorService( throw new UnsupportedOperationException(); } - @Override public void setOperationTimeout(int i) { throw new UnsupportedOperationException(); } - @Override public int getOperationTimeout() { throw new UnsupportedOperationException(); } - @Override public int getRpcTimeout() { throw new UnsupportedOperationException(); } - @Override public void setRpcTimeout(int i) { throw new UnsupportedOperationException(); } - @Override public int getReadRpcTimeout() { throw new UnsupportedOperationException(); } - @Override public void setReadRpcTimeout(int i) { throw new UnsupportedOperationException(); } - @Override public int getWriteRpcTimeout() { throw new UnsupportedOperationException(); } - @Override public void setWriteRpcTimeout(int i) { throw new UnsupportedOperationException(); } diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringTable.java index 6cca2d3e9e..7fdd190f31 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringTable.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringTable.java @@ -914,17 +914,6 @@ public void testDeleteList() throws IOException, InterruptedException { verify(secondaryTable, times(1)).batch(eq(originalDeletes), any(Object[].class)); } - @Test - public void testMutateRow() throws IOException, InterruptedException { - RowMutations mutations = new RowMutations("r1".getBytes()); - List listOfMutations = Arrays.asList(mutations); - mockBatch(primaryTable, secondaryTable, mutations, new Result()); - mirroringTable.mutateRow(mutations); - executorServiceRule.waitForExecutor(); - verify(primaryTable, times(1)).batch(eq(listOfMutations), any(Object[].class)); - verify(secondaryTable, times(1)).batch(eq(listOfMutations), any(Object[].class)); - } - @Test public void testIncrement() throws IOException { byte[] row = "r1".getBytes(); diff --git a/hbase-migration-tools/mirroring-client/pom.xml b/hbase-migration-tools/mirroring-client/pom.xml index 61b7794f9e..e68cab5aa4 100644 --- a/hbase-migration-tools/mirroring-client/pom.xml +++ b/hbase-migration-tools/mirroring-client/pom.xml @@ -15,9 +15,6 @@ com.google.bigtable.hbase.mirroring.shaded com/google/bigtable/hbase/mirroring/shaded - - - 2.3.6 diff --git a/pom.xml b/pom.xml index 586ad6cc73..edaa339441 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ limitations under the License. 1.7.2 - 2.5.7-hadoop3 + 2.5.11-hadoop3 2.28.2 From ece21c1bb2f27132a01dac2625d5b70e2250ea88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Barbosa=20Sampaio?= Date: Tue, 15 Jul 2025 14:52:19 -0400 Subject: [PATCH 2/3] Formating --- .../bigtable/hbase2_x/BigtableAsyncAdmin.java | 4 +++- .../hbase1_x/TestMirroringConnection.java | 3 +-- .../regionserver/FailingHBaseHRegion2.java | 12 +++------- .../mirroring/hbase2_x/TestConnection.java | 14 +++++------- .../hbase2_x/TestMirroringConnection.java | 22 +------------------ .../hbase2_x/TestMirroringTable.java | 18 +++++---------- .../mirroring/core/MirroringTable.java | 2 +- 7 files changed, 20 insertions(+), 55 deletions(-) diff --git a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java index 43b7e124ac..83f6eb5529 100644 --- a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java +++ b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java @@ -697,7 +697,9 @@ public Size getStoreFileSize() { } @Override - public Size getMemStoreSize() { return new Size(size, Unit.BYTE); } + public Size getMemStoreSize() { + return new Size(size, Unit.BYTE); + } } /** Handler for unsupported operations for generating Admin class at runtime. */ public static class UnsupportedOperationsHandler implements InvocationHandler { diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnection.java index a313912a17..caf4effc89 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnection.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnection.java @@ -67,8 +67,7 @@ private Configuration createConfiguration() { @Test public void testConnectionFactoryCreatesMirroringConnection() throws IOException { - assertThat(connection).isInstanceOf( - MirroringConnection.class); + assertThat(connection).isInstanceOf(MirroringConnection.class); assertThat(((MirroringConnection) connection).getPrimaryConnection()) .isInstanceOf(TestConnection.class); assertThat(((MirroringConnection) connection).getSecondaryConnection()) diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java index 75ffe6fe33..6a0eb84f75 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java @@ -80,7 +80,7 @@ public FailingHBaseHRegion2( } @Override - public HRegion.RegionScannerImpl getScanner(Scan scan, List additionalScanners) + public RegionScannerImpl getScanner(Scan scan, List additionalScanners) throws IOException { // HBase 2.x implements Gets as Scans with start row == end row == requested row. processRowThrow(scan.getStartRow()); @@ -88,9 +88,9 @@ public HRegion.RegionScannerImpl getScanner(Scan scan, List add } @Override - public void mutateRow(RowMutations rm) throws IOException { + public Result mutateRow(RowMutations rm) throws IOException { processRowThrow(rm.getRow()); - super.mutateRow(rm); + return super.mutateRow(rm); } @Override @@ -100,12 +100,6 @@ public OperationStatus[] batchMutate( mutations, (m) -> super.batchMutate(m, atomic, nonceGroup, nonce)); } - @Override - public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) - throws IOException { - return batchMutateWithFailures(mutations, (m) -> super.batchMutate(m, nonceGroup, nonce)); - } - @Override public Result get(Get get) throws IOException { processRowThrow(get.getRow()); diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestConnection.java index 269db8e9b7..f0c2b48205 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestConnection.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestConnection.java @@ -22,23 +22,19 @@ import org.apache.hadoop.hbase.client.TableBuilder; import org.apache.hadoop.hbase.security.User; -public class TestConnection extends com.google.cloud.bigtable.mirroring.core.TestConnection implements - Connection { +public class TestConnection extends com.google.cloud.bigtable.mirroring.core.TestConnection + implements Connection { - public TestConnection(Configuration conf, boolean managed, - ExecutorService pool, User user) { + public TestConnection(Configuration conf, boolean managed, ExecutorService pool, User user) { super(conf, managed, pool, user); } - public TestConnection(Configuration conf, - ExecutorService pool, User user) { + public TestConnection(Configuration conf, ExecutorService pool, User user) { super(conf, false, pool, user); } @Override - public void clearRegionLocationCache() { - - } + public void clearRegionLocationCache() {} @Override public TableBuilder getTableBuilder(TableName tableName, ExecutorService executorService) { diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnection.java index c901422f8c..363795e59e 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnection.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnection.java @@ -15,37 +15,18 @@ */ package com.google.cloud.bigtable.mirroring.hbase2_x; - import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONFIG_PREFIX_KEY; import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONNECTION_CLASS_KEY; import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONFIG_PREFIX_KEY; import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONNECTION_CLASS_KEY; import static com.google.common.truth.Truth.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.BufferedMutator; -import org.apache.hadoop.hbase.client.BufferedMutatorParams; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TableBuilder; -import org.apache.hadoop.hbase.security.User; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -84,8 +65,7 @@ private Configuration createConfiguration() { @Test public void testConnectionFactoryCreatesMirroringConnection() throws IOException { - assertThat(connection).isInstanceOf( - MirroringConnection.class); + assertThat(connection).isInstanceOf(MirroringConnection.class); assertThat(((MirroringConnection) connection).getPrimaryConnection()) .isInstanceOf(TestConnection.class); assertThat(((MirroringConnection) connection).getSecondaryConnection()) diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringTable.java index 6c77e18106..3fc5040d54 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringTable.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringTable.java @@ -57,17 +57,12 @@ public class TestMirroringTable { public final ExecutorServiceRule executorServiceRule = ExecutorServiceRule.singleThreadedExecutor(); - @Mock - Table primaryTable; + @Mock Table primaryTable; @Mock Table secondaryTable; - @Mock - FlowController flowController; - @Mock - MirroringMetricsRecorder mirroringMetricsRecorder; - @Mock - SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; - @Mock - ReferenceCounter referenceCounter; + @Mock FlowController flowController; + @Mock MirroringMetricsRecorder mirroringMetricsRecorder; + @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; + @Mock ReferenceCounter referenceCounter; Timestamper timestamper = new NoopTimestamper(); MismatchDetector mismatchDetector; @@ -104,8 +99,7 @@ public void setUp() { public void testMutateRow() throws IOException, InterruptedException { RowMutations mutations = new RowMutations("r1".getBytes()); mutations.add(new Put("r1".getBytes())); - when(primaryTable.mutateRow(any(RowMutations.class))) - .thenReturn(Result.EMPTY_RESULT); + when(primaryTable.mutateRow(any(RowMutations.class))).thenReturn(Result.EMPTY_RESULT); mirroringTable.mutateRow(mutations); executorServiceRule.waitForExecutor(); verify(primaryTable, times(1)).mutateRow(mutations); diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java index 542b50ded3..6298f35269 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java @@ -615,7 +615,7 @@ private void scheduleSequentialReadOperationWithVerification( this.mirroringTracer.spanFactory.wrapReadVerificationCallback(verificationCallback)); } - protected void scheduleSequentialWriteOperation( + protected void scheduleSequentialWriteOperation( final WriteOperationInfo writeOperationInfo, final Supplier> secondaryOperationSupplier) { WriteOperationFutureCallback writeErrorCallback = From d5c1cf52b841e8eb85cb79cfbcaf5739fae2af7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Barbosa=20Sampaio?= Date: Tue, 15 Jul 2025 16:57:55 -0400 Subject: [PATCH 3/3] ignore provided dependencies --- .../bigtable-hbase-2.x-mapreduce/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-mapreduce/pom.xml b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-mapreduce/pom.xml index 756c0d6693..264497b28c 100644 --- a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-mapreduce/pom.xml +++ b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-mapreduce/pom.xml @@ -188,6 +188,11 @@ limitations under the License. org.apache.hbase:hbase-mapreduce + + javax.activation:activation + javax.xml.bind:jaxb-api + javax.xml.stream:stax-api +