diff --git a/weed/filer/filer.go b/weed/filer/filer.go index f13782031..76d2f3f47 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -44,6 +44,7 @@ type Filer struct { Signature int32 FilerConf *FilerConf RemoteStorage *FilerRemoteStorage + UniqueFileId uint32 } func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption, @@ -54,6 +55,7 @@ func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption, GrpcDialOption: grpcDialOption, FilerConf: NewFilerConf(), RemoteStorage: NewFilerRemoteStorage(), + UniqueFileId: uint32(util.RandomInt32()), } f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index 7ab101102..e44ddfd59 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math" "strings" "time" @@ -92,8 +93,8 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { startTime, stopTime = startTime.UTC(), stopTime.UTC() - targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir, - startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), + targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.%08x", SystemLogDir, + startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), f.UniqueFileId, // startTime.Second(), startTime.Nanosecond(), ) @@ -111,7 +112,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( startTime = startTime.UTC() startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) - startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) + startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute()) sizeBuf := make([]byte, 4) startTsNs := startTime.UnixNano() @@ -122,14 +123,15 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( } for _, dayEntry := range dayEntries { // println("checking day", dayEntry.FullPath) - hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "", "") + hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "") if listHourMinuteErr != nil { return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) } for _, hourMinuteEntry := range hourMinuteEntries { // println("checking hh-mm", hourMinuteEntry.FullPath) if dayEntry.Name() == startDate { - if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 { + hourMinute := util.FileNameBase(hourMinuteEntry.Name()) + if strings.Compare(hourMinute, startHourMinute) < 0 { continue } } diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index d21fb351f..f07a961db 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -2,6 +2,7 @@ package broker import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/log_buffer" "io" "strings" @@ -141,7 +142,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { startTime = startTime.UTC() startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) - startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) + startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute()) sizeBuf := make([]byte, 4) startTsNs := startTime.UnixNano() @@ -153,7 +154,8 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name) return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error { if dayEntry.Name == startDate { - if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 { + hourMinute := util.FileNameBase(hourMinuteEntry.Name) + if strings.Compare(hourMinute, startHourMinute) < 0 { return nil } } diff --git a/weed/util/file_util.go b/weed/util/file_util.go index f83f80265..f9cc4f70b 100644 --- a/weed/util/file_util.go +++ b/weed/util/file_util.go @@ -87,3 +87,11 @@ func ResolvePath(path string) string { return path } + +func FileNameBase(filename string) string { + lastDotIndex := strings.LastIndex(filename, ".") + if lastDotIndex < 0 { + return filename + } + return filename[:lastDotIndex] +}