diff --git a/fs/fuse/cuse.c b/fs/fuse/cuse.c
index 6f96a8def14766be9fa173e26f12583dc3f04697..b3aaf7b3578bb068aee5df3a35ced44976d33bfc 100644
--- a/fs/fuse/cuse.c
+++ b/fs/fuse/cuse.c
@@ -92,8 +92,9 @@ static ssize_t cuse_read(struct file *file, char __user *buf, size_t count,
 {
 	loff_t pos = 0;
 	struct iovec iov = { .iov_base = buf, .iov_len = count };
+	struct fuse_io_priv io = { .async = 0, .file = file };
 
-	return fuse_direct_io(file, &iov, 1, count, &pos, 0);
+	return fuse_direct_io(&io, &iov, 1, count, &pos, 0);
 }
 
 static ssize_t cuse_write(struct file *file, const char __user *buf,
@@ -101,12 +102,13 @@ static ssize_t cuse_write(struct file *file, const char __user *buf,
 {
 	loff_t pos = 0;
 	struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count };
+	struct fuse_io_priv io = { .async = 0, .file = file };
 
 	/*
 	 * No locking or generic_write_checks(), the server is
 	 * responsible for locking and sanity checks.
 	 */
-	return fuse_direct_io(file, &iov, 1, count, &pos, 1);
+	return fuse_direct_io(&io, &iov, 1, count, &pos, 1);
 }
 
 static int cuse_open(struct inode *inode, struct file *file)
@@ -422,7 +424,7 @@ static int cuse_send_init(struct cuse_conn *cc)
 
 	BUILD_BUG_ON(CUSE_INIT_INFO_MAX > PAGE_SIZE);
 
-	req = fuse_get_req(fc, 1);
+	req = fuse_get_req_for_background(fc, 1);
 	if (IS_ERR(req)) {
 		rc = PTR_ERR(req);
 		goto err;
@@ -504,7 +506,7 @@ static int cuse_channel_open(struct inode *inode, struct file *file)
 	cc->fc.release = cuse_fc_release;
 
 	cc->fc.connected = 1;
-	cc->fc.blocked = 0;
+	cc->fc.initialized = 1;
 	rc = cuse_send_init(cc);
 	if (rc) {
 		fuse_conn_put(&cc->fc);
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 9bfd1a3214e663fb579ad3523f0f22eb082e5926..a6c1664e330b0e8610eb3faf3d14df6c8ed3eec2 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -111,7 +111,7 @@ static void restore_sigs(sigset_t *oldset)
 	sigprocmask(SIG_SETMASK, oldset, NULL);
 }
 
-static void __fuse_get_request(struct fuse_req *req)
+void __fuse_get_request(struct fuse_req *req)
 {
 	atomic_inc(&req->count);
 }
@@ -130,20 +130,30 @@ static void fuse_req_init_context(struct fuse_req *req)
 	req->in.h.pid = current->pid;
 }
 
-struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages)
+static bool fuse_block_alloc(struct fuse_conn *fc, bool for_background)
+{
+	return !fc->initialized || (for_background && fc->blocked);
+}
+
+static struct fuse_req *__fuse_get_req(struct fuse_conn *fc, unsigned npages,
+				       bool for_background)
 {
 	struct fuse_req *req;
-	sigset_t oldset;
-	int intr;
 	int err;
-
 	atomic_inc(&fc->num_waiting);
-	block_sigs(&oldset);
-	intr = wait_event_interruptible(fc->blocked_waitq, !fc->blocked);
-	restore_sigs(&oldset);
-	err = -EINTR;
-	if (intr)
-		goto out;
+
+	if (fuse_block_alloc(fc, for_background)) {
+		sigset_t oldset;
+		int intr;
+
+		block_sigs(&oldset);
+		intr = wait_event_interruptible_exclusive(fc->blocked_waitq,
+				!fuse_block_alloc(fc, for_background));
+		restore_sigs(&oldset);
+		err = -EINTR;
+		if (intr)
+			goto out;
+	}
 
 	err = -ENOTCONN;
 	if (!fc->connected)
@@ -151,19 +161,35 @@ struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages)
 
 	req = fuse_request_alloc(npages);
 	err = -ENOMEM;
-	if (!req)
+	if (!req) {
+		if (for_background)
+			wake_up(&fc->blocked_waitq);
 		goto out;
+	}
 
 	fuse_req_init_context(req);
 	req->waiting = 1;
+	req->background = for_background;
 	return req;
 
  out:
 	atomic_dec(&fc->num_waiting);
 	return ERR_PTR(err);
 }
+
+struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages)
+{
+	return __fuse_get_req(fc, npages, false);
+}
 EXPORT_SYMBOL_GPL(fuse_get_req);
 
+struct fuse_req *fuse_get_req_for_background(struct fuse_conn *fc,
+					     unsigned npages)
+{
+	return __fuse_get_req(fc, npages, true);
+}
+EXPORT_SYMBOL_GPL(fuse_get_req_for_background);
+
 /*
  * Return request in fuse_file->reserved_req.  However that may
  * currently be in use.  If that is the case, wait for it to become
@@ -225,19 +251,31 @@ struct fuse_req *fuse_get_req_nofail_nopages(struct fuse_conn *fc,
 	struct fuse_req *req;
 
 	atomic_inc(&fc->num_waiting);
-	wait_event(fc->blocked_waitq, !fc->blocked);
+	wait_event(fc->blocked_waitq, fc->initialized);
 	req = fuse_request_alloc(0);
 	if (!req)
 		req = get_reserved_req(fc, file);
 
 	fuse_req_init_context(req);
 	req->waiting = 1;
+	req->background = 0;
 	return req;
 }
 
 void fuse_put_request(struct fuse_conn *fc, struct fuse_req *req)
 {
 	if (atomic_dec_and_test(&req->count)) {
+		if (unlikely(req->background)) {
+			/*
+			 * We get here in the unlikely case that a background
+			 * request was allocated but not sent
+			 */
+			spin_lock(&fc->lock);
+			if (!fc->blocked)
+				wake_up(&fc->blocked_waitq);
+			spin_unlock(&fc->lock);
+		}
+
 		if (req->waiting)
 			atomic_dec(&fc->num_waiting);
 
@@ -335,10 +373,15 @@ __releases(fc->lock)
 	list_del(&req->intr_entry);
 	req->state = FUSE_REQ_FINISHED;
 	if (req->background) {
-		if (fc->num_background == fc->max_background) {
+		req->background = 0;
+
+		if (fc->num_background == fc->max_background)
 			fc->blocked = 0;
-			wake_up_all(&fc->blocked_waitq);
-		}
+
+		/* Wake up next waiter, if any */
+		if (!fc->blocked && waitqueue_active(&fc->blocked_waitq))
+			wake_up(&fc->blocked_waitq);
+
 		if (fc->num_background == fc->congestion_threshold &&
 		    fc->connected && fc->bdi_initialized) {
 			clear_bdi_congested(&fc->bdi, BLK_RW_SYNC);
@@ -442,6 +485,7 @@ __acquires(fc->lock)
 
 static void __fuse_request_send(struct fuse_conn *fc, struct fuse_req *req)
 {
+	BUG_ON(req->background);
 	spin_lock(&fc->lock);
 	if (!fc->connected)
 		req->out.h.error = -ENOTCONN;
@@ -469,7 +513,7 @@ EXPORT_SYMBOL_GPL(fuse_request_send);
 static void fuse_request_send_nowait_locked(struct fuse_conn *fc,
 					    struct fuse_req *req)
 {
-	req->background = 1;
+	BUG_ON(!req->background);
 	fc->num_background++;
 	if (fc->num_background == fc->max_background)
 		fc->blocked = 1;
@@ -2071,6 +2115,7 @@ void fuse_abort_conn(struct fuse_conn *fc)
 	if (fc->connected) {
 		fc->connected = 0;
 		fc->blocked = 0;
+		fc->initialized = 1;
 		end_io_requests(fc);
 		end_queued_requests(fc);
 		end_polls(fc);
@@ -2089,6 +2134,7 @@ int fuse_dev_release(struct inode *inode, struct file *file)
 		spin_lock(&fc->lock);
 		fc->connected = 0;
 		fc->blocked = 0;
+		fc->initialized = 1;
 		end_queued_requests(fc);
 		end_polls(fc);
 		wake_up_all(&fc->blocked_waitq);
diff --git a/fs/fuse/dir.c b/fs/fuse/dir.c
index ff15522481d4261c042b87107f90108dcd304d87..254df56b847b96d5104e62818c0f1c6f4803673c 100644
--- a/fs/fuse/dir.c
+++ b/fs/fuse/dir.c
@@ -1562,10 +1562,9 @@ void fuse_release_nowrite(struct inode *inode)
  * vmtruncate() doesn't allow for this case, so do the rlimit checking
  * and the actual truncation by hand.
  */
-static int fuse_do_setattr(struct dentry *entry, struct iattr *attr,
-			   struct file *file)
+int fuse_do_setattr(struct inode *inode, struct iattr *attr,
+		    struct file *file)
 {
-	struct inode *inode = entry->d_inode;
 	struct fuse_conn *fc = get_fuse_conn(inode);
 	struct fuse_req *req;
 	struct fuse_setattr_in inarg;
@@ -1574,9 +1573,6 @@ static int fuse_do_setattr(struct dentry *entry, struct iattr *attr,
 	loff_t oldsize;
 	int err;
 
-	if (!fuse_allow_current_process(fc))
-		return -EACCES;
-
 	if (!(fc->flags & FUSE_DEFAULT_PERMISSIONS))
 		attr->ia_valid |= ATTR_FORCE;
 
@@ -1671,10 +1667,15 @@ static int fuse_do_setattr(struct dentry *entry, struct iattr *attr,
 
 static int fuse_setattr(struct dentry *entry, struct iattr *attr)
 {
+	struct inode *inode = entry->d_inode;
+
+	if (!fuse_allow_current_process(get_fuse_conn(inode)))
+		return -EACCES;
+
 	if (attr->ia_valid & ATTR_FILE)
-		return fuse_do_setattr(entry, attr, attr->ia_file);
+		return fuse_do_setattr(inode, attr, attr->ia_file);
 	else
-		return fuse_do_setattr(entry, attr, NULL);
+		return fuse_do_setattr(inode, attr, NULL);
 }
 
 static int fuse_getattr(struct vfsmount *mnt, struct dentry *entry,
diff --git a/fs/fuse/file.c b/fs/fuse/file.c
index d15c6f21c17f6eefe1d90f16e77d59afe045394b..4655e59d545b88f7d1494652dd580968a1ff7572 100644
--- a/fs/fuse/file.c
+++ b/fs/fuse/file.c
@@ -126,11 +126,13 @@ static void fuse_file_put(struct fuse_file *ff, bool sync)
 		struct fuse_req *req = ff->reserved_req;
 
 		if (sync) {
+			req->background = 0;
 			fuse_request_send(ff->fc, req);
 			path_put(&req->misc.release.path);
 			fuse_put_request(ff->fc, req);
 		} else {
 			req->end = fuse_release_end;
+			req->background = 1;
 			fuse_request_send_background(ff->fc, req);
 		}
 		kfree(ff);
@@ -282,6 +284,7 @@ void fuse_sync_release(struct fuse_file *ff, int flags)
 	WARN_ON(atomic_read(&ff->count) > 1);
 	fuse_prepare_release(ff, flags, FUSE_RELEASE);
 	ff->reserved_req->force = 1;
+	ff->reserved_req->background = 0;
 	fuse_request_send(ff->fc, ff->reserved_req);
 	fuse_put_request(ff->fc, ff->reserved_req);
 	kfree(ff);
@@ -491,9 +494,115 @@ void fuse_read_fill(struct fuse_req *req, struct file *file, loff_t pos,
 	req->out.args[0].size = count;
 }
 
-static size_t fuse_send_read(struct fuse_req *req, struct file *file,
+static void fuse_release_user_pages(struct fuse_req *req, int write)
+{
+	unsigned i;
+
+	for (i = 0; i < req->num_pages; i++) {
+		struct page *page = req->pages[i];
+		if (write)
+			set_page_dirty_lock(page);
+		put_page(page);
+	}
+}
+
+/**
+ * In case of short read, the caller sets 'pos' to the position of
+ * actual end of fuse request in IO request. Otherwise, if bytes_requested
+ * == bytes_transferred or rw == WRITE, the caller sets 'pos' to -1.
+ *
+ * An example:
+ * User requested DIO read of 64K. It was splitted into two 32K fuse requests,
+ * both submitted asynchronously. The first of them was ACKed by userspace as
+ * fully completed (req->out.args[0].size == 32K) resulting in pos == -1. The
+ * second request was ACKed as short, e.g. only 1K was read, resulting in
+ * pos == 33K.
+ *
+ * Thus, when all fuse requests are completed, the minimal non-negative 'pos'
+ * will be equal to the length of the longest contiguous fragment of
+ * transferred data starting from the beginning of IO request.
+ */
+static void fuse_aio_complete(struct fuse_io_priv *io, int err, ssize_t pos)
+{
+	int left;
+
+	spin_lock(&io->lock);
+	if (err)
+		io->err = io->err ? : err;
+	else if (pos >= 0 && (io->bytes < 0 || pos < io->bytes))
+		io->bytes = pos;
+
+	left = --io->reqs;
+	spin_unlock(&io->lock);
+
+	if (!left) {
+		long res;
+
+		if (io->err)
+			res = io->err;
+		else if (io->bytes >= 0 && io->write)
+			res = -EIO;
+		else {
+			res = io->bytes < 0 ? io->size : io->bytes;
+
+			if (!is_sync_kiocb(io->iocb)) {
+				struct path *path = &io->iocb->ki_filp->f_path;
+				struct inode *inode = path->dentry->d_inode;
+				struct fuse_conn *fc = get_fuse_conn(inode);
+				struct fuse_inode *fi = get_fuse_inode(inode);
+
+				spin_lock(&fc->lock);
+				fi->attr_version = ++fc->attr_version;
+				spin_unlock(&fc->lock);
+			}
+		}
+
+		aio_complete(io->iocb, res, 0);
+		kfree(io);
+	}
+}
+
+static void fuse_aio_complete_req(struct fuse_conn *fc, struct fuse_req *req)
+{
+	struct fuse_io_priv *io = req->io;
+	ssize_t pos = -1;
+
+	fuse_release_user_pages(req, !io->write);
+
+	if (io->write) {
+		if (req->misc.write.in.size != req->misc.write.out.size)
+			pos = req->misc.write.in.offset - io->offset +
+				req->misc.write.out.size;
+	} else {
+		if (req->misc.read.in.size != req->out.args[0].size)
+			pos = req->misc.read.in.offset - io->offset +
+				req->out.args[0].size;
+	}
+
+	fuse_aio_complete(io, req->out.h.error, pos);
+}
+
+static size_t fuse_async_req_send(struct fuse_conn *fc, struct fuse_req *req,
+		size_t num_bytes, struct fuse_io_priv *io)
+{
+	spin_lock(&io->lock);
+	io->size += num_bytes;
+	io->reqs++;
+	spin_unlock(&io->lock);
+
+	req->io = io;
+	req->end = fuse_aio_complete_req;
+
+	__fuse_get_request(req);
+	fuse_request_send_background(fc, req);
+
+	return num_bytes;
+}
+
+static size_t fuse_send_read(struct fuse_req *req, struct fuse_io_priv *io,
 			     loff_t pos, size_t count, fl_owner_t owner)
 {
+	struct file *file = io->file;
 	struct fuse_file *ff = file->private_data;
 	struct fuse_conn *fc = ff->fc;
 
@@ -504,6 +613,10 @@ static size_t fuse_send_read(struct fuse_req *req, struct file *file,
 		inarg->read_flags |= FUSE_READ_LOCKOWNER;
 		inarg->lock_owner = fuse_lock_owner_id(fc, owner);
 	}
+
+	if (io->async)
+		return fuse_async_req_send(fc, req, count, io);
+
 	fuse_request_send(fc, req);
 	return req->out.args[0].size;
 }
@@ -524,6 +637,7 @@ static void fuse_read_update_size(struct inode *inode, loff_t size,
 
 static int fuse_readpage(struct file *file, struct page *page)
 {
+	struct fuse_io_priv io = { .async = 0, .file = file };
 	struct inode *inode = page->mapping->host;
 	struct fuse_conn *fc = get_fuse_conn(inode);
 	struct fuse_req *req;
@@ -556,7 +670,7 @@ static int fuse_readpage(struct file *file, struct page *page)
 	req->num_pages = 1;
 	req->pages[0] = page;
 	req->page_descs[0].length = count;
-	num_read = fuse_send_read(req, file, pos, count, NULL);
+	num_read = fuse_send_read(req, &io, pos, count, NULL);
 	err = req->out.h.error;
 	fuse_put_request(fc, req);
 
@@ -661,7 +775,12 @@ static int fuse_readpages_fill(void *_data, struct page *page)
 		int nr_alloc = min_t(unsigned, data->nr_pages,
 				     FUSE_MAX_PAGES_PER_REQ);
 		fuse_send_readpages(req, data->file);
-		data->req = req = fuse_get_req(fc, nr_alloc);
+		if (fc->async_read)
+			req = fuse_get_req_for_background(fc, nr_alloc);
+		else
+			req = fuse_get_req(fc, nr_alloc);
+
+		data->req = req;
 		if (IS_ERR(req)) {
 			unlock_page(page);
 			return PTR_ERR(req);
@@ -696,7 +815,10 @@ static int fuse_readpages(struct file *file, struct address_space *mapping,
 
 	data.file = file;
 	data.inode = inode;
-	data.req = fuse_get_req(fc, nr_alloc);
+	if (fc->async_read)
+		data.req = fuse_get_req_for_background(fc, nr_alloc);
+	else
+		data.req = fuse_get_req(fc, nr_alloc);
 	data.nr_pages = nr_pages;
 	err = PTR_ERR(data.req);
 	if (IS_ERR(data.req))
@@ -758,9 +880,10 @@ static void fuse_write_fill(struct fuse_req *req, struct fuse_file *ff,
 	req->out.args[0].value = outarg;
 }
 
-static size_t fuse_send_write(struct fuse_req *req, struct file *file,
+static size_t fuse_send_write(struct fuse_req *req, struct fuse_io_priv *io,
 			      loff_t pos, size_t count, fl_owner_t owner)
 {
+	struct file *file = io->file;
 	struct fuse_file *ff = file->private_data;
 	struct fuse_conn *fc = ff->fc;
 	struct fuse_write_in *inarg = &req->misc.write.in;
@@ -771,6 +894,10 @@ static size_t fuse_send_write(struct fuse_req *req, struct file *file,
 		inarg->write_flags |= FUSE_WRITE_LOCKOWNER;
 		inarg->lock_owner = fuse_lock_owner_id(fc, owner);
 	}
+
+	if (io->async)
+		return fuse_async_req_send(fc, req, count, io);
+
 	fuse_request_send(fc, req);
 	return req->misc.write.out.size;
 }
@@ -794,11 +921,12 @@ static size_t fuse_send_write_pages(struct fuse_req *req, struct file *file,
 	size_t res;
 	unsigned offset;
 	unsigned i;
+	struct fuse_io_priv io = { .async = 0, .file = file };
 
 	for (i = 0; i < req->num_pages; i++)
 		fuse_wait_on_page_writeback(inode, req->pages[i]->index);
 
-	res = fuse_send_write(req, file, pos, count, NULL);
+	res = fuse_send_write(req, &io, pos, count, NULL);
 
 	offset = req->page_descs[0].offset;
 	count = res;
@@ -1033,18 +1161,6 @@ static ssize_t fuse_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
 	return written ? written : err;
 }
 
-static void fuse_release_user_pages(struct fuse_req *req, int write)
-{
-	unsigned i;
-
-	for (i = 0; i < req->num_pages; i++) {
-		struct page *page = req->pages[i];
-		if (write)
-			set_page_dirty_lock(page);
-		put_page(page);
-	}
-}
-
 static inline void fuse_page_descs_length_init(struct fuse_req *req,
 		unsigned index, unsigned nr_pages)
 {
@@ -1146,10 +1262,11 @@ static inline int fuse_iter_npages(const struct iov_iter *ii_p)
 	return min(npages, FUSE_MAX_PAGES_PER_REQ);
 }
 
-ssize_t fuse_direct_io(struct file *file, const struct iovec *iov,
+ssize_t fuse_direct_io(struct fuse_io_priv *io, const struct iovec *iov,
 		       unsigned long nr_segs, size_t count, loff_t *ppos,
 		       int write)
 {
+	struct file *file = io->file;
 	struct fuse_file *ff = file->private_data;
 	struct fuse_conn *fc = ff->fc;
 	size_t nmax = write ? fc->max_write : fc->max_read;
@@ -1175,11 +1292,12 @@ ssize_t fuse_direct_io(struct file *file, const struct iovec *iov,
 		}
 
 		if (write)
-			nres = fuse_send_write(req, file, pos, nbytes, owner);
+			nres = fuse_send_write(req, io, pos, nbytes, owner);
 		else
-			nres = fuse_send_read(req, file, pos, nbytes, owner);
+			nres = fuse_send_read(req, io, pos, nbytes, owner);
 
-		fuse_release_user_pages(req, !write);
+		if (!io->async)
+			fuse_release_user_pages(req, !write);
 		if (req->out.h.error) {
 			if (!res)
 				res = req->out.h.error;
@@ -1209,17 +1327,19 @@ ssize_t fuse_direct_io(struct file *file, const struct iovec *iov,
 }
 EXPORT_SYMBOL_GPL(fuse_direct_io);
 
-static ssize_t __fuse_direct_read(struct file *file, const struct iovec *iov,
-				  unsigned long nr_segs, loff_t *ppos)
+static ssize_t __fuse_direct_read(struct fuse_io_priv *io,
+				  const struct iovec *iov,
+				  unsigned long nr_segs, loff_t *ppos,
+				  size_t count)
 {
 	ssize_t res;
+	struct file *file = io->file;
 	struct inode *inode = file_inode(file);
 
 	if (is_bad_inode(inode))
 		return -EIO;
 
-	res = fuse_direct_io(file, iov, nr_segs, iov_length(iov, nr_segs),
-			     ppos, 0);
+	res = fuse_direct_io(io, iov, nr_segs, count, ppos, 0);
 
 	fuse_invalidate_attr(inode);
 
@@ -1229,23 +1349,23 @@ static ssize_t __fuse_direct_read(struct file *file, const struct iovec *iov,
 static ssize_t fuse_direct_read(struct file *file, char __user *buf,
 				     size_t count, loff_t *ppos)
 {
+	struct fuse_io_priv io = { .async = 0, .file = file };
 	struct iovec iov = { .iov_base = buf, .iov_len = count };
-	return __fuse_direct_read(file, &iov, 1, ppos);
+	return __fuse_direct_read(&io, &iov, 1, ppos, count);
 }
 
-static ssize_t __fuse_direct_write(struct file *file, const struct iovec *iov,
+static ssize_t __fuse_direct_write(struct fuse_io_priv *io,
+				   const struct iovec *iov,
 				   unsigned long nr_segs, loff_t *ppos)
 {
+	struct file *file = io->file;
 	struct inode *inode = file_inode(file);
 	size_t count = iov_length(iov, nr_segs);
 	ssize_t res;
 
 	res = generic_write_checks(file, ppos, &count, 0);
-	if (!res) {
-		res = fuse_direct_io(file, iov, nr_segs, count, ppos, 1);
-		if (res > 0)
-			fuse_write_update_size(inode, *ppos);
-	}
+	if (!res)
+		res = fuse_direct_io(io, iov, nr_segs, count, ppos, 1);
 
 	fuse_invalidate_attr(inode);
 
@@ -1258,13 +1378,16 @@ static ssize_t fuse_direct_write(struct file *file, const char __user *buf,
 	struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count };
 	struct inode *inode = file_inode(file);
 	ssize_t res;
+	struct fuse_io_priv io = { .async = 0, .file = file };
 
 	if (is_bad_inode(inode))
 		return -EIO;
 
 	/* Don't allow parallel writes to the same file */
 	mutex_lock(&inode->i_mutex);
-	res = __fuse_direct_write(file, &iov, 1, ppos);
+	res = __fuse_direct_write(&io, &iov, 1, ppos);
+	if (res > 0)
+		fuse_write_update_size(inode, *ppos);
 	mutex_unlock(&inode->i_mutex);
 
 	return res;
@@ -1373,6 +1496,7 @@ static int fuse_writepage_locked(struct page *page)
 	if (!req)
 		goto err;
 
+	req->background = 1; /* writeback always goes to bg_queue */
 	tmp_page = alloc_page(GFP_NOFS | __GFP_HIGHMEM);
 	if (!tmp_page)
 		goto err_free;
@@ -2226,21 +2350,93 @@ int fuse_notify_poll_wakeup(struct fuse_conn *fc,
 	return 0;
 }
 
+static void fuse_do_truncate(struct file *file)
+{
+	struct inode *inode = file->f_mapping->host;
+	struct iattr attr;
+
+	attr.ia_valid = ATTR_SIZE;
+	attr.ia_size = i_size_read(inode);
+
+	attr.ia_file = file;
+	attr.ia_valid |= ATTR_FILE;
+
+	fuse_do_setattr(inode, &attr, file);
+}
+
 static ssize_t
 fuse_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov,
 			loff_t offset, unsigned long nr_segs)
 {
 	ssize_t ret = 0;
-	struct file *file = NULL;
+	struct file *file = iocb->ki_filp;
+	struct fuse_file *ff = file->private_data;
 	loff_t pos = 0;
+	struct inode *inode;
+	loff_t i_size;
+	size_t count = iov_length(iov, nr_segs);
+	struct fuse_io_priv *io;
 
-	file = iocb->ki_filp;
 	pos = offset;
+	inode = file->f_mapping->host;
+	i_size = i_size_read(inode);
+
+	/* optimization for short read */
+	if (rw != WRITE && offset + count > i_size) {
+		if (offset >= i_size)
+			return 0;
+		count = i_size - offset;
+	}
+
+	io = kmalloc(sizeof(struct fuse_io_priv), GFP_KERNEL);
+	if (!io)
+		return -ENOMEM;
+	spin_lock_init(&io->lock);
+	io->reqs = 1;
+	io->bytes = -1;
+	io->size = 0;
+	io->offset = offset;
+	io->write = (rw == WRITE);
+	io->err = 0;
+	io->file = file;
+	/*
+	 * By default, we want to optimize all I/Os with async request
+	 * submission to the client filesystem if supported.
+	 */
+	io->async = ff->fc->async_dio;
+	io->iocb = iocb;
+
+	/*
+	 * We cannot asynchronously extend the size of a file. We have no method
+	 * to wait on real async I/O requests, so we must submit this request
+	 * synchronously.
+	 */
+	if (!is_sync_kiocb(iocb) && (offset + count > i_size))
+		io->async = false;
 
 	if (rw == WRITE)
-		ret = __fuse_direct_write(file, iov, nr_segs, &pos);
+		ret = __fuse_direct_write(io, iov, nr_segs, &pos);
 	else
-		ret = __fuse_direct_read(file, iov, nr_segs, &pos);
+		ret = __fuse_direct_read(io, iov, nr_segs, &pos, count);
+
+	if (io->async) {
+		fuse_aio_complete(io, ret < 0 ? ret : 0, -1);
+
+		/* we have a non-extending, async request, so return */
+		if (ret > 0 && !is_sync_kiocb(iocb))
+			return -EIOCBQUEUED;
+
+		ret = wait_on_sync_kiocb(iocb);
+	} else {
+		kfree(io);
+	}
+
+	if (rw == WRITE) {
+		if (ret > 0)
+			fuse_write_update_size(inode, pos);
+		else if (ret < 0 && offset + count > i_size)
+			fuse_do_truncate(file);
+	}
 
 	return ret;
 }
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index 6aeba864f07060dd28ae113ae57d3c98cdf000a5..fde7249a3a9608c8c6e49be4316a1d155b7cdfba 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -228,6 +228,20 @@ enum fuse_req_state {
 	FUSE_REQ_FINISHED
 };
 
+/** The request IO state (for asynchronous processing) */
+struct fuse_io_priv {
+	int async;
+	spinlock_t lock;
+	unsigned reqs;
+	ssize_t bytes;
+	size_t size;
+	__u64 offset;
+	bool write;
+	int err;
+	struct kiocb *iocb;
+	struct file *file;
+};
+
 /**
  * A request to the client
  */
@@ -332,6 +346,9 @@ struct fuse_req {
 	/** Inode used in the request or NULL */
 	struct inode *inode;
 
+	/** AIO control block */
+	struct fuse_io_priv *io;
+
 	/** Link on fi->writepages */
 	struct list_head writepages_entry;
 
@@ -417,6 +434,10 @@ struct fuse_conn {
 	/** Batching of FORGET requests (positive indicates FORGET batch) */
 	int forget_batch;
 
+	/** Flag indicating that INIT reply has been received. Allocating
+	 * any fuse request will be suspended until the flag is set */
+	int initialized;
+
 	/** Flag indicating if connection is blocked.  This will be
 	    the case before the INIT reply is received, and if there
 	    are too many outstading backgrounds requests */
@@ -520,6 +541,9 @@ struct fuse_conn {
 	/** Does the filesystem want adaptive readdirplus? */
 	unsigned readdirplus_auto:1;
 
+	/** Does the filesystem support asynchronous direct-IO submission? */
+	unsigned async_dio:1;
+
 	/** The number of requests waiting for completion */
 	atomic_t num_waiting;
 
@@ -708,6 +732,13 @@ void fuse_request_free(struct fuse_req *req);
  * caller should specify # elements in req->pages[] explicitly
  */
 struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages);
+struct fuse_req *fuse_get_req_for_background(struct fuse_conn *fc,
+					     unsigned npages);
+
+/*
+ * Increment reference count on request
+ */
+void __fuse_get_request(struct fuse_req *req);
 
 /**
  * Get a request, may fail with -ENOMEM,
@@ -823,7 +854,7 @@ int fuse_reverse_inval_entry(struct super_block *sb, u64 parent_nodeid,
 
 int fuse_do_open(struct fuse_conn *fc, u64 nodeid, struct file *file,
 		 bool isdir);
-ssize_t fuse_direct_io(struct file *file, const struct iovec *iov,
+ssize_t fuse_direct_io(struct fuse_io_priv *io, const struct iovec *iov,
 		       unsigned long nr_segs, size_t count, loff_t *ppos,
 		       int write);
 long fuse_do_ioctl(struct file *file, unsigned int cmd, unsigned long arg,
@@ -835,4 +866,7 @@ int fuse_dev_release(struct inode *inode, struct file *file);
 
 void fuse_write_update_size(struct inode *inode, loff_t pos);
 
+int fuse_do_setattr(struct inode *inode, struct iattr *attr,
+		    struct file *file);
+
 #endif /* _FS_FUSE_I_H */
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index 137185c3884fbbb9dfcab6115c78952d7aec7f08..6201f81e4d3a54bdc2d1d4a145253d7ab199baa9 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -346,6 +346,7 @@ static void fuse_send_destroy(struct fuse_conn *fc)
 		fc->destroy_req = NULL;
 		req->in.h.opcode = FUSE_DESTROY;
 		req->force = 1;
+		req->background = 0;
 		fuse_request_send(fc, req);
 		fuse_put_request(fc, req);
 	}
@@ -362,6 +363,7 @@ void fuse_conn_kill(struct fuse_conn *fc)
 	spin_lock(&fc->lock);
 	fc->connected = 0;
 	fc->blocked = 0;
+	fc->initialized = 1;
 	spin_unlock(&fc->lock);
 	/* Flush all readers on this fs */
 	kill_fasync(&fc->fasync, SIGIO, POLL_IN);
@@ -581,7 +583,8 @@ void fuse_conn_init(struct fuse_conn *fc)
 	fc->khctr = 0;
 	fc->polled_files = RB_ROOT;
 	fc->reqctr = 0;
-	fc->blocked = 1;
+	fc->blocked = 0;
+	fc->initialized = 0;
 	fc->attr_version = 1;
 	get_random_bytes(&fc->scramble_key, sizeof(fc->scramble_key));
 }
@@ -868,6 +871,8 @@ static void process_init_reply(struct fuse_conn *fc, struct fuse_req *req)
 				fc->do_readdirplus = 1;
 			if (arg->flags & FUSE_READDIRPLUS_AUTO)
 				fc->readdirplus_auto = 1;
+			if (arg->flags & FUSE_ASYNC_DIO)
+				fc->async_dio = 1;
 		} else {
 			ra_pages = fc->max_read / PAGE_CACHE_SIZE;
 			fc->no_lock = 1;
@@ -880,7 +885,7 @@ static void process_init_reply(struct fuse_conn *fc, struct fuse_req *req)
 		fc->max_write = max_t(unsigned, 4096, fc->max_write);
 		fc->conn_init = 1;
 	}
-	fc->blocked = 0;
+	fc->initialized = 1;
 	wake_up_all(&fc->blocked_waitq);
 }
 
@@ -895,7 +900,7 @@ static void fuse_send_init(struct fuse_conn *fc, struct fuse_req *req)
 		FUSE_EXPORT_SUPPORT | FUSE_BIG_WRITES | FUSE_DONT_MASK |
 		FUSE_SPLICE_WRITE | FUSE_SPLICE_MOVE | FUSE_SPLICE_READ |
 		FUSE_FLOCK_LOCKS | FUSE_IOCTL_DIR | FUSE_AUTO_INVAL_DATA |
-		FUSE_DO_READDIRPLUS | FUSE_READDIRPLUS_AUTO;
+		FUSE_DO_READDIRPLUS | FUSE_READDIRPLUS_AUTO | FUSE_ASYNC_DIO;
 	req->in.h.opcode = FUSE_INIT;
 	req->in.numargs = 1;
 	req->in.args[0].size = sizeof(*arg);
@@ -1043,6 +1048,7 @@ static int fuse_fill_super(struct super_block *sb, void *data, int silent)
 	init_req = fuse_request_alloc(0);
 	if (!init_req)
 		goto err_put_root;
+	init_req->background = 1;
 
 	if (is_bdev) {
 		fc->destroy_req = fuse_request_alloc(0);
diff --git a/include/uapi/linux/fuse.h b/include/uapi/linux/fuse.h
index 706d035fa7488df73b142b1812c2c3feaa8708d3..60bb2f9f7b74272ae30e9119ded495100e6934b6 100644
--- a/include/uapi/linux/fuse.h
+++ b/include/uapi/linux/fuse.h
@@ -90,6 +90,9 @@
  * 7.21
  *  - add FUSE_READDIRPLUS
  *  - send the requested events in POLL request
+ *
+ * 7.22
+ *  - add FUSE_ASYNC_DIO
  */
 
 #ifndef _LINUX_FUSE_H
@@ -125,7 +128,7 @@
 #define FUSE_KERNEL_VERSION 7
 
 /** Minor version number of this interface */
-#define FUSE_KERNEL_MINOR_VERSION 21
+#define FUSE_KERNEL_MINOR_VERSION 22
 
 /** The node ID of the root inode */
 #define FUSE_ROOT_ID 1
@@ -215,6 +218,7 @@ struct fuse_file_lock {
  * FUSE_AUTO_INVAL_DATA: automatically invalidate cached pages
  * FUSE_DO_READDIRPLUS: do READDIRPLUS (READDIR+LOOKUP in one)
  * FUSE_READDIRPLUS_AUTO: adaptive readdirplus
+ * FUSE_ASYNC_DIO: asynchronous direct I/O submission
  */
 #define FUSE_ASYNC_READ		(1 << 0)
 #define FUSE_POSIX_LOCKS	(1 << 1)
@@ -231,6 +235,7 @@ struct fuse_file_lock {
 #define FUSE_AUTO_INVAL_DATA	(1 << 12)
 #define FUSE_DO_READDIRPLUS	(1 << 13)
 #define FUSE_READDIRPLUS_AUTO	(1 << 14)
+#define FUSE_ASYNC_DIO		(1 << 15)
 
 /**
  * CUSE INIT request/reply flags