Skip to content
Snippets Groups Projects
Unverified Commit 9348569f authored by Brynley McDonald's avatar Brynley McDonald Committed by GitHub
Browse files

Update Flick Electric API (#133475)

parent 4a9d545f
No related branches found
No related tags found
No related merge requests found
Showing
with 1045 additions and 81 deletions
...@@ -20,7 +20,8 @@ from homeassistant.const import ( ...@@ -20,7 +20,8 @@ from homeassistant.const import (
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers import aiohttp_client from homeassistant.helpers import aiohttp_client
from .const import CONF_TOKEN_EXPIRY, DOMAIN from .const import CONF_ACCOUNT_ID, CONF_SUPPLY_NODE_REF, CONF_TOKEN_EXPIRY
from .coordinator import FlickConfigEntry, FlickElectricDataCoordinator
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
...@@ -29,24 +30,67 @@ CONF_ID_TOKEN = "id_token" ...@@ -29,24 +30,67 @@ CONF_ID_TOKEN = "id_token"
PLATFORMS = [Platform.SENSOR] PLATFORMS = [Platform.SENSOR]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: FlickConfigEntry) -> bool:
"""Set up Flick Electric from a config entry.""" """Set up Flick Electric from a config entry."""
auth = HassFlickAuth(hass, entry) auth = HassFlickAuth(hass, entry)
hass.data.setdefault(DOMAIN, {}) coordinator = FlickElectricDataCoordinator(
hass.data[DOMAIN][entry.entry_id] = FlickAPI(auth) hass, FlickAPI(auth), entry.data[CONF_SUPPLY_NODE_REF]
)
await coordinator.async_config_entry_first_refresh()
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_unload_entry(hass: HomeAssistant, entry: FlickConfigEntry) -> bool:
"""Unload a config entry.""" """Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Migrate old entry."""
_LOGGER.debug(
"Migrating configuration from version %s.%s",
config_entry.version,
config_entry.minor_version,
)
if config_entry.version > 2:
return False
if config_entry.version == 1:
api = FlickAPI(HassFlickAuth(hass, config_entry))
accounts = await api.getCustomerAccounts()
active_accounts = [
account for account in accounts if account["status"] == "active"
]
# A single active account can be auto-migrated
if (len(active_accounts)) == 1:
account = active_accounts[0]
new_data = {**config_entry.data}
new_data[CONF_ACCOUNT_ID] = account["id"]
new_data[CONF_SUPPLY_NODE_REF] = account["main_consumer"]["supply_node_ref"]
hass.config_entries.async_update_entry(
config_entry,
title=account["address"],
unique_id=account["id"],
data=new_data,
version=2,
)
return True
config_entry.async_start_reauth(hass, data={**config_entry.data})
return False
return True
class HassFlickAuth(AbstractFlickAuth): class HassFlickAuth(AbstractFlickAuth):
......
"""Config Flow for Flick Electric integration.""" """Config Flow for Flick Electric integration."""
import asyncio import asyncio
from collections.abc import Mapping
import logging import logging
from typing import Any from typing import Any
from pyflick.authentication import AuthException, SimpleFlickAuth from aiohttp import ClientResponseError
from pyflick import FlickAPI
from pyflick.authentication import AbstractFlickAuth, SimpleFlickAuth
from pyflick.const import DEFAULT_CLIENT_ID, DEFAULT_CLIENT_SECRET from pyflick.const import DEFAULT_CLIENT_ID, DEFAULT_CLIENT_SECRET
from pyflick.types import APIException, AuthException, CustomerAccount
import voluptuous as vol import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult from homeassistant.config_entries import SOURCE_REAUTH, ConfigFlow, ConfigFlowResult
from homeassistant.const import ( from homeassistant.const import (
CONF_CLIENT_ID, CONF_CLIENT_ID,
CONF_CLIENT_SECRET, CONF_CLIENT_SECRET,
...@@ -17,12 +21,18 @@ from homeassistant.const import ( ...@@ -17,12 +21,18 @@ from homeassistant.const import (
) )
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import aiohttp_client from homeassistant.helpers import aiohttp_client
from homeassistant.helpers.selector import (
SelectOptionDict,
SelectSelector,
SelectSelectorConfig,
SelectSelectorMode,
)
from .const import DOMAIN from .const import CONF_ACCOUNT_ID, CONF_SUPPLY_NODE_REF, DOMAIN
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
DATA_SCHEMA = vol.Schema( LOGIN_SCHEMA = vol.Schema(
{ {
vol.Required(CONF_USERNAME): str, vol.Required(CONF_USERNAME): str,
vol.Required(CONF_PASSWORD): str, vol.Required(CONF_PASSWORD): str,
...@@ -35,10 +45,13 @@ DATA_SCHEMA = vol.Schema( ...@@ -35,10 +45,13 @@ DATA_SCHEMA = vol.Schema(
class FlickConfigFlow(ConfigFlow, domain=DOMAIN): class FlickConfigFlow(ConfigFlow, domain=DOMAIN):
"""Flick config flow.""" """Flick config flow."""
VERSION = 1 VERSION = 2
auth: AbstractFlickAuth
accounts: list[CustomerAccount]
data: dict[str, Any]
async def _validate_input(self, user_input): async def _validate_auth(self, user_input: Mapping[str, Any]) -> bool:
auth = SimpleFlickAuth( self.auth = SimpleFlickAuth(
username=user_input[CONF_USERNAME], username=user_input[CONF_USERNAME],
password=user_input[CONF_PASSWORD], password=user_input[CONF_PASSWORD],
websession=aiohttp_client.async_get_clientsession(self.hass), websession=aiohttp_client.async_get_clientsession(self.hass),
...@@ -48,22 +61,83 @@ class FlickConfigFlow(ConfigFlow, domain=DOMAIN): ...@@ -48,22 +61,83 @@ class FlickConfigFlow(ConfigFlow, domain=DOMAIN):
try: try:
async with asyncio.timeout(60): async with asyncio.timeout(60):
token = await auth.async_get_access_token() token = await self.auth.async_get_access_token()
except TimeoutError as err: except (TimeoutError, ClientResponseError) as err:
raise CannotConnect from err raise CannotConnect from err
except AuthException as err: except AuthException as err:
raise InvalidAuth from err raise InvalidAuth from err
return token is not None return token is not None
async def async_step_select_account(
self, user_input: Mapping[str, Any] | None = None
) -> ConfigFlowResult:
"""Ask user to select account."""
errors = {}
if user_input is not None and CONF_ACCOUNT_ID in user_input:
self.data[CONF_ACCOUNT_ID] = user_input[CONF_ACCOUNT_ID]
self.data[CONF_SUPPLY_NODE_REF] = self._get_supply_node_ref(
user_input[CONF_ACCOUNT_ID]
)
try:
# Ensure supply node is active
await FlickAPI(self.auth).getPricing(self.data[CONF_SUPPLY_NODE_REF])
except (APIException, ClientResponseError):
errors["base"] = "cannot_connect"
except AuthException:
# We should never get here as we have a valid token
return self.async_abort(reason="no_permissions")
else:
# Supply node is active
return await self._async_create_entry()
try:
self.accounts = await FlickAPI(self.auth).getCustomerAccounts()
except (APIException, ClientResponseError):
errors["base"] = "cannot_connect"
active_accounts = [a for a in self.accounts if a["status"] == "active"]
if len(active_accounts) == 0:
return self.async_abort(reason="no_accounts")
if len(active_accounts) == 1:
self.data[CONF_ACCOUNT_ID] = active_accounts[0]["id"]
self.data[CONF_SUPPLY_NODE_REF] = self._get_supply_node_ref(
active_accounts[0]["id"]
)
return await self._async_create_entry()
return self.async_show_form(
step_id="select_account",
data_schema=vol.Schema(
{
vol.Required(CONF_ACCOUNT_ID): SelectSelector(
SelectSelectorConfig(
options=[
SelectOptionDict(
value=account["id"], label=account["address"]
)
for account in active_accounts
],
mode=SelectSelectorMode.LIST,
)
)
}
),
errors=errors,
)
async def async_step_user( async def async_step_user(
self, user_input: dict[str, Any] | None = None self, user_input: Mapping[str, Any] | None = None
) -> ConfigFlowResult: ) -> ConfigFlowResult:
"""Handle gathering login info.""" """Handle gathering login info."""
errors = {} errors = {}
if user_input is not None: if user_input is not None:
try: try:
await self._validate_input(user_input) await self._validate_auth(user_input)
except CannotConnect: except CannotConnect:
errors["base"] = "cannot_connect" errors["base"] = "cannot_connect"
except InvalidAuth: except InvalidAuth:
...@@ -72,20 +146,61 @@ class FlickConfigFlow(ConfigFlow, domain=DOMAIN): ...@@ -72,20 +146,61 @@ class FlickConfigFlow(ConfigFlow, domain=DOMAIN):
_LOGGER.exception("Unexpected exception") _LOGGER.exception("Unexpected exception")
errors["base"] = "unknown" errors["base"] = "unknown"
else: else:
await self.async_set_unique_id( self.data = dict(user_input)
f"flick_electric_{user_input[CONF_USERNAME]}" return await self.async_step_select_account(user_input)
)
self._abort_if_unique_id_configured() return self.async_show_form(
step_id="user", data_schema=LOGIN_SCHEMA, errors=errors
)
async def async_step_reauth(
self, user_input: Mapping[str, Any]
) -> ConfigFlowResult:
"""Handle re-authentication."""
self.data = {**user_input}
return await self.async_step_user(user_input)
return self.async_create_entry( async def _async_create_entry(self) -> ConfigFlowResult:
title=f"Flick Electric: {user_input[CONF_USERNAME]}", """Create an entry for the flow."""
data=user_input,
await self.async_set_unique_id(self.data[CONF_ACCOUNT_ID])
account = self._get_account(self.data[CONF_ACCOUNT_ID])
if self.source == SOURCE_REAUTH:
# Migration completed
if self._get_reauth_entry().version == 1:
self.hass.config_entries.async_update_entry(
self._get_reauth_entry(),
unique_id=self.unique_id,
data=self.data,
version=self.VERSION,
) )
return self.async_show_form( return self.async_update_reload_and_abort(
step_id="user", data_schema=DATA_SCHEMA, errors=errors self._get_reauth_entry(),
unique_id=self.unique_id,
title=account["address"],
data=self.data,
)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=account["address"],
data=self.data,
) )
def _get_account(self, account_id: str) -> CustomerAccount:
"""Get the account for the account ID."""
return next(a for a in self.accounts if a["id"] == account_id)
def _get_supply_node_ref(self, account_id: str) -> str:
"""Get the supply node ref for the account."""
return self._get_account(account_id)["main_consumer"][CONF_SUPPLY_NODE_REF]
class CannotConnect(HomeAssistantError): class CannotConnect(HomeAssistantError):
"""Error to indicate we cannot connect.""" """Error to indicate we cannot connect."""
......
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
DOMAIN = "flick_electric" DOMAIN = "flick_electric"
CONF_TOKEN_EXPIRY = "expires" CONF_TOKEN_EXPIRY = "expires"
CONF_ACCOUNT_ID = "account_id"
CONF_SUPPLY_NODE_REF = "supply_node_ref"
ATTR_START_AT = "start_at" ATTR_START_AT = "start_at"
ATTR_END_AT = "end_at" ATTR_END_AT = "end_at"
......
"""Data Coordinator for Flick Electric."""
import asyncio
from datetime import timedelta
import logging
import aiohttp
from pyflick import FlickAPI, FlickPrice
from pyflick.types import APIException, AuthException
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(minutes=5)
type FlickConfigEntry = ConfigEntry[FlickElectricDataCoordinator]
class FlickElectricDataCoordinator(DataUpdateCoordinator[FlickPrice]):
"""Coordinator for flick power price."""
def __init__(
self, hass: HomeAssistant, api: FlickAPI, supply_node_ref: str
) -> None:
"""Initialize FlickElectricDataCoordinator."""
super().__init__(
hass,
_LOGGER,
name="Flick Electric",
update_interval=SCAN_INTERVAL,
)
self.supply_node_ref = supply_node_ref
self._api = api
async def _async_update_data(self) -> FlickPrice:
"""Fetch pricing data from Flick Electric."""
try:
async with asyncio.timeout(60):
return await self._api.getPricing(self.supply_node_ref)
except AuthException as err:
raise ConfigEntryAuthFailed from err
except (APIException, aiohttp.ClientResponseError) as err:
raise UpdateFailed from err
...@@ -7,5 +7,5 @@ ...@@ -7,5 +7,5 @@
"integration_type": "service", "integration_type": "service",
"iot_class": "cloud_polling", "iot_class": "cloud_polling",
"loggers": ["pyflick"], "loggers": ["pyflick"],
"requirements": ["PyFlick==0.0.2"] "requirements": ["PyFlick==1.1.2"]
} }
"""Support for Flick Electric Pricing data.""" """Support for Flick Electric Pricing data."""
import asyncio
from datetime import timedelta from datetime import timedelta
from decimal import Decimal
import logging import logging
from typing import Any from typing import Any
from pyflick import FlickAPI, FlickPrice
from homeassistant.components.sensor import SensorEntity from homeassistant.components.sensor import SensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CURRENCY_CENT, UnitOfEnergy from homeassistant.const import CURRENCY_CENT, UnitOfEnergy
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util.dt import utcnow from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import ATTR_COMPONENTS, ATTR_END_AT, ATTR_START_AT, DOMAIN from .const import ATTR_COMPONENTS, ATTR_END_AT, ATTR_START_AT
from .coordinator import FlickConfigEntry, FlickElectricDataCoordinator
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(minutes=5) SCAN_INTERVAL = timedelta(minutes=5)
async def async_setup_entry( async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback hass: HomeAssistant,
entry: FlickConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None: ) -> None:
"""Flick Sensor Setup.""" """Flick Sensor Setup."""
api: FlickAPI = hass.data[DOMAIN][entry.entry_id] coordinator = entry.runtime_data
async_add_entities([FlickPricingSensor(api)], True) async_add_entities([FlickPricingSensor(coordinator)])
class FlickPricingSensor(SensorEntity): class FlickPricingSensor(CoordinatorEntity[FlickElectricDataCoordinator], SensorEntity):
"""Entity object for Flick Electric sensor.""" """Entity object for Flick Electric sensor."""
_attr_attribution = "Data provided by Flick Electric" _attr_attribution = "Data provided by Flick Electric"
_attr_native_unit_of_measurement = f"{CURRENCY_CENT}/{UnitOfEnergy.KILO_WATT_HOUR}" _attr_native_unit_of_measurement = f"{CURRENCY_CENT}/{UnitOfEnergy.KILO_WATT_HOUR}"
_attr_has_entity_name = True _attr_has_entity_name = True
_attr_translation_key = "power_price" _attr_translation_key = "power_price"
_attributes: dict[str, Any] = {}
def __init__(self, api: FlickAPI) -> None: def __init__(self, coordinator: FlickElectricDataCoordinator) -> None:
"""Entity object for Flick Electric sensor.""" """Entity object for Flick Electric sensor."""
self._api: FlickAPI = api super().__init__(coordinator)
self._price: FlickPrice = None
self._attr_unique_id = f"{coordinator.supply_node_ref}_pricing"
@property @property
def native_value(self): def native_value(self) -> Decimal:
"""Return the state of the sensor.""" """Return the state of the sensor."""
return self._price.price # The API should return a unit price with quantity of 1.0 when no start/end time is provided
if self.coordinator.data.quantity != 1:
_LOGGER.warning(
"Unexpected quantity for unit price: %s", self.coordinator.data
)
return self.coordinator.data.cost
@property @property
def extra_state_attributes(self): def extra_state_attributes(self) -> dict[str, Any] | None:
"""Return the state attributes.""" """Return the state attributes."""
return self._attributes components: dict[str, Decimal] = {}
async def async_update(self) -> None:
"""Get the Flick Pricing data from the web service."""
if self._price and self._price.end_at >= utcnow():
return # Power price data is still valid
async with asyncio.timeout(60): for component in self.coordinator.data.components:
self._price = await self._api.getPricing()
_LOGGER.debug("Pricing data: %s", self._price)
self._attributes[ATTR_START_AT] = self._price.start_at
self._attributes[ATTR_END_AT] = self._price.end_at
for component in self._price.components:
if component.charge_setter not in ATTR_COMPONENTS: if component.charge_setter not in ATTR_COMPONENTS:
_LOGGER.warning("Found unknown component: %s", component.charge_setter) _LOGGER.warning("Found unknown component: %s", component.charge_setter)
continue continue
self._attributes[component.charge_setter] = float(component.value) components[component.charge_setter] = component.value
return {
ATTR_START_AT: self.coordinator.data.start_at,
ATTR_END_AT: self.coordinator.data.end_at,
**components,
}
...@@ -9,6 +9,12 @@ ...@@ -9,6 +9,12 @@
"client_id": "Client ID (optional)", "client_id": "Client ID (optional)",
"client_secret": "Client Secret (optional)" "client_secret": "Client Secret (optional)"
} }
},
"select_account": {
"title": "Select account",
"data": {
"account_id": "Account"
}
} }
}, },
"error": { "error": {
...@@ -17,7 +23,10 @@ ...@@ -17,7 +23,10 @@
"unknown": "[%key:common::config_flow::error::unknown%]" "unknown": "[%key:common::config_flow::error::unknown%]"
}, },
"abort": { "abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]" "already_configured": "[%key:common::config_flow::abort::already_configured_account%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]",
"no_permissions": "Cannot get pricing for this account. Please check user permissions.",
"no_accounts": "No services are active on this Flick account"
} }
}, },
"entity": { "entity": {
......
...@@ -48,7 +48,7 @@ ProgettiHWSW==0.1.3 ...@@ -48,7 +48,7 @@ ProgettiHWSW==0.1.3
PyChromecast==14.0.5 PyChromecast==14.0.5
# homeassistant.components.flick_electric # homeassistant.components.flick_electric
PyFlick==0.0.2 PyFlick==1.1.2
# homeassistant.components.flume # homeassistant.components.flume
PyFlume==0.6.5 PyFlume==0.6.5
......
...@@ -45,7 +45,7 @@ ProgettiHWSW==0.1.3 ...@@ -45,7 +45,7 @@ ProgettiHWSW==0.1.3
PyChromecast==14.0.5 PyChromecast==14.0.5
# homeassistant.components.flick_electric # homeassistant.components.flick_electric
PyFlick==0.0.2 PyFlick==1.1.2
# homeassistant.components.flume # homeassistant.components.flume
PyFlume==0.6.5 PyFlume==0.6.5
......
"""Tests for the Flick Electric integration.""" """Tests for the Flick Electric integration."""
from pyflick.types import FlickPrice
from homeassistant.components.flick_electric.const import (
CONF_ACCOUNT_ID,
CONF_SUPPLY_NODE_REF,
)
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
CONF = {
CONF_USERNAME: "test-username",
CONF_PASSWORD: "test-password",
CONF_ACCOUNT_ID: "1234",
CONF_SUPPLY_NODE_REF: "123",
}
def _mock_flick_price():
return FlickPrice(
{
"cost": "0.25",
"quantity": "1.0",
"status": "final",
"start_at": "2024-01-01T00:00:00Z",
"end_at": "2024-01-01T00:00:00Z",
"type": "flat",
"components": [
{
"charge_method": "kwh",
"charge_setter": "network",
"value": "1.00",
"single_unit_price": "1.00",
"quantity": "1.0",
"unit_code": "NZD",
"charge_per": "kwh",
"flow_direction": "import",
},
{
"charge_method": "kwh",
"charge_setter": "nonsupported",
"value": "1.00",
"single_unit_price": "1.00",
"quantity": "1.0",
"unit_code": "NZD",
"charge_per": "kwh",
"flow_direction": "import",
},
],
}
)
This diff is collapsed.
"""Test the Flick Electric config flow."""
from unittest.mock import patch
from pyflick.authentication import AuthException
from homeassistant.components.flick_electric.const import CONF_ACCOUNT_ID, DOMAIN
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant
from . import CONF, _mock_flick_price
from tests.common import MockConfigEntry
async def test_init_auth_failure_triggers_auth(hass: HomeAssistant) -> None:
"""Test reauth flow is triggered when username/password is wrong."""
with (
patch(
"homeassistant.components.flick_electric.HassFlickAuth.async_get_access_token",
side_effect=AuthException,
),
):
entry = MockConfigEntry(
domain=DOMAIN,
data={**CONF},
title="123 Fake St",
unique_id="1234",
version=2,
)
entry.add_to_hass(hass)
# Ensure setup fails
assert not await hass.config_entries.async_setup(entry.entry_id)
assert entry.state is ConfigEntryState.SETUP_ERROR
# Ensure reauth flow is triggered
await hass.async_block_till_done()
assert len(hass.config_entries.flow.async_progress()) == 1
async def test_init_migration_single_account(hass: HomeAssistant) -> None:
"""Test migration with single account."""
with (
patch(
"homeassistant.components.flick_electric.HassFlickAuth.async_get_access_token",
return_value="123456789abcdef",
),
patch(
"homeassistant.components.flick_electric.FlickAPI.getCustomerAccounts",
return_value=[
{
"id": "1234",
"status": "active",
"address": "123 Fake St",
"main_consumer": {"supply_node_ref": "123"},
}
],
),
patch(
"homeassistant.components.flick_electric.FlickAPI.getPricing",
return_value=_mock_flick_price(),
),
):
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_USERNAME: CONF[CONF_USERNAME],
CONF_PASSWORD: CONF[CONF_PASSWORD],
},
title=CONF_USERNAME,
unique_id=CONF_USERNAME,
version=1,
)
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert len(hass.config_entries.flow.async_progress()) == 0
assert entry.state is ConfigEntryState.LOADED
assert entry.version == 2
assert entry.unique_id == CONF[CONF_ACCOUNT_ID]
assert entry.data == CONF
async def test_init_migration_multi_account_reauth(hass: HomeAssistant) -> None:
"""Test migration triggers reauth with multiple accounts."""
with (
patch(
"homeassistant.components.flick_electric.HassFlickAuth.async_get_access_token",
return_value="123456789abcdef",
),
patch(
"homeassistant.components.flick_electric.FlickAPI.getCustomerAccounts",
return_value=[
{
"id": "1234",
"status": "active",
"address": "123 Fake St",
"main_consumer": {"supply_node_ref": "123"},
},
{
"id": "5678",
"status": "active",
"address": "456 Fake St",
"main_consumer": {"supply_node_ref": "456"},
},
],
),
patch(
"homeassistant.components.flick_electric.FlickAPI.getPricing",
return_value=_mock_flick_price(),
),
):
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_USERNAME: CONF[CONF_USERNAME],
CONF_PASSWORD: CONF[CONF_PASSWORD],
},
title=CONF_USERNAME,
unique_id=CONF_USERNAME,
version=1,
)
entry.add_to_hass(hass)
# ensure setup fails
assert not await hass.config_entries.async_setup(entry.entry_id)
assert entry.state is ConfigEntryState.MIGRATION_ERROR
await hass.async_block_till_done()
# Ensure reauth flow is triggered
await hass.async_block_till_done()
assert len(hass.config_entries.flow.async_progress()) == 1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment