StateStore — entity registry plus state priming and refresh.
Owns the EntityRegistry and the logic that:
- Subscribes to state_changed first (with buffering enabled).
- Pulls the REST snapshot and applies it to registered entities.
- Drains the buffered events so any state transitions that occurred
between subscribe and snapshot are applied in order.
This sequence eliminates the connect-time race where events between the
REST snapshot and the live event stream would be lost. The same priming
runs after a reconnect so post-reconnect state is also reconciled.
StateStore
Registry of entities plus priming and refresh of their state.
Parameters:
| Name | Type | Description | Default |
|------|------|-------------|---------|
| rest | RestPort | REST adapter used for state snapshots. | required |
| events | EventBus | Event bus used to receive state_changed events. | required |
| registry | EntityRegistry or None | Existing registry instance. A new one is created when omitted. | None |
Source code in src/haclient/core/state.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class StateStore:
    """Registry of entities plus priming and refresh of their state.

    Parameters
    ----------
    rest : RestPort
        REST adapter used for state snapshots.
    events : EventBus
        Event bus used to receive ``state_changed`` events.
    registry : EntityRegistry or None, optional
        Existing registry instance. A new one is created when omitted.
        A registry that is passed explicitly is always used as-is, even
        when it is currently empty.
    """

    def __init__(
        self,
        rest: RestPort,
        events: EventBus,
        *,
        registry: EntityRegistry | None = None,
    ) -> None:
        self._rest = rest
        self._events = events
        # Compare against None explicitly: a registry object may evaluate as
        # falsy while empty (e.g. if it defines ``__len__``), and ``or`` would
        # then silently swap the caller's shared registry for a fresh one.
        self._registry = registry if registry is not None else EntityRegistry()
        events.subscribe(STATE_CHANGED, self._on_state_changed)

    @property
    def registry(self) -> EntityRegistry:
        """Return the underlying `EntityRegistry`."""
        return self._registry

    @property
    def rest(self) -> RestPort:
        """Return the REST adapter used for snapshots and refreshes."""
        return self._rest

    def register(self, entity: Entity) -> None:
        """Register *entity* in the underlying registry."""
        self._registry.register(entity)

    def get(self, entity_id: str) -> Entity | None:
        """Return the entity with *entity_id* if known."""
        return self._registry.get(entity_id)

    def __iter__(self) -> Iterator[Entity]:
        """Iterate over the entities held by the underlying registry."""
        return iter(self._registry)

    async def prime(self) -> None:
        """Subscribe to ``state_changed`` then reconcile with a REST snapshot.

        Buffering for ``state_changed`` is enabled before the event bus is
        started, so events arriving before the REST snapshot is applied are
        held back. After the snapshot has been applied to every registered
        entity it mentions, the buffer is drained.

        If the snapshot request raises `HAClientError`, a warning is
        logged, the buffer is discarded and the method returns without
        applying any state. Other exceptions propagate to the caller.
        """
        self._events.enable_buffering(STATE_CHANGED)
        await self._events.start()
        try:
            states = await self._rest.get_states()
        except HAClientError as err:
            _LOGGER.warning("Initial state fetch failed: %s", err)
            self._events.discard_buffer(STATE_CHANGED)
            return
        for state in states:
            # Skip malformed snapshot records instead of failing the prime.
            eid = state.get("entity_id") if isinstance(state, dict) else None
            if not isinstance(eid, str):
                continue
            entity = self._registry.get(eid)
            if entity is not None:
                entity._apply_state(state)  # noqa: SLF001
        await self._events.drain_buffer(STATE_CHANGED)

    async def refresh_all(self) -> None:
        """Refresh every registered entity from the REST API.

        Every registered entity has ``_apply_state`` called exactly once:
        with its snapshot record when one is present, otherwise with
        ``None``. Exceptions raised by the REST adapter propagate.
        """
        states = await self._rest.get_states()
        # Index well-formed records by entity id; a later duplicate wins.
        index: dict[str, dict[str, Any]] = {
            state["entity_id"]: state
            for state in states
            if isinstance(state, dict) and isinstance(state.get("entity_id"), str)
        }
        # Iterate over a copy so the registry may change while states apply.
        for entity in list(self._registry):
            entity._apply_state(index.get(entity.entity_id))  # noqa: SLF001

    def _on_state_changed(self, event: dict[str, Any]) -> None:
        """Apply a single ``state_changed`` event to its target entity.

        Events whose ``data`` payload is missing or not a mapping, whose
        ``entity_id`` is not a string, or that target an unregistered
        entity are ignored.
        """
        data = event.get("data")
        if not isinstance(data, dict):
            # Malformed payloads are dropped, like malformed snapshot records.
            return
        eid = data.get("entity_id")
        if not isinstance(eid, str):
            return
        entity = self._registry.get(eid)
        if entity is None:
            return
        entity._handle_state_changed(  # noqa: SLF001
            data.get("old_state"), data.get("new_state")
        )
|
registry
property
Return the underlying EntityRegistry.
rest
property
Return the REST adapter used for snapshots and refreshes.
register
register(entity: Entity) -> None
Register entity in the underlying registry.
Source code in src/haclient/core/state.py
def register(self, entity: Entity) -> None:
    """Add *entity* to the registry owned by this store."""
    target_registry = self._registry
    target_registry.register(entity)
|
get
get(entity_id: str) -> Entity | None
Return the entity with entity_id if known.
Source code in src/haclient/core/state.py
def get(self, entity_id: str) -> Entity | None:
    """Look up *entity_id* in the registry and return the match, if any."""
    found = self._registry.get(entity_id)
    return found
|
prime
async
Subscribe to state_changed then reconcile with a REST snapshot.
The buffer captures events that arrive between the subscription
confirmation and the REST snapshot. After applying the snapshot
the buffer is drained, applying every captured event in order.
Replays are idempotent because _apply_state does a full
replacement.
Source code in src/haclient/core/state.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
async def prime(self) -> None:
    """Start buffered event delivery, then reconcile with a REST snapshot.

    ``state_changed`` buffering is switched on before the event bus is
    started. The REST snapshot is then applied to every registered entity
    it mentions, and finally the buffer is drained. When the snapshot
    request raises `HAClientError`, a warning is logged, the buffer is
    discarded and nothing is applied.
    """
    bus = self._events
    bus.enable_buffering(STATE_CHANGED)
    await bus.start()

    try:
        snapshot = await self._rest.get_states()
    except HAClientError as err:
        _LOGGER.warning("Initial state fetch failed: %s", err)
        bus.discard_buffer(STATE_CHANGED)
        return

    for record in snapshot:
        # Ignore anything that is not a mapping with a string entity id.
        if not isinstance(record, dict):
            continue
        entity_id = record.get("entity_id")
        if not isinstance(entity_id, str):
            continue
        target = self._registry.get(entity_id)
        if target is None:
            continue
        target._apply_state(record)  # noqa: SLF001

    await bus.drain_buffer(STATE_CHANGED)
|
refresh_all
async
Refresh every registered entity from the REST API.
Source code in src/haclient/core/state.py
106
107
108
109
110
111
112
113
114
115
116
async def refresh_all(self) -> None:
    """Re-apply a fresh REST snapshot to every registered entity.

    Each registered entity receives its snapshot record, or ``None`` when
    the snapshot holds no record for its ``entity_id``.
    """
    snapshot = await self._rest.get_states()
    # Keep only mapping records with a string id; later duplicates win.
    by_id: dict[str, dict[str, Any]] = {
        record["entity_id"]: record
        for record in snapshot
        if isinstance(record, dict) and isinstance(record.get("entity_id"), str)
    }
    # Snapshot the registry so iteration is unaffected by concurrent changes.
    registered = tuple(self._registry)
    for member in registered:
        member._apply_state(by_id.get(member.entity_id))  # noqa: SLF001
|