From cc257fb2d1defd076d786f4757642ccf90d1c64d Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 22 Apr 2015 11:31:34 -0400 Subject: [PATCH] fixes #456 moved notification deletion to end of transaction --- .../accumulo/iterators/PrewriteIterator.java | 15 +- .../java/io/fluo/core/impl/Notification.java | 47 +++++++ .../io/fluo/core/impl/SharedBatchWriter.java | 59 ++++++-- .../io/fluo/core/impl/TransactionImpl.java | 129 ++++++++++-------- .../io/fluo/core/log/TracingTransaction.java | 26 ++-- .../java/io/fluo/core/util/ColumnUtil.java | 2 - .../fluo/core/worker/NotificationFinder.java | 5 +- .../core/worker/NotificationProcessor.java | 20 +-- .../java/io/fluo/core/worker/WorkTask.java | 26 ++-- .../finder/hash/HashNotificationFinder.java | 5 +- .../core/worker/finder/hash/ScanTask.java | 10 +- .../java/io/fluo/core/TestTransaction.java | 48 ++++++- .../java/io/fluo/core/impl/FailureIT.java | 7 +- .../test/java/io/fluo/core/impl/FluoIT.java | 100 +++++++++++--- .../core/impl/WeakNotificationOverlapIT.java | 3 + .../src/test/java/io/fluo/core/log/LogIT.java | 15 +- 16 files changed, 357 insertions(+), 160 deletions(-) create mode 100644 modules/core/src/main/java/io/fluo/core/impl/Notification.java diff --git a/modules/accumulo/src/main/java/io/fluo/accumulo/iterators/PrewriteIterator.java b/modules/accumulo/src/main/java/io/fluo/accumulo/iterators/PrewriteIterator.java index cdedb0dc9..060a59128 100644 --- a/modules/accumulo/src/main/java/io/fluo/accumulo/iterators/PrewriteIterator.java +++ b/modules/accumulo/src/main/java/io/fluo/accumulo/iterators/PrewriteIterator.java @@ -32,17 +32,19 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; /** - * + * */ public class PrewriteIterator implements SortedKeyValueIterator { private static final String TIMESTAMP_OPT = "timestampOpt"; private static final String CHECK_ACK_OPT = "checkAckOpt"; + private static final String NTFY_TIMESTAMP_OPT = "ntfyTsOpt"; private SortedKeyValueIterator source; private long snaptime; boolean hasTop = false; boolean checkAck = false; + long ntfyTimestamp = -1; public static void setSnaptime(IteratorSetting cfg, long time) { if (time < 0 || (ColumnConstants.PREFIX_MASK & time) != 0) { @@ -51,8 +53,9 @@ public static void setSnaptime(IteratorSetting cfg, long time) { cfg.addOption(TIMESTAMP_OPT, time + ""); } - public static void enableAckCheck(IteratorSetting cfg) { + public static void enableAckCheck(IteratorSetting cfg, long timestamp) { cfg.addOption(CHECK_ACK_OPT, "true"); + cfg.addOption(NTFY_TIMESTAMP_OPT, timestamp + ""); } @Override @@ -62,6 +65,7 @@ public void init(SortedKeyValueIterator source, Map this.snaptime = Long.parseLong(options.get(TIMESTAMP_OPT)); if (options.containsKey(CHECK_ACK_OPT)) { this.checkAck = Boolean.parseBoolean(options.get(CHECK_ACK_OPT)); + this.ntfyTimestamp = Long.parseLong(options.get(NTFY_TIMESTAMP_OPT)); } } @@ -94,7 +98,6 @@ public void seek(Range range, Collection columnFamilies, boolean i } long invalidationTime = -1; - long firstWrite = -1; hasTop = false; while (source.hasTop() @@ -112,10 +115,6 @@ public void seek(Range range, Collection columnFamilies, boolean i invalidationTime = timePtr; } - if (firstWrite == -1) { - firstWrite = ts; - } - if (ts >= snaptime) { hasTop = true; return; @@ -144,7 +143,7 @@ public void seek(Range range, Collection columnFamilies, boolean i // can stop looking return; } else if (colType == ColumnConstants.ACK_PREFIX) { - if (checkAck && ts >= firstWrite) { + if (checkAck && ts > ntfyTimestamp) { hasTop = true; return; } diff --git a/modules/core/src/main/java/io/fluo/core/impl/Notification.java b/modules/core/src/main/java/io/fluo/core/impl/Notification.java new file mode 100644 index 000000000..1ea9233fc --- /dev/null +++ b/modules/core/src/main/java/io/fluo/core/impl/Notification.java @@ -0,0 +1,47 @@ +/* + * Copyright 2014 Fluo authors (see AUTHORS) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.fluo.core.impl; + +import io.fluo.accumulo.util.ColumnConstants; +import io.fluo.api.data.Bytes; +import io.fluo.api.data.Column; +import io.fluo.api.data.RowColumn; +import io.fluo.core.util.ColumnUtil; +import io.fluo.core.util.Flutation; +import org.apache.accumulo.core.security.ColumnVisibility; + +public class Notification extends RowColumn { + private long timestamp; + + public Notification(Bytes row, Column col, long ts) { + super(row, col); + this.timestamp = ts; + } + + public long getTimestamp() { + return timestamp; + } + + public Flutation newDelete(Environment env) { + return newDelete(env, getTimestamp()); + } + + public Flutation newDelete(Environment env, long ts) { + Flutation m = new Flutation(env, getRow()); + ColumnVisibility cv = env.getSharedResources().getVisCache().getCV(getColumn()); + m.putDelete(ColumnConstants.NOTIFY_CF.toArray(), ColumnUtil.concatCFCQ(getColumn()), cv, ts); + return m; + } +} diff --git a/modules/core/src/main/java/io/fluo/core/impl/SharedBatchWriter.java b/modules/core/src/main/java/io/fluo/core/impl/SharedBatchWriter.java index a8b59a9c2..75137cd27 100644 --- a/modules/core/src/main/java/io/fluo/core/impl/SharedBatchWriter.java +++ b/modules/core/src/main/java/io/fluo/core/impl/SharedBatchWriter.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.data.Mutation; @@ -31,15 +32,14 @@ public class SharedBatchWriter { private ArrayBlockingQueue mQueue = new ArrayBlockingQueue<>(1000); private MutationBatch end = new MutationBatch(new ArrayList()); + private AtomicLong asyncBatchesAdded = new AtomicLong(0); + private long asyncBatchesProcessed = 0; + private static class MutationBatch { private List mutations; private CountDownLatch cdl; - - public MutationBatch(Mutation m) { - mutations = Collections.singletonList(m); - cdl = new CountDownLatch(1); - } + private boolean isAsync = false; public MutationBatch(List mutations) { this.mutations = mutations; @@ -67,11 +67,24 @@ public void run() { bw.flush(); + int numAsync = 0; + for (MutationBatch mutationBatch : batches) { if (mutationBatch == end) { keepRunning = false; } mutationBatch.cdl.countDown(); + + if (mutationBatch.isAsync) { + numAsync++; + } + } + + if (numAsync > 0) { + synchronized (SharedBatchWriter.this) { + asyncBatchesProcessed += numAsync; + SharedBatchWriter.this.notifyAll(); + } } } catch (Exception e) { @@ -92,13 +105,7 @@ public SharedBatchWriter(BatchWriter bw) { } public void writeMutation(Mutation m) { - try { - MutationBatch mb = new MutationBatch(m); - mQueue.put(mb); - mb.cdl.await(); - } catch (Exception e) { - throw new RuntimeException(e); - } + writeMutations(Collections.singletonList(m)); } public void writeMutations(List ml) { @@ -125,13 +132,37 @@ public void close() { } } - public void writeMutationAsync(Mutation m) { + public void writeMutationsAsync(List ml) { try { - MutationBatch mb = new MutationBatch(m); + MutationBatch mb = new MutationBatch(ml); + mb.isAsync = true; + asyncBatchesAdded.incrementAndGet(); mQueue.put(mb); + } catch (Exception e) { throw new RuntimeException(e); } } + public void writeMutationAsync(Mutation m) { + writeMutationsAsync(Collections.singletonList(m)); + } + + /** + * waits for all async mutations that were added before this was called to be flushed. Does not + * wait for asyn mutations added after call. + */ + public void waitForAsyncFlush() { + long numAdded = asyncBatchesAdded.get(); + + synchronized (this) { + while (numAdded > asyncBatchesProcessed) { + try { + wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } } diff --git a/modules/core/src/main/java/io/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/io/fluo/core/impl/TransactionImpl.java index fac6c57ca..e2d6094f2 100644 --- a/modules/core/src/main/java/io/fluo/core/impl/TransactionImpl.java +++ b/modules/core/src/main/java/io/fluo/core/impl/TransactionImpl.java @@ -46,6 +46,7 @@ import io.fluo.core.util.ConditionalFlutation; import io.fluo.core.util.FluoCondition; import io.fluo.core.util.Flutation; +import io.fluo.core.util.SpanUtil; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ConditionalWriter; @@ -53,12 +54,14 @@ import org.apache.accumulo.core.client.ConditionalWriter.Status; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.Condition; import org.apache.accumulo.core.data.ConditionalMutation; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.ColumnVisibility; /** @@ -81,10 +84,8 @@ private static enum TxStatus { private final Environment env; final Map> columnsRead = new HashMap<>(); private final TxStats stats = new TxStats(); - private Bytes triggerRow; - private Column triggerColumn; - private Bytes weakRow; - private Column weakColumn; + private Notification notification; + private Notification weakNotification; private TransactorNode tnode = null; private TxStatus status = TxStatus.OPEN; @@ -92,38 +93,36 @@ private ColumnVisibility gv(Column colvis) { return env.getSharedResources().getVisCache().getCV(colvis); } - public TransactionImpl(Environment env, Bytes triggerRow, Column triggerColumn, long startTs) { + public TransactionImpl(Environment env, Notification trigger, long startTs) { Preconditions.checkNotNull(env, "environment cannot be null"); Preconditions.checkArgument(startTs >= 0, "startTs cannot be negative"); this.env = env; this.startTs = startTs; this.observedColumns = env.getObservers().keySet(); - if (triggerColumn != null && env.getWeakObservers().containsKey(triggerColumn)) { - this.weakRow = triggerRow; - this.weakColumn = triggerColumn; + if (trigger != null && env.getWeakObservers().containsKey(trigger.getColumn())) { + this.weakNotification = trigger; } else { - this.triggerRow = triggerRow; - this.triggerColumn = triggerColumn; + this.notification = trigger; } - if (triggerRow != null) { + if (notification != null) { Map colUpdates = new HashMap<>(); - colUpdates.put(triggerColumn, null); - updates.put(triggerRow, colUpdates); + colUpdates.put(notification.getColumn(), null); + updates.put(notification.getRow(), colUpdates); } } - public TransactionImpl(Environment env, Bytes triggerRow, Column tiggerColumn) { - this(env, triggerRow, tiggerColumn, allocateTimestamp(env)); + public TransactionImpl(Environment env, Notification trigger) { + this(env, trigger, allocateTimestamp(env)); } public TransactionImpl(Environment env) { - this(env, null, null, allocateTimestamp(env)); + this(env, null, allocateTimestamp(env)); } public TransactionImpl(Environment env, long startTs) { - this(env, null, null, startTs); + this(env, null, startTs); } private static Long allocateTimestamp(Environment env) { @@ -275,9 +274,9 @@ private ConditionalFlutation prewrite(ConditionalFlutation cm, Bytes row, Column Bytes primaryRow, Column primaryColumn, boolean isTriggerRow) { IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class); PrewriteIterator.setSnaptime(iterConf, startTs); - boolean isTrigger = isTriggerRow && col.equals(triggerColumn); + boolean isTrigger = isTriggerRow && col.equals(notification.getColumn()); if (isTrigger) { - PrewriteIterator.enableAckCheck(iterConf); + PrewriteIterator.enableAckCheck(iterConf, notification.getTimestamp()); } Condition cond = new FluoCondition(env, col).setIterators(iterConf); @@ -347,9 +346,9 @@ public String toString() { public boolean preCommit(CommitData cd) throws TableNotFoundException, AccumuloException, AccumuloSecurityException, AlreadyAcknowledgedException { - if (triggerRow != null) { + if (notification != null) { // always want to throw already ack exception if collision, so process trigger first - return preCommit(cd, triggerRow, triggerColumn); + return preCommit(cd, notification.getRow(), notification.getColumn()); } else { Bytes prow = updates.keySet().iterator().next(); Map colSet = updates.get(prow); @@ -359,6 +358,10 @@ public boolean preCommit(CommitData cd) throws TableNotFoundException, AccumuloE } + private boolean isTriggerRow(Bytes row) { + return notification != null && notification.getRow().equals(row); + } + public boolean preCommit(CommitData cd, Bytes primRow, Column primCol) throws TableNotFoundException, AccumuloException, AccumuloSecurityException, AlreadyAcknowledgedException { @@ -377,7 +380,7 @@ public boolean preCommit(CommitData cd, Bytes primRow, Column primCol) // try to lock primary column ConditionalMutation pcm = - prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, cd.prow.equals(triggerRow)); + prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, isTriggerRow(cd.prow)); Status mutationStatus = cd.cw.write(pcm).getStatus(); @@ -418,7 +421,7 @@ public boolean preCommit(CommitData cd, Bytes primRow, Column primCol) for (Entry> rowUpdates : updates.entrySet()) { ConditionalFlutation cm = null; - boolean isTriggerRow = rowUpdates.getKey().equals(triggerRow); + boolean isTriggerRow = isTriggerRow(rowUpdates.getKey()); for (Entry colUpdates : rowUpdates.getValue().entrySet()) { if (cm == null) { @@ -490,7 +493,6 @@ private void writeWeakNotifications(long commitTs) { *
  • TX2 attempts to write r1:col1 w/o reading it * * - *

    * In this case TX2 would not roll back TX1, because it never read the column. This function * attempts to handle this case if TX2 fails. Only doing this in case of failures is cheaper than * trying to always read unread columns. @@ -523,7 +525,7 @@ private void readUnread(CommitData cd) throws Exception { private boolean checkForAckCollision(ConditionalMutation cm) { Bytes row = Bytes.of(cm.getRow()); - if (row.equals(triggerRow)) { + if (isTriggerRow(row)) { List updates = cm.getUpdates(); for (ColumnUpdate cu : updates) { @@ -532,20 +534,33 @@ private boolean checkForAckCollision(ConditionalMutation cm) { new Column(Bytes.of(cu.getColumnFamily()), Bytes.of(cu.getColumnQualifier()), Bytes.of(cu.getColumnVisibility())); - if (triggerColumn.equals(col)) { + if (notification.getColumn().equals(col)) { + // check to see if ACK exist after notification + Key startKey = SpanUtil.toKey(notification); + startKey.setTimestamp(ColumnConstants.ACK_PREFIX + | (Long.MAX_VALUE & ColumnConstants.TIMESTAMP_MASK)); + + Key endKey = SpanUtil.toKey(notification); + endKey.setTimestamp(ColumnConstants.ACK_PREFIX | (notification.getTimestamp() + 1)); + + Range range = new Range(startKey, endKey); - // TODO this check will not detect ack when tx overlaps with another tx... it will instead - // the the lock release.. this may be ok, the worker will - // retry the tx and then see the already ack exception + Scanner scanner; + try { + // TODO reuse or share scanner + scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations()); + } catch (TableNotFoundException e) { + // TODO proper exception handling + throw new RuntimeException(e); + } - IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class); - PrewriteIterator.setSnaptime(iterConf, startTs); - PrewriteIterator.enableAckCheck(iterConf); - Key key = ColumnUtil.checkColumn(env, iterConf, row, col).getKey(); - // TODO could key be null? - long colType = key.getTimestamp() & ColumnConstants.PREFIX_MASK; + scanner.setRange(range); - if (colType == ColumnConstants.ACK_PREFIX) { + // TODO could use iterator that stops after 1st ACK. thought of using versioning iter but + // it scans to ACK + if (scanner.iterator().hasNext()) { + env.getSharedResources().getBatchWriter() + .writeMutationAsync(notification.newDelete(env)); return true; } } @@ -570,7 +585,7 @@ public boolean commitPrimaryColumn(CommitData cd, long commitTs) throws Accumulo // try to delete lock and add write for primary column IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class); PrewriteIterator.setSnaptime(iterConf, startTs); - boolean isTrigger = cd.prow.equals(triggerRow) && cd.pcol.equals(triggerColumn); + boolean isTrigger = isTriggerRow(cd.prow) && cd.pcol.equals(notification.getColumn()); Condition lockCheck = new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue( @@ -647,32 +662,36 @@ public boolean finishCommit(CommitData cd, long commitTs) throws TableNotFoundEx ArrayList mutations = new ArrayList<>(updates.size() + 1); for (Entry> rowUpdates : updates.entrySet()) { Flutation m = new Flutation(env, rowUpdates.getKey()); - boolean isTriggerRow = rowUpdates.getKey().equals(triggerRow); + boolean isTriggerRow = isTriggerRow(rowUpdates.getKey()); for (Entry colUpdates : rowUpdates.getValue().entrySet()) { - ColumnUtil.commitColumn(env, isTriggerRow && colUpdates.getKey().equals(triggerColumn), - false, colUpdates.getKey(), colUpdates.getValue() != null, - colUpdates.getValue() == DELETE, startTs, commitTs, observedColumns, m); + ColumnUtil.commitColumn(env, + isTriggerRow && colUpdates.getKey().equals(notification.getColumn()), false, + colUpdates.getKey(), colUpdates.getValue() != null, colUpdates.getValue() == DELETE, + startTs, commitTs, observedColumns, m); } mutations.add(m); } - if (weakRow != null) { - Flutation m = new Flutation(env, weakRow); - m.putDelete(ColumnConstants.NOTIFY_CF.toArray(), ColumnUtil.concatCFCQ(weakColumn), - gv(weakColumn), startTs); - - mutations.add(m); - } - env.getSharedResources().getBatchWriter().writeMutations(mutations); // mark transaction as complete for garbage collection purposes + mutations.clear(); + Flutation m = new Flutation(env, cd.prow); m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | commitTs, EMPTY); + mutations.add(m); + + if (weakNotification != null) { + mutations.add(weakNotification.newDelete(env, startTs)); + } - env.getSharedResources().getBatchWriter().writeMutationAsync(m); + if (notification != null) { + mutations.add(notification.newDelete(env, startTs)); + } + + env.getSharedResources().getBatchWriter().writeMutationsAsync(mutations); return true; } @@ -718,7 +737,6 @@ public synchronized void commit() throws CommitException { // TODO write TX_DONE throw new CommitException("Commit failed"); } - } catch (CommitException e) { throw e; } catch (RuntimeException e) { @@ -733,12 +751,9 @@ public synchronized void commit() throws CommitException { } void deleteWeakRow() { - if (weakRow != null) { - Flutation m = new Flutation(env, weakRow); - m.putDelete(ColumnConstants.NOTIFY_CF.toArray(), ColumnUtil.concatCFCQ(weakColumn), - gv(weakColumn), startTs); - - env.getSharedResources().getBatchWriter().writeMutation(m); + if (weakNotification != null) { + env.getSharedResources().getBatchWriter() + .writeMutation(weakNotification.newDelete(env, startTs)); } } diff --git a/modules/core/src/main/java/io/fluo/core/log/TracingTransaction.java b/modules/core/src/main/java/io/fluo/core/log/TracingTransaction.java index e36acfe98..95c38527b 100644 --- a/modules/core/src/main/java/io/fluo/core/log/TracingTransaction.java +++ b/modules/core/src/main/java/io/fluo/core/log/TracingTransaction.java @@ -27,6 +27,7 @@ import io.fluo.api.exceptions.AlreadySetException; import io.fluo.api.exceptions.CommitException; import io.fluo.api.iterator.RowIterator; +import io.fluo.core.impl.Notification; import io.fluo.core.impl.TransactionImpl; import io.fluo.core.impl.TxStats; import org.slf4j.Logger; @@ -42,32 +43,30 @@ public class TracingTransaction implements Transaction, Snapshot { private final TransactionImpl tx; private final long txid; - private Bytes triggerRow; - private Column triggerColumn; + private Notification notification; private Class clazz; private boolean committed = false; public TracingTransaction(TransactionImpl tx) { - this(tx, null, null, null); + this(tx, null, null); } public TracingTransaction(TransactionImpl tx, Class clazz) { - this(tx, null, null, clazz); + this(tx, null, clazz); } - public TracingTransaction(TransactionImpl tx, Bytes triggerRow, Column triggerColumn, - Class clazz) { + public TracingTransaction(TransactionImpl tx, Notification notification, Class clazz) { this.tx = tx; this.txid = tx.getStartTs(); - this.triggerRow = triggerRow; - this.triggerColumn = triggerColumn; + this.notification = notification; this.clazz = clazz; log.trace("txid: {} begin() thread: {}", txid, Thread.currentThread().getId()); - if (triggerRow != null) { - log.trace("txid: {} trigger: {} {}", txid, triggerRow, triggerColumn); + if (notification != null) { + log.trace("txid: {} trigger: {} {} {}", txid, notification.getRow(), + notification.getColumn(), notification.getTimestamp()); } if (clazz != null) { @@ -131,8 +130,9 @@ public void commit() throws CommitException { } catch (CommitException ce) { log.trace("txid: {} commit() -> UNSUCCESSFUL commitTs: {}", txid, tx.getStats().getCommitTs()); - if (!log.isTraceEnabled() && triggerRow != null) { - collisionLog.trace("txid: {} trigger: {} {}", txid, triggerRow, triggerColumn); + if (!log.isTraceEnabled() && notification != null) { + collisionLog.trace("txid: {} trigger: {} {} {}", txid, notification.getRow(), + notification.getColumn(), notification.getTimestamp()); } if (!log.isTraceEnabled() && clazz != null) { @@ -157,7 +157,7 @@ public void close() { // TODO log total # read, see fluo-426 summaryLog .trace( - "txid: {} thread : {} time: {} #ret: {} #set: {} #collisions: {} waitTime: {} %s committed: {} class: {}", + "txid: {} thread : {} time: {} #ret: {} #set: {} #collisions: {} waitTime: {} committed: {} class: {}", txid, Thread.currentThread().getId(), stats.getTime(), stats.getEntriesReturned(), stats.getEntriesSet(), stats.getCollisions(), stats.getLockWaitTime(), committed, className); diff --git a/modules/core/src/main/java/io/fluo/core/util/ColumnUtil.java b/modules/core/src/main/java/io/fluo/core/util/ColumnUtil.java index d9c5ff9e0..0151163ed 100644 --- a/modules/core/src/main/java/io/fluo/core/util/ColumnUtil.java +++ b/modules/core/src/main/java/io/fluo/core/util/ColumnUtil.java @@ -65,8 +65,6 @@ public static void commitColumn(Environment env, boolean isTrigger, boolean isPr if (isTrigger) { Flutation.put(env, m, col, ColumnConstants.ACK_PREFIX | startTs, TransactionImpl.EMPTY); - m.putDelete(ColumnConstants.NOTIFY_CF.toArray(), ColumnUtil.concatCFCQ(col), gv(env, col), - startTs); } if (observedColumns.contains(col) && isWrite && !isDelete) { m.put(ColumnConstants.NOTIFY_CF.toArray(), ColumnUtil.concatCFCQ(col), gv(env, col), diff --git a/modules/core/src/main/java/io/fluo/core/worker/NotificationFinder.java b/modules/core/src/main/java/io/fluo/core/worker/NotificationFinder.java index c203a1da7..4cddbc9cd 100644 --- a/modules/core/src/main/java/io/fluo/core/worker/NotificationFinder.java +++ b/modules/core/src/main/java/io/fluo/core/worker/NotificationFinder.java @@ -14,9 +14,8 @@ package io.fluo.core.worker; -import io.fluo.api.data.Bytes; -import io.fluo.api.data.Column; import io.fluo.core.impl.Environment; +import io.fluo.core.impl.Notification; public interface NotificationFinder { public void init(Environment env, NotificationProcessor processor); @@ -25,5 +24,5 @@ public interface NotificationFinder { public void stop(); - public void failedToProcess(Bytes row, Column col, TxResult status); + public void failedToProcess(Notification notification, TxResult status); } diff --git a/modules/core/src/main/java/io/fluo/core/worker/NotificationProcessor.java b/modules/core/src/main/java/io/fluo/core/worker/NotificationProcessor.java index 407fd31ef..543073e0a 100644 --- a/modules/core/src/main/java/io/fluo/core/worker/NotificationProcessor.java +++ b/modules/core/src/main/java/io/fluo/core/worker/NotificationProcessor.java @@ -24,10 +24,10 @@ import java.util.concurrent.TimeUnit; import com.codahale.metrics.Gauge; -import io.fluo.api.data.Bytes; import io.fluo.api.data.Column; import io.fluo.api.data.RowColumn; import io.fluo.core.impl.Environment; +import io.fluo.core.impl.Notification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,12 +113,10 @@ public synchronized void clear() { } - public boolean addNotification(NotificationFinder notificationFinder, final Bytes row, - final Column col) { + public boolean addNotification(NotificationFinder notificationFinder, + final Notification notification) { - final RowColumn rowCol = new RowColumn(row, col); - - final WorkTask workTask = new WorkTask(notificationFinder, env, row, col, observers); + final WorkTask workTask = new WorkTask(notificationFinder, env, notification, observers); Runnable eht = new Runnable() { @@ -127,9 +125,11 @@ public void run() { try { workTask.run(); } catch (Exception e) { - log.error("Failed to process work " + row + " " + col, e); + log.error( + "Failed to process work " + notification.getRow() + " " + notification.getColumn() + + " " + notification.getTimestamp(), e); } finally { - tracker.remove(rowCol); + tracker.remove(notification); workFinished(); } } @@ -137,7 +137,7 @@ public void run() { FutureTask ft = new FutureTask(eht, null); - if (!tracker.add(rowCol, ft)) { + if (!tracker.add(notification, ft)) { return false; } @@ -146,7 +146,7 @@ public void run() { try { executor.execute(eht); } catch (RejectedExecutionException rje) { - tracker.remove(rowCol); + tracker.remove(notification); workFinished(); throw rje; } diff --git a/modules/core/src/main/java/io/fluo/core/worker/WorkTask.java b/modules/core/src/main/java/io/fluo/core/worker/WorkTask.java index 1e491d58a..c803c3b29 100644 --- a/modules/core/src/main/java/io/fluo/core/worker/WorkTask.java +++ b/modules/core/src/main/java/io/fluo/core/worker/WorkTask.java @@ -15,12 +15,11 @@ package io.fluo.core.worker; import io.fluo.api.client.Transaction; -import io.fluo.api.data.Bytes; -import io.fluo.api.data.Column; import io.fluo.api.exceptions.CommitException; import io.fluo.api.observer.Observer; import io.fluo.core.exceptions.AlreadyAcknowledgedException; import io.fluo.core.impl.Environment; +import io.fluo.core.impl.Notification; import io.fluo.core.impl.TransactionImpl; import io.fluo.core.log.TracingTransaction; import org.slf4j.Logger; @@ -31,42 +30,39 @@ public class WorkTask implements Runnable { private static Logger log = LoggerFactory.getLogger(WorkTask.class); private Environment env; - private Bytes row; - private Column col; + private Notification notification; private Observers observers; private NotificationFinder notificationFinder; - WorkTask(NotificationFinder notificationFinder, Environment env, Bytes row, Column col, + WorkTask(NotificationFinder notificationFinder, Environment env, Notification notification, Observers observers) { this.notificationFinder = notificationFinder; this.env = env; - this.row = row; - this.col = col; + this.notification = notification; this.observers = observers; } @Override public void run() { - Observer observer = observers.getObserver(col); + Observer observer = observers.getObserver(notification.getColumn()); try { while (true) { TransactionImpl txi = null; Transaction tx = txi; TxResult status = TxResult.UNKNOWN; try { - txi = new TransactionImpl(env, row, col); + txi = new TransactionImpl(env, notification); tx = txi; - if (TracingTransaction.isTracingEnabled()) { - tx = new TracingTransaction(txi, row, col, observer.getClass()); - } + if (TracingTransaction.isTracingEnabled()) + tx = new TracingTransaction(txi, notification, observer.getClass()); - observer.process(tx, row, col); + observer.process(tx, notification.getRow(), notification.getColumn()); tx.commit(); status = TxResult.COMMITTED; break; } catch (AlreadyAcknowledgedException aae) { status = TxResult.AACKED; - notificationFinder.failedToProcess(row, col, status); + notificationFinder.failedToProcess(notification, status); break; } catch (CommitException e) { // retry @@ -74,7 +70,7 @@ public void run() { } catch (Exception e) { status = TxResult.ERROR; log.warn("Failed to execute observer " + observer.getClass().getSimpleName(), e); - notificationFinder.failedToProcess(row, col, status); + notificationFinder.failedToProcess(notification, status); break; } finally { if (txi != null) { diff --git a/modules/core/src/main/java/io/fluo/core/worker/finder/hash/HashNotificationFinder.java b/modules/core/src/main/java/io/fluo/core/worker/finder/hash/HashNotificationFinder.java index 2dc98b457..29f683f93 100644 --- a/modules/core/src/main/java/io/fluo/core/worker/finder/hash/HashNotificationFinder.java +++ b/modules/core/src/main/java/io/fluo/core/worker/finder/hash/HashNotificationFinder.java @@ -23,9 +23,8 @@ import com.google.common.base.Preconditions; import io.fluo.accumulo.util.ZookeeperPath; -import io.fluo.api.data.Bytes; -import io.fluo.api.data.Column; import io.fluo.core.impl.Environment; +import io.fluo.core.impl.Notification; import io.fluo.core.util.UtilWaitThread; import io.fluo.core.worker.NotificationFinder; import io.fluo.core.worker.NotificationProcessor; @@ -181,7 +180,7 @@ public void stop() { } @Override - public void failedToProcess(Bytes row, Column col, TxResult status) {} + public void failedToProcess(Notification notification, TxResult status) {} NotificationProcessor getWorkerQueue() { return notificationProcessor; diff --git a/modules/core/src/main/java/io/fluo/core/worker/finder/hash/ScanTask.java b/modules/core/src/main/java/io/fluo/core/worker/finder/hash/ScanTask.java index 6a590f4e3..a12635cba 100644 --- a/modules/core/src/main/java/io/fluo/core/worker/finder/hash/ScanTask.java +++ b/modules/core/src/main/java/io/fluo/core/worker/finder/hash/ScanTask.java @@ -28,6 +28,7 @@ import io.fluo.api.data.Bytes; import io.fluo.api.data.Column; import io.fluo.core.impl.Environment; +import io.fluo.core.impl.Notification; import io.fluo.core.util.ByteUtil; import io.fluo.core.util.UtilWaitThread; import io.fluo.core.worker.TabletInfoCache; @@ -81,6 +82,7 @@ public TabletData get() { max_sleep_time = env.getConfiguration().getInt(MAX_SLEEP_TIME_PROP, 5 * 60 * 1000); } + @Override public void run() { int qSize = hwf.getWorkerQueue().size(); @@ -109,6 +111,9 @@ public void run() { int count = 0; ModulusParams modParams = hwf.getModulusParams(); if (modParams != null) { + // notifications could have been asynchronously queued for deletion. Let that happen + // 1st before scanning + env.getSharedResources().getBatchWriter().waitForAsyncFlush(); count = scan(modParams, tabletInfo.getRange()); tabletsScanned++; } @@ -180,7 +185,7 @@ private int scan(ModulusParams lmp, Range range) throws TableNotFoundException { for (Entry entry : scanner) { if (lmp.update != hwf.getModulusParams().update) { - throw new ModParamsChangedException(); + throw new HashNotificationFinder.ModParamsChangedException(); } if (stopped.get()) { @@ -190,7 +195,8 @@ private int scan(ModulusParams lmp, Range range) throws TableNotFoundException { Bytes row = ByteUtil.toBytes(entry.getKey().getRowData()); List ca = Bytes.split(Bytes.of(entry.getKey().getColumnQualifierData().toArray())); Column col = new Column(ca.get(0), ca.get(1)); - if (hwf.getWorkerQueue().addNotification(hwf, row, col)) { + Notification notification = new Notification(row, col, entry.getKey().getTimestamp()); + if (hwf.getWorkerQueue().addNotification(hwf, notification)) { count++; } } diff --git a/modules/core/src/test/java/io/fluo/core/TestTransaction.java b/modules/core/src/test/java/io/fluo/core/TestTransaction.java index c906d5bba..ee6ac0ae7 100644 --- a/modules/core/src/test/java/io/fluo/core/TestTransaction.java +++ b/modules/core/src/test/java/io/fluo/core/TestTransaction.java @@ -14,6 +14,12 @@ package io.fluo.core; +import java.util.Map.Entry; + +import io.fluo.api.data.Span; + +import io.fluo.core.util.SpanUtil; +import io.fluo.accumulo.util.ColumnConstants; import io.fluo.api.client.TransactionBase; import io.fluo.api.data.Bytes; import io.fluo.api.data.Column; @@ -23,19 +29,52 @@ import io.fluo.api.types.TypedTransactionBase; import io.fluo.core.exceptions.AlreadyAcknowledgedException; import io.fluo.core.impl.Environment; +import io.fluo.core.impl.Notification; import io.fluo.core.impl.TransactionImpl; import io.fluo.core.impl.TransactionImpl.CommitData; import io.fluo.core.impl.TransactorNode; import io.fluo.core.impl.TxStats; +import io.fluo.core.util.ByteUtil; +import io.fluo.core.util.ColumnUtil; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.iterators.user.VersioningIterator; +import org.apache.hadoop.io.Text; public class TestTransaction extends TypedTransactionBase implements TransactionBase { private TransactionImpl tx; + public static long getNotificationTS(Environment env, String row, Column col) { + Scanner scanner; + try { + scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations()); + } catch (TableNotFoundException e) { + throw new RuntimeException(e); + } + IteratorSetting iterCfg = new IteratorSetting(20, "ver", VersioningIterator.class); + scanner.addScanIterator(iterCfg); + + Text cv = ByteUtil.toText(col.getVisibility()); + + scanner.setRange(SpanUtil.toRange(Span.prefix(row))); + scanner.fetchColumn(ByteUtil.toText(ColumnConstants.NOTIFY_CF), + new Text(ColumnUtil.concatCFCQ(col))); + + for (Entry entry : scanner) { + if (entry.getKey().getColumnVisibility().equals(cv)) { + return entry.getKey().getTimestamp(); + } + } + + throw new RuntimeException("No notification found"); + } + @SuppressWarnings("resource") public TestTransaction(Environment env, TransactorNode transactor) { this(new TransactionImpl(env).setTransactor(transactor), new StringEncoder()); @@ -50,12 +89,13 @@ public TestTransaction(TransactionImpl transactionImpl, StringEncoder stringEnco this.tx = transactionImpl; } - public TestTransaction(Environment env, Bytes trow, Column tcol) { - this(new TransactionImpl(env, trow, tcol), new StringEncoder()); + public TestTransaction(Environment env, String trow, Column tcol) { + this(env, trow, tcol, getNotificationTS(env, trow, tcol)); } - public TestTransaction(Environment env, String trow, Column tcol) { - this(new TransactionImpl(env, Bytes.of(trow), tcol), new StringEncoder()); + public TestTransaction(Environment env, String trow, Column tcol, long notificationTS) { + this(new TransactionImpl(env, new Notification(Bytes.of(trow), tcol, notificationTS)), + new StringEncoder()); } /** diff --git a/modules/core/src/test/java/io/fluo/core/impl/FailureIT.java b/modules/core/src/test/java/io/fluo/core/impl/FailureIT.java index 847b9a1ca..7b137b32f 100644 --- a/modules/core/src/test/java/io/fluo/core/impl/FailureIT.java +++ b/modules/core/src/test/java/io/fluo/core/impl/FailureIT.java @@ -417,7 +417,7 @@ public void testAcks() throws Exception { Assert.assertEquals("url0000", tx6.get().row("idx:def").fam("doc").qual("url").toString()); iter = scanner.iterator(); - Assert.assertFalse(iter.hasNext()); + Assert.assertTrue(iter.hasNext()); // TODO is tx4 start before tx5, then this test will not work because AlreadyAck is not thrown // for overlapping.. CommitException is thrown @@ -432,6 +432,11 @@ public void testAcks() throws Exception { Assert.fail(); } catch (AlreadyAcknowledgedException aae) { } + + // commit above should schedule async delete of notification + env.getSharedResources().getBatchWriter().waitForAsyncFlush(); + iter = scanner.iterator(); + Assert.assertFalse(iter.hasNext()); } @Test diff --git a/modules/core/src/test/java/io/fluo/core/impl/FluoIT.java b/modules/core/src/test/java/io/fluo/core/impl/FluoIT.java index 48ef925af..69af84c44 100644 --- a/modules/core/src/test/java/io/fluo/core/impl/FluoIT.java +++ b/modules/core/src/test/java/io/fluo/core/impl/FluoIT.java @@ -14,8 +14,13 @@ package io.fluo.core.impl; +import java.util.Arrays; import java.util.HashSet; +import java.util.List; +import io.fluo.api.config.ObserverConfiguration; +import io.fluo.api.types.TypedTransactionBase; +import io.fluo.api.types.TypedObserver; import io.fluo.api.client.FluoAdmin; import io.fluo.api.client.FluoClient; import io.fluo.api.client.FluoFactory; @@ -42,6 +47,25 @@ public class FluoIT extends ITBaseImpl { static TypeLayer typeLayer = new TypeLayer(new StringEncoder()); + public static class BalanceObserver extends TypedObserver { + + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(typeLayer.bc().fam("account").qual("balance").vis(), + NotificationType.STRONG); + } + + @Override + public void process(TypedTransactionBase tx, Bytes row, Column col) { + Assert.fail(); + } + } + + @Override + protected List getObservers() { + return Arrays.asList(new ObserverConfiguration(BalanceObserver.class.getName())); + }; + @Test public void testFluoFactory() throws Exception { try (FluoAdmin admin = FluoFactory.newAdmin(config)) { @@ -111,10 +135,17 @@ public void testOverlap1() throws Exception { private void assertCommitFails(TestTransaction tx) { try { - tx.commit(); + tx.done(); Assert.fail(); } catch (CommitException ce) { + } + } + private void assertAAck(TestTransaction tx) { + try { + tx.done(); + Assert.fail(); + } catch (AlreadyAcknowledgedException ce) { } } @@ -211,7 +242,7 @@ public void testAck() throws Exception { tx2.mutate().row("bob").col(balanceCol).set(11); tx1.done(); - assertCommitFails(tx2); + assertAAck(tx2); TestTransaction tx3 = new TestTransaction(env); @@ -232,9 +263,11 @@ public void testAck() throws Exception { tx5.get().row("joe").col(balanceCol); tx5.mutate().row("bob").col(balanceCol).set(11); + TestTransaction tx7 = new TestTransaction(env, "joe", balanceCol); + // make the 2nd transaction to start commit 1st tx5.done(); - assertCommitFails(tx4); + assertAAck(tx4); TestTransaction tx6 = new TestTransaction(env); @@ -242,16 +275,11 @@ public void testAck() throws Exception { Assert.assertEquals(21, tx6.get().row("joe").col(balanceCol).toInteger(0)); Assert.assertEquals(61, tx6.get().row("jill").col(balanceCol).toInteger(0)); - TestTransaction tx7 = new TestTransaction(env, "joe", balanceCol); tx7.get().row("joe").col(balanceCol); tx7.mutate().row("bob").col(balanceCol).set(15); tx7.mutate().row("jill").col(balanceCol).set(60); - try { - tx7.commit(); - Assert.fail(); - } catch (AlreadyAcknowledgedException aae) { - } + assertAAck(tx7); TestTransaction tx8 = new TestTransaction(env); @@ -275,6 +303,7 @@ public void testAck2() throws Exception { TestTransaction tx1 = new TestTransaction(env, "bob", balanceCol); TestTransaction tx2 = new TestTransaction(env, "bob", balanceCol); + TestTransaction tx3 = new TestTransaction(env, "bob", balanceCol); tx1.get().row("bob").col(balanceCol).toInteger(); tx2.get().row("bob").col(balanceCol).toInteger(); @@ -290,24 +319,55 @@ public void testAck2() throws Exception { CommitData cd = tx1.createCommitData(); Assert.assertTrue(tx1.preCommit(cd)); - try { - tx2.commit(); - } catch (CommitException ce) { - - } + assertCommitFails(tx2); long commitTs = env.getSharedResources().getOracleClient().getTimestamp(); Assert.assertTrue(tx1.commitPrimaryColumn(cd, commitTs)); tx1.finishCommit(cd, commitTs); - TestTransaction tx3 = new TestTransaction(env, "bob", balanceCol); tx3.mutate().row("bob").col(addrCol).set("2 loop pl"); - try { - tx3.commit(); - } catch (AlreadyAcknowledgedException e) { + assertAAck(tx3); + } - } + @Test + public void testAck3() throws Exception { + TestTransaction tx = new TestTransaction(env); + + Column balanceCol = typeLayer.bc().fam("account").qual("balance").vis(); + + tx.mutate().row("bob").col(balanceCol).set(10); + tx.mutate().row("joe").col(balanceCol).set(20); + tx.mutate().row("jill").col(balanceCol).set(60); + + tx.done(); + long notTS1 = TestTransaction.getNotificationTS(env, "bob", balanceCol); + + // this transaction should create a second notification + TestTransaction tx1 = new TestTransaction(env); + tx1.mutate().row("bob").col(balanceCol).set(11); + tx1.done(); + + long notTS2 = TestTransaction.getNotificationTS(env, "bob", balanceCol); + + Assert.assertTrue(notTS1 < notTS2); + + // even though there were two notifications and TX is using 1st notification TS... only 1st TX + // should execute + // google paper calls this message collapsing + + TestTransaction tx3 = new TestTransaction(env, "bob", balanceCol, notTS1); + + TestTransaction tx2 = new TestTransaction(env, "bob", balanceCol, notTS1); + Assert.assertEquals(11, tx2.get().row("bob").col(balanceCol).toInteger(0)); + tx2.done(); + + Assert.assertEquals(11, tx3.get().row("bob").col(balanceCol).toInteger(0)); + assertAAck(tx3); + + TestTransaction tx4 = new TestTransaction(env, "bob", balanceCol, notTS2); + Assert.assertEquals(11, tx4.get().row("bob").col(balanceCol).toInteger(0)); + assertAAck(tx4); } @Test @@ -335,7 +395,7 @@ public void testWriteObserved() throws Exception { tx1.mutate().row("jill").col(balanceCol).set(61); tx1.done(); - assertCommitFails(tx2); + assertAAck(tx2); TestTransaction tx3 = new TestTransaction(env); diff --git a/modules/core/src/test/java/io/fluo/core/impl/WeakNotificationOverlapIT.java b/modules/core/src/test/java/io/fluo/core/impl/WeakNotificationOverlapIT.java index a283eafeb..36ff6da70 100644 --- a/modules/core/src/test/java/io/fluo/core/impl/WeakNotificationOverlapIT.java +++ b/modules/core/src/test/java/io/fluo/core/impl/WeakNotificationOverlapIT.java @@ -201,6 +201,9 @@ public void testOverlap2() throws Exception { } private int countNotifications() throws Exception { + // deletes of notifications are queued async at end of transaction + env.getSharedResources().getBatchWriter().waitForAsyncFlush(); + Scanner scanner = conn.createScanner(getCurTableName(), Authorizations.EMPTY); scanner.fetchColumnFamily(ByteUtil.toText(ColumnConstants.NOTIFY_CF)); diff --git a/modules/core/src/test/java/io/fluo/core/log/LogIT.java b/modules/core/src/test/java/io/fluo/core/log/LogIT.java index 3aa39fdb7..23149e5ba 100644 --- a/modules/core/src/test/java/io/fluo/core/log/LogIT.java +++ b/modules/core/src/test/java/io/fluo/core/log/LogIT.java @@ -119,7 +119,7 @@ public void testCollisionLogging() throws Exception { pattern += ".*txid: \\1 collisions: \\Q{r1=[a b ]}\\E.*"; Assert.assertTrue(logMsgs.matches(pattern)); - pattern = ".*txid: (\\d+) trigger: \\d+ stat count"; + pattern = ".*txid: (\\d+) trigger: \\d+ stat count \\d+"; pattern += ".*txid: \\1 class: io.fluo.core.log.LogIT\\$TestObserver"; pattern += ".*txid: \\1 collisions: \\Q{all=[stat count ]}\\E.*"; Assert.assertTrue(logMsgs.matches(pattern)); @@ -158,19 +158,19 @@ public void testSummaryLogging() throws Exception { Assert .assertTrue(logMsgs - .matches(".*txid: \\d+ thread : \\d+ time: \\d+ #ret: 0 #set: 1 #collisions: 0 waitTime: \\d+ %s committed: true class: TriggerLoader.*")); + .matches(".*txid: \\d+ thread : \\d+ time: \\d+ #ret: 0 #set: 1 #collisions: 0 waitTime: \\d+ committed: true class: TriggerLoader.*")); Assert .assertTrue(logMsgs - .matches(".*txid: \\d+ thread : \\d+ time: \\d+ #ret: 1 #set: 1 #collisions: 0 waitTime: \\d+ %s committed: true class: SimpleLoader.*")); + .matches(".*txid: \\d+ thread : \\d+ time: \\d+ #ret: 1 #set: 1 #collisions: 0 waitTime: \\d+ committed: true class: SimpleLoader.*")); Assert .assertTrue(logMsgs - .matches(".*txid: \\d+ thread : \\d+ time: \\d+ #ret: 1 #set: 1 #collisions: 1 waitTime: \\d+ %s committed: false class: SimpleLoader.*")); + .matches(".*txid: \\d+ thread : \\d+ time: \\d+ #ret: 1 #set: 1 #collisions: 1 waitTime: \\d+ committed: false class: SimpleLoader.*")); Assert .assertTrue(logMsgs - .matches(".*txid: \\d+ thread : \\d+ time: \\d+ #ret: 2 #set: 2 #collisions: 0 waitTime: \\d+ %s committed: true class: TestObserver.*")); + .matches(".*txid: \\d+ thread : \\d+ time: \\d+ #ret: 2 #set: 1 #collisions: 0 waitTime: \\d+ committed: true class: TestObserver.*")); Assert .assertTrue(logMsgs - .matches(".*txid: \\d+ thread : \\d+ time: \\d+ #ret: 2 #set: 2 #collisions: 1 waitTime: \\d+ %s committed: false class: TestObserver.*")); + .matches(".*txid: \\d+ thread : \\d+ time: \\d+ #ret: 2 #set: 1 #collisions: 1 waitTime: \\d+ committed: false class: TestObserver.*")); } @Test @@ -209,7 +209,6 @@ public void testAllLogging() throws Exception { } String logMsgs = writer.toString(); - System.out.println(logMsgs); logMsgs = logMsgs.replace('\n', ' '); String pattern; @@ -234,7 +233,7 @@ public void testAllLogging() throws Exception { // observer should cause this pattern in logs pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+"; - pattern += ".*txid: \\1 trigger: 0 stat count"; + pattern += ".*txid: \\1 trigger: 0 stat count \\d+"; pattern += ".*txid: \\1 class: io.fluo.core.log.LogIT\\$TestObserver"; pattern += ".*txid: \\1 get\\(0, stat count \\) -> 1"; pattern += ".*txid: \\1 get\\(all, stat count \\) -> null";