diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java index 3f31255d60f6..34accf14f361 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS; +import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY; import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES; import java.io.IOException; @@ -411,6 +412,9 @@ private Tool initializeWalPlayer(long startTime, long endTime) { conf.setLong(WALInputFormat.START_TIME_KEY, startTime); conf.setLong(WALInputFormat.END_TIME_KEY, endTime); conf.setBoolean(IGNORE_EMPTY_FILES, true); + // WALPlayer reads this key with a default of true (MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT), + // so this explicit false is required to get single-table HFile output; it is not just defensive + conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); Tool walPlayer = new WALPlayer(); walPlayer.setConf(conf); return walPlayer; diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 28184656a890..db71ac04c6a4 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -19,6 +19,7 @@ import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY; +import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY; import java.io.IOException; import java.net.URI; @@ -32,6 +33,7 @@ import java.util.stream.Collectors; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -250,8 +252,12 @@ private void mergeSplitAndCopyBulkloadedHFiles(List activeFiles, private void mergeSplitAndCopyBulkloadedHFiles(List files, TableName tn, FileSystem tgtFs) throws IOException { MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob(); + Configuration conf = new Configuration(this.conf); conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY, getBulkOutputDirForTable(tn).toString()); + if (backupInfo.isContinuousBackupEnabled()) { + conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); + } player.setConf(conf); String inputDirs = StringUtils.join(files, ","); @@ -360,10 +366,26 @@ public void execute() throws IOException, ColumnFamilyMismatchException { setupRegionLocator(); // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT convertWALsToHFiles(tablesToWALFileList, tablesToPrevBackupTs); - incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() }, - backupInfo.getBackupRootDir()); + + String[] bulkOutputFiles; + String backupDest = backupInfo.getBackupRootDir(); + if (backupInfo.isContinuousBackupEnabled()) { + // For the continuous backup case, the WALs have been converted to HFiles in a separate + // map-reduce job for each table. In order to prevent MR job failures due to HBASE-29891, + // these HFiles were sent to a different output directory for each table. 
This means + // continuous backups require a list of source directories and a different destination + // directory when copying HFiles to the incremental backup directory. + List uniqueNamespaces = tablesToWALFileList.keySet().stream() + .map(TableName::getNamespaceAsString).distinct().toList(); + bulkOutputFiles = uniqueNamespaces.stream() + .map(ns -> new Path(getBulkOutputDir(), ns).toString()).toArray(String[]::new); + backupDest = backupDest + Path.SEPARATOR + backupId; + } else { + bulkOutputFiles = new String[] { getBulkOutputDir().toString() }; + } + incrementalCopyHFiles(bulkOutputFiles, backupDest); } catch (Exception e) { - String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId; + String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId + " "; // fail the overall backup and return failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf); throw new IOException(e); @@ -421,7 +443,8 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I System.arraycopy(files, 0, strArr, 0, files.length); strArr[strArr.length - 1] = backupDest; - String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId(); + String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId() + "-" + + System.currentTimeMillis(); if (LOG.isDebugEnabled()) { LOG.debug("Setting incremental copy HFiles job name to : " + jobname); } @@ -519,23 +542,25 @@ protected boolean tableExists(TableName table, Connection conn) throws IOExcepti protected void walToHFiles(List dirPaths, List tableList, long previousBackupTs) throws IOException { Tool player = new WALPlayer(); + Configuration conf = new Configuration(this.conf); // Player reads all files in arbitrary directory structure and creates // a Map task for each file. 
We use ';' as separator // because WAL file names contains ',' String dirs = StringUtils.join(dirPaths, ';'); - String jobname = "Incremental_Backup-" + backupId; + String jobname = "Incremental_Backup-" + backupId + "-" + System.currentTimeMillis(); - Path bulkOutputPath = getBulkOutputDir(); - conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); + setBulkOutputPath(conf, tableList); conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true); conf.set(JOB_NAME_CONF_KEY, jobname); - boolean diskBasedSortingEnabledOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf); conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true); if (backupInfo.isContinuousBackupEnabled()) { conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs)); conf.set(WALInputFormat.END_TIME_KEY, Long.toString(backupInfo.getIncrCommittedWalTs())); + // We do not want a multi-table HFile format here because continuous backups run the WALPlayer + // individually on each table in the backup set. + conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); } String[] playerArgs = { dirs, StringUtils.join(tableList, ",") }; @@ -550,43 +575,70 @@ protected void walToHFiles(List dirPaths, List tableList, long p } catch (Exception ee) { throw new IOException("Can not convert from directory " + dirs + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee); - } finally { - conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, - diskBasedSortingEnabledOriginalValue); - conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY); - conf.unset(JOB_NAME_CONF_KEY); } } + private void setBulkOutputPath(Configuration conf, List tableList) { + Path bulkOutputPath = getBulkOutputDir(); + if (backupInfo.isContinuousBackupEnabled()) { + if (tableList.size() != 1) { + // Continuous backups run the WALPlayer job on one table at a time, so the list of tables + // should have only one element. 
+ throw new RuntimeException( + "Expected table list to have only one element, but got: " + tableList); + } + bulkOutputPath = getTmpBackupDirForTable(TableName.valueOf(tableList.get(0))); + } + conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); + } + private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) throws IOException { Path bulkOutDir = getBulkOutputDirForTable(tn); + Configuration conf = new Configuration(this.conf); if (tgtFs.exists(bulkOutDir)) { conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2); Path tgtPath = getTargetDirForTable(tn); - try { - RemoteIterator locatedFiles = tgtFs.listFiles(bulkOutDir, true); - List files = new ArrayList<>(); - while (locatedFiles.hasNext()) { - LocatedFileStatus file = locatedFiles.next(); - if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) { - files.add(file.getPath().toString()); - } + RemoteIterator locatedFiles = tgtFs.listFiles(bulkOutDir, true); + List files = new ArrayList<>(); + while (locatedFiles.hasNext()) { + LocatedFileStatus file = locatedFiles.next(); + if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) { + files.add(file.getPath().toString()); } - incrementalCopyHFiles(files.toArray(files.toArray(new String[0])), tgtPath.toString()); - } finally { - conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY); } + incrementalCopyHFiles(files.toArray(files.toArray(new String[0])), tgtPath.toString()); } } + /** + * Creates a path to the bulk load output directory for a table. This directory will look like: + * .../backupRoot/.tmp/backupId/namespace/table/data + * @param table The table whose HFiles are being bulk loaded + * @return A Path object representing the directory + */ protected Path getBulkOutputDirForTable(TableName table) { + Path tablePath = getTmpBackupDirForTable(table); + return new Path(tablePath, "data"); + } + + /** + * Creates a path to a table's directory within the temporary directory. 
This directory will look + * like: .../backupRoot/.tmp/backupId/namespace/table + * @param table The table whose HFiles are being bulk loaded + * @return A Path object representing the directory + */ + protected Path getTmpBackupDirForTable(TableName table) { Path tablePath = getBulkOutputDir(); tablePath = new Path(tablePath, table.getNamespaceAsString()); - tablePath = new Path(tablePath, table.getQualifierAsString()); - return new Path(tablePath, "data"); + return new Path(tablePath, table.getQualifierAsString()); } + /** + * Creates a path to a temporary backup directory. This directory will look like: + * .../backupRoot/.tmp/backupId + * @return A Path object representing the directory + */ protected Path getBulkOutputDir() { String backupId = backupInfo.getBackupId(); Path path = new Path(backupInfo.getBackupRootDir()); @@ -595,6 +647,12 @@ protected Path getBulkOutputDir() { return path; } + /** + * Creates a path to a destination directory for bulk loaded HFiles. This directory will look + * like: .../backupRoot/backupID/namespace/table + * @param table The table whose HFiles are being bulk loaded + * @return A Path object representing the directory + */ private Path getTargetDirForTable(TableName table) { Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId()); path = new Path(path, table.getNamespaceAsString()); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java index 50b47565d743..93cd4b6b7321 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.backup.util; +import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY; + import java.io.FileNotFoundException; import 
java.io.IOException; import java.util.ArrayList; @@ -33,9 +35,11 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NamespaceNotFoundException; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreJob; +import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Connection; @@ -158,11 +162,17 @@ void modifyTableSync(Connection conn, TableDescriptor desc) throws IOException { public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs, TableName[] tableNames, TableName[] newTableNames, String incrBackupId, boolean keepOriginalSplits) throws IOException { - try (Admin admin = conn.getAdmin()) { + try (Admin admin = conn.getAdmin(); BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) { if (tableNames.length != newTableNames.length) { throw new IOException("Number of source tables and target tables does not match!"); } - FileSystem fileSys = tableBackupPath.getFileSystem(this.conf); + Configuration conf = new Configuration(this.conf); + FileSystem fileSys = tableBackupPath.getFileSystem(conf); + + BackupInfo backupInfo = backupAdmin.getBackupInfo(incrBackupId); + if (backupInfo.isContinuousBackupEnabled()) { + conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); + } // for incremental backup image, expect the table already created either by user or previous // full backup. 
Here, check that all new tables exists diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java index c911f5dbce07..d5bb047c7990 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java @@ -22,11 +22,14 @@ import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -41,6 +44,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.tool.BulkLoadHFiles; @@ -99,16 +105,18 @@ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception TableName tableName = TableName.valueOf("table_" + methodName); Table t1 = TEST_UTIL.createTable(tableName, famName); - try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) { - int before = table.getBackupHistory().size(); + try (BackupSystemTable backupSystemTable = new BackupSystemTable(TEST_UTIL.getConnection())) { + int before = 
backupSystemTable.getBackupHistory().size(); // Run continuous backup + LOG.info("Running full backup with continuous backup enabled on table: {}", tableName); String backup1 = backupTables(BackupType.FULL, List.of(tableName), BACKUP_ROOT_DIR, true); + LOG.info("Full backup complete with ID {} for table: {}", backup1, tableName); assertTrue(checkSucceeded(backup1)); // Verify backup history increased and all the backups are succeeded LOG.info("Verify backup history increased and all the backups are succeeded"); - List backups = table.getBackupHistory(); + List backups = backupSystemTable.getBackupHistory(); assertEquals("Backup history should increase", before + 1, backups.size()); // Verify backup manifest contains the correct tables @@ -121,12 +129,12 @@ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception Thread.sleep(10000); // Run incremental backup - LOG.info("Run incremental backup now"); - before = table.getBackupHistory().size(); + LOG.info("Run incremental backup now on table: {}", tableName); + before = backupSystemTable.getBackupHistory().size(); String backup2 = backupTables(BackupType.INCREMENTAL, List.of(tableName), BACKUP_ROOT_DIR, true); assertTrue(checkSucceeded(backup2)); - LOG.info("Incremental backup completed"); + LOG.info("Incremental backup completed for table: {}", tableName); // Verify the temporary backup directory was deleted Path backupTmpDir = new Path(BACKUP_ROOT_DIR, ".tmp"); @@ -135,18 +143,107 @@ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception fs.exists(bulkLoadOutputDir)); // Verify backup history increased and all the backups are succeeded - backups = table.getBackupHistory(); + backups = backupSystemTable.getBackupHistory(); assertEquals("Backup history should increase", before + 1, backups.size()); + String originalTableChecksum = TEST_UTIL.checksumRows(t1); + + LOG.info("Truncating table: {}", tableName); TEST_UTIL.truncateTable(tableName); // Restore incremental 
backup TableName[] tables = new TableName[] { tableName }; BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection()); + LOG.info("Restoring table: {}", tableName); + // In the restore request, the original table is both the "from table" and the "to table". + // This means the table is being restored "into itself". It is not being restored into + // separate table. client.restore( BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, tables, tables, true)); - assertEquals(NB_ROWS_IN_BATCH, TEST_UTIL.countRows(tableName)); + LOG.info("Verifying data integrity for restored table: {}", tableName); + verifyRestoredTableDataIntegrity(tables[0], originalTableChecksum, NB_ROWS_IN_BATCH); + } + } + + @Test + public void testMultiTableContinuousBackupWithIncrementalBackupSuccess() throws Exception { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + List tables = new ArrayList<>(); + List tableNames = new ArrayList<>(); + tableNames.add(TableName.valueOf("table_" + methodName + "_0")); + tableNames.add(TableName.valueOf("table_" + methodName + "_1")); + tableNames.add(TableName.valueOf("ns1", "ns1_table_" + methodName + "_0")); + tableNames.add(TableName.valueOf("ns1", "ns1_table_" + methodName + "_1")); + tableNames.add(TableName.valueOf("sameTableNameDifferentNamespace")); + tableNames.add(TableName.valueOf("ns3", "sameTableNameDifferentNamespace")); + + for (TableName table : tableNames) { + LOG.info("Creating table: {}", table); + tables.add(TEST_UTIL.createTable(table, famName)); + } + + try (BackupSystemTable backupSystemTable = new BackupSystemTable(TEST_UTIL.getConnection())) { + int before = backupSystemTable.getBackupHistory().size(); + + // Run continuous backup on multiple tables + LOG.info("Running full backup with continuous backup enabled on tables: {}", tableNames); + String backup1 = backupTables(BackupType.FULL, tableNames, BACKUP_ROOT_DIR, true); + LOG.info("Full backup complete with ID {} for 
tables: {}", backup1, tableNames); + assertTrue(checkSucceeded(backup1)); + + // Verify backup history increased and all backups have succeeded + LOG.info("Verify backup history increased and all backups have succeeded"); + List backups = backupSystemTable.getBackupHistory(); + assertEquals("Backup history should increase", before + 1, backups.size()); + + // Verify backup manifest contains the correct tables + LOG.info("Verify backup manifest contains the correct tables"); + BackupManifest manifest = getLatestBackupManifest(backups); + assertEquals("Backup should contain the expected tables", Sets.newHashSet(tableNames), + new HashSet<>(manifest.getTableList())); + + loadTables(tables); + Thread.sleep(10000); + + // Run incremental backup + LOG.info("Running incremental backup on tables: {}", tableNames); + before = backupSystemTable.getBackupHistory().size(); + String backup2 = backupTables(BackupType.INCREMENTAL, tableNames, BACKUP_ROOT_DIR, true); + assertTrue(checkSucceeded(backup2)); + LOG.info("Incremental backup completed with ID {} for tables: {}", backup2, tableNames); + + // Verify backup history increased and all the backups are succeeded + backups = backupSystemTable.getBackupHistory(); + assertEquals("Backup history should increase", before + 1, backups.size()); + + // We need to get each table's original row checksum before truncating each table + LinkedHashMap originalTableChecksums = new LinkedHashMap<>(); + for (Table table : tables) { + LOG.info("Getting row checksum for table: {}", table); + originalTableChecksums.put(table.getName(), TEST_UTIL.checksumRows(table)); + } + + for (TableName tableName : tableNames) { + LOG.info("Truncating table: {}", tableName); + TEST_UTIL.truncateTable(tableName); + } + + // Restore incremental backup + TableName[] tableNamesArray = tableNames.toArray(new TableName[0]); + BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection()); + LOG.info("Restoring tables: {}", tableNames); + // In the restore 
request, the original tables are both the list of "from tables" and the + // list of "to tables". This means the tables are being restored "into themselves". They are + // not being restored into separate tables. + client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, + tableNamesArray, tableNamesArray, true)); + + for (TableName tableName : originalTableChecksums.keySet()) { + LOG.info("Verifying data integrity for restored table: {}", tableName); + verifyRestoredTableDataIntegrity(tableName, originalTableChecksums.get(tableName), + NB_ROWS_IN_BATCH); + } } } @@ -238,4 +335,45 @@ protected static void loadTable(Table table) throws Exception { table.put(p); } } + + protected static void loadTables(List
tables) throws Exception { + for (Table table : tables) { + LOG.info("Loading data into table: {}", table); + loadTable(table); + } + } + + private void verifyRestoredTableDataIntegrity(TableName restoredTableName, + String originalTableChecksum, int expectedRowCount) throws Exception { + try (Table restoredTable = TEST_UTIL.getConnection().getTable(restoredTableName); + ResultScanner scanner = restoredTable.getScanner(new Scan())) { + + // Verify the checksum for the original table (before it was truncated) matches the checksum + // of the restored table. + String restoredTableChecksum = TEST_UTIL.checksumRows(restoredTable); + assertEquals("The restored table's row checksum did not match the original table's checksum", + originalTableChecksum, restoredTableChecksum); + + // Verify the data in the restored table is the same as when it was originally loaded + // into the table. + int count = 0; + for (Result result : scanner) { + // The data has a numerical match between its row key and value (such as rowLoad1 and + // val1) + // We can use this to ensure a row key has the expected value. 
+ String rowKey = Bytes.toString(result.getRow()); + int index = Integer.parseInt(rowKey.replace("rowLoad", "")); + + // Verify the Value + byte[] actualValue = result.getValue(famName, qualName); + assertNotNull("Value missing for row key: " + rowKey, actualValue); + String expectedValue = "val" + index; + assertEquals("Value mismatch for row key: " + rowKey, expectedValue, + Bytes.toString(actualValue)); + + count++; + } + assertEquals(expectedRowCount, count); + } + } } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 0e81c95677c3..87241d437cd8 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -161,8 +161,9 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) "hbase.bulkload.locality.sensitive.enabled"; private static final boolean DEFAULT_LOCALITY_SENSITIVE = true; static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name"; - static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = + public static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = "hbase.mapreduce.use.multi.table.hfileoutputformat"; + public static final boolean MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT = true; /** * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. 
We expose this config diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 4c0b12ef7333..cf4397d1052f 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.mapreduce; +import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT; +import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY; + import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -378,14 +381,35 @@ public Job createSubmittableJob(String[] args) throws IOException { Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputValueClass(MapReduceExtendedCell.class); - try (Connection conn = ConnectionFactory.createConnection(conf);) { - List tableInfoList = new ArrayList(); - for (TableName tableName : tableNames) { + try (Connection conn = ConnectionFactory.createConnection(conf)) { + if ( + conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT) + ) { + // The HFiles will be output to something like this for each table: + // .../BULK_OUTPUT_CONF_KEY/namespace/table/columnFamily + List tableInfoList = new ArrayList(); + for (TableName tableName : tableNames) { + Table table = conn.getTable(tableName); + RegionLocator regionLocator = getRegionLocator(tableName, conf, conn); + tableInfoList.add(new TableInfo(table.getDescriptor(), regionLocator)); + } + MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfoList); + } else { + // The HFiles will be output to something like: .../BULK_OUTPUT_CONF_KEY/columnFamily + // This is useful for scenarios where we are running 
the WALPlayer consecutively on just + // one table at a time, and BULK_OUTPUT_CONF_KEY is already set to a "namespace/table" + // directory path for each table. + if (tableNames.size() != 1) { + throw new IOException("Expected table names list to have only one table since " + + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " is set to false. Got the following " + + "list of tables instead: " + tableNames); + } + TableName tableName = tableNames.get(0); Table table = conn.getTable(tableName); RegionLocator regionLocator = getRegionLocator(tableName, conf, conn); - tableInfoList.add(new TableInfo(table.getDescriptor(), regionLocator)); + HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); } - MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfoList); } TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index bbadabab69bf..26a38f5b5367 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.mapreduce; +import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; @@ -34,6 +35,7 @@ import java.io.File; import java.io.IOException; import java.io.PrintStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; @@ -46,6 +48,7 @@ import 
org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.SingleProcessHBaseCluster; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; @@ -90,13 +93,20 @@ public class TestWALPlayer { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestWALPlayer.class); + private static final byte[] FAMILY = Bytes.toBytes("family"); + private static final byte[] COLUMN1 = Bytes.toBytes("c1"); + private static final byte[] COLUMN2 = Bytes.toBytes("c2"); + private static final byte[] ROW = Bytes.toBytes("row"); + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); private static SingleProcessHBaseCluster cluster; private static Path rootDir; private static Path walRootDir; - private static FileSystem fs; + private static FileSystem localFs; private static FileSystem logFs; private static Configuration conf; + private static FileSystem hdfs; + private static String bulkLoadOutputDir; @Rule public TestName name = new TestName(); @@ -106,15 +116,18 @@ public static void beforeClass() throws Exception { conf = TEST_UTIL.getConfiguration(); rootDir = TEST_UTIL.createRootDir(); walRootDir = TEST_UTIL.createWALRootDir(); - fs = CommonFSUtils.getRootDirFileSystem(conf); + localFs = CommonFSUtils.getRootDirFileSystem(conf); logFs = CommonFSUtils.getWALFileSystem(conf); cluster = TEST_UTIL.startMiniCluster(); + hdfs = TEST_UTIL.getTestFileSystem(); + bulkLoadOutputDir = new Path(new Path(TEST_UTIL.getConfiguration().get("fs.defaultFS")), + Path.SEPARATOR + "bulkLoadOutput").toString(); } @AfterClass public static void afterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); - fs.delete(rootDir, true); + localFs.delete(rootDir, true); logFs.delete(walRootDir, true); } @@ -235,18 +248,11 @@ public void 
testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception { public void testWALPlayer() throws Exception { final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); - final byte[] FAMILY = Bytes.toBytes("family"); - final byte[] COLUMN1 = Bytes.toBytes("c1"); - final byte[] COLUMN2 = Bytes.toBytes("c2"); - final byte[] ROW = Bytes.toBytes("row"); Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); Table t2 = TEST_UTIL.createTable(tableName2, FAMILY); - // put a row into the first table - Put p = new Put(ROW); - p.addColumn(FAMILY, COLUMN1, COLUMN1); - p.addColumn(FAMILY, COLUMN2, COLUMN2); - t1.put(p); + putRowIntoTable(t1); + // delete one column Delete d = new Delete(ROW); d.addColumns(FAMILY, COLUMN1); @@ -411,6 +417,109 @@ public void testFailOnEmptyWALFilesWhenNotIgnored() throws Exception { assertNotEquals("WALPlayer should fail on empty files when not ignored", 0, exitCode); } + /** + * Verifies the HFile output format for WALPlayer has the following directory structure when + * hbase.mapreduce.use.multi.table.hfileoutputformat is set to true: + * .../BULK_OUTPUT_CONF_KEY/namespace/tableName/columnFamily + */ + @Test + public void testWALPlayerMultiTableHFileOutputFormat() throws Exception { + String namespace = "ns_" + name.getMethodName(); + TEST_UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build()); + final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); + final TableName tableName2 = TableName.valueOf(namespace, name.getMethodName() + "2"); + Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); + Table t2 = TEST_UTIL.createTable(tableName2, FAMILY); + + putRowIntoTable(t1); + putRowIntoTable(t2); + + Configuration multiTableOutputConf = new Configuration(conf); + setConfSimilarToIncrementalBackupWALToHFilesMethod(multiTableOutputConf); + + // We are testing this config variable's effect on HFile output for 
the WALPlayer + multiTableOutputConf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true); + + WALPlayer player = new WALPlayer(multiTableOutputConf); + String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(), + HConstants.HREGION_LOGDIR_NAME).toString(); + String tables = tableName1.getNameAsString() + "," + tableName2.getNameAsString(); + + ToolRunner.run(multiTableOutputConf, player, new String[] { walInputDir, tables }); + + assertMultiTableOutputFormatDirStructure(tableName1, "default"); + assertMultiTableOutputFormatDirStructure(tableName2, namespace); + + hdfs.delete(new Path(bulkLoadOutputDir), true); + } + + /** + * Verifies the HFile output format for WALPlayer has the following directory structure when + * hbase.mapreduce.use.multi.table.hfileoutputformat is set to false: + * .../BULK_OUTPUT_CONF_KEY/columnFamily. Also verifies an exception occurs when the WALPlayer is + * run on multiple tables at once while hbase.mapreduce.use.multi.table.hfileoutputformat is set + * to false. 
+ */ + @Test + public void testWALPlayerSingleTableHFileOutputFormat() throws Exception { + String namespace = "ns_" + name.getMethodName(); + TEST_UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build()); + final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); + final TableName tableName2 = TableName.valueOf(namespace, name.getMethodName() + "2"); + Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); + Table t2 = TEST_UTIL.createTable(tableName2, FAMILY); + + putRowIntoTable(t1); + putRowIntoTable(t2); + + String bulkLoadOutputDir = new Path(new Path(TEST_UTIL.getConfiguration().get("fs.defaultFS")), + Path.SEPARATOR + "bulkLoadOutput").toString(); + + Configuration singleTableOutputConf = new Configuration(conf); + setConfSimilarToIncrementalBackupWALToHFilesMethod(singleTableOutputConf); + + // We are testing this config variable's effect on HFile output for the WALPlayer + singleTableOutputConf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); + + WALPlayer player = new WALPlayer(singleTableOutputConf); + + String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(), + HConstants.HREGION_LOGDIR_NAME).toString(); + String tables = tableName1.getNameAsString() + "," + tableName2.getNameAsString(); + + // Expecting a failure here since we are running WALPlayer on multiple tables even though the + // multi-table HFile output format is disabled + try { + ToolRunner.run(singleTableOutputConf, player, new String[] { walInputDir, tables }); + fail("Expected a failure to occur due to using WALPlayer with multiple tables while having " + + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " set to false"); + } catch (IOException e) { + String expectedMsg = "Expected table names list to have only one table since " + + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " is set to false. 
Got the following " + + "list of tables instead: [testWALPlayerSingleTableHFileOutputFormat1, " + namespace + + ":testWALPlayerSingleTableHFileOutputFormat2]"; + assertTrue(e.getMessage().contains(expectedMsg)); + } + + // Successfully run WALPlayer on just one table while having multi-table HFile output format + // disabled + ToolRunner.run(singleTableOutputConf, player, + new String[] { walInputDir, tableName1.getNameAsString() }); + + Path bulkLoadOutputDirForTable = new Path(bulkLoadOutputDir, "family"); + assertTrue("Expected path to exist: " + bulkLoadOutputDirForTable, + hdfs.exists(bulkLoadOutputDirForTable)); + + hdfs.delete(new Path(bulkLoadOutputDir), true); + } + + private void putRowIntoTable(Table table) throws IOException { + Put p = new Put(ROW); + p.addColumn(FAMILY, COLUMN1, COLUMN1); + p.addColumn(FAMILY, COLUMN2, COLUMN2); + table.put(p); + } + private Path createEmptyWALFile(String walDir) throws IOException { FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); Path inputDir = new Path("/" + walDir); @@ -422,4 +531,22 @@ private Path createEmptyWALFile(String walDir) throws IOException { return inputDir; } + + private void setConfSimilarToIncrementalBackupWALToHFilesMethod(Configuration conf) { + conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkLoadOutputDir); + conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); + conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true); + conf.set("mapreduce.job.name", name.getMethodName() + "-" + System.currentTimeMillis()); + conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true); + } + + private void assertMultiTableOutputFormatDirStructure(TableName tableName, String namespace) + throws IOException { + Path qualifierAndFamilyDir = + new Path(tableName.getQualifierAsString(), new String(FAMILY, StandardCharsets.UTF_8)); + Path namespaceQualifierFamilyDir = new Path(namespace, qualifierAndFamilyDir); + Path bulkLoadOutputDirForTable = new Path(bulkLoadOutputDir, 
namespaceQualifierFamilyDir); + assertTrue("Expected path to exist: " + bulkLoadOutputDirForTable, + hdfs.exists(bulkLoadOutputDirForTable)); + } }