1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-06-01 16:22:43 +02:00

volume: load ec shards during heartbeats to master

This commit is contained in:
Chris Lu 2019-05-21 22:41:20 -07:00
parent 54b835e1ae
commit 17ac1290c0
23 changed files with 591 additions and 207 deletions

View file

@ -8,7 +8,7 @@ import (
"strconv" "strconv"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
@ -35,7 +35,7 @@ func main() {
} }
defer indexFile.Close() defer indexFile.Close()
storage.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error { idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error {
fmt.Printf("key:%v offset:%v size:%v\n", key, offset, size) fmt.Printf("key:%v offset:%v size:%v\n", key, offset, size)
return nil return nil
}) })

View file

@ -222,7 +222,7 @@ func runServer(cmd *Command, args []string) bool {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
} }
// Create your protocol servers. // Create your protocol servers.
glog.V(0).Infof("grpc config %+v", viper.Sub("grpc")) glog.V(1).Infof("grpc config %+v", viper.Sub("grpc"))
grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master")) grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master"))
master_pb.RegisterSeaweedServer(grpcS, ms) master_pb.RegisterSeaweedServer(grpcS, ms)
protobuf.RegisterRaftServer(grpcS, raftServer) protobuf.RegisterRaftServer(grpcS, raftServer)

View file

@ -38,6 +38,13 @@ message Heartbeat {
// delta volumes // delta volumes
repeated VolumeShortInformationMessage new_volumes = 10; repeated VolumeShortInformationMessage new_volumes = 10;
repeated VolumeShortInformationMessage deleted_volumes = 11; repeated VolumeShortInformationMessage deleted_volumes = 11;
// erasure coding
repeated VolumeEcShardInformationMessage ec_shards = 16;
// delta erasure coding shards
repeated VolumeEcShardInformationMessage new_ec_shards = 17;
repeated VolumeEcShardInformationMessage deleted_ec_shards = 18;
} }
message HeartbeatResponse { message HeartbeatResponse {
@ -67,6 +74,12 @@ message VolumeShortInformationMessage {
uint32 ttl = 10; uint32 ttl = 10;
} }
message VolumeEcShardInformationMessage {
uint32 id = 1;
string collection = 2;
uint32 ec_index = 3;
}
message Empty { message Empty {
} }

View file

@ -13,6 +13,7 @@ It has these top-level messages:
HeartbeatResponse HeartbeatResponse
VolumeInformationMessage VolumeInformationMessage
VolumeShortInformationMessage VolumeShortInformationMessage
VolumeEcShardInformationMessage
Empty Empty
SuperBlockExtra SuperBlockExtra
ClientListenRequest ClientListenRequest
@ -72,6 +73,11 @@ type Heartbeat struct {
// delta volumes // delta volumes
NewVolumes []*VolumeShortInformationMessage `protobuf:"bytes,10,rep,name=new_volumes,json=newVolumes" json:"new_volumes,omitempty"` NewVolumes []*VolumeShortInformationMessage `protobuf:"bytes,10,rep,name=new_volumes,json=newVolumes" json:"new_volumes,omitempty"`
DeletedVolumes []*VolumeShortInformationMessage `protobuf:"bytes,11,rep,name=deleted_volumes,json=deletedVolumes" json:"deleted_volumes,omitempty"` DeletedVolumes []*VolumeShortInformationMessage `protobuf:"bytes,11,rep,name=deleted_volumes,json=deletedVolumes" json:"deleted_volumes,omitempty"`
// erasure coding
EcShards []*VolumeEcShardInformationMessage `protobuf:"bytes,16,rep,name=ec_shards,json=ecShards" json:"ec_shards,omitempty"`
// delta erasure coding shards
NewEcShards []*VolumeEcShardInformationMessage `protobuf:"bytes,17,rep,name=new_ec_shards,json=newEcShards" json:"new_ec_shards,omitempty"`
DeletedEcShards []*VolumeEcShardInformationMessage `protobuf:"bytes,18,rep,name=deleted_ec_shards,json=deletedEcShards" json:"deleted_ec_shards,omitempty"`
} }
func (m *Heartbeat) Reset() { *m = Heartbeat{} } func (m *Heartbeat) Reset() { *m = Heartbeat{} }
@ -156,6 +162,27 @@ func (m *Heartbeat) GetDeletedVolumes() []*VolumeShortInformationMessage {
return nil return nil
} }
func (m *Heartbeat) GetEcShards() []*VolumeEcShardInformationMessage {
if m != nil {
return m.EcShards
}
return nil
}
func (m *Heartbeat) GetNewEcShards() []*VolumeEcShardInformationMessage {
if m != nil {
return m.NewEcShards
}
return nil
}
func (m *Heartbeat) GetDeletedEcShards() []*VolumeEcShardInformationMessage {
if m != nil {
return m.DeletedEcShards
}
return nil
}
type HeartbeatResponse struct { type HeartbeatResponse struct {
VolumeSizeLimit uint64 `protobuf:"varint,1,opt,name=volumeSizeLimit" json:"volumeSizeLimit,omitempty"` VolumeSizeLimit uint64 `protobuf:"varint,1,opt,name=volumeSizeLimit" json:"volumeSizeLimit,omitempty"`
Leader string `protobuf:"bytes,3,opt,name=leader" json:"leader,omitempty"` Leader string `protobuf:"bytes,3,opt,name=leader" json:"leader,omitempty"`
@ -324,13 +351,45 @@ func (m *VolumeShortInformationMessage) GetTtl() uint32 {
return 0 return 0
} }
type VolumeEcShardInformationMessage struct {
Id uint32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"`
Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"`
EcIndex uint32 `protobuf:"varint,3,opt,name=ec_index,json=ecIndex" json:"ec_index,omitempty"`
}
func (m *VolumeEcShardInformationMessage) Reset() { *m = VolumeEcShardInformationMessage{} }
func (m *VolumeEcShardInformationMessage) String() string { return proto.CompactTextString(m) }
func (*VolumeEcShardInformationMessage) ProtoMessage() {}
func (*VolumeEcShardInformationMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *VolumeEcShardInformationMessage) GetId() uint32 {
if m != nil {
return m.Id
}
return 0
}
func (m *VolumeEcShardInformationMessage) GetCollection() string {
if m != nil {
return m.Collection
}
return ""
}
func (m *VolumeEcShardInformationMessage) GetEcIndex() uint32 {
if m != nil {
return m.EcIndex
}
return 0
}
type Empty struct { type Empty struct {
} }
func (m *Empty) Reset() { *m = Empty{} } func (m *Empty) Reset() { *m = Empty{} }
func (m *Empty) String() string { return proto.CompactTextString(m) } func (m *Empty) String() string { return proto.CompactTextString(m) }
func (*Empty) ProtoMessage() {} func (*Empty) ProtoMessage() {}
func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
type SuperBlockExtra struct { type SuperBlockExtra struct {
ErasureCoding *SuperBlockExtra_ErasureCoding `protobuf:"bytes,1,opt,name=erasure_coding,json=erasureCoding" json:"erasure_coding,omitempty"` ErasureCoding *SuperBlockExtra_ErasureCoding `protobuf:"bytes,1,opt,name=erasure_coding,json=erasureCoding" json:"erasure_coding,omitempty"`
@ -339,7 +398,7 @@ type SuperBlockExtra struct {
func (m *SuperBlockExtra) Reset() { *m = SuperBlockExtra{} } func (m *SuperBlockExtra) Reset() { *m = SuperBlockExtra{} }
func (m *SuperBlockExtra) String() string { return proto.CompactTextString(m) } func (m *SuperBlockExtra) String() string { return proto.CompactTextString(m) }
func (*SuperBlockExtra) ProtoMessage() {} func (*SuperBlockExtra) ProtoMessage() {}
func (*SuperBlockExtra) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } func (*SuperBlockExtra) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (m *SuperBlockExtra) GetErasureCoding() *SuperBlockExtra_ErasureCoding { func (m *SuperBlockExtra) GetErasureCoding() *SuperBlockExtra_ErasureCoding {
if m != nil { if m != nil {
@ -358,7 +417,7 @@ func (m *SuperBlockExtra_ErasureCoding) Reset() { *m = SuperBlockExtra_E
func (m *SuperBlockExtra_ErasureCoding) String() string { return proto.CompactTextString(m) } func (m *SuperBlockExtra_ErasureCoding) String() string { return proto.CompactTextString(m) }
func (*SuperBlockExtra_ErasureCoding) ProtoMessage() {} func (*SuperBlockExtra_ErasureCoding) ProtoMessage() {}
func (*SuperBlockExtra_ErasureCoding) Descriptor() ([]byte, []int) { func (*SuperBlockExtra_ErasureCoding) Descriptor() ([]byte, []int) {
return fileDescriptor0, []int{5, 0} return fileDescriptor0, []int{6, 0}
} }
func (m *SuperBlockExtra_ErasureCoding) GetData() uint32 { func (m *SuperBlockExtra_ErasureCoding) GetData() uint32 {
@ -389,7 +448,7 @@ type ClientListenRequest struct {
func (m *ClientListenRequest) Reset() { *m = ClientListenRequest{} } func (m *ClientListenRequest) Reset() { *m = ClientListenRequest{} }
func (m *ClientListenRequest) String() string { return proto.CompactTextString(m) } func (m *ClientListenRequest) String() string { return proto.CompactTextString(m) }
func (*ClientListenRequest) ProtoMessage() {} func (*ClientListenRequest) ProtoMessage() {}
func (*ClientListenRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } func (*ClientListenRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func (m *ClientListenRequest) GetName() string { func (m *ClientListenRequest) GetName() string {
if m != nil { if m != nil {
@ -408,7 +467,7 @@ type VolumeLocation struct {
func (m *VolumeLocation) Reset() { *m = VolumeLocation{} } func (m *VolumeLocation) Reset() { *m = VolumeLocation{} }
func (m *VolumeLocation) String() string { return proto.CompactTextString(m) } func (m *VolumeLocation) String() string { return proto.CompactTextString(m) }
func (*VolumeLocation) ProtoMessage() {} func (*VolumeLocation) ProtoMessage() {}
func (*VolumeLocation) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } func (*VolumeLocation) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
func (m *VolumeLocation) GetUrl() string { func (m *VolumeLocation) GetUrl() string {
if m != nil { if m != nil {
@ -446,7 +505,7 @@ type LookupVolumeRequest struct {
func (m *LookupVolumeRequest) Reset() { *m = LookupVolumeRequest{} } func (m *LookupVolumeRequest) Reset() { *m = LookupVolumeRequest{} }
func (m *LookupVolumeRequest) String() string { return proto.CompactTextString(m) } func (m *LookupVolumeRequest) String() string { return proto.CompactTextString(m) }
func (*LookupVolumeRequest) ProtoMessage() {} func (*LookupVolumeRequest) ProtoMessage() {}
func (*LookupVolumeRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } func (*LookupVolumeRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} }
func (m *LookupVolumeRequest) GetVolumeIds() []string { func (m *LookupVolumeRequest) GetVolumeIds() []string {
if m != nil { if m != nil {
@ -469,7 +528,7 @@ type LookupVolumeResponse struct {
func (m *LookupVolumeResponse) Reset() { *m = LookupVolumeResponse{} } func (m *LookupVolumeResponse) Reset() { *m = LookupVolumeResponse{} }
func (m *LookupVolumeResponse) String() string { return proto.CompactTextString(m) } func (m *LookupVolumeResponse) String() string { return proto.CompactTextString(m) }
func (*LookupVolumeResponse) ProtoMessage() {} func (*LookupVolumeResponse) ProtoMessage() {}
func (*LookupVolumeResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } func (*LookupVolumeResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} }
func (m *LookupVolumeResponse) GetVolumeIdLocations() []*LookupVolumeResponse_VolumeIdLocation { func (m *LookupVolumeResponse) GetVolumeIdLocations() []*LookupVolumeResponse_VolumeIdLocation {
if m != nil { if m != nil {
@ -488,7 +547,7 @@ func (m *LookupVolumeResponse_VolumeIdLocation) Reset() { *m = LookupVol
func (m *LookupVolumeResponse_VolumeIdLocation) String() string { return proto.CompactTextString(m) } func (m *LookupVolumeResponse_VolumeIdLocation) String() string { return proto.CompactTextString(m) }
func (*LookupVolumeResponse_VolumeIdLocation) ProtoMessage() {} func (*LookupVolumeResponse_VolumeIdLocation) ProtoMessage() {}
func (*LookupVolumeResponse_VolumeIdLocation) Descriptor() ([]byte, []int) { func (*LookupVolumeResponse_VolumeIdLocation) Descriptor() ([]byte, []int) {
return fileDescriptor0, []int{9, 0} return fileDescriptor0, []int{10, 0}
} }
func (m *LookupVolumeResponse_VolumeIdLocation) GetVolumeId() string { func (m *LookupVolumeResponse_VolumeIdLocation) GetVolumeId() string {
@ -520,7 +579,7 @@ type Location struct {
func (m *Location) Reset() { *m = Location{} } func (m *Location) Reset() { *m = Location{} }
func (m *Location) String() string { return proto.CompactTextString(m) } func (m *Location) String() string { return proto.CompactTextString(m) }
func (*Location) ProtoMessage() {} func (*Location) ProtoMessage() {}
func (*Location) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } func (*Location) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} }
func (m *Location) GetUrl() string { func (m *Location) GetUrl() string {
if m != nil { if m != nil {
@ -549,7 +608,7 @@ type AssignRequest struct {
func (m *AssignRequest) Reset() { *m = AssignRequest{} } func (m *AssignRequest) Reset() { *m = AssignRequest{} }
func (m *AssignRequest) String() string { return proto.CompactTextString(m) } func (m *AssignRequest) String() string { return proto.CompactTextString(m) }
func (*AssignRequest) ProtoMessage() {} func (*AssignRequest) ProtoMessage() {}
func (*AssignRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} } func (*AssignRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} }
func (m *AssignRequest) GetCount() uint64 { func (m *AssignRequest) GetCount() uint64 {
if m != nil { if m != nil {
@ -612,7 +671,7 @@ type AssignResponse struct {
func (m *AssignResponse) Reset() { *m = AssignResponse{} } func (m *AssignResponse) Reset() { *m = AssignResponse{} }
func (m *AssignResponse) String() string { return proto.CompactTextString(m) } func (m *AssignResponse) String() string { return proto.CompactTextString(m) }
func (*AssignResponse) ProtoMessage() {} func (*AssignResponse) ProtoMessage() {}
func (*AssignResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} } func (*AssignResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} }
func (m *AssignResponse) GetFid() string { func (m *AssignResponse) GetFid() string {
if m != nil { if m != nil {
@ -665,7 +724,7 @@ type StatisticsRequest struct {
func (m *StatisticsRequest) Reset() { *m = StatisticsRequest{} } func (m *StatisticsRequest) Reset() { *m = StatisticsRequest{} }
func (m *StatisticsRequest) String() string { return proto.CompactTextString(m) } func (m *StatisticsRequest) String() string { return proto.CompactTextString(m) }
func (*StatisticsRequest) ProtoMessage() {} func (*StatisticsRequest) ProtoMessage() {}
func (*StatisticsRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } func (*StatisticsRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} }
func (m *StatisticsRequest) GetReplication() string { func (m *StatisticsRequest) GetReplication() string {
if m != nil { if m != nil {
@ -700,7 +759,7 @@ type StatisticsResponse struct {
func (m *StatisticsResponse) Reset() { *m = StatisticsResponse{} } func (m *StatisticsResponse) Reset() { *m = StatisticsResponse{} }
func (m *StatisticsResponse) String() string { return proto.CompactTextString(m) } func (m *StatisticsResponse) String() string { return proto.CompactTextString(m) }
func (*StatisticsResponse) ProtoMessage() {} func (*StatisticsResponse) ProtoMessage() {}
func (*StatisticsResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} } func (*StatisticsResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} }
func (m *StatisticsResponse) GetReplication() string { func (m *StatisticsResponse) GetReplication() string {
if m != nil { if m != nil {
@ -752,7 +811,7 @@ type StorageType struct {
func (m *StorageType) Reset() { *m = StorageType{} } func (m *StorageType) Reset() { *m = StorageType{} }
func (m *StorageType) String() string { return proto.CompactTextString(m) } func (m *StorageType) String() string { return proto.CompactTextString(m) }
func (*StorageType) ProtoMessage() {} func (*StorageType) ProtoMessage() {}
func (*StorageType) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} } func (*StorageType) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{16} }
func (m *StorageType) GetReplication() string { func (m *StorageType) GetReplication() string {
if m != nil { if m != nil {
@ -775,7 +834,7 @@ type Collection struct {
func (m *Collection) Reset() { *m = Collection{} } func (m *Collection) Reset() { *m = Collection{} }
func (m *Collection) String() string { return proto.CompactTextString(m) } func (m *Collection) String() string { return proto.CompactTextString(m) }
func (*Collection) ProtoMessage() {} func (*Collection) ProtoMessage() {}
func (*Collection) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{16} } func (*Collection) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} }
func (m *Collection) GetName() string { func (m *Collection) GetName() string {
if m != nil { if m != nil {
@ -790,7 +849,7 @@ type CollectionListRequest struct {
func (m *CollectionListRequest) Reset() { *m = CollectionListRequest{} } func (m *CollectionListRequest) Reset() { *m = CollectionListRequest{} }
func (m *CollectionListRequest) String() string { return proto.CompactTextString(m) } func (m *CollectionListRequest) String() string { return proto.CompactTextString(m) }
func (*CollectionListRequest) ProtoMessage() {} func (*CollectionListRequest) ProtoMessage() {}
func (*CollectionListRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} } func (*CollectionListRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} }
type CollectionListResponse struct { type CollectionListResponse struct {
Collections []*Collection `protobuf:"bytes,1,rep,name=collections" json:"collections,omitempty"` Collections []*Collection `protobuf:"bytes,1,rep,name=collections" json:"collections,omitempty"`
@ -799,7 +858,7 @@ type CollectionListResponse struct {
func (m *CollectionListResponse) Reset() { *m = CollectionListResponse{} } func (m *CollectionListResponse) Reset() { *m = CollectionListResponse{} }
func (m *CollectionListResponse) String() string { return proto.CompactTextString(m) } func (m *CollectionListResponse) String() string { return proto.CompactTextString(m) }
func (*CollectionListResponse) ProtoMessage() {} func (*CollectionListResponse) ProtoMessage() {}
func (*CollectionListResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} } func (*CollectionListResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{19} }
func (m *CollectionListResponse) GetCollections() []*Collection { func (m *CollectionListResponse) GetCollections() []*Collection {
if m != nil { if m != nil {
@ -815,7 +874,7 @@ type CollectionDeleteRequest struct {
func (m *CollectionDeleteRequest) Reset() { *m = CollectionDeleteRequest{} } func (m *CollectionDeleteRequest) Reset() { *m = CollectionDeleteRequest{} }
func (m *CollectionDeleteRequest) String() string { return proto.CompactTextString(m) } func (m *CollectionDeleteRequest) String() string { return proto.CompactTextString(m) }
func (*CollectionDeleteRequest) ProtoMessage() {} func (*CollectionDeleteRequest) ProtoMessage() {}
func (*CollectionDeleteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{19} } func (*CollectionDeleteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{20} }
func (m *CollectionDeleteRequest) GetName() string { func (m *CollectionDeleteRequest) GetName() string {
if m != nil { if m != nil {
@ -830,7 +889,7 @@ type CollectionDeleteResponse struct {
func (m *CollectionDeleteResponse) Reset() { *m = CollectionDeleteResponse{} } func (m *CollectionDeleteResponse) Reset() { *m = CollectionDeleteResponse{} }
func (m *CollectionDeleteResponse) String() string { return proto.CompactTextString(m) } func (m *CollectionDeleteResponse) String() string { return proto.CompactTextString(m) }
func (*CollectionDeleteResponse) ProtoMessage() {} func (*CollectionDeleteResponse) ProtoMessage() {}
func (*CollectionDeleteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{20} } func (*CollectionDeleteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{21} }
// //
// volume related // volume related
@ -847,7 +906,7 @@ type DataNodeInfo struct {
func (m *DataNodeInfo) Reset() { *m = DataNodeInfo{} } func (m *DataNodeInfo) Reset() { *m = DataNodeInfo{} }
func (m *DataNodeInfo) String() string { return proto.CompactTextString(m) } func (m *DataNodeInfo) String() string { return proto.CompactTextString(m) }
func (*DataNodeInfo) ProtoMessage() {} func (*DataNodeInfo) ProtoMessage() {}
func (*DataNodeInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{21} } func (*DataNodeInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{22} }
func (m *DataNodeInfo) GetId() string { func (m *DataNodeInfo) GetId() string {
if m != nil { if m != nil {
@ -903,7 +962,7 @@ type RackInfo struct {
func (m *RackInfo) Reset() { *m = RackInfo{} } func (m *RackInfo) Reset() { *m = RackInfo{} }
func (m *RackInfo) String() string { return proto.CompactTextString(m) } func (m *RackInfo) String() string { return proto.CompactTextString(m) }
func (*RackInfo) ProtoMessage() {} func (*RackInfo) ProtoMessage() {}
func (*RackInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{22} } func (*RackInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{23} }
func (m *RackInfo) GetId() string { func (m *RackInfo) GetId() string {
if m != nil { if m != nil {
@ -959,7 +1018,7 @@ type DataCenterInfo struct {
func (m *DataCenterInfo) Reset() { *m = DataCenterInfo{} } func (m *DataCenterInfo) Reset() { *m = DataCenterInfo{} }
func (m *DataCenterInfo) String() string { return proto.CompactTextString(m) } func (m *DataCenterInfo) String() string { return proto.CompactTextString(m) }
func (*DataCenterInfo) ProtoMessage() {} func (*DataCenterInfo) ProtoMessage() {}
func (*DataCenterInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{23} } func (*DataCenterInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{24} }
func (m *DataCenterInfo) GetId() string { func (m *DataCenterInfo) GetId() string {
if m != nil { if m != nil {
@ -1015,7 +1074,7 @@ type TopologyInfo struct {
func (m *TopologyInfo) Reset() { *m = TopologyInfo{} } func (m *TopologyInfo) Reset() { *m = TopologyInfo{} }
func (m *TopologyInfo) String() string { return proto.CompactTextString(m) } func (m *TopologyInfo) String() string { return proto.CompactTextString(m) }
func (*TopologyInfo) ProtoMessage() {} func (*TopologyInfo) ProtoMessage() {}
func (*TopologyInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{24} } func (*TopologyInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{25} }
func (m *TopologyInfo) GetId() string { func (m *TopologyInfo) GetId() string {
if m != nil { if m != nil {
@ -1065,7 +1124,7 @@ type VolumeListRequest struct {
func (m *VolumeListRequest) Reset() { *m = VolumeListRequest{} } func (m *VolumeListRequest) Reset() { *m = VolumeListRequest{} }
func (m *VolumeListRequest) String() string { return proto.CompactTextString(m) } func (m *VolumeListRequest) String() string { return proto.CompactTextString(m) }
func (*VolumeListRequest) ProtoMessage() {} func (*VolumeListRequest) ProtoMessage() {}
func (*VolumeListRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{25} } func (*VolumeListRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{26} }
type VolumeListResponse struct { type VolumeListResponse struct {
TopologyInfo *TopologyInfo `protobuf:"bytes,1,opt,name=topology_info,json=topologyInfo" json:"topology_info,omitempty"` TopologyInfo *TopologyInfo `protobuf:"bytes,1,opt,name=topology_info,json=topologyInfo" json:"topology_info,omitempty"`
@ -1075,7 +1134,7 @@ type VolumeListResponse struct {
func (m *VolumeListResponse) Reset() { *m = VolumeListResponse{} } func (m *VolumeListResponse) Reset() { *m = VolumeListResponse{} }
func (m *VolumeListResponse) String() string { return proto.CompactTextString(m) } func (m *VolumeListResponse) String() string { return proto.CompactTextString(m) }
func (*VolumeListResponse) ProtoMessage() {} func (*VolumeListResponse) ProtoMessage() {}
func (*VolumeListResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{26} } func (*VolumeListResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{27} }
func (m *VolumeListResponse) GetTopologyInfo() *TopologyInfo { func (m *VolumeListResponse) GetTopologyInfo() *TopologyInfo {
if m != nil { if m != nil {
@ -1096,6 +1155,7 @@ func init() {
proto.RegisterType((*HeartbeatResponse)(nil), "master_pb.HeartbeatResponse") proto.RegisterType((*HeartbeatResponse)(nil), "master_pb.HeartbeatResponse")
proto.RegisterType((*VolumeInformationMessage)(nil), "master_pb.VolumeInformationMessage") proto.RegisterType((*VolumeInformationMessage)(nil), "master_pb.VolumeInformationMessage")
proto.RegisterType((*VolumeShortInformationMessage)(nil), "master_pb.VolumeShortInformationMessage") proto.RegisterType((*VolumeShortInformationMessage)(nil), "master_pb.VolumeShortInformationMessage")
proto.RegisterType((*VolumeEcShardInformationMessage)(nil), "master_pb.VolumeEcShardInformationMessage")
proto.RegisterType((*Empty)(nil), "master_pb.Empty") proto.RegisterType((*Empty)(nil), "master_pb.Empty")
proto.RegisterType((*SuperBlockExtra)(nil), "master_pb.SuperBlockExtra") proto.RegisterType((*SuperBlockExtra)(nil), "master_pb.SuperBlockExtra")
proto.RegisterType((*SuperBlockExtra_ErasureCoding)(nil), "master_pb.SuperBlockExtra.ErasureCoding") proto.RegisterType((*SuperBlockExtra_ErasureCoding)(nil), "master_pb.SuperBlockExtra.ErasureCoding")
@ -1494,98 +1554,104 @@ var _Seaweed_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("master.proto", fileDescriptor0) } func init() { proto.RegisterFile("master.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{ var fileDescriptor0 = []byte{
// 1482 bytes of a gzipped FileDescriptorProto // 1572 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xd4, 0x58, 0x4d, 0x6f, 0xdb, 0x46, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xd4, 0x58, 0xcd, 0x6e, 0xdc, 0x46,
0x13, 0x36, 0xa9, 0x0f, 0x4b, 0xa3, 0xef, 0xb5, 0x93, 0x30, 0xca, 0x9b, 0x44, 0x61, 0x2e, 0xca, 0x12, 0x16, 0xe7, 0x4f, 0x33, 0x35, 0x9a, 0xd1, 0x4c, 0x4b, 0xb6, 0xa9, 0xf1, 0xda, 0x1e, 0xd3,
0xfb, 0xe1, 0x37, 0x75, 0x0f, 0x3d, 0xb4, 0x45, 0x90, 0x38, 0x0e, 0x6a, 0xc4, 0x69, 0x12, 0x2a, 0x97, 0xb1, 0x77, 0x57, 0xeb, 0xd5, 0x1e, 0xf6, 0xb0, 0x1b, 0x18, 0xb6, 0x2c, 0x27, 0x82, 0xe5,
0x49, 0x81, 0x02, 0x05, 0xbb, 0x22, 0xd7, 0x0e, 0x61, 0x8a, 0x64, 0xc9, 0x95, 0x63, 0xe5, 0xd2, 0x3f, 0x8e, 0xed, 0x00, 0x01, 0x02, 0x86, 0x22, 0x4b, 0x32, 0x21, 0x0e, 0xc9, 0x90, 0x3d, 0xb2,
0x43, 0x7b, 0x2c, 0xda, 0x43, 0xff, 0x44, 0x7f, 0x45, 0x2f, 0x3d, 0xe6, 0xc7, 0x14, 0xe8, 0xbd, 0xc6, 0x97, 0x1c, 0x92, 0x63, 0x90, 0x1c, 0xf2, 0x12, 0x79, 0x88, 0x20, 0x97, 0x1c, 0xf3, 0x30,
0x40, 0xb1, 0x1f, 0x24, 0x97, 0x94, 0x6c, 0xa7, 0x05, 0x72, 0xc8, 0x6d, 0x77, 0x66, 0x76, 0x76, 0x01, 0x72, 0x0f, 0x10, 0xf4, 0x0f, 0xc9, 0x26, 0x67, 0x24, 0xd9, 0x01, 0x7c, 0xf0, 0xad, 0xbb,
0xf6, 0x99, 0xe1, 0x33, 0x23, 0x41, 0x7b, 0x86, 0x13, 0x4a, 0xe2, 0xad, 0x28, 0x0e, 0x69, 0x88, 0xaa, 0xba, 0xba, 0xfa, 0xeb, 0xea, 0xaf, 0x8a, 0x84, 0x95, 0x89, 0x9d, 0x50, 0x8c, 0x37, 0xa3,
0x9a, 0x62, 0x67, 0x47, 0x53, 0xf3, 0x4d, 0x05, 0x9a, 0x9f, 0x11, 0x1c, 0xd3, 0x29, 0xc1, 0x14, 0x38, 0xa4, 0x21, 0x69, 0x89, 0x99, 0x15, 0xed, 0x1b, 0x3f, 0xd5, 0xa1, 0xf5, 0x09, 0xda, 0x31,
0x75, 0x41, 0xf7, 0x22, 0x43, 0x1b, 0x69, 0xe3, 0xa6, 0xa5, 0x7b, 0x11, 0x42, 0x50, 0x8d, 0xc2, 0xdd, 0x47, 0x9b, 0x92, 0x2e, 0x54, 0xbc, 0x48, 0xd7, 0x86, 0xda, 0xa8, 0x65, 0x56, 0xbc, 0x88,
0x98, 0x1a, 0xfa, 0x48, 0x1b, 0x77, 0x2c, 0xbe, 0x46, 0x57, 0x01, 0xa2, 0xf9, 0xd4, 0xf7, 0x1c, 0x10, 0xa8, 0x45, 0x61, 0x4c, 0xf5, 0xca, 0x50, 0x1b, 0x75, 0x4c, 0x3e, 0x26, 0x57, 0x00, 0xa2,
0x7b, 0x1e, 0xfb, 0x46, 0x85, 0xdb, 0x36, 0x85, 0xe4, 0x79, 0xec, 0xa3, 0x31, 0xf4, 0x67, 0xf8, 0xe9, 0xbe, 0xef, 0x39, 0xd6, 0x34, 0xf6, 0xf5, 0x2a, 0xb7, 0x6d, 0x09, 0xc9, 0x8b, 0xd8, 0x27,
0xc4, 0x3e, 0x0e, 0xfd, 0xf9, 0x8c, 0xd8, 0x4e, 0x38, 0x0f, 0xa8, 0x51, 0xe5, 0xc7, 0xbb, 0x33, 0x23, 0xe8, 0x4d, 0xec, 0x13, 0xeb, 0x38, 0xf4, 0xa7, 0x13, 0xb4, 0x9c, 0x70, 0x1a, 0x50, 0xbd,
0x7c, 0xf2, 0x82, 0x8b, 0x77, 0x98, 0x14, 0x8d, 0x58, 0x54, 0x27, 0xf6, 0x81, 0xe7, 0x13, 0xfb, 0xc6, 0x97, 0x77, 0x27, 0xf6, 0xc9, 0x4b, 0x2e, 0xde, 0x66, 0x52, 0x32, 0x64, 0x51, 0x9d, 0x58,
0x88, 0x2c, 0x8c, 0xda, 0x48, 0x1b, 0x57, 0x2d, 0x98, 0xe1, 0x93, 0x07, 0x9e, 0x4f, 0x1e, 0x92, 0x07, 0x9e, 0x8f, 0xd6, 0x11, 0xce, 0xf4, 0xfa, 0x50, 0x1b, 0xd5, 0x4c, 0x98, 0xd8, 0x27, 0x0f,
0x05, 0xba, 0x0e, 0x2d, 0x17, 0x53, 0x6c, 0x3b, 0x24, 0xa0, 0x24, 0x36, 0xea, 0xfc, 0x2e, 0x60, 0x3c, 0x1f, 0x1f, 0xe2, 0x8c, 0x5c, 0x83, 0xb6, 0x6b, 0x53, 0xdb, 0x72, 0x30, 0xa0, 0x18, 0xeb,
0xa2, 0x1d, 0x2e, 0x61, 0xf1, 0xc5, 0xd8, 0x39, 0x32, 0xd6, 0xb9, 0x86, 0xaf, 0x59, 0x7c, 0xd8, 0x0d, 0xbe, 0x17, 0x30, 0xd1, 0x36, 0x97, 0xb0, 0xf8, 0x62, 0xdb, 0x39, 0xd2, 0x97, 0xb9, 0x86,
0x9d, 0x79, 0x81, 0xcd, 0x23, 0x6f, 0xf0, 0xab, 0x9b, 0x5c, 0xf2, 0x84, 0x85, 0xff, 0x29, 0xac, 0x8f, 0x59, 0x7c, 0xb6, 0x3b, 0xf1, 0x02, 0x8b, 0x47, 0xde, 0xe4, 0x5b, 0xb7, 0xb8, 0xe4, 0x29,
0x8b, 0xd8, 0x12, 0xa3, 0x39, 0xaa, 0x8c, 0x5b, 0xdb, 0x37, 0xb7, 0x32, 0x34, 0xb6, 0x44, 0x78, 0x0b, 0xff, 0x23, 0x58, 0x16, 0xb1, 0x25, 0x7a, 0x6b, 0x58, 0x1d, 0xb5, 0xb7, 0x6e, 0x6c, 0x66,
0x7b, 0xc1, 0x41, 0x18, 0xcf, 0x30, 0xf5, 0xc2, 0xe0, 0x11, 0x49, 0x12, 0x7c, 0x48, 0xac, 0xf4, 0x68, 0x6c, 0x8a, 0xf0, 0x76, 0x83, 0x83, 0x30, 0x9e, 0xd8, 0xd4, 0x0b, 0x83, 0x47, 0x98, 0x24,
0x0c, 0xda, 0x83, 0x56, 0x40, 0x5e, 0xd9, 0xa9, 0x0b, 0xe0, 0x2e, 0xc6, 0x4b, 0x2e, 0x26, 0x2f, 0xf6, 0x21, 0x9a, 0xe9, 0x1a, 0xb2, 0x0b, 0xed, 0x00, 0x5f, 0x5b, 0xa9, 0x0b, 0xe0, 0x2e, 0x46,
0xc3, 0x98, 0xae, 0xf0, 0x03, 0x01, 0x79, 0xf5, 0x42, 0xba, 0x7a, 0x0a, 0x3d, 0x97, 0xf8, 0x84, 0x73, 0x2e, 0xc6, 0xaf, 0xc2, 0x98, 0x2e, 0xf0, 0x03, 0x01, 0xbe, 0x7e, 0x29, 0x5d, 0x3d, 0x83,
0x12, 0x37, 0x73, 0xd7, 0xfa, 0x9b, 0xee, 0xba, 0xd2, 0x81, 0x74, 0x69, 0x3e, 0x87, 0x41, 0x96, 0x55, 0x17, 0x7d, 0xa4, 0xe8, 0x66, 0xee, 0xda, 0xef, 0xe8, 0xae, 0x2b, 0x1d, 0xa4, 0x2e, 0x3f,
0x4c, 0x8b, 0x24, 0x51, 0x18, 0x24, 0x04, 0x8d, 0xa1, 0x27, 0xfc, 0x4f, 0xbc, 0xd7, 0x64, 0xdf, 0x86, 0x16, 0x3a, 0x56, 0xf2, 0xca, 0x8e, 0xdd, 0x44, 0xef, 0x71, 0x67, 0xb7, 0xe6, 0x9c, 0xed,
0x9b, 0x79, 0x94, 0x67, 0xb8, 0x6a, 0x95, 0xc5, 0xe8, 0x22, 0xd4, 0x7d, 0x82, 0x5d, 0x12, 0xcb, 0x38, 0x63, 0x66, 0xb0, 0xc0, 0x5d, 0x13, 0x85, 0x2a, 0x21, 0x8f, 0xa1, 0xc3, 0x8e, 0x99, 0x3b,
0xb4, 0xca, 0x9d, 0xf9, 0xbb, 0x0e, 0xc6, 0x69, 0xd0, 0xf0, 0x9a, 0x71, 0xb9, 0xc7, 0x8e, 0xa5, 0xeb, 0xbf, 0xb3, 0x33, 0x86, 0xd3, 0x4e, 0xea, 0xef, 0x25, 0xf4, 0xd3, 0xb3, 0xe6, 0x3e, 0xc9,
0x7b, 0x2e, 0xcb, 0x49, 0xe2, 0xbd, 0x26, 0xbc, 0x66, 0xaa, 0x16, 0x5f, 0xa3, 0x6b, 0x00, 0x4e, 0x3b, 0xfb, 0x4c, 0x01, 0x4b, 0xfd, 0x1a, 0x2f, 0xa0, 0x9f, 0x65, 0xaf, 0x89, 0x49, 0x14, 0x06,
0xe8, 0xfb, 0xc4, 0x61, 0x07, 0xa5, 0x73, 0x45, 0xc2, 0x72, 0xc6, 0xcb, 0x20, 0x2f, 0x97, 0xaa, 0x09, 0x92, 0x11, 0xac, 0x0a, 0x40, 0xc7, 0xde, 0x1b, 0xdc, 0xf3, 0x26, 0x1e, 0xe5, 0x29, 0x5d,
0xd5, 0x64, 0x12, 0x51, 0x29, 0x37, 0xa0, 0x2d, 0x1e, 0x2a, 0x0d, 0x44, 0xa5, 0xb4, 0x84, 0x4c, 0x33, 0xcb, 0x62, 0x72, 0x11, 0x1a, 0x3e, 0xda, 0x2e, 0xc6, 0x32, 0x8f, 0xe5, 0xcc, 0xf8, 0xad,
0x98, 0xfc, 0x17, 0x50, 0x0a, 0xe6, 0x74, 0x91, 0x19, 0xd6, 0xb9, 0x61, 0x5f, 0x6a, 0xee, 0x2d, 0x02, 0xfa, 0x69, 0xb9, 0xc0, 0x1f, 0x89, 0xcb, 0x3d, 0x76, 0xcc, 0x8a, 0xe7, 0xb2, 0x24, 0x4c,
0x52, 0xeb, 0x2b, 0xd0, 0x8c, 0x09, 0x76, 0xed, 0x30, 0xf0, 0x17, 0xbc, 0x78, 0x1a, 0x56, 0x83, 0xbc, 0x37, 0xc8, 0x1f, 0x49, 0xcd, 0xe4, 0x63, 0x72, 0x15, 0xc0, 0x09, 0x7d, 0x1f, 0x1d, 0xb6,
0x09, 0x1e, 0x07, 0xfe, 0x02, 0xfd, 0x07, 0x06, 0x31, 0x89, 0x7c, 0xcf, 0xc1, 0x76, 0xe4, 0x63, 0x50, 0x3a, 0x57, 0x24, 0x2c, 0x49, 0x79, 0xde, 0xe7, 0xef, 0xa3, 0x66, 0xb6, 0x98, 0x44, 0x3c,
0x87, 0xcc, 0x48, 0x90, 0xd6, 0x51, 0x5f, 0x2a, 0x9e, 0xa4, 0x72, 0x64, 0xc0, 0xfa, 0x31, 0x89, 0x8d, 0xeb, 0xb0, 0x22, 0x4e, 0x2a, 0x0d, 0xc4, 0xd3, 0x68, 0x0b, 0x99, 0x30, 0xf9, 0x07, 0x90,
0x13, 0xf6, 0xac, 0x26, 0x37, 0x49, 0xb7, 0xa8, 0x0f, 0x15, 0x4a, 0x7d, 0x03, 0xb8, 0x94, 0x2d, 0x14, 0xd1, 0xfd, 0x59, 0x66, 0xd8, 0xe0, 0x86, 0x3d, 0xa9, 0xb9, 0x37, 0x4b, 0xad, 0x2f, 0x43,
0xd1, 0x2d, 0xe8, 0x3b, 0xe1, 0x2c, 0xc2, 0x0e, 0xb5, 0x63, 0x72, 0xec, 0xf1, 0x43, 0x2d, 0xae, 0x2b, 0x46, 0xdb, 0xb5, 0xc2, 0xc0, 0x9f, 0xf1, 0xd7, 0xd2, 0x34, 0x9b, 0x4c, 0xf0, 0x24, 0xf0,
0xee, 0x49, 0xb9, 0x25, 0xc5, 0xe6, 0x2f, 0x1a, 0x5c, 0x3d, 0x33, 0xf5, 0x4b, 0xb0, 0x9f, 0x07, 0x67, 0xe4, 0xef, 0xd0, 0x8f, 0x31, 0xf2, 0x3d, 0xc7, 0xb6, 0x22, 0xdf, 0x76, 0x70, 0x82, 0x41,
0xf1, 0xbb, 0x7a, 0x95, 0xb9, 0x0e, 0xb5, 0xdd, 0x59, 0x44, 0x17, 0xe6, 0xaf, 0x1a, 0xf4, 0x26, 0xfa, 0x70, 0x7a, 0x52, 0xf1, 0x34, 0x95, 0x13, 0x1d, 0x96, 0x8f, 0x31, 0x4e, 0xd8, 0xb1, 0x5a,
0xf3, 0x88, 0xc4, 0xf7, 0xfc, 0xd0, 0x39, 0xda, 0x3d, 0xa1, 0x31, 0x46, 0x8f, 0xa1, 0x4b, 0x62, 0xdc, 0x24, 0x9d, 0x92, 0x1e, 0x54, 0x29, 0xf5, 0x75, 0xe0, 0x52, 0x36, 0x24, 0x37, 0xa1, 0xe7,
0x9c, 0xcc, 0x63, 0x96, 0x11, 0xd7, 0x0b, 0x0e, 0x79, 0xc4, 0xc5, 0x12, 0x2f, 0x9d, 0xd9, 0xda, 0x84, 0x93, 0xc8, 0x76, 0xa8, 0x15, 0xe3, 0xb1, 0xc7, 0x17, 0xb5, 0xb9, 0x7a, 0x55, 0xca, 0x4d,
0x15, 0x07, 0x76, 0xb8, 0xbd, 0xd5, 0x21, 0xea, 0x76, 0xf8, 0x25, 0x74, 0x0a, 0x7a, 0x56, 0x6e, 0x29, 0x36, 0x7e, 0xd4, 0xe0, 0xca, 0x99, 0xb9, 0x3e, 0x07, 0xfb, 0x79, 0x10, 0xbf, 0xaf, 0x53,
0x8c, 0x10, 0x24, 0x12, 0x7c, 0xcd, 0xea, 0x38, 0xc2, 0xb1, 0x47, 0x17, 0x92, 0xb8, 0xe4, 0x8e, 0x19, 0x3e, 0x5c, 0x3b, 0x27, 0x4f, 0xcf, 0x89, 0xb5, 0x32, 0x17, 0xeb, 0x06, 0x34, 0xd1, 0xb1,
0x95, 0x99, 0xe4, 0x25, 0xcf, 0x4d, 0x8c, 0xca, 0xa8, 0xc2, 0xa8, 0x41, 0x48, 0xf6, 0xdc, 0xc4, 0xbc, 0xc0, 0xc5, 0x13, 0x7e, 0x92, 0x8e, 0xb9, 0x8c, 0xce, 0x2e, 0x9b, 0x1a, 0xcb, 0x50, 0xdf,
0xbc, 0x05, 0x1b, 0x3b, 0xbe, 0x47, 0x02, 0xba, 0xef, 0x25, 0x94, 0x04, 0x16, 0xf9, 0x66, 0x4e, 0x99, 0x44, 0x74, 0x66, 0xfc, 0xac, 0xc1, 0xea, 0x78, 0x1a, 0x61, 0x7c, 0xcf, 0x0f, 0x9d, 0xa3,
0x12, 0xca, 0x6e, 0x08, 0xf0, 0x8c, 0x48, 0x5a, 0xe4, 0x6b, 0xf3, 0x5b, 0xe8, 0x8a, 0xf4, 0xec, 0x9d, 0x13, 0x1a, 0xdb, 0xe4, 0x09, 0x74, 0x31, 0xb6, 0x93, 0x69, 0xcc, 0xee, 0xdf, 0xf5, 0x82,
0x87, 0x0e, 0x4f, 0x0b, 0x03, 0x86, 0xf1, 0xa1, 0x30, 0x62, 0xcb, 0x12, 0x51, 0xea, 0x65, 0xa2, 0x43, 0xbe, 0x67, 0x91, 0x41, 0x4a, 0x6b, 0x36, 0x77, 0xc4, 0x82, 0x6d, 0x6e, 0x6f, 0x76, 0x50,
0xbc, 0x0c, 0x0d, 0xce, 0x24, 0x79, 0x28, 0xeb, 0x8c, 0x1c, 0x3c, 0x37, 0xc9, 0xeb, 0xdd, 0x15, 0x9d, 0x0e, 0x3e, 0x83, 0x4e, 0x41, 0xcf, 0x92, 0x9b, 0xf1, 0xad, 0x3c, 0x0b, 0x1f, 0xb3, 0x57,
0xea, 0x2a, 0x57, 0xb7, 0xd2, 0x8f, 0xdd, 0x73, 0x13, 0xf3, 0x19, 0x6c, 0xec, 0x87, 0xe1, 0xd1, 0x13, 0xd9, 0xb1, 0x47, 0x67, 0xb2, 0x2e, 0xc8, 0x19, 0x4b, 0x6a, 0x49, 0xfb, 0x9e, 0x9b, 0xe8,
0x3c, 0x12, 0x61, 0xa4, 0xb1, 0x16, 0x5f, 0xa8, 0x8d, 0x2a, 0xec, 0xce, 0xec, 0x85, 0xa5, 0x22, 0xd5, 0x61, 0x95, 0x31, 0xaf, 0x90, 0xec, 0xba, 0x89, 0x71, 0x13, 0xd6, 0xb6, 0x7d, 0x0f, 0x03,
0xd1, 0xcb, 0x45, 0x62, 0xfe, 0xa1, 0xc1, 0x66, 0xd1, 0xad, 0xe4, 0x90, 0xaf, 0x61, 0x23, 0xf3, 0xba, 0xe7, 0x25, 0x14, 0x03, 0x13, 0xbf, 0x9c, 0x62, 0x42, 0xd9, 0x0e, 0x81, 0x3d, 0x41, 0x59,
0x6b, 0xfb, 0xf2, 0xcd, 0xe2, 0x82, 0xd6, 0xf6, 0x6d, 0x25, 0x99, 0xab, 0x4e, 0xa7, 0xb4, 0xea, 0x75, 0xf8, 0xd8, 0xf8, 0x0a, 0xba, 0x02, 0xe2, 0xbd, 0xd0, 0xe1, 0xc0, 0xb2, 0x6b, 0x60, 0xe5,
0xa6, 0x60, 0x59, 0x83, 0xe3, 0x92, 0x24, 0x19, 0x9e, 0x40, 0xbf, 0x6c, 0xc6, 0x3e, 0xd3, 0xec, 0x46, 0x18, 0xb1, 0x61, 0xa9, 0x0e, 0x55, 0xca, 0x75, 0x68, 0x03, 0x9a, 0x9c, 0xa8, 0xf3, 0x50,
0x56, 0x89, 0x6c, 0x23, 0x3d, 0x89, 0x3e, 0x80, 0x66, 0x1e, 0x88, 0xce, 0x03, 0xd9, 0x28, 0x04, 0x96, 0x19, 0xf7, 0x7a, 0x6e, 0x92, 0xbf, 0x2e, 0x57, 0xa8, 0x6b, 0x5c, 0xdd, 0x4e, 0xb9, 0xd4,
0x22, 0xef, 0xca, 0xad, 0xd0, 0x26, 0xd4, 0x48, 0x1c, 0x87, 0x29, 0xbd, 0x89, 0x8d, 0xf9, 0x31, 0x73, 0x13, 0xe3, 0x39, 0xac, 0xed, 0x85, 0xe1, 0xd1, 0x34, 0x12, 0x61, 0xa4, 0xb1, 0x16, 0x4f,
0x34, 0xfe, 0x71, 0x16, 0xcd, 0x37, 0x1a, 0x74, 0xee, 0x26, 0x89, 0x77, 0x98, 0x95, 0xcb, 0x26, 0xa8, 0x0d, 0xab, 0x6c, 0xcf, 0xec, 0x84, 0xe7, 0x5d, 0xb3, 0xf1, 0xbb, 0x06, 0xeb, 0x45, 0xb7,
0xd4, 0x04, 0xf9, 0x08, 0x92, 0x15, 0x1b, 0x34, 0x82, 0x96, 0xfc, 0xca, 0x14, 0xe8, 0x55, 0xd1, 0x92, 0xb1, 0xbe, 0x80, 0xb5, 0xcc, 0xaf, 0xe5, 0xcb, 0x33, 0x8b, 0x0d, 0xda, 0x5b, 0xb7, 0x95,
0xb9, 0x1f, 0xb0, 0xfc, 0xf2, 0xaa, 0x22, 0x34, 0xc6, 0x27, 0xa5, 0xf6, 0x58, 0x3b, 0xb5, 0x3d, 0xcb, 0x5c, 0xb4, 0x3a, 0xad, 0x5a, 0x6e, 0x0a, 0x96, 0xd9, 0x3f, 0x2e, 0x49, 0x92, 0xc1, 0x09,
0xd6, 0x95, 0xf6, 0x78, 0x05, 0x9a, 0xfc, 0x50, 0x10, 0xba, 0x44, 0xf6, 0xcd, 0x06, 0x13, 0x7c, 0xf4, 0xca, 0x66, 0x8c, 0x14, 0xb2, 0x5d, 0x25, 0xb2, 0xcd, 0x74, 0x25, 0xf9, 0x37, 0xb4, 0xf2,
0x1e, 0xba, 0xc4, 0xfc, 0x59, 0x83, 0x6e, 0xfa, 0x1a, 0x99, 0xf9, 0x3e, 0x54, 0x0e, 0x32, 0xf4, 0x40, 0x2a, 0x3c, 0x90, 0xb5, 0x42, 0x20, 0x72, 0xaf, 0xdc, 0x8a, 0xac, 0x43, 0x1d, 0xe3, 0x38,
0xd9, 0x32, 0xc5, 0x48, 0x3f, 0x0d, 0xa3, 0xa5, 0x91, 0x20, 0x43, 0xa4, 0xaa, 0x22, 0x92, 0x25, 0x4c, 0xc9, 0x54, 0x4c, 0x8c, 0xff, 0x41, 0xf3, 0x2f, 0xdf, 0xa2, 0xf1, 0xab, 0x06, 0x9d, 0xbb,
0xa3, 0xa6, 0x24, 0x83, 0x85, 0x8c, 0xe7, 0xf4, 0x65, 0x1a, 0x32, 0x5b, 0x9b, 0x87, 0x30, 0x98, 0x49, 0xe2, 0x1d, 0x66, 0xe9, 0xb2, 0x0e, 0x75, 0x41, 0x75, 0x82, 0xd2, 0xc5, 0x84, 0x0c, 0xa1,
0x50, 0x4c, 0xbd, 0x84, 0x7a, 0x4e, 0x92, 0xc2, 0x5c, 0x02, 0x54, 0x3b, 0x0f, 0x50, 0xfd, 0x34, 0x2d, 0xdf, 0xb4, 0x02, 0xbd, 0x2a, 0x3a, 0x97, 0x2e, 0xe4, 0x3b, 0xaf, 0x89, 0xd0, 0x18, 0x7b,
0x40, 0x2b, 0x19, 0xa0, 0xe6, 0x6f, 0x1a, 0x20, 0xf5, 0x26, 0x09, 0xc1, 0x3b, 0xb8, 0x8a, 0x41, 0x95, 0xba, 0x8f, 0xfa, 0xa9, 0xdd, 0x47, 0x43, 0xe9, 0x3e, 0x2e, 0x43, 0x8b, 0x2f, 0x0a, 0x42,
0x46, 0x43, 0x8a, 0x7d, 0x9b, 0xf7, 0x4a, 0xd9, 0xf1, 0xb8, 0x84, 0xb5, 0x63, 0x96, 0xa5, 0x79, 0x17, 0x65, 0x5b, 0xd2, 0x64, 0x82, 0xc7, 0xa1, 0x8b, 0xc6, 0x0f, 0x1a, 0x74, 0xd3, 0xd3, 0xc8,
0x42, 0x5c, 0xa1, 0x15, 0xed, 0xae, 0xc1, 0x04, 0x5c, 0x59, 0xec, 0x96, 0xf5, 0x52, 0xb7, 0x34, 0x9b, 0xef, 0x41, 0xf5, 0x20, 0x43, 0x9f, 0x0d, 0x53, 0x8c, 0x2a, 0xa7, 0x61, 0x34, 0xd7, 0x71,
0xef, 0x42, 0x6b, 0x42, 0xc3, 0x18, 0x1f, 0x92, 0x67, 0x8b, 0xe8, 0x6d, 0xa2, 0x97, 0xd1, 0xe9, 0x65, 0x88, 0xd4, 0x54, 0x44, 0xb2, 0xcb, 0xa8, 0x2b, 0x97, 0xc1, 0x42, 0xb6, 0xa7, 0xf4, 0x55,
0x39, 0x10, 0x23, 0x80, 0x9d, 0x3c, 0xfa, 0x55, 0x04, 0x78, 0x09, 0x2e, 0xe4, 0x16, 0x8c, 0x2f, 0x1a, 0x32, 0x1b, 0x1b, 0x87, 0xd0, 0x1f, 0x53, 0x9b, 0x7a, 0x09, 0xf5, 0x9c, 0x24, 0x85, 0xb9,
0x65, 0x5e, 0xcc, 0xa7, 0x70, 0xb1, 0xac, 0x90, 0x30, 0x7e, 0x04, 0xad, 0x1c, 0x92, 0x94, 0x3b, 0x04, 0xa8, 0x76, 0x1e, 0xa0, 0x95, 0xd3, 0x00, 0xad, 0x66, 0x80, 0x1a, 0xbf, 0x68, 0x40, 0xd4,
0x2e, 0x28, 0x9f, 0x6c, 0x7e, 0xce, 0x52, 0x2d, 0xcd, 0xff, 0xc1, 0xa5, 0x5c, 0x75, 0x9f, 0x93, 0x9d, 0x24, 0x04, 0xef, 0x61, 0x2b, 0x06, 0x19, 0x0d, 0xa9, 0xed, 0x5b, 0xbc, 0x32, 0xcb, 0xfa,
0xe0, 0x59, 0xdc, 0x3c, 0x04, 0x63, 0xd9, 0x5c, 0xc4, 0x60, 0xfe, 0xa4, 0x43, 0xfb, 0xbe, 0xac, 0xca, 0x25, 0xac, 0xf8, 0xb3, 0x5b, 0x9a, 0x26, 0xe8, 0x0a, 0xad, 0x28, 0xae, 0x4d, 0x26, 0xe0,
0x76, 0xd6, 0x54, 0x95, 0x36, 0xda, 0xe4, 0x6d, 0xf4, 0x06, 0xb4, 0x0b, 0xa3, 0xab, 0x98, 0x62, 0xca, 0x62, 0x6d, 0x6e, 0x94, 0x6a, 0xb3, 0x71, 0x17, 0xda, 0x63, 0x1a, 0xc6, 0xf6, 0x21, 0x3e,
0x5a, 0xc7, 0xca, 0xdc, 0xba, 0x6a, 0xc2, 0xad, 0x70, 0xb3, 0xf2, 0x84, 0xfb, 0x6f, 0x18, 0x1c, 0x9f, 0x45, 0x6f, 0x13, 0xbd, 0x8c, 0xae, 0x92, 0x03, 0x31, 0x04, 0xd8, 0xce, 0xa3, 0x5f, 0x44,
0xc4, 0x84, 0x2c, 0x0f, 0xc3, 0x55, 0xab, 0xc7, 0x14, 0xaa, 0xed, 0x16, 0x6c, 0x60, 0x87, 0x7a, 0x80, 0x97, 0xe0, 0x42, 0x6e, 0xc1, 0xf8, 0x52, 0xde, 0x8b, 0xf1, 0x0c, 0x2e, 0x96, 0x15, 0x12,
0xc7, 0x25, 0x6b, 0x91, 0xfb, 0x81, 0x50, 0xa9, 0xf6, 0x0f, 0xb2, 0x40, 0xbd, 0xe0, 0x20, 0x4c, 0xc6, 0xff, 0x42, 0x3b, 0x87, 0x24, 0xe5, 0x8e, 0x0b, 0xca, 0x93, 0xcd, 0xd7, 0x99, 0xaa, 0xa5,
0x8c, 0xfa, 0xdb, 0x0f, 0xb3, 0xf2, 0x35, 0x4c, 0x93, 0x98, 0xdf, 0xeb, 0xd0, 0xb0, 0xb0, 0x73, 0xf1, 0x4f, 0xb8, 0x94, 0xab, 0xee, 0x73, 0x12, 0x3c, 0x8b, 0x9b, 0x07, 0xa0, 0xcf, 0x9b, 0x8b,
0xf4, 0x7e, 0xa3, 0x71, 0x07, 0x7a, 0x19, 0xab, 0x15, 0x00, 0xb9, 0xa4, 0x00, 0xa2, 0x26, 0xde, 0x18, 0x8c, 0xef, 0x2b, 0xb0, 0x72, 0x5f, 0x66, 0x3b, 0x2b, 0x8b, 0x4a, 0x21, 0x6c, 0xf1, 0x42,
0xea, 0xb8, 0xca, 0x2e, 0x31, 0xff, 0xd4, 0xa0, 0x7b, 0x3f, 0x63, 0xce, 0xf7, 0x1b, 0x8c, 0x6d, 0x78, 0x1d, 0x56, 0x0a, 0x5f, 0x06, 0xa2, 0x67, 0x6a, 0x1f, 0x2b, 0x9f, 0x05, 0x8b, 0x3e, 0x20,
0x00, 0x46, 0xf5, 0x05, 0x1c, 0xd4, 0xd6, 0x98, 0xa6, 0xdb, 0x6a, 0xc6, 0x72, 0x95, 0x98, 0x3f, 0xaa, 0xdc, 0xac, 0xfc, 0x01, 0x71, 0x0b, 0xfa, 0x07, 0x31, 0xe2, 0xfc, 0xb7, 0x46, 0xcd, 0x5c,
0xea, 0xd0, 0x7e, 0x16, 0x46, 0xa1, 0x1f, 0x1e, 0x2e, 0xde, 0xef, 0xd7, 0xef, 0xc2, 0x40, 0xe9, 0x65, 0x0a, 0xd5, 0x76, 0x13, 0xd6, 0x6c, 0x87, 0x7a, 0xc7, 0x25, 0x6b, 0x71, 0xf7, 0x7d, 0xa1,
0x8a, 0x05, 0x10, 0x2e, 0x97, 0x8a, 0x21, 0x4f, 0xb6, 0xd5, 0x73, 0x0b, 0xfb, 0xc4, 0xdc, 0x80, 0x52, 0xed, 0x1f, 0x64, 0x81, 0x7a, 0xc1, 0x41, 0x98, 0xe8, 0x8d, 0xb7, 0xff, 0x56, 0x90, 0xa7,
0x81, 0x9c, 0xf0, 0x14, 0x72, 0xfb, 0x4e, 0x03, 0xa4, 0x4a, 0x25, 0xb3, 0x7d, 0x02, 0x1d, 0x2a, 0x61, 0x9a, 0xc4, 0xf8, 0xa6, 0x02, 0x4d, 0xd3, 0x76, 0x8e, 0x3e, 0x6c, 0x34, 0xee, 0xc0, 0x6a,
0xb1, 0xe3, 0xf7, 0xc9, 0x21, 0x57, 0xad, 0x3d, 0x15, 0x5b, 0xab, 0x4d, 0x55, 0xa4, 0xff, 0x0f, 0xc6, 0x6a, 0x05, 0x40, 0x2e, 0x29, 0x80, 0xa8, 0x17, 0x6f, 0x76, 0x5c, 0x65, 0x96, 0x18, 0x7f,
0x9b, 0xf2, 0x65, 0x8c, 0xed, 0x6d, 0x9f, 0xfd, 0x14, 0xb3, 0x67, 0x53, 0x89, 0xf0, 0xa0, 0xf4, 0x68, 0xd0, 0xbd, 0x9f, 0x31, 0xe7, 0x87, 0x0d, 0xc6, 0x16, 0x00, 0xa3, 0xfa, 0x02, 0x0e, 0x6a,
0x23, 0xed, 0xd1, 0x74, 0xfb, 0x87, 0x1a, 0xac, 0x4f, 0x08, 0x7e, 0x45, 0x88, 0x8b, 0xf6, 0xa0, 0x69, 0x4c, 0xaf, 0xdb, 0x6c, 0xc5, 0x72, 0x94, 0x18, 0xdf, 0x55, 0x60, 0xe5, 0x79, 0x18, 0x85,
0x33, 0x21, 0x81, 0x9b, 0xff, 0x84, 0xdf, 0x54, 0x2e, 0xcd, 0xa4, 0xc3, 0x7f, 0xad, 0x92, 0x66, 0x7e, 0x78, 0x38, 0xfb, 0xb0, 0x4f, 0xbf, 0x03, 0x7d, 0xa5, 0x2a, 0x16, 0x40, 0xd8, 0x28, 0x25,
0xac, 0xb8, 0x36, 0xd6, 0x6e, 0x6b, 0xe8, 0x09, 0x74, 0x1e, 0x12, 0x12, 0xed, 0x84, 0x41, 0x40, 0x43, 0x7e, 0xd9, 0xe6, 0xaa, 0x5b, 0x98, 0x27, 0xc6, 0x1a, 0xf4, 0x65, 0x87, 0xa7, 0x90, 0xdb,
0x1c, 0x4a, 0x5c, 0x74, 0x4d, 0xe5, 0xe6, 0xe5, 0xc1, 0x78, 0x78, 0x79, 0x89, 0x6c, 0xd2, 0x39, 0xd7, 0x1a, 0x10, 0x55, 0x2a, 0x99, 0xed, 0xff, 0xd0, 0xa1, 0x12, 0x3b, 0xbe, 0x9f, 0x6c, 0x72,
0x4a, 0x7a, 0x7c, 0x0a, 0x6d, 0x75, 0x1e, 0x2c, 0x38, 0x5c, 0x31, 0xbd, 0x0e, 0xaf, 0x9f, 0x33, 0xd5, 0xdc, 0x53, 0xb1, 0x35, 0x57, 0xa8, 0x8a, 0xf4, 0xbf, 0x60, 0x5d, 0x9e, 0x8c, 0xb1, 0xbd,
0x48, 0x9a, 0x6b, 0xe8, 0x0e, 0xd4, 0xc5, 0x80, 0x82, 0x0c, 0xc5, 0xb8, 0x30, 0x81, 0x15, 0xe2, 0xe5, 0xb3, 0x0f, 0x3f, 0x6b, 0xb2, 0x2f, 0x11, 0xee, 0x97, 0x3e, 0x09, 0x1f, 0xed, 0x6f, 0x7d,
0x2a, 0x4e, 0x33, 0xe6, 0x1a, 0x7a, 0x08, 0x90, 0xb7, 0x78, 0xa4, 0xe2, 0xb2, 0x34, 0x63, 0x0c, 0x5b, 0x87, 0xe5, 0x31, 0xda, 0xaf, 0x11, 0x5d, 0xb2, 0x0b, 0x9d, 0x31, 0x06, 0x6e, 0xfe, 0x87,
0xaf, 0x9e, 0xa2, 0xcd, 0x9c, 0x7d, 0x01, 0xdd, 0x62, 0xb3, 0x43, 0xa3, 0x95, 0xfd, 0x4c, 0xa9, 0x64, 0x5d, 0xd9, 0x34, 0x93, 0x0e, 0xfe, 0xb6, 0x48, 0x9a, 0xb1, 0xe2, 0xd2, 0x48, 0xbb, 0xad,
0xa1, 0xe1, 0x8d, 0x33, 0x2c, 0x32, 0xc7, 0x5f, 0x41, 0xbf, 0xdc, 0xc3, 0x90, 0xb9, 0xf2, 0x60, 0x91, 0xa7, 0xd0, 0x79, 0x88, 0x18, 0x6d, 0x87, 0x41, 0x80, 0x0e, 0x45, 0x97, 0x5c, 0x55, 0xb9,
0xa1, 0x1f, 0x0e, 0x6f, 0x9e, 0x69, 0xa3, 0x82, 0x90, 0x97, 0x71, 0x01, 0x84, 0xa5, 0x9a, 0x2f, 0x79, 0xbe, 0x31, 0x1e, 0x6c, 0xcc, 0x91, 0x4d, 0xda, 0x47, 0x49, 0x8f, 0xcf, 0x60, 0x45, 0xed,
0x80, 0xb0, 0x5c, 0xfb, 0xe6, 0xda, 0xb4, 0xce, 0xff, 0x54, 0xfa, 0xf0, 0xaf, 0x00, 0x00, 0x00, 0x07, 0x0b, 0x0e, 0x17, 0x74, 0xaf, 0x83, 0x6b, 0xe7, 0x34, 0x92, 0xc6, 0x12, 0xb9, 0x03, 0x0d,
0xff, 0xff, 0x94, 0xee, 0xe2, 0x89, 0x64, 0x12, 0x00, 0x00, 0xd1, 0xa0, 0x10, 0x5d, 0x31, 0x2e, 0x74, 0x60, 0x85, 0xb8, 0x8a, 0xdd, 0x8c, 0xb1, 0x44, 0x1e,
0x02, 0xe4, 0x25, 0x9e, 0xa8, 0xb8, 0xcc, 0xf5, 0x18, 0x83, 0x2b, 0xa7, 0x68, 0x33, 0x67, 0x9f,
0x42, 0xb7, 0x58, 0xec, 0xc8, 0x70, 0x61, 0x3d, 0x53, 0x72, 0x68, 0x70, 0xfd, 0x0c, 0x8b, 0xcc,
0xf1, 0xe7, 0xd0, 0x2b, 0xd7, 0x30, 0x62, 0x2c, 0x5c, 0x58, 0xa8, 0x87, 0x83, 0x1b, 0x67, 0xda,
0xa8, 0x20, 0xe4, 0x69, 0x5c, 0x00, 0x61, 0x2e, 0xe7, 0x0b, 0x20, 0xcc, 0xe7, 0xbe, 0xb1, 0xb4,
0xdf, 0xe0, 0xff, 0xec, 0xfe, 0xf3, 0x67, 0x00, 0x00, 0x00, 0xff, 0xff, 0x63, 0xc4, 0xec, 0xbe,
0xc3, 0x13, 0x00, 0x00,
} }

View file

@ -87,7 +87,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
} }
// update master internal volume layouts // update master internal volume layouts
t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn) t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
} else { } else if len(heartbeat.Volumes) > 0 {
// process heartbeat.Volumes // process heartbeat.Volumes
newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn) newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
@ -99,6 +99,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url()) glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
} }
} else if len(heartbeat.EcShards) > 0 {
glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
} }
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 { if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {

View file

@ -94,7 +94,13 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
return "", err return "", err
} }
tickChan := time.Tick(sleepInterval) if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
return "", err
}
volumeTickChan := time.Tick(sleepInterval)
ecShardTickChan := time.Tick(17 * sleepInterval)
for { for {
select { select {
@ -109,6 +115,17 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
return "", err return "", err
} }
case ecShardMessage := <-vs.store.NewEcShardsChan:
deltaBeat := &master_pb.Heartbeat{
NewEcShards: []*master_pb.VolumeEcShardInformationMessage{
&ecShardMessage,
},
}
glog.V(1).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, ecShardMessage.EcIndex)
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
return "", err
}
case volumeMessage := <-vs.store.DeletedVolumesChan: case volumeMessage := <-vs.store.DeletedVolumesChan:
deltaBeat := &master_pb.Heartbeat{ deltaBeat := &master_pb.Heartbeat{
DeletedVolumes: []*master_pb.VolumeShortInformationMessage{ DeletedVolumes: []*master_pb.VolumeShortInformationMessage{
@ -120,12 +137,29 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
return "", err return "", err
} }
case <-tickChan: case ecShardMessage := <-vs.store.DeletedEcShardsChan:
deltaBeat := &master_pb.Heartbeat{
DeletedEcShards: []*master_pb.VolumeEcShardInformationMessage{
&ecShardMessage,
},
}
glog.V(1).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, ecShardMessage.EcIndex)
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
return "", err
}
case <-volumeTickChan:
glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
return "", err return "", err
} }
case <-ecShardTickChan:
glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
return "", err
}
case err = <-doneChan: case err = <-doneChan:
return return
} }

View file

@ -9,6 +9,7 @@ import (
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
) )
@ -17,43 +18,52 @@ type DiskLocation struct {
MaxVolumeCount int MaxVolumeCount int
volumes map[needle.VolumeId]*Volume volumes map[needle.VolumeId]*Volume
sync.RWMutex sync.RWMutex
// erasure coding
ecShards map[needle.VolumeId]erasure_coding.EcVolumeShards
ecShardsLock sync.RWMutex
} }
func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount} location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount}
location.volumes = make(map[needle.VolumeId]*Volume) location.volumes = make(map[needle.VolumeId]*Volume)
location.ecShards = make(map[needle.VolumeId]erasure_coding.EcVolumeShards)
return location return location
} }
func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (needle.VolumeId, string, error) { func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (needle.VolumeId, string, error) {
name := dir.Name() name := dir.Name()
if !dir.IsDir() && strings.HasSuffix(name, ".dat") { if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
collection := ""
base := name[:len(name)-len(".dat")] base := name[:len(name)-len(".dat")]
i := strings.LastIndex(base, "_") collection, volumeId, err := parseCollectionVolumeId(base)
if i > 0 { return volumeId, collection, err
collection, base = base[0:i], base[i+1:]
}
vol, err := needle.NewVolumeId(base)
return vol, collection, err
} }
return 0, "", fmt.Errorf("Path is not a volume: %s", name) return 0, "", fmt.Errorf("Path is not a volume: %s", name)
} }
func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) { func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeId, err error) {
name := dir.Name() i := strings.LastIndex(base, "_")
if !dir.IsDir() && strings.HasSuffix(name, ".dat") { if i > 0 {
vid, collection, err := l.volumeIdFromPath(dir) collection, base = base[0:i], base[i+1:]
}
vol, err := needle.NewVolumeId(base)
return collection, vol, err
}
func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) {
name := fileInfo.Name()
if !fileInfo.IsDir() && strings.HasSuffix(name, ".dat") {
vid, collection, err := l.volumeIdFromPath(fileInfo)
if err == nil { if err == nil {
mutex.RLock() l.RLock()
_, found := l.volumes[vid] _, found := l.volumes[vid]
mutex.RUnlock() l.RUnlock()
if !found { if !found {
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0); e == nil { if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0); e == nil {
mutex.Lock() l.Lock()
l.volumes[vid] = v l.volumes[vid] = v
mutex.Unlock() l.Unlock()
size, _, _ := v.FileStat() size, _, _ := v.FileStat()
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
@ -80,13 +90,12 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con
}() }()
var wg sync.WaitGroup var wg sync.WaitGroup
var mutex sync.RWMutex
for workerNum := 0; workerNum < concurrency; workerNum++ { for workerNum := 0; workerNum < concurrency; workerNum++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
for dir := range task_queue { for dir := range task_queue {
l.loadExistingVolume(dir, needleMapKind, &mutex) l.loadExistingVolume(dir, needleMapKind)
} }
}() }()
} }
@ -95,12 +104,13 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con
} }
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
l.Lock()
defer l.Unlock()
l.concurrentLoadingVolumes(needleMapKind, 10) l.concurrentLoadingVolumes(needleMapKind, 10)
glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
l.loadAllEcShards()
glog.V(0).Infof("Store started on dir: %s with %d ec shards", l.Directory, len(l.ecShards))
glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
} }
func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) { func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
@ -132,12 +142,11 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) {
} }
func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool { func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool {
if dirs, err := ioutil.ReadDir(l.Directory); err == nil { if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil {
for _, dir := range dirs { for _, fileInfo := range fileInfos {
volId, _, err := l.volumeIdFromPath(dir) volId, _, err := l.volumeIdFromPath(fileInfo)
if vid == volId && err == nil { if vid == volId && err == nil {
var mutex sync.RWMutex l.loadExistingVolume(fileInfo, needleMapKind)
l.loadExistingVolume(dir, needleMapKind, &mutex)
return true return true
} }
} }
@ -194,10 +203,16 @@ func (l *DiskLocation) VolumesLen() int {
func (l *DiskLocation) Close() { func (l *DiskLocation) Close() {
l.Lock() l.Lock()
defer l.Unlock()
for _, v := range l.volumes { for _, v := range l.volumes {
v.Close() v.Close()
} }
l.Unlock()
l.ecShardsLock.Lock()
for _, shards := range l.ecShards {
shards.Close()
}
l.ecShardsLock.Unlock()
return return
} }

View file

@ -0,0 +1,84 @@
package storage
import (
"fmt"
"io/ioutil"
"path"
"regexp"
"sort"
"strconv"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
var (
re = regexp.MustCompile("\\.ec[0-9][0-9]")
)
func (l *DiskLocation) loadEcShards(baseName string, shards []string, collection string, vid needle.VolumeId) (err error){
for _, shard := range shards{
shardId, err := strconv.ParseInt(path.Ext(shard)[3:], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse ec shard name %v: %v", shard, err)
}
ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.Directory, collection, vid, int(shardId))
if err != nil {
return fmt.Errorf("failed to create ec shard %v: %v", shard, err)
}
l.ecShardsLock.Lock()
l.ecShards[vid] = append(l.ecShards[vid], ecVolumeShard)
l.ecShardsLock.Unlock()
}
return nil
}
func (l *DiskLocation) loadAllEcShards() (err error){
fileInfos, err := ioutil.ReadDir(l.Directory)
if err != nil {
return fmt.Errorf("load all ec shards in dir %s: %v", l.Directory, err)
}
sort.Slice(fileInfos, func(i, j int) bool {
return fileInfos[i].Name() < fileInfos[j].Name()
})
var sameVolumeShards []string
var prevVolumeId needle.VolumeId
for _, fileInfo := range fileInfos{
if fileInfo.IsDir(){
continue
}
ext := path.Ext(fileInfo.Name())
name := fileInfo.Name()
baseName := name[:len(name)-len(ext)]
collection, volumeId, err := parseCollectionVolumeId(baseName)
if err != nil {
continue
}
if re.MatchString(ext){
if prevVolumeId == 0 || volumeId == prevVolumeId {
sameVolumeShards = append(sameVolumeShards, fileInfo.Name())
}else{
sameVolumeShards = []string{fileInfo.Name()}
}
prevVolumeId = volumeId
continue
}
if ext == ".ecx" && volumeId == prevVolumeId{
if err = l.loadEcShards(baseName, sameVolumeShards, collection, volumeId);err!=nil{
return fmt.Errorf("loadEcShards collection:%v volumeId:%d : %v", collection, volumeId, err)
}
prevVolumeId = volumeId
continue
}
}
return nil
}

View file

@ -0,0 +1,17 @@
package storage
import (
"testing"
)
func TestLoadingEcShards(t *testing.T) {
dl := NewDiskLocation("./erasure_coding", 100)
err := dl.loadAllEcShards()
if err != nil {
t.Errorf("load all ec shards: %v", err)
}
if len(dl.ecShards)!=1 {
t.Errorf("loading err")
}
}

View file

View file

View file

@ -6,7 +6,7 @@ import (
"os" "os"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map" "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/klauspost/reedsolomon" "github.com/klauspost/reedsolomon"
@ -190,7 +190,7 @@ func readCompactMap(baseFileName string) (*needle_map.CompactMap, error) {
defer indexFile.Close() defer indexFile.Close()
cm := needle_map.NewCompactMap() cm := needle_map.NewCompactMap()
err = storage.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error { err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error {
if !offset.IsZero() && size != types.TombstoneFileSize { if !offset.IsZero() && size != types.TombstoneFileSize {
cm.Set(key, offset, size) cm.Set(key, offset, size)
} else { } else {

View file

@ -0,0 +1,108 @@
package erasure_coding
import (
"fmt"
"os"
"path"
"strconv"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
type EcVolumeShard struct {
VolumeId needle.VolumeId
ShardId uint8
Collection string
dir string
ecdFile *os.File
ecxFile *os.File
}
type EcVolumeShards []*EcVolumeShard
func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId int) (v *EcVolumeShard, e error) {
v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: uint8(shardId)}
baseFileName := v.FileName()
if v.ecxFile, e = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); e != nil {
return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, e)
}
if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(shardId), os.O_RDONLY, 0644); e != nil {
return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(shardId), e)
}
return
}
func (shards *EcVolumeShards) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
for _, s := range *shards {
if s.ShardId == ecVolumeShard.ShardId {
return false
}
}
*shards = append(*shards, ecVolumeShard)
return true
}
func (shards *EcVolumeShards) DeleteEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
foundPosition := -1
for i, s := range *shards {
if s.ShardId == ecVolumeShard.ShardId {
foundPosition = i
}
}
if foundPosition < 0 {
return false
}
*shards = append((*shards)[:foundPosition], (*shards)[foundPosition+1:]...)
return true
}
func (shards *EcVolumeShards) Close() {
for _, s := range *shards {
s.Close()
}
}
func (shards *EcVolumeShards) ToVolumeInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
for _, s := range *shards {
m := &master_pb.VolumeEcShardInformationMessage{
Id: uint32(s.VolumeId),
Collection: s.Collection,
EcIndex: uint32(s.ShardId),
}
messages = append(messages, m)
}
return
}
func (v *EcVolumeShard) String() string {
return fmt.Sprintf("ec shard %v:%v, dir:%s, Collection:%s", v.VolumeId, v.ShardId, v.dir, v.Collection)
}
func (v *EcVolumeShard) FileName() (fileName string) {
return EcShardFileName(v.Collection, v.dir, int(v.VolumeId))
}
func EcShardFileName(collection string, dir string, id int) (fileName string) {
idString := strconv.Itoa(id)
if collection == "" {
fileName = path.Join(dir, idString)
} else {
fileName = path.Join(dir, collection+"_"+idString)
}
return
}
func (v *EcVolumeShard) Close() {
if v.ecdFile != nil {
_ = v.ecdFile.Close()
v.ecdFile = nil
}
if v.ecxFile != nil {
_ = v.ecxFile.Close()
v.ecxFile = nil
}
}

54
weed/storage/idx/walk.go Normal file
View file

@ -0,0 +1,54 @@
package idx
import (
"io"
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
// walks through the index file, calls fn function with each key, offset, size
// stops with the error returned by the fn function
func WalkIndexFile(r *os.File, fn func(key types.NeedleId, offset types.Offset, size uint32) error) error {
var readerOffset int64
bytes := make([]byte, types.NeedleMapEntrySize*RowsToRead)
count, e := r.ReadAt(bytes, readerOffset)
glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
readerOffset += int64(count)
var (
key types.NeedleId
offset types.Offset
size uint32
i int
)
for count > 0 && e == nil || e == io.EOF {
for i = 0; i+types.NeedleMapEntrySize <= count; i += types.NeedleMapEntrySize {
key, offset, size = IdxFileEntry(bytes[i : i+types.NeedleMapEntrySize])
if e = fn(key, offset, size); e != nil {
return e
}
}
if e == io.EOF {
return nil
}
count, e = r.ReadAt(bytes, readerOffset)
glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
readerOffset += int64(count)
}
return e
}
func IdxFileEntry(bytes []byte) (key types.NeedleId, offset types.Offset, size uint32) {
key = types.BytesToNeedleId(bytes[:types.NeedleIdSize])
offset = types.BytesToOffset(bytes[types.NeedleIdSize : types.NeedleIdSize+types.OffsetSize])
size = util.BytesToUint32(bytes[types.NeedleIdSize+types.OffsetSize : types.NeedleIdSize+types.OffsetSize+types.SizeSize])
return
}
const (
RowsToRead = 1024
)

View file

@ -8,7 +8,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle_map" "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
. "github.com/chrislusf/seaweedfs/weed/storage/types" . "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
) )
type NeedleMapType int type NeedleMapType int
@ -55,12 +54,6 @@ func (nm *baseNeedleMapper) IndexFileName() string {
return nm.indexFile.Name() return nm.indexFile.Name()
} }
func IdxFileEntry(bytes []byte) (key NeedleId, offset Offset, size uint32) {
key = BytesToNeedleId(bytes[:NeedleIdSize])
offset = BytesToOffset(bytes[NeedleIdSize : NeedleIdSize+OffsetSize])
size = util.BytesToUint32(bytes[NeedleIdSize+OffsetSize : NeedleIdSize+OffsetSize+SizeSize])
return
}
func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size uint32) error { func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size uint32) error {
bytes := needle_map.ToBytes(key, offset, size) bytes := needle_map.ToBytes(key, offset, size)

View file

@ -2,10 +2,12 @@ package storage
import ( import (
"fmt" "fmt"
"github.com/syndtr/goleveldb/leveldb/opt"
"os" "os"
"path/filepath" "path/filepath"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map" "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
. "github.com/chrislusf/seaweedfs/weed/storage/types" . "github.com/chrislusf/seaweedfs/weed/storage/types"
@ -64,7 +66,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
return err return err
} }
defer db.Close() defer db.Close()
return WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size uint32) error { return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size uint32) error {
if !offset.IsZero() && size != TombstoneFileSize { if !offset.IsZero() && size != TombstoneFileSize {
levelDbWrite(db, key, offset, size) levelDbWrite(db, key, offset, size)
} else { } else {

View file

@ -1,10 +1,10 @@
package storage package storage
import ( import (
"io"
"os" "os"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map" "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
. "github.com/chrislusf/seaweedfs/weed/storage/types" . "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
@ -30,10 +30,6 @@ func NewBtreeNeedleMap(file *os.File) *NeedleMap {
return nm return nm
} }
const (
RowsToRead = 1024
)
func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) { func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
nm := NewCompactNeedleMap(file) nm := NewCompactNeedleMap(file)
return doLoading(file, nm) return doLoading(file, nm)
@ -45,7 +41,7 @@ func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) {
} }
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
e := WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error { e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error {
nm.MaybeSetMaxFileKey(key) nm.MaybeSetMaxFileKey(key)
if !offset.IsZero() && size != TombstoneFileSize { if !offset.IsZero() && size != TombstoneFileSize {
nm.FileCounter++ nm.FileCounter++
@ -68,38 +64,6 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
return nm, e return nm, e
} }
// walks through the index file, calls fn function with each key, offset, size
// stops with the error returned by the fn function
func WalkIndexFile(r *os.File, fn func(key NeedleId, offset Offset, size uint32) error) error {
var readerOffset int64
bytes := make([]byte, NeedleMapEntrySize*RowsToRead)
count, e := r.ReadAt(bytes, readerOffset)
glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
readerOffset += int64(count)
var (
key NeedleId
offset Offset
size uint32
i int
)
for count > 0 && e == nil || e == io.EOF {
for i = 0; i+NeedleMapEntrySize <= count; i += NeedleMapEntrySize {
key, offset, size = IdxFileEntry(bytes[i : i+NeedleMapEntrySize])
if e = fn(key, offset, size); e != nil {
return e
}
}
if e == io.EOF {
return nil
}
count, e = r.ReadAt(bytes, readerOffset)
glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
readerOffset += int64(count)
}
return e
}
func (nm *NeedleMap) Put(key NeedleId, offset Offset, size uint32) error { func (nm *NeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
_, oldSize := nm.m.Set(NeedleId(key), offset, size) _, oldSize := nm.m.Set(NeedleId(key), offset, size)
nm.logPut(key, oldSize, size) nm.logPut(key, oldSize, size)

View file

@ -5,6 +5,7 @@ import (
"os" "os"
"sync/atomic" "sync/atomic"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
. "github.com/chrislusf/seaweedfs/weed/storage/types" . "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/willf/bloom" "github.com/willf/bloom"
) )
@ -119,7 +120,7 @@ func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key
return e return e
} }
for i := int(nextBatchSize) - 1; i >= 0; i-- { for i := int(nextBatchSize) - 1; i >= 0; i-- {
key, offset, size := IdxFileEntry(bytes[i*NeedleMapEntrySize : i*NeedleMapEntrySize+NeedleMapEntrySize]) key, offset, size := idx.IdxFileEntry(bytes[i*NeedleMapEntrySize : i*NeedleMapEntrySize+NeedleMapEntrySize])
if e = fn(key, offset, size); e != nil { if e = fn(key, offset, size); e != nil {
return e return e
} }

View file

@ -30,6 +30,8 @@ type Store struct {
NeedleMapType NeedleMapType NeedleMapType NeedleMapType
NewVolumesChan chan master_pb.VolumeShortInformationMessage NewVolumesChan chan master_pb.VolumeShortInformationMessage
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage
} }
func (s *Store) String() (str string) { func (s *Store) String() (str string) {
@ -47,6 +49,10 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
} }
s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3) s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3)
s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3) s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3)
s.NewEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3)
s.DeletedEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3)
return return
} }
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error { func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error {
@ -186,6 +192,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
} }
} }
func (s *Store) Close() { func (s *Store) Close() {
for _, location := range s.Locations { for _, location := range s.Locations {
location.Close() location.Close()

21
weed/storage/store_ec.go Normal file
View file

@ -0,0 +1,21 @@
package storage
import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)
func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
for _, location := range s.Locations {
location.ecShardsLock.RLock()
for _, ecShards := range location.ecShards {
ecShardMessages = append(ecShardMessages, ecShards.ToVolumeInformationMessage()...)
}
location.ecShardsLock.RUnlock()
}
return &master_pb.Heartbeat{
EcShards: ecShardMessages,
}
}

View file

@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types" . "github.com/chrislusf/seaweedfs/weed/storage/types"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -142,7 +143,7 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) {
if n != NeedleMapEntrySize { if n != NeedleMapEntrySize {
return Offset{}, fmt.Errorf("file %s read error: %v", indexFile.Name(), e) return Offset{}, fmt.Errorf("file %s read error: %v", indexFile.Name(), e)
} }
_, offset, _ := IdxFileEntry(bytes) _, offset, _ := idx.IdxFileEntry(bytes)
return offset, nil return offset, nil
} }
@ -230,7 +231,7 @@ func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m
if _, readErr := indexFile.ReadAt(bytes, m*NeedleMapEntrySize); readErr != nil && readErr != io.EOF { if _, readErr := indexFile.ReadAt(bytes, m*NeedleMapEntrySize); readErr != nil && readErr != io.EOF {
return Offset{}, readErr return Offset{}, readErr
} }
_, offset, _ := IdxFileEntry(bytes) _, offset, _ := idx.IdxFileEntry(bytes)
return offset, nil return offset, nil
} }

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types" . "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
@ -21,7 +22,7 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin
if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil { if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil {
return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
} }
key, offset, size := IdxFileEntry(lastIdxEntry) key, offset, size := idx.IdxFileEntry(lastIdxEntry)
if offset.IsZero() { if offset.IsZero() {
return 0, nil return 0, nil
} }

View file

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
idx2 "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types" . "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
@ -143,7 +144,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idxOffset); err != nil { if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idxOffset); err != nil {
return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idxOffset, err) return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idxOffset, err)
} }
key, offset, size := IdxFileEntry(IdxEntry) key, offset, size := idx2.IdxFileEntry(IdxEntry)
glog.V(4).Infof("key %d offset %d size %d", key, offset, size) glog.V(4).Infof("key %d offset %d size %d", key, offset, size)
if _, found := incrementedHasUpdatedIndexEntry[key]; !found { if _, found := incrementedHasUpdatedIndexEntry[key]; !found {
incrementedHasUpdatedIndexEntry[key] = keyField{ incrementedHasUpdatedIndexEntry[key] = keyField{
@ -329,7 +330,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
dst.Write(v.SuperBlock.Bytes()) dst.Write(v.SuperBlock.Bytes())
newOffset := int64(v.SuperBlock.BlockSize()) newOffset := int64(v.SuperBlock.BlockSize())
WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error { idx2.WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error {
if offset.IsZero() || size == TombstoneFileSize { if offset.IsZero() || size == TombstoneFileSize {
return nil return nil
} }