Commit 09f8825c authored by Bruno Barcarol Guimarães's avatar Bruno Barcarol Guimarães
Browse files

Replace various arguments with `-f`/`--fields`

parent eafec1fa
......@@ -12,6 +12,12 @@ import xml.etree.ElementTree
import youtube_dl
VALID_SUB_FIELDS = {'yt_id', 'name'}
VALID_VIDEO_FIELDS = {'yt_id', 'title', 'watched', 'url'}
DEFAULT_SUB_FIELDS = ('name',)
DEFAULT_VIDEO_FIELDS = ('watched', 'title')
def main(argv: typing.Sequence[str]):
args = parse_args(argv)
args.file = args.file or db_file()
......@@ -49,20 +55,18 @@ List subscriptions using `ls`, videos using `videos`.
ls_parser = sub.add_parser('ls')
ls_parser.set_defaults(cmd=Subscriptions.list)
ls_parser.add_argument('ids', type=str, nargs='*')
ls_parser.add_argument('--id', action='store_true', dest='show_id')
ls_parser.add_argument('--show-ids', action='store_true')
ls_parser.add_argument('--unwatched', action='store_true')
ls_parser.add_argument('-f', '--fields', type=str, action='append')
videos_parser = sub.add_parser('videos')
videos_parser.set_defaults(cmd=Subscriptions.list_videos)
videos_parser.add_argument('subscriptions', type=str, nargs='*')
videos_parser.add_argument('-n', type=int)
videos_parser.add_argument('--by-name', action='store_true')
videos_parser.add_argument('--url', action='store_true')
videos_parser.add_argument('--flat', action='store_true')
videos_parser.add_argument('--show-ids', action='store_true')
videos_parser.add_argument('--watched', action='store_true', default=None)
videos_parser.add_argument('--unwatched',
action='store_false', dest='watched')
videos_parser.add_argument('-f', '--fields', type=str, action='append')
import_parser = sub.add_parser('import',
help='https://www.youtube.com/subscription_manager')
import_parser.set_defaults(cmd=Subscriptions.import_xml)
......@@ -219,16 +223,13 @@ class Subscriptions(object):
def list(
self, ids: typing.Sequence[str]=(),
show_id: bool=None, show_ids: bool=None, unwatched: bool=None):
assert(bool(show_id) + bool(show_ids) <= 1)
show_id: bool=None, unwatched: bool=None,
fields: typing.Optional[typing.Sequence[str]]=None):
q = Query(table='subs')
q.add_order('subs.id')
if show_id:
q.add_fields('subs.yt_id')
elif show_ids:
q.add_fields('subs.yt_id', 'subs.name')
else:
q.add_fields('subs.name')
fields = Subscriptions._parse_fields(
fields, DEFAULT_SUB_FIELDS, VALID_SUB_FIELDS)
q.add_fields(*('0' if x == 'url' else 'subs.' + x for x in fields))
if unwatched:
q.add_joins(
'join videos on subs.id == videos.sub and videos.watched == 0')
......@@ -241,16 +242,16 @@ class Subscriptions(object):
def list_videos(
self, subscriptions: typing.Collection[str]=(),
n: int=None, by_name: bool=None, url: bool=None, flat: bool=None,
show_ids: bool=None, watched: bool=None):
n: int=None, by_name: bool=None,
flat: bool=None, watched: bool=None,
fields: typing.Collection[str]=None):
q = Query('videos')
q.add_joins('join subs on subs.id == videos.sub')
q.add_fields('subs.id', 'subs.name', 'videos.watched')
q.add_fields('subs.id', 'subs.name', 'videos.yt_id', 'videos.watched')
q.add_order('subs.id', 'videos.id')
if show_ids or url:
q.add_fields('videos.yt_id')
if not url:
q.add_fields('videos.title')
fields = Subscriptions._parse_fields(
fields, DEFAULT_VIDEO_FIELDS, VALID_VIDEO_FIELDS)
q.add_fields(*('0' if x == 'url' else 'videos.' + x for x in fields))
args: typing.List[typing.Union[str, int]] = []
if subscriptions:
if by_name:
......@@ -270,15 +271,34 @@ class Subscriptions(object):
l = itertools.islice(l, n)
if not flat and n_subs != 1:
print(name)
for _, _, vwatched, *l in l:
for _, _, yt_id, vwatched, *fl in l:
if not flat and n_subs != 1:
print(end=' ')
if watched is None:
print(end='[' + ' x'[vwatched] + '] ')
if url:
print(Client.VIDEO_URL.format(*l))
else:
print(*l)
if fields:
if 'watched' in fields:
i = fields.index('watched')
fl[i] = '[' + ' x'[fl[i]] + ']'
if 'url' in fields:
i = fields.index('url')
fl[i] = Client.VIDEO_URL.format(yt_id)
print(*fl)
continue
print(*fl)
@staticmethod
def _parse_fields(
l: typing.Optional[typing.Collection[str]],
default: typing.Sequence[str],
valid: typing.Set[str],
) -> typing.Sequence[str]:
if l is None:
ret = default
else:
ret = sum([x.split(',') for x in l], [])
invalid = set(ret) - valid
if invalid:
raise ValueError('invalid fields: ' + ', '.join(invalid))
return ret
def count(self):
c = self._conn.execute(
......
......@@ -55,32 +55,18 @@ class TestList(unittest.TestCase):
def test_show_id(self):
with wrap_stdout() as out:
self.subs.list(show_id=True)
self.subs.list(fields=('yt_id',))
out = out.read()
self.assertEqual(out, 'yt_id0\nyt_id1\n')
with wrap_stdout() as out:
self.subs.list(ids=('sub0',), show_id=True)
self.subs.list(ids=('sub0',), fields=('yt_id',))
out = out.read()
self.assertEqual(out, 'yt_id0\n')
with wrap_stdout() as out:
self.subs.list(ids=('sub1',), show_id=True)
self.subs.list(ids=('sub1',), fields=('yt_id',))
out = out.read()
self.assertEqual(out, 'yt_id1\n')
def test_show_ids(self):
with wrap_stdout() as out:
self.subs.list(show_ids=True)
out = out.read()
self.assertEqual(out, 'yt_id0 sub0\nyt_id1 sub1\n')
with wrap_stdout() as out:
self.subs.list(ids=('sub0',), show_ids=True)
out = out.read()
self.assertEqual(out, 'yt_id0 sub0\n')
with wrap_stdout() as out:
self.subs.list(ids=('sub1',), show_ids=True)
out = out.read()
self.assertEqual(out, 'yt_id1 sub1\n')
def test_unwatched(self):
c = self.conn.cursor()
c.execute('update videos set watched = 1')
......@@ -178,22 +164,6 @@ class TestListVideos(unittest.TestCase):
'[ ] title4\n'
'[ ] title5\n')
def test_url(self):
with wrap_stdout() as out:
self.subs.list_videos(url=True)
out = out.read()
self.assertEqual(out,
'sub0\n'
' [ ] https://www.youtube.com/watch?v=yt_id2\n'
' [x] https://www.youtube.com/watch?v=yt_id3\n'
' [ ] https://www.youtube.com/watch?v=yt_id4\n'
' [x] https://www.youtube.com/watch?v=yt_id5\n'
'sub1\n'
' [ ] https://www.youtube.com/watch?v=yt_id6\n'
' [ ] https://www.youtube.com/watch?v=yt_id7\n'
'sub2\n'
' [x] https://www.youtube.com/watch?v=yt_id8\n')
def test_flat(self):
with wrap_stdout() as out:
self.subs.list_videos(flat=True)
......@@ -207,48 +177,94 @@ class TestListVideos(unittest.TestCase):
'[ ] title5\n'
'[x] title6\n')
def test_show_ids(self):
def test_watched(self):
with wrap_stdout() as out:
self.subs.list_videos(show_ids=True)
self.subs.list_videos(watched=True)
out = out.read()
self.assertEqual(out,
'sub0\n'
' [ ] yt_id2 title0\n'
' [x] yt_id3 title1\n'
' [ ] yt_id4 title2\n'
' [x] yt_id5 title3\n'
'sub1\n'
' [ ] yt_id6 title4\n'
' [ ] yt_id7 title5\n'
' [x] title1\n'
' [x] title3\n'
'sub2\n'
' [x] yt_id8 title6\n')
' [x] title6\n')
with wrap_stdout() as out:
self.subs.list_videos(watched=False)
out = out.read()
self.assertEqual(out,
'sub0\n'
' [ ] title0\n'
' [ ] title2\n'
'sub1\n'
' [ ] title4\n'
' [ ] title5\n')
def test_watched(self):
def test_fields(self):
self.assertRaisesRegex(
ValueError, '^invalid fields: invalid$',
self.subs.list_videos, fields=('invalid',))
with wrap_stdout() as out:
self.subs.list_videos(watched=True)
self.subs.list_videos(fields=('title',))
out = out.read()
self.assertEqual(out,
'sub0\n'
' title0\n'
' title1\n'
' title2\n'
' title3\n'
'sub1\n'
' title4\n'
' title5\n'
'sub2\n'
' title6\n')
with wrap_stdout() as out:
self.subs.list_videos(watched=False)
self.subs.list_videos(fields=('yt_id',))
out = out.read()
self.assertEqual(out,
'sub0\n'
' title0\n'
' title2\n'
' yt_id2\n'
' yt_id3\n'
' yt_id4\n'
' yt_id5\n'
'sub1\n'
' title4\n'
' title5\n')
' yt_id6\n'
' yt_id7\n'
'sub2\n'
' yt_id8\n')
with wrap_stdout() as out:
self.subs.list_videos(fields=('watched',))
out = out.read()
self.assertEqual(out,
'sub0\n'
' [ ]\n'
' [x]\n'
' [ ]\n'
' [x]\n'
'sub1\n'
' [ ]\n'
' [ ]\n'
'sub2\n'
' [x]\n')
with wrap_stdout() as out:
self.subs.list_videos(fields=('watched', 'yt_id', 'title'))
out = out.read()
self.assertEqual(out,
'sub0\n'
' [ ] yt_id2 title0\n'
' [x] yt_id3 title1\n'
' [ ] yt_id4 title2\n'
' [x] yt_id5 title3\n'
'sub1\n'
' [ ] yt_id6 title4\n'
' [ ] yt_id7 title5\n'
'sub2\n'
' [x] yt_id8 title6\n')
def test_mixed(self):
with wrap_stdout() as out:
self.subs.list_videos(
subscriptions=('sub0', 'sub2'),
n=2, by_name=True, flat=True, show_ids=True, watched=True)
fields=('yt_id', 'title'),
n=2, by_name=True, flat=True, watched=True)
out = out.read()
self.assertEqual(out,
'yt_id3 title1\n'
......@@ -422,7 +438,7 @@ class TestWatched(unittest.TestCase):
self.subs.list_videos(watched=True)
self.assertEqual(out.read(),
'sub0\n'
' title0\n')
' [x] title0\n')
with wrap_stdout() as out:
self.subs.watched(items=('yt_id2',), remove=True)
self.subs.list_videos(watched=True)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment