Skip to content

Commit 94fd568

Browse files
authored
HBASE-29722 (backport portion of HBASE-27558) Coproc - possible data integrity issues for scan with heavy filters (#7470)
Signed-off-by: Istvan Toth <[email protected]> Signed-off-by: sanjeet006py <[email protected]> Signed-off-by: Aman Poonia <[email protected]> Signed-off-by: Umesh <[email protected]>
1 parent 13233bb commit 94fd568

File tree

2 files changed

+148
-2
lines changed

2 files changed

+148
-2
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3469,12 +3469,18 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
34693469
limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
34703470

34713471
if (limitReached || !moreRows) {
3472+
// With block size limit, we may exceed size limit without collecting any results.
3473+
// In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
3474+
// or cursor if results were collected, for example for cell size or heap size limits.
3475+
boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
34723476
// We only want to mark a ScanResponse as a heartbeat message in the event that
34733477
// there are more values to be read server side. If there aren't more values,
34743478
// marking it as a heartbeat is wasteful because the client will need to issue
34753479
// another ScanRequest only to realize that they already have all the values
3476-
if (moreRows && timeLimitReached) {
3477-
// Heartbeat messages occur when the time limit has been reached.
3480+
if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
3481+
// Heartbeat messages occur when the time limit has been reached, or size limit has
3482+
// been reached before collecting any results. This can happen for heavily filtered
3483+
// scans which scan over too many blocks.
34783484
builder.setHeartbeatMessage(true);
34793485
if (rsh.needCursor) {
34803486
Cell cursorCell = scannerContext.getLastPeekedCell();
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
42+
@Category({ LargeTests.class })
43+
public class TestScannerBlockSizeLimits {
44+
45+
@ClassRule
46+
public static final HBaseClassTestRule CLASS_RULE =
47+
HBaseClassTestRule.forClass(TestScannerBlockSizeLimits.class);
48+
49+
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
50+
private static final TableName TABLE = TableName.valueOf("TestScannerBlockSizeLimits");
51+
private static final byte[] FAMILY1 = Bytes.toBytes("0");
52+
private static final byte[] FAMILY2 = Bytes.toBytes("1");
53+
54+
private static final byte[] DATA = new byte[1000];
55+
private static final byte[][] FAMILIES = new byte[][] { FAMILY1, FAMILY2 };
56+
57+
private static final byte[] COLUMN1 = Bytes.toBytes(0);
58+
private static final byte[] COLUMN2 = Bytes.toBytes(1);
59+
private static final byte[] COLUMN3 = Bytes.toBytes(2);
60+
private static final byte[] COLUMN5 = Bytes.toBytes(5);
61+
62+
@BeforeClass
63+
public static void setUp() throws Exception {
64+
Configuration conf = TEST_UTIL.getConfiguration();
65+
conf.setInt(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 4200);
66+
TEST_UTIL.startMiniCluster(1);
67+
TEST_UTIL.createTable(TABLE, FAMILIES, 1, 2048);
68+
createTestData();
69+
}
70+
71+
@Before
72+
public void setupEach() throws Exception {
73+
HRegionServer regionServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
74+
for (HRegion region : regionServer.getRegions(TABLE)) {
75+
System.out.println("Clearing cache for region " + region.getRegionInfo().getEncodedName());
76+
regionServer.clearRegionBlockCache(region);
77+
}
78+
}
79+
80+
private static void createTestData() throws IOException, InterruptedException {
81+
RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE);
82+
String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
83+
HRegion region = TEST_UTIL.getRSForFirstRegionInTable(TABLE).getRegion(regionName);
84+
85+
for (int i = 1; i < 10; i++) {
86+
// 5 columns per row, in 2 families
87+
// Each column value is 1000 bytes, which is enough to fill a full block with row and header.
88+
// So 5 blocks per row in FAMILY1
89+
Put put = new Put(Bytes.toBytes(i));
90+
for (int j = 0; j < 6; j++) {
91+
put.addColumn(FAMILY1, Bytes.toBytes(j), DATA);
92+
}
93+
94+
// Additional block in FAMILY2 (notably smaller than block size)
95+
put.addColumn(FAMILY2, COLUMN1, DATA);
96+
97+
region.put(put);
98+
99+
if (i % 2 == 0) {
100+
region.flush(true);
101+
}
102+
}
103+
104+
// we've created 10 storefiles at this point, 5 per family
105+
region.flush(true);
106+
107+
}
108+
109+
/**
110+
* Simplest test that ensures we don't count block sizes too much. These 2 requested cells are in
111+
* the same block, so should be returned in 1 request. If we mis-counted blocks, it'd come in 2
112+
* requests.
113+
*/
114+
@Test
115+
public void testSingleBlock() throws IOException {
116+
Table table = TEST_UTIL.getConnection().getTable(TABLE);
117+
118+
ResultScanner scanner =
119+
table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1)).withStopRow(Bytes.toBytes(2))
120+
.addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM));
121+
122+
ScanMetrics metrics = scanner.getScanMetrics();
123+
124+
scanner.next(100);
125+
126+
// we fetch 2 columns from 1 row, so about 2 blocks
127+
assertEquals(1, metrics.countOfRowsScanned.get());
128+
assertEquals(1, metrics.countOfRPCcalls.get());
129+
}
130+
131+
/**
132+
* We enable cursors and partial results to give us more granularity over counting of results, and
133+
* we enable STREAM so that no auto switching from pread to stream occurs -- this throws off the
134+
* rpc counts.
135+
*/
136+
private Scan getBaseScan() {
137+
return new Scan().setScanMetricsEnabled(true).setNeedCursorResult(true)
138+
.setAllowPartialResults(true).setReadType(Scan.ReadType.STREAM);
139+
}
140+
}

0 commit comments

Comments
 (0)