* initial design
* added simulation as tests
* reorganized the codebase to move the simulation framework and tests into their own dedicated package
* integration test: ec worker task
* remove "enhanced" reference
* start master, volume servers, filer. Current status: ✅ Master healthy and running (port 9333); ✅ Filer healthy and running (port 8888); ✅ Volume servers: all 6 running (ports 8080-8085); 🔄 Admin/Workers will start when dependencies are ready
* generate write load
* tasks are assigned
* admin starts with grpc port; worker has its own working directory
* Update .gitignore
* working worker and admin; task detection is not working yet
* compiles; detection uses volumeSizeLimitMB from master
* compiles
* worker retries connecting to admin
* build and restart
* rendering pending tasks
* skip task ID column
* sticky worker id
* test canScheduleTaskNow
* worker reconnects to admin
* clean up logs
* worker registers itself first
* worker can run ec work and report status, but: 1. one volume should not be repeatedly worked on; 2. ec shards need to be distributed and source data should be deleted
* move ec task logic
* listing ec shards
* local copy, ec; distribution still needed
* ec is mostly working now
* distribution of ec shards needs improvement
* need configuration to enable ec
* show ec volumes
* interval field UI component
* rename
* integration test with vacuuming
* garbage percentage threshold
* fix warning
* display ec shard sizes
* fix ec volumes list
* Update ui.go
* show default values
* ensure correct default value
* MaintenanceConfig uses ConfigField
* use schema-defined defaults
* config
* reduce duplication
* refactor to use BaseUIProvider
* each task registers its schema
* checkECEncodingCandidate uses ecDetector
* use vacuumDetector
* use volumeSizeLimitMB
* remove remove
* remove unused
* refactor
* use new framework
* remove v2 reference
* refactor
* left menu can scroll now
* the maintenance manager was not being initialized when no data directory was configured for persistent storage
* saving config
* Update task_config_schema_templ.go
* enable/disable tasks
* protobuf-encoded task configurations
* fix system settings
* use ui component
* remove logs
* interface{} reduction
* reduce interface{}
* reduce interface{}
* avoid from/to map
* reduce interface{}
* refactor
* keep it DRY
* added logging
* debug messages
* debug level
* debug
* show the log caller line
* use configured task policy
* log level
* handle admin heartbeat response
* Update worker.go
* fix EC rack and dc count
* Report task status to admin server
* fix task logging, simplify interface checking, use erasure_coding constants
* factor in empty volume server during task planning
* volume.list adds disk id
* track disk id also
* fix locking for scheduled and manual scanning
* add active topology
* simplify task detector
* ec task completed, but shards are not showing up
* implement ec in ec_typed.go
* adjust log level
* dedup
* implementing ec: copying shards and only ecx files
* use disk id when distributing ec shards. 🎯 Planning: ActiveTopology creates DestinationPlan with a specific TargetDisk; 📦 Task creation: maintenance_integration.go creates ECDestination with DiskId; 🚀 Task execution: EC task passes DiskId in VolumeEcShardsCopyRequest; 💾 Volume server: receives disk_id and stores shards on the specific disk (vs.store.Locations[req.DiskId]); 📂 File system: EC shards and metadata land in the exact disk directory planned
* Delete original volume from all locations
* clean up existing shard locations
* local encoding and distributing
* Update docker/admin_integration/EC-TESTING-README.md (Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>)
* check volume id range
* simplify
* fix tests
* fix types
* clean up logs and tests

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
package erasure_coding

import (
	"errors"
	"fmt"
	"math"
	"os"
	"slices"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/idx"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
)

var (
	NotFoundError = errors.New("needle not found")

	destroyDelaySeconds int64 = 0
)
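
// EcVolume is the in-memory representation of one erasure-coded volume on a
// volume server: its sorted .ecx needle index, its .ecj deletion journal, the
// shards stored locally, and the cached locations of shards held elsewhere.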
type EcVolume struct {
	VolumeId                  needle.VolumeId
	Collection                string
	dir                       string
	dirIdx                    string
	ecxFile                   *os.File
	ecxFileSize               int64
	ecxCreatedAt              time.Time
	Shards                    []*EcVolumeShard
	ShardLocations            map[ShardId][]pb.ServerAddress
	ShardLocationsRefreshTime time.Time
	ShardLocationsLock        sync.RWMutex
	Version                   needle.Version
	ecjFile                   *os.File
	ecjFileAccessLock         sync.Mutex
	diskType                  types.DiskType
	datFileSize               int64
	ExpireAtSec               uint64 // ec volume destroy time, calculated from when the ec volume was created
}
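
// NewEcVolume opens the .ecx index (which must already exist) and the .ecj
// journal (created if missing) for the given volume id, then loads version,
// original .dat file size, and expiration time from the .vif file, writing a
// default .vif when none is found.
//
// Illustrative call (hypothetical directories, collection, and id; assumes
// types.HardDriveType from the storage/types package; error handling elided):
//
//	ev, err := NewEcVolume(types.HardDriveType, "/data", "/index", "", needle.VolumeId(7))
//	if err != nil {
//		// the .ecx index is missing or unreadable
//	}
//	defer ev.Close()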
func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
	ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType}

	dataBaseFileName := EcShardFileName(collection, dir, int(vid))
	indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))

	// open ecx file
	if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
		return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err)
	}
	ecxFi, statErr := ev.ecxFile.Stat()
	if statErr != nil {
		_ = ev.ecxFile.Close()
		return nil, fmt.Errorf("cannot stat ec volume index %s.ecx: %v", indexBaseFileName, statErr)
	}
	ev.ecxFileSize = ecxFi.Size()
	ev.ecxCreatedAt = ecxFi.ModTime()

	// open ecj file
	if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
		return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err)
	}

	// read volume info
	ev.Version = needle.Version3
	if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
		ev.Version = needle.Version(volumeInfo.Version)
		ev.datFileSize = volumeInfo.DatFileSize
		ev.ExpireAtSec = volumeInfo.ExpireAtSec
	} else {
		glog.Warningf("vif file not found, volumeId: %d, filename: %s", vid, dataBaseFileName)
		volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
	}

	ev.ShardLocations = make(map[ShardId][]pb.ServerAddress)

	return
}
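
// AddEcVolumeShard attaches a locally loaded shard to this volume and keeps
// Shards sorted by volume id and shard id. It returns false when a shard with
// the same id is already registered.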
func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
	for _, s := range ev.Shards {
		if s.ShardId == ecVolumeShard.ShardId {
			return false
		}
	}
	ev.Shards = append(ev.Shards, ecVolumeShard)
	slices.SortFunc(ev.Shards, func(a, b *EcVolumeShard) int {
		if a.VolumeId != b.VolumeId {
			return int(a.VolumeId - b.VolumeId)
		}
		return int(a.ShardId - b.ShardId)
	})
	return true
}
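
// DeleteEcVolumeShard unmounts the shard with the given id and removes it from
// the in-memory shard list; it reports whether a matching shard was found.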
func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, deleted bool) {
	foundPosition := -1
	for i, s := range ev.Shards {
		if s.ShardId == shardId {
			foundPosition = i
		}
	}
	if foundPosition < 0 {
		return nil, false
	}

	ecVolumeShard = ev.Shards[foundPosition]
	ecVolumeShard.Unmount()
	ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...)
	return ecVolumeShard, true
}

func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
	for _, s := range ev.Shards {
		if s.ShardId == shardId {
			return s, true
		}
	}
	return nil, false
}
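
// Close closes all locally stored shards, the .ecj journal, and the .ecx
// index. The .ecx file is synced before it is closed.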
func (ev *EcVolume) Close() {
	for _, s := range ev.Shards {
		s.Close()
	}
	if ev.ecjFile != nil {
		ev.ecjFileAccessLock.Lock()
		_ = ev.ecjFile.Close()
		ev.ecjFile = nil
		ev.ecjFileAccessLock.Unlock()
	}
	if ev.ecxFile != nil {
		_ = ev.ecxFile.Sync()
		_ = ev.ecxFile.Close()
		ev.ecxFile = nil
	}
}
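
// Destroy closes the volume, destroys every locally stored shard, and removes
// the .ecx, .ecj, and .vif files from disk.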
func (ev *EcVolume) Destroy() {

	ev.Close()

	for _, s := range ev.Shards {
		s.Destroy()
	}
	os.Remove(ev.FileName(".ecx"))
	os.Remove(ev.FileName(".ecj"))
	os.Remove(ev.FileName(".vif"))
}
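
// FileName returns the full path for one of the volume's companion files:
// index files (".ecx", ".ecj") live under the index directory, everything
// else (".vif") under the data directory.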
func (ev *EcVolume) FileName(ext string) string {
	switch ext {
	case ".ecx", ".ecj":
		return ev.IndexBaseFileName() + ext
	}
	// .vif
	return ev.DataBaseFileName() + ext
}

func (ev *EcVolume) DataBaseFileName() string {
	return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
}

func (ev *EcVolume) IndexBaseFileName() string {
	return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId))
}
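
// ShardSize returns the size of a single shard, taken from the first locally
// stored shard; all shards of an ec volume share the same padded size.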
func (ev *EcVolume) ShardSize() uint64 {
	if len(ev.Shards) > 0 {
		return uint64(ev.Shards[0].Size())
	}
	return 0
}

func (ev *EcVolume) Size() (size int64) {
	for _, shard := range ev.Shards {
		size += shard.Size()
	}
	return
}

func (ev *EcVolume) CreatedAt() time.Time {
	return ev.ecxCreatedAt
}

func (ev *EcVolume) ShardIdList() (shardIds []ShardId) {
	for _, s := range ev.Shards {
		shardIds = append(shardIds, s.ShardId)
	}
	return
}
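
// ShardInfo pairs a shard id with that shard's on-disk size, as reported by
// ShardDetails.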
type ShardInfo struct {
	ShardId ShardId
	Size    int64
}

func (ev *EcVolume) ShardDetails() (shards []ShardInfo) {
	for _, s := range ev.Shards {
		shards = append(shards, ShardInfo{
			ShardId: s.ShardId,
			Size:    s.Size(),
		})
	}
	return
}
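
// ToVolumeEcShardInformationMessage summarizes the locally stored shards for
// reporting to the master. Shards of the same volume are folded into one
// message whose EcIndexBits is a bitmask with bit i set when shard i is
// present; diskId tells the master which disk of this server holds the shards.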
func (ev *EcVolume) ToVolumeEcShardInformationMessage(diskId uint32) (messages []*master_pb.VolumeEcShardInformationMessage) {
	prevVolumeId := needle.VolumeId(math.MaxUint32)
	var m *master_pb.VolumeEcShardInformationMessage
	for _, s := range ev.Shards {
		if s.VolumeId != prevVolumeId {
			m = &master_pb.VolumeEcShardInformationMessage{
				Id:          uint32(s.VolumeId),
				Collection:  s.Collection,
				DiskType:    string(ev.diskType),
				ExpireAtSec: ev.ExpireAtSec,
				DiskId:      diskId,
			}
			messages = append(messages, m)
		}
		prevVolumeId = s.VolumeId
		m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId))
	}
	return
}
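
// LocateEcShardNeedle looks the needle up in the .ecx index and translates its
// offset and size into the ec shard intervals that hold its bytes.
//
// Illustrative call (hypothetical needleId; error handling elided):
//
//	_, _, intervals, err := ev.LocateEcShardNeedle(needleId, ev.Version)
//	// each interval is then read from a shard, locally or from a remote server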
func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) {

	// find the needle from ecx file
	offset, size, err = ev.FindNeedleFromEcx(needleId)
	if err != nil {
		return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %w", err)
	}

	intervals = ev.LocateEcShardNeedleInterval(version, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
	return
}
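
// LocateEcShardNeedleInterval maps a byte range of the original volume onto
// intervals inside the ec shards, preferring the recorded .dat file size so
// the large/small block split matches the original encoding.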
func (ev *EcVolume) LocateEcShardNeedleInterval(version needle.Version, offset int64, size types.Size) (intervals []Interval) {
	shard := ev.Shards[0]
	// Usually a shard is padded to a multiple of ErasureCodingSmallBlockSize.
	// So in most cases, if shardSize equals n * ErasureCodingLargeBlockSize,
	// the data would be in small blocks.
	shardSize := shard.ecdFileSize - 1
	if ev.datFileSize > 0 {
		// To get the correct LargeBlockRowsCount,
		// use datFileSize to calculate the shardSize to match the EC encoding logic.
		shardSize = ev.datFileSize / DataShardsCount
	}
	// calculate the locations in the ec shards
	intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shardSize, offset, types.Size(needle.GetActualSize(size, version)))

	return
}
func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) {
	return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
}
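
// SearchNeedleFromSortedIndex binary-searches the sorted .ecx index for a
// needle id. Each index entry is types.NeedleMapEntrySize bytes (needle id,
// offset, size). When processNeedleFn is non-nil it is invoked with the index
// file and the byte offset of the matching entry. A miss returns
// NotFoundError.
//
// Illustrative lookup (hypothetical file handle and id; error handling elided):
//
//	offset, size, err := SearchNeedleFromSortedIndex(ecxFile, ecxFileSize, needleId, nil)
//	if errors.Is(err, NotFoundError) {
//		// the needle was never written to this volume
//	}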
func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) {
	var key types.NeedleId
	buf := make([]byte, types.NeedleMapEntrySize)
	l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
	for l < h {
		m := (l + h) / 2
		if n, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
			if n != types.NeedleMapEntrySize {
				return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
			}
		}
		key, offset, size = idx.IdxFileEntry(buf)
		if key == needleId {
			if processNeedleFn != nil {
				err = processNeedleFn(ecxFile, m*types.NeedleMapEntrySize)
			}
			return
		}
		if key < needleId {
			l = m + 1
		} else {
			h = m
		}
	}

	err = NotFoundError
	return
}
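
// IsTimeToDestroy reports whether the volume has expired: ExpireAtSec is set
// and the current time is past it, plus the configured destroy delay.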
func (ev *EcVolume) IsTimeToDestroy() bool {
	return ev.ExpireAtSec > 0 && time.Now().Unix() > (int64(ev.ExpireAtSec)+destroyDelaySeconds)
}