more optimisation
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-01-14 21:30:10 +11:00
parent 877b65f10b
commit 434c7136e9
10 changed files with 457 additions and 19 deletions

View File

@@ -0,0 +1,152 @@
package tasks
import (
"context"
"database/sql"
"time"
"vctp/db"
"github.com/jmoiron/sqlx"
)
// CronTracker manages re-entry protection and status recording for cron jobs.
// Job state lives in the cron_status table, one row per job name.
type CronTracker struct {
// db supplies the sqlx connection used for every cron_status statement.
db db.Database
// bindType is the sqlx placeholder style for the connection's driver,
// used to rebind the `?`-style queries in this file.
bindType int
}
// NewCronTracker builds a CronTracker on top of database. The sqlx bind type
// is resolved once here from the connection's driver name so that later
// queries can be rebound without asking the driver again.
func NewCronTracker(database db.Database) *CronTracker {
	tracker := new(CronTracker)
	tracker.db = database
	tracker.bindType = sqlx.BindType(database.DB().DriverName())
	return tracker
}
// ensureTable creates the cron_status table when it does not exist yet.
//
// The same DDL is issued for every driver: the previous per-driver switch
// carried two byte-identical statements, so it has been collapsed into a
// single constant. If a dialect ever needs different DDL, branch on
// c.db.DB().DriverName() here.
func (c *CronTracker) ensureTable(ctx context.Context) error {
	const ddl = `
CREATE TABLE IF NOT EXISTS cron_status (
	job_name TEXT PRIMARY KEY,
	started_at BIGINT NOT NULL,
	ended_at BIGINT NOT NULL,
	duration_ms BIGINT NOT NULL,
	last_error TEXT,
	in_progress BOOLEAN NOT NULL DEFAULT FALSE
);`
	_, err := c.db.DB().ExecContext(ctx, ddl)
	return err
}
// Start marks job as in-progress in cron_status.
//
// It returns, in order:
//   - a completion callback that must be invoked with the job's final error
//     (nil on success) to record the outcome and clear the in-progress flag;
//   - skip, which is true when the job is already flagged as running, in which
//     case the callback is nil and the caller should not run the job;
//   - an error when the tracking table could not be read or updated.
//
// NOTE(review): the in-progress flag is only cleared by the completion
// callback, so a process that dies mid-job leaves the flag set and later
// runs will be skipped until the row is reset.
func (c *CronTracker) Start(ctx context.Context, job string) (func(error), bool, error) {
	if err := c.ensureTable(ctx); err != nil {
		return nil, false, err
	}

	conn := c.db.DB()
	now := time.Now().Unix()

	tx, err := conn.BeginTxx(ctx, nil)
	if err != nil {
		return nil, false, err
	}
	// One rollback covers every early return below. After a successful Commit
	// it is a harmless no-op that reports sql.ErrTxDone.
	defer func() { _ = tx.Rollback() }()

	var inProgress bool
	query := sqlx.Rebind(c.bindType, `SELECT in_progress FROM cron_status WHERE job_name = ?`)
	err = tx.QueryRowContext(ctx, query, job).Scan(&inProgress)
	switch {
	case err == sql.ErrNoRows:
		// First ever run of this job: create the row already flagged as
		// running, otherwise the first run would not be protected against
		// re-entry.
		if err := upsertCron(tx, c.bindType, job, now, true); err != nil {
			return nil, false, err
		}
	case err != nil:
		// A real failure (cancelled context, broken connection, ...) must not
		// be mistaken for "no row yet".
		return nil, false, err
	case inProgress:
		return nil, true, nil
	default:
		if err := markCronStart(tx, c.bindType, job, now); err != nil {
			return nil, false, err
		}
	}

	if err := tx.Commit(); err != nil {
		return nil, false, err
	}

	done := func(runErr error) {
		// A fresh background context is used on purpose: the job's own context
		// may already be cancelled or timed out when the job finishes, and the
		// status row still has to be updated. Recording is best-effort.
		_ = c.finish(context.Background(), job, now, runErr)
	}
	return done, false, nil
}
// finish records the outcome of a run: it stores the end time, the elapsed
// milliseconds since startedAt (a Unix timestamp in seconds) and the text of
// runErr, and clears the in-progress flag for job. A nil runErr results in an
// empty error string being handed to upsertCronFinish.
func (c *CronTracker) finish(ctx context.Context, job string, startedAt int64, runErr error) error {
	handle := c.db.DB()
	elapsedMS := time.Since(time.Unix(startedAt, 0)).Milliseconds()

	tx, beginErr := handle.BeginTxx(ctx, nil)
	if beginErr != nil {
		return beginErr
	}

	var failure sql.NullString
	if runErr != nil {
		failure.String = runErr.Error()
		failure.Valid = true
	}

	if updateErr := upsertCronFinish(tx, c.bindType, job, startedAt, elapsedMS, failure.String); updateErr != nil {
		tx.Rollback()
		return updateErr
	}
	return tx.Commit()
}
// upsertCron inserts the cron_status row for job with the given start time and
// in-progress flag, zeroing the end time, duration and last error. When a row
// for job already exists, those same columns are overwritten with the new
// values instead.
func upsertCron(tx *sqlx.Tx, bindType int, job string, startedAt int64, inProgress bool) error {
	const statement = `
INSERT INTO cron_status (job_name, started_at, ended_at, duration_ms, last_error, in_progress)
VALUES (?, ?, 0, 0, NULL, ?)
ON CONFLICT (job_name) DO UPDATE SET started_at = excluded.started_at, in_progress = excluded.in_progress, ended_at = excluded.ended_at, duration_ms = excluded.duration_ms, last_error = excluded.last_error
`
	bound := sqlx.Rebind(bindType, statement)
	if _, execErr := tx.Exec(bound, job, startedAt, inProgress); execErr != nil {
		return execErr
	}
	return nil
}
// markCronStart flags the existing cron_status row for job as running: it sets
// the new start time and resets the end time, duration and last error left
// over from the previous run.
func markCronStart(tx *sqlx.Tx, bindType int, job string, startedAt int64) error {
	const statement = `
UPDATE cron_status
SET started_at = ?, in_progress = TRUE, ended_at = 0, duration_ms = 0, last_error = NULL
WHERE job_name = ?
`
	bound := sqlx.Rebind(bindType, statement)
	if _, execErr := tx.Exec(bound, startedAt, job); execErr != nil {
		return execErr
	}
	return nil
}
// upsertCronFinish updates the cron_status row for job at the end of a run: it
// stamps the current Unix time as the end time, stores durationMS, stores
// lastErr (as NULL when it is the empty string) and clears the in-progress
// flag. Despite the name this is a plain UPDATE; startedAt is accepted but not
// used by the statement.
func upsertCronFinish(tx *sqlx.Tx, bindType int, job string, startedAt int64, durationMS int64, lastErr string) error {
	const statement = `
UPDATE cron_status
SET ended_at = ?, duration_ms = ?, last_error = ?, in_progress = FALSE
WHERE job_name = ?
`
	bound := sqlx.Rebind(bindType, statement)
	endedAt := time.Now().Unix()
	if _, execErr := tx.Exec(bound, endedAt, durationMS, nullableString(lastErr), job); execErr != nil {
		return execErr
	}
	return nil
}
// nullableString converts s into a value suitable for a nullable SQL column:
// the empty string becomes nil (stored as NULL), anything else is passed
// through unchanged.
func nullableString(s string) interface{} {
	if s != "" {
		return s
	}
	return nil
}

View File

@@ -13,6 +13,7 @@ import (
"vctp/db"
"vctp/db/queries"
"vctp/internal/report"
"vctp/internal/utils"
"vctp/internal/vcenter"
"github.com/jmoiron/sqlx"
@@ -47,13 +48,37 @@ type inventorySnapshotRow struct {
type snapshotTotals = db.SnapshotTotals
// RunVcenterSnapshotHourly records hourly inventory snapshots into a daily table.
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger) error {
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger) (err error) {
jobCtx := ctx
jobTimeout := durationFromSeconds(c.Settings.Values.Settings.HourlyJobTimeoutSeconds, 20*time.Minute)
if jobTimeout > 0 {
var cancel context.CancelFunc
jobCtx, cancel = context.WithTimeout(ctx, jobTimeout)
defer cancel()
}
startedAt := time.Now()
defer func() {
logger.Info("Hourly snapshot job finished", "duration", time.Since(startedAt))
}()
tracker := NewCronTracker(c.Database)
done, skip, err := tracker.Start(jobCtx, "hourly_snapshot")
if err != nil {
return err
}
if skip {
logger.Warn("Hourly snapshot skipped because a previous run is still active")
return nil
}
defer func() { done(err) }()
ctx, cancel := context.WithCancel(jobCtx)
defer cancel()
startTime := time.Now()
if err := db.CheckMigrationState(ctx, c.Database.DB()); err != nil {
return err
}
// reload settings in case vcenter list has changed
c.Settings.ReadYMLSettings()
@@ -83,6 +108,7 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
}
dbConn := c.Database.DB()
db.ApplySQLiteTuning(ctx, dbConn)
if err := ensureDailyInventoryTable(ctx, dbConn, tableName); err != nil {
return err
}
@@ -90,6 +116,9 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
var wg sync.WaitGroup
var errCount int64
concurrencyLimit := c.Settings.Values.Settings.HourlySnapshotConcurrency
if override, ok := utils.EnvInt("VCTP_HOURLY_SNAPSHOT_CONCURRENCY"); ok && override >= 0 {
concurrencyLimit = override
}
var sem chan struct{}
if concurrencyLimit > 0 {
sem = make(chan struct{}, concurrencyLimit)
@@ -99,23 +128,36 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
wg.Add(1)
go func(url string) {
defer wg.Done()
waitStarted := time.Now()
vcStart := time.Now()
if sem != nil {
sem <- struct{}{}
defer func() { <-sem }()
}
waitDuration := time.Since(waitStarted)
timeout := durationFromSeconds(c.Settings.Values.Settings.HourlySnapshotTimeoutSeconds, 10*time.Minute)
runCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
c.Logger.Info("Starting hourly snapshot for vcenter", "url", url)
if err := c.captureHourlySnapshotForVcenter(ctx, startTime, tableName, url); err != nil {
if err := c.captureHourlySnapshotForVcenter(runCtx, startTime, tableName, url); err != nil {
atomic.AddInt64(&errCount, 1)
c.Logger.Error("hourly snapshot failed", "error", err, "url", url)
} else {
c.Logger.Info("Finished hourly snapshot for vcenter", "url", url, "duration", time.Since(vcStart))
c.Logger.Info("Finished hourly snapshot for vcenter",
"url", url,
"queue_wait", waitDuration,
"duration", time.Since(vcStart),
"timeout", timeout,
)
}
}(url)
}
wg.Wait()
if errCount > 0 {
return fmt.Errorf("hourly snapshot failed for %d vcenter(s)", errCount)
err = fmt.Errorf("hourly snapshot failed for %d vcenter(s)", errCount)
return err
}
rowCount, err := db.TableRowCount(ctx, dbConn, tableName)
@@ -131,13 +173,36 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
}
// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) error {
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) (err error) {
jobCtx := ctx
jobTimeout := durationFromSeconds(c.Settings.Values.Settings.DailyJobTimeoutSeconds, 15*time.Minute)
if jobTimeout > 0 {
var cancel context.CancelFunc
jobCtx, cancel = context.WithTimeout(ctx, jobTimeout)
defer cancel()
}
tracker := NewCronTracker(c.Database)
done, skip, err := tracker.Start(jobCtx, "daily_aggregate")
if err != nil {
return err
}
if skip {
logger.Warn("Daily aggregate skipped because a previous run is still active")
return nil
}
defer func() { done(err) }()
if err := db.CheckMigrationState(jobCtx, c.Database.DB()); err != nil {
return err
}
startedAt := time.Now()
defer func() {
logger.Info("Daily summary job finished", "duration", time.Since(startedAt))
}()
targetTime := time.Now().Add(-time.Minute)
return c.aggregateDailySummary(ctx, targetTime, false)
err = c.aggregateDailySummary(jobCtx, targetTime, false)
return err
}
func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error {
@@ -252,7 +317,29 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
}
// RunVcenterMonthlyAggregate summarizes the previous month's daily snapshots.
func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.Logger) error {
func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.Logger) (err error) {
jobCtx := ctx
jobTimeout := durationFromSeconds(c.Settings.Values.Settings.MonthlyJobTimeoutSeconds, 20*time.Minute)
if jobTimeout > 0 {
var cancel context.CancelFunc
jobCtx, cancel = context.WithTimeout(ctx, jobTimeout)
defer cancel()
}
tracker := NewCronTracker(c.Database)
done, skip, err := tracker.Start(jobCtx, "monthly_aggregate")
if err != nil {
return err
}
if skip {
logger.Warn("Monthly aggregate skipped because a previous run is still active")
return nil
}
defer func() { done(err) }()
if err := db.CheckMigrationState(jobCtx, c.Database.DB()); err != nil {
return err
}
startedAt := time.Now()
defer func() {
logger.Info("Monthly summary job finished", "duration", time.Since(startedAt))
@@ -260,7 +347,8 @@ func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.
now := time.Now()
firstOfThisMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
targetMonth := firstOfThisMonth.AddDate(0, -1, 0)
return c.aggregateMonthlySummary(ctx, targetMonth, false)
err = c.aggregateMonthlySummary(jobCtx, targetMonth, false)
return err
}
func (c *CronTask) AggregateMonthlySummary(ctx context.Context, month time.Time, force bool) error {
@@ -348,7 +436,29 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
}
// RunSnapshotCleanup drops hourly and daily snapshot tables older than retention.
func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) error {
func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) (err error) {
jobCtx := ctx
jobTimeout := durationFromSeconds(c.Settings.Values.Settings.CleanupJobTimeoutSeconds, 10*time.Minute)
if jobTimeout > 0 {
var cancel context.CancelFunc
jobCtx, cancel = context.WithTimeout(ctx, jobTimeout)
defer cancel()
}
tracker := NewCronTracker(c.Database)
done, skip, err := tracker.Start(jobCtx, "snapshot_cleanup")
if err != nil {
return err
}
if skip {
logger.Warn("Snapshot cleanup skipped because a previous run is still active")
return nil
}
defer func() { done(err) }()
if err := db.CheckMigrationState(jobCtx, c.Database.DB()); err != nil {
return err
}
startedAt := time.Now()
defer func() {
logger.Info("Snapshot cleanup job finished", "duration", time.Since(startedAt))
@@ -582,6 +692,13 @@ func intWithDefault(value int, fallback int) int {
return value
}
// durationFromSeconds turns a seconds count into a time.Duration. Zero and
// negative values are treated as "not configured" and yield fallback.
func durationFromSeconds(seconds int, fallback time.Duration) time.Duration {
	if seconds <= 0 {
		return fallback
	}
	return time.Second * time.Duration(seconds)
}
func normalizeResourcePool(value string) string {
trimmed := strings.TrimSpace(value)
if trimmed == "" {
@@ -800,6 +917,58 @@ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
return err
}
// insertHourlyBatch writes rows into tableName inside a single transaction
// using one prepared INSERT statement. An empty rows slice is a no-op.
//
// The batch is all-or-nothing: the first failing row aborts the transaction
// and nothing is committed. Returned errors wrap the underlying database error
// with the step that failed.
//
// tableName is interpolated into the SQL text, so it must come from trusted
// code and never from user input.
func insertHourlyBatch(ctx context.Context, dbConn *sqlx.DB, tableName string, rows []inventorySnapshotRow) error {
	if len(rows) == 0 {
		return nil
	}

	tx, err := dbConn.BeginTxx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin hourly batch transaction: %w", err)
	}
	// One rollback covers every early return below. After a successful Commit
	// it is a harmless no-op that reports sql.ErrTxDone.
	defer func() { _ = tx.Rollback() }()

	query := sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), fmt.Sprintf(`
INSERT INTO %s (
	"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
	"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
	"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime", "IsPresent"
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, tableName))

	stmt, err := tx.PreparexContext(ctx, query)
	if err != nil {
		return fmt.Errorf("prepare hourly batch insert into %s: %w", tableName, err)
	}
	defer stmt.Close()

	// Iterate by index so the wide row struct is not copied on every iteration.
	for i := range rows {
		row := &rows[i]
		if _, err := stmt.ExecContext(ctx,
			row.InventoryId,
			row.Name,
			row.Vcenter,
			row.VmId,
			row.EventKey,
			row.CloudId,
			row.CreationTime,
			row.DeletionTime,
			row.ResourcePool,
			row.Datacenter,
			row.Cluster,
			row.Folder,
			row.ProvisionedDisk,
			row.VcpuCount,
			row.RamGB,
			row.IsTemplate,
			row.PoweredOn,
			row.SrmPlaceholder,
			row.VmUuid,
			row.SnapshotTime,
			row.IsPresent,
		); err != nil {
			return fmt.Errorf("insert hourly batch row %d into %s: %w", i, tableName, err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit hourly batch for %s: %w", tableName, err)
	}
	return nil
}
func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error {
c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url)
vc := vcenter.New(c.Logger, c.VcCreds)
@@ -886,10 +1055,9 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk)
}
batch := make([]inventorySnapshotRow, 0, len(presentSnapshots)+len(inventoryRows))
for _, row := range presentSnapshots {
if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil {
c.Logger.Error("failed to insert hourly snapshot", "error", err, "vm_id", row.VmId.String)
}
batch = append(batch, row)
}
if !canDetectMissing {
@@ -927,9 +1095,11 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
c.Logger.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String)
}
}
if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil {
c.Logger.Error("failed to insert missing VM snapshot", "error", err, "vm_id", row.VmId.String)
}
batch = append(batch, row)
}
if err := insertHourlyBatch(ctx, dbConn, tableName, batch); err != nil {
return err
}
c.Logger.Info("Hourly snapshot summary",