1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-08-16 17:12:46 +02:00
seaweedfs/weed/storage/store_ec.go
Chris Lu 891a2fb6eb
Admin: misc improvements on admin server and workers. EC now works. (#7055)
* initial design

* added simulation as tests

* reorganized the codebase to move the simulation framework and tests into their own dedicated package

* integration test. ec worker task

* remove "enhanced" reference

* start master, volume servers, filer

Current Status
 Master: Healthy and running (port 9333)
 Filer: Healthy and running (port 8888)
 Volume Servers: All 6 servers running (ports 8080-8085)
🔄 Admin/Workers: Will start when dependencies are ready

* generate write load

* tasks are assigned

* admin start with grpc port. worker has its own working directory

* Update .gitignore

* working worker and admin. Task detection is not working yet.

* compiles, detection uses volumeSizeLimitMB from master

* compiles

* worker retries connecting to admin

* build and restart

* rendering pending tasks

* skip task ID column

* sticky worker id

* test canScheduleTaskNow

* worker reconnect to admin

* clean up logs

* worker register itself first

* worker can run ec work and report status

but:
1. one volume should not be repeatedly worked on.
2. ec shards need to be distributed and source data should be deleted.

* move ec task logic

* listing ec shards

* local copy, ec. Need to distribute.

* ec is mostly working now

* distribution of ec shards needs improvement
* need configuration to enable ec

* show ec volumes

* interval field UI component

* rename

* integration test with vacuuming

* garbage percentage threshold

* fix warning

* display ec shard sizes

* fix ec volumes list

* Update ui.go

* show default values

* ensure correct default value

* MaintenanceConfig use ConfigField

* use schema defined defaults

* config

* reduce duplication

* refactor to use BaseUIProvider

* each task register its schema

* checkECEncodingCandidate use ecDetector

* use vacuumDetector

* use volumeSizeLimitMB

* remove

remove

* remove unused

* refactor

* use new framework

* remove v2 reference

* refactor

* left menu can scroll now

* The maintenance manager was not being initialized when no data directory was configured for persistent storage.

* saving config

* Update task_config_schema_templ.go

* enable/disable tasks

* protobuf encoded task configurations

* fix system settings

* use ui component

* remove logs

* interface{} Reduction

* reduce interface{}

* reduce interface{}

* avoid from/to map

* reduce interface{}

* refactor

* keep it DRY

* added logging

* debug messages

* debug level

* debug

* show the log caller line

* use configured task policy

* log level

* handle admin heartbeat response

* Update worker.go

* fix EC rack and dc count

* Report task status to admin server

* fix task logging, simplify interface checking, use erasure_coding constants

* factor in empty volume server during task planning

* volume.list adds disk id

* track disk id also

* fix locking scheduled and manual scanning

* add active topology

* simplify task detector

* ec task completed, but shards are not showing up

* implement ec in ec_typed.go

* adjust log level

* dedup

* implementing ec copying shards and only ecx files

* use disk id when distributing ec shards

🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk
📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId
🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest
💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId])
📂 File System: EC shards and metadata land in the exact disk directory planned

* Delete original volume from all locations

* clean up existing shard locations

* local encoding and distributing

* Update docker/admin_integration/EC-TESTING-README.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* check volume id range

* simplify

* fix tests

* fix types

* clean up logs and tests

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-07-30 12:38:03 -07:00

413 lines
14 KiB
Go

package storage
import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"slices"
	"sync"
	"time"

	"github.com/klauspost/reedsolomon"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/stats"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
// CollectErasureCodingHeartbeat builds a heartbeat listing every EC shard held
// in the ecVolumes map of each disk location. The index of the location in
// s.Locations is passed along as the disk id of its shards.
//
// As a side effect, the "ec" series of stats.VolumeServerDiskSizeGauge is set,
// per collection, to the summed size of the shards seen during this scan.
func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
	var messages []*master_pb.VolumeEcShardInformationMessage
	sizeByCollection := make(map[string]int64)

	for diskId, location := range s.Locations {
		location.ecVolumesLock.RLock()
		for _, ecVolume := range location.ecVolumes {
			messages = append(messages, ecVolume.ToVolumeEcShardInformationMessage(uint32(diskId))...)
			for _, shard := range ecVolume.Shards {
				sizeByCollection[ecVolume.Collection] += shard.Size()
			}
		}
		location.ecVolumesLock.RUnlock()
	}

	for collection, total := range sizeByCollection {
		stats.VolumeServerDiskSizeGauge.WithLabelValues(collection, "ec").Set(float64(total))
	}

	heartbeat := &master_pb.Heartbeat{EcShards: messages}
	heartbeat.HasNoEcShards = len(messages) == 0
	return heartbeat
}
// MountEcShards loads shard shardId of EC volume vid from the first disk
// location able to load it, then queues a notification for that shard on
// s.NewEcShardsChan.
//
// Locations are tried in order. A location whose load fails with
// os.ErrNotExist is skipped; any other load error stops the search and is
// returned wrapped. An error is also returned when no location has the shard.
func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
	for diskId, location := range s.Locations {
		ecVolume, err := location.LoadEcShard(collection, vid, shardId)
		if errors.Is(err, os.ErrNotExist) {
			// errors.Is also matches a wrapped os.ErrNotExist, which a plain
			// == comparison would treat as a hard failure.
			continue
		}
		if err != nil {
			return fmt.Errorf("%s load ec shard %d.%d: %w", location.Directory, vid, shardId, err)
		}

		glog.V(0).Infof("MountEcShards %d.%d on disk ID %d", vid, shardId, diskId)

		var shardBits erasure_coding.ShardBits
		s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
			Id:          uint32(vid),
			Collection:  collection,
			EcIndexBits: uint32(shardBits.AddShardId(shardId)),
			DiskType:    string(location.DiskType),
			ExpireAtSec: ecVolume.ExpireAtSec,
			DiskId:      uint32(diskId),
		}
		return nil
	}

	return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
}
// UnmountEcShards unloads shard shardId of EC volume vid from the disk
// location that holds it and queues a notification on s.DeletedEcShardsChan.
//
// It returns nil when no location holds the shard, and an error when the
// shard was found but the location's UnloadEcShard reports it was not deleted.
func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {
	diskId, ecShard, found := s.findEcShard(vid, shardId)
	if !found {
		return nil
	}

	// Build the notification from the shard before it is unloaded.
	var bits erasure_coding.ShardBits
	notice := master_pb.VolumeEcShardInformationMessage{
		Id:          uint32(vid),
		Collection:  ecShard.Collection,
		EcIndexBits: uint32(bits.AddShardId(shardId)),
		DiskType:    string(ecShard.DiskType),
		DiskId:      diskId,
	}

	if !s.Locations[diskId].UnloadEcShard(vid, shardId) {
		return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
	}

	glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
	s.DeletedEcShardsChan <- notice
	return nil
}
// findEcShard searches the disk locations in order for shard shardId of EC
// volume vid. On the first hit it returns the index of that location in
// s.Locations as diskId together with the shard; otherwise found is false.
func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (diskId uint32, shard *erasure_coding.EcVolumeShard, found bool) {
	for idx := range s.Locations {
		if candidate, ok := s.Locations[idx].FindEcShard(vid, shardId); ok {
			return uint32(idx), candidate, true
		}
	}
	return 0, nil, false
}
// FindEcVolume returns EC volume vid from the first disk location that
// reports having it. The boolean is false when no location does.
func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
	for idx := range s.Locations {
		ecVolume, ok := s.Locations[idx].FindEcVolume(vid)
		if ok {
			return ecVolume, true
		}
	}
	return nil, false
}
// CollectEcShards calls CollectEcShards on every disk location for EC volume
// vid, handing the same shardFileNames slice to each of them.
// NOTE(review): shardFileNames is presumably filled in by the locations with
// the shard file names they hold — confirm against DiskLocation.CollectEcShards.
//
// All locations are visited. The returned ecVolume is the one reported by the
// last location that found shards; found is false if none did.
func (s *Store) CollectEcShards(vid needle.VolumeId, shardFileNames []string) (ecVolume *erasure_coding.EcVolume, found bool) {
	for _, location := range s.Locations {
		candidate, ok := location.CollectEcShards(vid, shardFileNames)
		if !ok {
			continue
		}
		ecVolume, found = candidate, true
	}
	return ecVolume, found
}
// DestroyEcVolume asks every disk location to destroy EC volume vid.
func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
	for idx := range s.Locations {
		s.Locations[idx].DestroyEcVolume(vid)
	}
}
// ReadEcShardNeedle reads needle n from EC volume vid and fills n from the
// bytes read.
//
// Only the first disk location holding the EC volume is consulted. The needle
// is located in that volume, onReadSizeFn (if non-nil) is called with the
// located size, the intervals are read, and the result is parsed into n.
//
// It returns the number of bytes read from the shard intervals. ErrorDeleted
// is returned when the located size is marked deleted or the interval read
// reports deletion. An error is returned when no location has the volume.
func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle, onReadSizeFn func(size types.Size)) (int, error) {
	for _, location := range s.Locations {
		localEcVolume, found := location.FindEcVolume(vid)
		if !found {
			continue
		}

		offset, size, intervals, locateErr := localEcVolume.LocateEcShardNeedle(n.Id, localEcVolume.Version)
		if locateErr != nil {
			return 0, fmt.Errorf("locate in local ec volume: %w", locateErr)
		}
		if size.IsDeleted() {
			return 0, ErrorDeleted
		}
		if onReadSizeFn != nil {
			onReadSizeFn(size)
		}

		glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToActualOffset(), size, intervals)
		if len(intervals) > 1 {
			glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
		}

		payload, isDeleted, readErr := s.readEcShardIntervals(vid, n.Id, localEcVolume, intervals)
		if readErr != nil {
			return 0, fmt.Errorf("ReadEcShardIntervals: %w", readErr)
		}
		if isDeleted {
			return 0, ErrorDeleted
		}

		if parseErr := n.ReadBytes(payload, offset.ToActualOffset(), size, localEcVolume.Version); parseErr != nil {
			return 0, fmt.Errorf("readbytes: %w", parseErr)
		}
		return len(payload), nil
	}

	return 0, fmt.Errorf("ec shard %d not found", vid)
}
// readEcShardIntervals reads each interval in order and concatenates the
// results into data.
//
// The shard locations of ecVolume are looked up (or served from cache) first;
// a failure there is returned without reading anything. The first interval
// that fails to read aborts the whole read with that error. is_deleted is
// true if any interval read reported deletion.
func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
	if err = s.cachedLookupEcShardLocations(ecVolume); err != nil {
		return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
	}

	for idx, interval := range intervals {
		chunk, chunkDeleted, readErr := s.readOneEcShardInterval(needleId, ecVolume, interval)
		if readErr != nil {
			return nil, chunkDeleted, readErr
		}
		is_deleted = is_deleted || chunkDeleted

		// The first chunk is adopted as-is; later chunks are appended to it.
		if idx == 0 {
			data = chunk
			continue
		}
		data = append(data, chunk...)
	}

	return data, is_deleted, nil
}
// readOneEcShardInterval reads a single interval into a freshly allocated
// buffer of interval.Size bytes.
//
// If the shard holding the interval is present in ecVolume it is read
// locally. Otherwise the shard is read from the locations cached in
// ecVolume.ShardLocations, and if that fails (or no location is cached) the
// interval is recovered from the other shards.
//
// A local read that fills the whole buffer is treated as a success even if
// ReadAt also returned an error (io.ReaderAt permits io.EOF alongside a full
// read). Previously that error leaked out through the named result and made
// the caller discard a complete read.
func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
	shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
	data = make([]byte, interval.Size)

	if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
		readSize, readErr := shard.ReadAt(data, actualOffset)
		if readErr != nil && readSize != int(interval.Size) {
			glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, readErr)
			return data, false, readErr
		}
		// Full read: any accompanying error is deliberately dropped.
		return data, false, nil
	}

	ecVolume.ShardLocationsLock.RLock()
	sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
	ecVolume.ShardLocationsLock.RUnlock()

	// try reading directly
	if hasShardIdLocation {
		_, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
		if err == nil {
			return data, is_deleted, nil
		}
		// NOTE(review): the message says "clearing", but nothing in this
		// function removes the cached locations for shardId.
		glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
	}

	// try reading by recovering from other shards
	_, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset)
	if err == nil {
		return data, is_deleted, nil
	}
	glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)

	return data, is_deleted, err
}
// forgetShardId removes the cached locations of shardId from
// ecVolume.ShardLocations while holding ShardLocationsLock for writing.
// It is used after the source data nodes of a shard could not be accessed.
func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId) {
	ecVolume.ShardLocationsLock.Lock()
	defer ecVolume.ShardLocationsLock.Unlock()

	delete(ecVolume.ShardLocations, shardId)
}
// cachedLookupEcShardLocations refreshes ecVolume.ShardLocations from the
// master unless the cached entries are still considered fresh.
//
// The freshness window depends on how many shards have cached locations:
//   - fewer than DataShardsCount: 11 seconds
//   - exactly TotalShardsCount:   37 minutes
//   - at least DataShardsCount:   7 minutes
//
// On refresh, the locations of every shard in the master's response replace
// the cached ones for that shard, and ShardLocationsRefreshTime is set to the
// current time. Shards absent from the response keep their cached entry. An
// error is returned if the lookup fails or the master reports fewer than
// DataShardsCount shards; the cache is left untouched in that case.
func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) (err error) {
	// Snapshot the cache state under the read lock: other goroutines modify
	// the map and the refresh time while holding ShardLocationsLock.
	ecVolume.ShardLocationsLock.RLock()
	shardCount := len(ecVolume.ShardLocations)
	lastRefresh := ecVolume.ShardLocationsRefreshTime
	ecVolume.ShardLocationsLock.RUnlock()

	now := time.Now()
	freshWithin := func(maxAge time.Duration) bool {
		return lastRefresh.Add(maxAge).After(now)
	}
	if (shardCount < erasure_coding.DataShardsCount && freshWithin(11*time.Second)) ||
		(shardCount == erasure_coding.TotalShardsCount && freshWithin(37*time.Minute)) ||
		(shardCount >= erasure_coding.DataShardsCount && freshWithin(7*time.Minute)) {
		// still fresh
		return nil
	}

	glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)

	err = operation.WithMasterServerClient(false, s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
		req := &master_pb.LookupEcVolumeRequest{
			VolumeId: uint32(ecVolume.VolumeId),
		}
		resp, lookupErr := masterClient.LookupEcVolume(context.Background(), req)
		if lookupErr != nil {
			return fmt.Errorf("lookup ec volume %d: %w", ecVolume.VolumeId, lookupErr)
		}
		if len(resp.ShardIdLocations) < erasure_coding.DataShardsCount {
			return fmt.Errorf("only %d shards found but %d required", len(resp.ShardIdLocations), erasure_coding.DataShardsCount)
		}

		ecVolume.ShardLocationsLock.Lock()
		defer ecVolume.ShardLocationsLock.Unlock()

		for _, shardIdLocations := range resp.ShardIdLocations {
			shardId := erasure_coding.ShardId(shardIdLocations.ShardId)
			// Drop the old entry so the appends below start from scratch.
			delete(ecVolume.ShardLocations, shardId)
			for _, loc := range shardIdLocations.Locations {
				ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], pb.NewServerAddressFromLocation(loc))
			}
		}
		ecVolume.ShardLocationsRefreshTime = time.Now()
		return nil
	})
	return err
}
// readRemoteEcShardInterval reads len(buf) bytes of shard vid.shardId at
// offset into buf, trying each of sourceDataNodes in order until one
// succeeds.
//
// It fails immediately when sourceDataNodes is empty. If every node fails,
// the values returned by the last attempt are passed through.
func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
	if len(sourceDataNodes) == 0 {
		return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
	}

	for idx := range sourceDataNodes {
		node := sourceDataNodes[idx]
		glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, node)
		if n, is_deleted, err = s.doReadRemoteEcShardInterval(node, needleId, vid, shardId, buf, offset); err == nil {
			return n, is_deleted, nil
		}
		glog.V(1).Infof("read remote ec shard %d.%d from %s: %v", vid, shardId, node, err)
	}

	return n, is_deleted, err
}
// doReadRemoteEcShardInterval streams len(buf) bytes of shard vid.shardId,
// starting at offset, from the volume server sourceDataNode into buf.
//
// n is the number of bytes copied into buf. is_deleted is set when any
// response in the stream is flagged as deleted. On any error n is reported
// as 0. A stream that delivers more bytes than buf can hold is rejected with
// an error instead of indexing past the end of buf.
func (s *Store) doReadRemoteEcShardInterval(sourceDataNode pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
	err = operation.WithVolumeServerClient(false, sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
		// copy data slice
		shardReadClient, startErr := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{
			VolumeId: uint32(vid),
			ShardId:  uint32(shardId),
			Offset:   offset,
			Size:     int64(len(buf)),
			FileKey:  uint64(needleId),
		})
		if startErr != nil {
			return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, startErr)
		}

		for {
			resp, receiveErr := shardReadClient.Recv()
			if receiveErr == io.EOF {
				break
			}
			if receiveErr != nil {
				return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, receiveErr)
			}
			if resp.IsDeleted {
				is_deleted = true
			}
			// n never exceeds len(buf), so len(buf)-n cannot go negative.
			if len(resp.Data) > len(buf)-n {
				return fmt.Errorf("receiving ec shard %d.%d from %s: received more than the %d bytes requested", vid, shardId, sourceDataNode, len(buf))
			}
			n += copy(buf[n:], resp.Data)
		}
		return nil
	})
	if err != nil {
		return 0, is_deleted, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
	}
	return n, is_deleted, nil
}
// recoverOneRemoteEcShardInterval rebuilds len(buf) bytes of shard
// shardIdToRecover at offset from the other shards of ecVolume and copies the
// result into buf.
//
// Every other shard with at least one cached location is read concurrently
// from its remote locations. A shard whose read fails is removed from the
// location cache via forgetShardId; a shard that did not deliver exactly
// len(buf) bytes is left out of the reconstruction input. The data shards are
// then reconstructed with a Reed-Solomon encoder.
//
// On success it returns len(buf); is_deleted is true if any of the remote
// reads reported deletion. On failure n is 0 and is_deleted is false.
func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
	glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)

	enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
	if err != nil {
		return 0, false, fmt.Errorf("failed to create encoder: %w", err)
	}

	bufs := make([][]byte, erasure_coding.TotalShardsCount)
	// One flag per shard: each goroutine writes only its own slot, so the
	// deletion state is gathered without racing on a shared variable.
	deletedByShard := make([]bool, len(bufs))

	var wg sync.WaitGroup
	ecVolume.ShardLocationsLock.RLock()
	for shardId, locations := range ecVolume.ShardLocations {
		// skip current shard or empty shard
		if shardId == shardIdToRecover {
			continue
		}
		if len(locations) == 0 {
			glog.V(3).Infof("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
			continue
		}
		// A shard id outside bufs cannot take part in the reconstruction and
		// would otherwise panic when used as an index below.
		if int(shardId) >= len(bufs) {
			glog.V(3).Infof("recover: skipping unexpected shard id %d.%d", ecVolume.VolumeId, shardId)
			continue
		}

		// read from remote locations
		wg.Add(1)
		go func(shardId erasure_coding.ShardId, locations []pb.ServerAddress) {
			defer wg.Done()
			data := make([]byte, len(buf))
			nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset)
			if readErr != nil {
				glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
				forgetShardId(ecVolume, shardId)
			}
			deletedByShard[shardId] = isDeleted
			if nRead == len(buf) {
				bufs[shardId] = data
			}
		}(shardId, locations)
	}
	// Release the read lock before waiting: forgetShardId in the goroutines
	// needs the write lock.
	ecVolume.ShardLocationsLock.RUnlock()

	wg.Wait()

	if err = enc.ReconstructData(bufs); err != nil {
		glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
		return 0, false, err
	}
	glog.V(4).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)

	for _, deleted := range deletedByShard {
		if deleted {
			is_deleted = true
			break
		}
	}

	copy(buf, bufs[shardIdToRecover])
	return len(buf), is_deleted, nil
}
// EcVolumes returns the EC volumes of all disk locations, sorted by ascending
// VolumeId. Each location's ecVolumes map is read under its ecVolumesLock.
func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume) {
	for _, location := range s.Locations {
		location.ecVolumesLock.RLock()
		for _, v := range location.ecVolumes {
			ecVolumes = append(ecVolumes, v)
		}
		location.ecVolumesLock.RUnlock()
	}

	slices.SortFunc(ecVolumes, func(a, b *erasure_coding.EcVolume) int {
		// Compare directly rather than subtracting: the difference of two
		// ids converted to int can overflow where int is 32 bits wide.
		switch {
		case a.VolumeId < b.VolumeId:
			return -1
		case a.VolumeId > b.VolumeId:
			return 1
		default:
			return 0
		}
	})
	return ecVolumes
}