diff --git a/weed/server/volume_grpc_read_all.go b/weed/server/volume_grpc_read_all.go index 3ee0b7d86..7fe5bad03 100644 --- a/weed/server/volume_grpc_read_all.go +++ b/weed/server/volume_grpc_read_all.go @@ -5,7 +5,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) func (vs *VolumeServer) ReadAllNeedles(req *volume_server_pb.ReadAllNeedlesRequest, stream volume_server_pb.VolumeServer_ReadAllNeedlesServer) (err error) { @@ -24,9 +23,9 @@ func (vs *VolumeServer) streaReadOneVolume(vid needle.VolumeId, stream volume_se return fmt.Errorf("not found volume id %d", vid) } - scanner := &VolumeFileScanner4ReadAll{ - stream: stream, - v: v, + scanner := &storage.VolumeFileScanner4ReadAll{ + Stream: stream, + V: v, } offset := int64(v.SuperBlock.BlockSize()) @@ -35,30 +34,3 @@ func (vs *VolumeServer) streaReadOneVolume(vid needle.VolumeId, stream volume_se return err } - -type VolumeFileScanner4ReadAll struct { - stream volume_server_pb.VolumeServer_ReadAllNeedlesServer - v *storage.Volume -} - -func (scanner *VolumeFileScanner4ReadAll) VisitSuperBlock(superBlock super_block.SuperBlock) error { - return nil - -} -func (scanner *VolumeFileScanner4ReadAll) ReadNeedleBody() bool { - return true -} - -func (scanner *VolumeFileScanner4ReadAll) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { - - sendErr := scanner.stream.Send(&volume_server_pb.ReadAllNeedlesResponse{ - VolumeId: uint32(scanner.v.Id), - NeedleId: uint64(n.Id), - Cookie: uint32(n.Cookie), - NeedleBlob: n.Data, - }) - if sendErr != nil { - return sendErr - } - return nil -} diff --git a/weed/storage/volume_read_all.go b/weed/storage/volume_read_all.go new file mode 100644 index 000000000..453a4495c --- /dev/null +++ b/weed/storage/volume_read_all.go @@ -0,0 +1,42 @@ +package storage + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" +) + +type VolumeFileScanner4ReadAll struct { + Stream volume_server_pb.VolumeServer_ReadAllNeedlesServer + V *Volume +} + +func (scanner *VolumeFileScanner4ReadAll) VisitSuperBlock(superBlock super_block.SuperBlock) error { + return nil + +} +func (scanner *VolumeFileScanner4ReadAll) ReadNeedleBody() bool { + return true +} + +func (scanner *VolumeFileScanner4ReadAll) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { + + nv, ok := scanner.V.nm.Get(n.Id) + if !ok { + return nil + } + if nv.Offset.ToActualOffset() != offset { + return nil + } + + sendErr := scanner.Stream.Send(&volume_server_pb.ReadAllNeedlesResponse{ + VolumeId: uint32(scanner.V.Id), + NeedleId: uint64(n.Id), + Cookie: uint32(n.Cookie), + NeedleBlob: n.Data, + }) + if sendErr != nil { + return sendErr + } + return nil +}