"""Audio denoising through the Cleanvoice cloud API, with local FFmpeg fallbacks."""

import logging
import os
import re
import subprocess

# The third-party packages are only needed for the cloud path. Guarding the
# imports keeps this module importable (and the FFmpeg fallback usable) on
# machines where they are not installed.
try:
    import requests
except ImportError:
    requests = None

try:
    from cleanvoice import Cleanvoice
except ImportError:
    Cleanvoice = None

logger = logging.getLogger(__name__)


class Denoiser:
    """Clean up an audio file with Cleanvoice and hand back an MP3.

    The API key is read from the ``CLEANVOICE_API_KEY`` environment variable
    when the instance is created. Every failure in the cloud path degrades to
    a plain FFmpeg re-encode of the original file.
    """

    # Keyword options of ``process`` that are forwarded to the Cleanvoice call.
    _API_OPTIONS = ("fillers", "stutters", "long_silences")

    # Words dropped by ``clean_transcript_fillers`` (compared lower-cased,
    # after stripping everything that is not a-z).
    _FILLERS = frozenset({"um", "umm", "uh", "basically", "like", "ante", "ane"})
    _NON_LETTERS = re.compile(r"[^a-z]")

    # Encoder settings shared by the normal and the fallback MP3 conversion.
    _MP3_ARGS = ("-codec:a", "libmp3lame", "-b:a", "192k")

    def __init__(self):
        """Read the API key from the environment and report the active mode."""
        self.api_key = os.environ.get("CLEANVOICE_API_KEY", "")
        if self.api_key:
            print("[Denoiser] ☁️ Cleanvoice API Ready (WhatsApp Support Active)")
        else:
            print("[Denoiser] ⚠️ No CLEANVOICE_API_KEY found — will use preprocessed WAV only")

    def process(self, audio_path: str, out_dir: str, **kwargs) -> dict:
        """Denoise ``audio_path`` and write the result into ``out_dir``.

        Steps:
          A. Pre-convert the input to 16 kHz mono WAV (``input_converted.wav``)
             so formats such as .opus/.aac/.m4a are normalised before upload.
          B. Send the file to Cleanvoice (skipped when no API key is set).
          C. Download the cleaned WAV and encode it to ``clean_output.mp3``.

        If B or C fails, the original file is re-encoded to ``fallback.mp3``.
        If even that fails, the original path is returned unchanged so the
        caller never receives a path this method knows was not produced.

        Args:
            audio_path: Path of the input audio file.
            out_dir: Directory for all generated files; created if missing.
            **kwargs: Only ``fillers``, ``stutters`` and ``long_silences`` are
                forwarded to Cleanvoice; anything else is ignored.

        Returns:
            ``{'audio_path': <path>}``.
        """
        try:
            os.makedirs(out_dir, exist_ok=True)
        except OSError as exc:
            # Best effort: the conversions below will fail and fall through
            # to returning the original file.
            logger.warning(
                "[Denoiser] Could not create output directory %s: %s", out_dir, exc
            )

        # Step A: pre-convert to a standard WAV.
        standard_input = os.path.join(out_dir, "input_converted.wav")
        converted = self._run_ffmpeg(
            ["-i", audio_path, "-ar", "16000", "-ac", "1", standard_input],
            standard_input,
        )
        if not converted:
            logger.warning("[Denoiser] FFmpeg conversion failed — using original file")
            standard_input = audio_path

        if self.api_key:
            try:
                mp3_output = self._denoise_via_api(standard_input, out_dir, kwargs)
                return {'audio_path': mp3_output}
            except Exception as e:  # any cloud-path failure degrades to the fallback
                logger.exception("Denoiser error: %s", e)
        else:
            # Without a key the API call cannot succeed; skip the round trip.
            logger.info("[Denoiser] No API key configured — skipping Cleanvoice")

        return {'audio_path': self._fallback_mp3(audio_path, out_dir)}

    def _denoise_via_api(self, standard_input: str, out_dir: str, options: dict) -> str:
        """Run steps B and C; return the MP3 path or raise on any failure."""
        if Cleanvoice is None or requests is None:
            raise RuntimeError(
                "the cleanvoice and requests packages are required for cloud denoising"
            )

        # Step B: process via the Cleanvoice API.
        client = Cleanvoice({'api_key': self.api_key})
        supported_kwargs = {
            k: v for k, v in options.items() if k in self._API_OPTIONS
        }
        job = client.process(
            standard_input,
            remove_noise=True,
            studio_sound=True,
            **supported_kwargs
        )

        # Step C: download the processed WAV. raise_for_status() prevents an
        # HTTP error page from being saved as if it were audio.
        response = requests.get(job.audio.url, timeout=60)
        response.raise_for_status()
        wav_output = os.path.join(out_dir, "clean_output.wav")
        with open(wav_output, "wb") as f:
            f.write(response.content)

        # Encode the cleaned WAV to MP3 (192 kbit/s, 44.1 kHz, stereo).
        mp3_output = os.path.join(out_dir, "clean_output.mp3")
        encoded = self._run_ffmpeg(
            ["-i", wav_output, *self._MP3_ARGS, "-ar", "44100", "-ac", "2", mp3_output],
            mp3_output,
        )
        if not encoded:
            raise RuntimeError("FFmpeg could not encode the denoised audio to MP3")
        return mp3_output

    def _fallback_mp3(self, audio_path: str, out_dir: str) -> str:
        """Re-encode the original file to MP3; return the original path on failure."""
        fallback_mp3 = os.path.join(out_dir, "fallback.mp3")
        if self._run_ffmpeg(["-i", audio_path, *self._MP3_ARGS, fallback_mp3], fallback_mp3):
            return fallback_mp3
        logger.error(
            "[Denoiser] Fallback MP3 conversion failed — returning original file"
        )
        return audio_path

    @staticmethod
    def _run_ffmpeg(args, output_path: str) -> bool:
        """Run ``ffmpeg -y <args>``; True only if it exited 0 and wrote ``output_path``."""
        try:
            completed = subprocess.run(["ffmpeg", "-y", *args], capture_output=True)
        except OSError as exc:
            # Raised when the ffmpeg executable is missing or cannot be started.
            logger.warning("[Denoiser] Could not launch FFmpeg: %s", exc)
            return False
        return completed.returncode == 0 and os.path.exists(output_path)

    def clean_transcript_fillers(self, transcript: str) -> str:
        """Return ``transcript`` with filler words removed.

        Words are split on whitespace. A word is dropped when its lower-cased
        form, with every character outside a-z removed, is a known filler, so
        attached punctuation is dropped together with the filler. The
        remaining words are re-joined with single spaces.
        """
        kept = (
            word
            for word in transcript.split()
            if self._NON_LETTERS.sub("", word.lower()) not in self._FILLERS
        )
        return " ".join(kept)

    # Important: dummy methods to prevent "AttributeError" in app.py.
    def _remove_fillers(self, audio, sr, segments):
        """No-op placeholder: return the audio unchanged and a count of 0."""
        return audio, 0

    def _remove_stutters(self, audio, sr, segments):
        """No-op placeholder: return the audio unchanged and a count of 0."""
        return audio, 0