Skip to content

State Proxy

StateProxy

Bases: Resource

Source code in src/hassette/core/state_proxy.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
class StateProxy(Resource):
    """In-memory cache of Home Assistant entity states.

    The cache is filled by ``_load_cache()`` (on initialization, on reconnect and,
    unless polling is disabled in config, from a periodic poll job) and kept
    current by ``state_changed`` events. Reads are lock-free; writes to the cache
    are made while holding ``self.lock``.
    """

    depends_on: ClassVar[list[type[Resource]]] = [WebsocketService, ApiResource, BusService, SchedulerService]

    # entity_id -> raw state dict
    states: dict[str, "HassStateDict"]
    # guards every write to ``states``
    lock: FairAsyncRLock
    # serializes on_reconnect() so overlapping reconnects cannot double-subscribe
    _reconnect_lock: asyncio.Lock
    bus: Bus
    scheduler: Scheduler
    # handle of the state_changed subscription, None while unsubscribed
    state_change_sub: "Subscription | None"
    # handle of the periodic _load_cache job, None when not scheduled
    poll_job: "ScheduledJob | None"

    def __init__(self, hassette: "Hassette", *, parent: Resource | None = None) -> None:
        """Create an empty, not-yet-synced proxy with its own Bus and Scheduler children."""
        super().__init__(hassette, parent=parent)
        self.states = {}
        self.lock = FairAsyncRLock()
        self._reconnect_lock = asyncio.Lock()
        self.bus = self.add_child(Bus, priority=100)
        self.scheduler = self.add_child(Scheduler)
        self.state_change_sub = None
        self.poll_job = None

    @property
    def config_log_level(self) -> LOG_LEVEL_TYPE:
        """Log level configured for the state proxy."""
        return self.hassette.config.logging.state_proxy

    async def on_initialize(self) -> None:
        """Initialize the state proxy.

        WebsocketService, ApiResource, BusService, and SchedulerService are guaranteed
        ready by depends_on auto-wait. Subscribes to state change events and to
        WebSocket connect/disconnect events, then performs the initial state sync
        and marks the proxy ready.

        Raises:
            Exception: Whatever the initial state sync raised; it is logged and re-raised.
        """
        self.logger.debug("Dependencies ready, performing initial state sync")

        await self.subscribe_to_events()

        await self.bus.on_websocket_connected(handler=self.on_reconnect, name="hassette.state_proxy.on_reconnect")
        await self.bus.on_websocket_disconnected(handler=self.on_disconnect, name="hassette.state_proxy.on_disconnect")

        # Perform initial state sync
        try:
            await self._load_cache()

            self.mark_ready(reason="Initial state sync complete")

        except Exception as e:
            self.logger.exception("Failed to perform initial state sync: %s", e)
            raise

    async def subscribe_to_events(self) -> None:
        """(Re)create the state_changed subscription and, if enabled, the poll job.

        Any existing subscription or poll job is cancelled first, so calling this
        repeatedly does not accumulate handlers.
        """
        # Cancel existing subscriptions to prevent leaks on rapid reconnect
        if self.state_change_sub is not None:
            self.state_change_sub.cancel()
            self.state_change_sub = None
        if self.poll_job is not None:
            self.scheduler.scheduler_service.dequeue_job(self.poll_job)
            self.poll_job = None

        self.state_change_sub = await self.bus.on(
            topic=Topic.HASS_EVENT_STATE_CHANGED,
            handler=self._on_state_change,
            name="hassette.state_proxy.on_state_change",
        )
        if not self.hassette.config.disable_state_proxy_polling:
            self.poll_job = await self.scheduler.run_every(
                self._load_cache,
                seconds=self.hassette.config.state_proxy_poll_interval_seconds,
                if_exists="skip",
            )
        else:
            self.poll_job = None
            self.logger.warning("State proxy polling is disabled per configuration")

    async def on_shutdown(self) -> None:
        """Shutdown the state proxy and clean up resources."""
        self.logger.debug("Shutting down state proxy")
        self.mark_not_ready(reason="Shutting down")
        # Null out subscription/job references to guard against on_disconnect() race.
        self.poll_job = None
        self.state_change_sub = None

        async with self.lock:
            self.states.clear()

    def num_domain_states(self, domain: str) -> int:
        """Return the number of states for a specific domain.

        Args:
            domain: The domain to filter by (e.g., "light").

        Returns:
            The number of states in the specified domain.

        Raises:
            ResourceNotReadyError: If not ready and cache is empty (cold start).
                When disconnected but cache is populated, stale data is returned.
        """
        return sum(1 for _ in self.yield_domain_states(domain))

    @retry(
        retry=retry_if_exception_type(ResourceNotReadyError),
        stop=stop_after_attempt(MAX_RETRY_ATTEMPTS),
        wait=wait_exponential_jitter(),
        reraise=True,
    )
    def get_state(self, entity_id: str) -> "HassStateDict | None":
        """Get the current state for an entity.

        Args:
            entity_id: The entity ID to look up (e.g., "light.kitchen").

        Returns:
            The cached state dict if found, None otherwise.

        Raises:
            ResourceNotReadyError: If not ready and cache is empty (cold start).
                When disconnected but cache is populated, stale data is returned.
        """

        # Lock-free read is safe because dict assignment is atomic in CPython
        # and we replace whole objects rather than mutating them

        return self.get_state_once(entity_id)

    def get_state_once(self, entity_id: str) -> "HassStateDict | None":
        """Look up an entity's state a single time, without the retry wrapper.

        Args:
            entity_id: The entity ID to look up (e.g., "light.kitchen").

        Returns:
            The cached state dict if found, None otherwise.

        Raises:
            ResourceNotReadyError: If not ready and cache is empty (cold start).
        """
        # Stale reads allowed when cache is populated; only raise during cold start
        if not self.is_ready() and not self.states:
            raise ResourceNotReadyError(f"StateProxy is not ready (reason: {self._ready_reason}).")

        return self.states.get(entity_id)

    def get_domain_states(self, domain: str) -> dict[str, "HassStateDict"]:
        """Get all states for a specific domain.

        Args:
            domain: The domain to filter by (e.g., "light").

        Returns:
            A dictionary of entity_id to state for the specified domain.

        Raises:
            ResourceNotReadyError: If not ready and cache is empty (cold start).
                When disconnected but cache is populated, stale data is returned.
        """

        return {eid: state for eid, state in self.yield_domain_states(domain)}

    @retry(
        retry=retry_if_exception_type(ResourceNotReadyError),
        stop=stop_after_attempt(MAX_RETRY_ATTEMPTS),
        wait=wait_exponential_jitter(),
        reraise=True,
    )
    def yield_domain_states(self, domain: str) -> Generator[tuple[str, "HassStateDict"], Any, None]:
        """Yield all states for a specific domain.

        The readiness check runs when this method is called (and is retried by the
        decorator); the returned generator then walks a snapshot of the cache.

        Args:
            domain: The domain to filter by (e.g., "light").

        Yields:
            Tuples of (entity_id, state) for the specified domain.

        Raises:
            ResourceNotReadyError: If not ready and cache is empty (cold start).
                When disconnected but cache is populated, stale data is returned.
        """
        # This check has to execute inside the @retry wrapper. If this function were
        # itself a generator function, calling it would only build the generator
        # object; the check would run on first iteration, after the wrapper had
        # already returned, and would never be retried.
        if not self.is_ready() and not self.states:
            raise ResourceNotReadyError(f"StateProxy is not ready (reason: {self._ready_reason}).")

        def _matching() -> Generator[tuple[str, "HassStateDict"], Any, None]:
            # Snapshot to avoid RuntimeError if _load_cache() mutates the dict mid-iteration
            for eid, state in list(self.states.items()):
                try:
                    if extract_domain(eid) == domain:
                        yield eid, state
                except ValueError:
                    self.logger.warning("State for entity %s has invalid 'entity_id' value", eid)

        return _matching()

    @retry(
        retry=retry_if_exception_type(ResourceNotReadyError),
        stop=stop_after_attempt(MAX_RETRY_ATTEMPTS),
        wait=wait_exponential_jitter(),
        reraise=True,
    )
    def __contains__(self, entity_id: str) -> bool:
        """Check if a specific entity ID exists in the state proxy.

        Args:
            entity_id: The entity ID to check (e.g., "light.kitchen").

        Returns:
            True if the entity exists, False otherwise.

        Raises:
            ResourceNotReadyError: If not ready and cache is empty (cold start).
                When disconnected but cache is populated, stale data is returned.
        """
        if not self.is_ready() and not self.states:
            raise ResourceNotReadyError(f"StateProxy is not ready (reason: {self._ready_reason}).")
        return entity_id in self.states

    async def _on_state_change(self, event: RawStateChangeEvent) -> None:
        """Handle state_changed events to update the cache.

        The proxy's Bus child is created with priority=100, which is intended to
        get the cache updated before app handlers process the event.

        Args:
            event: The raw state_changed event. ``event.payload.data`` carries the
                ``entity_id`` plus ``old_state`` and ``new_state``; a ``new_state``
                of None means the entity was removed.
        """
        # note: we are not listening to entity_registry_updated because state_changed seems to capture
        # both the new state when renamed and the removal when deleted.

        entity_id = event.payload.data.entity_id
        old_state_dict = event.payload.data.old_state
        new_state_dict = event.payload.data.new_state

        self.logger.debug("State changed event for %s", entity_id)
        async with self.lock:
            if new_state_dict is None:
                if entity_id in self.states:
                    self.states.pop(entity_id)
                    self.logger.debug("Removed state for %s", entity_id)
                    return
                self.logger.debug("Ignoring removal of unknown entity %s", entity_id)
                return

            # Drop events that are not newer than what is cached (e.g. an event that
            # was already superseded by a full reload).
            # NOTE(review): last_updated values are compared with <= as-is; presumably
            # they are uniformly formatted timestamps — confirm.
            # walrus operator to help type checker know we already validated these aren't None
            if (
                entity_id in self.states
                and (curr_last_updated := self.states[entity_id].get("last_updated")) is not None
                and (new_last_updated := new_state_dict.get("last_updated")) is not None
            ):
                if new_last_updated <= curr_last_updated:
                    self.logger.debug(
                        "Ignoring out-of-date state update for %s (new last_updated: %s, current: %s)",
                        entity_id,
                        new_last_updated,
                        curr_last_updated,
                    )
                    return

            self.states[entity_id] = new_state_dict
            if old_state_dict is None:
                self.logger.debug("Added state for %s", entity_id)
            else:
                self.logger.debug("Updated state for %s", entity_id)

    async def on_disconnect(self) -> None:
        """Handle WebSocket disconnection.

        Retains the state cache so consumers can read stale data while disconnected
        instead of hitting ResourceNotReadyError. Callers can check ``is_ready()`` to
        distinguish fresh from stale data. The cache is replaced with fresh data on
        reconnect via ``_load_cache()``.

        This method is idempotent: if StateProxy is already not-ready, subsequent
        calls are no-ops. This prevents redundant work during early-drop retry loops.
        """
        if not self.is_ready():
            return

        self.logger.info("WebSocket disconnected, retaining stale state cache (%d entities)", len(self.states))

        # cancel the state change subscription (WS events won't arrive while disconnected)
        if self.state_change_sub is not None:
            self.state_change_sub.cancel()
            self.state_change_sub = None

        # poll job stays alive so the cache can self-heal between
        # disconnect and the next on_reconnect call

        self.mark_not_ready(reason="Disconnected")
        await self._emit_readiness_event()

    async def on_reconnect(self) -> None:
        """Handle WebSocket (re)connection by resyncing state.

        Registered for websocket-connected events in ``on_initialize()``. Reloads
        the cache and re-creates the event subscriptions; both steps are attempted
        even if the first fails. The proxy is marked ready only when both succeed.
        Serialized via _reconnect_lock to prevent duplicate subscriptions
        when WebSocket flaps rapidly (#993).
        """
        async with self._reconnect_lock:
            self.logger.info("WebSocket reconnected, performing state resync")

            load_cache_succeeded = False
            try:
                await self._load_cache()
                load_cache_succeeded = True
            except Exception as e:
                self.logger.exception("Failed to resync states after HA restart: %s", e)

            subscribe_succeeded = False
            try:
                await self.subscribe_to_events()
                subscribe_succeeded = True
            except Exception as e:
                self.logger.exception("Failed to subscribe to events after reconnect: %s", e)

            if load_cache_succeeded and subscribe_succeeded:
                self.mark_ready(reason="Connected")
            elif not load_cache_succeeded:
                self.mark_not_ready(reason="Failed to resync states after HA restart")
            else:
                self.mark_not_ready(reason="Failed to subscribe to events after reconnect")

            await self._emit_readiness_event()

    async def _load_cache(self) -> None:
        """Load the state cache from Home Assistant.

        This is called during initialization and reconnection to populate
        the state cache, as well as during periodic polling to keep the cache up to date.
        The whole cache dict is replaced in one assignment rather than updated in place.
        """
        states = await self.hassette.api.get_states_raw()
        # Entries with a missing or empty entity_id cannot be keyed; skip them
        # instead of failing the whole reload with a KeyError.
        state_dict = {eid: s for s in states if (eid := s.get("entity_id"))}
        async with self.lock:
            self.states = state_dict

        self.logger.debug("State cache loaded, tracking %d entities", len(state_dict))

on_initialize() -> None async

Initialize the state proxy.

WebsocketService, ApiResource, BusService, and SchedulerService are guaranteed ready by depends_on auto-wait. Subscribes to state change events and to WebSocket connect/disconnect events, then performs the initial state sync.

Source code in src/hassette/core/state_proxy.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
async def on_initialize(self) -> None:
    """Bring the state proxy up: register handlers, then load the cache.

    The services listed in ``depends_on`` are guaranteed ready by depends_on
    auto-wait. Event handlers are registered first; the proxy is marked ready
    only after the initial state sync has succeeded.

    Raises:
        Exception: Whatever the initial state sync raised; it is logged and re-raised.
    """
    self.logger.debug("Dependencies ready, performing initial state sync")

    await self.subscribe_to_events()

    # Track the WebSocket coming up and going down.
    bus = self.bus
    await bus.on_websocket_connected(
        handler=self.on_reconnect,
        name="hassette.state_proxy.on_reconnect",
    )
    await bus.on_websocket_disconnected(
        handler=self.on_disconnect,
        name="hassette.state_proxy.on_disconnect",
    )

    # Initial state sync; readiness is only signalled if it completes.
    try:
        await self._load_cache()
        self.mark_ready(reason="Initial state sync complete")
    except Exception as exc:
        self.logger.exception("Failed to perform initial state sync: %s", exc)
        raise

on_shutdown() -> None async

Shutdown the state proxy and clean up resources.

Source code in src/hassette/core/state_proxy.py
102
103
104
105
106
107
108
109
110
111
async def on_shutdown(self) -> None:
    """Mark the proxy not ready, drop its handles and empty the state cache."""
    self.logger.debug("Shutting down state proxy")
    self.mark_not_ready(reason="Shutting down")

    # Forget the job/subscription handles so a racing on_disconnect() finds
    # nothing left to cancel.
    self.poll_job = self.state_change_sub = None

    async with self.lock:
        self.states.clear()

num_domain_states(domain: str) -> int

Return the number of states for a specific domain.

Parameters:

Name Type Description Default
domain str

The domain to filter by (e.g., "light").

required

Returns:

Type Description
int

The number of states in the specified domain.

Raises:

Type Description
ResourceNotReadyError

If not ready and cache is empty (cold start). When disconnected but cache is populated, stale data is returned.

Source code in src/hassette/core/state_proxy.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def num_domain_states(self, domain: str) -> int:
    """Count the cached states that belong to one domain.

    Args:
        domain: The domain to filter by (e.g., "light").

    Returns:
        How many cached entities are in that domain.

    Raises:
        ResourceNotReadyError: If not ready and cache is empty (cold start).
            When disconnected but cache is populated, stale data is counted.
    """
    count = 0
    for _ in self.yield_domain_states(domain):
        count += 1
    return count

get_state(entity_id: str) -> HassStateDict | None

Get the current state for an entity.

Parameters:

Name Type Description Default
entity_id str

The entity ID to look up (e.g., "light.kitchen").

required

Returns:

Type Description
HassStateDict | None

The typed state object if found, None otherwise.

Raises:

Type Description
ResourceNotReadyError

If not ready and cache is empty (cold start). When disconnected but cache is populated, stale data is returned.

Source code in src/hassette/core/state_proxy.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
@retry(
    retry=retry_if_exception_type(ResourceNotReadyError),
    stop=stop_after_attempt(MAX_RETRY_ATTEMPTS),
    wait=wait_exponential_jitter(),
    reraise=True,
)
def get_state(self, entity_id: str) -> "HassStateDict | None":
    """Look up the cached state of one entity, retrying while the proxy is cold.

    Args:
        entity_id: The entity ID to look up (e.g., "light.kitchen").

    Returns:
        The cached state dict if found, None otherwise.

    Raises:
        ResourceNotReadyError: If not ready and cache is empty (cold start).
            When disconnected but cache is populated, stale data is returned.
    """
    # No lock is taken for the read: dict assignment is atomic in CPython and
    # cached entries are replaced wholesale, never mutated in place.
    cached = self.get_state_once(entity_id)
    return cached

get_domain_states(domain: str) -> dict[str, HassStateDict]

Get all states for a specific domain.

Parameters:

Name Type Description Default
domain str

The domain to filter by (e.g., "light").

required

Returns:

Type Description
dict[str, HassStateDict]

A dictionary of entity_id to state for the specified domain.

Raises:

Type Description
ResourceNotReadyError

If not ready and cache is empty (cold start). When disconnected but cache is populated, stale data is returned.

Source code in src/hassette/core/state_proxy.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def get_domain_states(self, domain: str) -> dict[str, "HassStateDict"]:
    """Collect every cached state of one domain into a new dict.

    Args:
        domain: The domain to filter by (e.g., "light").

    Returns:
        A mapping of entity_id to state for the specified domain.

    Raises:
        ResourceNotReadyError: If not ready and cache is empty (cold start).
            When disconnected but cache is populated, stale data is returned.
    """
    # yield_domain_states() produces (entity_id, state) pairs, which dict() accepts directly.
    return dict(self.yield_domain_states(domain))

yield_domain_states(domain: str) -> Generator[tuple[str, HassStateDict], Any, None]

Yield all states for a specific domain.

Parameters:

Name Type Description Default
domain str

The domain to filter by (e.g., "light").

required

Yields:

Type Description
tuple[str, HassStateDict]

Tuples of (entity_id, state) for the specified domain.

Raises:

Type Description
ResourceNotReadyError

If not ready and cache is empty (cold start). When disconnected but cache is populated, stale data is returned.

Source code in src/hassette/core/state_proxy.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
@retry(
    retry=retry_if_exception_type(ResourceNotReadyError),
    stop=stop_after_attempt(MAX_RETRY_ATTEMPTS),
    wait=wait_exponential_jitter(),
    reraise=True,
)
def yield_domain_states(self, domain: str) -> Generator[tuple[str, "HassStateDict"], Any, None]:
    """Yield all states for a specific domain.

    The readiness check runs when this method is called (and is retried by the
    decorator); the returned generator then walks a snapshot of the cache.

    Args:
        domain: The domain to filter by (e.g., "light").

    Yields:
        Tuples of (entity_id, state) for the specified domain.

    Raises:
        ResourceNotReadyError: If not ready and cache is empty (cold start).
            When disconnected but cache is populated, stale data is returned.
    """
    # This check has to execute inside the @retry wrapper. If this function were
    # itself a generator function, calling it would only build the generator
    # object; the check would run on first iteration, after the wrapper had
    # already returned, and would never be retried.
    if not self.is_ready() and not self.states:
        raise ResourceNotReadyError(f"StateProxy is not ready (reason: {self._ready_reason}).")

    def _matching() -> Generator[tuple[str, "HassStateDict"], Any, None]:
        # Snapshot to avoid RuntimeError if _load_cache() mutates the dict mid-iteration
        for eid, state in list(self.states.items()):
            try:
                if extract_domain(eid) == domain:
                    yield eid, state
            except ValueError:
                self.logger.warning("State for entity %s has invalid 'entity_id' value", eid)

    return _matching()

__contains__(entity_id: str) -> bool

Check if a specific entity ID exists in the state proxy.

Parameters:

Name Type Description Default
entity_id str

The entity ID to check (e.g., "light.kitchen").

required

Returns:

Type Description
bool

True if the entity exists, False otherwise.

Raises:

Type Description
ResourceNotReadyError

If not ready and cache is empty (cold start). When disconnected but cache is populated, stale data is returned.

Source code in src/hassette/core/state_proxy.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
@retry(
    retry=retry_if_exception_type(ResourceNotReadyError),
    stop=stop_after_attempt(MAX_RETRY_ATTEMPTS),
    wait=wait_exponential_jitter(),
    reraise=True,
)
def __contains__(self, entity_id: str) -> bool:
    """Report whether an entity ID is present in the state cache.

    Args:
        entity_id: The entity ID to check (e.g., "light.kitchen").

    Returns:
        True if the entity exists, False otherwise.

    Raises:
        ResourceNotReadyError: If not ready and cache is empty (cold start).
            When disconnected but cache is populated, the stale cache is consulted.
    """
    # Cold start: not ready and nothing cached yet. A populated cache is still
    # answered from, even while not ready.
    cold_start = not (self.is_ready() or self.states)
    if cold_start:
        raise ResourceNotReadyError(f"StateProxy is not ready (reason: {self._ready_reason}).")
    return entity_id in self.states

on_disconnect() -> None async

Handle WebSocket disconnection.

Retains the state cache so consumers can read stale data while disconnected instead of hitting ResourceNotReadyError. Callers can check is_ready() to distinguish fresh from stale data. The cache is replaced with fresh data on reconnect via _load_cache().

This method is idempotent: if StateProxy is already not-ready, subsequent calls are no-ops. This prevents redundant work during early-drop retry loops.

Source code in src/hassette/core/state_proxy.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
async def on_disconnect(self) -> None:
    """React to the WebSocket going away.

    The cache is kept, so readers get stale data rather than
    ResourceNotReadyError; ``is_ready()`` tells fresh from stale. Fresh data
    replaces it on reconnect via ``_load_cache()``.

    Calling this while already not-ready does nothing, which keeps repeated
    disconnect notifications (e.g. early-drop retry loops) cheap.
    """
    if not self.is_ready():
        return

    self.logger.info("WebSocket disconnected, retaining stale state cache (%d entities)", len(self.states))

    # No WS events can arrive while disconnected, so drop the state_changed subscription.
    subscription = self.state_change_sub
    if subscription is not None:
        subscription.cancel()
        self.state_change_sub = None

    # The poll job is deliberately left running so the cache can self-heal
    # between this disconnect and the next on_reconnect call.

    self.mark_not_ready(reason="Disconnected")
    await self._emit_readiness_event()

on_reconnect() -> None async

Handle WebSocket (re)connection by resyncing state.

This runs each time the WebSocket connects (for example after a Home Assistant restart) to reload the state cache and re-subscribe to events. Serialized via _reconnect_lock to prevent duplicate subscriptions when WebSocket flaps rapidly (#993).

Source code in src/hassette/core/state_proxy.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
async def on_reconnect(self) -> None:
    """Resync after the WebSocket (re)connects.

    Reloads the cache and re-creates the event subscriptions. Both steps are
    always attempted; the proxy is marked ready only if both succeed, otherwise
    it is marked not ready with the reason of the first failing step.
    Runs under ``_reconnect_lock`` so that rapid WebSocket flapping cannot
    produce duplicate subscriptions (#993).
    """
    async with self._reconnect_lock:
        self.logger.info("WebSocket reconnected, performing state resync")

        try:
            await self._load_cache()
        except Exception as exc:
            self.logger.exception("Failed to resync states after HA restart: %s", exc)
            cache_ok = False
        else:
            cache_ok = True

        try:
            await self.subscribe_to_events()
        except Exception as exc:
            self.logger.exception("Failed to subscribe to events after reconnect: %s", exc)
            events_ok = False
        else:
            events_ok = True

        if cache_ok and events_ok:
            self.mark_ready(reason="Connected")
        else:
            # A cache failure takes precedence when choosing the reported reason.
            if cache_ok:
                failure = "Failed to subscribe to events after reconnect"
            else:
                failure = "Failed to resync states after HA restart"
            self.mark_not_ready(reason=failure)

        await self._emit_readiness_event()