// host-listener-client.js
const { Room, AudioSource, AudioFrame, LocalAudioTrack, TrackPublishOptions, AudioStream, TrackKind, RoomEvent } = require("@livekit/rtc-node");
const fs = require("fs");
require('dotenv').config();
const getToken = require("./token.js");
const { readFileSync } = require('node:fs');
const LIVEKIT_URL = process.env.LIVEKIT_URL;
const BITS_PER_SAMPLE = 16;
const writers = new Map();
/**
* Returns (or lazily creates) a WAV writer for a publication name.
*
* @param {string} filename - Logical output identifier, usually publication name.
* @param {{ sampleRate: number, channels: number }} frame - First frame used to seed WAV header.
* @returns {Object} Writable stream for PCM data.
*/
function getWriter(filename, frame) {
// If we already created a writer for this file, return it
if (writers.has(filename)) {
return writers.get(filename);
}
// Otherwise create a new writer
const filePath = `./receivedAudio-${filename}.wav`;
const writer = fs.createWriteStream(filePath);
// Write header only once
writeWavHeader(writer, frame);
// Store globally
writers.set(filename, writer);
return writer;
}
/**
* Writes a 44-byte PCM WAV header with placeholder sizes.
*
* @param {Object} writer - Destination writer.
* @param {{ sampleRate: number, channels: number }} frame - Frame metadata used for format fields.
* @returns {void}
*/
function writeWavHeader(writer, frame) {
const header = Buffer.alloc(44);
const byteRate = (frame.sampleRate * frame.channels * BITS_PER_SAMPLE) / 8;
const blockAlign = (frame.channels * BITS_PER_SAMPLE) / 8;
// RIFF header
header.write("RIFF", 0);
header.writeUInt32LE(0, 4); // Placeholder chunk size
header.write("WAVE", 8);
// fmt subchunk
header.write("fmt ", 12);
header.writeUInt32LE(16, 16); // Subchunk1Size (PCM)
header.writeUInt16LE(1, 20); // AudioFormat (1 = PCM)
header.writeUInt16LE(frame.channels, 22);
header.writeUInt32LE(frame.sampleRate, 24);
header.writeUInt32LE(byteRate, 28);
header.writeUInt16LE(blockAlign, 32);
header.writeUInt16LE(BITS_PER_SAMPLE, 34);
// data subchunk
header.write("data", 36);
header.writeUInt32LE(0, 40); // Placeholder Subchunk2Size
writer.write(header);
}
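/*
 * Optional sketch, not wired into the flow above: writeWavHeader leaves both
 * size fields at 0, so a finished recording can be patched in place once its
 * write stream has closed. Assumes the fixed 44-byte header layout used here.
 */
function finalizeWavHeader(filePath) {
const fileSize = fs.statSync(filePath).size;
const fd = fs.openSync(filePath, 'r+');
const sizeBuf = Buffer.alloc(4);
// RIFF chunk size = file size minus the 8-byte "RIFF" + size prefix
sizeBuf.writeUInt32LE(fileSize - 8, 0);
fs.writeSync(fd, sizeBuf, 0, 4, 4);
// data chunk size = file size minus the 44-byte header
sizeBuf.writeUInt32LE(fileSize - 44, 0);
fs.writeSync(fd, sizeBuf, 0, 4, 40);
fs.closeSync(fd);
}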
/**
* Appends a single audio frame to a WAV file for the given publication.
*
* @param {{ data: Int16Array|Float32Array|Uint8Array }} frame - Audio frame from LiveKit stream.
* @param {string} trackToProcess - Track identifier guard; if falsy, frame is ignored.
* @param {string} filename - Output grouping key and file suffix.
* @returns {void}
*/
function saveFrameToFile(frame, trackToProcess, filename) {
// console.log('Saving audio frame to ', filename);
if (!trackToProcess) {
return;
}
const writer = getWriter(filename, frame);
if (writer) {
// Respect the view's offset/length in case frame.data is a view into a larger buffer
const buf = Buffer.from(frame.data.buffer, frame.data.byteOffset, frame.data.byteLength);
writer.write(buf);
}
}
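/*
 * Optional sketch, not wired into the flow above: on shutdown or room
 * disconnect, the open write streams can be flushed and closed so each
 * recording ends cleanly (and could then be handed to a header fix-up such
 * as finalizeWavHeader above).
 */
function closeWriters() {
for (const [filename, writer] of writers) {
writer.end(() => console.log(`Closed recording for ${filename}`));
}
writers.clear();
}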
/**
* Parses core WAV metadata and finds the PCM data chunk.
*
* @param {Buffer} buffer - Full WAV file bytes.
* @returns {{
* channels: number,
* sampleRate: number,
* bitsPerSample: number,
* dataStart: number,
* dataSizeBytes: number
* }} Parsed WAV information.
* @throws {Error} If the file is not PCM WAV or no data chunk is found.
*/
function parseWav(buffer) {
// Basic RIFF/WAVE validation
if (
buffer.toString('ascii', 0, 4) !== 'RIFF' ||
buffer.toString('ascii', 8, 12) !== 'WAVE'
) {
throw new Error('Not a WAV file');
}
let offset = 12;
let channels;
let sampleRate;
let bitsPerSample;
let dataStart;
let dataSizeBytes;
while (offset + 8 <= buffer.length) {
const chunkId = buffer.toString('ascii', offset, offset + 4);
const chunkSize = buffer.readUInt32LE(offset + 4);
if (chunkId === 'fmt ') {
const audioFormat = buffer.readUInt16LE(offset + 8);
if (audioFormat !== 1) {
throw new Error('Only PCM WAV files are supported');
}
channels = buffer.readUInt16LE(offset + 10);
sampleRate = buffer.readUInt32LE(offset + 12);
bitsPerSample = buffer.readUInt16LE(offset + 22);
}
if (chunkId === 'data') {
dataStart = offset + 8;
dataSizeBytes = chunkSize;
break;
}
// WAV chunks are word-aligned; skip the pad byte after odd-sized chunks
offset += 8 + chunkSize + (chunkSize % 2);
}
if (dataStart == null) {
throw new Error('WAV data chunk not found');
}
return {
channels,
sampleRate,
bitsPerSample,
dataStart,
dataSizeBytes,
};
}
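// Example: a standard 16-bit stereo 48 kHz file with a plain 44-byte header parses to
// { channels: 2, sampleRate: 48000, bitsPerSample: 16, dataStart: 44, dataSizeBytes: <PCM byte count> }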
/**
* Publishes prerecorded audio to the room on the `translateAudio` track.
*
* @param {{ room: Room }} params - LiveKit room context.
* @returns {Promise<void>}
*/
async function publishTracks({ room }){
// Read WAV header first so the AudioSource matches the file properties
const filePath = "../prerecordedAudio/3MinWavAudio.wav";
// const filePath = "../prerecordedAudio/testAudio.wav";
const sample = readFileSync(filePath);
const {
channels,
sampleRate,
bitsPerSample,
dataStart,
dataSizeBytes,
} = parseWav(sample);
// The Int16Array view and AudioFrame below assume 16-bit samples
if (bitsPerSample !== 16) {
throw new Error(`Expected 16-bit PCM, got ${bitsPerSample}-bit`);
}
const bytesPerSample = bitsPerSample / 8;
const totalSamples16 = dataSizeBytes / bytesPerSample;
// Create AudioSource with matching params
const audioSource = new AudioSource(sampleRate, channels);
const track = LocalAudioTrack.createAudioTrack('translateAudio', audioSource);
const options = new TrackPublishOptions();
await room.localParticipant.publishTrack(track, options);
const dataView = new Int16Array(sample.buffer, sample.byteOffset + dataStart, totalSamples16);
console.log('[MOCK] loaded WAV', {
filePath,
channels,
sampleRate,
bitsPerSample,
dataSizeBytes,
totalSamples16,
durationSec: (
dataSizeBytes /
(sampleRate * channels * bytesPerSample)
).toFixed(2),
});
// Send frames of FRAME_DURATION seconds. Work in 16-bit sample units.
const FRAME_DURATION = 1; // seconds per frame
const samplesPerChannelPerFrame = sampleRate * FRAME_DURATION;
const frameTotalSamples = samplesPerChannelPerFrame * channels; // interleaved samples count
let written = 0; // index into dataView (16-bit elements)
let frameCount = 0;
const startTime = Date.now();
while (written < dataView.length) {
const remaining = dataView.length - written;
const take = Math.min(frameTotalSamples, remaining);
const frameData = dataView.slice(written, written + take);
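// peak / RMS below are computed only for the diagnostic log; they do not affect the published audio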
let peak = 0;
let sumSq = 0;
for (let i = 0; i < frameData.length; i++) {
const abs = Math.abs(frameData[i]);
if (abs > peak) peak = abs;
sumSq += frameData[i] * frameData[i];
}
const rms = frameData.length ? Math.sqrt(sumSq / frameData.length) : 0;
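// AudioFrame takes interleaved 16-bit PCM plus sample rate, channel count, and samples per channel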
const frame = new AudioFrame(
frameData,
sampleRate,
channels,
Math.trunc(frameData.length / channels),
);
await audioSource.captureFrame(frame);
written += take;
frameCount++;
const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
if (frameCount % 10 === 0 || frameCount <= 3) {
console.log(
`[MOCK] published frames=${frameCount} elapsed=${elapsed}s peak_int16=${peak} rms_int16=${rms.toFixed(
1,
)} len=${frameData.length}`,
);
}
}
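// Note: captured frames are buffered inside the SDK before being sent, so if the
// process exits right after this loop the tail of the audio can be dropped.
// Assuming the installed @livekit/rtc-node exposes AudioSource.waitForPlayout(),
// awaiting it here (and then unpublishing the track) gives a clean finish.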
}
/**
* Connects the mock client, subscribes to translated tracks, records incoming audio,
* and publishes prerecorded source audio.
*
* @param {Object} params - Runtime options.
* @param {string} params.roomName - LiveKit room name.
* @param {string[]} params.languages - Track names to subscribe to.
* @returns {Promise<void>}
*/
async function main({ roomName, languages }) {
try {
const token = await getToken({
username: "testClient",
roomName,
// Alternative test set: 'es', 'pt', 'fr', 'zh', 'ja', 'ko'
metadata: JSON.stringify({ languages: ['pa', 'ru', 'sq', 'tr', 'yue', 'de'] })
});
const room = new Room();
await room.connect(LIVEKIT_URL, token); // an earlier variant passed { autoSubscribe: false }
// 1️⃣ Subscribe to tracks that were already published before this client connected
for (const participant of room.remoteParticipants.values()) {
// console.log(participant)
for (const publication of participant.trackPublications.values()) {
// console.log(publication);
console.log(publication.name);
if (languages.includes(publication.name)) {
await publication.setSubscribed(true);
console.log("Subscribed to existing track:", publication.name);
}
}
}
// 2️⃣ Subscribe to tracks published in the future
room.on(RoomEvent.TrackPublished, async (publication, participant) => {
console.log(publication.name);
if (languages.includes(publication.name)) {
await publication.setSubscribed(true);
console.log("Subscribed to new track from participant:", participant.identity);
}
});
room.on(RoomEvent.ParticipantConnected, async (participant) => {
// Wait briefly so the new participant's tracks can be published first
await new Promise(res => setTimeout(res, 1000));
console.log("Participant connected:", participant.identity);
// Subscribe to the tracks this participant has already published
// console.log(participant);
for (const publication of participant.trackPublications.values()) {
console.log(publication.name);
if (languages.includes(publication.name)) {
await publication.setSubscribed(true);
console.log("Subscribed to existing track of new participant:", publication.name);
}
}
});
room.on(RoomEvent.TrackSubscribed, async (track, publication) => {
console.log(`✅ Receiving audio: ${publication.name}`);
if (track.kind !== TrackKind.KIND_AUDIO) return;
const stream = new AudioStream(track);
for await (const frame of stream) {
// console.log('Why is this being called for ', publication.name);
saveFrameToFile(frame, track.sid, publication.name)
}
});
console.log('Track handlers registered');
// clean exit logging
room.on("disconnected", () => {
console.log("❌ Disconnected from room");
});
console.log("Waiting 1 second so bot can subscribe...");
await new Promise(res => setTimeout(res, 5000));
console.log("Bot is connected. Starting audio publish...");
await publishTracks({ room });
console.log('Finished publishing prerecorded audio');
} catch (error) {
console.error(error);
return;
}
}
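// Usage: node host-listener-client.js <roomName> <comma-separated track names>
// e.g.   node host-listener-client.js test-room es,en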
const roomName = process.argv[2] || 'test-room';
const languages = (process.argv[3] || 'es,en').split(',');
main({ roomName, languages }).catch(console.error);