Follow-up changes for the -persist PR

* Restore/check should report an error instead of a success at the end if there
  were any errors and -persist is specified
* Don't compute the file hash before passing the file to the chunk maker; this is
  redundant as the chunk maker will produce the file hash
* Add a LOG_WERROR function to switch between LOG_WARN and LOG_ERROR dynamically
This commit is contained in:
Gilbert Chen
2020-09-18 11:23:35 -04:00
parent eecbb8fa99
commit 16d2c14c5a
6 changed files with 139 additions and 147 deletions

View File

@@ -52,6 +52,8 @@ type ChunkDownloader struct {
numberOfDownloadedChunks int // The number of chunks that have been downloaded
numberOfDownloadingChunks int // The number of chunks still being downloaded
numberOfActiveChunks int // The number of chunks that is being downloaded or has been downloaded but not reclaimed
NumberOfFailedChunks int // The number of chunks that can't be downloaded
}
func CreateChunkDownloader(config *Config, storage Storage, snapshotCache *FileStorage, showStatistics bool, threads int, allowFailures bool) *ChunkDownloader {
@@ -279,6 +281,9 @@ func (downloader *ChunkDownloader) WaitForCompletion() {
downloader.numberOfActiveChunks--
downloader.numberOfDownloadedChunks++
downloader.numberOfDownloadingChunks--
if completion.chunk.isBroken {
downloader.NumberOfFailedChunks++
}
}
// Pass the tasks one by one to the download queue
@@ -305,7 +310,10 @@ func (downloader *ChunkDownloader) Stop() {
downloader.taskList[completion.chunkIndex].chunk = completion.chunk
downloader.numberOfDownloadedChunks++
downloader.numberOfDownloadingChunks--
}
if completion.chunk.isBroken {
downloader.NumberOfFailedChunks++
}
}
for i := range downloader.completedTasks {
downloader.config.PutChunk(downloader.taskList[i].chunk)
@@ -359,17 +367,11 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
// will be set up before the encryption
chunk.Reset(false)
// This lambda function allows different handling of failures depending on allowFailures state
onFailure := func(failureFn LogFunc, logID string, format string, v ...interface{}) {
// If failures are allowed, complete the task properly
completeFailedChunk := func(chunk *Chunk) {
if downloader.allowFailures {
// Allowing failures: Convert message to warning, mark chunk isBroken = true and complete goroutine
LOG_WARN(logID, format, v...)
chunk.isBroken = true
downloader.completionChannel <- ChunkDownloadCompletion{chunk: chunk, chunkIndex: task.chunkIndex}
} else {
// Process failure as normal
failureFn(logID, format, v...)
}
}
@@ -379,7 +381,8 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
// Find the chunk by ID first.
chunkPath, exist, _, err := downloader.storage.FindChunk(threadIndex, chunkID, false)
if err != nil {
onFailure(LOG_ERROR, "DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err)
completeFailedChunk(chunk)
LOG_WERROR(downloader.allowFailures, "DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err)
return false
}
@@ -387,7 +390,8 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
// No chunk is found. Have to find it in the fossil pool again.
fossilPath, exist, _, err := downloader.storage.FindChunk(threadIndex, chunkID, true)
if err != nil {
onFailure(LOG_ERROR, "DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err)
completeFailedChunk(chunk)
LOG_WERROR(downloader.allowFailures, "DOWNLOAD_CHUNK", "Failed to find the chunk %s: %v", chunkID, err)
return false
}
@@ -409,11 +413,12 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
continue
}
completeFailedChunk(chunk)
// A chunk is not found. This is a serious error and hopefully it will never happen.
if err != nil {
onFailure(LOG_FATAL, "DOWNLOAD_CHUNK", "Chunk %s can't be found: %v", chunkID, err)
LOG_WERROR(downloader.allowFailures, "DOWNLOAD_CHUNK", "Chunk %s can't be found: %v", chunkID, err)
} else {
onFailure(LOG_FATAL, "DOWNLOAD_CHUNK", "Chunk %s can't be found", chunkID)
LOG_WERROR(downloader.allowFailures, "DOWNLOAD_CHUNK", "Chunk %s can't be found", chunkID)
}
return false
}
@@ -422,7 +427,8 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
// downloading again.
err = downloader.storage.MoveFile(threadIndex, fossilPath, chunkPath)
if err != nil {
onFailure(LOG_FATAL, "DOWNLOAD_CHUNK", "Failed to resurrect chunk %s: %v", chunkID, err)
completeFailedChunk(chunk)
LOG_WERROR(downloader.allowFailures, "DOWNLOAD_CHUNK", "Failed to resurrect chunk %s: %v", chunkID, err)
return false
}
@@ -439,7 +445,8 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
chunk.Reset(false)
continue
} else {
onFailure(LOG_ERROR, "DOWNLOAD_CHUNK", "Failed to download the chunk %s: %v", chunkID, err)
completeFailedChunk(chunk)
LOG_WERROR(downloader.allowFailures, "DOWNLOAD_CHUNK", "Failed to download the chunk %s: %v", chunkID, err)
return false
}
}
@@ -451,7 +458,8 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
chunk.Reset(false)
continue
} else {
onFailure(LOG_ERROR, "DOWNLOAD_DECRYPT", "Failed to decrypt the chunk %s: %v", chunkID, err)
completeFailedChunk(chunk)
LOG_WERROR(downloader.allowFailures, "DOWNLOAD_DECRYPT", "Failed to decrypt the chunk %s: %v", chunkID, err)
return false
}
}
@@ -463,7 +471,8 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
chunk.Reset(false)
continue
} else {
onFailure(LOG_FATAL, "DOWNLOAD_CORRUPTED", "The chunk %s has a hash id of %s", chunkID, actualChunkID)
completeFailedChunk(chunk)
LOG_WERROR(downloader.allowFailures, "DOWNLOAD_CORRUPTED", "The chunk %s has a hash id of %s", chunkID, actualChunkID)
return false
}
}