From 1c65656fb4a64ae739b9a23a2f97f4032182015c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 11 Apr 2020 23:37:10 -0700 Subject: [PATCH] s3: add option to fsync buckets --- weed/command/scaffold.go | 4 ++++ weed/filer2/filer.go | 1 + weed/filer2/filer_buckets.go | 19 +++++++++++++------ weed/server/filer_grpc_server.go | 2 +- weed/server/filer_server.go | 3 ++- weed/server/filer_server_handlers_write.go | 17 ++++++++++------- .../filer_server_handlers_write_autochunk.go | 8 ++++---- .../filer_server_handlers_write_cipher.go | 4 ++-- 8 files changed, 37 insertions(+), 21 deletions(-) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 468476ae4..109f0c04f 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -76,6 +76,10 @@ const ( recursive_delete = false # directories under this folder will be automatically creating a separate bucket buckets_folder = "/buckets" +buckets_fsync = [ # a list of buckets with all write requests fsync=true + "important_bucket", + "should_always_fsync", +] # directories under this folder will be store message queue data queues_folder = "/queues" diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index dde08a092..8ad4fdc81 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -33,6 +33,7 @@ type Filer struct { GrpcDialOption grpc.DialOption DirBucketsPath string DirQueuesPath string + FsyncBuckets []string buckets *FilerBuckets Cipher bool metaLogBuffer *log_buffer.LogBuffer diff --git a/weed/filer2/filer_buckets.go b/weed/filer2/filer_buckets.go index 3fc4afdab..7a57e7ee1 100644 --- a/weed/filer2/filer_buckets.go +++ b/weed/filer2/filer_buckets.go @@ -13,6 +13,7 @@ type BucketName string type BucketOption struct { Name BucketName Replication string + fsync bool } type FilerBuckets struct { dirBucketsPath string @@ -20,36 +21,42 @@ type FilerBuckets struct { sync.RWMutex } -func (f *Filer) LoadBuckets(dirBucketsPath string) { +func (f *Filer) LoadBuckets() { f.buckets = 
&FilerBuckets{ buckets: make(map[BucketName]*BucketOption), } - f.DirBucketsPath = dirBucketsPath limit := math.MaxInt32 - entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(dirBucketsPath), "", false, limit) + entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit) if err != nil { glog.V(1).Infof("no buckets found: %v", err) return } + shouldFsyncMap := make(map[string]bool) + for _, bucket := range f.FsyncBuckets { + shouldFsyncMap[bucket] = true + } + glog.V(1).Infof("buckets found: %d", len(entries)) f.buckets.Lock() for _, entry := range entries { + _, shouldFsync := shouldFsyncMap[entry.Name()] f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{ Name: BucketName(entry.Name()), Replication: entry.Replication, + fsync: shouldFsync, } } f.buckets.Unlock() } -func (f *Filer) ReadBucketOption(buketName string) (replication string) { +func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) { f.buckets.RLock() defer f.buckets.RUnlock() @@ -57,9 +64,9 @@ func (f *Filer) ReadBucketOption(buketName string) (replication string) { option, found := f.buckets.buckets[BucketName(buketName)] if !found { - return "" + return "", false } - return option.Replication + return option.Replication, option.fsync } diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 06889fbcb..ead246eb8 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -232,7 +232,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol if req.TtlSec > 0 { ttlStr = strconv.Itoa(int(req.TtlSec)) } - collection, replication := fs.detectCollection(req.ParentPath, req.Collection, req.Replication) + collection, replication, _ := fs.detectCollection(req.ParentPath, req.Collection, req.Replication) var altRequest *operation.VolumeAssignRequest diff --git a/weed/server/filer_server.go 
b/weed/server/filer_server.go index 52dfbaf88..cf8669202 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -95,6 +95,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) v.SetDefault("filer.options.queues_folder", "/queues") fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder") fs.filer.DirQueuesPath = v.GetString("filer.options.queues_folder") + fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync") fs.filer.LoadConfiguration(v) notification.LoadConfiguration(v, "notification.") @@ -107,7 +108,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } - fs.filer.LoadBuckets(fs.filer.DirBucketsPath) + fs.filer.LoadBuckets() util.OnInterrupt(func() { fs.filer.Shutdown() diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index ce184875c..74a558e22 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -40,7 +40,7 @@ type FilerPostResult struct { Url string `json:"url,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string) (fileId, urlLocation string, auth security.EncodedJwt, err error) { +func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) { stats.FilerRequestCounter.WithLabelValues("assign").Inc() start := time.Now() @@ -73,6 +73,9 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, } fileId = assignResult.Fid urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid + if fsync { + urlLocation += "?fsync=true" + } auth = assignResult.Auth return } @@ -82,7 +85,7 @@ func (fs *FilerServer) 
PostHandler(w http.ResponseWriter, r *http.Request) { ctx := context.Background() query := r.URL.Query() - collection, replication := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication")) + collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication")) dataCenter := query.Get("dataCenter") if dataCenter == "" { dataCenter = fs.option.DataCenter @@ -96,12 +99,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { ttlSeconds = int32(ttl.Minutes()) * 60 } - if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString); autoChunked { + if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked { return } if fs.option.Cipher { - reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString) + reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else if reply != nil { @@ -111,7 +114,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) if err != nil || fileId == "" || urlLocation == "" { glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) @@ -327,7 +330,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string) { +func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) 
(collection, replication string, fsync bool) { // default collection = fs.option.Collection replication = fs.option.DefaultReplication @@ -350,7 +353,7 @@ func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication st if t > 0 { collection = bucketAndObjectKey[:t] } - replication = fs.filer.ReadBucketOption(collection) + replication, fsync = fs.filer.ReadBucketOption(collection) } return diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index d86530bb8..4c371a9a5 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -21,7 +21,7 @@ import ( ) func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - replication string, collection string, dataCenter string, ttlSec int32, ttlString string) bool { + replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool { if r.Method != "POST" { glog.V(4).Infoln("AutoChunking not supported for method", r.Method) return false @@ -57,7 +57,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * return false } - reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString) + reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else if reply != nil { @@ -67,7 +67,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * } func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string) (filerResult *FilerPostResult, replyerr error) { + contentLength int64, chunkSize 
int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) { stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() start := time.Now() @@ -102,7 +102,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r limitedReader := io.LimitReader(partReader, int64(chunkSize)) // assign one file id for one chunk - fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString) + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) if assignErr != nil { return nil, assignErr } diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index cbcf8a05c..ebf277984 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -17,9 +17,9 @@ import ( // handling single chunk POST or PUT upload func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, - replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string) (filerResult *FilerPostResult, err error) { + replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) { - fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) if err != nil || fileId == "" || urlLocation == "" { return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)