From b52ad16715337b13d37514406a16d74e3ac16270 Mon Sep 17 00:00:00 2001 From: Sam Margolis Date: Mon, 2 Mar 2026 08:46:08 -0800 Subject: [PATCH] refactor: make session store operations deterministic with redis support --- .../src/app/api/transcription/final/route.ts | 27 +- .../app/api/transcription/segment/route.ts | 25 +- .../pipeline/assemble/src/session-store.ts | 435 +++++++++++++++--- .../pipeline/eval/src/tests/e2e-basic.test.ts | 8 +- .../eval/src/tests/e2e-real-api.test.ts | 4 +- 5 files changed, 405 insertions(+), 94 deletions(-) diff --git a/apps/web/src/app/api/transcription/final/route.ts b/apps/web/src/app/api/transcription/final/route.ts index e157c97..e54dc90 100644 --- a/apps/web/src/app/api/transcription/final/route.ts +++ b/apps/web/src/app/api/transcription/final/route.ts @@ -1,7 +1,8 @@ import type { NextRequest } from "next/server" import { parseWavHeader, resolveTranscriptionProvider, transcribeWithResolvedProvider } from "@transcription" import { transcriptionSessionStore } from "@transcript-assembly" -import { writeAuditEntry } from "@storage/audit-log" +import { writeServerAuditEntry, logSanitizedServerError } from "@storage/server-audit" +import { requireAuth } from "@/lib/auth" export const runtime = "nodejs" @@ -13,6 +14,11 @@ function jsonError(status: number, code: string, message: string) { } export async function POST(req: NextRequest) { + const auth = await requireAuth(req) + if (!auth) { + return jsonError(401, "unauthorized", "Authentication required") + } + try { const formData = await req.formData() const sessionId = formData.get("session_id") @@ -22,7 +28,7 @@ export async function POST(req: NextRequest) { return jsonError(400, "validation_error", "Missing session_id or file") } - transcriptionSessionStore.setStatus(sessionId, "finalizing") + await transcriptionSessionStore.setStatus(sessionId, "finalizing") const arrayBuffer = await file.arrayBuffer() let wavInfo @@ -45,11 +51,13 @@ export async function POST(req: NextRequest) { resolvedProvider, ) const latencyMs = Date.now() - startedAtMs - transcriptionSessionStore.setFinalTranscript(sessionId, transcript) + await transcriptionSessionStore.setFinalTranscript(sessionId, transcript) // Audit log: final transcription completed - await writeAuditEntry({ + await writeServerAuditEntry({ event_type: "transcription.completed", + org_id: auth.orgId, + user_id: auth.userId, resource_id: sessionId, success: true, metadata: { @@ -65,19 +73,22 @@ export async function POST(req: NextRequest) { headers: { "Content-Type": "application/json" }, }) } catch (error) { - console.error("Final transcription failed", error) + logSanitizedServerError("transcription.final", error) const resolvedProvider = resolveTranscriptionProvider() - transcriptionSessionStore.emitError( + await transcriptionSessionStore.emitError( sessionId, "api_error", error instanceof Error ? error.message : "Transcription API failure", ) // Audit log: final transcription failed - await writeAuditEntry({ + await writeServerAuditEntry({ event_type: "transcription.failed", + org_id: auth.orgId, + user_id: auth.userId, resource_id: sessionId, success: false, + error_code: "transcription_failed", error_message: error instanceof Error ? error.message : "Transcription API failed", metadata: { transcription_provider: resolvedProvider.provider, @@ -88,7 +99,7 @@ export async function POST(req: NextRequest) { return jsonError(502, "api_error", "Transcription API failed") } } catch (error) { - console.error("Final recording ingestion failed", error) + logSanitizedServerError("transcription.final.ingest", error) return jsonError(500, "storage_error", "Failed to process final recording") } } diff --git a/apps/web/src/app/api/transcription/segment/route.ts b/apps/web/src/app/api/transcription/segment/route.ts index 483713b..18b964a 100644 --- a/apps/web/src/app/api/transcription/segment/route.ts +++ b/apps/web/src/app/api/transcription/segment/route.ts @@ -1,7 +1,8 @@ import type { NextRequest } from "next/server" import { parseWavHeader, resolveTranscriptionProvider, transcribeWithResolvedProvider } from "@transcription" import { transcriptionSessionStore } from "@transcript-assembly" -import { writeAuditEntry } from "@storage/audit-log" +import { writeServerAuditEntry, logSanitizedServerError } from "@storage/server-audit" +import { requireAuth } from "@/lib/auth" export const runtime = "nodejs" @@ -13,6 +14,11 @@ function jsonError(status: number, code: string, message: string) { } export async function POST(req: NextRequest) { + const auth = await requireAuth(req) + if (!auth) { + return jsonError(401, "unauthorized", "Authentication required") + } + try { const formData = await req.formData() const sessionId = formData.get("session_id") @@ -56,7 +62,7 @@ export async function POST(req: NextRequest) { const startedAtMs = Date.now() const transcript = await transcribeWithResolvedProvider(Buffer.from(arrayBuffer), `segment-${seqNo}.wav`, resolvedProvider) const latencyMs = Date.now() - startedAtMs - transcriptionSessionStore.addSegment(sessionId, { + await transcriptionSessionStore.addSegment(sessionId, { seqNo, startMs, endMs, @@ -66,8 +72,10 @@ export async function POST(req: NextRequest) { }) // Audit log: segment transcribed successfully - await writeAuditEntry({ + await writeServerAuditEntry({ event_type: "transcription.segment_uploaded", + org_id: auth.orgId, + user_id: auth.userId, resource_id: sessionId, success: true, metadata: { @@ -83,19 +91,22 @@ export async function POST(req: NextRequest) { headers: { "Content-Type": "application/json" }, }) } catch (error) { - console.error("Segment transcription failed", error) + logSanitizedServerError("transcription.segment", error) const resolvedProvider = resolveTranscriptionProvider() - transcriptionSessionStore.emitError( + await transcriptionSessionStore.emitError( sessionId, "api_error", error instanceof Error ? error.message : "Transcription API failure", ) // Audit log: segment transcription failed - await writeAuditEntry({ + await writeServerAuditEntry({ event_type: "transcription.failed", + org_id: auth.orgId, + user_id: auth.userId, resource_id: sessionId, success: false, + error_code: "transcription_failed", error_message: error instanceof Error ? error.message : "Transcription API failed", metadata: { seq_no: seqNo, @@ -107,7 +118,7 @@ export async function POST(req: NextRequest) { return jsonError(502, "api_error", "Transcription API failed") } } catch (error) { - console.error("Segment ingestion failed", error) + logSanitizedServerError("transcription.segment.ingest", error) return jsonError(500, "storage_error", "Failed to process audio segment") } } diff --git a/packages/pipeline/assemble/src/session-store.ts b/packages/pipeline/assemble/src/session-store.ts index 586173f..983feaa 100644 --- a/packages/pipeline/assemble/src/session-store.ts +++ b/packages/pipeline/assemble/src/session-store.ts @@ -1,4 +1,7 @@ -type TranscriptionStatus = "recording" | "finalizing" | "completed" | "error" +import net from 'node:net' +import tls from 'node:tls' + +type TranscriptionStatus = 'recording' | 'finalizing' | 'completed' | 'error' export interface SegmentMetadata { seqNo: number @@ -10,10 +13,18 @@ export interface SegmentMetadata { } export interface TranscriptionEvent { - event: "segment" | "final" | "error" | "status" + event: 'segment' | 'final' | 'error' | 'status' data: Record } +export interface SessionStore { + subscribe(sessionId: string, listener: (event: TranscriptionEvent) => void): () => void + addSegment(sessionId: string, segment: Omit & { transcript: string }): Promise + setStatus(sessionId: string, status: TranscriptionStatus): Promise + setFinalTranscript(sessionId: string, transcript: string): Promise + emitError(sessionId: string, code: string, message: string): Promise +} + interface SessionRecord { id: string segments: Map @@ -23,11 +34,19 @@ interface SessionRecord { listeners: Set<(event: TranscriptionEvent) => void> } +interface PersistedSession { + id: string + segments: SegmentMetadata[] + stitchedText: string + status: TranscriptionStatus + finalTranscript?: string +} + function normalizeToken(token: string): string { return token .toLowerCase() - .replace(/^[^A-Za-z0-9]+/g, "") - .replace(/[^A-Za-z0-9]+$/g, "") + .replace(/^[^A-Za-z0-9]+/g, '') + .replace(/[^A-Za-z0-9]+$/g, '') } function trimOverlapText(previousText: string, nextText: string): string { @@ -45,22 +64,187 @@ function trimOverlapText(previousText: string, nextText: string): string { const nextSlice = nextTokens.slice(0, overlap).map(normalizeToken) const matches = prevSlice.every((token, idx) => token && token === nextSlice[idx]) if (matches) { - return nextTokens.slice(overlap).join(" ") + return nextTokens.slice(overlap).join(' ') } } return nextText } -class TranscriptionSessionStore { +type RedisValue = string | number | null | RedisValue[] + +interface RedisConfig { + host: string + port: number + password?: string + tls: boolean +} + +function getRedisConfig(env: NodeJS.ProcessEnv = process.env): RedisConfig | null { + if (String(env.SESSION_STORE_BACKEND || '').toLowerCase() !== 'redis') { + return null + } + + const host = env.REDIS_HOST + if (!host) return null + + return { + host, + port: Number(env.REDIS_PORT || 6379), + password: env.REDIS_PASSWORD, + tls: String(env.REDIS_TLS || '').toLowerCase() === 'true', + } +} + +function encodeRespCommand(parts: string[]): string { + const chunks = [`*${parts.length}\r\n`] + for (const part of parts) { + const bytes = Buffer.byteLength(part) + chunks.push(`$${bytes}\r\n${part}\r\n`) + } + return chunks.join('') +} + +function parseRespValue(buffer: Buffer, offset = 0): { value: RedisValue; nextOffset: number } | null { + if (offset >= buffer.length) return null + const prefix = String.fromCharCode(buffer[offset]) + const lineEnd = buffer.indexOf('\r\n', offset) + if (lineEnd === -1) return null + + if (prefix === '+' || prefix === '-' || prefix === ':') { + const data = buffer.toString('utf8', offset + 1, lineEnd) + if (prefix === '+') return { value: data, nextOffset: lineEnd + 2 } + if (prefix === '-') throw new Error(`Redis error: ${data}`) + return { value: Number(data), nextOffset: lineEnd + 2 } + } + + if (prefix === '$') { + const len = Number(buffer.toString('utf8', offset + 1, lineEnd)) + if (len === -1) return { value: null, nextOffset: lineEnd + 2 } + const start = lineEnd + 2 + const end = start + len + if (buffer.length < end + 2) return null + const value = buffer.toString('utf8', start, end) + return { value, nextOffset: end + 2 } + } + + if (prefix === '*') { + const count = Number(buffer.toString('utf8', offset + 1, lineEnd)) + let next = lineEnd + 2 + const items: RedisValue[] = [] + for (let i = 0; i < count; i++) { + const parsed = parseRespValue(buffer, next) + if (!parsed) return null + items.push(parsed.value) + next = parsed.nextOffset + } + return { value: items, nextOffset: next } + } + + throw new Error('Unsupported Redis RESP type') +} + +function concatUint8Arrays(chunks: Uint8Array[]): Uint8Array { + const total = chunks.reduce((sum, chunk) => sum + chunk.byteLength, 0) + const out = new Uint8Array(total) + let offset = 0 + for (const chunk of chunks) { + out.set(chunk, offset) + offset += chunk.byteLength + } + return out +} + +class LightweightRedisClient { + constructor(private readonly config: RedisConfig) {} + + private async execute(parts: string[]): Promise { + const payload = encodeRespCommand(parts) + + return await new Promise((resolve, reject) => { + const socket = this.config.tls + ? tls.connect({ host: this.config.host, port: this.config.port }) + : net.createConnection({ host: this.config.host, port: this.config.port }) + + const chunks: Uint8Array[] = [] + + socket.on('error', (error) => { + reject(error) + }) + + socket.on('data', (chunk) => { + chunks.push(new Uint8Array(chunk)) + const combined = Buffer.from(concatUint8Arrays(chunks)) + try { + const parsed = parseRespValue(combined) + if (!parsed) return + resolve(parsed.value) + socket.end() + } catch (error) { + reject(error) + socket.end() + } + }) + + socket.on('connect', () => { + if (this.config.password) { + const authCommand = encodeRespCommand(['AUTH', this.config.password]) + socket.write(authCommand) + } + socket.write(payload) + }) + }) + } + + async get(key: string): Promise { + const value = await this.execute(['GET', key]) + return typeof value === 'string' ? value : null + } + + async setEx(key: string, ttlSeconds: number, value: string): Promise { + await this.execute(['SETEX', key, String(ttlSeconds), value]) + } + + async rpush(key: string, value: string): Promise { + await this.execute(['RPUSH', key, value]) + } + + async lrange(key: string, start: number, stop: number): Promise { + const value = await this.execute(['LRANGE', key, String(start), String(stop)]) + if (!Array.isArray(value)) return [] + return value.filter((item): item is string => typeof item === 'string') + } + + async expire(key: string, ttlSeconds: number): Promise { + await this.execute(['EXPIRE', key, String(ttlSeconds)]) + } +} + +class TranscriptionSessionStore implements SessionStore { private sessions: Map = new Map() - private cleanupInterval: NodeJS.Timeout | null = null - private readonly SESSION_TIMEOUT_MS = 30 * 60 * 1000 // 30 minutes + private readonly SESSION_TIMEOUT_MS = 30 * 60 * 1000 + private readonly SESSION_TTL_SECONDS = 60 * 60 private sessionTimestamps: Map = new Map() + private cleanupInterval: NodeJS.Timeout | null = null + private redisClient: LightweightRedisClient | null = null constructor() { - // Run cleanup every 5 minutes - this.cleanupInterval = setInterval(() => this.cleanupOldSessions(), 5 * 60 * 1000) + const redisConfig = getRedisConfig() + if (redisConfig) { + this.redisClient = new LightweightRedisClient(redisConfig) + } + + if (!this.redisClient) { + this.cleanupInterval = setInterval(() => this.cleanupOldSessions(), 5 * 60 * 1000) + } + } + + private sessionKey(sessionId: string): string { + return `openscribe:session:${sessionId}` + } + + private eventKey(sessionId: string): string { + return `openscribe:session:${sessionId}:events` } private cleanupOldSessions() { @@ -69,11 +253,6 @@ class TranscriptionSessionStore { for (const [sessionId, timestamp] of this.sessionTimestamps.entries()) { const session = this.sessions.get(sessionId) - - // Clean up if: - // 1. Session is older than timeout - // 2. Session is completed or error - // 3. No active listeners if ( now - timestamp > this.SESSION_TIMEOUT_MS && session && @@ -85,79 +264,156 @@ class TranscriptionSessionStore { } for (const sessionId of sessionsToDelete) { - console.log(`[SessionStore] Cleaning up old session: ${sessionId}`) this.sessions.delete(sessionId) this.sessionTimestamps.delete(sessionId) } - - if (sessionsToDelete.length > 0) { - console.log(`[SessionStore] Cleaned up ${sessionsToDelete.length} sessions. Active sessions: ${this.sessions.size}`) - } } - getSession(sessionId: string): SessionRecord { + private getSession(sessionId: string): SessionRecord { let session = this.sessions.get(sessionId) if (!session) { session = { id: sessionId, segments: new Map(), - stitchedText: "", - status: "recording", + stitchedText: '', + status: 'recording', listeners: new Set(), } this.sessions.set(sessionId, session) this.sessionTimestamps.set(sessionId, Date.now()) - console.log(`[SessionStore] Created new session: ${sessionId}. Total sessions: ${this.sessions.size}`) } return session } - subscribe(sessionId: string, listener: (event: TranscriptionEvent) => void): () => void { - const session = this.getSession(sessionId) - session.listeners.add(listener) - console.log(`[SessionStore] Subscriber added to session ${sessionId}. Total listeners: ${session.listeners.size}`) - - // Emit the current status immediately - listener({ - event: "status", - data: { - session_id: sessionId, - status: session.status, - stitched_text: session.stitchedText, - final_transcript: session.finalTranscript ?? null, - }, + private emit(session: SessionRecord, event: TranscriptionEvent) { + session.listeners.forEach((listener) => { + try { + listener(event) + } catch { + // Ignore listener failures. + } }) + } - return () => { - session.listeners.delete(listener) - console.log(`[SessionStore] Subscriber removed from session ${sessionId}. Remaining listeners: ${session.listeners.size}`) + private toPersisted(session: SessionRecord): PersistedSession { + return { + id: session.id, + segments: Array.from(session.segments.values()).sort((a, b) => a.seqNo - b.seqNo), + stitchedText: session.stitchedText, + status: session.status, + finalTranscript: session.finalTranscript, } } - private emit(session: SessionRecord, event: TranscriptionEvent) { - session.listeners.forEach((listener) => { + private fromPersisted(data: PersistedSession): SessionRecord { + return { + id: data.id, + segments: new Map(data.segments.map((segment) => [segment.seqNo, segment])), + stitchedText: data.stitchedText, + status: data.status, + finalTranscript: data.finalTranscript, + listeners: new Set(), + } + } + + private async saveRedisSession(session: SessionRecord): Promise { + if (!this.redisClient) return + const payload = JSON.stringify(this.toPersisted(session)) + await this.redisClient.setEx(this.sessionKey(session.id), this.SESSION_TTL_SECONDS, payload) + } + + private async loadRedisSession(sessionId: string): Promise { + if (!this.redisClient) return null + const payload = await this.redisClient.get(this.sessionKey(sessionId)) + if (!payload) return null + const parsed = JSON.parse(payload) as PersistedSession + return this.fromPersisted(parsed) + } + + private async pushRedisEvent(sessionId: string, event: TranscriptionEvent): Promise { + if (!this.redisClient) return + await this.redisClient.rpush(this.eventKey(sessionId), JSON.stringify(event)) + await this.redisClient.expire(this.eventKey(sessionId), this.SESSION_TTL_SECONDS) + } + + subscribe(sessionId: string, listener: (event: TranscriptionEvent) => void): () => void { + if (!this.redisClient) { + const session = this.getSession(sessionId) + session.listeners.add(listener) + + listener({ + event: 'status', + data: { + session_id: sessionId, + status: session.status, + stitched_text: session.stitchedText, + final_transcript: session.finalTranscript ?? null, + }, + }) + + return () => { + session.listeners.delete(listener) + } + } + + let closed = false + let sentCount = 0 + + const emitInitial = async () => { + const session = await this.loadRedisSession(sessionId) + listener({ + event: 'status', + data: { + session_id: sessionId, + status: session?.status ?? 'recording', + stitched_text: session?.stitchedText ?? '', + final_transcript: session?.finalTranscript ?? null, + }, + }) + + const existingEvents = await this.redisClient!.lrange(this.eventKey(sessionId), 0, -1) + sentCount = existingEvents.length + } + + void emitInitial() + + const interval = setInterval(async () => { + if (closed) return try { - listener(event) - } catch (error) { - console.error("Failed to notify SSE listener", error) + const events = await this.redisClient!.lrange(this.eventKey(sessionId), sentCount, -1) + if (events.length === 0) return + for (const payload of events) { + const event = JSON.parse(payload) as TranscriptionEvent + listener(event) + } + sentCount += events.length + } catch { + // Ignore transient polling errors. } - }) + }, 1000) + + return () => { + closed = true + clearInterval(interval) + } } - addSegment(sessionId: string, segment: Omit & { transcript: string }) { - const session = this.getSession(sessionId) - session.segments.set(segment.seqNo, segment) + async addSegment(sessionId: string, segment: Omit & { transcript: string }): Promise { + const existing = this.redisClient ? await this.loadRedisSession(sessionId) : null + const session = existing || this.getSession(sessionId) + session.segments.set(segment.seqNo, segment) const orderedSegments = Array.from(session.segments.values()).sort((a, b) => a.seqNo - b.seqNo) - let stitched = "" + + let stitched = '' for (const seg of orderedSegments) { const text = trimOverlapText(stitched, seg.transcript) stitched = stitched ? `${stitched} ${text}` : text } session.stitchedText = stitched.trim() - this.emit(session, { - event: "segment", + const event: TranscriptionEvent = { + event: 'segment', data: { session_id: sessionId, seq_no: segment.seqNo, @@ -168,49 +424,82 @@ class TranscriptionSessionStore { transcript: segment.transcript, stitched_text: session.stitchedText, }, - }) + } + + if (this.redisClient) { + await this.saveRedisSession(session) + await this.pushRedisEvent(sessionId, event) + } else { + this.emit(session, event) + } } - setStatus(sessionId: string, status: TranscriptionStatus) { - const session = this.getSession(sessionId) + async setStatus(sessionId: string, status: TranscriptionStatus): Promise { + const existing = this.redisClient ? await this.loadRedisSession(sessionId) : null + const session = existing || this.getSession(sessionId) session.status = status - this.emit(session, { - event: "status", + + const event: TranscriptionEvent = { + event: 'status', data: { session_id: sessionId, status, stitched_text: session.stitchedText, final_transcript: session.finalTranscript ?? null, }, - }) + } + + if (this.redisClient) { + await this.saveRedisSession(session) + await this.pushRedisEvent(sessionId, event) + } else { + this.emit(session, event) + } } - setFinalTranscript(sessionId: string, transcript: string) { - const session = this.getSession(sessionId) + async setFinalTranscript(sessionId: string, transcript: string): Promise { + const existing = this.redisClient ? await this.loadRedisSession(sessionId) : null + const session = existing || this.getSession(sessionId) session.finalTranscript = transcript - session.status = "completed" - this.sessionTimestamps.set(sessionId, Date.now()) // Update timestamp on completion - console.log(`[SessionStore] Session ${sessionId} completed with ${transcript.length} chars`) - this.emit(session, { - event: "final", + session.status = 'completed' + this.sessionTimestamps.set(sessionId, Date.now()) + + const event: TranscriptionEvent = { + event: 'final', data: { session_id: sessionId, final_transcript: transcript, }, - }) + } + + if (this.redisClient) { + await this.saveRedisSession(session) + await this.pushRedisEvent(sessionId, event) + } else { + this.emit(session, event) + } } - emitError(sessionId: string, code: string, message: string) { - const session = this.getSession(sessionId) - session.status = "error" - this.emit(session, { - event: "error", + async emitError(sessionId: string, code: string, message: string): Promise { + const existing = this.redisClient ? await this.loadRedisSession(sessionId) : null + const session = existing || this.getSession(sessionId) + session.status = 'error' + + const event: TranscriptionEvent = { + event: 'error', data: { session_id: sessionId, code, message, }, - }) + } + + if (this.redisClient) { + await this.saveRedisSession(session) + await this.pushRedisEvent(sessionId, event) + } else { + this.emit(session, event) + } } } diff --git a/packages/pipeline/eval/src/tests/e2e-basic.test.ts b/packages/pipeline/eval/src/tests/e2e-basic.test.ts index 1e2f4e9..6bd124f 100644 --- a/packages/pipeline/eval/src/tests/e2e-basic.test.ts +++ b/packages/pipeline/eval/src/tests/e2e-basic.test.ts @@ -236,13 +236,13 @@ test("Phase 5: Session store - assemble transcript segments", { timeout: 5000 }, for (const segment of segments) { console.log(` Adding segment ${segment.seqNo}: "${segment.transcript}"`) - transcriptionSessionStore.addSegment(sessionId, segment) + await transcriptionSessionStore.addSegment(sessionId, segment) } console.log(`✅ Added ${segments.length} segments`) console.log("⏳ Setting final transcript...") const finalText = segments.map(s => s.transcript).join(" ") - transcriptionSessionStore.setFinalTranscript(sessionId, finalText) + await transcriptionSessionStore.setFinalTranscript(sessionId, finalText) console.log(`✅ Set final transcript: "${finalText}"`) unsubscribe() @@ -363,7 +363,7 @@ test("Phase 6: Complete pipeline - audio to final transcript", { timeout: 10000 }) for (const transcript of transcripts) { - transcriptionSessionStore.addSegment(sessionId, { + await transcriptionSessionStore.addSegment(sessionId, { seqNo: transcript.seqNo, startMs: transcript.startMs, endMs: transcript.endMs, @@ -374,7 +374,7 @@ test("Phase 6: Complete pipeline - audio to final transcript", { timeout: 10000 } const combinedText = transcripts.map(t => t.text).join(" ") - transcriptionSessionStore.setFinalTranscript(sessionId, combinedText) + await transcriptionSessionStore.setFinalTranscript(sessionId, combinedText) unsubscribe() diff --git a/packages/pipeline/eval/src/tests/e2e-real-api.test.ts b/packages/pipeline/eval/src/tests/e2e-real-api.test.ts index eb8052e..7c162ec 100644 --- a/packages/pipeline/eval/src/tests/e2e-real-api.test.ts +++ b/packages/pipeline/eval/src/tests/e2e-real-api.test.ts @@ -207,7 +207,7 @@ test("REAL E2E: Complete pipeline with actual Whisper API", { timeout: 60_000 }, console.log(`⏳ Adding ${transcripts.length} segments to session...`) for (const transcript of transcripts) { - transcriptionSessionStore.addSegment(sessionId, { + await transcriptionSessionStore.addSegment(sessionId, { seqNo: transcript.seqNo, startMs: transcript.startMs, endMs: transcript.endMs, @@ -219,7 +219,7 @@ test("REAL E2E: Complete pipeline with actual Whisper API", { timeout: 60_000 }, const combinedText = transcripts.map(t => t.text).join(" ").trim() console.log(`⏳ Setting final transcript...`) - transcriptionSessionStore.setFinalTranscript(sessionId, combinedText) + await transcriptionSessionStore.setFinalTranscript(sessionId, combinedText) unsubscribe()