From f2f90436efaf69f5e28a9b57e9f80e0f3e02847e Mon Sep 17 00:00:00 2001 From: Lei Liu Date: Wed, 30 Oct 2019 15:49:58 +0800 Subject: [PATCH 1/7] fix leader master /dir/lookup api Signed-off-by: Lei Liu --- weed/sequence/memory_sequencer.go | 6 +++--- weed/sequence/sequence.go | 2 +- weed/server/master_server_handlers.go | 14 ++++++++++---- weed/topology/topology.go | 2 +- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/weed/sequence/memory_sequencer.go b/weed/sequence/memory_sequencer.go index d727dc723..e20c29cc7 100644 --- a/weed/sequence/memory_sequencer.go +++ b/weed/sequence/memory_sequencer.go @@ -15,12 +15,12 @@ func NewMemorySequencer() (m *MemorySequencer) { return } -func (m *MemorySequencer) NextFileId(count uint64) (uint64, uint64) { +func (m *MemorySequencer) NextFileId(count uint64) uint64 { m.sequenceLock.Lock() defer m.sequenceLock.Unlock() ret := m.counter - m.counter += uint64(count) - return ret, count + m.counter += count + return ret } func (m *MemorySequencer) SetMax(seenValue uint64) { diff --git a/weed/sequence/sequence.go b/weed/sequence/sequence.go index fbdc3b8ef..2258d001b 100644 --- a/weed/sequence/sequence.go +++ b/weed/sequence/sequence.go @@ -1,7 +1,7 @@ package sequence type Sequencer interface { - NextFileId(count uint64) (uint64, uint64) + NextFileId(count uint64) uint64 SetMax(uint64) Peek() uint64 } diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index c10f9a5b7..9bcd35ced 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -65,11 +65,17 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo var err error if ms.Topo.IsLeader() { volumeId, newVolumeIdErr := needle.NewVolumeId(vid) - machines := ms.Topo.Lookup(collection, volumeId) - for _, loc := range machines { - locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl}) + if newVolumeIdErr != nil { + err = fmt.Errorf("Unknown volume id %s", vid) + } else { + machines := ms.Topo.Lookup(collection, volumeId) + for _, loc := range machines { + locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl}) + } + if locations == nil { + err = fmt.Errorf("volume id %s not found", vid) + } } - err = newVolumeIdErr } else { machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid) for _, loc := range machines { diff --git a/weed/topology/topology.go b/weed/topology/topology.go index ea0769248..b7ebe8af5 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -125,7 +125,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, if datanodes.Length() == 0 { return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String()) } - fileId, count := t.Sequence.NextFileId(count) + fileId := t.Sequence.NextFileId(count) return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } From 1bcef02a6c730d7d8287e03c5226a61ff6e220a3 Mon Sep 17 00:00:00 2001 From: Lei Liu Date: Thu, 31 Oct 2019 14:25:05 +0800 Subject: [PATCH 2/7] fix dir/lookup and col/delete api 1, Fix Layouts first letter capitalized 2, Return http 204 when delete a collection Signed-off-by: Lei Liu --- weed/server/master_server_handlers_admin.go | 10 +++++++--- weed/topology/topology_map.go | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 486bf31f4..42ec50dd5 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -18,9 +18,10 @@ import ( ) func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { - collection, ok := ms.Topo.FindCollection(r.FormValue("collection")) + collectionName := r.FormValue("collection") + collection, ok := ms.Topo.FindCollection(collectionName) if !ok { - writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection"))) + writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", collectionName)) return } for _, server := range collection.ListVolumeServers() { @@ -35,7 +36,10 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } } - ms.Topo.DeleteCollection(r.FormValue("collection")) + ms.Topo.DeleteCollection(collectionName) + + w.WriteHeader(http.StatusNoContent) + return } func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go index 37a88c9ed..0ad30f12e 100644 --- a/weed/topology/topology_map.go +++ b/weed/topology/topology_map.go @@ -23,7 +23,7 @@ func (t *Topology) ToMap() interface{} { } } } - m["layouts"] = layouts + m["Layouts"] = layouts return m } From 1294999d8b60b6ee8f562264c19caa95391371b0 Mon Sep 17 00:00:00 2001 From: Lei Liu Date: Thu, 31 Oct 2019 20:43:20 +0800 Subject: [PATCH 3/7] return error when garbageThreshold is not a valid float number Signed-off-by: Lei Liu --- weed/server/master_server_handlers_admin.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 42ec50dd5..a5d976008 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -57,6 +57,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque gcThreshold, err = strconv.ParseFloat(gcString, 32) if err != nil { glog.V(0).Infof("garbageThreshold %s is not a valid float number: %v", gcString, err) + writeJsonError(w, r, http.StatusNotAcceptable, fmt.Errorf("garbageThreshold %s is not a valid float number", gcString)) return } } From 1dd101f78286c43d57a6a7732aee143b4ce6af6a Mon Sep 17 00:00:00 2001 From: zhangsong Date: Fri, 8 Nov 2019 19:00:47 +0800 Subject: [PATCH 4/7] use read lock to avoid io hang during heartbeat --- weed/storage/store.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/weed/storage/store.go b/weed/storage/store.go index 66dd021ff..f115721cb 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -165,8 +165,9 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { var maxFileKey NeedleId collectionVolumeSize := make(map[string]uint64) for _, location := range s.Locations { + var deleteVids []needle.VolumeId maxVolumeCount = maxVolumeCount + location.MaxVolumeCount - location.Lock() + location.RLock() for _, v := range location.volumes { if maxFileKey < v.MaxFileKey() { maxFileKey = v.MaxFileKey() @@ -175,8 +176,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage()) } else { if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { - location.deleteVolumeById(v.Id) - glog.V(0).Infoln("volume", v.Id, "is deleted.") + deleteVids = append(deleteVids, v.Id) } else { glog.V(0).Infoln("volume", v.Id, "is expired.") } @@ -184,7 +184,17 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { fileSize, _, _ := v.FileStat() collectionVolumeSize[v.Collection] += fileSize } - location.Unlock() + location.RUnlock() + + if len(deleteVids) > 0 { + // delete expired volumes. + location.Lock() + for _, vid := range deleteVids { + location.deleteVolumeById(vid) + glog.V(0).Infoln("volume", vid, "is deleted.") + } + location.Unlock() + } } for col, size := range collectionVolumeSize { From 6cc9e0d788722e3b78767db0acf33519a7b28645 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 8 Nov 2019 22:40:28 -0800 Subject: [PATCH 5/7] volume: fix ec shard reading fix https://github.com/chrislusf/seaweedfs/issues/1106 --- weed/server/volume_grpc_erasure_coding.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 8140a06f6..242480197 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -252,9 +252,14 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea startOffset, bytesToRead := req.Offset, req.Size for bytesToRead > 0 { - bytesread, err := ecShard.ReadAt(buffer, startOffset) + // min of bytesToRead and bufSize + bufferSize := bufSize + if bufferSize > bytesToRead { + bufferSize = bytesToRead + } + bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset) - // println(fileName, "read", bytesread, "bytes, with target", bytesToRead) + // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize) if bytesread > 0 { if int64(bytesread) > bytesToRead { @@ -268,6 +273,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea return err } + startOffset += int64(bytesread) bytesToRead -= int64(bytesread) } From 84c503c6a7eabb0f6924f705a60e39e521ceb5fb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 8 Nov 2019 22:40:55 -0800 Subject: [PATCH 6/7] adjust ec reading log level --- weed/storage/store_ec.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 8271324cf..7e3f1a46c 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -288,7 +288,7 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [ } for _, sourceDataNode := range sourceDataNodes { - glog.V(4).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode) + glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode) n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset) if err == nil { return @@ -340,7 +340,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode } func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { - glog.V(4).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover) + glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover) enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) if err != nil { From c34ffed43f9e6afde139c78a723fc56d843e03d4 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 8 Nov 2019 22:47:50 -0800 Subject: [PATCH 7/7] go fmt --- weed/storage/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/storage/store.go b/weed/storage/store.go index f115721cb..1d909c23a 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -187,7 +187,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { location.RUnlock() if len(deleteVids) > 0 { - // delete expired volumes. + // delete expired volumes. location.Lock() for _, vid := range deleteVids { location.deleteVolumeById(vid)