view src/objstore/obj_ops.c @ 800:f679541c8142

switch to new buffer_init_static libjeffpc API The function gained a second size argument removing the need for some truncate calls. Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Fri, 03 Apr 2020 15:54:51 -0400
parents 14dbffbeb60c
children 987c34feb31b
line wrap: on
line source

/*
 * Copyright (c) 2015-2020 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include "objstore_impl.h"

int objstore_obj_info(struct objstore *vol, const struct noid *oid,
		      struct obj_info **infos, size_t *ninfos)
{
	struct obj *obj;
	int ret;

	if (!vol || !oid || !infos || !ninfos)
		return -EINVAL;

	obj = obj_by_oid(vol->clone, oid);
	if (IS_ERR(obj))
		return PTR_ERR(obj);

	if (obj->ops->info)
		ret = obj->ops->info(obj, infos, ninfos);
	else
		ret = -ENOTSUP;

	MXUNLOCK(&obj->lock);

	obj_putref(obj);

	return ret;
}

struct objstore_open_obj_info *objstore_open(struct objstore *vol,
					     const struct noid *oid,
					     const struct nvclock *clock)
{
	struct objstore_open_obj_info *open;
	struct objver *objver;
	struct obj *obj;
	bool qualified;
	int ret;

	if (!vol || !oid || !clock)
		return ERR_PTR(-EINVAL);

	objver = objver_by_clock(vol->clone, oid, clock);
	if (IS_ERR(objver))
		return ERR_CAST(objver);

	qualified = !nvclock_is_null(clock);

	obj = objver->obj;
	open = objver->open[qualified];

	if (!open) {
		/* first open */
		open = mem_cache_alloc(open_obj_cache);
		if (!open) {
			ret = -ENOMEM;
			goto err;
		}

		if (obj->ops && obj->ops->open)
			ret = obj->ops->open(objver);
		else
			ret = 0;

		if (ret)
			goto err_free;

		open->obj = obj;
		open->qualified = qualified;
		open->ver = objver;
		open->open_count = 0; /* will be incremented below */
		open->cow.needed = true;
		open->cow.prev_ver = NULL;

		objver->open[qualified] = open;
	}

	ASSERT(objver->open[qualified]);

	open->open_count++;

	/*
	 * The object associated with 'objver' is locked and held.  We
	 * unlock it, but keep the hold on it.  Then we return the pointer
	 * to the objver to the caller.  Then, later on when the caller
	 * calls the close function, we release the object's hold.
	 *
	 * While the object version is open, we maintain this reference hold
	 * keeping the object and the version structures in memory.  Even if
	 * all links are removed, we keep the structures around and let
	 * operations use them.  This mimics the POSIX file system semantics
	 * well.
	 */

	MXUNLOCK(&obj->lock);

	return open;

err_free:
	mem_cache_free(open_obj_cache, open);

err:
	MXUNLOCK(&obj->lock);

	obj_putref(obj);

	return ERR_PTR(ret);
}

int objstore_close(struct objstore_open_obj_info *open)
{
	struct obj *obj;
	bool putref;
	int ret;

	if (!open)
		return -EINVAL;

	obj = open->obj;

	MXLOCK(&obj->lock);
	open->open_count--;
	putref = true;

	if (open->open_count) {
		ret = 0;
		putref = false;
	} else if (obj->ops && obj->ops->close) {
		/* last close */
		ret = obj->ops->close(open->ver);
	} else {
		ret = 0;
	}

	if (putref && !ret)
		open->ver->open[open->qualified] = NULL;
	else if (ret)
		open->open_count++; /* undo earlier decrement */
	MXUNLOCK(&obj->lock);

	/* release the reference obtained in objstore_open() */
	if (putref && !ret) {
		mem_cache_free(open_obj_cache, open);
		obj_putref(obj);
	}

	return ret;
}

int objstore_getattr(struct objstore_open_obj_info *open, struct nattr *attr)
{
	struct obj *obj;

	if (!open || !attr)
		return -EINVAL;

	obj = open->obj;

	MXLOCK(&obj->lock);
	*attr = open->ver->attrs;
	MXUNLOCK(&obj->lock);

	/* sanity check that the backend didn't try to give us an inode number */
	ASSERT0(attr->ino);

	return 0;
}

int objstore_setattr(struct objstore_open_obj_info *open, struct nattr *attr,
		     const unsigned valid)
{
	struct txn txn;
	struct obj *obj;
	int ret;

	if (!open || !attr)
		return -EINVAL;

	obj = open->obj;

	if (!obj->ops || !obj->ops->setattr)
		return -ENOTSUP;

	if (open->qualified)
		return -EROFS;

	/* clear whatever garbage the user may have supplied */
	attr->ino = 0;

	MXLOCK(&obj->lock);

	/*
	 * first do some checks
	 */

	/* we can't truncate anything other than regular files */
	if ((valid & OBJ_ATTR_SIZE) && !NATTR_ISREG(open->ver->attrs.mode)) {
		ret = -EINVAL;
		goto err;
	}

	if (valid & OBJ_ATTR_MODE) {
		const uint16_t current_mode = open->ver->attrs.mode;

		/* if we didn't get a type, copy it from the obj */
		if (_NATTR_ISNOTYPE(attr->mode))
			attr->mode |= current_mode & NATTR_TMASK;

		/* we can't change the type of the object */
		if ((attr->mode & NATTR_TMASK) != (current_mode & NATTR_TMASK)) {
			ret = -EINVAL;
			goto err;
		}
	}

	/*
	 * fire off the txn
	 */

	ret = txn_begin(&txn, obj->clone);
	if (ret)
		goto err;

	ret = obj_cow(&txn, open);
	if (ret)
		goto err_txn;

	obj_setattr(&txn, open->ver, attr, valid);

err_txn:
	ret = txn_commitabort(&txn, ret);

	/* return current attributes */
	*attr = open->ver->attrs;

err:
	MXUNLOCK(&obj->lock);

	/* sanity check that the backend didn't try to give us an inode number */
	ASSERT0(attr->ino);

	/* FIXME: safely copy attrs to open->ver.attrs */

	return ret;
}

ssize_t objstore_read(struct objstore_open_obj_info *open, void *buf,
		      size_t len, uint64_t offset)
{
	struct obj *obj;
	ssize_t ret;

	if (!open || !buf)
		return -EINVAL;

	if (len > (SIZE_MAX / 2))
		return -EOVERFLOW;

	obj = open->obj;

	if (!obj->ops || !obj->ops->read)
		return -ENOTSUP;

	/* nothing to do */
	if (!len)
		return 0;

	MXLOCK(&obj->lock);
	if (NATTR_ISDIR(open->ver->attrs.mode))
		/* TODO: do we need to check for other types? */
		ret = -EISDIR;
	else
		ret = obj->ops->read(open->ver, buf, len, offset);
	MXUNLOCK(&obj->lock);

	return ret;
}

ssize_t objstore_write(struct objstore_open_obj_info *open, const void *buf,
		       size_t len, uint64_t offset)
{
	struct buffer buffer;
	struct txn txn;
	struct obj *obj;
	ssize_t ret;

	if (!open || !buf)
		return -EINVAL;

	if (len > (SIZE_MAX / 2))
		return -EOVERFLOW;

	obj = open->obj;

	if (!obj->ops || !obj->ops->write)
		return -ENOTSUP;

	if (open->qualified)
		return -EROFS;

	/* nothing to do */
	if (!len)
		return 0;

	MXLOCK(&obj->lock);

	if (NATTR_ISDIR(open->ver->attrs.mode)) {
		/* TODO: do we need to check for other types? */
		ret = -EISDIR;
		goto err;
	}

	/*
	 * fire off the txn
	 */
	ret = txn_begin(&txn, obj->clone);
	if (ret)
		goto err;

	ret = obj_cow(&txn, open);
	if (ret)
		goto err_txn;

	/* extend the file if necessary */
	if ((offset + len) > open->ver->attrs.size) {
		struct nattr attrs = {
			.size = offset + len,
		};

		obj_setattr(&txn, open->ver, &attrs, OBJ_ATTR_SIZE);
	}

	buffer_init_static(&buffer, buf, len, len, false);
	obj_write(&txn, open->ver, &buffer, offset);

err_txn:
	ret = txn_commitabort(&txn, ret);

err:
	MXUNLOCK(&obj->lock);

	/*
	 * We have to return the number of bytes written.
	 *
	 * Because of the transaction being atomic, we never do a partial
	 * write.  Therefore, we either return an error or the requested
	 * length.
	 */
	return ret ? ret : len;
}

/*
 * Directory operations
 *
 * Technically, the following are not wrappers around object ops.  However,
 * they kind of belong here.
 */

int objstore_lookup_one(struct objstore_open_obj_info *diropen,
			const char *name, const struct noid *desired,
			struct noid *child, uint8_t *type)
{
	struct obj *dir;
	int ret;

	if (!diropen || !name || !desired || !child || !type)
		return -EINVAL;

	dir = diropen->obj;

	if (!dir->ops || !dir->ops->read)
		return -ENOTSUP;

	*type = NDIRENT_TYPE_UNKNOWN; /* unknown by default */

	MXLOCK(&dir->lock);
	if (!NATTR_ISDIR(diropen->ver->attrs.mode))
		ret = -ENOTDIR;
	else
		ret = dir_lookup_one(diropen->ver, name, desired, child, type);
	MXUNLOCK(&dir->lock);

	return ret;
}

ssize_t objstore_lookup_all(struct objstore_open_obj_info *diropen,
			    const char *name, struct noid **child,
			    uint8_t **type)
{
	struct obj *dir;
	ssize_t ret;

	if (!diropen || !name || !child || !type)
		return -EINVAL;

	dir = diropen->obj;

	if (!dir->ops || !dir->ops->read)
		return -ENOTSUP;

	MXLOCK(&dir->lock);
	if (!NATTR_ISDIR(diropen->ver->attrs.mode))
		ret = -ENOTDIR;
	else
		ret = dir_lookup_all(diropen->ver, name, child, type);
	MXUNLOCK(&dir->lock);

	return ret;
}

int objstore_unlink(struct objstore_open_obj_info *diropen, const char *name,
		    const struct noid *desired)
{
	struct txn txn;
	struct obj *dir;
	int ret;

	if (!diropen || !name)
		return -EINVAL;

	dir = diropen->obj;

	if (!dir->ops || !dir->ops->read || !dir->ops->write)
		return -ENOTSUP;

	if (diropen->qualified)
		return -EROFS;

	MXLOCK(&dir->lock);

	if (!NATTR_ISDIR(diropen->ver->attrs.mode)) {
		ret = -ENOTDIR;
		goto err;
	}

	ret = txn_begin(&txn, diropen->ver->obj->clone);
	if (ret)
		goto err;

	ret = obj_cow(&txn, diropen);
	if (ret)
		goto err_txn;

	ret = dir_unlink(&txn, diropen->ver, name, desired);

err_txn:
	ret = txn_commitabort(&txn, ret);

err:
	MXUNLOCK(&dir->lock);

	return ret;
}

int objstore_create(struct objstore_open_obj_info *diropen, const char *name,
		    uint32_t owner, uint32_t group, uint16_t mode,
		    struct noid *child)
{
	struct txn txn;
	struct obj *dir;
	int ret;

	if (!diropen || !name || !child)
		return -EINVAL;

	dir = diropen->obj;

	if (!dir->ops || !dir->ops->read || !dir->ops->write)
		return -ENOTSUP;

	if (diropen->qualified)
		return -EROFS;

	/* must have a type, grafts are created differently */
	if (_NATTR_ISNOTYPE(mode) || NATTR_ISGRAFT(mode))
		return -EINVAL;

	MXLOCK(&dir->lock);

	if (!NATTR_ISDIR(diropen->ver->attrs.mode)) {
		ret = -ENOTDIR;
		goto err;
	}

	ret = txn_begin(&txn, diropen->ver->obj->clone);
	if (ret)
		goto err;

	ret = obj_cow(&txn, diropen);
	if (ret)
		goto err_txn;

	ret = dir_create(&txn, diropen->ver, name, owner, group, mode, child);

err_txn:
	ret = txn_commitabort(&txn, ret);

err:
	MXUNLOCK(&dir->lock);

	return ret;
}

int objstore_getdent(struct objstore_open_obj_info *diropen,
		     const uint64_t offset, struct ndirent *child)
{
	struct obj *dir;
	int ret;

	if (!diropen)
		return -EINVAL;

	dir = diropen->obj;

	if (!dir->ops || !dir->ops->read)
		return -ENOTSUP;

	/* clear whatever garbage the user may have supplied */
	child->ino = 0;

	MXLOCK(&dir->lock);
	if (!NATTR_ISDIR(diropen->ver->attrs.mode))
		ret = -ENOTDIR;
	else
		ret = dir_getdent(diropen->ver, offset, child);
	MXUNLOCK(&dir->lock);

	/* sanity check that the backend didn't try to give us an inode number */
	ASSERT0(child->ino);

	return ret;
}