Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;
import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;

import java.io.IOException;
Expand Down Expand Up @@ -411,6 +412,9 @@ private Tool initializeWalPlayer(long startTime, long endTime) {
conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
conf.setBoolean(IGNORE_EMPTY_FILES, true);
// HFile output format defaults to false in HFileOutputFormat2, but we are explicitly setting
// it here just in case
conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
Tool walPlayer = new WALPlayer();
walPlayer.setConf(conf);
return walPlayer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;

import java.io.IOException;
import java.net.URI;
Expand All @@ -32,6 +33,7 @@
import java.util.stream.Collectors;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -250,8 +252,12 @@ private void mergeSplitAndCopyBulkloadedHFiles(List<String> activeFiles,
private void mergeSplitAndCopyBulkloadedHFiles(List<String> files, TableName tn, FileSystem tgtFs)
throws IOException {
MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
Configuration conf = new Configuration(this.conf);
conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
getBulkOutputDirForTable(tn).toString());
if (backupInfo.isContinuousBackupEnabled()) {
conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
}
player.setConf(conf);

String inputDirs = StringUtils.join(files, ",");
Expand Down Expand Up @@ -360,10 +366,26 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
setupRegionLocator();
// convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
convertWALsToHFiles(tablesToWALFileList, tablesToPrevBackupTs);
incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
backupInfo.getBackupRootDir());

String[] bulkOutputFiles;
String backupDest = backupInfo.getBackupRootDir();
if (backupInfo.isContinuousBackupEnabled()) {
// For the continuous backup case, the WALs have been converted to HFiles in a separate
// map-reduce job for each table. In order to prevent MR job failures due to HBASE-29891,
// these HFiles were sent to a different output directory for each table. This means
// continuous backups require a list of source directories and a different destination
// directory when copying HFiles to the incremental backup directory.
List<String> uniqueNamespaces = tablesToWALFileList.keySet().stream()
.map(TableName::getNamespaceAsString).distinct().toList();
bulkOutputFiles = uniqueNamespaces.stream()
.map(ns -> new Path(getBulkOutputDir(), ns).toString()).toArray(String[]::new);
backupDest = backupDest + Path.SEPARATOR + backupId;
} else {
bulkOutputFiles = new String[] { getBulkOutputDir().toString() };
}
incrementalCopyHFiles(bulkOutputFiles, backupDest);
} catch (Exception e) {
String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId + " ";
// fail the overall backup and return
failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
throw new IOException(e);
Expand Down Expand Up @@ -421,7 +443,8 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I
System.arraycopy(files, 0, strArr, 0, files.length);
strArr[strArr.length - 1] = backupDest;

String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId();
String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId() + "-"
+ System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
}
Expand Down Expand Up @@ -519,23 +542,25 @@ protected boolean tableExists(TableName table, Connection conn) throws IOExcepti
protected void walToHFiles(List<String> dirPaths, List<String> tableList, long previousBackupTs)
throws IOException {
Tool player = new WALPlayer();
Configuration conf = new Configuration(this.conf);

// Player reads all files in arbitrary directory structure and creates
// a Map task for each file. We use ';' as separator
// because WAL file names contains ','
String dirs = StringUtils.join(dirPaths, ';');
String jobname = "Incremental_Backup-" + backupId;
String jobname = "Incremental_Backup-" + backupId + "-" + System.currentTimeMillis();

Path bulkOutputPath = getBulkOutputDir();
conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
setBulkOutputPath(conf, tableList);
conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
conf.set(JOB_NAME_CONF_KEY, jobname);
boolean diskBasedSortingEnabledOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
if (backupInfo.isContinuousBackupEnabled()) {
conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs));
conf.set(WALInputFormat.END_TIME_KEY, Long.toString(backupInfo.getIncrCommittedWalTs()));
// We do not want a multi-table HFile format here because continuous backups run the WALPlayer
// individually on each table in the backup set.
conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
}
String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

Expand All @@ -550,43 +575,70 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList, long p
} catch (Exception ee) {
throw new IOException("Can not convert from directory " + dirs
+ " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
} finally {
conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
diskBasedSortingEnabledOriginalValue);
conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
conf.unset(JOB_NAME_CONF_KEY);
}
}

/**
 * Sets {@link WALPlayer#BULK_OUTPUT_CONF_KEY} on the given configuration to the directory the
 * WALPlayer job should write its HFiles to.
 * <p>
 * For regular incremental backups this is the shared bulk output directory. When continuous
 * backup is enabled, WALPlayer is run once per table, so the output is redirected to a
 * per-table subdirectory instead (see HBASE-29891 context in the surrounding change).
 * @param conf the job configuration to mutate
 * @param tableList the tables for this WALPlayer run; must contain exactly one table when
 *          continuous backup is enabled
 */
private void setBulkOutputPath(Configuration conf, List<String> tableList) {
Path bulkOutputPath = getBulkOutputDir();
if (backupInfo.isContinuousBackupEnabled()) {
if (tableList.size() != 1) {
// Continuous backups run the WALPlayer job on one table at a time, so the list of tables
// should have only one element.
throw new RuntimeException(
"Expected table list to have only one element, but got: " + tableList);
}
bulkOutputPath = getTmpBackupDirForTable(TableName.valueOf(tableList.get(0)));
}
conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
}

private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) throws IOException {
Path bulkOutDir = getBulkOutputDirForTable(tn);
Configuration conf = new Configuration(this.conf);

if (tgtFs.exists(bulkOutDir)) {
conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2);
Path tgtPath = getTargetDirForTable(tn);
try {
RemoteIterator<LocatedFileStatus> locatedFiles = tgtFs.listFiles(bulkOutDir, true);
List<String> files = new ArrayList<>();
while (locatedFiles.hasNext()) {
LocatedFileStatus file = locatedFiles.next();
if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) {
files.add(file.getPath().toString());
}
RemoteIterator<LocatedFileStatus> locatedFiles = tgtFs.listFiles(bulkOutDir, true);
List<String> files = new ArrayList<>();
while (locatedFiles.hasNext()) {
LocatedFileStatus file = locatedFiles.next();
if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) {
files.add(file.getPath().toString());
}
incrementalCopyHFiles(files.toArray(files.toArray(new String[0])), tgtPath.toString());
} finally {
conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
}
incrementalCopyHFiles(files.toArray(files.toArray(new String[0])), tgtPath.toString());
}
}

/**
 * Returns the bulk-load output directory for the given table, i.e. the {@code data} child of
 * that table's temporary backup directory:
 * .../backupRoot/.tmp/backupId/namespace/table/data
 * @param table the table whose HFiles are being bulk loaded
 * @return the per-table bulk-load output directory
 */
protected Path getBulkOutputDirForTable(TableName table) {
return new Path(getTmpBackupDirForTable(table), "data");
}

/**
* Creates a path to a table's directory within the temporary directory. This directory will look
* like: .../backupRoot/.tmp/backupId/namespace/table
* @param table The table whose HFiles are being bulk loaded
* @return A Path object representing the directory
*/
protected Path getTmpBackupDirForTable(TableName table) {
Path tablePath = getBulkOutputDir();
tablePath = new Path(tablePath, table.getNamespaceAsString());
tablePath = new Path(tablePath, table.getQualifierAsString());
return new Path(tablePath, "data");
return new Path(tablePath, table.getQualifierAsString());
}

/**
* Creates a path to a temporary backup directory. This directory will look like:
* .../backupRoot/.tmp/backupId
* @return A Path object representing the directory
*/
protected Path getBulkOutputDir() {
String backupId = backupInfo.getBackupId();
Path path = new Path(backupInfo.getBackupRootDir());
Expand All @@ -595,6 +647,12 @@ protected Path getBulkOutputDir() {
return path;
}

/**
* Creates a path to a destination directory for bulk loaded HFiles. This directory will look
* like: .../backupRoot/backupID/namespace/table
* @param table The table whose HFiles are being bulk loaded
* @return A Path object representing the directory
*/
private Path getTargetDirForTable(TableName table) {
Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId());
path = new Path(path, table.getNamespaceAsString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.backup.util;

import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -33,9 +35,11 @@
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreJob;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
Expand Down Expand Up @@ -158,11 +162,17 @@ void modifyTableSync(Connection conn, TableDescriptor desc) throws IOException {
public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs,
TableName[] tableNames, TableName[] newTableNames, String incrBackupId,
boolean keepOriginalSplits) throws IOException {
try (Admin admin = conn.getAdmin()) {
try (Admin admin = conn.getAdmin(); BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
if (tableNames.length != newTableNames.length) {
throw new IOException("Number of source tables and target tables does not match!");
}
FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
Configuration conf = new Configuration(this.conf);
FileSystem fileSys = tableBackupPath.getFileSystem(conf);

BackupInfo backupInfo = backupAdmin.getBackupInfo(incrBackupId);
if (backupInfo.isContinuousBackupEnabled()) {
conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
}

// for incremental backup image, expect the table already created either by user or previous
// full backup. Here, check that all new tables exists
Expand Down
Loading