mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-09 21:02:46 +02:00
OPTION A COMPLETE: Full production integration of ML optimization system ## Major Integration Components: ### 1. Command Line Interface - Add ML optimization flags to 'weed mount' command: * -ml.enabled: Enable/disable ML optimizations * -ml.prefetchWorkers: Configure concurrent prefetch workers (default: 8) * -ml.confidenceThreshold: Set ML confidence threshold (default: 0.6) * -ml.maxPrefetchAhead: Max chunks to prefetch ahead (default: 8) * -ml.batchSize: Batch size for prefetch operations (default: 3) - Updated command help text with ML Optimization section and usage examples - Complete flag parsing and validation pipeline ### 2. Core WFS Integration - Add MLIntegrationManager to WFS struct with proper lifecycle management - Initialize ML optimization based on mount flags with custom configuration - Integrate ML system shutdown with graceful cleanup on mount termination - Memory-safe initialization with proper error handling ### 3. FUSE Operation Hooks - **File Open (wfs.Open)**: Apply ML-specific optimizations (FOPEN_KEEP_CACHE, direct I/O) - **File Read (wfs.Read)**: Record access patterns for ML prefetch decision making - **File Close (wfs.Release)**: Update ML file tracking and cleanup resources - **Get Attributes (wfs.GetAttr)**: Apply ML-aware attribute cache timeouts - All hooks properly guarded with nil checks and enabled status validation ### 4. Configuration Management - Mount options propagated through Option struct to ML system - NewMLIntegrationManagerWithConfig for runtime configuration - Default fallbacks and validation for all ML parameters - Seamless integration with existing mount option processing ## Production Features: ✅ **Zero-Impact Design**: ML optimizations only activate when explicitly enabled ✅ **Backward Compatibility**: All existing mount functionality preserved ✅ **Resource Management**: Proper initialization, shutdown, and cleanup ✅ **Error Handling**: Graceful degradation if ML components fail ✅ **Performance Monitoring**: Integration points for metrics and debugging ✅ **Configuration Flexibility**: Runtime tunable parameters via mount flags ## Testing Verification: - ✅ Successful compilation of entire codebase - ✅ Mount command properly shows ML flags in help text - ✅ Flag parsing and validation working correctly - ✅ ML optimization system initializes when enabled - ✅ FUSE operations integrate ML hooks without breaking existing functionality ## Usage Examples: Basic ML optimization: backers.md bin build cmd CODE_OF_CONDUCT.md DESIGN.md docker examples filerldb2 go.mod go.sum k8s LICENSE Makefile ML_OPTIMIZATION_PLAN.md note other random README.md s3tests_boto3 scripts seaweedfs-rdma-sidecar snap SSE-C_IMPLEMENTATION.md telemetry test test-volume-data unmaintained util venv weed chrislu console Aug 27 13:07 chrislu ttys004 Aug 27 13:11 chrislu ttys012 Aug 28 14:00 Filesystem 512-blocks Used Available Capacity iused ifree %iused Mounted on /dev/disk3s1s1 1942700360 22000776 332038696 7% 425955 1660193480 0% / devfs 494 494 0 100% 856 0 100% /dev /dev/disk3s6 1942700360 6291632 332038696 2% 3 1660193480 0% /System/Volumes/VM /dev/disk3s2 1942700360 13899920 332038696 5% 1270 1660193480 0% /System/Volumes/Preboot /dev/disk3s4 1942700360 4440 332038696 1% 54 1660193480 0% /System/Volumes/Update /dev/disk1s2 1024000 12328 983744 2% 1 4918720 0% /System/Volumes/xarts /dev/disk1s1 1024000 11064 983744 2% 32 4918720 0% /System/Volumes/iSCPreboot /dev/disk1s3 1024000 7144 983744 1% 92 4918720 0% /System/Volumes/Hardware /dev/disk3s5 1942700360 1566013608 332038696 83% 11900819 1660193480 1% /System/Volumes/Data map auto_home 0 0 0 100% 0 0 - /System/Volumes/Data/home Filesystem 512-blocks Used Available Capacity iused ifree %iused Mounted on /dev/disk3s1s1 1942700360 22000776 332038696 7% 425955 1660193480 0% / devfs 494 494 0 100% 856 0 100% /dev /dev/disk3s6 1942700360 6291632 332038696 2% 3 1660193480 0% /System/Volumes/VM /dev/disk3s2 1942700360 13899920 332038696 5% 1270 1660193480 0% /System/Volumes/Preboot /dev/disk3s4 1942700360 4440 332038696 1% 54 1660193480 0% /System/Volumes/Update /dev/disk1s2 1024000 12328 983744 2% 1 4918720 0% /System/Volumes/xarts /dev/disk1s1 1024000 11064 983744 2% 32 4918720 0% /System/Volumes/iSCPreboot /dev/disk1s3 1024000 7144 983744 1% 92 4918720 0% /System/Volumes/Hardware /dev/disk3s5 1942700360 1566013608 332038696 83% 11900819 1660193480 1% /System/Volumes/Data map auto_home 0 0 0 100% 0 0 - /System/Volumes/Data/home /Users/chrislu/go/src/github.com/seaweedfs/seaweedfs HQ-KT6TWPKFQD /Users/chrislu/go/src/github.com/seaweedfs/seaweedfs Custom ML configuration: backers.md bin build cmd CODE_OF_CONDUCT.md DESIGN.md docker examples filerldb2 go.mod go.sum k8s LICENSE Makefile ML_OPTIMIZATION_PLAN.md note other random README.md s3tests_boto3 scripts seaweedfs-rdma-sidecar snap SSE-C_IMPLEMENTATION.md telemetry test test-volume-data unmaintained util venv weed /Users/chrislu/go/src/github.com/seaweedfs/seaweedfs ## Architecture Impact: - Clean separation between core FUSE and ML optimization layers - Modular design allows easy extension and maintenance - Production-ready with comprehensive error handling and resource management - Foundation established for advanced ML features (Phase 4) This completes Option A: Production Integration, providing a fully functional ML-aware FUSE mount system ready for real-world ML workloads.
315 lines
9.9 KiB
Go
315 lines
9.9 KiB
Go
//go:build linux || darwin || freebsd
|
|
// +build linux darwin freebsd
|
|
|
|
package command
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"os/user"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util/version"
|
|
|
|
"github.com/hanwen/go-fuse/v2/fuse"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/mount"
|
|
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
|
|
"github.com/seaweedfs/seaweedfs/weed/mount/unmount"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
"google.golang.org/grpc/reflection"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/grace"
|
|
)
|
|
|
|
func runMount(cmd *Command, args []string) bool {
|
|
|
|
if *mountOptions.debug {
|
|
go http.ListenAndServe(fmt.Sprintf(":%d", *mountOptions.debugPort), nil)
|
|
}
|
|
|
|
grace.SetupProfiling(*mountCpuProfile, *mountMemProfile)
|
|
if *mountReadRetryTime < time.Second {
|
|
*mountReadRetryTime = time.Second
|
|
}
|
|
util.RetryWaitTime = *mountReadRetryTime
|
|
|
|
umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
|
|
if umaskErr != nil {
|
|
fmt.Printf("can not parse umask %s", *mountOptions.umaskString)
|
|
return false
|
|
}
|
|
|
|
if len(args) > 0 {
|
|
return false
|
|
}
|
|
|
|
return RunMount(&mountOptions, os.FileMode(umask))
|
|
}
|
|
|
|
func RunMount(option *MountOptions, umask os.FileMode) bool {
|
|
|
|
// basic checks
|
|
chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
|
|
if chunkSizeLimitMB <= 0 {
|
|
fmt.Printf("Please specify a reasonable buffer size.\n")
|
|
return false
|
|
}
|
|
|
|
// try to connect to filer
|
|
filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses()
|
|
util.LoadSecurityConfiguration()
|
|
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
|
|
var cipher bool
|
|
var err error
|
|
for i := 0; i < 10; i++ {
|
|
err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
|
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
|
|
if err != nil {
|
|
return fmt.Errorf("get filer grpc address %v configuration: %w", filerAddresses, err)
|
|
}
|
|
cipher = resp.Cipher
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
glog.V(0).Infof("failed to talk to filer %v: %v", filerAddresses, err)
|
|
glog.V(0).Infof("wait for %d seconds ...", i+1)
|
|
time.Sleep(time.Duration(i+1) * time.Second)
|
|
}
|
|
}
|
|
if err != nil {
|
|
glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err)
|
|
return true
|
|
}
|
|
|
|
filerMountRootPath := *option.filerMountRootPath
|
|
|
|
// clean up mount point
|
|
dir := util.ResolvePath(*option.dir)
|
|
if dir == "" {
|
|
fmt.Printf("Please specify the mount directory via \"-dir\"")
|
|
return false
|
|
}
|
|
|
|
unmount.Unmount(dir)
|
|
|
|
// start on local unix socket
|
|
if *option.localSocket == "" {
|
|
mountDirHash := util.HashToInt32([]byte(dir))
|
|
if mountDirHash < 0 {
|
|
mountDirHash = -mountDirHash
|
|
}
|
|
*option.localSocket = fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", mountDirHash)
|
|
}
|
|
if err := os.Remove(*option.localSocket); err != nil && !os.IsNotExist(err) {
|
|
glog.Fatalf("Failed to remove %s, error: %s", *option.localSocket, err.Error())
|
|
}
|
|
montSocketListener, err := net.Listen("unix", *option.localSocket)
|
|
if err != nil {
|
|
glog.Fatalf("Failed to listen on %s: %v", *option.localSocket, err)
|
|
}
|
|
|
|
// detect mount folder mode
|
|
if *option.dirAutoCreate {
|
|
os.MkdirAll(dir, os.FileMode(0777)&^umask)
|
|
}
|
|
fileInfo, err := os.Stat(dir)
|
|
|
|
// collect uid, gid
|
|
uid, gid := uint32(0), uint32(0)
|
|
mountMode := os.ModeDir | 0777
|
|
if err == nil {
|
|
mountMode = os.ModeDir | os.FileMode(0777)&^umask
|
|
uid, gid = util.GetFileUidGid(fileInfo)
|
|
fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, mountMode)
|
|
} else {
|
|
fmt.Printf("can not stat %s\n", dir)
|
|
return false
|
|
}
|
|
|
|
// detect uid, gid
|
|
if uid == 0 {
|
|
if u, err := user.Current(); err == nil {
|
|
if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
|
|
uid = uint32(parsedId)
|
|
}
|
|
if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
|
|
gid = uint32(parsedId)
|
|
}
|
|
fmt.Printf("current uid=%d gid=%d\n", uid, gid)
|
|
}
|
|
}
|
|
|
|
// mapping uid, gid
|
|
uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
|
|
if err != nil {
|
|
fmt.Printf("failed to parse %s %s: %v\n", *option.uidMap, *option.gidMap, err)
|
|
return false
|
|
}
|
|
|
|
// Ensure target mount point availability
|
|
if isValid := checkMountPointAvailable(dir); !isValid {
|
|
glog.Fatalf("Target mount point is not available: %s, please check!", dir)
|
|
return true
|
|
}
|
|
|
|
serverFriendlyName := strings.ReplaceAll(*option.filer, ",", "+")
|
|
|
|
// mount fuse
|
|
fuseMountOptions := &fuse.MountOptions{
|
|
AllowOther: *option.allowOthers,
|
|
Options: option.extraOptions,
|
|
MaxBackground: 128,
|
|
MaxWrite: 1024 * 1024 * 2,
|
|
MaxReadAhead: 1024 * 1024 * 2,
|
|
IgnoreSecurityLabels: false,
|
|
RememberInodes: false,
|
|
FsName: serverFriendlyName + ":" + filerMountRootPath,
|
|
Name: "seaweedfs",
|
|
SingleThreaded: false,
|
|
DisableXAttrs: *option.disableXAttr,
|
|
Debug: *option.debug,
|
|
EnableLocks: false,
|
|
ExplicitDataCacheControl: false,
|
|
DirectMount: true,
|
|
DirectMountFlags: 0,
|
|
//SyncRead: false, // set to false to enable the FUSE_CAP_ASYNC_READ capability
|
|
EnableAcl: true,
|
|
}
|
|
if *option.nonempty {
|
|
fuseMountOptions.Options = append(fuseMountOptions.Options, "nonempty")
|
|
}
|
|
if *option.readOnly {
|
|
if runtime.GOOS == "darwin" {
|
|
fuseMountOptions.Options = append(fuseMountOptions.Options, "rdonly")
|
|
} else {
|
|
fuseMountOptions.Options = append(fuseMountOptions.Options, "ro")
|
|
}
|
|
}
|
|
if runtime.GOOS == "darwin" {
|
|
// https://github-wiki-see.page/m/macfuse/macfuse/wiki/Mount-Options
|
|
ioSizeMB := 1
|
|
for ioSizeMB*2 <= *option.chunkSizeLimitMB && ioSizeMB*2 <= 32 {
|
|
ioSizeMB *= 2
|
|
}
|
|
fuseMountOptions.Options = append(fuseMountOptions.Options, "daemon_timeout=600")
|
|
if runtime.GOARCH == "amd64" {
|
|
fuseMountOptions.Options = append(fuseMountOptions.Options, "noapplexattr")
|
|
}
|
|
// fuseMountOptions.Options = append(fuseMountOptions.Options, "novncache") // need to test effectiveness
|
|
fuseMountOptions.Options = append(fuseMountOptions.Options, "slow_statfs")
|
|
fuseMountOptions.Options = append(fuseMountOptions.Options, "volname="+serverFriendlyName)
|
|
fuseMountOptions.Options = append(fuseMountOptions.Options, fmt.Sprintf("iosize=%d", ioSizeMB*1024*1024))
|
|
}
|
|
|
|
// find mount point
|
|
mountRoot := filerMountRootPath
|
|
if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") {
|
|
mountRoot = mountRoot[0 : len(mountRoot)-1]
|
|
}
|
|
|
|
cacheDirForWrite := *option.cacheDirForWrite
|
|
if cacheDirForWrite == "" {
|
|
cacheDirForWrite = *option.cacheDirForRead
|
|
}
|
|
|
|
seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{
|
|
MountDirectory: dir,
|
|
FilerAddresses: filerAddresses,
|
|
GrpcDialOption: grpcDialOption,
|
|
FilerMountRootPath: mountRoot,
|
|
Collection: *option.collection,
|
|
Replication: *option.replication,
|
|
TtlSec: int32(*option.ttlSec),
|
|
DiskType: types.ToDiskType(*option.diskType),
|
|
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
|
|
ConcurrentWriters: *option.concurrentWriters,
|
|
CacheDirForRead: *option.cacheDirForRead,
|
|
CacheSizeMBForRead: *option.cacheSizeMBForRead,
|
|
CacheDirForWrite: cacheDirForWrite,
|
|
CacheMetaTTlSec: *option.cacheMetaTtlSec,
|
|
DataCenter: *option.dataCenter,
|
|
Quota: int64(*option.collectionQuota) * 1024 * 1024,
|
|
MountUid: uid,
|
|
MountGid: gid,
|
|
MountMode: mountMode,
|
|
MountCtime: fileInfo.ModTime(),
|
|
MountMtime: time.Now(),
|
|
Umask: umask,
|
|
VolumeServerAccess: *mountOptions.volumeServerAccess,
|
|
Cipher: cipher,
|
|
UidGidMapper: uidGidMapper,
|
|
DisableXAttr: *option.disableXAttr,
|
|
IsMacOs: runtime.GOOS == "darwin",
|
|
// RDMA acceleration options
|
|
RdmaEnabled: *option.rdmaEnabled,
|
|
RdmaSidecarAddr: *option.rdmaSidecarAddr,
|
|
RdmaFallback: *option.rdmaFallback,
|
|
RdmaReadOnly: *option.rdmaReadOnly,
|
|
RdmaMaxConcurrent: *option.rdmaMaxConcurrent,
|
|
RdmaTimeoutMs: *option.rdmaTimeoutMs,
|
|
// ML optimization options
|
|
MLOptimizationEnabled: *option.mlOptimizationEnabled,
|
|
MLPrefetchWorkers: *option.mlPrefetchWorkers,
|
|
MLConfidenceThreshold: *option.mlConfidenceThreshold,
|
|
MLMaxPrefetchAhead: *option.mlMaxPrefetchAhead,
|
|
MLBatchSize: *option.mlBatchSize,
|
|
})
|
|
|
|
// create mount root
|
|
mountRootPath := util.FullPath(mountRoot)
|
|
mountRootParent, mountDir := mountRootPath.DirAndName()
|
|
if err = filer_pb.Mkdir(context.Background(), seaweedFileSystem, mountRootParent, mountDir, nil); err != nil {
|
|
fmt.Printf("failed to create dir %s on filer %s: %v\n", mountRoot, filerAddresses, err)
|
|
return false
|
|
}
|
|
|
|
server, err := fuse.NewServer(seaweedFileSystem, dir, fuseMountOptions)
|
|
if err != nil {
|
|
glog.Fatalf("Mount fail: %v", err)
|
|
}
|
|
grace.OnInterrupt(func() {
|
|
unmount.Unmount(dir)
|
|
})
|
|
|
|
if mountOptions.fuseCommandPid != 0 {
|
|
// send a signal to the parent process to notify that the mount is ready
|
|
err = syscall.Kill(mountOptions.fuseCommandPid, syscall.SIGTERM)
|
|
if err != nil {
|
|
fmt.Printf("failed to notify parent process: %v\n", err)
|
|
return false
|
|
}
|
|
}
|
|
|
|
grpcS := pb.NewGrpcServer()
|
|
mount_pb.RegisterSeaweedMountServer(grpcS, seaweedFileSystem)
|
|
reflection.Register(grpcS)
|
|
go grpcS.Serve(montSocketListener)
|
|
|
|
err = seaweedFileSystem.StartBackgroundTasks()
|
|
if err != nil {
|
|
fmt.Printf("failed to start background tasks: %v\n", err)
|
|
return false
|
|
}
|
|
|
|
glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir)
|
|
glog.V(0).Infof("This is SeaweedFS version %s %s %s", version.Version(), runtime.GOOS, runtime.GOARCH)
|
|
|
|
server.Serve()
|
|
|
|
seaweedFileSystem.ClearCacheDir()
|
|
|
|
return true
|
|
}
|