diff --git a/homeassistant/components/unifiprotect/__init__.py b/homeassistant/components/unifiprotect/__init__.py index 40214b607668312e4320690436beef684c3370e6..5b4059ee1d427146967cc3af4343191c5d0e84af 100644 --- a/homeassistant/components/unifiprotect/__init__.py +++ b/homeassistant/components/unifiprotect/__init__.py @@ -40,6 +40,7 @@ from .discovery import async_start_discovery from .migrate import async_migrate_data from .services import async_cleanup_services, async_setup_services from .utils import _async_unifi_mac_from_hass, async_get_devices +from .views import ThumbnailProxyView, VideoProxyView _LOGGER = logging.getLogger(__name__) @@ -92,6 +93,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: hass.data.setdefault(DOMAIN, {})[entry.entry_id] = data_service hass.config_entries.async_setup_platforms(entry, PLATFORMS) async_setup_services(hass) + hass.http.register_view(ThumbnailProxyView(hass)) + hass.http.register_view(VideoProxyView(hass)) entry.async_on_unload(entry.add_update_listener(_async_options_updated)) entry.async_on_unload( diff --git a/homeassistant/components/unifiprotect/views.py b/homeassistant/components/unifiprotect/views.py new file mode 100644 index 0000000000000000000000000000000000000000..ea523d36dd2a95fe91c088243b2f1a9cd07bf5b7 --- /dev/null +++ b/homeassistant/components/unifiprotect/views.py @@ -0,0 +1,211 @@ +"""UniFi Protect Integration views.""" +from __future__ import annotations + +from datetime import datetime +from http import HTTPStatus +import logging +from typing import Any +from urllib.parse import urlencode + +from aiohttp import web +from pyunifiprotect.data import Event +from pyunifiprotect.exceptions import ClientError + +from homeassistant.components.http import HomeAssistantView +from homeassistant.core import HomeAssistant, callback + +from .const import DOMAIN +from .data import ProtectData + +_LOGGER = logging.getLogger(__name__) + + +@callback +def async_generate_thumbnail_url( + event_id: str, + nvr_id: str, + width: int | None = None, + height: int | None = None, +) -> str: + """Generate URL for event thumbnail.""" + + url_format = ThumbnailProxyView.url or "{nvr_id}/{event_id}" + url = url_format.format(nvr_id=nvr_id, event_id=event_id) + + params = {} + if width is not None: + params["width"] = str(width) + if height is not None: + params["height"] = str(height) + + return f"{url}?{urlencode(params)}" + + +@callback +def async_generate_event_video_url(event: Event) -> str: + """Generate URL for event video.""" + + _validate_event(event) + if event.start is None or event.end is None: + raise ValueError("Event is ongoing") + + url_format = VideoProxyView.url or "{nvr_id}/{camera_id}/{start}/{end}" + url = url_format.format( + nvr_id=event.api.bootstrap.nvr.id, + camera_id=event.camera_id, + start=event.start.isoformat(), + end=event.end.isoformat(), + ) + + return url + + +@callback +def _client_error(message: Any, code: HTTPStatus) -> web.Response: + _LOGGER.warning("Client error (%s): %s", code.value, message) + if code == HTTPStatus.BAD_REQUEST: + return web.Response(body=message, status=code) + return web.Response(status=code) + + +@callback +def _400(message: Any) -> web.Response: + return _client_error(message, HTTPStatus.BAD_REQUEST) + + +@callback +def _403(message: Any) -> web.Response: + return _client_error(message, HTTPStatus.FORBIDDEN) + + +@callback +def _404(message: Any) -> web.Response: + return _client_error(message, HTTPStatus.NOT_FOUND) + + +@callback +def _validate_event(event: Event) -> None: + if event.camera is None: + raise ValueError("Event does not have a camera") + if not event.camera.can_read_media(event.api.bootstrap.auth_user): + raise PermissionError(f"User cannot read media from camera: {event.camera.id}") + + +class ProtectProxyView(HomeAssistantView): + """Base class to proxy request to UniFi Protect console.""" + + requires_auth = True + + def __init__(self, hass: HomeAssistant) -> None: + """Initialize a thumbnail proxy view.""" + self.hass = hass + self.data = hass.data[DOMAIN] + + def _get_data_or_404(self, nvr_id: str) -> ProtectData | web.Response: + all_data: list[ProtectData] = [] + + for data in self.data.values(): + if isinstance(data, ProtectData): + if data.api.bootstrap.nvr.id == nvr_id: + return data + all_data.append(data) + return _404("Invalid NVR ID") + + +class ThumbnailProxyView(ProtectProxyView): + """View to proxy event thumbnails from UniFi Protect.""" + + url = "/api/unifiprotect/thumbnail/{nvr_id}/{event_id}" + name = "api:unifiprotect_thumbnail" + + async def get( + self, request: web.Request, nvr_id: str, event_id: str + ) -> web.Response: + """Get Event Thumbnail.""" + + data = self._get_data_or_404(nvr_id) + if isinstance(data, web.Response): + return data + + width: int | str | None = request.query.get("width") + height: int | str | None = request.query.get("height") + + if width is not None: + try: + width = int(width) + except ValueError: + return _400("Invalid width param") + if height is not None: + try: + height = int(height) + except ValueError: + return _400("Invalid height param") + + try: + thumbnail = await data.api.get_event_thumbnail( + event_id, width=width, height=height + ) + except ClientError as err: + return _404(err) + + if thumbnail is None: + return _404("Event thumbnail not found") + + return web.Response(body=thumbnail, content_type="image/jpeg") + + +class VideoProxyView(ProtectProxyView): + """View to proxy video clips from UniFi Protect.""" + + url = "/api/unifiprotect/video/{nvr_id}/{camera_id}/{start}/{end}" + name = "api:unifiprotect_thumbnail" + + async def get( + self, request: web.Request, nvr_id: str, camera_id: str, start: str, end: str + ) -> web.StreamResponse: + """Get Camera Video clip.""" + + data = self._get_data_or_404(nvr_id) + if isinstance(data, web.Response): + return data + + camera = data.api.bootstrap.cameras.get(camera_id) + if camera is None: + return _404(f"Invalid camera ID: {camera_id}") + if not camera.can_read_media(data.api.bootstrap.auth_user): + return _403(f"User cannot read media from camera: {camera.id}") + + try: + start_dt = datetime.fromisoformat(start) + except ValueError: + return _400("Invalid start") + + try: + end_dt = datetime.fromisoformat(end) + except ValueError: + return _400("Invalid end") + + response = web.StreamResponse( + status=200, + reason="OK", + headers={ + "Content-Type": "video/mp4", + }, + ) + + async def iterator(total: int, chunk: bytes | None) -> None: + if not response.prepared: + response.content_length = total + await response.prepare(request) + + if chunk is not None: + await response.write(chunk) + + try: + await camera.get_video(start_dt, end_dt, iterator_callback=iterator) + except ClientError as err: + return _404(err) + + if response.prepared: + await response.write_eof() + return response diff --git a/tests/components/unifiprotect/conftest.py b/tests/components/unifiprotect/conftest.py index 51cef190e2f7c290e96b4bdf31e7f94b974f773f..2a9edb605e7f344d00ea755c7c1a9ee07007f8ad 100644 --- a/tests/components/unifiprotect/conftest.py +++ b/tests/components/unifiprotect/conftest.py @@ -4,6 +4,7 @@ from __future__ import annotations from collections.abc import Callable from datetime import datetime, timedelta +from functools import partial from ipaddress import IPv4Address import json from typing import Any @@ -102,6 +103,11 @@ def mock_ufp_client(bootstrap: Bootstrap): """Mock ProtectApiClient for testing.""" client = Mock() client.bootstrap = bootstrap + client._bootstrap = bootstrap + client.api_path = "/api" + # functionality from API client tests actually need + client._stream_response = partial(ProtectApiClient._stream_response, client) + client.get_camera_video = partial(ProtectApiClient.get_camera_video, client) nvr = client.bootstrap.nvr nvr._api = client diff --git a/tests/components/unifiprotect/test_views.py b/tests/components/unifiprotect/test_views.py new file mode 100644 index 0000000000000000000000000000000000000000..e64a0a873773dc09c993e60473f38983190b2adc --- /dev/null +++ b/tests/components/unifiprotect/test_views.py @@ -0,0 +1,427 @@ +"""Test UniFi Protect views.""" + +from datetime import datetime, timedelta +from typing import Any, cast +from unittest.mock import AsyncMock, Mock + +from aiohttp import ClientResponse +import pytest +from pyunifiprotect.data import Camera, Event, EventType +from pyunifiprotect.exceptions import ClientError + +from homeassistant.components.unifiprotect.views import ( + async_generate_event_video_url, + async_generate_thumbnail_url, +) +from homeassistant.core import HomeAssistant + +from .utils import MockUFPFixture, init_entry + +from tests.test_util.aiohttp import mock_aiohttp_client + + +async def test_thumbnail_bad_nvr_id( + hass: HomeAssistant, + hass_client: mock_aiohttp_client, + ufp: MockUFPFixture, + camera: Camera, +) -> None: + """Test invalid NVR ID in URL.""" + + ufp.api.get_event_thumbnail = AsyncMock() + + await init_entry(hass, ufp, [camera]) + url = async_generate_thumbnail_url("test_id", "bad_id") + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 404 + ufp.api.get_event_thumbnail.assert_not_called + + +@pytest.mark.parametrize("width,height", [("test", None), (None, "test")]) +async def test_thumbnail_bad_params( + hass: HomeAssistant, + hass_client: mock_aiohttp_client, + ufp: MockUFPFixture, + camera: Camera, + width: Any, + height: Any, +) -> None: + """Test invalid bad query parameters.""" + + ufp.api.get_event_thumbnail = AsyncMock() + + await init_entry(hass, ufp, [camera]) + url = async_generate_thumbnail_url( + "test_id", ufp.api.bootstrap.nvr.id, width=width, height=height + ) + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 400 + ufp.api.get_event_thumbnail.assert_not_called + + +async def test_thumbnail_bad_event( + hass: HomeAssistant, + hass_client: mock_aiohttp_client, + ufp: MockUFPFixture, + camera: Camera, +) -> None: + """Test invalid with error raised.""" + + ufp.api.get_event_thumbnail = AsyncMock(side_effect=ClientError()) + + await init_entry(hass, ufp, [camera]) + url = async_generate_thumbnail_url("test_id", ufp.api.bootstrap.nvr.id) + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 404 + ufp.api.get_event_thumbnail.assert_called_with("test_id", width=None, height=None) + + +async def test_thumbnail_no_data( + hass: HomeAssistant, + hass_client: mock_aiohttp_client, + ufp: MockUFPFixture, + camera: Camera, +) -> None: + """Test invalid no thumbnail returned.""" + + ufp.api.get_event_thumbnail = AsyncMock(return_value=None) + + await init_entry(hass, ufp, [camera]) + url = async_generate_thumbnail_url("test_id", ufp.api.bootstrap.nvr.id) + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 404 + ufp.api.get_event_thumbnail.assert_called_with("test_id", width=None, height=None) + + +async def test_thumbnail( + hass: HomeAssistant, + hass_client: mock_aiohttp_client, + ufp: MockUFPFixture, + camera: Camera, +) -> None: + """Test invalid NVR ID in URL.""" + + ufp.api.get_event_thumbnail = AsyncMock(return_value=b"testtest") + + await init_entry(hass, ufp, [camera]) + url = async_generate_thumbnail_url("test_id", ufp.api.bootstrap.nvr.id) + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 200 + assert response.content_type == "image/jpeg" + assert await response.content.read() == b"testtest" + ufp.api.get_event_thumbnail.assert_called_with("test_id", width=None, height=None) + + +async def test_video_bad_event( + hass: HomeAssistant, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, +) -> None: + """Test generating event with bad camera ID.""" + + await init_entry(hass, ufp, [camera]) + + event = Event( + api=ufp.api, + camera_id="test_id", + start=fixed_now - timedelta(seconds=30), + end=fixed_now, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + ) + + with pytest.raises(ValueError): + async_generate_event_video_url(event) + + +async def test_video_bad_event_ongoing( + hass: HomeAssistant, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, +) -> None: + """Test generating event with bad camera ID.""" + + await init_entry(hass, ufp, [camera]) + + event = Event( + api=ufp.api, + camera_id=camera.id, + start=fixed_now - timedelta(seconds=30), + end=None, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + ) + + with pytest.raises(ValueError): + async_generate_event_video_url(event) + + +async def test_video_bad_perms( + hass: HomeAssistant, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, +) -> None: + """Test generating event with bad user permissions.""" + + ufp.api.bootstrap.auth_user.all_permissions = [] + await init_entry(hass, ufp, [camera]) + + event = Event( + api=ufp.api, + camera_id=camera.id, + start=fixed_now - timedelta(seconds=30), + end=fixed_now, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + ) + + with pytest.raises(PermissionError): + async_generate_event_video_url(event) + + +async def test_video_bad_nvr_id( + hass: HomeAssistant, + hass_client: mock_aiohttp_client, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, +) -> None: + """Test video URL with bad NVR id.""" + + ufp.api.request = AsyncMock() + await init_entry(hass, ufp, [camera]) + + event = Event( + api=ufp.api, + camera_id=camera.id, + start=fixed_now - timedelta(seconds=30), + end=fixed_now, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + ) + + url = async_generate_event_video_url(event) + url = url.replace(ufp.api.bootstrap.nvr.id, "bad_id") + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 404 + ufp.api.request.assert_not_called + + +async def test_video_bad_camera_id( + hass: HomeAssistant, + hass_client: mock_aiohttp_client, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, +) -> None: + """Test video URL with bad camera id.""" + + ufp.api.request = AsyncMock() + await init_entry(hass, ufp, [camera]) + + event = Event( + api=ufp.api, + camera_id=camera.id, + start=fixed_now - timedelta(seconds=30), + end=fixed_now, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + ) + + url = async_generate_event_video_url(event) + url = url.replace(camera.id, "bad_id") + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 404 + ufp.api.request.assert_not_called + + +async def test_video_bad_camera_perms( + hass: HomeAssistant, + hass_client: mock_aiohttp_client, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, +) -> None: + """Test video URL with bad camera perms.""" + + ufp.api.request = AsyncMock() + await init_entry(hass, ufp, [camera]) + + event = Event( + api=ufp.api, + camera_id=camera.id, + start=fixed_now - timedelta(seconds=30), + end=fixed_now, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + ) + + url = async_generate_event_video_url(event) + + ufp.api.bootstrap.auth_user.all_permissions = [] + ufp.api.bootstrap.auth_user._perm_cache = {} + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 403 + ufp.api.request.assert_not_called + + +@pytest.mark.parametrize("start,end", [("test", None), (None, "test")]) +async def test_video_bad_params( + hass: HomeAssistant, + hass_client: mock_aiohttp_client, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, + start: Any, + end: Any, +) -> None: + """Test video URL with bad start/end params.""" + + ufp.api.request = AsyncMock() + await init_entry(hass, ufp, [camera]) + + event_start = fixed_now - timedelta(seconds=30) + event = Event( + api=ufp.api, + camera_id=camera.id, + start=event_start, + end=fixed_now, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + ) + + url = async_generate_event_video_url(event) + from_value = event_start if start is not None else fixed_now + to_value = start if start is not None else end + url = url.replace(from_value.isoformat(), to_value) + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 400 + ufp.api.request.assert_not_called + + +async def test_video_bad_video( + hass: HomeAssistant, + hass_client: mock_aiohttp_client, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, +) -> None: + """Test video URL with no video.""" + + ufp.api.request = AsyncMock(side_effect=ClientError) + await init_entry(hass, ufp, [camera]) + + event_start = fixed_now - timedelta(seconds=30) + event = Event( + api=ufp.api, + camera_id=camera.id, + start=event_start, + end=fixed_now, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + ) + + url = async_generate_event_video_url(event) + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 404 + ufp.api.request.assert_called_once + + +async def test_video( + hass: HomeAssistant, + hass_client: mock_aiohttp_client, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, +) -> None: + """Test video URL with no video.""" + + content = Mock() + content.__anext__ = AsyncMock(side_effect=[b"test", b"test", StopAsyncIteration()]) + content.__aiter__ = Mock(return_value=content) + + mock_response = Mock() + mock_response.content_length = 8 + mock_response.content.iter_chunked = Mock(return_value=content) + + ufp.api.request = AsyncMock(return_value=mock_response) + await init_entry(hass, ufp, [camera]) + + event_start = fixed_now - timedelta(seconds=30) + event = Event( + api=ufp.api, + camera_id=camera.id, + start=event_start, + end=fixed_now, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + ) + + url = async_generate_event_video_url(event) + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + assert await response.content.read() == b"testtest" + + assert response.status == 200 + ufp.api.request.assert_called_once