libabigail/tools/abidb

436 lines
20 KiB
Python
Executable File

#! /usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
# -*- coding: utf-8 -*-
# -*- Mode: Python -*-
#
# Copyright (C) 2023-2024 Red Hat, Inc.
#
# Author: Frank Ch. Eigler
from __future__ import print_function
import argparse
import functools
import logging
import subprocess
import sys
import time
import git
import platform
import tempfile
import re
import ast
import os
import glob
# The libarchive python binding is optional; without it the --archive/-Z
# option is not offered and only plain files can be submitted.
try:
    import libarchive
    enable_libarchive = True
except Exception:
    # Deliberately broader than ImportError (the binding may fail in other
    # ways while loading), but no longer a bare "except:" that would also
    # swallow KeyboardInterrupt / SystemExit.
    enable_libarchive = False

# globals
# Parsed command-line namespace; assigned by main().
args = None
def default_distrobranch():
    """Compute the default distrobranch string for the current host.

    This involves parsing the os-release file and uname.  (NIST CPE would
    be another alternative, but there appears to be no standard location
    on different distros, like Fedora's /etc/system-release-cpe.)

    /etc/os-release is tried first and /usr/lib/os-release second, as
    described by os-release(5).  A host without a readable os-release
    file, or whose file lacks ID or VERSION_ID, yields None instead of an
    exception, because this function runs while the argument parser is
    being constructed.

    :returns: String like "fedora/39/x86_64", or None
    """
    osrelease = {}
    assignment = re.compile(r'(?P<name>\w+)=(?P<value>.+)')
    for candidate in ('/etc/os-release', '/usr/lib/os-release'):
        parsed = {}
        try:
            with open(candidate, 'r', encoding='utf-8') as f:
                for line in f:
                    m = assignment.match(line)
                    if not m:
                        continue
                    raw = m.group('value').strip()
                    try:
                        value = ast.literal_eval(raw)  # unquote "strings"
                    except (ValueError, SyntaxError, TypeError,
                            MemoryError, RecursionError):
                        value = raw  # but some strings are already unquoted
                    else:
                        if not isinstance(value, str):
                            # Keep the literal text of unquoted numbers:
                            # evaluating 10.10 as a float would print "10.1".
                            value = raw
                    parsed[m.group('name')] = value
        except (OSError, UnicodeDecodeError):
            continue  # missing/unreadable: try the next candidate
        osrelease = parsed
        break

    if 'ID' not in osrelease or 'VERSION_ID' not in osrelease:
        return None
    return f"{osrelease['ID']}/{osrelease['VERSION_ID']}/{platform.uname().machine}"
def get_buildid(path: str, desc: str) -> str:
    """Return the build-id of an ELF file, as reported by eu-readelf.

    The notes printed by ``eu-readelf -n`` are scanned line by line with
    a regexp; the first "Build ID:" line wins.

    :param str path: filename handed to eu-readelf
    :param str desc: free-form description, only used in exception text
    :return: lowercase hexadecimal buildid
    :rtype: str
    :raises:
        RuntimeError: if eu-readelf exits non-zero, or if no build-id
        line is present in its output
    """
    # -n fetches all notes; -nSECTION is possible by recent
    cmdline = ["eu-readelf", "-n", path]
    logging.debug(f"running {cmdline}")
    proc = subprocess.run(cmdline, capture_output=True, check=False)
    if proc.returncode != 0:
        logging.error(f"eu-readelf error: {path}\n" + proc.stderr.decode('utf-8'))
        raise RuntimeError(f"eu-readelf failure {proc.returncode} {path} {desc}")

    buildid_re = re.compile(r'^\s+Build ID: ([0-9a-z]+)$')
    attempts = (buildid_re.match(text) for text in proc.stdout.decode('utf-8').split('\n'))
    hit = next((found for found in attempts if found), None)
    if hit is None:
        raise RuntimeError(f"Build ID not found in eu-readelf {path} {desc} output")
    return hit.group(1)
def get_soname(path: str, desc: str) -> str:
    """Return the SONAME of a shared library, as reported by eu-readelf.

    The dynamic section printed by ``eu-readelf -d`` is scanned line by
    line with a regexp.  A non-zero eu-readelf exit status is logged but
    its output is still scanned.

    :param str path: filename handed to eu-readelf
    :param str desc: free-form description, only used in exception text
    :return: soname
    :rtype: str
    :raises:
        RuntimeError: if no soname line is present in eu-readelf's output
    """
    cmdline = ["eu-readelf", "-d", path]
    logging.debug(f"running {cmdline}")
    proc = subprocess.run(cmdline, capture_output=True, check=False)
    if proc.returncode != 0:
        logging.error(f"eu-readelf error: {path}\n" + proc.stderr.decode('utf-8'))

    soname_re = re.compile(r'^\s+SONAME\s+Library soname:\s\[(.*)\]$')
    for text in proc.stdout.decode('utf-8').split('\n'):
        if (found := soname_re.match(text)) is not None:
            return found.group(1)
    raise RuntimeError(f"SONAME not found in eu-readelf {path} {desc} output")
def get_solibs(path: str) -> list[str]:
    """Return the shared libraries a binary depends on (its NEEDED entries).

    The dynamic section printed by ``eu-readelf -d`` is scanned line by
    line with a regexp.  A non-zero eu-readelf exit status is logged but
    not raised; whatever NEEDED lines were printed are still collected,
    so the result may be an empty list.

    :param str path: filename handed to eu-readelf
    :return: list of SONAME strings, in eu-readelf output order
    :rtype: List[str]
    """
    cmdline = ["eu-readelf", "-d", path]
    logging.debug(f"running {cmdline}")
    proc = subprocess.run(cmdline, capture_output=True, check=False)
    if proc.returncode != 0:
        logging.error(f"eu-readelf error: {path}\n" + proc.stderr.decode('utf-8'))

    needed_re = re.compile(r'^\s+NEEDED\s+Shared library:\s\[(.*)\]$')
    attempts = (needed_re.match(text) for text in proc.stdout.decode('utf-8').split('\n'))
    return [found.group(1) for found in attempts if found]
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the abidb command-line parser.

    The --archive/-Z option is only offered when the optional libarchive
    binding could be imported (enable_libarchive).
    """
    parser = argparse.ArgumentParser(description='Check binary against abidb corpus and/or submit new data.',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--loglevel',type=str,help='logging level',default='info')
    parser.add_argument('--git',type=str,help='abidb git working tree',default='.')
    parser.add_argument('--distrobranch',type=str,help='use given abidb distrobranch',default=default_distrobranch())
    parser.add_argument('--timeout',type=int,help='limit abidw/abicompat runtime (seconds)',default=0)
    parser.add_argument('--submit',nargs='*',type=str,default=[],
                        help='submit abidw of given binaries to abidb')
    if enable_libarchive:
        parser.add_argument('--archive','-Z',metavar='EXT=CMD',
                            type=str,help='submit binaries from archives with given extension & decoder',
                            default=[],action='append') # like debuginfod(8)
    parser.add_argument('--sysroot',type=str,help='remove given sysroot prefix from submitted file names',default=None)
    parser.add_argument('--filter',type=str,help='submit only binaries matching given wildcard',default=r'/lib.*\.so') # sub-version suffixes will be flattened into SONAME
    # --sysroot=PATH subtract this from SUBMIT paths
    parser.add_argument('--check',type=str,nargs='*',default=[],
                        help='check given binaries against abidb')
    parser.add_argument('--ld-library-path',type=str,
                        help='override LD_LIBRARY_PATH for soname resolution during check',
                        default=None) # XXX: how to find appropriate default?
    parser.add_argument('--abicompat',type=str,help='the path to the abicompat program to use',
                        default='abicompat')
    parser.add_argument('--abidw',type=str,help='the path to the abidw program to use',
                        default='abidw')
    return parser


def _parse_archive_decoders() -> dict:
    """Turn the accumulated -Z EXT=CMD options into a {".EXT": CMD} map.

    A bare EXT (no '=') maps to "cat", meaning the archive is handed to
    libarchive unconverted.  Empty when libarchive is unavailable.
    """
    ra = {}
    if enable_libarchive:
        for entry in args.archive:
            # Split at the first '=' only, so any later '=' stays in the command.
            ext, sep, cmd = entry.partition('=')
            ra["." + ext] = cmd if sep else "cat"
    return ra


def _archive_members(submit, archive_path):
    """Yield (archivename, logicalname, physicalname) for each regular file
    in the libarchive-readable file archive_path.

    Each member is extracted to a temporary file that exists only while the
    generator is suspended at the corresponding yield.
    """
    with libarchive.file_reader(archive_path) as archive:
        for entry in archive:
            if entry.filetype != libarchive.entry.FileType.REGULAR_FILE:
                continue
            # canonicalize the logical names to ordinary full paths
            canon_entry = entry.name
            if canon_entry.startswith("./"):
                canon_entry = canon_entry[1:]
            if not canon_entry.startswith("/"):
                canon_entry = "/" + canon_entry
            with tempfile.NamedTemporaryFile() as tmp: # extract to temp file
                for block in entry.get_blocks():
                    tmp.write(block)
                tmp.flush()
                yield (submit, canon_entry, tmp.name)


def _submit_file_generator(ra: dict, failures: list[str]):
    """Generate (archivename,logicalname,physicalname) tuples for --submit.

    Plain files yield archivename None and have the --sysroot prefix
    removed from their logical name.  Files whose extension is in ra are
    treated as archives and expanded member by member.  A failed archive
    conversion is logged and recorded in failures.
    """
    for submit in args.submit:
        ext = os.path.splitext(submit)[1] # e.g., ".rpm"
        if ext not in ra: # not an archive extension?
            if args.sysroot and submit.startswith(args.sysroot):
                ln = submit[len(args.sysroot):]
            else:
                ln = submit
            yield (None, ln, submit) # must be a plain file # XXX or ldconfig-created symlink, ugh
            continue

        # an archive!
        assert enable_libarchive # ra is only populated when libarchive is present
        cmd = ra[ext]
        if cmd == "cat": # short-circuit this
            yield from _archive_members(submit, submit)
            continue

        # must run conversion script on archive first
        with tempfile.NamedTemporaryFile() as converted:
            with open(submit, "rb") as raw: # binary: the bytes go straight to the decoder's stdin
                logging.debug(f"running {cmd}")
                # NOTE: cmd comes from the operator's -Z option and is run
                # through the shell on purpose (it may be a pipeline).
                result = subprocess.run(cmd, stdin=raw, stdout=converted, stderr=subprocess.PIPE,
                                        shell=True, check=False)
            if result.returncode != 0:
                logging.error(f"archive conversion error: {submit} | {cmd}\n" +
                              f"{result.stderr.decode('utf-8')}")
                failures.append(f"converting archive {submit}")
                continue
            logging.debug(f"converted archive {submit} | {cmd} to {converted.name}")
            yield from _archive_members(submit, converted.name)


def _submit_binaries(abidb, failures: list[str]) -> None:
    """Run abidw on every --submit binary and commit new abixml to the distrobranch."""
    # Check out the distrobranch, creating if necessary
    if args.distrobranch in abidb.heads:
        abidb.heads[args.distrobranch].checkout(force=True)
        abidb.git.reset()
        abidb.git.clean('-xdf') # clean of misc files
    else:
        abidb.git.checkout(args.distrobranch,orphan=True)
        abidb.git.reset()
        abidb.git.clean('-xdf') # clean of misc files, can easily happen in the case of an orphan branch
        abidb.git.commit(message="initial commit",allow_empty=True) # so index diff HEAD works
    numfiles = len(abidb.git.ls_files().split())
    logging.info(f'checked out distrobranch {args.distrobranch} files {numfiles}')

    ra = _parse_archive_decoders()
    rf = re.compile(args.filter)
    for (an,ln,pn) in _submit_file_generator(ra, failures): # run in ThreadPoolExecutor?
        logging.debug(f"considering archive {an} logical {ln} physical {pn}")
        if not rf.search(ln): # unanchored
            logging.debug(f"filtered {an} {ln}")
            continue
        try:
            ln_soname = os.path.basename(ln) # preliminary guess, for exception
            buildid = get_buildid(pn, f"{an} {ln}")
            soname = get_soname(pn, f"{an} {ln}")
            # map /path/to/libfoo.so.N.M.P.Q to /path/to/SONAME
            ln_soname = os.path.dirname(ln) + "/" + soname
            gitpath = args.git + "/" + ln_soname + "/" + buildid + ".xml" # naming convention!
            gitpath = os.path.realpath(gitpath) # canonicalize foo//bar's away, absolutize relative paths
            # Archive member names and SONAMEs come from the submitted data;
            # refuse anything (e.g. "../" components) that would land
            # outside the git working tree.
            if os.path.commonpath([args.git, gitpath]) != args.git:
                raise RuntimeError(f"abidb-path {gitpath} is outside git tree {args.git}")
            if os.path.exists(gitpath):
                # and not --forced
                logging.debug(f'binary {ln_soname} abidb-path {gitpath} already-exists')
                continue
            cmdline = ["timeout", str(args.timeout), args.abidw,
                       # option? "--load-all-types"
                       pn]
            logging.debug(f"running {cmdline}")
            result = subprocess.run(cmdline,
                                    capture_output=True, check=False)
            if result.returncode != 0:
                logging.error(f"abidw error: {pn}:\n" +
                              f"{result.stdout.decode('utf-8')}\n" +
                              f"{result.stderr.decode('utf-8')}")
                raise RuntimeError(f"abidw failure {result.returncode} {an} {ln} {pn}")
            gitdata = result.stdout
            if len(result.stderr) > 0:
                # decode so the log shows text rather than a bytes repr
                stderr_text = result.stderr.decode('utf-8', errors='replace')
                logging.warning(f"abidw error: {ln_soname} {an} {pn}\n{stderr_text}")
            os.makedirs(os.path.dirname(gitpath), exist_ok=True)
            with open(gitpath, 'wb') as f: # or we could bother decode/encode utf-8 but nah
                f.write(gitdata)
            # or: don't use index-add/diff/commit, just working tree level ops, for better concurrency?
            abidb.index.add([gitpath])
            diff = abidb.index.diff("HEAD")
            if len(diff) > 0:
                logan = os.path.basename(an) if an else ""
                c = abidb.index.commit(f"abidb {ln_soname} {logan}") # customizable
                logging.info(f'binary {pn} {an} abidb-path {ln_soname} abixml-length {len(gitdata)} commit {c.hexsha}')
            else:
                logging.info(f'binary {pn} {an} abidb-path {ln_soname} abixml-length {len(gitdata)} unmodified')
        except Exception as e:
            failures.append(f"submitting soname {ln_soname} archive {an} file {pn}")
            logging.exception(e)


def _select_soname_dirs(soname: str, commit_dirs: list[str]) -> list[str]:
    """Pick the abidb directories that hold abixml for soname.

    A directory matches when its basename is the soname itself or the
    soname followed by a ".suffix".  With --ld-library-path, only the
    matches under the first listed directory that has any are returned.
    """
    soname_dirs = []
    for cd in commit_dirs:
        cdbd = os.path.basename(cd)
        if cdbd == soname or cdbd.startswith(soname + "."): # exact, or oddball sub-versioned abidb dir
            soname_dirs.append(cd)
    if not args.ld_library_path:
        return soname_dirs
    for ldir in args.ld_library_path.split(":"): # search, in order, for a matching commit_dir
        # git uses relative names; also drop trailing slashes so that
        # "/usr/lib64/" matches the tree path "usr/lib64"
        ldir_unslashed = ldir.strip("/")
        matching = [d for d in soname_dirs if os.path.dirname(d) == ldir_unslashed]
        if matching: # found some - collect all, skip any later ld-library-path entries
            return matching
    return []


def _check_binaries(abidb, failures: list[str]) -> None:
    """Run abicompat for every --check binary against the stored abixml of
    each library it needs; record one failure per binary that fails."""
    commit = abidb.heads[args.distrobranch].commit # may throw if distrobranch does not exist
    commit_dirs = sorted([t.path for t in commit.tree.traverse() if t.type == 'tree'])
    logging.info(f'examining distrobranch {args.distrobranch} dirs {len(commit_dirs)}')
    for a in args.check: # run in ThreadPoolExecutor!
        try:
            logging.debug(f"checking {a}")
            sonames = get_solibs(a)
            if len(sonames) == 0:
                logging.info(f'binary {a} lists no sonames')
            for soname in sonames:
                logging.debug(f"against soname {soname}")
                soname_impl = []
                filtered_soname_dirs = _select_soname_dirs(soname, commit_dirs)
                logging.debug(f"searching for {soname} in {filtered_soname_dirs}")
                for candir in filtered_soname_dirs:
                    tree = commit.tree[candir] # resolve path/path/libfoo.so.n path
                    if tree.type != "tree":
                        logging.warning(f"skipping {candir}, expected a tree instead of {tree.type}")
                        continue
                    for blob in tree:
                        if blob.type != "blob":
                            continue
                        if not blob.name.endswith(".xml"):
                            continue
                        abixml = blob.name
                        # stream it out to a temp file to feed to abicompat later
                        with tempfile.NamedTemporaryFile(suffix="."+abixml) as tmp:
                            blob.stream_data(tmp)
                            tmp.flush()
                            soname_impl.append(abixml)
                            cmdline = ["timeout", str(args.timeout), args.abicompat,
                                       "--appd", "/dev/null", # operate even with debuginfod
                                       "--libd1", "/dev/null", # operate even with debuginfod
                                       # extra flags?
                                       a,
                                       tmp.name]
                            logging.debug(f"running {cmdline}")
                            result = subprocess.run(cmdline,
                                                    capture_output=True, check=False)
                            if result.returncode != 0:
                                logging.error(f"abicompat: {a} vs. {candir} {abixml}:\n" +
                                              f"{result.stdout.decode('utf-8')}\n" +
                                              f"{result.stderr.decode('utf-8')}")
                                raise RuntimeError(f"abicompat failure {result.returncode} {a} {candir} {abixml}")
                            logging.info(f"abicompat success: {a} vs. {candir} {abixml}")
                if len(soname_impl) == 0:
                    logging.warning(f"no abi.xml found for {soname}")
        except Exception as e:
            failures.append(f"checking {a}")
            # One line, no traceback: without this, failures other than an
            # abicompat mismatch (e.g. a missing helper program) would
            # leave no trace in the log.
            logging.error(f"checking {a} failed: {e}")


def main() -> list[str]:
    """Parse the command line, then submit and/or check binaries.

    Sets the module-global ``args`` to the parsed namespace, configures
    logging, opens the abidb git repository, runs the --submit phase and
    then the --check phase.

    :return: one human-readable description per failed operation; an
        empty list means everything succeeded
    :rtype: List[str]
    """
    global args
    parser = _build_arg_parser()
    args = parser.parse_args()

    logging.basicConfig(level=args.loglevel.upper(),
                        format="%(asctime)s:"+os.path.basename(__file__)+":%(levelname)s:%(message)s")
    logging.captureWarnings(True)

    if len(args.submit) + len(args.check) == 0:
        logging.error("need --check or --submit")
        parser.print_usage()
        sys.exit(1)

    if not args.distrobranch:
        # default_distrobranch() could not work one out and none was given
        logging.error("cannot determine distrobranch, need --distrobranch")
        parser.print_usage()
        sys.exit(1)

    # Open the git repo
    args.git = os.path.realpath(args.git) # canonicalize
    abidb = git.Repo(args.git) #type: ignore[attr-defined]
    logging.debug(f'opened git repo {args.git}')

    failures = []

    # Submit
    if len(args.submit) > 0:
        _submit_binaries(abidb, failures)

    # Check
    if len(args.check) > 0:
        _check_binaries(abidb, failures)

    return failures
if __name__ == '__main__':
    # Run the tool and turn main()'s list of failure descriptions into the
    # process exit status: 0 when it is empty, 1 otherwise.  sys.exit is
    # used rather than the interactive-helper exit(), which is only present
    # when the site module has been loaded.
    failures = main()
    if len(failures) == 0:
        sys.exit(0)
    logging.error(f"{len(failures)} errors")
    for f in failures:
        logging.debug(f)
    sys.exit(1)