Spaces:
Sleeping
Sleeping
import requests | |
import pandas as pd | |
from datetime import datetime | |
import gradio as gr | |
import pickle | |
from sentence_transformers import SentenceTransformer, util | |
from wordcloud import WordCloud | |
import matplotlib.pyplot as plt | |
import base64 | |
from io import BytesIO | |
import json | |
from openai import OpenAI | |
from graphviz import Source | |
import re | |
from PIL import Image | |
import os | |
import uuid | |
# Fixed directory for images
IMAGE_DIR = "/content/images"  # to save the diagram png images
os.makedirs(IMAGE_DIR, exist_ok=True)  # Create the directory if it doesn't exist

# Constants for GitHub API
GITHUB_API_URL = "https://api.github.com/search/repositories"
# The token is read from the `github_pat` environment variable; importing this
# module fails with ValueError when it is not set.
ACCESS_TOKEN = os.getenv("github_pat")
if not ACCESS_TOKEN:
    raise ValueError("Missing GitHub Personal Access Token.")
# Sent with the GitHub requests made by the functions below.
HEADERS = {"Authorization": f"Bearer {ACCESS_TOKEN}"}

# Access OpenAI API key from secrets (environment variable `openai_key`)
OPENAI_API_KEY = os.getenv("openai_key")
if not OPENAI_API_KEY:
    raise ValueError("Missing OpenAI API Key. Please set it as a secret in Hugging Face.")

# Initialize OpenAI client once; it is shared by every function that queries a model
client = OpenAI(api_key=OPENAI_API_KEY)

# Global variable for allowed extensions (used by fetch_files to filter file names)
ALLOWED_EXTENSIONS = [".py", ".js", ".md", ".toml", ".yaml"]

# Load topic embeddings.
# NOTE(review): pickle.load can execute arbitrary code while unpickling, so this
# file must only ever come from a trusted source (e.g. shipped with the app).
with open("github_topics_embeddings.pkl", "rb") as f:
    topic_data = pickle.load(f)
# The pickle is expected to hold a mapping with "topics" and "embeddings" entries;
# search_similar_topics indexes `topics` by the positions ranked against `embeddings`.
topics = topic_data["topics"]
embeddings = topic_data["embeddings"]

# Filled by gradio_interface and read back through get_discovered_repos.
discovered_repos = []  # Format: ["owner/repo_name", ...]
# Function to search for similar topics | |
# Sentence-embedding model shared by all calls; created on first use because
# constructing it is expensive and the weights never change.
_TOPIC_MODEL = None


def _get_topic_model():
    """Return the shared 'all-MiniLM-L6-v2' encoder, loading it on first use."""
    global _TOPIC_MODEL
    if _TOPIC_MODEL is None:
        _TOPIC_MODEL = SentenceTransformer('all-MiniLM-L6-v2')
    return _TOPIC_MODEL


def search_similar_topics(input_text):
    """Suggest known GitHub topics that are semantically close to *input_text*.

    Args:
        input_text: Free text typed by the user (may be empty or None).

    Returns:
        A comma-separated string with the 10 closest entries of the module-level
        ``topics`` list, a prompt message when the input is blank, or an error
        message when the lookup fails.
    """
    if not input_text or not input_text.strip():
        return "Enter topics to see suggestions."
    try:
        model = _get_topic_model()
        query_embedding = model.encode(input_text, convert_to_tensor=True)
        similarities = util.pytorch_cos_sim(query_embedding, embeddings)
        top_indices = similarities[0].argsort(descending=True)[:10]  # Top 10 matches
        return ", ".join(topics[i] for i in top_indices)
    except Exception as e:
        # Surface the problem in the UI textbox instead of crashing the callback.
        return f"Error in generating suggestions: {str(e)}"
# Function to fetch repositories with pagination | |
def search_repositories(query, sort="stars", order="desc", total_repos=10):
    """Fetch up to *total_repos* repositories from the GitHub search API.

    Args:
        query: GitHub search query string (qualifiers such as ``topic:`` allowed).
        sort: Field to sort by, forwarded as the ``sort`` parameter.
        order: Sort direction, forwarded as the ``order`` parameter.
        total_repos: Maximum number of repositories to return. Floats (as
            delivered by numeric UI widgets) are truncated to int.

    Returns:
        A list of repository dicts as returned by the API, at most
        *total_repos* long. Empty when *total_repos* is not positive.

    Raises:
        Exception: When the API answers with a non-200 status code.
    """
    total_repos = int(total_repos)
    if total_repos <= 0:
        # Nothing requested; also avoids a zero page size below.
        return []

    per_page = min(total_repos, 100)  # the API accepts at most 100 items per page
    total_pages = -(-total_repos // per_page)  # ceiling division

    all_repos = []
    for page in range(1, total_pages + 1):
        params = {
            "q": query,
            "sort": sort,
            "order": order,
            "per_page": per_page,
            "page": page,
        }
        response = requests.get(GITHUB_API_URL, headers=HEADERS, params=params, timeout=30)
        print(f"Query: {query}, Status Code: {response.status_code}")
        # Check the status before touching the body: an error response is not
        # guaranteed to be JSON.
        if response.status_code != 200:
            raise Exception(f"GitHub API error: {response.status_code} {response.text}")
        items = response.json().get("items", [])
        if not items:
            break
        all_repos.extend(items)
        if len(all_repos) >= total_repos:
            break
    return all_repos[:total_repos]
# Function to calculate additional metrics | |
def calculate_additional_metrics(repo):
    """Derive popularity/activity metrics for one repository search result.

    Args:
        repo: Repository dict from the GitHub search API. The keys read here are
            ``created_at``, ``updated_at``, ``pushed_at``, ``stargazers_count``,
            ``forks_count``, ``open_issues_count``, ``owner.login``, ``name`` and
            ``html_url``; ``watchers_count``, ``language`` and ``topics`` are optional.

    Returns:
        A dict of display-ready metrics (scores, ratios, dates and URL) that the
        callers merge into each table row.
    """
    from datetime import timezone  # local import: the file only imports `datetime`

    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    # One reference instant so both ages are measured against the same "now".
    now = datetime.now(timezone.utc)
    created_date = datetime.strptime(repo["created_at"], timestamp_format).replace(tzinfo=timezone.utc)
    updated_date = datetime.strptime(repo["updated_at"], timestamp_format).replace(tzinfo=timezone.utc)
    days_since_creation = (now - created_date).days
    days_since_update = (now - updated_date).days

    stars = repo["stargazers_count"]
    forks = repo["forks_count"]
    open_issues = repo["open_issues_count"]
    search_watchers = repo.get("watchers_count", 0)

    star_velocity = stars / days_since_creation if days_since_creation > 0 else 0
    fork_to_star_ratio = (forks / stars * 100) if stars > 0 else 0
    hidden_gem = "Yes" if stars < 500 and forks < 50 else "No"
    hidden_gem_trend = "Rising" if star_velocity > 1 else "Stable"

    # Rewards fast star growth and recent updates; open issues subtract slightly.
    rising_score = ((star_velocity * 10) +
                    (forks * 0.2) +
                    (search_watchers * 0.3) +
                    (1 / (days_since_update + 1) * 20) -
                    (open_issues * 0.01))
    # Rewards absolute totals regardless of age.
    legacy_score = ((stars * 0.6) +
                    (forks * 0.3) +
                    (search_watchers * 0.1) -
                    (open_issues * 0.05))

    # The detail endpoint is queried for `subscribers_count`, which is reported
    # as "Watchers"; any failure degrades to 0 instead of aborting the search.
    owner, repo_name = repo["owner"]["login"], repo["name"]
    repo_details_url = f"https://api.github.com/repos/{owner}/{repo_name}"
    actual_watchers = 0
    try:
        response = requests.get(repo_details_url, headers=HEADERS, timeout=30)
        if response.status_code == 200:
            actual_watchers = response.json().get("subscribers_count", 0)
    except requests.RequestException as e:
        print(f"Could not fetch watcher count for {owner}/{repo_name}: {e}")
    watcher_to_stars_ratio = (actual_watchers / stars) * 100 if stars > 0 else 0

    return {
        "Rising Score": round(rising_score, 2),
        "Legacy Score": round(legacy_score, 2),
        "Star Velocity (Stars/Day)": round(star_velocity, 2),
        "Fork-to-Star Ratio (%)": round(fork_to_star_ratio, 2),
        "Watchers": actual_watchers,
        "Watcher-to-Stars Ratio (%)": round(watcher_to_stars_ratio, 2),
        # `or` also covers a key that is present but null, which .get()'s default does not.
        "Language": repo.get("language") or "N/A",
        "Topics": ", ".join(repo.get("topics") or []),
        "Hidden Gem": hidden_gem,
        "Hidden Gem Trend": hidden_gem_trend,
        "Open Issues": open_issues,
        "Created At": repo["created_at"],
        "Last Updated": repo["pushed_at"],
        "days_since_creation": round(days_since_creation, 2),
        "days_since_update": round(days_since_update, 2),
        "URL": repo["html_url"],
    }
# Repository Discovery Interface | |
def gradio_interface(topics, start_date, language_filter, stars_min, stars_max, forks_min, forks_max, total_repos, sort_order): | |
global discovered_repos | |
if not topics.strip() and not start_date.strip(): | |
# If neither topics nor start_date are provided, return a validation error | |
return pd.DataFrame(), "Please provide at least a topic or a start date." | |
topics_list = [topic.strip() for topic in topics.split(",") if topic.strip()] | |
stars_range = (stars_min, stars_max) | |
forks_range = (forks_min, forks_max) | |
df = pd.DataFrame() | |
all_repos_data = [] | |
try: | |
# If no topics are provided, fetch repositories by filters only | |
if not topics_list: | |
query = f"stars:{stars_range[0]}..{stars_range[1]} forks:{forks_range[0]}..{forks_range[1]}" | |
if start_date.strip(): | |
query += f" created:>{start_date.strip()}" | |
if language_filter: | |
query += f" language:{language_filter}" | |
# Fetch repositories | |
repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos) | |
for repo in repos: | |
repo_data = { | |
"Name": repo["name"], | |
"Owner": repo["owner"]["login"], | |
"Stars": repo["stargazers_count"], | |
"Forks": repo["forks_count"], | |
"Description": repo.get("description", "N/A"), | |
} | |
repo_data.update(calculate_additional_metrics(repo)) | |
all_repos_data.append(repo_data) | |
else: | |
for topic in topics_list: | |
# Construct query | |
query = f"topic:{topic} stars:{stars_range[0]}..{stars_range[1]} forks:{forks_range[0]}..{forks_range[1]}" | |
if start_date.strip(): | |
query += f" created:>{start_date.strip()}" | |
if language_filter: | |
query += f" language:{language_filter}" | |
# Fetch repositories | |
repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos) | |
for repo in repos: | |
repo_data = { | |
"Name": repo["name"], | |
"Owner": repo["owner"]["login"], | |
"Stars": repo["stargazers_count"], | |
"Forks": repo["forks_count"], | |
"Description": repo.get("description", "N/A"), | |
} | |
repo_data.update(calculate_additional_metrics(repo)) | |
all_repos_data.append(repo_data) | |
#Add repository to discovered_repos | |
discovered_repos.append(f"{repo['owner']['login']}/{repo['name']}") | |
if not all_repos_data: | |
return pd.DataFrame(), "No repositories found matching the criteria." | |
# Remove duplicates from discovered_repos | |
discovered_repos = list(set(discovered_repos)) | |
# Create DataFrame | |
df = pd.DataFrame(all_repos_data) | |
except Exception as e: | |
print(f"Error: {e}") | |
return pd.DataFrame(), f"Error fetching repositories: {str(e)}" | |
csv_file = None | |
if not df.empty: | |
csv_file = "discovered_repositories.csv" | |
df.to_csv(csv_file, index=False) | |
return df, csv_file | |
#return df, gr.File.update(visible=True, value=csv_file) | |
#Organization Watch Interface | |
def fetch_org_repositories(org_names, language_filter, stars_min, stars_max, forks_min, forks_max, sort_order, total_repos):
    """Search the repositories of one or more GitHub organizations/users.

    Args:
        org_names: Comma-separated organization (or user) logins.
        language_filter: Language name, or empty for no language restriction.
        stars_min, stars_max: Inclusive star-count range.
        forks_min, forks_max: Inclusive fork-count range.
        sort_order: Sort field forwarded to ``search_repositories``.
        total_repos: Maximum number of repositories fetched per organization.

    Returns:
        ``(DataFrame, second)`` where ``second`` is the path of the written CSV
        (``organization_repositories.csv``) on success, or a message paired
        with an empty DataFrame when input is missing, nothing matches, or an
        error occurs.
    """
    try:
        org_list = [org.strip() for org in (org_names or "").split(",") if org.strip()]
        if not org_list:
            return pd.DataFrame(), "Enter at least one organization."

        # Qualifiers shared by every query. Numeric widgets may deliver floats,
        # so the bounds are rendered as integers.
        filters = (
            f"stars:{int(stars_min)}..{int(stars_max)} "
            f"forks:{int(forks_min)}..{int(forks_max)}"
        )
        if language_filter:
            filters += f" language:{language_filter}"

        all_repos_data = []
        for org in org_list:
            # Query repositories for each organization
            query = f"user:{org} {filters}"
            repos = search_repositories(query=query, sort=sort_order, total_repos=int(total_repos))
            for repo in repos:
                repo_data = {
                    "Name": repo["name"],
                    "Owner": repo["owner"]["login"],
                    "Stars": repo["stargazers_count"],
                    "Forks": repo["forks_count"],
                    # `or` also covers a description that is present but null.
                    "Description": repo.get("description") or "N/A",
                }
                repo_data.update(calculate_additional_metrics(repo))
                all_repos_data.append(repo_data)

        if not all_repos_data:
            return pd.DataFrame(), "No repositories found for the specified organizations."

        # Create DataFrame and export it for download
        df = pd.DataFrame(all_repos_data)
        csv_file = "organization_repositories.csv"
        df.to_csv(csv_file, index=False)
        return df, csv_file
    except Exception as e:
        print(f"Error in fetch_org_repositories: {e}")
        return pd.DataFrame(), f"Error: {str(e)}"
# Function to fetch discovered repositories for the dropdown | |
def get_discovered_repos():
    """Return the module-level list of discovered "owner/repo" names for the dropdown."""
    # Reading a module global needs no `global` declaration.
    return discovered_repos
def process_readme(owner, repo, branch):
    """Fetch a repository README and extract keywords, entities and a summary.

    Args:
        owner: Repository owner login.
        repo: Repository name.
        branch: Branch whose ``README.md`` is fetched.

    Returns:
        A 4-tuple ``(keywords, entities, summary, figure)``. ``keywords`` and
        ``entities`` are comma-separated strings and ``figure`` is a Matplotlib
        ``Figure`` holding the keyword word cloud (``None`` when there are no
        keywords). On failure the tuple is ``(message, "", "", None)``.
    """
    # Local import: a bare Figure is not registered with pyplot's global state,
    # so repeated calls neither accumulate open figures nor share a "current figure".
    from matplotlib.figure import Figure

    # Fetch README content from the specified branch
    url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/README.md"
    response = requests.get(url, headers=HEADERS)
    if response.status_code != 200:
        return f"Failed to fetch README content from branch {branch}.", "", "", None
    readme_content = response.text

    # Process README content with OpenAI
    MODEL = "gpt-4o-mini"
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that extracts keywords, named entities, and generates summaries from text."},
                {"role": "user", "content": f"""
Perform the following tasks on the following README file:
1. Extract the top 25 most important keywords from the text only.
2. Extract named entities (e.g., people, organizations, technologies).
3. Summarize the content in one paragraph.
Return the results in the following JSON format:
{{
"keywords": ["keyword1", "keyword2", ...],
"entities": ["entity1", "entity2", ...],
"summary": "A concise summary of the README."
}}
README file:
{readme_content}
"""}
            ],
            response_format={"type": "json_object"}
        )
        result_json = json.loads(completion.choices[0].message.content)
        # The model is asked for this schema but not forced to follow it, so
        # missing keys and non-string items are tolerated.
        keywords = ", ".join(str(item) for item in result_json.get("keywords", []))
        entities = ", ".join(str(item) for item in result_json.get("entities", []))
        summary = result_json.get("summary", "")
    except Exception as e:
        # Same tuple shape as the fetch failure above so the UI outputs still line up.
        return f"Error analyzing README: {str(e)}", "", "", None

    if not keywords.strip():
        # There is nothing to draw, so the plot is skipped.
        return keywords, entities, summary, None

    # Generate word cloud
    wordcloud = WordCloud(width=800, height=400, background_color='white').generate(keywords)
    figure = Figure(figsize=(10, 5))
    axes = figure.subplots()
    axes.imshow(wordcloud, interpolation='bilinear')
    axes.axis('off')
    return keywords, entities, summary, figure
# Function to get all branches of a repository | |
def get_branches(owner, repo):
    """Return the branch names reported by the GitHub branches endpoint.

    An empty list is returned when the request does not succeed (non-200).
    """
    response = requests.get(
        f"https://api.github.com/repos/{owner}/{repo}/branches",
        headers=HEADERS,
    )
    if response.status_code != 200:
        return []
    return [entry["name"] for entry in response.json()]
# Function to get the default branch of a repository | |
def get_default_branch(owner, repo):
    """Return the repository's default branch name, or None on a non-200 response."""
    response = requests.get(
        f"https://api.github.com/repos/{owner}/{repo}",
        headers=HEADERS,
    )
    if response.status_code != 200:
        return None
    return response.json()["default_branch"]
def fetch_files(owner, repo, path=""):
    """Recursively list repository files whose names end in an allowed extension.

    Args:
        owner: Repository owner login.
        repo: Repository name.
        path: Directory inside the repository to start from ("" for the root).

    Returns:
        A list of dicts with ``name``, ``path`` and ``download_url`` keys. A
        directory that cannot be listed contributes no entries, so the result
        is always a plain list that callers (and the recursion) can iterate.
    """
    # Base URL for the GitHub API
    url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}" if path else f"https://api.github.com/repos/{owner}/{repo}/contents"
    response = requests.get(url, headers=HEADERS)
    if response.status_code != 200:
        # Log and skip: returning anything but a list would corrupt the
        # `extend` below and break callers that iterate over file dicts.
        print(f"Failed to fetch files for '{path}': {response.status_code}")
        return []

    allowed_suffixes = tuple(ALLOWED_EXTENSIONS)  # str.endswith accepts a tuple
    files = []
    for item in response.json():
        if item["type"] == "file":  # Only add files
            if item["name"].endswith(allowed_suffixes):
                files.append({
                    "name": item["name"],
                    "path": item["path"],
                    "download_url": item["download_url"]
                })
        elif item["type"] == "dir":
            # Recursively fetch files in subdirectories
            files.extend(fetch_files(owner, repo, item["path"]))
    return files
# Function to fetch the content of a specific file | |
def fetch_file_content(owner, repo, branch, file_path):
    """Download one file from raw.githubusercontent.com.

    Returns the response text on HTTP 200, otherwise a failure message that
    contains the status code.
    """
    response = requests.get(
        f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{file_path}"
    )
    if response.status_code != 200:
        return f"Failed to fetch file content: {response.status_code}"
    return response.text
# Function to query GPT-4o-mini | |
def ask_code_question(code_content, question):
    """Ask the chat model a question about the given file content.

    Args:
        code_content: Text of the file being analysed (may be empty or None).
        question: The user's question (may be empty or None).

    Returns:
        The model's answer, a prompt message when either input is blank, or an
        error message when the request fails.
    """
    if not code_content or not code_content.strip():
        return "No code content available to analyze."
    if not question or not question.strip():
        return "Please enter a question about the code."

    # Construct the prompt. The file is described generically because the file
    # browser also lists non-Python files (see ALLOWED_EXTENSIONS).
    prompt = f"""
Here is a file from a GitHub repository:
{code_content}
Please answer the following question about this file:
- {question}
"""
    try:
        # Query GPT-4o-mini
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful assistant skilled in understanding code."},
                {"role": "user", "content": prompt}
            ]
        )
        # Extract and return GPT's response
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"Error querying GPT-4o-mini: {str(e)}"
from graphviz import Source | |
import re | |
# Function to generate and clean Graphviz diagrams using GPT-4o-mini | |
def _extract_digraph(text):
    """Return the first brace-balanced ``digraph ... { ... }`` span in *text*, or None."""
    start = re.search(r"\bdigraph\b", text)
    if start is None:
        return None
    depth = 0
    in_quotes = False
    escaped = False
    for index in range(start.start(), len(text)):
        char = text[index]
        if in_quotes:
            # Braces inside quoted labels are not structural.
            if escaped:
                escaped = False
            elif char == "\\":
                escaped = True
            elif char == '"':
                in_quotes = False
            continue
        if char == '"':
            in_quotes = True
        elif char == "{":
            depth += 1
        elif char == "}":
            depth -= 1
            if depth == 0:
                return text[start.start():index + 1]
            if depth < 0:
                return None
    return None


# Function to generate and clean Graphviz diagrams using an OpenAI chat model
def generate_dot_code_from_code(code_content, diagram_type):
    """Ask the chat model for a Graphviz diagram of *code_content*.

    Args:
        code_content: Source text to visualise (may be empty or None).
        diagram_type: Human-readable diagram kind inserted into the prompt.

    Returns:
        The extracted ``digraph`` source on success; otherwise a message
        (blank input, no graph found in the reply, or request error). Callers
        distinguish success by checking that the result starts with "digraph".
    """
    if not code_content or not code_content.strip():
        return "No code content available to analyze."

    model_name = "gpt-4o"
    # Construct the prompt dynamically based on diagram type
    prompt = f"""
Here is some Python code from a GitHub repository:
{code_content}
Please generate a {diagram_type} for this code in Graphviz DOT/digraph format. Ensure the DOT code is valid and renderable.
Don't include any other text. Don't provide any other explainatory commentry.
Ensure the DOT code includes all necessary opening and closing brackets {{ }} for graphs and subgraphs.
"""
    try:
        response = client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that generates Graphviz DOT code for visualizing Python code. You are restricted to only generate Graphviz Code starting with digraph & ending with }"},
                {"role": "user", "content": prompt}
            ]
        )
        raw_dot_code = response.choices[0].message.content.strip()
        validated_dot_code = validate_and_fix_dot_code(raw_dot_code)  # Fix any missing brackets

        # Prefer brace matching: it keeps nested subgraphs intact and does not
        # care where on a line the closing brace sits. It also drops any text
        # (such as Markdown fences) around the graph.
        extracted = _extract_digraph(validated_dot_code)
        if extracted is None:
            # Fall back to the line-anchored pattern for replies the scanner
            # could not balance (e.g. a stray quote).
            match = re.search(r"digraph\b[\s\S]*?^\}", validated_dot_code, re.MULTILINE | re.DOTALL)
            if not match:
                return "Failed to extract valid Graphviz code."
            extracted = match.group(0)
        return extracted
    except Exception as e:
        return f"Error querying {model_name}: {str(e)}"
def validate_and_fix_dot_code(dot_code):
    """Append any closing braces that *dot_code* is missing.

    Braces are counted over the whole text. Each missing ``}`` is added on its
    own line, because the extraction step that consumes this output looks for
    a closing brace at the start of a line.

    Args:
        dot_code: Graphviz DOT source, possibly truncated.

    Returns:
        The input unchanged when it has at least as many ``}`` as ``{``;
        otherwise the input followed by the missing braces, one per line.
    """
    missing_brackets = dot_code.count("{") - dot_code.count("}")
    if missing_brackets > 0:
        if not dot_code.endswith("\n"):
            dot_code += "\n"
        dot_code += "\n".join("}" * missing_brackets)
    return dot_code
def render_dot_code(dot_code, filename=None):
    """
    Renders Graphviz DOT code and saves it as a PNG image.

    Args:
        dot_code (str): The DOT code to render.
        filename (str): Name for the output PNG file (without extension).
            When omitted, a unique name is generated so that concurrent
            renders do not overwrite each other.

    Returns:
        str: Path to the generated PNG image as reported by Graphviz, or an
        "Error rendering diagram: ..." message if rendering fails.
    """
    # Ensure the images directory exists (same directory as the module constant)
    output_dir = IMAGE_DIR
    os.makedirs(output_dir, exist_ok=True)

    if not filename:
        filename = f"diagram_{uuid.uuid4().hex}"

    # Save and render the diagram
    output_path = os.path.join(output_dir, str(filename))
    try:
        src = Source(dot_code, format="png")
        # `render` returns the path of the rendered file; cleanup=True removes
        # the intermediate DOT source it writes next to it.
        return src.render(output_path, cleanup=True)
    except Exception as e:
        return f"Error rendering diagram: {str(e)}"
import time | |
def handle_generate_diagram(code_content, diagram_type, retries=5, wait_time=1):
    """
    Handles diagram generation and returns the rendered image for display.

    Args:
        code_content (str): The source code to analyze.
        diagram_type (str): Type of diagram to generate.
        retries (int): Number of times to retry checking for the file.
        wait_time (float): Time (in seconds) to wait between retries.

    Returns:
        PIL.Image.Image or str: The generated diagram or an error message.
    """
    # Log only the size; dumping whole source files floods the logs.
    print(f"Code content received: {len(code_content or '')} characters")

    # Generate and render the diagram
    image_path = generate_and_render_diagram(code_content, diagram_type)
    print(f"Generated image path: {image_path}")

    # The helpers report failures as plain messages instead of raising. Such a
    # message can never turn into a file, so polling for it would only delay
    # the response by retries * wait_time seconds.
    if not str(image_path).lower().endswith(".png"):
        print(f"Diagram generation failed: {image_path}")
        return f"Failed to generate image: {image_path}"

    # Retry logic for checking file existence
    for attempt in range(retries):
        if os.path.exists(image_path):
            try:
                return Image.open(image_path)  # Return the image if found
            except Exception as e:
                print(f"Error opening image on attempt {attempt + 1}: {e}")
        else:
            print(f"Image not found. Retrying... ({attempt + 1}/{retries})")
        if attempt < retries - 1:
            time.sleep(wait_time)  # Wait before the next check, not after the last one

    # If the image is still not found after retries
    print(f"Failed to generate image after {retries} retries: {image_path}")
    return f"Failed to generate image: {image_path}"
# Gradio Interface | |
# NOTE(review): the layout nesting below (which components sit inside which
# Row/Tab) was reconstructed from statement order — confirm against the running UI.
with gr.Blocks() as demo:
    # Tab 1: Repository Discovery
    with gr.Tab("Repository Discovery"):
        with gr.Row():
            topics_input = gr.Textbox(
                label="Topics (comma-separated, leave empty to fetch by date only)",
                placeholder="e.g., machine-learning, deep-learning (leave empty for date-based search)"
            )
            similar_topics = gr.Textbox(
                label="Similar Topics (based on embeddings)",
                interactive=False
            )
            gr.Button("Get Similar Topics").click(
                search_similar_topics,
                inputs=[topics_input],
                outputs=[similar_topics]
            )
        with gr.Row():
            start_date_input = gr.Textbox(
                label="Start Date (YYYY-MM-DD, leave empty if not filtering by date)",
                placeholder="Set to filter recent repositories by date or leave empty"
            )
            language_filter = gr.Dropdown(
                choices=["", "Python", "JavaScript", "Java", "C++", "Ruby", "Go"],
                label="Language Filter",
                value=""
            )
            stars_min = gr.Number(label="Stars Min", value=10)
            stars_max = gr.Number(label="Stars Max", value=1000)
        with gr.Row():
            forks_min = gr.Number(label="Forks Min", value=0)
            forks_max = gr.Number(label="Forks Max", value=500)
            total_repos = gr.Number(label="Total Repositories", value=10, step=10)
            sort_order = gr.Dropdown(
                choices=["stars", "forks", "updated"],
                label="Sort Order",
                value="stars"
            )
        with gr.Row():
            output_data = gr.Dataframe(label="Discovered Repositories")
            output_file = gr.File(label="Download CSV", file_count="single")
        gr.Button("Discover Repositories").click(
            gradio_interface,
            inputs=[
                topics_input, start_date_input, language_filter, stars_min, stars_max,
                forks_min, forks_max, total_repos, sort_order
            ],
            outputs=[output_data, output_file]
        )

    # Tab 2: Organization Watch
    # The components carry their own names so they do not rebind the Tab 1
    # variables of the same purpose.
    with gr.Tab("Organization Watch"):
        with gr.Row():
            org_input = gr.Textbox(
                label="Organizations (comma-separated)",
                placeholder="e.g., facebookresearch, openai"
            )
        with gr.Row():
            org_language_filter = gr.Dropdown(
                choices=["", "Python", "JavaScript", "Java", "C++", "Ruby", "Go"],
                label="Language Filter",
                value=""
            )
            org_stars_min = gr.Number(label="Stars Min", value=10)
            org_stars_max = gr.Number(label="Stars Max", value=1000)
        with gr.Row():
            org_forks_min = gr.Number(label="Forks Min", value=0)
            org_forks_max = gr.Number(label="Forks Max", value=500)
            org_total_repos = gr.Number(label="Total Repositories", value=10, step=10)
            org_sort_order = gr.Dropdown(
                choices=["stars", "forks", "updated"],
                label="Sort Order",
                value="stars"
            )
        with gr.Row():
            org_output_data = gr.Dataframe(label="Repositories by Organizations")
            org_output_file = gr.File(label="Download CSV", file_count="single")
        gr.Button("Fetch Organization Repositories").click(
            fetch_org_repositories,
            inputs=[
                org_input, org_language_filter, org_stars_min, org_stars_max,
                org_forks_min, org_forks_max, org_sort_order, org_total_repos
            ],
            outputs=[org_output_data, org_output_file]
        )

    # Tab 3: Code Analysis
    with gr.Tab("Code Analysis"):
        with gr.Row():
            repo_dropdown = gr.Dropdown(
                label="Select Repository",
                choices=[],
                interactive=True
            )
            refresh_button = gr.Button("Refresh Repositories")
        with gr.Row():
            branch_dropdown = gr.Dropdown(
                label="Select Branch",
                choices=[],
                interactive=True
            )
        with gr.Row():
            keywords_output = gr.Textbox(label="Keywords")
            entities_output = gr.Textbox(label="Entities")
        with gr.Row():
            summary_output = gr.Textbox(label="Summary")
            wordcloud_output = gr.Plot(label="Word Cloud")  # Use Plot instead of Image
        # Components for browsing repository files
        with gr.Row():
            files_list = gr.Dropdown(
                label="Files in Repository",
                choices=[],
                interactive=True
            )
        with gr.Row():
            file_content_box = gr.Textbox(
                label="File Content",
                lines=20,
                interactive=True
            )
        with gr.Row():  # Combine question input and button in the same row
            question_input = gr.Textbox(
                label="Ask a Question",
                placeholder="Enter your question about the code...",
                lines=1
            )
            question_button = gr.Button("Get Answer")
        with gr.Row():
            answer_output = gr.Textbox(label="Bot's Answer", lines=10, interactive=False)
        # Diagram generation interface
        with gr.Row():
            diagram_type = gr.Dropdown(
                label="Select Diagram Type",
                choices=["Call Graph", "Data Flow Diagram", "Sequence Diagram", "Class Diagram", "Component Diagram", "Workflow Diagram"],
                value="Call Graph"
            )
            generate_diagram_button = gr.Button("Generate Diagram")
        with gr.Row():
            diagram_output = gr.Image(
                label="Generated Diagram",
                type="pil",  # Ensures compatibility with PIL.Image.Image
                elem_id="diagram_output",  # Add an ID for custom styling if needed
                interactive=False,  # No need for user interaction on the output
                show_label=True,
                height=600,  # Set a larger default height
                width=800,  # Set a larger default width
            )

        # Hook up the question button to ask_code_question
        question_button.click(
            ask_code_question,
            inputs=[file_content_box, question_input],  # Inputs: Code content and user question
            outputs=[answer_output]  # Output: Answer from LLM
        )

        # Callback to generate and render the diagram. handle_generate_diagram
        # looks this name up as a module global when the button is clicked.
        def generate_and_render_diagram(code_content, diagram_type):
            """Return the rendered PNG path for the code, or an error message."""
            # Generate DOT code
            dot_code = generate_dot_code_from_code(code_content, diagram_type)
            # Check for valid DOT code
            if not dot_code.strip().startswith("digraph"):
                return "Invalid DOT code generated."
            unique_filename = f"diagram_{uuid.uuid4().hex}"  # Generate a unique filename
            return render_dot_code(dot_code, filename=unique_filename)  # Render the diagram

        generate_diagram_button.click(
            handle_generate_diagram,
            inputs=[file_content_box, diagram_type],  # The file content is diagrammed, not the answer
            outputs=[diagram_output]  # Output: the opened image (or an error message)
        )

        # Refresh repository list
        refresh_button.click(
            lambda: gr.update(choices=get_discovered_repos()),
            inputs=[],
            outputs=[repo_dropdown]
        )

        # Update branch dropdown when a repository is selected
        def update_branches(repo):
            """List the repository's branches and preselect its default branch."""
            if repo:
                owner, repo_name = repo.split("/")
                branches = get_branches(owner, repo_name)
                default_branch = get_default_branch(owner, repo_name)
                return gr.update(choices=branches, value=default_branch)
            return gr.update(choices=[], value=None)

        repo_dropdown.change(
            update_branches,
            inputs=[repo_dropdown],
            outputs=[branch_dropdown]
        )

        # Analyze README content based on the selected repository and branch
        def analyze_readme(repo, branch):
            """Run the README analysis for the selected repository and branch."""
            if repo and branch:
                owner, repo_name = repo.split("/")  # Extract the owner and repo name.
                # Pass branch to analyze specific README
                return process_readme(owner, repo_name, branch)
            return "No repository or branch selected.", "", "", None

        # NOTE(review): on a repository change this handler receives whatever
        # value the branch dropdown holds when the event fires, which may still
        # belong to the previously selected repository — confirm the intended order.
        repo_dropdown.change(
            analyze_readme,
            inputs=[repo_dropdown, branch_dropdown],
            outputs=[keywords_output, entities_output, summary_output, wordcloud_output]
        )
        branch_dropdown.change(
            analyze_readme,  # Function to call when branch changes
            inputs=[repo_dropdown, branch_dropdown],  # Pass both repo and branch as inputs
            outputs=[keywords_output, entities_output, summary_output, wordcloud_output]  # Update outputs
        )

        # Fetch files in the selected repository
        def update_files(repo):
            """Refresh the file dropdown with the files of the selected repository."""
            global files_data  # To store fetched files for later use
            if repo:
                owner, repo_name = repo.split("/")  # Extract owner and repo
                print("Selected repository:", repo)
                # Keep only real file entries so a failure value from
                # fetch_files cannot break the label building below.
                files = [entry for entry in fetch_files(owner, repo_name) if isinstance(entry, dict)]
                files_data = files  # Store the fetched files for later use
                file_names = [f"{file['name']} ({file['path']})" for file in files]  # Prepare dropdown labels
                print("Fetched files:", files)  # Debugging to ensure files are fetched correctly
                print("File names for dropdown:", file_names)  # Debugging to ensure dropdown labels are created
                return gr.update(choices=file_names, value=None)  # Update the dropdown
            files_data = []  # Clear files_data if no repo is selected
            return gr.update(choices=[], value=None)

        repo_dropdown.change(
            update_files,
            inputs=[repo_dropdown],
            outputs=[files_list]  # Only the file dropdown is updated here
        )

        def _path_from_file_label(label):
            """Recover the repository path from a "name (path)" dropdown label."""
            if label.endswith(")"):
                # The path always ends with the file name, which identifies the
                # real separator even when the name itself contains " (".
                cut = label.find(" (")
                while cut != -1:
                    name, path = label[:cut], label[cut + 2:-1]
                    if path.rsplit("/", 1)[-1] == name:
                        return path
                    cut = label.find(" (", cut + 1)
            # Labels that do not follow the pattern fall back to the simple split.
            return label.split(" (")[1][:-1]

        # Fetch and display file content
        def display_file_content(repo, branch, selected_file):
            """Return the content of the file picked in the dropdown."""
            if repo and branch and selected_file:
                owner, repo_name = repo.split("/")
                file_path = _path_from_file_label(selected_file)
                return fetch_file_content(owner, repo_name, branch, file_path)
            return "No file selected."

        files_list.change(
            display_file_content,
            inputs=[repo_dropdown, branch_dropdown, files_list],
            outputs=[file_content_box]
        )

demo.launch()