SEO教程

SEO教程

Products

当前位置:首页 > SEO教程 >

如何找到提供网站模板后台网架服务的公司?

96SEO 2026-02-20 08:58 0


理解MapReduce编程模型独立完成一个MapReduce程序并运行成功了解MapReduce工程流程掌握并描述出shuffle全过程面试独立编写课堂及作业中的MR程序理解并解决数据倾斜

如何找到提供网站模板后台网架服务的公司?

MapReduce编程模型

Hadoop由HDFS分布式存储、MapReduce分布式计算、Yarn资源调度三部分组成

MapReduce是采用一种分而治之的思想设计出来的分布式计算框架

MapReduce由两个阶段组成

Map阶段切分成一个个小的任务Reduce阶段汇总小任务的结果

比如一复杂、计算量大、耗时长的的任务暂且称为“大任务”此时使用单台服务器无法计算或较短时间内计算出结果时可将此大任务切分成一个个小的任务小任务分别在不同的服务器上并行的执行最终再汇总每个小任务的结果

1.1

map阶段有一个关键的map()函数此函数的输入是键值对输出是一系列键值对输出写入本地磁盘。

1.2

以MapReduce的词频统计为例统计一批英文文章当中每个单词出现的总次数

2.1

Wind”有三个blockblock1、block2、block3MR编程时每个block对应一个分片split每一个split对应一个map任务map

task如图共3个map任务map1、map2、map3这3个任务的逻辑一样所以以第一个map任务map1为例分析map1读取block1的数据一次读取block1的一行数据

产生键值对(key/value)作为map()的参数传入调用map()假设当前所读行是第一行将当前所读行的行首相对于当前block开始处的字节偏移量作为key0当前行的内容作为valueDear

Bear

(按需求写业务代码)将value当前行内容按空格切分得到三个单词Dear

Bear

1)最终结果写入map任务所在节点的本地磁盘中内里还有细节讲到shuffle时再细细展开block的第一行的数据被处理完后接着处理第二行逻辑同上当map任务将当前block中所有的数据全部处理完后此map任务即运行结束

Reduce阶段

task的个数由自己写的程序编程指定main()内的job.setNumReduceTasks(4)指定reduce任务是4个reduce1、reduce2、reduce3、reduce4每一个reduce任务的逻辑一样所以以第一个reduce任务reduce1为例分析map1任务完成后reduce1通过网络连接到map1将map1输出结果中属于reduce1的分区的数据通过网络获取到reduce1端拷贝阶段同样也如此连接到map2、map3获取结果最终reduce1端获得4个(Dear,

1)键值对转换成[Dear,

)]作为两个参数传入reduce()在reduce()内部计算Dear的总数为4并将(Dear,

4)作为键值对输出每个reduce任务最终输出文件内里还有细节讲到shuffle时再细细展开文件写入到HDFS

2.2

①数据中若要针对某个值进行分组、聚合时需将此值作为MR中的reduce的输入的key

如当前的词频统计例子按单词进行分组每组中对出现次数做聚合计算总和所以需要将每个单词作为reduce输入的keyMapReduce框架自动按照单词分组进而求出每组即每个单词的总次数

②另外key还具有可排序的特性因为MR中的key类需要实现WritableComparable接口而此接口又继承Comparable接口可查看源码

2.3

使用IDEA创建maven工程pom文件参考提供的pom.xml主要用到的dependencies有

propertiescdh.version2.6.0-cdh5.14.2/cdh.version/propertiesrepositoriesrepositoryidcloudera/idurlhttps://repository.cloudera.com/artifactory/cloudera-repos//url/repository/repositoriesdependenciesdependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion2.6.0-mr1-cdh5.14.2/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-common/artifactIdversion${cdh.version}/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-hdfs/artifactIdversion${cdh.version}/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-mapreduce-client-core/artifactIdversion${cdh.version}/version/dependency!--

https://mvnrepository.com/artifact/junit/junit

--dependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion4.11/versionscopetest/scope/dependencydependencygroupIdorg.testng/groupIdartifactIdtestng/artifactIdversionRELEASE/versionscopetest/scope/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.38/versionscopecompile/scope/dependency/dependencies2.4

MR参考代码

创建包com.kaikeba.hadoop.wordcount

在包中创建自定义mapper类、自定义reducer类、包含main类

2.4.1

com.kaikeba.hadoop.wordcount;import

org.apache.hadoop.io.IntWritable;

import

org.apache.hadoop.io.LongWritable;

import

org.apache.hadoop.mapreduce.Mapper;import

类MapperLongWritable,

map方法的输入的键的类型kin、值的类型vin输出的键的类型kout、输出的值的类型vout*

kin指的是当前所读行行首相对于split分片开头的字节偏移量,所以是long类型对应序列化类型LongWritable*

vin指的是当前所读行类型是String对应序列化类型Text*

kout根据需求输出键指的是单词类型是String对应序列化类型是Text*

vout根据需求输出值指的是单词的个数1类型是int对应序列化类型是IntWritable**/

public

处理分片split中的每一行的数据针对每行数据会调用一次map方法*

在一次map方法调用时从一行数据中获得一个个单词word再将每个单词word变成键值对形式(word,

1)输出出去*

value.toString();//按照\t进行分割得到当前行所有单词String[]

words

vout包装成对应的可序列化类型如String对应Textint对应IntWritablecontext.write(new

Text(word),

com.kaikeba.hadoop.wordcount;import

org.apache.hadoop.io.IntWritable;

import

org.apache.hadoop.mapreduce.Reducer;import

ReducerText,

reduce方法的输入的键的类型kin、输入值的类型vin输出的键的类型kout、输出的值的类型vout*

注意因为map的输出作为reduce的输入所以此处的kin、vin类型分别与map的输出的键类型、值类型相同*

kout根据需求输出键指的是单词类型是String对应序列化类型是Text*

vout根据需求输出值指的是每个单词的总个数类型是int对应序列化类型是IntWritable**/

public

task汇聚了众多的键值对有key是hello的键值对也有key是spark的键值对如下*

(hello,

其中key是hello的键值对被分成一组merge成[hello,

Iterable(1,1,1,1)]调用一次reduce方法*

同样key是spark的键值对被分成一组merge成[spark,

Iterable(1,1,1)]再调用一次reduce方法**

param

count.get();}//将单词、单词次数分别作为键值对输出context.write(key,

new

com.kaikeba.hadoop.wordcount;import

org.apache.hadoop.conf.Configuration;

import

org.apache.hadoop.io.IntWritable;

import

org.apache.hadoop.mapreduce.Job;

import

org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import

另外map\reduce相关的类使用mapreduce包下的是新API如org.apache.hadoop.mapreduce.Job;*/

public

{//若在IDEA中本地执行MR程序需要将mapred-site.xml中的mapreduce.framework.name值修改成local//参数

c:/test/README.txt

IOException,ClassNotFoundException,

InterruptedException

{//判断一下输入参数是否是两个分别表示输入路径、输出路径if

(args.length

Path!);System.exit(0);}Configuration

configuration

Configuration();//configuration.set(mapreduce.framework.name,local);//告诉程序要运行的jar包在哪//configuration.set(mapreduce.job.jar,/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);//调用getInstance方法生成job实例Job

job

WordCountMain.class.getSimpleName());//设置job的jar包如果参数指定的类包含在一个jar包中则此jar包作为job的jar包

job.setJarByClass(WordCountMain.class);job.setJarByClass(WordCountMain.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat输出格式是TextOutputFormat所以下两行可以注释掉

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job,

new

Path(args[0]));FileOutputFormat.setOutputPath(job,

new

Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(WordCountMap.class);//设置map

combine类减少网路传出量job.setCombinerClass(WordCountReduce.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(WordCountReduce.class);//注意如果map、reduce的输出的kv对类型一致直接设置reduce的输出的kv对就行如果不一样需要分别设置map,

reduce的输出的kv类型//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);//设置reduce

task最终输出key/value的类型//注意此处设置的reduce输出的key/value类型一定要与自定义reduce类输出的kv对类型一致否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//

提交作业job.waitForCompletion(true);}

}程序运行有两种方式分别是windows本地运行、集群运行依次演示

2.5

初次运行WordCountMain先设置main方法参数根据图示操作即可

设置main方法在参数

用maven插件打jar包①点击Maven②双击package打包

将jar包上传到node01用户主目录/home/hadoop下

用hadoop

com.kaikeba.hadoop-1.0-SNAPSHOT.jar

com.kaikeba.hadoop.wordcount.WordCountMain

/README.txt

com.kaikeba.hadoop-1.0-SNAPSHOT.jar是jar包名

com.kaikeba.hadoop.wordcount.WordCountMain是包含main方法的类的全限定名

/NOTICE.txt和/wordcount是main方法的两个参数表示输入路径、输出路径

确认结果

MR分为两个阶段map阶段、reduce阶段MR输入的文件有几个block就会生成几个map任务MR的reduce任务的个数由程序中编程指定job.setNumReduceTasks(4)map任务

map任务中map()一次读取block的一行数据以kv对的形式输入map()map()的输出作为reduce()的输入

reduce任务

reduce任务通过网络将各执行完成的map任务输出结果中属于自己的数据取过来key相同的键值对作为一组调用一次reduce()reduce任务生成一个结果文件文件写入HDFS

WEB

node01是resourcemanager所在节点主机名根据自己的实际情况修改主机名

3.2

①点击下拉框②浏览文件系统③输入根目录查看hdfs根路径中的内容

mapreduce在企业中可以用于对海量数据的数据清洗当然随着新一代大数据框架的出现也可以使用spark、flink等框架做数据清洗

4.1

现有一批日志文件日志来源于用户使用搜狗搜索引擎搜索新闻并点击查看搜索结果过程但是日志中有一些记录损坏现需要使用MapReduce来将这些损坏记录如记录中少字段、多字段从日志文件中删除此过程就是传说中的数据清洗。

并且在清洗时要统计损坏的记录数。

4.2

日志格式每行记录有6个字段分别表示时间datetime、用户ID

userid、新闻搜索关键字searchkwd、当前记录在返回列表中的序号retorder、用户点击链接的顺序cliorder、点击的URL连接cliurl

4.3

而有些mapreduce应用不需要数据聚合的操作也就是说不需要reduce阶段。

即编程时不需要编写自定义的reducer类在main()中调用job.setNumReduceTasks(0)设置

map方法的逻辑取得每一行数据与每条记录的固定格式比对是否符合

4.4

若要集群运行需先将sogou.50w.utf8上传到HDFS根目录

[hadoopnode01

com.kaikeba.hadoop.dataclean;import

org.apache.hadoop.conf.Configuration;

import

org.apache.hadoop.io.LongWritable;

import

org.apache.hadoop.io.NullWritable;

import

org.apache.hadoop.mapreduce.Counter;

import

org.apache.hadoop.mapreduce.Job;

import

org.apache.hadoop.mapreduce.Mapper;

import

org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import

每行记录有6个字段*

userid、新闻搜索关键字searchkwd、当前记录在返回列表中的序号retorder、用户点击链接的顺序cliorder、点击的URL连接cliurl*

日志样本*

0bf5778fc7ba35e657ee88b25984c6e9

nba直播

基本上大部分MR程序的main方法逻辑大同小异将其他MR程序的main方法代码拷贝过来稍做修改即可*

注意若要IDEA中本地运行MR程序需要将resources/mapred-site.xml中的mapreduce.framework.name属性值设置成local*

param

{//判断一下输入参数是否是两个分别表示输入路径、输出路径if

(args.length

Path!);System.exit(0);}Configuration

configuration

Configuration();//调用getInstance方法生成job实例Job

job

DataClean.class.getSimpleName());//设置jar包参数是包含main方法的类job.setJarByClass(DataClean.class);//设置输入/输出路径FileInputFormat.setInputPaths(job,

new

Path(args[0]));FileOutputFormat.setOutputPath(job,

new

Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(DataCleanMapper.class);//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);//注意因为不需要reduce聚合阶段所以需要显示设置reduce

task个数是0job.setNumReduceTasks(0);//

提交作业job.waitForCompletion(true);}/***

自定义mapper类*

注意若自定义的mapper类与main方法在同一个类中需要将自定义mapper类声明成static的*/public

static

{//为了提高程序的效率避免创建大量短周期的对象出发频繁GC此处生成一个对象共用NullWritable

nullValue

NullWritable.get();Overrideprotected

void

context.getCounter(DataCleaning,

damagedRecord);//获得当前行数据//样例数据20111230111645

169796ae819ae8b32668662bb99b6c2d

塘承高速公路规划线路图

http://auto.ifeng.com/roll/20111212/729164.shtmlString

line

value.toString();//将行数据按照记录中字段分隔符切分String[]

fields

line.split(\t);//判断字段数组长度是否为6if(fields.length

{//若不是则不输出并递增自定义计数器counter.increment(1L);}

else

{//若是6则原样输出context.write(value,

nullValue);}}}

0%job就已经successfully表示此MR程序没有reduce阶段

②DataCleaning是自定义计数器组名damagedRecord是自定义的计数器值为6表示有6条损坏记录

图中part-m-00000中的m表示此文件是由map任务生成

4.5

MR可用于数据清洗另外也可以使用Spark、Flink等组件做数据清洗

5.1

使用MR编程统计sogou日志数据中每个用户搜索的次数结果写入HDFS

5.2

还记得之前提到的MR中key的作用吗MR编程时若要针对某个值对数据进行分组、聚合时如当前的词频统计例子需要将每个单词作为reduce输入的key从而按照单词分组进而求出每组即每个单词的总次数那么此例也是类似的。

统计每个用户的搜索次数将userid放到reduce输入的key的位置对userid进行分组进而统计每个用户的搜索次数

5.4

此处MR程序的输入文件是“MapReduce编程数据清洗”中的输出结果文件

package

com.kaikeba.hadoop.searchcount;import

org.apache.hadoop.conf.Configuration;

import

org.apache.hadoop.io.IntWritable;

import

org.apache.hadoop.io.LongWritable;

import

org.apache.hadoop.mapreduce.Job;

import

org.apache.hadoop.mapreduce.Mapper;

import

org.apache.hadoop.mapreduce.Reducer;

import

org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import

public

{//判断一下输入参数是否是两个分别表示输入路径、输出路径if

(args.length

Path!);System.exit(0);}Configuration

configuration

Configuration();//configuration.set(mapreduce.job.jar,/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);//调用getInstance方法生成job实例Job

job

UserSearchCount.class.getSimpleName());//设置jar包参数是包含main方法的类job.setJarByClass(UserSearchCount.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat输出格式是TextOutputFormat所以下两行可以注释掉

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job,

new

Path(args[0]));FileOutputFormat.setOutputPath(job,

new

Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(SearchCountMapper.class);//设置map

combine类减少网路传出量//job.setCombinerClass(WordCountReduce.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(SearchCountReducer.class);//如果map、reduce的输出的kv对类型一致直接设置reduce的输出的kv对就行如果不一样需要分别设置map,

reduce的输出的kv类型//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);//设置reduce

task最终输出key/value的类型//注意此处设置的reduce输出的key/value类型一定要与自定义reduce类输出的kv对类型一致否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//提交作业job.waitForCompletion(true);}public

static

IntWritable(1);Overrideprotected

void

{//获得当前行的数据//样例数据20111230111645

169796ae819ae8b32668662bb99b6c2d

塘承高速公路规划线路图

http://auto.ifeng.com/roll/20111212/729164.shtmlString

line

value.toString();//切分获得各字段组成的数组String[]

fields

line.split(\t);//因为要统计每个user搜索并查看URL的次数所以将userid放到输出key的位置//注意MR编程中根据业务需求设计key是很重要的能力String

userid

fields[1];//设置输出的key的值userIdKOut.set(userid);//输出结果context.write(userIdKOut,

vOut);}}public

IntWritable();Overrideprotected

void

value.get();}//设置当前user搜索并查看总次数totalNumVOut.set(sum);context.write(key,

totalNumVOut);}}

结合本例子的需求设计MR程序因为要统计每个用户的搜索次数所以最终userid作为reduce的输出的keyMR编程能够根据业务需求设计合适的key是一个很重要的能力而这是需要建立在自己地MR框架原理有清晰认识的基础之上的

Shuffle重点

shuffle主要指的是map端的输出作为reduce端输入的过程

6.1

每个map任务都有一个对应的环形内存缓冲区输出是kv对先写入到环形缓冲区默认大小100M当内容占据80%缓冲区空间后由一个后台线程将缓冲区中的数据溢出写到一个磁盘文件在溢出写的过程中map任务可以继续向环形缓冲区写入数据但是若写入速度大于溢出写的速度最终造成100m占满后map任务会暂停向环形缓冲区中写数据的过程只执行溢出写的过程直到环形缓冲区的数据全部溢出写到磁盘才恢复向缓冲区写入后台线程溢写磁盘过程有以下几个步骤

先对每个溢写的kv对做分区分区的个数由MR程序的reduce任务数决定默认使用HashPartitioner计算当前kv对属于哪个分区计算公式(key.hashCode()

Integer.MAX_VALUE)

numReduceTasks每个分区中根据kv对的key做内存中排序若设置了map端本地聚合combiner则对每个分区中排好序的数据做combine操作若设置了对map输出压缩的功能会对溢写数据压缩

随着不断的向环形缓冲区中写入数据会多次触发溢写每当环形缓冲区写满100m本地磁盘最终会生成多个溢出文件合并溢写文件在map

task完成之前所有溢出文件会被合并成一个大的溢出文件且是已分区、已排序的输出文件小细节

在合并溢写文件时如果至少有3个溢写文件并且设置了map端combine的话会在合并的过程中触发combine操作但是若只有2个或1个溢写文件则不触发combine操作因为combine操作本质上是一个reduce需要启动JVM虚拟机有一定的开销

6.4

如果map输出数据比较小先保存在reduce的jvm内存中否则直接写入reduce磁盘

一旦内存缓冲区达到阈值默认0.66或map输出数的阈值默认1000则触发归并merge结果写到本地磁盘

若MR编程指定了combine在归并过程中会执行combine操作

reduce

map()输出结果先写入环形缓冲区缓冲区100M写满80M后开始溢出写磁盘文件此过程中会进行分区、排序、combine可选、压缩可选map任务完成前会将多个小的溢出文件合并成一个大的溢出文件已分区、排序

reduce端

拷贝阶段reduce任务通过http将map任务属于自己的分区数据拉取过来开始merge及溢出写磁盘文件所有map任务的分区全部拷贝过来后进行阶段合并、排序、分组阶段每组数据调用一次reduce()结果写入HDFS

自定义分区重点

根据之前讲的shuffle我们知道在map任务中从环形缓冲区溢出写磁盘时会先对kv对数据进行分区操作

HashPartitioner关键方法getPartition返回当前键值对的分区索引(partition

index)

}环形缓冲区溢出写磁盘前将每个kv对作为getPartition()的参数传入

先对键值对中的key求hash值int类型与MAX_VALUE按位与再模上reduce

task个数假设reduce

task个数设置为4可在程序中使用job.setNumReduceTasks(4)指定reduce

task个数为4

那么map任务溢出文件有4个分区分区index分别是0、1、2、3getPartition()结果有四种0、1、2、3根据计算结果决定当前kv对落入哪个分区如结果是0则当前kv对落入溢出文件的0分区中最终被相应的reduce

若是MR默认分区器不满足需求可根据业务逻辑设计自定义分区器比如实现图上的功能

7.2

代码详见工程com.kaikeba.hadoop.partitioner包

MR读取三个文件part1.txt、part2.txt、part3.txt三个文件放到HDFS目录/customParttitioner中

part1.txt内容如下

只有part-r-00001、part-r-00003有数据另外两个没有数据

HashPartitioner将Bear分到index1的分区将Car|Dear|River分到index3分区

7.3

自定义分区使得文件中分别以Dear、Bear、River、Car为键的键值对分别落到index是0、1、2、3的分区中

7.3.2

此类实现Partitioner接口在getPartition()中实现分区逻辑

main方法中

设定reduce个数为4设置自定义的分区类调用job.setPartitionerClass方法

7.3.3

com.kaikeba.hadoop.partitioner;import

org.apache.hadoop.io.IntWritable;

import

org.apache.hadoop.mapreduce.Partitioner;import

class

Integer();//定义每个键对应的分区index使用map数据结构完成static{dict.put(Dear,

0);dict.put(Bear,

dict.get(text.toString());return

partitionIndex;}

自定义分区器的类继承Partitioner类覆写getPartition()在方法中定义自己的分区策略在main()方法中调用job.setPartitionerClass()main()中设置reduce任务数

8.1

普通的MR是reduce通过http取得map任务的分区结果具体的聚合出结果是在reduce端进行的

下图中的第一个map任务(map1)本地磁盘中的结果有5个键值对(Dear,

1)、(Bear,

1)会被第一个reduce任务(reduce1)通过网络拉取到reduce1端那么假设map1中(Dear,

1)再将1亿个(Dear,

1)通过网络被reduce1获得然后再在reduce1端汇总这样做map端本地磁盘IO、数据从map端到reduce端传输的网络IO比较大那么想能不能在reduce1从map1拉取1亿个(Dear,

1)之前在map端就提前先做下reduce汇总得到结果(Dear,

100000000)然后再将这个结果一个键值对传输到reduce1呢答案是可以的我们称之为combine操作

8.2

此过程会分区、每个分区内按键排序、再combine操作若设置了combine的话、若设置map输出压缩的话则再压缩

在合并溢写文件时如果至少有3个溢写文件并且设置了map端combine的话会在合并的过程中触发combine操作但是若只有2个或1个溢写文件则不触发combine操作因为combine操作本质上是一个reduce需要启动JVM虚拟机有一定的开销

combine本质上也是reduce因为自定义的combine类继承自Reducer父类

map:

reduce函数与combine函数通常是一样的K3与K2类型相同V3与V2类型相同即reduce的输入的kv类型分别与输出的kv类型相同

8.3

WordCountMap、WordCountReduce代码保持不变唯一需要做的修改是在WordCountMain中增加job.setCombinerClass(WordCountReduce.class);修改如下

8.4

使用combine时首先考虑当前MR是否适合combine总原则是不论使不使用combine不能影响最终的结果在MR时发生数据倾斜且可以使用combine时可以使用combine缓解数据倾斜

MR压缩

作用在MR中为了减少磁盘IO及网络IO可考虑在map端、reduce端设置压缩功能给“MapReduce编程用户搜索次数”代码增加压缩功能

9.2

那么如何设置压缩功能呢只需在main方法中给Configuration对象增加如下设置即可

configuration.set(mapreduce.map.output.compress,

true);

//设置map输出的压缩算法是BZip2Codec它是hadoop默认支持的压缩算法且支持切分

configuration.set(mapreduce.map.output.compress.codec,

org.apache.hadoop.io.compress.BZip2Codec);

//开启job输出压缩功能

configuration.set(mapreduce.output.fileoutputformat.compress,

true);

configuration.set(mapreduce.output.fileoutputformat.compress.codec,

org.apache.hadoop.io.compress.BZip2Codec);9.3

MR代码

给“MapReduce编程用户搜索次数”代码增加压缩功能代码如下

package

com.kaikeba.hadoop.mrcompress;import

org.apache.hadoop.conf.Configuration;

import

org.apache.hadoop.io.IntWritable;

import

org.apache.hadoop.io.LongWritable;

import

org.apache.hadoop.mapreduce.Job;

import

org.apache.hadoop.mapreduce.Mapper;

import

org.apache.hadoop.mapreduce.Reducer;

import

org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import

public

{//判断以下输入参数是否是两个分别表示输入路径、输出路径if

(args.length

Path!);System.exit(0);}Configuration

configuration

Configuration();//configuration.set(mapreduce.job.jar,/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);//开启map输出进行压缩的功能configuration.set(mapreduce.map.output.compress,

true);//设置map输出的压缩算法是BZip2Codec它是hadoop默认支持的压缩算法且支持切分configuration.set(mapreduce.map.output.compress.codec,

org.apache.hadoop.io.compress.BZip2Codec);//开启job输出压缩功能configuration.set(mapreduce.output.fileoutputformat.compress,

true);//指定job输出使用的压缩算法configuration.set(mapreduce.output.fileoutputformat.compress.codec,

org.apache.hadoop.io.compress.BZip2Codec);//调用getInstance方法生成job实例Job

job

UserSearchCount.class.getSimpleName());//设置jar包参数是包含main方法的类job.setJarByClass(UserSearchCount.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat所以下两行可以注释掉

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job,

new

Path(args[0]));FileOutputFormat.setOutputPath(job,

new

FileOutputFormat.setCompressOutput(job,

true);

FileOutputFormat.setOutputCompressorClass(job,

BZip2Codec.class);//设置处理Map阶段的自定义的类job.setMapperClass(SearchCountMapper.class);//设置map

combine类减少网路传出量//job.setCombinerClass(WordCountReduce.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(SearchCountReducer.class);//如果map、reduce的输出的kv对类型一致直接设置reduce的输出的kv对就行如果不一样需要分别设置map,

reduce的输出的kv类型//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);//设置reduce

task最终输出key/value的类型//注意此处设置的reduce输出的key/value类型一定要与自定义reduce类输出的kv对类型一致否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//

提交作业job.waitForCompletion(true);}public

static

IntWritable(1);Overrideprotected

void

{//获得当前行的数据//样例数据20111230111645

169796ae819ae8b32668662bb99b6c2d

塘承高速公路规划线路图

http://auto.ifeng.com/roll/20111212/729164.shtmlString

line

value.toString();//切分获得各字段组成的数组String[]

fields

line.split(\t);//因为要统计每个user搜索并查看URL的次数所以将userid放到输出key的位置//注意MR编程中根据业务需求设计key是很重要的能力String

userid

fields[1];//设置输出的key的值userIdKOut.set(userid);//输出结果context.write(userIdKOut,

vOut);}}public

IntWritable();Overrideprotected

void

value.get();}//设置当前user搜索并查看总次数totalNumVOut.set(sum);context.write(key,

totalNumVOut);}}

com.kaikeba.hadoop-1.0-SNAPSHOT.jar

com.kaikeba.hadoop.mrcompress.UserSearchCount

/sogou.2w.utf8

MR过程中使用压缩可减少数据量进而减少磁盘IO、网络IO数据量可设置map端输出的压缩可设置job最终结果的压缩通过相应的配置项即可实现

10.

上图也描述了mapreduce的一个完整的过程我们主要看map任务是如何从hdfs读取分片数据的部分

①InputFormat输入格式类

InputFormat输入格式类将输入文件分成一个个分片InputSplit每个Map任务对应一个split分片

③RecordReader记录读取器类createRecordReader()

RecordReader记录读取器读取分片数据一行记录生成一个键值对传入map任务的map()方法调用map()

详细流程

客户端调用InputFormat的**getSplits()**方法获得输入文件的分片信息

针对每个MR

master根据分片信息尽量将map任务尽量调度在split分片数据所在节点移动计算不移动数据

有几个分片就生成几个map任务

每个map任务将split分片传递给createRecordReader()方法生成此分片对应的RecordReader

nextKeyValue()判断是否有下一个键值对如果有返回true否则返回false如果返回true调用getCurrentKey()获得当前的键调用getCurrentValue()获得当前的值

map任务运行时会调用run()

首先运行一次setup()方法只在map任务启动时运行一次一些初始化的工作可以在setup方法中完成如要连接数据库之类的操作

while循环调用context.nextKeyValue()会委托给RecordRecord的nextKeyValue()判断是否有下一个键值对

如果有下一个键值对调用context.getCurrentKey()、context.getCurrentValue()获得当前的键、值的值也是调用RecordReader的同名方法

作为参数传入map(key,

当读取分片尾context.nextKeyValue()返回false退出循环

调用cleanup()方法只在map任务结束之前调用一次所以一些回收资源的工作可在此方法中实现如关闭数据库连接

10.2

无论hdfs还是mapreduce处理小文件都有损效率实践中又难免面临处理大量小文件的场景此时就需要有相应解决方案

10.3

在数据采集的时候就将小文件或小批数据合成大文件再上传HDFS(SequenceFile方案)在业务处理之前在HDFS上使用mapreduce程序对小文件进行合并可使用自定义InputFormat实现在mapreduce处理时可采用CombineFileInputFormat提高效率

本例使用第二种方案自定义输入格式

com.kaikeba.hadoop.inputformat;import

import

org.apache.hadoop.io.BytesWritable;

import

org.apache.hadoop.io.NullWritable;

import

org.apache.hadoop.mapreduce.InputSplit;

import

org.apache.hadoop.mapreduce.JobContext;

import

org.apache.hadoop.mapreduce.RecordReader;

import

org.apache.hadoop.mapreduce.TaskAttemptContext;

import

org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import

自定义InputFormat类*

值值用于保存小文件的内容此处使用BytesWritable*/

public

InterruptedException*/Overridepublic

BytesWritable

IOException,InterruptedException

{//使用自定义的RecordReader类WholeFileRecordReader

reader

WholeFileRecordReader();//初始化RecordReaderreader.initialize(split,

context);return

com.kaikeba.hadoop.inputformat;import

org.apache.hadoop.conf.Configuration;

import

org.apache.hadoop.fs.FSDataInputStream;

import

org.apache.hadoop.fs.FileSystem;

import

org.apache.hadoop.io.BytesWritable;

import

org.apache.hadoop.io.NullWritable;

import

org.apache.hadoop.mapreduce.InputSplit;

import

org.apache.hadoop.mapreduce.RecordReader;

import

org.apache.hadoop.mapreduce.TaskAttemptContext;

import

org.apache.hadoop.mapreduce.lib.input.FileSplit;import

通过nextKeyValue()方法去读取数据构造将返回的key

value*

getCurrentValue来返回上面构造好的key和value**

author*/

标识变量分片是否已被读取过因为小文件设置成了不可切分所以一个小文件只有一个分片*

boolean

InterruptedException*/Overridepublic

void

context.getConfiguration();}/***

return*

InterruptedException*/Overridepublic

boolean

file.getFileSystem(conf);FSDataInputStream

null;try

fs.open(file);IOUtils.readFully(in,

contents,

contents.length);value.set(contents,

contents.length);}

{IOUtils.closeStream(in);}processed

true;return

InterruptedException*/Overridepublic

NullWritable

IOException,InterruptedException

{return

InterruptedException*/Overridepublic

BytesWritable

IOException,InterruptedException

{return

获得分片读取的百分比因为如果读取分片数据的话会一次性的读取完所以进度要么是1要么是0*

return*

{//因为一个文件作为一个整体处理所以如果processed为true表示已经处理过了进度为1否则为0return

processed

com.kaikeba.hadoop.inputformat;import

org.apache.hadoop.conf.Configuration;

import

org.apache.hadoop.conf.Configured;

import

org.apache.hadoop.io.BytesWritable;

import

org.apache.hadoop.io.NullWritable;

import

org.apache.hadoop.mapreduce.InputSplit;

import

org.apache.hadoop.mapreduce.Job;

import

org.apache.hadoop.mapreduce.Mapper;

import

org.apache.hadoop.mapreduce.lib.input.FileSplit;

import

org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import

org.apache.hadoop.util.ToolRunner;import

实现run()方法*

在main()中调用ToolRunner.run()方法第一个参数是当前对象第二个参数是输入、输出*/

public

mapper类的输入键值对类型与自定义InputFormat的输入键值对保持一致*

mapper类的输出的键值对类型分别是文件名、文件内容*/static

class

InterruptedException*/Overrideprotected

void

IOException,InterruptedException

{InputSplit

context.getInputSplit();//获得当前文件路径Path

path

Text(path.toString());}Overrideprotected

void

sequencefile);job.setJarByClass(SmallFiles2SequenceFile.class);//设置自定义输入格式job.setInputFormatClass(WholeFileInputFormat.class);WholeFileInputFormat.addInputPath(job,new

Path(args[0]));//设置输出格式SequenceFileOutputFormat及输出路径job.setOutputFormatClass(SequenceFileOutputFormat.class);SequenceFileOutputFormat.setOutputPath(job,new

Path(args[1]));job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);job.setMapperClass(SequenceFileMapper.class);return

1;}public

SmallFiles2SequenceFile(),args);System.exit(exitCode);}

}10.5

需要自定义InputFormat类并覆写getRecordReader()方法自定义RecordReader类实现方法

initialize()nextKeyValue()getCurrentKey()getCurrentValue()getProgress()close()

11.

现在有一些订单的评论数据要将订单的好评与其它级别的评论中评、差评进行区分开来将最终的数据分开到不同的文件夹下面去

数据第九个字段表示评分等级0

程序的关键点是在一个mapreduce程序中根据数据的不同(好评的评级不同)输出两类结果到不同目录这类灵活的输出需求通过自定义OutputFormat来实现

11.3

在mapreduce中访问外部资源自定义OutputFormat类覆写getRecordWriter()方法自定义RecordWriter类覆写具体输出数据的方法write()

11.4

com.kaikeba.hadoop.outputformat;import

org.apache.hadoop.fs.FSDataOutputStream;

import

org.apache.hadoop.fs.FileSystem;

import

org.apache.hadoop.io.NullWritable;

import

org.apache.hadoop.mapreduce.RecordWriter;

import

org.apache.hadoop.mapreduce.TaskAttemptContext;

import

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import

本例使用框架默认的Reducer它将Mapper输入的kv对原样输出所以reduce输出的kv类型分别是Text,

NullWritable*

自定义OutputFormat的类泛型表示reduce输出的键值对类型要保持一致;*

map--(kv)--reduce--(kv)--OutputFormat*/

public

根据实际情况修改path;node01及端口号8020*/String

bad

hdfs://node01:8020/outputformat/bad/r.txt;String

good

hdfs://node01:8020/outputformat/good/r.txt;/****

param

InterruptedException*/Overridepublic

RecordWriterText,

getRecordWriter(TaskAttemptContext

context)

FileSystem.get(context.getConfiguration());//两个输出文件路径Path

badPath

fs.create(badPath);FSDataOutputStream

goodOut

MyRecordWriter(badOut,goodOut);}/***

泛型表示reduce输出的键值对类型要保持一致*/static

class

NullWritable{FSDataOutputStream

badOut

MyRecordWriter(FSDataOutputStream

badOut,

InterruptedException*/Overridepublic

void

(key.toString().split(\t)[9].equals(0)){//好评goodOut.write(key.toString().getBytes());goodOut.write(\r\n.getBytes());}else{//其它评级badOut.write(key.toString().getBytes());badOut.write(\r\n.getBytes());}}/***

关闭流*

InterruptedException*/Overridepublic

void

!null){goodOut.close();}if(badOut

}main方法

com.kaikeba.hadoop.outputformat;import

org.apache.hadoop.conf.Configuration;

import

org.apache.hadoop.conf.Configured;

import

org.apache.hadoop.io.LongWritable;

import

org.apache.hadoop.io.NullWritable;

import

org.apache.hadoop.mapreduce.Job;

import

org.apache.hadoop.mapreduce.Mapper;

import

org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import

org.apache.hadoop.util.ToolRunner;import

class

MyOwnOutputFormatMain.class.getSimpleName());job.setJarByClass(MyOwnOutputFormatMain.class);//默认项可以省略或者写出也可以//job.setInputFormatClass(TextInputFormat.class);//设置输入文件TextInputFormat.addInputPath(job,

new

Path(args[0]));job.setMapperClass(MyMapper.class);//job.setMapOutputKeyClass(Text.class);//job.setMapOutputValueClass(NullWritable.class);//设置自定义的输出类job.setOutputFormatClass(MyOutPutFormat.class);//设置一个输出会输出一个success的成功标志的文件MyOutPutFormat.setOutputPath(job,

new

Path(args[1]));job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//默认项即默认有一个reduce任务所以可以省略

job.setReducerClass(Reducer.class);boolean

job.waitForCompletion(true);return

1;}/****

输出的value为null对应NullWritable*/public

static

{//把当前行内容作为key输出value为nullcontext.write(value,

param

Configuration();ToolRunner.run(configuration,

new

泛型与reduce输出的键值对类型保持一致覆写getRecordWriter()方法

自定义RecordWriter

泛型与reduce输出的键值对类型保持一致覆写具体输出数据的方法write()、close()

main方法

job.setOutputFormatClass使用自定义在输出类

12.

有些MR的输出的key可以直接使用hadoop框架的可序列化可比较类型表示如Text、IntWritable等等而这些类型本身是可比较的如IntWritable默认升序排序

但有时使用MR编程输出的key若使用hadoop自带的key类型无法满足需求

此时需要自定义的key类型包含的是非单一信息如此例包含工资、年龄并且也得是**可序列化、可比较的**

需要自定义key定义排序规则

实现按照人的salary降序排序若相同则再按age升序排序若salary、age相同则放入同一组

12.3

com.kaikeba.hadoop.secondarysort;import

org.apache.hadoop.io.WritableComparable;import

java.io.DataInput;

java.io.IOException;//根据输入文件格式定义JavaBean作为MR时Map的输出key类型要求此类可序列化、可比较

public

this.name;}//两个Person对象的比较规则①先比较salary高的排序在前②若相同age小的在前public

int

{//若两个人工资不同//工资降序排序即工资高的排在前边return

-compareResult;}

{//若工资相同//年龄升序排序即年龄小的排在前边return

this.age

other.age;}}//序列化将NewKey转化成使用流传送的二进制public

void

{//注意①使用正确的write方法②记住此时的序列化的顺序name、age、salarydataOutput.writeUTF(name);dataOutput.writeInt(age);dataOutput.writeInt(salary);}//使用in读字段的顺序要与write方法中写的顺序保持一致name、age、salarypublic

void

string//注意①使用正确的read方法②读取顺序与write()中序列化的顺序保持一致this.name

dataInput.readInt();this.salary

package

com.kaikeba.hadoop.secondarysort;import

org.apache.hadoop.conf.Configuration;

import

org.apache.hadoop.fs.FileSystem;

import

org.apache.hadoop.io.LongWritable;

import

org.apache.hadoop.io.NullWritable;

import

org.apache.hadoop.mapreduce.Job;

import

org.apache.hadoop.mapreduce.Mapper;

import

org.apache.hadoop.mapreduce.Reducer;

import

org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import

java.io.IOException;

Configuration();//configuration.set(mapreduce.job.jar,/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);Job

job

SecondarySort.class.getSimpleName());FileSystem

fileSystem

FileSystem.get(URI.create(args[1]),

Path(args[1])))

true);}FileInputFormat.setInputPaths(job,

new

Path(args[0]));job.setMapperClass(MyMap.class);//由于mapper与reducer输出的kv类型分别相同所以下两行可以省略

job.setMapOutputKeyClass(Person.class);

job.setMapOutputValueClass(NullWritable.class);//设置reduce的个数;默认为1//job.setNumReduceTasks(1);job.setReducerClass(MyReduce.class);job.setOutputKeyClass(Person.class);job.setOutputValueClass(NullWritable.class);FileOutputFormat.setOutputPath(job,

new

Path(args[1]));job.waitForCompletion(true);}//MyMap的输出key用自定义的Person类型输出的value为nullpublic

static

value.toString().split(\t);String

name

Integer.parseInt(fields[1]);int

salary

Integer.parseInt(fields[2]);//在自定义类中进行比较Person

person

salary);//person对象作为输出的keycontext.write(person,

static

{//输入的kv对原样输出context.write(key,

}12.4

如果MR时key的排序规则比较复杂比如需要根据字段1排序若字段1相同则需要根据字段2排序…此时可以使用自定义key实现将自定义的key作为MR中map输出的key的类型reduce输入的类型自定义的key

实现WritableComparable接口实现compareTo比较方法实现write序列化方法实现readFields反序列化方法

13.

userid、datetime、title商品标题、unitPrice商品单价、purchaseNum购买量、productId商品ID

现使用MR编程求出每个用户、每个月消费金额最多的两笔订单花了多少钱

13.2

实现WritableComparable接口包含6个字段分别对应文件中的6个字段重点实现compareTo方法

先比较userid是否相等若不相等则userid升序排序若相等比较两个Bean的日期是否相等若不相等则日期升序排序若相等再比较总开销降序排序

实现序列化方法write()实现反序列化方法readFields()

自定义分区类

继承Partitioner类getPartiton()实现userid相同的处于同一个分区

自定义Mapper类

输出key是当前记录对应的Bean对象输出的value对应当前订单的总开销

自定义分组类

决定userid相同、日期年月相同的记录分到同一组中调用一次reduce()

自定义Reduce类

job.setMapperClassjob.setPartitionerClassjob.setReducerClassjob.setGroupingComparatorClass

13.3

com.kaikeba.hadoop.grouping;import

org.apache.hadoop.io.WritableComparable;import

java.io.DataInput;

java.io.IOException;//实现WritableComparable接口

public

{//OrderBean作为MR中的key如果对象中的userid相同即ret1为0就表示两个对象是同一个用户int

ret1

this.userid.compareTo(other.userid);if

(ret1

thisYearMonth.compareTo(otherYearMonth);if(ret2

{//若datetime相同//如果userid、年月都相同比较单笔订单的总开销Double

thisTotalPrice

this.getPurchaseNum()*this.getUnitPrice();Double

oTotalPrice

other.getPurchaseNum()*other.getUnitPrice();//总花销降序排序即总花销高的排在前边return

-thisTotalPrice.compareTo(oTotalPrice);}

else

{//若datatime不同按照datetime升序排序return

ret2;}}

{dataOutput.writeUTF(userid);dataOutput.writeUTF(datetime);dataOutput.writeUTF(title);dataOutput.writeDouble(unitPrice);dataOutput.writeInt(purchaseNum);dataOutput.writeUTF(produceId);}/***

反序列化*

dataInput.readUTF();this.datetime

dataInput.readUTF();this.unitPrice

dataInput.readDouble();this.purchaseNum

dataInput.readInt();this.produceId

另外一个方案此处不覆写hashCode方法而是自定义分区器getPartition方法中对OrderBean的userid求hashCode值%reduce任务数*

return*/

com.kaikeba.hadoop.grouping;import

org.apache.hadoop.io.DoubleWritable;

import

org.apache.hadoop.mapreduce.Partitioner;//mapper的输出key类型是自定义的key类型OrderBean输出value类型是单笔订单的总开销double

DoubleWritable

(orderBean.getUserid().hashCode()

Integer.MAX_VALUE)

com.kaikeba.hadoop.grouping;import

org.apache.hadoop.io.DoubleWritable;

import

org.apache.hadoop.io.LongWritable;

import

org.apache.hadoop.mapreduce.Mapper;import

public

record.split(\t);if(fields.length

{String

dateUtils.getYearMonthString(datetime);String

title

Double.parseDouble(fields[3]);int

purchaseNum

Integer.parseInt(fields[4]);String

produceId

fields[5];//生成OrderBean对象OrderBean

orderBean

purchaseNum;valueOut.set(totalPrice);context.write(orderBean,

valueOut);}}

com.kaikeba.hadoop.grouping;import

org.apache.hadoop.io.DoubleWritable;

import

org.apache.hadoop.io.NullWritable;

import

org.apache.hadoop.mapreduce.Reducer;import

java.io.IOException;//输出的key为userid拼接上年月的字符串对应Text输出的value对应单笔订单的金额

public

①由于自定义分组逻辑相同用户、相同年月的订单是一组调用一次reduce()*

②由于自定义的key类OrderBean中比较规则compareTo规定相同用户、相同年月的订单按总金额降序排序*

param

InterruptedException*/Overrideprotected

void

key.getDatetime();context.write(new

Text(keyOut),

com.kaikeba.hadoop.grouping;import

org.apache.hadoop.io.WritableComparable;

import

org.apache.hadoop.io.WritableComparator;//自定义分组类reduce端调用reduce()前对数据做分组每组数据调用一次reduce()

public

bOrderBean.getUserid();//userid、年、月相同的作为一组int

ret1

aUserId.compareTo(bUserId);if(ret1

aOrderBean.getDatetime().compareTo(bOrderBean.getDatetime());}

else

com.kaikeba.hadoop.grouping;import

com.kaikeba.hadoop.wordcount.WordCountMain;

import

com.kaikeba.hadoop.wordcount.WordCountMap;

import

com.kaikeba.hadoop.wordcount.WordCountReduce;

import

org.apache.hadoop.conf.Configuration;

import

org.apache.hadoop.conf.Configured;

import

org.apache.hadoop.io.DoubleWritable;

import

org.apache.hadoop.io.IntWritable;

import

org.apache.hadoop.mapreduce.Job;

import

org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import

org.apache.hadoop.util.ToolRunner;import

class

args);System.exit(exitCode);}Overridepublic

int

{//判断以下输入参数是否是两个分别表示输入路径、输出路径if

(args.length

Path!);System.exit(0);}Configuration

configuration

Configuration();//告诉程序要运行的jar包在哪//configuration.set(mapreduce.job.jar,/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar);//调用getInstance方法生成job实例Job

job

CustomGroupingMain.class.getSimpleName());//设置jar包参数是包含main方法的类job.setJarByClass(CustomGroupingMain.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat所以下两行可以注释掉

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job,

new

Path(args[0]));FileOutputFormat.setOutputPath(job,

new

Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(MyMapper.class);//设置map

combine类减少网路传出量//job.setCombinerClass(MyReducer.class);job.setPartitionerClass(MyPartitioner.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(MyReducer.class);job.setGroupingComparatorClass(MyGroup.class);//如果map、reduce的输出的kv对类型一致直接设置reduce的输出的kv对就行如果不一样需要分别设置map,

reduce的输出的kv类型//注意此处设置的map输出的key/value类型一定要与自定义map类输出的kv对类型一致否则程序运行报错job.setMapOutputKeyClass(OrderBean.class);job.setMapOutputValueClass(DoubleWritable.class);//设置reduce

task最终输出key/value的类型//注意此处设置的reduce输出的key/value类型一定要与自定义reduce类输出的kv对类型一致否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(DoubleWritable.class);//

提交作业return

com.kaikeba.hadoop.grouping;import

import

java.time.format.DateTimeFormatter;public

class

System.out.println(str1.compareTo(str2));//test2

String

System.out.println(getYearMonthString(datetime));}public

LocalDateTime

DateTimeFormatter.ofPattern(yyyy-MM-dd

localDateTime

localDateTime;}//日期格式转换工具类将2014-12-14

String

DateTimeFormatter.ofPattern(yyyy-MM-dd

localDateTime

localDateTime.getMonthValue();return

year

实现其中的compareTo方法设置key的比较逻辑实现序列化方法write()实现反序列化方法readFields()

自定义mapper类、reducer类自定义partition类getPartition方法决定哪些key落入哪些分区自定义group分组类决定reduce阶段哪些kv对落入同一组调用一次reduce()写main方法设置自定义的类

job.setMapperClassjob.setPartitionerClassjob.setReducerClassjob.setGroupingComparatorClass

14.

数据中不可避免地会出现离群值outlier并导致数据倾斜。

这些离群值会显著地拖慢MapReduce的执行。

数据频率倾斜——某一个区域的数据量要远远大于其他区域。

比如某一个key对应的键值对远远大于其他键的键值对。

数据大小倾斜——部分记录的大小远远大于平均值。

在map端的数据倾斜可以考虑使用combine在reduce端的数据倾斜常常来源于MapReduce的默认分区器

数据倾斜会导致map和reduce的任务执行时间大为延长也会让需要缓存数据集的操作消耗更多的内存资源

14.1

发现倾斜数据之后有必要诊断造成数据倾斜的那些键。

有一个简便方法就是在代码里实现追踪每个键的最大值。

为了减少追踪量可以设置数据量阀值只追踪那些数据量大于阀值的键并输出到日志中。

实现代码如下运行作业后就可以从日志中判断发生倾斜的键以及倾斜程度跟踪倾斜数据是了解数据的重要一步也是设计MapReduce作业的重要基础

package

com.kaikeba.hadoop.dataskew;import

org.apache.hadoop.io.IntWritable;import

org.apache.hadoop.io.Text;import

org.apache.hadoop.mapreduce.Reducer;import

class

Logger.getLogger(WordCountReduce.class);Overrideprotected

void

{//一个键达到多少后会做数据倾斜记录maxValueThreshold

10000;}/*(hello,

count.get();i;}//如果当前键超过10000个则打印日志if(i

maxValueThreshold)

Reduce数据倾斜一般是指map的输出数据中存在数据频率倾斜的状况即部分输出键的数据量远远大于其它的输出键

一、自定义分区

例如如果map输出键的单词来源于一本书。

其中大部分必然是省略词stopword。

那么就可以将自定义分区将这部分省略词发送给固定的一部分reduce实例。

而将其他的都发送给剩余的reduce实例。

二、Combine

使用Combine可以大量地减小数据频率倾斜和数据大小倾斜。

combine的目的就是聚合并精简数据。

三、抽样和范围分区

Hadoop默认的分区器是HashPartitioner基于map输出键的哈希值分区。

这仅在数据分布比较均匀时比较好。

在有数据倾斜时就很有问题。

使用分区器需要首先了解数据的特性。

TotalOrderPartitioner中可以通过对原始数据进行抽样得到的结果集来预设分区边界值。

TotalOrderPartitioner中的范围分区器可以通过预设的分区边界值进行分区。

因此它也可以很好地用在矫正数据中的部分键的数据倾斜问题。

在map端或reduce端的数据大小倾斜都会对缓存造成较大的影响乃至导致OutOfMemoryError异常。

处理这种情况并不容易。

可以参考以下方法。

设置mapreduce.input.linerecordreader.line.maxlength来限制RecordReader读取的最大长度。

RecordReader在TextInputFormat和KeyValueTextInputFormat类中使用。

默认长度没有上限。

15.

数据气象站气象数据来源美国国家气候数据中心NCDC1901-2001年数据每年一个文件

16.2

设置一个分区即一个reduce任务在一个reduce中对结果进行排序失去了MR框架并行计算的优势

方案二

自定义分区人为指定各温度区间的记录落入哪个分区如分区温度边界值分别是-15、0、20共4个分区但由于对整个数据集的气温分布不了解可能某些分区的数据量大其它的分区小数据倾斜

方案三

通过对键空间采样只查看一小部分键获得键的近似分布好温度的近似分布进而据此结果创建分区实现尽可能的均匀的划分数据集Hadoop内置了采样器InputSampler

16.4

一、先将数据按气温对天气数据集排序。

结果存储为sequencefile文件气温作为输出键数据行作为输出值

此代码处理原始日志文件

com.kaikeba.hadoop.totalorder;import

org.apache.hadoop.conf.Configuration;

import

org.apache.hadoop.io.IntWritable;

import

org.apache.hadoop.io.LongWritable;

import

org.apache.hadoop.mapreduce.Job;

import

org.apache.hadoop.mapreduce.Mapper;

import

org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import

org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import

此代码处理原始日志文件

温度作为SequenceFile的key记录作为value*/

public

IntWritable();Overrideprotected

void

{//002902907099999190101010600464333023450FM-12000599999V0202701N015919999999N0000001N9-0078199999102001ADDGF108991999999999999999999parser.parse(value);if

{//是否是有效的记录temperature.set(parser.getAirTemperature());context.write(temperature,

/ncdc/sfoutputpublic

SortDataPreprocessor.class.getSimpleName());job.setJarByClass(SortDataPreprocessor.class);//FileInputFormat.addInputPath(job,

new

Path(args[0]));FileOutputFormat.setOutputPath(job,

new

Path(args[1]));job.setMapperClass(CleanerMapper.class);//最终输出的键、值类型job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(Text.class);//reduce个数为0job.setNumReduceTasks(0);//以sequencefile的格式输出job.setOutputFormatClass(SequenceFileOutputFormat.class);//开启job输出压缩功能//方案一conf.set(mapreduce.output.fileoutputformat.compress,

true);conf.set(mapreduce.output.fileoutputformat.compress.type,RECORD);//指定job输出使用的压缩算法conf.set(mapreduce.output.fileoutputformat.compress.codec,

org.apache.hadoop.io.compress.BZip2Codec);//方案二//设置sequencefile的压缩、压缩算法、sequencefile文件压缩格式block//SequenceFileOutputFormat.setCompressOutput(job,

true);//SequenceFileOutputFormat.setOutputCompressorClass(job,

GzipCodec.class);//SequenceFileOutputFormat.setOutputCompressorClass(job,

SnappyCodec.class);//SequenceFileOutputFormat.setOutputCompressionType(job,

SequenceFile.CompressionType.BLOCK);System.exit(job.waitForCompletion(true)

1);}

com.kaikeba.hadoop.totalorder;import

org.apache.hadoop.conf.Configuration;

import

org.apache.hadoop.filecache.DistributedCache;

import

org.apache.hadoop.io.IntWritable;

import

org.apache.hadoop.mapreduce.Job;

import

org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import

org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

import

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import

org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import

org.apache.hadoop.mapreduce.lib.partition.InputSampler;

import

org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;import

java.net.URI;/***

使用TotalOrderPartitioner全局排序一个SequenceFile文件的内容*

public

SortByTemperatureUsingTotalOrderPartitioner{/***

两个参数/ncdc/sfoutput

第一个参数是SortDataPreprocessor的输出文件*/public

static

SortByTemperatureUsingTotalOrderPartitioner.class.getSimpleName());job.setJarByClass(SortByTemperatureUsingTotalOrderPartitioner.class);FileInputFormat.addInputPath(job,

new

Path(args[0]));FileOutputFormat.setOutputPath(job,

new

Path(args[1]));//输入文件是SequenceFilejob.setInputFormatClass(SequenceFileInputFormat.class);//Hadoop提供的方法来实现全局排序要求Mapper的输入、输出的key必须保持类型一致job.setOutputKeyClass(IntWritable.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);//分区器全局排序分区器job.setPartitionerClass(TotalOrderPartitioner.class);//分了3个区且分区i-1中的key小于i分区中所有的键job.setNumReduceTasks(3);/***

每一个参数采样率*

只要numSamples和maxSplitSampled第二、第三参数任一条件满足则停止采样*/InputSampler.SamplerIntWritable,

Text

InputSampler.RandomSamplerIntWritable,

Text(0.1,

TotalOrderPartitioner.setPartitionFile();/***

由TotalOrderPartitioner读取作为全排序的分区依据让每个分区中的数据量近似*/InputSampler.writePartitionFile(job,

sampler);//根据上边的SequenceFile文件包含键的近似分布情况创建分区String

partitionFile

TotalOrderPartitioner.getPartitionFile(job.getConfiguration());URI

partitionUri

JobConf();//与所有map任务共享此文件添加到分布式缓存中DistributedCache.addCacheFile(partitionUri,

job.addCacheFile(partitionUri);//方案一输出的文件RECORD级别使用BZip2Codec进行压缩conf.set(mapreduce.output.fileoutputformat.compress,

true);conf.set(mapreduce.output.fileoutputformat.compress.type,RECORD);//指定job输出使用的压缩算法conf.set(mapreduce.output.fileoutputformat.compress.codec,

org.apache.hadoop.io.compress.BZip2Codec);//方案二//SequenceFileOutputFormat.setCompressOutput(job,

true);//SequenceFileOutputFormat.setOutputCompressorClass(job,

GzipCodec.class);//SequenceFileOutputFormat.setOutputCompressionType(job,

CompressionType.BLOCK);System.exit(job.waitForCompletion(true)

1);}

先使用InputSampler.Sampler采样器对整个key空间进行采样得到key的近似分布

保存到key分布情况文件中

使用TotalOrderPartitioner利用上边的key分布情况文件进行分区每个分区的数据量近似从而防止数据倾斜

扩展阅读《Hadoop权威指南第4版》

描述MR的shuffle全流程面试谈谈什么是数据倾斜什么情况会造成数据倾斜面试对MR数据倾斜如何解决面试补充图解》》》》》》》》》》



SEO优化服务概述

作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。

百度官方合作伙伴 白帽SEO技术 数据驱动优化 效果长期稳定

SEO优化核心服务

网站技术SEO

  • 网站结构优化 - 提升网站爬虫可访问性
  • 页面速度优化 - 缩短加载时间,提高用户体验
  • 移动端适配 - 确保移动设备友好性
  • HTTPS安全协议 - 提升网站安全性与信任度
  • 结构化数据标记 - 增强搜索结果显示效果

内容优化服务

  • 关键词研究与布局 - 精准定位目标关键词
  • 高质量内容创作 - 原创、专业、有价值的内容
  • Meta标签优化 - 提升点击率和相关性
  • 内容更新策略 - 保持网站内容新鲜度
  • 多媒体内容优化 - 图片、视频SEO优化

外链建设策略

  • 高质量外链获取 - 权威网站链接建设
  • 品牌提及监控 - 追踪品牌在线曝光
  • 行业目录提交 - 提升网站基础权威
  • 社交媒体整合 - 增强内容传播力
  • 链接质量分析 - 避免低质量链接风险

SEO服务方案对比

服务项目 基础套餐 标准套餐 高级定制
关键词优化数量 10-20个核心词 30-50个核心词+长尾词 80-150个全方位覆盖
内容优化 基础页面优化 全站内容优化+每月5篇原创 个性化内容策略+每月15篇原创
技术SEO 基本技术检查 全面技术优化+移动适配 深度技术重构+性能优化
外链建设 每月5-10条 每月20-30条高质量外链 每月50+条多渠道外链
数据报告 月度基础报告 双周详细报告+分析 每周深度报告+策略调整
效果保障 3-6个月见效 2-4个月见效 1-3个月快速见效

SEO优化实施流程

我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:

1

网站诊断分析

全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。

2

关键词策略制定

基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。

3

技术优化实施

解决网站技术问题,优化网站结构,提升页面速度和移动端体验。

4

内容优化建设

创作高质量原创内容,优化现有页面,建立内容更新机制。

5

外链建设推广

获取高质量外部链接,建立品牌在线影响力,提升网站权威度。

6

数据监控调整

持续监控排名、流量和转化数据,根据效果调整优化策略。

SEO优化常见问题

SEO优化一般需要多长时间才能看到效果?
SEO是一个渐进的过程,通常需要3-6个月才能看到明显效果。具体时间取决于网站现状、竞争程度和优化强度。我们的标准套餐一般在2-4个月内开始显现效果,高级定制方案可能在1-3个月内就能看到初步成果。
你们使用白帽SEO技术还是黑帽技术?
我们始终坚持使用白帽SEO技术,遵循搜索引擎的官方指南。我们的优化策略注重长期效果和可持续性,绝不使用任何可能导致网站被惩罚的违规手段。作为百度官方合作伙伴,我们承诺提供安全、合规的SEO服务。
SEO优化后效果能持续多久?
通过我们的白帽SEO策略获得的排名和流量具有长期稳定性。一旦网站达到理想排名,只需适当的维护和更新,效果可以持续数年。我们提供优化后维护服务,确保您的网站长期保持竞争优势。
你们提供SEO优化效果保障吗?
我们提供基于数据的SEO效果承诺。根据服务套餐不同,我们承诺在约定时间内将核心关键词优化到指定排名位置,或实现约定的自然流量增长目标。所有承诺都会在服务合同中明确约定,并提供详细的KPI衡量标准。

SEO优化效果数据

基于我们服务的客户数据统计,平均优化效果如下:

+85%
自然搜索流量提升
+120%
关键词排名数量
+60%
网站转化率提升
3-6月
平均见效周期

行业案例 - 制造业

  • 优化前:日均自然流量120,核心词无排名
  • 优化6个月后:日均自然流量950,15个核心词首页排名
  • 效果提升:流量增长692%,询盘量增加320%

行业案例 - 电商

  • 优化前:月均自然订单50单,转化率1.2%
  • 优化4个月后:月均自然订单210单,转化率2.8%
  • 效果提升:订单增长320%,转化率提升133%

行业案例 - 教育

  • 优化前:月均咨询量35个,主要依赖付费广告
  • 优化5个月后:月均咨询量180个,自然流量占比65%
  • 效果提升:咨询量增长414%,营销成本降低57%

为什么选择我们的SEO服务

专业团队

  • 10年以上SEO经验专家带队
  • 百度、Google认证工程师
  • 内容创作、技术开发、数据分析多领域团队
  • 持续培训保持技术领先

数据驱动

  • 自主研发SEO分析工具
  • 实时排名监控系统
  • 竞争对手深度分析
  • 效果可视化报告

透明合作

  • 清晰的服务内容和价格
  • 定期进展汇报和沟通
  • 效果数据实时可查
  • 灵活的合同条款

我们的SEO服务理念

我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。

提交需求或反馈

Demand feedback