Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.phoenix.index;

import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
import static org.apache.phoenix.query.QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;

import java.sql.SQLException;
Expand All @@ -32,15 +35,21 @@
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.join.MaxServerCacheSizeExceededException;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IndexMetaDataCacheClient {

private static final Logger LOGGER = LoggerFactory.getLogger(IndexMetaDataCacheClient.class);

private final ServerCacheClient serverCache;
private PTable cacheUsingTable;

Expand Down Expand Up @@ -120,10 +129,51 @@ public static ServerCache setMetaDataOnMutations(PhoenixConnection connection, P
txState = connection.getMutationState().encodeTransaction();
}
boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
ReadOnlyProps props = connection.getQueryServices().getProps();
if (hasIndexMetaData) {
if (
List<PTable> indexes = table.getIndexes();
boolean sendIndexMaintainers =
indexes != null && indexes.stream().anyMatch(IndexMaintainer::sendIndexMaintainer);
boolean useServerMetadata = props.getBoolean(INDEX_USE_SERVER_METADATA_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_USE_SERVER_METADATA)
&& props.getBoolean(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_REGION_OBSERVER_ENABLED);
boolean serverSideImmutableIndexes =
props.getBoolean(SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED);
boolean useServerCacheRpc =
useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)
&& sendIndexMaintainers;
long updateCacheFreq = table.getUpdateCacheFrequency();
// PHOENIX-7727 Eliminate IndexMetadataCache RPCs by leveraging server PTable cache and
// retrieve IndexMaintainer objects for each active index from the PTable object.
// To optimize rpc calls, use it only when all of these conditions are met:
// 1. Use server metadata feature is enabled (enabled by default).
// 2. New index design is used (IndexRegionObserver coproc).
// 3. Table is not of type System.
// 4. Either table has mutable indexes or server side handling of immutable indexes is
// enabled.
// 5. Table's UPDATE_CACHE_FREQUENCY is not ALWAYS. This ensures IndexRegionObserver
// does not have to make additional getTable() rpc call with each batchMutate() rpc call.
// 6. Table's UPDATE_CACHE_FREQUENCY is ALWAYS but addServerCache() rpc call is needed
// due to the size of mutations. Unless expensive addServerCache() rpc call is required,
// client can attach index maintainer mutation attribute so that IndexRegionObserver
// does not have to make additional getTable() rpc call with each batchMutate() rpc call
// with small mutation size (size < phoenix.index.mutableBatchSizeThreshold value).
// If above conditions do not match and if the mutation size is greater than
// "phoenix.index.mutableBatchSizeThreshold" value, however if none of the data table
// indexes need to be sent to server (only index in state other than DISABLE,
// CREATE_DISABLE, PENDING_ACTIVE need to be sent to server), do not use expensive
// addServerCache() rpc call.
if (
useServerMetadata && table.getType() != PTableType.SYSTEM
&& (!table.isImmutableRows() || serverSideImmutableIndexes)
&& (updateCacheFreq > 0 || useServerCacheRpc)
) {
LOGGER.trace("Using server-side metadata for table {}, not sending IndexMaintainer or UUID",
table.getTableName());
uuidValue = ByteUtil.EMPTY_BYTE_ARRAY;
} else if (useServerCacheRpc) {
IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, table);
cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
uuidValue = cache.getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public interface QueryServices extends SQLCloseable {
public static final String IMMUTABLE_ROWS_ATTRIB = "phoenix.mutate.immutableRows";
public static final String INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB =
"phoenix.index.mutableBatchSizeThreshold";
public static final String INDEX_USE_SERVER_METADATA_ATTRIB = "phoenix.index.useServerMetadata";
public static final String DROP_METADATA_ATTRIB = "phoenix.schema.dropMetaData";
public static final String GROUPBY_SPILLABLE_ATTRIB = "phoenix.groupby.spillable";
public static final String GROUPBY_SPILL_FILES_ATTRIB = "phoenix.groupby.spillFiles";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
import static org.apache.phoenix.query.QueryServices.INDEX_POPULATION_SLEEP_TIME;
import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY;
import static org.apache.phoenix.query.QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB;
import static org.apache.phoenix.query.QueryServices.IS_NAMESPACE_MAPPING_ENABLED;
import static org.apache.phoenix.query.QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE;
import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB;
Expand Down Expand Up @@ -202,6 +203,7 @@ public class QueryServicesOptions {
public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb
public static final int DEFAULT_AGGREGATE_CHUNK_SIZE_INCREASE = 1024 * 1024 * 1; // 1 Mb
public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 3;
public static final boolean DEFAULT_INDEX_USE_SERVER_METADATA = true;
public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
// Only the first chunked batches are fetched in parallel, so this default
// should be on the relatively bigger side of things. Bigger means more
Expand Down Expand Up @@ -551,6 +553,7 @@ public static QueryServicesOptions withDefaults() {
.setIfUnset(IMMUTABLE_ROWS_ATTRIB, DEFAULT_IMMUTABLE_ROWS)
.setIfUnset(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB,
DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD)
.setIfUnset(INDEX_USE_SERVER_METADATA_ATTRIB, DEFAULT_INDEX_USE_SERVER_METADATA)
.setIfUnset(MAX_SPOOL_TO_DISK_BYTES_ATTRIB, DEFAULT_MAX_SPOOL_TO_DISK_BYTES)
.setIfUnset(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA)
.setIfUnset(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package org.apache.phoenix.index;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
Expand All @@ -32,14 +35,24 @@
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PhoenixIndexMetaDataBuilder {

private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixIndexMetaDataBuilder.class);

private final RegionCoprocessorEnvironment env;

PhoenixIndexMetaDataBuilder(RegionCoprocessorEnvironment env) {
Expand All @@ -63,6 +76,13 @@ private static IndexMetaDataCache getIndexMetaDataCache(RegionCoprocessorEnviron
if (uuid == null) {
return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE;
}
boolean useServerMetadata = uuid.length == 0;
if (useServerMetadata) {
IndexMetaDataCache cacheFromPTable = getIndexMetaDataCacheFromPTable(env, attributes);
if (cacheFromPTable != null) {
return cacheFromPTable;
}
}
byte[] md = attributes.get(PhoenixIndexCodec.INDEX_PROTO_MD);
if (md == null) {
md = attributes.get(PhoenixIndexCodec.INDEX_MD);
Expand All @@ -77,28 +97,7 @@ private static IndexMetaDataCache getIndexMetaDataCache(RegionCoprocessorEnviron
: Bytes.toInt(clientVersionBytes);
final PhoenixTransactionContext txnContext =
TransactionFactory.getTransactionContext(txState, clientVersion);
return new IndexMetaDataCache() {

@Override
public void close() throws IOException {
}

@Override
public List<IndexMaintainer> getIndexMaintainers() {
return indexMaintainers;
}

@Override
public PhoenixTransactionContext getTransactionContext() {
return txnContext;
}

@Override
public int getClientVersion() {
return clientVersion;
}

};
return getIndexMetaDataCache(clientVersion, txnContext, indexMaintainers);
} else {
byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
ImmutableBytesPtr tenantId =
Expand All @@ -117,4 +116,101 @@ public int getClientVersion() {
}

}

/**
* Get IndexMetaDataCache by looking up PTable using table metadata attributes attached to the
* mutation.
* @param env RegionCoprocessorEnvironment.
* @param attributes Mutation attributes.
* @return IndexMetaDataCache or null if table metadata not found in attributes.
*/
private static IndexMetaDataCache getIndexMetaDataCacheFromPTable(
RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) {
try {
byte[] schemaBytes =
attributes.get(MutationState.MutationMetadataType.SCHEMA_NAME.toString());
byte[] tableBytes =
attributes.get(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString());
if (schemaBytes == null || tableBytes == null) {
LOGGER.error("Table metadata for table name and schema name not found in mutation "
+ "attributes, falling back to GlobalCache lookup");
return null;
}
byte[] tenantIdBytes =
attributes.get(MutationState.MutationMetadataType.TENANT_ID.toString());
byte[] txState = attributes.get(BaseScannerRegionObserverConstants.TX_STATE);
byte[] clientVersionBytes = attributes.get(BaseScannerRegionObserverConstants.CLIENT_VERSION);

final int clientVersion = clientVersionBytes == null
? ScanUtil.UNKNOWN_CLIENT_VERSION
: Bytes.toInt(clientVersionBytes);
final PhoenixTransactionContext txnContext =
TransactionFactory.getTransactionContext(txState, clientVersion);

String fullTableName = SchemaUtil.getTableName(schemaBytes, tableBytes);
String tenantId =
tenantIdBytes == null || tenantIdBytes.length == 0 ? null : Bytes.toString(tenantIdBytes);
Properties props = new Properties();
if (tenantId != null) {
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
}
try (Connection conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration())) {
PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
PTable dataTable = pconn.getTable(tenantId, fullTableName);
final List<IndexMaintainer> indexMaintainers =
buildIndexMaintainersFromPTable(dataTable, pconn);
if (indexMaintainers.isEmpty()) {
LOGGER.debug("No active indexes found for table {}", fullTableName);
return null;
}
return getIndexMetaDataCache(clientVersion, txnContext, indexMaintainers);
}
} catch (Exception e) {
LOGGER.warn("Failed to get PTable from CQSI cache, falling back to GlobalCache lookup", e);
return null;
}
}

private static IndexMetaDataCache getIndexMetaDataCache(int clientVersion,
PhoenixTransactionContext txnContext, List<IndexMaintainer> indexMaintainers) {
return new IndexMetaDataCache() {
@Override
public void close() throws IOException {
}

@Override
public List<IndexMaintainer> getIndexMaintainers() {
return indexMaintainers;
}

@Override
public PhoenixTransactionContext getTransactionContext() {
return txnContext;
}

@Override
public int getClientVersion() {
return clientVersion;
}
};
}

/**
* Build List of IndexMaintainer for each active index.
* @param dataTable PTable of the data table.
* @param connection PhoenixConnection.
* @return List of IndexMaintainer objects for active indexes.
*/
private static List<IndexMaintainer> buildIndexMaintainersFromPTable(PTable dataTable,
PhoenixConnection connection) throws SQLException {
List<IndexMaintainer> indexMaintainers = new ArrayList<>();
List<PTable> indexes = dataTable.getIndexes();
for (PTable index : indexes) {
if (IndexMaintainer.sendIndexMaintainer(index)) {
IndexMaintainer maintainer = IndexMaintainer.create(dataTable, index, connection);
indexMaintainers.add(maintainer);
}
}
return indexMaintainers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public void testSetIndexedColumnToNullAndValueAtSameTSWithStoreNulls2() throws E
conn.close();

Timestamp expectedTimestamp;
ts = 1040;
ts = clock.time + 1;
clock.time = ts;
conn = DriverManager.getConnection(getUrl(), props);
stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, null)");
Expand All @@ -239,7 +239,7 @@ public void testSetIndexedColumnToNullAndValueAtSameTSWithStoreNulls2() throws E
conn.commit();
conn.close();

ts = 1050;
ts = clock.time + 1;
clock.time = ts;
conn = DriverManager.getConnection(getUrl(), props);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.phoenix.end2end;

import static org.apache.phoenix.query.QueryServices.DISABLE_VIEW_SUBTREE_VALIDATION;
import static org.apache.phoenix.query.QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
Expand Down Expand Up @@ -75,6 +76,7 @@ private static void initCluster() throws Exception {
props.put(QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED, Boolean.toString(false));
props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
props.put(DISABLE_VIEW_SUBTREE_VALIDATION, "true");
props.put(INDEX_USE_SERVER_METADATA_ATTRIB, Boolean.toString(false));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.phoenix.end2end;

import static org.apache.phoenix.query.QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB;

import java.util.Map;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.query.QueryServices;
Expand All @@ -43,6 +45,7 @@ private static void initCluster() throws Exception {
props.put(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, Boolean.toString(true));
props.put(QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED, Boolean.toString(true));
props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
props.put(INDEX_USE_SERVER_METADATA_ATTRIB, Boolean.toString(false));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}

Expand Down
Loading