Skip to content
Snippets Groups Projects
Commit 80a9422a authored by Paulus Schoutsen's avatar Paulus Schoutsen
Browse files

Merge pull request #545 from MakeMeASandwich/denon

Refactor denon remote
parents 060cbaf6 b6e65123
No related branches found
No related tags found
No related merge requests found
......@@ -6,8 +6,7 @@ Developed for a Denon DRA-N5, see
http://www.denon.co.uk/chg/product/compactsystems/networkmusicsystems/ceolpiccolo
A few notes:
- As long as this module is active and connected, the receiver does
not seem to accept additional telnet connections.
- The receiver handles only one telnet connection and refuses others.
- Be careful with the volume. 50% or even 100% are very loud.
......@@ -67,13 +66,15 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
CONF_HOST)
return False
add_devices([
DenonDevice(
config.get('name', 'Music station'),
config.get('host'))
])
return True
denon = DenonDevice(
config.get("name", "Music station"),
config.get("host")
)
if denon.update():
add_devices([denon])
return True
else:
return False
class DenonDevice(MediaPlayerDevice):
......@@ -84,28 +85,41 @@ class DenonDevice(MediaPlayerDevice):
def __init__(self, name, host):
self._name = name
self._host = host
self._telnet = telnetlib.Telnet(self._host)
def query(self, message):
""" Send request and await response from server """
self._pwstate = "PWSTANDBY"
self._volume = 0
self._muted = False
self._mediasource = ""
@classmethod
def telnet_request(cls, telnet, command):
""" Executes `command` and returns the response. """
telnet.write(command.encode("ASCII") + b"\r")
return telnet.read_until(b"\r", timeout=0.2).decode("ASCII").strip()
def telnet_command(self, command):
""" Establishes a telnet connection and sends `command`. """
telnet = telnetlib.Telnet(self._host)
telnet.write(command.encode("ASCII") + b"\r")
telnet.read_very_eager() # skip response
telnet.close()
def update(self):
try:
# unspecified command, should be ignored
self._telnet.write("?".encode('UTF-8') + b'\r')
except (EOFError, BrokenPipeError, ConnectionResetError):
self._telnet.open(self._host)
self._telnet.read_very_eager() # skip what is not requested
telnet = telnetlib.Telnet(self._host)
except ConnectionRefusedError:
return False
self._telnet.write(message.encode('ASCII') + b'\r')
# timeout 200ms, defined by protocol
resp = self._telnet.read_until(b'\r', timeout=0.2)\
.decode('UTF-8').strip()
self._pwstate = self.telnet_request(telnet, "PW?")
# PW? sends also SISTATUS, which is not interesting
telnet.read_until(b"\r", timeout=0.2)
if message == "PW?":
# workaround; PW? sends also SISTATUS
self._telnet.read_until(b'\r', timeout=0.2)
volume_str = self.telnet_request(telnet, "MV?")[len("MV"):]
self._volume = int(volume_str) / 60
self._muted = (self.telnet_request(telnet, "MU?") == "MUON")
self._mediasource = self.telnet_request(telnet, "SI?")[len("SI"):]
return resp
telnet.close()
return True
@property
def name(self):
......@@ -115,10 +129,9 @@ class DenonDevice(MediaPlayerDevice):
@property
def state(self):
""" Returns the state of the device. """
pwstate = self.query('PW?')
if pwstate == "PWSTANDBY":
if self._pwstate == "PWSTANDBY":
return STATE_OFF
if pwstate == "PWON":
if self._pwstate == "PWON":
return STATE_ON
return STATE_UNKNOWN
......@@ -126,17 +139,17 @@ class DenonDevice(MediaPlayerDevice):
@property
def volume_level(self):
""" Volume level of the media player (0..1). """
return int(self.query('MV?')[len('MV'):]) / 60
return self._volume
@property
def is_volume_muted(self):
""" Boolean if volume is currently muted. """
return self.query('MU?') == "MUON"
return self._muted
@property
def media_title(self):
""" Current media source. """
return self.query('SI?')[len('SI'):]
return self._mediasource
@property
def supported_media_commands(self):
......@@ -145,24 +158,24 @@ class DenonDevice(MediaPlayerDevice):
def turn_off(self):
""" turn_off media player. """
self.query('PWSTANDBY')
self.telnet_command("PWSTANDBY")
def volume_up(self):
""" volume_up media player. """
self.query('MVUP')
self.telnet_command("MVUP")
def volume_down(self):
""" volume_down media player. """
self.query('MVDOWN')
self.telnet_command("MVDOWN")
def set_volume_level(self, volume):
""" set volume level, range 0..1. """
# 60dB max
self.query('MV' + str(round(volume * 60)).zfill(2))
self.telnet_command("MV" + str(round(volume * 60)).zfill(2))
def mute_volume(self, mute):
""" mute (true) or unmute (false) media player. """
self.query('MU' + ('ON' if mute else 'OFF'))
self.telnet_command("MU" + ("ON" if mute else "OFF"))
def media_play_pause(self):
""" media_play_pause media player. """
......@@ -170,22 +183,22 @@ class DenonDevice(MediaPlayerDevice):
def media_play(self):
""" media_play media player. """
self.query('NS9A')
self.telnet_command("NS9A")
def media_pause(self):
""" media_pause media player. """
self.query('NS9B')
self.telnet_command("NS9B")
def media_next_track(self):
""" Send next track command. """
self.query('NS9D')
self.telnet_command("NS9D")
def media_previous_track(self):
self.query('NS9E')
self.telnet_command("NS9E")
def media_seek(self, position):
raise NotImplementedError()
def turn_on(self):
""" turn the media player on. """
self.query('PWON')
self.telnet_command("PWON")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment