Skip to content

State Store

state

StateStore — entity registry plus state priming and refresh.

Owns the EntityRegistry and the logic that:

  1. Subscribes to state_changed first (with buffering enabled).
  2. Pulls the REST snapshot and applies it to registered entities.
  3. Drains the buffered events so any state transitions that occurred between subscribe and snapshot are applied in order.

This sequence eliminates the connect-time race where events between the REST snapshot and the live event stream would be lost. The same priming runs after a reconnect so post-reconnect state is also reconciled.

StateStore

Registry of entities plus priming and refresh of their state.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| rest | RestPort | REST adapter used for state snapshots. | required |
| events | EventBus | Event bus used to receive state_changed events. | required |
| registry | EntityRegistry or None | Existing registry instance. A new one is created when omitted. | None |
Source code in src/haclient/core/state.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class StateStore:
    """Registry of entities plus priming and refresh of their state.

    On construction the store subscribes its own handler to
    ``state_changed`` events; `prime` and `refresh_all` reconcile the
    registered entities with REST snapshots.

    Parameters
    ----------
    rest : RestPort
        REST adapter used for state snapshots.
    events : EventBus
        Event bus used to receive ``state_changed`` events.
    registry : EntityRegistry or None, optional
        Existing registry instance. A new one is created only when this
        is ``None``; a registry supplied by the caller is always kept,
        even while it is still empty.
    """

    def __init__(
        self,
        rest: RestPort,
        events: EventBus,
        *,
        registry: EntityRegistry | None = None,
    ) -> None:
        self._rest = rest
        self._events = events
        # Compare against None explicitly: a caller-supplied registry that
        # is still empty may evaluate as falsy (e.g. when it defines
        # ``__len__``), and ``registry or EntityRegistry()`` would then
        # silently replace it with an instance the caller never sees.
        self._registry = registry if registry is not None else EntityRegistry()
        events.subscribe(STATE_CHANGED, self._on_state_changed)

    @property
    def registry(self) -> EntityRegistry:
        """Return the underlying `EntityRegistry`."""
        return self._registry

    @property
    def rest(self) -> RestPort:
        """Return the REST adapter used for snapshots and refreshes."""
        return self._rest

    def register(self, entity: Entity) -> None:
        """Register *entity* in the underlying registry."""
        self._registry.register(entity)

    def get(self, entity_id: str) -> Entity | None:
        """Return the entity with *entity_id* if known."""
        return self._registry.get(entity_id)

    def __iter__(self) -> Iterator[Entity]:
        """Iterate over the underlying registry."""
        return iter(self._registry)

    async def prime(self) -> None:
        """Subscribe to ``state_changed`` then reconcile with a REST snapshot.

        The buffer captures events that arrive between the subscription
        confirmation and the REST snapshot. After applying the snapshot
        the buffer is drained, applying every captured event in order.
        Replays are idempotent because `_apply_state` does a full
        replacement.

        If the snapshot request raises `HAClientError`, a warning is
        logged, the buffer is discarded and no entity is updated. Other
        exceptions propagate to the caller.
        """
        # Buffering must be switched on before the bus starts so nothing
        # delivered ahead of the snapshot is applied out of order.
        self._events.enable_buffering(STATE_CHANGED)
        await self._events.start()
        try:
            states = await self._rest.get_states()
        except HAClientError as err:
            _LOGGER.warning("Initial state fetch failed: %s", err)
            self._events.discard_buffer(STATE_CHANGED)
            return
        for state in states:
            # Skip malformed snapshot entries (non-dict, or no string id).
            eid = state.get("entity_id") if isinstance(state, dict) else None
            if not isinstance(eid, str):
                continue
            entity = self._registry.get(eid)
            if entity is not None:
                entity._apply_state(state)  # noqa: SLF001
        await self._events.drain_buffer(STATE_CHANGED)

    async def refresh_all(self) -> None:
        """Refresh every registered entity from the REST API.

        Errors raised by the REST call are not caught here. Entities that
        are missing from the snapshot are passed ``None``.
        """
        states = await self._rest.get_states()
        # Index the snapshot by entity id; a later duplicate overwrites an
        # earlier one.
        index: dict[str, dict[str, Any]] = {}
        for state in states:
            if not isinstance(state, dict):
                continue
            eid = state.get("entity_id")
            if isinstance(eid, str):
                index[eid] = state
        # Iterate over a copy of the registry contents.
        # NOTE(review): presumably guards against registration during
        # `_apply_state` callbacks, and `_apply_state(None)` presumably
        # clears or marks the state as unknown - confirm both.
        for entity in list(self._registry):
            entity._apply_state(index.get(entity.entity_id))  # noqa: SLF001

    def _on_state_changed(self, event: dict[str, Any]) -> None:
        """Apply a single ``state_changed`` event to its target entity.

        Events without a string ``entity_id`` in their ``data`` payload,
        or targeting an entity that is not registered, are ignored.
        """
        # ``data`` may be missing or None; treat both as an empty payload.
        data = event.get("data") or {}
        eid = data.get("entity_id")
        if not isinstance(eid, str):
            return
        entity = self._registry.get(eid)
        if entity is None:
            return
        entity._handle_state_changed(  # noqa: SLF001
            data.get("old_state"), data.get("new_state")
        )

registry property

registry: EntityRegistry

Return the underlying EntityRegistry.

rest property

rest: RestPort

Return the REST adapter used for snapshots and refreshes.

register

register(entity: Entity) -> None

Register entity in the underlying registry.

Source code in src/haclient/core/state.py
69
70
71
def register(self, entity: Entity) -> None:
    """Hand *entity* over to the store's registry for registration."""
    registry = self._registry
    registry.register(entity)

get

get(entity_id: str) -> Entity | None

Return the entity with entity_id if known.

Source code in src/haclient/core/state.py
73
74
75
def get(self, entity_id: str) -> Entity | None:
    """Look up *entity_id* in the underlying registry and return the result."""
    match = self._registry.get(entity_id)
    return match

prime async

prime() -> None

Subscribe to state_changed then reconcile with a REST snapshot.

The buffer captures events that arrive between the subscription confirmation and the REST snapshot. After applying the snapshot the buffer is drained, applying every captured event in order. Replays are idempotent because _apply_state does a full replacement.

Source code in src/haclient/core/state.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
async def prime(self) -> None:
    """Start the event bus with buffering on, then reconcile via REST.

    Buffering for ``state_changed`` is enabled before the bus is started,
    so events arriving ahead of the REST snapshot are held back. Once the
    snapshot has been applied to the registered entities, the buffer is
    drained and the captured events are applied in order. Replays are
    idempotent because `_apply_state` does a full replacement.

    When the snapshot request raises `HAClientError`, a warning is logged,
    the buffer is discarded and the method returns without updating any
    entity.
    """
    bus = self._events
    bus.enable_buffering(STATE_CHANGED)
    await bus.start()

    try:
        snapshot = await self._rest.get_states()
    except HAClientError as err:
        _LOGGER.warning("Initial state fetch failed: %s", err)
        bus.discard_buffer(STATE_CHANGED)
        return

    lookup = self._registry.get
    for raw in snapshot:
        # Ignore anything that is not a dict carrying a string entity id.
        if not isinstance(raw, dict):
            continue
        entity_id = raw.get("entity_id")
        if not isinstance(entity_id, str):
            continue
        target = lookup(entity_id)
        if target is None:
            continue
        target._apply_state(raw)  # noqa: SLF001

    await bus.drain_buffer(STATE_CHANGED)

refresh_all async

refresh_all() -> None

Refresh every registered entity from the REST API.

Source code in src/haclient/core/state.py
106
107
108
109
110
111
112
113
114
115
116
117
async def refresh_all(self) -> None:
    """Pull a REST snapshot and push it onto every registered entity.

    Snapshot entries that are not dicts, or whose ``entity_id`` is not a
    string, are ignored. Each registered entity receives its matching
    snapshot entry, or ``None`` when the snapshot has no entry for it.
    """
    snapshot = await self._rest.get_states()
    # Later duplicates of the same entity id replace earlier ones.
    by_id: dict[str, dict[str, Any]] = {
        entity_id: raw
        for raw in snapshot
        if isinstance(raw, dict)
        and isinstance(entity_id := raw.get("entity_id"), str)
    }
    # Walk a frozen copy of the registry contents.
    for member in tuple(self._registry):
        member._apply_state(by_id.get(member.entity_id))  # noqa: SLF001