本文共 4287 字,大约阅读时间需要 14 分钟。
1.使用场景
Map Join适用于一张表十分小、一张表很大的场景。2.优点
3.具体办法:采用DistributedCache
(1)在Mapper的setup阶段,将文件读取到缓存集合中。
(2)在驱动函数中加载缓存。// 缓存普通文件到Task运行节点。job.addCacheFile(new URI("file://e:/cache/pd.txt"));
package 第三章_MR框架原理.多种join应用;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;import java.io.FileInputStream;import java.io.IOException;import java.io.InputStreamReader;import java.net.URI;import java.util.HashMap;public class DistributedCacheMapper extends Mapper{ Text k = new Text(); // 创建集合存储拆分的数据信息 HashMap pdMap = new HashMap (); /** * 重写setUp方法 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { // 1.缓存小表 // 1.1获取要缓存的文件路径 // 1.1.1 首先获取缓存文件 URI[] cacheFiles = context.getCacheFiles(); // 1.1.2 通过缓存文件获取其路径 String path = cacheFiles[0].getPath().toString(); // 1.2 读取文件信息 BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path),"UTF-8")); String line; while (StringUtils.isNotEmpty(line = reader.readLine())){ //pid pname //01 小米 // 1.3 拆分 String[] fields = line.split("\t"); // 1.4 添加到集合中,将pid作为key,通过pid获取pname,后续写到order表中 pdMap.put(fields[0],fields[1]); } // 1.5 关闭资源 IOUtils.closeStream(reader); } /** * 在map阶段进行join操作 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // id pid amount // 1001 01 1 // pid pname // 01 小米 // 1.获取一行数据 String line = value.toString(); // 2.拆分 String[] fields = line.split("\t"); // 3.获取pid,到对应的pdMap中找出pname写入 String pid = fields[1]; String pname = (String) pdMap.get(pid); // 4.拼接 line = line + "\t" + pname; k.set(line); // 5.写出 context.write(k,NullWritable.get()); }}
package 第三章_MR框架原理.多种join应用;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class DistributedCacheDriver { public static void main(String[] args) { Configuration configuration = new Configuration(); Job job = null; try { // 1 获取job信息 job = Job.getInstance(configuration); // 2 设置加载jar包路径 job.setJarByClass(DistributedCacheDriver.class); // 3 关联map job.setMapperClass(DistributedCacheMapper.class); // 4 设置最终输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 5 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\多种join应用\\data\\order.txt")); FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\多种join应用\\cacheoutput")); // 6 加载缓存数据表 job.addCacheFile(new URI("file:///G:/Projects/IdeaProject-C/MapReduce/src/main/java/第三章_MR框架原理/多种join应用/data/pd.txt")); // 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0 job.setNumReduceTasks(0); // 8 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } catch (Exception e) { e.printStackTrace(); } }}
转载地址:http://hueq.baihongyu.com/