-
Notifications
You must be signed in to change notification settings - Fork 0
/
submodule.py
235 lines (197 loc) · 9.54 KB
/
submodule.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#!/usr/bin/env python3
# SPDX-License-Identifier: MIT
import argparse
import configparser
import json
import pathlib
import shutil
import subprocess
import typing
import urllib.parse
GitModuleInfo = typing.Dict[str, typing.Union[str, pathlib.Path]] # map for .gitmodules section
GitModules = typing.Dict[str, GitModuleInfo] # canonical path -> GitModuleInfo
ZuulProject = typing.Dict[str, str] # zuul.project map
ZuulProjects = typing.Dict[str, ZuulProject] # zuul.projects map
def url2canonical_name(site: str) -> str:
"""Remove all schemas and ports from site and return canonical path
>>> url2canonical_name('ssh://site.example.com:29418/foo/bar')
'site.example.com/foo/bar'
>>> url2canonical_name('https://site.example.com/foo/')
'site.example.com/foo'
>>> url2canonical_name('https://site.example.com/')
'site.example.com'
"""
url = urllib.parse.urlparse(site)
# url.path contains leading slash
assert url.hostname is not None
return url.hostname + url.path.rstrip('/')
def resolve_submodule_url(submodule_url: str, repo_url: str) -> str:
"""Resolve submodule_url (can be absolute or relative) from repo_url. Return canonical project name.
>>> resolve_submodule_url('../../foo', 'ssh://site.example.com:29418/top/bar', )
'site.example.com/foo'
>>> resolve_submodule_url('../../foo1/foo2', 'ssh://site.example.com:29418/top/bar')
'site.example.com/foo1/foo2'
>>> resolve_submodule_url('../../foo1/foo2', 'ssh://site.example.com/')
Traceback (most recent call last):
...
ValueError: Relative submodule_url ../../foo1/foo2 is out of repo_url ssh://site.example.com/
>>> resolve_submodule_url('https://site.example.com/foo/bar', 'nonsignificant')
'site.example.com/foo/bar'
"""
if '://' in submodule_url:
return url2canonical_name(submodule_url)
assert(submodule_url.startswith('../'))
path = pathlib.PurePath(url2canonical_name(repo_url))
for dir_ in submodule_url.split('/'):
if dir_ == '..':
if path == path.parent:
raise ValueError(f"Relative submodule_url {submodule_url}"
f" is out of repo_url {repo_url}")
path = path.parent
else:
path = path / dir_
return str(path)
def get_remote_url(repopath: typing.Union[str, pathlib.Path],
remote: str = 'origin') -> str:
result = subprocess.run(
['git', '-C', repopath, 'remote', 'get-url', remote],
check=True, universal_newlines=True,
stdout=subprocess.PIPE)
return result.stdout
def parse_gitmodules(gitmodules: pathlib.Path) -> GitModules:
"""Return dict of 'canonical_name': 'module' mapping.
module is a dict describing submodule.
"""
cfg = configparser.ConfigParser()
read_ok = cfg.read(gitmodules)
if not read_ok: # failed to read file
return {}
modules = {}
remote = get_remote_url(gitmodules.parent)
for section in cfg.sections():
module: GitModuleInfo = \
{'path': cfg[section]['path'],
'abspath': (gitmodules.parent / cfg[section]['path']).resolve(),
'submodule': section.split()[-1].replace('"', '')}
branch = cfg.get(section, 'branch', fallback=None)
if branch is not None:
module['branch'] = branch
modules[resolve_submodule_url(cfg[section]['url'], remote)] = module
return modules
def split_modules(modules: GitModules,
projects: ZuulProjects) -> typing.Tuple[GitModules, GitModules]:
"""Return canonical names to replace and canonical names to be checked out
from remote"""
project_cnames = set(projects.keys())
module_cnames = set(modules.keys())
to_replace_cnames = project_cnames & module_cnames
to_clone_cnames = module_cnames - project_cnames
return ({cname: modules[cname] for cname in to_replace_cnames},
{cname: modules[cname] for cname in to_clone_cnames})
def print_split_modules(modules_to_replace: GitModules,
modules_to_clone: GitModules,
super_project: ZuulProject):
if len(modules_to_replace):
print(f"Following submodules of {super_project['canonical_name']}"
" will be replaced with zuul projects:")
for module_cname, module in modules_to_replace.items():
branch = f"branch {module['branch']}" if 'branch' in module else ''
print(f"* {module['abspath']} ⇒ {module_cname} {branch}")
if len(modules_to_clone):
print(f"Following submodules of {super_project['canonical_name']}"
f" will be cloned:")
for module_cname, module in modules_to_clone.items():
branch = f"branch {module['branch']}" if 'branch' in module else ''
print(f"* {module['abspath']} from {module_cname} {branch}")
def update_submodule(repo_path: typing.Union[str, pathlib.Path],
recursive: bool,
submodule_path: typing.Union[str, pathlib.Path] = None):
cmd = ['git', '-C', repo_path, 'submodule', 'update', '--init']
if recursive:
cmd.append('--recursive')
if submodule_path is not None:
cmd.extend(['--', submodule_path])
subprocess.run(cmd, check=True, universal_newlines=True)
def update_projects(projects: ZuulProjects, recursive: bool, dry_run=False, ):
for project in projects.values():
if not (pathlib.Path(project['src_dir']) / '.gitmodules').exists():
print(f"{project['canonical_name']}: no .gitmodules found")
continue
print(f"{project['canonical_name']}: cloning submodules", flush=True)
if dry_run:
continue
update_submodule(project['src_dir'], recursive)
def update_super_project(super_project: ZuulProject,
projects: ZuulProjects,
recursive: bool,
dry_run=False,
verbose=False):
"""Replace super project submodules with corresponding Zuul projects.
Others super project submodules are cloned from origin."""
modules = parse_gitmodules(pathlib.Path(super_project['src_dir']) / '.gitmodules')
if not modules:
print(f'No .gitmodules found in super project {super_project["src_dir"]}')
return
modules_to_replace, modules_to_clone = split_modules(modules, projects)
if verbose:
print_split_modules(modules_to_replace, modules_to_clone, super_project)
for module_cname, module in modules_to_replace.items():
src_dir = projects[module_cname]['src_dir']
src_absdir = pathlib.Path(src_dir).resolve()
module_abspath = pathlib.Path(module['abspath'])
branch = f"branch {module['branch']}" if 'branch' in module else ''
print(f'Replace submodule {module_abspath} with {src_dir} {branch}')
if not dry_run:
module_abspath.rmdir()
shutil.move(src_absdir, module_abspath)
if 'branch' not in module:
continue
print(f'Checkout submodule: {module_abspath}')
subprocess.run(['git', '-C', super_project['src_dir'],
'submodule', 'init', module['path']],
check=True)
subprocess.run(['git', '-C', super_project['src_dir'],
'submodule', 'absorbgitdirs', module['path']],
check=True)
# zuul checkouts the same branches for dependent projects as for main
# project's branch, so we have to checkout required branch
subprocess.run(['git', '-C', module_abspath, 'checkout', module['branch']],
check=True)
# If there are submodules within this submodule, they have to be updated
# without updating the submodule itself.
if recursive:
update_submodule(module_abspath, recursive)
for module_cname, module in modules_to_clone.items():
branch = f"branch {module['branch']}" if 'branch' in module else ''
# cloning can be time consuming, so lets flush printing
print(f"Cloning submodule {module['abspath']} from {module_cname} {branch}",
flush=True)
if dry_run:
continue
update_submodule(super_project['src_dir'], recursive, module['path'])
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('zuul_json', type=argparse.FileType(),
help='JSON file with zuul variable')
parser.add_argument('super_project',
help='super project canonical name (e.g. example.com/foo/bar')
parser.add_argument('--dry-run', action='store_true',
help="don't touch repositories")
parser.add_argument('--recursive', action='store_true',
help="update submodules recursively")
parser.add_argument('--verbose', action='store_true',
help="print more info")
return parser.parse_args()
def main():
args = parse_args()
zuul = json.load(args.zuul_json)
super_project = zuul['projects'][args.super_project]
# create projects that doesn't contain super project
projects = zuul['projects'].copy() # it's ok to do shallow copy here
projects.pop(super_project['canonical_name'])
# BUG: use case when projects' submodules must be updated from siblings projects
# is not supported
update_projects(projects, args.recursive, args.dry_run)
update_super_project(super_project, projects, args.recursive, args.dry_run, args.verbose)
if __name__ == '__main__':
main()