diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto index 538ddf9e1..b87654dd4 100644 --- a/weed/pb/volume_server.proto +++ b/weed/pb/volume_server.proto @@ -171,6 +171,7 @@ message ReadVolumeFileStatusResponse { uint64 idx_file_size = 3; uint64 dat_file_timestamp = 4; uint64 dat_file_size = 5; + uint64 file_count = 6; } message DiskStatus { diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index 829f141a0..10d3409b8 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -656,6 +656,7 @@ type ReadVolumeFileStatusResponse struct { IdxFileSize uint64 `protobuf:"varint,3,opt,name=idx_file_size,json=idxFileSize" json:"idx_file_size,omitempty"` DatFileTimestamp uint64 `protobuf:"varint,4,opt,name=dat_file_timestamp,json=datFileTimestamp" json:"dat_file_timestamp,omitempty"` DatFileSize uint64 `protobuf:"varint,5,opt,name=dat_file_size,json=datFileSize" json:"dat_file_size,omitempty"` + FileCount uint64 `protobuf:"varint,6,opt,name=file_count,json=fileCount" json:"file_count,omitempty"` } func (m *ReadVolumeFileStatusResponse) Reset() { *m = ReadVolumeFileStatusResponse{} } @@ -698,6 +699,13 @@ func (m *ReadVolumeFileStatusResponse) GetDatFileSize() uint64 { return 0 } +func (m *ReadVolumeFileStatusResponse) GetFileCount() uint64 { + if m != nil { + return m.FileCount + } + return 0 +} + type DiskStatus struct { Dir string `protobuf:"bytes,1,opt,name=dir" json:"dir,omitempty"` All uint64 `protobuf:"varint,2,opt,name=all" json:"all,omitempty"` diff --git a/weed/server/volume_grpc_replicate.go b/weed/server/volume_grpc_replicate.go index 1a31a37f3..c641755d0 100644 --- a/weed/server/volume_grpc_replicate.go +++ b/weed/server/volume_grpc_replicate.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" @@ -34,22 +35,28 @@ func (vs *VolumeServer) ReplicateVolume(ctx context.Context, req *volume_server_ // send .idx file // send .dat file // confirm size and timestamp - + var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse + datFileName := volumeFileName + ".dat" + idxFileName := volumeFileName + ".idx" err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - - // TODO read file sizes before copying - client.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{}) + var err error + volFileInfoResp, err = client.ReadVolumeFileStatus(ctx, + &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: req.VolumeId, + }) + if nil != err { + return fmt.Errorf("read volume file status failed, %v", err) + } copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ VolumeId: req.VolumeId, IsIdxFile: true, }) - if err != nil { return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err) } - err = writeToFile(copyFileClient, volumeFileName+".idx") + err = writeToFile(copyFileClient, idxFileName) if err != nil { return fmt.Errorf("failed to copy volume %d idx file: %v", req.VolumeId, err) } @@ -58,24 +65,26 @@ func (vs *VolumeServer) ReplicateVolume(ctx context.Context, req *volume_server_ VolumeId: req.VolumeId, IsDatFile: true, }) - if err != nil { return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err) } - err = writeToFile(copyFileClient, volumeFileName+".dat") + err = writeToFile(copyFileClient, datFileName) if err != nil { return fmt.Errorf("failed to copy volume %d dat file: %v", req.VolumeId, err) } return nil }) - if err != nil { + os.Remove(idxFileName) + os.Remove(datFileName) return nil, err } - // TODO: check the timestamp and size + if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16 + return nil, err + } // mount the volume err = vs.store.MountVolume(storage.VolumeId(req.VolumeId)) @@ -84,11 +93,35 @@ func (vs *VolumeServer) ReplicateVolume(ctx context.Context, req *volume_server_ } return &volume_server_pb.ReplicateVolumeResponse{}, err +} +/** + only check the the differ of the file size + todo: maybe should check the received count and deleted count of the volume + */ +func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error { + stat, err := os.Stat(idxFileName) + if err != nil { + return fmt.Errorf("get idx file info failed, %v", err) + } + if originFileInf.IdxFileSize != uint64(stat.Size()) { + return fmt.Errorf("the idx file size [%v] is not same as origin file size [%v]", + stat.Size(), originFileInf.IdxFileSize) + } + + stat, err = os.Stat(datFileName) + if err != nil { + return fmt.Errorf("get dat file info failed, %v", err) + } + if originFileInf.DatFileSize != uint64(stat.Size()) { + return fmt.Errorf("the dat file size [%v] is not same as origin file size [%v]", + stat.Size(), originFileInf.DatFileSize) + } + return nil } func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error { - println("writing to ", fileName) + glog.V(4).Infof("writing to %s", fileName) dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { return nil @@ -110,6 +143,17 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) { resp := &volume_server_pb.ReadVolumeFileStatusResponse{} + v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + if v == nil { + return nil, fmt.Errorf("not found volume id %d", req.VolumeId) + } + + resp.VolumeId = req.VolumeId + resp.DatFileSize = v.DataFileSize() + resp.IdxFileSize = v.IndexFileSize() + resp.DatFileTimestamp = v.LastModifiedTime() + resp.IdxFileTimestamp = v.LastModifiedTime() + resp.FileCount = uint64(v.FileCount()) return resp, nil } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 807fefa38..280963c2c 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -79,6 +79,25 @@ func (v *Volume) Size() int64 { return 0 // -1 causes integer overflow and the volume to become unwritable. } +func (v *Volume)IndexFileSize() uint64 { + return v.nm.IndexFileSize() +} + +func (v *Volume)DataFileSize() uint64 { + return uint64(v.Size()) +} + +/** +unix time in seconds + */ +func (v *Volume)LastModifiedTime() uint64 { + return v.lastModifiedTime +} + +func (v *Volume)FileCount() uint { + return uint(v.nm.FileCount()) +} + // Close cleanly shuts down this volume func (v *Volume) Close() { v.dataFileAccessLock.Lock()