Refactor: Integrate backend API and normalize data
This commit integrates the backend API for fetching and updating report data. It also includes a normalization function to handle data consistency between the API and local storage.

Co-authored-by: anthonymuncher <anthonymuncher@gmail.com>
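For context on the normalization mentioned above, a minimal sketch of such a helper follows. It is illustrative only: the normalization code itself is not part of the files in this diff, and the field names (ticket_id, latitude, longitude, description, status) are assumptions based on the report fields exercised in backend/test/test_backend.py.

def normalize_report(raw: dict) -> dict:
    """Map an API report payload onto the locally stored shape (illustrative sketch; field names are assumed)."""
    return {
        "ticket_id": raw.get("ticket_id") or raw.get("id"),
        "latitude": float(raw.get("latitude", 0.0)),
        "longitude": float(raw.get("longitude", 0.0)),
        "description": raw.get("description", ""),
        "status": raw.get("new_status") or raw.get("status") or "Pending",
    }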
33  backend/test/Machine_Learning/broken_street_light.py  Normal file
@@ -0,0 +1,33 @@
# from bing_image_downloader import downloader

# downloader.download(
#     "broken streetlight",
#     limit=100,
#     output_dir='dataset_downloads',
#     adult_filter_off=True,
#     force_replace=False,
#     timeout=60
# )

from bing_image_downloader import downloader
from pathlib import Path

# ---------- CONFIG ----------
CLASS_NAME = "drainage"
LIMIT = 200                              # number of images to download
OUTPUT_DIR = Path("dataset_downloads")   # folder to store downloaded images

# Ensure the output directory exists
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# ---------- DOWNLOAD IMAGES ----------
downloader.download(
    CLASS_NAME,
    limit=LIMIT,
    output_dir=str(OUTPUT_DIR),
    adult_filter_off=True,   # keep it safe
    force_replace=False,     # don't overwrite if already downloaded
    timeout=60               # seconds per request
)

print(f"✅ Downloaded {LIMIT} images for class '{CLASS_NAME}' in '{OUTPUT_DIR}'")
92  backend/test/Machine_Learning/fetch_datasets.py  Normal file
@@ -0,0 +1,92 @@
import os
import zipfile
import shutil
import random
import json
from pathlib import Path

# ---------- CONFIG ----------
BASE_DIR = Path("dataset")
DOWNLOAD_DIR = Path("downloads")
CLASSES = ["pothole", "streetlight", "garbage", "signage"]
TRAIN_SPLIT = 0.8  # 80% train, 20% val

os.makedirs(BASE_DIR, exist_ok=True)
os.makedirs(DOWNLOAD_DIR, exist_ok=True)

# Create folder structure
for split in ["train", "val"]:
    for cls in CLASSES:
        os.makedirs(BASE_DIR / split / cls, exist_ok=True)

# ---------- AUTHENTICATION ----------
def setup_kaggle_api():
    """Load kaggle.json and set environment variables."""
    kaggle_path = Path("kaggle.json")  # put kaggle.json in the same folder as this script
    if not kaggle_path.exists():
        raise FileNotFoundError("❌ kaggle.json not found! Download it from https://www.kaggle.com/settings")

    with open(kaggle_path, "r") as f:
        creds = json.load(f)

    os.environ["KAGGLE_USERNAME"] = creds["username"]
    os.environ["KAGGLE_KEY"] = creds["key"]
    print("✅ Kaggle API credentials loaded.")

# ---------- HELPERS ----------
def unzip_and_move(zip_path, class_name):
    """Unzip a dataset and move its images into dataset/train/ and dataset/val/ folders."""
    extract_path = Path("tmp_extract")
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)

    # Collect images
    all_images = list(extract_path.rglob("*.jpg")) + list(extract_path.rglob("*.png")) + list(extract_path.rglob("*.jpeg"))
    random.shuffle(all_images)

    # Train/Val split
    split_idx = int(len(all_images) * TRAIN_SPLIT)
    train_files = all_images[:split_idx]
    val_files = all_images[split_idx:]

    for img in train_files:
        target = BASE_DIR / "train" / class_name / img.name
        shutil.move(str(img), target)

    for img in val_files:
        target = BASE_DIR / "val" / class_name / img.name
        shutil.move(str(img), target)

    shutil.rmtree(extract_path)

def kaggle_download(dataset_slug, out_zip):
    """Download a Kaggle dataset into the downloads/ folder and return the expected zip path."""
    os.system(f'kaggle datasets download -d {dataset_slug} -p {DOWNLOAD_DIR} -o')
    return DOWNLOAD_DIR / out_zip

# ---------- MAIN ----------
if __name__ == "__main__":
    setup_kaggle_api()

    # Pothole dataset
    pothole_zip = kaggle_download("andrewmvd/pothole-detection", "pothole-detection.zip")
    unzip_and_move(pothole_zip, "pothole")

    # Garbage dataset
    garbage_zip = kaggle_download("dataclusterlabs/domestic-trash-garbage-dataset", "domestic-trash-garbage-dataset.zip")
    unzip_and_move(garbage_zip, "garbage")

    # TrashNet (alternative garbage dataset)
    trashnet_zip = kaggle_download("techsash/waste-classification-data", "waste-classification-data.zip")
    unzip_and_move(trashnet_zip, "garbage")

    # Signage dataset
    signage_zip = kaggle_download("ahemateja19bec1025/traffic-sign-dataset-classification", "traffic-sign-dataset-classification.zip")
    unzip_and_move(signage_zip, "signage")  # Combine all sign classes into one

    # Drainage dataset (⚠️ still missing)
    print("⚠️ No Kaggle dataset found for drainage. Please add manually to dataset/train/drainage & val/drainage.")
    # Streetlight dataset (⚠️ still missing)
    print("⚠️ No Kaggle dataset found for streetlights. Please add manually to dataset/train/streetlight & val/streetlight.")

    print("✅ All datasets downloaded, cleaned, and organized into 'dataset/'")
43  backend/test/Machine_Learning/oraganize_path.py  Normal file
@@ -0,0 +1,43 @@
import os
import shutil
import random
from pathlib import Path

# ---------- CONFIG ----------
SRC_DIR = Path("dataset_downloads")  # where new images are
DST_DIR = Path("dataset")            # your main dataset folder
TRAIN_SPLIT = 0.8                    # 80% train, 20% val

# Classes to process
NEW_CLASSES = ["broken streetlight", "drainage"]

for cls in NEW_CLASSES:
    src_class_dir = SRC_DIR / cls
    if not src_class_dir.exists():
        print(f"⚠️ Source folder not found: {src_class_dir}")
        continue

    # Prepare destination folders
    train_dest = DST_DIR / "train" / cls
    val_dest = DST_DIR / "val" / cls
    train_dest.mkdir(parents=True, exist_ok=True)
    val_dest.mkdir(parents=True, exist_ok=True)

    # List all images
    images = list(src_class_dir.glob("*.*"))  # jpg, png, jpeg
    random.shuffle(images)

    # Split
    split_idx = int(len(images) * TRAIN_SPLIT)
    train_imgs = images[:split_idx]
    val_imgs = images[split_idx:]

    # Move images
    for img in train_imgs:
        shutil.move(str(img), train_dest / img.name)
    for img in val_imgs:
        shutil.move(str(img), val_dest / img.name)

    print(f"✅ Class '{cls}' added: {len(train_imgs)} train, {len(val_imgs)} val")

print("All new classes are organized and ready for training!")
62  backend/test/Machine_Learning/street_light_scrapping.py  Normal file
@@ -0,0 +1,62 @@
import os
import zipfile
import shutil
import random
from pathlib import Path
import requests

# ---------- CONFIG ----------
BASE_DIR = Path("dataset")
DOWNLOAD_DIR = Path("downloads")
CLASS_NAME = "streetlight"
TRAIN_SPLIT = 0.8  # 80% train, 20% val

os.makedirs(BASE_DIR / "train" / CLASS_NAME, exist_ok=True)
os.makedirs(BASE_DIR / "val" / CLASS_NAME, exist_ok=True)
os.makedirs(DOWNLOAD_DIR, exist_ok=True)

def download_from_github(url: str, out_path: Path):
    print(f"⬇️ Trying download: {url}")
    resp = requests.get(url, stream=True)
    if resp.status_code != 200:
        print(f"❌ Download failed: status code {resp.status_code}")
        return False
    with open(out_path, "wb") as f:
        for chunk in resp.iter_content(8192):
            f.write(chunk)
    print(f"✅ Downloaded to {out_path}")
    return True

def unzip_and_split(zip_path: Path, class_name: str):
    extract_path = Path("tmp_extract")
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)

    all_images = list(extract_path.rglob("*.jpg")) + list(extract_path.rglob("*.png")) + list(extract_path.rglob("*.jpeg"))
    if not all_images:
        print("⚠️ No images in extracted folder.")
        return

    random.shuffle(all_images)
    split_idx = int(len(all_images) * TRAIN_SPLIT)
    train = all_images[:split_idx]
    val = all_images[split_idx:]

    for img in train:
        shutil.move(str(img), BASE_DIR / "train" / class_name / img.name)
    for img in val:
        shutil.move(str(img), BASE_DIR / "val" / class_name / img.name)

    shutil.rmtree(extract_path)
    print(f"✅ {class_name} split: {len(train)} train / {len(val)} val")

if __name__ == "__main__":
    # Try the GitHub repo from the paper
    streetlight_url = "https://github.com/Team16Project/Street-Light-Dataset/archive/refs/heads/main.zip"
    zip_path = DOWNLOAD_DIR / "streetlight_dataset.zip"

    ok = download_from_github(streetlight_url, zip_path)
    if ok:
        unzip_and_split(zip_path, CLASS_NAME)
    else:
        print("⚠️ Could not download the streetlight dataset. You may need to find an alternative source.")
40  backend/test/Machine_Learning/test_trained_ml.py  Normal file
@@ -0,0 +1,40 @@
import torch
from torchvision import transforms, models
from PIL import Image
import os

# ---------- CONFIG ----------
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
NUM_CLASSES = 6
CLASS_NAMES = ["broken_streetlight", "drainage", "garbage", "pothole", "signage", "streetlight"]
MODEL_PATH = "best_model.pth"
TEST_IMAGES_DIR = "images"  # folder containing test images

# ---------- MODEL ----------
model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
model.fc = torch.nn.Linear(model.fc.in_features, NUM_CLASSES)
model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))
model = model.to(DEVICE)
model.eval()

# ---------- IMAGE PREPROCESS ----------
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# ---------- INFERENCE ----------
for image_name in os.listdir(TEST_IMAGES_DIR):
    image_path = os.path.join(TEST_IMAGES_DIR, image_name)
    if not image_path.lower().endswith(('.png', '.jpg', '.jpeg')):
        continue

    image = Image.open(image_path).convert("RGB")
    input_tensor = preprocess(image).unsqueeze(0).to(DEVICE)  # add batch dimension

    with torch.no_grad():
        outputs = model(input_tensor)
        _, predicted = torch.max(outputs, 1)
        predicted_class = CLASS_NAMES[predicted.item()]

    print(f"{image_name} --> Predicted class: {predicted_class}")
41  backend/test/Machine_Learning/tets_sevarity.py  Normal file
@@ -0,0 +1,41 @@
import cv2
from ultralytics import YOLO

# Load your trained YOLOv12 model
model = YOLO("checkpoints/pothole_detector/weights/best.pt")  # Path to your trained weights

# Define severity thresholds (you can adjust these based on your dataset)
def classify_severity(box, image_height):
    x1, y1, x2, y2 = box
    area = (x2 - x1) * (y2 - y1)
    if area > 50000 or y2 > image_height * 0.75:
        return "High"
    elif area > 20000 or y2 > image_height * 0.5:
        return "Medium"
    else:
        return "Low"

# Draw bounding boxes with severity
def draw_boxes_and_severity(image, results):
    for r in results:  # iterate over Results objects
        # Pair each box with its own confidence score (indexing conf[0] would reuse the first box's score)
        for box, box_conf in zip(r.boxes.xyxy, r.boxes.conf):
            x1, y1, x2, y2 = map(int, box.cpu().numpy())
            conf = float(box_conf)
            severity = classify_severity((x1, y1, x2, y2), image.shape[0])
            color = (0, 255, 0) if severity == "Low" else (0, 255, 255) if severity == "Medium" else (0, 0, 255)
            cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
            cv2.putText(image, f"{severity} ({conf:.2f})", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
    return image

# Detect potholes in an image
def detect_potholes(image_path, output_path="output.jpg"):
    image = cv2.imread(image_path)
    results = model(image)  # Run inference
    image = draw_boxes_and_severity(image, results)
    cv2.imwrite(output_path, image)
    print(f"Output saved to {output_path}")

# Example usage
if __name__ == "__main__":
    detect_potholes(r"images\pothole_1.jpg")
17  backend/test/Machine_Learning/train_deetction.py  Normal file
@@ -0,0 +1,17 @@
from ultralytics import YOLO

def train():
    model = YOLO("yolov12n.pt")  # pretrained YOLOv12 nano
    model.train(
        data="D:/CTF_Hackathon/gensprintai2025/pothole-detection-yolov12.v2i.yolov12/data.yaml",
        epochs=10,
        imgsz=512,
        batch=8,
        device=0,
        project="checkpoints",
        name="pothole_detector",
        exist_ok=True
    )

if __name__ == "__main__":
    train()
125  backend/test/Machine_Learning/train_ml.py  Normal file
@@ -0,0 +1,125 @@
import os
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from torch.cuda.amp import GradScaler, autocast
from torch.utils.tensorboard import SummaryWriter
import time
import psutil

# ---------- CONFIG ----------
DATA_DIR = "dataset"  # dataset folder
BATCH_SIZE = 16
NUM_EPOCHS = 5
LR = 1e-4
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
NUM_CLASSES = 6  # broken_streetlight, drainage, garbage, pothole, signage, streetlight
NUM_WORKERS = 10  # Windows-safe

# ---------- DATA ----------
train_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
])

val_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

train_dataset = datasets.ImageFolder(os.path.join(DATA_DIR, "train"), transform=train_transforms)
val_dataset = datasets.ImageFolder(os.path.join(DATA_DIR, "val"), transform=val_transforms)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

# ---------- MODEL ----------
model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
model.fc = nn.Linear(model.fc.in_features, NUM_CLASSES)
model = model.to(DEVICE)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LR)
scaler = GradScaler()  # Mixed precision

# ---------- TENSORBOARD ----------
writer = SummaryWriter(log_dir="runs/streetlight_classification")

# ---------- DEBUG FUNCTIONS ----------
def print_gpu_memory():
    if DEVICE.type == "cuda":
        print(f"GPU Memory Allocated: {torch.cuda.memory_allocated()/1024**2:.2f} MB")
        print(f"GPU Memory Cached: {torch.cuda.memory_reserved()/1024**2:.2f} MB")

def print_cpu_memory():
    mem = psutil.virtual_memory()
    print(f"CPU Memory Usage: {mem.percent}% ({mem.used/1024**2:.2f}MB / {mem.total/1024**2:.2f}MB)")

# ---------- TRAINING FUNCTION ----------
def train_model(num_epochs):
    best_acc = 0.0
    for epoch in range(num_epochs):
        start_time = time.time()
        model.train()
        running_loss = 0.0

        for i, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()

            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            scaler.scale(loss).backward()

            # Debug gradients for first batch
            if i == 0 and epoch == 0:
                for name, param in model.named_parameters():
                    if param.grad is not None:
                        print(f"Grad {name}: mean={param.grad.mean():.6f}, std={param.grad.std():.6f}")

            scaler.step(optimizer)
            scaler.update()
            running_loss += loss.item()

            if i % 10 == 0:
                print(f"[Epoch {epoch+1}][Batch {i}/{len(train_loader)}] Loss: {loss.item():.4f}")
                print_gpu_memory()
                print_cpu_memory()

        avg_loss = running_loss / len(train_loader)

        # ---------- VALIDATION ----------
        model.eval()
        correct, total = 0, 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)
        val_acc = correct / total

        print(f"Epoch [{epoch+1}/{num_epochs}] completed in {time.time()-start_time:.2f}s")
        print(f"Train Loss: {avg_loss:.4f}, Val Accuracy: {val_acc:.4f}\n")

        # TensorBoard logging
        writer.add_scalar("Loss/train", avg_loss, epoch)
        writer.add_scalar("Accuracy/val", val_acc, epoch)

        # Save best model
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), "best_model.pth")
            print("✅ Saved best model.")

    print(f"Training finished. Best Val Accuracy: {best_acc:.4f}")

if __name__ == "__main__":
    train_model(NUM_EPOCHS)
19  backend/test/check_torch.py  Normal file
@@ -0,0 +1,19 @@
import torch

print("🔥 PyTorch version:", torch.__version__)

# Reports whether a CPU math backend (MKL or OpenMP) is available
print("✅ Torch is available:", torch.backends.mkl.is_available() or torch.backends.openmp.is_available())

# Check CUDA / GPU
print("🖥️ CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print(" -> CUDA device count:", torch.cuda.device_count())
    print(" -> Current device:", torch.cuda.current_device())
    print(" -> GPU name:", torch.cuda.get_device_name(torch.cuda.current_device()))
else:
    print(" -> Running on CPU only")

# Check MPS (for Apple Silicon M1/M2 Macs)
if torch.backends.mps.is_available():
    print("🍎 MPS (Apple GPU) available")
130  backend/test/test_backend.py  Normal file
@@ -0,0 +1,130 @@
import requests
import json
import uuid
from pathlib import Path

BASE_URL = "http://127.0.0.1:8000/api"  # API root

# ----------------------
# Helper function to log responses nicely
# ----------------------
def log_response(step_name, response):
    print(f"\n=== {step_name} ===")
    print("Status Code:", response.status_code)
    try:
        print("Response JSON:", json.dumps(response.json(), indent=2))
    except Exception:
        print("Response Text:", response.text)

# ----------------------
# 1. Create a new user
# ----------------------
def create_user(name, email):
    url = f"{BASE_URL}/users"
    payload = {"name": name, "email": email}
    response = requests.post(url, json=payload)
    log_response("CREATE USER", response)
    if response.status_code == 200:
        user_data = response.json()
        return user_data.get("id"), user_data.get("name")
    return None, None

# ----------------------
# 2. Submit a new report/ticket
# ----------------------
def submit_report(user_id, image_path):
    url = f"{BASE_URL}/report"
    if not Path(image_path).exists():
        print(f"Image file not found: {image_path}")
        return None

    data = {
        "user_id": user_id,
        "latitude": 3.12345,
        "longitude": 101.54321,
        "description": "Automated test report"
    }
    with open(image_path, "rb") as img_file:
        files = {"image": img_file}
        response = requests.post(url, data=data, files=files)
    log_response("SUBMIT REPORT", response)
    if response.status_code == 201:
        return response.json().get("ticket_id")
    return None

# ----------------------
# 3. Fetch all tickets
# ----------------------
def get_all_tickets():
    url = f"{BASE_URL}/tickets"
    response = requests.get(url)
    log_response("GET ALL TICKETS", response)

# ----------------------
# 4. Fetch a single ticket
# ----------------------
def get_ticket(ticket_id):
    url = f"{BASE_URL}/tickets/{ticket_id}"
    response = requests.get(url)
    log_response(f"GET TICKET {ticket_id}", response)

# ----------------------
# 5. Update ticket status
# ----------------------
def update_ticket(ticket_id, new_status):
    url = f"{BASE_URL}/tickets/{ticket_id}"
    payload = {"new_status": new_status}  # <-- use new_status to match backend
    response = requests.patch(url, json=payload)
    log_response(f"UPDATE TICKET {ticket_id} TO {new_status}", response)

# ----------------------
# 6. Fetch analytics
# ----------------------
def get_analytics():
    url = f"{BASE_URL}/analytics"
    response = requests.get(url)
    log_response("GET ANALYTICS", response)

# ----------------------
# Main test flow
# ----------------------
if __name__ == "__main__":
    print("=== STARTING BACKEND TEST SCRIPT ===")

    # # Step 1: Create user
    # user_name = "Test User"
    # user_email = f"testuser1@gmail.com"
    # user_id, returned_name = create_user(user_name, user_email)
    # if user_id:
    #     print(f"Created user: {returned_name} with ID: {user_id}")
    # else:
    #     print("Failed to create user, aborting script.")
    #     exit(1)

    user_id = "5fc2ac8b-6d77-4567-918e-39e31f749c79"  # Use existing user ID for testing

    # Step 2: Submit a ticket
    image_file = r"D:\CTF_Hackathon\gensprintai2025\images\potholes.jpeg"  # Update this path
    ticket_id = submit_report(user_id, image_file)
    if ticket_id:
        print(f"Created ticket with ID: {ticket_id}")

        # # Step 3: Fetch all tickets
        # get_all_tickets()

        # Step 4: Fetch single ticket
        get_ticket(ticket_id)

        # Step 5: Update ticket status to 'In Progress' then 'Fixed'
        update_ticket(ticket_id, "In Progress")
        get_ticket(ticket_id)
        update_ticket(ticket_id, "Fixed")

        # Step 6: Fetch analytics
        get_analytics()
    else:
        print("Ticket creation failed, skipping ticket tests.")

    print("\n=== BACKEND TEST SCRIPT COMPLETED ===")