addr = kmalloc(sizeof(*addr), GFP_KERNEL);
if (!addr)
break;
memcpy(addr, &sas, sizeof(*addr));
dlm_local_addr[dlm_local_count++] = addr;
}
}
/* Bind to an IP address. SCTP allows multiple addresses so it can do
   multi-homing */
static int add_sctp_bind_addr(struct connection *sctp_con,
struct sockaddr_storage *addr,
int addr_len, int num)
{
int result = 0;
if (num == 1)
result = kernel_bind(sctp_con->sock,
(struct sockaddr *) addr,
addr_len);
else
result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
SCTP_SOCKOPT_BINDX_ADD,
(char *)addr, addr_len);
if (result < 0)
log_print("Can't bind to port %d addr number %d",
dlm_config.ci_tcp_port, num);
return result;
}
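/*
 * Illustrative sketch only (not part of lowcomms.c): the same two-step
 * pattern exists in user space, where the first address is bound with
 * bind() and each further address is added with sctp_bindx() from
 * lksctp-tools; the variable names below are assumptions for the example.
 *
 *	bind(fd, (struct sockaddr *)&addrs[0], addr_len);
 *	sctp_bindx(fd, (struct sockaddr *)&addrs[1], 1, SCTP_BINDX_ADD_ADDR);
 */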
/* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void)
{
struct socket *sock = NULL;
struct sockaddr_storage localaddr;
struct sctp_event_subscribe subscribe;
int result = -EINVAL, num = 1, i, addr_len;
struct connection *con = nodeid2con(0, GFP_KERNEL);
int bufsize = NEEDED_RMEM;
if (!con)
return -ENOMEM;
log_print("Using SCTP for communications");
result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
IPPROTO_SCTP, &sock);
if (result < 0) {
log_print("Can't create comms socket, check SCTP is loaded");
goto out;
}
/* Listen for events */
memset(&subscribe, 0, sizeof(subscribe));
subscribe.sctp_data_io_event = 1;
subscribe.sctp_association_event = 1;
subscribe.sctp_send_failure_event = 1;
subscribe.sctp_shutdown_event = 1;
subscribe.sctp_partial_delivery_event = 1;
result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
(char *)&bufsize, sizeof(bufsize));
if (result)
log_print("Error increasing buffer space on socket %d", result);
result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
(char *)&subscribe, sizeof(subscribe));
if (result < 0) {
log_print("Failed to set SCTP_EVENTS on socket: result=%d",
result);
goto create_delsock;
}
/* Init con struct */
sock->sk->sk_user_data = con;
con->sock = sock;
con->sock->sk->sk_data_ready = lowcomms_data_ready;
con->rx_action = receive_from_sock;
con->connect_action = sctp_init_assoc;
/* Bind to all interfaces. */
for (i = 0; i < dlm_local_count; i++) {
memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
if (result)
goto create_delsock;
++num;
}
result = sock->ops->listen(sock, 5);
if (result < 0) {
log_print("Can't set socket listening");
goto create_delsock;
}
return 0;
create_delsock:
sock_release(sock);
con->sock = NULL;
out:
return result;
}
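/*
 * Design note (editorial, not in the original source): unlike the TCP
 * transport below, which needs a listening socket plus one connection per
 * peer, the SCTP transport above uses a single one-to-many SOCK_SEQPACKET
 * socket. Every local address is bound to it, peers are reached through
 * associations set up by sctp_init_assoc(), and the SCTP_EVENTS
 * subscription delivers association and shutdown notifications on the
 * same socket.
 */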
static int tcp_listen_for_all(void)
{
struct socket *sock = NULL;
struct connection *con = nodeid2con(0, GFP_KERNEL);
int result = -EINVAL;
if (!con)
return -ENOMEM;
/* We don't support multi-homed hosts */
if (dlm_local_addr[1] != NULL) {
log_print("TCP protocol can't handle multi-homed hosts, "
"try SCTP");
return -EINVAL;
}
log_print("Using TCP for communications");
set_bit(CF_IS_OTHERCON, &con->flags);
sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
if (sock) {
add_sock(sock, con);
result = 0;
}
else {
result = -EADDRINUSE;
}
return result;
}
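/*
 * Note (assumption, not taken from this file): the choice between
 * tcp_listen_for_all() and sctp_listen_for_all() is made from
 * dlm_config.ci_protocol in dlm_lowcomms_start() below. User space
 * normally sets it through the DLM configfs tree, e.g.
 *
 *	echo 1 > /sys/kernel/config/dlm/cluster/protocol
 *
 * where 0 selects TCP and a non-zero value selects SCTP.
 */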
static struct writequeue_entry *new_writequeue_entry(struct connection *con,
gfp_t allocation)
{
struct writequeue_entry *entry;
entry = kmalloc(sizeof(struct writequeue_entry), allocation);
if (!entry)
return NULL;
entry->page = alloc_page(allocation);
if (!entry->page) {
kfree(entry);
return NULL;
}
entry->offset = 0;
entry->len = 0;
entry->end = 0;
entry->users = 0;
entry->con = con;
return entry;
}
void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
struct connection *con;
struct writequeue_entry *e;
int offset = 0;
int users = 0;
con = nodeid2con(nodeid, allocation);
if (!con)
return NULL;
spin_lock(&con->writequeue_lock);
e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
if ((&e->list == &con->writequeue) ||
(PAGE_CACHE_SIZE - e->end < len)) {
e = NULL;
} else {
offset = e->end;
e->end += len;
users = e->users++;
}
spin_unlock(&con->writequeue_lock);
if (e) {
got_one:
if (users == 0)
kmap(e->page);
*ppc = page_address(e->page) + offset;
return e;
}
e = new_writequeue_entry(con, allocation);
if (e) {
spin_lock(&con->writequeue_lock);
offset = e->end;
e->end += len;
users = e->users++;
list_add_tail(&e->list, &con->writequeue);
spin_unlock(&con->writequeue_lock);
goto got_one;
}
return NULL;
}
void dlm_lowcomms_commit_buffer(void *mh)
{
struct writequeue_entry *e = (struct writequeue_entry *)mh;
struct connection *con = e->con;
int users;
spin_lock(&con->writequeue_lock);
users = --e->users;
if (users)
goto out;
e->len = e->end - e->offset;
kunmap(e->page);
spin_unlock(&con->writequeue_lock);
if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
queue_work(send_workqueue, &con->swork);
}
return;

out:
spin_unlock(&con->writequeue_lock);
return;
}
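/*
 * Hedged usage sketch (editorial addition, not part of lowcomms.c): how a
 * caller such as the DLM mid-level message code is expected to pair
 * dlm_lowcomms_get_buffer() with dlm_lowcomms_commit_buffer(). The nodeid,
 * length, allocation flag and message contents are illustrative
 * assumptions only.
 */
static void example_lowcomms_send(int nodeid)
{
void *mh;
char *buf;
/* Reserve 64 bytes on the node's write queue; the page is kmapped on
   first use and buf points at the reserved region. */
mh = dlm_lowcomms_get_buffer(nodeid, 64, GFP_NOFS, &buf);
if (!mh)
return;
memset(buf, 0, 64); /* build the message in place */
/* Drop the reference; when the last user commits, the entry is
   unmapped and the send workqueue is scheduled. */
dlm_lowcomms_commit_buffer(mh);
}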
/* Send a message */
static void send_to_sock(struct connection *con)
{
int ret = 0;
ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
struct writequeue_entry *e;
int len, offset;
if (con->sock == NULL)
goto out_connect;
sendpage = con->sock->ops->sendpage;
spin_lock(&con->writequeue_lock);
for (;;) {
e = list_entry(con->writequeue.next, struct writequeue_entry,
list);
if ((struct list_head *) e == &con->writequeue)
break;
len = e->len;
offset = e->offset;
BUG_ON(len == 0 && e->users == 0);
spin_unlock(&con->writequeue_lock);
ret = 0;
if (len) {
ret = sendpage(con->sock, e->page, offset, len,
msg_flags);
if (ret == -EAGAIN || ret == 0)
goto out;
if (ret <= 0)
goto send_error;
/* Don't starve people filling buffers */
cond_resched();
}
spin_lock(&con->writequeue_lock);
e->offset += ret;
e->len -= ret;
if (e->len == 0 && e->users == 0) {
list_del(&e->list);
free_entry(e);
continue;
}
}
spin_unlock(&con->writequeue_lock);
out:
return;

send_error:
close_connection(con, false);
lowcomms_connect_sock(con);
return;

out_connect:
if (!test_bit(CF_INIT_PENDING, &con->flags))
lowcomms_connect_sock(con);
}
static void clean_one_writequeue(struct connection *con)
{
struct list_head *list;
struct list_head *temp;
spin_lock(&con->writequeue_lock);
list_for_each_safe(list, temp, &con->writequeue) {
struct writequeue_entry *e =
list_entry(list, struct writequeue_entry, list);
list_del(&e->list);
free_entry(e);
}
spin_unlock(&con->writequeue_lock);
}
/* Called from recovery when it knows that a node has
left the cluster */
int dlm_lowcomms_close(int nodeid)
{
struct connection *con;
log_print("closing connection to node %d", nodeid);
con = nodeid2con(nodeid, 0);
if (con) {
clean_one_writequeue(con);
}
return 0;
}
/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
{
struct connection *con = container_of(work, struct connection, rwork);
int err;
clear_bit(CF_READ_PENDING, &con->flags);
do {
err = con->rx_action(con);
} while (!err);
}
/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
struct connection *con = container_of(work, struct connection, swork);

if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
con->connect_action(con);
}
clear_bit(CF_WRITE_PENDING, &con->flags);
send_to_sock(con);
}
/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
int nodeid;
for (nodeid = 1; nodeid <= max_nodeid; nodeid++) {
struct connection *con = __nodeid2con(nodeid, 0);
if (con)
clean_one_writequeue(con);
}
}
static void work_stop(void)
{
destroy_workqueue(recv_workqueue);
destroy_workqueue(send_workqueue);
}

static int work_start(void)
{
int error;
recv_workqueue = create_workqueue("dlm_recv");
error = IS_ERR(recv_workqueue);
if (error) {
log_print("can't start dlm_recv %d", error);
return error;
}
send_workqueue = create_singlethread_workqueue("dlm_send");
error = IS_ERR(send_workqueue);
if (error) {
log_print("can't start dlm_send %d", error);
destroy_workqueue(recv_workqueue);
return error;
}
return 0;
}
void dlm_lowcomms_stop(void)
{
int i;
struct connection *con;

/* Stop any further socket activity */
down(&connections_lock);
for (i = 0; i <= max_nodeid; i++) {
con = __nodeid2con(i, 0);
if (con && con->sock)
con->sock->sk->sk_user_data = NULL;
}
up(&connections_lock);

work_stop();

down(&connections_lock);
clean_writequeues();
for (i = 0; i <= max_nodeid; i++) {
con = __nodeid2con(i, 0);
if (con) {
close_connection(con, true);
if (con->othercon)
kmem_cache_free(con_cache, con->othercon);
kmem_cache_free(con_cache, con);
}
}
max_nodeid = 0;
up(&connections_lock);
kmem_cache_destroy(con_cache);
idr_init(&connections_idr);
}
int dlm_lowcomms_start(void)
{
int error = -EINVAL;
struct connection *con;
init_local();
if (!dlm_local_count) {
error = -ENOTCONN;
log_print("no local IP address has been set");
goto out;
}

error = -ENOMEM;
con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
__alignof__(struct connection), 0, NULL);
if (!con_cache)
goto out;
/* Set some sysctl minima */
if (sysctl_rmem_max < NEEDED_RMEM)
sysctl_rmem_max = NEEDED_RMEM;
/* Start listening */
if (dlm_config.ci_protocol == 0)
error = tcp_listen_for_all();
else
error = sctp_listen_for_all();
if (error)
goto fail_unlisten;

error = work_start();
if (error)
goto fail_unlisten;

return 0;
fail_unlisten:
con = nodeid2con(0,0);
if (con) {
close_connection(con, false);
kmem_cache_free(con_cache, con);
}
kmem_cache_destroy(con_cache);