diff --git a/.coveragerc b/.coveragerc index c51f6bc37cbca0b077b8b416fecb776b57c5b4c0..e38ceeba5cefb327bd50f0835d8413b8470d5b85 100644 --- a/.coveragerc +++ b/.coveragerc @@ -749,6 +749,7 @@ omit = homeassistant/components/tts/picotts.py homeassistant/components/vacuum/mqtt.py homeassistant/components/vacuum/roomba.py + homeassistant/components/watson_iot.py homeassistant/components/weather/bom.py homeassistant/components/weather/buienradar.py homeassistant/components/weather/darksky.py diff --git a/homeassistant/components/watson_iot.py b/homeassistant/components/watson_iot.py new file mode 100644 index 0000000000000000000000000000000000000000..246cf3a96c28ea5b25a54a4f7c8133eb09229e05 --- /dev/null +++ b/homeassistant/components/watson_iot.py @@ -0,0 +1,214 @@ +""" +A component which allows you to send data to the IBM Watson IoT Platform. + +For more details about this component, please refer to the documentation at +https://home-assistant.io/components/watson_iot/ +""" + +import logging +import queue +import threading +import time + +import voluptuous as vol + +from homeassistant.const import ( + CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_INCLUDE, + CONF_TOKEN, CONF_TYPE, EVENT_STATE_CHANGED, EVENT_HOMEASSISTANT_STOP, + STATE_UNAVAILABLE, STATE_UNKNOWN) +from homeassistant.helpers import state as state_helper +import homeassistant.helpers.config_validation as cv + +REQUIREMENTS = ['ibmiotf==0.3.4'] + +_LOGGER = logging.getLogger(__name__) + +CONF_ORG = 'organization' +CONF_ID = 'id' + +DOMAIN = 'watson_iot' + +RETRY_DELAY = 20 +MAX_TRIES = 3 + +CONFIG_SCHEMA = vol.Schema({ + DOMAIN: vol.All(vol.Schema({ + vol.Required(CONF_ORG): cv.string, + vol.Required(CONF_TYPE): cv.string, + vol.Required(CONF_ID): cv.string, + vol.Required(CONF_TOKEN): cv.string, + vol.Optional(CONF_EXCLUDE, default={}): vol.Schema({ + vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids, + vol.Optional(CONF_DOMAINS, default=[]): + vol.All(cv.ensure_list, [cv.string]) + }), + vol.Optional(CONF_INCLUDE, default={}): vol.Schema({ + vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids, + vol.Optional(CONF_DOMAINS, default=[]): + vol.All(cv.ensure_list, [cv.string]) + }), + })), +}, extra=vol.ALLOW_EXTRA) + + +def setup(hass, config): + """Set up the Watson IoT Platform component.""" + from ibmiotf import gateway + + conf = config[DOMAIN] + + include = conf[CONF_INCLUDE] + exclude = conf[CONF_EXCLUDE] + whitelist_e = set(include[CONF_ENTITIES]) + whitelist_d = set(include[CONF_DOMAINS]) + blacklist_e = set(exclude[CONF_ENTITIES]) + blacklist_d = set(exclude[CONF_DOMAINS]) + + client_args = { + 'org': conf[CONF_ORG], + 'type': conf[CONF_TYPE], + 'id': conf[CONF_ID], + 'auth-method': 'token', + 'auth-token': conf[CONF_TOKEN], + } + watson_gateway = gateway.Client(client_args) + + def event_to_json(event): + """Add an event to the outgoing list.""" + state = event.data.get('new_state') + if state is None or state.state in ( + STATE_UNKNOWN, '', STATE_UNAVAILABLE) or \ + state.entity_id in blacklist_e or state.domain in blacklist_d: + return + + if (whitelist_e and state.entity_id not in whitelist_e) or \ + (whitelist_d and state.domain not in whitelist_d): + return + + try: + _state_as_value = float(state.state) + except ValueError: + _state_as_value = None + + if _state_as_value is None: + try: + _state_as_value = float(state_helper.state_as_number(state)) + except ValueError: + _state_as_value = None + + out_event = { + 'tags': { + 'domain': state.domain, + 'entity_id': state.object_id, + }, + 'time': event.time_fired.isoformat(), + 'fields': { + 'state': state.state + } + } + if _state_as_value is not None: + out_event['fields']['state_value'] = _state_as_value + + for key, value in state.attributes.items(): + if key != 'unit_of_measurement': + # If the key is already in fields + if key in out_event['fields']: + key = key + "_" + # For each value we try to cast it as float + # But if we can not do it we store the value + # as string + try: + out_event['fields'][key] = float(value) + except (ValueError, TypeError): + out_event['fields'][key] = str(value) + + return out_event + + instance = hass.data[DOMAIN] = WatsonIOTThread( + hass, watson_gateway, event_to_json) + instance.start() + + def shutdown(event): + """Shut down the thread.""" + instance.queue.put(None) + instance.join() + + hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) + + return True + + +class WatsonIOTThread(threading.Thread): + """A threaded event handler class.""" + + def __init__(self, hass, gateway, event_to_json): + """Initialize the listener.""" + threading.Thread.__init__(self, name='WatsonIOT') + self.queue = queue.Queue() + self.gateway = gateway + self.gateway.connect() + self.event_to_json = event_to_json + self.write_errors = 0 + self.shutdown = False + hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener) + + def _event_listener(self, event): + """Listen for new messages on the bus and queue them for Watson IOT.""" + item = (time.monotonic(), event) + self.queue.put(item) + + def get_events_json(self): + """Return an event formatted for writing.""" + events = [] + + try: + item = self.queue.get() + + if item is None: + self.shutdown = True + else: + event_json = self.event_to_json(item[1]) + if event_json: + events.append(event_json) + + except queue.Empty: + pass + + return events + + def write_to_watson(self, events): + """Write preprocessed events to watson.""" + import ibmiotf + + for event in events: + for retry in range(MAX_TRIES + 1): + try: + for field in event['fields']: + value = event['fields'][field] + device_success = self.gateway.publishDeviceEvent( + event['tags']['domain'], + event['tags']['entity_id'], + field, 'json', value) + if not device_success: + _LOGGER.error( + "Failed to publish message to watson iot") + continue + break + except (ibmiotf.MissingMessageEncoderException, IOError): + if retry < MAX_TRIES: + time.sleep(RETRY_DELAY) + else: + _LOGGER.exception( + "Failed to publish message to watson iot") + + def run(self): + """Process incoming events.""" + while not self.shutdown: + event = self.get_events_json() + if event: + self.write_to_watson(event) + self.queue.task_done() + + def block_till_done(self): + """Block till all events processed.""" + self.queue.join() diff --git a/requirements_all.txt b/requirements_all.txt index ce34f2662eb70e502cedbcb45f011f94fb8e9e63..76492dee899b63a1920de23e821db1bc9b2dc8f6 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -438,6 +438,9 @@ hydrawiser==0.1.1 # homeassistant.components.sensor.htu21d # i2csense==0.0.4 +# homeassistant.components.watson_iot +ibmiotf==0.3.4 + # homeassistant.components.light.iglo iglo==1.2.7