1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-06-01 16:22:43 +02:00

pass in option to read deleted entries

not working yet
This commit is contained in:
Chris Lu 2020-08-18 17:37:26 -07:00
parent 51ecb49db3
commit 7e91ae592c
7 changed files with 18 additions and 8 deletions

View file

@ -188,7 +188,7 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
}
count, err = vs.store.ReadEcShardNeedle(volumeId, n)
} else {
count, err = vs.store.ReadVolumeNeedle(volumeId, n)
count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil)
}
if err != nil {
return nil, err

View file

@ -41,7 +41,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
} else {
n.ParsePath(id_cookie)
cookie := n.Cookie
if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusNotFound,

View file

@ -24,7 +24,7 @@ func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_
n.ParsePath(id_cookie)
cookie := n.Cookie
if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
return err
}

View file

@ -18,6 +18,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@ -81,9 +82,14 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
cookie := n.Cookie
readOption := &storage.ReadOption{
ReadDeleted: r.FormValue("readDeleted") == "true",
}
var count int
if hasVolume {
count, err = vs.store.ReadVolumeNeedle(volumeId, n)
count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption)
} else if hasEcVolume {
count, err = vs.store.ReadEcShardNeedle(volumeId, n)
}

View file

@ -104,7 +104,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
return
}
_, ok := vs.store.ReadVolumeNeedle(volumeId, n)
_, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil)
if ok != nil {
m := make(map[string]uint32)
m["size"] = 0

View file

@ -23,6 +23,10 @@ const (
MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
)
type ReadOption struct {
ReadDeleted bool
}
/*
* A VolumeServer contains one Store
*/
@ -283,9 +287,9 @@ func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (Size, e
return 0, fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port)
}
func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle) (int, error) {
func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption *ReadOption) (int, error) {
if v := s.findVolume(i); v != nil {
return v.readNeedle(n)
return v.readNeedle(n, readOption)
}
return 0, fmt.Errorf("volume %d not found", i)
}

View file

@ -252,7 +252,7 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
}
// read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) readNeedle(n *needle.Needle) (int, error) {
func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()