All checks were successful
continuous-integration/drone/push Build is passing
189 lines
4.8 KiB
Go
189 lines
4.8 KiB
Go
package tasks
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"time"
|
|
"vctp/db"
|
|
|
|
"github.com/jmoiron/sqlx"
|
|
)
|
|
|
|
func NewCronTracker(database db.Database) *CronTracker {
|
|
return &CronTracker{
|
|
db: database,
|
|
bindType: sqlx.BindType(database.DB().DriverName()),
|
|
}
|
|
}
|
|
|
|
// ClearAllInProgress resets any stuck in-progress flags (e.g., after crashes).
|
|
func (c *CronTracker) ClearAllInProgress(ctx context.Context) error {
|
|
if err := c.ensureTable(ctx); err != nil {
|
|
return err
|
|
}
|
|
_, err := c.db.DB().ExecContext(ctx, `UPDATE cron_status SET in_progress = FALSE`)
|
|
return err
|
|
}
|
|
|
|
// ClearStale resets in_progress for a specific job if it has been running longer than maxAge.
|
|
func (c *CronTracker) ClearStale(ctx context.Context, job string, maxAge time.Duration) error {
|
|
if err := c.ensureTable(ctx); err != nil {
|
|
return err
|
|
}
|
|
driver := strings.ToLower(c.db.DB().DriverName())
|
|
var query string
|
|
switch driver {
|
|
case "sqlite":
|
|
query = `
|
|
UPDATE cron_status
|
|
SET in_progress = FALSE
|
|
WHERE job_name = ?
|
|
AND in_progress = TRUE
|
|
AND started_at > 0
|
|
AND (strftime('%s','now') - started_at) > ?
|
|
`
|
|
case "pgx", "postgres":
|
|
query = `
|
|
UPDATE cron_status
|
|
SET in_progress = FALSE
|
|
WHERE job_name = $1
|
|
AND in_progress = TRUE
|
|
AND started_at > 0
|
|
AND (EXTRACT(EPOCH FROM now())::BIGINT - started_at) > $2
|
|
`
|
|
default:
|
|
return nil
|
|
}
|
|
_, err := c.db.DB().ExecContext(ctx, query, job, int64(maxAge.Seconds()))
|
|
return err
|
|
}
|
|
|
|
func (c *CronTracker) ensureTable(ctx context.Context) error {
|
|
conn := c.db.DB()
|
|
driver := conn.DriverName()
|
|
var ddl string
|
|
switch driver {
|
|
case "pgx", "postgres":
|
|
ddl = `
|
|
CREATE TABLE IF NOT EXISTS cron_status (
|
|
job_name TEXT PRIMARY KEY,
|
|
started_at BIGINT NOT NULL,
|
|
ended_at BIGINT NOT NULL,
|
|
duration_ms BIGINT NOT NULL,
|
|
last_error TEXT,
|
|
in_progress BOOLEAN NOT NULL DEFAULT FALSE
|
|
);`
|
|
default:
|
|
ddl = `
|
|
CREATE TABLE IF NOT EXISTS cron_status (
|
|
job_name TEXT PRIMARY KEY,
|
|
started_at BIGINT NOT NULL,
|
|
ended_at BIGINT NOT NULL,
|
|
duration_ms BIGINT NOT NULL,
|
|
last_error TEXT,
|
|
in_progress BOOLEAN NOT NULL DEFAULT FALSE
|
|
);`
|
|
}
|
|
_, err := conn.ExecContext(ctx, ddl)
|
|
return err
|
|
}
|
|
|
|
// Start marks a job as in-progress; returns a completion callback and whether to skip because it's already running.
|
|
func (c *CronTracker) Start(ctx context.Context, job string) (func(error), bool, error) {
|
|
if err := c.ensureTable(ctx); err != nil {
|
|
return nil, false, err
|
|
}
|
|
conn := c.db.DB()
|
|
now := time.Now().Unix()
|
|
|
|
tx, err := conn.BeginTxx(ctx, nil)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
var inProgress bool
|
|
query := sqlx.Rebind(c.bindType, `SELECT in_progress FROM cron_status WHERE job_name = ?`)
|
|
err = tx.QueryRowContext(ctx, query, job).Scan(&inProgress)
|
|
if err != nil {
|
|
// no row, insert
|
|
if err := upsertCron(tx, c.bindType, job, now, false); err != nil {
|
|
tx.Rollback()
|
|
return nil, false, err
|
|
}
|
|
} else {
|
|
if inProgress {
|
|
tx.Rollback()
|
|
return nil, true, nil
|
|
}
|
|
if err := markCronStart(tx, c.bindType, job, now); err != nil {
|
|
tx.Rollback()
|
|
return nil, false, err
|
|
}
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
done := func(runErr error) {
|
|
_ = c.finish(context.Background(), job, now, runErr)
|
|
}
|
|
return done, false, nil
|
|
}
|
|
|
|
func (c *CronTracker) finish(ctx context.Context, job string, startedAt int64, runErr error) error {
|
|
conn := c.db.DB()
|
|
duration := time.Since(time.Unix(startedAt, 0)).Milliseconds()
|
|
tx, err := conn.BeginTxx(ctx, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
lastErr := ""
|
|
if runErr != nil {
|
|
lastErr = runErr.Error()
|
|
}
|
|
err = upsertCronFinish(tx, c.bindType, job, duration, lastErr)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return err
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
func upsertCron(tx *sqlx.Tx, bindType int, job string, startedAt int64, inProgress bool) error {
|
|
query := `
|
|
INSERT INTO cron_status (job_name, started_at, ended_at, duration_ms, last_error, in_progress)
|
|
VALUES (?, ?, 0, 0, NULL, ?)
|
|
ON CONFLICT (job_name) DO UPDATE SET started_at = excluded.started_at, in_progress = excluded.in_progress, ended_at = excluded.ended_at, duration_ms = excluded.duration_ms, last_error = excluded.last_error
|
|
`
|
|
_, err := tx.Exec(sqlx.Rebind(bindType, query), job, startedAt, inProgress)
|
|
return err
|
|
}
|
|
|
|
func markCronStart(tx *sqlx.Tx, bindType int, job string, startedAt int64) error {
|
|
query := `
|
|
UPDATE cron_status
|
|
SET started_at = ?, in_progress = TRUE, ended_at = 0, duration_ms = 0, last_error = NULL
|
|
WHERE job_name = ?
|
|
`
|
|
_, err := tx.Exec(sqlx.Rebind(bindType, query), startedAt, job)
|
|
return err
|
|
}
|
|
|
|
func upsertCronFinish(tx *sqlx.Tx, bindType int, job string, durationMS int64, lastErr string) error {
|
|
query := `
|
|
UPDATE cron_status
|
|
SET ended_at = ?, duration_ms = ?, last_error = ?, in_progress = FALSE
|
|
WHERE job_name = ?
|
|
`
|
|
_, err := tx.Exec(sqlx.Rebind(bindType, query), time.Now().Unix(), durationMS, nullableString(lastErr), job)
|
|
return err
|
|
}
|
|
|
|
func nullableString(s string) interface{} {
|
|
if s == "" {
|
|
return nil
|
|
}
|
|
return s
|
|
}
|