diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 4fac0ca3c93c..f0ef36118e4b 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -434,6 +434,7 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
     conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
     conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
     conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
+    conf.setBoolean(HFileOutputFormat2.SET_MAX_SEQ_ID_KEY, true);
     conf.set(JOB_NAME_CONF_KEY, jobname);
     boolean diskBasedSortingEnabledOriginalValue =
       HFileOutputFormat2.diskBasedSortingEnabled(conf);
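For context, a minimal sketch (not part of the patch) of how a caller outside the backup client could enable the same behaviour when driving WALPlayer by hand. The directories and table name are placeholders, not values from the patch:

    Configuration conf = HBaseConfiguration.create();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, "/tmp/bulk-output");    // placeholder output dir
    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
    conf.setBoolean(HFileOutputFormat2.SET_MAX_SEQ_ID_KEY, true);    // key introduced below
    int exitCode = ToolRunner.run(conf, new WALPlayer(conf),
      new String[] { "/tmp/wal-input", "someTable" });               // placeholder args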
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 0e81c95677c3..1e1e12c35913 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -21,6 +21,7 @@
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -186,6 +187,13 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix)
   public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
     REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;
 
+  /**
+   * Set the MAX_SEQ_ID metadata on the resulting HFile. This will ensure the HFiles will be sorted
+   * properly when read by tools such as the ClientSideRegionScanner. Will have no effect if the
+   * HFile is bulkloaded, as the sequence ID generated when bulkloading will override this metadata.
+   */
+  public static final String SET_MAX_SEQ_ID_KEY = "hbase.hfileoutputformat.set.max.seq.id";
+
   public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
   public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
@@ -241,7 +249,7 @@ static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWrit
     return new RecordWriter<ImmutableBytesWritable, V>() {
       // Map of families to writers and how much has been output on the writer.
-      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      private final Map<byte[], WriterInfo> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
       private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
       private final long now = EnvironmentEdgeManager.currentTime();
       private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);
@@ -269,10 +277,10 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
         }
         byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
-        WriterLength wl = this.writers.get(tableAndFamily);
+        WriterInfo wi = this.writers.get(tableAndFamily);
 
         // If this is a new column family, verify that the directory exists
-        if (wl == null) {
+        if (wi == null) {
           Path writerPath = null;
           if (writeMultipleTables) {
             Path tableRelPath = getTableRelativePath(tableNameBytes);
@@ -286,14 +294,14 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
         // This can only happen once a row is finished though
         if (
-          wl != null && wl.written + length >= maxsize
+          wi != null && wi.written + length >= maxsize
             && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
         ) {
-          rollWriters(wl);
+          rollWriters(wi);
         }
 
         // create a new WAL writer, if necessary
-        if (wl == null || wl.writer == null) {
+        if (wi == null || wi.writer == null) {
           InetSocketAddress[] favoredNodes = null;
           if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
             HRegionLocation loc = null;
@@ -324,14 +332,15 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
             }
           }
         }
-        wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
+        wi = getNewWriter(tableNameBytes, family, conf, favoredNodes);
       }
 
       // we now have the proper WAL writer. full steam ahead
       PrivateCellUtil.updateLatestStamp(kv, this.now);
-      wl.writer.append((ExtendedCell) kv);
-      wl.written += length;
+      wi.writer.append((ExtendedCell) kv);
+      wi.written += length;
+      wi.maxSequenceId = Math.max(kv.getSequenceId(), wi.maxSequenceId);
 
       // Copy the row so we know when a row transition.
       this.previousRows.put(family, rowKey);
@@ -347,24 +356,25 @@ private Path getTableRelativePath(byte[] tableNameBytes) {
         return tableRelPath;
       }
 
-      private void rollWriters(WriterLength writerLength) throws IOException {
-        if (writerLength != null) {
-          closeWriter(writerLength);
+      private void rollWriters(WriterInfo writerInfo) throws IOException {
+        if (writerInfo != null) {
+          closeWriter(writerInfo);
         } else {
-          for (WriterLength wl : this.writers.values()) {
-            closeWriter(wl);
+          for (WriterInfo wi : this.writers.values()) {
+            closeWriter(wi);
           }
         }
       }
 
-      private void closeWriter(WriterLength wl) throws IOException {
-        if (wl.writer != null) {
+      private void closeWriter(WriterInfo wi) throws IOException {
+        if (wi.writer != null) {
           LOG.info(
-            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
-          close(wl.writer);
-          wl.writer = null;
+            "Writer=" + wi.writer.getPath() + ((wi.written == 0) ? "" : ", wrote=" + wi.written));
+          close(wi.writer, wi);
+          wi.writer = null;
         }
-        wl.written = 0;
+        wi.written = 0;
+        wi.maxSequenceId = -1;
       }
 
       private Configuration createRemoteClusterConf(Configuration conf) {
@@ -404,11 +414,11 @@ private Configuration createRemoteClusterConf(Configuration conf) {
 
       /*
        * Create a new StoreFile.Writer.
-       * @return A WriterLength, containing a new StoreFile.Writer.
+       * @return A WriterInfo, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
-      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
+      private WriterInfo getNewWriter(byte[] tableName, byte[] family, Configuration conf,
         InetSocketAddress[] favoredNodes) throws IOException {
         byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
         Path familydir = new Path(outputDir, Bytes.toString(family));
@@ -416,7 +426,7 @@ private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration
           familydir =
             new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
         }
-        WriterLength wl = new WriterLength();
+        WriterInfo wi = new WriterInfo();
         Algorithm compression = overriddenCompression;
         compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
         compression = compression == null ? defaultCompression : compression;
@@ -443,23 +453,26 @@ private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration
         HFileContext hFileContext = contextBuilder.build();
         if (null == favoredNodes) {
-          wl.writer =
+          wi.writer =
             new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
               .withBloomType(bloomType).withFileContext(hFileContext).build();
         } else {
-          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
+          wi.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
             .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
             .withFavoredNodes(favoredNodes).build();
         }
-        this.writers.put(tableAndFamily, wl);
-        return wl;
+        this.writers.put(tableAndFamily, wi);
+        return wi;
       }
 
-      private void close(final StoreFileWriter w) throws IOException {
+      private void close(final StoreFileWriter w, final WriterInfo wi) throws IOException {
         if (w != null) {
           w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
           w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
+          if (conf.getBoolean(SET_MAX_SEQ_ID_KEY, false) && wi.maxSequenceId >= 0) {
+            w.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(wi.maxSequenceId));
+          }
           w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
           w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
           w.appendTrackedTimestampsToMetadata();
@@ -469,8 +482,8 @@ private void close(final StoreFileWriter w) throws IOException {
 
       @Override
       public void close(TaskAttemptContext c) throws IOException, InterruptedException {
-        for (WriterLength wl : this.writers.values()) {
-          close(wl.writer);
+        for (WriterInfo wi : this.writers.values()) {
+          close(wi.writer, wi);
         }
       }
     };
@@ -493,8 +506,9 @@ static void configureStoragePolicy(final Configuration conf, final FileSystem fs
 
   /*
    * Data structure to hold a Writer and amount of data written on it.
   */
-  static class WriterLength {
+  static class WriterInfo {
     long written = 0;
+    long maxSequenceId = -1;
     StoreFileWriter writer = null;
   }
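As a side note, the new file-info entry can be checked directly on a generated HFile. A minimal sketch (not part of the patch), assuming the HFile.createReader(fs, path, conf) convenience overload and the HFile.Reader#getHFileInfo() accessor available on recent branches; the path below is a placeholder:

    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path hfile = new Path("/tmp/bulk-output/family/hfile");          // placeholder path
    // Read MAX_SEQ_ID back out of the file info written by close(w, wi) above.
    try (HFile.Reader reader = HFile.createReader(fs, hfile, conf)) {
      byte[] rawSeqId = reader.getHFileInfo().get(HStoreFile.MAX_SEQ_ID_KEY);
      if (rawSeqId != null) {
        System.out.println("MAX_SEQ_ID=" + Bytes.toLong(rawSeqId));
      }
    }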
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index 220e9a3793cd..aaa59aa50fca 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -36,7 +36,10 @@
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -45,11 +48,15 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
 import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits;
@@ -225,6 +232,113 @@ public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception {
     });
   }
 
+  /**
+   * Tests that the sequence IDs of cells are retained in the resulting HFile and usable by a
+   * RegionScanner. It does this by running the WALPlayer multiple times and using a RegionScanner
+   * to read the files; without sequence IDs, the files will be sorted by size or name and will not
+   * always return the correct result.
+   */
+  @Test
+  public void testMaxSeqIdHFileMetadata() throws Exception {
+    final int numEdits = 20;
+    final int flushInterval = 10;
+
+    // Phase 1: Setup test data and configuration
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final byte[] family = Bytes.toBytes("family");
+    final byte[] column = Bytes.toBytes("c1");
+    final byte[] row = Bytes.toBytes("row");
+    final Table table = TEST_UTIL.createTable(tableName, family);
+
+    long now = EnvironmentEdgeManager.currentTime();
+    {
+      Put put = new Put(row);
+      put.addColumn(family, column, now, column);
+      table.put(put);
+    }
+
+    String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
+      HConstants.HREGION_LOGDIR_NAME).toString();
+    String walPlayerOutputRoot = "/tmp/" + name.getMethodName();
+
+    Configuration walPlayerConfig = new Configuration(TEST_UTIL.getConfiguration());
+    walPlayerConfig.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
+    walPlayerConfig.setBoolean(HFileOutputFormat2.SET_MAX_SEQ_ID_KEY, true);
+
+    // Phase 2: Write edits with periodic WAL rolling and WALPlayer execution
+    int walPlayerRunCount = 0;
+    byte[] lastVal = null;
+
+    for (int i = 0; i < numEdits; i++) {
+      lastVal = new byte[12];
+      ThreadLocalRandom.current().nextBytes(lastVal);
+
+      Put put = new Put(row);
+      put.addColumn(family, column, now, lastVal);
+      table.put(put);
+
+      // Roll WALs and run WALPlayer every flushInterval iterations
+      if (i > 0 && (i % flushInterval == 0) || i + 1 == numEdits) {
+        WAL log = cluster.getRegionServer(0).getWAL(null);
+        log.rollWriter();
+
+        walPlayerRunCount++;
+        String walPlayerRunDir = walPlayerOutputRoot + "/run_" + walPlayerRunCount;
+        Configuration runConfig = new Configuration(walPlayerConfig);
+        runConfig.set(WALPlayer.BULK_OUTPUT_CONF_KEY, walPlayerRunDir);
+
+        WALPlayer player = new WALPlayer(runConfig);
+        assertEquals(0, ToolRunner.run(runConfig, player,
+          new String[] { walInputDir, tableName.getNameAsString() }));
+      }
+    }
+
+    table.close();
+
+    final byte[] finalLastVal = lastVal;
+
+    // Phase 3: Collect all generated HFiles into proper structure for region scanner
+    TableDescriptor htd = TEST_UTIL.getAdmin().getDescriptor(tableName);
+    RegionInfo regionInfo = cluster.getRegions(tableName).get(0).getRegionInfo();
+    FileSystem fs = cluster.getRegionServer(0).getFileSystem();
+
+    Path regionOutPath = CommonFSUtils.getRegionDir(new Path(walPlayerOutputRoot),
+      htd.getTableName(), regionInfo.getEncodedName());
+    Path familyOutPath = new Path(regionOutPath, new String(family));
+    fs.mkdirs(familyOutPath);
+
+    // Copy all HFiles from each WALPlayer run
+    for (int i = 1; i <= walPlayerRunCount; i++) {
+      Path walPlayerRunPath = new Path(walPlayerOutputRoot, "run_" + i);
+      RemoteIterator<LocatedFileStatus> files =
+        fs.listFiles(new Path(walPlayerRunPath, tableName.getNamespaceAsString()), true);
+
+      while (files.hasNext()) {
+        LocatedFileStatus fileStatus = files.next();
+        // Skip hidden/metadata files (starting with '.')
+        if (fileStatus.isFile() && !fileStatus.getPath().getName().startsWith(".")) {
+          FileUtil.copy(fs, fileStatus.getPath(), fs,
+            new Path(familyOutPath, fileStatus.getPath().getName()), false, walPlayerConfig);
+        }
+      }
+    }
+
+    // Phase 4: Verify sequence IDs are preserved correctly
+    Scan scan = new Scan();
+    try (ClientSideRegionScanner scanner = new ClientSideRegionScanner(walPlayerConfig, fs,
+      new Path(walPlayerOutputRoot), htd, regionInfo, scan, null)) {
+
+      // Verify exactly one row returned
+      Result result = scanner.next();
+      assertThat(result, notNullValue());
+      assertThat(result.listCells(), notNullValue());
+
+      // Verify the value with highest sequence ID (from last iteration) wins
+      byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column));
+      assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(finalLastVal)));
+    }
+  }
+
   /**
    * Simple end-to-end test
    */
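To summarize the collision the test exercises (a sketch, not part of the patch): both writes share the same row, column, and timestamp, so only the cell's sequence ID can decide which value a scan should return once the edits land in separate HFiles.

    // 'now' is a fixed timestamp shared by both puts, as in the test above.
    Put first = new Put(Bytes.toBytes("row"));
    first.addColumn(Bytes.toBytes("family"), Bytes.toBytes("c1"), now, Bytes.toBytes("old"));
    Put second = new Put(Bytes.toBytes("row"));
    second.addColumn(Bytes.toBytes("family"), Bytes.toBytes("c1"), now, Bytes.toBytes("new"));
    // After two WALPlayer runs produce two HFiles, a scan must return "new"; that is
    // only guaranteed when MAX_SEQ_ID metadata orders the files correctly.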