improve concurrency handling for inventory job
Some checks failed
continuous-integration/drone/push Build encountered an error
Some checks failed
continuous-integration/drone/push Build encountered an error
This commit is contained in:
@@ -149,7 +149,7 @@ func TableHasRows(ctx context.Context, dbConn *sqlx.DB, table string) (bool, err
|
|||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
ctx, cancel = context.WithTimeout(ctx, 15*time.Second)
|
ctx, cancel = context.WithTimeout(ctx, 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
query := fmt.Sprintf(`SELECT 1 FROM %s LIMIT 1`, table)
|
query := fmt.Sprintf(`SELECT 1 FROM %s LIMIT 1`, table)
|
||||||
var exists int
|
var exists int
|
||||||
|
|||||||
@@ -44,8 +44,8 @@ func boolStringFromInterface(value interface{}) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time, skipping empty tables.
|
// latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time, skipping empty tables.
|
||||||
func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (string, error) {
|
func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time, logger *slog.Logger) (string, error) {
|
||||||
tables, err := listLatestHourlyWithRows(ctx, dbConn, "", cutoff.Unix(), 1, nil)
|
tables, err := listLatestHourlyWithRows(ctx, dbConn, "", cutoff.Unix(), 1, logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@@ -103,7 +103,7 @@ LIMIT ?
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
probed := false
|
probed := false
|
||||||
hasRows := true
|
hasRows := count.Valid && count.Int64 > 0
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
if !count.Valid {
|
if !count.Valid {
|
||||||
probed = true
|
probed = true
|
||||||
@@ -141,12 +141,12 @@ LIMIT ?
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SnapshotTooSoon reports whether the gap between prev and curr is significantly shorter than expected (default: <50% interval).
|
// SnapshotTooSoon reports whether the gap between prev and curr is significantly shorter than expected.
|
||||||
func SnapshotTooSoon(prevUnix, currUnix int64, expectedSeconds int64) bool {
|
func SnapshotTooSoon(prevUnix, currUnix int64, expectedSeconds int64) bool {
|
||||||
if prevUnix == 0 || currUnix == 0 || expectedSeconds <= 0 {
|
if prevUnix == 0 || currUnix == 0 || expectedSeconds <= 0 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return currUnix-prevUnix < expectedSeconds/2
|
return currUnix-prevUnix < expectedSeconds
|
||||||
}
|
}
|
||||||
|
|
||||||
// querySnapshotRows builds a SELECT with proper rebind for the given table/columns/where.
|
// querySnapshotRows builds a SELECT with proper rebind for the given table/columns/where.
|
||||||
|
|||||||
@@ -137,9 +137,15 @@ ORDER BY snapshot_time ASC
|
|||||||
if err := db.ValidateTableName(t.Table); err != nil {
|
if err := db.ValidateTableName(t.Table); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
hasRows, err := db.TableHasRows(ctx, dbConn, t.Table)
|
if t.Count.Valid {
|
||||||
if err != nil || !hasRows {
|
if t.Count.Int64 <= 0 {
|
||||||
continue
|
continue
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
hasRows, err := db.TableHasRows(ctx, dbConn, t.Table)
|
||||||
|
if err != nil || !hasRows {
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tables = append(tables, t)
|
tables = append(tables, t)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,6 +25,16 @@ import (
|
|||||||
|
|
||||||
type ctxLoggerKey struct{}
|
type ctxLoggerKey struct{}
|
||||||
|
|
||||||
|
func loggerFromCtx(ctx context.Context, fallback *slog.Logger) *slog.Logger {
|
||||||
|
if ctx == nil {
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
if l, ok := ctx.Value(ctxLoggerKey{}).(*slog.Logger); ok && l != nil {
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
|
||||||
// RunVcenterSnapshotHourly records hourly inventory snapshots into a daily table.
|
// RunVcenterSnapshotHourly records hourly inventory snapshots into a daily table.
|
||||||
// If force is true, any in-progress marker will be cleared before starting (useful for manual recovery).
|
// If force is true, any in-progress marker will be cleared before starting (useful for manual recovery).
|
||||||
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger, force bool) (err error) {
|
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger, force bool) (err error) {
|
||||||
@@ -992,7 +1002,7 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
|
|||||||
// If VM count dropped versus totals and we still haven't marked missing, try another comparison + wider event window.
|
// If VM count dropped versus totals and we still haven't marked missing, try another comparison + wider event window.
|
||||||
if missingCount == 0 && prevVmCount.Valid && prevVmCount.Int64 > int64(totals.VmCount) {
|
if missingCount == 0 && prevVmCount.Valid && prevVmCount.Int64 > int64(totals.VmCount) {
|
||||||
// Fallback: compare against latest registered snapshot table.
|
// Fallback: compare against latest registered snapshot table.
|
||||||
if prevTable, err := latestHourlySnapshotBefore(ctx, dbConn, startTime); err == nil && prevTable != "" {
|
if prevTable, err := latestHourlySnapshotBefore(ctx, dbConn, startTime, loggerFromCtx(ctx, c.Logger)); err == nil && prevTable != "" {
|
||||||
moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName)
|
moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName)
|
||||||
if moreMissing > 0 {
|
if moreMissing > 0 {
|
||||||
missingCount += moreMissing
|
missingCount += moreMissing
|
||||||
@@ -1104,7 +1114,7 @@ func (c *CronTask) compareWithPreviousSnapshot(
|
|||||||
inventoryByName map[string]queries.Inventory,
|
inventoryByName map[string]queries.Inventory,
|
||||||
missingCount int,
|
missingCount int,
|
||||||
) (string, int, int) {
|
) (string, int, int) {
|
||||||
prevTableName, prevTableErr := latestHourlySnapshotBefore(ctx, dbConn, startTime)
|
prevTableName, prevTableErr := latestHourlySnapshotBefore(ctx, dbConn, startTime, loggerFromCtx(ctx, c.Logger))
|
||||||
if prevTableErr != nil {
|
if prevTableErr != nil {
|
||||||
c.Logger.Warn("failed to locate previous hourly snapshot for deletion comparison", "error", prevTableErr, "url", url)
|
c.Logger.Warn("failed to locate previous hourly snapshot for deletion comparison", "error", prevTableErr, "url", url)
|
||||||
}
|
}
|
||||||
@@ -1115,8 +1125,8 @@ func (c *CronTask) compareWithPreviousSnapshot(
|
|||||||
if prevTableName != "" {
|
if prevTableName != "" {
|
||||||
moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTableName, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName)
|
moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTableName, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName)
|
||||||
missingCount += moreMissing
|
missingCount += moreMissing
|
||||||
expectedSeconds := int64(durationFromSeconds(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, time.Hour).Seconds())
|
expectedSeconds := int64(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds)
|
||||||
// Skip only if snapshots are much closer together than the configured cadence (e.g., rerun inside half interval).
|
// Skip only if snapshots are much closer together than the configured cadence.
|
||||||
if SnapshotTooSoon(prevSnapshotTime, startTime.Unix(), expectedSeconds) {
|
if SnapshotTooSoon(prevSnapshotTime, startTime.Unix(), expectedSeconds) {
|
||||||
c.Logger.Info("skipping new-VM detection because snapshots are too close together", "prev_table", prevTableName, "prev_snapshot_unix", prevSnapshotTime, "current_snapshot_unix", startTime.Unix(), "expected_interval_seconds", expectedSeconds)
|
c.Logger.Info("skipping new-VM detection because snapshots are too close together", "prev_table", prevTableName, "prev_snapshot_unix", prevSnapshotTime, "current_snapshot_unix", startTime.Unix(), "expected_interval_seconds", expectedSeconds)
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user