Skip to content

Scheduler Service

SchedulerService

Bases: Service

Service that manages scheduled jobs.

Source code in src/hassette/core/scheduler_service.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
class SchedulerService(Service):
    """Service that manages scheduled jobs."""

    depends_on: ClassVar[list[type[Resource]]] = [DatabaseService]
    restart_spec: ClassVar[RestartSpec] = RestartSpec(
        restart_type=RestartType.PERMANENT,
        budget_intensity=2,
        budget_period_seconds=30,
    )

    _job_queue: "_ScheduledJobQueue"
    """Queue of scheduled jobs."""

    _wakeup_event: asyncio.Event
    """Event to wake the scheduler when a new job is added or jobs are removed."""

    _exit_event: asyncio.Event
    """Event to signal the scheduler to exit."""

    _executor: "CommandExecutor"
    """Command executor for running jobs and persisting registration/execution records."""

    _removal_callbacks: dict[str, Callable[["ScheduledJob"], None]]
    """Per-owner callbacks invoked whenever a job is removed via dequeue_job() or _remove_job()."""

    def __init__(self, hassette: "Hassette", *, executor: "CommandExecutor", parent: Resource | None = None) -> None:
        super().__init__(hassette, parent=parent)
        self._executor = executor
        self._job_queue = self.add_child(_ScheduledJobQueue)
        self._wakeup_event = asyncio.Event()
        self._exit_event = asyncio.Event()
        self._removal_callbacks = {}

    @property
    def min_delay(self) -> float:
        return self.hassette.config.scheduler.min_delay_seconds

    @property
    def max_delay(self) -> float:
        return self.hassette.config.scheduler.max_delay_seconds

    @property
    def default_delay(self) -> float:
        return self.hassette.config.scheduler.default_delay_seconds

    @property
    def config_log_level(self) -> LOG_LEVEL_TYPE:
        return self.hassette.config.logging.scheduler_service

    async def before_initialize(self) -> None:
        await self.hassette.ready_event.wait()

    async def serve(self) -> None:
        """Run the scheduler forever, processing jobs as they become due."""

        self.mark_ready(reason="Scheduler started")

        while True:
            if self.shutdown_event.is_set():
                self.mark_not_ready(reason="Hassette is shutting down")
                self.logger.debug("Scheduler exiting")
                return

            due_jobs, next_run_time = await self._job_queue.pop_due_and_peek_next(date_utils.now())

            if due_jobs:
                for job in due_jobs:
                    self.task_bucket.spawn(self._dispatch_and_log(job), name="scheduler:dispatch_scheduled_job")

            await self.sleep(next_run_time)

    def kick(self) -> None:
        self._wakeup_event.set()

    async def _enqueue_job(self, job: "ScheduledJob") -> None:
        """Push a job onto the queue and wake the scheduler."""

        self._apply_jitter_to_heap(job)
        await self._job_queue.add(job)
        self.kick()

    async def _remove_jobs_by_owner(self, owner: str) -> None:
        """Remove all jobs for an owner and wake the scheduler if necessary."""

        removed = await self._job_queue.remove_owner(owner)

        if removed:
            self.kick()
            self._fire_removal_callbacks(removed)

    def register_removal_callback(self, owner_id: str, callback: Callable[["ScheduledJob"], None]) -> None:
        """Register a callback to be called whenever a job belonging to owner_id is removed.

        If a callback is already registered for owner_id, the new callback replaces it.
        This handles legitimate re-registration during hot-reload cycles where the old
        Scheduler instance is orphaned without a formal shutdown.

        Args:
            owner_id: The owner whose job removals should trigger the callback.
            callback: Called with the removed ScheduledJob as its single argument.
        """
        self._removal_callbacks[owner_id] = callback

    def deregister_removal_callback(self, owner_id: str) -> None:
        """Remove the removal callback for owner_id, if any.

        No-op when owner_id has no registered callback. Called by
        ``Scheduler.on_shutdown`` so the slot is freed before the Scheduler
        is re-initialized (e.g. during a hot-reload cycle).

        Args:
            owner_id: The owner whose callback should be removed.
        """
        self._removal_callbacks.pop(owner_id, None)

    def _fire_removal_callbacks(self, jobs: "list[ScheduledJob]") -> None:
        """Invoke per-owner removal callbacks for each job in jobs."""
        for job in jobs:
            callback = self._removal_callbacks.get(job.owner_id)
            if callback is not None:
                callback(job)

    def _apply_jitter_to_heap(self, job: "ScheduledJob") -> None:
        """Apply jitter to the heap sort_index and fire_at without mutating job.next_run.

        If job.jitter is not None, a random offset in [0, jitter) seconds is added
        to ``job.fire_at`` and ``job.sort_index``. ``job.next_run`` is never modified
        — it is the unjittered logical fire time used as ``previous_run`` in subsequent
        trigger calls. When jitter is None or 0, ``fire_at`` equals ``next_run`` exactly
        (already set by ``set_next_run``).

        Args:
            job: The job whose sort_index and fire_at should be jittered.
        """
        if job.jitter is not None:
            offset = random.uniform(0, job.jitter)
            jittered_time = job.next_run.add(seconds=offset)
            job.fire_at = jittered_time
            job.sort_index = (jittered_time.timestamp_nanos(), id(job))
            self.logger.debug(
                "Applied jitter offset=%.3fs to job %s: next_run=%s → fire_at=%s",
                offset,
                job,
                job.next_run,
                job.fire_at,
            )

    async def _remove_job(self, job: "ScheduledJob") -> None:
        """Remove a specific job via the async path (acquires lock) and wake the scheduler.

        Used by the serve loop for job exhaustion and trigger errors in reschedule_job.
        The callback is fired unconditionally because the serve loop has already
        popped the job from the queue before reschedule_job calls _remove_job —
        remove_job would return False and the callback would silently drop.

        Note: for cancel-initiated removal, use ``dequeue_job`` (synchronous path)
        instead. Both paths fire ``_fire_removal_callbacks`` unconditionally.
        """

        removed = await self._job_queue.remove_job(job)

        if removed:
            self.kick()

        self._fire_removal_callbacks([job])

    async def sleep(self, next_run_time: ZonedDateTime | None = None):
        """Sleep until the next job is due or a kick is received.

        This method will wait for the next job to be due or until a kick is received.
        If a kick is received, it will wake up immediately.

        Args:
            next_run_time: Pre-fetched next run time to avoid an extra lock acquisition.
                If None, uses the default delay.
        """
        try:
            timeout = self._calculate_sleep_time(next_run_time).in_seconds()
            await asyncio.wait_for(self._wakeup_event.wait(), timeout=timeout)
            self.logger.debug("Scheduler woke up due to kick")
        except asyncio.CancelledError:
            self.logger.debug("Scheduler sleep cancelled")
            raise
        except TimeoutError:
            self.logger.debug("Scheduler woke up due to timeout")
        finally:
            self._wakeup_event.clear()

    def _calculate_sleep_time(self, next_run_time: ZonedDateTime | None) -> TimeDelta:
        """Calculate the time to sleep until the next job is due.

        Args:
            next_run_time: The next scheduled run time, or None if no jobs are queued.
        """
        if next_run_time is not None:
            self.logger.debug("Next job scheduled at %s", next_run_time)
            delay = max((next_run_time - date_utils.now()).in_seconds(), self.min_delay)
        else:
            delay = self.default_delay

        delay = min(delay, self.max_delay)
        self.logger.debug("Scheduler sleeping for %s seconds", delay)

        return TimeDelta(seconds=delay)

    async def add_job(self, job: "ScheduledJob") -> None:
        """Register the job in DB, then push it to the queue.

        DB registration is awaited inline — job.db_id is set before the job
        is enqueued to the scheduler heap. This eliminates the window where a
        job fires with db_id=None.

        Trigger type dispatch uses the TriggerProtocol methods exclusively.
        Non-protocol triggers are rejected synchronously by ``Scheduler.schedule()``
        before reaching this path.
        """
        source_location = job.source_location
        registration_source: str | None = job.registration_source or None
        trigger = job.trigger
        if trigger is not None:
            trigger_type: str | None = trigger.trigger_db_type()
            trigger_label: str = trigger.trigger_label()
            trigger_detail: str | None = trigger.trigger_detail()
        else:
            trigger_type = None
            trigger_label = ""
            trigger_detail = None
        reg = ScheduledJobRegistration(
            app_key=job.app_key,
            instance_index=job.instance_index,
            job_name=job.name,
            handler_method=getattr(job.job, "__qualname__", str(job.job)),
            trigger_type=trigger_type,
            trigger_label=trigger_label,
            trigger_detail=trigger_detail,
            args_json=safe_json_serialize(list(job.args)),
            kwargs_json=safe_json_serialize(job.kwargs),
            source_location=source_location,
            registration_source=registration_source,
            source_tier=job.source_tier,
            group=job.group,
            name_auto=job.name_auto,
        )
        job.mark_registered(await self._executor.register_job(reg))
        await self._enqueue_job(job)

    async def _dispatch_and_log(self, job: "ScheduledJob") -> None:
        """Dispatch a job and log its execution.

        Args:
            job: The job to dispatch.
        """
        if job._dequeued:
            self.logger.debug("Job %s was dequeued (cancelled between heap-pop and dispatch), skipping", job)
            return

        self.logger.debug("Dispatching job: %s", job)

        # Run inline — no extra spawn/yield before execution
        try:
            await self.run_job(job)
        except asyncio.CancelledError:
            self.logger.debug("Dispatch cancelled for job %s", job)
            raise

        # Always reschedule after completion, even if the job failed
        try:
            await self.reschedule_job(job)
        except asyncio.CancelledError:
            self.logger.debug("Reschedule cancelled for job %s", job)
            raise
        except Exception:
            self.logger.exception("Error rescheduling job %s", job)

    async def run_job(self, job: "ScheduledJob") -> None:
        """Run a scheduled job by delegating to the CommandExecutor.

        All jobs go through ``ExecuteJob`` regardless of whether ``db_id`` is set.
        When ``db_id`` is ``None`` (job not yet registered), ``ExecuteJob`` is created
        with ``job_db_id=None`` and the ``CommandExecutor`` records an orphan execution row.

        Args:
            job: The job to run.
        """
        lag = (date_utils.now() - job.fire_at).in_seconds()
        if lag > self.hassette.config.scheduler.behind_schedule_threshold_seconds:
            self.logger.warning("Job %s is behind schedule by %.2fs", job, lag)

        async_fn = self.task_bucket.make_async_adapter(job.job)

        async def _bound_callable() -> None:
            await async_fn(*job.args, **job.kwargs)

        # Resolve effective timeout: timeout_disabled → None; job.timeout → use it;
        # job.timeout is None → config default
        if job.timeout_disabled:
            effective_timeout = None
        elif job.timeout is not None:
            effective_timeout = job.timeout
        else:
            effective_timeout = self.hassette.config.scheduler.job_timeout_seconds

        # Resolve the app-level error handler at dispatch time via the closure set by
        # Scheduler.add_job(). This avoids coupling the dispatch path to Scheduler internals.
        app_level_error_handler = (
            job.app_error_handler_resolver() if job.app_error_handler_resolver is not None else None
        )

        cmd = ExecuteJob(
            job=job,
            callable=_bound_callable,
            job_db_id=job.db_id,
            source_tier=job.source_tier,
            effective_timeout=effective_timeout,
            app_level_error_handler=app_level_error_handler,
        )
        await self._executor.execute(cmd)

    async def reschedule_job(self, job: "ScheduledJob") -> None:
        """Reschedule a job based on its trigger's next_run_time().

        If the trigger raises or returns None, the job is treated as exhausted and removed. If the trigger
        returns a non-future time (delta ≤ 0), a WARNING is logged and the next run
        is advanced by 1 second. Exceptions from next_run_time() are caught, logged,
        and treated as exhaustion — the scheduler must never crash due to a
        misbehaving trigger.

        Args:
            job: The job to reschedule.
        """

        try:
            next_run = job.trigger.next_run_time(job.next_run, date_utils.now()) if job.trigger else None
        except Exception:
            self.logger.exception(
                "reschedule_job: trigger raised for db_id=%s callable=%s trigger=%r",
                job.db_id,
                getattr(job.job, "__qualname__", str(job.job)),
                job.trigger,
            )
            await self._remove_job(job)
            return

        if next_run is None:
            await self._remove_job(job)
            return

        curr_next_run = job.next_run
        job.set_next_run(next_run)
        delta_to_now = (job.next_run - date_utils.now()).in_seconds()
        if delta_to_now <= 0:
            self.logger.warning(
                "Trigger produced non-future next_run (%.3fs in the past), advancing by 1s", -delta_to_now
            )
            job.set_next_run(date_utils.now().add(seconds=1))

        self.logger.debug(
            "Rescheduling repeating job %s from %s to %s",
            job,
            curr_next_run,
            job.next_run,
        )
        await self._enqueue_job(job)

    async def trigger_due_jobs(self) -> int:
        """Fire all jobs due at the current time.

        Snapshots due jobs via a single ``pop_due_and_peek_next(date_utils.now())``
        call, then awaits each ``_dispatch_and_log(job)`` inline (not via
        ``task_bucket.spawn``). Jobs re-enqueued during dispatch (repeating jobs)
        are not included in this invocation — only the initial snapshot is
        processed, preventing infinite loops when the clock is frozen.

        This method bypasses the ``serve()`` loop's timing and wakeup logic.
        Intended for controlled test dispatch via ``AppTestHarness.trigger_due_jobs()``
        or ``HassetteHarness.scheduler_service.trigger_due_jobs()``.

        Returns:
            The number of jobs dispatched.
        """
        current_time = date_utils.now()
        due_jobs, _next_run = await self._job_queue.pop_due_and_peek_next(current_time)

        count = 0
        for job in due_jobs:
            await self._dispatch_and_log(job)
            count += 1

        return count

    async def get_all_jobs(self) -> list["ScheduledJob"]:
        """Return all currently scheduled jobs across all apps."""
        return await self._job_queue.get_all()

    def remove_jobs_by_owner(self, owner: str) -> asyncio.Task[None]:
        """Remove all jobs for a given owner.

        Args:
            owner: The owner of the jobs to remove.
        """

        return self.task_bucket.spawn(self._remove_jobs_by_owner(owner), name="scheduler:remove_jobs_by_owner")

    def dequeue_job(self, job: "ScheduledJob") -> bool:
        """Remove a job from the scheduler synchronously, fire removal callbacks, and kick.

        Calls ``_ScheduledJobQueue.remove_item_sync`` directly (no lock). Fires
        ``_fire_removal_callbacks`` unconditionally — even when the job was not in
        the heap — to prevent dict leaks when the serve loop already popped the job.
        Calls ``kick()`` only when the job was actually removed from the heap.

        Args:
            job: The job to remove.

        Returns:
            True if the job was found and removed from the heap, False otherwise.
        """
        removed = self._job_queue.remove_item_sync(job)
        if removed:
            self.logger.debug("Dequeued job: %s", job)
            self.kick()
        else:
            self.logger.debug("Job not in heap (already popped by serve loop): %s", job)
        # Set _dequeued unconditionally — even when the job was already popped
        # from the heap by the serve loop. This prevents the dispatch race
        # (guard in _dispatch_and_log) and makes cancel idempotent.
        job._dequeued = True
        self._fire_removal_callbacks([job])
        return removed

    async def mark_job_cancelled(self, db_id: int) -> None:
        """Persist durable cancellation state for a job by setting ``cancelled_at`` in the DB.

        Delegates to ``CommandExecutor.mark_job_cancelled``. No-op when ``db_id`` is None.

        Args:
            db_id: The ``id`` of the ``scheduled_jobs`` row to mark as cancelled.
        """
        await self._executor.mark_job_cancelled(db_id)

serve() -> None async

Run the scheduler forever, processing jobs as they become due.

Source code in src/hassette/core/scheduler_service.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
async def serve(self) -> None:
    """Run the scheduler forever, processing jobs as they become due."""

    self.mark_ready(reason="Scheduler started")

    while True:
        if self.shutdown_event.is_set():
            self.mark_not_ready(reason="Hassette is shutting down")
            self.logger.debug("Scheduler exiting")
            return

        due_jobs, next_run_time = await self._job_queue.pop_due_and_peek_next(date_utils.now())

        if due_jobs:
            for job in due_jobs:
                self.task_bucket.spawn(self._dispatch_and_log(job), name="scheduler:dispatch_scheduled_job")

        await self.sleep(next_run_time)

register_removal_callback(owner_id: str, callback: Callable[[ScheduledJob], None]) -> None

Register a callback to be called whenever a job belonging to owner_id is removed.

If a callback is already registered for owner_id, the new callback replaces it. This handles legitimate re-registration during hot-reload cycles where the old Scheduler instance is orphaned without a formal shutdown.

Parameters:

Name Type Description Default
owner_id str

The owner whose job removals should trigger the callback.

required
callback Callable[[ScheduledJob], None]

Called with the removed ScheduledJob as its single argument.

required
Source code in src/hassette/core/scheduler_service.py
122
123
124
125
126
127
128
129
130
131
132
133
def register_removal_callback(self, owner_id: str, callback: Callable[["ScheduledJob"], None]) -> None:
    """Register a callback to be called whenever a job belonging to owner_id is removed.

    If a callback is already registered for owner_id, the new callback replaces it.
    This handles legitimate re-registration during hot-reload cycles where the old
    Scheduler instance is orphaned without a formal shutdown.

    Args:
        owner_id: The owner whose job removals should trigger the callback.
        callback: Called with the removed ScheduledJob as its single argument.
    """
    self._removal_callbacks[owner_id] = callback

deregister_removal_callback(owner_id: str) -> None

Remove the removal callback for owner_id, if any.

No-op when owner_id has no registered callback. Called by Scheduler.on_shutdown so the slot is freed before the Scheduler is re-initialized (e.g. during a hot-reload cycle).

Parameters:

Name Type Description Default
owner_id str

The owner whose callback should be removed.

required
Source code in src/hassette/core/scheduler_service.py
135
136
137
138
139
140
141
142
143
144
145
def deregister_removal_callback(self, owner_id: str) -> None:
    """Remove the removal callback for owner_id, if any.

    No-op when owner_id has no registered callback. Called by
    ``Scheduler.on_shutdown`` so the slot is freed before the Scheduler
    is re-initialized (e.g. during a hot-reload cycle).

    Args:
        owner_id: The owner whose callback should be removed.
    """
    self._removal_callbacks.pop(owner_id, None)

sleep(next_run_time: ZonedDateTime | None = None) async

Sleep until the next job is due or a kick is received.

This method will wait for the next job to be due or until a kick is received. If a kick is received, it will wake up immediately.

Parameters:

Name Type Description Default
next_run_time ZonedDateTime | None

Pre-fetched next run time to avoid an extra lock acquisition. If None, uses the default delay.

None
Source code in src/hassette/core/scheduler_service.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
async def sleep(self, next_run_time: ZonedDateTime | None = None):
    """Sleep until the next job is due or a kick is received.

    This method will wait for the next job to be due or until a kick is received.
    If a kick is received, it will wake up immediately.

    Args:
        next_run_time: Pre-fetched next run time to avoid an extra lock acquisition.
            If None, uses the default delay.
    """
    try:
        timeout = self._calculate_sleep_time(next_run_time).in_seconds()
        await asyncio.wait_for(self._wakeup_event.wait(), timeout=timeout)
        self.logger.debug("Scheduler woke up due to kick")
    except asyncio.CancelledError:
        self.logger.debug("Scheduler sleep cancelled")
        raise
    except TimeoutError:
        self.logger.debug("Scheduler woke up due to timeout")
    finally:
        self._wakeup_event.clear()

add_job(job: ScheduledJob) -> None async

Register the job in DB, then push it to the queue.

DB registration is awaited inline — job.db_id is set before the job is enqueued to the scheduler heap. This eliminates the window where a job fires with db_id=None.

Trigger type dispatch uses the TriggerProtocol methods exclusively. Non-protocol triggers are rejected synchronously by Scheduler.schedule() before reaching this path.

Source code in src/hassette/core/scheduler_service.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
async def add_job(self, job: "ScheduledJob") -> None:
    """Register the job in DB, then push it to the queue.

    DB registration is awaited inline — job.db_id is set before the job
    is enqueued to the scheduler heap. This eliminates the window where a
    job fires with db_id=None.

    Trigger type dispatch uses the TriggerProtocol methods exclusively.
    Non-protocol triggers are rejected synchronously by ``Scheduler.schedule()``
    before reaching this path.
    """
    source_location = job.source_location
    registration_source: str | None = job.registration_source or None
    trigger = job.trigger
    if trigger is not None:
        trigger_type: str | None = trigger.trigger_db_type()
        trigger_label: str = trigger.trigger_label()
        trigger_detail: str | None = trigger.trigger_detail()
    else:
        trigger_type = None
        trigger_label = ""
        trigger_detail = None
    reg = ScheduledJobRegistration(
        app_key=job.app_key,
        instance_index=job.instance_index,
        job_name=job.name,
        handler_method=getattr(job.job, "__qualname__", str(job.job)),
        trigger_type=trigger_type,
        trigger_label=trigger_label,
        trigger_detail=trigger_detail,
        args_json=safe_json_serialize(list(job.args)),
        kwargs_json=safe_json_serialize(job.kwargs),
        source_location=source_location,
        registration_source=registration_source,
        source_tier=job.source_tier,
        group=job.group,
        name_auto=job.name_auto,
    )
    job.mark_registered(await self._executor.register_job(reg))
    await self._enqueue_job(job)

run_job(job: ScheduledJob) -> None async

Run a scheduled job by delegating to the CommandExecutor.

All jobs go through ExecuteJob regardless of whether db_id is set. When db_id is None (job not yet registered), ExecuteJob is created with job_db_id=None and the CommandExecutor records an orphan execution row.

Parameters:

Name Type Description Default
job ScheduledJob

The job to run.

required
Source code in src/hassette/core/scheduler_service.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
async def run_job(self, job: "ScheduledJob") -> None:
    """Run a scheduled job by delegating to the CommandExecutor.

    All jobs go through ``ExecuteJob`` regardless of whether ``db_id`` is set.
    When ``db_id`` is ``None`` (job not yet registered), ``ExecuteJob`` is created
    with ``job_db_id=None`` and the ``CommandExecutor`` records an orphan execution row.

    Args:
        job: The job to run.
    """
    lag = (date_utils.now() - job.fire_at).in_seconds()
    if lag > self.hassette.config.scheduler.behind_schedule_threshold_seconds:
        self.logger.warning("Job %s is behind schedule by %.2fs", job, lag)

    async_fn = self.task_bucket.make_async_adapter(job.job)

    async def _bound_callable() -> None:
        await async_fn(*job.args, **job.kwargs)

    # Resolve effective timeout: timeout_disabled → None; job.timeout → use it;
    # job.timeout is None → config default
    if job.timeout_disabled:
        effective_timeout = None
    elif job.timeout is not None:
        effective_timeout = job.timeout
    else:
        effective_timeout = self.hassette.config.scheduler.job_timeout_seconds

    # Resolve the app-level error handler at dispatch time via the closure set by
    # Scheduler.add_job(). This avoids coupling the dispatch path to Scheduler internals.
    app_level_error_handler = (
        job.app_error_handler_resolver() if job.app_error_handler_resolver is not None else None
    )

    cmd = ExecuteJob(
        job=job,
        callable=_bound_callable,
        job_db_id=job.db_id,
        source_tier=job.source_tier,
        effective_timeout=effective_timeout,
        app_level_error_handler=app_level_error_handler,
    )
    await self._executor.execute(cmd)

reschedule_job(job: ScheduledJob) -> None async

Reschedule a job based on its trigger's next_run_time().

If the trigger raises or returns None, the job is treated as exhausted and removed. If the trigger returns a non-future time (delta ≤ 0), a WARNING is logged and the next run is advanced by 1 second. Exceptions from next_run_time() are caught, logged, and treated as exhaustion — the scheduler must never crash due to a misbehaving trigger.

Parameters:

Name Type Description Default
job ScheduledJob

The job to reschedule.

required
Source code in src/hassette/core/scheduler_service.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
async def reschedule_job(self, job: "ScheduledJob") -> None:
    """Reschedule a job based on its trigger's next_run_time().

    If the trigger raises or returns None, the job is treated as exhausted and removed. If the trigger
    returns a non-future time (delta ≤ 0), a WARNING is logged and the next run
    is advanced by 1 second. Exceptions from next_run_time() are caught, logged,
    and treated as exhaustion — the scheduler must never crash due to a
    misbehaving trigger.

    Args:
        job: The job to reschedule.
    """

    try:
        next_run = job.trigger.next_run_time(job.next_run, date_utils.now()) if job.trigger else None
    except Exception:
        self.logger.exception(
            "reschedule_job: trigger raised for db_id=%s callable=%s trigger=%r",
            job.db_id,
            getattr(job.job, "__qualname__", str(job.job)),
            job.trigger,
        )
        await self._remove_job(job)
        return

    if next_run is None:
        await self._remove_job(job)
        return

    curr_next_run = job.next_run
    job.set_next_run(next_run)
    delta_to_now = (job.next_run - date_utils.now()).in_seconds()
    if delta_to_now <= 0:
        self.logger.warning(
            "Trigger produced non-future next_run (%.3fs in the past), advancing by 1s", -delta_to_now
        )
        job.set_next_run(date_utils.now().add(seconds=1))

    self.logger.debug(
        "Rescheduling repeating job %s from %s to %s",
        job,
        curr_next_run,
        job.next_run,
    )
    await self._enqueue_job(job)

trigger_due_jobs() -> int async

Fire all jobs due at the current time.

Snapshots due jobs via a single pop_due_and_peek_next(date_utils.now()) call, then awaits each _dispatch_and_log(job) inline (not via task_bucket.spawn). Jobs re-enqueued during dispatch (repeating jobs) are not included in this invocation — only the initial snapshot is processed, preventing infinite loops when the clock is frozen.

This method bypasses the serve() loop's timing and wakeup logic. Intended for controlled test dispatch via AppTestHarness.trigger_due_jobs() or HassetteHarness.scheduler_service.trigger_due_jobs().

Returns:

Type Description
int

The number of jobs dispatched.

Source code in src/hassette/core/scheduler_service.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
async def trigger_due_jobs(self) -> int:
    """Fire all jobs due at the current time.

    Snapshots due jobs via a single ``pop_due_and_peek_next(date_utils.now())``
    call, then awaits each ``_dispatch_and_log(job)`` inline (not via
    ``task_bucket.spawn``). Jobs re-enqueued during dispatch (repeating jobs)
    are not included in this invocation — only the initial snapshot is
    processed, preventing infinite loops when the clock is frozen.

    This method bypasses the ``serve()`` loop's timing and wakeup logic.
    Intended for controlled test dispatch via ``AppTestHarness.trigger_due_jobs()``
    or ``HassetteHarness.scheduler_service.trigger_due_jobs()``.

    Returns:
        The number of jobs dispatched.
    """
    current_time = date_utils.now()
    due_jobs, _next_run = await self._job_queue.pop_due_and_peek_next(current_time)

    count = 0
    for job in due_jobs:
        await self._dispatch_and_log(job)
        count += 1

    return count

get_all_jobs() -> list[ScheduledJob] async

Return all currently scheduled jobs across all apps.

Source code in src/hassette/core/scheduler_service.py
422
423
424
async def get_all_jobs(self) -> list["ScheduledJob"]:
    """Return all currently scheduled jobs across all apps."""
    return await self._job_queue.get_all()

remove_jobs_by_owner(owner: str) -> asyncio.Task[None]

Remove all jobs for a given owner.

Parameters:

Name Type Description Default
owner str

The owner of the jobs to remove.

required
Source code in src/hassette/core/scheduler_service.py
426
427
428
429
430
431
432
433
def remove_jobs_by_owner(self, owner: str) -> asyncio.Task[None]:
    """Remove all jobs for a given owner.

    Args:
        owner: The owner of the jobs to remove.
    """

    return self.task_bucket.spawn(self._remove_jobs_by_owner(owner), name="scheduler:remove_jobs_by_owner")

dequeue_job(job: ScheduledJob) -> bool

Remove a job from the scheduler synchronously, fire removal callbacks, and kick.

Calls _ScheduledJobQueue.remove_item_sync directly (no lock). Fires _fire_removal_callbacks unconditionally — even when the job was not in the heap — to prevent dict leaks when the serve loop already popped the job. Calls kick() only when the job was actually removed from the heap.

Parameters:

Name Type Description Default
job ScheduledJob

The job to remove.

required

Returns:

Type Description
bool

True if the job was found and removed from the heap, False otherwise.

Source code in src/hassette/core/scheduler_service.py
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
def dequeue_job(self, job: "ScheduledJob") -> bool:
    """Remove a job from the scheduler synchronously, fire removal callbacks, and kick.

    Calls ``_ScheduledJobQueue.remove_item_sync`` directly (no lock). Fires
    ``_fire_removal_callbacks`` unconditionally — even when the job was not in
    the heap — to prevent dict leaks when the serve loop already popped the job.
    Calls ``kick()`` only when the job was actually removed from the heap.

    Args:
        job: The job to remove.

    Returns:
        True if the job was found and removed from the heap, False otherwise.
    """
    removed = self._job_queue.remove_item_sync(job)
    if removed:
        self.logger.debug("Dequeued job: %s", job)
        self.kick()
    else:
        self.logger.debug("Job not in heap (already popped by serve loop): %s", job)
    # Set _dequeued unconditionally — even when the job was already popped
    # from the heap by the serve loop. This prevents the dispatch race
    # (guard in _dispatch_and_log) and makes cancel idempotent.
    job._dequeued = True
    self._fire_removal_callbacks([job])
    return removed

mark_job_cancelled(db_id: int) -> None async

Persist durable cancellation state for a job by setting cancelled_at in the DB.

Delegates to CommandExecutor.mark_job_cancelled. No-op when db_id is None.

Parameters:

Name Type Description Default
db_id int

The id of the scheduled_jobs row to mark as cancelled.

required
Source code in src/hassette/core/scheduler_service.py
462
463
464
465
466
467
468
469
470
async def mark_job_cancelled(self, db_id: int) -> None:
    """Persist durable cancellation state for a job by setting ``cancelled_at`` in the DB.

    Delegates to ``CommandExecutor.mark_job_cancelled``. No-op when ``db_id`` is None.

    Args:
        db_id: The ``id`` of the ``scheduled_jobs`` row to mark as cancelled.
    """
    await self._executor.mark_job_cancelled(db_id)

HeapQueue dataclass

Bases: Generic[T]

Source code in src/hassette/core/scheduler_service.py
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
@dataclass
class HeapQueue(Generic[T]):
    _queue: list[T] = field(default_factory=list)

    def __iter__(self) -> Iterator[T]:
        """Iterate over all items in the queue (unordered)."""
        return iter(self._queue)

    def __len__(self) -> int:
        return len(self._queue)

    def push(self, job: T):
        """Push a job onto the queue."""
        heapq.heappush(self._queue, job)  # pyright: ignore[reportArgumentType]

    def pop(self) -> T:
        """Pop the next job from the queue."""
        return heapq.heappop(self._queue)  # pyright: ignore[reportArgumentType]

    def peek(self) -> T | None:
        """Peek at the next job without removing it.

        Returns:
            T | None: The next job in the queue, or None if the queue is empty"""
        return self._queue[0] if self._queue else None

    def is_empty(self) -> bool:
        """Check if the queue is empty."""
        return not self._queue

    def remove_where(self, predicate: Callable[[T], bool]) -> list[T]:
        """Remove all items matching the predicate, returning the removed items."""

        if not self._queue:
            return []

        remaining: list[T] = []
        removed: list[T] = []
        for item in self._queue:
            if predicate(item):
                removed.append(item)
            else:
                remaining.append(item)

        if removed:
            self._queue = remaining
            heapq.heapify(self._queue)  # pyright: ignore[reportArgumentType]

        return removed

    def remove_item(self, item: T) -> bool:
        """Remove a specific item from the queue if present."""

        if item not in self._queue:
            return False

        self._queue.remove(item)
        heapq.heapify(self._queue)  # pyright: ignore[reportArgumentType]
        return True

__iter__() -> Iterator[T]

Iterate over all items in the queue (unordered).

Source code in src/hassette/core/scheduler_service.py
638
639
640
def __iter__(self) -> Iterator[T]:
    """Iterate over all items in the queue (unordered)."""
    return iter(self._queue)

push(job: T)

Push a job onto the queue.

Source code in src/hassette/core/scheduler_service.py
645
646
647
def push(self, job: T):
    """Push a job onto the queue."""
    heapq.heappush(self._queue, job)  # pyright: ignore[reportArgumentType]

pop() -> T

Pop the next job from the queue.

Source code in src/hassette/core/scheduler_service.py
649
650
651
def pop(self) -> T:
    """Pop the next job from the queue."""
    return heapq.heappop(self._queue)  # pyright: ignore[reportArgumentType]

peek() -> T | None

Peek at the next job without removing it.

Returns:

Type Description
T | None

T | None: The next job in the queue, or None if the queue is empty

Source code in src/hassette/core/scheduler_service.py
653
654
655
656
657
658
def peek(self) -> T | None:
    """Peek at the next job without removing it.

    Returns:
        T | None: The next job in the queue, or None if the queue is empty"""
    return self._queue[0] if self._queue else None

is_empty() -> bool

Check if the queue is empty.

Source code in src/hassette/core/scheduler_service.py
660
661
662
def is_empty(self) -> bool:
    """Check if the queue is empty."""
    return not self._queue

remove_where(predicate: Callable[[T], bool]) -> list[T]

Remove all items matching the predicate, returning the removed items.

Source code in src/hassette/core/scheduler_service.py
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
def remove_where(self, predicate: Callable[[T], bool]) -> list[T]:
    """Remove all items matching the predicate, returning the removed items."""

    if not self._queue:
        return []

    remaining: list[T] = []
    removed: list[T] = []
    for item in self._queue:
        if predicate(item):
            removed.append(item)
        else:
            remaining.append(item)

    if removed:
        self._queue = remaining
        heapq.heapify(self._queue)  # pyright: ignore[reportArgumentType]

    return removed

remove_item(item: T) -> bool

Remove a specific item from the queue if present.

Source code in src/hassette/core/scheduler_service.py
684
685
686
687
688
689
690
691
692
def remove_item(self, item: T) -> bool:
    """Remove a specific item from the queue if present."""

    if item not in self._queue:
        return False

    self._queue.remove(item)
    heapq.heapify(self._queue)  # pyright: ignore[reportArgumentType]
    return True