From 1ecdb10cf7c28b74003c10556095129499920b11 Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Mon, 14 Oct 2024 12:02:31 +1100 Subject: [PATCH] fix vcenter update task to avoid re-adding update records for previous updates --- db/queries/query.sql | 4 ++ db/queries/query.sql.go | 48 +++++++++++++++++ internal/tasks/monitorVcenter.go | 89 +++++++++++++++++++++++++------- main.go | 6 +-- 4 files changed, 126 insertions(+), 21 deletions(-) diff --git a/db/queries/query.sql b/db/queries/query.sql index 9d941ea..13475eb 100644 --- a/db/queries/query.sql +++ b/db/queries/query.sql @@ -75,6 +75,10 @@ RETURNING *; SELECT * FROM "Updates" ORDER BY "UpdateTime"; +-- name: GetVmUpdates :many +SELECT * FROM "Updates" +WHERE "UpdateType" = sqlc.arg('updateType') AND "InventoryId" = sqlc.arg('InventoryId'); + -- name: CleanupUpdates :exec DELETE FROM "Updates" WHERE "UpdateType" = sqlc.arg('updateType') AND "UpdateTime" <= sqlc.arg('updateTime') diff --git a/db/queries/query.sql.go b/db/queries/query.sql.go index 462f485..3d59c2b 100644 --- a/db/queries/query.sql.go +++ b/db/queries/query.sql.go @@ -628,6 +628,54 @@ func (q *Queries) GetReportUpdates(ctx context.Context) ([]Updates, error) { return items, nil } +const getVmUpdates = `-- name: GetVmUpdates :many +SELECT Uid, InventoryId, UpdateTime, UpdateType, NewVcpus, NewRam, NewResourcePool, EventKey, EventId, NewProvisionedDisk, UserName, PlaceholderChange, Name, RawChangeString FROM "Updates" +WHERE "UpdateType" = ?1 AND "InventoryId" = ?2 +` + +type GetVmUpdatesParams struct { + UpdateType string + InventoryId sql.NullInt64 +} + +func (q *Queries) GetVmUpdates(ctx context.Context, arg GetVmUpdatesParams) ([]Updates, error) { + rows, err := q.db.QueryContext(ctx, getVmUpdates, arg.UpdateType, arg.InventoryId) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Updates + for rows.Next() { + var i Updates + if err := rows.Scan( + &i.Uid, + &i.InventoryId, + &i.UpdateTime, + &i.UpdateType, + &i.NewVcpus, + &i.NewRam, + &i.NewResourcePool, + &i.EventKey, + &i.EventId, + &i.NewProvisionedDisk, + &i.UserName, + &i.PlaceholderChange, + &i.Name, + &i.RawChangeString, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const inventoryCleanup = `-- name: InventoryCleanup :exec DELETE FROM "Inventory" WHERE "VmId" = ?1 AND "Datacenter" = ?2 diff --git a/internal/tasks/monitorVcenter.go b/internal/tasks/monitorVcenter.go index e7841ba..6bf115d 100644 --- a/internal/tasks/monitorVcenter.go +++ b/internal/tasks/monitorVcenter.go @@ -115,18 +115,23 @@ func (c *CronTask) RunVcenterPoll(ctx context.Context, logger *slog.Logger) erro // UpdateVmInventory will compare database against current vcenter values, and create update record if not matching func (c *CronTask) UpdateVmInventory(vmObj *mo.VirtualMachine, vc *vcenter.Vcenter, ctx context.Context, dbVm queries.Inventory) error { var ( - err error - numVcpus int32 - numRam int32 - srmPlaceholder string - updateType string - rpName string + err error + numVcpus int32 + numRam int32 + srmPlaceholder string + updateType string + rpName string + existingUpdateFound bool ) + + // TODO - how to prevent creating a new record every polling cycle? + params := queries.CreateUpdateParams{ InventoryId: sql.NullInt64{Int64: dbVm.Iid, Valid: dbVm.Iid > 0}, } - srmPlaceholder = "FALSE" // default value - updateType = "unknown" // default value + srmPlaceholder = "FALSE" // default value + updateType = "unknown" // default value + existingUpdateFound = false // default value numRam = vmObj.Config.Hardware.MemoryMB numVcpus = vmObj.Config.Hardware.NumCPU @@ -143,7 +148,7 @@ func (c *CronTask) UpdateVmInventory(vmObj *mo.VirtualMachine, vc *vcenter.Vcent // Determine if the VM is a normal VM or an SRM placeholder if vmObj.Config.ManagedBy != nil && vmObj.Config.ManagedBy.ExtensionKey == "com.vmware.vcDr" { if vmObj.Config.ManagedBy.Type == "placeholderVm" { - //c.Logger.Debug("VM is a placeholder") + c.Logger.Debug("VM is a placeholder") srmPlaceholder = "TRUE" } else { //c.Logger.Debug("VM is managed by SRM but not a placeholder", "details", vmObj.Config.ManagedBy) @@ -151,6 +156,7 @@ func (c *CronTask) UpdateVmInventory(vmObj *mo.VirtualMachine, vc *vcenter.Vcent } if srmPlaceholder != dbVm.SrmPlaceholder { + c.Logger.Debug("VM has changed placeholder type", "db_value", dbVm.SrmPlaceholder, "current_Value", srmPlaceholder) params.PlaceholderChange = sql.NullString{String: srmPlaceholder, Valid: srmPlaceholder != ""} if updateType == "unknown" { updateType = "srm" @@ -162,6 +168,7 @@ func (c *CronTask) UpdateVmInventory(vmObj *mo.VirtualMachine, vc *vcenter.Vcent c.Logger.Error("Unable to determine resource pool name", "error", err) } if rpName != dbVm.ResourcePool.String { + c.Logger.Debug("VM has changed resource pool", "db_value", dbVm.ResourcePool.String, "current_Value", rpName) params.NewResourcePool = sql.NullString{String: rpName, Valid: rpName != ""} if updateType == "unknown" { updateType = "move" @@ -171,18 +178,64 @@ func (c *CronTask) UpdateVmInventory(vmObj *mo.VirtualMachine, vc *vcenter.Vcent // TODO - should we bother to check if disk space has changed? if updateType != "unknown" { - c.Logger.Debug("Detected change in VM, inserting update record into database", "update_type", updateType) - params.UpdateType = updateType - result, err := c.Database.Queries().CreateUpdate(ctx, params) - if err != nil { - c.Logger.Error("Failed creating database record", "error", err) - return err + // TODO query updates table to see if there is already an update of this type and the new value + + checkParams := queries.GetVmUpdatesParams{ + InventoryId: sql.NullInt64{Int64: dbVm.Iid, Valid: dbVm.Iid > 0}, + UpdateType: updateType, + } + + existingUpdates, err := c.Database.Queries().GetVmUpdates(ctx, checkParams) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + c.Logger.Debug("No update records found") + } else { + c.Logger.Error("Unbale to query database for vm update records", "error", err) + return err + } + } + + for _, u := range existingUpdates { + // check if we already recorded this same update + if u.UpdateType == updateType { + switch u.UpdateType { + case "srm": + if u.PlaceholderChange.String == srmPlaceholder { + c.Logger.Debug("SRM update already exists for vm", "update_value", u.PlaceholderChange.String, "inventory_id", u.InventoryId.Int64, "vm_name", u.Name.String) + existingUpdateFound = true + } + case "move": + if u.NewResourcePool.String == rpName { + c.Logger.Debug("Resource pool update already exists for vm", "update_value", u.NewResourcePool.String, "inventory_id", u.InventoryId.Int64, "vm_name", u.Name.String) + existingUpdateFound = true + } + case "reconfigure": + if u.NewRam.Int64 == int64(numRam) || u.NewVcpus.Int64 == int64(numVcpus) { + c.Logger.Debug("RAM/vCPU update already exists for vm", "update_ram", u.NewRam.Int64, "update_vcpu", u.NewVcpus.Int64, "inventory_id", u.InventoryId.Int64, "vm_name", u.Name.String) + existingUpdateFound = true + } + } + } + } + + if !existingUpdateFound { + params.UpdateType = updateType + updateTime := time.Now().Unix() + params.UpdateTime = sql.NullInt64{Int64: updateTime, Valid: updateTime > 0} + c.Logger.Debug("Detected new change in VM, inserting update record into database", "update_type", updateType, "params", params) + + result, err := c.Database.Queries().CreateUpdate(ctx, params) + if err != nil { + c.Logger.Error("Failed creating database record", "error", err) + return err + } + + c.Logger.Debug("created database record", "insert_result", result) + // add sleep to slow down mass VM additions + utils.SleepWithContext(ctx, (10 * time.Millisecond)) } - c.Logger.Debug("created database record", "insert_result", result) - // add sleep to slow down mass VM additions - utils.SleepWithContext(ctx, (10 * time.Millisecond)) } return nil diff --git a/main.go b/main.go index 474e543..94be0e9 100644 --- a/main.go +++ b/main.go @@ -188,10 +188,10 @@ func main() { logger.Error("failed to start event processing cron job", "error", err) os.Exit(1) } - logger.Debug("Created event processing cron job", "job", job.ID()) + logger.Debug("Created event processing cron job", "job", job.ID(), "starting_at", startsAt) // start background checks of vcenter inventory - startsAt2 := time.Now().Add(time.Second * 300) + startsAt2 := time.Now().Add(cronInvFrequency) job2, err := c.NewJob( gocron.DurationJob(cronInvFrequency), gocron.NewTask(func() { @@ -203,7 +203,7 @@ func main() { logger.Error("failed to start vcenter inventory cron job", "error", err) os.Exit(1) } - logger.Debug("Created vcenter inventory cron job", "job", job2.ID()) + logger.Debug("Created vcenter inventory cron job", "job", job2.ID(), "starting_at", startsAt2) // start cron scheduler c.Start()