96SEO 2026-02-20 08:58 0
理解MapReduce编程模型独立完成一个MapReduce程序并运行成功了解MapReduce工程流程掌握并描述出shuffle全过程面试独立编写课堂及作业中的MR程序理解并解决数据倾斜

Hadoop由HDFS分布式存储、MapReduce分布式计算、Yarn资源调度三部分组成
MapReduce是采用一种分而治之的思想设计出来的分布式计算框架
Map阶段切分成一个个小的任务Reduce阶段汇总小任务的结果
比如一复杂、计算量大、耗时长的的任务暂且称为“大任务”此时使用单台服务器无法计算或较短时间内计算出结果时可将此大任务切分成一个个小的任务小任务分别在不同的服务器上并行的执行最终再汇总每个小任务的结果
map阶段有一个关键的map()函数此函数的输入是键值对输出是一系列键值对输出写入本地磁盘。
以MapReduce的词频统计为例统计一批英文文章当中每个单词出现的总次数
Wind”有三个blockblock1、block2、block3MR编程时每个block对应一个分片split每一个split对应一个map任务map
task如图共3个map任务map1、map2、map3这3个任务的逻辑一样所以以第一个map任务map1为例分析map1读取block1的数据一次读取block1的一行数据
产生键值对(key/value)作为map()的参数传入调用map()假设当前所读行是第一行将当前所读行的行首相对于当前block开始处的字节偏移量作为key0当前行的内容作为valueDear
(按需求写业务代码)将value当前行内容按空格切分得到三个单词Dear
1)最终结果写入map任务所在节点的本地磁盘中内里还有细节讲到shuffle时再细细展开block的第一行的数据被处理完后接着处理第二行逻辑同上当map任务将当前block中所有的数据全部处理完后此map任务即运行结束
task的个数由自己写的程序编程指定main()内的job.setNumReduceTasks(4)指定reduce任务是4个reduce1、reduce2、reduce3、reduce4每一个reduce任务的逻辑一样所以以第一个reduce任务reduce1为例分析map1任务完成后reduce1通过网络连接到map1将map1输出结果中属于reduce1的分区的数据通过网络获取到reduce1端拷贝阶段同样也如此连接到map2、map3获取结果最终reduce1端获得4个(Dear,
)]作为两个参数传入reduce()在reduce()内部计算Dear的总数为4并将(Dear,
4)作为键值对输出每个reduce任务最终输出文件内里还有细节讲到shuffle时再细细展开文件写入到HDFS
①数据中若要针对某个值进行分组、聚合时需将此值作为MR中的reduce的输入的key
如当前的词频统计例子按单词进行分组每组中对出现次数做聚合计算总和所以需要将每个单词作为reduce输入的keyMapReduce框架自动按照单词分组进而求出每组即每个单词的总次数
②另外key还具有可排序的特性因为MR中的key类需要实现WritableComparable接口而此接口又继承Comparable接口可查看源码
使用IDEA创建maven工程pom文件参考提供的pom.xml主要用到的dependencies有
propertiescdh.version2.6.0-cdh5.14.2/cdh.version/propertiesrepositoriesrepositoryidcloudera/idurlhttps://repository.cloudera.com/artifactory/cloudera-repos//url/repository/repositoriesdependenciesdependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion2.6.0-mr1-cdh5.14.2/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-common/artifactIdversion${cdh.version}/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-hdfs/artifactIdversion${cdh.version}/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-mapreduce-client-core/artifactIdversion${cdh.version}/version/dependency!--
https://mvnrepository.com/artifact/junit/junit
--dependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion4.11/versionscopetest/scope/dependencydependencygroupIdorg.testng/groupIdartifactIdtestng/artifactIdversionRELEASE/versionscopetest/scope/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.38/versionscopecompile/scope/dependency/dependencies2.4
创建包com.kaikeba.hadoop.wordcount
在包中创建自定义mapper类、自定义reducer类、包含main类
com.kaikeba.hadoop.wordcount;import
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.mapreduce.Mapper;import
map方法的输入的键的类型kin、值的类型vin输出的键的类型kout、输出的值的类型vout*
kin指的是当前所读行行首相对于split分片开头的字节偏移量,所以是long类型对应序列化类型LongWritable*
vin指的是当前所读行类型是String对应序列化类型Text*
kout根据需求输出键指的是单词类型是String对应序列化类型是Text*
vout根据需求输出值指的是单词的个数1类型是int对应序列化类型是IntWritable**/
处理分片split中的每一行的数据针对每行数据会调用一次map方法*
在一次map方法调用时从一行数据中获得一个个单词word再将每个单词word变成键值对形式(word,
value.toString();//按照\t进行分割得到当前行所有单词String[]
vout包装成对应的可序列化类型如String对应Textint对应IntWritablecontext.write(new
com.kaikeba.hadoop.wordcount;import
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.mapreduce.Reducer;import
reduce方法的输入的键的类型kin、输入值的类型vin输出的键的类型kout、输出的值的类型vout*
注意因为map的输出作为reduce的输入所以此处的kin、vin类型分别与map的输出的键类型、值类型相同*
kout根据需求输出键指的是单词类型是String对应序列化类型是Text*
vout根据需求输出值指的是每个单词的总个数类型是int对应序列化类型是IntWritable**/
task汇聚了众多的键值对有key是hello的键值对也有key是spark的键值对如下*
其中key是hello的键值对被分成一组merge成[hello,
Iterable(1,1,1,1)]调用一次reduce方法*
同样key是spark的键值对被分成一组merge成[spark,
Iterable(1,1,1)]再调用一次reduce方法**
count.get();}//将单词、单词次数分别作为键值对输出context.write(key,
com.kaikeba.hadoop.wordcount;import
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.mapreduce.Job;
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
另外map\reduce相关的类使用mapreduce包下的是新API如org.apache.hadoop.mapreduce.Job;*/
{//若在IDEA中本地执行MR程序需要将mapred-site.xml中的mapreduce.framework.name值修改成local//参数
IOException,ClassNotFoundException,
{//判断一下输入参数是否是两个分别表示输入路径、输出路径if
Path!);System.exit(0);}Configuration
Configuration();//configuration.set(mapreduce.framework.name,local);//告诉程序要运行的jar包在哪//configuration.set(mapreduce.job.jar,/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);//调用getInstance方法生成job实例Job
WordCountMain.class.getSimpleName());//设置job的jar包如果参数指定的类包含在一个jar包中则此jar包作为job的jar包
job.setJarByClass(WordCountMain.class);job.setJarByClass(WordCountMain.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat输出格式是TextOutputFormat所以下两行可以注释掉
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job,
Path(args[0]));FileOutputFormat.setOutputPath(job,
Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(WordCountMap.class);//设置map
combine类减少网路传出量job.setCombinerClass(WordCountReduce.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(WordCountReduce.class);//注意如果map、reduce的输出的kv对类型一致直接设置reduce的输出的kv对就行如果不一样需要分别设置map,
reduce的输出的kv类型//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);//设置reduce
task最终输出key/value的类型//注意此处设置的reduce输出的key/value类型一定要与自定义reduce类输出的kv对类型一致否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//
提交作业job.waitForCompletion(true);}
}程序运行有两种方式分别是windows本地运行、集群运行依次演示
初次运行WordCountMain先设置main方法参数根据图示操作即可
用maven插件打jar包①点击Maven②双击package打包
将jar包上传到node01用户主目录/home/hadoop下
com.kaikeba.hadoop-1.0-SNAPSHOT.jar
com.kaikeba.hadoop.wordcount.WordCountMain
com.kaikeba.hadoop-1.0-SNAPSHOT.jar是jar包名
com.kaikeba.hadoop.wordcount.WordCountMain是包含main方法的类的全限定名
/NOTICE.txt和/wordcount是main方法的两个参数表示输入路径、输出路径
MR分为两个阶段map阶段、reduce阶段MR输入的文件有几个block就会生成几个map任务MR的reduce任务的个数由程序中编程指定job.setNumReduceTasks(4)map任务
map任务中map()一次读取block的一行数据以kv对的形式输入map()map()的输出作为reduce()的输入
reduce任务通过网络将各执行完成的map任务输出结果中属于自己的数据取过来key相同的键值对作为一组调用一次reduce()reduce任务生成一个结果文件文件写入HDFS
node01是resourcemanager所在节点主机名根据自己的实际情况修改主机名
①点击下拉框②浏览文件系统③输入根目录查看hdfs根路径中的内容
mapreduce在企业中可以用于对海量数据的数据清洗当然随着新一代大数据框架的出现也可以使用spark、flink等框架做数据清洗
现有一批日志文件日志来源于用户使用搜狗搜索引擎搜索新闻并点击查看搜索结果过程但是日志中有一些记录损坏现需要使用MapReduce来将这些损坏记录如记录中少字段、多字段从日志文件中删除此过程就是传说中的数据清洗。
并且在清洗时要统计损坏的记录数。
日志格式每行记录有6个字段分别表示时间datetime、用户ID
userid、新闻搜索关键字searchkwd、当前记录在返回列表中的序号retorder、用户点击链接的顺序cliorder、点击的URL连接cliurl
而有些mapreduce应用不需要数据聚合的操作也就是说不需要reduce阶段。
即编程时不需要编写自定义的reducer类在main()中调用job.setNumReduceTasks(0)设置
map方法的逻辑取得每一行数据与每条记录的固定格式比对是否符合
若要集群运行需先将sogou.50w.utf8上传到HDFS根目录
com.kaikeba.hadoop.dataclean;import
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.io.NullWritable;
org.apache.hadoop.mapreduce.Counter;
org.apache.hadoop.mapreduce.Job;
org.apache.hadoop.mapreduce.Mapper;
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import
userid、新闻搜索关键字searchkwd、当前记录在返回列表中的序号retorder、用户点击链接的顺序cliorder、点击的URL连接cliurl*
0bf5778fc7ba35e657ee88b25984c6e9
基本上大部分MR程序的main方法逻辑大同小异将其他MR程序的main方法代码拷贝过来稍做修改即可*
注意若要IDEA中本地运行MR程序需要将resources/mapred-site.xml中的mapreduce.framework.name属性值设置成local*
{//判断一下输入参数是否是两个分别表示输入路径、输出路径if
Path!);System.exit(0);}Configuration
Configuration();//调用getInstance方法生成job实例Job
DataClean.class.getSimpleName());//设置jar包参数是包含main方法的类job.setJarByClass(DataClean.class);//设置输入/输出路径FileInputFormat.setInputPaths(job,
Path(args[0]));FileOutputFormat.setOutputPath(job,
Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(DataCleanMapper.class);//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);//注意因为不需要reduce聚合阶段所以需要显示设置reduce
task个数是0job.setNumReduceTasks(0);//
提交作业job.waitForCompletion(true);}/***
注意若自定义的mapper类与main方法在同一个类中需要将自定义mapper类声明成static的*/public
{//为了提高程序的效率避免创建大量短周期的对象出发频繁GC此处生成一个对象共用NullWritable
NullWritable.get();Overrideprotected
context.getCounter(DataCleaning,
damagedRecord);//获得当前行数据//样例数据20111230111645
169796ae819ae8b32668662bb99b6c2d
http://auto.ifeng.com/roll/20111212/729164.shtmlString
value.toString();//将行数据按照记录中字段分隔符切分String[]
line.split(\t);//判断字段数组长度是否为6if(fields.length
{//若不是则不输出并递增自定义计数器counter.increment(1L);}
{//若是6则原样输出context.write(value,
0%job就已经successfully表示此MR程序没有reduce阶段
②DataCleaning是自定义计数器组名damagedRecord是自定义的计数器值为6表示有6条损坏记录
图中part-m-00000中的m表示此文件是由map任务生成
MR可用于数据清洗另外也可以使用Spark、Flink等组件做数据清洗
使用MR编程统计sogou日志数据中每个用户搜索的次数结果写入HDFS
还记得之前提到的MR中key的作用吗MR编程时若要针对某个值对数据进行分组、聚合时如当前的词频统计例子需要将每个单词作为reduce输入的key从而按照单词分组进而求出每组即每个单词的总次数那么此例也是类似的。
统计每个用户的搜索次数将userid放到reduce输入的key的位置对userid进行分组进而统计每个用户的搜索次数
此处MR程序的输入文件是“MapReduce编程数据清洗”中的输出结果文件
com.kaikeba.hadoop.searchcount;import
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.mapreduce.Job;
org.apache.hadoop.mapreduce.Mapper;
org.apache.hadoop.mapreduce.Reducer;
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import
{//判断一下输入参数是否是两个分别表示输入路径、输出路径if
Path!);System.exit(0);}Configuration
Configuration();//configuration.set(mapreduce.job.jar,/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);//调用getInstance方法生成job实例Job
UserSearchCount.class.getSimpleName());//设置jar包参数是包含main方法的类job.setJarByClass(UserSearchCount.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat输出格式是TextOutputFormat所以下两行可以注释掉
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job,
Path(args[0]));FileOutputFormat.setOutputPath(job,
Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(SearchCountMapper.class);//设置map
combine类减少网路传出量//job.setCombinerClass(WordCountReduce.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(SearchCountReducer.class);//如果map、reduce的输出的kv对类型一致直接设置reduce的输出的kv对就行如果不一样需要分别设置map,
reduce的输出的kv类型//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);//设置reduce
task最终输出key/value的类型//注意此处设置的reduce输出的key/value类型一定要与自定义reduce类输出的kv对类型一致否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//提交作业job.waitForCompletion(true);}public
IntWritable(1);Overrideprotected
{//获得当前行的数据//样例数据20111230111645
169796ae819ae8b32668662bb99b6c2d
http://auto.ifeng.com/roll/20111212/729164.shtmlString
value.toString();//切分获得各字段组成的数组String[]
line.split(\t);//因为要统计每个user搜索并查看URL的次数所以将userid放到输出key的位置//注意MR编程中根据业务需求设计key是很重要的能力String
fields[1];//设置输出的key的值userIdKOut.set(userid);//输出结果context.write(userIdKOut,
IntWritable();Overrideprotected
value.get();}//设置当前user搜索并查看总次数totalNumVOut.set(sum);context.write(key,
结合本例子的需求设计MR程序因为要统计每个用户的搜索次数所以最终userid作为reduce的输出的keyMR编程能够根据业务需求设计合适的key是一个很重要的能力而这是需要建立在自己地MR框架原理有清晰认识的基础之上的
shuffle主要指的是map端的输出作为reduce端输入的过程
每个map任务都有一个对应的环形内存缓冲区输出是kv对先写入到环形缓冲区默认大小100M当内容占据80%缓冲区空间后由一个后台线程将缓冲区中的数据溢出写到一个磁盘文件在溢出写的过程中map任务可以继续向环形缓冲区写入数据但是若写入速度大于溢出写的速度最终造成100m占满后map任务会暂停向环形缓冲区中写数据的过程只执行溢出写的过程直到环形缓冲区的数据全部溢出写到磁盘才恢复向缓冲区写入后台线程溢写磁盘过程有以下几个步骤
先对每个溢写的kv对做分区分区的个数由MR程序的reduce任务数决定默认使用HashPartitioner计算当前kv对属于哪个分区计算公式(key.hashCode()
numReduceTasks每个分区中根据kv对的key做内存中排序若设置了map端本地聚合combiner则对每个分区中排好序的数据做combine操作若设置了对map输出压缩的功能会对溢写数据压缩
随着不断的向环形缓冲区中写入数据会多次触发溢写每当环形缓冲区写满100m本地磁盘最终会生成多个溢出文件合并溢写文件在map
task完成之前所有溢出文件会被合并成一个大的溢出文件且是已分区、已排序的输出文件小细节
在合并溢写文件时如果至少有3个溢写文件并且设置了map端combine的话会在合并的过程中触发combine操作但是若只有2个或1个溢写文件则不触发combine操作因为combine操作本质上是一个reduce需要启动JVM虚拟机有一定的开销
如果map输出数据比较小先保存在reduce的jvm内存中否则直接写入reduce磁盘
一旦内存缓冲区达到阈值默认0.66或map输出数的阈值默认1000则触发归并merge结果写到本地磁盘
若MR编程指定了combine在归并过程中会执行combine操作
map()输出结果先写入环形缓冲区缓冲区100M写满80M后开始溢出写磁盘文件此过程中会进行分区、排序、combine可选、压缩可选map任务完成前会将多个小的溢出文件合并成一个大的溢出文件已分区、排序
拷贝阶段reduce任务通过http将map任务属于自己的分区数据拉取过来开始merge及溢出写磁盘文件所有map任务的分区全部拷贝过来后进行阶段合并、排序、分组阶段每组数据调用一次reduce()结果写入HDFS
根据之前讲的shuffle我们知道在map任务中从环形缓冲区溢出写磁盘时会先对kv对数据进行分区操作
HashPartitioner关键方法getPartition返回当前键值对的分区索引(partition
}环形缓冲区溢出写磁盘前将每个kv对作为getPartition()的参数传入
先对键值对中的key求hash值int类型与MAX_VALUE按位与再模上reduce
task个数设置为4可在程序中使用job.setNumReduceTasks(4)指定reduce
那么map任务溢出文件有4个分区分区index分别是0、1、2、3getPartition()结果有四种0、1、2、3根据计算结果决定当前kv对落入哪个分区如结果是0则当前kv对落入溢出文件的0分区中最终被相应的reduce
若是MR默认分区器不满足需求可根据业务逻辑设计自定义分区器比如实现图上的功能
代码详见工程com.kaikeba.hadoop.partitioner包
MR读取三个文件part1.txt、part2.txt、part3.txt三个文件放到HDFS目录/customParttitioner中
只有part-r-00001、part-r-00003有数据另外两个没有数据
HashPartitioner将Bear分到index1的分区将Car|Dear|River分到index3分区
自定义分区使得文件中分别以Dear、Bear、River、Car为键的键值对分别落到index是0、1、2、3的分区中
此类实现Partitioner接口在getPartition()中实现分区逻辑
设定reduce个数为4设置自定义的分区类调用job.setPartitionerClass方法
com.kaikeba.hadoop.partitioner;import
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.mapreduce.Partitioner;import
Integer();//定义每个键对应的分区index使用map数据结构完成static{dict.put(Dear,
dict.get(text.toString());return
自定义分区器的类继承Partitioner类覆写getPartition()在方法中定义自己的分区策略在main()方法中调用job.setPartitionerClass()main()中设置reduce任务数
普通的MR是reduce通过http取得map任务的分区结果具体的聚合出结果是在reduce端进行的
下图中的第一个map任务(map1)本地磁盘中的结果有5个键值对(Dear,
1)会被第一个reduce任务(reduce1)通过网络拉取到reduce1端那么假设map1中(Dear,
1)通过网络被reduce1获得然后再在reduce1端汇总这样做map端本地磁盘IO、数据从map端到reduce端传输的网络IO比较大那么想能不能在reduce1从map1拉取1亿个(Dear,
1)之前在map端就提前先做下reduce汇总得到结果(Dear,
100000000)然后再将这个结果一个键值对传输到reduce1呢答案是可以的我们称之为combine操作
此过程会分区、每个分区内按键排序、再combine操作若设置了combine的话、若设置map输出压缩的话则再压缩
在合并溢写文件时如果至少有3个溢写文件并且设置了map端combine的话会在合并的过程中触发combine操作但是若只有2个或1个溢写文件则不触发combine操作因为combine操作本质上是一个reduce需要启动JVM虚拟机有一定的开销
combine本质上也是reduce因为自定义的combine类继承自Reducer父类
reduce函数与combine函数通常是一样的K3与K2类型相同V3与V2类型相同即reduce的输入的kv类型分别与输出的kv类型相同
WordCountMap、WordCountReduce代码保持不变唯一需要做的修改是在WordCountMain中增加job.setCombinerClass(WordCountReduce.class);修改如下
使用combine时首先考虑当前MR是否适合combine总原则是不论使不使用combine不能影响最终的结果在MR时发生数据倾斜且可以使用combine时可以使用combine缓解数据倾斜
作用在MR中为了减少磁盘IO及网络IO可考虑在map端、reduce端设置压缩功能给“MapReduce编程用户搜索次数”代码增加压缩功能
那么如何设置压缩功能呢只需在main方法中给Configuration对象增加如下设置即可
configuration.set(mapreduce.map.output.compress,
//设置map输出的压缩算法是BZip2Codec它是hadoop默认支持的压缩算法且支持切分
configuration.set(mapreduce.map.output.compress.codec,
org.apache.hadoop.io.compress.BZip2Codec);
configuration.set(mapreduce.output.fileoutputformat.compress,
configuration.set(mapreduce.output.fileoutputformat.compress.codec,
org.apache.hadoop.io.compress.BZip2Codec);9.3
给“MapReduce编程用户搜索次数”代码增加压缩功能代码如下
com.kaikeba.hadoop.mrcompress;import
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.mapreduce.Job;
org.apache.hadoop.mapreduce.Mapper;
org.apache.hadoop.mapreduce.Reducer;
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import
{//判断以下输入参数是否是两个分别表示输入路径、输出路径if
Path!);System.exit(0);}Configuration
Configuration();//configuration.set(mapreduce.job.jar,/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);//开启map输出进行压缩的功能configuration.set(mapreduce.map.output.compress,
true);//设置map输出的压缩算法是BZip2Codec它是hadoop默认支持的压缩算法且支持切分configuration.set(mapreduce.map.output.compress.codec,
org.apache.hadoop.io.compress.BZip2Codec);//开启job输出压缩功能configuration.set(mapreduce.output.fileoutputformat.compress,
true);//指定job输出使用的压缩算法configuration.set(mapreduce.output.fileoutputformat.compress.codec,
org.apache.hadoop.io.compress.BZip2Codec);//调用getInstance方法生成job实例Job
UserSearchCount.class.getSimpleName());//设置jar包参数是包含main方法的类job.setJarByClass(UserSearchCount.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat所以下两行可以注释掉
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job,
Path(args[0]));FileOutputFormat.setOutputPath(job,
FileOutputFormat.setCompressOutput(job,
FileOutputFormat.setOutputCompressorClass(job,
BZip2Codec.class);//设置处理Map阶段的自定义的类job.setMapperClass(SearchCountMapper.class);//设置map
combine类减少网路传出量//job.setCombinerClass(WordCountReduce.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(SearchCountReducer.class);//如果map、reduce的输出的kv对类型一致直接设置reduce的输出的kv对就行如果不一样需要分别设置map,
reduce的输出的kv类型//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);//设置reduce
task最终输出key/value的类型//注意此处设置的reduce输出的key/value类型一定要与自定义reduce类输出的kv对类型一致否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//
提交作业job.waitForCompletion(true);}public
IntWritable(1);Overrideprotected
{//获得当前行的数据//样例数据20111230111645
169796ae819ae8b32668662bb99b6c2d
http://auto.ifeng.com/roll/20111212/729164.shtmlString
value.toString();//切分获得各字段组成的数组String[]
line.split(\t);//因为要统计每个user搜索并查看URL的次数所以将userid放到输出key的位置//注意MR编程中根据业务需求设计key是很重要的能力String
fields[1];//设置输出的key的值userIdKOut.set(userid);//输出结果context.write(userIdKOut,
IntWritable();Overrideprotected
value.get();}//设置当前user搜索并查看总次数totalNumVOut.set(sum);context.write(key,
com.kaikeba.hadoop-1.0-SNAPSHOT.jar
com.kaikeba.hadoop.mrcompress.UserSearchCount
MR过程中使用压缩可减少数据量进而减少磁盘IO、网络IO数据量可设置map端输出的压缩可设置job最终结果的压缩通过相应的配置项即可实现
上图也描述了mapreduce的一个完整的过程我们主要看map任务是如何从hdfs读取分片数据的部分
InputFormat输入格式类将输入文件分成一个个分片InputSplit每个Map任务对应一个split分片
③RecordReader记录读取器类createRecordReader()
RecordReader记录读取器读取分片数据一行记录生成一个键值对传入map任务的map()方法调用map()
客户端调用InputFormat的**getSplits()**方法获得输入文件的分片信息
master根据分片信息尽量将map任务尽量调度在split分片数据所在节点移动计算不移动数据
每个map任务将split分片传递给createRecordReader()方法生成此分片对应的RecordReader
nextKeyValue()判断是否有下一个键值对如果有返回true否则返回false如果返回true调用getCurrentKey()获得当前的键调用getCurrentValue()获得当前的值
首先运行一次setup()方法只在map任务启动时运行一次一些初始化的工作可以在setup方法中完成如要连接数据库之类的操作
while循环调用context.nextKeyValue()会委托给RecordRecord的nextKeyValue()判断是否有下一个键值对
如果有下一个键值对调用context.getCurrentKey()、context.getCurrentValue()获得当前的键、值的值也是调用RecordReader的同名方法
当读取分片尾context.nextKeyValue()返回false退出循环
调用cleanup()方法只在map任务结束之前调用一次所以一些回收资源的工作可在此方法中实现如关闭数据库连接
无论hdfs还是mapreduce处理小文件都有损效率实践中又难免面临处理大量小文件的场景此时就需要有相应解决方案
在数据采集的时候就将小文件或小批数据合成大文件再上传HDFS(SequenceFile方案)在业务处理之前在HDFS上使用mapreduce程序对小文件进行合并可使用自定义InputFormat实现在mapreduce处理时可采用CombineFileInputFormat提高效率
com.kaikeba.hadoop.inputformat;import
org.apache.hadoop.io.BytesWritable;
org.apache.hadoop.io.NullWritable;
org.apache.hadoop.mapreduce.InputSplit;
org.apache.hadoop.mapreduce.JobContext;
org.apache.hadoop.mapreduce.RecordReader;
org.apache.hadoop.mapreduce.TaskAttemptContext;
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import
值值用于保存小文件的内容此处使用BytesWritable*/
InterruptedException*/Overridepublic
IOException,InterruptedException
{//使用自定义的RecordReader类WholeFileRecordReader
WholeFileRecordReader();//初始化RecordReaderreader.initialize(split,
com.kaikeba.hadoop.inputformat;import
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.fs.FSDataInputStream;
org.apache.hadoop.fs.FileSystem;
org.apache.hadoop.io.BytesWritable;
org.apache.hadoop.io.NullWritable;
org.apache.hadoop.mapreduce.InputSplit;
org.apache.hadoop.mapreduce.RecordReader;
org.apache.hadoop.mapreduce.TaskAttemptContext;
org.apache.hadoop.mapreduce.lib.input.FileSplit;import
通过nextKeyValue()方法去读取数据构造将返回的key
getCurrentValue来返回上面构造好的key和value**
标识变量分片是否已被读取过因为小文件设置成了不可切分所以一个小文件只有一个分片*
InterruptedException*/Overridepublic
context.getConfiguration();}/***
InterruptedException*/Overridepublic
file.getFileSystem(conf);FSDataInputStream
fs.open(file);IOUtils.readFully(in,
contents.length);value.set(contents,
{IOUtils.closeStream(in);}processed
InterruptedException*/Overridepublic
IOException,InterruptedException
InterruptedException*/Overridepublic
IOException,InterruptedException
获得分片读取的百分比因为如果读取分片数据的话会一次性的读取完所以进度要么是1要么是0*
{//因为一个文件作为一个整体处理所以如果processed为true表示已经处理过了进度为1否则为0return
com.kaikeba.hadoop.inputformat;import
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.conf.Configured;
org.apache.hadoop.io.BytesWritable;
org.apache.hadoop.io.NullWritable;
org.apache.hadoop.mapreduce.InputSplit;
org.apache.hadoop.mapreduce.Job;
org.apache.hadoop.mapreduce.Mapper;
org.apache.hadoop.mapreduce.lib.input.FileSplit;
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
org.apache.hadoop.util.ToolRunner;import
在main()中调用ToolRunner.run()方法第一个参数是当前对象第二个参数是输入、输出*/
mapper类的输入键值对类型与自定义InputFormat的输入键值对保持一致*
mapper类的输出的键值对类型分别是文件名、文件内容*/static
InterruptedException*/Overrideprotected
IOException,InterruptedException
context.getInputSplit();//获得当前文件路径Path
Text(path.toString());}Overrideprotected
sequencefile);job.setJarByClass(SmallFiles2SequenceFile.class);//设置自定义输入格式job.setInputFormatClass(WholeFileInputFormat.class);WholeFileInputFormat.addInputPath(job,new
Path(args[0]));//设置输出格式SequenceFileOutputFormat及输出路径job.setOutputFormatClass(SequenceFileOutputFormat.class);SequenceFileOutputFormat.setOutputPath(job,new
Path(args[1]));job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);job.setMapperClass(SequenceFileMapper.class);return
SmallFiles2SequenceFile(),args);System.exit(exitCode);}
需要自定义InputFormat类并覆写getRecordReader()方法自定义RecordReader类实现方法
initialize()nextKeyValue()getCurrentKey()getCurrentValue()getProgress()close()
现在有一些订单的评论数据要将订单的好评与其它级别的评论中评、差评进行区分开来将最终的数据分开到不同的文件夹下面去
程序的关键点是在一个mapreduce程序中根据数据的不同(好评的评级不同)输出两类结果到不同目录这类灵活的输出需求通过自定义OutputFormat来实现
在mapreduce中访问外部资源自定义OutputFormat类覆写getRecordWriter()方法自定义RecordWriter类覆写具体输出数据的方法write()
com.kaikeba.hadoop.outputformat;import
org.apache.hadoop.fs.FSDataOutputStream;
org.apache.hadoop.fs.FileSystem;
org.apache.hadoop.io.NullWritable;
org.apache.hadoop.mapreduce.RecordWriter;
org.apache.hadoop.mapreduce.TaskAttemptContext;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import
本例使用框架默认的Reducer它将Mapper输入的kv对原样输出所以reduce输出的kv类型分别是Text,
自定义OutputFormat的类泛型表示reduce输出的键值对类型要保持一致;*
map--(kv)--reduce--(kv)--OutputFormat*/
根据实际情况修改path;node01及端口号8020*/String
hdfs://node01:8020/outputformat/bad/r.txt;String
hdfs://node01:8020/outputformat/good/r.txt;/****
InterruptedException*/Overridepublic
getRecordWriter(TaskAttemptContext
FileSystem.get(context.getConfiguration());//两个输出文件路径Path
fs.create(badPath);FSDataOutputStream
MyRecordWriter(badOut,goodOut);}/***
泛型表示reduce输出的键值对类型要保持一致*/static
NullWritable{FSDataOutputStream
MyRecordWriter(FSDataOutputStream
InterruptedException*/Overridepublic
(key.toString().split(\t)[9].equals(0)){//好评goodOut.write(key.toString().getBytes());goodOut.write(\r\n.getBytes());}else{//其它评级badOut.write(key.toString().getBytes());badOut.write(\r\n.getBytes());}}/***
InterruptedException*/Overridepublic
!null){goodOut.close();}if(badOut
com.kaikeba.hadoop.outputformat;import
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.conf.Configured;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.io.NullWritable;
org.apache.hadoop.mapreduce.Job;
org.apache.hadoop.mapreduce.Mapper;
org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
org.apache.hadoop.util.ToolRunner;import
MyOwnOutputFormatMain.class.getSimpleName());job.setJarByClass(MyOwnOutputFormatMain.class);//默认项可以省略或者写出也可以//job.setInputFormatClass(TextInputFormat.class);//设置输入文件TextInputFormat.addInputPath(job,
Path(args[0]));job.setMapperClass(MyMapper.class);//job.setMapOutputKeyClass(Text.class);//job.setMapOutputValueClass(NullWritable.class);//设置自定义的输出类job.setOutputFormatClass(MyOutPutFormat.class);//设置一个输出会输出一个success的成功标志的文件MyOutPutFormat.setOutputPath(job,
Path(args[1]));job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//默认项即默认有一个reduce任务所以可以省略
job.setReducerClass(Reducer.class);boolean
job.waitForCompletion(true);return
输出的value为null对应NullWritable*/public
{//把当前行内容作为key输出value为nullcontext.write(value,
Configuration();ToolRunner.run(configuration,
泛型与reduce输出的键值对类型保持一致覆写getRecordWriter()方法
泛型与reduce输出的键值对类型保持一致覆写具体输出数据的方法write()、close()
job.setOutputFormatClass使用自定义在输出类
有些MR的输出的key可以直接使用hadoop框架的可序列化可比较类型表示如Text、IntWritable等等而这些类型本身是可比较的如IntWritable默认升序排序
但有时使用MR编程输出的key若使用hadoop自带的key类型无法满足需求
此时需要自定义的key类型包含的是非单一信息如此例包含工资、年龄并且也得是**可序列化、可比较的**
实现按照人的salary降序排序若相同则再按age升序排序若salary、age相同则放入同一组
com.kaikeba.hadoop.secondarysort;import
org.apache.hadoop.io.WritableComparable;import
java.io.IOException;//根据输入文件格式定义JavaBean作为MR时Map的输出key类型要求此类可序列化、可比较
this.name;}//两个Person对象的比较规则①先比较salary高的排序在前②若相同age小的在前public
{//若两个人工资不同//工资降序排序即工资高的排在前边return
{//若工资相同//年龄升序排序即年龄小的排在前边return
other.age;}}//序列化将NewKey转化成使用流传送的二进制public
{//注意①使用正确的write方法②记住此时的序列化的顺序name、age、salarydataOutput.writeUTF(name);dataOutput.writeInt(age);dataOutput.writeInt(salary);}//使用in读字段的顺序要与write方法中写的顺序保持一致name、age、salarypublic
string//注意①使用正确的read方法②读取顺序与write()中序列化的顺序保持一致this.name
dataInput.readInt();this.salary
com.kaikeba.hadoop.secondarysort;import
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.fs.FileSystem;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.io.NullWritable;
org.apache.hadoop.mapreduce.Job;
org.apache.hadoop.mapreduce.Mapper;
org.apache.hadoop.mapreduce.Reducer;
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import
Configuration();//configuration.set(mapreduce.job.jar,/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);Job
SecondarySort.class.getSimpleName());FileSystem
FileSystem.get(URI.create(args[1]),
true);}FileInputFormat.setInputPaths(job,
Path(args[0]));job.setMapperClass(MyMap.class);//由于mapper与reducer输出的kv类型分别相同所以下两行可以省略
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(NullWritable.class);//设置reduce的个数;默认为1//job.setNumReduceTasks(1);job.setReducerClass(MyReduce.class);job.setOutputKeyClass(Person.class);job.setOutputValueClass(NullWritable.class);FileOutputFormat.setOutputPath(job,
Path(args[1]));job.waitForCompletion(true);}//MyMap的输出key用自定义的Person类型输出的value为nullpublic
value.toString().split(\t);String
Integer.parseInt(fields[1]);int
Integer.parseInt(fields[2]);//在自定义类中进行比较Person
salary);//person对象作为输出的keycontext.write(person,
{//输入的kv对原样输出context.write(key,
如果MR时key的排序规则比较复杂比如需要根据字段1排序若字段1相同则需要根据字段2排序…此时可以使用自定义key实现将自定义的key作为MR中map输出的key的类型reduce输入的类型自定义的key
实现WritableComparable接口实现compareTo比较方法实现write序列化方法实现readFields反序列化方法
userid、datetime、title商品标题、unitPrice商品单价、purchaseNum购买量、productId商品ID
现使用MR编程求出每个用户、每个月消费金额最多的两笔订单花了多少钱
实现WritableComparable接口包含6个字段分别对应文件中的6个字段重点实现compareTo方法
先比较userid是否相等若不相等则userid升序排序若相等比较两个Bean的日期是否相等若不相等则日期升序排序若相等再比较总开销降序排序
实现序列化方法write()实现反序列化方法readFields()
继承Partitioner类getPartiton()实现userid相同的处于同一个分区
输出key是当前记录对应的Bean对象输出的value对应当前订单的总开销
决定userid相同、日期年月相同的记录分到同一组中调用一次reduce()
job.setMapperClassjob.setPartitionerClassjob.setReducerClassjob.setGroupingComparatorClass
com.kaikeba.hadoop.grouping;import
org.apache.hadoop.io.WritableComparable;import
java.io.IOException;//实现WritableComparable接口
{//OrderBean作为MR中的key如果对象中的userid相同即ret1为0就表示两个对象是同一个用户int
this.userid.compareTo(other.userid);if
thisYearMonth.compareTo(otherYearMonth);if(ret2
{//若datetime相同//如果userid、年月都相同比较单笔订单的总开销Double
this.getPurchaseNum()*this.getUnitPrice();Double
other.getPurchaseNum()*other.getUnitPrice();//总花销降序排序即总花销高的排在前边return
-thisTotalPrice.compareTo(oTotalPrice);}
{//若datatime不同按照datetime升序排序return
{dataOutput.writeUTF(userid);dataOutput.writeUTF(datetime);dataOutput.writeUTF(title);dataOutput.writeDouble(unitPrice);dataOutput.writeInt(purchaseNum);dataOutput.writeUTF(produceId);}/***
dataInput.readUTF();this.datetime
dataInput.readUTF();this.unitPrice
dataInput.readDouble();this.purchaseNum
dataInput.readInt();this.produceId
另外一个方案此处不覆写hashCode方法而是自定义分区器getPartition方法中对OrderBean的userid求hashCode值%reduce任务数*
com.kaikeba.hadoop.grouping;import
org.apache.hadoop.io.DoubleWritable;
org.apache.hadoop.mapreduce.Partitioner;//mapper的输出key类型是自定义的key类型OrderBean输出value类型是单笔订单的总开销double
(orderBean.getUserid().hashCode()
com.kaikeba.hadoop.grouping;import
org.apache.hadoop.io.DoubleWritable;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.mapreduce.Mapper;import
record.split(\t);if(fields.length
dateUtils.getYearMonthString(datetime);String
Double.parseDouble(fields[3]);int
Integer.parseInt(fields[4]);String
fields[5];//生成OrderBean对象OrderBean
purchaseNum;valueOut.set(totalPrice);context.write(orderBean,
com.kaikeba.hadoop.grouping;import
org.apache.hadoop.io.DoubleWritable;
org.apache.hadoop.io.NullWritable;
org.apache.hadoop.mapreduce.Reducer;import
java.io.IOException;//输出的key为userid拼接上年月的字符串对应Text输出的value对应单笔订单的金额
①由于自定义分组逻辑相同用户、相同年月的订单是一组调用一次reduce()*
②由于自定义的key类OrderBean中比较规则compareTo规定相同用户、相同年月的订单按总金额降序排序*
InterruptedException*/Overrideprotected
key.getDatetime();context.write(new
com.kaikeba.hadoop.grouping;import
org.apache.hadoop.io.WritableComparable;
org.apache.hadoop.io.WritableComparator;//自定义分组类reduce端调用reduce()前对数据做分组每组数据调用一次reduce()
bOrderBean.getUserid();//userid、年、月相同的作为一组int
aUserId.compareTo(bUserId);if(ret1
aOrderBean.getDatetime().compareTo(bOrderBean.getDatetime());}
com.kaikeba.hadoop.grouping;import
com.kaikeba.hadoop.wordcount.WordCountMain;
com.kaikeba.hadoop.wordcount.WordCountMap;
com.kaikeba.hadoop.wordcount.WordCountReduce;
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.conf.Configured;
org.apache.hadoop.io.DoubleWritable;
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.mapreduce.Job;
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
org.apache.hadoop.util.ToolRunner;import
args);System.exit(exitCode);}Overridepublic
{//判断以下输入参数是否是两个分别表示输入路径、输出路径if
Path!);System.exit(0);}Configuration
Configuration();//告诉程序要运行的jar包在哪//configuration.set(mapreduce.job.jar,/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);//调用getInstance方法生成job实例Job
CustomGroupingMain.class.getSimpleName());//设置jar包参数是包含main方法的类job.setJarByClass(CustomGroupingMain.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat所以下两行可以注释掉
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job,
Path(args[0]));FileOutputFormat.setOutputPath(job,
Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(MyMapper.class);//设置map
combine类减少网路传出量//job.setCombinerClass(MyReducer.class);job.setPartitionerClass(MyPartitioner.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(MyReducer.class);job.setGroupingComparatorClass(MyGroup.class);//如果map、reduce的输出的kv对类型一致直接设置reduce的输出的kv对就行如果不一样需要分别设置map,
reduce的输出的kv类型//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错job.setMapOutputKeyClass(OrderBean.class);job.setMapOutputValueClass(DoubleWritable.class);//设置reduce
task最终输出key/value的类型//注意此处设置的reduce输出的key/value类型一定要与自定义reduce类输出的kv对类型一致否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(DoubleWritable.class);//
com.kaikeba.hadoop.grouping;import
java.time.format.DateTimeFormatter;public
System.out.println(str1.compareTo(str2));//test2
System.out.println(getYearMonthString(datetime));}public
DateTimeFormatter.ofPattern(yyyy-MM-dd
localDateTime;}//日期格式转换工具类将2014-12-14
DateTimeFormatter.ofPattern(yyyy-MM-dd
localDateTime.getMonthValue();return
实现其中的compareTo方法设置key的比较逻辑实现序列化方法write()实现反序列化方法readFields()
自定义mapper类、reducer类自定义partition类getPartition方法决定哪些key落入哪些分区自定义group分组类决定reduce阶段哪些kv对落入同一组调用一次reduce()写main方法设置自定义的类
job.setMapperClassjob.setPartitionerClassjob.setReducerClassjob.setGroupingComparatorClass
数据中不可避免地会出现离群值outlier并导致数据倾斜。
这些离群值会显著地拖慢MapReduce的执行。
数据频率倾斜——某一个区域的数据量要远远大于其他区域。
比如某一个key对应的键值对远远大于其他键的键值对。
数据大小倾斜——部分记录的大小远远大于平均值。
在map端的数据倾斜可以考虑使用combine在reduce端的数据倾斜常常来源于MapReduce的默认分区器
数据倾斜会导致map和reduce的任务执行时间大为延长也会让需要缓存数据集的操作消耗更多的内存资源
发现倾斜数据之后有必要诊断造成数据倾斜的那些键。
有一个简便方法就是在代码里实现追踪每个键的最大值。
为了减少追踪量可以设置数据量阀值只追踪那些数据量大于阀值的键并输出到日志中。
实现代码如下运行作业后就可以从日志中判断发生倾斜的键以及倾斜程度跟踪倾斜数据是了解数据的重要一步也是设计MapReduce作业的重要基础
com.kaikeba.hadoop.dataskew;import
org.apache.hadoop.io.IntWritable;import
org.apache.hadoop.io.Text;import
org.apache.hadoop.mapreduce.Reducer;import
Logger.getLogger(WordCountReduce.class);Overrideprotected
{//一个键达到多少后会做数据倾斜记录maxValueThreshold
count.get();i;}//如果当前键超过10000个则打印日志if(i
Reduce数据倾斜一般是指map的输出数据中存在数据频率倾斜的状况即部分输出键的数据量远远大于其它的输出键
例如如果map输出键的单词来源于一本书。
其中大部分必然是省略词stopword。
那么就可以将自定义分区将这部分省略词发送给固定的一部分reduce实例。
而将其他的都发送给剩余的reduce实例。
使用Combine可以大量地减小数据频率倾斜和数据大小倾斜。
combine的目的就是聚合并精简数据。
Hadoop默认的分区器是HashPartitioner基于map输出键的哈希值分区。
这仅在数据分布比较均匀时比较好。
在有数据倾斜时就很有问题。
使用分区器需要首先了解数据的特性。
TotalOrderPartitioner中可以通过对原始数据进行抽样得到的结果集来预设分区边界值。
TotalOrderPartitioner中的范围分区器可以通过预设的分区边界值进行分区。
因此它也可以很好地用在矫正数据中的部分键的数据倾斜问题。
在map端或reduce端的数据大小倾斜都会对缓存造成较大的影响乃至导致OutOfMemoryError异常。
处理这种情况并不容易。
可以参考以下方法。
设置mapreduce.input.linerecordreader.line.maxlength来限制RecordReader读取的最大长度。
RecordReader在TextInputFormat和KeyValueTextInputFormat类中使用。
默认长度没有上限。
数据气象站气象数据来源美国国家气候数据中心NCDC1901-2001年数据每年一个文件
设置一个分区即一个reduce任务在一个reduce中对结果进行排序失去了MR框架并行计算的优势
自定义分区人为指定各温度区间的记录落入哪个分区如分区温度边界值分别是-15、0、20共4个分区但由于对整个数据集的气温分布不了解可能某些分区的数据量大其它的分区小数据倾斜
通过对键空间采样只查看一小部分键获得键的近似分布好温度的近似分布进而据此结果创建分区实现尽可能的均匀的划分数据集Hadoop内置了采样器InputSampler
一、先将数据按气温对天气数据集排序。
结果存储为sequencefile文件气温作为输出键数据行作为输出值
com.kaikeba.hadoop.totalorder;import
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.mapreduce.Job;
org.apache.hadoop.mapreduce.Mapper;
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import
温度作为SequenceFile的key记录作为value*/
IntWritable();Overrideprotected
{//002902907099999190101010600464333023450FM-12000599999V0202701N015919999999N0000001N9-0078199999102001ADDGF108991999999999999999999parser.parse(value);if
{//是否是有效的记录temperature.set(parser.getAirTemperature());context.write(temperature,
SortDataPreprocessor.class.getSimpleName());job.setJarByClass(SortDataPreprocessor.class);//FileInputFormat.addInputPath(job,
Path(args[0]));FileOutputFormat.setOutputPath(job,
Path(args[1]));job.setMapperClass(CleanerMapper.class);//最终输出的键、值类型job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(Text.class);//reduce个数为0job.setNumReduceTasks(0);//以sequencefile的格式输出job.setOutputFormatClass(SequenceFileOutputFormat.class);//开启job输出压缩功能//方案一conf.set(mapreduce.output.fileoutputformat.compress,
true);conf.set(mapreduce.output.fileoutputformat.compress.type,RECORD);//指定job输出使用的压缩算法conf.set(mapreduce.output.fileoutputformat.compress.codec,
org.apache.hadoop.io.compress.BZip2Codec);//方案二//设置sequencefile的压缩、压缩算法、sequencefile文件压缩格式block//SequenceFileOutputFormat.setCompressOutput(job,
true);//SequenceFileOutputFormat.setOutputCompressorClass(job,
GzipCodec.class);//SequenceFileOutputFormat.setOutputCompressorClass(job,
SnappyCodec.class);//SequenceFileOutputFormat.setOutputCompressionType(job,
SequenceFile.CompressionType.BLOCK);System.exit(job.waitForCompletion(true)
com.kaikeba.hadoop.totalorder;import
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.filecache.DistributedCache;
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.mapreduce.Job;
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
org.apache.hadoop.mapreduce.lib.partition.InputSampler;
org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;import
使用TotalOrderPartitioner全局排序一个SequenceFile文件的内容*
SortByTemperatureUsingTotalOrderPartitioner{/***
第一个参数是SortDataPreprocessor的输出文件*/public
SortByTemperatureUsingTotalOrderPartitioner.class.getSimpleName());job.setJarByClass(SortByTemperatureUsingTotalOrderPartitioner.class);FileInputFormat.addInputPath(job,
Path(args[0]));FileOutputFormat.setOutputPath(job,
Path(args[1]));//输入文件是SequenceFilejob.setInputFormatClass(SequenceFileInputFormat.class);//Hadoop提供的方法来实现全局排序要求Mapper的输入、输出的key必须保持类型一致job.setOutputKeyClass(IntWritable.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);//分区器全局排序分区器job.setPartitionerClass(TotalOrderPartitioner.class);//分了3个区且分区i-1中的key小于i分区中所有的键job.setNumReduceTasks(3);/***
只要numSamples和maxSplitSampled第二、第三参数任一条件满足则停止采样*/InputSampler.SamplerIntWritable,
InputSampler.RandomSamplerIntWritable,
TotalOrderPartitioner.setPartitionFile();/***
由TotalOrderPartitioner读取作为全排序的分区依据让每个分区中的数据量近似*/InputSampler.writePartitionFile(job,
sampler);//根据上边的SequenceFile文件包含键的近似分布情况创建分区String
TotalOrderPartitioner.getPartitionFile(job.getConfiguration());URI
JobConf();//与所有map任务共享此文件添加到分布式缓存中DistributedCache.addCacheFile(partitionUri,
job.addCacheFile(partitionUri);//方案一输出的文件RECORD级别使用BZip2Codec进行压缩conf.set(mapreduce.output.fileoutputformat.compress,
true);conf.set(mapreduce.output.fileoutputformat.compress.type,RECORD);//指定job输出使用的压缩算法conf.set(mapreduce.output.fileoutputformat.compress.codec,
org.apache.hadoop.io.compress.BZip2Codec);//方案二//SequenceFileOutputFormat.setCompressOutput(job,
true);//SequenceFileOutputFormat.setOutputCompressorClass(job,
GzipCodec.class);//SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);System.exit(job.waitForCompletion(true)
先使用InputSampler.Sampler采样器对整个key空间进行采样得到key的近似分布
使用TotalOrderPartitioner利用上边的key分布情况文件进行分区每个分区的数据量近似从而防止数据倾斜
描述MR的shuffle全流程面试谈谈什么是数据倾斜什么情况会造成数据倾斜面试对MR数据倾斜如何解决面试补充图解》》》》》》》》》》
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback