Mirror of https://github.com/jkl1337/duplicacy.git (synced 2026-01-02 11:44:45 -06:00)
Merge branch 'master' into mac-exclude
@@ -33,10 +33,13 @@ type BackupManager struct {
snapshotCache *FileStorage // for copies of chunks needed by snapshots

config *Config // contains a number of options

nobackupFile string // don't backup directory when this file name is found

excludeByAttribute bool // don't backup file based on file attribute
filtersFile string // the path to the filters file

excludeByAttribute bool // don't backup file based on file attribute

}

func (manager *BackupManager) SetDryRun(dryRun bool) {
@@ -46,7 +49,7 @@ func (manager *BackupManager) SetDryRun(dryRun bool) {
// CreateBackupManager creates a backup manager using the specified 'storage'. 'snapshotID' is a unique id to
// identify snapshots created for this repository. 'top' is the top directory of the repository. 'password' is the
// master key which can be nil if encryption is not enabled.
func CreateBackupManager(snapshotID string, storage Storage, top string, password string, nobackupFile string, excludeByAttribute bool) *BackupManager {
func CreateBackupManager(snapshotID string, storage Storage, top string, password string, nobackupFile string, filtersFile string, excludeByAttribute bool) *BackupManager {

config, _, err := DownloadConfig(storage, password)
if err != nil {
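Illustrative sketch only, not part of this commit: a caller now has to pass the new 'filtersFile' argument, which slots in before 'excludeByAttribute'. The values below are placeholders.

```go
// newManager is a hypothetical helper showing the new CreateBackupManager signature.
func newManager(storage Storage) *BackupManager {
	return CreateBackupManager(
		"documents",           // snapshotID
		storage,               // storage backend
		"/path/to/repository", // top
		"password",            // master key (empty if encryption is not enabled)
		".nobackup",           // nobackupFile
		"/path/to/repository/.duplicacy/filters", // filtersFile (new parameter)
		false, // excludeByAttribute
	)
}
```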
@@ -67,10 +70,12 @@ func CreateBackupManager(snapshotID string, storage Storage, top string, passwor
SnapshotManager: snapshotManager,

config: config,

nobackupFile: nobackupFile,

excludeByAttribute: excludeByAttribute,
filtersFile: filtersFile,

excludeByAttribute: excludeByAttribute,
}

if IsDebugging() {
@@ -80,6 +85,11 @@ func CreateBackupManager(snapshotID string, storage Storage, top string, passwor
return backupManager
}

// loadRSAPrivateKey loads the specified private key file for decrypting file chunks
func (manager *BackupManager) LoadRSAPrivateKey(keyFile string, passphrase string) {
manager.config.loadRSAPrivateKey(keyFile, passphrase)
}

// SetupSnapshotCache creates the snapshot cache, which is merely a local storage under the default .duplicacy
// directory
func (manager *BackupManager) SetupSnapshotCache(storageName string) bool {
@@ -107,6 +117,7 @@ func (manager *BackupManager) SetupSnapshotCache(storageName string) bool {
return true
}

// setEntryContent sets the 4 content pointers for each entry in 'entries'. 'offset' indicates the value
// to be added to the StartChunk and EndChunk points, used when intending to append 'entries' to the
// original unchanged entry list.
@@ -180,6 +191,15 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta

LOG_DEBUG("BACKUP_PARAMETERS", "top: %s, quick: %t, tag: %s", top, quickMode, tag)

if manager.config.DataShards != 0 && manager.config.ParityShards != 0 {
LOG_INFO("BACKUP_ERASURECODING", "Erasure coding is enabled with %d data shards and %d parity shards",
manager.config.DataShards, manager.config.ParityShards)
}

if manager.config.rsaPublicKey != nil && len(manager.config.FileKey) > 0 {
LOG_INFO("BACKUP_KEY", "RSA encryption is enabled")
}

remoteSnapshot := manager.SnapshotManager.downloadLatestSnapshot(manager.snapshotID)
if remoteSnapshot == nil {
remoteSnapshot = CreateEmptySnapshot(manager.snapshotID)
@@ -192,7 +212,8 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
defer DeleteShadowCopy()

LOG_INFO("BACKUP_INDEXING", "Indexing %s", top)
localSnapshot, skippedDirectories, skippedFiles, err := CreateSnapshotFromDirectory(manager.snapshotID, shadowTop, manager.nobackupFile, manager.excludeByAttribute)
localSnapshot, skippedDirectories, skippedFiles, err := CreateSnapshotFromDirectory(manager.snapshotID, shadowTop,
manager.nobackupFile, manager.filtersFile, manager.excludeByAttribute)
if err != nil {
LOG_ERROR("SNAPSHOT_LIST", "Failed to list the directory %s: %v", top, err)
return false
@@ -202,6 +223,11 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
return true
}

if len(localSnapshot.Files) == 0 {
LOG_ERROR("SNAPSHOT_EMPTY", "No files under the repository to be backed up")
return false
}

// This cache contains all chunks referenced by last snapshot. Any other chunks will lead to a call to
// UploadChunk.
chunkCache := make(map[string]bool)
@@ -506,6 +532,11 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
chunkID := chunk.GetID()
chunkSize := chunk.GetLength()

if chunkSize == 0 {
LOG_DEBUG("CHUNK_EMPTY", "Ignored chunk %s of size 0", chunkID)
return
}

chunkIndex++

_, found := chunkCache[chunkID]
@@ -727,7 +758,7 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
// the same as 'top'. 'quickMode' will bypass files with unchanged sizes and timestamps. 'deleteMode' will
// remove local files that don't exist in the snapshot. 'patterns' is used to include/exclude certain files.
func (manager *BackupManager) Restore(top string, revision int, inPlace bool, quickMode bool, threads int, overwrite bool,
deleteMode bool, setOwner bool, showStatistics bool, patterns []string) bool {
deleteMode bool, setOwner bool, showStatistics bool, patterns []string, allowFailures bool) int {

startTime := time.Now().Unix()
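For context, an editorial sketch rather than commit code: Restore now returns an int instead of a bool, reporting the number of files that failed to restore (0 on full success), so a hypothetical caller would treat a non-zero result as a partial failure. Argument values below are placeholders.

```go
// restoreWithFailures is a hypothetical wrapper illustrating the new return value.
func restoreWithFailures(manager *BackupManager, top string, revision int) bool {
	// The trailing 'true' is the new allowFailures parameter; the rest are placeholder options.
	failed := manager.Restore(top, revision,
		/*inPlace=*/ true, /*quickMode=*/ false, /*threads=*/ 4, /*overwrite=*/ true,
		/*deleteMode=*/ false, /*setOwner=*/ false, /*showStatistics=*/ true,
		/*patterns=*/ nil, /*allowFailures=*/ true)
	if failed > 0 {
		fmt.Printf("restore completed with %d failed file(s)\n", failed)
		return false
	}
	return true
}
```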
@@ -750,7 +781,7 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu
err = os.Mkdir(top, 0744)
if err != nil {
LOG_ERROR("RESTORE_MKDIR", "Can't create the directory to be restored: %v", err)
return false
return 0
}
}

@@ -758,16 +789,17 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu
err = os.Mkdir(path.Join(top, DUPLICACY_DIRECTORY), 0744)
if err != nil && !os.IsExist(err) {
LOG_ERROR("RESTORE_MKDIR", "Failed to create the preference directory: %v", err)
return false
return 0
}

remoteSnapshot := manager.SnapshotManager.DownloadSnapshot(manager.snapshotID, revision)
manager.SnapshotManager.DownloadSnapshotContents(remoteSnapshot, patterns, true)

localSnapshot, _, _, err := CreateSnapshotFromDirectory(manager.snapshotID, top, manager.nobackupFile, manager.excludeByAttribute)
localSnapshot, _, _, err := CreateSnapshotFromDirectory(manager.snapshotID, top, manager.nobackupFile,
manager.filtersFile, manager.excludeByAttribute)
if err != nil {
LOG_ERROR("SNAPSHOT_LIST", "Failed to list the repository: %v", err)
return false
return 0
}

LOG_INFO("RESTORE_START", "Restoring %s to revision %d", top, revision)
@@ -794,6 +826,11 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu

var totalFileSize int64
var downloadedFileSize int64
var failedFiles int
var skippedFileSize int64
var skippedFiles int64

var downloadedFiles []*Entry

i := 0
for _, entry := range remoteSnapshot.Files {
@@ -811,6 +848,9 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu
if compare == 0 {
i++
if quickMode && local.IsSameAs(entry) {
LOG_TRACE("RESTORE_SKIP", "File %s unchanged (by size and timestamp)", local.Path)
skippedFileSize += entry.Size
skippedFiles++
skipped = true
}
}
@@ -840,7 +880,7 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu
err = os.Symlink(entry.Link, fullPath)
if err != nil {
LOG_ERROR("RESTORE_SYMLINK", "Can't create symlink %s: %v", entry.Path, err)
return false
return 0
}
entry.RestoreMetadata(fullPath, nil, setOwner)
LOG_TRACE("DOWNLOAD_DONE", "Symlink %s updated", entry.Path)
@@ -849,7 +889,7 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu

if err == nil && !stat.IsDir() {
LOG_ERROR("RESTORE_NOTDIR", "The path %s is not a directory", fullPath)
return false
return 0
}

if os.IsNotExist(err) {
@@ -858,7 +898,7 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu
err = os.MkdirAll(fullPath, 0700)
if err != nil && !os.IsExist(err) {
LOG_ERROR("RESTORE_MKDIR", "%v", err)
return false
return 0
}
}
} else {
@@ -876,14 +916,13 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu
// Sort entries by their starting chunks in order to linearize the access to the chunk chain.
sort.Sort(ByChunk(fileEntries))

chunkDownloader := CreateChunkDownloader(manager.config, manager.storage, nil, showStatistics, threads)
chunkDownloader := CreateChunkDownloader(manager.config, manager.storage, nil, showStatistics, threads, allowFailures)
chunkDownloader.AddFiles(remoteSnapshot, fileEntries)

chunkMaker := CreateChunkMaker(manager.config, true)

startDownloadingTime := time.Now().Unix()

var downloadedFiles []*Entry
// Now download files one by one
for _, file := range fileEntries {

@@ -893,16 +932,21 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu
if quickMode {
if file.IsSameAsFileInfo(stat) {
LOG_TRACE("RESTORE_SKIP", "File %s unchanged (by size and timestamp)", file.Path)
skippedFileSize += file.Size
skippedFiles++
continue
}
}

if file.Size == 0 && file.IsSameAsFileInfo(stat) {
LOG_TRACE("RESTORE_SKIP", "File %s unchanged (size 0)", file.Path)
skippedFileSize += file.Size
skippedFiles++
continue
}
} else {
err = os.MkdirAll(path.Dir(fullPath), 0744)
parent, _ := SplitDir(fullPath)
err = os.MkdirAll(parent, 0744)
if err != nil {
LOG_ERROR("DOWNLOAD_MKDIR", "Failed to create directory: %v", err)
}
@@ -913,22 +957,39 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu
newFile, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, file.GetPermissions())
if err != nil {
LOG_ERROR("DOWNLOAD_OPEN", "Failed to create empty file: %v", err)
return false
return 0
}
newFile.Close()

file.RestoreMetadata(fullPath, nil, setOwner)
if !showStatistics {
LOG_INFO("DOWNLOAD_DONE", "Downloaded %s (0)", file.Path)
downloadedFileSize += file.Size
downloadedFiles = append(downloadedFiles, file)
}

continue
}

if manager.RestoreFile(chunkDownloader, chunkMaker, file, top, inPlace, overwrite, showStatistics,
totalFileSize, downloadedFileSize, startDownloadingTime) {
downloaded, err := manager.RestoreFile(chunkDownloader, chunkMaker, file, top, inPlace, overwrite, showStatistics,
totalFileSize, downloadedFileSize, startDownloadingTime, allowFailures)
if err != nil {
// RestoreFile returned an error; if allowFailures were false, RestoreFile would have errored out and not returned,
// so here we just need to show a warning
failedFiles++
LOG_WARN("DOWNLOAD_FAIL", "Failed to restore %s: %v", file.Path, err)
continue
}

// No error
if downloaded {
// No error, file was restored
downloadedFileSize += file.Size
downloadedFiles = append(downloadedFiles, file)
} else {
// No error, file was skipped
skippedFileSize += file.Size
skippedFiles++
}
file.RestoreMetadata(fullPath, nil, setOwner)
}
@@ -956,11 +1017,16 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu
}
}

if failedFiles > 0 {
return failedFiles
}

LOG_INFO("RESTORE_END", "Restored %s to revision %d", top, revision)
if showStatistics {
LOG_INFO("RESTORE_STATS", "Files: %d total, %s bytes", len(fileEntries), PrettySize(totalFileSize))
LOG_INFO("RESTORE_STATS", "Downloaded %d file, %s bytes, %d chunks",
len(downloadedFiles), PrettySize(downloadedFileSize), chunkDownloader.numberOfDownloadedChunks)
LOG_INFO("RESTORE_STATS", "Skipped %d file, %s bytes", skippedFiles, PrettySize(skippedFileSize))
}

runningTime := time.Now().Unix() - startTime
@@ -972,7 +1038,7 @@ func (manager *BackupManager) Restore(top string, revision int, inPlace bool, qu

chunkDownloader.Stop()

return true
return 0
}

// fileEncoder encodes one file at a time to avoid loading the full json description of the entire file tree
@@ -985,12 +1051,12 @@ type fileEncoder struct {
buffer *bytes.Buffer
}

// Read reads data from the embeded buffer
// Read reads data from the embedded buffer
func (encoder fileEncoder) Read(data []byte) (n int, err error) {
return encoder.buffer.Read(data)
}

// NextFile switchs to the next file and generates its json description in the buffer. It also takes care of
// NextFile switches to the next file and generates its json description in the buffer. It also takes care of
// the ending ']' and the commas between files.
func (encoder *fileEncoder) NextFile() (io.Reader, bool) {
if encoder.currentIndex == len(encoder.files) {
@@ -1130,10 +1196,13 @@ func (manager *BackupManager) UploadSnapshot(chunkMaker *ChunkMaker, uploader *C
}

// Restore downloads a file from the storage. If 'inPlace' is false, the download file is saved first to a temporary
// file under the .duplicacy directory and then replaces the existing one. Otherwise, the exising file will be
// file under the .duplicacy directory and then replaces the existing one. Otherwise, the existing file will be
// overwritten directly.
// Return: true, nil: Restored file;
// false, nil: Skipped file;
// false, error: Failure to restore file (only if allowFailures == true)
func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chunkMaker *ChunkMaker, entry *Entry, top string, inPlace bool, overwrite bool,
showStatistics bool, totalFileSize int64, downloadedFileSize int64, startTime int64) bool {
showStatistics bool, totalFileSize int64, downloadedFileSize int64, startTime int64, allowFailures bool) (bool, error) {

LOG_TRACE("DOWNLOAD_START", "Downloading %s", entry.Path)
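To make the new (bool, error) contract concrete, here is a small editorial sketch based on the documented return combinations above; the helper name is hypothetical and not part of the commit.

```go
// classifyRestore maps RestoreFile's documented return combinations to outcomes.
func classifyRestore(downloaded bool, err error) string {
	switch {
	case err != nil:
		// Only reachable when allowFailures == true; otherwise RestoreFile logs an error and does not return.
		return "failed"
	case downloaded:
		return "restored"
	default:
		return "skipped"
	}
}
```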
@@ -1166,6 +1235,9 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
lengthMap := make(map[string]int)
var offset int64

// If the file is newly created (needed by sparse file optimization)
isNewFile := false

existingFile, err = os.Open(fullPath)
if err != nil {
if os.IsNotExist(err) {
@@ -1175,7 +1247,7 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
existingFile, err = os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
LOG_ERROR("DOWNLOAD_CREATE", "Failed to create the file %s for in-place writing: %v", fullPath, err)
return false
return false, nil
}

n := int64(1)
@@ -1187,31 +1259,29 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
_, err = existingFile.Seek(entry.Size-n, 0)
if err != nil {
LOG_ERROR("DOWNLOAD_CREATE", "Failed to resize the initial file %s for in-place writing: %v", fullPath, err)
return false
return false, nil
}
_, err = existingFile.Write([]byte("\x00\x00")[:n])
if err != nil {
LOG_ERROR("DOWNLOAD_CREATE", "Failed to initialize the sparse file %s for in-place writing: %v", fullPath, err)
return false
return false, nil
}
existingFile.Close()
existingFile, err = os.Open(fullPath)
if err != nil {
LOG_ERROR("DOWNLOAD_OPEN", "Can't reopen the initial file just created: %v", err)
return false
return false, nil
}
isNewFile = true
}
} else {
LOG_TRACE("DOWNLOAD_OPEN", "Can't open the existing file: %v", err)
}
} else {
if !overwrite {
LOG_ERROR("DOWNLOAD_OVERWRITE",
"File %s already exists. Please specify the -overwrite option to continue", entry.Path)
return false
}
}

// The key in this map is the number of zeroes. The value is the corresponding hash.
knownHashes := make(map[int]string)

fileHash := ""
if existingFile != nil {

@@ -1221,6 +1291,7 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
fileHasher := manager.config.NewFileHasher()
buffer := make([]byte, 64*1024)
err = nil
isSkipped := false
// We set to read one more byte so the file hash will be different if the file to be restored is a
// truncated portion of the existing file
for i := entry.StartChunk; i <= entry.EndChunk+1; i++ {
@@ -1236,6 +1307,28 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
chunkSize = 1 // the size of extra chunk beyond EndChunk
}
count := 0

if isNewFile {
if hash, found := knownHashes[chunkSize]; found {
// We have read the same number of zeros before, so we just retrieve the hash from the map
existingChunks = append(existingChunks, hash)
existingLengths = append(existingLengths, chunkSize)
offsetMap[hash] = offset
lengthMap[hash] = chunkSize
offset += int64(chunkSize)
isSkipped = true
continue
}
}

if isSkipped {
_, err := existingFile.Seek(offset, 0)
if err != nil {
LOG_ERROR("DOWNLOAD_SEEK", "Failed to seek to offset %d: %v", offset, err)
}
isSkipped = false
}

for count < chunkSize {
n := chunkSize - count
if n > cap(buffer) {
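Side note, illustration only and not code from this commit: the knownHashes map works because a freshly created sparse file contains only zeros, so a chunk's hash depends solely on its length. A minimal standalone sketch of the same idea, using SHA-256 as a stand-in for the repository's configured chunk hash:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// zeroRunHash caches the hash of an all-zero run by its length, mirroring the
// role of 'knownHashes' above (the real code uses the configured hash function).
func zeroRunHash(length int, cache map[int]string) string {
	if h, found := cache[length]; found {
		return h // same length of zeros seen before; reuse the cached hash
	}
	sum := sha256.Sum256(make([]byte, length))
	h := hex.EncodeToString(sum[:])
	cache[length] = h
	return h
}

func main() {
	cache := make(map[int]string)
	fmt.Println(zeroRunHash(1024, cache))
	fmt.Println(zeroRunHash(1024, cache)) // second call hits the cache
}
```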
@@ -1252,7 +1345,7 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
}
if err != nil {
LOG_ERROR("DOWNLOAD_SPLIT", "Failed to read existing file: %v", err)
return false
return false, nil
}
}
if count > 0 {
@@ -1262,13 +1355,30 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
offsetMap[hash] = offset
lengthMap[hash] = chunkSize
offset += int64(chunkSize)
if isNewFile {
knownHashes[chunkSize] = hash
}
}

if err == io.EOF {
break
}
}

fileHash = hex.EncodeToString(fileHasher.Sum(nil))

if fileHash == entry.Hash && fileHash != "" {
LOG_TRACE("DOWNLOAD_SKIP", "File %s unchanged (by hash)", entry.Path)
return false, nil
}

// fileHash != entry.Hash, warn/error depending on -overwrite option
if !overwrite {
LOG_WERROR(allowFailures, "DOWNLOAD_OVERWRITE",
"File %s already exists. Please specify the -overwrite option to overwrite", entry.Path)
return false, fmt.Errorf("file exists")
}

} else {
// If it is not inplace, we want to reuse any chunks in the existing file regardless of their offsets, so
// we run the chunk maker to split the original file.
@@ -1288,9 +1398,11 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
return nil, false
})
}

// This is an additional check comparing fileHash to entry.Hash above, so this should no longer occur
if fileHash == entry.Hash && fileHash != "" {
LOG_TRACE("DOWNLOAD_SKIP", "File %s unchanged (by hash)", entry.Path)
return false
return false, nil
}
}

@@ -1318,7 +1430,7 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
existingFile, err = os.OpenFile(fullPath, os.O_RDWR, 0)
if err != nil {
LOG_ERROR("DOWNLOAD_OPEN", "Failed to open the file %s for in-place writing", fullPath)
return false
return false, nil
}
}

@@ -1350,7 +1462,7 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
_, err = existingFile.Seek(offset, 0)
if err != nil {
LOG_ERROR("DOWNLOAD_SEEK", "Failed to set the offset to %d for file %s: %v", offset, fullPath, err)
return false
return false, nil
}

// Check if the chunk is available in the existing file
@@ -1360,17 +1472,20 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
_, err := io.CopyN(hasher, existingFile, int64(existingLengths[j]))
if err != nil {
LOG_ERROR("DOWNLOAD_READ", "Failed to read the existing chunk %s: %v", hash, err)
return false
return false, nil
}
if IsDebugging() {
LOG_DEBUG("DOWNLOAD_UNCHANGED", "Chunk %s is unchanged", manager.config.GetChunkIDFromHash(hash))
}
} else {
chunk := chunkDownloader.WaitForChunk(i)
if chunk.isBroken {
return false, fmt.Errorf("chunk %s is corrupted", manager.config.GetChunkIDFromHash(hash))
}
_, err = existingFile.Write(chunk.GetBytes()[start:end])
if err != nil {
LOG_ERROR("DOWNLOAD_WRITE", "Failed to write to the file: %v", err)
return false
return false, nil
}
hasher.Write(chunk.GetBytes()[start:end])
}
@@ -1381,15 +1496,15 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
// Must truncate the file if the new size is smaller
if err = existingFile.Truncate(offset); err != nil {
LOG_ERROR("DOWNLOAD_TRUNCATE", "Failed to truncate the file at %d: %v", offset, err)
return false
return false, nil
}

// Verify the download by hash
hash := hex.EncodeToString(hasher.Sum(nil))
if hash != entry.Hash && hash != "" && entry.Hash != "" && !strings.HasPrefix(entry.Hash, "#") {
LOG_ERROR("DOWNLOAD_HASH", "File %s has a mismatched hash: %s instead of %s (in-place)",
LOG_WERROR(allowFailures, "DOWNLOAD_HASH", "File %s has a mismatched hash: %s instead of %s (in-place)",
fullPath, "", entry.Hash)
return false
return false, fmt.Errorf("file corrupt (hash mismatch)")
}

} else {
@@ -1398,7 +1513,7 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
newFile, err = os.OpenFile(temporaryPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
LOG_ERROR("DOWNLOAD_OPEN", "Failed to open file for writing: %v", err)
return false
return false, nil
}

hasher := manager.config.NewFileHasher()
@@ -1436,6 +1551,9 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun

if !hasLocalCopy {
chunk := chunkDownloader.WaitForChunk(i)
if chunk.isBroken {
return false, fmt.Errorf("chunk %s is corrupted", manager.config.GetChunkIDFromHash(hash))
}
// If the chunk was downloaded from the storage, we may still need a portion of it.
start := 0
if i == entry.StartChunk {
@@ -1451,7 +1569,7 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
_, err = newFile.Write(data)
if err != nil {
LOG_ERROR("DOWNLOAD_WRITE", "Failed to write file: %v", err)
return false
return false, nil
}

hasher.Write(data)
@@ -1460,9 +1578,9 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun

hash := hex.EncodeToString(hasher.Sum(nil))
if hash != entry.Hash && hash != "" && entry.Hash != "" && !strings.HasPrefix(entry.Hash, "#") {
LOG_ERROR("DOWNLOAD_HASH", "File %s has a mismatched hash: %s instead of %s",
LOG_WERROR(allowFailures, "DOWNLOAD_HASH", "File %s has a mismatched hash: %s instead of %s",
entry.Path, hash, entry.Hash)
return false
return false, fmt.Errorf("file corrupt (hash mismatch)")
}

if existingFile != nil {
@@ -1476,31 +1594,40 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
err = os.Remove(fullPath)
if err != nil && !os.IsNotExist(err) {
LOG_ERROR("DOWNLOAD_REMOVE", "Failed to remove the old file: %v", err)
return false
return false, nil
}

err = os.Rename(temporaryPath, fullPath)
if err != nil {
LOG_ERROR("DOWNLOAD_RENAME", "Failed to rename the file %s to %s: %v", temporaryPath, fullPath, err)
return false
return false, nil
}
}

if !showStatistics {
LOG_INFO("DOWNLOAD_DONE", "Downloaded %s (%d)", entry.Path, entry.Size)
}
return true
return true, nil
}

// CopySnapshots copies the specified snapshots from one storage to the other.
func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapshotID string,
revisionsToBeCopied []int, threads int) bool {
revisionsToBeCopied []int, uploadingThreads int, downloadingThreads int) bool {

if !manager.config.IsCompatiableWith(otherManager.config) {
LOG_ERROR("CONFIG_INCOMPATIABLE", "Two storages are not compatiable for the copy operation")
LOG_ERROR("CONFIG_INCOMPATIBLE", "Two storages are not compatible for the copy operation")
return false
}

if otherManager.config.DataShards != 0 && otherManager.config.ParityShards != 0 {
LOG_INFO("BACKUP_ERASURECODING", "Erasure coding is enabled for the destination storage with %d data shards and %d parity shards",
otherManager.config.DataShards, otherManager.config.ParityShards)
}

if otherManager.config.rsaPublicKey != nil && len(otherManager.config.FileKey) > 0 {
LOG_INFO("BACKUP_KEY", "RSA encryption is enabled for the destination")
}

if snapshotID == "" && len(revisionsToBeCopied) > 0 {
LOG_ERROR("SNAPSHOT_ERROR", "You must specify the snapshot id when one or more revisions are specified.")
return false
@@ -1580,6 +1707,9 @@ func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapsho
return true
}

// These two maps store hashes of chunks in the source and destination storages, respectively. Note that
// the value of 'chunks' is used to indicate if the chunk is a snapshot chunk, while the value of 'otherChunks'
// is not used.
chunks := make(map[string]bool)
otherChunks := make(map[string]bool)

@@ -1592,21 +1722,15 @@ func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapsho
LOG_TRACE("SNAPSHOT_COPY", "Copying snapshot %s at revision %d", snapshot.ID, snapshot.Revision)

for _, chunkHash := range snapshot.FileSequence {
if _, found := chunks[chunkHash]; !found {
chunks[chunkHash] = true
}
chunks[chunkHash] = true // The chunk is a snapshot chunk
}

for _, chunkHash := range snapshot.ChunkSequence {
if _, found := chunks[chunkHash]; !found {
chunks[chunkHash] = true
}
chunks[chunkHash] = true // The chunk is a snapshot chunk
}

for _, chunkHash := range snapshot.LengthSequence {
if _, found := chunks[chunkHash]; !found {
chunks[chunkHash] = true
}
chunks[chunkHash] = true // The chunk is a snapshot chunk
}

description := manager.SnapshotManager.DownloadSequence(snapshot.ChunkSequence)
@@ -1619,9 +1743,11 @@ func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapsho

for _, chunkHash := range snapshot.ChunkHashes {
if _, found := chunks[chunkHash]; !found {
chunks[chunkHash] = true
chunks[chunkHash] = false // The chunk is a file chunk
}
}

snapshot.ChunkHashes = nil
}

otherChunkFiles, otherChunkSizes := otherManager.SnapshotManager.ListAllFiles(otherManager.storage, "chunks/")
@@ -1640,62 +1766,64 @@ func (manager *BackupManager) CopySnapshots(otherManager *BackupManager, snapsho

LOG_DEBUG("SNAPSHOT_COPY", "Found %d chunks on destination storage", len(otherChunks))

chunksToCopy := 0
chunksToSkip := 0
var chunksToCopy []string

for chunkHash, _ := range chunks {
for chunkHash := range chunks {
otherChunkID := otherManager.config.GetChunkIDFromHash(chunkHash)
if _, found := otherChunks[otherChunkID]; found {
chunksToSkip++
} else {
chunksToCopy++
if _, found := otherChunks[otherChunkID]; !found {
chunksToCopy = append(chunksToCopy, chunkHash)
}
}

LOG_DEBUG("SNAPSHOT_COPY", "Chunks to copy = %d, to skip = %d, total = %d", chunksToCopy, chunksToSkip, chunksToCopy+chunksToSkip)
LOG_DEBUG("SNAPSHOT_COPY", "Total chunks in source snapshot revisions = %d\n", len(chunks))
LOG_INFO("SNAPSHOT_COPY", "Chunks to copy: %d, to skip: %d, total: %d", len(chunksToCopy), len(chunks) - len(chunksToCopy), len(chunks))

chunkDownloader := CreateChunkDownloader(manager.config, manager.storage, nil, false, threads)
chunkDownloader := CreateChunkDownloader(manager.config, manager.storage, nil, false, downloadingThreads, false)

chunkUploader := CreateChunkUploader(otherManager.config, otherManager.storage, nil, threads,
var uploadedBytes int64
startTime := time.Now()

copiedChunks := 0
chunkUploader := CreateChunkUploader(otherManager.config, otherManager.storage, nil, uploadingThreads,
func(chunk *Chunk, chunkIndex int, skipped bool, chunkSize int, uploadSize int) {
if skipped {
LOG_INFO("SNAPSHOT_COPY", "Chunk %s (%d/%d) exists at the destination", chunk.GetID(), chunkIndex, len(chunks))
} else {
LOG_INFO("SNAPSHOT_COPY", "Chunk %s (%d/%d) copied to the destination", chunk.GetID(), chunkIndex, len(chunks))
action := "Skipped"
if !skipped {
copiedChunks++
action = "Copied"
}

atomic.AddInt64(&uploadedBytes, int64(chunkSize))

elapsedTime := time.Now().Sub(startTime).Seconds()
speed := int64(float64(atomic.LoadInt64(&uploadedBytes)) / elapsedTime)
remainingTime := int64(float64(len(chunksToCopy) - chunkIndex - 1) / float64(chunkIndex + 1) * elapsedTime)
percentage := float64(chunkIndex + 1) / float64(len(chunksToCopy)) * 100.0
LOG_INFO("COPY_PROGRESS", "%s chunk %s (%d/%d) %sB/s %s %.1f%%",
action, chunk.GetID(), chunkIndex + 1, len(chunksToCopy),
PrettySize(speed), PrettyTime(remainingTime), percentage)
otherManager.config.PutChunk(chunk)
})

chunkUploader.Start()

totalCopied := 0
totalSkipped := 0
chunkIndex := 0

for chunkHash, _ := range chunks {
chunkIndex++
for _, chunkHash := range chunksToCopy {
chunkDownloader.AddChunk(chunkHash)
}
for i, chunkHash := range chunksToCopy {
chunkID := manager.config.GetChunkIDFromHash(chunkHash)
newChunkID := otherManager.config.GetChunkIDFromHash(chunkHash)
if _, found := otherChunks[newChunkID]; !found {
LOG_DEBUG("SNAPSHOT_COPY", "Copying chunk %s to %s", chunkID, newChunkID)
i := chunkDownloader.AddChunk(chunkHash)
chunk := chunkDownloader.WaitForChunk(i)
newChunk := otherManager.config.GetChunk()
newChunk.Reset(true)
newChunk.Write(chunk.GetBytes())
chunkUploader.StartChunk(newChunk, chunkIndex)
totalCopied++
} else {
LOG_INFO("SNAPSHOT_COPY", "Chunk %s (%d/%d) skipped at the destination", chunkID, chunkIndex, len(chunks))
totalSkipped++
}
LOG_DEBUG("SNAPSHOT_COPY", "Copying chunk %s to %s", chunkID, newChunkID)
chunk := chunkDownloader.WaitForChunk(i)
newChunk := otherManager.config.GetChunk()
newChunk.Reset(true)
newChunk.Write(chunk.GetBytes())
newChunk.isSnapshot = chunks[chunkHash]
chunkUploader.StartChunk(newChunk, i)
}

chunkDownloader.Stop()
chunkUploader.Stop()

LOG_INFO("SNAPSHOT_COPY", "Copy complete, %d total chunks, %d chunks copied, %d skipped", totalCopied+totalSkipped, totalCopied, totalSkipped)
LOG_INFO("SNAPSHOT_COPY", "Copied %d new chunks and skipped %d existing chunks", copiedChunks, len(chunks) - copiedChunks)

for _, snapshot := range snapshots {
if revisionMap[snapshot.ID][snapshot.Revision] == false {
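Worked example (editorial illustration, not part of the commit) of the progress arithmetic in the new upload callback, using made-up numbers: 200 chunks to copy, the 50th chunk just finished (chunkIndex 49), 120 seconds elapsed, 300 MB uploaded so far.

```go
package main

import "fmt"

func main() {
	chunksToCopy := 200
	chunkIndex := 49
	elapsedTime := 120.0 // seconds
	uploadedBytes := int64(300 << 20)

	speed := int64(float64(uploadedBytes) / elapsedTime)                                         // ~2.5 MB/s
	remaining := int64(float64(chunksToCopy-chunkIndex-1) / float64(chunkIndex+1) * elapsedTime) // 150/50*120 = 360 s
	percentage := float64(chunkIndex+1) / float64(chunksToCopy) * 100.0                          // 25.0%

	fmt.Printf("speed=%d B/s remaining=%d s progress=%.1f%%\n", speed, remaining, percentage)
}
```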