diff --git a/script/github-pr-status b/script/github-pr-status
new file mode 100755
index 0000000000..b3b0463165
--- /dev/null
+++ b/script/github-pr-status
@@ -0,0 +1,214 @@
+#!/usr/bin/env python3
+"""
+GitHub PR Analyzer for the zed-industries/zed repository
+Downloads all open PRs and groups them by first assignee with status, open date, and last updated date.
+"""
+
+import urllib.request
+import urllib.parse
+import urllib.error
+import json
+from datetime import datetime
+from collections import defaultdict
+import sys
+import os
+
+# GitHub API configuration
+GITHUB_API_BASE = "https://api.github.com"
+REPO_OWNER = "zed-industries"
+REPO_NAME = "zed"
+GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
+
+def make_github_request(url, params=None):
+    """Make a request to the GitHub API with proper headers and optional query parameters."""
+    if params:
+        url_parts = list(urllib.parse.urlparse(url))
+        query = dict(urllib.parse.parse_qsl(url_parts[4]))
+        query.update(params)
+        url_parts[4] = urllib.parse.urlencode(query)
+        url = urllib.parse.urlunparse(url_parts)
+
+    req = urllib.request.Request(url)
+    req.add_header("Accept", "application/vnd.github.v3+json")
+    req.add_header("User-Agent", "GitHub-PR-Analyzer")
+
+    if GITHUB_TOKEN:
+        req.add_header("Authorization", f"token {GITHUB_TOKEN}")
+
+    try:
+        response = urllib.request.urlopen(req)
+        return response
+    except urllib.error.HTTPError as e:
+        # HTTPError is a subclass of URLError, so it must be caught first.
+        print(f"HTTP error {e.code} for {url}: {e.reason}")
+        return None
+    except urllib.error.URLError as e:
+        print(f"Error making request to {url}: {e}")
+        return None
+
+def fetch_all_prs():
+    """Fetch all open PRs from the repository using pagination."""
+    prs = []
+    page = 1
+    per_page = 100
+
+    print("Fetching PRs from GitHub API...")
+
+    while True:
+        url = f"{GITHUB_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/pulls"
+        params = {
+            "state": "open",
+            "sort": "updated",
+            "direction": "desc",
+            "per_page": per_page,
+            "page": page
+        }
+
+        response = make_github_request(url, params)
+        if not response:
+            break
+
+        try:
+            data = response.read().decode('utf-8')
+            page_prs = json.loads(data)
+        except (json.JSONDecodeError, UnicodeDecodeError) as e:
+            print(f"Error parsing response: {e}")
+            break
+
+        if not page_prs:
+            break
+
+        prs.extend(page_prs)
+        print(f"Fetched page {page}: {len(page_prs)} PRs (Total: {len(prs)})")
+
+        # Check if we have more pages
+        link_header = response.getheader('Link', '')
+        if 'rel="next"' not in link_header:
+            break
+
+        page += 1
+
+    print(f"Total PRs fetched: {len(prs)}")
+    return prs
+
+def format_date_as_days_ago(date_string):
+    """Format ISO date string as 'X days ago'."""
+    if not date_string:
+        return "N/A days ago"
+
+    try:
+        dt = datetime.fromisoformat(date_string.replace('Z', '+00:00'))
+        now = datetime.now(dt.tzinfo)
+        days_diff = (now - dt).days
+
+        if days_diff == 0:
+            return "today"
+        elif days_diff == 1:
+            return "1 day ago"
+        else:
+            return f"{days_diff} days ago"
+    except (ValueError, AttributeError):
+        return "N/A days ago"
+
+def get_first_assignee(pr):
+    """Get the first assignee from a PR, or return 'Unassigned' if none."""
+    assignees = pr.get('assignees', [])
+    if assignees:
+        return assignees[0].get('login', 'Unknown')
+    return 'Unassigned'
+
+def get_pr_status(pr):
+    """Determine if PR is draft or ready for review."""
+    if pr.get('draft', False):
+        return "Draft"
+    return "Ready"
+
+def analyze_prs(prs):
+    """Group PRs by first assignee and organize the data."""
+    grouped_prs = defaultdict(list)
+
+    for pr in prs:
+        assignee = get_first_assignee(pr)
+
+        pr_info = {
+            'number': pr['number'],
+            'title': pr['title'],
+            'status': get_pr_status(pr),
+            'state': pr['state'],
+            'created_at': format_date_as_days_ago(pr['created_at']),
+            'updated_at': format_date_as_days_ago(pr['updated_at']),
+            'updated_at_raw': pr['updated_at'],
+            'url': pr['html_url'],
+            'author': pr['user']['login']
+        }
+
+        grouped_prs[assignee].append(pr_info)
+
+    # Sort PRs within each group by update date (newest first)
+    for assignee in grouped_prs:
+        grouped_prs[assignee].sort(key=lambda x: x['updated_at_raw'], reverse=True)
+
+    return dict(grouped_prs)
+
+def print_pr_report(grouped_prs):
+    """Print a formatted report of PRs grouped by assignee."""
+    print(f"OPEN PR REPORT FOR {REPO_OWNER}/{REPO_NAME}")
+    print()
+
+    # Sort assignees alphabetically, but put 'Unassigned' last
+    assignees = sorted(grouped_prs.keys())
+    if 'Unassigned' in assignees:
+        assignees.remove('Unassigned')
+        assignees.append('Unassigned')
+
+    total_prs = sum(len(prs) for prs in grouped_prs.values())
+    print(f"Total Open PRs: {total_prs}")
+    print()
+
+    for assignee in assignees:
+        prs = grouped_prs[assignee]
+        assignee_display = f"@{assignee}" if assignee != 'Unassigned' else assignee
+        print(f"assigned to {assignee_display} ({len(prs)} PRs):")
+
+        for pr in prs:
+            print(f"- {pr['author']}: [{pr['title']}]({pr['url']}) opened:{pr['created_at']} updated:{pr['updated_at']}")
+
+        print()
+
+def save_json_report(grouped_prs, filename="pr_report.json"):
+    """Save the PR data to a JSON file."""
+    try:
+        with open(filename, 'w') as f:
+            json.dump(grouped_prs, f, indent=2)
+        print(f"📄 Report saved to {filename}")
+    except OSError as e:
+        print(f"Error saving JSON report: {e}")
+
+def main():
+    """Main function to orchestrate the PR analysis."""
+    print("GitHub PR Analyzer")
+    print("==================")
+
+    if not GITHUB_TOKEN:
+        print("⚠️ Warning: GITHUB_TOKEN not set. You may hit rate limits.")
+        print("   Set the GITHUB_TOKEN environment variable for authenticated requests.")
+        print()
+
+    # Fetch all open PRs
+    prs = fetch_all_prs()
+
+    if not prs:
+        print("❌ Failed to fetch PRs. Please check your connection and try again.")
+        sys.exit(1)
+
+    # Analyze and group PRs
+    grouped_prs = analyze_prs(prs)
+
+    # Print report
+    print_pr_report(grouped_prs)
+
+    # Persist a machine-readable copy alongside the console report
+    save_json_report(grouped_prs)
+
+if __name__ == "__main__":
+    main()
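
For a quick offline check of the grouping logic, something like the sketch below works: it loads the extension-less script as a module (the `if __name__ == "__main__"` guard keeps `main()` from running) and feeds `analyze_prs` a synthetic PR payload. The module name and fixture values are made up for illustration, not real API data:

```python
# Hypothetical smoke test for analyze_prs; the fixture below is synthetic.
import importlib.util
from importlib.machinery import SourceFileLoader

# Load the extension-less script as a module; main() does not run because
# __name__ is set to the module name, not "__main__".
loader = SourceFileLoader("github_pr_status", "script/github-pr-status")
spec = importlib.util.spec_from_loader(loader.name, loader)
mod = importlib.util.module_from_spec(spec)
loader.exec_module(mod)

sample_prs = [{
    "number": 1,
    "title": "Example draft PR",
    "state": "open",
    "draft": True,
    "assignees": [{"login": "alice"}],
    "created_at": "2024-01-01T00:00:00Z",
    "updated_at": "2024-01-02T00:00:00Z",
    "html_url": "https://github.com/zed-industries/zed/pull/1",
    "user": {"login": "bob"},
}]

grouped = mod.analyze_prs(sample_prs)
assert grouped["alice"][0]["status"] == "Draft"
print(grouped["alice"][0]["updated_at"])  # e.g. "123 days ago"
```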