diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 6a51172e9a73..f140783067af 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1029,6 +1029,12 @@ public enum OperationStatusCode { public static final String ROW_CACHE_ENABLED_KEY = "row.cache.enabled"; public static final boolean ROW_CACHE_ENABLED_DEFAULT = false; + /** + * Configuration key for evicting the row cache on region close + */ + public static final String ROW_CACHE_EVICT_ON_CLOSE_KEY = "row.cache.evictOnClose"; + public static final boolean ROW_CACHE_EVICT_ON_CLOSE_DEFAULT = false; + /** * Configuration key for the memory size of the block cache */ diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index 5ec39fa5803d..54505dfce955 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -547,8 +547,8 @@ public void setTimestamp(byte[] ts) throws IOException { @Override public ExtendedCell deepClone() { - // This is not used in actual flow. Throwing UnsupportedOperationException - throw new UnsupportedOperationException(); + // To garbage collect the objects referenced by this cell, we need to deep clone it + return ExtendedCell.super.deepClone(); + } } @@ -796,8 +796,8 @@ public void write(ByteBuffer buf, int offset) { @Override public ExtendedCell deepClone() { - // This is not used in actual flow. 
Throwing UnsupportedOperationException - throw new UnsupportedOperationException(); + // To cache row, we need to deep clone it + return super.deepClone(); } } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java index c88a77b51407..166484fe8991 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java @@ -430,6 +430,13 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo String L2_CACHE_HIT_RATIO_DESC = "L2 cache hit ratio."; String L2_CACHE_MISS_RATIO = "l2CacheMissRatio"; String L2_CACHE_MISS_RATIO_DESC = "L2 cache miss ratio."; + + String ROW_CACHE_HIT_COUNT = "rowCacheHitCount"; + String ROW_CACHE_MISS_COUNT = "rowCacheMissCount"; + String ROW_CACHE_EVICTED_ROW_COUNT = "rowCacheEvictedRowCount"; + String ROW_CACHE_SIZE = "rowCacheSize"; + String ROW_CACHE_COUNT = "rowCacheCount"; + String RS_START_TIME_NAME = "regionServerStartTime"; String ZOOKEEPER_QUORUM_NAME = "zookeeperQuorum"; String SERVER_NAME_NAME = "serverName"; diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index b214c8f8f4e7..90ea2a1165c8 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@ -452,6 +452,12 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) { .addCounter(Interns.info(BLOCK_CACHE_DELETE_FAMILY_BLOOM_HIT_COUNT, ""), rsWrap.getDeleteFamilyBloomHitCount()) 
.addCounter(Interns.info(BLOCK_CACHE_TRAILER_HIT_COUNT, ""), rsWrap.getTrailerHitCount()) + .addCounter(Interns.info(ROW_CACHE_HIT_COUNT, ""), rsWrap.getRowCacheHitCount()) + .addCounter(Interns.info(ROW_CACHE_MISS_COUNT, ""), rsWrap.getRowCacheMissCount()) + .addCounter(Interns.info(ROW_CACHE_EVICTED_ROW_COUNT, ""), + rsWrap.getRowCacheEvictedRowCount()) + .addGauge(Interns.info(ROW_CACHE_SIZE, ""), rsWrap.getRowCacheSize()) + .addGauge(Interns.info(ROW_CACHE_COUNT, ""), rsWrap.getRowCacheCount()) .addCounter(Interns.info(UPDATES_BLOCKED_TIME, UPDATES_BLOCKED_DESC), rsWrap.getUpdatesBlockedTime()) .addCounter(Interns.info(FLUSHED_CELLS, FLUSHED_CELLS_DESC), rsWrap.getFlushedCellsCount()) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java index 5b957d9bf08f..68e43b276ee2 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java @@ -635,6 +635,16 @@ public interface MetricsRegionServerWrapper { long getTrailerHitCount(); + long getRowCacheHitCount(); + + long getRowCacheMissCount(); + + long getRowCacheSize(); + + long getRowCacheCount(); + + long getRowCacheEvictedRowCount(); + long getTotalRowActionRequestCount(); long getByteBuffAllocatorHeapAllocationBytes(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 60bd4cee6b73..3a5c3f34313a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver; import static 
org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_EVICT_ON_CLOSE_DEFAULT; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_EVICT_ON_CLOSE_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY; import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ROW_LOCK_READ_LOCK_KEY; @@ -145,6 +147,7 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerCall; import org.apache.hadoop.hbase.mob.MobFileCache; @@ -946,7 +949,7 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co this.isRowCacheEnabled = checkRowCacheConfig(); } - private boolean checkRowCacheConfig() { + boolean checkRowCacheConfig() { Boolean fromDescriptor = htableDescriptor.getRowCacheEnabled(); // The setting from TableDescriptor has higher priority than the global configuration return fromDescriptor != null @@ -954,6 +957,11 @@ private boolean checkRowCacheConfig() { : conf.getBoolean(HConstants.ROW_CACHE_ENABLED_KEY, HConstants.ROW_CACHE_ENABLED_DEFAULT); } + // For testing only + void setRowCache(RowCache rowCache) { + this.rowCache = rowCache; + } + private void setHTableSpecificConf() { if (this.htableDescriptor == null) { return; @@ -1963,6 +1971,8 @@ public Pair> call() throws IOException { } } + evictRowCache(); + status.setStatus("Writing region close event to WAL"); // Always write close marker to wal even for read only table. This is not a big problem as we // do not write any data into the region; it is just a meta edit in the WAL file. 
@@ -2003,6 +2013,22 @@ public Pair> call() throws IOException { } } + private void evictRowCache() { + boolean evictOnClose = getReadOnlyConfiguration().getBoolean(ROW_CACHE_EVICT_ON_CLOSE_KEY, + ROW_CACHE_EVICT_ON_CLOSE_DEFAULT); + + if (!evictOnClose) { + return; + } + + if (!(rsServices instanceof HRegionServer regionServer)) { + return; + } + + RowCache rowCache = regionServer.getRSRpcServices().getServer().getRowCache(); + rowCache.evictRowsByRegion(this); + } + /** Wait for all current flushes and compactions of the region to complete */ // TODO HBASE-18906. Check the usage (if any) in Phoenix and expose this or give alternate way for // Phoenix needs. @@ -3259,8 +3285,8 @@ public RegionScannerImpl getScanner(Scan scan) throws IOException { return getScanner(scan, null); } - RegionScannerImpl getScannerWithResults(Get get, Scan scan, List results) - throws IOException { + RegionScannerImpl getScannerWithResults(Get get, Scan scan, List results, + RpcCallContext context) throws IOException { if (!rowCache.canCacheRow(get, this)) { return getScannerWithResults(scan, results); } @@ -3268,12 +3294,23 @@ RegionScannerImpl getScannerWithResults(Get get, Scan scan, List results) // Try get from row cache RowCacheKey key = new RowCacheKey(this, get.getRow()); if (rowCache.tryGetFromCache(key, get, results)) { + addReadRequestsCount(1); + if (getMetrics() != null) { + getMetrics().updateReadRequestCount(); + } + // Cache is hit, and then no scanner is created return null; } RegionScannerImpl scanner = getScannerWithResults(scan, results); - rowCache.populateCache(results, key); + + // When results came from memstore only, do not populate the row cache + boolean readFromMemStoreOnly = context.getBlockBytesScanned() < 1; + if (!readFromMemStoreOnly) { + rowCache.populateCache(this, results, key); + } + return scanner; } @@ -3435,6 +3472,15 @@ private void updateDeleteLatestVersionTimestamp(Cell cell, Get get, int count, b @Override public void put(Put put) throws 
IOException { TraceUtil.trace(() -> { + // Put with TTL is not allowed on tables with row cache enabled, because cached rows cannot + // track TTL expiration + if (isRowCacheEnabled) { + if (put.getTTL() != Long.MAX_VALUE) { + throw new DoNotRetryIOException( + "Tables with row cache enabled do not allow setting TTL on Puts"); + } + } + checkReadOnly(); // Do a rough check that we have resources to accept a write. The check is @@ -4811,7 +4857,12 @@ public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long // checkAndMutate. // * coprocessor calls (see ex. BulkDeleteEndpoint). // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... - return batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce)); + if (rowCache == null) { + return batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce)); + } + + return rowCache.mutateWithRowCacheBarrier(this, Arrays.asList(mutations), + () -> batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce))); } @Override @@ -4823,10 +4874,9 @@ public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { } OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic) throws IOException { - OperationStatus[] operationStatuses = - rowCache.mutateWithRowCacheBarrier(this, Arrays.asList(mutations), - () -> this.batchMutate(mutations, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE)); - return TraceUtil.trace(() -> operationStatuses, () -> createRegionSpan("Region.batchMutate")); + return TraceUtil.trace( + () -> batchMutate(mutations, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE), + () -> createRegionSpan("Region.batchMutate")); } /** @@ -5111,8 +5161,17 @@ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate, long nonceGroup, long nonce) throws IOException { - CheckAndMutateResult 
checkAndMutateResult = rowCache.mutateWithRowCacheBarrier(this, - checkAndMutate.getRow(), () -> this.checkAndMutate(checkAndMutate, nonceGroup, nonce)); + CheckAndMutateResult checkAndMutateResult = + rowCache.mutateWithRowCacheBarrier(this, checkAndMutate.getRow(), + () -> this.checkAndMutateInternal(checkAndMutate, nonceGroup, nonce)); + return TraceUtil.trace(() -> checkAndMutateResult, + () -> createRegionSpan("Region.checkAndMutate")); + } + + public CheckAndMutateResult checkAndMutate(List mutations, + CheckAndMutate checkAndMutate, long nonceGroup, long nonce) throws IOException { + CheckAndMutateResult checkAndMutateResult = rowCache.mutateWithRowCacheBarrier(this, mutations, + () -> this.checkAndMutateInternal(checkAndMutate, nonceGroup, nonce)); return TraceUtil.trace(() -> checkAndMutateResult, () -> createRegionSpan("Region.checkAndMutate")); } @@ -5312,6 +5371,10 @@ private OperationStatus mutate(Mutation mutation, boolean atomic) throws IOExcep private OperationStatus mutate(Mutation mutation, boolean atomic, long nonceGroup, long nonce) throws IOException { + if (rowCache == null) { + return this.mutateInternal(mutation, atomic, nonceGroup, nonce); + } + return rowCache.mutateWithRowCacheBarrier(this, mutation.getRow(), () -> this.mutateInternal(mutation, atomic, nonceGroup, nonce)); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index c8f7f96a033b..b4dabf7fb3bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -69,6 +69,7 @@ class MetricsRegionServerWrapperImpl implements MetricsRegionServerWrapper { private BlockCache l2Cache = null; private MobFileCache mobFileCache; private CacheStats cacheStats; + private 
final RowCache rowCache; private CacheStats l1Stats = null; private CacheStats l2Stats = null; private volatile long numWALFiles = 0; @@ -99,6 +100,8 @@ public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) { this.regionServer = regionServer; initBlockCache(); initMobFileCache(); + RSRpcServices rsRpcServices = this.regionServer.getRSRpcServices(); + this.rowCache = rsRpcServices == null ? null : rsRpcServices.getServer().getRowCache(); this.excludeDatanodeManager = this.regionServer.getWalFactory().getExcludeDatanodeManager(); this.period = regionServer.getConfiguration().getLong(HConstants.REGIONSERVER_METRICS_PERIOD, @@ -1194,6 +1197,31 @@ public long getTrailerHitCount() { return this.cacheStats != null ? this.cacheStats.getTrailerHitCount() : 0L; } + @Override + public long getRowCacheHitCount() { + return this.rowCache != null ? this.rowCache.getHitCount() : 0L; + } + + @Override + public long getRowCacheMissCount() { + return this.rowCache != null ? this.rowCache.getMissCount() : 0L; + } + + @Override + public long getRowCacheSize() { + return this.rowCache != null ? this.rowCache.getSize() : 0L; + } + + @Override + public long getRowCacheCount() { + return this.rowCache != null ? this.rowCache.getCount() : 0L; + } + + @Override + public long getRowCacheEvictedRowCount() { + return this.rowCache != null ? 
this.rowCache.getEvictedRowCount() : 0L; + } + @Override public long getByteBuffAllocatorHeapAllocationBytes() { return ByteBuffAllocator.getHeapAllocationBytes(allocator, ByteBuffAllocator.HEAP); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 35371cb74ae7..7a21ab8a5504 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -668,7 +668,7 @@ private CheckAndMutateResult checkAndMutate(HRegion region, List{@code RowCache} coordinates cache access for Get operations and - * enforces cache consistency during mutations. It delegates actual - * storage and eviction policy decisions (e.g., LRU, LFU) to a - * {@link RowCacheStrategy} implementation.

- * - *

This class is responsible for: + *

+ * {@code RowCache} coordinates cache access for Get operations and enforces cache consistency + * during mutations. It delegates actual storage and eviction policy decisions (e.g., LRU, LFU) to a + * {@link RowCacheStrategy} implementation. + *

+ *

+ * This class is responsible for: *

    - *
  • Determining whether row cache is enabled for a region
  • - *
  • Attempting cache lookups before falling back to the normal read path
  • - *
  • Populating the cache after successful reads
  • - *
  • Evicting affected rows on mutations to maintain correctness
  • + *
  • Determining whether row cache is enabled for a region
  • + *
  • Attempting cache lookups before falling back to the normal read path
  • + *
  • Populating the cache after successful reads
  • + *
  • Evicting affected rows on mutations to maintain correctness
  • *
- * - *

{@code RowCache} does not implement caching policy or storage directly; - * those concerns are encapsulated by {@code RowCacheStrategy}.

+ *

+ * {@code RowCache} does not implement caching policy or storage directly; those concerns are + * encapsulated by {@code RowCacheStrategy}. + *

*/ @org.apache.yetus.audience.InterfaceAudience.Private public class RowCache { + /** + * A barrier that prevents the row cache from being populated during region operations, such as + * bulk loads. It is implemented as a counter to address issues that arise when the same region is + * updated concurrently. + */ + private final Map regionLevelBarrierMap = new ConcurrentHashMap<>(); + /** + * A barrier that prevents the row cache from being populated during row mutations. It is + * implemented as a counter to address issues that arise when the same row is mutated + * concurrently. + */ + private final Map rowLevelBarrierMap = new ConcurrentHashMap<>(); + private final boolean enabledByConf; private final RowCacheStrategy rowCacheStrategy; @@ -63,8 +85,8 @@ R execute(RowOperation operation) throws IOException { RowCache(Configuration conf) { enabledByConf = conf.getFloat(HConstants.ROW_CACHE_SIZE_KEY, HConstants.ROW_CACHE_SIZE_DEFAULT) > 0; - // TODO: implement row cache - rowCacheStrategy = null; + // Currently we only support TinyLfu implementation + rowCacheStrategy = new TinyLfuRowCacheStrategy(MemorySizeUtil.getRowCacheSize(conf)); } R mutateWithRowCacheBarrier(HRegion region, byte[] row, RowOperation operation) @@ -74,9 +96,39 @@ R mutateWithRowCacheBarrier(HRegion region, byte[] row, RowOperation oper } RowCacheKey key = new RowCacheKey(region, row); - // TODO: implement mutate with row cache barrier logic - evictRow(key); - return execute(operation); + try { + // Creates a barrier that prevents the row cache from being populated for this row + // during mutation. Reads for the row can instead be served from HFiles or the block cache. 
+ createRowLevelBarrier(key); + + // After creating the barrier, evict the existing row cache for this row, + // as it becomes invalid after the mutation + evictRow(key); + + return execute(operation); + } finally { + // Remove the barrier after mutation to allow the row cache to be populated again + removeRowLevelBarrier(key); + } + } + + /** + * Remove the barrier after mutation to allow the row cache to be populated again + * @param key the cache key of the row + */ + void removeRowLevelBarrier(RowCacheKey key) { + rowLevelBarrierMap.computeIfPresent(key, (k, counter) -> { + int remaining = counter.decrementAndGet(); + return (remaining <= 0) ? null : counter; + }); + } + + /** + * Creates a barrier to prevent the row cache from being populated for this row during mutation + * @param key the cache key of the row + */ + void createRowLevelBarrier(RowCacheKey key) { + rowLevelBarrierMap.computeIfAbsent(key, k -> new AtomicInteger(0)).incrementAndGet(); } R mutateWithRowCacheBarrier(HRegion region, List mutations, @@ -85,21 +137,88 @@ R mutateWithRowCacheBarrier(HRegion region, List mutations, return operation.execute(); } - // TODO: implement mutate with row cache barrier logic Set rowCacheKeys = new HashSet<>(mutations.size()); - mutations.forEach(mutation -> rowCacheKeys.add(new RowCacheKey(region, mutation.getRow()))); - rowCacheKeys.forEach(this::evictRow); + try { + // Evict the entire row cache + mutations.forEach(mutation -> rowCacheKeys.add(new RowCacheKey(region, mutation.getRow()))); + rowCacheKeys.forEach(key -> { + // Creates a barrier that prevents the row cache from being populated for this row + // during mutation. Reads for the row can instead be served from HFiles or the block cache. 
+ createRowLevelBarrier(key); - return execute(operation); + // After creating the barrier, evict the existing row cache for this row, + // as it becomes invalid after the mutation + evictRow(key); + }); + + return execute(operation); + } finally { + // Remove the barrier after mutation to allow the row cache to be populated again + rowCacheKeys.forEach(this::removeRowLevelBarrier); + } } void evictRow(RowCacheKey key) { rowCacheStrategy.evictRow(key); } + void evictRowsByRegion(HRegion region) { + rowCacheStrategy.evictRowsByRegion(region); + } + + // @formatter:off + /** + * Row cache is only enabled when the following conditions are met: + * - Row cache is enabled at the table level. + * - Cache blocks is enabled in the get request. + * - A Get object cannot be distinguished from others except by its row key. + * So we check equality for the following: + * - filter + * - retrieving cells + * - TTL + * - attributes + * - CheckExistenceOnly + * - ColumnFamilyTimeRange + * - Consistency + * - MaxResultsPerColumnFamily + * - ReplicaId + * - RowOffsetPerColumnFamily + * @param get the Get request + * @param region the Region + * @return true if the row can be cached, false otherwise + */ + // @formatter:on boolean canCacheRow(Get get, Region region) { - // TODO: implement logic to determine if the row can be cached - return false; + return enabledByConf && region.isRowCacheEnabled() && get.getCacheBlocks() + && get.getFilter() == null && isRetrieveAllCells(get, region) && isDefaultTtl(region) + && get.getAttributesMap().isEmpty() && !get.isCheckExistenceOnly() + && get.getColumnFamilyTimeRange().isEmpty() && get.getConsistency() == Consistency.STRONG + && get.getMaxResultsPerColumnFamily() == -1 && get.getReplicaId() == -1 + && get.getRowOffsetPerColumnFamily() == 0 && get.getTimeRange().isAllTime(); + } + + private static boolean isRetrieveAllCells(Get get, Region region) { + if (region.getTableDescriptor().getColumnFamilyCount() != get.numFamilies()) { + return 
false; + } + + boolean hasQualifier = get.getFamilyMap().values().stream().anyMatch(Objects::nonNull); + return !hasQualifier; + } + + private static boolean isDefaultTtl(Region region) { + return Arrays.stream(region.getTableDescriptor().getColumnFamilies()) + .allMatch(cfd -> cfd.getTimeToLive() == ColumnFamilyDescriptorBuilder.DEFAULT_TTL); + } + + // For testing only + public RowCells getRow(RowCacheKey key) { + return getRow(key, true); + } + + // For testing only + RowCells getRow(RowCacheKey key, boolean caching) { + return rowCacheStrategy.getRow(key, caching); } boolean tryGetFromCache(RowCacheKey key, Get get, List results) { @@ -110,16 +229,67 @@ boolean tryGetFromCache(RowCacheKey key, Get get, List results) { } results.addAll(row.getCells()); - // TODO: implement update of metrics return true; } - void populateCache(List results, RowCacheKey key) { - // TODO: implement with barrier to avoid cache read during mutation - try { - rowCacheStrategy.cacheRow(key, new RowCells(results)); - } catch (CloneNotSupportedException ignored) { - // Not able to cache row cells, ignore - } + void populateCache(HRegion region, List results, RowCacheKey key) { + // The row cache is populated only when no region level barriers remain + regionLevelBarrierMap.computeIfAbsent(region, t -> { + // The row cache is populated only when no row level barriers remain + rowLevelBarrierMap.computeIfAbsent(key, k -> { + try { + rowCacheStrategy.cacheRow(key, new RowCells(results)); + } catch (CloneNotSupportedException ignored) { + // Not able to cache row cells, ignore + } + return null; + }); + return null; + }); + } + + void createRegionLevelBarrier(HRegion region) { + regionLevelBarrierMap.computeIfAbsent(region, k -> new AtomicInteger(0)).incrementAndGet(); + } + + void increaseRowCacheSeqNum(HRegion region) { + region.increaseRowCacheSeqNum(); + } + + void removeTableLevelBarrier(HRegion region) { + regionLevelBarrierMap.computeIfPresent(region, (k, counter) -> { + int remaining 
= counter.decrementAndGet(); + return (remaining <= 0) ? null : counter; + }); + } + + long getHitCount() { + return rowCacheStrategy.getHitCount(); + } + + long getMissCount() { + return rowCacheStrategy.getMissCount(); + } + + long getSize() { + return rowCacheStrategy.getSize(); + } + + long getCount() { + return rowCacheStrategy.getCount(); + } + + long getEvictedRowCount() { + return rowCacheStrategy.getEvictedRowCount(); + } + + // For testing only + AtomicInteger getRowLevelBarrier(RowCacheKey key) { + return rowLevelBarrierMap.get(key); + } + + // For testing only + AtomicInteger getRegionLevelBarrier(HRegion region) { + return regionLevelBarrierMap.get(region); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java index 2f44058e0a24..af0a0ea4c537 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java @@ -39,8 +39,7 @@ public RowCells(List cells) throws CloneNotSupportedException { // To garbage collect the objects referenced by the cells this.cells.add(extCell.deepClone()); } catch (RuntimeException e) { - // throw new CloneNotSupportedException("Deep clone failed"); - this.cells.add(extCell); + throw new CloneNotSupportedException("Deep clone failed"); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TinyLfuRowCacheStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TinyLfuRowCacheStrategy.java new file mode 100644 index 000000000000..e141bd3cbb2b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TinyLfuRowCacheStrategy.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Policy; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.atomic.LongAdder; +import org.checkerframework.checker.nullness.qual.NonNull; + +@org.apache.yetus.audience.InterfaceAudience.Private +public class TinyLfuRowCacheStrategy implements RowCacheStrategy { + private final class EvictionListener + implements RemovalListener<@NonNull RowCacheKey, @NonNull RowCells> { + @Override + public void onRemoval(RowCacheKey key, RowCells value, @NonNull RemovalCause cause) { + evictedRowCount.increment(); + } + } + + private final Cache<@NonNull RowCacheKey, RowCells> cache; + + // Cache.stats() does not provide eviction count for entries, so we maintain our own counter. 
+ private final LongAdder evictedRowCount = new LongAdder(); + + TinyLfuRowCacheStrategy(long maxSizeBytes) { + if (maxSizeBytes <= 0) { + cache = Caffeine.newBuilder().maximumSize(0).build(); + return; + } + + cache = + Caffeine.newBuilder().maximumWeight(maxSizeBytes).removalListener(new EvictionListener()) + .weigher((RowCacheKey key, + RowCells value) -> (int) Math.min(key.heapSize() + value.heapSize(), Integer.MAX_VALUE)) + .recordStats().build(); + } + + @Override + public void cacheRow(RowCacheKey key, RowCells value) { + cache.put(key, value); + } + + @Override + public void evictRow(RowCacheKey key) { + cache.asMap().remove(key); + } + + @Override + public void evictRowsByRegion(HRegion region) { + cache.asMap().keySet().removeIf(key -> key.isSameRegion(region)); + } + + @Override + public long getCount() { + return cache.estimatedSize(); + } + + @Override + public long getEvictedRowCount() { + return evictedRowCount.sum(); + } + + @Override + public long getHitCount() { + return cache.stats().hitCount(); + } + + @Override + public long getMaxSize() { + Optional result = cache.policy().eviction().map(Policy.Eviction::getMaximum); + return result.orElse(-1L); + } + + @Override + public long getMissCount() { + return cache.stats().missCount(); + } + + @Override + public RowCells getRow(RowCacheKey key, boolean caching) { + if (!caching) { + return null; + } + + return cache.getIfPresent(key); + } + + @Override + public long getSize() { + Optional result = cache.policy().eviction().map(Policy.Eviction::weightedSize); + return result.orElse(OptionalLong.of(-1L)).orElse(-1L); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java index f1b6efe50a99..6b677f2d1223 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java @@ -662,6 +662,31 @@ public long getTrailerHitCount() { return 0; } + @Override + public long getRowCacheHitCount() { + return 2; + } + + @Override + public long getRowCacheMissCount() { + return 1; + } + + @Override + public long getRowCacheEvictedRowCount() { + return 0; + } + + @Override + public long getRowCacheSize() { + return 1; + } + + @Override + public long getRowCacheCount() { + return 2; + } + @Override public int getSplitQueueSize() { return 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java index aac2a5922b9b..76c2a8ad6e42 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java @@ -148,6 +148,11 @@ public void testWrapperSource() { HELPER.assertGauge("l2CacheHitRatio", 90, serverSource); HELPER.assertGauge("l2CacheMissRatio", 10, serverSource); HELPER.assertCounter("updatesBlockedTime", 419, serverSource); + HELPER.assertCounter("rowCacheHitCount", 2, serverSource); + HELPER.assertCounter("rowCacheMissCount", 1, serverSource); + HELPER.assertCounter("rowCacheEvictedRowCount", 0, serverSource); + HELPER.assertGauge("rowCacheSize", 1, serverSource); + HELPER.assertGauge("rowCacheCount", 2, serverSource); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCache.java new file mode 100644 index 000000000000..c4ca0d70faff --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCache.java @@ -0,0 +1,547 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.ROW_CACHE_EVICTED_ROW_COUNT; +import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.ROW_CACHE_HIT_COUNT; +import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.ROW_CACHE_MISS_COUNT; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompatibilityFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import 
org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.test.MetricsAssertHelper; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestRowCache { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCache.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final byte[] CF1 = Bytes.toBytes("cf1"); + private static final byte[] CF2 = Bytes.toBytes("cf2"); + private static final byte[] Q1 = Bytes.toBytes("q1"); + private static final byte[] Q2 = Bytes.toBytes("q2"); + + private static MetricsAssertHelper metricsHelper; + private static MetricsRegionServer regionServerMetrics; + private static 
MetricsRegionServerSource serverSource; + + private static Admin admin; + private static RowCache rowCache; + + private TableName tableName; + private Table table; + HRegion region; + private final Map counterBase = new HashMap<>(); + + @Rule + public TestName testName = new TestName(); + + @BeforeClass + public static void beforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + + // Enable row cache but reduce the block cache size to fit in 80% of the heap + conf.setFloat(ROW_CACHE_SIZE_KEY, 0.01f); + conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f); + + SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster(); + cluster.waitForActiveAndReadyMaster(); + admin = TEST_UTIL.getAdmin(); + + metricsHelper = CompatibilityFactory.getInstance(MetricsAssertHelper.class); + HRegionServer regionServer = cluster.getRegionServer(0); + regionServerMetrics = regionServer.getMetrics(); + serverSource = regionServerMetrics.getMetricsSource(); + + rowCache = regionServer.getRSRpcServices().getServer().getRowCache(); + } + + @AfterClass + public static void afterClass() throws Exception { + HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false; + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void beforeTestMethod() throws Exception { + ColumnFamilyDescriptor cf1 = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + // To test data block encoding + ColumnFamilyDescriptor cf2 = ColumnFamilyDescriptorBuilder.newBuilder(CF2) + .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF).build(); + + tableName = TableName.valueOf(testName.getMethodName()); + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName).setRowCacheEnabled(true) + .setColumnFamily(cf1).setColumnFamily(cf2).build(); + admin.createTable(td); + table = admin.getConnection().getTable(tableName); + region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegions().stream() + .filter(r -> 
r.getRegionInfo().getTable().equals(tableName)).findFirst().orElseThrow(); + } + + @After + public void afterTestMethod() throws Exception { + counterBase.clear(); + + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + + private void setCounterBase(String metric, long value) { + counterBase.put(metric, value); + } + + private void assertCounterDiff(String metric, long diff) { + Long base = counterBase.get(metric); + if (base == null) { + throw new IllegalStateException( + "base counter of " + metric + " metric should have been set before by setCounterBase()"); + } + long newValue = base + diff; + metricsHelper.assertCounter(metric, newValue, serverSource); + counterBase.put(metric, newValue); + } + + private static void recomputeMetrics() { + regionServerMetrics.getRegionServerWrapper().forceRecompute(); + } + + @Test + public void testGetWithRowCache() throws IOException { + byte[] rowKey = "row".getBytes(); + Get get = new Get(rowKey); + Result result; + + RowCacheKey rowCacheKey = new RowCacheKey(region, rowKey); + + // Initialize metrics + recomputeMetrics(); + setCounterBase("Get_num_ops", metricsHelper.getCounter("Get_num_ops", serverSource)); + setCounterBase(ROW_CACHE_HIT_COUNT, + metricsHelper.getCounter(ROW_CACHE_HIT_COUNT, serverSource)); + setCounterBase(ROW_CACHE_MISS_COUNT, + metricsHelper.getCounter(ROW_CACHE_MISS_COUNT, serverSource)); + setCounterBase(ROW_CACHE_EVICTED_ROW_COUNT, + metricsHelper.getCounter(ROW_CACHE_EVICTED_ROW_COUNT, serverSource)); + + // Put a row + Put put = new Put(rowKey); + put.addColumn(CF1, Q1, Bytes.toBytes(0L)); + put.addColumn(CF1, Q2, "12".getBytes()); + put.addColumn(CF2, Q1, "21".getBytes()); + put.addColumn(CF2, Q2, "22".getBytes()); + table.put(put); + admin.flush(tableName); + recomputeMetrics(); + assertCounterDiff(ROW_CACHE_HIT_COUNT, 0); + assertCounterDiff(ROW_CACHE_MISS_COUNT, 0); + assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0); + + // First get to populate the row cache + result = 
table.get(get); + recomputeMetrics(); + assertArrayEquals(rowKey, result.getRow()); + assertArrayEquals(Bytes.toBytes(0L), result.getValue(CF1, Q1)); + assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2)); + assertArrayEquals("21".getBytes(), result.getValue(CF2, Q1)); + assertArrayEquals("22".getBytes(), result.getValue(CF2, Q2)); + assertCounterDiff("Get_num_ops", 1); + // Ensure the get operation from HFile without row cache + assertCounterDiff(ROW_CACHE_HIT_COUNT, 0); + assertCounterDiff(ROW_CACHE_MISS_COUNT, 1); + assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0); + + // Get from the row cache + result = table.get(get); + recomputeMetrics(); + assertArrayEquals(rowKey, result.getRow()); + assertArrayEquals(Bytes.toBytes(0L), result.getValue(CF1, Q1)); + assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2)); + assertArrayEquals("21".getBytes(), result.getValue(CF2, Q1)); + assertArrayEquals("22".getBytes(), result.getValue(CF2, Q2)); + assertCounterDiff("Get_num_ops", 1); + // Ensure the get operation from the row cache + assertCounterDiff(ROW_CACHE_HIT_COUNT, 1); + assertCounterDiff(ROW_CACHE_MISS_COUNT, 0); + assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0); + + // Row cache is invalidated by the put operation + assertNotNull(rowCache.getRow(rowCacheKey)); + table.put(put); + recomputeMetrics(); + assertCounterDiff(ROW_CACHE_HIT_COUNT, 1); + assertCounterDiff(ROW_CACHE_MISS_COUNT, 0); + assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 1); + + // Get is executed without the row cache; however, the cache is re-populated as a result + result = table.get(get); + recomputeMetrics(); + assertArrayEquals(rowKey, result.getRow()); + assertCounterDiff("Get_num_ops", 1); + // Ensure the get operation not from the row cache + assertCounterDiff(ROW_CACHE_HIT_COUNT, 0); + assertCounterDiff(ROW_CACHE_MISS_COUNT, 1); + assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0); + + // Get again with the row cache + result = table.get(get); + recomputeMetrics(); + 
assertArrayEquals(rowKey, result.getRow()); + assertCounterDiff("Get_num_ops", 1); + // Ensure the get operation from the row cache + assertCounterDiff(ROW_CACHE_HIT_COUNT, 1); + assertCounterDiff(ROW_CACHE_MISS_COUNT, 0); + assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0); + + // Row cache is invalidated by the increment operation + assertNotNull(rowCache.getRow(rowCacheKey)); + table.incrementColumnValue(rowKey, CF1, Q1, 1); + assertNull(rowCache.getRow(rowCacheKey)); + + // Get is executed without the row cache; however, the cache is re-populated as a result + table.get(get); + assertNotNull(rowCache.getRow(rowCacheKey)); + + // Row cache is invalidated by the append operation + assertNotNull(rowCache.getRow(rowCacheKey)); + Append append = new Append(rowKey); + append.addColumn(CF1, Q1, Bytes.toBytes(0L)); + table.append(append); + assertNull(rowCache.getRow(rowCacheKey)); + + // Get is executed without the row cache; however, the cache is re-populated as a result + table.get(get); + assertNotNull(rowCache.getRow(rowCacheKey)); + + // Row cache is invalidated by the delete operation + assertNotNull(rowCache.getRow(rowCacheKey)); + Delete delete = new Delete(rowKey); + delete.addColumn(CF1, Q1); + table.delete(delete); + assertNull(rowCache.getRow(rowCacheKey)); + } + + @Test(expected = DoNotRetryIOException.class) + public void testPutWithTTL() throws IOException { + // Put with TTL is not allowed on tables with row cache enabled, because cached rows cannot + // track TTL expiration + Put put = new Put("row".getBytes()); + put.addColumn(CF1, Q1, "11".getBytes()); + put.setTTL(1); + table.put(put); + } + + @Test + public void testCheckAndMutate() throws IOException { + byte[] rowKey = "row".getBytes(); + Get get = new Get(rowKey); + Result result; + CheckAndMutate cam; + CheckAndMutateResult camResult; + + RowCacheKey rowCacheKey = new RowCacheKey(region, rowKey); + + // Put a row + Put put1 = new Put(rowKey); + put1.addColumn(CF1, Q1, "11".getBytes()); + 
put1.addColumn(CF1, Q2, "12".getBytes()); + table.put(put1); + admin.flush(tableName); + + // Validate that the row cache is populated + result = table.get(get); + assertNotNull(rowCache.getRow(rowCacheKey)); + assertArrayEquals("11".getBytes(), result.getValue(CF1, Q1)); + assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2)); + + // The row cache is not invalidated when a checkAndMutate operation fails + Put put2 = new Put(rowKey); + put2.addColumn(CF1, Q2, "1212".getBytes()); + cam = CheckAndMutate.newBuilder(rowKey).ifEquals(CF1, Q2, "00".getBytes()).build(put2); + camResult = table.checkAndMutate(cam); + assertFalse(camResult.isSuccess()); + assertNull(rowCache.getRow(rowCacheKey)); + + // Validate that the row cache is populated + result = table.get(get); + assertNotNull(rowCache.getRow(rowCacheKey)); + assertArrayEquals("11".getBytes(), result.getValue(CF1, Q1)); + assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2)); + + // The row cache is invalidated by a checkAndMutate operation + cam = CheckAndMutate.newBuilder(rowKey).ifEquals(CF1, Q2, "12".getBytes()).build(put2); + camResult = table.checkAndMutate(cam); + assertTrue(camResult.isSuccess()); + assertNull(rowCache.getRow(rowCacheKey)); + } + + @Test + public void testCheckAndMutates() throws IOException { + byte[] rowKey1 = "row1".getBytes(); + byte[] rowKey2 = "row2".getBytes(); + Get get1 = new Get(rowKey1); + Get get2 = new Get(rowKey2); + Result result1, result2; + List cams; + List camResults; + + RowCacheKey rowCacheKey1 = new RowCacheKey(region, rowKey1); + RowCacheKey rowCacheKey2 = new RowCacheKey(region, rowKey2); + + // Put rows + Put put1 = new Put(rowKey1); + put1.addColumn(CF1, Q1, "111".getBytes()); + put1.addColumn(CF1, Q2, "112".getBytes()); + table.put(put1); + Put put2 = new Put(rowKey2); + put2.addColumn(CF1, Q1, "211".getBytes()); + put2.addColumn(CF1, Q2, "212".getBytes()); + table.put(put2); + admin.flush(tableName); + + // Validate that the row caches are populated 
+ result1 = table.get(get1); + assertNotNull(rowCache.getRow(rowCacheKey1)); + assertArrayEquals("111".getBytes(), result1.getValue(CF1, Q1)); + assertArrayEquals("112".getBytes(), result1.getValue(CF1, Q2)); + result2 = table.get(get2); + assertNotNull(rowCache.getRow(rowCacheKey2)); + assertArrayEquals("211".getBytes(), result2.getValue(CF1, Q1)); + assertArrayEquals("212".getBytes(), result2.getValue(CF1, Q2)); + + // The row caches are invalidated by checkAndMutate operations + cams = new ArrayList<>(); + cams.add(CheckAndMutate.newBuilder(rowKey1).ifEquals(CF1, Q2, "112".getBytes()).build(put1)); + cams.add(CheckAndMutate.newBuilder(rowKey2).ifEquals(CF1, Q2, "212".getBytes()).build(put2)); + camResults = table.checkAndMutate(cams); + assertTrue(camResults.get(0).isSuccess()); + assertTrue(camResults.get(1).isSuccess()); + assertNull(rowCache.getRow(rowCacheKey1)); + assertNull(rowCache.getRow(rowCacheKey2)); + } + + @Test + public void testRowMutations() throws IOException { + byte[] rowKey1 = "row1".getBytes(); + byte[] rowKey2 = "row2".getBytes(); + Get get1 = new Get(rowKey1); + Get get2 = new Get(rowKey2); + Result result1, result2; + + RowCacheKey rowCacheKey1 = new RowCacheKey(region, rowKey1); + RowCacheKey rowCacheKey2 = new RowCacheKey(region, rowKey2); + + // Put rows + Put put1 = new Put(rowKey1); + put1.addColumn(CF1, Q1, "111".getBytes()); + put1.addColumn(CF1, Q2, "112".getBytes()); + table.put(put1); + Put put2 = new Put(rowKey2); + put2.addColumn(CF1, Q1, "211".getBytes()); + put2.addColumn(CF1, Q2, "212".getBytes()); + table.put(put2); + admin.flush(tableName); + + // Validate that the row caches are populated + result1 = table.get(get1); + assertNotNull(rowCache.getRow(rowCacheKey1)); + assertArrayEquals("111".getBytes(), result1.getValue(CF1, Q1)); + assertArrayEquals("112".getBytes(), result1.getValue(CF1, Q2)); + result2 = table.get(get2); + assertNotNull(rowCache.getRow(rowCacheKey1)); + assertArrayEquals("211".getBytes(), 
result2.getValue(CF1, Q1)); + assertArrayEquals("212".getBytes(), result2.getValue(CF1, Q2)); + + // The row caches are invalidated by batch operation + Put put12 = new Put(rowKey1); + put12.addColumn(CF1, Q1, "111111".getBytes()); + Put put13 = new Put(rowKey1); + put13.addColumn(CF1, Q2, "112112".getBytes()); + RowMutations rms = new RowMutations(rowKey1); + rms.add(put12); + rms.add(put13); + CheckAndMutate cam = + CheckAndMutate.newBuilder(rowKey1).ifEquals(CF1, Q1, "111".getBytes()).build(rms); + table.checkAndMutate(cam); + assertNull(rowCache.getRow(rowCacheKey1)); + assertNotNull(rowCache.getRow(rowCacheKey2)); + + // Validate that the row caches are populated + result1 = table.get(get1); + assertNotNull(rowCache.getRow(rowCacheKey1)); + assertArrayEquals("111111".getBytes(), result1.getValue(CF1, Q1)); + assertArrayEquals("112112".getBytes(), result1.getValue(CF1, Q2)); + result2 = table.get(get2); + assertNotNull(rowCache.getRow(rowCacheKey1)); + assertArrayEquals("211".getBytes(), result2.getValue(CF1, Q1)); + assertArrayEquals("212".getBytes(), result2.getValue(CF1, Q2)); + } + + @Test + public void testBatch() throws IOException, InterruptedException { + byte[] rowKey1 = "row1".getBytes(); + byte[] rowKey2 = "row2".getBytes(); + byte[] rowKey3 = "row3".getBytes(); + Get get1 = new Get(rowKey1); + Get get2 = new Get(rowKey2); + Get get3 = new Get(rowKey3); + List batchOperations; + Object[] results; + + RowCacheKey rowCacheKey1 = new RowCacheKey(region, rowKey1); + RowCacheKey rowCacheKey2 = new RowCacheKey(region, rowKey2); + RowCacheKey rowCacheKey3 = new RowCacheKey(region, rowKey3); + + // Put rows + batchOperations = new ArrayList<>(); + Put put1 = new Put(rowKey1); + put1.addColumn(CF1, Q1, "111".getBytes()); + put1.addColumn(CF1, Q2, "112".getBytes()); + batchOperations.add(put1); + Put put2 = new Put(rowKey2); + put2.addColumn(CF1, Q1, "211".getBytes()); + put2.addColumn(CF1, Q2, "212".getBytes()); + batchOperations.add(put2); + Put put3 = new 
Put(rowKey3); + put3.addColumn(CF1, Q1, "311".getBytes()); + put3.addColumn(CF1, Q2, "312".getBytes()); + batchOperations.add(put3); + results = new Result[batchOperations.size()]; + table.batch(batchOperations, results); + admin.flush(tableName); + + // Validate that the row caches are populated + batchOperations = new ArrayList<>(); + batchOperations.add(get1); + batchOperations.add(get2); + batchOperations.add(get3); + results = new Object[batchOperations.size()]; + table.batch(batchOperations, results); + assertEquals(3, results.length); + assertNotNull(rowCache.getRow(rowCacheKey1)); + assertArrayEquals("111".getBytes(), ((Result) results[0]).getValue(CF1, Q1)); + assertArrayEquals("112".getBytes(), ((Result) results[0]).getValue(CF1, Q2)); + assertNotNull(rowCache.getRow(rowCacheKey2)); + assertArrayEquals("211".getBytes(), ((Result) results[1]).getValue(CF1, Q1)); + assertArrayEquals("212".getBytes(), ((Result) results[1]).getValue(CF1, Q2)); + assertNotNull(rowCache.getRow(rowCacheKey3)); + assertArrayEquals("311".getBytes(), ((Result) results[2]).getValue(CF1, Q1)); + assertArrayEquals("312".getBytes(), ((Result) results[2]).getValue(CF1, Q2)); + + // The row caches are invalidated by batch operation + batchOperations = new ArrayList<>(); + batchOperations.add(put1); + Put put2New = new Put(rowKey2); + put2New.addColumn(CF1, Q1, "211211".getBytes()); + put2New.addColumn(CF1, Q2, "212".getBytes()); + CheckAndMutate cam = + CheckAndMutate.newBuilder(rowKey2).ifEquals(CF1, Q1, "211".getBytes()).build(put2New); + batchOperations.add(cam); + results = new Object[batchOperations.size()]; + table.batch(batchOperations, results); + assertEquals(2, results.length); + assertNull(rowCache.getRow(rowCacheKey1)); + assertNull(rowCache.getRow(rowCacheKey2)); + assertNotNull(rowCache.getRow(rowCacheKey3)); + } + + @Test + public void testGetFromMemstoreOnly() throws IOException, InterruptedException { + byte[] rowKey = "row".getBytes(); + RowCacheKey rowCacheKey = new 
RowCacheKey(region, rowKey); + + // Put a row into memstore only, not flushed to HFile yet + Put put = new Put(rowKey); + put.addColumn(CF1, Q1, Bytes.toBytes(0L)); + table.put(put); + + // Get from memstore only + Get get = new Get(rowKey); + table.get(get); + + // Validate that the row cache is not populated + assertNull(rowCache.getRow(rowCacheKey)); + + // Flush memstore to HFile, then get again + admin.flush(tableName); + get = new Get(rowKey); + table.get(get); + + // Validate that the row cache is populated now + assertNotNull(rowCache.getRow(rowCacheKey)); + + // Put another qualifier. And now the cells are in both memstore and HFile. + put = new Put(rowKey); + put.addColumn(CF1, Q2, Bytes.toBytes(0L)); + table.put(put); + + // Validate that the row cache is invalidated + assertNull(rowCache.getRow(rowCacheKey)); + + // Get from memstore and HFile + get = new Get(rowKey); + table.get(get); + assertNotNull(rowCache.getRow(rowCacheKey)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheCanCacheRow.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheCanCacheRow.java new file mode 100644 index 000000000000..ea3ed188b758 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheCanCacheRow.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.function.Function; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Consistency; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.security.visibility.Authorizations; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestRowCacheCanCacheRow { + private static final byte[] CF1 = "cf1".getBytes(); + private static final byte[] CF2 = "cf2".getBytes(); + private static final byte[] ROW_KEY = "row".getBytes(); + private static final TableName 
TABLE_NAME = TableName.valueOf("test"); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCacheCanCacheRow.class); + + @Test + public void testRowCacheEnabledByTable() { + Region region = Mockito.mock(Region.class); + ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + TableDescriptor td; + + Get get = new Get(ROW_KEY); + get.addFamily(CF1); + + td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true).setColumnFamily(cfd) + .build(); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled()); + + RowCache rowCache = new RowCache(conf); + Assert.assertTrue(rowCache.canCacheRow(get, region)); + + // Disable row cache, expect false + td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(cfd) + .setRowCacheEnabled(false).build(); + Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled()); + Assert.assertFalse(rowCache.canCacheRow(get, region)); + } + + @Test + public void testRowCacheDisabledByConfig() { + Region region = Mockito.mock(Region.class); + Configuration conf = HBaseConfiguration.create(); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + + ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + TableDescriptor td; + + Get get = new Get(ROW_KEY); + get.addFamily(CF1); + + // Row cache enabled at table level, but disabled by row cache size 0, expect false + td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true).setColumnFamily(cfd) + .build(); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + + RowCache rowCache = new RowCache(conf); + Assert.assertFalse(rowCache.canCacheRow(get, 
region)); + } + + @Test + public void testRetrieveAllCells() { + Region region = Mockito.mock(Region.class); + ColumnFamilyDescriptor cfd1 = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + ColumnFamilyDescriptor cfd2 = ColumnFamilyDescriptorBuilder.newBuilder(CF2).build(); + TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true) + .setColumnFamily(cfd1).setColumnFamily(cfd2).build(); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled()); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + RowCache rowCache = new RowCache(conf); + + // Not all CFs, expect false + Get get = new Get(ROW_KEY); + get.addFamily(CF1); + Assert.assertFalse(rowCache.canCacheRow(get, region)); + + // All CFs, expect true + get.addFamily(CF2); + Assert.assertTrue(rowCache.canCacheRow(get, region)); + + // Not all qualifiers, expect false + get.addColumn(CF1, "q1".getBytes()); + Assert.assertFalse(rowCache.canCacheRow(get, region)); + } + + @Test + public void testTtl() { + ColumnFamilyDescriptor cfd1; + ColumnFamilyDescriptor cfd2; + TableDescriptor td; + Region region = Mockito.mock(Region.class); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + RowCache rowCache = new RowCache(conf); + + Get get = new Get(ROW_KEY); + get.addFamily(CF1); + get.addFamily(CF2); + + // Ttl is set, expect false + cfd1 = ColumnFamilyDescriptorBuilder.newBuilder(CF1).setTimeToLive(1).build(); + cfd2 = ColumnFamilyDescriptorBuilder.newBuilder(CF2).build(); + td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true) + .setColumnFamily(cfd1).setColumnFamily(cfd2).build(); + 
Mockito.when(region.getTableDescriptor()).thenReturn(td); + Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled()); + Assert.assertFalse(rowCache.canCacheRow(get, region)); + + // Ttl is not set, expect true + cfd1 = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true) + .setColumnFamily(cfd1).setColumnFamily(cfd2).build(); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled()); + Assert.assertTrue(rowCache.canCacheRow(get, region)); + } + + @Test + public void testFilter() { + testWith( + get -> get.setFilter(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW_KEY)))); + } + + @Test + public void testCacheBlock() { + testWith(get -> get.setCacheBlocks(false)); + } + + @Test + public void testAttribute() { + testWith(get -> get.setAttribute("test", "value".getBytes())); + } + + @Test + public void testCheckExistenceOnly() { + testWith(get -> get.setCheckExistenceOnly(true)); + } + + @Test + public void testColumnFamilyTimeRange() { + testWith(get -> get.setColumnFamilyTimeRange(CF1, 1000, 2000)); + } + + @Test + public void testConsistency() { + testWith(get -> get.setConsistency(Consistency.TIMELINE)); + } + + @Test + public void testAuthorizations() { + testWith(get -> get.setAuthorizations(new Authorizations("foo"))); + } + + @Test + public void testId() { + testWith(get -> get.setId("test")); + } + + @Test + public void testIsolationLevel() { + testWith(get -> get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED)); + } + + @Test + public void testMaxResultsPerColumnFamily() { + testWith(get -> get.setMaxResultsPerColumnFamily(2)); + } + + @Test + public void testReplicaId() { + testWith(get -> get.setReplicaId(1)); + } + + @Test + public void testRowOffsetPerColumnFamily() { + testWith(get -> get.setRowOffsetPerColumnFamily(1)); + } + + @Test + public void 
testTimeRange() { + testWith(get -> { + try { + return get.setTimeRange(1, 2); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Test + public void testTimestamp() { + testWith(get -> get.setTimestamp(1)); + } + + private static void testWith(Function func) { + Region region = Mockito.mock(Region.class); + ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true) + .setColumnFamily(cfd).build(); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled()); + + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + RowCache rowCache = new RowCache(conf); + + Get get = new Get(ROW_KEY); + get.addFamily(CF1); + Assert.assertTrue(rowCache.canCacheRow(get, region)); + + // noinspection unused + var unused = func.apply(get); + + // expect false + Assert.assertFalse(rowCache.canCacheRow(get, region)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheConfiguration.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheConfiguration.java new file mode 100644 index 000000000000..02bba6fddf88 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheConfiguration.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestRowCacheConfiguration { + private static final byte[] CF1 = "cf1".getBytes(); + private static final TableName TABLE_NAME = TableName.valueOf("table"); + private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + @Test + public void testDetermineRowCacheEnabled() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + + HRegion region; + + // Set global config to false + conf.setBoolean(HConstants.ROW_CACHE_ENABLED_KEY, false); + + region = createRegion(null); + assertFalse(region.checkRowCacheConfig()); + + region = createRegion(false); + assertFalse(region.checkRowCacheConfig()); + + region = createRegion(true); + assertTrue(region.checkRowCacheConfig()); + + // Set global config to true 
+ conf.setBoolean(HConstants.ROW_CACHE_ENABLED_KEY, true); + + region = createRegion(null); + assertTrue(region.checkRowCacheConfig()); + + region = createRegion(false); + assertFalse(region.checkRowCacheConfig()); + + region = createRegion(true); + assertTrue(region.checkRowCacheConfig()); + } + + private HRegion createRegion(Boolean rowCacheEnabled) throws IOException { + ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + TableDescriptorBuilder tdb = TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(cfd); + if (rowCacheEnabled != null) { + tdb.setRowCacheEnabled(rowCacheEnabled); + } + return TEST_UTIL.createLocalHRegion(tdb.build(), "".getBytes(), "1".getBytes()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheEvictOnClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheEvictOnClose.java new file mode 100644 index 000000000000..4b3a1419f93a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheEvictOnClose.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_EVICT_ON_CLOSE_KEY; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_SIZE_KEY; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@Category({ RegionServerTests.class, MediumTests.class }) +@RunWith(Parameterized.class) +public class TestRowCacheEvictOnClose { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCacheEvictOnClose.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final byte[] CF1 = Bytes.toBytes("cf1"); + private static final byte[] Q1 = Bytes.toBytes("q1"); + private static final byte[] Q2 = Bytes.toBytes("q2"); + + @Rule + public TestName testName = new TestName(); + + @Parameterized.Parameter + public boolean evictOnClose; + + @Parameterized.Parameters + public static List params() { + return Arrays.asList(new Object[][] { { true }, { false } }); + } + + @Test + public void testEvictOnClose() throws Exception { + Configuration conf = 
TEST_UTIL.getConfiguration(); + + // Enable row cache + conf.setFloat(ROW_CACHE_SIZE_KEY, 0.01f); + conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f); + + // Set ROW_CACHE_EVICT_ON_CLOSE + conf.setBoolean(ROW_CACHE_EVICT_ON_CLOSE_KEY, evictOnClose); + + // Start cluster + SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster(); + cluster.waitForActiveAndReadyMaster(); + Admin admin = TEST_UTIL.getAdmin(); + + RowCache rowCache = cluster.getRegionServer(0).getRSRpcServices().getServer().getRowCache(); + + // Create table with row cache enabled + ColumnFamilyDescriptor cf1 = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + TableName tableName = TableName.valueOf(testName.getMethodName().replaceAll("[\\[\\]]", "_")); + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName).setRowCacheEnabled(true) + .setColumnFamily(cf1).build(); + admin.createTable(td); + Table table = admin.getConnection().getTable(tableName); + + int numRows = 10; + + // Put rows + for (int i = 0; i < numRows; i++) { + byte[] rowKey = Bytes.toBytes("row" + i); + Put put = new Put(rowKey); + put.addColumn(CF1, Q1, Bytes.toBytes(0L)); + put.addColumn(CF1, Q2, Bytes.toBytes("12")); + table.put(put); + } + // Need to flush because the row cache is not populated when reading only from the memstore. + admin.flush(tableName); + + // Populate row caches + for (int i = 0; i < numRows; i++) { + byte[] rowKey = Bytes.toBytes("row" + i); + Get get = new Get(rowKey); + Result result = table.get(get); + assertArrayEquals(rowKey, result.getRow()); + assertArrayEquals(Bytes.toBytes(0L), result.getValue(CF1, Q1)); + assertArrayEquals(Bytes.toBytes("12"), result.getValue(CF1, Q2)); + } + + // Verify every row was cached by the gets above + assertEquals(numRows, rowCache.getCount()); + + // Disable table + admin.disableTable(tableName); + + // Verify the row cache is evicted on region close only when ROW_CACHE_EVICT_ON_CLOSE is set + assertEquals(evictOnClose ?
0 : numRows, rowCache.getCount()); + + admin.deleteTable(tableName); + TEST_UTIL.shutdownMiniCluster(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheHRegion.java new file mode 100644 index 000000000000..a8c59dc6ccbc --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheHRegion.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestRowCacheHRegion { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCacheHRegion.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + public static final byte[] CF = Bytes.toBytes("cf1"); + + @Rule + public TestName currentTest = new TestName(); + + @BeforeClass + public static void setupCluster() throws Exception { + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void teardownCluster() throws 
Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testOpenHRegion() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + WALFactory walFactory = new WALFactory(conf, + ServerName.valueOf(currentTest.getMethodName(), 16010, EnvironmentEdgeManager.currentTime()) + .toString()); + WAL wal = walFactory.getWAL(null); + Path hbaseRootDir = CommonFSUtils.getRootDir(conf); + TableName tableName = TableName.valueOf(currentTest.getMethodName()); + RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); + TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build(); + HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(0); + HRegion region = HRegion.openHRegion(conf, FileSystem.get(conf), hbaseRootDir, hri, htd, wal, + regionServer, null); + + // Verify that rowCacheSeqNum is initialized correctly + assertNotEquals(HConstants.NO_SEQNUM, region.getRowCacheSeqNum()); + assertEquals(region.getOpenSeqNum(), region.getRowCacheSeqNum()); + + region.close(); + walFactory.close(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithBucketCacheAndDataBlockEncoding.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithBucketCacheAndDataBlockEncoding.java new file mode 100644 index 000000000000..dafbfbdf6f8a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithBucketCacheAndDataBlockEncoding.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_SIZE_KEY; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@Category(MediumTests.class) 
+@RunWith(Parameterized.class) +public class TestRowCacheWithBucketCacheAndDataBlockEncoding { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCacheWithBucketCacheAndDataBlockEncoding.class); + + @Parameterized.Parameter + public boolean useBucketCache; // @Parameter fields must be public, non-static instance fields + + @Parameterized.Parameters + public static List params() { + return Arrays.asList(new Object[][] { { true }, { false } }); + } + + @Rule + public TestName name = new TestName(); + + private static final byte[] ROW_KEY = Bytes.toBytes("checkRow"); + private static final byte[] CF = Bytes.toBytes("CF"); + private static final byte[] QUALIFIER = Bytes.toBytes("cq"); + private static final byte[] VALUE = Bytes.toBytes("checkValue"); + private static HBaseTestingUtil testingUtil; + private static Admin admin = null; + private static RowCache rowCache; + + @Before + public void setUp() throws Exception { + testingUtil = new HBaseTestingUtil(); + Configuration conf = testingUtil.getConfiguration(); + + // Use bucket cache + if (useBucketCache) { + conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 1); + conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); + conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64); + } + + // Use row cache + conf.setFloat(ROW_CACHE_SIZE_KEY, 0.01f); + conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f); + testingUtil.startMiniCluster(); + admin = testingUtil.getAdmin(); + + rowCache = testingUtil.getHBaseCluster().getRegionServer(0).getRowCache(); + } + + @After + public void tearDown() throws Exception { + testingUtil.shutdownMiniCluster(); + } + + @Test + public void testRowCacheNoEncode() throws Exception { + testRowCache(name.getMethodName(), DataBlockEncoding.NONE); + } + + @Test + public void testRowCacheEncode() throws Exception { + testRowCache(name.getMethodName(), DataBlockEncoding.FAST_DIFF); + } + + private void testRowCache(String methodName, DataBlockEncoding dbe) throws Exception { + TableName tableName = 
TableName.valueOf(methodName.replaceAll("[\\[\\]]", "_")); + try (Table testTable = createTable(tableName, dbe)) { + Put put = new Put(ROW_KEY); + put.addColumn(CF, QUALIFIER, VALUE); + testTable.put(put); + admin.flush(testTable.getName()); + + long countBase = rowCache.getCount(); + long hitCountBase = rowCache.getHitCount(); + + Result result; + + // First get should not hit the row cache, and populate it + Get get = new Get(ROW_KEY); + result = testTable.get(get); + assertArrayEquals(ROW_KEY, result.getRow()); + assertArrayEquals(VALUE, result.getValue(CF, QUALIFIER)); + assertEquals(1, rowCache.getCount() - countBase); + assertEquals(0, rowCache.getHitCount() - hitCountBase); + + // Second get should hit the row cache + result = testTable.get(get); + assertArrayEquals(ROW_KEY, result.getRow()); + assertArrayEquals(VALUE, result.getValue(CF, QUALIFIER)); + assertEquals(1, rowCache.getCount() - countBase); + assertEquals(1, rowCache.getHitCount() - hitCountBase); + } + } + + private Table createTable(TableName tableName, DataBlockEncoding dbe) throws IOException { + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).setBlocksize(100) + .setDataBlockEncoding(dbe).build()) + .setRowCacheEnabled(true).build(); + return testingUtil.createTable(td, null); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithMock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithMock.java new file mode 100644 index 000000000000..bfb8530d9f64 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithMock.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValueTestUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.ipc.RpcCallContext; +import 
org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.InOrder; +import org.mockito.Mockito; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestRowCacheWithMock { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCacheWithMock.class); + + @Test + public void testBarrier() throws IOException { + // Mocking dependencies to create rowCache instance + RegionInfo regionInfo = Mockito.mock(RegionInfo.class); + Mockito.when(regionInfo.getEncodedName()).thenReturn("region1"); + TableName tableName = TableName.valueOf("table1"); + Mockito.when(regionInfo.getTable()).thenReturn(tableName); + + List stores = new ArrayList<>(); + HStore hStore = Mockito.mock(HStore.class); + Mockito.when(hStore.getStorefilesCount()).thenReturn(2); + stores.add(hStore); + + ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder("CF1".getBytes()).build(); + TableDescriptor td = Mockito.mock(TableDescriptor.class); + Mockito.when(td.getColumnFamilies()).thenReturn(new ColumnFamilyDescriptor[] { cfd }); + + byte[] rowKey = "row".getBytes(); + Get get = new Get(rowKey); + Scan scan = new Scan(get); + List results = new ArrayList<>(); + + RegionScannerImpl regionScanner = Mockito.mock(RegionScannerImpl.class); + + RpcCallContext context = Mockito.mock(RpcCallContext.class); + Mockito.when(context.getBlockBytesScanned()).thenReturn(1L); + + Configuration conf = 
HBaseConfiguration.create(); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + + RowCache rowCache = new RowCache(conf); + + HRegion region = Mockito.mock(HRegion.class); + Mockito.doCallRealMethod().when(region).setRowCache(Mockito.any()); + region.setRowCache(rowCache); + Mockito.when(region.getRegionInfo()).thenReturn(regionInfo); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + Mockito.when(region.getStores()).thenReturn(stores); + Mockito.when(region.getScanner(scan)).thenReturn(regionScanner); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + Mockito.when(region.isRowCacheEnabled()).thenReturn(true); + Mockito.when(region.getScannerWithResults(get, scan, results, context)).thenCallRealMethod(); + + RowCacheKey key = new RowCacheKey(region, rowKey); + results.add(KeyValueTestUtil.create("row", "CF", "q1", 1, "v1")); + + // Verify that row cache populated before creating a row level barrier + region.getScannerWithResults(get, scan, results, context); + assertNotNull(rowCache.getRow(key)); + assertNull(rowCache.getRowLevelBarrier(key)); + + // Evict the row cache + rowCache.evictRow(key); + assertNull(rowCache.getRow(key)); + + // Create a row level barrier for the row key + rowCache.createRowLevelBarrier(key); + assertEquals(1, rowCache.getRowLevelBarrier(key).get()); + + // Verify that no row cache populated after creating a row level barrier + region.getScannerWithResults(get, scan, results, context); + assertNull(rowCache.getRow(key)); + + // Remove the row level barrier + rowCache.removeRowLevelBarrier(key); + assertNull(rowCache.getRowLevelBarrier(key)); + + // Verify that row cache populated before creating a table level barrier + region.getScannerWithResults(get, scan, results, context); + assertNotNull(rowCache.getRow(key)); + assertNull(rowCache.getRegionLevelBarrier(region)); + + // Evict the row cache + rowCache.evictRow(key); + assertNull(rowCache.getRow(key)); + + // Create a table level barrier for the row 
key + rowCache.createRegionLevelBarrier(region); + assertEquals(1, rowCache.getRegionLevelBarrier(region).get()); + + // Verify that no row cache populated after creating a table level barrier + region.getScannerWithResults(get, scan, results, context); + assertNull(rowCache.getRow(key)); + + // Remove the table level barrier + rowCache.removeTableLevelBarrier(region); + assertNull(rowCache.getRegionLevelBarrier(region)); + } + + @Test + public void testMutate() throws IOException, ServiceException { + // Mocking RowCache and its dependencies + TableDescriptor tableDescriptor = Mockito.mock(TableDescriptor.class); + + RegionInfo regionInfo = Mockito.mock(RegionInfo.class); + Mockito.when(regionInfo.getEncodedName()).thenReturn("region1"); + + RowCache rowCache = Mockito.mock(RowCache.class); + + RegionServerServices rss = Mockito.mock(RegionServerServices.class); + Mockito.when(rss.getRowCache()).thenReturn(rowCache); + + HRegion region = Mockito.mock(HRegion.class); + Mockito.doCallRealMethod().when(region).setRowCache(Mockito.any()); + region.setRowCache(rowCache); + Mockito.when(region.getTableDescriptor()).thenReturn(tableDescriptor); + Mockito.when(region.getRegionInfo()).thenReturn(regionInfo); + Mockito.when(region.getBlockCache()).thenReturn(Mockito.mock(BlockCache.class)); + Mockito.when(region.isRowCacheEnabled()).thenReturn(true); + Mockito.when(region.getRegionServerServices()).thenReturn(rss); + + RSRpcServices rsRpcServices = Mockito.mock(RSRpcServices.class); + Mockito.when(rsRpcServices.getRegion(Mockito.any())).thenReturn(region); + + RpcController rpcController = Mockito.mock(RpcController.class); + + CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder("row".getBytes()) + .ifEquals("CF".getBytes(), "q1".getBytes(), "v1".getBytes()).build(new Put("row".getBytes())); + + Put put1 = new Put("row1".getBytes()); + put1.addColumn("CF".getBytes(), "q1".getBytes(), "v1".getBytes()); + Put put2 = new Put("row1".getBytes()); + 
put2.addColumn("CF".getBytes(), "q1".getBytes(), "v1".getBytes()); + List mutations = new ArrayList<>(); + mutations.add(put1); + mutations.add(put2); + + Delete del = new Delete("row1".getBytes()); + Append append = new Append("row1".getBytes()); + append.addColumn("CF".getBytes(), "q1".getBytes(), "v1".getBytes()); + Increment increment = new Increment("row1".getBytes()); + increment.addColumn("CF".getBytes(), "q1".getBytes(), 1L); + + Mutation[] mutationArray = new Mutation[mutations.size()]; + mutations.toArray(mutationArray); + + // rowCache.mutateWithRowCacheBarrier must run real code so internal calls are recorded + Mockito.doCallRealMethod().when(rowCache).mutateWithRowCacheBarrier(Mockito.any(HRegion.class), + Mockito.any(byte[].class), Mockito.any()); + Mockito.doCallRealMethod().when(rowCache).mutateWithRowCacheBarrier(Mockito.any(HRegion.class), + Mockito.anyList(), Mockito.any()); + + InOrder inOrder; + + // Put + Mockito.doAnswer(invocation -> { + Put arg = invocation.getArgument(0); + rowCache.mutateWithRowCacheBarrier(region, arg.getRow(), () -> null); + return null; + }).when(region).put(put1); + Mockito.clearInvocations(rowCache); + inOrder = Mockito.inOrder(rowCache); + region.put(put1); + // Verify the sequence of method calls + inOrder.verify(rowCache, Mockito.times(1)).createRowLevelBarrier(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).evictRow(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).execute(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).removeRowLevelBarrier(Mockito.any()); + + // Delete + Mockito.doAnswer(invocation -> { + Delete arg = invocation.getArgument(0); + rowCache.mutateWithRowCacheBarrier(region, arg.getRow(), () -> null); + return null; + }).when(region).delete(del); + inOrder = Mockito.inOrder(rowCache); + Mockito.clearInvocations(rowCache); + region.delete(del); + // Verify the sequence of method calls + inOrder.verify(rowCache, 
Mockito.times(1)).createRowLevelBarrier(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).evictRow(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).execute(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).removeRowLevelBarrier(Mockito.any()); + + // Append + Mockito.doAnswer(invocation -> { + Append arg = invocation.getArgument(0); + rowCache.mutateWithRowCacheBarrier(region, arg.getRow(), () -> null); + return null; + }).when(region).append(append); + inOrder = Mockito.inOrder(rowCache); + Mockito.clearInvocations(rowCache); + region.append(append); + // Verify the sequence of method calls + inOrder.verify(rowCache, Mockito.times(1)).createRowLevelBarrier(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).evictRow(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).execute(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).removeRowLevelBarrier(Mockito.any()); + + // Increment + Mockito.doAnswer(invocation -> { + Increment arg = invocation.getArgument(0); + rowCache.mutateWithRowCacheBarrier(region, arg.getRow(), () -> null); + return null; + }).when(region).increment(increment); + inOrder = Mockito.inOrder(rowCache); + Mockito.clearInvocations(rowCache); + region.increment(increment); + // Verify the sequence of method calls + inOrder.verify(rowCache, Mockito.times(1)).createRowLevelBarrier(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).evictRow(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).execute(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).removeRowLevelBarrier(Mockito.any()); + + // CheckAndMutate + Mockito.doAnswer(invocation -> { + CheckAndMutate c = invocation.getArgument(0); + rowCache.mutateWithRowCacheBarrier(region, c.getRow(), () -> null); + return null; + }).when(region).checkAndMutate(Mockito.any(CheckAndMutate.class), Mockito.anyLong(), + Mockito.anyLong()); + Mockito.clearInvocations(rowCache); + inOrder = Mockito.inOrder(rowCache); + 
region.checkAndMutate(checkAndMutate, 0, 0); + // Verify the sequence of method calls + inOrder.verify(rowCache, Mockito.times(1)).createRowLevelBarrier(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).evictRow(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).execute(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).removeRowLevelBarrier(Mockito.any()); + + // RowMutations + Mockito.doAnswer(invocation -> { + List muts = invocation.getArgument(0); + rowCache.mutateWithRowCacheBarrier(region, muts, () -> null); + return null; + }).when(region).checkAndMutate(Mockito.anyList(), Mockito.any(CheckAndMutate.class), + Mockito.anyLong(), Mockito.anyLong()); + Mockito.clearInvocations(rowCache); + inOrder = Mockito.inOrder(rowCache); + region.checkAndMutate(mutations, checkAndMutate, 0, 0); + // Verify the sequence of method calls + inOrder.verify(rowCache, Mockito.times(1)).createRowLevelBarrier(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).evictRow(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).execute(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).removeRowLevelBarrier(Mockito.any()); + + // Batch + Mockito.doAnswer(invocation -> { + Mutation[] muts = invocation.getArgument(0); + rowCache.mutateWithRowCacheBarrier(region, Arrays.asList(muts), () -> null); + return null; + }).when(region).batchMutate(Mockito.any(Mutation[].class), Mockito.anyBoolean(), + Mockito.anyLong(), Mockito.anyLong()); + Mockito.clearInvocations(rowCache); + inOrder = Mockito.inOrder(rowCache); + region.batchMutate(mutationArray, true, 0, 0); + // Verify the sequence of method calls + inOrder.verify(rowCache, Mockito.times(1)).createRowLevelBarrier(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).evictRow(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).execute(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).removeRowLevelBarrier(Mockito.any()); + + // Bulkload + 
HBaseProtos.RegionSpecifier regionSpecifier = HBaseProtos.RegionSpecifier.newBuilder() + .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME) + .setValue(ByteString.copyFrom("region".getBytes())).build(); + ClientProtos.BulkLoadHFileRequest bulkLoadRequest = + ClientProtos.BulkLoadHFileRequest.newBuilder().setRegion(regionSpecifier).build(); + Mockito.doCallRealMethod().when(rsRpcServices).bulkLoadHFile(rpcController, bulkLoadRequest); + Mockito.clearInvocations(rowCache); + inOrder = Mockito.inOrder(rowCache); + rsRpcServices.bulkLoadHFile(rpcController, bulkLoadRequest); + // Verify the sequence of method calls + inOrder.verify(rowCache, Mockito.times(1)).createRegionLevelBarrier(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).increaseRowCacheSeqNum(Mockito.any()); + inOrder.verify(rowCache, Mockito.times(1)).removeTableLevelBarrier(Mockito.any()); + } + + @Test + public void testCaching() throws IOException { + // Mocking dependencies to create RowCache instance + RegionInfo regionInfo = Mockito.mock(RegionInfo.class); + Mockito.when(regionInfo.getEncodedName()).thenReturn("region1"); + TableName tableName = TableName.valueOf("table1"); + Mockito.when(regionInfo.getTable()).thenReturn(tableName); + + List stores = new ArrayList<>(); + HStore hStore = Mockito.mock(HStore.class); + Mockito.when(hStore.getStorefilesCount()).thenReturn(2); + stores.add(hStore); + + ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder("CF1".getBytes()).build(); + TableDescriptor td = Mockito.mock(TableDescriptor.class); + Mockito.when(td.getColumnFamilies()).thenReturn(new ColumnFamilyDescriptor[] { cfd }); + + RpcCallContext context = Mockito.mock(RpcCallContext.class); + Mockito.when(context.getBlockBytesScanned()).thenReturn(1L); + + byte[] rowKey = "row".getBytes(); + RegionScannerImpl regionScanner = Mockito.mock(RegionScannerImpl.class); + + Get get = new Get(rowKey); + Scan scan = new Scan(get); + + Configuration conf = 
HBaseConfiguration.create(); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + RowCache rowCache = new RowCache(conf); + + HRegion region = Mockito.mock(HRegion.class); + Mockito.doCallRealMethod().when(region).setRowCache(Mockito.any()); + region.setRowCache(rowCache); + Mockito.when(region.getRegionInfo()).thenReturn(regionInfo); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + Mockito.when(region.getStores()).thenReturn(stores); + Mockito.when(region.getScanner(scan)).thenReturn(regionScanner); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + Mockito.when(region.isRowCacheEnabled()).thenReturn(true); + Mockito.when(region.getScannerWithResults(Mockito.any(Get.class), Mockito.any(Scan.class), + Mockito.anyList(), Mockito.any())).thenCallRealMethod(); + + RowCacheKey key = new RowCacheKey(region, rowKey); + List results = new ArrayList<>(); + results.add(KeyValueTestUtil.create("row", "CF", "q1", 1, "v1")); + + // Verify that the row cache is not populated with caching=false + // This should be called first not to populate the row cache + get.setCacheBlocks(false); + region.getScannerWithResults(get, scan, results, context); + assertNull(rowCache.getRow(key)); + assertNull(rowCache.getRow(key)); + + // Verify that the row cache is populated with caching=true + get.setCacheBlocks(true); + region.getScannerWithResults(get, scan, results, context); + assertNotNull(rowCache.getRow(key, true)); + assertNull(rowCache.getRow(key, false)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestRowCacheBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestRowCacheBulkLoadHFiles.java new file mode 100644 index 000000000000..c5a62935e5e6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestRowCacheBulkLoadHFiles.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.tool; + +import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_SIZE_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.Comparator; +import java.util.stream.IntStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RowCache; +import 
org.apache.hadoop.hbase.regionserver.RowCacheKey; +import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ MiscTests.class, MediumTests.class }) +public class TestRowCacheBulkLoadHFiles { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCacheBulkLoadHFiles.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static Admin admin; + + final static int NUM_CFS = 2; + final static byte[] QUAL = Bytes.toBytes("qual"); + final static int ROWCOUNT = 10; + + private TableName tableName; + private Table table; + private HRegion[] regions; + + @Rule + public TestName testName = new TestName(); + + static String family(int i) { + return String.format("family_%04d", i); + } + + public static void buildHFiles(FileSystem fs, Path dir) throws IOException { + byte[] val = "value".getBytes(); + for (int i = 0; i < NUM_CFS; i++) { + Path testIn = new Path(dir, family(i)); + + TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i), + Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT); + } + } + + private TableDescriptor createTableDesc(TableName name) { + TableDescriptorBuilder builder = + TableDescriptorBuilder.newBuilder(name).setRowCacheEnabled(true); + IntStream.range(0, NUM_CFS).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i))) + .forEachOrdered(builder::setColumnFamily); + return builder.build(); + } + + private Path buildBulkFiles(TableName table) throws Exception { + Path dir = 
TEST_UTIL.getDataTestDirOnTestFS(table.getNameAsString()); + Path bulk1 = new Path(dir, table.getNameAsString()); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + buildHFiles(fs, bulk1); + return bulk1; + } + + @BeforeClass + public static void setupCluster() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + + // Enable row cache but reduce the block cache size to fit in 80% of the heap + conf.setFloat(ROW_CACHE_SIZE_KEY, 0.01f); + conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f); + + TEST_UTIL.startMiniCluster(1); + admin = TEST_UTIL.getAdmin(); + } + + @AfterClass + public static void teardownCluster() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void before() throws Exception { + tableName = TableName.valueOf(testName.getMethodName()); + // Split the table into 2 regions + byte[][] splitKeys = new byte[][] { TestHRegionServerBulkLoad.rowkey(ROWCOUNT) }; + admin.createTable(createTableDesc(tableName), splitKeys); + table = TEST_UTIL.getConnection().getTable(tableName); + // Sorted by region name + regions = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegions().stream() + .filter(r -> r.getRegionInfo().getTable().equals(tableName)) + .sorted(Comparator.comparing(r -> r.getRegionInfo().getRegionNameAsString())) + .toArray(HRegion[]::new); + } + + @After + public void after() throws Exception { + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + } + + @Test + public void testRowCache() throws Exception { + RowCache rowCache = + TEST_UTIL.getHBaseCluster().getRegionServer(0).getRSRpcServices().getServer().getRowCache(); + + // The region to be bulk-loaded + byte[] rowKeyRegion0 = TestHRegionServerBulkLoad.rowkey(0); + // The region not to be bulk-loaded + byte[] rowKeyRegion1 = TestHRegionServerBulkLoad.rowkey(ROWCOUNT); + + // Put a row into each region to populate the row cache + Put put0 = new Put(rowKeyRegion0); + 
put0.addColumn(family(0).getBytes(), "q1".getBytes(), "value".getBytes()); + table.put(put0); + Put put1 = new Put(rowKeyRegion1); + put1.addColumn(family(0).getBytes(), "q1".getBytes(), "value".getBytes()); + table.put(put1); + admin.flush(tableName); + + // Ensure each region has a row cache + Get get0 = new Get(rowKeyRegion0); + Result result0 = table.get(get0); + assertNotNull(result0); + RowCacheKey keyPrev0 = new RowCacheKey(regions[0], get0.getRow()); + assertNotNull(rowCache.getRow(keyPrev0)); + Get get1 = new Get(rowKeyRegion1); + Result result1 = table.get(get1); + assertNotNull(result1); + RowCacheKey keyPrev1 = new RowCacheKey(regions[1], get1.getRow()); + assertNotNull(rowCache.getRow(keyPrev1)); + + // Do bulkload to region0 only + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf); + Path dir = buildBulkFiles(tableName); + loader.bulkLoad(tableName, dir); + + // Ensure the row cache is removed after bulkload for region0 + RowCacheKey keyCur0 = new RowCacheKey(regions[0], get0.getRow()); + assertNotEquals(keyPrev0, keyCur0); + assertNull(rowCache.getRow(keyCur0)); + // Ensure the row cache for keyPrev0 still exists, but it is not used anymore. + assertNotNull(rowCache.getRow(keyPrev0)); + + // Ensure the row cache for region1 is not affected + RowCacheKey keyCur1 = new RowCacheKey(regions[1], get1.getRow()); + assertEquals(keyPrev1, keyCur1); + assertNotNull(rowCache.getRow(keyCur1)); + } +}