# H2OWorld - Building Machine Learning Applications with Sparkling Water

## Requirements

 - Oracle Java 7+ ([USB](../../))
 - [Spark 1.5.1](http://spark.apache.org/downloads.html) ([USB](../../Spark))
 - [Sparkling Water 1.5.4](http://h2o-release.s3.amazonaws.com/sparkling-water/rel-1.5/4/index.html) ([USB](../../SparklingWater))
 - [SMS dataset](https://raw.githubusercontent.com/h2oai/sparkling-water/master/examples/smalldata/smsData.txt) ([USB](../data/smsData.txt))

## Provided USB

 - [Binaries](../../)
 - [SMS dataset](../data/smsData.txt)
 - [Slides](SparklingWater.pdf)
 - [Scala Script](h2oworld.script.scala)

## Machine Learning Workflow

**Goal**: For a given text message, identify whether it is spam or not.

 1. Extract data
 2. Transform and tokenize messages
 3. Build Spark's [Tf-IDF model](https://en.wikipedia.org/wiki/Tf%E2%80%93idf) and expand messages to feature vectors
 4. Create and evaluate [H2O's Deep Learning model](https://github.com/h2oai/h2o-3/blob/master/h2o-docs/src/product/tutorials/dl/dl.md)
 5. Use the models to detect spam messages

### Prepare environment

1. Run Sparkling shell with an embedded Spark cluster:
   ```
   export SPARK_HOME="/path/to/spark/installation"
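   # local-cluster[3,2,4096] launches 3 workers, each with 2 cores and 4096 MB of memory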
   export MASTER="local-cluster[3,2,4096]"
   cd SparklingWater
   bin/sparkling-shell --conf spark.executor.memory=2G
   ```
   > Note: I would recommend editing your `$SPARK_HOME/conf/log4j.properties` and configuring the log level to `WARN` to avoid flooding the output with Spark INFO messages. See the sketch below.

2. Open the Spark UI: go to [http://localhost:4040/](http://localhost:4040/) to see the Spark status.

3. Prepare the environment:
   ```scala
   // Input data
   val DATAFILE="../data/smsData.txt"
   // Common imports from H2O and Spark
   import _root_.hex.deeplearning.{DeepLearningModel, DeepLearning}
   import _root_.hex.deeplearning.DeepLearningParameters
   import org.apache.spark.examples.h2o.DemoUtils._
   import org.apache.spark.h2o._
   import org.apache.spark.mllib
   import org.apache.spark.mllib.feature.{IDFModel, IDF, HashingTF}
   import org.apache.spark.rdd.RDD
   import water.Key
   ```

4. Define the representation of a training message:
   ```scala
   // Representation of a training message
   case class SMS(target: String, fv: mllib.linalg.Vector)
   ```

5. Define the data loader and parser:
   ```scala
   def load(dataFile: String): RDD[Array[String]] = {
     // Load the file, split each line on TABs, and filter out lines with an empty label
     sc.textFile(dataFile).map(l => l.split("\t")).filter(r => !r(0).isEmpty)
   }
   ```
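
   A quick sanity check (hypothetical snippet; each dataset line is a label, a TAB, and the message text):

   ```scala
   // r(0) is the label ("ham"/"spam"), r(1) is the message body
   load(DATAFILE).take(2).foreach(r => println(s"${r(0)} -> ${r(1)}"))
   ```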

6. Define the input messages tokenizer:
   ```scala
   // Tokenizer - for each sentence in the input RDD, it produces a sequence of strings representing the individual interesting words of the sentence
   def tokenize(dataRDD: RDD[String]): RDD[Seq[String]] = {
     // Ignore all useless words
     val ignoredWords = Seq("the", "a", "", "in", "on", "at", "as", "not", "for")
     // Ignore all useless characters
     val ignoredChars = Seq(',', ':', ';', '/', '<', '>', '"', '.', '(', ')', '?', '-', '\'', '!', '0', '1')

     // Invoke the RDD API and transform the input data
     val textsRDD = dataRDD.map( r => {
       // Get rid of all useless characters
       var smsText = r.toLowerCase
       for (c <- ignoredChars) {
         smsText = smsText.replace(c, ' ')
       }
       // Remove empty and uninteresting words
       val words = smsText.split(" ").filter(w => !ignoredWords.contains(w) && w.length > 2).distinct

       words.toSeq
     })
     textsRDD
   }
   ```
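
   For example (hypothetical input), `tokenize(sc.parallelize(Seq("Are you free for the meetup tonight?")))` yields a single `Seq("are", "you", "free", "meetup", "tonight")` - stop words, short words, and punctuation are dropped.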

7. Define the Spark Tf-IDF model builder:
   ```scala
   def buildIDFModel(tokensRDD: RDD[Seq[String]],
                     minDocFreq: Int = 4,
                     hashSpaceSize: Int = 1 << 10): (HashingTF, IDFModel, RDD[mllib.linalg.Vector]) = {
     // Hash strings into the given space
     val hashingTF = new HashingTF(hashSpaceSize)
     val tf = hashingTF.transform(tokensRDD)

     // Build the term frequency-inverse document frequency model
     val idfModel = new IDF(minDocFreq = minDocFreq).fit(tf)
     val expandedTextRDD = idfModel.transform(tf)
     (hashingTF, idfModel, expandedTextRDD)
   }
   ```

   > **Wikipedia** says: "tf–idf, short for term frequency–inverse document frequency, is a numerical statistic that is intended to reflect how important a word is to a document in a collection or corpus. It is often used as a weighting factor in information retrieval and text mining. The tf-idf value increases proportionally to the number of times a word appears in the document, but is offset by the frequency of the word in the corpus, which helps to adjust for the fact that some words appear more frequently in general."

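   For reference, Spark MLlib's `IDF` uses a smoothed variant of the textbook formula, where `m` is the number of documents and `df(t)` is the number of documents containing term `t`:

   ```
   idf(t) = log((m + 1) / (df(t) + 1))
   tfidf(t, d) = tf(t, d) * idf(t)
   ```
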
8. Define the H2O Deep Learning model builder:
   ```scala
   def buildDLModel(trainHF: Frame, validHF: Frame,
                    epochs: Int = 10, l1: Double = 0.001, l2: Double = 0.0,
                    hidden: Array[Int] = Array[Int](200, 200))
                   (implicit h2oContext: H2OContext): DeepLearningModel = {
     import h2oContext._
     // Create algorithm parameters
     val dlParams = new DeepLearningParameters()
     // Name for the target model
     dlParams._model_id = Key.make("dlModel.hex")
     // Training dataset
     dlParams._train = trainHF
     // Validation dataset
     dlParams._valid = validHF
     // Column used as the target for training
     dlParams._response_column = "target"
     // Number of passes over the data
     dlParams._epochs = epochs
     // L1 penalty
     dlParams._l1 = l1
     // L2 penalty
     dlParams._l2 = l2
     // Number and size of hidden layers
     dlParams._hidden = hidden

     // Create a DeepLearning job
     val dl = new DeepLearning(dlParams)
     // And launch it
     val dlModel = dl.trainModel.get

     // Force computation of model metrics on both datasets
     dlModel.score(trainHF).delete()
     dlModel.score(validHF).delete()

     // And return the resulting model
     dlModel
   }
   ```

9. Initialize `H2OContext` and start H2O services on top of Spark:
   ```scala
   // Create SQL support
   import org.apache.spark.sql._
   implicit val sqlContext = SQLContext.getOrCreate(sc)
   import sqlContext.implicits._

   // Start H2O services
   import org.apache.spark.h2o._
   val h2oContext = new H2OContext(sc).start()
   ```

10. Open the H2O UI and verify that H2O is running:
    ```scala
    h2oContext.openFlow
    ```
    > At this point, you can use the H2O UI and check the status of the H2O cloud by typing `getCloud`.

11. Build the final workflow using all the building pieces:
    ```scala
    // Data load
    val dataRDD = load(DATAFILE)
    // Extract the response column from the dataset
    val hamSpamRDD = dataRDD.map( r => r(0))
    // Extract the message from the dataset
    val messageRDD = dataRDD.map( r => r(1))
    // Tokenize the message content
    val tokensRDD = tokenize(messageRDD)

    // Build the IDF model on the tokenized messages
    // It returns
    //   - hashingTF: a hashing function to hash a word into the vector space
    //   - idfModel: a model to transform a hashed sentence into a feature vector
    //   - tfidfRDD: the transformed input messages
    val (hashingTF, idfModel, tfidfRDD) = buildIDFModel(tokensRDD)

    // Merge the response with the extracted feature vectors
    val resultDF = hamSpamRDD.zip(tfidfRDD).map(v => SMS(v._1, v._2)).toDF

    // Publish the Spark DataFrame as an H2OFrame
    val table = h2oContext.asH2OFrame(resultDF, "messages_table")

    // Transform the target column into a categorical column!
    table.replace(table.find("target"), table.vec("target").toCategoricalVec()).remove()
    table.update(null)

    // Split the table into training and validation parts
    val keys = Array[String]("train.hex", "valid.hex")
    val ratios = Array[Double](0.8)
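    // split(...) is a helper imported from DemoUtils above; it splits the frame into parts by the given ratios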
    val frs = split(table, keys, ratios)
    val (train, valid) = (frs(0), frs(1))
    table.delete()

    // Build the final Deep Learning model
    val dlModel = buildDLModel(train, valid)(h2oContext)
    ```

12. Evaluate the model quality:
    ```scala
    // Collect the model metrics and evaluate the model quality
    import water.app.ModelMetricsSupport
    val trainMetrics = ModelMetricsSupport.binomialMM(dlModel, train)
    val validMetrics = ModelMetricsSupport.binomialMM(dlModel, valid)
    println(trainMetrics.auc._auc)
    println(validMetrics.auc._auc)
    ```
    > You can also open the H2O UI and type `getPredictions` to visualize the model's performance, or `getModels` to see the model's output.

13. Create a spam detector:
    ```scala
    // Spam detector
    def isSpam(msg: String,
               dlModel: DeepLearningModel,
               hashingTF: HashingTF,
               idfModel: IDFModel,
               h2oContext: H2OContext,
               hamThreshold: Double = 0.5): String = {
      val msgRDD = sc.parallelize(Seq(msg))
      val msgVector: DataFrame = idfModel.transform(
        hashingTF.transform(
          tokenize(msgRDD))).map(v => SMS("?", v)).toDF
      val msgTable: H2OFrame = h2oContext.asH2OFrame(msgVector)
      msgTable.remove(0) // remove the first column holding the dummy target
      val prediction = dlModel.score(msgTable)
      // The second column of the prediction frame holds P(ham)
      if (prediction.vecs()(1).at(0) < hamThreshold) "SPAM DETECTED!" else "HAM"
    }
    ```
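
    The `hamThreshold` parameter controls how confident the model must be in the `ham` class before a message passes as ham; raising it makes the detector flag more messages as spam. For example (a hypothetical call):

    ```scala
    // Flag anything that is not at least 80% likely to be ham
    isSpam("Free entry in 2 a wkly comp", dlModel, hashingTF, idfModel, h2oContext, hamThreshold = 0.8)
    ```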

14. Try to detect spam:
    ```scala
    // A regular message from a friend - should be classified as ham
    isSpam("Michal, h2oworld party tonight in MV?", dlModel, hashingTF, idfModel, h2oContext)
    // A typical unsolicited offer - should be detected as spam
    isSpam("We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?", dlModel, hashingTF, idfModel, h2oContext)
    ```

15. At this point, you have finished your first Sparkling Water machine learning application. Hack and enjoy! Thank you!