From 6a03a98f55307699cbbe75f714ac71ace848a763 Mon Sep 17 00:00:00 2001 From: TheBestPessimist Date: Wed, 20 Sep 2017 10:43:47 +0300 Subject: [PATCH 1/7] Exponential Backoff should work now. Maximum sleep is 32*2. --- src/duplicacy_gcdstorage.go | 923 ++++++++++++++++++------------------ 1 file changed, 474 insertions(+), 449 deletions(-) diff --git a/src/duplicacy_gcdstorage.go b/src/duplicacy_gcdstorage.go index fb5b4d7..fcd784e 100644 --- a/src/duplicacy_gcdstorage.go +++ b/src/duplicacy_gcdstorage.go @@ -6,610 +6,635 @@ package duplicacy import ( "io" - "fmt" - "net" + "fmt" + "net" "path" "time" "sync" "strings" - "net/http" - "net/url" + "net/http" + "net/url" "io/ioutil" "math/rand" "encoding/json" - "golang.org/x/net/context" - "golang.org/x/oauth2" - "google.golang.org/api/drive/v3" - "google.golang.org/api/googleapi" + "golang.org/x/net/context" + "golang.org/x/oauth2" + "google.golang.org/api/drive/v3" + "google.golang.org/api/googleapi" ) type GCDStorage struct { - RateLimitedStorage + RateLimitedStorage - service *drive.Service - idCache map[string]string - idCacheLock *sync.Mutex - backoffs []int - - isConnected bool - numberOfThreads int - TestMode bool + service *drive.Service + idCache map[string]string + idCacheLock *sync.Mutex + backoffs []float64 + backoffsRetries []int + isConnected bool + numberOfThreads int + TestMode bool } type GCDConfig struct { - ClientID string `json:"client_id"` - ClientSecret string `json:"client_secret"` - Endpoint oauth2.Endpoint `json:"end_point"` - Token oauth2.Token `json:"token"` + ClientID string `json:"client_id"` + ClientSecret string `json:"client_secret"` + Endpoint oauth2.Endpoint `json:"end_point"` + Token oauth2.Token `json:"token"` } func (storage *GCDStorage) shouldRetry(threadIndex int, err error) (bool, error) { + const LIMIT_BACKOFF_TIME = 32 + const MAX_NUMBER_OF_RETRIES = 15 + minimumSleepRatio := 0.1 + maximumSleepRatio := 0.2 + minimumSleep := float64(storage.numberOfThreads) * minimumSleepRatio + maximumSleep := float64(storage.numberOfThreads) * maximumSleepRatio + rand.Seed(time.Now().UnixNano()) // unsure if this is needed - retry := false - message := "" - if err == nil { - storage.backoffs[threadIndex] = 1 - return false, nil - } else if e, ok := err.(*googleapi.Error); ok { - if 500 <= e.Code && e.Code < 600 { - // Retry for 5xx response codes. - message = fmt.Sprintf("HTTP status code %d", e.Code) - retry = true - } else if e.Code == 429 { - // Too many requests{ - message = "HTTP status code 429" - retry = true - } else if e.Code == 403 { - // User Rate Limit Exceeded - message = "User Rate Limit Exceeded" - retry = true - } else if e.Code == 401 { - // Only retry on authorization error when storage has been connected before - if storage.isConnected { - message = "Authorization Error" - retry = true - } - } - } else if e, ok := err.(*url.Error); ok { - message = e.Error() - retry = true - } else if err == io.ErrUnexpectedEOF { - // Retry on unexpected EOFs and temporary network errors. - message = "Unexpected EOF" - retry = true - } else if err, ok := err.(net.Error); ok { - message = "Temporary network error" - retry = err.Temporary() - } + retry := false + message := "" + if err == nil { + /** + logic for said calculus is here: https://stackoverflow.com/questions/1527803/generating-random-whole-numbers-in-javascript-in-a-specific-range + i chose 0.1*thread number as a minimum sleep time + and 0.2*thread number as a maximum sleep time + for the first sleep of the first backoff of the threads. + This would mean that both when the program is started, and when multiple threads retry, google won't be ddosed :^) + */ - if !retry || storage.backoffs[threadIndex] >= 256 { - storage.backoffs[threadIndex] = 1 - return false, err - } + storage.backoffs[threadIndex] = rand.Float64()*(maximumSleep-minimumSleep+1) + minimumSleep + storage.backoffsRetries[threadIndex] = 0 + return false, nil + } else if e, ok := err.(*googleapi.Error); ok { + if 500 <= e.Code && e.Code < 600 { + // Retry for 5xx response codes. + message = fmt.Sprintf("HTTP status code %d", e.Code) + retry = true + } else if e.Code == 429 { + // Too many requests{ + message = "HTTP status code 429" + retry = true + } else if e.Code == 403 { + // User Rate Limit Exceeded + message = e.Message // "User Rate Limit Exceeded" + retry = true - delay := float32(storage.backoffs[threadIndex]) * rand.Float32() - LOG_DEBUG("GCD_RETRY", "%s; retrying after %.2f seconds", message, delay) - time.Sleep(time.Duration(float32(storage.backoffs[threadIndex]) * float32(time.Second))) - storage.backoffs[threadIndex] *= 2 - return true, nil + } else if e.Code == 401 { + // Only retry on authorization error when storage has been connected before + if storage.isConnected { + message = "Authorization Error" + retry = true + } + } + } else if e, ok := err.(*url.Error); ok { + message = e.Error() + retry = true + } else if err == io.ErrUnexpectedEOF { + // Retry on unexpected EOFs and temporary network errors. + message = "Unexpected EOF" + retry = true + } else if err, ok := err.(net.Error); ok { + message = "Temporary network error" + retry = err.Temporary() + } + + if !retry || storage.backoffsRetries[threadIndex] >= MAX_NUMBER_OF_RETRIES { + LOG_INFO("GCD_RETRY", "Thread: %03d. Maximum number of retries reached. Backoff time: %.2f. Number of retries: %d", threadIndex, storage.backoffs[threadIndex], storage.backoffsRetries[threadIndex]) + storage.backoffs[threadIndex] = rand.Float64()*(maximumSleep-minimumSleep+1) + minimumSleep + storage.backoffsRetries[threadIndex] = 0 + return false, err + } + + delay := storage.backoffs[threadIndex]*rand.Float64() + storage.backoffs[threadIndex]*rand.Float64() + LOG_INFO("GCD_RETRY", "Thread: %03d. Message: %s. Retrying after %.2f seconds. current backoff: %.2f. Number of retries: %d", threadIndex, message, delay, storage.backoffs[threadIndex], storage.backoffsRetries[threadIndex]) + time.Sleep(time.Duration(delay * float64(time.Second))) + + if storage.backoffs[threadIndex] < LIMIT_BACKOFF_TIME { + storage.backoffs[threadIndex] *= 2.0 + } else { + storage.backoffs[threadIndex] = LIMIT_BACKOFF_TIME + storage.backoffsRetries[threadIndex] += 1 + } + return true, nil } -func (storage *GCDStorage) convertFilePath(filePath string) (string) { - if strings.HasPrefix(filePath, "chunks/") && strings.HasSuffix(filePath, ".fsl") { - return "fossils/" + filePath[len("chunks/"):len(filePath) - len(".fsl")] - } - return filePath +func (storage *GCDStorage) convertFilePath(filePath string) string { + if strings.HasPrefix(filePath, "chunks/") && strings.HasSuffix(filePath, ".fsl") { + return "fossils/" + filePath[len("chunks/"):len(filePath)-len(".fsl")] + } + return filePath } func (storage *GCDStorage) getPathID(path string) string { - storage.idCacheLock.Lock() - pathID := storage.idCache[path] - storage.idCacheLock.Unlock() - return pathID + storage.idCacheLock.Lock() + pathID := storage.idCache[path] + storage.idCacheLock.Unlock() + return pathID } func (storage *GCDStorage) findPathID(path string) (string, bool) { - storage.idCacheLock.Lock() - pathID, ok := storage.idCache[path] - storage.idCacheLock.Unlock() - return pathID, ok + storage.idCacheLock.Lock() + pathID, ok := storage.idCache[path] + storage.idCacheLock.Unlock() + return pathID, ok } func (storage *GCDStorage) savePathID(path string, pathID string) { - storage.idCacheLock.Lock() - storage.idCache[path] = pathID - storage.idCacheLock.Unlock() + storage.idCacheLock.Lock() + storage.idCache[path] = pathID + storage.idCacheLock.Unlock() } func (storage *GCDStorage) deletePathID(path string) { - storage.idCacheLock.Lock() - delete(storage.idCache, path) - storage.idCacheLock.Unlock() + storage.idCacheLock.Lock() + delete(storage.idCache, path) + storage.idCacheLock.Unlock() } func (storage *GCDStorage) listFiles(threadIndex int, parentID string, listFiles bool) ([]*drive.File, error) { - if parentID == "" { - return nil, fmt.Errorf("No parent ID provided") - } + if parentID == "" { + return nil, fmt.Errorf("No parent ID provided") + } - files := []*drive.File {} + files := []*drive.File{} - startToken := "" + startToken := "" - query := "'" + parentID + "' in parents and " - if listFiles { - query += "mimeType != 'application/vnd.google-apps.folder'" - } else { - query += "mimeType = 'application/vnd.google-apps.folder'" - } + query := "'" + parentID + "' in parents and " + if listFiles { + query += "mimeType != 'application/vnd.google-apps.folder'" + } else { + query += "mimeType = 'application/vnd.google-apps.folder'" + } - maxCount := int64(1000) - if storage.TestMode { - maxCount = 8 - } + maxCount := int64(1000) + if storage.TestMode { + maxCount = 8 + } - for { - var fileList *drive.FileList - var err error + for { + var fileList *drive.FileList + var err error - for { - fileList, err = storage.service.Files.List().Q(query).Fields("nextPageToken", "files(name, mimeType, id, size)").PageToken(startToken).PageSize(maxCount).Do() - if retry, e := storage.shouldRetry(threadIndex, err); e == nil && !retry { - break - } else if retry { - continue - } else { - return nil, err - } - } + for { + fileList, err = storage.service.Files.List().Q(query).Fields("nextPageToken", "files(name, mimeType, id, size)").PageToken(startToken).PageSize(maxCount).Do() + if retry, e := storage.shouldRetry(threadIndex, err); e == nil && !retry { + break + } else if retry { + continue + } else { + return nil, err + } + } - files = append(files, fileList.Files...) + files = append(files, fileList.Files...) - startToken = fileList.NextPageToken - if startToken == "" { - break - } - } + startToken = fileList.NextPageToken + if startToken == "" { + break + } + } - - return files, nil + return files, nil } func (storage *GCDStorage) listByName(threadIndex int, parentID string, name string) (string, bool, int64, error) { - var fileList *drive.FileList - var err error + var fileList *drive.FileList + var err error - for { - query := "name = '" + name + "' and '" + parentID + "' in parents" - fileList, err = storage.service.Files.List().Q(query).Fields("files(name, mimeType, id, size)").Do() + for { + query := "name = '" + name + "' and '" + parentID + "' in parents" + fileList, err = storage.service.Files.List().Q(query).Fields("files(name, mimeType, id, size)").Do() - if retry, e := storage.shouldRetry(threadIndex, err); e == nil && !retry { - break - } else if retry { - continue - } else { - return "", false, 0, err - } - } + if retry, e := storage.shouldRetry(threadIndex, err); e == nil && !retry { + break + } else if retry { + continue + } else { + return "", false, 0, err + } + } - if len(fileList.Files) == 0 { - return "", false, 0, nil - } + if len(fileList.Files) == 0 { + return "", false, 0, nil + } - file := fileList.Files[0] + file := fileList.Files[0] - return file.Id, file.MimeType == "application/vnd.google-apps.folder", file.Size, nil + return file.Id, file.MimeType == "application/vnd.google-apps.folder", file.Size, nil } func (storage *GCDStorage) getIDFromPath(threadIndex int, path string) (string, error) { - fileID := "root" + fileID := "root" - if rootID, ok := storage.findPathID(""); ok { - fileID = rootID - } + if rootID, ok := storage.findPathID(""); ok { + fileID = rootID + } - names := strings.Split(path, "/") - current := "" - for i, name := range names { + names := strings.Split(path, "/") + current := "" + for i, name := range names { - if len(current) == 0 { - current = name - } else { - current = current + "/" + name - } + if len(current) == 0 { + current = name + } else { + current = current + "/" + name + } - currentID, ok := storage.findPathID(current) - if ok { - fileID = currentID - continue - } + currentID, ok := storage.findPathID(current) + if ok { + fileID = currentID + continue + } - var err error - var isDir bool - fileID, isDir, _, err = storage.listByName(threadIndex, fileID, name) - if err != nil { - return "", err - } - if fileID == "" { - return "", fmt.Errorf("Path %s doesn't exist", path) - } - if i != len(names) - 1 && !isDir { - return "", fmt.Errorf("Invalid path %s", path) - } - } - return fileID, nil + var err error + var isDir bool + fileID, isDir, _, err = storage.listByName(threadIndex, fileID, name) + if err != nil { + return "", err + } + if fileID == "" { + return "", fmt.Errorf("Path %s doesn't exist", path) + } + if i != len(names)-1 && !isDir { + return "", fmt.Errorf("Invalid path %s", path) + } + } + return fileID, nil } // CreateGCDStorage creates a GCD storage object. func CreateGCDStorage(tokenFile string, storagePath string, threads int) (storage *GCDStorage, err error) { - description, err := ioutil.ReadFile(tokenFile) - if err != nil { - return nil, err - } + description, err := ioutil.ReadFile(tokenFile) + if err != nil { + return nil, err + } - gcdConfig := &GCDConfig {} - if err := json.Unmarshal(description, gcdConfig); err != nil { - return nil, err - } + gcdConfig := &GCDConfig{} + if err := json.Unmarshal(description, gcdConfig); err != nil { + return nil, err + } - config := oauth2.Config{ - ClientID: gcdConfig.ClientID, - ClientSecret: gcdConfig.ClientSecret, - Endpoint: gcdConfig.Endpoint, - } + config := oauth2.Config{ + ClientID: gcdConfig.ClientID, + ClientSecret: gcdConfig.ClientSecret, + Endpoint: gcdConfig.Endpoint, + } - authClient := config.Client(context.Background(), &gcdConfig.Token) + authClient := config.Client(context.Background(), &gcdConfig.Token) - service, err := drive.New(authClient) - if err != nil { - return nil, err - } + service, err := drive.New(authClient) + if err != nil { + return nil, err + } - storage = &GCDStorage { - service: service, - numberOfThreads: threads, - idCache: make(map[string]string), - idCacheLock: &sync.Mutex{}, - backoffs: make([]int, threads), - } + storage = &GCDStorage{ + service: service, + numberOfThreads: threads, + idCache: make(map[string]string), + idCacheLock: &sync.Mutex{}, + backoffs: make([]float64, threads), + backoffsRetries: make([]int, threads), + } - storagePathID, err := storage.getIDFromPath(0, storagePath) - if err != nil { - return nil, err - } + storagePathID, err := storage.getIDFromPath(0, storagePath) + if err != nil { + return nil, err + } - storage.idCache[""] = storagePathID + storage.idCache[""] = storagePathID - for _, dir := range []string { "chunks", "snapshots", "fossils" } { - dirID, isDir, _, err := storage.listByName(0, storagePathID, dir) - if err != nil { - return nil, err - } - if dirID == "" { - err = storage.CreateDirectory(0, dir) - if err != nil { - return nil, err - } - } else if !isDir { - return nil, fmt.Errorf("%s/%s is not a directory", storagePath + "/" + dir) - } else { - storage.idCache[dir] = dirID - } - } + for _, dir := range []string{"chunks", "snapshots", "fossils"} { + dirID, isDir, _, err := storage.listByName(0, storagePathID, dir) + if err != nil { + return nil, err + } + if dirID == "" { + err = storage.CreateDirectory(0, dir) + if err != nil { + return nil, err + } + } else if !isDir { + return nil, fmt.Errorf("%s/%s is not a directory", storagePath+"/"+dir) + } else { + storage.idCache[dir] = dirID + } + } - storage.isConnected = true + storage.isConnected = true - return storage, nil + return storage, nil } // ListFiles return the list of files and subdirectories under 'dir' (non-recursively) func (storage *GCDStorage) ListFiles(threadIndex int, dir string) ([]string, []int64, error) { - for len(dir) > 0 && dir[len(dir) - 1] == '/' { - dir = dir[:len(dir) - 1] - } + for len(dir) > 0 && dir[len(dir)-1] == '/' { + dir = dir[:len(dir)-1] + } - if dir == "snapshots" { + if dir == "snapshots" { - files, err := storage.listFiles(threadIndex, storage.getPathID(dir), false) - if err != nil { - return nil, nil, err - } + files, err := storage.listFiles(threadIndex, storage.getPathID(dir), false) + if err != nil { + return nil, nil, err + } - subDirs := []string{} + subDirs := []string{} - for _, file := range files { - storage.savePathID("snapshots/" + file.Name, file.Id) - subDirs = append(subDirs, file.Name + "/") - } - return subDirs, nil, nil - } else if strings.HasPrefix(dir, "snapshots/") { - pathID, err := storage.getIDFromPath(threadIndex, dir) - if err != nil { - return nil, nil, err - } + for _, file := range files { + storage.savePathID("snapshots/"+file.Name, file.Id) + subDirs = append(subDirs, file.Name+"/") + } + return subDirs, nil, nil + } else if strings.HasPrefix(dir, "snapshots/") { + pathID, err := storage.getIDFromPath(threadIndex, dir) + if err != nil { + return nil, nil, err + } - entries, err := storage.listFiles(threadIndex, pathID, true) - if err != nil { - return nil, nil, err - } + entries, err := storage.listFiles(threadIndex, pathID, true) + if err != nil { + return nil, nil, err + } - files := []string{} + files := []string{} - for _, entry := range entries { - storage.savePathID(dir + "/" + entry.Name, entry.Id) - files = append(files, entry.Name) - } - return files, nil, nil - } else { - files := []string{} - sizes := []int64{} + for _, entry := range entries { + storage.savePathID(dir+"/"+entry.Name, entry.Id) + files = append(files, entry.Name) + } + return files, nil, nil + } else { + files := []string{} + sizes := []int64{} - for _, parent := range []string { "chunks", "fossils" } { - entries, err := storage.listFiles(threadIndex, storage.getPathID(parent), true) - if err != nil { - return nil, nil, err - } + for _, parent := range []string{"chunks", "fossils"} { + entries, err := storage.listFiles(threadIndex, storage.getPathID(parent), true) + if err != nil { + return nil, nil, err + } - for _, entry := range entries { - name := entry.Name - if parent == "fossils" { - name += ".fsl" - } - storage.savePathID(parent + "/" + entry.Name, entry.Id) - files = append(files, name) - sizes = append(sizes, entry.Size) - } - } - return files, sizes, nil - } + for _, entry := range entries { + name := entry.Name + if parent == "fossils" { + name += ".fsl" + } + storage.savePathID(parent+"/"+entry.Name, entry.Id) + files = append(files, name) + sizes = append(sizes, entry.Size) + } + } + return files, sizes, nil + } } // DeleteFile deletes the file or directory at 'filePath'. func (storage *GCDStorage) DeleteFile(threadIndex int, filePath string) (err error) { - filePath = storage.convertFilePath(filePath) - fileID, ok := storage.findPathID(filePath) - if !ok { - fileID, err = storage.getIDFromPath(threadIndex, filePath) - if err != nil { - LOG_TRACE("GCD_STORAGE", "Ignored file deletion error: %v", err) - return nil - } - } + filePath = storage.convertFilePath(filePath) + fileID, ok := storage.findPathID(filePath) + if !ok { + fileID, err = storage.getIDFromPath(threadIndex, filePath) + if err != nil { + LOG_TRACE("GCD_STORAGE", "Ignored file deletion error: %v", err) + return nil + } + } - for { - err = storage.service.Files.Delete(fileID).Fields("id").Do() - if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry { - storage.deletePathID(filePath) - return nil - } else if retry { - continue - } else { - if e, ok := err.(*googleapi.Error); ok && e.Code == 404 { - LOG_TRACE("GCD_STORAGE", "File %s has disappeared before deletion", filePath) - return nil - } - return err - } - } + for { + err = storage.service.Files.Delete(fileID).Fields("id").Do() + if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry { + storage.deletePathID(filePath) + return nil + } else if retry { + continue + } else { + if e, ok := err.(*googleapi.Error); ok && e.Code == 404 { + LOG_TRACE("GCD_STORAGE", "File %s has disappeared before deletion", filePath) + return nil + } + return err + } + } } // MoveFile renames the file. func (storage *GCDStorage) MoveFile(threadIndex int, from string, to string) (err error) { - from = storage.convertFilePath(from) - to = storage.convertFilePath(to) + from = storage.convertFilePath(from) + to = storage.convertFilePath(to) - fileID, ok := storage.findPathID(from) - if !ok { - return fmt.Errorf("Attempting to rename file %s with unknown id", to) - } + fileID, ok := storage.findPathID(from) + if !ok { + return fmt.Errorf("Attempting to rename file %s with unknown id", to) + } - fromParentID := storage.getPathID("chunks") - toParentID := storage.getPathID("fossils") + fromParentID := storage.getPathID("chunks") + toParentID := storage.getPathID("fossils") - if strings.HasPrefix(from, "fossils") { - fromParentID, toParentID = toParentID, fromParentID - } + if strings.HasPrefix(from, "fossils") { + fromParentID, toParentID = toParentID, fromParentID + } - for { - _, err = storage.service.Files.Update(fileID, nil).AddParents(toParentID).RemoveParents(fromParentID).Do() - if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry { - break - } else if retry { - continue - } else { - return err - } - } + for { + _, err = storage.service.Files.Update(fileID, nil).AddParents(toParentID).RemoveParents(fromParentID).Do() + if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry { + break + } else if retry { + continue + } else { + return err + } + } - storage.savePathID(to, storage.getPathID(from)) - storage.deletePathID(from) - return nil + storage.savePathID(to, storage.getPathID(from)) + storage.deletePathID(from) + return nil } // CreateDirectory creates a new directory. func (storage *GCDStorage) CreateDirectory(threadIndex int, dir string) (err error) { - for len(dir) > 0 && dir[len(dir) - 1] == '/' { - dir = dir[:len(dir) - 1] - } + for len(dir) > 0 && dir[len(dir)-1] == '/' { + dir = dir[:len(dir)-1] + } - exist, isDir, _, err := storage.GetFileInfo(threadIndex, dir) - if err != nil { - return err - } + exist, isDir, _, err := storage.GetFileInfo(threadIndex, dir) + if err != nil { + return err + } - if exist { - if !isDir { - return fmt.Errorf("%s is a file", dir) - } - return nil - } + if exist { + if !isDir { + return fmt.Errorf("%s is a file", dir) + } + return nil + } - parentID := storage.getPathID("") - name := dir + parentID := storage.getPathID("") + name := dir - if strings.HasPrefix(dir, "snapshots/") { - parentID = storage.getPathID("snapshots") - name = dir[len("snapshots/"):] - } + if strings.HasPrefix(dir, "snapshots/") { + parentID = storage.getPathID("snapshots") + name = dir[len("snapshots/"):] + } - file := &drive.File { - Name: name, - MimeType: "application/vnd.google-apps.folder", - Parents: []string { parentID }, - } + file := &drive.File{ + Name: name, + MimeType: "application/vnd.google-apps.folder", + Parents: []string{parentID}, + } - for { - file, err = storage.service.Files.Create(file).Fields("id").Do() - if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry { - break - } else if retry { - continue - } else { - return err - } - } + for { + file, err = storage.service.Files.Create(file).Fields("id").Do() + if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry { + break + } else if retry { + continue + } else { + return err + } + } - storage.savePathID(dir, file.Id) - return nil + storage.savePathID(dir, file.Id) + return nil } // GetFileInfo returns the information about the file or directory at 'filePath'. func (storage *GCDStorage) GetFileInfo(threadIndex int, filePath string) (exist bool, isDir bool, size int64, err error) { - for len(filePath) > 0 && filePath[len(filePath) - 1] == '/' { - filePath = filePath[:len(filePath) - 1] - } + for len(filePath) > 0 && filePath[len(filePath)-1] == '/' { + filePath = filePath[:len(filePath)-1] + } - // GetFileInfo is never called on a fossil - fileID, ok := storage.findPathID(filePath) - if !ok { - dir := path.Dir(filePath) - if dir == "." { - dir = "" - } - dirID, err := storage.getIDFromPath(threadIndex, dir) - if err != nil { - return false, false, 0, err - } + // GetFileInfo is never called on a fossil + fileID, ok := storage.findPathID(filePath) + if !ok { + dir := path.Dir(filePath) + if dir == "." { + dir = "" + } + dirID, err := storage.getIDFromPath(threadIndex, dir) + if err != nil { + return false, false, 0, err + } - fileID, isDir, size, err = storage.listByName(threadIndex, dirID, path.Base(filePath)) - if fileID != "" { - storage.savePathID(filePath, fileID) - } - return fileID != "", isDir, size, err - } + fileID, isDir, size, err = storage.listByName(threadIndex, dirID, path.Base(filePath)) + if fileID != "" { + storage.savePathID(filePath, fileID) + } + return fileID != "", isDir, size, err + } - for { - file, err := storage.service.Files.Get(fileID).Fields("id, mimeType").Do() - if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry { - return true, file.MimeType == "application/vnd.google-apps.folder", file.Size, nil - } else if retry { - continue - } else { - return false, false, 0, err - } - } + for { + file, err := storage.service.Files.Get(fileID).Fields("id, mimeType").Do() + if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry { + return true, file.MimeType == "application/vnd.google-apps.folder", file.Size, nil + } else if retry { + continue + } else { + return false, false, 0, err + } + } } // FindChunk finds the chunk with the specified id. If 'isFossil' is true, it will search for chunk files with // the suffix '.fsl'. func (storage *GCDStorage) FindChunk(threadIndex int, chunkID string, isFossil bool) (filePath string, exist bool, size int64, err error) { - parentID := "" - filePath = "chunks/" + chunkID - realPath := storage.convertFilePath(filePath) - if isFossil { - parentID = storage.getPathID("fossils") - filePath += ".fsl" - } else { - parentID = storage.getPathID("chunks") - } + parentID := "" + filePath = "chunks/" + chunkID + realPath := storage.convertFilePath(filePath) + if isFossil { + parentID = storage.getPathID("fossils") + filePath += ".fsl" + } else { + parentID = storage.getPathID("chunks") + } - fileID := "" - fileID, _, size, err = storage.listByName(threadIndex, parentID, chunkID) - if fileID != "" { - storage.savePathID(realPath, fileID) - } - return filePath, fileID != "", size, err + fileID := "" + fileID, _, size, err = storage.listByName(threadIndex, parentID, chunkID) + if fileID != "" { + storage.savePathID(realPath, fileID) + } + return filePath, fileID != "", size, err } // DownloadFile reads the file at 'filePath' into the chunk. func (storage *GCDStorage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) { - // We never download the fossil so there is no need to convert the path - fileID, ok := storage.findPathID(filePath) - if !ok { - fileID, err = storage.getIDFromPath(threadIndex, filePath) - if err != nil { - return err - } - storage.savePathID(filePath, fileID) - } + // We never download the fossil so there is no need to convert the path + fileID, ok := storage.findPathID(filePath) + if !ok { + fileID, err = storage.getIDFromPath(threadIndex, filePath) + if err != nil { + return err + } + storage.savePathID(filePath, fileID) + } - var response *http.Response + var response *http.Response - for { - response, err = storage.service.Files.Get(fileID).Download() - if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry { - break - } else if retry { - continue - } else { - return err - } - } + for { + response, err = storage.service.Files.Get(fileID).Download() + if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry { + break + } else if retry { + continue + } else { + return err + } + } - defer response.Body.Close() + defer response.Body.Close() - _, err = RateLimitedCopy(chunk, response.Body, storage.DownloadRateLimit / storage.numberOfThreads) - return err + _, err = RateLimitedCopy(chunk, response.Body, storage.DownloadRateLimit/storage.numberOfThreads) + return err } // UploadFile writes 'content' to the file at 'filePath'. func (storage *GCDStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) { - // We never upload a fossil so there is no need to convert the path - parent := path.Dir(filePath) + // We never upload a fossil so there is no need to convert the path + parent := path.Dir(filePath) - if parent == "." { - parent = "" - } + if parent == "." { + parent = "" + } - parentID, ok := storage.findPathID(parent) - if !ok { - parentID, err = storage.getIDFromPath(threadIndex, parent) - if err != nil { - return err - } - storage.savePathID(parent, parentID) - } + parentID, ok := storage.findPathID(parent) + if !ok { + parentID, err = storage.getIDFromPath(threadIndex, parent) + if err != nil { + return err + } + storage.savePathID(parent, parentID) + } - file := &drive.File { - Name: path.Base(filePath), - MimeType: "application/octet-stream", - Parents: []string { parentID }, - } + file := &drive.File{ + Name: path.Base(filePath), + MimeType: "application/octet-stream", + Parents: []string{parentID}, + } - for { - reader := CreateRateLimitedReader(content, storage.UploadRateLimit / storage.numberOfThreads) - _, err = storage.service.Files.Create(file).Media(reader).Fields("id").Do() - if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry { - break - } else if retry { - continue - } else { - return err - } - } + for { + reader := CreateRateLimitedReader(content, storage.UploadRateLimit/storage.numberOfThreads) + _, err = storage.service.Files.Create(file).Media(reader).Fields("id").Do() + if retry, err := storage.shouldRetry(threadIndex, err); err == nil && !retry { + break + } else if retry { + continue + } else { + return err + } + } - return err + return err } // If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when From fc71cb1b493a5181d0b574b6c086a4fc16453e88 Mon Sep 17 00:00:00 2001 From: TheBestPessimist Date: Wed, 20 Sep 2017 11:00:02 +0300 Subject: [PATCH 2/7] Compute the next backoff value before using it --- src/duplicacy_gcdstorage.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/duplicacy_gcdstorage.go b/src/duplicacy_gcdstorage.go index fcd784e..b04ac23 100644 --- a/src/duplicacy_gcdstorage.go +++ b/src/duplicacy_gcdstorage.go @@ -108,16 +108,17 @@ func (storage *GCDStorage) shouldRetry(threadIndex int, err error) (bool, error) return false, err } - delay := storage.backoffs[threadIndex]*rand.Float64() + storage.backoffs[threadIndex]*rand.Float64() - LOG_INFO("GCD_RETRY", "Thread: %03d. Message: %s. Retrying after %.2f seconds. current backoff: %.2f. Number of retries: %d", threadIndex, message, delay, storage.backoffs[threadIndex], storage.backoffsRetries[threadIndex]) - time.Sleep(time.Duration(delay * float64(time.Second))) - if storage.backoffs[threadIndex] < LIMIT_BACKOFF_TIME { storage.backoffs[threadIndex] *= 2.0 } else { storage.backoffs[threadIndex] = LIMIT_BACKOFF_TIME storage.backoffsRetries[threadIndex] += 1 } + + delay := storage.backoffs[threadIndex]*rand.Float64() + storage.backoffs[threadIndex]*rand.Float64() + LOG_INFO("GCD_RETRY", "Thread: %03d. Message: %s. Retrying after %.2f seconds. Current backoff: %.2f. Number of retries: %d", threadIndex, message, delay, storage.backoffs[threadIndex], storage.backoffsRetries[threadIndex]) + time.Sleep(time.Duration(delay * float64(time.Second))) + return true, nil } From ef19a3705f3f779036b5fc7d2a35a679c799cea9 Mon Sep 17 00:00:00 2001 From: TheBestPessimist Date: Wed, 20 Sep 2017 11:06:56 +0300 Subject: [PATCH 3/7] The initial thread backoff value should not be empty --- src/duplicacy_gcdstorage.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/duplicacy_gcdstorage.go b/src/duplicacy_gcdstorage.go index b04ac23..adbbc3c 100644 --- a/src/duplicacy_gcdstorage.go +++ b/src/duplicacy_gcdstorage.go @@ -305,6 +305,10 @@ func CreateGCDStorage(tokenFile string, storagePath string, threads int) (storag backoffsRetries: make([]int, threads), } + for b := range storage.backoffs { + storage.backoffs[b] = 0.1 * float64(storage.numberOfThreads) // at the first error, we should still sleep some amount + } + storagePathID, err := storage.getIDFromPath(0, storagePath) if err != nil { return nil, err From d20ea41cd007b5eb9c980a4272f3b598268f9e87 Mon Sep 17 00:00:00 2001 From: TheBestPessimist Date: Wed, 20 Sep 2017 18:52:46 +0300 Subject: [PATCH 4/7] Add a method for debugging which shows the method call chains, to find out where most of the retries come from --- src/duplicacy_gcdstorage.go | 99 +++++++++++++++++++++++-------------- 1 file changed, 63 insertions(+), 36 deletions(-) diff --git a/src/duplicacy_gcdstorage.go b/src/duplicacy_gcdstorage.go index adbbc3c..b821fb0 100644 --- a/src/duplicacy_gcdstorage.go +++ b/src/duplicacy_gcdstorage.go @@ -5,23 +5,24 @@ package duplicacy import ( - "io" + "io" "fmt" "net" - "path" - "time" - "sync" - "strings" + "path" + "time" + "sync" + "strings" "net/http" "net/url" - "io/ioutil" - "math/rand" - "encoding/json" + "io/ioutil" + "math/rand" + "encoding/json" "golang.org/x/net/context" "golang.org/x/oauth2" "google.golang.org/api/drive/v3" "google.golang.org/api/googleapi" + "runtime" ) type GCDStorage struct { @@ -46,26 +47,18 @@ type GCDConfig struct { } func (storage *GCDStorage) shouldRetry(threadIndex int, err error) (bool, error) { - const LIMIT_BACKOFF_TIME = 32 + const LIMIT_BACKOFF_TIME = 64 const MAX_NUMBER_OF_RETRIES = 15 minimumSleepRatio := 0.1 maximumSleepRatio := 0.2 minimumSleep := float64(storage.numberOfThreads) * minimumSleepRatio maximumSleep := float64(storage.numberOfThreads) * maximumSleepRatio - rand.Seed(time.Now().UnixNano()) // unsure if this is needed + rand.Seed(time.Now().UnixNano()) // unsure if this is needed retry := false message := "" if err == nil { - /** - logic for said calculus is here: https://stackoverflow.com/questions/1527803/generating-random-whole-numbers-in-javascript-in-a-specific-range - i chose 0.1*thread number as a minimum sleep time - and 0.2*thread number as a maximum sleep time - for the first sleep of the first backoff of the threads. - This would mean that both when the program is started, and when multiple threads retry, google won't be ddosed :^) - */ - - storage.backoffs[threadIndex] = rand.Float64()*(maximumSleep-minimumSleep+1) + minimumSleep + storage.backoffs[threadIndex] = computeInitialBackoff(minimumSleep, maximumSleep) storage.backoffsRetries[threadIndex] = 0 return false, nil } else if e, ok := err.(*googleapi.Error); ok { @@ -79,7 +72,7 @@ func (storage *GCDStorage) shouldRetry(threadIndex int, err error) (bool, error) retry = true } else if e.Code == 403 { // User Rate Limit Exceeded - message = e.Message // "User Rate Limit Exceeded" + message = e.Message // "User Rate Limit Exceeded" retry = true } else if e.Code == 401 { @@ -103,25 +96,59 @@ func (storage *GCDStorage) shouldRetry(threadIndex int, err error) (bool, error) if !retry || storage.backoffsRetries[threadIndex] >= MAX_NUMBER_OF_RETRIES { LOG_INFO("GCD_RETRY", "Thread: %03d. Maximum number of retries reached. Backoff time: %.2f. Number of retries: %d", threadIndex, storage.backoffs[threadIndex], storage.backoffsRetries[threadIndex]) - storage.backoffs[threadIndex] = rand.Float64()*(maximumSleep-minimumSleep+1) + minimumSleep + storage.backoffs[threadIndex] = computeInitialBackoff(minimumSleep, maximumSleep) storage.backoffsRetries[threadIndex] = 0 return false, err } if storage.backoffs[threadIndex] < LIMIT_BACKOFF_TIME { - storage.backoffs[threadIndex] *= 2.0 + storage.backoffs[threadIndex] *= 2.0 } else { storage.backoffs[threadIndex] = LIMIT_BACKOFF_TIME storage.backoffsRetries[threadIndex] += 1 } delay := storage.backoffs[threadIndex]*rand.Float64() + storage.backoffs[threadIndex]*rand.Float64() - LOG_INFO("GCD_RETRY", "Thread: %03d. Message: %s. Retrying after %.2f seconds. Current backoff: %.2f. Number of retries: %d", threadIndex, message, delay, storage.backoffs[threadIndex], storage.backoffsRetries[threadIndex]) + if storage.backoffs[threadIndex] >= LIMIT_BACKOFF_TIME { + callerChain:=findCallerChain() + LOG_INFO("GCD_RETRY", "Thread: %3d. Message: %s. Retrying after %6.2f seconds. Current backoff: %6.2f. Number of retries: %2d. caller chain: %s", threadIndex, message, delay, storage.backoffs[threadIndex], storage.backoffsRetries[threadIndex], callerChain) + } + time.Sleep(time.Duration(delay * float64(time.Second))) return true, nil } +func findCallerChain() string { + callerStack := "" + pc := make([]uintptr, 15) + n := runtime.Callers(1, pc) + frames := runtime.CallersFrames(pc[:n]) + + for { + frame, more := frames.Next() + if strings.Contains(frame.File, "runtime/") { + break + } + callerStack += "->" + frame.Function + if !more { + break + } + } + return callerStack +} + +/* + logic for said calculus is here: https://stackoverflow.com/questions/1527803/generating-random-whole-numbers-in-javascript-in-a-specific-range + chose 0.1*thread number as a minimum sleep time + and 0.2*thread number as a maximum sleep time + for the first sleep of the first backoff of the threads. + This would mean that both when the program is started, and when multiple threads retry, google won't be ddosed :^) +*/ +func computeInitialBackoff(minimumSleep float64, maximumSleep float64) float64 { + return rand.Float64()*(maximumSleep-minimumSleep+1) + minimumSleep +} + func (storage *GCDStorage) convertFilePath(filePath string) string { if strings.HasPrefix(filePath, "chunks/") && strings.HasSuffix(filePath, ".fsl") { return "fossils/" + filePath[len("chunks/"):len(filePath)-len(".fsl")] @@ -296,23 +323,23 @@ func CreateGCDStorage(tokenFile string, storagePath string, threads int) (storag return nil, err } - storage = &GCDStorage{ - service: service, - numberOfThreads: threads, - idCache: make(map[string]string), - idCacheLock: &sync.Mutex{}, - backoffs: make([]float64, threads), - backoffsRetries: make([]int, threads), - } + storage = &GCDStorage { + service: service, + numberOfThreads: threads, + idCache: make(map[string]string), + idCacheLock: &sync.Mutex{}, + backoffs: make([]float64, threads), + backoffsRetries:make([]int, threads), + } - for b := range storage.backoffs { + for b := range storage.backoffs { storage.backoffs[b] = 0.1 * float64(storage.numberOfThreads) // at the first error, we should still sleep some amount } - storagePathID, err := storage.getIDFromPath(0, storagePath) - if err != nil { - return nil, err - } + storagePathID, err := storage.getIDFromPath(0, storagePath) + if err != nil { + return nil, err + } storage.idCache[""] = storagePathID From ed52850c98ecf568979f637e541e744e2b9a58f1 Mon Sep 17 00:00:00 2001 From: TheBestPessimist Date: Thu, 21 Sep 2017 08:16:54 +0300 Subject: [PATCH 5/7] Run goimports for the good looks --- src/duplicacy_gcdstorage.go | 65 +++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/src/duplicacy_gcdstorage.go b/src/duplicacy_gcdstorage.go index b4e43a8..b4205cc 100644 --- a/src/duplicacy_gcdstorage.go +++ b/src/duplicacy_gcdstorage.go @@ -5,33 +5,34 @@ package duplicacy import ( - "encoding/json" - "fmt" - "io" - "io/ioutil" - "math/rand" - "net" - "net/http" - "net/url" - "path" - "strings" - "sync" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net" + "net/http" + "net/url" + "path" + "strings" + "sync" "time" + "runtime" + "golang.org/x/net/context" "golang.org/x/oauth2" "google.golang.org/api/drive/v3" "google.golang.org/api/googleapi" - "runtime" ) type GCDStorage struct { RateLimitedStorage - service *drive.Service - idCache map[string]string - idCacheLock *sync.Mutex - backoffs []float64 + service *drive.Service + idCache map[string]string + idCacheLock *sync.Mutex + backoffs []float64 backoffsRetries []int isConnected bool @@ -101,20 +102,20 @@ func (storage *GCDStorage) shouldRetry(threadIndex int, err error) (bool, error) return false, err } - if storage.backoffs[threadIndex] < LIMIT_BACKOFF_TIME { + if storage.backoffs[threadIndex] < LIMIT_BACKOFF_TIME { storage.backoffs[threadIndex] *= 2.0 } else { storage.backoffs[threadIndex] = LIMIT_BACKOFF_TIME storage.backoffsRetries[threadIndex] += 1 } - delay := storage.backoffs[threadIndex] * rand.Float64() + storage.backoffs[threadIndex]*rand.Float64() - if storage.backoffs[threadIndex] >= LIMIT_BACKOFF_TIME { - callerChain:=findCallerChain() + delay := storage.backoffs[threadIndex]*rand.Float64() + storage.backoffs[threadIndex]*rand.Float64() + if storage.backoffs[threadIndex] >= LIMIT_BACKOFF_TIME { + callerChain := findCallerChain() LOG_INFO("GCD_RETRY", "Thread: %3d. Message:%s. Retrying after %6.2f seconds. Current backoff: %6.2f. Number of retries: %2d. caller chain: %s", threadIndex, message, delay, storage.backoffs[threadIndex], storage.backoffsRetries[threadIndex], callerChain) } - time.Sleep(time.Duration(delay * float64(time.Second))) + time.Sleep(time.Duration(delay * float64(time.Second))) - return true, nil + return true, nil } func findCallerChain() string { @@ -321,17 +322,17 @@ func CreateGCDStorage(tokenFile string, storagePath string, threads int) (storag return nil, err } - storage = &GCDStorage { - service: service, - numberOfThreads: threads, - idCache: make(map[string]string), - idCacheLock: &sync.Mutex{}, - backoffs: make([]float64, threads), - backoffsRetries:make([]int, threads), - } + storage = &GCDStorage{ + service: service, + numberOfThreads: threads, + idCache: make(map[string]string), + idCacheLock: &sync.Mutex{}, + backoffs: make([]float64, threads), + backoffsRetries: make([]int, threads), + } - for b := range storage.backoffs { - storage.backoffs[b] = 0.1 * float64(storage.numberOfThreads) // at the first error, we should still sleep some amount + for b := range storage.backoffs { + storage.backoffs[b] = 0.1 * float64(storage.numberOfThreads) // at the first error, we should still sleep some amount } storagePathID, err := storage.getIDFromPath(0, storagePath) From bd39302eee46684202f6f4fdb111aff628d2b515 Mon Sep 17 00:00:00 2001 From: TheBestPessimist Date: Sun, 1 Oct 2017 12:06:19 +0300 Subject: [PATCH 6/7] Remove debugging code not needed for the push request. --- src/duplicacy_gcdstorage.go | 26 +------------------------- 1 file changed, 1 insertion(+), 25 deletions(-) diff --git a/src/duplicacy_gcdstorage.go b/src/duplicacy_gcdstorage.go index b4205cc..aa0457c 100644 --- a/src/duplicacy_gcdstorage.go +++ b/src/duplicacy_gcdstorage.go @@ -18,8 +18,6 @@ import ( "sync" "time" - "runtime" - "golang.org/x/net/context" "golang.org/x/oauth2" "google.golang.org/api/drive/v3" @@ -109,34 +107,12 @@ func (storage *GCDStorage) shouldRetry(threadIndex int, err error) (bool, error) storage.backoffsRetries[threadIndex] += 1 } delay := storage.backoffs[threadIndex]*rand.Float64() + storage.backoffs[threadIndex]*rand.Float64() - if storage.backoffs[threadIndex] >= LIMIT_BACKOFF_TIME { - callerChain := findCallerChain() - LOG_INFO("GCD_RETRY", "Thread: %3d. Message:%s. Retrying after %6.2f seconds. Current backoff: %6.2f. Number of retries: %2d. caller chain: %s", threadIndex, message, delay, storage.backoffs[threadIndex], storage.backoffsRetries[threadIndex], callerChain) - } + LOG_DEBUG("GCD_RETRY", "Thread: %3d. Message: %s. Retrying after %6.2f seconds. Current backoff: %6.2f. Number of retries: %2d.", threadIndex, message, delay, storage.backoffs[threadIndex], storage.backoffsRetries[threadIndex]) time.Sleep(time.Duration(delay * float64(time.Second))) return true, nil } -func findCallerChain() string { - callerStack := "" - pc := make([]uintptr, 15) - n := runtime.Callers(1, pc) - frames := runtime.CallersFrames(pc[:n]) - - for { - frame, more := frames.Next() - if strings.Contains(frame.File, "runtime/") { - break - } - callerStack += "->" + frame.Function - if !more { - break - } - } - return callerStack -} - /* logic for said calculus is here: https://stackoverflow.com/questions/1527803/generating-random-whole-numbers-in-javascript-in-a-specific-range chose 0.1*thread number as a minimum sleep time From a5d334083769da6858866d2befd29609859f25a9 Mon Sep 17 00:00:00 2001 From: TheBestPessimist Date: Sun, 1 Oct 2017 12:33:32 +0300 Subject: [PATCH 7/7] Fix string format derp. Goimports is weird. It changed something, but i have no idea what. --- src/duplicacy_gcdstorage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/duplicacy_gcdstorage.go b/src/duplicacy_gcdstorage.go index aa0457c..9cb40b7 100644 --- a/src/duplicacy_gcdstorage.go +++ b/src/duplicacy_gcdstorage.go @@ -329,7 +329,7 @@ func CreateGCDStorage(tokenFile string, storagePath string, threads int) (storag return nil, err } } else if !isDir { - return nil, fmt.Errorf("%s/%s is not a directory", storagePath+"/"+dir) + return nil, fmt.Errorf("%s/%s is not a directory", storagePath, dir) } else { storage.idCache[dir] = dirID }