From f0bacab7290baa1d990fa7e725f22cd554371470 Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Thu, 15 Jan 2026 19:43:20 +1100 Subject: [PATCH] postgres optimisations and daily sqlite vacuum --- db/helpers.go | 39 +++++++++++++++++++++++ internal/report/snapshots.go | 4 +++ internal/settings/settings.go | 1 + internal/tasks/dailyAggregate.go | 2 ++ internal/tasks/monthlyAggregate.go | 3 ++ main.go | 50 +++++------------------------- 6 files changed, 56 insertions(+), 43 deletions(-) diff --git a/db/helpers.go b/db/helpers.go index da202f6..cb37ffb 100644 --- a/db/helpers.go +++ b/db/helpers.go @@ -287,11 +287,20 @@ func EnsureSnapshotIndexes(ctx context.Context, dbConn *sqlx.DB, tableName strin return err } + driver := strings.ToLower(dbConn.DriverName()) indexes := []string{ fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vm_vcenter_idx ON %s ("VmId","Vcenter")`, tableName, tableName), fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_snapshottime_idx ON %s ("SnapshotTime")`, tableName, tableName), fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_resourcepool_idx ON %s ("ResourcePool")`, tableName, tableName), } + // PG-specific helpful indexes; safe no-ops on SQLite if executed, but keep them gated to reduce file bloat. + if driver == "pgx" || driver == "postgres" { + indexes = append(indexes, + fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vcenter_snapshottime_idx ON %s ("Vcenter","SnapshotTime")`, tableName, tableName), + fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_name_vcenter_idx ON %s ("Name","Vcenter")`, tableName, tableName), + fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vmuuid_vcenter_idx ON %s ("VmUuid","Vcenter")`, tableName, tableName), + ) + } for _, idx := range indexes { if _, err := dbConn.ExecContext(ctx, idx); err != nil { return err @@ -339,6 +348,30 @@ func ApplySQLiteTuning(ctx context.Context, dbConn *sqlx.DB) { } } +// AnalyzeTableIfPostgres runs ANALYZE on a table to refresh planner stats. +func AnalyzeTableIfPostgres(ctx context.Context, dbConn *sqlx.DB, tableName string) { + if _, err := SafeTableName(tableName); err != nil { + return + } + driver := strings.ToLower(dbConn.DriverName()) + if driver != "pgx" && driver != "postgres" { + return + } + _, _ = dbConn.ExecContext(ctx, fmt.Sprintf(`ANALYZE %s`, tableName)) +} + +// SetPostgresWorkMem sets a per-session work_mem for heavy aggregations; no-op for other drivers. +func SetPostgresWorkMem(ctx context.Context, dbConn *sqlx.DB, workMemMB int) { + if workMemMB <= 0 { + return + } + driver := strings.ToLower(dbConn.DriverName()) + if driver != "pgx" && driver != "postgres" { + return + } + _, _ = dbConn.ExecContext(ctx, fmt.Sprintf(`SET LOCAL work_mem = '%dMB'`, workMemMB)) +} + // CheckMigrationState ensures goose migrations are present and not dirty. func CheckMigrationState(ctx context.Context, dbConn *sqlx.DB) error { driver := strings.ToLower(dbConn.DriverName()) @@ -822,6 +855,12 @@ func EnsureSummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vm_vcenter_idx ON %s ("VmId","Vcenter")`, tableName, tableName), fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_resourcepool_idx ON %s ("ResourcePool")`, tableName, tableName), } + if strings.ToLower(dbConn.DriverName()) == "pgx" || strings.ToLower(dbConn.DriverName()) == "postgres" { + indexes = append(indexes, + fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vcenter_idx ON %s ("Vcenter")`, tableName, tableName), + fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vmuuid_vcenter_idx ON %s ("VmUuid","Vcenter")`, tableName, tableName), + ) + } for _, idx := range indexes { if _, err := dbConn.ExecContext(ctx, idx); err != nil { return err diff --git a/internal/report/snapshots.go b/internal/report/snapshots.go index 9abf7ff..f2fd0c0 100644 --- a/internal/report/snapshots.go +++ b/internal/report/snapshots.go @@ -99,6 +99,8 @@ CREATE TABLE IF NOT EXISTS snapshot_registry ( if err != nil && !strings.Contains(strings.ToLower(err.Error()), "duplicate column name") { return err } + _, _ = dbConn.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS idx_snapshot_registry_type_time ON snapshot_registry (snapshot_type, snapshot_time)`) + _, _ = dbConn.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS idx_snapshot_registry_table_name ON snapshot_registry (table_name)`) return nil case "pgx", "postgres": _, err := dbConn.ExecContext(ctx, ` @@ -117,6 +119,8 @@ CREATE TABLE IF NOT EXISTS snapshot_registry ( if err != nil && !strings.Contains(strings.ToLower(err.Error()), "column \"snapshot_count\" of relation \"snapshot_registry\" already exists") { return err } + _, _ = dbConn.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS idx_snapshot_registry_type_time ON snapshot_registry (snapshot_type, snapshot_time DESC)`) + _, _ = dbConn.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS idx_snapshot_registry_table_name ON snapshot_registry (table_name)`) return nil default: return fmt.Errorf("unsupported driver for snapshot registry: %s", driver) diff --git a/internal/settings/settings.go b/internal/settings/settings.go index b650982..d3a1286 100644 --- a/internal/settings/settings.go +++ b/internal/settings/settings.go @@ -52,6 +52,7 @@ type SettingsYML struct { NodeChargeClusters []string `yaml:"node_charge_clusters"` SrmActiveActiveVms []string `yaml:"srm_activeactive_vms"` VcenterAddresses []string `yaml:"vcenter_addresses"` + PostgresWorkMemMB int `yaml:"postgres_work_mem_mb"` } `yaml:"settings"` } diff --git a/internal/tasks/dailyAggregate.go b/internal/tasks/dailyAggregate.go index 9457920..8735bba 100644 --- a/internal/tasks/dailyAggregate.go +++ b/internal/tasks/dailyAggregate.go @@ -37,6 +37,7 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti } dbConn := c.Database.DB() + db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB) if err := db.EnsureSummaryTable(ctx, dbConn, summaryTable); err != nil { return err } @@ -139,6 +140,7 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil { c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable) } + db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable) rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable) if err != nil { c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable) diff --git a/internal/tasks/monthlyAggregate.go b/internal/tasks/monthlyAggregate.go index 976f395..32d9e11 100644 --- a/internal/tasks/monthlyAggregate.go +++ b/internal/tasks/monthlyAggregate.go @@ -44,6 +44,7 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time dailySnapshots = filterRecordsInRange(dailySnapshots, monthStart, monthEnd) dbConn := c.Database.DB() + db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB) dailySnapshots = filterSnapshotsWithRows(ctx, dbConn, dailySnapshots) if len(dailySnapshots) == 0 { return fmt.Errorf("no hourly snapshot tables found for %s", targetMonth.Format("2006-01")) @@ -114,6 +115,8 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", monthlyTable) } + db.AnalyzeTableIfPostgres(ctx, dbConn, monthlyTable) + if err := c.generateReport(ctx, monthlyTable); err != nil { c.Logger.Warn("failed to generate monthly report", "error", err, "table", monthlyTable) metrics.RecordMonthlyAggregation(time.Since(jobStart), err) diff --git a/main.go b/main.go index 69d8741..d3559c0 100644 --- a/main.go +++ b/main.go @@ -139,7 +139,6 @@ func main() { logger.Info("encrypted vcenter password stored in settings file") } } - //os.Exit(1) } creds := vcenter.VcenterLogin{ @@ -168,54 +167,12 @@ func main() { FirstHourlySnapshotCheck: true, } - /* - cronFrequency = durationFromSeconds(s.Values.Settings.VcenterEventPollingSeconds, 60) - logger.Debug("Setting VM event polling cronjob frequency to", "frequency", cronFrequency) - - cronInvFrequency = durationFromSeconds(s.Values.Settings.VcenterInventoryPollingSeconds, 7200) - logger.Debug("Setting VM inventory polling cronjob frequency to", "frequency", cronInvFrequency) - */ - cronSnapshotFrequency = durationFromSeconds(s.Values.Settings.VcenterInventorySnapshotSeconds, 3600) logger.Debug("Setting VM inventory snapshot cronjob frequency to", "frequency", cronSnapshotFrequency) cronAggregateFrequency = durationFromSeconds(s.Values.Settings.VcenterInventoryAggregateSeconds, 86400) logger.Debug("Setting VM inventory daily aggregation cronjob frequency to", "frequency", cronAggregateFrequency) - /* - // start background processing for events stored in events table - startsAt := time.Now().Add(time.Second * 10) - job, err := c.NewJob( - gocron.DurationJob(cronFrequency), - gocron.NewTask(func() { - ct.RunVmCheck(ctx, logger) - }), gocron.WithSingletonMode(gocron.LimitModeReschedule), - gocron.WithStartAt(gocron.WithStartDateTime(startsAt)), - ) - if err != nil { - logger.Error("failed to start event processing cron job", "error", err) - os.Exit(1) - } - logger.Debug("Created event processing cron job", "job", job.ID(), "starting_at", startsAt) - */ - - // start background checks of vcenter inventory - /* - startsAt2 := time.Now().Add(cronInvFrequency) - job2, err := c.NewJob( - gocron.DurationJob(cronInvFrequency), - gocron.NewTask(func() { - ct.RunVcenterPoll(ctx, logger) - }), gocron.WithSingletonMode(gocron.LimitModeReschedule), - gocron.WithStartAt(gocron.WithStartDateTime(startsAt2)), - ) - if err != nil { - logger.Error("failed to start vcenter inventory cron job", "error", err) - os.Exit(1) - } - logger.Debug("Created vcenter inventory cron job", "job", job2.ID(), "starting_at", startsAt2) - */ - startsAt3 := time.Now().Add(cronSnapshotFrequency) if cronSnapshotFrequency == time.Hour { startsAt3 = time.Now().Truncate(time.Hour).Add(time.Hour) @@ -273,6 +230,13 @@ func main() { gocron.CronJob(snapshotCleanupCron, false), gocron.NewTask(func() { ct.RunSnapshotCleanup(ctx, logger) + if strings.EqualFold(s.Values.Settings.DatabaseDriver, "sqlite") { + if _, err := ct.Database.DB().ExecContext(ctx, "VACUUM"); err != nil { + logger.Warn("VACUUM failed after snapshot cleanup", "error", err) + } else { + logger.Debug("VACUUM completed after snapshot cleanup") + } + } }), gocron.WithSingletonMode(gocron.LimitModeReschedule), ) if err != nil {