hadoop第二阶段

hdfs

NameNode工作机制

  • hdfs写文件的过程,以及怎么保证元数据不丢失
    • 首先向namenode申请上传文件,nameNode中的元数据返回要分配的DataNode有哪些
    • 然后客户端会分别把要存的block写入datanode,只要要存的block存入成功就会返回成功信息
    • datanode之间在形成管道,来写入block的副本,如果副本写入错误,datanode则会自动在别的datanode中重新写一次
    • hdfs中有一个edits log日志文件来记录block存在哪些datanode中,当上传文件时nameNode首先往edits log文件中记录操作日志,当存入完成之后会把这些信息存入内存中。
    • 每当editslog写满时,需要将这一段时间的新的元数据刷到fsimage也就是磁盘文件中去,这一步是由SecondaryNameNode来操作的
      • 首先editlogs满了会通知secondaryNameNode进行checkpoint操作,然后namenode会停止往editlogs中写数据并切换到一个新的edit new,在后面会重新命名为edits。
      • 然后secondaryNameNode会下载nameNode中的fsimage跟editlogs中的文件,进行合并生成一个新的fsimage.checkpoint文件
      • 在把这个文件上传给NameNode替换掉以前的fsimage文件,一切重新开始。

  • 什么时候checkpoint?

    • fs.checkpoint.period 指定两次checkpoint的最大时间间隔,默认3600秒
    • fs.checkpoint.size 规定edits文件的最大值,一旦超过这个值则强制checkpoint,不管是否达到最大时间间隔,默认大小是64M。
  • 但是还有一个问题,在进行checkpoint时突然宕机了,再次重启虽然可以保证元数据的不丢失,但是在这个同时还会有应用在跟hdfs进行交互,这时怎么办。

    • 可以有两个NameNode这是就要涉及到hdfs的高可用性的知识了,在后边再介绍
  • 在写数据的时候其实有一个规则,在存的时候首先第一个block会存在namenode直接连的那个最近的datanode,它的副本会优先放在别的机架上,如果还有副本就会在本机架上在找一个datanode
  • namenode中的文件存在了data — > name —->current 中

dataNode

提供真实文件数据的存储服务

  • 文件块:最基本的存储单位。对于文件内容而言,一个文件的长度大小是size,那么从文件的0偏移开始,按照固定的大小,顺序对文件进行划分并编号,划分好的每一个块称一个Block。HDFS默认Block大小是128MB,以一个256MB文件,共有256/128=2个Block.(dfs.block.size)
  • 不同于普通文件系统的是,HDFS中,如果一个文件小于一个数据块的大小,并不占用整个数据块存储空间。

  • 存在datanode中的数据,可以直接找到他然后如果少于一个block并且是文件可以直接打开,如果分为了两个block可以把他们追加在一起,然后再解压还是能用的,追加cat 文件1 >> 文件2

java操作hdfs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
public class HdfsUtil {
FileSystem fs = null;
@Before
public void init() throws Exception{
//读取classpath下的xxx-site.xml 配置文件,并解析其内容,封装到conf对象中
Configuration conf = new Configuration();
//也可以在代码中对conf中的配置信息进行手动设置,会覆盖掉配置文件中的读取的值,也就是可以写core-site.xml
conf.set("fs.defaultFS", "hdfs://weekend110:9000/");
//根据配置信息,去获取一个具体文件系统的客户端操作实例对象
fs = FileSystem.get(new URI("hdfs://weekend110:9000/"),conf,"hadoop");
}
/**
* 上传文件,比较底层的写法
在这里会有一个问题,就是身份问题,因为创建的hadoop身份是linux的用户,所以可以修改`右键--->run---->run configurations`在argument中的VM arguments中加一个`-DHADOOP_USER_NAME=bingbing(linux系统名字)。
也可以终极解决就是像before中配置的那样,在获得client的时候多加两个参数:fs = FileSystem.get(new URI("hdfs://weekend110:9000/"),conf,"hadoop");
*
* @throws Exception
*/
@Test
public void upload() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://weekend110:9000/");
FileSystem fs = FileSystem.get(conf);
Path dst = new Path("hdfs://weekend110:9000/aa/qingshu.txt");
FSDataOutputStream os = fs.create(dst);
FileInputStream is = new FileInputStream("c:/qingshu.txt");
IOUtils.copy(is, os);
}
/**
* 上传文件,封装好的写法
* @throws Exception
* @throws IOException
*/
@Test
public void upload2() throws Exception, IOException{
fs.copyFromLocalFile(new Path("c:/qingshu.txt"), new Path("hdfs://weekend110:9000/aaa/bbb/ccc/qingshu2.txt"));
}
/**
* 下载文件
* @throws Exception
* @throws IllegalArgumentException
*/
@Test
public void download() throws Exception {
fs.copyToLocalFile(new Path("hdfs://weekend110:9000/aa/qingshu2.txt"), new Path("c:/qingshu2.txt"));
}
/**
* 查看文件信息
* @throws IOException
* @throws IllegalArgumentException
* @throws FileNotFoundException
*
*/
@Test
public void listFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
// listFiles列出的是文件信息,而且提供递归遍历
RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/"), true);
while(files.hasNext()){
LocatedFileStatus file = files.next();
Path filePath = file.getPath();
String fileName = filePath.getName();
System.out.println(fileName);
}
System.out.println("---------------------------------");
//listStatus 可以列出文件和文件夹的信息,但是不提供自带的递归遍历
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for(FileStatus status: listStatus){
String name = status.getPath().getName();
System.out.println(name + (status.isDirectory()?" is dir":" is file"));
}
}
/**
* 创建文件夹
* @throws Exception
* @throws IllegalArgumentException
*/
@Test
public void mkdir() throws IllegalArgumentException, Exception {
fs.mkdirs(new Path("/aaa/bbb/ccc"));
}
/**
* 删除文件或文件夹
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void rm() throws IllegalArgumentException, IOException {
fs.delete(new Path("/aa"), true);
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://weekend110:9000/");
FileSystem fs = FileSystem.get(conf);
FSDataInputStream is = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));
FileOutputStream os = new FileOutputStream("c:/jdk7.tgz");
IOUtils.copy(is, os);
}
}

RPC远程过程调用

  • 普通远程通信,tcp或soap

  • hadoop自己封装的一个远程调用协议就是RPC,效率特别高

java实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//接口跟实现类
public interface LoginServiceInterface {
//必须有跟客户端的一样版本号
public static final long versionID=1L;
public String login(String username,String password);
}
public class LoginServiceImpl implements LoginServiceInterface {
@Override
public String login(String username, String password) {
return username + " logged in successfully!";
}
}
//服务端
public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
Builder builder = new RPC.Builder(new Configuration());
builder.setBindAddress("weekend110").setPort(10000).setProtocol(LoginServiceInterface.class).setInstance(new LoginServiceImpl());
Server server = builder.build();
server.start();
}
DJLCNV
//客户端
public class LoginController {
public static void main(String[] args) throws Exception {
//1L是个长整型的版本号
LoginServiceInterface proxy = RPC.getProxy(LoginServiceInterface.class, 1L, new InetSocketAddress("weekend110", 10000), new Configuration());
String result = proxy.login("mijie", "123456");
System.out.println(result);
}
}
张冲 wechat
欢迎扫一扫上面的微信关注我,一起交流!
坚持原创技术分享,您的支持将鼓励我继续创,点击打赏!