{"id":15533,"date":"2014-09-15T17:08:23","date_gmt":"2014-09-15T11:38:23","guid":{"rendered":"http:\/\/www.tothenew.com\/blog\/?p=15533"},"modified":"2016-11-29T16:55:22","modified_gmt":"2016-11-29T11:25:22","slug":"how-to-use-groupby-and-join-in-apache-spark","status":"publish","type":"post","link":"https:\/\/www.tothenew.com\/blog\/how-to-use-groupby-and-join-in-apache-spark\/","title":{"rendered":"Usage of GroupBy and Join in Apache Spark"},"content":{"rendered":"<p>Using groupBy and join is often challenging. Recently, in one of the POCs of a <a title=\"MEAN web development\" href=\"http:\/\/www.tothenew.com\/mean-stack-web-development-consulting\" target=\"_blank\">MEAN project<\/a>, I used groupBy and join in Apache Spark.<\/p>\n<p>I had two datasets in HDFS: one for sales and the other for products.<\/p>\n<p>Sales dataset columns: Sale Id, Version, Brand Name, Product Id, No. of Items Purchased, Purchase Date<\/p>\n<p>Product dataset columns: Product Id, Version, Brand Name, Category, Price, Product Name, Weight<\/p>\n<p>I wanted to calculate the total sales by year. To do this, I joined the two datasets, i.e. sales and product, on productId. 
Then I used groupBy on the joined dataset, keyed by year, to calculate the total sales per year.<\/p>\n<p>For this, you need to create a <a href=\"http:\/\/www.tothenew.com\/blog\/overview-of-database-testing\/\">database<\/a> named sales and import the sales.sql file into MySQL.<\/p>\n<p>Then use Sqoop to import the data into HDFS.<\/p>\n<p>Command: sqoop import-all-tables --connect jdbc:mysql:\/\/localhost\/sales --username root --warehouse-dir \/user\/data\/input-data\/user\/<\/p>\n<p>Sale data HDFS location: \/user\/data\/input-data\/user\/sale<\/p>\n<p>Product data HDFS location: \/user\/data\/input-data\/user\/product<\/p>\n<p>Then use the following code for the join and groupBy:<\/p>\n<p>[java]package com.spark.test;<\/p>\n<p>import org.apache.spark.api.java.JavaPairRDD;<br \/>\nimport org.apache.spark.api.java.JavaRDD;<br \/>\nimport org.apache.spark.api.java.JavaSparkContext;<br \/>\nimport org.apache.spark.api.java.function.*;<br \/>\nimport scala.Tuple2;<\/p>\n<p>import java.text.SimpleDateFormat;<br \/>\nimport java.util.ArrayList;<br \/>\nimport java.util.Calendar;<br \/>\nimport java.util.Date;<br \/>\nimport java.util.List;<\/p>\n<p>\/**<br \/>\n * Created by mohit on 3\/9\/14.<br \/>\n *\/<br \/>\npublic class TotalSales {<\/p>\n<p>    public static void main(String[] args) {<br \/>\n        JavaSparkContext ctx = new JavaSparkContext(&quot;local[*]&quot;, &quot;TotalSales&quot;, System.getenv(&quot;SPARK_HOME&quot;), JavaSparkContext.jarOfClass(TotalSales.class));<br \/>\n        final Calendar c = Calendar.getInstance();<\/p>\n<p>        JavaPairRDD&lt;String, Product&gt; productJavaPairRDD = fetchProductData(ctx);<br \/>\n        JavaPairRDD&lt;String, Sale&gt; saleJavaPairRDD = fetchSalesData(ctx);<br \/>\n        JavaPairRDD&lt;String, Tuple2&lt;Product, Sale&gt;&gt; joinData = 
productJavaPairRDD.join(saleJavaPairRDD);<br \/>\n        JavaRDD&lt;ProductSale&gt; productSaleMap = fetchFlatMap(joinData);<br \/>\n        JavaPairRDD&lt;Object, Iterable&lt;ProductSale&gt;&gt; groupMap = productSaleMap.groupBy(new Function&lt;ProductSale, Object&gt;() {<br \/>\n            @Override<br \/>\n            public Object call(ProductSale productSale) throws Exception {<br \/>\n                c.setTime(productSale.getSale().getPurchaseDate());<br \/>\n                return c.get(Calendar.YEAR);<br \/>\n            }<br \/>\n        });<\/p>\n<p>        JavaPairRDD&lt;Object, Long&gt; totalSaleData = groupMap.mapValues(new Function&lt;Iterable&lt;ProductSale&gt;, Long&gt;() {<br \/>\n            @Override<br \/>\n            public Long call(Iterable&lt;ProductSale&gt; productSales) throws Exception {<br \/>\n                Long sumData = 0L;<br \/>\n                for (ProductSale productSale : productSales) {<br \/>\n                    sumData = sumData + (productSale.getProduct().getPrice() * productSale.getSale().getItemPurchased());<br \/>\n                }<br \/>\n                return sumData;<br \/>\n            }<br \/>\n        });<\/p>\n<p>        List&lt;Tuple2&lt;Object, Long&gt;&gt; collectData = totalSaleData.sortByKey().collect();<br \/>\n        System.out.println(&quot;Total sales by year: &quot; + collectData);<\/p>\n<p>        ctx.stop();<br \/>\n    }<\/p>\n<p>    static JavaRDD&lt;ProductSale&gt; fetchFlatMap(JavaPairRDD&lt;String, Tuple2&lt;Product, Sale&gt;&gt; joinData) {<br \/>\n        JavaRDD&lt;ProductSale&gt; productSaleMap = joinData.flatMap(new FlatMapFunction&lt;Tuple2&lt;String, Tuple2&lt;Product, Sale&gt;&gt;, ProductSale&gt;() {<br \/>\n            @Override<br \/>\n            public Iterable&lt;ProductSale&gt; call(Tuple2&lt;String, Tuple2&lt;Product, Sale&gt;&gt; tuple) throws Exception {<br \/>\n                ProductSale productSale = new ProductSale();<br \/>\n                productSale.setProductId(tuple._1());<br \/>\n                productSale.setSale(tuple._2()._2());<br \/>\n                
productSale.setProduct(tuple._2()._1());<br \/>\n                List&lt;ProductSale&gt; productSaleList = new ArrayList&lt;ProductSale&gt;();<br \/>\n                productSaleList.add(productSale);<br \/>\n                return productSaleList;<br \/>\n            }<br \/>\n        });<br \/>\n        return productSaleMap;<br \/>\n    }<\/p>\n<p>    static JavaPairRDD&lt;String, Product&gt; fetchProductData(JavaSparkContext ctx) {<\/p>\n<p>        JavaRDD&lt;String&gt; lines = ctx.textFile(&quot;hdfs:\/\/localhost:9000\/user\/data\/input-data\/user\/product\/part-*&quot;, 1);<\/p>\n<p>        JavaRDD&lt;String[]&gt; splitMap = lines.map(new Function&lt;String, String[]&gt;() {<br \/>\n            @Override<br \/>\n            public String[] call(String s) throws Exception {<br \/>\n                return s.split(&quot;\\t&quot;);<br \/>\n            }<br \/>\n        });<\/p>\n<p>        JavaPairRDD&lt;String, Product&gt; mapKey = splitMap.mapToPair(new PairFunction&lt;String[], String, Product&gt;() {<br \/>\n            @Override<br \/>\n            public Tuple2&lt;String, Product&gt; call(String[] strings) throws Exception {<br \/>\n                String[] dataArray = strings[0].split(&quot;,&quot;);<br \/>\n                Product product = new Product();<br \/>\n                \/\/ Long.parseLong, not Long.getLong: Long.getLong looks up a system property and would return null here<br \/>\n                product.setProductId(Long.parseLong(dataArray[0]));<br \/>\n                product.setBrandName(dataArray[2]);<br \/>\n                product.setCategory(dataArray[3]);<br \/>\n                product.setPrice(Integer.parseInt(dataArray[4]));<br \/>\n                product.setProductName(dataArray[5]);<br \/>\n                product.setWeight(dataArray[6]);<br \/>\n                return new Tuple2&lt;String, Product&gt;(dataArray[0], product);<br \/>\n            }<br \/>\n        });<br \/>\n        return mapKey;<br \/>\n    }<\/p>\n<p>    static JavaPairRDD&lt;String, Sale&gt; fetchSalesData(JavaSparkContext ctx) {<br \/>\n        JavaRDD&lt;String&gt; salesLines = 
ctx.textFile(&quot;hdfs:\/\/localhost:9000\/user\/data\/input-data\/user\/sale\/part-*&quot;, 1);<\/p>\n<p>        JavaRDD&lt;String[]&gt; salesLineMap = salesLines.map(new Function&lt;String, String[]&gt;() {<br \/>\n            @Override<br \/>\n            public String[] call(String s) throws Exception {<br \/>\n                return s.split(&quot;\\t&quot;);<br \/>\n            }<br \/>\n        });<\/p>\n<p>        JavaPairRDD&lt;String, Sale&gt; salesMapKey = salesLineMap.mapToPair(new PairFunction&lt;String[], String, Sale&gt;() {<br \/>\n            @Override<br \/>\n            public Tuple2&lt;String, Sale&gt; call(String[] strings) throws Exception {<br \/>\n                String[] dataArray = strings[0].split(&quot;,&quot;);<br \/>\n                String date_s = dataArray[5];<br \/>\n                \/\/ yyyy-MM-dd HH:mm:ss: uppercase MM is the month and HH the hour of day; lowercase mm means minutes<br \/>\n                SimpleDateFormat dt = new SimpleDateFormat(&quot;yyyy-MM-dd HH:mm:ss&quot;);<br \/>\n                Date date = dt.parse(date_s);<br \/>\n                Sale sale = new Sale();<br \/>\n                sale.setProductId(Long.parseLong(dataArray[4]));<br \/>\n                sale.setBrandName(dataArray[2]);<br \/>\n                sale.setItemPurchased(Long.parseLong(dataArray[3]));<br \/>\n                sale.setPurchaseDate(date);<br \/>\n                return new Tuple2&lt;String, Sale&gt;(dataArray[4], sale);<br \/>\n            }<br \/>\n        });<br \/>\n        return salesMapKey;<br \/>\n    }<br \/>\n}<br \/>\n[\/java]<\/p>\n<p>This line joins the product and sale data on productId:<\/p>\n<p>[java]<br \/>\n JavaPairRDD&lt;String, Tuple2&lt;Product, Sale&gt;&gt; joinData = productJavaPairRDD.join(saleJavaPairRDD);<br \/>\n[\/java]<\/p>\n<p>For groupBy:<\/p>\n<p>[java]<br \/>\nJavaPairRDD&lt;Object, Iterable&lt;ProductSale&gt;&gt; groupMap = productSaleMap.groupBy(new Function&lt;ProductSale, Object&gt;() {<br \/>\n            @Override<br \/>\n            public Object call(ProductSale productSale) 
throws Exception {<br \/>\n                c.setTime(productSale.getSale().getPurchaseDate());<br \/>\n                return c.get(Calendar.YEAR);<br \/>\n            }<br \/>\n        });<br \/>\n[\/java]<\/p>\n<p>For the complete source code, see the GitHub repository:<\/p>\n<p>GitHub location: git@github.com:IntelliGrape\/bigdata-poc.git<\/p>\n<p>I hope this gives you a better idea of how to use join and groupBy in Apache Spark.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Using groupBy and join is often challenging. Recently, in one of the POCs of a MEAN project, I used groupBy and join in Apache Spark. I had two datasets in HDFS: one for sales and the other for products. Sales dataset columns: Sale Id, Version, Brand Name, Product Id, No. of Items Purchased, Purchase Date Product [&hellip;]<\/p>\n","protected":false},"author":47,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"iawp_total_views":8},"categories":[1395],"tags":[1515,1398,1516,1517],"aioseo_notices":[],"_links":{"self":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts\/15533"}],"collection":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/users\/47"}],"replies":[{"embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/comments?post=15533"}],"version-history":[{"count":0,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts\/15533\/revisions"}],"wp:attachment":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/media?parent=15533"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/categories?post=15533"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2
\/tags?post=15533"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}
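
To make the join + groupBy + sum pipeline from the post concrete without a Spark cluster, here is a minimal plain-Java sketch of the same logic: join each sale to its product price on productId, group by purchase year, and sum price × quantity per year. Note the date pattern must be yyyy-MM-dd HH:mm:ss (uppercase MM for month, HH for 24-hour clock). The class name, method names, and the array layout of a sale row are illustrative assumptions, not part of the original POC code.

```java
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative class; mirrors the join + groupBy + mapValues logic of the Spark job.
public class TotalSalesLogic {

    // Joins each sale row to its product price on productId, groups by
    // purchase year, and sums price * itemsPurchased per year.
    static Map<Integer, Long> totalSalesByYear(Map<String, Integer> priceByProductId,
                                               List<String[]> sales) throws Exception {
        // Uppercase MM = month, HH = 24-hour clock (lowercase mm means minutes).
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Calendar c = Calendar.getInstance();
        Map<Integer, Long> totals = new TreeMap<>();   // sorted keys, like sortByKey()
        for (String[] sale : sales) {                  // sale = {productId, itemsPurchased, purchaseDate}
            Integer price = priceByProductId.get(sale[0]);
            if (price == null) continue;               // inner join: drop sales with no matching product
            c.setTime(fmt.parse(sale[2]));
            int year = c.get(Calendar.YEAR);           // the groupBy key
            long revenue = (long) price * Long.parseLong(sale[1]);
            totals.merge(year, revenue, Long::sum);    // per-year running sum
        }
        return totals;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Integer> prices = new HashMap<>();
        prices.put("1", 100);
        prices.put("2", 50);
        List<String[]> sales = new ArrayList<>();
        sales.add(new String[]{"1", "2", "2013-05-01 10:00:00"});
        sales.add(new String[]{"2", "4", "2013-06-01 10:00:00"});
        sales.add(new String[]{"1", "1", "2014-01-01 09:30:00"});
        System.out.println(totalSalesByYear(prices, sales)); // prints {2013=400, 2014=100}
    }
}
```

In the Spark version the per-year sum runs distributed via groupBy and mapValues; the aggregation itself is exactly this loop.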