# GitInsight / app.py
# PuristanLabs1's picture
# Update app.py
# 2f80c00 verified
# raw
# history blame
# 26.5 kB
import requests
import pandas as pd
from datetime import datetime
import gradio as gr
import pickle
from sentence_transformers import SentenceTransformer, util
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import base64
from io import BytesIO
import json
from openai import OpenAI
from graphviz import Source
import re
from PIL import Image
import os
import uuid
# Directory that rendered diagram images are written to; created at import time.
IMAGE_DIR = "./images"
os.makedirs(IMAGE_DIR, exist_ok=True)

# GitHub search endpoint and the bearer token sent with GitHub requests.
GITHUB_API_URL = "https://api.github.com/search/repositories"
ACCESS_TOKEN = os.environ.get("github_pat")
if not ACCESS_TOKEN:
    raise ValueError("Missing GitHub Personal Access Token.")
HEADERS = {"Authorization": "Bearer " + ACCESS_TOKEN}

# OpenAI client used by the README and code-analysis helpers.
OPENAI_API_KEY = os.environ.get("openai_key")
if not OPENAI_API_KEY:
    raise ValueError("Missing OpenAI API Key. Please set it as a secret in Hugging Face.")
client = OpenAI(api_key=OPENAI_API_KEY)

# Only files with these suffixes are listed by fetch_files().
ALLOWED_EXTENSIONS = [".py", ".js", ".md", ".toml", ".yaml"]

# Topic names and their embeddings, loaded from a pickle shipped next to the app.
with open("github_topics_embeddings.pkl", "rb") as f:
    topic_data = pickle.load(f)
topics, embeddings = topic_data["topics"], topic_data["embeddings"]

# "owner/name" strings collected by the discovery tab (module-wide state).
discovered_repos = []
_EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
_embedding_model = None


def _get_embedding_model():
    """Return the shared SentenceTransformer, loading it on first use only."""
    global _embedding_model
    if _embedding_model is None:
        _embedding_model = SentenceTransformer(_EMBEDDING_MODEL_NAME)
    return _embedding_model


def search_similar_topics(input_text):
    """Suggest the ten known topics most similar to the user's text.

    Args:
        input_text: Free text typed by the user; ``None`` and blank strings
            are treated as "nothing entered".

    Returns:
        A comma-separated string of topic names, a prompt to enter topics
        when the input is empty, or an error message if anything fails.
    """
    if not input_text or not input_text.strip():
        return "Enter topics to see suggestions."
    try:
        # The model is cached: re-instantiating it on every click reloads
        # the weights each time.
        model = _get_embedding_model()
        query_embedding = model.encode(input_text, convert_to_tensor=True)
        similarities = util.pytorch_cos_sim(query_embedding, embeddings)
        top_indices = similarities[0].argsort(descending=True)[:10]
        return ", ".join(topics[int(i)] for i in top_indices)
    except Exception as e:
        return f"Error in generating suggestions: {str(e)}"
def search_repositories(query, sort="stars", order="desc", total_repos=10):
    """Run a GitHub repository search and return up to ``total_repos`` items.

    Args:
        query: Search string in GitHub search-qualifier syntax.
        sort: Field to sort by (sent as the ``sort`` query parameter).
        order: Sort direction (sent as the ``order`` query parameter).
        total_repos: Maximum number of repositories wanted. Floats (as UI
            number fields may supply) are truncated to int; values <= 0
            yield an empty list.

    Returns:
        A list of repository dicts taken from the ``items`` field of each
        response page, at most ``total_repos`` long.

    Raises:
        Exception: If GitHub answers with a non-200 status code.
    """
    max_search_results = 1000  # GitHub serves at most 1,000 results per search
    max_page_size = 100  # largest per_page value the API accepts

    total_repos = min(int(total_repos), max_search_results)
    if total_repos <= 0:
        # Avoids the 0 // 0 division the page computation would otherwise hit.
        return []

    per_page = min(total_repos, max_page_size)
    total_pages = -(-total_repos // per_page)  # ceiling division

    all_repos = []
    for page in range(1, total_pages + 1):
        params = {
            "q": query,
            "sort": sort,
            "order": order,
            "per_page": per_page,
            "page": page,
        }
        # A timeout keeps a stalled connection from hanging the UI request.
        response = requests.get(GITHUB_API_URL, headers=HEADERS, params=params, timeout=30)
        if response.status_code != 200:
            raise Exception(f"GitHub API error: {response.status_code} {response.text}")
        items = response.json().get("items", [])
        if not items:
            break
        all_repos.extend(items)
        if len(all_repos) >= total_repos:
            break
    return all_repos[:total_repos]
def calculate_additional_metrics(repo):
    """Derive popularity and activity metrics for one GitHub search result.

    Args:
        repo: Repository dict from the search API. Required keys read here:
            ``created_at``, ``updated_at``, ``pushed_at``,
            ``stargazers_count``, ``forks_count``, ``open_issues_count``,
            ``owner["login"]``, ``name`` and ``html_url``. ``watchers_count``,
            ``language`` and ``topics`` are optional.

    Returns:
        A dict of display-column name -> value (scores, ratios, dates, URL).

    Raises:
        KeyError: If a required key is missing from ``repo``.
        ValueError: If a timestamp is not in ``%Y-%m-%dT%H:%M:%SZ`` form.
    """
    from datetime import timezone

    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    # One naive-UTC snapshot so both ages are measured against the same
    # instant (equivalent to the deprecated datetime.utcnow()).
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    created_date = datetime.strptime(repo["created_at"], timestamp_format)
    updated_date = datetime.strptime(repo["updated_at"], timestamp_format)
    # Clamp at zero: a timestamp slightly ahead of the local clock would
    # otherwise give -1 days and a division by zero in the rising score.
    days_since_creation = max((now - created_date).days, 0)
    days_since_update = max((now - updated_date).days, 0)

    stars = repo["stargazers_count"]
    forks = repo["forks_count"]
    open_issues = repo["open_issues_count"]
    search_watchers = repo.get("watchers_count", 0)

    star_velocity = stars / days_since_creation if days_since_creation > 0 else 0
    fork_to_star_ratio = (forks / stars * 100) if stars > 0 else 0
    hidden_gem = "Yes" if stars < 500 and forks < 50 else "No"
    hidden_gem_trend = "Rising" if star_velocity > 1 else "Stable"
    rising_score = (
        (star_velocity * 10)
        + (forks * 0.2)
        + (search_watchers * 0.3)
        + (1 / (days_since_update + 1) * 20)
        - (open_issues * 0.01)
    )
    legacy_score = (
        (stars * 0.6)
        + (forks * 0.3)
        + (search_watchers * 0.1)
        - (open_issues * 0.05)
    )

    # The "Watchers" column comes from subscribers_count on the repository
    # detail endpoint. The lookup is best effort: any failure reports 0.
    owner, repo_name = repo["owner"]["login"], repo["name"]
    repo_details_url = f"https://api.github.com/repos/{owner}/{repo_name}"
    actual_watchers = 0
    try:
        response = requests.get(repo_details_url, headers=HEADERS, timeout=30)
        if response.status_code == 200:
            actual_watchers = response.json().get("subscribers_count", 0)
    except requests.RequestException as e:
        print(f"Could not fetch watcher count for {owner}/{repo_name}: {e}")
    watcher_to_stars_ratio = (actual_watchers / stars) * 100 if stars > 0 else 0

    return {
        "Rising Score": round(rising_score, 2),
        "Legacy Score": round(legacy_score, 2),
        "Star Velocity (Stars/Day)": round(star_velocity, 2),
        "Fork-to-Star Ratio (%)": round(fork_to_star_ratio, 2),
        "Watchers": actual_watchers,
        "Watcher-to-Stars Ratio (%)": round(watcher_to_stars_ratio, 2),
        # `or` rather than a .get() default: the key can be present with a
        # null value, in which case the default would never be used.
        "Language": repo.get("language") or "N/A",
        "Topics": ", ".join(repo.get("topics") or []),
        "Hidden Gem": hidden_gem,
        "Hidden Gem Trend": hidden_gem_trend,
        "Open Issues": open_issues,
        "Created At": repo["created_at"],
        # NOTE(review): this column shows pushed_at while days_since_update
        # is computed from updated_at - confirm that the mix is intended.
        "Last Updated": repo["pushed_at"],
        "days_since_creation": days_since_creation,
        "days_since_update": days_since_update,
        "URL": repo["html_url"],
    }
def _discovery_row(repo):
    """Build one result-table row (basic fields plus metrics) for a repo."""
    row = {
        "Name": repo["name"],
        "Owner": repo["owner"]["login"],
        "Stars": repo["stargazers_count"],
        "Forks": repo["forks_count"],
        # The description key can be present but null, so use `or`.
        "Description": repo.get("description") or "N/A",
    }
    row.update(calculate_additional_metrics(repo))
    return row


def gradio_interface(topics, start_date, language_filter, stars_min, stars_max, forks_min, forks_max, total_repos, sort_order):
    """Search GitHub by topic and/or creation date and tabulate the results.

    Args:
        topics: Comma-separated topic names; may be empty or ``None``.
        start_date: ``YYYY-MM-DD`` lower bound for creation date; may be
            empty or ``None``.
        language_filter: Language name, or a falsy value for no filter.
        stars_min, stars_max: Inclusive star-count bounds.
        forks_min, forks_max: Inclusive fork-count bounds.
        total_repos: Maximum repositories to fetch per query.
        sort_order: Sort field passed to ``search_repositories``.

    Returns:
        ``(DataFrame, csv_path)`` on success. On failure, an empty DataFrame
        and a message string.
        NOTE(review): in the UI the second value feeds a ``gr.File`` output,
        which will not display a plain message - confirm intended handling.

    Side effects:
        Extends the module-level ``discovered_repos`` with every
        ``owner/name`` found and writes ``discovered_repositories.csv``.
    """
    global discovered_repos
    topics = topics or ""
    start_date = (start_date or "").strip()
    if not topics.strip() and not start_date:
        return pd.DataFrame(), "Please provide at least a topic or a start date."

    topics_list = [topic.strip() for topic in topics.split(",") if topic.strip()]
    try:
        # int() keeps float UI values (10.0) out of the range qualifiers.
        filters = (
            f"stars:{int(stars_min)}..{int(stars_max)} "
            f"forks:{int(forks_min)}..{int(forks_max)}"
        )
        if start_date:
            filters += f" created:>{start_date}"
        if language_filter:
            filters += f" language:{language_filter}"
        # One query per topic, or a single date-only query without topics.
        queries = [f"topic:{topic} {filters}" for topic in topics_list] or [filters]

        all_repos_data = []
        found = []
        for query in queries:
            repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos)
            for repo in repos:
                all_repos_data.append(_discovery_row(repo))
                # Registered for date-only searches too, so those results
                # also reach the Code Analysis repository list.
                found.append(f"{repo['owner']['login']}/{repo['name']}")

        if not all_repos_data:
            return pd.DataFrame(), "No repositories found matching the criteria."
        # Order-preserving de-duplication.
        discovered_repos = list(dict.fromkeys(discovered_repos + found))
        df = pd.DataFrame(all_repos_data)
    except Exception as e:
        print(f"Error: {e}")
        return pd.DataFrame(), f"Error fetching repositories: {str(e)}"

    csv_file = None
    if not df.empty:
        csv_file = "discovered_repositories.csv"
        df.to_csv(csv_file, index=False)
    return df, csv_file
def _org_repo_row(repo):
    """Build one result-table row (basic fields plus metrics) for a repo."""
    row = {
        "Name": repo["name"],
        "Owner": repo["owner"]["login"],
        "Stars": repo["stargazers_count"],
        "Forks": repo["forks_count"],
        # The description key can be present but null, so use `or`.
        "Description": repo.get("description") or "N/A",
    }
    row.update(calculate_additional_metrics(repo))
    return row


def fetch_org_repositories(org_names, language_filter, stars_min, stars_max, forks_min, forks_max, sort_order, total_repos):
    """Search repositories owned by the given organizations/users.

    Args:
        org_names: Comma-separated account names; empty or ``None`` is
            reported as "Enter at least one organization."
        language_filter: Language name, or a falsy value for no filter.
        stars_min, stars_max: Inclusive star-count bounds.
        forks_min, forks_max: Inclusive fork-count bounds.
        sort_order: Sort field passed to ``search_repositories``.
        total_repos: Maximum repositories to fetch per organization.

    Returns:
        ``(DataFrame, csv_path)`` on success (the CSV is written to
        ``organization_repositories.csv``). Otherwise an empty DataFrame
        and a message string. No exception escapes this function.
    """
    try:
        org_list = [org.strip() for org in (org_names or "").split(",") if org.strip()]
        if not org_list:
            return pd.DataFrame(), "Enter at least one organization."

        # int() keeps float UI values (10.0) out of the range qualifiers.
        filters = (
            f"stars:{int(stars_min)}..{int(stars_max)} "
            f"forks:{int(forks_min)}..{int(forks_max)}"
        )
        if language_filter:
            filters += f" language:{language_filter}"

        all_repos_data = []
        for org in org_list:
            repos = search_repositories(
                query=f"user:{org} {filters}", sort=sort_order, total_repos=total_repos
            )
            all_repos_data.extend(_org_repo_row(repo) for repo in repos)

        if not all_repos_data:
            return pd.DataFrame(), "No repositories found for the specified organizations."
        df = pd.DataFrame(all_repos_data)
        csv_file = "organization_repositories.csv"
        df.to_csv(csv_file, index=False)
        return df, csv_file
    except Exception as e:
        print(f"Error in fetch_org_repositories: {e}")
        return pd.DataFrame(), f"Error: {str(e)}"
def get_discovered_repos():
    """Return the module-level list of discovered ``owner/name`` strings."""
    # Reading a module global needs no `global` declaration.
    return discovered_repos
def process_readme(owner, repo, branch):
    """Fetch a repository's README.md and summarize it with the LLM.

    Args:
        owner: Account that owns the repository.
        repo: Repository name.
        branch: Branch to read ``README.md`` from.

    Returns:
        ``(keywords, entities, summary, figure)``. ``keywords`` and
        ``entities`` are comma-separated strings. ``figure`` is a matplotlib
        ``Figure`` holding a word cloud of the keywords, or ``None`` when
        there are no keywords. If the README cannot be downloaded, the first
        element is an error message, the next two are empty strings and the
        figure is ``None``.
    """
    from matplotlib.figure import Figure

    url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/README.md"
    response = requests.get(url, headers=HEADERS, timeout=30)
    if response.status_code != 200:
        return f"Failed to fetch README content from branch {branch}.", "", "", None
    readme_content = response.text

    MODEL = "gpt-4o-mini"
    completion = client.chat.completions.create(
        model=MODEL,
        messages=[
            {"role": "system", "content": "You are a helpful assistant that extracts keywords, named entities, and generates summaries from text."},
            {"role": "user", "content": f"""
Perform the following tasks on the following README file:
1. Extract the top 25 most important keywords from the text only.
2. Extract All Major named entities (e.g., people, organizations, technologies).
3. Summarize the content in one paragraph.
Return the results in the following JSON format:
{{
"keywords": ["keyword1", "keyword2", ...],
"entities": ["entity1", "entity2", ...],
"summary": "A concise summary of the README."
}}
README file:
{readme_content}
"""}
        ],
        response_format={"type": "json_object"}
    )
    result_json = json.loads(completion.choices[0].message.content)

    # The model may omit a field or return non-string items, so tolerate
    # both rather than failing with KeyError/TypeError.
    keywords = ", ".join(str(item) for item in result_json.get("keywords") or [])
    entities = ", ".join(str(item) for item in result_json.get("entities") or [])
    summary = result_json.get("summary") or ""

    if not keywords.strip():
        # WordCloud cannot build an image from empty text.
        return keywords, entities, summary, None

    wordcloud = WordCloud(width=800, height=400, background_color='white').generate(keywords)
    # A standalone Figure, not the global pyplot state: nothing accumulates
    # in pyplot's figure registry and concurrent requests cannot draw onto
    # each other's figure.
    figure = Figure(figsize=(10, 5))
    axes = figure.subplots()
    axes.imshow(wordcloud, interpolation='bilinear')
    axes.axis('off')
    return keywords, entities, summary, figure
def get_branches(owner, repo):
    """Return the branch names GitHub lists for ``owner/repo``.

    An empty list is returned when the API does not answer with status 200.
    """
    response = requests.get(
        f"https://api.github.com/repos/{owner}/{repo}/branches", headers=HEADERS
    )
    if response.status_code != 200:
        return []
    return [entry["name"] for entry in response.json()]
def get_default_branch(owner, repo):
    """Return the ``default_branch`` of ``owner/repo``.

    ``None`` is returned when the API does not answer with status 200.
    """
    response = requests.get(
        f"https://api.github.com/repos/{owner}/{repo}", headers=HEADERS
    )
    if response.status_code != 200:
        return None
    return response.json()["default_branch"]
def fetch_files(owner, repo, path=""):
    """Recursively list files with an allowed extension in a repository.

    Args:
        owner: Account that owns the repository.
        repo: Repository name.
        path: Directory inside the repository to start from; empty for the
            repository root.

    Returns:
        A list of dicts with ``name``, ``path`` and ``download_url`` keys.
        A directory that cannot be listed contributes no entries, so the
        result is always a list of dicts. The earlier ``(message, [])``
        failure value was spliced into the results by the recursion and
        broke callers that index each entry.
    """
    url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}" if path else f"https://api.github.com/repos/{owner}/{repo}/contents"
    response = requests.get(url, headers=HEADERS, timeout=30)
    if response.status_code != 200:
        print(f"Failed to fetch files: {response.status_code}")
        return []

    listing = response.json()
    if isinstance(listing, dict):
        # A path that names a single file is answered with one object
        # instead of a list.
        listing = [listing]

    allowed_suffixes = tuple(ALLOWED_EXTENSIONS)
    files = []
    for item in listing:
        if item["type"] == "file":
            if item["name"].endswith(allowed_suffixes):
                files.append({
                    "name": item["name"],
                    "path": item["path"],
                    "download_url": item["download_url"],
                })
        elif item["type"] == "dir":
            files.extend(fetch_files(owner, repo, item["path"]))
    return files
def fetch_file_content(owner, repo, branch, file_path):
    """Download one file's text from raw.githubusercontent.com.

    Returns the response body on status 200, otherwise a message that
    includes the status code.
    """
    response = requests.get(
        f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{file_path}"
    )
    if response.status_code != 200:
        return f"Failed to fetch file content: {response.status_code}"
    return response.text
def ask_code_question(code_content, question):
    """Ask the LLM a question about a piece of source text.

    Returns the model's stripped answer, a fixed hint when either input is
    blank, or an error message if the API call raises.
    """
    if code_content.strip() == "":
        return "No code content available to analyze."
    if question.strip() == "":
        return "Please enter a question about the code."

    prompt = f"""
Here is a Python file from a GitHub repository:
{code_content}
Please answer the following question about this file:
- {question}
"""
    chat_messages = [
        {"role": "system", "content": "You are a helpful assistant skilled in understanding code."},
        {"role": "user", "content": prompt},
    ]
    try:
        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=chat_messages,
        )
        answer = completion.choices[0].message.content
        return answer.strip()
    except Exception as e:
        return f"Error querying the LLM: {str(e)}"
def _extract_digraph(text):
    """Return the first brace-balanced ``digraph ... { ... }`` in *text*, or None."""
    # Require the opening brace right after an optional graph id, so the
    # word "digraph" in surrounding prose is skipped.
    header = re.search(r'\bdigraph\b\s*(?:"[^"]*"|[\w.]+)?\s*\{', text)
    if header is None:
        return None
    depth = 0
    in_string = False
    escaped = False
    for pos in range(header.start(), len(text)):
        ch = text[pos]
        if in_string:
            # Braces inside quoted labels do not affect nesting.
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return text[header.start():pos + 1]
    return None


def generate_dot_code_from_code(code_content, diagram_type):
    """Ask the LLM for a Graphviz ``digraph`` describing the given code.

    Args:
        code_content: Source text to visualize; ``None`` or blank is rejected.
        diagram_type: Human-readable diagram kind inserted into the prompt.

    Returns:
        The extracted DOT text (starting with ``digraph``), or one of the
        messages "No code content available to analyze.", "Failed to
        extract valid Graphviz code." or "Error querying <model>: ...".
    """
    if not code_content or not code_content.strip():
        return "No code content available to analyze."

    model = "gpt-4o"
    # `{{}}` renders as a literal pair of braces in the prompt.
    prompt = f"""
Here is some Python code from a GitHub repository:
{code_content}
Please generate a {diagram_type} for this code in Graphviz DOT/digraph format. Ensure the DOT code is valid and renderable.
Don't include any other text. Don't provide any other explanatory commentary.
Ensure the DOT code includes all necessary opening and closing brackets {{}} for graphs and subgraphs.
"""
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that generates Graphviz DOT code for visualizing Python code. You are restricted to only generate Graphviz Code starting with digraph & ending with }"},
                {"role": "user", "content": prompt}
            ]
        )
        raw_dot_code = response.choices[0].message.content.strip()
        validated_dot_code = validate_and_fix_dot_code(raw_dot_code)  # append missing closers
        # Brace matching instead of "first line starting with }": works when
        # the closer is indented or on the same line, does not stop at a
        # nested closer in column 0, and drops Markdown fences around it.
        extracted = _extract_digraph(validated_dot_code)
        if extracted is None:
            return "Failed to extract valid Graphviz code."
        return extracted
    except Exception as e:
        return f"Error querying {model}: {str(e)}"
def validate_and_fix_dot_code(dot_code):
    """Append as many ``}`` as needed so closers are not outnumbered by ``{``.

    Text that already has at least as many ``}`` as ``{`` is returned
    unchanged. Braces are counted as plain characters.
    """
    deficit = dot_code.count("{") - dot_code.count("}")
    if deficit <= 0:
        return dot_code
    return dot_code + "}" * deficit
def render_dot_code(dot_code, filename=None):
    """Render DOT source to ``<IMAGE_DIR>/<filename>.png``.

    Args:
        dot_code: Graphviz DOT source text.
        filename: Base name of the image without extension. A unique name
            is generated when omitted (previously this produced
            ``None.png``).

    Returns:
        The web path ``/images/<filename>.png`` on success, or a string
        starting with "Error rendering diagram:" if rendering raises.
    """
    if filename is None:
        filename = f"diagram_{uuid.uuid4().hex}"
    try:
        src = Source(dot_code, format="png")
        # graphviz appends ".<format>" to the source file name it is given,
        # so pass the bare base name. Passing "<name>.png" produced
        # "<name>.png.png" on disk.
        src.render(filename=filename, directory=IMAGE_DIR, cleanup=True)
        # Return the web-accessible path
        return f"/images/{filename}.png"
    except Exception as e:
        return f"Error rendering diagram: {str(e)}"
import time
def handle_generate_diagram(code_content, diagram_type, retries=5, wait_time=1):
    """Generate a diagram image and return a path the Image output can load.

    Args:
        code_content: Source text to visualize.
        diagram_type: Diagram kind forwarded to the generator.
        retries: How many times to look for the rendered file.
        wait_time: Seconds to sleep between looks.

    Returns:
        The local filesystem path of the rendered PNG, or ``None`` when
        generation failed (the reason is printed). The web-style path
        ``/images/<name>.png`` is not returned, because it does not name a
        file on disk and cannot be loaded by a ``type="filepath"`` image.
    """
    result = generate_and_render_diagram(code_content, diagram_type)
    print(f"Generated image path: {result}")  # Debugging print

    if not isinstance(result, str) or not result.endswith(".png"):
        # The generator reports problems as plain messages. Waiting for a
        # file that was never written is pointless.
        print(f"Failed to generate image: {result}")
        return None

    # Map the web path onto the directory the renderer writes into.
    local_path = os.path.join(IMAGE_DIR, os.path.basename(result))
    # Second candidate: graphviz appends ".png" again if the renderer was
    # handed a base name that already ended in ".png".
    candidates = (local_path, local_path + ".png")

    for attempt in range(retries):
        for candidate in candidates:
            if os.path.exists(candidate):
                print(f"Image found: {candidate}")
                return candidate
        print(f"Image not found. Retrying... ({attempt + 1}/{retries})")
        time.sleep(wait_time)

    # If the image is still not found after retries
    print(f"Failed to generate image after {retries} retries: {local_path}")
    return None
# Gradio Interface
# NOTE(review): the source this was rebuilt from had lost its indentation.
# Which widgets sit inside which gr.Row() is a reconstruction - confirm the
# layout against the running app.


def _path_from_file_label(label):
    """Recover the repository path from a "name (path)" dropdown label.

    The split point is the first " (" for which the path part ends with the
    name part, so names or directories containing " (" are handled.
    Returns None when the label has no such split.
    """
    body = label[:-1] if label.endswith(")") else label
    cut = body.find(" (")
    while cut != -1:
        name, path = body[:cut], body[cut + 2:]
        if path == name or path.endswith("/" + name):
            return path
        cut = body.find(" (", cut + 1)
    return None


with gr.Blocks() as demo:
    # Tab 1: Repository Discovery
    with gr.Tab("Repository Discovery"):
        with gr.Row():
            topics_input = gr.Textbox(
                label="Topics (comma-separated, leave empty to fetch by date only)",
                placeholder="e.g., machine-learning, deep-learning (leave empty for date-based search)"
            )
            similar_topics = gr.Textbox(
                label="Similar Topics (based on embeddings)",
                interactive=False
            )
            gr.Button("Get Similar Topics").click(
                search_similar_topics,
                inputs=[topics_input],
                outputs=[similar_topics]
            )
        with gr.Row():
            start_date_input = gr.Textbox(
                label="Start Date (YYYY-MM-DD, leave empty if not filtering by date)",
                placeholder="Set to filter recent repositories by date or leave empty"
            )
            language_filter = gr.Dropdown(
                choices=["", "Python", "JavaScript", "Java", "C++", "Ruby", "Go"],
                label="Language Filter",
                value=""
            )
            stars_min = gr.Number(label="Stars Min", value=10)
            stars_max = gr.Number(label="Stars Max", value=1000)
        with gr.Row():
            forks_min = gr.Number(label="Forks Min", value=0)
            forks_max = gr.Number(label="Forks Max", value=500)
            total_repos = gr.Number(label="Total Repositories", value=10, step=10)
            sort_order = gr.Dropdown(
                choices=["stars", "forks", "updated"],
                label="Sort Order",
                value="stars"
            )
        with gr.Row():
            output_data = gr.Dataframe(label="Discovered Repositories")
            output_file = gr.File(label="Download CSV", file_count="single")
        gr.Button("Discover Repositories").click(
            gradio_interface,
            inputs=[
                topics_input, start_date_input, language_filter, stars_min, stars_max,
                forks_min, forks_max, total_repos, sort_order
            ],
            outputs=[output_data, output_file]
        )

    # Tab 2: Organization Watch
    # The filter variables below rebind the names used in tab 1. That is
    # safe because tab 1's listener was registered with the earlier objects.
    with gr.Tab("Organization Watch"):
        with gr.Row():
            org_input = gr.Textbox(
                label="Organizations (comma-separated)",
                placeholder="e.g., facebookresearch, openai"
            )
        with gr.Row():
            language_filter = gr.Dropdown(
                choices=["", "Python", "JavaScript", "Java", "C++", "Ruby", "Go"],
                label="Language Filter",
                value=""
            )
            stars_min = gr.Number(label="Stars Min", value=10)
            stars_max = gr.Number(label="Stars Max", value=1000)
        with gr.Row():
            forks_min = gr.Number(label="Forks Min", value=0)
            forks_max = gr.Number(label="Forks Max", value=500)
            total_repos = gr.Number(label="Total Repositories", value=10, step=10)
            sort_order = gr.Dropdown(
                choices=["stars", "forks", "updated"],
                label="Sort Order",
                value="stars"
            )
        with gr.Row():
            output_data = gr.Dataframe(label="Repositories by Organizations")
            output_file = gr.File(label="Download CSV", file_count="single")
        gr.Button("Fetch Organization Repositories").click(
            fetch_org_repositories,
            inputs=[
                org_input, language_filter, stars_min, stars_max, forks_min, forks_max,
                sort_order, total_repos
            ],
            outputs=[output_data, output_file]
        )

    # Tab 3: Code Analysis
    with gr.Tab("Code Analysis"):
        with gr.Row():
            repo_dropdown = gr.Dropdown(
                label="Select Repository",
                choices=[],
                interactive=True
            )
            refresh_button = gr.Button("Refresh Repositories")
        with gr.Row():
            branch_dropdown = gr.Dropdown(
                label="Select Branch",
                choices=[],
                interactive=True
            )
        with gr.Row():
            keywords_output = gr.Textbox(label="Keywords")
            entities_output = gr.Textbox(label="Entities")
        with gr.Row():
            summary_output = gr.Textbox(label="Summary")
            wordcloud_output = gr.Plot(label="Word Cloud")
        with gr.Row():
            files_list = gr.Dropdown(
                label="Files in Repository",
                choices=[],
                interactive=True
            )
        with gr.Row():
            file_content_box = gr.Textbox(
                label="File Content",
                lines=20,
                interactive=True
            )
        with gr.Row():
            question_input = gr.Textbox(
                label="Ask a Question",
                placeholder="Enter your question about the code...",
                lines=1
            )
            question_button = gr.Button("Get Answer")
        with gr.Row():
            answer_output = gr.Textbox(label="Bot's Answer", lines=10, interactive=False)
        with gr.Row():
            diagram_type = gr.Dropdown(
                label="Select Diagram Type",
                choices=["Call Graph", "Data Flow Diagram", "Sequence Diagram", "Class Diagram", "Component Diagram", "Workflow Diagram"],
                value="Call Graph"
            )
            generate_diagram_button = gr.Button("Generate Diagram")
        with gr.Row():
            diagram_output = gr.Image(
                label="Generated Diagram",
                type="filepath",
                elem_id="diagram_output",
                interactive=False,
                show_label=True,
                height=600,
                width=800,
            )

        question_button.click(
            ask_code_question,
            inputs=[file_content_box, question_input],
            outputs=[answer_output]
        )

        def generate_and_render_diagram(code_content, diagram_type):
            """Generate DOT for the code and render it; return a path or a message."""
            dot_code = generate_dot_code_from_code(code_content, diagram_type)
            if not dot_code.strip().startswith("digraph"):
                return "Invalid DOT code generated."
            unique_filename = f"diagram_{uuid.uuid4().hex}"
            return render_dot_code(dot_code, filename=unique_filename)

        generate_diagram_button.click(
            handle_generate_diagram,
            inputs=[file_content_box, diagram_type],
            outputs=[diagram_output]
        )

        refresh_button.click(
            lambda: gr.update(choices=get_discovered_repos()),
            inputs=[],
            outputs=[repo_dropdown]
        )

        def update_branches(repo):
            """Fill the branch dropdown for the selected "owner/name" repo."""
            if repo:
                owner, repo_name = repo.split("/")
                branches = get_branches(owner, repo_name)
                default_branch = get_default_branch(owner, repo_name)
                return gr.update(choices=branches, value=default_branch)
            return gr.update(choices=[], value=None)

        repo_dropdown.change(
            update_branches,
            inputs=[repo_dropdown],
            outputs=[branch_dropdown]
        )

        def analyze_readme(repo, branch):
            """Run the README analysis once both repo and branch are chosen."""
            if repo and branch:
                owner, repo_name = repo.split("/")
                return process_readme(owner, repo_name, branch)
            return "No repository or branch selected.", "", "", None

        # NOTE(review): this listener reads branch_dropdown's value at the
        # moment the repository changes, while update_branches (also bound to
        # this event) is what sets the branch - confirm whether the first
        # analysis can run against the previously selected branch.
        repo_dropdown.change(
            analyze_readme,
            inputs=[repo_dropdown, branch_dropdown],
            outputs=[keywords_output, entities_output, summary_output, wordcloud_output]
        )
        branch_dropdown.change(
            analyze_readme,
            inputs=[repo_dropdown, branch_dropdown],
            outputs=[keywords_output, entities_output, summary_output, wordcloud_output]
        )

        def update_files(repo):
            """List the selected repo's files in the files dropdown."""
            global files_data
            if repo:
                owner, repo_name = repo.split("/")
                fetched = fetch_files(owner, repo_name)
                # Keep only real file entries: a failed listing may come
                # back as a (message, []) pair, whose members cannot be
                # indexed by key.
                files = [entry for entry in fetched if isinstance(entry, dict)]
                files_data = files
                file_names = [f"{file['name']} ({file['path']})" for file in files]
                return gr.update(choices=file_names, value=None)
            files_data = []
            return gr.update(choices=[], value=None)

        repo_dropdown.change(
            update_files,
            inputs=[repo_dropdown],
            outputs=[files_list]
        )

        def display_file_content(repo, branch, selected_file):
            """Show the text of the file chosen in the files dropdown."""
            if repo and branch and selected_file:
                file_path = _path_from_file_label(selected_file)
                if file_path is None:
                    return "No file selected."
                owner, repo_name = repo.split("/")
                return fetch_file_content(owner, repo_name, branch, file_path)
            return "No file selected."

        files_list.change(
            display_file_content,
            inputs=[repo_dropdown, branch_dropdown, files_list],
            outputs=[file_content_box]
        )

# NOTE(review): confirm that the installed Gradio version's launch() accepts
# `static_dirs`; the keyword is kept exactly as found.
demo.launch(share=True, static_dirs={"images": "./images"})