mirror of
https://github.com/chrislusf/seaweedfs
synced 2024-07-06 00:57:10 +02:00
volume: find a non-empty offset when binary searching by timestamp
This commit is contained in:
parent
b530f12327
commit
3afa451cdc
|
@ -194,12 +194,31 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast
|
||||||
err = fmt.Errorf("read entry %d: %v", m, err)
|
err = fmt.Errorf("read entry %d: %v", m, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for ; offset.IsZero() && m < h; m++ {
|
if offset.IsZero() {
|
||||||
offset, err = v.readOffsetFromIndex(m)
|
leftIndex, _, leftNs, leftErr := v.readLeftNs(m)
|
||||||
if err != nil {
|
if leftErr != nil {
|
||||||
err = fmt.Errorf("read entry %d: %v", m, err)
|
err = leftErr
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
rightIndex, rightOffset, rightNs, rightErr := v.readRightNs(m)
|
||||||
|
if rightErr != nil {
|
||||||
|
err = rightErr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if rightNs <= sinceNs {
|
||||||
|
l = rightIndex
|
||||||
|
if l == entryCount {
|
||||||
|
return Offset{}, true, nil
|
||||||
|
} else {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if sinceNs < leftNs {
|
||||||
|
h = leftIndex + 1
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return rightOffset, false, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
if offset.IsZero() {
|
if offset.IsZero() {
|
||||||
return Offset{}, true, nil
|
return Offset{}, true, nil
|
||||||
|
@ -230,6 +249,38 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (v *Volume) readRightNs(m int64) (index int64, offset Offset, ts uint64, err error) {
|
||||||
|
index = m
|
||||||
|
for offset.IsZero() {
|
||||||
|
index++
|
||||||
|
offset, err = v.readOffsetFromIndex(index)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("read entry %d: %v", index, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !offset.IsZero() {
|
||||||
|
ts, err = v.readAppendAtNs(offset)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *Volume) readLeftNs(m int64) (index int64, offset Offset, ts uint64, err error) {
|
||||||
|
index = m
|
||||||
|
for offset.IsZero() {
|
||||||
|
index--
|
||||||
|
offset, err = v.readOffsetFromIndex(index)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("read entry %d: %v", index, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !offset.IsZero() {
|
||||||
|
ts, err = v.readAppendAtNs(offset)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// bytes is of size NeedleMapEntrySize
|
// bytes is of size NeedleMapEntrySize
|
||||||
func (v *Volume) readOffsetFromIndex(m int64) (Offset, error) {
|
func (v *Volume) readOffsetFromIndex(m int64) (Offset, error) {
|
||||||
v.dataFileAccessLock.RLock()
|
v.dataFileAccessLock.RLock()
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
|
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -22,7 +23,7 @@ func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
|
||||||
t.Fatalf("volume creation: %v", err)
|
t.Fatalf("volume creation: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
count := 10
|
count := 20
|
||||||
|
|
||||||
for i:=1;i<count;i++{
|
for i:=1;i<count;i++{
|
||||||
n := newRandomNeedle(uint64(i))
|
n := newRandomNeedle(uint64(i))
|
||||||
|
@ -32,9 +33,9 @@ func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for i:=1;i<5;i++{
|
for i:=1;i<15;i++{
|
||||||
n := newEmptyNeedle(uint64(i))
|
n := newEmptyNeedle(uint64(i))
|
||||||
_, err := v.doDeleteRequest(n)
|
err := v.nm.Put(n.Id, types.Offset{}, types.TombstoneFileSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("delete needle %d: %v", i, err)
|
t.Fatalf("delete needle %d: %v", i, err)
|
||||||
}
|
}
|
||||||
|
@ -42,15 +43,12 @@ func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
|
||||||
|
|
||||||
ts1 := time.Now().UnixNano()
|
ts1 := time.Now().UnixNano()
|
||||||
|
|
||||||
var ts2 uint64
|
for i:=15;i<count;i++{
|
||||||
|
|
||||||
for i:=5;i<count;i++{
|
|
||||||
n := newEmptyNeedle(uint64(i))
|
n := newEmptyNeedle(uint64(i))
|
||||||
_, err := v.doDeleteRequest(n)
|
_, err := v.doDeleteRequest(n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("delete needle %d: %v", i, err)
|
t.Fatalf("delete needle %d: %v", i, err)
|
||||||
}
|
}
|
||||||
ts2 = n.AppendAtNs
|
|
||||||
}
|
}
|
||||||
|
|
||||||
offset, isLast, err := v.BinarySearchByAppendAtNs(uint64(ts1))
|
offset, isLast, err := v.BinarySearchByAppendAtNs(uint64(ts1))
|
||||||
|
@ -59,11 +57,4 @@ func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
|
||||||
}
|
}
|
||||||
fmt.Printf("offset: %v, isLast: %v\n", offset.ToActualOffset(), isLast)
|
fmt.Printf("offset: %v, isLast: %v\n", offset.ToActualOffset(), isLast)
|
||||||
|
|
||||||
offset, isLast, err = v.BinarySearchByAppendAtNs(uint64(ts2))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("lookup by ts: %v", err)
|
|
||||||
}
|
|
||||||
fmt.Printf("offset: %v, isLast: %v\n", offset.ToActualOffset(), isLast)
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in a new issue