From 4f25296c5024b2ef45a7b31cae04a2e82b96f5e7 Mon Sep 17 00:00:00 2001
From: Artur Pragacz <49985303+arturpragacz@users.noreply.github.com>
Date: Tue, 11 Mar 2025 10:12:23 +0100
Subject: [PATCH] Improve dependencies resolution (#138502)

* Improve dependencies resolution

* Improve tests

* Better docstrings

* Fix comment

* Improve tests

* Improve logging

* Address feedback

* Address feedback

* Address feedback

* Address feedback

* Address feedback

* Simplify error handling

* small log change

* Add comment

* Address feedback

* shorter comments

* Add test
---
 homeassistant/bootstrap.py | 281 ++++++++++++++++++------------------
 homeassistant/loader.py    | 285 ++++++++++++++++++++++++++-----------
 homeassistant/setup.py     |   2 +-
 tests/test_bootstrap.py    |  15 +-
 tests/test_loader.py       |  62 +++++---
 5 files changed, 398 insertions(+), 247 deletions(-)

diff --git a/homeassistant/bootstrap.py b/homeassistant/bootstrap.py
index e301912806c..02a3b8c8fcc 100644
--- a/homeassistant/bootstrap.py
+++ b/homeassistant/bootstrap.py
@@ -93,6 +93,7 @@ from .helpers.dispatcher import async_dispatcher_send_internal
 from .helpers.storage import get_internal_store_manager
 from .helpers.system_info import async_get_system_info
 from .helpers.typing import ConfigType
+from .loader import Integration
 from .setup import (
     # _setup_started is marked as protected to make it clear
     # that it is not part of the public API and should not be used
@@ -711,20 +712,25 @@ def _get_domains(hass: core.HomeAssistant, config: dict[str, Any]) -> set[str]:
     return domains
 
 
-async def _async_resolve_domains_to_setup(
+async def _async_resolve_domains_and_preload(
     hass: core.HomeAssistant, config: dict[str, Any]
-) -> tuple[set[str], dict[str, loader.Integration]]:
-    """Resolve all dependencies and return list of domains to set up."""
+) -> tuple[dict[str, Integration], dict[str, Integration]]:
+    """Resolve all dependencies and return integrations to set up.
+
+    The return value is a tuple of two dictionaries:
+    - The first dictionary contains integrations
+      specified by the configuration (including config entries).
+    - The second dictionary contains the same integrations as the first dictionary
+      together with all their dependencies.
+    """
     domains_to_setup = _get_domains(hass, config)
-    needed_requirements: set[str] = set()
     platform_integrations = conf_util.extract_platform_integrations(
         config, BASE_PLATFORMS
     )
-    # Ensure base platforms that have platform integrations are added to
-    # to `domains_to_setup so they can be setup first instead of
-    # discovering them when later when a config entry setup task
-    # notices its needed and there is already a long line to use
-    # the import executor.
+    # Ensure base platforms that have platform integrations are added to `domains`,
+    # so they can be setup first instead of discovering them later when a config
+    # entry setup task notices that it's needed and there is already a long line
+    # to use the import executor.
     #
     # For example if we have
     # sensor:
@@ -740,111 +746,78 @@ async def _async_resolve_domains_to_setup(
     # so this will be less of a problem in the future.
     domains_to_setup.update(platform_integrations)
 
-    # Load manifests for base platforms and platform based integrations
-    # that are defined under base platforms right away since we do not require
-    # the manifest to list them as dependencies and we want to avoid the lock
-    # contention when multiple integrations try to load them at once
-    additional_manifests_to_load = {
+    # Additionally process base platforms since we do not require the manifest
+    # to list them as dependencies.
+    # We want to later avoid lock contention when multiple integrations try to load
+    # their manifests at once.
+    # Also process integrations that are defined under base platforms
+    # to speed things up.
+    additional_domains_to_process = {
         *BASE_PLATFORMS,
         *chain.from_iterable(platform_integrations.values()),
     }
 
-    translations_to_load = additional_manifests_to_load.copy()
-
     # Resolve all dependencies so we know all integrations
     # that will have to be loaded and start right-away
-    integration_cache: dict[str, loader.Integration] = {}
-    to_resolve: set[str] = domains_to_setup
-    while to_resolve or additional_manifests_to_load:
-        old_to_resolve: set[str] = to_resolve
-        to_resolve = set()
-
-        if additional_manifests_to_load:
-            to_get = {*old_to_resolve, *additional_manifests_to_load}
-            additional_manifests_to_load.clear()
-        else:
-            to_get = old_to_resolve
-
-        manifest_deps: set[str] = set()
-        resolve_dependencies_tasks: list[asyncio.Task[bool]] = []
-        integrations_to_process: list[loader.Integration] = []
-
-        for domain, itg in (await loader.async_get_integrations(hass, to_get)).items():
-            if not isinstance(itg, loader.Integration):
-                continue
-            integration_cache[domain] = itg
-            needed_requirements.update(itg.requirements)
-
-            # Make sure manifests for dependencies are loaded in the next
-            # loop to try to group as many as manifest loads in a single
-            # call to avoid the creating one-off executor jobs later in
-            # the setup process
-            additional_manifests_to_load.update(
-                dep
-                for dep in chain(itg.dependencies, itg.after_dependencies)
-                if dep not in integration_cache
-            )
+    integrations_or_excs = await loader.async_get_integrations(
+        hass, {*domains_to_setup, *additional_domains_to_process}
+    )
+    # Eliminate those missing or with invalid manifest
+    integrations_to_process = {
+        domain: itg
+        for domain, itg in integrations_or_excs.items()
+        if isinstance(itg, Integration)
+    }
+    integrations_dependencies = await loader.resolve_integrations_dependencies(
+        hass, integrations_to_process.values()
+    )
+    # Eliminate those without valid dependencies
+    integrations_to_process = {
+        domain: integrations_to_process[domain] for domain in integrations_dependencies
+    }
 
-            if domain not in old_to_resolve:
-                continue
-
-            integrations_to_process.append(itg)
-            manifest_deps.update(itg.dependencies)
-            manifest_deps.update(itg.after_dependencies)
-            if not itg.all_dependencies_resolved:
-                resolve_dependencies_tasks.append(
-                    create_eager_task(
-                        itg.resolve_dependencies(),
-                        name=f"resolve dependencies {domain}",
-                        loop=hass.loop,
-                    )
-                )
+    integrations_to_setup = {
+        domain: itg
+        for domain, itg in integrations_to_process.items()
+        if domain in domains_to_setup
+    }
+    all_integrations_to_setup = integrations_to_setup.copy()
+    all_integrations_to_setup.update(
+        (dep, loader.async_get_loaded_integration(hass, dep))
+        for domain in integrations_to_setup
+        for dep in integrations_dependencies[domain].difference(
+            all_integrations_to_setup
+        )
+    )
 
-        if unseen_deps := manifest_deps - integration_cache.keys():
-            # If there are dependencies, try to preload all
-            # the integrations manifest at once and add them
-            # to the list of requirements we need to install
-            # so we can try to check if they are already installed
-            # in a single call below which avoids each integration
-            # having to wait for the lock to do it individually
-            deps = await loader.async_get_integrations(hass, unseen_deps)
-            for dependant_domain, dependant_itg in deps.items():
-                if isinstance(dependant_itg, loader.Integration):
-                    integration_cache[dependant_domain] = dependant_itg
-                    needed_requirements.update(dependant_itg.requirements)
-
-        if resolve_dependencies_tasks:
-            await asyncio.gather(*resolve_dependencies_tasks)
-
-        for itg in integrations_to_process:
-            try:
-                all_deps = itg.all_dependencies
-            except RuntimeError:
-                # Integration.all_dependencies raises RuntimeError if
-                # dependencies could not be resolved
-                continue
-            for dep in all_deps:
-                if dep in domains_to_setup:
-                    continue
-                domains_to_setup.add(dep)
-                to_resolve.add(dep)
-
-    _LOGGER.info("Domains to be set up: %s", domains_to_setup)
+    # Gather requirements for all integrations,
+    # their dependencies and after dependencies.
+    # To gather all the requirements we must ignore exceptions here.
+    # The exceptions will be detected and handled later in the bootstrap process.
+    integrations_after_dependencies = (
+        await loader.resolve_integrations_after_dependencies(
+            hass, integrations_to_process.values(), ignore_exceptions=True
+        )
+    )
+    integrations_requirements = {
+        domain: itg.requirements for domain, itg in integrations_to_process.items()
+    }
+    integrations_requirements.update(
+        (dep, loader.async_get_loaded_integration(hass, dep).requirements)
+        for deps in integrations_after_dependencies.values()
+        for dep in deps.difference(integrations_requirements)
+    )
+    all_requirements = set(chain.from_iterable(integrations_requirements.values()))
 
     # Optimistically check if requirements are already installed
     # ahead of setting up the integrations so we can prime the cache
-    # We do not wait for this since its an optimization only
+    # We do not wait for this since it's an optimization only
     hass.async_create_background_task(
-        requirements.async_load_installed_versions(hass, needed_requirements),
+        requirements.async_load_installed_versions(hass, all_requirements),
         "check installed requirements",
         eager_start=True,
     )
 
-    #
-    # Only add the domains_to_setup after we finish resolving
-    # as new domains are likely to added in the process
-    #
-    translations_to_load.update(domains_to_setup)
     # Start loading translations for all integrations we are going to set up
     # in the background so they are ready when we need them. This avoids a
     # lot of waiting for the translation load lock and a thundering herd of
@@ -855,6 +828,7 @@ async def _async_resolve_domains_to_setup(
     # hold the translation load lock and if anything is fast enough to
     # wait for the translation load lock, loading will be done by the
     # time it gets to it.
+    translations_to_load = {*all_integrations_to_setup, *additional_domains_to_process}
     hass.async_create_background_task(
         translation.async_load_integrations(hass, translations_to_load),
         "load translations",
@@ -866,13 +840,13 @@ async def _async_resolve_domains_to_setup(
     # in the setup process.
     hass.async_create_background_task(
         get_internal_store_manager(hass).async_preload(
-            [*PRELOAD_STORAGE, *domains_to_setup]
+            [*PRELOAD_STORAGE, *all_integrations_to_setup]
         ),
         "preload storage",
         eager_start=True,
     )
 
-    return domains_to_setup, integration_cache
+    return integrations_to_setup, all_integrations_to_setup
 
 
 async def _async_set_up_integrations(
@@ -882,69 +856,90 @@ async def _async_set_up_integrations(
     watcher = _WatchPendingSetups(hass, _setup_started(hass))
     watcher.async_start()
 
-    domains_to_setup, integration_cache = await _async_resolve_domains_to_setup(
+    integrations, all_integrations = await _async_resolve_domains_and_preload(
         hass, config
     )
-    stage_2_domains = domains_to_setup.copy()
+    all_domains = set(all_integrations)
+    domains = set(integrations)
+
+    _LOGGER.info(
+        "Domains to be set up: %s | %s",
+        domains,
+        all_domains - domains,
+    )
 
     # Initialize recorder
-    if "recorder" in domains_to_setup:
+    if "recorder" in all_domains:
         recorder.async_initialize_recorder(hass)
 
     # Initialize backup
-    if "backup" in domains_to_setup:
+    if "backup" in all_domains:
         backup.async_initialize_backup(hass)
 
-    stage_0_and_1_domains: list[tuple[str, set[str], int | None]] = [
+    stages: list[tuple[str, set[str], int | None]] = [
         *(
-            (name, domain_group & domains_to_setup, timeout)
+            (name, domain_group, timeout)
             for name, domain_group, timeout in STAGE_0_INTEGRATIONS
         ),
-        ("stage 1", STAGE_1_INTEGRATIONS & domains_to_setup, STAGE_1_TIMEOUT),
+        ("1", STAGE_1_INTEGRATIONS, STAGE_1_TIMEOUT),
+        ("2", domains, STAGE_2_TIMEOUT),
     ]
 
-    _LOGGER.info("Setting up stage 0 and 1")
-    for name, domain_group, timeout in stage_0_and_1_domains:
-        if not domain_group:
+    _LOGGER.info("Setting up stage 0")
+    for name, domain_group, timeout in stages:
+        stage_domains_unfiltered = domain_group & all_domains
+        if not stage_domains_unfiltered:
+            _LOGGER.info("Nothing to set up in stage %s: %s", name, domain_group)
             continue
 
-        _LOGGER.info("Setting up %s: %s", name, domain_group)
-        to_be_loaded = domain_group.copy()
-        to_be_loaded.update(
+        stage_domains = stage_domains_unfiltered - hass.config.components
+        if not stage_domains:
+            _LOGGER.info("Already set up stage %s: %s", name, stage_domains_unfiltered)
+            continue
+
+        stage_dep_domains_unfiltered = {
             dep
-            for domain in domain_group
-            if (integration := integration_cache.get(domain)) is not None
-            for dep in integration.all_dependencies
+            for domain in stage_domains
+            for dep in all_integrations[domain].all_dependencies
+            if dep not in stage_domains
+        }
+        stage_dep_domains = stage_dep_domains_unfiltered - hass.config.components
+
+        stage_all_domains = stage_domains | stage_dep_domains
+        stage_all_integrations = {
+            domain: all_integrations[domain] for domain in stage_all_domains
+        }
+        # Detect all cycles
+        stage_integrations_after_dependencies = (
+            await loader.resolve_integrations_after_dependencies(
+                hass, stage_all_integrations.values(), stage_all_domains
+            )
+        )
+        stage_all_domains = set(stage_integrations_after_dependencies)
+        stage_domains &= stage_all_domains
+        stage_dep_domains &= stage_all_domains
+
+        _LOGGER.info(
+            "Setting up stage %s: %s | %s\nDependencies: %s | %s",
+            name,
+            stage_domains,
+            stage_domains_unfiltered - stage_domains,
+            stage_dep_domains,
+            stage_dep_domains_unfiltered - stage_dep_domains,
         )
-        async_set_domains_to_be_loaded(hass, to_be_loaded)
-        stage_2_domains -= to_be_loaded
-
-        if timeout is None:
-            await _async_setup_multi_components(hass, domain_group, config)
-        else:
-            try:
-                async with hass.timeout.async_timeout(timeout, cool_down=COOLDOWN_TIME):
-                    await _async_setup_multi_components(hass, domain_group, config)
-            except TimeoutError:
-                _LOGGER.warning(
-                    "Setup timed out for %s waiting on %s - moving forward",
-                    name,
-                    hass._active_tasks,  # noqa: SLF001
-                )
 
-    # Add after dependencies when setting up stage 2 domains
-    async_set_domains_to_be_loaded(hass, stage_2_domains)
+        async_set_domains_to_be_loaded(hass, stage_all_domains)
 
-    if stage_2_domains:
-        _LOGGER.info("Setting up stage 2: %s", stage_2_domains)
+        if timeout is None:
+            await _async_setup_multi_components(hass, stage_all_domains, config)
+            continue
         try:
-            async with hass.timeout.async_timeout(
-                STAGE_2_TIMEOUT, cool_down=COOLDOWN_TIME
-            ):
-                await _async_setup_multi_components(hass, stage_2_domains, config)
+            async with hass.timeout.async_timeout(timeout, cool_down=COOLDOWN_TIME):
+                await _async_setup_multi_components(hass, stage_all_domains, config)
         except TimeoutError:
             _LOGGER.warning(
-                "Setup timed out for stage 2 waiting on %s - moving forward",
+                "Setup timed out for stage %s waiting on %s - moving forward",
+                name,
                 hass._active_tasks,  # noqa: SLF001
             )
 
@@ -1046,8 +1041,6 @@ async def _async_setup_multi_components(
     config: dict[str, Any],
 ) -> None:
     """Set up multiple domains. Log on failure."""
-    # Avoid creating tasks for domains that were setup in a previous stage
-    domains_not_yet_setup = domains - hass.config.components
     # Create setup tasks for base platforms first since everything will have
     # to wait to be imported, and the sooner we can get the base platforms
     # loaded the sooner we can start loading the rest of the integrations.
@@ -1057,9 +1050,7 @@ async def _async_setup_multi_components(
             f"setup component {domain}",
             eager_start=True,
         )
-        for domain in sorted(
-            domains_not_yet_setup, key=SETUP_ORDER_SORT_KEY, reverse=True
-        )
+        for domain in sorted(domains, key=SETUP_ORDER_SORT_KEY, reverse=True)
     }
     results = await asyncio.gather(*futures.values(), return_exceptions=True)
     for idx, domain in enumerate(futures):
diff --git a/homeassistant/loader.py b/homeassistant/loader.py
index 3bc33f8374c..20763dc7b30 100644
--- a/homeassistant/loader.py
+++ b/homeassistant/loader.py
@@ -40,6 +40,8 @@ from .generated.ssdp import SSDP
 from .generated.usb import USB
 from .generated.zeroconf import HOMEKIT, ZEROCONF
 from .helpers.json import json_bytes, json_fragment
+from .helpers.typing import UNDEFINED, UndefinedType
+from .util.async_ import create_eager_task
 from .util.hass_dict import HassKey
 from .util.json import JSON_DECODE_EXCEPTIONS, json_loads
 
@@ -758,10 +760,8 @@ class Integration:
         manifest["overwrites_built_in"] = self.overwrites_built_in
 
         if self.dependencies:
-            self._all_dependencies_resolved: bool | None = None
-            self._all_dependencies: set[str] | None = None
+            self._all_dependencies: set[str] | Exception | None = None
         else:
-            self._all_dependencies_resolved = True
             self._all_dependencies = set()
 
         self._platforms_to_preload = hass.data[DATA_PRELOAD_PLATFORMS]
@@ -933,47 +933,25 @@ class Integration:
         """Return all dependencies including sub-dependencies."""
         if self._all_dependencies is None:
             raise RuntimeError("Dependencies not resolved!")
+        if isinstance(self._all_dependencies, Exception):
+            raise self._all_dependencies
 
         return self._all_dependencies
 
     @property
     def all_dependencies_resolved(self) -> bool:
         """Return if all dependencies have been resolved."""
-        return self._all_dependencies_resolved is not None
+        return self._all_dependencies is not None
 
-    async def resolve_dependencies(self) -> bool:
+    async def resolve_dependencies(self) -> set[str] | None:
         """Resolve all dependencies."""
-        if self._all_dependencies_resolved is not None:
-            return self._all_dependencies_resolved
-
-        self._all_dependencies_resolved = False
-        try:
-            dependencies = await _async_component_dependencies(self.hass, self)
-        except IntegrationNotFound as err:
-            _LOGGER.error(
-                (
-                    "Unable to resolve dependencies for %s: unable to resolve"
-                    " (sub)dependency %s"
-                ),
-                self.domain,
-                err.domain,
-            )
-        except CircularDependency as err:
-            _LOGGER.error(
-                (
-                    "Unable to resolve dependencies for %s: it contains a circular"
-                    " dependency: %s -> %s"
-                ),
-                self.domain,
-                err.from_domain,
-                err.to_domain,
-            )
-        else:
-            dependencies.discard(self.domain)
-            self._all_dependencies = dependencies
-            self._all_dependencies_resolved = True
+        if self._all_dependencies is not None:
+            if isinstance(self._all_dependencies, Exception):
+                return None
+            return self._all_dependencies
 
-        return self._all_dependencies_resolved
+        result = await resolve_integrations_dependencies(self.hass, (self,))
+        return result.get(self.domain)
 
     async def async_get_component(self) -> ComponentProtocol:
         """Return the component.
@@ -1441,6 +1419,189 @@ async def async_get_integrations(
     return results
 
 
+class _ResolveDependenciesCacheProtocol(Protocol):
+    def get(self, itg: Integration) -> set[str] | Exception | None: ...
+
+    def __setitem__(
+        self, itg: Integration, all_dependencies: set[str] | Exception
+    ) -> None: ...
+
+
+class _ResolveDependenciesCache(_ResolveDependenciesCacheProtocol):
+    """Cache for resolve_integrations_dependencies."""
+
+    def get(self, itg: Integration) -> set[str] | Exception | None:
+        return itg._all_dependencies  # noqa: SLF001
+
+    def __setitem__(
+        self, itg: Integration, all_dependencies: set[str] | Exception
+    ) -> None:
+        itg._all_dependencies = all_dependencies  # noqa: SLF001
+
+
+async def resolve_integrations_dependencies(
+    hass: HomeAssistant, integrations: Iterable[Integration]
+) -> dict[str, set[str]]:
+    """Resolve all dependencies for integrations.
+
+    Detects circular dependencies and missing integrations.
+    """
+    resolved = _ResolveDependenciesCache()
+
+    async def _resolve_deps_catch_exceptions(itg: Integration) -> set[str] | None:
+        try:
+            return await _do_resolve_dependencies(itg, cache=resolved)
+        except Exception as exc:  # noqa: BLE001
+            _LOGGER.error("Unable to resolve dependencies for %s: %s", itg.domain, exc)
+            return None
+
+    resolve_dependencies_tasks = {
+        itg.domain: create_eager_task(
+            _resolve_deps_catch_exceptions(itg),
+            name=f"resolve dependencies {itg.domain}",
+            loop=hass.loop,
+        )
+        for itg in integrations
+    }
+
+    result = await asyncio.gather(*resolve_dependencies_tasks.values())
+
+    return {
+        domain: deps
+        for domain, deps in zip(resolve_dependencies_tasks, result, strict=True)
+        if deps is not None
+    }
+
+
+async def resolve_integrations_after_dependencies(
+    hass: HomeAssistant,
+    integrations: Iterable[Integration],
+    possible_after_dependencies: set[str] | None = None,
+    *,
+    ignore_exceptions: bool = False,
+) -> dict[str, set[str]]:
+    """Resolve all dependencies, including after_dependencies, for integrations.
+
+    Detects circular dependencies and missing integrations.
+    """
+    resolved: dict[Integration, set[str] | Exception] = {}
+
+    async def _resolve_deps_catch_exceptions(itg: Integration) -> set[str] | None:
+        try:
+            return await _do_resolve_dependencies(
+                itg,
+                cache=resolved,
+                possible_after_dependencies=possible_after_dependencies,
+                ignore_exceptions=ignore_exceptions,
+            )
+        except Exception as exc:  # noqa: BLE001
+            _LOGGER.error(
+                "Unable to resolve (after) dependencies for %s: %s", itg.domain, exc
+            )
+            return None
+
+    resolve_dependencies_tasks = {
+        itg.domain: create_eager_task(
+            _resolve_deps_catch_exceptions(itg),
+            name=f"resolve after dependencies {itg.domain}",
+            loop=hass.loop,
+        )
+        for itg in integrations
+    }
+
+    result = await asyncio.gather(*resolve_dependencies_tasks.values())
+
+    return {
+        domain: deps
+        for domain, deps in zip(resolve_dependencies_tasks, result, strict=True)
+        if deps is not None
+    }
+
+
+async def _do_resolve_dependencies(
+    itg: Integration,
+    *,
+    cache: _ResolveDependenciesCacheProtocol,
+    possible_after_dependencies: set[str] | None | UndefinedType = UNDEFINED,
+    ignore_exceptions: bool = False,
+) -> set[str]:
+    """Recursively resolve all dependencies.
+
+    Uses `cache` to cache the results.
+
+    If `possible_after_dependencies` is not UNDEFINED,
+    listed after dependencies are also considered.
+    If `possible_after_dependencies` is None,
+    all the possible after dependencies are considered.
+
+    If `ignore_exceptions` is True, exceptions are caught and ignored
+    and the normal resolution algorithm continues.
+    Otherwise, exceptions are raised.
+    """
+    resolved = cache
+    resolving: set[str] = set()
+
+    async def do_resolve_dependencies_impl(itg: Integration) -> set[str]:
+        domain = itg.domain
+
+        # If it's already resolved, no point doing it again.
+        if (result := resolved.get(itg)) is not None:
+            if isinstance(result, Exception):
+                raise result
+            return result
+
+        # If we are already resolving it, we have a circular dependency.
+        if domain in resolving:
+            if ignore_exceptions:
+                resolved[itg] = set()
+                return set()
+            exc = CircularDependency([domain])
+            resolved[itg] = exc
+            raise exc
+
+        resolving.add(domain)
+
+        dependencies_domains = set(itg.dependencies)
+        if possible_after_dependencies is not UNDEFINED:
+            if possible_after_dependencies is None:
+                after_dependencies: Iterable[str] = itg.after_dependencies
+            else:
+                after_dependencies = (
+                    set(itg.after_dependencies) & possible_after_dependencies
+                )
+            dependencies_domains.update(after_dependencies)
+        dependencies = await async_get_integrations(itg.hass, dependencies_domains)
+
+        all_dependencies: set[str] = set()
+        for dep_domain, dep_integration in dependencies.items():
+            if isinstance(dep_integration, Exception):
+                if ignore_exceptions:
+                    continue
+                resolved[itg] = dep_integration
+                raise dep_integration
+
+            all_dependencies.add(dep_domain)
+
+            try:
+                dep_dependencies = await do_resolve_dependencies_impl(dep_integration)
+            except CircularDependency as exc:
+                exc.extend_cycle(domain)
+                resolved[itg] = exc
+                raise
+            except Exception as exc:
+                resolved[itg] = exc
+                raise
+
+            all_dependencies.update(dep_dependencies)
+
+        resolving.remove(domain)
+
+        resolved[itg] = all_dependencies
+        return all_dependencies
+
+    return await do_resolve_dependencies_impl(itg)
+
+
 class LoaderError(Exception):
     """Loader base error."""
 
@@ -1466,11 +1627,13 @@ class IntegrationNotLoaded(LoaderError):
 class CircularDependency(LoaderError):
     """Raised when a circular dependency is found when resolving components."""
 
-    def __init__(self, from_domain: str | set[str], to_domain: str) -> None:
+    def __init__(self, domain_cycle: list[str]) -> None:
         """Initialize circular dependency error."""
-        super().__init__(f"Circular dependency detected: {from_domain} -> {to_domain}.")
-        self.from_domain = from_domain
-        self.to_domain = to_domain
+        super().__init__("Circular dependency detected", domain_cycle)
+
+    def extend_cycle(self, domain: str) -> None:
+        """Extend the cycle with the domain."""
+        self.args[1].insert(0, domain)
 
 
 def _load_file(
@@ -1624,50 +1787,6 @@ def bind_hass[_CallableT: Callable[..., Any]](func: _CallableT) -> _CallableT:
     return func
 
 
-async def _async_component_dependencies(
-    hass: HomeAssistant,
-    integration: Integration,
-) -> set[str]:
-    """Get component dependencies."""
-    loading: set[str] = set()
-    loaded: set[str] = set()
-
-    async def component_dependencies_impl(integration: Integration) -> None:
-        """Recursively get component dependencies."""
-        domain = integration.domain
-        if not (dependencies := integration.dependencies):
-            loaded.add(domain)
-            return
-
-        loading.add(domain)
-        dep_integrations = await async_get_integrations(hass, dependencies)
-        for dependency_domain, dep_integration in dep_integrations.items():
-            if isinstance(dep_integration, Exception):
-                raise dep_integration
-
-            # If we are already loading it, we have a circular dependency.
-            # We have to check it here to make sure that every integration that
-            # depends on us, does not appear in our own after_dependencies.
-            if conflict := loading.intersection(dep_integration.after_dependencies):
-                raise CircularDependency(conflict, dependency_domain)
-
-            # If we have already loaded it, no point doing it again.
-            if dependency_domain in loaded:
-                continue
-
-            # If we are already loading it, we have a circular dependency.
-            if dependency_domain in loading:
-                raise CircularDependency(dependency_domain, domain)
-
-            await component_dependencies_impl(dep_integration)
-        loading.remove(domain)
-        loaded.add(domain)
-
-    await component_dependencies_impl(integration)
-
-    return loaded
-
-
 def _async_mount_config_dir(hass: HomeAssistant) -> None:
     """Mount config dir in order to load custom_component.
 
diff --git a/homeassistant/setup.py b/homeassistant/setup.py
index dc4d0988b91..9572136559a 100644
--- a/homeassistant/setup.py
+++ b/homeassistant/setup.py
@@ -323,7 +323,7 @@ async def _async_setup_component(
             translation.async_load_integrations(hass, integration_set), loop=hass.loop
         )
     # Validate all dependencies exist and there are no circular dependencies
-    if not await integration.resolve_dependencies():
+    if await integration.resolve_dependencies() is None:
         return False
 
     # Process requirements as soon as possible, so we can import the component
diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py
index e89d038f8ce..050963316dc 100644
--- a/tests/test_bootstrap.py
+++ b/tests/test_bootstrap.py
@@ -572,7 +572,7 @@ async def test_setup_after_deps_not_present(hass: HomeAssistant) -> None:
         MockModule(
             domain="second_dep",
             async_setup=gen_domain_setup("second_dep"),
-            partial_manifest={"after_dependencies": ["first_dep"]},
+            partial_manifest={"after_dependencies": ["first_dep", "root"]},
         ),
     )
 
@@ -1169,6 +1169,7 @@ async def test_bootstrap_is_cancellation_safe(
     hass: HomeAssistant, caplog: pytest.LogCaptureFixture
 ) -> None:
     """Test cancellation during async_setup_component does not cancel bootstrap."""
+    mock_integration(hass, MockModule(domain="cancel_integration"))
     with patch.object(
         bootstrap, "async_setup_component", side_effect=asyncio.CancelledError
     ):
@@ -1185,6 +1186,18 @@ async def test_bootstrap_empty_integrations(hass: HomeAssistant) -> None:
     await hass.async_block_till_done()
 
 
+@pytest.mark.parametrize("load_registries", [False])
+async def test_bootstrap_log_already_setup_stage(
+    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
+) -> None:
+    """Test logging when all integrations in a stage were already setup."""
+    with patch.object(bootstrap, "STAGE_1_INTEGRATIONS", {"frontend"}):
+        await bootstrap._async_set_up_integrations(hass, {})
+        await hass.async_block_till_done()
+
+    assert "Already set up stage 1: {'frontend'}" in caplog.text
+
+
 @pytest.fixture(name="mock_mqtt_config_flow")
 def mock_mqtt_config_flow_fixture() -> Generator[None]:
     """Mock MQTT config flow."""
diff --git a/tests/test_loader.py b/tests/test_loader.py
index 548091a3503..0b83ddee3ea 100644
--- a/tests/test_loader.py
+++ b/tests/test_loader.py
@@ -27,33 +27,42 @@ async def test_circular_component_dependencies(hass: HomeAssistant) -> None:
     mock_integration(hass, MockModule("mod2", dependencies=["mod1"]))
     mock_integration(hass, MockModule("mod3", dependencies=["mod1"]))
     mod_4 = mock_integration(hass, MockModule("mod4", dependencies=["mod2", "mod3"]))
+    all_domains = {"mod1", "mod2", "mod3", "mod4"}
 
-    deps = await loader._async_component_dependencies(hass, mod_4)
-    assert deps == {"mod1", "mod2", "mod3", "mod4"}
+    deps = await loader._do_resolve_dependencies(mod_4, cache={})
+    assert deps == {"mod1", "mod2", "mod3"}
 
     # Create a circular dependency
     mock_integration(hass, MockModule("mod1", dependencies=["mod4"]))
     with pytest.raises(loader.CircularDependency):
-        await loader._async_component_dependencies(hass, mod_4)
+        await loader._do_resolve_dependencies(mod_4, cache={})
 
     # Create a different circular dependency
     mock_integration(hass, MockModule("mod1", dependencies=["mod3"]))
     with pytest.raises(loader.CircularDependency):
-        await loader._async_component_dependencies(hass, mod_4)
+        await loader._do_resolve_dependencies(mod_4, cache={})
 
     # Create a circular after_dependency
     mock_integration(
         hass, MockModule("mod1", partial_manifest={"after_dependencies": ["mod4"]})
     )
     with pytest.raises(loader.CircularDependency):
-        await loader._async_component_dependencies(hass, mod_4)
+        await loader._do_resolve_dependencies(
+            mod_4,
+            cache={},
+            possible_after_dependencies=all_domains,
+        )
 
     # Create a different circular after_dependency
     mock_integration(
         hass, MockModule("mod1", partial_manifest={"after_dependencies": ["mod3"]})
     )
     with pytest.raises(loader.CircularDependency):
-        await loader._async_component_dependencies(hass, mod_4)
+        await loader._do_resolve_dependencies(
+            mod_4,
+            cache={},
+            possible_after_dependencies=all_domains,
+        )
 
     # Create a circular after_dependency without a hard dependency
     mock_integration(
@@ -62,29 +71,48 @@ async def test_circular_component_dependencies(hass: HomeAssistant) -> None:
     mod_4 = mock_integration(
         hass, MockModule("mod4", partial_manifest={"after_dependencies": ["mod2"]})
     )
-    # this currently doesn't raise, but it should. Will be improved in a follow-up.
-    await loader._async_component_dependencies(hass, mod_4)
+    with pytest.raises(loader.CircularDependency):
+        await loader._do_resolve_dependencies(
+            mod_4,
+            cache={},
+            possible_after_dependencies=all_domains,
+        )
+
+    result = await loader.resolve_integrations_after_dependencies(hass, (mod_4,))
+    assert result == {}
+    result = await loader.resolve_integrations_after_dependencies(
+        hass, (mod_4,), ignore_exceptions=True
+    )
+    assert result["mod4"] == {"mod4", "mod2", "mod1"}
 
 
 async def test_nonexistent_component_dependencies(hass: HomeAssistant) -> None:
     """Test if we can detect nonexistent dependencies of components."""
     mod_1 = mock_integration(hass, MockModule("mod1", dependencies=["nonexistent"]))
-    with pytest.raises(loader.IntegrationNotFound):
-        await loader._async_component_dependencies(hass, mod_1)
     mod_2 = mock_integration(hass, MockModule("mod2", dependencies=["mod1"]))
 
-    assert not await mod_2.resolve_dependencies()
+    assert await mod_2.resolve_dependencies() is None
     assert mod_2.all_dependencies_resolved
-    with pytest.raises(RuntimeError):
+    with pytest.raises(loader.IntegrationNotFound):
         mod_2.all_dependencies  # noqa: B018
 
-    # this currently is not resolved, because intermediate results are not cached
-    # this will be improved in a follow-up
-    assert not mod_1.all_dependencies_resolved
-    assert not await mod_1.resolve_dependencies()
-    with pytest.raises(RuntimeError):
+    assert mod_1.all_dependencies_resolved
+    assert await mod_1.resolve_dependencies() is None
+    with pytest.raises(loader.IntegrationNotFound):
         mod_1.all_dependencies  # noqa: B018
 
+    result = await loader.resolve_integrations_dependencies(hass, (mod_2, mod_1))
+    assert result == {}
+
+    mod_1 = mock_integration(
+        hass,
+        MockModule("mod1", partial_manifest={"after_dependencies": ["non.existent"]}),
+    )
+    mod_2 = mock_integration(hass, MockModule("mod2", dependencies=["mod1"]))
+
+    result = await loader.resolve_integrations_after_dependencies(hass, (mod_2, mod_1))
+    assert result == {}
+
 
 def test_component_loader(hass: HomeAssistant) -> None:
     """Test loading components."""
-- 
GitLab