/* An entry waiting to be sent */
struct outqueue_entry {
- u32 evt;
+ bool inactive;
+ struct tipc_event evt;
struct list_head list;
- struct kvec iov;
};
static void tipc_recv_work(struct work_struct *work);
return con;
}
+/* sock_data_ready - interrupt callback indicating the socket has data to read
+ * The queued job is launched in tipc_recv_from_sock()
+ */
static void sock_data_ready(struct sock *sk)
{
struct tipc_conn *con;
read_unlock_bh(&sk->sk_callback_lock);
}
+/* sock_write_space - interrupt callback after a sendmsg EAGAIN
+ * Indicates that there now is more is space in the send buffer
+ * The queued job is launched in tipc_send_to_sock()
+ */
static void sock_write_space(struct sock *sk)
{
struct tipc_conn *con;
return con;
}
-int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
- void *buf, size_t len)
+static int tipc_con_rcv_sub(struct tipc_server *srv,
+ struct tipc_conn *con,
+ struct tipc_subscr *s)
{
- struct tipc_subscr *s = (struct tipc_subscr *)buf;
struct tipc_subscription *sub;
bool status;
int swap;
return 0;
}
status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
- sub = tipc_subscrp_subscribe(net, s, conid, swap, status);
+ sub = tipc_subscrp_subscribe(srv, s, con->conid, swap, status);
if (!sub)
return -1;
static int tipc_receive_from_sock(struct tipc_conn *con)
{
- struct tipc_server *s = con->server;
+ struct tipc_server *srv = con->server;
struct sock *sk = con->sock->sk;
struct msghdr msg = {};
+ struct tipc_subscr s;
struct kvec iov;
- void *buf;
int ret;
- buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC);
- if (!buf) {
- ret = -ENOMEM;
- goto out_close;
- }
-
- iov.iov_base = buf;
- iov.iov_len = s->max_rcvbuf_size;
+ iov.iov_base = &s;
+ iov.iov_len = sizeof(s);
msg.msg_name = NULL;
iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len);
ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
- if (ret <= 0) {
- kmem_cache_free(s->rcvbuf_cache, buf);
- goto out_close;
+ if (ret == -EWOULDBLOCK)
+ return -EWOULDBLOCK;
+ if (ret > 0) {
+ read_lock_bh(&sk->sk_callback_lock);
+ ret = tipc_con_rcv_sub(srv, con, &s);
+ read_unlock_bh(&sk->sk_callback_lock);
}
-
- read_lock_bh(&sk->sk_callback_lock);
- ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret);
- read_unlock_bh(&sk->sk_callback_lock);
- kmem_cache_free(s->rcvbuf_cache, buf);
if (ret < 0)
- tipc_conn_terminate(s, con->conid);
- return ret;
-
-out_close:
- if (ret != -EWOULDBLOCK)
tipc_close_conn(con);
- else if (ret == 0)
- /* Don't return success if we really got EOF */
- ret = -EAGAIN;
return ret;
}
return 0;
}
-static struct outqueue_entry *tipc_alloc_entry(void *data, int len)
-{
- struct outqueue_entry *entry;
- void *buf;
-
- entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC);
- if (!entry)
- return NULL;
-
- buf = kmemdup(data, len, GFP_ATOMIC);
- if (!buf) {
- kfree(entry);
- return NULL;
- }
-
- entry->iov.iov_base = buf;
- entry->iov.iov_len = len;
-
- return entry;
-}
-
-static void tipc_free_entry(struct outqueue_entry *e)
-{
- kfree(e->iov.iov_base);
- kfree(e);
-}
-
static void tipc_clean_outqueues(struct tipc_conn *con)
{
struct outqueue_entry *e, *safe;
spin_lock_bh(&con->outqueue_lock);
list_for_each_entry_safe(e, safe, &con->outqueue, list) {
list_del(&e->list);
- tipc_free_entry(e);
+ kfree(e);
}
spin_unlock_bh(&con->outqueue_lock);
}
-int tipc_conn_sendmsg(struct tipc_server *s, int conid,
- u32 evt, void *data, size_t len)
+/* tipc_conn_queue_evt - interrupt level call from a subscription instance
+ * The queued job is launched in tipc_send_to_sock()
+ */
+void tipc_conn_queue_evt(struct tipc_server *s, int conid,
+ u32 event, struct tipc_event *evt)
{
struct outqueue_entry *e;
struct tipc_conn *con;
con = tipc_conn_lookup(s, conid);
if (!con)
- return -EINVAL;
+ return;
- if (!connected(con)) {
- conn_put(con);
- return 0;
- }
+ if (!connected(con))
+ goto err;
- e = tipc_alloc_entry(data, len);
- if (!e) {
- conn_put(con);
- return -ENOMEM;
- }
- e->evt = evt;
+ e = kmalloc(sizeof(*e), GFP_ATOMIC);
+ if (!e)
+ goto err;
+ e->inactive = (event == TIPC_SUBSCR_TIMEOUT);
+ memcpy(&e->evt, evt, sizeof(*evt));
spin_lock_bh(&con->outqueue_lock);
list_add_tail(&e->list, &con->outqueue);
spin_unlock_bh(&con->outqueue_lock);
- if (!queue_work(s->send_wq, &con->swork))
- conn_put(con);
- return 0;
-}
-
-void tipc_conn_terminate(struct tipc_server *s, int conid)
-{
- struct tipc_conn *con;
-
- con = tipc_conn_lookup(s, conid);
- if (con) {
- tipc_close_conn(con);
- conn_put(con);
- }
+ if (queue_work(s->send_wq, &con->swork))
+ return;
+err:
+ conn_put(con);
}
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
*conid = con->conid;
con->sock = NULL;
- rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub));
+ rc = tipc_con_rcv_sub(tipc_topsrv(net), con, &sub);
if (rc < 0)
tipc_close_conn(con);
return !rc;
struct outqueue_entry *e;
struct tipc_event *evt;
struct msghdr msg;
+ struct kvec iov;
int count = 0;
int ret;
while (!list_empty(queue)) {
e = list_first_entry(queue, struct outqueue_entry, list);
-
+ evt = &e->evt;
spin_unlock_bh(&con->outqueue_lock);
- if (e->evt == TIPC_SUBSCR_TIMEOUT) {
- evt = (struct tipc_event *)e->iov.iov_base;
+ if (e->inactive)
tipc_con_delete_sub(con, &evt->s);
- }
+
memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT;
+ iov.iov_base = evt;
+ iov.iov_len = sizeof(*evt);
+ msg.msg_name = NULL;
if (con->sock) {
- ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
- e->iov.iov_len);
+ ret = kernel_sendmsg(con->sock, &msg, &iov,
+ 1, sizeof(*evt));
if (ret == -EWOULDBLOCK || ret == 0) {
cond_resched();
goto out;
} else if (ret < 0) {
- goto send_err;
+ goto err;
}
} else {
- evt = e->iov.iov_base;
tipc_send_kern_top_evt(srv->net, evt);
}
}
spin_lock_bh(&con->outqueue_lock);
list_del(&e->list);
- tipc_free_entry(e);
+ kfree(e);
}
spin_unlock_bh(&con->outqueue_lock);
out:
return;
-
-send_err:
+err:
tipc_close_conn(con);
}
idr_init(&s->conn_idr);
s->idr_in_use = 0;
- s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size,
- 0, SLAB_HWCACHE_ALIGN, NULL);
- if (!s->rcvbuf_cache)
- return -ENOMEM;
-
ret = tipc_work_start(s);
- if (ret < 0) {
- kmem_cache_destroy(s->rcvbuf_cache);
+ if (ret < 0)
return ret;
- }
+
ret = tipc_open_listening_sock(s);
- if (ret < 0) {
+ if (ret < 0)
tipc_work_stop(s);
- kmem_cache_destroy(s->rcvbuf_cache);
- return ret;
- }
+
return ret;
}
spin_unlock_bh(&s->idr_lock);
tipc_work_stop(s);
- kmem_cache_destroy(s->rcvbuf_cache);
idr_destroy(&s->conn_idr);
}
static void tipc_subscrp_send_event(struct tipc_subscription *sub,
u32 found_lower, u32 found_upper,
- u32 event, u32 port_ref, u32 node)
+ u32 event, u32 port, u32 node)
{
- struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
- struct kvec msg_sect;
+ struct tipc_event *evt = &sub->evt;
+ bool swap = sub->swap;
if (sub->inactive)
return;
- msg_sect.iov_base = (void *)&sub->evt;
- msg_sect.iov_len = sizeof(struct tipc_event);
- sub->evt.event = htohl(event, sub->swap);
- sub->evt.found_lower = htohl(found_lower, sub->swap);
- sub->evt.found_upper = htohl(found_upper, sub->swap);
- sub->evt.port.ref = htohl(port_ref, sub->swap);
- sub->evt.port.node = htohl(node, sub->swap);
- tipc_conn_sendmsg(tn->topsrv, sub->conid, event,
- msg_sect.iov_base, msg_sect.iov_len);
+ evt->event = htohl(event, swap);
+ evt->found_lower = htohl(found_lower, swap);
+ evt->found_upper = htohl(found_upper, swap);
+ evt->port.ref = htohl(port, swap);
+ evt->port.node = htohl(node, swap);
+ tipc_conn_queue_evt(sub->server, sub->conid, event, evt);
}
/**
static void tipc_subscrp_kref_release(struct kref *kref)
{
- struct tipc_subscription *sub = container_of(kref,
- struct tipc_subscription,
- kref);
- struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
+ struct tipc_subscription *sub;
+ struct tipc_net *tn;
+
+ sub = container_of(kref, struct tipc_subscription, kref);
+ tn = tipc_net(sub->server->net);
atomic_dec(&tn->subscription_count);
kfree(sub);
kref_get(&subscription->kref);
}
-static struct tipc_subscription *tipc_subscrp_create(struct net *net,
+static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv,
struct tipc_subscr *s,
int conid, bool swap)
{
- struct tipc_net *tn = net_generic(net, tipc_net_id);
+ struct tipc_net *tn = tipc_net(srv->net);
struct tipc_subscription *sub;
u32 filter = htohl(s->filter, swap);
}
/* Initialize subscription object */
- sub->net = net;
+ sub->server = srv;
sub->conid = conid;
sub->inactive = false;
if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) ||
return sub;
}
-struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
+struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv,
struct tipc_subscr *s,
int conid, bool swap,
bool status)
struct tipc_subscription *sub = NULL;
u32 timeout;
- sub = tipc_subscrp_create(net, s, conid, swap);
+ sub = tipc_subscrp_create(srv, s, conid, swap);
if (!sub)
return NULL;