This commit is contained in:
@@ -102,6 +102,14 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
|
||||
// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
|
||||
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) error {
|
||||
targetTime := time.Now().Add(-time.Minute)
|
||||
return c.aggregateDailySummary(ctx, targetTime, false)
|
||||
}
|
||||
|
||||
func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error {
|
||||
return c.aggregateDailySummary(ctx, date, force)
|
||||
}
|
||||
|
||||
func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Time, force bool) error {
|
||||
dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location())
|
||||
dayEnd := dayStart.AddDate(0, 0, 1)
|
||||
summaryTable, err := dailySummaryTableName(targetTime)
|
||||
@@ -116,6 +124,16 @@ func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Lo
|
||||
if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
|
||||
return err
|
||||
}
|
||||
if rowsExist, err := tableHasRows(ctx, dbConn, summaryTable); err != nil {
|
||||
return err
|
||||
} else if rowsExist && !force {
|
||||
c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
|
||||
return nil
|
||||
} else if rowsExist && force {
|
||||
if err := clearTable(ctx, dbConn, summaryTable); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if rowsExist, err := tableHasRows(ctx, dbConn, summaryTable); err != nil {
|
||||
return err
|
||||
} else if rowsExist {
|
||||
@@ -235,7 +253,14 @@ func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.
|
||||
now := time.Now()
|
||||
firstOfThisMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
|
||||
targetMonth := firstOfThisMonth.AddDate(0, -1, 0)
|
||||
return c.aggregateMonthlySummary(ctx, targetMonth, false)
|
||||
}
|
||||
|
||||
func (c *CronTask) AggregateMonthlySummary(ctx context.Context, month time.Time, force bool) error {
|
||||
return c.aggregateMonthlySummary(ctx, month, force)
|
||||
}
|
||||
|
||||
func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time.Time, force bool) error {
|
||||
if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -259,6 +284,16 @@ func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.
|
||||
if err := ensureMonthlySummaryTable(ctx, dbConn, monthlyTable); err != nil {
|
||||
return err
|
||||
}
|
||||
if rowsExist, err := tableHasRows(ctx, dbConn, monthlyTable); err != nil {
|
||||
return err
|
||||
} else if rowsExist && !force {
|
||||
c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable)
|
||||
return nil
|
||||
} else if rowsExist && force {
|
||||
if err := clearTable(ctx, dbConn, monthlyTable); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if rowsExist, err := tableHasRows(ctx, dbConn, monthlyTable); err != nil {
|
||||
return err
|
||||
} else if rowsExist {
|
||||
@@ -604,6 +639,17 @@ func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error
|
||||
return err
|
||||
}
|
||||
|
||||
func clearTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
|
||||
if _, err := safeTableName(table); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := dbConn.ExecContext(ctx, fmt.Sprintf("DELETE FROM %s", table))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to clear table %s: %w", table, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func tableHasRows(ctx context.Context, dbConn *sqlx.DB, table string) (bool, error) {
|
||||
if _, err := safeTableName(table); err != nil {
|
||||
return false, err
|
||||
|
||||
Reference in New Issue
Block a user