Implement fast resume; refactor GetDuplicacyPreferencePath()

This commit is contained in:
Gilbert Chen
2017-06-22 22:53:33 -04:00
parent 839be6094f
commit d0c376f593
12 changed files with 393 additions and 146 deletions

View File

@@ -13,6 +13,7 @@ import (
"path"
"time"
"sort"
"sync"
"sync/atomic"
"strings"
"strconv"
@@ -70,9 +71,9 @@ func CreateBackupManager(snapshotID string, storage Storage, top string, passwor
// SetupSnapshotCache creates the snapshot cache, which is merely a local storage under the default .duplicacy
// directory
func (manager *BackupManager) SetupSnapshotCache(top string, storageName string) bool {
func (manager *BackupManager) SetupSnapshotCache(storageName string) bool {
preferencePath := GetDuplicacyPreferencePath(top)
preferencePath := GetDuplicacyPreferencePath()
cacheDir := path.Join(preferencePath, "cache", storageName)
storage, err := CreateFileStorage(cacheDir, 1)
@@ -94,11 +95,19 @@ func (manager *BackupManager) SetupSnapshotCache(top string, storageName string)
return true
}
// setEntryContent sets the 4 content pointers for each entry in 'entries'. 'offset' indicates the value
// to be added to the StartChunk and EndChunk points, used when intending to append 'entries' to the
// original unchanged entry list.
//
// This function assumes the Size field of each entry is equal to the length of the chunk content that belong
// to the file.
func setEntryContent(entries[] *Entry, chunkLengths[]int, offset int) {
if len(entries) == 0 {
return
}
// The following code works by iterating over 'entries' and 'chunkLength' and keeping track of the
// accumulated total file size and the accumulated total chunk size.
i := 0
totalChunkSize := int64(0)
totalFileSize := entries[i].Size
@@ -115,6 +124,8 @@ func setEntryContent(entries[] *Entry, chunkLengths[]int, offset int) {
break
}
// If the current file ends at the end of the current chunk, the next file will
// start at the next chunk
if totalChunkSize + int64(length) == totalFileSize {
entries[i].StartChunk = j + 1 + offset
entries[i].StartOffset = 0
@@ -126,6 +137,9 @@ func setEntryContent(entries[] *Entry, chunkLengths[]int, offset int) {
totalFileSize += entries[i].Size
}
if i >= len(entries) {
break
}
totalChunkSize += int64(length)
}
}
@@ -150,7 +164,6 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
remoteSnapshot := manager.SnapshotManager.downloadLatestSnapshot(manager.snapshotID)
if remoteSnapshot == nil {
quickMode = false
remoteSnapshot = CreateEmptySnapshot(manager.snapshotID)
LOG_INFO("BACKUP_START", "No previous backup found")
} else {
@@ -171,28 +184,72 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
// UploadChunk.
chunkCache := make(map[string]bool)
var incompleteSnapshot *Snapshot
// A revision number of 0 means this is the initial backup
if remoteSnapshot.Revision > 0 {
// Add all chunks in the last snapshot to the
// Add all chunks in the last snapshot to the cache
for _, chunkID := range manager.SnapshotManager.GetSnapshotChunks(remoteSnapshot) {
chunkCache[chunkID] = true
}
} else if manager.storage.IsFastListing() {
// If the listing operation is fast, list all chunks and put them in the cache.
LOG_INFO("BACKUP_LIST", "Listing all chunks")
allChunks, _ := manager.SnapshotManager.ListAllFiles(manager.storage, "chunks/")
for _, chunk := range allChunks {
if len(chunk) == 0 || chunk[len(chunk) - 1] == '/' {
continue
}
if strings.HasSuffix(chunk, ".fsl") {
continue
}
chunk = strings.Replace(chunk, "/", "", -1)
chunkCache[chunk] = true
} else {
// In quick mode, attempt to load the incomplete snapshot from last incomplete backup if there is one.
if quickMode {
incompleteSnapshot = LoadIncompleteSnapshot()
}
// If the listing operation is fast or there is an incomplete snapshot, list all chunks and
// put them in the cache.
if manager.storage.IsFastListing() || incompleteSnapshot != nil {
LOG_INFO("BACKUP_LIST", "Listing all chunks")
allChunks, _ := manager.SnapshotManager.ListAllFiles(manager.storage, "chunks/")
for _, chunk := range allChunks {
if len(chunk) == 0 || chunk[len(chunk) - 1] == '/' {
continue
}
if strings.HasSuffix(chunk, ".fsl") {
continue
}
chunk = strings.Replace(chunk, "/", "", -1)
chunkCache[chunk] = true
}
}
if incompleteSnapshot != nil {
// This is the last chunk from the incomplete snapshot that can be found in the cache
lastCompleteChunk := -1
for i, chunkHash := range incompleteSnapshot.ChunkHashes {
chunkID := manager.config.GetChunkIDFromHash(chunkHash)
if _, ok := chunkCache[chunkID]; ok {
lastCompleteChunk = i
} else {
break
}
}
// Only keep those files whose chunks exist in the cache
var files []*Entry
for _, file := range incompleteSnapshot.Files {
if file.StartChunk <= lastCompleteChunk && file.EndChunk <= lastCompleteChunk {
files = append(files, file)
} else {
break
}
}
incompleteSnapshot.Files = files
// Remove incomplete chunks (they may not have been uploaded)
incompleteSnapshot.ChunkHashes = incompleteSnapshot.ChunkHashes[:lastCompleteChunk + 1]
incompleteSnapshot.ChunkLengths = incompleteSnapshot.ChunkLengths[:lastCompleteChunk + 1]
remoteSnapshot = incompleteSnapshot
}
}
var numberOfNewFileChunks int // number of new file chunks
@@ -211,10 +268,11 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
var modifiedEntries [] *Entry // Files that has been modified or newly created
var preservedEntries [] *Entry // Files unchanges
// If the quick mode is enabled, we simply treat all files as if they were new, and break them into chunks.
// If the quick mode is disable and there isn't an incomplete snapshot from last (failed) backup,
// we simply treat all files as if they were new, and break them into chunks.
// Otherwise, we need to find those that are new or recently modified
if !quickMode {
if remoteSnapshot.Revision == 0 && incompleteSnapshot == nil {
modifiedEntries = localSnapshot.Files
for _, entry := range modifiedEntries {
totalModifiedFileSize += entry.Size
@@ -268,7 +326,7 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
var preservedChunkHashes []string
var preservedChunkLengths []int
// For each preserved file, adjust the indices StartChunk and EndChunk. This is done by finding gaps
// For each preserved file, adjust the StartChunk and EndChunk pointers. This is done by finding gaps
// between these indices and subtracting the number of deleted chunks.
last := -1
deletedChunks := 0
@@ -295,6 +353,7 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
var uploadedEntries [] *Entry
var uploadedChunkHashes []string
var uploadedChunkLengths []int
var uploadedChunkLock = &sync.Mutex{}
// the file reader implements the Reader interface. When an EOF is encounter, it opens the next file unless it
// is the last file.
@@ -318,6 +377,37 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
chunkMaker := CreateChunkMaker(manager.config, false)
chunkUploader := CreateChunkUploader(manager.config, manager.storage, nil, threads, nil)
localSnapshotReady := false
var once sync.Once
if remoteSnapshot.Revision == 0 {
// In case an error occurs during the initial backup, save the incomplete snapshot
RunAtError = func() {
once.Do(
func() {
if !localSnapshotReady {
// Lock it to gain exclusive access to uploadedChunkHashes and uploadedChunkLengths
uploadedChunkLock.Lock()
for _, entry := range uploadedEntries {
entry.EndChunk = -1
}
setEntryContent(uploadedEntries, uploadedChunkLengths, len(preservedChunkHashes))
if len(preservedChunkHashes) > 0 {
localSnapshot.ChunkHashes = preservedChunkHashes
localSnapshot.ChunkHashes = append(localSnapshot.ChunkHashes, uploadedChunkHashes...)
localSnapshot.ChunkLengths = preservedChunkLengths
localSnapshot.ChunkLengths = append(localSnapshot.ChunkLengths, uploadedChunkLengths...)
} else {
localSnapshot.ChunkHashes = uploadedChunkHashes
localSnapshot.ChunkLengths = uploadedChunkLengths
}
uploadedChunkLock.Unlock()
}
SaveIncompleteSnapshot(localSnapshot)
})
}
}
if fileReader.CurrentFile != nil {
LOG_TRACE("PACK_START", "Packing %s", fileReader.CurrentEntry.Path)
@@ -398,8 +488,11 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
chunkUploader.StartChunk(chunk, chunkIndex)
}
// Must lock it because the RunAtError function called by other threads may access these two slices
uploadedChunkLock.Lock()
uploadedChunkHashes = append(uploadedChunkHashes, hash)
uploadedChunkLengths = append(uploadedChunkLengths, chunkSize)
uploadedChunkLock.Unlock()
},
func (fileSize int64, hash string) (io.Reader, bool) {
@@ -445,6 +538,8 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
localSnapshot.ChunkLengths = uploadedChunkLengths
}
localSnapshotReady = true
localSnapshot.EndTime = time.Now().Unix()
err = manager.SnapshotManager.CheckSnapshot(localSnapshot)
@@ -455,10 +550,15 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
localSnapshot.Tag = tag
localSnapshot.Options = ""
if !quickMode {
if !quickMode || remoteSnapshot.Revision == 0 {
localSnapshot.Options = "-hash"
}
if _, found := os.LookupEnv("DUPLICACY_FAIL_SNAPSHOT"); found {
LOG_ERROR("SNAPSHOT_FAIL", "Artificially fail the backup for testing purposes")
return false
}
if shadowCopy {
if localSnapshot.Options == "" {
localSnapshot.Options = "-vss"
@@ -505,6 +605,8 @@ func (manager *BackupManager) Backup(top string, quickMode bool, threads int, ta
manager.SnapshotManager.CleanSnapshotCache(localSnapshot, nil)
LOG_INFO("BACKUP_END", "Backup for %s at revision %d completed", top, localSnapshot.Revision)
RunAtError = func() {}
RemoveIncompleteSnapshot()
totalSnapshotChunks := len(localSnapshot.FileSequence) + len(localSnapshot.ChunkSequence) +
len(localSnapshot.LengthSequence)
@@ -981,7 +1083,7 @@ func (manager *BackupManager) RestoreFile(chunkDownloader *ChunkDownloader, chun
var existingFile, newFile *os.File
var err error
preferencePath := GetDuplicacyPreferencePath(top)
preferencePath := GetDuplicacyPreferencePath()
temporaryPath := path.Join(preferencePath, "temporary")
fullPath := joinPath(top, entry.Path)