diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 1cf2adf46b118cd4b2f7bc2449a1065d764a2bd6..cd9213ccf3dc263ba2e87ff2ecc660ef370c1f1a 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -70,6 +70,10 @@ snap_*
 
 	A directory per each snapshot
 
+parent
+
+	Information identifying the pool, image, and snapshot id for
+	the parent image in a layered rbd image (format 2 only).
 
 Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name>
 -------------------------------------------------------------
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index bb3d9be3b1b4eecb82ba82f8bc1045e825ff7a75..89576a0b3f2ed5b099ecf7ce675104d282b0dd48 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -61,15 +61,29 @@
 
 #define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
 
-#define RBD_MAX_SNAP_NAME_LEN	32
+#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
+#define RBD_MAX_SNAP_NAME_LEN	\
+			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
+
 #define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
 #define RBD_MAX_OPT_LEN		1024
 
 #define RBD_SNAP_HEAD_NAME	"-"
 
+/* This allows a single page to hold an image name sent by OSD */
+#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
 #define RBD_IMAGE_ID_LEN_MAX	64
+
 #define RBD_OBJ_PREFIX_LEN_MAX	64
 
+/* Feature bits */
+
+#define RBD_FEATURE_LAYERING      1
+
+/* Features supported by this (client software) implementation. */
+
+#define RBD_FEATURES_ALL          (0)
+
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
  * RBD_DRV_NAME above, and # is a unique integer identifier.
@@ -101,6 +115,27 @@ struct rbd_image_header {
 	u64 obj_version;
 };
 
+/*
+ * An rbd image specification.
+ *
+ * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
+ * identify an image.
+ */
+struct rbd_spec {
+	u64		pool_id;
+	char		*pool_name;
+
+	char		*image_id;
+	size_t		image_id_len;
+	char		*image_name;
+	size_t		image_name_len;
+
+	u64		snap_id;
+	char		*snap_name;
+
+	struct kref	kref;
+};
+
 struct rbd_options {
 	bool	read_only;
 };
@@ -155,11 +190,8 @@ struct rbd_snap {
 };
 
 struct rbd_mapping {
-	char                    *snap_name;
-	u64                     snap_id;
 	u64                     size;
 	u64                     features;
-	bool                    snap_exists;
 	bool			read_only;
 };
 
@@ -173,7 +205,6 @@ struct rbd_device {
 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
 
 	u32			image_format;	/* Either 1 or 2 */
-	struct rbd_options	rbd_opts;
 	struct rbd_client	*rbd_client;
 
 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -181,17 +212,17 @@ struct rbd_device {
 	spinlock_t		lock;		/* queue lock */
 
 	struct rbd_image_header	header;
-	char			*image_id;
-	size_t			image_id_len;
-	char			*image_name;
-	size_t			image_name_len;
+	bool                    exists;
+	struct rbd_spec		*spec;
+
 	char			*header_name;
-	char			*pool_name;
-	int			pool_id;
 
 	struct ceph_osd_event   *watch_event;
 	struct ceph_osd_request *watch_request;
 
+	struct rbd_spec		*parent_spec;
+	u64			parent_overlap;
+
 	/* protects updating the header */
 	struct rw_semaphore     header_rwsem;
 
@@ -204,6 +235,7 @@ struct rbd_device {
 
 	/* sysfs related */
 	struct device		dev;
+	unsigned long		open_count;
 };
 
 static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
@@ -218,7 +250,7 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
 
 static void rbd_dev_release(struct device *dev);
-static void __rbd_remove_snap_dev(struct rbd_snap *snap);
+static void rbd_remove_snap_dev(struct rbd_snap *snap);
 
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 		       size_t count);
@@ -258,17 +290,8 @@ static struct device rbd_root_dev = {
 #  define rbd_assert(expr)	((void) 0)
 #endif /* !RBD_DEBUG */
 
-static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
-{
-	return get_device(&rbd_dev->dev);
-}
-
-static void rbd_put_dev(struct rbd_device *rbd_dev)
-{
-	put_device(&rbd_dev->dev);
-}
-
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
@@ -277,8 +300,11 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
 	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
 		return -EROFS;
 
-	rbd_get_dev(rbd_dev);
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+	(void) get_device(&rbd_dev->dev);
 	set_device_ro(bdev, rbd_dev->mapping.read_only);
+	rbd_dev->open_count++;
+	mutex_unlock(&ctl_mutex);
 
 	return 0;
 }
@@ -287,7 +313,11 @@ static int rbd_release(struct gendisk *disk, fmode_t mode)
 {
 	struct rbd_device *rbd_dev = disk->private_data;
 
-	rbd_put_dev(rbd_dev);
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+	rbd_assert(rbd_dev->open_count > 0);
+	rbd_dev->open_count--;
+	put_device(&rbd_dev->dev);
+	mutex_unlock(&ctl_mutex);
 
 	return 0;
 }
@@ -388,7 +418,7 @@ enum {
 static match_table_t rbd_opts_tokens = {
 	/* int args above */
 	/* string args above */
-	{Opt_read_only, "mapping.read_only"},
+	{Opt_read_only, "read_only"},
 	{Opt_read_only, "ro"},		/* Alternate spelling */
 	{Opt_read_write, "read_write"},
 	{Opt_read_write, "rw"},		/* Alternate spelling */
@@ -441,33 +471,17 @@ static int parse_rbd_opts_token(char *c, void *private)
  * Get a ceph client with specific addr and configuration, if one does
  * not exist create it.
  */
-static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
-				size_t mon_addr_len, char *options)
+static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 {
-	struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
-	struct ceph_options *ceph_opts;
 	struct rbd_client *rbdc;
 
-	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
-
-	ceph_opts = ceph_parse_options(options, mon_addr,
-					mon_addr + mon_addr_len,
-					parse_rbd_opts_token, rbd_opts);
-	if (IS_ERR(ceph_opts))
-		return PTR_ERR(ceph_opts);
-
 	rbdc = rbd_client_find(ceph_opts);
-	if (rbdc) {
-		/* using an existing client */
+	if (rbdc)	/* using an existing client */
 		ceph_destroy_options(ceph_opts);
-	} else {
+	else
 		rbdc = rbd_client_create(ceph_opts);
-		if (IS_ERR(rbdc))
-			return PTR_ERR(rbdc);
-	}
-	rbd_dev->rbd_client = rbdc;
 
-	return 0;
+	return rbdc;
 }
 
 /*
@@ -492,10 +506,10 @@ static void rbd_client_release(struct kref *kref)
  * Drop reference to ceph client node. If it's not referenced anymore, release
  * it.
  */
-static void rbd_put_client(struct rbd_device *rbd_dev)
+static void rbd_put_client(struct rbd_client *rbdc)
 {
-	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
-	rbd_dev->rbd_client = NULL;
+	if (rbdc)
+		kref_put(&rbdc->kref, rbd_client_release);
 }
 
 /*
@@ -524,6 +538,16 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
 		return false;
 
+	/* The bio layer requires at least sector-sized I/O */
+
+	if (ondisk->options.order < SECTOR_SHIFT)
+		return false;
+
+	/* If we use u64 in a few spots we may be able to loosen this */
+
+	if (ondisk->options.order > 8 * sizeof (int) - 1)
+		return false;
+
 	/*
 	 * The size of a snapshot header has to fit in a size_t, and
 	 * that limits the number of snapshots.
@@ -635,6 +659,20 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
 	return -ENOMEM;
 }
 
+static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
+{
+	struct rbd_snap *snap;
+
+	if (snap_id == CEPH_NOSNAP)
+		return RBD_SNAP_HEAD_NAME;
+
+	list_for_each_entry(snap, &rbd_dev->snaps, node)
+		if (snap_id == snap->id)
+			return snap->name;
+
+	return NULL;
+}
+
 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
 {
 
@@ -642,7 +680,7 @@ static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
 
 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
 		if (!strcmp(snap_name, snap->name)) {
-			rbd_dev->mapping.snap_id = snap->id;
+			rbd_dev->spec->snap_id = snap->id;
 			rbd_dev->mapping.size = snap->size;
 			rbd_dev->mapping.features = snap->features;
 
@@ -653,26 +691,23 @@ static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
 	return -ENOENT;
 }
 
-static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
+static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
 {
 	int ret;
 
-	if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
+	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
 		    sizeof (RBD_SNAP_HEAD_NAME))) {
-		rbd_dev->mapping.snap_id = CEPH_NOSNAP;
+		rbd_dev->spec->snap_id = CEPH_NOSNAP;
 		rbd_dev->mapping.size = rbd_dev->header.image_size;
 		rbd_dev->mapping.features = rbd_dev->header.features;
-		rbd_dev->mapping.snap_exists = false;
-		rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
 		ret = 0;
 	} else {
-		ret = snap_by_name(rbd_dev, snap_name);
+		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
 		if (ret < 0)
 			goto done;
-		rbd_dev->mapping.snap_exists = true;
 		rbd_dev->mapping.read_only = true;
 	}
-	rbd_dev->mapping.snap_name = snap_name;
+	rbd_dev->exists = true;
 done:
 	return ret;
 }
@@ -695,13 +730,13 @@ static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 	u64 segment;
 	int ret;
 
-	name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
+	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
 	if (!name)
 		return NULL;
 	segment = offset >> rbd_dev->header.obj_order;
-	ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
+	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
 			rbd_dev->header.object_prefix, segment);
-	if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
+	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
 		pr_err("error formatting segment name for #%llu (%d)\n",
 			segment, ret);
 		kfree(name);
@@ -800,77 +835,144 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
 }
 
 /*
- * bio_chain_clone - clone a chain of bios up to a certain length.
- * might return a bio_pair that will need to be released.
+ * Clone a portion of a bio, starting at the given byte offset
+ * and continuing for the number of bytes indicated.
  */
-static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
-				   struct bio_pair **bp,
-				   int len, gfp_t gfpmask)
-{
-	struct bio *old_chain = *old;
-	struct bio *new_chain = NULL;
-	struct bio *tail;
-	int total = 0;
-
-	if (*bp) {
-		bio_pair_release(*bp);
-		*bp = NULL;
-	}
+static struct bio *bio_clone_range(struct bio *bio_src,
+					unsigned int offset,
+					unsigned int len,
+					gfp_t gfpmask)
+{
+	struct bio_vec *bv;
+	unsigned int resid;
+	unsigned short idx;
+	unsigned int voff;
+	unsigned short end_idx;
+	unsigned short vcnt;
+	struct bio *bio;
 
-	while (old_chain && (total < len)) {
-		struct bio *tmp;
+	/* Handle the easy case for the caller */
 
-		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
-		if (!tmp)
-			goto err_out;
-		gfpmask &= ~__GFP_WAIT;	/* can't wait after the first */
+	if (!offset && len == bio_src->bi_size)
+		return bio_clone(bio_src, gfpmask);
 
-		if (total + old_chain->bi_size > len) {
-			struct bio_pair *bp;
+	if (WARN_ON_ONCE(!len))
+		return NULL;
+	if (WARN_ON_ONCE(len > bio_src->bi_size))
+		return NULL;
+	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
+		return NULL;
 
-			/*
-			 * this split can only happen with a single paged bio,
-			 * split_bio will BUG_ON if this is not the case
-			 */
-			dout("bio_chain_clone split! total=%d remaining=%d"
-			     "bi_size=%u\n",
-			     total, len - total, old_chain->bi_size);
+	/* Find first affected segment... */
 
-			/* split the bio. We'll release it either in the next
-			   call, or it will have to be released outside */
-			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
-			if (!bp)
-				goto err_out;
+	resid = offset;
+	__bio_for_each_segment(bv, bio_src, idx, 0) {
+		if (resid < bv->bv_len)
+			break;
+		resid -= bv->bv_len;
+	}
+	voff = resid;
 
-			__bio_clone(tmp, &bp->bio1);
+	/* ...and the last affected segment */
 
-			*next = &bp->bio2;
-		} else {
-			__bio_clone(tmp, old_chain);
-			*next = old_chain->bi_next;
-		}
+	resid += len;
+	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
+		if (resid <= bv->bv_len)
+			break;
+		resid -= bv->bv_len;
+	}
+	vcnt = end_idx - idx + 1;
+
+	/* Build the clone */
 
-		tmp->bi_bdev = NULL;
-		tmp->bi_next = NULL;
-		if (new_chain)
-			tail->bi_next = tmp;
-		else
-			new_chain = tmp;
-		tail = tmp;
-		old_chain = old_chain->bi_next;
+	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
+	if (!bio)
+		return NULL;	/* ENOMEM */
 
-		total += tmp->bi_size;
+	bio->bi_bdev = bio_src->bi_bdev;
+	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
+	bio->bi_rw = bio_src->bi_rw;
+	bio->bi_flags |= 1 << BIO_CLONED;
+
+	/*
+	 * Copy over our part of the bio_vec, then update the first
+	 * and last (or only) entries.
+	 */
+	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
+			vcnt * sizeof (struct bio_vec));
+	bio->bi_io_vec[0].bv_offset += voff;
+	if (vcnt > 1) {
+		bio->bi_io_vec[0].bv_len -= voff;
+		bio->bi_io_vec[vcnt - 1].bv_len = resid;
+	} else {
+		bio->bi_io_vec[0].bv_len = len;
 	}
 
-	rbd_assert(total == len);
+	bio->bi_vcnt = vcnt;
+	bio->bi_size = len;
+	bio->bi_idx = 0;
+
+	return bio;
+}
+
+/*
+ * Clone a portion of a bio chain, starting at the given byte offset
+ * into the first bio in the source chain and continuing for the
+ * number of bytes indicated.  The result is another bio chain of
+ * exactly the given length, or a null pointer on error.
+ *
+ * The bio_src and offset parameters are both in-out.  On entry they
+ * refer to the first source bio and the offset into that bio where
+ * the start of data to be cloned is located.
+ *
+ * On return, bio_src is updated to refer to the bio in the source
+ * chain that contains first un-cloned byte, and *offset will
+ * contain the offset of that byte within that bio.
+ */
+static struct bio *bio_chain_clone_range(struct bio **bio_src,
+					unsigned int *offset,
+					unsigned int len,
+					gfp_t gfpmask)
+{
+	struct bio *bi = *bio_src;
+	unsigned int off = *offset;
+	struct bio *chain = NULL;
+	struct bio **end;
+
+	/* Build up a chain of clone bios up to the limit */
+
+	if (!bi || off >= bi->bi_size || !len)
+		return NULL;		/* Nothing to clone */
 
-	*old = old_chain;
+	end = &chain;
+	while (len) {
+		unsigned int bi_size;
+		struct bio *bio;
+
+		if (!bi)
+			goto out_err;	/* EINVAL; ran out of bio's */
+		bi_size = min_t(unsigned int, bi->bi_size - off, len);
+		bio = bio_clone_range(bi, off, bi_size, gfpmask);
+		if (!bio)
+			goto out_err;	/* ENOMEM */
+
+		*end = bio;
+		end = &bio->bi_next;
+
+		off += bi_size;
+		if (off == bi->bi_size) {
+			bi = bi->bi_next;
+			off = 0;
+		}
+		len -= bi_size;
+	}
+	*bio_src = bi;
+	*offset = off;
 
-	return new_chain;
+	return chain;
+out_err:
+	bio_chain_put(chain);
 
-err_out:
-	dout("bio_chain_clone with err\n");
-	bio_chain_put(new_chain);
 	return NULL;
 }
 
@@ -988,8 +1090,9 @@ static int rbd_do_request(struct request *rq,
 		req_data->coll_index = coll_index;
 	}
 
-	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
-		(unsigned long long) ofs, (unsigned long long) len);
+	dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
+		object_name, (unsigned long long) ofs,
+		(unsigned long long) len, coll, coll_index);
 
 	osdc = &rbd_dev->rbd_client->client->osdc;
 	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
@@ -1019,7 +1122,7 @@ static int rbd_do_request(struct request *rq,
 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
 	layout->fl_stripe_count = cpu_to_le32(1);
 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
+	layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
 	ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
 				   req, ops);
 	rbd_assert(ret == 0);
@@ -1154,8 +1257,6 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev,
 static int rbd_do_op(struct request *rq,
 		     struct rbd_device *rbd_dev,
 		     struct ceph_snap_context *snapc,
-		     u64 snapid,
-		     int opcode, int flags,
 		     u64 ofs, u64 len,
 		     struct bio *bio,
 		     struct rbd_req_coll *coll,
@@ -1167,6 +1268,9 @@ static int rbd_do_op(struct request *rq,
 	int ret;
 	struct ceph_osd_req_op *ops;
 	u32 payload_len;
+	int opcode;
+	int flags;
+	u64 snapid;
 
 	seg_name = rbd_segment_name(rbd_dev, ofs);
 	if (!seg_name)
@@ -1174,7 +1278,18 @@ static int rbd_do_op(struct request *rq,
 	seg_len = rbd_segment_length(rbd_dev, ofs, len);
 	seg_ofs = rbd_segment_offset(rbd_dev, ofs);
 
-	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
+	if (rq_data_dir(rq) == WRITE) {
+		opcode = CEPH_OSD_OP_WRITE;
+		flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
+		snapid = CEPH_NOSNAP;
+		payload_len = seg_len;
+	} else {
+		opcode = CEPH_OSD_OP_READ;
+		flags = CEPH_OSD_FLAG_READ;
+		snapc = NULL;
+		snapid = rbd_dev->spec->snap_id;
+		payload_len = 0;
+	}
 
 	ret = -ENOMEM;
 	ops = rbd_create_rw_ops(1, opcode, payload_len);
@@ -1201,41 +1316,6 @@ static int rbd_do_op(struct request *rq,
 	return ret;
 }
 
-/*
- * Request async osd write
- */
-static int rbd_req_write(struct request *rq,
-			 struct rbd_device *rbd_dev,
-			 struct ceph_snap_context *snapc,
-			 u64 ofs, u64 len,
-			 struct bio *bio,
-			 struct rbd_req_coll *coll,
-			 int coll_index)
-{
-	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
-			 CEPH_OSD_OP_WRITE,
-			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-			 ofs, len, bio, coll, coll_index);
-}
-
-/*
- * Request async osd read
- */
-static int rbd_req_read(struct request *rq,
-			 struct rbd_device *rbd_dev,
-			 u64 snapid,
-			 u64 ofs, u64 len,
-			 struct bio *bio,
-			 struct rbd_req_coll *coll,
-			 int coll_index)
-{
-	return rbd_do_op(rq, rbd_dev, NULL,
-			 snapid,
-			 CEPH_OSD_OP_READ,
-			 CEPH_OSD_FLAG_READ,
-			 ofs, len, bio, coll, coll_index);
-}
-
 /*
  * Request sync osd read
  */
@@ -1304,7 +1384,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
 		rbd_dev->header_name, (unsigned long long) notify_id,
 		(unsigned int) opcode);
-	rc = rbd_refresh_header(rbd_dev, &hver);
+	rc = rbd_dev_refresh(rbd_dev, &hver);
 	if (rc)
 		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
 			   " update snaps: %d\n", rbd_dev->major, rc);
@@ -1460,18 +1540,16 @@ static void rbd_rq_fn(struct request_queue *q)
 {
 	struct rbd_device *rbd_dev = q->queuedata;
 	struct request *rq;
-	struct bio_pair *bp = NULL;
 
 	while ((rq = blk_fetch_request(q))) {
 		struct bio *bio;
-		struct bio *rq_bio, *next_bio = NULL;
 		bool do_write;
 		unsigned int size;
-		u64 op_size = 0;
 		u64 ofs;
 		int num_segs, cur_seg = 0;
 		struct rbd_req_coll *coll;
 		struct ceph_snap_context *snapc;
+		unsigned int bio_offset;
 
 		dout("fetched request\n");
 
@@ -1483,10 +1561,6 @@ static void rbd_rq_fn(struct request_queue *q)
 
 		/* deduce our operation (read, write) */
 		do_write = (rq_data_dir(rq) == WRITE);
-
-		size = blk_rq_bytes(rq);
-		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
-		rq_bio = rq->bio;
 		if (do_write && rbd_dev->mapping.read_only) {
 			__blk_end_request_all(rq, -EROFS);
 			continue;
@@ -1496,8 +1570,8 @@ static void rbd_rq_fn(struct request_queue *q)
 
 		down_read(&rbd_dev->header_rwsem);
 
-		if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
-				!rbd_dev->mapping.snap_exists) {
+		if (!rbd_dev->exists) {
+			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
 			up_read(&rbd_dev->header_rwsem);
 			dout("request for non-existent snapshot");
 			spin_lock_irq(q->queue_lock);
@@ -1509,6 +1583,10 @@ static void rbd_rq_fn(struct request_queue *q)
 
 		up_read(&rbd_dev->header_rwsem);
 
+		size = blk_rq_bytes(rq);
+		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
+		bio = rq->bio;
+
 		dout("%s 0x%x bytes at 0x%llx\n",
 		     do_write ? "write" : "read",
 		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
@@ -1528,45 +1606,37 @@ static void rbd_rq_fn(struct request_queue *q)
 			continue;
 		}
 
+		bio_offset = 0;
 		do {
-			/* a bio clone to be passed down to OSD req */
+			u64 limit = rbd_segment_length(rbd_dev, ofs, size);
+			unsigned int chain_size;
+			struct bio *bio_chain;
+
+			BUG_ON(limit > (u64) UINT_MAX);
+			chain_size = (unsigned int) limit;
 			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
-			op_size = rbd_segment_length(rbd_dev, ofs, size);
+
 			kref_get(&coll->kref);
-			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
-					      op_size, GFP_ATOMIC);
-			if (!bio) {
-				rbd_coll_end_req_index(rq, coll, cur_seg,
-						       -ENOMEM, op_size);
-				goto next_seg;
-			}
 
+			/* Pass a cloned bio chain via an osd request */
 
-			/* init OSD command: write or read */
-			if (do_write)
-				rbd_req_write(rq, rbd_dev,
-					      snapc,
-					      ofs,
-					      op_size, bio,
-					      coll, cur_seg);
+			bio_chain = bio_chain_clone_range(&bio,
+						&bio_offset, chain_size,
+						GFP_ATOMIC);
+			if (bio_chain)
+				(void) rbd_do_op(rq, rbd_dev, snapc,
+						ofs, chain_size,
+						bio_chain, coll, cur_seg);
 			else
-				rbd_req_read(rq, rbd_dev,
-					     rbd_dev->mapping.snap_id,
-					     ofs,
-					     op_size, bio,
-					     coll, cur_seg);
-
-next_seg:
-			size -= op_size;
-			ofs += op_size;
+				rbd_coll_end_req_index(rq, coll, cur_seg,
+						       -ENOMEM, chain_size);
+			size -= chain_size;
+			ofs += chain_size;
 
 			cur_seg++;
-			rq_bio = next_bio;
 		} while (size > 0);
 		kref_put(&coll->kref, rbd_coll_release);
 
-		if (bp)
-			bio_pair_release(bp);
 		spin_lock_irq(q->queue_lock);
 
 		ceph_put_snap_context(snapc);
@@ -1576,28 +1646,47 @@ static void rbd_rq_fn(struct request_queue *q)
 /*
  * a queue callback. Makes sure that we don't create a bio that spans across
  * multiple osd objects. One exception would be with a single page bios,
- * which we handle later at bio_chain_clone
+ * which we handle later at bio_chain_clone_range()
  */
 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
 			  struct bio_vec *bvec)
 {
 	struct rbd_device *rbd_dev = q->queuedata;
-	unsigned int chunk_sectors;
-	sector_t sector;
-	unsigned int bio_sectors;
-	int max;
+	sector_t sector_offset;
+	sector_t sectors_per_obj;
+	sector_t obj_sector_offset;
+	int ret;
 
-	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
-	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
-	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
+	/*
+	 * Find how far into its rbd object the partition-relative
+	 * bio start sector is to offset relative to the enclosing
+	 * device.
+	 */
+	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
+	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
+	obj_sector_offset = sector_offset & (sectors_per_obj - 1);
+
+	/*
+	 * Compute the number of bytes from that offset to the end
+	 * of the object.  Account for what's already used by the bio.
+	 */
+	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
+	if (ret > bmd->bi_size)
+		ret -= bmd->bi_size;
+	else
+		ret = 0;
 
-	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
-				 + bio_sectors)) << SECTOR_SHIFT;
-	if (max < 0)
-		max = 0; /* bio_add cannot handle a negative return */
-	if (max <= bvec->bv_len && bio_sectors == 0)
-		return bvec->bv_len;
-	return max;
+	/*
+	 * Don't send back more than was asked for.  And if the bio
+	 * was empty, let the whole thing through because:  "Note
+	 * that a block device *must* allow a single page to be
+	 * added to an empty bio."
+	 */
+	rbd_assert(bvec->bv_len <= PAGE_SIZE);
+	if (ret > (int) bvec->bv_len || !bmd->bi_size)
+		ret = (int) bvec->bv_len;
+
+	return ret;
 }
 
 static void rbd_free_disk(struct rbd_device *rbd_dev)
@@ -1663,13 +1752,13 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
 			ret = -ENXIO;
 			pr_warning("short header read for image %s"
 					" (want %zd got %d)\n",
-				rbd_dev->image_name, size, ret);
+				rbd_dev->spec->image_name, size, ret);
 			goto out_err;
 		}
 		if (!rbd_dev_ondisk_valid(ondisk)) {
 			ret = -ENXIO;
 			pr_warning("invalid header for image %s\n",
-				rbd_dev->image_name);
+				rbd_dev->spec->image_name);
 			goto out_err;
 		}
 
@@ -1707,19 +1796,32 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
 	return ret;
 }
 
-static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
+static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
 {
 	struct rbd_snap *snap;
 	struct rbd_snap *next;
 
 	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
-		__rbd_remove_snap_dev(snap);
+		rbd_remove_snap_dev(snap);
+}
+
+static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
+{
+	sector_t size;
+
+	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+		return;
+
+	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
+	dout("setting size to %llu sectors", (unsigned long long) size);
+	rbd_dev->mapping.size = (u64) size;
+	set_capacity(rbd_dev->disk, size);
 }
 
 /*
  * only read the first part of the ondisk header, without the snaps info
  */
-static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
 {
 	int ret;
 	struct rbd_image_header h;
@@ -1730,17 +1832,9 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
 
 	down_write(&rbd_dev->header_rwsem);
 
-	/* resized? */
-	if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
-		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
-
-		if (size != (sector_t) rbd_dev->mapping.size) {
-			dout("setting size to %llu sectors",
-				(unsigned long long) size);
-			rbd_dev->mapping.size = (u64) size;
-			set_capacity(rbd_dev->disk, size);
-		}
-	}
+	/* Update image size, and check for resize of mapped image */
+	rbd_dev->header.image_size = h.image_size;
+	rbd_update_mapping_size(rbd_dev);
 
 	/* rbd_dev->header.object_prefix shouldn't change */
 	kfree(rbd_dev->header.snap_sizes);
@@ -1768,12 +1862,16 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
 	return ret;
 }
 
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
 {
 	int ret;
 
+	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-	ret = __rbd_refresh_header(rbd_dev, hver);
+	if (rbd_dev->image_format == 1)
+		ret = rbd_dev_v1_refresh(rbd_dev, hver);
+	else
+		ret = rbd_dev_v2_refresh(rbd_dev, hver);
 	mutex_unlock(&ctl_mutex);
 
 	return ret;
@@ -1885,7 +1983,7 @@ static ssize_t rbd_pool_show(struct device *dev,
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-	return sprintf(buf, "%s\n", rbd_dev->pool_name);
+	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
 }
 
 static ssize_t rbd_pool_id_show(struct device *dev,
@@ -1893,7 +1991,8 @@ static ssize_t rbd_pool_id_show(struct device *dev,
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-	return sprintf(buf, "%d\n", rbd_dev->pool_id);
+	return sprintf(buf, "%llu\n",
+		(unsigned long long) rbd_dev->spec->pool_id);
 }
 
 static ssize_t rbd_name_show(struct device *dev,
@@ -1901,7 +2000,10 @@ static ssize_t rbd_name_show(struct device *dev,
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-	return sprintf(buf, "%s\n", rbd_dev->image_name);
+	if (rbd_dev->spec->image_name)
+		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
+
+	return sprintf(buf, "(unknown)\n");
 }
 
 static ssize_t rbd_image_id_show(struct device *dev,
@@ -1909,7 +2011,7 @@ static ssize_t rbd_image_id_show(struct device *dev,
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-	return sprintf(buf, "%s\n", rbd_dev->image_id);
+	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
 }
 
 /*
@@ -1922,7 +2024,50 @@ static ssize_t rbd_snap_show(struct device *dev,
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-	return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
+	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
+}
+
+/*
+ * For an rbd v2 image, shows the pool id, image id, and snapshot id
+ * for the parent image.  If there is no parent, simply shows
+ * "(no parent image)".
+ */
+static ssize_t rbd_parent_show(struct device *dev,
+			     struct device_attribute *attr,
+			     char *buf)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+	struct rbd_spec *spec = rbd_dev->parent_spec;
+	int count;
+	char *bufp = buf;
+
+	if (!spec)
+		return sprintf(buf, "(no parent image)\n");
+
+	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
+			(unsigned long long) spec->pool_id, spec->pool_name);
+	if (count < 0)
+		return count;
+	bufp += count;
+
+	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
+			spec->image_name ? spec->image_name : "(unknown)");
+	if (count < 0)
+		return count;
+	bufp += count;
+
+	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
+			(unsigned long long) spec->snap_id, spec->snap_name);
+	if (count < 0)
+		return count;
+	bufp += count;
+
+	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
+	if (count < 0)
+		return count;
+	bufp += count;
+
+	return (ssize_t) (bufp - buf);
 }
 
 static ssize_t rbd_image_refresh(struct device *dev,
@@ -1933,7 +2078,7 @@ static ssize_t rbd_image_refresh(struct device *dev,
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 	int ret;
 
-	ret = rbd_refresh_header(rbd_dev, NULL);
+	ret = rbd_dev_refresh(rbd_dev, NULL);
 
 	return ret < 0 ? ret : size;
 }
@@ -1948,6 +2093,7 @@ static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
+static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
 
 static struct attribute *rbd_attrs[] = {
 	&dev_attr_size.attr,
@@ -1959,6 +2105,7 @@ static struct attribute *rbd_attrs[] = {
 	&dev_attr_name.attr,
 	&dev_attr_image_id.attr,
 	&dev_attr_current_snap.attr,
+	&dev_attr_parent.attr,
 	&dev_attr_refresh.attr,
 	NULL
 };
@@ -2047,6 +2194,74 @@ static struct device_type rbd_snap_device_type = {
 	.release	= rbd_snap_dev_release,
 };
 
+static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
+{
+	kref_get(&spec->kref);
+
+	return spec;
+}
+
+static void rbd_spec_free(struct kref *kref);
+static void rbd_spec_put(struct rbd_spec *spec)
+{
+	if (spec)
+		kref_put(&spec->kref, rbd_spec_free);
+}
+
+static struct rbd_spec *rbd_spec_alloc(void)
+{
+	struct rbd_spec *spec;
+
+	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
+	if (!spec)
+		return NULL;
+	kref_init(&spec->kref);
+
+	rbd_spec_put(rbd_spec_get(spec));	/* TEMPORARY */
+
+	return spec;
+}
+
+static void rbd_spec_free(struct kref *kref)
+{
+	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
+
+	kfree(spec->pool_name);
+	kfree(spec->image_id);
+	kfree(spec->image_name);
+	kfree(spec->snap_name);
+	kfree(spec);
+}
+
+struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
+				struct rbd_spec *spec)
+{
+	struct rbd_device *rbd_dev;
+
+	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
+	if (!rbd_dev)
+		return NULL;
+
+	spin_lock_init(&rbd_dev->lock);
+	INIT_LIST_HEAD(&rbd_dev->node);
+	INIT_LIST_HEAD(&rbd_dev->snaps);
+	init_rwsem(&rbd_dev->header_rwsem);
+
+	rbd_dev->spec = spec;
+	rbd_dev->rbd_client = rbdc;
+
+	return rbd_dev;
+}
+
+static void rbd_dev_destroy(struct rbd_device *rbd_dev)
+{
+	rbd_spec_put(rbd_dev->parent_spec);
+	kfree(rbd_dev->header_name);
+	rbd_put_client(rbd_dev->rbd_client);
+	rbd_spec_put(rbd_dev->spec);
+	kfree(rbd_dev);
+}
+
 static bool rbd_snap_registered(struct rbd_snap *snap)
 {
 	bool ret = snap->dev.type == &rbd_snap_device_type;
@@ -2057,7 +2272,7 @@ static bool rbd_snap_registered(struct rbd_snap *snap)
 	return ret;
 }
 
-static void __rbd_remove_snap_dev(struct rbd_snap *snap)
+static void rbd_remove_snap_dev(struct rbd_snap *snap)
 {
 	list_del(&snap->node);
 	if (device_is_registered(&snap->dev))
@@ -2073,7 +2288,7 @@ static int rbd_register_snap_dev(struct rbd_snap *snap,
 	dev->type = &rbd_snap_device_type;
 	dev->parent = parent;
 	dev->release = rbd_snap_dev_release;
-	dev_set_name(dev, "snap_%s", snap->name);
+	dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
 	dout("%s: registering device for snapshot %s\n", __func__, snap->name);
 
 	ret = device_register(dev);
@@ -2189,6 +2404,7 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
 	if (ret < 0)
 		goto out;
+	ret = 0;    /* rbd_req_sync_exec() can return positive */
 
 	p = reply_buf;
 	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
@@ -2216,6 +2432,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
 		__le64 features;
 		__le64 incompat;
 	} features_buf = { 0 };
+	u64 incompat;
 	int ret;
 
 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
@@ -2226,6 +2443,11 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
 	if (ret < 0)
 		return ret;
+
+	incompat = le64_to_cpu(features_buf.incompat);
+	if (incompat & ~RBD_FEATURES_ALL)
+		return -ENXIO;
+
 	*snap_features = le64_to_cpu(features_buf.features);
 
 	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
@@ -2242,6 +2464,183 @@ static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
 						&rbd_dev->header.features);
 }
 
+static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
+{
+	struct rbd_spec *parent_spec;
+	size_t size;
+	void *reply_buf = NULL;
+	__le64 snapid;
+	void *p;
+	void *end;
+	char *image_id;
+	u64 overlap;
+	size_t len = 0;
+	int ret;
+
+	parent_spec = rbd_spec_alloc();
+	if (!parent_spec)
+		return -ENOMEM;
+
+	size = sizeof (__le64) +				/* pool_id */
+		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
+		sizeof (__le64) +				/* snap_id */
+		sizeof (__le64);				/* overlap */
+	reply_buf = kmalloc(size, GFP_KERNEL);
+	if (!reply_buf) {
+		ret = -ENOMEM;
+		goto out_err;
+	}
+
+	snapid = cpu_to_le64(CEPH_NOSNAP);
+	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+				"rbd", "get_parent",
+				(char *) &snapid, sizeof (snapid),
+				(char *) reply_buf, size,
+				CEPH_OSD_FLAG_READ, NULL);
+	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+	if (ret < 0)
+		goto out_err;
+
+	ret = -ERANGE;
+	p = reply_buf;
+	end = (char *) reply_buf + size;
+	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
+	if (parent_spec->pool_id == CEPH_NOPOOL)
+		goto out;	/* No parent?  No problem. */
+
+	image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
+	if (IS_ERR(image_id)) {
+		ret = PTR_ERR(image_id);
+		goto out_err;
+	}
+	parent_spec->image_id = image_id;
+	parent_spec->image_id_len = len;
+	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
+	ceph_decode_64_safe(&p, end, overlap, out_err);
+
+	rbd_dev->parent_overlap = overlap;
+	rbd_dev->parent_spec = parent_spec;
+	parent_spec = NULL;	/* rbd_dev now owns this */
+out:
+	ret = 0;
+out_err:
+	kfree(reply_buf);
+	rbd_spec_put(parent_spec);
+
+	return ret;
+}
+
+static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
+{
+	size_t image_id_size;
+	char *image_id;
+	void *p;
+	void *end;
+	size_t size;
+	void *reply_buf = NULL;
+	size_t len = 0;
+	char *image_name = NULL;
+	int ret;
+
+	rbd_assert(!rbd_dev->spec->image_name);
+
+	image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len;
+	image_id = kmalloc(image_id_size, GFP_KERNEL);
+	if (!image_id)
+		return NULL;
+
+	p = image_id;
+	end = (char *) image_id + image_id_size;
+	ceph_encode_string(&p, end, rbd_dev->spec->image_id,
+				(u32) rbd_dev->spec->image_id_len);
+
+	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
+	reply_buf = kmalloc(size, GFP_KERNEL);
+	if (!reply_buf)
+		goto out;
+
+	ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
+				"rbd", "dir_get_name",
+				image_id, image_id_size,
+				(char *) reply_buf, size,
+				CEPH_OSD_FLAG_READ, NULL);
+	if (ret < 0)
+		goto out;
+	p = reply_buf;
+	end = (char *) reply_buf + size;
+	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
+	if (IS_ERR(image_name))
+		image_name = NULL;
+	else
+		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
+out:
+	kfree(reply_buf);
+	kfree(image_id);
+
+	return image_name;
+}
+
+/*
+ * When a parent image gets probed, we only have the pool, image,
+ * and snapshot ids but not the names of any of them.  This call
+ * is made later to fill in those names.  It has to be done after
+ * rbd_dev_snaps_update() has completed because some of the
+ * information (in particular, snapshot name) is not available
+ * until then.
+ */
+static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc;
+	const char *name;
+	void *reply_buf = NULL;
+	int ret;
+
+	if (rbd_dev->spec->pool_name)
+		return 0;	/* Already have the names */
+
+	/* Look up the pool name */
+
+	osdc = &rbd_dev->rbd_client->client->osdc;
+	name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
+	if (!name)
+		return -EIO;	/* pool id too large (>= 2^31) */
+
+	rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
+	if (!rbd_dev->spec->pool_name)
+		return -ENOMEM;
+
+	/* Fetch the image name; tolerate failure here */
+
+	name = rbd_dev_image_name(rbd_dev);
+	if (name) {
+		rbd_dev->spec->image_name_len = strlen(name);
+		rbd_dev->spec->image_name = (char *) name;
+	} else {
+		pr_warning(RBD_DRV_NAME "%d "
+			"unable to get image name for image id %s\n",
+			rbd_dev->major, rbd_dev->spec->image_id);
+	}
+
+	/* Look up the snapshot name. */
+
+	name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
+	if (!name) {
+		ret = -EIO;
+		goto out_err;
+	}
+	rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
+	if(!rbd_dev->spec->snap_name)
+		goto out_err;
+
+	return 0;
+out_err:
+	kfree(reply_buf);
+	kfree(rbd_dev->spec->pool_name);
+	rbd_dev->spec->pool_name = NULL;
+
+	return ret;
+}
+
 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
 {
 	size_t size;
@@ -2328,7 +2727,6 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
 	int ret;
 	void *p;
 	void *end;
-	size_t snap_name_len;
 	char *snap_name;
 
 	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
@@ -2348,9 +2746,7 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
 
 	p = reply_buf;
 	end = (char *) reply_buf + size;
-	snap_name_len = 0;
-	snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
-				GFP_KERNEL);
+	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
 	if (IS_ERR(snap_name)) {
 		ret = PTR_ERR(snap_name);
 		goto out;
@@ -2397,6 +2793,41 @@ static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
 	return ERR_PTR(-EINVAL);
 }
 
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
+{
+	int ret;
+	__u8 obj_order;
+
+	down_write(&rbd_dev->header_rwsem);
+
+	/* Grab old order first, to see if it changes */
+
+	obj_order = rbd_dev->header.obj_order,
+	ret = rbd_dev_v2_image_size(rbd_dev);
+	if (ret)
+		goto out;
+	if (rbd_dev->header.obj_order != obj_order) {
+		ret = -EIO;
+		goto out;
+	}
+	rbd_update_mapping_size(rbd_dev);
+
+	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
+	dout("rbd_dev_v2_snap_context returned %d\n", ret);
+	if (ret)
+		goto out;
+	ret = rbd_dev_snaps_update(rbd_dev);
+	dout("rbd_dev_snaps_update returned %d\n", ret);
+	if (ret)
+		goto out;
+	ret = rbd_dev_snaps_register(rbd_dev);
+	dout("rbd_dev_snaps_register returned %d\n", ret);
+out:
+	up_write(&rbd_dev->header_rwsem);
+
+	return ret;
+}
+
 /*
  * Scan the rbd device's current snapshot list and compare it to the
  * newly-received snapshot context.  Remove any existing snapshots
@@ -2436,12 +2867,12 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
 
 			/* Existing snapshot not in the new snap context */
 
-			if (rbd_dev->mapping.snap_id == snap->id)
-				rbd_dev->mapping.snap_exists = false;
-			__rbd_remove_snap_dev(snap);
+			if (rbd_dev->spec->snap_id == snap->id)
+				rbd_dev->exists = false;
+			rbd_remove_snap_dev(snap);
 			dout("%ssnap id %llu has been removed\n",
-				rbd_dev->mapping.snap_id == snap->id ?
-								"mapped " : "",
+				rbd_dev->spec->snap_id == snap->id ?
+							"mapped " : "",
 				(unsigned long long) snap->id);
 
 			/* Done with this list entry; advance */
@@ -2559,7 +2990,7 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
 	do {
 		ret = rbd_req_sync_watch(rbd_dev);
 		if (ret == -ERANGE) {
-			rc = rbd_refresh_header(rbd_dev, NULL);
+			rc = rbd_dev_refresh(rbd_dev, NULL);
 			if (rc < 0)
 				return rc;
 		}
@@ -2621,8 +3052,8 @@ static void rbd_dev_id_put(struct rbd_device *rbd_dev)
 		struct rbd_device *rbd_dev;
 
 		rbd_dev = list_entry(tmp, struct rbd_device, node);
-		if (rbd_id > max_id)
-			max_id = rbd_id;
+		if (rbd_dev->dev_id > max_id)
+			max_id = rbd_dev->dev_id;
 	}
 	spin_unlock(&rbd_dev_list_lock);
 
@@ -2722,73 +3153,140 @@ static inline char *dup_token(const char **buf, size_t *lenp)
 }
 
 /*
- * This fills in the pool_name, image_name, image_name_len, rbd_dev,
- * rbd_md_name, and name fields of the given rbd_dev, based on the
- * list of monitor addresses and other options provided via
- * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
- * copy of the snapshot name to map if successful, or a
- * pointer-coded error otherwise.
+ * Parse the options provided for an "rbd add" (i.e., rbd image
+ * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
+ * and the data written is passed here via a NUL-terminated buffer.
+ * Returns 0 if successful or an error code otherwise.
+ *
+ * The information extracted from these options is recorded in
+ * the other parameters which return dynamically-allocated
+ * structures:
+ *  ceph_opts
+ *      The address of a pointer that will refer to a ceph options
+ *      structure.  Caller must release the returned pointer using
+ *      ceph_destroy_options() when it is no longer needed.
+ *  rbd_opts
+ *	Address of an rbd options pointer.  Fully initialized by
+ *	this function; caller must release with kfree().
+ *  spec
+ *	Address of an rbd image specification pointer.  Fully
+ *	initialized by this function based on parsed options.
+ *	Caller must release with rbd_spec_put().
  *
- * Note: rbd_dev is assumed to have been initially zero-filled.
+ * The options passed take this form:
+ *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
+ * where:
+ *  <mon_addrs>
+ *      A comma-separated list of one or more monitor addresses.
+ *      A monitor address is an ip address, optionally followed
+ *      by a port number (separated by a colon).
+ *        I.e.:  ip1[:port1][,ip2[:port2]...]
+ *  <options>
+ *      A comma-separated list of ceph and/or rbd options.
+ *  <pool_name>
+ *      The name of the rados pool containing the rbd image.
+ *  <image_name>
+ *      The name of the image in that pool to map.
+ *  <snap_id>
+ *      An optional snapshot id.  If provided, the mapping will
+ *      present data from the image at the time that snapshot was
+ *      created.  The image head is used if no snapshot id is
+ *      provided.  Snapshot mappings are always read-only.
  */
-static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
-				const char *buf,
-				const char **mon_addrs,
-				size_t *mon_addrs_size,
-				char *options,
-				size_t options_size)
+static int rbd_add_parse_args(const char *buf,
+				struct ceph_options **ceph_opts,
+				struct rbd_options **opts,
+				struct rbd_spec **rbd_spec)
 {
 	size_t len;
-	char *err_ptr = ERR_PTR(-EINVAL);
-	char *snap_name;
+	char *options;
+	const char *mon_addrs;
+	size_t mon_addrs_size;
+	struct rbd_spec *spec = NULL;
+	struct rbd_options *rbd_opts = NULL;
+	struct ceph_options *copts;
+	int ret;
 
 	/* The first four tokens are required */
 
 	len = next_token(&buf);
 	if (!len)
-		return err_ptr;
-	*mon_addrs_size = len + 1;
-	*mon_addrs = buf;
-
+		return -EINVAL;	/* Missing monitor address(es) */
+	mon_addrs = buf;
+	mon_addrs_size = len + 1;
 	buf += len;
 
-	len = copy_token(&buf, options, options_size);
-	if (!len || len >= options_size)
-		return err_ptr;
+	ret = -EINVAL;
+	options = dup_token(&buf, NULL);
+	if (!options)
+		return -ENOMEM;
+	if (!*options)
+		goto out_err;	/* Missing options */
 
-	err_ptr = ERR_PTR(-ENOMEM);
-	rbd_dev->pool_name = dup_token(&buf, NULL);
-	if (!rbd_dev->pool_name)
-		goto out_err;
+	spec = rbd_spec_alloc();
+	if (!spec)
+		goto out_mem;
 
-	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
-	if (!rbd_dev->image_name)
-		goto out_err;
+	spec->pool_name = dup_token(&buf, NULL);
+	if (!spec->pool_name)
+		goto out_mem;
+	if (!*spec->pool_name)
+		goto out_err;	/* Missing pool name */
 
-	/* Snapshot name is optional */
+	spec->image_name = dup_token(&buf, &spec->image_name_len);
+	if (!spec->image_name)
+		goto out_mem;
+	if (!*spec->image_name)
+		goto out_err;	/* Missing image name */
+
+	/*
+	 * Snapshot name is optional; default is to use "-"
+	 * (indicating the head/no snapshot).
+	 */
 	len = next_token(&buf);
 	if (!len) {
 		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
 		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
-	}
-	snap_name = kmalloc(len + 1, GFP_KERNEL);
-	if (!snap_name)
+	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
+		ret = -ENAMETOOLONG;
 		goto out_err;
-	memcpy(snap_name, buf, len);
-	*(snap_name + len) = '\0';
+	}
+	spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
+	if (!spec->snap_name)
+		goto out_mem;
+	memcpy(spec->snap_name, buf, len);
+	*(spec->snap_name + len) = '\0';
 
-dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
+	/* Initialize all rbd options to the defaults */
 
-	return snap_name;
+	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
+	if (!rbd_opts)
+		goto out_mem;
+
+	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
+
+	copts = ceph_parse_options(options, mon_addrs,
+					mon_addrs + mon_addrs_size - 1,
+					parse_rbd_opts_token, rbd_opts);
+	if (IS_ERR(copts)) {
+		ret = PTR_ERR(copts);
+		goto out_err;
+	}
+	kfree(options);
 
+	*ceph_opts = copts;
+	*opts = rbd_opts;
+	*rbd_spec = spec;
+
+	return 0;
+out_mem:
+	ret = -ENOMEM;
 out_err:
-	kfree(rbd_dev->image_name);
-	rbd_dev->image_name = NULL;
-	rbd_dev->image_name_len = 0;
-	kfree(rbd_dev->pool_name);
-	rbd_dev->pool_name = NULL;
+	kfree(rbd_opts);
+	rbd_spec_put(spec);
+	kfree(options);
 
-	return err_ptr;
+	return ret;
 }
 
 /*
@@ -2813,15 +3311,23 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 	void *response;
 	void *p;
 
+	/*
+	 * When probing a parent image, the image id is already
+	 * known (and the image name likely is not).  There's no
+	 * need to fetch the image id again in this case.
+	 */
+	if (rbd_dev->spec->image_id)
+		return 0;
+
 	/*
 	 * First, see if the format 2 image id file exists, and if
 	 * so, get the image's persistent id from it.
 	 */
-	size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
+	size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
 	object_name = kmalloc(size, GFP_NOIO);
 	if (!object_name)
 		return -ENOMEM;
-	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
+	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
 	dout("rbd id object name is %s\n", object_name);
 
 	/* Response will be an encoded string, which includes a length */
@@ -2841,17 +3347,18 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
 	if (ret < 0)
 		goto out;
+	ret = 0;    /* rbd_req_sync_exec() can return positive */
 
 	p = response;
-	rbd_dev->image_id = ceph_extract_encoded_string(&p,
+	rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
 						p + RBD_IMAGE_ID_LEN_MAX,
-						&rbd_dev->image_id_len,
+						&rbd_dev->spec->image_id_len,
 						GFP_NOIO);
-	if (IS_ERR(rbd_dev->image_id)) {
-		ret = PTR_ERR(rbd_dev->image_id);
-		rbd_dev->image_id = NULL;
+	if (IS_ERR(rbd_dev->spec->image_id)) {
+		ret = PTR_ERR(rbd_dev->spec->image_id);
+		rbd_dev->spec->image_id = NULL;
 	} else {
-		dout("image_id is %s\n", rbd_dev->image_id);
+		dout("image_id is %s\n", rbd_dev->spec->image_id);
 	}
 out:
 	kfree(response);
@@ -2867,26 +3374,33 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
 
 	/* Version 1 images have no id; empty string is used */
 
-	rbd_dev->image_id = kstrdup("", GFP_KERNEL);
-	if (!rbd_dev->image_id)
+	rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
+	if (!rbd_dev->spec->image_id)
 		return -ENOMEM;
-	rbd_dev->image_id_len = 0;
+	rbd_dev->spec->image_id_len = 0;
 
 	/* Record the header object name for this rbd image. */
 
-	size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
+	size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
 	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
 	if (!rbd_dev->header_name) {
 		ret = -ENOMEM;
 		goto out_err;
 	}
-	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
+	sprintf(rbd_dev->header_name, "%s%s",
+		rbd_dev->spec->image_name, RBD_SUFFIX);
 
 	/* Populate rbd image metadata */
 
 	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
 	if (ret < 0)
 		goto out_err;
+
+	/* Version 1 images have no parent (no layering) */
+
+	rbd_dev->parent_spec = NULL;
+	rbd_dev->parent_overlap = 0;
+
 	rbd_dev->image_format = 1;
 
 	dout("discovered version 1 image, header name is %s\n",
@@ -2897,8 +3411,8 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
 out_err:
 	kfree(rbd_dev->header_name);
 	rbd_dev->header_name = NULL;
-	kfree(rbd_dev->image_id);
-	rbd_dev->image_id = NULL;
+	kfree(rbd_dev->spec->image_id);
+	rbd_dev->spec->image_id = NULL;
 
 	return ret;
 }
@@ -2913,12 +3427,12 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
 	 * Image id was filled in by the caller.  Record the header
 	 * object name for this rbd image.
 	 */
-	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
+	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
 	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
 	if (!rbd_dev->header_name)
 		return -ENOMEM;
 	sprintf(rbd_dev->header_name, "%s%s",
-			RBD_HEADER_PREFIX, rbd_dev->image_id);
+			RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
 
 	/* Get the size and object order for the image */
 
@@ -2932,12 +3446,20 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
 	if (ret < 0)
 		goto out_err;
 
-	/* Get the features for the image */
+	/* Get the and check features for the image */
 
 	ret = rbd_dev_v2_features(rbd_dev);
 	if (ret < 0)
 		goto out_err;
 
+	/* If the image supports layering, get the parent info */
+
+	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
+		ret = rbd_dev_v2_parent_info(rbd_dev);
+		if (ret < 0)
+			goto out_err;
+	}
+
 	/* crypto and compression type aren't (yet) supported for v2 images */
 
 	rbd_dev->header.crypt_type = 0;
@@ -2955,8 +3477,11 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
 	dout("discovered version 2 image, header name is %s\n",
 		rbd_dev->header_name);
 
-	return -ENOTSUPP;
+	return 0;
 out_err:
+	rbd_dev->parent_overlap = 0;
+	rbd_spec_put(rbd_dev->parent_spec);
+	rbd_dev->parent_spec = NULL;
 	kfree(rbd_dev->header_name);
 	rbd_dev->header_name = NULL;
 	kfree(rbd_dev->header.object_prefix);
@@ -2965,91 +3490,22 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
 	return ret;
 }
 
-/*
- * Probe for the existence of the header object for the given rbd
- * device.  For format 2 images this includes determining the image
- * id.
- */
-static int rbd_dev_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
 {
 	int ret;
 
-	/*
-	 * Get the id from the image id object.  If it's not a
-	 * format 2 image, we'll get ENOENT back, and we'll assume
-	 * it's a format 1 image.
-	 */
-	ret = rbd_dev_image_id(rbd_dev);
-	if (ret)
-		ret = rbd_dev_v1_probe(rbd_dev);
-	else
-		ret = rbd_dev_v2_probe(rbd_dev);
+	/* no need to lock here, as rbd_dev is not registered yet */
+	ret = rbd_dev_snaps_update(rbd_dev);
 	if (ret)
-		dout("probe failed, returning %d\n", ret);
-
-	return ret;
-}
-
-static ssize_t rbd_add(struct bus_type *bus,
-		       const char *buf,
-		       size_t count)
-{
-	char *options;
-	struct rbd_device *rbd_dev = NULL;
-	const char *mon_addrs = NULL;
-	size_t mon_addrs_size = 0;
-	struct ceph_osd_client *osdc;
-	int rc = -ENOMEM;
-	char *snap_name;
-
-	if (!try_module_get(THIS_MODULE))
-		return -ENODEV;
-
-	options = kmalloc(count, GFP_KERNEL);
-	if (!options)
-		goto err_out_mem;
-	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
-	if (!rbd_dev)
-		goto err_out_mem;
-
-	/* static rbd_device initialization */
-	spin_lock_init(&rbd_dev->lock);
-	INIT_LIST_HEAD(&rbd_dev->node);
-	INIT_LIST_HEAD(&rbd_dev->snaps);
-	init_rwsem(&rbd_dev->header_rwsem);
-
-	/* parse add command */
-	snap_name = rbd_add_parse_args(rbd_dev, buf,
-				&mon_addrs, &mon_addrs_size, options, count);
-	if (IS_ERR(snap_name)) {
-		rc = PTR_ERR(snap_name);
-		goto err_out_mem;
-	}
-
-	rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
-	if (rc < 0)
-		goto err_out_args;
-
-	/* pick the pool */
-	osdc = &rbd_dev->rbd_client->client->osdc;
-	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
-	if (rc < 0)
-		goto err_out_client;
-	rbd_dev->pool_id = rc;
-
-	rc = rbd_dev_probe(rbd_dev);
-	if (rc < 0)
-		goto err_out_client;
-	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+		return ret;
 
-	/* no need to lock here, as rbd_dev is not registered yet */
-	rc = rbd_dev_snaps_update(rbd_dev);
-	if (rc)
-		goto err_out_header;
+	ret = rbd_dev_probe_update_spec(rbd_dev);
+	if (ret)
+		goto err_out_snaps;
 
-	rc = rbd_dev_set_mapping(rbd_dev, snap_name);
-	if (rc)
-		goto err_out_header;
+	ret = rbd_dev_set_mapping(rbd_dev);
+	if (ret)
+		goto err_out_snaps;
 
 	/* generate unique id: find highest unique id, add one */
 	rbd_dev_id_get(rbd_dev);
@@ -3061,34 +3517,33 @@ static ssize_t rbd_add(struct bus_type *bus,
 
 	/* Get our block major device number. */
 
-	rc = register_blkdev(0, rbd_dev->name);
-	if (rc < 0)
+	ret = register_blkdev(0, rbd_dev->name);
+	if (ret < 0)
 		goto err_out_id;
-	rbd_dev->major = rc;
+	rbd_dev->major = ret;
 
 	/* Set up the blkdev mapping. */
 
-	rc = rbd_init_disk(rbd_dev);
-	if (rc)
+	ret = rbd_init_disk(rbd_dev);
+	if (ret)
 		goto err_out_blkdev;
 
-	rc = rbd_bus_add_dev(rbd_dev);
-	if (rc)
+	ret = rbd_bus_add_dev(rbd_dev);
+	if (ret)
 		goto err_out_disk;
 
 	/*
 	 * At this point cleanup in the event of an error is the job
 	 * of the sysfs code (initiated by rbd_bus_del_dev()).
 	 */
-
 	down_write(&rbd_dev->header_rwsem);
-	rc = rbd_dev_snaps_register(rbd_dev);
+	ret = rbd_dev_snaps_register(rbd_dev);
 	up_write(&rbd_dev->header_rwsem);
-	if (rc)
+	if (ret)
 		goto err_out_bus;
 
-	rc = rbd_init_watch_dev(rbd_dev);
-	if (rc)
+	ret = rbd_init_watch_dev(rbd_dev);
+	if (ret)
 		goto err_out_bus;
 
 	/* Everything's ready.  Announce the disk to the world. */
@@ -3098,37 +3553,119 @@ static ssize_t rbd_add(struct bus_type *bus,
 	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
 		(unsigned long long) rbd_dev->mapping.size);
 
-	return count;
-
+	return ret;
 err_out_bus:
 	/* this will also clean up rest of rbd_dev stuff */
 
 	rbd_bus_del_dev(rbd_dev);
-	kfree(options);
-	return rc;
 
+	return ret;
 err_out_disk:
 	rbd_free_disk(rbd_dev);
 err_out_blkdev:
 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
 err_out_id:
 	rbd_dev_id_put(rbd_dev);
-err_out_header:
-	rbd_header_free(&rbd_dev->header);
+err_out_snaps:
+	rbd_remove_all_snaps(rbd_dev);
+
+	return ret;
+}
+
+/*
+ * Probe for the existence of the header object for the given rbd
+ * device.  For format 2 images this includes determining the image
+ * id.
+ */
+static int rbd_dev_probe(struct rbd_device *rbd_dev)
+{
+	int ret;
+
+	/*
+	 * Get the id from the image id object.  If it's not a
+	 * format 2 image, we'll get ENOENT back, and we'll assume
+	 * it's a format 1 image.
+	 */
+	ret = rbd_dev_image_id(rbd_dev);
+	if (ret)
+		ret = rbd_dev_v1_probe(rbd_dev);
+	else
+		ret = rbd_dev_v2_probe(rbd_dev);
+	if (ret) {
+		dout("probe failed, returning %d\n", ret);
+
+		return ret;
+	}
+
+	ret = rbd_dev_probe_finish(rbd_dev);
+	if (ret)
+		rbd_header_free(&rbd_dev->header);
+
+	return ret;
+}
+
+static ssize_t rbd_add(struct bus_type *bus,
+		       const char *buf,
+		       size_t count)
+{
+	struct rbd_device *rbd_dev = NULL;
+	struct ceph_options *ceph_opts = NULL;
+	struct rbd_options *rbd_opts = NULL;
+	struct rbd_spec *spec = NULL;
+	struct rbd_client *rbdc;
+	struct ceph_osd_client *osdc;
+	int rc = -ENOMEM;
+
+	if (!try_module_get(THIS_MODULE))
+		return -ENODEV;
+
+	/* parse add command */
+	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
+	if (rc < 0)
+		goto err_out_module;
+
+	rbdc = rbd_get_client(ceph_opts);
+	if (IS_ERR(rbdc)) {
+		rc = PTR_ERR(rbdc);
+		goto err_out_args;
+	}
+	ceph_opts = NULL;	/* rbd_dev client now owns this */
+
+	/* pick the pool */
+	osdc = &rbdc->client->osdc;
+	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
+	if (rc < 0)
+		goto err_out_client;
+	spec->pool_id = (u64) rc;
+
+	rbd_dev = rbd_dev_create(rbdc, spec);
+	if (!rbd_dev)
+		goto err_out_client;
+	rbdc = NULL;		/* rbd_dev now owns this */
+	spec = NULL;		/* rbd_dev now owns this */
+
+	rbd_dev->mapping.read_only = rbd_opts->read_only;
+	kfree(rbd_opts);
+	rbd_opts = NULL;	/* done with this */
+
+	rc = rbd_dev_probe(rbd_dev);
+	if (rc < 0)
+		goto err_out_rbd_dev;
+
+	return count;
+err_out_rbd_dev:
+	rbd_dev_destroy(rbd_dev);
 err_out_client:
-	kfree(rbd_dev->header_name);
-	rbd_put_client(rbd_dev);
-	kfree(rbd_dev->image_id);
+	rbd_put_client(rbdc);
 err_out_args:
-	kfree(rbd_dev->mapping.snap_name);
-	kfree(rbd_dev->image_name);
-	kfree(rbd_dev->pool_name);
-err_out_mem:
-	kfree(rbd_dev);
-	kfree(options);
+	if (ceph_opts)
+		ceph_destroy_options(ceph_opts);
+	kfree(rbd_opts);
+	rbd_spec_put(spec);
+err_out_module:
+	module_put(THIS_MODULE);
 
 	dout("Error adding device %s\n", buf);
-	module_put(THIS_MODULE);
 
 	return (ssize_t) rc;
 }
@@ -3163,7 +3700,6 @@ static void rbd_dev_release(struct device *dev)
 	if (rbd_dev->watch_event)
 		rbd_req_sync_unwatch(rbd_dev);
 
-	rbd_put_client(rbd_dev);
 
 	/* clean up and free blkdev */
 	rbd_free_disk(rbd_dev);
@@ -3173,13 +3709,9 @@ static void rbd_dev_release(struct device *dev)
 	rbd_header_free(&rbd_dev->header);
 
 	/* done with the id, and with the rbd_dev */
-	kfree(rbd_dev->mapping.snap_name);
-	kfree(rbd_dev->image_id);
-	kfree(rbd_dev->header_name);
-	kfree(rbd_dev->pool_name);
-	kfree(rbd_dev->image_name);
 	rbd_dev_id_put(rbd_dev);
-	kfree(rbd_dev);
+	rbd_assert(rbd_dev->rbd_client != NULL);
+	rbd_dev_destroy(rbd_dev);
 
 	/* release module ref */
 	module_put(THIS_MODULE);
@@ -3211,7 +3743,12 @@ static ssize_t rbd_remove(struct bus_type *bus,
 		goto done;
 	}
 
-	__rbd_remove_all_snaps(rbd_dev);
+	if (rbd_dev->open_count) {
+		ret = -EBUSY;
+		goto done;
+	}
+
+	rbd_remove_all_snaps(rbd_dev);
 	rbd_bus_del_dev(rbd_dev);
 
 done:
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index cbe77fa105baace065c8714c348c3cf782391b78..49d77cbcf8bdb0601ac9db5f1ad80309a80cb32e 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -46,8 +46,6 @@
 #define RBD_MIN_OBJ_ORDER       16
 #define RBD_MAX_OBJ_ORDER       30
 
-#define RBD_MAX_SEG_NAME_LEN	128
-
 #define RBD_COMP_NONE		0
 #define RBD_CRYPT_NONE		0
 
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 6690269f5dde2c5617cff3238167788f93636985..064d1a68d2c1d561cc3cd2b4bf63b511b4a76430 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -267,6 +267,14 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 	kfree(req->r_pages);
 }
 
+static void ceph_unlock_page_vector(struct page **pages, int num_pages)
+{
+	int i;
+
+	for (i = 0; i < num_pages; i++)
+		unlock_page(pages[i]);
+}
+
 /*
  * start an async read(ahead) operation.  return nr_pages we submitted
  * a read for on success, or negative error code.
@@ -347,6 +355,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 	return nr_pages;
 
 out_pages:
+	ceph_unlock_page_vector(pages, nr_pages);
 	ceph_release_page_vector(pages, nr_pages);
 out:
 	ceph_osdc_put_request(req);
@@ -1078,23 +1087,51 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
 			    struct page **pagep, void **fsdata)
 {
 	struct inode *inode = file->f_dentry->d_inode;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_file_info *fi = file->private_data;
 	struct page *page;
 	pgoff_t index = pos >> PAGE_CACHE_SHIFT;
-	int r;
+	int r, want, got = 0;
+
+	if (fi->fmode & CEPH_FILE_MODE_LAZY)
+		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
+	else
+		want = CEPH_CAP_FILE_BUFFER;
+
+	dout("write_begin %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
+	     inode, ceph_vinop(inode), pos, len, inode->i_size);
+	r = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos+len);
+	if (r < 0)
+		return r;
+	dout("write_begin %p %llx.%llx %llu~%u  got cap refs on %s\n",
+	     inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
+	if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) {
+		ceph_put_cap_refs(ci, got);
+		return -EAGAIN;
+	}
 
 	do {
 		/* get a page */
 		page = grab_cache_page_write_begin(mapping, index, 0);
-		if (!page)
-			return -ENOMEM;
-		*pagep = page;
+		if (!page) {
+			r = -ENOMEM;
+			break;
+		}
 
 		dout("write_begin file %p inode %p page %p %d~%d\n", file,
 		     inode, page, (int)pos, (int)len);
 
 		r = ceph_update_writeable_page(file, pos, len, page);
+		if (r)
+			page_cache_release(page);
 	} while (r == -EAGAIN);
 
+	if (r) {
+		ceph_put_cap_refs(ci, got);
+	} else {
+		*pagep = page;
+		*(int *)fsdata = got;
+	}
 	return r;
 }
 
@@ -1108,10 +1145,12 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
 			  struct page *page, void *fsdata)
 {
 	struct inode *inode = file->f_dentry->d_inode;
+	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	unsigned from = pos & (PAGE_CACHE_SIZE - 1);
 	int check_cap = 0;
+	int got = (unsigned long)fsdata;
 
 	dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
 	     inode, page, (int)pos, (int)copied, (int)len);
@@ -1134,6 +1173,19 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
 	up_read(&mdsc->snap_rwsem);
 	page_cache_release(page);
 
+	if (copied > 0) {
+		int dirty;
+		spin_lock(&ci->i_ceph_lock);
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+		spin_unlock(&ci->i_ceph_lock);
+		if (dirty)
+			__mark_inode_dirty(inode, dirty);
+	}
+
+	dout("write_end %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
+	     inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
+	ceph_put_cap_refs(ci, got);
+
 	if (check_cap)
 		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
 
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 3251e9cc64014fc99f7660d46388427732c88114..a1d9bb30c1bf9fc5460286ff465e2bcbc338bf30 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -236,8 +236,10 @@ static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
 	if (!ctx) {
 		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
 		if (cap) {
+			spin_lock(&mdsc->caps_list_lock);
 			mdsc->caps_use_count++;
 			mdsc->caps_total_count++;
+			spin_unlock(&mdsc->caps_list_lock);
 		}
 		return cap;
 	}
@@ -1349,11 +1351,15 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
 		if (!ci->i_head_snapc)
 			ci->i_head_snapc = ceph_get_snap_context(
 				ci->i_snap_realm->cached_context);
-		dout(" inode %p now dirty snapc %p\n", &ci->vfs_inode,
-			ci->i_head_snapc);
+		dout(" inode %p now dirty snapc %p auth cap %p\n",
+		     &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
 		BUG_ON(!list_empty(&ci->i_dirty_item));
 		spin_lock(&mdsc->cap_dirty_lock);
-		list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
+		if (ci->i_auth_cap)
+			list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
+		else
+			list_add(&ci->i_dirty_item,
+				 &mdsc->cap_dirty_migrating);
 		spin_unlock(&mdsc->cap_dirty_lock);
 		if (ci->i_flushing_caps == 0) {
 			ihold(inode);
@@ -2388,7 +2394,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
 			    &atime);
 
 	/* max size increase? */
-	if (max_size != ci->i_max_size) {
+	if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
 		dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
 		ci->i_max_size = max_size;
 		if (max_size >= ci->i_wanted_max_size) {
@@ -2745,6 +2751,7 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
 
 	/* make sure we re-request max_size, if necessary */
 	spin_lock(&ci->i_ceph_lock);
+	ci->i_wanted_max_size = 0;  /* reset */
 	ci->i_requested_max_size = 0;
 	spin_unlock(&ci->i_ceph_lock);
 }
@@ -2840,8 +2847,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	case CEPH_CAP_OP_IMPORT:
 		handle_cap_import(mdsc, inode, h, session,
 				  snaptrace, snaptrace_len);
-		ceph_check_caps(ceph_inode(inode), 0, session);
-		goto done_unlocked;
 	}
 
 	/* the rest require a cap */
@@ -2858,6 +2863,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	switch (op) {
 	case CEPH_CAP_OP_REVOKE:
 	case CEPH_CAP_OP_GRANT:
+	case CEPH_CAP_OP_IMPORT:
 		handle_cap_grant(inode, h, session, cap, msg->middle);
 		goto done_unlocked;
 
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index d4dfdcf76d7fdfbff4c318e1e4f5d9a2487d0088..e51558fca3a346a93797561e26e148de037d971a 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -712,63 +712,53 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
 	struct ceph_osd_client *osdc =
 		&ceph_sb_to_client(inode->i_sb)->client->osdc;
 	loff_t endoff = pos + iov->iov_len;
-	int want, got = 0;
-	int ret, err;
+	int got = 0;
+	int ret, err, written;
 
 	if (ceph_snap(inode) != CEPH_NOSNAP)
 		return -EROFS;
 
 retry_snap:
+	written = 0;
 	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
 		return -ENOSPC;
 	__ceph_do_pending_vmtruncate(inode);
-	dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
-	     inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
-	     inode->i_size);
-	if (fi->fmode & CEPH_FILE_MODE_LAZY)
-		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
-	else
-		want = CEPH_CAP_FILE_BUFFER;
-	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
-	if (ret < 0)
-		goto out_put;
-
-	dout("aio_write %p %llx.%llx %llu~%u  got cap refs on %s\n",
-	     inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
-	     ceph_cap_string(got));
-
-	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
-	    (iocb->ki_filp->f_flags & O_DIRECT) ||
-	    (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
-	    (fi->flags & CEPH_F_SYNC)) {
-		ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
-			&iocb->ki_pos);
-	} else {
-		/*
-		 * buffered write; drop Fw early to avoid slow
-		 * revocation if we get stuck on balance_dirty_pages
-		 */
-		int dirty;
-
-		spin_lock(&ci->i_ceph_lock);
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
-		spin_unlock(&ci->i_ceph_lock);
-		ceph_put_cap_refs(ci, got);
 
+	/*
+	 * try to do a buffered write.  if we don't have sufficient
+	 * caps, we'll get -EAGAIN from generic_file_aio_write, or a
+	 * short write if we only get caps for some pages.
+	 */
+	if (!(iocb->ki_filp->f_flags & O_DIRECT) &&
+	    !(inode->i_sb->s_flags & MS_SYNCHRONOUS) &&
+	    !(fi->flags & CEPH_F_SYNC)) {
 		ret = generic_file_aio_write(iocb, iov, nr_segs, pos);
+		if (ret >= 0)
+			written = ret;
+
 		if ((ret >= 0 || ret == -EIOCBQUEUED) &&
 		    ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host)
 		     || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
-			err = vfs_fsync_range(file, pos, pos + ret - 1, 1);
+			err = vfs_fsync_range(file, pos, pos + written - 1, 1);
 			if (err < 0)
 				ret = err;
 		}
+		if ((ret < 0 && ret != -EAGAIN) || pos + written >= endoff)
+			goto out;
+	}
 
-		if (dirty)
-			__mark_inode_dirty(inode, dirty);
+	dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
+	     inode, ceph_vinop(inode), pos + written,
+	     (unsigned)iov->iov_len - written, inode->i_size);
+	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, 0, &got, endoff);
+	if (ret < 0)
 		goto out;
-	}
 
+	dout("aio_write %p %llx.%llx %llu~%u  got cap refs on %s\n",
+	     inode, ceph_vinop(inode), pos + written,
+	     (unsigned)iov->iov_len - written, ceph_cap_string(got));
+	ret = ceph_sync_write(file, iov->iov_base + written,
+			      iov->iov_len - written, &iocb->ki_pos);
 	if (ret >= 0) {
 		int dirty;
 		spin_lock(&ci->i_ceph_lock);
@@ -777,13 +767,10 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
 	}
-
-out_put:
 	dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
-	     inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
-	     ceph_cap_string(got));
+	     inode, ceph_vinop(inode), pos + written,
+	     (unsigned)iov->iov_len - written, ceph_cap_string(got));
 	ceph_put_cap_refs(ci, got);
-
 out:
 	if (ret == -EOLDSNAPC) {
 		dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index ba95eea201bf1ef8e809159d6b41019fb9169752..2971eaa65cdce9b3b31edca64ea979eeb814ab7a 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1466,7 +1466,7 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	u64 to;
-	int wrbuffer_refs, wake = 0;
+	int wrbuffer_refs, finish = 0;
 
 retry:
 	spin_lock(&ci->i_ceph_lock);
@@ -1498,15 +1498,18 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
 	truncate_inode_pages(inode->i_mapping, to);
 
 	spin_lock(&ci->i_ceph_lock);
-	ci->i_truncate_pending--;
-	if (ci->i_truncate_pending == 0)
-		wake = 1;
+	if (to == ci->i_truncate_size) {
+		ci->i_truncate_pending = 0;
+		finish = 1;
+	}
 	spin_unlock(&ci->i_ceph_lock);
+	if (!finish)
+		goto retry;
 
 	if (wrbuffer_refs == 0)
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
-	if (wake)
-		wake_up_all(&ci->i_cap_wq);
+
+	wake_up_all(&ci->i_cap_wq);
 }
 
 
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 1bcf712655d90a6001db4e8de8e86cfa3a70187d..9165eb8309eba442efa237aaa313a43c92641607 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1590,7 +1590,7 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
 	} else if (rpath || rino) {
 		*ino = rino;
 		*ppath = rpath;
-		*pathlen = strlen(rpath);
+		*pathlen = rpath ? strlen(rpath) : 0;
 		dout(" path %.*s\n", *pathlen, rpath);
 	}
 
@@ -1876,9 +1876,14 @@ static int __do_request(struct ceph_mds_client *mdsc,
 static void __wake_requests(struct ceph_mds_client *mdsc,
 			    struct list_head *head)
 {
-	struct ceph_mds_request *req, *nreq;
+	struct ceph_mds_request *req;
+	LIST_HEAD(tmp_list);
+
+	list_splice_init(head, &tmp_list);
 
-	list_for_each_entry_safe(req, nreq, head, r_wait) {
+	while (!list_empty(&tmp_list)) {
+		req = list_entry(tmp_list.next,
+				 struct ceph_mds_request, r_wait);
 		list_del_init(&req->r_wait);
 		__do_request(mdsc, req);
 	}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 2eb43f211325860a9d8c4eb870fa32c494568966..e86aa9948124b3601e1734515e4fb629627855de 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -403,8 +403,6 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
 		seq_printf(m, ",mount_timeout=%d", opt->mount_timeout);
 	if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT)
 		seq_printf(m, ",osd_idle_ttl=%d", opt->osd_idle_ttl);
-	if (opt->osd_timeout != CEPH_OSD_TIMEOUT_DEFAULT)
-		seq_printf(m, ",osdtimeout=%d", opt->osd_timeout);
 	if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT)
 		seq_printf(m, ",osdkeepalivetimeout=%d",
 			   opt->osd_keepalive_timeout);
@@ -849,7 +847,7 @@ static int ceph_register_bdi(struct super_block *sb,
 		fsc->backing_dev_info.ra_pages =
 			default_backing_dev_info.ra_pages;
 
-	err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%d",
+	err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%ld",
 			   atomic_long_inc_return(&bdi_seq));
 	if (!err)
 		sb->s_bdi = &fsc->backing_dev_info;
diff --git a/include/linux/backing-dev.h b/include/linux/backing-dev.h
index 2a9a9abc91260c09a7940136a965a08209c5828b..12731a19ef060792b5500c3731551b636c437fe8 100644
--- a/include/linux/backing-dev.h
+++ b/include/linux/backing-dev.h
@@ -114,6 +114,7 @@ struct backing_dev_info {
 int bdi_init(struct backing_dev_info *bdi);
 void bdi_destroy(struct backing_dev_info *bdi);
 
+__printf(3, 4)
 int bdi_register(struct backing_dev_info *bdi, struct device *parent,
 		const char *fmt, ...);
 int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev);
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 6470792b13d3c79734a9a3ae7321a9df691cec78..084d3c622b12a8ac58ad626da2e8f325d6d3d1c9 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -43,7 +43,6 @@ struct ceph_options {
 	struct ceph_entity_addr my_addr;
 	int mount_timeout;
 	int osd_idle_ttl;
-	int osd_timeout;
 	int osd_keepalive_timeout;
 
 	/*
@@ -63,7 +62,6 @@ struct ceph_options {
  * defaults
  */
 #define CEPH_MOUNT_TIMEOUT_DEFAULT  60
-#define CEPH_OSD_TIMEOUT_DEFAULT    60  /* seconds */
 #define CEPH_OSD_KEEPALIVE_DEFAULT  5
 #define CEPH_OSD_IDLE_TTL_DEFAULT    60
 
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index e37acbe989a906c8cdead48c6eea2e68f53e6ed8..10a417f9f76fa9ccd2e2fffbc27950dbc355df59 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -123,6 +123,7 @@ extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
 extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
 				struct ceph_pg pgid);
 
+extern const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id);
 extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name);
 
 #endif
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index de91fbdf127e64b343f3fc3530d6d7fb9bd4e59e..2c04afeead1cb3d776feea90e51f93731417b1a3 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -87,6 +87,8 @@ struct ceph_pg {
  *
  *  lpgp_num -- as above.
  */
+#define CEPH_NOPOOL  ((__u64) (-1))  /* pool id not defined */
+
 #define CEPH_PG_TYPE_REP     1
 #define CEPH_PG_TYPE_RAID4   2
 #define CEPH_PG_POOL_VERSION 2
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index a8020293f34210620eff72f6193ee35e064f5d69..ee71ea26777ae892fafd13a8338b3f556d9f427b 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -305,7 +305,6 @@ ceph_parse_options(char *options, const char *dev_name,
 
 	/* start with defaults */
 	opt->flags = CEPH_OPT_DEFAULT;
-	opt->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT;
 	opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
 	opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
 	opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;   /* seconds */
@@ -391,7 +390,7 @@ ceph_parse_options(char *options, const char *dev_name,
 
 			/* misc */
 		case Opt_osdtimeout:
-			opt->osd_timeout = intval;
+			pr_warning("ignoring deprecated osdtimeout option\n");
 			break;
 		case Opt_osdkeepalivetimeout:
 			opt->osd_keepalive_timeout = intval;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3ef1759403b411fe53595e2ddf1eb6314a4f9ef8..4d111fd2b4923f7fe7cd287de00029b19c61f274 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -2244,22 +2244,62 @@ static int try_read(struct ceph_connection *con)
 
 
 /*
- * Atomically queue work on a connection.  Bump @con reference to
- * avoid races with connection teardown.
+ * Atomically queue work on a connection after the specified delay.
+ * Bump @con reference to avoid races with connection teardown.
+ * Returns 0 if work was queued, or an error code otherwise.
  */
-static void queue_con(struct ceph_connection *con)
+static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
 {
 	if (!con->ops->get(con)) {
-		dout("queue_con %p ref count 0\n", con);
-		return;
+		dout("%s %p ref count 0\n", __func__, con);
+
+		return -ENOENT;
 	}
 
-	if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
-		dout("queue_con %p - already queued\n", con);
+	if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
+		dout("%s %p - already queued\n", __func__, con);
 		con->ops->put(con);
-	} else {
-		dout("queue_con %p\n", con);
+
+		return -EBUSY;
 	}
+
+	dout("%s %p %lu\n", __func__, con, delay);
+
+	return 0;
+}
+
+static void queue_con(struct ceph_connection *con)
+{
+	(void) queue_con_delay(con, 0);
+}
+
+static bool con_sock_closed(struct ceph_connection *con)
+{
+	if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags))
+		return false;
+
+#define CASE(x)								\
+	case CON_STATE_ ## x:						\
+		con->error_msg = "socket closed (con state " #x ")";	\
+		break;
+
+	switch (con->state) {
+	CASE(CLOSED);
+	CASE(PREOPEN);
+	CASE(CONNECTING);
+	CASE(NEGOTIATING);
+	CASE(OPEN);
+	CASE(STANDBY);
+	default:
+		pr_warning("%s con %p unrecognized state %lu\n",
+			__func__, con, con->state);
+		con->error_msg = "unrecognized con state";
+		BUG();
+		break;
+	}
+#undef CASE
+
+	return true;
 }
 
 /*
@@ -2273,35 +2313,16 @@ static void con_work(struct work_struct *work)
 
 	mutex_lock(&con->mutex);
 restart:
-	if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) {
-		switch (con->state) {
-		case CON_STATE_CONNECTING:
-			con->error_msg = "connection failed";
-			break;
-		case CON_STATE_NEGOTIATING:
-			con->error_msg = "negotiation failed";
-			break;
-		case CON_STATE_OPEN:
-			con->error_msg = "socket closed";
-			break;
-		default:
-			dout("unrecognized con state %d\n", (int)con->state);
-			con->error_msg = "unrecognized con state";
-			BUG();
-		}
+	if (con_sock_closed(con))
 		goto fault;
-	}
 
 	if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
 		dout("con_work %p backing off\n", con);
-		if (queue_delayed_work(ceph_msgr_wq, &con->work,
-				       round_jiffies_relative(con->delay))) {
-			dout("con_work %p backoff %lu\n", con, con->delay);
-			mutex_unlock(&con->mutex);
-			return;
-		} else {
+		ret = queue_con_delay(con, round_jiffies_relative(con->delay));
+		if (ret) {
 			dout("con_work %p FAILED to back off %lu\n", con,
 			     con->delay);
+			BUG_ON(ret == -ENOENT);
 			set_bit(CON_FLAG_BACKOFF, &con->flags);
 		}
 		goto done;
@@ -2356,7 +2377,7 @@ static void con_work(struct work_struct *work)
 static void ceph_fault(struct ceph_connection *con)
 	__releases(con->mutex)
 {
-	pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
+	pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
 	       ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
 	dout("fault %p state %lu to peer %s\n",
 	     con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
@@ -2398,24 +2419,8 @@ static void ceph_fault(struct ceph_connection *con)
 			con->delay = BASE_DELAY_INTERVAL;
 		else if (con->delay < MAX_DELAY_INTERVAL)
 			con->delay *= 2;
-		con->ops->get(con);
-		if (queue_delayed_work(ceph_msgr_wq, &con->work,
-				       round_jiffies_relative(con->delay))) {
-			dout("fault queued %p delay %lu\n", con, con->delay);
-		} else {
-			con->ops->put(con);
-			dout("fault failed to queue %p delay %lu, backoff\n",
-			     con, con->delay);
-			/*
-			 * In many cases we see a socket state change
-			 * while con_work is running and end up
-			 * queuing (non-delayed) work, such that we
-			 * can't backoff with a delay.  Set a flag so
-			 * that when con_work restarts we schedule the
-			 * delay then.
-			 */
-			set_bit(CON_FLAG_BACKOFF, &con->flags);
-		}
+		set_bit(CON_FLAG_BACKOFF, &con->flags);
+		queue_con(con);
 	}
 
 out_unlock:
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index c1d756cc74486178ed8d1f662d14172e3e59bb1c..780caf6b049188cd20e043f2206b23600223b820 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -221,6 +221,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	kref_init(&req->r_kref);
 	init_completion(&req->r_completion);
 	init_completion(&req->r_safe_completion);
+	RB_CLEAR_NODE(&req->r_node);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 	INIT_LIST_HEAD(&req->r_linger_item);
 	INIT_LIST_HEAD(&req->r_linger_osd);
@@ -580,7 +581,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
 
 	dout("__kick_osd_requests osd%d\n", osd->o_osd);
 	err = __reset_osd(osdc, osd);
-	if (err == -EAGAIN)
+	if (err)
 		return;
 
 	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
@@ -607,14 +608,6 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
 	}
 }
 
-static void kick_osd_requests(struct ceph_osd_client *osdc,
-			      struct ceph_osd *kickosd)
-{
-	mutex_lock(&osdc->request_mutex);
-	__kick_osd_requests(osdc, kickosd);
-	mutex_unlock(&osdc->request_mutex);
-}
-
 /*
  * If the osd connection drops, we need to resubmit all requests.
  */
@@ -628,7 +621,9 @@ static void osd_reset(struct ceph_connection *con)
 	dout("osd_reset osd%d\n", osd->o_osd);
 	osdc = osd->o_osdc;
 	down_read(&osdc->map_sem);
-	kick_osd_requests(osdc, osd);
+	mutex_lock(&osdc->request_mutex);
+	__kick_osd_requests(osdc, osd);
+	mutex_unlock(&osdc->request_mutex);
 	send_queued(osdc);
 	up_read(&osdc->map_sem);
 }
@@ -647,6 +642,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
 	atomic_set(&osd->o_ref, 1);
 	osd->o_osdc = osdc;
 	osd->o_osd = onum;
+	RB_CLEAR_NODE(&osd->o_node);
 	INIT_LIST_HEAD(&osd->o_requests);
 	INIT_LIST_HEAD(&osd->o_linger_requests);
 	INIT_LIST_HEAD(&osd->o_osd_lru);
@@ -750,6 +746,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 	if (list_empty(&osd->o_requests) &&
 	    list_empty(&osd->o_linger_requests)) {
 		__remove_osd(osdc, osd);
+		ret = -ENODEV;
 	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
 			  &osd->o_con.peer_addr,
 			  sizeof(osd->o_con.peer_addr)) == 0 &&
@@ -876,9 +873,9 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 			req->r_osd = NULL;
 	}
 
+	list_del_init(&req->r_req_lru_item);
 	ceph_osdc_put_request(req);
 
-	list_del_init(&req->r_req_lru_item);
 	if (osdc->num_requests == 0) {
 		dout(" no requests, canceling timeout\n");
 		__cancel_osd_timeout(osdc);
@@ -910,8 +907,8 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 					struct ceph_osd_request *req)
 {
 	dout("__unregister_linger_request %p\n", req);
+	list_del_init(&req->r_linger_item);
 	if (req->r_osd) {
-		list_del_init(&req->r_linger_item);
 		list_del_init(&req->r_linger_osd);
 
 		if (list_empty(&req->r_osd->o_requests) &&
@@ -1090,12 +1087,10 @@ static void handle_timeout(struct work_struct *work)
 {
 	struct ceph_osd_client *osdc =
 		container_of(work, struct ceph_osd_client, timeout_work.work);
-	struct ceph_osd_request *req, *last_req = NULL;
+	struct ceph_osd_request *req;
 	struct ceph_osd *osd;
-	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
 	unsigned long keepalive =
 		osdc->client->options->osd_keepalive_timeout * HZ;
-	unsigned long last_stamp = 0;
 	struct list_head slow_osds;
 	dout("timeout\n");
 	down_read(&osdc->map_sem);
@@ -1104,37 +1099,6 @@ static void handle_timeout(struct work_struct *work)
 
 	mutex_lock(&osdc->request_mutex);
 
-	/*
-	 * reset osds that appear to be _really_ unresponsive.  this
-	 * is a failsafe measure.. we really shouldn't be getting to
-	 * this point if the system is working properly.  the monitors
-	 * should mark the osd as failed and we should find out about
-	 * it from an updated osd map.
-	 */
-	while (timeout && !list_empty(&osdc->req_lru)) {
-		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
-				 r_req_lru_item);
-
-		/* hasn't been long enough since we sent it? */
-		if (time_before(jiffies, req->r_stamp + timeout))
-			break;
-
-		/* hasn't been long enough since it was acked? */
-		if (req->r_request->ack_stamp == 0 ||
-		    time_before(jiffies, req->r_request->ack_stamp + timeout))
-			break;
-
-		BUG_ON(req == last_req && req->r_stamp == last_stamp);
-		last_req = req;
-		last_stamp = req->r_stamp;
-
-		osd = req->r_osd;
-		BUG_ON(!osd);
-		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
-			   req->r_tid, osd->o_osd);
-		__kick_osd_requests(osdc, osd);
-	}
-
 	/*
 	 * ping osds that are a bit slow.  this ensures that if there
 	 * is a break in the TCP connection we will notice, and reopen
@@ -1364,8 +1328,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 
 		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
 		     req->r_osd ? req->r_osd->o_osd : -1);
-		__unregister_linger_request(osdc, req);
 		__register_request(osdc, req);
+		__unregister_linger_request(osdc, req);
 	}
 	mutex_unlock(&osdc->request_mutex);
 
@@ -1599,6 +1563,7 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
 	event->data = data;
 	event->osdc = osdc;
 	INIT_LIST_HEAD(&event->osd_node);
+	RB_CLEAR_NODE(&event->node);
 	kref_init(&event->kref);   /* one ref for us */
 	kref_get(&event->kref);    /* one ref for the caller */
 	init_completion(&event->completion);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 5433fb0eb3c68596b375388b3df74a03c6e8441c..de73214b5d26c04989905bafc62bd9c056f2131c 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -469,6 +469,22 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
 	return NULL;
 }
 
+const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
+{
+	struct ceph_pg_pool_info *pi;
+
+	if (id == CEPH_NOPOOL)
+		return NULL;
+
+	if (WARN_ON_ONCE(id > (u64) INT_MAX))
+		return NULL;
+
+	pi = __lookup_pg_pool(&map->pg_pools, (int) id);
+
+	return pi ? pi->name : NULL;
+}
+EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
+
 int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
 {
 	struct rb_node *rbp;
@@ -645,10 +661,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
 	ceph_decode_32_safe(p, end, max, bad);
 	while (max--) {
 		ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad);
+		err = -ENOMEM;
 		pi = kzalloc(sizeof(*pi), GFP_NOFS);
 		if (!pi)
 			goto bad;
 		pi->id = ceph_decode_32(p);
+		err = -EINVAL;
 		ev = ceph_decode_8(p); /* encoding version */
 		if (ev > CEPH_PG_POOL_VERSION) {
 			pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
@@ -664,8 +682,13 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
 		__insert_pg_pool(&map->pg_pools, pi);
 	}
 
-	if (version >= 5 && __decode_pool_names(p, end, map) < 0)
-		goto bad;
+	if (version >= 5) {
+		err = __decode_pool_names(p, end, map);
+		if (err < 0) {
+			dout("fail to decode pool names");
+			goto bad;
+		}
+	}
 
 	ceph_decode_32_safe(p, end, map->pool_max, bad);
 
@@ -745,7 +768,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
 	return map;
 
 bad:
-	dout("osdmap_decode fail\n");
+	dout("osdmap_decode fail err %d\n", err);
 	ceph_osdmap_destroy(map);
 	return ERR_PTR(err);
 }
@@ -839,6 +862,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 		if (ev > CEPH_PG_POOL_VERSION) {
 			pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
 				   ev, CEPH_PG_POOL_VERSION);
+			err = -EINVAL;
 			goto bad;
 		}
 		pi = __lookup_pg_pool(&map->pg_pools, pool);
@@ -855,8 +879,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 		if (err < 0)
 			goto bad;
 	}
-	if (version >= 5 && __decode_pool_names(p, end, map) < 0)
-		goto bad;
+	if (version >= 5) {
+		err = __decode_pool_names(p, end, map);
+		if (err < 0)
+			goto bad;
+	}
 
 	/* old_pool */
 	ceph_decode_32_safe(p, end, len, bad);
@@ -932,15 +959,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 			(void) __remove_pg_mapping(&map->pg_temp, pgid);
 
 			/* insert */
-			if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) {
-				err = -EINVAL;
+			err = -EINVAL;
+			if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
 				goto bad;
-			}
+			err = -ENOMEM;
 			pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
-			if (!pg) {
-				err = -ENOMEM;
+			if (!pg)
 				goto bad;
-			}
 			pg->pgid = pgid;
 			pg->len = pglen;
 			for (j = 0; j < pglen; j++)