Skip to content

WebSocket adapter (aiohttp)

ws_aiohttp

aiohttp-based implementation of WebSocketPort.

The implementation focuses on robustness:

  • A single background task (_reader_task) consumes incoming frames.
  • Each outgoing command gets a monotonically increasing id and resolves through an `asyncio.Future` when the matching result arrives.
  • A separate _keepalive_task periodically sends ping messages.
  • On unexpected disconnects, an exponential back-off reconnect loop is spawned and any registered subscriptions are re-established before the reconnect listeners fire.

AiohttpWebSocketAdapter

Async Home Assistant WebSocket adapter.

Parameters:

Name Type Description Default
url str

Fully-qualified WebSocket URL.

required
token str

Long-lived access token.

required
session ClientSession or None

Externally-owned session to reuse.

None
reconnect bool

Whether to reconnect automatically.

True
ping_interval float

Seconds between keepalive pings (0 disables them).

30.0
request_timeout float

Default timeout for command responses.

30.0
verify_ssl bool

Verify TLS certificates.

True
Source code in src/haclient/infra/ws_aiohttp.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
class AiohttpWebSocketAdapter:
    """Async Home Assistant WebSocket adapter.

    Parameters
    ----------
    url : str
        Fully-qualified WebSocket URL.
    token : str
        Long-lived access token.
    session : aiohttp.ClientSession or None, optional
        Externally-owned session to reuse.
    reconnect : bool, optional
        Whether to reconnect automatically.
    ping_interval : float, optional
        Seconds between keepalive pings (``0`` disables them).
    request_timeout : float, optional
        Default timeout for command responses.
    verify_ssl : bool, optional
        Verify TLS certificates.
    """

    def __init__(
        self,
        url: str,
        token: str,
        *,
        session: aiohttp.ClientSession | None = None,
        reconnect: bool = True,
        ping_interval: float = 30.0,
        request_timeout: float = 30.0,
        verify_ssl: bool = True,
    ) -> None:
        self._url = url
        self._token = token
        self._session = session
        self._owns_session = session is None
        self._reconnect = reconnect
        self._ping_interval = ping_interval
        self._request_timeout = request_timeout
        self._verify_ssl = verify_ssl

        self._ws: aiohttp.ClientWebSocketResponse | None = None
        self._message_id = 0
        self._pending: dict[int, asyncio.Future[Any]] = {}
        self._pong_waiters: dict[int, asyncio.Future[Any]] = {}
        self._subscriptions: dict[int, EventHandler] = {}
        self._event_subs: dict[str, tuple[int, EventHandler]] = {}

        self._reader_task: asyncio.Task[None] | None = None
        self._keepalive_task: asyncio.Task[None] | None = None
        self._reconnect_task: asyncio.Task[None] | None = None
        self._closing = False
        self._connected = asyncio.Event()
        self._disconnect_listeners: list[DisconnectListener] = []
        self._reconnect_listeners: list[ReconnectListener] = []

    @property
    def connected(self) -> bool:
        """Return ``True`` while the underlying socket is open."""
        return self._ws is not None and not self._ws.closed

    async def connect(self) -> None:
        """Establish the WebSocket connection and authenticate."""
        if self.connected:
            return
        self._closing = False
        await self._do_connect()
        self._reader_task = asyncio.create_task(self._reader_loop(), name="ha-ws-reader")
        if self._ping_interval > 0:
            self._keepalive_task = asyncio.create_task(
                self._keepalive_loop(), name="ha-ws-keepalive"
            )

    async def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the current session, creating one if necessary."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
            self._owns_session = True
        return self._session

    async def _do_connect(self) -> None:
        """Open the WebSocket and perform the auth handshake."""
        session = await self._ensure_session()
        try:
            self._ws = await session.ws_connect(
                self._url,
                heartbeat=None,
                ssl=self._verify_ssl,
                autoping=False,
            )
        except aiohttp.ClientError as err:
            raise HAClientError(f"Failed to connect to {self._url}: {err}") from err

        msg = await self._recv_json()
        if msg.get("type") != "auth_required":
            raise AuthenticationError(f"Expected auth_required, got {msg.get('type')!r}")
        await self._ws.send_json({"type": "auth", "access_token": self._token})
        msg = await self._recv_json()
        mtype = msg.get("type")
        if mtype == "auth_invalid":
            await self._ws.close()
            raise AuthenticationError(msg.get("message", "Invalid access token"))
        if mtype != "auth_ok":
            raise AuthenticationError(f"Unexpected auth response: {mtype!r}")

        self._connected.set()

    async def close(self) -> None:
        """Close the WebSocket and stop background tasks.

        Notes
        -----
        Side-effects, in order:

        1. Marks the adapter as closing so the reader/keepalive loops
           exit instead of triggering a reconnect.
        2. Cancels and awaits the keepalive task.
        3. Closes the underlying socket and waits up to 5 seconds for
           the reader task to drain.
        4. Cancels and awaits any in-flight reconnect task so a
           background reconnect cannot survive ``close()``.
        5. Fails any pending command/pong futures with
           `ConnectionClosedError`.
        6. Closes the owned aiohttp session, if any.
        """
        self._closing = True
        self._connected.clear()
        if self._keepalive_task is not None:
            self._keepalive_task.cancel()
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await self._keepalive_task
            self._keepalive_task = None
        if self._ws is not None and not self._ws.closed:
            await self._ws.close()
        if self._reader_task is not None:
            try:
                await asyncio.wait_for(self._reader_task, timeout=5.0)
            except (TimeoutError, asyncio.CancelledError, Exception):  # noqa: BLE001
                self._reader_task.cancel()
            self._reader_task = None
        if self._reconnect_task is not None:
            self._reconnect_task.cancel()
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await self._reconnect_task
            self._reconnect_task = None
        for fut in self._pending.values():
            if not fut.done():
                fut.set_exception(ConnectionClosedError("WebSocket closed"))
        self._pending.clear()
        for fut in self._pong_waiters.values():
            if not fut.done():
                fut.set_exception(ConnectionClosedError("WebSocket closed"))
        self._pong_waiters.clear()
        if self._owns_session and self._session is not None and not self._session.closed:
            await self._session.close()

    def on_disconnect(self, handler: DisconnectListener) -> DisconnectListener:
        """Register a disconnect listener.

        Parameters
        ----------
        handler : DisconnectListener
            Sync or async zero-argument callable invoked when the
            WebSocket connection drops.

        Returns
        -------
        DisconnectListener
            The same *handler*, returned so the method can be used as a
            decorator.
        """
        self._disconnect_listeners.append(handler)
        return handler

    def on_reconnect(self, handler: ReconnectListener) -> ReconnectListener:
        """Register a reconnect listener.

        Parameters
        ----------
        handler : ReconnectListener
            Sync or async zero-argument callable invoked after the
            WebSocket reconnects and prior subscriptions have been
            re-established.

        Returns
        -------
        ReconnectListener
            The same *handler*, returned so the method can be used as a
            decorator.
        """
        self._reconnect_listeners.append(handler)
        return handler

    def _next_id(self) -> int:
        """Return the next monotonically-increasing message id."""
        self._message_id += 1
        return self._message_id

    async def _recv_json(self) -> dict[str, Any]:
        """Read a single JSON object from the WebSocket.

        Returns
        -------
        dict
            The decoded payload of the next text frame.

        Raises
        ------
        HAClientError
            If the frame is text but not valid JSON, if the decoded
            payload is not a JSON object, if the socket reports an
            error, or if the frame has an unexpected type.
        ConnectionClosedError
            If a close frame is received instead of data.
        """
        assert self._ws is not None
        msg = await self._ws.receive()
        if msg.type == aiohttp.WSMsgType.TEXT:
            try:
                data = msg.json()
            except ValueError as err:
                # Surface malformed frames as a client error instead of
                # leaking the JSON decoder's exception to callers.
                raise HAClientError(f"Non-JSON WebSocket frame: {msg.data!r}") from err
            if isinstance(data, dict):
                return data
            raise HAClientError(f"Unexpected WebSocket payload: {data!r}")
        if msg.type in (
            aiohttp.WSMsgType.CLOSE,
            aiohttp.WSMsgType.CLOSED,
            aiohttp.WSMsgType.CLOSING,
        ):
            raise ConnectionClosedError("WebSocket closed during handshake")
        if msg.type == aiohttp.WSMsgType.ERROR:
            raise HAClientError(f"WebSocket error: {self._ws.exception()}")
        raise HAClientError(f"Unexpected WebSocket message type: {msg.type}")

    async def send_command(
        self,
        payload: dict[str, Any],
        *,
        timeout: float | None = None,
    ) -> Any:
        """Send a command and await its ``result`` frame.

        An ``id`` field is injected automatically; callers must not set
        it themselves.

        Parameters
        ----------
        payload : dict
            Command body without an ``id`` field.
        timeout : float or None, optional
            Seconds to wait for the matching ``result`` frame. ``None``
            uses the adapter's configured ``request_timeout``.

        Returns
        -------
        Any
            The ``result`` field of the response frame.

        Raises
        ------
        ConnectionClosedError
            If the WebSocket is not currently connected.
        TimeoutError
            If no matching ``result`` arrives within *timeout*.
        CommandError
            If Home Assistant responds with ``success=False``.
        """
        if not self.connected:
            raise ConnectionClosedError("WebSocket is not connected")
        assert self._ws is not None

        cmd_id = self._next_id()
        msg = {"id": cmd_id, **payload}
        fut: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
        self._pending[cmd_id] = fut
        try:
            await self._ws.send_json(msg)
            result = await asyncio.wait_for(
                fut, timeout=timeout if timeout is not None else self._request_timeout
            )
        except TimeoutError as err:
            self._pending.pop(cmd_id, None)
            raise HATimeoutError(f"Timed out waiting for response to command id={cmd_id}") from err
        finally:
            self._pending.pop(cmd_id, None)
        return result

    async def subscribe_events(
        self,
        handler: EventHandler,
        event_type: str | None = None,
    ) -> int:
        """Subscribe to Home Assistant events.

        Subscriptions are remembered so they can be re-established
        automatically on reconnect.

        Parameters
        ----------
        handler : callable
            Sync or async callable invoked with each matching event.
        event_type : str or None, optional
            Restrict the subscription to a single event type. ``None``
            subscribes to all events.

        Returns
        -------
        int
            The subscription id, suitable for passing to `unsubscribe`.
        """
        payload: dict[str, Any] = {"type": "subscribe_events"}
        if event_type is not None:
            payload["event_type"] = event_type
        cmd_id = self._next_id()
        self._subscriptions[cmd_id] = handler
        if event_type is not None:
            self._event_subs[event_type] = (cmd_id, handler)
        msg = {"id": cmd_id, **payload}
        assert self._ws is not None
        fut: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
        self._pending[cmd_id] = fut
        try:
            await self._ws.send_json(msg)
            await asyncio.wait_for(fut, timeout=self._request_timeout)
        except Exception:
            self._subscriptions.pop(cmd_id, None)
            if event_type is not None:
                self._event_subs.pop(event_type, None)
            self._pending.pop(cmd_id, None)
            raise
        return cmd_id

    async def unsubscribe(self, subscription_id: int) -> None:
        """Cancel a previously registered subscription.

        Parameters
        ----------
        subscription_id : int
            The id returned by `subscribe_events`. Unknown ids are
            silently ignored on the local side; HA may still return an
            error.
        """
        await self.send_command({"type": "unsubscribe_events", "subscription": subscription_id})
        self._subscriptions.pop(subscription_id, None)
        for k, (sid, _handler) in list(self._event_subs.items()):
            if sid == subscription_id:
                self._event_subs.pop(k, None)

    async def _reader_loop(self) -> None:
        """Consume incoming WebSocket messages until the socket closes."""
        assert self._ws is not None
        try:
            while not self._closing:
                try:
                    msg = await self._ws.receive()
                except Exception as err:  # noqa: BLE001
                    _LOGGER.warning("WebSocket receive failed: %s", err)
                    break

                if msg.type == aiohttp.WSMsgType.TEXT:
                    try:
                        data = msg.json()
                    except ValueError:
                        _LOGGER.warning("Non-JSON WebSocket frame: %r", msg.data)
                        continue
                    if isinstance(data, dict):
                        await self._dispatch(data)
                elif msg.type in (
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    _LOGGER.debug("WebSocket closed by peer")
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    _LOGGER.warning("WebSocket error: %s", self._ws.exception())
                    break
        finally:
            self._connected.clear()
            for fut in list(self._pending.values()):
                if not fut.done():
                    fut.set_exception(ConnectionClosedError("WebSocket closed"))
            self._pending.clear()
            for fut in list(self._pong_waiters.values()):
                if not fut.done():
                    fut.set_exception(ConnectionClosedError("WebSocket closed"))
            self._pong_waiters.clear()
            await self._notify_disconnect()
            if (
                self._reconnect
                and not self._closing
                and (self._reconnect_task is None or self._reconnect_task.done())
            ):
                self._reconnect_task = asyncio.create_task(
                    self._reconnect_loop(), name="ha-ws-reconnect"
                )

    async def _notify_disconnect(self) -> None:
        """Invoke all registered disconnect listeners."""
        for listener in list(self._disconnect_listeners):
            try:
                result = listener()
                if asyncio.iscoroutine(result):
                    await result
            except Exception:  # pragma: no cover - defensive
                _LOGGER.exception("Disconnect listener raised")

    async def _notify_reconnect(self) -> None:
        """Invoke all registered reconnect listeners."""
        for listener in list(self._reconnect_listeners):
            try:
                result = listener()
                if asyncio.iscoroutine(result):
                    await result
            except Exception:  # pragma: no cover - defensive
                _LOGGER.exception("Reconnect listener raised")

    async def _dispatch(self, msg: dict[str, Any]) -> None:
        """Route an incoming message to its handler or future."""
        mtype = msg.get("type")
        mid = msg.get("id")
        if mtype == "result":
            fut = self._pending.get(mid) if isinstance(mid, int) else None
            if fut is None or fut.done():
                return
            if msg.get("success", False):
                fut.set_result(msg.get("result"))
            else:
                err = msg.get("error") or {}
                fut.set_exception(
                    CommandError(
                        str(err.get("code", "unknown")),
                        str(err.get("message", "unknown error")),
                    )
                )
            return
        if mtype == "event":
            if isinstance(mid, int):
                handler = self._subscriptions.get(mid)
                if handler is not None:
                    await self._invoke_handler(handler, msg.get("event", {}))
            return
        if mtype == "pong":
            if isinstance(mid, int):
                pong_fut = self._pong_waiters.get(mid)
                if pong_fut is not None and not pong_fut.done():
                    pong_fut.set_result(msg)
            return
        _LOGGER.debug("Unhandled WS message: %s", mtype)

    async def _invoke_handler(self, handler: EventHandler, event: dict[str, Any]) -> None:
        """Call an event handler, awaiting it if it returns a coroutine."""
        try:
            result = handler(event)
            if asyncio.iscoroutine(result):
                await result
        except Exception:  # pragma: no cover - defensive
            _LOGGER.exception("Event handler raised")

    async def _reconnect_loop(self) -> None:
        """Re-establish the connection with exponential back-off.

        Notes
        -----
        * Every failed attempt sleeps for the current delay plus up to
          0.5 s of random jitter; the delay doubles up to 60 s.
        * A keepalive task left over from the dropped connection is
          cancelled before a new one is started, so two keepalive
          loops never ping the same socket.
        * Stale subscription ids from the dropped connection are
          removed before resubscribing so ``_subscriptions`` only
          contains ids that are valid on the live session.
        * A subscription that cannot be restored keeps its entry in
          ``_event_subs`` so the next reconnect tries it again.
        * If the new socket drops while subscriptions are being
          restored or listeners are running, this loop retries:
          ``_reader_loop`` does not start a second reconnect task
          while this one is still running.
        * Only subscriptions recorded in ``_event_subs`` are restored.
        """
        delay = 1.0
        attempt = 0
        try:
            while not self._closing:
                attempt += 1
                try:
                    _LOGGER.info("Reconnecting to %s (attempt %d)", self._url, attempt)
                    await self._do_connect()
                except asyncio.CancelledError:
                    raise
                except Exception as err:  # noqa: BLE001
                    _LOGGER.warning("Reconnect attempt %d failed: %s", attempt, err)
                    await asyncio.sleep(delay + random.uniform(0, 0.5))
                    delay = min(delay * 2, 60.0)
                    continue

                # The previous keepalive loop may still be sleeping; left
                # alone it would wake up and keep pinging the new socket.
                leftover = self._keepalive_task
                if leftover is not None and not leftover.done():
                    leftover.cancel()
                self._keepalive_task = None

                self._reader_task = asyncio.create_task(self._reader_loop(), name="ha-ws-reader")
                if self._ping_interval > 0:
                    self._keepalive_task = asyncio.create_task(
                        self._keepalive_loop(), name="ha-ws-keepalive"
                    )

                # Drop ids tied to the previous connection, then replay
                # the remembered subscriptions on the live session.
                remembered = list(self._event_subs.items())
                for _event_type, (stale_id, _handler) in remembered:
                    self._subscriptions.pop(stale_id, None)
                for event_type, (stale_id, handler) in remembered:
                    try:
                        await self.subscribe_events(handler, event_type)
                    except Exception as err:  # noqa: BLE001
                        _LOGGER.warning("Failed to resubscribe to %s: %s", event_type, err)
                        # Keep the entry so a later reconnect retries it.
                        self._event_subs.setdefault(event_type, (stale_id, handler))

                if self.connected:
                    await self._notify_reconnect()
                if self.connected or self._closing:
                    return

                # The fresh socket is already gone and nobody else will
                # reconnect while this task runs, so back off and retry.
                _LOGGER.warning("Connection dropped again during reconnect; retrying")
                await asyncio.sleep(delay + random.uniform(0, 0.5))
                delay = min(delay * 2, 60.0)
        finally:
            self._reconnect_task = None

    async def ping(self, *, timeout: float | None = None) -> None:
        """Send a ``ping`` frame and wait for the matching ``pong``.

        Parameters
        ----------
        timeout : float or None, optional
            Seconds to wait for the pong. ``None`` uses the adapter's
            configured ``request_timeout``.

        Raises
        ------
        ConnectionClosedError
            If the WebSocket is not currently connected.
        TimeoutError
            If the pong does not arrive within *timeout*.
        """
        if not self.connected:
            raise ConnectionClosedError("WebSocket is not connected")
        assert self._ws is not None
        cmd_id = self._next_id()
        fut: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
        self._pong_waiters[cmd_id] = fut
        try:
            await self._ws.send_json({"id": cmd_id, "type": "ping"})
            await asyncio.wait_for(
                fut, timeout=timeout if timeout is not None else self._request_timeout
            )
        except TimeoutError as err:
            raise HATimeoutError("Ping timed out") from err
        finally:
            self._pong_waiters.pop(cmd_id, None)

    async def _keepalive_loop(self) -> None:
        """Periodically ping the server and force a reconnect on timeout."""
        try:
            while self.connected and not self._closing:
                await asyncio.sleep(self._ping_interval)
                if not self.connected:
                    return
                try:
                    await self.ping(timeout=self._ping_interval)
                except HATimeoutError:
                    _LOGGER.warning("Ping timed out – forcing reconnect")
                    if self._ws is not None and not self._ws.closed:
                        await self._ws.close()
                    return
                except Exception as err:  # noqa: BLE001
                    _LOGGER.debug("Keepalive error: %s", err)
                    return
        except asyncio.CancelledError:  # pragma: no cover
            pass

connected property

connected: bool

Return True while the underlying socket is open.

connect async

connect() -> None

Establish the WebSocket connection and authenticate.

Source code in src/haclient/infra/ws_aiohttp.py
 97
 98
 99
100
101
102
103
104
105
106
107
async def connect(self) -> None:
    """Open and authenticate the socket, then start the background tasks.

    Does nothing when the adapter is already connected. The reader
    task is always started; the keepalive task only when
    ``ping_interval`` is greater than zero.
    """
    if not self.connected:
        self._closing = False
        await self._do_connect()
        reader = self._reader_loop()
        self._reader_task = asyncio.create_task(reader, name="ha-ws-reader")
        if self._ping_interval > 0:
            pinger = self._keepalive_loop()
            self._keepalive_task = asyncio.create_task(pinger, name="ha-ws-keepalive")

close async

close() -> None

Close the WebSocket and stop background tasks.

Notes

Side-effects, in order:

  1. Marks the adapter as closing so the reader/keepalive loops exit instead of triggering a reconnect.
  2. Cancels and awaits the keepalive task.
  3. Closes the underlying socket and waits up to 5 seconds for the reader task to drain.
  4. Cancels and awaits any in-flight reconnect task so a background reconnect cannot survive close().
  5. Fails any pending command/pong futures with ConnectionClosedError.
  6. Closes the owned aiohttp session, if any.
Source code in src/haclient/infra/ws_aiohttp.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
async def close(self) -> None:
    """Shut the adapter down and release everything it started.

    Notes
    -----
    The steps run in this order:

    1. Flag the adapter as closing and clear the connected event.
    2. Cancel the keepalive task and wait for it to finish.
    3. Close the socket if it is still open, then give the reader
       task up to 5 seconds to finish; it is cancelled if that
       wait fails.
    4. Cancel the reconnect task, if any, and wait for it.
    5. Fail every unresolved command and pong future with
       `ConnectionClosedError` and empty both tables.
    6. Close the aiohttp session when the adapter owns it.
    """
    self._closing = True
    self._connected.clear()

    keepalive = self._keepalive_task
    if keepalive is not None:
        keepalive.cancel()
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await keepalive
        self._keepalive_task = None

    ws = self._ws
    if ws is not None and not ws.closed:
        await ws.close()

    if self._reader_task is not None:
        try:
            await asyncio.wait_for(self._reader_task, timeout=5.0)
        except (TimeoutError, asyncio.CancelledError, Exception):  # noqa: BLE001
            # Re-read the attribute: it is looked up again after the wait.
            self._reader_task.cancel()
        self._reader_task = None

    reconnector = self._reconnect_task
    if reconnector is not None:
        reconnector.cancel()
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await reconnector
        self._reconnect_task = None

    # Command futures first, then pong futures.
    for waiters in (self._pending, self._pong_waiters):
        for fut in waiters.values():
            if not fut.done():
                fut.set_exception(ConnectionClosedError("WebSocket closed"))
        waiters.clear()

    session = self._session
    if self._owns_session and session is not None and not session.closed:
        await session.close()

on_disconnect

on_disconnect(handler: DisconnectListener) -> DisconnectListener

Register a disconnect listener.

Parameters:

Name Type Description Default
handler DisconnectListener

Sync or async zero-argument callable invoked when the WebSocket connection drops.

required

Returns:

Type Description
DisconnectListener

The same handler, returned so the method can be used as a decorator.

Source code in src/haclient/infra/ws_aiohttp.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def on_disconnect(self, handler: DisconnectListener) -> DisconnectListener:
    """Add *handler* to the listeners run when the connection drops.

    Parameters
    ----------
    handler : DisconnectListener
        Zero-argument callable, sync or async.

    Returns
    -------
    DisconnectListener
        *handler* itself, which makes the method usable as a
        decorator.
    """
    registry = self._disconnect_listeners
    registry.append(handler)
    return handler

on_reconnect

on_reconnect(handler: ReconnectListener) -> ReconnectListener

Register a reconnect listener.

Parameters:

Name Type Description Default
handler ReconnectListener

Sync or async zero-argument callable invoked after the WebSocket reconnects and prior subscriptions have been re-established.

required

Returns:

Type Description
ReconnectListener

The same handler, returned so the method can be used as a decorator.

Source code in src/haclient/infra/ws_aiohttp.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def on_reconnect(self, handler: ReconnectListener) -> ReconnectListener:
    """Add *handler* to the listeners run after a reconnect.

    Parameters
    ----------
    handler : ReconnectListener
        Zero-argument callable, sync or async.

    Returns
    -------
    ReconnectListener
        *handler* itself, which makes the method usable as a
        decorator.
    """
    registry = self._reconnect_listeners
    registry.append(handler)
    return handler

send_command async

send_command(payload: dict[str, Any], *, timeout: float | None = None) -> Any

Send a command and await its result frame.

An id field is injected automatically; callers must not set it themselves.

Parameters:

Name Type Description Default
payload dict

Command body without an id field.

required
timeout float or None

Seconds to wait for the matching result frame. None uses the adapter's configured request_timeout.

None

Returns:

Type Description
Any

The result field of the response frame.

Raises:

Type Description
ConnectionClosedError

If the WebSocket is not currently connected.

TimeoutError

If no matching result arrives within timeout.

CommandError

If Home Assistant responds with success=False.

Source code in src/haclient/infra/ws_aiohttp.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
async def send_command(
    self,
    payload: dict[str, Any],
    *,
    timeout: float | None = None,
) -> Any:
    """Send a command and await its ``result`` frame.

    An ``id`` field is injected automatically; callers must not set
    it themselves.

    Parameters
    ----------
    payload : dict
        Command body without an ``id`` field.
    timeout : float or None, optional
        Seconds to wait for the matching ``result`` frame. ``None``
        uses the adapter's configured ``request_timeout``.

    Returns
    -------
    Any
        The ``result`` field of the response frame.

    Raises
    ------
    ConnectionClosedError
        If the WebSocket is not currently connected.
    TimeoutError
        If no matching ``result`` arrives within *timeout*.
    CommandError
        If Home Assistant responds with ``success=False``.
    """
    if not self.connected:
        raise ConnectionClosedError("WebSocket is not connected")
    assert self._ws is not None

    cmd_id = self._next_id()
    msg = {"id": cmd_id, **payload}
    fut: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
    self._pending[cmd_id] = fut
    try:
        await self._ws.send_json(msg)
        result = await asyncio.wait_for(
            fut, timeout=timeout if timeout is not None else self._request_timeout
        )
    except TimeoutError as err:
        self._pending.pop(cmd_id, None)
        raise HATimeoutError(f"Timed out waiting for response to command id={cmd_id}") from err
    finally:
        self._pending.pop(cmd_id, None)
    return result

subscribe_events async

subscribe_events(handler: EventHandler, event_type: str | None = None) -> int

Subscribe to Home Assistant events.

Subscriptions are remembered so they can be re-established automatically on reconnect.

Parameters:

Name Type Description Default
handler callable

Sync or async callable invoked with each matching event.

required
event_type str or None

Restrict the subscription to a single event type. None subscribes to all events.

None

Returns:

Type Description
int

The subscription id, suitable for passing to unsubscribe.

Source code in src/haclient/infra/ws_aiohttp.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
async def subscribe_events(
    self,
    handler: EventHandler,
    event_type: str | None = None,
) -> int:
    """Subscribe to Home Assistant events.

    Subscriptions are remembered so they can be re-established
    automatically on reconnect.

    Parameters
    ----------
    handler : callable
        Sync or async callable invoked with each matching event.
    event_type : str or None, optional
        Restrict the subscription to a single event type. ``None``
        subscribes to all events.

    Returns
    -------
    int
        The subscription id, suitable for passing to `unsubscribe`.

    Raises
    ------
    ConnectionClosedError
        If there is no WebSocket to send the request on. Raised before
        any local state is touched.
    Exception
        Any error raised while sending the request or awaiting its
        acknowledgement (including a timeout after ``request_timeout``
        seconds) is re-raised after the local bookkeeping for this
        subscription has been rolled back.
    """
    # Validate before registering anything so a missing socket cannot leave a
    # half-registered subscription behind (and so the check survives ``-O``).
    ws = self._ws
    if ws is None:
        raise ConnectionClosedError("WebSocket is not connected")

    payload: dict[str, Any] = {"type": "subscribe_events"}
    if event_type is not None:
        payload["event_type"] = event_type
    cmd_id = self._next_id()

    # Remember what this call is about to overwrite so a failure can put it
    # back instead of dropping an earlier subscription's record.
    previous = self._event_subs.get(event_type) if event_type is not None else None

    # Register the handler before sending so it is already in place when the
    # request goes out.
    self._subscriptions[cmd_id] = handler
    if event_type is not None:
        self._event_subs[event_type] = (cmd_id, handler)

    fut: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
    self._pending[cmd_id] = fut
    try:
        await ws.send_json({"id": cmd_id, **payload})
        await asyncio.wait_for(fut, timeout=self._request_timeout)
    except Exception:
        self._subscriptions.pop(cmd_id, None)
        if event_type is not None:
            current = self._event_subs.get(event_type)
            # Only undo the entry if it is still the one written above.
            if current is not None and current[0] == cmd_id:
                if previous is None:
                    self._event_subs.pop(event_type, None)
                else:
                    self._event_subs[event_type] = previous
        raise
    finally:
        # The acknowledgement future is single-use; drop it on every path so
        # the pending table does not grow with each subscription.
        self._pending.pop(cmd_id, None)
    return cmd_id

unsubscribe async

unsubscribe(subscription_id: int) -> None

Cancel a previously registered subscription.

Parameters:

Name Type Description Default
subscription_id int

The id returned by subscribe_events. Unknown ids are silently ignored on the local side; HA may still return an error.

required
Source code in src/haclient/infra/ws_aiohttp.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
async def unsubscribe(self, subscription_id: int) -> None:
    """Ask Home Assistant to drop a subscription, then forget it locally.

    Parameters
    ----------
    subscription_id : int
        The id returned by `subscribe_events`. Ids that are not known
        locally are ignored by the local cleanup; the
        ``unsubscribe_events`` command is still sent, so HA may
        respond with an error.

    Notes
    -----
    The local bookkeeping is only updated after ``send_command``
    returns; if that call raises, the subscription stays registered.
    """
    request = {"type": "unsubscribe_events", "subscription": subscription_id}
    await self.send_command(request)

    self._subscriptions.pop(subscription_id, None)
    # Collect matching keys first so the mapping is not mutated while it is
    # being iterated.
    stale_keys = [
        key for key, (sid, _handler) in self._event_subs.items() if sid == subscription_id
    ]
    for key in stale_keys:
        self._event_subs.pop(key, None)

ping async

ping(*, timeout: float | None = None) -> None

Send a ping frame and wait for the matching pong.

Parameters:

Name Type Description Default
timeout float or None

Seconds to wait for the pong. None uses the adapter's configured request_timeout.

None

Raises:

Type Description
ConnectionClosedError

If the WebSocket is not currently connected.

TimeoutError

If the pong does not arrive within timeout.

Source code in src/haclient/infra/ws_aiohttp.py
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
async def ping(self, *, timeout: float | None = None) -> None:
    """Send a ``ping`` frame and wait for the matching ``pong``.

    Parameters
    ----------
    timeout : float or None, optional
        Seconds to wait for the pong. ``None`` uses the adapter's
        configured ``request_timeout``.

    Raises
    ------
    ConnectionClosedError
        If the WebSocket is not currently connected.
    TimeoutError
        If the pong does not arrive within *timeout*.
    """
    if not self.connected:
        raise ConnectionClosedError("WebSocket is not connected")
    assert self._ws is not None
    cmd_id = self._next_id()
    fut: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
    self._pong_waiters[cmd_id] = fut
    try:
        await self._ws.send_json({"id": cmd_id, "type": "ping"})
        await asyncio.wait_for(
            fut, timeout=timeout if timeout is not None else self._request_timeout
        )
    except TimeoutError as err:
        raise HATimeoutError("Ping timed out") from err
    finally:
        self._pong_waiters.pop(cmd_id, None)