1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-09 21:02:46 +02:00
seaweedfs/weed/storage/store_ec_delete.go
2025-08-18 07:44:24 -07:00

151 lines
5.6 KiB
Go

package storage
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
// VERSION CHECK - Should see this in logs if new binary is loaded
glog.Errorf("⭐ VERSION 2024-08-18-06:51 EC DELETE STARTING: needle %d volume %d", n.Id, ecVolume.VolumeId)
glog.Errorf("🚀 EC DELETE SHARD NEEDLE: starting deletion for needle %d volume %d", n.Id, ecVolume.VolumeId)
// Early validation checks - using ERROR level to ensure they appear
if ecVolume == nil {
glog.Errorf("❌ EC DELETE: ecVolume is nil for needle %d", n.Id)
return 0, fmt.Errorf("ecVolume is nil")
}
if n == nil {
glog.Errorf("❌ EC DELETE: needle is nil")
return 0, fmt.Errorf("needle is nil")
}
glog.Errorf("🔍 EC DELETE DEBUG: Validated inputs - needle %d, volume %d, generation %d", n.Id, ecVolume.VolumeId, ecVolume.Generation)
defer func() {
if r := recover(); r != nil {
glog.Errorf("❌ EC DELETE PANIC: needle %d volume %d - %v", n.Id, ecVolume.VolumeId, r)
}
}()
glog.Errorf("🔍 EC DELETE DEBUG: About to call ReadEcShardNeedle for needle %d volume %d", n.Id, ecVolume.VolumeId)
count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n, nil)
glog.Errorf("🔍 EC DELETE DEBUG: ReadEcShardNeedle returned count=%d, err=%v", count, err)
if err != nil {
glog.Errorf("❌ EC DELETE: Failed to read needle %d from volume %d: %v", n.Id, ecVolume.VolumeId, err)
return 0, err
}
glog.Infof("✅ EC DELETE: Successfully read needle %d, count=%d", n.Id, count)
glog.Infof("🔍 EC DELETE DEBUG: Checking cookie for needle %d (expected=%x, actual=%x)", n.Id, cookie, n.Cookie)
if cookie != n.Cookie {
glog.Errorf("❌ EC DELETE: Cookie mismatch for needle %d (expected=%x, actual=%x)", n.Id, cookie, n.Cookie)
return 0, fmt.Errorf("unexpected cookie %x", cookie)
}
glog.Infof("✅ EC DELETE: Cookie validation passed for needle %d", n.Id)
glog.Infof("🔍 EC DELETE DEBUG: Deleting needle %d from remote EC shards", n.Id)
if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume, n.Id); err != nil {
glog.Errorf("❌ EC DELETE: Failed to delete needle %d from remote EC shards: %v", n.Id, err)
return 0, err
}
glog.Infof("✅ EC DELETE: Successfully deleted needle %d from remote EC shards", n.Id)
// Record the deletion locally in the .ecj journal file
glog.Infof("🔍 EC DELETION: Recording needle %d in volume %d generation %d",
n.Id, ecVolume.VolumeId, ecVolume.Generation)
if err = ecVolume.DeleteNeedleFromEcx(n.Id); err != nil {
glog.Errorf("❌ Failed to record EC deletion in journal for needle %d: %v", n.Id, err)
// Continue even if journal write fails - the remote deletion succeeded
} else {
glog.Infof("✅ EC deletion recording completed for needle %d", n.Id)
}
return int64(count), nil
}
func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
_, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version)
if err != nil {
return err
}
if len(intervals) == 0 {
return erasure_coding.NotFoundError
}
shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
hasDeletionSuccess := false
err = s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId)
if err == nil {
hasDeletionSuccess = true
}
for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ {
if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); parityDeletionError == nil {
hasDeletionSuccess = true
}
}
if hasDeletionSuccess {
return nil
}
return err
}
func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
ecVolume.ShardLocationsLock.RLock()
sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId]
ecVolume.ShardLocationsLock.RUnlock()
if !hasShardLocations {
return fmt.Errorf("ec shard %d.%d not located", ecVolume.VolumeId, shardId)
}
for _, sourceDataNode := range sourceDataNodes {
glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode)
err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId)
if err != nil {
return err
}
glog.V(1).Infof("delete from remote ec shard %d.%d from %s: %v", ecVolume.VolumeId, shardId, sourceDataNode, err)
}
return nil
}
func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode pb.ServerAddress, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
return operation.WithVolumeServerClient(false, sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy data slice
_, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{
VolumeId: uint32(vid),
Collection: collection,
FileKey: uint64(needleId),
Version: uint32(version),
})
if err != nil {
return fmt.Errorf("failed to delete from ec shard %d on %s: %v", vid, sourceDataNode, err)
}
return nil
})
}