diff --git a/go/metastore/backing_test.go b/go/metastore/backing_test.go deleted file mode 100644 index a3de491c7..000000000 --- a/go/metastore/backing_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package metastore - -import ( - "testing" -) - -func TestMemoryBacking(t *testing.T) { - ms := &MetaStore{NewMetaStoreMemoryBacking()} - verifySetGet(t, ms) -} - -func TestFileBacking(t *testing.T) { - ms := &MetaStore{NewMetaStoreFileBacking()} - verifySetGet(t, ms) -} - -func verifySetGet(t *testing.T, ms *MetaStore) { - data := uint64(234234) - ms.SetUint64("/tmp/sequence", data) - if !ms.Has("/tmp/sequence") { - t.Errorf("Failed to set data") - } - if val, err := ms.GetUint64("/tmp/sequence"); err == nil { - if val != data { - t.Errorf("Set %d, but read back %d", data, val) - } - } else { - t.Errorf("Failed to get back data:%s", err) - } -} diff --git a/go/metastore/file_backing.go b/go/metastore/file_backing.go deleted file mode 100644 index 1dc0c963f..000000000 --- a/go/metastore/file_backing.go +++ /dev/null @@ -1,34 +0,0 @@ -package metastore - -import ( - "io/ioutil" - "os" -) - -// store data on disk, enough for most cases - -type MetaStoreFileBacking struct { -} - -func NewMetaStoreFileBacking() *MetaStoreFileBacking { - mms := &MetaStoreFileBacking{} - return mms -} - -func (mms *MetaStoreFileBacking) Set(path, val string) error { - return ioutil.WriteFile(path, []byte(val), 0644) -} - -func (mms *MetaStoreFileBacking) Get(path string) (string, error) { - val, e := ioutil.ReadFile(path) - return string(val), e -} - -func (mms *MetaStoreFileBacking) Has(path string) (ok bool) { - seqFile, se := os.OpenFile(path, os.O_RDONLY, 0644) - if se != nil { - return false - } - defer seqFile.Close() - return true -} diff --git a/go/metastore/memory_backing.go b/go/metastore/memory_backing.go deleted file mode 100644 index 4f45c2e5f..000000000 --- a/go/metastore/memory_backing.go +++ /dev/null @@ -1,36 +0,0 @@ -package metastore - -import ( - "fmt" -) - -//this is for testing only - -type MetaStoreMemoryBacking struct { - m map[string]string -} - -func NewMetaStoreMemoryBacking() *MetaStoreMemoryBacking { - mms := &MetaStoreMemoryBacking{} - mms.m = make(map[string]string) - return mms -} - -func (mms MetaStoreMemoryBacking) Set(path, val string) error { - mms.m[path] = val - return nil -} - -func (mms MetaStoreMemoryBacking) Get(path string) (val string, err error) { - var ok bool - val, ok = mms.m[path] - if !ok { - return "", fmt.Errorf("Missing value for %s", path) - } - return -} - -func (mms MetaStoreMemoryBacking) Has(path string) (ok bool) { - _, ok = mms.m[path] - return -} diff --git a/go/metastore/metastore.go b/go/metastore/metastore.go deleted file mode 100644 index 2b1fdc6d8..000000000 --- a/go/metastore/metastore.go +++ /dev/null @@ -1,33 +0,0 @@ -package metastore - -import ( - "errors" - "strconv" -) - -type MetaStoreBacking interface { - Get(path string) (string, error) - Set(path, val string) error - Has(path string) bool -} - -type MetaStore struct { - MetaStoreBacking -} - -func (m *MetaStore) SetUint64(path string, val uint64) error { - return m.Set(path, strconv.FormatUint(val, 10)) -} - -func (m *MetaStore) GetUint64(path string) (val uint64, err error) { - if b, e := m.Get(path); e == nil { - val, err = strconv.ParseUint(b, 10, 64) - return - } else { - if e != nil { - return 0, e - } - err = errors.New("Not found value for " + path) - } - return -} diff --git a/go/sequence/memory_sequencer.go b/go/sequence/memory_sequencer.go index d72952ff4..c7ee1ae8f 100644 --- a/go/sequence/memory_sequencer.go +++ b/go/sequence/memory_sequencer.go @@ -1,10 +1,13 @@ package sequence -import () +import ( + "sync" +) // just for testing type MemorySequencer struct { - counter uint64 + counter uint64 + sequenceLock sync.Mutex } func NewMemorySequencer() (m *MemorySequencer) { @@ -13,7 +16,21 @@ func NewMemorySequencer() (m *MemorySequencer) { } func (m *MemorySequencer) NextFileId(count int) (uint64, int) { + m.sequenceLock.Lock() + defer m.sequenceLock.Unlock() ret := m.counter m.counter += uint64(count) return ret, count } + +func (m *MemorySequencer) SetMax(seenValue uint64) { + m.sequenceLock.Lock() + defer m.sequenceLock.Unlock() + if m.counter <= seenValue { + m.counter = seenValue + 1 + } +} + +func (m *MemorySequencer) Peek() uint64 { + return m.counter +} diff --git a/go/sequence/sequence.go b/go/sequence/sequence.go index 493804ec6..5a1bceaaf 100644 --- a/go/sequence/sequence.go +++ b/go/sequence/sequence.go @@ -1,89 +1,9 @@ package sequence -import ( - "bytes" - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/metastore" - "encoding/gob" - "sync" -) - -const ( - FileIdSaveInterval = 10000 -) +import () type Sequencer interface { NextFileId(count int) (uint64, int) -} -type SequencerImpl struct { - fileFullPath string - - volumeLock sync.Mutex - sequenceLock sync.Mutex - - FileIdSequence uint64 - fileIdCounter uint64 - - metaStore *metastore.MetaStore -} - -func NewFileSequencer(filepath string) (m *SequencerImpl) { - m = &SequencerImpl{fileFullPath: filepath} - m.metaStore = &metastore.MetaStore{metastore.NewMetaStoreFileBacking()} - m.initilize() - return -} - -func (m *SequencerImpl) initilize() { - if !m.metaStore.Has(m.fileFullPath) { - m.FileIdSequence = FileIdSaveInterval - glog.V(0).Infoln("Setting file id sequence", m.FileIdSequence) - } else { - var err error - if m.FileIdSequence, err = m.metaStore.GetUint64(m.fileFullPath); err != nil { - if data, err := m.metaStore.Get(m.fileFullPath); err == nil { - m.FileIdSequence = decode(data) - glog.V(0).Infoln("Decoding old version of FileIdSequence", m.FileIdSequence) - } else { - glog.V(0).Infof("No existing FileIdSequence: %s", err) - } - } else { - glog.V(0).Infoln("Loading file id sequence", m.FileIdSequence) - } - //in case the server stops between intervals - } - return -} - -//count should be 1 or more -func (m *SequencerImpl) NextFileId(count int) (uint64, int) { - if count <= 0 { - return 0, 0 - } - m.sequenceLock.Lock() - defer m.sequenceLock.Unlock() - if m.fileIdCounter < uint64(count) { - m.fileIdCounter = FileIdSaveInterval - m.FileIdSequence += FileIdSaveInterval - m.saveSequence() - } - m.fileIdCounter = m.fileIdCounter - uint64(count) - return m.FileIdSequence - m.fileIdCounter - uint64(count), count -} -func (m *SequencerImpl) saveSequence() { - glog.V(0).Infoln("Saving file id sequence", m.FileIdSequence, "to", m.fileFullPath) - if e := m.metaStore.SetUint64(m.fileFullPath, m.FileIdSequence); e != nil { - glog.Fatalf("Sequence id Save [ERROR] %s", e) - } -} - -//decode are for backward compatible purpose -func decode(input string) uint64 { - var x uint64 - b := bytes.NewReader([]byte(input)) - decoder := gob.NewDecoder(b) - if e := decoder.Decode(&x); e == nil { - return x - } - return 0 + SetMax(uint64) + Peek() uint64 } diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go index 8be302111..14437c45b 100644 --- a/go/storage/cdb_map.go +++ b/go/storage/cdb_map.go @@ -80,8 +80,8 @@ func (m cdbMap) FileCount() int { func (m *cdbMap) DeletedCount() int { return m.DeletionCounter } -func (m *cdbMap) NextFileKey(count int) uint64 { - return 0 +func (m *cdbMap) MaxFileKey() uint64 { + return m.MaximumFileKey } func getMetric(c *cdb.Cdb, m *mapMetric) error { diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index 9b8776c5e..9c77fcf73 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -19,7 +19,7 @@ type NeedleMapper interface { FileCount() int DeletedCount() int Visit(visit func(NeedleValue) error) (err error) - NextFileKey(count int) uint64 + MaxFileKey() uint64 } type mapMetric struct { @@ -110,6 +110,9 @@ func walkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e } func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { + if key > nm.MaximumFileKey { + nm.MaximumFileKey = key + } oldSize := nm.m.Set(Key(key), offset, size) bytes := make([]byte, 16) util.Uint64toBytes(bytes[0:8], key) @@ -172,11 +175,3 @@ func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) { func (nm NeedleMap) MaxFileKey() uint64 { return nm.MaximumFileKey } -func (nm NeedleMap) NextFileKey(count int) (ret uint64) { - if count <= 0 { - return 0 - } - ret = nm.MaximumFileKey - nm.MaximumFileKey += uint64(count) - return -} diff --git a/go/storage/store.go b/go/storage/store.go index 157344781..a4263dcac 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -44,6 +44,9 @@ func (mn *MasterNodes) findMaster() (string, error) { if mn.lastNode < 0 { for _, m := range mn.nodes { if masters, e := operation.ListMasters(m); e == nil { + if len(masters) == 0 { + continue + } mn.nodes = masters mn.lastNode = rand.Intn(len(mn.nodes)) glog.V(2).Info("current master node is :", mn.nodes[mn.lastNode]) @@ -268,6 +271,7 @@ func (s *Store) Join() error { } stats := new([]*VolumeInfo) maxVolumeCount := 0 + var maxFileKey uint64 for _, location := range s.Locations { maxVolumeCount = maxVolumeCount + location.MaxVolumeCount for k, v := range location.volumes { @@ -280,6 +284,9 @@ func (s *Store) Join() error { DeletedByteCount: v.nm.DeletedSize(), ReadOnly: v.readOnly} *stats = append(*stats, s) + if maxFileKey < v.nm.MaxFileKey() { + maxFileKey = v.nm.MaxFileKey() + } } } bytes, _ := json.Marshal(stats) @@ -292,6 +299,7 @@ func (s *Store) Join() error { values.Add("publicUrl", s.PublicUrl) values.Add("volumes", string(bytes)) values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount)) + values.Add("maxFileKey", strconv.FormatUint(maxFileKey, 10)) values.Add("dataCenter", s.dataCenter) values.Add("rack", s.rack) jsonBlob, err := util.Post("http://"+masterNode+"/dir/join", values) diff --git a/go/topology/topology.go b/go/topology/topology.go index b1fa3f2a2..9db3e78ae 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -19,7 +19,7 @@ type Topology struct { volumeSizeLimit uint64 - sequence sequence.Sequencer + Sequence sequence.Sequencer chanDeadDataNodes chan *DataNode chanRecoveredDataNodes chan *DataNode @@ -40,7 +40,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit - t.sequence = seq + t.Sequence = seq t.chanDeadDataNodes = make(chan *DataNode) t.chanRecoveredDataNodes = make(chan *DataNode) @@ -118,7 +118,7 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in if err != nil || datanodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes avalable!") } - fileId, count := t.sequence.NextFileId(count) + fileId, count := t.Sequence.NextFileId(count) return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } @@ -143,7 +143,8 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn) } -func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) { +func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, maxFileKey uint64, dcName string, rackName string) { + t.Sequence.SetMax(maxFileKey) dcName, rackName = t.configuration.Locate(ip, dcName, rackName) dc := t.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) diff --git a/go/weed/master.go b/go/weed/master.go index 1efa0e79c..c494ff42b 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -72,11 +72,12 @@ func runMaster(cmd *Command, args []string) bool { go func() { time.Sleep(100 * time.Millisecond) + myAddress := *masterIp + ":" + strconv.Itoa(*mport) var peers []string if *masterPeers != "" { peers = strings.Split(*masterPeers, ",") } - raftServer := weed_server.NewRaftServer(r, peers, *masterIp+":"+strconv.Itoa(*mport), *metaFolder, ms.Topo, *mpulse) + raftServer := weed_server.NewRaftServer(r, peers, myAddress, *metaFolder, ms.Topo, *mpulse) ms.SetRaftServer(raftServer) }() diff --git a/go/weed/server.go b/go/weed/server.go index 87b541fd3..61e42fb36 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -164,11 +164,12 @@ func runServer(cmd *Command, args []string) bool { go func() { raftWaitForMaster.Wait() time.Sleep(100 * time.Millisecond) + myAddress := *serverIp + ":" + strconv.Itoa(*masterPort) var peers []string if *serverPeers != "" { peers = strings.Split(*serverPeers, ",") } - raftServer := weed_server.NewRaftServer(r, peers, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder, ms.Topo, *volumePulse) + raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *volumePulse) ms.SetRaftServer(raftServer) volumeWait.Done() }() diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index b932e1b11..874688cbb 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -11,7 +11,6 @@ import ( "net/http" "net/http/httputil" "net/url" - "path" "sync" ) @@ -48,7 +47,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, whiteList: whiteList, } ms.bounedLeaderChan = make(chan int, 16) - seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq")) + seq := sequence.NewMemorySequencer() var e error if ms.Topo, e = topology.NewTopology("topo", confFile, seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil { @@ -97,7 +96,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ return func(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { f(w, r) - } else if ms.Topo.RaftServer.Leader() != "" { + } else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { ms.bounedLeaderChan <- 1 defer func() { <-ms.bounedLeaderChan }() targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader()) @@ -111,7 +110,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ proxy.ServeHTTP(w, r) } else { //drop it to the floor - writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+"does not know Leader yet:"+ms.Topo.RaftServer.Leader())) + writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader())) } } } diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index d34baa349..e549a1dfb 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -36,6 +36,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { } port, _ := strconv.Atoi(r.FormValue("port")) maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) + maxFileKey, _ := strconv.ParseUint(r.FormValue("maxFileKey"), 10, 64) s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") publicUrl := r.FormValue("publicUrl") volumes := new([]storage.VolumeInfo) @@ -44,7 +45,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { return } debug(s, "volumes", r.FormValue("volumes")) - ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack")) + ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, maxFileKey, r.FormValue("dataCenter"), r.FormValue("rack")) writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024}) } diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go index b5a9dd9b6..f67caaebd 100644 --- a/go/weed/weed_server/raft_server.go +++ b/go/weed/weed_server/raft_server.go @@ -77,13 +77,6 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin return nil } } - var err error - for err != nil { - glog.V(0).Infoln("waiting for peers on", strings.Join(s.peers, ","), "...") - time.Sleep(time.Duration(1000+rand.Intn(2000)) * time.Millisecond) - err = s.Join(s.peers) - } - glog.V(0).Infoln("Joined cluster") } // Initialize the server by joining itself. @@ -124,14 +117,17 @@ func (s *RaftServer) Join(peers []string) error { ConnectionString: "http://" + s.httpAddr, } + var err error var b bytes.Buffer json.NewEncoder(&b).Encode(command) - for _, m := range peers { + if m == s.httpAddr { + continue + } target := fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m)) glog.V(0).Infoln("Attempting to connect to:", target) - err := postFollowingOneRedirect(target, "application/json", &b) + err = postFollowingOneRedirect(target, "application/json", &b) if err != nil { glog.V(0).Infoln("Post returned error: ", err.Error()) @@ -139,11 +135,9 @@ func (s *RaftServer) Join(peers []string) error { // If we receive a network error try the next member continue } - - return err + } else { + return nil } - - return nil } return errors.New("Could not connect to any cluster peers") diff --git a/go/weed/weed_server/raft_server_handlers.go b/go/weed/weed_server/raft_server_handlers.go index 6a3c58b29..1ce24a963 100644 --- a/go/weed/weed_server/raft_server_handlers.go +++ b/go/weed/weed_server/raft_server_handlers.go @@ -18,7 +18,7 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { commandText, _ := ioutil.ReadAll(req.Body) glog.V(0).Info("Command:", string(commandText)) if err := json.NewDecoder(strings.NewReader(string(commandText))).Decode(&command); err != nil { - glog.V(0).Infoln("Error decoding json message:", err) + glog.V(0).Infoln("Error decoding json message:", err, string(commandText)) http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index 377e1e99c..a2db0d8a9 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -54,10 +54,10 @@ func NewVolumeServer(r *http.ServeMux, ip string, port int, publicUrl string, fo if err == nil { if !connected { connected = true - glog.V(0).Infoln("Reconnected with master") + glog.V(0).Infoln("Volume Server Connected with master") } } else { - glog.V(4).Infoln("Failing to talk with master:", err.Error()) + glog.V(4).Infoln("Volume Server Failed to talk with master:", err.Error()) if connected { connected = false }