diff --git a/homeassistant/components/unifiprotect/__init__.py b/homeassistant/components/unifiprotect/__init__.py index 394a7f43329760f8f052ba9061bf54d32343e1f3..ed409a6eea045b80f975de21fa9db97c5447bcc9 100644 --- a/homeassistant/components/unifiprotect/__init__.py +++ b/homeassistant/components/unifiprotect/__init__.py @@ -45,7 +45,7 @@ from .utils import ( async_create_api_client, async_get_devices, ) -from .views import ThumbnailProxyView, VideoProxyView +from .views import ThumbnailProxyView, VideoEventProxyView, VideoProxyView _LOGGER = logging.getLogger(__name__) @@ -174,6 +174,7 @@ async def _async_setup_entry( await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) hass.http.register_view(ThumbnailProxyView(hass)) hass.http.register_view(VideoProxyView(hass)) + hass.http.register_view(VideoEventProxyView(hass)) async def _async_options_updated(hass: HomeAssistant, entry: UFPConfigEntry) -> None: diff --git a/homeassistant/components/unifiprotect/views.py b/homeassistant/components/unifiprotect/views.py index 00128492c677b4475dbe8804f70199376e4b0da9..9bf6ed024f5d71248c10e2134cc3df49e17ee735 100644 --- a/homeassistant/components/unifiprotect/views.py +++ b/homeassistant/components/unifiprotect/views.py @@ -5,7 +5,7 @@ from __future__ import annotations from datetime import datetime from http import HTTPStatus import logging -from typing import Any +from typing import TYPE_CHECKING, Any from urllib.parse import urlencode from aiohttp import web @@ -30,7 +30,9 @@ def async_generate_thumbnail_url( ) -> str: """Generate URL for event thumbnail.""" - url_format = ThumbnailProxyView.url or "{nvr_id}/{event_id}" + url_format = ThumbnailProxyView.url + if TYPE_CHECKING: + assert url_format is not None url = url_format.format(nvr_id=nvr_id, event_id=event_id) params = {} @@ -50,7 +52,9 @@ def async_generate_event_video_url(event: Event) -> str: if event.start is None or event.end is None: raise ValueError("Event is ongoing") - url_format = VideoProxyView.url or "{nvr_id}/{camera_id}/{start}/{end}" + url_format = VideoProxyView.url + if TYPE_CHECKING: + assert url_format is not None return url_format.format( nvr_id=event.api.bootstrap.nvr.id, camera_id=event.camera_id, @@ -59,6 +63,19 @@ def async_generate_event_video_url(event: Event) -> str: ) +@callback +def async_generate_proxy_event_video_url( + nvr_id: str, + event_id: str, +) -> str: + """Generate proxy URL for event video.""" + + url_format = VideoEventProxyView.url + if TYPE_CHECKING: + assert url_format is not None + return url_format.format(nvr_id=nvr_id, event_id=event_id) + + @callback def _client_error(message: Any, code: HTTPStatus) -> web.Response: _LOGGER.warning("Client error (%s): %s", code.value, message) @@ -107,6 +124,27 @@ class ProtectProxyView(HomeAssistantView): return data return _404("Invalid NVR ID") + @callback + def _async_get_camera(self, data: ProtectData, camera_id: str) -> Camera | None: + if (camera := data.api.bootstrap.cameras.get(camera_id)) is not None: + return camera + + entity_registry = er.async_get(self.hass) + device_registry = dr.async_get(self.hass) + + if (entity := entity_registry.async_get(camera_id)) is None or ( + device := device_registry.async_get(entity.device_id or "") + ) is None: + return None + + macs = [c[1] for c in device.connections if c[0] == dr.CONNECTION_NETWORK_MAC] + for mac in macs: + if (ufp_device := data.api.bootstrap.get_device_from_mac(mac)) is not None: + if isinstance(ufp_device, Camera): + camera = ufp_device + break + return camera + class ThumbnailProxyView(ProtectProxyView): """View to proxy event thumbnails from UniFi Protect.""" @@ -156,27 +194,6 @@ class VideoProxyView(ProtectProxyView): url = "/api/unifiprotect/video/{nvr_id}/{camera_id}/{start}/{end}" name = "api:unifiprotect_thumbnail" - @callback - def _async_get_camera(self, data: ProtectData, camera_id: str) -> Camera | None: - if (camera := data.api.bootstrap.cameras.get(camera_id)) is not None: - return camera - - entity_registry = er.async_get(self.hass) - device_registry = dr.async_get(self.hass) - - if (entity := entity_registry.async_get(camera_id)) is None or ( - device := device_registry.async_get(entity.device_id or "") - ) is None: - return None - - macs = [c[1] for c in device.connections if c[0] == dr.CONNECTION_NETWORK_MAC] - for mac in macs: - if (ufp_device := data.api.bootstrap.get_device_from_mac(mac)) is not None: - if isinstance(ufp_device, Camera): - camera = ufp_device - break - return camera - async def get( self, request: web.Request, nvr_id: str, camera_id: str, start: str, end: str ) -> web.StreamResponse: @@ -226,3 +243,56 @@ class VideoProxyView(ProtectProxyView): if response.prepared: await response.write_eof() return response + + +class VideoEventProxyView(ProtectProxyView): + """View to proxy video clips for events from UniFi Protect.""" + + url = "/api/unifiprotect/video/{nvr_id}/{event_id}" + name = "api:unifiprotect_videoEventView" + + async def get( + self, request: web.Request, nvr_id: str, event_id: str + ) -> web.StreamResponse: + """Get Camera Video clip for an event.""" + + data = self._get_data_or_404(nvr_id) + if isinstance(data, web.Response): + return data + + try: + event = await data.api.get_event(event_id) + except ClientError: + return _404(f"Invalid event ID: {event_id}") + if event.start is None or event.end is None: + return _400("Event is still ongoing") + camera = self._async_get_camera(data, str(event.camera_id)) + if camera is None: + return _404(f"Invalid camera ID: {event.camera_id}") + if not camera.can_read_media(data.api.bootstrap.auth_user): + return _403(f"User cannot read media from camera: {camera.id}") + + response = web.StreamResponse( + status=200, + reason="OK", + headers={ + "Content-Type": "video/mp4", + }, + ) + + async def iterator(total: int, chunk: bytes | None) -> None: + if not response.prepared: + response.content_length = total + await response.prepare(request) + + if chunk is not None: + await response.write(chunk) + + try: + await camera.get_video(event.start, event.end, iterator_callback=iterator) + except ClientError as err: + return _404(err) + + if response.prepared: + await response.write_eof() + return response diff --git a/tests/components/unifiprotect/test_views.py b/tests/components/unifiprotect/test_views.py index fed0a98552d7e24f6da7eae4b0ecf255fdd39730..0f1b779168045a1526ce0a0b7961824e0b5c34cf 100644 --- a/tests/components/unifiprotect/test_views.py +++ b/tests/components/unifiprotect/test_views.py @@ -11,6 +11,7 @@ from uiprotect.exceptions import ClientError from homeassistant.components.unifiprotect.views import ( async_generate_event_video_url, + async_generate_proxy_event_video_url, async_generate_thumbnail_url, ) from homeassistant.core import HomeAssistant @@ -520,3 +521,219 @@ async def test_video_entity_id( assert response.status == 200 ufp.api.request.assert_called_once() + + +async def test_video_event_bad_nvr_id( + hass: HomeAssistant, + hass_client: ClientSessionGenerator, + camera: Camera, + ufp: MockUFPFixture, +) -> None: + """Test video proxy URL with bad NVR id.""" + + ufp.api.request = AsyncMock() + await init_entry(hass, ufp, [camera]) + + url = async_generate_proxy_event_video_url("bad_id", "test_id") + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 404 + ufp.api.request.assert_not_called() + + +async def test_video_event_bad_event( + hass: HomeAssistant, + hass_client: ClientSessionGenerator, + ufp: MockUFPFixture, + camera: Camera, +) -> None: + """Test generating event with bad event ID.""" + + ufp.api.get_event = AsyncMock(side_effect=ClientError()) + + await init_entry(hass, ufp, [camera]) + url = async_generate_proxy_event_video_url(ufp.api.bootstrap.nvr.id, "bad_event_id") + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + assert response.status == 404 + ufp.api.request.assert_not_called() + + +async def test_video_event_bad_camera( + hass: HomeAssistant, + hass_client: ClientSessionGenerator, + ufp: MockUFPFixture, + camera: Camera, +) -> None: + """Test generating event with bad camera ID.""" + + ufp.api.get_event = AsyncMock(side_effect=ClientError()) + + await init_entry(hass, ufp, [camera]) + url = async_generate_proxy_event_video_url(ufp.api.bootstrap.nvr.id, "bad_event_id") + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + assert response.status == 404 + ufp.api.request.assert_not_called() + + +async def test_video_event_bad_camera_perms( + hass: HomeAssistant, + hass_client: ClientSessionGenerator, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, +) -> None: + """Test video URL with bad camera perms.""" + + ufp.api.request = AsyncMock() + await init_entry(hass, ufp, [camera]) + + event_start = fixed_now - timedelta(seconds=30) + event = Event( + model=ModelType.EVENT, + api=ufp.api, + start=event_start, + end=fixed_now, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + camera_id="bad_id", + camera=camera, + ) + + ufp.api.get_event = AsyncMock(return_value=event) + + url = async_generate_proxy_event_video_url(ufp.api.bootstrap.nvr.id, "test_id") + + ufp.api.bootstrap.auth_user.all_permissions = [] + ufp.api.bootstrap.auth_user._perm_cache = {} + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 404 + ufp.api.request.assert_not_called() + + +async def test_video_event_ongoing( + hass: HomeAssistant, + hass_client: ClientSessionGenerator, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, +) -> None: + """Test video URL with ongoing event.""" + + ufp.api.request = AsyncMock() + await init_entry(hass, ufp, [camera]) + + event_start = fixed_now - timedelta(seconds=30) + event = Event( + model=ModelType.EVENT, + api=ufp.api, + start=event_start, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + camera_id=camera.id, + camera=camera, + ) + + ufp.api.get_event = AsyncMock(return_value=event) + + url = async_generate_proxy_event_video_url(ufp.api.bootstrap.nvr.id, "test_id") + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 400 + ufp.api.request.assert_not_called() + + +async def test_event_video_no_data( + hass: HomeAssistant, + hass_client: ClientSessionGenerator, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, +) -> None: + """Test invalid no event video returned.""" + + await init_entry(hass, ufp, [camera]) + event_start = fixed_now - timedelta(seconds=30) + event = Event( + model=ModelType.EVENT, + api=ufp.api, + start=event_start, + end=fixed_now, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + camera_id=camera.id, + camera=camera, + ) + + ufp.api.request = AsyncMock(side_effect=ClientError) + ufp.api.get_event = AsyncMock(return_value=event) + + url = async_generate_proxy_event_video_url(ufp.api.bootstrap.nvr.id, "test_id") + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 404 + + +async def test_event_video( + hass: HomeAssistant, + hass_client: ClientSessionGenerator, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, +) -> None: + """Test event video URL with no video.""" + + content = Mock() + content.__anext__ = AsyncMock(side_effect=[b"test", b"test", StopAsyncIteration()]) + content.__aiter__ = Mock(return_value=content) + + mock_response = Mock() + mock_response.content_length = 8 + mock_response.content.iter_chunked = Mock(return_value=content) + + ufp.api.request = AsyncMock(return_value=mock_response) + await init_entry(hass, ufp, [camera]) + event_start = fixed_now - timedelta(seconds=30) + event = Event( + model=ModelType.EVENT, + api=ufp.api, + start=event_start, + end=fixed_now, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + camera_id=camera.id, + camera=camera, + ) + + ufp.api.get_event = AsyncMock(return_value=event) + + url = async_generate_proxy_event_video_url(ufp.api.bootstrap.nvr.id, "test_id") + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + assert await response.content.read() == b"testtest" + + assert response.status == 200 + ufp.api.request.assert_called_once()