pthbs_genpkgpy

Template engine, written in Python with Jinja, that generates package definitions for pthbs
git clone https://ccx.te2000.cz/git/pthbs_genpkgpy
Log | Files | Refs | README

genpkg.py (10302B)


      1 #!/usr/bin/env python3
      2 import argparse
      3 import hashlib
      4 import operator
      5 import os.path
      6 import subprocess
      7 from pathlib import Path
      8 
      9 import jinja2
     10 import yaml
     11 
     12 
     13 class SubmoduleInfo:
     14     def __init__(self, cache_dir='cache'):
     15         self._current_commits = None
     16         self._by_commit = Path(cache_dir) / "link" / "git-commit-sha1"
     17 
     18     @property
     19     def current(self):
     20         if self._current_commits is not None:
     21             return self._current_commits
     22         source_type = os.environ.get('pthbs_genpkgpy_submodule_source', None)
     23         if source_type == 'current':
     24             cmd = ("git", "submodule", "status")
     25         elif source_type == 'cached':
     26             cmd = ("git", "submodule", "status", '--cached')
     27         else:
     28             raise RuntimeError(
     29                 'Environment variable pthbs_genpkgpy_submodule_source'
     30                 ' must be set to either "current" or "cached"'
     31             )
     32         out = subprocess.check_output(cmd).decode('utf8')
     33         lines = out.strip('\n').split('\n')
     34         records = [[line[0]] + line[1:].split() for line in lines]
     35         self._current_commits = {
     36             r[2][8:]: r[1] for r in records if r[2].startswith("sources/")
     37         }
     38         for repo, commit in self._current_commits.items():
     39             if not (self._by_commit / commit).exists():
     40                 raise RuntimeError(
     41                     f"commit ID does not seem to be linked:"
     42                     f" {commit} from {repo!r} in {self._by_commit}"
     43                 )
     44         return self._current_commits
     45 
     46     def commit_info(self, commit_id):
     47         assert '/' not in commit_id
     48         assert '.' not in commit_id
     49         assert (self._by_commit / commit_id).exists()
     50         out = subprocess.check_output(
     51             ('git', 'show', '-s', '--pretty=format:%ai by %an'),
     52             cwd=(self._by_commit / commit_id).as_posix(),
     53         ).decode('utf8')
     54         return out
     55 
     56 
     57 class DownloadsInfo:
     58     def __init__(self, downloadlist_path):
     59         assert isinstance(downloadlist_path, Path)
     60         self._basenames = {}
     61         self._urls = {}
     62         with downloadlist_path.open('rt') as f:
     63             for line in f:
     64                 if line[0] in '#\n':
     65                     continue
     66                 sha256, size, url = line.rstrip().split(maxsplit=2)
     67                 assert len(bytes.fromhex(sha256)) == 32
     68                 assert int(size) >= 0
     69                 assert url not in self._urls
     70                 self._urls[url] = 'sha256:' + sha256
     71                 basename = os.path.basename(url)
     72                 if basename in self._basenames:
     73                     self._basenames[basename] = ValueError(
     74                         'Duplicate download name: ' + repr(basename)
     75                     )
     76                 else:
     77                     self._basenames[basename] = 'sha256:' + sha256
     78 
     79     def __getitem__(self, key):
     80         if '/' in key:
     81             value = self._urls[key]
     82         else:
     83             value = self._basenames[key]
     84         if isinstance(value, Exception):
     85             raise value
     86         return value
     87 
     88 
     89 def parse_filelist(filelist_path):
     90     assert isinstance(filelist_path, Path)
     91     with filelist_path.open('rt') as f:
     92         return {
     93             os.path.basename(fname): fhash
     94             for fhash, fname in (
     95                 line.rstrip('\n').split('  ', maxsplit=1) for line in f
     96             )
     97         }
     98 
     99 
    100 def _assertion(value):
    101     assert value
    102     return value
    103 
    104 
    105 def _value_error(*args, **kwargs):
    106     raise ValueError(*args, **kwargs)
    107 
    108 
    109 class SkipPackageException(Exception):
    110     pass
    111 
    112 
    113 def _skip_package(*args, **kwargs):
    114     raise SkipPackageException(*args, **kwargs)
    115 
    116 
    117 class Main:
    118     argument_parser = argparse.ArgumentParser()
    119     argument_parser.add_argument('-P', '--package-dir', default='packages')
    120     argument_parser.add_argument('-T', '--template-dir', default='templates')
    121     argument_parser.add_argument('-I', '--index-dir', default='.')
    122     argument_parser.add_argument('-C', '--cache-dir', default='cache')
    123     argument_parser.add_argument('-V', '--vars-file', default='vars.yaml')
    124     argument_parser.add_argument('-M', '--write-mkfile')
    125 
    126     def __init__(self, out_dir, template_dir, index_dir, cache_dir):
    127         assert isinstance(out_dir, Path)
    128         assert isinstance(template_dir, Path)
    129         assert isinstance(index_dir, Path)
    130         assert isinstance(cache_dir, Path)
    131         self.out_dir = Path(out_dir)
    132         self.template_dir = Path(template_dir)
    133         self.env = jinja2.Environment(
    134             loader=jinja2.FileSystemLoader(template_dir),
    135             undefined=jinja2.StrictUndefined,
    136             autoescape=False,
    137             extensions=["jinja2.ext.do"],
    138         )
    139         self.env.globals["pkg_sha256"] = self.pkg_sha256
    140         self.env.globals["pkg_install_name"] = self.pkg_install_name
    141         self.env.globals["pkg_install_dir"] = self.pkg_install_dir
    142         self.env.globals["submodule"] = SubmoduleInfo(cache_dir=cache_dir)
    143         self.env.globals["files"] = parse_filelist(index_dir / 'filelist.sha256')
    144         self.env.globals["downloads"] = DownloadsInfo(index_dir / 'downloadlist.sha256')
    145         self.env.globals["assertion"] = _assertion
    146         self.env.globals["value_error"] = _value_error
    147         self.env.globals["skip"] = _skip_package
    148         self.env.globals["set"] = set
    149         self.env.globals["setitem"] = operator.setitem
    150         self.env.filters["shesc"] = lambda s: "'%s'" % s.replace("'", r"'\''")
    151         self.package_hashes = {}
    152         self.rendering = []
    153         self.deps = {}
    154 
    155     @classmethod
    156     def from_argv(cls, args=None):
    157         args = cls.argument_parser.parse_args(args)
    158         obj = cls(
    159             out_dir=Path(args.package_dir),
    160             template_dir=Path(args.template_dir),
    161             index_dir=Path(args.index_dir),
    162             cache_dir=Path(args.cache_dir),
    163         )
    164         obj.load_vars_yaml(args.vars_file)
    165         if args.write_mkfile:
    166             obj.write_mkfile(args.write_mkfile)
    167         return obj
    168 
    169     def load_vars_yaml(self, fname):
    170         with open(fname) as f:
    171             self.env.globals.update(yaml.safe_load(f))
    172 
    173     def write_mkfile(self, fname):
    174         t = self.env.from_string('versions:={{versions}}\npackages:={{packages}}\n')
    175         with open(fname + '.new', 'wt') as f:
    176             f.write(t.render(packages=self.out_dir))
    177         os.rename(fname + '.new', fname)
    178 
    179     def pkg_env_sha256(self, name):
    180         current = self.rendering[-1]
    181         if current not in self.deps:
    182             self.deps[current] = set((name,))
    183         else:
    184             self.deps[current].add(name)
    185         self._pkg_sha256(name)
    186         envlist = ''.join(
    187             sorted('%s.%s\n' % (d, self.package_hashes[d]) for d in self.deps[name])
    188         )
    189         return hashlib.sha256(envlist.encode()).hexdigest()
    190 
    191     def pkg_sha256(self, name):
    192         current = self.rendering[-1]
    193         if current not in self.deps:
    194             self.deps[current] = set((name,))
    195         else:
    196             self.deps[current].add(name)
    197         return self._pkg_sha256(name)
    198 
    199     def _pkg_sha256(self, name, error_on_skip=True):
    200         if name in self.package_hashes:
    201             return self.package_hashes[name]
    202         if name in self.rendering:
    203             raise RuntimeError("circular dependency: %r", self.rendering)
    204 
    205         out_path = self.out_dir / name
    206 
    207         t = self.env.get_template("pkg/" + name)
    208         self.rendering.append(name)
    209         try:
    210             data = bytes(
    211                 t.render(
    212                     name=name,
    213                     shortname=name.split(':')[0],
    214                     import_functions=set(),  # for "generic" template
    215                     env_template={},  # for "generic" template
    216                 ).encode('utf8')
    217             )
    218         except SkipPackageException as e:
    219             if error_on_skip:
    220                 raise RuntimeError("dependency on skipped package: %s", e)
    221             else:
    222                 if out_path.exists():
    223                     out_path.unlink()
    224                 raise
    225         self.package_hashes[name] = hashlib.sha256(data).hexdigest()
    226         lastname = self.rendering.pop()
    227         assert name == lastname
    228 
    229         old_hash = None
    230         if out_path.exists():
    231             with out_path.open('rb') as f:
    232                 old_hash = hashlib.file_digest(f, "sha256").hexdigest()
    233         if old_hash is None or old_hash != self.package_hashes[name]:
    234             tmp_path = out_path.with_suffix('.new')
    235             tmp_path.write_bytes(data)
    236             tmp_path.replace(out_path)
    237 
    238         return self.package_hashes[name]
    239 
    240     def pkg_install_name(self, name):
    241         rootname = name.split(":")[0]
    242         if rootname.endswith('.environment'):
    243             return "env.%s" % (self.pkg_env_sha256(name),)
    244         else:
    245             return "%s.%s" % (name.split(":")[0], self.pkg_sha256(name))
    246 
    247     def pkg_install_dir(self, name):
    248         return os.path.join(
    249             self.env.globals["versions"],
    250             self.pkg_install_name(name),
    251         )
    252 
    253     def pkg_transitive_deps(self, name):
    254         # raise NotImplementedError(f'{self.__class__.__name}.pkg_transitive_deps()')
    255         self._pkg_sha256(name)
    256         def recur(name):
    257             for n in self.deps[name]:
    258                 yield from recur(n)
    259             yield self.pkg_install_name(name)
    260         return set(recur(name))
    261 
    262     def list_packages(self):
    263         for tplname in self.env.list_templates():
    264             if not tplname.startswith("pkg/"):
    265                 continue
    266             if "/." in tplname:
    267                 continue
    268             yield tplname[4:]
    269 
    270     def render_all(self):
    271         for pkgname in self.list_packages():
    272             try:
    273                 print("%s\t%s" % (pkgname, self._pkg_sha256(pkgname, False)))
    274             except SkipPackageException as e:
    275                 print("  [SKIPPED] %s" % (e,))
    276                 continue
    277             for dep in sorted(self.deps.get(pkgname, ())):
    278                 print(
    279                     "  > %s.%s"
    280                     % (
    281                         dep,
    282                         self.package_hashes[dep],
    283                     )
    284                 )
    285 
    286 
    287 if __name__ == '__main__':
    288     m = Main.from_argv()
    289     m.render_all()
    290 
    291 # pylama:linters=pycodestyle,pyflakes:ignore=D212,D203,D100,D101,D102,D105,D107
    292 # vim: sts=4 ts=4 sw=4 et tw=88 efm=%A%f\:%l%\:%c\ %t%n\ %m