diff --git a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml index 661c34de58..a5be6d570b 100644 --- a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml +++ b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml @@ -291,6 +291,10 @@ limitations under the License. io.opentelemetry com.google.bigtable.repackaged.io.opentelemetry + + javax.activation + com.google.bigtable.repackaged.javax.activation + META-INF/versions/9/io/opentelemetry META-INF/versions/9/com/google/cloud/bigtable/repackaged/io/opentelemetry diff --git a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java index 0a7b948026..43b7e124ac 100644 --- a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java +++ b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java @@ -695,6 +695,9 @@ public int getStoreFileCount() { public Size getStoreFileSize() { return new Size(size, Unit.BYTE); } + + @Override + public Size getMemStoreSize() { return new Size(size, Unit.BYTE); } } /** Handler for unsupported operations for generating Admin class at runtime. */ public static class UnsupportedOperationsHandler implements InvocationHandler { diff --git a/bigtable-test/bigtable-build-helper/src/it/verify-shaded-jar-entries-ok/pom.xml b/bigtable-test/bigtable-build-helper/src/it/verify-shaded-jar-entries-ok/pom.xml index 0c29be6832..3de383b0ba 100644 --- a/bigtable-test/bigtable-build-helper/src/it/verify-shaded-jar-entries-ok/pom.xml +++ b/bigtable-test/bigtable-build-helper/src/it/verify-shaded-jar-entries-ok/pom.xml @@ -57,6 +57,12 @@ limitations under the License. com.google.bigtable.repackaged.javax.annotation + + javax.activation + + com.google.bigtable.repackaged.javax.activation + + org.checkerframework diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java index a91652823a..15549e6989 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java @@ -16,10 +16,18 @@ package com.google.cloud.bigtable.mirroring.hbase1_x; import com.google.cloud.bigtable.mirroring.core.MirroringConfiguration; +import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler; +import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumer; +import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController; +import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper; +import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector; import java.io.IOException; import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.security.User; public class MirroringConnection @@ -41,4 +49,27 @@ public MirroringConnection(MirroringConfiguration mirroringConfiguration, Execut throws IOException { super(mirroringConfiguration, pool); } + + @Override + protected Table getMirroringTable(Table primaryTable, Table secondaryTable, + ExecutorService executorService, MismatchDetector mismatchDetector, + FlowController flowController, SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, + ReadSampler readSampler, Timestamper timestamper, boolean performWritesConcurrently, + boolean waitForSecondaryWrites, MirroringTracer mirroringTracer, + ReferenceCounter parentReferenceCounter, int resultScannerBufferedMismatchedResults) { + return new MirroringTable( + primaryTable, + secondaryTable, + executorService, + this.mismatchDetector, + this.flowController, + this.secondaryWriteErrorConsumer, + this.readSampler, + this.timestamper, + this.performWritesConcurrently, + this.waitForSecondaryWrites, + this.mirroringTracer, + this.referenceCounter, + this.configuration.mirroringOptions.maxLoggedBinaryValueLength); + } } diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java similarity index 92% rename from hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java rename to hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java index 31a7abc88f..83b14e040a 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java @@ -13,12 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.google.cloud.bigtable.mirroring.core; +package com.google.cloud.bigtable.mirroring.hbase1_x; import static com.google.cloud.bigtable.mirroring.core.utils.OperationUtils.emptyResult; import static com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounterUtils.holdReferenceUntilCompletion; import com.google.api.core.InternalApi; +import com.google.cloud.bigtable.mirroring.core.MirroringResultScanner; +import com.google.cloud.bigtable.mirroring.core.RequestScheduler; +import com.google.cloud.bigtable.mirroring.core.WriteOperationFutureCallback; import com.google.cloud.bigtable.mirroring.core.asyncwrappers.AsyncTableWrapper; import com.google.cloud.bigtable.mirroring.core.utils.AccumulatedExceptions; import com.google.cloud.bigtable.mirroring.core.utils.BatchHelpers.FailedSuccessfulSplit; @@ -781,65 +784,4 @@ public int getWriteRpcTimeout() { public void setWriteRpcTimeout(int i) { throw new UnsupportedOperationException(); } - - /** - * Helper class that holds common parameters to {@link - * RequestScheduling#scheduleRequestWithCallback(RequestResourcesDescription, Supplier, - * FutureCallback, FlowController, MirroringTracer, Function)} for single instance of {@link - * com.google.cloud.bigtable.mirroring.core.MirroringTable}. - * - *

It also takes care of reference counting all scheduled operations. - */ - public static class RequestScheduler { - final FlowController flowController; - final MirroringTracer mirroringTracer; - final ReferenceCounter referenceCounter; - - public RequestScheduler( - FlowController flowController, - MirroringTracer mirroringTracer, - ReferenceCounter referenceCounter) { - this.flowController = flowController; - this.mirroringTracer = mirroringTracer; - this.referenceCounter = referenceCounter; - } - - public RequestScheduler withReferenceCounter(ReferenceCounter referenceCounter) { - return new RequestScheduler(this.flowController, this.mirroringTracer, referenceCounter); - } - - public ListenableFuture scheduleRequestWithCallback( - final RequestResourcesDescription requestResourcesDescription, - final Supplier> secondaryResultFutureSupplier, - final FutureCallback verificationCallback) { - return this.scheduleRequestWithCallback( - requestResourcesDescription, - secondaryResultFutureSupplier, - verificationCallback, - // noop flowControlReservationErrorConsumer - new Function() { - @Override - public Void apply(Throwable t) { - return null; - } - }); - } - - public ListenableFuture scheduleRequestWithCallback( - final RequestResourcesDescription requestResourcesDescription, - final Supplier> secondaryResultFutureSupplier, - final FutureCallback verificationCallback, - final Function flowControlReservationErrorConsumer) { - ListenableFuture future = - RequestScheduling.scheduleRequestWithCallback( - requestResourcesDescription, - secondaryResultFutureSupplier, - verificationCallback, - this.flowController, - this.mirroringTracer, - flowControlReservationErrorConsumer); - holdReferenceUntilCompletion(this.referenceCounter, future); - return future; - } - } } diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java index 75ffe6fe33..6a0eb84f75 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java @@ -80,7 +80,7 @@ public FailingHBaseHRegion2( } @Override - public HRegion.RegionScannerImpl getScanner(Scan scan, List additionalScanners) + public RegionScannerImpl getScanner(Scan scan, List additionalScanners) throws IOException { // HBase 2.x implements Gets as Scans with start row == end row == requested row. processRowThrow(scan.getStartRow()); @@ -88,9 +88,9 @@ public HRegion.RegionScannerImpl getScanner(Scan scan, List add } @Override - public void mutateRow(RowMutations rm) throws IOException { + public Result mutateRow(RowMutations rm) throws IOException { processRowThrow(rm.getRow()); - super.mutateRow(rm); + return super.mutateRow(rm); } @Override @@ -100,12 +100,6 @@ public OperationStatus[] batchMutate( mutations, (m) -> super.batchMutate(m, atomic, nonceGroup, nonce)); } - @Override - public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) - throws IOException { - return batchMutateWithFailures(mutations, (m) -> super.batchMutate(m, nonceGroup, nonce)); - } - @Override public Result get(Get get) throws IOException { processRowThrow(get.getRow()); diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-hadoop/pom.xml b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-hadoop/pom.xml index a9d66c0b2b..9066e808f3 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-hadoop/pom.xml +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-hadoop/pom.xml @@ -42,6 +42,10 @@ limitations under the License. log4j log4j + + ch.qos.reload4j + reload4j + @@ -202,6 +206,7 @@ limitations under the License. commons-logging:commons-logging log4j:log4j + ch.qos.reload4j:reload4j diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-shaded/pom.xml b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-shaded/pom.xml index 9293d1123e..3cdc16de8c 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-shaded/pom.xml +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-shaded/pom.xml @@ -126,6 +126,18 @@ limitations under the License. io.opencensus ${shading-prefix}.io.opencensus + + io.opentelemetry + com.google.cloud.bigtable.mirroring.repackaged.io.opentelemetry + + + META-INF/versions/9/io/opentelemetry + META-INF/versions/9/com/google/cloud/bigtable/mirroring/repackaged/io/opentelemetry + + + javax.activation + com.google.cloud.bigtable.mirroring.repackaged.javax.activation + io.grpc @@ -198,6 +210,13 @@ limitations under the License. org.apache.hbase:hbase-shaded-client + + + io.opentelemetry:opentelemetry-api + io.opentelemetry:opentelemetry-context + io.opentelemetry:opentelemetry-semconv + diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java index e0e4a7e1ba..6f8b7840de 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java @@ -370,6 +370,16 @@ public AsyncTableBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit) { return this; } + @Override + public AsyncTableBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit) { + setTimeParameter( + pause, + unit, + this.primaryTableBuilder::setRetryPauseForServerOverloaded, + this.secondaryTableBuilder::setRetryPauseForServerOverloaded); + return this; + } + @Override public AsyncTableBuilder setMaxAttempts(int maxAttempts) { setIntegerParameter( diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java index d92fe51911..cd2ee3755d 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java @@ -20,7 +20,7 @@ import static com.google.cloud.bigtable.mirroring.hbase2_x.utils.AsyncRequestScheduling.reserveFlowControlResourcesThenScheduleSecondary; import com.google.cloud.bigtable.mirroring.core.MirroringResultScanner; -import com.google.cloud.bigtable.mirroring.core.MirroringTable.RequestScheduler; +import com.google.cloud.bigtable.mirroring.core.RequestScheduler; import com.google.cloud.bigtable.mirroring.core.WriteOperationFutureCallback; import com.google.cloud.bigtable.mirroring.core.asyncwrappers.AsyncResultScannerWrapper; import com.google.cloud.bigtable.mirroring.core.utils.BatchHelpers; @@ -63,6 +63,8 @@ import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.AsyncTableRegionLocator; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -202,9 +204,9 @@ public CompletableFuture increment(Increment increment) { } @Override - public CompletableFuture mutateRow(RowMutations rowMutations) { + public CompletableFuture mutateRow(RowMutations rowMutations) { this.timestamper.fillTimestamp(rowMutations); - CompletableFuture primaryFuture = this.primaryTable.mutateRow(rowMutations); + CompletableFuture primaryFuture = this.primaryTable.mutateRow(rowMutations); return writeWithFlowControl( new WriteOperationInfo(rowMutations), primaryFuture, @@ -534,6 +536,16 @@ public CheckAndMutateWithFilterBuilder checkAndMutate( throw new UnsupportedOperationException("not implemented"); } + @Override + public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public List> checkAndMutate(List list) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public void scan(Scan scan, C consumer) { this.primaryTable.scan(scan, consumer); @@ -561,7 +573,7 @@ public MirroringCheckAndMutateBuilder(CheckAndMutateBuilder primaryBuilder) { private OperationStages> checkAndMutate( WriteOperationInfo writeOperationInfo, CompletableFuture primary, - Supplier> secondary) { + Supplier> secondary) { OperationStages> returnedValue = new OperationStages<>(new CompletableFuture<>()); primary diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java index ffe21d4871..76d988c361 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java @@ -15,6 +15,13 @@ */ package com.google.cloud.bigtable.mirroring.hbase2_x; +import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler; +import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumer; +import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController; +import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper; +import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector; import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; @@ -30,6 +37,29 @@ public MirroringConnection(Configuration conf, boolean managed, ExecutorService super(conf, managed, pool, user); } + @Override + protected Table getMirroringTable(Table primaryTable, Table secondaryTable, + ExecutorService executorService, MismatchDetector mismatchDetector, + FlowController flowController, SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, + ReadSampler readSampler, Timestamper timestamper, boolean performWritesConcurrently, + boolean waitForSecondaryWrites, MirroringTracer mirroringTracer, + ReferenceCounter parentReferenceCounter, int resultScannerBufferedMismatchedResults) { + return new MirroringTable( + primaryTable, + secondaryTable, + executorService, + this.mismatchDetector, + this.flowController, + this.secondaryWriteErrorConsumer, + this.readSampler, + this.timestamper, + this.performWritesConcurrently, + this.waitForSecondaryWrites, + this.mirroringTracer, + this.referenceCounter, + this.configuration.mirroringOptions.maxLoggedBinaryValueLength); + } + public MirroringConnection(Configuration conf, ExecutorService pool, User user) throws Throwable { this(conf, false, pool, user); } diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java index aad985884a..ce2cdf60ad 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Google LLC + * Copyright 2022 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,26 +15,117 @@ */ package com.google.cloud.bigtable.mirroring.hbase2_x; +import static com.google.cloud.bigtable.mirroring.core.utils.OperationUtils.emptyResult; +import static com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounterUtils.holdReferenceUntilCompletion; + +import com.google.api.core.InternalApi; +import com.google.cloud.bigtable.mirroring.core.MirroringResultScanner; +import com.google.cloud.bigtable.mirroring.core.RequestScheduler; +import com.google.cloud.bigtable.mirroring.core.WriteOperationFutureCallback; +import com.google.cloud.bigtable.mirroring.core.asyncwrappers.AsyncTableWrapper; +import com.google.cloud.bigtable.mirroring.core.utils.AccumulatedExceptions; +import com.google.cloud.bigtable.mirroring.core.utils.BatchHelpers.FailedSuccessfulSplit; +import com.google.cloud.bigtable.mirroring.core.utils.Batcher; +import com.google.cloud.bigtable.mirroring.core.utils.CallableThrowingIOException; +import com.google.cloud.bigtable.mirroring.core.utils.Logger; +import com.google.cloud.bigtable.mirroring.core.utils.OperationUtils; import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler; +import com.google.cloud.bigtable.mirroring.core.utils.RequestScheduling; import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumer; import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController; +import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.RequestResourcesDescription; +import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.WriteOperationInfo; +import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.HierarchicalReferenceCounter; import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter; import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector; +import com.google.cloud.bigtable.mirroring.core.verification.VerificationContinuationFactory; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Supplier; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import io.opencensus.common.Scope; import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Service; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Table which mirrors every two mutations to two underlying tables. + * + *

Objects of this class present themselves as HBase 1.x `Table` objects. Every operation is + * first performed on primary table and if it succeeded it is replayed on the secondary table + * asynchronously. Read operations are mirrored to verify that content of both databases matches. + */ +@InternalApi("For internal usage only") +public class MirroringTable implements Table { + + private static final Logger Log = new Logger(MirroringTable.class); + private static final Predicate resultIsFaultyPredicate = + new Predicate() { + @Override + public boolean apply(Object o) { + return o == null || o instanceof Throwable; + } + }; + protected final Table primaryTable; + private final AsyncTableWrapper secondaryAsyncWrapper; + private final VerificationContinuationFactory verificationContinuationFactory; + /** Counter for MirroringConnection and MirroringTable. */ + private final HierarchicalReferenceCounter referenceCounter; -public class MirroringTable extends com.google.cloud.bigtable.mirroring.core.MirroringTable - implements Table { + private final SecondaryWriteErrorConsumer secondaryWriteErrorConsumer; + private final MirroringTracer mirroringTracer; + private final ReadSampler readSampler; + private final RequestScheduler requestScheduler; + private final Batcher batcher; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final SettableFuture closedFuture = SettableFuture.create(); + private final int resultScannerBufferedMismatchedResults; + private final Timestamper timestamper; + /** + * @param executorService ExecutorService is used to perform operations on secondaryTable and + * verification tasks. + * @param mismatchDetector Detects mismatches in results from operations preformed on both + * databases. + * @param secondaryWriteErrorConsumer Consumer secondary write errors. + */ public MirroringTable( Table primaryTable, Table secondaryTable, @@ -47,46 +138,650 @@ public MirroringTable( boolean performWritesConcurrently, boolean waitForSecondaryWrites, MirroringTracer mirroringTracer, - ReferenceCounter referenceCounter, + ReferenceCounter parentReferenceCounter, int resultScannerBufferedMismatchedResults) { - super( - primaryTable, - secondaryTable, - executorService, - mismatchDetector, - flowController, - secondaryWriteErrorConsumer, - readSampler, - timestamper, - performWritesConcurrently, - waitForSecondaryWrites, - mirroringTracer, - referenceCounter, - resultScannerBufferedMismatchedResults); + this.primaryTable = primaryTable; + this.verificationContinuationFactory = new VerificationContinuationFactory(mismatchDetector); + this.readSampler = readSampler; + this.secondaryAsyncWrapper = + new AsyncTableWrapper( + secondaryTable, MoreExecutors.listeningDecorator(executorService), mirroringTracer); + this.referenceCounter = new HierarchicalReferenceCounter(parentReferenceCounter); + this.secondaryWriteErrorConsumer = secondaryWriteErrorConsumer; + Preconditions.checkArgument( + !(performWritesConcurrently && !waitForSecondaryWrites), + "If concurrent writes are enabled, then waiting for secondary writes should also be enabled."); + this.mirroringTracer = mirroringTracer; + this.requestScheduler = + new RequestScheduler(flowController, this.mirroringTracer, this.referenceCounter); + this.timestamper = timestamper; + this.batcher = + new Batcher( + this.primaryTable, + this.secondaryAsyncWrapper, + this.requestScheduler, + this.secondaryWriteErrorConsumer, + this.verificationContinuationFactory, + this.readSampler, + this.timestamper, + resultIsFaultyPredicate, + waitForSecondaryWrites, + performWritesConcurrently, + this.mirroringTracer); + this.resultScannerBufferedMismatchedResults = resultScannerBufferedMismatchedResults; } @Override - public TableDescriptor getDescriptor() throws IOException { - return primaryTable.getDescriptor(); + public boolean exists(final Get get) throws IOException { + try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.EXISTS)) { + Log.trace("[%s] exists(get=%s)", this.getName(), get); + + boolean result = + this.mirroringTracer.spanFactory.wrapPrimaryOperation( + new CallableThrowingIOException() { + @Override + public Boolean call() throws IOException { + return primaryTable.exists(get); + } + }, + HBaseOperation.EXISTS); + + scheduleSequentialReadOperationWithVerification( + new RequestResourcesDescription(result), + this.secondaryAsyncWrapper.exists(get), + this.verificationContinuationFactory.exists(get, result)); + return result; + } + } + + @Override + public boolean[] existsAll(final List inputList) throws IOException { + final List list = new ArrayList<>(inputList); + try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.EXISTS_ALL)) { + Log.trace("[%s] existsAll(gets=%s)", this.getName(), list); + + boolean[] result = + this.mirroringTracer.spanFactory.wrapPrimaryOperation( + new CallableThrowingIOException() { + @Override + public boolean[] call() throws IOException { + return primaryTable.existsAll(list); + } + }, + HBaseOperation.EXISTS_ALL); + + scheduleSequentialReadOperationWithVerification( + new RequestResourcesDescription(result), + this.secondaryAsyncWrapper.existsAll(list), + this.verificationContinuationFactory.existsAll(list, result)); + return result; + } + } + + @Override + public Result get(final Get get) throws IOException { + try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.GET)) { + Log.trace("[%s] get(get=%s)", this.getName(), get); + + Result result = + this.mirroringTracer.spanFactory.wrapPrimaryOperation( + new CallableThrowingIOException() { + @Override + public Result call() throws IOException { + return primaryTable.get(get); + } + }, + HBaseOperation.GET); + + scheduleSequentialReadOperationWithVerification( + new RequestResourcesDescription(result), + this.secondaryAsyncWrapper.get(get), + this.verificationContinuationFactory.get(get, result)); + return result; + } + } + + @Override + public Result[] get(final List inputList) throws IOException { + final List list = new ArrayList<>(inputList); + try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.GET_LIST)) { + Log.trace("[%s] get(gets=%s)", this.getName(), list); + + Result[] result = + this.mirroringTracer.spanFactory.wrapPrimaryOperation( + new CallableThrowingIOException() { + @Override + public Result[] call() throws IOException { + return primaryTable.get(list); + } + }, + HBaseOperation.GET_LIST); + + scheduleSequentialReadOperationWithVerification( + new RequestResourcesDescription(result), + this.secondaryAsyncWrapper.get(list), + this.verificationContinuationFactory.get(list, result)); + return result; + } + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + try (Scope scope = + this.mirroringTracer.spanFactory.operationScope(HBaseOperation.GET_SCANNER)) { + Log.trace("[%s] getScanner(scan=%s)", this.getName(), scan); + MirroringResultScanner scanner = + new MirroringResultScanner( + scan, + this.primaryTable.getScanner(scan), + this.secondaryAsyncWrapper.getScanner(scan), + this.verificationContinuationFactory, + this.mirroringTracer, + this.readSampler.shouldNextReadOperationBeSampled(), + this.requestScheduler, + this.referenceCounter, + this.resultScannerBufferedMismatchedResults); + return scanner; + } + } + + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + return getScanner(new Scan().addFamily(family)); + } + + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { + return getScanner(new Scan().addColumn(family, qualifier)); + } + + @Override + public void put(final Put put) throws IOException { + try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.PUT)) { + Log.trace("[%s] put(put=%s)", this.getName(), put); + this.batcher.batchSingleWriteOperation(put); + } + } + + @Override + public void put(List puts) throws IOException { + try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.PUT_LIST)) { + Log.trace("[%s] put(puts=%s)", this.getName(), puts); + try { + Object[] results = new Object[puts.size()]; + this.batcher.batch(puts, results); + } catch (InterruptedException e) { + IOException e2 = new InterruptedIOException(); + e2.initCause(e); + throw e2; + } + } + } + + @Override + public void delete(final Delete delete) throws IOException { + try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.DELETE)) { + Log.trace("[%s] delete(delete=%s)", this.getName(), delete); + this.batcher.batchSingleWriteOperation(delete); + } + } + + @Override + public void delete(List deletes) throws IOException { + try (Scope scope = + this.mirroringTracer.spanFactory.operationScope(HBaseOperation.DELETE_LIST)) { + Log.trace("[%s] delete(deletes=%s)", this.getName(), deletes); + Object[] results = new Object[deletes.size()]; + try { + this.batcher.batch(deletes, results); + } catch (InterruptedException e) { + IOException e2 = new InterruptedIOException(); + e2.initCause(e); + throw e2; + } finally { + final FailedSuccessfulSplit failedSuccessfulSplit = + new FailedSuccessfulSplit<>(deletes, results, resultIsFaultyPredicate, Object.class); + + // Delete should remove successful operations from input list. + // To conform to this requirement we are clearing the list and re-adding failed deletes. + deletes.clear(); + deletes.addAll(failedSuccessfulSplit.failedOperations); + } + } + } + + @Override + public Result mutateRow(final RowMutations rowMutations) throws IOException { + try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.MUTATE_ROW)) { + Log.trace("[%s] mutateRow(rowMutations=%s)", this.getName(), rowMutations); + + Result result = + this.mirroringTracer.spanFactory.wrapPrimaryOperation( + new CallableThrowingIOException() { + @Override + public Result call() throws IOException { + return primaryTable.mutateRow(rowMutations); + } + }, + HBaseOperation.MUTATE_ROW); + + Put put = OperationUtils.makePutFromResult(result); + + scheduleSequentialWriteOperation( + new WriteOperationInfo(put), this.secondaryAsyncWrapper.put(put)); + + return result; + } + } + + @Override + public Result append(final Append append) throws IOException { + try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.APPEND)) { + Log.trace("[%s] append(append=%s)", this.getName(), append); + boolean wantsResults = append.isReturnResults(); + append.setReturnResults(true); + + Result result = + this.mirroringTracer.spanFactory.wrapPrimaryOperation( + new CallableThrowingIOException() { + @Override + public Result call() throws IOException { + return primaryTable.append(append); + } + }, + HBaseOperation.APPEND); + + Put put = OperationUtils.makePutFromResult(result); + + scheduleSequentialWriteOperation( + new WriteOperationInfo(put), this.secondaryAsyncWrapper.put(put)); + + // HBase's append() returns null when isReturnResults is false. + return wantsResults ? result : null; + } + } + + @Override + public Result increment(final Increment increment) throws IOException { + try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.INCREMENT)) { + Log.trace("[%s] increment(increment=%s)", this.getName(), increment); + boolean wantsResults = increment.isReturnResults(); + increment.setReturnResults(true); + + Result result = + this.mirroringTracer.spanFactory.wrapPrimaryOperation( + new CallableThrowingIOException() { + @Override + public Result call() throws IOException { + return primaryTable.increment(increment); + } + }, + HBaseOperation.INCREMENT); + + Put put = OperationUtils.makePutFromResult(result); + + scheduleSequentialWriteOperation( + new WriteOperationInfo(put), this.secondaryAsyncWrapper.put(put)); + return wantsResults ? result : emptyResult(); + } + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) + throws IOException { + Log.trace( + "[%s] incrementColumnValue(row=%s, family=%s, qualifier=%s, amount=%s)", + this.getName(), row, family, qualifier, amount); + Result result = increment((new Increment(row)).addColumn(family, qualifier, amount)); + Cell cell = result.getColumnLatestCell(family, qualifier); + Preconditions.checkNotNull(cell); + return Bytes.toLong(CellUtil.cloneValue(cell)); + } + + @Override + public long incrementColumnValue( + byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) + throws IOException { + Log.trace( + "[%s] incrementColumnValue(row=%s, family=%s, qualifier=%s, amount=%s, durability=%s)", + this.getName(), row, family, qualifier, amount, durability); + Result result = + increment( + (new Increment(row)).addColumn(family, qualifier, amount).setDurability(durability)); + Cell cell = result.getColumnLatestCell(family, qualifier); + Preconditions.checkNotNull(cell); + return Bytes.toLong(CellUtil.cloneValue(cell)); + } + + @Override + public void batch(List operations, Object[] results) + throws IOException, InterruptedException { + try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.BATCH)) { + this.batcher.batch(operations, results); + } + } + + @Override + public void batchCallback( + List inputOperations, Object[] results, final Callback callback) + throws IOException, InterruptedException { + final List operations = new ArrayList<>(inputOperations); + try (Scope scope = + this.mirroringTracer.spanFactory.operationScope(HBaseOperation.BATCH_CALLBACK)) { + Log.trace( + "[%s] batchCallback(operations=%s, results, callback=%s)", + this.getName(), operations, callback); + + this.batcher.batch(operations, results, callback); + } } @Override - public RegionLocator getRegionLocator() { - throw new UnsupportedOperationException("not implemented"); + public boolean checkAndMutate( + byte[] row, + byte[] family, + byte[] qualifier, + CompareOp compareOp, + byte[] value, + RowMutations rowMutations) + throws IOException { + try (Scope scope = + this.mirroringTracer.spanFactory.operationScope(HBaseOperation.CHECK_AND_MUTATE)) { + Log.trace( + "[%s] checkAndMutate(row=%s, family=%s, qualifier=%s, compareOp=%s, value=%s, rowMutations=%s)", + this.getName(), row, family, qualifier, compareOp, value, rowMutations); + + return checkAndMutateWithSpan(row, family, qualifier, compareOp, value, rowMutations); + } + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) + throws IOException { + Log.trace( + "[%s] checkAndPut(row=%s, family=%s, qualifier=%s, value=%s, put=%s)", + this.getName(), row, family, qualifier, value, put); + return this.checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put); + } + + @Override + public boolean checkAndPut( + byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, Put put) + throws IOException { + try (Scope scope = + this.mirroringTracer.spanFactory.operationScope(HBaseOperation.CHECK_AND_PUT)) { + Log.trace( + "[%s] checkAndPut(row=%s, family=%s, qualifier=%s, compareOp=%s, value=%s, put=%s)", + this.getName(), row, family, qualifier, compareOp, value, put); + RowMutations mutations = new RowMutations(row); + mutations.add(put); + return this.checkAndMutateWithSpan(row, family, qualifier, compareOp, value, mutations); + } + } + + @Override + public boolean checkAndDelete( + byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException { + Log.trace( + "[%s] checkAndDelete(row=%s, family=%s, qualifier=%s, value=%s, delete=%s)", + this.getName(), row, family, qualifier, value, delete); + return this.checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete); } @Override - public boolean[] exists(List gets) throws IOException { - return existsAll(gets); + public boolean checkAndDelete( + byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, Delete delete) + throws IOException { + try (Scope scope = + this.mirroringTracer.spanFactory.operationScope(HBaseOperation.CHECK_AND_DELETE)) { + Log.trace( + "[%s] checkAndDelete(row=%s, family=%s, qualifier=%s, compareOp=%s, value=%s, delete=%s)", + this.getName(), row, family, qualifier, compareOp, value, delete); + RowMutations mutations = new RowMutations(row); + mutations.add(delete); + return this.checkAndMutateWithSpan(row, family, qualifier, compareOp, value, mutations); + } + } + + private boolean checkAndMutateWithSpan( + final byte[] row, + final byte[] family, + final byte[] qualifier, + final CompareOp compareOp, + final byte[] value, + final RowMutations rowMutations) + throws IOException { + this.timestamper.fillTimestamp(rowMutations); + boolean wereMutationsApplied = + this.mirroringTracer.spanFactory.wrapPrimaryOperation( + new CallableThrowingIOException() { + @Override + public Boolean call() throws IOException { + return primaryTable.checkAndMutate( + row, family, qualifier, compareOp, value, rowMutations); + } + }, + HBaseOperation.CHECK_AND_MUTATE); + + if (wereMutationsApplied) { + scheduleSequentialWriteOperation( + new WriteOperationInfo(rowMutations), this.secondaryAsyncWrapper.mutateRow(rowMutations)); + } + return wereMutationsApplied; } /** - * HBase 1.x's {@link Table#append} returns {@code null} when {@link Append#isReturnResults} is - * {@code false} + * Synchronously {@link Table#close()}s primary table and schedules closing of the secondary table + * after finishing all secondary requests that are yet in-flight ({@link + * AsyncTableWrapper#close()}). */ @Override - public Result append(Append append) throws IOException { - Result result = super.append(append); - return result == null ? Result.create(new Cell[0]) : result; + public void close() throws IOException { + this.closePrimaryAndScheduleSecondaryClose(); + } + + private void closePrimaryAndScheduleSecondaryClose() throws IOException { + try (Scope scope = + this.mirroringTracer.spanFactory.operationScope(HBaseOperation.TABLE_CLOSE)) { + if (this.closed.getAndSet(true)) { + return; + } + + // We are freeing the initial reference to current level reference counter. + this.referenceCounter.current.decrementReferenceCount(); + // But we are scheduling asynchronous secondary operation and we should increment our parent's + // ref counter until this operation is finished. + holdReferenceUntilCompletion(this.referenceCounter.parent, this.closedFuture); + + AccumulatedExceptions exceptionsList = new AccumulatedExceptions(); + try { + this.mirroringTracer.spanFactory.wrapPrimaryOperation( + new CallableThrowingIOException() { + @Override + public Void call() throws IOException { + primaryTable.close(); + return null; + } + }, + HBaseOperation.TABLE_CLOSE); + } catch (IOException e) { + exceptionsList.add(e); + } + + try { + // Close secondary wrapper (what will close secondary table) after all scheduled requests + // have finished. + this.referenceCounter + .current + .getOnLastReferenceClosed() + .addListener( + new Runnable() { + @Override + public void run() { + try { + secondaryAsyncWrapper.close(); + closedFuture.set(null); + } catch (IOException e) { + closedFuture.setException(e); + } + } + }, + MoreExecutors.directExecutor()); + } catch (RuntimeException e) { + exceptionsList.add(e); + } + + exceptionsList.rethrowIfCaptured(); + } finally { + this.mirroringTracer.spanFactory.asyncCloseSpanWhenCompleted( + this.referenceCounter.current.getOnLastReferenceClosed()); + } + } + + private void scheduleSequentialReadOperationWithVerification( + final RequestResourcesDescription resourcesDescription, + final Supplier> secondaryOperationSupplier, + final FutureCallback verificationCallback) { + if (!this.readSampler.shouldNextReadOperationBeSampled()) { + return; + } + this.requestScheduler.scheduleRequestWithCallback( + resourcesDescription, + secondaryOperationSupplier, + this.mirroringTracer.spanFactory.wrapReadVerificationCallback(verificationCallback)); + } + + private void scheduleSequentialWriteOperation( + final WriteOperationInfo writeOperationInfo, + final Supplier> secondaryOperationSupplier) { + WriteOperationFutureCallback writeErrorCallback = + new WriteOperationFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + secondaryWriteErrorConsumer.consume( + writeOperationInfo.hBaseOperation, writeOperationInfo.operations, throwable); + } + }; + + // If flow controller errs and won't allow the request we will handle the error using this + // handler. + Function flowControlReservationErrorConsumer = + new Function() { + @Override + public Void apply(Throwable throwable) { + secondaryWriteErrorConsumer.consume( + writeOperationInfo.hBaseOperation, writeOperationInfo.operations, throwable); + return null; + } + }; + + this.requestScheduler.scheduleRequestWithCallback( + writeOperationInfo.requestResourcesDescription, + secondaryOperationSupplier, + this.mirroringTracer.spanFactory.wrapWriteOperationCallback( + writeOperationInfo.hBaseOperation, this.mirroringTracer, writeErrorCallback), + flowControlReservationErrorConsumer); + } + + @Override + public TableName getName() { + return this.primaryTable.getName(); + } + + @Override + public Configuration getConfiguration() { + return this.primaryTable.getConfiguration(); + } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + return this.primaryTable.getTableDescriptor(); + } + + @Override + public TableDescriptor getDescriptor() throws IOException { + return this.primaryTable.getDescriptor(); + } + + @Override + public RegionLocator getRegionLocator() throws IOException { + return this.primaryTable.getRegionLocator(); + } + + @Override + public CoprocessorRpcChannel coprocessorService(byte[] bytes) { + throw new UnsupportedOperationException(); + } + + @Override + public Map coprocessorService( + Class aClass, byte[] bytes, byte[] bytes1, Call call) throws Throwable { + throw new UnsupportedOperationException(); + } + + @Override + public void coprocessorService( + Class aClass, byte[] bytes, byte[] bytes1, Call call, Callback callback) + throws Throwable { + throw new UnsupportedOperationException(); + } + + @Override + public Map batchCoprocessorService( + MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r) + throws Throwable { + throw new UnsupportedOperationException(); + } + + @Override + public void batchCoprocessorService( + MethodDescriptor methodDescriptor, + Message message, + byte[] bytes, + byte[] bytes1, + R r, + Callback callback) + throws Throwable { + throw new UnsupportedOperationException(); + } + + @Override + public void setOperationTimeout(int i) { + throw new UnsupportedOperationException(); + } + + @Override + public int getOperationTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public int getRpcTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public void setRpcTimeout(int i) { + throw new UnsupportedOperationException(); + } + + @Override + public int getReadRpcTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public void setReadRpcTimeout(int i) { + throw new UnsupportedOperationException(); + } + + @Override + public int getWriteRpcTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public void setWriteRpcTimeout(int i) { + throw new UnsupportedOperationException(); } } diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java index 4d2c5839d0..5252729080 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java @@ -757,7 +757,7 @@ public void testConditionalWriteHappensWhenSecondaryErred() when(primaryBuilder.thenMutate(mutations)).thenReturn(primaryFuture); IOException ioe = new IOException("expected"); - CompletableFuture exceptionalFuture = new CompletableFuture<>(); + CompletableFuture exceptionalFuture = new CompletableFuture<>(); exceptionalFuture.completeExceptionally(ioe); when(secondaryTable.mutateRow(mutations)).thenReturn(exceptionalFuture); @@ -841,12 +841,12 @@ public void testDelete() throws InterruptedException, ExecutionException { @Test public void testMutateRow() throws ExecutionException, InterruptedException { RowMutations mutations = new RowMutations("r1".getBytes()); - CompletableFuture primaryFuture = new CompletableFuture<>(); - CompletableFuture secondaryFuture = new CompletableFuture<>(); + CompletableFuture primaryFuture = new CompletableFuture<>(); + CompletableFuture secondaryFuture = new CompletableFuture<>(); when(primaryTable.mutateRow(mutations)).thenReturn(primaryFuture); when(secondaryTable.mutateRow(mutations)).thenReturn(secondaryFuture); - CompletableFuture resultFuture = mirroringTable.mutateRow(mutations); + CompletableFuture resultFuture = mirroringTable.mutateRow(mutations); primaryFuture.complete(null); secondaryFuture.complete(null); resultFuture.get(); diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringConnection.java index aee0e36800..2c07f89216 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringConnection.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringConnection.java @@ -26,6 +26,7 @@ import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter; import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector; import com.google.common.annotations.VisibleForTesting; @@ -59,7 +60,7 @@ public class MirroringConnection implements Connection { protected final MismatchDetector mismatchDetector; /** * Counter of all asynchronous operations that are using the secondary connection. Incremented - * when scheduling operations by underlying {@link MirroringTable} and {@link + * when scheduling operations by underlying MirroringTable and {@link * MirroringResultScanner}. */ protected final ListenableReferenceCounter referenceCounter; @@ -231,7 +232,7 @@ public Table call() throws IOException { }, HBaseOperation.GET_TABLE); Table secondaryTable = this.secondaryConnection.getTable(tableName); - return new MirroringTable( + return getMirroringTable( primaryTable, secondaryTable, executorService, @@ -248,6 +249,20 @@ public Table call() throws IOException { } } + protected Table getMirroringTable(Table primaryTable, + Table secondaryTable, + ExecutorService executorService, + MismatchDetector mismatchDetector, + FlowController flowController, + SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, + ReadSampler readSampler, + Timestamper timestamper, + boolean performWritesConcurrently, + boolean waitForSecondaryWrites, + MirroringTracer mirroringTracer, + ReferenceCounter parentReferenceCounter, + int resultScannerBufferedMismatchedResults) { return null; } + @Override public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { return getBufferedMutator(new BufferedMutatorParams(tableName)); diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringResultScanner.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringResultScanner.java index 07dde94ef7..fbb090ad07 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringResultScanner.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringResultScanner.java @@ -18,7 +18,7 @@ import static com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounterUtils.holdReferenceUntilCompletion; import com.google.api.core.InternalApi; -import com.google.cloud.bigtable.mirroring.core.MirroringTable.RequestScheduler; +import com.google.cloud.bigtable.mirroring.core.RequestScheduler; import com.google.cloud.bigtable.mirroring.core.asyncwrappers.AsyncResultScannerWrapper; import com.google.cloud.bigtable.mirroring.core.asyncwrappers.AsyncResultScannerWrapper.ScannerRequestContext; import com.google.cloud.bigtable.mirroring.core.utils.AccumulatedExceptions; diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/RequestScheduler.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/RequestScheduler.java new file mode 100644 index 0000000000..dffcde4b61 --- /dev/null +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/RequestScheduler.java @@ -0,0 +1,92 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigtable.mirroring.core; + + +import static com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounterUtils.holdReferenceUntilCompletion; + +import com.google.api.core.InternalApi; +import com.google.cloud.bigtable.mirroring.core.utils.RequestScheduling; +import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController; +import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.RequestResourcesDescription; +import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter; +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * Helper class that holds common parameters to {@link + * RequestScheduling#scheduleRequestWithCallback(RequestResourcesDescription, Supplier, + * FutureCallback, FlowController, MirroringTracer, Function)}. + * + *

It also takes care of reference counting all scheduled operations. + */ +@InternalApi("For internal usage only") +public class RequestScheduler { + final FlowController flowController; + final MirroringTracer mirroringTracer; + final ReferenceCounter referenceCounter; + + public RequestScheduler( + FlowController flowController, + MirroringTracer mirroringTracer, + ReferenceCounter referenceCounter) { + this.flowController = flowController; + this.mirroringTracer = mirroringTracer; + this.referenceCounter = referenceCounter; + } + + public RequestScheduler withReferenceCounter(ReferenceCounter referenceCounter) { + return new RequestScheduler(this.flowController, this.mirroringTracer, referenceCounter); + } + + public ListenableFuture scheduleRequestWithCallback( + final RequestResourcesDescription requestResourcesDescription, + final Supplier> secondaryResultFutureSupplier, + final FutureCallback verificationCallback) { + return this.scheduleRequestWithCallback( + requestResourcesDescription, + secondaryResultFutureSupplier, + verificationCallback, + // noop flowControlReservationErrorConsumer + new Function() { + @Override + public Void apply(Throwable t) { + return null; + } + }); + } + + public ListenableFuture scheduleRequestWithCallback( + final RequestResourcesDescription requestResourcesDescription, + final Supplier> secondaryResultFutureSupplier, + final FutureCallback verificationCallback, + final Function flowControlReservationErrorConsumer) { + ListenableFuture future = + RequestScheduling.scheduleRequestWithCallback( + requestResourcesDescription, + secondaryResultFutureSupplier, + verificationCallback, + this.flowController, + this.mirroringTracer, + flowControlReservationErrorConsumer); + holdReferenceUntilCompletion(this.referenceCounter, future); + return future; + } +} \ No newline at end of file diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/utils/Batcher.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/utils/Batcher.java index 9f5a29ddae..eb24bcb08b 100644 --- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/utils/Batcher.java +++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/utils/Batcher.java @@ -20,7 +20,7 @@ import static com.google.cloud.bigtable.mirroring.core.utils.BatchHelpers.reconcileBatchResultsSequential; import com.google.cloud.bigtable.mirroring.core.MirroringOperationException; -import com.google.cloud.bigtable.mirroring.core.MirroringTable.RequestScheduler; +import com.google.cloud.bigtable.mirroring.core.RequestScheduler; import com.google.cloud.bigtable.mirroring.core.asyncwrappers.AsyncTableWrapper; import com.google.cloud.bigtable.mirroring.core.utils.BatchHelpers.BatchData; import com.google.cloud.bigtable.mirroring.core.utils.BatchHelpers.FailedSuccessfulSplit; diff --git a/hbase-migration-tools/mirroring-client/pom.xml b/hbase-migration-tools/mirroring-client/pom.xml index 61b7794f9e..e68cab5aa4 100644 --- a/hbase-migration-tools/mirroring-client/pom.xml +++ b/hbase-migration-tools/mirroring-client/pom.xml @@ -15,9 +15,6 @@ com.google.bigtable.hbase.mirroring.shaded com/google/bigtable/hbase/mirroring/shaded - - - 2.3.6 diff --git a/pom.xml b/pom.xml index 586ad6cc73..edaa339441 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ limitations under the License. 1.7.2 - 2.5.7-hadoop3 + 2.5.11-hadoop3 2.28.2