Implement new chunk directory structure

This commit is contained in:
Gilbert Chen
2017-11-07 12:05:39 -05:00
parent 7e1fb6130a
commit 86767b3df6
22 changed files with 663 additions and 606 deletions

View File

@@ -12,6 +12,7 @@ import (
"os"
"path"
"runtime"
"strings"
"time"
"github.com/pkg/sftp"
@@ -19,10 +20,10 @@ import (
)
type SFTPStorage struct {
RateLimitedStorage
StorageBase
client *sftp.Client
minimumNesting int // The minimum level of directories to dive into before searching for the chunk file.
minimumNesting int // The minimum level of directories to dive into before searching for the chunk file.
storageDir string
numberOfThreads int
}
@@ -45,18 +46,18 @@ func CreateSFTPStorage(server string, port int, username string, storageDir stri
hostKeyCallback func(hostname string, remote net.Addr,
key ssh.PublicKey) error, threads int) (storage *SFTPStorage, err error) {
config := &ssh.ClientConfig{
sftpConfig := &ssh.ClientConfig{
User: username,
Auth: authMethods,
HostKeyCallback: hostKeyCallback,
}
if server == "sftp.hidrive.strato.com" {
config.Ciphers = []string{"aes128-cbc", "aes128-ctr", "aes256-ctr"}
sftpConfig.Ciphers = []string{"aes128-cbc", "aes128-ctr", "aes256-ctr"}
}
serverAddress := fmt.Sprintf("%s:%d", server, port)
connection, err := ssh.Dial("tcp", serverAddress, config)
connection, err := ssh.Dial("tcp", serverAddress, sftpConfig)
if err != nil {
return nil, err
}
@@ -92,6 +93,8 @@ func CreateSFTPStorage(server string, port int, username string, storageDir stri
runtime.SetFinalizer(storage, CloseSFTPStorage)
storage.DerivedStorage = storage
storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, nil
}
@@ -176,64 +179,6 @@ func (storage *SFTPStorage) GetFileInfo(threadIndex int, filePath string) (exist
return true, fileInfo.IsDir(), fileInfo.Size(), nil
}
// FindChunk finds the chunk with the specified id. If 'isFossil' is true, it will search for chunk files with
// the suffix '.fsl'.
func (storage *SFTPStorage) FindChunk(threadIndex int, chunkID string, isFossil bool) (filePath string, exist bool, size int64, err error) {
dir := path.Join(storage.storageDir, "chunks")
suffix := ""
if isFossil {
suffix = ".fsl"
}
for level := 0; level*2 < len(chunkID); level++ {
if level >= storage.minimumNesting {
filePath = path.Join(dir, chunkID[2*level:]) + suffix
if stat, err := storage.client.Stat(filePath); err == nil && !stat.IsDir() {
return filePath[len(storage.storageDir)+1:], true, stat.Size(), nil
} else if err == nil && stat.IsDir() {
return filePath[len(storage.storageDir)+1:], true, 0, fmt.Errorf("The path %s is a directory", filePath)
}
}
// Find the subdirectory the chunk file may reside.
subDir := path.Join(dir, chunkID[2*level:2*level+2])
stat, err := storage.client.Stat(subDir)
if err == nil && stat.IsDir() {
dir = subDir
continue
}
if level < storage.minimumNesting {
// Create the subdirectory if is doesn't exist.
if err == nil && !stat.IsDir() {
return "", false, 0, fmt.Errorf("The path %s is not a directory", subDir)
}
err = storage.client.Mkdir(subDir)
if err != nil {
// The directory may have been created by other threads so check it again.
stat, _ := storage.client.Stat(subDir)
if stat == nil || !stat.IsDir() {
return "", false, 0, fmt.Errorf("Failed to create the directory %s: %v", subDir, err)
}
}
dir = subDir
continue
}
// The chunk must be under this subdirectory but it doesn't exist.
return path.Join(dir, chunkID[2*level:])[len(storage.storageDir)+1:] + suffix, false, 0, nil
}
LOG_FATAL("CHUNK_FIND", "Chunk %s is still not found after having searched a maximum level of directories",
chunkID)
return "", false, 0, nil
}
// DownloadFile reads the file at 'filePath' into the chunk.
func (storage *SFTPStorage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) {
file, err := storage.client.Open(path.Join(storage.storageDir, filePath))
@@ -255,6 +200,30 @@ func (storage *SFTPStorage) UploadFile(threadIndex int, filePath string, content
fullPath := path.Join(storage.storageDir, filePath)
dirs := strings.Split(filePath, "/")
if len(dirs) > 1 {
fullDir := path.Dir(fullPath)
_, err := storage.client.Stat(fullDir)
if err != nil {
// The error may be caused by a non-existent fullDir, or a broken connection. In either case,
// we just assume it is the former because there isn't a way to tell which is the case.
for i, _ := range dirs[1 : len(dirs)-1] {
subDir := path.Join(storage.storageDir, path.Join(dirs[0:i+2]...))
// We don't check the error; just keep going blindly but always store the last err
err = storage.client.Mkdir(subDir)
}
// If there is an error creating the dirs, we check fullDir one more time, because another thread
// may happen to create the same fullDir ahead of this thread
if err != nil {
_, err := storage.client.Stat(fullDir)
if err != nil {
return err
}
}
}
}
letters := "abcdefghijklmnopqrstuvwxyz"
suffix := make([]byte, 8)
for i := range suffix {
@@ -301,7 +270,14 @@ func (storage *SFTPStorage) IsMoveFileImplemented() bool { return true }
func (storage *SFTPStorage) IsStrongConsistent() bool { return true }
// If the storage supports fast listing of files names.
func (storage *SFTPStorage) IsFastListing() bool { return false }
func (storage *SFTPStorage) IsFastListing() bool {
for _, level := range storage.readLevels {
if level > 1 {
return false
}
}
return true
}
// Enable the test mode.
func (storage *SFTPStorage) EnableTestMode() {}