Commit abf137dd7712132ee56d5b3143c2ff61a72a5faa

Authored by Jens Axboe
1 parent 392ddc3298

aio: make the lookup_ioctx() lockless

The mm->ioctx_list is currently protected by a reader-writer lock,
so we always grab that lock on the read side for doing ioctx
lookups. As the workload is extremely reader biased, turn this into
an rcu hlist so we can make lookup_ioctx() lockless. Get rid of
the rwlock and use a spinlock for providing update side exclusion.

There's usually only 1 entry on this list, so it doesn't make sense
to look into fancier data structures.

Reviewed-by: Jeff Moyer <jmoyer@redhat.com>
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
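
In outline, the conversion follows the standard RCU list pattern: list updates are serialized with the new mm->ioctx_lock spinlock, lookups walk the hlist under rcu_read_lock() only, and the final kmem_cache_free() is deferred through call_rcu() so a concurrent lockless lookup can never touch freed memory. A condensed sketch of the three sides, drawn from the hunks below (declarations, error handling and the surrounding functions omitted; hlist_for_each_entry_rcu() here takes the separate struct hlist_node cursor used by this kernel generation):

	/* update side (ioctx_alloc/io_destroy): exclusion via mm->ioctx_lock */
	spin_lock(&mm->ioctx_lock);
	hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);	/* or hlist_del_rcu() */
	spin_unlock(&mm->ioctx_lock);

	/* read side (lookup_ioctx): no lock taken, just an RCU read section */
	rcu_read_lock();
	hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
		if (ctx->user_id == ctx_id && !ctx->dead) {
			get_ioctx(ctx);
			break;
		}
	}
	rcu_read_unlock();

	/* free side (__put_ioctx): defer the real free past a grace period */
	call_rcu(&ctx->rcu_head, ctx_rcu_free);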

Showing 5 changed files with 67 additions and 51 deletions

arch/s390/mm/pgtable.c
... ... @@ -263,7 +263,7 @@
263 263 /* lets check if we are allowed to replace the mm */
264 264 task_lock(tsk);
265 265 if (!tsk->mm || atomic_read(&tsk->mm->mm_users) > 1 ||
266   - tsk->mm != tsk->active_mm || tsk->mm->ioctx_list) {
  266 + tsk->mm != tsk->active_mm || !hlist_empty(&tsk->mm->ioctx_list)) {
267 267 task_unlock(tsk);
268 268 return -EINVAL;
269 269 }
... ... @@ -279,7 +279,7 @@
279 279 /* Now lets check again if something happened */
280 280 task_lock(tsk);
281 281 if (!tsk->mm || atomic_read(&tsk->mm->mm_users) > 1 ||
282   - tsk->mm != tsk->active_mm || tsk->mm->ioctx_list) {
  282 + tsk->mm != tsk->active_mm || !hlist_empty(&tsk->mm->ioctx_list)) {
283 283 mmput(mm);
284 284 task_unlock(tsk);
285 285 return -EINVAL;
fs/aio.c
... ... @@ -191,15 +191,27 @@
191 191 kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK), km); \
192 192 } while(0)
193 193  
  194 +static void ctx_rcu_free(struct rcu_head *head)
  195 +{
  196 + struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
  197 + unsigned nr_events = ctx->max_reqs;
194 198  
  199 + kmem_cache_free(kioctx_cachep, ctx);
  200 +
  201 + if (nr_events) {
  202 + spin_lock(&aio_nr_lock);
  203 + BUG_ON(aio_nr - nr_events > aio_nr);
  204 + aio_nr -= nr_events;
  205 + spin_unlock(&aio_nr_lock);
  206 + }
  207 +}
  208 +
195 209 /* __put_ioctx
196 210 * Called when the last user of an aio context has gone away,
197 211 * and the struct needs to be freed.
198 212 */
199 213 static void __put_ioctx(struct kioctx *ctx)
200 214 {
201   - unsigned nr_events = ctx->max_reqs;
202   -
203 215 BUG_ON(ctx->reqs_active);
204 216  
205 217 cancel_delayed_work(&ctx->wq);
... ... @@ -208,14 +220,7 @@
208 220 mmdrop(ctx->mm);
209 221 ctx->mm = NULL;
210 222 pr_debug("__put_ioctx: freeing %p\n", ctx);
211   - kmem_cache_free(kioctx_cachep, ctx);
212   -
213   - if (nr_events) {
214   - spin_lock(&aio_nr_lock);
215   - BUG_ON(aio_nr - nr_events > aio_nr);
216   - aio_nr -= nr_events;
217   - spin_unlock(&aio_nr_lock);
218   - }
  223 + call_rcu(&ctx->rcu_head, ctx_rcu_free);
219 224 }
220 225  
221 226 #define get_ioctx(kioctx) do { \
... ... @@ -235,6 +240,7 @@
235 240 {
236 241 struct mm_struct *mm;
237 242 struct kioctx *ctx;
  243 + int did_sync = 0;
238 244  
239 245 /* Prevent overflows */
240 246 if ((nr_events > (0x10000000U / sizeof(struct io_event))) ||
241 247  
... ... @@ -267,21 +273,30 @@
267 273 goto out_freectx;
268 274  
269 275 /* limit the number of system wide aios */
270   - spin_lock(&aio_nr_lock);
271   - if (aio_nr + ctx->max_reqs > aio_max_nr ||
272   - aio_nr + ctx->max_reqs < aio_nr)
273   - ctx->max_reqs = 0;
274   - else
275   - aio_nr += ctx->max_reqs;
276   - spin_unlock(&aio_nr_lock);
  276 + do {
  277 + spin_lock_bh(&aio_nr_lock);
  278 + if (aio_nr + nr_events > aio_max_nr ||
  279 + aio_nr + nr_events < aio_nr)
  280 + ctx->max_reqs = 0;
  281 + else
  282 + aio_nr += ctx->max_reqs;
  283 + spin_unlock_bh(&aio_nr_lock);
  284 + if (ctx->max_reqs || did_sync)
  285 + break;
  286 +
  287 + /* wait for rcu callbacks to have completed before giving up */
  288 + synchronize_rcu();
  289 + did_sync = 1;
  290 + ctx->max_reqs = nr_events;
  291 + } while (1);
  292 +
277 293 if (ctx->max_reqs == 0)
278 294 goto out_cleanup;
279 295  
280 296 /* now link into global list. */
281   - write_lock(&mm->ioctx_list_lock);
282   - ctx->next = mm->ioctx_list;
283   - mm->ioctx_list = ctx;
284   - write_unlock(&mm->ioctx_list_lock);
  297 + spin_lock(&mm->ioctx_lock);
  298 + hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
  299 + spin_unlock(&mm->ioctx_lock);
285 300  
286 301 dprintk("aio: allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
287 302 ctx, ctx->user_id, current->mm, ctx->ring_info.nr);
... ... @@ -375,11 +390,12 @@
375 390 */
376 391 void exit_aio(struct mm_struct *mm)
377 392 {
378   - struct kioctx *ctx = mm->ioctx_list;
379   - mm->ioctx_list = NULL;
380   - while (ctx) {
381   - struct kioctx *next = ctx->next;
382   - ctx->next = NULL;
  393 + struct kioctx *ctx;
  394 +
  395 + while (!hlist_empty(&mm->ioctx_list)) {
  396 + ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
  397 + hlist_del_rcu(&ctx->list);
  398 +
383 399 aio_cancel_all(ctx);
384 400  
385 401 wait_for_all_aios(ctx);
... ... @@ -394,7 +410,6 @@
394 410 atomic_read(&ctx->users), ctx->dead,
395 411 ctx->reqs_active);
396 412 put_ioctx(ctx);
397   - ctx = next;
398 413 }
399 414 }
400 415  
401 416  
402 417  
403 418  
... ... @@ -555,19 +570,21 @@
555 570  
556 571 static struct kioctx *lookup_ioctx(unsigned long ctx_id)
557 572 {
558   - struct kioctx *ioctx;
559   - struct mm_struct *mm;
  573 + struct mm_struct *mm = current->mm;
  574 + struct kioctx *ctx = NULL;
  575 + struct hlist_node *n;
560 576  
561   - mm = current->mm;
562   - read_lock(&mm->ioctx_list_lock);
563   - for (ioctx = mm->ioctx_list; ioctx; ioctx = ioctx->next)
564   - if (likely(ioctx->user_id == ctx_id && !ioctx->dead)) {
565   - get_ioctx(ioctx);
  577 + rcu_read_lock();
  578 +
  579 + hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
  580 + if (ctx->user_id == ctx_id && !ctx->dead) {
  581 + get_ioctx(ctx);
566 582 break;
567 583 }
568   - read_unlock(&mm->ioctx_list_lock);
  584 + }
569 585  
570   - return ioctx;
  586 + rcu_read_unlock();
  587 + return ctx;
571 588 }
572 589  
573 590 /*
574 591  
575 592  
... ... @@ -1215,19 +1232,14 @@
1215 1232 static void io_destroy(struct kioctx *ioctx)
1216 1233 {
1217 1234 struct mm_struct *mm = current->mm;
1218   - struct kioctx **tmp;
1219 1235 int was_dead;
1220 1236  
1221 1237 /* delete the entry from the list if someone else hasn't already */
1222   - write_lock(&mm->ioctx_list_lock);
  1238 + spin_lock(&mm->ioctx_lock);
1223 1239 was_dead = ioctx->dead;
1224 1240 ioctx->dead = 1;
1225   - for (tmp = &mm->ioctx_list; *tmp && *tmp != ioctx;
1226   - tmp = &(*tmp)->next)
1227   - ;
1228   - if (*tmp)
1229   - *tmp = ioctx->next;
1230   - write_unlock(&mm->ioctx_list_lock);
  1241 + hlist_del_rcu(&ioctx->list);
  1242 + spin_unlock(&mm->ioctx_lock);
1231 1243  
1232 1244 dprintk("aio_release(%p)\n", ioctx);
1233 1245 if (likely(!was_dead))
include/linux/aio.h
... ... @@ -5,6 +5,7 @@
5 5 #include <linux/workqueue.h>
6 6 #include <linux/aio_abi.h>
7 7 #include <linux/uio.h>
  8 +#include <linux/rcupdate.h>
8 9  
9 10 #include <asm/atomic.h>
10 11  
... ... @@ -183,7 +184,7 @@
183 184  
184 185 /* This needs improving */
185 186 unsigned long user_id;
186   - struct kioctx *next;
  187 + struct hlist_node list;
187 188  
188 189 wait_queue_head_t wait;
189 190  
... ... @@ -199,6 +200,8 @@
199 200 struct aio_ring_info ring_info;
200 201  
201 202 struct delayed_work wq;
  203 +
  204 + struct rcu_head rcu_head;
202 205 };
203 206  
204 207 /* prototypes */
include/linux/mm_types.h
... ... @@ -232,8 +232,9 @@
232 232 struct core_state *core_state; /* coredumping support */
233 233  
234 234 /* aio bits */
235   - rwlock_t ioctx_list_lock; /* aio lock */
236   - struct kioctx *ioctx_list;
  235 + spinlock_t ioctx_lock;
  236 + struct hlist_head ioctx_list;
  237 +
237 238 #ifdef CONFIG_MM_OWNER
238 239 /*
239 240 * "owner" points to a task that is regarded as the canonical
kernel/fork.c
... ... @@ -415,8 +415,8 @@
415 415 set_mm_counter(mm, file_rss, 0);
416 416 set_mm_counter(mm, anon_rss, 0);
417 417 spin_lock_init(&mm->page_table_lock);
418   - rwlock_init(&mm->ioctx_list_lock);
419   - mm->ioctx_list = NULL;
  418 + spin_lock_init(&mm->ioctx_lock);
  419 + INIT_HLIST_HEAD(&mm->ioctx_list);
420 420 mm->free_area_cache = TASK_UNMAPPED_BASE;
421 421 mm->cached_hole_size = ~0UL;
422 422 mm_init_owner(mm, p);