diff --git a/.env.example b/.env.example index 5ca3ed80..3032225b 100644 --- a/.env.example +++ b/.env.example @@ -216,6 +216,46 @@ EVA_RECORD_IDS= # Logging level (DEBUG | INFO | WARNING | ERROR | CRITICAL) EVA_LOG_LEVEL=INFO +# ============================================== +# Optional: Turn Detection & VAD Configuration +# ============================================== +# Fine-tune user turn detection and voice activity detection. +# Leave commented to use smart defaults. + +# User turn start strategy: vad | transcription | external +# - vad: Start turn when VAD detects speech (default) +# - transcription: Start turn when STT produces transcription +# - external: Delegate to external service (e.g., Deepgram Flux) +# EVA_MODEL__TURN_START_STRATEGY=vad + +# User turn start strategy parameters (JSON) +# EVA_MODEL__TURN_START_STRATEGY_PARAMS='{}' + +# User turn stop strategy: turn_analyzer | speech_timeout | external +# - turn_analyzer: Use smart turn analyzer to detect natural turn end (default) +# - speech_timeout: Stop after fixed silence duration +# - external: Delegate to external service +# EVA_MODEL__TURN_STOP_STRATEGY=turn_analyzer + +# User turn stop strategy parameters (JSON) +# For speech_timeout: {"user_speech_timeout": 0.8} +# For turn_analyzer: automatically uses smart turn detection +# EVA_MODEL__TURN_STOP_STRATEGY_PARAMS='{}' + +# Note: For services with built-in turn detection (e.g., Deepgram Flux), set both to 'external': +# EVA_MODEL__TURN_START_STRATEGY=external +# EVA_MODEL__TURN_STOP_STRATEGY=external + +# VAD (Voice Activity Detection) analyzer: silero | none +# EVA_MODEL__VAD=silero + +# VAD parameters (JSON) +# - confidence: Minimum confidence threshold (0.0-1.0, default: 0.7) +# - start_secs: Duration to wait before confirming voice start (default: 0.2) +# - stop_secs: Duration to wait before confirming voice stop (default: 0.2) +# - min_volume: Minimum audio volume threshold (0.0-1.0, default: 0.6) +# EVA_MODEL__VAD_PARAMS='{"start_secs": 0.2, "stop_secs": 0.2, "min_volume": 0.6, "confidence": 0.7}' + # ============================================== # Optional: Perturbations # ============================================== diff --git a/src/eva/assistant/pipecat_server.py b/src/eva/assistant/pipecat_server.py index b4ae6d93..3b5a23e2 100644 --- a/src/eva/assistant/pipecat_server.py +++ b/src/eva/assistant/pipecat_server.py @@ -10,10 +10,6 @@ import uvicorn from fastapi import FastAPI, WebSocket -from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams -from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 -from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.frames.frames import ( CancelFrame, LLMRunFrame, @@ -37,9 +33,8 @@ FastAPIWebsocketParams, FastAPIWebsocketTransport, ) -from pipecat.turns.user_start import VADUserTurnStartStrategy from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy -from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies, UserTurnStrategies +from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 from eva.assistant.agentic.audit_log import convert_to_epoch_ms, current_timestamp_ms @@ -59,6 +54,11 @@ create_stt_service, create_tts_service, ) +from eva.assistant.pipeline.turn_config import ( + create_turn_start_strategy, + create_turn_stop_strategy, + create_vad_analyzer, +) from eva.assistant.services.llm import LiteLLMClient from eva.models.agents import AgentConfig from eva.models.config import AudioLLMConfig, PipelineConfig, SpeechToSpeechConfig @@ -304,26 +304,33 @@ async def _realtime_tool_handler(params) -> None: "smart_turn_stop_secs", 0.8 ) # Shorter silence so we don't have to wait 3s if smart turn marks audio as incomplete - if ( - isinstance(self.pipeline_config, (PipelineConfig, SpeechToSpeechConfig)) - and self.pipeline_config.turn_strategy == "external" - ): - logger.info("Using external user turn strategies") - user_turn_strategies = ExternalUserTurnStrategies() - vad_analyzer = None - else: - logger.info("Using local smart turn analyzer") - user_turn_strategies = UserTurnStrategies( - start=[VADUserTurnStartStrategy()], - stop=[ - TurnAnalyzerUserTurnStopStrategy( - turn_analyzer=LocalSmartTurnAnalyzerV3( - params=SmartTurnParams(stop_secs=smart_turn_stop_secs) - ) - ) - ], - ) - vad_analyzer = SileroVADAnalyzer(params=VADParams(stop_secs=vad_stop_secs)) + turn_start_cfg = self.pipeline_config.turn_start_strategy + turn_start_params = self.pipeline_config.turn_start_strategy_params + turn_stop_cfg = self.pipeline_config.turn_stop_strategy + turn_stop_params = self.pipeline_config.turn_stop_strategy_params + vad_cfg = self.pipeline_config.vad + vad_cfg_params = self.pipeline_config.vad_params + + # Create turn start strategy using factory function + turn_start_strategy = create_turn_start_strategy(turn_start_cfg, turn_start_params) + logger.info(f"Using turn start strategy: {turn_start_cfg}") + + # Create turn stop strategy using factory function + turn_stop_strategy = create_turn_stop_strategy(turn_stop_cfg, turn_stop_params, smart_turn_stop_secs) + logger.info(f"Using turn stop strategy: {turn_stop_cfg}") + + user_turn_strategies = UserTurnStrategies( + start=[turn_start_strategy], + stop=[turn_stop_strategy], + ) + + # Create VAD analyzer using factory function + # Merge user params with pipeline-specific stop_secs + vad_params_dict = {"stop_secs": vad_stop_secs} + if vad_cfg_params: + vad_params_dict.update(vad_cfg_params) + vad_analyzer = create_vad_analyzer(vad_cfg, vad_params_dict) + logger.info(f"Using VAD analyzer: {vad_cfg}") user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams( diff --git a/src/eva/assistant/pipeline/audio_llm_processor.py b/src/eva/assistant/pipeline/audio_llm_processor.py index 296fe4c4..6032aa8d 100644 --- a/src/eva/assistant/pipeline/audio_llm_processor.py +++ b/src/eva/assistant/pipeline/audio_llm_processor.py @@ -417,7 +417,7 @@ def __init__( super().__init__(**kwargs) self._audio_collector = audio_collector params = params or {} - self._api_key = params.get["api_key"] + self._api_key = params["api_key"] self._model = model self._system_prompt = system_prompt or self.DEFAULT_SYSTEM_PROMPT self._sample_rate = sample_rate diff --git a/src/eva/assistant/pipeline/turn_config.py b/src/eva/assistant/pipeline/turn_config.py new file mode 100644 index 00000000..efe71a0e --- /dev/null +++ b/src/eva/assistant/pipeline/turn_config.py @@ -0,0 +1,125 @@ +"""Factory functions for creating turn strategies and VAD analyzers from configuration. + +This module provides functions to instantiate Pipecat turn strategies and VAD analyzers +based on configuration settings from environment variables or config files. +""" + +from typing import Any + +from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams +from pipecat.turns.user_start import ( + BaseUserTurnStartStrategy, + ExternalUserTurnStartStrategy, + TranscriptionUserTurnStartStrategy, + VADUserTurnStartStrategy, +) +from pipecat.turns.user_stop import ( + BaseUserTurnStopStrategy, + ExternalUserTurnStopStrategy, + SpeechTimeoutUserTurnStopStrategy, + TurnAnalyzerUserTurnStopStrategy, +) + +from eva.utils.logging import get_logger + +logger = get_logger(__name__) + + +def create_vad_analyzer(vad_type: str, vad_params: dict[str, Any]) -> VADAnalyzer | None: + """Create a VAD analyzer from configuration. + + Args: + vad_type: VAD analyzer type ('silero', 'none') + vad_params: VAD parameters (confidence, start_secs, stop_secs, min_volume) + + Returns: + VAD analyzer instance, or None if vad_type is 'none' + + Raises: + ValueError: If vad_type is not supported + """ + vad_type_lower = vad_type.lower() + + if vad_type_lower == "none": + return None + elif vad_type_lower == "silero": + # Create VADParams, respecting existing defaults if no params specified + params = VADParams(**vad_params) if vad_params else None + return SileroVADAnalyzer(params=params) + else: + raise ValueError(f"Unsupported VAD type: {vad_type}. Supported types: 'silero', 'none'") + + +def create_turn_start_strategy( + strategy_type: str, + strategy_params: dict[str, Any], +) -> BaseUserTurnStartStrategy: + """Create a user turn start strategy from configuration. + + Args: + strategy_type: Strategy type ('vad', 'transcription', 'external') + strategy_params: Strategy-specific parameters + + Returns: + Turn start strategy instance + + Raises: + ValueError: If strategy_type is not supported + """ + strategy_type_lower = strategy_type.lower() + + if strategy_type_lower == "vad": + # VADUserTurnStartStrategy has no required parameters + return VADUserTurnStartStrategy(**strategy_params) + elif strategy_type_lower == "transcription": + # TranscriptionUserTurnStartStrategy has no required parameters + return TranscriptionUserTurnStartStrategy(**strategy_params) + elif strategy_type_lower == "external": + # ExternalUserTurnStartStrategy has no required parameters + return ExternalUserTurnStartStrategy(**strategy_params) + else: + raise ValueError( + f"Unsupported turn start strategy: {strategy_type}. Supported types: 'vad', 'transcription', 'external'" + ) + + +def create_turn_stop_strategy( + strategy_type: str, + strategy_params: dict[str, Any], + smart_turn_stop_secs: float | None = None, +) -> BaseUserTurnStopStrategy: + """Create a user turn stop strategy from configuration. + + Args: + strategy_type: Strategy type ('speech_timeout', 'turn_analyzer', 'external') + strategy_params: Strategy-specific parameters + smart_turn_stop_secs: stop_secs for SmartTurnParams (used with turn_analyzer strategy) + + Returns: + Turn stop strategy instance + + Raises: + ValueError: If strategy_type is not supported + """ + strategy_type_lower = strategy_type.lower() + + if strategy_type_lower == "speech_timeout": + # SpeechTimeoutUserTurnStopStrategy accepts user_speech_timeout parameter + return SpeechTimeoutUserTurnStopStrategy(**strategy_params) + elif strategy_type_lower == "turn_analyzer": + # TurnAnalyzerUserTurnStopStrategy requires a turn_analyzer instance + # If smart_turn_stop_secs is provided, use it; otherwise let SmartTurnParams use its default + smart_params = SmartTurnParams(stop_secs=smart_turn_stop_secs) if smart_turn_stop_secs is not None else None + turn_analyzer = LocalSmartTurnAnalyzerV3(params=smart_params) + return TurnAnalyzerUserTurnStopStrategy(turn_analyzer=turn_analyzer, **strategy_params) + elif strategy_type_lower == "external": + # ExternalUserTurnStopStrategy has no required parameters + return ExternalUserTurnStopStrategy(**strategy_params) + else: + raise ValueError( + f"Unsupported turn stop strategy: {strategy_type}. " + f"Supported types: 'speech_timeout', 'turn_analyzer', 'external'" + ) diff --git a/src/eva/models/config.py b/src/eva/models/config.py index 65f231dd..f219367b 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -68,14 +68,44 @@ class PipelineConfig(BaseModel): stt_params: dict[str, Any] = Field({}, description="Additional STT model parameters (JSON)") tts_params: dict[str, Any] = Field({}, description="Additional TTS model parameters (JSON)") - turn_strategy: Literal["smart", "external"] = Field( - "smart", + # Configurable turn start/stop strategies + turn_start_strategy: str = Field( + "vad", description=( - "User turn detection strategy. " - "'smart' uses LocalSmartTurnAnalyzerV3 + SileroVAD (default). " - "'external' uses ExternalUserTurnStrategies for services with built-in turn detection " - "(e.g., deepgram-flux, Speechmatics). " - "Set via EVA_MODEL__TURN_STRATEGY=external." + "User turn start strategy: 'vad', 'transcription', or 'external'. " + "Defaults to 'vad' (VADUserTurnStartStrategy). " + "Set via EVA_MODEL__TURN_START_STRATEGY." + ), + ) + turn_start_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn start strategy (JSON). Set via EVA_MODEL__TURN_START_STRATEGY_PARAMS.", + ) + + turn_stop_strategy: str = Field( + "turn_analyzer", + description=( + "User turn stop strategy: 'speech_timeout', 'turn_analyzer', or 'external'. " + "Defaults to 'turn_analyzer' (TurnAnalyzerUserTurnStopStrategy with LocalSmartTurnAnalyzerV3). " + "Set via EVA_MODEL__TURN_STOP_STRATEGY." + ), + ) + turn_stop_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn stop strategy (JSON). Set via EVA_MODEL__TURN_STOP_STRATEGY_PARAMS.", + ) + + # VAD configuration + vad: str = Field( + "silero", + description=( + "VAD analyzer type: 'silero' or 'none'. Defaults to 'silero' (SileroVADAnalyzer). Use 'none' with external turn strategies (e.g. deepgram-flux) to skip local VAD. Set via EVA_MODEL__VAD." + ), + ) + vad_params: dict[str, Any] = Field( + {}, + description=( + "VAD parameters (JSON): confidence, start_secs, stop_secs, min_volume. Set via EVA_MODEL__VAD_PARAMS." ), ) @@ -103,6 +133,15 @@ def _migrate_legacy_fields(cls, data: Any) -> Any: data.pop(key, None) return data + @field_serializer("stt_params", "tts_params") + @classmethod + def _redact_api_keys(cls, params: dict[str, Any]) -> dict[str, Any]: + """Redact API keys when serializing.""" + redacted = params.copy() + if "api_key" in redacted: + redacted["api_key"] = "***" + return redacted + class SpeechToSpeechConfig(BaseModel): """Configuration for a speech-to-speech model.""" @@ -112,14 +151,44 @@ class SpeechToSpeechConfig(BaseModel): s2s: str = Field(description="Speech-to-speech model name", examples=["gpt-realtime-mini", "gemini_live"]) s2s_params: dict[str, Any] = Field({}, description="Additional speech-to-speech model parameters (JSON)") - turn_strategy: Literal["smart", "external"] = Field( - "smart", + # Configurable turn start/stop strategies (same as PipelineConfig) + turn_start_strategy: str = Field( + "vad", description=( - "User turn detection strategy. " - "'smart' uses LocalSmartTurnAnalyzerV3 + SileroVAD (default). " - "'external' uses ExternalUserTurnStrategies for services with built-in turn detection " - "(e.g., deepgram-flux, Speechmatics). " - "Set via EVA_MODEL__TURN_STRATEGY=external." + "User turn start strategy: 'vad', 'transcription', or 'external'. " + "Defaults to 'vad' (VADUserTurnStartStrategy). " + "Set via EVA_MODEL__TURN_START_STRATEGY." + ), + ) + turn_start_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn start strategy (JSON). Set via EVA_MODEL__TURN_START_STRATEGY_PARAMS.", + ) + + turn_stop_strategy: str = Field( + "turn_analyzer", + description=( + "User turn stop strategy: 'speech_timeout', 'turn_analyzer', or 'external'. " + "Defaults to 'turn_analyzer' (TurnAnalyzerUserTurnStopStrategy with LocalSmartTurnAnalyzerV3). " + "Set via EVA_MODEL__TURN_STOP_STRATEGY." + ), + ) + turn_stop_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn stop strategy (JSON). Set via EVA_MODEL__TURN_STOP_STRATEGY_PARAMS.", + ) + + # VAD configuration + vad: str = Field( + "silero", + description=( + "VAD analyzer type: 'silero' or 'none'. Defaults to 'silero' (SileroVADAnalyzer). Use 'none' with external turn strategies (e.g. deepgram-flux) to skip local VAD. Set via EVA_MODEL__VAD." + ), + ) + vad_params: dict[str, Any] = Field( + {}, + description=( + "VAD parameters (JSON): confidence, start_secs, stop_secs, min_volume. Set via EVA_MODEL__VAD_PARAMS." ), ) @@ -128,6 +197,15 @@ def pipeline_parts(self) -> dict[str, str]: """Component names for this pipeline.""" return {"s2s": _param_alias(self.s2s_params) or self.s2s} + @field_serializer("s2s_params") + @classmethod + def _redact_api_keys(cls, params: dict[str, Any]) -> dict[str, Any]: + """Redact API keys when serializing.""" + redacted = params.copy() + if "api_key" in redacted: + redacted["api_key"] = "***" + return redacted + class AudioLLMConfig(BaseModel): """Configuration for an Audio-LLM pipeline (audio in, text out, separate TTS). @@ -144,11 +222,55 @@ class AudioLLMConfig(BaseModel): ) audio_llm_params: dict[str, Any] = Field( {}, - description="Audio-LLM parameters (JSON): base_url (required), api_key, model, temperature, max_tokens", + description=( + "Audio-LLM parameters (JSON): base_url (required), api_key, model, temperature, max_tokens, " + "vad_stop_secs (default: 0.4), smart_turn_stop_secs (default: 0.8)" + ), ) tts: str = Field(description="TTS model", examples=["cartesia", "elevenlabs"]) tts_params: dict[str, Any] = Field({}, description="Additional TTS model parameters (JSON)") + # Configurable turn start/stop strategies (same as PipelineConfig) + turn_start_strategy: str = Field( + "vad", + description=( + "User turn start strategy: 'vad', 'transcription', or 'external'. " + "Defaults to 'vad' (VADUserTurnStartStrategy). " + "Set via EVA_MODEL__TURN_START_STRATEGY." + ), + ) + turn_start_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn start strategy (JSON). Set via EVA_MODEL__TURN_START_STRATEGY_PARAMS.", + ) + + turn_stop_strategy: str = Field( + "turn_analyzer", + description=( + "User turn stop strategy: 'speech_timeout', 'turn_analyzer', or 'external'. " + "Defaults to 'turn_analyzer' (TurnAnalyzerUserTurnStopStrategy with LocalSmartTurnAnalyzerV3). " + "Set via EVA_MODEL__TURN_STOP_STRATEGY." + ), + ) + turn_stop_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn stop strategy (JSON). Set via EVA_MODEL__TURN_STOP_STRATEGY_PARAMS.", + ) + + # VAD configuration + vad: str = Field( + "silero", + description=( + "VAD analyzer type: 'silero' or 'none'. Defaults to 'silero' (SileroVADAnalyzer). Use 'none' with external turn strategies (e.g. deepgram-flux) to skip local VAD. Set via EVA_MODEL__VAD." + ), + ) + vad_params: dict[str, Any] = Field( + {}, + description=( + "VAD parameters (JSON): confidence, start_secs, stop_secs, min_volume. Set via EVA_MODEL__VAD_PARAMS." + ), + ) + @property def pipeline_parts(self) -> dict[str, str]: """Component names for this pipeline.""" @@ -157,6 +279,15 @@ def pipeline_parts(self) -> dict[str, str]: "tts": _param_alias(self.tts_params) or self.tts, } + @field_serializer("audio_llm_params", "tts_params") + @classmethod + def _redact_api_keys(cls, params: dict[str, Any]) -> dict[str, Any]: + """Redact API keys when serializing.""" + redacted = params.copy() + if "api_key" in redacted: + redacted["api_key"] = "***" + return redacted + _PIPELINE_FIELDS = { "llm", @@ -164,12 +295,37 @@ def pipeline_parts(self) -> dict[str, str]: "tts", "stt_params", "tts_params", - "turn_strategy", + "turn_start_strategy", + "turn_start_strategy_params", + "turn_stop_strategy", + "turn_stop_strategy_params", + "vad", + "vad_params", *PipelineConfig._LEGACY_RENAMES, *PipelineConfig._LEGACY_DROP, } -_S2S_FIELDS = {"s2s", "s2s_params", "turn_strategy"} -_AUDIO_LLM_FIELDS = {"audio_llm", "audio_llm_params", "tts", "tts_params"} +_S2S_FIELDS = { + "s2s", + "s2s_params", + "turn_start_strategy", + "turn_start_strategy_params", + "turn_stop_strategy", + "turn_stop_strategy_params", + "vad", + "vad_params", +} +_AUDIO_LLM_FIELDS = { + "audio_llm", + "audio_llm_params", + "tts", + "tts_params", + "turn_start_strategy", + "turn_start_strategy_params", + "turn_stop_strategy", + "turn_stop_strategy_params", + "vad", + "vad_params", +} class PipelineType(StrEnum): diff --git a/tests/unit/assistant/test_turn_config.py b/tests/unit/assistant/test_turn_config.py new file mode 100644 index 00000000..8939a4a8 --- /dev/null +++ b/tests/unit/assistant/test_turn_config.py @@ -0,0 +1,210 @@ +"""Unit tests for turn_config factory functions.""" + +from unittest.mock import MagicMock, patch + +import pytest +from pipecat.turns.user_start import ( + ExternalUserTurnStartStrategy, + TranscriptionUserTurnStartStrategy, + VADUserTurnStartStrategy, +) +from pipecat.turns.user_stop import ( + ExternalUserTurnStopStrategy, + SpeechTimeoutUserTurnStopStrategy, + TurnAnalyzerUserTurnStopStrategy, +) + +from eva.assistant.pipeline.turn_config import ( + create_turn_start_strategy, + create_turn_stop_strategy, + create_vad_analyzer, +) + +# --------------------------------------------------------------------------- +# create_vad_analyzer +# --------------------------------------------------------------------------- + + +class TestCreateVadAnalyzer: + """Tests for create_vad_analyzer factory.""" + + def test_silero_no_params(self): + """'silero' with empty params creates SileroVADAnalyzer with params=None.""" + mock_analyzer = MagicMock() + with patch("eva.assistant.pipeline.turn_config.SileroVADAnalyzer", return_value=mock_analyzer) as mock_cls: + result = create_vad_analyzer("silero", {}) + + mock_cls.assert_called_once_with(params=None) + assert result is mock_analyzer + + def test_silero_with_params(self): + """'silero' with params passes VADParams constructed from them.""" + from pipecat.audio.vad.vad_analyzer import VADParams + + mock_analyzer = MagicMock() + with patch("eva.assistant.pipeline.turn_config.SileroVADAnalyzer", return_value=mock_analyzer) as mock_cls: + result = create_vad_analyzer("silero", {"stop_secs": 0.8, "confidence": 0.7}) + + call_args = mock_cls.call_args + passed_params = call_args.kwargs["params"] + assert isinstance(passed_params, VADParams) + assert passed_params.stop_secs == 0.8 + assert passed_params.confidence == 0.7 + assert result is mock_analyzer + + def test_silero_case_insensitive(self): + """Silero type is matched case-insensitively.""" + mock_analyzer = MagicMock() + with patch("eva.assistant.pipeline.turn_config.SileroVADAnalyzer", return_value=mock_analyzer): + for variant in ("SILERO", "Silero", "SiLeRo"): + result = create_vad_analyzer(variant, {}) + assert result is mock_analyzer + + def test_unsupported_vad_type_raises(self): + """Unknown VAD type raises ValueError.""" + with pytest.raises(ValueError, match="Unsupported VAD type: webrtc"): + create_vad_analyzer("webrtc", {}) + + def test_none_vad_type_returns_none(self): + """'none' vad_type returns None without loading any model.""" + result = create_vad_analyzer("none", {}) + assert result is None + + def test_none_vad_type_case_insensitive(self): + """'none' is matched case-insensitively.""" + assert create_vad_analyzer("None", {}) is None + assert create_vad_analyzer("NONE", {}) is None + + def test_unsupported_vad_type_error_lists_supported(self): + """ValueError message lists supported types including 'none'.""" + with pytest.raises(ValueError, match="silero"): + create_vad_analyzer("unknown", {}) + with pytest.raises(ValueError, match="none"): + create_vad_analyzer("unknown", {}) + + +# --------------------------------------------------------------------------- +# create_turn_start_strategy +# --------------------------------------------------------------------------- + + +class TestCreateTurnStartStrategy: + """Tests for create_turn_start_strategy factory.""" + + def test_vad_strategy(self): + """'vad' returns VADUserTurnStartStrategy.""" + result = create_turn_start_strategy("vad", {}) + assert isinstance(result, VADUserTurnStartStrategy) + + def test_transcription_strategy(self): + """'transcription' returns TranscriptionUserTurnStartStrategy.""" + result = create_turn_start_strategy("transcription", {}) + assert isinstance(result, TranscriptionUserTurnStartStrategy) + + def test_external_strategy(self): + """'external' returns ExternalUserTurnStartStrategy.""" + result = create_turn_start_strategy("external", {}) + assert isinstance(result, ExternalUserTurnStartStrategy) + + def test_case_insensitive(self): + """Strategy types are matched case-insensitively.""" + assert isinstance(create_turn_start_strategy("VAD", {}), VADUserTurnStartStrategy) + assert isinstance(create_turn_start_strategy("Vad", {}), VADUserTurnStartStrategy) + assert isinstance(create_turn_start_strategy("TRANSCRIPTION", {}), TranscriptionUserTurnStartStrategy) + assert isinstance(create_turn_start_strategy("External", {}), ExternalUserTurnStartStrategy) + + def test_unsupported_strategy_raises(self): + """Unknown strategy raises ValueError.""" + with pytest.raises(ValueError, match="Unsupported turn start strategy: magic"): + create_turn_start_strategy("magic", {}) + + def test_unsupported_strategy_error_lists_supported(self): + """ValueError message lists the supported strategies.""" + with pytest.raises(ValueError, match="vad.*transcription.*external"): + create_turn_start_strategy("unknown", {}) + + +# --------------------------------------------------------------------------- +# create_turn_stop_strategy +# --------------------------------------------------------------------------- + + +class TestCreateTurnStopStrategy: + """Tests for create_turn_stop_strategy factory.""" + + def test_speech_timeout_strategy(self): + """'speech_timeout' returns SpeechTimeoutUserTurnStopStrategy.""" + result = create_turn_stop_strategy("speech_timeout", {}) + assert isinstance(result, SpeechTimeoutUserTurnStopStrategy) + + def test_speech_timeout_with_params(self): + """speech_timeout strategy passes through strategy_params.""" + result = create_turn_stop_strategy("speech_timeout", {"user_speech_timeout": 1.2}) + assert isinstance(result, SpeechTimeoutUserTurnStopStrategy) + assert result._user_speech_timeout == 1.2 + + def test_turn_analyzer_strategy(self): + """'turn_analyzer' returns TurnAnalyzerUserTurnStopStrategy.""" + mock_analyzer = MagicMock() + with patch( + "eva.assistant.pipeline.turn_config.LocalSmartTurnAnalyzerV3", + return_value=mock_analyzer, + ): + result = create_turn_stop_strategy("turn_analyzer", {}) + + assert isinstance(result, TurnAnalyzerUserTurnStopStrategy) + + def test_turn_analyzer_without_stop_secs_uses_default_smart_params(self): + """When smart_turn_stop_secs is None, SmartTurnParams is not passed explicitly.""" + mock_analyzer = MagicMock() + with ( + patch( + "eva.assistant.pipeline.turn_config.LocalSmartTurnAnalyzerV3", + return_value=mock_analyzer, + ) as mock_cls, + patch("eva.assistant.pipeline.turn_config.SmartTurnParams") as mock_smart_params, + ): + create_turn_stop_strategy("turn_analyzer", {}, smart_turn_stop_secs=None) + + mock_smart_params.assert_not_called() + mock_cls.assert_called_once_with(params=None) + + def test_turn_analyzer_with_stop_secs(self): + """When smart_turn_stop_secs is provided, SmartTurnParams uses it.""" + from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams + + mock_analyzer = MagicMock() + with patch( + "eva.assistant.pipeline.turn_config.LocalSmartTurnAnalyzerV3", + return_value=mock_analyzer, + ) as mock_cls: + create_turn_stop_strategy("turn_analyzer", {}, smart_turn_stop_secs=0.8) + + call_args = mock_cls.call_args + passed_params = call_args.kwargs["params"] + assert isinstance(passed_params, SmartTurnParams) + assert passed_params.stop_secs == 0.8 + + def test_external_stop_strategy(self): + """'external' returns ExternalUserTurnStopStrategy.""" + result = create_turn_stop_strategy("external", {}) + assert isinstance(result, ExternalUserTurnStopStrategy) + + def test_case_insensitive(self): + """Strategy types are matched case-insensitively.""" + assert isinstance(create_turn_stop_strategy("SPEECH_TIMEOUT", {}), SpeechTimeoutUserTurnStopStrategy) + assert isinstance(create_turn_stop_strategy("External", {}), ExternalUserTurnStopStrategy) + + mock_analyzer = MagicMock() + with patch("eva.assistant.pipeline.turn_config.LocalSmartTurnAnalyzerV3", return_value=mock_analyzer): + assert isinstance(create_turn_stop_strategy("TURN_ANALYZER", {}), TurnAnalyzerUserTurnStopStrategy) + + def test_unsupported_strategy_raises(self): + """Unknown strategy raises ValueError.""" + with pytest.raises(ValueError, match="Unsupported turn stop strategy: magic"): + create_turn_stop_strategy("magic", {}) + + def test_unsupported_strategy_error_lists_supported(self): + """ValueError message lists supported strategies.""" + with pytest.raises(ValueError, match="speech_timeout.*turn_analyzer.*external"): + create_turn_stop_strategy("unknown", {}) diff --git a/tests/unit/models/test_config_models.py b/tests/unit/models/test_config_models.py index e93a0e4a..1e3626b3 100644 --- a/tests/unit/models/test_config_models.py +++ b/tests/unit/models/test_config_models.py @@ -802,6 +802,197 @@ def test_multiple_old_flags_together(self): assert c.record_ids == ["1.2.1"] +class TestTurnStrategyConfig: + """Tests for configurable turn start/stop strategy fields.""" + + def test_pipeline_config_turn_strategy_defaults(self): + """PipelineConfig has expected defaults for turn strategy fields.""" + config = _config(env_vars=_BASE_ENV) + assert config.model.turn_start_strategy == "vad" + assert config.model.turn_start_strategy_params == {} + assert config.model.turn_stop_strategy == "turn_analyzer" + assert config.model.turn_stop_strategy_params == {} + assert config.model.vad == "silero" + assert config.model.vad_params == {} + + def test_pipeline_config_turn_start_strategy_from_env(self): + """EVA_MODEL__TURN_START_STRATEGY sets turn_start_strategy.""" + config = _config(env_vars=_BASE_ENV | {"EVA_MODEL__TURN_START_STRATEGY": "external"}) + assert config.model.turn_start_strategy == "external" + + def test_pipeline_config_turn_stop_strategy_from_env(self): + """EVA_MODEL__TURN_STOP_STRATEGY sets turn_stop_strategy.""" + config = _config(env_vars=_BASE_ENV | {"EVA_MODEL__TURN_STOP_STRATEGY": "speech_timeout"}) + assert config.model.turn_stop_strategy == "speech_timeout" + + def test_pipeline_config_turn_start_strategy_params_from_env(self): + """EVA_MODEL__TURN_START_STRATEGY_PARAMS sets turn_start_strategy_params.""" + params = {"some_param": True} + config = _config(env_vars=_BASE_ENV | {"EVA_MODEL__TURN_START_STRATEGY_PARAMS": json.dumps(params)}) + assert config.model.turn_start_strategy_params == params + + def test_pipeline_config_turn_stop_strategy_params_from_env(self): + """EVA_MODEL__TURN_STOP_STRATEGY_PARAMS sets turn_stop_strategy_params.""" + params = {"user_speech_timeout": 1.5} + config = _config(env_vars=_BASE_ENV | {"EVA_MODEL__TURN_STOP_STRATEGY_PARAMS": json.dumps(params)}) + assert config.model.turn_stop_strategy_params == params + + def test_pipeline_config_vad_from_env(self): + """EVA_MODEL__VAD sets vad.""" + config = _config(env_vars=_BASE_ENV | {"EVA_MODEL__VAD": "silero"}) + assert config.model.vad == "silero" + + def test_pipeline_config_vad_params_from_env(self): + """EVA_MODEL__VAD_PARAMS sets vad_params.""" + params = {"stop_secs": 0.5, "confidence": 0.8} + config = _config(env_vars=_BASE_ENV | {"EVA_MODEL__VAD_PARAMS": json.dumps(params)}) + assert config.model.vad_params == params + + def test_s2s_config_turn_strategy_defaults(self): + """SpeechToSpeechConfig has expected defaults for turn strategy fields.""" + config = _config(env_vars=_S2S_ENV) + assert config.model.turn_start_strategy == "vad" + assert config.model.turn_start_strategy_params == {} + assert config.model.turn_stop_strategy == "turn_analyzer" + assert config.model.turn_stop_strategy_params == {} + assert config.model.vad == "silero" + assert config.model.vad_params == {} + + def test_s2s_config_turn_strategy_from_env(self): + """S2S turn strategies can be overridden via env.""" + config = _config( + env_vars=_S2S_ENV + | { + "EVA_MODEL__TURN_START_STRATEGY": "transcription", + "EVA_MODEL__TURN_STOP_STRATEGY": "external", + } + ) + assert config.model.turn_start_strategy == "transcription" + assert config.model.turn_stop_strategy == "external" + + def test_audio_llm_config_turn_strategy_defaults(self): + """AudioLLMConfig has expected defaults for turn strategy fields.""" + config = _config( + env_vars=_EVA_MODEL_LIST_ENV + | { + "EVA_MODEL__AUDIO_LLM": "vllm", + "EVA_MODEL__AUDIO_LLM_PARAMS": json.dumps( + {"api_key": "k", "model": "ultravox", "base_url": "http://localhost:8000"} + ), + "EVA_MODEL__TTS": "cartesia", + "EVA_MODEL__TTS_PARAMS": json.dumps({"api_key": "k", "model": "sonic"}), + } + ) + assert config.model.turn_start_strategy == "vad" + assert config.model.turn_stop_strategy == "turn_analyzer" + assert config.model.vad == "silero" + assert config.model.vad_params == {} + + +class TestApiKeyRedactionInPipelineModels: + """api_key redaction works for all three pipeline config types.""" + + def test_pipeline_config_stt_tts_params_api_key_redacted(self): + """PipelineConfig redacts api_key in stt_params and tts_params on serialization.""" + config = _config(env_vars=_BASE_ENV) + dumped = config.model.model_dump(mode="json") + assert dumped["stt_params"]["api_key"] == "***" + assert dumped["tts_params"]["api_key"] == "***" + # Non-secret fields survive + assert dumped["stt_params"]["model"] == "nova-2" + assert dumped["tts_params"]["model"] == "sonic" + + def test_pipeline_config_redaction_does_not_mutate(self): + """Serializing PipelineConfig does not mutate live stt_params/tts_params.""" + config = _config(env_vars=_BASE_ENV) + config.model.model_dump(mode="json") + assert config.model.stt_params["api_key"] == "test_key" + assert config.model.tts_params["api_key"] == "test_key" + + def test_s2s_config_s2s_params_api_key_redacted(self): + """SpeechToSpeechConfig redacts api_key in s2s_params on serialization.""" + config = _config( + env_vars=_EVA_MODEL_LIST_ENV + | { + "EVA_MODEL__S2S": "gpt-realtime-mini", + "EVA_MODEL__S2S_PARAMS": json.dumps({"api_key": "secret", "model": "gpt-realtime-mini"}), + } + ) + dumped = config.model.model_dump(mode="json") + assert dumped["s2s_params"]["api_key"] == "***" + # Non-secret fields survive + assert dumped["s2s_params"]["model"] == "gpt-realtime-mini" + + def test_s2s_config_redaction_does_not_mutate(self): + """Serializing SpeechToSpeechConfig does not mutate live s2s_params.""" + config = _config( + env_vars=_EVA_MODEL_LIST_ENV + | { + "EVA_MODEL__S2S": "gpt-realtime-mini", + "EVA_MODEL__S2S_PARAMS": json.dumps({"api_key": "secret", "model": "gpt-realtime-mini"}), + } + ) + config.model.model_dump(mode="json") + assert config.model.s2s_params["api_key"] == "secret" + + def test_audio_llm_config_params_api_key_redacted(self): + """AudioLLMConfig redacts api_key in audio_llm_params and tts_params on serialization.""" + config = _config( + env_vars=_EVA_MODEL_LIST_ENV + | { + "EVA_MODEL__AUDIO_LLM": "vllm", + "EVA_MODEL__AUDIO_LLM_PARAMS": json.dumps( + {"api_key": "secret", "base_url": "http://localhost:8000", "model": "ultravox"} + ), + "EVA_MODEL__TTS": "cartesia", + "EVA_MODEL__TTS_PARAMS": json.dumps({"api_key": "tts_secret", "model": "sonic"}), + } + ) + dumped = config.model.model_dump(mode="json") + assert dumped["audio_llm_params"]["api_key"] == "***" + assert dumped["tts_params"]["api_key"] == "***" + # Non-secret fields survive + assert dumped["audio_llm_params"]["base_url"] == "http://localhost:8000" + assert dumped["tts_params"]["model"] == "sonic" + + def test_non_secret_params_not_affected_by_redaction(self): + """Non-api_key fields in params pass through serialization unchanged.""" + config = _config( + env_vars=_BASE_ENV + | { + "EVA_MODEL__STT_PARAMS": json.dumps({"api_key": "k", "model": "nova-2", "language": "en"}), + "EVA_MODEL__TTS_PARAMS": json.dumps({"api_key": "k", "model": "sonic", "speed": 1.0}), + } + ) + dumped = config.model.model_dump(mode="json") + # api_key is redacted + assert dumped["stt_params"]["api_key"] == "***" + assert dumped["tts_params"]["api_key"] == "***" + # Extra non-secret fields survive unchanged + assert dumped["stt_params"]["language"] == "en" + assert dumped["tts_params"]["speed"] == 1.0 + + +class TestParamAlias: + """Tests for the _param_alias helper used to build run_id suffixes.""" + + def test_alias_takes_priority_over_model(self): + """When alias is present it is returned.""" + config = _config( + env_vars=_BASE_ENV + | { + "EVA_MODEL__STT_PARAMS": json.dumps({"api_key": "k", "model": "nova-2", "alias": "my-stt"}), + } + ) + # run_id suffix uses alias for STT component + assert "my-stt" in config.run_id + + def test_model_used_when_no_alias(self): + """When no alias, model is used for the suffix.""" + config = _config(env_vars=_BASE_ENV) + assert "nova-2" in config.run_id + + class TestSpeechToSpeechConfig: """Tests for SpeechToSpeechConfig discriminated union."""