diff --git a/homeassistant/components/recorder/history/legacy.py b/homeassistant/components/recorder/history/legacy.py
index e9a435f96248037b0ccaecb953e88afcab9f4cb0..74b17d9daa72e46116a6e2bc6b1c5a555eced896 100644
--- a/homeassistant/components/recorder/history/legacy.py
+++ b/homeassistant/components/recorder/history/legacy.py
@@ -24,13 +24,16 @@ from ... import recorder
 from ..db_schema import RecorderRuns, StateAttributes, States
 from ..filters import Filters
 from ..models import (
-    LazyState,
     process_datetime_to_timestamp,
     process_timestamp,
     process_timestamp_to_utc_isoformat,
-    row_to_compressed_state,
 )
-from ..models.legacy import LazyStatePreSchema31, row_to_compressed_state_pre_schema_31
+from ..models.legacy import (
+    LegacyLazyState,
+    LegacyLazyStatePreSchema31,
+    legacy_row_to_compressed_state,
+    legacy_row_to_compressed_state_pre_schema_31,
+)
 from ..util import execute_stmt_lambda_element, session_scope
 from .common import _schema_version
 from .const import (
@@ -713,17 +716,17 @@ def _sorted_states_to_dict(
     ]
     if compressed_state_format:
         if schema_version >= 31:
-            state_class = row_to_compressed_state
+            state_class = legacy_row_to_compressed_state
         else:
-            state_class = row_to_compressed_state_pre_schema_31
+            state_class = legacy_row_to_compressed_state_pre_schema_31
         _process_timestamp = process_datetime_to_timestamp
         attr_time = COMPRESSED_STATE_LAST_UPDATED
         attr_state = COMPRESSED_STATE_STATE
     else:
         if schema_version >= 31:
-            state_class = LazyState
+            state_class = LegacyLazyState
         else:
-            state_class = LazyStatePreSchema31
+            state_class = LegacyLazyStatePreSchema31
         _process_timestamp = process_timestamp_to_utc_isoformat
         attr_time = LAST_CHANGED_KEY
         attr_state = STATE_KEY
diff --git a/homeassistant/components/recorder/history/modern.py b/homeassistant/components/recorder/history/modern.py
index 75a99c6e5023647727a95361993717c54f3cbf6b..74f613c2cec324a7364c6c96471807a0d1f7d970 100644
--- a/homeassistant/components/recorder/history/modern.py
+++ b/homeassistant/components/recorder/history/modern.py
@@ -1,29 +1,38 @@
 """Provide pre-made queries on top of the recorder component."""
 from __future__ import annotations
 
-from collections import defaultdict
 from collections.abc import Callable, Iterable, Iterator, MutableMapping
 from datetime import datetime
 from itertools import groupby
 from operator import itemgetter
 from typing import Any, cast
 
-from sqlalchemy import Column, and_, func, lambda_stmt, select
+from sqlalchemy import (
+    CompoundSelect,
+    Integer,
+    Select,
+    Subquery,
+    and_,
+    func,
+    lambda_stmt,
+    literal,
+    select,
+    union_all,
+)
+from sqlalchemy.dialects import postgresql
 from sqlalchemy.engine.row import Row
-from sqlalchemy.orm.properties import MappedColumn
 from sqlalchemy.orm.session import Session
-from sqlalchemy.sql.expression import literal
-from sqlalchemy.sql.lambdas import StatementLambdaElement
 
 from homeassistant.const import COMPRESSED_STATE_LAST_UPDATED, COMPRESSED_STATE_STATE
 from homeassistant.core import HomeAssistant, State, split_entity_id
 import homeassistant.util.dt as dt_util
 
 from ... import recorder
-from ..db_schema import RecorderRuns, StateAttributes, States
+from ..db_schema import StateAttributes, States
 from ..filters import Filters
 from ..models import (
     LazyState,
+    datetime_to_timestamp_or_none,
     extract_metadata_ids,
     process_timestamp,
     row_to_compressed_state,
@@ -36,57 +45,66 @@ from .const import (
     STATE_KEY,
 )
 
-_BASE_STATES = (
-    States.metadata_id,
-    States.state,
-    States.last_changed_ts,
-    States.last_updated_ts,
-)
-_BASE_STATES_NO_LAST_CHANGED = (  # type: ignore[var-annotated]
-    States.metadata_id,
-    States.state,
-    literal(value=None).label("last_changed_ts"),
-    States.last_updated_ts,
-)
-_QUERY_STATE_NO_ATTR = (*_BASE_STATES,)
-_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED = (*_BASE_STATES_NO_LAST_CHANGED,)
-_QUERY_STATES = (
-    *_BASE_STATES,
-    # Remove States.attributes once all attributes are in StateAttributes.shared_attrs
-    States.attributes,
-    StateAttributes.shared_attrs,
-)
-_QUERY_STATES_NO_LAST_CHANGED = (
-    *_BASE_STATES_NO_LAST_CHANGED,
-    # Remove States.attributes once all attributes are in StateAttributes.shared_attrs
-    States.attributes,
-    StateAttributes.shared_attrs,
-)
 _FIELD_MAP = {
-    cast(MappedColumn, field).name: idx
-    for idx, field in enumerate(_QUERY_STATE_NO_ATTR)
+    "metadata_id": 0,
+    "state": 1,
+    "last_updated_ts": 2,
 }
 
 
-def _lambda_stmt_and_join_attributes(
-    no_attributes: bool, include_last_changed: bool = True
-) -> StatementLambdaElement:
-    """Return the lambda_stmt and if StateAttributes should be joined.
+CASTABLE_DOUBLE_TYPE = (
+    # MySQL/MariaDB < 10.4+ does not support casting to DOUBLE so we have to use Integer instead but it doesn't
+    # matter because we don't use the value as its always set to NULL
+    #
+    # sqlalchemy.exc.SAWarning: Datatype DOUBLE does not support CAST on MySQL/MariaDb; the CAST will be skipped.
+    #
+    Integer().with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
+)
 
-    Because these are lambda_stmt the values inside the lambdas need
-    to be explicitly written out to avoid caching the wrong values.
-    """
-    # If no_attributes was requested we do the query
-    # without the attributes fields and do not join the
-    # state_attributes table
-    if no_attributes:
-        if include_last_changed:
-            return lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR))
-        return lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED))
 
+def _stmt_and_join_attributes(
+    no_attributes: bool, include_last_changed: bool
+) -> Select:
+    """Return the statement and if StateAttributes should be joined."""
+    _select = select(States.metadata_id, States.state, States.last_updated_ts)
+    if include_last_changed:
+        _select = _select.add_columns(States.last_changed_ts)
+    if not no_attributes:
+        _select = _select.add_columns(States.attributes, StateAttributes.shared_attrs)
+    return _select
+
+
+def _stmt_and_join_attributes_for_start_state(
+    no_attributes: bool, include_last_changed: bool
+) -> Select:
+    """Return the statement and if StateAttributes should be joined."""
+    _select = select(States.metadata_id, States.state)
+    _select = _select.add_columns(
+        literal(value=None).label("last_updated_ts").cast(CASTABLE_DOUBLE_TYPE)
+    )
+    if include_last_changed:
+        _select = _select.add_columns(
+            literal(value=None).label("last_changed_ts").cast(CASTABLE_DOUBLE_TYPE)
+        )
+    if not no_attributes:
+        _select = _select.add_columns(States.attributes, StateAttributes.shared_attrs)
+    return _select
+
+
+def _select_from_subquery(
+    subquery: Subquery | CompoundSelect, no_attributes: bool, include_last_changed: bool
+) -> Select:
+    """Return the statement to select from the union."""
+    base_select = select(
+        subquery.c.metadata_id,
+        subquery.c.state,
+        subquery.c.last_updated_ts,
+    )
     if include_last_changed:
-        return lambda_stmt(lambda: select(*_QUERY_STATES))
-    return lambda_stmt(lambda: select(*_QUERY_STATES_NO_LAST_CHANGED))
+        base_select = base_select.add_columns(subquery.c.last_changed_ts)
+    if no_attributes:
+        return base_select
+    return base_select.add_columns(subquery.c.attributes, subquery.c.shared_attrs)
 
 
 def get_significant_states(
@@ -119,38 +137,65 @@ def get_significant_states(
 
 
 def _significant_states_stmt(
-    start_time: datetime,
-    end_time: datetime | None,
+    start_time_ts: float,
+    end_time_ts: float | None,
+    single_metadata_id: int | None,
     metadata_ids: list[int],
     metadata_ids_in_significant_domains: list[int],
     significant_changes_only: bool,
     no_attributes: bool,
-) -> StatementLambdaElement:
+    include_start_time_state: bool,
+    run_start_ts: float | None,
+) -> Select | CompoundSelect:
     """Query the database for significant state changes."""
-    stmt = _lambda_stmt_and_join_attributes(
-        no_attributes, include_last_changed=not significant_changes_only
-    )
+    include_last_changed = not significant_changes_only
+    stmt = _stmt_and_join_attributes(no_attributes, include_last_changed)
     if significant_changes_only:
         # Since we are filtering on entity_id (metadata_id) we can avoid
         # the join of the states_meta table since we already know which
         # metadata_ids are in the significant domains.
-        stmt += lambda q: q.filter(
-            States.metadata_id.in_(metadata_ids_in_significant_domains)
-            | (States.last_changed_ts == States.last_updated_ts)
-            | States.last_changed_ts.is_(None)
-        )
-    stmt += lambda q: q.filter(States.metadata_id.in_(metadata_ids))
-    start_time_ts = start_time.timestamp()
-    stmt += lambda q: q.filter(States.last_updated_ts > start_time_ts)
-    if end_time:
-        end_time_ts = end_time.timestamp()
-        stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts)
+        if metadata_ids_in_significant_domains:
+            stmt = stmt.filter(
+                States.metadata_id.in_(metadata_ids_in_significant_domains)
+                | (States.last_changed_ts == States.last_updated_ts)
+                | States.last_changed_ts.is_(None)
+            )
+        else:
+            stmt = stmt.filter(
+                (States.last_changed_ts == States.last_updated_ts)
+                | States.last_changed_ts.is_(None)
+            )
+    stmt = stmt.filter(States.metadata_id.in_(metadata_ids)).filter(
+        States.last_updated_ts > start_time_ts
+    )
+    if end_time_ts:
+        stmt = stmt.filter(States.last_updated_ts < end_time_ts)
     if not no_attributes:
-        stmt += lambda q: q.outerjoin(
+        stmt = stmt.outerjoin(
             StateAttributes, States.attributes_id == StateAttributes.attributes_id
         )
-    stmt += lambda q: q.order_by(States.metadata_id, States.last_updated_ts)
-    return stmt
+    stmt = stmt.order_by(States.metadata_id, States.last_updated_ts)
+    if not include_start_time_state or not run_start_ts:
+        return stmt
+    return _select_from_subquery(
+        union_all(
+            _select_from_subquery(
+                _get_start_time_state_stmt(
+                    run_start_ts,
+                    start_time_ts,
+                    single_metadata_id,
+                    metadata_ids,
+                    no_attributes,
+                    include_last_changed,
+                ).subquery(),
+                no_attributes,
+                include_last_changed,
+            ),
+            _select_from_subquery(stmt.subquery(), no_attributes, include_last_changed),
+        ).subquery(),
+        no_attributes,
+        include_last_changed,
+    )
 
 
 def get_significant_states_with_session(
@@ -181,7 +226,6 @@ def get_significant_states_with_session(
         raise NotImplementedError("Filters are no longer supported")
     if not entity_ids:
         raise ValueError("entity_ids must be provided")
-    metadata_ids: list[int] | None = None
     entity_id_to_metadata_id: dict[str, int | None] | None = None
     metadata_ids_in_significant_domains: list[int] = []
     instance = recorder.get_instance(hass)
@@ -189,8 +233,9 @@ def get_significant_states_with_session(
         entity_id_to_metadata_id := instance.states_meta_manager.get_many(
             entity_ids, session, False
         )
-    ) or not (metadata_ids := extract_metadata_ids(entity_id_to_metadata_id)):
+    ) or not (possible_metadata_ids := extract_metadata_ids(entity_id_to_metadata_id)):
         return {}
+    metadata_ids = possible_metadata_ids
     if significant_changes_only:
         metadata_ids_in_significant_domains = [
             metadata_id
@@ -198,25 +243,41 @@ def get_significant_states_with_session(
             if metadata_id is not None
             and split_entity_id(entity_id)[0] in SIGNIFICANT_DOMAINS
         ]
-    stmt = _significant_states_stmt(
-        start_time,
-        end_time,
-        metadata_ids,
-        metadata_ids_in_significant_domains,
-        significant_changes_only,
-        no_attributes,
+    run_start_ts: float | None = None
+    if include_start_time_state and not (
+        run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time)
+    ):
+        include_start_time_state = False
+    start_time_ts = dt_util.utc_to_timestamp(start_time)
+    end_time_ts = datetime_to_timestamp_or_none(end_time)
+    single_metadata_id = metadata_ids[0] if len(metadata_ids) == 1 else None
+    stmt = lambda_stmt(
+        lambda: _significant_states_stmt(
+            start_time_ts,
+            end_time_ts,
+            single_metadata_id,
+            metadata_ids,
+            metadata_ids_in_significant_domains,
+            significant_changes_only,
+            no_attributes,
+            include_start_time_state,
+            run_start_ts,
+        ),
+        track_on=[
+            bool(single_metadata_id),
+            bool(metadata_ids_in_significant_domains),
+            bool(end_time_ts),
+            significant_changes_only,
+            no_attributes,
+            include_start_time_state,
+        ],
     )
-    states = execute_stmt_lambda_element(session, stmt, None, end_time)
     return _sorted_states_to_dict(
-        hass,
-        session,
-        states,
-        start_time,
+        execute_stmt_lambda_element(session, stmt, None, end_time),
+        start_time_ts if include_start_time_state else None,
         entity_ids,
         entity_id_to_metadata_id,
-        include_start_time_state,
         minimal_response,
-        no_attributes,
         compressed_state_format,
     )
 
@@ -255,38 +316,60 @@ def get_full_significant_states_with_session(
 
 
 def _state_changed_during_period_stmt(
-    start_time: datetime,
-    end_time: datetime | None,
-    metadata_id: int | None,
+    start_time_ts: float,
+    end_time_ts: float | None,
+    single_metadata_id: int,
     no_attributes: bool,
-    descending: bool,
     limit: int | None,
-) -> StatementLambdaElement:
-    stmt = _lambda_stmt_and_join_attributes(no_attributes, include_last_changed=False)
-    start_time_ts = start_time.timestamp()
-    stmt += lambda q: q.filter(
-        (
-            (States.last_changed_ts == States.last_updated_ts)
-            | States.last_changed_ts.is_(None)
+    include_start_time_state: bool,
+    run_start_ts: float | None,
+) -> Select | CompoundSelect:
+    stmt = (
+        _stmt_and_join_attributes(no_attributes, False)
+        .filter(
+            (
+                (States.last_changed_ts == States.last_updated_ts)
+                | States.last_changed_ts.is_(None)
+            )
+            & (States.last_updated_ts > start_time_ts)
         )
-        & (States.last_updated_ts > start_time_ts)
+        .filter(States.metadata_id == single_metadata_id)
     )
-    if end_time:
-        end_time_ts = end_time.timestamp()
-        stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts)
-    if metadata_id:
-        stmt += lambda q: q.filter(States.metadata_id == metadata_id)
+    if end_time_ts:
+        stmt = stmt.filter(States.last_updated_ts < end_time_ts)
     if not no_attributes:
-        stmt += lambda q: q.outerjoin(
+        stmt = stmt.outerjoin(
             StateAttributes, States.attributes_id == StateAttributes.attributes_id
         )
-    if descending:
-        stmt += lambda q: q.order_by(States.metadata_id, States.last_updated_ts.desc())
-    else:
-        stmt += lambda q: q.order_by(States.metadata_id, States.last_updated_ts)
     if limit:
-        stmt += lambda q: q.limit(limit)
-    return stmt
+        stmt = stmt.limit(limit)
+    stmt = stmt.order_by(
+        States.metadata_id,
+        States.last_updated_ts,
+    )
+    if not include_start_time_state or not run_start_ts:
+        return stmt
+    return _select_from_subquery(
+        union_all(
+            _select_from_subquery(
+                _get_single_entity_start_time_stmt(
+                    start_time_ts,
+                    single_metadata_id,
+                    no_attributes,
+                    False,
+                ).subquery(),
+                no_attributes,
+                False,
+            ),
+            _select_from_subquery(
+                stmt.subquery(),
+                no_attributes,
+                False,
+            ),
+        ).subquery(),
+        no_attributes,
+        False,
+    )
 
 
 def state_changes_during_period(
@@ -305,42 +388,57 @@ def state_changes_during_period(
     entity_ids = [entity_id.lower()]
 
     with session_scope(hass=hass, read_only=True) as session:
-        metadata_id: int | None = None
         instance = recorder.get_instance(hass)
         if not (
-            metadata_id := instance.states_meta_manager.get(entity_id, session, False)
+            possible_metadata_id := instance.states_meta_manager.get(
+                entity_id, session, False
+            )
         ):
             return {}
-        entity_id_to_metadata_id: dict[str, int | None] = {entity_id: metadata_id}
-        stmt = _state_changed_during_period_stmt(
-            start_time,
-            end_time,
-            metadata_id,
-            no_attributes,
-            descending,
-            limit,
+        single_metadata_id = possible_metadata_id
+        entity_id_to_metadata_id: dict[str, int | None] = {
+            entity_id: single_metadata_id
+        }
+        run_start_ts: float | None = None
+        if include_start_time_state and not (
+            run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time)
+        ):
+            include_start_time_state = False
+        start_time_ts = dt_util.utc_to_timestamp(start_time)
+        end_time_ts = datetime_to_timestamp_or_none(end_time)
+        stmt = lambda_stmt(
+            lambda: _state_changed_during_period_stmt(
+                start_time_ts,
+                end_time_ts,
+                single_metadata_id,
+                no_attributes,
+                limit,
+                include_start_time_state,
+                run_start_ts,
+            ),
+            track_on=[
+                bool(end_time_ts),
+                no_attributes,
+                bool(limit),
+                include_start_time_state,
+            ],
         )
-        states = execute_stmt_lambda_element(session, stmt, None, end_time)
         return cast(
             MutableMapping[str, list[State]],
             _sorted_states_to_dict(
-                hass,
-                session,
-                states,
-                start_time,
+                execute_stmt_lambda_element(session, stmt, None, end_time),
+                start_time_ts if include_start_time_state else None,
                 entity_ids,
                 entity_id_to_metadata_id,
-                include_start_time_state=include_start_time_state,
+                descending=descending,
             ),
         )
 
 
-def _get_last_state_changes_stmt(
-    number_of_states: int, metadata_id: int
-) -> StatementLambdaElement:
-    stmt = _lambda_stmt_and_join_attributes(False, include_last_changed=False)
-    if number_of_states == 1:
-        stmt += lambda q: q.join(
+def _get_last_state_changes_single_stmt(metadata_id: int) -> Select:
+    return (
+        _stmt_and_join_attributes(False, False)
+        .join(
             (
                 lastest_state_for_metadata_id := (
                     select(
@@ -360,8 +458,19 @@ def _get_last_state_changes_stmt(
                 == lastest_state_for_metadata_id.c.max_last_updated,
             ),
         )
-    else:
-        stmt += lambda q: q.where(
+        .outerjoin(
+            StateAttributes, States.attributes_id == StateAttributes.attributes_id
+        )
+        .order_by(States.state_id.desc())
+    )
+
+
+def _get_last_state_changes_multiple_stmt(
+    number_of_states: int, metadata_id: int
+) -> Select:
+    return (
+        _stmt_and_join_attributes(False, False)
+        .where(
             States.state_id
             == (
                 select(States.state_id)
@@ -371,10 +480,11 @@ def _get_last_state_changes_stmt(
                 .subquery()
             ).c.state_id
         )
-    stmt += lambda q: q.outerjoin(
-        StateAttributes, States.attributes_id == StateAttributes.attributes_id
-    ).order_by(States.state_id.desc())
-    return stmt
+        .outerjoin(
+            StateAttributes, States.attributes_id == StateAttributes.attributes_id
+        )
+        .order_by(States.state_id.desc())
+    )
 
 
 def get_last_state_changes(
@@ -391,39 +501,48 @@ def get_last_state_changes(
     with session_scope(hass=hass, read_only=True) as session:
         instance = recorder.get_instance(hass)
         if not (
-            metadata_id := instance.states_meta_manager.get(entity_id, session, False)
+            possible_metadata_id := instance.states_meta_manager.get(
+                entity_id, session, False
+            )
         ):
             return {}
+        metadata_id = possible_metadata_id
         entity_id_to_metadata_id: dict[str, int | None] = {entity_id_lower: metadata_id}
-        stmt = _get_last_state_changes_stmt(number_of_states, metadata_id)
+        if number_of_states == 1:
+            stmt = lambda_stmt(
+                lambda: _get_last_state_changes_single_stmt(metadata_id),
+            )
+        else:
+            stmt = lambda_stmt(
+                lambda: _get_last_state_changes_multiple_stmt(
+                    number_of_states, metadata_id
+                ),
+            )
         states = list(execute_stmt_lambda_element(session, stmt))
         return cast(
             MutableMapping[str, list[State]],
             _sorted_states_to_dict(
-                hass,
-                session,
                 reversed(states),
-                dt_util.utcnow(),
+                None,
                 entity_ids,
                 entity_id_to_metadata_id,
-                include_start_time_state=False,
             ),
         )
 
 
-def _get_states_for_entities_stmt(
-    run_start: datetime,
-    utc_point_in_time: datetime,
+def _get_start_time_state_for_entities_stmt(
+    run_start_ts: float,
+    epoch_time: float,
     metadata_ids: list[int],
     no_attributes: bool,
-) -> StatementLambdaElement:
+    include_last_changed: bool,
+) -> Select:
     """Baked query to get states for specific entities."""
-    stmt = _lambda_stmt_and_join_attributes(no_attributes, include_last_changed=True)
     # We got an include-list of entities, accelerate the query by filtering already
     # in the inner query.
-    run_start_ts = process_timestamp(run_start).timestamp()
-    utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
-    stmt += lambda q: q.join(
+    stmt = _stmt_and_join_attributes_for_start_state(
+        no_attributes, include_last_changed
+    ).join(
         (
             most_recent_states_for_entities_by_date := (
                 select(
@@ -434,7 +553,7 @@ def _get_states_for_entities_stmt(
                 )
                 .filter(
                     (States.last_updated_ts >= run_start_ts)
-                    & (States.last_updated_ts < utc_point_in_time_ts)
+                    & (States.last_updated_ts < epoch_time)
                 )
                 .filter(States.metadata_id.in_(metadata_ids))
                 .group_by(States.metadata_id)
@@ -448,89 +567,88 @@ def _get_states_for_entities_stmt(
             == most_recent_states_for_entities_by_date.c.max_last_updated,
         ),
     )
-    if not no_attributes:
-        stmt += lambda q: q.outerjoin(
-            StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
-        )
-    return stmt
-
+    if no_attributes:
+        return stmt
+    return stmt.outerjoin(
+        StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
+    )
 
-def _get_rows_with_session(
-    hass: HomeAssistant,
-    session: Session,
-    utc_point_in_time: datetime,
-    entity_ids: list[str],
-    entity_id_to_metadata_id: dict[str, int | None] | None = None,
-    run: RecorderRuns | None = None,
-    no_attributes: bool = False,
-) -> Iterable[Row]:
-    """Return the states at a specific point in time."""
-    if len(entity_ids) == 1:
-        if not entity_id_to_metadata_id or not (
-            metadata_id := entity_id_to_metadata_id.get(entity_ids[0])
-        ):
-            return []
-        return execute_stmt_lambda_element(
-            session,
-            _get_single_entity_states_stmt(
-                utc_point_in_time, metadata_id, no_attributes
-            ),
-        )
 
-    if run is None:
-        run = recorder.get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
+def _get_run_start_ts_for_utc_point_in_time(
+    hass: HomeAssistant, utc_point_in_time: datetime
+) -> float | None:
+    """Return the start time of a run."""
+    run = recorder.get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
+    if (
+        run is not None
+        and (run_start := process_timestamp(run.start)) < utc_point_in_time
+    ):
+        return run_start.timestamp()
+    # History did not run before utc_point_in_time but we still
+    return None
 
-    if run is None or process_timestamp(run.start) > utc_point_in_time:
-        # History did not run before utc_point_in_time
-        return []
 
+def _get_start_time_state_stmt(
+    run_start_ts: float,
+    epoch_time: float,
+    single_metadata_id: int | None,
+    metadata_ids: list[int],
+    no_attributes: bool,
+    include_last_changed: bool,
+) -> Select:
+    """Return the states at a specific point in time."""
+    if single_metadata_id:
+        # Use an entirely different (and extremely fast) query if we only
+        # have a single entity id
+        return _get_single_entity_start_time_stmt(
+            epoch_time,
+            single_metadata_id,
+            no_attributes,
+            include_last_changed,
+        )
     # We have more than one entity to look at so we need to do a query on states
     # since the last recorder run started.
-    if not entity_id_to_metadata_id or not (
-        metadata_ids := extract_metadata_ids(entity_id_to_metadata_id)
-    ):
-        return []
-    stmt = _get_states_for_entities_stmt(
-        run.start, utc_point_in_time, metadata_ids, no_attributes
+    return _get_start_time_state_for_entities_stmt(
+        run_start_ts,
+        epoch_time,
+        metadata_ids,
+        no_attributes,
+        include_last_changed,
     )
-    return execute_stmt_lambda_element(session, stmt)
 
 
-def _get_single_entity_states_stmt(
-    utc_point_in_time: datetime,
+def _get_single_entity_start_time_stmt(
+    epoch_time: float,
     metadata_id: int,
-    no_attributes: bool = False,
-) -> StatementLambdaElement:
+    no_attributes: bool,
+    include_last_changed: bool,
+) -> Select:
     # Use an entirely different (and extremely fast) query if we only
     # have a single entity id
-    stmt = _lambda_stmt_and_join_attributes(no_attributes, include_last_changed=True)
-    utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
-    stmt += (
-        lambda q: q.filter(
-            States.last_updated_ts < utc_point_in_time_ts,
+    stmt = (
+        _stmt_and_join_attributes_for_start_state(no_attributes, include_last_changed)
+        .filter(
+            States.last_updated_ts < epoch_time,
             States.metadata_id == metadata_id,
         )
         .order_by(States.last_updated_ts.desc())
         .limit(1)
     )
-    if not no_attributes:
-        stmt += lambda q: q.outerjoin(
-            StateAttributes, States.attributes_id == StateAttributes.attributes_id
-        )
-    return stmt
+    if no_attributes:
+        return stmt
+    return stmt.outerjoin(
+        StateAttributes, States.attributes_id == StateAttributes.attributes_id
+    )
 
 
 def _sorted_states_to_dict(
-    hass: HomeAssistant,
-    session: Session,
     states: Iterable[Row],
-    start_time: datetime,
+    start_time_ts: float | None,
     entity_ids: list[str],
     entity_id_to_metadata_id: dict[str, int | None],
-    include_start_time_state: bool = True,
     minimal_response: bool = False,
-    no_attributes: bool = False,
     compressed_state_format: bool = False,
+    descending: bool = False,
 ) -> MutableMapping[str, list[State | dict[str, Any]]]:
     """Convert SQL results into JSON friendly data structure.
 
@@ -545,7 +663,8 @@ def _sorted_states_to_dict(
     """
     field_map = _FIELD_MAP
     state_class: Callable[
-        [Row, dict[str, dict[str, Any]], datetime | None], State | dict[str, Any]
+        [Row, dict[str, dict[str, Any]], float | None, str, str, float | None],
+        State | dict[str, Any],
     ]
     if compressed_state_format:
         state_class = row_to_compressed_state
@@ -556,32 +675,15 @@ def _sorted_states_to_dict(
         attr_time = LAST_CHANGED_KEY
         attr_state = STATE_KEY
 
-    result: dict[str, list[State | dict[str, Any]]] = defaultdict(list)
-    metadata_id_to_entity_id: dict[int, str] = {}
-    metadata_id_idx = field_map["metadata_id"]
-
     # Set all entity IDs to empty lists in result set to maintain the order
-    for ent_id in entity_ids:
-        result[ent_id] = []
-
+    result: dict[str, list[State | dict[str, Any]]] = {
+        entity_id: [] for entity_id in entity_ids
+    }
+    metadata_id_to_entity_id: dict[int, str] = {}
     metadata_id_to_entity_id = {
         v: k for k, v in entity_id_to_metadata_id.items() if v is not None
     }
     # Get the states at the start time
-    initial_states: dict[int, Row] = {}
-    if include_start_time_state:
-        initial_states = {
-            row[metadata_id_idx]: row
-            for row in _get_rows_with_session(
-                hass,
-                session,
-                start_time,
-                entity_ids,
-                entity_id_to_metadata_id,
-                no_attributes=no_attributes,
-            )
-        }
-
     if len(entity_ids) == 1:
         metadata_id = entity_id_to_metadata_id[entity_ids[0]]
         assert metadata_id is not None  # should not be possible if we got here
@@ -589,30 +691,35 @@ def _sorted_states_to_dict(
             (metadata_id, iter(states)),
         )
     else:
-        key_func = itemgetter(metadata_id_idx)
+        key_func = itemgetter(field_map["metadata_id"])
         states_iter = groupby(states, key_func)
 
+    state_idx = field_map["state"]
+    last_updated_ts_idx = field_map["last_updated_ts"]
+
     # Append all changes to it
     for metadata_id, group in states_iter:
+        entity_id = metadata_id_to_entity_id[metadata_id]
         attr_cache: dict[str, dict[str, Any]] = {}
-        prev_state: Column | str | None = None
-        if not (entity_id := metadata_id_to_entity_id.get(metadata_id)):
-            continue
         ent_results = result[entity_id]
-        if row := initial_states.pop(metadata_id, None):
-            prev_state = row.state
-            ent_results.append(state_class(row, attr_cache, start_time, entity_id=entity_id))  # type: ignore[call-arg]
-
         if (
             not minimal_response
             or split_entity_id(entity_id)[0] in NEED_ATTRIBUTE_DOMAINS
         ):
             ent_results.extend(
-                state_class(db_state, attr_cache, None, entity_id=entity_id)  # type: ignore[call-arg]
+                state_class(
+                    db_state,
+                    attr_cache,
+                    start_time_ts,
+                    entity_id,
+                    db_state[state_idx],
+                    db_state[last_updated_ts_idx],
+                )
                 for db_state in group
             )
             continue
 
+        prev_state: str | None = None
         # With minimal response we only provide a native
         # State for the first and last response. All the states
         # in-between only provide the "state" and the
@@ -620,14 +727,18 @@ def _sorted_states_to_dict(
         if not ent_results:
             if (first_state := next(group, None)) is None:
                 continue
-            prev_state = first_state.state
+            prev_state = first_state[state_idx]
             ent_results.append(
-                state_class(first_state, attr_cache, None, entity_id=entity_id)  # type: ignore[call-arg]
+                state_class(
+                    first_state,
+                    attr_cache,
+                    start_time_ts,
+                    entity_id,
+                    prev_state,  # type: ignore[arg-type]
+                    first_state[last_updated_ts_idx],
+                )
             )
 
-        state_idx = field_map["state"]
-        last_updated_ts_idx = field_map["last_updated_ts"]
-
         #
         # minimal_response only makes sense with last_updated == last_updated
         #
@@ -658,13 +769,9 @@ def _sorted_states_to_dict(
             if (state := row[state_idx]) != prev_state
         )
 
-    # If there are no states beyond the initial state,
-    # the state a was never popped from initial_states
-    for metadata_id, row in initial_states.items():
-        if entity_id := metadata_id_to_entity_id.get(metadata_id):
-            result[entity_id].append(
-                state_class(row, {}, start_time, entity_id=entity_id)  # type: ignore[call-arg]
-            )
+    if descending:
+        for ent_results in result.values():
+            ent_results.reverse()
 
     # Filter out the empty lists if some states had 0 results.
     return {key: val for key, val in result.items() if val}
diff --git a/homeassistant/components/recorder/models/legacy.py b/homeassistant/components/recorder/models/legacy.py
index c26e51777203fad8250edcb9e5345cc3e692ab75..8a093472afb7e2c90d4c60cb5d7af62c1b5c2b7c 100644
--- a/homeassistant/components/recorder/models/legacy.py
+++ b/homeassistant/components/recorder/models/legacy.py
@@ -13,6 +13,7 @@ from homeassistant.const import (
     COMPRESSED_STATE_STATE,
 )
 from homeassistant.core import Context, State
+import homeassistant.util.dt as dt_util
 
 from .state_attributes import decode_attributes_from_row
 from .time import (
@@ -24,7 +25,7 @@ from .time import (
 # pylint: disable=invalid-name
 
 
-class LazyStatePreSchema31(State):
+class LegacyLazyStatePreSchema31(State):
     """A lazy version of core State before schema 31."""
 
     __slots__ = [
@@ -138,7 +139,7 @@ class LazyStatePreSchema31(State):
         }
 
 
-def row_to_compressed_state_pre_schema_31(
+def legacy_row_to_compressed_state_pre_schema_31(
     row: Row,
     attr_cache: dict[str, dict[str, Any]],
     start_time: datetime | None,
@@ -162,3 +163,125 @@ def row_to_compressed_state_pre_schema_31(
                 row_changed_changed
             )
     return comp_state
+
+
+class LegacyLazyState(State):
+    """A lazy version of core State after schema 31."""
+
+    __slots__ = [
+        "_row",
+        "_attributes",
+        "_last_changed_ts",
+        "_last_updated_ts",
+        "_context",
+        "attr_cache",
+    ]
+
+    def __init__(  # pylint: disable=super-init-not-called
+        self,
+        row: Row,
+        attr_cache: dict[str, dict[str, Any]],
+        start_time: datetime | None,
+        entity_id: str | None = None,
+    ) -> None:
+        """Init the lazy state."""
+        self._row = row
+        self.entity_id = entity_id or self._row.entity_id
+        self.state = self._row.state or ""
+        self._attributes: dict[str, Any] | None = None
+        self._last_updated_ts: float | None = self._row.last_updated_ts or (
+            dt_util.utc_to_timestamp(start_time) if start_time else None
+        )
+        self._last_changed_ts: float | None = (
+            self._row.last_changed_ts or self._last_updated_ts
+        )
+        self._context: Context | None = None
+        self.attr_cache = attr_cache
+
+    @property  # type: ignore[override]
+    def attributes(self) -> dict[str, Any]:
+        """State attributes."""
+        if self._attributes is None:
+            self._attributes = decode_attributes_from_row(self._row, self.attr_cache)
+        return self._attributes
+
+    @attributes.setter
+    def attributes(self, value: dict[str, Any]) -> None:
+        """Set attributes."""
+        self._attributes = value
+
+    @property
+    def context(self) -> Context:
+        """State context."""
+        if self._context is None:
+            self._context = Context(id=None)
+        return self._context
+
+    @context.setter
+    def context(self, value: Context) -> None:
+        """Set context."""
+        self._context = value
+
+    @property
+    def last_changed(self) -> datetime:
+        """Last changed datetime."""
+        assert self._last_changed_ts is not None
+        return dt_util.utc_from_timestamp(self._last_changed_ts)
+
+    @last_changed.setter
+    def last_changed(self, value: datetime) -> None:
+        """Set last changed datetime."""
+        self._last_changed_ts = process_timestamp(value).timestamp()
+
+    @property
+    def last_updated(self) -> datetime:
+        """Last updated datetime."""
+        assert self._last_updated_ts is not None
+        return dt_util.utc_from_timestamp(self._last_updated_ts)
+
+    @last_updated.setter
+    def last_updated(self, value: datetime) -> None:
+        """Set last updated datetime."""
+        self._last_updated_ts = process_timestamp(value).timestamp()
+
+    def as_dict(self) -> dict[str, Any]:  # type: ignore[override]
+        """Return a dict representation of the LazyState.
+
+        Async friendly.
+        To be used for JSON serialization.
+        """
+        last_updated_isoformat = self.last_updated.isoformat()
+        if self._last_changed_ts == self._last_updated_ts:
+            last_changed_isoformat = last_updated_isoformat
+        else:
+            last_changed_isoformat = self.last_changed.isoformat()
+        return {
+            "entity_id": self.entity_id,
+            "state": self.state,
+            "attributes": self._attributes or self.attributes,
+            "last_changed": last_changed_isoformat,
+            "last_updated": last_updated_isoformat,
+        }
+
+
+def legacy_row_to_compressed_state(
+    row: Row,
+    attr_cache: dict[str, dict[str, Any]],
+    start_time: datetime | None,
+    entity_id: str | None = None,
+) -> dict[str, Any]:
+    """Convert a database row to a compressed state schema 31 and later."""
+    comp_state = {
+        COMPRESSED_STATE_STATE: row.state,
+        COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row(row, attr_cache),
+    }
+    if start_time:
+        comp_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp(start_time)
+    else:
+        row_last_updated_ts: float = row.last_updated_ts
+        comp_state[COMPRESSED_STATE_LAST_UPDATED] = row_last_updated_ts
+        if (
+            row_last_changed_ts := row.last_changed_ts
+        ) and row_last_updated_ts != row_last_changed_ts:
+            comp_state[COMPRESSED_STATE_LAST_CHANGED] = row_last_changed_ts
+    return comp_state
diff --git a/homeassistant/components/recorder/models/state.py b/homeassistant/components/recorder/models/state.py
index 5594f5f6d43784f9520eafe0c2f431da6a8cd33e..9d0d24c43fecbc64563044f01cc93a9f95796a72 100644
--- a/homeassistant/components/recorder/models/state.py
+++ b/homeassistant/components/recorder/models/state.py
@@ -51,20 +51,18 @@ class LazyState(State):
         self,
         row: Row,
         attr_cache: dict[str, dict[str, Any]],
-        start_time: datetime | None,
-        entity_id: str | None = None,
+        start_time_ts: float | None,
+        entity_id: str,
+        state: str,
+        last_updated_ts: float | None,
     ) -> None:
         """Init the lazy state."""
         self._row = row
-        self.entity_id = entity_id or self._row.entity_id
-        self.state = self._row.state or ""
+        self.entity_id = entity_id
+        self.state = state or ""
         self._attributes: dict[str, Any] | None = None
-        self._last_updated_ts: float | None = self._row.last_updated_ts or (
-            dt_util.utc_to_timestamp(start_time) if start_time else None
-        )
-        self._last_changed_ts: float | None = (
-            self._row.last_changed_ts or self._last_updated_ts
-        )
+        self._last_updated_ts: float | None = last_updated_ts or start_time_ts
+        self._last_changed_ts: float | None = None
         self._context: Context | None = None
         self.attr_cache = attr_cache
 
@@ -95,7 +93,10 @@ class LazyState(State):
     @property
     def last_changed(self) -> datetime:
         """Last changed datetime."""
-        assert self._last_changed_ts is not None
+        if self._last_changed_ts is None:
+            self._last_changed_ts = (
+                getattr(self._row, "last_changed_ts", None) or self._last_updated_ts
+            )
         return dt_util.utc_from_timestamp(self._last_changed_ts)
 
     @last_changed.setter
@@ -138,21 +139,22 @@ class LazyState(State):
 def row_to_compressed_state(
     row: Row,
     attr_cache: dict[str, dict[str, Any]],
-    start_time: datetime | None,
-    entity_id: str | None = None,
+    start_time_ts: float | None,
+    entity_id: str,
+    state: str,
+    last_updated_ts: float | None,
 ) -> dict[str, Any]:
     """Convert a database row to a compressed state schema 31 and later."""
-    comp_state = {
-        COMPRESSED_STATE_STATE: row.state,
+    comp_state: dict[str, Any] = {
+        COMPRESSED_STATE_STATE: state,
         COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row(row, attr_cache),
     }
-    if start_time:
-        comp_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp(start_time)
-    else:
-        row_last_updated_ts: float = row.last_updated_ts
-        comp_state[COMPRESSED_STATE_LAST_UPDATED] = row_last_updated_ts
-        if (
-            row_changed_changed_ts := row.last_changed_ts
-        ) and row_last_updated_ts != row_changed_changed_ts:
-            comp_state[COMPRESSED_STATE_LAST_CHANGED] = row_changed_changed_ts
+    row_last_updated_ts: float = last_updated_ts or start_time_ts  # type: ignore[assignment]
+    comp_state[COMPRESSED_STATE_LAST_UPDATED] = row_last_updated_ts
+    if (
+        (row_last_changed_ts := getattr(row, "last_changed_ts", None))
+        and row_last_changed_ts
+        and row_last_updated_ts != row_last_changed_ts
+    ):
+        comp_state[COMPRESSED_STATE_LAST_CHANGED] = row_last_changed_ts
     return comp_state
diff --git a/tests/components/history/test_init.py b/tests/components/history/test_init.py
index a499c5bb0be06f114791c13466265a699eb798d3..fa2332e71d7779cdac96bd5025bb19d28ed06688 100644
--- a/tests/components/history/test_init.py
+++ b/tests/components/history/test_init.py
@@ -130,13 +130,16 @@ def test_get_significant_states_with_initial(hass_history) -> None:
     """
     hass = hass_history
     zero, four, states = record_states(hass)
-    one = zero + timedelta(seconds=1)
     one_and_half = zero + timedelta(seconds=1.5)
     for entity_id in states:
         if entity_id == "media_player.test":
             states[entity_id] = states[entity_id][1:]
         for state in states[entity_id]:
-            if state.last_changed == one:
+            # If the state is recorded before the start time
+            # start it will have its last_updated and last_changed
+            # set to the start time.
+            if state.last_updated < one_and_half:
+                state.last_updated = one_and_half
                 state.last_changed = one_and_half
 
     hist = get_significant_states(
diff --git a/tests/components/history/test_init_db_schema_30.py b/tests/components/history/test_init_db_schema_30.py
index 893bcf946209166b05f4aa6607920cc3941f6ef3..92f3d61392c5b7aad955e0f1aee00a6dab0c39c3 100644
--- a/tests/components/history/test_init_db_schema_30.py
+++ b/tests/components/history/test_init_db_schema_30.py
@@ -4,17 +4,13 @@ from __future__ import annotations
 # pylint: disable=invalid-name
 from datetime import timedelta
 from http import HTTPStatus
-import importlib
 import json
-import sys
 from unittest.mock import patch, sentinel
 
 import pytest
-from sqlalchemy import create_engine
-from sqlalchemy.orm import Session
 
 from homeassistant.components import recorder
-from homeassistant.components.recorder import Recorder, core, statistics
+from homeassistant.components.recorder import Recorder
 from homeassistant.components.recorder.history import get_significant_states
 from homeassistant.components.recorder.models import process_timestamp
 from homeassistant.core import HomeAssistant
@@ -29,59 +25,16 @@ from tests.components.recorder.common import (
     assert_states_equal_without_context,
     async_recorder_block_till_done,
     async_wait_recording_done,
+    old_db_schema,
     wait_recording_done,
 )
 from tests.typing import ClientSessionGenerator, WebSocketGenerator
 
-CREATE_ENGINE_TARGET = "homeassistant.components.recorder.core.create_engine"
-SCHEMA_MODULE = "tests.components.recorder.db_schema_30"
-
-
-def _create_engine_test(*args, **kwargs):
-    """Test version of create_engine that initializes with old schema.
-
-    This simulates an existing db with the old schema.
-    """
-    importlib.import_module(SCHEMA_MODULE)
-    old_db_schema = sys.modules[SCHEMA_MODULE]
-    engine = create_engine(*args, **kwargs)
-    old_db_schema.Base.metadata.create_all(engine)
-    with Session(engine) as session:
-        session.add(
-            recorder.db_schema.StatisticsRuns(start=statistics.get_start_time())
-        )
-        session.add(
-            recorder.db_schema.SchemaChanges(
-                schema_version=old_db_schema.SCHEMA_VERSION
-            )
-        )
-        session.commit()
-    return engine
-
 
 @pytest.fixture(autouse=True)
 def db_schema_30():
-    """Fixture to initialize the db with the old schema."""
-    importlib.import_module(SCHEMA_MODULE)
-    old_db_schema = sys.modules[SCHEMA_MODULE]
-
-    with patch.object(recorder, "db_schema", old_db_schema), patch.object(
-        recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
-    ), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(
-        core, "EventTypes", old_db_schema.EventTypes
-    ), patch.object(
-        core, "EventData", old_db_schema.EventData
-    ), patch.object(
-        core, "States", old_db_schema.States
-    ), patch.object(
-        core, "Events", old_db_schema.Events
-    ), patch.object(
-        core, "StateAttributes", old_db_schema.StateAttributes
-    ), patch.object(
-        core, "EntityIDMigrationTask", core.RecorderTask
-    ), patch(
-        CREATE_ENGINE_TARGET, new=_create_engine_test
-    ):
+    """Fixture to initialize the db with the old schema 30."""
+    with old_db_schema("30"):
         yield
 
 
diff --git a/tests/components/history/test_websocket_api.py b/tests/components/history/test_websocket_api.py
index 8e353def1d15bc391a2675ebf1bfab5d658e33fd..fe202ef46bb79e74ad0a4f2a6983e65b6c2453d8 100644
--- a/tests/components/history/test_websocket_api.py
+++ b/tests/components/history/test_websocket_api.py
@@ -10,12 +10,7 @@ import pytest
 from homeassistant.components import history
 from homeassistant.components.history import websocket_api
 from homeassistant.components.recorder import Recorder
-from homeassistant.const import (
-    CONF_DOMAINS,
-    CONF_ENTITIES,
-    CONF_INCLUDE,
-    EVENT_HOMEASSISTANT_FINAL_WRITE,
-)
+from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE
 from homeassistant.core import HomeAssistant
 from homeassistant.setup import async_setup_component
 import homeassistant.util.dt as dt_util
@@ -1359,19 +1354,10 @@ async def test_history_stream_before_history_starts(
     include_start_time_state,
 ) -> None:
     """Test history stream before we have history."""
-    sort_order = ["sensor.two", "sensor.four", "sensor.one"]
     await async_setup_component(
         hass,
         "history",
-        {
-            history.DOMAIN: {
-                history.CONF_ORDER: True,
-                CONF_INCLUDE: {
-                    CONF_ENTITIES: sort_order,
-                    CONF_DOMAINS: ["sensor"],
-                },
-            }
-        },
+        {},
     )
     await async_setup_component(hass, "sensor", {})
     await async_recorder_block_till_done(hass)
@@ -1416,19 +1402,10 @@ async def test_history_stream_for_entity_with_no_possible_changes(
     recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
 ) -> None:
     """Test history stream for future with no possible changes where end time is less than or equal to now."""
-    sort_order = ["sensor.two", "sensor.four", "sensor.one"]
     await async_setup_component(
         hass,
         "history",
-        {
-            history.DOMAIN: {
-                history.CONF_ORDER: True,
-                CONF_INCLUDE: {
-                    CONF_ENTITIES: sort_order,
-                    CONF_DOMAINS: ["sensor"],
-                },
-            }
-        },
+        {},
     )
     await async_setup_component(hass, "sensor", {})
     await async_recorder_block_till_done(hass)
diff --git a/tests/components/history/test_websocket_api_schema_32.py b/tests/components/history/test_websocket_api_schema_32.py
new file mode 100644
index 0000000000000000000000000000000000000000..aebf5aa7ac233268d8183ed095c7011399f4217b
--- /dev/null
+++ b/tests/components/history/test_websocket_api_schema_32.py
@@ -0,0 +1,161 @@
+"""The tests the History component websocket_api."""
+# pylint: disable=protected-access,invalid-name
+
+import pytest
+
+from homeassistant.components import recorder
+from homeassistant.components.recorder import Recorder
+from homeassistant.core import HomeAssistant
+from homeassistant.setup import async_setup_component
+import homeassistant.util.dt as dt_util
+
+from tests.components.recorder.common import (
+    async_recorder_block_till_done,
+    async_wait_recording_done,
+    old_db_schema,
+)
+from tests.typing import WebSocketGenerator
+
+
+@pytest.fixture(autouse=True)
+def db_schema_32():
+    """Fixture to initialize the db with the old schema 32."""
+    with old_db_schema("32"):
+        yield
+
+
+async def test_history_during_period(
+    recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
+) -> None:
+    """Test history_during_period."""
+    now = dt_util.utcnow()
+
+    await async_setup_component(hass, "history", {})
+    await async_setup_component(hass, "sensor", {})
+    await async_recorder_block_till_done(hass)
+    recorder.get_instance(hass).states_meta_manager.active = False
+    assert recorder.get_instance(hass).schema_version == 32
+
+    hass.states.async_set("sensor.test", "on", attributes={"any": "attr"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "off", attributes={"any": "attr"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "off", attributes={"any": "changed"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "off", attributes={"any": "again"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "on", attributes={"any": "attr"})
+    await async_wait_recording_done(hass)
+
+    await async_wait_recording_done(hass)
+
+    client = await hass_ws_client()
+    await client.send_json(
+        {
+            "id": 1,
+            "type": "history/history_during_period",
+            "start_time": now.isoformat(),
+            "end_time": now.isoformat(),
+            "entity_ids": ["sensor.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": False,
+            "no_attributes": True,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["result"] == {}
+
+    await client.send_json(
+        {
+            "id": 2,
+            "type": "history/history_during_period",
+            "start_time": now.isoformat(),
+            "entity_ids": ["sensor.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": False,
+            "no_attributes": True,
+            "minimal_response": True,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["id"] == 2
+
+    sensor_test_history = response["result"]["sensor.test"]
+    assert len(sensor_test_history) == 3
+
+    assert sensor_test_history[0]["s"] == "on"
+    assert sensor_test_history[0]["a"] == {}
+    assert isinstance(sensor_test_history[0]["lu"], float)
+    assert "lc" not in sensor_test_history[0]  # skipped if the same a last_updated (lu)
+
+    assert "a" not in sensor_test_history[1]
+    assert sensor_test_history[1]["s"] == "off"
+    assert isinstance(sensor_test_history[1]["lu"], float)
+    assert "lc" not in sensor_test_history[1]  # skipped if the same a last_updated (lu)
+
+    assert sensor_test_history[2]["s"] == "on"
+    assert "a" not in sensor_test_history[2]
+
+    await client.send_json(
+        {
+            "id": 3,
+            "type": "history/history_during_period",
+            "start_time": now.isoformat(),
+            "entity_ids": ["sensor.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": False,
+            "no_attributes": False,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["id"] == 3
+    sensor_test_history = response["result"]["sensor.test"]
+
+    assert len(sensor_test_history) == 5
+
+    assert sensor_test_history[0]["s"] == "on"
+    assert sensor_test_history[0]["a"] == {"any": "attr"}
+    assert isinstance(sensor_test_history[0]["lu"], float)
+    assert "lc" not in sensor_test_history[0]  # skipped if the same a last_updated (lu)
+
+    assert sensor_test_history[1]["s"] == "off"
+    assert isinstance(sensor_test_history[1]["lu"], float)
+    assert "lc" not in sensor_test_history[1]  # skipped if the same a last_updated (lu)
+    assert sensor_test_history[1]["a"] == {"any": "attr"}
+
+    assert sensor_test_history[4]["s"] == "on"
+    assert sensor_test_history[4]["a"] == {"any": "attr"}
+
+    await client.send_json(
+        {
+            "id": 4,
+            "type": "history/history_during_period",
+            "start_time": now.isoformat(),
+            "entity_ids": ["sensor.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": True,
+            "no_attributes": False,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["id"] == 4
+    sensor_test_history = response["result"]["sensor.test"]
+
+    assert len(sensor_test_history) == 3
+
+    assert sensor_test_history[0]["s"] == "on"
+    assert sensor_test_history[0]["a"] == {"any": "attr"}
+    assert isinstance(sensor_test_history[0]["lu"], float)
+    assert "lc" not in sensor_test_history[0]  # skipped if the same a last_updated (lu)
+
+    assert sensor_test_history[1]["s"] == "off"
+    assert isinstance(sensor_test_history[1]["lu"], float)
+    assert "lc" not in sensor_test_history[1]  # skipped if the same a last_updated (lu)
+    assert sensor_test_history[1]["a"] == {"any": "attr"}
+
+    assert sensor_test_history[2]["s"] == "on"
+    assert sensor_test_history[2]["a"] == {"any": "attr"}
diff --git a/tests/components/history_stats/test_sensor.py b/tests/components/history_stats/test_sensor.py
index 4d705fefcc4a40a93bdc4dbb7840a88a0fcc5372..f98cf08b2c481d3127625a455324d4f293975754 100644
--- a/tests/components/history_stats/test_sensor.py
+++ b/tests/components/history_stats/test_sensor.py
@@ -20,6 +20,8 @@ from homeassistant.setup import async_setup_component
 import homeassistant.util.dt as dt_util
 
 from tests.common import async_fire_time_changed, get_fixture_path
+from tests.components.recorder.common import async_wait_recording_done
+from tests.typing import RecorderInstanceGenerator
 
 
 async def test_setup(recorder_mock: Recorder, hass: HomeAssistant) -> None:
@@ -1367,13 +1369,20 @@ async def test_measure_cet(recorder_mock: Recorder, hass: HomeAssistant) -> None
 
 @pytest.mark.parametrize("time_zone", ["Europe/Berlin", "America/Chicago", "US/Hawaii"])
 async def test_end_time_with_microseconds_zeroed(
-    time_zone, recorder_mock: Recorder, hass: HomeAssistant
+    time_zone: str,
+    async_setup_recorder_instance: RecorderInstanceGenerator,
+    hass: HomeAssistant,
 ) -> None:
     """Test the history statistics sensor that has the end time microseconds zeroed out."""
     hass.config.set_time_zone(time_zone)
     start_of_today = dt_util.now().replace(
         day=9, month=7, year=1986, hour=0, minute=0, second=0, microsecond=0
     )
+    with freeze_time(start_of_today):
+        await async_setup_recorder_instance(hass)
+        await hass.async_block_till_done()
+        await async_wait_recording_done(hass)
+
     start_time = start_of_today + timedelta(minutes=60)
     t0 = start_time + timedelta(minutes=20)
     t1 = t0 + timedelta(minutes=10)
@@ -1434,7 +1443,7 @@ async def test_end_time_with_microseconds_zeroed(
         await hass.async_block_till_done()
         assert hass.states.get("sensor.heatpump_compressor_today").state == "1.83"
         hass.states.async_set("binary_sensor.heatpump_compressor_state", "on")
-        await hass.async_block_till_done()
+        await async_wait_recording_done(hass)
     time_600 = start_of_today + timedelta(hours=6)
     with freeze_time(time_600):
         async_fire_time_changed(hass, time_600)
@@ -1473,6 +1482,7 @@ async def test_end_time_with_microseconds_zeroed(
     )
     with freeze_time(rolled_to_next_day_plus_16_860000):
         hass.states.async_set("binary_sensor.heatpump_compressor_state", "off")
+        await async_wait_recording_done(hass)
         async_fire_time_changed(hass, rolled_to_next_day_plus_16_860000)
         await hass.async_block_till_done()
 
diff --git a/tests/components/recorder/test_history.py b/tests/components/recorder/test_history.py
index ab4de11189ae60b59d467f11ffd38deebaf8c3c3..ec781dbfc9c7623ba87bc0180807a7b779738f36 100644
--- a/tests/components/recorder/test_history.py
+++ b/tests/components/recorder/test_history.py
@@ -24,7 +24,7 @@ from homeassistant.components.recorder.db_schema import (
 from homeassistant.components.recorder.filters import Filters
 from homeassistant.components.recorder.history import legacy
 from homeassistant.components.recorder.models import LazyState, process_timestamp
-from homeassistant.components.recorder.models.legacy import LazyStatePreSchema31
+from homeassistant.components.recorder.models.legacy import LegacyLazyStatePreSchema31
 from homeassistant.components.recorder.util import session_scope
 import homeassistant.core as ha
 from homeassistant.core import HomeAssistant, State
@@ -41,7 +41,6 @@ from .common import (
     wait_recording_done,
 )
 
-from tests.common import mock_state_change_event
 from tests.typing import RecorderInstanceGenerator
 
 
@@ -55,14 +54,20 @@ async def _async_get_states(
     """Get states from the database."""
 
     def _get_states_with_session():
-        if get_instance(hass).schema_version < 31:
-            klass = LazyStatePreSchema31
-        else:
-            klass = LazyState
         with session_scope(hass=hass, read_only=True) as session:
             attr_cache = {}
+            pre_31_schema = get_instance(hass).schema_version < 31
             return [
-                klass(row, attr_cache, None)
+                LegacyLazyStatePreSchema31(row, attr_cache, None)
+                if pre_31_schema
+                else LazyState(
+                    row,
+                    attr_cache,
+                    None,
+                    row.entity_id,
+                    row.state,
+                    getattr(row, "last_updated_ts", None),
+                )
                 for row in legacy._get_rows_with_session(
                     hass,
                     session,
@@ -112,44 +117,6 @@ def _add_db_entries(
             )
 
 
-def _setup_get_states(hass):
-    """Set up for testing get_states."""
-    states = []
-    now = dt_util.utcnow()
-    with patch(
-        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=now
-    ):
-        for i in range(5):
-            state = ha.State(
-                f"test.point_in_time_{i % 5}",
-                f"State {i}",
-                {"attribute_test": i},
-            )
-
-            mock_state_change_event(hass, state)
-
-            states.append(state)
-
-        wait_recording_done(hass)
-
-    future = now + timedelta(seconds=1)
-    with patch(
-        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=future
-    ):
-        for i in range(5):
-            state = ha.State(
-                f"test.point_in_time_{i % 5}",
-                f"State {i}",
-                {"attribute_test": i},
-            )
-
-            mock_state_change_event(hass, state)
-
-        wait_recording_done(hass)
-
-    return now, future, states
-
-
 def test_get_full_significant_states_with_session_entity_no_matches(
     hass_recorder: Callable[..., HomeAssistant]
 ) -> None:
@@ -297,12 +264,12 @@ def test_state_changes_during_period_descending(
         wait_recording_done(hass)
         return hass.states.get(entity_id)
 
-    start = dt_util.utcnow()
+    start = dt_util.utcnow().replace(microsecond=0)
     point = start + timedelta(seconds=1)
-    point2 = start + timedelta(seconds=1, microseconds=2)
-    point3 = start + timedelta(seconds=1, microseconds=3)
-    point4 = start + timedelta(seconds=1, microseconds=4)
-    end = point + timedelta(seconds=1)
+    point2 = start + timedelta(seconds=1, microseconds=100)
+    point3 = start + timedelta(seconds=1, microseconds=200)
+    point4 = start + timedelta(seconds=1, microseconds=300)
+    end = point + timedelta(seconds=1, microseconds=400)
 
     with patch(
         "homeassistant.components.recorder.core.dt_util.utcnow", return_value=start
@@ -336,6 +303,7 @@ def test_state_changes_during_period_descending(
     hist = history.state_changes_during_period(
         hass, start, end, entity_id, no_attributes=False, descending=False
     )
+
     assert_multiple_states_equal_without_context(states, hist[entity_id])
 
     hist = history.state_changes_during_period(
@@ -345,6 +313,57 @@ def test_state_changes_during_period_descending(
         states, list(reversed(list(hist[entity_id])))
     )
 
+    start_time = point2 + timedelta(microseconds=10)
+    hist = history.state_changes_during_period(
+        hass,
+        start_time,  # Pick a point where we will generate a start time state
+        end,
+        entity_id,
+        no_attributes=False,
+        descending=True,
+        include_start_time_state=True,
+    )
+    hist_states = list(hist[entity_id])
+    assert hist_states[-1].last_updated == start_time
+    assert hist_states[-1].last_changed == start_time
+    assert len(hist_states) == 3
+    # Make sure they are in descending order
+    assert (
+        hist_states[0].last_updated
+        > hist_states[1].last_updated
+        > hist_states[2].last_updated
+    )
+    assert (
+        hist_states[0].last_changed
+        > hist_states[1].last_changed
+        > hist_states[2].last_changed
+    )
+
+    hist = history.state_changes_during_period(
+        hass,
+        start_time,  # Pick a point where we will generate a start time state
+        end,
+        entity_id,
+        no_attributes=False,
+        descending=False,
+        include_start_time_state=True,
+    )
+    hist_states = list(hist[entity_id])
+    assert hist_states[0].last_updated == start_time
+    assert hist_states[0].last_changed == start_time
+    assert len(hist_states) == 3
+    # Make sure they are in ascending order
+    assert (
+        hist_states[0].last_updated
+        < hist_states[1].last_updated
+        < hist_states[2].last_updated
+    )
+    assert (
+        hist_states[0].last_changed
+        < hist_states[1].last_changed
+        < hist_states[2].last_changed
+    )
+
 
 def test_get_last_state_changes(hass_recorder: Callable[..., HomeAssistant]) -> None:
     """Test number of state changes."""
@@ -548,13 +567,16 @@ def test_get_significant_states_with_initial(
     hass = hass_recorder()
     hass.config.set_time_zone(time_zone)
     zero, four, states = record_states(hass)
-    one = zero + timedelta(seconds=1)
     one_and_half = zero + timedelta(seconds=1.5)
     for entity_id in states:
         if entity_id == "media_player.test":
             states[entity_id] = states[entity_id][1:]
         for state in states[entity_id]:
-            if state.last_changed == one:
+            # If the state is recorded before the start time
+            # start it will have its last_updated and last_changed
+            # set to the start time.
+            if state.last_updated < one_and_half:
+                state.last_updated = one_and_half
                 state.last_changed = one_and_half
 
     hist = history.get_significant_states(
diff --git a/tests/components/recorder/test_models.py b/tests/components/recorder/test_models.py
index c5033481f23f258a19a96173f7d16390b5b8befe..9dc2d53125f7fbb134e042515c32fe442174ddfa 100644
--- a/tests/components/recorder/test_models.py
+++ b/tests/components/recorder/test_models.py
@@ -269,7 +269,7 @@ async def test_lazy_state_handles_include_json(
         entity_id="sensor.invalid",
         shared_attrs="{INVALID_JSON}",
     )
-    assert LazyState(row, {}, None).attributes == {}
+    assert LazyState(row, {}, None, row.entity_id, "", 1).attributes == {}
     assert "Error converting row to state attributes" in caplog.text
 
 
@@ -282,7 +282,7 @@ async def test_lazy_state_prefers_shared_attrs_over_attrs(
         shared_attrs='{"shared":true}',
         attributes='{"shared":false}',
     )
-    assert LazyState(row, {}, None).attributes == {"shared": True}
+    assert LazyState(row, {}, None, row.entity_id, "", 1).attributes == {"shared": True}
 
 
 async def test_lazy_state_handles_different_last_updated_and_last_changed(
@@ -297,7 +297,7 @@ async def test_lazy_state_handles_different_last_updated_and_last_changed(
         last_updated_ts=now.timestamp(),
         last_changed_ts=(now - timedelta(seconds=60)).timestamp(),
     )
-    lstate = LazyState(row, {}, None)
+    lstate = LazyState(row, {}, None, row.entity_id, row.state, row.last_updated_ts)
     assert lstate.as_dict() == {
         "attributes": {"shared": True},
         "entity_id": "sensor.valid",
@@ -328,7 +328,7 @@ async def test_lazy_state_handles_same_last_updated_and_last_changed(
         last_updated_ts=now.timestamp(),
         last_changed_ts=now.timestamp(),
     )
-    lstate = LazyState(row, {}, None)
+    lstate = LazyState(row, {}, None, row.entity_id, row.state, row.last_updated_ts)
     assert lstate.as_dict() == {
         "attributes": {"shared": True},
         "entity_id": "sensor.valid",
diff --git a/tests/components/recorder/test_models_legacy.py b/tests/components/recorder/test_models_legacy.py
new file mode 100644
index 0000000000000000000000000000000000000000..f830ac535443cb4c7c9cbb61cce41c903e91b0ee
--- /dev/null
+++ b/tests/components/recorder/test_models_legacy.py
@@ -0,0 +1,98 @@
+"""The tests for the Recorder component legacy models."""
+from datetime import datetime, timedelta
+from unittest.mock import PropertyMock
+
+import pytest
+
+from homeassistant.components.recorder.models.legacy import LegacyLazyState
+from homeassistant.util import dt as dt_util
+
+
+async def test_legacy_lazy_state_prefers_shared_attrs_over_attrs(
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test that the LazyState prefers shared_attrs over attributes."""
+    row = PropertyMock(
+        entity_id="sensor.invalid",
+        shared_attrs='{"shared":true}',
+        attributes='{"shared":false}',
+    )
+    assert LegacyLazyState(row, {}, None).attributes == {"shared": True}
+
+
+async def test_legacy_lazy_state_handles_different_last_updated_and_last_changed(
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test that the LazyState handles different last_updated and last_changed."""
+    now = datetime(2021, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
+    row = PropertyMock(
+        entity_id="sensor.valid",
+        state="off",
+        shared_attrs='{"shared":true}',
+        last_updated_ts=now.timestamp(),
+        last_changed_ts=(now - timedelta(seconds=60)).timestamp(),
+    )
+    lstate = LegacyLazyState(row, {}, None)
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:03:01.000323+00:00",
+        "last_updated": "2021-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+    assert lstate.last_updated.timestamp() == row.last_updated_ts
+    assert lstate.last_changed.timestamp() == row.last_changed_ts
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:03:01.000323+00:00",
+        "last_updated": "2021-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+
+
+async def test_legacy_lazy_state_handles_same_last_updated_and_last_changed(
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test that the LazyState handles same last_updated and last_changed."""
+    now = datetime(2021, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
+    row = PropertyMock(
+        entity_id="sensor.valid",
+        state="off",
+        shared_attrs='{"shared":true}',
+        last_updated_ts=now.timestamp(),
+        last_changed_ts=now.timestamp(),
+    )
+    lstate = LegacyLazyState(row, {}, None)
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:04:01.000323+00:00",
+        "last_updated": "2021-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+    assert lstate.last_updated.timestamp() == row.last_updated_ts
+    assert lstate.last_changed.timestamp() == row.last_changed_ts
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:04:01.000323+00:00",
+        "last_updated": "2021-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+    lstate.last_updated = datetime(2020, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:04:01.000323+00:00",
+        "last_updated": "2020-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+    lstate.last_changed = datetime(2020, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2020-06-12T03:04:01.000323+00:00",
+        "last_updated": "2020-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py
index b78b0a0aaee2030a3202ed4aee68a199275e0b61..f71ca773daadbdf7b00e498e0c29481e9b638ff3 100644
--- a/tests/components/recorder/test_util.py
+++ b/tests/components/recorder/test_util.py
@@ -7,7 +7,7 @@ import sqlite3
 from unittest.mock import MagicMock, Mock, patch
 
 import pytest
-from sqlalchemy import text
+from sqlalchemy import lambda_stmt, text
 from sqlalchemy.engine.result import ChunkedIteratorResult
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.sql.elements import TextClause
@@ -18,7 +18,7 @@ from homeassistant.components.recorder import util
 from homeassistant.components.recorder.const import DOMAIN, SQLITE_URL_PREFIX
 from homeassistant.components.recorder.db_schema import RecorderRuns
 from homeassistant.components.recorder.history.modern import (
-    _get_single_entity_states_stmt,
+    _get_single_entity_start_time_stmt,
 )
 from homeassistant.components.recorder.models import (
     UnsupportedDialect,
@@ -908,7 +908,12 @@ def test_execute_stmt_lambda_element(
     with session_scope(hass=hass) as session:
         # No time window, we always get a list
         metadata_id = instance.states_meta_manager.get("sensor.on", session, True)
-        stmt = _get_single_entity_states_stmt(dt_util.utcnow(), metadata_id, False)
+        start_time_ts = dt_util.utcnow().timestamp()
+        stmt = lambda_stmt(
+            lambda: _get_single_entity_start_time_stmt(
+                start_time_ts, metadata_id, False, False
+            )
+        )
         rows = util.execute_stmt_lambda_element(session, stmt)
         assert isinstance(rows, list)
         assert rows[0].state == new_state.state