diff --git a/homeassistant/components/sensor/filter.py b/homeassistant/components/sensor/filter.py index 9c05028b3944ff17f02ef15c696467214371fe33..261f6e2b510a3f1a13335bfe68ea8a84e7019ef9 100644 --- a/homeassistant/components/sensor/filter.py +++ b/homeassistant/components/sensor/filter.py @@ -28,6 +28,7 @@ import homeassistant.util.dt as dt_util _LOGGER = logging.getLogger(__name__) +FILTER_NAME_RANGE = 'range' FILTER_NAME_LOWPASS = 'lowpass' FILTER_NAME_OUTLIER = 'outlier' FILTER_NAME_THROTTLE = 'throttle' @@ -40,6 +41,8 @@ CONF_FILTER_WINDOW_SIZE = 'window_size' CONF_FILTER_PRECISION = 'precision' CONF_FILTER_RADIUS = 'radius' CONF_FILTER_TIME_CONSTANT = 'time_constant' +CONF_FILTER_LOWER_BOUND = 'lower_bound' +CONF_FILTER_UPPER_BOUND = 'upper_bound' CONF_TIME_SMA_TYPE = 'type' TIME_SMA_LAST = 'last' @@ -77,6 +80,12 @@ FILTER_LOWPASS_SCHEMA = FILTER_SCHEMA.extend({ default=DEFAULT_FILTER_TIME_CONSTANT): vol.Coerce(int), }) +FILTER_RANGE_SCHEMA = FILTER_SCHEMA.extend({ + vol.Required(CONF_FILTER_NAME): FILTER_NAME_RANGE, + vol.Optional(CONF_FILTER_LOWER_BOUND): vol.Coerce(float), + vol.Optional(CONF_FILTER_UPPER_BOUND): vol.Coerce(float), +}) + FILTER_TIME_SMA_SCHEMA = FILTER_SCHEMA.extend({ vol.Required(CONF_FILTER_NAME): FILTER_NAME_TIME_SMA, vol.Optional(CONF_TIME_SMA_TYPE, @@ -100,7 +109,8 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ [vol.Any(FILTER_OUTLIER_SCHEMA, FILTER_LOWPASS_SCHEMA, FILTER_TIME_SMA_SCHEMA, - FILTER_THROTTLE_SCHEMA)]) + FILTER_THROTTLE_SCHEMA, + FILTER_RANGE_SCHEMA)]) }) @@ -325,6 +335,49 @@ class Filter(object): return new_state +@FILTERS.register(FILTER_NAME_RANGE) +class RangeFilter(Filter): + """Range filter. + + Determines if new state is in the range of upper_bound and lower_bound. + If not inside, lower or upper bound is returned instead. + + Args: + upper_bound (float): band upper bound + lower_bound (float): band lower bound + """ + + def __init__(self, entity, + lower_bound, upper_bound): + """Initialize Filter.""" + super().__init__(FILTER_NAME_RANGE, entity=entity) + self._lower_bound = lower_bound + self._upper_bound = upper_bound + self._stats_internal = Counter() + + def _filter_state(self, new_state): + """Implement the range filter.""" + if self._upper_bound and new_state.state > self._upper_bound: + + self._stats_internal['erasures_up'] += 1 + + _LOGGER.debug("Upper outlier nr. %s in %s: %s", + self._stats_internal['erasures_up'], + self._entity, new_state) + new_state.state = self._upper_bound + + elif self._lower_bound and new_state.state < self._lower_bound: + + self._stats_internal['erasures_low'] += 1 + + _LOGGER.debug("Lower outlier nr. %s in %s: %s", + self._stats_internal['erasures_low'], + self._entity, new_state) + new_state.state = self._lower_bound + + return new_state + + @FILTERS.register(FILTER_NAME_OUTLIER) class OutlierFilter(Filter): """BASIC outlier filter. diff --git a/tests/components/sensor/test_filter.py b/tests/components/sensor/test_filter.py index 8e79306fe136a28140bdafcfb7f52b89b3360c52..cf2cc9c42054dbda6ca72d5e7a9588d5b410c642 100644 --- a/tests/components/sensor/test_filter.py +++ b/tests/components/sensor/test_filter.py @@ -4,7 +4,8 @@ import unittest from unittest.mock import patch from homeassistant.components.sensor.filter import ( - LowPassFilter, OutlierFilter, ThrottleFilter, TimeSMAFilter) + LowPassFilter, OutlierFilter, ThrottleFilter, TimeSMAFilter, + RangeFilter) import homeassistant.util.dt as dt_util from homeassistant.setup import setup_component import homeassistant.core as ha @@ -131,6 +132,23 @@ class TestFilterSensor(unittest.TestCase): filtered = filt.filter_state(state) self.assertEqual(18.05, filtered.state) + def test_range(self): + """Test if range filter works.""" + lower = 10 + upper = 20 + filt = RangeFilter(entity=None, + lower_bound=lower, + upper_bound=upper) + for unf_state in self.values: + unf = float(unf_state.state) + filtered = filt.filter_state(unf_state) + if unf < lower: + self.assertEqual(lower, filtered.state) + elif unf > upper: + self.assertEqual(upper, filtered.state) + else: + self.assertEqual(unf, filtered.state) + def test_throttle(self): """Test if lowpass filter works.""" filt = ThrottleFilter(window_size=3,