大数据培训课程Reduce Join案例实操
创始人
2024-02-06 10:47:19
0

Reduce Join案例实操

1.需求

表4-4 订单数据表t_order

idpidamount
1001011
1002022
1003033
1004014
1005025
1006036

表4-5 商品信息表t_product

pidpname
01小米
02华为
03格力

       将商品信息表中数据根据商品pid合并到订单数据表中。

表4-6 最终数据形式

idpnameamount
1001小米1
1004小米4
1002华为2
1005华为5
1003格力3
1006格力6

2.需求分析

通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联,如图4-20所示。

图4-20 Reduce端表合并

3.代码实现

1)创建商品和订合并后的Bean类

package com.atguigu.mapreduce.table; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable;   public class TableBean implements Writable {      private String order_id; // 订单id    private String p_id;      // 产品id    private int amount;       // 产品数量    private String pname;     // 产品名称    private String flag;      // 表的标记      public TableBean() {       super();    }      public TableBean(String order_id, String p_id, int amount, String pname, String flag) {         super();         this.order_id = order_id;       this.p_id = p_id;       this.amount = amount;       this.pname = pname;       this.flag = flag;    }      public String getFlag() {       return flag;    }      public void setFlag(String flag) {       this.flag = flag;    }      public String getOrder_id() {       return order_id;    }      public void setOrder_id(String order_id) {       this.order_id = order_id;    }      public String getP_id() {       return p_id;    }      public void setP_id(String p_id) {       this.p_id = p_id;    }      public int getAmount() {       return amount;    }      public void setAmount(int amount) {       this.amount = amount;    }      public String getPname() {       return pname;    }      public void setPname(String pname) {       this.pname = pname;    }      @Override    public void write(DataOutput out) throws IOException {       out.writeUTF(order_id);       out.writeUTF(p_id);       out.writeInt(amount);       out.writeUTF(pname);       out.writeUTF(flag);    }      @Override    public void readFields(DataInput in) throws IOException {       this.order_id = in.readUTF();       this.p_id = in.readUTF();       this.amount = in.readInt();       this.pname = in.readUTF();       this.flag = in.readUTF();    }      @Override    public String toString() {       return order_id + “\t” + pname + “\t” + amount + “\t” ;    } }

2)编写TableMapper类

package com.atguigu.mapreduce.table; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit;   public class TableMapper extends Mapper{   String name;    TableBean bean = new TableBean();    Text k = new Text();       @Override    protected void setup(Context context) throws IOException, InterruptedException {         // 1 获取输入文件切片       FileSplit split = (FileSplit) context.getInputSplit();         // 2 获取输入文件名称       name = split.getPath().getName();    }      @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {             // 1 获取输入数据       String line = value.toString();             // 2 不同文件分别处理       if (name.startsWith(“order”)) {// 订单表处理             // 2.1 切割           String[] fields = line.split(“\t”);                     // 2.2 封装bean对象           bean.setOrder_id(fields[0]);           bean.setP_id(fields[1]);           bean.setAmount(Integer.parseInt(fields[2]));           bean.setPname(“”);           bean.setFlag(“order”);                     k.set(fields[1]);       }else {// 产品表处理             // 2.3 切割           String[] fields = line.split(“\t”);                     // 2.4 封装bean对象           bean.setP_id(fields[0]);           bean.setPname(fields[1]);           bean.setFlag(“pd”);           bean.setAmount(0);           bean.setOrder_id(“”);                     k.set(fields[0]);       }         // 3 写出       context.write(k, bean);    } }

3)编写TableReducer类

package com.atguigu.mapreduce.table; import java.io.IOException; import java.util.ArrayList; import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;   public class TableReducer extends Reducer {      @Override    protected void reduce(Text key, Iterable values, Context context)   throws IOException, InterruptedException {         // 1准备存储订单的集合       ArrayList orderBeans = new ArrayList<>();       // 2 准备bean对象       TableBean pdBean = new TableBean();         for (TableBean bean : values) {             if (“order”.equals(bean.getFlag())) {// 订单表                // 拷贝传递过来的每条订单数据到集合中              TableBean orderBean = new TableBean();                try {                 BeanUtils.copyProperties(orderBean, bean);              } catch (Exception e) {                 e.printStackTrace();              }                orderBeans.add(orderBean);           } else {// 产品表                try {                 // 拷贝传递过来的产品表到内存中                 BeanUtils.copyProperties(pdBean, bean);              } catch (Exception e) {                 e.printStackTrace();              }           }       }         // 3 表的拼接       for(TableBean bean:orderBeans){             bean.setPname (pdBean.getPname());                     // 4 数据写出去           context.write(bean, NullWritable.get());       }    } }

4)编写TableDriver类

package com.atguigu.mapreduce.table; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   public class TableDriver {      public static void main(String[] args) throws Exception {       // 0 根据自己电脑路径重新配置 args = new String[]{“e:/input/inputtable”,”e:/output1″};   // 1 获取配置信息,或者job对象实例       Configuration configuration = new Configuration();       Job job = Job.getInstance(configuration);         // 2 指定本程序的jar包所在的本地路径       job.setJarByClass(TableDriver.class);         // 3 指定本业务job要使用的Mapper/Reducer业务类       job.setMapperClass(TableMapper.class);       job.setReducerClass(TableReducer.class);         // 4 指定Mapper输出数据的kv类型       job.setMapOutputKeyClass(Text.class);       job.setMapOutputValueClass(TableBean.class);         // 5 指定最终输出的数据的kv类型       job.setOutputKeyClass(TableBean.class);       job.setOutputValueClass(NullWritable.class);         // 6 指定job的输入原始文件所在目录       FileInputFormat.setInputPaths(job, new Path(args[0]));       FileOutputFormat.setOutputPath(job, new Path(args[1]));         // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行       boolean result = job.waitForCompletion(true);       System.exit(result ? 0 : 1);    } }

4.测试

运行程序查看结果

1001   小米   1  1001   小米   1  1002   华为   2  1002   华为   2  1003   格力   3  1003   格力   3    

5.总结

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...