From c7f6ab2457a9e6f794ae89951c7d787b048bcb8e Mon Sep 17 00:00:00 2001
From: Jan Bouwhuis <jbouwh@users.noreply.github.com>
Date: Wed, 20 Dec 2023 07:49:49 +0100
Subject: [PATCH] Add MQTT valve platform (#105766)

* Add mqtt valve platform

* No stop topic-reports_position and validation

* Do not allow state_open, state_closed with position reporing valve

* Allow open/close feature to be disabled

* Follow up comments

* Rename

* Apply defaults in validator

* Update docstr
---
 .../components/mqtt/abbreviations.py          |    1 +
 .../components/mqtt/config_integration.py     |    1 +
 homeassistant/components/mqtt/const.py        |   16 +
 homeassistant/components/mqtt/cover.py        |   32 +-
 homeassistant/components/mqtt/discovery.py    |    1 +
 homeassistant/components/mqtt/valve.py        |  420 +++++
 tests/components/mqtt/test_valve.py           | 1399 +++++++++++++++++
 7 files changed, 1855 insertions(+), 15 deletions(-)
 create mode 100644 homeassistant/components/mqtt/valve.py
 create mode 100644 tests/components/mqtt/test_valve.py

diff --git a/homeassistant/components/mqtt/abbreviations.py b/homeassistant/components/mqtt/abbreviations.py
index eb9ab56208e..64d8c27f1de 100644
--- a/homeassistant/components/mqtt/abbreviations.py
+++ b/homeassistant/components/mqtt/abbreviations.py
@@ -166,6 +166,7 @@ ABBREVIATIONS = {
     "pl_ton": "payload_turn_on",
     "pl_trig": "payload_trigger",
     "pl_unlk": "payload_unlock",
+    "pos": "reports_position",
     "pos_clsd": "position_closed",
     "pos_open": "position_open",
     "pow_cmd_t": "power_command_topic",
diff --git a/homeassistant/components/mqtt/config_integration.py b/homeassistant/components/mqtt/config_integration.py
index 71260dc0239..0f2d617930d 100644
--- a/homeassistant/components/mqtt/config_integration.py
+++ b/homeassistant/components/mqtt/config_integration.py
@@ -53,6 +53,7 @@ CONFIG_SCHEMA_BASE = vol.Schema(
         Platform.TEXT.value: vol.All(cv.ensure_list, [dict]),
         Platform.UPDATE.value: vol.All(cv.ensure_list, [dict]),
         Platform.VACUUM.value: vol.All(cv.ensure_list, [dict]),
+        Platform.VALVE.value: vol.All(cv.ensure_list, [dict]),
         Platform.WATER_HEATER.value: vol.All(cv.ensure_list, [dict]),
     }
 )
diff --git a/homeassistant/components/mqtt/const.py b/homeassistant/components/mqtt/const.py
index 685e45700b5..50ea3860d9e 100644
--- a/homeassistant/components/mqtt/const.py
+++ b/homeassistant/components/mqtt/const.py
@@ -42,9 +42,18 @@ CONF_MODE_COMMAND_TOPIC = "mode_command_topic"
 CONF_MODE_LIST = "modes"
 CONF_MODE_STATE_TEMPLATE = "mode_state_template"
 CONF_MODE_STATE_TOPIC = "mode_state_topic"
+CONF_PAYLOAD_CLOSE = "payload_close"
+CONF_PAYLOAD_OPEN = "payload_open"
+CONF_PAYLOAD_STOP = "payload_stop"
+CONF_POSITION_CLOSED = "position_closed"
+CONF_POSITION_OPEN = "position_open"
 CONF_POWER_COMMAND_TOPIC = "power_command_topic"
 CONF_POWER_COMMAND_TEMPLATE = "power_command_template"
 CONF_PRECISION = "precision"
+CONF_STATE_CLOSED = "state_closed"
+CONF_STATE_CLOSING = "state_closing"
+CONF_STATE_OPEN = "state_open"
+CONF_STATE_OPENING = "state_opening"
 CONF_TEMP_COMMAND_TEMPLATE = "temperature_command_template"
 CONF_TEMP_COMMAND_TOPIC = "temperature_command_topic"
 CONF_TEMP_STATE_TEMPLATE = "temperature_state_template"
@@ -81,11 +90,16 @@ DEFAULT_ENCODING = "utf-8"
 DEFAULT_OPTIMISTIC = False
 DEFAULT_QOS = 0
 DEFAULT_PAYLOAD_AVAILABLE = "online"
+DEFAULT_PAYLOAD_CLOSE = "CLOSE"
 DEFAULT_PAYLOAD_NOT_AVAILABLE = "offline"
+DEFAULT_PAYLOAD_OPEN = "OPEN"
 DEFAULT_PORT = 1883
 DEFAULT_RETAIN = False
 DEFAULT_WS_HEADERS: dict[str, str] = {}
 DEFAULT_WS_PATH = "/"
+DEFAULT_POSITION_CLOSED = 0
+DEFAULT_POSITION_OPEN = 100
+DEFAULT_RETAIN = False
 
 PROTOCOL_31 = "3.1"
 PROTOCOL_311 = "3.1.1"
@@ -146,6 +160,7 @@ PLATFORMS = [
     Platform.TEXT,
     Platform.UPDATE,
     Platform.VACUUM,
+    Platform.VALVE,
     Platform.WATER_HEATER,
 ]
 
@@ -173,5 +188,6 @@ RELOADABLE_PLATFORMS = [
     Platform.TEXT,
     Platform.UPDATE,
     Platform.VACUUM,
+    Platform.VALVE,
     Platform.WATER_HEATER,
 ]
diff --git a/homeassistant/components/mqtt/cover.py b/homeassistant/components/mqtt/cover.py
index 4e8cf0f4129..912de7e367b 100644
--- a/homeassistant/components/mqtt/cover.py
+++ b/homeassistant/components/mqtt/cover.py
@@ -38,10 +38,24 @@ from .config import MQTT_BASE_SCHEMA
 from .const import (
     CONF_COMMAND_TOPIC,
     CONF_ENCODING,
+    CONF_PAYLOAD_CLOSE,
+    CONF_PAYLOAD_OPEN,
+    CONF_PAYLOAD_STOP,
+    CONF_POSITION_CLOSED,
+    CONF_POSITION_OPEN,
     CONF_QOS,
     CONF_RETAIN,
+    CONF_STATE_CLOSED,
+    CONF_STATE_CLOSING,
+    CONF_STATE_OPEN,
+    CONF_STATE_OPENING,
     CONF_STATE_TOPIC,
     DEFAULT_OPTIMISTIC,
+    DEFAULT_PAYLOAD_CLOSE,
+    DEFAULT_PAYLOAD_OPEN,
+    DEFAULT_POSITION_CLOSED,
+    DEFAULT_POSITION_OPEN,
+    DEFAULT_RETAIN,
 )
 from .debug_info import log_messages
 from .mixins import (
@@ -64,15 +78,6 @@ CONF_TILT_COMMAND_TEMPLATE = "tilt_command_template"
 CONF_TILT_STATUS_TOPIC = "tilt_status_topic"
 CONF_TILT_STATUS_TEMPLATE = "tilt_status_template"
 
-CONF_PAYLOAD_CLOSE = "payload_close"
-CONF_PAYLOAD_OPEN = "payload_open"
-CONF_PAYLOAD_STOP = "payload_stop"
-CONF_POSITION_CLOSED = "position_closed"
-CONF_POSITION_OPEN = "position_open"
-CONF_STATE_CLOSED = "state_closed"
-CONF_STATE_CLOSING = "state_closing"
-CONF_STATE_OPEN = "state_open"
-CONF_STATE_OPENING = "state_opening"
 CONF_STATE_STOPPED = "state_stopped"
 CONF_TILT_CLOSED_POSITION = "tilt_closed_value"
 CONF_TILT_MAX = "tilt_max"
@@ -84,13 +89,10 @@ TILT_PAYLOAD = "tilt"
 COVER_PAYLOAD = "cover"
 
 DEFAULT_NAME = "MQTT Cover"
-DEFAULT_PAYLOAD_CLOSE = "CLOSE"
-DEFAULT_PAYLOAD_OPEN = "OPEN"
-DEFAULT_PAYLOAD_STOP = "STOP"
-DEFAULT_POSITION_CLOSED = 0
-DEFAULT_POSITION_OPEN = 100
-DEFAULT_RETAIN = False
+
 DEFAULT_STATE_STOPPED = "stopped"
+DEFAULT_PAYLOAD_STOP = "STOP"
+
 DEFAULT_TILT_CLOSED_POSITION = 0
 DEFAULT_TILT_MAX = 100
 DEFAULT_TILT_MIN = 0
diff --git a/homeassistant/components/mqtt/discovery.py b/homeassistant/components/mqtt/discovery.py
index c78319bb46a..84163e217df 100644
--- a/homeassistant/components/mqtt/discovery.py
+++ b/homeassistant/components/mqtt/discovery.py
@@ -74,6 +74,7 @@ SUPPORTED_COMPONENTS = {
     "text",
     "update",
     "vacuum",
+    "valve",
     "water_heater",
 }
 
diff --git a/homeassistant/components/mqtt/valve.py b/homeassistant/components/mqtt/valve.py
new file mode 100644
index 00000000000..2c1618c60ba
--- /dev/null
+++ b/homeassistant/components/mqtt/valve.py
@@ -0,0 +1,420 @@
+"""Support for MQTT valve devices."""
+from __future__ import annotations
+
+from contextlib import suppress
+import logging
+from typing import Any
+
+import voluptuous as vol
+
+from homeassistant.components import valve
+from homeassistant.components.valve import (
+    DEVICE_CLASSES_SCHEMA,
+    ValveEntity,
+    ValveEntityFeature,
+)
+from homeassistant.config_entries import ConfigEntry
+from homeassistant.const import (
+    CONF_DEVICE_CLASS,
+    CONF_NAME,
+    CONF_OPTIMISTIC,
+    CONF_VALUE_TEMPLATE,
+    STATE_CLOSED,
+    STATE_CLOSING,
+    STATE_OPEN,
+    STATE_OPENING,
+)
+from homeassistant.core import HomeAssistant, callback
+import homeassistant.helpers.config_validation as cv
+from homeassistant.helpers.entity_platform import AddEntitiesCallback
+from homeassistant.helpers.typing import ConfigType
+from homeassistant.util.json import JSON_DECODE_EXCEPTIONS, json_loads
+from homeassistant.util.percentage import (
+    percentage_to_ranged_value,
+    ranged_value_to_percentage,
+)
+
+from . import subscription
+from .config import MQTT_BASE_SCHEMA
+from .const import (
+    CONF_COMMAND_TEMPLATE,
+    CONF_COMMAND_TOPIC,
+    CONF_ENCODING,
+    CONF_PAYLOAD_CLOSE,
+    CONF_PAYLOAD_OPEN,
+    CONF_PAYLOAD_STOP,
+    CONF_POSITION_CLOSED,
+    CONF_POSITION_OPEN,
+    CONF_QOS,
+    CONF_RETAIN,
+    CONF_STATE_CLOSED,
+    CONF_STATE_CLOSING,
+    CONF_STATE_OPEN,
+    CONF_STATE_OPENING,
+    CONF_STATE_TOPIC,
+    DEFAULT_OPTIMISTIC,
+    DEFAULT_PAYLOAD_CLOSE,
+    DEFAULT_PAYLOAD_OPEN,
+    DEFAULT_POSITION_CLOSED,
+    DEFAULT_POSITION_OPEN,
+    DEFAULT_RETAIN,
+)
+from .debug_info import log_messages
+from .mixins import (
+    MQTT_ENTITY_COMMON_SCHEMA,
+    MqttEntity,
+    async_setup_entity_entry_helper,
+    write_state_on_attr_change,
+)
+from .models import (
+    MqttCommandTemplate,
+    MqttValueTemplate,
+    ReceiveMessage,
+    ReceivePayloadType,
+)
+from .util import valid_publish_topic, valid_subscribe_topic
+
+_LOGGER = logging.getLogger(__name__)
+
+CONF_REPORTS_POSITION = "reports_position"
+
+DEFAULT_NAME = "MQTT Valve"
+
+MQTT_VALVE_ATTRIBUTES_BLOCKED = frozenset(
+    {
+        valve.ATTR_CURRENT_POSITION,
+    }
+)
+
+NO_POSITION_KEYS = (
+    CONF_PAYLOAD_CLOSE,
+    CONF_PAYLOAD_OPEN,
+    CONF_STATE_CLOSED,
+    CONF_STATE_OPEN,
+)
+
+DEFAULTS = {
+    CONF_PAYLOAD_CLOSE: DEFAULT_PAYLOAD_CLOSE,
+    CONF_PAYLOAD_OPEN: DEFAULT_PAYLOAD_OPEN,
+    CONF_STATE_OPEN: STATE_OPEN,
+    CONF_STATE_CLOSED: STATE_CLOSED,
+}
+
+
+def _validate_and_add_defaults(config: ConfigType) -> ConfigType:
+    """Validate config options and set defaults."""
+    if config[CONF_REPORTS_POSITION] and any(key in config for key in NO_POSITION_KEYS):
+        raise vol.Invalid(
+            "Options `payload_open`, `payload_close`, `state_open` and "
+            "`state_closed` are not allowed if the valve reports a position."
+        )
+    return {**DEFAULTS, **config}
+
+
+_PLATFORM_SCHEMA_BASE = MQTT_BASE_SCHEMA.extend(
+    {
+        vol.Optional(CONF_COMMAND_TOPIC): valid_publish_topic,
+        vol.Optional(CONF_COMMAND_TEMPLATE): cv.template,
+        vol.Optional(CONF_DEVICE_CLASS): vol.Any(DEVICE_CLASSES_SCHEMA, None),
+        vol.Optional(CONF_NAME): vol.Any(cv.string, None),
+        vol.Optional(CONF_OPTIMISTIC, default=DEFAULT_OPTIMISTIC): cv.boolean,
+        vol.Optional(CONF_PAYLOAD_CLOSE): vol.Any(cv.string, None),
+        vol.Optional(CONF_PAYLOAD_OPEN): vol.Any(cv.string, None),
+        vol.Optional(CONF_PAYLOAD_STOP): vol.Any(cv.string, None),
+        vol.Optional(CONF_POSITION_CLOSED, default=DEFAULT_POSITION_CLOSED): int,
+        vol.Optional(CONF_POSITION_OPEN, default=DEFAULT_POSITION_OPEN): int,
+        vol.Optional(CONF_REPORTS_POSITION, default=False): cv.boolean,
+        vol.Optional(CONF_RETAIN, default=DEFAULT_RETAIN): cv.boolean,
+        vol.Optional(CONF_STATE_CLOSED): cv.string,
+        vol.Optional(CONF_STATE_CLOSING, default=STATE_CLOSING): cv.string,
+        vol.Optional(CONF_STATE_OPEN): cv.string,
+        vol.Optional(CONF_STATE_OPENING, default=STATE_OPENING): cv.string,
+        vol.Optional(CONF_STATE_TOPIC): valid_subscribe_topic,
+        vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
+    }
+).extend(MQTT_ENTITY_COMMON_SCHEMA.schema)
+
+PLATFORM_SCHEMA_MODERN = vol.All(_PLATFORM_SCHEMA_BASE, _validate_and_add_defaults)
+
+DISCOVERY_SCHEMA = vol.All(
+    _PLATFORM_SCHEMA_BASE.extend({}, extra=vol.REMOVE_EXTRA),
+    _validate_and_add_defaults,
+)
+
+
+async def async_setup_entry(
+    hass: HomeAssistant,
+    config_entry: ConfigEntry,
+    async_add_entities: AddEntitiesCallback,
+) -> None:
+    """Set up MQTT valve through YAML and through MQTT discovery."""
+    await async_setup_entity_entry_helper(
+        hass,
+        config_entry,
+        MqttValve,
+        valve.DOMAIN,
+        async_add_entities,
+        DISCOVERY_SCHEMA,
+        PLATFORM_SCHEMA_MODERN,
+    )
+
+
+class MqttValve(MqttEntity, ValveEntity):
+    """Representation of a valve that can be controlled using MQTT."""
+
+    _attr_is_closed: bool | None = None
+    _attributes_extra_blocked: frozenset[str] = MQTT_VALVE_ATTRIBUTES_BLOCKED
+    _default_name = DEFAULT_NAME
+    _entity_id_format: str = valve.ENTITY_ID_FORMAT
+    _optimistic: bool
+    _range: tuple[int, int]
+    _tilt_optimistic: bool
+
+    @staticmethod
+    def config_schema() -> vol.Schema:
+        """Return the config schema."""
+        return DISCOVERY_SCHEMA
+
+    def _setup_from_config(self, config: ConfigType) -> None:
+        """Set up valve from config."""
+        self._attr_reports_position = config[CONF_REPORTS_POSITION]
+        self._range = (
+            self._config[CONF_POSITION_CLOSED] + 1,
+            self._config[CONF_POSITION_OPEN],
+        )
+        no_state_topic = config.get(CONF_STATE_TOPIC) is None
+        self._optimistic = config[CONF_OPTIMISTIC] or no_state_topic
+        self._attr_assumed_state = self._optimistic
+
+        template_config_attributes = {
+            "position_open": config[CONF_POSITION_OPEN],
+            "position_closed": config[CONF_POSITION_CLOSED],
+        }
+
+        self._value_template = MqttValueTemplate(
+            config.get(CONF_VALUE_TEMPLATE), entity=self
+        ).async_render_with_possible_json_value
+
+        self._command_template = MqttCommandTemplate(
+            config.get(CONF_COMMAND_TEMPLATE), entity=self
+        ).async_render
+
+        self._value_template = MqttValueTemplate(
+            config.get(CONF_VALUE_TEMPLATE),
+            entity=self,
+            config_attributes=template_config_attributes,
+        ).async_render_with_possible_json_value
+
+        self._attr_device_class = config.get(CONF_DEVICE_CLASS)
+
+        supported_features = ValveEntityFeature(0)
+        if CONF_COMMAND_TOPIC in config:
+            if config[CONF_PAYLOAD_OPEN] is not None:
+                supported_features |= ValveEntityFeature.OPEN
+            if config[CONF_PAYLOAD_CLOSE] is not None:
+                supported_features |= ValveEntityFeature.CLOSE
+
+        if config[CONF_REPORTS_POSITION]:
+            supported_features |= ValveEntityFeature.SET_POSITION
+        if config.get(CONF_PAYLOAD_STOP) is not None:
+            supported_features |= ValveEntityFeature.STOP
+
+        self._attr_supported_features = supported_features
+
+    @callback
+    def _update_state(self, state: str) -> None:
+        """Update the valve state based on static payload."""
+        self._attr_is_closed = state == STATE_CLOSED
+        self._attr_is_opening = state == STATE_OPENING
+        self._attr_is_closing = state == STATE_CLOSING
+
+    @callback
+    def _process_binary_valve_update(
+        self, payload: ReceivePayloadType, state_payload: str
+    ) -> None:
+        """Process an update for a valve that does not report the position."""
+        state: str | None = None
+        if state_payload == self._config[CONF_STATE_OPENING]:
+            state = STATE_OPENING
+        elif state_payload == self._config[CONF_STATE_CLOSING]:
+            state = STATE_CLOSING
+        elif state_payload == self._config[CONF_STATE_OPEN]:
+            state = STATE_OPEN
+        elif state_payload == self._config[CONF_STATE_CLOSED]:
+            state = STATE_CLOSED
+        if state is None:
+            _LOGGER.warning(
+                "Payload is not one of [open, closed, opening, closing], got: %s",
+                payload,
+            )
+            return
+        self._update_state(state)
+
+    @callback
+    def _process_position_valve_update(
+        self, payload: ReceivePayloadType, position_payload: str, state_payload: str
+    ) -> None:
+        """Process an update for a valve that reports the position."""
+        state: str | None = None
+        if state_payload == self._config[CONF_STATE_OPENING]:
+            state = STATE_OPENING
+        elif state_payload == self._config[CONF_STATE_CLOSING]:
+            state = STATE_CLOSING
+        if state is None or position_payload != state_payload:
+            try:
+                percentage_payload = ranged_value_to_percentage(
+                    self._range, float(position_payload)
+                )
+            except ValueError:
+                _LOGGER.warning("Payload '%s' is not numeric", position_payload)
+                return
+
+            self._attr_current_valve_position = min(max(percentage_payload, 0), 100)
+        if state is None:
+            _LOGGER.warning(
+                "Payload is not one of [opening, closing], got: %s",
+                payload,
+            )
+            return
+        self._update_state(state)
+
+    def _prepare_subscribe_topics(self) -> None:
+        """(Re)Subscribe to topics."""
+        topics = {}
+
+        @callback
+        @log_messages(self.hass, self.entity_id)
+        @write_state_on_attr_change(
+            self,
+            {
+                "_attr_current_valve_position",
+                "_attr_is_closed",
+                "_attr_is_closing",
+                "_attr_is_opening",
+            },
+        )
+        def state_message_received(msg: ReceiveMessage) -> None:
+            """Handle new MQTT state messages."""
+            payload_dict: Any = None
+            position_payload: Any = None
+            state_payload: Any = None
+            payload = self._value_template(msg.payload)
+
+            if not payload:
+                _LOGGER.debug("Ignoring empty state message from '%s'", msg.topic)
+                return
+
+            with suppress(*JSON_DECODE_EXCEPTIONS):
+                payload_dict = json_loads(payload)
+            if isinstance(payload_dict, dict) and "position" in payload_dict:
+                position_payload = payload_dict["position"]
+            if isinstance(payload_dict, dict) and "state" in payload_dict:
+                state_payload = payload_dict["state"]
+            state_payload = payload if state_payload is None else state_payload
+            position_payload = payload if position_payload is None else position_payload
+
+            if self._config[CONF_REPORTS_POSITION]:
+                self._process_position_valve_update(
+                    payload, position_payload, state_payload
+                )
+            else:
+                self._process_binary_valve_update(payload, state_payload)
+
+        if self._config.get(CONF_STATE_TOPIC):
+            topics["state_topic"] = {
+                "topic": self._config.get(CONF_STATE_TOPIC),
+                "msg_callback": state_message_received,
+                "qos": self._config[CONF_QOS],
+                "encoding": self._config[CONF_ENCODING] or None,
+            }
+
+        self._sub_state = subscription.async_prepare_subscribe_topics(
+            self.hass, self._sub_state, topics
+        )
+
+    async def _subscribe_topics(self) -> None:
+        """(Re)Subscribe to topics."""
+        await subscription.async_subscribe_topics(self.hass, self._sub_state)
+
+    async def async_open_valve(self) -> None:
+        """Move the valve up.
+
+        This method is a coroutine.
+        """
+        payload = self._command_template(
+            self._config.get(CONF_PAYLOAD_OPEN, DEFAULT_PAYLOAD_OPEN)
+        )
+        await self.async_publish(
+            self._config[CONF_COMMAND_TOPIC],
+            payload,
+            self._config[CONF_QOS],
+            self._config[CONF_RETAIN],
+            self._config[CONF_ENCODING],
+        )
+        if self._optimistic:
+            # Optimistically assume that valve has changed state.
+            self._update_state(STATE_OPEN)
+            self.async_write_ha_state()
+
+    async def async_close_valve(self) -> None:
+        """Move the valve down.
+
+        This method is a coroutine.
+        """
+        payload = self._command_template(
+            self._config.get(CONF_PAYLOAD_CLOSE, DEFAULT_PAYLOAD_CLOSE)
+        )
+        await self.async_publish(
+            self._config[CONF_COMMAND_TOPIC],
+            payload,
+            self._config[CONF_QOS],
+            self._config[CONF_RETAIN],
+            self._config[CONF_ENCODING],
+        )
+        if self._optimistic:
+            # Optimistically assume that valve has changed state.
+            self._update_state(STATE_CLOSED)
+            self.async_write_ha_state()
+
+    async def async_stop_valve(self) -> None:
+        """Stop valve positioning.
+
+        This method is a coroutine.
+        """
+        payload = self._command_template(self._config[CONF_PAYLOAD_STOP])
+        await self.async_publish(
+            self._config[CONF_COMMAND_TOPIC],
+            payload,
+            self._config[CONF_QOS],
+            self._config[CONF_RETAIN],
+            self._config[CONF_ENCODING],
+        )
+
+    async def async_set_valve_position(self, position: int) -> None:
+        """Move the valve to a specific position."""
+        percentage_position = position
+        scaled_position = round(
+            percentage_to_ranged_value(self._range, percentage_position)
+        )
+        variables = {
+            "position": percentage_position,
+            "position_open": self._config[CONF_POSITION_OPEN],
+            "position_closed": self._config[CONF_POSITION_CLOSED],
+        }
+        rendered_position = self._command_template(scaled_position, variables=variables)
+
+        await self.async_publish(
+            self._config[CONF_COMMAND_TOPIC],
+            rendered_position,
+            self._config[CONF_QOS],
+            self._config[CONF_RETAIN],
+            self._config[CONF_ENCODING],
+        )
+        if self._optimistic:
+            self._update_state(
+                STATE_CLOSED
+                if percentage_position == self._config[CONF_POSITION_CLOSED]
+                else STATE_OPEN
+            )
+            self._attr_current_valve_position = percentage_position
+            self.async_write_ha_state()
diff --git a/tests/components/mqtt/test_valve.py b/tests/components/mqtt/test_valve.py
new file mode 100644
index 00000000000..27be72ecabc
--- /dev/null
+++ b/tests/components/mqtt/test_valve.py
@@ -0,0 +1,1399 @@
+"""The tests for the MQTT valve platform."""
+from typing import Any
+from unittest.mock import patch
+
+import pytest
+
+from homeassistant.components import mqtt, valve
+from homeassistant.components.mqtt.valve import (
+    MQTT_VALVE_ATTRIBUTES_BLOCKED,
+    ValveEntityFeature,
+)
+from homeassistant.components.valve import (
+    ATTR_CURRENT_POSITION,
+    ATTR_POSITION,
+    SERVICE_SET_VALVE_POSITION,
+)
+from homeassistant.const import (
+    ATTR_ASSUMED_STATE,
+    ATTR_ENTITY_ID,
+    ATTR_SUPPORTED_FEATURES,
+    SERVICE_CLOSE_VALVE,
+    SERVICE_OPEN_VALVE,
+    SERVICE_STOP_VALVE,
+    STATE_CLOSED,
+    STATE_CLOSING,
+    STATE_OPEN,
+    STATE_OPENING,
+    STATE_UNKNOWN,
+    Platform,
+)
+from homeassistant.core import HomeAssistant
+
+from .test_common import (
+    help_custom_config,
+    help_test_availability_when_connection_lost,
+    help_test_availability_without_topic,
+    help_test_custom_availability_payload,
+    help_test_default_availability_payload,
+    help_test_discovery_broken,
+    help_test_discovery_removal,
+    help_test_discovery_update,
+    help_test_discovery_update_attr,
+    help_test_discovery_update_unchanged,
+    help_test_encoding_subscribable_topics,
+    help_test_entity_debug_info_message,
+    help_test_entity_device_info_remove,
+    help_test_entity_device_info_update,
+    help_test_entity_device_info_with_connection,
+    help_test_entity_device_info_with_identifier,
+    help_test_entity_id_update_discovery_update,
+    help_test_entity_id_update_subscriptions,
+    help_test_publishing_with_custom_encoding,
+    help_test_reloadable,
+    help_test_setting_attribute_via_mqtt_json_message,
+    help_test_setting_attribute_with_template,
+    help_test_setting_blocked_attribute_via_mqtt_json_message,
+    help_test_skipped_async_ha_write_state,
+    help_test_unique_id,
+    help_test_unload_config_entry_with_platform,
+    help_test_update_with_json_attrs_bad_json,
+    help_test_update_with_json_attrs_not_dict,
+)
+
+from tests.common import async_fire_mqtt_message
+from tests.typing import MqttMockHAClientGenerator, MqttMockPahoClient
+
+DEFAULT_CONFIG = {
+    mqtt.DOMAIN: {
+        valve.DOMAIN: {
+            "command_topic": "command-topic",
+            "state_topic": "test-topic",
+            "name": "test",
+        }
+    }
+}
+
+DEFAULT_CONFIG_REPORTS_POSITION = {
+    mqtt.DOMAIN: {
+        valve.DOMAIN: {
+            "name": "test",
+            "command_topic": "command-topic",
+            "state_topic": "test-topic",
+            "reports_position": True,
+        }
+    }
+}
+
+
+@pytest.fixture(autouse=True)
+def valve_platform_only():
+    """Only setup the valve platform to speed up tests."""
+    with patch("homeassistant.components.mqtt.PLATFORMS", [Platform.VALVE]):
+        yield
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                }
+            }
+        }
+    ],
+)
+@pytest.mark.parametrize(
+    ("message", "asserted_state"),
+    [
+        ("open", STATE_OPEN),
+        ("closed", STATE_CLOSED),
+        ("closing", STATE_CLOSING),
+        ("opening", STATE_OPENING),
+        ('{"state" : "open"}', STATE_OPEN),
+        ('{"state" : "closed"}', STATE_CLOSED),
+        ('{"state" : "closing"}', STATE_CLOSING),
+        ('{"state" : "opening"}', STATE_OPENING),
+    ],
+)
+async def test_state_via_state_topic_no_position(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    message: str,
+    asserted_state: str,
+) -> None:
+    """Test the controlling state via topic without position and without template."""
+    await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+    assert not state.attributes.get(ATTR_ASSUMED_STATE)
+
+    async_fire_mqtt_message(hass, "state-topic", message)
+
+    state = hass.states.get("valve.test")
+    assert state.state == asserted_state
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                    "value_template": "{{ value_json.state }}",
+                }
+            }
+        }
+    ],
+)
+@pytest.mark.parametrize(
+    ("message", "asserted_state"),
+    [
+        ('{"state":"open"}', STATE_OPEN),
+        ('{"state":"closed"}', STATE_CLOSED),
+        ('{"state":"closing"}', STATE_CLOSING),
+        ('{"state":"opening"}', STATE_OPENING),
+    ],
+)
+async def test_state_via_state_topic_with_template(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    message: str,
+    asserted_state: str,
+) -> None:
+    """Test the controlling state via topic with template."""
+    await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+    assert not state.attributes.get(ATTR_ASSUMED_STATE)
+
+    async_fire_mqtt_message(hass, "state-topic", message)
+
+    state = hass.states.get("valve.test")
+    assert state.state == asserted_state
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                    "reports_position": True,
+                    "value_template": "{{ value_json.position }}",
+                }
+            }
+        }
+    ],
+)
+@pytest.mark.parametrize(
+    ("message", "asserted_state"),
+    [
+        ('{"position":100}', STATE_OPEN),
+        ('{"position":50.0}', STATE_OPEN),
+        ('{"position":0}', STATE_CLOSED),
+        ('{"position":"non_numeric"}', STATE_UNKNOWN),
+        ('{"ignored":12}', STATE_UNKNOWN),
+    ],
+)
+async def test_state_via_state_topic_with_position_template(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    message: str,
+    asserted_state: str,
+) -> None:
+    """Test the controlling state via topic with position template."""
+    await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+    assert not state.attributes.get(ATTR_ASSUMED_STATE)
+
+    async_fire_mqtt_message(hass, "state-topic", message)
+
+    state = hass.states.get("valve.test")
+    assert state.state == asserted_state
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                    "reports_position": True,
+                }
+            }
+        }
+    ],
+)
+@pytest.mark.parametrize(
+    ("message", "asserted_state", "valve_position"),
+    [
+        ("0", STATE_CLOSED, 0),
+        ("opening", STATE_OPENING, None),
+        ("50", STATE_OPEN, 50),
+        ("closing", STATE_CLOSING, None),
+        ("100", STATE_OPEN, 100),
+        ("open", STATE_UNKNOWN, None),
+        ("closed", STATE_UNKNOWN, None),
+        ("-10", STATE_CLOSED, 0),
+        ("110", STATE_OPEN, 100),
+        ('{"position": 0, "state": "opening"}', STATE_OPENING, 0),
+        ('{"position": 10, "state": "opening"}', STATE_OPENING, 10),
+        ('{"position": 50, "state": "open"}', STATE_OPEN, 50),
+        ('{"position": 100, "state": "closing"}', STATE_CLOSING, 100),
+        ('{"position": 90, "state": "closing"}', STATE_CLOSING, 90),
+        ('{"position": 0, "state": "closed"}', STATE_CLOSED, 0),
+        ('{"position": -10, "state": "closed"}', STATE_CLOSED, 0),
+        ('{"position": 110, "state": "open"}', STATE_OPEN, 100),
+    ],
+)
+async def test_state_via_state_topic_through_position(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    message: str,
+    asserted_state: str,
+    valve_position: int | None,
+) -> None:
+    """Test the controlling state via topic through position.
+
+    Test is still possible to process a `opening` or `closing` state update.
+    Additional we test json messages can be processed containing both position and state.
+    Incoming rendered positions are clamped between 0..100.
+    """
+    await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+    assert not state.attributes.get(ATTR_ASSUMED_STATE)
+
+    async_fire_mqtt_message(hass, "state-topic", message)
+
+    state = hass.states.get("valve.test")
+    assert state.state == asserted_state
+    assert state.attributes.get(ATTR_CURRENT_POSITION) == valve_position
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                    "reports_position": True,
+                    "position_closed": -128,
+                    "position_open": 127,
+                }
+            }
+        }
+    ],
+)
+@pytest.mark.parametrize(
+    ("message", "asserted_state", "valve_position"),
+    [
+        ("-128", STATE_CLOSED, 0),
+        ("0", STATE_OPEN, 50),
+        ("127", STATE_OPEN, 100),
+        ("-130", STATE_CLOSED, 0),
+        ("130", STATE_OPEN, 100),
+        ('{"position": -128, "state": "opening"}', STATE_OPENING, 0),
+        ('{"position": -30, "state": "opening"}', STATE_OPENING, 38),
+        ('{"position": 30, "state": "open"}', STATE_OPEN, 61),
+        ('{"position": 127, "state": "closing"}', STATE_CLOSING, 100),
+        ('{"position": 100, "state": "closing"}', STATE_CLOSING, 89),
+        ('{"position": -128, "state": "closed"}', STATE_CLOSED, 0),
+        ('{"position": -130, "state": "closed"}', STATE_CLOSED, 0),
+        ('{"position": 130, "state": "open"}', STATE_OPEN, 100),
+    ],
+)
+async def test_state_via_state_trough_position_with_alt_range(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    message: str,
+    asserted_state: str,
+    valve_position: int | None,
+) -> None:
+    """Test the controlling state via topic through position and an alternative range.
+
+    Test is still possible to process a `opening` or `closing` state update.
+    Additional we test json messages can be processed containing both position and state.
+    Incoming rendered positions are clamped between 0..100.
+    """
+    await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+    assert not state.attributes.get(ATTR_ASSUMED_STATE)
+
+    async_fire_mqtt_message(hass, "state-topic", message)
+
+    state = hass.states.get("valve.test")
+    assert state.state == asserted_state
+    assert state.attributes.get(ATTR_CURRENT_POSITION) == valve_position
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                    "payload_stop": "SToP",
+                    "payload_open": "OPeN",
+                    "payload_close": "CLOsE",
+                }
+            }
+        }
+    ],
+)
+@pytest.mark.parametrize(
+    ("service", "asserted_message"),
+    [
+        (SERVICE_CLOSE_VALVE, "CLOsE"),
+        (SERVICE_OPEN_VALVE, "OPeN"),
+        (SERVICE_STOP_VALVE, "SToP"),
+    ],
+)
+async def tests_controling_valve_by_state(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    service: str,
+    asserted_message: str,
+) -> None:
+    """Test controlling a valve by state."""
+    mqtt_mock = await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+
+    await hass.services.async_call(
+        valve.DOMAIN,
+        service,
+        {ATTR_ENTITY_ID: "valve.test"},
+        blocking=True,
+    )
+
+    mqtt_mock.async_publish.assert_called_once_with(
+        "command-topic", asserted_message, 0, False
+    )
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+
+
+@pytest.mark.parametrize(
+    ("hass_config", "supported_features"),
+    [
+        (DEFAULT_CONFIG, ValveEntityFeature.OPEN | ValveEntityFeature.CLOSE),
+        (
+            help_custom_config(
+                valve.DOMAIN,
+                DEFAULT_CONFIG,
+                ({"payload_open": "OPEN", "payload_close": "CLOSE"},),
+            ),
+            ValveEntityFeature.OPEN | ValveEntityFeature.CLOSE,
+        ),
+        (
+            help_custom_config(
+                valve.DOMAIN,
+                DEFAULT_CONFIG,
+                ({"payload_open": "OPEN", "payload_close": None},),
+            ),
+            ValveEntityFeature.OPEN,
+        ),
+        (
+            help_custom_config(
+                valve.DOMAIN,
+                DEFAULT_CONFIG,
+                ({"payload_open": None, "payload_close": "CLOSE"},),
+            ),
+            ValveEntityFeature.CLOSE,
+        ),
+        (
+            help_custom_config(
+                valve.DOMAIN, DEFAULT_CONFIG, ({"payload_stop": "STOP"},)
+            ),
+            ValveEntityFeature.OPEN
+            | ValveEntityFeature.CLOSE
+            | ValveEntityFeature.STOP,
+        ),
+        (
+            help_custom_config(
+                valve.DOMAIN,
+                DEFAULT_CONFIG_REPORTS_POSITION,
+                ({"payload_stop": "STOP"},),
+            ),
+            ValveEntityFeature.OPEN
+            | ValveEntityFeature.CLOSE
+            | ValveEntityFeature.STOP
+            | ValveEntityFeature.SET_POSITION,
+        ),
+    ],
+)
+async def tests_supported_features(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    supported_features: ValveEntityFeature,
+) -> None:
+    """Test the valve's supported features."""
+    assert await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state is not None
+    assert state.attributes.get(ATTR_SUPPORTED_FEATURES) == supported_features
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        help_custom_config(
+            valve.DOMAIN, DEFAULT_CONFIG_REPORTS_POSITION, ({"payload_open": "OPEN"},)
+        ),
+        help_custom_config(
+            valve.DOMAIN, DEFAULT_CONFIG_REPORTS_POSITION, ({"payload_close": "CLOSE"},)
+        ),
+        help_custom_config(
+            valve.DOMAIN, DEFAULT_CONFIG_REPORTS_POSITION, ({"state_open": "open"},)
+        ),
+        help_custom_config(
+            valve.DOMAIN, DEFAULT_CONFIG_REPORTS_POSITION, ({"state_closed": "closed"},)
+        ),
+    ],
+)
+async def tests_open_close_payload_config_not_allowed(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test open or close payload configs fail if valve reports position."""
+    assert await mqtt_mock_entry()
+
+    assert hass.states.get("valve.test") is None
+
+    assert (
+        "Options `payload_open`, `payload_close`, `state_open` and "
+        "`state_closed` are not allowed if the valve reports a position." in caplog.text
+    )
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                    "payload_stop": "STOP",
+                    "optimistic": True,
+                }
+            }
+        },
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "command_topic": "command-topic",
+                    "payload_stop": "STOP",
+                }
+            }
+        },
+    ],
+)
+@pytest.mark.parametrize(
+    ("service", "asserted_message", "asserted_state"),
+    [
+        (SERVICE_CLOSE_VALVE, "CLOSE", STATE_CLOSED),
+        (SERVICE_OPEN_VALVE, "OPEN", STATE_OPEN),
+    ],
+)
+async def tests_controling_valve_by_state_optimistic(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    service: str,
+    asserted_message: str,
+    asserted_state: str,
+) -> None:
+    """Test controlling a valve by state explicit and implicit optimistic."""
+    mqtt_mock = await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+
+    await hass.services.async_call(
+        valve.DOMAIN,
+        service,
+        {ATTR_ENTITY_ID: "valve.test"},
+        blocking=True,
+    )
+
+    mqtt_mock.async_publish.assert_called_once_with(
+        "command-topic", asserted_message, 0, False
+    )
+
+    state = hass.states.get("valve.test")
+    assert state.state == asserted_state
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                    "payload_stop": "-1",
+                    "reports_position": True,
+                }
+            }
+        }
+    ],
+)
+@pytest.mark.parametrize(
+    ("service", "asserted_message"),
+    [
+        (SERVICE_CLOSE_VALVE, "0"),
+        (SERVICE_OPEN_VALVE, "100"),
+        (SERVICE_STOP_VALVE, "-1"),
+    ],
+)
+async def tests_controling_valve_by_position(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    service: str,
+    asserted_message: str,
+) -> None:
+    """Test controlling a valve by position."""
+    mqtt_mock = await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+
+    await hass.services.async_call(
+        valve.DOMAIN,
+        service,
+        {ATTR_ENTITY_ID: "valve.test"},
+        blocking=True,
+    )
+
+    mqtt_mock.async_publish.assert_called_once_with(
+        "command-topic", asserted_message, 0, False
+    )
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                    "payload_stop": "-1",
+                    "reports_position": True,
+                }
+            }
+        }
+    ],
+)
+@pytest.mark.parametrize(
+    ("position", "asserted_message"),
+    [
+        (0, "0"),
+        (30, "30"),
+        (100, "100"),
+    ],
+)
+async def tests_controling_valve_by_set_valve_position(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    position: int,
+    asserted_message: str,
+) -> None:
+    """Test controlling a valve by position."""
+    mqtt_mock = await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+
+    await hass.services.async_call(
+        valve.DOMAIN,
+        SERVICE_SET_VALVE_POSITION,
+        {ATTR_ENTITY_ID: "valve.test", ATTR_POSITION: position},
+        blocking=True,
+    )
+
+    mqtt_mock.async_publish.assert_called_once_with(
+        "command-topic", asserted_message, 0, False
+    )
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                    "payload_stop": "-1",
+                    "reports_position": True,
+                    "optimistic": True,
+                }
+            }
+        }
+    ],
+)
+@pytest.mark.parametrize(
+    ("position", "asserted_message", "asserted_position", "asserted_state"),
+    [
+        (0, "0", 0, STATE_CLOSED),
+        (30, "30", 30, STATE_OPEN),
+        (100, "100", 100, STATE_OPEN),
+    ],
+)
+async def tests_controling_valve_optimistic_by_set_valve_position(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    position: int,
+    asserted_message: str,
+    asserted_position: int,
+    asserted_state: str,
+) -> None:
+    """Test controlling a valve optimistic by position."""
+    mqtt_mock = await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+
+    await hass.services.async_call(
+        valve.DOMAIN,
+        SERVICE_SET_VALVE_POSITION,
+        {ATTR_ENTITY_ID: "valve.test", ATTR_POSITION: position},
+        blocking=True,
+    )
+
+    mqtt_mock.async_publish.assert_called_once_with(
+        "command-topic", asserted_message, 0, False
+    )
+
+    state = hass.states.get("valve.test")
+    assert state.state == asserted_state
+    assert state.attributes.get(ATTR_CURRENT_POSITION) == asserted_position
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                    "payload_stop": "-1",
+                    "reports_position": True,
+                    "position_closed": -128,
+                    "position_open": 127,
+                }
+            }
+        }
+    ],
+)
+@pytest.mark.parametrize(
+    ("position", "asserted_message"),
+    [
+        (0, "-128"),
+        (30, "-52"),
+        (80, "76"),
+        (100, "127"),
+    ],
+)
+async def tests_controling_valve_with_alt_range_by_set_valve_position(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    position: int,
+    asserted_message: str,
+) -> None:
+    """Test controlling a valve with an alt range by position."""
+    mqtt_mock = await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+
+    await hass.services.async_call(
+        valve.DOMAIN,
+        SERVICE_SET_VALVE_POSITION,
+        {ATTR_ENTITY_ID: "valve.test", ATTR_POSITION: position},
+        blocking=True,
+    )
+
+    mqtt_mock.async_publish.assert_called_once_with(
+        "command-topic", asserted_message, 0, False
+    )
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                    "reports_position": True,
+                    "position_closed": -128,
+                    "position_open": 127,
+                }
+            }
+        }
+    ],
+)
+@pytest.mark.parametrize(
+    ("service", "asserted_message"),
+    [
+        (SERVICE_CLOSE_VALVE, "-128"),
+        (SERVICE_OPEN_VALVE, "127"),
+    ],
+)
+async def tests_controling_valve_with_alt_range_by_position(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    service: str,
+    asserted_message: str,
+) -> None:
+    """Test controlling a valve with an alt range by position."""
+    mqtt_mock = await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+
+    await hass.services.async_call(
+        valve.DOMAIN,
+        service,
+        {ATTR_ENTITY_ID: "valve.test"},
+        blocking=True,
+    )
+
+    mqtt_mock.async_publish.assert_called_once_with(
+        "command-topic", asserted_message, 0, False
+    )
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                    "payload_stop": "STOP",
+                    "optimistic": True,
+                    "reports_position": True,
+                }
+            }
+        },
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "command_topic": "command-topic",
+                    "payload_stop": "STOP",
+                    "reports_position": True,
+                }
+            }
+        },
+    ],
+)
+@pytest.mark.parametrize(
+    ("service", "asserted_message", "asserted_state", "asserted_position"),
+    [
+        (SERVICE_CLOSE_VALVE, "0", STATE_CLOSED, 0),
+        (SERVICE_OPEN_VALVE, "100", STATE_OPEN, 100),
+    ],
+)
+async def tests_controling_valve_by_position_optimistic(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    service: str,
+    asserted_message: str,
+    asserted_state: str,
+    asserted_position: int,
+) -> None:
+    """Test controlling a valve by state explicit and implicit optimistic."""
+    mqtt_mock = await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+    assert state.attributes.get(ATTR_CURRENT_POSITION) is None
+
+    await hass.services.async_call(
+        valve.DOMAIN,
+        service,
+        {ATTR_ENTITY_ID: "valve.test"},
+        blocking=True,
+    )
+
+    mqtt_mock.async_publish.assert_called_once_with(
+        "command-topic", asserted_message, 0, False
+    )
+
+    state = hass.states.get("valve.test")
+    assert state.state == asserted_state
+    assert state.attributes[ATTR_CURRENT_POSITION] == asserted_position
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "state_topic": "state-topic",
+                    "command_topic": "command-topic",
+                    "payload_stop": "-1",
+                    "reports_position": True,
+                    "optimistic": True,
+                    "position_closed": -128,
+                    "position_open": 127,
+                }
+            }
+        }
+    ],
+)
+@pytest.mark.parametrize(
+    ("position", "asserted_message", "asserted_position", "asserted_state"),
+    [
+        (0, "-128", 0, STATE_CLOSED),
+        (30, "-52", 30, STATE_OPEN),
+        (50, "0", 50, STATE_OPEN),
+        (100, "127", 100, STATE_OPEN),
+    ],
+)
+async def tests_controling_valve_optimistic_alt_trange_by_set_valve_position(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    position: int,
+    asserted_message: str,
+    asserted_position: int,
+    asserted_state: str,
+) -> None:
+    """Test controlling a valve optimistic and alt range by position."""
+    mqtt_mock = await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.state == STATE_UNKNOWN
+
+    await hass.services.async_call(
+        valve.DOMAIN,
+        SERVICE_SET_VALVE_POSITION,
+        {ATTR_ENTITY_ID: "valve.test", ATTR_POSITION: position},
+        blocking=True,
+    )
+
+    mqtt_mock.async_publish.assert_called_once_with(
+        "command-topic", asserted_message, 0, False
+    )
+
+    state = hass.states.get("valve.test")
+    assert state.state == asserted_state
+    assert state.attributes.get(ATTR_CURRENT_POSITION) == asserted_position
+
+
+@pytest.mark.parametrize("hass_config", [DEFAULT_CONFIG])
+async def test_availability_when_connection_lost(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test availability after MQTT disconnection."""
+    await help_test_availability_when_connection_lost(
+        hass, mqtt_mock_entry, valve.DOMAIN
+    )
+
+
+@pytest.mark.parametrize("hass_config", [DEFAULT_CONFIG])
+async def test_availability_without_topic(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test availability without defined availability topic."""
+    await help_test_availability_without_topic(
+        hass, mqtt_mock_entry, valve.DOMAIN, DEFAULT_CONFIG
+    )
+
+
+async def test_default_availability_payload(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test availability by default payload with defined topic."""
+    await help_test_default_availability_payload(
+        hass, mqtt_mock_entry, valve.DOMAIN, DEFAULT_CONFIG
+    )
+
+
+async def test_custom_availability_payload(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test availability by custom payload with defined topic."""
+    await help_test_custom_availability_payload(
+        hass, mqtt_mock_entry, valve.DOMAIN, DEFAULT_CONFIG
+    )
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "device_class": "water",
+                    "state_topic": "test-topic",
+                }
+            }
+        }
+    ],
+)
+async def test_valid_device_class(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test the setting of a valid device class."""
+    await mqtt_mock_entry()
+
+    state = hass.states.get("valve.test")
+    assert state.attributes.get("device_class") == "water"
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: {
+                    "name": "test",
+                    "device_class": "abc123",
+                    "state_topic": "test-topic",
+                }
+            }
+        }
+    ],
+)
+async def test_invalid_device_class(
+    hass: HomeAssistant,
+    caplog: pytest.LogCaptureFixture,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+) -> None:
+    """Test the setting of an invalid device class."""
+    assert await mqtt_mock_entry()
+    assert "expected ValveDeviceClass" in caplog.text
+
+
+async def test_setting_attribute_via_mqtt_json_message(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test the setting of attribute via MQTT with JSON payload."""
+    await help_test_setting_attribute_via_mqtt_json_message(
+        hass, mqtt_mock_entry, valve.DOMAIN, DEFAULT_CONFIG
+    )
+
+
+async def test_setting_blocked_attribute_via_mqtt_json_message(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test the setting of attribute via MQTT with JSON payload."""
+    await help_test_setting_blocked_attribute_via_mqtt_json_message(
+        hass,
+        mqtt_mock_entry,
+        valve.DOMAIN,
+        DEFAULT_CONFIG,
+        MQTT_VALVE_ATTRIBUTES_BLOCKED,
+    )
+
+
+async def test_setting_attribute_with_template(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test the setting of attribute via MQTT with JSON payload."""
+    await help_test_setting_attribute_with_template(
+        hass, mqtt_mock_entry, valve.DOMAIN, DEFAULT_CONFIG
+    )
+
+
+async def test_update_with_json_attrs_not_dict(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test attributes get extracted from a JSON result."""
+    await help_test_update_with_json_attrs_not_dict(
+        hass,
+        mqtt_mock_entry,
+        caplog,
+        valve.DOMAIN,
+        DEFAULT_CONFIG,
+    )
+
+
+async def test_update_with_json_attrs_bad_json(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test attributes get extracted from a JSON result."""
+    await help_test_update_with_json_attrs_bad_json(
+        hass,
+        mqtt_mock_entry,
+        caplog,
+        valve.DOMAIN,
+        DEFAULT_CONFIG,
+    )
+
+
+async def test_discovery_update_attr(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test update of discovered MQTTAttributes."""
+    await help_test_discovery_update_attr(
+        hass,
+        mqtt_mock_entry,
+        caplog,
+        valve.DOMAIN,
+        DEFAULT_CONFIG,
+    )
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        {
+            mqtt.DOMAIN: {
+                valve.DOMAIN: [
+                    {
+                        "name": "Test 1",
+                        "state_topic": "test-topic",
+                        "unique_id": "TOTALLY_UNIQUE",
+                    },
+                    {
+                        "name": "Test 2",
+                        "state_topic": "test-topic",
+                        "unique_id": "TOTALLY_UNIQUE",
+                    },
+                ]
+            }
+        }
+    ],
+)
+async def test_unique_id(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test unique_id option only creates one valve per id."""
+    await help_test_unique_id(hass, mqtt_mock_entry, valve.DOMAIN)
+
+
+async def test_discovery_removal_valve(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test removal of discovered valve."""
+    data = '{ "name": "test", "command_topic": "test_topic" }'
+    await help_test_discovery_removal(hass, mqtt_mock_entry, caplog, valve.DOMAIN, data)
+
+
+async def test_discovery_update_valve(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test update of discovered valve."""
+    config1 = {"name": "Beer", "command_topic": "test_topic"}
+    config2 = {"name": "Milk", "command_topic": "test_topic"}
+    await help_test_discovery_update(
+        hass, mqtt_mock_entry, caplog, valve.DOMAIN, config1, config2
+    )
+
+
+async def test_discovery_update_unchanged_valve(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test update of discovered valve."""
+    data1 = '{ "name": "Beer", "command_topic": "test_topic" }'
+    with patch(
+        "homeassistant.components.mqtt.valve.MqttValve.discovery_update"
+    ) as discovery_update:
+        await help_test_discovery_update_unchanged(
+            hass,
+            mqtt_mock_entry,
+            caplog,
+            valve.DOMAIN,
+            data1,
+            discovery_update,
+        )
+
+
+@pytest.mark.no_fail_on_log_exception
+async def test_discovery_broken(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test handling of bad discovery message."""
+    data1 = '{ "name": "Beer", "command_topic": "test_topic#" }'
+    data2 = '{ "name": "Milk", "command_topic": "test_topic" }'
+    await help_test_discovery_broken(
+        hass, mqtt_mock_entry, caplog, valve.DOMAIN, data1, data2
+    )
+
+
+async def test_entity_device_info_with_connection(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test MQTT valve device registry integration."""
+    await help_test_entity_device_info_with_connection(
+        hass, mqtt_mock_entry, valve.DOMAIN, DEFAULT_CONFIG
+    )
+
+
+async def test_entity_device_info_with_identifier(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test MQTT valve device registry integration."""
+    await help_test_entity_device_info_with_identifier(
+        hass, mqtt_mock_entry, valve.DOMAIN, DEFAULT_CONFIG
+    )
+
+
+async def test_entity_device_info_update(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test device registry update."""
+    await help_test_entity_device_info_update(
+        hass, mqtt_mock_entry, valve.DOMAIN, DEFAULT_CONFIG
+    )
+
+
+async def test_entity_device_info_remove(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test device registry remove."""
+    await help_test_entity_device_info_remove(
+        hass, mqtt_mock_entry, valve.DOMAIN, DEFAULT_CONFIG
+    )
+
+
+async def test_entity_id_update_subscriptions(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test MQTT subscriptions are managed when entity_id is updated."""
+    await help_test_entity_id_update_subscriptions(
+        hass, mqtt_mock_entry, valve.DOMAIN, DEFAULT_CONFIG
+    )
+
+
+async def test_entity_id_update_discovery_update(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test MQTT discovery update when entity_id is updated."""
+    await help_test_entity_id_update_discovery_update(
+        hass, mqtt_mock_entry, valve.DOMAIN, DEFAULT_CONFIG
+    )
+
+
+async def test_entity_debug_info_message(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test MQTT debug info."""
+    await help_test_entity_debug_info_message(
+        hass,
+        mqtt_mock_entry,
+        valve.DOMAIN,
+        DEFAULT_CONFIG,
+        SERVICE_OPEN_VALVE,
+        command_payload="OPEN",
+    )
+
+
+@pytest.mark.parametrize(
+    ("service", "topic", "parameters", "payload", "template"),
+    [
+        (
+            SERVICE_OPEN_VALVE,
+            "command_topic",
+            None,
+            "OPEN",
+            None,
+        ),
+    ],
+)
+async def test_publishing_with_custom_encoding(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    caplog: pytest.LogCaptureFixture,
+    service: str,
+    topic: str,
+    parameters: dict[str, Any],
+    payload: str,
+    template: str | None,
+) -> None:
+    """Test publishing MQTT payload with different encoding."""
+    domain = valve.DOMAIN
+    config = DEFAULT_CONFIG
+
+    await help_test_publishing_with_custom_encoding(
+        hass,
+        mqtt_mock_entry,
+        caplog,
+        domain,
+        config,
+        service,
+        topic,
+        parameters,
+        payload,
+        template,
+    )
+
+
+async def test_reloadable(
+    hass: HomeAssistant,
+    mqtt_client_mock: MqttMockPahoClient,
+) -> None:
+    """Test reloading the MQTT platform."""
+    domain = valve.DOMAIN
+    config = DEFAULT_CONFIG
+    await help_test_reloadable(hass, mqtt_client_mock, domain, config)
+
+
+@pytest.mark.parametrize(
+    ("topic", "value", "attribute", "attribute_value"),
+    [
+        ("state_topic", "open", None, None),
+        ("state_topic", "closing", None, None),
+    ],
+)
+async def test_encoding_subscribable_topics(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    topic: str,
+    value: str,
+    attribute: str | None,
+    attribute_value: Any,
+) -> None:
+    """Test handling of incoming encoded payload."""
+    await help_test_encoding_subscribable_topics(
+        hass,
+        mqtt_mock_entry,
+        valve.DOMAIN,
+        DEFAULT_CONFIG[mqtt.DOMAIN][valve.DOMAIN],
+        topic,
+        value,
+        attribute,
+        attribute_value,
+        skip_raw_test=True,
+    )
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [DEFAULT_CONFIG, {"mqtt": [DEFAULT_CONFIG["mqtt"]]}],
+    ids=["platform_key", "listed"],
+)
+async def test_setup_manual_entity_from_yaml(
+    hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
+) -> None:
+    """Test setup manual configured MQTT entity."""
+    await mqtt_mock_entry()
+    platform = valve.DOMAIN
+    assert hass.states.get(f"{platform}.test")
+
+
+async def test_unload_entry(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+) -> None:
+    """Test unloading the config entry."""
+    domain = valve.DOMAIN
+    config = DEFAULT_CONFIG
+    await help_test_unload_config_entry_with_platform(
+        hass, mqtt_mock_entry, domain, config
+    )
+
+
+@pytest.mark.parametrize(
+    "hass_config",
+    [
+        help_custom_config(
+            valve.DOMAIN,
+            DEFAULT_CONFIG,
+            (
+                {
+                    "availability_topic": "availability-topic",
+                    "json_attributes_topic": "json-attributes-topic",
+                    "state_topic": "test-topic",
+                },
+            ),
+        )
+    ],
+)
+@pytest.mark.parametrize(
+    ("topic", "payload1", "payload2"),
+    [
+        ("test-topic", "open", "closed"),
+        ("availability-topic", "online", "offline"),
+        ("json-attributes-topic", '{"attr1": "val1"}', '{"attr1": "val2"}'),
+    ],
+)
+async def test_skipped_async_ha_write_state(
+    hass: HomeAssistant,
+    mqtt_mock_entry: MqttMockHAClientGenerator,
+    topic: str,
+    payload1: str,
+    payload2: str,
+) -> None:
+    """Test a write state command is only called when there is change."""
+    await mqtt_mock_entry()
+    await help_test_skipped_async_ha_write_state(hass, topic, payload1, payload2)
-- 
GitLab