diff options
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r-- | fs/ceph/mds_client.c | 120 |
1 files changed, 66 insertions, 54 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 0c1d917..b24e2d3 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -483,22 +483,26 @@ void ceph_mdsc_release_request(struct kref *kref) destroy_reply_info(&req->r_reply_info); } if (req->r_inode) { - ceph_put_cap_refs(ceph_inode(req->r_inode), - CEPH_CAP_PIN); + ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); iput(req->r_inode); } if (req->r_locked_dir) - ceph_put_cap_refs(ceph_inode(req->r_locked_dir), - CEPH_CAP_PIN); + ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); if (req->r_target_inode) iput(req->r_target_inode); if (req->r_dentry) dput(req->r_dentry); if (req->r_old_dentry) { - ceph_put_cap_refs( - ceph_inode(req->r_old_dentry->d_parent->d_inode), - CEPH_CAP_PIN); + /* + * track (and drop pins for) r_old_dentry_dir + * separately, since r_old_dentry's d_parent may have + * changed between the dir mutex being dropped and + * this request being freed. + */ + ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), + CEPH_CAP_PIN); dput(req->r_old_dentry); + iput(req->r_old_dentry_dir); } kfree(req->r_path1); kfree(req->r_path2); @@ -604,6 +608,8 @@ static void __unregister_request(struct ceph_mds_client *mdsc, req->r_unsafe_dir = NULL; } + complete_all(&req->r_safe_completion); + ceph_mdsc_put_request(req); } @@ -615,8 +621,14 @@ static void __unregister_request(struct ceph_mds_client *mdsc, * * Called under mdsc->mutex. */ -struct dentry *get_nonsnap_parent(struct dentry *dentry) +static struct dentry *get_nonsnap_parent(struct dentry *dentry) { + /* + * we don't need to worry about protecting the d_parent access + * here because we never renaming inside the snapped namespace + * except to resplice to another snapdir, and either the old or new + * result is a valid result. + */ while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP) dentry = dentry->d_parent; return dentry; @@ -652,7 +664,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc, if (req->r_inode) { inode = req->r_inode; } else if (req->r_dentry) { - struct inode *dir = req->r_dentry->d_parent->d_inode; + /* ignore race with rename; old or new d_parent is okay */ + struct dentry *parent = req->r_dentry->d_parent; + struct inode *dir = parent->d_inode; if (dir->i_sb != mdsc->fsc->sb) { /* not this fs! */ @@ -660,8 +674,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, } else if (ceph_snap(dir) != CEPH_NOSNAP) { /* direct snapped/virtual snapdir requests * based on parent dir inode */ - struct dentry *dn = - get_nonsnap_parent(req->r_dentry->d_parent); + struct dentry *dn = get_nonsnap_parent(parent); inode = dn->d_inode; dout("__choose_mds using nonsnap parent %p\n", inode); } else if (req->r_dentry->d_inode) { @@ -670,7 +683,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, } else { /* dir + name */ inode = dir; - hash = ceph_dentry_hash(req->r_dentry); + hash = ceph_dentry_hash(dir, req->r_dentry); is_hash = true; } } @@ -721,21 +734,21 @@ static int __choose_mds(struct ceph_mds_client *mdsc, } } - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); cap = NULL; if (mode == USE_AUTH_MDS) cap = ci->i_auth_cap; if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); if (!cap) { - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); goto random; } mds = cap->session->s_mds; dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n", inode, ceph_vinop(inode), mds, cap == ci->i_auth_cap ? "auth " : "", cap); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return mds; random: @@ -753,7 +766,8 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq) struct ceph_msg *msg; struct ceph_mds_session_head *h; - msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS); + msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, + false); if (!msg) { pr_err("create_session_msg ENOMEM creating msg\n"); return NULL; @@ -939,7 +953,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, dout("removing cap %p, ci is %p, inode is %p\n", cap, ci, &ci->vfs_inode); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); __ceph_remove_cap(cap); if (!__ceph_is_any_real_caps(ci)) { struct ceph_mds_client *mdsc = @@ -972,7 +986,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, } spin_unlock(&mdsc->cap_dirty_lock); } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); while (drop--) iput(inode); return 0; @@ -1003,10 +1017,10 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, wake_up_all(&ci->i_cap_wq); if (arg) { - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); ci->i_wanted_max_size = 0; ci->i_requested_max_size = 0; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } return 0; } @@ -1139,7 +1153,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) if (session->s_trim_caps <= 0) return -1; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); mine = cap->issued | cap->implemented; used = __ceph_caps_used(ci); oissued = __ceph_caps_issued_other(ci, cap); @@ -1158,7 +1172,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) __ceph_remove_cap(cap); } else { /* try to drop referring dentries */ - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); d_prune_aliases(inode); dout("trim_caps_cb %p cap %p pruned, count now %d\n", inode, cap, atomic_read(&inode->i_count)); @@ -1166,7 +1180,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) } out: - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return 0; } @@ -1229,7 +1243,7 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc, while (session->s_num_cap_releases < session->s_nr_caps + extra) { spin_unlock(&session->s_cap_lock); msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, - GFP_NOFS); + GFP_NOFS, false); if (!msg) goto out_unlocked; dout("add_cap_releases %p msg %p now %d\n", session, msg, @@ -1284,7 +1298,7 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) i_flushing_item); struct inode *inode = &ci->vfs_inode; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (ci->i_cap_flush_seq <= want_flush_seq) { dout("check_cap_flush still flushing %p " "seq %lld <= %lld to mds%d\n", inode, @@ -1292,7 +1306,7 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) session->s_mds); ret = 0; } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); @@ -1483,6 +1497,7 @@ retry: pos, temp); } else if (stop_on_nosnap && inode && ceph_snap(inode) == CEPH_NOSNAP) { + spin_unlock(&temp->d_lock); break; } else { pos -= temp->d_name.len; @@ -1584,7 +1599,7 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath); dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, *ppath); - } else if (rpath) { + } else if (rpath || rino) { *ino = rino; *ppath = rpath; *pathlen = strlen(rpath); @@ -1641,7 +1656,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, if (req->r_old_dentry_drop) len += req->r_old_dentry->d_name.len; - msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS); + msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false); if (!msg) { msg = ERR_PTR(-ENOMEM); goto out_free2; @@ -1802,8 +1817,11 @@ static int __do_request(struct ceph_mds_client *mdsc, int mds = -1; int err = -EAGAIN; - if (req->r_err || req->r_got_result) + if (req->r_err || req->r_got_result) { + if (req->r_aborted) + __unregister_request(mdsc, req); goto out; + } if (req->r_timeout && time_after_eq(jiffies, req->r_started + req->r_timeout)) { @@ -1931,9 +1949,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, if (req->r_locked_dir) ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); if (req->r_old_dentry) - ceph_get_cap_refs( - ceph_inode(req->r_old_dentry->d_parent->d_inode), - CEPH_CAP_PIN); + ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), + CEPH_CAP_PIN); /* issue */ mutex_lock(&mdsc->mutex); @@ -1991,7 +2008,7 @@ out: } /* - * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS + * Invalidate dir D_COMPLETE, dentry lease state on an aborted MDS * namespace request. */ void ceph_invalidate_dir_request(struct ceph_mds_request *req) @@ -1999,11 +2016,11 @@ void ceph_invalidate_dir_request(struct ceph_mds_request *req) struct inode *inode = req->r_locked_dir; struct ceph_inode_info *ci = ceph_inode(inode); - dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode); - spin_lock(&inode->i_lock); - ci->i_ceph_flags &= ~CEPH_I_COMPLETE; + dout("invalidate_dir_request %p (D_COMPLETE, lease(s))\n", inode); + spin_lock(&ci->i_ceph_lock); + ceph_dir_clear_complete(inode); ci->i_release_count++; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (req->r_dentry) ceph_invalidate_dentry_lease(req->r_dentry); @@ -2112,7 +2129,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) if (head->safe) { req->r_got_safe = true; __unregister_request(mdsc, req); - complete_all(&req->r_safe_completion); if (req->r_got_unsafe) { /* @@ -2411,7 +2427,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, if (err) goto out_free; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); cap->seq = 0; /* reset cap seq */ cap->issue_seq = 0; /* and issue_seq */ @@ -2434,7 +2450,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, rec.v1.pathbase = cpu_to_le64(pathbase); reclen = sizeof(rec.v1); } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (recon_state->flock) { int num_fcntl_locks, num_flock_locks; @@ -2508,7 +2524,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, goto fail_nopagelist; ceph_pagelist_init(pagelist); - reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS); + reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false); if (!reply) goto fail_nomsg; @@ -2714,7 +2730,6 @@ static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_mds_lease *h = msg->front.iov_base; u32 seq; struct ceph_vino vino; - int mask; struct qstr dname; int release = 0; @@ -2725,7 +2740,6 @@ static void handle_lease(struct ceph_mds_client *mdsc, goto bad; vino.ino = le64_to_cpu(h->ino); vino.snap = CEPH_NOSNAP; - mask = le16_to_cpu(h->mask); seq = le32_to_cpu(h->seq); dname.name = (void *)h + sizeof(*h) + sizeof(u32); dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32); @@ -2737,8 +2751,8 @@ static void handle_lease(struct ceph_mds_client *mdsc, /* lookup inode */ inode = ceph_find_inode(sb, vino); - dout("handle_lease %s, mask %d, ino %llx %p %.*s\n", - ceph_lease_op_name(h->action), mask, vino.ino, inode, + dout("handle_lease %s, ino %llx %p %.*s\n", + ceph_lease_op_name(h->action), vino.ino, inode, dname.len, dname.name); if (inode == NULL) { dout("handle_lease no inode %llx\n", vino.ino); @@ -2823,12 +2837,11 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, dnamelen = dentry->d_name.len; len += dnamelen; - msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS); + msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); if (!msg) return; lease = msg->front.iov_base; lease->action = action; - lease->mask = cpu_to_le16(1); lease->ino = cpu_to_le64(ceph_vino(inode).ino); lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); lease->seq = cpu_to_le32(seq); @@ -2850,7 +2863,7 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, * Pass @inode always, @dentry is optional. */ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, - struct dentry *dentry, int mask) + struct dentry *dentry) { struct ceph_dentry_info *di; struct ceph_mds_session *session; @@ -2858,7 +2871,6 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, BUG_ON(inode == NULL); BUG_ON(dentry == NULL); - BUG_ON(mask == 0); /* is dentry lease valid? */ spin_lock(&dentry->d_lock); @@ -2868,8 +2880,8 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, di->lease_gen != di->lease_session->s_cap_gen || !time_before(jiffies, dentry->d_time)) { dout("lease_release inode %p dentry %p -- " - "no lease on %d\n", - inode, dentry, mask); + "no lease\n", + inode, dentry); spin_unlock(&dentry->d_lock); return; } @@ -2880,8 +2892,8 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, __ceph_mdsc_drop_dentry_lease(dentry); spin_unlock(&dentry->d_lock); - dout("lease_release inode %p dentry %p mask %d to mds%d\n", - inode, dentry, mask, session->s_mds); + dout("lease_release inode %p dentry %p to mds%d\n", + inode, dentry, session->s_mds); ceph_mdsc_lease_send_msg(session, inode, dentry, CEPH_MDS_LEASE_RELEASE, seq); ceph_put_mds_session(session); @@ -3147,7 +3159,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) /* * true if all sessions are closed, or we force unmount */ -bool done_closing_sessions(struct ceph_mds_client *mdsc) +static bool done_closing_sessions(struct ceph_mds_client *mdsc) { int i, n = 0; |