diff options
Diffstat (limited to 'net/ceph')
-rw-r--r-- | net/ceph/Kconfig | 14 | ||||
-rw-r--r-- | net/ceph/auth_x.c | 256 | ||||
-rw-r--r-- | net/ceph/ceph_common.c | 48 | ||||
-rw-r--r-- | net/ceph/crush/mapper.c | 35 | ||||
-rw-r--r-- | net/ceph/crypto.c | 171 | ||||
-rw-r--r-- | net/ceph/messenger.c | 196 | ||||
-rw-r--r-- | net/ceph/mon_client.c | 95 | ||||
-rw-r--r-- | net/ceph/msgpool.c | 42 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 93 | ||||
-rw-r--r-- | net/ceph/osdmap.c | 86 |
10 files changed, 678 insertions, 358 deletions
diff --git a/net/ceph/Kconfig b/net/ceph/Kconfig index be683f2..cc04dd6 100644 --- a/net/ceph/Kconfig +++ b/net/ceph/Kconfig @@ -27,3 +27,17 @@ config CEPH_LIB_PRETTYDEBUG If unsure, say N. +config CEPH_LIB_USE_DNS_RESOLVER + bool "Use in-kernel support for DNS lookup" + depends on CEPH_LIB + select DNS_RESOLVER + default n + help + If you say Y here, hostnames (e.g. monitor addresses) will + be resolved using the CONFIG_DNS_RESOLVER facility. + + For information on how to use CONFIG_DNS_RESOLVER consult + Documentation/networking/dns_resolver.txt + + If unsure, say N. + diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c index 1587dc6..9898d1f 100644 --- a/net/ceph/auth_x.c +++ b/net/ceph/auth_x.c @@ -13,8 +13,6 @@ #include "auth_x.h" #include "auth_x_protocol.h" -#define TEMP_TICKET_BUF_LEN 256 - static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed); static int ceph_x_is_authenticated(struct ceph_auth_client *ac) @@ -64,7 +62,7 @@ static int ceph_x_encrypt(struct ceph_crypto_key *secret, } static int ceph_x_decrypt(struct ceph_crypto_key *secret, - void **p, void *end, void *obuf, size_t olen) + void **p, void *end, void **obuf, size_t olen) { struct ceph_x_encrypt_header head; size_t head_len = sizeof(head); @@ -75,8 +73,14 @@ static int ceph_x_decrypt(struct ceph_crypto_key *secret, return -EINVAL; dout("ceph_x_decrypt len %d\n", len); - ret = ceph_decrypt2(secret, &head, &head_len, obuf, &olen, - *p, len); + if (*obuf == NULL) { + *obuf = kmalloc(len, GFP_NOFS); + if (!*obuf) + return -ENOMEM; + olen = len; + } + + ret = ceph_decrypt2(secret, &head, &head_len, *obuf, &olen, *p, len); if (ret) return ret; if (head.struct_v != 1 || le64_to_cpu(head.magic) != CEPHX_ENC_MAGIC) @@ -129,139 +133,120 @@ static void remove_ticket_handler(struct ceph_auth_client *ac, kfree(th); } -static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac, - struct ceph_crypto_key *secret, - void *buf, void *end) +static int process_one_ticket(struct ceph_auth_client *ac, + struct ceph_crypto_key *secret, + void **p, void *end) { struct ceph_x_info *xi = ac->private; - int num; - void *p = buf; + int type; + u8 tkt_struct_v, blob_struct_v; + struct ceph_x_ticket_handler *th; + void *dbuf = NULL; + void *dp, *dend; + int dlen; + char is_enc; + struct timespec validity; + struct ceph_crypto_key old_key; + void *ticket_buf = NULL; + void *tp, *tpend; + struct ceph_timespec new_validity; + struct ceph_crypto_key new_session_key; + struct ceph_buffer *new_ticket_blob; + unsigned long new_expires, new_renew_after; + u64 new_secret_id; int ret; - char *dbuf; - char *ticket_buf; - u8 reply_struct_v; - dbuf = kmalloc(TEMP_TICKET_BUF_LEN, GFP_NOFS); - if (!dbuf) - return -ENOMEM; + ceph_decode_need(p, end, sizeof(u32) + 1, bad); - ret = -ENOMEM; - ticket_buf = kmalloc(TEMP_TICKET_BUF_LEN, GFP_NOFS); - if (!ticket_buf) - goto out_dbuf; + type = ceph_decode_32(p); + dout(" ticket type %d %s\n", type, ceph_entity_type_name(type)); - ceph_decode_need(&p, end, 1 + sizeof(u32), bad); - reply_struct_v = ceph_decode_8(&p); - if (reply_struct_v != 1) + tkt_struct_v = ceph_decode_8(p); + if (tkt_struct_v != 1) goto bad; - num = ceph_decode_32(&p); - dout("%d tickets\n", num); - while (num--) { - int type; - u8 tkt_struct_v, blob_struct_v; - struct ceph_x_ticket_handler *th; - void *dp, *dend; - int dlen; - char is_enc; - struct timespec validity; - struct ceph_crypto_key old_key; - void *tp, *tpend; - struct ceph_timespec new_validity; - struct ceph_crypto_key new_session_key; - struct ceph_buffer *new_ticket_blob; - unsigned long new_expires, new_renew_after; - u64 new_secret_id; - - ceph_decode_need(&p, end, sizeof(u32) + 1, bad); - - type = ceph_decode_32(&p); - dout(" ticket type %d %s\n", type, ceph_entity_type_name(type)); - - tkt_struct_v = ceph_decode_8(&p); - if (tkt_struct_v != 1) - goto bad; - - th = get_ticket_handler(ac, type); - if (IS_ERR(th)) { - ret = PTR_ERR(th); - goto out; - } - /* blob for me */ - dlen = ceph_x_decrypt(secret, &p, end, dbuf, - TEMP_TICKET_BUF_LEN); - if (dlen <= 0) { - ret = dlen; - goto out; - } - dout(" decrypted %d bytes\n", dlen); - dend = dbuf + dlen; - dp = dbuf; + th = get_ticket_handler(ac, type); + if (IS_ERR(th)) { + ret = PTR_ERR(th); + goto out; + } - tkt_struct_v = ceph_decode_8(&dp); - if (tkt_struct_v != 1) - goto bad; + /* blob for me */ + dlen = ceph_x_decrypt(secret, p, end, &dbuf, 0); + if (dlen <= 0) { + ret = dlen; + goto out; + } + dout(" decrypted %d bytes\n", dlen); + dp = dbuf; + dend = dp + dlen; - memcpy(&old_key, &th->session_key, sizeof(old_key)); - ret = ceph_crypto_key_decode(&new_session_key, &dp, dend); - if (ret) - goto out; + tkt_struct_v = ceph_decode_8(&dp); + if (tkt_struct_v != 1) + goto bad; - ceph_decode_copy(&dp, &new_validity, sizeof(new_validity)); - ceph_decode_timespec(&validity, &new_validity); - new_expires = get_seconds() + validity.tv_sec; - new_renew_after = new_expires - (validity.tv_sec / 4); - dout(" expires=%lu renew_after=%lu\n", new_expires, - new_renew_after); + memcpy(&old_key, &th->session_key, sizeof(old_key)); + ret = ceph_crypto_key_decode(&new_session_key, &dp, dend); + if (ret) + goto out; - /* ticket blob for service */ - ceph_decode_8_safe(&p, end, is_enc, bad); - tp = ticket_buf; - if (is_enc) { - /* encrypted */ - dout(" encrypted ticket\n"); - dlen = ceph_x_decrypt(&old_key, &p, end, ticket_buf, - TEMP_TICKET_BUF_LEN); - if (dlen < 0) { - ret = dlen; - goto out; - } - dlen = ceph_decode_32(&tp); - } else { - /* unencrypted */ - ceph_decode_32_safe(&p, end, dlen, bad); - ceph_decode_need(&p, end, dlen, bad); - ceph_decode_copy(&p, ticket_buf, dlen); + ceph_decode_copy(&dp, &new_validity, sizeof(new_validity)); + ceph_decode_timespec(&validity, &new_validity); + new_expires = get_seconds() + validity.tv_sec; + new_renew_after = new_expires - (validity.tv_sec / 4); + dout(" expires=%lu renew_after=%lu\n", new_expires, + new_renew_after); + + /* ticket blob for service */ + ceph_decode_8_safe(p, end, is_enc, bad); + if (is_enc) { + /* encrypted */ + dout(" encrypted ticket\n"); + dlen = ceph_x_decrypt(&old_key, p, end, &ticket_buf, 0); + if (dlen < 0) { + ret = dlen; + goto out; } - tpend = tp + dlen; - dout(" ticket blob is %d bytes\n", dlen); - ceph_decode_need(&tp, tpend, 1 + sizeof(u64), bad); - blob_struct_v = ceph_decode_8(&tp); - new_secret_id = ceph_decode_64(&tp); - ret = ceph_decode_buffer(&new_ticket_blob, &tp, tpend); - if (ret) + tp = ticket_buf; + dlen = ceph_decode_32(&tp); + } else { + /* unencrypted */ + ceph_decode_32_safe(p, end, dlen, bad); + ticket_buf = kmalloc(dlen, GFP_NOFS); + if (!ticket_buf) { + ret = -ENOMEM; goto out; - - /* all is well, update our ticket */ - ceph_crypto_key_destroy(&th->session_key); - if (th->ticket_blob) - ceph_buffer_put(th->ticket_blob); - th->session_key = new_session_key; - th->ticket_blob = new_ticket_blob; - th->validity = new_validity; - th->secret_id = new_secret_id; - th->expires = new_expires; - th->renew_after = new_renew_after; - dout(" got ticket service %d (%s) secret_id %lld len %d\n", - type, ceph_entity_type_name(type), th->secret_id, - (int)th->ticket_blob->vec.iov_len); - xi->have_keys |= th->service; + } + tp = ticket_buf; + ceph_decode_need(p, end, dlen, bad); + ceph_decode_copy(p, ticket_buf, dlen); } + tpend = tp + dlen; + dout(" ticket blob is %d bytes\n", dlen); + ceph_decode_need(&tp, tpend, 1 + sizeof(u64), bad); + blob_struct_v = ceph_decode_8(&tp); + new_secret_id = ceph_decode_64(&tp); + ret = ceph_decode_buffer(&new_ticket_blob, &tp, tpend); + if (ret) + goto out; + + /* all is well, update our ticket */ + ceph_crypto_key_destroy(&th->session_key); + if (th->ticket_blob) + ceph_buffer_put(th->ticket_blob); + th->session_key = new_session_key; + th->ticket_blob = new_ticket_blob; + th->validity = new_validity; + th->secret_id = new_secret_id; + th->expires = new_expires; + th->renew_after = new_renew_after; + dout(" got ticket service %d (%s) secret_id %lld len %d\n", + type, ceph_entity_type_name(type), th->secret_id, + (int)th->ticket_blob->vec.iov_len); + xi->have_keys |= th->service; - ret = 0; out: kfree(ticket_buf); -out_dbuf: kfree(dbuf); return ret; @@ -270,6 +255,34 @@ bad: goto out; } +static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac, + struct ceph_crypto_key *secret, + void *buf, void *end) +{ + void *p = buf; + u8 reply_struct_v; + u32 num; + int ret; + + ceph_decode_8_safe(&p, end, reply_struct_v, bad); + if (reply_struct_v != 1) + return -EINVAL; + + ceph_decode_32_safe(&p, end, num, bad); + dout("%d tickets\n", num); + + while (num--) { + ret = process_one_ticket(ac, secret, &p, end); + if (ret) + return ret; + } + + return 0; + +bad: + return -EINVAL; +} + static int ceph_x_build_authorizer(struct ceph_auth_client *ac, struct ceph_x_ticket_handler *th, struct ceph_x_authorizer *au) @@ -563,13 +576,14 @@ static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac, struct ceph_x_ticket_handler *th; int ret = 0; struct ceph_x_authorize_reply reply; + void *preply = &reply; void *p = au->reply_buf; void *end = p + sizeof(au->reply_buf); th = get_ticket_handler(ac, au->service); if (IS_ERR(th)) return PTR_ERR(th); - ret = ceph_x_decrypt(&th->session_key, &p, end, &reply, sizeof(reply)); + ret = ceph_x_decrypt(&th->session_key, &p, end, &preply, sizeof(reply)); if (ret < 0) return ret; if (ret != sizeof(reply)) diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 132963a..97f70e5 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -232,6 +232,7 @@ void ceph_destroy_options(struct ceph_options *opt) ceph_crypto_key_destroy(opt->key); kfree(opt->key); } + kfree(opt->mon_addr); kfree(opt); } EXPORT_SYMBOL(ceph_destroy_options); @@ -431,9 +432,12 @@ EXPORT_SYMBOL(ceph_client_id); /* * create a fresh client instance */ -struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private) +struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, + unsigned supported_features, + unsigned required_features) { struct ceph_client *client; + struct ceph_entity_addr *myaddr = NULL; int err = -ENOMEM; client = kzalloc(sizeof(*client), GFP_KERNEL); @@ -448,15 +452,27 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private) client->auth_err = 0; client->extra_mon_dispatch = NULL; - client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT; - client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT; - - client->msgr = NULL; + client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT | + supported_features; + client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT | + required_features; + + /* msgr */ + if (ceph_test_opt(client, MYIP)) + myaddr = &client->options->my_addr; + client->msgr = ceph_messenger_create(myaddr, + client->supported_features, + client->required_features); + if (IS_ERR(client->msgr)) { + err = PTR_ERR(client->msgr); + goto fail; + } + client->msgr->nocrc = ceph_test_opt(client, NOCRC); /* subsystems */ err = ceph_monc_init(&client->monc, client); if (err < 0) - goto fail; + goto fail_msgr; err = ceph_osdc_init(&client->osdc, client); if (err < 0) goto fail_monc; @@ -465,6 +481,8 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private) fail_monc: ceph_monc_stop(&client->monc); +fail_msgr: + ceph_messenger_destroy(client->msgr); fail: kfree(client); return ERR_PTR(err); @@ -489,8 +507,7 @@ void ceph_destroy_client(struct ceph_client *client) ceph_debugfs_client_cleanup(client); - if (client->msgr) - ceph_messenger_destroy(client->msgr); + ceph_messenger_destroy(client->msgr); ceph_destroy_options(client->options); @@ -513,24 +530,9 @@ static int have_mon_and_osd_map(struct ceph_client *client) */ int __ceph_open_session(struct ceph_client *client, unsigned long started) { - struct ceph_entity_addr *myaddr = NULL; int err; unsigned long timeout = client->options->mount_timeout * HZ; - /* initialize the messenger */ - if (client->msgr == NULL) { - if (ceph_test_opt(client, MYIP)) - myaddr = &client->options->my_addr; - client->msgr = ceph_messenger_create(myaddr, - client->supported_features, - client->required_features); - if (IS_ERR(client->msgr)) { - client->msgr = NULL; - return PTR_ERR(client->msgr); - } - client->msgr->nocrc = ceph_test_opt(client, NOCRC); - } - /* open session, and wait for mon and osd maps */ err = ceph_monc_open_session(&client->monc); if (err < 0) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 42599e3..3a94eae 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -477,7 +477,6 @@ int crush_do_rule(struct crush_map *map, int i, j; int numrep; int firstn; - int rc = -1; BUG_ON(ruleno >= map->max_rules); @@ -491,23 +490,18 @@ int crush_do_rule(struct crush_map *map, * that this may or may not correspond to the specific types * referenced by the crush rule. */ - if (force >= 0) { - if (force >= map->max_devices || - map->device_parents[force] == 0) { - /*dprintk("CRUSH: forcefed device dne\n");*/ - rc = -1; /* force fed device dne */ - goto out; - } - if (!is_out(map, weight, force, x)) { - while (1) { - force_context[++force_pos] = force; - if (force >= 0) - force = map->device_parents[force]; - else - force = map->bucket_parents[-1-force]; - if (force == 0) - break; - } + if (force >= 0 && + force < map->max_devices && + map->device_parents[force] != 0 && + !is_out(map, weight, force, x)) { + while (1) { + force_context[++force_pos] = force; + if (force >= 0) + force = map->device_parents[force]; + else + force = map->bucket_parents[-1-force]; + if (force == 0) + break; } } @@ -600,10 +594,7 @@ int crush_do_rule(struct crush_map *map, BUG_ON(1); } } - rc = result_len; - -out: - return rc; + return result_len; } diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c index 5a8009c..21e777b 100644 --- a/net/ceph/crypto.c +++ b/net/ceph/crypto.c @@ -90,11 +90,82 @@ static struct crypto_blkcipher *ceph_crypto_alloc_cipher(void) static const u8 *aes_iv = (u8 *)CEPH_AES_IV; +/* + * Should be used for buffers allocated with ceph_kvmalloc(). + * Currently these are encrypt out-buffer (ceph_buffer) and decrypt + * in-buffer (msg front). + * + * Dispose of @sgt with teardown_sgtable(). + * + * @prealloc_sg is to avoid memory allocation inside sg_alloc_table() + * in cases where a single sg is sufficient. No attempt to reduce the + * number of sgs by squeezing physically contiguous pages together is + * made though, for simplicity. + */ +static int setup_sgtable(struct sg_table *sgt, struct scatterlist *prealloc_sg, + const void *buf, unsigned int buf_len) +{ + struct scatterlist *sg; + const bool is_vmalloc = is_vmalloc_addr(buf); + unsigned int off = offset_in_page(buf); + unsigned int chunk_cnt = 1; + unsigned int chunk_len = PAGE_ALIGN(off + buf_len); + int i; + int ret; + + if (buf_len == 0) { + memset(sgt, 0, sizeof(*sgt)); + return -EINVAL; + } + + if (is_vmalloc) { + chunk_cnt = chunk_len >> PAGE_SHIFT; + chunk_len = PAGE_SIZE; + } + + if (chunk_cnt > 1) { + ret = sg_alloc_table(sgt, chunk_cnt, GFP_NOFS); + if (ret) + return ret; + } else { + WARN_ON(chunk_cnt != 1); + sg_init_table(prealloc_sg, 1); + sgt->sgl = prealloc_sg; + sgt->nents = sgt->orig_nents = 1; + } + + for_each_sg(sgt->sgl, sg, sgt->orig_nents, i) { + struct page *page; + unsigned int len = min(chunk_len - off, buf_len); + + if (is_vmalloc) + page = vmalloc_to_page(buf); + else + page = virt_to_page(buf); + + sg_set_page(sg, page, len, off); + + off = 0; + buf += len; + buf_len -= len; + } + WARN_ON(buf_len != 0); + + return 0; +} + +static void teardown_sgtable(struct sg_table *sgt) +{ + if (sgt->orig_nents > 1) + sg_free_table(sgt); +} + static int ceph_aes_encrypt(const void *key, int key_len, void *dst, size_t *dst_len, const void *src, size_t src_len) { - struct scatterlist sg_in[2], sg_out[1]; + struct scatterlist sg_in[2], prealloc_sg; + struct sg_table sg_out; struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher(); struct blkcipher_desc desc = { .tfm = tfm, .flags = 0 }; int ret; @@ -110,16 +181,18 @@ static int ceph_aes_encrypt(const void *key, int key_len, *dst_len = src_len + zero_padding; - crypto_blkcipher_setkey((void *)tfm, key, key_len); sg_init_table(sg_in, 2); sg_set_buf(&sg_in[0], src, src_len); sg_set_buf(&sg_in[1], pad, zero_padding); - sg_init_table(sg_out, 1); - sg_set_buf(sg_out, dst, *dst_len); + ret = setup_sgtable(&sg_out, &prealloc_sg, dst, *dst_len); + if (ret) + goto out_tfm; + + crypto_blkcipher_setkey((void *)tfm, key, key_len); iv = crypto_blkcipher_crt(tfm)->iv; ivsize = crypto_blkcipher_ivsize(tfm); - memcpy(iv, aes_iv, ivsize); + /* print_hex_dump(KERN_ERR, "enc key: ", DUMP_PREFIX_NONE, 16, 1, key, key_len, 1); @@ -128,16 +201,22 @@ static int ceph_aes_encrypt(const void *key, int key_len, print_hex_dump(KERN_ERR, "enc pad: ", DUMP_PREFIX_NONE, 16, 1, pad, zero_padding, 1); */ - ret = crypto_blkcipher_encrypt(&desc, sg_out, sg_in, + ret = crypto_blkcipher_encrypt(&desc, sg_out.sgl, sg_in, src_len + zero_padding); - crypto_free_blkcipher(tfm); - if (ret < 0) + if (ret < 0) { pr_err("ceph_aes_crypt failed %d\n", ret); + goto out_sg; + } /* print_hex_dump(KERN_ERR, "enc out: ", DUMP_PREFIX_NONE, 16, 1, dst, *dst_len, 1); */ - return 0; + +out_sg: + teardown_sgtable(&sg_out); +out_tfm: + crypto_free_blkcipher(tfm); + return ret; } static int ceph_aes_encrypt2(const void *key, int key_len, void *dst, @@ -145,7 +224,8 @@ static int ceph_aes_encrypt2(const void *key, int key_len, void *dst, const void *src1, size_t src1_len, const void *src2, size_t src2_len) { - struct scatterlist sg_in[3], sg_out[1]; + struct scatterlist sg_in[3], prealloc_sg; + struct sg_table sg_out; struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher(); struct blkcipher_desc desc = { .tfm = tfm, .flags = 0 }; int ret; @@ -161,17 +241,19 @@ static int ceph_aes_encrypt2(const void *key, int key_len, void *dst, *dst_len = src1_len + src2_len + zero_padding; - crypto_blkcipher_setkey((void *)tfm, key, key_len); sg_init_table(sg_in, 3); sg_set_buf(&sg_in[0], src1, src1_len); sg_set_buf(&sg_in[1], src2, src2_len); sg_set_buf(&sg_in[2], pad, zero_padding); - sg_init_table(sg_out, 1); - sg_set_buf(sg_out, dst, *dst_len); + ret = setup_sgtable(&sg_out, &prealloc_sg, dst, *dst_len); + if (ret) + goto out_tfm; + + crypto_blkcipher_setkey((void *)tfm, key, key_len); iv = crypto_blkcipher_crt(tfm)->iv; ivsize = crypto_blkcipher_ivsize(tfm); - memcpy(iv, aes_iv, ivsize); + /* print_hex_dump(KERN_ERR, "enc key: ", DUMP_PREFIX_NONE, 16, 1, key, key_len, 1); @@ -182,23 +264,30 @@ static int ceph_aes_encrypt2(const void *key, int key_len, void *dst, print_hex_dump(KERN_ERR, "enc pad: ", DUMP_PREFIX_NONE, 16, 1, pad, zero_padding, 1); */ - ret = crypto_blkcipher_encrypt(&desc, sg_out, sg_in, + ret = crypto_blkcipher_encrypt(&desc, sg_out.sgl, sg_in, src1_len + src2_len + zero_padding); - crypto_free_blkcipher(tfm); - if (ret < 0) + if (ret < 0) { pr_err("ceph_aes_crypt2 failed %d\n", ret); + goto out_sg; + } /* print_hex_dump(KERN_ERR, "enc out: ", DUMP_PREFIX_NONE, 16, 1, dst, *dst_len, 1); */ - return 0; + +out_sg: + teardown_sgtable(&sg_out); +out_tfm: + crypto_free_blkcipher(tfm); + return ret; } static int ceph_aes_decrypt(const void *key, int key_len, void *dst, size_t *dst_len, const void *src, size_t src_len) { - struct scatterlist sg_in[1], sg_out[2]; + struct sg_table sg_in; + struct scatterlist sg_out[2], prealloc_sg; struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher(); struct blkcipher_desc desc = { .tfm = tfm }; char pad[16]; @@ -210,16 +299,16 @@ static int ceph_aes_decrypt(const void *key, int key_len, if (IS_ERR(tfm)) return PTR_ERR(tfm); - crypto_blkcipher_setkey((void *)tfm, key, key_len); - sg_init_table(sg_in, 1); sg_init_table(sg_out, 2); - sg_set_buf(sg_in, src, src_len); sg_set_buf(&sg_out[0], dst, *dst_len); sg_set_buf(&sg_out[1], pad, sizeof(pad)); + ret = setup_sgtable(&sg_in, &prealloc_sg, src, src_len); + if (ret) + goto out_tfm; + crypto_blkcipher_setkey((void *)tfm, key, key_len); iv = crypto_blkcipher_crt(tfm)->iv; ivsize = crypto_blkcipher_ivsize(tfm); - memcpy(iv, aes_iv, ivsize); /* @@ -228,12 +317,10 @@ static int ceph_aes_decrypt(const void *key, int key_len, print_hex_dump(KERN_ERR, "dec in: ", DUMP_PREFIX_NONE, 16, 1, src, src_len, 1); */ - - ret = crypto_blkcipher_decrypt(&desc, sg_out, sg_in, src_len); - crypto_free_blkcipher(tfm); + ret = crypto_blkcipher_decrypt(&desc, sg_out, sg_in.sgl, src_len); if (ret < 0) { pr_err("ceph_aes_decrypt failed %d\n", ret); - return ret; + goto out_sg; } if (src_len <= *dst_len) @@ -251,7 +338,12 @@ static int ceph_aes_decrypt(const void *key, int key_len, print_hex_dump(KERN_ERR, "dec out: ", DUMP_PREFIX_NONE, 16, 1, dst, *dst_len, 1); */ - return 0; + +out_sg: + teardown_sgtable(&sg_in); +out_tfm: + crypto_free_blkcipher(tfm); + return ret; } static int ceph_aes_decrypt2(const void *key, int key_len, @@ -259,7 +351,8 @@ static int ceph_aes_decrypt2(const void *key, int key_len, void *dst2, size_t *dst2_len, const void *src, size_t src_len) { - struct scatterlist sg_in[1], sg_out[3]; + struct sg_table sg_in; + struct scatterlist sg_out[3], prealloc_sg; struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher(); struct blkcipher_desc desc = { .tfm = tfm }; char pad[16]; @@ -271,17 +364,17 @@ static int ceph_aes_decrypt2(const void *key, int key_len, if (IS_ERR(tfm)) return PTR_ERR(tfm); - sg_init_table(sg_in, 1); - sg_set_buf(sg_in, src, src_len); sg_init_table(sg_out, 3); sg_set_buf(&sg_out[0], dst1, *dst1_len); sg_set_buf(&sg_out[1], dst2, *dst2_len); sg_set_buf(&sg_out[2], pad, sizeof(pad)); + ret = setup_sgtable(&sg_in, &prealloc_sg, src, src_len); + if (ret) + goto out_tfm; crypto_blkcipher_setkey((void *)tfm, key, key_len); iv = crypto_blkcipher_crt(tfm)->iv; ivsize = crypto_blkcipher_ivsize(tfm); - memcpy(iv, aes_iv, ivsize); /* @@ -290,12 +383,10 @@ static int ceph_aes_decrypt2(const void *key, int key_len, print_hex_dump(KERN_ERR, "dec in: ", DUMP_PREFIX_NONE, 16, 1, src, src_len, 1); */ - - ret = crypto_blkcipher_decrypt(&desc, sg_out, sg_in, src_len); - crypto_free_blkcipher(tfm); + ret = crypto_blkcipher_decrypt(&desc, sg_out, sg_in.sgl, src_len); if (ret < 0) { pr_err("ceph_aes_decrypt failed %d\n", ret); - return ret; + goto out_sg; } if (src_len <= *dst1_len) @@ -325,7 +416,11 @@ static int ceph_aes_decrypt2(const void *key, int key_len, dst2, *dst2_len, 1); */ - return 0; +out_sg: + teardown_sgtable(&sg_in); +out_tfm: + crypto_free_blkcipher(tfm); + return ret; } @@ -444,7 +539,7 @@ int ceph_key_instantiate(struct key *key, const void *data, size_t datalen) goto err; /* TODO ceph_crypto_key_decode should really take const input */ - p = (void*)data; + p = (void *)data; ret = ceph_crypto_key_decode(ckey, &p, (char*)data+datalen); if (ret < 0) goto err_ckey; diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 78b55f4..e85a8d2 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -11,12 +11,14 @@ #include <linux/string.h> #include <linux/bio.h> #include <linux/blkdev.h> +#include <linux/dns_resolver.h> #include <net/tcp.h> #include <linux/ceph/libceph.h> #include <linux/ceph/messenger.h> #include <linux/ceph/decode.h> #include <linux/ceph/pagelist.h> +#include <linux/export.h> /* * Ceph uses the messenger to exchange ceph_msg messages with other @@ -97,7 +99,12 @@ struct workqueue_struct *ceph_msgr_wq; int ceph_msgr_init(void) { - ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0); + /* + * The number of active work items is limited by the number of + * connections, so leave @max_active at default. + */ + ceph_msgr_wq = alloc_workqueue("ceph-msgr", + WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0); if (!ceph_msgr_wq) { pr_err("msgr_init failed to create workqueue\n"); return -ENOMEM; @@ -282,6 +289,37 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, return r; } +static int __ceph_tcp_sendpage(struct socket *sock, struct page *page, + int offset, size_t size, bool more) +{ + int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR); + int ret; + + ret = kernel_sendpage(sock, page, offset, size, flags); + if (ret == -EAGAIN) + ret = 0; + + return ret; +} + +static int ceph_tcp_sendpage(struct socket *sock, struct page *page, + int offset, size_t size, bool more) +{ + int ret; + struct kvec iov; + + /* sendpage cannot properly handle pages with page_count == 0, + * we need to fallback to sendmsg if that's the case */ + if (page_count(page) >= 1) + return __ceph_tcp_sendpage(sock, page, offset, size, more); + + iov.iov_base = kmap(page) + offset; + iov.iov_len = size; + ret = ceph_tcp_sendmsg(sock, &iov, 1, size, more); + kunmap(page); + + return ret; +} /* * Shutdown/close the socket for the given connection. @@ -486,13 +524,10 @@ static void prepare_write_message(struct ceph_connection *con) m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); con->out_msg = m; - if (test_bit(LOSSYTX, &con->state)) { - list_del_init(&m->list_head); - } else { - /* put message on sent list */ - ceph_msg_get(m); - list_move_tail(&m->list_head, &con->out_sent); - } + + /* put message on sent list */ + ceph_msg_get(m); + list_move_tail(&m->list_head, &con->out_sent); /* * only assign outgoing seq # if we haven't sent this message @@ -852,18 +887,14 @@ static int write_partial_msg_pages(struct ceph_connection *con) cpu_to_le32(crc32c(tmpcrc, base, len)); con->out_msg_pos.did_page_crc = 1; } - ret = kernel_sendpage(con->sock, page, + ret = ceph_tcp_sendpage(con->sock, page, con->out_msg_pos.page_pos + page_shift, - len, - MSG_DONTWAIT | MSG_NOSIGNAL | - MSG_MORE); + len, 1); if (crc && (msg->pages || msg->pagelist || msg->bio || in_trail)) kunmap(page); - if (ret == -EAGAIN) - ret = 0; if (ret <= 0) goto out; @@ -1081,6 +1112,101 @@ static void addr_set_port(struct sockaddr_storage *ss, int p) } /* + * Unlike other *_pton function semantics, zero indicates success. + */ +static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss, + char delim, const char **ipend) +{ + struct sockaddr_in *in4 = (void *)ss; + struct sockaddr_in6 *in6 = (void *)ss; + + memset(ss, 0, sizeof(*ss)); + + if (in4_pton(str, len, (u8 *)&in4->sin_addr.s_addr, delim, ipend)) { + ss->ss_family = AF_INET; + return 0; + } + + if (in6_pton(str, len, (u8 *)&in6->sin6_addr.s6_addr, delim, ipend)) { + ss->ss_family = AF_INET6; + return 0; + } + + return -EINVAL; +} + +/* + * Extract hostname string and resolve using kernel DNS facility. + */ +#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER +static int ceph_dns_resolve_name(const char *name, size_t namelen, + struct sockaddr_storage *ss, char delim, const char **ipend) +{ + const char *end, *delim_p; + char *colon_p, *ip_addr = NULL; + int ip_len, ret; + + /* + * The end of the hostname occurs immediately preceding the delimiter or + * the port marker (':') where the delimiter takes precedence. + */ + delim_p = memchr(name, delim, namelen); + colon_p = memchr(name, ':', namelen); + + if (delim_p && colon_p) + end = delim_p < colon_p ? delim_p : colon_p; + else if (!delim_p && colon_p) + end = colon_p; + else { + end = delim_p; + if (!end) /* case: hostname:/ */ + end = name + namelen; + } + + if (end <= name) + return -EINVAL; + + /* do dns_resolve upcall */ + ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL); + if (ip_len > 0) + ret = ceph_pton(ip_addr, ip_len, ss, -1, NULL); + else + ret = -ESRCH; + + kfree(ip_addr); + + *ipend = end; + + pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name, + ret, ret ? "failed" : ceph_pr_addr(ss)); + + return ret; +} +#else +static inline int ceph_dns_resolve_name(const char *name, size_t namelen, + struct sockaddr_storage *ss, char delim, const char **ipend) +{ + return -EINVAL; +} +#endif + +/* + * Parse a server name (IP or hostname). If a valid IP address is not found + * then try to extract a hostname to resolve using userspace DNS upcall. + */ +static int ceph_parse_server_name(const char *name, size_t namelen, + struct sockaddr_storage *ss, char delim, const char **ipend) +{ + int ret; + + ret = ceph_pton(name, namelen, ss, delim, ipend); + if (ret) + ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend); + + return ret; +} + +/* * Parse an ip[:port] list into an addr array. Use the default * monitor port if a port isn't specified. */ @@ -1088,15 +1214,13 @@ int ceph_parse_ips(const char *c, const char *end, struct ceph_entity_addr *addr, int max_count, int *count) { - int i; + int i, ret = -EINVAL; const char *p = c; dout("parse_ips on '%.*s'\n", (int)(end-c), c); for (i = 0; i < max_count; i++) { const char *ipend; struct sockaddr_storage *ss = &addr[i].in_addr; - struct sockaddr_in *in4 = (void *)ss; - struct sockaddr_in6 *in6 = (void *)ss; int port; char delim = ','; @@ -1105,15 +1229,11 @@ int ceph_parse_ips(const char *c, const char *end, p++; } - memset(ss, 0, sizeof(*ss)); - if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr, - delim, &ipend)) - ss->ss_family = AF_INET; - else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr, - delim, &ipend)) - ss->ss_family = AF_INET6; - else + ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend); + if (ret) goto bad; + ret = -EINVAL; + p = ipend; if (delim == ']') { @@ -1158,7 +1278,7 @@ int ceph_parse_ips(const char *c, const char *end, bad: pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c); - return -EINVAL; + return ret; } EXPORT_SYMBOL(ceph_parse_ips); @@ -1399,6 +1519,7 @@ static void process_ack(struct ceph_connection *con) break; dout("got ack for seq %llu type %d at %p\n", seq, le16_to_cpu(m->hdr.type), m); + m->ack_stamp = jiffies; ceph_msg_remove(m); } prepare_read_tag(con); @@ -2283,7 +2404,8 @@ EXPORT_SYMBOL(ceph_con_keepalive); * construct a new message with given type, size * the new msg has a ref count of 1. */ -struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) +struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, + bool can_fail) { struct ceph_msg *m; @@ -2306,9 +2428,10 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) m->footer.middle_crc = 0; m->footer.data_crc = 0; m->footer.flags = 0; - m->front_max = front_len; + m->front_alloc_len = front_len; m->front_is_vmalloc = false; m->more_to_follow = false; + m->ack_stamp = 0; m->pool = NULL; /* middle */ @@ -2334,7 +2457,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) m->front.iov_base = kmalloc(front_len, flags); } if (m->front.iov_base == NULL) { - pr_err("msg_new can't allocate %d bytes\n", + dout("ceph_msg_new can't allocate %d bytes\n", front_len); goto out2; } @@ -2349,7 +2472,14 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) out2: ceph_msg_put(m); out: - pr_err("msg_new can't create type %d front %d\n", type, front_len); + if (!can_fail) { + pr_err("msg_new can't create type %d front %d\n", type, + front_len); + WARN_ON(1); + } else { + dout("msg_new can't create type %d front %d\n", type, + front_len); + } return NULL; } EXPORT_SYMBOL(ceph_msg_new); @@ -2399,7 +2529,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, } if (!msg) { *skip = 0; - msg = ceph_msg_new(type, front_len, GFP_NOFS); + msg = ceph_msg_new(type, front_len, GFP_NOFS, false); if (!msg) { pr_err("unable to allocate msg type %d len %d\n", type, front_len); @@ -2469,8 +2599,8 @@ EXPORT_SYMBOL(ceph_msg_last_put); void ceph_msg_dump(struct ceph_msg *msg) { - pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg, - msg->front_max, msg->nr_pages); + pr_debug("msg_dump %p (front_alloc_len %d nr_pages %d)\n", msg, + msg->front_alloc_len, msg->nr_pages); print_hex_dump(KERN_DEBUG, "header: ", DUMP_PREFIX_OFFSET, 16, 1, &msg->hdr, sizeof(msg->hdr), true); diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index cbe31fa..0c0859b 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -116,14 +116,12 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) */ static void __close_session(struct ceph_mon_client *monc) { - if (monc->con) { - dout("__close_session closing mon%d\n", monc->cur_mon); - ceph_con_revoke(monc->con, monc->m_auth); - ceph_con_close(monc->con); - monc->cur_mon = -1; - monc->pending_auth = 0; - ceph_auth_reset(monc->auth); - } + dout("__close_session closing mon%d\n", monc->cur_mon); + ceph_con_revoke(monc->con, monc->m_auth); + ceph_con_close(monc->con); + monc->cur_mon = -1; + monc->pending_auth = 0; + ceph_auth_reset(monc->auth); } /* @@ -152,7 +150,7 @@ static int __open_session(struct ceph_mon_client *monc) /* initiatiate authentication handshake */ ret = ceph_auth_build_hello(monc->auth, monc->m_auth->front.iov_base, - monc->m_auth->front_max); + monc->m_auth->front_alloc_len); __send_prepared_auth_request(monc, ret); } else { dout("open_session mon%d already open\n", monc->cur_mon); @@ -196,7 +194,7 @@ static void __send_subscribe(struct ceph_mon_client *monc) int num; p = msg->front.iov_base; - end = p + msg->front_max; + end = p + msg->front_alloc_len; num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; ceph_encode_32(&p, num); @@ -302,15 +300,6 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) */ int ceph_monc_open_session(struct ceph_mon_client *monc) { - if (!monc->con) { - monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL); - if (!monc->con) - return -ENOMEM; - ceph_con_init(monc->client->msgr, monc->con); - monc->con->private = monc; - monc->con->ops = &mon_con_ops; - } - mutex_lock(&monc->mutex); __open_session(monc); __schedule_delayed(monc); @@ -528,10 +517,12 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) init_completion(&req->completion); err = -ENOMEM; - req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS); + req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS, + true); if (!req->request) goto out; - req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS); + req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS, + true); if (!req->reply) goto out; @@ -626,10 +617,12 @@ int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op, init_completion(&req->completion); err = -ENOMEM; - req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS); + req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS, + true); if (!req->request) goto out; - req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS); + req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS, + true); if (!req->reply) goto out; @@ -755,13 +748,21 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) if (err) goto out; - monc->con = NULL; + /* connection */ + monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL); + if (!monc->con) + goto out_monmap; + ceph_con_init(monc->client->msgr, monc->con); + monc->con->private = monc; + monc->con->ops = &mon_con_ops; /* authentication */ monc->auth = ceph_auth_init(cl->options->name, cl->options->key); - if (IS_ERR(monc->auth)) - return PTR_ERR(monc->auth); + if (IS_ERR(monc->auth)) { + err = PTR_ERR(monc->auth); + goto out_con; + } monc->auth->want_keys = CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; @@ -770,19 +771,21 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) err = -ENOMEM; monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK, sizeof(struct ceph_mon_subscribe_ack), - GFP_NOFS); + GFP_NOFS, true); if (!monc->m_subscribe_ack) - goto out_monmap; + goto out_auth; - monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS); + monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS, + true); if (!monc->m_subscribe) goto out_subscribe_ack; - monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS); + monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS, + true); if (!monc->m_auth_reply) goto out_subscribe; - monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS); + monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true); monc->pending_auth = 0; if (!monc->m_auth) goto out_auth_reply; @@ -808,6 +811,10 @@ out_subscribe: ceph_msg_put(monc->m_subscribe); out_subscribe_ack: ceph_msg_put(monc->m_subscribe_ack); +out_auth: + ceph_auth_destroy(monc->auth); +out_con: + monc->con->ops->put(monc->con); out_monmap: kfree(monc->monmap); out: @@ -822,11 +829,11 @@ void ceph_monc_stop(struct ceph_mon_client *monc) mutex_lock(&monc->mutex); __close_session(monc); - if (monc->con) { - monc->con->private = NULL; - monc->con->ops->put(monc->con); - monc->con = NULL; - } + + monc->con->private = NULL; + monc->con->ops->put(monc->con); + monc->con = NULL; + mutex_unlock(&monc->mutex); ceph_auth_destroy(monc->auth); @@ -853,7 +860,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc, ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, msg->front.iov_len, monc->m_auth->front.iov_base, - monc->m_auth->front_max); + monc->m_auth->front_alloc_len); if (ret < 0) { monc->client->auth_err = ret; wake_up_all(&monc->client->auth_wq); @@ -880,7 +887,7 @@ static int __validate_auth(struct ceph_mon_client *monc) return 0; ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, - monc->m_auth->front_max); + monc->m_auth->front_alloc_len); if (ret <= 0) return ret; /* either an error, or no need to authenticate */ __send_prepared_auth_request(monc, ret); @@ -973,14 +980,22 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, case CEPH_MSG_MON_MAP: case CEPH_MSG_MDS_MAP: case CEPH_MSG_OSD_MAP: - m = ceph_msg_new(type, front_len, GFP_NOFS); + m = ceph_msg_new(type, front_len, GFP_NOFS, false); break; } if (!m) { pr_info("alloc_msg unknown type %d\n", type); *skip = 1; + } else if (front_len > m->front_alloc_len) { + pr_warning("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n", + front_len, m->front_alloc_len, + (unsigned int)con->peer_name.type, + le64_to_cpu(con->peer_name.num)); + ceph_msg_put(m); + m = ceph_msg_new(type, front_len, GFP_NOFS, false); } + return m; } @@ -1000,7 +1015,7 @@ static void mon_fault(struct ceph_connection *con) if (!con->private) goto out; - if (monc->con && !monc->hunting) + if (!monc->hunting) pr_info("mon%d %s session lost, " "hunting for new mon\n", monc->cur_mon, ceph_pr_addr(&monc->con->peer_addr.in_addr)); diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c index d5f2d97..11d5f41 100644 --- a/net/ceph/msgpool.c +++ b/net/ceph/msgpool.c @@ -7,27 +7,37 @@ #include <linux/ceph/msgpool.h> -static void *alloc_fn(gfp_t gfp_mask, void *arg) +static void *msgpool_alloc(gfp_t gfp_mask, void *arg) { struct ceph_msgpool *pool = arg; - void *p; + struct ceph_msg *msg; - p = ceph_msg_new(0, pool->front_len, gfp_mask); - if (!p) - pr_err("msgpool %s alloc failed\n", pool->name); - return p; + msg = ceph_msg_new(0, pool->front_len, gfp_mask, true); + if (!msg) { + dout("msgpool_alloc %s failed\n", pool->name); + } else { + dout("msgpool_alloc %s %p\n", pool->name, msg); + msg->pool = pool; + } + return msg; } -static void free_fn(void *element, void *arg) +static void msgpool_free(void *element, void *arg) { - ceph_msg_put(element); + struct ceph_msgpool *pool = arg; + struct ceph_msg *msg = element; + + dout("msgpool_release %s %p\n", pool->name, msg); + msg->pool = NULL; + ceph_msg_put(msg); } int ceph_msgpool_init(struct ceph_msgpool *pool, int front_len, int size, bool blocking, const char *name) { + dout("msgpool %s init\n", name); pool->front_len = front_len; - pool->pool = mempool_create(size, alloc_fn, free_fn, pool); + pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool); if (!pool->pool) return -ENOMEM; pool->name = name; @@ -36,29 +46,37 @@ int ceph_msgpool_init(struct ceph_msgpool *pool, void ceph_msgpool_destroy(struct ceph_msgpool *pool) { + dout("msgpool %s destroy\n", pool->name); mempool_destroy(pool->pool); } struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len) { + struct ceph_msg *msg; + if (front_len > pool->front_len) { - pr_err("msgpool_get pool %s need front %d, pool size is %d\n", + dout("msgpool_get %s need front %d, pool size is %d\n", pool->name, front_len, pool->front_len); WARN_ON(1); /* try to alloc a fresh message */ - return ceph_msg_new(0, front_len, GFP_NOFS); + return ceph_msg_new(0, front_len, GFP_NOFS, false); } - return mempool_alloc(pool->pool, GFP_NOFS); + msg = mempool_alloc(pool->pool, GFP_NOFS); + dout("msgpool_get %s %p\n", pool->name, msg); + return msg; } void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg) { + dout("msgpool_put %s %p\n", pool->name, msg); + /* reset msg front_len; user may have changed it */ msg->front.iov_len = pool->front_len; msg->hdr.front_len = cpu_to_le32(pool->front_len); kref_init(&msg->kref); /* retake single ref */ + mempool_free(msg, pool->pool); } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 7330c27..2df98a6 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -217,6 +217,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_unsafe_item); INIT_LIST_HEAD(&req->r_linger_item); INIT_LIST_HEAD(&req->r_linger_osd); + INIT_LIST_HEAD(&req->r_req_lru_item); req->r_flags = flags; WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); @@ -226,7 +227,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); else msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, - OSD_OPREPLY_FRONT_LEN, gfp_flags); + OSD_OPREPLY_FRONT_LEN, gfp_flags, true); if (!msg) { ceph_osdc_put_request(req); return NULL; @@ -243,13 +244,13 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, ceph_pagelist_init(req->r_trail); } /* create request message; allow space for oid */ - msg_size += 40; + msg_size += MAX_OBJ_NAME_SIZE; if (snapc) msg_size += sizeof(u64) * snapc->num_snaps; if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op, 0); else - msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags); + msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true); if (!msg) { ceph_osdc_put_request(req); return NULL; @@ -677,12 +678,34 @@ static void put_osd(struct ceph_osd *osd) */ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) { - dout("__remove_osd %p\n", osd); + dout("%s %p osd%d\n", __func__, osd, osd->o_osd); BUG_ON(!list_empty(&osd->o_requests)); - rb_erase(&osd->o_node, &osdc->osds); list_del_init(&osd->o_osd_lru); - ceph_con_close(&osd->o_con); - put_osd(osd); + rb_erase(&osd->o_node, &osdc->osds); + RB_CLEAR_NODE(&osd->o_node); +} + +static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) +{ + dout("%s %p osd%d\n", __func__, osd, osd->o_osd); + + if (!RB_EMPTY_NODE(&osd->o_node)) { + ceph_con_close(&osd->o_con); + __remove_osd(osdc, osd); + put_osd(osd); + } +} + +static void remove_all_osds(struct ceph_osd_client *osdc) +{ + dout("__remove_old_osds %p\n", osdc); + mutex_lock(&osdc->request_mutex); + while (!RB_EMPTY_ROOT(&osdc->osds)) { + struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), + struct ceph_osd, o_node); + remove_osd(osdc, osd); + } + mutex_unlock(&osdc->request_mutex); } static void __move_osd_to_lru(struct ceph_osd_client *osdc, @@ -701,16 +724,16 @@ static void __remove_osd_from_lru(struct ceph_osd *osd) list_del_init(&osd->o_osd_lru); } -static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all) +static void remove_old_osds(struct ceph_osd_client *osdc) { struct ceph_osd *osd, *nosd; dout("__remove_old_osds %p\n", osdc); mutex_lock(&osdc->request_mutex); list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { - if (!remove_all && time_before(jiffies, osd->lru_ttl)) + if (time_before(jiffies, osd->lru_ttl)) break; - __remove_osd(osdc, osd); + remove_osd(osdc, osd); } mutex_unlock(&osdc->request_mutex); } @@ -726,7 +749,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) dout("__reset_osd %p osd%d\n", osd, osd->o_osd); if (list_empty(&osd->o_requests) && list_empty(&osd->o_linger_requests)) { - __remove_osd(osdc, osd); + remove_osd(osdc, osd); } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], &osd->o_con.peer_addr, sizeof(osd->o_con.peer_addr)) == 0 && @@ -751,6 +774,7 @@ static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) struct rb_node *parent = NULL; struct ceph_osd *osd = NULL; + dout("__insert_osd %p osd%d\n", new, new->o_osd); while (*p) { parent = *p; osd = rb_entry(parent, struct ceph_osd, o_node); @@ -803,13 +827,10 @@ static void __register_request(struct ceph_osd_client *osdc, { req->r_tid = ++osdc->last_tid; req->r_request->hdr.tid = cpu_to_le64(req->r_tid); - INIT_LIST_HEAD(&req->r_req_lru_item); - dout("__register_request %p tid %lld\n", req, req->r_tid); __insert_request(osdc, req); ceph_osdc_get_request(req); osdc->num_requests++; - if (osdc->num_requests == 1) { dout(" first request, scheduling timeout\n"); __schedule_osd_timeout(osdc); @@ -932,7 +953,7 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger); * Caller should hold map_sem for read and request_mutex. */ static int __map_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) + struct ceph_osd_request *req, int force_resend) { struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; struct ceph_pg pgid; @@ -956,7 +977,8 @@ static int __map_request(struct ceph_osd_client *osdc, num = err; } - if ((req->r_osd && req->r_osd->o_osd == o && + if ((!force_resend && + req->r_osd && req->r_osd->o_osd == o && req->r_sent >= req->r_osd->o_incarnation && req->r_num_pg_osds == num && memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || @@ -1085,9 +1107,15 @@ static void handle_timeout(struct work_struct *work) req = list_entry(osdc->req_lru.next, struct ceph_osd_request, r_req_lru_item); + /* hasn't been long enough since we sent it? */ if (time_before(jiffies, req->r_stamp + timeout)) break; + /* hasn't been long enough since it was acked? */ + if (req->r_request->ack_stamp == 0 || + time_before(jiffies, req->r_request->ack_stamp + timeout)) + break; + BUG_ON(req == last_req && req->r_stamp == last_stamp); last_req = req; last_stamp = req->r_stamp; @@ -1138,7 +1166,7 @@ static void handle_osds_timeout(struct work_struct *work) dout("osds timeout\n"); down_read(&osdc->map_sem); - remove_old_osds(osdc, 0); + remove_old_osds(osdc); up_read(&osdc->map_sem); schedule_delayed_work(&osdc->osds_timeout_work, @@ -1253,6 +1281,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc) { struct rb_node *p, *n; + dout("%s %p\n", __func__, osdc); for (p = rb_first(&osdc->osds); p; p = n) { struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); @@ -1272,18 +1301,18 @@ static void reset_changed_osds(struct ceph_osd_client *osdc) * * Caller should hold map_sem for read and request_mutex. */ -static void kick_requests(struct ceph_osd_client *osdc) +static void kick_requests(struct ceph_osd_client *osdc, int force_resend) { struct ceph_osd_request *req, *nreq; struct rb_node *p; int needmap = 0; int err; - dout("kick_requests\n"); + dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { req = rb_entry(p, struct ceph_osd_request, r_node); - err = __map_request(osdc, req); + err = __map_request(osdc, req, force_resend); if (err < 0) continue; /* error */ if (req->r_osd == NULL) { @@ -1301,7 +1330,7 @@ static void kick_requests(struct ceph_osd_client *osdc) r_linger_item) { dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); - err = __map_request(osdc, req); + err = __map_request(osdc, req, force_resend); if (err == 0) continue; /* no change and no osd was specified */ if (err < 0) @@ -1378,7 +1407,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ceph_osdmap_destroy(osdc->osdmap); osdc->osdmap = newmap; } - kick_requests(osdc); + kick_requests(osdc, 0); reset_changed_osds(osdc); } else { dout("ignoring incremental map %u len %d\n", @@ -1406,6 +1435,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) "older than our %u\n", epoch, maplen, osdc->osdmap->epoch); } else { + int skipped_map = 0; + dout("taking full map %u len %d\n", epoch, maplen); newmap = osdmap_decode(&p, p+maplen); if (IS_ERR(newmap)) { @@ -1415,9 +1446,12 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) BUG_ON(!newmap); oldmap = osdc->osdmap; osdc->osdmap = newmap; - if (oldmap) + if (oldmap) { + if (oldmap->epoch + 1 < newmap->epoch) + skipped_map = 1; ceph_osdmap_destroy(oldmap); - kick_requests(osdc); + } + kick_requests(osdc, skipped_map); } p += maplen; nr_maps--; @@ -1690,12 +1724,14 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, * the request still han't been touched yet. */ if (req->r_sent == 0) { - rc = __map_request(osdc, req); + rc = __map_request(osdc, req, 0); if (rc < 0) { if (nofail) { dout("osdc_start_request failed map, " " will retry %lld\n", req->r_tid); rc = 0; + } else { + __unregister_request(osdc, req); } goto out_unlock; } @@ -1856,8 +1892,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) ceph_osdmap_destroy(osdc->osdmap); osdc->osdmap = NULL; } - remove_old_osds(osdc, 1); - WARN_ON(!RB_EMPTY_ROOT(&osdc->osds)); + remove_all_osds(osdc); mempool_destroy(osdc->req_mempool); ceph_msgpool_destroy(&osdc->msgpool_op); ceph_msgpool_destroy(&osdc->msgpool_op_reply); @@ -2016,7 +2051,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, if (front > req->r_reply->front.iov_len) { pr_warning("get_reply front %d > preallocated %d\n", front, (int)req->r_reply->front.iov_len); - m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS); + m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false); if (!m) goto out; ceph_msg_put(req->r_reply); @@ -2064,7 +2099,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, switch (type) { case CEPH_MSG_OSD_MAP: case CEPH_MSG_WATCH_NOTIFY: - return ceph_msg_new(type, front, GFP_NOFS); + return ceph_msg_new(type, front, GFP_NOFS, false); case CEPH_MSG_OSD_OPREPLY: return get_reply(con, hdr, skip); default: diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index e97c358..bb38a3c 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -102,7 +102,7 @@ static int crush_decode_tree_bucket(void **p, void *end, { int j; dout("crush_decode_tree_bucket %p to %p\n", *p, end); - ceph_decode_32_safe(p, end, b->num_nodes, bad); + ceph_decode_8_safe(p, end, b->num_nodes, bad); b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS); if (b->node_weights == NULL) return -ENOMEM; @@ -339,6 +339,7 @@ static int __insert_pg_mapping(struct ceph_pg_mapping *new, struct ceph_pg_mapping *pg = NULL; int c; + dout("__insert_pg_mapping %llx %p\n", *(u64 *)&new->pgid, new); while (*p) { parent = *p; pg = rb_entry(parent, struct ceph_pg_mapping, node); @@ -366,16 +367,33 @@ static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root, while (n) { pg = rb_entry(n, struct ceph_pg_mapping, node); c = pgid_cmp(pgid, pg->pgid); - if (c < 0) + if (c < 0) { n = n->rb_left; - else if (c > 0) + } else if (c > 0) { n = n->rb_right; - else + } else { + dout("__lookup_pg_mapping %llx got %p\n", + *(u64 *)&pgid, pg); return pg; + } } return NULL; } +static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg pgid) +{ + struct ceph_pg_mapping *pg = __lookup_pg_mapping(root, pgid); + + if (pg) { + dout("__remove_pg_mapping %llx %p\n", *(u64 *)&pgid, pg); + rb_erase(&pg->node, root); + kfree(pg); + return 0; + } + dout("__remove_pg_mapping %llx dne\n", *(u64 *)&pgid); + return -ENOENT; +} + /* * rbtree of pg pool info */ @@ -711,7 +729,6 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, void *start = *p; int err = -EINVAL; u16 version; - struct rb_node *rbp; ceph_decode_16_safe(p, end, version, bad); if (version > CEPH_OSDMAP_INC_VERSION) { @@ -861,7 +878,6 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } /* new_pg_temp */ - rbp = rb_first(&map->pg_temp); ceph_decode_32_safe(p, end, len, bad); while (len--) { struct ceph_pg_mapping *pg; @@ -872,18 +888,6 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, ceph_decode_copy(p, &pgid, sizeof(pgid)); pglen = ceph_decode_32(p); - /* remove any? */ - while (rbp && pgid_cmp(rb_entry(rbp, struct ceph_pg_mapping, - node)->pgid, pgid) <= 0) { - struct ceph_pg_mapping *cur = - rb_entry(rbp, struct ceph_pg_mapping, node); - - rbp = rb_next(rbp); - dout(" removed pg_temp %llx\n", *(u64 *)&cur->pgid); - rb_erase(&cur->node, &map->pg_temp); - kfree(cur); - } - if (pglen) { /* insert */ ceph_decode_need(p, end, pglen*sizeof(u32), bad); @@ -903,17 +907,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } dout(" added pg_temp %llx len %d\n", *(u64 *)&pgid, pglen); + } else { + /* remove */ + __remove_pg_mapping(&map->pg_temp, pgid); } } - while (rbp) { - struct ceph_pg_mapping *cur = - rb_entry(rbp, struct ceph_pg_mapping, node); - - rbp = rb_next(rbp); - dout(" removed pg_temp %llx\n", *(u64 *)&cur->pgid); - rb_erase(&cur->node, &map->pg_temp); - kfree(cur); - } /* ignore the rest */ *p = end; @@ -1046,10 +1044,25 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, struct ceph_pg_mapping *pg; struct ceph_pg_pool_info *pool; int ruleno; - unsigned poolid, ps, pps; + unsigned poolid, ps, pps, t; int preferred; + poolid = le32_to_cpu(pgid.pool); + ps = le16_to_cpu(pgid.ps); + preferred = (s16)le16_to_cpu(pgid.preferred); + + pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); + if (!pool) + return NULL; + /* pg_temp? */ + if (preferred >= 0) + t = ceph_stable_mod(ps, le32_to_cpu(pool->v.lpg_num), + pool->lpgp_num_mask); + else + t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num), + pool->pgp_num_mask); + pgid.ps = cpu_to_le16(t); pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); if (pg) { *num = pg->len; @@ -1057,18 +1070,6 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, } /* crush */ - poolid = le32_to_cpu(pgid.pool); - ps = le16_to_cpu(pgid.ps); - preferred = (s16)le16_to_cpu(pgid.preferred); - - /* don't forcefeed bad device ids to crush */ - if (preferred >= osdmap->max_osd || - preferred >= osdmap->crush->max_devices) - preferred = -1; - - pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); - if (!pool) - return NULL; ruleno = crush_find_rule(osdmap->crush, pool->v.crush_ruleset, pool->v.type, pool->v.size); if (ruleno < 0) { @@ -1078,6 +1079,11 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, return NULL; } + /* don't forcefeed bad device ids to crush */ + if (preferred >= osdmap->max_osd || + preferred >= osdmap->crush->max_devices) + preferred = -1; + if (preferred >= 0) pps = ceph_stable_mod(ps, le32_to_cpu(pool->v.lpgp_num), |