#!/usr/bin/env python3 """ Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved. For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md Download ecooboot.bin from repository @package globalFinder """ import argparse import git import json import os import requests import sys from tqdm import tqdm import time class Getter(object): '''Download latest ecooboot.bin/updater.bin images''' def __init__(self): self.host = 'https://api.github.com/repos' self.organisation = 'mudita' self.repo = '' self.apitoken = None self.ghLogin = None self.getGitRoot() self.getApiToken() self.getGHLogin() self.releases = [] self.workdir = "" def genRestPrefix(self): self.restPrefix = self.host self.restPrefix += '/' self.restPrefix += self.organisation self.restPrefix += '/' if self.repo == '': raise ValueError("Repository must be set") self.restPrefix += self.repo self.restPrefix += '/' return self.restPrefix def getGitRoot(self): 'Find git root directory' self.gitRepo = git.Repo(os.getcwd(), search_parent_directories=True) self.gitRoot = self.gitRepo.git.rev_parse("--show-toplevel") return self.gitRoot def getApiToken(self, args=None): if args is not None: if args.token is not None: self.apitoken = args.token return try: gitConfigReader = self.gitRepo.config_reader() self.apitoken = gitConfigReader.get_value("user", "apitoken") except: pass def getGHLogin(self, args=None): if args is not None: if args.login is not None: self.ghLogin = args.login return try: gitConfigReader = self.gitRepo.config_reader() self.ghLogin = gitConfigReader.get_value("user", "githublogin") except: pass def findWorkDir(self): '''Finds and sets current workdir (based on 'workdir' argument)''' if os.path.isabs(self.args.workdir): self.workdir = self.args.workdir else: self.workdir = self.getGitRoot() + "/" + self.args.workdir def getReleases(self, args): '''List only releases''' print(f"{sys._getframe().f_code.co_name}") self.args = args self.getGHLogin(args) self.getApiToken(args) request = self.genRestPrefix() + "releases" print (f"Request: {request}") headers = {'accept': 'application/vnd.github.v3+json'} page = 0 itemsOnPage = 100 count = itemsOnPage while count == itemsOnPage: page += 1 queryParams = {'per_page': itemsOnPage, 'page': page} response = requests.get(request, auth=(self.ghLogin, self.apitoken), headers=headers, params=queryParams) if response.status_code != requests.codes.ok: print("download error:", response.status_code) print("content:", response.content) sys.exit(1) items = json.loads(response.content) count = len(items) self.releases += items drafts = [] releases = [] for item in self.releases: if item['published_at'] is None: drafts.append(item) else: releases.append(item) releases.sort(key=lambda r: r['published_at'], reverse=True) self.releases=drafts self.releases.extend(releases) def listReleases(self, args): self.getReleases(args) print(" tag | name | date | pre-release") print("--------------------------------") for release in self.releases: print(release['tag_name'], release['name'], release['published_at'], release['prerelease'], sep=" | ") def getProductByName(self, repository: str, product: str, version: str) -> str: ''' We have naming convetion of releases: "$Product $Version" where: - product is product built i.e. ecoboot - version - is semantic version we use ''' if "bell" in product.lower(): product = "bell" elif "pure" in product.lower(): product = "pure" else: print("we require short version in this script - either pure or bell") sys.exit(2) return f"{repository} {version}-{product}" def downloadRelease(self, args): print(f"{sys._getframe().f_code.co_name} from: {args.repository}") self.getReleases(args) release_requested = self.getProductByName(args.repository, args.product, args.version) release = None for rel in self.releases: # warning just taken first from partial match, should be done differently if release_requested in rel['name']: release = rel break if release is None: raise RuntimeError(f"Can't get release by release name: {release_requested} with args: {args}") print(f"Trying to download release from {args.repository} under name: {release_requested}") assets = release['assets'] found = False for asset in assets: if asset['name'] == args.assetRepoName: self.downloadAsset(asset, args.assetOutName) found = True if not found: raise(RuntimeError(f"asset not found {args.assetRepoName} and cant be saved as {args.assetOutName}")) def downloadAsset(self, asset, outName): self.createWorkdir() print("name:", asset['name']) print("url:", asset['url']) print("size:", asset['size']) headers = {'accept': 'application/octet-stream'} response = requests.get(asset['url'], auth=(self.ghLogin, self.apitoken), headers=headers, stream=True) destination_file = self.args.workdir + "/" + outName print("downloading to:", destination_file) progres_bar = tqdm(total=asset['size'], unit='iB', unit_scale=True) with open(destination_file, 'wb') as fd: for chunk in response.iter_content(chunk_size=1024): progres_bar.update(len(chunk)) fd.write(chunk) if response.status_code != requests.codes.ok: print("download error:", response.status_code) print(response.content) sys.exit(2) def createWorkdir(self): from pathlib import Path Path(self.workdir).mkdir(parents=True, exist_ok=True) def run(self, args): self.args = args self.findWorkDir() print(self.args) def main(): getter = Getter() parser = argparse.ArgumentParser(description="Download ecooboot") parser.add_argument('--workdir', help="Directory where package is build", default="update") parser.add_argument('-t', '--token', help="GitHub security token " "by default read from `git config user.apitoken` " "Generate it on https://github.com/settings/tokens " "Set privileges to `repo` " "and store with command: `git config -add user.apitoken ", required=(getter.apitoken is None)) parser.add_argument('-l', '--login', help="GitHub login" "by default read from `git config user.githublogin`" "to add your login use:" "`git config -add user.githublogin `", required=(getter.ghLogin is None)) parser.add_argument('--repository', help="repository name from which assets will be downloaded") parser.set_defaults(func=getter.run) subparsers = parser.add_subparsers(title="commands", description="available commands") listReleases_args = subparsers.add_parser('list', aliases=['ll'], description="List public releases") listReleases_args.set_defaults(func=getter.listReleases) getReleases_args = subparsers.add_parser('download', aliases=['dw'], description="Download Release based on tag or the latest") getReleases_args.set_defaults(func=getter.downloadRelease) getReleases_args.add_argument('--assetRepoName', help="Asset name in repository", required=True) getReleases_args.add_argument('--assetOutName', help="Asset output name", required=True) getReleases_args.add_argument('--version', help="Asset version", required=True) getReleases_args.add_argument('--product', help="Asset product", required=True) args = parser.parse_args() getter.repo = args.repository getter.workdir = args.workdir args.func(args) if __name__ == "__main__": main()