hadoop第三阶段

map reduce

当面临海量数据处理的时候,需要把数据源文件放入分布式文件系统中,这时这个简单的处理就会变得很复杂,这时 map reduce 就发生了作用

所以我们程序员的工作就是写一些业务逻辑,这个逻辑本身不具备特别多分布式的特点,但是他要符合mapReduce的逻辑规范。

  • map 局部处理
  • reduce 汇总

编程的基本规范

一个简单的统计单词个数的map reduce程序:

  • LongWritable 其实就是Long类型只不过实现了hadoop自己的序列化封装
  • Text就是hadoop实现了自己的序列化封装。

map:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的类型
//map 和 reduce 的数据输入输出都是以 key-value对的形式封装的
//默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为value
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
//mapreduce框架每读一行数据就调用一次该方法
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中 key-value
//key 是这一行数据的起始偏移量value 是这一行的文本内容
//将这一行的内容转换成string类型
String line = value.toString();
//对这一行的文本按特定分隔符切分
String[] words = StringUtils.split(line, " ");
//遍历这个单词数组输出为kv形式 k:单词 v : 1 , 在这里不用考虑通信,hadoop把通信的框架封装到了context中了
for(String word : words){
context.write(new Text(word), new LongWritable(1));
}
}
}

reduce:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
//框架在map处理完成之后,将所有kv对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法
//<hello,{1,1,1,1,1,1.....}>
@Override
protected void reduce(Text key, Iterable<LongWritable> values,Context context)
throws IOException, InterruptedException {
long count = 0;
//遍历value的list,进行累加求和
for(LongWritable value:values){
count += value.get();
}
//输出这一个单词的统计结果
context.write(key, new LongWritable(count));
}
}

还需要一个类来描述整个的业务逻辑job描述类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 用来描述一个特定的作业
* 比如,该作业使用哪个类作为逻辑处理中的map,哪个作为reduce
* 还可以指定该作业要处理的数据所在的路径
* 还可以指定改作业输出的结果放到哪个路径
* ....
* @author duanhaitao@itcast.cn
*/
public class WCRunner {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job wcjob = Job.getInstance(conf);
//设置整个job所用的那些类在哪个jar包
wcjob.setJarByClass(WCRunner.class);
//本job使用的mapper和reducer的类
wcjob.setMapperClass(WCMapper.class);
wcjob.setReducerClass(WCReducer.class);
//指定reduce的输出数据kv类型
wcjob.setOutputKeyClass(Text.class);
wcjob.setOutputValueClass(LongWritable.class);
//指定mapper的输出数据kv类型
wcjob.setMapOutputKeyClass(Text.class);
wcjob.setMapOutputValueClass(LongWritable.class);
//指定要处理的输入数据存放路径,hdfs的路径
FileInputFormat.setInputPaths(wcjob, new Path("/wc/srcdata/"));
//指定处理结果的输出数据存放路径
FileOutputFormat.setOutputPath(wcjob, new Path("/wc/output/"));
//将job提交给集群运行
wcjob.waitForCompletion(true);
}
}

运行

  • 上面的程序写好了之后可以打成一个jar包放在hadoop下运行
    • 首先运行hadoop的集群包括yarn
    • 然后在hadoop下创建输入目录和输出目录hadoop fs -mkdir /wc hadoop fs -mkdir /wc/srcdata
    • 创建一个文件然后把文件上传到输入目录hadoop fs -put words.log /wc/srcdata
    • 现在可以运行了hadoop jar wc.jar com.test.mr.WCRunner
  • 执行完了之后会在输出目录有两个文件,一个是空的表示执行成功了,另外一个是输出文件结果。

  • 还可以运行在本地模式,在window下运行的话,可以把路径设置成本地路径c:/wc/srcdata/,也可以是hafs路径hdfs://node1:9000/wc/srcdata/
    • 这里有一个问题就是本地向hdfs中写数据的话可能没有权限,所以要改一下权限hadoop fs -chmod -r 777 /wc

yarn任务调度

mapreduce和yarn的执行流程:

里面有很多的细节,也会有很多的进程,例如在运行上面写的程序时会产生一个Runjar,MRAppMaster,yarnChild..等进程可以通过jps查看

  • 在本地运行的时候,是在本地jvm中运行,如果想用yarn来进行任务调度,可以把两个配置文件加进来maper-site.xml yarn-site.xml

    • 还是会出现一个错误,因为任务调度找不到相应的jar包,所以可以在程序上面在加一个导入jar包conf.set("mapreduce.job.jar","wc.jar");,然后把项目打成jar然后放在项目下边。
    • 本地运行最好在linux中,因为window下的很多执行环境跟linux不一样,会报错
  • hadoop的8088端口可以看到yarn的运行信息。

MR程序的几种提交运行模式

本地模型运行

  • 在windows的eclipse里面直接运行main方法,就会将job提交给本地执行器localjobrunner执行
    • 输入输出数据可以放在本地路径下(c:/wc/srcdata/)
    • 输入输出数据也可以放在hdfs中(hdfs://weekend110:9000/wc/srcdata)
  • 在linux的eclipse里面直接运行main方法,但是不要添加yarn相关的配置,也会提交给localjobrunner执行
    • 输入输出数据可以放在本地路径下(/home/hadoop/wc/srcdata/)
    • 输入输出数据也可以放在hdfs中(hdfs://weekend110:9000/wc/srcdata)

集群模式运行

  • 将工程打成jar包,上传到服务器,然后用hadoop命令提交 hadoop jar wc.jar cn.itcast.hadoop.mr.wordcount.WCRunner
  • 在linux的eclipse中直接运行main方法,也可以提交到集群中去运行,但是,必须采取以下措施:
    • 在工程src目录下加入 mapred-site.xmlyarn-site.xml
    • 将工程打成jar包(wc.jar),同时在main方法中添加一个conf的配置参数conf.set("mapreduce.job.jar","wc.jar")
  • 在windows的eclipse中直接运行main方法,也可以提交给集群中运行,但是因为平台不兼容,需要做很多的设置修改
    • 要在windows中存放一份hadoop的安装包(解压好的)
    • 要将其中的lib和bin目录替换成根据你的windows版本重新编译出的文件
    • 再要配置系统环境变量 HADOOP_HOME 和 PATH
    • 修改YarnRunner这个类的源码
张冲 wechat
欢迎扫一扫上面的微信关注我,一起交流!
坚持原创技术分享,您的支持将鼓励我继续创,点击打赏!