package tasks import ( "context" "database/sql" "time" "vctp/db" "github.com/jmoiron/sqlx" ) // CronTracker manages re-entry protection and status recording for cron jobs. type CronTracker struct { db db.Database bindType int } func NewCronTracker(database db.Database) *CronTracker { return &CronTracker{ db: database, bindType: sqlx.BindType(database.DB().DriverName()), } } func (c *CronTracker) ensureTable(ctx context.Context) error { conn := c.db.DB() driver := conn.DriverName() var ddl string switch driver { case "pgx", "postgres": ddl = ` CREATE TABLE IF NOT EXISTS cron_status ( job_name TEXT PRIMARY KEY, started_at BIGINT NOT NULL, ended_at BIGINT NOT NULL, duration_ms BIGINT NOT NULL, last_error TEXT, in_progress BOOLEAN NOT NULL DEFAULT FALSE );` default: ddl = ` CREATE TABLE IF NOT EXISTS cron_status ( job_name TEXT PRIMARY KEY, started_at BIGINT NOT NULL, ended_at BIGINT NOT NULL, duration_ms BIGINT NOT NULL, last_error TEXT, in_progress BOOLEAN NOT NULL DEFAULT FALSE );` } _, err := conn.ExecContext(ctx, ddl) return err } // Start marks a job as in-progress; returns a completion callback and whether to skip because it's already running. func (c *CronTracker) Start(ctx context.Context, job string) (func(error), bool, error) { if err := c.ensureTable(ctx); err != nil { return nil, false, err } conn := c.db.DB() now := time.Now().Unix() tx, err := conn.BeginTxx(ctx, nil) if err != nil { return nil, false, err } var inProgress bool query := sqlx.Rebind(c.bindType, `SELECT in_progress FROM cron_status WHERE job_name = ?`) err = tx.QueryRowContext(ctx, query, job).Scan(&inProgress) if err != nil { // no row, insert if err := upsertCron(tx, c.bindType, job, now, false); err != nil { tx.Rollback() return nil, false, err } } else { if inProgress { tx.Rollback() return nil, true, nil } if err := markCronStart(tx, c.bindType, job, now); err != nil { tx.Rollback() return nil, false, err } } if err := tx.Commit(); err != nil { return nil, false, err } done := func(runErr error) { _ = c.finish(context.Background(), job, now, runErr) } return done, false, nil } func (c *CronTracker) finish(ctx context.Context, job string, startedAt int64, runErr error) error { conn := c.db.DB() duration := time.Since(time.Unix(startedAt, 0)).Milliseconds() tx, err := conn.BeginTxx(ctx, nil) if err != nil { return err } var lastError sql.NullString if runErr != nil { lastError = sql.NullString{String: runErr.Error(), Valid: true} } err = upsertCronFinish(tx, c.bindType, job, startedAt, duration, lastError.String) if err != nil { tx.Rollback() return err } return tx.Commit() } func upsertCron(tx *sqlx.Tx, bindType int, job string, startedAt int64, inProgress bool) error { query := ` INSERT INTO cron_status (job_name, started_at, ended_at, duration_ms, last_error, in_progress) VALUES (?, ?, 0, 0, NULL, ?) ON CONFLICT (job_name) DO UPDATE SET started_at = excluded.started_at, in_progress = excluded.in_progress, ended_at = excluded.ended_at, duration_ms = excluded.duration_ms, last_error = excluded.last_error ` _, err := tx.Exec(sqlx.Rebind(bindType, query), job, startedAt, inProgress) return err } func markCronStart(tx *sqlx.Tx, bindType int, job string, startedAt int64) error { query := ` UPDATE cron_status SET started_at = ?, in_progress = TRUE, ended_at = 0, duration_ms = 0, last_error = NULL WHERE job_name = ? ` _, err := tx.Exec(sqlx.Rebind(bindType, query), startedAt, job) return err } func upsertCronFinish(tx *sqlx.Tx, bindType int, job string, startedAt int64, durationMS int64, lastErr string) error { query := ` UPDATE cron_status SET ended_at = ?, duration_ms = ?, last_error = ?, in_progress = FALSE WHERE job_name = ? ` _, err := tx.Exec(sqlx.Rebind(bindType, query), time.Now().Unix(), durationMS, nullableString(lastErr), job) return err } func nullableString(s string) interface{} { if s == "" { return nil } return s }