work on optimising vcenter queries
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"vctp/db"
|
||||
"vctp/db/queries"
|
||||
"vctp/internal/report"
|
||||
"vctp/internal/vcenter"
|
||||
@@ -43,6 +44,8 @@ type inventorySnapshotRow struct {
|
||||
IsPresent string
|
||||
}
|
||||
|
||||
type snapshotTotals = db.SnapshotTotals
|
||||
|
||||
// RunVcenterSnapshotHourly records hourly inventory snapshots into a daily table.
|
||||
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger) error {
|
||||
startedAt := time.Now()
|
||||
@@ -148,7 +151,7 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
|
||||
if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
|
||||
return err
|
||||
}
|
||||
if rowsExist, err := tableHasRows(ctx, dbConn, summaryTable); err != nil {
|
||||
if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
|
||||
return err
|
||||
} else if rowsExist && !force {
|
||||
c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
|
||||
@@ -158,7 +161,7 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
|
||||
return err
|
||||
}
|
||||
}
|
||||
if rowsExist, err := tableHasRows(ctx, dbConn, summaryTable); err != nil {
|
||||
if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
|
||||
return err
|
||||
} else if rowsExist {
|
||||
c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
|
||||
@@ -185,7 +188,7 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
|
||||
`"SrmPlaceholder"`, `"VmUuid"`, `"SnapshotTime"`, `"IsPresent"`,
|
||||
}, templateExclusionFilter())
|
||||
|
||||
currentTotals, err := snapshotTotalsForUnion(ctx, dbConn, unionQuery)
|
||||
currentTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery)
|
||||
if err != nil {
|
||||
c.Logger.Warn("unable to calculate daily totals", "error", err, "date", dayStart.Format("2006-01-02"))
|
||||
} else {
|
||||
@@ -213,7 +216,7 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
|
||||
`"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`,
|
||||
`"SrmPlaceholder"`, `"VmUuid"`, `"SnapshotTime"`, `"IsPresent"`,
|
||||
}, templateExclusionFilter())
|
||||
prevTotals, err := snapshotTotalsForUnion(ctx, dbConn, prevUnion)
|
||||
prevTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, prevUnion)
|
||||
if err != nil {
|
||||
c.Logger.Warn("unable to calculate previous day totals", "error", err, "date", prevStart.Format("2006-01-02"))
|
||||
} else {
|
||||
@@ -337,7 +340,7 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
|
||||
if err := ensureMonthlySummaryTable(ctx, dbConn, monthlyTable); err != nil {
|
||||
return err
|
||||
}
|
||||
if rowsExist, err := tableHasRows(ctx, dbConn, monthlyTable); err != nil {
|
||||
if rowsExist, err := db.TableHasRows(ctx, dbConn, monthlyTable); err != nil {
|
||||
return err
|
||||
} else if rowsExist && !force {
|
||||
c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable)
|
||||
@@ -347,7 +350,7 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
|
||||
return err
|
||||
}
|
||||
}
|
||||
if rowsExist, err := tableHasRows(ctx, dbConn, monthlyTable); err != nil {
|
||||
if rowsExist, err := db.TableHasRows(ctx, dbConn, monthlyTable); err != nil {
|
||||
return err
|
||||
} else if rowsExist {
|
||||
c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable)
|
||||
@@ -368,7 +371,7 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
|
||||
return fmt.Errorf("no valid daily snapshot tables found for %s", targetMonth.Format("2006-01"))
|
||||
}
|
||||
|
||||
monthlyTotals, err := snapshotTotalsForUnion(ctx, dbConn, unionQuery)
|
||||
monthlyTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery)
|
||||
if err != nil {
|
||||
c.Logger.Warn("unable to calculate monthly totals", "error", err, "month", targetMonth.Format("2006-01"))
|
||||
} else {
|
||||
@@ -518,25 +521,15 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger)
|
||||
}
|
||||
|
||||
func hourlyInventoryTableName(t time.Time) (string, error) {
|
||||
return safeTableName(fmt.Sprintf("inventory_hourly_%d", t.Unix()))
|
||||
return db.SafeTableName(fmt.Sprintf("inventory_hourly_%d", t.Unix()))
|
||||
}
|
||||
|
||||
func dailySummaryTableName(t time.Time) (string, error) {
|
||||
return safeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
|
||||
return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
|
||||
}
|
||||
|
||||
func monthlySummaryTableName(t time.Time) (string, error) {
|
||||
return safeTableName(fmt.Sprintf("inventory_monthly_summary_%s", t.Format("200601")))
|
||||
}
|
||||
|
||||
func safeTableName(name string) (string, error) {
|
||||
for _, r := range name {
|
||||
if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '_' {
|
||||
continue
|
||||
}
|
||||
return "", fmt.Errorf("invalid table name: %s", name)
|
||||
}
|
||||
return name, nil
|
||||
return db.SafeTableName(fmt.Sprintf("inventory_monthly_summary_%s", t.Format("200601")))
|
||||
}
|
||||
|
||||
func ensureDailyInventoryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
|
||||
@@ -820,7 +813,7 @@ func buildUnionQuery(tables []string, columns []string, whereClause string) stri
|
||||
queries := make([]string, 0, len(tables))
|
||||
columnList := strings.Join(columns, ", ")
|
||||
for _, table := range tables {
|
||||
if _, err := safeTableName(table); err != nil {
|
||||
if _, err := db.SafeTableName(table); err != nil {
|
||||
continue
|
||||
}
|
||||
query := fmt.Sprintf("SELECT %s FROM %s", columnList, table)
|
||||
@@ -860,7 +853,7 @@ func truncateDate(t time.Time) time.Time {
|
||||
}
|
||||
|
||||
func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
|
||||
if _, err := safeTableName(table); err != nil {
|
||||
if _, err := db.SafeTableName(table); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := dbConn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", table))
|
||||
@@ -868,7 +861,7 @@ func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error
|
||||
}
|
||||
|
||||
func clearTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
|
||||
if _, err := safeTableName(table); err != nil {
|
||||
if _, err := db.SafeTableName(table); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := dbConn.ExecContext(ctx, fmt.Sprintf("DELETE FROM %s", table))
|
||||
@@ -878,45 +871,23 @@ func clearTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func tableHasRows(ctx context.Context, dbConn *sqlx.DB, table string) (bool, error) {
|
||||
if _, err := safeTableName(table); err != nil {
|
||||
return false, err
|
||||
}
|
||||
query := fmt.Sprintf(`SELECT 1 FROM %s LIMIT 1`, table)
|
||||
var exists int
|
||||
if err := dbConn.GetContext(ctx, &exists, query); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func filterSnapshotsWithRows(ctx context.Context, dbConn *sqlx.DB, snapshots []report.SnapshotRecord) []report.SnapshotRecord {
|
||||
filtered := snapshots[:0]
|
||||
for _, snapshot := range snapshots {
|
||||
if rowsExist, err := tableHasRows(ctx, dbConn, snapshot.TableName); err == nil && rowsExist {
|
||||
if rowsExist, err := db.TableHasRows(ctx, dbConn, snapshot.TableName); err == nil && rowsExist {
|
||||
filtered = append(filtered, snapshot)
|
||||
}
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
type snapshotTotals struct {
|
||||
VmCount int64 `db:"vm_count"`
|
||||
VcpuTotal int64 `db:"vcpu_total"`
|
||||
RamTotal int64 `db:"ram_total"`
|
||||
DiskTotal float64 `db:"disk_total"`
|
||||
}
|
||||
|
||||
type columnDef struct {
|
||||
Name string
|
||||
Type string
|
||||
}
|
||||
|
||||
func ensureSnapshotColumns(ctx context.Context, dbConn *sqlx.DB, tableName string, columns []columnDef) error {
|
||||
if _, err := safeTableName(tableName); err != nil {
|
||||
if _, err := db.SafeTableName(tableName); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, column := range columns {
|
||||
@@ -968,7 +939,7 @@ func ensureSnapshotRowID(ctx context.Context, dbConn *sqlx.DB, tableName string)
|
||||
driver := strings.ToLower(dbConn.DriverName())
|
||||
switch driver {
|
||||
case "pgx", "postgres":
|
||||
hasColumn, err := columnExists(ctx, dbConn, tableName, "RowId")
|
||||
hasColumn, err := db.ColumnExists(ctx, dbConn, tableName, "RowId")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -994,112 +965,8 @@ func ensureSnapshotRowID(ctx context.Context, dbConn *sqlx.DB, tableName string)
|
||||
return nil
|
||||
}
|
||||
|
||||
func columnExists(ctx context.Context, dbConn *sqlx.DB, tableName string, columnName string) (bool, error) {
|
||||
driver := strings.ToLower(dbConn.DriverName())
|
||||
switch driver {
|
||||
case "sqlite":
|
||||
query := fmt.Sprintf(`PRAGMA table_info("%s")`, tableName)
|
||||
rows, err := dbConn.QueryxContext(ctx, query)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var (
|
||||
cid int
|
||||
name string
|
||||
colType string
|
||||
notNull int
|
||||
defaultVal sql.NullString
|
||||
pk int
|
||||
)
|
||||
if err := rows.Scan(&cid, &name, &colType, ¬Null, &defaultVal, &pk); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if strings.EqualFold(name, columnName) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, rows.Err()
|
||||
case "pgx", "postgres":
|
||||
var count int
|
||||
err := dbConn.GetContext(ctx, &count, `
|
||||
SELECT COUNT(1)
|
||||
FROM information_schema.columns
|
||||
WHERE table_name = $1 AND column_name = $2
|
||||
`, tableName, strings.ToLower(columnName))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return count > 0, nil
|
||||
default:
|
||||
return false, fmt.Errorf("unsupported driver for column lookup: %s", driver)
|
||||
}
|
||||
}
|
||||
|
||||
func snapshotTotalsForTable(ctx context.Context, dbConn *sqlx.DB, table string) (snapshotTotals, error) {
|
||||
if _, err := safeTableName(table); err != nil {
|
||||
return snapshotTotals{}, err
|
||||
}
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
COUNT(DISTINCT "VmId") AS vm_count,
|
||||
COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
|
||||
COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
|
||||
COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
|
||||
FROM %s
|
||||
WHERE "IsPresent" = 'TRUE'
|
||||
`, table)
|
||||
|
||||
var totals snapshotTotals
|
||||
if err := dbConn.GetContext(ctx, &totals, query); err != nil {
|
||||
return snapshotTotals{}, err
|
||||
}
|
||||
return totals, nil
|
||||
}
|
||||
|
||||
func snapshotTotalsForUnion(ctx context.Context, dbConn *sqlx.DB, unionQuery string) (snapshotTotals, error) {
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
COUNT(DISTINCT "VmId") AS vm_count,
|
||||
COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
|
||||
COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
|
||||
COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
|
||||
FROM (
|
||||
%s
|
||||
) snapshots
|
||||
WHERE "IsPresent" = 'TRUE'
|
||||
`, unionQuery)
|
||||
|
||||
var totals snapshotTotals
|
||||
if err := dbConn.GetContext(ctx, &totals, query); err != nil {
|
||||
return snapshotTotals{}, err
|
||||
}
|
||||
return totals, nil
|
||||
}
|
||||
|
||||
func tableExists(ctx context.Context, dbConn *sqlx.DB, table string) bool {
|
||||
driver := strings.ToLower(dbConn.DriverName())
|
||||
switch driver {
|
||||
case "sqlite":
|
||||
var count int
|
||||
err := dbConn.GetContext(ctx, &count, `
|
||||
SELECT COUNT(1)
|
||||
FROM sqlite_master
|
||||
WHERE type = 'table' AND name = ?
|
||||
`, table)
|
||||
return err == nil && count > 0
|
||||
case "pgx", "postgres":
|
||||
var count int
|
||||
err := dbConn.GetContext(ctx, &count, `
|
||||
SELECT COUNT(1)
|
||||
FROM pg_catalog.pg_tables
|
||||
WHERE schemaname = 'public' AND tablename = $1
|
||||
`, table)
|
||||
return err == nil && count > 0
|
||||
default:
|
||||
return false
|
||||
}
|
||||
return db.TableExists(ctx, dbConn, table)
|
||||
}
|
||||
|
||||
func nullInt64ToInt(value sql.NullInt64) int64 {
|
||||
@@ -1142,7 +1009,7 @@ func normalizeResourcePool(value string) string {
|
||||
}
|
||||
}
|
||||
|
||||
func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTime time.Time, inv *queries.Inventory) (inventorySnapshotRow, error) {
|
||||
func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTime time.Time, inv *queries.Inventory, hostLookup map[string]vcenter.HostLookup, folderLookup vcenter.FolderLookup) (inventorySnapshotRow, error) {
|
||||
if vmObject == nil {
|
||||
return inventorySnapshotRow{}, fmt.Errorf("missing VM object")
|
||||
}
|
||||
@@ -1238,8 +1105,21 @@ func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTi
|
||||
}
|
||||
|
||||
if row.Folder.String == "" {
|
||||
if folderPath, err := vc.GetVMFolderPath(*vmObject); err == nil {
|
||||
if folderPath, ok := vc.GetVMFolderPathFromLookup(*vmObject, folderLookup); ok {
|
||||
row.Folder = sql.NullString{String: folderPath, Valid: folderPath != ""}
|
||||
} else if folderPath, err := vc.GetVMFolderPath(*vmObject); err == nil {
|
||||
row.Folder = sql.NullString{String: folderPath, Valid: folderPath != ""}
|
||||
}
|
||||
}
|
||||
|
||||
if vmObject.Runtime.Host != nil && hostLookup != nil {
|
||||
if lookup, ok := hostLookup[vmObject.Runtime.Host.Value]; ok {
|
||||
if row.Cluster.String == "" && lookup.Cluster != "" {
|
||||
row.Cluster = sql.NullString{String: lookup.Cluster, Valid: true}
|
||||
}
|
||||
if row.Datacenter.String == "" && lookup.Datacenter != "" {
|
||||
row.Datacenter = sql.NullString{String: lookup.Datacenter, Valid: true}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1337,6 +1217,20 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
|
||||
if !canDetectMissing {
|
||||
c.Logger.Warn("no VMs returned from vcenter; skipping missing VM detection", "url", url)
|
||||
}
|
||||
hostLookup, err := vc.BuildHostLookup()
|
||||
if err != nil {
|
||||
c.Logger.Warn("failed to build host lookup", "url", url, "error", err)
|
||||
hostLookup = nil
|
||||
} else {
|
||||
c.Logger.Debug("built host lookup", "url", url, "hosts", len(hostLookup))
|
||||
}
|
||||
folderLookup, err := vc.BuildFolderPathLookup()
|
||||
if err != nil {
|
||||
c.Logger.Warn("failed to build folder lookup", "url", url, "error", err)
|
||||
folderLookup = nil
|
||||
} else {
|
||||
c.Logger.Debug("built folder lookup", "url", url, "folders", len(folderLookup))
|
||||
}
|
||||
|
||||
inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url)
|
||||
if err != nil {
|
||||
@@ -1373,7 +1267,7 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
|
||||
inv = &existingCopy
|
||||
}
|
||||
|
||||
row, err := snapshotFromVM(vmObj, vc, startTime, inv)
|
||||
row, err := snapshotFromVM(vmObj, vc, startTime, inv, hostLookup, folderLookup)
|
||||
if err != nil {
|
||||
c.Logger.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err)
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user