aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c196
1 files changed, 163 insertions, 33 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 78b55f4..e85a8d2 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -11,12 +11,14 @@
#include <linux/string.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
+#include <linux/dns_resolver.h>
#include <net/tcp.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
+#include <linux/export.h>
/*
* Ceph uses the messenger to exchange ceph_msg messages with other
@@ -97,7 +99,12 @@ struct workqueue_struct *ceph_msgr_wq;
int ceph_msgr_init(void)
{
- ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
+ /*
+ * The number of active work items is limited by the number of
+ * connections, so leave @max_active at default.
+ */
+ ceph_msgr_wq = alloc_workqueue("ceph-msgr",
+ WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
if (!ceph_msgr_wq) {
pr_err("msgr_init failed to create workqueue\n");
return -ENOMEM;
@@ -282,6 +289,37 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
return r;
}
+static int __ceph_tcp_sendpage(struct socket *sock, struct page *page,
+ int offset, size_t size, bool more)
+{
+ int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
+ int ret;
+
+ ret = kernel_sendpage(sock, page, offset, size, flags);
+ if (ret == -EAGAIN)
+ ret = 0;
+
+ return ret;
+}
+
+static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
+ int offset, size_t size, bool more)
+{
+ int ret;
+ struct kvec iov;
+
+ /* sendpage cannot properly handle pages with page_count == 0,
+ * we need to fallback to sendmsg if that's the case */
+ if (page_count(page) >= 1)
+ return __ceph_tcp_sendpage(sock, page, offset, size, more);
+
+ iov.iov_base = kmap(page) + offset;
+ iov.iov_len = size;
+ ret = ceph_tcp_sendmsg(sock, &iov, 1, size, more);
+ kunmap(page);
+
+ return ret;
+}
/*
* Shutdown/close the socket for the given connection.
@@ -486,13 +524,10 @@ static void prepare_write_message(struct ceph_connection *con)
m = list_first_entry(&con->out_queue,
struct ceph_msg, list_head);
con->out_msg = m;
- if (test_bit(LOSSYTX, &con->state)) {
- list_del_init(&m->list_head);
- } else {
- /* put message on sent list */
- ceph_msg_get(m);
- list_move_tail(&m->list_head, &con->out_sent);
- }
+
+ /* put message on sent list */
+ ceph_msg_get(m);
+ list_move_tail(&m->list_head, &con->out_sent);
/*
* only assign outgoing seq # if we haven't sent this message
@@ -852,18 +887,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
cpu_to_le32(crc32c(tmpcrc, base, len));
con->out_msg_pos.did_page_crc = 1;
}
- ret = kernel_sendpage(con->sock, page,
+ ret = ceph_tcp_sendpage(con->sock, page,
con->out_msg_pos.page_pos + page_shift,
- len,
- MSG_DONTWAIT | MSG_NOSIGNAL |
- MSG_MORE);
+ len, 1);
if (crc &&
(msg->pages || msg->pagelist || msg->bio || in_trail))
kunmap(page);
- if (ret == -EAGAIN)
- ret = 0;
if (ret <= 0)
goto out;
@@ -1081,6 +1112,101 @@ static void addr_set_port(struct sockaddr_storage *ss, int p)
}
/*
+ * Unlike other *_pton function semantics, zero indicates success.
+ */
+static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
+ char delim, const char **ipend)
+{
+ struct sockaddr_in *in4 = (void *)ss;
+ struct sockaddr_in6 *in6 = (void *)ss;
+
+ memset(ss, 0, sizeof(*ss));
+
+ if (in4_pton(str, len, (u8 *)&in4->sin_addr.s_addr, delim, ipend)) {
+ ss->ss_family = AF_INET;
+ return 0;
+ }
+
+ if (in6_pton(str, len, (u8 *)&in6->sin6_addr.s6_addr, delim, ipend)) {
+ ss->ss_family = AF_INET6;
+ return 0;
+ }
+
+ return -EINVAL;
+}
+
+/*
+ * Extract hostname string and resolve using kernel DNS facility.
+ */
+#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
+static int ceph_dns_resolve_name(const char *name, size_t namelen,
+ struct sockaddr_storage *ss, char delim, const char **ipend)
+{
+ const char *end, *delim_p;
+ char *colon_p, *ip_addr = NULL;
+ int ip_len, ret;
+
+ /*
+ * The end of the hostname occurs immediately preceding the delimiter or
+ * the port marker (':') where the delimiter takes precedence.
+ */
+ delim_p = memchr(name, delim, namelen);
+ colon_p = memchr(name, ':', namelen);
+
+ if (delim_p && colon_p)
+ end = delim_p < colon_p ? delim_p : colon_p;
+ else if (!delim_p && colon_p)
+ end = colon_p;
+ else {
+ end = delim_p;
+ if (!end) /* case: hostname:/ */
+ end = name + namelen;
+ }
+
+ if (end <= name)
+ return -EINVAL;
+
+ /* do dns_resolve upcall */
+ ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL);
+ if (ip_len > 0)
+ ret = ceph_pton(ip_addr, ip_len, ss, -1, NULL);
+ else
+ ret = -ESRCH;
+
+ kfree(ip_addr);
+
+ *ipend = end;
+
+ pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
+ ret, ret ? "failed" : ceph_pr_addr(ss));
+
+ return ret;
+}
+#else
+static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
+ struct sockaddr_storage *ss, char delim, const char **ipend)
+{
+ return -EINVAL;
+}
+#endif
+
+/*
+ * Parse a server name (IP or hostname). If a valid IP address is not found
+ * then try to extract a hostname to resolve using userspace DNS upcall.
+ */
+static int ceph_parse_server_name(const char *name, size_t namelen,
+ struct sockaddr_storage *ss, char delim, const char **ipend)
+{
+ int ret;
+
+ ret = ceph_pton(name, namelen, ss, delim, ipend);
+ if (ret)
+ ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend);
+
+ return ret;
+}
+
+/*
* Parse an ip[:port] list into an addr array. Use the default
* monitor port if a port isn't specified.
*/
@@ -1088,15 +1214,13 @@ int ceph_parse_ips(const char *c, const char *end,
struct ceph_entity_addr *addr,
int max_count, int *count)
{
- int i;
+ int i, ret = -EINVAL;
const char *p = c;
dout("parse_ips on '%.*s'\n", (int)(end-c), c);
for (i = 0; i < max_count; i++) {
const char *ipend;
struct sockaddr_storage *ss = &addr[i].in_addr;
- struct sockaddr_in *in4 = (void *)ss;
- struct sockaddr_in6 *in6 = (void *)ss;
int port;
char delim = ',';
@@ -1105,15 +1229,11 @@ int ceph_parse_ips(const char *c, const char *end,
p++;
}
- memset(ss, 0, sizeof(*ss));
- if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr,
- delim, &ipend))
- ss->ss_family = AF_INET;
- else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr,
- delim, &ipend))
- ss->ss_family = AF_INET6;
- else
+ ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend);
+ if (ret)
goto bad;
+ ret = -EINVAL;
+
p = ipend;
if (delim == ']') {
@@ -1158,7 +1278,7 @@ int ceph_parse_ips(const char *c, const char *end,
bad:
pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
- return -EINVAL;
+ return ret;
}
EXPORT_SYMBOL(ceph_parse_ips);
@@ -1399,6 +1519,7 @@ static void process_ack(struct ceph_connection *con)
break;
dout("got ack for seq %llu type %d at %p\n", seq,
le16_to_cpu(m->hdr.type), m);
+ m->ack_stamp = jiffies;
ceph_msg_remove(m);
}
prepare_read_tag(con);
@@ -2283,7 +2404,8 @@ EXPORT_SYMBOL(ceph_con_keepalive);
* construct a new message with given type, size
* the new msg has a ref count of 1.
*/
-struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
+struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
+ bool can_fail)
{
struct ceph_msg *m;
@@ -2306,9 +2428,10 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
m->footer.middle_crc = 0;
m->footer.data_crc = 0;
m->footer.flags = 0;
- m->front_max = front_len;
+ m->front_alloc_len = front_len;
m->front_is_vmalloc = false;
m->more_to_follow = false;
+ m->ack_stamp = 0;
m->pool = NULL;
/* middle */
@@ -2334,7 +2457,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
m->front.iov_base = kmalloc(front_len, flags);
}
if (m->front.iov_base == NULL) {
- pr_err("msg_new can't allocate %d bytes\n",
+ dout("ceph_msg_new can't allocate %d bytes\n",
front_len);
goto out2;
}
@@ -2349,7 +2472,14 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
out2:
ceph_msg_put(m);
out:
- pr_err("msg_new can't create type %d front %d\n", type, front_len);
+ if (!can_fail) {
+ pr_err("msg_new can't create type %d front %d\n", type,
+ front_len);
+ WARN_ON(1);
+ } else {
+ dout("msg_new can't create type %d front %d\n", type,
+ front_len);
+ }
return NULL;
}
EXPORT_SYMBOL(ceph_msg_new);
@@ -2399,7 +2529,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
}
if (!msg) {
*skip = 0;
- msg = ceph_msg_new(type, front_len, GFP_NOFS);
+ msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
if (!msg) {
pr_err("unable to allocate msg type %d len %d\n",
type, front_len);
@@ -2469,8 +2599,8 @@ EXPORT_SYMBOL(ceph_msg_last_put);
void ceph_msg_dump(struct ceph_msg *msg)
{
- pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg,
- msg->front_max, msg->nr_pages);
+ pr_debug("msg_dump %p (front_alloc_len %d nr_pages %d)\n", msg,
+ msg->front_alloc_len, msg->nr_pages);
print_hex_dump(KERN_DEBUG, "header: ",
DUMP_PREFIX_OFFSET, 16, 1,
&msg->hdr, sizeof(msg->hdr), true);