more Hadoop stuff

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1267 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
eestolan 2007-03-20 20:12:15 +00:00
parent f212de37cd
commit f275c16a46
6 changed files with 849 additions and 334 deletions

View File

@ -97,12 +97,13 @@ COMMON_OBJS= \
common/Timer.o\
config.o
CLIENT_OBJS= \
client/FileCache.o\
client/Client.o\
client/SyntheticClient.o\
client/Trace.o
client/Trace.o\
client/hadoop/CephFSInterface.o\
ifeq ($(want_bdb),yes)
OSBDB_OBJS = \
@ -112,11 +113,15 @@ OSBDB_OBJ = osbdb.o
endif
TARGETS = cmon cosd cmds csyn newsyn fakesyn mkmonmap cfuse fakefuse
NO_FUSE = cmon cosd cmds csyn newsyn fakesyn
SRCS=*.cc */*.cc *.h */*.h */*/*.h
all: depend ${TARGETS}
nofuse: depend ${NO_FUSE}
test: depend ${TEST_TARGETS}
obfs: depend obfstest

View File

@ -529,6 +529,7 @@ protected:
// crap
int chdir(const char *s);
const string getcwd() { return cwd; }
// namespace ops
int getdir(const char *path, list<string>& contents);

View File

@ -1,217 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
//#include <fuse.h>
using namespace std;
// globals
//Client *client; // the ceph client
//this has to go - the real client will have to hold the pointer.
//Every function will need to take a Client pointer.
// ------
// fuse hooks
static int ceph_getattr(Client* client, const char *path, struct stat *stbuf)
{
return client->lstat(path, stbuf);
}
static int ceph_readlink(Client* client, const char *path, char *buf, size_t size)
{
int res;
res = client->readlink(path, buf, size - 1);
if (res < 0) return res;
buf[res] = '\0';
return 0;
}
// get rid of the callback thing, perhaps? and return the answer some other way?
/*
static int ceph_getdir(Client* client, const char *path, fuse_dirh_t h, fuse_dirfil_t filler)
{
map<string, inode_t> contents;
int res = client->getdir(path, contents);
if (res < 0) return res;
// return contents to fuse via callback
for (map<string, inode_t>::iterator it = contents.begin();
it != contents.end();
it++) {
// (immutable) inode contents too.
res = filler(h, // fuse's handle
it->first.c_str(), // dentry as char*
it->second.mode & INODE_TYPE_MASK, // mask type bits from mode
it->second.ino); // ino.. 64->32 bit issue here? FIXME
if (res != 0) break; // fuse has had enough
}
return res;
}
*/
static int ceph_mknod(Client* client, const char *path, mode_t mode, dev_t rdev)
{
return client->mknod(path, mode);
}
static int ceph_mkdir(Client* client, const char *path, mode_t mode)
{
return client->mkdir(path, mode);
}
static int ceph_unlink(Client* client, const char *path)
{
return client->unlink(path);
}
static int ceph_rmdir(Client* client, const char *path)
{
return client->rmdir(path);
}
static int ceph_symlink(Client* client, const char *from, const char *to)
{
return client->symlink(from, to);
}
static int ceph_rename(Client* client, const char *from, const char *to)
{
return client->rename(from, to);
}
static int ceph_link(Client* client, const char *from, const char *to)
{
return client->link(from, to);
}
static int ceph_chmod(Client* client, const char *path, mode_t mode)
{
return client->chmod(path, mode);
}
static int ceph_chown(Client* client, const char *path, uid_t uid, gid_t gid)
{
return client->chown(path, uid, gid);
}
static int ceph_truncate(Client* client, const char *path, off_t size)
{
return client->truncate(path, size);
}
static int ceph_utime(Client* client, const char *path, struct utimbuf *buf)
{
return client->utime(path, buf);
}
static int ceph_open(Client* client, const char *path, struct fuse_file_info *fi)
{
int res;
res = client->open(path, fi->flags);
if (res < 0) return res;
fi->fh = res;
return 0; // fuse wants 0 onsucess
}
static int ceph_read(Client* client, const char *path, char *buf, size_t size, off_t offset,
struct fuse_file_info *fi)
{
fh_t fh = fi->fh;
return client->read(fh, buf, size, offset);
}
static int ceph_write(Client* client, const char *path, const char *buf, size_t size,
off_t offset, struct fuse_file_info *fi)
{
fh_t fh = fi->fh;
return client->write(fh, buf, size, offset);
}
/*
static int ceph_flush(const char *path, struct fuse_file_info *fi)
{
fh_t fh = fi->fh;
return client->flush(fh);
}
*/
#ifdef DARWIN
static int ceph_statfs(Client* client, const char *path, struct statvfs *stbuf)
{
return client->statfs(path, stbuf);
}
#else
static int ceph_statfs(Client* client, const char *path, struct statfs *stbuf)
{
return client->statfs(path, stbuf);
}
#endif
/* remove fuse stuff from these two
static int ceph_release(Client* client, const char *path, struct fuse_file_info *fi)
{
fh_t fh = fi->fh;
int r = client->close(fh); // close the file
return r;
}
static int ceph_fsync(Client* client, const char *path, int isdatasync,
struct fuse_file_info *fi)
{
fh_t fh = fi->fh;
return client->fsync(fh, isdatasync ? true:false);
}
*/
/*
static struct fuse_operations ceph_oper = {
getattr: ceph_getattr,
readlink: ceph_readlink,
getdir: ceph_getdir,
mknod: ceph_mknod,
mkdir: ceph_mkdir,
unlink: ceph_unlink,
rmdir: ceph_rmdir,
symlink: ceph_symlink,
rename: ceph_rename,
link: ceph_link,
chmod: ceph_chmod,
chown: ceph_chown,
truncate: ceph_truncate,
utime: ceph_utime,
open: ceph_open,
read: ceph_read,
write: ceph_write,
statfs: ceph_statfs,
flush: 0, //ceph_flush,
release: ceph_release,
fsync: ceph_fsync
};
*/
// Does this do anything we need? No. All it does is assemble a bunch of
// arguments and call fuse_main.

View File

@ -1,115 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <dirent.h>
#include <errno.h>
#ifdef DARWIN
#include <sys/statvfs.h>
#else
#include <sys/statfs.h>
#endif // DARWIN
// ceph stuff
#include "include/types.h"
#include "Client.h"
#include "config.h"
// stl
#include <map>
// stbuf holds the attributes
static int ceph_getattr(Client* client, const char *path, struct stat *stbuf);
// reads a symlink
static int ceph_readlink(Client* client, const char *path, char *buf, size_t size);
// to do: remove fuse stuff from this one
//static int ceph_getdir(Client* client, const char *path, fuse_dirh_t h, fuse_dirfil_t filler);
// looks irrelevant - it's for special device files
static int ceph_mknod(Client* client, const char *path, mode_t mode, dev_t rdev);
// mode is the file permission bits
static int ceph_mkdir(Client* client, const char *path, mode_t mode);
// delete!
static int ceph_unlink(Client* client, const char *path);
// delete! if it's an empty directory
static int ceph_rmdir(Client* client, const char *path);
// make a symlink
static int ceph_symlink(Client* client, const char *from, const char *to);
// self-explanatory
static int ceph_rename(Client* client, const char *from, const char *to);
static int ceph_link(Client* client, const char *from, const char *to); //hard link
static int ceph_chmod(Client* client, const char *path, mode_t mode); //just chmod
static int ceph_chown(Client* client, const char *path, uid_t uid, gid_t gid); //duh
static int ceph_truncate(Client* client, const char *path, off_t size); //chop or zero-pad to size
// set file access/modification times
static int ceph_utime(Client* client, const char *path, struct utimbuf *buf);
// ok, gotta figure out what's in fuse_file_info and how to use it. Presumably it includes
// a file descriptor and the open flags?
static int ceph_open(Client* client, const char *path, struct fuse_file_info *fi);
// read!
static int ceph_read(Client* client, const char *path, char *buf, size_t size, off_t offset,
struct fuse_file_info *fi);
// write!
static int ceph_write(Client* client, const char *path, const char *buf, size_t size,
off_t offset, struct fuse_file_info *fi);
/* was already commented out
static int ceph_flush(const char *path, struct fuse_file_info *fi);
*/
// is this statvfs perhaps? we probably don't need it
#ifdef DARWIN
static int ceph_statfs(Client* client, const char *path, struct statvfs *stbuf);
#else
static int ceph_statfs(Client* client, const char *path, struct statfs *stbuf);
#endif
// Remove fuse stuff from these two
//static int ceph_release(Client* client, const char *path, struct fuse_file_info *fi);
//static int ceph_fsync(Client* client, const char *path, int isdatasync, struct fuse_file_info *fi); //kinda like flush?
/* ceph_fuse_main
* - start up fuse glue, attached to Client* cl.
* - argc, argv should include a mount point, and
* any weird fuse options you want. by default,
* we will put fuse in the foreground so that it
* won't fork and we can see stdout.
*/
// int ceph_fuse_main(Client *cl, int argc, char *argv[]);

View File

@ -0,0 +1,612 @@
#include "CephFSInterface.h"
using namespace std;
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_initializeClient
* Signature: ()J
* Initializes a ceph client.
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1initializeClient
(JNIEnv *, jobject)
{
//vector<char*> args;
//argv_to_vec(argc, argv, args);
//parse_config_options(args);
// args for fuse
// vec_to_argv(args, argc, argv);
// FUSE will chdir("/"); be ready.
g_conf.use_abspaths = true;
// load monmap
MonMap monmap;
int r = monmap.read(".ceph_monmap");
if (r < 0) {
cout << "could not find .ceph_monmap";
return 0;
}
//assert(r >= 0);
// start up network
// ERROR - undefined reference to "rank"
// rank.start_rank();
// start client
Client *client;
// ERROR - undefined reference to "rank"
// client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap);
client->init();
// start up fuse
// use my argc, argv (make sure you pass a mount point!)
cout << "mounting" << endl;
client->mount();
//cerr << "starting fuse on pid " << getpid() << endl;
//ceph_fuse_main(client, argc, argv);
//cerr << "fuse finished on pid " << getpid() << endl;
//client->unmount();
//cout << "unmounted" << endl;
//client->shutdown();
//delete client;
// wait for messenger to finish
//rank.wait();
return (jlong)client;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_copyFromLocalFile
* Signature: (JLjava/lang/String;Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1copyFromLocalFile
(JNIEnv * env, jobject obj, jlong clientp, jstring j_local_path, jstring j_ceph_path) {
Client* client;
client = (Client*) clientp;
const char* c_local_path = env->GetStringUTFChars(j_local_path, 0);
const char* c_ceph_path = env->GetStringUTFChars(j_ceph_path, 0);
struct stat st;
int r = ::stat(c_local_path, &st);
assert (r == 0);
// open the files
int fh_local = ::open(c_local_path, O_RDONLY);
int fh_ceph = client->open(c_ceph_path, O_WRONLY|O_CREAT|O_TRUNC);
assert (fh_local > -1);
assert (fh_ceph > -1);
// get the source file size
bufferlist finalbl;
off_t remaining = st.st_size;
// copy the file a chunk at a time
const int chunk = 1048576;
while (remaining > 0) {
bufferlist bl;
bufferptr bp(MIN(remaining, chunk));
bl.push_back(bp);
off_t got = ::read(fh_local, bp.c_str(), MIN(remaining,chunk));
assert(got > 0);
remaining -= got;
off_t wrote = client->write(fh_ceph, bp.c_str(), got, -1);
assert (got == wrote);
finalbl.claim_append(bl);
}
client->close(fh_ceph);
::close(fh_local);
env->ReleaseStringUTFChars(j_local_path, c_local_path);
env->ReleaseStringUTFChars(j_ceph_path, c_ceph_path);
return JNI_TRUE;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_copyToLocalFile
* Signature: (JLjava/lang/String;Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1copyToLocalFile
(JNIEnv *env, jobject obj, jlong clientp, jstring j_ceph_path, jstring j_local_path)
{
Client* client;
client = (Client*) clientp;
const char* c_ceph_path = env->GetStringUTFChars(j_ceph_path, 0);
const char* c_local_path = env->GetStringUTFChars(j_local_path, 0);
// get source file size
struct stat st;
int r = client->lstat(c_ceph_path, &st);
assert (r == 0);
int fh_ceph = client->open(c_ceph_path, O_WRONLY|O_CREAT|O_TRUNC);
int fh_local = ::open(c_local_path, O_RDONLY);
assert (fh_ceph > -1);
assert (fh_local > -1);
// copy the file a chunk at a time
const int chunk = 1048576;
bufferlist finalbl;
off_t remaining = st.st_size;
while (remaining > 0) {
bufferlist bl;
bufferptr bp(MIN(remaining, chunk));
bl.push_back(bp);
off_t got = client->read(fh_ceph, bp.c_str(), MIN(remaining,chunk), -1);
assert(got > 0);
remaining -= got;
off_t wrote = ::write(fh_local, bp.c_str(), got);
assert (got == wrote);
finalbl.claim_append(bl);
}
client->close(fh_ceph);
::close(fh_local);
env->ReleaseStringUTFChars(j_local_path, c_local_path);
env->ReleaseStringUTFChars(j_ceph_path, c_ceph_path);
return JNI_TRUE;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_getcwd
* Signature: (J)Ljava/lang/String;
* Returns the current working directory.
*/
JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getcwd
(JNIEnv *env, jobject obj, jlong clientp)
{
Client* client;
client = (Client*) clientp;
return (env->NewStringUTF(client->getcwd().c_str()));
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_setcwd
* Signature: (JLjava/lang/String;)Z
*
* Changes the working directory.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1setcwd
(JNIEnv *env, jobject obj, jlong clientp, jstring j_path)
{
Client* client;
client = (Client*) clientp;
const char* c_path = env->GetStringUTFChars(j_path, 0);
return (0 <= client->chdir(c_path)) ? JNI_TRUE : JNI_FALSE;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_rmdir
* Signature: (JLjava/lang/String;)Z
* Removes an empty directory.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1rmdir
(JNIEnv *env, jobject, jlong clientp, jstring j_path)
{
Client* client;
client = (Client*) clientp;
const char* c_path = env->GetStringUTFChars(j_path, 0);
return (0 == client->rmdir(c_path)) ? JNI_TRUE : JNI_FALSE;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_mkdir
* Signature: (JLjava/lang/String;)Z
* Creates a directory with full permissions.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1mkdir
(JNIEnv * env, jobject, jlong clientp, jstring j_path)
{
Client* client;
client = (Client*) clientp;
const char* c_path = env->GetStringUTFChars(j_path, 0);
return (0 == client->mkdir(c_path, 0xFF)) ? JNI_TRUE : JNI_FALSE;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_unlink
* Signature: (JLjava/lang/String;)Z
* Unlinks a path.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1unlink
(JNIEnv * env, jobject, jlong clientp, jstring j_path)
{
Client* client;
client = (Client*) clientp;
const char* c_path = env->GetStringUTFChars(j_path, 0);
return (0 == client->unlink(c_path)) ? JNI_TRUE : JNI_FALSE;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_rename
* Signature: (JLjava/lang/String;Ljava/lang/String;)Z
* Renames a file.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1rename
(JNIEnv *env, jobject, jlong clientp, jstring j_from, jstring j_to)
{
Client* client;
client = (Client*) clientp;
const char* c_from = env->GetStringUTFChars(j_from, 0);
const char* c_to = env->GetStringUTFChars(j_to, 0);
return (0 <= client->rename(c_from, c_to)) ? JNI_TRUE : JNI_FALSE;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_exists
* Signature: (JLjava/lang/String;)Z
* Returns true if the path exists.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1exists
(JNIEnv *env, jobject, jlong clientp, jstring j_path)
{
Client* client;
struct stat *stbuf;
client = (Client*) clientp;
const char* c_path = env->GetStringUTFChars(j_path, 0);
return (0 <= client->lstat(c_path, stbuf)) ? JNI_TRUE : JNI_FALSE;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_getblocksize
* Signature: (JLjava/lang/String;)J
* Returns the block size. Size is -1 if the file
* does not exist.
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getblocksize
(JNIEnv *env, jobject obj, jlong clientp, jstring j_path)
{
Client* client;
struct stat *stbuf;
client = (Client*) clientp;
jint result;
const char* c_path = env->GetStringUTFChars(j_path, 0);
if (0 > client->lstat(c_path, stbuf))
result = -1;
else
result = stbuf->st_blksize;
env->ReleaseStringUTFChars(j_path, c_path);
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_getfilesize
* Signature: (JLjava/lang/String;)J
* Returns the file size, or -1 on failure.
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getfilesize
(JNIEnv *env, jobject, jlong clientp, jstring j_path)
{
Client* client;
struct stat *stbuf;
client = (Client*) clientp;
jlong result;
const char* c_path = env->GetStringUTFChars(j_path, 0);
if (0 > client->lstat(c_path, stbuf)) result = -1;
else result = stbuf->st_size;
env->ReleaseStringUTFChars(j_path, c_path);
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_isdirectory
* Signature: (JLjava/lang/String;)Z
* Returns true if the path is a directory.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1isdirectory
(JNIEnv *env, jobject, jlong clientp, jstring j_path)
{
Client* client;
struct stat *stbuf;
client = (Client*) clientp;
const char* c_path = env->GetStringUTFChars(j_path, 0);
int result = client->lstat(c_path, stbuf);
env->ReleaseStringUTFChars(j_path, c_path);
// if the stat call failed, it's definitely not a directory...
if (0 > result) return JNI_FALSE;
// check the stat result
return (0 == S_ISDIR(stbuf->st_mode)) ? JNI_FALSE : JNI_TRUE;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_getdir
* Signature: (JLjava/lang/String;)[Ljava/lang/String;
*/
JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getdir
(JNIEnv *env, jobject obj, jlong clientp, jstring j_path) {
Client* client;
client = (Client*) clientp;
// get the directory listing
map<string, inode_t> contents;
const char* c_path = env->GetStringUTFChars(j_path, 0);
int result = client->getdir(c_path, contents);
env->ReleaseStringUTFChars(j_path, c_path);
if (result < 0) return NULL;
jint dir_size = contents.size();
if (dir_size < 1) return NULL;
// Create a Java String array of the size of the directory listing
//jstring blankString = env->NewStringUTF("");
jclass stringClass = env->FindClass("java/lang/string");
jobjectArray dirListingStringArray = (jobjectArray) env->NewObjectArray(dir_size, stringClass, NULL);
// populate the array with the elements of the directory list
int i = 0;
for (map<string, inode_t>::iterator it = contents.begin();
it != contents.end();
it++) {
env->SetObjectArrayElement(dirListingStringArray, i,
env->NewStringUTF(it->first.c_str()));
}
return dirListingStringArray;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_open_for_read
* Signature: (JLjava/lang/String;)I
* Open a file for reading.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1open_1for_1read
(JNIEnv *env, jobject obj, jlong clientp, jstring j_path)
{
Client* client;
client = (Client*) clientp;
jint result;
// open as read-only: flag = O_RDONLY
const char* c_path = env->GetStringUTFChars(j_path, 0);
result = client->open(c_path, O_RDONLY);
env->ReleaseStringUTFChars(j_path, c_path);
// returns file handle, or -1 on failure
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_open_for_overwrite
* Signature: (JLjava/lang/String;)I
* Opens a file for overwriting; creates it if necessary.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1open_1for_1overwrite
(JNIEnv *env, jobject obj, jlong clientp, jstring j_path)
{
Client* client;
client = (Client*) clientp;
jint result;
const char* c_path = env->GetStringUTFChars(j_path, 0);
result = client->open(c_path, O_WRONLY|O_CREAT|O_TRUNC);
env->ReleaseStringUTFChars(j_path, c_path);
// returns file handle, or -1 on failure
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephInputStream
* Method: ceph_read
* Signature: (JI[BII)I
* Reads into the given byte array from the current position.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1read
(JNIEnv *env, jobject obj, jlong clientp, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length)
{
// IMPORTANT NOTE: Hadoop read arguments are a bit different from POSIX so we
// have to convert. The read is *always* from the current position in the file,
// and buffer_offset is the location in the *buffer* where we start writing.
Client* client;
client = (Client*) clientp;
jint result;
// Step 1: get a pointer to the buffer.
jbyte* j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL);
char* c_buffer = (char*) j_buffer_ptr;
// Step 2: pointer arithmetic to start in the right buffer position
c_buffer += (int)buffer_offset;
// Step 3: do the read
result = client->read((int)fh, c_buffer, length, -1);
// Step 4: release the pointer to the buffer
env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0);
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephInputStream
* Method: ceph_seek_from_start
* Signature: (JIJ)J
* Seeks to the given position.
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1seek_1from_1start
(JNIEnv *env, jobject obj, jlong clientp, jint fh, jlong pos)
{
Client* client;
client = (Client*) clientp;
jint result;
result = client->lseek(fh, pos, SEEK_SET);
return result;
}
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1getpos
(JNIEnv *env, jobject obj, jlong clientp, jint fh)
{
Client* client;
client = (Client*) clientp;
jint result;
// seek a distance of 0 to get current offset
result = client->lseek(fh, 0, SEEK_CUR);
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephInputStream
* Method: ceph_close
* Signature: (JI)I
* Closes the file.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1close
(JNIEnv *env, jobject obj, jlong clientp, jint fh)
{
Client* client;
client = (Client*) clientp;
jint result;
result = client->close(fh);
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephOutputStream
* Method: ceph_seek_from_start
* Signature: (JIJ)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1seek_1from_1start
(JNIEnv *env, jobject obj, jlong clientp, jint fh, jlong pos)
{
Client* client;
client = (Client*) clientp;
jint result;
result = client->lseek(fh, pos, SEEK_SET);
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephOutputStream
* Method: ceph_getpos
* Signature: (JI)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1getpos
(JNIEnv *env, jobject obj, jlong clientp, jint fh)
{
Client* client;
client = (Client*) clientp;
jint result;
// seek a distance of 0 to get current offset
result = client->lseek(fh, 0, SEEK_CUR);
return result;
}
/*
* Class: org_apache_hadoop_fs_ceph_CephOutputStream
* Method: ceph_write
* Signature: (JI[BII)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1write
(JNIEnv *env, jobject obj, jlong clientp, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length)
{
// IMPORTANT NOTE: Hadoop write arguments are a bit different from POSIX so we
// have to convert. The write is *always* from the current position in the file,
// and buffer_offset is the location in the *buffer* where we start writing.
Client* client;
client = (Client*) clientp;
jint result;
// Step 1: get a pointer to the buffer.
jbyte* j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL);
char* c_buffer = (char*) j_buffer_ptr;
// Step 2: pointer arithmetic to start in the right buffer position
c_buffer += (int)buffer_offset;
// Step 3: do the write
result = client->write((int)fh, c_buffer, length, -1);
// Step 4: release the pointer to the buffer
env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0);
return result;
}

View File

@ -0,0 +1,229 @@
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class org_apache_hadoop_fs_ceph_CephFileSystem */
#include <sys/stat.h>
#include "client/Client.h"
#include "config.h"
#include "client/fuse.h"
#include "msg/SimpleMessenger.h"
#include "common/Timer.h"
#ifndef _Included_org_apache_hadoop_fs_ceph_CephFileSystem
#define _Included_org_apache_hadoop_fs_ceph_CephFileSystem
#ifdef __cplusplus
extern "C" {
#endif
#undef org_apache_hadoop_fs_ceph_CephFileSystem_DEFAULT_BLOCK_SIZE
#define org_apache_hadoop_fs_ceph_CephFileSystem_DEFAULT_BLOCK_SIZE 1048576LL
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_initializeClient
* Signature: ()J
* Initializes a ceph client.
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1initializeClient
(JNIEnv *, jobject);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_copyFromLocalFile
* Signature: (JLjava/lang/String;Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1copyFromLocalFile
(JNIEnv *, jobject, jlong, jstring, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_copyToLocalFile
* Signature: (JLjava/lang/String;Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1copyToLocalFile
(JNIEnv *, jobject, jlong, jstring, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_getcwd
* Signature: (J)Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getcwd
(JNIEnv *, jobject, jlong);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_setcwd
* Signature: (JLjava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1setcwd
(JNIEnv *, jobject, jlong, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_rmdir
* Signature: (JLjava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1rmdir
(JNIEnv *, jobject, jlong, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_mkdir
* Signature: (JLjava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1mkdir
(JNIEnv *, jobject, jlong, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_unlink
* Signature: (JLjava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1unlink
(JNIEnv *, jobject, jlong, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_rename
* Signature: (JLjava/lang/String;Ljava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1rename
(JNIEnv *, jobject, jlong, jstring, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_exists
* Signature: (JLjava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1exists
(JNIEnv *, jobject, jlong, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_getblocksize
* Signature: (JLjava/lang/String;)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getblocksize
(JNIEnv *, jobject, jlong, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_getfilesize
* Signature: (JLjava/lang/String;)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getfilesize
(JNIEnv *, jobject, jlong, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_isdirectory
* Signature: (JLjava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1isdirectory
(JNIEnv *, jobject, jlong, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_isfile
* Signature: (JLjava/lang/String;)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1isfile
(JNIEnv *, jobject, jlong, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_getdir
* Signature: (JLjava/lang/String;)[Ljava/lang/String;
*/
JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getdir
(JNIEnv *, jobject, jlong, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_open_for_read
* Signature: (JLjava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1open_1for_1read
(JNIEnv *, jobject, jlong, jstring);
/*
* Class: org_apache_hadoop_fs_ceph_CephFileSystem
* Method: ceph_open_for_overwrite
* Signature: (JLjava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1open_1for_1overwrite
(JNIEnv *, jobject, jlong, jstring);
#undef org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE
#define org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE 2048L
/*
* Class: org_apache_hadoop_fs_ceph_CephInputStream
* Method: ceph_read
* Signature: (JI[BII)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1read
(JNIEnv *, jobject, jlong, jint, jbyteArray, jint, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephInputStream
* Method: ceph_seek_from_start
* Signature: (JIJ)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1seek_1from_1start
(JNIEnv *, jobject, jlong, jint, jlong);
/*
* Class: org_apache_hadoop_fs_ceph_CephInputStream
* Method: ceph_getpos
* Signature: (JI)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1getpos
(JNIEnv *, jobject, jlong, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephInputStream
* Method: ceph_close
* Signature: (JI)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1close
(JNIEnv *, jobject, jlong, jint);
/* Header for class org_apache_hadoop_fs_ceph_CephOutputStream */
/*
* Class: org_apache_hadoop_fs_ceph_CephOutputStream
* Method: ceph_seek_from_start
* Signature: (JIJ)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1seek_1from_1start
(JNIEnv *, jobject, jlong, jint, jlong);
/*
* Class: org_apache_hadoop_fs_ceph_CephOutputStream
* Method: ceph_getpos
* Signature: (JI)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1getpos
(JNIEnv *, jobject, jlong, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephOutputStream
* Method: ceph_close
* Signature: (JI)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1close
(JNIEnv *, jobject, jlong, jint);
/*
* Class: org_apache_hadoop_fs_ceph_CephOutputStream
* Method: ceph_write
* Signature: (JI[BII)I
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1write
(JNIEnv *, jobject, jlong, jint, jbyteArray, jint, jint);
#ifdef __cplusplus
}
#endif
#endif