Hadoop源码剖析06-Snappy压缩算法

什么是snappy?

snappy的前身是Zippy,Google表示该算法库对性能做了调整,针对64位x86处理器进行了优化
并在英特尔酷睿i7处理器单一核心上实现了至少每秒250MB的压缩性能和每秒500MB的解压缩性能,使用 New BSD协议开源

在这里插入图片描述

setInput()方法

setlnputO方法为压缩器提供数据,在做了一番输入数据的合法性检査后,先将finished
标志位置为false,并尝试将输入数据复制到内部缓冲区中。如果内部缓存器剩余空间不
够大,那么,压缩器将“借用”输入数据对应的缓冲区,即利用userBuf. userBufOff和
userBufLen记录输入的数据。否则,setlnput()复制数据到uncompressedDirectBuf中•

 /***设置输入数据进行压缩。*每当#needsInput()返回时,应调用此方法*  true 表示需要更多输入数据。** @param b   输入数据* @param off 开始偏移* @param len 长度*/@Overridepublic void setInput(byte[] b, int off, int len) {if (b == null) {throw new NullPointerException();}if (off < 0 || len < 0 || off > b.length - len) {throw new ArrayIndexOutOfBoundsException();}finished = false;if (len > uncompressedDirectBuf.remaining()) {// 借用外部缓冲区; 这时候 needsInput 为falsethis.userBuf = b;this.userBufOff = off;this.userBufLen = len;} else {((ByteBuffer) uncompressedDirectBuf).put(b, off, len);uncompressedDirectBufLen = uncompressedDirectBuf.position();}bytesRead += len;}

needinput()

needsInputO方法返回fMse有三种情况g输出缓冲区(即保持压缩结果的缓冲区)有未
读取的数据、输入缓冲区没有空间,以及压缩器已经借用外部缓冲区.这时,用户需要通过
compressO方法取走已经压缩的数据,直到needslnput()返回true,才可再次通过setlnput()
方法添加待压缩数据

  /***如果输入数据缓冲区为空且返回true,则返回true*应该调用#setInput()以提供更多输入。** @return true如果输入数据缓冲区为空,并且*应该调用#setInput()以便提供更多输入。*/@Overridepublic boolean needsInput() {return !(compressedDirectBuf.remaining() > 0|| uncompressedDirectBuf.remaining() == 0 || userBufLen > 0);}

compress()

compress()方法用于获取压缩后的数据,它需要处理needsInputO返回felse的几种情况。
如果压缩数据缓冲区有数据,即compressedDirectBuf中还有数据,则读取这部分数据,
并返回。
如果该缓冲区为空,则需要压缩数据。首先清理compressedDirectBuf,这个清理(即
clear()调用和limitO调用)是一个典型的Buffer操作,待压缩的数据有两个来源,输入缓冲区uncompressedDirectBuf或者“借用”的数据缓冲区.
如果输入绶冲区没有数据,那待压缩数据可能(可以在没有任何带压缩数据的情况下调
用compressO方法)在“借用”的数据缓冲区里,这时使用setInputFromSavedData()方法复
制"借用”数据缓冲区中的数据到uncompressedDirectBuf中。setInputFromSavedData()函数
调用结束后,待压缩数据缓冲区里还没有数据,则设K finished标记位,并返回0,表明压
编数据已经读完。
uncompressedDiiectBuf中的数据,利用前面已经介绍过的native方法compressBytesDirectO
进行压缩,压缩后的数据保存在compressedDirectBuf中。由于待压缩数据缓冲区和压缩
数据缓冲区的大小是一样的,所以uncompressedDirectBuf中的数据是一次被处理完的。
compressBytesDirect()调用结束后,需要再次设置缓冲区的标记,并根据情况复制数据到
compressO的参数b提供的缓冲区中。相关代码如下:

    /*** 用压缩数据填充指定的缓冲区。返回实际数字*压缩数据的字节数。返回值为0表示*应该调用needsInput()以确定是否有更多输入*数据为必填项。** @param b   压缩数据的缓冲区* @param off 数据的起始偏移量* @param len 缓冲区大小* @return 压缩数据的实际字节数。*/@Overridepublic int compress(byte[] b, int off, int len)throws IOException {if (b == null) {throw new NullPointerException();}if (off < 0 || len < 0 || off > b.length - len) {throw new ArrayIndexOutOfBoundsException();}// 检查是否有压缩数据int n = compressedDirectBuf.remaining();if (n > 0) {n = Math.min(n, len);((ByteBuffer) compressedDirectBuf).get(b, off, n);bytesWritten += n;return n;}// 重新初始化snappy的输出直接缓冲区compressedDirectBuf.clear();compressedDirectBuf.limit(0);if (0 == uncompressedDirectBuf.position()) {// 没有压缩的数据,所以我们应该有!needsInput或!finishedsetInputFromSavedData();if (0 == uncompressedDirectBuf.position()) {// 没有数据调用;什么都不写finished = true;return 0;}}//压缩资料n = compressBytesDirect();compressedDirectBuf.limit(n);uncompressedDirectBuf.clear(); // 快照消耗所有缓冲区输入// 如果snapy已经消耗了所有用户数据,则设置为“完成”if (0 == userBufLen) {finished = true;}// 获取最多“ len”个字节n = Math.min(n, len);bytesWritten += n;((ByteBuffer) compressedDirectBuf).get(b, off, n);return n;}

finshed()

最后要分析的成员函数是finished。。如图3・8所示,finished。返回true,表明压缩过程
已经结束,压缩过程结束其实包含了多个条件,包括finish标志位和finished标志位必须都
为true,以及compressedDirectBuf中没有未取走的数据。其中finish为true,表示用户确认
已经完成数据的输入过程,finished表明压缩器中没有待压缩的数据,这三个条件缺一不可.
相关代码如下:

/***如果压缩结束则返回true*已达到数据输出流。** @return  true 如果压缩结束*已达到数据输出流。*/@Overridepublic boolean finished() {// 检查是否所有未压缩的数据都已消耗return (finish && finished && compressedDirectBuf.remaining() == 0);}

序列化和压缩小结:

  • 涉及了 org.apache.hadoop.io包下最重要的两部分内容:序列化和压缩。
  • 序列化广泛应用于分布式数据处理中,是交换数据必备的能力。Hadoop没有使用Java
    内建的序列化机制,而是引入了紧凑、快速、轻便和可扩展的Writable接口。Writable接
    口通过write()和readFields()方法声明了序列化和反序列化的功能。在此基础上,分析了
    Writable的一些典型子类的实现,包括Java基本类型对应的Writable封装和Object Writable,
    它们为用户使用Hadoop提供了很多方便。
  • 压缩是org.apache.hadoop.io包中实现的另一个重要功能,Hadoop必须支持多种压缩算
    法,如何灵活地支持这些算法呢? Hadoop实现了压缩框架,包括编码/解码器及其工厂、
    压缩器/解压器以及压缩流/解压缩流三种组件,它们相互配合满足了用户对压缩功能的需
    求。最后,以Cloudera发行版中Snappy压缩功能的实现为例,介绍了在压缩框架里如何集
    成新的压缩算法并支持本地库,以提高压缩效率。


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章