Last active
February 8, 2026 02:28
-
-
Save YuriyGuts/d4f8969733a57a5eb226 to your computer and use it in GitHub Desktop.
Clone all public and private repositories from a specific GitHub user or organization
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Clone all public and private repositories from a specific GitHub user or organization. | |
| Uses only the Python standard library (no pip dependencies required). | |
| Usage: github-clone-all [-h] [--org] [--token TOKEN] [--output OUTPUT] [--ssh] [--dry-run] [--skip-existing] [--include PATTERN] [--exclude PATTERN] [--private-only] [--public-only] | |
| [--parallel N] [--quiet] [--verbose] [--version] | |
| target | |
| positional arguments: | |
| target GitHub username or organization name | |
| options: | |
| -h, --help show this help message and exit | |
| --org, -o Treat target as organization (default: user) | |
| --token, -t TOKEN GitHub token (fallback: $GITHUB_TOKEN, $GH_TOKEN) | |
| --output, -d OUTPUT Output directory (default: current directory) | |
| --ssh Use SSH URLs instead of HTTPS | |
| --dry-run, -n Show what would be cloned without cloning | |
| --skip-existing Skip repos that already exist locally | |
| --include PATTERN Only clone repos matching regex | |
| --exclude PATTERN Skip repos matching regex | |
| --private-only Clone only private repositories | |
| --public-only Clone only public repositories | |
| --parallel, -j N Number of parallel clone operations (default: 1) | |
| --quiet, -q Suppress non-error output | |
| --verbose, -v Show detailed output | |
| --version show program's version number and exit | |
| Examples: | |
| github-clone-all octocat Clone all repos from user 'octocat' | |
| github-clone-all acme --org Clone all repos from org 'acme' | |
| github-clone-all octocat --dry-run Show what would be cloned | |
| github-clone-all octocat --include '^api-' Clone only repos starting with 'api-' | |
| github-clone-all octocat -j4 --ssh Clone in parallel and use SSH | |
| ----- Notes ----- | |
| For GitHub authentication, set up a personal access token (choose one approach): | |
| Option A: Classic Personal Access Token (might work better with organizations): | |
| 1. Go to GitHub → Settings → Developer settings → Personal access tokens → Tokens (classic). | |
| 2. Generate new token with the repo scope (full control of private repositories). | |
| Option B: Fine-grained Personal Access Token: | |
| 1. Go to GitHub → Settings → Developer settings → Personal access tokens → Fine-grained tokens. | |
| 2. Set "Repository access" to "All repositories" (or select specific ones). | |
| 3. Under "Repository permissions", add "Contents" and set it to "Read-only". | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor | |
| from concurrent.futures import as_completed | |
| from dataclasses import dataclass | |
| from typing import Generator | |
| from typing import List | |
| from typing import Optional | |
| from urllib.error import HTTPError | |
| from urllib.error import URLError | |
| from urllib.request import Request | |
| from urllib.request import urlopen | |
# Tool version; interpolated into the User-Agent header, the banner and --version.
VERSION = "1.0.0"
# Root URL that every GitHub REST endpoint path is appended to.
API_BASE = "https://api.github.com"
# Upper bound, in seconds, handed to subprocess.run() for each `git clone`.
CLONE_TIMEOUT = 300  # 5 minutes per repo
class Colors:
    """Provide ANSI escape sequences that collapse to "" when color is off.

    Color is enabled only when the ``NO_COLOR`` environment variable is unset
    or empty and stdout is a terminal. On Windows the console must also
    accept ANSI mode (checked through ``_init_windows_ansi``).
    """

    def __init__(self):
        use_colors = self._should_use_colors()
        if use_colors and sys.platform == "win32":
            use_colors = _init_windows_ansi()
        self._enabled = use_colors

    def _should_use_colors(self) -> bool:
        """Return True when the environment allows colored output."""
        opted_out = bool(os.environ.get("NO_COLOR"))
        # Short-circuits so the TTY probe is skipped once the user opted out.
        return not opted_out and sys.stdout.isatty()

    def _code(self, code: str) -> str:
        """Pass ``code`` through when colors are on; otherwise return ""."""
        if self._enabled:
            return code
        return ""

    @property
    def reset(self) -> str:
        """Sequence that clears all active attributes."""
        return self._code("\033[0m")

    @property
    def bold(self) -> str:
        """Sequence that turns on bold text."""
        return self._code("\033[1m")

    @property
    def red(self) -> str:
        """Sequence for bright red foreground."""
        return self._code("\033[91m")

    @property
    def green(self) -> str:
        """Sequence for bright green foreground."""
        return self._code("\033[92m")

    @property
    def yellow(self) -> str:
        """Sequence for bright yellow foreground."""
        return self._code("\033[93m")

    @property
    def blue(self) -> str:
        """Sequence for bright blue foreground."""
        return self._code("\033[94m")

    @property
    def cyan(self) -> str:
        """Sequence for bright cyan foreground."""
        return self._code("\033[96m")

    @property
    def dim(self) -> str:
        """Sequence that turns on dim (faint) text."""
        return self._code("\033[2m")
| def _init_windows_ansi() -> bool: | |
| """Enable ANSI escape codes on Windows 10+.""" | |
| try: | |
| import ctypes | |
| kernel32 = ctypes.windll.kernel32 | |
| # Get stdout handle | |
| handle = kernel32.GetStdHandle(-11) # STD_OUTPUT_HANDLE | |
| # Get current mode | |
| mode = ctypes.c_ulong() | |
| kernel32.GetConsoleMode(handle, ctypes.byref(mode)) | |
| # Enable virtual terminal processing | |
| ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 | |
| kernel32.SetConsoleMode(handle, mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING) | |
| return True | |
| except Exception: | |
| return False | |
# Shared Colors instance; TTY / NO_COLOR detection runs once, when this
# module-level statement executes.
colors = Colors()
@dataclass
class Repository:
    """Represents a GitHub repository.

    Instances are built from the items of a GitHub repository listing; each
    field mirrors the item key of the same name.
    """

    # Repository name; also used as the local clone directory name.
    name: str
    # URL cloned when SSH mode is off.
    clone_url: str
    # URL cloned when SSH mode is on.
    ssh_url: str
    # True for private repositories; drives the visibility filters.
    private: bool
    # Taken from the "archived" key; False when the key is absent.
    archived: bool = False
    # Taken from the "fork" key; False when the key is absent.
    fork: bool = False
@dataclass
class CloneResult:
    """Result of a clone operation."""

    # The repository this result refers to.
    repo: Repository
    # False only when the clone was attempted (or refused) and did not succeed.
    success: bool
    # Short status text, or git's stderr output for a failed clone.
    message: str
    # Wall-clock seconds between the start of the attempt and this result.
    duration: float
    # True when no clone was performed (already present with skip enabled, or dry run).
    skipped: bool = False
class GitHubAPIError(Exception):
    """Failure reported while talking to the GitHub API.

    The optional HTTP status is kept on ``status_code`` (None when the raiser
    did not supply one) so callers can branch on it.
    """

    def __init__(self, message: str, status_code: Optional[int] = None):
        self.status_code = status_code
        super().__init__(message)
class GitHubAPI:
    """GitHub API client using urllib (stdlib)."""

    def __init__(self, token: Optional[str] = None):
        """Store the optional token sent as a Bearer credential on every request."""
        self.token = token

    def _request(self, url: str, max_retries: int = 3) -> tuple:
        """Perform a GET request and decode the JSON body.

        Transient failures (network errors, timeouts, HTTP 5xx) are retried up
        to ``max_retries`` attempts with exponential backoff. Client errors
        are never retried because repeating them cannot succeed.

        Args:
            url: Absolute URL to fetch.
            max_retries: Maximum number of attempts.

        Returns:
            A ``(data, headers)`` tuple: the decoded JSON payload and the
            response headers as a plain dict.

        Raises:
            GitHubAPIError: On authentication failure (401), forbidden access
                or rate limiting (403/429), unknown target (404), any other
                4xx response, an undecodable body, or when every attempt
                failed.
        """
        headers = {
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": f"github-clone-all/{VERSION}",
        }
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"

        last_error = None
        for attempt in range(max_retries):
            try:
                req = Request(url, headers=headers)
                with urlopen(req, timeout=30) as response:
                    data = json.loads(response.read().decode("utf-8"))
                    return data, dict(response.headers)
            except HTTPError as e:
                if e.code == 401:
                    raise GitHubAPIError("Authentication failed. Check your token.", 401) from e
                if e.code == 403:
                    # GitHub reports rate limiting as 403 with an explanatory body.
                    body = e.read().decode("utf-8", errors="ignore")
                    if "rate limit" in body.lower():
                        raise GitHubAPIError(
                            "Rate limit exceeded. Use a token or wait.", 403
                        ) from e
                    raise GitHubAPIError("Access forbidden. Check permissions.", 403) from e
                if e.code == 404:
                    raise GitHubAPIError("Target not found.", 404) from e
                if e.code == 429:
                    raise GitHubAPIError("Rate limit exceeded. Use a token or wait.", 429) from e
                if 400 <= e.code < 500:
                    # Remaining client errors are deterministic: fail fast.
                    raise GitHubAPIError(f"Request failed: {e}", e.code) from e
                last_error = e
            except ValueError as e:
                # Covers json.JSONDecodeError and UnicodeDecodeError.
                raise GitHubAPIError(f"Invalid response from GitHub: {e}") from e
            except OSError as e:
                # URLError plus timeouts raised while reading the body.
                last_error = e

            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...

        raise GitHubAPIError(
            f"Request failed: {last_error}", getattr(last_error, "code", None)
        )

    def _parse_link_header(self, header: Optional[str]) -> dict:
        """Parse a Link header into a ``{rel: url}`` mapping.

        Returns an empty dict when the header is missing or empty.
        """
        if not header:
            return {}
        return {
            match.group(2): match.group(1)
            for match in re.finditer(r'<([^>]+)>;\s*rel="([^"]+)"', header)
        }

    def _paginate(self, url: str) -> Generator[dict, None, None]:
        """Yield every item of a paginated listing, following ``rel="next"`` links."""
        while url:
            data, headers = self._request(url)
            yield from data
            # ``headers`` is a plain dict, so match the header name
            # case-insensitively instead of relying on the server's casing.
            link = next(
                (value for key, value in headers.items() if key.lower() == "link"), None
            )
            url = self._parse_link_header(link).get("next")

    def _get_authenticated_user(self) -> Optional[str]:
        """Get the authenticated user's username, or None if not authenticated."""
        if not self.token:
            return None
        data, _ = self._request(f"{API_BASE}/user")
        return data.get("login")

    @staticmethod
    def _to_repository(item: dict) -> "Repository":
        """Convert one repository item of an API listing into a Repository."""
        return Repository(
            name=item["name"],
            clone_url=item["clone_url"],
            ssh_url=item["ssh_url"],
            private=item["private"],
            archived=item.get("archived", False),
            fork=item.get("fork", False),
        )

    def get_user_repos(self, username: str) -> List[Repository]:
        """Get all repositories for a user.

        If authenticated as the target user, uses /user/repos to include private repos.
        Otherwise, uses /users/{username}/repos which only returns public repos.

        Raises:
            GitHubAPIError: When any underlying request fails.
        """
        # Check if we're fetching our own repos (to include private ones)
        authenticated_user = self._get_authenticated_user()
        if authenticated_user and authenticated_user.lower() == username.lower():
            url = f"{API_BASE}/user/repos?affiliation=owner&per_page=100"
        else:
            url = f"{API_BASE}/users/{username}/repos?per_page=100"
        return [self._to_repository(item) for item in self._paginate(url)]

    def get_org_repos(self, org: str) -> List[Repository]:
        """Get all repositories for an organization.

        Raises:
            GitHubAPIError: When any underlying request fails.
        """
        url = f"{API_BASE}/orgs/{org}/repos?per_page=100"
        return [self._to_repository(item) for item in self._paginate(url)]
class RepoCloner:
    """Handles cloning repositories."""

    def __init__(
        self,
        output_dir: str,
        use_ssh: bool = False,
        dry_run: bool = False,
        skip_existing: bool = False,
        quiet: bool = False,
        verbose: bool = False,
        parallel: bool = False,
    ):
        """Store the clone settings.

        Args:
            output_dir: Directory under which each repository is cloned.
            use_ssh: Clone from ``ssh_url`` instead of ``clone_url``.
            dry_run: Report what would be cloned without running git.
            skip_existing: Treat an existing target directory as a skip
                rather than a failure.
            quiet: Pass ``--quiet`` to git.
            verbose: Stored for callers; not read by this class.
            parallel: Disable interactive git/ssh prompts, which cannot be
                answered sensibly when several clones run at once.
        """
        self.output_dir = output_dir
        self.use_ssh = use_ssh
        self.dry_run = dry_run
        self.skip_existing = skip_existing
        self.quiet = quiet
        self.verbose = verbose
        self.parallel = parallel

    def clone_repo(self, repo: Repository) -> CloneResult:
        """Clone a single repository.

        Never raises: every failure is reported through the returned
        CloneResult (``success=False`` with a message).
        """
        start_time = time.time()
        target_dir = os.path.join(self.output_dir, repo.name)
        url = repo.ssh_url if self.use_ssh else repo.clone_url

        def outcome(success: bool, message: str, skipped: bool = False) -> CloneResult:
            # Single place that stamps the elapsed time onto a result.
            return CloneResult(
                repo=repo,
                success=success,
                message=message,
                duration=time.time() - start_time,
                skipped=skipped,
            )

        # Check if already exists
        if os.path.exists(target_dir):
            if self.skip_existing:
                return outcome(True, "already exists", skipped=True)
            return outcome(False, "directory already exists")

        # Dry run
        if self.dry_run:
            return outcome(True, "would clone", skipped=True)

        cmd = ["git", "clone"]
        if self.quiet:
            cmd.append("--quiet")
        # "--" ends option parsing so the URL can never be read as a git option.
        cmd.extend(["--", url, target_dir])

        # Disable interactive prompts when cloning in parallel
        env = None
        if self.parallel:
            env = os.environ.copy()
            env["GIT_TERMINAL_PROMPT"] = "0"  # Disable HTTPS credential prompts
            # Extend, rather than replace, an ssh command the user configured.
            ssh_command = env.get("GIT_SSH_COMMAND") or "ssh"
            env["GIT_SSH_COMMAND"] = f"{ssh_command} -o BatchMode=yes"  # Disable SSH prompts

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=CLONE_TIMEOUT,
                env=env,
            )
        except subprocess.TimeoutExpired:
            # The killed git process leaves a half-written checkout behind; a
            # later --skip-existing run would mistake it for a finished clone.
            self._remove_partial_clone(target_dir)
            return outcome(False, "timeout")
        except Exception as e:
            return outcome(False, str(e))

        if result.returncode == 0:
            return outcome(True, "done")
        return outcome(False, result.stderr.strip() or "clone failed")

    @staticmethod
    def _remove_partial_clone(target_dir: str) -> None:
        """Best-effort removal of a directory created by an aborted clone.

        Only called for a path that did not exist before the clone started.
        """
        import shutil

        shutil.rmtree(target_dir, ignore_errors=True)

    def clone_repos(
        self,
        repos: List[Repository],
        workers: int = 1,
        reporter: Optional["ProgressReporter"] = None,
    ) -> List[CloneResult]:
        """Clone multiple repositories, optionally in parallel.

        Args:
            repos: Repositories to clone.
            workers: Number of concurrent clones. Values of 1 or less run
                sequentially (a thread pool rejects a non-positive size).
            reporter: Optional progress sink notified for each repository.

        Returns:
            One CloneResult per repository: in input order when sequential,
            in completion order when parallel.
        """
        # Ensure output directory exists
        if not self.dry_run:
            os.makedirs(self.output_dir, exist_ok=True)

        results = []
        total = len(repos)

        if workers <= 1:
            # Sequential cloning
            for position, repo in enumerate(repos, start=1):
                if reporter:
                    reporter.start_repo(position, total, repo)
                result = self.clone_repo(repo)
                results.append(result)
                if reporter:
                    reporter.finish_repo(result)
            return results

        # Parallel cloning
        with ThreadPoolExecutor(max_workers=workers) as executor:
            future_to_repo = {executor.submit(self.clone_repo, repo): repo for repo in repos}
            for completed, future in enumerate(as_completed(future_to_repo), start=1):
                repo = future_to_repo[future]
                result = future.result()
                results.append(result)
                if reporter:
                    # Report both lines together so output from different
                    # repositories never interleaves.
                    reporter.start_repo(completed, total, repo)
                    reporter.finish_repo(result)
        return results
class ProgressReporter:
    """Writes progress, summary and diagnostic lines for a clone run.

    With ``quiet`` set, everything except ``error`` is suppressed; ``verbose``
    adds per-repository durations.
    """

    def __init__(self, quiet: bool = False, verbose: bool = False):
        self.quiet = quiet
        self.verbose = verbose

    def header(self, target: str, is_org: bool):
        """Print the banner naming the tool version and the clone target."""
        if not self.quiet:
            target_type = "user"
            if is_org:
                target_type = "organization"
            banner = (
                f"{colors.bold}github-clone-all v{VERSION}{colors.reset}"
                f" - Cloning from {target_type} '{colors.cyan}{target}{colors.reset}'"
            )
            print(banner)
            print()

    def fetching(self):
        """Announce the repository listing, leaving the line open for ``fetched``."""
        if not self.quiet:
            print("Fetching repository list... ", end="", flush=True)

    def fetched(self, count: int):
        """Finish the line started by ``fetching`` with the repository count."""
        if not self.quiet:
            print(f"done ({colors.bold}{count}{colors.reset} repositories)")
            print()

    def start_repo(self, index: int, total: int, repo: Repository):
        """Print the ``[i/n] name...`` prefix, leaving the line open for the result."""
        if self.quiet:
            return
        # Right-align the counter so every prefix has the same width.
        counter = str(index).rjust(len(str(total)))
        prefix = f"[{counter}/{total}]"
        if repo.private:
            private_marker = f" {colors.dim}(private){colors.reset}"
        else:
            private_marker = ""
        print(f"{prefix} {repo.name}{private_marker}... ", end="", flush=True)

    def finish_repo(self, result: CloneResult):
        """Complete the line started by ``start_repo`` with the clone outcome."""
        if self.quiet:
            return
        duration_str = ""
        if self.verbose:
            duration_str = f"({result.duration:.1f}s)"
        detail = f"{colors.dim}{result.message}{colors.reset}"
        if result.skipped:
            line = f"{colors.yellow}skipped{colors.reset} {detail} {duration_str}"
        elif result.success:
            line = f"{colors.green}done{colors.reset} {duration_str}"
        else:
            line = f"{colors.red}failed{colors.reset} {detail}"
        print(line)

    def summary(self, results: List[CloneResult]):
        """Print totals of cloned, skipped and failed repositories."""
        if self.quiet:
            return
        cloned = skipped = failed = 0
        for outcome in results:
            if outcome.skipped:
                skipped += 1
            elif outcome.success:
                cloned += 1
            else:
                failed += 1
        print()
        print(f"{colors.bold}Summary:{colors.reset}")
        print(f" Total: {len(results)}")
        print(f" Cloned: {colors.green}{cloned}{colors.reset}")
        # Skipped / failed rows appear only when they are non-zero.
        if skipped > 0:
            print(f" Skipped: {colors.yellow}{skipped}{colors.reset}")
        if failed > 0:
            print(f" Failed: {colors.red}{failed}{colors.reset}")

    def error(self, message: str):
        """Write an error line to stderr; shown even in quiet mode."""
        print(f"{colors.red}Error:{colors.reset} {message}", file=sys.stderr)

    def warning(self, message: str):
        """Write a warning line to stderr unless quiet mode is on."""
        if self.quiet:
            return
        print(f"{colors.yellow}Warning:{colors.reset} {message}", file=sys.stderr)
def parse_args() -> argparse.Namespace:
    """Parse command-line arguments.

    Returns:
        The parsed namespace. ``parallel`` is guaranteed to be at least 1,
        because a thread pool cannot be created with fewer workers.

    Raises:
        SystemExit: Raised by argparse for ``--help``, ``--version`` or
            invalid arguments.
    """

    def positive_int(value: str) -> int:
        # Reject 0 and negative worker counts at the command line instead of
        # letting them surface later as a traceback.
        try:
            number = int(value)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
        if number < 1:
            raise argparse.ArgumentTypeError("must be at least 1")
        return number

    parser = argparse.ArgumentParser(
        prog="github-clone-all",
        description="Clone all repositories from a GitHub user or organization.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s octocat Clone all repos from user 'octocat'
%(prog)s acme --org Clone all repos from org 'acme'
%(prog)s octocat --dry-run Show what would be cloned
%(prog)s octocat --include '^api-' Clone only repos starting with 'api-'
%(prog)s octocat -j4 --ssh Clone in parallel and use SSH
""",
    )
    parser.add_argument("target", help="GitHub username or organization name")
    parser.add_argument(
        "--org", "-o", action="store_true", help="Treat target as organization (default: user)"
    )
    parser.add_argument("--token", "-t", help="GitHub token (fallback: $GITHUB_TOKEN, $GH_TOKEN)")
    parser.add_argument("--output", "-d", help="Output directory (default: current directory)")
    parser.add_argument("--ssh", action="store_true", help="Use SSH URLs instead of HTTPS")
    parser.add_argument(
        "--dry-run", "-n", action="store_true", help="Show what would be cloned without cloning"
    )
    parser.add_argument(
        "--skip-existing", action="store_true", help="Skip repos that already exist locally"
    )
    parser.add_argument("--include", metavar="PATTERN", help="Only clone repos matching regex")
    parser.add_argument("--exclude", metavar="PATTERN", help="Skip repos matching regex")
    parser.add_argument(
        "--private-only", action="store_true", help="Clone only private repositories"
    )
    parser.add_argument("--public-only", action="store_true", help="Clone only public repositories")
    parser.add_argument(
        "--parallel",
        "-j",
        type=positive_int,
        default=1,
        metavar="N",
        help="Number of parallel clone operations (default: 1)",
    )
    parser.add_argument("--quiet", "-q", action="store_true", help="Suppress non-error output")
    parser.add_argument("--verbose", "-v", action="store_true", help="Show detailed output")
    parser.add_argument("--version", action="version", version=f"%(prog)s {VERSION}")
    return parser.parse_args()
def filter_repos(repos: "List[Repository]", args: argparse.Namespace) -> "List[Repository]":
    """Apply include/exclude/visibility filters to repository list.

    Args:
        repos: Repositories to filter; the list itself is not modified.
        args: Namespace providing ``private_only``, ``public_only``,
            ``include`` and ``exclude``. Both patterns are regular
            expressions searched anywhere in the repository name.

    Returns:
        The repositories that pass every active filter, in their original
        order.

    Raises:
        ValueError: When ``include`` or ``exclude`` is not a valid regular
            expression; the original ``re.error`` is attached as the cause.
    """
    filtered = repos

    # Visibility filters (--private-only wins if both happen to be set)
    if args.private_only:
        filtered = [r for r in filtered if r.private]
    elif args.public_only:
        filtered = [r for r in filtered if not r.private]

    # Include pattern
    if args.include:
        try:
            include = re.compile(args.include)
        except re.error as e:
            raise ValueError(f"Invalid include pattern: {e}") from e
        filtered = [r for r in filtered if include.search(r.name)]

    # Exclude pattern
    if args.exclude:
        try:
            exclude = re.compile(args.exclude)
        except re.error as e:
            raise ValueError(f"Invalid exclude pattern: {e}") from e
        filtered = [r for r in filtered if not exclude.search(r.name)]

    return filtered
def check_git_installed() -> bool:
    """Check if git is installed and accessible.

    Returns:
        True when ``git --version`` runs and exits with status 0. False when
        the executable is missing, cannot be executed (for example because
        of permissions), or exits with a non-zero status.
    """
    try:
        subprocess.run(
            ["git", "--version"],
            capture_output=True,
            check=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # OSError covers FileNotFoundError as well as PermissionError and
        # other launch failures that would otherwise surface as a traceback.
        return False
    return True
def get_token(args: argparse.Namespace) -> Optional[str]:
    """Resolve the GitHub token to use.

    Precedence: the ``--token`` argument, then ``$GITHUB_TOKEN``, then
    ``$GH_TOKEN``. Empty values are skipped in favor of the next source.
    """
    from_github_env = os.environ.get("GITHUB_TOKEN")
    return args.token or from_github_env or os.environ.get("GH_TOKEN")
def main() -> int:
    """Main entry point.

    Returns:
        The process exit code:

        * 0 - nothing failed (also when no repository was found or matched).
        * 1 - some repositories were cloned and some failed.
        * 2 - clones failed and none succeeded, or listing repositories
          failed for a reason other than the two below.
        * 3 - conflicting options, invalid filter pattern, or git missing.
        * 4 - GitHub rejected the credentials (HTTP 401).
        * 5 - the target user or organization was not found (HTTP 404).
    """
    args = parse_args()
    reporter = ProgressReporter(quiet=args.quiet, verbose=args.verbose)

    # Validate conflicting options
    if args.private_only and args.public_only:
        reporter.error("Cannot use both --private-only and --public-only")
        return 3
    if args.quiet and args.verbose:
        reporter.error("Cannot use both --quiet and --verbose")
        return 3

    # Check git is installed
    if not check_git_installed():
        reporter.error("git is not installed or not in PATH")
        return 3

    # Get token
    token = get_token(args)
    if args.private_only and not token:
        reporter.warning("--private-only requires authentication; private repos may not be visible")

    # Set output directory
    output_dir = args.output or "."

    # Print header
    reporter.header(args.target, args.org)

    # Fetch repositories
    reporter.fetching()
    api = GitHubAPI(token=token)
    try:
        if args.org:
            repos = api.get_org_repos(args.target)
        else:
            repos = api.get_user_repos(args.target)
    except GitHubAPIError as e:
        # Terminate the open "Fetching..." line; in quiet mode that line was
        # never printed, so stdout must stay untouched.
        if not args.quiet:
            print()
        reporter.error(str(e))
        return {401: 4, 404: 5}.get(e.status_code, 2)

    reporter.fetched(len(repos))
    if not repos:
        reporter.warning("No repositories found")
        return 0

    # Filter repositories
    try:
        filtered_repos = filter_repos(repos, args)
    except ValueError as e:
        reporter.error(str(e))
        return 3

    if not filtered_repos:
        reporter.warning("No repositories match the specified filters")
        return 0
    if len(filtered_repos) != len(repos) and not args.quiet:
        print(f"Filtered to {colors.bold}{len(filtered_repos)}{colors.reset} repositories")
        print()

    # Clone repositories
    cloner = RepoCloner(
        output_dir=output_dir,
        use_ssh=args.ssh,
        dry_run=args.dry_run,
        skip_existing=args.skip_existing,
        quiet=args.quiet,
        verbose=args.verbose,
        parallel=args.parallel > 1,
    )
    results = cloner.clone_repos(filtered_repos, workers=args.parallel, reporter=reporter)

    # Print summary
    reporter.summary(results)

    # Determine exit code
    failed = sum(1 for r in results if not r.success and not r.skipped)
    cloned = sum(1 for r in results if r.success and not r.skipped)
    if failed == 0:
        return 0
    if cloned > 0:
        return 1  # Partial success
    return 2
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
Wow thanks. Worked flawlessly. I had tried so many other scripts and tools but none of them worked.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Had to comment out lines 175-176 in order to get this to work, but it worked! Thanks so much for saving me a lot of time.