Skip to content

Commit

Permalink
Merge pull request #481 from keith-turner/fluo-456
Browse files Browse the repository at this point in the history
fixes #456 moved notification deletion to end of transaction
  • Loading branch information
keith-turner committed Apr 23, 2015
2 parents 6b9f630 + cc257fb commit e936ba5
Show file tree
Hide file tree
Showing 16 changed files with 357 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,19 @@
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;

/**
*
*
*/
public class PrewriteIterator implements SortedKeyValueIterator<Key, Value> {
private static final String TIMESTAMP_OPT = "timestampOpt";
private static final String CHECK_ACK_OPT = "checkAckOpt";
private static final String NTFY_TIMESTAMP_OPT = "ntfyTsOpt";

private SortedKeyValueIterator<Key, Value> source;
private long snaptime;

boolean hasTop = false;
boolean checkAck = false;
long ntfyTimestamp = -1;

public static void setSnaptime(IteratorSetting cfg, long time) {
if (time < 0 || (ColumnConstants.PREFIX_MASK & time) != 0) {
Expand All @@ -51,8 +53,9 @@ public static void setSnaptime(IteratorSetting cfg, long time) {
cfg.addOption(TIMESTAMP_OPT, time + "");
}

public static void enableAckCheck(IteratorSetting cfg) {
public static void enableAckCheck(IteratorSetting cfg, long timestamp) {
cfg.addOption(CHECK_ACK_OPT, "true");
cfg.addOption(NTFY_TIMESTAMP_OPT, timestamp + "");
}

@Override
Expand All @@ -62,6 +65,7 @@ public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String>
this.snaptime = Long.parseLong(options.get(TIMESTAMP_OPT));
if (options.containsKey(CHECK_ACK_OPT)) {
this.checkAck = Boolean.parseBoolean(options.get(CHECK_ACK_OPT));
this.ntfyTimestamp = Long.parseLong(options.get(NTFY_TIMESTAMP_OPT));
}
}

Expand Down Expand Up @@ -94,7 +98,6 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
}

long invalidationTime = -1;
long firstWrite = -1;

hasTop = false;
while (source.hasTop()
Expand All @@ -112,10 +115,6 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
invalidationTime = timePtr;
}

if (firstWrite == -1) {
firstWrite = ts;
}

if (ts >= snaptime) {
hasTop = true;
return;
Expand Down Expand Up @@ -144,7 +143,7 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
// can stop looking
return;
} else if (colType == ColumnConstants.ACK_PREFIX) {
if (checkAck && ts >= firstWrite) {
if (checkAck && ts > ntfyTimestamp) {
hasTop = true;
return;
}
Expand Down
47 changes: 47 additions & 0 deletions modules/core/src/main/java/io/fluo/core/impl/Notification.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.fluo.core.impl;

import io.fluo.accumulo.util.ColumnConstants;
import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
import io.fluo.api.data.RowColumn;
import io.fluo.core.util.ColumnUtil;
import io.fluo.core.util.Flutation;
import org.apache.accumulo.core.security.ColumnVisibility;

public class Notification extends RowColumn {
private long timestamp;

public Notification(Bytes row, Column col, long ts) {
super(row, col);
this.timestamp = ts;
}

public long getTimestamp() {
return timestamp;
}

public Flutation newDelete(Environment env) {
return newDelete(env, getTimestamp());
}

public Flutation newDelete(Environment env, long ts) {
Flutation m = new Flutation(env, getRow());
ColumnVisibility cv = env.getSharedResources().getVisCache().getCV(getColumn());
m.putDelete(ColumnConstants.NOTIFY_CF.toArray(), ColumnUtil.concatCFCQ(getColumn()), cv, ts);
return m;
}
}
59 changes: 45 additions & 14 deletions modules/core/src/main/java/io/fluo/core/impl/SharedBatchWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.data.Mutation;
Expand All @@ -31,15 +32,14 @@ public class SharedBatchWriter {
private ArrayBlockingQueue<MutationBatch> mQueue = new ArrayBlockingQueue<>(1000);
private MutationBatch end = new MutationBatch(new ArrayList<Mutation>());

private AtomicLong asyncBatchesAdded = new AtomicLong(0);
private long asyncBatchesProcessed = 0;

private static class MutationBatch {

private List<Mutation> mutations;
private CountDownLatch cdl;

public MutationBatch(Mutation m) {
mutations = Collections.singletonList(m);
cdl = new CountDownLatch(1);
}
private boolean isAsync = false;

public MutationBatch(List<Mutation> mutations) {
this.mutations = mutations;
Expand Down Expand Up @@ -67,11 +67,24 @@ public void run() {

bw.flush();

int numAsync = 0;

for (MutationBatch mutationBatch : batches) {
if (mutationBatch == end) {
keepRunning = false;
}
mutationBatch.cdl.countDown();

if (mutationBatch.isAsync) {
numAsync++;
}
}

if (numAsync > 0) {
synchronized (SharedBatchWriter.this) {
asyncBatchesProcessed += numAsync;
SharedBatchWriter.this.notifyAll();
}
}

} catch (Exception e) {
Expand All @@ -92,13 +105,7 @@ public SharedBatchWriter(BatchWriter bw) {
}

public void writeMutation(Mutation m) {
try {
MutationBatch mb = new MutationBatch(m);
mQueue.put(mb);
mb.cdl.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
writeMutations(Collections.singletonList(m));
}

public void writeMutations(List<Mutation> ml) {
Expand All @@ -125,13 +132,37 @@ public void close() {
}
}

public void writeMutationAsync(Mutation m) {
public void writeMutationsAsync(List<Mutation> ml) {
try {
MutationBatch mb = new MutationBatch(m);
MutationBatch mb = new MutationBatch(ml);
mb.isAsync = true;
asyncBatchesAdded.incrementAndGet();
mQueue.put(mb);

} catch (Exception e) {
throw new RuntimeException(e);
}
}

public void writeMutationAsync(Mutation m) {
writeMutationsAsync(Collections.singletonList(m));
}

/**
* waits for all async mutations that were added before this was called to be flushed. Does not
* wait for asyn mutations added after call.
*/
public void waitForAsyncFlush() {
long numAdded = asyncBatchesAdded.get();

synchronized (this) {
while (numAdded > asyncBatchesProcessed) {
try {
wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
Loading

0 comments on commit e936ba5

Please sign in to comment.