HDFS Source Code Walkthrough: The Read Path
Part 1: Opening the File
1. DistributedFileSystem.open
This returns an FSDataInputStream:
public FSDataInputStream open(Path f) throws IOException {
  return open(f, getConf().getInt("io.file.buffer.size", 4096));
}
2. The call is forwarded to DFSClient.open, which returns a DFSInputStream:
@Override
public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
  statistics.incrementReadOps(1);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolverWithStatistics(proxyParameter) {
    @Override
    public FSDataInputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
      final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
      return dfs.createWrappedInputStream(dfsis);
    }
    @Override
    public FSDataInputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.open(p, bufferSize);
    }
  }.resolve(this, absF, OpType.OPEN, OpType.OPEN_SPAN,
      OpType.OPEN_EXC, OpType.OPEN_EXC_SPAN);
}
/**
 * Create an input stream that obtains a nodelist from the
 * namenode, and then reads from all the right places. Creates
 * inner subclass of InputStream that does the right out-of-band
 * work.
 */
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
    throws IOException, UnresolvedLinkException {
  checkOpen();
  // Get block info from namenode
  TraceScope scope = newPathTraceScope("newDFSInputStream", src);
  try {
    return new DFSInputStream(this, src, verifyChecksum);
  } finally {
    if (scope != null) scope.close();
  }
}
3. Constructing the DFSInputStream
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum) throws IOException, UnresolvedLinkException {
...
openInfo();
...
}
The key call here is openInfo(): it fetches the file's block locations from the namenode and stores them in the DFSInputStream's locatedBlocks field.
Here is openInfo():
/**
 * Grab the open-file info from namenode
 */
void openInfo() throws IOException, UnresolvedLinkException {
  synchronized(infoLock) {
    lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
    int retriesForLastBlockLength =
        dfsClient.getConf().retryTimesForGetLastBlockLength;
    while (retriesForLastBlockLength > 0) {
      // Getting last block length as -1 is a special case. When cluster
      // restarts, DNs may not report immediately. At this time partial block
      // locations will not be available with NN for getting the length. Lets
      // retry for 3 times to get the length.
      if (lastBlockBeingWrittenLength == -1) {
        DFSClient.LOG.warn("Last block locations not available. "
            + "Datanodes might not have reported blocks completely."
            + " Will retry for " + retriesForLastBlockLength + " times");
        waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
        lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      } else {
        break;
      }
      retriesForLastBlockLength--;
    }
    if (retriesForLastBlockLength == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
  }
}
This calls fetchLocatedBlocksAndGetLastBlockLength() to obtain the locations of the file's blocks. Its crucial step is the call to DFSClient.getLocatedBlocks, which returns a LocatedBlocks object wrapping the per-block information, one LocatedBlock per block. Looking at the LocatedBlock class: ExtendedBlock is the block itself, offset is the block's offset within the file, and locs describes where all replicas of the block live.
public class LocatedBlock {
  private final ExtendedBlock b;
  private long offset;  // offset of the first byte of the block in the file
  private final DatanodeInfoWithStorage[] locs;
  ...
}
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
  final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
  if (DFSClient.LOG.isDebugEnabled()) {
    DFSClient.LOG.debug("newInfo = " + newInfo);
  }
  if (newInfo == null) {
    throw new IOException("Cannot open filename " + src);
  }
  if (locatedBlocks != null) {
    Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
    Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
    while (oldIter.hasNext() && newIter.hasNext()) {
      if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
        throw new IOException("Blocklist for " + src + " has changed!");
      }
    }
  }
  locatedBlocks = newInfo;
  long lastBlockBeingWrittenLength = 0;
  if (!locatedBlocks.isLastBlockComplete()) {
    final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
    if (last != null) {
      if (last.getLocations().length == 0) {
        if (last.getBlockSize() == 0) {
          // if the length is zero, then no data has been written to
          // datanode. So no need to wait for the locations.
          return 0;
        }
        return -1;
      }
      final long len = readBlockLength(last);
      last.getBlock().setNumBytes(len);
      lastBlockBeingWrittenLength = len;
    }
  }
  fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
  return lastBlockBeingWrittenLength;
}
With that, the DFSInputStream is fully constructed.
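The retry logic in openInfo() boils down to a simple pattern: -1 is a sentinel meaning "locations for the last, in-progress block have not been reported yet", so the client re-fetches a bounded number of times before giving up. Here is a minimal, self-contained sketch of that pattern; Fetcher and fetchWithRetry are hypothetical names, not HDFS APIs:

```java
// Hypothetical sketch of the openInfo() retry pattern. A Fetcher stands in
// for fetchLocatedBlocksAndGetLastBlockLength(): it returns -1 while the
// last block's locations are still unknown.
public class LastBlockLengthRetry {
    interface Fetcher { long fetch(); }

    static long fetchWithRetry(Fetcher f, int retries) {
        long len = f.fetch();
        while (retries > 0 && len == -1) {
            // the real client sleeps retryIntervalForGetLastBlockLength here
            retries--;
            len = f.fetch();
        }
        if (len == -1) {
            throw new IllegalStateException(
                "Could not obtain the last block locations.");
        }
        return len;
    }
}
```

This mirrors the structure, not the exact bookkeeping, of the real loop: datanodes that have not yet re-registered after a cluster restart simply have nothing to report, so waiting and re-asking the namenode is the only option.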
Part 2: Reading the File
DFSInputStream implements InputStream.read().
DFSInputStream#read:
@Override
public synchronized int read(final byte buf[], int off, int len)
    throws IOException {
  validatePositionedReadArgs(pos, buf, off, len);
  if (len == 0) {
    return 0;
  }
  ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
  TraceScope scope =
      dfsClient.newReaderTraceScope("DFSInputStream#byteArrayRead", src,
          getPos(), len);
  try {
    final int retLen = readWithStrategy(byteArrayReader, off, len);
    if (retLen < len) {
      dfsClient.addRetLenToReaderScope(scope, retLen);
    }
    return retLen;
  } finally {
    scope.close();
  }
}
readWithStrategy proceeds in the following steps.
1. blockSeekTo
Finds the datanode holding the block that contains the target offset and constructs a BlockReader to read the block from it:
private synchronized DNAddrPair blockSeekTo(long target) throws IOException {
  if (target >= getFileLength()) {
    throw new IOException("Attempted to read past end of file");
  }
  ...
  while (true) {
    //
    // Compute desired block
    //
    // find the block containing the target offset
    LocatedBlock targetBlock = getBlockAt(target, true);
    ...
    // choose a datanode
    DNAddrPair retval = chooseDataNode(targetBlock);
    ...
    blockReader = new BlockReaderFactory(dfsClient.getConf())
        .setInetSocketAddress(targetAddr)
        .setRemotePeerFactory(dfsClient)
        .setDatanodeInfo(chosenNode)
        .setStorageType(storageType)
        .setFileName(src)
        .setBlock(blk)
        .setBlockToken(accessToken)
        .setStartOffset(offsetIntoBlock)
        .setVerifyChecksum(verifyChecksum)
        .setClientName(dfsClient.clientName)
        .setLength(blk.getNumBytes() - offsetIntoBlock)
        .setCachingStrategy(curCachingStrategy)
        .setAllowShortCircuitLocalReads(!shortCircuitForbidden)
        .setClientCacheContext(dfsClient.getClientContext())
        .setUserGroupInformation(dfsClient.ugi)
        .setConfiguration(dfsClient.getConfiguration())
        .setTracer(dfsClient.getTracer())
        .build();
    ...
}
1. getBlockAt
Looks up the LocatedBlock covering the target offset. As noted earlier, LocatedBlock carries the field private final DatanodeInfoWithStorage[] locs;, an array describing where all replicas of the block live, sorted by network distance from the client.
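The offset-to-block mapping that getBlockAt performs can be sketched with a simplified model. SimpleLocatedBlock and BlockLookup below are hypothetical stand-ins for LocatedBlock and DFSInputStream, assuming only that each block knows its offset and length within the file:

```java
import java.util.List;

// Hypothetical, simplified stand-in for LocatedBlock: just the pieces needed
// to map a file offset to the block that contains it.
class SimpleLocatedBlock {
    final long offset;   // offset of the block's first byte within the file
    final long length;   // number of bytes stored in this block
    final String[] locs; // datanodes holding a replica, nearest first

    SimpleLocatedBlock(long offset, long length, String... locs) {
        this.offset = offset;
        this.length = length;
        this.locs = locs;
    }
}

public class BlockLookup {
    // Analogue of the getBlockAt idea: return the block whose
    // [offset, offset + length) range covers the target file offset.
    static SimpleLocatedBlock getBlockAt(List<SimpleLocatedBlock> blocks,
                                         long target) {
        for (SimpleLocatedBlock b : blocks) {
            if (target >= b.offset && target < b.offset + b.length) {
                return b;
            }
        }
        throw new IllegalArgumentException("Attempted to read past end of file");
    }
}
```

With the default 128 MB block size, an offset of 200 MB lands in the second block; the real implementation also caches the current block and binary-searches the list rather than scanning it.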
2. chooseDataNode
Picks a suitable datanode. This part is simple: walk the replica array and return the first datanode that is not on the client's dead-node (blacklist) list.
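Since the namenode already sorted the replica array nearest-first, "first healthy replica wins" is all the client needs. A minimal sketch of that selection, with datanodes modeled as plain strings (ChooseDataNode is a hypothetical name, not an HDFS class):

```java
import java.util.Set;

// Hypothetical sketch of the chooseDataNode idea: scan the replica list in
// order and return the first datanode not on the client's dead-node list.
public class ChooseDataNode {
    static String chooseDataNode(String[] locs, Set<String> deadNodes) {
        for (String dn : locs) {
            if (!deadNodes.contains(dn)) {
                return dn; // nearest healthy replica wins
            }
        }
        // all replicas blacklisted; the real client refetches block
        // locations from the namenode and retries instead of returning null
        return null;
    }
}
```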
3. BlockReaderFactory.build
Omitted here.
2. readBuffer
Reads the actual data (to be continued).
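Putting the two steps together, the overall shape of the readWithStrategy loop can be sketched with an in-memory stand-in. SimpleBlockStream below is hypothetical: it models the file as a list of byte arrays, whereas real HDFS fetches each block over the network (or via short-circuit local reads) through the BlockReader built above:

```java
import java.util.List;

// Hypothetical, heavily simplified model of the readWithStrategy loop:
// "blockSeekTo" finds the block holding the current position, "readBuffer"
// copies bytes out of it, and the loop repeats until the caller's request
// is satisfied or the file ends.
public class SimpleBlockStream {
    private final List<byte[]> blocks; // file content, one byte[] per block
    private long pos = 0;              // current position within the file

    public SimpleBlockStream(List<byte[]> blocks) {
        this.blocks = blocks;
    }

    private long fileLength() {
        long len = 0;
        for (byte[] b : blocks) {
            len += b.length;
        }
        return len;
    }

    public int read(byte[] buf, int off, int len) {
        if (pos >= fileLength()) {
            return -1; // EOF
        }
        int done = 0;
        while (done < len && pos < fileLength()) {
            // "blockSeekTo": locate the block containing pos
            long blockStart = 0;
            byte[] cur = null;
            for (byte[] b : blocks) {
                if (pos < blockStart + b.length) {
                    cur = b;
                    break;
                }
                blockStart += b.length;
            }
            // "readBuffer": copy what the current block can supply
            int inBlock = (int) (pos - blockStart);
            int n = Math.min(len - done, cur.length - inBlock);
            System.arraycopy(cur, inBlock, buf, off + done, n);
            pos += n;
            done += n;
        }
        return done;
    }
}
```

Note how a single read() call transparently crosses a block boundary: when the current block is exhausted mid-request, the loop simply seeks to the next block and keeps copying.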
