Skip to content
Snippets Groups Projects
Commit b60f5714 authored by Paulus Schoutsen's avatar Paulus Schoutsen Committed by GitHub
Browse files

Fix websocket async (#4752)

* Ensure we write to websocket from inside event loop

* Inline service call helper
parent fa8bc0a3
No related branches found
No related tags found
No related merge requests found
......@@ -204,7 +204,6 @@ class ActiveConnection:
self.hass = hass
self.request = request
self.wsock = None
self.socket_task = None
self.event_listeners = {}
def debug(self, message1, message2=''):
......@@ -220,34 +219,6 @@ class ActiveConnection:
self.debug('Sending', message)
self.wsock.send_json(message, dumps=JSON_DUMP)
@callback
def _cancel_connection(self, event):
"""Cancel this connection."""
self.socket_task.cancel()
@asyncio.coroutine
def _call_service_helper(self, msg):
"""Helper to call a service and fire complete message."""
yield from self.hass.services.async_call(msg['domain'], msg['service'],
msg['service_data'], True)
try:
self.send_message(result_message(msg['id']))
except RuntimeError:
# Socket has been closed.
pass
@callback
def _forward_event(self, iden, event):
"""Helper to forward events to websocket."""
if event.event_type == EVENT_TIME_CHANGED:
return
try:
self.send_message(event_message(iden, event))
except RuntimeError:
# Socket has been closed.
pass
@asyncio.coroutine
def handle(self):
"""Handle the websocket connection."""
......@@ -255,9 +226,15 @@ class ActiveConnection:
yield from wsock.prepare(self.request)
# Set up to cancel this connection when Home Assistant shuts down
self.socket_task = asyncio.Task.current_task(loop=self.hass.loop)
self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP,
self._cancel_connection)
socket_task = asyncio.Task.current_task(loop=self.hass.loop)
@callback
def cancel_connection(event):
"""Cancel this connection."""
socket_task.cancel()
unsub_stop = self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP,
cancel_connection)
self.debug('Connected')
......@@ -351,6 +328,8 @@ class ActiveConnection:
_LOGGER.exception(error)
finally:
unsub_stop()
for unsub in self.event_listeners.values():
unsub()
......@@ -363,8 +342,20 @@ class ActiveConnection:
"""Handle subscribe events command."""
msg = SUBSCRIBE_EVENTS_MESSAGE_SCHEMA(msg)
@callback
def forward_events(event):
"""Helper to forward events to websocket."""
if event.event_type == EVENT_TIME_CHANGED:
return
try:
self.send_message(event_message(msg['id'], event))
except RuntimeError:
# Socket has been closed.
pass
self.event_listeners[msg['id']] = self.hass.bus.async_listen(
msg['event_type'], partial(self._forward_event, msg['id']))
msg['event_type'], forward_events)
self.send_message(result_message(msg['id']))
......@@ -386,7 +377,18 @@ class ActiveConnection:
"""Handle call service command."""
msg = CALL_SERVICE_MESSAGE_SCHEMA(msg)
self.hass.async_add_job(self._call_service_helper(msg))
@asyncio.coroutine
def call_service_helper(msg):
"""Helper to call a service and fire complete message."""
yield from self.hass.services.async_call(
msg['domain'], msg['service'], msg['service_data'], True)
try:
self.send_message(result_message(msg['id']))
except RuntimeError:
# Socket has been closed.
pass
self.hass.async_add_job(call_service_helper(msg))
def handle_get_states(self, msg):
"""Handle get states command."""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment