Usage of GroupBy and Join in Apache Spark

15 Sep 2014 by Mohit Garg

Using groupBy and join is often quite challenging. Recently, in one of the POCs for the MEAN project, I used groupBy and join in Apache Spark.

I had two datasets in HDFS, one for sales and the other for products.

Sales dataset columns: Sales Id, Version, Brand Name, Product Id, No. of Items Purchased, Purchased Date

Product dataset columns: Product Id, Version, Brand Name, Category, Price, Product Name, Weight

I wanted to calculate the total sales by year. For this, I joined the two datasets (sales and product) on productId, and then applied groupBy on the joined dataset by year to calculate the total sales per year.

For this, you need to create a database named sales and import the sales.sql file into MySQL.

You then need to use Sqoop to import the data into HDFS.

Command: sqoop import-all-tables --connect jdbc:mysql://localhost/sales --username root --warehouse-dir /user/data/input-data/user/

For sales, the HDFS location is: /user/data/input-data/user/sale

For products, the HDFS location is: /user/data/input-data/user/product

Then you can use the following code for the join and groupBy.
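The code relies on three small POJOs, Product, Sale, and ProductSale, which are not shown in this post; the full versions are in the GitHub repo linked at the end. Here is a minimal sketch of what they look like, inferred from the getters and setters used in the code below (the field names are assumptions):

import java.io.Serializable;
import java.util.Date;

// Sketch only: fields inferred from the getters/setters used in TotalSales.
// Each class needs the usual getters and setters (omitted here for brevity)
// and must be Serializable so Spark can ship its instances between nodes.
class Product implements Serializable {
    private Long productId;
    private String brandName;
    private String category;
    private int price;
    private String productName;
    private String weight;
}

class Sale implements Serializable {
    private Long productId;
    private String brandName;
    private long itemPurchased;
    private Date purchaseDate;
}

class ProductSale implements Serializable {
    private String productId;   // key from the join, kept as a String
    private Product product;
    private Sale sale;
}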

package com.spark.test;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Created by mohit on 3/9/14.
 */
public class TotalSales {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String args[]) {
        JavaSparkContext ctx = new JavaSparkContext("local[*]", "TotalSales", System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(TotalSales.class));
        final Calendar c = Calendar.getInstance();

        JavaPairRDD<String, Product> productJavaPairRDD = fetchProductData(ctx);
        JavaPairRDD<String, Sale> saleJavaPairRDD = fetchSalesData(ctx);
        JavaPairRDD<String, Tuple2<Product, Sale>> joinData = productJavaPairRDD.join(saleJavaPairRDD);
        JavaRDD<ProductSale> productSaleMap = fetchFlatMap(joinData);
        JavaPairRDD<Object, Iterable<ProductSale>> groupMap = productSaleMap.groupBy(new Function<ProductSale, Object>() {
            @Override
            public Object call(ProductSale productSale) throws Exception {
                c.setTime(productSale.getSale().getPurchaseDate());
                return c.get(Calendar.YEAR);
            }
        });

        JavaPairRDD<Object, Long> totalSaleData = groupMap.mapValues(new Function<Iterable<ProductSale>, Long>() {
            @Override
            public Long call(Iterable<ProductSale> productSales) throws Exception {
                Long sumData = 0L;
                for (ProductSale productSale : productSales) {
                    sumData = sumData + (productSale.getProduct().getPrice() * productSale.getSale().getItemPurchased());
                }
                return sumData;
            }
        });

        List<Tuple2<Object, Long>> collectData = totalSaleData.sortByKey().collect();
        System.out.println("Collect DAta:::::"+collectData);

        ctx.stop();
    }

    static JavaRDD<ProductSale> fetchFlatMap(JavaPairRDD<String, Tuple2<Product, Sale>> joinData) {
        JavaRDD<ProductSale> productSaleMap = joinData.flatMap(new FlatMapFunction<Tuple2<String, Tuple2<Product, Sale>>, ProductSale>() {
            @Override
            public Iterable<ProductSale> call(Tuple2<String, Tuple2<Product, Sale>> tuple) throws Exception {
                ProductSale productSale = new ProductSale();
                productSale.setProductId(tuple._1());
                productSale.setSale(tuple._2()._2());
                productSale.setProduct(tuple._2()._1());
                List<ProductSale> productSaleList = new ArrayList<ProductSale>();
                productSaleList.add(productSale);
                return productSaleList;
            }
        });
        return productSaleMap;
    }

    static JavaPairRDD<String, Product> fetchProductData(JavaSparkContext ctx) {

        JavaRDD<String> lines = ctx.textFile("hdfs://localhost:9000/user/data/input-data/user/product/part-*", 1);

        JavaRDD<String[]> splitMap = lines.map(new Function<String, String[]>() {
            @Override
            public String[] call(String s) throws Exception {
                return s.split("\t");
            }
        });

        JavaPairRDD<String, Product> mapKey = splitMap.mapToPair(new PairFunction<String[], String, Product>() {
            @Override
            public Tuple2<String, Product> call(String[] strings) throws Exception {
                String[] dataArray = strings[0].split(",");
                Product product = new Product();
                product.setProductId(Long.parseLong(dataArray[0]));
                product.setBrandName(dataArray[2]);
                product.setCategory(dataArray[3]);
                product.setPrice(Integer.parseInt(dataArray[4]));
                product.setProductName(dataArray[5]);
                product.setWeight(dataArray[6]);
                return new Tuple2<String, Product>(dataArray[0], product);
            }
        });
        return mapKey;
    }

    static JavaPairRDD<String, Sale> fetchSalesData(JavaSparkContext ctx) {
        JavaRDD<String> salesLines = ctx.textFile("hdfs://localhost:9000/user/data/input-data/user/sale/part-*", 1);

        JavaRDD<String[]> salesLineMap = salesLines.map(new Function<String, String[]>() {
            @Override
            public String[] call(String s) throws Exception {
                return s.split("\t");
            }
        });

        JavaPairRDD<String, Sale> salesMapKey = salesLineMap.mapToPair(new PairFunction<String[], String, Sale>() {
            @Override
            public Tuple2<String, Sale> call(String[] strings) throws Exception {
                String[] dataArray = strings[0].split(",");
                String date_s = dataArray[5];
                SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                Date date = dt.parse(date_s);
                Sale sale = new Sale();
                sale.setProductId(Long.parseLong(dataArray[4]));
                sale.setBrandName(dataArray[2]);
                sale.setItemPurchased(Long.parseLong(dataArray[3]));
                sale.setPurchaseDate(date);
                return new Tuple2<String, Sale>(dataArray[4], sale);
            }
        });
        return salesMapKey;
    }
}

This line joins the product and sale data on the basis of productId:

 JavaPairRDD<String, Tuple2<Product, Sale>> joinData = productJavaPairRDD.join(saleJavaPairRDD);
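To sanity-check the join output, you can pull a few joined records back to the driver and print them (a hypothetical debugging snippet, not part of the original code); the key is the productId string and the value is a (Product, Sale) tuple:

for (Tuple2<String, Tuple2<Product, Sale>> tuple : joinData.take(5)) {
    // tuple._1() is the productId; tuple._2() holds the matched Product and Sale
    System.out.println("productId=" + tuple._1()
            + ", product=" + tuple._2()._1().getProductName()
            + ", itemsPurchased=" + tuple._2()._2().getItemPurchased());
}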

For GroupBy:

JavaPairRDD<Object, Iterable<ProductSale>> groupMap = productSaleMap.groupBy(new Function<ProductSale, Object>() {
            @Override
            public Object call(ProductSale productSale) throws Exception {
                c.setTime(productSale.getSale().getPurchaseDate());
                return c.get(Calendar.YEAR);
            }
        });
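Note that groupBy collects every ProductSale for a year into an Iterable before the sum is computed. Since all we need here is a running total, an alternative sketch (not what this POC uses) is to map each record to a (year, amount) pair and aggregate with reduceByKey, which combines values on the map side before the shuffle:

JavaPairRDD<Integer, Long> totalSaleByYear = productSaleMap.mapToPair(
        new PairFunction<ProductSale, Integer, Long>() {
            @Override
            public Tuple2<Integer, Long> call(ProductSale productSale) throws Exception {
                // key: year of purchase, value: price * quantity for this record
                Calendar cal = Calendar.getInstance();
                cal.setTime(productSale.getSale().getPurchaseDate());
                long amount = productSale.getProduct().getPrice()
                        * productSale.getSale().getItemPurchased();
                return new Tuple2<Integer, Long>(cal.get(Calendar.YEAR), amount);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            @Override
            public Long call(Long a, Long b) throws Exception {
                return a + b;
            }
        });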

For the complete source code:

GitHub location: git@github.com:IntelliGrape/bigdata-poc.git

I hope this gives you a better idea of join and groupBy in Apache Spark.
