Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<msgpack.version>0.8.16</msgpack.version>
<super-csv.version>2.1.0</super-csv.version>
<td-client.version>0.9.0</td-client.version>
<aws-java-sdk.version>1.11.578</aws-java-sdk.version>
<aws-java-sdk.version>2.30.13</aws-java-sdk.version>
<junit.version>4.8.2</junit.version>
<mockito.version>1.9.5</mockito.version>

Expand All @@ -75,8 +75,8 @@

<!-- other properties -->
<javac.encoding>UTF-8</javac.encoding>
<javac.source>1.6</javac.source>
<javac.target>1.6</javac.target>
<javac.source>1.8</javac.source>
<javac.target>1.8</javac.target>
<javadoc.encoding>UTF-8</javadoc.encoding>
<javadoc.locale>en_US</javadoc.locale>
<maven-surefire-plugin.threadCount>1</maven-surefire-plugin.threadCount>
Expand Down Expand Up @@ -119,8 +119,13 @@
<version>${mysql-connector-java.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/com/treasure_data/td_import/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
//
package com.treasure_data.td_import;

import com.amazonaws.Protocol;

public interface Constants {
long MAX_LOG_TIME = 253402300799L;

Expand Down Expand Up @@ -439,7 +437,6 @@ public interface Constants {
////////////////////////////////////////

// S3
Protocol BI_PREPARE_S3_PROTOCOL = Protocol.HTTPS;
int BI_PREPARE_S3_MAX_CONNECTIONS = 10; // SDK default: 50
int BI_PREPARE_S3_MAX_ERRORRETRY = 5; // SDK default: 3
int BI_PREPARE_S3_SOCKET_TIMEOUT = 8 * 60 * 1000; // SDK default: 50 * 1000
Expand Down
15 changes: 0 additions & 15 deletions src/main/java/com/treasure_data/td_import/source/MysqlSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,11 @@
//
package com.treasure_data.td_import.source;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.treasure_data.td_import.Configuration;

/**
*
Expand Down
161 changes: 92 additions & 69 deletions src/main/java/com/treasure_data/td_import/source/S3Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,29 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.Region;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
import com.treasure_data.td_import.Configuration;

/**
Expand All @@ -60,28 +63,28 @@ public static List<Source> createSources(SourceDesc desc) {
String region = "";
try {
region = getBucketLocation(desc, bucket);
} catch (AmazonServiceException e) {
} catch (S3Exception e) {
LOG.warning("Failed to get S3 bucket location. Please make sure that your account is given 's3:GetBucketLocation' role.");
throw e;
}
AmazonS3 client = createAmazonS3Client(desc, region);
List<S3ObjectSummary> s3objects = getSources(client, bucket, basePath);
List<Source> srcs = new ArrayList<Source>();
for (S3ObjectSummary s3object : s3objects) {
S3Client client = createS3Client(desc, region);
List<S3Object> s3objects = getSources(client, bucket, basePath);
List<Source> srcs = new ArrayList<>();
for (S3Object s3object : s3objects) {
LOG.info(String.format("create s3-src s3object=%s, rawPath=%s",
s3object.getKey(), rawPath));
srcs.add(new S3Source(createAmazonS3Client(desc, region), rawPath,
s3object.getBucketName(), s3object.getKey(), s3object.getSize()));
s3object.key(), rawPath));
srcs.add(new S3Source(createS3Client(desc, region), rawPath,
bucket, s3object.key(), s3object.size()));
}

return srcs;
}

static AmazonS3 createAmazonS3Client(SourceDesc desc, String region) {
return createClientBuilder(desc).withRegion(region).build();
static S3Client createS3Client(SourceDesc desc, String region) {
return createClientBuilder(desc).region(Region.of(region)).build();
}

static AmazonS3ClientBuilder createClientBuilder(SourceDesc desc) {
static S3ClientBuilder createClientBuilder(SourceDesc desc) {
String accessKey = desc.getUser();
if (accessKey == null || accessKey.isEmpty()) {
throw new IllegalArgumentException("S3 AccessKey is null or empty.");
Expand All @@ -90,86 +93,103 @@ static AmazonS3ClientBuilder createClientBuilder(SourceDesc desc) {
if (secretAccessKey == null || secretAccessKey.isEmpty()) {
throw new IllegalArgumentException("S3 SecretAccessKey is null or empty.");
}
AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretAccessKey);
AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKey, secretAccessKey);

ClientConfiguration clientConfig = new ClientConfiguration()
.withProtocol(Configuration.BI_PREPARE_S3_PROTOCOL)
.withMaxConnections(Configuration.BI_PREPARE_S3_MAX_CONNECTIONS)
.withMaxErrorRetry(Configuration.BI_PREPARE_S3_MAX_ERRORRETRY)
.withSocketTimeout(Configuration.BI_PREPARE_S3_SOCKET_TIMEOUT);
ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder()
.maxConnections(Configuration.BI_PREPARE_S3_MAX_CONNECTIONS)
.socketTimeout(java.time.Duration.ofMillis(Configuration.BI_PREPARE_S3_SOCKET_TIMEOUT));

return AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.withClientConfiguration(clientConfig);
ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder()
.retryStrategy(b -> b.maxAttempts(Configuration.BI_PREPARE_S3_MAX_ERRORRETRY))
.build();

return S3Client.builder()
.credentialsProvider(StaticCredentialsProvider.create(credentials))
.httpClientBuilder(httpClientBuilder)
.overrideConfiguration(overrideConfig);
}

static String getBucketLocation(SourceDesc desc, String bucket) {
AmazonS3 client = createClientBuilder(desc).build();
String bucketRegion = client.getBucketLocation(bucket);
Region region = Region.fromValue(bucketRegion);
if (region.equals(Region.US_Standard)) {
return "us-east-1";
} else {
return region.toString();
try (S3Client client = createClientBuilder(desc).build()) {
GetBucketLocationRequest request = GetBucketLocationRequest
.builder()
.bucket(bucket)
.build();
String bucketRegion = client.getBucketLocation(request).locationConstraintAsString();
if (bucketRegion == null || bucketRegion.isEmpty()) {
return "us-east-1";
} else {
return bucketRegion;
}
}
}

static List<S3ObjectSummary> getSources(AmazonS3 client, String bucket, String basePath) {
static List<S3Object> getSources(S3Client client, String bucket, String basePath) {
String prefix;
int index = basePath.indexOf('*');
if (index >= 0) {
prefix = basePath.substring(0, index);
} else {
ObjectMetadata om = client.getObjectMetadata(bucket, basePath);
S3ObjectSummary s3object = new S3ObjectSummary();
s3object.setBucketName(bucket);
s3object.setKey(basePath);
s3object.setSize(om.getContentLength());

return Arrays.asList(s3object);
HeadObjectRequest headRequest = HeadObjectRequest.builder()
.bucket(bucket)
.key(basePath)
.build();
HeadObjectResponse headResponse = client.headObject(headRequest);
S3Object s3object = S3Object.builder()
.key(basePath)
.size(headResponse.contentLength())
.build();

return Collections.singletonList(s3object);
}

LOG.info(String.format("list s3 files by client %s: bucket=%s, basePath=%s, prefix=%s",
client, bucket, basePath, prefix));

List<S3ObjectSummary> s3objects = new ArrayList<S3ObjectSummary>();
String lastKey = prefix;
List<S3Object> s3objects = new ArrayList<>();
String continuationToken = null;
do {
ObjectListing listing = client.listObjects(new ListObjectsRequest(
bucket, prefix, lastKey, null, 1024));
for(S3ObjectSummary s3object : listing.getObjectSummaries()) {
ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder()
.bucket(bucket)
.prefix(prefix)
.maxKeys(20);
if (continuationToken != null) {
requestBuilder.continuationToken(continuationToken);
}
ListObjectsV2Response response = client.listObjectsV2(requestBuilder.build());
for(S3Object s3object : response.contents()) {
s3objects.add(s3object);
}
lastKey = listing.getNextMarker();
} while (lastKey != null);
continuationToken = response.nextContinuationToken();
} while (continuationToken != null);

return filterSources(s3objects, basePath);
}

static List<S3ObjectSummary> filterSources(List<S3ObjectSummary> s3objects, String basePath) {
static List<S3Object> filterSources(List<S3Object> s3objects, String basePath) {
String regex = basePath.replace("*", "([^\\s]*)");
Pattern pattern = Pattern.compile(regex);

LOG.info(String.format("regex matching: regex=%s", regex));

List<S3ObjectSummary> matched = new ArrayList<S3ObjectSummary>();
for (S3ObjectSummary s3object : s3objects) {
Matcher m = pattern.matcher(s3object.getKey());
List<S3Object> matched = new ArrayList<S3Object>();
for (S3Object s3object : s3objects) {
Matcher m = pattern.matcher(s3object.key());
if (m.matches()) {
matched.add(s3object);
}
}
return matched;
}

protected AmazonS3 client;
protected S3Client client;

protected String bucket;
protected String key;
protected long size;
protected String rawPath;

S3Source(AmazonS3 client, String rawPath, String bucket, String key, long size) {
S3Source(S3Client client, String rawPath, String bucket, String key, long size) {
super("s3://"+bucket+"/"+key);
this.client = client;
this.bucket = bucket;
Expand All @@ -192,12 +212,15 @@ public long getSize() {
public InputStream getInputStream() throws IOException {
LOG.info(String.format("get s3 file by client %s: bucket=%s, key=%s",
client, bucket, key));
GetObjectRequest req = new GetObjectRequest(bucket, key);
req.setRange(0, size);
S3Object object = client.getObject(req);

if (object != null) {
return object.getObjectContent();
GetObjectRequest req = GetObjectRequest.builder()
.bucket(bucket)
.key(key)
.range("bytes=0-" + (size - 1))
.build();
ResponseInputStream<GetObjectResponse> responseStream = client.getObject(req);

if (responseStream != null) {
return responseStream;
} else {
throw new IOException("s3 file is null.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package com.treasure_data.td_import.source;

import static org.junit.Assert.assertEquals;

import com.amazonaws.services.s3.AmazonS3;
import org.junit.Test;
import software.amazon.awssdk.services.s3.S3Client;

import static org.junit.Assert.assertEquals;

public class TestS3Source {
@Test
public void createAmazonS3Client() {
AmazonS3 client;
client = S3Source.createAmazonS3Client(SourceDesc
S3Client client;
client = S3Source.createS3Client(SourceDesc
.create("s3://kkk:sss@s3-ap-northeast-1.amazonaws.com/bucket-name/path/to/file.csv"), "ap-northeast-1");
assertEquals("ap-northeast-1", client.getRegion().getFirstRegionId());
assertEquals("ap-northeast-1", client.serviceClientConfiguration().region().id());
}
}
Loading