Skip to content
Snippets Groups Projects
Unverified Commit 2451e557 authored by peteS-UK's avatar peteS-UK Committed by GitHub
Browse files

Add support for Apps and Radios to Squeezebox Media Browser (#135009)

parent 1c83dab0
No related branches found
No related tags found
No related merge requests found
......@@ -3,6 +3,7 @@
from __future__ import annotations
import contextlib
from dataclasses import dataclass, field
from typing import Any
from pysqueezebox import Player
......@@ -18,6 +19,8 @@ from homeassistant.components.media_player import (
from homeassistant.core import HomeAssistant
from homeassistant.helpers.network import is_internal_request
from .const import UNPLAYABLE_TYPES
LIBRARY = [
"Favorites",
"Artists",
......@@ -26,9 +29,11 @@ LIBRARY = [
"Playlists",
"Genres",
"New Music",
"Apps",
"Radios",
]
MEDIA_TYPE_TO_SQUEEZEBOX = {
MEDIA_TYPE_TO_SQUEEZEBOX: dict[str | MediaType, str] = {
"Favorites": "favorites",
"Artists": "artists",
"Albums": "albums",
......@@ -41,19 +46,25 @@ MEDIA_TYPE_TO_SQUEEZEBOX = {
MediaType.TRACK: "title",
MediaType.PLAYLIST: "playlist",
MediaType.GENRE: "genre",
"Apps": "apps",
"Radios": "radios",
}
SQUEEZEBOX_ID_BY_TYPE = {
SQUEEZEBOX_ID_BY_TYPE: dict[str | MediaType, str] = {
MediaType.ALBUM: "album_id",
MediaType.ARTIST: "artist_id",
MediaType.TRACK: "track_id",
MediaType.PLAYLIST: "playlist_id",
MediaType.GENRE: "genre_id",
"Favorites": "item_id",
MediaType.APPS: "item_id",
}
CONTENT_TYPE_MEDIA_CLASS: dict[str | MediaType, dict[str, MediaClass | None]] = {
"Favorites": {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK},
"Apps": {"item": MediaClass.DIRECTORY, "children": MediaClass.APP},
"Radios": {"item": MediaClass.DIRECTORY, "children": MediaClass.APP},
"App": {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK},
"Artists": {"item": MediaClass.DIRECTORY, "children": MediaClass.ARTIST},
"Albums": {"item": MediaClass.DIRECTORY, "children": MediaClass.ALBUM},
"Tracks": {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK},
......@@ -65,9 +76,14 @@ CONTENT_TYPE_MEDIA_CLASS: dict[str | MediaType, dict[str, MediaClass | None]] =
MediaType.TRACK: {"item": MediaClass.TRACK, "children": None},
MediaType.GENRE: {"item": MediaClass.GENRE, "children": MediaClass.ARTIST},
MediaType.PLAYLIST: {"item": MediaClass.PLAYLIST, "children": MediaClass.TRACK},
MediaType.APP: {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK},
MediaType.APPS: {"item": MediaClass.DIRECTORY, "children": MediaClass.APP},
}
CONTENT_TYPE_TO_CHILD_TYPE = {
CONTENT_TYPE_TO_CHILD_TYPE: dict[
str | MediaType,
str | MediaType | None,
] = {
MediaType.ALBUM: MediaType.TRACK,
MediaType.PLAYLIST: MediaType.PLAYLIST,
MediaType.ARTIST: MediaType.ALBUM,
......@@ -78,15 +94,93 @@ CONTENT_TYPE_TO_CHILD_TYPE = {
"Playlists": MediaType.PLAYLIST,
"Genres": MediaType.GENRE,
"Favorites": None, # can only be determined after inspecting the item
"Apps": MediaClass.APP,
"Radios": MediaClass.APP,
"App": None, # can only be determined after inspecting the item
"New Music": MediaType.ALBUM,
MediaType.APPS: MediaType.APP,
MediaType.APP: MediaType.TRACK,
}
@dataclass
class BrowseData:
"""Class for browser to squeezebox mappings and other browse data."""
content_type_to_child_type: dict[
str | MediaType,
str | MediaType | None,
] = field(default_factory=dict)
content_type_media_class: dict[str | MediaType, dict[str, MediaClass | None]] = (
field(default_factory=dict)
)
squeezebox_id_by_type: dict[str | MediaType, str] = field(default_factory=dict)
media_type_to_squeezebox: dict[str | MediaType, str] = field(default_factory=dict)
known_apps_radios: set[str] = field(default_factory=set)
def __post_init__(self) -> None:
"""Initialise the maps."""
self.content_type_media_class.update(CONTENT_TYPE_MEDIA_CLASS)
self.content_type_to_child_type.update(CONTENT_TYPE_TO_CHILD_TYPE)
self.squeezebox_id_by_type.update(SQUEEZEBOX_ID_BY_TYPE)
self.media_type_to_squeezebox.update(MEDIA_TYPE_TO_SQUEEZEBOX)
@dataclass
class BrowseItemResponse:
"""Class for response data for browse item functions."""
child_item_type: str | MediaType
child_media_class: dict[str, MediaClass | None]
can_expand: bool
can_play: bool
def _add_new_command_to_browse_data(
browse_data: BrowseData, cmd: str | MediaType, type: str
) -> None:
"""Add items to maps for new apps or radios."""
browse_data.media_type_to_squeezebox[cmd] = cmd
browse_data.squeezebox_id_by_type[cmd] = type
browse_data.content_type_media_class[cmd] = {
"item": MediaClass.DIRECTORY,
"children": MediaClass.TRACK,
}
browse_data.content_type_to_child_type[cmd] = MediaType.TRACK
def _build_response_apps_radios_category(
browse_data: BrowseData,
cmd: str | MediaType,
) -> BrowseItemResponse:
"""Build item for App or radio category."""
return BrowseItemResponse(
child_item_type=cmd,
child_media_class=browse_data.content_type_media_class[cmd],
can_expand=True,
can_play=False,
)
def _build_response_known_app(
browse_data: BrowseData, search_type: str, item: dict[str, Any]
) -> BrowseItemResponse:
"""Build item for app or radio."""
return BrowseItemResponse(
child_item_type=search_type,
child_media_class=browse_data.content_type_media_class[search_type],
can_play=bool(item["isaudio"] and item.get("url")),
can_expand=item["hasitems"],
)
async def build_item_response(
entity: MediaPlayerEntity,
player: Player,
payload: dict[str, str | None],
browse_limit: int,
browse_data: BrowseData,
) -> BrowseMedia:
"""Create response payload for search described by payload."""
......@@ -97,29 +191,30 @@ async def build_item_response(
assert (
search_type is not None
) # async_browse_media will not call this function if search_type is None
media_class = CONTENT_TYPE_MEDIA_CLASS[search_type]
media_class = browse_data.content_type_media_class[search_type]
children = None
if search_id and search_id != search_type:
browse_id = (SQUEEZEBOX_ID_BY_TYPE[search_type], search_id)
browse_id = (browse_data.squeezebox_id_by_type[search_type], search_id)
else:
browse_id = None
result = await player.async_browse(
MEDIA_TYPE_TO_SQUEEZEBOX[search_type],
browse_data.media_type_to_squeezebox[search_type],
limit=browse_limit,
browse_id=browse_id,
)
if result is not None and result.get("items"):
item_type = CONTENT_TYPE_TO_CHILD_TYPE[search_type]
item_type = browse_data.content_type_to_child_type[search_type]
children = []
list_playable = []
for item in result["items"]:
item_id = str(item["id"])
item_id = str(item.get("id", ""))
item_thumbnail: str | None = None
if item_type:
child_item_type: MediaType | str = item_type
child_media_class = CONTENT_TYPE_MEDIA_CLASS[item_type]
......@@ -144,6 +239,47 @@ async def build_item_response(
can_expand = item["hasitems"]
can_play = item["isaudio"] and item.get("url")
if search_type in ["Apps", "Radios"]:
# item["cmd"] contains the name of the command to use with the cli for the app
# add the command to the dictionaries
if item["title"] == "Search" or item.get("type") in UNPLAYABLE_TYPES:
# Skip searches in apps as they'd need UI or if the link isn't to audio
continue
app_cmd = "app-" + item["cmd"]
if app_cmd not in browse_data.known_apps_radios:
browse_data.known_apps_radios.add(app_cmd)
_add_new_command_to_browse_data(browse_data, app_cmd, "item_id")
browse_item_response = _build_response_apps_radios_category(
browse_data, app_cmd
)
# Temporary variables until remainder of browse calls are restructured
child_item_type = browse_item_response.child_item_type
child_media_class = browse_item_response.child_media_class
can_expand = browse_item_response.can_expand
can_play = browse_item_response.can_play
elif search_type in browse_data.known_apps_radios:
if (
item.get("title") in ["Search", None]
or item.get("type") in UNPLAYABLE_TYPES
):
# Skip searches in apps as they'd need UI
continue
browse_item_response = _build_response_known_app(
browse_data, search_type, item
)
# Temporary variables until remainder of browse calls are restructured
child_item_type = browse_item_response.child_item_type
child_media_class = browse_item_response.child_media_class
can_expand = browse_item_response.can_expand
can_play = browse_item_response.can_play
if artwork_track_id := item.get("artwork_track_id"):
if internal_request:
item_thumbnail = player.generate_image_url_from_track_id(
......@@ -153,6 +289,8 @@ async def build_item_response(
item_thumbnail = entity.get_browse_image_url(
item_type, item_id, artwork_track_id
)
elif search_type in ["Apps", "Radios"]:
item_thumbnail = player.generate_image_url(item["icon"])
else:
item_thumbnail = item.get("image_url") # will not be proxied by HA
......@@ -176,6 +314,7 @@ async def build_item_response(
assert media_class["item"] is not None
if not search_id:
search_id = search_type
return BrowseMedia(
title=result.get("title"),
media_class=media_class["item"],
......@@ -188,7 +327,11 @@ async def build_item_response(
)
async def library_payload(hass: HomeAssistant, player: Player) -> BrowseMedia:
async def library_payload(
hass: HomeAssistant,
player: Player,
browse_media: BrowseData,
) -> BrowseMedia:
"""Create response payload to describe contents of library."""
library_info: dict[str, Any] = {
"title": "Music Library",
......@@ -201,10 +344,10 @@ async def library_payload(hass: HomeAssistant, player: Player) -> BrowseMedia:
}
for item in LIBRARY:
media_class = CONTENT_TYPE_MEDIA_CLASS[item]
media_class = browse_media.content_type_media_class[item]
result = await player.async_browse(
MEDIA_TYPE_TO_SQUEEZEBOX[item],
browse_media.media_type_to_squeezebox[item],
limit=1,
)
if result is not None and result.get("items") is not None:
......@@ -215,7 +358,7 @@ async def library_payload(hass: HomeAssistant, player: Player) -> BrowseMedia:
media_class=media_class["children"],
media_content_id=item,
media_content_type=item,
can_play=item != "Favorites",
can_play=item not in ["Favorites", "Apps", "Radios"],
can_expand=True,
)
)
......@@ -242,17 +385,23 @@ async def generate_playlist(
player: Player,
payload: dict[str, str],
browse_limit: int,
browse_media: BrowseData,
) -> list | None:
"""Generate playlist from browsing payload."""
media_type = payload["search_type"]
media_id = payload["search_id"]
if media_type not in SQUEEZEBOX_ID_BY_TYPE:
if media_type not in browse_media.squeezebox_id_by_type:
raise BrowseError(f"Media type not supported: {media_type}")
browse_id = (SQUEEZEBOX_ID_BY_TYPE[media_type], media_id)
browse_id = (browse_media.squeezebox_id_by_type[media_type], media_id)
if media_type.startswith("app-"):
category = media_type
else:
category = "titles"
result = await player.async_browse(
"titles", limit=browse_limit, browse_id=browse_id
category, limit=browse_limit, browse_id=browse_id
)
if result and "items" in result:
items: list = result["items"]
......
......@@ -27,7 +27,12 @@ STATUS_QUERY_LIBRARYNAME = "libraryname"
STATUS_QUERY_MAC = "mac"
STATUS_QUERY_UUID = "uuid"
STATUS_QUERY_VERSION = "version"
SQUEEZEBOX_SOURCE_STRINGS = ("source:", "wavin:", "spotify:")
SQUEEZEBOX_SOURCE_STRINGS = (
"source:",
"wavin:",
"spotify:",
"loop:",
)
SIGNAL_PLAYER_DISCOVERED = "squeezebox_player_discovered"
SIGNAL_PLAYER_REDISCOVERED = "squeezebox_player_rediscovered"
DISCOVERY_INTERVAL = 60
......@@ -38,3 +43,4 @@ DEFAULT_BROWSE_LIMIT = 1000
DEFAULT_VOLUME_STEP = 5
ATTR_ANNOUNCE_VOLUME = "announce_volume"
ATTR_ANNOUNCE_TIMEOUT = "announce_timeout"
UNPLAYABLE_TYPES = ("text", "actions")
......@@ -47,6 +47,7 @@ from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util.dt import utcnow
from .browse_media import (
BrowseData,
build_item_response,
generate_playlist,
library_payload,
......@@ -240,6 +241,7 @@ class SqueezeBoxMediaPlayerEntity(
model=player.model,
manufacturer=_manufacturer,
)
self._browse_data = BrowseData()
@callback
def _handle_coordinator_update(self) -> None:
......@@ -530,9 +532,7 @@ class SqueezeBoxMediaPlayerEntity(
"search_type": MediaType.PLAYLIST,
}
playlist = await generate_playlist(
self._player,
payload,
self.browse_limit,
self._player, payload, self.browse_limit, self._browse_data
)
except BrowseError:
# a list of urls
......@@ -545,9 +545,7 @@ class SqueezeBoxMediaPlayerEntity(
"search_type": media_type,
}
playlist = await generate_playlist(
self._player,
payload,
self.browse_limit,
self._player, payload, self.browse_limit, self._browse_data
)
_LOGGER.debug("Generated playlist: %s", playlist)
......@@ -646,7 +644,7 @@ class SqueezeBoxMediaPlayerEntity(
)
if media_content_type in [None, "library"]:
return await library_payload(self.hass, self._player)
return await library_payload(self.hass, self._player, self._browse_data)
if media_content_id and media_source.is_media_source_id(media_content_id):
return await media_source.async_browse_media(
......@@ -663,6 +661,7 @@ class SqueezeBoxMediaPlayerEntity(
self._player,
payload,
self.browse_limit,
self._browse_data,
)
async def async_get_browse_image(
......
......@@ -142,6 +142,9 @@ async def mock_async_browse(
"title": "title",
"playlists": "playlist",
"playlist": "title",
"apps": "app",
"radios": "app",
"app-fakecommand": "track",
}
fake_items = [
{
......@@ -152,6 +155,8 @@ async def mock_async_browse(
"item_type": child_types[media_type],
"artwork_track_id": "b35bb9e9",
"url": "file:///var/lib/squeezeboxserver/music/track_1.mp3",
"cmd": "fakecommand",
"icon": "plugins/Qobuz/html/images/qobuz.png",
},
{
"title": "Fake Item 2",
......@@ -161,6 +166,8 @@ async def mock_async_browse(
"item_type": child_types[media_type],
"image_url": "http://lms.internal:9000/html/images/favorites.png",
"url": "file:///var/lib/squeezeboxserver/music/track_2.mp3",
"cmd": "fakecommand",
"icon": "plugins/Qobuz/html/images/qobuz.png",
},
{
"title": "Fake Item 3",
......@@ -169,6 +176,19 @@ async def mock_async_browse(
"isaudio": True,
"album_id": FAKE_VALID_ITEM_ID if media_type == "favorites" else None,
"url": "file:///var/lib/squeezeboxserver/music/track_3.mp3",
"cmd": "fakecommand",
"icon": "plugins/Qobuz/html/images/qobuz.png",
},
{
"title": "Fake Invalid Item 1",
"id": FAKE_VALID_ITEM_ID + "invalid_3",
"hasitems": media_type == "favorites",
"isaudio": True,
"album_id": FAKE_VALID_ITEM_ID if media_type == "favorites" else None,
"url": "file:///var/lib/squeezeboxserver/music/track_3.mp3",
"cmd": "fakecommand",
"icon": "plugins/Qobuz/html/images/qobuz.png",
"type": "text",
},
]
......@@ -198,7 +218,10 @@ async def mock_async_browse(
"items": fake_items,
}
return None
if media_type in MEDIA_TYPE_TO_SQUEEZEBOX.values():
if (
media_type in MEDIA_TYPE_TO_SQUEEZEBOX.values()
or media_type == "app-fakecommand"
):
return {
"title": media_type,
"items": fake_items,
......@@ -232,6 +255,9 @@ def mock_pysqueezebox_player(uuid: str) -> MagicMock:
mock_player.async_play_announcement = AsyncMock(
side_effect=mock_async_play_announcement
)
mock_player.generate_image_url = MagicMock(
return_value="http://lms.internal:9000/html/images/favorites.png"
)
mock_player.name = TEST_PLAYER_NAME
mock_player.player_id = uuid
mock_player.mode = "stop"
......
......@@ -19,6 +19,8 @@ from homeassistant.components.squeezebox.browse_media import (
from homeassistant.const import ATTR_ENTITY_ID, Platform
from homeassistant.core import HomeAssistant
from .conftest import FAKE_VALID_ITEM_ID
from tests.common import MockConfigEntry
from tests.typing import WebSocketGenerator
......@@ -66,56 +68,143 @@ async def test_async_browse_media_root(
assert item["title"] == LIBRARY[idx]
@pytest.mark.parametrize(
("category", "child_count"),
[
("Favorites", 4),
("Artists", 4),
("Albums", 4),
("Playlists", 4),
("Genres", 4),
("New Music", 4),
("Apps", 3),
("Radios", 3),
],
)
async def test_async_browse_media_with_subitems(
hass: HomeAssistant,
config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
category: str,
child_count: int,
) -> None:
"""Test each category with subitems."""
for category in (
"Favorites",
"Artists",
"Albums",
"Playlists",
"Genres",
"New Music",
with patch(
"homeassistant.components.squeezebox.browse_media.is_internal_request",
return_value=False,
):
with patch(
"homeassistant.components.squeezebox.browse_media.is_internal_request",
return_value=False,
):
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": "",
"media_content_type": category,
}
)
response = await client.receive_json()
assert response["success"]
category_level = response["result"]
assert category_level["title"] == MEDIA_TYPE_TO_SQUEEZEBOX[category]
assert category_level["children"][0]["title"] == "Fake Item 1"
# Look up a subitem
search_type = category_level["children"][0]["media_content_type"]
search_id = category_level["children"][0]["media_content_id"]
await client.send_json(
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": "",
"media_content_type": category,
}
)
response = await client.receive_json()
assert response["success"]
category_level = response["result"]
assert category_level["title"] == MEDIA_TYPE_TO_SQUEEZEBOX[category]
assert category_level["children"][0]["title"] == "Fake Item 1"
assert len(category_level["children"]) == child_count
# Look up a subitem
search_type = category_level["children"][0]["media_content_type"]
search_id = category_level["children"][0]["media_content_id"]
await client.send_json(
{
"id": 2,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": search_id,
"media_content_type": search_type,
}
)
response = await client.receive_json()
assert response["success"]
search = response["result"]
assert search["title"] == "Fake Item 1"
async def test_async_browse_media_for_apps(
hass: HomeAssistant,
config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test browsing for app category."""
with patch(
"homeassistant.components.squeezebox.browse_media.is_internal_request",
return_value=False,
):
category = "Apps"
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": "",
"media_content_type": category,
}
)
response = await client.receive_json()
assert response["success"]
# Look up a subitem
await client.send_json(
{
"id": 2,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": "",
"media_content_type": "app-fakecommand",
}
)
response = await client.receive_json()
assert response["success"]
search = response["result"]
assert search["children"][0]["title"] == "Fake Item 1"
assert "Fake Invalid Item 1" not in search
async def test_generate_playlist_for_app(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test the generate_playlist for app-fakecommand media type."""
with patch(
"homeassistant.components.squeezebox.browse_media.is_internal_request",
return_value=False,
):
category = "Apps"
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": "",
"media_content_type": category,
}
)
response = await client.receive_json()
assert response["success"]
try:
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
{
"id": 2,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": search_id,
"media_content_type": search_type,
}
ATTR_ENTITY_ID: "media_player.test_player",
ATTR_MEDIA_CONTENT_TYPE: "app-fakecommand",
ATTR_MEDIA_CONTENT_ID: FAKE_VALID_ITEM_ID,
},
blocking=True,
)
response = await client.receive_json()
assert response["success"]
search = response["result"]
assert search["title"] == "Fake Item 1"
except BrowseError:
pytest.fail("generate_playlist fails for app")
async def test_async_browse_tracks(
......@@ -142,7 +231,7 @@ async def test_async_browse_tracks(
assert response["success"]
tracks = response["result"]
assert tracks["title"] == "titles"
assert len(tracks["children"]) == 3
assert len(tracks["children"]) == 4
async def test_async_browse_error(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment