diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index f03ffe628ea0..5f89d3898a13 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -96,7 +96,7 @@ import static org.apache.cassandra.utils.Throwables.maybeFail; -public class ColumnFamilyStore implements ColumnFamilyStoreMBean +public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Comparable { private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); @@ -2629,4 +2629,13 @@ public boolean getNeverPurgeTombstones() { return neverPurgeTombstones; } + + @Override + public int compareTo(ColumnFamilyStore other) + { + return ComparisonChain.start() + .compare(this.keyspace.getName(), other.keyspace.getName()) + .compare(this.name, other.name) + .result(); + } } diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractIteratingTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractIteratingTable.java new file mode 100644 index 000000000000..bff77b52ce33 --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/AbstractIteratingTable.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.EmptyIterators; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator; +import org.apache.cassandra.db.rows.BTreeRow; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Rows; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import com.google.common.base.Preconditions; +import com.google.common.collect.AbstractIterator; + +/** + * An abstract virtual table that will iteratively build its rows. This is + * for when the data set is too large to fit in memory. It requires that the partition + * keys are provided in the order of the partitioner of the table metadata. + */ +public abstract class AbstractIteratingTable implements VirtualTable +{ + final protected TableMetadata metadata; + + protected AbstractIteratingTable(TableMetadata metadata) + { + this.metadata = metadata; + } + + /** + * @param partitionKey + * @return boolean if the partition key would exist in this table + */ + protected abstract boolean hasKey(DecoratedKey partitionKey); + + /** + * Returns an in order iterator (metadata.partitioner) of decorated partition keys for this table. A DataRange is + * provided and all the keys for that range must be provided, but it is not required that the keys fall in this + * range. If your partition key set is small enough it is Ok to provide entire set. + * + * @param dataRange + * optional range of keys to return + * @return Iterator of keys in token order + */ + protected abstract Iterator getPartitionKeys(DataRange dataRange); + + /** + * @param isReversed if orderby reverse requested + * @param key partition key + * @param columns queried columns + * @return iterator of rows in order for a given partition key + */ + protected abstract Iterator getRows(boolean isReversed, DecoratedKey key, RegularAndStaticColumns columns); + + @Override + // eclipse warnings doesnt like returning closeable iterators when created anonymously + @SuppressWarnings("resource") + public UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringFilter, + ColumnFilter columnFilter) + { + if (!hasKey(partitionKey)) + { + return EmptyIterators.unfilteredPartition(metadata); + } + Iterator iter = getRows(clusteringFilter.isReversed(), partitionKey, columnFilter.queriedColumns()); + if (iter == null || !iter.hasNext()) + { + return EmptyIterators.unfilteredPartition(metadata); + } + UnfilteredRowIterator partition = new AbstractUnfilteredRowIterator(metadata, + partitionKey, + DeletionTime.LIVE, + columnFilter.queriedColumns(), + Rows.EMPTY_STATIC_ROW, + false, + EncodingStats.NO_STATS) + { + protected Unfiltered computeNext() + { + while (iter.hasNext()) + { + Row row = iter.next(); + if (clusteringFilter.selects(row.clustering())) + return row; + } + return endOfData(); + } + }; + return new SingletonUnfilteredPartitionIterator(partition); + } + + @Override + public UnfilteredPartitionIterator select(DataRange dataRange, ColumnFilter columnFilter) + { + Iterator iter = getPartitionKeys(dataRange); + return partitionIterator(new AbstractIterator() + { + protected UnfilteredRowIterator computeNext() + { + Token last = metadata.partitioner.getMinimumToken(); + while (iter.hasNext()) + { + DecoratedKey key = iter.next(); + Preconditions.checkArgument(last.compareTo(key.getToken()) <= 0, "Keys out of order"); + last = key.getToken(); + if (dataRange.contains(key)) + { + return makePartition(key, dataRange, columnFilter); + } + } + return endOfData(); + } + }); + } + + private UnfilteredRowIterator makePartition(DecoratedKey key, DataRange dataRange, ColumnFilter columnFilter) + { + return new AbstractUnfilteredRowIterator(metadata, + key, + DeletionTime.LIVE, + columnFilter.queriedColumns(), + Rows.EMPTY_STATIC_ROW, + false, + EncodingStats.NO_STATS) + { + Iterator iter = null; + ClusteringIndexFilter clusteringFilter = null;; + protected Unfiltered computeNext() + { + if (iter == null) + { + clusteringFilter = dataRange.clusteringIndexFilter(key); + iter = getRows(clusteringFilter.isReversed(), key, columnFilter.queriedColumns()); + } + + while (iter.hasNext()) + { + Row row = iter.next(); + if (clusteringFilter.selects(row.clustering())) + return row; + } + return endOfData(); + } + }; + } + + private UnfilteredPartitionIterator partitionIterator(Iterator partitions) + { + return new AbstractUnfilteredPartitionIterator() + { + public UnfilteredRowIterator next() + { + return partitions.next(); + } + + public boolean hasNext() + { + return partitions.hasNext(); + } + + public TableMetadata metadata() + { + return metadata; + } + }; + } + + public TableMetadata metadata() + { + return this.metadata; + } + + public void apply(PartitionUpdate update) + { + throw new InvalidRequestException("Modification is not supported by table " + metadata); + } + + protected RowBuilder row(Object... clusteringValues) + { + if (clusteringValues.length == 0) + return new RowBuilder(Clustering.EMPTY); + + ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length]; + for (int i = 0; i < clusteringValues.length; i++) + clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); + return new RowBuilder(Clustering.make(clusteringByteBuffers)); + } + + protected class RowBuilder + { + private final Clustering clustering; + + private final Map values = new HashMap<>(); + + private RowBuilder(Clustering clustering) + { + this.clustering = clustering; + } + + public RowBuilder add(String columnName, Object value) + { + ColumnMetadata column = metadata.getColumn(ByteBufferUtil.bytes(columnName)); + if (null == column || !column.isRegular()) + throw new IllegalArgumentException(); + values.put(column, value); + return this; + } + + public Row build(RegularAndStaticColumns columns) + { + int now = FBUtilities.nowInSeconds(); + Row.Builder builder = BTreeRow.unsortedBuilder((int) TimeUnit.MILLISECONDS.toSeconds(now)); + builder.newRow(clustering); + + columns.forEach(c -> + { + Object value = values.get(c); + if (null != value) + builder.addCell(BufferCell.live(c, now, decompose(c.type, value))); + }); + + return builder.build(); + } + } + + @SuppressWarnings("unchecked") + private static ByteBuffer decompose(AbstractType type, T value) + { + return ((AbstractType) type).decompose(value); + } +} diff --git a/src/java/org/apache/cassandra/db/virtual/SSTableDumpTable.java b/src/java/org/apache/cassandra/db/virtual/SSTableDumpTable.java new file mode 100644 index 000000000000..057dbedf7ccf --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/SSTableDumpTable.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import com.google.common.base.Preconditions; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; + +final class SSTableDumpTable extends AbstractIteratingTable +{ + private static final String KEYSPACE_NAME = "keyspace_name"; + private final static String TABLE_NAME = "table_name"; + private static final String ID = "id"; + private static final String PK = "partition_key"; + private static final String CK = "clustering"; + private static final IPartitioner partitioner = new LocalPartitioner(CompositeType.getInstance(UTF8Type.instance, UTF8Type.instance)); + + private static final String RAW = "raw"; + + SSTableDumpTable(String keyspace) + { + super(TableMetadata.builder(keyspace, "sstable_dump") + .comment("current sstables and debug dump of their data") + .kind(TableMetadata.Kind.VIRTUAL) + .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance) + .addPartitionKeyColumn(TABLE_NAME, UTF8Type.instance) + .partitioner(partitioner) + .addClusteringColumn(ID, Int32Type.instance) + .addClusteringColumn(PK, UTF8Type.instance) + .addClusteringColumn(CK, UTF8Type.instance) + .addRegularColumn(RAW, UTF8Type.instance) + .build()); + } + + private String getKeyspace(DecoratedKey key) + { + CompositeType type = (CompositeType) metadata.partitionKeyType; + ByteBuffer[] bb = type.split(key.getKey()); + + return type.types.get(0).getString(bb[0]); + } + + private String getTable(DecoratedKey key) + { + CompositeType type = (CompositeType) metadata.partitionKeyType; + ByteBuffer[] bb = type.split(key.getKey()); + return type.types.get(1).getString(bb[1]); + } + + private DecoratedKey makeKey(String keyspace, String table) + { + ByteBuffer partitionKey = ((CompositeType) metadata.partitionKeyType).decompose(keyspace, table); + return metadata.partitioner.decorateKey(partitionKey); + } + + protected boolean hasKey(DecoratedKey key) + { + return Schema.instance.getTableMetadata(getKeyspace(key), getTable(key)) != null; + } + + public Iterator getPartitionKeys(DataRange range) + { + Iterator cfs = Ordering.natural().sortedCopy(ColumnFamilyStore.all()).iterator(); + return new AbstractIterator() + { + protected DecoratedKey computeNext() + { + if (!cfs.hasNext()) + return endOfData(); + ColumnFamilyStore next = cfs.next(); + return makeKey(next.keyspace.getName(), next.name); + } + }; + } + + protected Iterator getRows(boolean isReversed, DecoratedKey key, RegularAndStaticColumns columns) + { + + ColumnFamilyStore cfs = Schema.instance.getKeyspaceInstance(getKeyspace(key)) + .getColumnFamilyStore(getTable(key)); + List generations = Lists.newArrayList(cfs.getSSTables(SSTableSet.CANONICAL)); + Collections.sort(generations, Comparator.comparingInt(s -> s.descriptor.generation)); + if (isReversed) + Collections.reverse(generations); + Iterator iter = generations.iterator(); + return new AbstractIterator() + { + SSTableReader current = null; + ISSTableScanner scanner = null; + UnfilteredRowIterator partition = null; + boolean checkedStatic = false; + + private String pk() + { + Preconditions.checkNotNull(current, "Current sstable cannot be null"); + Preconditions.checkNotNull(partition, "Partition cannot be null"); + + return current.metadata().partitionKeyType.getString(partition.partitionKey().getKey()); + } + + protected Row computeNext() + { + // return any static rows for partition first + if (!checkedStatic && partition != null && !partition.staticRow().isEmpty()) + { + checkedStatic = true; + Unfiltered row = partition.staticRow(); + return row(current.descriptor.generation, pk(), "") + .add(RAW, row.toString(current.metadata(), false, true)) + .build(columns); + } + + // return any rows left in the partition + if (partition != null && partition.hasNext()) + { + Unfiltered row = partition.next(); + String ck = row.clustering().toString(current.metadata()); + return row(current.descriptor.generation, pk(), ck) + .add(RAW, row.toString(current.metadata(), false, true)) + .build(columns); + } + + // find next partition + if (scanner != null && scanner.hasNext()) + { + partition = scanner.next(); + checkedStatic = false; + + // if its a partition level deletion return that first + if (!partition.partitionLevelDeletion().isLive()) + { + String raw = partition.partitionLevelDeletion().toString(); + return row(current.descriptor.generation, pk(), "") + .add(RAW, raw) + .build(columns); + } + return computeNext(); + } + + // find next sstable + if (iter.hasNext()) + { + current = iter.next(); + scanner = current.getScanner(); + return computeNext(); + } + return endOfData(); + } + }; + } +} diff --git a/src/java/org/apache/cassandra/db/virtual/SSTablesTable.java b/src/java/org/apache/cassandra/db/virtual/SSTablesTable.java new file mode 100644 index 000000000000..6fd6961efe6c --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/SSTablesTable.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.marshal.BooleanType; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; + +final class SSTablesTable extends AbstractIteratingTable +{ + private static final IPartitioner partitioner = new LocalPartitioner(CompositeType.getInstance(UTF8Type.instance, UTF8Type.instance)); + private static final String KEYSPACE_NAME = "keyspace_name"; + private final static String TABLE_NAME = "table_name"; + private static final String ID = "id"; + + private static final String ON_DISK_SIZE = "on_disk_size"; + private static final String IS_REPAIRED = "is_repaired"; + private static final String ESTIMATED_PARTITIONS = "estimated_partitions"; + private static final String MAX_TTL = "max_ttl"; + private static final String MIN_TTL = "min_ttl"; + private static final String READS = "reads"; + private static final String MAX_LOCAL_DELETION_TIME = "max_local_deletion_time"; + private static final String MIN_LOCAL_DELETION_TIME = "min_local_deletion_time"; + private static final String MAX_TIMESTAMP = "max_timestamp"; + private static final String MIN_TIMESTAMP = "min_timestamp"; + private static final String MAX_PARTITION_SIZE = "max_partition_size"; + private static final String LEVEL = "level"; + private static final String LAST_TOKEN = "last_token"; + private static final String FIRST_TOKEN = "first_token"; + + SSTablesTable(String keyspace) + { + super(TableMetadata.builder(keyspace, "sstables") + .comment("current sstables with some metadata") + .kind(TableMetadata.Kind.VIRTUAL) + .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance) + .addPartitionKeyColumn(TABLE_NAME, UTF8Type.instance) + .partitioner(partitioner) + .addClusteringColumn(ID, Int32Type.instance) + .addRegularColumn(ON_DISK_SIZE, LongType.instance) + .addRegularColumn(IS_REPAIRED, BooleanType.instance) + .addRegularColumn(ESTIMATED_PARTITIONS, LongType.instance) + .addRegularColumn(MAX_TTL, Int32Type.instance) + .addRegularColumn(MIN_TTL, Int32Type.instance) + .addRegularColumn(LEVEL, Int32Type.instance) + .addRegularColumn(READS, LongType.instance) + .addRegularColumn(MAX_PARTITION_SIZE, LongType.instance) + .addRegularColumn(MAX_LOCAL_DELETION_TIME, Int32Type.instance) + .addRegularColumn(MIN_LOCAL_DELETION_TIME, Int32Type.instance) + .addRegularColumn(MAX_TIMESTAMP, LongType.instance) + .addRegularColumn(MIN_TIMESTAMP, LongType.instance) + .addRegularColumn(FIRST_TOKEN, UTF8Type.instance) + .addRegularColumn(LAST_TOKEN, UTF8Type.instance) + .build()); + } + + private String getKeyspace(DecoratedKey key) + { + CompositeType type = (CompositeType) metadata.partitionKeyType; + ByteBuffer[] bb = type.split(key.getKey()); + + return type.types.get(0).getString(bb[0]); + } + + private String getTable(DecoratedKey key) + { + CompositeType type = (CompositeType) metadata.partitionKeyType; + ByteBuffer[] bb = type.split(key.getKey()); + return type.types.get(1).getString(bb[1]); + } + + private DecoratedKey makeKey(String keyspace, String table) + { + ByteBuffer partitionKey = ((CompositeType) metadata.partitionKeyType).decompose(keyspace, table); + return metadata.partitioner.decorateKey(partitionKey); + } + + protected boolean hasKey(DecoratedKey key) + { + return Schema.instance.getTableMetadata(getKeyspace(key), getTable(key)) != null; + } + + public Iterator getPartitionKeys(DataRange range) + { + Iterator cfs = Ordering.natural().sortedCopy(ColumnFamilyStore.all()).iterator(); + return new AbstractIterator() + { + protected DecoratedKey computeNext() + { + if (!cfs.hasNext()) + return endOfData(); + ColumnFamilyStore next = cfs.next(); + return makeKey(next.keyspace.getName(), next.name); + } + }; + } + + protected Iterator getRows(boolean isReversed, DecoratedKey key, RegularAndStaticColumns columns) + { + ColumnFamilyStore cfs = Schema.instance.getKeyspaceInstance(getKeyspace(key)) + .getColumnFamilyStore(getTable(key)); + List sorted = Lists.newArrayList(cfs.getSSTables(SSTableSet.CANONICAL)); + Collections.sort(sorted, Comparator.comparingInt(s -> s.descriptor.generation)); + if (isReversed) + Collections.reverse(sorted); + return Iterators.transform(sorted.iterator(), sstable -> + row(sstable.descriptor.generation) + .add(FIRST_TOKEN, sstable.first.getToken().toString()) + .add(LAST_TOKEN, sstable.last.getToken().toString()) + .add(MIN_TIMESTAMP, sstable.getMinTimestamp()) + .add(MAX_TIMESTAMP, sstable.getMaxTimestamp()) + .add(MIN_LOCAL_DELETION_TIME, sstable.getMinLocalDeletionTime()) + .add(MAX_LOCAL_DELETION_TIME, sstable.getMaxLocalDeletionTime()) + .add(READS, sstable.getReadMeter() == null ? 0 : sstable.getReadMeter().count()) + .add(MIN_TTL, sstable.getMinTTL()) + .add(MAX_TTL, sstable.getMaxTTL()) + .add(ESTIMATED_PARTITIONS, sstable.estimatedKeys()) + .add(IS_REPAIRED, sstable.isRepaired()) + .add(ON_DISK_SIZE, sstable.onDiskLength()) + .add(MAX_PARTITION_SIZE, sstable.getEstimatedPartitionSize().max()) + .add(LEVEL, sstable.getSSTableLevel()) + .build(columns)); + } +} diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java index 53c01a4bf59d..389f04d692cb 100644 --- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java @@ -27,6 +27,6 @@ public final class SystemViewsKeyspace extends VirtualKeyspace private SystemViewsKeyspace() { - super(NAME, ImmutableList.of(new SSTableTasksTable(NAME), new ClientsTable(NAME))); + super(NAME, ImmutableList.of(new SSTableTasksTable(NAME), new ClientsTable(NAME), new SSTableDumpTable(NAME), new SSTablesTable(NAME))); } } diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java index a503a60577d6..068d04e8dfb4 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java @@ -17,34 +17,48 @@ */ package org.apache.cassandra.cql3.validation.entities; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; -import com.google.common.collect.ImmutableList; -import org.junit.BeforeClass; -import org.junit.Test; - import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.virtual.AbstractVirtualTable; import org.apache.cassandra.db.virtual.SimpleDataSet; +import org.apache.cassandra.db.virtual.AbstractIteratingTable; import org.apache.cassandra.db.virtual.VirtualKeyspace; import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; import org.apache.cassandra.db.virtual.VirtualTable; +import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.StorageServiceMBean; import org.apache.cassandra.triggers.ITrigger; +import org.junit.BeforeClass; +import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; public class VirtualTableTest extends CQLTester { @@ -87,6 +101,14 @@ public void apply(PartitionUpdate update) } } + private static DecoratedKey makeKey(TableMetadata metadata, String... partitionKeyValues) + { + ByteBuffer partitionKey = partitionKeyValues.length == 1 + ? UTF8Type.instance.decompose(partitionKeyValues[0]) + : ((CompositeType) metadata.partitionKeyType).decompose(partitionKeyValues); + return metadata.partitioner.decorateKey(partitionKey); + } + @BeforeClass public static void setUpClass() { @@ -116,97 +138,147 @@ public DataSet data() } }; VirtualTable vt2 = new WritableVirtualTable(KS_NAME, VT2_NAME); + TableMetadata vt3metadata = TableMetadata.builder(KS_NAME, "vt3") + .kind(TableMetadata.Kind.VIRTUAL) + .partitioner(new LocalPartitioner(UTF8Type.instance)) + .addPartitionKeyColumn("pk", UTF8Type.instance) + .addClusteringColumn("c", UTF8Type.instance) + .addRegularColumn("v1", Int32Type.instance) + .addRegularColumn("v2", LongType.instance) + .build(); + final List vt3keys = Lists.newArrayList( + makeKey(vt3metadata, "pk1"), + makeKey(vt3metadata, "pk2")); + + VirtualTable vt3 = new AbstractIteratingTable(vt3metadata) + { + protected Iterator getPartitionKeys(DataRange dataRange) + { + return vt3keys.iterator(); + } - VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(vt1, vt2))); + protected Iterator getRows(boolean isReversed, DecoratedKey key, RegularAndStaticColumns columns) + { + String value = metadata.partitionKeyType.getString(key.getKey()); + int pk = Integer.parseInt(value.substring(2)); + List rows = Lists.newArrayList( + row("c1").add("v1", 10 * pk + 1).add("v2", (long) (10 * pk + 1)).build(columns), + row("c2").add("v1", 10 * pk + 2).add("v2", (long) (10 * pk + 2)).build(columns), + row("c3").add("v1", 10 * pk + 3).add("v2", (long) (10 * pk + 3)).build(columns)); + if (isReversed) + { + Collections.reverse(rows); + } + return rows.iterator(); + } + + protected boolean hasKey(DecoratedKey partitionKey) + { + return vt3keys.contains(partitionKey); + } + }; + + VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(vt1, vt2, vt3))); CQLTester.setUpClass(); } - @Test - public void testQueries() throws Throwable + public void testQueries(String table) throws Throwable { - assertRowsNet(executeNet("SELECT * FROM test_virtual_ks.vt1 WHERE pk = 'UNKNOWN'")); + assertRowsNet(executeNet("SELECT * FROM test_virtual_ks."+table+" WHERE pk = 'UNKNOWN'")); - assertRowsNet(executeNet("SELECT * FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c = 'UNKNOWN'")); + assertRowsNet(executeNet("SELECT * FROM test_virtual_ks."+table+" WHERE pk = 'pk1' AND c = 'UNKNOWN'")); // Test DISTINCT query - assertRowsNet(executeNet("SELECT DISTINCT pk FROM test_virtual_ks.vt1"), + assertRowsNet(executeNet("SELECT DISTINCT pk FROM test_virtual_ks."+table), row("pk1"), row("pk2")); - assertRowsNet(executeNet("SELECT DISTINCT pk FROM test_virtual_ks.vt1 WHERE token(pk) > token('pk1')"), + assertRowsNet(executeNet("SELECT DISTINCT pk FROM test_virtual_ks."+table+" WHERE token(pk) > token('pk1')"), row("pk2")); // Test single partition queries - assertRowsNet(executeNet("SELECT v1, v2 FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c = 'c1'"), + assertRowsNet(executeNet("SELECT v1, v2 FROM test_virtual_ks."+table+" WHERE pk = 'pk1' AND c = 'c1'"), row(11, 11L)); - assertRowsNet(executeNet("SELECT c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c IN ('c1', 'c2')"), + assertRowsNet(executeNet("SELECT c, v1, v2 FROM test_virtual_ks."+table+" WHERE pk = 'pk1' AND c IN ('c1', 'c2')"), row("c1", 11, 11L), row("c2", 12, 12L)); - assertRowsNet(executeNet("SELECT c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk = 'pk1' AND c IN ('c2', 'c1') ORDER BY c DESC"), + assertRowsNet(executeNet("SELECT c, v1, v2 FROM test_virtual_ks."+table+" WHERE pk = 'pk1' AND c IN ('c2', 'c1') ORDER BY c DESC"), row("c2", 12, 12L), row("c1", 11, 11L)); // Test multi-partition queries - assertRows(execute("SELECT * FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')"), + assertRows(execute("SELECT * FROM test_virtual_ks."+table+" WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')"), row("pk1", "c1", 11, 11L), row("pk1", "c2", 12, 12L), row("pk2", "c1", 21, 21L), row("pk2", "c2", 22, 22L)); - assertRows(execute("SELECT pk, c, v1 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') ORDER BY c DESC"), + assertRows(execute("SELECT pk, c, v1 FROM test_virtual_ks."+table+" WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') ORDER BY c DESC"), row("pk1", "c2", 12), row("pk2", "c2", 22), row("pk1", "c1", 11), row("pk2", "c1", 21)); - assertRows(execute("SELECT pk, c, v1 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') ORDER BY c DESC LIMIT 1"), + assertRows(execute("SELECT pk, c, v1 FROM test_virtual_ks."+table+" WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') ORDER BY c DESC LIMIT 1"), row("pk1", "c2", 12)); - assertRows(execute("SELECT c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1' , 'c3') ORDER BY c DESC PER PARTITION LIMIT 1"), + assertRows(execute("SELECT c, v1, v2 FROM test_virtual_ks."+table+" WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1' , 'c3') ORDER BY c DESC PER PARTITION LIMIT 1"), row("c3", 13, 13L), row("c3", 23, 23L)); - assertRows(execute("SELECT count(*) FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')"), + assertRows(execute("SELECT count(*) FROM test_virtual_ks."+table+" WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')"), row(4L)); for (int pageSize = 1; pageSize < 5; pageSize++) { - assertRowsNet(executeNetWithPaging("SELECT pk, c, v1, v2 FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')", pageSize), + assertRowsNet(executeNetWithPaging("SELECT pk, c, v1, v2 FROM test_virtual_ks."+table+" WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')", pageSize), row("pk1", "c1", 11, 11L), row("pk1", "c2", 12, 12L), row("pk2", "c1", 21, 21L), row("pk2", "c2", 22, 22L)); - assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') LIMIT 2", pageSize), + assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks."+table+" WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1') LIMIT 2", pageSize), row("pk1", "c1", 11, 11L), row("pk1", "c2", 12, 12L)); - assertRowsNet(executeNetWithPaging("SELECT count(*) FROM test_virtual_ks.vt1 WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')", pageSize), + assertRowsNet(executeNetWithPaging("SELECT count(*) FROM test_virtual_ks."+table+" WHERE pk IN ('pk2', 'pk1') AND c IN ('c2', 'c1')", pageSize), row(4L)); } // Test range queries for (int pageSize = 1; pageSize < 4; pageSize++) { - assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks.vt1 WHERE token(pk) < token('pk2') AND c IN ('c2', 'c1') ALLOW FILTERING", pageSize), + assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks."+table+" WHERE token(pk) < token('pk2') AND c IN ('c2', 'c1') ALLOW FILTERING", pageSize), row("pk1", "c1", 11, 11L), row("pk1", "c2", 12, 12L)); - assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks.vt1 WHERE token(pk) < token('pk2') AND c IN ('c2', 'c1') LIMIT 1 ALLOW FILTERING", pageSize), + assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks."+table+" WHERE token(pk) < token('pk2') AND c IN ('c2', 'c1') LIMIT 1 ALLOW FILTERING", pageSize), row("pk1", "c1", 11, 11L)); - assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks.vt1 WHERE token(pk) <= token('pk2') AND c > 'c1' PER PARTITION LIMIT 1 ALLOW FILTERING", pageSize), + assertRowsNet(executeNetWithPaging("SELECT * FROM test_virtual_ks."+table+" WHERE token(pk) <= token('pk2') AND c > 'c1' PER PARTITION LIMIT 1 ALLOW FILTERING", pageSize), row("pk1", "c2", 12, 12L), row("pk2", "c2", 22, 22L)); - assertRowsNet(executeNetWithPaging("SELECT count(*) FROM test_virtual_ks.vt1 WHERE token(pk) = token('pk2') AND c < 'c3' ALLOW FILTERING", pageSize), + assertRowsNet(executeNetWithPaging("SELECT count(*) FROM test_virtual_ks."+table+" WHERE token(pk) = token('pk2') AND c < 'c3' ALLOW FILTERING", pageSize), row(2L)); } } + @Test + public void testInMemoryQueries() throws Throwable + { + testQueries("vt1"); + } + + @Test + public void testIteratorTable() throws Throwable + { + testQueries("vt3"); + } + @Test public void testModifications() throws Throwable {