From 602f3d0216bfa1ea6b311c3453cd61fbfc1f261a Mon Sep 17 00:00:00 2001 From: Mahesh Asolkar Date: Sun, 3 Jan 2021 19:11:57 -0800 Subject: [PATCH] Working basic casting and status display --- caster.py | 97 ++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 86 insertions(+), 11 deletions(-) diff --git a/caster.py b/caster.py index 185fb54..a39f442 100755 --- a/caster.py +++ b/caster.py @@ -7,7 +7,7 @@ import urwid import pychromecast import asyncio import nest_asyncio -import weakref + from pprint import pprint from dataclasses import dataclass @@ -35,15 +35,19 @@ class CommandPopUpLauncher(urwid.PopUpLauncher): for d in self.app.get_cast_devs(): button = urwid.Button(d.device.friendly_name) body.append(urwid.AttrMap(button, None, focus_map='selble')) - popup = CastDeviceChoiceBox(urwid.BoxAdapter(urwid.ListBox(urwid.SimpleFocusListWalker(body)), 10)) - # popup = urwid.ListBox(urwid.SimpleFocusListWalker(body)) + + popup = CastDeviceChoiceBox(body) urwid.connect_signal(popup, 'sel_dev', - lambda button: self.select_and_close()) + lambda button: self.select_and_close(button)) + return popup - def select_and_close(self): + def select_and_close(self, pop): eprint(u"Got signal: sel_dev") self.close_pop_up() + self.app.body_entry.insert_text(u" \"" + pop.get_sel_val() + u"\" url \"\"") + size = len(self.app.body_entry.get_edit_text()) + self.app.body_entry.keypress((size,), 'left') def get_pop_up_parameters(self): return {'left':2, 'top':2, 'overlay_width':50, 'overlay_height':10} @@ -51,11 +55,16 @@ class CommandPopUpLauncher(urwid.PopUpLauncher): # ----------- class CastDeviceChoiceBox(urwid.WidgetWrap): signals = ['sel_dev'] - def __init__(self, box): + def __init__(self, body): # super(CastDeviceChoiceBox, self).__init__(urwid.SolidFill(u" ")) - self.__super.__init__(urwid.AttrWrap(urwid.Filler(box), 'popbg')) + self.lw = urwid.SimpleFocusListWalker(body) + self.__super.__init__(urwid.AttrWrap(urwid.Filler(urwid.BoxAdapter(urwid.ListBox(self.lw), 10)), 'popbg')) eprint(u"Opened popup") + def get_sel_val(self): + (btn, idx) = self.lw.get_focus() + + return btn.original_widget.label # self.open_box(box) @@ -83,7 +92,9 @@ class CommandBox(urwid.Edit): eprint(key) if key == 'enter': self.app.execute_command() - if key == 'tab': + elif key == 'ctrl d': + self.set_edit_text("") + elif key == 'tab': eprint(["Hit tab> ", self.app.cmd_state]) if self.app.cmd_state == 'cast_on': self._emit("sel_dev_pu") @@ -108,6 +119,7 @@ class CasterApp(object): nest_asyncio.apply() self.cmd_state = '' self.have_devs = False + self.active_cc = None def task_load_cast_devices(self): eprint("Getting chromecasts") @@ -132,13 +144,73 @@ class CasterApp(object): raise urwid.ExitMainLoop() def execute_command(self): - self.body_sts.set_text(u"Executing: " + self.body_entry.edit_text) - if re.search("^\s*q\s*$", self.body_entry.edit_text,flags=re.IGNORECASE): + cmd = self.body_entry.get_edit_text() + self.body_sts.set_text(u"Executing: " + cmd) + if re.search("^\s*q\s*$", cmd, flags=re.IGNORECASE): raise urwid.ExitMainLoop() + elif re.search("^\s*status\s*$", cmd, flags=re.IGNORECASE): + self.show_cast_status() + else: + grps = re.match("^\s*cast\s+on\s*\"(.*?)\"\s+url\s+\"(.*?)\"", cmd, flags=re.IGNORECASE) + if (grps): + self.cast_media(grps[1], grps[2]) + else: + grps = re.match("^\s*cast\s+on\s*\"(.*?)\"\s*$", cmd, flags=re.IGNORECASE) + if (grps): + self.load_cc(grps[1]) + else: + self.body_sts.set_text(u"Error in interpreting: " + cmd) def get_cast_devs(self): return self.cast_devs + def get_named_cast_dev(self, fname): + f_devs = list(filter(lambda d: d.device.friendly_name == fname, self.cast_devs)) + if len(f_devs) < 1: + eprint("Error! Did not find device with name " + fname) + elif len(f_devs) > 1: + eprint("Error! Found multiple devices with name " + fname) + else: + return f_devs[0] + + def load_cc(self, dev): + eprint(u"Loading device " + dev) + self.active_cc = self.get_named_cast_dev(dev); + if self.active_cc: + self.active_cc.connect() + self.active_cc.wait() + self.show_cast_status() + + def cast_media(self, dev, media): + eprint(u"Casting " + media + u" on " + dev) + self.active_cc = self.get_named_cast_dev(dev); + if self.active_cc: + self.active_cc.wait() + + mc = self.active_cc.media_controller + # Example URL: http://bbcwssc.ic.llnwd.net/stream/bbcwssc_mp1_ws-einws + mc.play_media(media, 'audio/mpeg') + mc.block_until_active() + self.show_cast_status() + + def show_cast_status(self): + if self.active_cc: + self.active_cc.wait() + mc = self.active_cc.media_controller + + eprint(self.active_cc.device) + eprint(self.active_cc.status) + eprint(mc.status) + + self.body_sts.set_text(self.active_cc.device.friendly_name + + u" [" + mc.status.player_state + u"]" + + ((u" casting: " + mc.status.content_id) if (mc.status.player_state != 'UNKNOWN') else "")) + + else: + self.body_sts.set_text(u"No active cast") + + + def show_dev_choice(self): body = [urwid.Text('Cast Device'), urwid.Divider()] for d in self.cast_devs: @@ -151,7 +223,10 @@ class CasterApp(object): return popup def handle_body_entry(self,edit,edit_txt): - if (re.search("^\s*cast\s+on\s*$", edit_txt,flags=re.IGNORECASE)): + if (re.search("^\s*cast\s+on\s*\".*?\"\s?(u|ur|url|\s+)", edit_txt,flags=re.IGNORECASE)): + self.cmd_state = 'cast_on_url' + self.body_out.set_text([u"[Cast command] ", ('foot', u"cast on url "), ('bold', "")]) + elif (re.search("^\s*cast\s+on\s*$", edit_txt,flags=re.IGNORECASE)): if self.have_devs: devs = "" self.cmd_state = 'cast_on'