Package pynotedb
Expand source code
# Copyright (c) 2020 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import os
from urllib.parse import urlparse
from sys import argv
from pathlib import Path
from typing import Callable, List, Iterator, NewType, Optional, Tuple, Union
from pynotedb.utils import execute, ls, lsR, pread, sha1sum, try_action
# Create new types to avoid mis-usage
Email = NewType('Email', str)
Username = NewType('Username', str)
PubKey = NewType('PubKey', str)
Url = NewType('Url', str)
Uuid = NewType('Uuid', str)
Clone = NewType('Clone', Path)
Branch = NewType('Branch', str)
Ref = NewType('Ref', str)
ExternalScheme = NewType('ExternalScheme', str)
fetch_head = Ref('FETCH_HEAD')
meta_config = Ref('refs/meta/config')
meta_external_ids = Ref('refs/meta/external-ids')
meta_group_names = Ref('refs/meta/group-names')
scheme_gerrit = ExternalScheme('gerrit')
scheme_keycloak = ExternalScheme('keycloak-oauth')
scheme_username = ExternalScheme('username')
scheme_mail = ExternalScheme('mailto')
def is_mine(directory: Path) -> bool:
return os.stat(directory).st_uid == os.getuid()
def parse_url(url: str) -> Url:
if urlparse(url).scheme not in ('http', 'https', 'git'):
raise RuntimeError("%s: expected url" % url)
return Url(url)
def parse_path(url: str) -> Path:
path = Path(url).resolve()
if not path.exists() or not is_mine(path):
raise RuntimeError("%s: expected writable directory" % url)
return path
def parse_url_or_path(url: str) -> Union[Url, Path]:
try:
return parse_url(url)
except RuntimeError:
return parse_path(url)
def strip_git_suffix(url: str) -> str:
return url[:-4] if url.endswith(".git") else url
def mk_clone(url: Union[Url, Path]) -> Clone:
"""Clone a project to ~/.cache/pynotedb/"""
path = Path("~/.cache/pynotedb/" + strip_git_suffix(str(url)).split('/')[-1]).expanduser()
if not path.exists():
path.mkdir(parents=True)
execute(["git", "clone", str(url), str(path)])
else:
execute(["git", "remote", "set-url", "origin", str(url)], cwd=path)
return Clone(path)
def git(clone: Clone, args: List[str]) -> None:
"""A convenient wrapper around git commands"""
execute(["git"] + args, cwd=clone)
def git_read(clone: Clone, args: List[str]) -> str:
"""A convenient reader around git commands"""
return pread(["git"] + args, cwd=clone).decode('utf-8')
def fetch(clone: Clone, ref: Ref) -> None:
"""fetch a ref"""
git(clone, ["fetch", "origin", ref])
def checkout(clone: Clone, branch: Branch, ref: Ref) -> None:
"""checkout a ref, raising an exception if it doesn't exists"""
git(clone, ["checkout", "-B", branch, ref])
def fetch_checkout(clone: Clone, branch: Branch, ref: Ref) -> None:
"""fetch a ref and check it out"""
fetch(clone, ref)
checkout(clone, branch, fetch_head)
def commit_and_push(clone: Clone, message: str, ref: Ref) -> None:
try_action(lambda: git(clone, ["commit", "-a", "-m", message]))
# TODO: check if ref is already pushed
try_action(lambda: git(clone, ["push", "origin", "HEAD:" + ref]))
def new_orphan(clone: Clone, branch: Branch) -> None:
"""create a new orphan commit"""
try_action(lambda: git(clone, ["branch", "-D", branch]))
git(clone, ["checkout", "--orphan", branch])
git(clone, ["rm", "--cached", "-r", "--", "."])
git(clone, ["clean", "-d", "-f", "-x"])
def mk_ref_id(refname: str) -> str:
"""Create gerrit CD/ABCD name, refname must not be empty.
>>> mk_ref_id("1")
'01/1'
>>> mk_ref_id("41242")
'42/41242'
"""
refid = refname[-2:] if len(refname) > 1 else ("0" + refname[-1])
return refid + "/" + refname
def mk_ref(name: str) -> Callable[[str], Ref]:
def func(refname: str) -> Ref:
return Ref("refs/" + name + "/" + mk_ref_id(refname))
return func
def mk_user_ref(user: str) -> Ref:
"""Create a user ref
>>> mk_user_ref("1")
'refs/users/01/1'
"""
return mk_ref("users")(user)
def mk_group_ref(group: Uuid) -> Ref:
"""Create a group ref"""
return mk_ref("groups")(group)
def invert_ref_id(ref: Ref) -> Ref:
"""Invert a gerrit ref
>>> invert_ref_id("refs/groups/CD/ABCD")
'refs/groups/AB/ABCD'
"""
r, g, _, i = ref.split('/')
return Ref('/'.join([r, g, i[:2], i]))
def read_items(lines: List[str]) -> List[Tuple[str, str]]:
"""Read key values of git config file
>>> read_items(["[group]", " name = un name=avec ", "uuid=4242"])
[('name', 'un name=avec'), ('uuid', '4242')]
"""
return [(elems[0], elems[1])
for elems in map(lambda s: list(map(str.strip, s.split("=", 1))), lines)
if len(elems) == 2]
def read_group_name_uid(group_file: Path) -> Optional[Tuple[str, Uuid]]:
"""Return the name and uuid of a group config file"""
name, uid = None, None
for k, v in read_items(group_file.read_text().split('\n')):
if k == "name":
name = v
elif k == "uuid":
uid = v
if name and uid:
return (name, Uuid(uid))
return None
def read_user_name(user_file: Path) -> Optional[str]:
for k, v in read_items(user_file.read_text().split('\n')):
if k == 'fullName':
return v
return None
def get_group_id(all_users: Clone, group_name: str) -> Optional[Tuple[Path, Uuid]]:
"""Return the file path and uid of a group name"""
fetch_checkout(all_users, Branch("group_names"), meta_group_names)
for fn in filter(lambda fp: fp.is_file(), ls(all_users)):
group_info = read_group_name_uid(fn)
if group_info:
name, uid = group_info
if name == group_name:
return (fn, uid)
return None
def get_user_id(user_ref: Ref) -> str:
return user_ref.split('/')[-1]
def get_users_ref(all_users: Clone) -> Iterator[Ref]:
"""Return the list of user git refs"""
return map(Ref,
filter(lambda s: s.startswith("refs/users/") and s != "refs/users/self",
map(lambda s: s and s.split()[-1],
git_read(all_users, ["ls-remote"]).split('\n'))))
def get_user_ref(all_users: Clone, user: str) -> Optional[Ref]:
"""Return the user git ref"""
for user_ref in get_users_ref(all_users):
fetch_checkout(all_users, Branch("user_" + get_user_id(user_ref)), user_ref)
if read_user_name(all_users / "account.config") == user:
return user_ref
return None
def lookup_sha_nest(root: Clone, sha: str, acc: int) -> Optional[int]:
"""Count nesting level for a given sha"""
if sha == "":
# The sha was not found
return None
if (root / sha).exists():
return acc
return lookup_sha_nest(root / sha[:2], sha[2:], acc + 1)
def nest_sha(root: Path, sha: str, nest: int) -> Path:
"""Split a sha for a given nesting level"""
if nest == 0:
return root / sha
return nest_sha(root / sha[:2], sha[2:], nest - 1)
def write_obj(file_path : Path, file_content : List[str]) -> None:
"""Write a file and ensure parent directory exists"""
if not file_path.parent.exists():
file_path.parent.mkdir(parents=True)
file_path.write_text("\n".join(file_content))
def write_sha_obj(root: Path, sha: str, nest: int, file_content: List[str]) -> None:
"""Write a sha object by respecting a nest level"""
write_obj(nest_sha(root, sha, nest), file_content)
def show_external_id(scheme: ExternalScheme, username: str, account_id: str) -> List[str]:
"""Return an external id file content"""
return [
"[externalId \"" + scheme + ":" + username + "\"]",
"\taccountId = " + account_id,
""
]
def write_gerrit_username_id_files(all_users: Clone, username: str, account_id: str, scheme: ExternalScheme) -> None:
"""Create an external id scheme for a given user"""
# Lookup if the http external id is already created and get the current nest level
http_sha = sha1sum(scheme_username + ":" + username)
nest = lookup_sha_nest(all_users, http_sha, 0)
if nest is None:
nest = 1
write_sha_obj(all_users, http_sha, nest, show_external_id(scheme_username, username, account_id))
# Then write the desired external id scheme
write_sha_obj(all_users, sha1sum(scheme + ":" + username), nest, show_external_id(scheme, username, account_id))
def add_account_external_id(all_users: Clone, username: str, account_id: str, scheme: ExternalScheme) -> None:
"""Create an account external id"""
fetch_checkout(all_users, Branch("ids"), meta_external_ids)
write_gerrit_username_id_files(all_users, username, account_id, scheme)
git(all_users, ["add", "."])
commit_and_push(all_users, "Add externalId for user " + username, meta_external_ids)
def delete_group(all_users_path: Path, group: str) -> None:
all_users = mk_clone(all_users_path)
group_path_id = get_group_id(all_users, group)
if not group_path_id:
raise RuntimeError("%s: group doesn't exists!" % group)
group_path, group_id = group_path_id
git(all_users, ["push", "--delete", "origin", invert_ref_id(mk_group_ref(group_id))])
git(all_users, ["rm", str(group_path)])
commit_and_push(all_users, "Remove group " + group, meta_group_names)
def list_external_ids(all_users: Clone) -> List[Path]:
if not try_action(lambda: fetch_checkout(all_users, Branch("external_ids"), meta_external_ids)):
return []
return list(filter(lambda fp: fp.is_file(), lsR(all_users)))
def ext_id_match(headers: List[str], ext_id_file: Path) -> bool:
ext_id_file_content = ext_id_file.read_text().split('\n')
return any(filter(lambda h: h in ext_id_file_content, headers))
def get_user_external_id(all_users: Clone, user: Username, email: Email) -> List[Path]:
headers = list(map(lambda sn: external_id_header(sn[0], sn[1]),
[(scheme_mail, email), (scheme_gerrit, user), (scheme_username, user)]))
return [ext_id_file for ext_id_file in list_external_ids(all_users)
if ext_id_match(headers, ext_id_file)]
def delete_user(all_users_path: Path, user: Username, email: Email) -> None:
all_users = mk_clone(all_users_path)
# Remove external id
for user_external_id_file in get_user_external_id(all_users, user, email):
git(all_users, ["rm", str(user_external_id_file)])
commit_and_push(all_users, "Removing external id for user %s" % user, meta_external_ids)
# Delete user ref
user_ref = get_user_ref(all_users, user)
if not user_ref:
raise RuntimeError("%s: user doesn't exists!" % user)
# TODO: delete group membership too?
git(all_users, ["push", "--delete", "origin", user_ref])
def create_admin_user(email: Email, pubkey: PubKey, all_users_url: Union[Url, Path], scheme: ExternalScheme) -> None:
"""Ensure the admin user is created"""
all_users = mk_clone(all_users_url)
admin_ref = mk_user_ref("1")
if not try_action(lambda: fetch(all_users, admin_ref)):
# Add user to admin group
admin_group_id = get_group_id(all_users, "Administrators")
if not admin_group_id:
raise RuntimeError("%s: Administrators group doesn't exists!" % all_users)
admin_group_ref = mk_group_ref(admin_group_id[1])
if not try_action(lambda: fetch_checkout(all_users, Branch("group_admin"), admin_group_ref)):
# For some reason, group ref can be AB/ABCD
admin_group_ref = invert_ref_id(admin_group_ref)
fetch_checkout(all_users, Branch("group_admin"), admin_group_ref)
members_file = all_users / "members"
if members_file.exists():
members = members_file.read_text().split('\n')
else:
members = []
if "1" not in members:
members_file.write_text("\n".join(members + ["1", ""]))
git(all_users, ["add", "members"])
commit_and_push(all_users, "Add admin user to Administrators group", admin_group_ref)
# Create externalId
if not try_action(lambda: fetch_checkout(all_users, Branch("external_ids"), meta_external_ids)):
new_orphan(all_users, Branch("external_ids"))
write_gerrit_username_id_files(all_users, "admin", "1", scheme)
write_sha_obj(all_users, sha1sum("mailto:" + email), 1, [
"[externalId \"mailto:" + email + "\"]",
"\taccountId = 1",
"\temail = " + email,
""
])
git(all_users, ["add", "."])
commit_and_push(all_users, "Add admin external id", meta_external_ids)
# Create user
new_orphan(all_users, Branch("user_admin"))
(all_users / "account.config").write_text("\n".join([
"[account]",
"\tfullName = Administrator",
"\tpreferredEmail = " + email,
""
]))
(all_users / "authorized_keys").write_text(pubkey + "\n")
git(all_users, ["add", "account.config", "authorized_keys"])
commit_and_push(all_users, "Initialize admin user", admin_ref)
def create_gerrit_external_id(filename: Path) -> None:
"""Create a gerrit external id scheme for username scheme"""
filecontent = filename.read_text()
is_username = [fileline
for fileline in filecontent.split('\n')
if fileline.startswith("[externalId \"username:")]
if is_username:
extid = is_username[0].split("\"")[1]
_scheme, name = extid.split(":", 1)
newfilename = filename.parent / sha1sum("gerrit:" + name)
newfilename.write_text(filecontent.replace(
"[externalId \"username:",
"[externalId \"gerrit:"))
def external_id_header(scheme: ExternalScheme, name: str) -> str:
return "[externalId \"%s:%s\"]" % (scheme, name)
def migrate(all_projects_url: Union[Url, Path], all_users_url: Union[Url, Path]) -> None:
"""Migrate software factory notedb data from gerrit 2.x"""
# Ensure admin can push notedb ref
all_projects = mk_clone(all_projects_url)
git(all_projects, ["config", "-f", "project.config", "access.refs/*.push",
"group Administrators", ".*group Administrators"])
commit_and_push(all_projects, "Enable admin to push refs", meta_config)
# Update externalId to use `gerrit` scheme instead of `username`
all_users = mk_clone(all_users_url)
if list(map(create_gerrit_external_id, list_external_ids(all_users))):
git(all_users, ["add", "."])
commit_and_push(all_users, "Update external id to gerrit scheme", meta_external_ids)
def gerrit_to_kc_external_id(filename: Path) -> None:
filecontent = filename.read_text()
is_username = [fileline
for fileline in filecontent.split('\n')
if fileline.startswith("[externalId \"gerrit:")]
if is_username:
extid = is_username[0].split("\"")[1]
_scheme, name = extid.split(":", 1)
newfilename = filename.parent / sha1sum(scheme_keycloak + ":" + name)
newfilename.write_text(filecontent.replace(
"[externalId \"gerrit:",
"[externalId \"keycloak-oauth:"))
def migrate_to_keycloak(all_users_url: Union[Url, Path]) -> None:
"""Migrate users from cauth to keycloak."""
all_users = mk_clone(all_users_url)
if list(map(gerrit_to_kc_external_id, list_external_ids(all_users))):
# stage new files, deleted files
git(all_users, ["add", "-u", "."])
git(all_users, ["add", "."])
commit_and_push(all_users, "Migrate auth scheme to keycloak", meta_external_ids)
def main() -> None:
"""The CLI entrypoint"""
def usage(argv: List[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(description="notedb-tools")
parser.add_argument("action", choices=["create-admin-user", "migrate",
"delete-group", "delete-user",
"cauth-to-keycloak"])
parser.add_argument("--email", help="The user email address")
parser.add_argument("--pubkey", help="The user SSH public key content")
parser.add_argument("--all-users", help="URL of the All-Users project")
parser.add_argument("--all-projects", help="URL of the All-Projects project")
parser.add_argument("--name", help="The name of the things to delete")
parser.add_argument("--scheme", help='the auth scheme to use when creating the admin user',
choices=['gerrit', 'keycloak-oauth'],
default='gerrit')
return parser.parse_args(argv)
main_do(usage(argv[1:]))
def main_do(args: argparse.Namespace) -> None:
if args.action == "create-admin-user":
if not args.email or not args.pubkey or not args.all_users:
raise RuntimeError("create-admin-user: needs email, pubkey and all-users argument")
if not args.scheme or args.scheme == 'gerrit':
scheme = scheme_gerrit
else:
scheme = scheme_keycloak
create_admin_user(Email(args.email), PubKey(args.pubkey), parse_url_or_path(args.all_users), scheme)
elif args.action == "migrate":
if not args.all_projects or not args.all_users:
raise RuntimeError("migrate: needs all-projects and all-users argument")
migrate(parse_url_or_path(args.all_projects), parse_url_or_path(args.all_users))
elif args.action == "cauth-to-keycloak":
if not args.all_users:
raise RuntimeError("cauth-to-keycloak: needs all-users argument")
migrate_to_keycloak(parse_url_or_path(args.all_users))
elif args.action == "delete-group":
if not args.all_users or not args.name:
raise RuntimeError("delete-group: needs all-users and name arguments")
delete_group(parse_path(args.all_users), args.name)
elif args.action == "delete-user":
if not args.all_users or not args.name or not args.email:
raise RuntimeError("delete-user: needs all-users, name and email arguments")
delete_user(parse_path(args.all_users), Username(args.name), Email(args.email))
Sub-modules
pynotedb.utils
Functions
def add_account_external_id(all_users: Clone, username: str, account_id: str, scheme: ExternalScheme) ‑> NoneType
-
Create an account external id
Expand source code
def add_account_external_id(all_users: Clone, username: str, account_id: str, scheme: ExternalScheme) -> None: """Create an account external id""" fetch_checkout(all_users, Branch("ids"), meta_external_ids) write_gerrit_username_id_files(all_users, username, account_id, scheme) git(all_users, ["add", "."]) commit_and_push(all_users, "Add externalId for user " + username, meta_external_ids)
def checkout(clone: Clone, branch: Branch, ref: Ref) ‑> NoneType
-
checkout a ref, raising an exception if it doesn't exists
Expand source code
def checkout(clone: Clone, branch: Branch, ref: Ref) -> None: """checkout a ref, raising an exception if it doesn't exists""" git(clone, ["checkout", "-B", branch, ref])
def commit_and_push(clone: Clone, message: str, ref: Ref) ‑> NoneType
-
Expand source code
def commit_and_push(clone: Clone, message: str, ref: Ref) -> None: try_action(lambda: git(clone, ["commit", "-a", "-m", message])) # TODO: check if ref is already pushed try_action(lambda: git(clone, ["push", "origin", "HEAD:" + ref]))
def create_admin_user(email: Email, pubkey: PubKey, all_users_url: Union[Url, pathlib.Path], scheme: ExternalScheme) ‑> NoneType
-
Ensure the admin user is created
Expand source code
def create_admin_user(email: Email, pubkey: PubKey, all_users_url: Union[Url, Path], scheme: ExternalScheme) -> None: """Ensure the admin user is created""" all_users = mk_clone(all_users_url) admin_ref = mk_user_ref("1") if not try_action(lambda: fetch(all_users, admin_ref)): # Add user to admin group admin_group_id = get_group_id(all_users, "Administrators") if not admin_group_id: raise RuntimeError("%s: Administrators group doesn't exists!" % all_users) admin_group_ref = mk_group_ref(admin_group_id[1]) if not try_action(lambda: fetch_checkout(all_users, Branch("group_admin"), admin_group_ref)): # For some reason, group ref can be AB/ABCD admin_group_ref = invert_ref_id(admin_group_ref) fetch_checkout(all_users, Branch("group_admin"), admin_group_ref) members_file = all_users / "members" if members_file.exists(): members = members_file.read_text().split('\n') else: members = [] if "1" not in members: members_file.write_text("\n".join(members + ["1", ""])) git(all_users, ["add", "members"]) commit_and_push(all_users, "Add admin user to Administrators group", admin_group_ref) # Create externalId if not try_action(lambda: fetch_checkout(all_users, Branch("external_ids"), meta_external_ids)): new_orphan(all_users, Branch("external_ids")) write_gerrit_username_id_files(all_users, "admin", "1", scheme) write_sha_obj(all_users, sha1sum("mailto:" + email), 1, [ "[externalId \"mailto:" + email + "\"]", "\taccountId = 1", "\temail = " + email, "" ]) git(all_users, ["add", "."]) commit_and_push(all_users, "Add admin external id", meta_external_ids) # Create user new_orphan(all_users, Branch("user_admin")) (all_users / "account.config").write_text("\n".join([ "[account]", "\tfullName = Administrator", "\tpreferredEmail = " + email, "" ])) (all_users / "authorized_keys").write_text(pubkey + "\n") git(all_users, ["add", "account.config", "authorized_keys"]) commit_and_push(all_users, "Initialize admin user", admin_ref)
def create_gerrit_external_id(filename: pathlib.Path) ‑> NoneType
-
Create a gerrit external id scheme for username scheme
Expand source code
def create_gerrit_external_id(filename: Path) -> None: """Create a gerrit external id scheme for username scheme""" filecontent = filename.read_text() is_username = [fileline for fileline in filecontent.split('\n') if fileline.startswith("[externalId \"username:")] if is_username: extid = is_username[0].split("\"")[1] _scheme, name = extid.split(":", 1) newfilename = filename.parent / sha1sum("gerrit:" + name) newfilename.write_text(filecontent.replace( "[externalId \"username:", "[externalId \"gerrit:"))
def delete_group(all_users_path: pathlib.Path, group: str) ‑> NoneType
-
Expand source code
def delete_group(all_users_path: Path, group: str) -> None: all_users = mk_clone(all_users_path) group_path_id = get_group_id(all_users, group) if not group_path_id: raise RuntimeError("%s: group doesn't exists!" % group) group_path, group_id = group_path_id git(all_users, ["push", "--delete", "origin", invert_ref_id(mk_group_ref(group_id))]) git(all_users, ["rm", str(group_path)]) commit_and_push(all_users, "Remove group " + group, meta_group_names)
def delete_user(all_users_path: pathlib.Path, user: Username, email: Email) ‑> NoneType
-
Expand source code
def delete_user(all_users_path: Path, user: Username, email: Email) -> None: all_users = mk_clone(all_users_path) # Remove external id for user_external_id_file in get_user_external_id(all_users, user, email): git(all_users, ["rm", str(user_external_id_file)]) commit_and_push(all_users, "Removing external id for user %s" % user, meta_external_ids) # Delete user ref user_ref = get_user_ref(all_users, user) if not user_ref: raise RuntimeError("%s: user doesn't exists!" % user) # TODO: delete group membership too? git(all_users, ["push", "--delete", "origin", user_ref])
def ext_id_match(headers: List[str], ext_id_file: pathlib.Path) ‑> bool
-
Expand source code
def ext_id_match(headers: List[str], ext_id_file: Path) -> bool: ext_id_file_content = ext_id_file.read_text().split('\n') return any(filter(lambda h: h in ext_id_file_content, headers))
def external_id_header(scheme: ExternalScheme, name: str) ‑> str
-
Expand source code
def external_id_header(scheme: ExternalScheme, name: str) -> str: return "[externalId \"%s:%s\"]" % (scheme, name)
def fetch(clone: Clone, ref: Ref) ‑> NoneType
-
fetch a ref
Expand source code
def fetch(clone: Clone, ref: Ref) -> None: """fetch a ref""" git(clone, ["fetch", "origin", ref])
def fetch_checkout(clone: Clone, branch: Branch, ref: Ref) ‑> NoneType
-
fetch a ref and check it out
Expand source code
def fetch_checkout(clone: Clone, branch: Branch, ref: Ref) -> None: """fetch a ref and check it out""" fetch(clone, ref) checkout(clone, branch, fetch_head)
def gerrit_to_kc_external_id(filename: pathlib.Path) ‑> NoneType
-
Expand source code
def gerrit_to_kc_external_id(filename: Path) -> None: filecontent = filename.read_text() is_username = [fileline for fileline in filecontent.split('\n') if fileline.startswith("[externalId \"gerrit:")] if is_username: extid = is_username[0].split("\"")[1] _scheme, name = extid.split(":", 1) newfilename = filename.parent / sha1sum(scheme_keycloak + ":" + name) newfilename.write_text(filecontent.replace( "[externalId \"gerrit:", "[externalId \"keycloak-oauth:"))
def get_group_id(all_users: Clone, group_name: str) ‑> Optional[Tuple[pathlib.Path, Uuid]]
-
Return the file path and uid of a group name
Expand source code
def get_group_id(all_users: Clone, group_name: str) -> Optional[Tuple[Path, Uuid]]: """Return the file path and uid of a group name""" fetch_checkout(all_users, Branch("group_names"), meta_group_names) for fn in filter(lambda fp: fp.is_file(), ls(all_users)): group_info = read_group_name_uid(fn) if group_info: name, uid = group_info if name == group_name: return (fn, uid) return None
def get_user_external_id(all_users: Clone, user: Username, email: Email) ‑> List[pathlib.Path]
-
Expand source code
def get_user_external_id(all_users: Clone, user: Username, email: Email) -> List[Path]: headers = list(map(lambda sn: external_id_header(sn[0], sn[1]), [(scheme_mail, email), (scheme_gerrit, user), (scheme_username, user)])) return [ext_id_file for ext_id_file in list_external_ids(all_users) if ext_id_match(headers, ext_id_file)]
def get_user_id(user_ref: Ref) ‑> str
-
Expand source code
def get_user_id(user_ref: Ref) -> str: return user_ref.split('/')[-1]
def get_user_ref(all_users: Clone, user: str) ‑> Optional[Ref]
-
Return the user git ref
Expand source code
def get_user_ref(all_users: Clone, user: str) -> Optional[Ref]: """Return the user git ref""" for user_ref in get_users_ref(all_users): fetch_checkout(all_users, Branch("user_" + get_user_id(user_ref)), user_ref) if read_user_name(all_users / "account.config") == user: return user_ref return None
def get_users_ref(all_users: Clone) ‑> Iterator[Ref]
-
Return the list of user git refs
Expand source code
def get_users_ref(all_users: Clone) -> Iterator[Ref]: """Return the list of user git refs""" return map(Ref, filter(lambda s: s.startswith("refs/users/") and s != "refs/users/self", map(lambda s: s and s.split()[-1], git_read(all_users, ["ls-remote"]).split('\n'))))
def git(clone: Clone, args: List[str]) ‑> NoneType
-
A convenient wrapper around git commands
Expand source code
def git(clone: Clone, args: List[str]) -> None: """A convenient wrapper around git commands""" execute(["git"] + args, cwd=clone)
def git_read(clone: Clone, args: List[str]) ‑> str
-
A convenient reader around git commands
Expand source code
def git_read(clone: Clone, args: List[str]) -> str: """A convenient reader around git commands""" return pread(["git"] + args, cwd=clone).decode('utf-8')
def invert_ref_id(ref: Ref) ‑> Ref
-
Invert a gerrit ref
>>> invert_ref_id("refs/groups/CD/ABCD") 'refs/groups/AB/ABCD'
Expand source code
def invert_ref_id(ref: Ref) -> Ref: """Invert a gerrit ref >>> invert_ref_id("refs/groups/CD/ABCD") 'refs/groups/AB/ABCD' """ r, g, _, i = ref.split('/') return Ref('/'.join([r, g, i[:2], i]))
def is_mine(directory: pathlib.Path) ‑> bool
-
Expand source code
def is_mine(directory: Path) -> bool: return os.stat(directory).st_uid == os.getuid()
def list_external_ids(all_users: Clone) ‑> List[pathlib.Path]
-
Expand source code
def list_external_ids(all_users: Clone) -> List[Path]: if not try_action(lambda: fetch_checkout(all_users, Branch("external_ids"), meta_external_ids)): return [] return list(filter(lambda fp: fp.is_file(), lsR(all_users)))
def lookup_sha_nest(root: Clone, sha: str, acc: int) ‑> Optional[int]
-
Count nesting level for a given sha
Expand source code
def lookup_sha_nest(root: Clone, sha: str, acc: int) -> Optional[int]: """Count nesting level for a given sha""" if sha == "": # The sha was not found return None if (root / sha).exists(): return acc return lookup_sha_nest(root / sha[:2], sha[2:], acc + 1)
def main() ‑> NoneType
-
The CLI entrypoint
Expand source code
def main() -> None: """The CLI entrypoint""" def usage(argv: List[str]) -> argparse.Namespace: parser = argparse.ArgumentParser(description="notedb-tools") parser.add_argument("action", choices=["create-admin-user", "migrate", "delete-group", "delete-user", "cauth-to-keycloak"]) parser.add_argument("--email", help="The user email address") parser.add_argument("--pubkey", help="The user SSH public key content") parser.add_argument("--all-users", help="URL of the All-Users project") parser.add_argument("--all-projects", help="URL of the All-Projects project") parser.add_argument("--name", help="The name of the things to delete") parser.add_argument("--scheme", help='the auth scheme to use when creating the admin user', choices=['gerrit', 'keycloak-oauth'], default='gerrit') return parser.parse_args(argv) main_do(usage(argv[1:]))
def main_do(args: argparse.Namespace) ‑> NoneType
-
Expand source code
def main_do(args: argparse.Namespace) -> None: if args.action == "create-admin-user": if not args.email or not args.pubkey or not args.all_users: raise RuntimeError("create-admin-user: needs email, pubkey and all-users argument") if not args.scheme or args.scheme == 'gerrit': scheme = scheme_gerrit else: scheme = scheme_keycloak create_admin_user(Email(args.email), PubKey(args.pubkey), parse_url_or_path(args.all_users), scheme) elif args.action == "migrate": if not args.all_projects or not args.all_users: raise RuntimeError("migrate: needs all-projects and all-users argument") migrate(parse_url_or_path(args.all_projects), parse_url_or_path(args.all_users)) elif args.action == "cauth-to-keycloak": if not args.all_users: raise RuntimeError("cauth-to-keycloak: needs all-users argument") migrate_to_keycloak(parse_url_or_path(args.all_users)) elif args.action == "delete-group": if not args.all_users or not args.name: raise RuntimeError("delete-group: needs all-users and name arguments") delete_group(parse_path(args.all_users), args.name) elif args.action == "delete-user": if not args.all_users or not args.name or not args.email: raise RuntimeError("delete-user: needs all-users, name and email arguments") delete_user(parse_path(args.all_users), Username(args.name), Email(args.email))
def migrate(all_projects_url: Union[Url, pathlib.Path], all_users_url: Union[Url, pathlib.Path]) ‑> NoneType
-
Migrate software factory notedb data from gerrit 2.x
Expand source code
def migrate(all_projects_url: Union[Url, Path], all_users_url: Union[Url, Path]) -> None: """Migrate software factory notedb data from gerrit 2.x""" # Ensure admin can push notedb ref all_projects = mk_clone(all_projects_url) git(all_projects, ["config", "-f", "project.config", "access.refs/*.push", "group Administrators", ".*group Administrators"]) commit_and_push(all_projects, "Enable admin to push refs", meta_config) # Update externalId to use `gerrit` scheme instead of `username` all_users = mk_clone(all_users_url) if list(map(create_gerrit_external_id, list_external_ids(all_users))): git(all_users, ["add", "."]) commit_and_push(all_users, "Update external id to gerrit scheme", meta_external_ids)
def migrate_to_keycloak(all_users_url: Union[Url, pathlib.Path]) ‑> NoneType
-
Migrate users from cauth to keycloak.
Expand source code
def migrate_to_keycloak(all_users_url: Union[Url, Path]) -> None: """Migrate users from cauth to keycloak.""" all_users = mk_clone(all_users_url) if list(map(gerrit_to_kc_external_id, list_external_ids(all_users))): # stage new files, deleted files git(all_users, ["add", "-u", "."]) git(all_users, ["add", "."]) commit_and_push(all_users, "Migrate auth scheme to keycloak", meta_external_ids)
def mk_clone(url: Union[Url, pathlib.Path]) ‑> Clone
-
Clone a project to ~/.cache/pynotedb/
Expand source code
def mk_clone(url: Union[Url, Path]) -> Clone: """Clone a project to ~/.cache/pynotedb/""" path = Path("~/.cache/pynotedb/" + strip_git_suffix(str(url)).split('/')[-1]).expanduser() if not path.exists(): path.mkdir(parents=True) execute(["git", "clone", str(url), str(path)]) else: execute(["git", "remote", "set-url", "origin", str(url)], cwd=path) return Clone(path)
def mk_group_ref(group: Uuid) ‑> Ref
-
Create a group ref
Expand source code
def mk_group_ref(group: Uuid) -> Ref: """Create a group ref""" return mk_ref("groups")(group)
def mk_ref(name: str) ‑> Callable[[str], Ref]
-
Expand source code
def mk_ref(name: str) -> Callable[[str], Ref]: def func(refname: str) -> Ref: return Ref("refs/" + name + "/" + mk_ref_id(refname)) return func
def mk_ref_id(refname: str) ‑> str
-
Create gerrit CD/ABCD name, refname must not be empty.
>>> mk_ref_id("1") '01/1' >>> mk_ref_id("41242") '42/41242'
Expand source code
def mk_ref_id(refname: str) -> str: """Create gerrit CD/ABCD name, refname must not be empty. >>> mk_ref_id("1") '01/1' >>> mk_ref_id("41242") '42/41242' """ refid = refname[-2:] if len(refname) > 1 else ("0" + refname[-1]) return refid + "/" + refname
def mk_user_ref(user: str) ‑> Ref
-
Create a user ref
>>> mk_user_ref("1") 'refs/users/01/1'
Expand source code
def mk_user_ref(user: str) -> Ref: """Create a user ref >>> mk_user_ref("1") 'refs/users/01/1' """ return mk_ref("users")(user)
def nest_sha(root: pathlib.Path, sha: str, nest: int) ‑> pathlib.Path
-
Split a sha for a given nesting level
Expand source code
def nest_sha(root: Path, sha: str, nest: int) -> Path: """Split a sha for a given nesting level""" if nest == 0: return root / sha return nest_sha(root / sha[:2], sha[2:], nest - 1)
def new_orphan(clone: Clone, branch: Branch) ‑> NoneType
-
create a new orphan commit
Expand source code
def new_orphan(clone: Clone, branch: Branch) -> None: """create a new orphan commit""" try_action(lambda: git(clone, ["branch", "-D", branch])) git(clone, ["checkout", "--orphan", branch]) git(clone, ["rm", "--cached", "-r", "--", "."]) git(clone, ["clean", "-d", "-f", "-x"])
def parse_path(url: str) ‑> pathlib.Path
-
Expand source code
def parse_path(url: str) -> Path: path = Path(url).resolve() if not path.exists() or not is_mine(path): raise RuntimeError("%s: expected writable directory" % url) return path
def parse_url(url: str) ‑> Url
-
Expand source code
def parse_url(url: str) -> Url: if urlparse(url).scheme not in ('http', 'https', 'git'): raise RuntimeError("%s: expected url" % url) return Url(url)
def parse_url_or_path(url: str) ‑> Union[Url, pathlib.Path]
-
Expand source code
def parse_url_or_path(url: str) -> Union[Url, Path]: try: return parse_url(url) except RuntimeError: return parse_path(url)
def read_group_name_uid(group_file: pathlib.Path) ‑> Optional[Tuple[str, Uuid]]
-
Return the name and uuid of a group config file
Expand source code
def read_group_name_uid(group_file: Path) -> Optional[Tuple[str, Uuid]]: """Return the name and uuid of a group config file""" name, uid = None, None for k, v in read_items(group_file.read_text().split('\n')): if k == "name": name = v elif k == "uuid": uid = v if name and uid: return (name, Uuid(uid)) return None
def read_items(lines: List[str]) ‑> List[Tuple[str, str]]
-
Read key values of git config file
>>> read_items(["[group]", " name = un name=avec ", "uuid=4242"]) [('name', 'un name=avec'), ('uuid', '4242')]
Expand source code
def read_items(lines: List[str]) -> List[Tuple[str, str]]: """Read key values of git config file >>> read_items(["[group]", " name = un name=avec ", "uuid=4242"]) [('name', 'un name=avec'), ('uuid', '4242')] """ return [(elems[0], elems[1]) for elems in map(lambda s: list(map(str.strip, s.split("=", 1))), lines) if len(elems) == 2]
def read_user_name(user_file: pathlib.Path) ‑> Optional[str]
-
Expand source code
def read_user_name(user_file: Path) -> Optional[str]: for k, v in read_items(user_file.read_text().split('\n')): if k == 'fullName': return v return None
def show_external_id(scheme: ExternalScheme, username: str, account_id: str) ‑> List[str]
-
Return an external id file content
Expand source code
def show_external_id(scheme: ExternalScheme, username: str, account_id: str) -> List[str]: """Return an external id file content""" return [ "[externalId \"" + scheme + ":" + username + "\"]", "\taccountId = " + account_id, "" ]
def strip_git_suffix(url: str) ‑> str
-
Expand source code
def strip_git_suffix(url: str) -> str: return url[:-4] if url.endswith(".git") else url
def write_gerrit_username_id_files(all_users: Clone, username: str, account_id: str, scheme: ExternalScheme) ‑> NoneType
-
Create an external id scheme for a given user
Expand source code
def write_gerrit_username_id_files(all_users: Clone, username: str, account_id: str, scheme: ExternalScheme) -> None: """Create an external id scheme for a given user""" # Lookup if the http external id is already created and get the current nest level http_sha = sha1sum(scheme_username + ":" + username) nest = lookup_sha_nest(all_users, http_sha, 0) if nest is None: nest = 1 write_sha_obj(all_users, http_sha, nest, show_external_id(scheme_username, username, account_id)) # Then write the desired external id scheme write_sha_obj(all_users, sha1sum(scheme + ":" + username), nest, show_external_id(scheme, username, account_id))
def write_obj(file_path: pathlib.Path, file_content: List[str]) ‑> NoneType
-
Write a file and ensure parent directory exists
Expand source code
def write_obj(file_path : Path, file_content : List[str]) -> None: """Write a file and ensure parent directory exists""" if not file_path.parent.exists(): file_path.parent.mkdir(parents=True) file_path.write_text("\n".join(file_content))
def write_sha_obj(root: pathlib.Path, sha: str, nest: int, file_content: List[str]) ‑> NoneType
-
Write a sha object by respecting a nest level
Expand source code
def write_sha_obj(root: Path, sha: str, nest: int, file_content: List[str]) -> None: """Write a sha object by respecting a nest level""" write_obj(nest_sha(root, sha, nest), file_content)