Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
bin/
vendor/
node_modules/
logs/

### Files ###
#############
Expand Down
6 changes: 6 additions & 0 deletions bwe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package bwe implements data structures that are common to all bandwidth
// estimators.
package bwe
233 changes: 233 additions & 0 deletions bwe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

//go:build !js && go1.25

package bwe_test

import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"log/slog"
"os"
"path/filepath"
"strings"
"testing"
"testing/synctest"
"time"

"github.com/pion/webrtc/v4"
"github.com/stretchr/testify/assert"
)

var logDir string

type vnetFactory func(*testing.T) *virtualNetwork

func TestMain(m *testing.M) {
logDir = os.Getenv("BWE_LOG_DIR")
if logDir == "" {
logDir = "test-web/logs"
}
if err := os.MkdirAll(logDir, 0o755); err != nil {
log.Printf("failed to create log dir %q: %v", logDir, err)
os.Exit(1)
}

ec := m.Run()

files, err := filepath.Glob(filepath.Join(logDir, "*.jsonl"))
if err != nil {
log.Printf("Failed to list JSONL files: %v", err)
}

var names []string
for _, f := range files {
names = append(names, filepath.Base(f))
}

b, err := json.Marshal(names)
if err != nil {
log.Printf("Failed to marshal index.json: %v", err)
os.Exit(ec)
}

indexPath := filepath.Join(logDir, "index.json")
if err := os.WriteFile(indexPath, b, 0644); err != nil {
log.Printf("Failed to write index.json: %v", err)
} else {
log.Printf("Generated index.json with %d files", len(names))
}

os.Exit(ec)
}

func TestBWE(t *testing.T) {
networks := map[string]vnetFactory{
"1mbps_very_low_latency": createVirtualNetwork(1_000_000, 80_000, 50*time.Millisecond),
"5mbps_very_low_latency": createVirtualNetwork(5_000_000, 80_000, 50*time.Millisecond),
"1mbps_low_latency": createVirtualNetwork(1_000_000, 80_000, 50*time.Millisecond),
"5mbps_low_latency": createVirtualNetwork(5_000_000, 80_000, 50*time.Millisecond),
"1mbps_medium_latency": createVirtualNetwork(1_000_000, 80_000, 150*time.Millisecond),
"5mbps_medium_latency": createVirtualNetwork(5_000_000, 80_000, 150*time.Millisecond),
"1mbps_high_latency": createVirtualNetwork(1_000_000, 80_000, 300*time.Millisecond),
"5mbps_high_latency": createVirtualNetwork(5_000_000, 80_000, 300*time.Millisecond),
}
peerOptions := map[string]struct {
receiver []option
sender []option
}{
"gcc-ccfb": {
receiver: []option{
registerCCFB(),
},
sender: []option{
registerPacer(),
initGCC(),
},
},
"gcc-twcc": {
receiver: []option{
registerTWCC(),
},
sender: []option{
registerPacer(),
registerTWCCHeaderExtension(),
initGCC(),
},
},
}
for netName, vnf := range networks {
for peerName, pos := range peerOptions {
t.Run(fmt.Sprintf("%v-%v", netName, peerName), func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
t.Helper()

logger, cleanup := testLogger(t)
defer cleanup()

onTrack := make(chan struct{})
connected := make(chan struct{})
done := make(chan struct{})

network := vnf(t)

receiverOptions := []option{
registerDefaultCodecs(),
setVNet(network.left, []string{"10.0.1.1"}),
onRemoteTrack(func(track *webrtc.TrackRemote) {
close(onTrack)
go func() {
buf := make([]byte, 1500)
for {
select {
case <-done:
return
default:
_, _, err := track.Read(buf)
if errors.Is(err, io.EOF) {
return
}
assert.NoError(t, err)
}
}
}()
}),
registerPacketLogger(logger.With("vantage-point", "receiver")),
}
receiverOptions = append(receiverOptions, pos.receiver...)
receiver, err := newPeer(receiverOptions...)
assert.NoError(t, err)

err = receiver.addRemoteTrack()
assert.NoError(t, err)

var codec *perfectCodec
senderOptions := []option{
registerDefaultCodecs(),
onConnected(func() { close(connected) }),
setVNet(network.right, []string{"10.0.2.1"}),
registerPacketLogger(logger.With("vantage-point", "sender")),
registerRTPFB(),
setOnRateCallback(func(rate int) {
logger.Info("setting codec target bitrate", "rate", rate)
codec.setTargetBitrate(rate)
}),
}
senderOptions = append(senderOptions, pos.sender...)
sender, err := newPeer(senderOptions...)
assert.NoError(t, err)

track, err := sender.addLocalTrack()
assert.NoError(t, err)

codec = newPerfectCodec(track, 1_000_000)
go func() {
<-connected
codec.start()
}()

offer, err := sender.createOffer()
assert.NoError(t, err)

err = receiver.setRemoteDescription(offer)
assert.NoError(t, err)

answer, err := receiver.createAnswer()
assert.NoError(t, err)

err = sender.setRemoteDescription(answer)
assert.NoError(t, err)

synctest.Wait()

select {
case <-onTrack:
case <-time.After(5 * time.Second):
assert.Fail(t, "on track not called")
}

time.Sleep(100 * time.Second)
close(done)

err = codec.Close()
assert.NoError(t, err)

err = sender.pc.Close()
assert.NoError(t, err)

err = receiver.pc.Close()
assert.NoError(t, err)

err = network.Close()
assert.NoError(t, err)

synctest.Wait()
})
})
}
}
}

func testLogger(t *testing.T) (*slog.Logger, func()) {
t.Helper()
name := strings.ReplaceAll(t.Name(), "/", "-")
filename := filepath.Join(logDir, fmt.Sprintf("%s.jsonl", name))
file, err := os.Create(filename)
if err != nil {
t.Fatalf("failed to create log file %q: %v", filename, err)
}

handler := slog.NewJSONHandler(file, &slog.HandlerOptions{Level: slog.LevelInfo})
logger := slog.New(handler)

cleanup := func() {
file.Sync()
file.Close()
}

return logger, cleanup
}
86 changes: 86 additions & 0 deletions gcc/arrival_group_accumulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package gcc

import (
"time"
)

type arrivalGroupItem struct {
SequenceNumber uint64
Departure time.Time
Arrival time.Time
Size int
}

type arrivalGroup []arrivalGroupItem

type arrivalGroupAccumulator struct {
next arrivalGroup
burstInterval time.Duration
maxBurstDuration time.Duration
}

func newArrivalGroupAccumulator() *arrivalGroupAccumulator {
return &arrivalGroupAccumulator{
next: make([]arrivalGroupItem, 0),
burstInterval: 5 * time.Millisecond,
maxBurstDuration: 100 * time.Millisecond,
}
}

func (a *arrivalGroupAccumulator) onPacketAcked(
sequenceNumber uint64,
size int,
departure, arrival time.Time,
) arrivalGroup {
if len(a.next) == 0 {
a.next = append(a.next, arrivalGroupItem{
SequenceNumber: sequenceNumber,
Size: size,
Departure: departure,
Arrival: arrival,
})

return nil
}

sendTimeDelta := departure.Sub(a.next[0].Departure)
if sendTimeDelta < a.burstInterval {
a.next = append(a.next, arrivalGroupItem{
SequenceNumber: sequenceNumber,
Size: size,
Departure: departure,
Arrival: arrival,
})

return nil
}

arrivalTimeDeltaLast := arrival.Sub(a.next[len(a.next)-1].Arrival)
arrivalTimeDeltaFirst := arrival.Sub(a.next[0].Arrival)
propagationDelta := arrivalTimeDeltaFirst - sendTimeDelta

if propagationDelta < 0 && arrivalTimeDeltaLast <= a.burstInterval && arrivalTimeDeltaFirst < a.maxBurstDuration {
a.next = append(a.next, arrivalGroupItem{
SequenceNumber: sequenceNumber,
Size: size,
Departure: departure,
Arrival: arrival,
})

return nil
}

group := make(arrivalGroup, len(a.next))
copy(group, a.next)
a.next = arrivalGroup{arrivalGroupItem{
SequenceNumber: sequenceNumber,
Size: size,
Departure: departure,
Arrival: arrival,
}}

return group
}
Loading
Loading