diff --git a/packages/pipeline/transcribe/src/__tests__/gcp-stt-transcriber.test.ts b/packages/pipeline/transcribe/src/__tests__/gcp-stt-transcriber.test.ts new file mode 100644 index 0000000..4dd2776 --- /dev/null +++ b/packages/pipeline/transcribe/src/__tests__/gcp-stt-transcriber.test.ts @@ -0,0 +1,75 @@ +import assert from 'node:assert/strict' +import test from 'node:test' +import { transcribeWavBuffer } from '../providers/gcp-stt-transcriber.js' + +const originalFetch = globalThis.fetch + +test('gcp stt transcriber returns joined transcript text', async () => { + const previousEnv = { ...process.env } + process.env.GCP_PROJECT_ID = 'demo-project' + process.env.GOOGLE_OAUTH_ACCESS_TOKEN = 'token' + + globalThis.fetch = (async (input: RequestInfo | URL) => { + const url = String(input) + if (url.includes('speech.googleapis.com')) { + return new Response( + JSON.stringify({ + results: [ + { alternatives: [{ transcript: 'hello' }] }, + { alternatives: [{ transcript: 'world' }] }, + ], + }), + { status: 200 }, + ) + } + return new Response(JSON.stringify({ access_token: 'metadata-token' }), { status: 200 }) + }) as typeof fetch + + const transcript = await transcribeWavBuffer(Buffer.from([1, 2, 3]), 'segment.wav') + assert.equal(transcript, 'hello world') + + process.env = previousEnv + globalThis.fetch = originalFetch +}) + +test('gcp stt transcriber retries retryable HTTP failures', async () => { + const previousEnv = { ...process.env } + process.env.GCP_PROJECT_ID = 'demo-project' + process.env.GOOGLE_OAUTH_ACCESS_TOKEN = 'token' + process.env.GCP_STT_MAX_RETRIES = '2' + + let attempts = 0 + globalThis.fetch = (async (input: RequestInfo | URL) => { + const url = String(input) + if (!url.includes('speech.googleapis.com')) { + return new Response(JSON.stringify({ access_token: 'metadata-token' }), { status: 200 }) + } + + attempts += 1 + if (attempts < 3) { + return new Response(JSON.stringify({ error: { message: 'busy' } }), { status: 503 }) + } + return new 
Response(JSON.stringify({ results: [{ alternatives: [{ transcript: 'ok' }] }] }), { status: 200 }) + }) as typeof fetch + + const transcript = await transcribeWavBuffer(Buffer.from([1, 2, 3]), 'segment.wav') + assert.equal(transcript, 'ok') + assert.equal(attempts, 3) + + process.env = previousEnv + globalThis.fetch = originalFetch +}) + +test('gcp stt transcriber throws when project id is missing', async () => { + const previousEnv = { ...process.env } + delete process.env.GCP_PROJECT_ID + delete process.env.GOOGLE_CLOUD_PROJECT + delete process.env.GCLOUD_PROJECT + + await assert.rejects( + () => transcribeWavBuffer(Buffer.from([1, 2, 3]), 'segment.wav'), + /Missing GCP project id/, + ) + + process.env = previousEnv +}) diff --git a/packages/pipeline/transcribe/src/__tests__/provider-resolver.test.ts b/packages/pipeline/transcribe/src/__tests__/provider-resolver.test.ts index 8c08814..ee15ed7 100644 --- a/packages/pipeline/transcribe/src/__tests__/provider-resolver.test.ts +++ b/packages/pipeline/transcribe/src/__tests__/provider-resolver.test.ts @@ -11,6 +11,7 @@ test("resolveTranscriptionProvider defaults to whisper_local with tiny.en model" test("resolveTranscriptionProvider supports explicit provider aliases", () => { assert.equal(resolveTranscriptionProvider({ TRANSCRIPTION_PROVIDER: "medasr" }).provider, "medasr") + assert.equal(resolveTranscriptionProvider({ TRANSCRIPTION_PROVIDER: "gcp_stt_v2" }).provider, "gcp_stt_v2") assert.equal(resolveTranscriptionProvider({ TRANSCRIPTION_PROVIDER: "openai" }).provider, "whisper_openai") assert.equal(resolveTranscriptionProvider({ TRANSCRIPTION_PROVIDER: "whisper_openai" }).provider, "whisper_openai") assert.equal(resolveTranscriptionProvider({ TRANSCRIPTION_PROVIDER: "whisper_local" }).provider, "whisper_local") diff --git a/packages/pipeline/transcribe/src/providers/gcp-stt-transcriber.ts b/packages/pipeline/transcribe/src/providers/gcp-stt-transcriber.ts new file mode 100644 index 0000000..6177e7f --- /dev/null 
+++ b/packages/pipeline/transcribe/src/providers/gcp-stt-transcriber.ts @@ -0,0 +1,161 @@ +const DEFAULT_LOCATION = 'us-central1' +const DEFAULT_LANGUAGE_CODE = 'en-US' +const DEFAULT_MODEL = 'chirp_2' +const DEFAULT_TIMEOUT_MS = 20_000 +const DEFAULT_MAX_RETRIES = 2 + +interface GcpRecognizeResponse { + results?: Array<{ + alternatives?: Array<{ transcript?: string }> + }> +} + +interface GcpApiErrorEnvelope { + error?: { + message?: string + status?: string + code?: number + } +} + +function resolvePositiveInteger(rawValue: string | undefined, fallback: number): number { + if (!rawValue) return fallback + const parsed = Number.parseInt(rawValue, 10) + return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback +} + +function shouldRetryStatus(status: number): boolean { + return status === 408 || status === 425 || status === 429 || status >= 500 +} + +async function wait(ms: number): Promise<void> { + await new Promise<void>((resolve) => setTimeout(resolve, ms)) +} + +async function fetchWithTimeout(url: string, init: RequestInit, timeoutMs: number): Promise<Response> { + const controller = new AbortController() + const timer = setTimeout(() => controller.abort(), timeoutMs) + try { + return await fetch(url, { ...init, signal: controller.signal }) + } finally { + clearTimeout(timer) + } +} + +async function getGoogleAccessToken(): Promise<string> { + if (process.env.GOOGLE_OAUTH_ACCESS_TOKEN) { + return process.env.GOOGLE_OAUTH_ACCESS_TOKEN + } + + const metadataUrl = 'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token' + const response = await fetch(metadataUrl, { + headers: { 'Metadata-Flavor': 'Google' }, + }) + + if (!response.ok) { + throw new Error(`Unable to retrieve Google access token from metadata service (${response.status})`) + } + + const payload = (await response.json()) as { access_token?: string } + if (!payload.access_token) { + throw new Error('Metadata service token response did not include access_token') + } + + return 
payload.access_token +} + +function resolveProjectId(): string { + const projectId = process.env.GCP_PROJECT_ID || process.env.GOOGLE_CLOUD_PROJECT || process.env.GCLOUD_PROJECT + if (!projectId) { + throw new Error('Missing GCP project id. Set GCP_PROJECT_ID or GOOGLE_CLOUD_PROJECT for gcp_stt_v2 provider.') + } + return projectId +} + +export async function transcribeWavBuffer(buffer: Buffer, filename: string): Promise<string> { + void filename + const projectId = resolveProjectId() + const location = process.env.GCP_STT_LOCATION || DEFAULT_LOCATION + const languageCode = process.env.GCP_STT_LANGUAGE_CODE || DEFAULT_LANGUAGE_CODE + const model = process.env.GCP_STT_MODEL || DEFAULT_MODEL + const recognizer = process.env.GCP_STT_RECOGNIZER || '_' + + const token = await getGoogleAccessToken() + const endpoint = `https://speech.googleapis.com/v2/projects/${projectId}/locations/${location}/recognizers/${recognizer}:recognize` + const timeoutMs = resolvePositiveInteger(process.env.GCP_STT_TIMEOUT_MS, DEFAULT_TIMEOUT_MS) + const maxRetries = resolvePositiveInteger(process.env.GCP_STT_MAX_RETRIES, DEFAULT_MAX_RETRIES) + + const body = { + config: { + autoDecodingConfig: {}, + languageCodes: [languageCode], + model, + features: { + enableAutomaticPunctuation: true, + }, + }, + content: buffer.toString('base64'), + } + + for (let attempt = 1; attempt <= maxRetries + 1; attempt += 1) { + try { + const response = await fetchWithTimeout( + endpoint, + { + method: 'POST', + headers: { + Authorization: `Bearer ${token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(body), + }, + timeoutMs, + ) + + if (!response.ok) { + const errorText = await response.text() + let message = errorText + try { + const parsed = JSON.parse(errorText) as GcpApiErrorEnvelope + if (parsed.error?.message) { + message = parsed.error.message + } + } catch { + // ignore json parsing for non-json bodies + } + + const retryable = shouldRetryStatus(response.status) && attempt <= maxRetries + 
+ if (retryable) { + await wait(250 * attempt) + continue + } + + throw new Error(`GCP STT recognize failed (${response.status}): ${message}`) + } + + const data = (await response.json()) as GcpRecognizeResponse + const transcript = data.results + ?.flatMap((result) => result.alternatives || []) + .map((alt) => alt.transcript?.trim() || '') + .filter(Boolean) + .join(' ') + .trim() + + return transcript || '' + } catch (error) { + const isTimeout = error instanceof DOMException && error.name === 'AbortError' + const isNetwork = error instanceof TypeError && error.message.toLowerCase().includes('fetch') + const shouldRetry = (isTimeout || isNetwork) && attempt <= maxRetries + if (shouldRetry) { + await wait(250 * attempt) + continue + } + if (isTimeout) { + throw new Error(`GCP STT recognize request timed out after ${timeoutMs}ms`) + } + throw error + } + } + + throw new Error('GCP STT recognize failed after retries') +} diff --git a/packages/pipeline/transcribe/src/providers/provider-resolver.ts b/packages/pipeline/transcribe/src/providers/provider-resolver.ts index c00f741..18b284d 100644 --- a/packages/pipeline/transcribe/src/providers/provider-resolver.ts +++ b/packages/pipeline/transcribe/src/providers/provider-resolver.ts @@ -1,23 +1,27 @@ import { transcribeWavBuffer as transcribeWithMedASR } from "./medasr-transcriber" import { transcribeWavBuffer as transcribeWithWhisperLocal } from "./whisper-local-transcriber" import { transcribeWavBuffer as transcribeWithWhisperOpenAI } from "./whisper-transcriber" +import { transcribeWavBuffer as transcribeWithGcpStt } from "./gcp-stt-transcriber" -export type TranscriptionProvider = "whisper_local" | "whisper_openai" | "medasr" +export type TranscriptionProvider = "whisper_local" | "whisper_openai" | "medasr" | "gcp_stt_v2" export interface ResolvedTranscriptionProvider { provider: TranscriptionProvider model: string } +type EnvLike = Record<string, string | undefined> + const DEFAULT_WHISPER_LOCAL_MODEL = "tiny.en" const 
DEFAULT_WHISPER_OPENAI_MODEL = "whisper-1" const DEFAULT_MEDASR_MODEL = "medasr" +const DEFAULT_GCP_STT_MODEL = "chirp_2" function normalizeProvider(rawProvider: string | undefined): string { return rawProvider?.trim().toLowerCase() || "" } -export function resolveTranscriptionProvider(env: NodeJS.ProcessEnv = process.env): ResolvedTranscriptionProvider { +export function resolveTranscriptionProvider(env: EnvLike = process.env): ResolvedTranscriptionProvider { const provider = normalizeProvider(env.TRANSCRIPTION_PROVIDER) if (provider === "medasr" || provider === "med_asr") { @@ -27,6 +31,13 @@ export function resolveTranscriptionProvider(env: NodeJS.ProcessEn } } + if (provider === "gcp_stt_v2" || provider === "gcp-stt-v2" || provider === "google_stt" || provider === "gcp_stt") { + return { + provider: "gcp_stt_v2", + model: env.GCP_STT_MODEL?.trim() || DEFAULT_GCP_STT_MODEL, + } + } + if (provider === "whisper_openai" || provider === "whisper-openai" || provider === "openai" || provider === "whisper") { return { provider: "whisper_openai", @@ -46,6 +57,8 @@ export async function transcribeWithResolvedProvider( resolved: ResolvedTranscriptionProvider = resolveTranscriptionProvider(), ): Promise<string> { switch (resolved.provider) { + case "gcp_stt_v2": + return transcribeWithGcpStt(buffer, filename) case "medasr": return transcribeWithMedASR(buffer, filename) case "whisper_openai":