Initial commit

Initial UI, popup popping
This commit is contained in:
Mahesh Asolkar 2021-01-03 15:18:11 -08:00
commit 1a0e7850e7

232
caster.py Executable file
View File

@ -0,0 +1,232 @@
#!/usr/bin/env python3
import sys
import string
import re
import urwid
import pychromecast
import asyncio
import nest_asyncio
import weakref
from pprint import pprint
from dataclasses import dataclass
def eprint(*args, **kwargs):
sys.stderr.flush()
print(*args, file=sys.stderr, **kwargs)
sys.stderr.flush()
# ----------
class CommandPopUpLauncher(urwid.PopUpLauncher):
def __init__(self, wgt):
self.__super.__init__(wgt)
self.app = None
urwid.connect_signal(wgt, 'sel_dev_pu',
lambda button: self.popup())
def popup(self):
eprint(u"Got signal: sel_dev_pu")
self.open_pop_up()
def create_pop_up(self):
eprint(u"Opening pop up for: sel_dev_pu")
self.app.get_cast_devs()
body = [urwid.Text('Cast Device'), urwid.Divider()]
for d in self.app.get_cast_devs():
button = urwid.Button(d.device.friendly_name)
body.append(urwid.AttrMap(button, None, focus_map='selble'))
popup = CastDeviceChoiceBox(urwid.BoxAdapter(urwid.ListBox(urwid.SimpleFocusListWalker(body)), 10))
# popup = urwid.ListBox(urwid.SimpleFocusListWalker(body))
urwid.connect_signal(popup, 'sel_dev',
lambda button: self.select_and_close())
return popup
def select_and_close(self):
eprint(u"Got signal: sel_dev")
self.close_pop_up()
def get_pop_up_parameters(self):
return {'left':2, 'top':2, 'overlay_width':50, 'overlay_height':10}
# -----------
class CastDeviceChoiceBox(urwid.WidgetWrap):
signals = ['sel_dev']
def __init__(self, box):
# super(CastDeviceChoiceBox, self).__init__(urwid.SolidFill(u" "))
self.__super.__init__(urwid.AttrWrap(urwid.Filler(box), 'popbg'))
eprint(u"Opened popup")
# self.open_box(box)
# def open_box(self, box):
# self.original_widget = urwid.Overlay(urwid.LineBox(box),
# self.original_widget,
# align='center', width=('relative', 30),
# valign='middle', height=('relative', 30),
# min_width=30, min_height=5)
def keypress(self, size, key):
if key == 'enter':
self._emit("sel_dev")
eprint(u"Emit signal: sel_dev")
return super(CastDeviceChoiceBox, self).keypress(size, key)
# -----------
class CommandBox(urwid.Edit):
signals = ['sel_dev_pu']
def __init__(self):
super().__init__()
self.app = None
def keypress(self, size, key):
eprint(key)
if key == 'enter':
self.app.execute_command()
if key == 'tab':
eprint(["Hit tab> ", self.app.cmd_state])
if self.app.cmd_state == 'cast_on':
self._emit("sel_dev_pu")
else:
return super(CommandBox, self).keypress(size, key)
# -----------
class CasterApp(object):
def __init__(self):
self.colors = [
('head', 'black', 'white', '', '#F47', '#000'),
('body', 'black', 'white', '', '#233', '#000'),
('foot', 'black', 'white', '', '#233', '#000'),
('bg', 'black', 'white', '', '#eee', '#000'),
('normal', 'black', 'white', '', '#eee', '#000'),
('assist', 'black', 'dark gray', '', '#777', '#000'),
('selble', 'light gray', 'black', '', '#000', '#eee'),
('popbg', 'white', 'dark gray', '', '#000', '#777'),
]
self.cc_fut = asyncio.Future()
nest_asyncio.apply()
self.cmd_state = ''
self.have_devs = False
def task_load_cast_devices(self):
eprint("Getting chromecasts")
(self.cast_devs,zc) = pychromecast.get_chromecasts()
self.have_devs = True
eprint("Got chromecasts")
async def load_cast_devices(self):
th = asyncio.to_thread(self.task_load_cast_devices)
await th
async def initial_tasks(self):
eprint("Starting Init tasks")
self.body_sts.set_text(u"Getting chromecast devices...")
await self.load_cast_devices()
self.body_sts.set_text(u"Got chromecast devies")
self.body_out.set_text(u"Type a command for assistance.\nSupported commands: cast, list, status, q")
eprint("Done Init tasks")
def handle_global_input(self,key):
if key in ('q', 'Q'):
raise urwid.ExitMainLoop()
def execute_command(self):
self.body_sts.set_text(u"Executing: " + self.body_entry.edit_text)
if re.search("^\s*q\s*$", self.body_entry.edit_text,flags=re.IGNORECASE):
raise urwid.ExitMainLoop()
def get_cast_devs(self):
return self.cast_devs
def show_dev_choice(self):
body = [urwid.Text('Cast Device'), urwid.Divider()]
for d in self.cast_devs:
button = urwid.Button(d.device.friendly_name)
body.append(urwid.AttrMap(button, None, focus_map='selble'))
popup = CastDeviceChoiceBox(urwid.ListBox(urwid.SimpleFocusListWalker(body)))
urwid.connect_signal(popup, 'sel_dev',
lambda button: self.close_pop_up())
self.body_sts.set_text(u"Choose cast device to cast on")
return popup
def handle_body_entry(self,edit,edit_txt):
if (re.search("^\s*cast\s+on\s*$", edit_txt,flags=re.IGNORECASE)):
if self.have_devs:
devs = ""
self.cmd_state = 'cast_on'
for d in self.cast_devs:
devs += " - " + d.device.friendly_name + "\n"
self.body_out.set_text([u"[Cast command] ", ('foot', u"cast on "), ('bold', "<device>"), u"\n" + devs])
else:
self.body_out.set_text(u"Waiting for cast device information...")
elif (re.search("^\s*cast\s*(o)?$", edit_txt,flags=re.IGNORECASE)):
self.cmd_state = 'cast'
self.body_out.set_text([u"[Cast command] ", ('foot', u"cast"), ('bold', " on <device>")])
elif (re.search("^\s*q\s*$", edit_txt,flags=re.IGNORECASE)):
self.body_out.set_text(u"[Quit command] Hit <enter> to quit")
self.cmd_state = 'quit'
else:
self.cmd_state = 'none'
self.body_out.set_text("Type a command for assistance.\nSupported commands: cast, list, status, q")
def setup_ui(self):
self.base_layer = urwid.SolidFill()
self.head_txt = urwid.Text(('head', u"Media Cast v0.1"), align='left')
self.foot_txt = urwid.Text(('foot', u"2020 © HeshApps"), align='right')
self.body_entry = CommandBox()
self.body_entry.app = self
self.cbox = CommandPopUpLauncher(self.body_entry)
self.cbox.app = self
body_entry_box = urwid.AttrMap(urwid.LineBox(
urwid.AttrMap(self.cbox, 'normal'), 'Command'), 'body')
# urwid.AttrMap(self.body_entry, 'normal'), 'Command'), 'body')
self.body_out = urwid.Text(('body', ""), align='left')
body_out_box = urwid.AttrMap(urwid.LineBox(
urwid.Filler(urwid.AttrMap(self.body_out, 'assist'), valign='top'),
'Assist'), 'body')
self.body_sts = urwid.Text(('assist', "Initializing..."), align='left')
body_sts_box = urwid.AttrMap(urwid.LineBox(
urwid.AttrMap(self.body_sts, 'assist'), 'Status'), 'body')
self.body_cont = urwid.Frame(body_out_box, header=body_entry_box,
footer=body_sts_box, focus_part = 'header')
urwid.connect_signal(self.body_entry, 'change', self.handle_body_entry)
evl = urwid.AsyncioEventLoop(loop=asyncio.get_event_loop())
self.loop = urwid.MainLoop(self.base_layer, self.colors,
unhandled_input=self.handle_global_input, event_loop=evl, pop_ups=True)
self.loop.screen.set_terminal_properties(colors=256)
self.loop.widget = urwid.AttrMap(self.base_layer, 'bg')
self.loop.widget.original_widget = urwid.Frame(self.body_cont, header=self.head_txt,
footer=self.foot_txt, focus_part = 'body')
async def start_ui(self):
eprint("Starting UI tasks")
self.loop.run()
async def start(self):
self.setup_ui()
eprint("Starting tasks")
await asyncio.gather(
self.initial_tasks(),
self.start_ui()
)
eprint("All tasks done")
# -----------
def main():
app = CasterApp()
asyncio.run(app.start())
# -----------
if __name__ == "__main__":
main()