File size: 7,218 Bytes
7cc3853
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d2c39a2
 
7cc3853
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from collections.abc import Generator
import io
import logging
import time
from typing import Annotated, Literal, Self

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
import numpy as np
from piper.voice import PiperVoice
from pydantic import BaseModel, BeforeValidator, Field, ValidationError, model_validator
import soundfile as sf

from faster_whisper_server.dependencies import PiperModelManagerDependency
from faster_whisper_server.hf_utils import read_piper_voices_config

# Defaults applied to `CreateSpeechRequestBody` fields when the client omits them.
DEFAULT_MODEL = "piper"
# https://platform.openai.com/docs/api-reference/audio/createSpeech#audio-createspeech-response_format
DEFAULT_RESPONSE_FORMAT = "mp3"
DEFAULT_VOICE = "en_US-amy-medium"  # TODO: make configurable
DEFAULT_VOICE_SAMPLE_RATE = 22050  # NOTE: Dependent on the voice

# OpenAI model names; `handle_openai_supported_model_ids` maps these to DEFAULT_MODEL.
# https://platform.openai.com/docs/api-reference/audio/createSpeech#audio-createspeech-model
# https://platform.openai.com/docs/models/tts
OPENAI_SUPPORTED_SPEECH_MODEL = ("tts-1", "tts-1-hd")

# OpenAI voice names; `handle_openai_supported_voices` maps these to DEFAULT_VOICE.
# https://platform.openai.com/docs/api-reference/audio/createSpeech#audio-createspeech-voice
# https://platform.openai.com/docs/guides/text-to-speech/voice-options
OPENAI_SUPPORTED_SPEECH_VOICE_NAMES = ("alloy", "echo", "fable", "onyx", "nova", "shimmer")

# https://platform.openai.com/docs/guides/text-to-speech/supported-output-formats
# NOTE: `ResponseFormat` and `SUPPORTED_RESPONSE_FORMATS` list the same values and must be kept in sync.
type ResponseFormat = Literal["mp3", "flac", "wav", "pcm"]
SUPPORTED_RESPONSE_FORMATS = ("mp3", "flac", "wav", "pcm")
# NOTE(review): the name is misspelled ("UNSUPORTED"); it is kept because other code references it.
UNSUPORTED_RESPONSE_FORMATS = ("opus", "aac")

# Inclusive bounds used to validate the optional `sample_rate` request field.
MIN_SAMPLE_RATE = 8000
MAX_SAMPLE_RATE = 48000


logger = logging.getLogger(__name__)

# Router on which the speech endpoint in this module is registered.
router = APIRouter()


# aip 'Write a function `resample_audio` which would take in RAW PCM 16-bit signed, little-endian audio data represented as bytes (`audio_bytes`) and resample it (either downsample or upsample) from `sample_rate` to `target_sample_rate` using numpy'  # noqa: E501
def resample_audio(audio_bytes: bytes, sample_rate: int, target_sample_rate: int) -> bytes:
    """Resample raw 16-bit signed little-endian PCM audio with linear interpolation.

    The samples are treated as one continuous sequence, so the input is
    expected to be single-channel.

    Args:
        audio_bytes: Raw PCM samples (2 bytes per sample, little-endian).
        sample_rate: Sample rate of ``audio_bytes`` in Hz.
        target_sample_rate: Sample rate of the returned audio in Hz.

    Returns:
        The resampled audio as raw 16-bit signed little-endian PCM bytes.
        The input is returned unchanged when it is empty or when both rates
        are equal.

    Raises:
        ValueError: If ``audio_bytes`` has an odd length (not whole samples).
        ZeroDivisionError: If ``sample_rate`` is 0 and differs from the target.
    """
    # Nothing to do; this also avoids np.interp raising on an empty sample array.
    if sample_rate == target_sample_rate or not audio_bytes:
        return audio_bytes

    # Explicit little-endian dtype so the result does not depend on host byte order.
    samples = np.frombuffer(audio_bytes, dtype="<i2")
    source_length = len(samples)
    # Integer arithmetic: `int(n / rate * target)` can land just below the exact
    # value because of float rounding and silently drop a sample.
    target_length = int(source_length * target_sample_rate // sample_rate)

    positions = np.linspace(0, source_length, target_length, endpoint=False)
    interpolated = np.interp(positions, np.arange(source_length), samples)
    # Round to nearest instead of truncating toward zero when converting back to int16.
    return np.rint(interpolated).astype("<i2").tobytes()


def generate_audio(
    piper_tts: PiperVoice, text: str, *, speed: float = 1.0, sample_rate: int | None = None
) -> Generator[bytes, None, None]:
    """Lazily synthesize ``text`` and yield the audio chunk by chunk.

    Args:
        piper_tts: Voice used for synthesis.
        text: Text to synthesize.
        speed: Playback speed; passed to the voice as ``length_scale = 1 / speed``.
        sample_rate: Desired output sample rate. ``None`` keeps the voice's own
            rate (``piper_tts.config.sample_rate``).

    Yields:
        Audio chunks as produced by ``synthesize_stream_raw``, passed through
        ``resample_audio`` whenever the requested rate differs from the voice's.
    """
    target_rate = sample_rate if sample_rate is not None else piper_tts.config.sample_rate
    start = time.perf_counter()
    chunks = piper_tts.synthesize_stream_raw(text, length_scale=1.0 / speed)
    for chunk in chunks:
        voice_rate = piper_tts.config.sample_rate
        if target_rate == voice_rate:
            yield chunk
        else:
            yield resample_audio(chunk, voice_rate, target_rate)
    # Only reached once the consumer has drained the whole stream.
    logger.info(f"Generated audio for {len(text)} characters in {time.perf_counter() - start}s")


def convert_audio_format(
    audio_bytes: bytes,
    sample_rate: int,
    audio_format: ResponseFormat,
    format: str = "RAW",  # noqa: A002
    channels: int = 1,
    subtype: str = "PCM_16",
    endian: str = "LITTLE",
) -> bytes:
    """Re-encode an in-memory audio buffer into ``audio_format`` using soundfile.

    Args:
        audio_bytes: Encoded input audio.
        sample_rate: Sample rate used both to read the input and to write the output.
        audio_format: Container/codec passed to ``sf.write`` as ``format``.
        format: Input container format passed to ``sf.read``.
        channels: Number of channels in the input.
        subtype: Sample encoding of the input.
        endian: Byte order of the input.

    Returns:
        The audio encoded as ``audio_format``.
    """
    # NOTE: the default dtype is float64. Should something else be used? Would that improve performance?
    read_options = {
        "samplerate": sample_rate,
        "format": format,
        "channels": channels,
        "subtype": subtype,
        "endian": endian,
    }
    samples, _ = sf.read(io.BytesIO(audio_bytes), **read_options)

    encoded = io.BytesIO()
    sf.write(encoded, samples, samplerate=sample_rate, format=audio_format)
    return encoded.getvalue()


def handle_openai_supported_model_ids(model_id: str) -> str:
    """Replace an OpenAI model name with ``DEFAULT_MODEL``; return anything else unchanged.

    A warning is logged whenever a substitution happens.
    """
    if model_id not in OPENAI_SUPPORTED_SPEECH_MODEL:
        return model_id
    logger.warning(f"{model_id} is not a valid model name. Using '{DEFAULT_MODEL}' instead.")
    return DEFAULT_MODEL


# Request field type for the model ID. `handle_openai_supported_model_ids` runs before
# validation, so OpenAI model names are rewritten to DEFAULT_MODEL before the
# `Literal["piper"]` check is applied.
# NOTE: the literal duplicates the value of DEFAULT_MODEL; keep the two in sync.
ModelId = Annotated[
    Literal["piper"],
    BeforeValidator(handle_openai_supported_model_ids),
    Field(
        description=f"The ID of the model. The only supported model is '{DEFAULT_MODEL}'.",
        examples=[DEFAULT_MODEL],
    ),
]


def handle_openai_supported_voices(voice: str) -> str:
    """Replace an OpenAI voice name with ``DEFAULT_VOICE``; return anything else unchanged.

    A warning is logged whenever a substitution happens.
    """
    if voice not in OPENAI_SUPPORTED_SPEECH_VOICE_NAMES:
        return voice
    logger.warning(f"{voice} is not a valid voice name. Using '{DEFAULT_VOICE}' instead.")
    return DEFAULT_VOICE


# Request field type for the voice name. `handle_openai_supported_voices` runs before
# validation and rewrites OpenAI voice names to DEFAULT_VOICE.
Voice = Annotated[str, BeforeValidator(handle_openai_supported_voices)]  # TODO: description and examples


class CreateSpeechRequestBody(BaseModel):
    model: ModelId = DEFAULT_MODEL
    input: str = Field(
        ...,
        description="The text to generate audio for. ",
        examples=[
            "A rainbow is an optical phenomenon caused by refraction, internal reflection and dispersion of light in water droplets resulting in a continuous spectrum of light appearing in the sky. The rainbow takes the form of a multicoloured circular arc. Rainbows caused by sunlight always appear in the section of sky directly opposite the Sun. Rainbows can be caused by many forms of airborne water. These include not only rain, but also mist, spray, and airborne dew."  # noqa: E501
        ],
    )
    voice: Voice = DEFAULT_VOICE
    response_format: ResponseFormat = Field(
        DEFAULT_RESPONSE_FORMAT,
        description=f"The format to audio in. Supported formats are {", ".join(SUPPORTED_RESPONSE_FORMATS)}. {", ".join(UNSUPORTED_RESPONSE_FORMATS)} are not supported",  # noqa: E501
        examples=list(SUPPORTED_RESPONSE_FORMATS),
    )
    # https://platform.openai.com/docs/api-reference/audio/createSpeech#audio-createspeech-voice
    speed: float = Field(1.0, ge=0.25, le=4.0)
    """The speed of the generated audio. Select a value from 0.25 to 4.0. 1.0 is the default."""
    sample_rate: int | None = Field(None, ge=MIN_SAMPLE_RATE, le=MAX_SAMPLE_RATE)
    """Desired sample rate to convert the generated audio to. If not provided, the model's default sample rate will be used."""  # noqa: E501

    # TODO: move into `Voice`
    @model_validator(mode="after")
    def verify_voice_is_valid(self) -> Self:
        valid_voices = read_piper_voices_config()
        if self.voice not in valid_voices:
            raise ValidationError(f"Voice '{self.voice}' is not supported. Supported voices: {valid_voices.keys()}")
        return self


# https://platform.openai.com/docs/api-reference/audio/createSpeech
@router.post("/v1/audio/speech")
def synthesize(
    piper_model_manager: PiperModelManagerDependency,
    body: CreateSpeechRequestBody,
) -> StreamingResponse:
    """Synthesize ``body.input`` with the requested voice and stream the audio back.

    The audio is produced lazily by ``generate_audio``. For every response
    format except ``pcm``, each chunk is converted independently with
    ``convert_audio_format`` before being sent.

    Args:
        piper_model_manager: Manager used to load the Piper voice named by ``body.voice``.
        body: Validated request parameters.

    Returns:
        A streaming response whose media type matches ``body.response_format``.
    """
    # "audio/mp3" is not a registered media type; MP3 audio is "audio/mpeg" (RFC 3003).
    if body.response_format == "mp3":
        media_type = "audio/mpeg"
    else:
        media_type = f"audio/{body.response_format}"

    # NOTE(review): the generators built below are lazy, so synthesis presumably runs
    # while the response is being streamed, i.e. after this `with` block has exited.
    # Confirm that load_model's context manager tolerates the voice being used after exit.
    with piper_model_manager.load_model(body.voice) as piper_tts:
        audio_generator = generate_audio(piper_tts, body.input, speed=body.speed, sample_rate=body.sample_rate)
        if body.response_format != "pcm":
            audio_generator = (
                convert_audio_format(
                    audio_bytes, body.sample_rate or piper_tts.config.sample_rate, body.response_format
                )
                for audio_bytes in audio_generator
            )

        return StreamingResponse(audio_generator, media_type=media_type)