Merge remote branch 'nwatkins/for-master'

This commit is contained in:
Greg Farnum 2011-11-03 14:43:50 -07:00
commit 0df3f0365a
7 changed files with 193 additions and 423 deletions

View File

@ -22,33 +22,14 @@
*/
package org.apache.hadoop.fs.ceph;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
abstract class CephFS {
protected static final int FATAL = 0;
protected static final int ERROR = 1;
protected static final int WARN = 2;
protected static final int INFO = 3;
protected static final int DEBUG = 4;
protected static final int TRACE = 5;
protected static final int NOLOG = 6;
protected static final int ENOTDIR = 20;
protected static final int EEXIST = 17;
protected static final int ENOENT = 2;
private boolean debug = false;
private Log LOG;
public CephFS(Configuration conf, Log log) {
debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
LOG = log;
}
/*
* Performs any necessary setup to allow general use of the filesystem.
* Inputs:
@ -266,41 +247,4 @@ abstract class CephFS {
* Returns: the new position (as a long) of the filehandle on success,
* or a negative error code on failure. */
abstract protected long ceph_seek_from_start(int fh, long pos);
protected void debug(String statement, int priority) {
if (debug) {
System.err.println(statement);
}
switch (priority) {
case FATAL:
LOG.fatal(statement);
break;
case ERROR:
LOG.error(statement);
break;
case WARN:
LOG.warn(statement);
break;
case INFO:
LOG.info(statement);
break;
case DEBUG:
LOG.debug(statement);
break;
case TRACE:
LOG.trace(statement);
break;
case NOLOG:
break;
default:
break;
}
}
}

View File

@ -29,6 +29,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@ -40,7 +41,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
class CephFaker extends CephFS {
private static final Log LOG = LogFactory.getLog(CephFaker.class);
FileSystem localFS;
String localPrefix;
int blockSize;
@ -51,7 +52,6 @@ class CephFaker extends CephFS {
boolean initialized = false;
public CephFaker(Configuration con, Log log) {
super(con, log);
conf = con;
files = new Hashtable<Integer, Object>();
filenames = new Hashtable<Integer, String>();
@ -254,7 +254,7 @@ class CephFaker extends CephFS {
stream = localFS.open(new Path(path));
files.put(new Integer(fileCount), stream);
filenames.put(new Integer(fileCount), path);
debug("ceph_open_for_read fh:" + fileCount + ", pathname:" + path, INFO);
LOG.info("ceph_open_for_read fh:" + fileCount + ", pathname:" + path);
return fileCount++;
} catch (IOException e) {}
return -1; // failure
@ -268,15 +268,14 @@ class CephFaker extends CephFS {
stream = localFS.create(new Path(path));
files.put(new Integer(fileCount), stream);
filenames.put(new Integer(fileCount), path);
debug("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path,
INFO);
LOG.info("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path);
return fileCount++;
} catch (IOException e) {}
return -1; // failure
}
protected int ceph_close(int filehandle) {
debug("ceph_close(filehandle " + filehandle + ")", INFO);
LOG.info("ceph_close(filehandle " + filehandle + ")");
try {
((Closeable) files.get(new Integer(filehandle))).close();
if (null == files.get(new Integer(filehandle))) {
@ -285,10 +284,10 @@ class CephFaker extends CephFS {
}
return 0; // hurray, success
} catch (NullPointerException ne) {
debug("ceph_close caught NullPointerException!" + ne, WARN);
LOG.warn("ceph_close caught NullPointerException!" + ne);
} // err, how?
catch (IOException ie) {
debug("ceph_close caught IOException!" + ie, WARN);
LOG.warn("ceph_close caught IOException!" + ie);
}
return -1; // failure
}
@ -391,24 +390,23 @@ class CephFaker extends CephFS {
protected int ceph_write(int fh, byte[] buffer,
int buffer_offset, int length) {
debug(
LOG.info(
"ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset + ", length:"
+ length,
INFO);
+ length);
long ret = -1; // generic fail
try {
FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
debug("ceph_write got outputstream", INFO);
LOG.info("ceph_write got outputstream");
long startPos = os.getPos();
os.write(buffer, buffer_offset, length);
ret = os.getPos() - startPos;
} catch (IOException e) {
debug("ceph_write caught IOException!", WARN);
LOG.warn("ceph_write caught IOException!");
} catch (NullPointerException f) {
debug("ceph_write caught NullPointerException!", WARN);
LOG.warn("ceph_write caught NullPointerException!");
}
return (int) ret;
}
@ -428,24 +426,23 @@ class CephFaker extends CephFS {
}
protected long ceph_seek_from_start(int fh, long pos) {
debug("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")", INFO);
LOG.info("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")");
long ret = -1; // generic fail
try {
debug("ceph_seek_from_start filename is " + filenames.get(new Integer(fh)),
INFO);
LOG.info("ceph_seek_from_start filename is " + filenames.get(new Integer(fh)));
if (null == files.get(new Integer(fh))) {
debug("ceph_seek_from_start: is is null!", WARN);
LOG.warn("ceph_seek_from_start: is is null!");
}
FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
debug("ceph_seek_from_start retrieved is!", INFO);
LOG.info("ceph_seek_from_start retrieved is!");
is.seek(pos);
ret = is.getPos();
} catch (IOException e) {
debug("ceph_seek_from_start caught IOException!", WARN);
LOG.warn("ceph_seek_from_start caught IOException!");
} catch (NullPointerException f) {
debug("ceph_seek_from_start caught NullPointerException!", WARN);
LOG.warn("ceph_seek_from_start caught NullPointerException!");
}
return (int) ret;
}

View File

@ -28,6 +28,8 @@ import java.net.URI;
import java.util.EnumSet;
import java.lang.Math;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
@ -63,16 +65,13 @@ import org.apache.hadoop.fs.FileStatus;
* from the respective Ceph system of at least that importance.
*/
public class CephFileSystem extends FileSystem {
private static final Log LOG = LogFactory.getLog(CephFileSystem.class);
private URI uri;
private Path workingDir;
private final Path root;
private boolean initialized = false;
private CephFS ceph = null;
private boolean debug = false;
private String fs_default_name;
/**
* Create a new CephFileSystem.
*/
@ -85,11 +84,10 @@ public class CephFileSystem extends FileSystem {
* sets the given CephFS instead of defaulting to a
* CephTalker (with its assumed real Ceph instance to talk to).
*/
public CephFileSystem(CephFS ceph_fs, String default_path) {
public CephFileSystem(CephFS ceph_fs) {
super();
root = new Path("/");
ceph = ceph_fs;
fs_default_name = default_path;
}
/**
@ -97,10 +95,7 @@ public class CephFileSystem extends FileSystem {
* @return the URI.
*/
public URI getUri() {
if (!initialized) {
return null;
}
ceph.debug("getUri:exit with return " + uri, ceph.DEBUG);
LOG.debug("getUri:exit with return " + uri);
return uri;
}
@ -110,24 +105,16 @@ public class CephFileSystem extends FileSystem {
* Starts up the connection to Ceph, reads in configuraton options, etc.
* @param uri The URI for this filesystem.
* @param conf The Hadoop Configuration to retrieve properties from.
* @throws IOException if the Ceph client initialization fails
* or necessary properties are unset.
* @throws IOException if necessary properties are unset.
*/
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
if (initialized) {
return;
}
super.initialize(uri, conf);
setConf(conf);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
statistics = getStatistics(uri.getScheme(), getClass());
if (ceph == null) {
ceph = new CephTalker(conf, LOG);
}
if (null == fs_default_name) {
fs_default_name = conf.get("fs.default.name");
}
// build up the arguments for Ceph
String arguments = "CephFSInterface";
@ -147,44 +134,37 @@ public class CephFileSystem extends FileSystem {
arguments += " --client-readahead-max-periods="
+ conf.get("fs.ceph.readahead", "1");
// make sure they gave us a ceph monitor address or conf file
ceph.debug("initialize:Ceph initialization arguments: " + arguments,
ceph.INFO);
LOG.info("initialize:Ceph initialization arguments: " + arguments);
if ((conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1)
&& (arguments.indexOf("-c") == -1)) {
ceph.debug("initialize:You need to specify a Ceph monitor address.",
ceph.FATAL);
LOG.fatal("initialize:You need to specify a Ceph monitor address.");
throw new IOException(
"You must specify a Ceph monitor address or config file!");
}
// Initialize the client
if (!ceph.ceph_initializeClient(arguments,
conf.getInt("fs.ceph.blockSize", 1 << 26))) {
ceph.debug("initialize:Ceph initialization failed!", ceph.FATAL);
LOG.fatal("initialize:Ceph initialization failed!");
throw new IOException("Ceph initialization failed!");
}
initialized = true;
ceph.debug("initialize:Ceph initialized client. Setting cwd to /", ceph.INFO);
LOG.info("initialize:Ceph initialized client. Setting cwd to /");
ceph.ceph_setcwd("/");
ceph.debug("initialize:exit", ceph.DEBUG);
LOG.debug("initialize:exit");
this.workingDir = getHomeDirectory();
}
/**
* Close down the CephFileSystem. Runs the base-class close method
* and then kills the Ceph client itself.
* @throws IOException if initialize() hasn't been called.
*/
@Override
public void close() throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug("close:enter", ceph.DEBUG);
LOG.debug("close:enter");
super.close(); // this method does stuff, make sure it's run!
ceph.debug("close: Calling ceph_kill_client from Java", ceph.TRACE);
LOG.trace("close: Calling ceph_kill_client from Java");
ceph.ceph_kill_client();
ceph.debug("close:exit", ceph.DEBUG);
LOG.debug("close:exit");
}
/**
@ -194,26 +174,20 @@ public class CephFileSystem extends FileSystem {
* @param progress The Progressable to report progress to.
* Reporting is limited but exists.
* @return An FSDataOutputStream that connects to the file on Ceph.
* @throws IOException If initialize() hasn't been called or the file cannot be found or appended to.
* @throws IOException If the file cannot be found or appended to.
*/
public FSDataOutputStream append(Path file, int bufferSize,
Progressable progress) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug("append:enter with path " + file + " bufferSize " + bufferSize,
ceph.DEBUG);
LOG.debug("append:enter with path " + file + " bufferSize " + bufferSize);
Path abs_path = makeAbsolute(file);
if (progress != null) {
progress.progress();
}
ceph.debug("append: Entering ceph_open_for_append from Java", ceph.TRACE);
int fd = ceph.ceph_open_for_append(abs_path.toString());
LOG.trace("append: Entering ceph_open_for_append from Java");
int fd = ceph.ceph_open_for_append(getCephPath(abs_path));
ceph.debug("append: Returned to Java", ceph.TRACE);
LOG.trace("append: Returned to Java");
if (progress != null) {
progress.progress();
}
@ -225,7 +199,7 @@ public class CephFileSystem extends FileSystem {
CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd,
bufferSize);
ceph.debug("append:exit", ceph.DEBUG);
LOG.debug("append:exit");
return new FSDataOutputStream(cephOStream, statistics);
}
@ -234,39 +208,28 @@ public class CephFileSystem extends FileSystem {
* @return the directory Path
*/
public Path getWorkingDirectory() {
if (!initialized) {
return null;
}
ceph.debug("getWorkingDirectory:enter", ceph.DEBUG);
String cwd = ceph.ceph_getcwd();
ceph.debug("getWorkingDirectory:exit with path " + cwd, ceph.DEBUG);
return new Path(fs_default_name + ceph.ceph_getcwd());
return workingDir;
}
/**
* Set the current working directory for the given file system. All relative
* paths will be resolved relative to it. You need to have initialized the
* filesystem prior to calling this method.
* paths will be resolved relative to it.
*
* @param dir The directory to change to.
*/
@Override
public void setWorkingDirectory(Path dir) {
if (!initialized) {
return;
}
ceph.debug("setWorkingDirecty:enter with new working dir " + dir, ceph.DEBUG);
Path abs_path = makeAbsolute(dir);
workingDir = makeAbsolute(dir);
}
ceph.debug("setWorkingDirectory:calling ceph_setcwd from Java", ceph.TRACE);
if (!ceph.ceph_setcwd(abs_path.toString())) {
ceph.debug(
"setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path "
+ abs_path,
ceph.WARN);
/**
* Return only the path component from a potentially fully qualified path.
*/
private String getCephPath(Path path) {
if (!path.isAbsolute()) {
throw new IllegalArgumentException("Path must be absolute: " + path);
}
ceph.debug("setWorkingDirectory:exit", ceph.DEBUG);
return path.toUri().getPath();
}
/**
@ -274,29 +237,22 @@ public class CephFileSystem extends FileSystem {
* Overriden because it's moderately faster than the generic implementation.
* @param path The file to check existence on.
* @return true if the file exists, false otherwise.
* @throws IOException if initialize() hasn't been called.
*/
@Override
public boolean exists(Path path) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug("exists:enter with path " + path, ceph.DEBUG);
LOG.debug("exists:enter with path " + path);
boolean result;
Path abs_path = makeAbsolute(path);
if (abs_path.equals(root)) {
result = true;
} else {
ceph.debug(
"exists:Calling ceph_exists from Java on path " + abs_path.toString(),
ceph.TRACE);
result = ceph.ceph_exists(abs_path.toString());
ceph.debug("exists:Returned from ceph_exists to Java", ceph.TRACE);
LOG.trace(
"exists:Calling ceph_exists from Java on path " + abs_path.toString());
result = ceph.ceph_exists(getCephPath(abs_path));
LOG.trace("exists:Returned from ceph_exists to Java");
}
ceph.debug("exists:exit with value " + result, ceph.DEBUG);
LOG.debug("exists:exit with value " + result);
return result;
}
@ -306,32 +262,25 @@ public class CephFileSystem extends FileSystem {
* @param path The directory path to create
* @param perms The permissions to apply to the created directories.
* @return true if successful, false otherwise
* @throws IOException if initialize() hasn't been called or the path
* is a child of a file.
* @throws IOException if the path is a child of a file.
*/
@Override
public boolean mkdirs(Path path, FsPermission perms) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug("mkdirs:enter with path " + path, ceph.DEBUG);
LOG.debug("mkdirs:enter with path " + path);
Path abs_path = makeAbsolute(path);
ceph.debug("mkdirs:calling ceph_mkdirs from Java", ceph.TRACE);
int result = ceph.ceph_mkdirs(abs_path.toString(), (int) perms.toShort());
LOG.trace("mkdirs:calling ceph_mkdirs from Java");
int result = ceph.ceph_mkdirs(getCephPath(abs_path), (int) perms.toShort());
if (result != 0) {
ceph.debug(
"mkdirs: make directory " + abs_path + "Failing with result " + result,
ceph.WARN);
if (ceph.ENOTDIR == result) {
LOG.warn(
"mkdirs: make directory " + abs_path + "Failing with result " + result);
if (-ceph.ENOTDIR == result) {
throw new IOException("Parent path is not a directory");
}
return false;
} else {
ceph.debug("mkdirs:exiting succesfully", ceph.DEBUG);
LOG.debug("mkdirs:exiting succesfully");
return true;
}
}
@ -341,55 +290,20 @@ public class CephFileSystem extends FileSystem {
* generic implementation.
* @param path The path to check.
* @return true if the path is definitely a file, false otherwise.
* @throws IOException if initialize() hasn't been called.
*/
@Override
public boolean isFile(Path path) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug("isFile:enter with path " + path, ceph.DEBUG);
LOG.debug("isFile:enter with path " + path);
Path abs_path = makeAbsolute(path);
boolean result;
if (abs_path.equals(root)) {
result = false;
} else {
ceph.debug("isFile:entering ceph_isfile from Java", ceph.TRACE);
result = ceph.ceph_isfile(abs_path.toString());
LOG.trace("isFile:entering ceph_isfile from Java");
result = ceph.ceph_isfile(getCephPath(abs_path));
}
ceph.debug("isFile:exit with result " + result, ceph.DEBUG);
return result;
}
/**
* Check if a path is a directory. This is moderately faster than
* the generic implementation.
* @param path The path to check
* @return true if the path is definitely a directory, false otherwise.
* @throws IOException if initialize() hasn't been called.
*/
@Override
public boolean isDirectory(Path path) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug("isDirectory:enter with path " + path, ceph.DEBUG);
Path abs_path = makeAbsolute(path);
boolean result;
if (abs_path.equals(root)) {
result = true;
} else {
ceph.debug("calling ceph_isdirectory from Java", ceph.TRACE);
result = ceph.ceph_isdirectory(abs_path.toString());
ceph.debug("Returned from ceph_isdirectory to Java", ceph.TRACE);
}
ceph.debug("isDirectory:exit with result " + result, ceph.DEBUG);
LOG.debug("isFile:exit with result " + result);
return result;
}
@ -398,36 +312,30 @@ public class CephFileSystem extends FileSystem {
* Ceph's support for these is a bit different than HDFS'.
* @param path The path to stat.
* @return FileStatus object containing the stat information.
* @throws IOException if initialize() hasn't been called
* @throws FileNotFoundException if the path could not be resolved.
*/
public FileStatus getFileStatus(Path path) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug("getFileStatus:enter with path " + path, ceph.DEBUG);
LOG.debug("getFileStatus:enter with path " + path);
Path abs_path = makeAbsolute(path);
// sadly, Ceph doesn't really do uids/gids just yet, but
// everything else is filled
FileStatus status;
Stat lstat = new Stat();
ceph.debug("getFileStatus: calling ceph_stat from Java", ceph.TRACE);
if (ceph.ceph_stat(abs_path.toString(), lstat)) {
LOG.trace("getFileStatus: calling ceph_stat from Java");
if (ceph.ceph_stat(getCephPath(abs_path), lstat)) {
status = new FileStatus(lstat.size, lstat.is_dir,
ceph.ceph_replication(abs_path.toString()), lstat.block_size,
ceph.ceph_replication(getCephPath(abs_path)), lstat.block_size,
lstat.mod_time, lstat.access_time,
new FsPermission((short) lstat.mode), null, null,
new Path(fs_default_name + abs_path.toString()));
new FsPermission((short) lstat.mode), System.getProperty("user.name"), null,
path.makeQualified(this));
} else { // fail out
throw new FileNotFoundException(
"org.apache.hadoop.fs.ceph.CephFileSystem: File " + path
+ " does not exist or could not be accessed");
}
ceph.debug("getFileStatus:exit", ceph.DEBUG);
LOG.debug("getFileStatus:exit");
return status;
}
@ -436,15 +344,9 @@ public class CephFileSystem extends FileSystem {
* @param path The directory to get listings from.
* @return FileStatus[] containing one FileStatus for each directory listing;
* null if path does not exist.
* @throws IOException if initialize() hasn't been called.
*/
public FileStatus[] listStatus(Path path) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug("listStatus:enter with path " + path, ceph.WARN);
LOG.warn("listStatus:enter with path " + path);
Path abs_path = makeAbsolute(path);
Path[] paths = listPaths(abs_path);
@ -454,7 +356,7 @@ public class CephFileSystem extends FileSystem {
for (int i = 0; i < paths.length; ++i) {
statuses[i] = getFileStatus(paths[i]);
}
ceph.debug("listStatus:exit", ceph.DEBUG);
LOG.debug("listStatus:exit");
return statuses;
}
@ -467,19 +369,13 @@ public class CephFileSystem extends FileSystem {
@Override
public void setPermission(Path p, FsPermission permission) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug(
"setPermission:enter with path " + p + " and permissions " + permission,
ceph.DEBUG);
LOG.debug(
"setPermission:enter with path " + p + " and permissions " + permission);
Path abs_path = makeAbsolute(p);
ceph.debug("setPermission:calling ceph_setpermission from Java", ceph.TRACE);
ceph.ceph_setPermission(abs_path.toString(), permission.toShort());
ceph.debug("setPermission:exit", ceph.DEBUG);
LOG.trace("setPermission:calling ceph_setpermission from Java");
ceph.ceph_setPermission(getCephPath(abs_path), permission.toShort());
LOG.debug("setPermission:exit");
}
/**
@ -490,25 +386,19 @@ public class CephFileSystem extends FileSystem {
*/
@Override
public void setTimes(Path p, long mtime, long atime) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug(
"setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime,
ceph.DEBUG);
LOG.debug(
"setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime);
Path abs_path = makeAbsolute(p);
ceph.debug("setTimes:calling ceph_setTimes from Java", ceph.TRACE);
int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime);
LOG.trace("setTimes:calling ceph_setTimes from Java");
int r = ceph.ceph_setTimes(getCephPath(abs_path), mtime, atime);
if (r < 0) {
throw new IOException(
"Failed to set times on path " + abs_path.toString() + " Error code: "
+ r);
}
ceph.debug("setTimes:exit", ceph.DEBUG);
LOG.debug("setTimes:exit");
}
/**
@ -526,7 +416,7 @@ public class CephFileSystem extends FileSystem {
* @param progress A Progressable to report back to.
* Reporting is limited but exists.
* @return An FSDataOutputStream pointing to the created file.
* @throws IOException if initialize() hasn't been called, or the path is an
* @throws IOException if the path is an
* existing directory, or the path exists but overwrite is false, or there is a
* failure in attempting to open for append with Ceph.
*/
@ -537,12 +427,7 @@ public class CephFileSystem extends FileSystem {
short replication,
long blockSize,
Progressable progress) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug("create:enter with path " + path, ceph.DEBUG);
LOG.debug("create:enter with path " + path);
Path abs_path = makeAbsolute(path);
if (progress != null) {
@ -557,7 +442,7 @@ public class CephFileSystem extends FileSystem {
boolean exists = exists(abs_path);
if (exists) {
if (isDirectory(abs_path)) {
if (getFileStatus(abs_path).isDir()) {
throw new IOException(
"create: Cannot overwrite existing directory \"" + path.toString()
+ "\" with a file");
@ -578,7 +463,7 @@ public class CephFileSystem extends FileSystem {
Path parent = abs_path.getParent();
if (parent != null) { // if parent is root, we're done
int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort());
int r = ceph.ceph_mkdirs(getCephPath(parent), permission.toShort());
if (!(r == 0 || r == -ceph.EEXIST)) {
throw new IOException("Error creating parent directory; code: " + r);
@ -589,15 +474,14 @@ public class CephFileSystem extends FileSystem {
}
}
// Step 3: open the file
ceph.debug("calling ceph_open_for_overwrite from Java", ceph.TRACE);
int fh = ceph.ceph_open_for_overwrite(abs_path.toString(),
LOG.trace("calling ceph_open_for_overwrite from Java");
int fh = ceph.ceph_open_for_overwrite(getCephPath(abs_path),
(int) permission.toShort());
if (progress != null) {
progress.progress();
}
ceph.debug("Returned from ceph_open_for_overwrite to Java with fh " + fh,
ceph.TRACE);
LOG.trace("Returned from ceph_open_for_overwrite to Java with fh " + fh);
if (fh < 0) {
throw new IOException(
"create: Open for overwrite failed on path \"" + path.toString()
@ -608,7 +492,7 @@ public class CephFileSystem extends FileSystem {
OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh,
bufferSize);
ceph.debug("create:exit", ceph.DEBUG);
LOG.debug("create:exit");
return new FSDataOutputStream(cephOStream, statistics);
}
@ -618,19 +502,14 @@ public class CephFileSystem extends FileSystem {
* @param bufferSize Ceph does internal buffering; but you can buffer in
* the Java code too if you like.
* @return FSDataInputStream reading from the given path.
* @throws IOException if initialize() hasn't been called, the path DNE or is a
* @throws IOException if the path DNE or is a
* directory, or there is an error getting data to set up the FSDataInputStream.
*/
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug("open:enter with path " + path, ceph.DEBUG);
LOG.debug("open:enter with path " + path);
Path abs_path = makeAbsolute(path);
int fh = ceph.ceph_open_for_read(abs_path.toString());
int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
if (fh < 0) { // uh-oh, something's bad!
if (fh == -ceph.ENOENT) { // well that was a stupid open
@ -642,7 +521,7 @@ public class CephFileSystem extends FileSystem {
}
}
if (isDirectory(abs_path)) { // yes, it is possible to open Ceph directories
if (getFileStatus(abs_path).isDir()) { // yes, it is possible to open Ceph directories
// but that doesn't mean you should in Hadoop!
ceph.ceph_close(fh);
throw new IOException(
@ -650,9 +529,9 @@ public class CephFileSystem extends FileSystem {
}
Stat lstat = new Stat();
ceph.debug("open:calling ceph_stat from Java", ceph.TRACE);
ceph.ceph_stat(abs_path.toString(), lstat);
ceph.debug("open:returned to Java", ceph.TRACE);
LOG.trace("open:calling ceph_stat from Java");
ceph.ceph_stat(getCephPath(abs_path), lstat);
LOG.trace("open:returned to Java");
long size = lstat.size;
if (size < 0) {
@ -663,7 +542,7 @@ public class CephFileSystem extends FileSystem {
FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size,
bufferSize);
ceph.debug("open:exit", ceph.DEBUG);
LOG.debug("open:exit");
return new FSDataInputStream(cephIStream);
}
@ -672,35 +551,32 @@ public class CephFileSystem extends FileSystem {
* @param src The current path of the file/directory
* @param dst The new name for the path.
* @return true if the rename succeeded, false otherwise.
* @throws IOException if initialize() hasn't been called.
*/
@Override
public boolean rename(Path src, Path dst) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug("rename:enter with src:" + src + " and dest:" + dst, ceph.DEBUG);
LOG.debug("rename:enter with src:" + src + " and dest:" + dst);
Path abs_src = makeAbsolute(src);
Path abs_dst = makeAbsolute(dst);
ceph.debug("calling ceph_rename from Java", ceph.TRACE);
boolean result = ceph.ceph_rename(abs_src.toString(), abs_dst.toString());
LOG.trace("calling ceph_rename from Java");
boolean result = ceph.ceph_rename(getCephPath(abs_src), getCephPath(abs_dst));
if (!result) {
if (isDirectory(abs_dst)) { // move the srcdir into destdir
ceph.debug("ceph_rename failed but dst is a directory!", ceph.NOLOG);
boolean isDir = false;
try {
isDir = getFileStatus(abs_dst).isDir();
} catch (FileNotFoundException e) {}
if (isDir) { // move the srcdir into destdir
LOG.debug("ceph_rename failed but dst is a directory!");
Path new_dst = new Path(abs_dst, abs_src.getName());
result = rename(abs_src, new_dst);
ceph.debug(
LOG.debug(
"attempt to move " + abs_src.toString() + " to "
+ new_dst.toString() + "has result:" + result,
ceph.NOLOG);
+ new_dst.toString() + "has result:" + result);
}
}
ceph.debug("rename:exit with result: " + result, ceph.DEBUG);
LOG.debug("rename:exit with result: " + result);
return result;
}
@ -716,73 +592,59 @@ public class CephFileSystem extends FileSystem {
* @param len The amount of the file past the offset you are interested in.
* @return A BlockLocation[] where each object corresponds to a block within
* the given range.
* @throws IOException if initialize() hasn't been called.
*/
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug(
LOG.debug(
"getFileBlockLocations:enter with path " + file.getPath()
+ ", start pos " + start + ", length " + len,
ceph.DEBUG);
+ ", start pos " + start + ", length " + len);
// sanitize and get the filehandle
Path abs_path = makeAbsolute(file.getPath());
ceph.debug("getFileBlockLocations:call ceph_open_for_read from Java",
ceph.TRACE);
int fh = ceph.ceph_open_for_read(abs_path.toString());
LOG.trace("getFileBlockLocations:call ceph_open_for_read from Java");
int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
ceph.debug(
LOG.trace(
"getFileBlockLocations:return from ceph_open_for_read to Java with fh "
+ fh,
ceph.TRACE);
+ fh);
if (fh < 0) {
ceph.debug(
LOG.error(
"getFileBlockLocations:got error " + fh
+ ", exiting and returning null!",
ceph.ERROR);
+ ", exiting and returning null!");
return null;
}
// get the block size
ceph.debug("getFileBlockLocations:call ceph_getblocksize from Java",
ceph.TRACE);
long blockSize = ceph.ceph_getblocksize(abs_path.toString());
LOG.trace("getFileBlockLocations:call ceph_getblocksize from Java");
long blockSize = ceph.ceph_getblocksize(getCephPath(abs_path));
ceph.debug("getFileBlockLocations:return from ceph_getblocksize", ceph.TRACE);
LOG.trace("getFileBlockLocations:return from ceph_getblocksize");
BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)];
long offset;
for (int i = 0; i < locations.length; ++i) {
offset = start + i * blockSize;
ceph.debug(
LOG.trace(
"getFileBlockLocations:call ceph_hosts from Java on fh " + fh
+ " and offset " + offset,
ceph.TRACE);
+ " and offset " + offset);
String host = ceph.ceph_hosts(fh, offset);
ceph.debug(
LOG.trace(
"getFileBlockLocations:return from ceph_hosts to Java with host "
+ host,
ceph.TRACE);
+ host);
String[] hostArray = new String[1];
hostArray[0] = host;
locations[i] = new BlockLocation(hostArray, hostArray,
start + i * blockSize - (start % blockSize), blockSize);
}
ceph.debug("getFileBlockLocations:call ceph_close from Java on fh " + fh,
ceph.TRACE);
LOG.trace("getFileBlockLocations:call ceph_close from Java on fh " + fh);
ceph.ceph_close(fh);
ceph.debug(
"getFileBlockLocations:return with " + locations.length + " locations",
ceph.DEBUG);
LOG.debug(
"getFileBlockLocations:return with " + locations.length + " locations");
return locations;
}
@Deprecated
public boolean delete(Path path) throws IOException {
return delete(path, false);
}
@ -794,18 +656,11 @@ public class CephFileSystem extends FileSystem {
* delete will throw an IOException. If path is a file this is ignored.
* @return true if the delete succeeded, false otherwise (including if
* path doesn't exist).
* @throws IOException if initialize() hasn't been called,
* or you attempt to non-recursively delete a directory,
* @throws IOException if you attempt to non-recursively delete a directory,
* or you attempt to delete the root directory.
*/
public boolean delete(Path path, boolean recursive) throws IOException {
if (!initialized) {
throw new IOException(
"You have to initialize the "
+ "CephFileSystem before calling other methods.");
}
ceph.debug("delete:enter with path " + path + " and recursive=" + recursive,
ceph.DEBUG);
LOG.debug("delete:enter with path " + path + " and recursive=" + recursive);
Path abs_path = makeAbsolute(path);
// sanity check
@ -818,16 +673,14 @@ public class CephFileSystem extends FileSystem {
// if the path is a file, try to delete it.
if (isFile(abs_path)) {
ceph.debug("delete:calling ceph_unlink from Java with path " + abs_path,
ceph.TRACE);
boolean result = ceph.ceph_unlink(abs_path.toString());
LOG.trace("delete:calling ceph_unlink from Java with path " + abs_path);
boolean result = ceph.ceph_unlink(getCephPath(abs_path));
if (!result) {
ceph.debug(
"delete: failed to delete file \"" + abs_path.toString() + "\".",
ceph.ERROR);
LOG.error(
"delete: failed to delete file \"" + abs_path.toString() + "\".");
}
ceph.debug("delete:exit with success=" + result, ceph.DEBUG);
LOG.debug("delete:exit with success=" + result);
return result;
}
@ -837,37 +690,33 @@ public class CephFileSystem extends FileSystem {
Path[] contents = listPaths(abs_path);
if (contents == null) {
ceph.debug(
LOG.error(
"delete: Failed to read contents of directory \""
+ abs_path.toString() + "\" while trying to delete it, BAILING",
ceph.ERROR);
+ abs_path.toString() + "\" while trying to delete it, BAILING");
return false;
}
if (!recursive && contents.length > 0) {
throw new IOException("Directories must be deleted recursively!");
}
// delete the entries
ceph.debug("delete: recursively calling delete on contents of " + abs_path,
ceph.DEBUG);
LOG.debug("delete: recursively calling delete on contents of " + abs_path);
for (Path p : contents) {
if (!delete(p, true)) {
ceph.debug(
LOG.error(
"delete: Failed to delete file \"" + p.toString()
+ "\" while recursively deleting \"" + abs_path.toString()
+ "\", BAILING",
ceph.ERROR);
+ "\", BAILING");
return false;
}
}
// if we've come this far it's a now-empty directory, so delete it!
boolean result = ceph.ceph_rmdir(abs_path.toString());
boolean result = ceph.ceph_rmdir(getCephPath(abs_path));
if (!result) {
ceph.debug(
"delete: failed to delete \"" + abs_path.toString() + "\", BAILING",
ceph.ERROR);
LOG.error(
"delete: failed to delete \"" + abs_path.toString() + "\", BAILING");
}
ceph.debug("delete:exit", ceph.DEBUG);
LOG.debug("delete:exit");
return result;
}
@ -890,42 +739,29 @@ public class CephFileSystem extends FileSystem {
return getConf().getInt("fs.ceph.blockSize", 1 << 26);
}
// Makes a Path absolute. In a cheap, dirty hack, we're
// also going to strip off any fs_default_name prefix we see.
/**
* Adds the working directory to path if path is not already
* an absolute path. The URI scheme is not removed here. It
* is removed only when users (e.g. ceph native calls) need
* the path-only portion.
*/
private Path makeAbsolute(Path path) {
ceph.debug("makeAbsolute:enter with path " + path, ceph.NOLOG);
if (path == null) {
return new Path("/");
}
// first, check for the prefix
if (path.toString().startsWith(fs_default_name)) {
Path stripped_path = new Path(
path.toString().substring(fs_default_name.length()));
ceph.debug("makeAbsolute:exit with path " + stripped_path, ceph.NOLOG);
return stripped_path;
}
if (path.isAbsolute()) {
ceph.debug("makeAbsolute:exit with path " + path, ceph.NOLOG);
return path;
}
Path new_path = new Path(ceph.ceph_getcwd(), path);
ceph.debug("makeAbsolute:exit with path " + new_path, ceph.NOLOG);
return new_path;
return new Path(workingDir, path);
}
private Path[] listPaths(Path path) throws IOException {
ceph.debug("listPaths:enter with path " + path, ceph.NOLOG);
LOG.debug("listPaths:enter with path " + path);
String dirlist[];
Path abs_path = makeAbsolute(path);
// If it's a directory, get the listing. Otherwise, complain and give up.
ceph.debug("calling ceph_getdir from Java with path " + abs_path, ceph.NOLOG);
dirlist = ceph.ceph_getdir(abs_path.toString());
ceph.debug("returning from ceph_getdir to Java", ceph.NOLOG);
LOG.debug("calling ceph_getdir from Java with path " + abs_path);
dirlist = ceph.ceph_getdir(getCephPath(abs_path));
LOG.debug("returning from ceph_getdir to Java");
if (dirlist == null) {
return null;
@ -935,10 +771,9 @@ public class CephFileSystem extends FileSystem {
Path[] paths = new Path[dirlist.length];
for (int i = 0; i < dirlist.length; ++i) {
ceph.debug(
LOG.trace(
"Raw enumeration of paths in \"" + abs_path.toString() + "\": \""
+ dirlist[i] + "\"",
ceph.TRACE);
+ dirlist[i] + "\"");
// convert each listing to an absolute path
Path raw_path = new Path(dirlist[i]);
@ -948,7 +783,7 @@ public class CephFileSystem extends FileSystem {
paths[i] = new Path(abs_path, raw_path);
}
}
ceph.debug("listPaths:exit", ceph.NOLOG);
LOG.debug("listPaths:exit");
return paths;
}

View File

@ -23,6 +23,8 @@ package org.apache.hadoop.fs.ceph;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSInputStream;
@ -33,7 +35,7 @@ import org.apache.hadoop.fs.FSInputStream;
* Ceph instance.
*/
public class CephInputStream extends FSInputStream {
private static final Log LOG = LogFactory.getLog(CephInputStream.class);
private boolean closed;
private int fileHandle;
@ -63,10 +65,9 @@ public class CephInputStream extends FSInputStream {
closed = false;
ceph = cephfs;
buffer = new byte[bufferSize];
ceph.debug(
LOG.debug(
"CephInputStream constructor: initializing stream with fh " + fh
+ " and file length " + flength,
ceph.DEBUG);
+ " and file length " + flength);
}
@ -114,10 +115,9 @@ public class CephInputStream extends FSInputStream {
}
public synchronized void seek(long targetPos) throws IOException {
ceph.debug(
LOG.trace(
"CephInputStream.seek: Seeking to position " + targetPos + " on fd "
+ fileHandle,
ceph.TRACE);
+ fileHandle);
if (targetPos > fileLength) {
throw new IOException(
"CephInputStream.seek: failed seek to position " + targetPos
@ -150,10 +150,9 @@ public class CephInputStream extends FSInputStream {
*/
@Override
public synchronized int read() throws IOException {
ceph.debug(
LOG.trace(
"CephInputStream.read: Reading a single byte from fd " + fileHandle
+ " by calling general read function",
ceph.TRACE);
+ " by calling general read function");
byte result[] = new byte[1];
@ -181,9 +180,8 @@ public class CephInputStream extends FSInputStream {
@Override
public synchronized int read(byte buf[], int off, int len)
throws IOException {
ceph.debug(
"CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle,
ceph.TRACE);
LOG.trace(
"CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
if (closed) {
throw new IOException(
@ -193,11 +191,10 @@ public class CephInputStream extends FSInputStream {
// ensure we're not past the end of the file
if (getPos() >= fileLength) {
ceph.debug(
LOG.debug(
"CephInputStream.read: cannot read " + len + " bytes from fd "
+ fileHandle + ": current position is " + getPos()
+ " and file length is " + fileLength,
ceph.DEBUG);
+ " and file length is " + fileLength);
return -1;
}
@ -230,10 +227,9 @@ public class CephInputStream extends FSInputStream {
totalRead += read;
} while (len > 0 && fillBuffer());
ceph.debug(
LOG.trace(
"CephInputStream.read: Reading " + initialLen + " bytes from fd "
+ fileHandle + ": succeeded in reading " + totalRead + " bytes",
ceph.TRACE);
+ fileHandle + ": succeeded in reading " + totalRead + " bytes");
return totalRead;
}
@ -242,7 +238,7 @@ public class CephInputStream extends FSInputStream {
*/
@Override
public void close() throws IOException {
ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
LOG.trace("CephOutputStream.close:enter");
if (!closed) {
int result = ceph.ceph_close(fileHandle);
@ -252,7 +248,7 @@ public class CephInputStream extends FSInputStream {
"Close somehow failed!"
+ "Don't try and use this stream again, though");
}
ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
LOG.trace("CephOutputStream.close:exit");
}
}
}

View File

@ -25,6 +25,8 @@ package org.apache.hadoop.fs.ceph;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;
@ -35,7 +37,7 @@ import org.apache.hadoop.util.Progressable;
* Ceph instance.
*/
public class CephOutputStream extends OutputStream {
private static final Log LOG = LogFactory.getLog(CephOutputStream.class);
private boolean closed;
private CephFS ceph;
@ -87,9 +89,8 @@ public class CephOutputStream extends OutputStream {
*/
@Override
public synchronized void write(int b) throws IOException {
ceph.debug(
"CephOutputStream.write: writing a single byte to fd " + fileHandle,
ceph.TRACE);
LOG.trace(
"CephOutputStream.write: writing a single byte to fd " + fileHandle);
if (closed) {
throw new IOException(
@ -115,9 +116,8 @@ public class CephOutputStream extends OutputStream {
*/
@Override
public synchronized void write(byte buf[], int off, int len) throws IOException {
ceph.debug(
"CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle,
ceph.TRACE);
LOG.trace(
"CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle);
// make sure stream is open
if (closed) {
throw new IOException(
@ -203,7 +203,7 @@ public class CephOutputStream extends OutputStream {
*/
@Override
public synchronized void close() throws IOException {
ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
LOG.trace("CephOutputStream.close:enter");
if (!closed) {
flush();
int result = ceph.ceph_close(fileHandle);
@ -213,7 +213,7 @@ public class CephOutputStream extends OutputStream {
}
closed = true;
ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
LOG.trace("CephOutputStream.close:exit");
}
}
}

View File

@ -32,7 +32,6 @@ class CephTalker extends CephFS {
// we write a constructor so we can load the libraries
public CephTalker(Configuration conf, Log log) {
super(conf, log);
System.load(conf.get("fs.ceph.libDir") + "/libcephfs.so");
System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so");
cluster = 0;

View File

@ -37,10 +37,9 @@ public class TestCeph extends FileSystemContractBaseTest {
protected void setUp() throws IOException {
Configuration conf = new Configuration();
CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
CephFileSystem cephfs = new CephFileSystem(cephfaker, "ceph://null");
CephFileSystem cephfs = new CephFileSystem(cephfaker);
cephfs.initialize(URI.create("ceph://null"), conf);
fs = cephfs;
cephfs.setWorkingDirectory(new Path(getDefaultWorkingDirectory()));
}
}