mirror of
https://github.com/chrislusf/seaweedfs
synced 2024-07-03 07:36:45 +02:00
Merge branch 'master' into refactoring_dat_backend
This commit is contained in:
commit
c5c1d83d91
|
@ -15,12 +15,12 @@ func NewMemorySequencer() (m *MemorySequencer) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MemorySequencer) NextFileId(count uint64) (uint64, uint64) {
|
func (m *MemorySequencer) NextFileId(count uint64) uint64 {
|
||||||
m.sequenceLock.Lock()
|
m.sequenceLock.Lock()
|
||||||
defer m.sequenceLock.Unlock()
|
defer m.sequenceLock.Unlock()
|
||||||
ret := m.counter
|
ret := m.counter
|
||||||
m.counter += uint64(count)
|
m.counter += count
|
||||||
return ret, count
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MemorySequencer) SetMax(seenValue uint64) {
|
func (m *MemorySequencer) SetMax(seenValue uint64) {
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
package sequence
|
package sequence
|
||||||
|
|
||||||
type Sequencer interface {
|
type Sequencer interface {
|
||||||
NextFileId(count uint64) (uint64, uint64)
|
NextFileId(count uint64) uint64
|
||||||
SetMax(uint64)
|
SetMax(uint64)
|
||||||
Peek() uint64
|
Peek() uint64
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,11 +65,17 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo
|
||||||
var err error
|
var err error
|
||||||
if ms.Topo.IsLeader() {
|
if ms.Topo.IsLeader() {
|
||||||
volumeId, newVolumeIdErr := needle.NewVolumeId(vid)
|
volumeId, newVolumeIdErr := needle.NewVolumeId(vid)
|
||||||
machines := ms.Topo.Lookup(collection, volumeId)
|
if newVolumeIdErr != nil {
|
||||||
for _, loc := range machines {
|
err = fmt.Errorf("Unknown volume id %s", vid)
|
||||||
locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
|
} else {
|
||||||
|
machines := ms.Topo.Lookup(collection, volumeId)
|
||||||
|
for _, loc := range machines {
|
||||||
|
locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
|
||||||
|
}
|
||||||
|
if locations == nil {
|
||||||
|
err = fmt.Errorf("volume id %s not found", vid)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
err = newVolumeIdErr
|
|
||||||
} else {
|
} else {
|
||||||
machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
|
machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
|
||||||
for _, loc := range machines {
|
for _, loc := range machines {
|
||||||
|
|
|
@ -18,9 +18,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
|
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
collection, ok := ms.Topo.FindCollection(r.FormValue("collection"))
|
collectionName := r.FormValue("collection")
|
||||||
|
collection, ok := ms.Topo.FindCollection(collectionName)
|
||||||
if !ok {
|
if !ok {
|
||||||
writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection")))
|
writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", collectionName))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, server := range collection.ListVolumeServers() {
|
for _, server := range collection.ListVolumeServers() {
|
||||||
|
@ -35,7 +36,10 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ms.Topo.DeleteCollection(r.FormValue("collection"))
|
ms.Topo.DeleteCollection(collectionName)
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
|
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -53,6 +57,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
|
||||||
gcThreshold, err = strconv.ParseFloat(gcString, 32)
|
gcThreshold, err = strconv.ParseFloat(gcString, 32)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("garbageThreshold %s is not a valid float number: %v", gcString, err)
|
glog.V(0).Infof("garbageThreshold %s is not a valid float number: %v", gcString, err)
|
||||||
|
writeJsonError(w, r, http.StatusNotAcceptable, fmt.Errorf("garbageThreshold %s is not a valid float number", gcString))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -252,9 +252,14 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
|
||||||
startOffset, bytesToRead := req.Offset, req.Size
|
startOffset, bytesToRead := req.Offset, req.Size
|
||||||
|
|
||||||
for bytesToRead > 0 {
|
for bytesToRead > 0 {
|
||||||
bytesread, err := ecShard.ReadAt(buffer, startOffset)
|
// min of bytesToRead and bufSize
|
||||||
|
bufferSize := bufSize
|
||||||
|
if bufferSize > bytesToRead {
|
||||||
|
bufferSize = bytesToRead
|
||||||
|
}
|
||||||
|
bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)
|
||||||
|
|
||||||
// println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
|
// println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
|
||||||
if bytesread > 0 {
|
if bytesread > 0 {
|
||||||
|
|
||||||
if int64(bytesread) > bytesToRead {
|
if int64(bytesread) > bytesToRead {
|
||||||
|
@ -268,6 +273,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
startOffset += int64(bytesread)
|
||||||
bytesToRead -= int64(bytesread)
|
bytesToRead -= int64(bytesread)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -165,8 +165,9 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
|
||||||
var maxFileKey NeedleId
|
var maxFileKey NeedleId
|
||||||
collectionVolumeSize := make(map[string]uint64)
|
collectionVolumeSize := make(map[string]uint64)
|
||||||
for _, location := range s.Locations {
|
for _, location := range s.Locations {
|
||||||
|
var deleteVids []needle.VolumeId
|
||||||
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
|
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
|
||||||
location.Lock()
|
location.RLock()
|
||||||
for _, v := range location.volumes {
|
for _, v := range location.volumes {
|
||||||
if maxFileKey < v.MaxFileKey() {
|
if maxFileKey < v.MaxFileKey() {
|
||||||
maxFileKey = v.MaxFileKey()
|
maxFileKey = v.MaxFileKey()
|
||||||
|
@ -175,8 +176,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
|
||||||
volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
|
volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
|
||||||
} else {
|
} else {
|
||||||
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
|
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
|
||||||
location.deleteVolumeById(v.Id)
|
deleteVids = append(deleteVids, v.Id)
|
||||||
glog.V(0).Infoln("volume", v.Id, "is deleted.")
|
|
||||||
} else {
|
} else {
|
||||||
glog.V(0).Infoln("volume", v.Id, "is expired.")
|
glog.V(0).Infoln("volume", v.Id, "is expired.")
|
||||||
}
|
}
|
||||||
|
@ -184,7 +184,17 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
|
||||||
fileSize, _, _ := v.FileStat()
|
fileSize, _, _ := v.FileStat()
|
||||||
collectionVolumeSize[v.Collection] += fileSize
|
collectionVolumeSize[v.Collection] += fileSize
|
||||||
}
|
}
|
||||||
location.Unlock()
|
location.RUnlock()
|
||||||
|
|
||||||
|
if len(deleteVids) > 0 {
|
||||||
|
// delete expired volumes.
|
||||||
|
location.Lock()
|
||||||
|
for _, vid := range deleteVids {
|
||||||
|
location.deleteVolumeById(vid)
|
||||||
|
glog.V(0).Infoln("volume", vid, "is deleted.")
|
||||||
|
}
|
||||||
|
location.Unlock()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for col, size := range collectionVolumeSize {
|
for col, size := range collectionVolumeSize {
|
||||||
|
|
|
@ -288,7 +288,7 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, sourceDataNode := range sourceDataNodes {
|
for _, sourceDataNode := range sourceDataNodes {
|
||||||
glog.V(4).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
|
glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
|
||||||
n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset)
|
n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return
|
return
|
||||||
|
@ -340,7 +340,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
|
func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
|
||||||
glog.V(4).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
|
glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
|
||||||
|
|
||||||
enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
|
enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -125,7 +125,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
|
||||||
if datanodes.Length() == 0 {
|
if datanodes.Length() == 0 {
|
||||||
return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
|
return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
|
||||||
}
|
}
|
||||||
fileId, count := t.Sequence.NextFileId(count)
|
fileId := t.Sequence.NextFileId(count)
|
||||||
return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
|
return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ func (t *Topology) ToMap() interface{} {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m["layouts"] = layouts
|
m["Layouts"] = layouts
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue