diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go index 9eb64b3b4..856dbc877 100644 --- a/unmaintained/fix_dat/fix_dat.go +++ b/unmaintained/fix_dat/fix_dat.go @@ -84,7 +84,7 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl } offset := int64(superBlock.BlockSize()) version := superBlock.Version() - n, rest, err := storage.ReadNeedleHeader(datFile, version, offset) + n, _, rest, err := storage.ReadNeedleHeader(datFile, version, offset) if err != nil { fmt.Printf("cannot read needle header: %v", err) return @@ -114,7 +114,7 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl fmt.Println("Recovered in f", r) } }() - if err = n.ReadNeedleBody(datFile, version, offset+int64(types.NeedleEntrySize), rest); err != nil { + if _, err = n.ReadNeedleBody(datFile, version, offset+int64(types.NeedleEntrySize), rest); err != nil { fmt.Printf("cannot read needle body: offset %d body %d %v\n", offset, rest, err) } }() @@ -126,7 +126,7 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl offset += types.NeedleEntrySize + rest //fmt.Printf("==> new entry offset %d\n", offset) - if n, rest, err = storage.ReadNeedleHeader(datFile, version, offset); err != nil { + if n, _, rest, err = storage.ReadNeedleHeader(datFile, version, offset); err != nil { if err == io.EOF { return } diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto index 7ff1ce25b..4a41c7104 100644 --- a/weed/pb/volume_server.proto +++ b/weed/pb/volume_server.proto @@ -42,6 +42,9 @@ service VolumeServer { rpc CopyFile (CopyFileRequest) returns (stream CopyFileResponse) { } + rpc VolumeStreamFollow (VolumeStreamFollowRequest) returns (stream VolumeStreamFollowResponse) { + } + } ////////////////////////////////////////////////// @@ -120,7 +123,7 @@ message VolumeSyncStatusResponse { message VolumeIncrementalCopyRequest { uint32 volume_id = 1; - uint64 since = 2; + uint64 since_ns = 2; } message VolumeIncrementalCopyResponse { bytes file_content = 1; @@ -163,6 +166,16 @@ message CopyFileResponse { bytes file_content = 1; } +message VolumeStreamFollowRequest { + uint32 volume_id = 1; + uint64 since_ns = 2; + uint32 drainingSeconds = 3; +} +message VolumeStreamFollowResponse { + bytes needle_header = 1; + bytes needle_body = 2; +} + message ReadVolumeFileStatusRequest { uint32 volume_id = 1; } diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index cb6ca7c82..269774aa4 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -39,6 +39,8 @@ It has these top-level messages: VolumeCopyResponse CopyFileRequest CopyFileResponse + VolumeStreamFollowRequest + VolumeStreamFollowResponse ReadVolumeFileStatusRequest ReadVolumeFileStatusResponse DiskStatus @@ -420,7 +422,7 @@ func (m *VolumeSyncStatusResponse) GetIdxFileSize() uint64 { type VolumeIncrementalCopyRequest struct { VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` - Since uint64 `protobuf:"varint,2,opt,name=since" json:"since,omitempty"` + SinceNs uint64 `protobuf:"varint,2,opt,name=since_ns,json=sinceNs" json:"since_ns,omitempty"` } func (m *VolumeIncrementalCopyRequest) Reset() { *m = VolumeIncrementalCopyRequest{} } @@ -435,9 +437,9 @@ func (m *VolumeIncrementalCopyRequest) GetVolumeId() uint32 { return 0 } -func (m *VolumeIncrementalCopyRequest) GetSince() uint64 { +func (m *VolumeIncrementalCopyRequest) GetSinceNs() uint64 { if m != nil { - return m.Since + return m.SinceNs } return 0 } @@ -634,6 +636,62 @@ func (m *CopyFileResponse) GetFileContent() []byte { return nil } +type VolumeStreamFollowRequest struct { + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` + SinceNs uint64 `protobuf:"varint,2,opt,name=since_ns,json=sinceNs" json:"since_ns,omitempty"` + DrainingSeconds uint32 `protobuf:"varint,3,opt,name=drainingSeconds" json:"drainingSeconds,omitempty"` +} + +func (m *VolumeStreamFollowRequest) Reset() { *m = VolumeStreamFollowRequest{} } +func (m *VolumeStreamFollowRequest) String() string { return proto.CompactTextString(m) } +func (*VolumeStreamFollowRequest) ProtoMessage() {} +func (*VolumeStreamFollowRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{30} } + +func (m *VolumeStreamFollowRequest) GetVolumeId() uint32 { + if m != nil { + return m.VolumeId + } + return 0 +} + +func (m *VolumeStreamFollowRequest) GetSinceNs() uint64 { + if m != nil { + return m.SinceNs + } + return 0 +} + +func (m *VolumeStreamFollowRequest) GetDrainingSeconds() uint32 { + if m != nil { + return m.DrainingSeconds + } + return 0 +} + +type VolumeStreamFollowResponse struct { + NeedleHeader []byte `protobuf:"bytes,1,opt,name=needle_header,json=needleHeader,proto3" json:"needle_header,omitempty"` + NeedleBody []byte `protobuf:"bytes,2,opt,name=needle_body,json=needleBody,proto3" json:"needle_body,omitempty"` +} + +func (m *VolumeStreamFollowResponse) Reset() { *m = VolumeStreamFollowResponse{} } +func (m *VolumeStreamFollowResponse) String() string { return proto.CompactTextString(m) } +func (*VolumeStreamFollowResponse) ProtoMessage() {} +func (*VolumeStreamFollowResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{31} } + +func (m *VolumeStreamFollowResponse) GetNeedleHeader() []byte { + if m != nil { + return m.NeedleHeader + } + return nil +} + +func (m *VolumeStreamFollowResponse) GetNeedleBody() []byte { + if m != nil { + return m.NeedleBody + } + return nil +} + type ReadVolumeFileStatusRequest struct { VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` } @@ -641,7 +699,7 @@ type ReadVolumeFileStatusRequest struct { func (m *ReadVolumeFileStatusRequest) Reset() { *m = ReadVolumeFileStatusRequest{} } func (m *ReadVolumeFileStatusRequest) String() string { return proto.CompactTextString(m) } func (*ReadVolumeFileStatusRequest) ProtoMessage() {} -func (*ReadVolumeFileStatusRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{30} } +func (*ReadVolumeFileStatusRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{32} } func (m *ReadVolumeFileStatusRequest) GetVolumeId() uint32 { if m != nil { @@ -662,7 +720,7 @@ type ReadVolumeFileStatusResponse struct { func (m *ReadVolumeFileStatusResponse) Reset() { *m = ReadVolumeFileStatusResponse{} } func (m *ReadVolumeFileStatusResponse) String() string { return proto.CompactTextString(m) } func (*ReadVolumeFileStatusResponse) ProtoMessage() {} -func (*ReadVolumeFileStatusResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{31} } +func (*ReadVolumeFileStatusResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{33} } func (m *ReadVolumeFileStatusResponse) GetVolumeId() uint32 { if m != nil { @@ -716,7 +774,7 @@ type DiskStatus struct { func (m *DiskStatus) Reset() { *m = DiskStatus{} } func (m *DiskStatus) String() string { return proto.CompactTextString(m) } func (*DiskStatus) ProtoMessage() {} -func (*DiskStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{32} } +func (*DiskStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{34} } func (m *DiskStatus) GetDir() string { if m != nil { @@ -759,7 +817,7 @@ type MemStatus struct { func (m *MemStatus) Reset() { *m = MemStatus{} } func (m *MemStatus) String() string { return proto.CompactTextString(m) } func (*MemStatus) ProtoMessage() {} -func (*MemStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{33} } +func (*MemStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{35} } func (m *MemStatus) GetGoroutines() int32 { if m != nil { @@ -841,6 +899,8 @@ func init() { proto.RegisterType((*VolumeCopyResponse)(nil), "volume_server_pb.VolumeCopyResponse") proto.RegisterType((*CopyFileRequest)(nil), "volume_server_pb.CopyFileRequest") proto.RegisterType((*CopyFileResponse)(nil), "volume_server_pb.CopyFileResponse") + proto.RegisterType((*VolumeStreamFollowRequest)(nil), "volume_server_pb.VolumeStreamFollowRequest") + proto.RegisterType((*VolumeStreamFollowResponse)(nil), "volume_server_pb.VolumeStreamFollowResponse") proto.RegisterType((*ReadVolumeFileStatusRequest)(nil), "volume_server_pb.ReadVolumeFileStatusRequest") proto.RegisterType((*ReadVolumeFileStatusResponse)(nil), "volume_server_pb.ReadVolumeFileStatusResponse") proto.RegisterType((*DiskStatus)(nil), "volume_server_pb.DiskStatus") @@ -875,6 +935,7 @@ type VolumeServerClient interface { VolumeCopy(ctx context.Context, in *VolumeCopyRequest, opts ...grpc.CallOption) (*VolumeCopyResponse, error) ReadVolumeFileStatus(ctx context.Context, in *ReadVolumeFileStatusRequest, opts ...grpc.CallOption) (*ReadVolumeFileStatusResponse, error) CopyFile(ctx context.Context, in *CopyFileRequest, opts ...grpc.CallOption) (VolumeServer_CopyFileClient, error) + VolumeStreamFollow(ctx context.Context, in *VolumeStreamFollowRequest, opts ...grpc.CallOption) (VolumeServer_VolumeStreamFollowClient, error) } type volumeServerClient struct { @@ -1066,6 +1127,38 @@ func (x *volumeServerCopyFileClient) Recv() (*CopyFileResponse, error) { return m, nil } +func (c *volumeServerClient) VolumeStreamFollow(ctx context.Context, in *VolumeStreamFollowRequest, opts ...grpc.CallOption) (VolumeServer_VolumeStreamFollowClient, error) { + stream, err := grpc.NewClientStream(ctx, &_VolumeServer_serviceDesc.Streams[2], c.cc, "/volume_server_pb.VolumeServer/VolumeStreamFollow", opts...) + if err != nil { + return nil, err + } + x := &volumeServerVolumeStreamFollowClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type VolumeServer_VolumeStreamFollowClient interface { + Recv() (*VolumeStreamFollowResponse, error) + grpc.ClientStream +} + +type volumeServerVolumeStreamFollowClient struct { + grpc.ClientStream +} + +func (x *volumeServerVolumeStreamFollowClient) Recv() (*VolumeStreamFollowResponse, error) { + m := new(VolumeStreamFollowResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // Server API for VolumeServer service type VolumeServerServer interface { @@ -1086,6 +1179,7 @@ type VolumeServerServer interface { VolumeCopy(context.Context, *VolumeCopyRequest) (*VolumeCopyResponse, error) ReadVolumeFileStatus(context.Context, *ReadVolumeFileStatusRequest) (*ReadVolumeFileStatusResponse, error) CopyFile(*CopyFileRequest, VolumeServer_CopyFileServer) error + VolumeStreamFollow(*VolumeStreamFollowRequest, VolumeServer_VolumeStreamFollowServer) error } func RegisterVolumeServerServer(s *grpc.Server, srv VolumeServerServer) { @@ -1368,6 +1462,27 @@ func (x *volumeServerCopyFileServer) Send(m *CopyFileResponse) error { return x.ServerStream.SendMsg(m) } +func _VolumeServer_VolumeStreamFollow_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(VolumeStreamFollowRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(VolumeServerServer).VolumeStreamFollow(m, &volumeServerVolumeStreamFollowServer{stream}) +} + +type VolumeServer_VolumeStreamFollowServer interface { + Send(*VolumeStreamFollowResponse) error + grpc.ServerStream +} + +type volumeServerVolumeStreamFollowServer struct { + grpc.ServerStream +} + +func (x *volumeServerVolumeStreamFollowServer) Send(m *VolumeStreamFollowResponse) error { + return x.ServerStream.SendMsg(m) +} + var _VolumeServer_serviceDesc = grpc.ServiceDesc{ ServiceName: "volume_server_pb.VolumeServer", HandlerType: (*VolumeServerServer)(nil), @@ -1436,6 +1551,11 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{ Handler: _VolumeServer_CopyFile_Handler, ServerStreams: true, }, + { + StreamName: "VolumeStreamFollow", + Handler: _VolumeServer_VolumeStreamFollow_Handler, + ServerStreams: true, + }, }, Metadata: "volume_server.proto", } @@ -1443,80 +1563,86 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("volume_server.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 1190 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x58, 0xdb, 0x72, 0xdc, 0x44, - 0x13, 0xb6, 0xb2, 0xbb, 0xb6, 0xb7, 0x77, 0x9d, 0x6c, 0xc6, 0xa7, 0x8d, 0x7c, 0xf8, 0x37, 0x93, - 0xfc, 0xc9, 0x26, 0x31, 0x0e, 0x38, 0x05, 0x04, 0xae, 0x20, 0x36, 0x54, 0xf9, 0x22, 0xa4, 0x90, - 0x49, 0x0a, 0x0a, 0xaa, 0x54, 0x63, 0x69, 0x6c, 0xab, 0xac, 0x53, 0xa4, 0x91, 0x89, 0x29, 0xde, - 0x86, 0x3b, 0x2e, 0x78, 0x0e, 0xde, 0x89, 0x2a, 0x8a, 0x9a, 0x83, 0x64, 0x1d, 0xbd, 0x0a, 0xdc, - 0x8d, 0x7a, 0xba, 0xbf, 0xee, 0x9e, 0xe9, 0xee, 0xf9, 0x76, 0x61, 0xf9, 0x22, 0x70, 0x13, 0x8f, - 0x9a, 0x31, 0x8d, 0x2e, 0x68, 0xb4, 0x1b, 0x46, 0x01, 0x0b, 0xd0, 0xa8, 0x20, 0x34, 0xc3, 0x63, - 0xfc, 0x14, 0xd0, 0x0b, 0xc2, 0xac, 0xb3, 0x03, 0xea, 0x52, 0x46, 0x0d, 0xfa, 0x36, 0xa1, 0x31, - 0x43, 0x77, 0x60, 0xf1, 0xc4, 0x71, 0xa9, 0xe9, 0xd8, 0xf1, 0x58, 0x9b, 0x74, 0xa6, 0x7d, 0x63, - 0x81, 0x7f, 0x1f, 0xda, 0x31, 0x7e, 0x05, 0xcb, 0x05, 0x83, 0x38, 0x0c, 0xfc, 0x98, 0xa2, 0xe7, - 0xb0, 0x10, 0xd1, 0x38, 0x71, 0x99, 0x34, 0x18, 0xec, 0x6d, 0xef, 0x96, 0x7d, 0xed, 0x66, 0x26, - 0x89, 0xcb, 0x8c, 0x54, 0x1d, 0x3b, 0x30, 0xcc, 0x6f, 0xa0, 0x75, 0x58, 0x50, 0xbe, 0xc7, 0xda, - 0x44, 0x9b, 0xf6, 0x8d, 0x79, 0xe9, 0x1a, 0xad, 0xc1, 0x7c, 0xcc, 0x08, 0x4b, 0xe2, 0xf1, 0x8d, - 0x89, 0x36, 0xed, 0x19, 0xea, 0x0b, 0xad, 0x40, 0x8f, 0x46, 0x51, 0x10, 0x8d, 0x3b, 0x42, 0x5d, - 0x7e, 0x20, 0x04, 0xdd, 0xd8, 0xf9, 0x85, 0x8e, 0xbb, 0x13, 0x6d, 0xba, 0x64, 0x88, 0x35, 0x5e, - 0x80, 0xde, 0x57, 0x5e, 0xc8, 0x2e, 0xf1, 0xa7, 0x30, 0x7e, 0x43, 0xac, 0x24, 0xf1, 0xde, 0x88, - 0x18, 0xf7, 0xcf, 0xa8, 0x75, 0x9e, 0xe6, 0xbe, 0x01, 0x7d, 0x15, 0xb9, 0x8a, 0x60, 0xc9, 0x58, - 0x94, 0x82, 0x43, 0x1b, 0x7f, 0x01, 0x77, 0x6a, 0x0c, 0xd5, 0x19, 0xdc, 0x83, 0xa5, 0x53, 0x12, - 0x1d, 0x93, 0x53, 0x6a, 0x46, 0x84, 0x39, 0x81, 0xb0, 0xd6, 0x8c, 0xa1, 0x12, 0x1a, 0x5c, 0x86, - 0x7f, 0x04, 0xbd, 0x80, 0x10, 0x78, 0x21, 0xb1, 0x58, 0x1b, 0xe7, 0x68, 0x02, 0x83, 0x30, 0xa2, - 0xc4, 0x75, 0x03, 0x8b, 0x30, 0x2a, 0x4e, 0xa1, 0x63, 0xe4, 0x45, 0x78, 0x0b, 0x36, 0x6a, 0xc1, - 0x65, 0x80, 0xf8, 0x79, 0x29, 0xfa, 0xc0, 0xf3, 0x9c, 0x56, 0xae, 0xf1, 0x66, 0x25, 0x6a, 0x61, - 0xa9, 0x70, 0x3f, 0x2b, 0xed, 0xba, 0x94, 0xf8, 0x49, 0xd8, 0x0a, 0xb8, 0x1c, 0x71, 0x6a, 0x9a, - 0x21, 0xaf, 0xcb, 0xe2, 0xd8, 0x0f, 0x5c, 0x97, 0x5a, 0xcc, 0x09, 0xfc, 0x14, 0x76, 0x1b, 0xc0, - 0xca, 0x84, 0xaa, 0x54, 0x72, 0x12, 0xac, 0xc3, 0xb8, 0x6a, 0xaa, 0x60, 0x7f, 0xd7, 0x60, 0xf5, - 0x4b, 0x75, 0x68, 0xd2, 0x71, 0xab, 0x0b, 0x28, 0xba, 0xbc, 0x51, 0x76, 0x59, 0xbe, 0xa0, 0x4e, - 0xe5, 0x82, 0xb8, 0x46, 0x44, 0x43, 0xd7, 0xb1, 0x88, 0x80, 0xe8, 0x0a, 0x88, 0xbc, 0x08, 0x8d, - 0xa0, 0xc3, 0x98, 0x3b, 0xee, 0x89, 0x1d, 0xbe, 0xc4, 0x63, 0x58, 0x2b, 0xc7, 0xaa, 0xd2, 0xf8, - 0x04, 0xd6, 0xa5, 0xe4, 0xe8, 0xd2, 0xb7, 0x8e, 0x44, 0x37, 0xb4, 0x3a, 0xf4, 0xbf, 0x34, 0x18, - 0x57, 0x0d, 0x55, 0x15, 0xff, 0xd7, 0x13, 0x78, 0xdf, 0xfc, 0xd0, 0xff, 0x60, 0xc0, 0x88, 0xe3, - 0x9a, 0xc1, 0xc9, 0x49, 0x4c, 0xd9, 0x78, 0x7e, 0xa2, 0x4d, 0xbb, 0x06, 0x70, 0xd1, 0x2b, 0x21, - 0x41, 0x8f, 0x60, 0x64, 0xc9, 0x4a, 0x36, 0x23, 0x7a, 0xe1, 0xc4, 0x1c, 0x79, 0x41, 0x04, 0x76, - 0xcb, 0x4a, 0x2b, 0x5c, 0x8a, 0x11, 0x86, 0x25, 0xc7, 0x7e, 0x67, 0x8a, 0x01, 0x22, 0xda, 0x7f, - 0x51, 0xa0, 0x0d, 0x1c, 0xfb, 0xdd, 0xd7, 0x8e, 0x4b, 0x8f, 0xf8, 0x14, 0xf8, 0x16, 0x36, 0x65, - 0xf2, 0x87, 0xbe, 0x15, 0x51, 0x8f, 0xfa, 0x8c, 0xb8, 0xfb, 0x41, 0x78, 0xd9, 0xaa, 0x04, 0x56, - 0xa0, 0x17, 0x3b, 0xbe, 0x25, 0xbb, 0xaf, 0x6b, 0xc8, 0x0f, 0xfc, 0x02, 0xb6, 0x1a, 0x20, 0xd5, - 0xa1, 0xde, 0x85, 0xa1, 0x88, 0xc9, 0x0a, 0x7c, 0x46, 0x7d, 0x26, 0x60, 0x87, 0xc6, 0x80, 0xcb, - 0xf6, 0xa5, 0x08, 0x7f, 0x04, 0x48, 0x62, 0xbc, 0x0c, 0x12, 0xbf, 0x5d, 0x57, 0xae, 0xc2, 0x72, - 0xc1, 0x44, 0x95, 0xc5, 0x33, 0x58, 0x91, 0xe2, 0xd7, 0xbe, 0xd7, 0x1a, 0x6b, 0x1d, 0x56, 0x4b, - 0x46, 0x0a, 0x6d, 0x2f, 0x75, 0x52, 0x7c, 0x22, 0xae, 0x05, 0x5b, 0x4b, 0x23, 0x28, 0xbe, 0x12, - 0xf8, 0x0f, 0x0d, 0x6e, 0xa7, 0x13, 0xa4, 0xe5, 0x81, 0xbf, 0x67, 0xc5, 0x75, 0x1a, 0x2b, 0xae, - 0x7b, 0x55, 0x71, 0x53, 0x18, 0xc5, 0x41, 0x12, 0x59, 0xd4, 0xb4, 0x09, 0x23, 0xa6, 0x1f, 0xd8, - 0x54, 0x15, 0xe4, 0x4d, 0x29, 0x3f, 0x20, 0x8c, 0x7c, 0x13, 0xd8, 0x14, 0xaf, 0xa4, 0x97, 0x92, - 0xbf, 0x4d, 0xec, 0xc3, 0x2d, 0xfe, 0xcd, 0x2b, 0xaa, 0x65, 0x0e, 0x03, 0x27, 0x36, 0xd3, 0xc2, - 0x14, 0x49, 0x2c, 0x1a, 0x7d, 0x27, 0x3e, 0x94, 0x55, 0xa9, 0xf6, 0x6d, 0xc2, 0xe4, 0x7e, 0x27, - 0xdd, 0x3f, 0x20, 0x8c, 0xef, 0xe3, 0x8f, 0x61, 0x74, 0xe5, 0xaf, 0x7d, 0x45, 0x7d, 0x0e, 0x1b, - 0x06, 0x25, 0xb6, 0x4c, 0x40, 0x94, 0x7f, 0xfb, 0x11, 0xf1, 0xb7, 0x06, 0x9b, 0xf5, 0xc6, 0x6d, - 0xc6, 0xc4, 0x0e, 0xa0, 0xac, 0x0d, 0x99, 0xe3, 0xd1, 0x98, 0x11, 0x2f, 0x54, 0x2d, 0x33, 0x52, - 0xbd, 0xf8, 0x5d, 0x2a, 0xaf, 0x36, 0x6d, 0xa7, 0xd2, 0xb4, 0x1c, 0x31, 0x3d, 0x9f, 0x1c, 0x62, - 0x57, 0x22, 0xda, 0xf2, 0x9c, 0x0a, 0x88, 0x99, 0xb6, 0x40, 0xec, 0x49, 0x44, 0xa5, 0x28, 0x10, - 0xb7, 0x00, 0xd4, 0x01, 0x26, 0x7e, 0x3a, 0x75, 0xfa, 0xf2, 0xf8, 0x12, 0x9f, 0xe1, 0xef, 0x01, - 0x0e, 0x9c, 0xf8, 0x5c, 0x66, 0xcd, 0x6b, 0xc8, 0x76, 0x22, 0xf5, 0xca, 0xf0, 0x25, 0x97, 0x10, - 0xd7, 0x55, 0x39, 0xf1, 0x25, 0x67, 0x1c, 0x49, 0x4c, 0x6d, 0x15, 0xbd, 0x58, 0x73, 0xd9, 0x49, - 0x44, 0xa9, 0x0a, 0x54, 0xac, 0xf1, 0x6f, 0x1a, 0xf4, 0x5f, 0x52, 0x4f, 0x21, 0x6f, 0x03, 0x9c, - 0x06, 0x51, 0x90, 0x30, 0xc7, 0xa7, 0xb1, 0x70, 0xd0, 0x33, 0x72, 0x92, 0x7f, 0xef, 0x47, 0x30, - 0x20, 0xea, 0x9e, 0xa8, 0xdc, 0xc5, 0x9a, 0xcb, 0xce, 0x28, 0x09, 0x55, 0xba, 0x62, 0x2d, 0x46, - 0x1a, 0x23, 0xd6, 0xb9, 0x98, 0xa9, 0x7c, 0xa4, 0xf1, 0x8f, 0xbd, 0x3f, 0x87, 0x30, 0x54, 0x6f, - 0x84, 0x20, 0x70, 0xe8, 0x27, 0x18, 0xe4, 0x88, 0x1f, 0xba, 0x5f, 0xe5, 0x77, 0x55, 0x22, 0xa9, - 0xff, 0x7f, 0x86, 0x96, 0x6a, 0xa8, 0x39, 0xe4, 0xc3, 0xed, 0x0a, 0xb1, 0x42, 0x8f, 0xab, 0xd6, - 0x4d, 0xb4, 0x4d, 0x7f, 0xd2, 0x4a, 0x37, 0xf3, 0xc7, 0x60, 0xb9, 0x86, 0x29, 0xa1, 0x9d, 0x19, - 0x28, 0x05, 0xb6, 0xa6, 0x7f, 0xd0, 0x52, 0x3b, 0xf3, 0xfa, 0x16, 0x50, 0x95, 0x46, 0xa1, 0x27, - 0x33, 0x61, 0xae, 0x68, 0x9a, 0xbe, 0xd3, 0x4e, 0xb9, 0x31, 0x51, 0x49, 0xb0, 0x66, 0x26, 0x5a, - 0xa0, 0x70, 0x33, 0x13, 0x2d, 0xb1, 0xb6, 0x39, 0x74, 0x0e, 0xa3, 0x32, 0xf9, 0x42, 0x8f, 0x9a, - 0x7e, 0x11, 0x54, 0xb8, 0x9d, 0xfe, 0xb8, 0x8d, 0x6a, 0xe6, 0x8c, 0xc2, 0xcd, 0x22, 0x41, 0x42, - 0x0f, 0xab, 0xf6, 0xb5, 0x74, 0x4f, 0x9f, 0xce, 0x56, 0xcc, 0xe7, 0x54, 0x26, 0x4d, 0x75, 0x39, - 0x35, 0x30, 0xb2, 0xba, 0x9c, 0x9a, 0x38, 0x18, 0x9e, 0x43, 0xbf, 0xa6, 0xcf, 0x71, 0x89, 0x51, - 0xa0, 0xdd, 0x26, 0x98, 0x7a, 0x36, 0xa3, 0x3f, 0x6d, 0xad, 0x9f, 0xfa, 0xfe, 0x50, 0xe3, 0xbd, - 0x9e, 0x23, 0x16, 0x75, 0xbd, 0x5e, 0xa5, 0x2a, 0x75, 0xbd, 0x5e, 0xc7, 0x4e, 0xe6, 0xd0, 0x31, - 0x2c, 0x15, 0xa8, 0x06, 0x7a, 0xd0, 0x64, 0x59, 0x24, 0x30, 0xfa, 0xc3, 0x99, 0x7a, 0x99, 0x0f, - 0x33, 0x9d, 0x5e, 0x6a, 0x5c, 0x35, 0x06, 0x57, 0x9c, 0x57, 0x0f, 0x66, 0xa9, 0x65, 0x0e, 0x7e, - 0x00, 0xb8, 0x62, 0x06, 0xe8, 0x5e, 0x93, 0x5d, 0xfe, 0x2a, 0xee, 0x5f, 0xaf, 0x94, 0x41, 0xff, - 0x0c, 0x2b, 0x75, 0x4f, 0x2f, 0xaa, 0xe9, 0xc2, 0x6b, 0xde, 0x77, 0x7d, 0xb7, 0xad, 0x7a, 0xe6, - 0xf8, 0x35, 0x2c, 0xa6, 0x3c, 0x03, 0xdd, 0xad, 0x5a, 0x97, 0x38, 0x8f, 0x8e, 0xaf, 0x53, 0xb9, - 0xaa, 0xa6, 0xe3, 0x79, 0xf1, 0xe7, 0xc3, 0xb3, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x47, 0x54, - 0xf2, 0x7e, 0x93, 0x10, 0x00, 0x00, + // 1291 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x58, 0x5b, 0x6f, 0xdc, 0x44, + 0x14, 0x8e, 0xbb, 0x9b, 0xdb, 0xd9, 0xdd, 0x76, 0x3b, 0x49, 0x9b, 0x8d, 0x7b, 0x61, 0x3b, 0x2d, + 0xed, 0xf6, 0x42, 0x0a, 0xad, 0x80, 0xc2, 0x13, 0x34, 0xa1, 0x22, 0x0f, 0x6d, 0x25, 0x87, 0x56, + 0x20, 0x90, 0xac, 0x59, 0x7b, 0x92, 0x58, 0xb1, 0x3d, 0x5b, 0xcf, 0x38, 0x6d, 0x10, 0xfc, 0x1a, + 0xde, 0x78, 0xe0, 0x1f, 0xf0, 0xcb, 0x90, 0x10, 0xf2, 0xcc, 0xd8, 0xf1, 0x35, 0xeb, 0xd2, 0xb7, + 0xf1, 0xe7, 0x73, 0xbe, 0x73, 0xce, 0xf8, 0xcc, 0x99, 0x6f, 0x17, 0xd6, 0x8e, 0x99, 0x1f, 0x07, + 0xd4, 0xe6, 0x34, 0x3a, 0xa6, 0xd1, 0xd6, 0x2c, 0x62, 0x82, 0xa1, 0x61, 0x01, 0xb4, 0x67, 0x53, + 0xfc, 0x10, 0xd0, 0x53, 0x22, 0x9c, 0xc3, 0x1d, 0xea, 0x53, 0x41, 0x2d, 0xfa, 0x26, 0xa6, 0x5c, + 0xa0, 0x4d, 0x58, 0xd9, 0xf7, 0x7c, 0x6a, 0x7b, 0x2e, 0x1f, 0x19, 0xe3, 0xce, 0x64, 0xd5, 0x5a, + 0x4e, 0x9e, 0x77, 0x5d, 0x8e, 0x5f, 0xc2, 0x5a, 0xc1, 0x81, 0xcf, 0x58, 0xc8, 0x29, 0x7a, 0x02, + 0xcb, 0x11, 0xe5, 0xb1, 0x2f, 0x94, 0x43, 0xef, 0xd1, 0xf5, 0xad, 0x72, 0xac, 0xad, 0xcc, 0x25, + 0xf6, 0x85, 0x95, 0x9a, 0x63, 0x0f, 0xfa, 0xf9, 0x17, 0x68, 0x03, 0x96, 0x75, 0xec, 0x91, 0x31, + 0x36, 0x26, 0xab, 0xd6, 0x92, 0x0a, 0x8d, 0x2e, 0xc3, 0x12, 0x17, 0x44, 0xc4, 0x7c, 0x74, 0x6e, + 0x6c, 0x4c, 0x16, 0x2d, 0xfd, 0x84, 0xd6, 0x61, 0x91, 0x46, 0x11, 0x8b, 0x46, 0x1d, 0x69, 0xae, + 0x1e, 0x10, 0x82, 0x2e, 0xf7, 0x7e, 0xa5, 0xa3, 0xee, 0xd8, 0x98, 0x0c, 0x2c, 0xb9, 0xc6, 0xcb, + 0xb0, 0xf8, 0x5d, 0x30, 0x13, 0x27, 0xf8, 0x4b, 0x18, 0xbd, 0x26, 0x4e, 0x1c, 0x07, 0xaf, 0x65, + 0x8e, 0xdb, 0x87, 0xd4, 0x39, 0x4a, 0x6b, 0xbf, 0x02, 0xab, 0x3a, 0x73, 0x9d, 0xc1, 0xc0, 0x5a, + 0x51, 0xc0, 0xae, 0x8b, 0xbf, 0x81, 0xcd, 0x1a, 0x47, 0xbd, 0x07, 0x37, 0x61, 0x70, 0x40, 0xa2, + 0x29, 0x39, 0xa0, 0x76, 0x44, 0x84, 0xc7, 0xa4, 0xb7, 0x61, 0xf5, 0x35, 0x68, 0x25, 0x18, 0xfe, + 0x19, 0xcc, 0x02, 0x03, 0x0b, 0x66, 0xc4, 0x11, 0x6d, 0x82, 0xa3, 0x31, 0xf4, 0x66, 0x11, 0x25, + 0xbe, 0xcf, 0x1c, 0x22, 0xa8, 0xdc, 0x85, 0x8e, 0x95, 0x87, 0xf0, 0x35, 0xb8, 0x52, 0x4b, 0xae, + 0x12, 0xc4, 0x4f, 0x4a, 0xd9, 0xb3, 0x20, 0xf0, 0x5a, 0x85, 0xc6, 0x57, 0x2b, 0x59, 0x4b, 0x4f, + 0xcd, 0xfb, 0x55, 0xe9, 0xad, 0x4f, 0x49, 0x18, 0xcf, 0x5a, 0x11, 0x97, 0x33, 0x4e, 0x5d, 0x33, + 0xe6, 0x0d, 0xd5, 0x1c, 0xdb, 0xcc, 0xf7, 0xa9, 0x23, 0x3c, 0x16, 0xa6, 0xb4, 0xd7, 0x01, 0x9c, + 0x0c, 0xd4, 0xad, 0x92, 0x43, 0xb0, 0x09, 0xa3, 0xaa, 0xab, 0xa6, 0xfd, 0xd3, 0x80, 0x4b, 0xdf, + 0xea, 0x4d, 0x53, 0x81, 0x5b, 0x7d, 0x80, 0x62, 0xc8, 0x73, 0xe5, 0x90, 0xe5, 0x0f, 0xd4, 0xa9, + 0x7c, 0xa0, 0xc4, 0x22, 0xa2, 0x33, 0xdf, 0x73, 0x88, 0xa4, 0xe8, 0x4a, 0x8a, 0x3c, 0x84, 0x86, + 0xd0, 0x11, 0xc2, 0x1f, 0x2d, 0xca, 0x37, 0xc9, 0x12, 0x8f, 0xe0, 0x72, 0x39, 0x57, 0x5d, 0xc6, + 0x17, 0xb0, 0xa1, 0x90, 0xbd, 0x93, 0xd0, 0xd9, 0x93, 0xa7, 0xa1, 0xd5, 0xa6, 0xff, 0x63, 0xc0, + 0xa8, 0xea, 0xa8, 0xbb, 0xf8, 0x43, 0x77, 0xe0, 0x7d, 0xeb, 0x43, 0x1f, 0x41, 0x4f, 0x10, 0xcf, + 0xb7, 0xd9, 0xfe, 0x3e, 0xa7, 0x62, 0xb4, 0x34, 0x36, 0x26, 0x5d, 0x0b, 0x12, 0xe8, 0xa5, 0x44, + 0xd0, 0x5d, 0x18, 0x3a, 0xaa, 0x93, 0xed, 0x88, 0x1e, 0x7b, 0x3c, 0x61, 0x5e, 0x96, 0x89, 0x5d, + 0x70, 0xd2, 0x0e, 0x57, 0x30, 0xc2, 0x30, 0xf0, 0xdc, 0x77, 0xb6, 0x1c, 0x20, 0xf2, 0xf8, 0xaf, + 0x48, 0xb6, 0x9e, 0xe7, 0xbe, 0x7b, 0xe6, 0xf9, 0x74, 0x2f, 0x99, 0x02, 0xaf, 0xe1, 0xaa, 0x2a, + 0x7e, 0x37, 0x74, 0x22, 0x1a, 0xd0, 0x50, 0x10, 0x7f, 0x9b, 0xcd, 0x4e, 0x5a, 0xb5, 0xc0, 0x26, + 0xac, 0x70, 0x2f, 0x74, 0xa8, 0x1d, 0xaa, 0x31, 0xd4, 0xb5, 0x96, 0xe5, 0xf3, 0x0b, 0x8e, 0x9f, + 0xc2, 0xb5, 0x06, 0x5e, 0xbd, 0xb3, 0x37, 0xa0, 0x2f, 0x13, 0x73, 0x58, 0x28, 0x68, 0x28, 0x24, + 0x77, 0xdf, 0xea, 0x25, 0xd8, 0xb6, 0x82, 0xf0, 0x67, 0x80, 0x14, 0xc7, 0x73, 0x16, 0x87, 0xed, + 0x8e, 0xe6, 0x25, 0x58, 0x2b, 0xb8, 0xe8, 0xde, 0x78, 0x0c, 0xeb, 0x0a, 0x7e, 0x15, 0x06, 0xad, + 0xb9, 0x36, 0xe0, 0x52, 0xc9, 0x49, 0xb3, 0x3d, 0x4a, 0x83, 0x14, 0xef, 0x89, 0x33, 0xc9, 0x2e, + 0xa7, 0x19, 0x14, 0xaf, 0x0a, 0xfc, 0x97, 0x01, 0x17, 0xd3, 0x31, 0xd2, 0x72, 0xd7, 0xdf, 0xb3, + 0xed, 0x3a, 0x8d, 0x6d, 0xd7, 0x3d, 0x6d, 0xbb, 0x09, 0x0c, 0x39, 0x8b, 0x23, 0x87, 0xda, 0x2e, + 0x11, 0xc4, 0x0e, 0x99, 0x4b, 0x75, 0x57, 0x9e, 0x57, 0xf8, 0x0e, 0x11, 0xe4, 0x05, 0x73, 0x29, + 0x5e, 0x4f, 0x3f, 0x4a, 0xfe, 0x6b, 0xe2, 0x10, 0x2e, 0x24, 0xcf, 0x49, 0x5b, 0xb5, 0xac, 0xa1, + 0xe7, 0x71, 0x3b, 0xed, 0x4e, 0x59, 0xc4, 0x8a, 0xb5, 0xea, 0xf1, 0x5d, 0xd5, 0x9a, 0xfa, 0xbd, + 0x4b, 0x84, 0x7a, 0xdf, 0x49, 0xdf, 0xef, 0x10, 0x91, 0xbc, 0xc7, 0x9f, 0xc3, 0xf0, 0x34, 0x5e, + 0xfb, 0x8e, 0xfa, 0x1d, 0x36, 0xf5, 0x51, 0x17, 0x11, 0x25, 0xc1, 0x33, 0xe6, 0xfb, 0xec, 0xed, + 0x07, 0xb6, 0x3a, 0x9a, 0xc0, 0x05, 0x37, 0x22, 0x5e, 0xe8, 0x85, 0x07, 0x7b, 0xd4, 0x61, 0xa1, + 0xcb, 0x65, 0xbe, 0x03, 0xab, 0x0c, 0xe3, 0x29, 0x98, 0x75, 0xe1, 0x4f, 0x6f, 0xcc, 0x90, 0x52, + 0xd7, 0xa7, 0xf6, 0x21, 0x25, 0x2e, 0x8d, 0x74, 0x01, 0x7d, 0x05, 0x7e, 0x2f, 0xb1, 0x64, 0x3e, + 0x68, 0xa3, 0x29, 0x73, 0x4f, 0x64, 0x2a, 0x7d, 0x0b, 0x14, 0xf4, 0x94, 0xb9, 0x27, 0xf8, 0x6b, + 0xb8, 0x62, 0x51, 0xe2, 0xaa, 0x38, 0xf2, 0x98, 0xb7, 0x1f, 0x85, 0xff, 0x1a, 0x70, 0xb5, 0xde, + 0xb9, 0xcd, 0x38, 0x7c, 0x00, 0x28, 0x1b, 0x37, 0xc2, 0x0b, 0x28, 0x17, 0x24, 0x98, 0xe9, 0xcd, + 0x1a, 0xea, 0x99, 0xf3, 0x43, 0x8a, 0x57, 0x87, 0x53, 0xa7, 0x32, 0x9c, 0x12, 0xc6, 0xb4, 0x05, + 0x72, 0x8c, 0x5d, 0xc5, 0xe8, 0xaa, 0x56, 0x28, 0x30, 0x66, 0xd6, 0x92, 0x71, 0x51, 0x31, 0x6a, + 0x43, 0xc9, 0x78, 0x0d, 0x40, 0xf7, 0x48, 0x1c, 0xa6, 0xd3, 0x75, 0x55, 0x75, 0x48, 0x1c, 0x0a, + 0xfc, 0x23, 0xc0, 0x8e, 0xc7, 0x8f, 0x54, 0xd5, 0xc9, 0x31, 0x71, 0xbd, 0x48, 0xdf, 0xa6, 0xc9, + 0x32, 0x41, 0x88, 0xef, 0xeb, 0x9a, 0x92, 0x65, 0xa2, 0xac, 0x62, 0x4e, 0x5d, 0x9d, 0xbd, 0x5c, + 0x27, 0xd8, 0x7e, 0x44, 0xa9, 0x4e, 0x54, 0xae, 0xf1, 0x1f, 0x06, 0xac, 0x3e, 0xa7, 0x81, 0x66, + 0xbe, 0x0e, 0x70, 0xc0, 0x22, 0x16, 0x0b, 0x2f, 0xa4, 0x5c, 0x06, 0x58, 0xb4, 0x72, 0xc8, 0xff, + 0x8f, 0x23, 0x95, 0x1e, 0xf5, 0xf7, 0x75, 0xed, 0x72, 0x9d, 0x60, 0x87, 0x94, 0xcc, 0x74, 0xb9, + 0x72, 0x9d, 0xe8, 0x44, 0x2e, 0x88, 0x73, 0x24, 0xef, 0x8e, 0xae, 0xa5, 0x1e, 0x1e, 0xfd, 0x3d, + 0x80, 0xbe, 0xee, 0x50, 0x29, 0x54, 0xd1, 0x2f, 0xd0, 0xcb, 0x09, 0x5c, 0x74, 0xab, 0xaa, 0x63, + 0xab, 0x82, 0xd9, 0xfc, 0x78, 0x8e, 0x95, 0x9e, 0x19, 0x0b, 0x28, 0x84, 0x8b, 0x15, 0x01, 0x89, + 0xee, 0x55, 0xbd, 0x9b, 0xe4, 0xa9, 0x79, 0xbf, 0x95, 0x6d, 0x16, 0x4f, 0xc0, 0x5a, 0x8d, 0x22, + 0x44, 0x0f, 0xe6, 0xb0, 0x14, 0x54, 0xa9, 0xf9, 0x49, 0x4b, 0xeb, 0x2c, 0xea, 0x1b, 0x40, 0x55, + 0xb9, 0x88, 0xee, 0xcf, 0xa5, 0x39, 0x95, 0xa3, 0xe6, 0x83, 0x76, 0xc6, 0x8d, 0x85, 0x2a, 0x21, + 0x39, 0xb7, 0xd0, 0x82, 0x54, 0x9d, 0x5b, 0x68, 0x49, 0x9d, 0x2e, 0xa0, 0x23, 0x18, 0x96, 0x45, + 0x26, 0xba, 0xdb, 0xf4, 0xcb, 0xa7, 0xa2, 0x61, 0xcd, 0x7b, 0x6d, 0x4c, 0xb3, 0x60, 0x14, 0xce, + 0x17, 0x85, 0x20, 0xba, 0x53, 0xf5, 0xaf, 0x95, 0xb5, 0xe6, 0x64, 0xbe, 0x61, 0xbe, 0xa6, 0xb2, + 0x38, 0xac, 0xab, 0xa9, 0x41, 0x79, 0xd6, 0xd5, 0xd4, 0xa4, 0x35, 0xf1, 0x02, 0xfa, 0x2d, 0x55, + 0x1c, 0x25, 0xd1, 0x84, 0xb6, 0x9a, 0x68, 0xea, 0x55, 0x9b, 0xf9, 0xb0, 0xb5, 0x7d, 0x1a, 0xfb, + 0x53, 0x23, 0x39, 0xeb, 0x39, 0xed, 0x54, 0x77, 0xd6, 0xab, 0x6a, 0xac, 0xee, 0xac, 0xd7, 0x09, + 0xb0, 0x05, 0x34, 0x85, 0x41, 0x41, 0x4d, 0xa1, 0xdb, 0x4d, 0x9e, 0x45, 0x8d, 0x66, 0xde, 0x99, + 0x6b, 0x97, 0xc5, 0xb0, 0xd3, 0xe9, 0xa5, 0xc7, 0x55, 0x63, 0x72, 0xc5, 0x79, 0x75, 0x7b, 0x9e, + 0x59, 0x16, 0xe0, 0x27, 0x80, 0x53, 0xf1, 0x83, 0x6e, 0x36, 0xf9, 0xe5, 0x3f, 0xc5, 0xad, 0xb3, + 0x8d, 0x32, 0xea, 0xb7, 0xb0, 0x5e, 0x77, 0xf5, 0xa2, 0x9a, 0x53, 0x78, 0xc6, 0xfd, 0x6e, 0x6e, + 0xb5, 0x35, 0xcf, 0x02, 0xbf, 0x82, 0x95, 0x54, 0x4a, 0xa1, 0x1b, 0x55, 0xef, 0x92, 0xac, 0x33, + 0xf1, 0x59, 0x26, 0xb9, 0x6e, 0xe2, 0xa9, 0x4e, 0xcc, 0x6b, 0x9d, 0xda, 0xa9, 0xd7, 0x24, 0xc8, + 0x6a, 0xa7, 0x5e, 0xa3, 0x7c, 0x4a, 0x82, 0x4e, 0x97, 0xe4, 0x3f, 0x3b, 0x8f, 0xff, 0x0b, 0x00, + 0x00, 0xff, 0xff, 0x9b, 0xf8, 0x72, 0xd3, 0xf0, 0x11, 0x00, 0x00, } diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go index 41b7a798c..06e7017e8 100644 --- a/weed/server/volume_grpc_copy_incremental.go +++ b/weed/server/volume_grpc_copy_incremental.go @@ -18,9 +18,9 @@ func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrem } stopOffset := v.Size() - foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.Since) + foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.SinceNs) if err != nil { - return fmt.Errorf("fail to locate by appendAtNs %d: %s", req.Since, err) + return fmt.Errorf("fail to locate by appendAtNs %d: %s", req.SinceNs, err) } if isLastOne { diff --git a/weed/server/volume_grpc_stream_follow.go b/weed/server/volume_grpc_stream_follow.go new file mode 100644 index 000000000..7c01e4b9c --- /dev/null +++ b/weed/server/volume_grpc_stream_follow.go @@ -0,0 +1,77 @@ +package weed_server + +import ( + "fmt" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +func (vs *VolumeServer) VolumeStreamFollow(req *volume_server_pb.VolumeStreamFollowRequest, stream volume_server_pb.VolumeServer_VolumeStreamFollowServer) error { + + v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + if v == nil { + return fmt.Errorf("not found volume id %d", req.VolumeId) + } + + lastTimestampNs := req.SinceNs + drainingSeconds := req.DrainingSeconds + + ticker := time.NewTicker(time.Second) + + for { + select { + case <-ticker.C: + lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs) + if err != nil { + return fmt.Errorf("streamFollow: %v", err) + } + if req.DrainingSeconds == 0 { + continue + } + if lastProcessedTimestampNs == lastTimestampNs { + drainingSeconds-- + if drainingSeconds <= 0 { + return nil + } + glog.V(0).Infof("volume %d drains requests with %d seconds remaining ...", v.Id, drainingSeconds) + } else { + drainingSeconds = req.DrainingSeconds + glog.V(0).Infof("volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds) + } + } + } + +} + +func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeStreamFollowServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) { + + foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs) + if err != nil { + return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err) + } + + if isLastOne { + return lastTimestampNs, nil + } + + err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error { + + sendErr := stream.Send(&volume_server_pb.VolumeStreamFollowResponse{ + NeedleHeader: needleHeader, + NeedleBody: needleBody, + }) + if sendErr != nil { + return sendErr + } + + lastProcessedTimestampNs = needleAppendAtNs + return nil + + }) + + return + +} diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go index 0a73b2977..6e709d6c2 100644 --- a/weed/storage/needle_read_write.go +++ b/weed/storage/needle_read_write.go @@ -267,14 +267,14 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) { return nil } -func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength int64, err error) { +func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) { n = new(Needle) if version == Version1 || version == Version2 || version == Version3 { - bytes := make([]byte, NeedleEntrySize) + bytes = make([]byte, NeedleEntrySize) var count int count, err = r.ReadAt(bytes, offset) if count <= 0 || err != nil { - return nil, 0, err + return nil, bytes, 0, err } n.ParseNeedleHeader(bytes) bodyLength = NeedleBodyLength(n.Size, version) @@ -299,21 +299,21 @@ func NeedleBodyLength(needleSize uint32, version Version) int64 { //n should be a needle already read the header //the input stream will read until next file entry -func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength int64) (err error) { +func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength int64) (bytes []byte, err error) { if bodyLength <= 0 { - return nil + return nil, nil } switch version { case Version1: - bytes := make([]byte, bodyLength) + bytes = make([]byte, bodyLength) if _, err = r.ReadAt(bytes, offset); err != nil { return } n.Data = bytes[:n.Size] n.Checksum = NewCRC(n.Data) case Version2, Version3: - bytes := make([]byte, bodyLength) + bytes = make([]byte, bodyLength) if _, err = r.ReadAt(bytes, offset); err != nil { return } diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index cb478c427..9a261df71 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -68,7 +68,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{ VolumeId: uint32(v.Id), - Since: appendAtNs, + SinceNs: appendAtNs, }) if err != nil { return err @@ -147,11 +147,11 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) { func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) { - n, bodyLength, err := ReadNeedleHeader(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()) + n, _, bodyLength, err := ReadNeedleHeader(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()) if err != nil { return 0, fmt.Errorf("ReadNeedleHeader: %v", err) } - err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleEntrySize), bodyLength) + _, err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleEntrySize), bodyLength) if err != nil { return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 363835eb9..50ea8fecb 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -195,7 +195,7 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId, } func ScanVolumeFileFrom(version Version, dataFile *os.File, offset int64, volumeFileScanner VolumeFileScanner) (err error) { - n, rest, e := ReadNeedleHeader(dataFile, version, offset) + n, _, rest, e := ReadNeedleHeader(dataFile, version, offset) if e != nil { if e == io.EOF { return nil @@ -204,7 +204,7 @@ func ScanVolumeFileFrom(version Version, dataFile *os.File, offset int64, volume } for n != nil { if volumeFileScanner.ReadNeedleBody() { - if err = n.ReadNeedleBody(dataFile, version, offset+NeedleEntrySize, rest); err != nil { + if _, err = n.ReadNeedleBody(dataFile, version, offset+NeedleEntrySize, rest); err != nil { glog.V(0).Infof("cannot read needle body: %v", err) //err = fmt.Errorf("cannot read needle body: %v", err) //return @@ -219,7 +219,40 @@ func ScanVolumeFileFrom(version Version, dataFile *os.File, offset int64, volume } offset += NeedleEntrySize + rest glog.V(4).Infof("==> new entry offset %d", offset) - if n, rest, err = ReadNeedleHeader(dataFile, version, offset); err != nil { + if n, _, rest, err = ReadNeedleHeader(dataFile, version, offset); err != nil { + if err == io.EOF { + return nil + } + return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err) + } + glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest) + } + return nil +} + +func ScanVolumeFileNeedleFrom(version Version, dataFile *os.File, offset int64, fn func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error) (err error) { + n, nh, rest, e := ReadNeedleHeader(dataFile, version, offset) + if e != nil { + if e == io.EOF { + return nil + } + return fmt.Errorf("cannot read %s at offset %d: %v", dataFile.Name(), offset, e) + } + for n != nil { + var needleBody []byte + if needleBody, err = n.ReadNeedleBody(dataFile, version, offset+NeedleEntrySize, rest); err != nil { + glog.V(0).Infof("cannot read needle body: %v", err) + //err = fmt.Errorf("cannot read needle body: %v", err) + //return + } + err = fn(nh, needleBody, n.AppendAtNs) + if err != nil { + glog.V(0).Infof("visit needle error: %v", err) + return + } + offset += NeedleEntrySize + rest + glog.V(4).Infof("==> new entry offset %d", offset) + if n, nh, rest, err = ReadNeedleHeader(dataFile, version, offset); err != nil { if err == io.EOF { return nil }