diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py
index 0556bc2c7a9dde5b947749acee148cca00931cdb..fec9731136ff3e15709a5add72d070805eddd786 100644
--- a/homeassistant/components/stream/__init__.py
+++ b/homeassistant/components/stream/__init__.py
@@ -50,7 +50,7 @@ from .const import (
     STREAM_RESTART_RESET_TIME,
     TARGET_SEGMENT_DURATION_NON_LL_HLS,
 )
-from .core import PROVIDERS, IdleTimer, StreamOutput, StreamSettings
+from .core import PROVIDERS, IdleTimer, KeyFrameConverter, StreamOutput, StreamSettings
 from .hls import HlsStreamOutput, async_setup_hls
 
 _LOGGER = logging.getLogger(__name__)
@@ -137,6 +137,8 @@ def filter_libav_logging() -> None:
 
     # Set log level to error for libav.mp4
     logging.getLogger("libav.mp4").setLevel(logging.ERROR)
+    # Suppress "deprecated pixel format" WARNING
+    logging.getLogger("libav.swscaler").setLevel(logging.ERROR)
 
 
 async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
@@ -214,6 +216,7 @@ class Stream:
         self._thread_quit = threading.Event()
         self._outputs: dict[str, StreamOutput] = {}
         self._fast_restart_once = False
+        self._keyframe_converter = KeyFrameConverter(hass)
         self._available: bool = True
         self._update_callback: Callable[[], None] | None = None
         self._logger = (
@@ -327,6 +330,7 @@ class Stream:
                     self.source,
                     self.options,
                     stream_state,
+                    self._keyframe_converter,
                     self._thread_quit,
                 )
             except StreamWorkerError as err:
@@ -419,3 +423,12 @@ class Stream:
         # Wait for latest segment, then add the lookback
         await hls.recv()
         recorder.prepend(list(hls.get_segments())[-num_segments:])
+
+    async def get_image(
+        self,
+        width: int | None = None,
+        height: int | None = None,
+    ) -> bytes | None:
+        """Wrap get_image from KeyFrameConverter."""
+
+        return await self._keyframe_converter.get_image(width=width, height=height)
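For context, a minimal usage sketch of the new `Stream.get_image` API. This is not part of the diff: the RTSP URL is a placeholder and `async_snapshot` is a hypothetical helper; `create_stream` is the existing factory in this integration.

```python
# Hypothetical caller, for illustration only: fetch the most recent
# keyframe of a running stream as JPEG bytes. get_image returns None
# until the worker has handed over a decodable keyframe.
from homeassistant.components.stream import create_stream


async def async_snapshot(hass):
    stream = create_stream(hass, "rtsp://example.local/stream", {})
    return await stream.get_image(width=640, height=480)
```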
diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py
index b51c953e915956e0f9bc69f3822ebf71c14e3947..08397fb68762b5f15c36751fe5b37c141b69163e 100644
--- a/homeassistant/components/stream/core.py
+++ b/homeassistant/components/stream/core.py
@@ -19,6 +19,8 @@ from homeassistant.util.decorator import Registry
 from .const import ATTR_STREAMS, DOMAIN
 
 if TYPE_CHECKING:
+    from av import CodecContext, Packet
+
     from . import Stream
 
 PROVIDERS = Registry()
@@ -356,3 +358,86 @@ class StreamView(HomeAssistantView):
     ) -> web.StreamResponse:
         """Handle the stream request."""
         raise NotImplementedError()
+
+
+class KeyFrameConverter:
+    """
+    Generate and hold the keyframe as a jpeg.
+
+    An overview of the thread and state interaction:
+        the worker thread sets a packet
+        at any time, the main loop can run a get_image call
+        _generate_image will try to create an image from the packet
+        Running _generate_image will clear the packet, so there will only
+        be one attempt per packet
+        If successful, _image will be updated and returned by get_image
+        If unsuccessful, get_image will return the previous image
+    """
+
+    def __init__(self, hass: HomeAssistant) -> None:
+        """Initialize."""
+
+        # Keep import here so that we can import stream integration without installing reqs
+        # pylint: disable=import-outside-toplevel
+        from homeassistant.components.camera.img_util import TurboJPEGSingleton
+
+        self.packet: Packet = None
+        self._hass = hass
+        self._image: bytes | None = None
+        self._turbojpeg = TurboJPEGSingleton.instance()
+        self._lock = asyncio.Lock()
+        self._codec_context: CodecContext | None = None
+
+    def create_codec_context(self, codec_context: CodecContext) -> None:
+        """
+        Create a codec context to be used for decoding the keyframes.
+
+        This is run by the worker thread and will only be called once per worker.
+        """
+
+        # Keep import here so that we can import stream integration without installing reqs
+        # pylint: disable=import-outside-toplevel
+        from av import CodecContext
+
+        self._codec_context = CodecContext.create(codec_context.name, "r")
+        self._codec_context.extradata = codec_context.extradata
+        self._codec_context.skip_frame = "NONKEY"
+        self._codec_context.thread_type = "NONE"
+
+    def _generate_image(self, width: int | None, height: int | None) -> None:
+        """
+        Generate the keyframe image.
+
+        This is run in an executor thread, but since it is called while
+        holding the asyncio lock from the main thread, there will only be
+        one entry at a time per instance.
+        """
+
+        if not (self._turbojpeg and self.packet and self._codec_context):
+            return
+        packet = self.packet
+        self.packet = None
+        # decode packet (flush afterwards)
+        frames = self._codec_context.decode(packet)
+        for _i in range(2):
+            if frames:
+                break
+            frames = self._codec_context.decode(None)
+        if frames:
+            frame = frames[0]
+            if width and height:
+                frame = frame.reformat(width=width, height=height)
+            bgr_array = frame.to_ndarray(format="bgr24")
+            self._image = bytes(self._turbojpeg.encode(bgr_array))
+
+    async def get_image(
+        self,
+        width: int | None = None,
+        height: int | None = None,
+    ) -> bytes | None:
+        """Fetch an image from the Stream and return it as a jpeg in bytes."""
+
+        # Use a lock to ensure only one thread is working on the keyframe at a time
+        async with self._lock:
+            await self._hass.async_add_executor_job(self._generate_image, width, height)
+        return self._image
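The decode-and-flush loop above exists because a freshly created decoder may buffer its first frames; passing `None` to `decode()` drains them. A standalone sketch of the same keyframe-to-JPEG pipeline using plain PyAV and PyTurboJPEG, assuming both are installed; `first_keyframe_jpeg` is a hypothetical helper and `"sample.mp4"` a placeholder path:

```python
# Standalone sketch mirroring KeyFrameConverter's decoder settings:
# keyframes only, single-threaded, with a flush for buffered frames.
from __future__ import annotations

import av
from turbojpeg import TurboJPEG


def first_keyframe_jpeg(path: str) -> bytes | None:
    with av.open(path) as container:
        stream = container.streams.video[0]
        ctx = av.CodecContext.create(stream.codec_context.name, "r")
        ctx.extradata = stream.codec_context.extradata
        ctx.skip_frame = "NONKEY"
        ctx.thread_type = "NONE"
        for packet in container.demux(stream):
            if not packet.is_keyframe:
                continue
            frames = ctx.decode(packet)
            for _ in range(2):
                if frames:
                    break
                # Decoder may buffer its first frames; flush with None
                frames = ctx.decode(None)
            if frames:
                bgr = frames[0].to_ndarray(format="bgr24")
                return bytes(TurboJPEG().encode(bgr))
            # A flushed decoder cannot be reused; give up after one attempt
            return None
    return None
```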
+ """ + + if not (self._turbojpeg and self.packet and self._codec_context): + return + packet = self.packet + self.packet = None + # decode packet (flush afterwards) + frames = self._codec_context.decode(packet) + for _i in range(2): + if frames: + break + frames = self._codec_context.decode(None) + if frames: + frame = frames[0] + if width and height: + frame = frame.reformat(width=width, height=height) + bgr_array = frame.to_ndarray(format="bgr24") + self._image = bytes(self._turbojpeg.encode(bgr_array)) + + async def get_image( + self, + width: int | None = None, + height: int | None = None, + ) -> bytes | None: + """Fetch an image from the Stream and return it as a jpeg in bytes.""" + + # Use a lock to ensure only one thread is working on the keyframe at a time + async with self._lock: + await self._hass.async_add_executor_job(self._generate_image, width, height) + return self._image diff --git a/homeassistant/components/stream/manifest.json b/homeassistant/components/stream/manifest.json index d5115754e2eb9fb0701e4c50a7b1c3b9e1a5c316..60d4a6e66ebe484d506c1a2b90bfc9481886f961 100644 --- a/homeassistant/components/stream/manifest.json +++ b/homeassistant/components/stream/manifest.json @@ -2,7 +2,7 @@ "domain": "stream", "name": "Stream", "documentation": "https://www.home-assistant.io/integrations/stream", - "requirements": ["ha-av==8.0.4-rc.1"], + "requirements": ["ha-av==8.0.4-rc.1", "PyTurboJPEG==1.6.3"], "dependencies": ["http"], "codeowners": ["@hunterjm", "@uvjustin", "@allenporter"], "quality_scale": "internal", diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index b1d79e528008fa9970f2c13cd56dddeaad14c557..e633d146444252367cc7a6489ba4cd7d42c68e27 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -14,7 +14,7 @@ import av from homeassistant.core import HomeAssistant -from . import redact_credentials +from . 
diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py
index b1d79e528008fa9970f2c13cd56dddeaad14c557..e633d146444252367cc7a6489ba4cd7d42c68e27 100644
--- a/homeassistant/components/stream/worker.py
+++ b/homeassistant/components/stream/worker.py
@@ -14,7 +14,7 @@ import av
 
 from homeassistant.core import HomeAssistant
 
-from . import redact_credentials
+from . import KeyFrameConverter, redact_credentials
 from .const import (
     ATTR_SETTINGS,
     AUDIO_CODECS,
@@ -439,6 +439,7 @@ def stream_worker(
     source: str,
     options: dict[str, str],
     stream_state: StreamState,
+    keyframe_converter: KeyFrameConverter,
     quit_event: Event,
 ) -> None:
     """Handle consuming streams."""
@@ -453,6 +454,7 @@ def stream_worker(
         video_stream = container.streams.video[0]
     except (KeyError, IndexError) as ex:
         raise StreamWorkerError("Stream has no video") from ex
+    keyframe_converter.create_codec_context(codec_context=video_stream.codec_context)
     try:
         audio_stream = container.streams.audio[0]
     except (KeyError, IndexError):
@@ -474,7 +476,7 @@ def stream_worker(
 
     def is_video(packet: av.Packet) -> Any:
         """Return true if the packet is for the video stream."""
-        return packet.stream == video_stream
+        return packet.stream.type == "video"
 
     # Have to work around two problems with RTSP feeds in ffmpeg
     # 1 - first frame has bad pts/dts https://trac.ffmpeg.org/ticket/5018
@@ -535,3 +537,6 @@ def stream_worker(
             raise StreamWorkerError("Error demuxing stream: %s" % str(ex)) from ex
 
         muxer.mux_packet(packet)
+
+        if packet.is_keyframe and is_video(packet):
+            keyframe_converter.packet = packet
diff --git a/requirements_all.txt b/requirements_all.txt
index a8696e0c04cd40b2b188a192e22e6be57f02f2e3..8c4e685de62a3fb927b53756be03b7727e5e7950 100644
--- a/requirements_all.txt
+++ b/requirements_all.txt
@@ -55,6 +55,7 @@ PySocks==1.7.1
 PyTransportNSW==0.1.1
 
 # homeassistant.components.camera
+# homeassistant.components.stream
 PyTurboJPEG==1.6.3
 
 # homeassistant.components.vicare
diff --git a/requirements_test_all.txt b/requirements_test_all.txt
index 9c4303a33432a894c1b4624c987e2cc88864f49f..3a03271aa226a0b492ee77d6a45cd7c2c523db49 100644
--- a/requirements_test_all.txt
+++ b/requirements_test_all.txt
@@ -33,6 +33,7 @@ PyRMVtransport==0.3.3
 PyTransportNSW==0.1.1
 
 # homeassistant.components.camera
+# homeassistant.components.stream
 PyTurboJPEG==1.6.3
 
 # homeassistant.components.vicare
diff --git a/tests/components/camera/common.py b/tests/components/camera/common.py
index 756a553f3c72c42dedcda9f3ee2d1ee411e6032c..bd3841cc4e85c60a39c4218cbfdfb54951ae3b4a 100644
--- a/tests/components/camera/common.py
+++ b/tests/components/camera/common.py
@@ -29,4 +29,5 @@ def mock_turbo_jpeg(
         (second_width, second_height, 0, 0),
     ]
     mocked_turbo_jpeg.scale_with_quality.return_value = EMPTY_8_6_JPEG
+    mocked_turbo_jpeg.encode.return_value = EMPTY_8_6_JPEG
     return mocked_turbo_jpeg
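Why the worker can hand packets over without a lock: rebinding `keyframe_converter.packet` is a single reference assignment, atomic under the GIL, and `_generate_image` clears the slot before decoding, so each packet is attempted at most once and stale packets are simply overwritten. A stripped-down sketch of that pattern (hypothetical names, not part of this diff):

```python
# Illustrative single-slot handoff: the worker thread publishes the
# newest keyframe packet by plain attribute assignment; the consumer
# swaps the slot out before decoding. A racing write may be dropped,
# which is acceptable since only the latest keyframe matters.
class KeyFrameHandoff:
    def __init__(self) -> None:
        self.packet = None  # written by the worker thread

    def take_packet(self):
        """Claim the pending packet on the consumer side."""
        packet, self.packet = self.packet, None
        return packet
```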
diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py
index f05b2ece8293a625c3c6657bc639e0dc2c849eba..eb50e76a80a621194a4c88da7250262d1179fdbc 100644
--- a/tests/components/stream/test_worker.py
+++ b/tests/components/stream/test_worker.py
@@ -23,7 +23,7 @@ from unittest.mock import patch
 import av
 import pytest
 
-from homeassistant.components.stream import Stream, create_stream
+from homeassistant.components.stream import KeyFrameConverter, Stream, create_stream
 from homeassistant.components.stream.const import (
     ATTR_SETTINGS,
     CONF_LL_HLS,
@@ -45,6 +45,7 @@ from homeassistant.components.stream.worker import (
 )
 from homeassistant.setup import async_setup_component
 
+from tests.components.camera.common import EMPTY_8_6_JPEG, mock_turbo_jpeg
 from tests.components.stream.common import generate_h264_video, generate_h265_video
 from tests.components.stream.test_ll_hls import TEST_PART_DURATION
 
@@ -97,6 +98,17 @@ class FakeAvInputStream:
 
         self.codec = FakeCodec()
 
+        class FakeCodecContext:
+            name = "h264"
+            extradata = None
+
+        self.codec_context = FakeCodecContext()
+
+    @property
+    def type(self):
+        """Return packet type."""
+        return "video" if self.name == VIDEO_STREAM_FORMAT else "audio"
+
     def __str__(self) -> str:
         """Return a stream name for debugging."""
         return f"FakePyAvStream<{self.name}, {self.time_base}>"
@@ -195,6 +207,7 @@ class FakePyAvBuffer:
     class FakeAvOutputStream:
         def __init__(self, capture_packets):
             self.capture_packets = capture_packets
+            self.type = "ignored-type"
 
         def close(self):
             return
@@ -258,7 +271,9 @@ class MockPyAv:
 def run_worker(hass, stream, stream_source):
     """Run the stream worker under test."""
     stream_state = StreamState(hass, stream.outputs)
-    stream_worker(stream_source, {}, stream_state, threading.Event())
+    stream_worker(
+        stream_source, {}, stream_state, KeyFrameConverter(hass), threading.Event()
+    )
 
 
 async def async_decode_stream(hass, packets, py_av=None):
@@ -854,3 +869,29 @@ async def test_h265_video_is_hvc1(hass, record_worker_sync):
     await record_worker_sync.join()
 
     stream.stop()
+
+
+async def test_get_image(hass, record_worker_sync):
+    """Test that an image can be fetched from the stream."""
+    await async_setup_component(hass, "stream", {"stream": {}})
+
+    source = generate_h264_video()
+
+    # Since libjpeg-turbo is not installed on the CI runner, we use a mock
+    with patch(
+        "homeassistant.components.camera.img_util.TurboJPEGSingleton"
+    ) as mock_turbo_jpeg_singleton:
+        mock_turbo_jpeg_singleton.instance.return_value = mock_turbo_jpeg()
+        stream = create_stream(hass, source, {})
+
+    # use record_worker_sync to grab output segments
+    with patch.object(hass.config, "is_allowed_path", return_value=True):
+        await stream.async_record("/example/path")
+
+    assert stream._keyframe_converter._image is None
+
+    await record_worker_sync.join()
+
+    assert await stream.get_image() == EMPTY_8_6_JPEG
+
+    stream.stop()
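To exercise only the new test locally, assuming the test environment from `requirements_test_all.txt` is installed, one option is to invoke pytest programmatically; the node id below mirrors the file and function names in this diff:

```python
# Run only the new get_image test, verbosely.
import pytest

pytest.main(["tests/components/stream/test_worker.py::test_get_image", "-v"])
```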