#!/usr/bin/python
"""Utilities for talking to MythTV from Python.

Adam Sampson
"""

import MySQLdb
import glob
from offog import *
import os
import re
import sys


class MythException(Exception):
    """Raised when MythTV configuration or recording files cannot be found."""
    pass


@memoised
def get_config():
    """Read MythTV's configuration files.

    Looks in ~/.mythtv and then /etc/mythtv.  The first readable
    "mysql.txt" supplies "key=value" database settings, and the first
    "storage-dir" symlink supplies the recordings directory (stored under
    the "StorageDir" key).

    Returns a dict of configuration values.
    Raises MythException if any required key is missing.
    """

    config = {}

    # expanduser() rather than os.getenv("HOME"): getenv returns None when
    # HOME is unset, which would make the string concatenation blow up.
    mythtv_dirs = [
        os.path.expanduser("~/.mythtv"),
        "/etc/mythtv",
    ]

    # Read mysql.txt for database details.
    for fn in [os.path.join(d, "mysql.txt") for d in mythtv_dirs]:
        try:
            f = open(fn)
        except IOError:
            continue
        # "with" guarantees the file is closed even if parsing fails.
        with f:
            for line in f:
                line = line.strip()
                if line == "" or line.startswith("#"):
                    continue
                fs = line.split("=", 1)
                if len(fs) != 2:
                    continue
                config[fs[0]] = fs[1]
        # Only the first mysql.txt that could be opened is used.
        break

    # Find the directory in which recordings are stored.
    # To set this, create a symlink in the MythTV config directory called
    # "storage-dir".
    # (The MythTV client never looks in there itself, so this isn't
    # in the regular config files.)
    for fn in [os.path.join(d, "storage-dir") for d in mythtv_dirs]:
        try:
            config["StorageDir"] = os.readlink(fn)
            break
        except OSError:
            continue

    # Check we've found everything we need.
    require_keys = set([
        "DBHostName",
        "DBUserName",
        "DBPassword",
        "DBName",
        "StorageDir",
    ])
    missing = require_keys - set(config.keys())
    if len(missing) != 0:
        # Sorted so that the message is the same from run to run.
        raise MythException("Configuration keys not found: "
                            + ", ".join(sorted(missing)))

    return config


def connect_db():
    """Return a connection to the MythTV database.

    The connection parameters come from get_config(), so this raises
    MythException if the configuration is incomplete.
    """
    config = get_config()
    return MySQLdb.connect(host=config["DBHostName"],
                           user=config["DBUserName"],
                           passwd=config["DBPassword"],
                           db=config["DBName"])


class _MissingAttribute(AttributeError, KeyError):
    """Raised by Recording when an unknown attribute is requested.

    It is an AttributeError so that hasattr() and getattr() with a default
    behave normally, and also a KeyError so that code written against the
    older behaviour (a plain dict lookup failure) keeps working.
    """
    pass


class Recording(object):
    """A MythTV recording.

    The keyword arguments given to the constructor, and anything added
    later with set(), are readable as attributes (e.g. rec.title).
    """

    def __init__(self, **attrs):
        self.attrs = attrs

    def set(self, key, value):
        """Set (or replace) the attribute called key."""
        self.attrs[key] = value

    def __getattr__(self, key):
        # Only called when normal attribute lookup has failed.  Go through
        # __dict__ so that a missing "attrs" cannot recurse back into here.
        attrs = self.__dict__.get("attrs", {})
        try:
            return attrs[key]
        except KeyError:
            raise _MissingAttribute(key)

    def __str__(self):
        return repr(self)

    def __repr__(self):
        return ("Recording("
                + ", ".join(["%s=%r" % (k, v)
                             for k, v in self.attrs.items()])
                + ")")


def _find_video_file(storage_dir, chanid, starttime):
    """Return the path of the video file for a recording.

    Files are named <chanid>_<YYYYmmddHHMMSS>*.mpg, with *.nuv as a
    fallback.  Raises MythException if neither pattern matches anything.
    """
    fn = "%s/%s_%s" % (storage_dir, chanid,
                       starttime.strftime("%Y%m%d%H%M%S"))
    fns = glob.glob(fn + "*.mpg")
    if fns == []:
        fns = glob.glob(fn + "*.nuv")
    if fns == []:
        raise MythException("Could not find video file: " + fn)
    return fns[0]


def get_recordings(conn, query=None, query_args=None):
    """Find recordings that match a database query on the "recorded" table.

    conn is a database connection (for example from connect_db()).
    query is an SQL expression used as the WHERE clause; it is inserted
    into the statement verbatim, so it must never be built from untrusted
    input -- put values in query_args and refer to them with placeholders.
    If query is None, all recordings are returned.

    Returns a list of Recording objects ordered by start time and channel,
    each with clean_title/clean_subtitle already set.
    Raises MythException if a recording's video file cannot be found.
    """

    if query is None:
        query = "TRUE"
    if query_args is None:
        query_args = []

    recs = []
    c = conn.cursor()
    try:
        c.execute("SELECT chanid, starttime, endtime, title, subtitle, description FROM recorded WHERE " + query + " ORDER BY starttime, chanid", query_args)

        storage_dir = get_config()["StorageDir"]
        for row in c.fetchall():
            (chanid, starttime, endtime, title, subtitle, description) = row

            rec = Recording(title=title,
                            subtitle=subtitle,
                            starttime=starttime,
                            endtime=endtime,
                            description=description,
                            filename=_find_video_file(storage_dir, chanid,
                                                      starttime))
            make_tidy_title(rec)
            recs.append(rec)
    finally:
        # Close the cursor even if a video file turned out to be missing.
        c.close()

    # NOTE(review): nothing is written here; presumably the commit just
    # ends the transaction opened by the SELECT -- confirm.
    conn.commit()

    return recs


# Regular expressions identifying subtitles that are really generic blurb
# rather than an episode name.
_BAD_SUBTITLES = [
    r'topical comedy',
    r'chairs the devious word game',
    r'perennial antidote to panel games',
    r'BBC Radio 7',
    r'7 Premiere',
    r'chairs the topical news quiz',
]


def _clean(s):
    """Tidy up a title string, returning None if nothing is left.

    Collapses repeated dashes and whitespace, replaces ": " and "/" and
    drops "?" and "!", then trims leading/trailing separators.
    """
    s = re.sub(r'(- )+', '- ', s)
    s = s.replace(": ", " - ")
    s = s.replace("/", " of ")
    s = re.sub(r'[\?!]', '', s)
    s = re.sub(r'\s+', ' ', s)
    s = s.strip()
    s = re.sub(r' -$', '', s)
    s = re.sub(r'^- ', '', s)
    if s == "":
        return None
    return s


def make_tidy_title(recording):
    """Set clean_title/clean_subtitle for a recording, attempting to fix
    some of the ways in which broadcasters mangle the transmitted
    information. The resulting strings are intended to be safe for use in
    filenames; either may be None if nothing useful remains.

    This is very UK-specific."""

    # Treat missing (None) fields as empty rather than crashing.
    title = recording.title or ""
    subtitle = recording.subtitle or ""
    description = recording.description or ""

    # The subtitle is sometimes appended to the title as well.
    if subtitle != "" and title.endswith(subtitle):
        title = title[:-len(subtitle)]

    # A subtitle of the form "...<end of title>. <rest>": keep only <rest>.
    m = re.match(r'^\.\.\.([^\.]+)(?:\. (.+))?$', subtitle)
    if m is not None and title.endswith(m.group(1)):
        if m.group(2) is None:
            subtitle = ""
        else:
            subtitle = m.group(2)

    # Throw away subtitles that are just generic programme blurb.
    if any(re.search(pattern, subtitle) is not None
           for pattern in _BAD_SUBTITLES):
        subtitle = ""

    # "Part of <something>. <rest>": keep only <rest>, if there is any.
    m = re.search(r'^Part of.*?(\. *(.*))?$', subtitle)
    if m is not None:
        if m.group(1) is not None:
            subtitle = m.group(2)
        else:
            subtitle = ""

    # "<title>: <episode>" or "<title> - <episode>" packed into the title.
    m = re.search(r'^(.*)(: | - )(.*)$', title)
    if m is not None:
        title = m.group(1)
        subtitle = m.group(3) + " - " + subtitle

    # With no subtitle, fall back on a "<name>: ..." prefix in the
    # description.
    m = re.search(r'^([^.]*): (.*)$', description)
    if subtitle == "" and m is not None:
        subtitle = m.group(1)

    m = re.search(r'Episode (\d+) of (\d+)', description)
    if m is not None:
        subtitle += " - %s of %s" % m.group(1, 2)

    if title == "A History of the World in 100 Objects":
        m = re.search(r'\b(\d+)(/\d+)?: ([^.]*)', description)
        if m is not None:
            subtitle = "%s - %s" % m.group(1, 3)
        else:
            # It's an omnibus (probably).
            subtitle = ""

    recording.set("clean_title", _clean(title))
    recording.set("clean_subtitle", _clean(subtitle))


if __name__ == "__main__":
    # Parenthesised so that this parses on Python 3 as well as Python 2.
    print(get_config())