diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index 0bfa12180..c05aae745 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -7,6 +7,14 @@ import ( "github.com/chrislusf/weed-fs/go/util" ) +type NeedleMapType int + +const ( + NeedleMapInMemory NeedleMapType = iota + NeedleMapLevelDb + NeedleMapBoltDb +) + type NeedleMapper interface { Put(key uint64, offset uint32, size uint32) error Get(key uint64) (element *NeedleValue, ok bool) diff --git a/go/storage/needle_map_boltdb.go b/go/storage/needle_map_boltdb.go new file mode 100644 index 000000000..bef10299a --- /dev/null +++ b/go/storage/needle_map_boltdb.go @@ -0,0 +1,182 @@ +package storage + +import ( + "fmt" + "os" + + "github.com/boltdb/bolt" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" +) + +type BoltDbNeedleMap struct { + dbFileName string + indexFile *os.File + db *bolt.DB + mapMetric +} + +var boltdbBucket = []byte("weed") + +func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) { + m = &BoltDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName} + if !isBoltDbFresh(dbFileName, indexFile) { + glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) + generateBoltDbFile(dbFileName, indexFile) + glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) + } + glog.V(1).Infof("Opening %s...", dbFileName) + if m.db, err = bolt.Open(dbFileName, 0644, nil); err != nil { + return + } + glog.V(1).Infof("Loading %s...", indexFile.Name()) + nm, indexLoadError := LoadNeedleMap(indexFile) + if indexLoadError != nil { + return nil, indexLoadError + } + m.mapMetric = nm.mapMetric + return +} + +func isBoltDbFresh(dbFileName string, indexFile *os.File) bool { + // normally we always write to index file first + dbLogFile, err := os.Open(dbFileName) + if err != nil { + return false + } + defer dbLogFile.Close() + dbStat, dbStatErr := dbLogFile.Stat() + indexStat, indexStatErr := indexFile.Stat() + if dbStatErr != nil || indexStatErr != nil { + glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr) + return false + } + + return dbStat.ModTime().After(indexStat.ModTime()) +} + +func generateBoltDbFile(dbFileName string, indexFile *os.File) error { + db, err := bolt.Open(dbFileName, 0644, nil) + if err != nil { + return err + } + defer db.Close() + return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error { + if offset > 0 { + boltDbWrite(db, key, offset, size) + } else { + boltDbDelete(db, key) + } + return nil + }) +} + +func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { + bytes := make([]byte, 8) + var data []byte + util.Uint64toBytes(bytes, key) + err := m.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(boltdbBucket) + if bucket == nil { + return fmt.Errorf("Bucket %q not found!", boltdbBucket) + } + + data = bucket.Get(bytes) + return nil + }) + + if err != nil || len(data) != 8 { + glog.V(0).Infof("Failed to get %d %v", key, err) + return nil, false + } + offset := util.BytesToUint32(data[0:4]) + size := util.BytesToUint32(data[4:8]) + return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true +} + +func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { + var oldSize uint32 + if oldNeedle, ok := m.Get(key); ok { + oldSize = oldNeedle.Size + } + m.logPut(key, oldSize, size) + // write to index file first + if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil { + return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err) + } + return boltDbWrite(m.db, key, offset, size) +} + +func boltDbWrite(db *bolt.DB, + key uint64, offset uint32, size uint32) error { + bytes := make([]byte, 16) + util.Uint64toBytes(bytes[0:8], key) + util.Uint32toBytes(bytes[8:12], offset) + util.Uint32toBytes(bytes[12:16], size) + return db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) + if err != nil { + return err + } + + err = bucket.Put(bytes[0:8], bytes[8:16]) + if err != nil { + return err + } + return nil + }) +} +func boltDbDelete(db *bolt.DB, key uint64) error { + bytes := make([]byte, 8) + util.Uint64toBytes(bytes, key) + return db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) + if err != nil { + return err + } + + err = bucket.Delete(bytes) + if err != nil { + return err + } + return nil + }) +} + +func (m *BoltDbNeedleMap) Delete(key uint64) error { + if oldNeedle, ok := m.Get(key); ok { + m.logDelete(oldNeedle.Size) + } + // write to index file first + if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil { + return err + } + return boltDbDelete(m.db, key) +} + +func (m *BoltDbNeedleMap) Close() { + m.db.Close() +} + +func (m *BoltDbNeedleMap) Destroy() error { + m.Close() + os.Remove(m.indexFile.Name()) + return os.Remove(m.dbFileName) +} + +func (m *BoltDbNeedleMap) ContentSize() uint64 { + return m.FileByteCounter +} +func (m *BoltDbNeedleMap) DeletedSize() uint64 { + return m.DeletionByteCounter +} +func (m *BoltDbNeedleMap) FileCount() int { + return m.FileCounter +} +func (m *BoltDbNeedleMap) DeletedCount() int { + return m.DeletionCounter +} +func (m *BoltDbNeedleMap) MaxFileKey() uint64 { + return m.MaximumFileKey +} diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go index 73595278d..32f763b4f 100644 --- a/go/storage/needle_map_leveldb.go +++ b/go/storage/needle_map_leveldb.go @@ -21,7 +21,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedl m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName} if !isLevelDbFresh(dbFileName, indexFile) { glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) - generateDbFile(dbFileName, indexFile) + generateLevelDbFile(dbFileName, indexFile) glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) } glog.V(1).Infof("Opening %s...", dbFileName) @@ -54,7 +54,7 @@ func isLevelDbFresh(dbFileName string, indexFile *os.File) bool { return dbStat.ModTime().After(indexStat.ModTime()) } -func generateDbFile(dbFileName string, indexFile *os.File) error { +func generateLevelDbFile(dbFileName string, indexFile *os.File) error { db, err := leveldb.OpenFile(dbFileName, nil) if err != nil { return err @@ -75,7 +75,6 @@ func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { util.Uint64toBytes(bytes, key) data, err := m.db.Get(bytes, nil) if err != nil || len(data) != 8 { - glog.V(0).Infof("Failed to get %d %v", key, err) return nil, false } offset := util.BytesToUint32(data[0:4]) diff --git a/go/storage/store.go b/go/storage/store.go index 65e5b218b..eb44bd9d0 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -90,18 +90,18 @@ func (s *Store) String() (str string) { return } -func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, useLevelDb bool) (s *Store) { +func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]} location.volumes = make(map[VolumeId]*Volume) - location.loadExistingVolumes(useLevelDb) + location.loadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) } return } -func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb bool, replicaPlacement string, ttlString string) error { +func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -117,7 +117,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) } - e = s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl) + e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -129,7 +129,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) } for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl); err != nil { + if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil { e = err } } @@ -178,14 +178,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) error { +func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.findFreeLocation(); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, collection, vid, useLevelDb, replicaPlacement, ttl); err == nil { + if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil { location.volumes[vid] = volume return nil } else { @@ -195,7 +195,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, repl return fmt.Errorf("No more free space left") } -func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) { +func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { if dirs, err := ioutil.ReadDir(l.Directory); err == nil { for _, dir := range dirs { name := dir.Name() @@ -208,7 +208,7 @@ func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) { } if vid, err := NewVolumeId(base); err == nil { if l.volumes[vid] == nil { - if v, e := NewVolume(l.Directory, collection, vid, useLevelDb, nil, nil); e == nil { + if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil { l.volumes[vid] = v glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) } diff --git a/go/storage/volume.go b/go/storage/volume.go index 2b47fb497..e35eeee49 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -14,12 +14,13 @@ import ( ) type Volume struct { - Id VolumeId - dir string - Collection string - dataFile *os.File - nm NeedleMapper - readOnly bool + Id VolumeId + dir string + Collection string + dataFile *os.File + nm NeedleMapper + needleMapKind NeedleMapType + readOnly bool SuperBlock @@ -27,20 +28,22 @@ type Volume struct { lastModifiedTime uint64 //unix time in seconds } -func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} - e = v.load(true, true, useLevelDb) + v.needleMapKind = needleMapKind + e = v.load(true, true, needleMapKind) return } func (v *Volume) String() string { return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly) } -func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) { +func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{} - e = v.load(false, false, false) + v.needleMapKind = needleMapKind + e = v.load(false, false, needleMapKind) return } func (v *Volume) FileName() (fileName string) { @@ -51,7 +54,7 @@ func (v *Volume) FileName() (fileName string) { } return } -func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error { +func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error { var e error fileName := v.FileName() @@ -99,16 +102,22 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bo return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e) } } - if !useLevelDb { + switch needleMapKind { + case NeedleMapInMemory: glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly) if v.nm, e = LoadNeedleMap(indexFile); e != nil { glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e) } - } else { + case NeedleMapLevelDb: glog.V(0).Infoln("loading leveldb file", fileName+".ldb") if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil { glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e) } + case NeedleMapBoltDb: + glog.V(0).Infoln("loading boltdb file", fileName+".bdb") + if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil { + glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e) + } } } return e @@ -263,11 +272,12 @@ func (v *Volume) read(n *Needle) (int, error) { } func ScanVolumeFile(dirname string, collection string, id VolumeId, + needleMapKind NeedleMapType, visitSuperBlock func(SuperBlock) error, readNeedleBody bool, visitNeedle func(n *Needle, offset int64) error) (err error) { var v *Volume - if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil { + if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil { return fmt.Errorf("Failed to load volume %d: %v", id, err) } if err = visitSuperBlock(v.SuperBlock); err != nil { diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index 9f6f8e35f..eab138000 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -38,7 +38,7 @@ func (v *Volume) commitCompact() error { } //glog.V(3).Infof("Pretending to be vacuuming...") //time.Sleep(20 * time.Second) - if e = v.load(true, false, false); e != nil { + if e = v.load(true, false, v.needleMapKind); e != nil { return e } return nil @@ -63,27 +63,28 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro now := uint64(time.Now().Unix()) - err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error { - _, err = dst.Write(superBlock.Bytes()) - return err - }, true, func(n *Needle, offset int64) error { - if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { + err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, + func(superBlock SuperBlock) error { + _, err = dst.Write(superBlock.Bytes()) + return err + }, true, func(n *Needle, offset int64) error { + if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { + return nil + } + nv, ok := v.nm.Get(n.Id) + glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) + if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { + if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { + return fmt.Errorf("cannot put needle: %s", err) + } + if _, err = n.Append(dst, v.Version()); err != nil { + return fmt.Errorf("cannot append needle: %s", err) + } + new_offset += n.DiskSize() + glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size) + } return nil - } - nv, ok := v.nm.Get(n.Id) - glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) - if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { - if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { - return fmt.Errorf("cannot put needle: %s", err) - } - if _, err = n.Append(dst, v.Version()); err != nil { - return fmt.Errorf("cannot append needle: %s", err) - } - new_offset += n.DiskSize() - glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size) - } - return nil - }) + }) return } diff --git a/go/weed/compact.go b/go/weed/compact.go index 6ce55a609..a5ea8a529 100644 --- a/go/weed/compact.go +++ b/go/weed/compact.go @@ -33,7 +33,8 @@ func runCompact(cmd *Command, args []string) bool { } vid := storage.VolumeId(*compactVolumeId) - v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, false, nil, nil) + v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, + storage.NeedleMapInMemory, nil, nil) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/go/weed/export.go b/go/weed/export.go index 9e7012937..b120c2fa7 100644 --- a/go/weed/export.go +++ b/go/weed/export.go @@ -99,23 +99,25 @@ func runExport(cmd *Command, args []string) bool { var version storage.Version - err = storage.ScanVolumeFile(*exportVolumePath, *exportCollection, vid, func(superBlock storage.SuperBlock) error { - version = superBlock.Version() - return nil - }, true, func(n *storage.Needle, offset int64) error { - nv, ok := nm.Get(n.Id) - glog.V(3).Infof("key %d offset %d size %d disk_size %d gzip %v ok %v nv %+v", - n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped(), ok, nv) - if ok && nv.Size > 0 && int64(nv.Offset)*8 == offset { - return walker(vid, n, version) - } - if !ok { - glog.V(2).Infof("This seems deleted %d size %d", n.Id, n.Size) - } else { - glog.V(2).Infof("Skipping later-updated Id %d size %d", n.Id, n.Size) - } - return nil - }) + err = storage.ScanVolumeFile(*exportVolumePath, *exportCollection, vid, + storage.NeedleMapInMemory, + func(superBlock storage.SuperBlock) error { + version = superBlock.Version() + return nil + }, true, func(n *storage.Needle, offset int64) error { + nv, ok := nm.Get(n.Id) + glog.V(3).Infof("key %d offset %d size %d disk_size %d gzip %v ok %v nv %+v", + n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped(), ok, nv) + if ok && nv.Size > 0 && int64(nv.Offset)*8 == offset { + return walker(vid, n, version) + } + if !ok { + glog.V(2).Infof("This seems deleted %d size %d", n.Id, n.Size) + } else { + glog.V(2).Infof("Skipping later-updated Id %d size %d", n.Id, n.Size) + } + return nil + }) if err != nil { glog.Fatalf("Export Volume File [ERROR] %s\n", err) } diff --git a/go/weed/fix.go b/go/weed/fix.go index d2cd40398..f51dc1bf2 100644 --- a/go/weed/fix.go +++ b/go/weed/fix.go @@ -47,19 +47,21 @@ func runFix(cmd *Command, args []string) bool { defer nm.Close() vid := storage.VolumeId(*fixVolumeId) - err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, func(superBlock storage.SuperBlock) error { - return nil - }, false, func(n *storage.Needle, offset int64) error { - glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped()) - if n.Size > 0 { - pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size) - glog.V(2).Infof("saved %d with error %v", n.Size, pe) - } else { - glog.V(2).Infof("skipping deleted file ...") - return nm.Delete(n.Id) - } - return nil - }) + err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, + storage.NeedleMapInMemory, + func(superBlock storage.SuperBlock) error { + return nil + }, false, func(n *storage.Needle, offset int64) error { + glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped()) + if n.Size > 0 { + pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size) + glog.V(2).Infof("saved %d with error %v", n.Size, pe) + } else { + glog.V(2).Infof("skipping deleted file ...") + return nm.Delete(n.Id) + } + return nil + }) if err != nil { glog.Fatalf("Export Volume File [ERROR] %s\n", err) } diff --git a/go/weed/server.go b/go/weed/server.go index 71346de0a..39d02597b 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -11,6 +11,7 @@ import ( "time" "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/weed/weed_server" "github.com/gorilla/mux" @@ -68,7 +69,7 @@ var ( volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...") volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - volumeUseLevelDb = cmdServer.Flag.Bool("volume.leveldb", false, "Change to leveldb mode to save memory with reduced performance of read and write.") + volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.") volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") @@ -233,10 +234,17 @@ func runServer(cmd *Command, args []string) bool { if isSeperatedPublicPort { publicVolumeMux = http.NewServeMux() } + volumeNeedleMapKind := storage.NeedleMapInMemory + switch *volumeIndexType { + case "leveldb": + volumeNeedleMapKind = storage.NeedleMapLevelDb + case "boltdb": + volumeNeedleMapKind = storage.NeedleMapBoltDb + } volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, *serverIp, *volumePort, *serverPublicUrl, folders, maxCounts, - *volumeUseLevelDb, + volumeNeedleMapKind, *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, serverWhiteList, *volumeFixJpgOrientation, ) diff --git a/go/weed/volume.go b/go/weed/volume.go index e2c6ebd94..2d3ecbb4d 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -9,6 +9,7 @@ import ( "time" "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/weed/weed_server" ) @@ -32,7 +33,7 @@ type VolumeServerOptions struct { dataCenter *string rack *string whiteList []string - useLevelDb *bool + indexType *string fixJpgOrientation *bool } @@ -49,7 +50,7 @@ func init() { v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") - v.useLevelDb = cmdVolume.Flag.Bool("leveldb", false, "Change to leveldb mode to save memory with reduced performance of read and write.") + v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.") } @@ -115,10 +116,17 @@ func runVolume(cmd *Command, args []string) bool { publicVolumeMux = http.NewServeMux() } + volumeNeedleMapKind := storage.NeedleMapInMemory + switch *v.indexType { + case "leveldb": + volumeNeedleMapKind = storage.NeedleMapLevelDb + case "boltdb": + volumeNeedleMapKind = storage.NeedleMapBoltDb + } volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, *v.ip, *v.port, *v.publicUrl, v.folders, v.folderMaxLimits, - *v.useLevelDb, + volumeNeedleMapKind, *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, v.whiteList, *v.fixJpgOrientation, diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index d84b39808..69c944c99 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -20,14 +20,14 @@ type VolumeServer struct { store *storage.Store guard *security.Guard - UseLevelDb bool + needleMapKind storage.NeedleMapType FixJpgOrientation bool } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, publicUrl string, folders []string, maxCounts []int, - useLevelDb bool, + needleMapKind storage.NeedleMapType, masterNode string, pulseSeconds int, dataCenter string, rack string, whiteList []string, @@ -36,11 +36,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, pulseSeconds: pulseSeconds, dataCenter: dataCenter, rack: rack, - UseLevelDb: useLevelDb, + needleMapKind: needleMapKind, FixJpgOrientation: fixJpgOrientation, } vs.SetMasterNode(masterNode) - vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.UseLevelDb) + vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) vs.guard = security.NewGuard(whiteList, "") @@ -77,7 +77,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, glog.V(0).Infoln("Volume Server Connected with master at", master) } } else { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs, err) + glog.V(0).Infof("Volume Server Failed to talk with master %+v: %v", vs, err) if connected { connected = false } diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go index 0d70a757e..eb8222ff8 100644 --- a/go/weed/weed_server/volume_server_handlers_admin.go +++ b/go/weed/weed_server/volume_server_handlers_admin.go @@ -17,7 +17,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.UseLevelDb, r.FormValue("replication"), r.FormValue("ttl")) + err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("replication"), r.FormValue("ttl")) if err == nil { writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) } else {