Skip to content

Commit 542e7d7

Browse files
authored
lazy metric registering & metric name filtering (feast-dev#1195)
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent 60b568d commit 542e7d7

5 files changed

Lines changed: 46 additions & 19 deletions

File tree

spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,7 @@ trait BasePipeline {
4141
case Some(c: StatsDConfig) =>
4242
conf
4343
.set(
44-
"spark.metrics.conf.*.source.redis.class",
45-
"org.apache.spark.metrics.source.RedisSinkMetricSource"
46-
)
47-
.set(
48-
"spark.metrics.conf.*.source.redis.labels",
44+
"spark.metrics.labels",
4945
s"feature_table=${jobConfig.featureTable.name}"
5046
)
5147
.set(
@@ -56,7 +52,7 @@ trait BasePipeline {
5652
.set("spark.metrics.conf.*.sink.statsd.port", c.port.toString)
5753
.set("spark.metrics.conf.*.sink.statsd.period", "30")
5854
.set("spark.metrics.conf.*.sink.statsd.unit", "seconds")
59-
.set("spark.metrics.namespace", jobConfig.mode.toString)
55+
.set("spark.metrics.namespace", jobConfig.mode.toString.toLowerCase)
6056
case None => ()
6157
}
6258

spark/ingestion/src/main/scala/feast/ingestion/metrics/StatsdReporterWithTags.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,19 +169,28 @@ class StatsdReporterWithTags(
169169
reportMetered(name, timer)
170170
}
171171

172+
private val nameWithTag = """(\S+)#(\S+)""".r
173+
172174
private def send(name: String, value: String, metricType: String)(implicit
173175
socket: DatagramSocket
174176
): Unit = {
175-
val bytes = sanitize(s"$name:$value|$metricType").getBytes(UTF_8)
177+
val bytes = name match {
178+
case nameWithTag(name, tags) =>
179+
val tagsWithSemicolon = tags.replace('=', ':')
180+
sanitize(s"$name:$value|$metricType|#$tagsWithSemicolon").getBytes(UTF_8)
181+
case _ =>
182+
sanitize(s"$name:$value|$metricType").getBytes(UTF_8)
183+
}
176184
val packet = new DatagramPacket(bytes, bytes.length, address)
177185
socket.send(packet)
178186
}
179187

180-
private val nameWithTag = """(\S+)#(\S+)""".r
181-
182188
private def fullName(name: String, suffixes: String*): String = name match {
183189
case nameWithTag(name, tags) =>
184-
MetricRegistry.name(prefix, name +: suffixes: _*) ++ "#" ++ tags
190+
// filter out parts that consist only of digits
191+
// (which could be an executor id, for example)
192+
val stableName = name.split('.').filterNot(_ forall Character.isDigit).mkString(".")
193+
MetricRegistry.name(prefix, stableName +: suffixes: _*) ++ "#" ++ tags
185194
case _ =>
186195
MetricRegistry.name(prefix, name +: suffixes: _*)
187196
}

spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
4848
extends BaseRelation
4949
with InsertableRelation
5050
with Serializable {
51+
52+
import RedisSinkRelation._
53+
5154
private implicit val redisConfig: RedisConfig = {
5255
new RedisConfig(
5356
new RedisEndpoint(sqlContext.sparkContext.getConf)
@@ -151,11 +154,21 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
151154
.build
152155
}
153156

154-
private lazy val metricSource: Option[RedisSinkMetricSource] =
157+
private lazy val metricSource: Option[RedisSinkMetricSource] = {
158+
MetricInitializationLock.synchronized {
159+
// RedisSinkMetricSource needs to be registered on executor and SparkEnv must already exist.
160+
// Which is problematic, since the metrics system is initialized before SparkEnv is set.
161+
// That's why source registration was moved here.
162+
if (SparkEnv.get.metricsSystem.getSourcesByName(RedisSinkMetricSource.sourceName).isEmpty) {
163+
SparkEnv.get.metricsSystem.registerSource(new RedisSinkMetricSource)
164+
}
165+
}
166+
155167
SparkEnv.get.metricsSystem.getSourcesByName(RedisSinkMetricSource.sourceName) match {
156168
case Seq(head) => Some(head.asInstanceOf[RedisSinkMetricSource])
157169
case _ => None
158170
}
171+
}
159172

160173
private def groupKeysByNode(
161174
nodes: Array[RedisNode],
@@ -202,3 +215,7 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
202215

203216
}
204217
}
218+
219+
object RedisSinkRelation {
220+
object MetricInitializationLock
221+
}

spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,28 @@
1717
package org.apache.spark.metrics.source
1818

1919
import com.codahale.metrics.MetricRegistry
20-
import org.apache.spark.{SparkConf, SparkEnv}
20+
import org.apache.spark.SparkEnv
2121

2222
class RedisSinkMetricSource extends Source {
2323
override val sourceName: String = RedisSinkMetricSource.sourceName
2424

2525
override val metricRegistry: MetricRegistry = new MetricRegistry
2626

27-
private val sparkConfig = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf(true))
27+
private val sparkConfig = SparkEnv.get.conf
2828

29-
private val metricLabels = sparkConfig.get("spark.metrics.conf.*.source.redis.labels")
29+
private val metricLabels = sparkConfig.get("spark.metrics.labels", "")
3030

31-
private def nameWithLabels(name: String) =
31+
private val appId = sparkConfig.get("spark.app.id", "")
32+
33+
private val executorId = sparkConfig.get("spark.executor.id", "")
34+
35+
private def nameWithLabels(name: String) = {
3236
if (metricLabels.isEmpty) {
3337
name
3438
} else {
35-
s"$name#$metricLabels"
39+
s"$name#$metricLabels,job_id=$appId-$executorId"
3640
}
41+
}
3742

3843
val METRIC_TOTAL_ROWS_INSERTED =
3944
metricRegistry.counter(nameWithLabels("feast_ingestion_feature_row_ingested_count"))

spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsReporterSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,19 +89,19 @@ class StatsReporterSpec extends UnitSpec {
8989
server.receive should contain("test:0|g")
9090
}
9191

92-
"Statsd reporter" should "keep tags part in the name's end" in new Scope {
92+
"Statsd reporter" should "keep tags part in the message's end" in new Scope {
9393
reporter.report(
9494
gauges = Collections.emptySortedMap(),
9595
counters = Collections.emptySortedMap(),
9696
histograms = new util.TreeMap(
9797
Map(
98-
"test#fs=name" -> histogram((1 to 100))
98+
"prefix.1111.test#fs=name,job=aaa" -> histogram((1 to 100))
9999
).asJava
100100
),
101101
meters = Collections.emptySortedMap(),
102102
timers = Collections.emptySortedMap()
103103
)
104104

105-
server.receive should contain("test.p95#fs=name:95.95|ms")
105+
server.receive should contain("prefix.test.p95:95.95|ms|#fs:name,job:aaa")
106106
}
107107
}

0 commit comments

Comments (0)