Skip to content
Snippets Groups Projects
tcp.c 58.1 KiB
Newer Older
	INIT_WORK(&o2net_listen_work, o2net_accept_many);
	sock->sk->sk_reuse = SK_CAN_REUSE;
	ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
	if (ret < 0) {
		printk(KERN_ERR "o2net: Error %d while binding socket at "
		       "%pI4:%u\n", ret, &addr, ntohs(port)); 
		goto out;
	}

	ret = sock->ops->listen(sock, 64);
	if (ret < 0)
		printk(KERN_ERR "o2net: Error %d while listening on %pI4:%u\n",
		       ret, &addr, ntohs(port));

out:
	if (ret) {
		o2net_listen_sock = NULL;
		if (sock)
			sock_release(sock);
	}
	return ret;
}

/*
 * Bring up the network listening side of o2net for @node.
 *
 * Called from node manager, which handles all the serialization needed to
 * call this only once and to pair it with o2net_stop_listening().  The node
 * is passed in explicitly because o2nm_this_node() doesn't work yet: we are
 * being called while it is still being set up.
 *
 * Creates the single-threaded "o2net" workqueue, then opens the listening
 * socket on the node's IPv4 address and port.  On success o2quo_conn_up()
 * is called for the node's number; if the socket can't be opened the
 * workqueue is destroyed again so no state is left behind.
 *
 * Returns 0 on success, -ENOMEM if the workqueue could not be created, or
 * the error returned by o2net_open_listening_sock().
 */
int o2net_start_listening(struct o2nm_node *node)
{
	int err;

	/* starting twice without an intervening stop is a caller bug */
	BUG_ON(o2net_wq != NULL);
	BUG_ON(o2net_listen_sock != NULL);

	mlog(ML_KTHREAD, "starting o2net thread...\n");
	o2net_wq = create_singlethread_workqueue("o2net");
	if (!o2net_wq) {
		mlog(ML_ERROR, "unable to launch o2net thread\n");
		return -ENOMEM; /* ? */
	}

	err = o2net_open_listening_sock(node->nd_ipv4_address,
					node->nd_ipv4_port);
	if (!err) {
		o2quo_conn_up(node->nd_num);
		return 0;
	}

	/* opening the socket failed: undo the workqueue creation */
	destroy_workqueue(o2net_wq);
	o2net_wq = NULL;

	return err;
}

/*
 * Shut down what o2net_start_listening() brought up for @node.
 *
 * As on the start side, o2nm_this_node() doesn't work here because we are
 * involved in tearing it down, so the caller hands us the node.
 *
 * In order: the listening socket is stopped from generating work, every
 * node that o2nm_get_node_by_num() returns is disconnected, the workqueue
 * is destroyed, the listening socket is released, and finally
 * o2quo_conn_err() is called for the node's number.
 */
void o2net_stop_listening(struct o2nm_node *node)
{
	struct socket *sock = o2net_listen_sock;
	size_t idx;

	/* stopping without a matching start is a caller bug */
	BUG_ON(o2net_wq == NULL);
	BUG_ON(o2net_listen_sock == NULL);

	/*
	 * Stop the listening socket from generating work by putting back
	 * the sk_data_ready value stashed in sk_user_data (presumably the
	 * original callback saved when the listener was set up -- confirm
	 * against o2net_open_listening_sock()).
	 */
	write_lock_bh(&sock->sk->sk_callback_lock);
	sock->sk->sk_data_ready = sock->sk->sk_user_data;
	sock->sk->sk_user_data = NULL;
	write_unlock_bh(&sock->sk->sk_callback_lock);

	/* disconnect every node that currently exists */
	for (idx = 0; idx < ARRAY_SIZE(o2net_nodes); idx++) {
		struct o2nm_node *peer = o2nm_get_node_by_num(idx);

		if (!peer)
			continue;

		o2net_disconnect_node(peer);
		/* drop the reference taken by the lookup above */
		o2nm_node_put(peer);
	}

	/* finish all work and tear down the work queue */
	mlog(ML_KTHREAD, "waiting for o2net thread to exit....\n");
	destroy_workqueue(o2net_wq);
	o2net_wq = NULL;

	sock_release(o2net_listen_sock);
	o2net_listen_sock = NULL;

	o2quo_conn_err(node->nd_num);
}

/* ------------------------------------------------------------ */

/*
 * One-time initialization of o2net's global state.
 *
 * Runs o2quo_init() and o2net_debugfs_init(), allocates the shared
 * handshake and keepalive request/response messages and fills in their
 * fixed fields, then initializes every per-node o2net_node slot.
 *
 * Returns 0 on success, or -ENOMEM if debugfs setup or any of the
 * allocations fails.  On failure everything that was set up before the
 * failing step is undone with the same teardown calls o2net_exit() uses,
 * and the message pointers are reset to NULL so no dangling pointers are
 * left in the globals.
 */
int o2net_init(void)
{
	unsigned long i;

	o2quo_init();

	if (o2net_debugfs_init())
		goto out_quo;

	/* allocate all three first; a single check below covers them */
	o2net_hand = kzalloc(sizeof(struct o2net_handshake), GFP_KERNEL);
	o2net_keep_req = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
	o2net_keep_resp = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
	if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp)
		goto out_free;

	/* on-the-wire fields are stored big-endian */
	o2net_hand->protocol_version = cpu_to_be64(O2NET_PROTOCOL_VERSION);
	o2net_hand->connector_id = cpu_to_be64(1);

	o2net_keep_req->magic = cpu_to_be16(O2NET_MSG_KEEP_REQ_MAGIC);
	o2net_keep_resp->magic = cpu_to_be16(O2NET_MSG_KEEP_RESP_MAGIC);

	for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) {
		struct o2net_node *nn = o2net_nn_from_num(i);

		atomic_set(&nn->nn_timeout, 0);
		spin_lock_init(&nn->nn_lock);
		INIT_DELAYED_WORK(&nn->nn_connect_work, o2net_start_connect);
		INIT_DELAYED_WORK(&nn->nn_connect_expired,
				  o2net_connect_expired);
		INIT_DELAYED_WORK(&nn->nn_still_up, o2net_still_up);
		/* until we see hb from a node we'll return -ENOTCONN */
		nn->nn_persistent_error = -ENOTCONN;
		init_waitqueue_head(&nn->nn_sc_wq);
		idr_init(&nn->nn_status_idr);
		INIT_LIST_HEAD(&nn->nn_status_list);
	}

	return 0;

out_free:
	/* kfree(NULL) is a no-op, so a partial allocation is fine here */
	kfree(o2net_hand);
	kfree(o2net_keep_req);
	kfree(o2net_keep_resp);
	o2net_hand = NULL;
	o2net_keep_req = NULL;
	o2net_keep_resp = NULL;
	o2net_debugfs_exit();
out_quo:
	o2quo_exit();
	return -ENOMEM;
}

void o2net_exit(void)
{
	o2quo_exit();
	kfree(o2net_hand);
	kfree(o2net_keep_req);
	kfree(o2net_keep_resp);
	o2net_debugfs_exit();