#!/usr/bin/env python # vim:ts=4:et:foldmethod=marker # cdsuite: Convert audio CDs to collections of audio files. # Copyright 2002, 2003, 2006, 2007 Adam Sampson # # cdsuite is free software; you can redistribute and/or modify it # under the terms of that license as published by the Free Software # Foundation; either version 2 of the License, or (at your option) # any later version. # # cdsuite is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with cdsuite; see the file COPYING. If not, write to the # Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, # MA 02111-1307 USA, or see http://www.gnu.org/. import sys, os, re import musicbrainz2.disc as mbdisc import musicbrainz2.webservice as mbws #{{{ options options = { "reader" : "default", "encoder" : "default", "buffersize" : "500000", "device" : "/dev/cdrom", "vorbis.quality" : "7", "vorbis.bitrate" : "", "lame.stereomode" : "s", "lame.bitrate" : "256", "lame.quality" : "", "lame.highquality" : "yes", "flac.quality" : "8", "trackformat" : "%n %t - %g", "nogrouptrackformat" : "%n %t", "albumformat" : "%a - %g", "nogroupalbumformat" : "%a", "eject" : "yes", "release" : "", "rename" : "", "offline" : "", "dryrun" : "", "submit" : "", "prefix" : "", } #}}} #{{{ encoders class Encoder: pass class WavEncoder(Encoder): def name(this): return "wav" def ext(this): return "wav" def open(this, filename): this.fd = os.open(filename, os.O_WRONLY | os.O_CREAT) return this.fd def close(this): os.close(this.fd) class PipedEncoder(Encoder): def open(this, filename): cmd = this.command(filename) (pr, this.fd) = os.pipe() this.pid = os.fork() if this.pid == 0: os.close(this.fd) os.dup2(pr, 0) os.execvp(cmd[0], cmd) print >>sys.stderr, "Unable to start encoder: " + cmd sys._exit(1) os.close(pr) return this.fd def close(this): os.close(this.fd) os.waitpid(this.pid, 0) class LameEncoder(PipedEncoder): def name(this): return "lame" def ext(this): return "mp3" def command(this, out): qo = ("-m", options["lame.stereomode"]) if options["lame.bitrate"] != "": qo += ("-b", options["lame.bitrate"]) elif options["lame.quality"] != "": qo += ("-V", options["lame.quality"]) if options["lame.highquality"] != "": qo += ("-h",) return ("lame", "--quiet") + qo + ("-", out) class VorbisEncoder(PipedEncoder): def name(this): return "vorbis" def ext(this): return "ogg" def command(this, out): qo = () if options["vorbis.quality"] != "": qo += ("-q", options["vorbis.quality"]) elif options["vorbis.bitrate"] != "": qo += ("-b", options["vorbis.bitrate"]) return ("oggenc", "-Q") + qo + ("-o", out, "-") class FlacEncoder(PipedEncoder): def name(this): return "flac" def ext(this): return "flac" def command(this, out): q = int(options["flac.quality"]) return ("flac", "-" + str(q), "-o", out, "-s", "-") encoders = {} for x in [WavEncoder(), LameEncoder(), VorbisEncoder(), FlacEncoder()]: encoders[x.name()] = x encoders["default"] = encoders["vorbis"] #}}} #{{{ readers class Reader: def open(this, track): cmd = this.command(track) (this.fd, pw) = os.pipe() this.pid = os.fork() if this.pid == 0: os.close(this.fd) os.dup2(pw, 1) os.execvp(cmd[0], cmd) print >>sys.stderr, "Unable to start reader: " + cmd sys._exit(1) os.close(pw) return this.fd def close(this): os.close(this.fd) os.waitpid(this.pid, 0) class CdparanoiaReader(Reader): def name(this): return "cdparanoia" def command(this, track): return ("cdparanoia", "-w", "-d", options["device"], str(track), "-") # TODO: Cdda2wavReader readers = {} for x in [CdparanoiaReader()]: readers[x.name()] = x readers["default"] = readers["cdparanoia"] #}}} #{{{ utility functions def buffered_copy(rfd, wfd, size): """Copy data from rfd to wfd through a buffer of size size.""" from select import select buf = "" eof = 0 while not (eof and buf == ""): bl = len(buf) if bl != 0: wfds = [wfd] else: wfds = [] if bl < size and not eof: rfds = [rfd] else: rfds = [] (rfds, wfds, efds) = select(rfds, wfds, []) if rfd in rfds: data = os.read(rfd, size - bl) if data == "": eof = 1 buf = buf + data if wfd in wfds: count = os.write(wfd, buf) buf = buf[count:] def dryrun_rename(a, b): """Behave like os.rename, but don't actually rename anything.""" print a + " -> " + b def clean_string(s): """Remove characters from s which would be problematic in a filename.""" return s.replace("/", "&") def fill_template(template, args): """Fill in template, which contains strings of the form '%name', using the supplied arguments. The result will be encoded as UTF-8.""" def func(m): key = m.group(1) if key == "%": return "%" elif key in args and key is not None: return clean_string(args[key]) else: die("Unknown field %%%s while expanding '%s'" % (key, template)) return re.sub(r'%(.)', func, template).encode("UTF-8") def warn(*s): print >>sys.stderr, "".join(map(str, s)) def die(*s): warn(*s) sys.exit(1) #}}} #{{{ main behaviour def usage(): print >>sys.stderr, "Usage:", sys.argv[0], "[option=value]..." print >>sys.stderr, "Possible options and current values:" k = options.keys() k.sort() for arg in k: print >>sys.stderr, " " + arg + "=" + options[arg] sys.exit(1) def encode_track(track, filename): """Encode track track to file filename.""" reader = readers[options["reader"]] encoder = encoders[options["encoder"]] wfd = encoder.open(filename + "." + encoder.ext()) rfd = reader.open(track) buffered_copy(rfd, wfd, int(options["buffersize"])) encoder.close() reader.close() def encode_disc(): """Encode all the tracks from a disc.""" encoder = encoders[options["encoder"]] disc = mbdisc.readDisc(deviceName = options["device"]) discid = disc.getId() dirname = "disc-" + discid os.mkdir(dirname) f = open(dirname + "/%sdiscid" % options["prefix"], "w") f.write(discid + "\n") f.close() tracks = {} ext = encoder.ext() for n in range(disc.getFirstTrackNum(), disc.getLastTrackNum() + 1): dest = dirname + "/%s%02d" % (options["prefix"], n) encode_track(n, dest) tracks[n] = (dest + "." + ext, ext) if options["eject"] != "": if os.spawnvp(os.P_WAIT, "eject", ["eject", options["device"]]) != 0: die("eject failed") return (dirname, discid, tracks) def scan_dir(dirname): """Scan a disc that's already been encoded.""" try: f = open(dirname + "/%sdiscid" % options["prefix"]) discid = f.read().strip() f.close() if len(discid) != 28: # Probably a discid from an old version of cdsuite. discid = None except IOError: discid = None # This works for the "raw" names, and for most custom formats. track_re = re.compile(r'^' + options["prefix"] + r'(\d+).*\.([^\.]+)$') tracks = {} for fn in os.listdir(dirname): m = track_re.match(fn) if m is not None: tracks[int(m.group(1))] = (dirname + "/" + fn, m.group(2)) return (discid, tracks) def rename_tracks(dirname, discid, tracks): """Rename the tracks in a directory, and the directory itself.""" renamer = os.rename if options["dryrun"] != "": renamer = dryrun_rename q = mbws.Query() advice = "run again with 'release=X'" if options["release"] != "": releaseid = options["release"] elif discid is None: die("No discid found: " + advice) else: filter = mbws.ReleaseFilter(discId = discid) results = q.getReleases(filter = filter) if results == []: die("No match found for disc: " + advice) if len(results) != 1: results.sort(lambda a, b: cmp(b.score, a.score)) for result in results: release = result.release print ("'release=%s' = %s" % (release.getId(), release.getTitle())) die("No exact match found for disc: " + advice) releaseid = results[0].release.getId() include = mbws.ReleaseIncludes(artist = True, counts = True, tracks = True) release = q.getReleaseById(releaseid, include) disc_title = release.getTitle() disc_artist = release.getArtist() n = release.getTracksOffset() if n is None: n = 0 for track in release.getTracks(): n += 1 title = track.getTitle() artist = track.getArtist() if artist is None: artist = disc_artist if artist is None: template = options["nogrouptrackformat"] else: template = options["trackformat"] args = { "t" : title, "g" : artist.getName(), "a" : disc_title, "n" : ("%02d" % n) } if n in tracks: (oldname, ext) = tracks[n] newname = (dirname + "/" + options["prefix"] + fill_template(template, args) + "." + ext) renamer(oldname, newname) else: warn("No file found for track %d (%s)" % (n, newname)) if disc_artist is None: template = options["nogroupalbumformat"] else: template = options["albumformat"] args = { "a" : disc_title, "g" : disc_artist.getName() } newname = fill_template(template, args) renamer(dirname, newname) def submit_disc(): """Print the URL for submitting the current disc to MusicBrainz.""" disc = mbdisc.readDisc(deviceName = options["device"]) print mbdisc.getSubmissionUrl(disc) def main(args): """Main entry point.""" try: f = open(os.getenv("HOME") + "/.cdsuiterc") config = map(lambda s: s.strip(), f.readlines()) f.close() except IOError: config = [] for arg in config + args: if arg == "" or arg[0] == "#": continue s = arg.split("=", 1) if len(s) != 2: usage() (option, value) = s if not options.has_key(option): usage() options[option] = value if options["submit"] != "": submit_disc() sys.exit(0) if not readers.has_key(options["reader"]): die("Unknown reader; can be: " + " ".join(readers.keys())) if not encoders.has_key(options["encoder"]): die("Unknown encoder; can be: " + " ".join(encoders.keys())) if options["rename"] == "": (dirname, discid, tracks) = encode_disc() else: dirname = options["rename"] if dirname[-1] == "/": dirname = dirname[:-1] (discid, tracks) = scan_dir(dirname) if options["offline"] == "": rename_tracks(dirname, discid, tracks) if __name__ == "__main__": sys.exit(main(sys.argv[1:])) #}}}