diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go index fd9db2246..5ca6619bd 100644 --- a/weed/server/volume_grpc_remote.go +++ b/weed/server/volume_grpc_remote.go @@ -48,7 +48,7 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser // copied from *Needle.prepareWriteBuffer() n.Size = 4 + types.Size(n.DataSize) + 1 n.Checksum = needle.NewCRC(n.Data) - if _, err = vs.store.WriteVolumeNeedle(v.Id, n, false); err != nil { + if _, err = vs.store.WriteVolumeNeedle(v.Id, n, true, false); err != nil { return nil, fmt.Errorf("write needle %d size %d: %v", req.NeedleId, req.Size, err) } diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index 6c039ebf5..3ea902ed3 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -90,7 +90,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv defer glog.V(1).Infof("receive tailing volume %d finished", v.Id) return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error { - _, err := vs.store.WriteVolumeNeedle(v.Id, n, false) + _, err := vs.store.WriteVolumeNeedle(v.Id, n, false, false) return err }) diff --git a/weed/storage/store.go b/weed/storage/store.go index c407a6081..fe3ec1912 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -332,13 +332,13 @@ func (s *Store) Close() { } } -func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, fsync bool) (isUnchanged bool, err error) { +func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, checkCookie bool, fsync bool) (isUnchanged bool, err error) { if v := s.findVolume(i); v != nil { if v.IsReadOnly() { err = fmt.Errorf("volume %d is read only", i) return } - _, _, isUnchanged, err = v.writeNeedle2(n, fsync && s.isStopping) + _, _, isUnchanged, err = v.writeNeedle2(n, checkCookie, fsync && s.isStopping) return } glog.V(0).Infoln("volume", i, "not found!") diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 89fff4b2b..c9596d11d 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -129,7 +129,7 @@ func TestCompaction(t *testing.T) { } func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) { n := newRandomNeedle(uint64(i)) - _, size, _, err := v.writeNeedle2(n, false) + _, size, _, err := v.writeNeedle2(n, true, false) if err != nil { t.Fatalf("write file %d: %v", i, err) } diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go index ad3dad388..794b1c125 100644 --- a/weed/storage/volume_write.go +++ b/weed/storage/volume_write.go @@ -91,7 +91,7 @@ func (v *Volume) asyncRequestAppend(request *needle.AsyncRequest) { v.asyncRequestsChan <- request } -func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchanged bool, err error) { +func (v *Volume) syncWrite(n *needle.Needle, checkCookie bool) (offset uint64, size Size, isUnchanged bool, err error) { // glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) actualSize := needle.GetActualSize(Size(len(n.Data)), v.Version()) @@ -103,10 +103,10 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan return } - return v.doWriteRequest(n) + return v.doWriteRequest(n, checkCookie) } -func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size Size, isUnchanged bool, err error) { +func (v *Volume) writeNeedle2(n *needle.Needle, checkCookie bool, fsync bool) (offset uint64, size Size, isUnchanged bool, err error) { // glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) if n.Ttl == needle.EMPTY_TTL && v.Ttl != needle.EMPTY_TTL { n.SetHasTtl() @@ -114,7 +114,7 @@ func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size } if !fsync { - return v.syncWrite(n) + return v.syncWrite(n, checkCookie) } else { asyncRequest := needle.NewAsyncRequest(n, true) // using len(n.Data) here instead of n.Size before n.Size is populated in n.Append() @@ -127,7 +127,7 @@ func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size } } -func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isUnchanged bool, err error) { +func (v *Volume) doWriteRequest(n *needle.Needle, checkCookie bool) (offset uint64, size Size, isUnchanged bool, err error) { // glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) if v.isFileUnchanged(n) { size = Size(n.DataSize) @@ -143,10 +143,12 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr) return } - if n.Cookie == 0 { + if n.Cookie == 0 && !checkCookie { // this is from batch deletion, and read back again when tailing a remote volume + // which only happens when checkCookie == false and fsync == false n.Cookie = existingNeedle.Cookie - } else if existingNeedle.Cookie != n.Cookie { + } + if existingNeedle.Cookie != n.Cookie { glog.V(0).Infof("write cookie mismatch: existing %s, new %s", needle.NewFileIdFromNeedle(v.Id, existingNeedle), needle.NewFileIdFromNeedle(v.Id, n)) err = fmt.Errorf("mismatching cookie %x", n.Cookie) @@ -274,7 +276,7 @@ func (v *Volume) startWorker() { for i := 0; i < len(currentRequests); i++ { if currentRequests[i].IsWriteRequest { - offset, size, isUnchanged, err := v.doWriteRequest(currentRequests[i].N) + offset, size, isUnchanged, err := v.doWriteRequest(currentRequests[i].N, true) currentRequests[i].UpdateResult(offset, uint64(size), isUnchanged, err) } else { size, err := v.doDeleteRequest(currentRequests[i].N) diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index 061c5a12c..b114b468d 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -42,7 +42,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt } if s.GetVolume(volumeId) != nil { - isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, fsync) + isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync) if err != nil { err = fmt.Errorf("failed to write to local disk: %v", err) glog.V(0).Infoln(err)