// Source: backendBot.js

const { Room, AudioFrame, AudioSource, LocalAudioTrack, TrackPublishOptions, RoomEvent, TrackKind, AudioStream } = require('@livekit/rtc-node');
const getToken = require('./token');
const fs = require('fs');
const { readFileSync } = require('node:fs');
require('dotenv').config();
const { spawn } = require('child_process');
const path = require('path');

const LIVEKIT_URL = process.env.LIVEKIT_URL;
// language code -> published AudioSource (filled in by addLanguages)
const sources = {};
// language code -> serialized publish-queue state (used only by enqueueFrame)
const sourceQueues = {};
// languages requested by connected participants (parsed from participant metadata)
const activeLanguages = new Set();
// NOTE(review): "langauge" is a typo for "language"; renaming would touch every
// reference in this file, so it is documented here instead of fixed.
const langaugeFile = "languages.json"
// If we want it to persist we can add this back
// try {
//   const existing = JSON.parse(fs.readFileSync("languages.json", "utf-8"));
//   existing.forEach(lang => activeLanguages.add(lang));
// } catch (e) {
//   // File may not exist yet
// }

/**
 * Publishes missing language tracks for all currently active languages
 * and persists the language list to disk.
 *
 * NOTE(review): concurrent calls can race — `sources[language]` is only set
 * after the publish resolves, so two overlapping calls could publish the same
 * language twice. Confirm whether callers can overlap before hardening.
 *
 * @param {Room} room - Connected LiveKit room instance.
 * @returns {Promise<void>}
 */
async function addLanguages(room) {
  // Persist the current language set so external tooling can inspect it.
  fs.writeFileSync(langaugeFile, JSON.stringify([...activeLanguages], null, 2));

  // Publish a track for every active language that does not have one yet.
  // Publications run in parallel; each records its AudioSource in `sources`.
  const pending = [];
  for (const language of activeLanguages) {
    if (!Object.hasOwn(sources, language)) {
      pending.push(
        publishAudioTrack({ room, language }).then((audioSource) => {
          sources[language] = audioSource;
        })
      );
    }
  }

  await Promise.all(pending);

  console.log("Updated languages:", [...activeLanguages]);
}

/**
 * Creates and publishes an outgoing translation audio track for a language.
 *
 * @param {{ room: Room, language: string }} params - Publish parameters.
 * @returns {Promise<AudioSource>} Audio source used to push translated frames.
 */
async function publishAudioTrack({ room, language }) {
  // 16 kHz mono — matches the PCM format produced by the translator pipeline.
  const audioSource = new AudioSource(16000, 1);
  const track = LocalAudioTrack.createAudioTrack(language, audioSource);
  const options = new TrackPublishOptions();

  // Sequential awaits instead of mixing `await` with `.then()`:
  // publish, then block until at least one subscriber is attached so the
  // earliest frames are actually heard.
  const publication = await room.localParticipant.publishTrack(track, options);
  await publication.waitForSubscription();

  return audioSource;
}

/**
 * Normalizes an input frame and forwards it to a published audio source.
 * Converts stereo to mono and linearly resamples when needed.
 *
 * @param {AudioFrame} inFrame - Incoming frame from translator output.
 * @param {AudioSource} source - Destination LiveKit audio source.
 * @param {number} [outSampleRate=16000] - Target sample rate.
 * @param {number} [outChannels=1] - Target channel count.
 * @returns {Promise<void>}
 */
async function pipeIncomingToSource(inFrame, source, outSampleRate = 16000, outChannels = 1) {
  const inputPcm = new Int16Array(inFrame.data);
  const inRate = inFrame.sampleRate;
  const inChannels = inFrame.channels;

  // 1. Downmix stereo -> mono by averaging each L/R sample pair.
  //    The average of two int16 values always fits in int16, so no clamp needed.
  let mono = inputPcm;
  if (inChannels === 2) {
    mono = new Int16Array(inputPcm.length / 2);
    for (let i = 0, j = 0; i < inputPcm.length; i += 2, j++) {
      mono[j] = ((inputPcm[i] + inputPcm[i + 1]) / 2) | 0;
    }
  }

  // 2. Linear-interpolation resample to the outgoing sample rate.
  let pcm = mono;
  if (inRate !== outSampleRate) {
    const ratio = outSampleRate / inRate;
    const newLength = Math.round(mono.length * ratio);
    const resampled = new Int16Array(newLength);

    for (let i = 0; i < newLength; i++) {
      const t = i / ratio;
      const i0 = Math.floor(t);
      const i1 = Math.min(i0 + 1, mono.length - 1);
      const frac = t - i0;
      resampled[i] = (mono[i0] * (1 - frac) + mono[i1] * frac) | 0;
    }
    pcm = resampled;
  }

  // 3. Build the outgoing frame in the exact format of the published track.
  //    FIX: samplesPerChannel is derived from outChannels (the old code passed
  //    pcm.length unconditionally, which mis-sizes the frame for outChannels > 1).
  const outFrame = new AudioFrame(
    pcm,
    outSampleRate,
    outChannels,
    Math.trunc(pcm.length / Math.max(1, outChannels))
  );

  // 4. Hand the frame to the LiveKit track.
  await source.captureFrame(outFrame);
}

// temp prerecorded for testing
/**
 * Streams a prerecorded WAV file to a language track in ~1 second chunks.
 *
 * Assumes a canonical 44-byte PCM WAV header (channels at byte 22, sample
 * rate at 24, data size at 40, samples at 44) — TODO confirm the test files
 * never carry extra chunks before "data".
 *
 * @param {string} language - Target language code used in filename lookup.
 * @param {AudioSource} source - Destination LiveKit audio source.
 * @returns {Promise<void>}
 */
async function pipePreRecordedToSource(language, source) {
  console.log('should be piping something that doesnt exist', language);
  const filePath = `../prerecordedAudio/${language}.wav`;

  const sample = readFileSync(filePath);
  const channels = sample.readUInt16LE(22);
  const sampleRate = sample.readUInt32LE(24);
  const totalSamples = sample.readUInt32LE(40) / 2; // data-chunk bytes -> int16 samples

  // Copy the PCM payload into a fresh, aligned Int16Array.
  // FIX: the previous code viewed `sample.buffer` directly — which ignores the
  // Buffer's byteOffset (Node pools small buffers) and includes the header —
  // and then started at index 44, skipping 44 *samples* (88 bytes) instead of
  // the 44-byte header.
  const pcm = new Int16Array(totalSamples);
  for (let i = 0; i < totalSamples; i++) {
    pcm[i] = sample.readInt16LE(44 + i * 2);
  }

  const FRAME_DURATION = 1; // seconds of audio per frame
  // FIX: account for channel count so a frame really holds FRAME_DURATION
  // seconds (the old `sampleRate * FRAME_DURATION` halved stereo frames).
  const samplesPerFrame = sampleRate * FRAME_DURATION * channels;

  let written = 0; // index into pcm, in samples
  while (written < totalSamples) {
    const frameSize = Math.min(samplesPerFrame, totalSamples - written);

    const frame = new AudioFrame(
      pcm.slice(written, written + frameSize),
      sampleRate,
      channels,
      Math.trunc(frameSize / channels),
    );
    await source.captureFrame(frame);
    console.log('just sending a couple packets back for ', language);
    written += frameSize;
  }
}

/**
 * Serializes frame publishing per language and drops frames when the queue
 * backs up.
 *
 * FIX: the previous version stored the queue depth as a `.length` property on
 * the promise itself, but `.then()` returns a *new* promise without that
 * property, so the counter became NaN after the first enqueue and the
 * overflow guard never fired. Depth is now tracked alongside the chain.
 *
 * @param {string} language - Target language key for routing.
 * @param {AudioFrame} frame - Frame to enqueue for publish.
 * @param {Room} room - Room used to publish a missing language track.
 * @returns {void}
 */
function enqueueFrame(language, frame, room) {
  const source = sources[language];
  if (!source) {
    // Track was never published for this language (root cause still unknown —
    // see addLanguages). Kick off creation and drop this frame: piping to an
    // undefined source would only throw inside the queue.
    console.warn("No source for ", language, " Adding source");
    activeLanguages.add(language);
    addLanguages(room).catch((e) => console.error("addLanguages failed", e));
    return;
  }

  // Lazily create the per-language queue: a promise chain plus a depth counter.
  if (!sourceQueues[language]) {
    sourceQueues[language] = { chain: Promise.resolve(), depth: 0 };
  }
  const queue = sourceQueues[language];

  // Back-pressure: drop rather than letting the chain grow without bound.
  if (queue.depth >= 50) {
    console.warn("Queue too large, dropping frame");
    return;
  }

  queue.depth++;
  queue.chain = queue.chain
    .then(() => pipeIncomingToSource(frame, source))
    .catch((e) => console.error("publish error", e))
    .finally(() => {
      queue.depth--;
    });
}

/**
 * Connects the backend bot, manages language track lifecycle, and bridges
 * LiveKit audio with the Python translation child process.
 *
 * Flow: connect to the room, publish one track per requested language, spawn
 * the Python translator in childmode, forward subscribed "translateAudio"
 * frames to the child over framed stdin, and route translated frames coming
 * back on stdout to the per-language tracks via enqueueFrame.
 *
 * @param {{ roomName: string }} params - Startup options.
 * @returns {Promise<void>}
 */
async function main({ roomName }) {
  // clear out language.json
  // NOTE(review): '' is not valid JSON — confirm nothing JSON.parses this
  // file before addLanguages() first rewrites it with a real array.
  fs.writeFileSync(langaugeFile, '');
  const token = await getToken({ username: "BotAccount", roomName });

  const room = new Room();
  console.log('trying to connect');
  await room.connect(LIVEKIT_URL, token);

  // Seed the language set from participants already in the room. Metadata is
  // expected to be JSON with a `languages` array (this throws if it is not —
  // presumably set by the client on join; verify against the frontend).
  room.remoteParticipants.forEach((participant) =>  {
    console.log('participant', participant.metadata);
    const { languages } = JSON.parse(participant.metadata)

    languages.forEach((language) => {
      activeLanguages.add(language);
    });
  });

  if (activeLanguages.size > 0) {
    await addLanguages(room);
  }

  // Spawn the Python translator worker in childmode (framed stdin/stdout)
  // const pyScript = path.join(__dirname, 'AiTranslation', 'tannerPlusSileroChatTest.py');
  // NOTE(review): script dir is 'aiTranslation' but the venv path and cwd
  // below use 'AiTranslation' — on a case-sensitive filesystem one of these
  // must be wrong. Confirm the actual directory name.
  const pyScript = path.join(__dirname, 'aiTranslation', 'tannerPlusSileroChatTest.py');
  // const pyScript = path.join(__dirname, 'aiTranslation', 'aiTranslation.py');
  let pyBin = path.join(__dirname, 'AiTranslation', 'venv', 'bin', 'python');
  if (!fs.existsSync(pyBin)) {
    // Fallback to system python
    pyBin = process.env.PYTHON || process.env.PYTHON3 || 'python3';
    console.warn('Python venv not found, falling back to', pyBin);
  }

  // Pipe stderr so we can capture Python errors and startup logs
  // Child environment: pass through current env. Enable echo only when
  // ENABLE_ECHO_CHILD=1 is set in the parent environment.
  const childEnv = Object.assign({}, process.env);
  if (process.env.ENABLE_ECHO_CHILD === '1') {
    childEnv.ECHO_CHILD = '1';
    console.log('ENABLE_ECHO_CHILD=1 -> enabling ECHO_CHILD in python child');
  }
  const py = spawn(pyBin, [pyScript, '--childmode'], { stdio: ['pipe', 'pipe', 'pipe'], cwd: path.join(__dirname, 'AiTranslation'), env: childEnv });

  // Detect spawn problems early
  py.on('error', (err) => {
    console.error('Failed to spawn python child:', err);
  });

  py.on('exit', (code, signal) => {
    console.log(`Python child exited with code=${code} signal=${signal}`);
  });

  py.on('close', (code, signal) => {
    console.log(`Python child closed with code=${code} signal=${signal}`);
  });

  // Buffer outgoing frames until python writer thread is ready
  let pyReady = false;
  const pendingFrames = [];
  const MAX_PENDING = 200; // avoid unbounded growth
  let sentSynthetic = false; // NOTE(review): never read anywhere — dead flag, candidate for removal

  py.stderr.on('data', (chunk) => {
    const s = chunk.toString();
    console.error('[py stderr]', s);

    // detect the writer thread ready log emitted by the child
    if (!pyReady && s.includes('[CHILDMODE] writer thread started')) {
      pyReady = true;
      console.log('Python child signalled ready; flushing', pendingFrames.length, 'pending frames');
      // flush pending frames
      while (pendingFrames.length > 0) {
        try {
          py.stdin.write(pendingFrames.shift());
        } catch (e) {
          console.error('Error flushing pending frame to python', e);
          break;
        }
      }
    }
  });

  // Startup watchdog: warn if no stdout data within 10s
  let sawStdout = false;
  const startupTimer = setTimeout(() => {
    if (!sawStdout) console.warn('No stdout data received from Python child within 10s — it may have crashed or be stuck.');
  }, 10000);

  // Child -> parent protocol: each message is
  //   [metaLen u32 LE][audioLen u32 LE][metaLen bytes JSON][audioLen bytes s16le PCM]
  // Messages may arrive split across chunks, so accumulate into pyRecvBuf.
  let pyRecvBuf = Buffer.alloc(0);
  py.stdout.on('data', (chunk) => {
    // mark that we have seen stdout so the startup watchdog won't fire
    if (!sawStdout) {
      sawStdout = true;
      clearTimeout(startupTimer);
    }
    pyRecvBuf = Buffer.concat([pyRecvBuf, chunk]);
    while (pyRecvBuf.length >= 8) {
      const header = pyRecvBuf.slice(0, 8);
      let metaLen, audioLen;
      try {
        metaLen = header.readUInt32LE(0);
        audioLen = header.readUInt32LE(4);
      } catch (err) {
        console.error('Failed to read lengths from header', err, 'header=', header);
        // drop one byte and try to resync
        pyRecvBuf = pyRecvBuf.slice(1);
        continue;
      }

      const total = 8 + metaLen + audioLen;

      // sanity-check sizes to avoid trying to allocate huge buffers on corruption
      const MAX_META = 1024 * 16; // 16 KiB
      const MAX_AUDIO = 1024 * 1024 * 20; // 20 MiB
      // NOTE(review): readUInt32LE can never return a negative, so the `< 0`
      // checks are dead; only the upper bounds do any work here.
      if (metaLen < 0 || metaLen > MAX_META || audioLen < 0 || audioLen > MAX_AUDIO) {
        // NEED TO LOOK INTO WHAT CAUSES THIS RANDOMLY
        // discard first byte and try to find the next plausible header
        pyRecvBuf = pyRecvBuf.slice(1);
        continue;
      }

      if (pyRecvBuf.length < total) break;

      const meta = JSON.parse(pyRecvBuf.slice(8, 8 + metaLen).toString('utf8'));
      let audioBuf = pyRecvBuf.slice(8 + metaLen, total);
        try {
        // audio is raw int16 LE PCM at 16000 mono
        // Create an aligned Int16Array view. Node Buffers may have
        // an ArrayBuffer with a non-2-byte-aligned byteOffset which
        // causes a RangeError when constructing Int16Array directly.
        let samples;
        try {
          if (audioBuf.length % 2 !== 0) {
            // odd length: drop trailing byte to keep sample pairs
            console.warn('[PY RX] audioBuf length odd, trimming last byte');
            audioBuf = audioBuf.slice(0, audioBuf.length - 1);
          }
          const ab = audioBuf.buffer.slice(audioBuf.byteOffset, audioBuf.byteOffset + audioBuf.length);
          samples = new Int16Array(ab);
        } catch (innerErr) {
          console.error('[PY RX] aligned Int16Array failed, falling back to copy', innerErr);
          const count = Math.floor(audioBuf.length / 2);
          samples = new Int16Array(count);
          for (let i = 0; i < count; i++) {
            samples[i] = audioBuf.readInt16LE(i * 2);
          }
        }
        const reportedRate = parseInt(meta.sample_rate || 16000, 10);
        const reportedChannels = parseInt(meta.channels || 1, 10);

        const inFrame = new AudioFrame(
          samples,
          reportedRate,
          reportedChannels,
          Math.trunc(samples.length / Math.max(1, reportedChannels))
        );
        // Log incoming audio stats and routing info
        try {
          const view = samples;
          let peak = 0;
          let sumSq = 0;
          for (let i = 0; i < view.length; i++) {
            const v = Math.abs(view[i]);
            if (v > peak) peak = v;
            sumSq += view[i] * view[i];
          }
          const rms = view.length ? Math.sqrt(sumSq / view.length) : 0;
          console.log(`[FROM PY] parsed meta=${JSON.stringify(meta)} bytes=${audioBuf.length} samples=${view.length} ch=${reportedChannels} sr=${reportedRate} peak=${peak} rms=${rms.toFixed(1)} t=${Date.now()}`);
        } catch (err) {
          console.error('Failed computing incoming audio stats', err);
        }

        // Choose target language/source (fallback to 'es')
        // make this more dynamic?
        // NOTE(review): if meta.language is missing, enqueueFrame receives
        // undefined and will try to create a track named "undefined".
        const targetLanguage = (meta.language);// || meta.lang || 'Spanish');
        console.log('Routing incoming audio to', targetLanguage);
        // Route through the resampler/pipeline helper so it matches published track format
        enqueueFrame(targetLanguage, inFrame, room)
      } catch (e) {
        console.error('Error handling py response', e);
      }

      pyRecvBuf = pyRecvBuf.slice(total);
    }
  });

  /**
   * This is a test to see if we can fix track publications:
   * force-subscribe to any "translateAudio" publications that already exist.
   */
  for (const participant of room.remoteParticipants.values()) {
    for (const publication of participant.trackPublications.values()) {
      console.log(publication.name);
      if (publication.name === "translateAudio") {
        await publication.setSubscribed(true);
        console.log("Subscribed to existing track:", publication.name);
      }
    }
  }

  // 2️⃣ Subscribe to tracks published in the future
  room.on(RoomEvent.TrackPublished, async (publication, participant) => {
    if (publication.name === "translateAudio") {
      await publication.setSubscribed(true);
    }
  });

  room.on(RoomEvent.ParticipantConnected, async (participant) => {
    console.log('participant', participant.metadata);
    const { languages } = JSON.parse(participant.metadata);
    // Only republish tracks if this participant introduced a new language.
    const activeLanguagesSize = activeLanguages.size;
    languages.forEach((language) => {
      activeLanguages.add(language);
    });

    if (activeLanguages.size > activeLanguagesSize) {
      await addLanguages(room);
    }

    // Subscribe to existing tracks
    for (const publication of participant.trackPublications.values()) {
      if (publication.name === "translateAudio") {
        publication.setSubscribed(true);
        console.log("Subscribed to existing track of new participant:", publication.name);
      }
    }
  });

  room.on(RoomEvent.TrackSubscribed, async (track, publication, participant) => {
    if (publication.name !== "translateAudio") return;
    console.log('subscribed to track', track.sid, );//publication, participant.identity);

    if (track.kind !== TrackKind.KIND_AUDIO) return;
    const stream = new AudioStream(track);

    // Use framed child-process IPC to send frames to the Python translator
    /**
     * Encodes a LiveKit frame as a framed PCM payload for Python child stdin
     * (same header layout as the receive path). Payloads are buffered in
     * pendingFrames until the child signals readiness.
     *
     * @param {AudioFrame} frame - LiveKit audio frame to send.
     * @returns {void}
     */
    function sendFrameToPy(frame) {
      try {
        // Normalize frame.data to 16-bit PCM Int16Array. LiveKit may expose
        // audio as Int16Array or Float32Array depending on platform/version.
        let int16;
        if (frame.data instanceof Int16Array) {
          int16 = frame.data;
        } else if (frame.data instanceof Float32Array) {
          const f32 = frame.data;
          int16 = new Int16Array(f32.length);
          for (let i = 0; i < f32.length; i++) {
            // Clamp to [-1, 1] then scale to int16 range
            const v = Math.max(-1, Math.min(1, f32[i]));
            int16[i] = (v * 32767) | 0;
          }
        } else {
          // Fallback: try to treat data as a generic array of samples in [-1,1]
          const arr = Array.from(frame.data || []);
          int16 = new Int16Array(arr.length);
          for (let i = 0; i < arr.length; i++) {
            const v = Math.max(-1, Math.min(1, Number(arr[i]) || 0));
            int16[i] = (v * 32767) | 0;
          }
        }

        const sampleCount = int16.length;
        const buf = Buffer.alloc(sampleCount * 2);
        for (let i = 0; i < sampleCount; i++) {
          buf.writeInt16LE(int16[i], i * 2);
        }
        const metaObj = { sample_rate: frame.sampleRate, channels: frame.channels };
        const meta = Buffer.from(JSON.stringify(metaObj));
        const header = Buffer.alloc(8);
        header.writeUInt32LE(meta.length, 0);
        header.writeUInt32LE(buf.length, 4);
        const payload = Buffer.concat([header, meta, buf]);

        // Log outgoing audio stats (samples, channels, peak) so we can
        // compare them with what the Python child reports.
        // NOTE(review): the actual logging is disabled; peak/rms are computed
        // but unused until the log line is restored.
        try {
          const view = int16;
          let peak = 0;
          let sumSq = 0;
          for (let i = 0; i < view.length; i++) {
            const v = Math.abs(view[i]);
            if (v > peak) peak = v;
            sumSq += view[i] * view[i];
          }
          const rms = view.length ? Math.sqrt(sumSq / view.length) : 0;
        } catch (err) {
          console.error('Failed computing outgoing audio stats', err);
        }

        if (pyReady) {
          py.stdin.write(payload);
        } else {
          // buffer until python signals readiness
          if (pendingFrames.length >= MAX_PENDING) {
            // drop oldest to avoid memory growth
            pendingFrames.shift();
          }
          pendingFrames.push(payload);
        }
      } catch (e) {
        console.error('sendFrameToPy error', e);
      }
    }

    // forward incoming frames to python as they arrive (framed to child stdin)
    let frameCount = 0;
    const startTime = Date.now();
    for await (const frame of stream) {
      frameCount++;
      if (frameCount % 10 === 0) {
        // periodic progress marker (log line currently disabled)
        const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
      }

      try {
        sendFrameToPy(frame);
      } catch (err) {
        console.error('Error forwarding frame to python', err);
      }
    }
  });
}

// CLI entry: `node backendBot.js [roomName]` — defaults to 'test-room'.
const roomName = process.argv[2] || 'test-room';

main({ roomName }).catch(console.error);