const { Room, AudioFrame, AudioSource, LocalAudioTrack, TrackPublishOptions, RoomEvent, TrackKind, AudioStream } = require('@livekit/rtc-node');
const getToken = require('./token');
const fs = require('fs');
const { readFileSync } = require('node:fs');
require('dotenv').config();
const { spawn } = require('child_process');
const path = require('path');
const LIVEKIT_URL = process.env.LIVEKIT_URL;
// Maps language code -> published AudioSource for that language's outgoing track.
const sources = {};
// Maps language code -> per-language publish queue state (used by enqueueFrame).
const sourceQueues = {};
// Languages requested by participants currently known to the bot.
const activeLanguages = new Set();
// Path where the active language list is persisted.
// NOTE(review): "langauge" is a typo, kept as-is because the name is referenced throughout this file.
const langaugeFile = "languages.json"
// If we want it to persist we can add this back
// try {
// const existing = JSON.parse(fs.readFileSync("languages.json", "utf-8"));
// existing.forEach(lang => activeLanguages.add(lang));
// } catch (e) {
// // File may not exist yet
// }
/**
 * Publishes missing language tracks for all currently active languages
 * and persists the language list to disk.
 *
 * @param {Room} room - Connected LiveKit room instance.
 * @returns {Promise<void>}
 */
async function addLanguages(room) {
  // Persist the current language set to JSON so it survives this call.
  fs.writeFileSync(langaugeFile, JSON.stringify([...activeLanguages], null, 2));
  // Publish a track for every active language that doesn't have one yet,
  // all in parallel, recording each resulting AudioSource by language.
  const publishJobs = [...activeLanguages]
    .filter((language) => !Object.hasOwn(sources, language))
    .map(async (language) => {
      sources[language] = await publishAudioTrack({ room, language });
    });
  await Promise.all(publishJobs);
  console.log("Updated languages:", [...activeLanguages]);
}
/**
 * Creates and publishes an outgoing translation audio track for a language.
 *
 * @param {{ room: Room, language: string }} params - Publish parameters.
 * @returns {Promise<AudioSource>} Audio source used to push translated frames.
 */
async function publishAudioTrack({ room, language }) {
  // 16 kHz mono matches the raw PCM format the Python translator emits.
  const audioSource = new AudioSource(16000, 1);
  const track = LocalAudioTrack.createAudioTrack(language, audioSource);
  const options = new TrackPublishOptions();
  // Idiom fix: sequential awaits instead of mixing `await` with `.then()`.
  const publication = await room.localParticipant.publishTrack(track, options);
  // NOTE(review): original behavior preserved — this waits for a subscriber
  // before returning; confirm waitForSubscription cannot hang indefinitely.
  await publication.waitForSubscription();
  return audioSource;
}
/**
 * Normalizes an input frame and forwards it to a published audio source.
 * Downmixes multi-channel input to mono (channel average) and linearly
 * resamples when the input rate differs from the target rate.
 *
 * NOTE(review): outChannels > 1 is accepted but no mono-to-multi upmix is
 * performed; samples are just reinterpreted as interleaved frames.
 *
 * @param {AudioFrame} inFrame - Incoming frame from translator output.
 * @param {AudioSource} source - Destination LiveKit audio source.
 * @param {number} [outSampleRate=16000] - Target sample rate.
 * @param {number} [outChannels=1] - Target channel count.
 * @returns {Promise<void>}
 */
async function pipeIncomingToSource(inFrame, source, outSampleRate = 16000, outChannels = 1) {
  const inputPcm = new Int16Array(inFrame.data);
  const inRate = inFrame.sampleRate;
  const inChannels = inFrame.channels;
  // 1. Downmix to mono. Generalized from the original stereo-only branch:
  // averages any number of interleaved channels (identical result for 2ch,
  // and floor() avoids a RangeError on odd-length stereo buffers).
  let mono = inputPcm;
  if (inChannels > 1) {
    const frameCount = Math.floor(inputPcm.length / inChannels);
    mono = new Int16Array(frameCount);
    for (let f = 0; f < frameCount; f++) {
      let sum = 0;
      for (let c = 0; c < inChannels; c++) {
        sum += inputPcm[f * inChannels + c];
      }
      mono[f] = (sum / inChannels) | 0;
    }
  }
  // 2. Linear-interpolation resample to match the outgoing sample rate.
  let pcm = mono;
  if (inRate !== outSampleRate) {
    const ratio = outSampleRate / inRate;
    const newLength = Math.round(mono.length * ratio);
    const resampled = new Int16Array(newLength);
    for (let i = 0; i < newLength; i++) {
      const t = i / ratio;
      const i0 = Math.floor(t);
      const i1 = Math.min(i0 + 1, mono.length - 1);
      const frac = t - i0;
      resampled[i] = (mono[i0] * (1 - frac) + mono[i1] * frac) | 0;
    }
    pcm = resampled;
  }
  // 3. Build the outgoing frame in the exact format of the published track.
  // Bug fix: samplesPerChannel previously ignored outChannels (it was
  // hard-coded to pcm.length, which is only correct for mono output).
  const outFrame = new AudioFrame(
    pcm,
    outSampleRate,
    outChannels,
    Math.trunc(pcm.length / outChannels)
  );
  // 4. Push the normalized frame to the LiveKit track.
  await source.captureFrame(outFrame);
}
// temp prerecorded for testing
/**
 * Streams a prerecorded WAV file to a language track, one second at a time.
 *
 * Assumes a canonical 44-byte RIFF/WAVE header with s16le PCM data — TODO
 * confirm the fixture files are written that way (no extra chunks).
 *
 * @param {string} language - Target language code used in filename lookup.
 * @param {AudioSource} source - Destination LiveKit audio source.
 * @returns {Promise<void>}
 */
async function pipePreRecordedToSource(language, source) {
  console.log('should be piping something that doesnt exist', language);
  const filePath = `../prerecordedAudio/${language}.wav`;
  const sample = readFileSync(filePath);
  // Header fields at their canonical fixed offsets.
  const channels = sample.readUInt16LE(22);
  const sampleRate = sample.readUInt32LE(24);
  const dataSize = sample.readUInt32LE(40) / 2; // data chunk size in int16 samples
  // Bug fixes: the original viewed sample.buffer from offset 0, which
  // (a) ignored the Buffer's byteOffset (Node pools small Buffers into a
  // shared ArrayBuffer, so offset 0 is not the file start) and (b) then
  // used the 44-BYTE header offset as an Int16 SAMPLE index, starting in
  // the wrong place and mis-terminating the loop. Copy the data chunk into
  // its own (always aligned) buffer and index it in samples from 0.
  const HEADER_BYTES = 44;
  const dataStart = sample.byteOffset + HEADER_BYTES;
  const buffer = new Int16Array(sample.buffer.slice(dataStart, dataStart + dataSize * 2));
  let written = 0; // samples already sent
  const FRAME_DURATION = 1; // write 1s of audio at a time
  // One second of interleaved audio is sampleRate frames * channels samples.
  const numSamples = sampleRate * FRAME_DURATION * channels;
  while (written < dataSize) {
    const available = dataSize - written;
    const frameSize = Math.min(numSamples, available);
    const frame = new AudioFrame(
      buffer.slice(written, written + frameSize),
      sampleRate,
      channels,
      Math.trunc(frameSize / channels),
    );
    await source.captureFrame(frame);
    console.log('just sending a couple packets back for ', language);
    written += frameSize;
  }
}
/**
 * Serializes frame publishing per language and drops frames if the
 * per-language queue grows too large.
 *
 * Bug fixes vs. the original:
 * - Queue depth was tracked as a `length` property stuck onto the Promise
 *   itself, but every `.then()` returns a NEW promise without that
 *   property, so the counter became NaN after the first frame and the
 *   50-frame drop limit never fired. Depth now lives in a plain counter
 *   kept alongside the chain.
 * - When no source existed yet, the frame was still piped to an
 *   `undefined` source (the early return was commented out), throwing
 *   inside the chain. The source is now re-resolved when the queued task
 *   actually runs, so frames arriving while the track is being published
 *   are used once it exists (and quietly dropped otherwise).
 *
 * @param {string} language - Target language key for routing.
 * @param {AudioFrame} frame - Frame to enqueue for publish.
 * @param {Room} room - Room used to lazily publish a missing track.
 * @returns {void}
 */
function enqueueFrame(language, frame, room) {
  if (!sources[language]) {
    console.warn("No source for ", language, " Adding source");
    // This is more of a temporary fix until we can figure out why sometimes it doesn't create a track, but does translation.
    activeLanguages.add(language);
    // fire-and-forget, but never leave the promise floating unhandled
    addLanguages(room).catch((e) => console.error("addLanguages error", e));
  }
  // initialize per-language queue state: a promise chain plus a depth counter
  let queue = sourceQueues[language];
  if (!queue) {
    queue = { chain: Promise.resolve(), pending: 0 };
    sourceQueues[language] = queue;
  }
  if (queue.pending >= 50) {
    console.warn("Queue too large, dropping frame");
    return;
  }
  queue.pending++;
  // append work to the chain so frames for one language publish in order
  queue.chain = queue.chain
    .then(() => {
      // resolve at run time — the track may have been published meanwhile
      const source = sources[language];
      if (!source) return; // still no track: drop this frame
      return pipeIncomingToSource(frame, source);
    })
    .catch((e) => console.error("publish error", e))
    .finally(() => {
      queue.pending--;
    });
}
/**
 * Connects the backend bot, manages language track lifecycle, and bridges
 * LiveKit audio with the Python translation child process.
 *
 * Data flow: subscribed "translateAudio" tracks -> framed PCM on the
 * child's stdin -> translated framed PCM on the child's stdout ->
 * enqueueFrame routes it onto the per-language published tracks.
 *
 * @param {{ roomName: string }} params - Startup options.
 * @returns {Promise<void>}
 */
async function main({ roomName }) {
  // clear out language.json
  fs.writeFileSync(langaugeFile, '');
  const token = await getToken({ username: "BotAccount", roomName });
  const room = new Room();
  console.log('trying to connect');
  await room.connect(LIVEKIT_URL, token);
  // Seed activeLanguages from participants already in the room.
  // NOTE(review): assumes participant.metadata is JSON with a `languages`
  // array — a participant with empty/other metadata throws here; confirm.
  room.remoteParticipants.forEach((participant) => {
    console.log('participant', participant.metadata);
    const { languages } = JSON.parse(participant.metadata)
    languages.forEach((language) => {
      activeLanguages.add(language);
    });
  });
  if (activeLanguages.size > 0) {
    await addLanguages(room);
  }
  // Spawn the Python translator worker in childmode (framed stdin/stdout)
  // NOTE(review): script dir is 'aiTranslation' but venv/cwd below use
  // 'AiTranslation' — inconsistent casing breaks on case-sensitive filesystems.
  // const pyScript = path.join(__dirname, 'AiTranslation', 'tannerPlusSileroChatTest.py');
  const pyScript = path.join(__dirname, 'aiTranslation', 'tannerPlusSileroChatTest.py');
  // const pyScript = path.join(__dirname, 'aiTranslation', 'aiTranslation.py');
  let pyBin = path.join(__dirname, 'AiTranslation', 'venv', 'bin', 'python');
  if (!fs.existsSync(pyBin)) {
    // Fallback to system python
    pyBin = process.env.PYTHON || process.env.PYTHON3 || 'python3';
    console.warn('Python venv not found, falling back to', pyBin);
  }
  // Pipe stderr so we can capture Python errors and startup logs
  // Child environment: pass through current env. Enable echo only when
  // ENABLE_ECHO_CHILD=1 is set in the parent environment.
  const childEnv = Object.assign({}, process.env);
  if (process.env.ENABLE_ECHO_CHILD === '1') {
    childEnv.ECHO_CHILD = '1';
    console.log('ENABLE_ECHO_CHILD=1 -> enabling ECHO_CHILD in python child');
  }
  const py = spawn(pyBin, [pyScript, '--childmode'], { stdio: ['pipe', 'pipe', 'pipe'], cwd: path.join(__dirname, 'AiTranslation'), env: childEnv });
  // Detect spawn problems early
  py.on('error', (err) => {
    console.error('Failed to spawn python child:', err);
  });
  py.on('exit', (code, signal) => {
    console.log(`Python child exited with code=${code} signal=${signal}`);
  });
  py.on('close', (code, signal) => {
    console.log(`Python child closed with code=${code} signal=${signal}`);
  });
  // Buffer outgoing frames until python writer thread is ready
  let pyReady = false;
  const pendingFrames = [];
  const MAX_PENDING = 200; // avoid unbounded growth
  let sentSynthetic = false; // NOTE(review): never read or written below — dead state?
  py.stderr.on('data', (chunk) => {
    const s = chunk.toString();
    console.error('[py stderr]', s);
    // detect the writer thread ready log emitted by the child
    // NOTE(review): this sentinel must match the child's log line exactly.
    if (!pyReady && s.includes('[CHILDMODE] writer thread started')) {
      pyReady = true;
      console.log('Python child signalled ready; flushing', pendingFrames.length, 'pending frames');
      // flush pending frames
      while (pendingFrames.length > 0) {
        try {
          py.stdin.write(pendingFrames.shift());
        } catch (e) {
          console.error('Error flushing pending frame to python', e);
          break;
        }
      }
    }
  });
  // Startup watchdog: warn if no stdout data within 10s
  let sawStdout = false;
  const startupTimer = setTimeout(() => {
    if (!sawStdout) console.warn('No stdout data received from Python child within 10s — it may have crashed or be stuck.');
  }, 10000);
  let pyRecvBuf = Buffer.alloc(0);
  // Incoming wire format from the child (all little-endian):
  //   [metaLen u32][audioLen u32][meta JSON, metaLen bytes][PCM s16, audioLen bytes]
  py.stdout.on('data', (chunk) => {
    // mark that we have seen stdout so the startup watchdog won't fire
    if (!sawStdout) {
      sawStdout = true;
      clearTimeout(startupTimer);
    }
    pyRecvBuf = Buffer.concat([pyRecvBuf, chunk]);
    // parse as many complete messages as the buffer currently holds
    while (pyRecvBuf.length >= 8) {
      const header = pyRecvBuf.slice(0, 8);
      let metaLen, audioLen;
      try {
        metaLen = header.readUInt32LE(0);
        audioLen = header.readUInt32LE(4);
      } catch (err) {
        console.error('Failed to read lengths from header', err, 'header=', header);
        // drop one byte and try to resync
        pyRecvBuf = pyRecvBuf.slice(1);
        continue;
      }
      const total = 8 + metaLen + audioLen;
      // sanity-check sizes to avoid trying to allocate huge buffers on corruption
      const MAX_META = 1024 * 16; // 16 KiB
      const MAX_AUDIO = 1024 * 1024 * 20; // 20 MiB
      // NOTE(review): readUInt32LE never returns a negative value, so the
      // `< 0` checks are dead; the upper-bound checks do the real work.
      if (metaLen < 0 || metaLen > MAX_META || audioLen < 0 || audioLen > MAX_AUDIO) {
        // NEED TO LOOK INTO WHAT CAUSES THIS RANDOMLY
        // console.error('Invalid lengths detected in header — attempting resync', { metaLen, audioLen });
        // console.error('Dumping header and buffer (hex):', header.toString('hex'), pyRecvBuf.toString('hex').slice(0, 512));
        // discard first byte and try to find the next plausible header
        pyRecvBuf = pyRecvBuf.slice(1);
        continue;
      }
      // incomplete message: wait for more data
      if (pyRecvBuf.length < total) break;
      const meta = JSON.parse(pyRecvBuf.slice(8, 8 + metaLen).toString('utf8'));
      let audioBuf = pyRecvBuf.slice(8 + metaLen, total);
      try {
        // audio is raw int16 LE PCM at 16000 mono
        // Create an aligned Int16Array view. Node Buffers may have
        // an ArrayBuffer with a non-2-byte-aligned byteOffset which
        // causes a RangeError when constructing Int16Array directly.
        let samples;
        try {
          if (audioBuf.length % 2 !== 0) {
            // odd length: drop trailing byte to keep sample pairs
            console.warn('[PY RX] audioBuf length odd, trimming last byte');
            audioBuf = audioBuf.slice(0, audioBuf.length - 1);
          }
          // ArrayBuffer.slice copies, so the view is always aligned
          const ab = audioBuf.buffer.slice(audioBuf.byteOffset, audioBuf.byteOffset + audioBuf.length);
          samples = new Int16Array(ab);
        } catch (innerErr) {
          console.error('[PY RX] aligned Int16Array failed, falling back to copy', innerErr);
          const count = Math.floor(audioBuf.length / 2);
          samples = new Int16Array(count);
          for (let i = 0; i < count; i++) {
            samples[i] = audioBuf.readInt16LE(i * 2);
          }
        }
        const reportedRate = parseInt(meta.sample_rate || 16000, 10);
        const reportedChannels = parseInt(meta.channels || 1, 10);
        const inFrame = new AudioFrame(
          samples,
          reportedRate,
          reportedChannels,
          Math.trunc(samples.length / Math.max(1, reportedChannels))
        );
        // Log incoming audio stats and routing info
        try {
          const view = samples;
          let peak = 0;
          let sumSq = 0;
          for (let i = 0; i < view.length; i++) {
            const v = Math.abs(view[i]);
            if (v > peak) peak = v;
            sumSq += view[i] * view[i];
          }
          const rms = view.length ? Math.sqrt(sumSq / view.length) : 0;
          console.log(`[FROM PY] parsed meta=${JSON.stringify(meta)} bytes=${audioBuf.length} samples=${view.length} ch=${reportedChannels} sr=${reportedRate} peak=${peak} rms=${rms.toFixed(1)} t=${Date.now()}`);
        } catch (err) {
          console.error('Failed computing incoming audio stats', err);
        }
        // Choose target language/source (fallback to 'es')
        // make this more dynamic?
        // NOTE(review): no fallback remains — if meta.language is missing,
        // targetLanguage is undefined and enqueueFrame will try to add it.
        const targetLanguage = (meta.language);// || meta.lang || 'Spanish');
        console.log('Routing incoming audio to', targetLanguage);
        // Route through the resampler/pipeline helper so it matches published track format
        // if (sources[targetLang]) {
        // pipeIncomingToSource(inFrame, sources[targetLang]).catch((e) => console.error('publish error', e));
        // } else {
        // // fallback to 'es' if target not present
        // pipeIncomingToSource(inFrame, sources['es']).catch((e) => console.error('publish error', e));
        // }
        // pipeIncomingToSource(inFrame, sources[targetLang]).catch((e) => console.error('publish error', e));
        enqueueFrame(targetLanguage, inFrame, room)
      } catch (e) {
        console.error('Error handling py response', e);
      }
      // consume this message and loop for the next one
      pyRecvBuf = pyRecvBuf.slice(total);
    }
  });
  /**
   * This is a test to see if we can fix track publications
   */
  for (const participant of room.remoteParticipants.values()) {
    // console.log(participant)
    for (const publication of participant.trackPublications.values()) {
      console.log(publication.name);
      if (publication.name === "translateAudio") {
        await publication.setSubscribed(true);
        console.log("Subscribed to existing track:", publication.name);
      }
    }
  }
  // 2️⃣ Subscribe to tracks published in the future
  room.on(RoomEvent.TrackPublished, async (publication, participant) => {
    // console.log(participant, publication);
    if (publication.name === "translateAudio") {
      await publication.setSubscribed(true);
      // console.log("Subscribed to new track from participant:", participant.identity);
    }
  });
  // Late joiners: add their languages and subscribe to their tracks.
  room.on(RoomEvent.ParticipantConnected, async (participant) => {
    console.log('participant', participant.metadata);
    const { languages } = JSON.parse(participant.metadata);
    const activeLanguagesSize = activeLanguages.size;
    languages.forEach((language) => {
      activeLanguages.add(language);
    });
    // only republish when this participant introduced a new language
    if (activeLanguages.size > activeLanguagesSize) {
      await addLanguages(room);
    }
    // Subscribe to existing tracks
    for (const publication of participant.trackPublications.values()) {
      if (publication.name === "translateAudio") {
        publication.setSubscribed(true);
        console.log("Subscribed to existing track of new participant:", publication.name);
      }
    }
  });
  room.on(RoomEvent.TrackSubscribed, async (track, publication, participant) => {
    // console.log('does this ever get called?');
    // console.log(track, publication);
    // console.log('pub details', publication)
    if (publication.name !== "translateAudio") return;
    console.log('subscribed to track', track.sid, );//publication, participant.identity);
    if (track.kind !== TrackKind.KIND_AUDIO) return;
    // console.log('track details', track)
    const stream = new AudioStream(track);
    // Use framed child-process IPC to send frames to the Python translator
    /**
     * Encodes a LiveKit frame as a framed PCM payload for the Python child's
     * stdin, using the same wire format the child replies with:
     * [metaLen u32 LE][audioLen u32 LE][meta JSON][PCM s16 LE].
     *
     * @param {AudioFrame} frame - LiveKit audio frame to send.
     * @returns {void}
     */
    function sendFrameToPy(frame) {
      try {
        // Normalize frame.data to 16-bit PCM Int16Array. LiveKit may expose
        // audio as Int16Array or Float32Array depending on platform/version.
        let int16;
        if (frame.data instanceof Int16Array) {
          int16 = frame.data;
        } else if (frame.data instanceof Float32Array) {
          const f32 = frame.data;
          int16 = new Int16Array(f32.length);
          for (let i = 0; i < f32.length; i++) {
            // Clamp to [-1, 1] then scale to int16 range
            const v = Math.max(-1, Math.min(1, f32[i]));
            int16[i] = (v * 32767) | 0;
          }
        } else {
          // Fallback: try to treat data as a generic array of samples in [-1,1]
          const arr = Array.from(frame.data || []);
          int16 = new Int16Array(arr.length);
          for (let i = 0; i < arr.length; i++) {
            const v = Math.max(-1, Math.min(1, Number(arr[i]) || 0));
            int16[i] = (v * 32767) | 0;
          }
        }
        const sampleCount = int16.length;
        const buf = Buffer.alloc(sampleCount * 2);
        for (let i = 0; i < sampleCount; i++) {
          buf.writeInt16LE(int16[i], i * 2);
        }
        const metaObj = { sample_rate: frame.sampleRate, channels: frame.channels };
        const meta = Buffer.from(JSON.stringify(metaObj));
        const header = Buffer.alloc(8);
        header.writeUInt32LE(meta.length, 0);
        header.writeUInt32LE(buf.length, 4);
        const payload = Buffer.concat([header, meta, buf]);
        // Log outgoing audio stats (samples, channels, peak) so we can
        // compare them with what the Python child reports.
        try {
          const view = int16;
          let peak = 0;
          let sumSq = 0;
          for (let i = 0; i < view.length; i++) {
            const v = Math.abs(view[i]);
            if (v > peak) peak = v;
            sumSq += view[i] * view[i];
          }
          const rms = view.length ? Math.sqrt(sumSq / view.length) : 0;
          // console.log(
          // `[TO PY] samples=${sampleCount} ch=${frame.channels} sr=${metaObj.sample_rate} peak=${peak} rms=${rms.toFixed(
          // 1,
          // )}`,
          // );
        } catch (err) {
          console.error('Failed computing outgoing audio stats', err);
        }
        if (pyReady) {
          py.stdin.write(payload);
        } else {
          // buffer until python signals readiness
          if (pendingFrames.length >= MAX_PENDING) {
            // drop oldest to avoid memory growth
            pendingFrames.shift();
          }
          pendingFrames.push(payload);
        }
      } catch (e) {
        console.error('sendFrameToPy error', e);
      }
    }
    // Kick off prerecorded audio ONCE per language
    // for (const language of languages) {
    // if (language !== "en") {
    // console.log("Creating translated audio for ", language);
    // pipePreRecordedToSource(language, sources[language]); // no await needed
    // }
    // }
    // forward incoming frames to python as they arrive (framed to child stdin)
    let frameCount = 0;
    const startTime = Date.now();
    for await (const frame of stream) {
      frameCount++;
      if (frameCount % 10 === 0) {
        // elapsed only feeds the (currently commented) progress log below
        const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
        // console.log('[BOT] forwarding frames to Python:', frameCount, 'elapsed=', `${elapsed}s`);
      }
      // if (frameCount % 50 === 0) {
      // console.log(
      // '[AUDIO CONT]',
      // 'participant=', participant.identity,
      // 'frames=', frameCount,
      // 'elapsed_s=', ((Date.now() - startTime) / 1000).toFixed(1)
      // );
      // }
      try {
        sendFrameToPy(frame);
      } catch (err) {
        console.error('Error forwarding frame to python', err);
      }
    }
  });
}
// CLI entry point: `node <script> <roomName>` (defaults to 'test-room').
const roomName = process.argv[2] || 'test-room';
main({ roomName }).catch(console.error);