From 2424a2eeedb08b956bfa64356dbed0a8c6fbacc5 Mon Sep 17 00:00:00 2001
From: Gilbert Chen
Date: Tue, 13 Jun 2017 12:27:01 -0400
Subject: [PATCH] Switch from goamz to aws-sdk-go for the S3 storage backend

---
 src/duplicacy_s3storage.go | 180 +++++++++++++++++++++++--------------
 src/duplicacy_utils.go     |  10 +++
 2 files changed, 121 insertions(+), 69 deletions(-)

diff --git a/src/duplicacy_s3storage.go b/src/duplicacy_s3storage.go
index 501a80e..b4bb76e 100644
--- a/src/duplicacy_s3storage.go
+++ b/src/duplicacy_s3storage.go
@@ -5,57 +5,67 @@ package duplicacy
 
 import (
-	"time"
-	"github.com/gilbertchen/goamz/aws"
-	"github.com/gilbertchen/goamz/s3"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
 )
 
 type S3Storage struct {
 	RateLimitedStorage
 
-	buckets []*s3.Bucket
+	client *s3.S3
+	bucket string
 	storageDir string
+	numberOfThreads int
 }
 
 // CreateS3Storage creates an Amazon S3 storage object.
 func CreateS3Storage(regionName string, endpoint string, bucketName string, storageDir string,
 	accessKey string, secretKey string, threads int) (storage *S3Storage, err error) {
 
-	var region aws.Region
+	token := ""
+
+	auth := credentials.NewStaticCredentials(accessKey, secretKey, token)
 
-	if endpoint == "" {
-		if regionName == "" {
-			regionName = "us-east-1"
+	if regionName == "" && endpoint == "" {
+		defaultRegionConfig := &aws.Config {
+			Region: aws.String("us-east-1"),
+			Credentials: auth,
 		}
-		region = aws.Regions[regionName]
-	} else {
-		region = aws.Region { Name: regionName, S3Endpoint: "https://" + endpoint }
-	}
+
+		s3Client := s3.New(session.New(defaultRegionConfig))
 
-	auth := aws.Auth{ AccessKey: accessKey, SecretKey: secretKey }
+		response, err := s3Client.GetBucketLocation(&s3.GetBucketLocationInput{Bucket: aws.String(bucketName)})
 
-	var buckets []*s3.Bucket
-	for i := 0; i < threads; i++ {
-		s3Client := s3.New(auth, region)
-		s3Client.AttemptStrategy = aws.AttemptStrategy{
-			Min: 8,
-			Total: 300 * time.Second,
-			Delay: 1000 * time.Millisecond,
+		if err != nil {
+			return nil, err
+		}
+
+		regionName = "us-east-1"
+		if response.LocationConstraint != nil {
+			regionName = *response.LocationConstraint
 		}
-
-		bucket := s3Client.Bucket(bucketName)
-		buckets = append(buckets, bucket)
 	}
-
+
+	config := &aws.Config {
+		Region: aws.String(regionName),
+		Credentials: auth,
+		Endpoint: aws.String(endpoint),
+	}
+
 	if len(storageDir) > 0 && storageDir[len(storageDir) - 1] != '/' {
 		storageDir += "/"
 	}
 
 	storage = &S3Storage {
-		buckets: buckets,
+		client: s3.New(session.New(config)),
+		bucket: bucketName,
 		storageDir: storageDir,
+		numberOfThreads: threads,
 	}
-
+
 	return storage, nil
 }
 
@@ -65,67 +75,82 @@ func (storage *S3Storage) ListFiles(threadIndex int, dir string) (files []string
 		dir += "/"
 	}
 
-	dirLength := len(storage.storageDir + dir)
 	if dir == "snapshots/" {
-		results, err := storage.buckets[threadIndex].List(storage.storageDir + dir, "/", "", 100)
+		dir = storage.storageDir + dir
+		input := s3.ListObjectsInput {
+			Bucket: aws.String(storage.bucket),
+			Prefix: aws.String(dir),
+			Delimiter: aws.String("/"),
+			MaxKeys: aws.Int64(1000),
+		}
+
+		output, err := storage.client.ListObjects(&input)
 		if err != nil {
 			return nil, nil, err
 		}
-
-		for _, subDir := range results.CommonPrefixes {
-			files = append(files, subDir[dirLength:])
+
+		for _, subDir := range output.CommonPrefixes {
+			files = append(files, (*subDir.Prefix)[len(dir):])
 		}
 		return files, nil, nil
-	} else if dir == "chunks/" {
+	} else {
+		dir = storage.storageDir + dir
 		marker := ""
 		for {
-			results, err := storage.buckets[threadIndex].List(storage.storageDir + dir, "", marker, 1000)
+			input := s3.ListObjectsInput {
+				Bucket: aws.String(storage.bucket),
+				Prefix: aws.String(dir),
+				MaxKeys: aws.Int64(1000),
+				Marker: aws.String(marker),
+			}
+
+			output, err := storage.client.ListObjects(&input)
 			if err != nil {
 				return nil, nil, err
 			}
 
-			for _, object := range results.Contents {
-				files = append(files, object.Key[dirLength:])
-				sizes = append(sizes, object.Size)
+			for _, object := range output.Contents {
+				files = append(files, (*object.Key)[len(dir):])
+				sizes = append(sizes, *object.Size)
 			}
 
-			if !results.IsTruncated {
+			if !*output.IsTruncated {
 				break
 			}
 
-			marker = results.Contents[len(results.Contents) - 1].Key
+			marker = *output.Contents[len(output.Contents) - 1].Key
 		}
 
 		return files, sizes, nil
+	}
 
-	} else {
-
-		results, err := storage.buckets[threadIndex].List(storage.storageDir + dir, "", "", 1000)
-		if err != nil {
-			return nil, nil, err
-		}
-
-		for _, object := range results.Contents {
-			files = append(files, object.Key[dirLength:])
-		}
-		return files, nil, nil
-	}
 }
 
 // DeleteFile deletes the file or directory at 'filePath'.
 func (storage *S3Storage) DeleteFile(threadIndex int, filePath string) (err error) {
 
-	return storage.buckets[threadIndex].Del(storage.storageDir + filePath)
+	input := &s3.DeleteObjectInput {
+		Bucket: aws.String(storage.bucket),
+		Key: aws.String(storage.storageDir + filePath),
+	}
+	_, err = storage.client.DeleteObject(input)
+	return err
 }
 
 // MoveFile renames the file.
 func (storage *S3Storage) MoveFile(threadIndex int, from string, to string) (err error) {
 
-	options := s3.CopyOptions { ContentType: "application/duplicacy" }
-	_, err = storage.buckets[threadIndex].PutCopy(storage.storageDir + to, s3.Private, options, storage.buckets[threadIndex].Name + "/" + storage.storageDir + from)
-	if err != nil {
-		return nil
+	input := &s3.CopyObjectInput {
+		Bucket: aws.String(storage.bucket),
+		CopySource: aws.String(storage.bucket + "/" + storage.storageDir + from),
+		Key: aws.String(storage.storageDir + to),
 	}
+	_, err = storage.client.CopyObject(input)
+	if err != nil {
+		return err
+	}
+
 	return storage.DeleteFile(threadIndex, from)
+
 }
 
 // CreateDirectory creates a new directory.
@@ -136,19 +161,24 @@ func (storage *S3Storage) CreateDirectory(threadIndex int, dir string) (err erro
 // GetFileInfo returns the information about the file or directory at 'filePath'.
 func (storage *S3Storage) GetFileInfo(threadIndex int, filePath string) (exist bool, isDir bool, size int64, err error) {
 
-	response, err := storage.buckets[threadIndex].Head(storage.storageDir + filePath, nil)
+	input := &s3.HeadObjectInput {
+		Bucket: aws.String(storage.bucket),
+		Key: aws.String(storage.storageDir + filePath),
+	}
+
+	output, err := storage.client.HeadObject(input)
 	if err != nil {
-		if e, ok := err.(*s3.Error); ok && (e.StatusCode == 403 || e.StatusCode == 404) {
+		if e, ok := err.(awserr.RequestFailure); ok && (e.StatusCode() == 403 || e.StatusCode() == 404) {
 			return false, false, 0, nil
 		} else {
 			return false, false, 0, err
-		}
+		}
 	}
-
-	if response.StatusCode == 403 || response.StatusCode == 404 {
+
+	if output == nil || output.ContentLength == nil {
 		return false, false, 0, nil
 	} else {
-		return true, false, response.ContentLength, nil
+		return true, false, *output.ContentLength, nil
 	}
 }
 
@@ -174,14 +204,19 @@ func (storage *S3Storage) FindChunk(threadIndex int, chunkID string, isFossil bo
 
 // DownloadFile reads the file at 'filePath' into the chunk.
 func (storage *S3Storage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) {
 
-	readCloser, err := storage.buckets[threadIndex].GetReader(storage.storageDir + filePath)
+	input := &s3.GetObjectInput {
+		Bucket: aws.String(storage.bucket),
+		Key: aws.String(storage.storageDir + filePath),
+	}
+
+	output, err := storage.client.GetObject(input)
 	if err != nil {
 		return err
 	}
-
-	defer readCloser.Close()
-
-	_, err = RateLimitedCopy(chunk, readCloser, storage.DownloadRateLimit / len(storage.buckets))
+
+	defer output.Body.Close()
+
+	_, err = RateLimitedCopy(chunk, output.Body, storage.DownloadRateLimit / storage.numberOfThreads)
 	return err
 }
 
@@ -189,9 +224,16 @@ func (storage *S3Storage) DownloadFile(threadIndex int, filePath string, chunk *
 
 // UploadFile writes 'content' to the file at 'filePath'.
 func (storage *S3Storage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
 
-	options := s3.Options { }
-	reader := CreateRateLimitedReader(content, storage.UploadRateLimit / len(storage.buckets))
-	return storage.buckets[threadIndex].PutReader(storage.storageDir + filePath, reader, int64(len(content)), "application/duplicacy", s3.Private, options)
+	input := &s3.PutObjectInput {
+		Bucket: aws.String(storage.bucket),
+		Key: aws.String(storage.storageDir + filePath),
+		ACL: aws.String(s3.ObjectCannedACLPrivate),
+		Body: CreateRateLimitedReader(content, storage.UploadRateLimit / storage.numberOfThreads),
+		ContentType: aws.String("application/duplicacy"),
+	}
+
+	_, err = storage.client.PutObject(input)
+	return err
 }
 
 // If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when
diff --git a/src/duplicacy_utils.go b/src/duplicacy_utils.go
index 899adbd..90d0ee5 100644
--- a/src/duplicacy_utils.go
+++ b/src/duplicacy_utils.go
@@ -48,6 +48,16 @@ func (reader *RateLimitedReader) Reset() {
 	reader.Next = 0
 }
 
+func (reader *RateLimitedReader) Seek(offset int64, whence int) (int64, error) {
+	if whence == io.SeekStart {
+		reader.Next = int(offset)
+	} else if whence == io.SeekCurrent {
+		reader.Next += int(offset)
+	} else {
+		reader.Next = len(reader.Content) + int(offset)
+	}
+	return int64(reader.Next), nil
+}
 func (reader *RateLimitedReader) Read(p []byte) (n int, err error) {
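
A note on the new region handling: when both regionName and endpoint are empty, the constructor now issues a GetBucketLocation request and adopts the returned region, falling back to us-east-1 when LocationConstraint is nil (which is how S3 reports the classic region). A minimal caller sketch follows; the import path, alias, and all argument values are assumptions for illustration, not part of this patch:

package main

import (
	"fmt"

	// Hypothetical import path for the duplicacy package; adjust to the real layout.
	duplicacy "github.com/gilbertchen/duplicacy/src"
)

func main() {
	// Empty regionName and endpoint trigger the GetBucketLocation probe above.
	storage, err := duplicacy.CreateS3Storage("", "", "example-bucket", "backups",
		"EXAMPLE_ACCESS_KEY", "EXAMPLE_SECRET_KEY", 4)
	if err != nil {
		fmt.Println("create storage:", err)
		return
	}
	fmt.Println("storage ready:", storage != nil)
}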
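The goamz AttemptStrategy (minimum 8 attempts over 300 seconds) configured by the old constructor has no direct counterpart above: aws-sdk-go retries throttled and 5xx responses on its own with a service-chosen default count. If pinning the retry budget matters, it can be set on the same aws.Config. A sketch under that assumption; newS3Config is a hypothetical helper, not part of this patch:

package duplicacy

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
)

// newS3Config mirrors the config built in CreateS3Storage but pins the SDK's
// retry count, roughly matching goamz's Min: 8 attempts. Hypothetical helper.
func newS3Config(regionName string, endpoint string, auth *credentials.Credentials) *aws.Config {
	return &aws.Config{
		Region:      aws.String(regionName),
		Credentials: auth,
		Endpoint:    aws.String(endpoint),
		MaxRetries:  aws.Int(8), // retries are handled inside the SDK; no hand-rolled strategy needed
	}
}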
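The reason RateLimitedReader gains a Seek method is that PutObjectInput.Body is an io.ReadSeeker: the SDK seeks to the end and back to determine the content length for request signing, and rewinds the body before retrying a failed upload. A compile-time assertion makes that requirement explicit; it is an illustrative addition to duplicacy_utils.go, not part of the patch:

package duplicacy

import "io"

// This assertion fails to compile if *RateLimitedReader ever stops
// implementing io.ReadSeeker (Read plus Seek). Illustrative only.
var _ io.ReadSeeker = (*RateLimitedReader)(nil)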