From 7b2292ee20c5d49053cc5262dfbc99ce121b9b74 Mon Sep 17 00:00:00 2001
From: tifayuki <tifayuki@gmail.com>
Date: Tue, 13 Feb 2018 13:30:56 -0800
Subject: [PATCH 1/4] Add notification metrics

It adds notification related prometheus metrics, including:
  - total count for events/success/failure/error
  - total count for notification per each status code
  - gauge of the pending notification queue

Signed-off-by: tifayuki <tifayuki@gmail.com>
---
 metrics/prometheus.go    |  3 +++
 notifications/metrics.go | 28 ++++++++++++++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git a/metrics/prometheus.go b/metrics/prometheus.go
index b5a532144..91b32b23d 100644
--- a/metrics/prometheus.go
+++ b/metrics/prometheus.go
@@ -10,4 +10,7 @@ const (
 var (
 	// StorageNamespace is the prometheus namespace of blob/cache related operations
 	StorageNamespace = metrics.NewNamespace(NamespacePrefix, "storage", nil)
+
+	// NotificationsNamespace is the prometheus namespace of notification related metrics
+	NotificationsNamespace = metrics.NewNamespace(NamespacePrefix, "notifications", nil)
 )
diff --git a/notifications/metrics.go b/notifications/metrics.go
index a20af1687..69960e9cb 100644
--- a/notifications/metrics.go
+++ b/notifications/metrics.go
@@ -5,6 +5,18 @@ import (
 	"fmt"
 	"net/http"
 	"sync"
+
+	prometheus "github.com/docker/distribution/metrics"
+	"github.com/docker/go-metrics"
+)
+
+var (
+	// eventsCounter counts total events of incoming, success, failure, and errors
+	eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type")
+	// pendingGauge measures the pending queue size
+	pendingGauge = prometheus.NotificationsNamespace.NewGauge("pending", "The gauge of pending events in queue", metrics.Total)
+	// statusCounter counts the total notification call per each status code
+	statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code")
 )
 
 // EndpointMetrics track various actions taken by the endpoint, typically by
@@ -61,6 +73,9 @@ func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Eve
 	defer emsl.safeMetrics.Unlock()
 	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
 	emsl.Successes += len(events)
+
+	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1)
+	eventsCounter.WithValues("Successes").Inc(1)
 }
 
 func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
@@ -68,12 +83,17 @@ func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Eve
 	defer emsl.safeMetrics.Unlock()
 	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
 	emsl.Failures += len(events)
+
+	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1)
+	eventsCounter.WithValues("Failures").Inc(1)
 }
 
 func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
 	emsl.safeMetrics.Lock()
 	defer emsl.safeMetrics.Unlock()
 	emsl.Errors += len(events)
+
+	eventsCounter.WithValues("Errors").Inc(1)
 }
 
 // endpointMetricsEventQueueListener maintains the incoming events counter and
@@ -87,12 +107,17 @@ func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
 	defer eqc.Unlock()
 	eqc.Events += len(events)
 	eqc.Pending += len(events)
+
+	eventsCounter.WithValues("Events").Inc()
+	pendingGauge.Inc(1)
 }
 
 func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
 	eqc.Lock()
 	defer eqc.Unlock()
 	eqc.Pending -= len(events)
+
+	pendingGauge.Dec(1)
 }
 
 // endpoints is global registry of endpoints used to report metrics to expvar
@@ -149,4 +174,7 @@ func init() {
 	}))
 
 	registry.(*expvar.Map).Set("notifications", &notifications)
+
+	// register prometheus metrics
+	metrics.Register(prometheus.NotificationsNamespace)
 }

From 4497e40eda1e0024f055c09ab480b7816a1147b1 Mon Sep 17 00:00:00 2001
From: Honglin Feng <tifayuki@gmail.com>
Date: Thu, 11 Oct 2018 21:39:02 +0800
Subject: [PATCH 2/4] add label to the metrics

Signed-off-by: Honglin Feng <tifayuki@gmail.com>
---
 notifications/endpoint.go   |  2 +-
 notifications/http_test.go  |  2 +-
 notifications/metrics.go    | 26 ++++++++++++++------------
 notifications/sinks_test.go |  2 +-
 4 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/notifications/endpoint.go b/notifications/endpoint.go
index a8a52d0c9..854f1dd6c 100644
--- a/notifications/endpoint.go
+++ b/notifications/endpoint.go
@@ -58,7 +58,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
 	endpoint.url = url
 	endpoint.EndpointConfig = config
 	endpoint.defaults()
-	endpoint.metrics = newSafeMetrics()
+	endpoint.metrics = newSafeMetrics(name)
 
 	// Configures the inmemory queue, retry, http pipeline.
 	endpoint.Sink = newHTTPSink(
diff --git a/notifications/http_test.go b/notifications/http_test.go
index de47f789e..b7845cf95 100644
--- a/notifications/http_test.go
+++ b/notifications/http_test.go
@@ -63,7 +63,7 @@ func TestHTTPSink(t *testing.T) {
 	})
 	server := httptest.NewTLSServer(serverHandler)
 
-	metrics := newSafeMetrics()
+	metrics := newSafeMetrics("")
 	sink := newHTTPSink(server.URL, 0, nil, nil,
 		&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
 
diff --git a/notifications/metrics.go b/notifications/metrics.go
index 69960e9cb..4464edd8f 100644
--- a/notifications/metrics.go
+++ b/notifications/metrics.go
@@ -12,11 +12,11 @@ import (
 
 var (
 	// eventsCounter counts total events of incoming, success, failure, and errors
-	eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type")
+	eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type", "to")
 	// pendingGauge measures the pending queue size
-	pendingGauge = prometheus.NotificationsNamespace.NewGauge("pending", "The gauge of pending events in queue", metrics.Total)
+	pendingGauge = prometheus.NotificationsNamespace.NewLabeledGauge("pending", "The gauge of pending events in queue", metrics.Total, "to")
 	// statusCounter counts the total notification call per each status code
-	statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code")
+	statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code", "to")
 )
 
 // EndpointMetrics track various actions taken by the endpoint, typically by
@@ -34,14 +34,16 @@ type EndpointMetrics struct {
 // safeMetrics guards the metrics implementation with a lock and provides a
 // safe update function.
 type safeMetrics struct {
+	EndpointName string
 	EndpointMetrics
 	sync.Mutex // protects statuses map
 }
 
 // newSafeMetrics returns safeMetrics with map allocated.
-func newSafeMetrics() *safeMetrics {
+func newSafeMetrics(name string) *safeMetrics {
 	var sm safeMetrics
 	sm.Statuses = make(map[string]int)
+	sm.EndpointName = name
 	return &sm
 }
 
@@ -74,8 +76,8 @@ func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Eve
 	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
 	emsl.Successes += len(events)
 
-	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1)
-	eventsCounter.WithValues("Successes").Inc(1)
+	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
+	eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(1)
 }
 
 func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
@@ -84,8 +86,8 @@ func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Eve
 	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
 	emsl.Failures += len(events)
 
-	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1)
-	eventsCounter.WithValues("Failures").Inc(1)
+	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
+	eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(1)
 }
 
 func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
@@ -93,7 +95,7 @@ func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
 	defer emsl.safeMetrics.Unlock()
 	emsl.Errors += len(events)
 
-	eventsCounter.WithValues("Errors").Inc(1)
+	eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(1)
 }
 
 // endpointMetricsEventQueueListener maintains the incoming events counter and
@@ -108,8 +110,8 @@ func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
 	eqc.Events += len(events)
 	eqc.Pending += len(events)
 
-	eventsCounter.WithValues("Events").Inc()
-	pendingGauge.Inc(1)
+	eventsCounter.WithValues("Events", eqc.EndpointName).Inc()
+	pendingGauge.WithValues(eqc.EndpointName).Inc(1)
 }
 
 func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
@@ -117,7 +119,7 @@ func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
 	defer eqc.Unlock()
 	eqc.Pending -= len(events)
 
-	pendingGauge.Dec(1)
+	pendingGauge.WithValues(eqc.EndpointName).Dec(1)
 }
 
 // endpoints is global registry of endpoints used to report metrics to expvar
diff --git a/notifications/sinks_test.go b/notifications/sinks_test.go
index 06f88b2c9..4a69486b5 100644
--- a/notifications/sinks_test.go
+++ b/notifications/sinks_test.go
@@ -66,7 +66,7 @@ func TestBroadcaster(t *testing.T) {
 func TestEventQueue(t *testing.T) {
 	const nevents = 1000
 	var ts testSink
-	metrics := newSafeMetrics()
+	metrics := newSafeMetrics("")
 	eq := newEventQueue(
 		// delayed sync simulates destination slower than channel comms
 		&delayedSink{

From 73e4232b5171c2988b0daeea517aa07386e7945d Mon Sep 17 00:00:00 2001
From: Honglin Feng <tifayuki@gmail.com>
Date: Mon, 15 Oct 2018 19:50:38 +0800
Subject: [PATCH 3/4] run go fmt

Signed-off-by: Honglin Feng <tifayuki@gmail.com>
---
 registry/storage/driver/s3-aws/s3.go     | 10 +++++-----
 registry/storage/linkedblobstore.go      | 16 ++++++++--------
 registry/storage/linkedblobstore_test.go |  4 ++--
 3 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go
index 800435d01..9cd87dbab 100644
--- a/registry/storage/driver/s3-aws/s3.go
+++ b/registry/storage/driver/s3-aws/s3.go
@@ -476,11 +476,11 @@ func New(params DriverParameters) (*Driver, error) {
 	// }
 
 	d := &driver{
-		S3:                          s3obj,
-		Bucket:                      params.Bucket,
-		ChunkSize:                   params.ChunkSize,
-		Encrypt:                     params.Encrypt,
-		KeyID:                       params.KeyID,
+		S3:        s3obj,
+		Bucket:    params.Bucket,
+		ChunkSize: params.ChunkSize,
+		Encrypt:   params.Encrypt,
+		KeyID:     params.KeyID,
 		MultipartCopyChunkSize:      params.MultipartCopyChunkSize,
 		MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency,
 		MultipartCopyThresholdSize:  params.MultipartCopyThresholdSize,
diff --git a/registry/storage/linkedblobstore.go b/registry/storage/linkedblobstore.go
index de591c8a5..3fb1da26f 100644
--- a/registry/storage/linkedblobstore.go
+++ b/registry/storage/linkedblobstore.go
@@ -312,14 +312,14 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string
 	}
 
 	bw := &blobWriter{
-		ctx:                    ctx,
-		blobStore:              lbs,
-		id:                     uuid,
-		startedAt:              startedAt,
-		digester:               digest.Canonical.Digester(),
-		fileWriter:             fw,
-		driver:                 lbs.driver,
-		path:                   path,
+		ctx:        ctx,
+		blobStore:  lbs,
+		id:         uuid,
+		startedAt:  startedAt,
+		digester:   digest.Canonical.Digester(),
+		fileWriter: fw,
+		driver:     lbs.driver,
+		path:       path,
 		resumableDigestEnabled: lbs.resumableDigestEnabled,
 	}
 
diff --git a/registry/storage/linkedblobstore_test.go b/registry/storage/linkedblobstore_test.go
index e0ffd2796..85376f715 100644
--- a/registry/storage/linkedblobstore_test.go
+++ b/registry/storage/linkedblobstore_test.go
@@ -162,8 +162,8 @@ type mockBlobDescriptorServiceFactory struct {
 func (f *mockBlobDescriptorServiceFactory) BlobAccessController(svc distribution.BlobDescriptorService) distribution.BlobDescriptorService {
 	return &mockBlobDescriptorService{
 		BlobDescriptorService: svc,
-		t:                     f.t,
-		stats:                 f.stats,
+		t:     f.t,
+		stats: f.stats,
 	}
 }
 

From 5c66b577b027e3b314680f245be4213a002fcee0 Mon Sep 17 00:00:00 2001
From: Honglin Feng <tifayuki@gmail.com>
Date: Mon, 15 Oct 2018 20:18:36 +0800
Subject: [PATCH 4/4] run go fmt and goimports

Signed-off-by: Honglin Feng <tifayuki@gmail.com>
---
 registry/storage/driver/s3-aws/s3.go     | 10 +++++-----
 registry/storage/linkedblobstore.go      | 16 ++++++++--------
 registry/storage/linkedblobstore_test.go |  4 ++--
 3 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go
index 9cd87dbab..800435d01 100644
--- a/registry/storage/driver/s3-aws/s3.go
+++ b/registry/storage/driver/s3-aws/s3.go
@@ -476,11 +476,11 @@ func New(params DriverParameters) (*Driver, error) {
 	// }
 
 	d := &driver{
-		S3:        s3obj,
-		Bucket:    params.Bucket,
-		ChunkSize: params.ChunkSize,
-		Encrypt:   params.Encrypt,
-		KeyID:     params.KeyID,
+		S3:                          s3obj,
+		Bucket:                      params.Bucket,
+		ChunkSize:                   params.ChunkSize,
+		Encrypt:                     params.Encrypt,
+		KeyID:                       params.KeyID,
 		MultipartCopyChunkSize:      params.MultipartCopyChunkSize,
 		MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency,
 		MultipartCopyThresholdSize:  params.MultipartCopyThresholdSize,
diff --git a/registry/storage/linkedblobstore.go b/registry/storage/linkedblobstore.go
index 3fb1da26f..de591c8a5 100644
--- a/registry/storage/linkedblobstore.go
+++ b/registry/storage/linkedblobstore.go
@@ -312,14 +312,14 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string
 	}
 
 	bw := &blobWriter{
-		ctx:        ctx,
-		blobStore:  lbs,
-		id:         uuid,
-		startedAt:  startedAt,
-		digester:   digest.Canonical.Digester(),
-		fileWriter: fw,
-		driver:     lbs.driver,
-		path:       path,
+		ctx:                    ctx,
+		blobStore:              lbs,
+		id:                     uuid,
+		startedAt:              startedAt,
+		digester:               digest.Canonical.Digester(),
+		fileWriter:             fw,
+		driver:                 lbs.driver,
+		path:                   path,
 		resumableDigestEnabled: lbs.resumableDigestEnabled,
 	}
 
diff --git a/registry/storage/linkedblobstore_test.go b/registry/storage/linkedblobstore_test.go
index 85376f715..e0ffd2796 100644
--- a/registry/storage/linkedblobstore_test.go
+++ b/registry/storage/linkedblobstore_test.go
@@ -162,8 +162,8 @@ type mockBlobDescriptorServiceFactory struct {
 func (f *mockBlobDescriptorServiceFactory) BlobAccessController(svc distribution.BlobDescriptorService) distribution.BlobDescriptorService {
 	return &mockBlobDescriptorService{
 		BlobDescriptorService: svc,
-		t:     f.t,
-		stats: f.stats,
+		t:                     f.t,
+		stats:                 f.stats,
 	}
 }
 
