import requests import pandas as pd from datetime import datetime import gradio as gr import pickle from sentence_transformers import SentenceTransformer, util from wordcloud import WordCloud import matplotlib.pyplot as plt import base64 from io import BytesIO import json from openai import OpenAI from graphviz import Source import re from PIL import Image import os import uuid # Fixed directory for images IMAGE_DIR = "/content/images" #to save the diagram png images os.makedirs(IMAGE_DIR, exist_ok=True) # Create the directory if it doesn't exist # Constants for GitHub API GITHUB_API_URL = "https://api.github.com/search/repositories" ACCESS_TOKEN = os.getenv("github_pat") if not ACCESS_TOKEN: raise ValueError("Missing GitHub Personal Access Token.") HEADERS = {"Authorization": f"Bearer {ACCESS_TOKEN}"} # Access OpenAI API key from secrets OPENAI_API_KEY = os.getenv("openai_key") if not OPENAI_API_KEY: raise ValueError("Missing OpenAI API Key. Please set it as a secret in Hugging Face.") # Initialize OpenAI client once client = OpenAI(api_key=OPENAI_API_KEY) # Global variable for allowed extensions ALLOWED_EXTENSIONS = [".py", ".js", ".md", ".toml", ".yaml"] # Load topic embeddings with open("github_topics_embeddings.pkl", "rb") as f: topic_data = pickle.load(f) topics = topic_data["topics"] embeddings = topic_data["embeddings"] discovered_repos = [] # Format: ["owner/repo_name", ...] # Function to search for similar topics def search_similar_topics(input_text): if not input_text.strip(): return "Enter topics to see suggestions." try: model = SentenceTransformer('all-MiniLM-L6-v2') query_embedding = model.encode(input_text, convert_to_tensor=True) similarities = util.pytorch_cos_sim(query_embedding, embeddings) top_indices = similarities[0].argsort(descending=True)[:10] # Top 5 matches return ", ".join([topics[i] for i in top_indices]) except Exception as e: return f"Error in generating suggestions: {str(e)}" # Function to fetch repositories with pagination def search_repositories(query, sort="stars", order="desc", total_repos=10): all_repos = [] per_page = 100 if total_repos > 100 else total_repos total_pages = (total_repos // per_page) + 1 for page in range(1, total_pages + 1): params = { "q": query, "sort": sort, "order": order, "per_page": per_page, "page": page, } response = requests.get(GITHUB_API_URL, headers=HEADERS, params=params) print(f"Query: {query}, Status Code: {response.status_code}") print(f"Response: {response.json()}") if response.status_code != 200: raise Exception(f"GitHub API error: {response.status_code} {response.text}") items = response.json().get("items", []) if not items: break all_repos.extend(items) if len(all_repos) >= total_repos: break return all_repos[:total_repos] # Function to calculate additional metrics def calculate_additional_metrics(repo): created_date = datetime.strptime(repo["created_at"], "%Y-%m-%dT%H:%M:%SZ") updated_date = datetime.strptime(repo["updated_at"], "%Y-%m-%dT%H:%M:%SZ") days_since_creation = (datetime.utcnow() - created_date).days days_since_update = (datetime.utcnow() - updated_date).days star_velocity = repo["stargazers_count"] / days_since_creation if days_since_creation > 0 else 0 fork_to_star_ratio = (repo["forks_count"] / repo["stargazers_count"] * 100) if repo["stargazers_count"] > 0 else 0 hidden_gem = "Yes" if repo["stargazers_count"] < 500 and repo["forks_count"] < 50 else "No" hidden_gem_trend = "Rising" if star_velocity > 1 else "Stable" rising_score = ((star_velocity * 10) + (repo["forks_count"] * 0.2) + (repo.get("watchers_count", 0) * 0.3) + (1 / (days_since_update + 1) * 20) - (repo["open_issues_count"] * 0.01)) legacy_score = (repo["stargazers_count"] * 0.6) + \ (repo["forks_count"] * 0.3) + \ (repo.get("watchers_count", 0) * 0.1) - \ (repo["open_issues_count"] * 0.05) owner, repo_name = repo["owner"]["login"], repo["name"] repo_details_url = f"https://api.github.com/repos/{owner}/{repo_name}" response = requests.get(repo_details_url, headers=HEADERS) if response.status_code == 200: repo_details = response.json() actual_watchers = repo_details.get("subscribers_count", 0) else: actual_watchers = 0 watcher_to_stars_ratio = (actual_watchers / repo["stargazers_count"]) * 100 if repo["stargazers_count"] > 0 else 0 return { "Rising Score": round(rising_score, 2), "Legacy Score": round(legacy_score, 2), "Star Velocity (Stars/Day)": round(star_velocity, 2), "Fork-to-Star Ratio (%)": round(fork_to_star_ratio, 2), "Watchers": actual_watchers, "Watcher-to-Stars Ratio (%)": round(watcher_to_stars_ratio, 2), "Language": repo.get("language", "N/A"), "Topics": ", ".join(repo.get("topics", [])), "Hidden Gem": hidden_gem, "Hidden Gem Trend": hidden_gem_trend, "Open Issues": repo["open_issues_count"], "Created At": repo["created_at"], "Last Updated": repo["pushed_at"], "days_since_creation": round(days_since_creation, 2), "days_since_update": round(days_since_update, 2), "URL": repo["html_url"], } # Repository Discovery Interface def gradio_interface(topics, start_date, language_filter, stars_min, stars_max, forks_min, forks_max, total_repos, sort_order): global discovered_repos if not topics.strip() and not start_date.strip(): # If neither topics nor start_date are provided, return a validation error return pd.DataFrame(), "Please provide at least a topic or a start date." topics_list = [topic.strip() for topic in topics.split(",") if topic.strip()] stars_range = (stars_min, stars_max) forks_range = (forks_min, forks_max) df = pd.DataFrame() all_repos_data = [] try: # If no topics are provided, fetch repositories by filters only if not topics_list: query = f"stars:{stars_range[0]}..{stars_range[1]} forks:{forks_range[0]}..{forks_range[1]}" if start_date.strip(): query += f" created:>{start_date.strip()}" if language_filter: query += f" language:{language_filter}" # Fetch repositories repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos) for repo in repos: repo_data = { "Name": repo["name"], "Owner": repo["owner"]["login"], "Stars": repo["stargazers_count"], "Forks": repo["forks_count"], "Description": repo.get("description", "N/A"), } repo_data.update(calculate_additional_metrics(repo)) all_repos_data.append(repo_data) else: for topic in topics_list: # Construct query query = f"topic:{topic} stars:{stars_range[0]}..{stars_range[1]} forks:{forks_range[0]}..{forks_range[1]}" if start_date.strip(): query += f" created:>{start_date.strip()}" if language_filter: query += f" language:{language_filter}" # Fetch repositories repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos) for repo in repos: repo_data = { "Name": repo["name"], "Owner": repo["owner"]["login"], "Stars": repo["stargazers_count"], "Forks": repo["forks_count"], "Description": repo.get("description", "N/A"), } repo_data.update(calculate_additional_metrics(repo)) all_repos_data.append(repo_data) #Add repository to discovered_repos discovered_repos.append(f"{repo['owner']['login']}/{repo['name']}") if not all_repos_data: return pd.DataFrame(), "No repositories found matching the criteria." # Remove duplicates from discovered_repos discovered_repos = list(set(discovered_repos)) # Create DataFrame df = pd.DataFrame(all_repos_data) except Exception as e: print(f"Error: {e}") return pd.DataFrame(), f"Error fetching repositories: {str(e)}" csv_file = None if not df.empty: csv_file = "discovered_repositories.csv" df.to_csv(csv_file, index=False) return df, csv_file #return df, gr.File.update(visible=True, value=csv_file) #Organization Watch Interface def fetch_org_repositories(org_names, language_filter, stars_min, stars_max, forks_min, forks_max, sort_order, total_repos): try: org_list = [org.strip() for org in org_names.split(",") if org.strip()] if not org_list: return pd.DataFrame(), "Enter at least one organization." all_repos_data = [] for org in org_list: # Query repositories for each organization query = f"user:{org} stars:{stars_min}..{stars_max} forks:{forks_min}..{forks_max}" if language_filter: query += f" language:{language_filter}" repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos) for repo in repos: repo_data = { "Name": repo["name"], "Owner": repo["owner"]["login"], "Stars": repo["stargazers_count"], "Forks": repo["forks_count"], "Description": repo.get("description", "N/A"), } repo_data.update(calculate_additional_metrics(repo)) all_repos_data.append(repo_data) if not all_repos_data: return pd.DataFrame(), "No repositories found for the specified organizations." # Create DataFrame df = pd.DataFrame(all_repos_data) csv_file = "organization_repositories.csv" df.to_csv(csv_file, index=False) return df, csv_file except Exception as e: print(f"Error in fetch_org_repositories: {e}") return pd.DataFrame(), f"Error: {str(e)}" # Function to fetch discovered repositories for the dropdown def get_discovered_repos(): global discovered_repos return discovered_repos def process_readme(owner, repo, branch): # Fetch README content from the specified branch #url = f"https://raw.githubusercontent.com/{owner}/{repo}/master/README.md" url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/README.md" response = requests.get(url, headers=HEADERS) if response.status_code == 200: readme_content = response.text else: #return "Failed to fetch README content.", "", "", None return f"Failed to fetch README content from branch {branch}.", "", "", None # Process README content with OpenAI MODEL = "gpt-4o-mini" completion = client.chat.completions.create( model=MODEL, messages=[ {"role": "system", "content": "You are a helpful assistant that extracts keywords, named entities, and generates summaries from text."}, {"role": "user", "content": f""" Perform the following tasks on the following README file: 1. Extract the top 25 most important keywords from the text only. 2. Extract named entities (e.g., people, organizations, technologies). 3. Summarize the content in one paragraph. Return the results in the following JSON format: {{ "keywords": ["keyword1", "keyword2", ...], "entities": ["entity1", "entity2", ...], "summary": "A concise summary of the README." }} README file: {readme_content} """} ], response_format={"type": "json_object"} ) result = completion.choices[0].message.content result_json = json.loads(result) keywords = ", ".join(result_json["keywords"]) entities = ", ".join(result_json["entities"]) summary = result_json["summary"] # Generate word cloud wordcloud = WordCloud(width=800, height=400, background_color='white').generate(keywords) plt.figure(figsize=(10, 5)) plt.imshow(wordcloud, interpolation='bilinear') plt.axis('off') return keywords, entities, summary, plt # Function to get all branches of a repository def get_branches(owner, repo): url = f"https://api.github.com/repos/{owner}/{repo}/branches" response = requests.get(url, headers=HEADERS) if response.status_code == 200: branches = [branch["name"] for branch in response.json()] return branches else: return [] # Function to get the default branch of a repository def get_default_branch(owner, repo): url = f"https://api.github.com/repos/{owner}/{repo}" response = requests.get(url, headers=HEADERS) if response.status_code == 200: repo_data = response.json() return repo_data["default_branch"] else: return None def fetch_files(owner, repo, path=""): # Base URL for the GitHub API url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}" if path else f"https://api.github.com/repos/{owner}/{repo}/contents" response = requests.get(url, headers=HEADERS) if response.status_code != 200: return f"Failed to fetch files: {response.status_code}", [] files = [] for item in response.json(): if item["type"] == "file": # Only add files # Use the globally defined allowed extensions if any(item["name"].endswith(ext) for ext in ALLOWED_EXTENSIONS): files.append({ "name": item["name"], "path": item["path"], "download_url": item["download_url"] }) elif item["type"] == "dir": # Recursively fetch files in subdirectories sub_files = fetch_files(owner, repo, item["path"]) files.extend(sub_files) return files # Function to fetch the content of a specific file def fetch_file_content(owner, repo, branch, file_path): file_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{file_path}" response = requests.get(file_url) if response.status_code == 200: return response.text else: return f"Failed to fetch file content: {response.status_code}" # Function to query GPT-4o-mini def ask_code_question(code_content, question): if not code_content.strip(): return "No code content available to analyze." if not question.strip(): return "Please enter a question about the code." # Construct the prompt prompt = f""" Here is a Python file from a GitHub repository: {code_content} Please answer the following question about this file: - {question} """ try: # Query GPT-4o-mini response = client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": "You are a helpful assistant skilled in understanding code."}, {"role": "user", "content": prompt} ] ) # Extract and return GPT's response return response.choices[0].message.content.strip() except Exception as e: return f"Error querying GPT-4o-mini: {str(e)}" from graphviz import Source import re # Function to generate and clean Graphviz diagrams using GPT-4o-mini def generate_dot_code_from_code(code_content, diagram_type): if not code_content.strip(): return "No code content available to analyze." # Construct the prompt dynamically based on diagram type prompt = f""" Here is some Python code from a GitHub repository: {code_content} Please generate a {diagram_type} for this code in Graphviz DOT/digraph format. Ensure the DOT code is valid and renderable. Don't include any other text. Don't provide any other explainatory commentry. Ensure the DOT code includes all necessary opening and closing brackets {"brackets"} for graphs and subgraphs. """ #Ensure that the output of the code starts with "@startuml" and Ends with "@enduml". try: # Query GPT-4o-mini response = client.chat.completions.create( model="gpt-4o", messages=[ {"role": "system", "content": "You are a helpful assistant that generates Graphviz DOT code for visualizing Python code. You are restricted to only generate Graphviz Code starting with digraph & ending with }"}, {"role": "user", "content": prompt} ] ) raw_dot_code = response.choices[0].message.content.strip() validated_dot_code = validate_and_fix_dot_code(raw_dot_code) # Fix any missing brackets pattern = r"digraph\b[\s\S]*?^\}" match = re.search(pattern, validated_dot_code,re.MULTILINE | re.DOTALL) if match: validated_dot_code = match.group(0) # Extract the matched content else: return "Failed to extract valid Graphviz code." return validated_dot_code except Exception as e: return f"Error querying GPT-4o-mini: {str(e)}" def validate_and_fix_dot_code(dot_code): # Check for unbalanced brackets open_brackets = dot_code.count("{") close_brackets = dot_code.count("}") # If there are missing closing brackets, add them at the end if open_brackets > close_brackets: missing_brackets = open_brackets - close_brackets dot_code += "}" * missing_brackets return dot_code def render_dot_code(dot_code, filename=None): """ Renders Graphviz DOT code and saves it as a PNG image. Args: dot_code (str): The DOT code to render. filename (str): Name for the output PNG file (without extension). Returns: str: Path to the generated PNG image. """ # Ensure the images directory exists output_dir = "/content/images" os.makedirs(output_dir, exist_ok=True) # Save and render the diagram output_path = os.path.join(output_dir, f"{filename}") try: src = Source(dot_code, format="png") rendered_path = src.render(output_path, cleanup=True) # The `rendered_path` will have an extra `.png` extension #png_path = f"{rendered_path}.png" png_path = f"{rendered_path}" # Remove the unnecessary file without the extension #if os.path.exists(rendered_path): # os.remove(rendered_path) return png_path except Exception as e: return f"Error rendering diagram: {str(e)}" import time def handle_generate_diagram(code_content, diagram_type, retries=5, wait_time=1): """ Handles diagram generation and returns the rendered image for display. Args: code_content (str): The source code to analyze. diagram_type (str): Type of diagram to generate. retries (int): Number of times to retry checking for the file. wait_time (float): Time (in seconds) to wait between retries. Returns: PIL.Image.Image or str: The generated diagram or an error message. """ print("Code content received:", code_content) # Debugging print # Generate and render the diagram image_path = generate_and_render_diagram(code_content, diagram_type) print(f"Generated image path: {image_path}") # Debugging print # Retry logic for checking file existence for attempt in range(retries): if os.path.exists(image_path): try: return Image.open(image_path) # Return the image if found except Exception as e: print(f"Error opening image on attempt {attempt + 1}: {e}") else: print(f"Image not found. Retrying... ({attempt + 1}/{retries})") time.sleep(wait_time) # Wait before the next check # If the image is still not found after retries print(f"Failed to generate image after {retries} retries: {image_path}") return f"Failed to generate image: {image_path}" # Gradio Interface with gr.Blocks() as demo: # Tab 1: Repository Discovery with gr.Tab("Repository Discovery"): with gr.Row(): topics_input = gr.Textbox( label="Topics (comma-separated, leave empty to fetch by date only)", placeholder="e.g., machine-learning, deep-learning (leave empty for date-based search)" ) similar_topics = gr.Textbox( label="Similar Topics (based on embeddings)", interactive=False ) gr.Button("Get Similar Topics").click( search_similar_topics, inputs=[topics_input], outputs=[similar_topics] ) with gr.Row(): start_date_input = gr.Textbox( label="Start Date (YYYY-MM-DD, leave empty if not filtering by date)", placeholder="Set to filter recent repositories by date or leave empty" ) language_filter = gr.Dropdown( choices=["", "Python", "JavaScript", "Java", "C++", "Ruby", "Go"], label="Language Filter", value="" ) stars_min = gr.Number(label="Stars Min", value=10) stars_max = gr.Number(label="Stars Max", value=1000) with gr.Row(): forks_min = gr.Number(label="Forks Min", value=0) forks_max = gr.Number(label="Forks Max", value=500) total_repos = gr.Number(label="Total Repositories", value=10, step=10) sort_order = gr.Dropdown( choices=["stars", "forks", "updated"], label="Sort Order", value="stars" ) with gr.Row(): output_data = gr.Dataframe(label="Discovered Repositories") output_file = gr.File(label="Download CSV", file_count="single") gr.Button("Discover Repositories").click( gradio_interface, inputs=[ topics_input, start_date_input, language_filter, stars_min, stars_max, forks_min, forks_max, total_repos, sort_order ], outputs=[output_data, output_file] ) # Tab 2: Organization Watch with gr.Tab("Organization Watch"): with gr.Row(): org_input = gr.Textbox( label="Organizations (comma-separated)", placeholder="e.g., facebookresearch, openai" ) with gr.Row(): language_filter = gr.Dropdown( choices=["", "Python", "JavaScript", "Java", "C++", "Ruby", "Go"], label="Language Filter", value="" ) stars_min = gr.Number(label="Stars Min", value=10) stars_max = gr.Number(label="Stars Max", value=1000) with gr.Row(): forks_min = gr.Number(label="Forks Min", value=0) forks_max = gr.Number(label="Forks Max", value=500) total_repos = gr.Number(label="Total Repositories", value=10, step=10) sort_order = gr.Dropdown( choices=["stars", "forks", "updated"], label="Sort Order", value="stars" ) with gr.Row(): output_data = gr.Dataframe(label="Repositories by Organizations") output_file = gr.File(label="Download CSV", file_count="single") gr.Button("Fetch Organization Repositories").click( fetch_org_repositories, inputs=[ org_input, language_filter, stars_min, stars_max, forks_min, forks_max, sort_order, total_repos ], outputs=[output_data, output_file] ) # Tab 3: Code Analysis # Gradio Interface for Code Analysis (Updated) with gr.Tab("Code Analysis"): with gr.Row(): repo_dropdown = gr.Dropdown( label="Select Repository", choices=[], interactive=True ) refresh_button = gr.Button("Refresh Repositories") with gr.Row(): branch_dropdown = gr.Dropdown( label="Select Branch", choices=[], interactive=True ) with gr.Row(): keywords_output = gr.Textbox(label="Keywords") entities_output = gr.Textbox(label="Entities") with gr.Row(): summary_output = gr.Textbox(label="Summary") wordcloud_output = gr.Plot(label="Word Cloud") # Use Plot instead of Image # New components for displaying files with gr.Row(): files_list = gr.Dropdown( label="Files in Repository", choices=[], interactive=True ) with gr.Row(): file_content_box = gr.Textbox( label="File Content", lines=20, interactive=True ) with gr.Row(): # Combine question input and button in the same row question_input = gr.Textbox( label="Ask a Question", placeholder="Enter your question about the code...", lines=1 ) question_button = gr.Button("Get Answer") with gr.Row(): answer_output = gr.Textbox(label="Bot's Answer", lines=10, interactive=False) # Diagram generation interface with gr.Row(): diagram_type = gr.Dropdown( label="Select Diagram Type", choices=["Call Graph", "Data Flow Diagram", "Sequence Diagram", "Class Diagram", "Component Diagram", "Workflow Diagram"], value="Call Graph" ) generate_diagram_button = gr.Button("Generate Diagram") with gr.Row(): #diagram_output = gr.Image(label="Generated Diagram", type="pil") diagram_output = gr.Image( label="Generated Diagram", type="pil", # Ensures compatibility with PIL.Image.Image elem_id="diagram_output", # Add an ID for custom styling if needed interactive=False, # No need for user interaction on the output show_label=True, height=600, # Set a larger default height width=800, # Set a larger default width ) # Hook up the question button to ask_code_question question_button.click( ask_code_question, inputs=[file_content_box, question_input], # Inputs: Code content and user question outputs=[answer_output] # Output: Answer from LLM ) # Callback to generate and render the diagram def generate_and_render_diagram(code_content, diagram_type): # Generate DOT code dot_code = generate_dot_code_from_code(code_content, diagram_type) # Check for valid DOT code if not dot_code.strip().startswith("digraph"): return "Invalid DOT code generated." unique_filename = f"diagram_{uuid.uuid4().hex}" # Generate a unique filename return render_dot_code(dot_code, filename=unique_filename) # Render the diagram generate_diagram_button.click( handle_generate_diagram, inputs=[file_content_box, diagram_type], # Use file_content_box instead of answer_output outputs=[diagram_output] # Output: PNG file path ) # Refresh repository list refresh_button.click( lambda: gr.update(choices=get_discovered_repos()), inputs=[], outputs=[repo_dropdown] ) # Update branch dropdown when a repository is selected def update_branches(repo): if repo: owner, repo_name = repo.split("/") branches = get_branches(owner, repo_name) default_branch = get_default_branch(owner, repo_name) return gr.update(choices=branches, value=default_branch) return gr.update(choices=[], value=None) repo_dropdown.change( update_branches, inputs=[repo_dropdown], outputs=[branch_dropdown] ) # Analyze README content based on the selected repository and branch def analyze_readme(repo, branch): if repo and branch: owner, repo_name = repo.split("/") # Extract the owner and repo name. # Pass branch to analyze specific README return process_readme(owner, repo_name, branch) return "No repository or branch selected.", "", "", None repo_dropdown.change( analyze_readme, inputs=[repo_dropdown, branch_dropdown], outputs=[keywords_output, entities_output, summary_output, wordcloud_output] ) branch_dropdown.change( analyze_readme, # Function to call when branch changes inputs=[repo_dropdown, branch_dropdown], # Pass both repo and branch as inputs outputs=[keywords_output, entities_output, summary_output, wordcloud_output] # Update outputs ) # Fetch files in the selected repository def update_files(repo): global files_data # To store fetched files for later use if repo: owner, repo_name = repo.split("/") # Extract owner and repo print("Selected repository:", repo) files = fetch_files(owner, repo_name) # Call with default path="" files_data = files # Store the fetched files for later use file_names = [f"{file['name']} ({file['path']})" for file in files] # Prepare dropdown labels print("Fetched files:", files) # Debugging to ensure files are fetched correctly print("File names for dropdown:", file_names) # Debugging to ensure dropdown labels are created return gr.update(choices=file_names, value=None) # Update the dropdown files_data = [] # Clear files_data if no repo is selected return gr.update(choices=[], value=None) repo_dropdown.change( lambda repo: update_files(repo), inputs=[repo_dropdown], outputs=[files_list] # Update both files_list and file_content_box ) # Fetch and display file content def display_file_content(repo, branch, selected_file): if repo and branch and selected_file: owner, repo_name = repo.split("/") file_path = selected_file.split(" (")[1][:-1] # Extract the file path from the dropdown label content = fetch_file_content(owner, repo_name, branch, file_path) return content return "No file selected." files_list.change( display_file_content, inputs=[repo_dropdown, branch_dropdown, files_list], outputs=[file_content_box] ) demo.launch()