Source code for seutil.GitHubUtils

import math
import traceback
from datetime import datetime
from time import sleep
from typing import *

from github import Github, RateLimitExceededException
from github.GithubException import GithubException
from github.NamedUser import NamedUser
from github.Repository import Repository

from . import _config
from .LoggingUtils import LoggingUtils
from .BashUtils import BashUtils


class GitHubUtils:
    """
    Convenience wrappers around PyGithub: a shared default client, rate-limit
    aware API calls with retry, and repository / user search helpers.
    """

    logger = LoggingUtils.get_logger("GitHubUtils", LoggingUtils.DEBUG)

    # Default cap on the number of items collected by search_repos / search_users.
    GITHUB_SEARCH_ITEMS_MAX = 1000

    # Build the shared default client from the configured token. If that fails
    # for any reason, the defaults are None and callers have to supply their own
    # access token / Github object.
    try:
        DEFAULT_ACCESS_TOKEN = _config.get_config("github_access_token")
        DEFAULT_GITHUB_OBJECT = Github(DEFAULT_ACCESS_TOKEN, per_page=100)
    except Exception:
        DEFAULT_ACCESS_TOKEN = None
        DEFAULT_GITHUB_OBJECT = None
        logger.info("Fail to get github_access_token from config file. Using GitHubUtils APIs will require compulsory input access_token")
    # end try

    @classmethod
    def get_github(cls, access_token: Optional[str] = None) -> Github:
        """
        Gets a Github accessor.

        :param access_token: the token to build a new accessor with; if None, the shared default accessor is used.
        :return: a new accessor for the given token, or the shared default accessor (which is None if no token could be read from the config file).
        """
        if access_token is None:
            return cls.DEFAULT_GITHUB_OBJECT
        return Github(access_token)

    class wait_rate_limit:
        """
        Wait for rate limit of the github accessor. For use with "with".
        Use the default github accessor if no argument is given.

        Entering the context sleeps until the rate limit resets when at most one
        request remains, then yields the accessor. Leaving the context does nothing
        and does not suppress exceptions.
        """

        # Both attributes are placeholders here; the enclosing class assigns the
        # real values immediately after this class definition.
        DEFAULT_GITHUB_OBJECT = None
        logger = None

        def __init__(self, github: Github = DEFAULT_GITHUB_OBJECT):
            self.github = github

        def __enter__(self):
            if self.github is None:
                self.github = self.DEFAULT_GITHUB_OBJECT
            # end if

            # Check rate limit
            rate_limit_remain, rate_limit = self.github.rate_limiting
            if rate_limit_remain <= 1:
                self.logger.debug("Rate limit {} / {}".format(rate_limit_remain, rate_limit))
                rate_limit_reset_time = datetime.fromtimestamp(self.github.rate_limiting_resettime)
                # One extra second of slack on top of the rounded-up remaining time.
                rate_limit_wait_seconds = math.ceil((rate_limit_reset_time - datetime.now()).total_seconds()) + 1
                if rate_limit_wait_seconds > 0:
                    self.logger.warning("Rate limit will recover at: {}, will wait for {} seconds.".format(rate_limit_reset_time, rate_limit_wait_seconds))
                    sleep(rate_limit_wait_seconds)
                    self.logger.warning("Rate limit recovered")
                # end if
            # end if
            return self.github

        def __exit__(self, type, value, tb):
            # Nothing to release; returning None lets exceptions propagate.
            return

    # end class

    wait_rate_limit.DEFAULT_GITHUB_OBJECT = DEFAULT_GITHUB_OBJECT
    wait_rate_limit.logger = logger

    T = TypeVar("T")

    @classmethod
    def ensure_github_api_call(cls, call: Callable[[Github], T], github: Github = DEFAULT_GITHUB_OBJECT, max_retry_times: float = float("inf")) -> T:
        """
        Runs an API call under the rate-limit guard, retrying on GitHub errors.

        :param call: the function to invoke; it receives the github accessor.
        :param github: the accessor to use; None falls back to the default accessor of wait_rate_limit.
        :param max_retry_times: how many retries are allowed before the last exception is re-raised.
        :return: whatever call returns.
        :raises GithubException: immediately on status 422 (validation error), or once the retry budget is exhausted.

        Exceptions that are not GithubException (e.g., StopIteration from an exhausted
        iterator) are not caught here and propagate to the caller.
        """
        retry_times = 0
        while True:
            try:
                with cls.wait_rate_limit(github) as g:
                    return call(g)
                # end with
            except (GithubException, RateLimitExceededException) as e:
                if e.status == 422:
                    cls.logger.warning("Validation Error. Will not retry.")
                    raise
                # end if

                cls.logger.warning("Unexpected exception during api call: {}".format(traceback.format_exc()))
                retry_times += 1
                if retry_times > max_retry_times:
                    cls.logger.warning("Exceeding max retry times {}".format(max_retry_times))
                    raise
                # end if

                # Linear back-off: 30 seconds per attempt so far, capped at 600 seconds.
                retry_wait_time = min(retry_times * 30, 600)
                cls.logger.warning("Will wait {} seconds before retry {}".format(retry_wait_time, retry_times))
                sleep(retry_wait_time)
            # end try
        # end while

    @classmethod
    def search_repos(cls, q: str = "", sort: str = "stars", order: str = "desc", is_allow_fork: bool = False, max_num_repos: float = GITHUB_SEARCH_ITEMS_MAX, github: Github = DEFAULT_GITHUB_OBJECT, max_retry_times: float = float("inf"), *_, **qualifiers) -> List[Repository]:
        """
        Searches the repos by querying GitHub API v3.

        :param q: the search query.
        :param sort: the field to sort by.
        :param order: the sort order.
        :param is_allow_fork: if False, repos whose fork flag is set are skipped.
        :param max_num_repos: stop after collecting this many repos; a value <= 0 yields an empty list.
        :param github: the accessor to use; None falls back to the class default accessor.
        :param max_retry_times: retry budget for each page/item fetch (see ensure_github_api_call).
        :param qualifiers: forwarded as keyword arguments to Github.search_repositories.
        :return: a list of the Repository objects that match the query; may be partial if an error occurred while iterating.
        """
        if github is None:
            github = cls.DEFAULT_GITHUB_OBJECT
        # end if

        cls.logger.debug("Search for repos with query {}, sort {}, order {}".format(q, sort, order))
        repos = list()
        repos_iterator = iter(github.search_repositories(q, sort, order, **qualifiers))
        while len(repos) < max_num_repos:
            try:
                repo = cls.ensure_github_api_call(lambda g: next(repos_iterator), github, max_retry_times)

                # Check fork
                if not is_allow_fork and repo.fork:
                    continue
                # end if

                repos.append(repo)
            except StopIteration:
                break
            except Exception:
                # Deliberately not a bare except: KeyboardInterrupt / SystemExit must
                # reach the caller instead of being reported as a partial result.
                cls.logger.warning("Unknown exception: {}".format(traceback.format_exc()))
                cls.logger.warning("Returning partial results")
                break
            # end try except
        # end while

        cls.logger.info("Got {}/{} repos".format(len(repos), max_num_repos))
        return repos

    @classmethod
    def search_users(cls, q: str = "", sort: str = "repositories", order: str = "desc", max_num_users: float = GITHUB_SEARCH_ITEMS_MAX, github: Github = DEFAULT_GITHUB_OBJECT, max_retry_times: float = float("inf"), *_, **qualifiers) -> List[NamedUser]:
        """
        Searches the users by querying GitHub API v3.

        :param q: the search query.
        :param sort: the field to sort by.
        :param order: the sort order.
        :param max_num_users: stop after collecting this many users; a value <= 0 yields an empty list.
        :param github: the accessor to use; None falls back to the class default accessor.
        :param max_retry_times: retry budget for each page/item fetch (see ensure_github_api_call).
        :param qualifiers: forwarded as keyword arguments to Github.search_users.
        :return: a list of the NamedUser objects that match the query; may be partial if an error occurred while iterating.
        """
        if github is None:
            github = cls.DEFAULT_GITHUB_OBJECT
        # end if

        cls.logger.debug("Search for users with query {}, sort {}, order {}".format(q, sort, order))
        users = list()
        users_iterator = iter(github.search_users(q, sort, order, **qualifiers))
        while len(users) < max_num_users:
            try:
                user = cls.ensure_github_api_call(lambda g: next(users_iterator), github, max_retry_times)
                users.append(user)
            except StopIteration:
                break
            except Exception:
                # Deliberately not a bare except: KeyboardInterrupt / SystemExit must
                # reach the caller instead of being reported as a partial result.
                cls.logger.warning("Unknown exception: {}".format(traceback.format_exc()))
                cls.logger.warning("Returning partial results.")
                break
            # end try except
        # end while

        num_users = len(users)
        if num_users < max_num_users:
            cls.logger.warning("Got {}/{} users".format(num_users, max_num_users))
        else:
            cls.logger.info("Got {}/{} users".format(num_users, max_num_users))
        # end if
        return users

    @classmethod
    def search_repos_of_language(cls, language: str, max_num_repos: float = float("inf"), is_allow_fork: bool = False, max_retry_times: float = float("inf"), strategies: Optional[List[str]] = None) -> List[Repository]:
        """
        Searches for all the repos of the language.

        :param language: the language name, used in a "language:..." search qualifier.
        :param max_num_repos: stop as soon as at least this many distinct repos were collected (the result may exceed it).
        :param is_allow_fork: forwarded to search_repos.
        :param max_retry_times: forwarded to search_repos / search_users.
        :param strategies: which strategies to run, out of "search_repos", "search_users", "enum_users" (the last one is not implemented); defaults to ["search_repos", "search_users"].
        :return: a list of the matching Repository objects, de-duplicated by full name; partial if interrupted or if an error occurred.
        """
        if strategies is None:
            strategies = ["search_repos", "search_users"]
        # end if

        # Check supported strategies
        supported_strategies = ["search_repos", "search_users", "enum_users"]
        for strategy in strategies:
            assert strategy in supported_strategies, strategy
        # end for

        # Keyed by full name so that repos found by several strategies are kept once.
        names_repos = dict()
        try:
            # Strategy 1: search repos (limited to 1000)
            strategy = "search_repos"
            if strategy in strategies:
                cls.logger.info("Using strategy {}".format(strategy))
                new_repos = cls.search_repos("language:{}".format(language), is_allow_fork=is_allow_fork, max_retry_times=max_retry_times, max_num_repos=max_num_repos)
                for repo in new_repos:
                    names_repos[repo.full_name] = repo
                # end for
                cls.logger.warning("Progress {}/{} repos.".format(len(names_repos), max_num_repos))
                if len(names_repos) >= max_num_repos:
                    return list(names_repos.values())
                # end if
            # end if

            # Strategy 2: search users (~37000?)
            strategy = "search_users"
            if strategy in strategies:
                cls.logger.info("Using strategy {}".format(strategy))
                # Only the "followers" ordering is queried; sorting by "repositories"
                # or "joined" would be further ways to widen the user set.
                s_users = {u.login for u in cls.search_users("language:{}".format(language), sort="followers", max_retry_times=max_retry_times)}

                users_count = 0
                total_users_count = len(s_users)
                for user in s_users:
                    try:
                        new_repos = cls.search_repos("language:{} user:{}".format(language, user), is_allow_fork=is_allow_fork, max_retry_times=max_retry_times)
                    except GithubException:
                        cls.logger.warning("Cannot get the repos of user {}".format(user))
                        continue
                    # end try

                    for repo in new_repos:
                        names_repos[repo.full_name] = repo
                    # end for
                    users_count += 1
                    cls.logger.debug("Progress {}/{} repos, {}/{} users.".format(len(names_repos), max_num_repos, users_count, total_users_count))
                    if len(names_repos) >= max_num_repos:
                        return list(names_repos.values())
                    # end if
                # end for
            # end if

            # Strategy 3: enum users (?)
            strategy = "enum_users"
            if strategy in strategies:
                cls.logger.warning("Strategy {} is not implemented yet.".format(strategy))
                cls.logger.warning("Nothing happens.")
            # end if
        except KeyboardInterrupt:
            cls.logger.warning("Interrupted. Returning partial results.")
        except Exception:
            # Best effort: report the failure and still hand back what was collected,
            # rather than discarding the exception without a trace.
            cls.logger.warning("Unknown exception: {}".format(traceback.format_exc()))
            cls.logger.warning("Returning partial results.")
        finally:
            # Runs for the early returns above as well.
            cls.logger.warning("Got {}/{} repos.".format(len(names_repos), max_num_repos))
        # end try

        return list(names_repos.values())

    @classmethod
    def is_url_valid_git_repo(cls, url: str) -> bool:
        """
        Checks if the url points to a git repo, by running "git ls-remote" on it.

        :param url: the url to probe.
        :return: True if the command exits with return code 0, False otherwise.
        """
        import shlex

        # The url is interpolated into a command string, so it is quoted to keep
        # characters such as "&", ";" or spaces from being interpreted by the shell
        # (assumes BashUtils.run hands the string to a shell). An empty url is
        # passed as an explicit empty argument instead of vanishing from the command.
        return BashUtils.run(f"git ls-remote {shlex.quote(url)}").return_code == 0