Skip to content

Listeners

ListenerIdentity dataclass

Groups ownership and telemetry fields that identify who registered a listener and where it came from.

Source code in src/hassette/bus/listeners.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@dataclass(slots=True)
class ListenerIdentity:
    """Groups ownership and telemetry fields that identify who registered a listener and where it came from.

    ``owner_id``, ``handler_name`` and ``handler_short_name`` are required;
    every other field has a default. The class only stores data: it defines
    no ``__post_init__`` and performs no validation of its own.
    """

    # --- required fields ---
    owner_id: str
    """Unique string identifier for the owner of the listener."""

    handler_name: str
    """Human-readable fully-qualified name for the handler, computed once at creation time."""

    handler_short_name: str
    """Short (last-segment) name for the handler, computed once at creation time."""

    # --- optional fields (all defaulted) ---
    app_key: str = ""
    """Configuration-level app key for DB registration (e.g., 'my_app'). Empty for non-App owners."""

    instance_index: int = 0
    """App instance index for DB registration. 0 for non-App owners."""

    name: str | None = None
    """Optional stable name for the listener (the name= escape hatch on Bus.on())."""

    source_tier: SourceTier = "app"
    """Whether this listener originates from a user app or the framework itself."""

    source_location: str = ""
    """Captured source location (file:line) of the user code that registered this listener."""

    registration_source: str = ""
    """Captured source code snippet of the registration call."""

owner_id: str instance-attribute

Unique string identifier for the owner of the listener.

handler_name: str instance-attribute

Human-readable fully-qualified name for the handler, computed once at creation time.

handler_short_name: str instance-attribute

Short (last-segment) name for the handler, computed once at creation time.

app_key: str = '' class-attribute instance-attribute

Configuration-level app key for DB registration (e.g., 'my_app'). Empty for non-App owners.

instance_index: int = 0 class-attribute instance-attribute

App instance index for DB registration. 0 for non-App owners.

name: str | None = None class-attribute instance-attribute

Optional stable name for the listener (the name= escape hatch on Bus.on()).

source_tier: SourceTier = 'app' class-attribute instance-attribute

Whether this listener originates from a user app or the framework itself.

source_location: str = '' class-attribute instance-attribute

Captured source location (file:line) of the user code that registered this listener.

registration_source: str = '' class-attribute instance-attribute

Captured source code snippet of the registration call.

ListenerOptions dataclass

Behavioral timing parameters (once, debounce, throttle, timeout, priority) with validation.

Source code in src/hassette/bus/listeners.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@dataclass(slots=True)
class ListenerOptions:
    """Behavioral timing parameters (once, debounce, throttle, timeout, priority) with validation."""

    once: bool = False
    """Whether the listener should be removed after one invocation."""

    debounce: float | None = None
    """Debounce delay in seconds. Events reset the timer; handler fires after the quiet period."""

    throttle: float | None = None
    """Throttle interval in seconds. At most one handler execution per window; extras are dropped."""

    timeout: float | None = None
    """Per-listener timeout in seconds. Overrides the global event_handler_timeout_seconds config.
    None means fall through to the config default."""

    timeout_disabled: bool = False
    """When True, disables timeout enforcement for this listener regardless of config."""

    priority: int = 0
    """Priority for listener ordering. Higher values run first. Default is 0 for app handlers."""

    def __post_init__(self) -> None:
        if self.debounce is not None and self.debounce <= 0:
            raise ValueError("'debounce' must be a positive number")
        if self.throttle is not None and self.throttle <= 0:
            raise ValueError("'throttle' must be a positive number")
        if self.debounce is not None and self.throttle is not None:
            raise ValueError("Cannot specify both 'debounce' and 'throttle' parameters")
        if self.once and (self.debounce is not None or self.throttle is not None):
            raise ValueError("Cannot combine 'once=True' with 'debounce' or 'throttle'")
        if self.timeout is not None and (isinstance(self.timeout, bool) or self.timeout <= 0):
            raise ValueError("timeout must be a positive number")
        if self.timeout_disabled and self.timeout is not None:
            raise ValueError("Cannot specify both 'timeout' and 'timeout_disabled=True'")

once: bool = False class-attribute instance-attribute

Whether the listener should be removed after one invocation.

debounce: float | None = None class-attribute instance-attribute

Debounce delay in seconds. Events reset the timer; handler fires after the quiet period.

throttle: float | None = None class-attribute instance-attribute

Throttle interval in seconds. At most one handler execution per window; extras are dropped.

timeout: float | None = None class-attribute instance-attribute

Per-listener timeout in seconds. Overrides the global event_handler_timeout_seconds config. None means fall through to the config default.

timeout_disabled: bool = False class-attribute instance-attribute

When True, disables timeout enforcement for this listener regardless of config.

priority: int = 0 class-attribute instance-attribute

Priority for listener ordering. Higher values run first. Default is 0 for app handlers.

HandlerInvoker dataclass

Owns handler invocation, async wrapping, parameter injection, rate limiting, and the once-guard.

Source code in src/hassette/bus/listeners.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
@dataclass(slots=True)
class HandlerInvoker:
    """Owns handler invocation, async wrapping, parameter injection, rate limiting, and the once-guard."""

    orig_handler: "HandlerType"
    """Original handler function provided by the user."""

    async_handler: "AsyncHandlerType"
    """Async-wrapped handler function."""

    injector: ParameterInjector
    """Parameter injector for dependency injection."""

    kwargs: Mapping[str, Any] | None
    """Keyword arguments to pass to the handler."""

    error_handler: "BusErrorHandlerType | None"
    """Optional per-listener error handler."""

    app_error_handler_resolver: "Callable[[], BusErrorHandlerType | None] | None"
    """Closure that resolves the app-level error handler at dispatch time."""

    rate_limiter: RateLimiter | None
    """Rate limiter for debounce/throttle. None when no rate limiting is configured."""

    once: bool = False
    """Whether this invoker fires only once. Intentional copy of ListenerOptions.once —
    dispatch() needs this but cannot back-reference options without a circular dependency."""

    fired: bool = field(default=False, init=False)
    """Guard for once=True: set before the first invocation to prevent double-fire."""

    @classmethod
    def create(
        cls,
        task_bucket: "TaskBucket",
        handler: "HandlerType",
        kwargs: Mapping[str, Any] | None,
        options: ListenerOptions,
        error_handler: "BusErrorHandlerType | None" = None,
        app_error_handler_resolver: "Callable[[], BusErrorHandlerType | None] | None" = None,
    ) -> "HandlerInvoker":
        """Construct a HandlerInvoker from a handler and options.

        Builds the async wrapper, injector, and rate limiter. Copies options.once.

        Args:
            task_bucket: TaskBucket for async adapter and rate limiter.
            handler: The user-supplied handler callable.
            kwargs: Optional keyword arguments to pass to the handler.
            options: Behavioral options (once, debounce, throttle).
            error_handler: Optional per-listener error handler.
            app_error_handler_resolver: Closure for app-level error handler resolution.
        """
        handler_name = callable_name(handler)
        signature = get_typed_signature(handler)
        async_handler = make_async_handler(handler, task_bucket)
        injector = ParameterInjector(handler_name, signature)

        rate_limiter: RateLimiter | None = None
        if options.debounce is not None or options.throttle is not None:
            rate_limiter = RateLimiter(
                task_bucket=task_bucket,
                debounce=options.debounce,
                throttle=options.throttle,
                handler_name=handler_name,
            )

        return cls(
            orig_handler=handler,
            async_handler=async_handler,
            injector=injector,
            kwargs=kwargs,
            error_handler=error_handler,
            app_error_handler_resolver=app_error_handler_resolver,
            rate_limiter=rate_limiter,
            once=options.once,
        )

    def mark_fired(self) -> None:
        """Mark this once-invoker as having fired. Called by dispatch() and Listener.cancel()."""
        self.fired = True

    def set_app_error_handler_resolver(self, resolver: "Callable[[], BusErrorHandlerType | None]") -> None:
        """Set the closure that resolves the app-level error handler at dispatch time."""
        self.app_error_handler_resolver = resolver

    async def dispatch(self, invoke_fn: Callable[[], Awaitable[None]]) -> None:
        """Apply rate limiting around the given invoke function.

        BusService builds the invoke function (internal error-catching or tracked
        telemetry), HandlerInvoker wraps it with rate limiting. BusService never
        touches the RateLimiter directly.

        Includes once-guard: if ``once=True`` and the invoker has already fired,
        this method returns immediately. Safe without a lock — no ``await`` between
        check-and-set.
        """
        if self.once and self.fired:
            return
        if self.once:
            self.mark_fired()

        if self.rate_limiter:
            await self.rate_limiter.call(invoke_fn)
        else:
            await invoke_fn()

    def cancel(self) -> None:
        """Cancel any pending rate-limiter tasks."""
        if self.rate_limiter:
            self.rate_limiter.cancel()

    async def invoke(self, event: "Event[Any]") -> None:
        """Invoke the handler with dependency injection."""
        kwargs = self.injector.inject_parameters(event, **(self.kwargs or {}))
        await self.async_handler(**kwargs)

orig_handler: HandlerType instance-attribute

Original handler function provided by the user.

async_handler: AsyncHandlerType instance-attribute

Async-wrapped handler function.

injector: ParameterInjector instance-attribute

Parameter injector for dependency injection.

kwargs: Mapping[str, Any] | None instance-attribute

Keyword arguments to pass to the handler.

error_handler: BusErrorHandlerType | None instance-attribute

Optional per-listener error handler.

app_error_handler_resolver: Callable[[], BusErrorHandlerType | None] | None instance-attribute

Closure that resolves the app-level error handler at dispatch time.

rate_limiter: RateLimiter | None instance-attribute

Rate limiter for debounce/throttle. None when no rate limiting is configured.

once: bool = False class-attribute instance-attribute

Whether this invoker fires only once. Intentional copy of ListenerOptions.once — dispatch() needs this but cannot back-reference options without a circular dependency.

fired: bool = field(default=False, init=False) class-attribute instance-attribute

Guard for once=True: set before the first invocation to prevent double-fire.

create(task_bucket: TaskBucket, handler: HandlerType, kwargs: Mapping[str, Any] | None, options: ListenerOptions, error_handler: BusErrorHandlerType | None = None, app_error_handler_resolver: Callable[[], BusErrorHandlerType | None] | None = None) -> HandlerInvoker classmethod

Construct a HandlerInvoker from a handler and options.

Builds the async wrapper, injector, and rate limiter. Copies options.once.

Parameters:

Name Type Description Default
task_bucket TaskBucket

TaskBucket for async adapter and rate limiter.

required
handler HandlerType

The user-supplied handler callable.

required
kwargs Mapping[str, Any] | None

Optional keyword arguments to pass to the handler.

required
options ListenerOptions

Behavioral options (once, debounce, throttle).

required
error_handler BusErrorHandlerType | None

Optional per-listener error handler.

None
app_error_handler_resolver Callable[[], BusErrorHandlerType | None] | None

Closure for app-level error handler resolution.

None
Source code in src/hassette/bus/listeners.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
@classmethod
def create(
    cls,
    task_bucket: "TaskBucket",
    handler: "HandlerType",
    kwargs: Mapping[str, Any] | None,
    options: ListenerOptions,
    error_handler: "BusErrorHandlerType | None" = None,
    app_error_handler_resolver: "Callable[[], BusErrorHandlerType | None] | None" = None,
) -> "HandlerInvoker":
    """Build a HandlerInvoker for ``handler`` according to ``options``.

    Creates the async wrapper and the parameter injector, adds a rate limiter
    only when debounce or throttle is set, and copies ``options.once`` onto
    the new instance.

    Args:
        task_bucket: TaskBucket handed to the async adapter and the rate limiter.
        handler: The user-supplied handler callable.
        kwargs: Optional keyword arguments to pass to the handler.
        options: Behavioral options (once, debounce, throttle).
        error_handler: Optional per-listener error handler.
        app_error_handler_resolver: Closure for app-level error handler resolution.
    """
    name = callable_name(handler)
    typed_signature = get_typed_signature(handler)
    wrapped = make_async_handler(handler, task_bucket)
    param_injector = ParameterInjector(name, typed_signature)

    wants_limiter = not (options.debounce is None and options.throttle is None)
    limiter: RateLimiter | None = (
        RateLimiter(
            task_bucket=task_bucket,
            debounce=options.debounce,
            throttle=options.throttle,
            handler_name=name,
        )
        if wants_limiter
        else None
    )

    return cls(
        orig_handler=handler,
        async_handler=wrapped,
        injector=param_injector,
        kwargs=kwargs,
        error_handler=error_handler,
        app_error_handler_resolver=app_error_handler_resolver,
        rate_limiter=limiter,
        once=options.once,
    )

mark_fired() -> None

Mark this once-invoker as having fired. Called by dispatch() and Listener.cancel().

Source code in src/hassette/bus/listeners.py
184
185
186
def mark_fired(self) -> None:
    """Mark this once-invoker as having fired. Called by dispatch() and Listener.cancel().

    Only sets ``fired`` to True; this method never resets the flag.
    """
    self.fired = True

set_app_error_handler_resolver(resolver: Callable[[], BusErrorHandlerType | None]) -> None

Set the closure that resolves the app-level error handler at dispatch time.

Source code in src/hassette/bus/listeners.py
188
189
190
def set_app_error_handler_resolver(self, resolver: "Callable[[], BusErrorHandlerType | None]") -> None:
    """Set the closure that resolves the app-level error handler at dispatch time.

    Args:
        resolver: Zero-argument callable returning the app-level error handler
            or None. It replaces any previously stored resolver.
    """
    self.app_error_handler_resolver = resolver

dispatch(invoke_fn: Callable[[], Awaitable[None]]) -> None async

Apply rate limiting around the given invoke function.

BusService builds the invoke function (internal error-catching or tracked telemetry), HandlerInvoker wraps it with rate limiting. BusService never touches the RateLimiter directly.

Includes once-guard: if once=True and the invoker has already fired, this method returns immediately. Safe without a lock — no await between check-and-set.

Source code in src/hassette/bus/listeners.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
async def dispatch(self, invoke_fn: Callable[[], Awaitable[None]]) -> None:
    """Await ``invoke_fn``, applying the once-guard and any rate limiter.

    The caller (BusService) builds ``invoke_fn``; this method only decides
    whether and how it runs, so the caller never touches the RateLimiter.

    Once-guard: with ``once=True`` the first call raises ``fired`` and
    proceeds, and every later call returns immediately. The flag is read and
    set with no ``await`` in between, so no lock is needed. The flag is not
    consulted when ``once`` is False.
    """
    if self.once:
        if self.fired:
            return
        self.mark_fired()

    limiter = self.rate_limiter
    if not limiter:
        await invoke_fn()
        return
    await limiter.call(invoke_fn)

cancel() -> None

Cancel any pending rate-limiter tasks.

Source code in src/hassette/bus/listeners.py
213
214
215
216
def cancel(self) -> None:
    """Cancel pending rate-limiter work; does nothing when there is no rate limiter."""
    limiter = self.rate_limiter
    if not limiter:
        return
    limiter.cancel()

invoke(event: Event[Any]) -> None async

Invoke the handler with dependency injection.

Source code in src/hassette/bus/listeners.py
218
219
220
221
async def invoke(self, event: "Event[Any]") -> None:
    """Resolve the handler's arguments for ``event`` and await the async handler.

    The stored ``kwargs`` (or an empty mapping when they are None/empty) are
    passed to the injector as extra keyword arguments.
    """
    extras = self.kwargs or {}
    resolved = self.injector.inject_parameters(event, **extras)
    await self.async_handler(**resolved)

DurationConfig dataclass

Groups duration-hold configuration fields and owns the timer lifecycle; timer is attached via attach_timer().

Source code in src/hassette/bus/listeners.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
@dataclass(slots=True)
class DurationConfig:
    """Groups duration-hold configuration fields and owns the timer lifecycle; timer is attached via attach_timer()."""

    entity_id: str
    """Entity ID this duration listener is tracking. Required — non-empty."""

    duration: float | None = None
    """Duration in seconds the entity must remain in the matching state before the handler fires.
    None for immediate-only or entity_id-only listeners."""

    immediate: bool = False
    """If True, fire the handler immediately with the current entity state on registration."""

    is_attribute_listener: bool = False
    """True when this listener was registered via on_attribute_change."""

    hold_predicate: "Predicate | None" = None
    """State-value predicates only (excludes transition predicates like StateFrom, StateDidChange).
    Used by DurationTimer for cancel evaluation and fire-time recheck. None when not set."""

    _timer: "DurationTimer | None" = field(default=None, init=False)
    """Duration timer. Attached via attach_timer() during BusService registration."""

    def __post_init__(self) -> None:
        if not self.entity_id:
            raise ValueError("'entity_id' must be a non-empty string")
        if self.duration is not None and self.duration <= 0:
            raise ValueError("'duration' must be a positive number")

    @property
    def timer(self) -> "DurationTimer":
        """Return the attached DurationTimer. Asserts it has been attached."""
        assert self._timer is not None, "timer not yet attached — call attach_timer() first"
        return self._timer

    def cancel_timer(self) -> None:
        """Cancel the attached duration timer if present."""
        if self._timer is not None:
            self._timer.cancel()

    def attach_timer(
        self,
        task_bucket: "TaskBucket",
        owner_id: str,
        create_cancel_sub: "Callable[[], Subscription]",
        on_cancel: Callable[[], None] | None = None,
    ) -> None:
        """Construct a DurationTimer and store it.

        BusService calls this method during registration, passing the
        cancel-subscription factory and on_cancel callback. Counter
        ownership stays in BusService.
        """
        assert self._timer is None, "timer already attached — call cancel() before re-attaching"
        assert self.duration is not None, "attach_timer() requires a non-None duration"
        self._timer = DurationTimer(
            task_bucket=task_bucket,
            duration=self.duration,
            predicates=self.hold_predicate,
            entity_id=self.entity_id,
            owner_id=owner_id,
            create_cancel_sub=create_cancel_sub,
            on_cancel=on_cancel,
        )

entity_id: str instance-attribute

Entity ID this duration listener is tracking. Required — non-empty.

duration: float | None = None class-attribute instance-attribute

Duration in seconds the entity must remain in the matching state before the handler fires. None for immediate-only or entity_id-only listeners.

immediate: bool = False class-attribute instance-attribute

If True, fire the handler immediately with the current entity state on registration.

is_attribute_listener: bool = False class-attribute instance-attribute

True when this listener was registered via on_attribute_change.

hold_predicate: Predicate | None = None class-attribute instance-attribute

State-value predicates only (excludes transition predicates like StateFrom, StateDidChange). Used by DurationTimer for cancel evaluation and fire-time recheck. None when not set.

timer: DurationTimer property

Return the attached DurationTimer. Asserts it has been attached.

cancel_timer() -> None

Cancel the attached duration timer if present.

Source code in src/hassette/bus/listeners.py
260
261
262
263
def cancel_timer(self) -> None:
    """Cancel the duration timer when one is attached; otherwise do nothing.

    The timer reference itself is left in place after cancelling.
    """
    timer = self._timer
    if timer is None:
        return
    timer.cancel()

attach_timer(task_bucket: TaskBucket, owner_id: str, create_cancel_sub: Callable[[], Subscription], on_cancel: Callable[[], None] | None = None) -> None

Construct a DurationTimer and store it.

BusService calls this method during registration, passing the cancel-subscription factory and on_cancel callback. Counter ownership stays in BusService.

Source code in src/hassette/bus/listeners.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def attach_timer(
    self,
    task_bucket: "TaskBucket",
    owner_id: str,
    create_cancel_sub: "Callable[[], Subscription]",
    on_cancel: Callable[[], None] | None = None,
) -> None:
    """Construct a DurationTimer and store it.

    BusService calls this method during registration, passing the
    cancel-subscription factory and on_cancel callback. Counter
    ownership stays in BusService.
    """
    assert self._timer is None, "timer already attached — call cancel() before re-attaching"
    assert self.duration is not None, "attach_timer() requires a non-None duration"
    self._timer = DurationTimer(
        task_bucket=task_bucket,
        duration=self.duration,
        predicates=self.hold_predicate,
        entity_id=self.entity_id,
        owner_id=owner_id,
        create_cancel_sub=create_cancel_sub,
        on_cancel=on_cancel,
    )

Listener dataclass

A listener for events with a specific topic and handler.

Composes four focused sub-structs (identity, invoker, options, duration_config) plus routing fields (topic, predicate) and minimal runtime state.

Source code in src/hassette/bus/listeners.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
@dataclass(slots=True)
class Listener:
    """Event listener bound to one topic and one handler.

    Built from four focused sub-structs (identity, invoker, options,
    duration_config), the routing fields (topic, predicate) and a small
    amount of runtime state.
    """

    logger: Logger
    """Logger used by this listener."""

    topic: str
    """Topic this listener subscribes to."""

    predicate: "Predicate | None"
    """Filter applied to events before the handler is invoked; None matches everything."""

    identity: ListenerIdentity
    """Ownership and telemetry identity fields."""

    invoker: HandlerInvoker
    """Handler callable, dispatch engine, and once-guard."""

    options: ListenerOptions
    """Behavioral execution parameters."""

    duration_config: DurationConfig | None
    """Duration-hold configuration and timer. None for non-duration listeners."""

    listener_id: int = field(default_factory=next_id, init=False)
    """Unique identifier for the listener instance."""

    _cancelled: bool = field(default=False, init=False, repr=False)
    """Raised by cancel() so that a pending add_listener task skips route insertion."""

    db_id: int | None = field(default=None, init=False)
    """Database row ID for this listener. Set by the executor after persistence; None until then."""

    @property
    def is_cancelled(self) -> bool:
        """True once cancel() has been called. Read-only — use cancel() to set."""
        return self._cancelled

    def mark_registered(self, db_id: int) -> None:
        """Record the database ID after persistence.

        Only the first call stores a value; a later call logs a warning and
        leaves the stored ID untouched.
        """
        if self.db_id is None:
            self.db_id = db_id
            return
        LOGGER.warning(
            "Listener %s already registered with db_id=%s, ignoring new db_id=%s",
            self.listener_id,
            self.db_id,
            db_id,
        )

    def cancel(self) -> None:
        """Cancel the listener and stop its pending work.

        In order: raises the cancelled flag, calls invoker.mark_fired(),
        cancels the invoker (rate limiter) and, when a duration config exists,
        cancels its timer.

        Terminal operation: the listener must not be reused after this call.
        """
        self._cancelled = True
        invoker = self.invoker
        invoker.mark_fired()
        invoker.cancel()
        duration_config = self.duration_config
        if duration_config is not None:
            duration_config.cancel_timer()

    def matches(self, ev: "Event[Any]") -> bool:
        """Return the predicate's verdict for ``ev``; True when there is no predicate.

        The verdict is logged at debug level whenever a predicate was evaluated.
        """
        predicate = self.predicate
        if predicate is None:
            return True

        matched = predicate(ev)
        if matched:
            verdict = "matched"
        else:
            verdict = "did not match"
        self.logger.debug("Listener %s %s predicate for event: %s", self, verdict, ev)
        return matched

    def __repr__(self) -> str:
        ident = self.identity
        return f"Listener<{ident.owner_id} - {ident.handler_short_name}>"

    @classmethod
    def create(
        cls,
        topic: str,
        identity: ListenerIdentity,
        options: ListenerOptions,
        invoker: HandlerInvoker,
        where: "Predicate | Sequence[Predicate] | None" = None,
        duration_config: DurationConfig | None = None,
        logger: Logger = LOGGER,
    ) -> "Listener":
        """Assemble a Listener from pre-built sub-structs.

        The duration-versus-debounce/throttle check lives here because it
        spans two sub-structs (duration_config and options).

        Raises:
            ValueError: If a duration is combined with debounce or throttle.
        """
        has_duration = duration_config is not None and duration_config.duration is not None
        if has_duration and options.debounce is not None:
            raise ValueError("Cannot combine 'duration' with 'debounce'")
        if has_duration and options.throttle is not None:
            raise ValueError("Cannot combine 'duration' with 'throttle'")

        return cls(
            logger=logger,
            topic=topic,
            predicate=normalize_where(where),
            identity=identity,
            invoker=invoker,
            options=options,
            duration_config=duration_config,
        )

    @classmethod
    def create_cancel_listener(
        cls,
        task_bucket: "TaskBucket",
        owner_id: str,
        topic: str,
        handler: "HandlerType",
        predicate: "Predicate | None" = None,
    ) -> "Listener":
        """Build a framework cancel-listener with default options.

        The resulting listener has source_tier='framework', default
        ListenerOptions (hence no rate limiter), no error handler and no
        duration config.
        """
        default_options = ListenerOptions()
        framework_identity = ListenerIdentity(
            owner_id=owner_id,
            handler_name=callable_name(handler),
            handler_short_name=callable_short_name(handler),
            source_tier="framework",
        )
        plain_invoker = HandlerInvoker.create(
            task_bucket=task_bucket,
            handler=handler,
            kwargs=None,
            options=default_options,
            error_handler=None,
        )

        return cls(
            logger=LOGGER,
            topic=topic,
            predicate=predicate,
            identity=framework_identity,
            invoker=plain_invoker,
            options=default_options,
            duration_config=None,
        )

logger: Logger instance-attribute

Logger for the listener.

topic: str instance-attribute

Topic the listener is subscribed to.

predicate: Predicate | None instance-attribute

Predicate to filter events before invoking the handler.

identity: ListenerIdentity instance-attribute

Ownership and telemetry identity fields.

invoker: HandlerInvoker instance-attribute

Handler callable, dispatch engine, and once-guard.

options: ListenerOptions instance-attribute

Behavioral execution parameters.

duration_config: DurationConfig | None instance-attribute

Duration-hold configuration and timer. None for non-duration listeners.

listener_id: int = field(default_factory=next_id, init=False) class-attribute instance-attribute

Unique identifier for the listener instance.

db_id: int | None = field(default=None, init=False) class-attribute instance-attribute

Database row ID for this listener. Set by the executor after persistence; None until then.

is_cancelled: bool property

Whether this listener has been cancelled. Read-only — use cancel() to set.

mark_registered(db_id: int) -> None

Set the database ID after persistence. One-time assignment by BusService.

Source code in src/hassette/bus/listeners.py
334
335
336
337
338
339
340
341
342
343
344
def mark_registered(self, db_id: int) -> None:
    """Record the database ID after persistence.

    Only the first call stores a value; a later call logs a warning and
    leaves the stored ID untouched.
    """
    if self.db_id is None:
        self.db_id = db_id
        return
    LOGGER.warning(
        "Listener %s already registered with db_id=%s, ignoring new db_id=%s",
        self.listener_id,
        self.db_id,
        db_id,
    )

cancel() -> None

Cancel the listener: set the cancelled flag and stop any pending tasks.

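Sets the _cancelled flag, calls invoker.mark_fired() so that an in-flight dispatch task of a once=True listener skips the handler (HandlerInvoker.dispatch() consults the fired flag only when once=True), and cancels the rate limiter and the duration timer.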

Terminal operation: the listener must not be reused after this call.

Source code in src/hassette/bus/listeners.py
346
347
348
349
350
351
352
353
354
355
356
357
358
def cancel(self) -> None:
    """Cancel this listener and stop its pending work.

    Performed in order:

    1. Set the ``_cancelled`` flag.
    2. Call ``invoker.mark_fired()`` so an in-flight dispatch task does not
       invoke the handler.
    3. Call ``invoker.cancel()``.
    4. Cancel the duration timer, when a duration config is attached.

    This is a terminal operation: the listener must not be reused afterwards.
    """
    self._cancelled = True

    invoker = self.invoker
    invoker.mark_fired()
    invoker.cancel()

    duration_config = self.duration_config
    if duration_config is None:
        # Non-duration listener: there is no timer to stop.
        return
    duration_config.cancel_timer()

matches(ev: Event[Any]) -> bool

Check if the event matches the listener's predicate.

Source code in src/hassette/bus/listeners.py
360
361
362
363
364
365
366
367
368
def matches(self, ev: "Event[Any]") -> bool:
    """Report whether ``ev`` passes this listener's predicate.

    A listener without a predicate accepts every event. Otherwise the
    predicate is called with ``ev``, the verdict is logged at debug level,
    and the predicate's result is returned unchanged.

    Args:
        ev: Event to test against the predicate.

    Returns:
        ``True`` when no predicate is set; otherwise the value the predicate
        returned for ``ev``.
    """
    predicate = self.predicate
    if predicate is None:
        return True

    outcome = predicate(ev)
    if outcome:
        verdict = "matched"
    else:
        verdict = "did not match"
    self.logger.debug("Listener %s %s predicate for event: %s", self, verdict, ev)
    return outcome

create(topic: str, identity: ListenerIdentity, options: ListenerOptions, invoker: HandlerInvoker, where: Predicate | Sequence[Predicate] | None = None, duration_config: DurationConfig | None = None, logger: Logger = LOGGER) -> Listener classmethod

Create a Listener from pre-built sub-structs.

Cross-concern validation (duration cannot be combined with debounce or throttle) runs here since it spans two sub-structs.

Source code in src/hassette/bus/listeners.py
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
@classmethod
def create(
    cls,
    topic: str,
    identity: ListenerIdentity,
    options: ListenerOptions,
    invoker: HandlerInvoker,
    where: "Predicate | Sequence[Predicate] | None" = None,
    duration_config: DurationConfig | None = None,
    logger: Logger = LOGGER,
) -> "Listener":
    """Assemble a Listener from pre-built sub-structs.

    Validation that spans two sub-structs lives here: a duration hold
    (``duration_config.duration`` set) cannot be combined with either
    ``options.debounce`` or ``options.throttle``.

    Args:
        topic: Topic the listener subscribes to.
        identity: Ownership and telemetry identity fields.
        options: Behavioral execution parameters.
        invoker: Handler invoker for the listener.
        where: Predicate, sequence of predicates, or None; passed through
            ``normalize_where`` to produce the listener's predicate.
        duration_config: Duration-hold configuration, or None.
        logger: Logger handed to the new listener.

    Returns:
        The newly constructed listener.

    Raises:
        ValueError: If a duration is configured together with debounce or
            with throttle.
    """
    has_duration = duration_config is not None and duration_config.duration is not None

    # Debounce is checked before throttle so the reported conflict stays stable.
    if has_duration and options.debounce is not None:
        raise ValueError("Cannot combine 'duration' with 'debounce'")
    if has_duration and options.throttle is not None:
        raise ValueError("Cannot combine 'duration' with 'throttle'")

    return cls(
        topic=topic,
        predicate=normalize_where(where),
        identity=identity,
        invoker=invoker,
        options=options,
        duration_config=duration_config,
        logger=logger,
    )

create_cancel_listener(task_bucket: TaskBucket, owner_id: str, topic: str, handler: HandlerType, predicate: Predicate | None = None) -> Listener classmethod

Create a framework cancel-listener with sensible defaults.

Produces a listener with source_tier='framework'. No rate limiter, no error handler, no duration config.

Source code in src/hassette/bus/listeners.py
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
@classmethod
def create_cancel_listener(
    cls,
    task_bucket: "TaskBucket",
    owner_id: str,
    topic: str,
    handler: "HandlerType",
    predicate: "Predicate | None" = None,
) -> "Listener":
    """Build a framework-tier cancel-listener with default settings.

    The listener's identity is tagged ``source_tier="framework"`` and takes
    its handler names from ``handler``. It uses default ``ListenerOptions``,
    no handler kwargs, no error handler and no duration config, and logs
    through the module-level ``LOGGER``.

    Args:
        task_bucket: Task bucket handed to ``HandlerInvoker.create``.
        owner_id: Identifier of the listener's owner.
        topic: Topic the listener subscribes to.
        handler: Callable invoked by the listener.
        predicate: Optional predicate, stored on the listener as given.

    Returns:
        The newly constructed listener.
    """
    framework_identity = ListenerIdentity(
        owner_id=owner_id,
        handler_name=callable_name(handler),
        handler_short_name=callable_short_name(handler),
        source_tier="framework",
    )

    # One options object is shared by the invoker and the listener.
    default_options = ListenerOptions()

    return cls(
        logger=LOGGER,
        topic=topic,
        predicate=predicate,
        identity=framework_identity,
        invoker=HandlerInvoker.create(
            task_bucket=task_bucket,
            handler=handler,
            kwargs=None,
            options=default_options,
            error_handler=None,
        ),
        options=default_options,
        duration_config=None,
    )

Subscription dataclass

A subscription to an event topic with a specific listener key.

This class is used to manage the lifecycle of a listener, allowing it to be cancelled or managed within a context.

Source code in src/hassette/bus/listeners.py
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
@dataclass(slots=True)
class Subscription:
    """A subscription to an event topic with a specific listener key.

    This class is used to manage the lifecycle of a listener, allowing it to be cancelled
    or managed within a context.
    """

    listener: Listener
    """The listener associated with this subscription."""

    unsubscribe: Callable[[], None]
    """Function to call to unsubscribe the listener."""

    def cancel(self) -> None:
        """Cancel the subscription by calling the unsubscribe function."""
        self.unsubscribe()

listener: Listener instance-attribute

The listener associated with this subscription.

unsubscribe: Callable[[], None] instance-attribute

Function to call to unsubscribe the listener.

cancel() -> None

Cancel the subscription by calling the unsubscribe function.

Source code in src/hassette/bus/listeners.py
465
466
467
def cancel(self) -> None:
    """Cancel the subscription by invoking the stored ``unsubscribe`` callable."""
    unsubscribe = self.unsubscribe
    unsubscribe()

make_async_handler(fn: HandlerType, task_bucket: TaskBucket) -> AsyncHandlerType

Wrap a function to ensure it is always called as an async handler.

If the function is already an async function, it will be called directly. If it is a regular function, it will be run in an executor to avoid blocking the event loop.

Parameters:

Name Type Description Default
fn HandlerType

The function to adapt.

required
task_bucket TaskBucket

TaskBucket used to create the async adapter (runs sync handlers in executor).

required

Returns:

Type Description
AsyncHandlerType

An async handler that wraps the original function.

Source code in src/hassette/bus/listeners.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
def make_async_handler(fn: "HandlerType", task_bucket: "TaskBucket") -> "AsyncHandlerType":
    """Adapt a handler so it can always be invoked as an async handler.

    The adaptation is delegated to ``task_bucket.make_async_adapter``. That
    adapter is expected to call async functions directly and to run regular
    functions in an executor so they do not block the event loop.

    Args:
        fn: The handler function to adapt.
        task_bucket: TaskBucket whose ``make_async_adapter`` builds the wrapper.

    Returns:
        The adapter produced for ``fn``, typed as an async handler.
    """
    adapted = task_bucket.make_async_adapter(fn)
    # cast() only informs the type checker; the adapter is returned as-is.
    return cast("AsyncHandlerType", adapted)