Newer
Older
if (dlm_local_addr[0]->ss_family == AF_INET)
addr_len = sizeof(struct sockaddr_in);
else
addr_len = sizeof(struct sockaddr_in6);
/* Create a socket to communicate with */
result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
IPPROTO_TCP, &sock);
log_print("Can't create listening comms socket");
goto create_out;
}
result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
(char *)&one, sizeof(one));
log_print("Failed to set SO_REUSEADDR on socket: %d", result);
}
sock->sk->sk_user_data = con;
con->rx_action = tcp_accept_from_sock;
con->connect_action = tcp_connect_to_sock;
con->sock = sock;
/* Bind to our port */
make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
if (result < 0) {
log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
sock_release(sock);
sock = NULL;
con->sock = NULL;
goto create_out;
}
result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
}
result = sock->ops->listen(sock, 5);
if (result < 0) {
log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
sock_release(sock);
sock = NULL;
goto create_out;
}
/* Get local addresses */
static void init_local(void)
{
struct sockaddr_storage sas, *addr;
int i;
dlm_local_count = 0;
for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
if (dlm_our_addr(&sas, i))
break;
if (!addr)
break;
memcpy(addr, &sas, sizeof(*addr));
dlm_local_addr[dlm_local_count++] = addr;
}
}
/*
 * Bind the SCTP socket of @sctp_con to one local IP address.  SCTP allows
 * multiple addresses so it can do multi-homing: address number 1 is bound
 * with kernel_bind(), every other one is added through the
 * SCTP_SOCKOPT_BINDX_ADD socket option.
 *
 * Returns whatever the bind/setsockopt call returned; a negative result is
 * logged before being handed back.
 */
static int add_sctp_bind_addr(struct connection *sctp_con,
			      struct sockaddr_storage *addr,
			      int addr_len, int num)
{
	int rv;

	if (num != 1)
		rv = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
				       SCTP_SOCKOPT_BINDX_ADD,
				       (char *)addr, addr_len);
	else
		rv = kernel_bind(sctp_con->sock,
				 (struct sockaddr *) addr,
				 addr_len);

	if (rv >= 0)
		return rv;

	log_print("Can't bind to port %d addr number %d",
		  dlm_config.ci_tcp_port, num);
	return rv;
}
/* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void)
{
struct socket *sock = NULL;
struct sockaddr_storage localaddr;
struct sctp_event_subscribe subscribe;
int result = -EINVAL, num = 1, i, addr_len;
struct connection *con = nodeid2con(0, GFP_NOFS);
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
int bufsize = NEEDED_RMEM;
if (!con)
return -ENOMEM;
log_print("Using SCTP for communications");
result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
IPPROTO_SCTP, &sock);
if (result < 0) {
log_print("Can't create comms socket, check SCTP is loaded");
goto out;
}
/* Listen for events */
memset(&subscribe, 0, sizeof(subscribe));
subscribe.sctp_data_io_event = 1;
subscribe.sctp_association_event = 1;
subscribe.sctp_send_failure_event = 1;
subscribe.sctp_shutdown_event = 1;
subscribe.sctp_partial_delivery_event = 1;
result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
(char *)&bufsize, sizeof(bufsize));
if (result)
log_print("Error increasing buffer space on socket %d", result);
result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
if (result < 0) {
log_print("Failed to set SCTP_EVENTS on socket: result=%d",
result);
goto create_delsock;
}
/* Init con struct */
sock->sk->sk_user_data = con;
con->sock = sock;
con->sock->sk->sk_data_ready = lowcomms_data_ready;
con->rx_action = receive_from_sock;
con->connect_action = sctp_init_assoc;
/* Bind to all interfaces. */
for (i = 0; i < dlm_local_count; i++) {
memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
if (result)
goto create_delsock;
++num;
}
result = sock->ops->listen(sock, 5);
if (result < 0) {
log_print("Can't set socket listening");
goto create_delsock;
}
return 0;
create_delsock:
sock_release(sock);
con->sock = NULL;
out:
return result;
}
static int tcp_listen_for_all(void)
{
struct socket *sock = NULL;
struct connection *con = nodeid2con(0, GFP_NOFS);
int result = -EINVAL;
if (!con)
return -ENOMEM;
/* We don't support multi-homed hosts */
if (dlm_local_addr[1] != NULL) {
log_print("TCP protocol can't handle multi-homed hosts, "
"try SCTP");
return -EINVAL;
}
log_print("Using TCP for communications");
sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
if (sock) {
add_sock(sock, con);
result = 0;
}
else {
result = -EADDRINUSE;
}
return result;
}
/*
 * Allocate a write-queue entry plus its backing page for @con, using the
 * caller's @allocation flags for both allocations.
 *
 * The entry comes back with offset, len, end and users all zero and is not
 * linked into any list.  Returns NULL if either allocation fails.
 */
static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *wq;

	wq = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (wq == NULL)
		goto fail;

	wq->page = alloc_page(allocation);
	if (wq->page == NULL)
		goto fail_free;

	wq->offset = 0;
	wq->len = 0;
	wq->end = 0;
	wq->users = 0;
	wq->con = con;
	return wq;

fail_free:
	kfree(wq);
fail:
	return NULL;
}
/*
 * Reserve @len bytes of send buffer space for node @nodeid.
 *
 * The space is taken from the last entry on the connection's write queue
 * when that entry's page still has room; otherwise a new entry is allocated
 * with @allocation and appended to the queue.  The reserving caller is
 * counted in the entry's 'users' field.
 *
 * On success *@ppc points at the reserved bytes and the entry is returned as
 * an opaque handle (the value dlm_lowcomms_commit_buffer() casts back to a
 * writequeue_entry).  Returns NULL if no connection or no entry could be
 * obtained.
 */
void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	/*
	 * With an empty queue 'prev' is the list head itself, so 'e' is not a
	 * real entry; that case must be caught before e->end is looked at.
	 */
	if ((&e->list == &con->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (!e) {
		/* Allocate outside the spinlock, then queue the new entry. */
		e = new_writequeue_entry(con, allocation);
		if (!e)
			return NULL;

		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
	}

	*ppc = page_address(e->page) + offset;
	return e;
}
/*
 * Release one user's hold on a write-queue entry.
 *
 * @mh is a pointer to the struct writequeue_entry to commit.
 * When the last user commits, the entry's length is
 * finalised and, unless a write is already flagged as pending on the
 * connection, the send work item is queued.
 */
void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;	/* other users are still filling this entry */
	e->len = e->end - e->offset;
	spin_unlock(&con->writequeue_lock);

	/* test_and_set_bit() ensures the work is queued only once. */
	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}
/*
 * Send a message: walk @con's write queue from the front and push each
 * entry's pending bytes out through con->sock with kernel_sendpage(),
 * advancing the entry's offset/len by the amount sent and freeing entries
 * that are fully sent and have no users left.
 *
 * NOTE(review): this function looks truncated in this copy -- the labels
 * targeted by "goto out_connect" and "goto send_error" are not present, and
 * the braces opened by "if (len) {" and "if (ret == -EAGAIN || ret == 0) {"
 * are never closed.  Restore the missing lines from the authoritative source
 * before building; the code below is left exactly as found.
 */
static void send_to_sock(struct connection *con)
{
int ret = 0;
const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
struct writequeue_entry *e;
int len, offset;
/* No socket: branch to the out_connect path. */
if (con->sock == NULL)
goto out_connect;
spin_lock(&con->writequeue_lock);
for (;;) {
e = list_entry(con->writequeue.next, struct writequeue_entry,
list);
/* 'next' pointing back at the list head means the queue is empty. */
if ((struct list_head *) e == &con->writequeue)
break;
len = e->len;
offset = e->offset;
BUG_ON(len == 0 && e->users == 0);
/* The queue lock is dropped around the actual send. */
spin_unlock(&con->writequeue_lock);
ret = 0;
if (len) {
ret = kernel_sendpage(con->sock, e->page, offset, len,
msg_flags);
if (ret == -EAGAIN || ret == 0) {
cond_resched();
if (ret <= 0)
goto send_error;
/* Don't starve people filling buffers */
spin_lock(&con->writequeue_lock);
/* Account for the bytes that went out. */
e->offset += ret;
e->len -= ret;
if (e->len == 0 && e->users == 0) {
list_del(&e->list);
free_entry(e);
continue;
}
}
spin_unlock(&con->writequeue_lock);
lowcomms_connect_sock(con);
if (!test_bit(CF_INIT_PENDING, &con->flags))
lowcomms_connect_sock(con);
}
static void clean_one_writequeue(struct connection *con)
{
struct writequeue_entry *e, *safe;
spin_lock(&con->writequeue_lock);
list_for_each_entry_safe(e, safe, &con->writequeue, list) {
list_del(&e->list);
free_entry(e);
}
spin_unlock(&con->writequeue_lock);
}
/*
 * Called from recovery when it knows that a node has left the cluster.
 *
 * If a connection for @nodeid exists, its connect/write pending flags are
 * cleared, CF_CLOSE is set, both work items are cancelled (logging when one
 * was actually cancelled) and its write queue is emptied.
 *
 * Always returns 0.
 */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;

	log_print("closing connection to node %d", nodeid);

	con = nodeid2con(nodeid, 0);
	if (!con)
		return 0;

	clear_bit(CF_CONNECT_PENDING, &con->flags);
	clear_bit(CF_WRITE_PENDING, &con->flags);
	set_bit(CF_CLOSE, &con->flags);

	if (cancel_work_sync(&con->swork))
		log_print("canceled swork for node %d", nodeid);

	if (cancel_work_sync(&con->rwork))
		log_print("canceled rwork for node %d", nodeid);

	clean_one_writequeue(con);
	return 0;
}
/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
struct connection *con = container_of(work, struct connection, rwork);
int err;
clear_bit(CF_READ_PENDING, &con->flags);
do {
err = con->rx_action(con);
} while (!err);
/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
struct connection *con = container_of(work, struct connection, swork);
if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
set_bit(CF_WRITE_PENDING, &con->flags);
if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
send_to_sock(con);
}
/*
 * Discard all entries on the write queues: clean_one_writequeue() is applied
 * to every connection through foreach_conn().
 *
 * NOTE(review): everything after the foreach_conn() call below does not
 * belong to a void "clean" helper -- it destroys and then re-creates the
 * recv/send work queues, assigns an undeclared 'error' and returns a value.
 * It looks like the bodies of separate work-queue stop/start helpers whose
 * function headers, closing braces and "if (error) {" lines are missing from
 * this copy.  Confirm against the authoritative source; the code is left
 * exactly as found.
 */
static void clean_writequeues(void)
{
foreach_conn(clean_one_writequeue);
/* Tear down both work queues. */
destroy_workqueue(recv_workqueue);
destroy_workqueue(send_workqueue);
/* Create the receive work queue. */
recv_workqueue = create_workqueue("dlm_recv");
error = IS_ERR(recv_workqueue);
log_print("can't start dlm_recv %d", error);
/* Create the send work queue (single-threaded). */
send_workqueue = create_singlethread_workqueue("dlm_send");
error = IS_ERR(send_workqueue);
log_print("can't start dlm_send %d", error);
/* Undo the receive queue before reporting the error. */
destroy_workqueue(recv_workqueue);
return error;
}
return 0;
}
static void stop_conn(struct connection *con)
con->flags |= 0x0F;
con->sock->sk->sk_user_data = NULL;
}
static void free_conn(struct connection *con)
{
close_connection(con, true);
if (con->othercon)
kmem_cache_free(con_cache, con->othercon);
hlist_del(&con->list);
kmem_cache_free(con_cache, con);
}
/*
 * Shut down the low-level comms layer.
 *
 * Runs stop_conn() over every connection, then free_conn() over every
 * connection, each pass under connections_lock, and finally destroys the
 * connection cache.
 *
 * NOTE(review): the lock is released and immediately re-taken between the
 * two passes; presumably further teardown (e.g. stopping the work queues)
 * belongs in that gap -- confirm against the authoritative source.
 */
void dlm_lowcomms_stop(void)
{
/* First pass: stop_conn() on each connection. */
mutex_lock(&connections_lock);
foreach_conn(stop_conn);
mutex_unlock(&connections_lock);
/* Second pass: free_conn() on each connection. */
mutex_lock(&connections_lock);
foreach_conn(free_conn);
mutex_unlock(&connections_lock);
/* All connection objects have been returned to the cache by now. */
kmem_cache_destroy(con_cache);
}
int dlm_lowcomms_start(void)
{
int error = -EINVAL;
struct connection *con;
int i;
for (i = 0; i < CONN_HASH_SIZE; i++)
INIT_HLIST_HEAD(&connection_hash[i]);
init_local();
if (!dlm_local_count) {
log_print("no local IP address has been set");
con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
/* Start listening */
if (dlm_config.ci_protocol == 0)
error = tcp_listen_for_all();
else
error = sctp_listen_for_all();
if (error)
goto fail_unlisten;
if (error)
goto fail_unlisten;
return 0;
con = nodeid2con(0,0);
if (con) {
close_connection(con, false);
kmem_cache_free(con_cache, con);
}
kmem_cache_destroy(con_cache);