hadoop第四阶段

hadoop中的序列化机制

  • 序列化:把对象转换为字节序列的过程
  • 反序列化:把字节序列恢复为对象的过程

jdk自带的序列化会把要序列化的接口的所有继承类给序列化过去,hadoop则不会。

自定义统计

  • map

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /**
    * FlowBean 是我们自定义的一种数据类型,要在hadoop的各个节点之间传输,应该遵循hadoop的序列化机制
    * 就必须实现hadoop相应的序列化接口
    * @author duanhaitao@itcast.cn
    */
    public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
    //拿到日志中的一行数据,切分各个字段,抽取出我们需要的字段:手机号,上行流量,下行流量,然后封装成kv发送出去
    @Override
    protected void map(LongWritable key, Text value,Context context)
    throws IOException, InterruptedException {
    //拿一行数据
    String line = value.toString();
    //切分成各个字段
    String[] fields = StringUtils.split(line, "\t");
    //拿到我们需要的字段
    String phoneNB = fields[1];
    long u_flow = Long.parseLong(fields[7]);
    long d_flow = Long.parseLong(fields[8]);
    //封装数据为kv并输出
    context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));
    }
    }
  • reduce

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
    //框架每传递一组数据<1387788654,{flowbean,flowbean,flowbean,flowbean.....}>调用一次我们的reduce方法
    //reduce中的业务逻辑就是遍历values,然后进行累加求和再输出
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values,Context context)
    throws IOException, InterruptedException {
    long up_flow_counter = 0;
    long d_flow_counter = 0;
    for(FlowBean bean : values){
    up_flow_counter += bean.getUp_flow();
    d_flow_counter += bean.getD_flow();
    }
    context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
    }
    }
  • job

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    //这是job描述和提交类的规范写法
    public class FlowSumRunner extends Configured implements Tool{
    @Override
    public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(FlowSumRunner.class);
    job.setMapperClass(FlowSumMapper.class);
    job.setReducerClass(FlowSumReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true)?0:1;
    }
    public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
    System.exit(res);
    }
    }
  • bean

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    public class FlowBean implements WritableComparable<FlowBean>{
    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;
    //在反序列化时,反射机制需要调用空参构造函数,所以显示定义了一个空参构造函数
    public FlowBean(){}
    //为了对象数据的初始化方便,加入一个带参的构造函数
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
    this.phoneNB = phoneNB;
    this.up_flow = up_flow;
    this.d_flow = d_flow;
    this.s_flow = up_flow + d_flow;
    }
    public String getPhoneNB() {
    return phoneNB;
    }
    public void setPhoneNB(String phoneNB) {
    this.phoneNB = phoneNB;
    }
    public long getUp_flow() {
    return up_flow;
    }
    public void setUp_flow(long up_flow) {
    this.up_flow = up_flow;
    }
    public long getD_flow() {
    return d_flow;
    }
    public void setD_flow(long d_flow) {
    this.d_flow = d_flow;
    }
    public long getS_flow() {
    return s_flow;
    }
    public void setS_flow(long s_flow) {
    this.s_flow = s_flow;
    }
    //将对象数据序列化到流中
    @Override
    public void write(DataOutput out) throws IOException {
    out.writeUTF(phoneNB);
    out.writeLong(up_flow);
    out.writeLong(d_flow);
    out.writeLong(s_flow);
    }
    //从数据流中反序列出对象的数据
    //从数据流中读出对象字段时,必须跟序列化时的顺序保持一致
    @Override
    public void readFields(DataInput in) throws IOException {
    phoneNB = in.readUTF();
    up_flow = in.readLong();
    d_flow = in.readLong();
    s_flow = in.readLong();
    }
    @Override
    public String toString() {
    return "" + up_flow + "\t" +d_flow + "\t" + s_flow;
    }
    @Override
    public int compareTo(FlowBean o) {
    return s_flow>o.getS_flow()?-1:1;
    }
    }

shuffle机制

map task的并发数是由切片的数量决定的,有多少切片,就启动多少map task
切片是一个逻辑的概念,指的就是文件中的数据的偏移量范围
切片的具体大小应该根据所处理的文件的大小来调整

hadoop 计算框架shuffle

Shuffle过程,也称Copy阶段。reduce task从各个map task上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定的阀值,则写到磁盘上,否则直接放到内存中。

官方的Shuffle过程如上图所示,不过细节有错乱,官方图并没有说明partition、sort和combiner具体作用于哪个阶段。

注意:Shuffle过程是贯穿于map和reduce两个过程的!

Hadoop的集群环境,大部分的map task和reduce task是执行在不同的节点上的,那么reduce就要取map的输出结果。那么集群中运行多个Job时,task的正常执行会对集群内部的网络资源消耗严重。虽说这种消耗是正常的,是不可避免的,但是,我们可以采取措施尽可能的减少不必要的网络资源消耗。另一方面,每个节点的内部,相比于内存,磁盘IO对Job完成时间的影响相当的大。

  • 所以:从以上分析,shuffle过程的基本要求:
    1.完整地从map task端拉取数据到reduce task端
    2.在拉取数据的过程中,尽可能地减少网络资源的消耗
    3.尽可能地减少磁盘IO对task执行效率的影响

  • 那么,Shuffle的设计目的就要满足以下条件:
    1.保证拉取数据的完整性
    2.尽可能地减少拉取数据的数据量
    3.尽可能地使用节点的内存而不是磁盘

张冲 wechat
欢迎扫一扫上面的微信关注我,一起交流!
坚持原创技术分享,您的支持将鼓励我继续创,点击打赏!