Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
bbguimaraes
subs
Commits
5e5b01fd
Commit
5e5b01fd
authored
Apr 08, 2020
by
Bruno Barcarol Guimarães
Browse files
Tags
parent
5644ef42
Changes
2
Hide whitespace changes
Inline
Side-by-side
subs.py
View file @
5e5b01fd
...
...
@@ -68,6 +68,8 @@ List subscriptions using `ls`, videos using `videos`.
videos_parser
.
add_argument
(
'--unwatched'
,
action
=
'store_false'
,
dest
=
'watched'
)
videos_parser
.
add_argument
(
'-f'
,
'--fields'
,
type
=
str
,
action
=
'append'
)
videos_parser
.
add_argument
(
'-t'
,
'--tags'
,
type
=
str
,
action
=
'append'
)
videos_parser
.
add_argument
(
'--untagged'
,
action
=
'store_true'
)
import_parser
=
sub
.
add_parser
(
'import'
,
help
=
'https://www.youtube.com/subscription_manager'
)
import_parser
.
set_defaults
(
cmd
=
Subscriptions
.
import_xml
)
...
...
@@ -89,6 +91,11 @@ only update subscriptions that were last fetched `s` seconds ago or earlier
type
=
int
,
metavar
=
's'
,
help
=
'''
\
only update subscriptions that had a new video less than `s` seconds ago
'''
)
tag_parser
=
sub
.
add_parser
(
'tag'
)
tag_parser
.
set_defaults
(
cmd
=
Subscriptions
.
tag
)
tag_parser
.
add_argument
(
'tag'
,
type
=
str
)
tag_parser
.
add_argument
(
'items'
,
type
=
str
,
nargs
=
'*'
)
tag_parser
.
add_argument
(
'--remove'
,
action
=
'store_true'
)
watched_parser
=
sub
.
add_parser
(
'watched'
)
watched_parser
.
set_defaults
(
cmd
=
Subscriptions
.
watched
)
watched_parser
.
add_argument
(
'items'
,
type
=
str
,
nargs
=
'*'
)
...
...
@@ -207,7 +214,7 @@ class Subscriptions(object):
def
init
(
self
):
c
=
self
.
_conn
.
cursor
()
c
.
execute
(
'select name from sqlite_master where type="table"'
)
tables
=
{
'subs'
,
'videos'
}
-
{
x
[
0
]
for
x
in
c
}
tables
=
{
'subs'
,
'videos'
,
'tags'
,
'videos_tags'
}
-
{
x
[
0
]
for
x
in
c
}
if
'subs'
in
tables
:
self
.
_log
(
'creating table subs'
)
c
.
execute
(
...
...
@@ -227,6 +234,21 @@ class Subscriptions(object):
' title text not null,'
' watched boolean not null default(0),'
' foreign key(sub) references subs(id))'
)
if
'tags'
in
tables
:
self
.
_log
(
'creating table tags'
)
c
.
execute
(
'create table tags ('
'id integer not null primary key,'
' name text not null)'
)
if
'videos_tags'
in
tables
:
self
.
_log
(
'creating table videos_tags'
)
c
.
execute
(
'create table videos_tags ('
'id integer not null primary key,'
'video integer not null,'
'tag integer not null,'
'foreign key(video) references videos(id),'
'foreign key(tag) references tags(id))'
)
def
list
(
self
,
ids
:
typing
.
Sequence
[
str
]
=
(),
...
...
@@ -251,6 +273,7 @@ class Subscriptions(object):
self
,
subscriptions
:
typing
.
Collection
[
str
]
=
(),
n
:
int
=
None
,
by_name
:
bool
=
None
,
flat
:
bool
=
None
,
watched
:
bool
=
None
,
tags
:
typing
.
Collection
[
str
]
=
None
,
untagged
:
bool
=
None
,
fields
:
typing
.
Collection
[
str
]
=
None
):
q
=
Query
(
'videos'
)
q
.
add_joins
(
'join subs on subs.id == videos.sub'
)
...
...
@@ -271,7 +294,19 @@ class Subscriptions(object):
if
watched
is
not
None
:
q
.
add_filter
(
'(videos.watched == ?)'
)
args
.
append
(
int
(
watched
))
c
=
self
.
_conn
.
cursor
().
execute
(
q
.
query
(),
args
)
if
tags
is
not
None
:
q
.
add_joins
(
'join (tags, videos_tags)'
' on (videos.id == videos_tags.video'
' and tags.id == videos_tags.tag)'
)
q
.
add_filter
(
'(tags.name in ({}))'
.
format
(
Query
.
make_args
(
len
(
tags
))))
args
.
extend
(
tags
)
elif
untagged
:
q
.
add_joins
(
'left join videos_tags on (videos.id == videos_tags.video)'
)
q
.
add_filter
(
'videos_tags.video is null'
)
c
=
self
.
_conn
.
cursor
().
execute
(
q
.
query
(
distinct
=
bool
(
tags
)),
args
)
n_subs
=
len
(
subscriptions
)
for
(
_
,
name
),
l
in
itertools
.
groupby
(
c
,
lambda
x
:
x
[:
2
]):
if
n
is
not
None
:
...
...
@@ -422,6 +457,33 @@ class Subscriptions(object):
'update subs set last_video = ? where id == ?'
,
(
last_video
,
sub_id
))
def
tag
(
self
,
tag
:
str
,
items
:
typing
.
Collection
[
str
]
=
(),
remove
:
bool
=
False
):
c
=
self
.
_conn
.
cursor
()
tag_id
=
\
c
.
execute
(
'select id from tags where name == ?'
,
(
tag
,))
\
.
fetchone
()
if
tag_id
is
not
None
:
tag_id
=
tag_id
[
0
]
else
:
c
.
execute
(
'insert into tags (name) values (?)'
,
(
tag
,))
tag_id
=
c
.
execute
(
'select last_insert_rowid()'
).
fetchone
()[
0
]
if
not
remove
:
c
.
execute
(
'insert into videos_tags (tag, video)'
' select ?, videos.id from videos'
' where videos.yt_id in ({})'
.
format
(
Query
.
make_args
(
len
(
items
))),
(
tag_id
,
*
items
))
else
:
c
.
execute
(
'delete from videos_tags'
' where tag == ? and video in ('
'select id from videos where yt_id in ({}))'
.
format
(
Query
.
make_args
(
len
(
items
))),
(
tag_id
,
*
items
))
def
watched
(
self
,
items
:
typing
.
Collection
[
str
]
=
(),
subs
:
bool
=
None
,
oldest
:
bool
=
None
,
older_than
:
bool
=
None
,
url
:
bool
=
None
,
...
...
test_subs.py
View file @
5e5b01fd
...
...
@@ -374,6 +374,84 @@ class TestUpdate(unittest.TestCase):
self
.
assertEqual
(
c
.
fetchone
()[
0
],
3
)
class
TestTag
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
conn
=
sqlite3
.
connect
(
':memory:'
)
self
.
subs
=
subs
.
Subscriptions
(
0
,
self
.
conn
)
self
.
subs
.
init
()
c
=
self
.
conn
.
cursor
()
c
.
executemany
(
'insert into subs (yt_id, name) values (?, ?)'
,
(
(
'yt_id0'
,
'sub0'
),
(
'yt_id1'
,
'sub1'
)))
c
.
executemany
(
'insert into videos (sub, yt_id, title) values (?, ?, ?)'
,
(
(
1
,
'yt_id2'
,
'title0'
),
(
1
,
'yt_id3'
,
'title1'
),
(
1
,
'yt_id4'
,
'title2'
),
(
1
,
'yt_id5'
,
'title3'
),
(
2
,
'yt_id6'
,
'title4'
),
(
2
,
'yt_id7'
,
'title5'
)))
c
=
self
.
conn
.
cursor
()
with
wrap_stdout
():
self
.
subs
.
tag
(
tag
=
'tag0'
,
items
=
(
'yt_id2'
,
'yt_id3'
))
self
.
subs
.
tag
(
tag
=
'tag1'
,
items
=
(
'yt_id3'
,
'yt_id6'
))
def
test_untagged
(
self
):
with
wrap_stdout
()
as
out
:
self
.
subs
.
list_videos
(
untagged
=
True
)
out
=
out
.
read
()
self
.
assertEqual
(
out
,
'sub0
\n
'
' [ ] title2
\n
'
' [ ] title3
\n
'
'sub1
\n
'
' [ ] title5
\n
'
)
def
test_tag
(
self
):
with
wrap_stdout
()
as
out
:
self
.
subs
.
list_videos
(
tags
=
(
'tag0'
,))
out
=
out
.
read
()
self
.
assertEqual
(
out
,
'sub0
\n
'
' [ ] title0
\n
'
' [ ] title1
\n
'
)
with
wrap_stdout
()
as
out
:
self
.
subs
.
list_videos
(
tags
=
(
'tag1'
,))
out
=
out
.
read
()
self
.
assertEqual
(
out
,
'sub0
\n
'
' [ ] title1
\n
'
'sub1
\n
'
' [ ] title4
\n
'
)
with
wrap_stdout
()
as
out
:
self
.
subs
.
list_videos
(
tags
=
(
'tag0'
,
'tag1'
))
out
=
out
.
read
()
self
.
assertEqual
(
out
,
'sub0
\n
'
' [ ] title0
\n
'
' [ ] title1
\n
'
'sub1
\n
'
' [ ] title4
\n
'
)
def
test_remove
(
self
):
c
=
self
.
conn
.
cursor
()
with
wrap_stdout
():
self
.
subs
.
tag
(
remove
=
True
,
tag
=
'tag0'
,
items
=
(
'yt_id3'
,))
self
.
subs
.
tag
(
remove
=
True
,
tag
=
'tag1'
,
items
=
(
'yt_id6'
,))
with
wrap_stdout
()
as
out
:
self
.
subs
.
list_videos
(
tags
=
(
'tag0'
,))
out
=
out
.
read
()
self
.
assertEqual
(
out
,
'sub0
\n
'
' [ ] title0
\n
'
)
with
wrap_stdout
()
as
out
:
self
.
subs
.
list_videos
(
tags
=
(
'tag1'
,))
out
=
out
.
read
()
self
.
assertEqual
(
out
,
'sub0
\n
'
' [ ] title1
\n
'
)
class
TestWatched
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
conn
=
sqlite3
.
connect
(
':memory:'
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment