diff --git a/weed/storage/needle/async_request.go b/weed/storage/needle/async_request.go index 259419115..ea02c55c5 100644 --- a/weed/storage/needle/async_request.go +++ b/weed/storage/needle/async_request.go @@ -4,7 +4,6 @@ type AsyncRequest struct { N *Needle IsWriteRequest bool ActualSize int64 - Fsync bool offset uint64 size uint64 doneChan chan interface{} @@ -12,7 +11,7 @@ type AsyncRequest struct { err error } -func NewAsyncRequest(n *Needle, isWriteRequest bool, fsync bool) *AsyncRequest { +func NewAsyncRequest(n *Needle, isWriteRequest bool) *AsyncRequest { return &AsyncRequest{ offset: 0, size: 0, @@ -21,7 +20,6 @@ func NewAsyncRequest(n *Needle, isWriteRequest bool, fsync bool) *AsyncRequest { N: n, isUnchanged: false, IsWriteRequest: isWriteRequest, - Fsync: fsync, err: nil, } } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 6d2899c53..dce800242 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -68,21 +68,23 @@ func (v *Volume) asyncRequestAppend(request *needle.AsyncRequest) { v.asyncRequestsChan <- request } -func (v *Volume) writeNeedleDeprecated(n *needle.Needle, fsync bool) (offset uint64, size uint32, isUnchanged bool, err error) { +func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) { // glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) + actualSize := needle.GetActualSize(uint32(len(n.Data)), v.Version()) + v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() + + if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(actualSize) { + err = fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.ContentSize()) + return + } if v.isFileUnchanged(n) { size = n.DataSize isUnchanged = true return } - if n.Ttl == needle.EMPTY_TTL && v.Ttl != needle.EMPTY_TTL { - n.SetHasTtl() - n.Ttl = v.Ttl - } - // check whether existing needle cookie matches nv, ok := v.nm.Get(n.Id) if ok { @@ -103,11 +105,7 @@ func (v *Volume) writeNeedleDeprecated(n *needle.Needle, fsync bool) (offset uin if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil { return } - if fsync { - if err = v.DataBackend.Sync(); err != nil { - return - } - } + v.lastAppendAtNs = n.AppendAtNs // add to needle map @@ -129,14 +127,18 @@ func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size n.Ttl = v.Ttl } - asyncRequest := needle.NewAsyncRequest(n, true, fsync) - // using len(n.Data) here instead of n.Size before n.Size is populated in n.Append() - asyncRequest.ActualSize = needle.GetActualSize(uint32(len(n.Data)), v.Version()) + if !fsync { + return v.syncWrite(n) + } else { + asyncRequest := needle.NewAsyncRequest(n, true) + // using len(n.Data) here instead of n.Size before n.Size is populated in n.Append() + asyncRequest.ActualSize = needle.GetActualSize(uint32(len(n.Data)), v.Version()) - v.asyncRequestAppend(asyncRequest) - offset, _, isUnchanged, err = asyncRequest.WaitComplete() + v.asyncRequestAppend(asyncRequest) + offset, _, isUnchanged, err = asyncRequest.WaitComplete() - return + return + } } func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) { @@ -181,10 +183,17 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size uint32, i return } -func (v *Volume) deleteNeedleDeprecated(n *needle.Needle) (uint32, error) { +func (v *Volume) syncDelete(n *needle.Needle) (uint32, error) { glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) + actualSize := needle.GetActualSize(0, v.Version()) v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() + + if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(actualSize) { + err := fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.ContentSize()) + return 0, err + } + nv, ok := v.nm.Get(n.Id) //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) if ok && nv.Size != TombstoneFileSize { @@ -205,13 +214,20 @@ func (v *Volume) deleteNeedleDeprecated(n *needle.Needle) (uint32, error) { } func (v *Volume) deleteNeedle2(n *needle.Needle) (uint32, error) { - asyncRequest := needle.NewAsyncRequest(n, false, false) - asyncRequest.ActualSize = needle.GetActualSize(0, v.Version()) + // todo: delete info is always appended no fsync, it may need fsync in future + fsync := false - v.asyncRequestAppend(asyncRequest) - _, size, _, err := asyncRequest.WaitComplete() + if !fsync { + return v.syncDelete(n) + } else { + asyncRequest := needle.NewAsyncRequest(n, false) + asyncRequest.ActualSize = needle.GetActualSize(0, v.Version()) - return uint32(size), err + v.asyncRequestAppend(asyncRequest) + _, size, _, err := asyncRequest.WaitComplete() + + return uint32(size), err + } } func (v *Volume) doDeleteRequest(n *needle.Needle) (uint32, error) { @@ -279,7 +295,6 @@ func (v *Volume) startWorker() { if chanClosed { break } - fsync := false currentRequests := make([]*needle.AsyncRequest, 0, 128) currentBytesToWrite := int64(0) for { @@ -296,9 +311,6 @@ func (v *Volume) startWorker() { } currentRequests = append(currentRequests, request) currentBytesToWrite += request.ActualSize - if request.Fsync { - fsync = true - } // submit at most 4M bytes or 128 requests at one time to decrease request delay. // it also need to break if there is no data in channel to avoid io hang. if currentBytesToWrite >= 4*1024*1024 || len(currentRequests) >= 128 || len(v.asyncRequestsChan) == 0 { @@ -329,17 +341,15 @@ func (v *Volume) startWorker() { } } - if fsync { - // if sync error, data is not reliable, we should mark the completed request as fail and rollback - if err := v.DataBackend.Sync(); err != nil { - // todo: this may generate dirty data or cause data inconsistent, may be weed need to panic? - if te := v.DataBackend.Truncate(end); te != nil { - glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", v.DataBackend.Name(), end, te) - } - for i := 0; i < len(currentRequests); i++ { - if currentRequests[i].IsSucceed() { - currentRequests[i].UpdateResult(0, 0, false, err) - } + // if sync error, data is not reliable, we should mark the completed request as fail and rollback + if err := v.DataBackend.Sync(); err != nil { + // todo: this may generate dirty data or cause data inconsistent, may be weed need to panic? + if te := v.DataBackend.Truncate(end); te != nil { + glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", v.DataBackend.Name(), end, te) + } + for i := 0; i < len(currentRequests); i++ { + if currentRequests[i].IsSucceed() { + currentRequests[i].UpdateResult(0, 0, false, err) } } }