Hadoop: Javadoc and cleanup in CephInputStream.

This commit is contained in:
Greg Farnum 2009-08-17 17:15:01 -07:00
parent 7dca3af80c
commit fea91de212
2 changed files with 73 additions and 57 deletions

View File

@ -161,7 +161,7 @@ public class CephFileSystem extends FileSystem {
/**
* Get an FSDataOutputStream to append onto a file.
* @param file The File you want to append onto
* @param bufferSize The size of the buffer to use in flushing writes
* @param bufferSize Ceph does internal buffering; this is ignored.
* @param progress The Progressable to report progress to.
* Reporting is limited but exists.
* @return An FSDataOutputStream that connects to the file on Ceph.
@ -444,7 +444,7 @@ public class CephFileSystem extends FileSystem {
* @param path The file to create.
* @param permission The permissions to apply to the file.
* @param overwrite If true, overwrite any existing file with this name.
* @param bufferSize The size of the write buffer in the returned OutputStream.
* @param bufferSize Ceph does internal buffering; this is ignored.
* @param replication Ignored by Ceph. This can be configured via Ceph configuration.
* @param blockSize Ignored by Ceph.
* @param progress A Progressable to report back to. Reporting is limited but exists.
@ -513,7 +513,7 @@ public class CephFileSystem extends FileSystem {
/**
* Open a Ceph file and attach the file handle to an FSDataInputStream.
* @param path The file to open
* @param bufferSize the size of the read buffer in the returned FSDataInputStream.
* @param bufferSize Ceph does internal buffering; this is ignored.
* @return FSDataInputStream reading from the given path.
* @throws IOException if initialize() hasn't been called, the path DNE or is a
* directory, or there is an error getting data to set up the FSDataInputStream.

View File

@ -14,6 +14,12 @@ import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSInputStream;
/**
* <p>
* An {@link FSInputStream} for a CephFileSystem and corresponding
* Ceph instance.
*/
class CephInputStream extends FSInputStream {
private int bufferSize;
@ -39,19 +45,13 @@ class CephInputStream extends FSInputStream {
private native long ceph_getpos(int fh);
private native int ceph_close(int fh);
/*
public S3InputStream(Configuration conf, FileSystemStore store,
INode inode) {
this.store = store;
this.blocks = inode.getBlocks();
for (Block block : blocks) {
this.fileLength += block.getLength();
}
this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
}
*/
/**
* Create a new CephInputStream.
* @param conf The system configuration. Unused.
* @param fh The filehandle provided by Ceph to reference.
* @param flength The current length of the file. If the length changes
* you will need to close and re-open it to access the new data.
*/
public CephInputStream(Configuration conf, int fh, long flength) {
System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
@ -59,14 +59,11 @@ class CephInputStream extends FSInputStream {
// call and providing the file handle.
fileLength = flength;
fileHandle = fh;
//System.out.println("CephInputStream constructor: initializing stream with fh "
// + fh + " and file length " + flength);
debug("CephInputStream constructor: initializing stream with fh "
+ fh + " and file length " + flength);
// TODO: Then what do we need from the config? The buffer size maybe?
// Anything? Bueller?
}
//Ceph requires that things be closed before it can shutdown,
//Ceph likes things to be closed before it shuts down,
//so closing the IOStream stuff voluntarily is good
public void finalize () throws Throwable {
try {
@ -80,13 +77,13 @@ class CephInputStream extends FSInputStream {
}
@Override
public synchronized int available() throws IOException {
public synchronized int available() throws IOException {
return (int) (fileLength - getPos());
}
public synchronized void seek(long targetPos) throws IOException {
//System.out.println("CephInputStream.seek: Seeking to position " + targetPos +
// " on fd " + fileHandle);
debug("CephInputStream.seek: Seeking to position " + targetPos +
" on fd " + fileHandle);
if (targetPos > fileLength) {
throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
" on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
@ -94,19 +91,25 @@ class CephInputStream extends FSInputStream {
ceph_seek_from_start(fileHandle, targetPos);
}
//failovers are handled by the Ceph code at a very low level;
//if there are issues that can be solved by changing sources
//they'll be dealt with before anybody even tries to call this method!
/**
* Failovers are handled by the Ceph code at a very low level;
* if there are issues that can be solved by changing sources
* they'll be dealt with before anybody even tries to call this method!
* @return false.
*/
public synchronized boolean seekToNewSource(long targetPos) {
return false;
}
// reads a byte
/**
* Read a byte from the file.
* @return the next byte.
*/
@Override
public synchronized int read() throws IOException {
//System.out.println("CephInputStream.read: Reading a single byte from fd " + fileHandle
// + " by calling general read function");
public synchronized int read() throws IOException {
debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
+ " by calling general read function");
byte result[] = new byte[1];
if (getPos() >= fileLength) return -1;
@ -114,10 +117,16 @@ class CephInputStream extends FSInputStream {
return result[0];
}
/**
* Read a specified number of bytes into a byte[] from the file.
* @param buf[] the byte array to read into.
* @param off the offset to start at in the file
* @param len the number of bytes to read
* @return 0 if successful, otherwise an error code.
*/
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
//System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
public synchronized int read(byte buf[], int off, int len) throws IOException {
debug("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
if (closed) {
throw new IOException("CephInputStream.read: cannot read " + len +
@ -126,40 +135,38 @@ class CephInputStream extends FSInputStream {
if (null == buf) {
throw new NullPointerException("Read buffer is null");
}
// check for proper index bounds
if((off < 0) || (len < 0) || (off + len > buf.length)) {
throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: "
+ "read length is " + len + ", buffer offset is "
+ off +", and buffer size is " + buf.length);
+ "read length is " + len + ", buffer offset is "
+ off +", and buffer size is " + buf.length);
}
// ensure we're not past the end of the file
if (getPos() >= fileLength)
{
System.out.println("CephInputStream.read: cannot read " + len +
debug("CephInputStream.read: cannot read " + len +
" bytes from fd " + fileHandle + ": current position is " +
getPos() + " and file length is " + fileLength);
return -1;
}
// actually do the read
int result = ceph_read(fileHandle, buf, off, len);
if (result < 0)
System.out.println("CephInputStream.read: Reading " + len + " bytes from fd "
+ fileHandle + " failed.");
else {}
// System.out.println("CephInputStream.read: Reading " + len + " bytes from fd "
// + fileHandle + ": succeeded in reading " + result + " bytes");
debug("CephInputStream.read: Reading " + len
+ " bytes from fd " + fileHandle + " failed.");
debug("CephInputStream.read: Reading " + len + " bytes from fd "
+ fileHandle + ": succeeded in reading " + result + " bytes");
return result;
}
}
/**
* Close the CephInputStream and release the associated filehandle.
*/
@Override
public void close() throws IOException {
public void close() throws IOException {
debug("CephOutputStream.close:enter");
if (closed) {
throw new IOException("Stream closed");
@ -174,20 +181,29 @@ class CephInputStream extends FSInputStream {
}
/**
* We don't support marks.
* Marks are not supported.
* @return false
*/
@Override
public boolean markSupported() {
public boolean markSupported() {
return false;
}
/**
* Since marking isn't supported, this function throws an IOException.
* @throws IOException whenever called.
*/
@Override
public void mark(int readLimit) {
// Do nothing
public void mark(int readLimit) {
throw new IOException("Mark not supported");
}
/**
* Since marks aren't supported, this function throws an IOException.
* @throws IOException whenever called.
*/
@Override
public void reset() throws IOException {
public void reset() throws IOException {
throw new IOException("Mark not supported");
}