Compare commits

...

8 Commits

Author SHA1 Message Date
Gilbert Chen
b61906c99e Bump version to 2.4.0 2020-03-05 22:06:24 -05:00
gilbertchen
a0a07d18cc Merge pull request #589 from fracai/b2_download_url
support downloading from a custom URL pointed at B2
2020-03-05 22:00:53 -05:00
Gilbert Chen
a6ce64e715 Fixed handling of repository ids with spaces in the b2 backend
Usually a repository id should not contain spaces or other non-alphanum
characters, but if it does we should be able to handle it correctly.  This
commit fixes the b2 backend to convert file names in a proper way.
2020-03-05 14:45:09 -05:00
Arno Hautala
499b612a0d moving download url config from a key to the storage url pattern 2020-02-25 20:53:19 -05:00
Arno Hautala
46ce0ba1fb support downloading from a custom URL pointed at B2 2020-02-22 22:12:36 -05:00
Gilbert Chen
cc88abd547 Fixed a bug that caused all copied chunks to be RSA encrypted
The field encryptionVersion in the Chunk struct is supposed to pass the status
of RSA encrytpion from a source chunk to a destination chunk in a copy command.
This field needs to be a 3-state boolean in order to pass the status correctly.
2020-02-13 14:03:07 -05:00
Gilbert Chen
e888b6d7e5 Fix bugs in sftp retrying
* Fixed a bug caused by nil sftp client during retry
* Simplify the rework logic in UploadFile
* Change the number of tries from 6 to 8
2020-01-13 16:26:43 -05:00
Gilbert Chen
d43fe1a282 Release the list of chunk hashes after processing each snapshot.
The chunk hash list isn't needed any more after being consolidated.
Releasing it immediately after use helps reduce the memory usage.
2019-12-09 22:45:16 -05:00
9 changed files with 77 additions and 55 deletions

View File

@@ -2059,7 +2059,7 @@ func main() {
app.Name = "duplicacy" app.Name = "duplicacy"
app.HelpName = "duplicacy" app.HelpName = "duplicacy"
app.Usage = "A new generation cloud backup tool based on lock-free deduplication" app.Usage = "A new generation cloud backup tool based on lock-free deduplication"
app.Version = "2.3.0" + " (" + GitCommit + ")" app.Version = "2.4.0" + " (" + GitCommit + ")"
// If the program is interrupted, call the RunAtError function. // If the program is interrupted, call the RunAtError function.
c := make(chan os.Signal, 1) c := make(chan os.Signal, 1)

View File

@@ -75,7 +75,7 @@ func B2Escape(path string) string {
return strings.Join(components, "/") return strings.Join(components, "/")
} }
func NewB2Client(applicationKeyID string, applicationKey string, storageDir string, threads int) *B2Client { func NewB2Client(applicationKeyID string, applicationKey string, downloadURL string, storageDir string, threads int) *B2Client {
for storageDir != "" && storageDir[0] == '/' { for storageDir != "" && storageDir[0] == '/' {
storageDir = storageDir[1:] storageDir = storageDir[1:]
@@ -95,6 +95,7 @@ func NewB2Client(applicationKeyID string, applicationKey string, storageDir stri
HTTPClient: http.DefaultClient, HTTPClient: http.DefaultClient,
ApplicationKeyID: applicationKeyID, ApplicationKeyID: applicationKeyID,
ApplicationKey: applicationKey, ApplicationKey: applicationKey,
DownloadURL: downloadURL,
StorageDir: storageDir, StorageDir: storageDir,
UploadURLs: make([]string, threads), UploadURLs: make([]string, threads),
UploadTokens: make([]string, threads), UploadTokens: make([]string, threads),
@@ -325,7 +326,10 @@ func (client *B2Client) AuthorizeAccount(threadIndex int) (err error, allowed bo
client.AuthorizationToken = output.AuthorizationToken client.AuthorizationToken = output.AuthorizationToken
client.APIURL = output.APIURL client.APIURL = output.APIURL
client.DownloadURL = output.DownloadURL if client.DownloadURL == "" {
client.DownloadURL = output.DownloadURL
}
LOG_INFO("BACKBLAZE_URL", "download URL is: %s", client.DownloadURL)
client.IsAuthorized = true client.IsAuthorized = true
client.LastAuthorizationTime = time.Now().Unix() client.LastAuthorizationTime = time.Now().Unix()
@@ -413,16 +417,16 @@ func (client *B2Client) ListFileNames(threadIndex int, startFileName string, sin
input["prefix"] = client.StorageDir input["prefix"] = client.StorageDir
for { for {
url := client.getAPIURL() + "/b2api/v1/b2_list_file_names" apiURL := client.getAPIURL() + "/b2api/v1/b2_list_file_names"
requestHeaders := map[string]string{} requestHeaders := map[string]string{}
requestMethod := http.MethodPost requestMethod := http.MethodPost
var requestInput interface{} var requestInput interface{}
requestInput = input requestInput = input
if includeVersions { if includeVersions {
url = client.getAPIURL() + "/b2api/v1/b2_list_file_versions" apiURL = client.getAPIURL() + "/b2api/v1/b2_list_file_versions"
} else if singleFile { } else if singleFile {
// handle a single file with no versions as a special case to download the last byte of the file // handle a single file with no versions as a special case to download the last byte of the file
url = client.getDownloadURL() + "/file/" + client.BucketName + "/" + B2Escape(client.StorageDir + startFileName) apiURL = client.getDownloadURL() + "/file/" + client.BucketName + "/" + B2Escape(client.StorageDir + startFileName)
// requesting byte -1 works for empty files where 0-0 fails with a 416 error // requesting byte -1 works for empty files where 0-0 fails with a 416 error
requestHeaders["Range"] = "bytes=-1" requestHeaders["Range"] = "bytes=-1"
// HEAD request // HEAD request
@@ -432,7 +436,7 @@ func (client *B2Client) ListFileNames(threadIndex int, startFileName string, sin
var readCloser io.ReadCloser var readCloser io.ReadCloser
var responseHeader http.Header var responseHeader http.Header
var err error var err error
readCloser, responseHeader, _, err = client.call(threadIndex, url, requestMethod, requestHeaders, requestInput) readCloser, responseHeader, _, err = client.call(threadIndex, apiURL, requestMethod, requestHeaders, requestInput)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -445,7 +449,7 @@ func (client *B2Client) ListFileNames(threadIndex int, startFileName string, sin
if singleFile && !includeVersions { if singleFile && !includeVersions {
if responseHeader == nil { if responseHeader == nil {
LOG_DEBUG("BACKBLAZE_LIST", "%s did not return headers", url) LOG_DEBUG("BACKBLAZE_LIST", "%s did not return headers", apiURL)
return []*B2Entry{}, nil return []*B2Entry{}, nil
} }
requiredHeaders := []string{ requiredHeaders := []string{
@@ -459,11 +463,17 @@ func (client *B2Client) ListFileNames(threadIndex int, startFileName string, sin
} }
} }
if len(missingKeys) > 0 { if len(missingKeys) > 0 {
return nil, fmt.Errorf("%s missing headers: %s", url, missingKeys) return nil, fmt.Errorf("%s missing headers: %s", apiURL, missingKeys)
} }
// construct the B2Entry from the response headers of the download request // construct the B2Entry from the response headers of the download request
fileID := responseHeader.Get("x-bz-file-id") fileID := responseHeader.Get("x-bz-file-id")
fileName := responseHeader.Get("x-bz-file-name") fileName := responseHeader.Get("x-bz-file-name")
unescapedFileName, err := url.QueryUnescape(fileName)
if err == nil {
fileName = unescapedFileName
} else {
LOG_WARN("BACKBLAZE_UNESCAPE", "Failed to unescape the file name %s", fileName)
}
fileAction := "upload" fileAction := "upload"
// byte range that is returned: "bytes #-#/# // byte range that is returned: "bytes #-#/#
rangeString := responseHeader.Get("Content-Range") rangeString := responseHeader.Get("Content-Range")
@@ -476,10 +486,10 @@ func (client *B2Client) ListFileNames(threadIndex int, startFileName string, sin
// this should only execute if the requested file is empty and the range request didn't result in a Content-Range header // this should only execute if the requested file is empty and the range request didn't result in a Content-Range header
fileSize, _ = strconv.ParseInt(lengthString, 0, 64) fileSize, _ = strconv.ParseInt(lengthString, 0, 64)
if fileSize != 0 { if fileSize != 0 {
return nil, fmt.Errorf("%s returned non-zero file length", url) return nil, fmt.Errorf("%s returned non-zero file length", apiURL)
} }
} else { } else {
return nil, fmt.Errorf("could not parse headers returned by %s", url) return nil, fmt.Errorf("could not parse headers returned by %s", apiURL)
} }
fileUploadTimestamp, _ := strconv.ParseInt(responseHeader.Get("X-Bz-Upload-Timestamp"), 0, 64) fileUploadTimestamp, _ := strconv.ParseInt(responseHeader.Get("X-Bz-Upload-Timestamp"), 0, 64)

View File

@@ -37,7 +37,7 @@ func createB2ClientForTest(t *testing.T) (*B2Client, string) {
return nil, "" return nil, ""
} }
return NewB2Client(b2["account"], b2["key"], b2["directory"], 1), b2["bucket"] return NewB2Client(b2["account"], b2["key"], "", b2["directory"], 1), b2["bucket"]
} }

View File

@@ -15,9 +15,9 @@ type B2Storage struct {
} }
// CreateB2Storage creates a B2 storage object. // CreateB2Storage creates a B2 storage object.
func CreateB2Storage(accountID string, applicationKey string, bucket string, storageDir string, threads int) (storage *B2Storage, err error) { func CreateB2Storage(accountID string, applicationKey string, downloadURL string, bucket string, storageDir string, threads int) (storage *B2Storage, err error) {
client := NewB2Client(accountID, applicationKey, storageDir, threads) client := NewB2Client(accountID, applicationKey, downloadURL, storageDir, threads)
err, _ = client.AuthorizeAccount(0) err, _ = client.AuthorizeAccount(0)
if err != nil { if err != nil {
@@ -204,7 +204,6 @@ func (storage *B2Storage) GetFileInfo(threadIndex int, filePath string) (exist b
// DownloadFile reads the file at 'filePath' into the chunk. // DownloadFile reads the file at 'filePath' into the chunk.
func (storage *B2Storage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) { func (storage *B2Storage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) {
filePath = strings.Replace(filePath, " ", "%20", -1)
readCloser, _, err := storage.client.DownloadFile(threadIndex, filePath) readCloser, _, err := storage.client.DownloadFile(threadIndex, filePath)
if err != nil { if err != nil {
return err return err
@@ -218,7 +217,6 @@ func (storage *B2Storage) DownloadFile(threadIndex int, filePath string, chunk *
// UploadFile writes 'content' to the file at 'filePath'. // UploadFile writes 'content' to the file at 'filePath'.
func (storage *B2Storage) UploadFile(threadIndex int, filePath string, content []byte) (err error) { func (storage *B2Storage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
filePath = strings.Replace(filePath, " ", "%20", -1)
return storage.client.UploadFile(threadIndex, filePath, content, storage.UploadRateLimit/storage.client.Threads) return storage.client.UploadFile(threadIndex, filePath, content, storage.UploadRateLimit/storage.client.Threads)
} }

View File

@@ -1668,6 +1668,8 @@ func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapsho
chunks[chunkHash] = true chunks[chunkHash] = true
} }
} }
snapshot.ChunkHashes = nil
} }
otherChunkFiles, otherChunkSizes := otherManager.SnapshotManager.ListAllFiles(otherManager.storage, "chunks/") otherChunkFiles, otherChunkSizes := otherManager.SnapshotManager.ListAllFiles(otherManager.storage, "chunks/")
@@ -1730,7 +1732,11 @@ func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapsho
newChunk := otherManager.config.GetChunk() newChunk := otherManager.config.GetChunk()
newChunk.Reset(true) newChunk.Reset(true)
newChunk.Write(chunk.GetBytes()) newChunk.Write(chunk.GetBytes())
newChunk.encryptionVersion = chunk.encryptionVersion if chunk.encryptionVersion == ENCRYPTION_VERSION_RSA {
newChunk.encryptionVersion = CHUNK_RSA_ENCRYPTION_ENABLED
} else {
newChunk.encryptionVersion = CHUNK_RSA_ENCRYPTION_DISABLED
}
chunkUploader.StartChunk(newChunk, chunkIndex) chunkUploader.StartChunk(newChunk, chunkIndex)
totalCopied++ totalCopied++
} else { } else {

View File

@@ -63,14 +63,21 @@ type Chunk struct {
config *Config // Every chunk is associated with a Config object. Which hashing algorithm to use is determined config *Config // Every chunk is associated with a Config object. Which hashing algorithm to use is determined
// by the config // by the config
encryptionVersion byte // The version type in the encrytion header encryptionVersion byte // The version type in the encrytion header; for a chunk to be copied, this field contains
// one of the CHUNK_RSA_ENCRYPTION_* constants to indicate how the new chunk should be encrypted
} }
// Magic word to identify a duplicacy format encrypted file, plus a version number. // Magic word to identify a duplicacy format encrypted file, plus a version number.
var ENCRYPTION_HEADER = "duplicacy\000" var ENCRYPTION_HEADER = "duplicacy\000"
// RSA encrypted chunks start with "duplicacy\002"
var ENCRYPTION_VERSION_RSA byte = 2 var ENCRYPTION_VERSION_RSA byte = 2
// These constants are used to control how a new chunk should be encrypted by the copy command
var CHUNK_RSA_ENCRYPTION_DEFAULT byte = 0 // No RSA encryption explicitly requested
var CHUNK_RSA_ENCRYPTION_DISABLED byte = 1 // The RSA encryption should be turned off
var CHUNK_RSA_ENCRYPTION_ENABLED byte = 2 // The RSA encryption should be forced on
// CreateChunk creates a new chunk. // CreateChunk creates a new chunk.
func CreateChunk(config *Config, bufferNeeded bool) *Chunk { func CreateChunk(config *Config, bufferNeeded bool) *Chunk {
@@ -193,7 +200,10 @@ func (chunk *Chunk) Encrypt(encryptionKey []byte, derivationKey string, isSnapsh
key := encryptionKey key := encryptionKey
usingRSA := false usingRSA := false
if chunk.config.rsaPublicKey != nil && (!isSnapshot || chunk.encryptionVersion == ENCRYPTION_VERSION_RSA) { // If encryptionVersion is not set, use the default setting (RSA for file chunks only);
// otherwise, enable RSA encryption only when explicitly requested
if chunk.config.rsaPublicKey != nil &&
((!isSnapshot && chunk.encryptionVersion == CHUNK_RSA_ENCRYPTION_DEFAULT) || chunk.encryptionVersion == CHUNK_RSA_ENCRYPTION_ENABLED) {
// If the chunk is not a snpashot chunk, we attempt to encrypt it with the RSA publick key if there is one // If the chunk is not a snpashot chunk, we attempt to encrypt it with the RSA publick key if there is one
randomKey := make([]byte, 32) randomKey := make([]byte, 32)
_, err := rand.Read(randomKey) _, err := rand.Read(randomKey)

View File

@@ -91,7 +91,7 @@ func CreateSFTPStorage(server string, port int, username string, storageDir stri
storageDir: storageDir, storageDir: storageDir,
minimumNesting: minimumNesting, minimumNesting: minimumNesting,
numberOfThreads: threads, numberOfThreads: threads,
numberOfTries: 6, numberOfTries: 8,
serverAddress: serverAddress, serverAddress: serverAddress,
sftpConfig: sftpConfig, sftpConfig: sftpConfig,
} }
@@ -129,22 +129,19 @@ func (storage *SFTPStorage) retry(f func () error) error {
delay *= 2 delay *= 2
storage.clientLock.Lock() storage.clientLock.Lock()
if storage.client != nil {
storage.client.Close()
storage.client = nil
}
connection, err := ssh.Dial("tcp", storage.serverAddress, storage.sftpConfig) connection, err := ssh.Dial("tcp", storage.serverAddress, storage.sftpConfig)
if err != nil { if err != nil {
LOG_WARN("SFT_RECONNECT", "Failed to connect to %s: %v; retrying", storage.serverAddress, err)
storage.clientLock.Unlock() storage.clientLock.Unlock()
return err continue
} }
client, err := sftp.NewClient(connection) client, err := sftp.NewClient(connection)
if err != nil { if err != nil {
LOG_WARN("SFT_RECONNECT", "Failed to create a new SFTP client to %s: %v; retrying", storage.serverAddress, err)
connection.Close() connection.Close()
storage.clientLock.Unlock() storage.clientLock.Unlock()
return err continue
} }
storage.client = client storage.client = client
storage.clientLock.Unlock() storage.clientLock.Unlock()
@@ -275,36 +272,19 @@ func (storage *SFTPStorage) UploadFile(threadIndex int, filePath string, content
fullPath := path.Join(storage.storageDir, filePath) fullPath := path.Join(storage.storageDir, filePath)
dirs := strings.Split(filePath, "/") dirs := strings.Split(filePath, "/")
if len(dirs) > 1 { fullDir := path.Dir(fullPath)
fullDir := path.Dir(fullPath) return storage.retry(func() error {
err = storage.retry(func() error {
_, err := storage.getSFTPClient().Stat(fullDir)
return err
})
if err != nil {
// The error may be caused by a non-existent fullDir, or a broken connection. In either case,
// we just assume it is the former because there isn't a way to tell which is the case.
for i := range dirs[1 : len(dirs)-1] {
subDir := path.Join(storage.storageDir, path.Join(dirs[0:i+2]...))
// We don't check the error; just keep going blindly but always store the last err
err = storage.getSFTPClient().Mkdir(subDir)
}
// If there is an error creating the dirs, we check fullDir one more time, because another thread if len(dirs) > 1 {
// may happen to create the same fullDir ahead of this thread _, err := storage.getSFTPClient().Stat(fullDir)
if err != nil { if os.IsNotExist(err) {
err = storage.retry(func() error { for i := range dirs[1 : len(dirs)-1] {
_, err := storage.getSFTPClient().Stat(fullDir) subDir := path.Join(storage.storageDir, path.Join(dirs[0:i+2]...))
return err // We don't check the error; just keep going blindly
}) storage.getSFTPClient().Mkdir(subDir)
if err != nil {
return err
} }
} }
} }
}
return storage.retry(func() error {
letters := "abcdefghijklmnopqrstuvwxyz" letters := "abcdefghijklmnopqrstuvwxyz"
suffix := make([]byte, 8) suffix := make([]byte, 8)

View File

@@ -531,7 +531,25 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor
accountID := GetPassword(preference, "b2_id", "Enter Backblaze account or application id:", true, resetPassword) accountID := GetPassword(preference, "b2_id", "Enter Backblaze account or application id:", true, resetPassword)
applicationKey := GetPassword(preference, "b2_key", "Enter corresponding Backblaze application key:", true, resetPassword) applicationKey := GetPassword(preference, "b2_key", "Enter corresponding Backblaze application key:", true, resetPassword)
b2Storage, err := CreateB2Storage(accountID, applicationKey, bucket, storageDir, threads) b2Storage, err := CreateB2Storage(accountID, applicationKey, "", bucket, storageDir, threads)
if err != nil {
LOG_ERROR("STORAGE_CREATE", "Failed to load the Backblaze B2 storage at %s: %v", storageURL, err)
return nil
}
SavePassword(preference, "b2_id", accountID)
SavePassword(preference, "b2_key", applicationKey)
return b2Storage
} else if matched[1] == "b2-custom" {
b2customUrlRegex := regexp.MustCompile(`^b2-custom://([^/]+)/([^/]+)(/(.+))?`)
matched := b2customUrlRegex.FindStringSubmatch(storageURL)
downloadURL := "https://" + matched[1]
bucket := matched[2]
storageDir := matched[4]
accountID := GetPassword(preference, "b2_id", "Enter Backblaze account or application id:", true, resetPassword)
applicationKey := GetPassword(preference, "b2_key", "Enter corresponding Backblaze application key:", true, resetPassword)
b2Storage, err := CreateB2Storage(accountID, applicationKey, downloadURL, bucket, storageDir, threads)
if err != nil { if err != nil {
LOG_ERROR("STORAGE_CREATE", "Failed to load the Backblaze B2 storage at %s: %v", storageURL, err) LOG_ERROR("STORAGE_CREATE", "Failed to load the Backblaze B2 storage at %s: %v", storageURL, err)
return nil return nil

View File

@@ -109,7 +109,7 @@ func loadStorage(localStoragePath string, threads int) (Storage, error) {
storage.SetDefaultNestingLevels([]int{2, 3}, 2) storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, err return storage, err
} else if testStorageName == "b2" { } else if testStorageName == "b2" {
storage, err := CreateB2Storage(config["account"], config["key"], config["bucket"], config["directory"], threads) storage, err := CreateB2Storage(config["account"], config["key"], "", config["bucket"], config["directory"], threads)
storage.SetDefaultNestingLevels([]int{2, 3}, 2) storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, err return storage, err
} else if testStorageName == "gcs-s3" { } else if testStorageName == "gcs-s3" {