Speaker Change Detection in Python: Meeting Bot Implementation

You have a WebSocket connection feeding you binary audio frames from a live meeting. Each frame tells you exactly who is speaking and what they said. The question is: how do you turn that stream of frames into a structured, speaker-attributed audio record you can actually use?

Speaker change detection in Python from a meeting bot is a different problem than running pyannote on a recording. You are not trying to separate a mixed-channel audio signal. The meeting platform has already done that separation. Your job is to parse the speaker metadata out of each frame, detect when the active voice changes, maintain per-speaker audio buffers, and fire callbacks your application logic can hook into.

The challenge is in the binary format. Meeting bot APIs do not send JSON with attached audio. They send tightly packed binary frames where speaker identity and raw PCM samples are interleaved with fixed-width length prefixes. Get the parsing wrong by a single byte offset and every downstream system breaks silently, which is much worse than a hard crash.

In this guide we walk through the complete MeetStream binary frame format, build a Python class that handles the full decode-track-buffer-callback cycle, and show how to feed the resulting per-speaker audio to a local or cloud speech-to-text model. Let's get into it.

Understanding the MeetStream Binary Frame Format

The MeetStream live audio stream sends binary WebSocket messages. Each message is a single frame with this exact byte layout:

FieldSizeTypeDescription
msg_type1 byteuint8Always 0x01 for audio frames
sid_length2 bytesuint16 LEByte length of the speaker_id string
speaker_idsid_length bytesUTF-8Stable platform-assigned speaker identifier
sname_length2 bytesuint16 LEByte length of the speaker_name string
speaker_namesname_length bytesUTF-8Participant display name from the meeting
pcm_dataremaining bytesint16 LERaw audio samples, 48kHz mono

Three things matter here. First, the length fields are little-endian unsigned shorts, not big-endian and not signed. Python's struct module format code for this is <H. Second, the speaker_id and speaker_name strings can contain any UTF-8 characters, so you cannot assume ASCII. Third, the PCM data is signed int16 in little-endian byte order at 48kHz mono. If you feed this to a model expecting 16kHz you will get pitch-shifted garbage. Downsample first.

Speaker segmentation in audio
Speaker segmentation breaks audio into per-speaker segments. Source: Symbl.ai.

Frame Parser Implementation

Here is a complete, production-ready frame parser. It validates the message type, handles the variable-length string fields correctly, and returns a clean dictionary you can work with downstream.

import struct
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class AudioFrame:
    speaker_id: str
    speaker_name: str
    pcm_bytes: bytes
    sample_count: int
    sample_rate: int = 48000

    @property
    def duration_ms(self) -> float:
        return (self.sample_count / self.sample_rate) * 1000

class FrameParser:
    """
    Parses MeetStream binary audio frames.
    Frame format:
        [1B msg_type=0x01]
        [2B sid_length LE]
        [sid_length B speaker_id UTF-8]
        [2B sname_length LE]
        [sname_length B speaker_name UTF-8]
        [remaining B PCM int16 LE 48kHz mono]
    """
    AUDIO_MSG_TYPE = 0x01
    BYTES_PER_SAMPLE = 2  # int16

    def parse(self, raw: bytes) -> Optional[AudioFrame]:
        if len(raw) < 1:
            return None

        offset = 0

        # Message type
        msg_type = raw[offset]
        offset += 1
        if msg_type != self.AUDIO_MSG_TYPE:
            return None

        # speaker_id
        if offset + 2 > len(raw):
            raise ValueError(f"Frame too short for sid_length at offset {offset}")
        sid_length = struct.unpack_from(' len(raw):
            raise ValueError(f"Frame too short for speaker_id of length {sid_length}")
        speaker_id = raw[offset:offset + sid_length].decode('utf-8')
        offset += sid_length

        # speaker_name
        if offset + 2 > len(raw):
            raise ValueError(f"Frame too short for sname_length at offset {offset}")
        sname_length = struct.unpack_from(' len(raw):
            raise ValueError(f"Frame too short for speaker_name of length {sname_length}")
        speaker_name = raw[offset:offset + sname_length].decode('utf-8')
        offset += sname_length

        # PCM audio data
        pcm_bytes = raw[offset:]
        sample_count = len(pcm_bytes) // self.BYTES_PER_SAMPLE

        return AudioFrame(
            speaker_id=speaker_id,
            speaker_name=speaker_name,
            pcm_bytes=pcm_bytes,
            sample_count=sample_count
        )

Speaker Change Tracker

With the parser handling the binary decode, the next layer tracks speaker turns. A turn starts when a new speaker_id appears and ends when a different speaker_id appears. The tracker maintains a timestamp based on cumulative sample count (which is more reliable than wall clock time for audio alignment) and fires a callback with the completed turn data.

from collections import defaultdict
from typing import Callable, Dict, List
import io

@dataclass
class SpeakerTurn:
 speaker_id: str
 speaker_name: str
 start_sample: int
 end_sample: int
 sample_rate: int
 pcm_bytes: bytes

 @property
 def start_seconds(self) -> float:
 return self.start_sample / self.sample_rate

 @property
 def end_seconds(self) -> float:
 return self.end_sample / self.sample_rate

 @property
 def duration_seconds(self) -> float:
 return self.end_seconds - self.start_seconds

class SpeakerChangeTracker:
 """
 Tracks speaker changes in a stream of AudioFrames.
 Fires on_turn_complete callback when a speaker turn ends.
 Fires on_speaker_change callback on every speaker transition.
 """

 def __init__(
 self,
 on_turn_complete: Callable[[SpeakerTurn], None],
 on_speaker_change: Optional[Callable[[str, str, float], None]] = None,
 min_turn_samples: int = 2400 # 50ms at 48kHz, suppress noise
 ):
 self.on_turn_complete = on_turn_complete
 self.on_speaker_change = on_speaker_change
 self.min_turn_samples = min_turn_samples

 self._current_speaker_id: Optional[str] = None
 self._current_speaker_name: Optional[str] = None
 self._turn_start_sample: int = 0
 self._turn_audio_buffer: List[bytes] = []
 self._total_samples: int = 0
 self._sample_rate: int = 48000

 def process(self, frame: AudioFrame) -> None:
 if self._current_speaker_id is None:
 # First frame, initialize
 self._current_speaker_id = frame.speaker_id
 self._current_speaker_name = frame.speaker_name
 self._turn_start_sample = 0

 if frame.speaker_id != self._current_speaker_id:
 # Speaker changed, flush current turn
 self._flush_turn()

 if self.on_speaker_change:
 timestamp = self._total_samples / self._sample_rate
 self.on_speaker_change(
 self._current_speaker_name,
 frame.speaker_name,
 timestamp
 )

 self._current_speaker_id = frame.speaker_id
 self._current_speaker_name = frame.speaker_name
 self._turn_start_sample = self._total_samples
 self._turn_audio_buffer = []

 self._turn_audio_buffer.append(frame.pcm_bytes)
 self._total_samples += frame.sample_count

 def flush(self) -> None:
 """Call at end of meeting to emit the final turn."""
 if self._turn_audio_buffer:
 self._flush_turn()

 def _flush_turn(self) -> None:
 turn_samples = self._total_samples - self._turn_start_sample
 if turn_samples < self.min_turn_samples:
 return # skip sub-50ms noise bursts

 turn = SpeakerTurn(
 speaker_id=self._current_speaker_id,
 speaker_name=self._current_speaker_name,
 start_sample=self._turn_start_sample,
 end_sample=self._total_samples,
 sample_rate=self._sample_rate,
 pcm_bytes=b''.join(self._turn_audio_buffer)
 )
 self._turn_audio_buffer = []
 self.on_turn_complete(turn)

Building a Complete WebSocket Handler

Now wire the parser and tracker together with a WebSocket listener. The handler receives raw binary frames, parses them, and feeds them to the tracker. Speaker turns are emitted as they complete.

import asyncio
import websockets

async def run_live_diarizer(websocket_url: str):
    completed_turns: List[SpeakerTurn] = []

    def handle_turn(turn: SpeakerTurn):
        completed_turns.append(turn)
        print(
            f"Turn complete: {turn.speaker_name} "
            f"{turn.start_seconds:.2f}s-{turn.end_seconds:.2f}s "
            f"({len(turn.pcm_bytes)} bytes)"
        )

    def handle_speaker_change(prev: str, next_: str, timestamp: float):
        print(f"[{timestamp:.2f}s] Speaker change: {prev} -> {next_}")

    parser = FrameParser()
    tracker = SpeakerChangeTracker(
        on_turn_complete=handle_turn,
        on_speaker_change=handle_speaker_change
    )

    try:
        async with websockets.connect(websocket_url) as ws:
            async for message in ws:
                if isinstance(message, bytes):
                    frame = parser.parse(message)
                    if frame:
                        tracker.process(frame)
    except websockets.ConnectionClosed:
        pass
    finally:
        tracker.flush()  # emit final turn

    return completed_turns

Creating the Bot with live_audio_required

To receive audio frames, you need to pass a WebSocket URL when creating your bot through the MeetStream API. The bot will connect to your WebSocket server and start streaming frames once it joins the meeting.

Speaker diarization separating multiple speakers
Speaker diarization separating multiple speakers from a single audio stream. Source: Medium/DDI.
import requests

response = requests.post(
    "https://api.meetstream.ai/api/v1/bots/create_bot",
    json={
        "meeting_link": "https://meet.google.com/abc-defg-hij",
        "bot_name": "Transcriber",
        "live_audio_required": {
            "websocket_url": "wss://your-server.example.com/audio-stream"
        }
    },
    headers={"Authorization": "Token YOUR_API_KEY"}
)
print(response.json())

Your server needs to be a WebSocket server that accepts the connection from MeetStream's infrastructure. Use websockets.serve in Python or any other WebSocket server library. The URL must be publicly reachable.

Downsampling for STT Models

Most speech-to-text models expect 16kHz audio, not 48kHz. Before passing a completed SpeakerTurn's PCM data to a model like Whisper or Vosk, downsample from 48kHz to 16kHz.

import numpy as np
from scipy.signal import resample_poly

def pcm_bytes_to_float32(pcm_bytes: bytes, sample_rate: int = 48000) -> np.ndarray:
    """Convert int16 PCM bytes to float32 numpy array."""
    samples = np.frombuffer(pcm_bytes, dtype=np.int16)
    return samples.astype(np.float32) / 32768.0

def downsample_48k_to_16k(audio_float32: np.ndarray) -> np.ndarray:
    """Resample from 48kHz to 16kHz (3:1 ratio)."""
    return resample_poly(audio_float32, up=1, down=3)

def prepare_turn_for_stt(turn: SpeakerTurn) -> np.ndarray:
    audio = pcm_bytes_to_float32(turn.pcm_bytes)
    if turn.sample_rate == 48000:
        audio = downsample_48k_to_16k(audio)
    return audio  # float32 at 16kHz, ready for Whisper / Vosk

Feeding Speaker-Separated Audio to a Local STT Model

With per-speaker audio prepared at 16kHz, you can pass each turn to any speech-to-text model independently. Running STT per-speaker rather than on a mixed stream improves accuracy because each utterance has only one voice, and the model does not waste capacity disentangling overlapping speech.

import whisper

model = whisper.load_model("base")

def transcribe_turn(turn: SpeakerTurn) -> dict:
    audio = prepare_turn_for_stt(turn)
    result = model.transcribe(audio, fp16=False)
    return {
        "speaker": turn.speaker_name,
        "start": round(turn.start_seconds, 2),
        "end": round(turn.end_seconds, 2),
        "text": result["text"].strip()
    }

def build_transcript(turns: List[SpeakerTurn]) -> str:
    lines = []
    for turn in turns:
        entry = transcribe_turn(turn)
        lines.append(
            f"{entry['speaker']} [{entry['start']}s]: {entry['text']}"
        )
    return "\
".join(lines)

Handling Edge Cases

Three edge cases come up in real meetings. First, rapid turn switching: participants often interrupt or speak in short bursts under 100ms. The min_turn_samples threshold on the tracker filters these out, but you may need to tune it for your use case. 2400 samples (50ms at 48kHz) is a reasonable starting floor.

NVIDIA speaker diarization architecture
Multiscale speaker diarization architecture. Source: NVIDIA Technical Blog.

Second, simultaneous speech: when two participants speak at the same time, the MeetStream API continues sending separate frames for each speaker interleaved. Your tracker will see rapid alternation between two speaker IDs. One way to handle this is to keep a sliding window of recent speaker IDs and detect when two different IDs appear within a short interval, then label that segment as overlapping speech.

Third, participant reconnects: if a participant drops and rejoins, they may receive a new speaker_id even though it is the same person. Track speaker_name as your primary identifier for display purposes, but be aware that the underlying IDs may differ. The MeetStream API assigns speaker_id based on the platform's participant slot, which can reset on reconnect.

FAQ

Why use struct for speaker diarization python instead of a higher-level format?

Binary frame formats are used by low-latency audio APIs because they are compact and fast to serialize. JSON with base64-encoded audio would add 33 percent size overhead plus serialization cost on every frame. The struct module is the correct Python tool for this: it gives you direct byte-level control with no dependencies and predictable performance.

What does detect speaker changes mean in the context of meeting bots?

In the context of a meeting bot, speaker change detection means identifying the moment the active speaker transitions from one participant to another. With MeetStream's per-speaker audio frames, this is a simple equality check on the speaker_id field across consecutive frames. The hard part is deciding how to handle very short turns and overlap, which the min_turn_samples threshold addresses.

How do I handle audio speaker identification for participants who join without display names?

Some participants join with blank display names or names like "iPhone" or "Unknown." In those cases, fall back to speaker_id as the identifier. You can also maintain a name-resolution layer that maps speaker_id values to known identities using calendar invite attendee lists or CRM data from your MeetStream dashboard.

Can I run this speaker change detection python code in a serverless environment?

You can, but WebSocket connections are long-lived and serverless functions typically have a 15-second to 15-minute execution timeout. A better architecture for production is a dedicated lightweight worker process (a Docker container on a small VM or a managed container service) that maintains the WebSocket connection for the full meeting duration. The completed turns can then be offloaded to a queue for downstream processing in serverless functions.

How does the MeetStream speaker_id compare to speaker labels from post-call diarization?

The MeetStream speaker_id comes from the meeting platform and is stable for a participant's session. Post-call diarization labels from Deepgram or AssemblyAI are cluster indices that have no relation to platform identity. For named attribution, the MeetStream approach is significantly better because you get real names from the display name field rather than anonymous Speaker_0 labels. See speaker diarization techniques for a deeper comparison.