From 51939efeac635d0ba8b683cae6176aa60845b5f7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 16 Apr 2014 23:43:27 -0700 Subject: [PATCH] 1. volume server now sends master server its max file key, so that master server does not need to store the sequence on disk any more 2. fix raft server's failure to init cluster during bootstrapping --- go/metastore/backing_test.go | 30 ------- go/metastore/file_backing.go | 34 -------- go/metastore/memory_backing.go | 36 -------- go/metastore/metastore.go | 33 ------- go/sequence/memory_sequencer.go | 21 ++++- go/sequence/sequence.go | 86 +------------------ go/storage/cdb_map.go | 4 +- go/storage/needle_map.go | 13 +-- go/storage/store.go | 8 ++ go/topology/topology.go | 9 +- go/weed/master.go | 3 +- go/weed/server.go | 3 +- go/weed/weed_server/master_server.go | 7 +- .../master_server_handlers_admin.go | 3 +- go/weed/weed_server/raft_server.go | 20 ++--- go/weed/weed_server/raft_server_handlers.go | 2 +- go/weed/weed_server/volume_server.go | 4 +- 17 files changed, 60 insertions(+), 256 deletions(-) delete mode 100644 go/metastore/backing_test.go delete mode 100644 go/metastore/file_backing.go delete mode 100644 go/metastore/memory_backing.go delete mode 100644 go/metastore/metastore.go diff --git a/go/metastore/backing_test.go b/go/metastore/backing_test.go deleted file mode 100644 index a3de491c7..000000000 --- a/go/metastore/backing_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package metastore - -import ( - "testing" -) - -func TestMemoryBacking(t *testing.T) { - ms := &MetaStore{NewMetaStoreMemoryBacking()} - verifySetGet(t, ms) -} - -func TestFileBacking(t *testing.T) { - ms := &MetaStore{NewMetaStoreFileBacking()} - verifySetGet(t, ms) -} - -func verifySetGet(t *testing.T, ms *MetaStore) { - data := uint64(234234) - ms.SetUint64("/tmp/sequence", data) - if !ms.Has("/tmp/sequence") { - t.Errorf("Failed to set data") - } - if val, err := ms.GetUint64("/tmp/sequence"); err == nil { - if val != data { - t.Errorf("Set %d, but read back %d", data, val) - } - } else { - t.Errorf("Failed to get back data:%s", err) - } -} diff --git a/go/metastore/file_backing.go b/go/metastore/file_backing.go deleted file mode 100644 index 1dc0c963f..000000000 --- a/go/metastore/file_backing.go +++ /dev/null @@ -1,34 +0,0 @@ -package metastore - -import ( - "io/ioutil" - "os" -) - -// store data on disk, enough for most cases - -type MetaStoreFileBacking struct { -} - -func NewMetaStoreFileBacking() *MetaStoreFileBacking { - mms := &MetaStoreFileBacking{} - return mms -} - -func (mms *MetaStoreFileBacking) Set(path, val string) error { - return ioutil.WriteFile(path, []byte(val), 0644) -} - -func (mms *MetaStoreFileBacking) Get(path string) (string, error) { - val, e := ioutil.ReadFile(path) - return string(val), e -} - -func (mms *MetaStoreFileBacking) Has(path string) (ok bool) { - seqFile, se := os.OpenFile(path, os.O_RDONLY, 0644) - if se != nil { - return false - } - defer seqFile.Close() - return true -} diff --git a/go/metastore/memory_backing.go b/go/metastore/memory_backing.go deleted file mode 100644 index 4f45c2e5f..000000000 --- a/go/metastore/memory_backing.go +++ /dev/null @@ -1,36 +0,0 @@ -package metastore - -import ( - "fmt" -) - -//this is for testing only - -type MetaStoreMemoryBacking struct { - m map[string]string -} - -func NewMetaStoreMemoryBacking() *MetaStoreMemoryBacking { - mms := &MetaStoreMemoryBacking{} - mms.m = make(map[string]string) - return mms -} - -func (mms MetaStoreMemoryBacking) Set(path, val string) error { - mms.m[path] = val - return nil -} - -func (mms MetaStoreMemoryBacking) Get(path string) (val string, err error) { - var ok bool - val, ok = mms.m[path] - if !ok { - return "", fmt.Errorf("Missing value for %s", path) - } - return -} - -func (mms MetaStoreMemoryBacking) Has(path string) (ok bool) { - _, ok = mms.m[path] - return -} diff --git a/go/metastore/metastore.go b/go/metastore/metastore.go deleted file mode 100644 index 2b1fdc6d8..000000000 --- a/go/metastore/metastore.go +++ /dev/null @@ -1,33 +0,0 @@ -package metastore - -import ( - "errors" - "strconv" -) - -type MetaStoreBacking interface { - Get(path string) (string, error) - Set(path, val string) error - Has(path string) bool -} - -type MetaStore struct { - MetaStoreBacking -} - -func (m *MetaStore) SetUint64(path string, val uint64) error { - return m.Set(path, strconv.FormatUint(val, 10)) -} - -func (m *MetaStore) GetUint64(path string) (val uint64, err error) { - if b, e := m.Get(path); e == nil { - val, err = strconv.ParseUint(b, 10, 64) - return - } else { - if e != nil { - return 0, e - } - err = errors.New("Not found value for " + path) - } - return -} diff --git a/go/sequence/memory_sequencer.go b/go/sequence/memory_sequencer.go index d72952ff4..c7ee1ae8f 100644 --- a/go/sequence/memory_sequencer.go +++ b/go/sequence/memory_sequencer.go @@ -1,10 +1,13 @@ package sequence -import () +import ( + "sync" +) // just for testing type MemorySequencer struct { - counter uint64 + counter uint64 + sequenceLock sync.Mutex } func NewMemorySequencer() (m *MemorySequencer) { @@ -13,7 +16,21 @@ func NewMemorySequencer() (m *MemorySequencer) { } func (m *MemorySequencer) NextFileId(count int) (uint64, int) { + m.sequenceLock.Lock() + defer m.sequenceLock.Unlock() ret := m.counter m.counter += uint64(count) return ret, count } + +func (m *MemorySequencer) SetMax(seenValue uint64) { + m.sequenceLock.Lock() + defer m.sequenceLock.Unlock() + if m.counter <= seenValue { + m.counter = seenValue + 1 + } +} + +func (m *MemorySequencer) Peek() uint64 { + return m.counter +} diff --git a/go/sequence/sequence.go b/go/sequence/sequence.go index 493804ec6..5a1bceaaf 100644 --- a/go/sequence/sequence.go +++ b/go/sequence/sequence.go @@ -1,89 +1,9 @@ package sequence -import ( - "bytes" - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/metastore" - "encoding/gob" - "sync" -) - -const ( - FileIdSaveInterval = 10000 -) +import () type Sequencer interface { NextFileId(count int) (uint64, int) -} -type SequencerImpl struct { - fileFullPath string - - volumeLock sync.Mutex - sequenceLock sync.Mutex - - FileIdSequence uint64 - fileIdCounter uint64 - - metaStore *metastore.MetaStore -} - -func NewFileSequencer(filepath string) (m *SequencerImpl) { - m = &SequencerImpl{fileFullPath: filepath} - m.metaStore = &metastore.MetaStore{metastore.NewMetaStoreFileBacking()} - m.initilize() - return -} - -func (m *SequencerImpl) initilize() { - if !m.metaStore.Has(m.fileFullPath) { - m.FileIdSequence = FileIdSaveInterval - glog.V(0).Infoln("Setting file id sequence", m.FileIdSequence) - } else { - var err error - if m.FileIdSequence, err = m.metaStore.GetUint64(m.fileFullPath); err != nil { - if data, err := m.metaStore.Get(m.fileFullPath); err == nil { - m.FileIdSequence = decode(data) - glog.V(0).Infoln("Decoding old version of FileIdSequence", m.FileIdSequence) - } else { - glog.V(0).Infof("No existing FileIdSequence: %s", err) - } - } else { - glog.V(0).Infoln("Loading file id sequence", m.FileIdSequence) - } - //in case the server stops between intervals - } - return -} - -//count should be 1 or more -func (m *SequencerImpl) NextFileId(count int) (uint64, int) { - if count <= 0 { - return 0, 0 - } - m.sequenceLock.Lock() - defer m.sequenceLock.Unlock() - if m.fileIdCounter < uint64(count) { - m.fileIdCounter = FileIdSaveInterval - m.FileIdSequence += FileIdSaveInterval - m.saveSequence() - } - m.fileIdCounter = m.fileIdCounter - uint64(count) - return m.FileIdSequence - m.fileIdCounter - uint64(count), count -} -func (m *SequencerImpl) saveSequence() { - glog.V(0).Infoln("Saving file id sequence", m.FileIdSequence, "to", m.fileFullPath) - if e := m.metaStore.SetUint64(m.fileFullPath, m.FileIdSequence); e != nil { - glog.Fatalf("Sequence id Save [ERROR] %s", e) - } -} - -//decode are for backward compatible purpose -func decode(input string) uint64 { - var x uint64 - b := bytes.NewReader([]byte(input)) - decoder := gob.NewDecoder(b) - if e := decoder.Decode(&x); e == nil { - return x - } - return 0 + SetMax(uint64) + Peek() uint64 } diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go index 8be302111..14437c45b 100644 --- a/go/storage/cdb_map.go +++ b/go/storage/cdb_map.go @@ -80,8 +80,8 @@ func (m cdbMap) FileCount() int { func (m *cdbMap) DeletedCount() int { return m.DeletionCounter } -func (m *cdbMap) NextFileKey(count int) uint64 { - return 0 +func (m *cdbMap) MaxFileKey() uint64 { + return m.MaximumFileKey } func getMetric(c *cdb.Cdb, m *mapMetric) error { diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index 9b8776c5e..9c77fcf73 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -19,7 +19,7 @@ type NeedleMapper interface { FileCount() int DeletedCount() int Visit(visit func(NeedleValue) error) (err error) - NextFileKey(count int) uint64 + MaxFileKey() uint64 } type mapMetric struct { @@ -110,6 +110,9 @@ func walkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e } func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { + if key > nm.MaximumFileKey { + nm.MaximumFileKey = key + } oldSize := nm.m.Set(Key(key), offset, size) bytes := make([]byte, 16) util.Uint64toBytes(bytes[0:8], key) @@ -172,11 +175,3 @@ func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) { func (nm NeedleMap) MaxFileKey() uint64 { return nm.MaximumFileKey } -func (nm NeedleMap) NextFileKey(count int) (ret uint64) { - if count <= 0 { - return 0 - } - ret = nm.MaximumFileKey - nm.MaximumFileKey += uint64(count) - return -} diff --git a/go/storage/store.go b/go/storage/store.go index 157344781..a4263dcac 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -44,6 +44,9 @@ func (mn *MasterNodes) findMaster() (string, error) { if mn.lastNode < 0 { for _, m := range mn.nodes { if masters, e := operation.ListMasters(m); e == nil { + if len(masters) == 0 { + continue + } mn.nodes = masters mn.lastNode = rand.Intn(len(mn.nodes)) glog.V(2).Info("current master node is :", mn.nodes[mn.lastNode]) @@ -268,6 +271,7 @@ func (s *Store) Join() error { } stats := new([]*VolumeInfo) maxVolumeCount := 0 + var maxFileKey uint64 for _, location := range s.Locations { maxVolumeCount = maxVolumeCount + location.MaxVolumeCount for k, v := range location.volumes { @@ -280,6 +284,9 @@ func (s *Store) Join() error { DeletedByteCount: v.nm.DeletedSize(), ReadOnly: v.readOnly} *stats = append(*stats, s) + if maxFileKey < v.nm.MaxFileKey() { + maxFileKey = v.nm.MaxFileKey() + } } } bytes, _ := json.Marshal(stats) @@ -292,6 +299,7 @@ func (s *Store) Join() error { values.Add("publicUrl", s.PublicUrl) values.Add("volumes", string(bytes)) values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount)) + values.Add("maxFileKey", strconv.FormatUint(maxFileKey, 10)) values.Add("dataCenter", s.dataCenter) values.Add("rack", s.rack) jsonBlob, err := util.Post("http://"+masterNode+"/dir/join", values) diff --git a/go/topology/topology.go b/go/topology/topology.go index b1fa3f2a2..9db3e78ae 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -19,7 +19,7 @@ type Topology struct { volumeSizeLimit uint64 - sequence sequence.Sequencer + Sequence sequence.Sequencer chanDeadDataNodes chan *DataNode chanRecoveredDataNodes chan *DataNode @@ -40,7 +40,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit - t.sequence = seq + t.Sequence = seq t.chanDeadDataNodes = make(chan *DataNode) t.chanRecoveredDataNodes = make(chan *DataNode) @@ -118,7 +118,7 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in if err != nil || datanodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes avalable!") } - fileId, count := t.sequence.NextFileId(count) + fileId, count := t.Sequence.NextFileId(count) return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } @@ -143,7 +143,8 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn) } -func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) { +func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, maxFileKey uint64, dcName string, rackName string) { + t.Sequence.SetMax(maxFileKey) dcName, rackName = t.configuration.Locate(ip, dcName, rackName) dc := t.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) diff --git a/go/weed/master.go b/go/weed/master.go index 1efa0e79c..c494ff42b 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -72,11 +72,12 @@ func runMaster(cmd *Command, args []string) bool { go func() { time.Sleep(100 * time.Millisecond) + myAddress := *masterIp + ":" + strconv.Itoa(*mport) var peers []string if *masterPeers != "" { peers = strings.Split(*masterPeers, ",") } - raftServer := weed_server.NewRaftServer(r, peers, *masterIp+":"+strconv.Itoa(*mport), *metaFolder, ms.Topo, *mpulse) + raftServer := weed_server.NewRaftServer(r, peers, myAddress, *metaFolder, ms.Topo, *mpulse) ms.SetRaftServer(raftServer) }() diff --git a/go/weed/server.go b/go/weed/server.go index 87b541fd3..61e42fb36 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -164,11 +164,12 @@ func runServer(cmd *Command, args []string) bool { go func() { raftWaitForMaster.Wait() time.Sleep(100 * time.Millisecond) + myAddress := *serverIp + ":" + strconv.Itoa(*masterPort) var peers []string if *serverPeers != "" { peers = strings.Split(*serverPeers, ",") } - raftServer := weed_server.NewRaftServer(r, peers, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder, ms.Topo, *volumePulse) + raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *volumePulse) ms.SetRaftServer(raftServer) volumeWait.Done() }() diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index b932e1b11..874688cbb 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -11,7 +11,6 @@ import ( "net/http" "net/http/httputil" "net/url" - "path" "sync" ) @@ -48,7 +47,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, whiteList: whiteList, } ms.bounedLeaderChan = make(chan int, 16) - seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq")) + seq := sequence.NewMemorySequencer() var e error if ms.Topo, e = topology.NewTopology("topo", confFile, seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil { @@ -97,7 +96,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ return func(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { f(w, r) - } else if ms.Topo.RaftServer.Leader() != "" { + } else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { ms.bounedLeaderChan <- 1 defer func() { <-ms.bounedLeaderChan }() targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader()) @@ -111,7 +110,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ proxy.ServeHTTP(w, r) } else { //drop it to the floor - writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+"does not know Leader yet:"+ms.Topo.RaftServer.Leader())) + writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader())) } } } diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index d34baa349..e549a1dfb 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -36,6 +36,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { } port, _ := strconv.Atoi(r.FormValue("port")) maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) + maxFileKey, _ := strconv.ParseUint(r.FormValue("maxFileKey"), 10, 64) s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") publicUrl := r.FormValue("publicUrl") volumes := new([]storage.VolumeInfo) @@ -44,7 +45,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { return } debug(s, "volumes", r.FormValue("volumes")) - ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack")) + ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, maxFileKey, r.FormValue("dataCenter"), r.FormValue("rack")) writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024}) } diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go index b5a9dd9b6..f67caaebd 100644 --- a/go/weed/weed_server/raft_server.go +++ b/go/weed/weed_server/raft_server.go @@ -77,13 +77,6 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin return nil } } - var err error - for err != nil { - glog.V(0).Infoln("waiting for peers on", strings.Join(s.peers, ","), "...") - time.Sleep(time.Duration(1000+rand.Intn(2000)) * time.Millisecond) - err = s.Join(s.peers) - } - glog.V(0).Infoln("Joined cluster") } // Initialize the server by joining itself. @@ -124,14 +117,17 @@ func (s *RaftServer) Join(peers []string) error { ConnectionString: "http://" + s.httpAddr, } + var err error var b bytes.Buffer json.NewEncoder(&b).Encode(command) - for _, m := range peers { + if m == s.httpAddr { + continue + } target := fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m)) glog.V(0).Infoln("Attempting to connect to:", target) - err := postFollowingOneRedirect(target, "application/json", &b) + err = postFollowingOneRedirect(target, "application/json", &b) if err != nil { glog.V(0).Infoln("Post returned error: ", err.Error()) @@ -139,11 +135,9 @@ func (s *RaftServer) Join(peers []string) error { // If we receive a network error try the next member continue } - - return err + } else { + return nil } - - return nil } return errors.New("Could not connect to any cluster peers") diff --git a/go/weed/weed_server/raft_server_handlers.go b/go/weed/weed_server/raft_server_handlers.go index 6a3c58b29..1ce24a963 100644 --- a/go/weed/weed_server/raft_server_handlers.go +++ b/go/weed/weed_server/raft_server_handlers.go @@ -18,7 +18,7 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { commandText, _ := ioutil.ReadAll(req.Body) glog.V(0).Info("Command:", string(commandText)) if err := json.NewDecoder(strings.NewReader(string(commandText))).Decode(&command); err != nil { - glog.V(0).Infoln("Error decoding json message:", err) + glog.V(0).Infoln("Error decoding json message:", err, string(commandText)) http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index 377e1e99c..a2db0d8a9 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -54,10 +54,10 @@ func NewVolumeServer(r *http.ServeMux, ip string, port int, publicUrl string, fo if err == nil { if !connected { connected = true - glog.V(0).Infoln("Reconnected with master") + glog.V(0).Infoln("Volume Server Connected with master") } } else { - glog.V(4).Infoln("Failing to talk with master:", err.Error()) + glog.V(4).Infoln("Volume Server Failed to talk with master:", err.Error()) if connected { connected = false }