diff --git a/.gitignore b/.gitignore index 8a44af5..7e09d43 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ *.dylib vctp build/ +reports/ settings.yaml # Certificates diff --git a/internal/tasks/aggregateCommon.go b/internal/tasks/aggregateCommon.go new file mode 100644 index 0000000..a48eb77 --- /dev/null +++ b/internal/tasks/aggregateCommon.go @@ -0,0 +1,32 @@ +package tasks + +import ( + "context" + "time" + "vctp/db" +) + +// runAggregateJob wraps aggregation cron jobs with timeout, migration check, and circuit breaker semantics. +func (c *CronTask) runAggregateJob(ctx context.Context, jobName string, timeout time.Duration, fn func(context.Context) error) (err error) { + jobCtx := ctx + if timeout > 0 { + var cancel context.CancelFunc + jobCtx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + tracker := NewCronTracker(c.Database) + done, skip, err := tracker.Start(jobCtx, jobName) + if err != nil { + return err + } + if skip { + return nil + } + defer func() { done(err) }() + + if err := db.CheckMigrationState(jobCtx, c.Database.DB()); err != nil { + return err + } + + return fn(jobCtx) +} diff --git a/internal/tasks/dailyAggregate.go b/internal/tasks/dailyAggregate.go new file mode 100644 index 0000000..01e84e2 --- /dev/null +++ b/internal/tasks/dailyAggregate.go @@ -0,0 +1,142 @@ +package tasks + +import ( + "context" + "fmt" + "log/slog" + "time" + "vctp/db" + "vctp/internal/report" +) + +// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table. +func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) (err error) { + jobTimeout := durationFromSeconds(c.Settings.Values.Settings.DailyJobTimeoutSeconds, 15*time.Minute) + return c.runAggregateJob(ctx, "daily_aggregate", jobTimeout, func(jobCtx context.Context) error { + startedAt := time.Now() + defer func() { + logger.Info("Daily summary job finished", "duration", time.Since(startedAt)) + }() + targetTime := time.Now().Add(-time.Minute) + return c.aggregateDailySummary(jobCtx, targetTime, false) + }) +} + +func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error { + return c.aggregateDailySummary(ctx, date, force) +} + +func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Time, force bool) error { + dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location()) + dayEnd := dayStart.AddDate(0, 0, 1) + summaryTable, err := dailySummaryTableName(targetTime) + if err != nil { + return err + } + + dbConn := c.Database.DB() + if err := db.EnsureSummaryTable(ctx, dbConn, summaryTable); err != nil { + return err + } + if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil { + return err + } + if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil { + return err + } else if rowsExist && !force { + c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable) + return nil + } else if rowsExist && force { + if err := clearTable(ctx, dbConn, summaryTable); err != nil { + return err + } + } + + hourlySnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", dayStart, dayEnd) + if err != nil { + return err + } + hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots) + if len(hourlySnapshots) == 0 { + return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02")) + } + + hourlyTables := make([]string, 0, len(hourlySnapshots)) + for _, snapshot := range hourlySnapshots { + hourlyTables = append(hourlyTables, snapshot.TableName) + } + unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter()) + if err != nil { + return err + } + + currentTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery) + if err != nil { + c.Logger.Warn("unable to calculate daily totals", "error", err, "date", dayStart.Format("2006-01-02")) + } else { + c.Logger.Info("Daily snapshot totals", + "date", dayStart.Format("2006-01-02"), + "vm_count", currentTotals.VmCount, + "vcpu_total", currentTotals.VcpuTotal, + "ram_total_gb", currentTotals.RamTotal, + "disk_total_gb", currentTotals.DiskTotal, + ) + } + + prevStart := dayStart.AddDate(0, 0, -1) + prevEnd := dayStart + prevSnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", prevStart, prevEnd) + if err == nil && len(prevSnapshots) > 0 { + prevSnapshots = filterSnapshotsWithRows(ctx, dbConn, prevSnapshots) + prevTables := make([]string, 0, len(prevSnapshots)) + for _, snapshot := range prevSnapshots { + prevTables = append(prevTables, snapshot.TableName) + } + prevUnion, err := buildUnionQuery(prevTables, summaryUnionColumns, templateExclusionFilter()) + if err == nil { + prevTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, prevUnion) + if err != nil { + c.Logger.Warn("unable to calculate previous day totals", "error", err, "date", prevStart.Format("2006-01-02")) + } else { + c.Logger.Info("Daily snapshot comparison", + "current_date", dayStart.Format("2006-01-02"), + "previous_date", prevStart.Format("2006-01-02"), + "vm_delta", currentTotals.VmCount-prevTotals.VmCount, + "vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal, + "ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal, + "disk_delta_gb", currentTotals.DiskTotal-prevTotals.DiskTotal, + ) + } + } else { + c.Logger.Warn("unable to build previous day union", "error", err) + } + } + + insertQuery, err := db.BuildDailySummaryInsert(summaryTable, unionQuery) + if err != nil { + return err + } + + if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil { + c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02")) + return err + } + rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable) + if err != nil { + c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable) + } + if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil { + c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable) + } + + if err := c.generateReport(ctx, summaryTable); err != nil { + c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable) + } + + c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable) + return nil +} + +func dailySummaryTableName(t time.Time) (string, error) { + return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102"))) +} diff --git a/internal/tasks/inventorySnapshots.go b/internal/tasks/inventorySnapshots.go index 908ba1f..1b566e2 100644 --- a/internal/tasks/inventorySnapshots.go +++ b/internal/tasks/inventorySnapshots.go @@ -176,277 +176,6 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo return nil } -// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table. -func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) (err error) { - jobCtx := ctx - jobTimeout := durationFromSeconds(c.Settings.Values.Settings.DailyJobTimeoutSeconds, 15*time.Minute) - if jobTimeout > 0 { - var cancel context.CancelFunc - jobCtx, cancel = context.WithTimeout(ctx, jobTimeout) - defer cancel() - } - tracker := NewCronTracker(c.Database) - done, skip, err := tracker.Start(jobCtx, "daily_aggregate") - if err != nil { - return err - } - if skip { - logger.Warn("Daily aggregate skipped because a previous run is still active") - return nil - } - defer func() { done(err) }() - - if err := db.CheckMigrationState(jobCtx, c.Database.DB()); err != nil { - return err - } - - startedAt := time.Now() - defer func() { - logger.Info("Daily summary job finished", "duration", time.Since(startedAt)) - }() - targetTime := time.Now().Add(-time.Minute) - err = c.aggregateDailySummary(jobCtx, targetTime, false) - return err -} - -func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error { - return c.aggregateDailySummary(ctx, date, force) -} - -func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Time, force bool) error { - dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location()) - dayEnd := dayStart.AddDate(0, 0, 1) - summaryTable, err := dailySummaryTableName(targetTime) - if err != nil { - return err - } - - dbConn := c.Database.DB() - if err := db.EnsureSummaryTable(ctx, dbConn, summaryTable); err != nil { - return err - } - if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil { - return err - } - if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil { - return err - } else if rowsExist && !force { - c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable) - return nil - } else if rowsExist && force { - if err := clearTable(ctx, dbConn, summaryTable); err != nil { - return err - } - } - - hourlySnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", dayStart, dayEnd) - if err != nil { - return err - } - hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots) - if len(hourlySnapshots) == 0 { - return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02")) - } - - hourlyTables := make([]string, 0, len(hourlySnapshots)) - for _, snapshot := range hourlySnapshots { - hourlyTables = append(hourlyTables, snapshot.TableName) - } - unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter()) - if err != nil { - return err - } - - currentTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery) - if err != nil { - c.Logger.Warn("unable to calculate daily totals", "error", err, "date", dayStart.Format("2006-01-02")) - } else { - c.Logger.Info("Daily snapshot totals", - "date", dayStart.Format("2006-01-02"), - "vm_count", currentTotals.VmCount, - "vcpu_total", currentTotals.VcpuTotal, - "ram_total_gb", currentTotals.RamTotal, - "disk_total_gb", currentTotals.DiskTotal, - ) - } - - prevStart := dayStart.AddDate(0, 0, -1) - prevEnd := dayStart - prevSnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", prevStart, prevEnd) - if err == nil && len(prevSnapshots) > 0 { - prevSnapshots = filterSnapshotsWithRows(ctx, dbConn, prevSnapshots) - prevTables := make([]string, 0, len(prevSnapshots)) - for _, snapshot := range prevSnapshots { - prevTables = append(prevTables, snapshot.TableName) - } - prevUnion, err := buildUnionQuery(prevTables, summaryUnionColumns, templateExclusionFilter()) - if err == nil { - prevTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, prevUnion) - if err != nil { - c.Logger.Warn("unable to calculate previous day totals", "error", err, "date", prevStart.Format("2006-01-02")) - } else { - c.Logger.Info("Daily snapshot comparison", - "current_date", dayStart.Format("2006-01-02"), - "previous_date", prevStart.Format("2006-01-02"), - "vm_delta", currentTotals.VmCount-prevTotals.VmCount, - "vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal, - "ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal, - "disk_delta_gb", currentTotals.DiskTotal-prevTotals.DiskTotal, - ) - } - } else { - c.Logger.Warn("unable to build previous day union", "error", err) - } - } - - insertQuery, err := db.BuildDailySummaryInsert(summaryTable, unionQuery) - if err != nil { - return err - } - - if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil { - c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02")) - return err - } - rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable) - if err != nil { - c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable) - } - if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil { - c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable) - } - - if err := c.generateReport(ctx, summaryTable); err != nil { - c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable) - } - - c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable) - return nil -} - -// RunVcenterMonthlyAggregate summarizes the previous month's daily snapshots. -func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.Logger) (err error) { - jobCtx := ctx - jobTimeout := durationFromSeconds(c.Settings.Values.Settings.MonthlyJobTimeoutSeconds, 20*time.Minute) - if jobTimeout > 0 { - var cancel context.CancelFunc - jobCtx, cancel = context.WithTimeout(ctx, jobTimeout) - defer cancel() - } - tracker := NewCronTracker(c.Database) - done, skip, err := tracker.Start(jobCtx, "monthly_aggregate") - if err != nil { - return err - } - if skip { - logger.Warn("Monthly aggregate skipped because a previous run is still active") - return nil - } - defer func() { done(err) }() - - if err := db.CheckMigrationState(jobCtx, c.Database.DB()); err != nil { - return err - } - - startedAt := time.Now() - defer func() { - logger.Info("Monthly summary job finished", "duration", time.Since(startedAt)) - }() - now := time.Now() - firstOfThisMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location()) - targetMonth := firstOfThisMonth.AddDate(0, -1, 0) - err = c.aggregateMonthlySummary(jobCtx, targetMonth, false) - return err -} - -func (c *CronTask) AggregateMonthlySummary(ctx context.Context, month time.Time, force bool) error { - return c.aggregateMonthlySummary(ctx, month, force) -} - -func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time.Time, force bool) error { - if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil { - return err - } - - monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, targetMonth.Location()) - monthEnd := monthStart.AddDate(0, 1, 0) - dailySnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", monthStart, monthEnd) - if err != nil { - return err - } - - dbConn := c.Database.DB() - dailySnapshots = filterSnapshotsWithRows(ctx, dbConn, dailySnapshots) - if len(dailySnapshots) == 0 { - return fmt.Errorf("no hourly snapshot tables found for %s", targetMonth.Format("2006-01")) - } - - monthlyTable, err := monthlySummaryTableName(targetMonth) - if err != nil { - return err - } - - if err := db.EnsureSummaryTable(ctx, dbConn, monthlyTable); err != nil { - return err - } - if rowsExist, err := db.TableHasRows(ctx, dbConn, monthlyTable); err != nil { - return err - } else if rowsExist && !force { - c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable) - return nil - } else if rowsExist && force { - if err := clearTable(ctx, dbConn, monthlyTable); err != nil { - return err - } - } - - dailyTables := make([]string, 0, len(dailySnapshots)) - for _, snapshot := range dailySnapshots { - dailyTables = append(dailyTables, snapshot.TableName) - } - unionQuery, err := buildUnionQuery(dailyTables, summaryUnionColumns, templateExclusionFilter()) - if err != nil { - return err - } - - monthlyTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery) - if err != nil { - c.Logger.Warn("unable to calculate monthly totals", "error", err, "month", targetMonth.Format("2006-01")) - } else { - c.Logger.Info("Monthly snapshot totals", - "month", targetMonth.Format("2006-01"), - "vm_count", monthlyTotals.VmCount, - "vcpu_total", monthlyTotals.VcpuTotal, - "ram_total_gb", monthlyTotals.RamTotal, - "disk_total_gb", monthlyTotals.DiskTotal, - ) - } - - insertQuery, err := db.BuildMonthlySummaryInsert(monthlyTable, unionQuery) - if err != nil { - return err - } - - if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil { - c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", targetMonth.Format("2006-01")) - return err - } - rowCount, err := db.TableRowCount(ctx, dbConn, monthlyTable) - if err != nil { - c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", monthlyTable) - } - if err := report.RegisterSnapshot(ctx, c.Database, "monthly", monthlyTable, targetMonth, rowCount); err != nil { - c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", monthlyTable) - } - - if err := c.generateReport(ctx, monthlyTable); err != nil { - c.Logger.Warn("failed to generate monthly report", "error", err, "table", monthlyTable) - } - - c.Logger.Debug("Finished monthly inventory aggregation", "summary_table", monthlyTable) - return nil -} - // RunSnapshotCleanup drops hourly and daily snapshot tables older than retention. func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) (err error) { jobCtx := ctx @@ -545,14 +274,6 @@ func hourlyInventoryTableName(t time.Time) (string, error) { return db.SafeTableName(fmt.Sprintf("inventory_hourly_%d", t.Unix())) } -func dailySummaryTableName(t time.Time) (string, error) { - return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102"))) -} - -func monthlySummaryTableName(t time.Time) (string, error) { - return db.SafeTableName(fmt.Sprintf("inventory_monthly_summary_%s", t.Format("200601"))) -} - func ensureDailyInventoryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error { if err := db.EnsureSnapshotTable(ctx, dbConn, tableName); err != nil { return err diff --git a/internal/tasks/monthlyAggregate.go b/internal/tasks/monthlyAggregate.go new file mode 100644 index 0000000..02b6f13 --- /dev/null +++ b/internal/tasks/monthlyAggregate.go @@ -0,0 +1,117 @@ +package tasks + +import ( + "context" + "fmt" + "log/slog" + "time" + "vctp/db" + "vctp/internal/report" +) + +// RunVcenterMonthlyAggregate summarizes the previous month's daily snapshots. +func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.Logger) (err error) { + jobTimeout := durationFromSeconds(c.Settings.Values.Settings.MonthlyJobTimeoutSeconds, 20*time.Minute) + return c.runAggregateJob(ctx, "monthly_aggregate", jobTimeout, func(jobCtx context.Context) error { + startedAt := time.Now() + defer func() { + logger.Info("Monthly summary job finished", "duration", time.Since(startedAt)) + }() + now := time.Now() + firstOfThisMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location()) + targetMonth := firstOfThisMonth.AddDate(0, -1, 0) + return c.aggregateMonthlySummary(jobCtx, targetMonth, false) + }) +} + +func (c *CronTask) AggregateMonthlySummary(ctx context.Context, month time.Time, force bool) error { + return c.aggregateMonthlySummary(ctx, month, force) +} + +func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time.Time, force bool) error { + if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil { + return err + } + + monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, targetMonth.Location()) + monthEnd := monthStart.AddDate(0, 1, 0) + dailySnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", monthStart, monthEnd) + if err != nil { + return err + } + + dbConn := c.Database.DB() + dailySnapshots = filterSnapshotsWithRows(ctx, dbConn, dailySnapshots) + if len(dailySnapshots) == 0 { + return fmt.Errorf("no hourly snapshot tables found for %s", targetMonth.Format("2006-01")) + } + + monthlyTable, err := monthlySummaryTableName(targetMonth) + if err != nil { + return err + } + + if err := db.EnsureSummaryTable(ctx, dbConn, monthlyTable); err != nil { + return err + } + if rowsExist, err := db.TableHasRows(ctx, dbConn, monthlyTable); err != nil { + return err + } else if rowsExist && !force { + c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable) + return nil + } else if rowsExist && force { + if err := clearTable(ctx, dbConn, monthlyTable); err != nil { + return err + } + } + + dailyTables := make([]string, 0, len(dailySnapshots)) + for _, snapshot := range dailySnapshots { + dailyTables = append(dailyTables, snapshot.TableName) + } + unionQuery, err := buildUnionQuery(dailyTables, summaryUnionColumns, templateExclusionFilter()) + if err != nil { + return err + } + + monthlyTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery) + if err != nil { + c.Logger.Warn("unable to calculate monthly totals", "error", err, "month", targetMonth.Format("2006-01")) + } else { + c.Logger.Info("Monthly snapshot totals", + "month", targetMonth.Format("2006-01"), + "vm_count", monthlyTotals.VmCount, + "vcpu_total", monthlyTotals.VcpuTotal, + "ram_total_gb", monthlyTotals.RamTotal, + "disk_total_gb", monthlyTotals.DiskTotal, + ) + } + + insertQuery, err := db.BuildMonthlySummaryInsert(monthlyTable, unionQuery) + if err != nil { + return err + } + + if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil { + c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", targetMonth.Format("2006-01")) + return err + } + rowCount, err := db.TableRowCount(ctx, dbConn, monthlyTable) + if err != nil { + c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", monthlyTable) + } + if err := report.RegisterSnapshot(ctx, c.Database, "monthly", monthlyTable, targetMonth, rowCount); err != nil { + c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", monthlyTable) + } + + if err := c.generateReport(ctx, monthlyTable); err != nil { + c.Logger.Warn("failed to generate monthly report", "error", err, "table", monthlyTable) + } + + c.Logger.Debug("Finished monthly inventory aggregation", "summary_table", monthlyTable) + return nil +} + +func monthlySummaryTableName(t time.Time) (string, error) { + return db.SafeTableName(fmt.Sprintf("inventory_monthly_summary_%s", t.Format("200601"))) +}