diff --git a/homeassistant/components/risco/__init__.py b/homeassistant/components/risco/__init__.py index fea3ee63aac41294430569b0f95cd05d0a152764..e95b3016139de43c9cdd9b619c2ef8dd646d2936 100644 --- a/homeassistant/components/risco/__init__.py +++ b/homeassistant/components/risco/__init__.py @@ -1,14 +1,27 @@ """The Risco integration.""" +from collections.abc import Callable +from dataclasses import dataclass, field from datetime import timedelta import logging - -from pyrisco import CannotConnectError, OperationError, RiscoCloud, UnauthorizedError +from typing import Any + +from pyrisco import ( + CannotConnectError, + OperationError, + RiscoCloud, + RiscoLocal, + UnauthorizedError, +) +from pyrisco.common import Partition, Zone from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( + CONF_HOST, CONF_PASSWORD, CONF_PIN, + CONF_PORT, CONF_SCAN_INTERVAL, + CONF_TYPE, CONF_USERNAME, Platform, ) @@ -18,17 +31,94 @@ from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.storage import Store from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed -from .const import DATA_COORDINATOR, DEFAULT_SCAN_INTERVAL, DOMAIN, EVENTS_COORDINATOR +from .const import ( + DATA_COORDINATOR, + DEFAULT_SCAN_INTERVAL, + DOMAIN, + EVENTS_COORDINATOR, + TYPE_LOCAL, +) PLATFORMS = [Platform.ALARM_CONTROL_PANEL, Platform.BINARY_SENSOR, Platform.SENSOR] -UNDO_UPDATE_LISTENER = "undo_update_listener" LAST_EVENT_STORAGE_VERSION = 1 LAST_EVENT_TIMESTAMP_KEY = "last_event_timestamp" _LOGGER = logging.getLogger(__name__) +@dataclass +class LocalData: + """A data class for local data passed to the platforms.""" + + system: RiscoLocal + zone_updates: dict[int, Callable[[], Any]] = field(default_factory=dict) + partition_updates: dict[int, Callable[[], Any]] = field(default_factory=dict) + + +def is_local(entry: ConfigEntry) -> bool: + """Return whether the entry represents an instance with local communication.""" + return entry.data.get(CONF_TYPE) == TYPE_LOCAL + + async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up Risco from a config entry.""" + if is_local(entry): + return await _async_setup_local_entry(hass, entry) + + return await _async_setup_cloud_entry(hass, entry) + + +async def _async_setup_local_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + data = entry.data + risco = RiscoLocal(data[CONF_HOST], data[CONF_PORT], data[CONF_PIN]) + + try: + await risco.connect() + except CannotConnectError as error: + raise ConfigEntryNotReady() from error + except UnauthorizedError: + _LOGGER.exception("Failed to login to Risco cloud") + return False + + async def _error(error: Exception) -> None: + _LOGGER.error("Error in Risco library: %s", error) + + entry.async_on_unload(risco.add_error_handler(_error)) + + async def _default(command: str, result: str, *params: list[str]) -> None: + _LOGGER.debug( + "Unhandled update from Risco library: %s, %s, %s", command, result, params + ) + + entry.async_on_unload(risco.add_default_handler(_default)) + + local_data = LocalData(risco) + + async def _zone(zone_id: int, zone: Zone) -> None: + _LOGGER.debug("Risco zone update for %d", zone_id) + callback = local_data.zone_updates.get(zone_id) + if callback: + callback() + + entry.async_on_unload(risco.add_zone_handler(_zone)) + + async def _partition(partition_id: int, partition: Partition) -> None: + _LOGGER.debug("Risco partition update for %d", partition_id) + callback = local_data.partition_updates.get(partition_id) + if callback: + callback() + + entry.async_on_unload(risco.add_partition_handler(_partition)) + + entry.async_on_unload(entry.add_update_listener(_update_listener)) + + hass.data.setdefault(DOMAIN, {}) + hass.data[DOMAIN][entry.entry_id] = local_data + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) + + return True + + +async def _async_setup_cloud_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: data = entry.data risco = RiscoCloud(data[CONF_USERNAME], data[CONF_PASSWORD], data[CONF_PIN]) try: @@ -46,12 +136,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: hass, risco, entry.entry_id, 60 ) - undo_listener = entry.add_update_listener(_update_listener) + entry.async_on_unload(entry.add_update_listener(_update_listener)) hass.data.setdefault(DOMAIN, {}) hass.data[DOMAIN][entry.entry_id] = { DATA_COORDINATOR: coordinator, - UNDO_UPDATE_LISTENER: undo_listener, EVENTS_COORDINATOR: events_coordinator, } @@ -65,7 +154,6 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload a config entry.""" unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) if unload_ok: - hass.data[DOMAIN][entry.entry_id][UNDO_UPDATE_LISTENER]() hass.data[DOMAIN].pop(entry.entry_id) return unload_ok diff --git a/homeassistant/components/risco/alarm_control_panel.py b/homeassistant/components/risco/alarm_control_panel.py index 3bad03fda104d6fbd8f41d7d9b7d77a1005f56d9..4196ee0cf426313b3630487941d506a7f0ba8837 100644 --- a/homeassistant/components/risco/alarm_control_panel.py +++ b/homeassistant/components/risco/alarm_control_panel.py @@ -1,7 +1,11 @@ """Support for Risco alarms.""" from __future__ import annotations +from collections.abc import Callable import logging +from typing import Any + +from pyrisco.common import Partition from homeassistant.components.alarm_control_panel import ( AlarmControlPanelEntity, @@ -23,6 +27,7 @@ from homeassistant.core import HomeAssistant from homeassistant.helpers.entity import DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback +from . import LocalData, RiscoDataUpdateCoordinator, is_local from .const import ( CONF_CODE_ARM_REQUIRED, CONF_CODE_DISARM_REQUIRED, @@ -53,57 +58,61 @@ async def async_setup_entry( async_add_entities: AddEntitiesCallback, ) -> None: """Set up the Risco alarm control panel.""" - coordinator = hass.data[DOMAIN][config_entry.entry_id][DATA_COORDINATOR] options = {**DEFAULT_OPTIONS, **config_entry.options} - entities = [ - RiscoAlarm(coordinator, partition_id, config_entry.data[CONF_PIN], options) - for partition_id in coordinator.data.partitions - ] - - async_add_entities(entities, False) + if is_local(config_entry): + local_data: LocalData = hass.data[DOMAIN][config_entry.entry_id] + async_add_entities( + RiscoLocalAlarm( + local_data.system.id, + partition_id, + partition, + local_data.partition_updates, + config_entry.data[CONF_PIN], + options, + ) + for partition_id, partition in local_data.system.partitions.items() + ) + else: + coordinator: RiscoDataUpdateCoordinator = hass.data[DOMAIN][ + config_entry.entry_id + ][DATA_COORDINATOR] + async_add_entities( + RiscoCloudAlarm( + coordinator, partition_id, config_entry.data[CONF_PIN], options + ) + for partition_id in coordinator.data.partitions + ) -class RiscoAlarm(AlarmControlPanelEntity, RiscoEntity): - """Representation of a Risco partition.""" +class RiscoAlarm(AlarmControlPanelEntity): + """Representation of a Risco cloud partition.""" _attr_code_format = CodeFormat.NUMBER - def __init__(self, coordinator, partition_id, code, options): + def __init__( + self, + *, + partition_id: int, + partition: Partition, + code: str, + options: dict[str, Any], + **kwargs: Any, + ) -> None: """Init the partition.""" - super().__init__(coordinator) + super().__init__(**kwargs) self._partition_id = partition_id - self._partition = self.coordinator.data.partitions[self._partition_id] + self._partition = partition self._code = code self._attr_code_arm_required = options[CONF_CODE_ARM_REQUIRED] self._code_disarm_required = options[CONF_CODE_DISARM_REQUIRED] self._risco_to_ha = options[CONF_RISCO_STATES_TO_HA] self._ha_to_risco = options[CONF_HA_STATES_TO_RISCO] self._attr_supported_features = 0 + self._attr_has_entity_name = True + self._attr_name = None for state in self._ha_to_risco: self._attr_supported_features |= STATES_TO_SUPPORTED_FEATURES[state] - def _get_data_from_coordinator(self): - self._partition = self.coordinator.data.partitions[self._partition_id] - - @property - def device_info(self) -> DeviceInfo: - """Return device info for this device.""" - return DeviceInfo( - identifiers={(DOMAIN, self.unique_id)}, - name=self.name, - manufacturer="Risco", - ) - - @property - def name(self) -> str: - """Return the name of the partition.""" - return f"Risco {self._risco.site_name} Partition {self._partition_id}" - - @property - def unique_id(self) -> str: - """Return a unique id for that partition.""" - return f"{self._risco.site_uuid}_{self._partition_id}" - @property def state(self) -> str | None: """Return the state of the device.""" @@ -165,7 +174,74 @@ class RiscoAlarm(AlarmControlPanelEntity, RiscoEntity): else: await self._call_alarm_method(risco_state) + async def _call_alarm_method(self, method: str, *args: Any) -> None: + raise NotImplementedError + + +class RiscoCloudAlarm(RiscoAlarm, RiscoEntity): + """Representation of a Risco partition.""" + + def __init__( + self, + coordinator: RiscoDataUpdateCoordinator, + partition_id: int, + code: str, + options: dict[str, Any], + ) -> None: + """Init the partition.""" + super().__init__( + partition_id=partition_id, + partition=coordinator.data.partitions[partition_id], + coordinator=coordinator, + code=code, + options=options, + ) + self._attr_unique_id = f"{self._risco.site_uuid}_{partition_id}" + self._attr_device_info = DeviceInfo( + identifiers={(DOMAIN, self._attr_unique_id)}, + name=f"Risco {self._risco.site_name} Partition {partition_id}", + manufacturer="Risco", + ) + + def _get_data_from_coordinator(self) -> None: + self._partition = self.coordinator.data.partitions[self._partition_id] + async def _call_alarm_method(self, method, *args): alarm = await getattr(self._risco, method)(self._partition_id, *args) self._partition = alarm.partitions[self._partition_id] self.async_write_ha_state() + + +class RiscoLocalAlarm(RiscoAlarm): + """Representation of a Risco local, partition.""" + + _attr_should_poll = False + + def __init__( + self, + system_id: str, + partition_id: int, + partition: Partition, + partition_updates: dict[int, Callable[[], Any]], + code: str, + options: dict[str, Any], + ) -> None: + """Init the partition.""" + super().__init__( + partition_id=partition_id, partition=partition, code=code, options=options + ) + self._system_id = system_id + self._partition_updates = partition_updates + self._attr_unique_id = f"{system_id}_{partition_id}_local" + self._attr_device_info = DeviceInfo( + identifiers={(DOMAIN, self._attr_unique_id)}, + name=f"Risco {system_id} Partition {partition_id}", + manufacturer="Risco", + ) + + async def async_added_to_hass(self) -> None: + """Subscribe to updates.""" + self._partition_updates[self._partition_id] = self.async_write_ha_state + + async def _call_alarm_method(self, method: str, *args: Any) -> None: + await getattr(self._partition, method)(*args) diff --git a/homeassistant/components/risco/binary_sensor.py b/homeassistant/components/risco/binary_sensor.py index acb621132354c4c0a5ed3fed044e7b545c07c54f..9f98be09f0da2f6817cc9be5c725b9498e930d5f 100644 --- a/homeassistant/components/risco/binary_sensor.py +++ b/homeassistant/components/risco/binary_sensor.py @@ -1,4 +1,11 @@ """Support for Risco alarm zones.""" +from __future__ import annotations + +from collections.abc import Callable, Mapping +from typing import Any + +from pyrisco.common import Zone + from homeassistant.components.binary_sensor import ( BinarySensorDeviceClass, BinarySensorEntity, @@ -9,6 +16,7 @@ from homeassistant.helpers import entity_platform from homeassistant.helpers.entity import DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback +from . import LocalData, RiscoDataUpdateCoordinator, is_local from .const import DATA_COORDINATOR, DOMAIN from .entity import RiscoEntity, binary_sensor_unique_id @@ -28,69 +36,117 @@ async def async_setup_entry( SERVICE_UNBYPASS_ZONE, {}, "async_unbypass_zone" ) - coordinator = hass.data[DOMAIN][config_entry.entry_id][DATA_COORDINATOR] - entities = [ - RiscoBinarySensor(coordinator, zone_id, zone) - for zone_id, zone in coordinator.data.zones.items() - ] - async_add_entities(entities, False) + if is_local(config_entry): + local_data: LocalData = hass.data[DOMAIN][config_entry.entry_id] + async_add_entities( + RiscoLocalBinarySensor( + local_data.system.id, zone_id, zone, local_data.zone_updates + ) + for zone_id, zone in local_data.system.zones.items() + ) + else: + coordinator: RiscoDataUpdateCoordinator = hass.data[DOMAIN][ + config_entry.entry_id + ][DATA_COORDINATOR] + async_add_entities( + RiscoCloudBinarySensor(coordinator, zone_id, zone) + for zone_id, zone in coordinator.data.zones.items() + ) -class RiscoBinarySensor(BinarySensorEntity, RiscoEntity): +class RiscoBinarySensor(BinarySensorEntity): """Representation of a Risco zone as a binary sensor.""" - def __init__(self, coordinator, zone_id, zone): + _attr_device_class = BinarySensorDeviceClass.MOTION + + def __init__(self, *, zone_id: int, zone: Zone, **kwargs: Any) -> None: """Init the zone.""" - super().__init__(coordinator) + super().__init__(**kwargs) self._zone_id = zone_id self._zone = zone - - def _get_data_from_coordinator(self): - self._zone = self.coordinator.data.zones[self._zone_id] - - @property - def device_info(self) -> DeviceInfo: - """Return device info for this device.""" - return DeviceInfo( - identifiers={(DOMAIN, self.unique_id)}, - manufacturer="Risco", - name=self.name, - ) + self._attr_has_entity_name = True + self._attr_name = None @property - def name(self): - """Return the name of the zone.""" - return self._zone.name - - @property - def unique_id(self): - """Return a unique id for this zone.""" - return binary_sensor_unique_id(self._risco, self._zone_id) - - @property - def extra_state_attributes(self): + def extra_state_attributes(self) -> Mapping[str, Any] | None: """Return the state attributes.""" return {"zone_id": self._zone_id, "bypassed": self._zone.bypassed} @property - def is_on(self): + def is_on(self) -> bool | None: """Return true if sensor is on.""" return self._zone.triggered - @property - def device_class(self): - """Return the class of this sensor, from BinarySensorDeviceClass.""" - return BinarySensorDeviceClass.MOTION + async def async_bypass_zone(self) -> None: + """Bypass this zone.""" + await self._bypass(True) + + async def async_unbypass_zone(self) -> None: + """Unbypass this zone.""" + await self._bypass(False) + + async def _bypass(self, bypass: bool) -> None: + raise NotImplementedError - async def _bypass(self, bypass): + +class RiscoCloudBinarySensor(RiscoBinarySensor, RiscoEntity): + """Representation of a Risco cloud zone as a binary sensor.""" + + def __init__( + self, coordinator: RiscoDataUpdateCoordinator, zone_id: int, zone: Zone + ) -> None: + """Init the zone.""" + super().__init__(zone_id=zone_id, zone=zone, coordinator=coordinator) + self._attr_unique_id = binary_sensor_unique_id(self._risco, zone_id) + self._attr_device_info = DeviceInfo( + identifiers={(DOMAIN, self._attr_unique_id)}, + manufacturer="Risco", + name=self._zone.name, + ) + + def _get_data_from_coordinator(self) -> None: + self._zone = self.coordinator.data.zones[self._zone_id] + + async def _bypass(self, bypass: bool) -> None: alarm = await self._risco.bypass_zone(self._zone_id, bypass) self._zone = alarm.zones[self._zone_id] self.async_write_ha_state() - async def async_bypass_zone(self): - """Bypass this zone.""" - await self._bypass(True) - async def async_unbypass_zone(self): - """Unbypass this zone.""" - await self._bypass(False) +class RiscoLocalBinarySensor(RiscoBinarySensor): + """Representation of a Risco local zone as a binary sensor.""" + + _attr_should_poll = False + + def __init__( + self, + system_id: str, + zone_id: int, + zone: Zone, + zone_updates: dict[int, Callable[[], Any]], + ) -> None: + """Init the zone.""" + super().__init__(zone_id=zone_id, zone=zone) + self._system_id = system_id + self._zone_updates = zone_updates + self._attr_unique_id = f"{system_id}_zone_{zone_id}_local" + self._attr_device_info = DeviceInfo( + identifiers={(DOMAIN, self._attr_unique_id)}, + manufacturer="Risco", + name=self._zone.name, + ) + + async def async_added_to_hass(self) -> None: + """Subscribe to updates.""" + self._zone_updates[self._zone_id] = self.async_write_ha_state + + @property + def extra_state_attributes(self) -> Mapping[str, Any] | None: + """Return the state attributes.""" + return { + **(super().extra_state_attributes or {}), + "groups": self._zone.groups, + } + + async def _bypass(self, bypass: bool) -> None: + await self._zone.bypass(bypass) diff --git a/homeassistant/components/risco/config_flow.py b/homeassistant/components/risco/config_flow.py index 5f8f40cb5f7414e0eef3c270c3ece429e3047aaf..1befe6263479b813aff4b5217122a059f6b4611b 100644 --- a/homeassistant/components/risco/config_flow.py +++ b/homeassistant/components/risco/config_flow.py @@ -1,16 +1,21 @@ """Config flow for Risco integration.""" from __future__ import annotations +import asyncio +from collections.abc import Mapping import logging -from pyrisco import CannotConnectError, RiscoCloud, UnauthorizedError +from pyrisco import CannotConnectError, RiscoCloud, RiscoLocal, UnauthorizedError import voluptuous as vol from homeassistant import config_entries, core from homeassistant.const import ( + CONF_HOST, CONF_PASSWORD, CONF_PIN, + CONF_PORT, CONF_SCAN_INTERVAL, + CONF_TYPE, CONF_USERNAME, STATE_ALARM_ARMED_AWAY, STATE_ALARM_ARMED_CUSTOM_BYPASS, @@ -27,18 +32,27 @@ from .const import ( DEFAULT_OPTIONS, DOMAIN, RISCO_STATES, + SLEEP_INTERVAL, + TYPE_LOCAL, ) _LOGGER = logging.getLogger(__name__) -DATA_SCHEMA = vol.Schema( +CLOUD_SCHEMA = vol.Schema( { vol.Required(CONF_USERNAME): str, vol.Required(CONF_PASSWORD): str, vol.Required(CONF_PIN): str, } ) +LOCAL_SCHEMA = vol.Schema( + { + vol.Required(CONF_HOST): str, + vol.Required(CONF_PORT, default=1000): int, + vol.Required(CONF_PIN): str, + } +) HA_STATES = [ STATE_ALARM_ARMED_AWAY, STATE_ALARM_ARMED_HOME, @@ -47,10 +61,10 @@ HA_STATES = [ ] -async def validate_input(hass: core.HomeAssistant, data): - """Validate the user input allows us to connect. +async def validate_cloud_input(hass: core.HomeAssistant, data) -> dict[str, str]: + """Validate the user input allows us to connect to Risco Cloud. - Data has the keys from DATA_SCHEMA with values provided by the user. + Data has the keys from CLOUD_SCHEMA with values provided by the user. """ risco = RiscoCloud(data[CONF_USERNAME], data[CONF_PASSWORD], data[CONF_PIN]) @@ -62,6 +76,20 @@ async def validate_input(hass: core.HomeAssistant, data): return {"title": risco.site_name} +async def validate_local_input( + hass: core.HomeAssistant, data: Mapping[str, str] +) -> dict[str, str]: + """Validate the user input allows us to connect to a local panel. + + Data has the keys from LOCAL_SCHEMA with values provided by the user. + """ + risco = RiscoLocal(data[CONF_HOST], data[CONF_PORT], data[CONF_PIN]) + await risco.connect() + site_id = risco.id + await risco.disconnect() + return {"title": site_id} + + class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): """Handle a config flow for Risco.""" @@ -77,13 +105,20 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): async def async_step_user(self, user_input=None): """Handle the initial step.""" + return self.async_show_menu( + step_id="user", + menu_options=["cloud", "local"], + ) + + async def async_step_cloud(self, user_input=None): + """Configure a cloud based alarm.""" errors = {} if user_input is not None: await self.async_set_unique_id(user_input[CONF_USERNAME]) self._abort_if_unique_id_configured() try: - info = await validate_input(self.hass, user_input) + info = await validate_cloud_input(self.hass, user_input) except CannotConnectError: errors["base"] = "cannot_connect" except UnauthorizedError: @@ -95,7 +130,35 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): return self.async_create_entry(title=info["title"], data=user_input) return self.async_show_form( - step_id="user", data_schema=DATA_SCHEMA, errors=errors + step_id="cloud", data_schema=CLOUD_SCHEMA, errors=errors + ) + + async def async_step_local(self, user_input=None): + """Configure a local based alarm.""" + errors = {} + if user_input is not None: + try: + info = await validate_local_input(self.hass, user_input) + except CannotConnectError: + errors["base"] = "cannot_connect" + except UnauthorizedError: + errors["base"] = "invalid_auth" + except Exception: # pylint: disable=broad-except + _LOGGER.exception("Unexpected exception") + errors["base"] = "unknown" + else: + await self.async_set_unique_id(info["title"]) + self._abort_if_unique_id_configured() + + # Risco can hang if we don't wait before creating a new connection + await asyncio.sleep(SLEEP_INTERVAL) + + return self.async_create_entry( + title=info["title"], data={**user_input, **{CONF_TYPE: TYPE_LOCAL}} + ) + + return self.async_show_form( + step_id="local", data_schema=LOCAL_SCHEMA, errors=errors ) diff --git a/homeassistant/components/risco/const.py b/homeassistant/components/risco/const.py index 46eb011ba5b5b48f5df997876270c9a21285ceb8..f4ac170d3c7e29b0f5bcb40b5e17ed1593424a81 100644 --- a/homeassistant/components/risco/const.py +++ b/homeassistant/components/risco/const.py @@ -15,6 +15,8 @@ EVENTS_COORDINATOR = "risco_events" DEFAULT_SCAN_INTERVAL = 30 +TYPE_LOCAL = "local" + CONF_CODE_ARM_REQUIRED = "code_arm_required" CONF_CODE_DISARM_REQUIRED = "code_disarm_required" CONF_RISCO_STATES_TO_HA = "risco_states_to_ha" @@ -44,3 +46,5 @@ DEFAULT_OPTIONS = { CONF_RISCO_STATES_TO_HA: DEFAULT_RISCO_STATES_TO_HA, CONF_HA_STATES_TO_RISCO: DEFAULT_HA_STATES_TO_RISCO, } + +SLEEP_INTERVAL = 1 diff --git a/homeassistant/components/risco/entity.py b/homeassistant/components/risco/entity.py index 04b521156b172f707c56893752f65e459bd7259d..e49b632ac7838fe1e7e14f776700630ae212960e 100644 --- a/homeassistant/components/risco/entity.py +++ b/homeassistant/components/risco/entity.py @@ -1,13 +1,15 @@ """A risco entity base class.""" from homeassistant.helpers.update_coordinator import CoordinatorEntity +from . import RiscoDataUpdateCoordinator -def binary_sensor_unique_id(risco, zone_id): + +def binary_sensor_unique_id(risco, zone_id: int) -> str: """Return unique id for the binary sensor.""" return f"{risco.site_uuid}_zone_{zone_id}" -class RiscoEntity(CoordinatorEntity): +class RiscoEntity(CoordinatorEntity[RiscoDataUpdateCoordinator]): """Risco entity base class.""" def _get_data_from_coordinator(self): diff --git a/homeassistant/components/risco/manifest.json b/homeassistant/components/risco/manifest.json index fb4b8203aac0efa54c2b08f28dc4d9e727884347..0136e8f54de055ebd3f25027724000eb2591aab8 100644 --- a/homeassistant/components/risco/manifest.json +++ b/homeassistant/components/risco/manifest.json @@ -6,6 +6,6 @@ "requirements": ["pyrisco==0.5.2"], "codeowners": ["@OnFreund"], "quality_scale": "platinum", - "iot_class": "cloud_polling", + "iot_class": "local_push", "loggers": ["pyrisco"] } diff --git a/homeassistant/components/risco/sensor.py b/homeassistant/components/risco/sensor.py index 6038c2911c929b75a596d70c3a85c72c6f16b574..c4bd047e26005580b21ed2e5bf17f2154fae3e93 100644 --- a/homeassistant/components/risco/sensor.py +++ b/homeassistant/components/risco/sensor.py @@ -1,4 +1,9 @@ """Sensor for Risco Events.""" +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any + from homeassistant.components.binary_sensor import DOMAIN as BS_DOMAIN from homeassistant.components.sensor import SensorDeviceClass, SensorEntity from homeassistant.config_entries import ConfigEntry @@ -8,6 +13,7 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.util import dt as dt_util +from . import RiscoEventsDataUpdateCoordinator, is_local from .const import DOMAIN, EVENTS_COORDINATOR from .entity import binary_sensor_unique_id @@ -38,7 +44,13 @@ async def async_setup_entry( async_add_entities: AddEntitiesCallback, ) -> None: """Set up sensors for device.""" - coordinator = hass.data[DOMAIN][config_entry.entry_id][EVENTS_COORDINATOR] + if is_local(config_entry): + # no events in local comm + return + + coordinator: RiscoEventsDataUpdateCoordinator = hass.data[DOMAIN][ + config_entry.entry_id + ][EVENTS_COORDINATOR] sensors = [ RiscoSensor(coordinator, id, [], name, config_entry.entry_id) for id, name in CATEGORIES.items() @@ -62,19 +74,12 @@ class RiscoSensor(CoordinatorEntity, SensorEntity): self._excludes = excludes self._name = name self._entry_id = entry_id - self._entity_registry = None - - @property - def name(self): - """Return the name of the sensor.""" - return f"Risco {self.coordinator.risco.site_name} {self._name} Events" + self._entity_registry: er.EntityRegistry | None = None + self._attr_unique_id = f"events_{name}_{self.coordinator.risco.site_uuid}" + self._attr_name = f"Risco {self.coordinator.risco.site_name} {name} Events" + self._attr_device_class = SensorDeviceClass.TIMESTAMP - @property - def unique_id(self): - """Return a unique id for this sensor.""" - return f"events_{self._name}_{self.coordinator.risco.site_uuid}" - - async def async_added_to_hass(self): + async def async_added_to_hass(self) -> None: """When entity is added to hass.""" self._entity_registry = er.async_get(self.hass) self.async_on_remove( @@ -103,7 +108,7 @@ class RiscoSensor(CoordinatorEntity, SensorEntity): ) @property - def extra_state_attributes(self): + def extra_state_attributes(self) -> Mapping[str, Any] | None: """State attributes.""" if self._event is None: return None @@ -120,8 +125,3 @@ class RiscoSensor(CoordinatorEntity, SensorEntity): attrs["zone_entity_id"] = zone_entity_id return attrs - - @property - def device_class(self): - """Device class of sensor.""" - return SensorDeviceClass.TIMESTAMP diff --git a/homeassistant/components/risco/strings.json b/homeassistant/components/risco/strings.json index ebce9dda514bc352f886b82c5ed18d7f8bf43f39..1cc2fe7317cc855d341b2ced39eaf1941dcbde93 100644 --- a/homeassistant/components/risco/strings.json +++ b/homeassistant/components/risco/strings.json @@ -2,11 +2,24 @@ "config": { "step": { "user": { + "menu_options": { + "cloud": "Risco Cloud (recommended)", + "local": "Local Risco Panel (advanced)" + } + }, + "cloud": { "data": { "username": "[%key:common::config_flow::data::username%]", "password": "[%key:common::config_flow::data::password%]", "pin": "[%key:common::config_flow::data::pin%]" } + }, + "local": { + "data": { + "host": "[%key:common::config_flow::data::host%]", + "port": "[%key:common::config_flow::data::port%]", + "pin": "[%key:common::config_flow::data::pin%]" + } } }, "error": { diff --git a/homeassistant/components/risco/translations/en.json b/homeassistant/components/risco/translations/en.json index c98cd82ede84a5f4d7f93b83010e8c78a520db1c..95dd395e5018cd81c5a6a352890a5fb255b66a49 100644 --- a/homeassistant/components/risco/translations/en.json +++ b/homeassistant/components/risco/translations/en.json @@ -9,12 +9,25 @@ "unknown": "Unexpected error" }, "step": { - "user": { + "cloud": { "data": { "password": "Password", "pin": "PIN Code", "username": "Username" } + }, + "local": { + "data": { + "host": "Host", + "pin": "PIN Code", + "port": "Port" + } + }, + "user": { + "menu_options": { + "cloud": "Risco Cloud (recommended)", + "local": "Local Risco Panel (advanced)" + } } } }, diff --git a/tests/components/risco/conftest.py b/tests/components/risco/conftest.py new file mode 100644 index 0000000000000000000000000000000000000000..006e57b9ae50762f0041eb63af2ea2385b64f02e --- /dev/null +++ b/tests/components/risco/conftest.py @@ -0,0 +1,148 @@ +"""Fixtures for Risco tests.""" +from unittest.mock import MagicMock, PropertyMock, patch + +from pytest import fixture + +from homeassistant.components.risco.const import DOMAIN, TYPE_LOCAL +from homeassistant.const import ( + CONF_HOST, + CONF_PASSWORD, + CONF_PIN, + CONF_PORT, + CONF_TYPE, + CONF_USERNAME, +) + +from .util import TEST_SITE_NAME, TEST_SITE_UUID, zone_mock + +from tests.common import MockConfigEntry + +TEST_CLOUD_CONFIG = { + CONF_USERNAME: "test-username", + CONF_PASSWORD: "test-password", + CONF_PIN: "1234", +} +TEST_LOCAL_CONFIG = { + CONF_TYPE: TYPE_LOCAL, + CONF_HOST: "test-host", + CONF_PORT: 5004, + CONF_PIN: "1234", +} + + +@fixture +def two_zone_cloud(): + """Fixture to mock alarm with two zones.""" + zone_mocks = {0: zone_mock(), 1: zone_mock()} + alarm_mock = MagicMock() + with patch.object( + zone_mocks[0], "id", new_callable=PropertyMock(return_value=0) + ), patch.object( + zone_mocks[0], "name", new_callable=PropertyMock(return_value="Zone 0") + ), patch.object( + zone_mocks[1], "id", new_callable=PropertyMock(return_value=1) + ), patch.object( + zone_mocks[1], "name", new_callable=PropertyMock(return_value="Zone 1") + ), patch.object( + alarm_mock, + "zones", + new_callable=PropertyMock(return_value=zone_mocks), + ), patch( + "homeassistant.components.risco.RiscoCloud.get_state", + return_value=alarm_mock, + ): + yield zone_mocks + + +@fixture +def options(): + """Fixture for default (empty) options.""" + return {} + + +@fixture +def events(): + """Fixture for default (empty) events.""" + return [] + + +@fixture +def cloud_config_entry(hass, options): + """Fixture for a cloud config entry.""" + config_entry = MockConfigEntry( + domain=DOMAIN, data=TEST_CLOUD_CONFIG, options=options + ) + config_entry.add_to_hass(hass) + return config_entry + + +@fixture +def login_with_error(exception): + """Fixture to simulate error on login.""" + with patch( + "homeassistant.components.risco.RiscoCloud.login", + side_effect=exception, + ): + yield + + +@fixture +async def setup_risco_cloud(hass, cloud_config_entry, events): + """Set up a Risco integration for testing.""" + with patch( + "homeassistant.components.risco.RiscoCloud.login", + return_value=True, + ), patch( + "homeassistant.components.risco.RiscoCloud.site_uuid", + new_callable=PropertyMock(return_value=TEST_SITE_UUID), + ), patch( + "homeassistant.components.risco.RiscoCloud.site_name", + new_callable=PropertyMock(return_value=TEST_SITE_NAME), + ), patch( + "homeassistant.components.risco.RiscoCloud.close" + ), patch( + "homeassistant.components.risco.RiscoCloud.get_events", + return_value=events, + ): + await hass.config_entries.async_setup(cloud_config_entry.entry_id) + await hass.async_block_till_done() + + yield cloud_config_entry + + +@fixture +def local_config_entry(hass, options): + """Fixture for a local config entry.""" + config_entry = MockConfigEntry( + domain=DOMAIN, data=TEST_LOCAL_CONFIG, options=options + ) + config_entry.add_to_hass(hass) + return config_entry + + +@fixture +def connect_with_error(exception): + """Fixture to simulate error on connect.""" + with patch( + "homeassistant.components.risco.RiscoLocal.connect", + side_effect=exception, + ): + yield + + +@fixture +async def setup_risco_local(hass, local_config_entry): + """Set up a local Risco integration for testing.""" + with patch( + "homeassistant.components.risco.RiscoLocal.connect", + return_value=True, + ), patch( + "homeassistant.components.risco.RiscoLocal.id", + new_callable=PropertyMock(return_value=TEST_SITE_UUID), + ), patch( + "homeassistant.components.risco.RiscoLocal.disconnect" + ): + await hass.config_entries.async_setup(local_config_entry.entry_id) + await hass.async_block_till_done() + + yield local_config_entry diff --git a/tests/components/risco/test_alarm_control_panel.py b/tests/components/risco/test_alarm_control_panel.py index 4a82656147d81900a2b577d1ea661da9c2e46822..ca0eb604eef53c7adab0f834f5c01f0d08ea15ae 100644 --- a/tests/components/risco/test_alarm_control_panel.py +++ b/tests/components/risco/test_alarm_control_panel.py @@ -1,5 +1,5 @@ """Tests for the Risco alarm control panel device.""" -from unittest.mock import MagicMock, PropertyMock, patch +from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch import pytest @@ -31,12 +31,13 @@ from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers import device_registry as dr, entity_registry as er from homeassistant.helpers.entity_component import async_update_entity -from .util import TEST_CONFIG, TEST_SITE_UUID, setup_risco +from .util import TEST_SITE_UUID -from tests.common import MockConfigEntry +FIRST_CLOUD_ENTITY_ID = "alarm_control_panel.risco_test_site_name_partition_0" +SECOND_CLOUD_ENTITY_ID = "alarm_control_panel.risco_test_site_name_partition_1" -FIRST_ENTITY_ID = "alarm_control_panel.risco_test_site_name_partition_0" -SECOND_ENTITY_ID = "alarm_control_panel.risco_test_site_name_partition_1" +FIRST_LOCAL_ENTITY_ID = "alarm_control_panel.risco_test_site_uuid_partition_0" +SECOND_LOCAL_ENTITY_ID = "alarm_control_panel.risco_test_site_uuid_partition_1" CODES_REQUIRED_OPTIONS = {"code_arm_required": True, "code_disarm_required": True} TEST_RISCO_TO_HA = { @@ -86,7 +87,7 @@ def _partition_mock(): @pytest.fixture -def two_part_alarm(): +def two_part_cloud_alarm(): """Fixture to mock alarm with two partitions.""" partition_mocks = {0: _partition_mock(), 1: _partition_mock()} alarm_mock = MagicMock() @@ -102,52 +103,42 @@ def two_part_alarm(): "homeassistant.components.risco.RiscoCloud.get_state", return_value=alarm_mock, ): - yield alarm_mock + yield partition_mocks -async def test_cannot_connect(hass): - """Test connection error.""" - - with patch( - "homeassistant.components.risco.RiscoCloud.login", - side_effect=CannotConnectError, +@pytest.fixture +def two_part_local_alarm(): + """Fixture to mock alarm with two partitions.""" + partition_mocks = {0: _partition_mock(), 1: _partition_mock()} + with patch.object( + partition_mocks[0], "id", new_callable=PropertyMock(return_value=0) + ), patch.object( + partition_mocks[1], "id", new_callable=PropertyMock(return_value=1) + ), patch( + "homeassistant.components.risco.RiscoLocal.zones", + new_callable=PropertyMock(return_value={}), + ), patch( + "homeassistant.components.risco.RiscoLocal.partitions", + new_callable=PropertyMock(return_value=partition_mocks), ): - config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG) - config_entry.add_to_hass(hass) - await hass.config_entries.async_setup(config_entry.entry_id) - await hass.async_block_till_done() - registry = er.async_get(hass) - assert not registry.async_is_registered(FIRST_ENTITY_ID) - assert not registry.async_is_registered(SECOND_ENTITY_ID) + yield partition_mocks -async def test_unauthorized(hass): - """Test unauthorized error.""" - - with patch( - "homeassistant.components.risco.RiscoCloud.login", - side_effect=UnauthorizedError, - ): - config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG) - config_entry.add_to_hass(hass) - await hass.config_entries.async_setup(config_entry.entry_id) - await hass.async_block_till_done() - registry = er.async_get(hass) - assert not registry.async_is_registered(FIRST_ENTITY_ID) - assert not registry.async_is_registered(SECOND_ENTITY_ID) +@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError]) +async def test_error_on_login(hass, login_with_error, cloud_config_entry): + """Test error on login.""" + await hass.config_entries.async_setup(cloud_config_entry.entry_id) + await hass.async_block_till_done() + registry = er.async_get(hass) + assert not registry.async_is_registered(FIRST_CLOUD_ENTITY_ID) + assert not registry.async_is_registered(SECOND_CLOUD_ENTITY_ID) -async def test_setup(hass, two_part_alarm): +async def test_cloud_setup(hass, two_part_cloud_alarm, setup_risco_cloud): """Test entity setup.""" registry = er.async_get(hass) - - assert not registry.async_is_registered(FIRST_ENTITY_ID) - assert not registry.async_is_registered(SECOND_ENTITY_ID) - - await setup_risco(hass) - - assert registry.async_is_registered(FIRST_ENTITY_ID) - assert registry.async_is_registered(SECOND_ENTITY_ID) + assert registry.async_is_registered(FIRST_CLOUD_ENTITY_ID) + assert registry.async_is_registered(SECOND_CLOUD_ENTITY_ID) registry = dr.async_get(hass) device = registry.async_get_device({(DOMAIN, TEST_SITE_UUID + "_0")}) @@ -159,50 +150,59 @@ async def test_setup(hass, two_part_alarm): assert device.manufacturer == "Risco" -async def _check_state(hass, alarm, property, state, entity_id, partition_id): - with patch.object(alarm.partitions[partition_id], property, return_value=True): +async def _check_cloud_state( + hass, partitions, property, state, entity_id, partition_id +): + with patch.object(partitions[partition_id], property, return_value=True): await async_update_entity(hass, entity_id) await hass.async_block_till_done() assert hass.states.get(entity_id).state == state -async def test_states(hass, two_part_alarm): +@pytest.mark.parametrize("options", [CUSTOM_MAPPING_OPTIONS]) +async def test_cloud_states(hass, two_part_cloud_alarm, setup_risco_cloud): """Test the various alarm states.""" - await setup_risco(hass, [], CUSTOM_MAPPING_OPTIONS) - - assert hass.states.get(FIRST_ENTITY_ID).state == STATE_UNKNOWN - for partition_id, entity_id in {0: FIRST_ENTITY_ID, 1: SECOND_ENTITY_ID}.items(): - await _check_state( + assert hass.states.get(FIRST_CLOUD_ENTITY_ID).state == STATE_UNKNOWN + for partition_id, entity_id in { + 0: FIRST_CLOUD_ENTITY_ID, + 1: SECOND_CLOUD_ENTITY_ID, + }.items(): + await _check_cloud_state( hass, - two_part_alarm, + two_part_cloud_alarm, "triggered", STATE_ALARM_TRIGGERED, entity_id, partition_id, ) - await _check_state( - hass, two_part_alarm, "arming", STATE_ALARM_ARMING, entity_id, partition_id + await _check_cloud_state( + hass, + two_part_cloud_alarm, + "arming", + STATE_ALARM_ARMING, + entity_id, + partition_id, ) - await _check_state( + await _check_cloud_state( hass, - two_part_alarm, + two_part_cloud_alarm, "armed", STATE_ALARM_ARMED_AWAY, entity_id, partition_id, ) - await _check_state( + await _check_cloud_state( hass, - two_part_alarm, + two_part_cloud_alarm, "partially_armed", STATE_ALARM_ARMED_HOME, entity_id, partition_id, ) - await _check_state( + await _check_cloud_state( hass, - two_part_alarm, + two_part_cloud_alarm, "disarmed", STATE_ALARM_DISARMED, entity_id, @@ -211,13 +211,13 @@ async def test_states(hass, two_part_alarm): groups = {"A": False, "B": False, "C": True, "D": False} with patch.object( - two_part_alarm.partitions[partition_id], + two_part_cloud_alarm[partition_id], "groups", new_callable=PropertyMock(return_value=groups), ): - await _check_state( + await _check_cloud_state( hass, - two_part_alarm, + two_part_cloud_alarm, "partially_armed", STATE_ALARM_ARMED_NIGHT, entity_id, @@ -225,7 +225,15 @@ async def test_states(hass, two_part_alarm): ) -async def _test_service_call( +async def _call_alarm_service(hass, service, entity_id, **kwargs): + data = {"entity_id": entity_id, **kwargs} + + await hass.services.async_call( + ALARM_DOMAIN, service, service_data=data, blocking=True + ) + + +async def _test_cloud_service_call( hass, service, method, entity_id, partition_id, *args, **kwargs ): with patch(f"homeassistant.components.risco.RiscoCloud.{method}") as set_mock: @@ -233,7 +241,7 @@ async def _test_service_call( set_mock.assert_awaited_once_with(partition_id, *args) -async def _test_no_service_call( +async def _test_cloud_no_service_call( hass, service, method, entity_id, partition_id, **kwargs ): with patch(f"homeassistant.components.risco.RiscoCloud.{method}") as set_mock: @@ -241,189 +249,679 @@ async def _test_no_service_call( set_mock.assert_not_awaited() -async def _call_alarm_service(hass, service, entity_id, **kwargs): - data = {"entity_id": entity_id, **kwargs} - - await hass.services.async_call( - ALARM_DOMAIN, service, service_data=data, blocking=True - ) - - -async def test_sets_custom_mapping(hass, two_part_alarm): +@pytest.mark.parametrize("options", [CUSTOM_MAPPING_OPTIONS]) +async def test_cloud_sets_custom_mapping(hass, two_part_cloud_alarm, setup_risco_cloud): """Test settings the various modes when mapping some states.""" - await setup_risco(hass, [], CUSTOM_MAPPING_OPTIONS) - registry = er.async_get(hass) - entity = registry.async_get(FIRST_ENTITY_ID) + entity = registry.async_get(FIRST_CLOUD_ENTITY_ID) assert entity.supported_features == EXPECTED_FEATURES - await _test_service_call(hass, SERVICE_ALARM_DISARM, "disarm", FIRST_ENTITY_ID, 0) - await _test_service_call(hass, SERVICE_ALARM_DISARM, "disarm", SECOND_ENTITY_ID, 1) - await _test_service_call(hass, SERVICE_ALARM_ARM_AWAY, "arm", FIRST_ENTITY_ID, 0) - await _test_service_call(hass, SERVICE_ALARM_ARM_AWAY, "arm", SECOND_ENTITY_ID, 1) - await _test_service_call( - hass, SERVICE_ALARM_ARM_HOME, "partial_arm", FIRST_ENTITY_ID, 0 + await _test_cloud_service_call( + hass, SERVICE_ALARM_DISARM, "disarm", FIRST_CLOUD_ENTITY_ID, 0 ) - await _test_service_call( - hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_ENTITY_ID, 1 + await _test_cloud_service_call( + hass, SERVICE_ALARM_DISARM, "disarm", SECOND_CLOUD_ENTITY_ID, 1 ) - await _test_service_call( - hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", FIRST_ENTITY_ID, 0, "C" + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_AWAY, "arm", FIRST_CLOUD_ENTITY_ID, 0 ) - await _test_service_call( - hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", SECOND_ENTITY_ID, 1, "C" + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_AWAY, "arm", SECOND_CLOUD_ENTITY_ID, 1 + ) + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_HOME, "partial_arm", FIRST_CLOUD_ENTITY_ID, 0 + ) + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_CLOUD_ENTITY_ID, 1 + ) + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", FIRST_CLOUD_ENTITY_ID, 0, "C" + ) + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", SECOND_CLOUD_ENTITY_ID, 1, "C" ) -async def test_sets_full_custom_mapping(hass, two_part_alarm): +@pytest.mark.parametrize("options", [FULL_CUSTOM_MAPPING]) +async def test_cloud_sets_full_custom_mapping( + hass, two_part_cloud_alarm, setup_risco_cloud +): """Test settings the various modes when mapping all states.""" - await setup_risco(hass, [], FULL_CUSTOM_MAPPING) - registry = er.async_get(hass) - entity = registry.async_get(FIRST_ENTITY_ID) + entity = registry.async_get(FIRST_CLOUD_ENTITY_ID) assert ( entity.supported_features == EXPECTED_FEATURES | SUPPORT_ALARM_ARM_CUSTOM_BYPASS ) - await _test_service_call(hass, SERVICE_ALARM_DISARM, "disarm", FIRST_ENTITY_ID, 0) - await _test_service_call(hass, SERVICE_ALARM_DISARM, "disarm", SECOND_ENTITY_ID, 1) - await _test_service_call(hass, SERVICE_ALARM_ARM_AWAY, "arm", FIRST_ENTITY_ID, 0) - await _test_service_call(hass, SERVICE_ALARM_ARM_AWAY, "arm", SECOND_ENTITY_ID, 1) - await _test_service_call( - hass, SERVICE_ALARM_ARM_HOME, "partial_arm", FIRST_ENTITY_ID, 0 + await _test_cloud_service_call( + hass, SERVICE_ALARM_DISARM, "disarm", FIRST_CLOUD_ENTITY_ID, 0 + ) + await _test_cloud_service_call( + hass, SERVICE_ALARM_DISARM, "disarm", SECOND_CLOUD_ENTITY_ID, 1 + ) + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_AWAY, "arm", FIRST_CLOUD_ENTITY_ID, 0 + ) + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_AWAY, "arm", SECOND_CLOUD_ENTITY_ID, 1 + ) + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_HOME, "partial_arm", FIRST_CLOUD_ENTITY_ID, 0 ) - await _test_service_call( - hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_ENTITY_ID, 1 + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_CLOUD_ENTITY_ID, 1 ) - await _test_service_call( - hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", FIRST_ENTITY_ID, 0, "C" + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", FIRST_CLOUD_ENTITY_ID, 0, "C" ) - await _test_service_call( - hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", SECOND_ENTITY_ID, 1, "C" + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", SECOND_CLOUD_ENTITY_ID, 1, "C" ) - await _test_service_call( + await _test_cloud_service_call( hass, SERVICE_ALARM_ARM_CUSTOM_BYPASS, "group_arm", - FIRST_ENTITY_ID, + FIRST_CLOUD_ENTITY_ID, 0, "D", ) - await _test_service_call( + await _test_cloud_service_call( hass, SERVICE_ALARM_ARM_CUSTOM_BYPASS, "group_arm", - SECOND_ENTITY_ID, + SECOND_CLOUD_ENTITY_ID, 1, "D", ) -async def test_sets_with_correct_code(hass, two_part_alarm): +@pytest.mark.parametrize( + "options", [{**CUSTOM_MAPPING_OPTIONS, **CODES_REQUIRED_OPTIONS}] +) +async def test_cloud_sets_with_correct_code( + hass, two_part_cloud_alarm, setup_risco_cloud +): """Test settings the various modes when code is required.""" - await setup_risco(hass, [], {**CUSTOM_MAPPING_OPTIONS, **CODES_REQUIRED_OPTIONS}) - code = {"code": 1234} - await _test_service_call( - hass, SERVICE_ALARM_DISARM, "disarm", FIRST_ENTITY_ID, 0, **code + await _test_cloud_service_call( + hass, SERVICE_ALARM_DISARM, "disarm", FIRST_CLOUD_ENTITY_ID, 0, **code ) - await _test_service_call( - hass, SERVICE_ALARM_DISARM, "disarm", SECOND_ENTITY_ID, 1, **code + await _test_cloud_service_call( + hass, SERVICE_ALARM_DISARM, "disarm", SECOND_CLOUD_ENTITY_ID, 1, **code ) - await _test_service_call( - hass, SERVICE_ALARM_ARM_AWAY, "arm", FIRST_ENTITY_ID, 0, **code + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_AWAY, "arm", FIRST_CLOUD_ENTITY_ID, 0, **code ) - await _test_service_call( - hass, SERVICE_ALARM_ARM_AWAY, "arm", SECOND_ENTITY_ID, 1, **code + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_AWAY, "arm", SECOND_CLOUD_ENTITY_ID, 1, **code ) - await _test_service_call( - hass, SERVICE_ALARM_ARM_HOME, "partial_arm", FIRST_ENTITY_ID, 0, **code + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_HOME, "partial_arm", FIRST_CLOUD_ENTITY_ID, 0, **code ) - await _test_service_call( - hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_ENTITY_ID, 1, **code + await _test_cloud_service_call( + hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_CLOUD_ENTITY_ID, 1, **code ) - await _test_service_call( + await _test_cloud_service_call( hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", - FIRST_ENTITY_ID, + FIRST_CLOUD_ENTITY_ID, 0, "C", **code, ) - await _test_service_call( + await _test_cloud_service_call( hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", - SECOND_ENTITY_ID, + SECOND_CLOUD_ENTITY_ID, 1, "C", **code, ) with pytest.raises(HomeAssistantError): - await _test_no_service_call( + await _test_cloud_no_service_call( hass, SERVICE_ALARM_ARM_CUSTOM_BYPASS, "partial_arm", - FIRST_ENTITY_ID, + FIRST_CLOUD_ENTITY_ID, 0, **code, ) with pytest.raises(HomeAssistantError): - await _test_no_service_call( + await _test_cloud_no_service_call( hass, SERVICE_ALARM_ARM_CUSTOM_BYPASS, "partial_arm", - SECOND_ENTITY_ID, + SECOND_CLOUD_ENTITY_ID, 1, **code, ) -async def test_sets_with_incorrect_code(hass, two_part_alarm): +@pytest.mark.parametrize( + "options", [{**CUSTOM_MAPPING_OPTIONS, **CODES_REQUIRED_OPTIONS}] +) +async def test_cloud_sets_with_incorrect_code( + hass, two_part_cloud_alarm, setup_risco_cloud +): """Test settings the various modes when code is required and incorrect.""" - await setup_risco(hass, [], {**CUSTOM_MAPPING_OPTIONS, **CODES_REQUIRED_OPTIONS}) - code = {"code": 4321} - await _test_no_service_call( - hass, SERVICE_ALARM_DISARM, "disarm", FIRST_ENTITY_ID, 0, **code + await _test_cloud_no_service_call( + hass, SERVICE_ALARM_DISARM, "disarm", FIRST_CLOUD_ENTITY_ID, 0, **code ) - await _test_no_service_call( - hass, SERVICE_ALARM_DISARM, "disarm", SECOND_ENTITY_ID, 1, **code + await _test_cloud_no_service_call( + hass, SERVICE_ALARM_DISARM, "disarm", SECOND_CLOUD_ENTITY_ID, 1, **code ) - await _test_no_service_call( - hass, SERVICE_ALARM_ARM_AWAY, "arm", FIRST_ENTITY_ID, 0, **code + await _test_cloud_no_service_call( + hass, SERVICE_ALARM_ARM_AWAY, "arm", FIRST_CLOUD_ENTITY_ID, 0, **code ) - await _test_no_service_call( - hass, SERVICE_ALARM_ARM_AWAY, "arm", SECOND_ENTITY_ID, 1, **code + await _test_cloud_no_service_call( + hass, SERVICE_ALARM_ARM_AWAY, "arm", SECOND_CLOUD_ENTITY_ID, 1, **code ) - await _test_no_service_call( - hass, SERVICE_ALARM_ARM_HOME, "partial_arm", FIRST_ENTITY_ID, 0, **code + await _test_cloud_no_service_call( + hass, SERVICE_ALARM_ARM_HOME, "partial_arm", FIRST_CLOUD_ENTITY_ID, 0, **code ) - await _test_no_service_call( - hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_ENTITY_ID, 1, **code + await _test_cloud_no_service_call( + hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_CLOUD_ENTITY_ID, 1, **code ) - await _test_no_service_call( - hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", FIRST_ENTITY_ID, 0, **code + await _test_cloud_no_service_call( + hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", FIRST_CLOUD_ENTITY_ID, 0, **code ) - await _test_no_service_call( - hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", SECOND_ENTITY_ID, 1, **code + await _test_cloud_no_service_call( + hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", SECOND_CLOUD_ENTITY_ID, 1, **code ) with pytest.raises(HomeAssistantError): - await _test_no_service_call( + await _test_cloud_no_service_call( hass, SERVICE_ALARM_ARM_CUSTOM_BYPASS, "partial_arm", - FIRST_ENTITY_ID, + FIRST_CLOUD_ENTITY_ID, 0, **code, ) with pytest.raises(HomeAssistantError): - await _test_no_service_call( + await _test_cloud_no_service_call( hass, SERVICE_ALARM_ARM_CUSTOM_BYPASS, "partial_arm", - SECOND_ENTITY_ID, + SECOND_CLOUD_ENTITY_ID, 1, **code, ) + + +@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError]) +async def test_error_on_connect(hass, connect_with_error, local_config_entry): + """Test error on connect.""" + await hass.config_entries.async_setup(local_config_entry.entry_id) + await hass.async_block_till_done() + registry = er.async_get(hass) + assert not registry.async_is_registered(FIRST_LOCAL_ENTITY_ID) + assert not registry.async_is_registered(SECOND_LOCAL_ENTITY_ID) + + +async def test_local_setup(hass, two_part_local_alarm, setup_risco_local): + """Test entity setup.""" + registry = er.async_get(hass) + assert registry.async_is_registered(FIRST_LOCAL_ENTITY_ID) + assert registry.async_is_registered(SECOND_LOCAL_ENTITY_ID) + + registry = dr.async_get(hass) + device = registry.async_get_device({(DOMAIN, TEST_SITE_UUID + "_0_local")}) + assert device is not None + assert device.manufacturer == "Risco" + + device = registry.async_get_device({(DOMAIN, TEST_SITE_UUID + "_1_local")}) + assert device is not None + assert device.manufacturer == "Risco" + + +async def _check_local_state( + hass, partitions, property, state, entity_id, partition_id, callback +): + with patch.object(partitions[partition_id], property, return_value=True): + await callback(partition_id, partitions[partition_id]) + + assert hass.states.get(entity_id).state == state + + +@pytest.fixture +def _mock_partition_handler(): + with patch( + "homeassistant.components.risco.RiscoLocal.add_partition_handler" + ) as mock: + yield mock + + +@pytest.mark.parametrize("options", [CUSTOM_MAPPING_OPTIONS]) +async def test_local_states( + hass, two_part_local_alarm, _mock_partition_handler, setup_risco_local +): + """Test the various alarm states.""" + callback = _mock_partition_handler.call_args.args[0] + + assert callback is not None + + assert hass.states.get(FIRST_LOCAL_ENTITY_ID).state == STATE_UNKNOWN + for partition_id, entity_id in { + 0: FIRST_LOCAL_ENTITY_ID, + 1: SECOND_LOCAL_ENTITY_ID, + }.items(): + await _check_local_state( + hass, + two_part_local_alarm, + "triggered", + STATE_ALARM_TRIGGERED, + entity_id, + partition_id, + callback, + ) + await _check_local_state( + hass, + two_part_local_alarm, + "arming", + STATE_ALARM_ARMING, + entity_id, + partition_id, + callback, + ) + await _check_local_state( + hass, + two_part_local_alarm, + "armed", + STATE_ALARM_ARMED_AWAY, + entity_id, + partition_id, + callback, + ) + await _check_local_state( + hass, + two_part_local_alarm, + "partially_armed", + STATE_ALARM_ARMED_HOME, + entity_id, + partition_id, + callback, + ) + await _check_local_state( + hass, + two_part_local_alarm, + "disarmed", + STATE_ALARM_DISARMED, + entity_id, + partition_id, + callback, + ) + + groups = {"A": False, "B": False, "C": True, "D": False} + with patch.object( + two_part_local_alarm[partition_id], + "groups", + new_callable=PropertyMock(return_value=groups), + ): + await _check_local_state( + hass, + two_part_local_alarm, + "partially_armed", + STATE_ALARM_ARMED_NIGHT, + entity_id, + partition_id, + callback, + ) + + +async def _test_local_service_call( + hass, service, method, entity_id, partition, *args, **kwargs +): + with patch.object(partition, method, AsyncMock()) as set_mock: + await _call_alarm_service(hass, service, entity_id, **kwargs) + set_mock.assert_awaited_once_with(*args) + + +async def _test_local_no_service_call( + hass, service, method, entity_id, partition, **kwargs +): + with patch.object(partition, method, AsyncMock()) as set_mock: + await _call_alarm_service(hass, service, entity_id, **kwargs) + set_mock.assert_not_awaited() + + +@pytest.mark.parametrize("options", [CUSTOM_MAPPING_OPTIONS]) +async def test_local_sets_custom_mapping(hass, two_part_local_alarm, setup_risco_local): + """Test settings the various modes when mapping some states.""" + registry = er.async_get(hass) + entity = registry.async_get(FIRST_LOCAL_ENTITY_ID) + assert entity.supported_features == EXPECTED_FEATURES + + await _test_local_service_call( + hass, + SERVICE_ALARM_DISARM, + "disarm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_DISARM, + "disarm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_AWAY, + "arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_AWAY, + "arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_HOME, + "partial_arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_HOME, + "partial_arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_NIGHT, + "group_arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + "C", + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_NIGHT, + "group_arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + "C", + ) + + +@pytest.mark.parametrize("options", [FULL_CUSTOM_MAPPING]) +async def test_local_sets_full_custom_mapping( + hass, two_part_local_alarm, setup_risco_local +): + """Test settings the various modes when mapping all states.""" + registry = er.async_get(hass) + entity = registry.async_get(FIRST_LOCAL_ENTITY_ID) + assert ( + entity.supported_features == EXPECTED_FEATURES | SUPPORT_ALARM_ARM_CUSTOM_BYPASS + ) + + await _test_local_service_call( + hass, + SERVICE_ALARM_DISARM, + "disarm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_DISARM, + "disarm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_AWAY, + "arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_AWAY, + "arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_HOME, + "partial_arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_HOME, + "partial_arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_NIGHT, + "group_arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + "C", + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_NIGHT, + "group_arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + "C", + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_CUSTOM_BYPASS, + "group_arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + "D", + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_CUSTOM_BYPASS, + "group_arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + "D", + ) + + +@pytest.mark.parametrize( + "options", [{**CUSTOM_MAPPING_OPTIONS, **CODES_REQUIRED_OPTIONS}] +) +async def test_local_sets_with_correct_code( + hass, two_part_local_alarm, setup_risco_local +): + """Test settings the various modes when code is required.""" + code = {"code": 1234} + await _test_local_service_call( + hass, + SERVICE_ALARM_DISARM, + "disarm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + **code, + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_DISARM, + "disarm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + **code, + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_AWAY, + "arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + **code, + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_AWAY, + "arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + **code, + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_HOME, + "partial_arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + **code, + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_HOME, + "partial_arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + **code, + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_NIGHT, + "group_arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + "C", + **code, + ) + await _test_local_service_call( + hass, + SERVICE_ALARM_ARM_NIGHT, + "group_arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + "C", + **code, + ) + with pytest.raises(HomeAssistantError): + await _test_local_no_service_call( + hass, + SERVICE_ALARM_ARM_CUSTOM_BYPASS, + "partial_arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + **code, + ) + with pytest.raises(HomeAssistantError): + await _test_local_no_service_call( + hass, + SERVICE_ALARM_ARM_CUSTOM_BYPASS, + "partial_arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + **code, + ) + + +@pytest.mark.parametrize( + "options", [{**CUSTOM_MAPPING_OPTIONS, **CODES_REQUIRED_OPTIONS}] +) +async def test_local_sets_with_incorrect_code( + hass, two_part_local_alarm, setup_risco_local +): + """Test settings the various modes when code is required and incorrect.""" + code = {"code": 4321} + await _test_local_no_service_call( + hass, + SERVICE_ALARM_DISARM, + "disarm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + **code, + ) + await _test_local_no_service_call( + hass, + SERVICE_ALARM_DISARM, + "disarm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + **code, + ) + await _test_local_no_service_call( + hass, + SERVICE_ALARM_ARM_AWAY, + "arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + **code, + ) + await _test_local_no_service_call( + hass, + SERVICE_ALARM_ARM_AWAY, + "arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + **code, + ) + await _test_local_no_service_call( + hass, + SERVICE_ALARM_ARM_HOME, + "partial_arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + **code, + ) + await _test_local_no_service_call( + hass, + SERVICE_ALARM_ARM_HOME, + "partial_arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + **code, + ) + await _test_local_no_service_call( + hass, + SERVICE_ALARM_ARM_NIGHT, + "group_arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + **code, + ) + await _test_local_no_service_call( + hass, + SERVICE_ALARM_ARM_NIGHT, + "group_arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + **code, + ) + with pytest.raises(HomeAssistantError): + await _test_local_no_service_call( + hass, + SERVICE_ALARM_ARM_CUSTOM_BYPASS, + "partial_arm", + FIRST_LOCAL_ENTITY_ID, + two_part_local_alarm[0], + **code, + ) + with pytest.raises(HomeAssistantError): + await _test_local_no_service_call( + hass, + SERVICE_ALARM_ARM_CUSTOM_BYPASS, + "partial_arm", + SECOND_LOCAL_ENTITY_ID, + two_part_local_alarm[1], + **code, + ) diff --git a/tests/components/risco/test_binary_sensor.py b/tests/components/risco/test_binary_sensor.py index a7c11c9cb000671f5b2e05dcd76847b22725a000..2325d88c03faa90d60d411d97c7f5a62974f3f90 100644 --- a/tests/components/risco/test_binary_sensor.py +++ b/tests/components/risco/test_binary_sensor.py @@ -1,62 +1,55 @@ """Tests for the Risco binary sensors.""" from unittest.mock import PropertyMock, patch +import pytest + from homeassistant.components.risco import CannotConnectError, UnauthorizedError from homeassistant.components.risco.const import DOMAIN from homeassistant.const import STATE_OFF, STATE_ON from homeassistant.helpers import device_registry as dr, entity_registry as er from homeassistant.helpers.entity_component import async_update_entity -from .util import TEST_CONFIG, TEST_SITE_UUID, setup_risco -from .util import two_zone_alarm # noqa: F401 - -from tests.common import MockConfigEntry +from .util import TEST_SITE_UUID, zone_mock FIRST_ENTITY_ID = "binary_sensor.zone_0" SECOND_ENTITY_ID = "binary_sensor.zone_1" -async def test_cannot_connect(hass): - """Test connection error.""" - - with patch( - "homeassistant.components.risco.RiscoCloud.login", - side_effect=CannotConnectError, - ): - config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG) - config_entry.add_to_hass(hass) - await hass.config_entries.async_setup(config_entry.entry_id) - await hass.async_block_till_done() - registry = er.async_get(hass) - assert not registry.async_is_registered(FIRST_ENTITY_ID) - assert not registry.async_is_registered(SECOND_ENTITY_ID) - - -async def test_unauthorized(hass): - """Test unauthorized error.""" - - with patch( - "homeassistant.components.risco.RiscoCloud.login", - side_effect=UnauthorizedError, +@pytest.fixture +def two_zone_local(): + """Fixture to mock alarm with two zones.""" + zone_mocks = {0: zone_mock(), 1: zone_mock()} + with patch.object( + zone_mocks[0], "id", new_callable=PropertyMock(return_value=0) + ), patch.object( + zone_mocks[0], "name", new_callable=PropertyMock(return_value="Zone 0") + ), patch.object( + zone_mocks[1], "id", new_callable=PropertyMock(return_value=1) + ), patch.object( + zone_mocks[1], "name", new_callable=PropertyMock(return_value="Zone 1") + ), patch( + "homeassistant.components.risco.RiscoLocal.partitions", + new_callable=PropertyMock(return_value={}), + ), patch( + "homeassistant.components.risco.RiscoLocal.zones", + new_callable=PropertyMock(return_value=zone_mocks), ): - config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG) - config_entry.add_to_hass(hass) - await hass.config_entries.async_setup(config_entry.entry_id) - await hass.async_block_till_done() - registry = er.async_get(hass) - assert not registry.async_is_registered(FIRST_ENTITY_ID) - assert not registry.async_is_registered(SECOND_ENTITY_ID) + yield zone_mocks -async def test_setup(hass, two_zone_alarm): # noqa: F811 - """Test entity setup.""" +@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError]) +async def test_error_on_login(hass, login_with_error, cloud_config_entry): + """Test error on login.""" + await hass.config_entries.async_setup(cloud_config_entry.entry_id) + await hass.async_block_till_done() registry = er.async_get(hass) - assert not registry.async_is_registered(FIRST_ENTITY_ID) assert not registry.async_is_registered(SECOND_ENTITY_ID) - await setup_risco(hass) +async def test_cloud_setup(hass, two_zone_cloud, setup_risco_cloud): + """Test entity setup.""" + registry = er.async_get(hass) assert registry.async_is_registered(FIRST_ENTITY_ID) assert registry.async_is_registered(SECOND_ENTITY_ID) @@ -70,13 +63,13 @@ async def test_setup(hass, two_zone_alarm): # noqa: F811 assert device.manufacturer == "Risco" -async def _check_state(hass, alarm, triggered, bypassed, entity_id, zone_id): +async def _check_cloud_state(hass, zones, triggered, bypassed, entity_id, zone_id): with patch.object( - alarm.zones[zone_id], + zones[zone_id], "triggered", new_callable=PropertyMock(return_value=triggered), ), patch.object( - alarm.zones[zone_id], + zones[zone_id], "bypassed", new_callable=PropertyMock(return_value=bypassed), ): @@ -89,23 +82,20 @@ async def _check_state(hass, alarm, triggered, bypassed, entity_id, zone_id): assert hass.states.get(entity_id).attributes["zone_id"] == zone_id -async def test_states(hass, two_zone_alarm): # noqa: F811 +async def test_cloud_states(hass, two_zone_cloud, setup_risco_cloud): """Test the various alarm states.""" - await setup_risco(hass) - - await _check_state(hass, two_zone_alarm, True, True, FIRST_ENTITY_ID, 0) - await _check_state(hass, two_zone_alarm, True, False, FIRST_ENTITY_ID, 0) - await _check_state(hass, two_zone_alarm, False, True, FIRST_ENTITY_ID, 0) - await _check_state(hass, two_zone_alarm, False, False, FIRST_ENTITY_ID, 0) - await _check_state(hass, two_zone_alarm, True, True, SECOND_ENTITY_ID, 1) - await _check_state(hass, two_zone_alarm, True, False, SECOND_ENTITY_ID, 1) - await _check_state(hass, two_zone_alarm, False, True, SECOND_ENTITY_ID, 1) - await _check_state(hass, two_zone_alarm, False, False, SECOND_ENTITY_ID, 1) + await _check_cloud_state(hass, two_zone_cloud, True, True, FIRST_ENTITY_ID, 0) + await _check_cloud_state(hass, two_zone_cloud, True, False, FIRST_ENTITY_ID, 0) + await _check_cloud_state(hass, two_zone_cloud, False, True, FIRST_ENTITY_ID, 0) + await _check_cloud_state(hass, two_zone_cloud, False, False, FIRST_ENTITY_ID, 0) + await _check_cloud_state(hass, two_zone_cloud, True, True, SECOND_ENTITY_ID, 1) + await _check_cloud_state(hass, two_zone_cloud, True, False, SECOND_ENTITY_ID, 1) + await _check_cloud_state(hass, two_zone_cloud, False, True, SECOND_ENTITY_ID, 1) + await _check_cloud_state(hass, two_zone_cloud, False, False, SECOND_ENTITY_ID, 1) -async def test_bypass(hass, two_zone_alarm): # noqa: F811 +async def test_cloud_bypass(hass, two_zone_cloud, setup_risco_cloud): """Test bypassing a zone.""" - await setup_risco(hass) with patch("homeassistant.components.risco.RiscoCloud.bypass_zone") as mock: data = {"entity_id": FIRST_ENTITY_ID} @@ -116,9 +106,8 @@ async def test_bypass(hass, two_zone_alarm): # noqa: F811 mock.assert_awaited_once_with(0, True) -async def test_unbypass(hass, two_zone_alarm): # noqa: F811 +async def test_cloud_unbypass(hass, two_zone_cloud, setup_risco_cloud): """Test unbypassing a zone.""" - await setup_risco(hass) with patch("homeassistant.components.risco.RiscoCloud.bypass_zone") as mock: data = {"entity_id": FIRST_ENTITY_ID} @@ -127,3 +116,113 @@ async def test_unbypass(hass, two_zone_alarm): # noqa: F811 ) mock.assert_awaited_once_with(0, False) + + +@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError]) +async def test_error_on_connect(hass, connect_with_error, local_config_entry): + """Test error on connect.""" + await hass.config_entries.async_setup(local_config_entry.entry_id) + await hass.async_block_till_done() + registry = er.async_get(hass) + assert not registry.async_is_registered(FIRST_ENTITY_ID) + assert not registry.async_is_registered(SECOND_ENTITY_ID) + + +async def test_local_setup(hass, two_zone_local, setup_risco_local): + """Test entity setup.""" + registry = er.async_get(hass) + assert registry.async_is_registered(FIRST_ENTITY_ID) + assert registry.async_is_registered(SECOND_ENTITY_ID) + + registry = dr.async_get(hass) + device = registry.async_get_device({(DOMAIN, TEST_SITE_UUID + "_zone_0_local")}) + assert device is not None + assert device.manufacturer == "Risco" + + device = registry.async_get_device({(DOMAIN, TEST_SITE_UUID + "_zone_1_local")}) + assert device is not None + assert device.manufacturer == "Risco" + + +async def _check_local_state( + hass, zones, triggered, bypassed, entity_id, zone_id, callback +): + with patch.object( + zones[zone_id], + "triggered", + new_callable=PropertyMock(return_value=triggered), + ), patch.object( + zones[zone_id], + "bypassed", + new_callable=PropertyMock(return_value=bypassed), + ): + await callback(zone_id, zones[zone_id]) + + expected_triggered = STATE_ON if triggered else STATE_OFF + assert hass.states.get(entity_id).state == expected_triggered + assert hass.states.get(entity_id).attributes["bypassed"] == bypassed + assert hass.states.get(entity_id).attributes["zone_id"] == zone_id + + +@pytest.fixture +def _mock_zone_handler(): + with patch("homeassistant.components.risco.RiscoLocal.add_zone_handler") as mock: + yield mock + + +async def test_local_states( + hass, two_zone_local, _mock_zone_handler, setup_risco_local +): + """Test the various alarm states.""" + callback = _mock_zone_handler.call_args.args[0] + + assert callback is not None + + await _check_local_state( + hass, two_zone_local, True, True, FIRST_ENTITY_ID, 0, callback + ) + await _check_local_state( + hass, two_zone_local, True, False, FIRST_ENTITY_ID, 0, callback + ) + await _check_local_state( + hass, two_zone_local, False, True, FIRST_ENTITY_ID, 0, callback + ) + await _check_local_state( + hass, two_zone_local, False, False, FIRST_ENTITY_ID, 0, callback + ) + await _check_local_state( + hass, two_zone_local, True, True, SECOND_ENTITY_ID, 1, callback + ) + await _check_local_state( + hass, two_zone_local, True, False, SECOND_ENTITY_ID, 1, callback + ) + await _check_local_state( + hass, two_zone_local, False, True, SECOND_ENTITY_ID, 1, callback + ) + await _check_local_state( + hass, two_zone_local, False, False, SECOND_ENTITY_ID, 1, callback + ) + + +async def test_local_bypass(hass, two_zone_local, setup_risco_local): + """Test bypassing a zone.""" + with patch.object(two_zone_local[0], "bypass") as mock: + data = {"entity_id": FIRST_ENTITY_ID} + + await hass.services.async_call( + DOMAIN, "bypass_zone", service_data=data, blocking=True + ) + + mock.assert_awaited_once_with(True) + + +async def test_local_unbypass(hass, two_zone_local, setup_risco_local): + """Test unbypassing a zone.""" + with patch.object(two_zone_local[0], "bypass") as mock: + data = {"entity_id": FIRST_ENTITY_ID} + + await hass.services.async_call( + DOMAIN, "unbypass_zone", service_data=data, blocking=True + ) + + mock.assert_awaited_once_with(False) diff --git a/tests/components/risco/test_config_flow.py b/tests/components/risco/test_config_flow.py index 8d04f478e446d486ebeb68f4495c5e11908ddd51..a39a724d7b94bf2a58556e6e12f98f612f34db14 100644 --- a/tests/components/risco/test_config_flow.py +++ b/tests/components/risco/test_config_flow.py @@ -4,22 +4,29 @@ from unittest.mock import PropertyMock, patch import pytest import voluptuous as vol -from homeassistant import config_entries, data_entry_flow +from homeassistant import config_entries from homeassistant.components.risco.config_flow import ( CannotConnectError, UnauthorizedError, ) from homeassistant.components.risco.const import DOMAIN +from homeassistant.data_entry_flow import FlowResultType from tests.common import MockConfigEntry TEST_SITE_NAME = "test-site-name" -TEST_DATA = { +TEST_CLOUD_DATA = { "username": "test-username", "password": "test-password", "pin": "1234", } +TEST_LOCAL_DATA = { + "host": "test-host", + "port": 5004, + "pin": "1234", +} + TEST_RISCO_TO_HA = { "arm": "armed_away", "partial_arm": "armed_home", @@ -42,13 +49,19 @@ TEST_OPTIONS = { } -async def test_form(hass): - """Test we get the form.""" +async def test_cloud_form(hass): + """Test we get the cloud form.""" result = await hass.config_entries.flow.async_init( DOMAIN, context={"source": config_entries.SOURCE_USER} ) - assert result["type"] == "form" - assert result["errors"] == {} + assert result["type"] == FlowResultType.MENU + + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], {"next_step_id": "cloud"} + ) + + assert result2["type"] == FlowResultType.FORM + assert result2["errors"] == {} with patch( "homeassistant.components.risco.config_flow.RiscoCloud.login", @@ -59,17 +72,20 @@ async def test_form(hass): ), patch( "homeassistant.components.risco.config_flow.RiscoCloud.close" ) as mock_close, patch( + "homeassistant.components.risco.config_flow.SLEEP_INTERVAL", + 0, + ), patch( "homeassistant.components.risco.async_setup_entry", return_value=True, ) as mock_setup_entry: - result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], TEST_DATA + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], TEST_CLOUD_DATA ) await hass.async_block_till_done() - assert result2["type"] == "create_entry" - assert result2["title"] == TEST_SITE_NAME - assert result2["data"] == TEST_DATA + assert result3["type"] == FlowResultType.CREATE_ENTRY + assert result3["title"] == TEST_SITE_NAME + assert result3["data"] == TEST_CLOUD_DATA assert len(mock_setup_entry.mock_calls) == 1 mock_close.assert_awaited_once() @@ -82,33 +98,126 @@ async def test_form(hass): (Exception, "unknown"), ], ) -async def test_error(hass, exception, error): +async def test_cloud_error(hass, login_with_error, error): """Test we handle config flow errors.""" result = await hass.config_entries.flow.async_init( DOMAIN, context={"source": config_entries.SOURCE_USER} ) + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], {"next_step_id": "cloud"} + ) with patch( - "homeassistant.components.risco.config_flow.RiscoCloud.login", - side_effect=exception, - ), patch( "homeassistant.components.risco.config_flow.RiscoCloud.close" ) as mock_close: - result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], TEST_DATA + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], TEST_CLOUD_DATA ) mock_close.assert_awaited_once() - assert result2["type"] == "form" - assert result2["errors"] == {"base": error} + assert result3["type"] == FlowResultType.FORM + assert result3["errors"] == {"base": error} -async def test_form_already_exists(hass): +async def test_form_cloud_already_exists(hass): """Test that a flow with an existing username aborts.""" entry = MockConfigEntry( domain=DOMAIN, - unique_id=TEST_DATA["username"], - data=TEST_DATA, + unique_id=TEST_CLOUD_DATA["username"], + data=TEST_CLOUD_DATA, + ) + + entry.add_to_hass(hass) + + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], {"next_step_id": "cloud"} + ) + + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], TEST_CLOUD_DATA + ) + + assert result3["type"] == FlowResultType.ABORT + assert result3["reason"] == "already_configured" + + +async def test_local_form(hass): + """Test we get the local form.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] == FlowResultType.MENU + + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], {"next_step_id": "local"} + ) + + assert result2["type"] == FlowResultType.FORM + assert result2["errors"] == {} + + with patch( + "homeassistant.components.risco.config_flow.RiscoLocal.connect", + return_value=True, + ), patch( + "homeassistant.components.risco.config_flow.RiscoLocal.id", + new_callable=PropertyMock(return_value=TEST_SITE_NAME), + ), patch( + "homeassistant.components.risco.config_flow.RiscoLocal.disconnect" + ) as mock_close, patch( + "homeassistant.components.risco.config_flow.SLEEP_INTERVAL", + 0, + ), patch( + "homeassistant.components.risco.async_setup_entry", + return_value=True, + ) as mock_setup_entry: + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], TEST_LOCAL_DATA + ) + await hass.async_block_till_done() + + expected_data = {**TEST_LOCAL_DATA, **{"type": "local"}} + assert result3["type"] == FlowResultType.CREATE_ENTRY + assert result3["title"] == TEST_SITE_NAME + assert result3["data"] == expected_data + assert len(mock_setup_entry.mock_calls) == 1 + mock_close.assert_awaited_once() + + +@pytest.mark.parametrize( + "exception, error", + [ + (UnauthorizedError, "invalid_auth"), + (CannotConnectError, "cannot_connect"), + (Exception, "unknown"), + ], +) +async def test_local_error(hass, connect_with_error, error): + """Test we handle config flow errors.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], {"next_step_id": "local"} + ) + + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], TEST_LOCAL_DATA + ) + + assert result3["type"] == FlowResultType.FORM + assert result3["errors"] == {"base": error} + + +async def test_form_local_already_exists(hass): + """Test that a flow with an existing host aborts.""" + entry = MockConfigEntry( + domain=DOMAIN, + unique_id=TEST_SITE_NAME, + data=TEST_LOCAL_DATA, ) entry.add_to_hass(hass) @@ -118,33 +227,46 @@ async def test_form_already_exists(hass): ) result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], TEST_DATA + result["flow_id"], {"next_step_id": "local"} ) - assert result2["type"] == "abort" - assert result2["reason"] == "already_configured" + with patch( + "homeassistant.components.risco.config_flow.RiscoLocal.connect", + return_value=True, + ), patch( + "homeassistant.components.risco.config_flow.RiscoLocal.id", + new_callable=PropertyMock(return_value=TEST_SITE_NAME), + ), patch( + "homeassistant.components.risco.config_flow.RiscoLocal.disconnect" + ): + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], TEST_LOCAL_DATA + ) + + assert result3["type"] == FlowResultType.ABORT + assert result3["reason"] == "already_configured" async def test_options_flow(hass): """Test options flow.""" entry = MockConfigEntry( domain=DOMAIN, - unique_id=TEST_DATA["username"], - data=TEST_DATA, + unique_id=TEST_CLOUD_DATA["username"], + data=TEST_CLOUD_DATA, ) entry.add_to_hass(hass) result = await hass.config_entries.options.async_init(entry.entry_id) - assert result["type"] == data_entry_flow.FlowResultType.FORM + assert result["type"] == FlowResultType.FORM assert result["step_id"] == "init" result = await hass.config_entries.options.async_configure( result["flow_id"], user_input=TEST_OPTIONS, ) - assert result["type"] == data_entry_flow.FlowResultType.FORM + assert result["type"] == FlowResultType.FORM assert result["step_id"] == "risco_to_ha" result = await hass.config_entries.options.async_configure( @@ -152,7 +274,7 @@ async def test_options_flow(hass): user_input=TEST_RISCO_TO_HA, ) - assert result["type"] == data_entry_flow.FlowResultType.FORM + assert result["type"] == FlowResultType.FORM assert result["step_id"] == "ha_to_risco" with patch("homeassistant.components.risco.async_setup_entry", return_value=True): @@ -161,7 +283,7 @@ async def test_options_flow(hass): user_input=TEST_HA_TO_RISCO, ) - assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY + assert result["type"] == FlowResultType.CREATE_ENTRY assert entry.options == { **TEST_OPTIONS, "risco_states_to_ha": TEST_RISCO_TO_HA, @@ -173,8 +295,8 @@ async def test_ha_to_risco_schema(hass): """Test that the schema for the ha-to-risco mapping step is generated properly.""" entry = MockConfigEntry( domain=DOMAIN, - unique_id=TEST_DATA["username"], - data=TEST_DATA, + unique_id=TEST_CLOUD_DATA["username"], + data=TEST_CLOUD_DATA, ) entry.add_to_hass(hass) diff --git a/tests/components/risco/test_sensor.py b/tests/components/risco/test_sensor.py index 8fb4daf8624990d2d70384fa695200034c9607c3..55c75823a38105f82c965a39576e35ea4019d7a6 100644 --- a/tests/components/risco/test_sensor.py +++ b/tests/components/risco/test_sensor.py @@ -2,19 +2,17 @@ from datetime import timedelta from unittest.mock import MagicMock, PropertyMock, patch +import pytest + from homeassistant.components.risco import ( LAST_EVENT_TIMESTAMP_KEY, CannotConnectError, UnauthorizedError, ) -from homeassistant.components.risco.const import DOMAIN from homeassistant.helpers import entity_registry as er from homeassistant.util import dt -from .util import TEST_CONFIG, TEST_SITE_UUID, setup_risco -from .util import two_zone_alarm # noqa: F401 - -from tests.common import MockConfigEntry, async_fire_time_changed +from tests.common import async_fire_time_changed ENTITY_IDS = { "Alarm": "sensor.risco_test_site_name_alarm_events", @@ -109,34 +107,23 @@ CATEGORIES_TO_EVENTS = { } -async def test_cannot_connect(hass): - """Test connection error.""" - +@pytest.fixture +def _no_zones_and_partitions(): with patch( - "homeassistant.components.risco.RiscoCloud.login", - side_effect=CannotConnectError, + "homeassistant.components.risco.RiscoLocal.zones", + new_callable=PropertyMock(return_value=[]), + ), patch( + "homeassistant.components.risco.RiscoLocal.partitions", + new_callable=PropertyMock(return_value=[]), ): - config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG) - config_entry.add_to_hass(hass) - await hass.config_entries.async_setup(config_entry.entry_id) - await hass.async_block_till_done() - - registry = er.async_get(hass) - for id in ENTITY_IDS.values(): - assert not registry.async_is_registered(id) - + yield -async def test_unauthorized(hass): - """Test unauthorized error.""" - with patch( - "homeassistant.components.risco.RiscoCloud.login", - side_effect=UnauthorizedError, - ): - config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG) - config_entry.add_to_hass(hass) - await hass.config_entries.async_setup(config_entry.entry_id) - await hass.async_block_till_done() +@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError]) +async def test_error_on_login(hass, login_with_error, cloud_config_entry): + """Test error on login.""" + await hass.config_entries.async_setup(cloud_config_entry.entry_id) + await hass.async_block_till_done() registry = er.async_get(hass) for id in ENTITY_IDS.values(): @@ -166,29 +153,31 @@ def _check_state(hass, category, entity_id): assert "zone_entity_id" not in state.attributes -async def test_setup(hass, two_zone_alarm): # noqa: F811 - """Test entity setup.""" +@pytest.fixture +def _set_utc_time_zone(hass): hass.config.set_time_zone("UTC") - registry = er.async_get(hass) - for id in ENTITY_IDS.values(): - assert not registry.async_is_registered(id) +@pytest.fixture +def _save_mock(): with patch( - "homeassistant.components.risco.RiscoCloud.site_uuid", - new_callable=PropertyMock(return_value=TEST_SITE_UUID), - ), patch( "homeassistant.components.risco.Store.async_save", ) as save_mock: - await setup_risco(hass, TEST_EVENTS) - for id in ENTITY_IDS.values(): - assert registry.async_is_registered(id) + yield save_mock - save_mock.assert_awaited_once_with( - {LAST_EVENT_TIMESTAMP_KEY: TEST_EVENTS[0].time} - ) - for category, entity_id in ENTITY_IDS.items(): - _check_state(hass, category, entity_id) + +@pytest.mark.parametrize("events", [TEST_EVENTS]) +async def test_cloud_setup( + hass, two_zone_cloud, _set_utc_time_zone, _save_mock, setup_risco_cloud +): + """Test entity setup.""" + registry = er.async_get(hass) + for id in ENTITY_IDS.values(): + assert registry.async_is_registered(id) + + _save_mock.assert_awaited_once_with({LAST_EVENT_TIMESTAMP_KEY: TEST_EVENTS[0].time}) + for category, entity_id in ENTITY_IDS.items(): + _check_state(hass, category, entity_id) with patch( "homeassistant.components.risco.RiscoCloud.get_events", return_value=[] @@ -202,3 +191,10 @@ async def test_setup(hass, two_zone_alarm): # noqa: F811 for category, entity_id in ENTITY_IDS.items(): _check_state(hass, category, entity_id) + + +async def test_local_setup(hass, setup_risco_local, _no_zones_and_partitions): + """Test entity setup.""" + registry = er.async_get(hass) + for id in ENTITY_IDS.values(): + assert not registry.async_is_registered(id) diff --git a/tests/components/risco/util.py b/tests/components/risco/util.py index 3fa81586d27468034a6efa08795749672013c898..b2600383f2adf73d2570150440826873c7db782f 100644 --- a/tests/components/risco/util.py +++ b/tests/components/risco/util.py @@ -1,74 +1,12 @@ """Utilities for Risco tests.""" -from unittest.mock import MagicMock, PropertyMock, patch +from unittest.mock import AsyncMock, MagicMock -from pytest import fixture - -from homeassistant.components.risco.const import DOMAIN -from homeassistant.const import CONF_PASSWORD, CONF_PIN, CONF_USERNAME - -from tests.common import MockConfigEntry - -TEST_CONFIG = { - CONF_USERNAME: "test-username", - CONF_PASSWORD: "test-password", - CONF_PIN: "1234", -} TEST_SITE_UUID = "test-site-uuid" TEST_SITE_NAME = "test-site-name" -async def setup_risco(hass, events=[], options={}): - """Set up a Risco integration for testing.""" - config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG, options=options) - config_entry.add_to_hass(hass) - - with patch( - "homeassistant.components.risco.RiscoCloud.login", - return_value=True, - ), patch( - "homeassistant.components.risco.RiscoCloud.site_uuid", - new_callable=PropertyMock(return_value=TEST_SITE_UUID), - ), patch( - "homeassistant.components.risco.RiscoCloud.site_name", - new_callable=PropertyMock(return_value=TEST_SITE_NAME), - ), patch( - "homeassistant.components.risco.RiscoCloud.close" - ), patch( - "homeassistant.components.risco.RiscoCloud.get_events", - return_value=events, - ): - await hass.config_entries.async_setup(config_entry.entry_id) - await hass.async_block_till_done() - - return config_entry - - -def _zone_mock(): +def zone_mock(): + """Return a mocked zone.""" return MagicMock( - triggered=False, - bypassed=False, + triggered=False, bypassed=False, bypass=AsyncMock(return_value=True) ) - - -@fixture -def two_zone_alarm(): - """Fixture to mock alarm with two zones.""" - zone_mocks = {0: _zone_mock(), 1: _zone_mock()} - alarm_mock = MagicMock() - with patch.object( - zone_mocks[0], "id", new_callable=PropertyMock(return_value=0) - ), patch.object( - zone_mocks[0], "name", new_callable=PropertyMock(return_value="Zone 0") - ), patch.object( - zone_mocks[1], "id", new_callable=PropertyMock(return_value=1) - ), patch.object( - zone_mocks[1], "name", new_callable=PropertyMock(return_value="Zone 1") - ), patch.object( - alarm_mock, - "zones", - new_callable=PropertyMock(return_value=zone_mocks), - ), patch( - "homeassistant.components.risco.RiscoCloud.get_state", - return_value=alarm_mock, - ): - yield alarm_mock