1. 二次排序
Spark二次排序,即组装一个新的key并在这个key里实现排序接口所定义的方法。
例如一组数据:(点击次数,下单次数,支付次数) A:(30,35,40) B:(35,35,40) C:(30,38,40) D:(35,35,45)
需要分别对点击次数,下单次数,支付次数做比较。比较完35【点击次数】相等,则要对【下单次数】二次比较,若【下单次数】还是相等,则要对【支付次数再次比较】直到返回正确比较结果。
二次排序即需要自定义key 以及比较方法并返回比较结果。
2.Java代码实现如下:
import java.io.Serializable;
import scala.math.Ordered;
/**
* 品类二次排序key
*
* 封装你要排序的那几个字段
*
* 实现ordered接口要求的几个方法
*
* 跟其他key相比,如何来判定大于,大于等于,小于,小于等于
*
* 必须实现序列化接口(否则会报错)
*
* @author lxh
*
*/
public class CategorySortKey implements Ordered<CategorySortKey> ,Serializable{
private long clickCount;
private long orderCount;
private long payCount;
public CategorySortKey(long clickCount, long orderCount, long payCount) {
super();
this.clickCount = clickCount;
this.orderCount = orderCount;
this.payCount = payCount;
}
/**
* 大于
*/
@Override
public boolean $greater(CategorySortKey other) {
if(clickCount > other.getClickCount()){
return true;
}else if(clickCount == other.getClickCount() &&
orderCount>other.getOrderCount()){
return true;
}else if(clickCount == other.getClickCount() &&
orderCount == other.getOrderCount() &&
payCount > other.getPayCount()){
return true;
}
return false;
}
/**
* 大于等于
*/
@Override
public boolean $greater$eq(CategorySortKey other) {
if($greater(other)){
return true;
}else if(clickCount == other.getClickCount() &&
orderCount == other.getOrderCount() &&
payCount == other.getPayCount()){
return true;
}
return false;
}
/**
* 小于
*/
@Override
public boolean $less(CategorySortKey other) {
if(clickCount < other.getClickCount()){
return true;
}else if(clickCount == other.getClickCount() &&
orderCount < other.getOrderCount()){
return true;
}else if(clickCount == other.getClickCount() &&
orderCount == other.getOrderCount() &&
payCount < other.getPayCount()){
return true;
}
return false;
}
/**
* 小于等于
*/
@Override
public boolean $less$eq(CategorySortKey other) {
if($less(other)){
return true;
}else if(clickCount == other.getClickCount() &&
orderCount == other.getOrderCount() &&
payCount == other.getPayCount()){
return true;
}
return false;
}
@Override
public int compare(CategorySortKey other) {
if(clickCount-other.getClickCount()!=0){
return (int)(clickCount-other.getClickCount());
}else if(orderCount - other.getOrderCount()!=0){
return (int)(orderCount - other.getOrderCount());
}else if(payCount - other.getPayCount()!=0){
return (int)(payCount - other.getPayCount());
}
return 0;
}
@Override
public int compareTo(CategorySortKey other) {
if(clickCount-other.getClickCount()!=0){
return (int)(clickCount-other.getClickCount());
}else if(orderCount - other.getOrderCount()!=0){
return (int)(orderCount - other.getOrderCount());
}else if(payCount - other.getPayCount()!=0){
return (int)(payCount - other.getPayCount());
}
return 0;
}
public final long getClickCount() {
return clickCount;
}
public final void setClickCount(long clickCount) {
this.clickCount = clickCount;
}
public final long getOrderCount() {
return orderCount;
}
public final void setOrderCount(long orderCount) {
this.orderCount = orderCount;
}
public final long getPayCount() {
return payCount;
}
public final void setPayCount(long payCount) {
this.payCount = payCount;
}
}
3.Scala代码实现:
/**
* 自定义的key
*
* @author lxh
*
*/
class SessionSortKey(val clickCount:Int, val orderCount:Int,val payCount:Int) extends Ordered[SessionSortKey] with Serializable{
def compare(other:SessionSortKey):Int = {
if(this.clickCount - other.clickCount != 0){
this.clickCount - other.clickCount
}else if(this.orderCount - other.orderCount!= 0){
this.orderCount - other.orderCount
}else if( this.payCount - other.payCount != 0){
this.payCount - other.payCount
}else{
/**
* 一定要有else!!!
*/
0
}
}
}
Scala测试类:
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by lxh on 2016/8/17.
*/
object SessionSortKeyTest {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("sortKeyTest").setMaster("local")
val sc = new SparkContext(conf)
val ar = Array(new Tuple2(new SessionSortKey(30,35,40),"1"),
new Tuple2(new SessionSortKey(35,35,40),"2"),
new Tuple2(new SessionSortKey(30,38,40),"3"))
val rdd = sc.parallelize(ar,10)
val sortedRdd = rdd.sortByKey(false)
for(tuple <- sortedRdd.collect()){
println(tuple._2)
}
/**
* 输出结果
* 2
* 3
* 1
* 正确
*/
}
}