This section uses local development in IntelliJ IDEA: create a new sbt project and call the LPA interface to transform an open-source dataset into community data. The build.sbt is configured as follows.
name := "LPA_Example" version := "0.1" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.3.2" withSources(); libraryDependencies += "org.apache.spark" % "spark-graphx_2.11" % "2.3.2";
Copy the following code into the LPA.scala file.
import org.apache.spark.graphx.lib.LabelPropagation
import org.apache.spark.graphx.{Graph, PartitionStrategy}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object LPA {
  // Read an edge list, skipping "#" comment lines; each edge is normalized to
  // (smaller id, larger id) and deduplicated so the graph has no parallel edges.
  def readFile(sc: SparkContext, inputPath: String, split: String, partitionNum: Int): RDD[(Long, Long)] = {
    sc.textFile(inputPath, partitionNum)
      .flatMap(line => {
        if (line.startsWith("#")) {
          Iterator.empty
        } else {
          val x = line.split(split)
          if (x(0).toLong > x(1).toLong) {
            Iterator((x(1).toLong, x(0).toLong))
          } else {
            Iterator((x(0).toLong, x(1).toLong))
          }
        }
      }).distinct()
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("LPA").setMaster("yarn")
    val sc = new SparkContext(sparkConf)

    var inputPath = " "
    var split = " "
    var outputPath = " "
    var partitionNum = 1
    if (args.length == 4) {
      inputPath = args(0)
      split = args(1)
      outputPath = args(2)
      partitionNum = args(3).toInt
      println("InputPath: " + inputPath + "\noutputPath: " + outputPath + "\npartitionNum: " + partitionNum)
    } else {
      println("Usage: <inputPath:String> <split:String> <outputPath:String> <partitionNum:Int>")
      sc.stop()
      sys.exit(-1)
    }

    val inputRdd = readFile(sc, inputPath, split, partitionNum)
    // Build a graph from the deduplicated edge tuples and run LPA for 10 iterations;
    // the result pairs each vertex id with the label of the community it joined.
    val graph = Graph.fromEdgeTuples(inputRdd, 0).partitionBy(PartitionStrategy.EdgePartition2D)
    val result = LabelPropagation.run(graph, 10).vertices.repartition(partitionNum)

    println("start to write file to hdfs")
    result.saveAsTextFile(outputPath)
    sc.stop()
  }
}
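To see what the job produces without a YARN cluster, here is a minimal local sketch. It is not part of the project above: the object name LPALocalDemo, the sample edges, and the local[*] master are illustrative assumptions. It runs the same LabelPropagation pipeline on a tiny in-memory edge list and prints the (vertexId, communityLabel) pairs that LPA.main would instead save to HDFS.

// Minimal local sketch (assumed names and data): runs the same LPA pipeline
// on a small in-memory edge list and prints each vertex's community label.
import org.apache.spark.graphx.lib.LabelPropagation
import org.apache.spark.graphx.{Graph, PartitionStrategy}
import org.apache.spark.{SparkConf, SparkContext}

object LPALocalDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LPALocalDemo").setMaster("local[*]"))
    // Two triangles joined by a single bridge edge: LPA typically
    // converges to one label per triangle.
    val edges = sc.parallelize(Seq(
      (1L, 2L), (2L, 3L), (1L, 3L), // community A
      (4L, 5L), (5L, 6L), (4L, 6L), // community B
      (3L, 4L)                      // bridge
    ))
    val graph = Graph.fromEdgeTuples(edges, 0).partitionBy(PartitionStrategy.EdgePartition2D)
    // Each vertex ends up labeled with a community id (a vertex id from that community).
    LabelPropagation.run(graph, 10).vertices.collect().sorted.foreach {
      case (id, label) => println(s"vertex $id -> community $label")
    }
    sc.stop()
  }
}

When submitting the real job, pass the four arguments in order: inputPath, split, outputPath, partitionNum. The input is expected to be a plain edge list with one vertex pair per line, where lines beginning with "#" are comments (the convention used by open datasets such as those from SNAP).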
The file directory structure is as follows.