/*
 * Advanced Analytics with Spark by Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills (O'Reilly).
 * Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills. [ISBN].
 */

import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

// Read the KDD Cup data and inspect the label distribution
val rawData = sc.textFile("kddcup.data")
rawData.map(_.split(',').last).countByValue().toSeq.sortBy(_._2).reverse.foreach(println)

// Pre-processing: drop the three categorical columns, split off the label,
// and turn the remaining numeric fields into a Vector
val labelsAndData = rawData.map { line =>
  val buffer = line.split(',').toBuffer
  buffer.remove(1, 3)
  val label = buffer.remove(buffer.length - 1)
  val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
  (label, vector)
}
val data = labelsAndData.values.cache()

// Perform the cluster analysis
val numClusters = 20
val numIterations = 5
val model = KMeans.train(data, numClusters, numIterations)

// Print cluster centers
model.clusterCenters.foreach(println)

// Apply the cluster model: count how many points of each label fall in each cluster
val clusterLabelCount = labelsAndData.map { case (label, datum) =>
  val cluster = model.predict(datum)
  (cluster, label)
}.countByValue()
clusterLabelCount.toSeq.sorted.foreach { case ((cluster, label), count) =>
  println(f"$cluster%1s$label%18s$count%8s")
}

// Build the full and a 5% sampled cluster-assignment datasets;
// write the sample as Hadoop-style output directories
val sample = data.map(datum =>
  model.predict(datum) + "," + datum.toArray.mkString(",")
).sample(false, 0.05)
val total = data.map(datum =>
  model.predict(datum) + "," + datum.toArray.mkString(",")
)
sample.saveAsTextFile("sample")
sample.coalesce(1).saveAsTextFile("sample_coalesce")

// Choose k: score a clustering by the mean distance to the nearest centroid
def distance(a: Vector, b: Vector) =
  math.sqrt(a.toArray.zip(b.toArray).map(p => p._1 - p._2).map(d => d * d).sum)

def distToCentroid(datum: Vector, model: KMeansModel) = {
  val cluster = model.predict(datum)
  val centroid = model.clusterCenters(cluster)
  distance(centroid, datum)
}

def clusteringScore(data: RDD[Vector], k: Int): Double = {
  val kmeans = new KMeans()
  kmeans.setK(k)
  //kmeans.setRuns(10)
  kmeans.setEpsilon(1.0e-6)
  val model = kmeans.run(data)
  data.map(datum => distToCentroid(datum, model)).mean()
}

(5 to 50 by 5).map(k => (k, clusteringScore(data, k))).foreach(println)

// Entropy: evaluate a clustering against the labels when they are available
// (not the real, unlabeled case); lower is better
def entropy(counts: Iterable[Int]) = {
  val values = counts.filter(_ > 0)
  val n: Double = values.sum
  values.map { v =>
    val p = v / n
    -p * math.log(p)
  }.sum
}

val model = KMeans.train(data, 150, numIterations)
// Predict a cluster for each datum
val labelsAndClusters = labelsAndData.mapValues(model.predict)
// Swap keys / values
val clustersAndLabels = labelsAndClusters.map(_.swap)
// Extract collections of labels, per cluster
val labelsInCluster = clustersAndLabels.groupByKey().values
// Count labels in collections
val labelCounts = labelsInCluster.map(_.groupBy(l => l).map(_._2.size))
// Average entropy weighted by cluster size
val n = labelsAndData.count()
labelCounts.map(m => m.sum * entropy(m)).sum / n
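
// A minimal follow-on sketch (not part of the listing above): one way to use the
// fitted model and distToCentroid for anomaly detection is to rank points by their
// distance to the nearest centroid and flag the farthest ones. The choice of the
// 100th-farthest distance as a threshold is an arbitrary, illustrative assumption.
val distances = data.map(datum => distToCentroid(datum, model))
val threshold = distances.top(100).last
val anomalies = data.filter(datum => distToCentroid(datum, model) > threshold)
anomalies.take(10).foreach(println)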