From a458ce8069e0b660a6df7dcc20fd505cce595a88 Mon Sep 17 00:00:00 2001
From: Paulus Schoutsen <paulus@paulusschoutsen.nl>
Date: Mon, 5 Dec 2016 18:03:06 -0800
Subject: [PATCH] Fix websocket async (#4752)

* Ensure we write to websocket from inside event loop

* Inline service call helper
---
 homeassistant/components/websocket_api.py | 70 ++++++++++++-----------
 1 file changed, 36 insertions(+), 34 deletions(-)

diff --git a/homeassistant/components/websocket_api.py b/homeassistant/components/websocket_api.py
index 09f8699f5d1..70b35e00247 100644
--- a/homeassistant/components/websocket_api.py
+++ b/homeassistant/components/websocket_api.py
@@ -204,7 +204,6 @@ class ActiveConnection:
         self.hass = hass
         self.request = request
         self.wsock = None
-        self.socket_task = None
         self.event_listeners = {}
 
     def debug(self, message1, message2=''):
@@ -220,34 +219,6 @@ class ActiveConnection:
         self.debug('Sending', message)
         self.wsock.send_json(message, dumps=JSON_DUMP)
 
-    @callback
-    def _cancel_connection(self, event):
-        """Cancel this connection."""
-        self.socket_task.cancel()
-
-    @asyncio.coroutine
-    def _call_service_helper(self, msg):
-        """Helper to call a service and fire complete message."""
-        yield from self.hass.services.async_call(msg['domain'], msg['service'],
-                                                 msg['service_data'], True)
-        try:
-            self.send_message(result_message(msg['id']))
-        except RuntimeError:
-            # Socket has been closed.
-            pass
-
-    @callback
-    def _forward_event(self, iden, event):
-        """Helper to forward events to websocket."""
-        if event.event_type == EVENT_TIME_CHANGED:
-            return
-
-        try:
-            self.send_message(event_message(iden, event))
-        except RuntimeError:
-            # Socket has been closed.
-            pass
-
     @asyncio.coroutine
     def handle(self):
         """Handle the websocket connection."""
@@ -255,9 +226,15 @@ class ActiveConnection:
         yield from wsock.prepare(self.request)
 
         # Set up to cancel this connection when Home Assistant shuts down
-        self.socket_task = asyncio.Task.current_task(loop=self.hass.loop)
-        self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP,
-                                   self._cancel_connection)
+        socket_task = asyncio.Task.current_task(loop=self.hass.loop)
+
+        @callback
+        def cancel_connection(event):
+            """Cancel this connection."""
+            socket_task.cancel()
+
+        unsub_stop = self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP,
+                                                cancel_connection)
 
         self.debug('Connected')
 
@@ -351,6 +328,8 @@ class ActiveConnection:
             _LOGGER.exception(error)
 
         finally:
+            unsub_stop()
+
             for unsub in self.event_listeners.values():
                 unsub()
 
@@ -363,8 +342,20 @@ class ActiveConnection:
         """Handle subscribe events command."""
         msg = SUBSCRIBE_EVENTS_MESSAGE_SCHEMA(msg)
 
+        @callback
+        def forward_events(event):
+            """Helper to forward events to websocket."""
+            if event.event_type == EVENT_TIME_CHANGED:
+                return
+
+            try:
+                self.send_message(event_message(msg['id'], event))
+            except RuntimeError:
+                # Socket has been closed.
+                pass
+
         self.event_listeners[msg['id']] = self.hass.bus.async_listen(
-            msg['event_type'], partial(self._forward_event, msg['id']))
+            msg['event_type'], forward_events)
 
         self.send_message(result_message(msg['id']))
 
@@ -386,7 +377,18 @@ class ActiveConnection:
         """Handle call service command."""
         msg = CALL_SERVICE_MESSAGE_SCHEMA(msg)
 
-        self.hass.async_add_job(self._call_service_helper(msg))
+        @asyncio.coroutine
+        def call_service_helper(msg):
+            """Helper to call a service and fire complete message."""
+            yield from self.hass.services.async_call(
+                msg['domain'], msg['service'], msg['service_data'], True)
+            try:
+                self.send_message(result_message(msg['id']))
+            except RuntimeError:
+                # Socket has been closed.
+                pass
+
+        self.hass.async_add_job(call_service_helper(msg))
 
     def handle_get_states(self, msg):
         """Handle get states command."""
-- 
GitLab