#!/usr/bin/env python3
"""Terminal (urwid) front end for casting media URLs to Chromecast devices.

The UI has three boxes: a command entry, an "Assist" pane that shows hints
while the user types, and a status line.  Chromecast discovery runs in a
worker thread while the urwid main loop runs on the asyncio event loop
(made re-entrant with ``nest_asyncio``).  Debug traces go to stderr.
"""
import sys
import string
import re
import urwid
import pychromecast
import asyncio
import nest_asyncio
from pprint import pprint
from dataclasses import dataclass


# Command grammar.  Raw strings keep ``\s`` from being parsed as an (invalid)
# string escape; the patterns themselves are unchanged.
_QUIT_RE = re.compile(r'^\s*q\s*$', re.IGNORECASE)
_STATUS_RE = re.compile(r'^\s*status\s*$', re.IGNORECASE)
_CAST_URL_RE = re.compile(
    r'^\s*cast\s+on\s*"(.*?)"\s+url\s+"(.*?)"', re.IGNORECASE)
_CAST_DEV_RE = re.compile(r'^\s*cast\s+on\s*"(.*?)"\s*$', re.IGNORECASE)

# Partial-input patterns used to drive the "Assist" pane while typing.
_HINT_CAST_URL_RE = re.compile(
    r'^\s*cast\s+on\s*".*?"\s?(u|ur|url|\s+)', re.IGNORECASE)
_HINT_CAST_ON_RE = re.compile(r'^\s*cast\s+on\s*$', re.IGNORECASE)
_HINT_CAST_RE = re.compile(r'^\s*cast\s*(o)?$', re.IGNORECASE)

_HELP_TEXT = (u"Type a command for assistance.\n"
              u"Supported commands: cast, list, status, q")


def eprint(*args, **kwargs):
    """Print to stderr, flushing stderr before and after the write."""
    sys.stderr.flush()
    print(*args, file=sys.stderr, **kwargs)
    sys.stderr.flush()


# ----------
class CommandPopUpLauncher(urwid.PopUpLauncher):
    """Wrap the command entry and open a device-chooser pop-up on request.

    The pop-up opens when the wrapped widget emits ``sel_dev_pu``.  ``app``
    must be set to the owning ``CasterApp`` before the pop-up is used.
    """

    def __init__(self, wgt):
        super().__init__(wgt)
        self.app = None
        urwid.connect_signal(wgt, 'sel_dev_pu', lambda button: self.popup())

    def popup(self):
        """Signal handler for ``sel_dev_pu``: open the pop-up."""
        eprint(u"Got signal: sel_dev_pu")
        self.open_pop_up()

    def create_pop_up(self):
        """Build the chooser: a header, a divider and one button per device."""
        eprint(u"Opening pop up for: sel_dev_pu")
        body = [urwid.Text('Cast Device'), urwid.Divider()]
        body.extend(
            urwid.AttrMap(urwid.Button(d.device.friendly_name), None,
                          focus_map='selble')
            for d in self.app.get_cast_devs()
        )
        popup = CastDeviceChoiceBox(body)
        urwid.connect_signal(popup, 'sel_dev',
                             lambda button: self.select_and_close(button))
        return popup

    def select_and_close(self, pop):
        """Close the pop-up and splice the chosen device into the command.

        Inserts `` "<device>" url ""`` at the cursor and then moves the cursor
        one step left so it sits between the empty URL quotes.  If the focused
        row is not a device button, the pop-up is closed and nothing is
        inserted.
        """
        eprint(u"Got signal: sel_dev")
        self.close_pop_up()
        name = pop.get_sel_val()
        if name is None:
            return
        entry = self.app.body_entry
        entry.insert_text(u" \"" + name + u"\" url \"\"")
        size = len(entry.get_edit_text())
        entry.keypress((size,), 'left')

    def get_pop_up_parameters(self):
        """Return the pop-up placement relative to the wrapped widget."""
        return {'left': 2, 'top': 2, 'overlay_width': 50, 'overlay_height': 10}


# -----------
class CastDeviceChoiceBox(urwid.WidgetWrap):
    """List of selectable rows; emits ``sel_dev`` when Enter is pressed."""

    signals = ['sel_dev']

    def __init__(self, body):
        """Wrap *body* (a list of widgets) in a 10-row list box."""
        self.lw = urwid.SimpleFocusListWalker(body)
        listbox = urwid.ListBox(self.lw)
        super().__init__(
            urwid.AttrWrap(urwid.Filler(urwid.BoxAdapter(listbox, 10)),
                           'popbg'))
        eprint(u"Opened popup")

    def get_sel_val(self):
        """Return the label of the focused button, or None.

        None is returned when the focused row is not a decorated button
        (for example the header text when no devices are listed).
        """
        focused, _pos = self.lw.get_focus()
        inner = getattr(focused, 'original_widget', None)
        return getattr(inner, 'label', None)

    def keypress(self, size, key):
        """Emit ``sel_dev`` on Enter, then pass the key on to the list."""
        if key == 'enter':
            self._emit("sel_dev")
            eprint(u"Emit signal: sel_dev")
        return super(CastDeviceChoiceBox, self).keypress(size, key)


# -----------
class CommandBox(urwid.Edit):
    """Command entry line with Enter / Ctrl-D / Tab shortcuts.

    ``app`` must be set to the owning ``CasterApp`` before keys are handled.
    """

    signals = ['sel_dev_pu']

    def __init__(self):
        super().__init__()
        self.app = None

    def keypress(self, size, key):
        """Handle shortcut keys; everything else goes to ``urwid.Edit``.

        Enter executes the command, Ctrl-D clears the line, and Tab requests
        the device pop-up when the command state is ``cast_on``.  These three
        keys are consumed (the method returns None for them).
        """
        eprint(key)
        if key == 'enter':
            self.app.execute_command()
        elif key == 'ctrl d':
            self.set_edit_text("")
        elif key == 'tab':
            eprint(["Hit tab> ", self.app.cmd_state])
            if self.app.cmd_state == 'cast_on':
                self._emit("sel_dev_pu")
        else:
            return super(CommandBox, self).keypress(size, key)


# -----------
class CasterApp(object):
    """Application object: owns the widgets, device list and active cast."""

    def __init__(self):
        self.colors = [
            ('head', 'black', 'white', '', '#F47', '#000'),
            ('body', 'black', 'white', '', '#233', '#000'),
            ('foot', 'black', 'white', '', '#233', '#000'),
            ('bg', 'black', 'white', '', '#eee', '#000'),
            ('normal', 'black', 'white', '', '#eee', '#000'),
            ('assist', 'black', 'dark gray', '', '#777', '#000'),
            ('selble', 'light gray', 'black', '', '#000', '#eee'),
            ('popbg', 'white', 'dark gray', '', '#000', '#777'),
        ]
        # NOTE(review): created before any event loop is running and not used
        # anywhere in this file -- confirm whether it is still needed.
        self.cc_fut = asyncio.Future()
        nest_asyncio.apply()
        self.cmd_state = ''
        # Empty until discovery completes, so lookups made earlier see "no
        # devices" instead of raising AttributeError.
        self.cast_devs = []
        self.have_devs = False
        self.active_cc = None

    def task_load_cast_devices(self):
        """Blocking device discovery; run via ``load_cast_devices``."""
        eprint("Getting chromecasts")
        (self.cast_devs, zc) = pychromecast.get_chromecasts()
        self.have_devs = True
        eprint("Got chromecasts")

    async def load_cast_devices(self):
        """Run the blocking discovery in a worker thread."""
        await asyncio.to_thread(self.task_load_cast_devices)

    async def initial_tasks(self):
        """Discover devices and update the status / assist panes."""
        eprint("Starting Init tasks")
        self.body_sts.set_text(u"Getting chromecast devices...")
        await self.load_cast_devices()
        self.body_sts.set_text(u"Got chromecast devies")
        self.body_out.set_text(_HELP_TEXT)
        eprint("Done Init tasks")

    def handle_global_input(self, key):
        """Quit on an otherwise unhandled ``q`` / ``Q``."""
        if key in ('q', 'Q'):
            raise urwid.ExitMainLoop()

    def execute_command(self):
        """Parse the command line and run the matching action.

        Recognised forms (case-insensitive): ``q``, ``status``,
        ``cast on "<device>" url "<url>"`` and ``cast on "<device>"``.
        Anything else produces an error in the status line.

        Raises:
            urwid.ExitMainLoop: for the ``q`` command.
        """
        cmd = self.body_entry.get_edit_text()
        self.body_sts.set_text(u"Executing: " + cmd)

        if _QUIT_RE.search(cmd):
            raise urwid.ExitMainLoop()
        if _STATUS_RE.search(cmd):
            self.show_cast_status()
            return

        grps = _CAST_URL_RE.match(cmd)
        if grps:
            self.cast_media(grps[1], grps[2])
            return

        grps = _CAST_DEV_RE.match(cmd)
        if grps:
            self.load_cc(grps[1])
            return

        self.body_sts.set_text(u"Error in interpreting: " + cmd)

    def get_cast_devs(self):
        """Return the discovered devices (empty before discovery ends)."""
        return self.cast_devs

    def get_named_cast_dev(self, fname):
        """Return the single device whose friendly name equals *fname*.

        Returns None (after logging to stderr) when there is no match or
        more than one match.
        """
        f_devs = [d for d in self.cast_devs
                  if d.device.friendly_name == fname]
        if len(f_devs) < 1:
            eprint("Error! Did not find device with name " + fname)
        elif len(f_devs) > 1:
            eprint("Error! Found multiple devices with name " + fname)
        else:
            return f_devs[0]
        return None

    def load_cc(self, dev):
        """Make the device named *dev* the active cast and show its status."""
        eprint(u"Loading device " + dev)
        self.active_cc = self.get_named_cast_dev(dev)
        if self.active_cc:
            self.active_cc.connect()
            self.active_cc.wait()
            self.show_cast_status()

    def cast_media(self, dev, media):
        """Play the URL *media* (sent as ``audio/mpeg``) on device *dev*."""
        eprint(u"Casting " + media + u" on " + dev)
        self.active_cc = self.get_named_cast_dev(dev)
        if self.active_cc:
            self.active_cc.wait()
            mc = self.active_cc.media_controller
            # Example URL: http://bbcwssc.ic.llnwd.net/stream/bbcwssc_mp1_ws-einws
            mc.play_media(media, 'audio/mpeg')
            mc.block_until_active()
            self.show_cast_status()

    def show_cast_status(self):
        """Write the active device name and player state to the status line.

        The " casting: <content id>" suffix is added when the player state is
        not ``UNKNOWN`` and a content id is available.
        """
        if not self.active_cc:
            self.body_sts.set_text(u"No active cast")
            return

        self.active_cc.wait()
        mc = self.active_cc.media_controller
        eprint(self.active_cc.device)
        eprint(self.active_cc.status)
        eprint(mc.status)

        text = (self.active_cc.device.friendly_name
                + u" [" + mc.status.player_state + u"]")
        content_id = mc.status.content_id
        # Guard against a missing content id: concatenating None onto a str
        # would raise TypeError.
        if mc.status.player_state != 'UNKNOWN' and content_id is not None:
            text += u" casting: " + content_id
        self.body_sts.set_text(text)

    def show_dev_choice(self):
        """Build and return a standalone device chooser box.

        Not called anywhere in this file; the chooser normally comes from
        ``CommandPopUpLauncher.create_pop_up``.
        """
        body = [urwid.Text('Cast Device'), urwid.Divider()]
        body.extend(
            urwid.AttrMap(urwid.Button(d.device.friendly_name), None,
                          focus_map='selble')
            for d in self.cast_devs
        )
        # CastDeviceChoiceBox builds its own list walker / list box, so it
        # takes the plain widget list.
        popup = CastDeviceChoiceBox(body)
        # The pop-up is owned by the launcher, which is what can close it.
        urwid.connect_signal(popup, 'sel_dev',
                             lambda button: self.cbox.close_pop_up())
        self.body_sts.set_text(u"Choose cast device to cast on")
        return popup

    def handle_body_entry(self, edit, edit_txt):
        """``change`` handler: update ``cmd_state`` and the assist pane."""
        if _HINT_CAST_URL_RE.search(edit_txt):
            self.cmd_state = 'cast_on_url'
            self.body_out.set_text([u"[Cast command] ",
                                    ('foot', u"cast on url "),
                                    ('bold', "")])
        elif _HINT_CAST_ON_RE.search(edit_txt):
            if self.have_devs:
                self.cmd_state = 'cast_on'
                devs = "".join(" - " + d.device.friendly_name + "\n"
                               for d in self.cast_devs)
                self.body_out.set_text([u"[Cast command] ",
                                        ('foot', u"cast on "),
                                        ('bold', ""),
                                        u"\n" + devs])
            else:
                self.body_out.set_text(
                    u"Waiting for cast device information...")
        elif _HINT_CAST_RE.search(edit_txt):
            self.cmd_state = 'cast'
            self.body_out.set_text([u"[Cast command] ",
                                    ('foot', u"cast"),
                                    ('bold', " on ")])
        elif _QUIT_RE.search(edit_txt):
            self.body_out.set_text(u"[Quit command] Hit to quit")
            self.cmd_state = 'quit'
        else:
            self.cmd_state = 'none'
            self.body_out.set_text(_HELP_TEXT)

    def setup_ui(self):
        """Create all widgets and the urwid main loop (does not run it)."""
        self.base_layer = urwid.SolidFill()
        self.head_txt = urwid.Text(('head', u"Media Cast v0.1"), align='left')
        self.foot_txt = urwid.Text(('foot', u"2020 © HeshApps"), align='right')

        # Command entry, wrapped by the launcher that hosts the pop-up.
        self.body_entry = CommandBox()
        self.body_entry.app = self
        self.cbox = CommandPopUpLauncher(self.body_entry)
        self.cbox.app = self
        body_entry_box = urwid.AttrMap(urwid.LineBox(
            urwid.AttrMap(self.cbox, 'normal'), 'Command'), 'body')

        # Assist pane.
        self.body_out = urwid.Text(('body', ""), align='left')
        body_out_box = urwid.AttrMap(urwid.LineBox(
            urwid.Filler(urwid.AttrMap(self.body_out, 'assist'), valign='top'),
            'Assist'), 'body')

        # Status line.
        self.body_sts = urwid.Text(('assist', "Initializing..."), align='left')
        body_sts_box = urwid.AttrMap(urwid.LineBox(
            urwid.AttrMap(self.body_sts, 'assist'), 'Status'), 'body')

        self.body_cont = urwid.Frame(body_out_box,
                                     header=body_entry_box,
                                     footer=body_sts_box,
                                     focus_part='header')
        urwid.connect_signal(self.body_entry, 'change', self.handle_body_entry)

        evl = urwid.AsyncioEventLoop(loop=asyncio.get_event_loop())
        self.loop = urwid.MainLoop(self.base_layer, self.colors,
                                   unhandled_input=self.handle_global_input,
                                   event_loop=evl,
                                   pop_ups=True)
        self.loop.screen.set_terminal_properties(colors=256)
        self.loop.widget = urwid.AttrMap(self.base_layer, 'bg')
        self.loop.widget.original_widget = urwid.Frame(self.body_cont,
                                                       header=self.head_txt,
                                                       footer=self.foot_txt,
                                                       focus_part='body')

    async def start_ui(self):
        """Run the urwid main loop (blocks until the UI exits)."""
        eprint("Starting UI tasks")
        self.loop.run()

    async def start(self):
        """Build the UI, then run discovery and the UI loop together."""
        self.setup_ui()
        eprint("Starting tasks")
        await asyncio.gather(
            self.initial_tasks(),
            self.start_ui()
        )
        eprint("All tasks done")


# -----------
def main():
    """Script entry point."""
    app = CasterApp()
    asyncio.run(app.start())


# -----------
if __name__ == "__main__":
    main()