From 38480e52c0e50dceb13f5d9a142dbf0a5f8f5748 Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Tue, 27 Jan 2026 14:20:30 +1100 Subject: [PATCH] improve vm deletion detection --- internal/tasks/inventorySnapshots.go | 133 +++++++++++++++++++-------- internal/vcenter/vcenter.go | 132 +++++++++++++++++++++++--- 2 files changed, 211 insertions(+), 54 deletions(-) diff --git a/internal/tasks/inventorySnapshots.go b/internal/tasks/inventorySnapshots.go index 908b78a..f3c3cbf 100644 --- a/internal/tasks/inventorySnapshots.go +++ b/internal/tasks/inventorySnapshots.go @@ -1062,52 +1062,105 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim if freq <= 0 { freq = time.Hour } - begin := startTime.Add(-4 * freq) - end := startTime - events, err := vc.FindVmDeletionEvents(ctx, begin, end) - if err != nil { - log.Warn("failed to fetch vcenter deletion events", "vcenter", url, "error", err) - } else { - log.Debug("fetched vcenter deletion events", "vcenter", url, "count", len(events), "window_start_local", begin, "window_end_local", end, "window_minutes", end.Sub(begin).Minutes(), "window_start_utc", begin.UTC(), "window_end_utc", end.UTC()) - for _, cand := range candidates { - if t, ok := events[cand.vmID]; ok { - delTs := sql.NullInt64{Int64: t.Unix(), Valid: true} - if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ - DeletionTime: delTs, - VmId: sql.NullString{String: cand.vmID, Valid: cand.vmID != ""}, - DatacenterName: cand.datacenter, - }); err != nil { - log.Warn("failed to update inventory deletion time from event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err) + candidateIDs := make([]string, 0, len(candidates)) + candidateSet := make(map[string]struct{}, len(candidates)) + for _, cand := range candidates { + if cand.vmID == "" { + continue + } + if _, ok := candidateSet[cand.vmID]; ok { + continue + } + candidateSet[cand.vmID] = struct{}{} + candidateIDs = append(candidateIDs, cand.vmID) + } + events := make(map[string]time.Time) + var windowBegin time.Time + var windowEnd time.Time + var windowUsed time.Duration + if len(candidateIDs) > 0 { + baseWindow := 4 * freq + maxWindow := 24 * time.Hour + windowSizes := make([]time.Duration, 0, 3) + addWindow := func(d time.Duration) { + if d <= 0 { + return + } + if d > maxWindow { + d = maxWindow + } + if len(windowSizes) == 0 || windowSizes[len(windowSizes)-1] != d { + windowSizes = append(windowSizes, d) + } + } + addWindow(baseWindow) + addWindow(baseWindow * 3) + addWindow(baseWindow * 6) + for idx, window := range windowSizes { + begin := startTime.Add(-window) + end := startTime + windowEvents, err := vc.FindVmDeletionEventsForCandidates(ctx, begin, end, candidateIDs) + if err != nil { + log.Warn("failed to fetch vcenter deletion events", "vcenter", url, "error", err, "window_start_local", begin, "window_end_local", end) + continue + } + windowBegin = begin + windowEnd = end + windowUsed = window + for vmID, ts := range windowEvents { + if prev, ok := events[vmID]; !ok || ts.Before(prev) { + events[vmID] = ts } - if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, cand.vmID, cand.vmUUID, cand.name, cand.cluster, t.Unix()); err != nil { - log.Warn("failed to refine lifecycle cache deletion time", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err) + } + if len(events) < len(candidateIDs) && idx < len(windowSizes)-1 { + log.Debug("widening deletion event window", "vcenter", url, "matched", len(events), "candidates", len(candidateIDs), "window_minutes", window.Minutes()) + } + if len(events) >= len(candidateIDs) { + break + } + } + } + if len(events) > 0 { + log.Debug("fetched vcenter deletion events", "vcenter", url, "count", len(events), "window_start_local", windowBegin, "window_end_local", windowEnd, "window_minutes", windowUsed.Minutes(), "window_start_utc", windowBegin.UTC(), "window_end_utc", windowEnd.UTC()) + } + for _, cand := range candidates { + if t, ok := events[cand.vmID]; ok { + delTs := sql.NullInt64{Int64: t.Unix(), Valid: true} + if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ + DeletionTime: delTs, + VmId: sql.NullString{String: cand.vmID, Valid: cand.vmID != ""}, + DatacenterName: cand.datacenter, + }); err != nil { + log.Warn("failed to update inventory deletion time from event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err) + } + if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, cand.vmID, cand.vmUUID, cand.name, cand.cluster, t.Unix()); err != nil { + log.Warn("failed to refine lifecycle cache deletion time", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err) + } + if snapRow, snapTable, found := findVMInHourlySnapshots(ctx, dbConn, url, cand.vmID); found { + vmUUID := cand.vmUUID + if vmUUID == "" && snapRow.VmUuid.Valid { + vmUUID = snapRow.VmUuid.String } - if snapRow, snapTable, found := findVMInHourlySnapshots(ctx, dbConn, url, cand.vmID); found { - vmUUID := cand.vmUUID - if vmUUID == "" && snapRow.VmUuid.Valid { - vmUUID = snapRow.VmUuid.String - } - name := cand.name - if name == "" { - name = snapRow.Name - } - if rowsAffected, err := updateDeletionTimeInSnapshot(ctx, dbConn, snapTable, url, cand.vmID, vmUUID, name, delTs.Int64); err != nil { - log.Warn("failed to update hourly snapshot deletion time from event", "table", snapTable, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "error", err) - } else if rowsAffected > 0 { - reportTables[snapTable] = struct{}{} - deletionsMarked = true - log.Debug("updated hourly snapshot deletion time from event", "table", snapTable, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "event_time", t) - if snapUnix, ok := parseSnapshotTime(snapTable); ok { - if cacheRows, err := updateDeletionTimeInHourlyCache(ctx, dbConn, url, cand.vmID, vmUUID, name, snapUnix, delTs.Int64); err != nil { - log.Warn("failed to update hourly cache deletion time from event", "snapshot_time", snapUnix, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "error", err) - } else if cacheRows > 0 { - log.Debug("updated hourly cache deletion time from event", "snapshot_time", snapUnix, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "event_time", t) - } + name := cand.name + if name == "" { + name = snapRow.Name + } + if rowsAffected, err := updateDeletionTimeInSnapshot(ctx, dbConn, snapTable, url, cand.vmID, vmUUID, name, delTs.Int64); err != nil { + log.Warn("failed to update hourly snapshot deletion time from event", "table", snapTable, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "error", err) + } else if rowsAffected > 0 { + reportTables[snapTable] = struct{}{} + deletionsMarked = true + log.Debug("updated hourly snapshot deletion time from event", "table", snapTable, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "event_time", t) + if snapUnix, ok := parseSnapshotTime(snapTable); ok { + if cacheRows, err := updateDeletionTimeInHourlyCache(ctx, dbConn, url, cand.vmID, vmUUID, name, snapUnix, delTs.Int64); err != nil { + log.Warn("failed to update hourly cache deletion time from event", "snapshot_time", snapUnix, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "error", err) + } else if cacheRows > 0 { + log.Debug("updated hourly cache deletion time from event", "snapshot_time", snapUnix, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "event_time", t) } } } - log.Info("refined deletion time from vcenter event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "vcenter", url, "event_time", t) } + log.Info("refined deletion time from vcenter event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "vcenter", url, "event_time", t) } } } diff --git a/internal/vcenter/vcenter.go b/internal/vcenter/vcenter.go index 9e30d32..650b345 100644 --- a/internal/vcenter/vcenter.go +++ b/internal/vcenter/vcenter.go @@ -199,6 +199,28 @@ func (v *Vcenter) GetAllVMsWithProps() ([]mo.VirtualMachine, error) { // FindVmDeletionEvents returns a map of MoRef (VmId) to the deletion event time within the given window. func (v *Vcenter) FindVmDeletionEvents(ctx context.Context, begin, end time.Time) (map[string]time.Time, error) { + return v.findVmDeletionEvents(ctx, begin, end, nil) +} + +// FindVmDeletionEventsForCandidates returns deletion event times for the provided VM IDs only. +func (v *Vcenter) FindVmDeletionEventsForCandidates(ctx context.Context, begin, end time.Time, candidates []string) (map[string]time.Time, error) { + if len(candidates) == 0 { + return map[string]time.Time{}, nil + } + candidateSet := make(map[string]struct{}, len(candidates)) + for _, id := range candidates { + if id == "" { + continue + } + candidateSet[id] = struct{}{} + } + if len(candidateSet) == 0 { + return map[string]time.Time{}, nil + } + return v.findVmDeletionEvents(ctx, begin, end, candidateSet) +} + +func (v *Vcenter) findVmDeletionEvents(ctx context.Context, begin, end time.Time, candidateSet map[string]struct{}) (map[string]time.Time, error) { result := make(map[string]time.Time) if v.client == nil || !v.client.Valid() { return result, fmt.Errorf("vcenter client is not valid") @@ -208,18 +230,63 @@ func (v *Vcenter) FindVmDeletionEvents(ctx context.Context, begin, end time.Time endUTC := end.UTC() mgr := event.NewManager(v.client.Client) - recordDeletion := func(vmID string, ts time.Time) { + type deletionHit struct { + ts time.Time + priority int + } + const ( + deletionPriorityRemoved = iota + deletionPriorityVmEvent + deletionPriorityTask + ) + hits := make(map[string]deletionHit) + foundCandidates := 0 + recordDeletion := func(vmID string, ts time.Time, priority int) { if vmID == "" { return } - if prev, ok := result[vmID]; !ok || ts.Before(prev) { - result[vmID] = ts + if candidateSet != nil { + if _, ok := candidateSet[vmID]; !ok { + return + } + } + if prev, ok := hits[vmID]; !ok { + hits[vmID] = deletionHit{ts: ts, priority: priority} + if candidateSet != nil { + foundCandidates++ + } + } else if priority < prev.priority || (priority == prev.priority && ts.Before(prev.ts)) { + hits[vmID] = deletionHit{ts: ts, priority: priority} } } isDeletionMessage := func(msg string) bool { msg = strings.ToLower(msg) - return strings.Contains(msg, "destroy") || strings.Contains(msg, "deleted") + return strings.Contains(msg, "destroy") || + strings.Contains(msg, "deleted") || + strings.Contains(msg, "unregister") || + strings.Contains(msg, "removed from inventory") + } + + isVmDeletionTask := func(info types.TaskInfo, msg string) bool { + id := strings.ToLower(strings.TrimSpace(info.DescriptionId)) + if id != "" { + if strings.Contains(id, "virtualmachine") && + (strings.Contains(id, "destroy") || strings.Contains(id, "delete") || strings.Contains(id, "unregister")) { + return true + } + } + name := strings.ToLower(strings.TrimSpace(info.Name)) + if name != "" { + if (strings.Contains(name, "destroy") || strings.Contains(name, "delete") || strings.Contains(name, "unregister")) && + (strings.Contains(name, "virtualmachine") || strings.Contains(name, "virtual machine")) { + return true + } + } + if msg != "" && isDeletionMessage(msg) { + return true + } + return false } processEvents := func(evts []types.BaseEvent) { @@ -228,33 +295,67 @@ func (v *Vcenter) FindVmDeletionEvents(ctx context.Context, begin, end time.Time case *types.VmRemovedEvent: if e.Vm != nil { vmID := e.Vm.Vm.Value - recordDeletion(vmID, e.CreatedTime) + recordDeletion(vmID, e.CreatedTime, deletionPriorityRemoved) } case *types.TaskEvent: // Fallback for destroy task events. if e.Info.Entity != nil { vmID := e.Info.Entity.Value - if vmID != "" && isDeletionMessage(e.GetEvent().FullFormattedMessage) { - recordDeletion(vmID, e.CreatedTime) + if vmID != "" && isVmDeletionTask(e.Info, e.GetEvent().FullFormattedMessage) { + recordDeletion(vmID, e.CreatedTime, deletionPriorityTask) } } case *types.VmEvent: if e.Vm != nil { vmID := e.Vm.Vm.Value if vmID != "" && isDeletionMessage(e.GetEvent().FullFormattedMessage) { - recordDeletion(vmID, e.CreatedTime) + recordDeletion(vmID, e.CreatedTime, deletionPriorityVmEvent) } } } } } + const ( + eventPageSize = int32(1000) + maxEventPages = 25 + ) + readCollector := func(label string, collector *event.HistoryCollector) error { + pageCount := 0 + for { + events, err := collector.ReadNextEvents(ctx, eventPageSize) + if err != nil { + return err + } + if len(events) == 0 { + break + } + processEvents(events) + if candidateSet != nil && foundCandidates >= len(candidateSet) { + break + } + pageCount++ + if pageCount >= maxEventPages { + if v.Logger != nil { + v.Logger.Warn("vcenter deletion events truncated", "vcenter", v.Vurl, "label", label, "pages", pageCount, "page_size", eventPageSize, "window_start_utc", beginUTC, "window_end_utc", endUTC) + } + break + } + if len(events) < int(eventPageSize) { + break + } + } + return nil + } + // First attempt: specific deletion event types. + disableFullMessage := false filter := types.EventFilterSpec{ Time: &types.EventFilterSpecByTime{ BeginTime: &beginUTC, EndTime: &endUTC, }, + DisableFullMessage: &disableFullMessage, EventTypeId: []string{ "VmRemovedEvent", "TaskEvent", @@ -266,29 +367,32 @@ func (v *Vcenter) FindVmDeletionEvents(ctx context.Context, begin, end time.Time } defer collector.Destroy(ctx) - events, err := collector.ReadNextEvents(ctx, 500) - if err != nil { + if err := readCollector("primary", collector); err != nil { return result, fmt.Errorf("failed to read events: %w", err) } - processEvents(events) // If nothing found, widen the filter to all event types in the window as a fallback. - if len(result) == 0 { + if len(hits) == 0 { fallbackFilter := types.EventFilterSpec{ Time: &types.EventFilterSpecByTime{ BeginTime: &beginUTC, EndTime: &endUTC, }, + DisableFullMessage: &disableFullMessage, } fc, err := mgr.CreateCollectorForEvents(ctx, fallbackFilter) if err == nil { defer fc.Destroy(ctx) - if evs, readErr := fc.ReadNextEvents(ctx, 500); readErr == nil { - processEvents(evs) + if readErr := readCollector("fallback", fc); readErr != nil && v.Logger != nil { + v.Logger.Warn("vcenter fallback event read failed", "vcenter", v.Vurl, "error", readErr) } } } + for vmID, hit := range hits { + result[vmID] = hit.ts + } + return result, nil }