Merge pull request #597 from ceph/remove-hadoop-shim

This branch built fine on the gitbuilders and the list of removed files looks good to me.

Reviewed-by: Greg Farnum <greg@inktank.com>
This commit is contained in:
Gregory Farnum 2013-09-16 13:57:47 -07:00
commit 4930ccb8a2
29 changed files with 2 additions and 6141 deletions

View File

@ -6,10 +6,6 @@ Files: doc/*
Copyright: (c) 2010-2012 New Dream Network and contributors
License: Creative Commons Attribution-ShareAlike (CC BY-SA)
Files: src/client/hadoop/ceph
Copyright: Copyright (C) New Dream Network and contributors
License: Apache License v2
Files: src/mount/canonicalize.c
Copyright: Copyright (C) 1993 Rick Sladkey <jrs@world.std.com>
License: LGPL2 or later

View File

@ -278,7 +278,6 @@ export RPM_OPT_FLAGS=`echo $RPM_OPT_FLAGS | sed -e 's/i386/i486/'`
--localstatedir=/var \
--sysconfdir=/etc \
--docdir=%{_docdir}/ceph \
--without-hadoop \
--with-nss \
--without-cryptopp \
--with-rest-bench \

View File

@ -391,25 +391,6 @@ if test "x$enable_cephfs_java" = "xyes"; then
fi
AM_CONDITIONAL(HAVE_JUNIT4, [test "$have_junit4" = "1"])
# jni?
# clear cache (from java above) -- this whole thing will get
# folded into the bigger java package later -- for now maintain
# backward compat
AS_UNSET(ac_cv_header_jni_h)
AC_ARG_WITH([hadoop],
[AS_HELP_STRING([--with-hadoop], [build hadoop client])],
[],
[with_hadoop=check])
AS_IF([test "x$with_hadoop" != xno],
[AC_CHECK_HEADER([jni.h],
[HAVE_JNI=1],
[if test "x$with_hadoop" != xcheck; then
AC_MSG_FAILURE(
[--with-hadoop was given but jni.h not found])
fi
])])
AM_CONDITIONAL(WITH_HADOOPCLIENT, [test "$HAVE_JNI" = "1"])
#
# FreeBSD has it in base.
#

4
debian/copyright vendored
View File

@ -7,10 +7,6 @@ Files: *
Copyright: (c) 2004-2010 by Sage Weil <sage@newdream.net>
License: LGPL2.1 (see /usr/share/common-licenses/LGPL-2.1)
Files: src/client/hadoop/ceph
Copyright: Copyright (C) New Dream Network and contributors
License: Apache License v2
Files: src/mount/canonicalize.c
Copyright: Copyright (C) 1993 Rick Sladkey <jrs@world.std.com>
License: LGPL2 or later

View File

@ -10,7 +10,6 @@ do_autogen.sh: make a ceph build by running autogen, etc.
level 1: -g
level 3: -Wextra
level 4: even more...
-H --with-hadoop
-T --without-tcmalloc
-e <path> dump encoded objects to <path>
-P profiling build
@ -46,8 +45,6 @@ do
h) usage
exit 0;;
H) CONFIGURE_FLAGS="$CONFIGURE_FLAGS --with-hadoop";;
T) CONFIGURE_FLAGS="$CONFIGURE_FLAGS --without-tcmalloc";;
j) CONFIGURE_FLAGS="$CONFIGURE_FLAGS --enable-cephfs-java";;

View File

@ -243,7 +243,7 @@ to their default level or to a level suitable for normal operations.
+--------------------+-----------+--------------+
| ``rgw`` | 1 | 5 |
+--------------------+-----------+--------------+
| ``hadoop`` | 1 | 5 |
| ``javaclient`` | 1 | 5 |
+--------------------+-----------+--------------+
| ``asok`` | 1 | 5 |
+--------------------+-----------+--------------+

View File

@ -96,25 +96,13 @@ bin_PROGRAMS += rbd-fuse
endif # WITH_FUSE
# libcephfs (this and libhadoopcephfs should go somewhere else in the future)
# libcephfs (this should go somewhere else in the future)
libcephfs_la_SOURCES = libcephfs.cc
libcephfs_la_LIBADD = $(LIBCLIENT) $(LIBCOMMON) $(PTHREAD_LIBS) $(CRYPTO_LIBS) $(EXTRALIBS)
libcephfs_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 -export-symbols-regex '^ceph_.*'
lib_LTLIBRARIES += libcephfs.la
# order matters! this *must* come after libcephfs, or else you'll encounter something
# like "error: relink libhadoopcephfs.la with the above command before installing it"
if WITH_HADOOPCLIENT
JAVA_BASE = /usr/lib/jvm/java-6-sun
libhadoopcephfs_la_SOURCES = client/hadoop/CephFSInterface.cc
libhadoopcephfs_la_LIBADD = $(LIBCEPHFS)
libhadoopcephfs_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 -export-symbols-regex 'hadoopcephfs_.*'
lib_LTLIBRARIES += libhadoopcephfs.la
noinst_HEADERS += client/hadoop/CephFSInterface.h
endif # WITH_HADOOPCLIENT
# jni library (java source is in src/java)
if ENABLE_CEPHFS_JAVA

View File

@ -1,993 +0,0 @@
// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
#include "CephFSInterface.h"
#include "include/cephfs/libcephfs.h"
#include "common/ceph_argparse.h"
#include "common/config.h"
#include "msg/SimpleMessenger.h"
#include <arpa/inet.h>
#include <sys/stat.h>
#include <sys/statvfs.h>
#define dout_subsys ceph_subsys_hadoop
union ceph_mount_union_t {
struct ceph_mount_info *cmount;
jlong cjlong;
};
static void set_ceph_mount_info(JNIEnv *env, jobject obj, struct ceph_mount_info *cmount)
{
jclass cls = env->GetObjectClass(obj);
if (cls == NULL)
return;
jfieldID fid = env->GetFieldID(cls, "cluster", "J");
if (fid == NULL)
return;
ceph_mount_union_t ceph_mount_union;
ceph_mount_union.cjlong = 0;
ceph_mount_union.cmount = cmount;
env->SetLongField(obj, fid, ceph_mount_union.cjlong);
}
static struct ceph_mount_info *get_ceph_mount_t(JNIEnv *env, jobject obj)
{
jclass cls = env->GetObjectClass(obj);
jfieldID fid = env->GetFieldID(cls, "cluster", "J");
if (fid == NULL)
return NULL;
ceph_mount_union_t ceph_mount_union;
ceph_mount_union.cjlong = env->GetLongField(obj, fid);
return ceph_mount_union.cmount;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_initializeClient
* Signature: (Ljava/lang/String;I)Z
*
* Performs any necessary setup to allow general use of the filesystem.
* Inputs:
* jstring args -- a command-line style input of Ceph config params
* jint block_size -- the size in bytes to use for blocks
* Returns: true on success, false otherwise
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1initializeClient
(JNIEnv *env, jobject obj, jstring j_args, jint block_size)
{
// Convert Java argument string to argv
const char *c_args = env->GetStringUTFChars(j_args, 0);
if (c_args == NULL)
return false; //out of memory!
string cppargs(c_args);
char b[cppargs.length()+1];
strcpy(b, cppargs.c_str());
env->ReleaseStringUTFChars(j_args, c_args);
std::vector<const char*> args;
char *p = b;
while (*p) {
args.push_back(p);
while (*p && *p != ' ')
p++;
if (!*p)
break;
*p++ = 0;
while (*p && *p == ' ')
p++;
}
// parse the arguments
bool set_local_writes = false;
std::string mount_root, val;
for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
if (ceph_argparse_witharg(args, i, &val, "mount_root", (char*)NULL)) {
mount_root = val;
} else if (ceph_argparse_flag(args, i, "set_local_pg", (char*)NULL)) {
set_local_writes = true;
} else {
++i;
}
}
// connect to the cmount
struct ceph_mount_info *cmount;
int ret = ceph_create(&cmount, NULL);
if (ret)
return false;
ceph_conf_read_file(cmount, NULL); // read config file from the default location
ceph_conf_parse_argv(cmount, args.size(), &args[0]);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 3) << "CephFSInterface: mounting filesystem...:" << dendl;
ret = ceph_mount(cmount, mount_root.c_str());
if (ret)
return false;
ceph_localize_reads(cmount, true);
ceph_set_default_file_stripe_unit(cmount, block_size);
ceph_set_default_object_size(cmount, block_size);
if (set_local_writes) {
ceph_set_default_preferred_pg(cmount, ceph_get_local_osd(cmount));
}
set_ceph_mount_info(env, obj, cmount);
return true;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_getcwd
* Signature: (J)Ljava/lang/String;
*
* Returns the current working directory.(absolute) as a jstring
*/
JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getcwd
(JNIEnv *env, jobject obj)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "CephFSInterface: In getcwd" << dendl;
jstring j_path = env->NewStringUTF(ceph_getcwd(cmount));
return j_path;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_setcwd
* Signature: (Ljava/lang/String;)Z
*
* Changes the working directory.
* Inputs:
* jstring j_path: The path (relative or absolute) to switch to
* Returns: true on success, false otherwise.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcwd
(JNIEnv *env, jobject obj, jstring j_path)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "CephFSInterface: In setcwd" << dendl;
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL)
return false;
int ret = ceph_chdir(cmount, c_path);
env->ReleaseStringUTFChars(j_path, c_path);
return ret ? JNI_FALSE : JNI_TRUE;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_rmdir
* Signature: (Ljava/lang/String;)Z
*
* Given a path to a directory, removes the directory.if empty.
* Inputs:
* jstring j_path: The path (relative or absolute) to the directory
* Returns: true on successful delete; false otherwise
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir
(JNIEnv *env, jobject obj, jstring j_path)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "CephFSInterface: In rmdir" << dendl;
const char *c_path = env->GetStringUTFChars(j_path, 0);
if(c_path == NULL)
return false;
int ret = ceph_rmdir(cmount, c_path);
env->ReleaseStringUTFChars(j_path, c_path);
return ret ? JNI_FALSE : JNI_TRUE;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_unlink
* Signature: (Ljava/lang/String;)Z
* Given a path, unlinks it.
* Inputs:
* jstring j_path: The path (relative or absolute) to the file or empty dir
* Returns: true if the unlink occurred, false otherwise.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlink
(JNIEnv *env, jobject obj, jstring j_path)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL)
return false;
ldout(cct, 10) << "CephFSInterface: In unlink for path " << c_path << ":" << dendl;
int ret = ceph_unlink(cmount, c_path);
env->ReleaseStringUTFChars(j_path, c_path);
return ret ? JNI_FALSE : JNI_TRUE;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_rename
* Signature: (Ljava/lang/String;Ljava/lang/String;)Z
* Changes a given path name to a new name.
* Inputs:
* jstring j_from: The path whose name you want to change.
* jstring j_to: The new name for the path.
* Returns: true if the rename occurred, false otherwise
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rename
(JNIEnv *env, jobject obj, jstring j_from, jstring j_to)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "CephFSInterface: In rename" << dendl;
const char *c_from = env->GetStringUTFChars(j_from, 0);
if (c_from == NULL)
return false;
const char *c_to = env->GetStringUTFChars(j_to, 0);
if (c_to == NULL) {
env->ReleaseStringUTFChars(j_from, c_from);
return false;
}
struct stat stbuf;
int ret = ceph_lstat(cmount, c_to, &stbuf);
if (ret != -ENOENT) {
// Hadoop doesn't want to overwrite files in a rename.
env->ReleaseStringUTFChars(j_from, c_from);
env->ReleaseStringUTFChars(j_to, c_to);
return JNI_FALSE;
}
ret = ceph_rename(cmount, c_from, c_to);
return ret ? JNI_FALSE : JNI_TRUE;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_exists
* Signature: (Ljava/lang/String;)Z
* Returns true if it the input path exists, false
* if it does not or there is an unexpected failure.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exists
(JNIEnv *env, jobject obj, jstring j_path)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "CephFSInterface: In exists" << dendl;
struct stat stbuf;
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL)
return false;
ldout(cct, 10) << "Attempting lstat with file " << c_path << ":" << dendl;
int ret = ceph_lstat(cmount, c_path, &stbuf);
ldout(cct, 10) << "result is " << ret << dendl;
env->ReleaseStringUTFChars(j_path, c_path);
if (ret < 0) {
ldout(cct, 10) << "Returning false (file does not exist)" << dendl;
return JNI_FALSE;
}
else {
ldout(cct, 10) << "Returning true (file exists)" << dendl;
return JNI_TRUE;
}
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_getblocksize
* Signature: (Ljava/lang/String;)J
* Get the block size for a given path.
* Input:
* j_string j_path: The path (relative or absolute) you want
* the block size for.
* Returns: block size (as a long) if the path exists, otherwise a negative
* number corresponding to the standard C++ error codes (which are positive).
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getblocksize
(JNIEnv *env, jobject obj, jstring j_path)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "In getblocksize" << dendl;
//struct stat stbuf;
jlong result;
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL)
return -ENOMEM;
// we need to open the file to retrieve the stripe size
ldout(cct, 10) << "CephFSInterface: getblocksize: opening file" << dendl;
int fh = ceph_open(cmount, c_path, O_RDONLY, 0);
env->ReleaseStringUTFChars(j_path, c_path);
if (fh < 0)
return fh;
result = ceph_get_file_stripe_unit(cmount, fh);
int close_result = ceph_close(cmount, fh);
if (close_result < 0)
return close_result;
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_isfile
* Signature: (Ljava/lang/String;)Z
* Returns true if the given path is a file; false otherwise.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfile
(JNIEnv *env, jobject obj, jstring j_path)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "In isfile" << dendl;
struct stat stbuf;
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL)
return false;
int ret = ceph_lstat(cmount, c_path, &stbuf);
env->ReleaseStringUTFChars(j_path, c_path);
// if the stat call failed, it's definitely not a file...
if (ret < 0)
return false;
// check the stat result
return !!S_ISREG(stbuf.st_mode);
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_isdirectory
* Signature: (Ljava/lang/String;)Z
* Returns true if the given path is a directory, false otherwise.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdirectory
(JNIEnv *env, jobject obj, jstring j_path)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "In isdirectory" << dendl;
struct stat stbuf;
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL)
return false;
int result = ceph_lstat(cmount, c_path, &stbuf);
env->ReleaseStringUTFChars(j_path, c_path);
// if the stat call failed, it's definitely not a directory...
if (result < 0)
return JNI_FALSE;
// check the stat result
return !!S_ISDIR(stbuf.st_mode);
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_getdir
* Signature: (Ljava/lang/String;)[Ljava/lang/String;
* Get the contents of a given directory.
* Inputs:
* jstring j_path: The path (relative or absolute) to the directory.
* Returns: A Java String[] of the contents of the directory, or
* NULL if there is an error (ie, path is not a dir). This listing
* will not contain . or .. entries.
*/
JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getdir
(JNIEnv *env, jobject obj, jstring j_path)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "In getdir" << dendl;
// get the directory listing
list<string> contents;
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL) return NULL;
struct ceph_dir_result *dirp;
int r;
r = ceph_opendir(cmount, c_path, &dirp);
if (r<0) {
env->ReleaseStringUTFChars(j_path, c_path);
return NULL;
}
int buflen = 100; //good default?
char *buf = new char[buflen];
string *ent;
int bufpos;
while (1) {
r = ceph_getdnames(cmount, dirp, buf, buflen);
if (r==-ERANGE) { //expand the buffer
delete [] buf;
buflen *= 2;
buf = new char[buflen];
continue;
}
if (r<=0) break;
//if we make it here, we got at least one name
bufpos = 0;
while (bufpos<r) {//make new strings and add them to listing
ent = new string(buf+bufpos);
if (ent->compare(".") && ent->compare(".."))
//we DON'T want to include dot listings; Hadoop gets confused
contents.push_back(*ent);
bufpos+=ent->size()+1;
delete ent;
}
}
delete [] buf;
ceph_closedir(cmount, dirp);
env->ReleaseStringUTFChars(j_path, c_path);
if (r < 0) return NULL;
// Create a Java String array of the size of the directory listing
jclass stringClass = env->FindClass("java/lang/String");
if (stringClass == NULL) {
ldout(cct, 0) << "ERROR: java String class not found; dying a horrible, painful death" << dendl;
assert(0);
}
jobjectArray dirListingStringArray = (jobjectArray) env->NewObjectArray(contents.size(), stringClass, NULL);
if(dirListingStringArray == NULL) return NULL;
// populate the array with the elements of the directory list
int i = 0;
for (list<string>::iterator it = contents.begin();
it != contents.end();
++it) {
env->SetObjectArrayElement(dirListingStringArray, i,
env->NewStringUTF(it->c_str()));
++i;
}
return dirListingStringArray;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_mkdirs
* Signature: (Ljava/lang/String;I)I
* Create the specified directory and any required intermediate ones with the
* given mode.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1mkdirs
(JNIEnv *env, jobject obj, jstring j_path, jint mode)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "In Hadoop mk_dirs" << dendl;
//get c-style string and make the call, clean up the string...
jint result;
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL)
return -ENOMEM;
result = ceph_mkdirs(cmount, c_path, mode);
env->ReleaseStringUTFChars(j_path, c_path);
//...and return
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_open_for_append
* Signature: (Ljava/lang/String;)I
* Open a file to append. If the file does not exist, it will be created.
* Opening a dir is possible but may have bad results.
* Inputs:
* jstring j_path: The path to open.
* Returns: a jint filehandle, or a number<0 if an error occurs.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1append
(JNIEnv *env, jobject obj, jstring j_path)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "In hadoop open_for_append" << dendl;
jint result;
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL)
return -ENOMEM;
result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_APPEND, 0);
env->ReleaseStringUTFChars(j_path, c_path);
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_open_for_read
* Signature: (Ljava/lang/String;)I
* Open a file for reading.
* Opening a dir is possible but may have bad results.
* Inputs:
* jstring j_path: The path to open.
* Returns: a jint filehandle, or a number<0 if an error occurs.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1read
(JNIEnv *env, jobject obj, jstring j_path)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "In open_for_read" << dendl;
jint result;
// open as read-only: flag = O_RDONLY
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL)
return -ENOMEM;
result = ceph_open(cmount, c_path, O_RDONLY, 0);
env->ReleaseStringUTFChars(j_path, c_path);
// returns file handle, or -1 on failure
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_open_for_overwrite
* Signature: (Ljava/lang/String;)I
* Opens a file for overwriting; creates it if necessary.
* Opening a dir is possible but may have bad results.
* Inputs:
* jstring j_path: The path to open.
* jint mode: The mode to open with.
* Returns: a jint filehandle, or a number<0 if an error occurs.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1overwrite
(JNIEnv *env, jobject obj, jstring j_path, jint mode)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "In open_for_overwrite" << dendl;
jint result;
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL)
return -ENOMEM;
result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_TRUNC, mode);
env->ReleaseStringUTFChars(j_path, c_path);
// returns file handle, or -1 on failure
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_close
* Signature: (I)I
* Closes a given filehandle.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1close
(JNIEnv *env, jobject obj, jint fh)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "In CephTalker::ceph_close" << dendl;
return ceph_close(cmount, fh);
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_setPermission
* Signature: (Ljava/lang/String;I)Z
* Change the mode on a path.
* Inputs:
* jstring j_path: The path to change mode on.
* jint j_new_mode: The mode to apply.
* Returns: true if the mode is properly applied, false if there
* is any error.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPermission
(JNIEnv *env, jobject obj, jstring j_path, jint j_new_mode)
{
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL)
return false;
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
int result = ceph_chmod(cmount, c_path, j_new_mode);
env->ReleaseStringUTFChars(j_path, c_path);
return (result==0);
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_kill_client
* Signature: (J)Z
*
* Closes the Ceph client. This should be called before shutting down
* (multiple times is okay but redundant).
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1kill_1client
(JNIEnv *env, jobject obj)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
if (!cmount)
return true;
ceph_shutdown(cmount);
set_ceph_mount_info(env, obj, NULL);
return true;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_stat
* Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/Stat;)Z
* Get the statistics on a path returned in a custom format defined
* in CephTalker.
* Inputs:
* jstring j_path: The path to stat.
* jobject j_stat: The stat object to fill.
* Returns: true if the stat is successful, false otherwise.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1stat
(JNIEnv *env, jobject obj, jstring j_path, jobject j_stat)
{
//setup variables
struct stat st;
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL) return false;
jclass cls = env->GetObjectClass(j_stat);
if (cls == NULL) return false;
jfieldID c_size_id = env->GetFieldID(cls, "size", "J");
if (c_size_id == NULL) return false;
jfieldID c_dir_id = env->GetFieldID(cls, "is_dir", "Z");
if (c_dir_id == NULL) return false;
jfieldID c_block_id = env->GetFieldID(cls, "block_size", "J");
if (c_block_id == NULL) return false;
jfieldID c_mod_id = env->GetFieldID(cls, "mod_time", "J");
if (c_mod_id == NULL) return false;
jfieldID c_access_id = env->GetFieldID(cls, "access_time", "J");
if (c_access_id == NULL) return false;
jfieldID c_mode_id = env->GetFieldID(cls, "mode", "I");
if (c_mode_id == NULL) return false;
//do actual lstat
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
int r = ceph_lstat(cmount, c_path, &st);
env->ReleaseStringUTFChars(j_path, c_path);
if (r < 0) return false; //fail out; file DNE or Ceph broke
//put variables from struct stat into Java
env->SetLongField(j_stat, c_size_id, st.st_size);
env->SetBooleanField(j_stat, c_dir_id, (0 != S_ISDIR(st.st_mode)));
env->SetLongField(j_stat, c_block_id, st.st_blksize);
long long java_mtime(st.st_mtim.tv_sec);
java_mtime *= 1000;
java_mtime += st.st_mtim.tv_nsec / 1000;
env->SetLongField(j_stat, c_mod_id, java_mtime);
long long java_atime(st.st_atim.tv_sec);
java_atime *= 1000;
java_atime += st.st_atim.tv_nsec / 1000;
env->SetLongField(j_stat, c_access_id, java_atime);
env->SetIntField(j_stat, c_mode_id, (int)st.st_mode);
//return happy
return true;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_statfs
* Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/CephStat;)I
* Statfs a filesystem in a custom format defined in CephTalker.
* Inputs:
* jstring j_path: A path on the filesystem that you wish to stat.
* jobject j_ceph_stat: The CephStat object to fill.
* Returns: true if successful and the CephStat is filled; false otherwise.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1statfs
(JNIEnv *env, jobject obj, jstring j_path, jobject j_cephstat)
{
//setup variables
struct statvfs stbuf;
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL)
return -ENOMEM;
jclass cls = env->GetObjectClass(j_cephstat);
if (cls == NULL)
return 1; //JVM error of some kind
jfieldID c_capacity_id = env->GetFieldID(cls, "capacity", "J");
jfieldID c_used_id = env->GetFieldID(cls, "used", "J");
jfieldID c_remaining_id = env->GetFieldID(cls, "remaining", "J");
//do the statfs
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
int r = ceph_statfs(cmount, c_path, &stbuf);
env->ReleaseStringUTFChars(j_path, c_path);
if (r != 0)
return r; //something broke
//place info into Java; convert from bytes to kilobytes
env->SetLongField(j_cephstat, c_capacity_id,
(long)stbuf.f_blocks*stbuf.f_bsize/1024);
env->SetLongField(j_cephstat, c_used_id,
(long)(stbuf.f_blocks-stbuf.f_bavail)*stbuf.f_bsize/1024);
env->SetLongField(j_cephstat, c_remaining_id,
(long)stbuf.f_bavail*stbuf.f_bsize/1024);
return r;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_replication
* Signature: (Ljava/lang/String;)I
* Check how many times a path should be replicated (if it is
* degraded it may not actually be replicated this often).
* Inputs:
* jstring j_path: The path to check.
* Returns: an int containing the number of times replicated.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1replication
(JNIEnv *env, jobject obj, jstring j_path)
{
//get c-string of path, send off to libceph, release c-string, return
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL)
return -ENOMEM;
ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
int fh = 0;
fh = ceph_open(cmount, c_path, O_RDONLY, 0);
env->ReleaseStringUTFChars(j_path, c_path);
if (fh < 0) {
return fh;
}
int replication = ceph_get_file_replication(cmount, fh);
ceph_close(cmount, fh);
return replication;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_hosts
* Signature: (IJ)[Ljava/lang/String;
* Find the IP:port addresses of the primary OSD for a given file and offset.
* Inputs:
* jint j_fh: The filehandle for the file.
* jlong j_offset: The offset to get the location of.
* Returns: a jstring of the location as IP, or NULL if there is an error.
*/
JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1hosts
(JNIEnv *env, jobject obj, jint j_fh, jlong j_offset)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
struct sockaddr_storage *ss;
char address[30];
jobjectArray addr_array;
jclass string_cls;
jstring j_addr;
int r, n = 3; /* initial guess at # of replicas */
for (;;) {
ss = new struct sockaddr_storage[n];
r = ceph_get_file_stripe_address(cmount, j_fh, j_offset, ss, n);
if (r < 0) {
if (r == -ERANGE) {
delete [] ss;
n *= 2;
continue;
}
return NULL;
}
n = r;
break;
}
/* TODO: cache this */
string_cls = env->FindClass("java/lang/String");
if (!string_cls)
goto out;
addr_array = env->NewObjectArray(n, string_cls, NULL);
if (!addr_array)
goto out;
for (r = 0; r < n; r++) {
/* Hadoop only deals with IPv4 */
if (ss[r].ss_family != AF_INET)
goto out;
memset(address, 0, sizeof(address));
inet_ntop(ss[r].ss_family, &((struct sockaddr_in *)&ss[r])->sin_addr,
address, sizeof(address));
j_addr = env->NewStringUTF(address);
env->SetObjectArrayElement(addr_array, r, j_addr);
if (env->ExceptionOccurred())
goto out;
env->DeleteLocalRef(j_addr);
}
delete [] ss;
return addr_array;
out:
delete [] ss;
return NULL;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_setTimes
* Signature: (Ljava/lang/String;JJ)I
* Set the mtime and atime for a given path.
* Inputs:
* jstring j_path: The path to set the times for.
* jlong mtime: The mtime to set, in millis since epoch (-1 to not set).
* jlong atime: The atime to set, in millis since epoch (-1 to not set)
* Returns: 0 if successful, an error code otherwise.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes
(JNIEnv *env, jobject obj, jstring j_path, jlong mtime, jlong atime)
{
const char *c_path = env->GetStringUTFChars(j_path, 0);
if(c_path == NULL) return -ENOMEM;
//build the mask for ceph_setattr
int mask = 0;
if (mtime!=-1) mask = CEPH_SETATTR_MTIME;
if (atime!=-1) mask |= CEPH_SETATTR_ATIME;
//build a struct stat and fill it in!
//remember to convert from millis to seconds and microseconds
struct stat attr;
attr.st_mtim.tv_sec = mtime / 1000;
attr.st_mtim.tv_nsec = (mtime % 1000) * 1000000;
attr.st_atim.tv_sec = atime / 1000;
attr.st_atim.tv_nsec = (atime % 1000) * 1000000;
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
return ceph_setattr(cmount, c_path, &attr, mask);
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_read
* Signature: (JI[BII)I
* Reads into the given byte array from the current position.
* Inputs:
* jint fh: the filehandle to read from
* jbyteArray j_buffer: the byte array to read into
* jint buffer_offset: where in the buffer to start writing
* jint length: how much to read.
* There'd better be enough space in the buffer to write all
* the data from the given offset!
* Returns: the number of bytes read on success (as jint),
* or an error code otherwise.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read
(JNIEnv *env, jobject obj, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "In read" << dendl;
// Make sure to convert the Hadoop read arguments into a
// more ceph-friendly form
jint result;
// Step 1: get a pointer to the buffer.
jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL);
if (j_buffer_ptr == NULL) return -ENOMEM;
char *c_buffer = (char*) j_buffer_ptr;
// Step 2: pointer arithmetic to start in the right buffer position
c_buffer += (int)buffer_offset;
// Step 3: do the read
result = ceph_read(cmount, (int)fh, c_buffer, length, -1);
// Step 4: release the pointer to the buffer
env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0);
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_seek_from_start
* Signature: (JIJ)J
* Seeks to the given position in the given file.
* Inputs:
* jint fh: The filehandle to seek in.
* jlong pos: The position to seek to.
* Returns: the new position (as a jlong) of the filehandle on success,
* or a negative error code on failure.
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1seek_1from_1start
(JNIEnv *env, jobject obj, jint fh, jlong pos)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "In CephTalker::seek_from_start" << dendl;
return ceph_lseek(cmount, fh, pos, SEEK_SET);
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_getpos
* Signature: (I)J
*
* Get the current position in a file (as a jlong) of a given filehandle.
* Returns: jlong current file position on success, or a
* negative error code on failure.
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getpos
(JNIEnv *env, jobject obj, jint fh)
{
// seek a distance of 0 to get current offset
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "In CephTalker::ceph_getpos" << dendl;
return ceph_lseek(cmount, fh, 0, SEEK_CUR);
}
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_write
* Signature: (I[BII)I
* Write the given buffer contents to the given filehandle.
* Inputs:
* jint fh: The filehandle to write to.
* jbyteArray j_buffer: The buffer to write from
* jint buffer_offset: The position in the buffer to write from
* jint length: The number of (sequential) bytes to write.
* Returns: jint, on success the number of bytes written, on failure
* a negative error code.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1write
(JNIEnv *env, jobject obj, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length)
{
struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
CephContext *cct = ceph_get_mount_context(cmount);
ldout(cct, 10) << "In write" << dendl;
// IMPORTANT NOTE: Hadoop write arguments are a bit different from POSIX so we
// have to convert. The write is *always* from the current position in the file,
// and buffer_offset is the location in the *buffer* where we start writing.
jint result;
// Step 1: get a pointer to the buffer.
jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL);
if (j_buffer_ptr == NULL)
return -ENOMEM;
char *c_buffer = (char*) j_buffer_ptr;
// Step 2: pointer arithmetic to start in the right buffer position
c_buffer += (int)buffer_offset;
// Step 3: do the write
result = ceph_write(cmount, (int)fh, c_buffer, length, -1);
// Step 4: release the pointer to the buffer
env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0);
return result;
}

View File

@ -1,236 +0,0 @@
// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
/* BE CAREFUL EDITING THIS FILE - it is a compilation of JNI
machine-generated headers */
#include <jni.h>
#ifndef _Included_org_apache_hadoop_fs_ceph_CephTalker
#define _Included_org_apache_hadoop_fs_ceph_CephTalker
#ifdef __cplusplus
extern "C" {
#endif
//these constants are machine-generated to match Java constants in the source
#undef org_apache_hadoop_fs_ceph_CephFileSystem_DEFAULT_BLOCK_SIZE
#define org_apache_hadoop_fs_ceph_CephFileSystem_DEFAULT_BLOCK_SIZE 8388608LL
#undef org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE
#define org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE 2048L
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_initializeClient
* Signature: (Ljava/lang/String;I)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1initializeClient
(JNIEnv *, jobject, jstring, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_getcwd
* Signature: ()Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getcwd
(JNIEnv *, jobject);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_setcwd
* Signature: (Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcwd
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_rmdir
* Signature: (Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_unlink
* Signature: (Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlink
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_rename
* Signature: (Ljava/lang/String;Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rename
(JNIEnv *, jobject, jstring, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_exists
* Signature: (Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exists
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_getblocksize
* Signature: (Ljava/lang/String;)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getblocksize
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_isdirectory
* Signature: (Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdirectory
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_isfile
* Signature: (Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfile
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_getdir
* Signature: (Ljava/lang/String;)[Ljava/lang/String;
*/
JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getdir
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_mkdirs
* Signature: (Ljava/lang/String;I)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1mkdirs
(JNIEnv *, jobject, jstring, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_open_for_append
* Signature: (Ljava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1append
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_open_for_read
* Signature: (Ljava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1read
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_open_for_overwrite
* Signature: (Ljava/lang/String;I)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1overwrite
(JNIEnv *, jobject, jstring, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_close
* Signature: (I)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1close
(JNIEnv *, jobject, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_setPermission
* Signature: (Ljava/lang/String;I)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPermission
(JNIEnv *, jobject, jstring, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_kill_client
* Signature: ()Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1kill_1client
(JNIEnv *, jobject);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_stat
* Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/Stat;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1stat
(JNIEnv *, jobject, jstring, jobject);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_statfs
* Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/CephStat;)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1statfs
(JNIEnv * env, jobject obj, jstring j_path, jobject j_cephstat);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_replication
* Signature: (Ljava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1replication
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_hosts
* Signature: (IJ)[Ljava/lang/String;
*/
JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1hosts
(JNIEnv *, jobject, jint, jlong);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_setTimes
* Signature: (Ljava/lang/String;JJ)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes
(JNIEnv *, jobject, jstring, jlong, jlong);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_read
* Signature: (I[BII)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read
(JNIEnv *, jobject, jint, jbyteArray, jint, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_seek_from_start
* Signature: (IJ)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1seek_1from_1start
(JNIEnv *, jobject, jint, jlong);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_getpos
* Signature: (I)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getpos
(JNIEnv *, jobject, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_write
* Signature: (I[BII)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1write
(JNIEnv *, jobject, jint, jbyteArray, jint, jint);
#ifdef __cplusplus
}
#endif
#endif

File diff suppressed because it is too large Load Diff

View File

@ -1,17 +0,0 @@
This directory contains:
CephFSInterface.cc/h: A C++ JNI library used by the Hadoop Java code.
ceph: A directory containing all the Java source files for a
Hadoop-compliant CephFileSystem.
HADOOP-ceph.patch: A patch for Hadoop. It should apply fine to one of the
.20 branches. (It was generated against .20.205.0) This
patch adds in all the files contained in the ceph dir as well as making
some changes so that Hadoop's configuration code will recognize the
CephFileSystem properties and classes. It is possible that this will be
out-of-date compared to the files contained in the ceph dir, so you
should apply this patch and then copy ceph/* into the appropriate Hadoop
dir.
There are also a number of javah-generated C header files which are used
in writing CephFSInterface but can be safely ignored otherwise.
Configuration instructions are included in Javadoc format in the ceph dir.

View File

@ -1,250 +0,0 @@
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
/**
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*
* Abstract base class for communicating with a Ceph filesystem and its
* C++ codebase from Java, or pretending to do so (for unit testing purposes).
* As only the Ceph package should be using this directly, all methods
* are protected.
*/
package org.apache.hadoop.fs.ceph;
import org.apache.hadoop.conf.Configuration;
abstract class CephFS {
protected static final int ENOTDIR = 20;
protected static final int EEXIST = 17;
protected static final int ENOENT = 2;
/*
* Performs any necessary setup to allow general use of the filesystem.
* Inputs:
* String argsuments -- a command-line style input of Ceph config params
* int block_size -- the size in bytes to use for blocks
* Returns: true on success, false otherwise
*/
abstract protected boolean ceph_initializeClient(String arguments, int block_size);
/*
* Returns the current working directory (absolute) as a String
*/
abstract protected String ceph_getcwd();
/*
* Changes the working directory.
* Inputs:
* String path: The path (relative or absolute) to switch to
* Returns: true on success, false otherwise.
*/
abstract protected boolean ceph_setcwd(String path);
/*
* Given a path to a directory, removes the directory if empty.
* Inputs:
* jstring j_path: The path (relative or absolute) to the directory
* Returns: true on successful delete; false otherwise
*/
abstract protected boolean ceph_rmdir(String path);
/*
* Given a path, unlinks it.
* Inputs:
* String path: The path (relative or absolute) to the file or empty dir
* Returns: true if the unlink occurred, false otherwise.
*/
abstract protected boolean ceph_unlink(String path);
/*
* Changes a given path name to a new name, assuming new_path doesn't exist.
* Inputs:
* jstring j_from: The path whose name you want to change.
* jstring j_to: The new name for the path.
* Returns: true if the rename occurred, false otherwise
*/
abstract protected boolean ceph_rename(String old_path, String new_path);
/*
* Returns true if it the input path exists, false
* if it does not or there is an unexpected failure.
*/
abstract protected boolean ceph_exists(String path);
/*
* Get the block size for a given path.
* Input:
* String path: The path (relative or absolute) you want
* the block size for.
* Returns: block size if the path exists, otherwise a negative number
* corresponding to the standard C++ error codes (which are positive).
*/
abstract protected long ceph_getblocksize(String path);
/*
* Returns true if the given path is a directory, false otherwise.
*/
abstract protected boolean ceph_isdirectory(String path);
/*
* Returns true if the given path is a file; false otherwise.
*/
abstract protected boolean ceph_isfile(String path);
/*
* Get the contents of a given directory.
* Inputs:
* String path: The path (relative or absolute) to the directory.
* Returns: A Java String[] of the contents of the directory, or
* NULL if there is an error (ie, path is not a dir). This listing
* will not contain . or .. entries.
*/
abstract protected String[] ceph_getdir(String path);
/*
* Create the specified directory and any required intermediate ones with the
* given mode.
*/
abstract protected int ceph_mkdirs(String path, int mode);
/*
* Open a file to append. If the file does not exist, it will be created.
* Opening a dir is possible but may have bad results.
* Inputs:
* String path: The path to open.
* Returns: an int filehandle, or a number<0 if an error occurs.
*/
abstract protected int ceph_open_for_append(String path);
/*
* Open a file for reading.
* Opening a dir is possible but may have bad results.
* Inputs:
* String path: The path to open.
* Returns: an int filehandle, or a number<0 if an error occurs.
*/
abstract protected int ceph_open_for_read(String path);
/*
* Opens a file for overwriting; creates it if necessary.
* Opening a dir is possible but may have bad results.
* Inputs:
* String path: The path to open.
* int mode: The mode to open with.
* Returns: an int filehandle, or a number<0 if an error occurs.
*/
abstract protected int ceph_open_for_overwrite(String path, int mode);
/*
* Closes the given file. Returns 0 on success, or a negative
* error code otherwise.
*/
abstract protected int ceph_close(int filehandle);
/*
* Change the mode on a path.
* Inputs:
* String path: The path to change mode on.
* int mode: The mode to apply.
* Returns: true if the mode is properly applied, false if there
* is any error.
*/
abstract protected boolean ceph_setPermission(String path, int mode);
/*
* Closes the Ceph client. This should be called before shutting down
* (multiple times is okay but redundant).
*/
abstract protected boolean ceph_kill_client();
/*
* Get the statistics on a path returned in a custom format defined
* in CephFileSystem.
* Inputs:
* String path: The path to stat.
* Stat fill: The stat object to fill.
* Returns: true if the stat is successful, false otherwise.
*/
abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
/*
* Check how many times a file should be replicated. If it is,
* degraded it may not actually be replicated this often.
* Inputs:
* int fh: a file descriptor
* Returns: an int containing the number of times replicated.
*/
abstract protected int ceph_replication(String path);
/*
* Find the IP address of the primary OSD for a given file and offset.
* Inputs:
* int fh: The filehandle for the file.
* long offset: The offset to get the location of.
* Returns: an array of String of the location as IP, or NULL if there is an error.
*/
abstract protected String[] ceph_hosts(int fh, long offset);
/*
* Set the mtime and atime for a given path.
* Inputs:
* String path: The path to set the times for.
* long mtime: The mtime to set, in millis since epoch (-1 to not set).
* long atime: The atime to set, in millis since epoch (-1 to not set)
* Returns: 0 if successful, an error code otherwise.
*/
abstract protected int ceph_setTimes(String path, long mtime, long atime);
/*
* Get the current position in a file (as a long) of a given filehandle.
* Returns: (long) current file position on success, or a
* negative error code on failure.
*/
abstract protected long ceph_getpos(int fh);
/*
* Write the given buffer contents to the given filehandle.
* Inputs:
* int fh: The filehandle to write to.
* byte[] buffer: The buffer to write from
* int buffer_offset: The position in the buffer to write from
* int length: The number of (sequential) bytes to write.
* Returns: int, on success the number of bytes written, on failure
* a negative error code.
*/
abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
/*
* Reads into the given byte array from the current position.
* Inputs:
* int fh: the filehandle to read from
* byte[] buffer: the byte array to read into
* int buffer_offset: where in the buffer to start writing
* int length: how much to read.
* There'd better be enough space in the buffer to write all
* the data from the given offset!
* Returns: the number of bytes read on success (as an int),
* or an error code otherwise. */
abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
/*
* Seeks to the given position in the given file.
* Inputs:
* int fh: The filehandle to seek in.
* long pos: The position to seek to.
* Returns: the new position (as a long) of the filehandle on success,
* or a negative error code on failure. */
abstract protected long ceph_seek_from_start(int fh, long pos);
}

View File

@ -1,483 +0,0 @@
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
/**
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*
* This uses the local Filesystem but pretends to be communicating
* with a Ceph deployment, for unit testing the CephFileSystem.
*/
package org.apache.hadoop.fs.ceph;
import java.net.URI;
import java.util.Hashtable;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
class CephFaker extends CephFS {
private static final Log LOG = LogFactory.getLog(CephFaker.class);
FileSystem localFS;
String localPrefix;
int blockSize;
Configuration conf;
Hashtable<Integer, Object> files;
Hashtable<Integer, String> filenames;
int fileCount = 0;
boolean initialized = false;
public CephFaker(Configuration con, Log log) {
conf = con;
files = new Hashtable<Integer, Object>();
filenames = new Hashtable<Integer, String>();
}
protected boolean ceph_initializeClient(String args, int block_size) {
if (!initialized) {
// let's remember the default block_size
blockSize = block_size;
/* for a real Ceph deployment, this starts up the client,
* sets debugging levels, etc. We just need to get the
* local FileSystem to use, and we'll ignore any
* command-line arguments. */
try {
localFS = FileSystem.getLocal(conf);
localFS.initialize(URI.create("file://localhost"), conf);
localFS.setVerifyChecksum(false);
String testDir = conf.get("hadoop.tmp.dir");
localPrefix = localFS.getWorkingDirectory().toString();
int testDirLoc = localPrefix.indexOf(testDir) - 1;
if (-2 == testDirLoc) {
testDirLoc = localPrefix.length();
}
localPrefix = localPrefix.substring(0, testDirLoc) + "/"
+ conf.get("hadoop.tmp.dir");
localFS.setWorkingDirectory(
new Path(localPrefix + "/user/" + System.getProperty("user.name")));
// I don't know why, but the unit tests expect the default
// working dir to be /user/username, so satisfy them!
// debug("localPrefix is " + localPrefix, INFO);
} catch (IOException e) {
return false;
}
initialized = true;
}
return true;
}
protected String ceph_getcwd() {
return sanitize_path(localFS.getWorkingDirectory().toString());
}
protected boolean ceph_setcwd(String path) {
localFS.setWorkingDirectory(new Path(prepare_path(path)));
return true;
}
// the caller is responsible for ensuring empty dirs
protected boolean ceph_rmdir(String pth) {
Path path = new Path(prepare_path(pth));
boolean ret = false;
try {
if (localFS.listStatus(path).length <= 1) {
ret = localFS.delete(path, true);
}
} catch (IOException e) {}
return ret;
}
// this needs to work on (empty) directories too
protected boolean ceph_unlink(String path) {
path = prepare_path(path);
boolean ret = false;
if (ceph_isdirectory(path)) {
ret = ceph_rmdir(path);
} else {
try {
ret = localFS.delete(new Path(path), false);
} catch (IOException e) {}
}
return ret;
}
protected boolean ceph_rename(String oldName, String newName) {
oldName = prepare_path(oldName);
newName = prepare_path(newName);
try {
Path parent = new Path(newName).getParent();
Path newPath = new Path(newName);
if (localFS.exists(parent) && !localFS.exists(newPath)) {
return localFS.rename(new Path(oldName), newPath);
}
return false;
} catch (IOException e) {
return false;
}
}
protected boolean ceph_exists(String path) {
path = prepare_path(path);
boolean ret = false;
try {
ret = localFS.exists(new Path(path));
} catch (IOException e) {}
return ret;
}
protected long ceph_getblocksize(String path) {
path = prepare_path(path);
try {
FileStatus status = localFS.getFileStatus(new Path(path));
return status.getBlockSize();
} catch (FileNotFoundException e) {
return -CephFS.ENOENT;
} catch (IOException e) {
return -1; // just fail generically
}
}
protected boolean ceph_isdirectory(String path) {
path = prepare_path(path);
try {
FileStatus status = localFS.getFileStatus(new Path(path));
return status.isDir();
} catch (IOException e) {
return false;
}
}
protected boolean ceph_isfile(String path) {
path = prepare_path(path);
boolean ret = false;
try {
FileStatus status = localFS.getFileStatus(new Path(path));
ret = !status.isDir();
} catch (Exception e) {}
return ret;
}
protected String[] ceph_getdir(String path) {
path = prepare_path(path);
if (!ceph_isdirectory(path)) {
return null;
}
try {
FileStatus[] stats = localFS.listStatus(new Path(path));
String[] names = new String[stats.length];
String name;
for (int i = 0; i < stats.length; ++i) {
name = stats[i].getPath().toString();
names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR) + 1);
}
return names;
} catch (IOException e) {}
return null;
}
protected int ceph_mkdirs(String path, int mode) {
path = prepare_path(path);
// debug("ceph_mkdirs on " + path, INFO);
try {
if (localFS.mkdirs(new Path(path), new FsPermission((short) mode))) {
return 0;
}
} catch (IOException e) {}
if (ceph_isdirectory(path)) { // apparently it already existed
return -EEXIST;
} else if (ceph_isfile(path)) {
return -ENOTDIR;
}
return -1;
}
/*
* Unlike a real Ceph deployment, you can't do opens on a directory.
* Since that has unpredictable behavior and you shouldn't do it anyway,
* it's okay.
*/
protected int ceph_open_for_append(String path) {
path = prepare_path(path);
FSDataOutputStream stream;
try {
stream = localFS.append(new Path(path));
files.put(new Integer(fileCount), stream);
filenames.put(new Integer(fileCount), path);
return fileCount++;
} catch (IOException e) {}
return -1; // failure
}
protected int ceph_open_for_read(String path) {
path = prepare_path(path);
FSDataInputStream stream;
try {
stream = localFS.open(new Path(path));
files.put(new Integer(fileCount), stream);
filenames.put(new Integer(fileCount), path);
LOG.info("ceph_open_for_read fh:" + fileCount + ", pathname:" + path);
return fileCount++;
} catch (IOException e) {}
return -1; // failure
}
protected int ceph_open_for_overwrite(String path, int mode) {
path = prepare_path(path);
FSDataOutputStream stream;
try {
stream = localFS.create(new Path(path));
files.put(new Integer(fileCount), stream);
filenames.put(new Integer(fileCount), path);
LOG.info("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path);
return fileCount++;
} catch (IOException e) {}
return -1; // failure
}
protected int ceph_close(int filehandle) {
LOG.info("ceph_close(filehandle " + filehandle + ")");
try {
((Closeable) files.get(new Integer(filehandle))).close();
if (null == files.get(new Integer(filehandle))) {
return -ENOENT; // this isn't quite the right error code,
// but the important part is it's negative
}
return 0; // hurray, success
} catch (NullPointerException ne) {
LOG.warn("ceph_close caught NullPointerException!" + ne);
} // err, how?
catch (IOException ie) {
LOG.warn("ceph_close caught IOException!" + ie);
}
return -1; // failure
}
protected boolean ceph_setPermission(String pth, int mode) {
pth = prepare_path(pth);
Path path = new Path(pth);
boolean ret = false;
try {
localFS.setPermission(path, new FsPermission((short) mode));
ret = true;
} catch (IOException e) {}
return ret;
}
// rather than try and match a Ceph deployment's behavior exactly,
// just make bad things happen if they try and call methods after this
protected boolean ceph_kill_client() {
// debug("ceph_kill_client", INFO);
localFS.setWorkingDirectory(new Path(localPrefix));
// debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
try {
localFS.close();
} catch (Exception e) {}
localFS = null;
files = null;
filenames = null;
return true;
}
protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
pth = prepare_path(pth);
Path path = new Path(pth);
boolean ret = false;
try {
FileStatus status = localFS.getFileStatus(path);
fill.size = status.getLen();
fill.is_dir = status.isDir();
fill.block_size = status.getBlockSize();
fill.mod_time = status.getModificationTime();
fill.access_time = status.getAccessTime();
fill.mode = status.getPermission().toShort();
ret = true;
} catch (IOException e) {}
return ret;
}
protected int ceph_replication(String path) {
path = prepare_path(path);
int ret = -1; // -1 for failure
try {
ret = localFS.getFileStatus(new Path(path)).getReplication();
} catch (IOException e) {}
return ret;
}
protected String[] ceph_hosts(int fh, long offset) {
String[] ret = null;
try {
BlockLocation[] locs = localFS.getFileBlockLocations(
localFS.getFileStatus(new Path(filenames.get(new Integer(fh)))),
offset, 1);
ret = locs[0].getNames();
} catch (IOException e) {} catch (NullPointerException f) {}
return ret;
}
protected int ceph_setTimes(String pth, long mtime, long atime) {
pth = prepare_path(pth);
Path path = new Path(pth);
int ret = -1; // generic fail
try {
localFS.setTimes(path, mtime, atime);
ret = 0;
} catch (IOException e) {}
return ret;
}
protected long ceph_getpos(int fh) {
long ret = -1; // generic fail
try {
Object stream = files.get(new Integer(fh));
if (stream instanceof FSDataInputStream) {
ret = ((FSDataInputStream) stream).getPos();
} else if (stream instanceof FSDataOutputStream) {
ret = ((FSDataOutputStream) stream).getPos();
}
} catch (IOException e) {} catch (NullPointerException f) {}
return ret;
}
protected int ceph_write(int fh, byte[] buffer,
int buffer_offset, int length) {
LOG.info(
"ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset + ", length:"
+ length);
long ret = -1; // generic fail
try {
FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
LOG.info("ceph_write got outputstream");
long startPos = os.getPos();
os.write(buffer, buffer_offset, length);
ret = os.getPos() - startPos;
} catch (IOException e) {
LOG.warn("ceph_write caught IOException!");
} catch (NullPointerException f) {
LOG.warn("ceph_write caught NullPointerException!");
}
return (int) ret;
}
protected int ceph_read(int fh, byte[] buffer,
int buffer_offset, int length) {
long ret = -1; // generic fail
try {
FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
long startPos = is.getPos();
is.read(buffer, buffer_offset, length);
ret = is.getPos() - startPos;
} catch (IOException e) {} catch (NullPointerException f) {}
return (int) ret;
}
protected long ceph_seek_from_start(int fh, long pos) {
LOG.info("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")");
long ret = -1; // generic fail
try {
LOG.info("ceph_seek_from_start filename is " + filenames.get(new Integer(fh)));
if (null == files.get(new Integer(fh))) {
LOG.warn("ceph_seek_from_start: is is null!");
}
FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
LOG.info("ceph_seek_from_start retrieved is!");
is.seek(pos);
ret = is.getPos();
} catch (IOException e) {
LOG.warn("ceph_seek_from_start caught IOException!");
} catch (NullPointerException f) {
LOG.warn("ceph_seek_from_start caught NullPointerException!");
}
return (int) ret;
}
/*
* We need to remove the localFS file prefix before returning to Ceph
*/
private String sanitize_path(String path) {
// debug("sanitize_path(" + path + ")", INFO);
/* if (path.startsWith("file:"))
path = path.substring("file:".length()); */
if (path.startsWith(localPrefix)) {
path = path.substring(localPrefix.length());
if (path.length() == 0) { // it was a root path
path = "/";
}
}
// debug("sanitize_path returning " + path, INFO);
return path;
}
/*
* If it's an absolute path we need to shove the
* test dir onto the front as a prefix.
*/
private String prepare_path(String path) {
// debug("prepare_path(" + path + ")", INFO);
if (path.startsWith("/")) {
path = localPrefix + path;
} else if (path.equals("..")) {
if (ceph_getcwd().equals("/")) {
path = ".";
} // you can't go up past root!
}
// debug("prepare_path returning" + path, INFO);
return path;
}
}

View File

@ -1,804 +0,0 @@
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
/**
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*
* Implements the Hadoop FS interfaces to allow applications to store
* files in Ceph.
*/
package org.apache.hadoop.fs.ceph;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.io.OutputStream;
import java.net.URI;
import java.net.InetAddress;
import java.util.EnumSet;
import java.lang.Math;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.net.DNS;
/**
* <p>
* A {@link FileSystem} backed by <a href="http://ceph.newdream.net">Ceph.</a>.
* This will not start a Ceph instance; one must already be running.
* </p>
* Configuration of the CephFileSystem is handled via a few Hadoop
* Configuration properties: <br>
* fs.ceph.monAddr -- the ip address/port of the monitor to connect to. <br>
* fs.ceph.libDir -- the directory that libcephfs and libhadoopceph are
* located in. This assumes Hadoop is being run on a linux-style machine
* with names like libcephfs.so.
* fs.ceph.commandLine -- if you prefer you can fill in this property
* just as you would when starting Ceph up from the command line. Specific
* properties override any configuration specified here.
* <p>
* You can also enable debugging of the CephFileSystem and Ceph itself: <br>
* fs.ceph.debug -- if 'true' will print out method enter/exit messages,
* plus a little more.
* fs.ceph.clientDebug/fs.ceph.messengerDebug -- will print out debugging
* from the respective Ceph system of at least that importance.
*/
public class CephFileSystem extends FileSystem {
private static final Log LOG = LogFactory.getLog(CephFileSystem.class);
private URI uri;
private Path workingDir;
private final Path root;
private CephFS ceph = null;
private static String CEPH_NAMESERVER;
private static final String CEPH_NAMESERVER_KEY = "fs.ceph.nameserver";
private static final String CEPH_NAMESERVER_DEFAULT = "localhost";
/**
* Create a new CephFileSystem.
*/
public CephFileSystem() {
root = new Path("/");
}
/**
* Used for testing purposes, this constructor
* sets the given CephFS instead of defaulting to a
* CephTalker (with its assumed real Ceph instance to talk to).
*/
public CephFileSystem(CephFS ceph_fs) {
super();
root = new Path("/");
ceph = ceph_fs;
}
/**
* Lets you get the URI of this CephFileSystem.
* @return the URI.
*/
public URI getUri() {
LOG.debug("getUri:exit with return " + uri);
return uri;
}
/**
* Should be called after constructing a CephFileSystem but before calling
* any other methods.
* Starts up the connection to Ceph, reads in configuraton options, etc.
* @param uri The URI for this filesystem.
* @param conf The Hadoop Configuration to retrieve properties from.
* @throws IOException if necessary properties are unset.
*/
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
setConf(conf);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
if (ceph == null) {
ceph = new CephTalker(conf, LOG);
}
CEPH_NAMESERVER = conf.get(CEPH_NAMESERVER_KEY, CEPH_NAMESERVER_DEFAULT);
// build up the arguments for Ceph
String arguments = "CephFSInterface";
arguments += conf.get("fs.ceph.commandLine", "");
if (conf.get("fs.ceph.clientDebug") != null) {
arguments += " --debug_client ";
arguments += conf.get("fs.ceph.clientDebug");
}
if (conf.get("fs.ceph.messengerDebug") != null) {
arguments += " --debug_ms ";
arguments += conf.get("fs.ceph.messengerDebug");
}
if (conf.get("fs.ceph.monAddr") != null) {
arguments += " -m ";
arguments += conf.get("fs.ceph.monAddr");
}
arguments += " --client-readahead-max-periods="
+ conf.get("fs.ceph.readahead", "1");
// make sure they gave us a ceph monitor address or conf file
LOG.info("initialize:Ceph initialization arguments: " + arguments);
if ((conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1)
&& (arguments.indexOf("-c") == -1)) {
LOG.fatal("initialize:You need to specify a Ceph monitor address.");
throw new IOException(
"You must specify a Ceph monitor address or config file!");
}
// Initialize the client
if (!ceph.ceph_initializeClient(arguments,
conf.getInt("fs.ceph.blockSize", 1 << 26))) {
LOG.fatal("initialize:Ceph initialization failed!");
throw new IOException("Ceph initialization failed!");
}
LOG.info("initialize:Ceph initialized client. Setting cwd to /");
ceph.ceph_setcwd("/");
LOG.debug("initialize:exit");
this.workingDir = getHomeDirectory();
}
/**
* Close down the CephFileSystem. Runs the base-class close method
* and then kills the Ceph client itself.
*/
@Override
public void close() throws IOException {
LOG.debug("close:enter");
super.close(); // this method does stuff, make sure it's run!
LOG.trace("close: Calling ceph_kill_client from Java");
ceph.ceph_kill_client();
LOG.debug("close:exit");
}
/**
* Get an FSDataOutputStream to append onto a file.
* @param file The File you want to append onto
* @param bufferSize Ceph does internal buffering but you can buffer in the Java code as well if you like.
* @param progress The Progressable to report progress to.
* Reporting is limited but exists.
* @return An FSDataOutputStream that connects to the file on Ceph.
* @throws IOException If the file cannot be found or appended to.
*/
public FSDataOutputStream append(Path file, int bufferSize,
Progressable progress) throws IOException {
LOG.debug("append:enter with path " + file + " bufferSize " + bufferSize);
Path abs_path = makeAbsolute(file);
if (progress != null) {
progress.progress();
}
LOG.trace("append: Entering ceph_open_for_append from Java");
int fd = ceph.ceph_open_for_append(getCephPath(abs_path));
LOG.trace("append: Returned to Java");
if (progress != null) {
progress.progress();
}
if (fd < 0) { // error in open
throw new IOException(
"append: Open for append failed on path \"" + abs_path.toString()
+ "\"");
}
CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd,
bufferSize);
LOG.debug("append:exit");
return new FSDataOutputStream(cephOStream, statistics);
}
/**
* Get the current working directory for the given file system
* @return the directory Path
*/
public Path getWorkingDirectory() {
return workingDir;
}
/**
* Set the current working directory for the given file system. All relative
* paths will be resolved relative to it.
*
* @param dir The directory to change to.
*/
@Override
public void setWorkingDirectory(Path dir) {
workingDir = makeAbsolute(dir);
}
/**
* Return only the path component from a potentially fully qualified path.
*/
private String getCephPath(Path path) {
if (!path.isAbsolute()) {
throw new IllegalArgumentException("Path must be absolute: " + path);
}
return path.toUri().getPath();
}
/**
* Check if a path exists.
* Overriden because it's moderately faster than the generic implementation.
* @param path The file to check existence on.
* @return true if the file exists, false otherwise.
*/
@Override
public boolean exists(Path path) throws IOException {
LOG.debug("exists:enter with path " + path);
boolean result;
Path abs_path = makeAbsolute(path);
if (abs_path.equals(root)) {
result = true;
} else {
LOG.trace(
"exists:Calling ceph_exists from Java on path " + abs_path.toString());
result = ceph.ceph_exists(getCephPath(abs_path));
LOG.trace("exists:Returned from ceph_exists to Java");
}
LOG.debug("exists:exit with value " + result);
return result;
}
/**
* Create a directory and any nonexistent parents. Any portion
* of the directory tree can exist without error.
* @param path The directory path to create
* @param perms The permissions to apply to the created directories.
* @return true if successful, false otherwise
* @throws IOException if the path is a child of a file.
*/
@Override
public boolean mkdirs(Path path, FsPermission perms) throws IOException {
LOG.debug("mkdirs:enter with path " + path);
Path abs_path = makeAbsolute(path);
LOG.trace("mkdirs:calling ceph_mkdirs from Java");
int result = ceph.ceph_mkdirs(getCephPath(abs_path), (int) perms.toShort());
if (result != 0) {
LOG.warn(
"mkdirs: make directory " + abs_path + "Failing with result " + result);
if (-ceph.ENOTDIR == result) {
throw new IOException("Parent path is not a directory");
}
return false;
} else {
LOG.debug("mkdirs:exiting succesfully");
return true;
}
}
/**
* Check if a path is a file. This is moderately faster than the
* generic implementation.
* @param path The path to check.
* @return true if the path is definitely a file, false otherwise.
*/
@Override
public boolean isFile(Path path) throws IOException {
LOG.debug("isFile:enter with path " + path);
Path abs_path = makeAbsolute(path);
boolean result;
if (abs_path.equals(root)) {
result = false;
} else {
LOG.trace("isFile:entering ceph_isfile from Java");
result = ceph.ceph_isfile(getCephPath(abs_path));
}
LOG.debug("isFile:exit with result " + result);
return result;
}
/**
* Get stat information on a file. This does not fill owner or group, as
* Ceph's support for these is a bit different than HDFS'.
* @param path The path to stat.
* @return FileStatus object containing the stat information.
* @throws FileNotFoundException if the path could not be resolved.
*/
public FileStatus getFileStatus(Path path) throws IOException {
LOG.debug("getFileStatus:enter with path " + path);
Path abs_path = makeAbsolute(path);
// sadly, Ceph doesn't really do uids/gids just yet, but
// everything else is filled
FileStatus status;
Stat lstat = new Stat();
LOG.trace("getFileStatus: calling ceph_stat from Java");
if (ceph.ceph_stat(getCephPath(abs_path), lstat)) {
status = new FileStatus(lstat.size, lstat.is_dir,
ceph.ceph_replication(getCephPath(abs_path)), lstat.block_size,
lstat.mod_time, lstat.access_time,
new FsPermission((short) lstat.mode), System.getProperty("user.name"), null,
path.makeQualified(this));
} else { // fail out
throw new FileNotFoundException(
"org.apache.hadoop.fs.ceph.CephFileSystem: File " + path
+ " does not exist or could not be accessed");
}
LOG.debug("getFileStatus:exit");
return status;
}
/**
* Get the FileStatus for each listing in a directory.
* @param path The directory to get listings from.
* @return FileStatus[] containing one FileStatus for each directory listing;
* null if path does not exist.
*/
public FileStatus[] listStatus(Path path) throws IOException {
LOG.debug("listStatus:enter with path " + path);
Path abs_path = makeAbsolute(path);
Path[] paths = listPaths(abs_path);
if (paths != null) {
FileStatus[] statuses = new FileStatus[paths.length];
for (int i = 0; i < paths.length; ++i) {
statuses[i] = getFileStatus(paths[i]);
}
LOG.debug("listStatus:exit");
return statuses;
}
if (isFile(path)) {
return new FileStatus[] { getFileStatus(path) };
}
return null;
}
@Override
public void setPermission(Path p, FsPermission permission) throws IOException {
LOG.debug(
"setPermission:enter with path " + p + " and permissions " + permission);
Path abs_path = makeAbsolute(p);
LOG.trace("setPermission:calling ceph_setpermission from Java");
ceph.ceph_setPermission(getCephPath(abs_path), permission.toShort());
LOG.debug("setPermission:exit");
}
/**
* Set access/modification times of a file.
* @param p The path
* @param mtime Set modification time in number of millis since Jan 1, 1970.
* @param atime Set access time in number of millis since Jan 1, 1970.
*/
@Override
public void setTimes(Path p, long mtime, long atime) throws IOException {
LOG.debug(
"setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime);
Path abs_path = makeAbsolute(p);
LOG.trace("setTimes:calling ceph_setTimes from Java");
int r = ceph.ceph_setTimes(getCephPath(abs_path), mtime, atime);
if (r < 0) {
throw new IOException(
"Failed to set times on path " + abs_path.toString() + " Error code: "
+ r);
}
LOG.debug("setTimes:exit");
}
/**
* Create a new file and open an FSDataOutputStream that's connected to it.
* @param path The file to create.
* @param permission The permissions to apply to the file.
* @param overwrite If true, overwrite any existing file with
* this name; otherwise don't.
* @param bufferSize Ceph does internal buffering, but you can buffer
* in the Java code too if you like.
* @param replication Ignored by Ceph. This can be
* configured via Ceph configuration.
* @param blockSize Ignored by Ceph. You can set client-wide block sizes
* via the fs.ceph.blockSize param if you like.
* @param progress A Progressable to report back to.
* Reporting is limited but exists.
* @return An FSDataOutputStream pointing to the created file.
* @throws IOException if the path is an
* existing directory, or the path exists but overwrite is false, or there is a
* failure in attempting to open for append with Ceph.
*/
public FSDataOutputStream create(Path path,
FsPermission permission,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
LOG.debug("create:enter with path " + path);
Path abs_path = makeAbsolute(path);
if (progress != null) {
progress.progress();
}
// We ignore replication since that's not configurable here, and
// progress reporting is quite limited.
// Required semantics: if the file exists, overwrite if 'overwrite' is set;
// otherwise, throw an exception
// Step 1: existence test
boolean exists = exists(abs_path);
if (exists) {
if (getFileStatus(abs_path).isDir()) {
throw new IOException(
"create: Cannot overwrite existing directory \"" + path.toString()
+ "\" with a file");
}
if (!overwrite) {
throw new IOException(
"createRaw: Cannot open existing file \"" + abs_path.toString()
+ "\" for writing without overwrite flag");
}
}
if (progress != null) {
progress.progress();
}
// Step 2: create any nonexistent directories in the path
if (!exists) {
Path parent = abs_path.getParent();
if (parent != null) { // if parent is root, we're done
int r = ceph.ceph_mkdirs(getCephPath(parent), permission.toShort());
if (!(r == 0 || r == -ceph.EEXIST)) {
throw new IOException("Error creating parent directory; code: " + r);
}
}
if (progress != null) {
progress.progress();
}
}
// Step 3: open the file
LOG.trace("calling ceph_open_for_overwrite from Java");
int fh = ceph.ceph_open_for_overwrite(getCephPath(abs_path),
(int) permission.toShort());
if (progress != null) {
progress.progress();
}
LOG.trace("Returned from ceph_open_for_overwrite to Java with fh " + fh);
if (fh < 0) {
throw new IOException(
"create: Open for overwrite failed on path \"" + path.toString()
+ "\"");
}
// Step 4: create the stream
OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh,
bufferSize);
LOG.debug("create:exit");
return new FSDataOutputStream(cephOStream, statistics);
}
/**
* Open a Ceph file and attach the file handle to an FSDataInputStream.
* @param path The file to open
* @param bufferSize Ceph does internal buffering; but you can buffer in
* the Java code too if you like.
* @return FSDataInputStream reading from the given path.
* @throws IOException if the path DNE or is a
* directory, or there is an error getting data to set up the FSDataInputStream.
*/
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
LOG.debug("open:enter with path " + path);
Path abs_path = makeAbsolute(path);
int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
if (fh < 0) { // uh-oh, something's bad!
if (fh == -ceph.ENOENT) { // well that was a stupid open
throw new IOException(
"open: absolute path \"" + abs_path.toString()
+ "\" does not exist");
} else { // hrm...the file exists but we can't open it :(
throw new IOException("open: Failed to open file " + abs_path.toString());
}
}
if (getFileStatus(abs_path).isDir()) { // yes, it is possible to open Ceph directories
// but that doesn't mean you should in Hadoop!
ceph.ceph_close(fh);
throw new IOException(
"open: absolute path \"" + abs_path.toString() + "\" is a directory!");
}
Stat lstat = new Stat();
LOG.trace("open:calling ceph_stat from Java");
ceph.ceph_stat(getCephPath(abs_path), lstat);
LOG.trace("open:returned to Java");
long size = lstat.size;
if (size < 0) {
throw new IOException(
"Failed to get file size for file " + abs_path.toString()
+ " but succeeded in opening file. Something bizarre is going on.");
}
FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size,
bufferSize);
LOG.debug("open:exit");
return new FSDataInputStream(cephIStream);
}
/**
* Rename a file or directory.
* @param src The current path of the file/directory
* @param dst The new name for the path.
* @return true if the rename succeeded, false otherwise.
*/
@Override
public boolean rename(Path src, Path dst) throws IOException {
LOG.debug("rename:enter with src:" + src + " and dest:" + dst);
Path abs_src = makeAbsolute(src);
Path abs_dst = makeAbsolute(dst);
LOG.trace("calling ceph_rename from Java");
boolean result = ceph.ceph_rename(getCephPath(abs_src), getCephPath(abs_dst));
if (!result) {
boolean isDir = false;
try {
isDir = getFileStatus(abs_dst).isDir();
} catch (FileNotFoundException e) {}
if (isDir) { // move the srcdir into destdir
LOG.debug("ceph_rename failed but dst is a directory!");
Path new_dst = new Path(abs_dst, abs_src.getName());
result = rename(abs_src, new_dst);
LOG.debug(
"attempt to move " + abs_src.toString() + " to "
+ new_dst.toString() + "has result:" + result);
}
}
LOG.debug("rename:exit with result: " + result);
return result;
}
/*
* Attempt to convert an IP into its hostname
*/
private String[] ips2Hosts(String[] ips) {
ArrayList<String> hosts = new ArrayList<String>();
for (String ip : ips) {
try {
String host = DNS.reverseDns(InetAddress.getByName(ip), CEPH_NAMESERVER);
if (host.charAt(host.length()-1) == '.') {
host = host.substring(0, host.length()-1);
}
hosts.add(host); /* append */
} catch (Exception e) {
LOG.error("reverseDns ["+ip+"] failed: "+ e);
}
}
return hosts.toArray(new String[hosts.size()]);
}
/**
* Get a BlockLocation object for each block in a file.
*
* Note that this doesn't include port numbers in the name field as
* Ceph handles slow/down servers internally. This data should be used
* only for selecting which servers to run which jobs on.
*
* @param file A FileStatus object corresponding to the file you want locations for.
* @param start The offset of the first part of the file you are interested in.
* @param len The amount of the file past the offset you are interested in.
* @return A BlockLocation[] where each object corresponds to a block within
* the given range.
*/
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
Path abs_path = makeAbsolute(file.getPath());
int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
if (fh < 0) {
LOG.error("getFileBlockLocations:got error " + fh + ", exiting and returning null!");
return null;
}
long blockSize = ceph.ceph_getblocksize(getCephPath(abs_path));
BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)];
for (int i = 0; i < locations.length; ++i) {
long offset = start + i * blockSize;
long blockStart = start + i * blockSize - (start % blockSize);
String ips[] = ceph.ceph_hosts(fh, offset);
String hosts[] = ips2Hosts(ips);
locations[i] = new BlockLocation(null, hosts, blockStart, blockSize);
LOG.debug("getFileBlockLocations: location[" + i + "]: " + locations[i]);
}
ceph.ceph_close(fh);
return locations;
}
@Deprecated
public boolean delete(Path path) throws IOException {
return delete(path, false);
}
/**
* Delete the given path, and optionally its children.
* @param path the path to delete.
* @param recursive If the path is a non-empty directory and this is false,
* delete will throw an IOException. If path is a file this is ignored.
* @return true if the delete succeeded, false otherwise (including if
* path doesn't exist).
* @throws IOException if you attempt to non-recursively delete a directory,
* or you attempt to delete the root directory.
*/
public boolean delete(Path path, boolean recursive) throws IOException {
LOG.debug("delete:enter with path " + path + " and recursive=" + recursive);
Path abs_path = makeAbsolute(path);
// sanity check
if (abs_path.equals(root)) {
throw new IOException("Error: deleting the root directory is a Bad Idea.");
}
if (!exists(abs_path)) {
return false;
}
// if the path is a file, try to delete it.
if (isFile(abs_path)) {
LOG.trace("delete:calling ceph_unlink from Java with path " + abs_path);
boolean result = ceph.ceph_unlink(getCephPath(abs_path));
if (!result) {
LOG.error(
"delete: failed to delete file \"" + abs_path.toString() + "\".");
}
LOG.debug("delete:exit with success=" + result);
return result;
}
/* The path is a directory, so recursively try to delete its contents,
and then delete the directory. */
// get the entries; listPaths will remove . and .. for us
Path[] contents = listPaths(abs_path);
if (contents == null) {
LOG.error(
"delete: Failed to read contents of directory \""
+ abs_path.toString() + "\" while trying to delete it, BAILING");
return false;
}
if (!recursive && contents.length > 0) {
throw new IOException("Directories must be deleted recursively!");
}
// delete the entries
LOG.debug("delete: recursively calling delete on contents of " + abs_path);
for (Path p : contents) {
if (!delete(p, true)) {
LOG.error(
"delete: Failed to delete file \"" + p.toString()
+ "\" while recursively deleting \"" + abs_path.toString()
+ "\", BAILING");
return false;
}
}
// if we've come this far it's a now-empty directory, so delete it!
boolean result = ceph.ceph_rmdir(getCephPath(abs_path));
if (!result) {
LOG.error(
"delete: failed to delete \"" + abs_path.toString() + "\", BAILING");
}
LOG.debug("delete:exit");
return result;
}
/**
* Returns the default replication value of 1. This may
* NOT be the actual value, as replication is controlled
* by a separate Ceph configuration.
*/
@Override
public short getDefaultReplication() {
return 1;
}
/**
* Get the default block size.
* @return the default block size, in bytes, as a long.
*/
@Override
public long getDefaultBlockSize() {
return getConf().getInt("fs.ceph.blockSize", 1 << 26);
}
/**
* Adds the working directory to path if path is not already
* an absolute path. The URI scheme is not removed here. It
* is removed only when users (e.g. ceph native calls) need
* the path-only portion.
*/
private Path makeAbsolute(Path path) {
if (path.isAbsolute()) {
return path;
}
return new Path(workingDir, path);
}
private Path[] listPaths(Path path) throws IOException {
LOG.debug("listPaths:enter with path " + path);
String dirlist[];
Path abs_path = makeAbsolute(path);
// If it's a directory, get the listing. Otherwise, complain and give up.
LOG.debug("calling ceph_getdir from Java with path " + abs_path);
dirlist = ceph.ceph_getdir(getCephPath(abs_path));
LOG.debug("returning from ceph_getdir to Java");
if (dirlist == null) {
return null;
}
// convert the strings to Paths
Path[] paths = new Path[dirlist.length];
for (int i = 0; i < dirlist.length; ++i) {
LOG.trace(
"Raw enumeration of paths in \"" + abs_path.toString() + "\": \""
+ dirlist[i] + "\"");
// convert each listing to an absolute path
Path raw_path = new Path(dirlist[i]);
if (raw_path.isAbsolute()) {
paths[i] = raw_path;
} else {
paths[i] = new Path(abs_path, raw_path);
}
}
LOG.debug("listPaths:exit");
return paths;
}
static class Stat {
public long size;
public boolean is_dir;
public long block_size;
public long mod_time;
public long access_time;
public int mode;
public Stat() {}
}
}

View File

@ -1,254 +0,0 @@
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
/**
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*
* Implements the Hadoop FS interfaces to allow applications to store
* files in Ceph.
*/
package org.apache.hadoop.fs.ceph;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSInputStream;
/**
* <p>
* An {@link FSInputStream} for a CephFileSystem and corresponding
* Ceph instance.
*/
public class CephInputStream extends FSInputStream {
private static final Log LOG = LogFactory.getLog(CephInputStream.class);
private boolean closed;
private int fileHandle;
private long fileLength;
private CephFS ceph;
private byte[] buffer;
private int bufPos = 0;
private int bufValid = 0;
private long cephPos = 0;
/**
* Create a new CephInputStream.
* @param conf The system configuration. Unused.
* @param fh The filehandle provided by Ceph to reference.
* @param flength The current length of the file. If the length changes
* you will need to close and re-open it to access the new data.
*/
public CephInputStream(Configuration conf, CephFS cephfs,
int fh, long flength, int bufferSize) {
// Whoever's calling the constructor is responsible for doing the actual ceph_open
// call and providing the file handle.
fileLength = flength;
fileHandle = fh;
closed = false;
ceph = cephfs;
buffer = new byte[bufferSize];
LOG.debug(
"CephInputStream constructor: initializing stream with fh " + fh
+ " and file length " + flength);
}
/** Ceph likes things to be closed before it shuts down,
* so closing the IOStream stuff voluntarily in a finalizer is good
*/
protected void finalize() throws Throwable {
try {
if (!closed) {
close();
}
} finally {
super.finalize();
}
}
private synchronized boolean fillBuffer() throws IOException {
bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
bufPos = 0;
if (bufValid < 0) {
int err = bufValid;
bufValid = 0;
// attempt to reset to old position. If it fails, too bad.
ceph.ceph_seek_from_start(fileHandle, cephPos);
throw new IOException("Failed to fill read buffer! Error code:" + err);
}
cephPos += bufValid;
return (bufValid != 0);
}
/*
* Get the current position of the stream.
*/
public synchronized long getPos() throws IOException {
return cephPos - bufValid + bufPos;
}
/**
* Find the number of bytes remaining in the file.
*/
@Override
public synchronized int available() throws IOException {
return (int) (fileLength - getPos());
}
public synchronized void seek(long targetPos) throws IOException {
LOG.trace(
"CephInputStream.seek: Seeking to position " + targetPos + " on fd "
+ fileHandle);
if (targetPos > fileLength) {
throw new IOException(
"CephInputStream.seek: failed seek to position " + targetPos
+ " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
}
long oldPos = cephPos;
cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
bufValid = 0;
bufPos = 0;
if (cephPos < 0) {
cephPos = oldPos;
throw new IOException("Ceph failed to seek to new position!");
}
}
/**
* Failovers are handled by the Ceph code at a very low level;
* if there are issues that can be solved by changing sources
* they'll be dealt with before anybody even tries to call this method!
* @return false.
*/
public synchronized boolean seekToNewSource(long targetPos) {
return false;
}
/**
* Read a byte from the file.
* @return the next byte.
*/
@Override
public synchronized int read() throws IOException {
LOG.trace(
"CephInputStream.read: Reading a single byte from fd " + fileHandle
+ " by calling general read function");
byte result[] = new byte[1];
if (getPos() >= fileLength) {
return -1;
}
if (-1 == read(result, 0, 1)) {
return -1;
}
if (result[0] < 0) {
return 256 + (int) result[0];
} else {
return result[0];
}
}
/**
* Read a specified number of bytes from the file into a byte[].
* @param buf the byte array to read into.
* @param off the offset to start at in the file
* @param len the number of bytes to read
* @return 0 if successful, otherwise an error code.
* @throws IOException on bad input.
*/
@Override
public synchronized int read(byte buf[], int off, int len)
throws IOException {
LOG.trace(
"CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
if (closed) {
throw new IOException(
"CephInputStream.read: cannot read " + len + " bytes from fd "
+ fileHandle + ": stream closed");
}
// ensure we're not past the end of the file
if (getPos() >= fileLength) {
LOG.debug(
"CephInputStream.read: cannot read " + len + " bytes from fd "
+ fileHandle + ": current position is " + getPos()
+ " and file length is " + fileLength);
return -1;
}
int totalRead = 0;
int initialLen = len;
int read;
do {
read = Math.min(len, bufValid - bufPos);
try {
System.arraycopy(buffer, bufPos, buf, off, read);
} catch (IndexOutOfBoundsException ie) {
throw new IOException(
"CephInputStream.read: Indices out of bounds:" + "read length is "
+ len + ", buffer offset is " + off + ", and buffer size is "
+ buf.length);
} catch (ArrayStoreException ae) {
throw new IOException(
"Uh-oh, CephInputStream failed to do an array"
+ "copy due to type mismatch...");
} catch (NullPointerException ne) {
throw new IOException(
"CephInputStream.read: cannot read " + len + "bytes from fd:"
+ fileHandle + ": buf is null");
}
bufPos += read;
len -= read;
off += read;
totalRead += read;
} while (len > 0 && fillBuffer());
LOG.trace(
"CephInputStream.read: Reading " + initialLen + " bytes from fd "
+ fileHandle + ": succeeded in reading " + totalRead + " bytes");
return totalRead;
}
/**
* Close the CephInputStream and release the associated filehandle.
*/
@Override
public void close() throws IOException {
LOG.trace("CephOutputStream.close:enter");
if (!closed) {
int result = ceph.ceph_close(fileHandle);
closed = true;
if (result != 0) {
throw new IOException(
"Close somehow failed!"
+ "Don't try and use this stream again, though");
}
LOG.trace("CephOutputStream.close:exit");
}
}
}

View File

@ -1,219 +0,0 @@
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
/**
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*
* Implements the Hadoop FS interfaces to allow applications to store
* files in Ceph.
*/
package org.apache.hadoop.fs.ceph;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;
/**
* <p>
* An {@link OutputStream} for a CephFileSystem and corresponding
* Ceph instance.
*/
public class CephOutputStream extends OutputStream {
private static final Log LOG = LogFactory.getLog(CephOutputStream.class);
private boolean closed;
private CephFS ceph;
private int fileHandle;
private byte[] buffer;
private int bufUsed = 0;
/**
* Construct the CephOutputStream.
* @param conf The FileSystem configuration.
* @param fh The Ceph filehandle to connect to.
*/
public CephOutputStream(Configuration conf, CephFS cephfs,
int fh, int bufferSize) {
ceph = cephfs;
fileHandle = fh;
closed = false;
buffer = new byte[bufferSize];
}
/** Ceph likes things to be closed before it shuts down,
*so closing the IOStream stuff voluntarily is good
*/
protected void finalize() throws Throwable {
try {
if (!closed) {
close();
}
} finally {
super.finalize();
}
}
/**
* Get the current position in the file.
* @return The file offset in bytes.
*/
public long getPos() throws IOException {
return ceph.ceph_getpos(fileHandle);
}
/**
* Write a byte.
* @param b The byte to write.
* @throws IOException If you have closed the CephOutputStream or the
* write fails.
*/
@Override
public synchronized void write(int b) throws IOException {
LOG.trace(
"CephOutputStream.write: writing a single byte to fd " + fileHandle);
if (closed) {
throw new IOException(
"CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle
+ ": stream closed");
}
// Stick the byte in a buffer and write it
byte buf[] = new byte[1];
buf[0] = (byte) b;
write(buf, 0, 1);
return;
}
/**
* Write a byte buffer into the Ceph file.
* @param buf the byte array to write from
* @param off the position in the file to start writing at.
* @param len The number of bytes to actually write.
* @throws IOException if you have closed the CephOutputStream, or
* if buf is null or off + len > buf.length, or
* if the write fails due to a Ceph error.
*/
@Override
public synchronized void write(byte buf[], int off, int len) throws IOException {
LOG.trace(
"CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle);
// make sure stream is open
if (closed) {
throw new IOException(
"CephOutputStream.write: cannot write " + len + "bytes to fd "
+ fileHandle + ": stream closed");
}
int result;
int write;
while (len > 0) {
write = Math.min(len, buffer.length - bufUsed);
try {
System.arraycopy(buf, off, buffer, bufUsed, write);
} catch (IndexOutOfBoundsException ie) {
throw new IOException(
"CephOutputStream.write: Indices out of bounds: "
+ "write length is " + len + ", buffer offset is " + off
+ ", and buffer size is " + buf.length);
} catch (ArrayStoreException ae) {
throw new IOException(
"Uh-oh, CephOutputStream failed to do an array"
+ " copy due to type mismatch...");
} catch (NullPointerException ne) {
throw new IOException(
"CephOutputStream.write: cannot write " + len + "bytes to fd "
+ fileHandle + ": buffer is null");
}
bufUsed += write;
len -= write;
off += write;
if (bufUsed == buffer.length) {
result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
if (result < 0) {
throw new IOException(
"CephOutputStream.write: Buffered write of " + bufUsed
+ " bytes failed!");
}
if (result != bufUsed) {
throw new IOException(
"CephOutputStream.write: Wrote only " + result + " bytes of "
+ bufUsed + " in buffer! Data may be lost or written"
+ " twice to Ceph!");
}
bufUsed = 0;
}
}
return;
}
/**
* Flush the buffered data.
* @throws IOException if you've closed the stream or the write fails.
*/
@Override
public synchronized void flush() throws IOException {
if (!closed) {
if (bufUsed == 0) {
return;
}
int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
if (result < 0) {
throw new IOException(
"CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
+ fileHandle + " failed");
}
if (result != bufUsed) {
throw new IOException(
"CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
+ fileHandle + "was incomplete: only " + result + " of " + bufUsed
+ " bytes were written.");
}
bufUsed = 0;
return;
}
}
/**
* Close the CephOutputStream.
* @throws IOException if Ceph somehow returns an error. In current code it can't.
*/
@Override
public synchronized void close() throws IOException {
LOG.trace("CephOutputStream.close:enter");
if (!closed) {
flush();
int result = ceph.ceph_close(fileHandle);
if (result != 0) {
throw new IOException("Close failed!");
}
closed = true;
LOG.trace("CephOutputStream.close:exit");
}
}
}

View File

@ -1,91 +0,0 @@
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
/**
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*
* Wraps a number of native function calls to communicate with the Ceph
* filesystem.
*/
package org.apache.hadoop.fs.ceph;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
class CephTalker extends CephFS {
// JNI doesn't give us any way to store pointers, so use a long.
// Here we're assuming pointers aren't longer than 8 bytes.
long cluster;
// we write a constructor so we can load the libraries
public CephTalker(Configuration conf, Log log) {
System.load(conf.get("fs.ceph.libDir") + "/libcephfs.so");
System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so");
cluster = 0;
}
protected native boolean ceph_initializeClient(String arguments, int block_size);
protected native String ceph_getcwd();
protected native boolean ceph_setcwd(String path);
protected native boolean ceph_rmdir(String path);
protected native boolean ceph_unlink(String path);
protected native boolean ceph_rename(String old_path, String new_path);
protected native boolean ceph_exists(String path);
protected native long ceph_getblocksize(String path);
protected native boolean ceph_isdirectory(String path);
protected native boolean ceph_isfile(String path);
protected native String[] ceph_getdir(String path);
protected native int ceph_mkdirs(String path, int mode);
protected native int ceph_open_for_append(String path);
protected native int ceph_open_for_read(String path);
protected native int ceph_open_for_overwrite(String path, int mode);
protected native int ceph_close(int filehandle);
protected native boolean ceph_setPermission(String path, int mode);
protected native boolean ceph_kill_client();
protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
protected native int ceph_replication(String Path);
protected native String[] ceph_hosts(int fh, long offset);
protected native int ceph_setTimes(String path, long mtime, long atime);
protected native long ceph_getpos(int fh);
protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
protected native long ceph_seek_from_start(int fh, long pos);
}

View File

@ -1,4 +0,0 @@
Unlike the rest of the code in this repository, this
directory (src/client/hadoop) is licensed under the Apache License 2.0. This
is for the obvious reason that we want to integrate it into the Apache Hadoop
project.

View File

@ -1,45 +0,0 @@
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unit tests for the CephFileSystem API implementation.
*/
package org.apache.hadoop.fs.ceph;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class TestCeph extends FileSystemContractBaseTest {
@Override
protected void setUp() throws IOException {
Configuration conf = new Configuration();
CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
CephFileSystem cephfs = new CephFileSystem(cephfaker);
cephfs.initialize(URI.create("ceph://null"), conf);
fs = cephfs;
}
}

View File

@ -1,101 +0,0 @@
<html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head></head>
<body>
<h1>A client for the Ceph filesystem</h1>
<h3>Introduction</h3>
This page describes how to use <a href="http://ceph.newdream.net">Ceph</a>
as a backing store with Hadoop. This page assumes that you have downloaded
the Ceph software and installed necessary binaries as outlined in the Ceph
documentation.
<h3>Steps</h3>
<ul>
<li>In the Hadoop conf directory edit core-site.xml,
adding the following (with appropriate substitutions). Note that
different nodes can connect to different monitors in the same cluster
without issue (the Ceph client will automatically redirect as necessary).
<pre>
&lt;property&gt;
&lt;name&gt;fs.default.name&lt;/name&gt;
&lt;value&gt;ceph://null&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.ceph.monAddr&lt;/name&gt;
&lt;value&gt;&lt;serverIP:port&gt;&lt;/value&gt;
&lt;description&gt;The location of the Ceph monitor to connect to.
This should be an IP address, not a domain-based web address.&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.ceph.libDir&lt;/name&gt;
&lt;value&gt;/usr/local/lib&lt;/value&gt;
&lt;description&gt;The folder holding libcephfs and libhadoopceph&lt;/description&gt;
&lt;/property&gt;
</pre>
<li>There are also a number of optional Ceph configuration options.
<pre>
&lt;property&gt;
&lt;name&gt;fs.ceph.blockSize&lt;/name&gt;
&lt;value&gt;67108864&lt;/value&gt;
&lt;description&gt;Defaulting to 64MB, this is the size (in bytes) you want Ceph to use in striping data internally and presenting it to Hadoop.&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.ceph.debug&lt;/name&gt;
&lt;value&gt;true&lt;/value&gt;
&lt;description&gt;If true, the Java-based code will print debugging information to standard error. This is useful if attempting to debug a Ceph issue as it puts both outputs in the same place.&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.ceph.clientDebug&lt;/name&gt;
&lt;value&gt;1&lt;/value&gt;
&lt;description&gt;If non-zero, the Ceph client will print debugging information to standard error (a higher number=more debugging).&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.ceph.messengerDebug&lt;/name&gt;
&lt;value&gt;1&lt;/value&gt;
&lt;description&gt;If non-zero, the Ceph messenger will print debugging information to standard error(a higher number=more debugging)&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.ceph.readahead&lt;/name&gt;
&lt;value&gt;1&lt;/value&gt;
&lt;description&gt;Sets the number of object periods to read ahead in prefetching. This should probably be left at the default of 1.&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.ceph.commandLine&lt;/name&gt;
&lt;value&gt;a string&lt;/value&gt;
&lt;description&gt;If you prefer, you may enter any of Ceph's command-line configuration here and it will get passed to the C client. Note that any filled-in configuration options will override what you put here. <br>
By default, Ceph performs writes across the network rather than locally. To force local writes, add "set_local_pg" in this property.&lt;/description&gt;
&lt;/property&gt;
</pre>
<li>Start up your Ceph instance according to the Ceph documentation.</li>
<li>Do not use the bin/start-all.sh commands, as they will attempt to start
up an hdfs instance. Just start whatever systems you need and they will
automatically make use of the Ceph filesystem once configured as above.</li>
</body>
</html>

View File

@ -1,13 +0,0 @@
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class org_apache_hadoop_fs_ceph_CephFS */
#ifndef _Included_org_apache_hadoop_fs_ceph_CephFS
#define _Included_org_apache_hadoop_fs_ceph_CephFS
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif

View File

@ -1,31 +0,0 @@
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class org_apache_hadoop_fs_ceph_CephFileSystem */
#ifndef _Included_org_apache_hadoop_fs_ceph_CephFileSystem
#define _Included_org_apache_hadoop_fs_ceph_CephFileSystem
#ifdef __cplusplus
extern "C" {
#endif
#undef org_apache_hadoop_fs_ceph_CephFileSystem_EEXIST
#define org_apache_hadoop_fs_ceph_CephFileSystem_EEXIST 17L
#undef org_apache_hadoop_fs_ceph_CephFileSystem_ENOENT
#define org_apache_hadoop_fs_ceph_CephFileSystem_ENOENT 2L
#undef org_apache_hadoop_fs_ceph_CephFileSystem_FATAL
#define org_apache_hadoop_fs_ceph_CephFileSystem_FATAL 0L
#undef org_apache_hadoop_fs_ceph_CephFileSystem_ERROR
#define org_apache_hadoop_fs_ceph_CephFileSystem_ERROR 1L
#undef org_apache_hadoop_fs_ceph_CephFileSystem_WARN
#define org_apache_hadoop_fs_ceph_CephFileSystem_WARN 2L
#undef org_apache_hadoop_fs_ceph_CephFileSystem_INFO
#define org_apache_hadoop_fs_ceph_CephFileSystem_INFO 3L
#undef org_apache_hadoop_fs_ceph_CephFileSystem_DEBUG
#define org_apache_hadoop_fs_ceph_CephFileSystem_DEBUG 4L
#undef org_apache_hadoop_fs_ceph_CephFileSystem_TRACE
#define org_apache_hadoop_fs_ceph_CephFileSystem_TRACE 5L
#undef org_apache_hadoop_fs_ceph_CephFileSystem_NOLOG
#define org_apache_hadoop_fs_ceph_CephFileSystem_NOLOG 6L
#ifdef __cplusplus
}
#endif
#endif

View File

@ -1,13 +0,0 @@
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class org_apache_hadoop_fs_ceph_CephFileSystem_CephStat */
#ifndef _Included_org_apache_hadoop_fs_ceph_CephFileSystem_CephStat
#define _Included_org_apache_hadoop_fs_ceph_CephFileSystem_CephStat
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif

View File

@ -1,13 +0,0 @@
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class org_apache_hadoop_fs_ceph_CephFileSystem_Stat */
#ifndef _Included_org_apache_hadoop_fs_ceph_CephFileSystem_Stat
#define _Included_org_apache_hadoop_fs_ceph_CephFileSystem_Stat
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif

View File

@ -1,47 +0,0 @@
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class org_apache_hadoop_fs_ceph_CephInputStream */
#ifndef _Included_org_apache_hadoop_fs_ceph_CephInputStream
#define _Included_org_apache_hadoop_fs_ceph_CephInputStream
#ifdef __cplusplus
extern "C" {
#endif
#undef org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE
#define org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE 2048L
/*
* Class: org_apache_hadoop_fs_ceph_CephInputStream
* Method: ceph_read
* Signature: (I[BII)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1read
(JNIEnv *, jobject, jint, jbyteArray, jint, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephInputStream
* Method: ceph_seek_from_start
* Signature: (IJ)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1seek_1from_1start
(JNIEnv *, jobject, jint, jlong);
/*
* Class: org_apache_hadoop_fs_ceph_CephInputStream
* Method: ceph_getpos
* Signature: (I)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1getpos
(JNIEnv *, jobject, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephInputStream
* Method: ceph_close
* Signature: (I)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1close
(JNIEnv *, jobject, jint);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -1,37 +0,0 @@
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class org_apache_hadoop_fs_ceph_CephOutputStream */
#ifndef _Included_org_apache_hadoop_fs_ceph_CephOutputStream
#define _Included_org_apache_hadoop_fs_ceph_CephOutputStream
#ifdef __cplusplus
extern "C" {
#endif
/*
* Class: org_apache_hadoop_fs_ceph_CephOutputStream
* Method: ceph_getpos
* Signature: (I)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1getpos
(JNIEnv *, jobject, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephOutputStream
* Method: ceph_close
* Signature: (I)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1close
(JNIEnv *, jobject, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephOutputStream
* Method: ceph_write
* Signature: (I[BII)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1write
(JNIEnv *, jobject, jint, jbyteArray, jint, jint);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -1,197 +0,0 @@
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class org_apache_hadoop_fs_ceph_CephTalker */
#ifndef _Included_org_apache_hadoop_fs_ceph_CephTalker
#define _Included_org_apache_hadoop_fs_ceph_CephTalker
#ifdef __cplusplus
extern "C" {
#endif
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_initializeClient
* Signature: (Ljava/lang/String;I)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1initializeClient
(JNIEnv *, jobject, jstring, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_getcwd
* Signature: ()Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getcwd
(JNIEnv *, jobject);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_setcwd
* Signature: (Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcwd
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_rmdir
* Signature: (Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_unlink
* Signature: (Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlink
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_rename
* Signature: (Ljava/lang/String;Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rename
(JNIEnv *, jobject, jstring, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_exists
* Signature: (Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exists
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_getblocksize
* Signature: (Ljava/lang/String;)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getblocksize
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_isdirectory
* Signature: (Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdirectory
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_isfile
* Signature: (Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfile
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_getdir
* Signature: (Ljava/lang/String;)[Ljava/lang/String;
*/
JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getdir
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_mkdirs
* Signature: (Ljava/lang/String;I)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1mkdirs
(JNIEnv *, jobject, jstring, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_open_for_append
* Signature: (Ljava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1append
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_open_for_read
* Signature: (Ljava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1read
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_open_for_overwrite
* Signature: (Ljava/lang/String;I)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1overwrite
(JNIEnv *, jobject, jstring, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_close
* Signature: (I)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1close
(JNIEnv *, jobject, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_setPermission
* Signature: (Ljava/lang/String;I)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPermission
(JNIEnv *, jobject, jstring, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_kill_client
* Signature: ()Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1kill_1client
(JNIEnv *, jobject);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_stat
* Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/Stat;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1stat
(JNIEnv *, jobject, jstring, jobject);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_statfs
* Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/CephStat;)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1statfs
(JNIEnv *, jobject, jstring, jobject);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_replication
* Signature: (Ljava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1replication
(JNIEnv *, jobject, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_hosts
* Signature: (IJ)Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1hosts
(JNIEnv *, jobject, jint, jlong);
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_setTimes
* Signature: (Ljava/lang/String;JJ)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes
(JNIEnv *, jobject, jstring, jlong, jlong);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -91,7 +91,6 @@ SUBSYS(finisher, 1, 1)
SUBSYS(heartbeatmap, 1, 5)
SUBSYS(perfcounter, 1, 5)
SUBSYS(rgw, 1, 5) // log level for the Rados gateway
SUBSYS(hadoop, 1, 5)
SUBSYS(javaclient, 1, 5)
SUBSYS(asok, 1, 5)
SUBSYS(throttle, 1, 1)

View File

@ -116,19 +116,6 @@ test_build_libcephfs_CFLAGS = $(AM_CFLAGS)
test_build_libcephfs_CXXFLAGS = $(AM_CXXFLAGS)
bin_DEBUGPROGRAMS += test_build_libcephfs
if WITH_HADOOPCLIENT
test_build_libhadoopcephfs_SOURCES = \
test/buildtest_skeleton.cc \
$(libhadoopcephfs_la_SOURCES)
test_build_libhadoopcephfs_LDADD = \
$(LIBCEPHFS) -lexpat \
$(PTHREAD_LIBS) $(CRYPTO_LIBS) $(EXTRALIBS)
test_build_libhadoopcephfs_LDFLAGS = -static-libtool-libs
test_build_libhadoopcephfs_CFLAGS = $(AM_CFLAGS)
test_build_libhadoopcephfs_CXXFLAGS = $(AM_CXXFLAGS)
bin_DEBUGPROGRAMS += test_build_libhadoopcephfs
endif # WITH_HADOOPCLIENT
endif # WITH_BUILD_TESTS