|
from mcp.server.fastmcp import FastMCP |
|
from mcp.server.fastmcp.prompts import base |
|
from starlette.responses import Response, JSONResponse |
|
import logging |
|
from logging.handlers import RotatingFileHandler |
|
from fastapi import FastAPI, Request |
|
import os |
|
from dotenv import load_dotenv |
|
import traceback |
|
import json |
|
import requests |
|
from mistralai import Mistral |
|
from anthropic import Anthropic |
|
|
|
import openai |
|
from io import BytesIO |
|
from PIL import Image |
|
import io |
|
from pathlib import Path |
|
import base64 |
|
from openai import OpenAI |
|
import shutil |
|
import PyPDF2 |
|
|
|
|
|
# Run dotenv's loader before any environment lookups below, so values it
# provides are visible to them.
load_dotenv()

# Key handed to the Mistral client in call_mistral_text(); None when unset.
MISTRAL_API_KEY = os.environ.get("MISTRAL_API_KEY")
|
|
|
|
|
|
|
|
|
# Console handler shared by the module logger; emits timestamp, level, message.
console_handler = logging.StreamHandler()
console_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
)

logger = logging.getLogger("mcp_server")

# Supported LOG_LEVEL values. The standard library has no TRACE level
# (``logging.TRACE`` raises AttributeError), so TRACE is mapped to DEBUG,
# the most verbose built-in level.
_LOG_LEVELS = {
    "ERROR": logging.ERROR,
    "WARN": logging.WARNING,
    "WARNING": logging.WARNING,
    "INFO": logging.INFO,
    "DEBUG": logging.DEBUG,
    "TRACE": logging.DEBUG,
}


def resolve_log_level(name):
    """Translate a LOG_LEVEL string into a ``logging`` level constant.

    Args:
        name: Level name such as "ERROR", "WARN", "DEBUG" or "TRACE".
            Matching ignores case and surrounding whitespace.

    Returns:
        int: The matching logging level, or ``logging.INFO`` when the name
        is empty or not recognised.
    """
    return _LOG_LEVELS.get((name or "").strip().upper(), logging.INFO)


# Defaults to DEBUG when LOG_LEVEL is not set in the environment.
log_level = os.getenv('LOG_LEVEL', 'DEBUG')
logger.setLevel(resolve_log_level(log_level))

logger.addHandler(console_handler)
|
|
|
|
|
# FastAPI application; the HTTP request-logging middleware is registered on it.
app = FastAPI()

# MCP server instance that the @mcp.tool() functions in this file register with.
# NOTE(review): `app=app` is passed to FastMCP as an extra keyword argument;
# confirm FastMCP actually serves this FastAPI app, otherwise middleware
# attached to `app` never sees the MCP traffic.
mcp = FastMCP("Incident Comic Generator", app=app)
|
|
|
|
|
|
|
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log each incoming HTTP request and any exception escaping its handler.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request down the middleware chain.

    Returns:
        The response produced by ``call_next``.

    Raises:
        Exception: Whatever the downstream handler raised, re-raised unchanged
        after the traceback has been logged.
    """
    logger.info(f"Incoming request: {request.method} {request.url}")
    try:
        return await call_next(request)
    except Exception:
        # Record the traceback here, then let the framework handle the error.
        logger.exception("Unhandled exception in request")
        raise
|
|
|
|
|
|
|
# Root cause category keys mapped to their human-readable labels; returned to
# clients by get_root_cause_categories().
root_cause_mapping = dict(
    human_error="Human Error",
    system_failure="System Failure",
    network_issue="Network Issue",
    software_bug="Software Bug",
    hardware_failure="Hardware Failure",
    others="Others",
)
|
|
|
|
|
|
|
def call_mistral_text(prompt: str) -> str:
    """Send one user prompt to Mistral's chat completion API and return the reply.

    A new client is created on every call using the module-level
    ``MISTRAL_API_KEY``.

    Args:
        prompt: Text sent as the single user message.

    Returns:
        The message content of the first choice in the response.
    """
    mistral = Mistral(api_key=MISTRAL_API_KEY)
    conversation = [{"role": "user", "content": prompt}]

    completion = mistral.chat.complete(
        model="mistral-small-latest",
        max_tokens=1024,
        temperature=0.8,
        messages=conversation,
    )

    first_choice = completion.choices[0]
    return first_choice.message.content
|
|
|
|
|
def call_qwen_text(system_prompt: str, user_prompt: str) -> str:
    """Call Qwen through the Nebius OpenAI-compatible chat completion endpoint.

    A new client is created on every call; the API key is read from the
    ``NEBIUS_API_KEY`` environment variable.

    Args:
        system_prompt: Instructions sent as the system message.
        user_prompt: Text sent as the user message.

    Returns:
        The message content of the first choice in the response.
    """
    client = OpenAI(
        base_url="https://api.studio.nebius.com/v1/",
        api_key=os.environ.get("NEBIUS_API_KEY"),
    )

    response = client.chat.completions.create(
        model="Qwen/Qwen2.5-Coder-32B-Instruct",
        max_tokens=8192,
        temperature=0.5,
        top_p=0.95,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )

    # Diagnostics go through the module logger (instead of print) so they
    # honour LOG_LEVEL and share the format of the rest of the server output.
    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    if debug_enabled:
        logger.debug("Qwen raw response: %s", response.to_json())

    reply = response.choices[0].message.content
    if debug_enabled:
        logger.debug("Qwen reply text: %s", reply)
    return reply
|
|
|
|
|
def call_claude_text(system_prompt: str, user_prompt: str) -> str:
    """Call Claude via Anthropic's Messages API.

    A new client is created on every call; the API key is read from the
    ``ANTHROPIC_API_KEY`` environment variable.

    Args:
        system_prompt: Instructions passed as the ``system`` parameter.
        user_prompt: Text sent as the single user message.

    Returns:
        The text of the first content block in the response.
    """
    client = Anthropic(
        api_key=os.environ.get("ANTHROPIC_API_KEY"),
    )

    response = client.messages.create(
        model="claude-3-5-sonnet-latest",
        max_tokens=8192,
        temperature=0.5,
        system=system_prompt,
        messages=[
            {"role": "user", "content": user_prompt},
        ],
    )

    # Diagnostics go through the module logger (instead of print) so they
    # honour LOG_LEVEL and share the format of the rest of the server output.
    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    if debug_enabled:
        logger.debug("Claude raw response: %s", response.model_dump_json(indent=2))

    reply = response.content[0].text
    if debug_enabled:
        logger.debug("Claude reply text: %s", reply)
    return reply
|
|
|
|
|
|
|
def delete_folder_contents(folder_path):
    """
    Empty a folder while leaving the folder itself in place.

    Regular files and symbolic links are unlinked; subdirectories are removed
    recursively. Progress is reported with print(). When the folder does not
    exist a message is printed and nothing else happens.

    :param folder_path: Path to the folder whose contents should be deleted.
    """
    folder_missing = not os.path.exists(folder_path)
    if folder_missing:
        print(f"The folder {folder_path} does not exist.")
        return

    for entry_name in os.listdir(folder_path):
        target = os.path.join(folder_path, entry_name)

        # Links are unlinked, never followed, so a link to a directory only
        # loses the link itself.
        removable_as_file = os.path.isfile(target) or os.path.islink(target)
        if removable_as_file:
            os.unlink(target)
            print(f"Deleted file: {target}")
            continue

        if os.path.isdir(target):
            shutil.rmtree(target)
            print(f"Deleted directory: {target}")

    print(f"All contents of {folder_path} have been deleted.")
|
|
|
|
|
|
|
@mcp.tool()
def analyze_incident(incident_description: str) -> str:
    """
    Analyzes an incident description and provides root cause analysis using AI.

    The description is embedded in a structured analysis prompt and sent,
    together with a system prompt, to Qwen via ``call_qwen_text``.

    Args:
        incident_description (str): Detailed description of the incident that occurred.
            Should include relevant context, symptoms, and timeline.

    Returns:
        str: AI-generated analysis with suggested root cause. If the analysis
            fails, a JSON-encoded object of the form {"error": "<message>"}
            is returned instead, so the result is always a string.

    Example:
        >>> analyze_incident("Server crashed at 3 AM due to memory leak in payment service")
        "Based on the description, the root cause appears to be..."
    """
    try:
        logger.info(f"Analyzing incident: {incident_description}")
        system_prompt = """
            You are a technical analyst with expertise in system troubleshooting and root cause analysis. Your task is to analyze the following incident in detail and identify the most likely root cause based on the information provided.

            Follow these steps carefully:

            1. **Break down the incident**: Identify the affected system, service, or process, along with observed symptoms and context (e.g., environment, configuration, user actions).
            2. **Identify patterns and anomalies**: Look for any unusual behavior, error messages, performance metrics, recent changes, or external dependencies mentioned in the incident description.
            3. **Consider multiple root causes**: Propose 2–3 potential root causes and evaluate each one, ruling out unlikely options with clear reasoning.
            4. **Provide a concise root cause statement**: Clearly state the most likely root cause, supported by evidence from the incident description.
            5. **Highlight missing information**: If applicable, indicate what additional data (e.g., logs, metrics, configurations) would help refine the analysis.

            Format your response as follows:

            - **Incident Summary**: A 2–3 sentence summary of the incident.
            - **Key Observations**: List critical details, symptoms, or anomalies in bullet points.
            - **Potential Root Causes**: Discuss 2–3 possible causes with brief evaluations.
            - **Confirmed Root Cause**: State the most likely root cause with supporting evidence.
            - **Additional Data Needed**: Specify any further information required for a more accurate analysis, if applicable.

            Use precise language, avoid assumptions beyond the given information, and ensure your analysis is logical and well-supported.
            """
        analysis_prompt = f"""
            Analyze the following incident in detail to identify the most likely root cause: {incident_description}. Follow these steps:
            1. Break down the incident into key components, including the system, service, or process affected, the observed symptoms, and the context (e.g., environment, configuration, or user actions).
            2. Identify any patterns, anomalies, or contributing factors based on the provided description, such as error messages, performance metrics, recent changes, or external dependencies.
            3. Consider multiple potential root causes and evaluate each one, ruling out unlikely causes with clear reasoning.
            4. Provide a concise and precise root cause statement, supported by evidence from the incident description.
            5. If relevant, highlight any missing information that could refine the analysis and suggest what additional data (e.g., logs, metrics, or configurations) would be helpful.
            Format the response as follows:
            - **Incident Summary**: Summarize the incident in 2-3 sentences.
            - **Key Observations**: List critical details, symptoms, or anomalies in bullet points.
            - **Potential Root Causes**: Discuss 2-3 possible causes with brief evaluations.
            - **Confirmed Root Cause**: State the most likely root cause with supporting evidence.
            - **Additional Data Needed**: Specify any further information required for a more accurate analysis, if applicable.
            """
        analysis = call_qwen_text(system_prompt, analysis_prompt)

        return analysis
    except Exception as e:
        logger.error(f"Error analyzing incident: {str(e)}")
        # The tool is declared as returning str, so the error object is
        # serialised to JSON text rather than returned as a raw dict.
        return json.dumps(
            {"error": f"Unable to analyze incident - {str(e)}"},
            ensure_ascii=False,
        )
|
|
|
|
|
@mcp.tool()
def generate_comic_story(incident_description: str, root_cause: str) -> str:
    """
    Generates a humorous comic story based on an incident and its root cause.

    The incident and root cause are embedded in a comic-strip prompt that is
    sent to Mistral via ``call_mistral_text``.

    Args:
        incident_description (str): Description of the incident that occurred.
        root_cause (str): The identified or suspected root cause of the incident.

    Returns:
        str: Generated humorous comic story. If generation fails, a
            JSON-encoded object of the form {"error": "<message>"} is
            returned instead, so the result is always a string.

    Example:
        >>> generate_comic_story("Database went down", "Disk space full")
        "Title: The Disk Disaster ... Panel 1: ..."
    """
    try:
        logger.info(f"Generating comic story for incident: {incident_description}, root cause: {root_cause}")
        story_prompt = f"""
            Create a 3–4 panel humorous comic strip inspired by the incident: '{incident_description}', caused by: '{root_cause}'. Follow these guidelines:
            1. **Title**: Provide a catchy, humorous title that reflects the incident or root cause.
            2. **Characters**: Include 2–3 characters (e.g., IT admin, server, anthropomorphic database) with distinct personalities that add humor (e.g., a panicked admin, a smug server).
            3. **Dialogue**: Write concise, witty dialogue (1–2 short lines per character per panel) that pokes fun at the incident or root cause, using technical jargon for comedic effect.
            4. **Style**: Use a clean, cartoonish style with exaggerated expressions to emphasize humor (e.g., wide-eyed panic or a smirking database).
            5. **Color**: Use a vibrant color palette with bright tones for characters and muted tones for technical elements (e.g., servers, code) to create contrast.
            6. **Panels**: Structure as 3–4 panels, with the first panel setting up the incident, middle panels escalating the humor, and the final panel delivering a punchline tied to the root cause.
            7. **Humor**: Focus on absurdity, irony, or exaggeration related to the incident/root cause (e.g., a database refusing to cooperate or an admin blaming 'gremlins').
            Format the response as:
            - **Title**: [Comic title]
            - **Style and Color**: [Brief description of style and palette]
            - **Panel 1**: [Scene, characters, dialogue, visual description]
            - **Panel 2**: [Scene, characters, dialogue, visual description]
            - **Panel 3**: [Scene, characters, dialogue, visual description]
            - **Panel 4 (if used)**: [Scene, characters, dialogue, visual description]
            Keep the total description under 1000 characters, ensuring humor aligns with the technical context of the incident and root cause.
            """
        story = call_mistral_text(story_prompt)

        return story
    except Exception as e:
        logger.error(f"Error generating comic story: {str(e)}")
        # The tool is declared as returning str, so the error object is
        # serialised to JSON text rather than returned as a raw dict.
        return json.dumps(
            {"error": f"Unable to generate comic story - {str(e)}"},
            ensure_ascii=False,
        )
|
|
|
@mcp.tool()
def generate_solution_recommendation(incident_description: str, root_cause: str) -> str:
    """
    Generates a technical solution recommendation based on an incident and its root cause.

    The incident and root cause are embedded in a solution prompt that is
    sent, together with a system prompt, to Claude via ``call_claude_text``.

    Args:
        incident_description (str): Description of the incident that occurred.
        root_cause (str): The identified or suspected root cause of the incident.

    Returns:
        str: Recommended technical solution. If generation fails, a
            JSON-encoded object of the form {"error": "<message>"} is
            returned instead, so the result is always a string.

    Example:
        >>> generate_solution_recommendation("Database went down", "Disk space full")
        "1. Immediately increase disk capacity... 2. Implement monitoring..."
    """
    try:
        logger.info(f"Generating solution for incident: {incident_description}, root cause: {root_cause}")
        system_prompt = """
            You are a technical expert tasked with generating detailed and actionable solution recommendations for incidents described by users. Your goal is to provide clear, step-by-step guidance that resolves the issue efficiently and prevents future occurrences.

            When given an incident description, follow these instructions:

            1. **Understand the Incident**: Carefully read and interpret the details of the incident.
            2. **Provide Immediate Actions**: List the first steps to mitigate or resolve the issue quickly.
            3. **Offer Technical Solutions**: Give a numbered list of specific, actionable steps to implement the fix.
            4. **Include Best Practices**: Suggest industry-standard practices relevant to the solution.
            5. **Outline Preventive Measures**: Provide strategies to avoid recurrence of the incident.

            Format your response in a clear, numbered structure with well-defined sections. Use precise technical language where appropriate, and ensure each section is easy to follow and understand.

            Do not include any root cause analysis unless explicitly requested. Focus only on solution recommendations.
            """
        solution_prompt = f"""
            Based on the incident described as '{incident_description}' with the identified root cause '{root_cause}', generate a detailed technical solution recommendation to resolve the issue and prevent recurrence. Follow these guidelines:
            1. Propose a comprehensive solution that directly addresses the root cause and mitigates the incident's impact.
            2. Provide actionable steps, numbered sequentially, with clear, concise instructions for implementation, including any tools, configurations, or processes required.
            3. Incorporate relevant best practices for the affected system, service, or technology to ensure robust resolution and operational stability.
            4. Include preventive measures to avoid similar incidents in the future, such as monitoring, automation, or process improvements.
            5. Highlight any potential risks or trade-offs associated with the proposed solution (e.g., downtime, resource requirements, or compatibility issues).
            6. If applicable, suggest metrics or validation steps to confirm the solution's effectiveness post-implementation.
            Format the response as follows:
            - **Solution Overview**: Summarize the proposed solution in 2-3 sentences.
            - **Actionable Steps**: Provide a numbered list of clear, technical instructions for resolving the incident.
            - **Best Practices**: List 2-3 relevant best practices to enhance system reliability or performance.
            - **Preventive Measures**: Describe 2-3 strategies to prevent recurrence, including monitoring or automation recommendations.
            - **Risks and Trade-offs**: Highlight any potential challenges or limitations of the solution.
            - **Validation Steps**: Specify how to verify the solution's success, including metrics or tests to monitor.
            """
        solution = call_claude_text(system_prompt, solution_prompt)

        return solution
    except Exception as e:
        logger.error(f"Error generating solution recommendation: {str(e)}")
        # The tool is declared as returning str, so the error object is
        # serialised to JSON text rather than returned as a raw dict.
        return json.dumps(
            {"error": f"Unable to generate solution recommendation - {str(e)}"},
            ensure_ascii=False,
        )
|
|
|
|
|
@mcp.tool()
def get_root_cause_categories() -> dict:
    """
    Returns the predefined root cause categories used to classify incidents.

    The categories come from the module-level ``root_cause_mapping``, which
    maps each category key to its human-readable label.

    Args:
        None

    Returns:
        dict: One of:
            - {"categories": dict} - category keys mapped to their labels
            - {"error": str} - message describing what went wrong

    Example:
        >>> get_root_cause_categories()
        {
            "categories": {
                "human_error": "Human Error",
                "system_failure": "System Failure",
                "network_issue": "Network Issue",
                ...
            }
        }
    """
    try:
        logger.info("Fetching root cause categories")
        payload = {"categories": root_cause_mapping}
        return payload
    except Exception as exc:
        failure = str(exc)
        logger.error(f"Error fetching root cause categories: {failure}")
        return {"error": f"Unable to fetch categories - {failure}"}
|
|
|
|
|
@mcp.tool()
def execute_incident_code(code: str) -> str:
    """
    Executes Python code related to incident analysis or processing.

    The code runs with ``exec`` in a fresh namespace, and the names it
    defines are returned as the string form of a dict.

    Args:
        code (str): Valid Python code to execute.

    Returns:
        str: Execution result containing either:
            - On success: String representation of the names defined by the code
            - On error: Error message with details about the execution failure

    Example:
        >>> execute_incident_code("result = 2 + 2")
        "{'result': 4}"

    Security Note:
        The code is NOT sandboxed. It runs inside the server process with the
        full set of builtins available, so it can import modules, touch the
        file system and open network connections. Only expose this tool to
        trusted callers.
    """
    try:
        logger.info("Executing incident-related Python code")

        # SECURITY(review): exec() on caller-supplied text is arbitrary code
        # execution. An empty globals dict does not restrict anything because
        # Python injects __builtins__ into it. Left in place deliberately;
        # replace with a real sandbox or remove before exposing to untrusted
        # clients.
        #
        # A single dict is used as the namespace. With separate globals and
        # locals the snippet is executed like a class body, so functions it
        # defines cannot see its other top-level names.
        namespace = {}
        exec(code, namespace)

        # Hide the __builtins__ entry that exec() adds, so the result only
        # lists names created by the snippet itself.
        defined = {
            name: value
            for name, value in namespace.items()
            if name != "__builtins__"
        }
        return str(defined)
    except Exception as e:
        logger.error("Incident code execution failed:\n" + traceback.format_exc())
        return f"Error: Unable to execute code - {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@mcp.tool()
def generate_image_with_dalle3(prompt: str) -> str:
    """
    Generates an image with OpenAI's DALL-E 3 model and saves it locally.

    The image is requested at 1024x1024 pixels with "hd" quality and "vivid"
    style, downloaded from the URL returned by the API, and saved as a PNG
    file in the ``generated_images`` directory (created if missing).

    Args:
        prompt (str): A descriptive text prompt for the image to generate. Be specific
            about details like title, characters, dialogue, style, composition, colors,
            and subject for best results.

    Returns:
        str: Forward-slash path to the saved image
            (e.g., 'generated_images/comic_image_-620626766907224227.png').

    Raises:
        Exception: If the API call, the download, or saving the image fails.
            The message is prefixed with "Image generation failed: " and the
            original exception is attached as the cause.

    Example:
        >>> generate_image_with_dalle3("A cartoon server room with smoke coming out")
        "generated_images/comic_image_123456789.png"
    """
    try:
        client = openai.OpenAI()
        response = client.images.generate(
            model="dall-e-3",
            prompt=prompt,
            n=1,
            size="1024x1024",
            quality="hd",
            response_format="url",
            style="vivid",
        )
        image_url = response.data[0].url
        logger.info(f'Generated image URL: {image_url}')

        # Download the generated image and decode it before saving.
        image_response = requests.get(image_url, timeout=30)
        image_response.raise_for_status()
        image = Image.open(BytesIO(image_response.content))

        temp_dir = Path("generated_images")
        temp_dir.mkdir(exist_ok=True)
        # NOTE: str hashes are randomised per interpreter process (unless
        # PYTHONHASHSEED is fixed), so the same prompt maps to a different
        # file name after a restart; within one process it overwrites.
        image_path = temp_dir / f"comic_image_{hash(prompt)}.png"
        image.save(image_path)

        # Return forward slashes on Windows as well.
        normalized_path = str(image_path).replace("\\", "/")
        logger.info(f"Image saved to: {normalized_path}")

        return normalized_path
    except Exception as e:
        logger.error(f"Error generating image: {str(e)}")
        raise Exception(f"Image generation failed: {str(e)}") from e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@mcp.tool()
def extract_text_from_pdf(file_path: str) -> str:
    """
    Extracts text from a PDF file for incident analysis.

    The file is read with PyPDF2 and the text of all pages is concatenated
    in page order. Pages that yield no text contribute nothing.

    Args:
        file_path (str): Path to the PDF file.

    Returns:
        str: Extracted text from the PDF with leading and trailing whitespace removed.

    Raises:
        ValueError: If the file cannot be opened or parsed. The original
            exception is attached as the cause.

    Example:
        >>> extract_text_from_pdf("/path/to/report.pdf")
        "Incident report: Server crashed due to..."
    """
    try:
        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            # extract_text() may return None/empty for a page; treat it as "".
            text = "".join(page.extract_text() or "" for page in reader.pages)
        return text.strip()
    except Exception as e:
        raise ValueError(f"Failed to extract text from PDF: {str(e)}") from e
|
|
|
|
|
|
|
@mcp.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> Response:
    """
    Liveness probe served on GET /health.

    Args:
        request: The incoming request (not inspected).

    Returns:
        JSONResponse: Always the body {"status": "ok"}.
    """
    status_body = {"status": "ok"}
    return JSONResponse(status_body)
|
|
|
|
|
async def start_mcp_server():
    """Start the MCP server over SSE from inside a running event loop.

    Raises:
        Exception: Whatever the server raised while starting or running,
        re-raised after being logged.
    """
    try:
        print("🚀 Starting server...")
        logger.info("Starting Incident Comic Generator server")
        # mcp.run() is synchronous and starts its own event loop, so it can
        # neither be awaited nor called from a coroutine. run_sse_async() is
        # the awaitable counterpart for the SSE transport.
        await mcp.run_sse_async()

    except Exception as e:
        logger.error(f"Server failed to start: {str(e)}")
        raise
|
|
|
|
|
# Script entry point: run the MCP server over the SSE transport (blocking).
if __name__ == "__main__":
    try:
        print("🚀 Starting server...")
        logger.info("Starting Incident Comic Generator server")
        mcp.run(transport="sse")

    except Exception:
        # logger.exception records the message plus the full traceback; the
        # error is then re-raised so the process still fails.
        logger.exception("Server failed to start")
        raise
|
|