From 1e2b7f7c00fbded9e9ebb83b10ce302155ba444f Mon Sep 17 00:00:00 2001
Message-ID: <1e2b7f7c00fbded9e9ebb83b10ce302155ba444f.1737052666.git.sam@gentoo.org>
In-Reply-To: <1993383ddf67e296334c7916d6afc699ee6300c7.1737052666.git.sam@gentoo.org>
References: <1993383ddf67e296334c7916d6afc699ee6300c7.1737052666.git.sam@gentoo.org>
From: Wim Taymans <wtaymans@redhat.com>
Date: Tue, 26 Nov 2024 17:45:41 +0100
Subject: [PATCH 4/8] gst: add rate control to the sink

Track the elapsed time between buffers and try to keep the buffer fill
level constant by changing the rate of the stream.

See #4374
---
 src/gst/gstpipewiresink.c   | 76 ++++++++++++++++++++++++++++++++++---
 src/gst/gstpipewiresink.h   |  4 ++
 src/gst/gstpipewirestream.c |  1 +
 src/gst/gstpipewirestream.h |  8 ++++
 src/gst/meson.build         |  2 +-
 5 files changed, 84 insertions(+), 7 deletions(-)

diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c
index d79ceaa66..33f2322e9 100644
--- a/src/gst/gstpipewiresink.c
+++ b/src/gst/gstpipewiresink.c
@@ -26,6 +26,7 @@
 
 #include <spa/pod/builder.h>
 #include <spa/utils/result.h>
+#include <spa/utils/dll.h>
 
 #include <gst/video/video.h>
 
@@ -481,14 +482,13 @@ static void
 do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
 {
   GstPipeWirePoolData *data;
+  GstPipeWireStream *stream = pwsink->stream;
   gboolean res;
   guint i;
   struct spa_buffer *b;
 
   data = gst_pipewire_pool_get_data(buffer);
 
-  GST_LOG_OBJECT (pwsink, "queue buffer %p, pw_buffer %p", buffer, data->b);
-
   b = data->b->buffer;
 
   if (data->header) {
@@ -508,12 +508,15 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
       data->crop->region.size.height = meta->width;
     }
   }
+  data->b->size = 0;
   for (i = 0; i < b->n_datas; i++) {
     struct spa_data *d = &b->datas[i];
     GstMemory *mem = gst_buffer_peek_memory (buffer, i);
     d->chunk->offset = mem->offset;
     d->chunk->size = mem->size;
-    d->chunk->stride = pwsink->stream->pool->video_info.stride[i];
+    d->chunk->stride = stream->pool->video_info.stride[i];
+
+    data->b->size += mem->size / 4;
   }
 
   GstVideoMeta *meta = gst_buffer_get_video_meta (buffer);
@@ -532,9 +535,50 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
     }
   }
 
-  if ((res = pw_stream_queue_buffer (pwsink->stream->pwstream, data->b)) < 0) {
+  if ((res = pw_stream_queue_buffer (stream->pwstream, data->b)) < 0) {
     g_warning ("can't send buffer %s", spa_strerror(res));
   }
+
+  if (pwsink->rate_match) {
+    double err, corr;
+    struct pw_time ts;
+    guint64 queued, now, elapsed, target;
+
+    pw_stream_get_time_n(stream->pwstream, &ts, sizeof(ts));
+    now = pw_stream_get_nsec(stream->pwstream);
+    if (ts.now != 0)
+	    elapsed = gst_util_uint64_scale_int (now - ts.now, ts.rate.denom, GST_SECOND * ts.rate.num);
+    else
+	    elapsed = 0;
+
+    queued = ts.queued - ts.size;
+    target = 2 * elapsed;
+    err = ((gint64)queued - ((gint64)target));
+
+    corr = spa_dll_update(&stream->dll, SPA_CLAMPD(err, -128.0, 128.0));
+
+    stream->err_wdw = (double)ts.rate.denom/ts.size;
+
+    double avg = (stream->err_avg * stream->err_wdw + (err - stream->err_avg)) / (stream->err_wdw + 1.0);
+    stream->err_var = (stream->err_var * stream->err_wdw +
+                      (err - stream->err_avg) * (err - avg)) / (stream->err_wdw + 1.0);
+    stream->err_avg = avg;
+
+    if (stream->last_ts == 0 || stream->last_ts + SPA_NSEC_PER_SEC < now) {
+      stream->last_ts = now;
+      spa_dll_set_bw(&stream->dll, SPA_CLAMPD(fabs(stream->err_avg) / sqrt(fabs(stream->err_var)), 0.001, SPA_DLL_BW_MAX),
+                     ts.size, ts.rate.denom);
+    GST_INFO_OBJECT (pwsink, "queue buffer %p, pw_buffer %p q:%"PRIi64"/%"PRIi64" e:%"PRIu64
+		    " err:%+03f corr:%f %f %f %f",
+                    buffer, data->b, ts.queued, ts.size, elapsed, err, corr,
+		    stream->err_avg, stream->err_var, stream->dll.bw);
+    }
+
+    if (pwsink->match) {
+	pwsink->match->rate = corr;
+	SPA_FLAG_UPDATE(pwsink->match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE, true);
+    }
+  }
 }
 
 
@@ -576,6 +620,18 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta
   pw_thread_loop_signal (pwsink->stream->core->loop, FALSE);
 }
 
+static void
+on_io_changed (void *obj, uint32_t id, void *data, uint32_t size)
+{
+  GstPipeWireSink *pwsink = obj;
+
+  switch (id) {
+    case SPA_IO_RateMatch:
+      pwsink->match = data;
+      break;
+  }
+}
+
 static void
 on_param_changed (void *data, uint32_t id, const struct spa_pod *param)
 {
@@ -613,9 +669,16 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
   pwsink = GST_PIPEWIRE_SINK (bsink);
 
   s = gst_caps_get_structure (caps, 0);
-  rate = 0;
-  if (gst_structure_has_name (s, "audio/x-raw"))
+  if (gst_structure_has_name (s, "audio/x-raw")) {
     gst_structure_get_int (s, "rate", &rate);
+    pwsink->rate = rate;
+    pwsink->rate_match = true;
+  } else {
+    pwsink->rate = rate = 0;
+    pwsink->rate_match = false;
+  }
+
+  spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, 4096, rate);
 
   possible = gst_caps_to_format_all (caps);
 
@@ -791,6 +854,7 @@ not_negotiated:
 static const struct pw_stream_events stream_events = {
         PW_VERSION_STREAM_EVENTS,
         .state_changed = on_state_changed,
+        .io_changed = on_io_changed,
         .param_changed = on_param_changed,
         .add_buffer = on_add_buffer,
         .remove_buffer = on_remove_buffer,
diff --git a/src/gst/gstpipewiresink.h b/src/gst/gstpipewiresink.h
index 74e6667e6..33d7b5b4f 100644
--- a/src/gst/gstpipewiresink.h
+++ b/src/gst/gstpipewiresink.h
@@ -50,8 +50,12 @@ struct _GstPipeWireSink {
 
   /* video state */
   gboolean negotiated;
+  gboolean rate_match;
+  gint rate;
 
   GstPipeWireSinkMode mode;
+
+  struct spa_io_rate_match *match;
 };
 
 GType gst_pipewire_sink_mode_get_type (void);
diff --git a/src/gst/gstpipewirestream.c b/src/gst/gstpipewirestream.c
index bf7641548..68cb9be21 100644
--- a/src/gst/gstpipewirestream.c
+++ b/src/gst/gstpipewirestream.c
@@ -19,6 +19,7 @@ gst_pipewire_stream_init (GstPipeWireStream * self)
   self->fd = -1;
   self->client_name = g_strdup (pw_get_client_name());
   self->pool = gst_pipewire_pool_new (self);
+  spa_dll_init(&self->dll);
 }
 
 static void
diff --git a/src/gst/gstpipewirestream.h b/src/gst/gstpipewirestream.h
index ff8c8e2e6..a301375c7 100644
--- a/src/gst/gstpipewirestream.h
+++ b/src/gst/gstpipewirestream.h
@@ -11,6 +11,7 @@
 #include "gstpipewirecore.h"
 
 #include <gst/gst.h>
+#include <spa/utils/dll.h>
 #include <pipewire/pipewire.h>
 
 G_BEGIN_DECLS
@@ -29,6 +30,13 @@ struct _GstPipeWireStream {
   GstPipeWirePool *pool;
   GstClock *clock;
 
+  guint64 position;
+  struct spa_dll dll;
+  double err_avg, err_var, err_wdw;
+  guint64 last_ts;
+  guint64 base_buffer_ts;
+  guint64 base_ts;
+
   /* the actual pw stream */
   struct pw_stream *pwstream;
   struct spa_hook pwstream_listener;
diff --git a/src/gst/meson.build b/src/gst/meson.build
index ba1f6d558..1e39bcf89 100644
--- a/src/gst/meson.build
+++ b/src/gst/meson.build
@@ -27,7 +27,7 @@ pipewire_gst_headers = [
 pipewire_gst = shared_library('gstpipewire',
     pipewire_gst_sources,
     include_directories : [ configinc ],
-    dependencies : [ spa_dep, gst_dep, pipewire_dep ],
+    dependencies : [ spa_dep, gst_dep, pipewire_dep, mathlib ],
     install : true,
     install_dir : '@0@/gstreamer-1.0'.format(get_option('libdir')),
 )
-- 
2.48.0

