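# GitInsight / PuristanLabs1 - commit db569a8 (verified), 31.3 kB
# (The original lines here were repository-viewer header text -- "picture",
# "history", "blame" -- kept as a comment so the file parses as Python.)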
import requests
import pandas as pd
from datetime import datetime
import gradio as gr
import pickle
from sentence_transformers import SentenceTransformer, util
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import base64
from io import BytesIO
import json
from openai import OpenAI
from graphviz import Source
import re
from PIL import Image
import os
import uuid
# Fixed directory where rendered diagram PNG images are saved.
IMAGE_DIR = "/content/images"
os.makedirs(IMAGE_DIR, exist_ok=True)  # Create the directory if it doesn't exist

# Constants for GitHub API
# Repository-search endpoint queried by search_repositories().
GITHUB_API_URL = "https://api.github.com/search/repositories"
ACCESS_TOKEN = os.getenv("github_pat")
if not ACCESS_TOKEN:
    # Fail fast only when the token is really absent.
    raise ValueError("Missing GitHub Personal Access Token.")
HEADERS = {"Authorization": f"Bearer {ACCESS_TOKEN}"}

# Access OpenAI API key from secrets
OPENAI_API_KEY = os.getenv("openai_key")
if not OPENAI_API_KEY:
    raise ValueError("Missing OpenAI API Key. Please set it as a secret in Hugging Face.")

# Initialize OpenAI client once
client = OpenAI(api_key=OPENAI_API_KEY)

# Global variable for allowed extensions
ALLOWED_EXTENSIONS = [".py", ".js", ".md", ".toml", ".yaml"]

# Load topic embeddings
# NOTE(security): pickle.load can execute arbitrary code; only ship a
# github_topics_embeddings.pkl that was produced by a trusted process.
with open("github_topics_embeddings.pkl", "rb") as f:
    topic_data = pickle.load(f)
topics = topic_data["topics"]
embeddings = topic_data["embeddings"]

discovered_repos = []  # Format: ["owner/repo_name", ...]
# Function to search for similar topics
_SENTENCE_MODEL = None  # SentenceTransformer instance, created on first use and then reused.


def search_similar_topics(input_text):
    """Suggest known GitHub topics that are semantically close to ``input_text``.

    The text is embedded with the ``all-MiniLM-L6-v2`` sentence-transformer and
    compared by cosine similarity against the module-level ``embeddings``; the
    ten best-scoring entries of the module-level ``topics`` are returned.

    Args:
        input_text: Free text typed by the user (may be blank or None).

    Returns:
        A comma-separated string of up to ten topic names, a prompt asking for
        input when ``input_text`` is blank, or an error message if anything
        fails while computing the suggestions.
    """
    global _SENTENCE_MODEL
    if not input_text or not input_text.strip():
        return "Enter topics to see suggestions."
    try:
        # Loading the model is expensive, so do it once rather than per request.
        if _SENTENCE_MODEL is None:
            _SENTENCE_MODEL = SentenceTransformer('all-MiniLM-L6-v2')
        query_embedding = _SENTENCE_MODEL.encode(input_text, convert_to_tensor=True)
        similarities = util.pytorch_cos_sim(query_embedding, embeddings)
        top_indices = similarities[0].argsort(descending=True)[:10]  # Top 10 matches
        return ", ".join(topics[int(i)] for i in top_indices)
    except Exception as e:
        return f"Error in generating suggestions: {str(e)}"
# Function to fetch repositories with pagination
def search_repositories(query, sort="stars", order="desc", total_repos=10):
    """Run a GitHub repository search and collect up to ``total_repos`` results.

    Args:
        query: Search string passed as the ``q`` parameter.
        sort: Value of the ``sort`` parameter.
        order: Value of the ``order`` parameter.
        total_repos: Maximum number of repositories to return. Floats (as
            produced by numeric UI widgets) are truncated to an integer.

    Returns:
        A list of at most ``total_repos`` repository dicts taken from the
        ``items`` field of each response page; empty when ``total_repos`` is
        not positive.

    Raises:
        Exception: If any page request returns a non-200 status code.
    """
    total_repos = int(total_repos)
    if total_repos <= 0:
        # Avoids a division by zero below and a pointless request.
        return []

    per_page = min(total_repos, 100)
    # Ceiling division: exactly the pages needed, never an extra one.
    total_pages = -(-total_repos // per_page)

    all_repos = []
    for page in range(1, total_pages + 1):
        params = {
            "q": query,
            "sort": sort,
            "order": order,
            "per_page": per_page,
            "page": page,
        }
        response = requests.get(GITHUB_API_URL, headers=HEADERS, params=params)
        print(f"Query: {query}, Status Code: {response.status_code}")
        if response.status_code != 200:
            raise Exception(f"GitHub API error: {response.status_code} {response.text}")
        items = response.json().get("items", [])
        if not items:
            break  # No more results for this query.
        all_repos.extend(items)
        if len(all_repos) >= total_repos:
            break
    return all_repos[:total_repos]
# Function to calculate additional metrics
def calculate_additional_metrics(repo):
    """Derive popularity and activity metrics for one search-result repository.

    Args:
        repo: Repository dict as returned by the GitHub search API. The keys
            read here are ``created_at``, ``updated_at``, ``pushed_at``,
            ``stargazers_count``, ``forks_count``, ``open_issues_count``,
            ``owner.login``, ``name``, ``html_url`` and, optionally,
            ``watchers_count``, ``language`` and ``topics``.

    Returns:
        A dict of display-ready metrics (scores, ratios, dates, URL).

    Note:
        Performs one extra GitHub request per repository to read
        ``subscribers_count``; when that request fails the watcher figures
        fall back to 0.
    """
    from datetime import timezone

    stars = repo["stargazers_count"]
    forks = repo["forks_count"]
    open_issues = repo["open_issues_count"]
    watchers = repo.get("watchers_count", 0)

    # Timestamps are parsed as naive UTC, so compare against naive UTC "now";
    # taken once so both ages share the same reference instant.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    created_date = datetime.strptime(repo["created_at"], "%Y-%m-%dT%H:%M:%SZ")
    updated_date = datetime.strptime(repo["updated_at"], "%Y-%m-%dT%H:%M:%SZ")
    days_since_creation = (now - created_date).days
    days_since_update = (now - updated_date).days

    star_velocity = stars / days_since_creation if days_since_creation > 0 else 0
    fork_to_star_ratio = (forks / stars * 100) if stars > 0 else 0
    hidden_gem = "Yes" if stars < 500 and forks < 50 else "No"
    hidden_gem_trend = "Rising" if star_velocity > 1 else "Stable"

    rising_score = ((star_velocity * 10) +
                    (forks * 0.2) +
                    (watchers * 0.3) +
                    (1 / (days_since_update + 1) * 20) -
                    (open_issues * 0.01))
    legacy_score = ((stars * 0.6) +
                    (forks * 0.3) +
                    (watchers * 0.1) -
                    (open_issues * 0.05))

    # The repository detail endpoint is queried for subscribers_count.
    owner, repo_name = repo["owner"]["login"], repo["name"]
    repo_details_url = f"https://api.github.com/repos/{owner}/{repo_name}"
    response = requests.get(repo_details_url, headers=HEADERS)
    if response.status_code == 200:
        repo_details = response.json()
        actual_watchers = repo_details.get("subscribers_count", 0)
    else:
        actual_watchers = 0
    watcher_to_stars_ratio = (actual_watchers / stars) * 100 if stars > 0 else 0

    return {
        "Rising Score": round(rising_score, 2),
        "Legacy Score": round(legacy_score, 2),
        "Star Velocity (Stars/Day)": round(star_velocity, 2),
        "Fork-to-Star Ratio (%)": round(fork_to_star_ratio, 2),
        "Watchers": actual_watchers,
        "Watcher-to-Stars Ratio (%)": round(watcher_to_stars_ratio, 2),
        "Language": repo.get("language", "N/A"),
        "Topics": ", ".join(repo.get("topics", [])),
        "Hidden Gem": hidden_gem,
        "Hidden Gem Trend": hidden_gem_trend,
        "Open Issues": open_issues,
        "Created At": repo["created_at"],
        "Last Updated": repo["pushed_at"],
        "days_since_creation": round(days_since_creation, 2),
        "days_since_update": round(days_since_update, 2),
        "URL": repo["html_url"],
    }
# Repository Discovery Interface
# Builds one GitHub search query per topic (or a single filter-only query when
# no topic is given), runs each through search_repositories(), and returns the
# collected rows as a DataFrame together with the name of a CSV export.
#
# Parameters:
#   topics          - comma-separated topic names; may be blank
#   start_date      - text appended to the query as "created:>{start_date}"; may be blank
#   language_filter - appended as "language:{language_filter}" when truthy
#   stars_min/max   - bounds of the "stars:MIN..MAX" qualifier
#   forks_min/max   - bounds of the "forks:MIN..MAX" qualifier
#   total_repos     - forwarded to search_repositories(total_repos=...)
#   sort_order      - forwarded to search_repositories(sort=...)
# Returns a 2-tuple: (DataFrame, csv_file) on the normal path, where csv_file is
# "discovered_repositories.csv" or None when the frame is empty; on validation
# or fetch errors it returns (empty DataFrame, message string).
#
# NOTE(review): this block appears to have lost lines (and all indentation):
# the `except` near the end has no visible `try:`, neither `repo_data = {`
# literal is closed, the `for topic in topics_list:` loop presumably belongs to
# an `else:` branch, and no visible statement appends to all_repos_data or
# discovered_repos. Restore those from the upstream file before relying on it.
def gradio_interface(topics, start_date, language_filter, stars_min, stars_max, forks_min, forks_max, total_repos, sort_order):
# discovered_repos is rebound below (de-duplication), hence the global declaration.
global discovered_repos
if not topics.strip() and not start_date.strip():
# If neither topics nor start_date are provided, return a validation error
return pd.DataFrame(), "Please provide at least a topic or a start date."
# Split the comma-separated input and drop empty entries.
topics_list = [topic.strip() for topic in topics.split(",") if topic.strip()]
stars_range = (stars_min, stars_max)
forks_range = (forks_min, forks_max)
df = pd.DataFrame()
all_repos_data = []
# If no topics are provided, fetch repositories by filters only
if not topics_list:
query = f"stars:{stars_range[0]}..{stars_range[1]} forks:{forks_range[0]}..{forks_range[1]}"
if start_date.strip():
query += f" created:>{start_date.strip()}"
if language_filter:
query += f" language:{language_filter}"
# Fetch repositories
repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos)
for repo in repos:
# One output row per repository.
# NOTE(review): the rest of this literal and the append step are not visible.
repo_data = {
"Name": repo["name"],
"Owner": repo["owner"]["login"],
"Stars": repo["stargazers_count"],
"Forks": repo["forks_count"],
"Description": repo.get("description", "N/A"),
# Topic-based search: same query shape, prefixed with "topic:{topic}".
for topic in topics_list:
# Construct query
query = f"topic:{topic} stars:{stars_range[0]}..{stars_range[1]} forks:{forks_range[0]}..{forks_range[1]}"
if start_date.strip():
query += f" created:>{start_date.strip()}"
if language_filter:
query += f" language:{language_filter}"
# Fetch repositories
repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos)
for repo in repos:
repo_data = {
"Name": repo["name"],
"Owner": repo["owner"]["login"],
"Stars": repo["stargazers_count"],
"Forks": repo["forks_count"],
"Description": repo.get("description", "N/A"),
#Add repository to discovered_repos
if not all_repos_data:
return pd.DataFrame(), "No repositories found matching the criteria."
# Remove duplicates from discovered_repos
# (set() does not preserve the original ordering of the list.)
discovered_repos = list(set(discovered_repos))
# Create DataFrame
df = pd.DataFrame(all_repos_data)
except Exception as e:
print(f"Error: {e}")
return pd.DataFrame(), f"Error fetching repositories: {str(e)}"
# Export to CSV only when there is something to write.
csv_file = None
if not df.empty:
csv_file = "discovered_repositories.csv"
df.to_csv(csv_file, index=False)
return df, csv_file
#return df, gr.File.update(visible=True, value=csv_file)
#Organization Watch Interface
# For each comma-separated organization name, searches GitHub with a
# "user:{org}" query constrained by star/fork ranges (and language when given),
# then returns the collected rows as a DataFrame plus the CSV file name
# "organization_repositories.csv".
#
# Returns a 2-tuple: (DataFrame, csv_file) on success, or
# (empty DataFrame, message string) when no organization is entered, nothing is
# found, or an exception is caught.
#
# NOTE(review): lines appear to be missing here as well: the `except` has no
# visible `try:`, the `repo_data = {` literal is never closed, and no visible
# statement appends repo_data to all_repos_data. Restore from the upstream file.
def fetch_org_repositories(org_names, language_filter, stars_min, stars_max, forks_min, forks_max, sort_order, total_repos):
# Split the comma-separated input and drop empty entries.
org_list = [org.strip() for org in org_names.split(",") if org.strip()]
if not org_list:
return pd.DataFrame(), "Enter at least one organization."
all_repos_data = []
for org in org_list:
# Query repositories for each organization
query = f"user:{org} stars:{stars_min}..{stars_max} forks:{forks_min}..{forks_max}"
if language_filter:
query += f" language:{language_filter}"
repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos)
for repo in repos:
# One output row per repository.
# NOTE(review): the rest of this literal and the append step are not visible.
repo_data = {
"Name": repo["name"],
"Owner": repo["owner"]["login"],
"Stars": repo["stargazers_count"],
"Forks": repo["forks_count"],
"Description": repo.get("description", "N/A"),
if not all_repos_data:
return pd.DataFrame(), "No repositories found for the specified organizations."
# Create DataFrame
df = pd.DataFrame(all_repos_data)
# The CSV is written to the current working directory and its name is returned.
csv_file = "organization_repositories.csv"
df.to_csv(csv_file, index=False)
return df, csv_file
except Exception as e:
print(f"Error in fetch_org_repositories: {e}")
return pd.DataFrame(), f"Error: {str(e)}"
# Function to fetch discovered repositories for the dropdown
def get_discovered_repos():
    """Return the module-level list of discovered "owner/repo_name" strings."""
    # Reading a module global needs no `global` declaration.
    return discovered_repos
# Downloads a repository README for the given branch, asks the OpenAI chat model
# for keywords, named entities and a summary (as JSON), and builds a word cloud
# from the keywords.
#
# Returns a 4-tuple (keywords, entities, summary, plt): keywords and entities
# are comma-joined strings, summary is a string, and the last item is the
# matplotlib.pyplot module holding the current figure. When the README request
# does not return status 200 the tuple is (failure message, "", "", None).
#
# NOTE(review): this block has lost text: the URL has no scheme/host or file
# name, the `else:` before the failure return is missing (as written the
# function would always return the failure tuple), `completion =` has no call
# expression, and the prompt's closing quotes, JSON braces and README content
# are not visible. Restore from the upstream file before relying on it.
def process_readme(owner, repo, branch):
# Fetch README content from the specified branch
#url = f"{owner}/{repo}/master/"
url = f"{owner}/{repo}/{branch}/"
response = requests.get(url, headers=HEADERS)
if response.status_code == 200:
readme_content = response.text
#return "Failed to fetch README content.", "", "", None
return f"Failed to fetch README content from branch {branch}.", "", "", None
# Process README content with OpenAI
MODEL = "gpt-4o-mini"
completion =
{"role": "system", "content": "You are a helpful assistant that extracts keywords, named entities, and generates summaries from text."},
{"role": "user", "content": f"""
Perform the following tasks on the following README file:
1. Extract the top 25 most important keywords from the text only.
2. Extract named entities (e.g., people, organizations, technologies).
3. Summarize the content in one paragraph.
Return the results in the following JSON format:
"keywords": ["keyword1", "keyword2", ...],
"entities": ["entity1", "entity2", ...],
"summary": "A concise summary of the README."
README file:
response_format={"type": "json_object"}
# The reply is parsed as JSON; the three keys below must be present or a
# KeyError propagates to the caller.
result = completion.choices[0].message.content
result_json = json.loads(result)
keywords = ", ".join(result_json["keywords"])
entities = ", ".join(result_json["entities"])
summary = result_json["summary"]
# Generate word cloud
wordcloud = WordCloud(width=800, height=400, background_color='white').generate(keywords)
# A new figure is created on every call and is not closed in this function.
plt.figure(figsize=(10, 5))
plt.imshow(wordcloud, interpolation='bilinear')
return keywords, entities, summary, plt
# Function to get all branches of a repository
def get_branches(owner, repo):
    """List the branch names of ``owner/repo`` via the GitHub REST API.

    Args:
        owner: Account or organization that owns the repository.
        repo: Repository name.

    Returns:
        A list of branch-name strings, or an empty list when the request does
        not return status 200.
    """
    # An absolute URL is required; a bare "owner/repo/branches" is rejected by requests.
    url = f"https://api.github.com/repos/{owner}/{repo}/branches"
    response = requests.get(url, headers=HEADERS)
    if response.status_code != 200:
        return []
    return [branch["name"] for branch in response.json()]
# Function to get the default branch of a repository
def get_default_branch(owner, repo):
    """Return the default branch name of ``owner/repo``.

    Args:
        owner: Account or organization that owns the repository.
        repo: Repository name.

    Returns:
        The ``default_branch`` value from the repository details, or ``None``
        when the request does not return status 200 or the field is absent.
    """
    # An absolute URL is required; a bare "owner/repo" is rejected by requests.
    url = f"https://api.github.com/repos/{owner}/{repo}"
    response = requests.get(url, headers=HEADERS)
    if response.status_code != 200:
        return None
    return response.json().get("default_branch")
def fetch_files(owner, repo, path=""):
    """Recursively list repository files whose extension is in ALLOWED_EXTENSIONS.

    Args:
        owner: Account or organization that owns the repository.
        repo: Repository name.
        path: Directory inside the repository to list; "" means the root.

    Returns:
        On success, a list of dicts with ``name``, ``path`` and
        ``download_url`` keys. If the listing request for ``path`` itself
        fails, the tuple ``(error message, [])`` is returned instead (kept for
        compatibility); failures in sub-directories are skipped so they never
        end up mixed into the file list.
    """
    # Base URL for the GitHub API
    base_url = f"https://api.github.com/repos/{owner}/{repo}/contents"
    url = f"{base_url}/{path}" if path else base_url
    response = requests.get(url, headers=HEADERS)
    if response.status_code != 200:
        return f"Failed to fetch files: {response.status_code}", []

    allowed = tuple(ALLOWED_EXTENSIONS)  # str.endswith accepts a tuple of suffixes
    files = []
    for item in response.json():
        if item["type"] == "file":  # Only add files
            if item["name"].endswith(allowed):
                files.append({
                    "name": item["name"],
                    "path": item["path"],
                    "download_url": item["download_url"],
                })
        elif item["type"] == "dir":
            # Recursively fetch files in subdirectories
            sub_files = fetch_files(owner, repo, item["path"])
            # A failed sub-listing comes back as a tuple, not a list of files.
            if isinstance(sub_files, list):
                files.extend(sub_files)
    return files
# Function to fetch the content of a specific file
def fetch_file_content(owner, repo, branch, file_path):
    """Download the raw text of one file from a repository branch.

    Args:
        owner: Account or organization that owns the repository.
        repo: Repository name.
        branch: Branch to read from.
        file_path: Path of the file inside the repository.

    Returns:
        The file's text on status 200, otherwise a
        "Failed to fetch file content: <status>" message.
    """
    # An absolute URL is required; a bare "owner/repo/branch/path" is rejected
    # by requests. The request is sent without the GitHub auth headers.
    file_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{file_path}"
    response = requests.get(file_url)
    if response.status_code == 200:
        return response.text
    return f"Failed to fetch file content: {response.status_code}"
# Function to query GPT-4o-mini
def ask_code_question(code_content, question):
    """Ask GPT-4o-mini a question about a source file.

    Args:
        code_content: Text of the file being discussed.
        question: The user's question about that file.

    Returns:
        The model's answer with surrounding whitespace stripped, a prompt
        message when either input is blank, or an
        "Error querying GPT-4o-mini: ..." message if the request fails.
    """
    if not code_content or not code_content.strip():
        return "No code content available to analyze."
    if not question or not question.strip():
        return "Please enter a question about the code."

    # Construct the prompt; the code is fenced so the model can tell it apart
    # from the instructions.
    prompt = (
        "\nHere is a Python file from a GitHub repository:\n"
        f"```\n{code_content}\n```\n"
        "Please answer the following question about this file:\n"
        f"- {question}\n"
    )

    try:
        # Query GPT-4o-mini
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful assistant skilled in understanding code."},
                {"role": "user", "content": prompt},
            ],
        )
        # Extract and return GPT's response
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"Error querying GPT-4o-mini: {str(e)}"
from graphviz import Source
import re
# Function to generate and clean Graphviz diagrams using GPT-4o-mini
def generate_dot_code_from_code(code_content, diagram_type):
    """Ask GPT-4o-mini for a Graphviz DOT diagram describing ``code_content``.

    Args:
        code_content: Source code to visualize.
        diagram_type: Human-readable diagram kind (e.g. "Call Graph") that is
            inserted into the prompt.

    Returns:
        The extracted ``digraph ... }`` text on success; otherwise one of the
        messages "No code content available to analyze.",
        "Failed to extract valid Graphviz code." or
        "Error querying GPT-4o-mini: ...".
    """
    if not code_content or not code_content.strip():
        return "No code content available to analyze."

    # Construct the prompt dynamically based on diagram type.
    # The former "#Ensure that the output ... @startuml ..." line is left out:
    # it asked for PlantUML and contradicted the DOT-only instruction.
    prompt = (
        "\nHere is some Python code from a GitHub repository:\n"
        f"```\n{code_content}\n```\n"
        f"Please generate a {diagram_type} for this code in Graphviz DOT/digraph format. "
        "Ensure the DOT code is valid and renderable.\n"
        "Don't include any other text. Don't provide any other explainatory commentry.\n"
        "Ensure the DOT code includes all necessary opening and closing brackets "
        f"{'brackets'} for graphs and subgraphs.\n"
    )

    try:
        # Query GPT-4o-mini
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful assistant that generates Graphviz DOT code for visualizing Python code. You are restricted to only generate Graphviz Code starting with digraph & ending with }"},
                {"role": "user", "content": prompt},
            ],
        )
        raw_dot_code = response.choices[0].message.content.strip()
        validated_dot_code = validate_and_fix_dot_code(raw_dot_code)  # Fix any missing brackets

        # Preferred form: from "digraph" up to the first "}" at the start of a line.
        pattern = r"digraph\b[\s\S]*?^\}"
        match = re.search(pattern, validated_dot_code, re.MULTILINE | re.DOTALL)
        if match is None:
            # Fallback for graphs whose final "}" is not at line start (e.g. a
            # single-line graph): take everything up to the last "}".
            match = re.search(r"digraph\b[\s\S]*\}", validated_dot_code)
        if match:
            return match.group(0)  # Extract the matched content
        return "Failed to extract valid Graphviz code."
    except Exception as e:
        return f"Error querying GPT-4o-mini: {str(e)}"
def validate_and_fix_dot_code(dot_code):
    """Append any closing braces that ``dot_code`` is missing.

    Braces are counted textually. When there are more "{" than "}", one "}" per
    missing brace is appended, each on its own line so that a line-anchored
    pattern such as ``^\\}`` can find the final brace. Code that already has at
    least as many closing braces as opening ones is returned unchanged.

    Args:
        dot_code: Graphviz DOT source text.

    Returns:
        ``dot_code`` with the missing closing braces appended.
    """
    # Check for unbalanced brackets
    missing_brackets = dot_code.count("{") - dot_code.count("}")
    if missing_brackets > 0:
        # Whitespace is insignificant in DOT, so the added newlines are harmless.
        dot_code += "\n}" * missing_brackets
    return dot_code
def render_dot_code(dot_code, filename=None):
    """Render Graphviz DOT code and save it as a PNG image.

    Args:
        dot_code (str): The DOT code to render.
        filename (str): Name for the output PNG file (without extension).
            When omitted, a unique name is generated instead of writing to a
            file literally called "None".

    Returns:
        str: Path to the generated PNG image, or an
        "Error rendering diagram: ..." message if rendering fails.
    """
    try:
        if not filename:
            filename = f"diagram_{uuid.uuid4().hex}"

        # Ensure the images directory exists
        os.makedirs(IMAGE_DIR, exist_ok=True)

        # Save and render the diagram; render() returns the path of the
        # produced file, which already carries the ".png" extension.
        output_path = os.path.join(IMAGE_DIR, f"{filename}")
        src = Source(dot_code, format="png")
        png_path = src.render(output_path, cleanup=True)
        return png_path
    except Exception as e:
        return f"Error rendering diagram: {str(e)}"
import time
def handle_generate_diagram(code_content, diagram_type, retries=5, wait_time=1):
    """Handle diagram generation and return the rendered image for display.

    Args:
        code_content (str): The source code to analyze.
        diagram_type (str): Type of diagram to generate.
        retries (int): Number of times to retry checking for the file.
        wait_time (float): Time (in seconds) to wait between retries.

    Returns:
        PIL.Image.Image or str: The generated diagram or an error message.
    """
    print("Code content received:", code_content)  # Debugging print

    # Generate and render the diagram
    image_path = generate_and_render_diagram(code_content, diagram_type)
    print(f"Generated image path: {image_path}")  # Debugging print

    # A successful render yields a ".png" path; anything else is an error
    # message from an earlier step, so waiting for a file would be pointless.
    if not str(image_path).lower().endswith(".png"):
        return f"Failed to generate image: {image_path}"

    # Retry logic for checking file existence
    for attempt in range(retries):
        if os.path.exists(image_path):
            try:
                return Image.open(image_path)  # Return the image if found
            except Exception as e:
                print(f"Error opening image on attempt {attempt + 1}: {e}")
        else:
            print(f"Image not found. Retrying... ({attempt + 1}/{retries})")
        if attempt < retries - 1:
            time.sleep(wait_time)  # Wait before the next check

    # If the image is still not found after retries
    print(f"Failed to generate image after {retries} retries: {image_path}")
    return f"Failed to generate image: {image_path}"
# Gradio Interface
with gr.Blocks() as demo:
# Tab 1: Repository Discovery
with gr.Tab("Repository Discovery"):
with gr.Row():
topics_input = gr.Textbox(
label="Topics (comma-separated, leave empty to fetch by date only)",
placeholder="e.g., machine-learning, deep-learning (leave empty for date-based search)"
similar_topics = gr.Textbox(
label="Similar Topics (based on embeddings)",
gr.Button("Get Similar Topics").click(
with gr.Row():
start_date_input = gr.Textbox(
label="Start Date (YYYY-MM-DD, leave empty if not filtering by date)",
placeholder="Set to filter recent repositories by date or leave empty"
language_filter = gr.Dropdown(
choices=["", "Python", "JavaScript", "Java", "C++", "Ruby", "Go"],
label="Language Filter",
stars_min = gr.Number(label="Stars Min", value=10)
stars_max = gr.Number(label="Stars Max", value=1000)
with gr.Row():
forks_min = gr.Number(label="Forks Min", value=0)
forks_max = gr.Number(label="Forks Max", value=500)
total_repos = gr.Number(label="Total Repositories", value=10, step=10)
sort_order = gr.Dropdown(
choices=["stars", "forks", "updated"],
label="Sort Order",
with gr.Row():
output_data = gr.Dataframe(label="Discovered Repositories")
output_file = gr.File(label="Download CSV", file_count="single")
gr.Button("Discover Repositories").click(
topics_input, start_date_input, language_filter, stars_min, stars_max,
forks_min, forks_max, total_repos, sort_order
outputs=[output_data, output_file]
# Tab 2: Organization Watch
with gr.Tab("Organization Watch"):
with gr.Row():
org_input = gr.Textbox(
label="Organizations (comma-separated)",
placeholder="e.g., facebookresearch, openai"
with gr.Row():
language_filter = gr.Dropdown(
choices=["", "Python", "JavaScript", "Java", "C++", "Ruby", "Go"],
label="Language Filter",
stars_min = gr.Number(label="Stars Min", value=10)
stars_max = gr.Number(label="Stars Max", value=1000)
with gr.Row():
forks_min = gr.Number(label="Forks Min", value=0)
forks_max = gr.Number(label="Forks Max", value=500)
total_repos = gr.Number(label="Total Repositories", value=10, step=10)
sort_order = gr.Dropdown(
choices=["stars", "forks", "updated"],
label="Sort Order",
with gr.Row():
output_data = gr.Dataframe(label="Repositories by Organizations")
output_file = gr.File(label="Download CSV", file_count="single")
gr.Button("Fetch Organization Repositories").click(
org_input, language_filter, stars_min, stars_max, forks_min, forks_max,
sort_order, total_repos
outputs=[output_data, output_file]
# Tab 3: Code Analysis
# Gradio Interface for Code Analysis (Updated)
with gr.Tab("Code Analysis"):
with gr.Row():
repo_dropdown = gr.Dropdown(
label="Select Repository",
refresh_button = gr.Button("Refresh Repositories")
with gr.Row():
branch_dropdown = gr.Dropdown(
label="Select Branch",
with gr.Row():
keywords_output = gr.Textbox(label="Keywords")
entities_output = gr.Textbox(label="Entities")
with gr.Row():
summary_output = gr.Textbox(label="Summary")
wordcloud_output = gr.Plot(label="Word Cloud") # Use Plot instead of Image
# New components for displaying files
with gr.Row():
files_list = gr.Dropdown(
label="Files in Repository",
with gr.Row():
file_content_box = gr.Textbox(
label="File Content",
with gr.Row(): # Combine question input and button in the same row
question_input = gr.Textbox(
label="Ask a Question",
placeholder="Enter your question about the code...",
question_button = gr.Button("Get Answer")
with gr.Row():
answer_output = gr.Textbox(label="Bot's Answer", lines=10, interactive=False)
# Diagram generation interface
with gr.Row():
diagram_type = gr.Dropdown(
label="Select Diagram Type",
choices=["Call Graph", "Data Flow Diagram", "Sequence Diagram", "Class Diagram", "Component Diagram", "Workflow Diagram"],
value="Call Graph"
generate_diagram_button = gr.Button("Generate Diagram")
with gr.Row():
#diagram_output = gr.Image(label="Generated Diagram", type="pil")
diagram_output = gr.Image(
label="Generated Diagram",
type="pil", # Ensures compatibility with PIL.Image.Image
elem_id="diagram_output", # Add an ID for custom styling if needed
interactive=False, # No need for user interaction on the output
height=600, # Set a larger default height
width=800, # Set a larger default width
# Hook up the question button to ask_code_question
inputs=[file_content_box, question_input], # Inputs: Code content and user question
outputs=[answer_output] # Output: Answer from LLM
# Callback to generate and render the diagram
def generate_and_render_diagram(code_content, diagram_type):
    """Generate DOT code for ``code_content`` and render it to a PNG file.

    Args:
        code_content: Source code to visualize.
        diagram_type: Diagram kind forwarded to generate_dot_code_from_code().

    Returns:
        Whatever render_dot_code() returns (a PNG path or a rendering-error
        message), or "Invalid DOT code generated." when the generated text does
        not start with "digraph".
    """
    # Generate DOT code
    dot_code = generate_dot_code_from_code(code_content, diagram_type)

    # Check for valid DOT code (error messages from the generator fail this too)
    if not isinstance(dot_code, str) or not dot_code.strip().startswith("digraph"):
        return "Invalid DOT code generated."

    unique_filename = f"diagram_{uuid.uuid4().hex}"  # Generate a unique filename
    return render_dot_code(dot_code, filename=unique_filename)  # Render the diagram
inputs=[file_content_box, diagram_type], # Use file_content_box instead of answer_output
outputs=[diagram_output] # Output: PNG file path
# Refresh repository list
lambda: gr.update(choices=get_discovered_repos()),
# Update branch dropdown when a repository is selected
def update_branches(repo):
    """Build the branch-dropdown update for the selected repository.

    Args:
        repo: Selected "owner/repo_name" string, or a falsy value when nothing
            is selected.

    Returns:
        A ``gr.update`` carrying the repository's branches with its default
        branch pre-selected, or an empty update when ``repo`` is missing or is
        not of the form "owner/repo_name".
    """
    if repo and "/" in repo:
        # Split once so an unexpected extra "/" cannot break the unpacking.
        owner, repo_name = repo.split("/", 1)
        branches = get_branches(owner, repo_name)
        default_branch = get_default_branch(owner, repo_name)
        return gr.update(choices=branches, value=default_branch)
    return gr.update(choices=[], value=None)
# Analyze README content based on the selected repository and branch
def analyze_readme(repo, branch):
    """Run the README analysis for the selected repository and branch.

    Args:
        repo: Selected "owner/repo_name" string.
        branch: Selected branch name.

    Returns:
        The 4-tuple produced by process_readme(), or
        ("No repository or branch selected.", "", "", None) when either value
        is missing or ``repo`` is not of the form "owner/repo_name".
    """
    if repo and branch and "/" in repo:
        # Split once so an unexpected extra "/" cannot break the unpacking.
        owner, repo_name = repo.split("/", 1)
        # Pass branch to analyze specific README
        return process_readme(owner, repo_name, branch)
    return "No repository or branch selected.", "", "", None
inputs=[repo_dropdown, branch_dropdown],
outputs=[keywords_output, entities_output, summary_output, wordcloud_output]
analyze_readme, # Function to call when branch changes
inputs=[repo_dropdown, branch_dropdown], # Pass both repo and branch as inputs
outputs=[keywords_output, entities_output, summary_output, wordcloud_output] # Update outputs
# Fetch files in the selected repository
def update_files(repo):
    """Refresh the file dropdown for the selected repository.

    Stores the fetched file records in the module-level ``files_data`` and
    returns a ``gr.update`` whose choices are "<name> (<path>)" labels.

    Args:
        repo: Selected "owner/repo_name" string, or a falsy value.

    Returns:
        A ``gr.update`` for the files dropdown; empty when no repository is
        selected or the listing could not be fetched.
    """
    global files_data  # To store fetched files for later use
    if repo and "/" in repo:
        owner, repo_name = repo.split("/", 1)  # Extract owner and repo
        print("Selected repository:", repo)
        files = fetch_files(owner, repo_name)  # Call with default path=""
        if not isinstance(files, list):
            # fetch_files signals a failed listing with an (error message, [])
            # tuple; treat it as "no files" instead of crashing below.
            print("Could not fetch files:", files)
            files = []
        files_data = files  # Store the fetched files for later use
        file_names = [f"{file['name']} ({file['path']})" for file in files]  # Prepare dropdown labels
        print("Fetched files:", files)  # Debugging to ensure files are fetched correctly
        print("File names for dropdown:", file_names)  # Debugging to ensure dropdown labels are created
        return gr.update(choices=file_names, value=None)  # Update the dropdown
    files_data = []  # Clear files_data if no repo is selected
    return gr.update(choices=[], value=None)
lambda repo: update_files(repo),
outputs=[files_list] # Update both files_list and file_content_box
# Fetch and display file content
def display_file_content(repo, branch, selected_file):
    """Return the text of the file chosen in the files dropdown.

    Args:
        repo: Selected "owner/repo_name" string.
        branch: Selected branch name.
        selected_file: Dropdown label of the form "<name> (<path>)".

    Returns:
        The result of fetch_file_content() for the chosen file, or
        "No file selected." when a value is missing or the label/repository
        string is not in the expected form.
    """
    if not (repo and branch and selected_file):
        return "No file selected."
    if "/" not in repo or " (" not in selected_file or not selected_file.endswith(")"):
        return "No file selected."

    # Recover <path> from "<name> (<path>)". Names and paths may themselves
    # contain " (", so try each occurrence and keep the split whose path ends
    # in the name (presumably always true for these labels -- verify against
    # the label builder).
    file_path = None
    cut = selected_file.find(" (")
    while cut != -1:
        name, candidate = selected_file[:cut], selected_file[cut + 2:-1]
        if candidate.rsplit("/", 1)[-1] == name:
            file_path = candidate
            break
        cut = selected_file.find(" (", cut + 1)
    if file_path is None:
        # Fall back to everything after the first " (".
        file_path = selected_file.split(" (", 1)[1][:-1]

    owner, repo_name = repo.split("/", 1)
    return fetch_file_content(owner, repo_name, branch, file_path)
inputs=[repo_dropdown, branch_dropdown, files_list],