From 7a065edfe0cb54c1d4fbea7caae325609dfb309e Mon Sep 17 00:00:00 2001 From: vivekwiz Date: Mon, 13 Nov 2017 12:46:27 +0530 Subject: [PATCH 01/14] Revert this. This is a workaround to handle insert like update. --- .../distributed/metadata/UpdateQueryInfo.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/metadata/UpdateQueryInfo.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/metadata/UpdateQueryInfo.java index a2e0fdf65..8b9887761 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/metadata/UpdateQueryInfo.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/metadata/UpdateQueryInfo.java @@ -166,19 +166,19 @@ private void checkUpdateFormatSupported() throws StandardException { if (cqi.isTableInfoMissing()) { cqi.setMissingTableInfo(tqi); } - if (cqi.isUsedInPartitioning()) { - throw StandardException.newException(SQLState.NOT_IMPLEMENTED, - "Update of partitioning column not supported"); - } +// if (cqi.isUsedInPartitioning()) { +// throw StandardException.newException(SQLState.NOT_IMPLEMENTED, +// "Update of partitioning column not supported"); +// } // For the time being since we are allowing only one table // the columns are necessarily part of the lone TableQueryInfo // Later the check needs to be more robust - if (cqi.isPartOfPrimaryKey(tqi.getPrimaryKeyColumns())) { - throw StandardException - .newException( - SQLState.NOT_IMPLEMENTED, - "Update of column which is primary key or is part of the primary key, not supported"); - } +// if (cqi.isPartOfPrimaryKey(tqi.getPrimaryKeyColumns())) { +// throw StandardException +// .newException( +// SQLState.NOT_IMPLEMENTED, +// "Update of column which is primary key or is part of the primary key, not supported"); +// } /* No need to go through function execution for unique constraints if(checkForColumnConstr && cqi.isReferencedByUniqueConstraint()) { checkForColumnConstr = false; From 6a467742807fd6b08b3c481e23558fea67773396 Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Sat, 3 Mar 2018 21:27:27 +0530 Subject: [PATCH 02/14] Performance and functional fixes in smart connector - force decompress the underlying buffer in BlobChunk if remote node is on same host - increment reference in BlobChunk only when writing and decrement at the end of write; earlier it was incrementing in constructor which would lead to incorrect ref count if the BlobChunk was never written (e.g. filtered out in the query) - added free calls to BlobChunk in all relevant places when assigning new buffers - add IP address to the server host names (host/address[port]) in GET_TABLE_METADATA - added "decompressedReplaced" gfs stat to note the decompressions that were able to replace the underlying buffer ColumnFormatValue - allow for null cache for LowMemoryException (in case of connector mode) --- .../internal/cache/CachePerfStats.java | 10 ++++ .../internal/cache/DummyCachePerfStats.java | 4 ++ .../gemfire/internal/cache/LocalRegion.java | 11 +++- .../ddl/catalog/GfxdSystemProcedures.java | 8 +-- .../distributed/GfxdDistributionAdvisor.java | 2 +- .../java/io/snappydata/thrift/BlobChunk.java | 51 ++++++++++++------- .../io/snappydata/thrift/HostAddress.java | 8 +++ .../thrift/common/BlobChunk.java.tmpl | 51 ++++++++++++------- .../thrift/common/HostAddress.java.tmpl | 8 +++ 9 files changed, 108 insertions(+), 45 deletions(-) diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java index b70e11128..7f1bb370c 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java @@ -171,6 +171,7 @@ public class CachePerfStats implements HashingStats { protected static final int compressionSkippedId; protected static final int compressionSkippedTimeId; protected static final int compressionSkippedBytesId; + protected static final int compressionDecompressedReplacedId; protected static final int compressionDecompressedReplaceSkippedId; protected static final int compressionCompressedReplaceSkippedId; @@ -297,6 +298,8 @@ public class CachePerfStats implements HashingStats { final String compressionSkippedDesc = "The total number of compressions skipped (due to < 25% reduction)."; final String compressionSkippedTimeDesc = "The total time spent in compressions that were skipped."; final String compressionSkippedBytesDesc = "The total number bytes skipped in compression."; + final String compressionDecompressedReplacedDesc = "The total number times storage " + + "replaced buffer after decompression."; final String compressionDecompressedReplaceSkippedDesc = "The total number times storage " + "skipped replacing buffer after decompression due to active usage."; final String compressionCompressedReplaceSkippedDesc = "The total number times storage " + @@ -444,6 +447,8 @@ public class CachePerfStats implements HashingStats { compressionSkippedTimeDesc, "nanoseconds"), f.createLongCounter("compressSkippedBytes", compressionSkippedBytesDesc, "bytes"), + f.createLongCounter("decompressedReplaced", + compressionDecompressedReplacedDesc, "operations"), f.createLongCounter("decompressedReplaceSkipped", compressionDecompressedReplaceSkippedDesc, "operations"), f.createLongCounter("compressedReplaceSkipped", @@ -590,6 +595,7 @@ public class CachePerfStats implements HashingStats { compressionSkippedId = type.nameToId("compressionsSkipped"); compressionSkippedTimeId = type.nameToId("compressSkippedTime"); compressionSkippedBytesId = type.nameToId("compressSkippedBytes"); + compressionDecompressedReplacedId = type.nameToId("decompressedReplaced"); compressionDecompressedReplaceSkippedId = type.nameToId("decompressedReplaceSkipped"); compressionCompressedReplaceSkippedId = type.nameToId("compressedReplaceSkipped"); @@ -859,6 +865,10 @@ public void endCompressionSkipped(long startTime, long startSize) { stats.incLong(compressionSkippedBytesId, startSize); } + public void incDecompressedReplaced() { + stats.incLong(compressionDecompressedReplacedId, 1); + } + public void incDecompressedReplaceSkipped() { stats.incLong(compressionDecompressedReplaceSkippedId, 1); } diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DummyCachePerfStats.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DummyCachePerfStats.java index b7237985c..39ff9db77 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DummyCachePerfStats.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DummyCachePerfStats.java @@ -335,6 +335,10 @@ public void endCompression(long startTime, long startSize, long endSize) { public void endCompressionSkipped(long startTime, long startSize) { } + @Override + public void incDecompressedReplaced() { + } + @Override public void incDecompressedReplaceSkipped() { } diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java index c5fd4ef17..5a7617b07 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java @@ -13551,6 +13551,12 @@ public void endCompressionSkipped(long startTime, long startSize) { cachePerfStats.stats.incLong(compressionSkippedBytesId, startSize); } + @Override + public void incDecompressedReplaced() { + stats.incLong(compressionDecompressedReplacedId, 1); + cachePerfStats.stats.incLong(compressionDecompressedReplacedId, 1); + } + @Override public void incDecompressedReplaceSkipped() { stats.incLong(compressionDecompressedReplaceSkippedId, 1); @@ -14528,9 +14534,10 @@ public void acquirePoolMemory(long oldSize, long newSize, boolean withEntryOverH public static LowMemoryException lowMemoryException(GemFireCacheImpl cache, long size) { if (cache == null) { - cache = GemFireCacheImpl.getExisting(); + cache = GemFireCacheImpl.getInstance(); } - Set sm = Collections.singleton(cache.getMyId()); + Set sm = cache != null + ? Collections.singleton(cache.getMyId()) : Collections.emptySet(); return new LowMemoryException("Could not obtain memory of size " + size, sm); } diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java index ddd7ee94d..e8db02a12 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java @@ -1455,9 +1455,9 @@ private static void getPRMetaData(final PartitionedRegion region, // get partitioning columns GfxdPartitionByExpressionResolver resolver = (GfxdPartitionByExpressionResolver)region.getPartitionResolver(); - StringBuffer stringBuffer = new StringBuffer(); + StringBuilder stringBuffer = new StringBuilder(); for (String col : resolver.getColumnNames()) { - stringBuffer.append(col + ":"); + stringBuffer.append(col).append(':'); } partColumns[0] = stringBuffer.toString(); @@ -1474,7 +1474,7 @@ private static void getRRMetaData(final DistributedRegion region, Map mbrToServerMap = GemFireXDUtils .getGfxdAdvisor().getAllNetServersWithMembers(); - StringBuffer stringBuffer = new StringBuffer(); + StringBuilder stringBuffer = new StringBuilder(); if (GemFireXDUtils.getMyVMKind().isStore()) { owners.add(Misc.getGemFireCache().getMyId()); } @@ -1482,7 +1482,7 @@ private static void getRRMetaData(final DistributedRegion region, for (InternalDistributedMember node : owners) { String netServer = mbrToServerMap.get(node); if ( netServer != null) { - stringBuffer.append(netServer + ";"); + stringBuffer.append(netServer).append(';'); } } if (stringBuffer.length() > 0) { diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/GfxdDistributionAdvisor.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/GfxdDistributionAdvisor.java index 86a2aec41..6a50eb7b6 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/GfxdDistributionAdvisor.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/GfxdDistributionAdvisor.java @@ -1089,7 +1089,7 @@ public final Map getAllNetServersWithMembers( serverSB.append(','); } if (s instanceof HostAddress) { - serverSB.append(((HostAddress)s).getHostString()); + serverSB.append(((HostAddress)s).getHostAddressString()); } else { serverSB.append(s); } diff --git a/gemfirexd/shared/src/main/java/io/snappydata/thrift/BlobChunk.java b/gemfirexd/shared/src/main/java/io/snappydata/thrift/BlobChunk.java index 03237e73f..bdadb9221 100644 --- a/gemfirexd/shared/src/main/java/io/snappydata/thrift/BlobChunk.java +++ b/gemfirexd/shared/src/main/java/io/snappydata/thrift/BlobChunk.java @@ -77,14 +77,17 @@ public class BlobChunk implements org.apache.thrift.TBase { @Override public void write(org.apache.thrift.protocol.TProtocol prot, BlobChunk struct) throws org.apache.thrift.TException { + final ByteBuffer buffer = struct.getCompressedBuffer(prot); try { - final ByteBuffer buffer = struct.getCompressedBuffer(prot); writeData(prot, struct, buffer != null ? buffer : ClientSharedData.NULL_BUFFER); } finally { diff --git a/gemfirexd/shared/src/main/java/io/snappydata/thrift/HostAddress.java b/gemfirexd/shared/src/main/java/io/snappydata/thrift/HostAddress.java index 04f656d22..dfa34fdd8 100644 --- a/gemfirexd/shared/src/main/java/io/snappydata/thrift/HostAddress.java +++ b/gemfirexd/shared/src/main/java/io/snappydata/thrift/HostAddress.java @@ -521,6 +521,14 @@ public String getHostString() { return super.toString(); } + public String getHostAddressString() { + if (this.ipAddress == null) { + return this.hostName + '[' + this.port + ']'; + } else { + return this.hostName + '/' + this.ipAddress + '[' + this.port + ']'; + } + } + @Override public String toString() { ServerType serverType = this.serverType; diff --git a/gemfirexd/shared/src/main/java/io/snappydata/thrift/common/BlobChunk.java.tmpl b/gemfirexd/shared/src/main/java/io/snappydata/thrift/common/BlobChunk.java.tmpl index 03237e73f..d7973ff2f 100644 --- a/gemfirexd/shared/src/main/java/io/snappydata/thrift/common/BlobChunk.java.tmpl +++ b/gemfirexd/shared/src/main/java/io/snappydata/thrift/common/BlobChunk.java.tmpl @@ -77,14 +77,17 @@ public class BlobChunk implements org.apache.thrift.TBase implements return super.toString(); } + public String getHostAddressString() { + if (this.ipAddress == null) { + return this.hostName + '[' + this.port + ']'; + } else { + return this.hostName + '/' + this.ipAddress + '[' + this.port + ']'; + } + } + @Override public String toString() { ServerType serverType = this.serverType; From 718f1b6ca39d3adaf1107a170660233f111be6ad Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Fri, 9 Mar 2018 04:21:48 +0530 Subject: [PATCH 03/14] [SNAP-2243][SNAP-2188] procedure for smart connector iteration and fixes Includes the changes for the two issues and a bunch of other fixes found in testing. - Added "COLUMN_TABLE_SCAN" procedure to be invoked from smart connector and a corresponding "columnTableScan" method to StoreCallbacks. - Implementation of columnTableScan returns iterator of "ColumnTableEntry" that encapsulates a single column of a batch that is converted to a ResultSet by procedure implementation. - Send back Blob value as is from DVDStoreResultSet that is used by above procedure. - Reference increment is done by columnTableScan, handed over to BlobChunk by ClientBlob, and released after thrift write by BlobChunk. - BlobChunk now will release the previously held reference then get the buffer again with DECOMPRESS_IF_IN_MEMORY flag when target node is same host, else COMPRESS for remote sends. - Fixed index column fetch in GET_TABLE_METADATA that would append indexes to "null" - Added a new type of fetch from ColumnFormatValue: DECOMPRESS_IF_IN_MEMORY. This will decompress buffer only if it can be replaced in memory else return as is (or null). This allows smart connector to avoid decompression on server side if unable to store it on server. Changed boolean flags to a "FetchRequest" enumeration. - Close bucket entries iterator before making it null else it leaks reference count of last batch. - Set the disk RegionEntry itself in the ColumnFormatValue to read from disk in case entry is deleted from region (and some reader is holding a handle). Earlier setting of DiskId was not proper since it may change due to compaction. - Zero the limit in case of full release of buffers to fail any further reads. - Corrected parenthesis in FunctionException checks in SnappyTableStatsVTI --- .../cache/AbstractDiskRegionEntry.java | 10 +-- .../cache/AbstractOplogDiskRegionEntry.java | 10 +++ .../gemfire/internal/cache/DiskEntry.java | 7 +- .../internal/cache/PartitionedRegion.java | 1 + .../cache/store/SerializedDiskBuffer.java | 12 ++- .../snappy/CallbackFactoryProvider.java | 10 +++ .../internal/snappy/ColumnTableEntry.java | 37 ++++++++ .../internal/snappy/StoreCallbacks.java | 10 +++ .../gemfire/internal/util/BlobHelper.java | 6 +- .../internal/shared/ByteBufferReference.java | 14 +-- .../gemfire/internal/shared/FetchRequest.java | 43 +++++++++ .../internal/shared/unsafe/UnsafeHolder.java | 9 +- .../thrift/internal/ClientBlob.java | 9 +- .../internal/engine/GfxdConstants.java | 4 +- .../ddl/catalog/GfxdSystemProcedures.java | 90 +++++++++++++++++-- .../engine/diag/SnappyTableStatsVTI.java | 4 +- .../message/LeadNodeExecutorMsg.java | 2 + .../engine/store/CustomRowsResultSet.java | 11 ++- .../engine/store/DVDStoreResultSet.java | 4 +- .../impl/sql/catalog/GfxdDataDictionary.java | 12 +++ .../java/io/snappydata/thrift/BlobChunk.java | 63 ++++++++++--- .../thrift/common/BlobChunk.java.tmpl | 63 ++++++++++--- 22 files changed, 356 insertions(+), 75 deletions(-) create mode 100644 gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/ColumnTableEntry.java create mode 100644 gemfire-shared/src/main/java/com/gemstone/gemfire/internal/shared/FetchRequest.java diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java index 7dfff20be..74dd71511 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java @@ -18,7 +18,6 @@ import com.gemstone.gemfire.cache.hdfs.internal.AbstractBucketRegionQueue; import com.gemstone.gemfire.cache.query.internal.IndexUpdater; -import com.gemstone.gemfire.internal.cache.store.SerializedDiskBuffer; import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderQueue; @@ -72,13 +71,8 @@ && isOffHeap()) { * Set the RegionEntry DiskId into SerializedDiskBuffer value, if present, * so that the value can access data from disk when required independently. */ - protected final void initDiskIdForOffHeap(RegionEntryContext context, - Object value) { - // copy DiskId to value if required - if (value instanceof SerializedDiskBuffer) { - ((SerializedDiskBuffer)value).setDiskLocation(getDiskId(), context); - } - } + protected abstract void initDiskIdForOffHeap(RegionEntryContext context, + Object value); @Override public void handleValueOverflow(RegionEntryContext context) { diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java index c8769bd2c..01f6837d7 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java @@ -21,6 +21,7 @@ import com.gemstone.gemfire.cache.EntryEvent; import com.gemstone.gemfire.cache.EntryNotFoundException; import com.gemstone.gemfire.distributed.internal.DM; +import com.gemstone.gemfire.internal.cache.store.SerializedDiskBuffer; import com.gemstone.gemfire.internal.cache.versions.VersionTag; import com.gemstone.gemfire.internal.offheap.annotations.Retained; import com.gemstone.gemfire.internal.shared.Version; @@ -54,6 +55,15 @@ protected AbstractOplogDiskRegionEntry(RegionEntryContext context, Object value) protected abstract void setDiskId(RegionEntry oldRe); + @Override + protected final void initDiskIdForOffHeap(RegionEntryContext context, + Object value) { + // copy DiskId to value if required + if (value instanceof SerializedDiskBuffer) { + ((SerializedDiskBuffer)value).setDiskEntry(this, context); + } + } + public final void setDiskIdForRegion(RegionEntry oldRe) { setDiskId(oldRe); if (GemFireCacheImpl.hasNewOffHeap()) { diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java index 19e7942c7..3dee93478 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java @@ -71,6 +71,7 @@ import com.gemstone.gemfire.internal.offheap.annotations.Retained; import com.gemstone.gemfire.internal.offheap.annotations.Unretained; import com.gemstone.gemfire.internal.shared.ClientSharedData; +import com.gemstone.gemfire.internal.shared.FetchRequest; import com.gemstone.gemfire.internal.shared.OutputStreamChannel; import com.gemstone.gemfire.internal.shared.Version; import com.gemstone.gemfire.internal.snappy.CallbackFactoryProvider; @@ -1058,7 +1059,7 @@ private static Object getValueRetain(DiskEntry entry, RegionEntryContext context @Retained Object v = entry._getValueRetain(context, true); if (rawValue && GemFireCacheImpl.hasNewOffHeap() && (v instanceof SerializedDiskBuffer)) { - ((SerializedDiskBuffer)v).retain(); + return ((SerializedDiskBuffer)v).getValueRetain(FetchRequest.ORIGINAL); } return v; } @@ -1515,10 +1516,6 @@ public static int overflowToDisk(DiskEntry entry, LocalRegion region, EnableLRU if (did == null) { ((AbstractDiskLRURegionEntry)entry).setDelayedDiskId(region); did = entry.getDiskId(); - final Object oldValue; - if ((oldValue = entry._getValue()) instanceof SerializedDiskBuffer) { - ((SerializedDiskBuffer)oldValue).setDiskLocation(did, region); - } // add DiskId overhead to change diskIDOverhead += region.calculateDiskIdOverhead(did); } diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java index 4242cf2f0..4c04b1ba6 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java @@ -7345,6 +7345,7 @@ public boolean hasNext() { } } // no more buckets need to be visited + close(); this.bucketEntriesIter = null; this.moveNext = false; return false; diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/store/SerializedDiskBuffer.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/store/SerializedDiskBuffer.java index ffb174e06..b0067d28f 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/store/SerializedDiskBuffer.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/store/SerializedDiskBuffer.java @@ -20,11 +20,12 @@ import java.nio.ByteBuffer; import javax.annotation.concurrent.GuardedBy; -import com.gemstone.gemfire.internal.cache.DiskId; +import com.gemstone.gemfire.internal.cache.AbstractOplogDiskRegionEntry; import com.gemstone.gemfire.internal.cache.RegionEntryContext; import com.gemstone.gemfire.internal.shared.BufferAllocator; import com.gemstone.gemfire.internal.shared.ByteBufferReference; import com.gemstone.gemfire.internal.shared.ClientSharedUtils; +import com.gemstone.gemfire.internal.shared.FetchRequest; import com.gemstone.gemfire.internal.shared.OutputStreamChannel; /** @@ -135,11 +136,7 @@ public void copyToHeap(String owner) { } @Override - public SerializedDiskBuffer getValueRetain(boolean decompress, - boolean compress) throws IllegalArgumentException { - if (decompress && compress) { - throw new IllegalArgumentException("both decompress and compress true"); - } + public SerializedDiskBuffer getValueRetain(FetchRequest fetchRequest) { return retain() ? this : null; } @@ -148,7 +145,8 @@ public SerializedDiskBuffer getValueRetain(boolean decompress, /** * For buffers which are stored in region, set its DiskId. */ - public void setDiskLocation(DiskId id, RegionEntryContext context) { + public void setDiskEntry(AbstractOplogDiskRegionEntry entry, + RegionEntryContext context) { } /** diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/CallbackFactoryProvider.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/CallbackFactoryProvider.java index eb5d966fc..295ec6522 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/CallbackFactoryProvider.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/CallbackFactoryProvider.java @@ -17,6 +17,7 @@ package com.gemstone.gemfire.internal.snappy; +import java.sql.SQLException; import java.util.Collections; import java.util.List; import java.util.Set; @@ -24,6 +25,7 @@ import com.gemstone.gemfire.internal.cache.BucketRegion; import com.gemstone.gemfire.internal.cache.EntryEventImpl; import com.gemstone.gemfire.internal.cache.lru.LRUEntry; +import com.gemstone.gemfire.internal.cache.persistence.query.CloseableIterator; import com.gemstone.gemfire.internal.snappy.memory.MemoryManagerStats; public abstract class CallbackFactoryProvider { @@ -79,6 +81,14 @@ public String columnBatchTableName(String tableName) { + toString()); } + @Override + public CloseableIterator columnTableScan( + String columnTable, int[] projection, byte[] serializedBatchFilters, + Set bucketIds) throws SQLException { + throw new UnsupportedOperationException("unexpected invocation for " + + toString()); + } + @Override public void registerRelationDestroyForHiveStore() { } diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/ColumnTableEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/ColumnTableEntry.java new file mode 100644 index 000000000..a80207a9d --- /dev/null +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/ColumnTableEntry.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package com.gemstone.gemfire.internal.snappy; + +import com.gemstone.gemfire.internal.shared.ByteBufferReference; + +/** + * Encapsulates a row read from column store. + */ +public final class ColumnTableEntry { + public final long uuid; + public final int bucketId; + public final int columnPosition; + public final ByteBufferReference columnValue; + + public ColumnTableEntry(long uuid, int bucketId, int columnPosition, + ByteBufferReference columnValue) { + this.uuid = uuid; + this.bucketId = bucketId; + this.columnPosition = columnPosition; + this.columnValue = columnValue; + } +} diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/StoreCallbacks.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/StoreCallbacks.java index ae00c1e4e..19236b980 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/StoreCallbacks.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/StoreCallbacks.java @@ -17,12 +17,14 @@ package com.gemstone.gemfire.internal.snappy; +import java.sql.SQLException; import java.util.List; import java.util.Set; import com.gemstone.gemfire.internal.cache.BucketRegion; import com.gemstone.gemfire.internal.cache.EntryEventImpl; import com.gemstone.gemfire.internal.cache.lru.LRUEntry; +import com.gemstone.gemfire.internal.cache.persistence.query.CloseableIterator; import com.gemstone.gemfire.internal.shared.SystemProperties; import com.gemstone.gemfire.internal.snappy.memory.MemoryManagerStats; @@ -49,6 +51,14 @@ Set createColumnBatch(BucketRegion region, long batchID, String columnBatchTableName(String tableName); + /** + * Scan the entries of a column table. The returned value in ColumnTableEntry + * will have reference count incremented, so caller should decrement once done. + */ + CloseableIterator columnTableScan(String qualifiedTable, + int[] projection, byte[] serializedBatchFilters, + Set bucketIds) throws SQLException; + void registerRelationDestroyForHiveStore(); void performConnectorOp(Object ctx); diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java index 383ae4b24..c01ba3b23 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java @@ -42,13 +42,14 @@ import com.gemstone.gemfire.distributed.internal.DMStats; import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; import com.gemstone.gemfire.internal.ByteArrayDataInput; -import com.gemstone.gemfire.internal.DSCODE; import com.gemstone.gemfire.internal.ByteBufferDataInput; import com.gemstone.gemfire.internal.ByteBufferDataOutput; +import com.gemstone.gemfire.internal.DSCODE; import com.gemstone.gemfire.internal.HeapDataOutputStream; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl.StaticSystemCallbacks; import com.gemstone.gemfire.internal.cache.store.SerializedDiskBuffer; +import com.gemstone.gemfire.internal.shared.FetchRequest; import com.gemstone.gemfire.internal.shared.Version; import com.gemstone.gemfire.pdx.internal.PdxInputStream; @@ -110,7 +111,8 @@ public static SerializedDiskBuffer serializeToBuffer(Object obj, Version version SerializedDiskBuffer result; if (!(obj instanceof SerializedDiskBuffer) || // compress buffer if possible to reduce disk size - (result = ((SerializedDiskBuffer)obj).getValueRetain(false, true)) == null) { + (result = ((SerializedDiskBuffer)obj).getValueRetain( + FetchRequest.COMPRESS)) == null) { // serialize into an expanding direct ByteBuffer result = new ByteBufferDataOutput(version).serialize(obj); } diff --git a/gemfire-shared/src/main/java/com/gemstone/gemfire/internal/shared/ByteBufferReference.java b/gemfire-shared/src/main/java/com/gemstone/gemfire/internal/shared/ByteBufferReference.java index 1125bd021..492f3c508 100644 --- a/gemfire-shared/src/main/java/com/gemstone/gemfire/internal/shared/ByteBufferReference.java +++ b/gemfire-shared/src/main/java/com/gemstone/gemfire/internal/shared/ByteBufferReference.java @@ -47,7 +47,7 @@ public abstract class ByteBufferReference { /** * Return the data as a ByteBuffer. Should be invoked only after a - * {@link #retain()} or {@link #getValueRetain(boolean, boolean)} call. + * {@link #retain()} or {@link #getValueRetain} call. */ public abstract ByteBuffer getBuffer(); @@ -55,17 +55,11 @@ public abstract class ByteBufferReference { * Get a compressed/decompressed/original version of the underlying value * after a {@link #retain()} * - * @param decompress decompress the underlying data and return a new value - * if compressed - * @param compress compress the underlying data and return a new value - * if decompressed + * @param fetchRequest one of the {@link FetchRequest} values * - * @return a decompressed version of data if compressed when decompress is true - * or vice-versa if compress is true or else return as is if both are false - * @throws IllegalArgumentException if both decompress and compress are true + * @return possibly transformed data as per @{@link FetchRequest} */ - public abstract ByteBufferReference getValueRetain(boolean decompress, - boolean compress) throws IllegalArgumentException; + public abstract ByteBufferReference getValueRetain(FetchRequest fetchRequest); /** * An optional explicit release of the underlying data. The buffer may no diff --git a/gemfire-shared/src/main/java/com/gemstone/gemfire/internal/shared/FetchRequest.java b/gemfire-shared/src/main/java/com/gemstone/gemfire/internal/shared/FetchRequest.java new file mode 100644 index 000000000..985c7711a --- /dev/null +++ b/gemfire-shared/src/main/java/com/gemstone/gemfire/internal/shared/FetchRequest.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2018 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.gemstone.gemfire.internal.shared; + +/** + * Request sent to {@link ByteBufferReference#getValueRetain}. + */ +public enum FetchRequest { + /** + * Return with original form of buffer. + */ + ORIGINAL, + /** + * Return decompressed buffer and store in region if possible. + */ + DECOMPRESS, + /** + * Return compressed buffer and store in region if required. + */ + COMPRESS, + /** + * Return decompressed buffer only if decompressed form can be stored + * in memory (i.e. don't expend effort to decompress multiple times for + * every fetch) else return null if need to read from disk. Typically + * must be followed by a call with {@link #ORIGINAL} in case result is null. + */ + DECOMPRESS_IF_IN_MEMORY, +} diff --git a/gemfire-shared/src/main/java/com/gemstone/gemfire/internal/shared/unsafe/UnsafeHolder.java b/gemfire-shared/src/main/java/com/gemstone/gemfire/internal/shared/unsafe/UnsafeHolder.java index b38c0bf93..59b908d4d 100644 --- a/gemfire-shared/src/main/java/com/gemstone/gemfire/internal/shared/unsafe/UnsafeHolder.java +++ b/gemfire-shared/src/main/java/com/gemstone/gemfire/internal/shared/unsafe/UnsafeHolder.java @@ -301,8 +301,12 @@ public static void changeDirectBufferCleaner( * this directly rather use BufferAllocator.allocate/release where possible. */ public static void releaseIfDirectBuffer(ByteBuffer buffer) { - if (buffer != null && buffer.isDirect()) { - releaseDirectBuffer(buffer); + if (buffer != null) { + if (buffer.isDirect()) { + releaseDirectBuffer(buffer); + } else { + buffer.rewind().limit(0); + } } } @@ -312,6 +316,7 @@ public static void releaseDirectBuffer(ByteBuffer buffer) { cleaner.clean(); cleaner.clear(); } + buffer.rewind().limit(0); } public static void releasePendingReferences() { diff --git a/gemfirexd/client/src/main/java/io/snappydata/thrift/internal/ClientBlob.java b/gemfirexd/client/src/main/java/io/snappydata/thrift/internal/ClientBlob.java index 81b265523..f2b820f4e 100644 --- a/gemfirexd/client/src/main/java/io/snappydata/thrift/internal/ClientBlob.java +++ b/gemfirexd/client/src/main/java/io/snappydata/thrift/internal/ClientBlob.java @@ -323,6 +323,10 @@ public BlobChunk getAsLastChunk() throws SQLException { } } + public BlobChunk getCurrentChunk() { + return this.currentChunk; + } + /** * {@inheritDoc} */ @@ -336,12 +340,11 @@ public byte[] getBytes(long pos, int length) throws SQLException { int nbytes = readBytes(offset, result, 0, length); if (nbytes == length) { return result; - } else { + } else if (nbytes > 0) { return Arrays.copyOf(result, nbytes); } - } else { - return ClientSharedData.ZERO_ARRAY; } + return ClientSharedData.ZERO_ARRAY; } /** diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/GfxdConstants.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/GfxdConstants.java index b1a7cc565..014a09c24 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/GfxdConstants.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/GfxdConstants.java @@ -277,8 +277,8 @@ enum BT_INDIC { /** property to set max size of chunks in DML operations */ final String DML_MAX_CHUNK_SIZE_PROP = GFXD_PREFIX + "dml-max-chunk-size"; - /** default max size of chunks in DML operations */ - final long DML_MAX_CHUNK_SIZE_DEFAULT = 4194304L; + /** default max size of chunks in DML operations or query results */ + final long DML_MAX_CHUNK_SIZE_DEFAULT = 4L * 1024L * 1024L; /** * property to set min size of results for which streaming or throttling is diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java index e8db02a12..5da26b63c 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java @@ -48,8 +48,11 @@ import com.gemstone.gemfire.internal.NanoTimer; import com.gemstone.gemfire.internal.cache.*; import com.gemstone.gemfire.internal.cache.control.InternalResourceManager; +import com.gemstone.gemfire.internal.cache.persistence.query.CloseableIterator; import com.gemstone.gemfire.internal.snappy.CallbackFactoryProvider; +import com.gemstone.gemfire.internal.snappy.ColumnTableEntry; import com.gemstone.gnu.trove.THashSet; +import com.gemstone.gnu.trove.TIntArrayList; import com.pivotal.gemfirexd.Attribute; import com.pivotal.gemfirexd.auth.callback.UserAuthenticator; import com.pivotal.gemfirexd.internal.catalog.AliasInfo; @@ -121,6 +124,7 @@ import com.pivotal.gemfirexd.internal.snappy.LeadNodeSmartConnectorOpContext; import com.pivotal.gemfirexd.load.Import; import io.snappydata.thrift.ServerType; +import io.snappydata.thrift.internal.ClientBlob; /** * GemFireXD built-in system procedures that will get executed on every @@ -1504,16 +1508,15 @@ public static void getIndexColumns(String[] indexColumns, LocalRegion region) throws StandardException { GemFireContainer container = (GemFireContainer)region.getUserAttribute(); TableDescriptor td = container.getTableDescriptor(); - String cols = null; + StringBuilder cols = new StringBuilder(); if (td != null) { String[] baseColumns = td.getColumnNamesArray(); GfxdIndexManager im = container.getIndexManager(); - if ((im != null) && (im.getIndexConglomerateDescriptors() != null)) { - Iterator itr = im.getIndexConglomerateDescriptors().iterator(); - while (itr.hasNext()) { + if (im != null && im.getIndexConglomerateDescriptors() != null) { + for (ConglomerateDescriptor cd : im.getIndexConglomerateDescriptors()) { // first column of index has to be present in filter to be usable - int[] indexCols = itr.next().getIndexDescriptor().baseColumnPositions(); - cols += baseColumns[indexCols[0] - 1] + ":"; + int[] indexCols = cd.getIndexDescriptor().baseColumnPositions(); + cols.append(baseColumns[indexCols[0] - 1]).append(':'); } } // also add primary key @@ -1522,11 +1525,16 @@ public static void getIndexColumns(String[] indexColumns, LocalRegion region) // first column of primary key has to be present in filter to be usable int[] pkCols = primaryKey.getKeyColumns(); if (pkCols != null && pkCols.length > 0) { - cols += baseColumns[pkCols[0] - 1]; + cols.append(baseColumns[pkCols[0] - 1]); } } } - indexColumns[0] = cols; + int len = cols.length(); + if (len > 0 && cols.charAt(len - 1) == ':') { + indexColumns[0] = cols.substring(0, len - 1); + } else { + indexColumns[0] = cols.toString(); + } } public static void getPKColumns(String[] pkColumns, @@ -3001,6 +3009,72 @@ public static void GET_COLUMN_TABLE_SCHEMA(String schema, String table, schemaAsJson[0] = new HarmonySerialClob(schemaString); } + private static final SharedUtils.CSVVisitor projectionAgg = + (str, projection, context) -> projection.add(Integer.parseInt(str.trim())); + + private static final ResultColumnDescriptor[] columnScanInfo = { + EmbedResultSetMetaData.getResultColumnDescriptor("UUID", + Types.BIGINT, false), + EmbedResultSetMetaData.getResultColumnDescriptor("BUCKETID", + Types.INTEGER, false), + EmbedResultSetMetaData.getResultColumnDescriptor("COLUMNPOSITION", + Types.INTEGER, false), + EmbedResultSetMetaData.getResultColumnDescriptor("DATA", + Types.BLOB, false) + }; + + public static void COLUMN_TABLE_SCAN(String columnTable, String projection, + Blob filters, ResultSet[] result) throws SQLException { + try { + // split the projection into column indexes (1-based) + final TIntArrayList columns = new TIntArrayList(4); + SharedUtils.splitCSV(projection, projectionAgg, columns, null); + byte[] batchFilters = filters != null + ? filters.getBytes(1, (int)filters.length()) : null; + Set bucketIds = ConnectionUtil.getCurrentLCC() + .getBucketIdsForLocalExecution(); + final CloseableIterator iter = + CallbackFactoryProvider.getStoreCallbacks().columnTableScan( + columnTable, columns.toNativeArray(), batchFilters, bucketIds); + if (GemFireXDUtils.TraceExecution) { + SanityManager.DEBUG_PRINT(GfxdConstants.TRACE_EXECUTION, + "COLUMN_TABLE_SCAN table=" + columnTable + + " projection=" + projection); + } + result[0] = new CustomRowsResultSet(new CustomRowsResultSet.FetchDVDRows() { + @Override + public boolean getNext(DataValueDescriptor[] template) + throws SQLException, StandardException { + if (iter.hasNext()) { + ColumnTableEntry entry = iter.next(); + template[0].setValue(entry.uuid); + template[1].setValue(entry.bucketId); + template[2].setValue(entry.columnPosition); + ClientBlob blob = new ClientBlob(entry.columnValue); + // mark chunk as having a reference set from outside (columnTableScan) + if (!blob.getCurrentChunk().initChunkFromReference()) { + throw StandardException.newException(SQLState.DATA_UNEXPECTED_EXCEPTION, + new IllegalStateException("failed to increment reference count")); + } + template[3].setValue(blob); + return true; + } else { + return false; + } + } + + @Override + public void close() throws SQLException { + iter.close(); + } + }, columnScanInfo); + } catch (SQLException se) { + throw se; + } catch (Throwable t) { + throw TransactionResourceImpl.wrapInSQLException(t); + } + } + /** * Get the default or nested connection corresponding to the URL * jdbc:default:connection. We do not use DriverManager here as it is not diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/diag/SnappyTableStatsVTI.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/diag/SnappyTableStatsVTI.java index feec61cf8..8c2691d8f 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/diag/SnappyTableStatsVTI.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/diag/SnappyTableStatsVTI.java @@ -75,8 +75,8 @@ public boolean next() throws SQLException { throw PublicAPI.wrapStandardException(se); } catch (RuntimeException re) { String message; - if ((re instanceof FunctionException) || - (re instanceof FunctionExecutionException) && + if (((re instanceof FunctionException) || + (re instanceof FunctionExecutionException)) && re.getCause() != null) { message = re.getCause().getMessage(); } else { diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/message/LeadNodeExecutorMsg.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/message/LeadNodeExecutorMsg.java index 6c0f96d62..b0ad4bd8c 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/message/LeadNodeExecutorMsg.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/message/LeadNodeExecutorMsg.java @@ -160,6 +160,8 @@ protected void execute() throws Exception { } private static class SparkExceptionWrapper extends Exception { + private static final long serialVersionUID = -4668836542769295434L; + public SparkExceptionWrapper(Throwable ex) { super(ex.getClass().getName() + ": " + ex.getMessage(), ex.getCause()); this.setStackTrace(ex.getStackTrace()); diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/CustomRowsResultSet.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/CustomRowsResultSet.java index c2b7bdf29..912ee13a7 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/CustomRowsResultSet.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/CustomRowsResultSet.java @@ -73,10 +73,16 @@ public boolean next() throws SQLException { } } + @Override + public void close() throws SQLException { + super.close(); + this.fetchRows.close(); + } + /** * Allows fetching one row at a time as a DVD[] from arbitrary source. */ - public static interface FetchDVDRows { + public interface FetchDVDRows { /** * If next row is available then fill in template and return true, else @@ -84,5 +90,8 @@ public static interface FetchDVDRows { */ public boolean getNext(DataValueDescriptor[] template) throws SQLException, StandardException; + + default void close() throws SQLException { + } } } diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/DVDStoreResultSet.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/DVDStoreResultSet.java index 26854189f..765adf4cc 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/DVDStoreResultSet.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/DVDStoreResultSet.java @@ -961,7 +961,9 @@ else if (this.pvs != null) { } if (dvd != null && !dvd.isNull()) { this.wasNull = false; - return HarmonySerialBlob.wrapBytes(dvd.getBytes()); + Object result = dvd.getObject(); + return result instanceof byte[] + ? HarmonySerialBlob.wrapBytes((byte[])result) : (Blob)result; } else { this.wasNull = true; diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/impl/sql/catalog/GfxdDataDictionary.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/impl/sql/catalog/GfxdDataDictionary.java index 01ed92f1e..37f98b5b8 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/impl/sql/catalog/GfxdDataDictionary.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/impl/sql/catalog/GfxdDataDictionary.java @@ -2044,6 +2044,18 @@ private void createGfxdSystemProcedures(TransactionController tc, newlyCreatedRoutines, tc, GFXD_SYS_PROC_CLASSNAME, false); } + { + // COLUMN_TABLE_SCAN(String,String,String,Blob,ResultSet[]) + String[] arg_names = new String[] { "TABLE", "PROJECTION", "FILTERS" }; + TypeDescriptor[] arg_types = new TypeDescriptor[] { + DataTypeDescriptor.getCatalogType(Types.VARCHAR), + DataTypeDescriptor.getCatalogType(Types.VARCHAR), + DataTypeDescriptor.getCatalogType(Types.BLOB) + }; + super.createSystemProcedureOrFunction("COLUMN_TABLE_SCAN", sysUUID, + arg_names, arg_types, 0, 1, RoutineAliasInfo.READS_SQL_DATA, null, + newlyCreatedRoutines, tc, GFXD_SYS_PROC_CLASSNAME, false); + } } @SuppressWarnings({ "unchecked", "rawtypes" }) diff --git a/gemfirexd/shared/src/main/java/io/snappydata/thrift/BlobChunk.java b/gemfirexd/shared/src/main/java/io/snappydata/thrift/BlobChunk.java index bdadb9221..e3accdd46 100644 --- a/gemfirexd/shared/src/main/java/io/snappydata/thrift/BlobChunk.java +++ b/gemfirexd/shared/src/main/java/io/snappydata/thrift/BlobChunk.java @@ -31,6 +31,7 @@ import com.gemstone.gemfire.internal.shared.ByteBufferReference; import com.gemstone.gemfire.internal.shared.ClientSharedData; +import com.gemstone.gemfire.internal.shared.FetchRequest; import com.gemstone.gemfire.internal.shared.unsafe.UnsafeHolder; import io.snappydata.thrift.common.SocketTimeout; import io.snappydata.thrift.common.TProtocolDirectBinary; @@ -88,6 +89,19 @@ private void assignChunkReference(ByteBufferReference reference) { this.chunkReference = reference; } + /** + * Mark the ByteBufferReference refCount as having been already incremented. + */ + public boolean initChunkFromReference() { + ByteBufferReference reference = this.chunkReference; + if (reference != null && this.chunk == ClientSharedData.NULL_BUFFER) { + this.chunk = reference.getBuffer(); + return true; + } else { + return false; + } + } + public int size() { return this.chunk != null ? this.chunk.remaining() : 0; } @@ -467,21 +481,46 @@ public boolean isSet(_Fields field) { throw new IllegalStateException(); } - private ByteBuffer getCompressedBuffer(org.apache.thrift.protocol.TProtocol oprot) { - // compress if sending to remote else send as is + /** + * Compress if sending to remote or else decompress if the blob can be + * stored in memory (so future reads will not need to decompress) + * else send as is. + */ + private ByteBuffer getBufferForWrite(org.apache.thrift.protocol.TProtocol oprot) { final ByteBufferReference reference = this.chunkReference; if (reference != null) { - // release already held reference count - if (this.chunk != ClientSharedData.NULL_BUFFER) { - reference.release(); - this.chunk = ClientSharedData.NULL_BUFFER; - } TTransport transport = oprot.getTransport(); if (transport instanceof SocketTimeout) { - boolean sameHost = ((SocketTimeout)transport).isSocketToSameHost(); - this.chunkReference = reference.getValueRetain(sameHost, !sameHost); + // check if a reference is already held + boolean hasOldReference = this.chunk != ClientSharedData.NULL_BUFFER; + if (((SocketTimeout)transport).isSocketToSameHost()) { + // try to decompress only if the value can be stored in region else + // avoid decompression overhead on server rather do on client/connector + if (hasOldReference) { + // release reference upfront if this not the only remaining reference + // (under the lock on value to ensure atomicity of check and release) + synchronized (reference) { + if (reference.referenceCount() > 1) { + reference.release(); + hasOldReference = false; + } + this.chunkReference = reference.getValueRetain( + FetchRequest.DECOMPRESS_IF_IN_MEMORY); + } + } else { + this.chunkReference = reference.getValueRetain( + FetchRequest.DECOMPRESS_IF_IN_MEMORY); + } + if (this.chunkReference == null) { + this.chunkReference = reference.getValueRetain(FetchRequest.ORIGINAL); + } + } else { + // always send compressed to remote node + this.chunkReference = reference.getValueRetain(FetchRequest.COMPRESS); + } this.chunk = this.chunkReference.getBuffer(); - } else { + if (hasOldReference) reference.release(); + } else if (this.chunk == ClientSharedData.NULL_BUFFER) { this.chunk = reference.getBufferRetain(); } } @@ -804,7 +843,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, BlobChunk struct) t public void write(org.apache.thrift.protocol.TProtocol oprot, BlobChunk struct) throws org.apache.thrift.TException { struct.validate(); - final ByteBuffer chunk = struct.getCompressedBuffer(oprot); + final ByteBuffer chunk = struct.getBufferForWrite(oprot); try { writeData(oprot, struct, chunk); } finally { @@ -856,7 +895,7 @@ private static class BlobChunkTupleScheme extends TupleScheme { @Override public void write(org.apache.thrift.protocol.TProtocol prot, BlobChunk struct) throws org.apache.thrift.TException { - final ByteBuffer buffer = struct.getCompressedBuffer(prot); + final ByteBuffer buffer = struct.getBufferForWrite(prot); try { writeData(prot, struct, buffer != null ? buffer : ClientSharedData.NULL_BUFFER); diff --git a/gemfirexd/shared/src/main/java/io/snappydata/thrift/common/BlobChunk.java.tmpl b/gemfirexd/shared/src/main/java/io/snappydata/thrift/common/BlobChunk.java.tmpl index d7973ff2f..e3accdd46 100644 --- a/gemfirexd/shared/src/main/java/io/snappydata/thrift/common/BlobChunk.java.tmpl +++ b/gemfirexd/shared/src/main/java/io/snappydata/thrift/common/BlobChunk.java.tmpl @@ -31,6 +31,7 @@ import javax.annotation.Generated; import com.gemstone.gemfire.internal.shared.ByteBufferReference; import com.gemstone.gemfire.internal.shared.ClientSharedData; +import com.gemstone.gemfire.internal.shared.FetchRequest; import com.gemstone.gemfire.internal.shared.unsafe.UnsafeHolder; import io.snappydata.thrift.common.SocketTimeout; import io.snappydata.thrift.common.TProtocolDirectBinary; @@ -88,6 +89,19 @@ public class BlobChunk implements org.apache.thrift.TBase 1) { + reference.release(); + hasOldReference = false; + } + this.chunkReference = reference.getValueRetain( + FetchRequest.DECOMPRESS_IF_IN_MEMORY); + } + } else { + this.chunkReference = reference.getValueRetain( + FetchRequest.DECOMPRESS_IF_IN_MEMORY); + } + if (this.chunkReference == null) { + this.chunkReference = reference.getValueRetain(FetchRequest.ORIGINAL); + } + } else { + // always send compressed to remote node + this.chunkReference = reference.getValueRetain(FetchRequest.COMPRESS); + } this.chunk = this.chunkReference.getBuffer(); - } else { + if (hasOldReference) reference.release(); + } else if (this.chunk == ClientSharedData.NULL_BUFFER) { this.chunk = reference.getBufferRetain(); } } @@ -804,7 +843,7 @@ public class BlobChunk implements org.apache.thrift.TBase Date: Fri, 9 Mar 2018 04:37:22 +0530 Subject: [PATCH 04/14] minor comment correction --- .../gemfire/internal/cache/AbstractOplogDiskRegionEntry.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java index 01f6837d7..c9ebca316 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java @@ -58,7 +58,7 @@ protected AbstractOplogDiskRegionEntry(RegionEntryContext context, Object value) @Override protected final void initDiskIdForOffHeap(RegionEntryContext context, Object value) { - // copy DiskId to value if required + // copy self to value if required if (value instanceof SerializedDiskBuffer) { ((SerializedDiskBuffer)value).setDiskEntry(this, context); } From bafd47a3e37c7525867281afde44ac7af61072e6 Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Fri, 9 Mar 2018 18:49:28 +0530 Subject: [PATCH 05/14] fixed couple of occasional test failures --- .../com/gemstone/gemfire/internal/cache/TXStateProxy.java | 5 +++++ .../gemfire/internal/snappy/CallbackFactoryProvider.java | 2 +- .../com/gemstone/gemfire/internal/snappy/StoreCallbacks.java | 2 +- .../gemfirexd/recovery/PersistenceRecoveryOrderDUnit.java | 2 ++ 4 files changed, 9 insertions(+), 2 deletions(-) diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java index 6c1a64b2d..faab091b0 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java @@ -3345,6 +3345,11 @@ final void checkTXState() throws TransactionException { if (state == State.OPEN || state == State.COMMIT_PHASE2_STARTED) { return; } + if (state == State.ROLLBACK_STARTED) { + throw new IllegalTransactionStateException(LocalizedStrings + .TransactionManagerImpl_TRANSACTIONMANAGERIMPL_COMMIT_TRANSACTION_ROLLED_BACK_BECAUSE_A_USER_MARKED_IT_FOR_ROLLBACK + .toLocalizedString()); + } if (state == State.CLOSED) { throw new IllegalTransactionStateException(LocalizedStrings .TXManagerImpl_THREAD_DOES_NOT_HAVE_AN_ACTIVE_TRANSACTION diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/CallbackFactoryProvider.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/CallbackFactoryProvider.java index 295ec6522..22c53128c 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/CallbackFactoryProvider.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/CallbackFactoryProvider.java @@ -83,7 +83,7 @@ public String columnBatchTableName(String tableName) { @Override public CloseableIterator columnTableScan( - String columnTable, int[] projection, byte[] serializedBatchFilters, + String columnTable, int[] projection, byte[] serializedFilters, Set bucketIds) throws SQLException { throw new UnsupportedOperationException("unexpected invocation for " + toString()); diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/StoreCallbacks.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/StoreCallbacks.java index 19236b980..5a562b7d0 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/StoreCallbacks.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/snappy/StoreCallbacks.java @@ -56,7 +56,7 @@ Set createColumnBatch(BucketRegion region, long batchID, * will have reference count incremented, so caller should decrement once done. */ CloseableIterator columnTableScan(String qualifiedTable, - int[] projection, byte[] serializedBatchFilters, + int[] projection, byte[] serializedFilters, Set bucketIds) throws SQLException; void registerRelationDestroyForHiveStore(); diff --git a/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/recovery/PersistenceRecoveryOrderDUnit.java b/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/recovery/PersistenceRecoveryOrderDUnit.java index 084302f49..44255d01b 100644 --- a/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/recovery/PersistenceRecoveryOrderDUnit.java +++ b/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/recovery/PersistenceRecoveryOrderDUnit.java @@ -242,6 +242,7 @@ public void testWaitForLatestMember1() throws Exception { VM server1 = this.serverVMs.get(0); VM server2 = this.serverVMs.get(1); + st1.execute("DROP TABLE IF EXISTS T1"); st1.execute("CREATE TABLE T1 (COL1 int, COL2 int) partition by column (COL1) persistent redundancy 1 buckets 1"); st1.execute("INSERT INTO T1 values(1,1)"); @@ -280,6 +281,7 @@ public void testWaitForLatestMember2() throws Exception { VM server1 = this.serverVMs.get(0); VM server2 = this.serverVMs.get(1); + st1.execute("DROP TABLE IF EXISTS T1"); st1.execute("CREATE TABLE T1 (COL1 int, COL2 int) partition by column (COL1) persistent redundancy 1 buckets 1"); st1.execute("INSERT INTO T1 values(1,1)"); From e161a823c0464dede913fcc6b9720b45c23a508a Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Sat, 10 Mar 2018 18:18:27 +0530 Subject: [PATCH 06/14] check for table read permission for COLUMN_TABLE_SCAN procedure in case of a direct query on the table, the security check is required to not allow users access to it without appropriate authorization --- .../ddl/catalog/GfxdSystemProcedures.java | 48 +++++++++++++++---- .../execute/AbstractGemFireActivation.java | 2 +- .../internal/iapi/sql/conn/Authorizer.java | 5 +- .../impl/sql/conn/GenericAuthorizer.java | 12 ++--- 4 files changed, 50 insertions(+), 17 deletions(-) diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java index 5da26b63c..9f499b3a4 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java @@ -90,17 +90,13 @@ import com.pivotal.gemfirexd.internal.iapi.error.StandardException; import com.pivotal.gemfirexd.internal.iapi.jdbc.AuthenticationService; import com.pivotal.gemfirexd.internal.iapi.reference.Property; +import com.pivotal.gemfirexd.internal.iapi.services.io.FormatableBitSet; import com.pivotal.gemfirexd.internal.iapi.services.property.PropertyUtil; import com.pivotal.gemfirexd.internal.iapi.sql.ResultColumnDescriptor; +import com.pivotal.gemfirexd.internal.iapi.sql.conn.Authorizer; import com.pivotal.gemfirexd.internal.iapi.sql.conn.ConnectionUtil; import com.pivotal.gemfirexd.internal.iapi.sql.conn.LanguageConnectionContext; -import com.pivotal.gemfirexd.internal.iapi.sql.dictionary.AliasDescriptor; -import com.pivotal.gemfirexd.internal.iapi.sql.dictionary.ConglomerateDescriptor; -import com.pivotal.gemfirexd.internal.iapi.sql.dictionary.DataDictionary; -import com.pivotal.gemfirexd.internal.iapi.sql.dictionary.ReferencedKeyConstraintDescriptor; -import com.pivotal.gemfirexd.internal.iapi.sql.dictionary.SchemaDescriptor; -import com.pivotal.gemfirexd.internal.iapi.sql.dictionary.StatementRoutinePermission; -import com.pivotal.gemfirexd.internal.iapi.sql.dictionary.TableDescriptor; +import com.pivotal.gemfirexd.internal.iapi.sql.dictionary.*; import com.pivotal.gemfirexd.internal.iapi.store.access.TransactionController; import com.pivotal.gemfirexd.internal.iapi.types.DataValueDescriptor; import com.pivotal.gemfirexd.internal.iapi.types.HarmonySerialBlob; @@ -3029,10 +3025,44 @@ public static void COLUMN_TABLE_SCAN(String columnTable, String projection, // split the projection into column indexes (1-based) final TIntArrayList columns = new TIntArrayList(4); SharedUtils.splitCSV(projection, projectionAgg, columns, null); + + // check authorization for given columns of the table + LanguageConnectionContext lcc = ConnectionUtil.getCurrentLCC(); + if (lcc.usesSqlAuthorization()) { + final int numColumns = columns.size(); + ArrayList permissions = new ArrayList<>( + numColumns + 1); + String rowBufferTable = GemFireContainer.getRowBufferTableName(columnTable); + GemFireContainer rowContainer = (GemFireContainer)Misc.getRegionForTable( + rowBufferTable, true).getUserAttribute(); + TableDescriptor td = rowContainer.getTableDescriptor(); + if (td == null) { + throw PublicAPI.wrapStandardException(StandardException.newException( + SQLState.LANG_TABLE_NOT_FOUND, rowBufferTable)); + } + permissions.add(new StatementTablePermission(td.getUUID(), + Authorizer.SELECT_PRIV)); + if (numColumns > 0) { + FormatableBitSet bitSet = new FormatableBitSet(td.getNumberOfColumns()); + for (int i = 0; i < numColumns; i++) { + int col = columns.getQuick(i); + ColumnDescriptor cd = td.getColumnDescriptor(col); + if (cd == null) { + throw PublicAPI.wrapStandardException(StandardException.newException( + SQLState.LANG_COLUMN_NOT_FOUND, rowBufferTable + '.' + col)); + } + bitSet.set(col - 1); + } + permissions.add(new StatementColumnPermission(td.getUUID(), + Authorizer.SELECT_PRIV, bitSet)); + } + lcc.getAuthorizer().authorize(lcc.getLastActivation(), null, permissions, + Authorizer.SQL_SELECT_OP); + } + byte[] batchFilters = filters != null ? filters.getBytes(1, (int)filters.length()) : null; - Set bucketIds = ConnectionUtil.getCurrentLCC() - .getBucketIdsForLocalExecution(); + Set bucketIds = lcc.getBucketIdsForLocalExecution(); final CloseableIterator iter = CallbackFactoryProvider.getStoreCallbacks().columnTableScan( columnTable, columns.toNativeArray(), batchFilters, bucketIds); diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/sql/execute/AbstractGemFireActivation.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/sql/execute/AbstractGemFireActivation.java index cba41f9c7..3fb1d4b52 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/sql/execute/AbstractGemFireActivation.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/sql/execute/AbstractGemFireActivation.java @@ -89,7 +89,7 @@ public void setupActivation(final ExecPreparedStatement ps, this.row = new ExecRow[2]; } // check authorization - lcc.getAuthorizer().authorize(this, ps, Authorizer.SQL_SKIP_OP + (this.qInfo + lcc.getAuthorizer().authorize(this, ps, null, Authorizer.SQL_SKIP_OP + (this.qInfo .isSelect() ? Authorizer.SQL_SELECT_OP : Authorizer.SQL_WRITE_OP)); final int paramCnt = this.qInfo.getParameterCount(); diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/iapi/sql/conn/Authorizer.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/iapi/sql/conn/Authorizer.java index 69394008a..5afabb2c7 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/iapi/sql/conn/Authorizer.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/iapi/sql/conn/Authorizer.java @@ -40,8 +40,11 @@ Licensed to the Apache Software Foundation (ASF) under one or more package com.pivotal.gemfirexd.internal.iapi.sql.conn; +import java.util.List; + import com.pivotal.gemfirexd.internal.iapi.error.StandardException; import com.pivotal.gemfirexd.internal.iapi.sql.Activation; +import com.pivotal.gemfirexd.internal.iapi.sql.dictionary.StatementPermission; import com.pivotal.gemfirexd.internal.iapi.sql.execute.ExecPreparedStatement; /** The Authorizer verifies a connected user has the authorization @@ -133,7 +136,7 @@ public void authorize(Activation activation, int operation) // GemStone changes BEGIN public void authorize(Activation activation, ExecPreparedStatement eps, - int operation) throws StandardException; + List perms, int operation) throws StandardException; // GemStone changes END /** Get the Authorization ID for this Authorizer. diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/impl/sql/conn/GenericAuthorizer.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/impl/sql/conn/GenericAuthorizer.java index b45dbfb2b..565f23116 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/impl/sql/conn/GenericAuthorizer.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/impl/sql/conn/GenericAuthorizer.java @@ -116,7 +116,7 @@ private boolean connectionMustRemainReadOnly() public void authorize( int operation) throws StandardException { // GemStone changes BEGIN - authorize(null, null, operation); + authorize(null, null, null, operation); /* (original code) authorize( (Activation) null, operation); */ @@ -131,12 +131,12 @@ public void authorize( int operation) throws StandardException public final void authorize(final Activation activation, final int operation) throws StandardException { authorize(activation, activation != null ? activation - .getPreparedStatement() : null, operation); + .getPreparedStatement() : null, null, operation); } public final void authorize(final Activation activation, - ExecPreparedStatement ps, final int operation) - throws StandardException + ExecPreparedStatement ps, List perms, + final int operation) throws StandardException /* (original code) public void authorize( Activation activation, int operation) throws StandardException */ @@ -225,7 +225,7 @@ public void authorize( Activation activation, int operation) throws StandardExce SanityManager.THROWASSERT("Bad operation code "+operation); } // GemStone changes BEGIN - if (activation != null && (ps != null + if (activation != null && (ps != null || perms != null || (ps = activation.getPreparedStatement()) != null)) { /* (original code) if( activation != null) @@ -237,7 +237,7 @@ public void authorize( Activation activation, int operation) throws StandardExce try { // check if ps is uptodate activation.checkStatementValidity(); - List requiredPermissionsList = ps.getRequiredPermissionsList(); + List requiredPermissionsList = perms != null ? perms : ps.getRequiredPermissionsList(); /*[originally] List requiredPermissionsList = activation.getPreparedStatement().getRequiredPermissionsList(); DataDictionary dd = lcc.getDataDictionary(); From 49049d24ae6c0a8ce2ddf1ffd1d66f9b0052e2f9 Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Sat, 10 Mar 2018 20:12:14 +0530 Subject: [PATCH 07/14] minor changes --- .../engine/ddl/catalog/GfxdSystemProcedures.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java index 9f499b3a4..373abc6bc 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/GfxdSystemProcedures.java @@ -3060,8 +3060,11 @@ public static void COLUMN_TABLE_SCAN(String columnTable, String projection, Authorizer.SQL_SELECT_OP); } - byte[] batchFilters = filters != null - ? filters.getBytes(1, (int)filters.length()) : null; + byte[] batchFilters = null; + if (filters != null) { + batchFilters = filters.getBytes(1, (int)filters.length()); + filters.free(); + } Set bucketIds = lcc.getBucketIdsForLocalExecution(); final CloseableIterator iter = CallbackFactoryProvider.getStoreCallbacks().columnTableScan( @@ -3084,7 +3087,7 @@ public boolean getNext(DataValueDescriptor[] template) // mark chunk as having a reference set from outside (columnTableScan) if (!blob.getCurrentChunk().initChunkFromReference()) { throw StandardException.newException(SQLState.DATA_UNEXPECTED_EXCEPTION, - new IllegalStateException("failed to increment reference count")); + new IllegalStateException("failed to initialize chunk with buffer")); } template[3].setValue(blob); return true; From e429a455faa01ef4f21e86a3544d1cda0fcb00ab Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Mon, 12 Mar 2018 03:09:38 +0530 Subject: [PATCH 08/14] cleaned up methods to determine column table also skip the check for delta == ifOld for column tables --- .../gemfire/internal/cache/BucketRegion.java | 7 ++++++- .../gemfire/internal/cache/LocalRegion.java | 3 ++- .../internal/cache/PartitionedRegion.java | 17 ----------------- 3 files changed, 8 insertions(+), 19 deletions(-) diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java index 13a7ddb1b..1e50c16b6 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java @@ -3373,10 +3373,15 @@ public boolean areSecondariesPingable() { } + @Override + public boolean isInternalColumnTable() { + return getPartitionedRegion().isInternalColumnTable(); + } + @Override public boolean isSnapshotEnabledRegion() { // concurrency checks is by default true in column table - return getPartitionedRegion().columnTable() || + return getPartitionedRegion().isInternalColumnTable() || getPartitionedRegion().needsBatching() || super.isSnapshotEnabledRegion(); } diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java index 5a7617b07..94391ba1c 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java @@ -2309,7 +2309,7 @@ public final Object validatedPut(EntryEventImpl event, long startPut) // Rahul: this has to be an update. // so executing it as an update. - boolean forceUpdateForDelta = event.hasDelta(); + boolean forceUpdateForDelta = event.hasDelta() && !isInternalColumnTable(); // Gfxd Changes end. if (basicPut(event, false, // ifNew forceUpdateForDelta, // ifOld @@ -14613,6 +14613,7 @@ private void memTrace(String mesage) { } } + @Override public boolean isInternalColumnTable() { return isInternalColumnTable; } diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java index 4c04b1ba6..696ababb8 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java @@ -2523,7 +2523,6 @@ else if (txProxy != null || !this.concurrencyChecksEnabled) { private volatile Boolean columnBatching; - private volatile Boolean columnStoreTable; public boolean needsBatching() { final Boolean columnBatching = this.columnBatching; if (columnBatching != null) { @@ -2545,22 +2544,6 @@ public boolean needsBatching() { } } - public boolean columnTable() { - final Boolean columnTable = this.columnStoreTable; - if (columnTable != null) { - return columnTable; - } - // Find all the child region and see if they anyone of them has name ending - // with _SHADOW_ - if (this.getName().toUpperCase().endsWith(StoreCallbacks.SHADOW_TABLE_SUFFIX)) { - this.columnStoreTable = true; - return true; - } else { - this.columnStoreTable = false; - } - return false; - } - private void handleSendOrWaitException(Exception ex, DistributedPutAllOperation putallO, PutAllPartialResult partialKeys, PutAllPRMessage prMsg, TXStateProxy txProxy) { From 246d96d263cddedc1bf46635ea813bcb1d1f36cc Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Mon, 19 Mar 2018 02:33:22 +0530 Subject: [PATCH 09/14] made RawStoreResultSet more generic - allow for passing an iterator of raw values - allow for optionally passing container in which case RowFormatter will be determined for each row dynamically using the schemas in container - implement ResultSetWithNull interface for RawStoreResultSet --- .../wan/messages/GfxdCBArgForSynchPrms.java | 6 +- .../engine/store/RawStoreResultSet.java | 74 +++++++++++++++---- 2 files changed, 62 insertions(+), 18 deletions(-) diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/wan/messages/GfxdCBArgForSynchPrms.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/wan/messages/GfxdCBArgForSynchPrms.java index 0499e403b..f050c765d 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/wan/messages/GfxdCBArgForSynchPrms.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/wan/messages/GfxdCBArgForSynchPrms.java @@ -505,8 +505,8 @@ public List getNewRow() throws UnsupportedOperationException { @Override public ResultSet getNewRowsAsResultSet() { if (this.isBulkInsert) { - return new RawStoreResultSet(this.bulkInsertRows, getExtraTableInfo() - .getRowFormatter()); + return new RawStoreResultSet(this.bulkInsertRows.iterator(), null, + getExtraTableInfo().getRowFormatter()); } else if (this.params != null) { return new DVDStoreResultSet(this.params, this.params.length, null, null, @@ -530,7 +530,7 @@ public ResultSet getPrimaryKeysAsResultSet() final ExtraTableInfo tableInfo = getExtraTableInfo(); final int[] pkCols = tableInfo.getPrimaryKeyColumns(); if (pkCols != null) { - return new RawStoreResultSet(this.bulkInsertRows, + return new RawStoreResultSet(this.bulkInsertRows.iterator(), tableInfo.getRowFormatter(), pkCols, tableInfo .getPrimaryKeyFormatter().getMetaData()); } diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/RawStoreResultSet.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/RawStoreResultSet.java index f344c2ba8..058c5eb81 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/RawStoreResultSet.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/RawStoreResultSet.java @@ -22,18 +22,10 @@ import java.io.StringReader; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; -import java.sql.Blob; -import java.sql.Clob; -import java.sql.Date; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Time; -import java.sql.Timestamp; -import java.sql.Types; +import java.sql.*; import java.util.Calendar; import java.util.GregorianCalendar; import java.util.Iterator; -import java.util.List; import com.gemstone.gemfire.internal.shared.ClientSharedData; import com.pivotal.gemfirexd.callbacks.TableMetaData; @@ -43,6 +35,9 @@ import com.pivotal.gemfirexd.internal.impl.jdbc.ReaderToAscii; import com.pivotal.gemfirexd.internal.impl.jdbc.TransactionResourceImpl; import com.pivotal.gemfirexd.internal.impl.jdbc.Util; +import io.snappydata.ResultSetWithNull; + +import static com.pivotal.gemfirexd.internal.engine.store.RowFormatter.OFFSET_AND_WIDTH_IS_NULL; /** * Encapsulates a set of one or more rows in raw underlying storage format that @@ -52,7 +47,7 @@ * @since 7.0 */ public final class RawStoreResultSet extends NonUpdatableRowsResultSet - implements ResultSet, ResultWasNull { + implements ResultSet, ResultWasNull, ResultSetWithNull { private byte[] currentRowBytes; @@ -62,7 +57,9 @@ public final class RawStoreResultSet extends NonUpdatableRowsResultSet private final int numColumns; - private final RowFormatter formatter; + private final GemFireContainer container; + + private RowFormatter formatter; private int[] changedColumns; @@ -74,6 +71,7 @@ public final class RawStoreResultSet extends NonUpdatableRowsResultSet public RawStoreResultSet(final byte[] row, final RowFormatter rf) { this.currentRowBytes = row; + this.container = null; this.formatter = rf; this.numColumns = rf.getNumColumns(); } @@ -81,6 +79,7 @@ public RawStoreResultSet(final byte[] row, final RowFormatter rf) { public RawStoreResultSet(final byte[] row, final RowFormatter rf, final int[] changedColumns, final TableMetaData metadata) { this.currentRowBytes = row; + this.container = null; this.formatter = rf; this.metadata = metadata; this.changedColumns = changedColumns; @@ -89,6 +88,7 @@ public RawStoreResultSet(final byte[] row, final RowFormatter rf, public RawStoreResultSet(final byte[][] row, final RowFormatter rf) { this.currentRowByteArrays = row; + this.container = null; this.formatter = rf; this.numColumns = rf.getNumColumns(); } @@ -96,21 +96,25 @@ public RawStoreResultSet(final byte[][] row, final RowFormatter rf) { public RawStoreResultSet(final byte[][] row, final RowFormatter rf, final int[] changedColumns, final TableMetaData metadata) { this.currentRowByteArrays = row; + this.container = null; this.formatter = rf; this.metadata = metadata; this.changedColumns = changedColumns; this.numColumns = changedColumns.length; } - public RawStoreResultSet(final List rows, final RowFormatter rf) { - this.rows = rows.iterator(); + public RawStoreResultSet(final Iterator rows, + final GemFireContainer container, final RowFormatter rf) { + this.rows = rows; + this.container = container; this.formatter = rf; this.numColumns = rf.getNumColumns(); } - public RawStoreResultSet(final List rows, final RowFormatter rf, + public RawStoreResultSet(final Iterator rows, final RowFormatter rf, final int[] changedColumns, final TableMetaData metadata) { - this.rows = rows.iterator(); + this.rows = rows; + this.container = null; this.formatter = rf; this.metadata = metadata; this.changedColumns = changedColumns; @@ -124,12 +128,19 @@ public RawStoreResultSet(final List rows, final RowFormatter rf, public boolean next() throws SQLException { if (this.rows != null) { if (this.rows.hasNext()) { + final GemFireContainer container = this.container; final Object next = this.rows.next(); if (next.getClass() == byte[].class) { this.currentRowBytes = (byte[])next; + if (container != null) { + this.formatter = container.getRowFormatter(currentRowBytes); + } } else { this.currentRowByteArrays = (byte[][])next; + if (container != null) { + this.formatter = container.getRowFormatter(currentRowByteArrays); + } } return true; } @@ -1090,6 +1101,39 @@ public Clob getClob(int columnIndex) throws SQLException { } } + /** + * {@inheritDoc} + */ + @Override + public boolean isNull(int columnIndex) throws SQLException { + if (columnIndex > 0 && columnIndex <= this.numColumns) { + try { + if (this.currentRowBytes != null) { + if (this.changedColumns == null) { + return this.formatter.getOffsetAndWidth(columnIndex, + this.currentRowBytes) == OFFSET_AND_WIDTH_IS_NULL; + } else { + return this.formatter.getOffsetAndWidth( + this.changedColumns[columnIndex - 1], this.currentRowBytes) == + OFFSET_AND_WIDTH_IS_NULL; + } + } else { + CompactExecRowWithLobs execRow = new CompactExecRowWithLobs( + this.currentRowByteArrays, this.formatter); + if (this.changedColumns == null) { + return execRow.isNull(columnIndex) == OFFSET_AND_WIDTH_IS_NULL; + } else { + return execRow.isNull(columnIndex) == OFFSET_AND_WIDTH_IS_NULL; + } + } + } catch (StandardException se) { + throw Util.generateCsSQLException(se); + } + } else { + throw invalidColumnException(columnIndex); + } + } + /** * {@inheritDoc} */ From 0a107bc4f17463395c95c6bbe3a291bffd8270af Mon Sep 17 00:00:00 2001 From: vivekwiz Date: Mon, 19 Mar 2018 12:07:12 +0530 Subject: [PATCH 10/14] Small change for doing build --- .../java/com/gemstone/gemfire/internal/cache/LocalRegion.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java index 94391ba1c..6499e3025 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java @@ -14613,7 +14613,7 @@ private void memTrace(String mesage) { } } - @Override + // TODO VB: Check in master @Override public boolean isInternalColumnTable() { return isInternalColumnTable; } From 5d5ff6bef8e1c97c3e316873fb5bd83eb872c598 Mon Sep 17 00:00:00 2001 From: vivekwiz Date: Thu, 10 May 2018 12:16:18 +0530 Subject: [PATCH 11/14] Added a new class for delete --- .../com/pivotal/gemfirexd/internal/engine/GfxdSerializable.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/GfxdSerializable.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/GfxdSerializable.java index 97bc90486..1791a5050 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/GfxdSerializable.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/GfxdSerializable.java @@ -331,4 +331,6 @@ public interface GfxdSerializable extends GfxdDSFID { byte COLUMN_FORMAT_DELTA = 115; byte COLUMN_DELETE_DELTA = 116; + + byte COLUMN_DELETE_CHANGE = 117; } From 7ba7b1ff68d9c8a0ab6d07ce5bcd70113c0cbda7 Mon Sep 17 00:00:00 2001 From: vivekwiz Date: Thu, 31 May 2018 12:33:11 +0530 Subject: [PATCH 12/14] Changes from Sumedh to sort column batches on minimum value --- .../gemfire/internal/cache/ExternalTableMetaData.java | 2 ++ .../ddl/resolver/GfxdPartitionByExpressionResolver.java | 8 ++++++++ 2 files changed, 10 insertions(+) diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExternalTableMetaData.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExternalTableMetaData.java index 987cea7c8..38aa48a6c 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExternalTableMetaData.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExternalTableMetaData.java @@ -44,6 +44,7 @@ public ExternalTableMetaData(String entityName, this.dependents = dependents; this.dataSourcePath = dataSourcePath; this.driverClass = driverClass; + this.hasPrimaryIndex = true; // TODO: VB: read from table properties } public String entityName; @@ -61,6 +62,7 @@ public ExternalTableMetaData(String entityName, public String shortProvider; public String dataSourcePath; public String driverClass; + public boolean hasPrimaryIndex; // columns for metadata queries public List columns; diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/resolver/GfxdPartitionByExpressionResolver.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/resolver/GfxdPartitionByExpressionResolver.java index 19854aa4f..9d21b3181 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/resolver/GfxdPartitionByExpressionResolver.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/resolver/GfxdPartitionByExpressionResolver.java @@ -449,6 +449,14 @@ public String[] getColumnNames() { return this.partitionColumnNames; } + /** + * Return the column positions (1-based) of partitioning columns + * in the table schema. + */ + public int[] getColumnPositions() { + return this.columnPositionsInRow; + } + @Override public int getPartitioningColumnIndex(String partitionColumn) { Integer idx = this.columnToIndexMap.get(partitionColumn); From 57dac63077aa25d5efdbafabef0636c23160585b Mon Sep 17 00:00:00 2001 From: vivekwiz Date: Thu, 7 Jun 2018 16:37:32 +0530 Subject: [PATCH 13/14] Removing hardcoded elimination of exceptions. --- .../distributed/metadata/UpdateQueryInfo.java | 39 ++++++++++++++----- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/metadata/UpdateQueryInfo.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/metadata/UpdateQueryInfo.java index 8b9887761..ebdd0e518 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/metadata/UpdateQueryInfo.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/metadata/UpdateQueryInfo.java @@ -16,8 +16,10 @@ */ package com.pivotal.gemfirexd.internal.engine.distributed.metadata; +import com.gemstone.gemfire.internal.cache.ExternalTableMetaData; import com.gemstone.gemfire.internal.cache.LocalRegion; import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils; +import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer; import com.pivotal.gemfirexd.internal.iapi.error.StandardException; import com.pivotal.gemfirexd.internal.iapi.reference.SQLState; import com.pivotal.gemfirexd.internal.iapi.services.io.FormatableBitSet; @@ -158,6 +160,23 @@ private void checkUpdateFormatSupported() throws StandardException { .clear(this.queryType, IS_PRIMARY_KEY_TYPE); checkForColumnConstr = false; } + + /** + * For sorted column batches allow delta insert that is implemented as update plan + * It is not necessary to check if its update vs insert since there will be no row in row buffer + */ + boolean isSortedColumnTable = false; + LocalRegion region = tqi.getRegion(); + if (region != null) { + GemFireContainer container = (GemFireContainer)region.getUserAttribute(); + if (container.isPartitioned()) { + ExternalTableMetaData md = container.fetchHiveMetaData(false); + if (md != null) { + isSortedColumnTable = md.hasPrimaryIndex; + } + } + } + //ASIF: DISABLE FK Constraint check on query node //this.fcd = this.tableQueryInfoList.get(this.updateTargetTableNum).getForeignKeyConstraintDescriptorIfAny(); for (int i = 0; i < this.updateCols.length; ++i) { @@ -166,19 +185,19 @@ private void checkUpdateFormatSupported() throws StandardException { if (cqi.isTableInfoMissing()) { cqi.setMissingTableInfo(tqi); } -// if (cqi.isUsedInPartitioning()) { -// throw StandardException.newException(SQLState.NOT_IMPLEMENTED, -// "Update of partitioning column not supported"); -// } + if (!isSortedColumnTable && cqi.isUsedInPartitioning()) { + throw StandardException.newException(SQLState.NOT_IMPLEMENTED, + "Update of partitioning column not supported"); + } // For the time being since we are allowing only one table // the columns are necessarily part of the lone TableQueryInfo // Later the check needs to be more robust -// if (cqi.isPartOfPrimaryKey(tqi.getPrimaryKeyColumns())) { -// throw StandardException -// .newException( -// SQLState.NOT_IMPLEMENTED, -// "Update of column which is primary key or is part of the primary key, not supported"); -// } + if (!isSortedColumnTable && cqi.isPartOfPrimaryKey(tqi.getPrimaryKeyColumns())) { + throw StandardException + .newException( + SQLState.NOT_IMPLEMENTED, + "Update of column which is primary key or is part of the primary key, not supported"); + } /* No need to go through function execution for unique constraints if(checkForColumnConstr && cqi.isReferencedByUniqueConstraint()) { checkForColumnConstr = false; From 043a1008642dce04337bf9d4d1d52fa4c9a1e004 Mon Sep 17 00:00:00 2001 From: vivekwiz Date: Thu, 14 Jun 2018 11:50:14 +0530 Subject: [PATCH 14/14] Incorporating required DDL changes so column batches would only be sorted when user has given option while create table. --- .../gemfire/internal/cache/ExternalTableMetaData.java | 5 +++-- .../engine/distributed/metadata/UpdateQueryInfo.java | 5 ++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExternalTableMetaData.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExternalTableMetaData.java index 38aa48a6c..ece8c0439 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExternalTableMetaData.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExternalTableMetaData.java @@ -31,6 +31,7 @@ public ExternalTableMetaData(String entityName, String dml, String[] dependents, String dataSourcePath, + String columnTableSorting, String driverClass) { this.entityName = entityName; this.schema = schema; @@ -44,7 +45,7 @@ public ExternalTableMetaData(String entityName, this.dependents = dependents; this.dataSourcePath = dataSourcePath; this.driverClass = driverClass; - this.hasPrimaryIndex = true; // TODO: VB: read from table properties + this.columnTableSortOrder = columnTableSorting; } public String entityName; @@ -62,7 +63,7 @@ public ExternalTableMetaData(String entityName, public String shortProvider; public String dataSourcePath; public String driverClass; - public boolean hasPrimaryIndex; + public String columnTableSortOrder; // columns for metadata queries public List columns; diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/metadata/UpdateQueryInfo.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/metadata/UpdateQueryInfo.java index ebdd0e518..a52c1a931 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/metadata/UpdateQueryInfo.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/distributed/metadata/UpdateQueryInfo.java @@ -172,7 +172,10 @@ private void checkUpdateFormatSupported() throws StandardException { if (container.isPartitioned()) { ExternalTableMetaData md = container.fetchHiveMetaData(false); if (md != null) { - isSortedColumnTable = md.hasPrimaryIndex; + isSortedColumnTable = md.columnTableSortOrder.equalsIgnoreCase("ASC") || + md.columnTableSortOrder.equalsIgnoreCase("Ascending") || + md.columnTableSortOrder.equalsIgnoreCase("DESC") || + md.columnTableSortOrder.equalsIgnoreCase("Descending"); } } }