Skip to main content
Version: 2.0.0

Example

note

Websockets API is only available for Business plan users. If you're running into trouble, upgrade to a Business plan or higher on the billing page.

Below is a simple Python example of how to use Websockets to receive audio data from the API.

import asyncio
import base64
import time

import websockets
import json
import pyaudio

import os
from dotenv import load_dotenv

load_dotenv() # Load environment variables from a .env file

API_KEY = os.getenv("RESEMBLE_API_KEY")
STREAMING_URL = "wss://websocket.cluster.resemble.ai/stream"

p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16,
channels=1,
rate=48000,
output=True)

async def connect_websocket():
return await websockets.connect(STREAMING_URL, extra_headers={"Authorization": f"Bearer {API_KEY}"}, ping_interval=5, ping_timeout=10)

async def listen():
while True:
websocket = None
try:
websocket = await connect_websocket()
while True:
user_input = input("Text: ")

if user_input is None or user_input == "":
await websocket.ping()
continue

request = {
"voice_uuid": "<voice_uuid>",
"project_uuid": "<project_uuid>",
"data": user_input,
"precision": "PCM_16",
"no_audio_header": True,
"sample_rate": 48000,
}
json_data = json.dumps(request)
await websocket.send(json_data) # Send tts request

first_byte = None
start_time = time.time()

while True:
message = await websocket.recv()
try:
# First try and load as json
data = json.loads(message)

if data['type'] == 'audio':
audio = base64.b64decode(data['audio_content'])
if first_byte is None:
first_byte = True
end_time = time.time()
print(f"TTFS: {end_time - start_time}s")
stream.write(audio)

if data['type'] == 'audio_end':
break

except json.JSONDecodeError:
print("Expected json but did not receive json.")

except websockets.exceptions.ConnectionClosedError:
print("Connection closed. Reconnecting...")
except Exception as e:
print(f"An error occurred: {e}")
finally:
if websocket:
await websocket.close()
await asyncio.sleep(1) # Wait a bit before reconnecting

asyncio.get_event_loop().run_until_complete(listen())