Skip to content
Snippets Groups Projects
Commit d14d2fe5 authored by Matthew Treinish's avatar Matthew Treinish Committed by Paulus Schoutsen
Browse files

Add IBM Watson IoT Platform component (#13664)

This commit adds a new history component for the IBM Watson IoT
Platform. The IBM Watson IoT Platform allows for tracking of devices
and analytics on top of the device data. This new component allows
users to have home assistant automatically populate a watson iot
platform board with device data from devices managed by home assistant.
parent f6963315
No related branches found
No related tags found
No related merge requests found
......@@ -749,6 +749,7 @@ omit =
homeassistant/components/tts/picotts.py
homeassistant/components/vacuum/mqtt.py
homeassistant/components/vacuum/roomba.py
homeassistant/components/watson_iot.py
homeassistant/components/weather/bom.py
homeassistant/components/weather/buienradar.py
homeassistant/components/weather/darksky.py
......
"""
A component which allows you to send data to the IBM Watson IoT Platform.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/watson_iot/
"""
import logging
import queue
import threading
import time
import voluptuous as vol
from homeassistant.const import (
CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_INCLUDE,
CONF_TOKEN, CONF_TYPE, EVENT_STATE_CHANGED, EVENT_HOMEASSISTANT_STOP,
STATE_UNAVAILABLE, STATE_UNKNOWN)
from homeassistant.helpers import state as state_helper
import homeassistant.helpers.config_validation as cv
REQUIREMENTS = ['ibmiotf==0.3.4']
_LOGGER = logging.getLogger(__name__)
CONF_ORG = 'organization'
CONF_ID = 'id'
DOMAIN = 'watson_iot'
RETRY_DELAY = 20
MAX_TRIES = 3
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.All(vol.Schema({
vol.Required(CONF_ORG): cv.string,
vol.Required(CONF_TYPE): cv.string,
vol.Required(CONF_ID): cv.string,
vol.Required(CONF_TOKEN): cv.string,
vol.Optional(CONF_EXCLUDE, default={}): vol.Schema({
vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
vol.Optional(CONF_DOMAINS, default=[]):
vol.All(cv.ensure_list, [cv.string])
}),
vol.Optional(CONF_INCLUDE, default={}): vol.Schema({
vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
vol.Optional(CONF_DOMAINS, default=[]):
vol.All(cv.ensure_list, [cv.string])
}),
})),
}, extra=vol.ALLOW_EXTRA)
def setup(hass, config):
"""Set up the Watson IoT Platform component."""
from ibmiotf import gateway
conf = config[DOMAIN]
include = conf[CONF_INCLUDE]
exclude = conf[CONF_EXCLUDE]
whitelist_e = set(include[CONF_ENTITIES])
whitelist_d = set(include[CONF_DOMAINS])
blacklist_e = set(exclude[CONF_ENTITIES])
blacklist_d = set(exclude[CONF_DOMAINS])
client_args = {
'org': conf[CONF_ORG],
'type': conf[CONF_TYPE],
'id': conf[CONF_ID],
'auth-method': 'token',
'auth-token': conf[CONF_TOKEN],
}
watson_gateway = gateway.Client(client_args)
def event_to_json(event):
"""Add an event to the outgoing list."""
state = event.data.get('new_state')
if state is None or state.state in (
STATE_UNKNOWN, '', STATE_UNAVAILABLE) or \
state.entity_id in blacklist_e or state.domain in blacklist_d:
return
if (whitelist_e and state.entity_id not in whitelist_e) or \
(whitelist_d and state.domain not in whitelist_d):
return
try:
_state_as_value = float(state.state)
except ValueError:
_state_as_value = None
if _state_as_value is None:
try:
_state_as_value = float(state_helper.state_as_number(state))
except ValueError:
_state_as_value = None
out_event = {
'tags': {
'domain': state.domain,
'entity_id': state.object_id,
},
'time': event.time_fired.isoformat(),
'fields': {
'state': state.state
}
}
if _state_as_value is not None:
out_event['fields']['state_value'] = _state_as_value
for key, value in state.attributes.items():
if key != 'unit_of_measurement':
# If the key is already in fields
if key in out_event['fields']:
key = key + "_"
# For each value we try to cast it as float
# But if we can not do it we store the value
# as string
try:
out_event['fields'][key] = float(value)
except (ValueError, TypeError):
out_event['fields'][key] = str(value)
return out_event
instance = hass.data[DOMAIN] = WatsonIOTThread(
hass, watson_gateway, event_to_json)
instance.start()
def shutdown(event):
"""Shut down the thread."""
instance.queue.put(None)
instance.join()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
return True
class WatsonIOTThread(threading.Thread):
"""A threaded event handler class."""
def __init__(self, hass, gateway, event_to_json):
"""Initialize the listener."""
threading.Thread.__init__(self, name='WatsonIOT')
self.queue = queue.Queue()
self.gateway = gateway
self.gateway.connect()
self.event_to_json = event_to_json
self.write_errors = 0
self.shutdown = False
hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener)
def _event_listener(self, event):
"""Listen for new messages on the bus and queue them for Watson IOT."""
item = (time.monotonic(), event)
self.queue.put(item)
def get_events_json(self):
"""Return an event formatted for writing."""
events = []
try:
item = self.queue.get()
if item is None:
self.shutdown = True
else:
event_json = self.event_to_json(item[1])
if event_json:
events.append(event_json)
except queue.Empty:
pass
return events
def write_to_watson(self, events):
"""Write preprocessed events to watson."""
import ibmiotf
for event in events:
for retry in range(MAX_TRIES + 1):
try:
for field in event['fields']:
value = event['fields'][field]
device_success = self.gateway.publishDeviceEvent(
event['tags']['domain'],
event['tags']['entity_id'],
field, 'json', value)
if not device_success:
_LOGGER.error(
"Failed to publish message to watson iot")
continue
break
except (ibmiotf.MissingMessageEncoderException, IOError):
if retry < MAX_TRIES:
time.sleep(RETRY_DELAY)
else:
_LOGGER.exception(
"Failed to publish message to watson iot")
def run(self):
"""Process incoming events."""
while not self.shutdown:
event = self.get_events_json()
if event:
self.write_to_watson(event)
self.queue.task_done()
def block_till_done(self):
"""Block till all events processed."""
self.queue.join()
......@@ -438,6 +438,9 @@ hydrawiser==0.1.1
# homeassistant.components.sensor.htu21d
# i2csense==0.0.4
# homeassistant.components.watson_iot
ibmiotf==0.3.4
# homeassistant.components.light.iglo
iglo==1.2.7
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment