Skip to content
Snippets Groups Projects
Commit 066b569c authored by t30's avatar t30
Browse files

Added command_rollershutter component

parent 7a0c99a1
No related branches found
No related tags found
No related merge requests found
"""
homeassistant.components.rollershutter.command_rollershutter
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Allows to configure a command rollershutter.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/rollershutter.command_rollershutter/
"""
import logging
import subprocess
from homeassistant.components.rollershutter import RollershutterDevice
from homeassistant.const import CONF_VALUE_TEMPLATE
from homeassistant.util import template
_LOGGER = logging.getLogger(__name__)
def setup_platform(hass, config, add_devices_callback, discovery_info=None):
""" Find and return rollershutter controlled by shell commands. """
rollershutters = config.get('rollershutters', {})
devices = []
for dev_name, properties in rollershutters.items():
devices.append(
CommandRollershutter(
hass,
properties.get('name', dev_name),
properties.get('upcmd', 'true'),
properties.get('downcmd', 'true'),
properties.get('stopcmd', 'true'),
properties.get('statecmd', False),
properties.get(CONF_VALUE_TEMPLATE, '{{ value }}')))
add_devices_callback(devices)
# pylint: disable=too-many-arguments, too-many-instance-attributes
class CommandRollershutter(RollershutterDevice):
""" Represents a rollershutter - can be controlled using shell cmd. """
# pylint: disable=too-many-arguments
def __init__(self, hass, name, command_up, command_down, command_stop,
command_state, value_template):
self._hass = hass
self._name = name
self._state = None # Unknown
self._command_up = command_up
self._command_down = command_down
self._command_stop = command_stop
self._command_state = command_state
self._value_template = value_template
@staticmethod
def _move_rollershutter(command):
""" Execute the actual commands. """
_LOGGER.info('Running command: %s', command)
success = (subprocess.call(command, shell=True) == 0)
if not success:
_LOGGER.error('Command failed: %s', command)
return success
@staticmethod
def _query_state_value(command):
""" Execute state command for return value. """
_LOGGER.info('Running state command: %s', command)
try:
return_value = subprocess.check_output(command, shell=True)
return return_value.strip().decode('utf-8')
except subprocess.CalledProcessError:
_LOGGER.error('Command failed: %s', command)
@property
def should_poll(self):
""" Only poll if we have statecmd. """
return self._command_state is not None
@property
def name(self):
""" The name of the rollershutter. """
return self._name
@property
def current_position(self):
"""
Return current position of rollershutter.
None is unknown, 0 is closed, 100 is fully open.
"""
return self._state
def _query_state(self):
""" Query for state. """
if not self._command_state:
_LOGGER.error('No state command specified')
return
return self._query_state_value(self._command_state)
def update(self):
""" Update device state. """
if self._command_state:
payload = str(self._query_state())
if self._value_template:
payload = template.render_with_possible_json_value(
self._hass, self._value_template, payload)
self._state = int(payload)
def move_up(self, **kwargs):
""" Move the rollershutter up. """
self._move_rollershutter(self._command_up)
def move_down(self, **kwargs):
""" Move the rollershutter down. """
self._move_rollershutter(self._command_down)
def stop(self, **kwargs):
""" Stop the device. """
self._move_rollershutter(self._command_stop)
"""
tests.components.rollershutter.command_rollershutter
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests the command_rollershutter component
"""
import os
import tempfile
import unittest
from unittest import mock
import homeassistant.core as ha
import homeassistant.components.rollershutter as rollershutter
from homeassistant.components.rollershutter import (
command_rollershutter as cmd_rs)
class TestCommandRollerShutter(unittest.TestCase):
def setup_method(self, method):
self.hass = ha.HomeAssistant()
self.hass.config.latitude = 32.87336
self.hass.config.longitude = 117.22743
self.rs = cmd_rs.CommandRollershutter(self.hass, 'foo',
'cmd_up', 'cmd_dn',
'cmd_stop', 'cmd_state',
None) # FIXME
def teardown_method(self, method):
""" Stop down stuff we started. """
self.hass.stop()
def test_should_poll(self):
self.assertTrue(self.rs.should_poll)
self.rs._command_state = None
self.assertFalse(self.rs.should_poll)
def test_query_state_value(self):
with mock.patch('subprocess.check_output') as mock_run:
mock_run.return_value = b' foo bar '
result = self.rs._query_state_value('runme')
self.assertEqual('foo bar', result)
mock_run.assert_called_once_with('runme', shell=True)
def test_state_value(self):
with tempfile.TemporaryDirectory() as tempdirname:
path = os.path.join(tempdirname, 'rollershutter_status')
test_rollershutter = {
'statecmd': 'cat {}'.format(path),
'upcmd': 'echo 1 > {}'.format(path),
'downcmd': 'echo 1 > {}'.format(path),
'stopcmd': 'echo 0 > {}'.format(path),
'value_template': '{{ value }}'
}
self.assertTrue(rollershutter.setup(self.hass, {
'rollershutter': {
'platform': 'command_rollershutter',
'rollershutters': {
'test': test_rollershutter
}
}
}))
state = self.hass.states.get('rollershutter.test')
self.assertEqual('unknown', state.state)
rollershutter.move_up(self.hass, 'rollershutter.test')
self.hass.pool.block_till_done()
state = self.hass.states.get('rollershutter.test')
self.assertEqual('open', state.state)
rollershutter.move_down(self.hass, 'rollershutter.test')
self.hass.pool.block_till_done()
state = self.hass.states.get('rollershutter.test')
self.assertEqual('open', state.state)
rollershutter.stop(self.hass, 'rollershutter.test')
self.hass.pool.block_till_done()
state = self.hass.states.get('rollershutter.test')
self.assertEqual('closed', state.state)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment