《Hadoop权威指南:大数据的存储与分析》—3.5.2 通过FileSystem API读取数据
3.5.2 通过FileSystem API读取数据
正如前一小节所解释的,有时根本不可能在应用中设置URLStreamHandlerFactory实例。在这种情况下,我们需要用FileSystem API来打开一个文件的输入流。
Hadoop文件系统中通过Hadoop Path对象(而非java.io.File对象,因为它的语义与本地文件系统联系太紧密)来代表文件。可以将路径视为一个Hadoop文件系统URI,如hdfs://localhost/user/tom/quangle.txt。
FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例,这里是HDFS。获取FileSystem实例有下面这几个静态工厂方法:
public static FileSystem get(Configuration conf) throws IOException
Public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String user)
throws IOException
Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如etc/hadoop/core-site.xml)。第一个方法返回的是默认文件系统(在core-site.xml中指定的,如果没有指定,则使用默认的本地文件系统)。第二个方法通过给定的URI方案和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则返回默认文件系统。第三,作为给定用户来访问文件系统,对安全来说是至关重要。详情可以参见10.4节。
在某些情况下,你可能希望获取本地文件系统的运行实例,此时你可以使用的getLocal()方法很方便地获取。
public static LocalFileSystem getLocal(Configuration conf) throws IOException
有了FileSystem实例之后,我们调用open()函数来获取文件的输入流:
Public FSDataInputStream open(Path f) throws IOException
Public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException
第一个方法使用默认的缓冲区大小4 KB。
最后,我们重写范例3-1,得到范例3-2。
范例3-2. 直接使用FileSystem以标准输出格式显示Hadoop文件系统中的文件
public class FileSystemCat {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
InputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}
}
程序运行结果如下:
% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
FSDataInputStream对象
实际上,FileSystem对象中的open()方法返回的是FSDataInputStream对象,而不是标准的java.io类对象。这个类是继承了java.io.DataInputStream的一个特殊类,并支持随机访问,由此可以从流的任意位置读取数据。
package org.apache.hadoop.fs;
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable {
// implementation elided
}
Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量(getPos())的查询方法:
public interface Seekable {
void seek(long pos) throws IOException;
long getPos() throws IOException;
}
调用seek()来定位大于文件长度的位置会引发IOException异常。与java.io.InputStream的skip()不同,seek()可以移到文件中任意一个绝对位置,skip()则只能相对于当前位置定位到另一个新位置。
范例3-3是对范例3-2的简单扩展,它将一个文件写入标准输出两次:在一次写完之后,定位到文件的起始位置再次以流方式读取该文件并输出。
范例3-3. 使用seek()方法,将Hadoop文件系统中的一个文件在标准输出上显示两次
public class FileSystemDoubleCat {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
FSDataInputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
in.seek(0); // go back to the start of the file
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}
}
在一个小文件上运行的结果如下:
% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
FSDataInputStream类也实现了PositionedReadable接口,从一个指定偏移量处读取文件的一部分:
public interface PositionedReadable {
public int read(long position, byte[] buffer, int offset, int length)
throws IOException;
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException;
public void readFully(long position, byte[] buffer) throws IOException;
}
read()方法从文件的指定position处读取至多为length字节的数据并存入缓冲区buffer的指定偏离量offset处。返回值是实际读到的字节数:调用者需要检查这个值,它有可能小于指定的length长度。readFully()方法将指定length长度的字节数数据读取到buffer中(或在只接受buffer字节数组的版本中,读取buffer.length长度字节数据),除非已经读到文件末尾,这种情况下将抛出EOFException异常。
所有这些方法会保留文件当前偏移量,并且是线程安全的(FSDataInputStrean并不是为并发访问设计的,因此最好为此新建多个实例),因此它们提供了在读取文件的主体时,访问文件其他部分(可能是元数据)的便利方法。
最后务必牢记,seek()方法是一个相对高开销的操作,需要慎重使用。建议用流数据来构建应用的访问模式(比如使用MapReduce),而非执行大量seek()方法。
- 点赞
- 收藏
- 关注作者
评论(0)