code re-org and bugfix hanging hourly snapshot
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
@@ -135,12 +135,22 @@ func TableHasRows(ctx context.Context, dbConn *sqlx.DB, table string) (bool, err
|
|||||||
if err := ValidateTableName(table); err != nil {
|
if err := ValidateTableName(table); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
// Avoid hanging on locked tables; apply a short timeout.
|
||||||
|
if ctx == nil {
|
||||||
|
ctx = context.Background()
|
||||||
|
}
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, 15*time.Second)
|
||||||
|
defer cancel()
|
||||||
query := fmt.Sprintf(`SELECT 1 FROM %s LIMIT 1`, table)
|
query := fmt.Sprintf(`SELECT 1 FROM %s LIMIT 1`, table)
|
||||||
var exists int
|
var exists int
|
||||||
if err := getLog(ctx, dbConn, &exists, query); err != nil {
|
if err := getLog(ctx, dbConn, &exists, query); err != nil {
|
||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
|
|||||||
@@ -44,10 +44,11 @@ func boolStringFromInterface(value interface{}) string {
|
|||||||
// latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time, skipping empty tables.
|
// latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time, skipping empty tables.
|
||||||
func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (string, error) {
|
func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (string, error) {
|
||||||
rows, err := dbConn.QueryxContext(ctx, `
|
rows, err := dbConn.QueryxContext(ctx, `
|
||||||
SELECT table_name, snapshot_time
|
SELECT table_name, snapshot_time, snapshot_count
|
||||||
FROM snapshot_registry
|
FROM snapshot_registry
|
||||||
WHERE snapshot_type = 'hourly' AND snapshot_time < ?
|
WHERE snapshot_type = 'hourly' AND snapshot_time < ? AND snapshot_count > 0
|
||||||
ORDER BY snapshot_time DESC
|
ORDER BY snapshot_time DESC
|
||||||
|
LIMIT 50
|
||||||
`, cutoff.Unix())
|
`, cutoff.Unix())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
@@ -57,17 +58,18 @@ ORDER BY snapshot_time DESC
|
|||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var name string
|
var name string
|
||||||
var ts int64
|
var ts int64
|
||||||
if scanErr := rows.Scan(&name, &ts); scanErr != nil {
|
var count int64
|
||||||
|
if scanErr := rows.Scan(&name, &ts, &count); scanErr != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := db.ValidateTableName(name); err != nil {
|
if err := db.ValidateTableName(name); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
hasRows, err := db.TableHasRows(ctx, dbConn, name)
|
// Rely on snapshot_count to avoid costly table scans; fall back to a cheap row check only if count is zero.
|
||||||
if err != nil {
|
if count > 0 {
|
||||||
continue
|
return name, nil
|
||||||
}
|
}
|
||||||
if hasRows {
|
if hasRows, _ := db.TableHasRows(ctx, dbConn, name); hasRows {
|
||||||
return name, nil
|
return name, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -206,6 +206,8 @@ func isPresent(presence map[string]struct{}, cand lifecycleCandidate) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// findDeletionInTables walks ordered hourly tables for a vCenter and returns the first confirmed deletion time
|
||||||
|
// (requiring two consecutive misses) plus the time of the first miss for cross-day handling.
|
||||||
func findDeletionInTables(ctx context.Context, dbConn *sqlx.DB, tables []snapshotTable, vcenter string, cand lifecycleCandidate) (int64, int64) {
|
func findDeletionInTables(ctx context.Context, dbConn *sqlx.DB, tables []snapshotTable, vcenter string, cand lifecycleCandidate) (int64, int64) {
|
||||||
var lastSeen int64
|
var lastSeen int64
|
||||||
var firstMiss int64
|
var firstMiss int64
|
||||||
|
|||||||
@@ -24,7 +24,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// RunVcenterSnapshotHourly records hourly inventory snapshots into a daily table.
|
// RunVcenterSnapshotHourly records hourly inventory snapshots into a daily table.
|
||||||
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger) (err error) {
|
// If force is true, any in-progress marker will be cleared before starting (useful for manual recovery).
|
||||||
|
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger, force bool) (err error) {
|
||||||
jobCtx := ctx
|
jobCtx := ctx
|
||||||
jobTimeout := durationFromSeconds(c.Settings.Values.Settings.HourlyJobTimeoutSeconds, 20*time.Minute)
|
jobTimeout := durationFromSeconds(c.Settings.Values.Settings.HourlyJobTimeoutSeconds, 20*time.Minute)
|
||||||
if jobTimeout > 0 {
|
if jobTimeout > 0 {
|
||||||
@@ -40,6 +41,13 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
|
|||||||
if err := tracker.ClearStale(staleCtx, "hourly_snapshot", jobTimeout); err != nil {
|
if err := tracker.ClearStale(staleCtx, "hourly_snapshot", jobTimeout); err != nil {
|
||||||
logger.Warn("failed to clear stale cron status", "error", err)
|
logger.Warn("failed to clear stale cron status", "error", err)
|
||||||
}
|
}
|
||||||
|
if force {
|
||||||
|
if err := tracker.ClearAllInProgress(staleCtx); err != nil {
|
||||||
|
logger.Warn("failed to clear in-progress flag (force run)", "error", err)
|
||||||
|
} else {
|
||||||
|
logger.Info("force run cleared in-progress marker before starting")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
startedAt := time.Now()
|
startedAt := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -51,8 +59,24 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if skip {
|
if skip {
|
||||||
logger.Warn("Hourly snapshot skipped because a previous run is still active")
|
if force {
|
||||||
return nil
|
logger.Info("Force run requested; clearing in-progress marker and retrying")
|
||||||
|
if err := tracker.ClearAllInProgress(jobCtx); err != nil {
|
||||||
|
logger.Warn("failed to clear in-progress flag for force run", "error", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
done, skip, err = tracker.Start(jobCtx, "hourly_snapshot")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if skip {
|
||||||
|
logger.Warn("Hourly snapshot still marked active after force clear; skipping")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.Warn("Hourly snapshot skipped because a previous run is still active", "force", force)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
defer func() { done(err) }()
|
defer func() { done(err) }()
|
||||||
|
|
||||||
@@ -824,6 +848,7 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
|
|||||||
|
|
||||||
missingCount := 0
|
missingCount := 0
|
||||||
newCount := 0
|
newCount := 0
|
||||||
|
prevTableName := ""
|
||||||
|
|
||||||
for _, inv := range inventoryRows {
|
for _, inv := range inventoryRows {
|
||||||
c.Logger.Debug("checking inventory for deletions", "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "name", inv.Name)
|
c.Logger.Debug("checking inventory for deletions", "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "name", inv.Name)
|
||||||
@@ -957,49 +982,7 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
|
|||||||
slog.Warn("failed to insert vcenter totals", "vcenter", url, "snapshot_time", startTime.Unix(), "error", err)
|
slog.Warn("failed to insert vcenter totals", "vcenter", url, "snapshot_time", startTime.Unix(), "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compare with previous snapshot for this vcenter to mark deletions at snapshot time.
|
prevTableName, newCount, missingCount = c.compareWithPreviousSnapshot(ctx, dbConn, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName, missingCount)
|
||||||
prevTableName, prevTableErr := latestHourlySnapshotBefore(ctx, dbConn, startTime)
|
|
||||||
|
|
||||||
if prevTableErr != nil {
|
|
||||||
c.Logger.Warn("failed to locate previous hourly snapshot for deletion comparison", "error", prevTableErr, "url", url)
|
|
||||||
}
|
|
||||||
|
|
||||||
prevSnapshotTime := int64(0)
|
|
||||||
if prevTableName != "" {
|
|
||||||
if suffix := strings.TrimPrefix(prevTableName, "inventory_hourly_"); suffix != prevTableName {
|
|
||||||
if ts, err := strconv.ParseInt(suffix, 10, 64); err == nil {
|
|
||||||
prevSnapshotTime = ts
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if prevTableName != "" {
|
|
||||||
moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTableName, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName)
|
|
||||||
missingCount += moreMissing
|
|
||||||
// Guard against gaps: if previous snapshot is much older than expected, skip "new" detection to avoid false positives when an hourly run was missed.
|
|
||||||
expectedSeconds := int64(durationFromSeconds(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, time.Hour).Seconds())
|
|
||||||
if HasSnapshotGap(prevSnapshotTime, startTime.Unix(), expectedSeconds) {
|
|
||||||
c.Logger.Info("skipping new-VM detection due to gap between snapshots", "prev_table", prevTableName, "prev_snapshot_unix", prevSnapshotTime, "current_snapshot_unix", startTime.Unix())
|
|
||||||
} else {
|
|
||||||
newCount = countNewFromPrevious(ctx, dbConn, prevTableName, url, presentSnapshots)
|
|
||||||
if newCount > 0 {
|
|
||||||
newRows := listNewFromPrevious(ctx, dbConn, prevTableName, url, presentSnapshots)
|
|
||||||
names := make([]string, 0, len(newRows))
|
|
||||||
for _, r := range newRows {
|
|
||||||
if r.Name != "" {
|
|
||||||
names = append(names, r.Name)
|
|
||||||
} else if r.VmId.Valid {
|
|
||||||
names = append(names, r.VmId.String)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.Logger.Info("new VMs since previous snapshot", "prev_table", prevTableName, "count", newCount, "names", names)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.Logger.Debug("compared with previous snapshot", "prev_table", prevTableName, "new_since_prev", newCount, "missing_since_prev", missingCount)
|
|
||||||
} else {
|
|
||||||
// No previous snapshot found (or lookup failed).
|
|
||||||
newCount = len(presentSnapshots)
|
|
||||||
}
|
|
||||||
|
|
||||||
// If VM count dropped versus totals and we still haven't marked missing, try another comparison + wider event window.
|
// If VM count dropped versus totals and we still haven't marked missing, try another comparison + wider event window.
|
||||||
if missingCount == 0 && prevVmCount.Valid && prevVmCount.Int64 > int64(totals.VmCount) {
|
if missingCount == 0 && prevVmCount.Valid && prevVmCount.Int64 > int64(totals.VmCount) {
|
||||||
@@ -1100,3 +1083,63 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// compareWithPreviousSnapshot cross-checks current vs. previous hourly snapshots:
|
||||||
|
// marks deletions, detects new VMs when no gap exists, and returns the previous table name along with new/missing counts.
|
||||||
|
func (c *CronTask) compareWithPreviousSnapshot(
|
||||||
|
ctx context.Context,
|
||||||
|
dbConn *sqlx.DB,
|
||||||
|
url string,
|
||||||
|
startTime time.Time,
|
||||||
|
presentSnapshots map[string]InventorySnapshotRow,
|
||||||
|
presentByUuid map[string]struct{},
|
||||||
|
presentByName map[string]struct{},
|
||||||
|
inventoryByVmID map[string]queries.Inventory,
|
||||||
|
inventoryByUuid map[string]queries.Inventory,
|
||||||
|
inventoryByName map[string]queries.Inventory,
|
||||||
|
missingCount int,
|
||||||
|
) (string, int, int) {
|
||||||
|
prevTableName, prevTableErr := latestHourlySnapshotBefore(ctx, dbConn, startTime)
|
||||||
|
if prevTableErr != nil {
|
||||||
|
c.Logger.Warn("failed to locate previous hourly snapshot for deletion comparison", "error", prevTableErr, "url", url)
|
||||||
|
}
|
||||||
|
|
||||||
|
prevSnapshotTime := int64(0)
|
||||||
|
if prevTableName != "" {
|
||||||
|
if suffix := strings.TrimPrefix(prevTableName, "inventory_hourly_"); suffix != prevTableName {
|
||||||
|
if ts, err := strconv.ParseInt(suffix, 10, 64); err == nil {
|
||||||
|
prevSnapshotTime = ts
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
newCount := 0
|
||||||
|
if prevTableName != "" {
|
||||||
|
moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTableName, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName)
|
||||||
|
missingCount += moreMissing
|
||||||
|
expectedSeconds := int64(durationFromSeconds(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, time.Hour).Seconds())
|
||||||
|
// Allow runs as soon as half the normal interval; treat larger gaps as unreliable for "new" detection.
|
||||||
|
if HasSnapshotGap(prevSnapshotTime, startTime.Unix(), expectedSeconds/2) {
|
||||||
|
c.Logger.Info("skipping new-VM detection due to gap between snapshots", "prev_table", prevTableName, "prev_snapshot_unix", prevSnapshotTime, "current_snapshot_unix", startTime.Unix())
|
||||||
|
} else {
|
||||||
|
newCount = countNewFromPrevious(ctx, dbConn, prevTableName, url, presentSnapshots)
|
||||||
|
if newCount > 0 {
|
||||||
|
newRows := listNewFromPrevious(ctx, dbConn, prevTableName, url, presentSnapshots)
|
||||||
|
names := make([]string, 0, len(newRows))
|
||||||
|
for _, r := range newRows {
|
||||||
|
if r.Name != "" {
|
||||||
|
names = append(names, r.Name)
|
||||||
|
} else if r.VmId.Valid {
|
||||||
|
names = append(names, r.VmId.String)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.Logger.Info("new VMs since previous snapshot", "prev_table", prevTableName, "count", newCount, "names", names)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.Logger.Debug("compared with previous snapshot", "prev_table", prevTableName, "new_since_prev", newCount, "missing_since_prev", missingCount)
|
||||||
|
} else {
|
||||||
|
newCount = len(presentSnapshots)
|
||||||
|
}
|
||||||
|
|
||||||
|
return prevTableName, newCount, missingCount
|
||||||
|
}
|
||||||
|
|||||||
4
main.go
4
main.go
@@ -185,7 +185,7 @@ func main() {
|
|||||||
// One-shot mode: run a single inventory snapshot across all configured vCenters and exit.
|
// One-shot mode: run a single inventory snapshot across all configured vCenters and exit.
|
||||||
if *runInventory {
|
if *runInventory {
|
||||||
logger.Info("Running one-shot inventory snapshot across all vCenters")
|
logger.Info("Running one-shot inventory snapshot across all vCenters")
|
||||||
ct.RunVcenterSnapshotHourly(ctx, logger)
|
ct.RunVcenterSnapshotHourly(ctx, logger, true)
|
||||||
logger.Info("One-shot inventory snapshot complete; exiting")
|
logger.Info("One-shot inventory snapshot complete; exiting")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -200,7 +200,7 @@ func main() {
|
|||||||
job3, err := c.NewJob(
|
job3, err := c.NewJob(
|
||||||
gocron.DurationJob(cronSnapshotFrequency),
|
gocron.DurationJob(cronSnapshotFrequency),
|
||||||
gocron.NewTask(func() {
|
gocron.NewTask(func() {
|
||||||
ct.RunVcenterSnapshotHourly(ctx, logger)
|
ct.RunVcenterSnapshotHourly(ctx, logger, false)
|
||||||
}), gocron.WithSingletonMode(gocron.LimitModeReschedule),
|
}), gocron.WithSingletonMode(gocron.LimitModeReschedule),
|
||||||
gocron.WithStartAt(gocron.WithStartDateTime(startsAt3)),
|
gocron.WithStartAt(gocron.WithStartDateTime(startsAt3)),
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ func (h *Handler) SnapshotForceHourly(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
started := time.Now()
|
started := time.Now()
|
||||||
h.Logger.Info("Manual hourly snapshot requested")
|
h.Logger.Info("Manual hourly snapshot requested")
|
||||||
if err := ct.RunVcenterSnapshotHourly(ctx, h.Logger.With("manual", true)); err != nil {
|
if err := ct.RunVcenterSnapshotHourly(ctx, h.Logger.With("manual", true), true); err != nil {
|
||||||
h.Logger.Error("Manual hourly snapshot failed", "error", err)
|
h.Logger.Error("Manual hourly snapshot failed", "error", err)
|
||||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user