diff --git a/pom.xml b/pom.xml
index 439d016e..d908fccf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
0.8.16
2.1.0
0.9.0
- 1.11.578
+ 2.30.13
4.8.2
1.9.5
@@ -75,8 +75,8 @@
UTF-8
- 1.6
- 1.6
+ 1.8
+ 1.8
UTF-8
en_US
1
@@ -119,8 +119,13 @@
${mysql-connector-java.version}
- com.amazonaws
- aws-java-sdk
+ software.amazon.awssdk
+ s3
+ ${aws-java-sdk.version}
+
+
+ software.amazon.awssdk
+ apache-client
${aws-java-sdk.version}
diff --git a/src/main/java/com/treasure_data/td_import/Constants.java b/src/main/java/com/treasure_data/td_import/Constants.java
index 65306917..5828b601 100644
--- a/src/main/java/com/treasure_data/td_import/Constants.java
+++ b/src/main/java/com/treasure_data/td_import/Constants.java
@@ -17,8 +17,6 @@
//
package com.treasure_data.td_import;
-import com.amazonaws.Protocol;
-
public interface Constants {
long MAX_LOG_TIME = 253402300799L;
@@ -439,7 +437,6 @@ public interface Constants {
////////////////////////////////////////
// S3
- Protocol BI_PREPARE_S3_PROTOCOL = Protocol.HTTPS;
int BI_PREPARE_S3_MAX_CONNECTIONS = 10; // SDK default: 50
int BI_PREPARE_S3_MAX_ERRORRETRY = 5; // SDK default: 3
int BI_PREPARE_S3_SOCKET_TIMEOUT = 8 * 60 * 1000; // SDK default: 50 * 1000
diff --git a/src/main/java/com/treasure_data/td_import/source/MysqlSource.java b/src/main/java/com/treasure_data/td_import/source/MysqlSource.java
index e762672a..9e243a56 100644
--- a/src/main/java/com/treasure_data/td_import/source/MysqlSource.java
+++ b/src/main/java/com/treasure_data/td_import/source/MysqlSource.java
@@ -17,26 +17,11 @@
//
package com.treasure_data.td_import.source;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.ObjectListing;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
-import com.treasure_data.td_import.Configuration;
/**
*
diff --git a/src/main/java/com/treasure_data/td_import/source/S3Source.java b/src/main/java/com/treasure_data/td_import/source/S3Source.java
index 21a6b658..e170aaa4 100644
--- a/src/main/java/com/treasure_data/td_import/source/S3Source.java
+++ b/src/main/java/com/treasure_data/td_import/source/S3Source.java
@@ -20,26 +20,29 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.ObjectListing;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.Region;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Object;
import com.treasure_data.td_import.Configuration;
/**
@@ -60,28 +63,28 @@ public static List createSources(SourceDesc desc) {
String region = "";
try {
region = getBucketLocation(desc, bucket);
- } catch (AmazonServiceException e) {
+ } catch (S3Exception e) {
LOG.warning("Failed to get S3 bucket location. Please make sure that your account is given 's3:GetBucketLocation' role.");
throw e;
}
- AmazonS3 client = createAmazonS3Client(desc, region);
- List s3objects = getSources(client, bucket, basePath);
- List srcs = new ArrayList();
- for (S3ObjectSummary s3object : s3objects) {
+ S3Client client = createS3Client(desc, region);
+ List s3objects = getSources(client, bucket, basePath);
+ List srcs = new ArrayList<>();
+ for (S3Object s3object : s3objects) {
LOG.info(String.format("create s3-src s3object=%s, rawPath=%s",
- s3object.getKey(), rawPath));
- srcs.add(new S3Source(createAmazonS3Client(desc, region), rawPath,
- s3object.getBucketName(), s3object.getKey(), s3object.getSize()));
+ s3object.key(), rawPath));
+ srcs.add(new S3Source(createS3Client(desc, region), rawPath,
+ bucket, s3object.key(), s3object.size()));
}
return srcs;
}
- static AmazonS3 createAmazonS3Client(SourceDesc desc, String region) {
- return createClientBuilder(desc).withRegion(region).build();
+ static S3Client createS3Client(SourceDesc desc, String region) {
+ return createClientBuilder(desc).region(Region.of(region)).build();
}
- static AmazonS3ClientBuilder createClientBuilder(SourceDesc desc) {
+ static S3ClientBuilder createClientBuilder(SourceDesc desc) {
String accessKey = desc.getUser();
if (accessKey == null || accessKey.isEmpty()) {
throw new IllegalArgumentException("S3 AccessKey is null or empty.");
@@ -90,71 +93,88 @@ static AmazonS3ClientBuilder createClientBuilder(SourceDesc desc) {
if (secretAccessKey == null || secretAccessKey.isEmpty()) {
throw new IllegalArgumentException("S3 SecretAccessKey is null or empty.");
}
- AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretAccessKey);
+ AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKey, secretAccessKey);
- ClientConfiguration clientConfig = new ClientConfiguration()
- .withProtocol(Configuration.BI_PREPARE_S3_PROTOCOL)
- .withMaxConnections(Configuration.BI_PREPARE_S3_MAX_CONNECTIONS)
- .withMaxErrorRetry(Configuration.BI_PREPARE_S3_MAX_ERRORRETRY)
- .withSocketTimeout(Configuration.BI_PREPARE_S3_SOCKET_TIMEOUT);
+ ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder()
+ .maxConnections(Configuration.BI_PREPARE_S3_MAX_CONNECTIONS)
+ .socketTimeout(java.time.Duration.ofMillis(Configuration.BI_PREPARE_S3_SOCKET_TIMEOUT));
- return AmazonS3ClientBuilder.standard()
- .withCredentials(new AWSStaticCredentialsProvider(credentials))
- .withClientConfiguration(clientConfig);
+ ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder()
+ .retryStrategy(b -> b.maxAttempts(Configuration.BI_PREPARE_S3_MAX_ERRORRETRY))
+ .build();
+
+ return S3Client.builder()
+ .credentialsProvider(StaticCredentialsProvider.create(credentials))
+ .httpClientBuilder(httpClientBuilder)
+ .overrideConfiguration(overrideConfig);
}
static String getBucketLocation(SourceDesc desc, String bucket) {
- AmazonS3 client = createClientBuilder(desc).build();
- String bucketRegion = client.getBucketLocation(bucket);
- Region region = Region.fromValue(bucketRegion);
- if (region.equals(Region.US_Standard)) {
- return "us-east-1";
- } else {
- return region.toString();
+ try (S3Client client = createClientBuilder(desc).build()) {
+ GetBucketLocationRequest request = GetBucketLocationRequest
+ .builder()
+ .bucket(bucket)
+ .build();
+ String bucketRegion = client.getBucketLocation(request).locationConstraintAsString();
+ if (bucketRegion == null || bucketRegion.isEmpty()) {
+ return "us-east-1";
+ } else {
+ return bucketRegion;
+ }
}
}
- static List getSources(AmazonS3 client, String bucket, String basePath) {
+ static List getSources(S3Client client, String bucket, String basePath) {
String prefix;
int index = basePath.indexOf('*');
if (index >= 0) {
prefix = basePath.substring(0, index);
} else {
- ObjectMetadata om = client.getObjectMetadata(bucket, basePath);
- S3ObjectSummary s3object = new S3ObjectSummary();
- s3object.setBucketName(bucket);
- s3object.setKey(basePath);
- s3object.setSize(om.getContentLength());
-
- return Arrays.asList(s3object);
+ HeadObjectRequest headRequest = HeadObjectRequest.builder()
+ .bucket(bucket)
+ .key(basePath)
+ .build();
+ HeadObjectResponse headResponse = client.headObject(headRequest);
+ S3Object s3object = S3Object.builder()
+ .key(basePath)
+ .size(headResponse.contentLength())
+ .build();
+
+ return Collections.singletonList(s3object);
}
LOG.info(String.format("list s3 files by client %s: bucket=%s, basePath=%s, prefix=%s",
client, bucket, basePath, prefix));
- List s3objects = new ArrayList();
- String lastKey = prefix;
+ List s3objects = new ArrayList<>();
+ String continuationToken = null;
do {
- ObjectListing listing = client.listObjects(new ListObjectsRequest(
- bucket, prefix, lastKey, null, 1024));
- for(S3ObjectSummary s3object : listing.getObjectSummaries()) {
+ ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder()
+ .bucket(bucket)
+ .prefix(prefix)
+ .maxKeys(20);
+ if (continuationToken != null) {
+ requestBuilder.continuationToken(continuationToken);
+ }
+ ListObjectsV2Response response = client.listObjectsV2(requestBuilder.build());
+ for(S3Object s3object : response.contents()) {
s3objects.add(s3object);
}
- lastKey = listing.getNextMarker();
- } while (lastKey != null);
+ continuationToken = response.nextContinuationToken();
+ } while (continuationToken != null);
return filterSources(s3objects, basePath);
}
- static List filterSources(List s3objects, String basePath) {
+ static List filterSources(List s3objects, String basePath) {
String regex = basePath.replace("*", "([^\\s]*)");
Pattern pattern = Pattern.compile(regex);
LOG.info(String.format("regex matching: regex=%s", regex));
- List matched = new ArrayList();
- for (S3ObjectSummary s3object : s3objects) {
- Matcher m = pattern.matcher(s3object.getKey());
+ List matched = new ArrayList();
+ for (S3Object s3object : s3objects) {
+ Matcher m = pattern.matcher(s3object.key());
if (m.matches()) {
matched.add(s3object);
}
@@ -162,14 +182,14 @@ static List filterSources(List s3objects, Stri
return matched;
}
- protected AmazonS3 client;
+ protected S3Client client;
protected String bucket;
protected String key;
protected long size;
protected String rawPath;
- S3Source(AmazonS3 client, String rawPath, String bucket, String key, long size) {
+ S3Source(S3Client client, String rawPath, String bucket, String key, long size) {
super("s3://"+bucket+"/"+key);
this.client = client;
this.bucket = bucket;
@@ -192,12 +212,15 @@ public long getSize() {
public InputStream getInputStream() throws IOException {
LOG.info(String.format("get s3 file by client %s: bucket=%s, key=%s",
client, bucket, key));
- GetObjectRequest req = new GetObjectRequest(bucket, key);
- req.setRange(0, size);
- S3Object object = client.getObject(req);
-
- if (object != null) {
- return object.getObjectContent();
+ GetObjectRequest req = GetObjectRequest.builder()
+ .bucket(bucket)
+ .key(key)
+ .range("bytes=0-" + (size - 1))
+ .build();
+ ResponseInputStream responseStream = client.getObject(req);
+
+ if (responseStream != null) {
+ return responseStream;
} else {
throw new IOException("s3 file is null.");
}
diff --git a/src/test/java/com/treasure_data/td_import/source/TestS3Source.java b/src/test/java/com/treasure_data/td_import/source/TestS3Source.java
index eb17ef9e..a0bc2543 100644
--- a/src/test/java/com/treasure_data/td_import/source/TestS3Source.java
+++ b/src/test/java/com/treasure_data/td_import/source/TestS3Source.java
@@ -1,16 +1,16 @@
package com.treasure_data.td_import.source;
-import static org.junit.Assert.assertEquals;
-
-import com.amazonaws.services.s3.AmazonS3;
import org.junit.Test;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import static org.junit.Assert.assertEquals;
public class TestS3Source {
@Test
public void createAmazonS3Client() {
- AmazonS3 client;
- client = S3Source.createAmazonS3Client(SourceDesc
+ S3Client client;
+ client = S3Source.createS3Client(SourceDesc
.create("s3://kkk:sss@s3-ap-northeast-1.amazonaws.com/bucket-name/path/to/file.csv"), "ap-northeast-1");
- assertEquals("ap-northeast-1", client.getRegion().getFirstRegionId());
+ assertEquals("ap-northeast-1", client.serviceClientConfiguration().region().id());
}
}