@@ -434,6 +434,7 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
+ conf.setBoolean(HFileOutputFormat2.SET_MAX_SEQ_ID_KEY, true);
conf.set(JOB_NAME_CONF_KEY, jobname);

boolean diskBasedSortingEnabledOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
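For orientation, a hedged sketch of driving WALPlayer directly with the same keys this hunk sets, so the emitted HFiles carry MAX_SEQ_ID metadata. The driver class name, output path, WAL directory, and table name are illustrative, not from the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver mirroring the walToHFiles configuration above.
public class WalPlayerMaxSeqIdExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, "/tmp/walplayer-out"); // illustrative path
    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
    // New in this PR: record each writer's max sequence id in the HFile metadata.
    conf.setBoolean(HFileOutputFormat2.SET_MAX_SEQ_ID_KEY, true);
    // Args: <WAL input dir> <comma-separated tables>; both illustrative here.
    System.exit(ToolRunner.run(conf, new WALPlayer(conf),
        new String[] { "/hbase/oldWALs", "myTable" }));
  }
}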

HFileOutputFormat2.java
@@ -21,6 +21,7 @@
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
+ import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;

import java.io.IOException;
import java.io.UncheckedIOException;
@@ -186,6 +187,13 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix)
public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;

+ /**
+ * Set the MAX_SEQ_ID metadata on the resulting HFile. This ensures the HFiles are sorted
+ * properly when read by tools such as the ClientSideRegionScanner. It has no effect if the
+ * HFile is bulkloaded, as the sequence ID generated when bulkloading will override this metadata.
+ */
+ public static final String SET_MAX_SEQ_ID_KEY = "hbase.hfileoutputformat.set.max.seq.id";
+
public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
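To see the effect on a finished file, a minimal sketch of reading the recorded value back, assuming an HBase version where HFile.Reader#getHFileInfo is available; the helper name is hypothetical:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper: returns the MAX_SEQ_ID recorded when SET_MAX_SEQ_ID_KEY
// is enabled, or -1 when the metadata is absent.
static long readMaxSeqId(FileSystem fs, Path hfile, Configuration conf) throws IOException {
  try (HFile.Reader reader = HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, conf)) {
    byte[] raw = reader.getHFileInfo().get(HStoreFile.MAX_SEQ_ID_KEY);
    return raw == null ? -1L : Bytes.toLong(raw);
  }
}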

@@ -241,7 +249,7 @@ static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter

return new RecordWriter<ImmutableBytesWritable, V>() {
// Map of families to writers and how much has been output on the writer.
- private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ private final Map<byte[], WriterInfo> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private final long now = EnvironmentEdgeManager.currentTime();
private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);
@@ -269,10 +277,10 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
}
byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

- WriterLength wl = this.writers.get(tableAndFamily);
+ WriterInfo wi = this.writers.get(tableAndFamily);

// If this is a new column family, verify that the directory exists
- if (wl == null) {
+ if (wi == null) {
Path writerPath = null;
if (writeMultipleTables) {
Path tableRelPath = getTableRelativePath(tableNameBytes);
@@ -286,14 +294,14 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {

// This can only happen once a row is finished though
if (
- wl != null && wl.written + length >= maxsize
+ wi != null && wi.written + length >= maxsize
&& Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
) {
- rollWriters(wl);
+ rollWriters(wi);
}

// create a new WAL writer, if necessary
- if (wl == null || wl.writer == null) {
+ if (wi == null || wi.writer == null) {
InetSocketAddress[] favoredNodes = null;
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
HRegionLocation loc = null;
@@ -324,14 +332,15 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
}
}
}
- wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
+ wi = getNewWriter(tableNameBytes, family, conf, favoredNodes);

}

// we now have the proper WAL writer. full steam ahead
PrivateCellUtil.updateLatestStamp(kv, this.now);
- wl.writer.append((ExtendedCell) kv);
- wl.written += length;
+ wi.writer.append((ExtendedCell) kv);
+ wi.written += length;
+ wi.maxSequenceId = Math.max(kv.getSequenceId(), wi.maxSequenceId);

Contributor: Any concerns that Cell#getSequenceId is removed in HBase 3? Any plans for how we should handle that?

Contributor: Ah, as long as this is an ExtendedCell, it looks like this should be possible in branch-3.
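To ground the branch-3 question: the reviewers note the sequence id accessor remains on ExtendedCell there, so a guarded cast keeps this line workable. A minimal sketch; the helper name is hypothetical:

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;

// Hypothetical helper: read the sequence id via ExtendedCell, falling back
// to 0 for cells that do not carry one.
static long sequenceIdOf(Cell cell) {
  return cell instanceof ExtendedCell ? ((ExtendedCell) cell).getSequenceId() : 0L;
}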


// Copy the row so we know when a row transition.
this.previousRows.put(family, rowKey);
@@ -347,24 +356,25 @@ private Path getTableRelativePath(byte[] tableNameBytes) {
return tableRelPath;
}

- private void rollWriters(WriterLength writerLength) throws IOException {
- if (writerLength != null) {
- closeWriter(writerLength);
+ private void rollWriters(WriterInfo writerInfo) throws IOException {
+ if (writerInfo != null) {
+ closeWriter(writerInfo);
} else {
- for (WriterLength wl : this.writers.values()) {
- closeWriter(wl);
+ for (WriterInfo wi : this.writers.values()) {
+ closeWriter(wi);
}
}
}

- private void closeWriter(WriterLength wl) throws IOException {
- if (wl.writer != null) {
+ private void closeWriter(WriterInfo wi) throws IOException {
+ if (wi.writer != null) {
LOG.info(
"Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
close(wl.writer);
wl.writer = null;
"Writer=" + wi.writer.getPath() + ((wi.written == 0) ? "" : ", wrote=" + wi.written));
close(wi.writer, wi);
wi.writer = null;
}
- wl.written = 0;
+ wi.written = 0;
+ wi.maxSequenceId = -1;
}

private Configuration createRemoteClusterConf(Configuration conf) {
@@ -404,19 +414,19 @@

/*
* Create a new StoreFile.Writer.
- * @return A WriterLength, containing a new StoreFile.Writer.
+ * @return A WriterInfo, containing a new StoreFile.Writer.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
justification = "Not important")
- private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
+ private WriterInfo getNewWriter(byte[] tableName, byte[] family, Configuration conf,
InetSocketAddress[] favoredNodes) throws IOException {
byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
Path familydir = new Path(outputDir, Bytes.toString(family));
if (writeMultipleTables) {
familydir =
new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
}
- WriterLength wl = new WriterLength();
+ WriterInfo wi = new WriterInfo();
Algorithm compression = overriddenCompression;
compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
compression = compression == null ? defaultCompression : compression;
@@ -443,23 +453,26 @@ private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration

HFileContext hFileContext = contextBuilder.build();
if (null == favoredNodes) {
- wl.writer =
+ wi.writer =
new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
.withBloomType(bloomType).withFileContext(hFileContext).build();
} else {
- wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
+ wi.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
.withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
.withFavoredNodes(favoredNodes).build();
}

- this.writers.put(tableAndFamily, wl);
- return wl;
+ this.writers.put(tableAndFamily, wi);
+ return wi;
}

- private void close(final StoreFileWriter w) throws IOException {
+ private void close(final StoreFileWriter w, final WriterInfo wi) throws IOException {
if (w != null) {
w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
+ if (conf.getBoolean(SET_MAX_SEQ_ID_KEY, false) && wi.maxSequenceId >= 0) {
+ w.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(wi.maxSequenceId));
+ }
w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
w.appendTrackedTimestampsToMetadata();
@@ -469,8 +482,8 @@ private void close(final StoreFileWriter w) throws IOException {

@Override
public void close(TaskAttemptContext c) throws IOException, InterruptedException {
- for (WriterLength wl : this.writers.values()) {
- close(wl.writer);
+ for (WriterInfo wi : this.writers.values()) {
+ close(wi.writer, wi);
}
}
};
@@ -493,8 +506,9 @@ static void configureStoragePolicy(final Configuration conf, final FileSystem fs
/*
* Data structure to hold a Writer and amount of data written on it.
*/
- static class WriterLength {
+ static class WriterInfo {
long written = 0;
+ long maxSequenceId = -1;
StoreFileWriter writer = null;
}
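Why this matters downstream: a region scanner resolves duplicate cells at the same timestamp by sequence id, newest winning, so file order matters. A toy illustration, hypothetical and reusing the readMaxSeqId sketch from earlier, of ordering HFiles the way this metadata enables:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical: order a region's HFiles oldest-to-newest by their recorded
// max sequence id, instead of by size or name.
static void sortByMaxSeqId(List<Path> hfiles, FileSystem fs, Configuration conf) {
  hfiles.sort(Comparator.comparingLong((Path p) -> {
    try {
      return readMaxSeqId(fs, p, conf); // helper sketched above
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }));
}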


TestWALPlayer.java
@@ -36,7 +36,10 @@
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.FileUtil;
+ import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -45,11 +48,15 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
+ import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
+ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
+ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits;
@@ -225,6 +232,113 @@ public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception {
});
}

+ /**
+ * Tests that the sequence IDs of cells are retained in the resulting HFiles and usable by a
+ * RegionScanner. It does this by running WALPlayer multiple times and using a RegionScanner
+ * to read the files; without sequence IDs, the files would be ordered by size or name and would
+ * not always return the correct result.
+ */
+ @Test
+ public void testMaxSeqIdHFileMetadata() throws Exception {
+ final int numEdits = 20;
+ final int flushInterval = 10;
+
+ // Phase 1: Set up test data and configuration
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+ final byte[] family = Bytes.toBytes("family");
+ final byte[] column = Bytes.toBytes("c1");
+ final byte[] row = Bytes.toBytes("row");
+ final Table table = TEST_UTIL.createTable(tableName, family);
+
+ long now = EnvironmentEdgeManager.currentTime();
+ {
+ Put put = new Put(row);
+ put.addColumn(family, column, now, column);
+ table.put(put);
+ }
+
+ String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
+ HConstants.HREGION_LOGDIR_NAME).toString();
+ String walPlayerOutputRoot = "/tmp/" + name.getMethodName();
+
+ Configuration walPlayerConfig = new Configuration(TEST_UTIL.getConfiguration());
+ walPlayerConfig.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
+ walPlayerConfig.setBoolean(HFileOutputFormat2.SET_MAX_SEQ_ID_KEY, true);
+
+ // Phase 2: Write edits with periodic WAL rolling and WALPlayer execution
+ int walPlayerRunCount = 0;
+ byte[] lastVal = null;
+
+ for (int i = 0; i < numEdits; i++) {
+ lastVal = new byte[12];
+ ThreadLocalRandom.current().nextBytes(lastVal);
+
+ Put put = new Put(row);
+ put.addColumn(family, column, now, lastVal);
+ table.put(put);
+
+ // Roll WALs and run WALPlayer every flushInterval iterations
+ if (i > 0 && (i % flushInterval == 0) || i + 1 == numEdits) {
+ WAL log = cluster.getRegionServer(0).getWAL(null);
+ log.rollWriter();
+
+ walPlayerRunCount++;
+ String walPlayerRunDir = walPlayerOutputRoot + "/run_" + walPlayerRunCount;
+ Configuration runConfig = new Configuration(walPlayerConfig);
+ runConfig.set(WALPlayer.BULK_OUTPUT_CONF_KEY, walPlayerRunDir);
+
+ WALPlayer player = new WALPlayer(runConfig);
+ assertEquals(0, ToolRunner.run(runConfig, player,
+ new String[] { walInputDir, tableName.getNameAsString() }));
+ }
+ }
+
+ table.close();
+
+ final byte[] finalLastVal = lastVal;
+
+ // Phase 3: Collect all generated HFiles into the proper structure for the region scanner
+ TableDescriptor htd = TEST_UTIL.getAdmin().getDescriptor(tableName);
+ RegionInfo regionInfo = cluster.getRegions(tableName).get(0).getRegionInfo();
+ FileSystem fs = cluster.getRegionServer(0).getFileSystem();
+
+ Path regionOutPath = CommonFSUtils.getRegionDir(new Path(walPlayerOutputRoot),
+ htd.getTableName(), regionInfo.getEncodedName());
+ Path familyOutPath = new Path(regionOutPath, new String(family));
+ fs.mkdirs(familyOutPath);
+
+ // Copy all HFiles from each WALPlayer run
+ for (int i = 1; i <= walPlayerRunCount; i++) {
+ Path walPlayerRunPath = new Path(walPlayerOutputRoot, "run_" + i);
+ RemoteIterator<LocatedFileStatus> files =
+ fs.listFiles(new Path(walPlayerRunPath, tableName.getNamespaceAsString()), true);
+
+ while (files.hasNext()) {
+ LocatedFileStatus fileStatus = files.next();
+ // Skip hidden/metadata files (starting with '.')
+ if (fileStatus.isFile() && !fileStatus.getPath().getName().startsWith(".")) {
+ FileUtil.copy(fs, fileStatus.getPath(), fs,
+ new Path(familyOutPath, fileStatus.getPath().getName()), false, walPlayerConfig);
+ }
+ }
+ }
+
+ // Phase 4: Verify sequence IDs are preserved correctly
+ Scan scan = new Scan();
+ try (ClientSideRegionScanner scanner = new ClientSideRegionScanner(walPlayerConfig, fs,
+ new Path(walPlayerOutputRoot), htd, regionInfo, scan, null)) {
+
+ // Verify a row is returned with cells
+ Result result = scanner.next();
+ assertThat(result, notNullValue());
+ assertThat(result.listCells(), notNullValue());
+
+ // Verify the value with the highest sequence ID (from the last iteration) wins
+ byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column));
+ assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(finalLastVal)));
+ }
+ }
+
/**
* Simple end-to-end test
*/
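As a usage note on Phase 4 above: the test checks the first row and its value; to assert the region holds exactly one row, a follow-up check could drain the scanner. A hedged sketch in the Hamcrest style the test already uses:

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.nullValue;

// Sketch only: inside the same try-with-resources block, after the value
// assertion, confirm no further rows come back from the scanner.
Result next = scanner.next();
assertThat(next, nullValue());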