Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ limitations under the License.
<targetDependencies>
<targetDependency>org.apache.hbase:hbase-mapreduce</targetDependency>
</targetDependencies>
<ignoredDependencies>
<ignoredDependency>javax.activation:activation</ignoredDependency>
<ignoredDependency>javax.xml.bind:jaxb-api</ignoredDependency>
<ignoredDependency>javax.xml.stream:stax-api</ignoredDependency>
</ignoredDependencies>
</configuration>
</execution>
</executions>
Expand Down
4 changes: 4 additions & 0 deletions bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ limitations under the License.
<pattern>io.opentelemetry</pattern>
<shadedPattern>com.google.bigtable.repackaged.io.opentelemetry</shadedPattern>
</relocation>
<relocation>
<pattern>javax.activation</pattern>
<shadedPattern>com.google.bigtable.repackaged.javax.activation</shadedPattern>
</relocation>
<relocation>
<pattern>META-INF/versions/9/io/opentelemetry</pattern>
<shadedPattern>META-INF/versions/9/com/google/cloud/bigtable/repackaged/io/opentelemetry</shadedPattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,11 @@ public int getStoreFileCount() {
public Size getStoreFileSize() {
return new Size(size, Unit.BYTE);
}

@Override
public Size getMemStoreSize() {
return new Size(size, Unit.BYTE);
}
}
/** Handler for unsupported operations for generating Admin class at runtime. */
public static class UnsupportedOperationsHandler implements InvocationHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ limitations under the License.
com.google.bigtable.repackaged.javax.annotation
</shadedPattern>
</relocation>
<relocation>
<pattern>javax.activation</pattern>
<shadedPattern>
com.google.bigtable.repackaged.javax.activation
</shadedPattern>
</relocation>
<relocation>
<pattern>org.checkerframework</pattern>
<shadedPattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;

public class MirroringConnection
Expand All @@ -41,4 +42,22 @@ public MirroringConnection(MirroringConfiguration mirroringConfiguration, Execut
throws IOException {
super(mirroringConfiguration, pool);
}

@Override
protected Table getMirroringTable(Table primaryTable, Table secondaryTable) {
return new MirroringTable(
primaryTable,
secondaryTable,
executorService,
this.mismatchDetector,
this.flowController,
this.secondaryWriteErrorConsumer,
this.readSampler,
this.timestamper,
this.performWritesConcurrently,
this.waitForSecondaryWrites,
this.mirroringTracer,
this.referenceCounter,
this.configuration.mirroringOptions.maxLoggedBinaryValueLength);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.mirroring.hbase1_x;

import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler;
import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumer;
import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController;
import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter;
import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper;
import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector;
import io.opencensus.common.Scope;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;

@InternalApi("For internal usage only")
public class MirroringTable extends com.google.cloud.bigtable.mirroring.core.MirroringTable
implements Table {
/**
* @param executorService ExecutorService is used to perform operations on secondaryTable and
* verification tasks.
* @param mismatchDetector Detects mismatches in results from operations preformed on both
* databases.
* @param secondaryWriteErrorConsumer Consumer secondary write errors.
*/
public MirroringTable(
Table primaryTable,
Table secondaryTable,
ExecutorService executorService,
MismatchDetector mismatchDetector,
FlowController flowController,
SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,
ReadSampler readSampler,
Timestamper timestamper,
boolean performWritesConcurrently,
boolean waitForSecondaryWrites,
MirroringTracer mirroringTracer,
ReferenceCounter parentReferenceCounter,
int resultScannerBufferedMismatchedResults) {
super(
primaryTable,
secondaryTable,
executorService,
mismatchDetector,
flowController,
secondaryWriteErrorConsumer,
readSampler,
timestamper,
performWritesConcurrently,
waitForSecondaryWrites,
mirroringTracer,
parentReferenceCounter,
resultScannerBufferedMismatchedResults);
}

@Override
public void mutateRow(final RowMutations rowMutations) throws IOException {
try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.MUTATE_ROW)) {
Log.trace("[%s] mutateRow(rowMutations=%s)", this.getName(), rowMutations);
this.batcher.batchSingleWriteOperation(rowMutations);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Google LLC
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,28 +20,43 @@
import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONFIG_PREFIX_KEY;
import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONNECTION_CLASS_KEY;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.google.cloud.bigtable.mirroring.core.TestConnection;
import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringConnection;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class TestMirroringConnection {
private Connection connection;

@Before
public void setUp() throws IOException {
TestConnection.reset();
Configuration configuration = createConfiguration();
connection = ConnectionFactory.createConnection(configuration);
assertThat(TestConnection.connectionMocks.size()).isEqualTo(2);
}

private Configuration createConfiguration() {
Configuration configuration = new Configuration();
configuration.set("hbase.client.connection.impl", MirroringConnection.class.getCanonicalName());
configuration.set(
MIRRORING_PRIMARY_CONNECTION_CLASS_KEY, TestConnection.class.getCanonicalName());
configuration.set(
MIRRORING_SECONDARY_CONNECTION_CLASS_KEY, TestConnection.class.getCanonicalName());
configuration.set(MIRRORING_PRIMARY_CONFIG_PREFIX_KEY, "1");
configuration.set(MIRRORING_SECONDARY_CONFIG_PREFIX_KEY, "2");
// Prefix keys have to be set because we are using the same class as primary and secondary
// connection class.
configuration.set(MIRRORING_PRIMARY_CONFIG_PREFIX_KEY, "primary-connection");
configuration.set(MIRRORING_SECONDARY_CONFIG_PREFIX_KEY, "secondary-connection");
configuration.set(
"google.bigtable.mirroring.write-error-log.appender.prefix-path", "/tmp/test-");
configuration.set("google.bigtable.mirroring.write-error-log.appender.max-buffer-size", "1024");
Expand All @@ -52,12 +67,29 @@ private Configuration createConfiguration() {

@Test
public void testConnectionFactoryCreatesMirroringConnection() throws IOException {
Configuration configuration = createConfiguration();
Connection connection = ConnectionFactory.createConnection(configuration);
assertThat(connection).isInstanceOf(MirroringConnection.class);
assertThat(((MirroringConnection) connection).getPrimaryConnection())
.isInstanceOf(TestConnection.class);
assertThat(((MirroringConnection) connection).getSecondaryConnection())
.isInstanceOf(TestConnection.class);
}

@Test
public void testCloseClosesUnderlyingConnections() throws IOException {
connection.close();
assertThat(connection.isClosed()).isTrue();
verify(TestConnection.connectionMocks.get(0), times(1)).close();
verify(TestConnection.connectionMocks.get(1), times(1)).close();
}

@Test
public void testAbortAbortsUnderlyingConnections() throws IOException {
String expectedString = "expected";
Throwable expectedThrowable = new Exception();
connection.abort(expectedString, expectedThrowable);
verify(TestConnection.connectionMocks.get(0), times(1))
.abort(expectedString, expectedThrowable);
verify(TestConnection.connectionMocks.get(1), times(1))
.abort(expectedString, expectedThrowable);
}
}
Loading