1、使用URL读取数据
import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.io.IoUtils; public class readURL { static{ //使URL可以解析hdfs,这个方法在一个jvm中只能调用一次 URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } public static void main(String[] args) { InputStream in = null; try { in = new URL("hdfs://localhost:9000/user/readme.txt").openStream(); IoUtils.copyBytes(in, System.out, 4096,false);//第三个参数为复制缓冲区大小,第四个参数复制后是否关闭数据流 } catch (MalformedURLException e) { e.printstacktrace(); } catch (IOException e) { e.printstacktrace(); } finally { IoUtils.closeStream(in); } } }
上面程序中的static语句块如果不写就会出现以下错误:
这是因为URL默认并不能解析hdfs,必须通过setURLStreamHandlerFactory设置为FsUrlStreamHandlerFactory来使URL能够解析hdfs
2、使用FileSystem API读取数据
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; // fixed: Hadoop's class is IOUtils, not IoUtils

/**
 * Reads an HDFS file and copies it to stdout using the FileSystem API.
 */
public class readFS {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Obtain the FileSystem instance for the hdfs:// scheme/authority of the URI.
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/user/readme.txt"), conf);
        InputStream in = null;
        try {
            in = fs.open(new Path("hdfs://localhost:9000/user/readme.txt"));
            // 4096 = copy-buffer size; false = leave the streams open after copying
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
import java.io.InputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IoUtils; public class readFS { public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/user/readme.txt"), conf); FSDataInputStream in =null;//FSDataInputStream实现了Seekable接口允许在文件中定位 try{ in = fs.open(new Path("hdfs://localhost:9000/user/readme.txt")); IoUtils.copyBytes(in, System.out, 4096,false); in.seek(6); IoUtils.copyBytes(in, System.out, 4096,false); }finally{ IoUtils.closeStream(in); } } }
3、使用FileSystem API写入数据
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; // fixed: Hadoop's class is IOUtils, not IoUtils

/**
 * Copies a local file (/home/hello.txt) into HDFS using the FileSystem API.
 */
public class writeFS {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/user/hello.txt"), conf);
        // create() overwrites the destination if it already exists.
        OutputStream out = fs.create(new Path("hdfs://localhost:9000/user/hello.txt"));
        InputStream in = new FileInputStream("/home/hello.txt");
        // 4096 = copy-buffer size; true = close both streams when the copy finishes
        IOUtils.copyBytes(in, out, 4096, true);
    }
}
运行结果如下:
也可以使用FileSystem的append方法在一个已经存在的文件上追加内容(hadoop2.x版本才开始支持)
使用前要在conf目录下hdfs-site.xml文件中加入
<property> <name>dfs.support.append</name> <value>true</value> </property>
或者在程序中配置configuration
conf.set("dfs.support.append","true");