| import logging |
| import os |
| from collections import deque |
| from pathlib import Path |
| from threading import Event |
| from typing import Dict, Optional, Tuple |
|
|
| import gradio as gr |
|
|
| from gefs_wave import WaveDownloadManager, logger as wave_logger |
|
|
# Directory handed to the download manager and used to resolve tarball paths;
# override it with the GEFS_WAVE_DATA_DIR environment variable.
DATA_ROOT = Path(os.getenv("GEFS_WAVE_DATA_DIR", "data/gefswave"))
manager = WaveDownloadManager(DATA_ROOT)

# Set once the first refresh has been requested (see ensure_refresh).
_refresh_started = Event()

# Rolling window holding at most the 200 newest log lines shown in the UI.
_log_buffer: deque[str] = deque(maxlen=200)
|
|
|
|
class UILogHandler(logging.Handler):
    """Logging handler that appends each rendered record to ``_log_buffer``."""

    def emit(self, record) -> None:
        """Store the text for ``record`` in the module-level log buffer."""
        _log_buffer.append(self._render(record))

    def _render(self, record) -> str:
        """Return the formatted record, or its plain message if formatting fails."""
        try:
            return self.format(record)
        except Exception:
            # Best effort: a formatting failure must not lose the log line.
            return record.getMessage()
|
|
|
|
# Render UI log lines as "<timestamp> <LEVEL> <message>".
# The strftime pattern belongs in ``datefmt``; placing ``%Y-%m-%d`` directly in
# the %-style ``fmt`` makes every ``format()`` call raise ValueError, which
# UILogHandler.emit then hides by falling back to the bare message.
log_formatter = logging.Formatter(
    fmt="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
ui_log_handler = UILogHandler()
ui_log_handler.setFormatter(log_formatter)

# Attach the UI handler only if the wave logger does not already carry one.
if not any(isinstance(handler, UILogHandler) for handler in wave_logger.handlers):
    wave_logger.addHandler(ui_log_handler)
wave_logger.setLevel(logging.INFO)
|
|
|
|
def _log_text() -> str:
    """Return the buffered log lines joined by newlines.

    A placeholder sentence is returned while the buffer is still empty.
    """
    return "\n".join(_log_buffer) if _log_buffer else "No log messages yet."
|
|
|
|
def ensure_refresh() -> None:
    """Request a refresh from the manager the first time this is called.

    Later calls return immediately because ``_refresh_started`` is already set.
    """
    if _refresh_started.is_set():
        return
    # Flag first, then trigger: same ordering as before.
    _refresh_started.set()
    manager.trigger_refresh()
|
|
|
|
def format_status(status: Dict, notice: Optional[str] = None) -> Tuple[Dict, str]:
    """Return the status mapping together with a markdown summary of it.

    Args:
        status: Downloader status mapping. The keys read here are ``status``,
            ``message`` and ``latest_state``; inside ``latest_state`` the keys
            ``product_label``, ``cycle``, ``updated_at``, ``files`` and
            ``tarball`` are used when present.
        notice: Optional warning text appended as the last line of the summary.

    Returns:
        A ``(status, markdown)`` tuple; ``status`` is the same object passed in.
    """
    # A missing, None or empty ``latest_state`` all mean "nothing downloaded".
    latest = status.get("latest_state") or {}

    lines = [f"**Downloader status:** `{status.get('status', 'unknown')}`"]
    if "message" in status:
        lines.append(f"- Message: {status['message']}")

    if latest:
        if dataset := latest.get("product_label"):
            lines.append(f"- Dataset: `{dataset}`")
        # ``files`` may be present but None; count that as zero files instead
        # of letting len(None) raise TypeError.
        file_count = len(latest.get("files") or ())
        lines.extend(
            [
                f"- Latest cycle: `{latest.get('cycle', 'N/A')}`",
                f"- Updated at: `{latest.get('updated_at', 'N/A')}`",
                f"- Files cached: {file_count}",
            ]
        )
        if tarball := latest.get("tarball"):
            lines.append(f"- Tarball path: `{tarball}`")
    else:
        lines.append("- No cycle downloaded yet.")

    lines.append("\nUse **Trigger Refresh** to fetch a newer cycle when available.")
    if notice:
        lines.append(f"\n⚠️ {notice}")
    return status, "\n".join(lines)
|
|
|
|
def get_status() -> Tuple[Dict, str, Optional[str], str]:
    """Return the current status for the UI without offering a download.

    Returns:
        ``(status, markdown summary, None, log text)``; the third slot is the
        download button value and is always ``None`` here.
    """
    ensure_refresh()
    payload, summary = format_status(manager.status())
    return payload, summary, None, _log_text()
|
|
|
|
def trigger_refresh() -> Tuple[Dict, str, Optional[str], str]:
    """Ask the manager for a refresh and return the resulting status for the UI.

    Returns:
        ``(status, markdown summary, None, log text)``.
    """
    ensure_refresh()
    # NOTE(review): if ensure_refresh() has never run before, the manager is
    # triggered twice in a row here; confirm that is harmless.
    manager.trigger_refresh()
    payload, summary = format_status(manager.status())
    return payload, summary, None, _log_text()
|
|
|
|
def download_tarball() -> Tuple[Dict, str, Optional[str], str]:
    """Resolve the latest tarball and return it for download with the status.

    Returns:
        ``(status, markdown summary, tarball path or None, log text)``. The
        path is ``None`` (and the summary carries a warning) when the status
        reports no tarball or the file is not on disk.
    """
    ensure_refresh()
    status = manager.status()
    # ``latest_state`` may be absent or None; treat both as "nothing yet"
    # rather than failing on ``None.get``. format_status tolerates the same.
    state = status.get("latest_state") or {}

    notice: Optional[str] = None
    download_path: Optional[str] = None

    tarball_rel = state.get("tarball")
    if not tarball_rel:
        notice = "Wave dataset not available yet. Trigger a refresh and try again."
    else:
        tarball_path = DATA_ROOT / tarball_rel
        # is_file() rather than exists(): a directory at that path cannot be
        # served as a download.
        if tarball_path.is_file():
            download_path = str(tarball_path)
        else:
            notice = "Tarball missing on disk. Trigger a refresh and try again."

    status_dict, status_text = format_status(status, notice)
    return status_dict, status_text, download_path, _log_text()
|
|
|
|
# NOTE(review): "GFES" in the title and heading looks like a typo for "GEFS";
# the strings are left untouched pending confirmation.
with gr.Blocks(title="GFES Wave 0 Downloader") as demo:
    gr.Markdown(
        """
        ## GFES Wave 0 Downloader
        Fetch the latest NOAA GEFS Wave control member (`c00`) global 0.25° dataset.

        1. Check the status panel to confirm the current cycle.
        2. Press **Trigger Refresh** to request a newer cycle if one is available.
        3. Use **Download Latest Tarball** to grab a `.tar.gz` of every GRIB2 file for that cycle.
        """
    )
    status_json = gr.JSON(label="Downloader Status")
    status_md = gr.Markdown()
    logs_box = gr.Textbox(label="Recent Logs", lines=12, max_lines=12, interactive=False)

    with gr.Row():
        refresh_button = gr.Button("Trigger Refresh", variant="secondary")
        download_button = gr.DownloadButton("Download Latest Tarball", variant="primary")

    # Every callback returns (status dict, summary markdown, download value,
    # log text), so all three events update the same four components.
    panel_outputs = [status_json, status_md, download_button, logs_box]

    demo.load(get_status, outputs=panel_outputs)
    refresh_button.click(trigger_refresh, outputs=panel_outputs)
    download_button.click(download_tarball, outputs=panel_outputs)

demo.queue()
|
|
|
|
if __name__ == "__main__":
    # Request the first refresh before the server starts accepting requests.
    ensure_refresh()
    demo.launch(
        server_name="0.0.0.0",
        # PORT overrides the listening port; 7860 is the fallback.
        server_port=int(os.environ.get("PORT", 7860)),
        show_api=False,
    )
|
|