Files
vctp2/internal/tasks/aggregationBenchmark.go
nathan 2e0788caf2
continuous-integration/drone/push Build is passing
more logging
2026-04-20 16:59:45 +10:00

370 lines
12 KiB
Go

package tasks
import (
"context"
"database/sql"
"fmt"
"slices"
"time"
"vctp/db"
"github.com/jmoiron/sqlx"
)
// AggregationBenchmarkStats summarizes the timings collected for a single
// aggregation mode over a set of benchmark runs.
type AggregationBenchmarkStats struct {
	// Runs is the number of timing samples behind the summary.
	Runs int
	// Min, Median, Avg and Max describe the distribution of those samples.
	Min, Median, Avg, Max time.Duration
}
// AggregationBenchmarkReport is the result of one canonical aggregation
// benchmark. It carries, for the daily and the monthly phase, the window that
// was benchmarked, the timing summary of the Go and SQL implementations, and
// the row count each implementation wrote on its last run. A phase that was
// skipped leaves its window fields at the zero time.
type AggregationBenchmarkReport struct {
	// Runs is the number of iterations requested per implementation.
	Runs int

	// Daily phase.
	DailyWindowStart, DailyWindowEnd        time.Time
	DailyGo, DailySQL                       AggregationBenchmarkStats
	DailyGoRowsWritten, DailySQLRowsWritten int64

	// Monthly phase.
	MonthlyWindowStart, MonthlyWindowEnd        time.Time
	MonthlyGo, MonthlySQL                       AggregationBenchmarkStats
	MonthlyGoRowsWritten, MonthlySQLRowsWritten int64
}
// RunCanonicalAggregationBenchmark compares Go and SQL aggregation cores on canonical cache tables.
func (c *CronTask) RunCanonicalAggregationBenchmark(ctx context.Context, runs int) (AggregationBenchmarkReport, error) {
if runs <= 0 {
runs = 3
}
report := AggregationBenchmarkReport{Runs: runs}
dbConn := c.Database.DB()
logger := loggerFromCtx(ctx, c.Logger)
hourlyStart, hourlyEnd, err := latestDailyWindowFromHourlyCache(ctx, dbConn)
if err != nil {
return report, err
}
if !hourlyStart.IsZero() {
if logger != nil {
logger.Info("canonical benchmark phase starting", "phase", "daily", "window_start", hourlyStart.Format(time.RFC3339), "window_end", hourlyEnd.Format(time.RFC3339), "runs", runs)
}
report.DailyWindowStart = hourlyStart
report.DailyWindowEnd = hourlyEnd
goDurations := make([]time.Duration, 0, runs)
sqlDurations := make([]time.Duration, 0, runs)
var goRows, sqlRows int64
for i := 0; i < runs; i++ {
run := i + 1
if logger != nil {
logger.Info("canonical benchmark run starting", "phase", "daily", "mode", "go", "run", run, "runs", runs)
}
dur, rows, runErr := c.benchmarkDailyGoCore(ctx, hourlyStart, hourlyEnd)
if runErr != nil {
return report, fmt.Errorf("daily go benchmark run %d failed: %w", i+1, runErr)
}
if logger != nil {
logger.Info("canonical benchmark run complete", "phase", "daily", "mode", "go", "run", run, "runs", runs, "duration", dur, "rows", rows)
}
goDurations = append(goDurations, dur)
goRows = rows
if logger != nil {
logger.Info("canonical benchmark run starting", "phase", "daily", "mode", "sql", "run", run, "runs", runs)
}
dur, rows, runErr = c.benchmarkDailySQLCore(ctx, hourlyStart, hourlyEnd)
if runErr != nil {
return report, fmt.Errorf("daily sql benchmark run %d failed: %w", i+1, runErr)
}
if logger != nil {
logger.Info("canonical benchmark run complete", "phase", "daily", "mode", "sql", "run", run, "runs", runs, "duration", dur, "rows", rows)
}
sqlDurations = append(sqlDurations, dur)
sqlRows = rows
}
report.DailyGo = summarizeDurations(goDurations)
report.DailySQL = summarizeDurations(sqlDurations)
report.DailyGoRowsWritten = goRows
report.DailySQLRowsWritten = sqlRows
if logger != nil {
logger.Info("canonical benchmark phase complete", "phase", "daily", "runs", runs)
}
} else if logger != nil {
logger.Info("canonical benchmark phase skipped", "phase", "daily", "reason", "no benchmarkable window found in vm_hourly_stats")
}
monthlyStart, monthlyEnd, err := latestMonthlyWindowFromDailyRollup(ctx, dbConn)
if err != nil {
return report, err
}
if !monthlyStart.IsZero() {
if logger != nil {
logger.Info("canonical benchmark phase starting", "phase", "monthly", "window_start", monthlyStart.Format(time.RFC3339), "window_end", monthlyEnd.Format(time.RFC3339), "runs", runs)
}
report.MonthlyWindowStart = monthlyStart
report.MonthlyWindowEnd = monthlyEnd
goDurations := make([]time.Duration, 0, runs)
sqlDurations := make([]time.Duration, 0, runs)
var goRows, sqlRows int64
for i := 0; i < runs; i++ {
run := i + 1
if logger != nil {
logger.Info("canonical benchmark run starting", "phase", "monthly", "mode", "go", "run", run, "runs", runs)
}
dur, rows, runErr := c.benchmarkMonthlyGoCore(ctx, monthlyStart, monthlyEnd)
if runErr != nil {
return report, fmt.Errorf("monthly go benchmark run %d failed: %w", i+1, runErr)
}
if logger != nil {
logger.Info("canonical benchmark run complete", "phase", "monthly", "mode", "go", "run", run, "runs", runs, "duration", dur, "rows", rows)
}
goDurations = append(goDurations, dur)
goRows = rows
if logger != nil {
logger.Info("canonical benchmark run starting", "phase", "monthly", "mode", "sql", "run", run, "runs", runs)
}
dur, rows, runErr = c.benchmarkMonthlySQLCore(ctx, monthlyStart, monthlyEnd)
if runErr != nil {
return report, fmt.Errorf("monthly sql benchmark run %d failed: %w", i+1, runErr)
}
if logger != nil {
logger.Info("canonical benchmark run complete", "phase", "monthly", "mode", "sql", "run", run, "runs", runs, "duration", dur, "rows", rows)
}
sqlDurations = append(sqlDurations, dur)
sqlRows = rows
}
report.MonthlyGo = summarizeDurations(goDurations)
report.MonthlySQL = summarizeDurations(sqlDurations)
report.MonthlyGoRowsWritten = goRows
report.MonthlySQLRowsWritten = sqlRows
if logger != nil {
logger.Info("canonical benchmark phase complete", "phase", "monthly", "runs", runs)
}
} else if logger != nil {
logger.Info("canonical benchmark phase skipped", "phase", "monthly", "reason", "no benchmarkable window found in vm_daily_rollup")
}
if report.DailyWindowStart.IsZero() && report.MonthlyWindowStart.IsZero() {
return report, fmt.Errorf("no benchmarkable canonical windows found (vm_hourly_stats/vm_daily_rollup are empty)")
}
return report, nil
}
// benchmarkDailyGoCore times one pass of the Go daily aggregation path for the
// window dayStart..dayEnd, writing into a uniquely named scratch summary table.
//
// The measured span covers scanHourlyCache plus insertDailyAggregates; creating
// the scratch table, counting its rows and dropping it are outside the timer.
// It returns the elapsed time and the scratch table's row count, or an error if
// any step fails or the scan yields no rows or no snapshot times.
func (c *CronTask) benchmarkDailyGoCore(ctx context.Context, dayStart, dayEnd time.Time) (time.Duration, int64, error) {
	tableName, err := benchmarkSummaryTableName("benchmark_daily_go")
	if err != nil {
		return 0, 0, err
	}
	dbConn := c.Database.DB()
	if err := db.EnsureSummaryTable(ctx, dbConn, tableName); err != nil {
		return 0, 0, err
	}
	// Clean up with a context detached from ctx's cancellation: a cancelled or
	// timed-out ctx is a likely reason for leaving this function early, and the
	// drop must still run then or the uniquely named scratch table is leaked.
	defer dropSnapshotTable(context.WithoutCancel(ctx), dbConn, tableName)

	started := time.Now()
	aggMap, snapTimes, err := c.scanHourlyCache(ctx, dayStart, dayEnd)
	if err != nil {
		return 0, 0, err
	}
	if len(aggMap) == 0 || len(snapTimes) == 0 {
		return 0, 0, fmt.Errorf("no daily rows found in canonical hourly cache")
	}
	totalSamplesByVcenter := sampleCountsByVcenter(aggMap)
	if err := c.insertDailyAggregates(ctx, tableName, aggMap, len(snapTimes), totalSamplesByVcenter); err != nil {
		return 0, 0, err
	}
	elapsed := time.Since(started)

	// Row counting is bookkeeping, not part of the measured work.
	rows, err := db.TableRowCount(ctx, dbConn, tableName)
	if err != nil {
		return 0, 0, err
	}
	return elapsed, rows, nil
}
// benchmarkDailySQLCore times one pass of the SQL daily aggregation path for
// the window dayStart..dayEnd, writing into a uniquely named scratch summary
// table.
//
// Only the execution of the generated INSERT statement is measured; creating
// the scratch table, building the statement, counting rows and dropping the
// table are outside the timer. It returns the elapsed time and the scratch
// table's row count.
func (c *CronTask) benchmarkDailySQLCore(ctx context.Context, dayStart, dayEnd time.Time) (time.Duration, int64, error) {
	tableName, err := benchmarkSummaryTableName("benchmark_daily_sql")
	if err != nil {
		return 0, 0, err
	}
	dbConn := c.Database.DB()
	if err := db.EnsureSummaryTable(ctx, dbConn, tableName); err != nil {
		return 0, 0, err
	}
	// Clean up with a context detached from ctx's cancellation: a cancelled or
	// timed-out ctx is a likely reason for leaving this function early, and the
	// drop must still run then or the uniquely named scratch table is leaked.
	defer dropSnapshotTable(context.WithoutCancel(ctx), dbConn, tableName)

	insertQuery, err := db.BuildDailySummaryInsert(tableName, buildCanonicalHourlySummaryUnion(dayStart, dayEnd))
	if err != nil {
		return 0, 0, err
	}

	started := time.Now()
	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		return 0, 0, err
	}
	elapsed := time.Since(started)

	// Row counting is bookkeeping, not part of the measured work.
	rows, err := db.TableRowCount(ctx, dbConn, tableName)
	if err != nil {
		return 0, 0, err
	}
	return elapsed, rows, nil
}
// benchmarkMonthlyGoCore times one pass of the Go monthly aggregation path for
// the window monthStart..monthEnd, writing into a uniquely named scratch
// summary table.
//
// The measured span covers scanDailyRollup plus insertMonthlyAggregates;
// creating the scratch table, counting its rows and dropping it are outside
// the timer. It returns the elapsed time and the scratch table's row count, or
// an error if any step fails or the scan yields no rows.
func (c *CronTask) benchmarkMonthlyGoCore(ctx context.Context, monthStart, monthEnd time.Time) (time.Duration, int64, error) {
	tableName, err := benchmarkSummaryTableName("benchmark_monthly_go")
	if err != nil {
		return 0, 0, err
	}
	dbConn := c.Database.DB()
	if err := db.EnsureSummaryTable(ctx, dbConn, tableName); err != nil {
		return 0, 0, err
	}
	// Clean up with a context detached from ctx's cancellation: a cancelled or
	// timed-out ctx is a likely reason for leaving this function early, and the
	// drop must still run then or the uniquely named scratch table is leaked.
	defer dropSnapshotTable(context.WithoutCancel(ctx), dbConn, tableName)

	started := time.Now()
	aggMap, err := c.scanDailyRollup(ctx, monthStart, monthEnd)
	if err != nil {
		return 0, 0, err
	}
	if len(aggMap) == 0 {
		return 0, 0, fmt.Errorf("no monthly rows found in canonical daily rollup")
	}
	if err := c.insertMonthlyAggregates(ctx, tableName, aggMap); err != nil {
		return 0, 0, err
	}
	elapsed := time.Since(started)

	// Row counting is bookkeeping, not part of the measured work.
	rows, err := db.TableRowCount(ctx, dbConn, tableName)
	if err != nil {
		return 0, 0, err
	}
	return elapsed, rows, nil
}
// benchmarkMonthlySQLCore times one pass of the SQL monthly aggregation path
// for the window monthStart..monthEnd, writing into a uniquely named scratch
// summary table.
//
// Only the execution of the generated INSERT statement is measured; creating
// the scratch table, building the statement, counting rows and dropping the
// table are outside the timer. It returns the elapsed time and the scratch
// table's row count.
func (c *CronTask) benchmarkMonthlySQLCore(ctx context.Context, monthStart, monthEnd time.Time) (time.Duration, int64, error) {
	tableName, err := benchmarkSummaryTableName("benchmark_monthly_sql")
	if err != nil {
		return 0, 0, err
	}
	dbConn := c.Database.DB()
	if err := db.EnsureSummaryTable(ctx, dbConn, tableName); err != nil {
		return 0, 0, err
	}
	// Clean up with a context detached from ctx's cancellation: a cancelled or
	// timed-out ctx is a likely reason for leaving this function early, and the
	// drop must still run then or the uniquely named scratch table is leaked.
	defer dropSnapshotTable(context.WithoutCancel(ctx), dbConn, tableName)

	insertQuery, err := db.BuildMonthlySummaryInsert(tableName, buildCanonicalDailyRollupSummaryUnion(monthStart, monthEnd))
	if err != nil {
		return 0, 0, err
	}

	started := time.Now()
	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		return 0, 0, err
	}
	elapsed := time.Since(started)

	// Row counting is bookkeeping, not part of the measured work.
	rows, err := db.TableRowCount(ctx, dbConn, tableName)
	if err != nil {
		return 0, 0, err
	}
	return elapsed, rows, nil
}
// benchmarkSummaryTableName builds a scratch table name of the form
// "<prefix>_<current UTC time in nanoseconds>" and passes it through
// db.SafeTableName, returning whatever name and error that call yields.
func benchmarkSummaryTableName(prefix string) (string, error) {
	stamp := time.Now().UTC().UnixNano()
	candidate := fmt.Sprintf("%s_%d", prefix, stamp)
	return db.SafeTableName(candidate)
}
// latestDailyWindowFromHourlyCache locates the most recent UTC calendar day
// that has rows in vm_hourly_stats.
//
// It returns the day's midnight and the following midnight. Two zero times
// with a nil error mean there is nothing to benchmark: the table is missing,
// it holds no positive "SnapshotTime", or the derived day contains no rows.
// Query failures are returned as-is.
func latestDailyWindowFromHourlyCache(ctx context.Context, dbConn *sqlx.DB) (time.Time, time.Time, error) {
	var none time.Time // zero value signals "no window"

	if exists := db.TableExists(ctx, dbConn, "vm_hourly_stats"); !exists {
		return none, none, nil
	}

	// Newest snapshot timestamp; NULL when no row matches.
	var newest sql.NullInt64
	err := dbConn.GetContext(ctx, &newest, dbConn.Rebind(`
SELECT MAX("SnapshotTime")
FROM vm_hourly_stats
WHERE "SnapshotTime" > ?
`), 0)
	if err != nil {
		return none, none, err
	}
	if !(newest.Valid && newest.Int64 > 0) {
		return none, none, nil
	}

	// Truncate the snapshot (Unix seconds) to the start of its UTC day.
	year, month, day := time.Unix(newest.Int64, 0).UTC().Date()
	windowStart := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
	windowEnd := windowStart.AddDate(0, 0, 1)

	// Confirm the window actually contains rows before offering it.
	var rowsInWindow int64
	err = dbConn.GetContext(ctx, &rowsInWindow, dbConn.Rebind(`
SELECT COUNT(1)
FROM vm_hourly_stats
WHERE "SnapshotTime" >= ? AND "SnapshotTime" < ?
`), windowStart.Unix(), windowEnd.Unix())
	switch {
	case err != nil:
		return none, none, err
	case rowsInWindow == 0:
		return none, none, nil
	}
	return windowStart, windowEnd, nil
}
// latestMonthlyWindowFromDailyRollup locates the most recent UTC calendar
// month that has rows in vm_daily_rollup.
//
// It returns the first instant of that month and the first instant of the
// next one. Two zero times with a nil error mean there is nothing to
// benchmark: the table is missing, it holds no positive "Date", or the derived
// month contains no rows. Query failures are returned as-is.
func latestMonthlyWindowFromDailyRollup(ctx context.Context, dbConn *sqlx.DB) (time.Time, time.Time, error) {
	var none time.Time // zero value signals "no window"

	if exists := db.TableExists(ctx, dbConn, "vm_daily_rollup"); !exists {
		return none, none, nil
	}

	// Newest rollup date; NULL when no row matches.
	var newest sql.NullInt64
	err := dbConn.GetContext(ctx, &newest, dbConn.Rebind(`
SELECT MAX("Date")
FROM vm_daily_rollup
WHERE "Date" > ?
`), 0)
	if err != nil {
		return none, none, err
	}
	if !(newest.Valid && newest.Int64 > 0) {
		return none, none, nil
	}

	// Truncate the date (Unix seconds) to the first day of its UTC month.
	year, month, _ := time.Unix(newest.Int64, 0).UTC().Date()
	windowStart := time.Date(year, month, 1, 0, 0, 0, 0, time.UTC)
	windowEnd := windowStart.AddDate(0, 1, 0)

	// Confirm the window actually contains rows before offering it.
	var rowsInWindow int64
	err = dbConn.GetContext(ctx, &rowsInWindow, dbConn.Rebind(`
SELECT COUNT(1)
FROM vm_daily_rollup
WHERE "Date" >= ? AND "Date" < ?
`), windowStart.Unix(), windowEnd.Unix())
	switch {
	case err != nil:
		return none, none, err
	case rowsInWindow == 0:
		return none, none, nil
	}
	return windowStart, windowEnd, nil
}
// summarizeDurations reduces a set of timing samples to their count, minimum,
// median, mean and maximum. An empty input yields the zero
// AggregationBenchmarkStats. For an even number of samples the median is the
// mean of the two middle values; both means use integer Duration division.
// The input slice is not modified.
func summarizeDurations(values []time.Duration) AggregationBenchmarkStats {
	n := len(values)
	if n == 0 {
		return AggregationBenchmarkStats{}
	}

	// Sort a private copy so the caller's ordering is left untouched.
	ordered := make([]time.Duration, n)
	copy(ordered, values)
	slices.Sort(ordered)

	var sum time.Duration
	for i := range ordered {
		sum += ordered[i]
	}

	mid := n / 2
	var median time.Duration
	if n%2 == 1 {
		median = ordered[mid]
	} else {
		median = (ordered[mid-1] + ordered[mid]) / 2
	}

	stats := AggregationBenchmarkStats{Runs: n}
	stats.Min = ordered[0]
	stats.Median = median
	stats.Avg = sum / time.Duration(n)
	stats.Max = ordered[n-1]
	return stats
}