本文共 8771 字,大约阅读时间需要 29 分钟。
需求:
对于有复杂排序条件的需求,可以利用自定义排序来实现,同时可以使用多种方案实现自定义排序需求。
对指定的数据(字段分别为:名称 年龄 颜值,数据以空格分割),按照指定的要求排序,排序要求为:根据颜值降序,如果颜值相同,再按照年龄升序排序。
示例数据:
"pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999"
1.2 方案一:利用类或者样例类来封装数据
把数据封装成类或者case class,然后类继承Ordered[类型] ,然后可以自定义排序规则。
如果是class,需要实现序列化特质,Serializable,如果是case class,可以不实现该序列化特质。
这种处理方式,返回值类型是类的实例对象。
普通类:
objectSortDemo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val data: RDD[String] = sc.parallelize(List("wx 28 85 wx.zfb.com", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999")) // 获得的数据类型是Person val prdd: RDD[Person] = data.map(t => { val strings = t.split(" ") val name = strings(0) val age = strings(1).toInt val fv = strings(2).toInt new Person(name, age, fv) }) // sortBy val sortedrd:RDD[Person] = prdd.sortBy(t => t) sortedrd.foreach(println) } } class Person(val name: String, val age: Int, val fv: Int) extends Serializable withOrdered[Person] { override def compare(that: Person): Int = { // 根据颜值降序 如果颜值相同 再按照年龄的升序 if (this.fv == that.fv) { this.age - that.age } else { that.fv - this.fv } } override def toString: String = s"$name,$age,$fv" }
样例类:
objectSortDemo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val data: RDD[String] = sc.parallelize(List("wx 28 85 wx.zfb.com", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999")) // 获得的数据类型是Person val prdd: RDD[Person] = data.map(t => { val strings = t.split(" ") val name = strings(0) val age = strings(1).toInt val fv = strings(2).toInt Person(name, age, fv) }) // sortBy val sortedrd:RDD[Person] = prdd.sortBy(t => t) sortedrd.foreach(println) } } case class Person(val name: String, val age: Int, val fv: Int) extends Ordered[Person] { override def compare(that: Person): Int = { // 根据颜值降序 如果颜值相同 再按照年龄的升序 if (this.fv == that.fv) { this.age - that.age } else { that.fv - this.fv } } override def toString: String = s"$name,$age,$fv" }
1.3 方案二:利用class或者case class指定排序规则
对原始数据不进行封装,仅仅在排序的时候,利用class或者case class指定排序的规则。
如果使用类,需要继承Ordered[类型],实现序列化特质,
如果使用case class,不需实现序列化特质。
返回值的结果类型:还是原来的数据类型。和类本身无关。仅仅是利用类的规则来实现了排序。
objectSortDemo2 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val data: RDD[String] = sc.parallelize(List("wx 28 85 ", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999")) // 获得的数据类型是 元组 val prdd = data.map(t => { val strings = t.split(" ") val name = strings(0) val age = strings(1).toInt val fv = strings(2).toInt (name, age, fv) }) // sortBy 在排序的时候 指定使用类的规则 利用已经存在的来来指定排序规则 // val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => new Person2(t._1,t._2,t._3)) val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => Person2(t._1,t._2,t._3)) sortedrd.foreach(println) } } /* class Person2(val name: String, val age: Int, val fv: Int) extends Serializable with Ordered[Person2] { override def compare(that: Person2): Int = { // 根据颜值降序 如果颜值相同 再按照年龄的升序 if (this.fv == that.fv) { this.age - that.age } else { that.fv - this.fv } } override def toString: String = s"$name,$age,$fv" } */ case class Person2(val name: String, val age: Int, val fv: Int) extends Ordered[Person2] { override def compare(that: Person2): Int = { // 根据颜值降序 如果颜值相同 再按照年龄的升序 if (this.fv == that.fv) { this.age - that.age } else { that.fv - this.fv } } override def toString: String = s"$name,$age,$fv" }
1.4 方案三:利用隐式转换
利用隐式转换时,类可以不实现Ordered的特质,普通的类或者普通的样例类即可。
隐式转换支持,隐式方法,隐式函数,隐式的object和隐式的变量,
如果都同时存在,优先使用隐式的object,隐式方法和隐式函数中,会优先使用隐式函数。
隐式转换可以写在任意地方(当前对象中,外部的类中,外部的对象中),如果写在外部,需要导入到当前的对象中即可。
objectSortDemo3 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val data: RDD[String] = sc.parallelize(List("wx 28 85 ", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999")) // 获得的数据类型是 元组 val prdd = data.map(t => { val strings = t.split(" ") val name = strings(0) val age = strings(1).toInt val fv = strings(2).toInt (name, age, fv) }) // 如果类没有继承 Ordered 特质 // 可以利用隐式转换 隐式方法 隐式函数 隐式值 隐式object都可以 implicit ord: Ordering[K] implicit def ordMethod(p: Person3): Ordered[Person3] = new Ordered[Person3] { override def compare(that: Person3): Int = { if (p.fv == that.fv) { -(p.age - that.age) } else { that.fv - p.fv } } } implicit val ordFunc = (p: Person3) => new Ordered[Person3] { override def compare(that: Person3): Int = { if (p.fv == that.fv) { -(p.age - that.age) } else { that.fv - p.fv } } } // 隐式的Object 优先级更高 implicit object ord extends Ordering[Person3] { override def compare(x: Person3, y: Person3): Int = { if (x.fv == y.fv) { x.age - y.age } else { y.fv - x.fv } } } // 隐式的变量 /* implicit val ord2: Ordering[Person3] = new Ordering[Person3] { override def compare(x: Person3, y: Person3): Int = { // 颜值相同 降序 if (x.fv == y.fv) { - (x.age - y.age) } else { y.fv - x.fv } } }*/ // 如果把隐式转换写在其他的object中,就使用import a._ 如果是写在其他的类中,val obj = new 类() import obj._ val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => new Person3(t._1, t._2, t._3)) sortedrd.foreach(println) } } class Person3(val name: String, val age: Int, val fv: Int) extends Serializable { override def toString: String = s"$name,$age,$fv" }
在隐式转换中,Ordered和Ordering都是可以相互转换的。
1.5 方案四:利用Ordering的on方法
无需借助任何的类或者对象
只需要利用Ordering特质的on方法即可。
object SortDemo4 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val data: RDD[String] = sc.parallelize(List("wx 28 85", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999")) // 获得的数据类型是 元组 val prdd = data.map(t => { val strings = t.split(" ") val name = strings(0) val age = strings(1).toInt val fv = strings(2).toInt (name, age, fv) }) /** t=>(-t._3,t._2) 具体的排序的规则 * Ordering[T].on[U](f) * U (String,Int,Int) 原始的数据类型 * T (Int,Int) 具体的函数的返回值的类型 */ implicit val obj = Ordering[(Int,Int)].on[(String,Int,Int)](t=>(-t._3,t._2)) val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => t) sortedrd.foreach(println) } }
1.6 方案五:利用元组封装排序条件
最简单的实现方案,直接利用元组来封装要排序的条件,默认升序,降序使用-号即可
object SortDemo5 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val data: RDD[String] = sc.parallelize(List("wx 28 85 ", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999")) // 获得的数据类型是 元组 val prdd = data.map(t => { val strings = t.split(" ") val name = strings(0) val age = strings(1).toInt val fv = strings(2).toInt (name, age, fv) }) // 利用元组直接封装要排序的条件 val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => (-t._3,-t._2)) sortedrd.foreach(println) } }
转载地址:http://nykni.baihongyu.com/