Speaker Change Detection in Python: Meeting Bot Implementation
You have a WebSocket connection feeding you binary audio frames from a live meeting. Each frame tells you exactly who is speaking and what they said. The question is: how do you turn that stream of frames into a structured, speaker-attributed audio record you can actually use?
Speaker change detection in Python from a meeting bot is a different problem than running pyannote on a recording. You are not trying to separate a mixed-channel audio signal. The meeting platform has already done that separation. Your job is to parse the speaker metadata out of each frame, detect when the active voice changes, maintain per-speaker audio buffers, and fire callbacks your application logic can hook into.
The challenge is in the binary format. Meeting bot APIs do not send JSON with attached audio. They send tightly packed binary frames where speaker identity and raw PCM samples are interleaved with fixed-width length prefixes. Get the parsing wrong by a single byte offset and every downstream system breaks silently, which is much worse than a hard crash.
In this guide we walk through the complete MeetStream binary frame format, build a Python class that handles the full decode-track-buffer-callback cycle, and show how to feed the resulting per-speaker audio to a local or cloud speech-to-text model. Let's get into it.
Understanding the MeetStream Binary Frame Format
The MeetStream live audio stream sends binary WebSocket messages. Each message is a single frame with this exact byte layout:
| Field | Size | Type | Description |
|---|---|---|---|
| msg_type | 1 byte | uint8 | Always 0x01 for audio frames |
| sid_length | 2 bytes | uint16 LE | Byte length of the speaker_id string |
| speaker_id | sid_length bytes | UTF-8 | Stable platform-assigned speaker identifier |
| sname_length | 2 bytes | uint16 LE | Byte length of the speaker_name string |
| speaker_name | sname_length bytes | UTF-8 | Participant display name from the meeting |
| pcm_data | remaining bytes | int16 LE | Raw audio samples, 48kHz mono |
Three things matter here. First, the length fields are little-endian unsigned shorts, not big-endian and not signed. Python's struct module format code for this is <H. Second, the speaker_id and speaker_name strings can contain any UTF-8 characters, so you cannot assume ASCII. Third, the PCM data is signed int16 in little-endian byte order at 48kHz mono. If you feed this to a model expecting 16kHz you will get pitch-shifted garbage. Downsample first.

Frame Parser Implementation
Here is a complete, production-ready frame parser. It validates the message type, handles the variable-length string fields correctly, and returns a clean dictionary you can work with downstream.
import struct
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class AudioFrame:
speaker_id: str
speaker_name: str
pcm_bytes: bytes
sample_count: int
sample_rate: int = 48000
@property
def duration_ms(self) -> float:
return (self.sample_count / self.sample_rate) * 1000
class FrameParser:
"""
Parses MeetStream binary audio frames.
Frame format:
[1B msg_type=0x01]
[2B sid_length LE]
[sid_length B speaker_id UTF-8]
[2B sname_length LE]
[sname_length B speaker_name UTF-8]
[remaining B PCM int16 LE 48kHz mono]
"""
AUDIO_MSG_TYPE = 0x01
BYTES_PER_SAMPLE = 2 # int16
def parse(self, raw: bytes) -> Optional[AudioFrame]:
if len(raw) < 1:
return None
offset = 0
# Message type
msg_type = raw[offset]
offset += 1
if msg_type != self.AUDIO_MSG_TYPE:
return None
# speaker_id
if offset + 2 > len(raw):
raise ValueError(f"Frame too short for sid_length at offset {offset}")
sid_length = struct.unpack_from(' len(raw):
raise ValueError(f"Frame too short for speaker_id of length {sid_length}")
speaker_id = raw[offset:offset + sid_length].decode('utf-8')
offset += sid_length
# speaker_name
if offset + 2 > len(raw):
raise ValueError(f"Frame too short for sname_length at offset {offset}")
sname_length = struct.unpack_from(' len(raw):
raise ValueError(f"Frame too short for speaker_name of length {sname_length}")
speaker_name = raw[offset:offset + sname_length].decode('utf-8')
offset += sname_length
# PCM audio data
pcm_bytes = raw[offset:]
sample_count = len(pcm_bytes) // self.BYTES_PER_SAMPLE
return AudioFrame(
speaker_id=speaker_id,
speaker_name=speaker_name,
pcm_bytes=pcm_bytes,
sample_count=sample_count
)
Speaker Change Tracker
With the parser handling the binary decode, the next layer tracks speaker turns. A turn starts when a new speaker_id appears and ends when a different speaker_id appears. The tracker maintains a timestamp based on cumulative sample count (which is more reliable than wall clock time for audio alignment) and fires a callback with the completed turn data.
from collections import defaultdict
from typing import Callable, Dict, List
import io
@dataclass
class SpeakerTurn:
speaker_id: str
speaker_name: str
start_sample: int
end_sample: int
sample_rate: int
pcm_bytes: bytes
@property
def start_seconds(self) -> float:
return self.start_sample / self.sample_rate
@property
def end_seconds(self) -> float:
return self.end_sample / self.sample_rate
@property
def duration_seconds(self) -> float:
return self.end_seconds - self.start_seconds
class SpeakerChangeTracker:
"""
Tracks speaker changes in a stream of AudioFrames.
Fires on_turn_complete callback when a speaker turn ends.
Fires on_speaker_change callback on every speaker transition.
"""
def __init__(
self,
on_turn_complete: Callable[[SpeakerTurn], None],
on_speaker_change: Optional[Callable[[str, str, float], None]] = None,
min_turn_samples: int = 2400 # 50ms at 48kHz, suppress noise
):
self.on_turn_complete = on_turn_complete
self.on_speaker_change = on_speaker_change
self.min_turn_samples = min_turn_samples
self._current_speaker_id: Optional[str] = None
self._current_speaker_name: Optional[str] = None
self._turn_start_sample: int = 0
self._turn_audio_buffer: List[bytes] = []
self._total_samples: int = 0
self._sample_rate: int = 48000
def process(self, frame: AudioFrame) -> None:
if self._current_speaker_id is None:
# First frame, initialize
self._current_speaker_id = frame.speaker_id
self._current_speaker_name = frame.speaker_name
self._turn_start_sample = 0
if frame.speaker_id != self._current_speaker_id:
# Speaker changed, flush current turn
self._flush_turn()
if self.on_speaker_change:
timestamp = self._total_samples / self._sample_rate
self.on_speaker_change(
self._current_speaker_name,
frame.speaker_name,
timestamp
)
self._current_speaker_id = frame.speaker_id
self._current_speaker_name = frame.speaker_name
self._turn_start_sample = self._total_samples
self._turn_audio_buffer = []
self._turn_audio_buffer.append(frame.pcm_bytes)
self._total_samples += frame.sample_count
def flush(self) -> None:
"""Call at end of meeting to emit the final turn."""
if self._turn_audio_buffer:
self._flush_turn()
def _flush_turn(self) -> None:
turn_samples = self._total_samples - self._turn_start_sample
if turn_samples < self.min_turn_samples:
return # skip sub-50ms noise bursts
turn = SpeakerTurn(
speaker_id=self._current_speaker_id,
speaker_name=self._current_speaker_name,
start_sample=self._turn_start_sample,
end_sample=self._total_samples,
sample_rate=self._sample_rate,
pcm_bytes=b''.join(self._turn_audio_buffer)
)
self._turn_audio_buffer = []
self.on_turn_complete(turn)
Building a Complete WebSocket Handler
Now wire the parser and tracker together with a WebSocket listener. The handler receives raw binary frames, parses them, and feeds them to the tracker. Speaker turns are emitted as they complete.
import asyncio
import websockets
async def run_live_diarizer(websocket_url: str):
completed_turns: List[SpeakerTurn] = []
def handle_turn(turn: SpeakerTurn):
completed_turns.append(turn)
print(
f"Turn complete: {turn.speaker_name} "
f"{turn.start_seconds:.2f}s-{turn.end_seconds:.2f}s "
f"({len(turn.pcm_bytes)} bytes)"
)
def handle_speaker_change(prev: str, next_: str, timestamp: float):
print(f"[{timestamp:.2f}s] Speaker change: {prev} -> {next_}")
parser = FrameParser()
tracker = SpeakerChangeTracker(
on_turn_complete=handle_turn,
on_speaker_change=handle_speaker_change
)
try:
async with websockets.connect(websocket_url) as ws:
async for message in ws:
if isinstance(message, bytes):
frame = parser.parse(message)
if frame:
tracker.process(frame)
except websockets.ConnectionClosed:
pass
finally:
tracker.flush() # emit final turn
return completed_turns
Creating the Bot with live_audio_required
To receive audio frames, you need to pass a WebSocket URL when creating your bot through the MeetStream API. The bot will connect to your WebSocket server and start streaming frames once it joins the meeting.

import requests
response = requests.post(
"https://api.meetstream.ai/api/v1/bots/create_bot",
json={
"meeting_link": "https://meet.google.com/abc-defg-hij",
"bot_name": "Transcriber",
"live_audio_required": {
"websocket_url": "wss://your-server.example.com/audio-stream"
}
},
headers={"Authorization": "Token YOUR_API_KEY"}
)
print(response.json())
Your server needs to be a WebSocket server that accepts the connection from MeetStream's infrastructure. Use websockets.serve in Python or any other WebSocket server library. The URL must be publicly reachable.
Downsampling for STT Models
Most speech-to-text models expect 16kHz audio, not 48kHz. Before passing a completed SpeakerTurn's PCM data to a model like Whisper or Vosk, downsample from 48kHz to 16kHz.
import numpy as np
from scipy.signal import resample_poly
def pcm_bytes_to_float32(pcm_bytes: bytes, sample_rate: int = 48000) -> np.ndarray:
"""Convert int16 PCM bytes to float32 numpy array."""
samples = np.frombuffer(pcm_bytes, dtype=np.int16)
return samples.astype(np.float32) / 32768.0
def downsample_48k_to_16k(audio_float32: np.ndarray) -> np.ndarray:
"""Resample from 48kHz to 16kHz (3:1 ratio)."""
return resample_poly(audio_float32, up=1, down=3)
def prepare_turn_for_stt(turn: SpeakerTurn) -> np.ndarray:
audio = pcm_bytes_to_float32(turn.pcm_bytes)
if turn.sample_rate == 48000:
audio = downsample_48k_to_16k(audio)
return audio # float32 at 16kHz, ready for Whisper / Vosk
Feeding Speaker-Separated Audio to a Local STT Model
With per-speaker audio prepared at 16kHz, you can pass each turn to any speech-to-text model independently. Running STT per-speaker rather than on a mixed stream improves accuracy because each utterance has only one voice, and the model does not waste capacity disentangling overlapping speech.
import whisper
model = whisper.load_model("base")
def transcribe_turn(turn: SpeakerTurn) -> dict:
audio = prepare_turn_for_stt(turn)
result = model.transcribe(audio, fp16=False)
return {
"speaker": turn.speaker_name,
"start": round(turn.start_seconds, 2),
"end": round(turn.end_seconds, 2),
"text": result["text"].strip()
}
def build_transcript(turns: List[SpeakerTurn]) -> str:
lines = []
for turn in turns:
entry = transcribe_turn(turn)
lines.append(
f"{entry['speaker']} [{entry['start']}s]: {entry['text']}"
)
return "\
".join(lines)
Handling Edge Cases
Three edge cases come up in real meetings. First, rapid turn switching: participants often interrupt or speak in short bursts under 100ms. The min_turn_samples threshold on the tracker filters these out, but you may need to tune it for your use case. 2400 samples (50ms at 48kHz) is a reasonable starting floor.

Second, simultaneous speech: when two participants speak at the same time, the MeetStream API continues sending separate frames for each speaker interleaved. Your tracker will see rapid alternation between two speaker IDs. One way to handle this is to keep a sliding window of recent speaker IDs and detect when two different IDs appear within a short interval, then label that segment as overlapping speech.
Third, participant reconnects: if a participant drops and rejoins, they may receive a new speaker_id even though it is the same person. Track speaker_name as your primary identifier for display purposes, but be aware that the underlying IDs may differ. The MeetStream API assigns speaker_id based on the platform's participant slot, which can reset on reconnect.
FAQ
Why use struct for speaker diarization python instead of a higher-level format?
Binary frame formats are used by low-latency audio APIs because they are compact and fast to serialize. JSON with base64-encoded audio would add 33 percent size overhead plus serialization cost on every frame. The struct module is the correct Python tool for this: it gives you direct byte-level control with no dependencies and predictable performance.
What does detect speaker changes mean in the context of meeting bots?
In the context of a meeting bot, speaker change detection means identifying the moment the active speaker transitions from one participant to another. With MeetStream's per-speaker audio frames, this is a simple equality check on the speaker_id field across consecutive frames. The hard part is deciding how to handle very short turns and overlap, which the min_turn_samples threshold addresses.
How do I handle audio speaker identification for participants who join without display names?
Some participants join with blank display names or names like "iPhone" or "Unknown." In those cases, fall back to speaker_id as the identifier. You can also maintain a name-resolution layer that maps speaker_id values to known identities using calendar invite attendee lists or CRM data from your MeetStream dashboard.
Can I run this speaker change detection python code in a serverless environment?
You can, but WebSocket connections are long-lived and serverless functions typically have a 15-second to 15-minute execution timeout. A better architecture for production is a dedicated lightweight worker process (a Docker container on a small VM or a managed container service) that maintains the WebSocket connection for the full meeting duration. The completed turns can then be offloaded to a queue for downstream processing in serverless functions.
How does the MeetStream speaker_id compare to speaker labels from post-call diarization?
The MeetStream speaker_id comes from the meeting platform and is stable for a participant's session. Post-call diarization labels from Deepgram or AssemblyAI are cluster indices that have no relation to platform identity. For named attribution, the MeetStream approach is significantly better because you get real names from the display name field rather than anonymous Speaker_0 labels. See speaker diarization techniques for a deeper comparison.
