diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index be8d3faff..4afb7c091 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -1,6 +1,7 @@ package command import ( + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" @@ -157,7 +158,11 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour } dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks) - return client.WriteFile(dest, message.NewEntry, reader) + remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) } if message.OldEntry != nil && message.NewEntry == nil { fmt.Printf("delete: %+v\n", resp) @@ -182,7 +187,11 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour return err } reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks) - return client.WriteFile(dest, message.NewEntry, reader) + remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) } return nil @@ -238,3 +247,14 @@ func shouldSendToRemote(entry *filer_pb.Entry) bool { } return false } + +func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { + entry.RemoteEntry = remoteEntry + return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + }) + return err + }) +} \ No newline at end of file diff --git a/weed/remote_storage/remote_storage.go b/weed/remote_storage/remote_storage.go index 608d158ad..c94260ac0 100644 --- a/weed/remote_storage/remote_storage.go +++ b/weed/remote_storage/remote_storage.go @@ -32,7 +32,7 @@ type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *file type RemoteStorageClient interface { Traverse(loc *filer_pb.RemoteStorageLocation, visitFn VisitFunc) error ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) - WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error) + WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error) } diff --git a/weed/remote_storage/s3/s3_storage_client.go b/weed/remote_storage/s3/s3_storage_client.go index 316751227..5be64406e 100644 --- a/weed/remote_storage/s3/s3_storage_client.go +++ b/weed/remote_storage/s3/s3_storage_client.go @@ -116,7 +116,7 @@ func (s *s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, of return writerAt.Bytes(), nil } -func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error) { +func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) { fileSize := int64(filer.FileSize(entry)) @@ -153,10 +153,12 @@ func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, e //in case it fails to upload if err != nil { - return fmt.Errorf("upload to s3 %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err) + return nil, fmt.Errorf("upload to s3 %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err) } - return nil + // read back the remote entry + return s.readFileRemoteEntry(loc) + } func toTagging(attributes map[string][]byte) *s3.Tagging { @@ -170,6 +172,24 @@ func toTagging(attributes map[string][]byte) *s3.Tagging { return tagging } +func (s *s3RemoteStorageClient) readFileRemoteEntry(loc *filer_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) { + resp, err := s.conn.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(loc.Bucket), + Key: aws.String(loc.Path[1:]), + }) + if err != nil { + return nil, err + } + + return &filer_pb.RemoteEntry{ + LastModifiedAt: resp.LastModified.Unix(), + Size: *resp.ContentLength, + ETag: *resp.ETag, + StorageName: s.conf.Name, + }, nil + +} + func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) { tagging := toTagging(entry.Extended) if len(tagging.TagSet) > 0 {