Skip to content

Instantly share code, notes, and snippets.

@YuriyGuts
Last active February 8, 2026 02:28
Show Gist options
  • Select an option

  • Save YuriyGuts/d4f8969733a57a5eb226 to your computer and use it in GitHub Desktop.

Select an option

Save YuriyGuts/d4f8969733a57a5eb226 to your computer and use it in GitHub Desktop.
Clone all public and private repositories from a specific GitHub user or organization
#!/usr/bin/env python3
"""
Clone all public and private repositories from a specific GitHub user or organization.
Uses only the Python standard library (no pip dependencies required).
Usage: github-clone-all [-h] [--org] [--token TOKEN] [--output OUTPUT] [--ssh] [--dry-run] [--skip-existing] [--include PATTERN] [--exclude PATTERN] [--private-only] [--public-only]
[--parallel N] [--quiet] [--verbose] [--version]
target
positional arguments:
target GitHub username or organization name
options:
-h, --help show this help message and exit
--org, -o Treat target as organization (default: user)
--token, -t TOKEN GitHub token (fallback: $GITHUB_TOKEN, $GH_TOKEN)
--output, -d OUTPUT Output directory (default: current directory)
--ssh Use SSH URLs instead of HTTPS
--dry-run, -n Show what would be cloned without cloning
--skip-existing Skip repos that already exist locally
--include PATTERN Only clone repos matching regex
--exclude PATTERN Skip repos matching regex
--private-only Clone only private repositories
--public-only Clone only public repositories
--parallel, -j N Number of parallel clone operations (default: 1)
--quiet, -q Suppress non-error output
--verbose, -v Show detailed output
--version show program's version number and exit
Examples:
github-clone-all octocat Clone all repos from user 'octocat'
github-clone-all acme --org Clone all repos from org 'acme'
github-clone-all octocat --dry-run Show what would be cloned
github-clone-all octocat --include '^api-' Clone only repos starting with 'api-'
github-clone-all octocat -j4 --ssh Clone in parallel and use SSH
----- Notes -----
For GitHub authentication, set up a personal access token (choose one approach):
Option A: Classic Personal Access Token (might work better with organizations):
1. Go to GitHub → Settings → Developer settings → Personal access tokens → Tokens (classic).
2. Generate new token with the repo scope (full control of private repositories).
Option B: Fine-grained Personal Access Token:
1. Go to GitHub → Settings → Developer settings → Personal access tokens → Fine-grained tokens.
2. Set "Repository access" to "All repositories" (or select specific ones).
3. Under "Repository permissions", add "Contents" and set it to "Read-only".
"""
import argparse
import json
import os
import re
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from dataclasses import dataclass
from typing import Generator
from typing import List
from typing import Optional
from urllib.error import HTTPError
from urllib.error import URLError
from urllib.request import Request
from urllib.request import urlopen
# Tool version; shown by --version, in the banner, and in the HTTP User-Agent header.
VERSION = "1.0.0"
# Root URL that every GitHub REST API endpoint path is appended to.
API_BASE = "https://api.github.com"
# Timeout, in seconds, passed to subprocess.run for each `git clone` invocation.
CLONE_TIMEOUT = 300 # 5 minutes per repo
class Colors:
    """Supplies ANSI escape sequences, or empty strings when color is off.

    Color is off when the ``NO_COLOR`` environment variable holds a non-empty
    value or when stdout is not a terminal. On Windows, color additionally
    requires that ANSI processing can be enabled on the console.
    """

    def __init__(self):
        enabled = self._should_use_colors()
        if enabled and sys.platform == "win32":
            # The Windows console must be switched into ANSI mode first.
            enabled = _init_windows_ansi()
        self._enabled = enabled

    def _should_use_colors(self) -> bool:
        """Return True when NO_COLOR is unset/empty and stdout is a TTY."""
        return not os.environ.get("NO_COLOR") and sys.stdout.isatty()

    def _code(self, code: str) -> str:
        """Pass ``code`` through when color is on; otherwise return ""."""
        if self._enabled:
            return code
        return ""

    @property
    def reset(self) -> str:
        return self._code("\033[0m")

    @property
    def bold(self) -> str:
        return self._code("\033[1m")

    @property
    def red(self) -> str:
        return self._code("\033[91m")

    @property
    def green(self) -> str:
        return self._code("\033[92m")

    @property
    def yellow(self) -> str:
        return self._code("\033[93m")

    @property
    def blue(self) -> str:
        return self._code("\033[94m")

    @property
    def cyan(self) -> str:
        return self._code("\033[96m")

    @property
    def dim(self) -> str:
        return self._code("\033[2m")
def _init_windows_ansi() -> bool:
    """Try to enable ANSI escape-code processing on the Windows console.

    Returns:
        True only when the current console mode was read and the virtual
        terminal flag was then applied successfully. False on any failure:
        ctypes/kernel32 unavailable (e.g. not Windows), stdout not attached
        to a console, or a console that rejects the flag.
    """
    try:
        import ctypes

        kernel32 = ctypes.windll.kernel32
        handle = kernel32.GetStdHandle(-11)  # STD_OUTPUT_HANDLE

        # GetConsoleMode and SetConsoleMode both return zero on failure, so
        # their results must be checked; otherwise escape codes would be
        # printed raw on consoles that cannot interpret them.
        mode = ctypes.c_ulong()
        if not kernel32.GetConsoleMode(handle, ctypes.byref(mode)):
            return False

        ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
        updated = kernel32.SetConsoleMode(
            handle, mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING
        )
        return bool(updated)
    except Exception:
        # Deliberately best-effort: any problem simply means "no colors".
        return False
# Module-wide Colors instance used by the output code below. It is created at
# import time, so the NO_COLOR / TTY checks run once, when the module loads.
colors = Colors()
@dataclass
class Repository:
    """The subset of a GitHub repository's metadata that this tool uses."""

    # Short repository name; also used as the local clone directory name.
    name: str
    # URL cloned by default.
    clone_url: str
    # URL cloned instead when SSH mode is requested.
    ssh_url: str
    # True for private repositories.
    private: bool
    # Optional flags copied from the API response when present.
    archived: bool = False
    fork: bool = False
@dataclass
class CloneResult:
    """Outcome of attempting to clone one repository."""

    # The repository the attempt was made for.
    repo: Repository
    # False only when the attempt failed; skipped attempts count as success.
    success: bool
    # Short status text or the error message shown to the user.
    message: str
    # Elapsed time of the attempt, in seconds.
    duration: float
    # True when nothing was cloned (already present, or dry run).
    skipped: bool = False
class GitHubAPIError(Exception):
    """Raised when a GitHub API request fails.

    The ``status_code`` attribute carries the HTTP status when one is known
    and is ``None`` otherwise.
    """

    def __init__(self, message: str, status_code: Optional[int] = None):
        self.status_code = status_code
        super().__init__(message)
class GitHubAPI:
    """Minimal GitHub REST API client built on urllib (standard library only).

    Args:
        token: Optional personal access token. When set, it is sent as a
            Bearer token with every request.
    """

    def __init__(self, token: Optional[str] = None):
        self.token = token

    @staticmethod
    def _header_value(headers: dict, name: str) -> Optional[str]:
        """Look up an HTTP header by name, ignoring case.

        HTTP header names are case-insensitive, so a server or proxy may send
        ``link`` instead of ``Link``. A plain dict lookup would miss it and
        pagination would silently stop after the first page.
        """
        wanted = name.lower()
        for key, value in headers.items():
            if key.lower() == wanted:
                return value
        return None

    @staticmethod
    def _quote(segment: str) -> str:
        """Percent-encode a user-supplied value for use as one URL path segment."""
        from urllib.parse import quote

        return quote(segment, safe="")

    @staticmethod
    def _to_repository(item: dict) -> "Repository":
        """Build a Repository from one element of a repository-list response."""
        return Repository(
            name=item["name"],
            clone_url=item["clone_url"],
            ssh_url=item["ssh_url"],
            private=item["private"],
            archived=item.get("archived", False),
            fork=item.get("fork", False),
        )

    def _request(self, url: str, max_retries: int = 3) -> tuple:
        """Fetch a URL and decode its JSON body, retrying transient failures.

        Args:
            url: Absolute URL to request.
            max_retries: Total number of attempts before giving up.

        Returns:
            A ``(data, headers)`` tuple: the decoded JSON body and the
            response headers as a plain dict.

        Raises:
            GitHubAPIError: Immediately on HTTP 401, 403 or 404 and on a
                response body that is not valid UTF-8 JSON; otherwise once
                every attempt has failed.
        """
        headers = {
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": f"github-clone-all/{VERSION}",
        }
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"

        last_error = None
        for attempt in range(max_retries):
            try:
                req = Request(url, headers=headers)
                with urlopen(req, timeout=30) as response:
                    raw = response.read()
                    response_headers = dict(response.headers)
            except HTTPError as e:
                if e.code == 401:
                    raise GitHubAPIError("Authentication failed. Check your token.", 401) from e
                if e.code == 403:
                    # GitHub reports an exhausted rate limit as 403 too; the
                    # response body is what tells the two cases apart.
                    body = e.read().decode("utf-8", errors="ignore").lower()
                    if "rate limit" in body:
                        raise GitHubAPIError(
                            "Rate limit exceeded. Use a token or wait.", 403
                        ) from e
                    raise GitHubAPIError("Access forbidden. Check permissions.", 403) from e
                if e.code == 404:
                    raise GitHubAPIError("Target not found.", 404) from e
                last_error = e
            except OSError as e:
                # URLError is an OSError subclass. Catching OSError also
                # covers socket timeouts and connection resets raised while
                # the body is being read, which are not wrapped in URLError.
                last_error = e
            else:
                try:
                    return json.loads(raw.decode("utf-8")), response_headers
                except ValueError as e:
                    # Covers both undecodable bytes and malformed JSON.
                    raise GitHubAPIError(f"Invalid response from GitHub API: {e}") from e

            if attempt < max_retries - 1:
                time.sleep(1 * (attempt + 1))  # Linear backoff: 1s, 2s, ...

        raise GitHubAPIError(f"Request failed: {last_error}")

    def _parse_link_header(self, header: Optional[str]) -> dict:
        """Parse a ``Link`` header into a ``{rel: url}`` dict.

        Returns an empty dict when the header is missing or empty; parts that
        do not look like ``<url>; rel="name"`` are ignored.
        """
        if not header:
            return {}
        links = {}
        for part in header.split(","):
            match = re.match(r'<([^>]+)>;\s*rel="([^"]+)"', part.strip())
            if match:
                links[match.group(2)] = match.group(1)
        return links

    def _paginate(self, url: str) -> Generator[dict, None, None]:
        """Yield every item of a list endpoint, following ``rel="next"`` links."""
        while url:
            data, headers = self._request(url)
            yield from data
            links = self._parse_link_header(self._header_value(headers, "Link"))
            url = links.get("next")

    def _get_authenticated_user(self) -> Optional[str]:
        """Return the login of the token's owner, or None when no token is set."""
        if not self.token:
            return None
        data, _ = self._request(f"{API_BASE}/user")
        return data.get("login")

    def get_user_repos(self, username: str) -> List["Repository"]:
        """Get all repositories for a user.

        If authenticated as the target user, uses /user/repos to include
        private repos. Otherwise uses /users/{username}/repos, which only
        returns public repos.

        Raises:
            GitHubAPIError: If any underlying API request fails.
        """
        authenticated_user = self._get_authenticated_user()
        if authenticated_user and authenticated_user.lower() == username.lower():
            url = f"{API_BASE}/user/repos?affiliation=owner&per_page=100"
        else:
            url = f"{API_BASE}/users/{self._quote(username)}/repos?per_page=100"
        return [self._to_repository(item) for item in self._paginate(url)]

    def get_org_repos(self, org: str) -> List["Repository"]:
        """Get all repositories for an organization.

        Raises:
            GitHubAPIError: If any underlying API request fails.
        """
        url = f"{API_BASE}/orgs/{self._quote(org)}/repos?per_page=100"
        return [self._to_repository(item) for item in self._paginate(url)]
class RepoCloner:
    """Clones repositories into a local directory by running ``git clone``.

    Args:
        output_dir: Directory under which each repository gets its own
            subdirectory named after the repository.
        use_ssh: Clone from ``repo.ssh_url`` instead of ``repo.clone_url``.
        dry_run: Report what would be cloned without running git.
        skip_existing: Report an already-existing target directory as
            skipped instead of as a failure.
        quiet: Pass ``--quiet`` to git.
        verbose: Stored for callers; not consulted by this class.
        parallel: When True, git runs with interactive credential prompts
            disabled, since concurrent prompts cannot be answered sensibly.
    """

    def __init__(
        self,
        output_dir: str,
        use_ssh: bool = False,
        dry_run: bool = False,
        skip_existing: bool = False,
        quiet: bool = False,
        verbose: bool = False,
        parallel: bool = False,
    ):
        self.output_dir = output_dir
        self.use_ssh = use_ssh
        self.dry_run = dry_run
        self.skip_existing = skip_existing
        self.quiet = quiet
        self.verbose = verbose
        self.parallel = parallel

    def _build_env(self) -> Optional[dict]:
        """Return the environment for git, or None to inherit the current one.

        In parallel mode, interactive prompts are disabled. An existing
        GIT_SSH_COMMAND is extended rather than replaced so that user
        settings such as a custom identity file keep working.
        """
        if not self.parallel:
            return None
        env = os.environ.copy()
        env["GIT_TERMINAL_PROMPT"] = "0"  # Disable HTTPS credential prompts
        ssh_command = env.get("GIT_SSH_COMMAND") or "ssh"
        env["GIT_SSH_COMMAND"] = f"{ssh_command} -o BatchMode=yes"  # Disable SSH prompts
        return env

    def clone_repo(self, repo: "Repository") -> "CloneResult":
        """Clone a single repository and describe what happened.

        Never raises: every failure, including a timeout or a missing git
        executable, is reported through the returned CloneResult.
        """
        import shutil

        start_time = time.time()
        target_dir = os.path.join(self.output_dir, repo.name)
        url = repo.ssh_url if self.use_ssh else repo.clone_url

        def outcome(success: bool, message: str, skipped: bool = False) -> "CloneResult":
            return CloneResult(
                repo=repo,
                success=success,
                message=message,
                duration=time.time() - start_time,
                skipped=skipped,
            )

        # An existing directory is never overwritten.
        if os.path.exists(target_dir):
            if self.skip_existing:
                return outcome(True, "already exists", skipped=True)
            return outcome(False, "directory already exists")

        if self.dry_run:
            return outcome(True, "would clone", skipped=True)

        try:
            cmd = ["git", "clone"]
            if self.quiet:
                cmd.append("--quiet")
            cmd.extend([url, target_dir])
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=CLONE_TIMEOUT,
                env=self._build_env(),
            )
        except subprocess.TimeoutExpired:
            # The killed git process leaves a partial checkout behind. Remove
            # it so a later run neither fails with "directory already exists"
            # nor, with skip_existing, mistakes it for a complete clone. This
            # is safe because the directory did not exist when we started.
            shutil.rmtree(target_dir, ignore_errors=True)
            return outcome(False, "timeout")
        except Exception as e:
            return outcome(False, str(e))

        if result.returncode == 0:
            return outcome(True, "done")
        return outcome(False, result.stderr.strip() or "clone failed")

    def clone_repos(
        self, repos: List["Repository"], workers: int = 1, reporter: "ProgressReporter" = None
    ) -> List["CloneResult"]:
        """Clone multiple repositories, optionally in parallel.

        Args:
            repos: Repositories to clone.
            workers: Number of concurrent clones. Values of 1 or less clone
                sequentially, so zero or a negative number cannot crash the
                thread pool.
            reporter: Optional progress reporter notified for each repo.

        Returns:
            One CloneResult per repository: in input order when sequential,
            in completion order when parallel.
        """
        # Ensure output directory exists
        if not self.dry_run:
            os.makedirs(self.output_dir, exist_ok=True)

        results = []
        total = len(repos)

        if workers <= 1:
            for position, repo in enumerate(repos, start=1):
                if reporter:
                    reporter.start_repo(position, total, repo)
                result = self.clone_repo(repo)
                results.append(result)
                if reporter:
                    reporter.finish_repo(result)
            return results

        with ThreadPoolExecutor(max_workers=workers) as executor:
            future_to_repo = {executor.submit(self.clone_repo, repo): repo for repo in repos}
            # Start and finish are reported together once a clone completes,
            # so lines from concurrent clones never interleave.
            for completed, future in enumerate(as_completed(future_to_repo), start=1):
                result = future.result()
                results.append(result)
                if reporter:
                    reporter.start_repo(completed, total, future_to_repo[future])
                    reporter.finish_repo(result)
        return results
class ProgressReporter:
    """Writes progress, results and diagnostics to the terminal.

    With ``quiet`` set, everything except error messages is suppressed. With
    ``verbose`` set, per-repository durations are shown for skipped and
    successful clones.
    """

    def __init__(self, quiet: bool = False, verbose: bool = False):
        self.quiet = quiet
        self.verbose = verbose

    def header(self, target: str, is_org: bool):
        """Print the banner naming the tool version and the clone target."""
        if self.quiet:
            return
        if is_org:
            target_type = "organization"
        else:
            target_type = "user"
        banner = (
            f"{colors.bold}github-clone-all v{VERSION}{colors.reset}"
            f" - Cloning from {target_type} '{colors.cyan}{target}{colors.reset}'"
        )
        print(banner)
        print()

    def fetching(self):
        """Announce the repository listing; the line is left open for fetched()."""
        if not self.quiet:
            print("Fetching repository list... ", end="", flush=True)

    def fetched(self, count: int):
        """Complete the line started by fetching() with the repository count."""
        if self.quiet:
            return
        print(f"done ({colors.bold}{count}{colors.reset} repositories)")
        print()

    def start_repo(self, index: int, total: int, repo: Repository):
        """Print the ``[index/total] name...`` prefix without a newline."""
        if self.quiet:
            return
        # Right-align the index so the counters line up across all rows.
        counter = str(index).rjust(len(str(total)))
        private_marker = ""
        if repo.private:
            private_marker = f" {colors.dim}(private){colors.reset}"
        print(f"[{counter}/{total}] {repo.name}{private_marker}... ", end="", flush=True)

    def finish_repo(self, result: CloneResult):
        """Complete the line started by start_repo() with the clone outcome."""
        if self.quiet:
            return
        duration_str = ""
        if self.verbose:
            duration_str = f"({result.duration:.1f}s)"
        if result.skipped:
            line = (
                f"{colors.yellow}skipped{colors.reset}"
                f" {colors.dim}{result.message}{colors.reset} {duration_str}"
            )
        elif result.success:
            line = f"{colors.green}done{colors.reset} {duration_str}"
        else:
            line = f"{colors.red}failed{colors.reset} {colors.dim}{result.message}{colors.reset}"
        print(line)

    def summary(self, results: List[CloneResult]):
        """Print totals of cloned, skipped and failed repositories."""
        if self.quiet:
            return
        cloned = skipped = failed = 0
        for outcome in results:
            if outcome.skipped:
                skipped += 1
            elif outcome.success:
                cloned += 1
            else:
                failed += 1
        print()
        print(f"{colors.bold}Summary:{colors.reset}")
        print(f" Total: {len(results)}")
        print(f" Cloned: {colors.green}{cloned}{colors.reset}")
        # The skipped and failed rows are shown only when they are non-zero.
        if skipped > 0:
            print(f" Skipped: {colors.yellow}{skipped}{colors.reset}")
        if failed > 0:
            print(f" Failed: {colors.red}{failed}{colors.reset}")

    def error(self, message: str):
        """Print an error to stderr; shown even in quiet mode."""
        print(f"{colors.red}Error:{colors.reset} {message}", file=sys.stderr)

    def warning(self, message: str):
        """Print a warning to stderr unless quiet mode is on."""
        if self.quiet:
            return
        print(f"{colors.yellow}Warning:{colors.reset} {message}", file=sys.stderr)
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse command-line arguments.

    Args:
        argv: Argument list to parse. Defaults to None, in which case
            argparse reads ``sys.argv[1:]``.

    Returns:
        The populated namespace. ``parallel`` is guaranteed to be >= 1.

    Raises:
        SystemExit: Raised by argparse on invalid arguments, including a
            ``--parallel`` value below 1, and for ``--help``/``--version``.
    """

    def positive_int(value: str) -> int:
        # Reject 0 and negative worker counts here, with a normal usage
        # error, instead of letting them crash the thread pool later.
        try:
            number = int(value)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid integer value: {value!r}") from None
        if number < 1:
            raise argparse.ArgumentTypeError(f"must be at least 1 (got {number})")
        return number

    parser = argparse.ArgumentParser(
        prog="github-clone-all",
        description="Clone all repositories from a GitHub user or organization.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s octocat Clone all repos from user 'octocat'
%(prog)s acme --org Clone all repos from org 'acme'
%(prog)s octocat --dry-run Show what would be cloned
%(prog)s octocat --include '^api-' Clone only repos starting with 'api-'
%(prog)s octocat -j4 --ssh Clone in parallel and use SSH
""",
    )
    parser.add_argument("target", help="GitHub username or organization name")
    parser.add_argument(
        "--org", "-o", action="store_true", help="Treat target as organization (default: user)"
    )
    parser.add_argument("--token", "-t", help="GitHub token (fallback: $GITHUB_TOKEN, $GH_TOKEN)")
    parser.add_argument("--output", "-d", help="Output directory (default: current directory)")
    parser.add_argument("--ssh", action="store_true", help="Use SSH URLs instead of HTTPS")
    parser.add_argument(
        "--dry-run", "-n", action="store_true", help="Show what would be cloned without cloning"
    )
    parser.add_argument(
        "--skip-existing", action="store_true", help="Skip repos that already exist locally"
    )
    parser.add_argument("--include", metavar="PATTERN", help="Only clone repos matching regex")
    parser.add_argument("--exclude", metavar="PATTERN", help="Skip repos matching regex")
    parser.add_argument(
        "--private-only", action="store_true", help="Clone only private repositories"
    )
    parser.add_argument("--public-only", action="store_true", help="Clone only public repositories")
    parser.add_argument(
        "--parallel",
        "-j",
        type=positive_int,
        default=1,
        metavar="N",
        help="Number of parallel clone operations (default: 1)",
    )
    parser.add_argument("--quiet", "-q", action="store_true", help="Suppress non-error output")
    parser.add_argument("--verbose", "-v", action="store_true", help="Show detailed output")
    parser.add_argument("--version", action="version", version=f"%(prog)s {VERSION}")
    return parser.parse_args(argv)
def filter_repos(repos: List["Repository"], args: argparse.Namespace) -> List["Repository"]:
    """Apply visibility and include/exclude filters to a repository list.

    Args:
        repos: Repositories to filter; the input list is not modified.
        args: Parsed options. Reads ``private_only``, ``public_only``,
            ``include`` and ``exclude``. If both visibility flags are set,
            ``private_only`` wins.

    Returns:
        The repositories that pass every active filter, in original order.
        Patterns are matched with ``re.search``, i.e. anywhere in the name.

    Raises:
        ValueError: If ``include`` or ``exclude`` is not a valid regular
            expression. The underlying ``re.error`` is attached as the cause.
    """

    def compile_pattern(source: str, label: str):
        # Keep the try body to the one call that can fail, and chain the
        # original error so the regex diagnostics are not lost.
        try:
            return re.compile(source)
        except re.error as e:
            raise ValueError(f"Invalid {label} pattern: {e}") from e

    filtered = repos

    # Visibility filters
    if args.private_only:
        filtered = [r for r in filtered if r.private]
    elif args.public_only:
        filtered = [r for r in filtered if not r.private]

    # Include pattern
    if args.include:
        include = compile_pattern(args.include, "include")
        filtered = [r for r in filtered if include.search(r.name)]

    # Exclude pattern
    if args.exclude:
        exclude = compile_pattern(args.exclude, "exclude")
        filtered = [r for r in filtered if not exclude.search(r.name)]

    return filtered
def check_git_installed() -> bool:
    """Report whether ``git --version`` can be run and exits successfully."""
    try:
        probe = subprocess.run(["git", "--version"], capture_output=True)
    except FileNotFoundError:
        # No git executable was found on PATH.
        return False
    return probe.returncode == 0
def get_token(args: argparse.Namespace) -> Optional[str]:
    """Resolve the GitHub token.

    Precedence: the ``--token`` argument, then ``$GITHUB_TOKEN``, then
    ``$GH_TOKEN``. Empty values are skipped in favor of the next source.
    """
    return args.token or os.environ.get("GITHUB_TOKEN") or os.environ.get("GH_TOKEN")
def main() -> int:
    """Run the command-line tool and return the process exit code.

    Exit codes:
        0: Every clone succeeded or was skipped, or there was nothing to do.
        1: Some clones failed and at least one succeeded.
        2: The API request failed (other than 401/404), or clones failed and
           none succeeded.
        3: Conflicting options, git not available, or an invalid filter
           pattern.
        4: Authentication failed (HTTP 401).
        5: Target not found (HTTP 404).
    """
    args = parse_args()
    reporter = ProgressReporter(quiet=args.quiet, verbose=args.verbose)

    # Preconditions are checked in order; the first violation is reported.
    usage_problem = None
    if args.private_only and args.public_only:
        usage_problem = "Cannot use both --private-only and --public-only"
    elif args.quiet and args.verbose:
        usage_problem = "Cannot use both --quiet and --verbose"
    elif not check_git_installed():
        usage_problem = "git is not installed or not in PATH"
    if usage_problem is not None:
        reporter.error(usage_problem)
        return 3

    token = get_token(args)
    if not token and args.private_only:
        reporter.warning("--private-only requires authentication; private repos may not be visible")

    output_dir = args.output if args.output else "."

    reporter.header(args.target, args.org)
    reporter.fetching()

    api = GitHubAPI(token=token)
    fetch_repos = api.get_org_repos if args.org else api.get_user_repos
    try:
        repos = fetch_repos(args.target)
    except GitHubAPIError as e:
        print()  # Newline after "Fetching..."
        reporter.error(str(e))
        # Authentication and not-found failures get dedicated exit codes.
        return {401: 4, 404: 5}.get(e.status_code, 2)

    reporter.fetched(len(repos))
    if not repos:
        reporter.warning("No repositories found")
        return 0

    try:
        filtered_repos = filter_repos(repos, args)
    except ValueError as e:
        reporter.error(str(e))
        return 3

    if not filtered_repos:
        reporter.warning("No repositories match the specified filters")
        return 0

    if not args.quiet and len(filtered_repos) != len(repos):
        print(f"Filtered to {colors.bold}{len(filtered_repos)}{colors.reset} repositories")
        print()

    cloner = RepoCloner(
        output_dir=output_dir,
        use_ssh=args.ssh,
        dry_run=args.dry_run,
        skip_existing=args.skip_existing,
        quiet=args.quiet,
        verbose=args.verbose,
        parallel=args.parallel > 1,
    )
    results = cloner.clone_repos(filtered_repos, workers=args.parallel, reporter=reporter)
    reporter.summary(results)

    # Skipped repositories count as neither cloned nor failed.
    attempted = [r for r in results if not r.skipped]
    failed = sum(1 for r in attempted if not r.success)
    cloned = sum(1 for r in attempted if r.success)
    if not failed:
        return 0
    return 1 if cloned else 2
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
@cor
Copy link

cor commented Feb 8, 2020

Had to comment out lines 175-176 in order to get this to work, but it worked! Thanks so much for saving me a lot of time.

@yasanglass
Copy link

Wow thanks. Worked flawlessly. I had tried so many other scripts and tools but none of them worked.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment