Commit 1fd05ba5a2f2aa8e7b9b52ef55df850e2e7d54c9

Authored by Miklos Szeredi
Committed by David S. Miller
1 parent 99d24edeb6

[AF_UNIX]: Rewrite garbage collector, fixes race.

Throw out the old mark-and-sweep garbage collector and put in a
refcounting, cycle-detecting one.

The old one had a race with recvmsg that resulted in false positives
and hence data loss.  The old algorithm also operated on all unix
sockets in the system, so any additional locking would have meant
performance problems for all users of these.

The new algorithm instead only operates on "in flight" sockets, which
are very rare, and the additional locking for these doesn't negatively
impact the vast majority of users.

In fact it's probable that there weren't *any* heavy senders of
sockets over sockets; otherwise the above race would have been
discovered long ago.

The patch works OK with the app that exposed the race with the old
code.  The garbage collection has also been verified to work in a few
simple cases.

Signed-off-by: Miklos Szeredi <mszeredi@suse.cz>
Signed-off-by: David S. Miller <davem@davemloft.net>
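
The kind of garbage the new collector has to deal with can be produced
from user space with nothing more than socketpair() and SCM_RIGHTS.  The
sketch below is illustrative only and is not part of the patch: it queues
one end of a socket pair in flight over the other and then closes both
descriptors, leaving a reference cycle that only the garbage collector
can reclaim.

	/* Illustration only: create an AF_UNIX socket whose last
	 * remaining reference is "in flight" in its own receive queue. */
	#include <stdio.h>
	#include <string.h>
	#include <sys/socket.h>
	#include <sys/uio.h>
	#include <unistd.h>

	int main(void)
	{
		int sv[2];
		char dummy = 'x';
		char ctl[CMSG_SPACE(sizeof(int))];
		struct iovec iov = { .iov_base = &dummy, .iov_len = 1 };
		struct msghdr msg = { 0 };
		struct cmsghdr *cmsg;

		if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sv) < 0) {
			perror("socketpair");
			return 1;
		}

		msg.msg_iov = &iov;
		msg.msg_iovlen = 1;
		msg.msg_control = ctl;
		msg.msg_controllen = sizeof(ctl);

		cmsg = CMSG_FIRSTHDR(&msg);
		cmsg->cmsg_level = SOL_SOCKET;
		cmsg->cmsg_type = SCM_RIGHTS;
		cmsg->cmsg_len = CMSG_LEN(sizeof(int));
		memcpy(CMSG_DATA(cmsg), &sv[0], sizeof(int));

		/* The message carrying sv[0] is delivered to sv[0] itself
		 * (the peer of sv[1]), so sv[0] now holds a reference to
		 * its own file. */
		if (sendmsg(sv[1], &msg, 0) < 0) {
			perror("sendmsg");
			return 1;
		}

		/* Drop both user-space references; the queued copy keeps
		 * sv[0] alive as unreachable garbage for unix_gc(). */
		close(sv[0]);
		close(sv[1]);
		return 0;
	}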

Showing 3 changed files with 190 additions and 144 deletions

include/net/af_unix.h
... ... @@ -79,9 +79,10 @@
79 79 struct mutex readlock;
80 80 struct sock *peer;
81 81 struct sock *other;
82   - struct sock *gc_tree;
  82 + struct list_head link;
83 83 atomic_t inflight;
84 84 spinlock_t lock;
  85 + unsigned int gc_candidate : 1;
85 86 wait_queue_head_t peer_wait;
86 87 };
87 88 #define unix_sk(__sk) ((struct unix_sock *)__sk)
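
For readers less familiar with the kernel's intrusive lists: the new
"link" member above is threaded onto the global gc_inflight_list or
gc_candidates list introduced in net/unix/garbage.c further down, and the
owning unix_sock is recovered with list_entry(), i.e. container_of().  A
stand-alone sketch of that pattern follows; the names in it are invented
for illustration and only the idiom matches the patch.

	/* Illustration only: the embedded list_head / container_of idiom. */
	#include <stddef.h>
	#include <stdio.h>

	struct list_head {
		struct list_head *next, *prev;
	};

	#define container_of(ptr, type, member) \
		((type *)((char *)(ptr) - offsetof(type, member)))

	struct fake_unix_sock {
		int inflight;
		unsigned int gc_candidate : 1;
		struct list_head link;	/* mirrors the new unix_sock field */
	};

	static void list_add_tail_(struct list_head *new, struct list_head *head)
	{
		new->prev = head->prev;
		new->next = head;
		head->prev->next = new;
		head->prev = new;
	}

	int main(void)
	{
		struct list_head gc_inflight_list = {
			&gc_inflight_list, &gc_inflight_list };
		struct fake_unix_sock a = { .inflight = 1 };
		struct fake_unix_sock b = { .inflight = 2, .gc_candidate = 1 };
		struct list_head *p;

		list_add_tail_(&a.link, &gc_inflight_list);
		list_add_tail_(&b.link, &gc_inflight_list);

		for (p = gc_inflight_list.next; p != &gc_inflight_list; p = p->next) {
			/* Recover the containing object from its list node. */
			struct fake_unix_sock *u =
				container_of(p, struct fake_unix_sock, link);
			printf("inflight=%d candidate=%d\n",
			       u->inflight, (int)u->gc_candidate);
		}
		return 0;
	}
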
net/unix/af_unix.c
... ... @@ -592,7 +592,8 @@
592 592 u->dentry = NULL;
593 593 u->mnt = NULL;
594 594 spin_lock_init(&u->lock);
595   - atomic_set(&u->inflight, sock ? 0 : -1);
  595 + atomic_set(&u->inflight, 0);
  596 + INIT_LIST_HEAD(&u->link);
596 597 mutex_init(&u->readlock); /* single task reading lock */
597 598 init_waitqueue_head(&u->peer_wait);
598 599 unix_insert_socket(unix_sockets_unbound, sk);
... ... @@ -1134,9 +1135,6 @@
1134 1135 /* take ten and and send info to listening sock */
1135 1136 spin_lock(&other->sk_receive_queue.lock);
1136 1137 __skb_queue_tail(&other->sk_receive_queue, skb);
1137   - /* Undo artificially decreased inflight after embrion
1138   - * is installed to listening socket. */
1139   - atomic_inc(&newu->inflight);
1140 1138 spin_unlock(&other->sk_receive_queue.lock);
1141 1139 unix_state_unlock(other);
1142 1140 other->sk_data_ready(other, 0);

net/unix/garbage.c
... ... @@ -62,6 +62,10 @@
62 62 * AV 1 Mar 1999
63 63 * Damn. Added missing check for ->dead in listen queues scanning.
64 64 *
  65 + * Miklos Szeredi 25 Jun 2007
  66 + * Reimplement with a cycle collecting algorithm. This should
  67 + * solve several problems with the previous code, like being racy
  68 + * wrt receive and holding up unrelated socket operations.
65 69 */
66 70  
67 71 #include <linux/kernel.h>
68 72  
... ... @@ -84,11 +88,10 @@
84 88  
85 89 /* Internal data structures and random procedures: */
86 90  
87   -#define GC_HEAD ((struct sock *)(-1))
88   -#define GC_ORPHAN ((struct sock *)(-3))
  91 +static LIST_HEAD(gc_inflight_list);
  92 +static LIST_HEAD(gc_candidates);
  93 +static DEFINE_SPINLOCK(unix_gc_lock);
89 94  
90   -static struct sock *gc_current = GC_HEAD; /* stack of objects to mark */
91   -
92 95 atomic_t unix_tot_inflight = ATOMIC_INIT(0);
93 96  
94 97  
95 98  
... ... @@ -122,8 +125,16 @@
122 125 {
123 126 struct sock *s = unix_get_socket(fp);
124 127 if(s) {
125   - atomic_inc(&unix_sk(s)->inflight);
  128 + struct unix_sock *u = unix_sk(s);
  129 + spin_lock(&unix_gc_lock);
  130 + if (atomic_inc_return(&u->inflight) == 1) {
  131 + BUG_ON(!list_empty(&u->link));
  132 + list_add_tail(&u->link, &gc_inflight_list);
  133 + } else {
  134 + BUG_ON(list_empty(&u->link));
  135 + }
126 136 atomic_inc(&unix_tot_inflight);
  137 + spin_unlock(&unix_gc_lock);
127 138 }
128 139 }
129 140  
... ... @@ -131,183 +142,219 @@
131 142 {
132 143 struct sock *s = unix_get_socket(fp);
133 144 if(s) {
134   - atomic_dec(&unix_sk(s)->inflight);
  145 + struct unix_sock *u = unix_sk(s);
  146 + spin_lock(&unix_gc_lock);
  147 + BUG_ON(list_empty(&u->link));
  148 + if (atomic_dec_and_test(&u->inflight))
  149 + list_del_init(&u->link);
135 150 atomic_dec(&unix_tot_inflight);
  151 + spin_unlock(&unix_gc_lock);
136 152 }
137 153 }
138 154  
  155 +static inline struct sk_buff *sock_queue_head(struct sock *sk)
  156 +{
  157 + return (struct sk_buff *) &sk->sk_receive_queue;
  158 +}
139 159  
140   -/*
141   - * Garbage Collector Support Functions
142   - */
  160 +#define receive_queue_for_each_skb(sk, next, skb) \
  161 + for (skb = sock_queue_head(sk)->next, next = skb->next; \
  162 + skb != sock_queue_head(sk); skb = next, next = skb->next)
143 163  
144   -static inline struct sock *pop_stack(void)
  164 +static void scan_inflight(struct sock *x, void (*func)(struct sock *),
  165 + struct sk_buff_head *hitlist)
145 166 {
146   - struct sock *p = gc_current;
147   - gc_current = unix_sk(p)->gc_tree;
148   - return p;
  167 + struct sk_buff *skb;
  168 + struct sk_buff *next;
  169 +
  170 + spin_lock(&x->sk_receive_queue.lock);
  171 + receive_queue_for_each_skb(x, next, skb) {
  172 + /*
  173 + * Do we have file descriptors ?
  174 + */
  175 + if (UNIXCB(skb).fp) {
  176 + bool hit = false;
  177 + /*
  178 + * Process the descriptors of this socket
  179 + */
  180 + int nfd = UNIXCB(skb).fp->count;
  181 + struct file **fp = UNIXCB(skb).fp->fp;
  182 + while (nfd--) {
  183 + /*
  184 + * Get the socket the fd matches
  185 + * if it indeed does so
  186 + */
  187 + struct sock *sk = unix_get_socket(*fp++);
  188 + if(sk) {
  189 + hit = true;
  190 + func(sk);
  191 + }
  192 + }
  193 + if (hit && hitlist != NULL) {
  194 + __skb_unlink(skb, &x->sk_receive_queue);
  195 + __skb_queue_tail(hitlist, skb);
  196 + }
  197 + }
  198 + }
  199 + spin_unlock(&x->sk_receive_queue.lock);
149 200 }
150 201  
151   -static inline int empty_stack(void)
  202 +static void scan_children(struct sock *x, void (*func)(struct sock *),
  203 + struct sk_buff_head *hitlist)
152 204 {
153   - return gc_current == GC_HEAD;
  205 + if (x->sk_state != TCP_LISTEN)
  206 + scan_inflight(x, func, hitlist);
  207 + else {
  208 + struct sk_buff *skb;
  209 + struct sk_buff *next;
  210 + struct unix_sock *u;
  211 + LIST_HEAD(embryos);
  212 +
  213 + /*
  214 + * For a listening socket collect the queued embryos
  215 + * and perform a scan on them as well.
  216 + */
  217 + spin_lock(&x->sk_receive_queue.lock);
  218 + receive_queue_for_each_skb(x, next, skb) {
  219 + u = unix_sk(skb->sk);
  220 +
  221 + /*
  222 + * An embryo cannot be in-flight, so it's safe
  223 + * to use the list link.
  224 + */
  225 + BUG_ON(!list_empty(&u->link));
  226 + list_add_tail(&u->link, &embryos);
  227 + }
  228 + spin_unlock(&x->sk_receive_queue.lock);
  229 +
  230 + while (!list_empty(&embryos)) {
  231 + u = list_entry(embryos.next, struct unix_sock, link);
  232 + scan_inflight(&u->sk, func, hitlist);
  233 + list_del_init(&u->link);
  234 + }
  235 + }
154 236 }
155 237  
156   -static void maybe_unmark_and_push(struct sock *x)
  238 +static void dec_inflight(struct sock *sk)
157 239 {
158   - struct unix_sock *u = unix_sk(x);
  240 + atomic_dec(&unix_sk(sk)->inflight);
  241 +}
159 242  
160   - if (u->gc_tree != GC_ORPHAN)
161   - return;
162   - sock_hold(x);
163   - u->gc_tree = gc_current;
164   - gc_current = x;
  243 +static void inc_inflight(struct sock *sk)
  244 +{
  245 + atomic_inc(&unix_sk(sk)->inflight);
165 246 }
166 247  
  248 +static void inc_inflight_move_tail(struct sock *sk)
  249 +{
  250 + struct unix_sock *u = unix_sk(sk);
167 251  
  252 + atomic_inc(&u->inflight);
  253 + /*
  254 + * If this is still a candidate, move it to the end of the
  255 + * list, so that it's checked even if it was already passed
  256 + * over
  257 + */
  258 + if (u->gc_candidate)
  259 + list_move_tail(&u->link, &gc_candidates);
  260 +}
  261 +
168 262 /* The external entry point: unix_gc() */
169 263  
170 264 void unix_gc(void)
171 265 {
172   - static DEFINE_MUTEX(unix_gc_sem);
173   - int i;
174   - struct sock *s;
  266 + static bool gc_in_progress = false;
  267 +
  268 + struct unix_sock *u;
  269 + struct unix_sock *next;
175 270 struct sk_buff_head hitlist;
176   - struct sk_buff *skb;
  271 + struct list_head cursor;
177 272  
178   - /*
179   - * Avoid a recursive GC.
180   - */
  273 + spin_lock(&unix_gc_lock);
181 274  
182   - if (!mutex_trylock(&unix_gc_sem))
183   - return;
  275 + /* Avoid a recursive GC. */
  276 + if (gc_in_progress)
  277 + goto out;
184 278  
185   - spin_lock(&unix_table_lock);
186   -
187   - forall_unix_sockets(i, s)
188   - {
189   - unix_sk(s)->gc_tree = GC_ORPHAN;
190   - }
  279 + gc_in_progress = true;
191 280 /*
192   - * Everything is now marked
  281 + * First, select candidates for garbage collection. Only
  282 + * in-flight sockets are considered, and from those only ones
  283 + * which don't have any external reference.
  284 + *
  285 + * Holding unix_gc_lock will protect these candidates from
  286 + * being detached, and hence from gaining an external
  287 + * reference. This also means, that since there are no
  288 + * possible receivers, the receive queues of these sockets are
  289 + * static during the GC, even though the dequeue is done
  290 + * before the detach without atomicity guarantees.
193 291 */
  292 + list_for_each_entry_safe(u, next, &gc_inflight_list, link) {
  293 + int total_refs;
  294 + int inflight_refs;
194 295  
195   - /* Invariant to be maintained:
196   - - everything unmarked is either:
197   - -- (a) on the stack, or
198   - -- (b) has all of its children unmarked
199   - - everything on the stack is always unmarked
200   - - nothing is ever pushed onto the stack twice, because:
201   - -- nothing previously unmarked is ever pushed on the stack
202   - */
  296 + total_refs = file_count(u->sk.sk_socket->file);
  297 + inflight_refs = atomic_read(&u->inflight);
203 298  
  299 + BUG_ON(inflight_refs < 1);
  300 + BUG_ON(total_refs < inflight_refs);
  301 + if (total_refs == inflight_refs) {
  302 + list_move_tail(&u->link, &gc_candidates);
  303 + u->gc_candidate = 1;
  304 + }
  305 + }
  306 +
204 307 /*
205   - * Push root set
  308 + * Now remove all internal in-flight reference to children of
  309 + * the candidates.
206 310 */
  311 + list_for_each_entry(u, &gc_candidates, link)
  312 + scan_children(&u->sk, dec_inflight, NULL);
207 313  
208   - forall_unix_sockets(i, s)
209   - {
210   - int open_count = 0;
211   -
212   - /*
213   - * If all instances of the descriptor are not
214   - * in flight we are in use.
215   - *
216   - * Special case: when socket s is embrion, it may be
217   - * hashed but still not in queue of listening socket.
218   - * In this case (see unix_create1()) we set artificial
219   - * negative inflight counter to close race window.
220   - * It is trick of course and dirty one.
221   - */
222   - if (s->sk_socket && s->sk_socket->file)
223   - open_count = file_count(s->sk_socket->file);
224   - if (open_count > atomic_read(&unix_sk(s)->inflight))
225   - maybe_unmark_and_push(s);
226   - }
227   -
228 314 /*
229   - * Mark phase
  315 + * Restore the references for children of all candidates,
  316 + * which have remaining references. Do this recursively, so
  317 + * only those remain, which form cyclic references.
  318 + *
  319 + * Use a "cursor" link, to make the list traversal safe, even
  320 + * though elements might be moved about.
230 321 */
  322 + list_add(&cursor, &gc_candidates);
  323 + while (cursor.next != &gc_candidates) {
  324 + u = list_entry(cursor.next, struct unix_sock, link);
231 325  
232   - while (!empty_stack())
233   - {
234   - struct sock *x = pop_stack();
235   - struct sock *sk;
  326 + /* Move cursor to after the current position. */
  327 + list_move(&cursor, &u->link);
236 328  
237   - spin_lock(&x->sk_receive_queue.lock);
238   - skb = skb_peek(&x->sk_receive_queue);
239   -
240   - /*
241   - * Loop through all but first born
242   - */
243   -
244   - while (skb && skb != (struct sk_buff *)&x->sk_receive_queue) {
245   - /*
246   - * Do we have file descriptors ?
247   - */
248   - if(UNIXCB(skb).fp)
249   - {
250   - /*
251   - * Process the descriptors of this socket
252   - */
253   - int nfd=UNIXCB(skb).fp->count;
254   - struct file **fp = UNIXCB(skb).fp->fp;
255   - while(nfd--)
256   - {
257   - /*
258   - * Get the socket the fd matches if
259   - * it indeed does so
260   - */
261   - if((sk=unix_get_socket(*fp++))!=NULL)
262   - {
263   - maybe_unmark_and_push(sk);
264   - }
265   - }
266   - }
267   - /* We have to scan not-yet-accepted ones too */
268   - if (x->sk_state == TCP_LISTEN)
269   - maybe_unmark_and_push(skb->sk);
270   - skb=skb->next;
  329 + if (atomic_read(&u->inflight) > 0) {
  330 + list_move_tail(&u->link, &gc_inflight_list);
  331 + u->gc_candidate = 0;
  332 + scan_children(&u->sk, inc_inflight_move_tail, NULL);
271 333 }
272   - spin_unlock(&x->sk_receive_queue.lock);
273   - sock_put(x);
274 334 }
  335 + list_del(&cursor);
275 336  
  337 + /*
  338 + * Now gc_candidates contains only garbage. Restore original
  339 + * inflight counters for these as well, and remove the skbuffs
  340 + * which are creating the cycle(s).
  341 + */
276 342 skb_queue_head_init(&hitlist);
  343 + list_for_each_entry(u, &gc_candidates, link)
  344 + scan_children(&u->sk, inc_inflight, &hitlist);
277 345  
278   - forall_unix_sockets(i, s)
279   - {
280   - struct unix_sock *u = unix_sk(s);
  346 + spin_unlock(&unix_gc_lock);
281 347  
282   - if (u->gc_tree == GC_ORPHAN) {
283   - struct sk_buff *nextsk;
  348 + /* Here we are. Hitlist is filled. Die. */
  349 + __skb_queue_purge(&hitlist);
284 350  
285   - spin_lock(&s->sk_receive_queue.lock);
286   - skb = skb_peek(&s->sk_receive_queue);
287   - while (skb &&
288   - skb != (struct sk_buff *)&s->sk_receive_queue) {
289   - nextsk = skb->next;
290   - /*
291   - * Do we have file descriptors ?
292   - */
293   - if (UNIXCB(skb).fp) {
294   - __skb_unlink(skb,
295   - &s->sk_receive_queue);
296   - __skb_queue_tail(&hitlist, skb);
297   - }
298   - skb = nextsk;
299   - }
300   - spin_unlock(&s->sk_receive_queue.lock);
301   - }
302   - u->gc_tree = GC_ORPHAN;
303   - }
304   - spin_unlock(&unix_table_lock);
  351 + spin_lock(&unix_gc_lock);
305 352  
306   - /*
307   - * Here we are. Hitlist is filled. Die.
308   - */
  353 + /* All candidates should have been detached by now. */
  354 + BUG_ON(!list_empty(&gc_candidates));
  355 + gc_in_progress = false;
309 356  
310   - __skb_queue_purge(&hitlist);
311   - mutex_unlock(&unix_gc_sem);
  357 + out:
  358 + spin_unlock(&unix_gc_lock);
312 359 }
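
The new unix_gc() above works in phases: pick candidates (in-flight
sockets with no references outside socket queues), subtract the
references those candidates hold on each other, give back the references
of anything that turns out to still be reachable, and purge the rest.
The following stand-alone model shows the same reasoning on a hard-coded
four-node graph; all names and the data layout are invented for
illustration, and the kernel additionally restores the garbage's own
child counters while moving the offending skbuffs to a hitlist before
purging them.

	/* Toy model of the cycle detection in unix_gc(); illustration only. */
	#include <stdbool.h>
	#include <stdio.h>

	#define NNODES 4

	struct node {
		int total;		/* file_count(): all references        */
		int inflight;		/* references held by queued messages  */
		int nchild;
		int child[NNODES];	/* sockets queued in this socket       */
		bool candidate;
	};

	int main(void)
	{
		/* 0 <-> 1 reference each other and nothing else: garbage.
		 * 2 <-> 3 also form a cycle, but 2 is held by a process. */
		struct node n[NNODES] = {
			{ .total = 1, .inflight = 1, .nchild = 1, .child = { 1 } },
			{ .total = 1, .inflight = 1, .nchild = 1, .child = { 0 } },
			{ .total = 2, .inflight = 1, .nchild = 1, .child = { 3 } },
			{ .total = 1, .inflight = 1, .nchild = 1, .child = { 2 } },
		};
		int i, j;
		bool progress;

		/* Phase 1: candidates have no refs outside socket queues. */
		for (i = 0; i < NNODES; i++)
			n[i].candidate = (n[i].total == n[i].inflight);

		/* Phase 2: forget the references held by candidates. */
		for (i = 0; i < NNODES; i++)
			if (n[i].candidate)
				for (j = 0; j < n[i].nchild; j++)
					n[n[i].child[j]].inflight--;

		/* Phase 3: a candidate that is still referenced is reachable
		 * from outside; un-mark it and give back the references it
		 * holds, until a fixed point is reached (the kernel does
		 * this with a cursor over gc_candidates). */
		do {
			progress = false;
			for (i = 0; i < NNODES; i++) {
				if (n[i].candidate && n[i].inflight > 0) {
					n[i].candidate = false;
					for (j = 0; j < n[i].nchild; j++)
						n[n[i].child[j]].inflight++;
					progress = true;
				}
			}
		} while (progress);

		/* What remains are the unreachable cycles; prints 0 and 1. */
		for (i = 0; i < NNODES; i++)
			if (n[i].candidate)
				printf("node %d is garbage\n", i);
		return 0;
	}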