Switch from goamz to aws-sdk-go for the S3 storage backend

This commit is contained in:
Gilbert Chen
2017-06-13 12:27:01 -04:00
parent 2ace6c74e1
commit 2424a2eeed
2 changed files with 121 additions and 69 deletions

View File

@@ -5,57 +5,67 @@
package duplicacy
import (
"time"
"github.com/gilbertchen/goamz/aws"
"github.com/gilbertchen/goamz/s3"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
type S3Storage struct {
RateLimitedStorage
buckets []*s3.Bucket
client *s3.S3
bucket string
storageDir string
numberOfThreads int
}
// CreateS3Storage creates a amazon s3 storage object.
func CreateS3Storage(regionName string, endpoint string, bucketName string, storageDir string,
accessKey string, secretKey string, threads int) (storage *S3Storage, err error) {
var region aws.Region
token := ""
auth := credentials.NewStaticCredentials(accessKey, secretKey, token)
if endpoint == "" {
if regionName == "" {
regionName = "us-east-1"
if regionName == "" && endpoint == "" {
defaultRegionConfig := &aws.Config {
Region: aws.String("us-east-1"),
Credentials: auth,
}
region = aws.Regions[regionName]
} else {
region = aws.Region { Name: regionName, S3Endpoint:"https://" + endpoint }
}
s3Client := s3.New(session.New(defaultRegionConfig))
auth := aws.Auth{ AccessKey: accessKey, SecretKey: secretKey }
response, err := s3Client.GetBucketLocation(&s3.GetBucketLocationInput{Bucket: aws.String(bucketName)})
var buckets []*s3.Bucket
for i := 0; i < threads; i++ {
s3Client := s3.New(auth, region)
s3Client.AttemptStrategy = aws.AttemptStrategy{
Min: 8,
Total: 300 * time.Second,
Delay: 1000 * time.Millisecond,
if err != nil {
return nil, err
}
regionName = "us-east-1"
if response.LocationConstraint != nil {
regionName = *response.LocationConstraint
}
bucket := s3Client.Bucket(bucketName)
buckets = append(buckets, bucket)
}
config := &aws.Config {
Region: aws.String(regionName),
Credentials: auth,
Endpoint: aws.String(endpoint),
}
if len(storageDir) > 0 && storageDir[len(storageDir) - 1] != '/' {
storageDir += "/"
}
storage = &S3Storage {
buckets: buckets,
client: s3.New(session.New(config)),
bucket: bucketName,
storageDir: storageDir,
numberOfThreads: threads,
}
return storage, nil
}
@@ -65,67 +75,82 @@ func (storage *S3Storage) ListFiles(threadIndex int, dir string) (files []string
dir += "/"
}
dirLength := len(storage.storageDir + dir)
if dir == "snapshots/" {
results, err := storage.buckets[threadIndex].List(storage.storageDir + dir, "/", "", 100)
dir = storage.storageDir + dir
input := s3.ListObjectsInput {
Bucket: aws.String(storage.bucket),
Prefix: aws.String(dir),
Delimiter: aws.String("/"),
MaxKeys: aws.Int64(1000),
}
output, err := storage.client.ListObjects(&input)
if err != nil {
return nil, nil, err
}
for _, subDir := range results.CommonPrefixes {
files = append(files, subDir[dirLength:])
for _, subDir := range output.CommonPrefixes {
files = append(files, (*subDir.Prefix)[len(dir):])
}
return files, nil, nil
} else if dir == "chunks/" {
} else {
dir = storage.storageDir + dir
marker := ""
for {
results, err := storage.buckets[threadIndex].List(storage.storageDir + dir, "", marker, 1000)
input := s3.ListObjectsInput {
Bucket: aws.String(storage.bucket),
Prefix: aws.String(dir),
MaxKeys: aws.Int64(1000),
Marker: aws.String(marker),
}
output, err := storage.client.ListObjects(&input)
if err != nil {
return nil, nil, err
}
for _, object := range results.Contents {
files = append(files, object.Key[dirLength:])
sizes = append(sizes, object.Size)
for _, object := range output.Contents {
files = append(files, (*object.Key)[len(dir):])
sizes = append(sizes, *object.Size)
}
if !results.IsTruncated {
if !*output.IsTruncated {
break
}
marker = results.Contents[len(results.Contents) - 1].Key
marker = *output.Contents[len(output.Contents) - 1].Key
}
return files, sizes, nil
}
} else {
results, err := storage.buckets[threadIndex].List(storage.storageDir + dir, "", "", 1000)
if err != nil {
return nil, nil, err
}
for _, object := range results.Contents {
files = append(files, object.Key[dirLength:])
}
return files, nil, nil
}
}
// DeleteFile deletes the file or directory at 'filePath'.
func (storage *S3Storage) DeleteFile(threadIndex int, filePath string) (err error) {
return storage.buckets[threadIndex].Del(storage.storageDir + filePath)
input := &s3.DeleteObjectInput {
Bucket: aws.String(storage.bucket),
Key: aws.String(storage.storageDir + filePath),
}
_, err = storage.client.DeleteObject(input)
return err
}
// MoveFile renames the file.
func (storage *S3Storage) MoveFile(threadIndex int, from string, to string) (err error) {
options := s3.CopyOptions { ContentType: "application/duplicacy" }
_, err = storage.buckets[threadIndex].PutCopy(storage.storageDir + to, s3.Private, options, storage.buckets[threadIndex].Name + "/" + storage.storageDir + from)
if err != nil {
return nil
input := &s3.CopyObjectInput {
Bucket: aws.String(storage.bucket),
CopySource: aws.String(storage.bucket + "/" + storage.storageDir + from),
Key: aws.String(storage.storageDir + to),
}
_, err = storage.client.CopyObject(input)
if err != nil {
return err
}
return storage.DeleteFile(threadIndex, from)
}
// CreateDirectory creates a new directory.
@@ -136,19 +161,24 @@ func (storage *S3Storage) CreateDirectory(threadIndex int, dir string) (err erro
// GetFileInfo returns the information about the file or directory at 'filePath'.
func (storage *S3Storage) GetFileInfo(threadIndex int, filePath string) (exist bool, isDir bool, size int64, err error) {
response, err := storage.buckets[threadIndex].Head(storage.storageDir + filePath, nil)
input := &s3.HeadObjectInput {
Bucket: aws.String(storage.bucket),
Key: aws.String(storage.storageDir + filePath),
}
output, err := storage.client.HeadObject(input)
if err != nil {
if e, ok := err.(*s3.Error); ok && (e.StatusCode == 403 || e.StatusCode == 404) {
if e, ok := err.(awserr.RequestFailure); ok && (e.StatusCode() == 403 || e.StatusCode() == 404) {
return false, false, 0, nil
} else {
return false, false, 0, err
}
}
}
if response.StatusCode == 403 || response.StatusCode == 404 {
if output == nil || output.ContentLength == nil {
return false, false, 0, nil
} else {
return true, false, response.ContentLength, nil
return true, false, *output.ContentLength, nil
}
}
@@ -174,14 +204,19 @@ func (storage *S3Storage) FindChunk(threadIndex int, chunkID string, isFossil bo
// DownloadFile reads the file at 'filePath' into the chunk.
func (storage *S3Storage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) {
readCloser, err := storage.buckets[threadIndex].GetReader(storage.storageDir + filePath)
input := &s3.GetObjectInput {
Bucket: aws.String(storage.bucket),
Key: aws.String(storage.storageDir + filePath),
}
output, err := storage.client.GetObject(input)
if err != nil {
return err
}
defer readCloser.Close()
_, err = RateLimitedCopy(chunk, readCloser, storage.DownloadRateLimit / len(storage.buckets))
defer output.Body.Close()
_, err = RateLimitedCopy(chunk, output.Body, storage.DownloadRateLimit / len(storage.bucket))
return err
}
@@ -189,9 +224,16 @@ func (storage *S3Storage) DownloadFile(threadIndex int, filePath string, chunk *
// UploadFile writes 'content' to the file at 'filePath'.
func (storage *S3Storage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
options := s3.Options { }
reader := CreateRateLimitedReader(content, storage.UploadRateLimit / len(storage.buckets))
return storage.buckets[threadIndex].PutReader(storage.storageDir + filePath, reader, int64(len(content)), "application/duplicacy", s3.Private, options)
input := &s3.PutObjectInput {
Bucket: aws.String(storage.bucket),
Key: aws.String(storage.storageDir + filePath),
ACL: aws.String(s3.ObjectCannedACLPrivate),
Body: CreateRateLimitedReader(content, storage.UploadRateLimit / len(storage.bucket)),
ContentType: aws.String("application/duplicacy"),
}
_, err = storage.client.PutObject(input)
return err
}
// If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when