Skip to content

Commit

Permalink
Update WavePlayer to improved version from PicoFX
Browse files Browse the repository at this point in the history
  • Loading branch information
ZodiusInfuser committed Sep 27, 2024
1 parent ddab9d7 commit 6dc5d97
Showing 1 changed file with 144 additions and 77 deletions.
221 changes: 144 additions & 77 deletions lib/pimoroni_yukon/devices/audio.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# SPDX-FileCopyrightText: 2023 Christopher Parrott for Pimoroni Ltd
# SPDX-FileCopyrightText: 2024 Christopher Parrott for Pimoroni Ltd
#
# SPDX-License-Identifier: MIT

import os
import math
import struct
from machine import I2S
from machine import I2S, Pin

"""
A class for playing Wav files out of an I2S audio amp. It can also play pure tones.
Expand All @@ -14,6 +13,70 @@
"""


class WavReader:
def __init__(self, file):
self.wav_file = open(file, "rb")
self._parse(self.wav_file)

def _parse(self, wav_file):
chunk_ID = wav_file.read(4)
if chunk_ID != b"RIFF":
raise ValueError("WAV chunk ID invalid")
_ = wav_file.read(4) # chunk_size
if wav_file.read(4) != b"WAVE":
raise ValueError("WAV format invalid")
sub_chunk1_ID = wav_file.read(4)
if sub_chunk1_ID != b"fmt ":
raise ValueError("WAV sub chunk 1 ID invalid")
_ = wav_file.read(4) # sub_chunk1_size
_ = struct.unpack("<H", wav_file.read(2))[0] # audio_format
num_channels = struct.unpack("<H", wav_file.read(2))[0]

if num_channels == 1:
self.format = I2S.MONO
else:
self.format = I2S.STEREO

self.sample_rate = struct.unpack("<I", wav_file.read(4))[0]
# if sample_rate != 44_100 and sample_rate != 48_000:
# raise ValueError(f"WAV sample rate of {sample_rate} invalid. Only 44.1KHz or 48KHz audio are supported")

_ = struct.unpack("<I", wav_file.read(4))[0] # byte_rate
_ = struct.unpack("<H", wav_file.read(2))[0] # block_align
self.bits_per_sample = struct.unpack("<H", wav_file.read(2))[0]

# usually the sub chunk2 ID ("data") comes next, but
# some online MP3->WAV converters add
# binary data before "data". So, read a fairly large
# block of bytes and search for "data".

binary_block = wav_file.read(200)
offset = binary_block.find(b"data")
if offset == -1:
raise ValueError("WAV sub chunk 2 ID not found")

self.offset = offset + 44

wav_file.seek(offset + 40)
self.size = struct.unpack("<I", wav_file.read(4))[0]

wav_file.seek(self.offset)

def seek(self, pos):
return self.wav_file.seek(pos + self.offset)

def tell(self):
return self.wav_file.tell() - self.offset

def readinto(self, buf):
max_bytes = self.size - self.tell()
max_bytes = max(0, min(len(buf), max_bytes))
return self.wav_file.readinto(buf[:max_bytes])

def close(self):
self.wav_file.close()


class WavPlayer:
# Internal states
PLAY = 0
Expand All @@ -25,21 +88,32 @@ class WavPlayer:
MODE_WAV = 0
MODE_TONE = 1

TONE_SINE = 0
TONE_SQUARE = 1
TONE_TRIANGLE = 2

# Default buffer length
SILENCE_BUFFER_LENGTH = 1000
WAV_BUFFER_LENGTH = 10000
INTERNAL_BUFFER_LENGTH = 20000
SILENCE_BUFFER_LENGTH = 1024
WAV_BUFFER_LENGTH = 1024
INTERNAL_BUFFER_LENGTH = WAV_BUFFER_LENGTH * 2

TONE_SAMPLE_RATE = 44_100
TONE_BITS_PER_SAMPLE = 16
TONE_FULL_WAVES = 2

def __init__(self, id, sck_pin, ws_pin, sd_pin, ibuf_len=INTERNAL_BUFFER_LENGTH, root="/"):
def __init__(self, id, sck_pin, ws_pin, sd_pin, amp_enable=None, ibuf_len=INTERNAL_BUFFER_LENGTH, root="/"):
self.__id = id
self.__sck_pin = sck_pin
self.__ws_pin = ws_pin
self.__sd_pin = sd_pin
self.__ibuf_len = ibuf_len
self.__enable = None

# Manually tweak the tone amplitude for equal loudness of sine/square/triangle
self.__amplitude_scale = [1.0, 0.2, 0.5]

if amp_enable is not None:
self.__enable = Pin(amp_enable, Pin.OUT)

# Set the directory to search for files in
self.set_root(root)
Expand All @@ -48,7 +122,6 @@ def __init__(self, id, sck_pin, ws_pin, sd_pin, ibuf_len=INTERNAL_BUFFER_LENGTH,
self.__mode = WavPlayer.MODE_WAV
self.__wav_file = None
self.__loop_wav = False
self.__first_sample_offset = None
self.__flush_count = 0
self.__audio_out = None

Expand All @@ -66,44 +139,60 @@ def set_root(self, root):
self.__root = root.rstrip("/") + "/"

def play_wav(self, wav_file, loop=False):
if os.listdir(self.__root).count(wav_file) == 0:
raise ValueError(f"'{wav_file}' not found")

self.__stop_i2s() # Stop any active playback and terminate the I2S instance

self.__wav_file = open(self.__root + wav_file, "rb") # Open the chosen WAV file in read-only, binary mode
try:
self.__wav_file = open(self.__root + wav_file, "rb") # Open the chosen WAV file in read-only, binary mode
except OSError:
raise ValueError(f"'{wav_file}' not found")
self.__loop_wav = loop # Record if the user wants the file to loop
self.___loop_count = 0 # Count loops for debugging purposes

# Parse the WAV file, returning the necessary parameters to initialise I2S communication
format, sample_rate, bits_per_sample, self.__first_sample_offset = WavPlayer.__parse_wav(self.__wav_file)
self.__wav_file = WavReader(self.__root + wav_file)

self.__wav_file.seek(self.__first_sample_offset) # Advance to first byte of sample data

self.__start_i2s(bits=bits_per_sample,
format=format,
rate=sample_rate,
self.__start_i2s(bits=self.__wav_file.bits_per_sample,
format=self.__wav_file.format,
rate=self.__wav_file.sample_rate,
state=WavPlayer.PLAY,
mode=WavPlayer.MODE_WAV)

def play_tone(self, frequency, amplitude):
def play_tone(self, frequency, amplitude, shape=TONE_SINE):
if frequency < 20.0 or frequency > 20_000:
raise ValueError("frequency out of range. Expected between 20Hz and 20KHz")

if amplitude < 0.0 or amplitude > 1.0:
raise ValueError("amplitude out of range. Expected 0.0 to 1.0")

if not isinstance(shape, (list, tuple)):
shape = (shape, )

# Create a buffer containing the pure tone samples
samples_per_cycle = self.TONE_SAMPLE_RATE // frequency
sample_size_in_bytes = self.TONE_BITS_PER_SAMPLE // 8
samples = bytearray(self.TONE_FULL_WAVES * samples_per_cycle * sample_size_in_bytes)
range = pow(2, self.TONE_BITS_PER_SAMPLE) // 2
maximum = (pow(2, self.TONE_BITS_PER_SAMPLE) // 2 - 1) * amplitude

format = "<h" if self.TONE_BITS_PER_SAMPLE == 16 else "<l"

# Populate the buffer with multiple cycles to avoid it completing too quickly and causing drop outs
for i in range(samples_per_cycle * self.TONE_FULL_WAVES):
sample = int((range - 1) * (math.sin(2 * math.pi * i / samples_per_cycle)) * amplitude)
struct.pack_into(format, samples, i * sample_size_in_bytes, sample)
sample = 0
if self.TONE_TRIANGLE in shape:
triangle = (i % samples_per_cycle) - (samples_per_cycle // 2)
triangle /= samples_per_cycle
triangle *= self.__amplitude_scale[self.TONE_TRIANGLE]
sample += triangle
if self.TONE_SINE in shape:
sine = math.sin(2 * math.pi * i / samples_per_cycle)
sine *= self.__amplitude_scale[self.TONE_SINE]
sample += sine
if self.TONE_SQUARE in shape:
square = 1 if (i % samples_per_cycle) < (samples_per_cycle // 2) else -1
square *= self.__amplitude_scale[self.TONE_SQUARE]
sample += square
sample = max(-1, min(1, sample))
struct.pack_into(format, samples, i * sample_size_in_bytes, int(sample * maximum))

# Are we not already playing tones?
if not (self.__mode == WavPlayer.MODE_TONE and (self.__state == WavPlayer.PLAY or self.__state == WavPlayer.PAUSE)):
Expand Down Expand Up @@ -137,6 +226,9 @@ def stop(self):
else:
self.__state = WavPlayer.STOP

def deinit(self):
self.__stop_i2s()

def is_playing(self):
return self.__state != WavPlayer.NONE and self.__state != WavPlayer.STOP

Expand Down Expand Up @@ -164,45 +256,61 @@ def __start_i2s(self, bits=16, format=I2S.MONO, rate=44_100, state=STOP, mode=MO
self.__audio_out.irq(self.__i2s_callback)
self.__audio_out.write(self.__silence_samples)

if self.__enable is not None:
self.__enable.on()

def __stop_i2s(self):
self.stop() # Stop any active playback
while self.is_playing(): # and wait for it to complete
pass

if self.__enable is not None:
self.__enable.off()

if self.__audio_out is not None:
self.__audio_out.deinit() # Deinit any active I2S comms

self.__state == WavPlayer.NONE # Return to the none state

def __i2s_callback(self, arg):
### PLAY ###
# PLAY
if self.__state == WavPlayer.PLAY:
if self.__mode == WavPlayer.MODE_WAV:
num_read = self.__wav_file.readinto(self.__wav_samples_mv) # Read the next section of the WAV file

# Have we reached the end of the file?
if num_read == 0:
# Do we want to loop the WAV playback?
if self.__loop_wav:
_ = self.__wav_file.seek(self.__first_sample_offset) # Play again, so advance to first byte of sample data
if self.__loop_wav: # Looped playback
loop_read = 0
while loop_read < self.WAV_BUFFER_LENGTH:
num_read = self.__wav_file.readinto(self.__wav_samples_mv[loop_read:]) # Read the next section of the WAV file
loop_read += num_read
if num_read == 0:
_ = self.__wav_file.seek(0) # Play again, so advance to first byte of sample data
self.__loop_count += 1
self.__audio_out.write(self.__wav_samples_mv)

else: # Single shot playback
num_read = self.__wav_file.readinto(self.__wav_samples_mv)

if num_read:
self.__audio_out.write(self.__wav_samples_mv[: num_read]) # We are within the file, so write out the next audio samples
else:
self.__audio_out.write(self.__silence_samples) # Play silence to end this callback

# Have we reached the end of the file? (num_read is either 0 or a short read)
if num_read < self.WAV_BUFFER_LENGTH:
# Do we want to loop the WAV playback?
self.__wav_file.close() # Stop playing, so close the file
self.__state = WavPlayer.FLUSH # and enter the flush state on the next callback

self.__audio_out.write(self.__silence_samples) # In both cases play silence to end this callback
else:
self.__audio_out.write(self.__wav_samples_mv[: num_read]) # We are within the file, so write out the next audio samples
else:
if self.__queued_samples is not None:
self.__tone_samples = self.__queued_samples
self.__queued_samples = None
self.__audio_out.write(self.__tone_samples)

### PAUSE or STOP ###
# PAUSE or STOP
elif self.__state == WavPlayer.PAUSE or self.__state == WavPlayer.STOP:
self.__audio_out.write(self.__silence_samples) # Play silence

### FLUSH ###
# FLUSH
elif self.__state == WavPlayer.FLUSH:
# Flush is used to allow the residual audio samples in the internal buffer to be written
# to the I2S peripheral. This step avoids part of the sound file from being cut off
Expand All @@ -212,47 +320,6 @@ def __i2s_callback(self, arg):
self.__state = WavPlayer.STOP # Enter the stop state on the next callback
self.__audio_out.write(self.__silence_samples) # Play silence

### NONE ###
# NONE
elif self.__state == WavPlayer.NONE:
pass

@staticmethod
def __parse_wav(wav_file):
chunk_ID = wav_file.read(4)
if chunk_ID != b"RIFF":
raise ValueError("WAV chunk ID invalid")
_ = wav_file.read(4) # chunk_size
format = wav_file.read(4)
if format != b"WAVE":
raise ValueError("WAV format invalid")
sub_chunk1_ID = wav_file.read(4)
if sub_chunk1_ID != b"fmt ":
raise ValueError("WAV sub chunk 1 ID invalid")
_ = wav_file.read(4) # sub_chunk1_size
_ = struct.unpack("<H", wav_file.read(2))[0] # audio_format
num_channels = struct.unpack("<H", wav_file.read(2))[0]

if num_channels == 1:
format = I2S.MONO
else:
format = I2S.STEREO

sample_rate = struct.unpack("<I", wav_file.read(4))[0]
if sample_rate != 44_100 and sample_rate != 48_000:
raise ValueError(f"WAV sample rate of {sample_rate} invalid. Only 44.1KHz or 48KHz audio are supported")

_ = struct.unpack("<I", wav_file.read(4))[0] # byte_rate
_ = struct.unpack("<H", wav_file.read(2))[0] # block_align
bits_per_sample = struct.unpack("<H", wav_file.read(2))[0]

# usually the sub chunk2 ID ("data") comes next, but
# some online MP3->WAV converters add
# binary data before "data". So, read a fairly large
# block of bytes and search for "data".

binary_block = wav_file.read(200)
offset = binary_block.find(b"data")
if offset == -1:
raise ValueError("WAV sub chunk 2 ID not found")

return (format, sample_rate, bits_per_sample, 44 + offset)

0 comments on commit 6dc5d97

Please sign in to comment.