~aleteoryx/muditaos

ref: a7f3ae4daf5bf1be94743a9acb888016977c4cc8 muditaos/tools/download_asset.py -rw-r--r-- 14.2 KiB
a7f3ae4d — Bartosz Cichocki [EGD-7697] Fix BT names in all files 4 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
#!/usr/bin/python3
# Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

'''
class to download assets listed in json file
additional classes can be added to i.e. just copy files from some other location
which is super simple as it uses json file for description

{
    "comment": "This is kiss structure - asset path & sha",         <-- random comment in file if you wish
    "assets": [
        {
            "name": "./fonts/bell/gt_pressura_regular_38.mpf",      <-- name of our file to download in repo
            "output": "fonts/gt_pressura_regular_38.mpf",           <-- output: where should be and how shall be called the file
            "ref": "fd168040c5d1216d457e6cf223e8ea9bb76bf7b"        <-- from what ref shall we download the file
        },
    ...
}

Github downloader essentially:
-> connects to api -> loads json -> check if file for `sha` exists locally -> downloads file if not exists -> copy file from cache
'''
from os import environ
import subprocess
import json
from functools import lru_cache
from tqdm import tqdm
from ghapi.all import GhApi
from pathlib import Path
from fastcore.net import HTTP404NotFoundError, HTTP403ForbiddenError
from base64 import b64decode
import logging


log = logging.getLogger(__name__)
log.setLevel(logging.INFO)


class TqdmLoggingHandler(logging.Handler):
    '''
    required for pretty logs with tqdm
    '''
    def __init__(self, level=logging.NOTSET):
        super().__init__(level)

    def emit(self, record):
        try:
            msg = self.format(record)
            tqdm.write(msg)
            self.flush()
        except Exception:
            self.handleError(record)


log.addHandler(TqdmLoggingHandler())


def getToken():
    '''
    util to get github token for user from git config
    - try env first
    - then try gitconfig
    '''
    apitoken = environ.get('GITHUB_TOKEN')
    if apitoken is not None:
        return apitoken
    res = subprocess.Popen(["git", "config", "user.apitoken"], stdout=subprocess.PIPE).communicate()[0]
    apitoken = res[:-1].decode()
    return apitoken


def get_assets_json(name):
    with open(name) as f:
        j = json.load(f)
    return j


def verify_config(j):
    '''
    checks for required fields for json description file
    '''
    required = ['assets']
    for val in required:
        if val not in j:
            raise(RuntimeError(f"value '{val}' not found in config!"))


class GitOps:
    '''
    Simplest github download wrapper based on ghapi (passed by api)
    please see reference here: https://ghapi.fast.ai/fullapi.html
    '''
    def __init__(self, api: GhApi, cache_dir: str, install_dir, j: dict):
        self.api = api
        self.j = j
        self.cache = Path(cache_dir)
        self.cache.expanduser().mkdir(exist_ok=True)
        self.install_dir = Path(install_dir if install_dir is not None else '.')

    def get_cached(self, file_name, any_id):
        '''
        create catalog in downloads for file
            file_name - file to download as
            any_id - ideal git sha, `ref` by ghapi v3, if sha is not provided `master` is taken for repo access, id for release
        '''
        where = (self.cache / any_id / file_name).expanduser()
        if not where.parent.exists():
            where.parent.mkdir(parents=True)
        return where

    def create_download_data(self, val, ref):
        '''
        generate 'data' required for github api to download via get_content
        essentially:
            val - file to take, remove `./` if required - http paths doesn't have ./
            ref - git sha to use for download
        '''
        file_name = val['name']
        if './' == file_name[0:2]:
            file_name = file_name[2:]
        data = {"path": file_name, "ref": ref}
        return data

    def download_file_from_git(self, where, data):
        '''
        use ghapi to download
            where - where to download file to (please remember to download to cache)
            data - what to download
        '''
        path = self.api.repos.get_content(**data)
        with where.open("wb") as f:
            # TODO in generall - content says what encoding is used, this is simplification
            data = b64decode(path["content"])
            f.write(data)

    def download_file_from_release(self, id, name_out):
        '''
        This function:
            - gets file data via github api v3
            - then downloads it from the same link with forward by header:
                'application/octet-stream'
        '''
        cached = self.get_cached(name_out, str(id))
        where = self.install_dir / name_out
        if not cached.exists():
            oldheaders = self.api.headers['Accept']
            log.info(f"downloading {cached.absolute()}")
            data = self.api.repos.get_release_asset(id)
            # set headers to accept octet stream instead of ghapi
            self.api.headers['Accept'] = 'application/octet-stream'
            import requests
            response = requests.get(data['url'],
                                    headers={**self.api.headers},
                                    stream=True)
            if response.status_code != requests.codes.ok:
                raise RuntimeError(f"download error: {response.status_code} {response.content}")
            with cached.open('wb') as fd:
                with tqdm(total=data['size'], unit='kb', unit_scale=True) as p_bar:
                    for chunk in response.iter_content(chunk_size=1024):
                        fd.write(chunk)
                        p_bar.update(len(chunk))
            # restore headers
            self.api.headers['Accept'] = oldheaders
        self.copy_file(cached, where)

    def copy_file(self, what: Path, where: Path):
        '''
        if there is no path to `where` create -> then copy `what`
        '''
        import shutil
        where.parent.mkdir(exist_ok=True, parents=True)
        log.info(f'{what} -> {where}')
        shutil.copy(what, where)

    @lru_cache(maxsize=None)
    def _api_list_branches(self):
        return self.api.repos.list_branches()

    @lru_cache(maxsize=None)
    def _api_list_releases(self):
        return self.api.repos.list_releases()

    def fallback_ref(self):
        '''
        get master sha, cached as asking api visible time and there is no need to do so
        '''
        for val in self._api_list_branches():
            if val['name'] == 'master':
                log.debug(f"using {val['commit']['sha']} for master")
                return val['commit']['sha']
        raise RuntimeError('Master not found!')

    def download_json(self):
        '''
        download function which uses our json to download all files required
        -> check if file for sha exists -> if not: download -> copy where needed
        '''
        for idx, val in enumerate(tqdm(self.j['assets'])):
            data = None
            if 'name' not in val:
                raise RuntimeError(f'there is no name in json->assets on position {idx}')
            try:
                git_sha = val['ref'] if 'ref' in val else self.fallback_ref()
                data = self.create_download_data(val, git_sha)
                cached = self.get_cached(data['path'], git_sha)
                if not cached.exists():
                    log.info(f"downloading: {data} to: {str(cached)}")
                    self.download_file_from_git(cached, data)
                output = Path(val['output']) if 'output' in val else Path(val['name'])
                if output.is_absolute() and self.install_dir is not None:
                    raise RuntimeError("cant have absolute output with install dir...")
                if not output.is_absolute() and self.install_dir is not None:
                    log.debug("prepend install dir")
                    output = Path(self.install_dir) / output
                output.parent.mkdir(parents=True, exist_ok=True)
                if 'unpack' in val:
                    import tarfile
                    with tarfile.open(cached) as tar:
                        tar.extract(output.name, path=output.parent)
                        log.info(f'{cached}[{output.name}] -[untar]-> {output.parent / output.name}')
                else:
                    self.copy_file(cached, output)
            except HTTP404NotFoundError as ex:
                raise RuntimeError(f'file not found with: {data} err: {ex}')
            except HTTP403ForbiddenError as ex:
                # gh is messed up - if you get persistent error on this file, try renaming
                raise RuntimeError(f'something is wrong with: {data} err: {ex}')

    def _get_product_by_name(self, repository: str, product: str, version: str) -> str:
        '''
        We have naming convetion of releases:
        "$Product $Version"
        where:
            - product is product built i.e. ecoboot
            - version - is semantic version we use
        '''
        if "bell" in product.lower():
            product = "bell"
        elif "pure" in product.lower():
            product = "pure"
        else:
            log.error("we require short version in this script - either pure or bell")
            sys.exit(2)

        return f"{repository} {version}-{product}"


    def download_release(self, repo, name_in, name_out, version, product):
        release_requested = self._get_product_by_name(repo, product, version)
        release = None
        for rel in self._api_list_releases():
            # warning just taken first from partial match, should be done differently
            if release_requested in rel['name']:
                release = rel
                break
        if release is None:
            raise RuntimeError(f"Can't get release by release name: {release_requested} with args: {args}")
        log.info(f"Trying to download release from {args.repository} under name: {release_requested}")
        assets = release['assets']
        found = False
        for asset in assets:
            if asset['name'] == name_in:
                self.download_file_from_release(asset['id'], name_out)
                found = True
        if not found:
            raise(RuntimeError(f"asset not found {args.assetRepoName} and cant be saved as {args.assetOutName}"))

    def list_releases(self):
        '''
        list releases from repository selected in pretty format
        '''
        from tabulate import tabulate
        headers = ["tag", "name", "date", "pre-release"]
        table = [[r['tag_name'], r['name'], r['published_at'], r['prerelease']] for r in self._api_list_releases()]
        # print is used "by design" here - deliberately not using `logging` here
        print(tabulate(table, headers, tablefmt="github"))


def arguments():
    import argparse
    parser = argparse.ArgumentParser(description="download assets from repo, requires valid token in git config")
    subparsers = parser.add_subparsers(title='cmd', description="command to run", required=True, dest='cmd')
    cmd_git = subparsers.add_parser('github', description="download assets from github")
    cmd_git.add_argument('--owner', help="owner to take data from, in our case Mudita", default="mudita")
    cmd_git.add_argument('--repository', help='repository to take assets from', default='MuditaOSAssets')
    cmd_git.add_argument('--cache_dir', help='cache dir to store downloaded files', default='~/.mudita/')
    cmd_git.add_argument('--install_dir', help='optional install dir for path, default - relative to script', default=None)
    subparsers_git = cmd_git.add_subparsers(title='github cmd', description="util to run with github", dest='cmd_github')
    cmd_git_json = subparsers_git.add_parser('json', description="download assets by json file")
    cmd_git_json.add_argument('--json', help="json file with description what shall we load", required=True)
    cmd_git_list = subparsers_git.add_parser('list', description="list releases from github")
    cmd_git_release_download = subparsers_git.add_parser('download', description="download assets from github release")
    cmd_git_release_download.add_argument('--name_in', required=True, help="The name of asset to download from release package in repository")
    cmd_git_release_download.add_argument('--name_out', required=True, help="The output name to assign to downloaded asset after download")
    cmd_git_release_download.add_argument('--version', required=True, help="Asset version to download")
    cmd_git_release_download.add_argument('--product', required=True, help='Asset product to download, has to match any product we support at the moment')


    # cmd_git_list =
    subparsers.add_parser('local', description="local files copy")
    return parser.parse_args()


if __name__ == "__main__":
    try:
        args = arguments()
        log.info('download_assets.py')
        if args.cmd == 'github':
            log.debug('get token...')
            token = getToken()
            if args.cmd_github == 'json':
                log.debug('get config...')
                j = get_assets_json(args.json)
                log.debug('verify config...')
                verify_config(j)
                api = GhApi(owner=args.owner, token=token, repo=args.repository)
                downloader = GitOps(api, args.cache_dir, args.install_dir, j)
                downloader.download_json()
                log.info('downloader success')
            if args.cmd_github == 'list':
                api = GhApi(owner=args.owner, token=token, repo=args.repository)
                downloader = GitOps(api, args.cache_dir, args.install_dir, {})
                downloader.list_releases()
                log.info('downloader success')
            if args.cmd_github == 'download':
                if args.install_dir is None:
                    args.install_dir = Path('./')
                api = GhApi(owner=args.owner, token=token, repo=args.repository)
                downloader = GitOps(api, args.cache_dir, args.install_dir, {})
                downloader.download_release(args.repository, args.name_in, args.name_out, args.version, args.product)
                log.info('downloader success')
        elif args.cmd == 'local':
            raise RuntimeError('Not implemented')
        else:
            raise RuntimeError('Not implemented')
    except RuntimeError as ex:
        log.error(ex)
        log.error('downloader exit with error!')
        exit(1)