diff --git a/net/rds/ib.h b/net/rds/ib.h
index d64b5087eefe92c59ec56059cb1fb3e0e3282b2c..202140a84f0c2836615e6e15275e232607919bfd 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -53,8 +53,7 @@ struct rds_ib_connect_private {
 };
 
 struct rds_ib_send_work {
-	struct rds_message	*s_rm;
-	struct rm_rdma_op	*s_op;
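+	/* s_op points at the rm_data_op, rm_rdma_op or rm_atomic_op this
+	 * WR was posted for; the op type is recovered from s_wr.opcode at
+	 * completion time */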
+	void			*s_op;
 	struct ib_send_wr	s_wr;
 	struct ib_sge		s_sge[RDS_IB_MAX_SGE];
 	unsigned long		s_queued;
@@ -92,7 +91,7 @@ struct rds_ib_connection {
 
 	/* tx */
 	struct rds_ib_work_ring	i_send_ring;
-	struct rds_message	*i_rm;
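+	/* The data op of the message currently being transmitted; the
+	 * owning rds_message is recovered with container_of() */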
+	struct rm_data_op	*i_data_op;
 	struct rds_header	*i_send_hdrs;
 	u64			i_send_hdrs_dma;
 	struct rds_ib_send_work *i_sends;
@@ -336,7 +335,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
 void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
 int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
 			     u32 *adv_credits, int need_posted, int max_posted);
-int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm);
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op);
 
 /* ib_stats.c */
 DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 8b0c743c090062cb9aaea45bc43e7063a0eafdd9..b5b5ebbc0bb6758e50e15739fe38cbbbd9a6ac0a 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -673,9 +673,12 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 	BUG_ON(ic->rds_ibdev);
 
 	/* Clear pending transmit */
-	if (ic->i_rm) {
-		rds_message_put(ic->i_rm);
-		ic->i_rm = NULL;
+	if (ic->i_data_op) {
+		struct rds_message *rm;
+
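+		/* i_data_op is the rm_data_op embedded in an rds_message;
+		 * recover the message so its reference can be dropped */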
+		rm = container_of(ic->i_data_op, struct rds_message, data);
+		rds_message_put(rm);
+		ic->i_data_op = NULL;
 	}
 
 	/* Clear the ACK state */
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 95f15247acd72918737b2e7f17e698154e3254a8..6461a152bd5b27332c6016761b25f19600246595 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -67,80 +67,122 @@ static void rds_ib_send_complete(struct rds_message *rm,
 	complete(rm, notify_status);
 }
 
-static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
-			  struct rds_ib_send_work *send,
-			  int wc_status)
+static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
+				   struct rm_data_op *op,
+				   int wc_status)
 {
-	struct rds_message *rm = send->s_rm;
-
-	rdsdebug("ic %p send %p rm %p\n", ic, send, rm);
-
-	ib_dma_unmap_sg(ic->i_cm_id->device,
-			rm->data.op_sg, rm->data.op_nents,
-			DMA_TO_DEVICE);
+	if (op->op_nents)
+		ib_dma_unmap_sg(ic->i_cm_id->device,
+				op->op_sg, op->op_nents,
+				DMA_TO_DEVICE);
+}
 
-	if (rm->rdma.op_active) {
-		struct rm_rdma_op *op = &rm->rdma;
+static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
+				   struct rm_rdma_op *op,
+				   int wc_status)
+{
+	if (op->op_mapped) {
+		ib_dma_unmap_sg(ic->i_cm_id->device,
+				op->op_sg, op->op_nents,
+				op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
+		op->op_mapped = 0;
+	}
 
-		if (op->op_mapped) {
-			ib_dma_unmap_sg(ic->i_cm_id->device,
-					op->op_sg, op->op_nents,
-					op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
-			op->op_mapped = 0;
-		}
+	/* If the user asked for a completion notification on this
+	 * message, we can implement three different semantics:
+	 *  1.	Notify when we received the ACK on the RDS message
+	 *	that was queued with the RDMA. This provides reliable
+	 *	notification of RDMA status at the expense of a one-way
+	 *	packet delay.
+	 *  2.	Notify when the IB stack gives us the completion event for
+	 *	the RDMA operation.
+	 *  3.	Notify when the IB stack gives us the completion event for
+	 *	the accompanying RDS messages.
+	 * Here, we implement approach #3. To implement approach #2,
+	 * we would need to take an event for the rdma WR. To implement #1,
+	 * don't call rds_rdma_send_complete at all, and fall back to the notify
+	 * handling in the ACK processing code.
+	 *
+	 * Note: There's no need to explicitly sync any RDMA buffers using
+	 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
+	 * operation itself unmapped the RDMA buffers, which takes care
+	 * of synching.
+	 */
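+	/* The rdma op is embedded in its rds_message; recover the message
+	 * so the sender can be notified */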
+	rds_ib_send_complete(container_of(op, struct rds_message, rdma),
+			     wc_status, rds_rdma_send_complete);
 
-		/* If the user asked for a completion notification on this
-		 * message, we can implement three different semantics:
-		 *  1.	Notify when we received the ACK on the RDS message
-		 *	that was queued with the RDMA. This provides reliable
-		 *	notification of RDMA status at the expense of a one-way
-		 *	packet delay.
-		 *  2.	Notify when the IB stack gives us the completion event for
-		 *	the RDMA operation.
-		 *  3.	Notify when the IB stack gives us the completion event for
-		 *	the accompanying RDS messages.
-		 * Here, we implement approach #3. To implement approach #2,
-		 * call rds_rdma_send_complete from the cq_handler. To implement #1,
-		 * don't call rds_rdma_send_complete at all, and fall back to the notify
-		 * handling in the ACK processing code.
-		 *
-		 * Note: There's no need to explicitly sync any RDMA buffers using
-		 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
-		 * operation itself unmapped the RDMA buffers, which takes care
-		 * of synching.
-		 */
-		rds_ib_send_complete(rm, wc_status, rds_rdma_send_complete);
+	if (op->op_write)
+		rds_stats_add(s_send_rdma_bytes, op->op_bytes);
+	else
+		rds_stats_add(s_recv_rdma_bytes, op->op_bytes);
+}
 
-		if (rm->rdma.op_write)
-			rds_stats_add(s_send_rdma_bytes, rm->rdma.op_bytes);
-		else
-			rds_stats_add(s_recv_rdma_bytes, rm->rdma.op_bytes);
+static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
+				     struct rm_atomic_op *op,
+				     int wc_status)
+{
+	/* unmap atomic recvbuf */
+	if (op->op_mapped) {
+		ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
+				DMA_FROM_DEVICE);
+		op->op_mapped = 0;
 	}
 
-	if (rm->atomic.op_active) {
-		struct rm_atomic_op *op = &rm->atomic;
-
-		/* unmap atomic recvbuf */
-		if (op->op_mapped) {
-			ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
-					DMA_FROM_DEVICE);
-			op->op_mapped = 0;
-		}
+	rds_ib_send_complete(container_of(op, struct rds_message, atomic),
+			     wc_status, rds_atomic_send_complete);
 
-		rds_ib_send_complete(rm, wc_status, rds_atomic_send_complete);
+	if (op->op_type == RDS_ATOMIC_TYPE_CSWP)
+		rds_stats_inc(s_atomic_cswp);
+	else
+		rds_stats_inc(s_atomic_fadd);
+}
 
-		if (rm->atomic.op_type == RDS_ATOMIC_TYPE_CSWP)
-			rds_stats_inc(s_atomic_cswp);
-		else
-			rds_stats_inc(s_atomic_fadd);
+/*
+ * Unmap the resources associated with a struct send_work.
+ *
+ * Returns the rm because the caller, the event handler, needs it, and
+ * currently the only way to recover it is by switching on wr.opcode.
+ */
+static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic,
+						struct rds_ib_send_work *send,
+						int wc_status)
+{
+	struct rds_message *rm = NULL;
+
+	/* In the error case, wc.opcode sometimes contains garbage, so
+	 * switch on the opcode we saved in the work request instead */
+	switch (send->s_wr.opcode) {
+	case IB_WR_SEND:
+		if (send->s_op) {
+			rm = container_of(send->s_op, struct rds_message, data);
+			rds_ib_send_unmap_data(ic, send->s_op, wc_status);
+		}
+		break;
+	case IB_WR_RDMA_WRITE:
+	case IB_WR_RDMA_READ:
+		if (send->s_op) {
+			rm = container_of(send->s_op, struct rds_message, rdma);
+			rds_ib_send_unmap_rdma(ic, send->s_op, wc_status);
+		}
+		break;
+	case IB_WR_ATOMIC_FETCH_AND_ADD:
+	case IB_WR_ATOMIC_CMP_AND_SWP:
+		if (send->s_op) {
+			rm = container_of(send->s_op, struct rds_message, atomic);
+			rds_ib_send_unmap_atomic(ic, send->s_op, wc_status);
+		}
+		break;
+	default:
+		if (printk_ratelimit())
+			printk(KERN_NOTICE
+			       "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
+			       __func__, send->s_wr.opcode);
+		break;
 	}
 
-	/* If anyone waited for this message to get flushed out, wake
-	 * them up now */
-	rds_message_unmapped(rm);
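+	/* Poison the opcode so rds_ib_send_clear_ring() can tell that this
+	 * entry has already been unmapped */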
+	send->s_wr.opcode = 0xdead;
 
-	rds_message_put(rm);
-	send->s_rm = NULL;
+	return rm;
 }
 
 void rds_ib_send_init_ring(struct rds_ib_connection *ic)
@@ -151,7 +193,6 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
 	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
 		struct ib_sge *sge;
 
-		send->s_rm = NULL;
 		send->s_op = NULL;
 
 		send->s_wr.wr_id = i;
@@ -173,9 +214,8 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
 	u32 i;
 
 	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
-		if (!send->s_rm || send->s_wr.opcode == 0xdead)
-			continue;
-		rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
+		if (send->s_op && send->s_wr.opcode != 0xdead)
+			rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR);
 	}
 }
 
@@ -189,6 +229,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 {
 	struct rds_connection *conn = context;
 	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_message *rm = NULL;
 	struct ib_wc wc;
 	struct rds_ib_send_work *send;
 	u32 completed;
@@ -222,42 +263,18 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 		for (i = 0; i < completed; i++) {
 			send = &ic->i_sends[oldest];
 
-			/* In the error case, wc.opcode sometimes contains garbage */
-			switch (send->s_wr.opcode) {
-			case IB_WR_SEND:
-			case IB_WR_RDMA_WRITE:
-			case IB_WR_RDMA_READ:
-			case IB_WR_ATOMIC_FETCH_AND_ADD:
-			case IB_WR_ATOMIC_CMP_AND_SWP:
-				if (send->s_rm)
-					rds_ib_send_unmap_rm(ic, send, wc.status);
-				break;
-			default:
-				if (printk_ratelimit())
-					printk(KERN_NOTICE
-						"RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
-						__func__, send->s_wr.opcode);
-				break;
-			}
+			rm = rds_ib_send_unmap_op(ic, send, wc.status);
 
-			send->s_wr.opcode = 0xdead;
-			send->s_wr.num_sge = 1;
 			if (send->s_queued + HZ/2 < jiffies)
 				rds_ib_stats_inc(s_ib_tx_stalled);
 
-			/* If a RDMA operation produced an error, signal this right
-			 * away. If we don't, the subsequent SEND that goes with this
-			 * RDMA will be canceled with ERR_WFLUSH, and the application
-			 * never learn that the RDMA failed. */
-			if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) {
-				struct rds_message *rm;
-
-				rm = rds_send_get_message(conn, send->s_op);
-				if (rm) {
-					rds_ib_send_unmap_rm(ic, send, wc.status);
-					rds_ib_send_complete(rm, wc.status, rds_rdma_send_complete);
-					rds_message_put(rm);
-				}
+			if (send->s_op) {
+				if (send->s_op == rm->m_final_op) {
+					/* If anyone waited for this message to get flushed out, wake
+					 * them up now */
+					rds_message_unmapped(rm);
+				}
+				rds_message_put(rm);
+				send->s_op = NULL;
 			}
 
 			oldest = (oldest + 1) % ic->i_send_ring.w_nr;
@@ -512,7 +529,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 	}
 
 	/* map the message the first time we see it */
-	if (!ic->i_rm) {
+	if (!ic->i_data_op) {
 		if (rm->data.op_nents) {
 			rm->data.op_count = ib_dma_map_sg(dev,
 							  rm->data.op_sg,
@@ -530,7 +547,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 		}
 
 		rds_message_addref(rm);
-		ic->i_rm = rm;
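+		/* Stash the data op so a partially sent message can be
+		 * resumed by a later call */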
+		ic->i_data_op = &rm->data;
 
 		/* Finalize the header */
 		if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
@@ -583,7 +600,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 	send = &ic->i_sends[pos];
 	first = send;
 	prev = NULL;
-	scat = &rm->data.op_sg[sg];
+	scat = &ic->i_data_op->op_sg[sg];
 	i = 0;
 	do {
 		unsigned int len = 0;
@@ -658,9 +675,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 
 	/* if we finished the message then send completion owns it */
 	if (scat == &rm->data.op_sg[rm->data.op_count]) {
-		prev->s_rm = ic->i_rm;
+		prev->s_op = ic->i_data_op;
 		prev->s_wr.send_flags |= IB_SEND_SOLICITED;
-		ic->i_rm = NULL;
+		ic->i_data_op = NULL;
 	}
 
 	/* Put back wrs & credits we didn't use */
@@ -681,9 +698,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 		printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
-		if (prev->s_rm) {
-			ic->i_rm = prev->s_rm;
-			prev->s_rm = NULL;
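+		/* Give the data op back to the connection so the send can be
+		 * retried later */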
+		if (prev->s_op) {
+			ic->i_data_op = prev->s_op;
+			prev->s_op = NULL;
 		}
 
 		rds_ib_conn_error(ic->conn, "ib_post_send failed\n");
@@ -701,10 +718,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
  * A simplified version of the rdma case, we always map 1 SG, and
  * only 8 bytes, for the return value from the atomic operation.
  */
-int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
-	struct rm_atomic_op *op = &rm->atomic;
 	struct rds_ib_send_work *send = NULL;
 	struct ib_send_wr *failed_wr;
 	struct rds_ib_device *rds_ibdev;
@@ -741,14 +757,6 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
 	send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
 	send->s_wr.wr.atomic.rkey = op->op_rkey;
 
-	/*
-	 * If there is no data or rdma ops in the message, then
-	 * we must fill in s_rm ourselves, so we properly clean up
-	 * on completion.
-	 */
-	if (!rm->rdma.op_active && !rm->data.op_active)
-		send->s_rm = rm;
-
 	/* map 8 byte retval buffer to the device */
 	ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
 	rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
@@ -809,7 +817,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 
 	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 
-	/* map the message the first time we see it */
+	/* map the op the first time we see it */
 	if (!op->op_mapped) {
 		op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
 					     op->op_sg, op->op_nents, (op->op_write) ?
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 23b921000e740399571fced384bbcbddfb182799..7291f006f364ab60060aea67873f6dcfcbb398d3 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -308,6 +308,8 @@ struct rds_message {
 	unsigned int		m_used_sgs;
 	unsigned int		m_total_sgs;
 
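+	/* The op posted last for this message; its send completion retires
+	 * the message (see rds_send_xmit() and the IB completion handler) */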
+	void			*m_final_op;
+
 	struct {
 		struct rm_atomic_op {
 			int			op_type;
@@ -421,7 +423,7 @@ struct rds_transport {
 	int (*xmit_cong_map)(struct rds_connection *conn,
 			     struct rds_cong_map *map, unsigned long offset);
 	int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
-	int (*xmit_atomic)(struct rds_connection *conn, struct rds_message *rm);
+	int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op);
 	int (*recv)(struct rds_connection *conn);
 	int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov,
 				size_t size);
diff --git a/net/rds/send.c b/net/rds/send.c
index 69ab1040d02dd14ece11133d958881d0484db631..d1f364e44e36e6496e055c70e001b5f8023d4e15 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -252,6 +252,7 @@ int rds_send_xmit(struct rds_connection *conn)
 
 		/* The transport either sends the whole rdma or none of it */
 		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
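+			/* Track the op handed to the transport last; its send
+			 * completion is the message's final one */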
+			rm->m_final_op = &rm->rdma;
 			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
 			if (ret)
 				break;
@@ -263,10 +264,12 @@ int rds_send_xmit(struct rds_connection *conn)
 		}
 
 		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
-			ret = conn->c_trans->xmit_atomic(conn, rm);
+			rm->m_final_op = &rm->atomic;
+			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
 			if (ret)
 				break;
 			conn->c_xmit_atomic_sent = 1;
+
 			/* The transport owns the mapped memory for now.
 			 * You can't unmap it while it's on the send queue */
 			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
@@ -295,6 +298,7 @@ int rds_send_xmit(struct rds_connection *conn)
 		}
 
 		if (rm->data.op_active && !conn->c_xmit_data_sent) {
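+			/* data is posted after any rdma and atomic ops, so
+			 * when active it is the final op */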
+			rm->m_final_op = &rm->data;
 			ret = conn->c_trans->xmit(conn, rm,
 						  conn->c_xmit_hdr_off,
 						  conn->c_xmit_sg,