From 5dd67f9acf51d3b1402d8a686747db0b19e45801 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 28 May 2019 23:48:39 -0700 Subject: [PATCH] reading by recover from other shards --- weed/storage/erasure_coding/ec_test.go | 2 +- weed/storage/store_ec.go | 88 +++++++++++++++++++++++--- 2 files changed, 80 insertions(+), 10 deletions(-) diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go index e2e872dbe..602ea0bc0 100644 --- a/weed/storage/erasure_coding/ec_test.go +++ b/weed/storage/erasure_coding/ec_test.go @@ -122,7 +122,7 @@ func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uin func readOneInterval(interval Interval, ecFiles []*os.File) (data []byte, err error) { - ecFileOffset, ecFileIndex := interval.ToShardIdAndOffset(largeBlockSize, smallBlockSize) + ecFileIndex, ecFileOffset := interval.ToShardIdAndOffset(largeBlockSize, smallBlockSize) data = make([]byte, interval.Size) err = readFromFile(ecFiles[ecFileIndex], data, ecFileOffset) diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index ed531f206..db94e7b8b 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "sync" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -12,6 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/klauspost/reedsolomon" ) func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat { @@ -160,11 +162,19 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_co if !found || len(sourceDataNodes) == 0 { return nil, fmt.Errorf("failed to find ec shard %d.%d", ecVolume.VolumeId, shardId) } - glog.V(4).Infof("read remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNodes[0]) - _, err = s.readOneRemoteEcShardInterval(ctx, sourceDataNodes[0], 
ecVolume.VolumeId, shardId, data, actualOffset) - if err != nil { - glog.V(1).Infof("failed to read from %s for ec shard %d.%d : %v", sourceDataNodes[0], ecVolume.VolumeId, shardId, err) + + // try reading directly + _, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, ecVolume.VolumeId, shardId, data, actualOffset) + if err == nil { + return } + + // try reading by recovering from other shards + _, err = s.recoverOneRemoteEcShardInterval(ctx, ecVolume, shardId, data, actualOffset) + if err == nil { + return + } + glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err) } return } @@ -203,7 +213,21 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras return } -func (s *Store) readOneRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) { +func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) { + + for _, sourceDataNode := range sourceDataNodes { + glog.V(4).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode) + n, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, vid, shardId, buf, offset) + if err == nil { + return + } + glog.V(1).Infof("read remote ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err) + } + + return +} + +func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) { err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { @@ -239,8 +263,54 @@ func (s *Store) readOneRemoteEcShardInterval(ctx context.Context, sourceDataNode return } -func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, string, vid 
needle.VolumeId, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) { - glog.V(1).Infof("recover ec shard %d.%d from other locations", vid, shardIdToRecover) - // TODO add recovering - return 0, fmt.Errorf("recover is not implemented yet") +func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) { + glog.V(1).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover) + + enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + if err != nil { + return 0, fmt.Errorf("failed to create encoder: %v", err) + } + + bufs := make([][]byte, erasure_coding.TotalShardsCount) + + var wg sync.WaitGroup + ecVolume.ShardLocationsLock.RLock() + for shardId, locations := range ecVolume.ShardLocations { + + // skip current shard or empty shard + if shardId == shardIdToRecover { + continue + } + if len(locations) == 0 { + glog.V(3).Infof("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations) + continue + } + + // read from remote locations + wg.Add(1) + go func(shardId erasure_coding.ShardId, locations []string) { + defer wg.Done() + data := make([]byte, len(buf)) + n, err = s.readRemoteEcShardInterval(ctx, locations, ecVolume.VolumeId, shardId, data, offset) + if err != nil { + glog.V(3).Infof("readRemoteEcShardInterval %d.%d from %+v", ecVolume.VolumeId, shardId, locations) + } + if n == len(buf) { + bufs[shardId] = data + return + } + }(shardId, locations) + } + ecVolume.ShardLocationsLock.RUnlock() + + wg.Wait() + + if err = enc.ReconstructData(bufs); err != nil { + return 0, err + } + glog.V(3).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover) + + copy(buf, bufs[shardIdToRecover]) + + return len(buf), nil }