1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-06-26 04:18:59 +02:00

support read option readDeleted=true

This commit is contained in:
Chris Lu 2020-08-18 19:22:16 -07:00
parent 6ccd7f0a4d
commit fe01191b5b
7 changed files with 31 additions and 26 deletions

View file

@ -111,7 +111,7 @@ func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset in
nv, ok := needleMap.Get(n.Id)
glog.V(3).Infof("key %d offset %d size %d disk_size %d compressed %v ok %v nv %+v",
n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed(), ok, nv)
if ok && nv.Size > 0 && nv.Size != types.TombstoneFileSize && nv.Offset.ToAcutalOffset() == offset {
if ok && nv.Size.IsValid() && nv.Offset.ToAcutalOffset() == offset {
if newerThanUnix >= 0 && n.HasLastModifiedDate() && n.LastModified < uint64(newerThanUnix) {
glog.V(3).Infof("Skipping this file, as it's old enough: LastModified %d vs %d",
n.LastModified, newerThanUnix)

View file

@ -48,7 +48,7 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())
if n.Size > 0 && n.Size != types.TombstoneFileSize {
if n.Size.IsValid() {
pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
glog.V(2).Infof("saved %d with error %v", n.Size, pe)
} else {

View file

@ -12,7 +12,7 @@ import (
var (
MarkNeedleDeleted = func(file *os.File, offset int64) error {
b := make([]byte, types.SizeSize)
util.Uint32toBytes(b, types.TombstoneFileSize)
types.SizeToBytes(b, types.TombstoneFileSize)
n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize)
if err != nil {
return fmt.Errorf("sorted needle write error: %v", err)

View file

@ -115,12 +115,9 @@ func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) {
return cs.overflow[i].Key >= key
})
if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key {
for i := deleteCandidate; i < length-1; i++ {
cs.overflow[i] = cs.overflow[i+1]
cs.overflowExtra[i] = cs.overflowExtra[i+1]
if cs.overflow[deleteCandidate].Size.IsValid() {
cs.overflow[deleteCandidate].Size = - cs.overflow[deleteCandidate].Size
}
cs.overflow = cs.overflow[0 : length-1]
cs.overflowExtra = cs.overflowExtra[0 : length-1]
}
}
@ -132,7 +129,7 @@ func (cs *CompactSection) Delete(key NeedleId) Size {
if i := cs.binarySearchValues(skey); i >= 0 {
if cs.values[i].Size > 0 && cs.values[i].Size.IsValid() {
ret = cs.values[i].Size
cs.values[i].Size = TombstoneFileSize
cs.values[i].Size = -cs.values[i].Size
}
}
if _, v, found := cs.findOverflowEntry(skey); found {

View file

@ -124,14 +124,18 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error {
}
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
if oldNeedle, ok := m.Get(key); ok {
m.logDelete(oldNeedle.Size)
oldNeedle, found := m.Get(key)
if !found || oldNeedle.Size.IsDeleted() {
return nil
}
m.logDelete(oldNeedle.Size)
// write to index file first
if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
return err
}
return levelDbDelete(m.db, key)
return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size)
}
func (m *LevelDbNeedleMap) Close() {

View file

@ -2,7 +2,6 @@ package types
import (
"fmt"
"math"
"strconv"
"github.com/chrislusf/seaweedfs/weed/util"
@ -13,13 +12,13 @@ type Offset struct {
OffsetLower
}
type Size uint32
type Size int32
func (s Size) IsDeleted() bool {
return s == TombstoneFileSize
return s < 0 || s == TombstoneFileSize
}
func (s Size) IsValid() bool {
return s != TombstoneFileSize
return s >0 && s != TombstoneFileSize
}
type OffsetLower struct {
@ -37,7 +36,7 @@ const (
NeedleMapEntrySize = NeedleIdSize + OffsetSize + SizeSize
TimestampSize = 8 // int64 size
NeedlePaddingSize = 8
TombstoneFileSize = math.MaxUint32
TombstoneFileSize = Size(-1)
CookieSize = 4
)

View file

@ -195,7 +195,7 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
}
nv, ok := v.nm.Get(n.Id)
//fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
// fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
if ok && nv.Size.IsValid() {
size := nv.Size
n.Data = nil
@ -233,7 +233,7 @@ func (v *Volume) deleteNeedle2(n *needle.Needle) (Size, error) {
func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
nv, ok := v.nm.Get(n.Id)
//fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
// fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
if ok && nv.Size.IsValid() {
size := nv.Size
n.Data = nil
@ -260,13 +260,18 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
if !ok || nv.Offset.IsZero() {
return -1, ErrorNotFound
}
if nv.Size.IsDeleted() {
return -1, errors.New("already deleted")
readSize := nv.Size
if readSize.IsDeleted() {
if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
readSize = -readSize
} else {
return -1, errors.New("already deleted")
}
}
if nv.Size == 0 {
if readSize == 0 {
return 0, nil
}
err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), readSize, v.Version())
if err != nil {
return 0, err
}
@ -299,7 +304,7 @@ func (v *Volume) startWorker() {
currentBytesToWrite := int64(0)
for {
request, ok := <-v.asyncRequestsChan
//volume may be closed
// volume may be closed
if !ok {
chanClosed = true
break
@ -402,8 +407,8 @@ func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorag
if volumeFileScanner.ReadNeedleBody() {
if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
glog.V(0).Infof("cannot read needle body: %v", err)
//err = fmt.Errorf("cannot read needle body: %v", err)
//return
// err = fmt.Errorf("cannot read needle body: %v", err)
// return
}
}
err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)