436 lines
20 KiB
Python
Executable File
436 lines
20 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
|
|
# -*- coding: utf-8 -*-
|
|
# -*- Mode: Python
|
|
#
|
|
# Copyright (C) 2023-2024 Red Hat, Inc.
|
|
#
|
|
# Author: Frank Ch. Eigler
|
|
|
|
|
|
from __future__ import print_function
|
|
|
|
import argparse
|
|
import functools
|
|
import logging
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import git
|
|
import platform
|
|
import tempfile
|
|
import re
|
|
import ast
|
|
import os
|
|
import glob
|
|
# libarchive (python3-libarchive-c) is optional: without it, the -Z/--archive
# option is simply not offered and only plain files can be submitted.
try:
    import libarchive
    enable_libarchive = True
except Exception:
    # Not only ImportError: the binding loads the native library at import
    # time, so other errors are possible.  Unlike a bare "except:", this
    # still lets KeyboardInterrupt / SystemExit propagate.
    enable_libarchive = False
|
|
|
|
|
|
# globals
# Parsed command-line arguments; assigned in main() from parser.parse_args().
args = None
|
|
|
|
|
|
|
|
def default_distrobranch(osrelease_path: str = '/etc/os-release'):
    """Compute the default distrobranch string for the current host.

    This involves parsing /etc/os-release and uname.  (NIST CPE would
    be another alternative, but there appears to be no standard location
    on different distros, like Fedora's /etc/system-release-cpe.)

    :param str osrelease_path: os-release style file to parse
    :returns: String like "fedora/39/x86_64", or None if the file cannot
              be read or does not define both ID and VERSION_ID
    """
    osrelease = {}
    assignment = re.compile(r'(?P<name>\w+)=(?P<value>.+)')
    try:
        with open(osrelease_path, 'r', encoding='utf-8') as f:
            for line in f:
                m = assignment.match(line)
                if not m:
                    continue  # comments, blank lines
                raw = m.group('value').strip()
                try:
                    value = ast.literal_eval(raw)  # unquote "strings"
                except (ValueError, TypeError, SyntaxError,
                        MemoryError, RecursionError):
                    value = raw  # some strings are already unquoted
                if not isinstance(value, str):
                    # Keep the literal text of unquoted numbers: a round trip
                    # through int/float would turn e.g. 22.10 into 22.1.
                    value = raw
                osrelease[m.group('name')] = value
    except (OSError, UnicodeDecodeError):
        # No usable os-release file on this host: there is no default.
        return None

    try:
        return osrelease['ID'] + '/' + osrelease['VERSION_ID'] + '/' + platform.uname().machine
    except KeyError:
        return None
|
|
|
|
|
|
def get_buildid(path: str, desc: str) -> str:
    """Return the build-id of an ELF file, as reported by eu-readelf.

    Runs "eu-readelf -n" on the file and scans its textual output for
    the "Build ID:" line.

    :param str path: filename handed to eu-readelf
    :param str desc: extra context, used only in exception messages
    :return: the hexadecimal buildid text captured from that line
    :rtype: str
    :raises:
        RuntimeError: if eu-readelf exits non-zero, or if no build-id
        line is found in its output
    """
    argv = ["eu-readelf",
            "-n",  # fetches all notes; -nSECTION is possible by recent
            path]
    logging.debug(f"running {argv}")
    proc = subprocess.run(argv, capture_output=True, check=False)
    status = proc.returncode
    if status != 0:
        errtext = proc.stderr.decode('utf-8')
        logging.error(f"eu-readelf error: {path}\n{errtext}")
        raise RuntimeError(f"eu-readelf failure {status} {path} {desc}")

    pattern = re.compile(r'^\s+Build ID: ([0-9a-z]+)$')
    candidates = map(pattern.match, proc.stdout.decode('utf-8').split('\n'))
    hit = next((m for m in candidates if m), None)  # first matching line wins
    if hit is None:
        raise RuntimeError(f"Build ID not found in eu-readelf {path} {desc} output")
    return hit.group(1)
|
|
|
|
|
|
def get_soname(path: str, desc: str) -> str:
    """Return the SONAME of a shared library, as reported by eu-readelf.

    Runs "eu-readelf -d" on the file and scans its textual output for
    the SONAME dynamic entry.  A non-zero eu-readelf exit status is
    logged as an error but is not by itself fatal: whatever output was
    produced is still scanned.

    :param str path: filename handed to eu-readelf
    :param str desc: extra context, used only in the exception message
    :return: soname
    :rtype: str
    :raises:
        RuntimeError: if the soname line is not found in eu-readelf's output
    """
    argv = ["eu-readelf", "-d", path]
    logging.debug(f"running {argv}")
    proc = subprocess.run(argv, capture_output=True, check=False)
    if proc.returncode != 0:
        errtext = proc.stderr.decode('utf-8')
        logging.error(f"eu-readelf error: {path}\n{errtext}")

    pattern = re.compile(r'^\s+SONAME\s+Library soname:\s\[(.*)\]$')
    candidates = map(pattern.match, proc.stdout.decode('utf-8').split('\n'))
    hit = next((m for m in candidates if m), None)  # first matching line wins
    if hit is None:
        raise RuntimeError(f"SONAME not found in eu-readelf {path} {desc} output")
    return hit.group(1)
|
|
|
|
|
|
def get_solibs(path: str) -> list[str]:
    """List the shared libraries a binary depends on, per eu-readelf.

    Runs "eu-readelf -d" on the file and collects the name from every
    NEEDED dynamic entry in its textual output, in output order.  A
    non-zero eu-readelf exit status is logged as an error; the result
    is then whatever could still be parsed, possibly an empty list.

    :param str path: filename handed to eu-readelf
    :return: list of SONAME strings
    :rtype: List[str]
    """
    argv = ["eu-readelf", "-d", path]
    logging.debug(f"running {argv}")
    proc = subprocess.run(argv, capture_output=True, check=False)
    if proc.returncode != 0:
        errtext = proc.stderr.decode('utf-8')
        logging.error(f"eu-readelf error: {path}\n{errtext}")

    needed = re.compile(r'^\s+NEEDED\s+Shared library:\s\[(.*)\]$')
    candidates = map(needed.match, proc.stdout.decode('utf-8').split('\n'))
    return [m.group(1) for m in candidates if m]
|
|
|
|
|
|
def main() -> list[str]:
    """Parse the command line, then submit to and/or check against abidb.

    Submit mode (--submit): checks out (or creates as an orphan) the
    distrobranch in the abidb git working tree, runs abidw on every
    submitted binary whose logical name matches --filter, and commits
    the resulting ABI XML as DIR/SONAME/BUILDID.xml.  Binaries may be
    plain files or, with libarchive available, members of archives.

    Check mode (--check): for every shared library NEEDED by each given
    binary, runs abicompat against each ABI XML stored for that soname
    in the distrobranch.

    The parsed arguments are also stored in the module-level ``args``.
    Exits the process with status 1 on unusable command lines.

    :return: one description per failed submit/check operation;
             empty if everything succeeded
    :rtype: list[str]
    """
    parser = argparse.ArgumentParser(description='Check binary against abidb corpus and/or submit new data.',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--loglevel',type=str,help='logging level',default='info')
    parser.add_argument('--git',type=str,help='abidb git working tree',default='.')
    parser.add_argument('--distrobranch',type=str,help='use given abidb distrobranch',default=default_distrobranch())
    parser.add_argument('--timeout',type=int,help='limit abidw/abicompat runtime (seconds)',default=0)
    parser.add_argument('--submit',nargs='*',type=str,default=[],
                        help='submit abidw of given binaries to abidb')
    if enable_libarchive:
        parser.add_argument('--archive','-Z',metavar='EXT=CMD',
                            type=str,help='submit binaries from archives with given extension & decoder',
                            default=[],action='append') # like debuginfod(8)
    parser.add_argument('--sysroot',type=str,help='remove given sysroot prefix from submitted file names',default=None)
    parser.add_argument('--filter',type=str,help='submit only binaries matching given wildcard',default=r'/lib.*\.so') # sub-version suffixes will be flattened into SONAME
    parser.add_argument('--check',type=str,nargs='*',default=[],
                        help='check given binaries against abidb')
    parser.add_argument('--ld-library-path',type=str,
                        help='override LD_LIBRARY_PATH for soname resolution during check',
                        default=None) # XXX: how to find appropriate default?
    parser.add_argument('--abicompat',type=str,help='the path to the abicompat program to use',
                        default='abicompat')
    parser.add_argument('--abidw',type=str,help='the path to the abidw program to use',
                        default='abidw')

    global args
    args = parser.parse_args()

    logging.basicConfig(level=args.loglevel.upper(),
                        format="%(asctime)s:"+os.path.basename(__file__)+":%(levelname)s:%(message)s")
    logging.captureWarnings(True)

    # sys.exit() rather than exit(): the latter is a convenience injected by
    # the site module and is not guaranteed to exist (e.g. python -S).
    if len(args.submit) + len(args.check) == 0:
        logging.error("need --check or --submit")
        parser.print_usage()
        sys.exit(1)

    # default_distrobranch() may have returned None; fail clearly here
    # instead of with an obscure git error further down.
    if not args.distrobranch:
        logging.error("need --distrobranch")
        parser.print_usage()
        sys.exit(1)

    # Open the git repo
    args.git = os.path.realpath(args.git) # canonicalize
    abidb = git.Repo(args.git) #type: ignore[attr-defined]
    logging.debug(f'opened git repo {args.git}')

    failures = []

    # Submit
    if len(args.submit) > 0:
        # Check out the distrobranch, creating if necessary
        if args.distrobranch in abidb.heads:
            abidb.heads[args.distrobranch].checkout(force=True)
            abidb.git.reset()
            abidb.git.clean('-xdf') # clean of misc files
        else:
            abidb.git.checkout(args.distrobranch,orphan=True)
            abidb.git.reset()
            abidb.git.clean('-xdf') # clean of misc files, can easily happen in the case of an orphan branch
            abidb.git.commit(message="initial commit",allow_empty=True) # so index diff HEAD works
        numfiles=len(abidb.git.ls_files().split())
        logging.info(f'checked out distrobranch {args.distrobranch} files {numfiles}')

        # Map ".EXT" -> decoder command, from the -Z EXT=CMD options.
        ra = {}
        if enable_libarchive:
            for entry in args.archive: # parse / accumulate -Z EXT=CMD bits
                # Split on the first '=' only, so any later ='s stay in CMD.
                ext, sep, cmd = entry.partition('=')
                if not sep:
                    cmd = "cat" # default: pass through to libarchive
                ra["."+ext] = cmd

        def archive_members(archive_path, archivename):
            """Generate (archivename,logicalname,tempfilename) for each regular file in an archive.

            Each temporary file exists only until the generator is resumed.
            """
            with libarchive.file_reader(archive_path) as archive:
                for entry in archive:
                    if entry.filetype != libarchive.entry.FileType.REGULAR_FILE:
                        continue
                    # canonicalize the logical names to ordinary full paths
                    canon_entry = entry.name
                    if canon_entry.startswith("./"):
                        canon_entry = canon_entry[1:]
                    if not canon_entry.startswith("/"):
                        canon_entry = "/" + canon_entry
                    with tempfile.NamedTemporaryFile() as tmp: # extract to temp file
                        for block in entry.get_blocks():
                            tmp.write(block)
                        tmp.flush()
                        yield (archivename, canon_entry, tmp.name)

        def submit_file_generator(args):
            """Generate a list of (archivename,logicalname,physicalname) tuples."""
            for submit in args.submit:
                ext=os.path.splitext(submit)[1] # e.g., ".rpm"
                if ext not in ra: # not an archive extension?
                    pn = submit
                    if args.sysroot and submit.startswith(args.sysroot):
                        ln = submit[len(args.sysroot):]
                    else:
                        ln = submit
                    yield (None, ln, pn) # must be a plain file # XXX or ldconfig-created symlink, ugh
                    continue

                # an archive!
                assert enable_libarchive # ra is only populated when libarchive is available
                cmd = ra[ext]
                if cmd == "cat": # short-circuit this
                    yield from archive_members(submit, submit)
                    continue

                # must run conversion script on archive first
                with tempfile.NamedTemporaryFile() as tmp:
                    with open(submit,"rb") as archive:
                        logging.debug(f"running {cmd}")
                        # shell=True is intentional: CMD is an operator-supplied
                        # shell pipeline from the command line, like debuginfod -Z.
                        result = subprocess.run(cmd, stdin=archive, stdout=tmp, stderr=subprocess.PIPE,
                                                shell=True, check=False)
                    if result.returncode != 0:
                        logging.error(f"archive conversion error: {submit} | {cmd}\n" +
                                      f"{result.stderr.decode('utf-8')}")
                        continue # nothing trustworthy to unpack
                    logging.debug(f"converted archive {submit} | {cmd} to {tmp.name}")
                    yield from archive_members(tmp.name, submit)

        rf = re.compile(args.filter)
        for (an,ln,pn) in submit_file_generator(args): # run in ThreadPoolExecutor?
            logging.debug(f"considering archive {an} logical {ln} physical {pn}")
            if not rf.search(ln): # unanchored
                logging.debug(f"filtered {an} {ln}")
                continue
            try:
                ln_soname = os.path.basename(ln) # preliminary guess, for exception
                buildid = get_buildid(pn, f"{an} {ln}")
                soname = get_soname(pn, f"{an} {ln}")
                # map /path/to/libfoo.so.N.M.P.Q to /path/to/SONAME
                ln_soname = os.path.dirname(ln) + "/" + soname
                gitpath = args.git + "/" + ln_soname + "/" + buildid + ".xml" # naming convention!
                gitpath = os.path.realpath(gitpath) # canonicalize foo//bar's away, absolutize relative paths

                if os.path.exists(gitpath):
                    # and not --forced
                    logging.debug(f'binary {ln_soname} abidb-path {gitpath} already-exists')
                    continue

                cmdline = ["timeout", str(args.timeout), args.abidw,
                           # option? "--load-all-types"
                           pn]
                logging.debug(f"running {cmdline}")
                result = subprocess.run(cmdline,
                                        capture_output=True, check=False)
                if result.returncode != 0:
                    logging.error(f"abidw error: {pn}:\n" +
                                  f"{result.stdout.decode('utf-8')}\n" +
                                  f"{result.stderr.decode('utf-8')}")
                    raise RuntimeError(f"abidw failure {result.returncode} {an} {ln} {pn}")

                gitdata = result.stdout
                if result.stderr:
                    # Decode for readability (instead of a bytes repr); be lenient
                    # so that odd bytes in a mere warning cannot fail the submission.
                    abidw_stderr = result.stderr.decode('utf-8', errors='replace')
                    logging.warning(f"abidw error: {ln_soname} {an} {pn}\n{abidw_stderr}")

                os.makedirs(os.path.dirname(gitpath), exist_ok=True)
                with open(gitpath, 'wb') as f: # or we could bother decode/encode utf-8 but nah
                    f.write(gitdata)

                # or: don't use index-add/diff/commit, just working tree level ops, for better concurrency?
                abidb.index.add([gitpath])

                diff = abidb.index.diff("HEAD")
                if len(diff) > 0:
                    logan = os.path.basename(an) if an else ""
                    c = abidb.index.commit(f"abidb {ln_soname} {logan}") # customizable
                    logging.info(f'binary {pn} {an} abidb-path {ln_soname} abixml-length {len(gitdata)} commit {c.hexsha}')
                else:
                    logging.info(f'binary {pn} {an} abidb-path {ln_soname} abixml-length {len(gitdata)} unmodified')

            except Exception as e:
                # Record and move on, so one bad binary does not stop the batch.
                failures.append(f"submitting soname {ln_soname} archive {an} file {pn}")
                logging.exception(e)

    # Check
    if len(args.check) > 0:
        commit = abidb.heads[args.distrobranch].commit # may throw if distrobranch does not exist

        commit_dirs = sorted([t.path for t in commit.tree.traverse() if t.type == 'tree'])
        logging.info(f'examining distrobranch {args.distrobranch} dirs {len(commit_dirs)}')

        for a in args.check: # run in ThreadPoolExecutor!
            try:
                logging.debug(f"checking {a}")
                sonames = get_solibs(a)
                if len(sonames) == 0:
                    logging.info(f'binary {a} lists no sonames')
                for soname in sonames:
                    logging.debug(f"against soname {soname}")
                    soname_impl = [] # abixml files actually compared for this soname

                    # Find the libsoname.so.* directories in abidb.
                    soname_dirs = []
                    for cd in commit_dirs:
                        cdbd = os.path.basename(cd)
                        if (cdbd == soname # exact soname match?
                            or cdbd.startswith(soname+".")): # or oddball sub-versioned abidb dir
                            soname_dirs.append(cd)

                    # Order & filter them, in --ld-library-path mode
                    filtered_soname_dirs = []
                    if args.ld_library_path:
                        for ldir in args.ld_library_path.split(":"): # search, in order, for a matching commit_dir
                            # git uses relative names; also tolerate trailing or
                            # duplicated slashes in the search path entries
                            ldir_unslashed = ldir.strip("/")
                            for soname_dir in soname_dirs:
                                if os.path.dirname(soname_dir) == ldir_unslashed: # found one - collect all!
                                    filtered_soname_dirs.append(soname_dir)
                            if len(filtered_soname_dirs) > 0: # skip any later ld-library-path entries
                                break
                    else:
                        filtered_soname_dirs = soname_dirs

                    logging.debug(f"searching for {soname} in {filtered_soname_dirs}")
                    for candir in filtered_soname_dirs:
                        tree = commit.tree[candir] # resolve path/path/libfoo.so.n path
                        if tree.type != "tree":
                            logging.warning(f"skipping {candir}, expected a tree instead of {tree.type}")
                            continue

                        for blob in tree:
                            if blob.type != "blob":
                                continue
                            if not blob.name.endswith(".xml"):
                                continue
                            abixml = blob.name

                            # stream it out to a temp file to feed to abicompat later
                            with tempfile.NamedTemporaryFile(suffix="."+abixml) as tmp:
                                blob.stream_data(tmp)
                                tmp.flush()

                                soname_impl.append(abixml)
                                cmdline = ["timeout", str(args.timeout), args.abicompat,
                                           "--appd", "/dev/null", # operate even with debuginfod
                                           "--libd1", "/dev/null", # operate even with debuginfod
                                           # extra flags?
                                           a,
                                           tmp.name]
                                logging.debug(f"running {cmdline}")
                                result = subprocess.run(cmdline,
                                                        capture_output=True, check=False)
                                if result.returncode != 0:
                                    logging.error(f"abicompat: {a} vs. {candir} {abixml}:\n" +
                                                  f"{result.stdout.decode('utf-8')}\n" +
                                                  f"{result.stderr.decode('utf-8')}")
                                    raise RuntimeError(f"abicompat failure {result.returncode} {a} {candir} {abixml}")
                                else:
                                    logging.info(f"abicompat success: {a} vs. {candir} {abixml}")

                    if len(soname_impl) == 0:
                        logging.warning(f"no abi.xml found for {soname}")
            except Exception as e:
                failures.append(f"checking {a}")
                # Do not swallow silently: failures other than abicompat's own
                # (e.g. a missing helper program) are reported nowhere else.
                logging.error(f"check failure: {a}: {e}")
                logging.debug("check failure traceback", exc_info=True)

    return failures
|
|
|
|
|
|
# Script entry point: run main() and map its failure list onto the process
# exit status (0 = no failures, 1 = at least one).
if __name__ == '__main__':
    failures = main()
    # sys.exit() rather than exit(): the latter is a convenience injected by
    # the site module and is not guaranteed to exist (e.g. python -S).
    if len(failures) == 0:
        sys.exit(0)
    else:
        logging.error(f"{len(failures)} errors")
        for f in failures:  # per-failure details only at debug level
            logging.debug(f)

        sys.exit(1)
|
|
|