七、HDFS上传和下载原理(有源码解析)
[TOC]
创新互联建站是一家专注于网站制作、做网站与策划设计,浔阳网站建设哪家好?创新互联建站做网站,专注于网站建设十多年,网设计领域的专业建站公司;建站业务涵盖:浔阳等地区。浔阳做网站价格咨询:028-86922220
一、HDFS文件上传基本原理
1、基本流程
1)客户端通过本地通过RPC与namenode建立rpc通信,然后请求上传文件
2)namenode收到请求后,会检查是否能创建该文件(比如校验用户是否有该权限,文件是否已经存在等)。如果检查通过,namenode就会开始记录该新文件的元信息(先写入到edits文件,然后更新内存中的metadata),并响应client可以开始上传。
3)client 在本地将文件进行切块(按照指定的block大小)。然后请求namemode上传第一个block。
4)namenode根据策略以及每个datanode的情况,返回3个datanode地址给client(这里默认3副本)。
5)client与请求namenode返回的3个datanode建立pipeline,即 client请求dn1,dn1请求dn2,dn2请求dn3,这样一个串行通道。
6)3个datanode逐级响应,最终响应给client。表示可以传输数据
7)client会将每个block还会分割成一个个packet,然后放入 data queue中,等待上传。每传输一个packet,就会将packet加入到另外一个 ack queue中,等到pipeline中的datanode响应传输完成后,就会讲相应的packet从ack queue中移除。
8)后面就是重复上面的流程,直到client关闭通道,并将所有的queue中的packet刷写到pipeline中之后,datanode就会标记文件已完成。
注意:client完成写入之后,此时block 才是可见的,正在写的block是不可见的。当调用sync方法时(将缓冲区数据刷写到磁盘中),client才确认写入已经完成。client关闭流时调用 的close方法,底层就会调用sync。是否需要手动调用取决你根据程序需 要在数据健壮性和吞吐率之间的权衡。
2、datanode发生错误的解决方式
问题:传输过程中,某个datanode发生错误,hdfs是怎么解决?
1)pipeline关闭掉
2)为了防止丢包,ack queue中的packet会同步到data queue中。重新进行下一次传输。
3)把产生错误的datanode上当前在写,但未完成的block删除掉
4)剩下的block写到剩余两个正常的datanode中。
5)namenode会自动寻找另外合适的一个datanode复制另外两个datanode中刷写的block,完成3副本的写入。当然,这个操作namenode的内部机制,对client来说是无感知的。
3、元数据存储
namenode使用两种文件保存元数据,fsimag和edits文件。
fsimage:元数据镜像文件,存储某一时间段内的namenode的内存元数据信息
edits:操作日志文件。
fstime:保存最近一次checkpoint的时间。
更详细的 fsimage和edits文件讲解,请看 “hdfs体系架构”
4、元数据的合并
namenode所有的元数据信息从启动时就已经全部加载到内存中(为了提高查询性能),用于处理读请求的查询操作。到有写操作时,namenode会先向edits文件中写入操作日志,完成后才会修改内存中的metadata,这个主要是保证元数据已经存储到磁盘中不丢失。
hdfs内部维护的fsimage文件其实就是内存中的metadata的镜像,但是两者并不是实时一致的。fsimage的更新是通过合并edits来实现的。而这个合并操作是 secondaryNameNode完成的,主要流程如下:
1)首先是 SNN通知 NN切换edits文件,主要是保证合并过程有新的写入操作时能够正常写入edits文件。
2)SNN通过http请求从NN获取 fsimage和edits文件。
3)SNN将fsiamge载入内存,开始合并edits到fsimage,生成新的fsimage
4)SNN将新的fsimage发送给NN
5)NN用新的fsimage,替换旧的fsimage。
4、写入时的网络拓扑选择
写入操作时,默认3副本,那么副本分布在哪些datanode节点上,会影响写入速度。在hdfs的网络拓扑中,有那么四种物理范围,同一节点、同一机架上的不同节点、同一机房中不同节点、不同机房中的不同节点。这4中物理范围表示节点间的距离逐渐增大。这种物理距离越远会影响副本之间所在节点之间的传输效率,即传输效率越低。
5、机架感知
上面说到副本的选择的节点的位置会影响写效率,那么hdfs是如何选择节点位置的。
(1)旧版本的方式
路径是 r1/n1 --> r2/n1 --> r2/n2
(2)新版本方式
路径是 r1/n1 --> r1/n2 --> r2/n2(后面这个其实任意都行,主要处于不同机架就好)
这种方式比第一种要好,因为这种方式数据经过的总路径更短了,只要一个副本需要跨机架传输,而上面的则有两个副本需要跨机架传输。
二、HDFS上传文件源码分析
下面的分析过程基于 hadoop2.8.4 的源码分析的。
1、client初始化源码分析
一般来说,会先通过 FileSystem.get() 获取到操作hdfs 的客户端对象,后面所有的操作都通过调用该对象的方法完成的。
FileSystem client = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf);
接着我们看看 FileSystem.get() 的实现
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) {
return get(conf);
} else {
if (scheme != null && authority == null) {
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
return get(defaultUri, conf);
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
/*
这里是关键代码,表示进入 CACHE.get() 方法
*/
return conf.getBoolean(disableCacheName, false) ? createFileSystem(uri, conf) : CACHE.get(uri, conf);
}
}
CACHE是FileSystem的一个静态内部类Cache 的对象。继续看看 CACHE.get()方法
FileSystem get(URI uri, Configuration conf) throws IOException {
FileSystem.Cache.Key key = new FileSystem.Cache.Key(uri, conf);
//进入CACHE对象的 getInternal() 方法
return this.getInternal(uri, conf, key);
}
进入CACHE对象的 getInternal() 方法
private FileSystem getInternal(URI uri, Configuration conf, FileSystem.Cache.Key key) throws IOException {
FileSystem fs;
synchronized(this) {
/*
获取map中的filesytem对象,表示之前已经初始化了filesystem对象,并存储到map集合中,现在直接从map中获取就好。
*/
fs = (FileSystem)this.map.get(key);
}
if (fs != null) {
//如果fs存在,就直接返回存在的filesytem实例即可
return fs;
} else {
//如果是初次使用filesystem,就得创建并初始化
fs = FileSystem.createFileSystem(uri, conf);
synchronized(this) {
FileSystem oldfs = (FileSystem)this.map.get(key);
if (oldfs != null) {
fs.close();
return oldfs;
} else {
if (this.map.isEmpty() && !ShutdownHookManager.get().isShutdownInProgress()) {
ShutdownHookManager.get().addShutdownHook(this.clientFinalizer, 10);
}
fs.key = key;
this.map.put(key, fs);
if (conf.getBoolean("fs.automatic.close", true)) {
this.toAutoClose.add(key);
}
return fs;
}
}
}
}
我们看到了上面有两种方式,一种是如果filesytem对象已存在,那么直接从map获取并返回对象即可。如果不存在,就调用 FileSystem.createFileSystem() 方法创建,创建完成后返回fs。下面看看这个方法.
private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
Tracer tracer = FsTracer.get(conf);
TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
scope.addKVAnnotation("scheme", uri.getScheme());
FileSystem var6;
try {
Class> clazz = getFileSystemClass(uri.getScheme(), conf);
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
//这是关键性的代码,看名字就知道,对filesytem 进行初始化
fs.initialize(uri, conf);
var6 = fs;
} finally {
scope.close();
}
return var6;
}
我们要注意,FileSystem这个类是抽象类,它的实现子类是 DistributedFileSystem,所以虽然 fs是FileSystem类型的,但是对象本身是DistributedFileSystem类型的,也就是java 的多态特性。所以fs.initialize() 调用的实际上是 DistributedFileSystem中initialize()方法。下面看看这个方法
/*
DistributedFileSystem.class
*/
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
this.setConf(conf);
String host = uri.getHost();
if (host == null) {
throw new IOException("Incomplete HDFS URI, no host: " + uri);
} else {
this.homeDirPrefix = conf.get("dfs.user.home.dir.prefix", "/user");
//这是关键性代码,创建了一个DFSClient对象,顾名思义就是RPC的客户端
this.dfs = new DFSClient(uri, conf, this.statistics);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.workingDir = this.getHomeDirectory();
this.storageStatistics = (DFSOpsCountStatistics)GlobalStorageStatistics.INSTANCE.put("DFSOpsCountStatistics", new StorageStatisticsProvider() {
public StorageStatistics provide() {
return new DFSOpsCountStatistics();
}
});
}
}
看到上面创建了一个 DFSClient() 对象,赋值给了 this.dfs。下面看看这个类的构造方法。
/*
DFSClient.class
*/
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, Statistics stats) throws IOException {
.............................
/*源码比较长,所以截取重要的部分显示*/
//这是一个关键性变量,其实就是namenode代理对象,只不过还没有创建对象
ProxyAndInfo proxyInfo = null;
...............................
//下面开始创建namenode代理对象
if (proxyInfo != null) {
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = (ClientProtocol)proxyInfo.getProxy();
} else if (rpcNamenode != null) {
Preconditions.checkArgument(nameNodeUri == null);
this.namenode = rpcNamenode;
this.dtService = null;
} else {
Preconditions.checkArgument(nameNodeUri != null, "null URI");
//这里创建代理对象信息
proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf, nameNodeUri, nnFallbackToSimpleAuth);
this.dtService = proxyInfo.getDelegationTokenService();
//这里可以看到直接通过 proxyInfo.getProxy()获取namenode代理对象,并将引用赋值给 this.namenode。并且类型是 ClientProtocol 类型的。
this.namenode = (ClientProtocol)proxyInfo.getProxy();
}
/*下面省略一堆代码*/
}
可以看到上面已经通过 this.namenode = (ClientProtocol)proxyInfo.getProxy(); 获取到了 namenode 的代理对象,也就是rpc的客户端对象。下面看看 ClientProtocol 这个是啥东西,因为代理对象是这个类型的。
/*
ClientProtocol.class
这是个接口
*/
public interface ClientProtocol {
long versionID = 69L;
/*
下面主要是定义很多个抽象方法,主要就是用于对hdfs进行操作的接口,比如,open,create等这些常用方法。
*/
}
下面看看 proxyInfo创建代理对象的方法
/*
NameNodeProxiesClient
*/
public static NameNodeProxiesClient.ProxyAndInfo createProxyWithClientProtocol(Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth) throws IOException {
AbstractNNFailoverProxyProvider failoverProxyProvider = createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, true, fallbackToSimpleAuth);
if (failoverProxyProvider == null) {
//创建无HA的代理对象
InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
Text dtService = SecurityUtil.buildTokenService(nnAddr);
//创建proxy对象
ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf, UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
//ProxyAndInfo是一个静态内部类,将前面的proxy通过该类封装后返回,可通过该类的 getProxy 方法返回已创建的proxy对象
return new NameNodeProxiesClient.ProxyAndInfo(proxy, dtService, nnAddr);
} else {
//创建有HA的代理对象
return createHAProxy(conf, nameNodeUri, ClientProtocol.class, failoverProxyProvider);
}
}
可以看到上面是已经创建了 proxy对象,并返回,而且我们也可以看到,创建的proxy对象就是clientProtocol类型的。下面看看创建proxy对象的方法 createNonHAProxyWithClientProtocol()
/*
NameNodeProxiesClient
*/
public static ClientProtocol createNonHAProxyWithClientProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf, "dfs.client.retry.policy.enabled", false, "dfs.client.retry.policy.spec", "10000,6,60000,10", SafeModeException.class.getName());
long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
//这里是核心代码,可以明显看到调用 RPC 模块中的方法创建proxy对象
ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB)RPC.getProtocolProxy(ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf), defaultPolicy, fallbackToSimpleAuth).getProxy();
if (withRetries) {
Map methodNameToPolicyMap = new HashMap();
ClientProtocol translatorProxy = new ClientNamenodeProtocolTranslatorPB(proxy);
return (ClientProtocol)RetryProxy.create(ClientProtocol.class, new DefaultFailoverProxyProvider(ClientProtocol.class, translatorProxy), methodNameToPolicyMap, defaultPolicy);
} else {
return new ClientNamenodeProtocolTranslatorPB(proxy);
}
}
所以至此我们可以发现,客户端和namenode之间通信的方式就是通过RPC实现的。
总结来说,方法的调用时序图如下:
2、上传源码分析
一般来说,上传操作,首先得
OutputStream os = fs.create(new Path("xxxx"));
即创建文件,然后再上传文件数据。上传数据的流程和普通的流操作没什么区别。
下面看看这个 create方法。
/*
FileSystem.class
*/
public abstract FSDataOutputStream create(Path var1, FsPermission var2, boolean var3, int var4, short var5, long var6, Progressable var8) throws IOException;
可以看到这是个抽象方法,前面也说到,它的实现子类是 DistributedFileSystem,这里这里是调用子类的 create方法,继续看
/*
DistributedFileSystem.class
*/
public FSDataOutputStream create(Path f, final FsPermission permission, final EnumSet cflags, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt) throws IOException {
this.statistics.incrementWriteOps(1);
this.storageStatistics.incrementOpCounter(OpType.CREATE);
Path absF = this.fixRelativePart(f);
return (FSDataOutputStream)(new FileSystemLinkResolver() {
public FSDataOutputStream doCall(Path p) throws IOException {
//这里是核心代码,this.dfs前面说到了就是存储了DFSClient对象的引用的。可以通过该客户端调用很多操作hdfs的方法。这里调用create方法,创建了一个 DFSOutputStream 对象。输出流对象
DFSOutputStream dfsos = DistributedFileSystem.this.dfs.create(DistributedFileSystem.this.getPathName(p), permission, cflags, replication, blockSize, progress, bufferSize, checksumOpt);
//这里将上面创建的dfsos进行包装并返回
return DistributedFileSystem.this.dfs.createWrappedOutputStream(dfsos, DistributedFileSystem.this.statistics);
}
public FSDataOutputStream next(FileSystem fs, Path p) throws IOException {
return fs.create(p, permission, cflags, bufferSize, replication, blockSize, progress, checksumOpt);
}
}).resolve(this, absF);
}
可以看见上面创建返回了 DFSOutputStream 输出流对象。下面看看DFSClient.create方法的实现代码。
/*
DFSClient.class
*/
public DFSOutputStream create(String src, FsPermission permission, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes) throws IOException {
this.checkOpen();
FsPermission masked = this.applyUMask(permission);
LOG.debug("{}: masked={}", src, masked);
//创建输出流对象
DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, this.dfsClientConf.createChecksum(checksumOpt), this.getFavoredNodesStr(favoredNodes));
this.beginFileLease(result.getFileId(), result);
return result;
}
继续看 DFSOutputStream.newStreamForCreate 这个方法.
/*
DistributedFileSystem.class
*/
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException {
TraceScope ignored = dfsClient.newPathTraceScope("newStreamForCreate", src);
Throwable var12 = null;
try {
HdfsFileStatus stat = null;
boolean shouldRetry = true;
int retryCount = 10;
while(true) {
if (shouldRetry) {
shouldRetry = false;
try {
//这里是核心代码,可以看见是调用 dfsclient.namenode这个代理对象中的create方法创建文件,并返回状态
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, new EnumSetWritable(flag), createParent, replication, blockSize, SUPPORTED_CRYPTO_VERSIONS);
} catch (RemoteException var27) {
IOException e = var27.unwrapRemoteException(new Class[]{AccessControlException.class, DSQuotaExceededException.class, QuotaByStorageTypeExceededException.class, FileAlreadyExistsException.class, FileNotFoundException.class, ParentNotDirectoryException.class, NSQuotaExceededException.class, RetryStartFileException.class, SafeModeException.class, UnresolvedPathException.class, SnapshotAccessControlException.class, UnknownCryptoProtocolVersionException.class});
if (e instanceof RetryStartFileException) {
if (retryCount <= 0) {
throw new IOException("Too many retries because of encryption zone operations", e);
}
shouldRetry = true;
--retryCount;
continue;
}
throw e;
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
//这里将上面创建文件的状态传入输出流作为参数
DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
//看见一个神奇的方法
out.start();
DFSOutputStream var30 = out;
//返回输出流
return var30;
}
} catch (Throwable var28) {
var12 = var28;
throw var28;
} finally {
if (ignored != null) {
if (var12 != null) {
try {
ignored.close();
} catch (Throwable var26) {
var12.addSuppressed(var26);
}
} else {
ignored.close();
}
}
}
}
上面看到 DFSOutputStream 对象居然有一个 start方法,来看看先。
/*
DFSOutputStream.class
*/
protected synchronized void start() {
this.getStreamer().start();
}
// 继续看 this.getStreamer() 这个方法,可以看到这个方法返回的是DataStreamer,继续看这个类
protected DataStreamer getStreamer() {
return this.streamer;
}
/*
DataStreamer.class
*/
//可以看到这个类继承了 Daemon类,而Daemon本身是继承了 Thread类
class DataStreamer extends Daemon { }
由此可得知,DFSOutputStream 这个类本身并没有继承 Thread类,但是使用DataStreamer这个继承了 Thread类的来新建线程传输数据,不占用当前线程。而在 DataStreamer 这个类中,重写了 Thread标志性的 run 方法。传输数据就是在这里完成的。前面说到的 hdfs的 pipeline 也是这个run方法中实现的,里面是一个while死循环,知道传输完数据为止,或者客户端关闭。代码过长,就不看了。反正看到这里已经成功获取了 client的输出流对象,后面就是传统的输入流和输出流的对接了,这里不细讲了。
方法时序图如下:
1、FileSystem初始化,Client拿到NameNodeRpcServer代理对象,建立与NameNode的RPC通信
2、调用FileSystem的create()方法,由于实现类为DistributedFileSystem,所有是调用该类中的create()方法
3、DistributedFileSystem持有DFSClient的引用,继续调用DFSClient中的create()方法
4、DFSOutputStream提供的静态newStreamForCreate()方法中调用NameNodeRpcServer服务端的create()方法并创建DFSOutputStream输出流对象返回
5、通过hadoop提供的IOUtil工具类将输出流输出到本地
三、HDFS下载基本原理
1、基本流程
1)客户端向namenode请求下载文件,namenode在内存的metadata查找对应的文件的元数据,如果无则返回无,有则返回对应文件的block位置信息。而且,namenode会根据客户端所在的位置,根据datanode以及client之间的距离大小,将返回的 block 的副本的datanode节点从距离小到大排序,距离最近的datanode则排在第一位。
2)client通过机架感知策略,选择最近的datanode进行block请求读取
3)datanode开始传输数据给client,以packet为单位,并做校验
4)客户端接收packet之后,本地缓存,然后再往本地路径写入该block。
5)第二块,第三块block重复以上过程
注意:
如果在读数据的时候, DFSInputStream和datanode的通讯发生异常,就会尝试正在读的block的排序第二近的datanode,并且会记录哪个 datanode发生错误,剩余的blocks读的时候就会直接跳过该datanode。 DFSInputStream也会检查block数据校验和,如果发现一个坏的block,就会先报告到namenode节点,然后 DFSInputStream在其他的datanode上读该block的镜像。
四、HDFS下载源码分析
client的初始化代码是一样的,这里不重复分析了。直接看下载
首先通过 open方法获取目标文件的输入流对象。
FSDataInputStream fis = client.open(getPath);
下面看看这个open方法
/*
FileSystem.class
*/
public FSDataInputStream open(Path f) throws IOException {
return this.open(f, this.getConf().getInt("io.file.buffer.size", 4096));
}
public abstract FSDataInputStream open(Path var1, int var2) throws IOException;
可以看到,依旧是抽象方法,所以依旧是调用 DistributedFileSystem的open方法。
/*
DistributedFileSystem.class
*/
public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
this.statistics.incrementReadOps(1);
this.storageStatistics.incrementOpCounter(OpType.OPEN);
Path absF = this.fixRelativePart(f);
return (FSDataInputStream)(new FileSystemLinkResolver() {
public FSDataInputStream doCall(Path p) throws IOException {
//核心代码,这里调用dfsclient的open方法穿件输入流
DFSInputStream dfsis = DistributedFileSystem.this.dfs.open(DistributedFileSystem.this.getPathName(p), bufferSize, DistributedFileSystem.this.verifyChecksum);
return DistributedFileSystem.this.dfs.createWrappedInputStream(dfsis);
}
public FSDataInputStream next(FileSystem fs, Path p) throws IOException {
return fs.open(p, bufferSize);
}
}).resolve(this, absF);
}
熟悉的套路,依旧调用 dfsclient的open方法,创建输入流,下面看看这个open方法
/*
DFSClient.class
*/
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) throws IOException {
this.checkOpen();
TraceScope ignored = this.newPathTraceScope("newDFSInputStream", src);
Throwable var5 = null;
DFSInputStream var6;
try {
//这里直接创建一个输入流对象,如果按照上面上传文件的套路来说,应该是 dfsclient.namenode.open(xxx)才对的,这里并没有
var6 = new DFSInputStream(this, src, verifyChecksum, (LocatedBlocks)null);
} catch (Throwable var15) {
var5 = var15;
throw var15;
} finally {
if (ignored != null) {
if (var5 != null) {
try {
ignored.close();
} catch (Throwable var14) {
var5.addSuppressed(var14);
}
} else {
ignored.close();
}
}
}
return var6;
}
上面并没有调用DFSClient.open,而是将DFSClient作为参数传入DFSInputStream。下面看看 DFSInputStream 这个神奇的类。
/*
DFSInputStream.class
*/
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, LocatedBlocks locatedBlocks) throws IOException {
//将dfsclinet保存到当前类中
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.src = src;
synchronized(this.infoLock) {
this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
}
this.locatedBlocks = locatedBlocks;
//核心方法,开始获取block信息,如有多少个block,以及每个block所在的datanode节点名
this.openInfo(false);
}
下面看看 openInfo() 方法
/*
DFSInputStream.class
*/
void openInfo(boolean refreshLocatedBlocks) throws IOException {
DfsClientConf conf = this.dfsClient.getConf();
synchronized(this.infoLock) {
//获取block的位置信息以及最后一个block的长度(因为最后一个block肯定不是完整的128M的长度)
this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
int retriesForLastBlockLength;
for(retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength(); retriesForLastBlockLength > 0 && this.lastBlockBeingWrittenLength == -1L; --retriesForLastBlockLength) {
DFSClient.LOG.warn("Last block locations not available. Datanodes might not have reported blocks completely. Will retry for " + retriesForLastBlockLength + " times");
this.waitFor(conf.getRetryIntervalForGetLastBlockLength());
this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength(true);
}
if (this.lastBlockBeingWrittenLength == -1L && retriesForLastBlockLength == 0) {
throw new IOException("Could not obtain the last block locations.");
}
}
}
下面看看 fetchLocatedBlocksAndGetLastBlockLength 这个获取block信息的方法
/*
DFSInputStream.class
*/
private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh) throws IOException {
LocatedBlocks newInfo = this.locatedBlocks;
if (this.locatedBlocks == null || refresh) {
//可以看到这里是调用 dfsclient中的方法俩获取block信息
newInfo = this.dfsClient.getLocatedBlocks(this.src, 0L);
}
DFSClient.LOG.debug("newInfo = {}", newInfo);
if (newInfo == null) {
throw new IOException("Cannot open filename " + this.src);
} else {
if (this.locatedBlocks != null) {
Iterator oldIter = this.locatedBlocks.getLocatedBlocks().iterator();
Iterator newIter = newInfo.getLocatedBlocks().iterator();
while(oldIter.hasNext() && newIter.hasNext()) {
if (!((LocatedBlock)oldIter.next()).getBlock().equals(((LocatedBlock)newIter.next()).getBlock())) {
throw new IOException("Blocklist for " + this.src + " has changed!");
}
}
}
this.locatedBlocks = newInfo;
long lastBlockBeingWrittenLength = 0L;
if (!this.locatedBlocks.isLastBlockComplete()) {
LocatedBlock last = this.locatedBlocks.getLastLocatedBlock();
if (last != null) {
if (last.getLocations().length == 0) {
if (last.getBlockSize() == 0L) {
return 0L;
}
return -1L;
}
long len = this.readBlockLength(last);
last.getBlock().setNumBytes(len);
lastBlockBeingWrittenLength = len;
}
}
this.fileEncryptionInfo = this.locatedBlocks.getFileEncryptionInfo();
return lastBlockBeingWrittenLength;
}
}
看到上面又回到调用 dfsClient.getLocatedBlocks,看看这个方法
/*
DFSClient.class
*/
public LocatedBlocks getLocatedBlocks(String src, long start) throws IOException {
return this.getLocatedBlocks(src, start, this.dfsClientConf.getPrefetchSize());
}
//继续调用下面这个方法
public LocatedBlocks getLocatedBlocks(String src, long start, long length) throws IOException {
TraceScope ignored = this.newPathTraceScope("getBlockLocations", src);
Throwable var7 = null;
LocatedBlocks var8;
try {
//调用这个静态方法获取 block位置信息
var8 = callGetBlockLocations(this.namenode, src, start, length);
} catch (Throwable var17) {
var7 = var17;
throw var17;
} finally {
if (ignored != null) {
if (var7 != null) {
try {
ignored.close();
} catch (Throwable var16) {
var7.addSuppressed(var16);
}
} else {
ignored.close();
}
}
}
return var8;
}
//继续看
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, String src, long start, long length) throws IOException {
try {
//熟悉的味道,通过 namenode 的代理对象获取block信息
return namenode.getBlockLocations(src, start, length);
} catch (RemoteException var7) {
throw var7.unwrapRemoteException(new Class[]{AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class});
}
}
上面可以看到,仍旧是通过 namenode代理对象发起操作,下面看看 namenode.getBlockLocations。因为代理对象的类型是 ClientProtocol类型的,是个接口,所以得到实现子类中查看 ,ClientNamenodeProtocolTranslatorPB这个类。
/*
ClientNamenodeProtocolTranslatorPB.class
*/
public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException {
GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto.newBuilder().setSrc(src).setOffset(offset).setLength(length).build();
try {
//熟悉的味道,调用 rcpProxy 向namenode server 发起操作。
GetBlockLocationsResponseProto resp = this.rpcProxy.getBlockLocations((RpcController)null, req);
return resp.hasLocations() ? PBHelperClient.convert(resp.getLocations()) : null;
} catch (ServiceException var8) {
throw ProtobufHelper.getRemoteException(var8);
}
}
看到这里,下面就是RPC底层的操作了。
方法时序图如下:
1、FileSystem初始化,Client拿到NameNodeRpcServer代理对象,建立与NameNode的RPC通信(与前面一样)
2、调用FileSystem的open()方法,由于实现类为DistributedFileSystem,所有是调用该类中的open()方法
3、DistributedFileSystem持有DFSClient的引用,继续调用DFSClient中的open()方法
4、实例化DFSInputStream输入流
5、调用openinfo()方法
6、调用fetchLocatedBlocksAndGetLastBlockLength()方法,抓取block信息并获取最后block长度
7、调用DFSClient中的getLocatedBlocks()方法,获取block信息
8、在callGetBlockLocations()方法中通过NameNode代理对象调用NameNodeRpcServer的getBlockLocations()方法
9、将block信息写入输出流,在8中会将 block 位置信息保存到DFSInputStream输入流对象中的成员变量中
10、交给IOUtil,下载文件到本地
网站题目:七、HDFS上传和下载原理(有源码解析)
本文来源:http://azwzsj.com/article/psdggo.html