Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ limitations under the License.
<pattern>io.opentelemetry</pattern>
<shadedPattern>com.google.bigtable.repackaged.io.opentelemetry</shadedPattern>
</relocation>
<relocation>
<pattern>javax.activation</pattern>
<shadedPattern>com.google.bigtable.repackaged.javax.activation</shadedPattern>
</relocation>
<relocation>
<pattern>META-INF/versions/9/io/opentelemetry</pattern>
<shadedPattern>META-INF/versions/9/com/google/cloud/bigtable/repackaged/io/opentelemetry</shadedPattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,9 @@ public int getStoreFileCount() {
public Size getStoreFileSize() {
return new Size(size, Unit.BYTE);
}

@Override
public Size getMemStoreSize() { return new Size(size, Unit.BYTE); }
}
/** Handler for unsupported operations for generating Admin class at runtime. */
public static class UnsupportedOperationsHandler implements InvocationHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ limitations under the License.
com.google.bigtable.repackaged.javax.annotation
</shadedPattern>
</relocation>
<relocation>
<pattern>javax.activation</pattern>
<shadedPattern>
com.google.bigtable.repackaged.javax.activation
</shadedPattern>
</relocation>
<relocation>
<pattern>org.checkerframework</pattern>
<shadedPattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@
package com.google.cloud.bigtable.mirroring.hbase1_x;

import com.google.cloud.bigtable.mirroring.core.MirroringConfiguration;
import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler;
import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumer;
import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController;
import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter;
import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper;
import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;

public class MirroringConnection
Expand All @@ -41,4 +49,27 @@ public MirroringConnection(MirroringConfiguration mirroringConfiguration, Execut
throws IOException {
super(mirroringConfiguration, pool);
}

@Override
protected Table getMirroringTable(Table primaryTable, Table secondaryTable,
ExecutorService executorService, MismatchDetector mismatchDetector,
FlowController flowController, SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,
ReadSampler readSampler, Timestamper timestamper, boolean performWritesConcurrently,
boolean waitForSecondaryWrites, MirroringTracer mirroringTracer,
ReferenceCounter parentReferenceCounter, int resultScannerBufferedMismatchedResults) {
return new MirroringTable(
primaryTable,
secondaryTable,
executorService,
this.mismatchDetector,
this.flowController,
this.secondaryWriteErrorConsumer,
this.readSampler,
this.timestamper,
this.performWritesConcurrently,
this.waitForSecondaryWrites,
this.mirroringTracer,
this.referenceCounter,
this.configuration.mirroringOptions.maxLoggedBinaryValueLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.mirroring.core;
package com.google.cloud.bigtable.mirroring.hbase1_x;

import static com.google.cloud.bigtable.mirroring.core.utils.OperationUtils.emptyResult;
import static com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounterUtils.holdReferenceUntilCompletion;

import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.mirroring.core.MirroringResultScanner;
import com.google.cloud.bigtable.mirroring.core.RequestScheduler;
import com.google.cloud.bigtable.mirroring.core.WriteOperationFutureCallback;
import com.google.cloud.bigtable.mirroring.core.asyncwrappers.AsyncTableWrapper;
import com.google.cloud.bigtable.mirroring.core.utils.AccumulatedExceptions;
import com.google.cloud.bigtable.mirroring.core.utils.BatchHelpers.FailedSuccessfulSplit;
Expand Down Expand Up @@ -781,65 +784,4 @@ public int getWriteRpcTimeout() {
public void setWriteRpcTimeout(int i) {
throw new UnsupportedOperationException();
}

/**
* Helper class that holds common parameters to {@link
* RequestScheduling#scheduleRequestWithCallback(RequestResourcesDescription, Supplier,
* FutureCallback, FlowController, MirroringTracer, Function)} for single instance of {@link
* com.google.cloud.bigtable.mirroring.core.MirroringTable}.
*
* <p>It also takes care of reference counting all scheduled operations.
*/
public static class RequestScheduler {
final FlowController flowController;
final MirroringTracer mirroringTracer;
final ReferenceCounter referenceCounter;

public RequestScheduler(
FlowController flowController,
MirroringTracer mirroringTracer,
ReferenceCounter referenceCounter) {
this.flowController = flowController;
this.mirroringTracer = mirroringTracer;
this.referenceCounter = referenceCounter;
}

public RequestScheduler withReferenceCounter(ReferenceCounter referenceCounter) {
return new RequestScheduler(this.flowController, this.mirroringTracer, referenceCounter);
}

public <T> ListenableFuture<Void> scheduleRequestWithCallback(
final RequestResourcesDescription requestResourcesDescription,
final Supplier<ListenableFuture<T>> secondaryResultFutureSupplier,
final FutureCallback<T> verificationCallback) {
return this.scheduleRequestWithCallback(
requestResourcesDescription,
secondaryResultFutureSupplier,
verificationCallback,
// noop flowControlReservationErrorConsumer
new Function<Throwable, Void>() {
@Override
public Void apply(Throwable t) {
return null;
}
});
}

public <T> ListenableFuture<Void> scheduleRequestWithCallback(
final RequestResourcesDescription requestResourcesDescription,
final Supplier<ListenableFuture<T>> secondaryResultFutureSupplier,
final FutureCallback<T> verificationCallback,
final Function<Throwable, Void> flowControlReservationErrorConsumer) {
ListenableFuture<Void> future =
RequestScheduling.scheduleRequestWithCallback(
requestResourcesDescription,
secondaryResultFutureSupplier,
verificationCallback,
this.flowController,
this.mirroringTracer,
flowControlReservationErrorConsumer);
holdReferenceUntilCompletion(this.referenceCounter, future);
return future;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,17 @@ public FailingHBaseHRegion2(
}

@Override
public HRegion.RegionScannerImpl getScanner(Scan scan, List<KeyValueScanner> additionalScanners)
public RegionScannerImpl getScanner(Scan scan, List<KeyValueScanner> additionalScanners)
throws IOException {
// HBase 2.x implements Gets as Scans with start row == end row == requested row.
processRowThrow(scan.getStartRow());
return super.getScanner(scan, additionalScanners);
}

@Override
public void mutateRow(RowMutations rm) throws IOException {
public Result mutateRow(RowMutations rm) throws IOException {
processRowThrow(rm.getRow());
super.mutateRow(rm);
return super.mutateRow(rm);
}

@Override
Expand All @@ -100,12 +100,6 @@ public OperationStatus[] batchMutate(
mutations, (m) -> super.batchMutate(m, atomic, nonceGroup, nonce));
}

@Override
public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
throws IOException {
return batchMutateWithFailures(mutations, (m) -> super.batchMutate(m, nonceGroup, nonce));
}

@Override
public Result get(Get get) throws IOException {
processRowThrow(get.getRow());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ limitations under the License.
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.reload4j</groupId>
<artifactId>reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand Down Expand Up @@ -202,6 +206,7 @@ limitations under the License.
<ignoredDependency>commons-logging:commons-logging</ignoredDependency>

<ignoredDependency>log4j:log4j</ignoredDependency>
<ignoredDependency>ch.qos.reload4j:reload4j</ignoredDependency>
</ignoredDependencies>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,18 @@ limitations under the License.
<pattern>io.opencensus</pattern>
<shadedPattern>${shading-prefix}.io.opencensus</shadedPattern>
</relocation>
<relocation>
<pattern>io.opentelemetry</pattern>
<shadedPattern>com.google.cloud.bigtable.mirroring.repackaged.io.opentelemetry</shadedPattern>
</relocation>
<relocation>
<pattern>META-INF/versions/9/io/opentelemetry</pattern>
<shadedPattern>META-INF/versions/9/com/google/cloud/bigtable/mirroring/repackaged/io/opentelemetry</shadedPattern>
</relocation>
<relocation>
<pattern>javax.activation</pattern>
<shadedPattern>com.google.cloud.bigtable.mirroring.repackaged.javax.activation</shadedPattern>
</relocation>
<!-- grpc is used only by opencensus dependency -->
<relocation>
<pattern>io.grpc</pattern>
Expand Down Expand Up @@ -198,6 +210,13 @@ limitations under the License.
<targetDependencies>
<targetDependency>org.apache.hbase:hbase-shaded-client</targetDependency>
</targetDependencies>
<!-- opentelemetry dependency is added to veneer since 2.38.0. We shade veneer so ignoring
opentelemetry when comparing with hbase-shaded-client is fine-->
<ignoredDependencies>
<dependency>io.opentelemetry:opentelemetry-api</dependency>
<dependency>io.opentelemetry:opentelemetry-context</dependency>
<dependency>io.opentelemetry:opentelemetry-semconv</dependency>
</ignoredDependencies>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,16 @@ public AsyncTableBuilder<C> setRetryPauseForCQTBE(long pause, TimeUnit unit) {
return this;
}

@Override
public AsyncTableBuilder<C> setRetryPauseForServerOverloaded(long pause, TimeUnit unit) {
setTimeParameter(
pause,
unit,
this.primaryTableBuilder::setRetryPauseForServerOverloaded,
this.secondaryTableBuilder::setRetryPauseForServerOverloaded);
return this;
}

@Override
public AsyncTableBuilder<C> setMaxAttempts(int maxAttempts) {
setIntegerParameter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static com.google.cloud.bigtable.mirroring.hbase2_x.utils.AsyncRequestScheduling.reserveFlowControlResourcesThenScheduleSecondary;

import com.google.cloud.bigtable.mirroring.core.MirroringResultScanner;
import com.google.cloud.bigtable.mirroring.core.MirroringTable.RequestScheduler;
import com.google.cloud.bigtable.mirroring.core.RequestScheduler;
import com.google.cloud.bigtable.mirroring.core.WriteOperationFutureCallback;
import com.google.cloud.bigtable.mirroring.core.asyncwrappers.AsyncResultScannerWrapper;
import com.google.cloud.bigtable.mirroring.core.utils.BatchHelpers;
Expand Down Expand Up @@ -63,6 +63,8 @@
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
Expand Down Expand Up @@ -202,9 +204,9 @@ public CompletableFuture<Result> increment(Increment increment) {
}

@Override
public CompletableFuture<Void> mutateRow(RowMutations rowMutations) {
public CompletableFuture<Result> mutateRow(RowMutations rowMutations) {
this.timestamper.fillTimestamp(rowMutations);
CompletableFuture<Void> primaryFuture = this.primaryTable.mutateRow(rowMutations);
CompletableFuture<Result> primaryFuture = this.primaryTable.mutateRow(rowMutations);
return writeWithFlowControl(
new WriteOperationInfo(rowMutations),
primaryFuture,
Expand Down Expand Up @@ -534,6 +536,16 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(
throw new UnsupportedOperationException("not implemented");
}

@Override
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public List<CompletableFuture<CheckAndMutateResult>> checkAndMutate(List<CheckAndMutate> list) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public void scan(Scan scan, C consumer) {
this.primaryTable.scan(scan, consumer);
Expand Down Expand Up @@ -561,7 +573,7 @@ public MirroringCheckAndMutateBuilder(CheckAndMutateBuilder primaryBuilder) {
private OperationStages<CompletableFuture<Boolean>> checkAndMutate(
WriteOperationInfo writeOperationInfo,
CompletableFuture<Boolean> primary,
Supplier<CompletableFuture<Void>> secondary) {
Supplier<CompletableFuture<?>> secondary) {
OperationStages<CompletableFuture<Boolean>> returnedValue =
new OperationStages<>(new CompletableFuture<>());
primary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
*/
package com.google.cloud.bigtable.mirroring.hbase2_x;

import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler;
import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumer;
import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController;
import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter;
import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper;
import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
Expand All @@ -30,6 +37,29 @@ public MirroringConnection(Configuration conf, boolean managed, ExecutorService
super(conf, managed, pool, user);
}

@Override
protected Table getMirroringTable(Table primaryTable, Table secondaryTable,
ExecutorService executorService, MismatchDetector mismatchDetector,
FlowController flowController, SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,
ReadSampler readSampler, Timestamper timestamper, boolean performWritesConcurrently,
boolean waitForSecondaryWrites, MirroringTracer mirroringTracer,
ReferenceCounter parentReferenceCounter, int resultScannerBufferedMismatchedResults) {
return new MirroringTable(
primaryTable,
secondaryTable,
executorService,
this.mismatchDetector,
this.flowController,
this.secondaryWriteErrorConsumer,
this.readSampler,
this.timestamper,
this.performWritesConcurrently,
this.waitForSecondaryWrites,
this.mirroringTracer,
this.referenceCounter,
this.configuration.mirroringOptions.maxLoggedBinaryValueLength);
}

public MirroringConnection(Configuration conf, ExecutorService pool, User user) throws Throwable {
this(conf, false, pool, user);
}
Expand Down
Loading