#!/usr/bin/env python3
"""Terminal front end (urwid) that discovers Chromecast devices and offers a
small command prompt with a device-selection pop-up.

Diagnostics are written to stderr via :func:`eprint` so they do not disturb
the curses-style screen drawn on stdout.
"""

import asyncio
import re
import string
import sys
import weakref
from dataclasses import dataclass
from pprint import pprint

import nest_asyncio
import pychromecast
import urwid

# Height (in rows) of the device-selection list and of the pop-up overlay.
POPUP_HEIGHT = 10

# Text shown in the "Assist" pane when no command is recognised.
HELP_TEXT = ("Type a command for assistance.\n"
             "Supported commands: cast, list, status, q")

# Command recognisers.  Raw strings keep ``\s`` a regex escape rather than an
# (invalid) string escape; compiled once instead of on every keystroke.
_QUIT_RE = re.compile(r"^\s*q\s*$", flags=re.IGNORECASE)
_CAST_ON_RE = re.compile(r"^\s*cast\s+on\s*$", flags=re.IGNORECASE)
_CAST_RE = re.compile(r"^\s*cast\s*(o)?$", flags=re.IGNORECASE)


def eprint(*args, **kwargs):
    """Print to stderr, flushing before and after so output is not delayed.

    Accepts the same positional and keyword arguments as :func:`print`,
    except ``file`` which is always ``sys.stderr``.
    """
    sys.stderr.flush()
    print(*args, file=sys.stderr, **kwargs)
    sys.stderr.flush()


def _build_device_choice_box(devices, on_select, height=POPUP_HEIGHT):
    """Build the device-selection widget shared by the pop-up code paths.

    Args:
        devices: iterable of objects exposing ``device.friendly_name``.
        on_select: zero-argument callable invoked when the box emits
            ``'sel_dev'``.
        height: number of rows given to the list box.

    Returns:
        A :class:`CastDeviceChoiceBox` with ``'sel_dev'`` already connected.
    """
    rows = [urwid.Text('Cast Device'), urwid.Divider()]
    rows.extend(
        urwid.AttrMap(urwid.Button(dev.device.friendly_name), None,
                      focus_map='selble')
        for dev in devices
    )
    listbox = urwid.ListBox(urwid.SimpleFocusListWalker(rows))
    # A ListBox is a box widget; BoxAdapter gives it a fixed height so it can
    # sit inside the Filler that CastDeviceChoiceBox wraps around it.
    choice_box = CastDeviceChoiceBox(urwid.BoxAdapter(listbox, height))
    urwid.connect_signal(choice_box, 'sel_dev', lambda _widget: on_select())
    return choice_box


# ----------
class CommandPopUpLauncher(urwid.PopUpLauncher):
    """Wraps the command entry widget and opens the device pop-up on demand.

    ``app`` must be assigned by the owner before the pop-up is opened; it is
    used to fetch the current list of cast devices.
    """

    def __init__(self, wgt):
        """Wrap *wgt* and open the pop-up whenever it emits ``'sel_dev_pu'``."""
        super().__init__(wgt)
        self.app = None
        urwid.connect_signal(wgt, 'sel_dev_pu', lambda _widget: self.popup())

    def popup(self):
        """Signal handler: log and open the pop-up."""
        eprint("Got signal: sel_dev_pu")
        self.open_pop_up()

    def create_pop_up(self):
        """Return the widget to display as the pop-up (called by urwid)."""
        eprint("Opening pop up for: sel_dev_pu")
        return _build_device_choice_box(self.app.get_cast_devs(),
                                        self.select_and_close)

    def select_and_close(self):
        """Signal handler for ``'sel_dev'``: log and close the pop-up."""
        eprint("Got signal: sel_dev")
        self.close_pop_up()

    def get_pop_up_parameters(self):
        """Return the overlay placement of the pop-up relative to this widget."""
        return {'left': 2, 'top': 2,
                'overlay_width': 50, 'overlay_height': POPUP_HEIGHT}


# -----------
class CastDeviceChoiceBox(urwid.WidgetWrap):
    """Pop-up content: a filled, ``'popbg'``-coloured wrapper around *box*.

    Emits ``'sel_dev'`` when Enter is pressed.
    """

    signals = ['sel_dev']

    def __init__(self, box):
        """Wrap *box* (a flow/fixed-height widget) in a Filler with 'popbg'."""
        super().__init__(urwid.AttrMap(urwid.Filler(box), 'popbg'))
        eprint("Opened popup")

    def keypress(self, size, key):
        """Emit ``'sel_dev'`` on Enter, then let the wrapped widget see the key."""
        if key == 'enter':
            self._emit("sel_dev")
            eprint("Emit signal: sel_dev")
        return super().keypress(size, key)


# -----------
class CommandBox(urwid.Edit):
    """Single-line command entry.

    Enter asks the application to execute the current text; Tab requests the
    device pop-up (signal ``'sel_dev_pu'``) when the application is in the
    ``'cast_on'`` command state.  ``app`` must be assigned by the owner.
    """

    signals = ['sel_dev_pu']

    def __init__(self):
        super().__init__()
        self.app = None

    def keypress(self, size, key):
        """Handle Enter/Tab specially; pass every other key to the Edit widget.

        Returns ``None`` for Tab (always consumed), otherwise whatever
        ``urwid.Edit.keypress`` returns.
        """
        eprint(key)
        if key == 'tab':
            eprint(["Hit tab> ", self.app.cmd_state])
            if self.app.cmd_state == 'cast_on':
                self._emit("sel_dev_pu")
            return None
        if key == 'enter':
            # May raise urwid.ExitMainLoop for the quit command.
            self.app.execute_command()
        return super().keypress(size, key)


# -----------
class CasterApp(object):
    """Application object: owns the palette, widgets, and cast-device list."""

    def __init__(self):
        # Palette entries: (name, fg, bg, mono, fg_high, bg_high).
        self.colors = [
            ('head', 'black', 'white', '', '#F47', '#000'),
            ('body', 'black', 'white', '', '#233', '#000'),
            ('foot', 'black', 'white', '', '#233', '#000'),
            ('bg', 'black', 'white', '', '#eee', '#000'),
            ('normal', 'black', 'white', '', '#eee', '#000'),
            ('assist', 'black', 'dark gray', '', '#777', '#000'),
            ('selble', 'light gray', 'black', '', '#000', '#eee'),
            ('popbg', 'white', 'dark gray', '', '#000', '#777'),
        ]
        # NOTE(review): cc_fut is not awaited or resolved anywhere in this file.
        self.cc_fut = asyncio.Future()
        # Allows the urwid main loop to be run from inside a running coroutine.
        nest_asyncio.apply()
        self.cmd_state = ''
        self.have_devs = False
        # Start empty so accessors are safe before discovery has finished.
        self.cast_devs = []

    def task_load_cast_devices(self):
        """Blocking Chromecast discovery; intended to run in a worker thread."""
        eprint("Getting chromecasts")
        # The second element of the returned pair is not used here.
        (self.cast_devs, zc) = pychromecast.get_chromecasts()
        self.have_devs = True
        eprint("Got chromecasts")

    async def load_cast_devices(self):
        """Run the blocking discovery in a thread and wait for it."""
        await asyncio.to_thread(self.task_load_cast_devices)

    async def initial_tasks(self):
        """Discover devices and update the status/assist panes around it."""
        eprint("Starting Init tasks")
        self.body_sts.set_text("Getting chromecast devices...")
        await self.load_cast_devices()
        self.body_sts.set_text("Got chromecast devices")
        self.body_out.set_text(HELP_TEXT)
        eprint("Done Init tasks")

    def handle_global_input(self, key):
        """Unhandled-input hook: quit on 'q' or 'Q'."""
        if key in ('q', 'Q'):
            raise urwid.ExitMainLoop()

    def execute_command(self):
        """Execute the text currently in the command box.

        Raises:
            urwid.ExitMainLoop: when the command is ``q`` (case-insensitive,
                surrounding whitespace allowed).
        """
        command = self.body_entry.edit_text
        self.body_sts.set_text("Executing: " + command)
        if _QUIT_RE.search(command):
            raise urwid.ExitMainLoop()

    def get_cast_devs(self):
        """Return the list of discovered cast devices (empty until loaded)."""
        return self.cast_devs

    def show_dev_choice(self):
        """Build and return a device-selection box and update the status pane.

        Selecting an entry closes the pop-up owned by ``self.cbox``.
        """
        # Resolve self.cbox lazily so this works regardless of setup order.
        popup = _build_device_choice_box(
            self.cast_devs, lambda: self.cbox.close_pop_up())
        self.body_sts.set_text("Choose cast device to cast on")
        return popup

    def handle_body_entry(self, edit, edit_txt):
        """'change' handler for the command box: update state and assist text.

        Args:
            edit: the widget that emitted the signal (unused).
            edit_txt: the new contents of the command box.
        """
        if _CAST_ON_RE.search(edit_txt):
            if not self.have_devs:
                # cmd_state is intentionally left as it was in this case.
                self.body_out.set_text(
                    "Waiting for cast device information...")
                return
            self.cmd_state = 'cast_on'
            devs = "".join(" - " + d.device.friendly_name + "\n"
                           for d in self.cast_devs)
            self.body_out.set_text(["[Cast command] ", ('foot', "cast on "),
                                    ('bold', ""), "\n" + devs])
        elif _CAST_RE.search(edit_txt):
            self.cmd_state = 'cast'
            self.body_out.set_text(["[Cast command] ", ('foot', "cast"),
                                    ('bold', " on ")])
        elif _QUIT_RE.search(edit_txt):
            self.body_out.set_text("[Quit command] Hit to quit")
            self.cmd_state = 'quit'
        else:
            self.cmd_state = 'none'
            self.body_out.set_text(HELP_TEXT)

    def setup_ui(self):
        """Create all widgets and the urwid main loop (does not start it)."""
        self.base_layer = urwid.SolidFill()
        self.head_txt = urwid.Text(('head', "Media Cast v0.1"), align='left')
        self.foot_txt = urwid.Text(('foot', "2020 © HeshApps"), align='right')

        # Command entry, wrapped in the pop-up launcher.
        self.body_entry = CommandBox()
        self.body_entry.app = self
        self.cbox = CommandPopUpLauncher(self.body_entry)
        self.cbox.app = self
        body_entry_box = urwid.AttrMap(
            urwid.LineBox(urwid.AttrMap(self.cbox, 'normal'), 'Command'),
            'body')

        # Assist pane.
        self.body_out = urwid.Text(('body', ""), align='left')
        body_out_box = urwid.AttrMap(
            urwid.LineBox(
                urwid.Filler(urwid.AttrMap(self.body_out, 'assist'),
                             valign='top'),
                'Assist'),
            'body')

        # Status pane.
        self.body_sts = urwid.Text(('assist', "Initializing..."),
                                   align='left')
        body_sts_box = urwid.AttrMap(
            urwid.LineBox(urwid.AttrMap(self.body_sts, 'assist'), 'Status'),
            'body')

        self.body_cont = urwid.Frame(body_out_box,
                                     header=body_entry_box,
                                     footer=body_sts_box,
                                     focus_part='header')
        urwid.connect_signal(self.body_entry, 'change',
                             self.handle_body_entry)

        # Share the asyncio loop with urwid so coroutines and the UI coexist.
        evl = urwid.AsyncioEventLoop(loop=asyncio.get_event_loop())
        self.loop = urwid.MainLoop(self.base_layer,
                                   self.colors,
                                   unhandled_input=self.handle_global_input,
                                   event_loop=evl,
                                   pop_ups=True)
        self.loop.screen.set_terminal_properties(colors=256)
        self.loop.widget = urwid.AttrMap(self.base_layer, 'bg')
        self.loop.widget.original_widget = urwid.Frame(self.body_cont,
                                                       header=self.head_txt,
                                                       footer=self.foot_txt,
                                                       focus_part='body')

    async def start_ui(self):
        """Run the urwid main loop; returns when the loop exits."""
        eprint("Starting UI tasks")
        self.loop.run()

    async def start(self):
        """Build the UI, then run device discovery and the UI together."""
        self.setup_ui()
        eprint("Starting tasks")
        await asyncio.gather(
            self.initial_tasks(),
            self.start_ui(),
        )
        eprint("All tasks done")


# -----------
def main():
    """Script entry point: create the application and run it to completion."""
    app = CasterApp()
    asyncio.run(app.start())


# -----------
if __name__ == "__main__":
    main()