Spark Secondary Sort: Study Notes

Posted by luoxuehuan on August 17, 2016

1. Secondary sort

A secondary sort in Spark means assembling a new key and implementing, on that key, the methods defined by the ordering interface.

For example, take records of the form (click count, order count, pay count): A:(30,35,40) B:(35,35,40) C:(30,38,40) D:(35,35,45)

The records are compared on click count, order count, and pay count in turn. If two click counts are equal (B and D both have 35 clicks, for instance), the order counts are compared next; if the order counts are also equal, the pay counts are compared, until a definite result is returned.

A secondary sort therefore requires a custom key together with comparison methods that return the comparison result.
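To make the cascade concrete, here is a minimal sketch in plain Scala (no Spark) that applies the same click -> order -> pay comparison to the sample records above; the Stats case class and field names are illustrative only, not part of the implementations shown later.

case class Stats(click: Int, order: Int, pay: Int)

// The same three-level cascade: click count first, then order count, then pay count.
def compare(a: Stats, b: Stats): Int =
  if (a.click != b.click) a.click - b.click
  else if (a.order != b.order) a.order - b.order
  else a.pay - b.pay

val data = Seq("A" -> Stats(30, 35, 40), "B" -> Stats(35, 35, 40),
               "C" -> Stats(30, 38, 40), "D" -> Stats(35, 35, 45))

// Sort descending with the cascade and print the labels: D > B > C > A
val sorted = data.sortWith { case ((_, x), (_, y)) => compare(x, y) > 0 }
println(sorted.map(_._1).mkString(" > "))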

2. Java implementation:


import java.io.Serializable;

import scala.math.Ordered;

/**
 * Secondary-sort key for categories.
 * 
 * Wraps the fields you want to sort on and implements the methods required
 * by the Ordered interface: how this key compares to another key
 * (greater than, greater than or equal, less than, less than or equal).
 * 
 * The key must also implement Serializable, otherwise an error is thrown.
 * 
 * @author lxh
 *
 */
public class CategorySortKey implements Ordered<CategorySortKey>, Serializable {

	private long clickCount;
	private long orderCount;
	private long payCount;

	public CategorySortKey(long clickCount, long orderCount, long payCount) {
		this.clickCount = clickCount;
		this.orderCount = orderCount;
		this.payCount = payCount;
	}


	/**
	 * Greater than
	 */
	@Override
	public boolean $greater(CategorySortKey other) {
		
		if(clickCount > other.getClickCount()){
			return true;
		}else if(clickCount == other.getClickCount() &&
				orderCount>other.getOrderCount()){
			return true;
		}else if(clickCount == other.getClickCount() &&
				orderCount == other.getOrderCount() &&
				payCount > other.getPayCount()){
			return true;
		}
		return false;
	}

	
	/**
	 * Greater than or equal
	 */
	@Override
	public boolean $greater$eq(CategorySortKey other) {
		if($greater(other)){
			return true;
		}else if(clickCount == other.getClickCount() &&
				orderCount == other.getOrderCount() &&
				payCount == other.getPayCount()){
			return true;
		}
		return false;
	}

	/**
	 * Less than
	 */
	@Override
	public boolean $less(CategorySortKey other) {
		if(clickCount < other.getClickCount()){
			return true;
		}else if(clickCount == other.getClickCount() &&
				orderCount < other.getOrderCount()){
			return true;
		}else if(clickCount == other.getClickCount() &&
				orderCount == other.getOrderCount() &&
				payCount < other.getPayCount()){
			return true;
		}
		return false;
	}

	/**
	 * Less than or equal
	 */
	@Override
	public boolean $less$eq(CategorySortKey other) {
		if($less(other)){
			return true;
		}else if(clickCount == other.getClickCount() &&
				orderCount == other.getOrderCount() &&
				payCount == other.getPayCount()){
			return true;
		}
		return false;
	}

	@Override
	public int compare(CategorySortKey other) {
		// Compare click count first, then order count, then pay count.
		// Long.compare avoids the overflow that casting a long difference to int could cause.
		if (clickCount != other.getClickCount()) {
			return Long.compare(clickCount, other.getClickCount());
		} else if (orderCount != other.getOrderCount()) {
			return Long.compare(orderCount, other.getOrderCount());
		} else if (payCount != other.getPayCount()) {
			return Long.compare(payCount, other.getPayCount());
		}
		return 0;
	}

	@Override
	public int compareTo(CategorySortKey other) {
		// Ordered extends Comparable, so compareTo simply delegates to compare.
		return compare(other);
	}

	public final long getClickCount() {
		return clickCount;
	}

	public final void setClickCount(long clickCount) {
		this.clickCount = clickCount;
	}

	public final long getOrderCount() {
		return orderCount;
	}

	public final void setOrderCount(long orderCount) {
		this.orderCount = orderCount;
	}

	public final long getPayCount() {
		return payCount;
	}

	public final void setPayCount(long payCount) {
		this.payCount = payCount;
	}

}
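The original post only shows a driver for the Scala key, so here is a hedged sketch of how this Java CategorySortKey could be driven from a Scala Spark job. It assumes the class is on the classpath and relies on scala.math.Ordered extending Comparable, which lets Spark resolve the implicit Ordering that sortByKey needs; the object name and sample values are illustrative.

import org.apache.spark.{SparkConf, SparkContext}

object CategorySortKeyTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("categorySortKeyTest").setMaster("local")
    val sc = new SparkContext(conf)

    // CategorySortKey can serve directly as the key of a pair RDD because it
    // implements Ordered[CategorySortKey] (and Serializable).
    val pairs = sc.parallelize(Seq(
      (new CategorySortKey(30, 35, 40), "A"),
      (new CategorySortKey(35, 35, 40), "B"),
      (new CategorySortKey(30, 38, 40), "C")))

    // false => descending order; this should print B, C, A
    pairs.sortByKey(false).collect().foreach(t => println(t._2))

    sc.stop()
  }
}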

3. Scala implementation:

/**
  * Custom secondary-sort key.
  *
  * @author lxh
  */
class SessionSortKey(val clickCount: Int, val orderCount: Int, val payCount: Int) extends Ordered[SessionSortKey] with Serializable {


  def compare(other: SessionSortKey): Int = {
    if (this.clickCount != other.clickCount) {
      this.clickCount - other.clickCount
    } else if (this.orderCount != other.orderCount) {
      this.orderCount - other.orderCount
    } else if (this.payCount != other.payCount) {
      this.payCount - other.payCount
    } else {
      // The final else branch is required so that every path returns an Int.
      0
    }
  }
}

Scala test class:

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by lxh on 2016/8/17.
  */
object SessionSortKeyTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("sortKeyTest").setMaster("local")

    val sc = new SparkContext(conf)

    val ar = Array(
      (new SessionSortKey(30, 35, 40), "1"),
      (new SessionSortKey(35, 35, 40), "2"),
      (new SessionSortKey(30, 38, 40), "3"))

    val rdd = sc.parallelize(ar, 10)
    // false => sort the keys in descending order
    val sortedRdd = rdd.sortByKey(false)

    for(tuple <- sortedRdd.collect()){
      println(tuple._2)
    }

    /**
      * Output:
      * 2
      * 3
      * 1
      * which is the expected descending order.
      */
  }

}
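As a closing alternative: when the key carries no behaviour beyond field-by-field comparison, the custom class can be skipped entirely, because Scala already provides a lexicographic Ordering for tuples, which matches the click -> order -> pay cascade. A minimal sketch (object name and values are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object TupleSortKeyTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("tupleSortKeyTest").setMaster("local"))

    // Plain (click, order, pay) tuples as keys: the built-in Tuple3 Ordering
    // compares the fields left to right, giving the same secondary sort.
    val rdd = sc.parallelize(Seq(
      ((30, 35, 40), "1"),
      ((35, 35, 40), "2"),
      ((30, 38, 40), "3")))

    rdd.sortByKey(false).collect().foreach(t => println(t._2)) // prints 2, 3, 1

    sc.stop()
  }
}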