"""
ClearWave AI — API Space (FastAPI)
===================================
Pipeline:
  1. Download audio from URL
  2. Denoise    → Cleanvoice SDK (tested & working)
  3. Transcribe → Groq Whisper large-v3 / faster-whisper fallback
  4. Translate  → Helsinki-NLP / NLLB / Google fallback
  5. Summarize  → Extractive (position-scored)
  6. Upload to Cloudinary

Environment vars: CLEANVOICE_API_KEY, CLOUD_NAME, API_KEY, API_SECRET, GROQ_API_KEY
"""

import os
import json
import time
import tempfile
import logging
import shutil

import requests
import cloudinary
import cloudinary.uploader
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware

from denoiser import Denoiser        # ✅ Your working denoiser
from transcriber import Transcriber  # Your transcriber
from translator import Translator    # Your translator

# ── Cloudinary config ─────────────────────────────────────────────
cloudinary.config(
    cloud_name=os.environ.get("CLOUD_NAME"),
    api_key=os.environ.get("API_KEY"),
    api_secret=os.environ.get("API_SECRET"),
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ── Singletons (loaded once at startup) ──────────────────────────
denoiser = Denoiser()
transcriber = Transcriber()
translator = Translator()

app = FastAPI(title="ClearWave AI API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# URL fragments checked (in this order) to pick the temp-file extension;
# anything unrecognised falls back to the default.
_AUDIO_SUFFIXES = (".opus", ".ogg", ".aac", ".m4a", ".wav")
_DEFAULT_AUDIO_SUFFIX = ".mp3"


def _safe_cleanup(out_dir):
    """Remove the temporary working directory; never raises.

    Delegates to ``denoiser.cleanup_temp_files`` when the denoiser exposes
    it, otherwise removes ``out_dir`` directly if it exists. Any failure is
    logged as a warning and swallowed.
    """
    try:
        if hasattr(denoiser, 'cleanup_temp_files'):
            denoiser.cleanup_temp_files(out_dir)
        elif out_dir and os.path.exists(out_dir):
            shutil.rmtree(out_dir, ignore_errors=True)
    except Exception as e:
        logger.warning(f"Cleanup warning (non-fatal): {e}")


def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
                 opt_fillers=True, opt_stutters=True, opt_silences=True):
    """Run enhance → transcribe → translate → summarize/upload on one file.

    Generator — yields SSE-style dicts at each step.

    Args:
        audio_path: Path of the local audio file to process.
        src_lang: Source language passed to the transcriber ("auto" by default).
        tgt_lang: Target language. Translation is skipped when this is
            "auto" or equals the language reported by the transcriber.
        opt_fillers: Forwarded to the denoiser as ``fillers``; when truthy
            the transcript is also passed through
            ``denoiser.clean_transcript_fillers``.
        opt_stutters: Forwarded to the denoiser as ``stutters``.
        opt_silences: Forwarded to the denoiser as ``long_silences``.

    Yields:
        ``{"status": "processing", "step": n, "message": ...}`` before each
        step, then either one ``{"status": "done", ...}`` dict carrying the
        transcript, translation, summary, uploaded audio URL and stats, or
        one ``{"status": "error", "message": ...}`` dict if any step raised.
        ``enhancedAudio`` is ``None`` when the Cloudinary upload failed.
    """
    out_dir = tempfile.mkdtemp()

    try:
        # ── Step 1: Cleanvoice API ────────────────────────────────
        yield {"status": "processing", "step": 1,
               "message": "Step 1/4 — Enhancing audio with ClearWave AI..."}

        result = denoiser.process(
            audio_path,
            out_dir,
            fillers=opt_fillers,
            stutters=opt_stutters,
            long_silences=opt_silences,
        )
        clean_audio = result["audio_path"]
        stats = {
            "noise_method": "Cleanvoice API",
            "fillers": opt_fillers,
            "stutters": opt_stutters,
            "silences": opt_silences,
        }
        logger.info("Cleanvoice enhancement complete")

        # ── Step 2: Transcribe ─────────────────────────────────────
        yield {"status": "processing", "step": 2,
               "message": "Step 2/4 — Transcribing..."}

        transcript, detected_lang, t_method = transcriber.transcribe(clean_audio, src_lang)
        # Read immediately after transcribe(): the segments live on the
        # shared transcriber instance.
        word_segs = transcriber._last_segments

        # Clean transcript fillers too
        if opt_fillers:
            transcript = denoiser.clean_transcript_fillers(transcript)

        logger.info(f"Transcription: {len(transcript.split())} words, lang={detected_lang}")

        # ── Step 3: Translate ──────────────────────────────────────
        translation = transcript
        tl_method = "same language"

        if tgt_lang != "auto" and detected_lang != tgt_lang:
            yield {"status": "processing", "step": 3,
                   "message": "Step 3/4 — Translating..."}
            translation, tl_method = translator.translate(transcript, detected_lang, tgt_lang)
            logger.info(f"Translation done via {tl_method}")
        else:
            yield {"status": "processing", "step": 3,
                   "message": "Step 3/4 — Skipping translation (same language)"}

        # ── Step 4: Summarize & Cloudinary ────────────────────────
        yield {"status": "processing", "step": 4,
               "message": "Step 4/4 — Summarizing & uploading..."}

        # The summary is built from the (source-language) transcript.
        summary = translator.summarize(transcript)

        # Upload is best-effort: a failure is logged and reported as a
        # null URL rather than failing the whole pipeline.
        enhanced_url = None
        try:
            upload_result = cloudinary.uploader.upload(
                clean_audio,
                resource_type="video",
                folder="clearwave_enhanced",
            )
            enhanced_url = upload_result["secure_url"]
            logger.info(f"Cloudinary upload: {enhanced_url}")
        except Exception as e:
            logger.error(f"Cloudinary failed: {e}")

        # ✅ yield done INSIDE try so cleanup never interrupts it
        yield {
            "status": "done",
            "step": 4,
            "message": "✅ Complete!",
            "transcript": transcript,
            "translation": translation,
            "summary": summary,
            "enhancedAudio": enhanced_url,
            "stats": {
                "language": detected_lang.upper(),
                "denoiser": stats,
                "transcription_method": t_method,
                "translation_method": tl_method,
                "word_segments": len(word_segs),
                "transcript_words": len(transcript.split()),
            },
        }

    except Exception as e:
        logger.error(f"Pipeline error: {e}", exc_info=True)
        yield {"status": "error", "message": f"Pipeline failed: {str(e)}"}

    finally:
        # ✅ Cleanup is in finally — runs after done is yielded (or when the
        # generator is closed early), never crashes pipeline
        _safe_cleanup(out_dir)


def _unlink_quietly(path):
    """Delete ``path``; filesystem errors (e.g. already removed) are ignored."""
    try:
        os.unlink(path)
    except OSError:
        pass


def _guess_audio_suffix(url):
    """Return the temp-file extension for ``url``.

    The query string is ignored. The first entry of ``_AUDIO_SUFFIXES`` that
    occurs anywhere in the lower-cased URL wins; otherwise ".mp3" is used.
    """
    lower_url = url.lower().split("?")[0]
    for suffix in _AUDIO_SUFFIXES:
        if suffix in lower_url:
            return suffix
    return _DEFAULT_AUDIO_SUFFIX


def _download_audio(url):
    """Stream ``url`` into a new temporary file and return the file's path.

    The HTTP response is always closed. If anything fails after the temp
    file has been created, the partial file is removed before the exception
    propagates, so the caller owns a file only on success.

    Raises:
        Whatever ``requests`` raises for connection/HTTP errors, or the
        error raised while writing the file.
    """
    with requests.get(url, timeout=60, stream=True) as resp:
        resp.raise_for_status()
        suffix = _guess_audio_suffix(url)

        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        try:
            with tmp:  # closes the handle; delete=False keeps the file
                for chunk in resp.iter_content(chunk_size=65536):
                    if chunk:
                        tmp.write(chunk)
        except BaseException:
            # Do not leave a half-written file behind.
            _unlink_quietly(tmp.name)
            raise
    return tmp.name


@app.get("/api/health")
async def health():
    """Liveness probe: returns a static OK payload."""
    return JSONResponse({"status": "ok", "service": "ClearWave AI API"})


@app.post("/api/process-url")
async def process_url(request: Request):
    """Download the audio at ``audioUrl`` and stream pipeline progress as SSE.

    Expects a JSON object body with ``audioUrl`` (required) and optional
    ``audioId``, ``srcLang``, ``tgtLang``, ``optFillers``, ``optStutters``,
    ``optSilences``. Responds with HTTP 400 when the body is not a JSON
    object or ``audioUrl`` is missing; otherwise with a
    ``text/event-stream`` response where every event is one ``data:`` line
    holding a JSON dict from ``run_pipeline`` plus the echoed ``audioId``.
    """
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError / UnicodeDecodeError are both ValueErrors.
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "JSON object required"}, status_code=400)

    audio_url = data.get("audioUrl")
    audio_id = data.get("audioId", "")
    src_lang = data.get("srcLang", "auto")
    tgt_lang = data.get("tgtLang", "te")
    opt_fillers = data.get("optFillers", True)
    opt_stutters = data.get("optStutters", True)
    opt_silences = data.get("optSilences", True)

    if not audio_url:
        return JSONResponse({"error": "audioUrl required"}, status_code=400)

    # NOTE(review): audio_url is caller-supplied and fetched server-side;
    # consider restricting allowed hosts if this API is publicly reachable.

    async def generate():
        def sse(obj):
            return "data: " + json.dumps(obj) + "\n\n"

        yield sse({"status": "processing", "step": 0, "message": "Downloading audio..."})

        # NOTE(review): the download and the pipeline are synchronous calls
        # inside an async generator, so they presumably hold the event loop
        # while running. Moving them to a worker thread would first require
        # confirming the shared denoiser/transcriber/translator singletons
        # are safe to use concurrently.
        tmp_path = None
        try:
            # Download audio from URL
            try:
                tmp_path = _download_audio(audio_url)
            except Exception as e:
                yield sse({"status": "error", "message": f"Download failed: {e}"})
                return

            # Run full pipeline
            for result in run_pipeline(
                tmp_path, src_lang, tgt_lang,
                opt_fillers, opt_stutters, opt_silences
            ):
                result["audioId"] = audio_id
                yield sse(result)
        finally:
            # Runs on normal completion AND when the client disconnects
            # mid-stream, so the downloaded file is never leaked.
            if tmp_path is not None:
                _unlink_quietly(tmp_path)

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )