diff --git a/src/main/java/com/treasure_data/td_import/Constants.java b/src/main/java/com/treasure_data/td_import/Constants.java index eb41555d..96bc7a0f 100644 --- a/src/main/java/com/treasure_data/td_import/Constants.java +++ b/src/main/java/com/treasure_data/td_import/Constants.java @@ -327,6 +327,11 @@ public interface Constants extends com.treasure_data.client.Constants { String BI_PREPARE_TIMEVALUE_HYPHEN = HYPHENHYPHEN + BI_PREPARE_PARTS_TIMEVALUE; String BI_PREPARE_PARTS_TIMEVALUE_DESC = "long value of the time column"; + // time-offset-value + String BI_PREPARE_PARTS_TIMEOFFSET = "time-offset"; + String BI_PREPARE_TIMEOFFSETVALUE_HYPHEN = HYPHENHYPHEN + BI_PREPARE_PARTS_TIMEOFFSET; + String BI_PREPARE_PARTS_TIMEOFFSET_DESC = "long value of the time offset"; + // time-format STRF_FORMAT; default=auto detect String BI_PREPARE_PARTS_TIMEFORMAT = "time-format"; String BI_PREPARE_TIMEFORMAT_HYPHEN_HYPHEN = HYPHENHYPHEN + BI_PREPARE_PARTS_TIMEFORMAT; diff --git a/src/main/java/com/treasure_data/td_import/Options.java b/src/main/java/com/treasure_data/td_import/Options.java index 01ba1a25..43f1bad4 100644 --- a/src/main/java/com/treasure_data/td_import/Options.java +++ b/src/main/java/com/treasure_data/td_import/Options.java @@ -76,6 +76,12 @@ public void initPrepareOptionParser(Properties props) { .withRequiredArg() .describedAs("TIME") .ofType(String.class); + op.acceptsAll(Arrays.asList( + Configuration.BI_PREPARE_PARTS_TIMEOFFSET), + Configuration.BI_PREPARE_PARTS_TIMEOFFSET_DESC) + .withRequiredArg() + .describedAs("TIME") + .ofType(String.class); op.acceptsAll(Arrays.asList( Configuration.BI_PREPARE_PARTS_PRIMARY_KEY), Configuration.BI_PREPARE_PARTS_PRIMARY_KEY_DESC) diff --git a/src/main/java/com/treasure_data/td_import/model/AliasTimeColumnValue.java b/src/main/java/com/treasure_data/td_import/model/AliasTimeColumnValue.java index 7d69cfe0..b4cccf11 100644 --- a/src/main/java/com/treasure_data/td_import/model/AliasTimeColumnValue.java +++ b/src/main/java/com/treasure_data/td_import/model/AliasTimeColumnValue.java @@ -20,7 +20,7 @@ import com.treasure_data.td_import.prepare.Strftime; public class AliasTimeColumnValue extends TimeColumnValue { - public AliasTimeColumnValue(int index, Strftime timeFormat) { - super(index, timeFormat); + public AliasTimeColumnValue(int index, Strftime timeFormat, long offset) { + super(index, timeFormat, offset); } } \ No newline at end of file diff --git a/src/main/java/com/treasure_data/td_import/model/TimeColumnValue.java b/src/main/java/com/treasure_data/td_import/model/TimeColumnValue.java index 5293487e..d302e250 100644 --- a/src/main/java/com/treasure_data/td_import/model/TimeColumnValue.java +++ b/src/main/java/com/treasure_data/td_import/model/TimeColumnValue.java @@ -25,10 +25,16 @@ public class TimeColumnValue { protected int index; protected Strftime timeFormat; + protected final long offset; public TimeColumnValue(int index, Strftime timeFormat) { + this(index, timeFormat, 0); + } + + public TimeColumnValue(int index, Strftime timeFormat, long offset) { this.index = index; this.timeFormat = timeFormat; + this.offset = offset; } public int getIndex() { @@ -39,6 +45,10 @@ public Strftime getTimeFormat() { return timeFormat; } + public long getOffset() { + return offset; + } + public void write(ColumnValue v, RecordWriter with) throws PreparePartsException { v.getColumnType().filterAndWrite(v, this, with); } diff --git a/src/main/java/com/treasure_data/td_import/prepare/PrepareConfiguration.java b/src/main/java/com/treasure_data/td_import/prepare/PrepareConfiguration.java index 9d5ba84e..fdc8ecc5 100644 --- a/src/main/java/com/treasure_data/td_import/prepare/PrepareConfiguration.java +++ b/src/main/java/com/treasure_data/td_import/prepare/PrepareConfiguration.java @@ -549,6 +549,7 @@ static InvalidColumnsHandling get(String key) { protected String aliasTimeColumn; protected TimeValueTimeColumnValue timeValue = new TimeValueTimeColumnValue(-1); protected String timeFormat; + protected long timeOffset; protected boolean hasPrimaryKey = false; protected String primaryKey = null; @@ -610,6 +611,9 @@ public void configure(Properties props, Options options) { // time value setTimeValue(); + // time offset + setTimeOffset(); + // time format setTimeFormat(); @@ -933,6 +937,31 @@ public TimeValueTimeColumnValue getTimeValue() { return timeValue; } + public void setTimeOffset() { + if (!optionSet.has(BI_PREPARE_PARTS_TIMEOFFSET)) { + return; + } + + if (optionSet.has(BI_PREPARE_PARTS_TIMEVALUE)) { + throw new IllegalArgumentException( + "cannot specify both of 'time-value' and 'time-offset' options"); + } + + String v = (String) optionSet.valueOf(BI_PREPARE_PARTS_TIMEOFFSET); + if (v != null) { + try { + timeOffset = Long.parseLong(v); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "'time-offset option requires the long type argument'", e); + } + } + } + + public long getTimeOffset() { + return timeOffset; + } + public void setTimeFormat() { if (optionSet.has(BI_PREPARE_PARTS_TIMEFORMAT)) { if (hasPrimaryKey()) { diff --git a/src/main/java/com/treasure_data/td_import/reader/AbstractRecordReader.java b/src/main/java/com/treasure_data/td_import/reader/AbstractRecordReader.java index 16c2ff28..f2235e71 100644 --- a/src/main/java/com/treasure_data/td_import/reader/AbstractRecordReader.java +++ b/src/main/java/com/treasure_data/td_import/reader/AbstractRecordReader.java @@ -251,26 +251,26 @@ public void setTimeColumnValue(TimeColumnSampling[] sampleColumnValues, if (index < 0) { timeColumnValue = conf.getTimeValue(); } else if (conf.getTimeFormat() != null) { - timeColumnValue = createTimeColumnValue(index, isAlias, conf.getTimeFormat()); + timeColumnValue = createTimeColumnValue(index, isAlias, conf.getTimeFormat(), conf.getTimeOffset()); } else { String suggested = sampleColumnValues[index].getSTRFTimeFormatRank(); if (suggested != null) { if (suggested.equals(TimeColumnSampling.HHmmss_STRF)) { - timeColumnValue = createTimeColumnValue(index, isAlias, new HHmmssStrftime()); + timeColumnValue = createTimeColumnValue(index, isAlias, new HHmmssStrftime(), conf.getTimeOffset()); } else { - timeColumnValue = createTimeColumnValue(index, isAlias, conf.getTimeFormat(suggested)); + timeColumnValue = createTimeColumnValue(index, isAlias, conf.getTimeFormat(suggested), conf.getTimeOffset()); } } else { - timeColumnValue = createTimeColumnValue(index, isAlias, null); + timeColumnValue = createTimeColumnValue(index, isAlias, null, conf.getTimeOffset()); } } } - private TimeColumnValue createTimeColumnValue(int index, boolean isAlias, Strftime strftime) { + private TimeColumnValue createTimeColumnValue(int index, boolean isAlias, Strftime strftime, long offset) { if (!isAlias) { - return new TimeColumnValue(index, strftime); + return new TimeColumnValue(index, strftime, offset); } else { - return new AliasTimeColumnValue(index, strftime); + return new AliasTimeColumnValue(index, strftime, offset); } } diff --git a/src/main/java/com/treasure_data/td_import/reader/MySQLTableReader.java b/src/main/java/com/treasure_data/td_import/reader/MySQLTableReader.java index cc055a22..0329e998 100644 --- a/src/main/java/com/treasure_data/td_import/reader/MySQLTableReader.java +++ b/src/main/java/com/treasure_data/td_import/reader/MySQLTableReader.java @@ -136,9 +136,9 @@ private void setTimeColumnValue(int timeColumnIndex, int aliasTimeColumnIndex) timeColumnValue = conf.getTimeValue(); } else { if (!isAlias) { - timeColumnValue = new TimeColumnValue(index, conf.getTimeFormat()); + timeColumnValue = new TimeColumnValue(index, conf.getTimeFormat(), conf.getTimeOffset()); } else { - timeColumnValue = new AliasTimeColumnValue(index, conf.getTimeFormat()); + timeColumnValue = new AliasTimeColumnValue(index, conf.getTimeFormat(), conf.getTimeOffset()); } } } diff --git a/src/main/java/com/treasure_data/td_import/reader/VariableLengthColumnsRecordReader.java b/src/main/java/com/treasure_data/td_import/reader/VariableLengthColumnsRecordReader.java index 96eb5100..903a7d76 100644 --- a/src/main/java/com/treasure_data/td_import/reader/VariableLengthColumnsRecordReader.java +++ b/src/main/java/com/treasure_data/td_import/reader/VariableLengthColumnsRecordReader.java @@ -141,9 +141,9 @@ public void setTimeColumnValue() throws PreparePartsException { } } else { if (timeColumnIndex >= 0) { - timeColumnValue = new TimeColumnValue(timeColumnIndex, conf.getTimeFormat()); + timeColumnValue = new TimeColumnValue(timeColumnIndex, conf.getTimeFormat(), conf.getTimeOffset()); } else if (aliasTimeColumnIndex >= 0) { - timeColumnValue = new AliasTimeColumnValue(aliasTimeColumnIndex, conf.getTimeFormat()); + timeColumnValue = new AliasTimeColumnValue(aliasTimeColumnIndex, conf.getTimeFormat(), conf.getTimeOffset()); } else if (conf.getTimeValue().getTimeValue() >= 0) { timeColumnValue = conf.getTimeValue(); } else { diff --git a/src/main/java/com/treasure_data/td_import/writer/AbstractRecordWriter.java b/src/main/java/com/treasure_data/td_import/writer/AbstractRecordWriter.java index 17ed86ef..5bc12c6a 100644 --- a/src/main/java/com/treasure_data/td_import/writer/AbstractRecordWriter.java +++ b/src/main/java/com/treasure_data/td_import/writer/AbstractRecordWriter.java @@ -184,6 +184,8 @@ public void write(TimeColumnValue filter, StringColumnValue v) throws PreparePar } } + time += filter.getOffset(); + write(time); } diff --git a/src/test/java/com/treasure_data/td_import/TestBulkImportOptions.java b/src/test/java/com/treasure_data/td_import/TestBulkImportOptions.java index 7e1b54d5..d6e8077f 100644 --- a/src/test/java/com/treasure_data/td_import/TestBulkImportOptions.java +++ b/src/test/java/com/treasure_data/td_import/TestBulkImportOptions.java @@ -24,6 +24,7 @@ public class TestBulkImportOptions { private final String sampleTimeColumn = "timestamp"; private final String sampleTimeFormat = "timeformat"; private final String sampleTimeValue = "100"; + private final String sampleTimeOffset = "86400"; private final String sampleOutput = "output_dir"; private final String sampleSplitSize = "100"; private final String sampleErrorRecordsHandling = "skip"; @@ -73,6 +74,7 @@ private String[] createPrepareArguments() { "--time-column", sampleTimeColumn, "--time-format", sampleTimeFormat, "--time-value", sampleTimeValue, + "--time-offset", sampleTimeOffset, "--output", sampleOutput, "--split-size", sampleSplitSize, "--error-records-handling", sampleErrorRecordsHandling, @@ -114,6 +116,7 @@ public void assertPrepareOptionEquals(Options actualOpts) throws Exception { assertOptionEquals("T", sampleTimeFormat, actualOpts); assertOptionEquals("time-format", sampleTimeFormat, actualOpts); assertOptionEquals("time-value", sampleTimeValue, actualOpts); + assertOptionEquals("time-offset", sampleTimeOffset, actualOpts); assertOptionEquals("output", sampleOutput, actualOpts); assertOptionEquals("split-size", sampleSplitSize, actualOpts); assertOptionEquals("error-records-handling", sampleErrorRecordsHandling, actualOpts); diff --git a/src/test/java/com/treasure_data/td_import/reader/TestCSVFileReader.java b/src/test/java/com/treasure_data/td_import/reader/TestCSVFileReader.java index 4e454fcd..4f512b2d 100644 --- a/src/test/java/com/treasure_data/td_import/reader/TestCSVFileReader.java +++ b/src/test/java/com/treasure_data/td_import/reader/TestCSVFileReader.java @@ -241,6 +241,14 @@ public void assertContextEquals(TestCSVFileReader test) { } } + public static class Context07 extends Context06 { + + public void assertContextEquals(TestCSVFileReader test) { + super.assertContextEquals(test); + assertEquals(test.reader.getTimeColumnValue().getOffset(), 86400); + } + } + protected String fileName = "./file.csv"; protected int numLine; @@ -398,6 +406,30 @@ public void checkContextWhenReaderConfigurationWithTimeFormat() throws Exception checkContextWhenReaderConfiguration(context); } + @Test + public void checkContextWhenReaderConfigurationWithTimeFormatWithTimeOffset() throws Exception { + Context07 context = new Context07(); + + // override system properties:-( + options = new Options(); + options.initPrepareOptionParser(props); + options.setOptions(new String[] { + "--format", + "csv", + "--column-header", + "--time-format", + context.getSTRFTimeFormat(), + "--time-offset", + "86400" + }); + + createPrepareConfiguration(); + createFileWriter(); + createFileReader(); + + checkContextWhenReaderConfiguration(context); + } + private void checkContextWhenReaderConfiguration(Context context) throws Exception { // create context context.createContext(this);