diff --git a/db/helpers.go b/db/helpers.go index 741ce2f..71d59ef 100644 --- a/db/helpers.go +++ b/db/helpers.go @@ -468,6 +468,7 @@ ON CONFLICT ("Vcenter","VmId","VmUuid") DO UPDATE SET INSERT OR REPLACE INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","Name","Cluster","FirstSeen","LastSeen") VALUES (?,?,?,?,?,?,?) ` + args = []interface{}{vcenter, vmID, vmUUID, name, cluster, seen.Unix(), seen.Unix()} } _, err := dbConn.ExecContext(ctx, query, args...) return err diff --git a/internal/tasks/cronstatus.go b/internal/tasks/cronstatus.go index cbd0c6f..098b9f2 100644 --- a/internal/tasks/cronstatus.go +++ b/internal/tasks/cronstatus.go @@ -2,6 +2,7 @@ package tasks import ( "context" + "strings" "time" "vctp/db" @@ -30,6 +31,39 @@ func (c *CronTracker) ClearAllInProgress(ctx context.Context) error { return err } +// ClearStale resets in_progress for a specific job if it has been running longer than maxAge. +func (c *CronTracker) ClearStale(ctx context.Context, job string, maxAge time.Duration) error { + if err := c.ensureTable(ctx); err != nil { + return err + } + driver := strings.ToLower(c.db.DB().DriverName()) + var query string + switch driver { + case "sqlite": + query = ` +UPDATE cron_status +SET in_progress = FALSE +WHERE job_name = ? + AND in_progress = TRUE + AND started_at > 0 + AND (strftime('%s','now') - started_at) > ? +` + case "pgx", "postgres": + query = ` +UPDATE cron_status +SET in_progress = FALSE +WHERE job_name = $1 + AND in_progress = TRUE + AND started_at > 0 + AND (EXTRACT(EPOCH FROM now())::BIGINT - started_at) > $2 +` + default: + return nil + } + _, err := c.db.DB().ExecContext(ctx, query, job, int64(maxAge.Seconds())) + return err +} + func (c *CronTracker) ensureTable(ctx context.Context) error { conn := c.db.DB() driver := conn.DriverName() diff --git a/internal/tasks/inventorySnapshots.go b/internal/tasks/inventorySnapshots.go index 046ea8e..0a32e0f 100644 --- a/internal/tasks/inventorySnapshots.go +++ b/internal/tasks/inventorySnapshots.go @@ -57,8 +57,10 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo defer cancel() } tracker := NewCronTracker(c.Database) - // Clear any stale in-progress markers (e.g., after a crash) before attempting the run. - if err := tracker.ClearAllInProgress(jobCtx); err != nil { + // Clear stale marker for this job only (short timeout to avoid blocking). + staleCtx, cancelStale := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelStale() + if err := tracker.ClearStale(staleCtx, "hourly_snapshot", jobTimeout); err != nil { logger.Warn("failed to clear stale cron status", "error", err) } @@ -715,17 +717,12 @@ func insertHourlyCache(ctx context.Context, dbConn *sqlx.DB, rows []inventorySna return err } driver := strings.ToLower(dbConn.DriverName()) - insert := ` -INSERT INTO vm_hourly_stats ( - "SnapshotTime","Vcenter","VmId","VmUuid","Name","CreationTime","DeletionTime","ResourcePool", - "Datacenter","Cluster","Folder","ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder" -) VALUES (:SnapshotTime,:Vcenter,:VmId,:VmUuid,:Name,:CreationTime,:DeletionTime,:ResourcePool, - :Datacenter,:Cluster,:Folder,:ProvisionedDisk,:VcpuCount,:RamGB,:IsTemplate,:PoweredOn,:SrmPlaceholder) -` + conflict := "" + verb := "INSERT INTO" if driver == "sqlite" { - insert = strings.Replace(insert, "INSERT INTO", "INSERT OR REPLACE INTO", 1) + verb = "INSERT OR REPLACE INTO" } else { - insert += ` ON CONFLICT ("Vcenter","VmId","SnapshotTime") DO UPDATE SET + conflict = ` ON CONFLICT ("Vcenter","VmId","SnapshotTime") DO UPDATE SET "VmUuid"=EXCLUDED."VmUuid", "Name"=EXCLUDED."Name", "CreationTime"=EXCLUDED."CreationTime", @@ -741,11 +738,21 @@ INSERT INTO vm_hourly_stats ( "PoweredOn"=EXCLUDED."PoweredOn", "SrmPlaceholder"=EXCLUDED."SrmPlaceholder"` } + + cols := []string{ + "SnapshotTime", "Vcenter", "VmId", "VmUuid", "Name", "CreationTime", "DeletionTime", "ResourcePool", + "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount", "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", + } + bind := sqlx.BindType(dbConn.DriverName()) + placeholders := strings.TrimRight(strings.Repeat("?, ", len(cols)), ", ") + stmtText := fmt.Sprintf(`%s vm_hourly_stats ("%s") VALUES (%s)%s`, verb, strings.Join(cols, `","`), placeholders, conflict) + stmtText = sqlx.Rebind(bind, stmtText) + tx, err := dbConn.BeginTxx(ctx, nil) if err != nil { return err } - stmt, err := tx.PrepareNamedContext(ctx, insert) + stmt, err := tx.PreparexContext(ctx, stmtText) if err != nil { tx.Rollback() return err @@ -753,7 +760,11 @@ INSERT INTO vm_hourly_stats ( defer stmt.Close() for _, r := range rows { - if _, err := stmt.ExecContext(ctx, r); err != nil { + args := []interface{}{ + r.SnapshotTime, r.Vcenter, r.VmId, r.VmUuid, r.Name, r.CreationTime, r.DeletionTime, r.ResourcePool, + r.Datacenter, r.Cluster, r.Folder, r.ProvisionedDisk, r.VcpuCount, r.RamGB, r.IsTemplate, r.PoweredOn, r.SrmPlaceholder, + } + if _, err := stmt.ExecContext(ctx, args...); err != nil { tx.Rollback() return err }