diff --git a/pom.xml b/pom.xml index 439d016e..d908fccf 100644 --- a/pom.xml +++ b/pom.xml @@ -54,7 +54,7 @@ 0.8.16 2.1.0 0.9.0 - 1.11.578 + 2.30.13 4.8.2 1.9.5 @@ -75,8 +75,8 @@ UTF-8 - 1.6 - 1.6 + 1.8 + 1.8 UTF-8 en_US 1 @@ -119,8 +119,13 @@ ${mysql-connector-java.version} - com.amazonaws - aws-java-sdk + software.amazon.awssdk + s3 + ${aws-java-sdk.version} + + + software.amazon.awssdk + apache-client ${aws-java-sdk.version} diff --git a/src/main/java/com/treasure_data/td_import/Constants.java b/src/main/java/com/treasure_data/td_import/Constants.java index 65306917..5828b601 100644 --- a/src/main/java/com/treasure_data/td_import/Constants.java +++ b/src/main/java/com/treasure_data/td_import/Constants.java @@ -17,8 +17,6 @@ // package com.treasure_data.td_import; -import com.amazonaws.Protocol; - public interface Constants { long MAX_LOG_TIME = 253402300799L; @@ -439,7 +437,6 @@ public interface Constants { //////////////////////////////////////// // S3 - Protocol BI_PREPARE_S3_PROTOCOL = Protocol.HTTPS; int BI_PREPARE_S3_MAX_CONNECTIONS = 10; // SDK default: 50 int BI_PREPARE_S3_MAX_ERRORRETRY = 5; // SDK default: 3 int BI_PREPARE_S3_SOCKET_TIMEOUT = 8 * 60 * 1000; // SDK default: 50 * 1000 diff --git a/src/main/java/com/treasure_data/td_import/source/MysqlSource.java b/src/main/java/com/treasure_data/td_import/source/MysqlSource.java index e762672a..9e243a56 100644 --- a/src/main/java/com/treasure_data/td_import/source/MysqlSource.java +++ b/src/main/java/com/treasure_data/td_import/source/MysqlSource.java @@ -17,26 +17,11 @@ // package com.treasure_data.td_import.source; -import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.logging.Logger; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.BasicAWSCredentials; 
-import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.treasure_data.td_import.Configuration; /** * diff --git a/src/main/java/com/treasure_data/td_import/source/S3Source.java b/src/main/java/com/treasure_data/td_import/source/S3Source.java index 21a6b658..e170aaa4 100644 --- a/src/main/java/com/treasure_data/td_import/source/S3Source.java +++ b/src/main/java/com/treasure_data/td_import/source/S3Source.java @@ -20,26 +20,29 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.Region; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectSummary; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import 
software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.S3Object; import com.treasure_data.td_import.Configuration; /** @@ -60,28 +63,28 @@ public static List createSources(SourceDesc desc) { String region = ""; try { region = getBucketLocation(desc, bucket); - } catch (AmazonServiceException e) { + } catch (S3Exception e) { LOG.warning("Failed to get S3 bucket location. 
Please make sure that your account is given 's3:GetBucketLocation' role."); throw e; } - AmazonS3 client = createAmazonS3Client(desc, region); - List s3objects = getSources(client, bucket, basePath); - List srcs = new ArrayList(); - for (S3ObjectSummary s3object : s3objects) { + S3Client client = createS3Client(desc, region); + List s3objects = getSources(client, bucket, basePath); + List srcs = new ArrayList<>(); + for (S3Object s3object : s3objects) { LOG.info(String.format("create s3-src s3object=%s, rawPath=%s", - s3object.getKey(), rawPath)); - srcs.add(new S3Source(createAmazonS3Client(desc, region), rawPath, - s3object.getBucketName(), s3object.getKey(), s3object.getSize())); + s3object.key(), rawPath)); + srcs.add(new S3Source(createS3Client(desc, region), rawPath, + bucket, s3object.key(), s3object.size())); } return srcs; } - static AmazonS3 createAmazonS3Client(SourceDesc desc, String region) { - return createClientBuilder(desc).withRegion(region).build(); + static S3Client createS3Client(SourceDesc desc, String region) { + return createClientBuilder(desc).region(Region.of(region)).build(); } - static AmazonS3ClientBuilder createClientBuilder(SourceDesc desc) { + static S3ClientBuilder createClientBuilder(SourceDesc desc) { String accessKey = desc.getUser(); if (accessKey == null || accessKey.isEmpty()) { throw new IllegalArgumentException("S3 AccessKey is null or empty."); @@ -90,71 +93,88 @@ static AmazonS3ClientBuilder createClientBuilder(SourceDesc desc) { if (secretAccessKey == null || secretAccessKey.isEmpty()) { throw new IllegalArgumentException("S3 SecretAccessKey is null or empty."); } - AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretAccessKey); + AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKey, secretAccessKey); - ClientConfiguration clientConfig = new ClientConfiguration() - .withProtocol(Configuration.BI_PREPARE_S3_PROTOCOL) - .withMaxConnections(Configuration.BI_PREPARE_S3_MAX_CONNECTIONS) - 
.withMaxErrorRetry(Configuration.BI_PREPARE_S3_MAX_ERRORRETRY) - .withSocketTimeout(Configuration.BI_PREPARE_S3_SOCKET_TIMEOUT); + ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder() + .maxConnections(Configuration.BI_PREPARE_S3_MAX_CONNECTIONS) + .socketTimeout(java.time.Duration.ofMillis(Configuration.BI_PREPARE_S3_SOCKET_TIMEOUT)); - return AmazonS3ClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(credentials)) - .withClientConfiguration(clientConfig); + ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() + .retryStrategy(b -> b.maxAttempts(Configuration.BI_PREPARE_S3_MAX_ERRORRETRY + 1)) /* v2 maxAttempts includes the initial call; v1 maxErrorRetry counted retries only */ + .build(); + + return S3Client.builder() + .credentialsProvider(StaticCredentialsProvider.create(credentials)) + .httpClientBuilder(httpClientBuilder) + .overrideConfiguration(overrideConfig); } static String getBucketLocation(SourceDesc desc, String bucket) { - AmazonS3 client = createClientBuilder(desc).build(); - String bucketRegion = client.getBucketLocation(bucket); - Region region = Region.fromValue(bucketRegion); - if (region.equals(Region.US_Standard)) { - return "us-east-1"; - } else { - return region.toString(); + try (S3Client client = createClientBuilder(desc).build()) { + GetBucketLocationRequest request = GetBucketLocationRequest + .builder() + .bucket(bucket) + .build(); + String bucketRegion = client.getBucketLocation(request).locationConstraintAsString(); + if (bucketRegion == null || bucketRegion.isEmpty()) { + return "us-east-1"; + } else { + return "EU".equals(bucketRegion) ? "eu-west-1" : bucketRegion; /* legacy "EU" location constraint denotes eu-west-1 and is not a usable region id */ + } } } - static List getSources(AmazonS3 client, String bucket, String basePath) { + static List getSources(S3Client client, String bucket, String basePath) { String prefix; int index = basePath.indexOf('*'); if (index >= 0) { prefix = basePath.substring(0, index); } else { - ObjectMetadata om = client.getObjectMetadata(bucket, basePath); - S3ObjectSummary s3object = new S3ObjectSummary(); - s3object.setBucketName(bucket); - 
s3object.setKey(basePath); - s3object.setSize(om.getContentLength()); - - return Arrays.asList(s3object); + HeadObjectRequest headRequest = HeadObjectRequest.builder() + .bucket(bucket) + .key(basePath) + .build(); + HeadObjectResponse headResponse = client.headObject(headRequest); + S3Object s3object = S3Object.builder() + .key(basePath) + .size(headResponse.contentLength()) + .build(); + + return Collections.singletonList(s3object); } LOG.info(String.format("list s3 files by client %s: bucket=%s, basePath=%s, prefix=%s", client, bucket, basePath, prefix)); - List s3objects = new ArrayList(); - String lastKey = prefix; + List s3objects = new ArrayList<>(); + String continuationToken = null; do { - ObjectListing listing = client.listObjects(new ListObjectsRequest( - bucket, prefix, lastKey, null, 1024)); - for(S3ObjectSummary s3object : listing.getObjectSummaries()) { + ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder() + .bucket(bucket) + .prefix(prefix) + .maxKeys(1000); /* largest page S3 returns; a page size of 20 multiplies list round-trips */ + if (continuationToken != null) { + requestBuilder.continuationToken(continuationToken); + } + ListObjectsV2Response response = client.listObjectsV2(requestBuilder.build()); + for(S3Object s3object : response.contents()) { s3objects.add(s3object); } - lastKey = listing.getNextMarker(); - } while (lastKey != null); + continuationToken = response.nextContinuationToken(); + } while (continuationToken != null); return filterSources(s3objects, basePath); } - static List filterSources(List s3objects, String basePath) { + static List filterSources(List s3objects, String basePath) { String regex = basePath.replace("*", "([^\\s]*)"); Pattern pattern = Pattern.compile(regex); LOG.info(String.format("regex matching: regex=%s", regex)); - List matched = new ArrayList(); - for (S3ObjectSummary s3object : s3objects) { - Matcher m = pattern.matcher(s3object.getKey()); + List matched = new ArrayList(); + for (S3Object s3object : s3objects) { + Matcher m = pattern.matcher(s3object.key()); 
if (m.matches()) { matched.add(s3object); } @@ -162,14 +182,14 @@ static List filterSources(List s3objects, Stri return matched; } - protected AmazonS3 client; + protected S3Client client; protected String bucket; protected String key; protected long size; protected String rawPath; - S3Source(AmazonS3 client, String rawPath, String bucket, String key, long size) { + S3Source(S3Client client, String rawPath, String bucket, String key, long size) { super("s3://"+bucket+"/"+key); this.client = client; this.bucket = bucket; @@ -192,12 +212,15 @@ public long getSize() { public InputStream getInputStream() throws IOException { LOG.info(String.format("get s3 file by client %s: bucket=%s, key=%s", client, bucket, key)); - GetObjectRequest req = new GetObjectRequest(bucket, key); - req.setRange(0, size); - S3Object object = client.getObject(req); - - if (object != null) { - return object.getObjectContent(); + GetObjectRequest req = GetObjectRequest.builder() + .bucket(bucket) + .key(key) + .range(size > 0 ? "bytes=0-" + (size - 1) : null) /* send no Range for an empty object: "bytes=0--1" is malformed */ + .build(); + ResponseInputStream responseStream = client.getObject(req); + + if (responseStream != null) { + return responseStream; } else { throw new IOException("s3 file is null."); } diff --git a/src/test/java/com/treasure_data/td_import/source/TestS3Source.java b/src/test/java/com/treasure_data/td_import/source/TestS3Source.java index eb17ef9e..a0bc2543 100644 --- a/src/test/java/com/treasure_data/td_import/source/TestS3Source.java +++ b/src/test/java/com/treasure_data/td_import/source/TestS3Source.java @@ -1,16 +1,16 @@ package com.treasure_data.td_import.source; -import static org.junit.Assert.assertEquals; - -import com.amazonaws.services.s3.AmazonS3; import org.junit.Test; +import software.amazon.awssdk.services.s3.S3Client; + +import static org.junit.Assert.assertEquals; public class TestS3Source { @Test public void createAmazonS3Client() { - AmazonS3 client; - client = S3Source.createAmazonS3Client(SourceDesc + S3Client client; + client = 
S3Source.createS3Client(SourceDesc .create("s3://kkk:sss@s3-ap-northeast-1.amazonaws.com/bucket-name/path/to/file.csv"), "ap-northeast-1"); - assertEquals("ap-northeast-1", client.serviceClientConfiguration().region().id()); + assertEquals("ap-northeast-1", client.serviceClientConfiguration().region().id()); } }