Commit 4693950e authored by Bruno Barcarol Guimarães's avatar Bruno Barcarol Guimarães
Browse files

Move source files to a sub-directory

parent e8005647
import datetime
import youtube_dl
class Client(object):
CHANNEL_URL = 'https://www.youtube.com/channel/{}'
VIDEO_URL = 'https://www.youtube.com/watch?v={}'
class logger(object):
warning = print
error = print
def __init__(self, verbose):
self.debug = (lambda *_: None) if verbose < 2 else print
def __init__(self, verbose: int, ydl=None):
self._opts = {'logger': self.logger(verbose), 'extract_flat': True}
self._ydl = ydl or youtube_dl.YoutubeDL(self._opts)
def info(self, url: str):
return self._ydl.extract_info(url, download=False)
def channel_entries(self, yt_id: str):
url = self.info(self.CHANNEL_URL.format(yt_id))['url']
return self.info(url)['entries']
def upload_date(self, yt_id: str):
d = self.info(self.VIDEO_URL.format(yt_id))['upload_date']
ts = datetime.datetime.strptime(d, '%Y%m%d').timestamp()
return int(ts)
import typing
class Query(object):
@classmethod
def make_args(cls, n: int): return ', '.join(('?',) * n)
def __init__(self, table: str):
self.table = table
self.fields: typing.List[str] = []
self.joins: typing.List[str] = []
self.wheres: typing.List[str] = []
self.group_bys: typing.List[str] = []
self.order_bys: typing.List[str] = []
def add_fields(self, *name: str):
self.fields.extend(name)
def add_joins(self, *joins: str):
self.joins.extend(joins)
def add_filter(self, *exprs: str):
self.wheres.extend(exprs)
def add_group(self, *exprs: str):
self.group_bys.extend(exprs)
def add_order(self, *exprs: str):
self.order_bys.extend(exprs)
def _query(self, l):
if self.wheres:
l.append('where')
l.append(' and '.join(self.wheres))
if self.group_bys:
l.append('group by')
l.append(', '.join(self.group_bys))
if self.order_bys:
l.append('order by')
l.append(', '.join(self.order_bys))
return ' '.join(l)
def query(self, distinct=False):
ret = ['select']
if distinct:
ret.append('distinct')
ret.extend((', '.join(self.fields), 'from', self.table))
if self.joins:
ret.append(' '.join(self.joins))
return self._query(ret)
def update(self, q):
return self._query(['update', self.table, 'set', q])
......@@ -12,6 +12,9 @@ import urllib.parse
import xml.etree.ElementTree
import youtube_dl
from client import Client
from query import Query
VALID_SUB_FIELDS = {'yt_id', 'name'}
VALID_VIDEO_FIELDS = {'yt_id', 'title', 'watched', 'url'}
......@@ -118,86 +121,6 @@ def db_file():
return ret
class Client(object):
CHANNEL_URL = 'https://www.youtube.com/channel/{}'
VIDEO_URL = 'https://www.youtube.com/watch?v={}'
class logger(object):
warning = print
error = print
def __init__(self, verbose):
self.debug = (lambda *_: None) if verbose < 2 else print
def __init__(self, verbose: int, ydl=None):
self._opts = {'logger': self.logger(verbose), 'extract_flat': True}
self._ydl = ydl or youtube_dl.YoutubeDL(self._opts)
def info(self, url: str):
return self._ydl.extract_info(url, download=False)
def channel_entries(self, yt_id: str):
url = self.info(self.CHANNEL_URL.format(yt_id))['url']
return self.info(url)['entries']
def upload_date(self, yt_id: str):
d = self.info(self.VIDEO_URL.format(yt_id))['upload_date']
ts = datetime.datetime.strptime(d, '%Y%m%d').timestamp()
return int(ts)
class Query(object):
@classmethod
def make_args(cls, n: int): return ', '.join(('?',) * n)
def __init__(self, table: str):
self.table = table
self.fields: typing.List[str] = []
self.joins: typing.List[str] = []
self.wheres: typing.List[str] = []
self.group_bys: typing.List[str] = []
self.order_bys: typing.List[str] = []
def add_fields(self, *name: str):
self.fields.extend(name)
def add_joins(self, *joins: str):
self.joins.extend(joins)
def add_filter(self, *exprs: str):
self.wheres.extend(exprs)
def add_group(self, *exprs: str):
self.group_bys.extend(exprs)
def add_order(self, *exprs: str):
self.order_bys.extend(exprs)
def _query(self, l):
if self.wheres:
l.append('where')
l.append(' and '.join(self.wheres))
if self.group_bys:
l.append('group by')
l.append(', '.join(self.group_bys))
if self.order_bys:
l.append('order by')
l.append(', '.join(self.order_bys))
return ' '.join(l)
def query(self, distinct=False):
ret = ['select']
if distinct:
ret.append('distinct')
ret.extend((', '.join(self.fields), 'from', self.table))
if self.joins:
ret.append(' '.join(self.joins))
return self._query(ret)
def update(self, q):
return self._query(['update', self.table, 'set', q])
class Subscriptions(object):
def __init__(
self, verbose: int, conn: sqlite3.Connection,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment