Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,12 @@ public enum OperationStatusCode {
public static final String ROW_CACHE_ENABLED_KEY = "row.cache.enabled";
public static final boolean ROW_CACHE_ENABLED_DEFAULT = false;

/**
* Configuration key for evicting the row cache when a region is closed
*/
public static final String ROW_CACHE_EVICT_ON_CLOSE_KEY = "row.cache.evictOnClose";
public static final boolean ROW_CACHE_EVICT_ON_CLOSE_DEFAULT = false;

/**
* Configuration key for the memory size of the block cache
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,8 +547,8 @@ public void setTimestamp(byte[] ts) throws IOException {

@Override
public ExtendedCell deepClone() {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
// To garbage collect the objects referenced by this cell, we need to deep clone it
return ExtendedCell.super.deepClone();
}
}

Expand Down Expand Up @@ -796,8 +796,8 @@ public void write(ByteBuffer buf, int offset) {

@Override
public ExtendedCell deepClone() {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
// To cache row, we need to deep clone it
return super.deepClone();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,13 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
String L2_CACHE_HIT_RATIO_DESC = "L2 cache hit ratio.";
String L2_CACHE_MISS_RATIO = "l2CacheMissRatio";
String L2_CACHE_MISS_RATIO_DESC = "L2 cache miss ratio.";

String ROW_CACHE_HIT_COUNT = "rowCacheHitCount";
String ROW_CACHE_MISS_COUNT = "rowCacheMissCount";
String ROW_CACHE_EVICTED_ROW_COUNT = "rowCacheEvictedRowCount";
String ROW_CACHE_SIZE = "rowCacheSize";
String ROW_CACHE_COUNT = "rowCacheCount";

String RS_START_TIME_NAME = "regionServerStartTime";
String ZOOKEEPER_QUORUM_NAME = "zookeeperQuorum";
String SERVER_NAME_NAME = "serverName";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,12 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
.addCounter(Interns.info(BLOCK_CACHE_DELETE_FAMILY_BLOOM_HIT_COUNT, ""),
rsWrap.getDeleteFamilyBloomHitCount())
.addCounter(Interns.info(BLOCK_CACHE_TRAILER_HIT_COUNT, ""), rsWrap.getTrailerHitCount())
.addCounter(Interns.info(ROW_CACHE_HIT_COUNT, ""), rsWrap.getRowCacheHitCount())
.addCounter(Interns.info(ROW_CACHE_MISS_COUNT, ""), rsWrap.getRowCacheMissCount())
.addCounter(Interns.info(ROW_CACHE_EVICTED_ROW_COUNT, ""),
rsWrap.getRowCacheEvictedRowCount())
.addGauge(Interns.info(ROW_CACHE_SIZE, ""), rsWrap.getRowCacheSize())
.addGauge(Interns.info(ROW_CACHE_COUNT, ""), rsWrap.getRowCacheCount())
.addCounter(Interns.info(UPDATES_BLOCKED_TIME, UPDATES_BLOCKED_DESC),
rsWrap.getUpdatesBlockedTime())
.addCounter(Interns.info(FLUSHED_CELLS, FLUSHED_CELLS_DESC), rsWrap.getFlushedCellsCount())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,16 @@ public interface MetricsRegionServerWrapper {

long getTrailerHitCount();

long getRowCacheHitCount();

long getRowCacheMissCount();

long getRowCacheSize();

long getRowCacheCount();

long getRowCacheEvictedRowCount();

long getTotalRowActionRequestCount();

long getByteBuffAllocatorHeapAllocationBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_EVICT_ON_CLOSE_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_EVICT_ON_CLOSE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ROW_LOCK_READ_LOCK_KEY;
Expand Down Expand Up @@ -145,6 +147,7 @@
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.mob.MobFileCache;
Expand Down Expand Up @@ -946,14 +949,19 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co
this.isRowCacheEnabled = checkRowCacheConfig();
}

private boolean checkRowCacheConfig() {
boolean checkRowCacheConfig() {
Boolean fromDescriptor = htableDescriptor.getRowCacheEnabled();
// The setting from TableDescriptor has higher priority than the global configuration
return fromDescriptor != null
? fromDescriptor
: conf.getBoolean(HConstants.ROW_CACHE_ENABLED_KEY, HConstants.ROW_CACHE_ENABLED_DEFAULT);
}

// For testing only
void setRowCache(RowCache rowCache) {
this.rowCache = rowCache;
}
Comment on lines +960 to +963
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For testing only, we could use @RestrictedApi annotation?


private void setHTableSpecificConf() {
if (this.htableDescriptor == null) {
return;
Expand Down Expand Up @@ -1963,6 +1971,8 @@ public Pair<byte[], Collection<HStoreFile>> call() throws IOException {
}
}

evictRowCache();

status.setStatus("Writing region close event to WAL");
// Always write close marker to wal even for read only table. This is not a big problem as we
// do not write any data into the region; it is just a meta edit in the WAL file.
Expand Down Expand Up @@ -2003,6 +2013,22 @@ public Pair<byte[], Collection<HStoreFile>> call() throws IOException {
}
}

private void evictRowCache() {
boolean evictOnClose = getReadOnlyConfiguration().getBoolean(ROW_CACHE_EVICT_ON_CLOSE_KEY,
ROW_CACHE_EVICT_ON_CLOSE_DEFAULT);

if (!evictOnClose) {
return;
}

if (!(rsServices instanceof HRegionServer regionServer)) {
return;
}

RowCache rowCache = regionServer.getRSRpcServices().getServer().getRowCache();
rowCache.evictRowsByRegion(this);
}

/** Wait for all current flushes and compactions of the region to complete */
// TODO HBASE-18906. Check the usage (if any) in Phoenix and expose this or give alternate way for
// Phoenix needs.
Expand Down Expand Up @@ -3259,21 +3285,32 @@ public RegionScannerImpl getScanner(Scan scan) throws IOException {
return getScanner(scan, null);
}

RegionScannerImpl getScannerWithResults(Get get, Scan scan, List<Cell> results)
throws IOException {
RegionScannerImpl getScannerWithResults(Get get, Scan scan, List<Cell> results,
RpcCallContext context) throws IOException {
if (!rowCache.canCacheRow(get, this)) {
return getScannerWithResults(scan, results);
}

// Try get from row cache
RowCacheKey key = new RowCacheKey(this, get.getRow());
if (rowCache.tryGetFromCache(key, get, results)) {
addReadRequestsCount(1);
if (getMetrics() != null) {
getMetrics().updateReadRequestCount();
}

// Cache is hit, and then no scanner is created
return null;
}

RegionScannerImpl scanner = getScannerWithResults(scan, results);
rowCache.populateCache(results, key);

// When results came from memstore only, do not populate the row cache
boolean readFromMemStoreOnly = context.getBlockBytesScanned() < 1;
if (!readFromMemStoreOnly) {
rowCache.populateCache(this, results, key);
}

return scanner;
}

Expand Down Expand Up @@ -3435,6 +3472,15 @@ private void updateDeleteLatestVersionTimestamp(Cell cell, Get get, int count, b
@Override
public void put(Put put) throws IOException {
TraceUtil.trace(() -> {
// Put with TTL is not allowed on tables with row cache enabled, because cached rows cannot
// track TTL expiration
if (isRowCacheEnabled) {
if (put.getTTL() != Long.MAX_VALUE) {
throw new DoNotRetryIOException(
"Tables with row cache enabled do not allow setting TTL on Puts");
}
}

checkReadOnly();

// Do a rough check that we have resources to accept a write. The check is
Expand Down Expand Up @@ -4811,7 +4857,12 @@ public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long
// checkAndMutate.
// * coprocessor calls (see ex. BulkDeleteEndpoint).
// So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
return batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce));
if (rowCache == null) {
return batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce));
}

return rowCache.mutateWithRowCacheBarrier(this, Arrays.asList(mutations),
() -> batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce)));
}

@Override
Expand All @@ -4823,10 +4874,9 @@ public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
}

OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic) throws IOException {
OperationStatus[] operationStatuses =
rowCache.mutateWithRowCacheBarrier(this, Arrays.asList(mutations),
() -> this.batchMutate(mutations, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE));
return TraceUtil.trace(() -> operationStatuses, () -> createRegionSpan("Region.batchMutate"));
return TraceUtil.trace(
() -> batchMutate(mutations, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE),
() -> createRegionSpan("Region.batchMutate"));
}

/**
Expand Down Expand Up @@ -5111,8 +5161,17 @@ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws

public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate, long nonceGroup,
long nonce) throws IOException {
CheckAndMutateResult checkAndMutateResult = rowCache.mutateWithRowCacheBarrier(this,
checkAndMutate.getRow(), () -> this.checkAndMutate(checkAndMutate, nonceGroup, nonce));
CheckAndMutateResult checkAndMutateResult =
rowCache.mutateWithRowCacheBarrier(this, checkAndMutate.getRow(),
() -> this.checkAndMutateInternal(checkAndMutate, nonceGroup, nonce));
return TraceUtil.trace(() -> checkAndMutateResult,
() -> createRegionSpan("Region.checkAndMutate"));
}

public CheckAndMutateResult checkAndMutate(List<Mutation> mutations,
CheckAndMutate checkAndMutate, long nonceGroup, long nonce) throws IOException {
CheckAndMutateResult checkAndMutateResult = rowCache.mutateWithRowCacheBarrier(this, mutations,
() -> this.checkAndMutateInternal(checkAndMutate, nonceGroup, nonce));
return TraceUtil.trace(() -> checkAndMutateResult,
() -> createRegionSpan("Region.checkAndMutate"));
}
Expand Down Expand Up @@ -5312,6 +5371,10 @@ private OperationStatus mutate(Mutation mutation, boolean atomic) throws IOExcep

private OperationStatus mutate(Mutation mutation, boolean atomic, long nonceGroup, long nonce)
throws IOException {
if (rowCache == null) {
return this.mutateInternal(mutation, atomic, nonceGroup, nonce);
}

return rowCache.mutateWithRowCacheBarrier(this, mutation.getRow(),
() -> this.mutateInternal(mutation, atomic, nonceGroup, nonce));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class MetricsRegionServerWrapperImpl implements MetricsRegionServerWrapper {
private BlockCache l2Cache = null;
private MobFileCache mobFileCache;
private CacheStats cacheStats;
private final RowCache rowCache;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better move to L71 under MobFileCache?

private CacheStats l1Stats = null;
private CacheStats l2Stats = null;
private volatile long numWALFiles = 0;
Expand Down Expand Up @@ -99,6 +100,8 @@ public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) {
this.regionServer = regionServer;
initBlockCache();
initMobFileCache();
RSRpcServices rsRpcServices = this.regionServer.getRSRpcServices();
this.rowCache = rsRpcServices == null ? null : rsRpcServices.getServer().getRowCache();
Comment on lines +103 to +104
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to use a method initRowCache() here, just like blockcache and mobfilecache does?

this.excludeDatanodeManager = this.regionServer.getWalFactory().getExcludeDatanodeManager();

this.period = regionServer.getConfiguration().getLong(HConstants.REGIONSERVER_METRICS_PERIOD,
Expand Down Expand Up @@ -1194,6 +1197,31 @@ public long getTrailerHitCount() {
return this.cacheStats != null ? this.cacheStats.getTrailerHitCount() : 0L;
}

@Override
public long getRowCacheHitCount() {
return this.rowCache != null ? this.rowCache.getHitCount() : 0L;
}

@Override
public long getRowCacheMissCount() {
return this.rowCache != null ? this.rowCache.getMissCount() : 0L;
}

@Override
public long getRowCacheSize() {
return this.rowCache != null ? this.rowCache.getSize() : 0L;
}

@Override
public long getRowCacheCount() {
return this.rowCache != null ? this.rowCache.getCount() : 0L;
}

@Override
public long getRowCacheEvictedRowCount() {
return this.rowCache != null ? this.rowCache.getEvictedRowCount() : 0L;
}

@Override
public long getByteBuffAllocatorHeapAllocationBytes() {
return ByteBuffAllocator.getHeapAllocationBytes(allocator, ByteBuffAllocator.HEAP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Ac
result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
}
if (result == null) {
result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
result = region.checkAndMutate(mutations, checkAndMutate, nonceGroup, nonce);
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
}
Expand Down Expand Up @@ -2347,8 +2347,21 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
return bulkLoadHFileInternal(request);
}

// TODO: implement row cache logic for bulk load
return bulkLoadHFileInternal(request);
RowCache rowCache = region.getRegionServerServices().getRowCache();

// Since bulkload modifies the store files, the row cache should be disabled until the bulkload
// is finished.
rowCache.createRegionLevelBarrier(region);
try {
// We do not invalidate the entire row cache directly, as it contains a large number of
// entries and takes a long time. Instead, we increment rowCacheSeqNum, which is used when
// constructing a RowCacheKey, thereby making the existing row cache entries stale.
rowCache.increaseRowCacheSeqNum(region);
return bulkLoadHFileInternal(request);
} finally {
// The row cache for the region has been enabled again
rowCache.removeTableLevelBarrier(region);
Comment on lines +2362 to +2363
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto region level

}
}

BulkLoadHFileResponse bulkLoadHFileInternal(final BulkLoadHFileRequest request)
Expand Down Expand Up @@ -2609,7 +2622,7 @@ private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCal
RegionScannerImpl scanner = null;
long blockBytesScannedBefore = context.getBlockBytesScanned();
try {
scanner = region.getScannerWithResults(get, scan, results);
scanner = region.getScannerWithResults(get, scan, results, context);
} finally {
if (scanner != null) {
if (closeCallBack == null) {
Expand Down
Loading