三表相连 mapjoin
创始人
2024-05-24 02:30:55
0

三表相连 mapjoin

  • 要求
    • 输出的样式
    • 三张表
      • score.csv
      • student.csv
      • subject.csv
  • 创建三个类
  • StudentSc
    • getset方法
    • 实现类
  • MapJoinDriver
    • 用mapjoin不需要reduce
  • MapJoinMapper
  • 运行结果

要求

输出的样式

在这里插入图片描述

三张表

在这里插入图片描述

score.csv

在这里插入图片描述

student.csv

在这里插入图片描述

subject.csv

在这里插入图片描述

创建三个类

在这里插入图片描述

StudentSc

在这里插入图片描述

getset方法

插入getset方法,可用javabean插件一键生成

实现类

 public StudentSc(String stuName, String subName, Integer scScore, String flag) {this.stuName = stuName;this.subName = subName;this.scScore = scScore;}@Overridepublic int compareTo(nj.zb.kb21.demo5.StudentScore o) {return 0;}@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(stuName);dataOutput.writeUTF(subName);dataOutput.writeInt(scScore);}

MapJoinDriver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class MapJoinDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);job.setJarByClass(MapJoinDriver.class);job.setMapperClass(MapJoinMapper.class);job.setMapOutputKeyClass(StudentSc.class);job.setMapOutputValueClass(NullWritable.class);Path inPath = new Path("D:\\kb21\\myworkspace\\njzb\\hadoopexec\\in\\demo6\\score.csv");FileInputFormat.setInputPaths(job,inPath);Path outPath = new Path("D:\\kb21\\myworkspace\\njzb\\hadoopexec\\out7");FileSystem fs = FileSystem.get(outPath.toUri(), configuration);if (fs.exists(outPath)){fs.delete(outPath,true);}FileOutputFormat.setOutputPath(job,outPath);//设置Reduce阶段的任务数量job.setNumReduceTasks(0);//配置Map阶段的缓存,尽量使用小文件做缓存,如果文件太大,会引起OOM(内存溢出)Path cachePath = new Path("D:\\kb21\\myworkspace\\njzb\\hadoopexec\\in\\demo6\\student.csv");job.addCacheFile(cachePath.toUri());Path cachePath2 = new Path("D:\\kb21\\myworkspace\\njzb\\hadoopexec\\in\\demo6\\subject.csv");job.addCacheFile(cachePath2.toUri());boolean result = job.waitForCompletion(true);System.out.println(result);}
}

用mapjoin不需要reduce

MapJoinMapper

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;public class MapJoinMapper extends Mapper {Map studentScMap = new HashMap();Map studentScMap2 = new HashMap();@Overrideprotected void setup(Mapper.Context context) throws IOException, InterruptedException {URI[] cacheFiles = context.getCacheFiles();for (URI uri : cacheFiles) {String currentFileName = new Path(uri).getName();if (currentFileName.startsWith("student")) {String path = uri.getPath();BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));String line;while ((line = br.readLine()) != null) {String[] fields = line.split(",");StudentSc studentSc = new StudentSc(fields[1],"",0,"");studentScMap.put(Integer.parseInt(fields[0]), studentSc);
> 这里按照要求将student的名字添加到studentScMap表中}br.close();}if (currentFileName.startsWith("subject")) {String path = uri.getPath();BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));String line;while ((line = br.readLine()) != null) {String[] fields = line.split(",");StudentSc studentSc = new StudentSc("",fields[1],0,"");studentScMap2.put(Integer.parseInt(fields[0]), studentSc);
>这里按照要求将subject的科目名字添加到studentScMap2表中}br.close();}}}@Overrideprotected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {String[] scFields = value.toString().split(",");//这个集合读取的是driver中的inpath的表 scoreStudentSc currentStudent = studentScMap.get(Integer.parseInt(scFields[0]));StudentSc currentStudent2 = studentScMap2.get(Integer.parseInt(scFields[1]));StudentSc studentScs = new StudentSc();studentScs.setStuName(currentStudent.getStuName());studentScs.setFlag("0");//flag不重要,是我上一个项目多写的,懒得删studentScs.setSubName(currentStudent2.getSubName());studentScs.setScScore(Integer.parseInt(scFields[2]));context.write(studentScs, NullWritable.get());}
}

运行结果

在这里插入图片描述

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...