370 lines
12 KiB
Go
370 lines
12 KiB
Go
package tasks
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"slices"
|
|
"time"
|
|
"vctp/db"
|
|
|
|
"github.com/jmoiron/sqlx"
|
|
)
|
|
|
|
// AggregationBenchmarkStats summarizes the durations recorded for one
// aggregation mode (Go or SQL) across the runs of a benchmark phase.
type AggregationBenchmarkStats struct {
	// Runs is the number of durations the statistics were computed from.
	Runs int
	// Min is the shortest recorded duration.
	Min time.Duration
	// Median is the middle recorded duration (the mean of the two middle
	// values when Runs is even).
	Median time.Duration
	// Avg is the arithmetic mean of the recorded durations.
	Avg time.Duration
	// Max is the longest recorded duration.
	Max time.Duration
}
|
|
|
|
type AggregationBenchmarkReport struct {
|
|
Runs int
|
|
|
|
DailyWindowStart time.Time
|
|
DailyWindowEnd time.Time
|
|
DailyGo AggregationBenchmarkStats
|
|
DailySQL AggregationBenchmarkStats
|
|
DailyGoRowsWritten int64
|
|
DailySQLRowsWritten int64
|
|
|
|
MonthlyWindowStart time.Time
|
|
MonthlyWindowEnd time.Time
|
|
MonthlyGo AggregationBenchmarkStats
|
|
MonthlySQL AggregationBenchmarkStats
|
|
MonthlyGoRowsWritten int64
|
|
MonthlySQLRowsWritten int64
|
|
}
|
|
|
|
// RunCanonicalAggregationBenchmark compares Go and SQL aggregation cores on canonical cache tables.
|
|
func (c *CronTask) RunCanonicalAggregationBenchmark(ctx context.Context, runs int) (AggregationBenchmarkReport, error) {
|
|
if runs <= 0 {
|
|
runs = 3
|
|
}
|
|
report := AggregationBenchmarkReport{Runs: runs}
|
|
dbConn := c.Database.DB()
|
|
logger := loggerFromCtx(ctx, c.Logger)
|
|
|
|
hourlyStart, hourlyEnd, err := latestDailyWindowFromHourlyCache(ctx, dbConn)
|
|
if err != nil {
|
|
return report, err
|
|
}
|
|
if !hourlyStart.IsZero() {
|
|
if logger != nil {
|
|
logger.Info("canonical benchmark phase starting", "phase", "daily", "window_start", hourlyStart.Format(time.RFC3339), "window_end", hourlyEnd.Format(time.RFC3339), "runs", runs)
|
|
}
|
|
report.DailyWindowStart = hourlyStart
|
|
report.DailyWindowEnd = hourlyEnd
|
|
goDurations := make([]time.Duration, 0, runs)
|
|
sqlDurations := make([]time.Duration, 0, runs)
|
|
var goRows, sqlRows int64
|
|
for i := 0; i < runs; i++ {
|
|
run := i + 1
|
|
if logger != nil {
|
|
logger.Info("canonical benchmark run starting", "phase", "daily", "mode", "go", "run", run, "runs", runs)
|
|
}
|
|
dur, rows, runErr := c.benchmarkDailyGoCore(ctx, hourlyStart, hourlyEnd)
|
|
if runErr != nil {
|
|
return report, fmt.Errorf("daily go benchmark run %d failed: %w", i+1, runErr)
|
|
}
|
|
if logger != nil {
|
|
logger.Info("canonical benchmark run complete", "phase", "daily", "mode", "go", "run", run, "runs", runs, "duration", dur, "rows", rows)
|
|
}
|
|
goDurations = append(goDurations, dur)
|
|
goRows = rows
|
|
|
|
if logger != nil {
|
|
logger.Info("canonical benchmark run starting", "phase", "daily", "mode", "sql", "run", run, "runs", runs)
|
|
}
|
|
dur, rows, runErr = c.benchmarkDailySQLCore(ctx, hourlyStart, hourlyEnd)
|
|
if runErr != nil {
|
|
return report, fmt.Errorf("daily sql benchmark run %d failed: %w", i+1, runErr)
|
|
}
|
|
if logger != nil {
|
|
logger.Info("canonical benchmark run complete", "phase", "daily", "mode", "sql", "run", run, "runs", runs, "duration", dur, "rows", rows)
|
|
}
|
|
sqlDurations = append(sqlDurations, dur)
|
|
sqlRows = rows
|
|
}
|
|
report.DailyGo = summarizeDurations(goDurations)
|
|
report.DailySQL = summarizeDurations(sqlDurations)
|
|
report.DailyGoRowsWritten = goRows
|
|
report.DailySQLRowsWritten = sqlRows
|
|
if logger != nil {
|
|
logger.Info("canonical benchmark phase complete", "phase", "daily", "runs", runs)
|
|
}
|
|
} else if logger != nil {
|
|
logger.Info("canonical benchmark phase skipped", "phase", "daily", "reason", "no benchmarkable window found in vm_hourly_stats")
|
|
}
|
|
|
|
monthlyStart, monthlyEnd, err := latestMonthlyWindowFromDailyRollup(ctx, dbConn)
|
|
if err != nil {
|
|
return report, err
|
|
}
|
|
if !monthlyStart.IsZero() {
|
|
if logger != nil {
|
|
logger.Info("canonical benchmark phase starting", "phase", "monthly", "window_start", monthlyStart.Format(time.RFC3339), "window_end", monthlyEnd.Format(time.RFC3339), "runs", runs)
|
|
}
|
|
report.MonthlyWindowStart = monthlyStart
|
|
report.MonthlyWindowEnd = monthlyEnd
|
|
goDurations := make([]time.Duration, 0, runs)
|
|
sqlDurations := make([]time.Duration, 0, runs)
|
|
var goRows, sqlRows int64
|
|
for i := 0; i < runs; i++ {
|
|
run := i + 1
|
|
if logger != nil {
|
|
logger.Info("canonical benchmark run starting", "phase", "monthly", "mode", "go", "run", run, "runs", runs)
|
|
}
|
|
dur, rows, runErr := c.benchmarkMonthlyGoCore(ctx, monthlyStart, monthlyEnd)
|
|
if runErr != nil {
|
|
return report, fmt.Errorf("monthly go benchmark run %d failed: %w", i+1, runErr)
|
|
}
|
|
if logger != nil {
|
|
logger.Info("canonical benchmark run complete", "phase", "monthly", "mode", "go", "run", run, "runs", runs, "duration", dur, "rows", rows)
|
|
}
|
|
goDurations = append(goDurations, dur)
|
|
goRows = rows
|
|
|
|
if logger != nil {
|
|
logger.Info("canonical benchmark run starting", "phase", "monthly", "mode", "sql", "run", run, "runs", runs)
|
|
}
|
|
dur, rows, runErr = c.benchmarkMonthlySQLCore(ctx, monthlyStart, monthlyEnd)
|
|
if runErr != nil {
|
|
return report, fmt.Errorf("monthly sql benchmark run %d failed: %w", i+1, runErr)
|
|
}
|
|
if logger != nil {
|
|
logger.Info("canonical benchmark run complete", "phase", "monthly", "mode", "sql", "run", run, "runs", runs, "duration", dur, "rows", rows)
|
|
}
|
|
sqlDurations = append(sqlDurations, dur)
|
|
sqlRows = rows
|
|
}
|
|
report.MonthlyGo = summarizeDurations(goDurations)
|
|
report.MonthlySQL = summarizeDurations(sqlDurations)
|
|
report.MonthlyGoRowsWritten = goRows
|
|
report.MonthlySQLRowsWritten = sqlRows
|
|
if logger != nil {
|
|
logger.Info("canonical benchmark phase complete", "phase", "monthly", "runs", runs)
|
|
}
|
|
} else if logger != nil {
|
|
logger.Info("canonical benchmark phase skipped", "phase", "monthly", "reason", "no benchmarkable window found in vm_daily_rollup")
|
|
}
|
|
|
|
if report.DailyWindowStart.IsZero() && report.MonthlyWindowStart.IsZero() {
|
|
return report, fmt.Errorf("no benchmarkable canonical windows found (vm_hourly_stats/vm_daily_rollup are empty)")
|
|
}
|
|
return report, nil
|
|
}
|
|
|
|
func (c *CronTask) benchmarkDailyGoCore(ctx context.Context, dayStart, dayEnd time.Time) (time.Duration, int64, error) {
|
|
tableName, err := benchmarkSummaryTableName("benchmark_daily_go")
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
dbConn := c.Database.DB()
|
|
if err := db.EnsureSummaryTable(ctx, dbConn, tableName); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
defer dropSnapshotTable(ctx, dbConn, tableName)
|
|
|
|
started := time.Now()
|
|
aggMap, snapTimes, err := c.scanHourlyCache(ctx, dayStart, dayEnd)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
if len(aggMap) == 0 || len(snapTimes) == 0 {
|
|
return 0, 0, fmt.Errorf("no daily rows found in canonical hourly cache")
|
|
}
|
|
totalSamplesByVcenter := sampleCountsByVcenter(aggMap)
|
|
if err := c.insertDailyAggregates(ctx, tableName, aggMap, len(snapTimes), totalSamplesByVcenter); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
elapsed := time.Since(started)
|
|
rows, err := db.TableRowCount(ctx, dbConn, tableName)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
return elapsed, rows, nil
|
|
}
|
|
|
|
func (c *CronTask) benchmarkDailySQLCore(ctx context.Context, dayStart, dayEnd time.Time) (time.Duration, int64, error) {
|
|
tableName, err := benchmarkSummaryTableName("benchmark_daily_sql")
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
dbConn := c.Database.DB()
|
|
if err := db.EnsureSummaryTable(ctx, dbConn, tableName); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
defer dropSnapshotTable(ctx, dbConn, tableName)
|
|
|
|
insertQuery, err := db.BuildDailySummaryInsert(tableName, buildCanonicalHourlySummaryUnion(dayStart, dayEnd))
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
started := time.Now()
|
|
if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
elapsed := time.Since(started)
|
|
rows, err := db.TableRowCount(ctx, dbConn, tableName)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
return elapsed, rows, nil
|
|
}
|
|
|
|
func (c *CronTask) benchmarkMonthlyGoCore(ctx context.Context, monthStart, monthEnd time.Time) (time.Duration, int64, error) {
|
|
tableName, err := benchmarkSummaryTableName("benchmark_monthly_go")
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
dbConn := c.Database.DB()
|
|
if err := db.EnsureSummaryTable(ctx, dbConn, tableName); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
defer dropSnapshotTable(ctx, dbConn, tableName)
|
|
|
|
started := time.Now()
|
|
aggMap, err := c.scanDailyRollup(ctx, monthStart, monthEnd)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
if len(aggMap) == 0 {
|
|
return 0, 0, fmt.Errorf("no monthly rows found in canonical daily rollup")
|
|
}
|
|
if err := c.insertMonthlyAggregates(ctx, tableName, aggMap); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
elapsed := time.Since(started)
|
|
rows, err := db.TableRowCount(ctx, dbConn, tableName)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
return elapsed, rows, nil
|
|
}
|
|
|
|
func (c *CronTask) benchmarkMonthlySQLCore(ctx context.Context, monthStart, monthEnd time.Time) (time.Duration, int64, error) {
|
|
tableName, err := benchmarkSummaryTableName("benchmark_monthly_sql")
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
dbConn := c.Database.DB()
|
|
if err := db.EnsureSummaryTable(ctx, dbConn, tableName); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
defer dropSnapshotTable(ctx, dbConn, tableName)
|
|
|
|
insertQuery, err := db.BuildMonthlySummaryInsert(tableName, buildCanonicalDailyRollupSummaryUnion(monthStart, monthEnd))
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
started := time.Now()
|
|
if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
elapsed := time.Since(started)
|
|
rows, err := db.TableRowCount(ctx, dbConn, tableName)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
return elapsed, rows, nil
|
|
}
|
|
|
|
func benchmarkSummaryTableName(prefix string) (string, error) {
|
|
return db.SafeTableName(fmt.Sprintf("%s_%d", prefix, time.Now().UTC().UnixNano()))
|
|
}
|
|
|
|
func latestDailyWindowFromHourlyCache(ctx context.Context, dbConn *sqlx.DB) (time.Time, time.Time, error) {
|
|
if !db.TableExists(ctx, dbConn, "vm_hourly_stats") {
|
|
return time.Time{}, time.Time{}, nil
|
|
}
|
|
query := dbConn.Rebind(`
|
|
SELECT MAX("SnapshotTime")
|
|
FROM vm_hourly_stats
|
|
WHERE "SnapshotTime" > ?
|
|
`)
|
|
var maxSnapshot sql.NullInt64
|
|
if err := dbConn.GetContext(ctx, &maxSnapshot, query, 0); err != nil {
|
|
return time.Time{}, time.Time{}, err
|
|
}
|
|
if !maxSnapshot.Valid || maxSnapshot.Int64 <= 0 {
|
|
return time.Time{}, time.Time{}, nil
|
|
}
|
|
dayStart := time.Unix(maxSnapshot.Int64, 0).UTC()
|
|
dayStart = time.Date(dayStart.Year(), dayStart.Month(), dayStart.Day(), 0, 0, 0, 0, time.UTC)
|
|
dayEnd := dayStart.AddDate(0, 0, 1)
|
|
|
|
countQuery := dbConn.Rebind(`
|
|
SELECT COUNT(1)
|
|
FROM vm_hourly_stats
|
|
WHERE "SnapshotTime" >= ? AND "SnapshotTime" < ?
|
|
`)
|
|
var count int64
|
|
if err := dbConn.GetContext(ctx, &count, countQuery, dayStart.Unix(), dayEnd.Unix()); err != nil {
|
|
return time.Time{}, time.Time{}, err
|
|
}
|
|
if count == 0 {
|
|
return time.Time{}, time.Time{}, nil
|
|
}
|
|
return dayStart, dayEnd, nil
|
|
}
|
|
|
|
func latestMonthlyWindowFromDailyRollup(ctx context.Context, dbConn *sqlx.DB) (time.Time, time.Time, error) {
|
|
if !db.TableExists(ctx, dbConn, "vm_daily_rollup") {
|
|
return time.Time{}, time.Time{}, nil
|
|
}
|
|
query := dbConn.Rebind(`
|
|
SELECT MAX("Date")
|
|
FROM vm_daily_rollup
|
|
WHERE "Date" > ?
|
|
`)
|
|
var maxDate sql.NullInt64
|
|
if err := dbConn.GetContext(ctx, &maxDate, query, 0); err != nil {
|
|
return time.Time{}, time.Time{}, err
|
|
}
|
|
if !maxDate.Valid || maxDate.Int64 <= 0 {
|
|
return time.Time{}, time.Time{}, nil
|
|
}
|
|
monthStart := time.Unix(maxDate.Int64, 0).UTC()
|
|
monthStart = time.Date(monthStart.Year(), monthStart.Month(), 1, 0, 0, 0, 0, time.UTC)
|
|
monthEnd := monthStart.AddDate(0, 1, 0)
|
|
|
|
countQuery := dbConn.Rebind(`
|
|
SELECT COUNT(1)
|
|
FROM vm_daily_rollup
|
|
WHERE "Date" >= ? AND "Date" < ?
|
|
`)
|
|
var count int64
|
|
if err := dbConn.GetContext(ctx, &count, countQuery, monthStart.Unix(), monthEnd.Unix()); err != nil {
|
|
return time.Time{}, time.Time{}, err
|
|
}
|
|
if count == 0 {
|
|
return time.Time{}, time.Time{}, nil
|
|
}
|
|
return monthStart, monthEnd, nil
|
|
}
|
|
|
|
func summarizeDurations(values []time.Duration) AggregationBenchmarkStats {
|
|
if len(values) == 0 {
|
|
return AggregationBenchmarkStats{}
|
|
}
|
|
sorted := append([]time.Duration(nil), values...)
|
|
slices.Sort(sorted)
|
|
total := time.Duration(0)
|
|
for _, v := range sorted {
|
|
total += v
|
|
}
|
|
median := sorted[len(sorted)/2]
|
|
if len(sorted)%2 == 0 {
|
|
median = (sorted[(len(sorted)/2)-1] + sorted[len(sorted)/2]) / 2
|
|
}
|
|
return AggregationBenchmarkStats{
|
|
Runs: len(sorted),
|
|
Min: sorted[0],
|
|
Median: median,
|
|
Avg: total / time.Duration(len(sorted)),
|
|
Max: sorted[len(sorted)-1],
|
|
}
|
|
}
|