From 00707ec00fb8016ac9ef8858a01a9784a6aee1a0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 24 Jan 2021 19:01:58 -0800 Subject: [PATCH] mount: outsideContainerClusterMode proxy through filer Running mount outside of the cluster would not need to expose all the volume servers to outside of the cluster. The chunk read and write will go through the filer. --- weed/command/mount_std.go | 1 + weed/filer/reader_at.go | 6 +- weed/filesys/dir.go | 5 -- weed/filesys/file.go | 5 +- weed/filesys/filehandle.go | 6 +- weed/filesys/wfs.go | 13 +++ weed/filesys/wfs_deletion.go | 84 ------------------- weed/filesys/wfs_filer_client.go | 7 -- weed/filesys/wfs_write.go | 5 +- weed/messaging/broker/broker_append.go | 4 - weed/operation/delete_content.go | 3 +- weed/pb/filer_pb/filer_client.go | 1 - .../replication/sink/filersink/fetch_write.go | 3 - weed/replication/source/filer_source.go | 4 - weed/s3api/s3api_handlers.go | 3 - weed/server/webdav_server.go | 5 +- weed/shell/commands.go | 4 - 17 files changed, 28 insertions(+), 131 deletions(-) delete mode 100644 weed/filesys/wfs_deletion.go diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 9e955e344..a95ecd567 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -169,6 +169,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { } seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{ + FilerAddress: filer, FilerGrpcAddress: filerGrpcAddress, GrpcDialOption: grpcDialOption, FilerMountRootPath: mountRoot, diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 307224f35..41d177210 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -71,7 +71,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp } for _, loc := range locations.Locations { - volumeServerAddress := filerClient.AdjustedUrl(loc) + volumeServerAddress := loc.Url targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) targetUrls = append(targetUrls, targetUrl) } @@ -85,11 +85,11 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp } } -func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt { +func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt { return &ChunkReadAt{ chunkViews: chunkViews, - lookupFileId: LookupFn(filerClient), + lookupFileId: lookupFn, chunkCache: chunkCache, fileSize: fileSize, } diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index d86d92ac9..3d0a00a8b 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -404,11 +404,6 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { inodeId := util.NewFullPath(dir.FullPath(), req.Name).AsInode() delete(dir.wfs.handles, inodeId) - // delete the chunks last - if isDeleteData { - dir.wfs.deleteFileChunks(entry.Chunks) - } - return nil } diff --git a/weed/filesys/file.go b/weed/filesys/file.go index a2b6660d8..a8d6dac29 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -147,9 +147,8 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f } } file.entry.Chunks = chunks - file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), chunks) + file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks) file.reader = nil - file.wfs.deleteFileChunks(truncatedChunks) } file.entry.Attributes.FileSize = req.Size file.dirtyMetadata = true @@ -329,7 +328,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { func (file *File) setEntry(entry *filer_pb.Entry) { file.entry = entry - file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), entry.Chunks) + file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks) file.reader = nil } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 6225ab968..da42ae562 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -119,7 +119,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { var chunkResolveErr error if fh.f.entryViewCache == nil { - fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(filer.LookupFn(fh.f.wfs), fh.f.entry.Chunks) + fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks) if chunkResolveErr != nil { return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) } @@ -128,7 +128,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { if fh.f.reader == nil { chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64) - fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize) + fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize) } totalRead, err := fh.f.reader.ReadAt(buff, offset) @@ -269,7 +269,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks) - chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks) + chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks) chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks) if manifestErr != nil { // not good, but should be ok diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index cd14e8032..236ecdacb 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -3,6 +3,8 @@ package filesys import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/wdclient" "math" "os" "path" @@ -24,6 +26,7 @@ import ( ) type Option struct { + FilerAddress string FilerGrpcAddress string GrpcDialOption grpc.DialOption FilerMountRootPath string @@ -237,3 +240,13 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) { } entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid) } + +func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { + if wfs.option.OutsideContainerClusterMode { + return func(fileId string) (targetUrls []string, err error) { + return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil + } + } + return filer.LookupFn(wfs) + +} diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go deleted file mode 100644 index a245b6795..000000000 --- a/weed/filesys/wfs_deletion.go +++ /dev/null @@ -1,84 +0,0 @@ -package filesys - -import ( - "context" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { - if len(chunks) == 0 { - return - } - - var fileIds []string - for _, chunk := range chunks { - if !chunk.IsChunkManifest { - fileIds = append(fileIds, chunk.GetFileIdString()) - continue - } - dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(filer.LookupFn(wfs), chunk) - if manifestResolveErr != nil { - glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) - } - for _, dChunk := range dataChunks { - fileIds = append(fileIds, dChunk.GetFileIdString()) - } - fileIds = append(fileIds, chunk.GetFileIdString()) - } - - wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds) - return nil - }) -} - -func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error { - - var vids []string - for _, fileId := range fileIds { - vids = append(vids, filer.VolumeId(fileId)) - } - - lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { - - m := make(map[string]operation.LookupResult) - - glog.V(4).Infof("deleteFileIds lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ - VolumeIds: vids, - }) - if err != nil { - return m, err - } - - for _, vid := range vids { - lr := operation.LookupResult{ - VolumeId: vid, - Locations: nil, - } - locations, found := resp.LocationsMap[vid] - if !found { - continue - } - for _, loc := range locations.Locations { - lr.Locations = append(lr.Locations, operation.Location{ - Url: wfs.AdjustedUrl(loc), - PublicUrl: loc.PublicUrl, - }) - } - m[vid] = lr - } - - return m, err - } - - _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) - - return err -} diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go index ef4213af1..e0d352a7b 100644 --- a/weed/filesys/wfs_filer_client.go +++ b/weed/filesys/wfs_filer_client.go @@ -25,10 +25,3 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro return err } - -func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string { - if wfs.option.OutsideContainerClusterMode { - return location.PublicUrl - } - return location.Url -} diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go index 83e40e7f5..dfe6e57a6 100644 --- a/weed/filesys/wfs_write.go +++ b/weed/filesys/wfs_write.go @@ -44,7 +44,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun Url: resp.Url, PublicUrl: resp.PublicUrl, } - host = wfs.AdjustedUrl(loc) + host = loc.Url collection, replication = resp.Collection, resp.Replication return nil @@ -53,6 +53,9 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + if wfs.option.OutsideContainerClusterMode { + fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId) + } uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go index 8e5b56fd0..67c9bcb79 100644 --- a/weed/messaging/broker/broker_append.go +++ b/weed/messaging/broker/broker_append.go @@ -107,7 +107,3 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient return } - -func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 9868a411d..65baaddf2 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -4,12 +4,11 @@ import ( "context" "errors" "fmt" + "google.golang.org/grpc" "net/http" "strings" "sync" - "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index 079fbd671..7198de95c 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -20,7 +20,6 @@ var ( type FilerClient interface { WithFilerClient(fn func(SeaweedFilerClient) error) error - AdjustedUrl(location *Location) string } func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index b062adcfe..544b84995 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -128,6 +128,3 @@ func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) }, fs.grpcAddress, fs.grpcDialOption) } -func (fs *FilerSink) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 3982360b0..eff1da8dc 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -124,10 +124,6 @@ func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) erro } -func (fs *FilerSource) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} - func volumeId(fileId string) string { lastCommaIndex := strings.LastIndex(fileId, ",") if lastCommaIndex > 0 { diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index 6935c75bd..57b26f3dd 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -50,9 +50,6 @@ func (s3a *S3ApiServer) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) err }, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption) } -func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} // If none of the http routes match respond with MethodNotAllowed func notFoundHandler(w http.ResponseWriter, r *http.Request) { diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 2b238e534..5bd92a136 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -123,9 +123,6 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) } -func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} func clearName(name string) (string, error) { slashed := strings.HasSuffix(name, "/") @@ -523,7 +520,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { } if f.reader == nil { chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64) - f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize) + f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize) } readSize, err = f.reader.ReadAt(p, f.off) diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 0e285214b..6e1348ca5 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -102,10 +102,6 @@ func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error } -func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} - func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) { if strings.HasPrefix(entryPath, "http") { var u *url.URL