improve vm deletion detection
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
@@ -1062,52 +1062,105 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
|
|||||||
if freq <= 0 {
|
if freq <= 0 {
|
||||||
freq = time.Hour
|
freq = time.Hour
|
||||||
}
|
}
|
||||||
begin := startTime.Add(-4 * freq)
|
candidateIDs := make([]string, 0, len(candidates))
|
||||||
end := startTime
|
candidateSet := make(map[string]struct{}, len(candidates))
|
||||||
events, err := vc.FindVmDeletionEvents(ctx, begin, end)
|
for _, cand := range candidates {
|
||||||
if err != nil {
|
if cand.vmID == "" {
|
||||||
log.Warn("failed to fetch vcenter deletion events", "vcenter", url, "error", err)
|
continue
|
||||||
} else {
|
}
|
||||||
log.Debug("fetched vcenter deletion events", "vcenter", url, "count", len(events), "window_start_local", begin, "window_end_local", end, "window_minutes", end.Sub(begin).Minutes(), "window_start_utc", begin.UTC(), "window_end_utc", end.UTC())
|
if _, ok := candidateSet[cand.vmID]; ok {
|
||||||
for _, cand := range candidates {
|
continue
|
||||||
if t, ok := events[cand.vmID]; ok {
|
}
|
||||||
delTs := sql.NullInt64{Int64: t.Unix(), Valid: true}
|
candidateSet[cand.vmID] = struct{}{}
|
||||||
if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{
|
candidateIDs = append(candidateIDs, cand.vmID)
|
||||||
DeletionTime: delTs,
|
}
|
||||||
VmId: sql.NullString{String: cand.vmID, Valid: cand.vmID != ""},
|
events := make(map[string]time.Time)
|
||||||
DatacenterName: cand.datacenter,
|
var windowBegin time.Time
|
||||||
}); err != nil {
|
var windowEnd time.Time
|
||||||
log.Warn("failed to update inventory deletion time from event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err)
|
var windowUsed time.Duration
|
||||||
|
if len(candidateIDs) > 0 {
|
||||||
|
baseWindow := 4 * freq
|
||||||
|
maxWindow := 24 * time.Hour
|
||||||
|
windowSizes := make([]time.Duration, 0, 3)
|
||||||
|
addWindow := func(d time.Duration) {
|
||||||
|
if d <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if d > maxWindow {
|
||||||
|
d = maxWindow
|
||||||
|
}
|
||||||
|
if len(windowSizes) == 0 || windowSizes[len(windowSizes)-1] != d {
|
||||||
|
windowSizes = append(windowSizes, d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
addWindow(baseWindow)
|
||||||
|
addWindow(baseWindow * 3)
|
||||||
|
addWindow(baseWindow * 6)
|
||||||
|
for idx, window := range windowSizes {
|
||||||
|
begin := startTime.Add(-window)
|
||||||
|
end := startTime
|
||||||
|
windowEvents, err := vc.FindVmDeletionEventsForCandidates(ctx, begin, end, candidateIDs)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("failed to fetch vcenter deletion events", "vcenter", url, "error", err, "window_start_local", begin, "window_end_local", end)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
windowBegin = begin
|
||||||
|
windowEnd = end
|
||||||
|
windowUsed = window
|
||||||
|
for vmID, ts := range windowEvents {
|
||||||
|
if prev, ok := events[vmID]; !ok || ts.Before(prev) {
|
||||||
|
events[vmID] = ts
|
||||||
}
|
}
|
||||||
if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, cand.vmID, cand.vmUUID, cand.name, cand.cluster, t.Unix()); err != nil {
|
}
|
||||||
log.Warn("failed to refine lifecycle cache deletion time", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err)
|
if len(events) < len(candidateIDs) && idx < len(windowSizes)-1 {
|
||||||
|
log.Debug("widening deletion event window", "vcenter", url, "matched", len(events), "candidates", len(candidateIDs), "window_minutes", window.Minutes())
|
||||||
|
}
|
||||||
|
if len(events) >= len(candidateIDs) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(events) > 0 {
|
||||||
|
log.Debug("fetched vcenter deletion events", "vcenter", url, "count", len(events), "window_start_local", windowBegin, "window_end_local", windowEnd, "window_minutes", windowUsed.Minutes(), "window_start_utc", windowBegin.UTC(), "window_end_utc", windowEnd.UTC())
|
||||||
|
}
|
||||||
|
for _, cand := range candidates {
|
||||||
|
if t, ok := events[cand.vmID]; ok {
|
||||||
|
delTs := sql.NullInt64{Int64: t.Unix(), Valid: true}
|
||||||
|
if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{
|
||||||
|
DeletionTime: delTs,
|
||||||
|
VmId: sql.NullString{String: cand.vmID, Valid: cand.vmID != ""},
|
||||||
|
DatacenterName: cand.datacenter,
|
||||||
|
}); err != nil {
|
||||||
|
log.Warn("failed to update inventory deletion time from event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err)
|
||||||
|
}
|
||||||
|
if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, cand.vmID, cand.vmUUID, cand.name, cand.cluster, t.Unix()); err != nil {
|
||||||
|
log.Warn("failed to refine lifecycle cache deletion time", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err)
|
||||||
|
}
|
||||||
|
if snapRow, snapTable, found := findVMInHourlySnapshots(ctx, dbConn, url, cand.vmID); found {
|
||||||
|
vmUUID := cand.vmUUID
|
||||||
|
if vmUUID == "" && snapRow.VmUuid.Valid {
|
||||||
|
vmUUID = snapRow.VmUuid.String
|
||||||
}
|
}
|
||||||
if snapRow, snapTable, found := findVMInHourlySnapshots(ctx, dbConn, url, cand.vmID); found {
|
name := cand.name
|
||||||
vmUUID := cand.vmUUID
|
if name == "" {
|
||||||
if vmUUID == "" && snapRow.VmUuid.Valid {
|
name = snapRow.Name
|
||||||
vmUUID = snapRow.VmUuid.String
|
}
|
||||||
}
|
if rowsAffected, err := updateDeletionTimeInSnapshot(ctx, dbConn, snapTable, url, cand.vmID, vmUUID, name, delTs.Int64); err != nil {
|
||||||
name := cand.name
|
log.Warn("failed to update hourly snapshot deletion time from event", "table", snapTable, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "error", err)
|
||||||
if name == "" {
|
} else if rowsAffected > 0 {
|
||||||
name = snapRow.Name
|
reportTables[snapTable] = struct{}{}
|
||||||
}
|
deletionsMarked = true
|
||||||
if rowsAffected, err := updateDeletionTimeInSnapshot(ctx, dbConn, snapTable, url, cand.vmID, vmUUID, name, delTs.Int64); err != nil {
|
log.Debug("updated hourly snapshot deletion time from event", "table", snapTable, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "event_time", t)
|
||||||
log.Warn("failed to update hourly snapshot deletion time from event", "table", snapTable, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "error", err)
|
if snapUnix, ok := parseSnapshotTime(snapTable); ok {
|
||||||
} else if rowsAffected > 0 {
|
if cacheRows, err := updateDeletionTimeInHourlyCache(ctx, dbConn, url, cand.vmID, vmUUID, name, snapUnix, delTs.Int64); err != nil {
|
||||||
reportTables[snapTable] = struct{}{}
|
log.Warn("failed to update hourly cache deletion time from event", "snapshot_time", snapUnix, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "error", err)
|
||||||
deletionsMarked = true
|
} else if cacheRows > 0 {
|
||||||
log.Debug("updated hourly snapshot deletion time from event", "table", snapTable, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "event_time", t)
|
log.Debug("updated hourly cache deletion time from event", "snapshot_time", snapUnix, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "event_time", t)
|
||||||
if snapUnix, ok := parseSnapshotTime(snapTable); ok {
|
|
||||||
if cacheRows, err := updateDeletionTimeInHourlyCache(ctx, dbConn, url, cand.vmID, vmUUID, name, snapUnix, delTs.Int64); err != nil {
|
|
||||||
log.Warn("failed to update hourly cache deletion time from event", "snapshot_time", snapUnix, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "error", err)
|
|
||||||
} else if cacheRows > 0 {
|
|
||||||
log.Debug("updated hourly cache deletion time from event", "snapshot_time", snapUnix, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "event_time", t)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Info("refined deletion time from vcenter event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "vcenter", url, "event_time", t)
|
|
||||||
}
|
}
|
||||||
|
log.Info("refined deletion time from vcenter event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "vcenter", url, "event_time", t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -199,6 +199,28 @@ func (v *Vcenter) GetAllVMsWithProps() ([]mo.VirtualMachine, error) {
|
|||||||
|
|
||||||
// FindVmDeletionEvents returns a map of MoRef (VmId) to the deletion event time within the given window.
|
// FindVmDeletionEvents returns a map of MoRef (VmId) to the deletion event time within the given window.
|
||||||
func (v *Vcenter) FindVmDeletionEvents(ctx context.Context, begin, end time.Time) (map[string]time.Time, error) {
|
func (v *Vcenter) FindVmDeletionEvents(ctx context.Context, begin, end time.Time) (map[string]time.Time, error) {
|
||||||
|
return v.findVmDeletionEvents(ctx, begin, end, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindVmDeletionEventsForCandidates returns deletion event times for the provided VM IDs only.
|
||||||
|
func (v *Vcenter) FindVmDeletionEventsForCandidates(ctx context.Context, begin, end time.Time, candidates []string) (map[string]time.Time, error) {
|
||||||
|
if len(candidates) == 0 {
|
||||||
|
return map[string]time.Time{}, nil
|
||||||
|
}
|
||||||
|
candidateSet := make(map[string]struct{}, len(candidates))
|
||||||
|
for _, id := range candidates {
|
||||||
|
if id == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
candidateSet[id] = struct{}{}
|
||||||
|
}
|
||||||
|
if len(candidateSet) == 0 {
|
||||||
|
return map[string]time.Time{}, nil
|
||||||
|
}
|
||||||
|
return v.findVmDeletionEvents(ctx, begin, end, candidateSet)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *Vcenter) findVmDeletionEvents(ctx context.Context, begin, end time.Time, candidateSet map[string]struct{}) (map[string]time.Time, error) {
|
||||||
result := make(map[string]time.Time)
|
result := make(map[string]time.Time)
|
||||||
if v.client == nil || !v.client.Valid() {
|
if v.client == nil || !v.client.Valid() {
|
||||||
return result, fmt.Errorf("vcenter client is not valid")
|
return result, fmt.Errorf("vcenter client is not valid")
|
||||||
@@ -208,18 +230,63 @@ func (v *Vcenter) FindVmDeletionEvents(ctx context.Context, begin, end time.Time
|
|||||||
endUTC := end.UTC()
|
endUTC := end.UTC()
|
||||||
mgr := event.NewManager(v.client.Client)
|
mgr := event.NewManager(v.client.Client)
|
||||||
|
|
||||||
recordDeletion := func(vmID string, ts time.Time) {
|
type deletionHit struct {
|
||||||
|
ts time.Time
|
||||||
|
priority int
|
||||||
|
}
|
||||||
|
const (
|
||||||
|
deletionPriorityRemoved = iota
|
||||||
|
deletionPriorityVmEvent
|
||||||
|
deletionPriorityTask
|
||||||
|
)
|
||||||
|
hits := make(map[string]deletionHit)
|
||||||
|
foundCandidates := 0
|
||||||
|
recordDeletion := func(vmID string, ts time.Time, priority int) {
|
||||||
if vmID == "" {
|
if vmID == "" {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if prev, ok := result[vmID]; !ok || ts.Before(prev) {
|
if candidateSet != nil {
|
||||||
result[vmID] = ts
|
if _, ok := candidateSet[vmID]; !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if prev, ok := hits[vmID]; !ok {
|
||||||
|
hits[vmID] = deletionHit{ts: ts, priority: priority}
|
||||||
|
if candidateSet != nil {
|
||||||
|
foundCandidates++
|
||||||
|
}
|
||||||
|
} else if priority < prev.priority || (priority == prev.priority && ts.Before(prev.ts)) {
|
||||||
|
hits[vmID] = deletionHit{ts: ts, priority: priority}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
isDeletionMessage := func(msg string) bool {
|
isDeletionMessage := func(msg string) bool {
|
||||||
msg = strings.ToLower(msg)
|
msg = strings.ToLower(msg)
|
||||||
return strings.Contains(msg, "destroy") || strings.Contains(msg, "deleted")
|
return strings.Contains(msg, "destroy") ||
|
||||||
|
strings.Contains(msg, "deleted") ||
|
||||||
|
strings.Contains(msg, "unregister") ||
|
||||||
|
strings.Contains(msg, "removed from inventory")
|
||||||
|
}
|
||||||
|
|
||||||
|
isVmDeletionTask := func(info types.TaskInfo, msg string) bool {
|
||||||
|
id := strings.ToLower(strings.TrimSpace(info.DescriptionId))
|
||||||
|
if id != "" {
|
||||||
|
if strings.Contains(id, "virtualmachine") &&
|
||||||
|
(strings.Contains(id, "destroy") || strings.Contains(id, "delete") || strings.Contains(id, "unregister")) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
name := strings.ToLower(strings.TrimSpace(info.Name))
|
||||||
|
if name != "" {
|
||||||
|
if (strings.Contains(name, "destroy") || strings.Contains(name, "delete") || strings.Contains(name, "unregister")) &&
|
||||||
|
(strings.Contains(name, "virtualmachine") || strings.Contains(name, "virtual machine")) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if msg != "" && isDeletionMessage(msg) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
processEvents := func(evts []types.BaseEvent) {
|
processEvents := func(evts []types.BaseEvent) {
|
||||||
@@ -228,33 +295,67 @@ func (v *Vcenter) FindVmDeletionEvents(ctx context.Context, begin, end time.Time
|
|||||||
case *types.VmRemovedEvent:
|
case *types.VmRemovedEvent:
|
||||||
if e.Vm != nil {
|
if e.Vm != nil {
|
||||||
vmID := e.Vm.Vm.Value
|
vmID := e.Vm.Vm.Value
|
||||||
recordDeletion(vmID, e.CreatedTime)
|
recordDeletion(vmID, e.CreatedTime, deletionPriorityRemoved)
|
||||||
}
|
}
|
||||||
case *types.TaskEvent:
|
case *types.TaskEvent:
|
||||||
// Fallback for destroy task events.
|
// Fallback for destroy task events.
|
||||||
if e.Info.Entity != nil {
|
if e.Info.Entity != nil {
|
||||||
vmID := e.Info.Entity.Value
|
vmID := e.Info.Entity.Value
|
||||||
if vmID != "" && isDeletionMessage(e.GetEvent().FullFormattedMessage) {
|
if vmID != "" && isVmDeletionTask(e.Info, e.GetEvent().FullFormattedMessage) {
|
||||||
recordDeletion(vmID, e.CreatedTime)
|
recordDeletion(vmID, e.CreatedTime, deletionPriorityTask)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case *types.VmEvent:
|
case *types.VmEvent:
|
||||||
if e.Vm != nil {
|
if e.Vm != nil {
|
||||||
vmID := e.Vm.Vm.Value
|
vmID := e.Vm.Vm.Value
|
||||||
if vmID != "" && isDeletionMessage(e.GetEvent().FullFormattedMessage) {
|
if vmID != "" && isDeletionMessage(e.GetEvent().FullFormattedMessage) {
|
||||||
recordDeletion(vmID, e.CreatedTime)
|
recordDeletion(vmID, e.CreatedTime, deletionPriorityVmEvent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
eventPageSize = int32(1000)
|
||||||
|
maxEventPages = 25
|
||||||
|
)
|
||||||
|
readCollector := func(label string, collector *event.HistoryCollector) error {
|
||||||
|
pageCount := 0
|
||||||
|
for {
|
||||||
|
events, err := collector.ReadNextEvents(ctx, eventPageSize)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(events) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
processEvents(events)
|
||||||
|
if candidateSet != nil && foundCandidates >= len(candidateSet) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
pageCount++
|
||||||
|
if pageCount >= maxEventPages {
|
||||||
|
if v.Logger != nil {
|
||||||
|
v.Logger.Warn("vcenter deletion events truncated", "vcenter", v.Vurl, "label", label, "pages", pageCount, "page_size", eventPageSize, "window_start_utc", beginUTC, "window_end_utc", endUTC)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if len(events) < int(eventPageSize) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// First attempt: specific deletion event types.
|
// First attempt: specific deletion event types.
|
||||||
|
disableFullMessage := false
|
||||||
filter := types.EventFilterSpec{
|
filter := types.EventFilterSpec{
|
||||||
Time: &types.EventFilterSpecByTime{
|
Time: &types.EventFilterSpecByTime{
|
||||||
BeginTime: &beginUTC,
|
BeginTime: &beginUTC,
|
||||||
EndTime: &endUTC,
|
EndTime: &endUTC,
|
||||||
},
|
},
|
||||||
|
DisableFullMessage: &disableFullMessage,
|
||||||
EventTypeId: []string{
|
EventTypeId: []string{
|
||||||
"VmRemovedEvent",
|
"VmRemovedEvent",
|
||||||
"TaskEvent",
|
"TaskEvent",
|
||||||
@@ -266,29 +367,32 @@ func (v *Vcenter) FindVmDeletionEvents(ctx context.Context, begin, end time.Time
|
|||||||
}
|
}
|
||||||
defer collector.Destroy(ctx)
|
defer collector.Destroy(ctx)
|
||||||
|
|
||||||
events, err := collector.ReadNextEvents(ctx, 500)
|
if err := readCollector("primary", collector); err != nil {
|
||||||
if err != nil {
|
|
||||||
return result, fmt.Errorf("failed to read events: %w", err)
|
return result, fmt.Errorf("failed to read events: %w", err)
|
||||||
}
|
}
|
||||||
processEvents(events)
|
|
||||||
|
|
||||||
// If nothing found, widen the filter to all event types in the window as a fallback.
|
// If nothing found, widen the filter to all event types in the window as a fallback.
|
||||||
if len(result) == 0 {
|
if len(hits) == 0 {
|
||||||
fallbackFilter := types.EventFilterSpec{
|
fallbackFilter := types.EventFilterSpec{
|
||||||
Time: &types.EventFilterSpecByTime{
|
Time: &types.EventFilterSpecByTime{
|
||||||
BeginTime: &beginUTC,
|
BeginTime: &beginUTC,
|
||||||
EndTime: &endUTC,
|
EndTime: &endUTC,
|
||||||
},
|
},
|
||||||
|
DisableFullMessage: &disableFullMessage,
|
||||||
}
|
}
|
||||||
fc, err := mgr.CreateCollectorForEvents(ctx, fallbackFilter)
|
fc, err := mgr.CreateCollectorForEvents(ctx, fallbackFilter)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
defer fc.Destroy(ctx)
|
defer fc.Destroy(ctx)
|
||||||
if evs, readErr := fc.ReadNextEvents(ctx, 500); readErr == nil {
|
if readErr := readCollector("fallback", fc); readErr != nil && v.Logger != nil {
|
||||||
processEvents(evs)
|
v.Logger.Warn("vcenter fallback event read failed", "vcenter", v.Vurl, "error", readErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for vmID, hit := range hits {
|
||||||
|
result[vmID] = hit.ts
|
||||||
|
}
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user