#!/usr/bin/env python
# vim: tw=75 sts=4 ts=4 et fileencoding=utf-8
"""Generate powerdns SQL INSERT statements from a tinydns-data file."""

import argparse
from collections import namedtuple
import time


# Row layouts; the field names double as the SQL column names.
Domain = namedtuple('Domain', 'id name master last_check type '
                    'notified_serial account')
Record = namedtuple('Record', 'id domain_id name type content ttl prio '
                    'change_date')


def ugly_format(value):
    """Render *value* as an SQL literal.

    ``None`` becomes ``NULL``, integers are written bare, and strings are
    wrapped in single quotes with embedded single quotes doubled.
    Backslashes are emitted verbatim.

    Raises:
        TypeError: if *value* is not ``None``, an ``int`` or a ``str``.
    """
    if value is None:
        return 'NULL'
    if isinstance(value, int):
        return '%d' % value
    if isinstance(value, str):
        # Doubling the quote is the standard SQL escape; rejecting such
        # values with ``assert`` would vanish under ``python -O``.
        return "'%s'" % value.replace("'", "''")
    raise TypeError('cannot format %r as an SQL literal' % (value,))


def xname(x, sub, fqdn):
    """Build the host name for the ``x`` field of a tinydns-data line.

    An empty *x* yields ``sub.fqdn``, an *x* containing a dot is returned
    unchanged, and anything else yields ``x.sub.fqdn``.
    """
    if not x:
        return '%s.%s' % (sub, fqdn)
    if '.' in x:
        return x
    return '%s.%s.%s' % (x, sub, fqdn)


def _unescape(text):
    """Expand backslash escapes (such as ``\\072``) in *text*."""
    if isinstance(text, bytes):
        # Python 2 ``str``: use the byte-string escape codec.
        return text.decode('string-escape')
    # Python 3 ``str`` has no ``decode``.  Round-trip through bytes;
    # characters outside Latin-1 travel as backslash escapes and are
    # restored by the ``unicode_escape`` decoder.
    raw = text.encode('latin-1', 'backslashreplace')
    return raw.decode('unicode_escape')


class DB(object):
    """In-memory collection of domains and records, dumpable as SQL."""

    # First character of a tinydns-data line -> suffix of the ``add_*``
    # method that handles it.
    CMD = {
        '.': 'SOA',
        '&': 'NS',
        '=': 'APTR',
        '+': 'A',
        '@': 'MX',
        "'": 'TXT',
        '^': 'PTR',
        'C': 'CNAME',
    }

    # Numeric tail appended to every generated SOA record content.
    SOA_FIELDS = [
        172800,   # ref = refresh = 2d
        900,      # ret = update retry = 15m
        1209600,  # ex = expiry = 2w
        3600,     # min = minimum = 1h
    ]

    def __init__(self):
        """Start with no domains, no records and both id counters at 0."""
        self.domains = {}
        self.records = []
        self.domain_counter = 0
        self.record_counter = 0

    def add_tiny(self, cmd, *args):
        """Dispatch one parsed line to the ``add_*`` method for *cmd*.

        Raises:
            KeyError: if *cmd* is not a key of ``CMD``.
        """
        getattr(self, 'add_%s' % self.CMD[cmd])(*args)

    def add_domain(self, name, master=None, ttl=None):
        """Register domain *name* and return its ``Domain`` row.

        *ttl* is accepted for call compatibility but is not stored.

        Raises:
            ValueError: if *name* ends with a dot or is already known.
        """
        if name.endswith('.'):
            raise ValueError(
                'domain %r must not end with a dot' % (name,)
            )
        if name in self.domains:
            raise ValueError('domain %r is already defined' % (name,))
        domain = Domain(self.domain_counter, name, master, None,
                        'MASTER', None, None)
        self.domain_counter += 1
        self.domains[name] = domain
        return domain

    def add_record(self, name, type_, content=None, ttl=None, prio=None):
        """Append a record for *name* and return its ``Record`` row.

        The owning domain is found by stripping leading labels from
        *name* until a registered domain matches.  A falsy *ttl* is
        stored as ``None``.

        Raises:
            ValueError: if *name* ends with a dot or no registered domain
                is a suffix of it.
        """
        if name.endswith('.'):
            raise ValueError('fqdn %r must not end with a dot' % (name,))
        dname = name
        while dname not in self.domains:
            _, dot, dname = dname.partition('.')
            if not dot:
                # No label left to strip: nothing matched.
                raise ValueError(
                    'could not find domain for fqdn %r' % (name,)
                )
        domain = self.domains[dname]
        record = Record(self.record_counter, domain.id, name, type_,
                        content, ttl or None, prio, None)
        self.record_counter += 1
        self.records.append(record)
        return record

    def add_NS(self, fqdn, ip=None, x=None, ttl=None, timestamp=None,
               lo=None):
        """Handle a ``&`` line: NS record plus optional A for the server.

        *timestamp* and *lo* are accepted but ignored.
        """
        nsname = xname(x, 'ns', fqdn)
        self.add_record(fqdn, 'NS', nsname, ttl)
        if ip:
            self.add_record(nsname, 'A', ip, ttl)

    def add_SOA(self, fqdn, ip=None, x=None, ttl=None, timestamp=None,
                lo=None):
        """Handle a ``.`` line: new domain with NS, SOA and optional A.

        The SOA content is the name server, ``hostmaster.<fqdn>``, a
        serial taken from the current local date and hour, and
        ``SOA_FIELDS``.  *timestamp* and *lo* are accepted but ignored.
        """
        self.add_domain(fqdn, None, ttl)
        nsname = xname(x, 'ns', fqdn)
        soa_parts = [
            nsname,
            'hostmaster.' + fqdn,
            time.strftime('%Y%m%d%H'),
        ]
        # An explicit list keeps this working where ``map`` is lazy.
        soa_parts.extend(str(field) for field in self.SOA_FIELDS)
        self.add_record(fqdn, 'NS', nsname, ttl)
        self.add_record(fqdn, 'SOA', ' '.join(soa_parts), ttl)
        if ip:
            self.add_record(nsname, 'A', ip, ttl)

    def add_APTR(self, fqdn, ip=None, ttl=None, timestamp=None, lo=None):
        """Handle a ``=`` line: A record plus the matching reverse PTR.

        The PTR name is the reversed dotted *ip* under ``in-addr.arpa``;
        a domain covering that name must already be registered.
        *timestamp* and *lo* are accepted but ignored.
        """
        self.add_record(fqdn, 'A', ip, ttl)
        reverse_name = '.'.join(reversed(ip.split('.'))) + '.in-addr.arpa'
        self.add_record(reverse_name, 'PTR', fqdn, ttl)

    def add_A(self, fqdn, ip=None, ttl=None, timestamp=None, lo=None):
        """Handle a ``+`` line: a single A record.

        *timestamp* and *lo* are accepted but ignored.
        """
        self.add_record(fqdn, 'A', ip, ttl)

    def add_MX(self, fqdn, ip=None, x=None, dist=None, ttl=None,
               timestamp=None, lo=None):
        """Handle an ``@`` line: MX record plus optional A for the host.

        The MX content is the exchanger host name and *dist* becomes the
        record priority.  *timestamp* and *lo* are accepted but ignored.

        Raises:
            ValueError: if *dist* is non-empty and not an integer.
        """
        mxname = xname(x, 'mx', fqdn)
        # tinydns-data treats an omitted distance as 0.
        prio = int(dist) if dist else 0
        # The MX target is the exchanger's name; its address only belongs
        # in the companion A record below.
        self.add_record(fqdn, 'MX', mxname, ttl, prio)
        if ip:
            self.add_record(mxname, 'A', ip, ttl)

    def add_TXT(self, fqdn, s, ttl=None, timestamp=None, lo=None):
        """Handle a ``'`` line: TXT record with escapes in *s* expanded.

        *timestamp* and *lo* are accepted but ignored.
        """
        self.add_record(fqdn, 'TXT', _unescape(s), ttl)

    def add_PTR(self, fqdn, p, ttl=None, timestamp=None, lo=None):
        """Handle a ``^`` line: a single PTR record pointing at *p*.

        *timestamp* and *lo* are accepted but ignored.
        """
        self.add_record(fqdn, 'PTR', p, ttl)

    def add_CNAME(self, fqdn, p, ttl=None, timestamp=None, lo=None):
        """Handle a ``C`` line: a single CNAME record pointing at *p*.

        *timestamp* and *lo* are accepted but ignored.
        """
        self.add_record(fqdn, 'CNAME', p, ttl)

    def _tables(self):
        """Return ``(table, columns, rows)`` triples in output order.

        Domain rows are sorted by id; records keep insertion order.
        """
        domains = sorted(self.domains.values(), key=lambda d: d.id)
        return (
            ('domains', Domain._fields, domains),
            ('records', Record._fields, self.records),
        )

    def to_sql(self):
        """Return one multi-row INSERT statement per non-empty table.

        Statements are separated by a blank line.  A table without rows
        is skipped, because ``VALUES`` followed by nothing is not valid
        SQL; an empty database therefore yields ``''``.
        """
        statements = []
        for table, columns, rows in self._tables():
            if not rows:
                continue
            statements.append("INSERT INTO '%s' (%s) VALUES\n%s;\n" % (
                table,
                ', '.join("'%s'" % column for column in columns),
                ',\n'.join(
                    "(%s)" % ', '.join(map(ugly_format, row))
                    for row in rows
                ),
            ))
        return '\n'.join(statements)

    def to_dumb_sql(self):
        """Return one single-row INSERT statement per row.

        Statements are newline-separated, domains first, then records.
        """
        return '\n'.join(
            '\n'.join(
                "INSERT INTO '%s' (%s) VALUES (%s);" % (
                    table,
                    ', '.join("'%s'" % column for column in columns),
                    ', '.join(map(ugly_format, row)),
                )
                for row in rows
            )
            for table, columns, rows in self._tables()
        )


# ``description`` rather than ``usage``: passing this text as ``usage``
# would replace argparse's generated usage line.
parser = argparse.ArgumentParser(
    description='generate powerdns SQL from tinydns-data'
)
parser.add_argument(
    '-s', '--separate', action='store_true',
    help="separate insert statements for databases that can't handle "
         "multi-row insert statements (sqlite)")
parser.add_argument(
    'source', type=argparse.FileType('r'), metavar='DATA',
    help='tinydns-data source file')


def main(args):
    """Convert ``args.source`` to SQL and print it.

    Lines starting with ``#`` or ``-`` and blank lines are skipped.  For
    every other line the first character selects the record handler and
    the rest is split on ``:`` into its arguments.  ``args.separate``
    selects one INSERT per row instead of multi-row INSERTs.
    """
    db = DB()
    for line in args.source:
        if line.startswith(('#', '-')):
            continue
        line = line.rstrip()
        if not line:
            continue
        db.add_tiny(line[0], *line[1:].split(':'))
    # Parenthesised single-argument print is valid on Python 2 and 3.
    print(db.to_dumb_sql() if args.separate else db.to_sql())


if __name__ == '__main__':
    main(parser.parse_args())