more index cleanups to optimise space
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-02-08 15:40:42 +11:00
parent a993aedf79
commit c66679a71f
13 changed files with 590 additions and 61 deletions

View File

@@ -5,7 +5,6 @@ import (
"embed"
"fmt"
"log/slog"
"os"
"reflect"
"strings"
@@ -24,8 +23,9 @@ type Database interface {
}
type Config struct {
Driver string
DSN string
Driver string
DSN string
EnableExperimentalPostgres bool
}
func New(logger *slog.Logger, cfg Config) (Database, error) {
@@ -42,8 +42,8 @@ func New(logger *slog.Logger, cfg Config) (Database, error) {
return db, nil
case "postgres":
// The sqlc query set is SQLite-first. Keep Postgres opt-in until full parity is validated.
if strings.TrimSpace(os.Getenv("VCTP_ENABLE_EXPERIMENTAL_POSTGRES")) != "1" {
return nil, fmt.Errorf("postgres driver is disabled by default; set VCTP_ENABLE_EXPERIMENTAL_POSTGRES=1 to enable experimental mode")
if !cfg.EnableExperimentalPostgres {
return nil, fmt.Errorf("postgres driver is disabled by default; set settings.enable_experimental_postgres=true to enable experimental mode")
}
db, err := newPostgresDB(logger, cfg.DSN)
if err != nil {

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"sort"
"strconv"
"strings"
"time"
@@ -430,6 +431,16 @@ ORDER BY tablename DESC
// CleanupHourlySnapshotIndexes drops low-value per-table indexes on hourly snapshot tables.
// It is a convenience wrapper that applies the cleanup across every hourly table,
// with no age cutoff, and returns the number of indexes dropped.
func CleanupHourlySnapshotIndexes(ctx context.Context, dbConn *sqlx.DB) (int, error) {
	// A cutoff of 0 means "no cutoff": every matching hourly table is processed.
	return cleanupHourlySnapshotIndexes(ctx, dbConn, 0)
}
// CleanupHourlySnapshotIndexesOlderThan drops per-table hourly indexes for snapshot tables older than cutoff.
// cutoff <= 0 means drop across all hourly tables.
// It returns the number of indexes dropped.
func CleanupHourlySnapshotIndexesOlderThan(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (int, error) {
	// Hourly table names encode their snapshot time as unix seconds
	// (see parseHourlySnapshotUnix), so the cutoff is compared in unix seconds.
	return cleanupHourlySnapshotIndexes(ctx, dbConn, cutoff.Unix())
}
func cleanupHourlySnapshotIndexes(ctx context.Context, dbConn *sqlx.DB, cutoffUnix int64) (int, error) {
driver := strings.ToLower(dbConn.DriverName())
if driver != "sqlite" {
return 0, fmt.Errorf("hourly snapshot index cleanup is only supported for sqlite")
@@ -438,25 +449,68 @@ func CleanupHourlySnapshotIndexes(ctx context.Context, dbConn *sqlx.DB) (int, er
if err != nil {
return 0, err
}
var existing []struct {
Name string `db:"name"`
}
if err := selectLog(ctx, dbConn, &existing, `
SELECT name
FROM sqlite_master
WHERE type = 'index'
AND tbl_name LIKE 'inventory_hourly_%'
`); err != nil {
return 0, err
}
existingSet := make(map[string]struct{}, len(existing))
for _, idx := range existing {
existingSet[idx.Name] = struct{}{}
}
dropped := 0
for _, tableName := range tables {
if _, err := SafeTableName(tableName); err != nil {
continue
}
indexes := []string{
fmt.Sprintf(`DROP INDEX IF EXISTS %s_snapshottime_idx`, tableName),
fmt.Sprintf(`DROP INDEX IF EXISTS %s_resourcepool_idx`, tableName),
snapshotUnix, ok := parseHourlySnapshotUnix(tableName)
if !ok {
continue
}
for _, stmt := range indexes {
if _, err := execLog(ctx, dbConn, stmt); err != nil {
if cutoffUnix > 0 && snapshotUnix >= cutoffUnix {
continue
}
indexNames := []string{
fmt.Sprintf("%s_vm_vcenter_idx", tableName),
fmt.Sprintf("%s_snapshottime_idx", tableName),
fmt.Sprintf("%s_resourcepool_idx", tableName),
}
for _, indexName := range indexNames {
if _, exists := existingSet[indexName]; !exists {
continue
}
if _, err := execLog(ctx, dbConn, fmt.Sprintf(`DROP INDEX IF EXISTS %s`, indexName)); err != nil {
return dropped, err
}
delete(existingSet, indexName)
dropped++
}
}
return dropped, nil
}
// parseHourlySnapshotUnix extracts the unix-second timestamp encoded in an
// hourly snapshot table name of the form "inventory_hourly_<unix>".
// It reports false when the name lacks the expected prefix or the suffix is
// not a positive base-10 integer.
func parseHourlySnapshotUnix(tableName string) (int64, bool) {
	rest, found := strings.CutPrefix(tableName, "inventory_hourly_")
	if !found {
		return 0, false
	}
	ts, err := strconv.ParseInt(rest, 10, 64)
	if err != nil {
		return 0, false
	}
	if ts <= 0 {
		// Zero or negative timestamps are never valid snapshot times.
		return 0, false
	}
	return ts, true
}
// BackfillSerialColumn sets missing values in a serial-like column for Postgres tables.
func BackfillSerialColumn(ctx context.Context, dbConn *sqlx.DB, tableName, columnName string) error {
if err := ValidateTableName(tableName); err != nil {
@@ -550,6 +604,7 @@ CREATE TABLE IF NOT EXISTS vm_hourly_stats (
return err
}
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_vmuuid_time_idx ON vm_hourly_stats ("VmUuid","SnapshotTime")`)
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_vmid_time_idx ON vm_hourly_stats ("VmId","SnapshotTime")`)
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_snapshottime_idx ON vm_hourly_stats ("SnapshotTime")`)
return nil
}
@@ -1243,8 +1298,38 @@ type VmLifecycle struct {
}
// FetchVmTrace returns combined hourly snapshot records for a VM (by id/uuid/name) ordered by snapshot time.
// To avoid SQLite's UNION term limits, this iterates tables one by one and merges in-memory.
// It prefers the shared vm_hourly_stats history table and falls back to per-snapshot tables.
func FetchVmTrace(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
	// Prefer the consolidated history table when it exists; an error or an
	// empty result falls through to the per-snapshot table scan.
	if TableExists(ctx, dbConn, "vm_hourly_stats") {
		switch cached, cacheErr := fetchVmTraceFromHourlyCache(ctx, dbConn, vmID, vmUUID, name); {
		case cacheErr != nil:
			slog.Warn("vm trace cache query failed; falling back to hourly tables", "error", cacheErr)
		case len(cached) > 0:
			slog.Debug("vm trace loaded from hourly cache", "row_count", len(cached))
			return cached, nil
		}
	}
	return fetchVmTraceFromSnapshotTables(ctx, dbConn, vmID, vmUUID, name)
}
// fetchVmTraceFromHourlyCache loads a VM's trace rows from the shared
// vm_hourly_stats table, matching by id, uuid, or case-insensitive name,
// ordered by snapshot time.
func fetchVmTraceFromHourlyCache(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
	const rawQuery = `
SELECT "SnapshotTime","Name","Vcenter","VmId","VmUuid","ResourcePool","VcpuCount","RamGB","ProvisionedDisk",
COALESCE("CreationTime",0) AS "CreationTime",
COALESCE("DeletionTime",0) AS "DeletionTime"
FROM vm_hourly_stats
WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
ORDER BY "SnapshotTime"
`
	var trace []VmTraceRow
	// Rebind converts the ? placeholders to the driver's bind style.
	if err := selectLog(ctx, dbConn, &trace, dbConn.Rebind(rawQuery), vmID, vmUUID, name); err != nil {
		return nil, err
	}
	return trace, nil
}
func fetchVmTraceFromSnapshotTables(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
var tables []struct {
TableName string `db:"table_name"`
SnapshotTime int64 `db:"snapshot_time"`
@@ -1281,7 +1366,6 @@ WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
`, t.SnapshotTime, t.TableName)
args := []interface{}{vmID, vmUUID, name}
if driver != "sqlite" {
// convert ? to $1 style for postgres/pgx
query = strings.Replace(query, "?", "$1", 1)
query = strings.Replace(query, "?", "$2", 1)
query = strings.Replace(query, "?", "$3", 1)
@@ -1302,8 +1386,134 @@ WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
return rows, nil
}
// FetchVmLifecycle walks hourly snapshots to determine lifecycle bounds for a VM.
// FetchVmLifecycle walks VM history data to determine lifecycle bounds for a VM.
// It prefers vm_hourly_stats + vm_lifecycle_cache and falls back to per-snapshot table probes.
func FetchVmLifecycle(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) (VmLifecycle, error) {
	// Without the consolidated stats table the only option is the slow
	// per-snapshot table probe.
	if !TableExists(ctx, dbConn, "vm_hourly_stats") {
		return fetchVmLifecycleFromSnapshotTables(ctx, dbConn, vmID, vmUUID, name)
	}
	lifecycle, found, err := fetchVmLifecycleFromHourlyCache(ctx, dbConn, vmID, vmUUID, name)
	if err != nil {
		slog.Warn("vm lifecycle cache query failed; falling back to hourly tables", "error", err)
	}
	if err != nil || !found {
		// Query failed or VM not present in the cache: fall back.
		return fetchVmLifecycleFromSnapshotTables(ctx, dbConn, vmID, vmUUID, name)
	}
	// Refine the hourly-derived bounds with the dedicated lifecycle cache
	// when available; lookup failures are non-fatal.
	if TableExists(ctx, dbConn, "vm_lifecycle_cache") {
		cached, cachedFound, cacheErr := fetchVmLifecycleFromLifecycleCache(ctx, dbConn, vmID, vmUUID, name)
		switch {
		case cacheErr != nil:
			slog.Warn("vm lifecycle cache lookup failed", "error", cacheErr)
		case cachedFound:
			lifecycle = mergeVmLifecycle(lifecycle, cached)
		}
	}
	return lifecycle, nil
}
// fetchVmLifecycleFromHourlyCache derives lifecycle bounds for a VM from the
// aggregated vm_hourly_stats table. The bool result reports whether any
// matching rows were found.
func fetchVmLifecycleFromHourlyCache(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) (VmLifecycle, bool, error) {
	const rawQuery = `
SELECT
COUNT(1) AS rows,
MIN(NULLIF("CreationTime",0)) AS creation_time,
MIN("SnapshotTime") AS first_seen,
MAX("SnapshotTime") AS last_seen,
MIN(NULLIF("DeletionTime",0)) AS deletion_time
FROM vm_hourly_stats
WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
`
	var agg struct {
		Rows      int64         `db:"rows"`
		Creation  sql.NullInt64 `db:"creation_time"`
		FirstSeen sql.NullInt64 `db:"first_seen"`
		LastSeen  sql.NullInt64 `db:"last_seen"`
		Deletion  sql.NullInt64 `db:"deletion_time"`
	}
	if err := getLog(ctx, dbConn, &agg, dbConn.Rebind(rawQuery), vmID, vmUUID, name); err != nil {
		return VmLifecycle{}, false, err
	}
	if agg.Rows == 0 {
		// VM not present in the hourly cache.
		return VmLifecycle{}, false, nil
	}
	out := VmLifecycle{
		FirstSeen: agg.FirstSeen.Int64,
		LastSeen:  agg.LastSeen.Int64,
	}
	switch {
	case agg.Creation.Valid && agg.Creation.Int64 > 0:
		// At least one row carried an exact creation time.
		out.CreationTime = agg.Creation.Int64
		out.CreationApprox = false
	case agg.FirstSeen.Valid && agg.FirstSeen.Int64 > 0:
		// Fall back to the first observation as an approximate creation time.
		out.CreationTime = agg.FirstSeen.Int64
		out.CreationApprox = true
	}
	if agg.Deletion.Valid && agg.Deletion.Int64 > 0 {
		out.DeletionTime = agg.Deletion.Int64
	}
	return out, true, nil
}
// fetchVmLifecycleFromLifecycleCache reads lifecycle bounds for a VM from the
// vm_lifecycle_cache table. Creation time is only ever approximate here (it is
// inferred from FirstSeen). The bool result reports whether a row matched.
func fetchVmLifecycleFromLifecycleCache(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) (VmLifecycle, bool, error) {
	const rawQuery = `
SELECT
COUNT(1) AS rows,
MIN(NULLIF("FirstSeen",0)) AS first_seen,
MAX(NULLIF("LastSeen",0)) AS last_seen,
MIN(NULLIF("DeletedAt",0)) AS deletion_time
FROM vm_lifecycle_cache
WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
`
	var agg struct {
		Rows      int64         `db:"rows"`
		FirstSeen sql.NullInt64 `db:"first_seen"`
		LastSeen  sql.NullInt64 `db:"last_seen"`
		Deletion  sql.NullInt64 `db:"deletion_time"`
	}
	if err := getLog(ctx, dbConn, &agg, dbConn.Rebind(rawQuery), vmID, vmUUID, name); err != nil {
		return VmLifecycle{}, false, err
	}
	if agg.Rows == 0 {
		// VM not present in the lifecycle cache.
		return VmLifecycle{}, false, nil
	}
	out := VmLifecycle{
		FirstSeen: agg.FirstSeen.Int64,
		LastSeen:  agg.LastSeen.Int64,
	}
	if agg.FirstSeen.Valid && agg.FirstSeen.Int64 > 0 {
		// The cache records sightings, not exact creation times.
		out.CreationTime = agg.FirstSeen.Int64
		out.CreationApprox = true
	}
	if agg.Deletion.Valid && agg.Deletion.Int64 > 0 {
		out.DeletionTime = agg.Deletion.Int64
	}
	return out, true, nil
}
// mergeVmLifecycle combines two lifecycle views of the same VM: earliest
// positive FirstSeen/DeletionTime/CreationTime win, latest LastSeen wins, and
// an exact creation time is preferred over an approximate one at the same
// timestamp. If no creation time survives, FirstSeen is used approximately.
func mergeVmLifecycle(base, overlay VmLifecycle) VmLifecycle {
	merged := base

	// Earliest positive sighting wins.
	if overlay.FirstSeen > 0 && (merged.FirstSeen == 0 || overlay.FirstSeen < merged.FirstSeen) {
		merged.FirstSeen = overlay.FirstSeen
	}
	// Latest sighting wins.
	if overlay.LastSeen > merged.LastSeen {
		merged.LastSeen = overlay.LastSeen
	}
	// Earliest positive deletion wins.
	if overlay.DeletionTime > 0 && (merged.DeletionTime == 0 || overlay.DeletionTime < merged.DeletionTime) {
		merged.DeletionTime = overlay.DeletionTime
	}
	if overlay.CreationTime > 0 {
		switch {
		case merged.CreationTime == 0 || overlay.CreationTime < merged.CreationTime:
			merged.CreationTime = overlay.CreationTime
			merged.CreationApprox = overlay.CreationApprox
		case merged.CreationTime == overlay.CreationTime && !overlay.CreationApprox:
			// Same timestamp, but the overlay knows it exactly.
			merged.CreationApprox = false
		}
	}
	// Last resort: approximate creation from the first sighting.
	if merged.CreationTime == 0 && merged.FirstSeen > 0 {
		merged.CreationTime = merged.FirstSeen
		merged.CreationApprox = true
	}
	return merged
}
func fetchVmLifecycleFromSnapshotTables(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) (VmLifecycle, error) {
var lifecycle VmLifecycle
var tables []struct {
TableName string `db:"table_name"`
@@ -1325,7 +1535,6 @@ ORDER BY snapshot_time
if err := ValidateTableName(t.TableName); err != nil {
continue
}
// Probe this table for the VM.
query := fmt.Sprintf(`
SELECT MIN(NULLIF("CreationTime",0)) AS min_creation, COUNT(1) AS cnt
FROM %s
@@ -1362,7 +1571,6 @@ WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
break
}
} else {
// reset if we haven't seen the VM yet
consecutiveMissing = 0
}
}

View File

@@ -0,0 +1,164 @@
package db
import (
"context"
"database/sql"
"fmt"
"testing"
"time"
"github.com/jmoiron/sqlx"
_ "modernc.org/sqlite"
)
// newTestSQLiteDB opens an in-memory SQLite database for a test and registers
// a cleanup that closes it when the test finishes.
func newTestSQLiteDB(t *testing.T) *sqlx.DB {
	t.Helper()
	conn, err := sqlx.Open("sqlite", ":memory:")
	if err != nil {
		t.Fatalf("failed to open sqlite test db: %v", err)
	}
	t.Cleanup(func() {
		// Best-effort close; the database is in-memory and test-scoped.
		_ = conn.Close()
	})
	return conn
}
// indexExists reports whether an index with the given name is present in the
// sqlite_master catalogue.
func indexExists(t *testing.T, dbConn *sqlx.DB, name string) bool {
	t.Helper()
	var found int
	err := dbConn.Get(&found, `SELECT COUNT(1) FROM sqlite_master WHERE type='index' AND name=?`, name)
	if err != nil {
		t.Fatalf("failed to query index %s: %v", name, err)
	}
	return found > 0
}
// TestCleanupHourlySnapshotIndexesOlderThan verifies that index cleanup drops
// the per-table indexes only for hourly snapshot tables older than the cutoff,
// while indexes on newer tables remain.
func TestCleanupHourlySnapshotIndexesOlderThan(t *testing.T) {
	ctx := context.Background()
	dbConn := newTestSQLiteDB(t)
	// Two tables, one on each side of the cutoff; names encode unix seconds.
	oldTable := "inventory_hourly_1700000000"
	newTable := "inventory_hourly_1800000000"
	for _, table := range []string{oldTable, newTable} {
		if err := EnsureSnapshotTable(ctx, dbConn, table); err != nil {
			t.Fatalf("failed to create snapshot table %s: %v", table, err)
		}
		// Create the two legacy per-table indexes that cleanup targets.
		if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_snapshottime_idx ON %s ("SnapshotTime")`, table, table)); err != nil {
			t.Fatalf("failed to create snapshottime index for %s: %v", table, err)
		}
		if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_resourcepool_idx ON %s ("ResourcePool")`, table, table)); err != nil {
			t.Fatalf("failed to create resourcepool index for %s: %v", table, err)
		}
	}
	// Cutoff strictly between the two tables' timestamps.
	cutoff := time.Unix(1750000000, 0)
	dropped, err := CleanupHourlySnapshotIndexesOlderThan(ctx, dbConn, cutoff)
	if err != nil {
		t.Fatalf("cleanup failed: %v", err)
	}
	// Three per table: the two created above plus a vm_vcenter index —
	// presumably created by EnsureSnapshotTable; verify against its DDL.
	if dropped != 3 {
		t.Fatalf("expected 3 old indexes dropped, got %d", dropped)
	}
	oldIndexes := []string{
		oldTable + "_vm_vcenter_idx",
		oldTable + "_snapshottime_idx",
		oldTable + "_resourcepool_idx",
	}
	for _, idx := range oldIndexes {
		if indexExists(t, dbConn, idx) {
			t.Fatalf("expected old index %s to be removed", idx)
		}
	}
	// Indexes on the table newer than the cutoff must be untouched.
	newIndexes := []string{
		newTable + "_vm_vcenter_idx",
		newTable + "_snapshottime_idx",
		newTable + "_resourcepool_idx",
	}
	for _, idx := range newIndexes {
		if !indexExists(t, dbConn, idx) {
			t.Fatalf("expected recent index %s to remain", idx)
		}
	}
}
// TestFetchVmTraceAndLifecycleUseCacheTables seeds vm_hourly_stats and
// vm_lifecycle_cache for a single VM and verifies that FetchVmTrace and
// FetchVmLifecycle serve results from those cache tables.
func TestFetchVmTraceAndLifecycleUseCacheTables(t *testing.T) {
	ctx := context.Background()
	dbConn := newTestSQLiteDB(t)
	if err := EnsureVmHourlyStats(ctx, dbConn); err != nil {
		t.Fatalf("failed to ensure vm_hourly_stats: %v", err)
	}
	if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
		t.Fatalf("failed to ensure vm_lifecycle_cache: %v", err)
	}
	insertSQL := `
INSERT INTO vm_hourly_stats (
"SnapshotTime","Vcenter","VmId","VmUuid","Name","CreationTime","DeletionTime","ResourcePool",
"Datacenter","Cluster","Folder","ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder"
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
`
	// Two hourly snapshots of the same VM at t=1000 and t=2000, both carrying
	// an exact CreationTime of 900.
	rows := [][]interface{}{
		{int64(1000), "vc-a", "vm-1", "uuid-1", "demo-vm", int64(900), int64(0), "Tin", "dc", "cluster", "folder", 100.0, int64(2), int64(4), "FALSE", "TRUE", "FALSE"},
		{int64(2000), "vc-a", "vm-1", "uuid-1", "demo-vm", int64(900), int64(0), "Gold", "dc", "cluster", "folder", 150.0, int64(4), int64(8), "FALSE", "TRUE", "FALSE"},
	}
	for _, args := range rows {
		if _, err := dbConn.ExecContext(ctx, insertSQL, args...); err != nil {
			t.Fatalf("failed to insert hourly cache row: %v", err)
		}
	}
	// Lifecycle cache: sighting at t=1000 with a known creation time of 900,
	// then a deletion recorded at t=2500.
	if err := UpsertVmLifecycleCache(ctx, dbConn, "vc-a", "vm-1", "uuid-1", "demo-vm", "cluster", time.Unix(1000, 0), sql.NullInt64{Int64: 900, Valid: true}); err != nil {
		t.Fatalf("failed to upsert lifecycle cache: %v", err)
	}
	if err := MarkVmDeletedWithDetails(ctx, dbConn, "vc-a", "vm-1", "uuid-1", "demo-vm", "cluster", 2500); err != nil {
		t.Fatalf("failed to mark vm deleted: %v", err)
	}
	traceRows, err := FetchVmTrace(ctx, dbConn, "vm-1", "", "")
	if err != nil {
		t.Fatalf("FetchVmTrace failed: %v", err)
	}
	if len(traceRows) != 2 {
		t.Fatalf("expected 2 trace rows, got %d", len(traceRows))
	}
	if traceRows[0].SnapshotTime != 1000 || traceRows[1].SnapshotTime != 2000 {
		t.Fatalf("trace rows are not sorted by snapshot time: %#v", traceRows)
	}
	lifecycle, err := FetchVmLifecycle(ctx, dbConn, "vm-1", "", "")
	if err != nil {
		t.Fatalf("FetchVmLifecycle failed: %v", err)
	}
	// NOTE(review): FirstSeen=900 implies the lifecycle cache folds the known
	// creation time into its FirstSeen column — confirm against
	// UpsertVmLifecycleCache's implementation.
	if lifecycle.FirstSeen != 900 {
		t.Fatalf("expected FirstSeen=900 (earliest known from lifecycle cache), got %d", lifecycle.FirstSeen)
	}
	if lifecycle.LastSeen != 2000 {
		t.Fatalf("expected LastSeen=2000, got %d", lifecycle.LastSeen)
	}
	// CreationTime=900 came from the hourly rows' exact CreationTime column.
	if lifecycle.CreationTime != 900 || lifecycle.CreationApprox {
		t.Fatalf("expected exact CreationTime=900, got time=%d approx=%v", lifecycle.CreationTime, lifecycle.CreationApprox)
	}
	// DeletionTime comes from the lifecycle cache (hourly rows have none).
	if lifecycle.DeletionTime != 2500 {
		t.Fatalf("expected DeletionTime=2500 from lifecycle cache, got %d", lifecycle.DeletionTime)
	}
}
// TestParseHourlySnapshotUnix exercises table-name parsing for valid and
// invalid hourly snapshot table names.
func TestParseHourlySnapshotUnix(t *testing.T) {
	tests := []struct {
		table string
		ok    bool
		val   int64
	}{
		{table: "inventory_hourly_1700000000", ok: true, val: 1700000000},
		{table: "inventory_hourly_bad", ok: false, val: 0},
		{table: "inventory_daily_summary_20260101", ok: false, val: 0},
	}
	for i := range tests {
		tc := tests[i]
		parsed, valid := parseHourlySnapshotUnix(tc.table)
		if valid != tc.ok || parsed != tc.val {
			t.Fatalf("parseHourlySnapshotUnix(%q) = (%d,%v), expected (%d,%v)", tc.table, parsed, valid, tc.val, tc.ok)
		}
	}
}