This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -42,51 +41,62 @@ func boolStringFromInterface(value interface{}) string {
|
||||
}
|
||||
}
|
||||
|
||||
// latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time.
|
||||
// latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time, skipping empty tables.
|
||||
func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (string, error) {
|
||||
driver := strings.ToLower(dbConn.DriverName())
|
||||
var rows *sqlx.Rows
|
||||
var err error
|
||||
switch driver {
|
||||
case "sqlite":
|
||||
rows, err = dbConn.QueryxContext(ctx, `
|
||||
SELECT name FROM sqlite_master
|
||||
WHERE type = 'table' AND name LIKE 'inventory_hourly_%'
|
||||
`)
|
||||
case "pgx", "postgres":
|
||||
rows, err = dbConn.QueryxContext(ctx, `
|
||||
SELECT tablename FROM pg_catalog.pg_tables
|
||||
WHERE schemaname = 'public' AND tablename LIKE 'inventory_hourly_%'
|
||||
`)
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported driver for snapshot lookup: %s", driver)
|
||||
}
|
||||
rows, err := dbConn.QueryxContext(ctx, `
|
||||
SELECT table_name, snapshot_time
|
||||
FROM snapshot_registry
|
||||
WHERE snapshot_type = 'hourly' AND snapshot_time < ?
|
||||
ORDER BY snapshot_time DESC
|
||||
`, cutoff.Unix())
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var latest string
|
||||
var latestTime int64
|
||||
for rows.Next() {
|
||||
var name string
|
||||
if scanErr := rows.Scan(&name); scanErr != nil {
|
||||
var ts int64
|
||||
if scanErr := rows.Scan(&name, &ts); scanErr != nil {
|
||||
continue
|
||||
}
|
||||
if !strings.HasPrefix(name, "inventory_hourly_") {
|
||||
if err := db.ValidateTableName(name); err != nil {
|
||||
continue
|
||||
}
|
||||
suffix := strings.TrimPrefix(name, "inventory_hourly_")
|
||||
epoch, parseErr := strconv.ParseInt(suffix, 10, 64)
|
||||
if parseErr != nil {
|
||||
hasRows, err := db.TableHasRows(ctx, dbConn, name)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if epoch < cutoff.Unix() && epoch > latestTime {
|
||||
latestTime = epoch
|
||||
latest = name
|
||||
if hasRows {
|
||||
return name, nil
|
||||
}
|
||||
}
|
||||
return latest, nil
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// HasSnapshotGap reports whether the gap between prev and curr exceeds 2x the expected interval.
|
||||
func HasSnapshotGap(prevUnix, currUnix int64, expectedSeconds int64) bool {
|
||||
if prevUnix == 0 || currUnix == 0 || expectedSeconds <= 0 {
|
||||
return false
|
||||
}
|
||||
return currUnix-prevUnix > expectedSeconds*2
|
||||
}
|
||||
|
||||
// querySnapshotRows builds a SELECT with proper rebind for the given table/columns/where.
|
||||
func querySnapshotRows(ctx context.Context, dbConn *sqlx.DB, table string, columns []string, where string, args ...interface{}) (*sqlx.Rows, error) {
|
||||
if err := db.ValidateTableName(table); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
colExpr := "*"
|
||||
if len(columns) > 0 {
|
||||
colExpr = `"` + strings.Join(columns, `","`) + `"`
|
||||
}
|
||||
query := fmt.Sprintf(`SELECT %s FROM %s`, colExpr, table)
|
||||
if strings.TrimSpace(where) != "" {
|
||||
query = fmt.Sprintf(`%s WHERE %s`, query, where)
|
||||
}
|
||||
query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query)
|
||||
return dbConn.QueryxContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
// markMissingFromPrevious marks VMs that were present in the previous snapshot but missing now.
|
||||
@@ -98,9 +108,6 @@ func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB,
|
||||
return 0
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Cluster","Datacenter","DeletionTime" FROM %s WHERE "Vcenter" = ?`, prevTable)
|
||||
query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query)
|
||||
|
||||
type prevRow struct {
|
||||
VmId sql.NullString `db:"VmId"`
|
||||
VmUuid sql.NullString `db:"VmUuid"`
|
||||
@@ -110,7 +117,7 @@ func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB,
|
||||
DeletionTime sql.NullInt64 `db:"DeletionTime"`
|
||||
}
|
||||
|
||||
rows, err := dbConn.QueryxContext(ctx, query, vcenter)
|
||||
rows, err := querySnapshotRows(ctx, dbConn, prevTable, []string{"VmId", "VmUuid", "Name", "Cluster", "Datacenter", "DeletionTime"}, `"Vcenter" = ?`, vcenter)
|
||||
if err != nil {
|
||||
c.Logger.Warn("failed to read previous snapshot for deletion detection", "error", err, "table", prevTable, "vcenter", vcenter)
|
||||
return 0
|
||||
|
||||
Reference in New Issue
Block a user