diff options
Diffstat (limited to 'fs/ceph')
-rw-r--r-- | fs/ceph/addr.c | 201 | ||||
-rw-r--r-- | fs/ceph/caps.c | 205 | ||||
-rw-r--r-- | fs/ceph/debugfs.c | 2 | ||||
-rw-r--r-- | fs/ceph/dir.c | 233 | ||||
-rw-r--r-- | fs/ceph/export.c | 24 | ||||
-rw-r--r-- | fs/ceph/file.c | 100 | ||||
-rw-r--r-- | fs/ceph/inode.c | 174 | ||||
-rw-r--r-- | fs/ceph/ioctl.c | 53 | ||||
-rw-r--r-- | fs/ceph/ioctl.h | 56 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 120 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 5 | ||||
-rw-r--r-- | fs/ceph/snap.c | 70 | ||||
-rw-r--r-- | fs/ceph/super.c | 99 | ||||
-rw-r--r-- | fs/ceph/super.h | 98 | ||||
-rw-r--r-- | fs/ceph/xattr.c | 50 |
15 files changed, 924 insertions, 566 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 5a3953d..173b1d2 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -87,7 +87,7 @@ static int ceph_set_page_dirty(struct page *page) snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context); /* dirty the head */ - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (ci->i_head_snapc == NULL) ci->i_head_snapc = ceph_get_snap_context(snapc); ++ci->i_wrbuffer_ref_head; @@ -100,7 +100,7 @@ static int ceph_set_page_dirty(struct page *page) ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1, ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head, snapc, snapc->seq, snapc->num_snaps); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); /* now adjust page */ spin_lock_irq(&mapping->tree_lock); @@ -228,102 +228,155 @@ static int ceph_readpage(struct file *filp, struct page *page) } /* - * Build a vector of contiguous pages from the provided page list. + * Finish an async read(ahead) op. */ -static struct page **page_vector_from_list(struct list_head *page_list, - unsigned *nr_pages) +static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) { - struct page **pages; - struct page *page; - int next_index, contig_pages = 0; + struct inode *inode = req->r_inode; + struct ceph_osd_reply_head *replyhead; + int rc, bytes; + int i; - /* build page vector */ - pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS); - if (!pages) - return ERR_PTR(-ENOMEM); + /* parse reply */ + replyhead = msg->front.iov_base; + WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); + rc = le32_to_cpu(replyhead->result); + bytes = le32_to_cpu(msg->hdr.data_len); - BUG_ON(list_empty(page_list)); - next_index = list_entry(page_list->prev, struct page, lru)->index; - list_for_each_entry_reverse(page, page_list, lru) { - if (page->index == next_index) { - dout("readpages page %d %p\n", contig_pages, page); - pages[contig_pages] = page; - contig_pages++; - next_index++; - } else { - break; + dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); + + /* unlock all pages, zeroing any data we didn't read */ + for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) { + struct page *page = req->r_pages[i]; + + if (bytes < (int)PAGE_CACHE_SIZE) { + /* zero (remainder of) page */ + int s = bytes < 0 ? 0 : bytes; + zero_user_segment(page, s, PAGE_CACHE_SIZE); } + dout("finish_read %p uptodate %p idx %lu\n", inode, page, + page->index); + flush_dcache_page(page); + SetPageUptodate(page); + unlock_page(page); + page_cache_release(page); } - *nr_pages = contig_pages; - return pages; + kfree(req->r_pages); } /* - * Read multiple pages. Leave pages we don't read + unlock in page_list; - * the caller (VM) cleans them up. + * start an async read(ahead) operation. return nr_pages we submitted + * a read for on success, or negative error code. */ -static int ceph_readpages(struct file *file, struct address_space *mapping, - struct list_head *page_list, unsigned nr_pages) +static int start_read(struct inode *inode, struct list_head *page_list, int max) { - struct inode *inode = file->f_dentry->d_inode; - struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->client->osdc; - int rc = 0; - struct page **pages; - loff_t offset; + struct ceph_inode_info *ci = ceph_inode(inode); + struct page *page = list_entry(page_list->prev, struct page, lru); + struct ceph_osd_request *req; + u64 off; u64 len; + int i; + struct page **pages; + pgoff_t next_index; + int nr_pages = 0; + int ret; - dout("readpages %p file %p nr_pages %d\n", - inode, file, nr_pages); - - pages = page_vector_from_list(page_list, &nr_pages); - if (IS_ERR(pages)) - return PTR_ERR(pages); + off = page->index << PAGE_CACHE_SHIFT; - /* guess read extent */ - offset = pages[0]->index << PAGE_CACHE_SHIFT; + /* count pages */ + next_index = page->index; + list_for_each_entry_reverse(page, page_list, lru) { + if (page->index != next_index) + break; + nr_pages++; + next_index++; + if (max && nr_pages == max) + break; + } len = nr_pages << PAGE_CACHE_SHIFT; - rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, - offset, &len, - ci->i_truncate_seq, ci->i_truncate_size, - pages, nr_pages, 0); - if (rc == -ENOENT) - rc = 0; - if (rc < 0) - goto out; - - for (; !list_empty(page_list) && len > 0; - rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) { - struct page *page = - list_entry(page_list->prev, struct page, lru); + dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages, + off, len); + + req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), + off, &len, + CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, + NULL, 0, + ci->i_truncate_seq, ci->i_truncate_size, + NULL, false, 1, 0); + if (!req) + return -ENOMEM; + /* build page vector */ + nr_pages = len >> PAGE_CACHE_SHIFT; + pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS); + ret = -ENOMEM; + if (!pages) + goto out; + for (i = 0; i < nr_pages; ++i) { + page = list_entry(page_list->prev, struct page, lru); + BUG_ON(PageLocked(page)); list_del(&page->lru); - - if (rc < (int)PAGE_CACHE_SIZE) { - /* zero (remainder of) page */ - int s = rc < 0 ? 0 : rc; - zero_user_segment(page, s, PAGE_CACHE_SIZE); - } - - if (add_to_page_cache_lru(page, mapping, page->index, + + dout("start_read %p adding %p idx %lu\n", inode, page, + page->index); + if (add_to_page_cache_lru(page, &inode->i_data, page->index, GFP_NOFS)) { page_cache_release(page); - dout("readpages %p add_to_page_cache failed %p\n", + dout("start_read %p add_to_page_cache failed %p\n", inode, page); - continue; + nr_pages = i; + goto out_pages; } - dout("readpages %p adding %p idx %lu\n", inode, page, - page->index); - flush_dcache_page(page); - SetPageUptodate(page); - unlock_page(page); - page_cache_release(page); + pages[i] = page; } - rc = 0; + req->r_pages = pages; + req->r_num_pages = nr_pages; + req->r_callback = finish_read; + req->r_inode = inode; + + dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len); + ret = ceph_osdc_start_request(osdc, req, false); + if (ret < 0) + goto out_pages; + ceph_osdc_put_request(req); + return nr_pages; +out_pages: + ceph_release_page_vector(pages, nr_pages); +out: + ceph_osdc_put_request(req); + return ret; +} + + +/* + * Read multiple pages. Leave pages we don't read + unlock in page_list; + * the caller (VM) cleans them up. + */ +static int ceph_readpages(struct file *file, struct address_space *mapping, + struct list_head *page_list, unsigned nr_pages) +{ + struct inode *inode = file->f_dentry->d_inode; + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + int rc = 0; + int max = 0; + + if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE) + max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1) + >> PAGE_SHIFT; + + dout("readpages %p file %p nr_pages %d max %d\n", inode, file, nr_pages, + max); + while (!list_empty(page_list)) { + rc = start_read(inode, page_list, max); + if (rc < 0) + goto out; + BUG_ON(rc == 0); + } out: - kfree(pages); + dout("readpages %p file %p ret %d\n", inode, file, rc); return rc; } @@ -338,7 +391,7 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode, struct ceph_snap_context *snapc = NULL; struct ceph_cap_snap *capsnap = NULL; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap, capsnap->context, capsnap->dirty_pages); @@ -354,7 +407,7 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode, dout(" head snapc %p has %d dirty pages\n", snapc, ci->i_wrbuffer_ref_head); } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return snapc; } diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index f605753..8b53193 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -309,7 +309,7 @@ void ceph_reservation_status(struct ceph_fs_client *fsc, /* * Find ceph_cap for given mds, if any. * - * Called with i_lock held. + * Called with i_ceph_lock held. */ static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds) { @@ -332,9 +332,9 @@ struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds) { struct ceph_cap *cap; - spin_lock(&ci->vfs_inode.i_lock); + spin_lock(&ci->i_ceph_lock); cap = __get_cap_for_mds(ci, mds); - spin_unlock(&ci->vfs_inode.i_lock); + spin_unlock(&ci->i_ceph_lock); return cap; } @@ -361,15 +361,16 @@ static int __ceph_get_cap_mds(struct ceph_inode_info *ci) int ceph_get_cap_mds(struct inode *inode) { + struct ceph_inode_info *ci = ceph_inode(inode); int mds; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); mds = __ceph_get_cap_mds(ceph_inode(inode)); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return mds; } /* - * Called under i_lock. + * Called under i_ceph_lock. */ static void __insert_cap_node(struct ceph_inode_info *ci, struct ceph_cap *new) @@ -415,7 +416,7 @@ static void __cap_set_timeouts(struct ceph_mds_client *mdsc, * * If I_FLUSH is set, leave the inode at the front of the list. * - * Caller holds i_lock + * Caller holds i_ceph_lock * -> we take mdsc->cap_delay_lock */ static void __cap_delay_requeue(struct ceph_mds_client *mdsc, @@ -457,7 +458,7 @@ static void __cap_delay_requeue_front(struct ceph_mds_client *mdsc, /* * Cancel delayed work on cap. * - * Caller must hold i_lock. + * Caller must hold i_ceph_lock. */ static void __cap_delay_cancel(struct ceph_mds_client *mdsc, struct ceph_inode_info *ci) @@ -487,17 +488,15 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, ci->i_rdcache_gen++; /* - * if we are newly issued FILE_SHARED, clear I_COMPLETE; we + * if we are newly issued FILE_SHARED, clear D_COMPLETE; we * don't know what happened to this directory while we didn't * have the cap. */ if ((issued & CEPH_CAP_FILE_SHARED) && (had & CEPH_CAP_FILE_SHARED) == 0) { ci->i_shared_gen++; - if (S_ISDIR(ci->vfs_inode.i_mode)) { - dout(" marking %p NOT complete\n", &ci->vfs_inode); - ci->i_ceph_flags &= ~CEPH_I_COMPLETE; - } + if (S_ISDIR(ci->vfs_inode.i_mode)) + ceph_dir_clear_complete(&ci->vfs_inode); } } @@ -534,14 +533,14 @@ int ceph_add_cap(struct inode *inode, wanted |= ceph_caps_for_mode(fmode); retry: - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); cap = __get_cap_for_mds(ci, mds); if (!cap) { if (new_cap) { cap = new_cap; new_cap = NULL; } else { - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); new_cap = get_cap(mdsc, caps_reservation); if (new_cap == NULL) return -ENOMEM; @@ -627,7 +626,7 @@ retry: if (fmode >= 0) __ceph_get_fmode(ci, fmode); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); wake_up_all(&ci->i_cap_wq); return 0; } @@ -794,7 +793,7 @@ int ceph_caps_revoking(struct ceph_inode_info *ci, int mask) struct rb_node *p; int ret = 0; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { cap = rb_entry(p, struct ceph_cap, ci_node); if (__cap_is_valid(cap) && @@ -803,7 +802,7 @@ int ceph_caps_revoking(struct ceph_inode_info *ci, int mask) break; } } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); dout("ceph_caps_revoking %p %s = %d\n", inode, ceph_cap_string(mask), ret); return ret; @@ -857,7 +856,7 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci) } /* - * called under i_lock + * called under i_ceph_lock */ static int __ceph_is_any_caps(struct ceph_inode_info *ci) { @@ -867,7 +866,7 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci) /* * Remove a cap. Take steps to deal with a racing iterate_session_caps. * - * caller should hold i_lock. + * caller should hold i_ceph_lock. * caller will not hold session s_mutex if called from destroy_inode. */ void __ceph_remove_cap(struct ceph_cap *cap) @@ -945,7 +944,7 @@ static int send_cap_msg(struct ceph_mds_session *session, seq, issue_seq, mseq, follows, size, max_size, xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); - msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS); + msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false); if (!msg) return -ENOMEM; @@ -1030,7 +1029,7 @@ static void __queue_cap_release(struct ceph_mds_session *session, /* * Queue cap releases when an inode is dropped from our cache. Since - * inode is about to be destroyed, there is no need for i_lock. + * inode is about to be destroyed, there is no need for i_ceph_lock. */ void ceph_queue_caps_release(struct inode *inode) { @@ -1051,7 +1050,7 @@ void ceph_queue_caps_release(struct inode *inode) /* * Send a cap msg on the given inode. Update our caps state, then - * drop i_lock and send the message. + * drop i_ceph_lock and send the message. * * Make note of max_size reported/requested from mds, revoked caps * that have now been implemented. @@ -1063,13 +1062,13 @@ void ceph_queue_caps_release(struct inode *inode) * Return non-zero if delayed release, or we experienced an error * such that the caller should requeue + retry later. * - * called with i_lock, then drops it. + * called with i_ceph_lock, then drops it. * caller should hold snap_rwsem (read), s_mutex. */ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, int op, int used, int want, int retain, int flushing, unsigned *pflush_tid) - __releases(cap->ci->vfs_inode->i_lock) + __releases(cap->ci->i_ceph_lock) { struct ceph_inode_info *ci = cap->ci; struct inode *inode = &ci->vfs_inode; @@ -1172,7 +1171,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, xattr_version = ci->i_xattrs.version; } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, op, keep, want, flushing, seq, flush_tid, issue_seq, mseq, @@ -1200,13 +1199,13 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, * Unless @again is true, skip cap_snaps that were already sent to * the MDS (i.e., during this session). * - * Called under i_lock. Takes s_mutex as needed. + * Called under i_ceph_lock. Takes s_mutex as needed. */ void __ceph_flush_snaps(struct ceph_inode_info *ci, struct ceph_mds_session **psession, int again) - __releases(ci->vfs_inode->i_lock) - __acquires(ci->vfs_inode->i_lock) + __releases(ci->i_ceph_lock) + __acquires(ci->i_ceph_lock) { struct inode *inode = &ci->vfs_inode; int mds; @@ -1263,7 +1262,7 @@ retry: session = NULL; } if (!session) { - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); mutex_lock(&mdsc->mutex); session = __ceph_lookup_mds_session(mdsc, mds); mutex_unlock(&mdsc->mutex); @@ -1277,7 +1276,7 @@ retry: * deletion or migration. retry, and we'll * get a better @mds value next time. */ - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); goto retry; } @@ -1287,7 +1286,7 @@ retry: list_del_init(&capsnap->flushing_item); list_add_tail(&capsnap->flushing_item, &session->s_cap_snaps_flushing); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n", inode, capsnap, capsnap->follows, capsnap->flush_tid); @@ -1304,7 +1303,7 @@ retry: next_follows = capsnap->follows + 1; ceph_put_cap_snap(capsnap); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); goto retry; } @@ -1324,11 +1323,9 @@ out: static void ceph_flush_snaps(struct ceph_inode_info *ci) { - struct inode *inode = &ci->vfs_inode; - - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); __ceph_flush_snaps(ci, NULL, 0); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } /* @@ -1375,7 +1372,7 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) * Add dirty inode to the flushing list. Assigned a seq number so we * can wait for caps to flush without starving. * - * Called under i_lock. + * Called under i_ceph_lock. */ static int __mark_caps_flushing(struct inode *inode, struct ceph_mds_session *session) @@ -1423,9 +1420,9 @@ static int try_nonblocking_invalidate(struct inode *inode) struct ceph_inode_info *ci = ceph_inode(inode); u32 invalidating_gen = ci->i_rdcache_gen; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); invalidate_mapping_pages(&inode->i_data, 0, -1); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (inode->i_data.nrpages == 0 && invalidating_gen == ci->i_rdcache_gen) { @@ -1472,7 +1469,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, if (mdsc->stopping) is_delayed = 1; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (ci->i_ceph_flags & CEPH_I_FLUSH) flags |= CHECK_CAPS_FLUSH; @@ -1482,7 +1479,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, __ceph_flush_snaps(ci, &session, 0); goto retry_locked; retry: - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); retry_locked: file_wanted = __ceph_caps_file_wanted(ci); used = __ceph_caps_used(ci); @@ -1636,7 +1633,7 @@ ack: if (mutex_trylock(&session->s_mutex) == 0) { dout("inverting session/ino locks on %p\n", session); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (took_snap_rwsem) { up_read(&mdsc->snap_rwsem); took_snap_rwsem = 0; @@ -1650,7 +1647,7 @@ ack: if (down_read_trylock(&mdsc->snap_rwsem) == 0) { dout("inverting snap/in locks on %p\n", inode); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); down_read(&mdsc->snap_rwsem); took_snap_rwsem = 1; goto retry; @@ -1666,10 +1663,10 @@ ack: mds = cap->mds; /* remember mds, so we don't repeat */ sent++; - /* __send_cap drops i_lock */ + /* __send_cap drops i_ceph_lock */ delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, used, want, retain, flushing, NULL); - goto retry; /* retake i_lock and restart our cap scan. */ + goto retry; /* retake i_ceph_lock and restart our cap scan. */ } /* @@ -1683,7 +1680,7 @@ ack: else if (!is_delayed || force_requeue) __cap_delay_requeue(mdsc, ci); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (queue_invalidate) ceph_queue_invalidate(inode); @@ -1706,7 +1703,7 @@ static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session, int flushing = 0; retry: - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (ci->i_ceph_flags & CEPH_I_NOFLUSH) { dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode); goto out; @@ -1718,7 +1715,7 @@ retry: int delayed; if (!session) { - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); session = cap->session; mutex_lock(&session->s_mutex); goto retry; @@ -1729,18 +1726,18 @@ retry: flushing = __mark_caps_flushing(inode, session); - /* __send_cap drops i_lock */ + /* __send_cap drops i_ceph_lock */ delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, cap->issued | cap->implemented, flushing, flush_tid); if (!delayed) goto out_unlocked; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); } out: - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); out_unlocked: if (session && unlock_session) mutex_unlock(&session->s_mutex); @@ -1755,7 +1752,7 @@ static int caps_are_flushed(struct inode *inode, unsigned tid) struct ceph_inode_info *ci = ceph_inode(inode); int i, ret = 1; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); for (i = 0; i < CEPH_CAP_BITS; i++) if ((ci->i_flushing_caps & (1 << i)) && ci->i_cap_flush_tid[i] <= tid) { @@ -1763,7 +1760,7 @@ static int caps_are_flushed(struct inode *inode, unsigned tid) ret = 0; break; } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return ret; } @@ -1811,7 +1808,7 @@ out: spin_unlock(&ci->i_unsafe_lock); } -int ceph_fsync(struct file *file, int datasync) +int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) { struct inode *inode = file->f_mapping->host; struct ceph_inode_info *ci = ceph_inode(inode); @@ -1822,9 +1819,10 @@ int ceph_fsync(struct file *file, int datasync) dout("fsync %p%s\n", inode, datasync ? " datasync" : ""); sync_write_wait(inode); - ret = filemap_write_and_wait(inode->i_mapping); + ret = filemap_write_and_wait_range(inode->i_mapping, start, end); if (ret < 0) return ret; + mutex_lock(&inode->i_mutex); dirty = try_flush_caps(inode, NULL, &flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); @@ -1841,6 +1839,7 @@ int ceph_fsync(struct file *file, int datasync) } dout("fsync %p%s done\n", inode, datasync ? " datasync" : ""); + mutex_unlock(&inode->i_mutex); return ret; } @@ -1868,10 +1867,10 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (__ceph_caps_dirty(ci)) __cap_delay_requeue_front(mdsc, ci); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } return err; } @@ -1894,7 +1893,7 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; if (cap && cap->session == session) { dout("kick_flushing_caps %p cap %p capsnap %p\n", inode, @@ -1904,7 +1903,7 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, pr_err("%p auth cap %p not mds%d ???\n", inode, cap, session->s_mds); } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } } @@ -1921,7 +1920,7 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_cap *cap; int delayed = 0; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; if (cap && cap->session == session) { dout("kick_flushing_caps %p cap %p %s\n", inode, @@ -1932,14 +1931,14 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, cap->issued | cap->implemented, ci->i_flushing_caps, NULL); if (delayed) { - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } } else { pr_err("%p auth cap %p not mds%d ???\n", inode, cap, session->s_mds); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } } } @@ -1952,7 +1951,7 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, struct ceph_cap *cap; int delayed = 0; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode, ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq); @@ -1964,12 +1963,12 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, cap->issued | cap->implemented, ci->i_flushing_caps, NULL); if (delayed) { - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } } else { - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } } @@ -1978,7 +1977,7 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, * Take references to capabilities we hold, so that we don't release * them to the MDS prematurely. * - * Protected by i_lock. + * Protected by i_ceph_lock. */ static void __take_cap_refs(struct ceph_inode_info *ci, int got) { @@ -2016,7 +2015,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, dout("get_cap_refs %p need %s want %s\n", inode, ceph_cap_string(need), ceph_cap_string(want)); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); /* make sure file is actually open */ file_wanted = __ceph_caps_file_wanted(ci); @@ -2077,7 +2076,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, ceph_cap_string(have), ceph_cap_string(need)); } out: - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); dout("get_cap_refs %p ret %d got %s\n", inode, ret, ceph_cap_string(*got)); return ret; @@ -2094,7 +2093,7 @@ static void check_max_size(struct inode *inode, loff_t endoff) int check = 0; /* do we need to explicitly request a larger max_size? */ - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if ((endoff >= ci->i_max_size || endoff > (inode->i_size << 1)) && endoff > ci->i_wanted_max_size) { @@ -2103,7 +2102,7 @@ static void check_max_size(struct inode *inode, loff_t endoff) ci->i_wanted_max_size = endoff; check = 1; } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (check) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); } @@ -2140,9 +2139,9 @@ retry: */ void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps) { - spin_lock(&ci->vfs_inode.i_lock); + spin_lock(&ci->i_ceph_lock); __take_cap_refs(ci, caps); - spin_unlock(&ci->vfs_inode.i_lock); + spin_unlock(&ci->i_ceph_lock); } /* @@ -2160,7 +2159,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) int last = 0, put = 0, flushsnaps = 0, wake = 0; struct ceph_cap_snap *capsnap; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (had & CEPH_CAP_PIN) --ci->i_pin_ref; if (had & CEPH_CAP_FILE_RD) @@ -2193,7 +2192,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) } } } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had), last ? " last" : "", put ? " put" : ""); @@ -2225,7 +2224,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, int found = 0; struct ceph_cap_snap *capsnap = NULL; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); ci->i_wrbuffer_ref -= nr; last = !ci->i_wrbuffer_ref; @@ -2274,7 +2273,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, } } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (last) { ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); @@ -2291,7 +2290,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, * Handle a cap GRANT message from the MDS. (Note that a GRANT may * actually be a revocation if it specifies a smaller cap set.) * - * caller holds s_mutex and i_lock, we drop both. + * caller holds s_mutex and i_ceph_lock, we drop both. * * return value: * 0 - ok @@ -2302,7 +2301,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, struct ceph_mds_session *session, struct ceph_cap *cap, struct ceph_buffer *xattr_buf) - __releases(inode->i_lock) + __releases(ci->i_ceph_lock) { struct ceph_inode_info *ci = ceph_inode(inode); int mds = session->s_mds; @@ -2361,7 +2360,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, } if ((issued & CEPH_CAP_LINK_EXCL) == 0) - inode->i_nlink = le32_to_cpu(grant->nlink); + set_nlink(inode, le32_to_cpu(grant->nlink)); if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) { int len = le32_to_cpu(grant->xattr_len); @@ -2453,7 +2452,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, } BUG_ON(cap->issued & ~cap->implemented); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (writeback) /* * queue inode for writeback: we can't actually call @@ -2483,7 +2482,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, struct ceph_mds_caps *m, struct ceph_mds_session *session, struct ceph_cap *cap) - __releases(inode->i_lock) + __releases(ci->i_ceph_lock) { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; @@ -2539,7 +2538,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, wake_up_all(&ci->i_cap_wq); out: - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (drop) iput(inode); } @@ -2562,7 +2561,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n", inode, ci, session->s_mds, follows); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { if (capsnap->follows == follows) { if (capsnap->flush_tid != flush_tid) { @@ -2585,7 +2584,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, capsnap, capsnap->follows); } } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (drop) iput(inode); } @@ -2598,7 +2597,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, static void handle_cap_trunc(struct inode *inode, struct ceph_mds_caps *trunc, struct ceph_mds_session *session) - __releases(inode->i_lock) + __releases(ci->i_ceph_lock) { struct ceph_inode_info *ci = ceph_inode(inode); int mds = session->s_mds; @@ -2617,7 +2616,7 @@ static void handle_cap_trunc(struct inode *inode, inode, mds, seq, truncate_size, truncate_seq); queue_trunc = ceph_fill_file_size(inode, issued, truncate_seq, truncate_size, size); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (queue_trunc) ceph_queue_vmtruncate(inode); @@ -2646,7 +2645,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, dout("handle_cap_export inode %p ci %p mds%d mseq %d\n", inode, ci, mds, mseq); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); /* make sure we haven't seen a higher mseq */ for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { @@ -2690,7 +2689,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, } /* else, we already released it */ - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } /* @@ -2745,9 +2744,9 @@ static void handle_cap_import(struct ceph_mds_client *mdsc, up_read(&mdsc->snap_rwsem); /* make sure we re-request max_size, if necessary */ - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); ci->i_requested_max_size = 0; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } /* @@ -2762,6 +2761,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct ceph_mds_client *mdsc = session->s_mdsc; struct super_block *sb = mdsc->fsc->sb; struct inode *inode; + struct ceph_inode_info *ci; struct ceph_cap *cap; struct ceph_mds_caps *h; int mds = session->s_mds; @@ -2815,6 +2815,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, /* lookup ino */ inode = ceph_find_inode(sb, vino); + ci = ceph_inode(inode); dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino, vino.snap, inode); if (!inode) { @@ -2844,16 +2845,16 @@ void ceph_handle_caps(struct ceph_mds_session *session, } /* the rest require a cap */ - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); cap = __get_cap_for_mds(ceph_inode(inode), mds); if (!cap) { dout(" no cap on %p ino %llx.%llx from mds%d\n", inode, ceph_ino(inode), ceph_snap(inode), mds); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); goto flush_cap_releases; } - /* note that each of these drops i_lock for us */ + /* note that each of these drops i_ceph_lock for us */ switch (op) { case CEPH_CAP_OP_REVOKE: case CEPH_CAP_OP_GRANT: @@ -2869,7 +2870,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, break; default: - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); pr_err("ceph_handle_caps: unknown cap op %d %s\n", op, ceph_cap_op_name(op)); } @@ -2962,13 +2963,13 @@ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode) struct inode *inode = &ci->vfs_inode; int last = 0; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); dout("put_fmode %p fmode %d %d -> %d\n", inode, fmode, ci->i_nr_by_mode[fmode], ci->i_nr_by_mode[fmode]-1); BUG_ON(ci->i_nr_by_mode[fmode] == 0); if (--ci->i_nr_by_mode[fmode] == 0) last++; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (last && ci->i_vino.snap == CEPH_NOSNAP) ceph_check_caps(ci, 0, NULL); @@ -2991,7 +2992,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode, int used, dirty; int ret = 0; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); used = __ceph_caps_used(ci); dirty = __ceph_caps_dirty(ci); @@ -3046,7 +3047,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode, inode, cap, ceph_cap_string(cap->issued)); } } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return ret; } @@ -3061,7 +3062,7 @@ int ceph_encode_dentry_release(void **p, struct dentry *dentry, /* * force an record for the directory caps if we have a dentry lease. - * this is racy (can't take i_lock and d_lock together), but it + * this is racy (can't take i_ceph_lock and d_lock together), but it * doesn't have to be perfect; the mds will revoke anything we don't * release. */ diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 0dba691..fb962ef 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -102,7 +102,7 @@ static int mdsc_show(struct seq_file *s, void *p) path = NULL; spin_lock(&req->r_old_dentry->d_lock); seq_printf(s, " #%llx/%.*s (%s)", - ceph_ino(req->r_old_dentry->d_parent->d_inode), + ceph_ino(req->r_old_dentry_dir), req->r_old_dentry->d_name.len, req->r_old_dentry->d_name.name, path ? path : ""); diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index ef8f08c..7903e62 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -40,14 +40,6 @@ int ceph_init_dentry(struct dentry *dentry) if (dentry->d_fsdata) return 0; - if (dentry->d_parent == NULL || /* nfs fh_to_dentry */ - ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) - d_set_d_op(dentry, &ceph_dentry_ops); - else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR) - d_set_d_op(dentry, &ceph_snapdir_dentry_ops); - else - d_set_d_op(dentry, &ceph_snap_dentry_ops); - di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS | __GFP_ZERO); if (!di) return -ENOMEM; /* oh well */ @@ -58,16 +50,42 @@ int ceph_init_dentry(struct dentry *dentry) kmem_cache_free(ceph_dentry_cachep, di); goto out_unlock; } + + if (dentry->d_parent == NULL || /* nfs fh_to_dentry */ + ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) + d_set_d_op(dentry, &ceph_dentry_ops); + else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR) + d_set_d_op(dentry, &ceph_snapdir_dentry_ops); + else + d_set_d_op(dentry, &ceph_snap_dentry_ops); + di->dentry = dentry; di->lease_session = NULL; - dentry->d_fsdata = di; dentry->d_time = jiffies; + /* avoid reordering d_fsdata setup so that the check above is safe */ + smp_mb(); + dentry->d_fsdata = di; ceph_dentry_lru_add(dentry); out_unlock: spin_unlock(&dentry->d_lock); return 0; } +struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry) +{ + struct inode *inode = NULL; + + if (!dentry) + return NULL; + + spin_lock(&dentry->d_lock); + if (dentry->d_parent) { + inode = dentry->d_parent->d_inode; + ihold(inode); + } + spin_unlock(&dentry->d_lock); + return inode; +} /* @@ -86,11 +104,11 @@ static unsigned fpos_off(loff_t p) /* * When possible, we try to satisfy a readdir by peeking at the * dcache. We make this work by carefully ordering dentries on - * d_u.d_child when we initially get results back from the MDS, and + * d_child when we initially get results back from the MDS, and * falling back to a "normal" sync readdir if any dentries in the dir * are dropped. * - * I_COMPLETE tells indicates we have all dentries in the dir. It is + * D_COMPLETE tells indicates we have all dentries in the dir. It is * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by * the MDS if/when the directory is modified). */ @@ -122,18 +140,18 @@ static int __dcache_readdir(struct file *filp, p = parent->d_subdirs.prev; dout(" initial p %p/%p\n", p->prev, p->next); } else { - p = last->d_u.d_child.prev; + p = last->d_child.prev; } more: - dentry = list_entry(p, struct dentry, d_u.d_child); + dentry = list_entry(p, struct dentry, d_child); di = ceph_dentry(dentry); while (1) { dout(" p %p/%p %s d_subdirs %p/%p\n", p->prev, p->next, d_unhashed(dentry) ? "!hashed" : "hashed", parent->d_subdirs.prev, parent->d_subdirs.next); if (p == &parent->d_subdirs) { - fi->at_end = 1; + fi->flags |= CEPH_F_ATEND; goto out_unlock; } spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED); @@ -148,7 +166,7 @@ more: !dentry->d_inode ? " null" : ""); spin_unlock(&dentry->d_lock); p = p->prev; - dentry = list_entry(p, struct dentry, d_u.d_child); + dentry = list_entry(p, struct dentry, d_child); di = ceph_dentry(dentry); } @@ -181,8 +199,8 @@ more: filp->f_pos++; /* make sure a dentry wasn't dropped while we didn't have parent lock */ - if (!ceph_i_test(dir, CEPH_I_COMPLETE)) { - dout(" lost I_COMPLETE on %p; falling back to mds\n", dir); + if (!ceph_dir_test_complete(dir)) { + dout(" lost D_COMPLETE on %p; falling back to mds\n", dir); err = -EAGAIN; goto out; } @@ -234,7 +252,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir) const int max_bytes = fsc->mount_options->max_readdir_bytes; dout("readdir %p filp %p frag %u off %u\n", inode, filp, frag, off); - if (fi->at_end) + if (fi->flags & CEPH_F_ATEND) return 0; /* always start with . and .. */ @@ -252,7 +270,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir) off = 1; } if (filp->f_pos == 1) { - ino_t ino = filp->f_dentry->d_parent->d_inode->i_ino; + ino_t ino = parent_ino(filp->f_dentry); dout("readdir off 1 -> '..'\n"); if (filldir(dirent, "..", 2, ceph_make_fpos(0, 1), ceph_translate_ino(inode->i_sb, ino), @@ -263,18 +281,18 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir) } /* can we use the dcache? */ - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if ((filp->f_pos == 2 || fi->dentry) && !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && ceph_snap(inode) != CEPH_SNAPDIR && - (ci->i_ceph_flags & CEPH_I_COMPLETE) && + ceph_dir_test_complete(inode) && __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); err = __dcache_readdir(filp, dirent, filldir); if (err != -EAGAIN) return err; } else { - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } if (fi->dentry) { err = note_last_dentry(fi, fi->dentry->d_name.name, @@ -333,7 +351,7 @@ more: if (!req->r_did_prepopulate) { dout("readdir !did_prepopulate"); - fi->dir_release_count--; /* preclude I_COMPLETE */ + fi->dir_release_count--; /* preclude D_COMPLETE */ } /* note next offset and last dentry name */ @@ -403,20 +421,19 @@ more: dout("readdir next frag is %x\n", frag); goto more; } - fi->at_end = 1; + fi->flags |= CEPH_F_ATEND; /* * if dir_release_count still matches the dir, no dentries * were released during the whole readdir, and we should have * the complete dir contents in our cache. */ - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (ci->i_release_count == fi->dir_release_count) { - dout(" marking %p complete\n", inode); - /* ci->i_ceph_flags |= CEPH_I_COMPLETE; */ + ceph_dir_set_complete(inode); ci->i_max_offset = filp->f_pos; } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); dout("readdir %p filp %p done.\n", inode, filp); return 0; @@ -435,7 +452,7 @@ static void reset_readdir(struct ceph_file_info *fi) dput(fi->dentry); fi->dentry = NULL; } - fi->at_end = 0; + fi->flags &= ~CEPH_F_ATEND; } static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin) @@ -446,19 +463,24 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin) loff_t retval; mutex_lock(&inode->i_mutex); + retval = -EINVAL; switch (origin) { case SEEK_END: offset += inode->i_size + 2; /* FIXME */ break; case SEEK_CUR: offset += file->f_pos; + case SEEK_SET: + break; + default: + goto out; } - retval = -EINVAL; + if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) { if (offset != file->f_pos) { file->f_pos = offset; file->f_version = 0; - fi->at_end = 0; + fi->flags &= ~CEPH_F_ATEND; } retval = offset; @@ -477,26 +499,19 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin) if (offset > old_offset) fi->dir_release_count--; } +out: mutex_unlock(&inode->i_mutex); return retval; } /* - * Process result of a lookup/open request. - * - * Mainly, make sure we return the final req->r_dentry (if it already - * existed) in place of the original VFS-provided dentry when they - * differ. - * - * Gracefully handle the case where the MDS replies with -ENOENT and - * no trace (which it may do, at its discretion, e.g., if it doesn't - * care to issue a lease on the negative dentry). + * Handle lookups for the hidden .snap directory. */ -struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, - struct dentry *dentry, int err) +int ceph_handle_snapdir(struct ceph_mds_request *req, + struct dentry *dentry, int err) { struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); - struct inode *parent = dentry->d_parent->d_inode; + struct inode *parent = dentry->d_parent->d_inode; /* we hold i_mutex */ /* .snap dir? */ if (err == -ENOENT && @@ -510,7 +525,23 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, d_add(dentry, inode); err = 0; } + return err; +} +/* + * Figure out final result of a lookup/open request. + * + * Mainly, make sure we return the final req->r_dentry (if it already + * existed) in place of the original VFS-provided dentry when they + * differ. + * + * Gracefully handle the case where the MDS replies with -ENOENT and + * no trace (which it may do, at its discretion, e.g., if it doesn't + * care to issue a lease on the negative dentry). + */ +struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, + struct dentry *dentry, int err) +{ if (err == -ENOENT) { /* no trace? */ err = 0; @@ -566,7 +597,6 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, /* open (but not create!) intent? */ if (nd && (nd->flags & LOOKUP_OPEN) && - (nd->flags & LOOKUP_CONTINUE) == 0 && /* only open last component */ !(nd->intent.open.flags & O_CREAT)) { int mode = nd->intent.open.create_mode & ~current->fs->umask; return ceph_lookup_open(dir, dentry, nd, mode, 1); @@ -577,21 +607,21 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, struct ceph_inode_info *ci = ceph_inode(dir); struct ceph_dentry_info *di = ceph_dentry(dentry); - spin_lock(&dir->i_lock); + spin_lock(&ci->i_ceph_lock); dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags); if (strncmp(dentry->d_name.name, fsc->mount_options->snapdir_name, dentry->d_name.len) && !is_root_ceph_dentry(dir, dentry) && - (ci->i_ceph_flags & CEPH_I_COMPLETE) && + ceph_dir_test_complete(dir) && (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) { - spin_unlock(&dir->i_lock); + spin_unlock(&ci->i_ceph_lock); dout(" dir %p complete, -ENOENT\n", dir); d_add(dentry, NULL); di->lease_shared_gen = ci->i_shared_gen; return NULL; } - spin_unlock(&dir->i_lock); + spin_unlock(&ci->i_ceph_lock); } op = ceph_snap(dir) == CEPH_SNAPDIR ? @@ -605,6 +635,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE); req->r_locked_dir = dir; err = ceph_mdsc_do_request(mdsc, NULL, req); + err = ceph_handle_snapdir(req, dentry, err); dentry = ceph_finish_lookup(req, dentry, err); ceph_mdsc_put_request(req); /* will dput(dentry) */ dout("lookup result=%p\n", dentry); @@ -784,6 +815,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_old_dentry = dget(old_dentry); /* or inode? hrm. */ + req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry); req->r_locked_dir = dir; req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; @@ -809,12 +841,12 @@ static int drop_caps_for_unlink(struct inode *inode) struct ceph_inode_info *ci = ceph_inode(inode); int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (inode->i_nlink == 1) { drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN); ci->i_ceph_flags |= CEPH_I_NODELAY; } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return drop; } @@ -882,6 +914,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, req->r_dentry = dget(new_dentry); req->r_num_caps = 2; req->r_old_dentry = dget(old_dentry); + req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry); req->r_locked_dir = new_dir; req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL; @@ -900,7 +933,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, */ /* d_move screws up d_subdirs order */ - ceph_i_clear(new_dir, CEPH_I_COMPLETE); + ceph_dir_clear_complete(new_dir); d_move(old_dentry, new_dentry); @@ -982,10 +1015,10 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry) struct ceph_dentry_info *di = ceph_dentry(dentry); int valid = 0; - spin_lock(&dir->i_lock); + spin_lock(&ci->i_ceph_lock); if (ci->i_shared_gen == di->lease_shared_gen) valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1); - spin_unlock(&dir->i_lock); + spin_unlock(&ci->i_ceph_lock); dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n", dir, (unsigned)ci->i_shared_gen, dentry, (unsigned)di->lease_shared_gen, valid); @@ -997,36 +1030,38 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry) */ static int ceph_d_revalidate(struct dentry *dentry, struct nameidata *nd) { + int valid = 0; struct inode *dir; if (nd && nd->flags & LOOKUP_RCU) return -ECHILD; - dir = dentry->d_parent->d_inode; - dout("d_revalidate %p '%.*s' inode %p offset %lld\n", dentry, dentry->d_name.len, dentry->d_name.name, dentry->d_inode, ceph_dentry(dentry)->offset); + dir = ceph_get_dentry_parent_inode(dentry); + /* always trust cached snapped dentries, snapdir dentry */ if (ceph_snap(dir) != CEPH_NOSNAP) { dout("d_revalidate %p '%.*s' inode %p is SNAPPED\n", dentry, dentry->d_name.len, dentry->d_name.name, dentry->d_inode); - goto out_touch; + valid = 1; + } else if (dentry->d_inode && + ceph_snap(dentry->d_inode) == CEPH_SNAPDIR) { + valid = 1; + } else if (dentry_lease_is_valid(dentry) || + dir_lease_is_valid(dir, dentry)) { + valid = 1; } - if (dentry->d_inode && ceph_snap(dentry->d_inode) == CEPH_SNAPDIR) - goto out_touch; - if (dentry_lease_is_valid(dentry) || - dir_lease_is_valid(dir, dentry)) - goto out_touch; - - dout("d_revalidate %p invalid\n", dentry); - d_drop(dentry); - return 0; -out_touch: - ceph_dentry_lru_touch(dentry); - return 1; + dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid"); + if (valid) + ceph_dentry_lru_touch(dentry); + else + d_drop(dentry); + iput(dir); + return valid; } /* @@ -1056,7 +1091,52 @@ static int ceph_snapdir_d_revalidate(struct dentry *dentry, return 1; } +/* + * Set/clear/test dir complete flag on the dir's dentry. + */ +void ceph_dir_set_complete(struct inode *inode) +{ + /* not yet implemented */ +} + +void ceph_dir_clear_complete(struct inode *inode) +{ + /* not yet implemented */ +} +bool ceph_dir_test_complete(struct inode *inode) +{ + /* not yet implemented */ + return false; +} + +/* + * When the VFS prunes a dentry from the cache, we need to clear the + * complete flag on the parent directory. + * + * Called under dentry->d_lock. + */ +static void ceph_d_prune(struct dentry *dentry) +{ + struct ceph_dentry_info *di; + + dout("ceph_d_prune %p\n", dentry); + + /* do we have a valid parent? */ + if (!dentry->d_parent || IS_ROOT(dentry)) + return; + + /* if we are not hashed, we don't affect D_COMPLETE */ + if (d_unhashed(dentry)) + return; + + /* + * we hold d_lock, so d_parent is stable, and d_fsdata is never + * cleared until d_release + */ + di = ceph_dentry(dentry->d_parent); + clear_bit(CEPH_D_COMPLETE, &di->flags); +} /* * read() on a dir. This weird interface hack only works if mounted @@ -1113,7 +1193,8 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, * an fsync() on a dir will wait for any uncommitted directory * operations to commit. */ -static int ceph_dir_fsync(struct file *file, int datasync) +static int ceph_dir_fsync(struct file *file, loff_t start, loff_t end, + int datasync) { struct inode *inode = file->f_path.dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); @@ -1123,6 +1204,11 @@ static int ceph_dir_fsync(struct file *file, int datasync) int ret = 0; dout("dir_fsync %p\n", inode); + ret = filemap_write_and_wait_range(inode->i_mapping, start, end); + if (ret) + return ret; + mutex_lock(&inode->i_mutex); + spin_lock(&ci->i_unsafe_lock); if (list_empty(head)) goto out; @@ -1156,6 +1242,8 @@ static int ceph_dir_fsync(struct file *file, int datasync) } while (req->r_tid < last_tid); out: spin_unlock(&ci->i_unsafe_lock); + mutex_unlock(&inode->i_mutex); + return ret; } @@ -1215,9 +1303,8 @@ void ceph_dentry_lru_del(struct dentry *dn) * Return name hash for a given dentry. This is dependent on * the parent directory's hash function. */ -unsigned ceph_dentry_hash(struct dentry *dn) +unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn) { - struct inode *dir = dn->d_parent->d_inode; struct ceph_inode_info *dci = ceph_inode(dir); switch (dci->i_dir_layout.dl_dir_hash) { @@ -1263,6 +1350,7 @@ const struct inode_operations ceph_dir_iops = { const struct dentry_operations ceph_dentry_ops = { .d_revalidate = ceph_d_revalidate, .d_release = ceph_d_release, + .d_prune = ceph_d_prune, }; const struct dentry_operations ceph_snapdir_dentry_ops = { @@ -1272,4 +1360,5 @@ const struct dentry_operations ceph_snapdir_dentry_ops = { const struct dentry_operations ceph_snap_dentry_ops = { .d_release = ceph_d_release, + .d_prune = ceph_d_prune, }; diff --git a/fs/ceph/export.c b/fs/ceph/export.c index a080779..b001030 100644 --- a/fs/ceph/export.c +++ b/fs/ceph/export.c @@ -46,7 +46,7 @@ static int ceph_encode_fh(struct dentry *dentry, u32 *rawfh, int *max_len, int type; struct ceph_nfs_fh *fh = (void *)rawfh; struct ceph_nfs_confh *cfh = (void *)rawfh; - struct dentry *parent = dentry->d_parent; + struct dentry *parent; struct inode *inode = dentry->d_inode; int connected_handle_length = sizeof(*cfh)/4; int handle_length = sizeof(*fh)/4; @@ -55,26 +55,33 @@ static int ceph_encode_fh(struct dentry *dentry, u32 *rawfh, int *max_len, if (ceph_snap(inode) != CEPH_NOSNAP) return -EINVAL; + spin_lock(&dentry->d_lock); + parent = dget(dentry->d_parent); + spin_unlock(&dentry->d_lock); + if (*max_len >= connected_handle_length) { dout("encode_fh %p connectable\n", dentry); cfh->ino = ceph_ino(dentry->d_inode); cfh->parent_ino = ceph_ino(parent->d_inode); - cfh->parent_name_hash = ceph_dentry_hash(parent); + cfh->parent_name_hash = ceph_dentry_hash(parent->d_inode, + dentry); *max_len = connected_handle_length; type = 2; } else if (*max_len >= handle_length) { if (connectable) { *max_len = connected_handle_length; - return 255; + type = 255; + } else { + dout("encode_fh %p\n", dentry); + fh->ino = ceph_ino(dentry->d_inode); + *max_len = handle_length; + type = 1; } - dout("encode_fh %p\n", dentry); - fh->ino = ceph_ino(dentry->d_inode); - *max_len = handle_length; - type = 1; } else { *max_len = handle_length; - return 255; + type = 255; } + dput(parent); return type; } @@ -126,7 +133,6 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, return dentry; } err = ceph_init_dentry(dentry); - if (err < 0) { iput(inode); return ERR_PTR(err); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 4698a5c..ed72428 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -122,7 +122,7 @@ int ceph_open(struct inode *inode, struct file *file) struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; struct ceph_file_info *cf = file->private_data; - struct inode *parent_inode = file->f_dentry->d_parent->d_inode; + struct inode *parent_inode = NULL; int err; int flags, fmode, wanted; @@ -147,9 +147,9 @@ int ceph_open(struct inode *inode, struct file *file) /* trivially open snapdir */ if (ceph_snap(inode) == CEPH_SNAPDIR) { - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); __ceph_get_fmode(ci, fmode); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return ceph_init_file(inode, file, fmode); } @@ -158,7 +158,7 @@ int ceph_open(struct inode *inode, struct file *file) * write) or any MDS (for read). Update wanted set * asynchronously. */ - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (__ceph_is_any_real_caps(ci) && (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) { int mds_wanted = __ceph_caps_mds_wanted(ci); @@ -168,7 +168,7 @@ int ceph_open(struct inode *inode, struct file *file) inode, fmode, ceph_cap_string(wanted), ceph_cap_string(issued)); __ceph_get_fmode(ci, fmode); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); /* adjust wanted? */ if ((issued & wanted) != wanted && @@ -180,10 +180,10 @@ int ceph_open(struct inode *inode, struct file *file) } else if (ceph_snap(inode) != CEPH_NOSNAP && (ci->i_snap_caps & wanted) == wanted) { __ceph_get_fmode(ci, fmode); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return ceph_init_file(inode, file, fmode); } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted)); req = prepare_open_request(inode->i_sb, flags, 0); @@ -194,7 +194,10 @@ int ceph_open(struct inode *inode, struct file *file) req->r_inode = inode; ihold(inode); req->r_num_caps = 1; + if (flags & (O_CREAT|O_TRUNC)) + parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); err = ceph_mdsc_do_request(mdsc, parent_inode, req); + iput(parent_inode); if (!err) err = ceph_init_file(inode, file, req->r_fmode); ceph_mdsc_put_request(req); @@ -222,11 +225,11 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry, { struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; - struct file *file = nd->intent.open.file; - struct inode *parent_inode = get_dentry_parent_inode(file->f_dentry); + struct file *file; struct ceph_mds_request *req; + struct dentry *ret; int err; - int flags = nd->intent.open.flags - 1; /* silly vfs! */ + int flags = nd->intent.open.flags; dout("ceph_lookup_open dentry %p '%.*s' flags %d mode 0%o\n", dentry, dentry->d_name.len, dentry->d_name.name, flags, mode); @@ -242,16 +245,24 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry, req->r_dentry_unless = CEPH_CAP_FILE_EXCL; } req->r_locked_dir = dir; /* caller holds dir->i_mutex */ - err = ceph_mdsc_do_request(mdsc, parent_inode, req); - dentry = ceph_finish_lookup(req, dentry, err); - if (!err && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) + err = ceph_mdsc_do_request(mdsc, + (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, + req); + err = ceph_handle_snapdir(req, dentry, err); + if (err) + goto out; + if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); - if (!err) - err = ceph_init_file(req->r_dentry->d_inode, file, - req->r_fmode); + if (err) + goto out; + file = lookup_instantiate_filp(nd, req->r_dentry, ceph_open); + if (IS_ERR(file)) + err = PTR_ERR(file); +out: + ret = ceph_finish_lookup(req, dentry, err); ceph_mdsc_put_request(req); - dout("ceph_lookup_open result=%p\n", dentry); - return dentry; + dout("ceph_lookup_open result=%p\n", ret); + return ret; } int ceph_release(struct inode *inode, struct file *file) @@ -643,7 +654,8 @@ again: if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || - (inode->i_sb->s_flags & MS_SYNCHRONOUS)) + (inode->i_sb->s_flags & MS_SYNCHRONOUS) || + (fi->flags & CEPH_F_SYNC)) /* hmm, this isn't really async... */ ret = ceph_sync_read(filp, base, len, ppos, &checkeof); else @@ -712,7 +724,7 @@ retry_snap: want = CEPH_CAP_FILE_BUFFER; ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); if (ret < 0) - goto out; + goto out_put; dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n", inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, @@ -720,12 +732,23 @@ retry_snap: if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || - (inode->i_sb->s_flags & MS_SYNCHRONOUS)) { + (inode->i_sb->s_flags & MS_SYNCHRONOUS) || + (fi->flags & CEPH_F_SYNC)) { ret = ceph_sync_write(file, iov->iov_base, iov->iov_len, &iocb->ki_pos); } else { - ret = generic_file_aio_write(iocb, iov, nr_segs, pos); + /* + * buffered write; drop Fw early to avoid slow + * revocation if we get stuck on balance_dirty_pages + */ + int dirty; + + spin_lock(&ci->i_ceph_lock); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + spin_unlock(&ci->i_ceph_lock); + ceph_put_cap_refs(ci, got); + ret = generic_file_aio_write(iocb, iov, nr_segs, pos); if ((ret >= 0 || ret == -EIOCBQUEUED) && ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) { @@ -733,22 +756,28 @@ retry_snap: if (err < 0) ret = err; } + + if (dirty) + __mark_inode_dirty(inode, dirty); + goto out; } + if (ret >= 0) { int dirty; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); } -out: +out_put: dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n", inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, ceph_cap_string(got)); ceph_put_cap_refs(ci, got); +out: if (ret == -EOLDSNAPC) { dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n", inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len); @@ -768,13 +797,17 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int origin) mutex_lock(&inode->i_mutex); __ceph_do_pending_vmtruncate(inode); - switch (origin) { - case SEEK_END: + + if (origin == SEEK_END || origin == SEEK_DATA || origin == SEEK_HOLE) { ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); if (ret < 0) { offset = ret; goto out; } + } + + switch (origin) { + case SEEK_END: offset += inode->i_size; break; case SEEK_CUR: @@ -790,6 +823,19 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int origin) } offset += file->f_pos; break; + case SEEK_DATA: + if (offset >= inode->i_size) { + ret = -ENXIO; + goto out; + } + break; + case SEEK_HOLE: + if (offset >= inode->i_size) { + ret = -ENXIO; + goto out; + } + offset = inode->i_size; + break; } if (offset < 0 || offset > inode->i_sb->s_maxbytes) { diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index d8858e9..8e889b7 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -9,7 +9,6 @@ #include <linux/namei.h> #include <linux/writeback.h> #include <linux/vmalloc.h> -#include <linux/pagevec.h> #include "super.h" #include "mds_client.h" @@ -298,6 +297,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb) dout("alloc_inode %p\n", &ci->vfs_inode); + spin_lock_init(&ci->i_ceph_lock); + ci->i_version = 0; ci->i_time_warp_seq = 0; ci->i_ceph_flags = 0; @@ -560,7 +561,8 @@ static int fill_inode(struct inode *inode, struct ceph_mds_reply_inode *info = iinfo->in; struct ceph_inode_info *ci = ceph_inode(inode); int i; - int issued, implemented; + int issued = 0, implemented; + int updating_inode = 0; struct timespec mtime, atime, ctime; u32 nsplits; struct ceph_buffer *xattr_blob = NULL; @@ -583,7 +585,7 @@ static int fill_inode(struct inode *inode, iinfo->xattr_len); } - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); /* * provided version will be odd if inode value is projected, @@ -599,7 +601,8 @@ static int fill_inode(struct inode *inode, if (le64_to_cpu(info->version) > 0 && (ci->i_version & ~1) >= le64_to_cpu(info->version)) goto no_change; - + + updating_inode = 1; issued = __ceph_caps_issued(ci, &implemented); issued |= implemented | __ceph_caps_dirty(ci); @@ -617,7 +620,7 @@ static int fill_inode(struct inode *inode, } if ((issued & CEPH_CAP_LINK_EXCL) == 0) - inode->i_nlink = le32_to_cpu(info->nlink); + set_nlink(inode, le32_to_cpu(info->nlink)); /* be careful with mtime, atime, size */ ceph_decode_timespec(&atime, &info->atime); @@ -679,7 +682,7 @@ static int fill_inode(struct inode *inode, char *sym; BUG_ON(symlen != inode->i_size); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); err = -ENOMEM; sym = kmalloc(symlen+1, GFP_NOFS); @@ -688,7 +691,7 @@ static int fill_inode(struct inode *inode, memcpy(sym, iinfo->symlink, symlen); sym[symlen] = 0; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (!ci->i_symlink) ci->i_symlink = sym; else @@ -707,17 +710,6 @@ static int fill_inode(struct inode *inode, ci->i_rfiles = le64_to_cpu(info->rfiles); ci->i_rsubdirs = le64_to_cpu(info->rsubdirs); ceph_decode_timespec(&ci->i_rctime, &info->rctime); - - /* set dir completion flag? */ - if (ci->i_files == 0 && ci->i_subdirs == 0 && - ceph_snap(inode) == CEPH_NOSNAP && - (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) && - (issued & CEPH_CAP_FILE_EXCL) == 0 && - (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) { - dout(" marking %p complete (empty)\n", inode); - /* ci->i_ceph_flags |= CEPH_I_COMPLETE; */ - ci->i_max_offset = 2; - } break; default: pr_err("fill_inode %llx.%llx BAD mode 0%o\n", @@ -725,7 +717,7 @@ static int fill_inode(struct inode *inode, } no_change: - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); /* queue truncate if we saw i_size decrease */ if (queue_trunc) @@ -760,13 +752,13 @@ no_change: info->cap.flags, caps_reservation); } else { - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); dout(" %p got snap_caps %s\n", inode, ceph_cap_string(le32_to_cpu(info->cap.caps))); ci->i_snap_caps |= le32_to_cpu(info->cap.caps); if (cap_fmode >= 0) __ceph_get_fmode(ci, cap_fmode); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } } else if (cap_fmode >= 0) { pr_warning("mds issued no caps on %llx.%llx\n", @@ -774,6 +766,19 @@ no_change: __ceph_get_fmode(ci, cap_fmode); } + /* set dir completion flag? */ + if (S_ISDIR(inode->i_mode) && + updating_inode && /* didn't jump to no_change */ + ci->i_files == 0 && ci->i_subdirs == 0 && + ceph_snap(inode) == CEPH_NOSNAP && + (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) && + (issued & CEPH_CAP_FILE_EXCL) == 0 && + !ceph_dir_test_complete(inode)) { + dout(" marking %p complete (empty)\n", inode); + ceph_dir_set_complete(inode); + ci->i_max_offset = 2; + } + /* update delegation info? */ if (dirinfo) ceph_fill_dirfrag(inode, dirinfo); @@ -805,14 +810,14 @@ static void update_dentry_lease(struct dentry *dentry, return; spin_lock(&dentry->d_lock); - dout("update_dentry_lease %p mask %d duration %lu ms ttl %lu\n", - dentry, le16_to_cpu(lease->mask), duration, ttl); + dout("update_dentry_lease %p duration %lu ms ttl %lu\n", + dentry, duration, ttl); /* make lease_rdcache_gen match directory */ dir = dentry->d_parent->d_inode; di->lease_shared_gen = ceph_inode(dir)->i_shared_gen; - if (lease->mask == 0) + if (duration == 0) goto out_unlock; if (di->lease_gen == session->s_cap_gen && @@ -839,30 +844,33 @@ out_unlock: /* * Set dentry's directory position based on the current dir's max, and * order it in d_subdirs, so that dcache_readdir behaves. + * + * Always called under directory's i_mutex. */ static void ceph_set_dentry_offset(struct dentry *dn) { struct dentry *dir = dn->d_parent; - struct inode *inode = dn->d_parent->d_inode; + struct inode *inode = dir->d_inode; + struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_dentry_info *di; BUG_ON(!inode); di = ceph_dentry(dn); - spin_lock(&inode->i_lock); - if ((ceph_inode(inode)->i_ceph_flags & CEPH_I_COMPLETE) == 0) { - spin_unlock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); + if (!ceph_dir_test_complete(inode)) { + spin_unlock(&ci->i_ceph_lock); return; } di->offset = ceph_inode(inode)->i_max_offset++; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); spin_lock(&dir->d_lock); spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED); - list_move(&dn->d_u.d_child, &dir->d_subdirs); + list_move(&dn->d_child, &dir->d_subdirs); dout("set_dentry_offset %p %lld (%p %p)\n", dn, di->offset, - dn->d_u.d_child.prev, dn->d_u.d_child.next); + dn->d_child.prev, dn->d_child.next); spin_unlock(&dn->d_lock); spin_unlock(&dir->d_lock); } @@ -1022,9 +1030,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, /* do we have a dn lease? */ have_lease = have_dir_cap || - (le16_to_cpu(rinfo->dlease->mask) & - CEPH_LOCK_DN); - + le32_to_cpu(rinfo->dlease->duration_ms); if (!have_lease) dout("fill_trace no dentry lease or dir cap\n"); @@ -1053,7 +1059,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, * d_move() puts the renamed dentry at the end of * d_subdirs. We need to assign it an appropriate * directory offset so we can behave when holding - * I_COMPLETE. + * D_COMPLETE. */ ceph_set_dentry_offset(req->r_old_dentry); dout("dn %p gets new offset %lld\n", req->r_old_dentry, @@ -1250,7 +1256,7 @@ retry_lookup: /* reorder parent's d_subdirs */ spin_lock(&parent->d_lock); spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED); - list_move(&dn->d_u.d_child, &parent->d_subdirs); + list_move(&dn->d_child, &parent->d_subdirs); spin_unlock(&dn->d_lock); spin_unlock(&parent->d_lock); } @@ -1305,7 +1311,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size) struct ceph_inode_info *ci = ceph_inode(inode); int ret = 0; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size); inode->i_size = size; inode->i_blocks = (size + (1 << 9) - 1) >> 9; @@ -1315,7 +1321,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size) (ci->i_reported_size << 1) < ci->i_max_size) ret = 1; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return ret; } @@ -1325,12 +1331,13 @@ int ceph_inode_set_size(struct inode *inode, loff_t size) */ void ceph_queue_writeback(struct inode *inode) { + ihold(inode); if (queue_work(ceph_inode_to_client(inode)->wb_wq, &ceph_inode(inode)->i_wb_work)) { dout("ceph_queue_writeback %p\n", inode); - ihold(inode); } else { dout("ceph_queue_writeback %p failed\n", inode); + iput(inode); } } @@ -1350,55 +1357,13 @@ static void ceph_writeback_work(struct work_struct *work) */ void ceph_queue_invalidate(struct inode *inode) { + ihold(inode); if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq, &ceph_inode(inode)->i_pg_inv_work)) { dout("ceph_queue_invalidate %p\n", inode); - ihold(inode); } else { dout("ceph_queue_invalidate %p failed\n", inode); - } -} - -/* - * invalidate any pages that are not dirty or under writeback. this - * includes pages that are clean and mapped. - */ -static void ceph_invalidate_nondirty_pages(struct address_space *mapping) -{ - struct pagevec pvec; - pgoff_t next = 0; - int i; - - pagevec_init(&pvec, 0); - while (pagevec_lookup(&pvec, mapping, next, PAGEVEC_SIZE)) { - for (i = 0; i < pagevec_count(&pvec); i++) { - struct page *page = pvec.pages[i]; - pgoff_t index; - int skip_page = - (PageDirty(page) || PageWriteback(page)); - - if (!skip_page) - skip_page = !trylock_page(page); - - /* - * We really shouldn't be looking at the ->index of an - * unlocked page. But we're not allowed to lock these - * pages. So we rely upon nobody altering the ->index - * of this (pinned-by-us) page. - */ - index = page->index; - if (index > next) - next = index; - next++; - - if (skip_page) - continue; - - generic_error_remove_page(mapping, page); - unlock_page(page); - } - pagevec_release(&pvec); - cond_resched(); + iput(inode); } } @@ -1414,20 +1379,20 @@ static void ceph_invalidate_work(struct work_struct *work) u32 orig_gen; int check = 0; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); dout("invalidate_pages %p gen %d revoking %d\n", inode, ci->i_rdcache_gen, ci->i_rdcache_revoking); if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { /* nevermind! */ - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); goto out; } orig_gen = ci->i_rdcache_gen; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); - ceph_invalidate_nondirty_pages(inode->i_mapping); + truncate_inode_pages(&inode->i_data, 0); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (orig_gen == ci->i_rdcache_gen && orig_gen == ci->i_rdcache_revoking) { dout("invalidate_pages %p gen %d successful\n", inode, @@ -1439,7 +1404,7 @@ static void ceph_invalidate_work(struct work_struct *work) inode, orig_gen, ci->i_rdcache_gen, ci->i_rdcache_revoking); } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (check) ceph_check_caps(ci, 0, NULL); @@ -1474,13 +1439,14 @@ void ceph_queue_vmtruncate(struct inode *inode) { struct ceph_inode_info *ci = ceph_inode(inode); + ihold(inode); if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq, &ci->i_vmtruncate_work)) { dout("ceph_queue_vmtruncate %p\n", inode); - ihold(inode); } else { dout("ceph_queue_vmtruncate %p failed, pending=%d\n", inode, ci->i_truncate_pending); + iput(inode); } } @@ -1497,10 +1463,10 @@ void __ceph_do_pending_vmtruncate(struct inode *inode) int wrbuffer_refs, wake = 0; retry: - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (ci->i_truncate_pending == 0) { dout("__do_pending_vmtruncate %p none pending\n", inode); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return; } @@ -1511,7 +1477,7 @@ retry: if (ci->i_wrbuffer_ref_head < ci->i_wrbuffer_ref) { dout("__do_pending_vmtruncate %p flushing snaps first\n", inode); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); filemap_write_and_wait_range(&inode->i_data, 0, inode->i_sb->s_maxbytes); goto retry; @@ -1521,15 +1487,15 @@ retry: wrbuffer_refs = ci->i_wrbuffer_ref; dout("__do_pending_vmtruncate %p (%d) to %lld\n", inode, ci->i_truncate_pending, to); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); truncate_inode_pages(inode->i_mapping, to); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); ci->i_truncate_pending--; if (ci->i_truncate_pending == 0) wake = 1; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (wrbuffer_refs == 0) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); @@ -1560,7 +1526,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) { struct inode *inode = dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); - struct inode *parent_inode = dentry->d_parent->d_inode; + struct inode *parent_inode; const unsigned int ia_valid = attr->ia_valid; struct ceph_mds_request *req; struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; @@ -1584,7 +1550,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) if (IS_ERR(req)) return PTR_ERR(req); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); issued = __ceph_caps_issued(ci, NULL); dout("setattr %p issued %s\n", inode, ceph_cap_string(issued)); @@ -1732,7 +1698,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) } release &= issued; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (inode_dirty_flags) __mark_inode_dirty(inode, inode_dirty_flags); @@ -1743,7 +1709,9 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) req->r_inode_drop = release; req->r_args.setattr.mask = cpu_to_le32(mask); req->r_num_caps = 1; + parent_inode = ceph_get_dentry_parent_inode(dentry); err = ceph_mdsc_do_request(mdsc, parent_inode, req); + iput(parent_inode); } dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err, ceph_cap_string(dirtied), mask); @@ -1752,7 +1720,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) __ceph_do_pending_vmtruncate(inode); return err; out: - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); ceph_mdsc_put_request(req); return err; } @@ -1795,17 +1763,17 @@ int ceph_do_getattr(struct inode *inode, int mask) * Check inode permissions. We verify we have a valid value for * the AUTH cap, then call the generic handler. */ -int ceph_permission(struct inode *inode, int mask, unsigned int flags) +int ceph_permission(struct inode *inode, int mask) { int err; - if (flags & IPERM_FLAG_RCU) + if (mask & MAY_NOT_BLOCK) return -ECHILD; err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED); if (!err) - err = generic_permission(inode, mask, flags, NULL); + err = generic_permission(inode, mask); return err; } diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index ef0b5f4..790914a59 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -38,21 +38,43 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg) static long ceph_ioctl_set_layout(struct file *file, void __user *arg) { struct inode *inode = file->f_dentry->d_inode; - struct inode *parent_inode = file->f_dentry->d_parent->d_inode; + struct inode *parent_inode; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_mds_request *req; struct ceph_ioctl_layout l; + struct ceph_inode_info *ci = ceph_inode(file->f_dentry->d_inode); + struct ceph_ioctl_layout nl; int err, i; - /* copy and validate */ if (copy_from_user(&l, arg, sizeof(l))) return -EFAULT; - if ((l.object_size & ~PAGE_MASK) || - (l.stripe_unit & ~PAGE_MASK) || - !l.stripe_unit || - (l.object_size && - (unsigned)l.object_size % (unsigned)l.stripe_unit)) + /* validate changed params against current layout */ + err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT); + if (!err) { + nl.stripe_unit = ceph_file_layout_su(ci->i_layout); + nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); + nl.object_size = ceph_file_layout_object_size(ci->i_layout); + nl.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool); + nl.preferred_osd = + (s32)le32_to_cpu(ci->i_layout.fl_pg_preferred); + } else + return err; + + if (l.stripe_count) + nl.stripe_count = l.stripe_count; + if (l.stripe_unit) + nl.stripe_unit = l.stripe_unit; + if (l.object_size) + nl.object_size = l.object_size; + if (l.data_pool) + nl.data_pool = l.data_pool; + if (l.preferred_osd) + nl.preferred_osd = l.preferred_osd; + + if ((nl.object_size & ~PAGE_MASK) || + (nl.stripe_unit & ~PAGE_MASK) || + ((unsigned)nl.object_size % (unsigned)nl.stripe_unit)) return -EINVAL; /* make sure it's a valid data pool */ @@ -87,7 +109,9 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) req->r_args.setlayout.layout.fl_pg_preferred = cpu_to_le32(l.preferred_osd); + parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); err = ceph_mdsc_do_request(mdsc, parent_inode, req); + iput(parent_inode); ceph_mdsc_put_request(req); return err; } @@ -217,11 +241,11 @@ static long ceph_ioctl_lazyio(struct file *file) struct ceph_inode_info *ci = ceph_inode(inode); if ((fi->fmode & CEPH_FILE_MODE_LAZY) == 0) { - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); ci->i_nr_by_mode[fi->fmode]--; fi->fmode |= CEPH_FILE_MODE_LAZY; ci->i_nr_by_mode[fi->fmode]++; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); dout("ioctl_layzio: file %p marked lazy\n", file); ceph_check_caps(ci, 0, NULL); @@ -231,6 +255,14 @@ static long ceph_ioctl_lazyio(struct file *file) return 0; } +static long ceph_ioctl_syncio(struct file *file) +{ + struct ceph_file_info *fi = file->private_data; + + fi->flags |= CEPH_F_SYNC; + return 0; +} + long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg) { dout("ioctl file %p cmd %u arg %lu\n", file, cmd, arg); @@ -249,6 +281,9 @@ long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg) case CEPH_IOC_LAZYIO: return ceph_ioctl_lazyio(file); + + case CEPH_IOC_SYNCIO: + return ceph_ioctl_syncio(file); } return -ENOTTY; diff --git a/fs/ceph/ioctl.h b/fs/ceph/ioctl.h index 52e8fd7..be4a604 100644 --- a/fs/ceph/ioctl.h +++ b/fs/ceph/ioctl.h @@ -6,7 +6,31 @@ #define CEPH_IOCTL_MAGIC 0x97 -/* just use u64 to align sanely on all archs */ +/* + * CEPH_IOC_GET_LAYOUT - get file layout or dir layout policy + * CEPH_IOC_SET_LAYOUT - set file layout + * CEPH_IOC_SET_LAYOUT_POLICY - set dir layout policy + * + * The file layout specifies how file data is striped over objects in + * the distributed object store, which object pool they belong to (if + * it differs from the default), and an optional 'preferred osd' to + * store them on. + * + * Files get a new layout based on the policy set on the containing + * directory or one of its ancestors. The GET_LAYOUT ioctl will let + * you examine the layout for a file or the policy on a directory. + * + * SET_LAYOUT will let you set a layout on a newly created file. This + * only works immediately after the file is created and before any + * data is written to it. + * + * SET_LAYOUT_POLICY will let you set a layout policy (default layout) + * on a directory that will apply to any new files created in that + * directory (or any child directory that doesn't specify a layout of + * its own). + */ + +/* use u64 to align sanely on all archs */ struct ceph_ioctl_layout { __u64 stripe_unit, stripe_count, object_size; __u64 data_pool; @@ -21,6 +45,8 @@ struct ceph_ioctl_layout { struct ceph_ioctl_layout) /* + * CEPH_IOC_GET_DATALOC - get location of file data in the cluster + * * Extract identity, address of the OSD and object storing a given * file offset. */ @@ -39,6 +65,34 @@ struct ceph_ioctl_dataloc { #define CEPH_IOC_GET_DATALOC _IOWR(CEPH_IOCTL_MAGIC, 3, \ struct ceph_ioctl_dataloc) +/* + * CEPH_IOC_LAZYIO - relax consistency + * + * Normally Ceph switches to synchronous IO when multiple clients have + * the file open (and or more for write). Reads and writes bypass the + * page cache and go directly to the OSD. Setting this flag on a file + * descriptor will allow buffered IO for this file in cases where the + * application knows it won't interfere with other nodes (or doesn't + * care). + */ #define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4) +/* + * CEPH_IOC_SYNCIO - force synchronous IO + * + * This ioctl sets a file flag that forces the synchronous IO that + * bypasses the page cache, even if it is not necessary. This is + * essentially the opposite behavior of IOC_LAZYIO. This forces the + * same read/write path as a file opened by multiple clients when one + * or more of those clients is opened for write. + * + * Note that this type of sync IO takes a different path than a file + * opened with O_SYNC/D_SYNC (writes hit the page cache and are + * immediately flushed on page boundaries). It is very similar to + * O_DIRECT (writes bypass the page cache) excep that O_DIRECT writes + * are not copied (user page must remain stable) and O_DIRECT writes + * have alignment restrictions (on the buffer and file offset). + */ +#define CEPH_IOC_SYNCIO _IO(CEPH_IOCTL_MAGIC, 5) + #endif diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 0c1d917..b24e2d3 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -483,22 +483,26 @@ void ceph_mdsc_release_request(struct kref *kref) destroy_reply_info(&req->r_reply_info); } if (req->r_inode) { - ceph_put_cap_refs(ceph_inode(req->r_inode), - CEPH_CAP_PIN); + ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); iput(req->r_inode); } if (req->r_locked_dir) - ceph_put_cap_refs(ceph_inode(req->r_locked_dir), - CEPH_CAP_PIN); + ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); if (req->r_target_inode) iput(req->r_target_inode); if (req->r_dentry) dput(req->r_dentry); if (req->r_old_dentry) { - ceph_put_cap_refs( - ceph_inode(req->r_old_dentry->d_parent->d_inode), - CEPH_CAP_PIN); + /* + * track (and drop pins for) r_old_dentry_dir + * separately, since r_old_dentry's d_parent may have + * changed between the dir mutex being dropped and + * this request being freed. + */ + ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), + CEPH_CAP_PIN); dput(req->r_old_dentry); + iput(req->r_old_dentry_dir); } kfree(req->r_path1); kfree(req->r_path2); @@ -604,6 +608,8 @@ static void __unregister_request(struct ceph_mds_client *mdsc, req->r_unsafe_dir = NULL; } + complete_all(&req->r_safe_completion); + ceph_mdsc_put_request(req); } @@ -615,8 +621,14 @@ static void __unregister_request(struct ceph_mds_client *mdsc, * * Called under mdsc->mutex. */ -struct dentry *get_nonsnap_parent(struct dentry *dentry) +static struct dentry *get_nonsnap_parent(struct dentry *dentry) { + /* + * we don't need to worry about protecting the d_parent access + * here because we never renaming inside the snapped namespace + * except to resplice to another snapdir, and either the old or new + * result is a valid result. + */ while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP) dentry = dentry->d_parent; return dentry; @@ -652,7 +664,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc, if (req->r_inode) { inode = req->r_inode; } else if (req->r_dentry) { - struct inode *dir = req->r_dentry->d_parent->d_inode; + /* ignore race with rename; old or new d_parent is okay */ + struct dentry *parent = req->r_dentry->d_parent; + struct inode *dir = parent->d_inode; if (dir->i_sb != mdsc->fsc->sb) { /* not this fs! */ @@ -660,8 +674,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, } else if (ceph_snap(dir) != CEPH_NOSNAP) { /* direct snapped/virtual snapdir requests * based on parent dir inode */ - struct dentry *dn = - get_nonsnap_parent(req->r_dentry->d_parent); + struct dentry *dn = get_nonsnap_parent(parent); inode = dn->d_inode; dout("__choose_mds using nonsnap parent %p\n", inode); } else if (req->r_dentry->d_inode) { @@ -670,7 +683,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, } else { /* dir + name */ inode = dir; - hash = ceph_dentry_hash(req->r_dentry); + hash = ceph_dentry_hash(dir, req->r_dentry); is_hash = true; } } @@ -721,21 +734,21 @@ static int __choose_mds(struct ceph_mds_client *mdsc, } } - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); cap = NULL; if (mode == USE_AUTH_MDS) cap = ci->i_auth_cap; if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); if (!cap) { - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); goto random; } mds = cap->session->s_mds; dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n", inode, ceph_vinop(inode), mds, cap == ci->i_auth_cap ? "auth " : "", cap); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return mds; random: @@ -753,7 +766,8 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq) struct ceph_msg *msg; struct ceph_mds_session_head *h; - msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS); + msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, + false); if (!msg) { pr_err("create_session_msg ENOMEM creating msg\n"); return NULL; @@ -939,7 +953,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, dout("removing cap %p, ci is %p, inode is %p\n", cap, ci, &ci->vfs_inode); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); __ceph_remove_cap(cap); if (!__ceph_is_any_real_caps(ci)) { struct ceph_mds_client *mdsc = @@ -972,7 +986,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, } spin_unlock(&mdsc->cap_dirty_lock); } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); while (drop--) iput(inode); return 0; @@ -1003,10 +1017,10 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, wake_up_all(&ci->i_cap_wq); if (arg) { - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); ci->i_wanted_max_size = 0; ci->i_requested_max_size = 0; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } return 0; } @@ -1139,7 +1153,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) if (session->s_trim_caps <= 0) return -1; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); mine = cap->issued | cap->implemented; used = __ceph_caps_used(ci); oissued = __ceph_caps_issued_other(ci, cap); @@ -1158,7 +1172,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) __ceph_remove_cap(cap); } else { /* try to drop referring dentries */ - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); d_prune_aliases(inode); dout("trim_caps_cb %p cap %p pruned, count now %d\n", inode, cap, atomic_read(&inode->i_count)); @@ -1166,7 +1180,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) } out: - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return 0; } @@ -1229,7 +1243,7 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc, while (session->s_num_cap_releases < session->s_nr_caps + extra) { spin_unlock(&session->s_cap_lock); msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, - GFP_NOFS); + GFP_NOFS, false); if (!msg) goto out_unlocked; dout("add_cap_releases %p msg %p now %d\n", session, msg, @@ -1284,7 +1298,7 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) i_flushing_item); struct inode *inode = &ci->vfs_inode; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (ci->i_cap_flush_seq <= want_flush_seq) { dout("check_cap_flush still flushing %p " "seq %lld <= %lld to mds%d\n", inode, @@ -1292,7 +1306,7 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) session->s_mds); ret = 0; } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); @@ -1483,6 +1497,7 @@ retry: pos, temp); } else if (stop_on_nosnap && inode && ceph_snap(inode) == CEPH_NOSNAP) { + spin_unlock(&temp->d_lock); break; } else { pos -= temp->d_name.len; @@ -1584,7 +1599,7 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath); dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, *ppath); - } else if (rpath) { + } else if (rpath || rino) { *ino = rino; *ppath = rpath; *pathlen = strlen(rpath); @@ -1641,7 +1656,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, if (req->r_old_dentry_drop) len += req->r_old_dentry->d_name.len; - msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS); + msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false); if (!msg) { msg = ERR_PTR(-ENOMEM); goto out_free2; @@ -1802,8 +1817,11 @@ static int __do_request(struct ceph_mds_client *mdsc, int mds = -1; int err = -EAGAIN; - if (req->r_err || req->r_got_result) + if (req->r_err || req->r_got_result) { + if (req->r_aborted) + __unregister_request(mdsc, req); goto out; + } if (req->r_timeout && time_after_eq(jiffies, req->r_started + req->r_timeout)) { @@ -1931,9 +1949,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, if (req->r_locked_dir) ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); if (req->r_old_dentry) - ceph_get_cap_refs( - ceph_inode(req->r_old_dentry->d_parent->d_inode), - CEPH_CAP_PIN); + ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), + CEPH_CAP_PIN); /* issue */ mutex_lock(&mdsc->mutex); @@ -1991,7 +2008,7 @@ out: } /* - * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS + * Invalidate dir D_COMPLETE, dentry lease state on an aborted MDS * namespace request. */ void ceph_invalidate_dir_request(struct ceph_mds_request *req) @@ -1999,11 +2016,11 @@ void ceph_invalidate_dir_request(struct ceph_mds_request *req) struct inode *inode = req->r_locked_dir; struct ceph_inode_info *ci = ceph_inode(inode); - dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode); - spin_lock(&inode->i_lock); - ci->i_ceph_flags &= ~CEPH_I_COMPLETE; + dout("invalidate_dir_request %p (D_COMPLETE, lease(s))\n", inode); + spin_lock(&ci->i_ceph_lock); + ceph_dir_clear_complete(inode); ci->i_release_count++; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (req->r_dentry) ceph_invalidate_dentry_lease(req->r_dentry); @@ -2112,7 +2129,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) if (head->safe) { req->r_got_safe = true; __unregister_request(mdsc, req); - complete_all(&req->r_safe_completion); if (req->r_got_unsafe) { /* @@ -2411,7 +2427,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, if (err) goto out_free; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); cap->seq = 0; /* reset cap seq */ cap->issue_seq = 0; /* and issue_seq */ @@ -2434,7 +2450,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, rec.v1.pathbase = cpu_to_le64(pathbase); reclen = sizeof(rec.v1); } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (recon_state->flock) { int num_fcntl_locks, num_flock_locks; @@ -2508,7 +2524,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, goto fail_nopagelist; ceph_pagelist_init(pagelist); - reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS); + reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false); if (!reply) goto fail_nomsg; @@ -2714,7 +2730,6 @@ static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_mds_lease *h = msg->front.iov_base; u32 seq; struct ceph_vino vino; - int mask; struct qstr dname; int release = 0; @@ -2725,7 +2740,6 @@ static void handle_lease(struct ceph_mds_client *mdsc, goto bad; vino.ino = le64_to_cpu(h->ino); vino.snap = CEPH_NOSNAP; - mask = le16_to_cpu(h->mask); seq = le32_to_cpu(h->seq); dname.name = (void *)h + sizeof(*h) + sizeof(u32); dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32); @@ -2737,8 +2751,8 @@ static void handle_lease(struct ceph_mds_client *mdsc, /* lookup inode */ inode = ceph_find_inode(sb, vino); - dout("handle_lease %s, mask %d, ino %llx %p %.*s\n", - ceph_lease_op_name(h->action), mask, vino.ino, inode, + dout("handle_lease %s, ino %llx %p %.*s\n", + ceph_lease_op_name(h->action), vino.ino, inode, dname.len, dname.name); if (inode == NULL) { dout("handle_lease no inode %llx\n", vino.ino); @@ -2823,12 +2837,11 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, dnamelen = dentry->d_name.len; len += dnamelen; - msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS); + msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); if (!msg) return; lease = msg->front.iov_base; lease->action = action; - lease->mask = cpu_to_le16(1); lease->ino = cpu_to_le64(ceph_vino(inode).ino); lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); lease->seq = cpu_to_le32(seq); @@ -2850,7 +2863,7 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, * Pass @inode always, @dentry is optional. */ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, - struct dentry *dentry, int mask) + struct dentry *dentry) { struct ceph_dentry_info *di; struct ceph_mds_session *session; @@ -2858,7 +2871,6 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, BUG_ON(inode == NULL); BUG_ON(dentry == NULL); - BUG_ON(mask == 0); /* is dentry lease valid? */ spin_lock(&dentry->d_lock); @@ -2868,8 +2880,8 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, di->lease_gen != di->lease_session->s_cap_gen || !time_before(jiffies, dentry->d_time)) { dout("lease_release inode %p dentry %p -- " - "no lease on %d\n", - inode, dentry, mask); + "no lease\n", + inode, dentry); spin_unlock(&dentry->d_lock); return; } @@ -2880,8 +2892,8 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, __ceph_mdsc_drop_dentry_lease(dentry); spin_unlock(&dentry->d_lock); - dout("lease_release inode %p dentry %p mask %d to mds%d\n", - inode, dentry, mask, session->s_mds); + dout("lease_release inode %p dentry %p to mds%d\n", + inode, dentry, session->s_mds); ceph_mdsc_lease_send_msg(session, inode, dentry, CEPH_MDS_LEASE_RELEASE, seq); ceph_put_mds_session(session); @@ -3147,7 +3159,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) /* * true if all sessions are closed, or we force unmount */ -bool done_closing_sessions(struct ceph_mds_client *mdsc) +static bool done_closing_sessions(struct ceph_mds_client *mdsc) { int i, n = 0; diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 7d8a0d6..a50ca0e 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -20,7 +20,7 @@ * * mdsc->snap_rwsem * - * inode->i_lock + * ci->i_ceph_lock * mdsc->snap_flush_lock * mdsc->cap_delay_lock * @@ -171,6 +171,7 @@ struct ceph_mds_request { struct inode *r_inode; /* arg1 */ struct dentry *r_dentry; /* arg1 */ struct dentry *r_old_dentry; /* arg2: rename from or link from */ + struct inode *r_old_dentry_dir; /* arg2: old dentry's parent dir */ char *r_path1, *r_path2; struct ceph_vino r_ino1, r_ino2; @@ -333,7 +334,7 @@ extern void ceph_mdsc_sync(struct ceph_mds_client *mdsc); extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, - struct dentry *dn, int mask); + struct dentry *dn); extern void ceph_invalidate_dir_request(struct ceph_mds_request *req); diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 54b14de..d5df940 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -288,6 +288,9 @@ static int cmpu64_rev(const void *a, const void *b) return 0; } + +static struct ceph_snap_context *empty_snapc; + /* * build the snap context for a given realm. */ @@ -329,9 +332,15 @@ static int build_snap_context(struct ceph_snap_realm *realm) return 0; } + if (num == 0 && realm->seq == empty_snapc->seq) { + ceph_get_snap_context(empty_snapc); + snapc = empty_snapc; + goto done; + } + /* alloc new snap context */ err = -ENOMEM; - if (num > ULONG_MAX / sizeof(u64) - sizeof(*snapc)) + if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64)) goto fail; snapc = kzalloc(sizeof(*snapc) + num*sizeof(u64), GFP_NOFS); if (!snapc) @@ -364,6 +373,7 @@ static int build_snap_context(struct ceph_snap_realm *realm) dout("build_snap_context %llx %p: %p seq %lld (%d snaps)\n", realm->ino, realm, snapc, snapc->seq, snapc->num_snaps); +done: if (realm->cached_context) ceph_put_snap_context(realm->cached_context); realm->cached_context = snapc; @@ -446,9 +456,18 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) return; } - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); used = __ceph_caps_used(ci); dirty = __ceph_caps_dirty(ci); + + /* + * If there is a write in progress, treat that as a dirty Fw, + * even though it hasn't completed yet; by the time we finish + * up this capsnap it will be. + */ + if (used & CEPH_CAP_FILE_WR) + dirty |= CEPH_CAP_FILE_WR; + if (__ceph_have_pending_cap_snap(ci)) { /* there is no point in queuing multiple "pending" cap_snaps, as no new writes are allowed to start when pending, so any @@ -456,13 +475,22 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) cap_snap. lucky us. */ dout("queue_cap_snap %p already pending\n", inode); kfree(capsnap); - } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR) || - (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| - CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) { + } else if (ci->i_snap_realm->cached_context == empty_snapc) { + dout("queue_cap_snap %p empty snapc\n", inode); + kfree(capsnap); + } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| + CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) { struct ceph_snap_context *snapc = ci->i_head_snapc; - dout("queue_cap_snap %p cap_snap %p queuing under %p\n", inode, - capsnap, snapc); + /* + * if we are a sync write, we may need to go to the snaprealm + * to get the current snapc. + */ + if (!snapc) + snapc = ci->i_snap_realm->cached_context; + + dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n", + inode, capsnap, snapc, ceph_cap_string(dirty)); ihold(inode); atomic_set(&capsnap->nref, 1); @@ -513,7 +541,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) kfree(capsnap); } - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } /* @@ -522,7 +550,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) * * If capsnap can now be flushed, add to snap_flush list, and return 1. * - * Caller must hold i_lock. + * Caller must hold i_ceph_lock. */ int __ceph_finish_cap_snap(struct ceph_inode_info *ci, struct ceph_cap_snap *capsnap) @@ -724,9 +752,9 @@ static void flush_snaps(struct ceph_mds_client *mdsc) inode = &ci->vfs_inode; ihold(inode); spin_unlock(&mdsc->snap_flush_lock); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); __ceph_flush_snaps(ci, &session, 0); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); iput(inode); spin_lock(&mdsc->snap_flush_lock); } @@ -832,7 +860,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, continue; ci = ceph_inode(inode); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (!ci->i_snap_realm) goto skip_inode; /* @@ -861,7 +889,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, oldrealm = ci->i_snap_realm; ci->i_snap_realm = realm; spin_unlock(&realm->inodes_with_caps_lock); - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); ceph_get_snap_realm(mdsc, realm); ceph_put_snap_realm(mdsc, oldrealm); @@ -870,7 +898,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, continue; skip_inode: - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); iput(inode); } @@ -912,5 +940,17 @@ out: return; } +int __init ceph_snap_init(void) +{ + empty_snapc = kzalloc(sizeof(struct ceph_snap_context), GFP_NOFS); + if (!empty_snapc) + return -ENOMEM; + atomic_set(&empty_snapc->nref, 1); + empty_snapc->seq = 1; + return 0; +} - +void ceph_snap_exit(void) +{ + ceph_put_snap_context(empty_snapc); +} diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 1775022..8bfafe5 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -79,8 +79,7 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf) buf->f_bsize = 1 << CEPH_BLOCK_SHIFT; buf->f_frsize = 1 << CEPH_BLOCK_SHIFT; buf->f_blocks = le64_to_cpu(st.kb) >> (CEPH_BLOCK_SHIFT-10); - buf->f_bfree = (le64_to_cpu(st.kb) - le64_to_cpu(st.kb_used)) >> - (CEPH_BLOCK_SHIFT-10); + buf->f_bfree = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10); buf->f_bavail = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10); buf->f_files = le64_to_cpu(st.num_objects); @@ -120,6 +119,7 @@ static int ceph_sync_fs(struct super_block *sb, int wait) enum { Opt_wsize, Opt_rsize, + Opt_rasize, Opt_caps_wanted_delay_min, Opt_caps_wanted_delay_max, Opt_cap_release_safety, @@ -142,6 +142,7 @@ enum { static match_table_t fsopt_tokens = { {Opt_wsize, "wsize=%d"}, {Opt_rsize, "rsize=%d"}, + {Opt_rasize, "rasize=%d"}, {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"}, {Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"}, {Opt_cap_release_safety, "cap_release_safety=%d"}, @@ -202,6 +203,9 @@ static int parse_fsopt_token(char *c, void *private) case Opt_rsize: fsopt->rsize = intval; break; + case Opt_rasize: + fsopt->rasize = intval; + break; case Opt_caps_wanted_delay_min: fsopt->caps_wanted_delay_min = intval; break; @@ -295,28 +299,29 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt, dout("parse_mount_options %p, dev_name '%s'\n", fsopt, dev_name); - fsopt->sb_flags = flags; - fsopt->flags = CEPH_MOUNT_OPT_DEFAULT; + fsopt->sb_flags = flags; + fsopt->flags = CEPH_MOUNT_OPT_DEFAULT; - fsopt->rsize = CEPH_RSIZE_DEFAULT; - fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL); + fsopt->rsize = CEPH_RSIZE_DEFAULT; + fsopt->rasize = CEPH_RASIZE_DEFAULT; + fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL); fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT; fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT; - fsopt->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT; - fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT; - fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT; - fsopt->congestion_kb = default_congestion_kb(); - - /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */ - err = -EINVAL; - if (!dev_name) - goto out; - *path = strstr(dev_name, ":/"); - if (*path == NULL) { - pr_err("device name is missing path (no :/ in %s)\n", - dev_name); - goto out; - } + fsopt->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT; + fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT; + fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT; + fsopt->congestion_kb = default_congestion_kb(); + + /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */ + err = -EINVAL; + if (!dev_name) + goto out; + *path = strstr(dev_name, ":/"); + if (*path == NULL) { + pr_err("device name is missing path (no :/ in %s)\n", + dev_name); + goto out; + } dev_name_end = *path; dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name); @@ -356,8 +361,10 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt) if (opt->flags & CEPH_OPT_NOCRC) seq_puts(m, ",nocrc"); - if (opt->name) - seq_printf(m, ",name=%s", opt->name); + if (opt->name) { + seq_puts(m, ",name="); + seq_escape(m, opt->name, ", \t\n\\"); + } if (opt->key) seq_puts(m, ",secret=<hidden>"); @@ -382,6 +389,8 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt) seq_printf(m, ",wsize=%d", fsopt->wsize); if (fsopt->rsize != CEPH_RSIZE_DEFAULT) seq_printf(m, ",rsize=%d", fsopt->rsize); + if (fsopt->rasize != CEPH_RASIZE_DEFAULT) + seq_printf(m, ",rasize=%d", fsopt->rasize); if (fsopt->congestion_kb != default_congestion_kb()) seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb); if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT) @@ -398,7 +407,7 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt) if (fsopt->max_readdir_bytes != CEPH_MAX_READDIR_BYTES_DEFAULT) seq_printf(m, ",readdir_max_bytes=%d", fsopt->max_readdir_bytes); if (strcmp(fsopt->snapdir_name, CEPH_SNAPDIRNAME_DEFAULT)) - seq_printf(m, ",snapdirname=%s", fsopt->snapdir_name); + seq_show_option(m, "snapdirname", fsopt->snapdir_name); return 0; } @@ -424,24 +433,27 @@ static int extra_mon_dispatch(struct ceph_client *client, struct ceph_msg *msg) /* * create a new fs client */ -struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, +static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, struct ceph_options *opt) { struct ceph_fs_client *fsc; + const unsigned supported_features = + CEPH_FEATURE_FLOCK | + CEPH_FEATURE_DIRLAYOUTHASH; + const unsigned required_features = 0; int err = -ENOMEM; fsc = kzalloc(sizeof(*fsc), GFP_KERNEL); if (!fsc) return ERR_PTR(-ENOMEM); - fsc->client = ceph_create_client(opt, fsc); + fsc->client = ceph_create_client(opt, fsc, supported_features, + required_features); if (IS_ERR(fsc->client)) { err = PTR_ERR(fsc->client); goto fail; } fsc->client->extra_mon_dispatch = extra_mon_dispatch; - fsc->client->supported_features |= CEPH_FEATURE_FLOCK | - CEPH_FEATURE_DIRLAYOUTHASH; fsc->client->monc.want_mdsmap = 1; fsc->mount_options = fsopt; @@ -497,7 +509,7 @@ fail: return ERR_PTR(err); } -void destroy_fs_client(struct ceph_fs_client *fsc) +static void destroy_fs_client(struct ceph_fs_client *fsc) { dout("destroy_fs_client %p\n", fsc); @@ -633,10 +645,12 @@ static struct dentry *open_root_dentry(struct ceph_fs_client *fsc, if (err == 0) { dout("open_root_inode success\n"); if (ceph_ino(req->r_target_inode) == CEPH_INO_ROOT && - fsc->sb->s_root == NULL) + fsc->sb->s_root == NULL) { root = d_alloc_root(req->r_target_inode); - else + ceph_init_dentry(root); + } else { root = d_obtain_alias(req->r_target_inode); + } req->r_target_inode = NULL; dout("open_root_inode success, root dentry is %p\n", root); } else { @@ -780,11 +794,15 @@ static int ceph_register_bdi(struct super_block *sb, { int err; - /* set ra_pages based on rsize mount option? */ - if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE) + /* set ra_pages based on rasize mount option? */ + if (fsc->mount_options->rasize >= PAGE_CACHE_SIZE) fsc->backing_dev_info.ra_pages = - (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1) + (fsc->mount_options->rasize + PAGE_CACHE_SIZE - 1) >> PAGE_SHIFT; + else + fsc->backing_dev_info.ra_pages = + default_backing_dev_info.ra_pages; + err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%d", atomic_long_inc_return(&bdi_seq)); if (!err) @@ -815,8 +833,8 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type, fsc = create_fs_client(fsopt, opt); if (IS_ERR(fsc)) { res = ERR_CAST(fsc); - kfree(fsopt); - kfree(opt); + destroy_mount_options(fsopt); + ceph_destroy_options(opt); goto out_final; } @@ -895,14 +913,20 @@ static int __init init_ceph(void) if (ret) goto out; - ret = register_filesystem(&ceph_fs_type); + ret = ceph_snap_init(); if (ret) goto out_icache; + ret = register_filesystem(&ceph_fs_type); + if (ret) + goto out_snap; + pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL); return 0; +out_snap: + ceph_snap_exit(); out_icache: destroy_caches(); out: @@ -913,6 +937,7 @@ static void __exit exit_ceph(void) { dout("exit_ceph\n"); unregister_filesystem(&ceph_fs_type); + ceph_snap_exit(); destroy_caches(); } diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 9091926..242df58 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -36,7 +36,8 @@ #define ceph_test_mount_opt(fsc, opt) \ (!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt)) -#define CEPH_RSIZE_DEFAULT (512*1024) /* readahead */ +#define CEPH_RSIZE_DEFAULT 0 /* max read size */ +#define CEPH_RASIZE_DEFAULT (8192*1024) /* readahead */ #define CEPH_MAX_READDIR_DEFAULT 1024 #define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024) #define CEPH_SNAPDIRNAME_DEFAULT ".snap" @@ -45,8 +46,9 @@ struct ceph_mount_options { int flags; int sb_flags; - int wsize; - int rsize; /* max readahead */ + int wsize; /* max write size */ + int rsize; /* max read size */ + int rasize; /* max readahead */ int congestion_kb; /* max writeback in flight */ int caps_wanted_delay_min, caps_wanted_delay_max; int cap_release_safety; @@ -201,6 +203,7 @@ struct ceph_inode_xattr { * Ceph dentry state */ struct ceph_dentry_info { + unsigned long flags; struct ceph_mds_session *lease_session; u32 lease_gen, lease_shared_gen; u32 lease_seq; @@ -211,6 +214,18 @@ struct ceph_dentry_info { u64 offset; }; +/* + * dentry flags + * + * The locking for D_COMPLETE is a bit odd: + * - we can clear it at almost any time (see ceph_d_prune) + * - it is only meaningful if: + * - we hold dir inode i_ceph_lock + * - we hold dir FILE_SHARED caps + * - the dentry D_COMPLETE is set + */ +#define CEPH_D_COMPLETE 1 /* if set, d_u.d_subdirs is complete directory */ + struct ceph_inode_xattrs_info { /* * (still encoded) xattr blob. we avoid the overhead of parsing @@ -235,6 +250,8 @@ struct ceph_inode_xattrs_info { struct ceph_inode_info { struct ceph_vino i_vino; /* ceph ino + snap */ + spinlock_t i_ceph_lock; + u64 i_version; u32 i_time_warp_seq; @@ -249,14 +266,14 @@ struct ceph_inode_info { struct timespec i_rctime; u64 i_rbytes, i_rfiles, i_rsubdirs; u64 i_files, i_subdirs; - u64 i_max_offset; /* largest readdir offset, set with I_COMPLETE */ + u64 i_max_offset; /* largest readdir offset, set with D_COMPLETE */ struct rb_root i_fragtree; struct mutex i_fragtree_mutex; struct ceph_inode_xattrs_info i_xattrs; - /* capabilities. protected _both_ by i_lock and cap->session's + /* capabilities. protected _both_ by i_ceph_lock and cap->session's * s_mutex. */ struct rb_root i_caps; /* cap list */ struct ceph_cap *i_auth_cap; /* authoritative cap, if any */ @@ -344,9 +361,10 @@ static inline struct ceph_vino ceph_vino(struct inode *inode) * x86_64+ino32 64 32 * x86_64 64 64 */ -static inline u32 ceph_ino_to_ino32(ino_t ino) +static inline u32 ceph_ino_to_ino32(__u64 vino) { - ino ^= ino >> (sizeof(ino) * 8 - 32); + u32 ino = vino & 0xffffffff; + ino ^= vino >> 32; if (!ino) ino = 1; return ino; @@ -357,11 +375,11 @@ static inline u32 ceph_ino_to_ino32(ino_t ino) */ static inline ino_t ceph_vino_to_ino(struct ceph_vino vino) { - ino_t ino = (ino_t)vino.ino; /* ^ (vino.snap << 20); */ #if BITS_PER_LONG == 32 - ino = ceph_ino_to_ino32(ino); + return ceph_ino_to_ino32(vino.ino); +#else + return (ino_t)vino.ino; #endif - return ino; } /* @@ -413,7 +431,6 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, /* * Ceph inode. */ -#define CEPH_I_COMPLETE 1 /* we have complete directory cached */ #define CEPH_I_NODELAY 4 /* do not delay cap release */ #define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ #define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ @@ -422,18 +439,18 @@ static inline void ceph_i_clear(struct inode *inode, unsigned mask) { struct ceph_inode_info *ci = ceph_inode(inode); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); ci->i_ceph_flags &= ~mask; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } static inline void ceph_i_set(struct inode *inode, unsigned mask) { struct ceph_inode_info *ci = ceph_inode(inode); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); ci->i_ceph_flags |= mask; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); } static inline bool ceph_i_test(struct inode *inode, unsigned mask) @@ -441,9 +458,9 @@ static inline bool ceph_i_test(struct inode *inode, unsigned mask) struct ceph_inode_info *ci = ceph_inode(inode); bool r; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); r = (ci->i_ceph_flags & mask) == mask; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return r; } @@ -471,6 +488,13 @@ static inline loff_t ceph_make_fpos(unsigned frag, unsigned off) } /* + * set/clear directory D_COMPLETE flag + */ +void ceph_dir_set_complete(struct inode *inode); +void ceph_dir_clear_complete(struct inode *inode); +bool ceph_dir_test_complete(struct inode *inode); + +/* * caps helpers */ static inline bool __ceph_is_any_real_caps(struct ceph_inode_info *ci) @@ -486,9 +510,9 @@ extern int __ceph_caps_issued_other(struct ceph_inode_info *ci, static inline int ceph_caps_issued(struct ceph_inode_info *ci) { int issued; - spin_lock(&ci->vfs_inode.i_lock); + spin_lock(&ci->i_ceph_lock); issued = __ceph_caps_issued(ci, NULL); - spin_unlock(&ci->vfs_inode.i_lock); + spin_unlock(&ci->i_ceph_lock); return issued; } @@ -496,9 +520,9 @@ static inline int ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch) { int r; - spin_lock(&ci->vfs_inode.i_lock); + spin_lock(&ci->i_ceph_lock); r = __ceph_caps_issued_mask(ci, mask, touch); - spin_unlock(&ci->vfs_inode.i_lock); + spin_unlock(&ci->i_ceph_lock); return r; } @@ -543,13 +567,16 @@ extern void ceph_reservation_status(struct ceph_fs_client *client, /* * we keep buffered readdir results attached to file->private_data */ +#define CEPH_F_SYNC 1 +#define CEPH_F_ATEND 2 + struct ceph_file_info { - int fmode; /* initialized on open */ + short fmode; /* initialized on open */ + short flags; /* CEPH_F_* */ /* readdir: position within the dir */ u32 frag; struct ceph_mds_request *last_readdir; - int at_end; /* readdir: position within a frag */ unsigned offset; /* offset of last chunk, adjusted for . and .. */ @@ -650,6 +677,8 @@ extern void ceph_queue_cap_snap(struct ceph_inode_info *ci); extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci, struct ceph_cap_snap *capsnap); extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc); +extern int ceph_snap_init(void); +extern void ceph_snap_exit(void); /* * a cap_snap is "pending" if it is still awaiting an in-progress @@ -692,7 +721,7 @@ extern void ceph_queue_invalidate(struct inode *inode); extern void ceph_queue_writeback(struct inode *inode); extern int ceph_do_getattr(struct inode *inode, int mask); -extern int ceph_permission(struct inode *inode, int mask, unsigned int flags); +extern int ceph_permission(struct inode *inode, int mask); extern int ceph_setattr(struct dentry *dentry, struct iattr *attr); extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, struct kstat *stat); @@ -718,17 +747,17 @@ extern int ceph_add_cap(struct inode *inode, extern void __ceph_remove_cap(struct ceph_cap *cap); static inline void ceph_remove_cap(struct ceph_cap *cap) { - struct inode *inode = &cap->ci->vfs_inode; - spin_lock(&inode->i_lock); + spin_lock(&cap->ci->i_ceph_lock); __ceph_remove_cap(cap); - spin_unlock(&inode->i_lock); + spin_unlock(&cap->ci->i_ceph_lock); } extern void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap); extern void ceph_queue_caps_release(struct inode *inode); extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc); -extern int ceph_fsync(struct file *file, int datasync); +extern int ceph_fsync(struct file *file, loff_t start, loff_t end, + int datasync); extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session); extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, @@ -788,6 +817,8 @@ extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops, ceph_snapdir_dentry_ops; extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry); +extern int ceph_handle_snapdir(struct ceph_mds_request *req, + struct dentry *dentry, int err); extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, struct dentry *dentry, int err); @@ -795,7 +826,8 @@ extern void ceph_dentry_lru_add(struct dentry *dn); extern void ceph_dentry_lru_touch(struct dentry *dn); extern void ceph_dentry_lru_del(struct dentry *dn); extern void ceph_invalidate_dentry_lease(struct dentry *dentry); -extern unsigned ceph_dentry_hash(struct dentry *dn); +extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn); +extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry); /* * our d_ops vary depending on whether the inode is live, @@ -818,14 +850,6 @@ extern int ceph_encode_locks(struct inode *i, struct ceph_pagelist *p, int p_locks, int f_locks); extern int lock_to_ceph_filelock(struct file_lock *fl, struct ceph_filelock *c); -static inline struct inode *get_dentry_parent_inode(struct dentry *dentry) -{ - if (dentry && dentry->d_parent) - return dentry->d_parent->d_inode; - - return NULL; -} - /* debugfs.c */ extern int ceph_fs_debugfs_init(struct ceph_fs_client *client); extern void ceph_fs_debugfs_cleanup(struct ceph_fs_client *client); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index f42d730..a5e36e4 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -343,8 +343,8 @@ void __ceph_destroy_xattrs(struct ceph_inode_info *ci) } static int __build_xattrs(struct inode *inode) - __releases(inode->i_lock) - __acquires(inode->i_lock) + __releases(ci->i_ceph_lock) + __acquires(ci->i_ceph_lock) { u32 namelen; u32 numattr = 0; @@ -372,7 +372,7 @@ start: end = p + ci->i_xattrs.blob->vec.iov_len; ceph_decode_32_safe(&p, end, numattr, bad); xattr_version = ci->i_xattrs.version; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); xattrs = kcalloc(numattr, sizeof(struct ceph_xattr *), GFP_NOFS); @@ -387,7 +387,7 @@ start: goto bad_lock; } - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (ci->i_xattrs.version != xattr_version) { /* lost a race, retry */ for (i = 0; i < numattr; i++) @@ -418,7 +418,7 @@ start: return err; bad_lock: - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); bad: if (xattrs) { for (i = 0; i < numattr; i++) @@ -512,7 +512,7 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, if (vxattrs) vxattr = ceph_match_vxattr(vxattrs, name); - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); dout("getxattr %p ver=%lld index_ver=%lld\n", inode, ci->i_xattrs.version, ci->i_xattrs.index_version); @@ -520,14 +520,14 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, (ci->i_xattrs.index_version >= ci->i_xattrs.version)) { goto get_xattr; } else { - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); /* get xattrs from mds (if we don't already have them) */ err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR); if (err) return err; } - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (vxattr && vxattr->readonly) { err = vxattr->getxattr_cb(ci, value, size); @@ -558,7 +558,7 @@ get_xattr: memcpy(value, xattr->val, xattr->val_len); out: - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return err; } @@ -573,7 +573,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) u32 len; int i; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); dout("listxattr %p ver=%lld index_ver=%lld\n", inode, ci->i_xattrs.version, ci->i_xattrs.index_version); @@ -581,13 +581,13 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) (ci->i_xattrs.index_version >= ci->i_xattrs.version)) { goto list_xattr; } else { - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR); if (err) return err; } - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); err = __build_xattrs(inode); if (err < 0) @@ -619,7 +619,7 @@ list_xattr: } out: - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); return err; } @@ -629,7 +629,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name, struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); struct inode *inode = dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); - struct inode *parent_inode = dentry->d_parent->d_inode; + struct inode *parent_inode; struct ceph_mds_request *req; struct ceph_mds_client *mdsc = fsc->mdsc; int err; @@ -677,7 +677,9 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name, req->r_data_len = size; dout("xattr.ver (before): %lld\n", ci->i_xattrs.version); + parent_inode = ceph_get_dentry_parent_inode(dentry); err = ceph_mdsc_do_request(mdsc, parent_inode, req); + iput(parent_inode); ceph_mdsc_put_request(req); dout("xattr.ver (after): %lld\n", ci->i_xattrs.version); @@ -737,7 +739,7 @@ int ceph_setxattr(struct dentry *dentry, const char *name, if (!xattr) goto out; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); retry: issued = __ceph_caps_issued(ci, NULL); if (!(issued & CEPH_CAP_XATTR_EXCL)) @@ -750,12 +752,12 @@ retry: required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) { struct ceph_buffer *blob = NULL; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); dout(" preaallocating new blob size=%d\n", required_blob_size); blob = ceph_buffer_new(required_blob_size, GFP_NOFS); if (!blob) goto out; - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); if (ci->i_xattrs.prealloc_blob) ceph_buffer_put(ci->i_xattrs.prealloc_blob); ci->i_xattrs.prealloc_blob = blob; @@ -768,13 +770,13 @@ retry: dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL); ci->i_xattrs.dirty = true; inode->i_ctime = CURRENT_TIME; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); return err; do_sync: - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); err = ceph_sync_setxattr(dentry, name, value, size, flags); out: kfree(newname); @@ -788,7 +790,7 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name) struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); struct ceph_mds_client *mdsc = fsc->mdsc; struct inode *inode = dentry->d_inode; - struct inode *parent_inode = dentry->d_parent->d_inode; + struct inode *parent_inode; struct ceph_mds_request *req; int err; @@ -802,7 +804,9 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name) req->r_num_caps = 1; req->r_path2 = kstrdup(name, GFP_NOFS); + parent_inode = ceph_get_dentry_parent_inode(dentry); err = ceph_mdsc_do_request(mdsc, parent_inode, req); + iput(parent_inode); ceph_mdsc_put_request(req); return err; } @@ -829,7 +833,7 @@ int ceph_removexattr(struct dentry *dentry, const char *name) return -EOPNOTSUPP; } - spin_lock(&inode->i_lock); + spin_lock(&ci->i_ceph_lock); __build_xattrs(inode); issued = __ceph_caps_issued(ci, NULL); dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued)); @@ -842,12 +846,12 @@ int ceph_removexattr(struct dentry *dentry, const char *name) ci->i_xattrs.dirty = true; inode->i_ctime = CURRENT_TIME; - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); return err; do_sync: - spin_unlock(&inode->i_lock); + spin_unlock(&ci->i_ceph_lock); err = ceph_send_removexattr(dentry, name); return err; } |