Name: pymod-sf-operator-config-map Namespace: sf Labels: Annotations: Data ==== main.py: ---- # Copyright (C) 2022 Red Hat # SPDX-License-Identifier: Apache-2.0 import argparse from pathlib import Path import os import sf_operator.secret import pynotedb def ensure_git_config(): os.environ.setdefault("HOME", str(Path("~/").expanduser())) if any( map( lambda p: p.expanduser().exists(), [Path("~/.gitconfig"), Path("~/.config/git/config")], ) ): return pynotedb.execute( ["git", "config", "--global", "user.email", "admin@" + os.environ["FQDN"]] ) pynotedb.execute( ["git", "config", "--global", "user.name", "SoftwareFactory Admin"] ) def sshkey_scan(port: str, hostname: str) -> bytes: return pynotedb.pread( ["ssh-keyscan", "-T", "10", "-p", port, hostname]) def get_logserver_fingerprint() -> str: return " ".join(sshkey_scan("2222", "logserver").decode().split()[1:]) def mk_incluster_k8s_config(): sa = Path("/run/secrets/kubernetes.io/serviceaccount") def add(n): return (n, (sa / n).read_text()) api = os.environ.get("KUBERNETES_PUBLIC_API_URL") if not api: api = ( "https://" + os.environ["KUBERNETES_SERVICE_HOST"] + ":" + os.environ["KUBERNETES_SERVICE_PORT"] ) return [ add("ca.crt"), add("namespace"), ("token", os.environ["SERVICE_ACCOUNT_TOKEN"]), ("server", api)] def create_zuul_secrets(): clone = pynotedb.mk_clone("git://git-server-rw:9419/system-config") # FQDN to access the logserver logserver_host = os.environ.get("ZUUL_LOGSERVER_HOST", "logserver") logserver_fqdn = "[%s]:2222" % logserver_host # K8s secret k8s_secret = clone / "zuul.d" / "k8s-secret.yaml" secret = sf_operator.secret.mk_secret( "k8s_config", mk_incluster_k8s_config()) k8s_secret.write_text(secret) # log server secret logserver_secret = clone / "zuul.d" / "sf-logserver-secret.yaml" secret = sf_operator.secret.mk_secret( "site_sflogs", items=[ ("ssh_private_key", os.environ["ZUUL_LOGSERVER_PRIVATE_KEY"]) ], unencrypted_items=[ ("fqdn", "\"" + logserver_fqdn + "\""), ("path", "rsync"), ("ssh_known_hosts", "\"%s %s\"" % (logserver_fqdn, get_logserver_fingerprint())), ("ssh_username", "zuul") ] ) logserver_secret.write_text(secret) pynotedb.git( clone, ["add", "zuul.d/k8s-secret.yaml", "zuul.d/sf-logserver-secret.yaml"]) pynotedb.commit_and_push( clone, "Update internal Zuul secrets", "refs/heads/master") def main(): parser = argparse.ArgumentParser(description="notedb-tools") parser.add_argument("action", choices=["config-create-zuul-secrets"]) args = parser.parse_args() ensure_git_config() if args.action == "config-create-zuul-secrets": create_zuul_secrets() if __name__ == "__main__": main() secret.py: ---- # Copyright (C) 2022 Red Hat # SPDX-License-Identifier: Apache-2.0 from urllib.request import Request from urllib.request import urlopen import re import subprocess import tempfile import math import os import textwrap import base64 def mk_secret(name, items, unencrypted_items=[]): # Borrowed from zuul/tools/encrypt_secret.py pubkey = urlopen( Request("http://zuul-web:9000/api/tenant/internal/key/system-config.pub") ) pubkey_file = tempfile.NamedTemporaryFile(delete=False) pubkey_file.write(pubkey.read()) pubkey_file.close() p = subprocess.Popen( ["openssl", "rsa", "-text", "-pubin", "-in", pubkey_file.name], stdout=subprocess.PIPE, ) (stdout, stderr) = p.communicate() if p.returncode != 0: raise Exception("Return code %s from openssl" % p.returncode) output = stdout.decode("utf-8") key_length_re = r"^(|RSA )Public-Key: \((?P\d+) bit\)$" m = re.match(key_length_re, output, re.MULTILINE) nbits = int(m.group("key_length")) nbytes = int(nbits / 8) max_bytes = nbytes - 42 # PKCS1-OAEP overhead secret_output = textwrap.dedent( """ - secret: name: %s data: """ % (name) ) if unencrypted_items: for (key, value) in unencrypted_items: secret_output += " " + key + ": " + value + "\n" for (key, value) in items: secret_output += " " + key + ": !encrypted/pkcs1-oaep\n" chunks = int(math.ceil(float(len(value)) / max_bytes)) ciphertext_chunks = [] for count in range(chunks): chunk = value[int(count * max_bytes) : int((count + 1) * max_bytes)] p = subprocess.Popen( [ "openssl", "rsautl", "-encrypt", "-oaep", "-pubin", "-inkey", pubkey_file.name, ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) (stdout, stderr) = p.communicate(chunk.encode("utf-8")) if p.returncode != 0: raise Exception("Return code %s from openssl" % p.returncode) ciphertext_chunks.append(base64.b64encode(stdout).decode("utf-8")) twrap = textwrap.TextWrapper( width=79, initial_indent=" " * 8, subsequent_indent=" " * 10 ) for chunk in ciphertext_chunks: chunk = twrap.fill("- " + chunk) secret_output += chunk + "\n" os.unlink(pubkey_file.name) return secret_output BinaryData ==== Events: