Commit 8789a9e7df6bf9b93739c4c7d4e380725bc9e936

Authored by Steven Rostedt
Committed by Ingo Molnar
1 parent abc9b56d66

ring-buffer: read page interface

Impact: new API to ring buffer

This patch adds a new interface to the ring buffer that allows a
full page to be read from the ring buffer on a given CPU. For every
page read, the caller must also supply a page to allow for a "swap"
of the pages.

 rpage = ring_buffer_alloc_read_page(buffer);
 if (!rpage)
	goto err;
 ret = ring_buffer_read_page(buffer, &rpage, cpu, full);
 if (!ret)
	goto empty;
 process_page(rpage);
 ring_buffer_free_read_page(buffer, rpage);

The caller of these functions must handle any waiting that is
needed for new data. ring_buffer_read_page() will simply return 0
if there is no data, or if "full" is set and the writer is still on
the current page.
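
For example, a reader that only wants pages the writer has finished
with might drain a CPU like this (a sketch, not part of the patch;
process_page() stands in for the caller's own handling, and any
sleeping between drains is the caller's responsibility):

 rpage = ring_buffer_alloc_read_page(buffer);
 if (!rpage)
	return -ENOMEM;
 /*
  * On success the page passed in is swapped for a page of data,
  * so rpage may now point to a different page. On failure (0),
  * rpage is left untouched.
  */
 while (ring_buffer_read_page(buffer, &rpage, cpu, 1))
	process_page(rpage);
 ring_buffer_free_read_page(buffer, rpage);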

Signed-off-by: Steven Rostedt <srostedt@redhat.com>
Signed-off-by: Ingo Molnar <mingo@elte.hu>

Showing 2 changed files with 171 additions and 0 deletions

include/linux/ring_buffer.h
... ... @@ -124,6 +124,11 @@
124 124 void tracing_off(void);
125 125 void tracing_off_permanent(void);
126 126  
  127 +void *ring_buffer_alloc_read_page(struct ring_buffer *buffer);
  128 +void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data);
  129 +int ring_buffer_read_page(struct ring_buffer *buffer,
  130 + void **data_page, int cpu, int full);
  131 +
127 132 enum ring_buffer_flags {
128 133 RB_FL_OVERWRITE = 1 << 0,
129 134 };
kernel/trace/ring_buffer.c
... ... @@ -687,6 +687,12 @@
687 687 return event->type == RINGBUF_TYPE_PADDING;
688 688 }
689 689  
  690 +static inline void *
  691 +__rb_data_page_index(struct buffer_data_page *page, unsigned index)
  692 +{
  693 + return page->data + index;
  694 +}
  695 +
690 696 static inline void *__rb_page_index(struct buffer_page *page, unsigned index)
691 697 {
692 698 return page->page->data + index;
... ... @@ -2230,6 +2236,166 @@
2230 2236 atomic_dec(&cpu_buffer_b->record_disabled);
2231 2237  
2232 2238 return 0;
  2239 +}
  2240 +
  2241 +static void rb_remove_entries(struct ring_buffer_per_cpu *cpu_buffer,
  2242 + struct buffer_data_page *page)
  2243 +{
  2244 + struct ring_buffer_event *event;
  2245 + unsigned long head;
  2246 +
  2247 + __raw_spin_lock(&cpu_buffer->lock);
  2248 + for (head = 0; head < local_read(&page->commit);
  2249 + head += rb_event_length(event)) {
  2250 + event = __rb_data_page_index(page, head);
  2251 + if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
  2252 + goto out; /* do not return with cpu_buffer->lock held */
  2253 + /* Only count data entries */
  2254 + if (event->type != RINGBUF_TYPE_DATA)
  2255 + continue;
  2256 + cpu_buffer->entries--;
  2257 + }
  2258 + out:
  2259 + __raw_spin_unlock(&cpu_buffer->lock);
  2260 +}
  2261 +
  2262 +/**
  2263 + * ring_buffer_alloc_read_page - allocate a page to read from buffer
  2264 + * @buffer: the buffer to allocate for.
  2265 + *
  2266 + * This function is used in conjunction with ring_buffer_read_page.
  2267 + * When reading a full page from the ring buffer, these functions
  2268 + * can be used to speed up the process. The calling function should
  2269 + * allocate a few pages first with this function. Then when it
  2270 + * needs to get pages from the ring buffer, it passes the result
  2271 + * of this function into ring_buffer_read_page, which will swap
  2272 + * the page that was allocated with the read page of the buffer.
  2273 + *
  2274 + * Returns:
  2275 + * The page allocated, or NULL on error.
  2276 + */
  2277 +void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
  2278 +{
  2279 + unsigned long addr;
  2280 + struct buffer_data_page *page;
  2281 +
  2282 + addr = __get_free_page(GFP_KERNEL);
  2283 + if (!addr)
  2284 + return NULL;
  2285 +
  2286 + page = (void *)addr;
  2287 +
  2288 + return page;
  2289 +}
  2290 +
  2291 +/**
  2292 + * ring_buffer_free_read_page - free an allocated read page
  2293 + * @buffer: the buffer the page was allocated for
  2294 + * @data: the page to free
  2295 + *
  2296 + * Free a page allocated from ring_buffer_alloc_read_page.
  2297 + */
  2298 +void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
  2299 +{
  2300 + free_page((unsigned long)data);
  2301 +}
  2302 +
  2303 +/**
  2304 + * ring_buffer_read_page - extract a page from the ring buffer
  2305 + * @buffer: buffer to extract from
  2306 + * @data_page: the page to use, allocated from ring_buffer_alloc_read_page
  2307 + * @cpu: the cpu of the buffer to extract
  2308 + * @full: should the extraction only happen when the page is full.
  2309 + *
  2310 + * This function will pull out a page from the ring buffer and consume it.
  2311 + * @data_page must be the address of the variable that was returned
  2312 + * from ring_buffer_alloc_read_page. This is because the page might be used
  2313 + * to swap with a page in the ring buffer.
  2314 + *
  2315 + * for example:
  2316 + * rpage = ring_buffer_alloc_read_page(buffer);
  2317 + * if (!rpage)
  2318 + * return error;
  2319 + * ret = ring_buffer_read_page(buffer, &rpage, cpu, 0);
  2320 + * if (ret)
  2321 + * process_page(rpage);
  2322 + *
  2323 + * When @full is set, the function will not return 1 unless
  2324 + * the writer has moved off the reader page.
  2325 + *
  2326 + * Note: it is up to the calling functions to handle sleeps and wakeups.
  2327 + * The ring buffer can be used anywhere in the kernel and can not
  2328 + * blindly call wake_up. The layer that uses the ring buffer must be
  2329 + * responsible for that.
  2330 + *
  2331 + * Returns:
  2332 + * 1 if data has been transferred
  2333 + * 0 if no data has been transferred.
  2334 + */
  2335 +int ring_buffer_read_page(struct ring_buffer *buffer,
  2336 + void **data_page, int cpu, int full)
  2337 +{
  2338 + struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
  2339 + struct ring_buffer_event *event;
  2340 + struct buffer_data_page *page;
  2341 + unsigned long flags;
  2342 + int ret = 0;
  2343 +
  2344 + if (!data_page)
  2345 + return 0;
  2346 +
  2347 + page = *data_page;
  2348 + if (!page)
  2349 + return 0;
  2350 +
  2351 + spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
  2352 +
  2353 + /*
  2354 + * rb_buffer_peek will advance to the next reader page if
  2355 + * the current reader page is empty.
  2356 + */
  2357 + event = rb_buffer_peek(buffer, cpu, NULL);
  2358 + if (!event)
  2359 + goto out;
  2360 +
  2361 + /* check for data */
  2362 + if (!local_read(&cpu_buffer->reader_page->page->commit))
  2363 + goto out;
  2364 + /*
  2365 + * If the writer is already off of the reader page, then simply
  2366 + * switch the reader page with the given page. Otherwise
  2367 + * we need to copy the data from the reader page to the given page.
  2368 + */
  2369 + if (cpu_buffer->reader_page == cpu_buffer->commit_page) {
  2370 + unsigned int read = cpu_buffer->reader_page->read;
  2371 +
  2372 + if (full)
  2373 + goto out;
  2374 + /* The writer is still on the reader page, we must copy */
  2375 + memcpy(page->data,
  2376 + cpu_buffer->reader_page->page->data + read,
  2377 + local_read(&cpu_buffer->reader_page->page->commit) - read);
  2378 + local_set(&page->commit,
  2379 + local_read(&cpu_buffer->reader_page->page->commit) - read);
  2380 + /* consume what was read: advance the read index, not the page pointer */
  2381 + cpu_buffer->reader_page->read = local_read(&cpu_buffer->reader_page->page->commit);
  2382 +
  2383 + } else {
  2384 + /* swap the pages */
  2385 + rb_init_page(page);
  2386 + page = cpu_buffer->reader_page->page;
  2387 + cpu_buffer->reader_page->page = *data_page;
  2388 + cpu_buffer->reader_page->read = 0;
  2389 + *data_page = page;
  2390 + }
  2391 + ret = 1;
  2392 +
  2393 + /* update the entry counter */
  2394 + rb_remove_entries(cpu_buffer, page);
  2395 + out:
  2396 + spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
  2397 +
  2398 + return ret;
2233 2399 }
2234 2400  
2235 2401 static ssize_t
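
As the kernel-doc for ring_buffer_alloc_read_page() suggests, a caller
can pre-allocate a handful of read pages up front and recycle them
through ring_buffer_read_page(). A minimal sketch of that pattern
(pool size, names, and error handling are illustrative, not part of
this patch):

 #define NR_READ_PAGES 4

 static void *read_pages[NR_READ_PAGES];

 static int alloc_read_pool(struct ring_buffer *buffer)
 {
	int i;

	for (i = 0; i < NR_READ_PAGES; i++) {
		read_pages[i] = ring_buffer_alloc_read_page(buffer);
		if (!read_pages[i])
			goto fail;
	}
	return 0;
 fail:
	/* unwind the pages allocated so far */
	while (--i >= 0)
		ring_buffer_free_read_page(buffer, read_pages[i]);
	return -ENOMEM;
 }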