Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions ring_doorbell/doorbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,10 @@ async def async_set_motion_detection(self, state: bool) -> None: # noqa: FBT001
async def generate_webrtc_stream(
self, sdp_offer: str, *, keep_alive_timeout: int | None = 30
) -> str:
"""Generate the rtc stream."""
"""Generate a webrtc stream.

This method will wait until an sdp answer is received beofre returning.
"""
if session_id := RingWebRtcStream.get_sdp_session_id(sdp_offer):

async def _close_callback() -> None:
Expand Down Expand Up @@ -484,11 +487,34 @@ async def generate_async_webrtc_stream(
*,
keep_alive_timeout: int | None = 60 * 5,
) -> None:
"""Generate the rtc stream. Will callback with answers and ICE candidates."""
"""Generate a webrtc stream.

Will callback with answers and ICE candidates.
"""
return await self.handle_webrtc_offer(
sdp_offer,
session_id,
on_message_callback,
keep_alive_timeout=keep_alive_timeout,
)

async def handle_webrtc_offer(
self,
sdp_offer: str,
session_id: str,
on_message_callback: RingWebRtcMessageCallback,
*,
keep_alive_timeout: int | None = 60 * 5,
) -> None:
"""Handle a webrtc offer."""

async def _close_callback() -> None:
await self.close_webrtc_stream(session_id)

if stream := self._webrtc_streams.get(session_id):
await stream.on_offer(sdp_offer)
return

stream = RingWebRtcStream(
self._ring,
self.device_api_id,
Expand Down
52 changes: 39 additions & 13 deletions ring_doorbell/webrtcstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def __init__(
self.device_api_id = device_api_id
self.sdp: str | None = None
self.websocket: ClientConnection | None = None
self.is_alive = True
self.is_alive = False
self.ping_task: asyncio.Task | None = None
self.read_task: asyncio.Task | None = None
self._close_task: asyncio.Task | None = None
Expand Down Expand Up @@ -157,7 +157,21 @@ async def _generate(self, sdp_offer: str) -> None:
ssl=self.ssl_context,
)

_LOGGER.debug("Connected to RTC streaming websocket")
self.dialog_id = str(uuid.uuid4())

_LOGGER.debug("Starting reader task")
self.read_task = asyncio.create_task(self.reader())

await self.on_offer(sdp_offer)

except Exception as ex:
exmsg = "Error generating RTC stream"
raise RingError(exmsg, ex) from ex

async def on_offer(self, sdp_offer: str) -> None:
"""Handle webrtc offer."""
if not self.is_alive:
offer_msg = {
"method": "live_view",
"dialog_id": self.dialog_id,
Expand All @@ -168,20 +182,30 @@ async def _generate(self, sdp_offer: str) -> None:
"type": "offer",
},
}
_LOGGER.debug(
"Connected to RTC streaming websocket, sending live_view offer msg: %s",
offer_msg,
)
_LOGGER.debug("Starting reader task")
self.read_task = asyncio.create_task(self.reader())
else:
offer_msg = {
"method": "sdp",
"body": {
"doorbot_id": self.device_api_id,
"sdp": sdp_offer,
"type": "offer",
},
}

if TYPE_CHECKING:
assert isinstance(offer_msg["body"], dict)
assert isinstance(self.websocket, ClientConnection)

await self.websocket.send(json_dumps(offer_msg))
if self.session_id:
offer_msg["body"]["session_id"] = self.session_id

self._offered_event.set()
self._offered_event.clear()

except Exception as ex:
exmsg = "Error generating RTC stream"
raise RingError(exmsg, ex) from ex
_LOGGER.debug("Sending live_view offer msg: %s", offer_msg)

await self.websocket.send(json_dumps(offer_msg))

self._offered_event.set()

async def _activate(self) -> None:
if TYPE_CHECKING:
Expand All @@ -192,6 +216,7 @@ async def _activate(self) -> None:

self._last_keep_alive = time.time()
self.ping_task = asyncio.create_task(self.pinger())
self.is_alive = True

async def on_ice_candidate(self, candidate: str, m_line_index: int) -> None:
"""Send an ICE candidate."""
Expand Down Expand Up @@ -285,7 +310,8 @@ async def handle_answer_message(self, message: dict) -> None:
answer_message = RingWebRtcMessage(answer=sdp)
self._on_message_callback(answer_message)

await self._activate()
if not self.is_alive:
await self._activate()

async def handle_close_message(self, message: dict) -> None:
"""Handle an sdp answer message."""
Expand Down
Loading