diff --git a/homeassistant/components/plex/const.py b/homeassistant/components/plex/const.py
index 44bb25b3fd98442555f743d6b44000c222ba1815..126c6eb313a45491c226a95bac2391800d82cd8b 100644
--- a/homeassistant/components/plex/const.py
+++ b/homeassistant/components/plex/const.py
@@ -9,6 +9,7 @@ DEFAULT_PORT = 32400
 DEFAULT_SSL = False
 DEFAULT_VERIFY_SSL = True
 
+DEBOUNCE_TIMEOUT = 1
 DISPATCHERS = "dispatchers"
 PLATFORMS = frozenset(["media_player", "sensor"])
 PLATFORMS_COMPLETED = "platforms_completed"
diff --git a/homeassistant/components/plex/server.py b/homeassistant/components/plex/server.py
index 196968cc097424d61cdc97ebeccc192e895585d7..f2a4908e11969969aa27107eab06dbbd91efac64 100644
--- a/homeassistant/components/plex/server.py
+++ b/homeassistant/components/plex/server.py
@@ -1,4 +1,5 @@
 """Shared class to maintain Plex server instances."""
+from functools import partial, wraps
 import logging
 import ssl
 from urllib.parse import urlparse
@@ -12,6 +13,7 @@ import requests.exceptions
 from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
 from homeassistant.const import CONF_TOKEN, CONF_URL, CONF_VERIFY_SSL
 from homeassistant.helpers.dispatcher import dispatcher_send
+from homeassistant.helpers.event import async_call_later
 
 from .const import (
     CONF_CLIENT_IDENTIFIER,
@@ -19,6 +21,7 @@ from .const import (
     CONF_MONITORED_USERS,
     CONF_SERVER,
     CONF_USE_EPISODE_ART,
+    DEBOUNCE_TIMEOUT,
     DEFAULT_VERIFY_SSL,
     PLEX_NEW_MP_SIGNAL,
     PLEX_UPDATE_MEDIA_PLAYER_SIGNAL,
@@ -39,12 +42,37 @@ plexapi.X_PLEX_PRODUCT = X_PLEX_PRODUCT
 plexapi.X_PLEX_VERSION = X_PLEX_VERSION
 
 
+def debounce(func):
+    """Decorate function to debounce callbacks from Plex websocket."""
+
+    unsub = None
+
+    async def call_later_listener(self, _):
+        """Handle call_later callback."""
+        nonlocal unsub
+        unsub = None
+        await self.hass.async_add_executor_job(func, self)
+
+    @wraps(func)
+    def wrapper(self):
+        """Schedule async callback."""
+        nonlocal unsub
+        if unsub:
+            _LOGGER.debug("Throttling update of %s", self.friendly_name)
+            unsub()  # pylint: disable=not-callable
+        unsub = async_call_later(
+            self.hass, DEBOUNCE_TIMEOUT, partial(call_later_listener, self),
+        )
+
+    return wrapper
+
+
 class PlexServer:
     """Manages a single Plex server connection."""
 
     def __init__(self, hass, server_config, known_server_id=None, options=None):
         """Initialize a Plex server instance."""
-        self._hass = hass
+        self.hass = hass
         self._plex_server = None
         self._known_clients = set()
         self._known_idle = set()
@@ -150,12 +178,13 @@ class PlexServer:
         unique_id = f"{self.machine_identifier}:{machine_identifier}"
         _LOGGER.debug("Refreshing %s", unique_id)
         dispatcher_send(
-            self._hass,
+            self.hass,
             PLEX_UPDATE_MEDIA_PLAYER_SIGNAL.format(unique_id),
             device,
             session,
         )
 
+    @debounce
     def update_platforms(self):
         """Update the platform entities."""
         _LOGGER.debug("Updating devices")
@@ -239,13 +268,13 @@ class PlexServer:
 
         if new_entity_configs:
             dispatcher_send(
-                self._hass,
+                self.hass,
                 PLEX_NEW_MP_SIGNAL.format(self.machine_identifier),
                 new_entity_configs,
             )
 
         dispatcher_send(
-            self._hass,
+            self.hass,
             PLEX_UPDATE_SENSOR_SIGNAL.format(self.machine_identifier),
             sessions,
         )
diff --git a/tests/components/plex/common.py b/tests/components/plex/common.py
new file mode 100644
index 0000000000000000000000000000000000000000..adc6f4e0299d492ada9f85d41d77cd4b73809189
--- /dev/null
+++ b/tests/components/plex/common.py
@@ -0,0 +1,20 @@
+"""Common fixtures and functions for Plex tests."""
+from datetime import timedelta
+
+from homeassistant.components.plex.const import (
+    DEBOUNCE_TIMEOUT,
+    PLEX_UPDATE_PLATFORMS_SIGNAL,
+)
+from homeassistant.helpers.dispatcher import async_dispatcher_send
+import homeassistant.util.dt as dt_util
+
+from tests.common import async_fire_time_changed
+
+
+async def trigger_plex_update(hass, server_id):
+    """Update Plex by sending signal and jumping ahead by debounce timeout."""
+    async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
+    await hass.async_block_till_done()
+    next_update = dt_util.utcnow() + timedelta(seconds=DEBOUNCE_TIMEOUT)
+    async_fire_time_changed(hass, next_update)
+    await hass.async_block_till_done()
diff --git a/tests/components/plex/test_config_flow.py b/tests/components/plex/test_config_flow.py
index d839ccc674b83932046801cc7c43a689b2bdf576..bd5d45c0246b7104b8da6a5b2f0aa7ecab3c3adc 100644
--- a/tests/components/plex/test_config_flow.py
+++ b/tests/components/plex/test_config_flow.py
@@ -15,14 +15,13 @@ from homeassistant.components.plex.const import (
     CONF_USE_EPISODE_ART,
     DOMAIN,
     PLEX_SERVER_CONFIG,
-    PLEX_UPDATE_PLATFORMS_SIGNAL,
     SERVERS,
 )
 from homeassistant.config_entries import ENTRY_STATE_LOADED
 from homeassistant.const import CONF_HOST, CONF_PORT, CONF_TOKEN, CONF_URL
-from homeassistant.helpers.dispatcher import async_dispatcher_send
 from homeassistant.setup import async_setup_component
 
+from .common import trigger_plex_update
 from .const import DEFAULT_DATA, DEFAULT_OPTIONS, MOCK_SERVERS, MOCK_TOKEN
 from .mock_classes import MockPlexAccount, MockPlexServer
 
@@ -416,8 +415,7 @@ async def test_option_flow_new_users_available(hass, caplog):
 
     server_id = mock_plex_server.machineIdentifier
 
-    async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
-    await hass.async_block_till_done()
+    await trigger_plex_update(hass, server_id)
 
     monitored_users = hass.data[DOMAIN][SERVERS][server_id].option_monitored_users
 
diff --git a/tests/components/plex/test_init.py b/tests/components/plex/test_init.py
index 387ce6cac03e0e5dbe1a72c5e877a17457d9274b..1aef7878df5e8ea9414f0383de0aa5bb09e021b5 100644
--- a/tests/components/plex/test_init.py
+++ b/tests/components/plex/test_init.py
@@ -23,10 +23,10 @@ from homeassistant.const import (
     CONF_URL,
     CONF_VERIFY_SSL,
 )
-from homeassistant.helpers.dispatcher import async_dispatcher_send
 from homeassistant.setup import async_setup_component
 import homeassistant.util.dt as dt_util
 
+from .common import trigger_plex_update
 from .const import DEFAULT_DATA, DEFAULT_OPTIONS, MOCK_SERVERS, MOCK_TOKEN
 from .mock_classes import MockPlexAccount, MockPlexServer
 
@@ -74,7 +74,7 @@ async def test_setup_with_config(hass):
     )
 
 
-async def test_setup_with_config_entry(hass):
+async def test_setup_with_config_entry(hass, caplog):
     """Test setup component with config."""
 
     mock_plex_server = MockPlexServer()
@@ -109,30 +109,31 @@ async def test_setup_with_config_entry(hass):
         hass.data[const.DOMAIN][const.PLATFORMS_COMPLETED][server_id] == const.PLATFORMS
     )
 
-    async_dispatcher_send(hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
-    await hass.async_block_till_done()
+    await trigger_plex_update(hass, server_id)
 
     sensor = hass.states.get("sensor.plex_plex_server_1")
     assert sensor.state == str(len(mock_plex_server.accounts))
 
-    async_dispatcher_send(hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
-    await hass.async_block_till_done()
+    await trigger_plex_update(hass, server_id)
 
     with patch.object(
         mock_plex_server, "clients", side_effect=plexapi.exceptions.BadRequest
-    ):
-        async_dispatcher_send(
-            hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id)
-        )
-        await hass.async_block_till_done()
+    ) as patched_clients_bad_request:
+        await trigger_plex_update(hass, server_id)
+
+    assert patched_clients_bad_request.called
+    assert "Error requesting Plex client data from server" in caplog.text
 
     with patch.object(
         mock_plex_server, "clients", side_effect=requests.exceptions.RequestException
-    ):
-        async_dispatcher_send(
-            hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id)
-        )
-        await hass.async_block_till_done()
+    ) as patched_clients_requests_exception:
+        await trigger_plex_update(hass, server_id)
+
+    assert patched_clients_requests_exception.called
+    assert (
+        f"Could not connect to Plex server: {mock_plex_server.friendlyName}"
+        in caplog.text
+    )
 
 
 async def test_set_config_entry_unique_id(hass):
@@ -294,8 +295,7 @@ async def test_setup_with_photo_session(hass):
 
     server_id = mock_plex_server.machineIdentifier
 
-    async_dispatcher_send(hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
-    await hass.async_block_till_done()
+    await trigger_plex_update(hass, server_id)
 
     media_player = hass.states.get("media_player.plex_product_title")
     assert media_player.state == "idle"
diff --git a/tests/components/plex/test_server.py b/tests/components/plex/test_server.py
index 646a6ded32e64b7150ae94ef29a9bb80749185cc..242c0fe550463fae43af283d64c8a0c51d9a4bb1 100644
--- a/tests/components/plex/test_server.py
+++ b/tests/components/plex/test_server.py
@@ -1,5 +1,6 @@
 """Tests for Plex server."""
 import copy
+from datetime import timedelta
 
 from asynctest import patch
 
@@ -7,16 +8,19 @@ from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
 from homeassistant.components.plex.const import (
     CONF_IGNORE_NEW_SHARED_USERS,
     CONF_MONITORED_USERS,
+    DEBOUNCE_TIMEOUT,
     DOMAIN,
     PLEX_UPDATE_PLATFORMS_SIGNAL,
     SERVERS,
 )
 from homeassistant.helpers.dispatcher import async_dispatcher_send
+import homeassistant.util.dt as dt_util
 
+from .common import trigger_plex_update
 from .const import DEFAULT_DATA, DEFAULT_OPTIONS
 from .mock_classes import MockPlexServer
 
-from tests.common import MockConfigEntry
+from tests.common import MockConfigEntry, async_fire_time_changed
 
 
 async def test_new_users_available(hass):
@@ -44,8 +48,7 @@ async def test_new_users_available(hass):
 
     server_id = mock_plex_server.machineIdentifier
 
-    async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
-    await hass.async_block_till_done()
+    await trigger_plex_update(hass, server_id)
 
     monitored_users = hass.data[DOMAIN][SERVERS][server_id].option_monitored_users
 
@@ -83,8 +86,7 @@ async def test_new_ignored_users_available(hass, caplog):
 
     server_id = mock_plex_server.machineIdentifier
 
-    async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
-    await hass.async_block_till_done()
+    await trigger_plex_update(hass, server_id)
 
     monitored_users = hass.data[DOMAIN][SERVERS][server_id].option_monitored_users
 
@@ -118,8 +120,7 @@ async def test_mark_sessions_idle(hass):
 
     server_id = mock_plex_server.machineIdentifier
 
-    async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
-    await hass.async_block_till_done()
+    await trigger_plex_update(hass, server_id)
 
     sensor = hass.states.get("sensor.plex_plex_server_1")
     assert sensor.state == str(len(mock_plex_server.accounts))
@@ -127,8 +128,44 @@ async def test_mark_sessions_idle(hass):
     mock_plex_server.clear_clients()
     mock_plex_server.clear_sessions()
 
-    async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
-    await hass.async_block_till_done()
+    await trigger_plex_update(hass, server_id)
 
     sensor = hass.states.get("sensor.plex_plex_server_1")
     assert sensor.state == "0"
+
+
+async def test_debouncer(hass, caplog):
+    """Test debouncer decorator logic."""
+    entry = MockConfigEntry(
+        domain=DOMAIN,
+        data=DEFAULT_DATA,
+        options=DEFAULT_OPTIONS,
+        unique_id=DEFAULT_DATA["server_id"],
+    )
+
+    mock_plex_server = MockPlexServer(config_entry=entry)
+
+    with patch("plexapi.server.PlexServer", return_value=mock_plex_server), patch(
+        "homeassistant.components.plex.PlexWebsocket.listen"
+    ):
+        entry.add_to_hass(hass)
+        assert await hass.config_entries.async_setup(entry.entry_id)
+        await hass.async_block_till_done()
+
+    server_id = mock_plex_server.machineIdentifier
+
+    # First two updates are skipped
+    async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
+    await hass.async_block_till_done()
+    async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
+    await hass.async_block_till_done()
+    async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
+    await hass.async_block_till_done()
+
+    next_update = dt_util.utcnow() + timedelta(seconds=DEBOUNCE_TIMEOUT)
+    async_fire_time_changed(hass, next_update)
+    await hass.async_block_till_done()
+
+    assert (
+        caplog.text.count(f"Throttling update of {mock_plex_server.friendlyName}") == 2
+    )