1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-07-01 06:40:45 +02:00

better support FUSE Lookup()

This commit is contained in:
Chris Lu 2018-05-05 02:01:50 -07:00
parent 050ab19264
commit fffb14bc87
12 changed files with 117 additions and 76 deletions

View file

@ -59,27 +59,27 @@ func (WFS) Root() (fs.Node, error) {
var fileIdMap = make(map[uint64]filer.FileId) var fileIdMap = make(map[uint64]filer.FileId)
type Dir struct { type Dir struct {
Id uint64
Path string Path string
DirentMap map[string]*fuse.Dirent DirentMap map[string]*fuse.Dirent
} }
func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error { func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error {
attr.Inode = dir.Id
attr.Mode = os.ModeDir | 0555 attr.Mode = os.ModeDir | 0555
return nil return nil
} }
func (dir *Dir) Lookup(ctx context.Context, name string) (fs.Node, error) { func (dir *Dir) Lookup(ctx context.Context, name string) (fs.Node, error) {
if dirent, ok := dir.DirentMap[name]; ok { if entry, err := filer.LookupDirectoryEntry(*mountOptions.filer, dir.Path, name); err == nil {
if dirent.Type == fuse.DT_File { if !entry.Found {
return &File{Id: dirent.Inode, FileId: fileIdMap[dirent.Inode], Name: dirent.Name}, nil return nil, fuse.ENOENT
}
if entry.FileId != "" {
return &File{FileId: filer.FileId(entry.FileId), Name: name}, nil
} else {
return &Dir{Path: path.Join(dir.Path, name)}, nil
} }
return &Dir{
Id: dirent.Inode,
Path: path.Join(dir.Path, dirent.Name),
}, nil
} }
return nil, fuse.ENOENT return nil, fuse.ENOENT
} }
@ -90,17 +90,16 @@ func (dir *Dir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
} }
if dirs, e := filer.ListDirectories(*mountOptions.filer, dir.Path); e == nil { if dirs, e := filer.ListDirectories(*mountOptions.filer, dir.Path); e == nil {
for _, d := range dirs.Directories { for _, d := range dirs.Directories {
dirId := uint64(d.Id) dirent := fuse.Dirent{Name: string(d), Type: fuse.DT_Dir}
dirent := fuse.Dirent{Inode: dirId, Name: d.Name, Type: fuse.DT_Dir}
ret = append(ret, dirent) ret = append(ret, dirent)
dir.DirentMap[d.Name] = &dirent dir.DirentMap[string(d)] = &dirent
} }
} }
if files, e := filer.ListFiles(*mountOptions.filer, dir.Path, ""); e == nil { if files, e := filer.ListFiles(*mountOptions.filer, dir.Path, ""); e == nil {
for _, f := range files.Files { for _, f := range files.Files {
if fileId, e := storage.ParseFileId(string(f.Id)); e == nil { if fileId, e := storage.ParseFileId(string(f.Id)); e == nil {
fileInode := uint64(fileId.VolumeId)<<48 + fileId.Key fileInode := uint64(fileId.VolumeId)<<48 + fileId.Key
dirent := fuse.Dirent{Inode: fileInode, Name: f.Name, Type: fuse.DT_File} dirent := fuse.Dirent{Name: f.Name, Type: fuse.DT_File}
ret = append(ret, dirent) ret = append(ret, dirent)
dir.DirentMap[f.Name] = &dirent dir.DirentMap[f.Name] = &dirent
fileIdMap[fileInode] = f.Id fileIdMap[fileInode] = f.Id
@ -120,13 +119,11 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
} }
type File struct { type File struct {
Id uint64
FileId filer.FileId FileId filer.FileId
Name string Name string
} }
func (file *File) Attr(context context.Context, attr *fuse.Attr) error { func (file *File) Attr(context context.Context, attr *fuse.Attr) error {
attr.Inode = file.Id
attr.Mode = 0444 attr.Mode = 0444
ret, err := filer.GetFileSize(*mountOptions.filer, string(file.FileId)) ret, err := filer.GetFileSize(*mountOptions.filer, string(file.FileId))
if err == nil { if err == nil {

View file

@ -65,7 +65,7 @@ func GetFileContent(server string, fileId string) (ret *GetFileContentResult, er
} }
type ListDirectoriesResult struct { type ListDirectoriesResult struct {
Directories []DirectoryEntry Directories []DirectoryName
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
@ -80,6 +80,23 @@ func ListDirectories(server string, directory string) (ret *ListDirectoriesResul
return nil, err return nil, err
} }
type LookupDirectoryEntryResult struct {
Found bool
FileId string
Error string `json:"error,omitempty"`
}
func LookupDirectoryEntry(server string, directory string, name string) (ret *LookupDirectoryEntryResult, err error) {
ret = new(LookupDirectoryEntryResult)
if err := call(server, ApiRequest{Command: "lookupDirectoryEntry", Directory: directory, FileName: name}, ret); err == nil {
if ret.Error != "" {
return nil, errors.New(ret.Error)
}
return ret, nil
}
return nil, err
}
func DeleteDirectoryOrFile(server string, path string, isDir bool) error { func DeleteDirectoryOrFile(server string, path string, isDir bool) error {
destUrl := fmt.Sprintf("http://%s%s", server, path) destUrl := fmt.Sprintf("http://%s%s", server, path)
if isDir { if isDir {

View file

@ -5,11 +5,11 @@ import (
) )
type DirectoryManager interface { type DirectoryManager interface {
FindDirectory(dirPath string) (filer.DirectoryId, error) FindDirectory(dirPath string) (DirectoryId, error)
ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) ListDirectories(dirPath string) (dirs []filer.DirectoryName, err error)
MakeDirectory(currentDirPath string, dirName string) (filer.DirectoryId, error) MakeDirectory(currentDirPath string, dirName string) (DirectoryId, error)
MoveUnderDirectory(oldDirPath string, newParentDirPath string) error MoveUnderDirectory(oldDirPath string, newParentDirPath string) error
DeleteDirectory(dirPath string) error DeleteDirectory(dirPath string) error
//functions used by FUSE //functions used by FUSE
FindDirectoryById(filer.DirectoryId, error) FindDirectoryById(DirectoryId, error)
} }

View file

@ -16,12 +16,14 @@ import (
var writeLock sync.Mutex //serialize changes to dir.log var writeLock sync.Mutex //serialize changes to dir.log
type DirectoryId int32
type DirectoryEntryInMap struct { type DirectoryEntryInMap struct {
sync.Mutex sync.Mutex
Name string Name string
Parent *DirectoryEntryInMap Parent *DirectoryEntryInMap
subDirectories map[string]*DirectoryEntryInMap subDirectories map[string]*DirectoryEntryInMap
Id filer.DirectoryId Id DirectoryId
} }
func (de *DirectoryEntryInMap) getChild(dirName string) (*DirectoryEntryInMap, bool) { func (de *DirectoryEntryInMap) getChild(dirName string) (*DirectoryEntryInMap, bool) {
@ -45,18 +47,18 @@ func (de *DirectoryEntryInMap) hasChildren() bool {
defer de.Unlock() defer de.Unlock()
return len(de.subDirectories) > 0 return len(de.subDirectories) > 0
} }
func (de *DirectoryEntryInMap) children() (dirNames []filer.DirectoryEntry) { func (de *DirectoryEntryInMap) children() (dirNames []filer.DirectoryName) {
de.Lock() de.Lock()
defer de.Unlock() defer de.Unlock()
for k, v := range de.subDirectories { for k, _ := range de.subDirectories {
dirNames = append(dirNames, filer.DirectoryEntry{Name: k, Id: v.Id}) dirNames = append(dirNames, filer.DirectoryName(k))
} }
return dirNames return dirNames
} }
type DirectoryManagerInMap struct { type DirectoryManagerInMap struct {
Root *DirectoryEntryInMap Root *DirectoryEntryInMap
max filer.DirectoryId max DirectoryId
logFile *os.File logFile *os.File
isLoading bool isLoading bool
} }
@ -113,7 +115,7 @@ func (dm *DirectoryManagerInMap) processEachLine(line string) error {
if pe != nil { if pe != nil {
return pe return pe
} }
if e := dm.loadDirectory(parts[1], filer.DirectoryId(v)); e != nil { if e := dm.loadDirectory(parts[1], DirectoryId(v)); e != nil {
return e return e
} }
case "mov": case "mov":
@ -172,7 +174,7 @@ func (dm *DirectoryManagerInMap) findDirectory(dirPath string) (*DirectoryEntryI
} }
return dir, nil return dir, nil
} }
func (dm *DirectoryManagerInMap) FindDirectory(dirPath string) (filer.DirectoryId, error) { func (dm *DirectoryManagerInMap) findDirectoryId(dirPath string) (DirectoryId, error) {
d, e := dm.findDirectory(dirPath) d, e := dm.findDirectory(dirPath)
if e == nil { if e == nil {
return d.Id, nil return d.Id, nil
@ -180,7 +182,7 @@ func (dm *DirectoryManagerInMap) FindDirectory(dirPath string) (filer.DirectoryI
return dm.Root.Id, e return dm.Root.Id, e
} }
func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId filer.DirectoryId) error { func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId DirectoryId) error {
dirPath = CleanFilePath(dirPath) dirPath = CleanFilePath(dirPath)
if dirPath == "/" { if dirPath == "/" {
return nil return nil
@ -248,7 +250,7 @@ func (dm *DirectoryManagerInMap) makeDirectory(dirPath string) (dir *DirectoryEn
return dir, created return dir, created
} }
func (dm *DirectoryManagerInMap) MakeDirectory(dirPath string) (filer.DirectoryId, error) { func (dm *DirectoryManagerInMap) MakeDirectory(dirPath string) (DirectoryId, error) {
dir, _ := dm.makeDirectory(dirPath) dir, _ := dm.makeDirectory(dirPath)
return dir.Id, nil return dir.Id, nil
} }
@ -275,7 +277,7 @@ func (dm *DirectoryManagerInMap) MoveUnderDirectory(oldDirPath string, newParent
return nil return nil
} }
func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []filer.DirectoryEntry, err error) { func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []filer.DirectoryName, err error) {
d, e := dm.findDirectory(dirPath) d, e := dm.findDirectory(dirPath)
if e != nil { if e != nil {
return dirNames, e return dirNames, e

View file

@ -19,17 +19,17 @@ func TestDirectory(t *testing.T) {
dm.MakeDirectory("/a/b/e/f") dm.MakeDirectory("/a/b/e/f")
dm.MakeDirectory("/a/b/e/f/g") dm.MakeDirectory("/a/b/e/f/g")
dm.MoveUnderDirectory("/a/b/e/f/g", "/a/b", "t") dm.MoveUnderDirectory("/a/b/e/f/g", "/a/b", "t")
if _, err := dm.FindDirectory("/a/b/e/f/g"); err == nil { if _, err := dm.findDirectoryId("/a/b/e/f/g"); err == nil {
t.Fatal("/a/b/e/f/g should not exist any more after moving") t.Fatal("/a/b/e/f/g should not exist any more after moving")
} }
if _, err := dm.FindDirectory("/a/b/t"); err != nil { if _, err := dm.findDirectoryId("/a/b/t"); err != nil {
t.Fatal("/a/b/t should exist after moving") t.Fatal("/a/b/t should exist after moving")
} }
if _, err := dm.FindDirectory("/a/b/g"); err == nil { if _, err := dm.findDirectoryId("/a/b/g"); err == nil {
t.Fatal("/a/b/g should not exist after moving") t.Fatal("/a/b/g should not exist after moving")
} }
dm.MoveUnderDirectory("/a/b/e/f", "/a/b", "") dm.MoveUnderDirectory("/a/b/e/f", "/a/b", "")
if _, err := dm.FindDirectory("/a/b/f"); err != nil { if _, err := dm.findDirectoryId("/a/b/f"); err != nil {
t.Fatal("/a/b/g should not exist after moving") t.Fatal("/a/b/g should not exist after moving")
} }
dm.MakeDirectory("/a/b/g/h/i") dm.MakeDirectory("/a/b/g/h/i")

View file

@ -45,27 +45,37 @@ func (filer *FilerEmbedded) CreateFile(filePath string, fid string) (err error)
} }
func (filer *FilerEmbedded) FindFile(filePath string) (fid string, err error) { func (filer *FilerEmbedded) FindFile(filePath string) (fid string, err error) {
dir, file := filepath.Split(filePath) dir, file := filepath.Split(filePath)
dirId, e := filer.directories.FindDirectory(dir) return filer.findFileEntry(dir, file)
}
func (filer *FilerEmbedded) findFileEntry(parentPath string, fileName string) (fid string, err error) {
dirId, e := filer.directories.findDirectoryId(parentPath)
if e != nil { if e != nil {
return "", e return "", e
} }
return filer.files.FindFile(dirId, file) return filer.files.FindFile(dirId, fileName)
} }
func (filer *FilerEmbedded) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
return filer.directories.FindDirectory(dirPath) func (filer *FilerEmbedded) LookupDirectoryEntry(dirPath string, name string) (found bool, fileId string, err error) {
if _, err = filer.directories.findDirectory(filepath.Join(dirPath, name)); err == nil {
return true, "", nil
}
if fileId, err = filer.findFileEntry(dirPath, name); err == nil {
return true, fileId, nil
}
return false, "", err
} }
func (filer *FilerEmbedded) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { func (filer *FilerEmbedded) ListDirectories(dirPath string) (dirs []filer.DirectoryName, err error) {
return filer.directories.ListDirectories(dirPath) return filer.directories.ListDirectories(dirPath)
} }
func (filer *FilerEmbedded) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { func (filer *FilerEmbedded) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
dirId, e := filer.directories.FindDirectory(dirPath) dirId, e := filer.directories.findDirectoryId(dirPath)
if e != nil { if e != nil {
return nil, e return nil, e
} }
return filer.files.ListFiles(dirId, lastFileName, limit), nil return filer.files.ListFiles(dirId, lastFileName, limit), nil
} }
func (filer *FilerEmbedded) DeleteDirectory(dirPath string, recursive bool) (err error) { func (filer *FilerEmbedded) DeleteDirectory(dirPath string, recursive bool) (err error) {
dirId, e := filer.directories.FindDirectory(dirPath) dirId, e := filer.directories.findDirectoryId(dirPath)
if e != nil { if e != nil {
return e return e
} }
@ -74,7 +84,7 @@ func (filer *FilerEmbedded) DeleteDirectory(dirPath string, recursive bool) (err
return fmt.Errorf("Fail to delete directory %s: %d sub directories found!", dirPath, len(sub_dirs)) return fmt.Errorf("Fail to delete directory %s: %d sub directories found!", dirPath, len(sub_dirs))
} }
for _, sub := range sub_dirs { for _, sub := range sub_dirs {
if delete_sub_err := filer.DeleteDirectory(filepath.Join(dirPath, sub.Name), recursive); delete_sub_err != nil { if delete_sub_err := filer.DeleteDirectory(filepath.Join(dirPath, string(sub)), recursive); delete_sub_err != nil {
return delete_sub_err return delete_sub_err
} }
} }
@ -108,7 +118,7 @@ func (filer *FilerEmbedded) DeleteDirectory(dirPath string, recursive bool) (err
func (filer *FilerEmbedded) DeleteFile(filePath string) (fid string, err error) { func (filer *FilerEmbedded) DeleteFile(filePath string) (fid string, err error) {
dir, file := filepath.Split(filePath) dir, file := filepath.Split(filePath)
dirId, e := filer.directories.FindDirectory(dir) dirId, e := filer.directories.findDirectoryId(dir)
if e != nil { if e != nil {
return "", e return "", e
} }
@ -126,8 +136,8 @@ func (filer *FilerEmbedded) Move(fromPath string, toPath string) error {
filer.mvMutex.Lock() filer.mvMutex.Lock()
defer filer.mvMutex.Unlock() defer filer.mvMutex.Unlock()
if _, dir_err := filer.FindDirectory(fromPath); dir_err == nil { if _, dir_err := filer.directories.findDirectoryId(fromPath); dir_err == nil {
if _, err := filer.FindDirectory(toPath); err == nil { if _, err := filer.directories.findDirectoryId(toPath); err == nil {
// move folder under an existing folder // move folder under an existing folder
return filer.directories.MoveUnderDirectory(fromPath, toPath, "") return filer.directories.MoveUnderDirectory(fromPath, toPath, "")
} }
@ -135,7 +145,7 @@ func (filer *FilerEmbedded) Move(fromPath string, toPath string) error {
return filer.directories.MoveUnderDirectory(fromPath, filepath.Dir(toPath), filepath.Base(toPath)) return filer.directories.MoveUnderDirectory(fromPath, filepath.Dir(toPath), filepath.Base(toPath))
} }
if fid, file_err := filer.DeleteFile(fromPath); file_err == nil { if fid, file_err := filer.DeleteFile(fromPath); file_err == nil {
if _, err := filer.FindDirectory(toPath); err == nil { if _, err := filer.directories.findDirectoryId(toPath); err == nil {
// move file under an existing folder // move file under an existing folder
return filer.CreateFile(filepath.Join(toPath, filepath.Base(fromPath)), fid) return filer.CreateFile(filepath.Join(toPath, filepath.Base(fromPath)), fid)
} }

View file

@ -28,7 +28,7 @@ func NewFileListInLevelDb(dir string) (fl *FileListInLevelDb, err error) {
return return
} }
func genKey(dirId filer.DirectoryId, fileName string) []byte { func genKey(dirId DirectoryId, fileName string) []byte {
ret := make([]byte, 0, 4+len(fileName)) ret := make([]byte, 0, 4+len(fileName))
for i := 3; i >= 0; i-- { for i := 3; i >= 0; i-- {
ret = append(ret, byte(dirId>>(uint(i)*8))) ret = append(ret, byte(dirId>>(uint(i)*8)))
@ -37,11 +37,11 @@ func genKey(dirId filer.DirectoryId, fileName string) []byte {
return ret return ret
} }
func (fl *FileListInLevelDb) CreateFile(dirId filer.DirectoryId, fileName string, fid string) (err error) { func (fl *FileListInLevelDb) CreateFile(dirId DirectoryId, fileName string, fid string) (err error) {
glog.V(4).Infoln("directory", dirId, "fileName", fileName, "fid", fid) glog.V(4).Infoln("directory", dirId, "fileName", fileName, "fid", fid)
return fl.db.Put(genKey(dirId, fileName), []byte(fid), nil) return fl.db.Put(genKey(dirId, fileName), []byte(fid), nil)
} }
func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string) (fid string, err error) { func (fl *FileListInLevelDb) DeleteFile(dirId DirectoryId, fileName string) (fid string, err error) {
if fid, err = fl.FindFile(dirId, fileName); err != nil { if fid, err = fl.FindFile(dirId, fileName); err != nil {
if err == leveldb.ErrNotFound { if err == leveldb.ErrNotFound {
return "", nil return "", nil
@ -51,7 +51,7 @@ func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string
err = fl.db.Delete(genKey(dirId, fileName), nil) err = fl.db.Delete(genKey(dirId, fileName), nil)
return fid, err return fid, err
} }
func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string) (fid string, err error) { func (fl *FileListInLevelDb) FindFile(dirId DirectoryId, fileName string) (fid string, err error) {
data, e := fl.db.Get(genKey(dirId, fileName), nil) data, e := fl.db.Get(genKey(dirId, fileName), nil)
if e == leveldb.ErrNotFound { if e == leveldb.ErrNotFound {
return "", filer.ErrNotFound return "", filer.ErrNotFound
@ -60,7 +60,7 @@ func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string)
} }
return string(data), nil return string(data), nil
} }
func (fl *FileListInLevelDb) ListFiles(dirId filer.DirectoryId, lastFileName string, limit int) (files []filer.FileEntry) { func (fl *FileListInLevelDb) ListFiles(dirId DirectoryId, lastFileName string, limit int) (files []filer.FileEntry) {
glog.V(4).Infoln("directory", dirId, "lastFileName", lastFileName, "limit", limit) glog.V(4).Infoln("directory", dirId, "lastFileName", lastFileName, "limit", limit)
dirKey := genKey(dirId, "") dirKey := genKey(dirId, "")
iter := fl.db.NewIterator(&util.Range{Start: genKey(dirId, lastFileName)}, nil) iter := fl.db.NewIterator(&util.Range{Start: genKey(dirId, lastFileName)}, nil)

View file

@ -11,12 +11,7 @@ type FileEntry struct {
Id FileId `json:"fid,omitempty"` Id FileId `json:"fid,omitempty"`
} }
type DirectoryId int32 type DirectoryName string
type DirectoryEntry struct {
Name string //dir name without path
Id DirectoryId
}
type Filer interface { type Filer interface {
CreateFile(fullFileName string, fid string) (err error) CreateFile(fullFileName string, fid string) (err error)
@ -24,11 +19,11 @@ type Filer interface {
DeleteFile(fullFileName string) (fid string, err error) DeleteFile(fullFileName string) (fid string, err error)
//Optional functions. embedded filer support these //Optional functions. embedded filer support these
FindDirectory(dirPath string) (dirId DirectoryId, err error) ListDirectories(dirPath string) (dirs []DirectoryName, err error)
ListDirectories(dirPath string) (dirs []DirectoryEntry, err error)
ListFiles(dirPath string, lastFileName string, limit int) (files []FileEntry, err error) ListFiles(dirPath string, lastFileName string, limit int) (files []FileEntry, err error)
DeleteDirectory(dirPath string, recursive bool) (err error) DeleteDirectory(dirPath string, recursive bool) (err error)
Move(fromPath string, toPath string) (err error) Move(fromPath string, toPath string) (err error)
LookupDirectoryEntry(dirPath string, name string) (found bool, fileId string, err error)
} }
var ErrNotFound = errors.New("filer: no entry is found in filer store") var ErrNotFound = errors.New("filer: no entry is found in filer store")

View file

@ -4,6 +4,7 @@ import (
"errors" "errors"
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
"path/filepath"
) )
type FlatNamespaceFiler struct { type FlatNamespaceFiler struct {
@ -28,10 +29,13 @@ func (filer *FlatNamespaceFiler) CreateFile(fullFileName string, fid string) (er
func (filer *FlatNamespaceFiler) FindFile(fullFileName string) (fid string, err error) { func (filer *FlatNamespaceFiler) FindFile(fullFileName string) (fid string, err error) {
return filer.store.Get(fullFileName) return filer.store.Get(fullFileName)
} }
func (filer *FlatNamespaceFiler) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { func (filer *FlatNamespaceFiler) LookupDirectoryEntry(dirPath string, name string) (found bool, fileId string, err error) {
return 0, ErrNotImplemented if fileId, err = filer.FindFile(filepath.Join(dirPath, name)); err == nil {
return true, fileId, nil
}
return false, "", err
} }
func (filer *FlatNamespaceFiler) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { func (filer *FlatNamespaceFiler) ListDirectories(dirPath string) (dirs []filer.DirectoryName, err error) {
return nil, ErrNotImplemented return nil, ErrNotImplemented
} }
func (filer *FlatNamespaceFiler) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { func (filer *FlatNamespaceFiler) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {

View file

@ -14,6 +14,8 @@ import (
"strings" "strings"
) )
type DirectoryId int32
func databaseExists(db *sql.DB, databaseName string) (bool, error) { func databaseExists(db *sql.DB, databaseName string) (bool, error) {
sqlStatement := "SELECT datname from pg_database WHERE datname='%s'" sqlStatement := "SELECT datname from pg_database WHERE datname='%s'"
row := db.QueryRow(fmt.Sprintf(sqlStatement, databaseName)) row := db.QueryRow(fmt.Sprintf(sqlStatement, databaseName))
@ -293,13 +295,13 @@ func (s *PostgresStore) delete(uriPath string) error {
return nil return nil
} }
func (s *PostgresStore) lookupDirectory(dirPath string) (filer.DirectoryId, string, error) { func (s *PostgresStore) lookupDirectory(dirPath string) (DirectoryId, string, error) {
directoryRoot, directoryName := s.mySplitPath(dirPath) directoryRoot, directoryName := s.mySplitPath(dirPath)
sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName) sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName)
row := s.db.QueryRow(sqlStatement, directoryRoot, directoryName) row := s.db.QueryRow(sqlStatement, directoryRoot, directoryName)
var id filer.DirectoryId var id DirectoryId
var dirRoot string var dirRoot string
var dirName string var dirName string
err := row.Scan(&id, &dirRoot, &dirName) err := row.Scan(&id, &dirRoot, &dirName)
@ -312,7 +314,7 @@ func (s *PostgresStore) lookupDirectory(dirPath string) (filer.DirectoryId, stri
return id, filepath.Join(dirRoot, dirName), err return id, filepath.Join(dirRoot, dirName), err
} }
func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer.DirectoryEntry, err error) { func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer.DirectoryName, err error) {
sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName != '' ORDER BY id LIMIT $2", directoriesTableName) sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName != '' ORDER BY id LIMIT $2", directoriesTableName)
rows, err := s.db.Query(sqlStatement, dirPath, limit) rows, err := s.db.Query(sqlStatement, dirPath, limit)
@ -323,7 +325,7 @@ func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer
if rows != nil { if rows != nil {
defer rows.Close() defer rows.Close()
for rows.Next() { for rows.Next() {
var id filer.DirectoryId var id DirectoryId
var directoryRoot string var directoryRoot string
var directoryName string var directoryName string
@ -331,7 +333,7 @@ func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer
if scanErr != nil { if scanErr != nil {
err = scanErr err = scanErr
} }
dirs = append(dirs, filer.DirectoryEntry{Name: (directoryName), Id: id}) dirs = append(dirs, filer.DirectoryName(directoryName))
} }
} }
return return
@ -344,7 +346,7 @@ func (s *PostgresStore) safeToDeleteDirectory(dirPath string, recursive bool) bo
sqlStatement := fmt.Sprintf("SELECT id FROM %s WHERE directoryRoot LIKE $1 LIMIT 1", directoriesTableName) sqlStatement := fmt.Sprintf("SELECT id FROM %s WHERE directoryRoot LIKE $1 LIMIT 1", directoriesTableName)
row := s.db.QueryRow(sqlStatement, dirPath+"%") row := s.db.QueryRow(sqlStatement, dirPath+"%")
var id filer.DirectoryId var id DirectoryId
err := row.Scan(&id) err := row.Scan(&id)
if err != nil { if err != nil {
if err == sql.ErrNoRows { if err == sql.ErrNoRows {

View file

@ -10,6 +10,7 @@ import (
_ "github.com/lib/pq" _ "github.com/lib/pq"
_ "path/filepath" _ "path/filepath"
"path/filepath"
) )
const ( const (
@ -74,6 +75,17 @@ func (s *PostgresStore) FindFile(fullFilePath string) (fid string, err error) {
return fid, err return fid, err
} }
func (s *PostgresStore) LookupDirectoryEntry(dirPath string, name string) (found bool, fileId string, err error) {
fullPath := filepath.Join(dirPath, name)
if fileId, err = s.FindFile(fullPath); err == nil {
return true, fileId, nil
}
if _, _, err := s.lookupDirectory(fullPath); err == nil {
return true, "", err
}
return false, "", err
}
func (s *PostgresStore) DeleteFile(fullFilePath string) (fid string, err error) { func (s *PostgresStore) DeleteFile(fullFilePath string) (fid string, err error) {
if err != nil { if err != nil {
return "", fmt.Errorf("PostgresStore Delete operation can not parse file path %s: err is %v", fullFilePath, err) return "", fmt.Errorf("PostgresStore Delete operation can not parse file path %s: err is %v", fullFilePath, err)
@ -90,14 +102,9 @@ func (s *PostgresStore) DeleteFile(fullFilePath string) (fid string, err error)
} }
} }
func (s *PostgresStore) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { func (s *PostgresStore) ListDirectories(dirPath string) (dirs []filer.DirectoryName, err error) {
dirId, _, err = s.lookupDirectory(dirPath)
return dirId, err
}
func (s *PostgresStore) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { dirs, err = s.findDirectories(dirPath, 1000)
dirs, err = s.findDirectories(dirPath, 20)
glog.V(3).Infof("Postgres ListDirs = found %d directories under %s", len(dirs), dirPath) glog.V(3).Infof("Postgres ListDirs = found %d directories under %s", len(dirs), dirPath)

View file

@ -21,6 +21,13 @@ func (fs *FilerServer) apiHandler(w http.ResponseWriter, r *http.Request) {
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
} }
switch apiRequest.Command { switch apiRequest.Command {
case "lookupDirectoryEntry":
res := filer.LookupDirectoryEntryResult{}
res.Found, res.FileId, err = fs.filer.LookupDirectoryEntry(apiRequest.Directory, apiRequest.FileName)
if err != nil {
res.Error = err.Error()
}
writeJsonQuiet(w, r, http.StatusOK, res)
case "listDirectories": case "listDirectories":
res := filer.ListDirectoriesResult{} res := filer.ListDirectoriesResult{}
res.Directories, err = fs.filer.ListDirectories(apiRequest.Directory) res.Directories, err = fs.filer.ListDirectories(apiRequest.Directory)
@ -30,7 +37,7 @@ func (fs *FilerServer) apiHandler(w http.ResponseWriter, r *http.Request) {
writeJsonQuiet(w, r, http.StatusOK, res) writeJsonQuiet(w, r, http.StatusOK, res)
case "listFiles": case "listFiles":
res := filer.ListFilesResult{} res := filer.ListFilesResult{}
res.Files, err = fs.filer.ListFiles(apiRequest.Directory, apiRequest.FileName, 100) res.Files, err = fs.filer.ListFiles(apiRequest.Directory, apiRequest.FileName, 1000)
if err != nil { if err != nil {
res.Error = err.Error() res.Error = err.Error()
} }