'use strict';

Object.defineProperty(exports, '__esModule', { value: true });

const config = require('@backstage/config');
const luxon = require('luxon');
const backendCommon = require('@backstage/backend-common');
const lodash = require('lodash');
const errors = require('@backstage/errors');
const cron = require('cron');
const uuid = require('uuid');
const zod = require('zod');
const api = require('@opentelemetry/api');

/**
 * Reads a duration from config. A string value is parsed as an ISO duration;
 * any other value is delegated to `readDurationFromConfig`.
 *
 * @param config$1 - The config object to read from
 * @param key - The key holding the duration
 * @returns A valid luxon Duration (string case) or whatever
 *   `readDurationFromConfig` returns (non-string case)
 * @throws Error if the string value is not a valid ISO duration
 */
function readDuration(config$1, key) {
  if (typeof config$1.get(key) === "string") {
    const value = config$1.getString(key);
    const duration = luxon.Duration.fromISO(value);
    if (!duration.isValid) {
      throw new Error(`Invalid duration: ${value}`);
    }
    return duration;
  }
  return config.readDurationFromConfig(config$1, { key });
}

/**
 * Reads a value that is either an object carrying a `cron` property (returned
 * as-is) or a duration (see {@link readDuration}).
 */
function readCronOrDuration(config, key) {
  const value = config.get(key);
  // `typeof null === "object"`, so guard against null before touching `.cron`.
  if (typeof value === "object" && value !== null && value.cron) {
    return value;
  }
  return readDuration(config, key);
}

/**
 * Reads a task schedule definition (`frequency`, `timeout`, optional
 * `initialDelay`, optional `scope`) from the given config object.
 *
 * @param config - Config positioned at the schedule definition
 * @returns An object with `frequency`, `timeout`, `initialDelay` and `scope`
 * @throws Error if `scope` is set to anything but "global" or "local"
 */
function readTaskScheduleDefinitionFromConfig(config) {
  const frequency = readCronOrDuration(config, "frequency");
  const timeout = readDuration(config, "timeout");
  const initialDelay = config.has("initialDelay")
    ? readDuration(config, "initialDelay")
    : void 0;
  const scope = config.getOptionalString("scope");
  if (scope && !["global", "local"].includes(scope)) {
    throw new Error(
      `Only "global" or "local" are allowed for TaskScheduleDefinition.scope, but got: ${scope}`
    );
  }
  return { frequency, timeout, initialDelay, scope };
}

const DB_MIGRATIONS_TABLE = "backstage_backend_tasks__knex_migrations";
const DB_TASKS_TABLE = "backstage_backend_tasks__tasks";

/**
 * Runs the package's knex migrations (resolved from the
 * `@backstage/backend-tasks` package's `migrations` directory) to latest.
 */
async function migrateBackendTasks(knex) {
  const migrationsDir = backendCommon.resolvePackagePath(
    "@backstage/backend-tasks",
    "migrations"
  );
  await knex.migrate.latest({
    directory: migrationsDir,
    tableName: DB_MIGRATIONS_TABLE
  });
}

/**
 * Ensures that the given ID is a non-blank string.
 *
 * @throws InputError otherwise
 */
function validateId(id) {
  if (typeof id !== "string" || !id.trim()) {
    throw new errors.InputError(`${id} is not a valid ID, expected non-empty string`);
  }
}

/**
 * Tells whether the configured knex client name contains the given dialect
 * name (e.g. "sqlite3", "mysql").
 */
function clientIncludes(knex, dialect) {
  return knex.client.config.client.includes(dialect);
}

/**
 * Builds a database-side expression for "now plus the given duration".
 *
 * @param duration - Optional duration; missing or zero means "now"
 * @param knex - The knex instance used to build the expression
 * @returns A knex raw/function expression, phrased per database dialect
 */
function nowPlus(duration, knex) {
  const requested = duration == null ? void 0 : duration.as("seconds");
  const seconds = requested != null ? requested : 0;
  if (!seconds) {
    return knex.fn.now();
  }
  if (clientIncludes(knex, "sqlite3")) {
    return knex.raw(`datetime('now', ?)`, [`${seconds} seconds`]);
  }
  if (clientIncludes(knex, "mysql")) {
    return knex.raw(`now() + interval ${seconds} second`);
  }
  return knex.raw(`now() + interval '${seconds} seconds'`);
}

/**
 * Sleeps for the given duration, waking up early if the signal is aborted.
 * Resolves (never rejects) in both cases.
 *
 * @param duration - How long to sleep (read via `.as("milliseconds")`)
 * @param abortSignal - Optional signal that cuts the sleep short
 */
async function sleep(duration, abortSignal) {
  if (abortSignal == null ? void 0 : abortSignal.aborted) {
    return;
  }
  await new Promise((resolve) => {
    let timeoutHandle = void 0;
    const done = () => {
      if (timeoutHandle) {
        clearTimeout(timeoutHandle);
      }
      // Always detach, so that a normally elapsed sleep leaves no listener.
      if (abortSignal != null) {
        abortSignal.removeEventListener("abort", done);
      }
      resolve();
    };
    timeoutHandle = setTimeout(done, duration.as("milliseconds"));
    if (abortSignal != null) {
      abortSignal.addEventListener("abort", done);
    }
  });
}

/**
 * Creates a new AbortController that is aborted whenever the parent signal
 * is (or already was) aborted. Aborting the returned controller removes the
 * listener that was attached to the parent, so callers should abort it when
 * done with it to release that listener.
 *
 * @param parent - Optional parent AbortSignal
 * @returns The delegate AbortController
 */
function delegateAbortController(parent) {
  const delegate = new AbortController();
  if (!parent) {
    return delegate;
  }
  if (parent.aborted) {
    delegate.abort();
    return delegate;
  }
  const onParentAborted = () => {
    delegate.abort();
  };
  const onChildAborted = () => {
    parent.removeEventListener("abort", onParentAborted);
  };
  parent.addEventListener("abort", onParentAborted, { once: true });
  delegate.signal.addEventListener("abort", onChildAborted, { once: true });
  return delegate;
}

/**
 * Runs a task on a schedule purely in-process, without any database
 * coordination.
 */
class LocalTaskWorker {
  constructor(taskId, fn, logger) {
    this.taskId = taskId;
    this.fn = fn;
    this.logger = logger;
    // Set only while the worker is sleeping; aborting it ends the sleep early.
    this.abortWait = void 0;
  }

  /**
   * Starts the work loop in the background and returns immediately.
   *
   * @param settings - Task settings (`cadence`, `timeoutAfterDuration`,
   *   optional `initialDelayDuration`)
   * @param options - Optional `{ signal }`; aborting the signal ends the loop
   */
  start(settings, options) {
    this.logger.info(
      `Task worker starting: ${this.taskId}, ${JSON.stringify(settings)}`
    );
    const signal = options == null ? void 0 : options.signal;
    (async () => {
      let attemptNum = 1;
      for (;;) {
        try {
          if (settings.initialDelayDuration) {
            await this.sleep(
              luxon.Duration.fromISO(settings.initialDelayDuration),
              signal
            );
          }
          while (!(signal && signal.aborted)) {
            const startTime = process.hrtime();
            await this.runOnce(settings, signal);
            const timeTaken = process.hrtime(startTime);
            await this.waitUntilNext(
              settings,
              (timeTaken[0] + timeTaken[1] / 1e9) * 1e3,
              signal
            );
          }
          this.logger.info(`Task worker finished: ${this.taskId}`);
          attemptNum = 0;
          break;
        } catch (e) {
          attemptNum += 1;
          this.logger.warn(
            `Task worker failed unexpectedly, attempt number ${attemptNum}, ${e}`
          );
          // Pass the signal so that an abort is not held up by the back-off.
          await sleep(luxon.Duration.fromObject({ seconds: 1 }), signal);
        }
      }
    })();
  }

  /**
   * Cuts the current wait short so that the task runs right away.
   *
   * @throws ConflictError if the worker is not currently waiting
   */
  trigger() {
    if (!this.abortWait) {
      throw new errors.ConflictError(`Task ${this.taskId} is currently running`);
    }
    this.abortWait.abort();
  }

  /**
   * Makes a single attempt at running the task to completion. Errors thrown
   * by the task are logged and do not propagate.
   */
  async runOnce(settings, signal) {
    const taskAbortController = delegateAbortController(signal);
    const timeoutHandle = setTimeout(() => {
      taskAbortController.abort();
    }, luxon.Duration.fromISO(settings.timeoutAfterDuration).as("milliseconds"));
    try {
      await this.fn(taskAbortController.signal);
    } catch (e) {
      // A failing run must not stop the schedule, but should not go unnoticed.
      this.logger.error(e);
    } finally {
      clearTimeout(timeoutHandle);
      // Aborting releases the listener held on the parent signal.
      taskAbortController.abort();
    }
  }

  /**
   * Sleeps until it's time to run the task again.
   *
   * @param settings - Task settings; a cadence not starting with "P" is
   *   treated as a cron expression, otherwise as an ISO duration
   * @param lastRunMillis - How long the last run took, in milliseconds
   * @param signal - Optional signal that cuts the wait short
   */
  async waitUntilNext(settings, lastRunMillis, signal) {
    if (signal == null ? void 0 : signal.aborted) {
      return;
    }
    const isCron = !settings.cadence.startsWith("P");
    let dt;
    if (isCron) {
      const nextRun = +new cron.CronTime(settings.cadence).sendAt().toJSDate();
      dt = nextRun - Date.now();
    } else {
      // Subtract the run time so that starts are spaced by the cadence.
      dt = luxon.Duration.fromISO(settings.cadence).as("milliseconds") - lastRunMillis;
    }
    dt = Math.max(dt, 0);
    this.logger.debug(
      `task: ${this.taskId} will next occur around ${luxon.DateTime.now().plus(
        luxon.Duration.fromMillis(dt)
      )}`
    );
    await this.sleep(luxon.Duration.fromMillis(dt), signal);
  }

  /**
   * Sleeps for the given duration, exposing `abortWait` meanwhile so that
   * `trigger` can end the sleep early.
   */
  async sleep(duration, abortSignal) {
    this.abortWait = delegateAbortController(abortSignal);
    await sleep(duration, this.abortWait.signal);
    this.abortWait.abort();
    this.abortWait = void 0;
  }
}

/** Accepts empty/missing values as well as valid ISO duration strings. */
function isValidOptionalDurationString(d) {
  try {
    return !d || luxon.Duration.fromISO(d).isValid;
  } catch {
    return false;
  }
}

/** Accepts non-empty strings that `CronTime` can be constructed from. */
function isValidCronFormat(c) {
  try {
    if (!c) {
      return false;
    }
    new cron.CronTime(c);
    return true;
  } catch {
    return false;
  }
}

// Version 1 settings schema. The bundler dropped its binding, so the result
// is discarded here; it is kept to leave module evaluation unchanged.
zod.z.object({
  version: zod.z.literal(1),
  initialDelayDuration: zod.z.string().optional().refine(isValidOptionalDurationString, {
    message: "Invalid duration, expecting ISO Period"
  }),
  recurringAtMostEveryDuration: zod.z.string().refine(isValidOptionalDurationString, {
    message: "Invalid duration, expecting ISO Period"
  }),
  timeoutAfterDuration: zod.z.string().refine(isValidOptionalDurationString, {
    message: "Invalid duration, expecting ISO Period"
  })
});

const taskSettingsV2Schema = zod.z.object({
  version: zod.z.literal(2),
  cadence: zod.z.string().refine(isValidCronFormat, { message: "Invalid cron" }).or(
    zod.z.string().refine(isValidOptionalDurationString, {
      message: "Invalid duration, expecting ISO Period"
    })
  ),
  timeoutAfterDuration: zod.z.string().refine(isValidOptionalDurationString, {
    message: "Invalid duration, expecting ISO Period"
  }),
  initialDelayDuration: zod.z.string().optional().refine(isValidOptionalDurationString, {
    message: "Invalid duration, expecting ISO Period"
  })
});

const DEFAULT_WORK_CHECK_FREQUENCY = luxon.Duration.fromObject({ seconds: 5 });

/**
 * Tells whether the settings' cadence is a cron expression, i.e. does not
 * start with "P" like an ISO duration does. Missing settings count as cron.
 */
function hasCronCadence(settings) {
  return !(settings == null ? void 0 : settings.cadence.startsWith("P"));
}

/**
 * Runs a task on a schedule, coordinating through a row in the tasks table
 * so that a run must first be claimed via a ticket before it is performed.
 */
class TaskWorker {
  constructor(taskId, fn, knex, logger, workCheckFrequency = DEFAULT_WORK_CHECK_FREQUENCY) {
    this.taskId = taskId;
    this.fn = fn;
    this.knex = knex;
    this.logger = logger;
    this.workCheckFrequency = workCheckFrequency;
  }

  /**
   * Persists the task settings, then starts the polling loop in the
   * background.
   *
   * @param settings - Version 2 task settings
   * @param options - Optional `{ signal }`; aborting the signal ends the loop
   * @throws Error if the task could not be persisted
   */
  async start(settings, options) {
    try {
      await this.persistTask(settings);
    } catch (e) {
      throw new Error(`Failed to persist task, ${e}`);
    }
    this.logger.info(
      `Task worker starting: ${this.taskId}, ${JSON.stringify(settings)}`
    );

    // Never poll less often than the task is supposed to run.
    let workCheckFrequency = this.workCheckFrequency;
    if (!hasCronCadence(settings)) {
      const cadence = luxon.Duration.fromISO(settings.cadence);
      if (cadence < workCheckFrequency) {
        workCheckFrequency = cadence;
      }
    }

    const signal = options == null ? void 0 : options.signal;
    let attemptNum = 1;
    (async () => {
      for (;;) {
        try {
          if (settings.initialDelayDuration) {
            await sleep(
              luxon.Duration.fromISO(settings.initialDelayDuration),
              signal
            );
          }
          while (!(signal && signal.aborted)) {
            const runResult = await this.runOnce(signal);
            if (runResult.result === "abort") {
              break;
            }
            await sleep(workCheckFrequency, signal);
          }
          this.logger.info(`Task worker finished: ${this.taskId}`);
          attemptNum = 0;
          break;
        } catch (e) {
          attemptNum += 1;
          this.logger.warn(
            `Task worker failed unexpectedly, attempt number ${attemptNum}, ${e}`
          );
          // Pass the signal so that an abort is not held up by the back-off.
          await sleep(luxon.Duration.fromObject({ seconds: 1 }), signal);
        }
      }
    })();
  }

  /**
   * Marks the given task as due immediately.
   *
   * @throws NotFoundError if no such task row exists
   * @throws ConflictError if the task currently holds a run ticket
   */
  static async trigger(knex, taskId) {
    const rows = await knex(DB_TASKS_TABLE)
      .select(knex.raw(1))
      .where("id", "=", taskId);
    if (rows.length !== 1) {
      throw new errors.NotFoundError(`Task ${taskId} does not exist`);
    }
    const updatedRows = await knex(DB_TASKS_TABLE)
      .where("id", "=", taskId)
      .whereNull("current_run_ticket")
      .update({ next_run_start_at: knex.fn.now() });
    if (updatedRows < 1) {
      throw new errors.ConflictError(`Task ${taskId} is currently running`);
    }
  }

  /**
   * Makes a single attempt at running the task to completion, if ready.
   *
   * @returns The outcome of the attempt
   */
  async runOnce(signal) {
    const findResult = await this.findReadyTask();
    if (findResult.result === "not-ready-yet" || findResult.result === "abort") {
      return findResult;
    }

    const taskSettings = findResult.settings;
    const ticket = uuid.v4();
    const claimed = await this.tryClaimTask(ticket, taskSettings);
    if (!claimed) {
      return { result: "not-ready-yet" };
    }

    const taskAbortController = delegateAbortController(signal);
    const timeoutHandle = setTimeout(() => {
      taskAbortController.abort();
    }, luxon.Duration.fromISO(taskSettings.timeoutAfterDuration).as("milliseconds"));
    try {
      await this.fn(taskAbortController.signal);
    } catch (e) {
      this.logger.error(e);
      await this.tryReleaseTask(ticket, taskSettings);
      return { result: "failed" };
    } finally {
      clearTimeout(timeoutHandle);
      // Abort on every path (not only on success), since that is what
      // releases the listener held on the parent signal.
      taskAbortController.abort();
    }

    await this.tryReleaseTask(ticket, taskSettings);
    return { result: "completed" };
  }

  /**
   * Perform the initial store of the task info. An existing row gets its
   * settings replaced, and its next run moved earlier if the newly computed
   * next start is sooner than the stored one.
   */
  async persistTask(settings) {
    taskSettingsV2Schema.parse(settings);
    const isCron = hasCronCadence(settings);
    let startAt;
    let nextStartAt;
    if (settings.initialDelayDuration) {
      startAt = nowPlus(
        luxon.Duration.fromISO(settings.initialDelayDuration),
        this.knex
      );
    }
    if (isCron) {
      const time = new cron.CronTime(settings.cadence)
        .sendAt()
        .minus({ seconds: 1 })
        .toUTC();
      nextStartAt = this.nextRunAtRaw(time);
      startAt || (startAt = nextStartAt);
    } else {
      startAt || (startAt = this.knex.fn.now());
      nextStartAt = nowPlus(luxon.Duration.fromISO(settings.cadence), this.knex);
    }
    this.logger.debug(`task: ${this.taskId} configured to run at: ${startAt}`);

    const settingsJson = JSON.stringify(settings);
    // The mysql branch uses the plain column name and the literal JSON, the
    // other dialects a table-qualified column and the `excluded` row.
    const isMysql = clientIncludes(this.knex, "mysql");
    const storedNextRunColumn = isMysql
      ? "next_run_start_at"
      : `${DB_TASKS_TABLE}.next_run_start_at`;
    const mergedSettings = isMysql
      ? settingsJson
      : this.knex.ref("excluded.settings_json");
    await this.knex(DB_TASKS_TABLE)
      .insert({
        id: this.taskId,
        settings_json: settingsJson,
        next_run_start_at: startAt
      })
      .onConflict("id")
      .merge({
        settings_json: mergedSettings,
        next_run_start_at: this.knex.raw(
          `CASE WHEN ?? < ?? THEN ?? ELSE ?? END`,
          [nextStartAt, storedNextRunColumn, nextStartAt, storedNextRunColumn]
        )
      });
  }

  /**
   * Check if the task is ready to run
   *
   * @returns `{ result: "ready", settings }`, `{ result: "not-ready-yet" }`,
   *   or `{ result: "abort" }` if the row is gone or its settings no longer
   *   parse
   */
  async findReadyTask() {
    const [row] = await this.knex(DB_TASKS_TABLE)
      .where("id", "=", this.taskId)
      .select({
        settingsJson: "settings_json",
        ready: this.knex.raw(
          `CASE WHEN next_run_start_at <= ? AND current_run_ticket IS NULL THEN TRUE ELSE FALSE END`,
          [this.knex.fn.now()]
        )
      });
    if (!row) {
      this.logger.info(
        "No longer able to find task; aborting and assuming that it has been unregistered or expired"
      );
      return { result: "abort" };
    } else if (!row.ready) {
      return { result: "not-ready-yet" };
    }
    try {
      const obj = JSON.parse(row.settingsJson);
      const settings = taskSettingsV2Schema.parse(obj);
      return { result: "ready", settings };
    } catch (e) {
      this.logger.info(
        `Task "${this.taskId}" is no longer able to parse task settings; aborting and assuming that a newer version of the task has been issued and being handled by other workers, ${e}`
      );
      return { result: "abort" };
    }
  }

  /**
   * Attempts to claim a task that's ready for execution, on this worker's
   * behalf. We should not attempt to perform the work unless the claim really
   * goes through.
   *
   * @param ticket - A globally unique string that changes for each invocation
   * @param settings - The settings of the task to claim
   * @returns True if it was successfully claimed
   */
  async tryClaimTask(ticket, settings) {
    const startedAt = this.knex.fn.now();
    const expiresAt = settings.timeoutAfterDuration
      ? nowPlus(luxon.Duration.fromISO(settings.timeoutAfterDuration), this.knex)
      : this.knex.raw("null");
    const rows = await this.knex(DB_TASKS_TABLE)
      .where("id", "=", this.taskId)
      .whereNull("current_run_ticket")
      .update({
        current_run_ticket: ticket,
        current_run_started_at: startedAt,
        current_run_expires_at: expiresAt
      });
    return rows === 1;
  }

  /**
   * Releases a claim previously made with the given ticket, and schedules
   * the next run according to the cadence.
   *
   * @param ticket - The ticket that the claim was made with
   * @param settings - The settings of the task to release
   * @returns True if exactly one row was released
   */
  async tryReleaseTask(ticket, settings) {
    let nextRun;
    if (hasCronCadence(settings)) {
      const time = new cron.CronTime(settings.cadence).sendAt().toUTC();
      this.logger.debug(`task: ${this.taskId} will next occur around ${time}`);
      nextRun = this.nextRunAtRaw(time);
    } else {
      const dt = luxon.Duration.fromISO(settings.cadence).as("seconds");
      this.logger.debug(
        `task: ${this.taskId} will next occur around ${luxon.DateTime.now().plus({
          seconds: dt
        })}`
      );
      // Previous planned start plus cadence, but never in the past.
      if (clientIncludes(this.knex, "sqlite3")) {
        nextRun = this.knex.raw(
          `max(datetime(next_run_start_at, ?), datetime('now'))`,
          [`+${dt} seconds`]
        );
      } else if (clientIncludes(this.knex, "mysql")) {
        nextRun = this.knex.raw(
          `greatest(next_run_start_at + interval ${dt} second, now())`
        );
      } else {
        nextRun = this.knex.raw(
          `greatest(next_run_start_at + interval '${dt} seconds', now())`
        );
      }
    }
    const rows = await this.knex(DB_TASKS_TABLE)
      .where("id", "=", this.taskId)
      .where("current_run_ticket", "=", ticket)
      .update({
        next_run_start_at: nextRun,
        current_run_ticket: this.knex.raw("null"),
        current_run_started_at: this.knex.raw("null"),
        current_run_expires_at: this.knex.raw("null")
      });
    return rows === 1;
  }

  /**
   * Converts a luxon DateTime into a raw knex expression suitable for the
   * current database dialect.
   */
  nextRunAtRaw(time) {
    if (clientIncludes(this.knex, "sqlite3")) {
      return this.knex.raw("datetime(?)", [time.toISO()]);
    } else if (clientIncludes(this.knex, "mysql")) {
      return this.knex.raw(`?`, [time.toSQL({ includeOffset: false })]);
    }
    return this.knex.raw(`?`, [time.toISO()]);
  }
}

/**
 * Per-plugin scheduler: hands "global" tasks to a {@link TaskWorker} and
 * "local" tasks to a {@link LocalTaskWorker}, and records run metrics.
 */
class PluginTaskSchedulerImpl {
  constructor(databaseFactory, logger) {
    this.databaseFactory = databaseFactory;
    this.logger = logger;
    this.localTasksById = /* @__PURE__ */ new Map();
    this.allScheduledTasks = [];
    const meter = api.metrics.getMeter("default");
    this.counter = meter.createCounter("backend_tasks.task.runs.count", {
      description: "Total number of times a task has been run"
    });
    this.duration = meter.createHistogram("backend_tasks.task.runs.duration", {
      description: "Histogram of task run durations",
      unit: "seconds"
    });
  }

  /**
   * Triggers the task with the given ID: a locally scheduled one directly,
   * anything else through the database.
   */
  async triggerTask(id) {
    const localTask = this.localTasksById.get(id);
    if (localTask) {
      localTask.trigger();
      return;
    }
    const knex = await this.databaseFactory();
    await TaskWorker.trigger(knex, id);
  }

  /**
   * Schedules a task. The scope defaults to "global".
   *
   * @param task - Has `id`, `fn`, `frequency`, `timeout`, and optionally
   *   `initialDelay`, `scope` and `signal`
   */
  async scheduleTask(task) {
    validateId(task.id);
    const scope = task.scope != null ? task.scope : "global";
    const settings = {
      version: 2,
      cadence: parseDuration(task.frequency),
      initialDelayDuration: task.initialDelay && parseDuration(task.initialDelay),
      timeoutAfterDuration: parseDuration(task.timeout)
    };
    if (scope === "global") {
      const knex = await this.databaseFactory();
      const worker = new TaskWorker(
        task.id,
        this.wrapInMetrics(task.fn, { labels: { taskId: task.id, scope } }),
        knex,
        this.logger.child({ task: task.id })
      );
      await worker.start(settings, { signal: task.signal });
    } else {
      const worker = new LocalTaskWorker(
        task.id,
        this.wrapInMetrics(task.fn, { labels: { taskId: task.id, scope } }),
        this.logger.child({ task: task.id })
      );
      worker.start(settings, { signal: task.signal });
      this.localTasksById.set(task.id, worker);
    }
    this.allScheduledTasks.push({ id: task.id, scope, settings });
  }

  /**
   * Creates a runner whose `run` schedules the given task with the given
   * schedule merged in (schedule properties win over task properties).
   */
  createScheduledTaskRunner(schedule) {
    return {
      run: async (task) => {
        await this.scheduleTask({ ...task, ...schedule });
      }
    };
  }

  /** Returns the tasks scheduled through this instance so far. */
  async getScheduledTasks() {
    return this.allScheduledTasks;
  }

  /**
   * Wraps a task function so that each run bumps the run counter (once with
   * result "started", once with the final result) and records its duration
   * in seconds. Errors are rethrown after being counted.
   */
  wrapInMetrics(fn, opts) {
    return async (abort) => {
      const labels = { ...opts.labels };
      this.counter.add(1, { ...labels, result: "started" });
      const startTime = process.hrtime();
      try {
        await fn(abort);
        labels.result = "completed";
      } catch (ex) {
        labels.result = "failed";
        throw ex;
      } finally {
        const delta = process.hrtime(startTime);
        const elapsedSeconds = delta[0] + delta[1] / 1e9;
        this.counter.add(1, labels);
        this.duration.record(elapsedSeconds, labels);
      }
    };
  }
}

/**
 * Converts a frequency into its settings string form: the cron expression
 * for `{ cron }` objects, otherwise the ISO form of the duration.
 *
 * @throws Error if the duration is invalid
 */
function parseDuration(frequency) {
  if ("cron" in frequency) {
    return frequency.cron;
  }
  const parsed = luxon.Duration.isDuration(frequency)
    ? frequency
    : luxon.Duration.fromObject(frequency);
  if (!parsed.isValid) {
    throw new Error(
      `Invalid duration, ${parsed.invalidReason}: ${parsed.invalidExplanation}`
    );
  }
  return parsed.toISO();
}

/**
 * Periodically releases the claims of runs whose expiry time has passed.
 */
class PluginTaskSchedulerJanitor {
  constructor(options) {
    this.knex = options.knex;
    this.waitBetweenRuns = options.waitBetweenRuns;
    this.logger = options.logger;
  }

  /**
   * Runs the cleanup repeatedly until the signal is aborted. Without a
   * signal, this loops forever. Cleanup errors are logged, not thrown.
   */
  async start(abortSignal) {
    while (!(abortSignal == null ? void 0 : abortSignal.aborted)) {
      try {
        await this.runOnce();
      } catch (e) {
        this.logger.warn(`Error while performing janitorial tasks, ${e}`);
      }
      await sleep(this.waitBetweenRuns, abortSignal);
    }
  }

  /** Performs one cleanup pass, logging each timed out task. */
  async runOnce() {
    const dbNull = this.knex.raw("null");
    const configClient = this.knex.client.config.client;
    const releasedClaim = {
      current_run_ticket: dbNull,
      current_run_started_at: dbNull,
      current_run_expires_at: dbNull
    };

    let tasks;
    if (configClient.includes("sqlite3") || configClient.includes("mysql")) {
      // This branch selects the affected IDs first, then updates them.
      tasks = await this.knex(DB_TASKS_TABLE)
        .select("id")
        .where("current_run_expires_at", "<", this.knex.fn.now());
      if (tasks.length > 0) {
        // Repeat the expiry condition: a task that was released and claimed
        // anew since the select must not lose its fresh claim.
        await this.knex(DB_TASKS_TABLE)
          .whereIn(
            "id",
            tasks.map((t) => t.id)
          )
          .where("current_run_expires_at", "<", this.knex.fn.now())
          .update(releasedClaim);
      }
    } else {
      tasks = await this.knex(DB_TASKS_TABLE)
        .where("current_run_expires_at", "<", this.knex.fn.now())
        .update(releasedClaim)
        .returning(["id"]);
    }

    if (typeof tasks === "number") {
      if (tasks > 0) {
        this.logger.warn(`${tasks} tasks timed out and were lost`);
      }
    } else {
      for (const { id } of tasks) {
        this.logger.warn(`Task timed out and was lost: ${id}`);
      }
    }
  }
}

/**
 * Entry point for creating per-plugin task schedulers.
 */
class TaskScheduler {
  constructor(databaseManager, logger) {
    this.databaseManager = databaseManager;
    this.logger = logger;
  }

  /**
   * Creates a scheduler from config.
   *
   * @param config - The root config
   * @param options - Optional `databaseManager` and `logger` overrides
   */
  static fromConfig(config, options) {
    const givenDatabaseManager = options == null ? void 0 : options.databaseManager;
    const databaseManager = givenDatabaseManager != null
      ? givenDatabaseManager
      : backendCommon.DatabaseManager.fromConfig(config);
    const givenLogger = options == null ? void 0 : options.logger;
    const logger = (givenLogger || backendCommon.getRootLogger()).child({
      type: "taskManager"
    });
    return new TaskScheduler(databaseManager, logger);
  }

  /**
   * Instantiates a task manager instance for the given plugin.
   *
   * @param pluginId - The unique ID of the plugin, for example "catalog"
   * @returns A {@link PluginTaskScheduler} instance
   */
  forPlugin(pluginId) {
    return TaskScheduler.forPlugin({
      pluginId,
      databaseManager: this.databaseManager.forPlugin(pluginId),
      logger: this.logger
    });
  }

  /**
   * Instantiates a task manager instance from a plugin-scoped database
   * manager and a logger. The database is set up lazily, once, on first use.
   *
   * @param opts - Has `databaseManager` and `logger`
   */
  static forPlugin(opts) {
    const databaseFactory = lodash.once(async () => {
      const knex = await opts.databaseManager.getClient();
      const migrations = opts.databaseManager.migrations;
      if (!(migrations == null ? void 0 : migrations.skip)) {
        await migrateBackendTasks(knex);
      }
      if (process.env.NODE_ENV !== "test") {
        const janitor = new PluginTaskSchedulerJanitor({
          knex,
          waitBetweenRuns: luxon.Duration.fromObject({ minutes: 1 }),
          logger: opts.logger
        });
        // Deliberately not awaited: started without a signal, it never ends.
        janitor.start();
      }
      return knex;
    });
    return new PluginTaskSchedulerImpl(databaseFactory, opts.logger);
  }
}

exports.TaskScheduler = TaskScheduler;
exports.readTaskScheduleDefinitionFromConfig = readTaskScheduleDefinitionFromConfig;
// NOTE(review): the source map referenced below predates this reformatting
// and should be regenerated along with the file.
//# sourceMappingURL=index.cjs.js.map