
read from the volume index file directly instead of opening a separate file

fix https://github.com/chrislusf/seaweedfs/issues/1640

read from the volume index file directly instead of opening a separate file,
to ensure the latest index entries are read.
Chris Lu 2020-11-27 16:18:48 -08:00
parent 85554bea38
commit 9ac4935f22
2 changed files with 31 additions and 21 deletions
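
For context: a volume's .idx file is a flat sequence of fixed-size entries (NeedleMapEntrySize bytes each), and every entry records a needle's key, offset, and size, so entry n always begins at byte n*NeedleMapEntrySize. The standalone sketch below only illustrates that layout and the "read the already-open handle" access pattern this commit switches to; the 16-byte big-endian encoding (8-byte key, 4-byte offset, 4-byte size) is an assumption for illustration, not necessarily the exact on-disk format of every SeaweedFS build.

package main

import (
	"encoding/binary"
	"fmt"
	"os"
)

// Assumed entry layout: 8-byte key + 4-byte offset + 4-byte size = 16 bytes.
const entrySize = 16

// readEntryAt decodes the n-th fixed-size entry from an already-open index
// file handle instead of reopening the file for each read.
func readEntryAt(f *os.File, n int64) (key uint64, offset uint32, size uint32, err error) {
	buf := make([]byte, entrySize)
	if _, err = f.ReadAt(buf, n*entrySize); err != nil {
		return
	}
	key = binary.BigEndian.Uint64(buf[0:8])
	offset = binary.BigEndian.Uint32(buf[8:12])
	size = binary.BigEndian.Uint32(buf[12:16])
	return
}

func main() {
	f, err := os.Open("1.idx") // hypothetical index file path
	if err != nil {
		fmt.Println(err)
		return
	}
	defer f.Close()
	fmt.Println(readEntryAt(f, 0))
}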


@@ -2,9 +2,11 @@ package storage
 
 import (
 	"fmt"
+	"io"
 	"os"
 	"sync"
 
+	"github.com/chrislusf/seaweedfs/weed/storage/idx"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
 	. "github.com/chrislusf/seaweedfs/weed/storage/types"
 )
@@ -31,6 +33,7 @@ type NeedleMapper interface
 	MaxFileKey() NeedleId
 	IndexFileSize() uint64
 	Sync() error
+	ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error)
 }
 
 type baseNeedleMapper struct {
@@ -64,3 +67,20 @@ func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size
 func (nm *baseNeedleMapper) Sync() error {
 	return nm.indexFile.Sync()
 }
+
+func (nm *baseNeedleMapper) ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error) {
+	bytes := make([]byte, NeedleMapEntrySize)
+	var readCount int
+	if readCount, err = nm.indexFile.ReadAt(bytes, n*NeedleMapEntrySize); err != nil {
+		if err == io.EOF {
+			if readCount == NeedleMapEntrySize {
+				err = nil
+			}
+		}
+		if err != nil {
+			return
+		}
+	}
+	key, offset, size = idx.IdxFileEntry(bytes)
+	return
+}
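
With ReadIndexEntry on the NeedleMapper interface, callers inside the storage package can read any entry through the needle map's own file handle. The helper below is not part of this commit; it is a sketch (assumed to live in the same storage package) of how the new method could be combined with IndexFileSize to fetch the most recently appended entry.

// lastIndexEntry is an illustrative helper, not part of this change: it uses
// the new interface method to read the last entry currently in the index.
func lastIndexEntry(nm NeedleMapper) (key NeedleId, offset Offset, size Size, err error) {
	entryCount := int64(nm.IndexFileSize()) / NeedleMapEntrySize
	if entryCount == 0 {
		return // empty index: zero values, no error
	}
	return nm.ReadIndexEntry(entryCount - 1)
}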


@@ -168,25 +168,13 @@ func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
 // on server side
 func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) {
-	indexFile, openErr := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644)
-	if openErr != nil {
-		err = fmt.Errorf("cannot read %s: %v", v.FileName(".idx"), openErr)
-		return
-	}
-	defer indexFile.Close()
-	fi, statErr := indexFile.Stat()
-	if statErr != nil {
-		err = fmt.Errorf("file %s stat error: %v", indexFile.Name(), statErr)
-		return
-	}
-	fileSize := fi.Size()
+	fileSize := int64(v.IndexFileSize())
 	if fileSize%NeedleMapEntrySize != 0 {
-		err = fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
+		err = fmt.Errorf("unexpected file %s.idx size: %d", v.IndexFileName(), fileSize)
 		return
 	}
-	bytes := make([]byte, NeedleMapEntrySize)
 	entryCount := fileSize / NeedleMapEntrySize
 	l := int64(0)
 	h := entryCount
@@ -200,7 +188,7 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast
 		}
 
 		// read the appendAtNs for entry m
-		offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, m)
+		offset, err = v.readOffsetFromIndex(m)
 		if err != nil {
 			return
 		}
@@ -224,19 +212,21 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast
 		return Offset{}, true, nil
 	}
 
-	offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, l)
+	offset, err = v.readOffsetFromIndex(l)
 
 	return offset, false, err
 }
 
-// bytes is of size NeedleMapEntrySize
-func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m int64) (Offset, error) {
-	if _, readErr := indexFile.ReadAt(bytes, m*NeedleMapEntrySize); readErr != nil && readErr != io.EOF {
-		return Offset{}, readErr
+func (v *Volume) readOffsetFromIndex(m int64) (Offset, error) {
+	v.dataFileAccessLock.RLock()
+	defer v.dataFileAccessLock.RUnlock()
+	if v.nm == nil {
+		return Offset{}, io.EOF
 	}
-	_, offset, _ := idx.IdxFileEntry(bytes)
-	return offset, nil
+	_, offset, _, err := v.nm.ReadIndexEntry(m)
+	return offset, err
 }
 
 // generate the volume idx
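
Taken together, BinarySearchByAppendAtNs now runs against the live index through readOffsetFromIndex while holding the volume's dataFileAccessLock for reading. The standalone sketch below only restates the search pattern visible in the hunks above: find the first of entryCount entries whose append timestamp is strictly greater than sinceNs. The readNs callback and the toy data are assumptions standing in for reading an entry's offset from the index and its appendAtNs from the data file.

package main

import "fmt"

// firstEntryAfter returns the index of the first entry whose append timestamp
// is greater than sinceNs, or entryCount if every entry is older. readNs is a
// stand-in for "read entry m's offset from the index, then its appendAtNs
// from the data file".
func firstEntryAfter(entryCount int64, sinceNs uint64, readNs func(m int64) (uint64, error)) (int64, error) {
	l, h := int64(0), entryCount
	for l < h {
		m := (l + h) / 2
		ns, err := readNs(m)
		if err != nil {
			return 0, err
		}
		if ns <= sinceNs {
			l = m + 1 // entry m is not newer; the answer is to the right
		} else {
			h = m // entry m is newer; it could be the answer
		}
	}
	return l, nil
}

func main() {
	// Toy appendAtNs values standing in for entries read from a volume.
	appendAtNs := []uint64{10, 20, 30, 40, 50}
	idx, _ := firstEntryAfter(int64(len(appendAtNs)), 25, func(m int64) (uint64, error) {
		return appendAtNs[m], nil
	})
	fmt.Println(idx) // prints 2: the entry with appendAtNs == 30
}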