add -persist in check and restore mode (for PR)

This commit is contained in:
Tet Woo Lee
2020-05-06 18:39:52 +12:00
parent 51cbf73caa
commit 4ae16dec7f
9 changed files with 535 additions and 69 deletions

View File

@@ -36,6 +36,7 @@ type ChunkDownloader struct {
snapshotCache *FileStorage // Used as cache if not nil; usually for downloading snapshot chunks
showStatistics bool // Show a stats log for each chunk if true
threads int // Number of threads
allowFailures bool // Whether to failfast on download error, or continue
taskList []ChunkDownloadTask // The list of chunks to be downloaded
completedTasks map[int]bool // Store downloaded chunks
@@ -53,13 +54,14 @@ type ChunkDownloader struct {
numberOfActiveChunks int // The number of chunks that is being downloaded or has been downloaded but not reclaimed
}
func CreateChunkDownloader(config *Config, storage Storage, snapshotCache *FileStorage, showStatistics bool, threads int) *ChunkDownloader {
func CreateChunkDownloader(config *Config, storage Storage, snapshotCache *FileStorage, showStatistics bool, threads int, allowFailures bool) *ChunkDownloader {
downloader := &ChunkDownloader{
config: config,
storage: storage,
snapshotCache: snapshotCache,
showStatistics: showStatistics,
threads: threads,
allowFailures: allowFailures,
taskList: nil,
completedTasks: make(map[int]bool),
@@ -357,13 +359,27 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
// will be set up before the encryption
chunk.Reset(false)
// This lambda function allows different handling of failures depending on allowFailures state
onFailure := func(failureFn LogFunc, logID string, format string, v ...interface{}) {
if downloader.allowFailures {
// Allowing failures: Convert message to warning, mark chunk isBroken = true and complete goroutine
LOG_WARN(logID, format, v...)
chunk.isBroken = true
downloader.completionChannel <- ChunkDownloadCompletion{chunk: chunk, chunkIndex: task.chunkIndex}
} else {
// Process failure as normal
failureFn(logID, format, v...)
}
}
const MaxDownloadAttempts = 3
for downloadAttempt := 0; ; downloadAttempt++ {
// Find the chunk by ID first.
chunkPath, exist, _, err := downloader.storage.FindChunk(threadIndex, chunkID, false)
if err != nil {
LOG_ERROR("DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err)
onFailure(LOG_ERROR, "DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err)
return false
}
@@ -371,7 +387,7 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
// No chunk is found. Have to find it in the fossil pool again.
fossilPath, exist, _, err := downloader.storage.FindChunk(threadIndex, chunkID, true)
if err != nil {
LOG_ERROR("DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err)
onFailure(LOG_ERROR, "DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err)
return false
}
@@ -395,9 +411,9 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
// A chunk is not found. This is a serious error and hopefully it will never happen.
if err != nil {
LOG_FATAL("DOWNLOAD_CHUNK", "Chunk %s can't be found: %v", chunkID, err)
onFailure(LOG_FATAL, "DOWNLOAD_CHUNK", "Chunk %s can't be found: %v", chunkID, err)
} else {
LOG_FATAL("DOWNLOAD_CHUNK", "Chunk %s can't be found", chunkID)
onFailure(LOG_FATAL, "DOWNLOAD_CHUNK", "Chunk %s can't be found", chunkID)
}
return false
}
@@ -406,7 +422,7 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
// downloading again.
err = downloader.storage.MoveFile(threadIndex, fossilPath, chunkPath)
if err != nil {
LOG_FATAL("DOWNLOAD_CHUNK", "Failed to resurrect chunk %s: %v", chunkID, err)
onFailure(LOG_FATAL, "DOWNLOAD_CHUNK", "Failed to resurrect chunk %s: %v", chunkID, err)
return false
}
@@ -423,7 +439,7 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
chunk.Reset(false)
continue
} else {
LOG_ERROR("DOWNLOAD_CHUNK", "Failed to download the chunk %s: %v", chunkID, err)
onFailure(LOG_ERROR, "DOWNLOAD_CHUNK", "Failed to download the chunk %s: %v", chunkID, err)
return false
}
}
@@ -435,7 +451,7 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
chunk.Reset(false)
continue
} else {
LOG_ERROR("DOWNLOAD_DECRYPT", "Failed to decrypt the chunk %s: %v", chunkID, err)
onFailure(LOG_ERROR, "DOWNLOAD_DECRYPT", "Failed to decrypt the chunk %s: %v", chunkID, err)
return false
}
}
@@ -447,7 +463,7 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
chunk.Reset(false)
continue
} else {
LOG_FATAL("DOWNLOAD_CORRUPTED", "The chunk %s has a hash id of %s", chunkID, actualChunkID)
onFailure(LOG_FATAL, "DOWNLOAD_CORRUPTED", "The chunk %s has a hash id of %s", chunkID, actualChunkID)
return false
}
}