Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8a1a310
Add Synapse Launcher
xiaoyongzhu Apr 25, 2021
8c06c45
Remove unnecessary printout
xiaoyongzhu Apr 26, 2021
b3fd983
Add eventhub support
xiaoyongzhu May 8, 2021
24f0986
Merge branch 'master' of https://github.com/xiaoyongzhu/feast-spark
xiaoyongzhu May 19, 2021
89f48a3
Add EventHub support and Redis Auth support
xiaoyongzhu May 19, 2021
f2cd8be
Adding EventHub support in Spark jobs
xiaoyongzhu May 19, 2021
53d7e20
add ScheduledBatchIngestionJobParameters
xiaoyongzhu Aug 31, 2021
79866e9
Merge pull request #1 from feast-dev/master
xiaoyongzhu Aug 31, 2021
c469ee7
Add Azure specific dependencies
xiaoyongzhu Sep 2, 2021
91c6822
Change azure storage dependencies
xiaoyongzhu Sep 2, 2021
0a7a56c
Comment for removing/adding spaces between brackets
xiaoyongzhu Sep 9, 2021
f4c0d5a
Delete feature_store_debug.py
xiaoyongzhumsft Sep 9, 2021
6bc9260
Update StreamingPipeline.scala
xiaoyongzhumsft Sep 14, 2021
dd53a53
Merge branch 'feast-dev:master' into master
xiaoyongzhumsft Oct 1, 2021
9fde235
Update synapse.py
xiaoyongzhumsft Oct 1, 2021
bb3d6be
Update synapse.py
xiaoyongzhumsft Oct 1, 2021
6885f31
Merge branch 'feast-dev:master' into master
xiaoyongzhumsft Oct 4, 2021
08da84f
Fix Redis auth issue
xiaoyongzhu Oct 12, 2021
0ddbcef
Update Ingestion jobs and add supporting files
xiaoyongzhu Oct 12, 2021
48a1c44
Fix build issues
xiaoyongzhu Oct 12, 2021
762386e
Add support for Kafka ingestion
xiaoyongzhu Oct 12, 2021
41fc406
Add build and push instructions
xiaoyongzhu Oct 12, 2021
0f7d433
Adding License
xiaoyongzhu Oct 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Adding EventHub support in Spark jobs
  • Loading branch information
xiaoyongzhu committed May 19, 2021
commit f2cd8be5d003612f9d89dad811b991d96370b4fb
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ object BasePipeline {
val conf = new SparkConf()

jobConfig.store match {
case RedisConfig(host, port, ssl) =>
case RedisConfig(host, port, auth, ssl) =>
conf
.set("spark.redis.host", host)
.set("spark.redis.port", port.toString)
.set("spark.redis.ssl", ssl.toString)
.set("spark.redis.auth", auth.toString)
case BigTableConfig(projectId, instanceId) =>
conf
.set("spark.bigtable.projectId", projectId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import org.joda.time.{DateTime, DateTimeZone}
import org.json4s._
import org.json4s.ext.JavaEnumNameSerializer
import org.json4s.jackson.JsonMethods.{parse => parseJSON}
import org.json4s.ext.JavaEnumNameSerializer
import scala.collection.mutable.ArrayBuffer

object IngestionJob {
import Modes._
Expand Down Expand Up @@ -116,8 +118,21 @@ object IngestionJob {
.action((x, c) => c.copy(streamingTriggeringSecs = x))
}

opt[String](name = "kafka_sasl_auth")
.action((x, c) => c.copy(kafkaSASL = Some(x)))
}

def main(args: Array[String]): Unit = {
parser.parse(args, IngestionJobConfig()) match {
println("Debug... Received following argument:")
println(args.toList)
val args_modified = new Array[String](args.length)
for ( i <- 0 to (args_modified.length - 1)) {
args_modified(i) = args(i).replace(" }", "}");
args_modified(i) = args_modified(i).replace("\\", "\\\"");
}
println("Remove additional spaces in args:")
println(args_modified.toList)
parser.parse(args_modified, IngestionJobConfig()) match {
case Some(config) =>
println(s"Starting with config $config")
config.mode match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,13 @@ case class IngestionJobConfig(
source: Source = null,
startTime: DateTime = DateTime.now(),
endTime: DateTime = DateTime.now(),
store: StoreConfig = RedisConfig("localhost", 6379, false),
store: StoreConfig = RedisConfig("localhost", 6379, "", false),
metrics: Option[MetricConfig] = None,
deadLetterPath: Option[String] = None,
stencilURL: Option[String] = None,
streamingTriggeringSecs: Int = 0,
validationConfig: Option[ValidationConfig] = None,
doNotIngestInvalidRows: Boolean = false,
checkpointPath: Option[String] = None
kafkaSASL: Option[String] = None
)
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.{SparkEnv, SparkFiles}
import org.apache.spark.eventhubs._
import org.apache.kafka.common.security.plain.PlainLoginModule
import org.apache.kafka.common.security.JaasContext

/**
* Streaming pipeline (currently in micro-batches mode only, since we need to have multiple sinks: redis & deadletters).
Expand All @@ -60,24 +62,36 @@ object StreamingPipeline extends BasePipeline with Serializable {
val rowValidator = new RowValidator(featureTable, config.source.eventTimestampColumn)
val metrics = new IngestionPipelineMetrics
val validationUDF = createValidationUDF(sparkSession, config)
val connStr = "Endpoint=sb://xiaoyzhufeasttest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=z9obEAyVvD36fZIEvvtNlCRBEDjIrsfNfDAbgDyTbDg=;EntityPath=xiaoyzhufeasttesteh"
val ehConf = EventHubsConf(connStr).setStartingPosition(EventPosition.fromStartOfStream)

val EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://xiaoyzhufeasttest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=z9obEAyVvD36fZIEvvtNlCRBEDjIrsfNfDAbgDyTbDg=;EntityPath=driver_trips\";"

val input = config.source match {
case source: KafkaSource =>
sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", source.bootstrapServers)
.option("subscribe", source.topic)
.load()
case source: EventHubSource =>
sparkSession.readStream
.format("eventhubs")
.options(ehConf.toMap)
.load()
if (config.kafkaSASL.nonEmpty)
{
// if we have authentication enabled
sparkSession.readStream
.format("kafka")
.option("subscribe", source.topic)
.option("kafka.bootstrap.servers", source.bootstrapServers)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", config.kafkaSASL.get)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "60000")
.option("failOnDataLoss", "false")
.load()
}
else
{
sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", source.bootstrapServers)
.option("subscribe", source.topic)
.load()
}
case source: MemoryStreamingSource =>
source.read
source.read
}

val parsed = config.source.asInstanceOf[StreamingSource].format match {
Expand Down