package main import ( "context" "encoding/json" "fmt" "log" "math/big" "math/rand" "os" "strconv" "strings" "time" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type UserEvent struct { ID int64 `json:"id"` UserID int64 `json:"user_id"` UserType string `json:"user_type"` Action string `json:"action"` Status string `json:"status"` Amount float64 `json:"amount,omitempty"` PreciseAmount string `json:"precise_amount,omitempty"` // Will be converted to DECIMAL BirthDate time.Time `json:"birth_date"` // Will be converted to DATE Timestamp time.Time `json:"timestamp"` Metadata string `json:"metadata,omitempty"` } type SystemLog struct { ID int64 `json:"id"` Level string `json:"level"` Service string `json:"service"` Message string `json:"message"` ErrorCode int `json:"error_code,omitempty"` Timestamp time.Time `json:"timestamp"` } type MetricEntry struct { ID int64 `json:"id"` Name string `json:"name"` Value float64 `json:"value"` Tags string `json:"tags"` Timestamp time.Time `json:"timestamp"` } type ProductView struct { ID int64 `json:"id"` ProductID int64 `json:"product_id"` UserID int64 `json:"user_id"` Category string `json:"category"` Price float64 `json:"price"` ViewCount int `json:"view_count"` Timestamp time.Time `json:"timestamp"` } func main() { // Get SeaweedFS configuration from environment masterAddr := getEnv("SEAWEEDFS_MASTER", "localhost:9333") filerAddr := getEnv("SEAWEEDFS_FILER", "localhost:8888") log.Printf("Creating MQ test data...") log.Printf("Master: %s", masterAddr) log.Printf("Filer: %s", filerAddr) // Wait for SeaweedFS to be ready log.Println("Waiting for SeaweedFS to be ready...") time.Sleep(10 * time.Second) // Create topics and populate with data topics := []struct { namespace string topic string generator func() interface{} count int }{ {"analytics", "user_events", generateUserEvent, 1000}, {"analytics", "system_logs", generateSystemLog, 500}, {"analytics", "metrics", generateMetric, 800}, {"ecommerce", "product_views", generateProductView, 1200}, {"ecommerce", "user_events", generateUserEvent, 600}, {"logs", "application_logs", generateSystemLog, 2000}, {"logs", "error_logs", generateErrorLog, 300}, } for _, topicConfig := range topics { log.Printf("Creating topic %s.%s with %d records...", topicConfig.namespace, topicConfig.topic, topicConfig.count) err := createTopicData(masterAddr, filerAddr, topicConfig.namespace, topicConfig.topic, topicConfig.generator, topicConfig.count) if err != nil { log.Printf("Error creating topic %s.%s: %v", topicConfig.namespace, topicConfig.topic, err) } else { log.Printf("✓ Successfully created %s.%s", topicConfig.namespace, topicConfig.topic) } // Small delay between topics time.Sleep(2 * time.Second) } log.Println("✓ MQ test data creation completed!") log.Println("\nCreated namespaces:") log.Println(" - analytics (user_events, system_logs, metrics)") log.Println(" - ecommerce (product_views, user_events)") log.Println(" - logs (application_logs, error_logs)") log.Println("\nYou can now test with PostgreSQL clients:") log.Println(" psql -h localhost -p 5432 -U seaweedfs -d analytics") log.Println(" postgres=> SHOW TABLES;") log.Println(" postgres=> SELECT COUNT(*) FROM user_events;") } // createSchemaForTopic creates a proper RecordType schema based on topic name func createSchemaForTopic(topicName string) *schema_pb.RecordType { switch topicName { case "user_events": return &schema_pb.RecordType{ Fields: []*schema_pb.Field{ {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, {Name: "user_id", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, {Name: "user_type", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, {Name: "action", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, {Name: "status", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, {Name: "amount", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}, IsRequired: false}, {Name: "timestamp", FieldIndex: 6, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, {Name: "metadata", FieldIndex: 7, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: false}, }, } case "system_logs": return &schema_pb.RecordType{ Fields: []*schema_pb.Field{ {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, {Name: "level", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, {Name: "service", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, {Name: "message", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, {Name: "error_code", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}, IsRequired: false}, {Name: "timestamp", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, }, } case "metrics": return &schema_pb.RecordType{ Fields: []*schema_pb.Field{ {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, {Name: "name", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, {Name: "value", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}, IsRequired: true}, {Name: "tags", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, {Name: "timestamp", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, }, } case "product_views": return &schema_pb.RecordType{ Fields: []*schema_pb.Field{ {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, {Name: "product_id", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, {Name: "user_id", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, {Name: "category", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, {Name: "price", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}, IsRequired: true}, {Name: "view_count", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}, IsRequired: true}, {Name: "timestamp", FieldIndex: 6, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, }, } case "application_logs", "error_logs": return &schema_pb.RecordType{ Fields: []*schema_pb.Field{ {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, {Name: "level", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, {Name: "service", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, {Name: "message", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, {Name: "error_code", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}, IsRequired: false}, {Name: "timestamp", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, }, } default: // Default generic schema return &schema_pb.RecordType{ Fields: []*schema_pb.Field{ {Name: "data", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES}}, IsRequired: true}, }, } } } // convertToDecimal converts a string to decimal format for Parquet logical type func convertToDecimal(value string) ([]byte, int32, int32) { // Parse the decimal string using big.Rat for precision rat := new(big.Rat) if _, success := rat.SetString(value); !success { return nil, 0, 0 } // Convert to a fixed scale (e.g., 4 decimal places) scale := int32(4) precision := int32(18) // Total digits // Scale the rational number to integer representation multiplier := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil) scaled := new(big.Int).Mul(rat.Num(), multiplier) scaled.Div(scaled, rat.Denom()) return scaled.Bytes(), precision, scale } // convertToRecordValue converts Go structs to RecordValue format func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { fields := make(map[string]*schema_pb.Value) switch v := data.(type) { case UserEvent: fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}} fields["user_id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.UserID}} fields["user_type"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.UserType}} fields["action"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Action}} fields["status"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Status}} fields["amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Amount}} // Convert precise amount to DECIMAL logical type if v.PreciseAmount != "" { if decimal, precision, scale := convertToDecimal(v.PreciseAmount); decimal != nil { fields["precise_amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DecimalValue{DecimalValue: &schema_pb.DecimalValue{ Value: decimal, Precision: precision, Scale: scale, }}} } } // Convert birth date to DATE logical type fields["birth_date"] = &schema_pb.Value{Kind: &schema_pb.Value_DateValue{DateValue: &schema_pb.DateValue{ DaysSinceEpoch: int32(v.BirthDate.Unix() / 86400), // Convert to days since epoch }}} fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ TimestampMicros: v.Timestamp.UnixMicro(), IsUtc: true, }}} fields["metadata"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Metadata}} case SystemLog: fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}} fields["level"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Level}} fields["service"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Service}} fields["message"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Message}} fields["error_code"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ErrorCode)}} fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ TimestampMicros: v.Timestamp.UnixMicro(), IsUtc: true, }}} case MetricEntry: fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}} fields["name"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Name}} fields["value"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Value}} fields["tags"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Tags}} fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ TimestampMicros: v.Timestamp.UnixMicro(), IsUtc: true, }}} case ProductView: fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}} fields["product_id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ProductID}} fields["user_id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.UserID}} fields["category"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Category}} fields["price"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Price}} fields["view_count"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ViewCount)}} fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ TimestampMicros: v.Timestamp.UnixMicro(), IsUtc: true, }}} default: // Fallback to JSON for unknown types jsonData, err := json.Marshal(data) if err != nil { return nil, fmt.Errorf("failed to marshal unknown type: %v", err) } fields["data"] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: jsonData}} } return &schema_pb.RecordValue{Fields: fields}, nil } // convertHTTPToGRPC converts HTTP address to gRPC address // Follows SeaweedFS convention: gRPC port = HTTP port + 10000 func convertHTTPToGRPC(httpAddress string) string { if strings.Contains(httpAddress, ":") { parts := strings.Split(httpAddress, ":") if len(parts) == 2 { if port, err := strconv.Atoi(parts[1]); err == nil { return fmt.Sprintf("%s:%d", parts[0], port+10000) } } } // Fallback: return original address if conversion fails return httpAddress } // discoverFiler finds a filer from the master server func discoverFiler(masterHTTPAddress string) (string, error) { masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress) conn, err := grpc.Dial(masterGRPCAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return "", fmt.Errorf("failed to connect to master at %s: %v", masterGRPCAddress, err) } defer conn.Close() client := master_pb.NewSeaweedClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, }) if err != nil { return "", fmt.Errorf("failed to list filers from master: %v", err) } if len(resp.ClusterNodes) == 0 { return "", fmt.Errorf("no filers found in cluster") } // Use the first available filer and convert HTTP address to gRPC filerHTTPAddress := resp.ClusterNodes[0].Address return convertHTTPToGRPC(filerHTTPAddress), nil } // discoverBroker finds the broker balancer using filer lock mechanism func discoverBroker(masterHTTPAddress string) (string, error) { // First discover filer from master filerAddress, err := discoverFiler(masterHTTPAddress) if err != nil { return "", fmt.Errorf("failed to discover filer: %v", err) } conn, err := grpc.Dial(filerAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return "", fmt.Errorf("failed to connect to filer at %s: %v", filerAddress, err) } defer conn.Close() client := filer_pb.NewSeaweedFilerClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() resp, err := client.FindLockOwner(ctx, &filer_pb.FindLockOwnerRequest{ Name: pub_balancer.LockBrokerBalancer, }) if err != nil { return "", fmt.Errorf("failed to find broker balancer: %v", err) } return resp.Owner, nil } func createTopicData(masterAddr, filerAddr, namespace, topicName string, generator func() interface{}, count int) error { // Create schema based on topic type recordType := createSchemaForTopic(topicName) // Dynamically discover broker address instead of hardcoded port replacement brokerAddress, err := discoverBroker(masterAddr) if err != nil { // Fallback to hardcoded port replacement if discovery fails log.Printf("Warning: Failed to discover broker dynamically (%v), using hardcoded port replacement", err) brokerAddress = strings.Replace(masterAddr, ":9333", ":17777", 1) } // Create publisher configuration config := &pub_client.PublisherConfiguration{ Topic: topic.NewTopic(namespace, topicName), PartitionCount: 1, Brokers: []string{brokerAddress}, // Use dynamically discovered broker address PublisherName: fmt.Sprintf("test-producer-%s-%s", namespace, topicName), RecordType: recordType, // Use structured schema } // Create publisher publisher, err := pub_client.NewTopicPublisher(config) if err != nil { return fmt.Errorf("failed to create publisher: %v", err) } defer publisher.Shutdown() // Generate and publish data for i := 0; i < count; i++ { data := generator() // Convert struct to RecordValue recordValue, err := convertToRecordValue(data) if err != nil { log.Printf("Error converting data to RecordValue: %v", err) continue } // Publish structured record err = publisher.PublishRecord([]byte(fmt.Sprintf("key-%d", i)), recordValue) if err != nil { log.Printf("Error publishing message %d: %v", i+1, err) continue } // Small delay every 100 messages if (i+1)%100 == 0 { log.Printf(" Published %d/%d messages to %s.%s", i+1, count, namespace, topicName) time.Sleep(100 * time.Millisecond) } } // Finish publishing err = publisher.FinishPublish() if err != nil { return fmt.Errorf("failed to finish publishing: %v", err) } return nil } func generateUserEvent() interface{} { userTypes := []string{"premium", "standard", "trial", "enterprise"} actions := []string{"login", "logout", "purchase", "view", "search", "click", "download"} statuses := []string{"active", "inactive", "pending", "completed", "failed"} // Generate a birth date between 1970 and 2005 (18+ years old) birthYear := 1970 + rand.Intn(35) birthMonth := 1 + rand.Intn(12) birthDay := 1 + rand.Intn(28) // Keep it simple, avoid month-specific day issues birthDate := time.Date(birthYear, time.Month(birthMonth), birthDay, 0, 0, 0, 0, time.UTC) // Generate a precise amount as a string with 4 decimal places preciseAmount := fmt.Sprintf("%.4f", rand.Float64()*10000) return UserEvent{ ID: rand.Int63n(1000000) + 1, UserID: rand.Int63n(10000) + 1, UserType: userTypes[rand.Intn(len(userTypes))], Action: actions[rand.Intn(len(actions))], Status: statuses[rand.Intn(len(statuses))], Amount: rand.Float64() * 1000, PreciseAmount: preciseAmount, BirthDate: birthDate, Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*30)) * time.Second), Metadata: fmt.Sprintf("{\"session_id\":\"%d\"}", rand.Int63n(100000)), } } func generateSystemLog() interface{} { levels := []string{"debug", "info", "warning", "error", "critical"} services := []string{"auth-service", "payment-service", "user-service", "notification-service", "api-gateway"} messages := []string{ "Request processed successfully", "User authentication completed", "Payment transaction initiated", "Database connection established", "Cache miss for key", "API rate limit exceeded", "Service health check passed", } return SystemLog{ ID: rand.Int63n(1000000) + 1, Level: levels[rand.Intn(len(levels))], Service: services[rand.Intn(len(services))], Message: messages[rand.Intn(len(messages))], ErrorCode: rand.Intn(1000), Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*7)) * time.Second), } } func generateErrorLog() interface{} { levels := []string{"error", "critical", "fatal"} services := []string{"auth-service", "payment-service", "user-service", "notification-service", "api-gateway"} messages := []string{ "Database connection failed", "Authentication token expired", "Payment processing error", "Service unavailable", "Memory limit exceeded", "Timeout waiting for response", "Invalid request parameters", } return SystemLog{ ID: rand.Int63n(1000000) + 1, Level: levels[rand.Intn(len(levels))], Service: services[rand.Intn(len(services))], Message: messages[rand.Intn(len(messages))], ErrorCode: rand.Intn(100) + 400, // 400-499 error codes Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*7)) * time.Second), } } func generateMetric() interface{} { names := []string{"cpu_usage", "memory_usage", "disk_usage", "request_latency", "error_rate", "throughput"} tags := []string{ "service=web,region=us-east", "service=api,region=us-west", "service=db,region=eu-central", "service=cache,region=asia-pacific", } return MetricEntry{ ID: rand.Int63n(1000000) + 1, Name: names[rand.Intn(len(names))], Value: rand.Float64() * 100, Tags: tags[rand.Intn(len(tags))], Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*3)) * time.Second), } } func generateProductView() interface{} { categories := []string{"electronics", "books", "clothing", "home", "sports", "automotive"} return ProductView{ ID: rand.Int63n(1000000) + 1, ProductID: rand.Int63n(10000) + 1, UserID: rand.Int63n(5000) + 1, Category: categories[rand.Intn(len(categories))], Price: rand.Float64() * 500, ViewCount: rand.Intn(100) + 1, Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*14)) * time.Second), } } func getEnv(key, defaultValue string) string { if value, exists := os.LookupEnv(key); exists { return value } return defaultValue }