RHEL4/kernel/rcupdate.c
<<
>>
Prefs
   1/*
   2 * Read-Copy Update mechanism for mutual exclusion
   3 *
   4 * This program is free software; you can redistribute it and/or modify
   5 * it under the terms of the GNU General Public License as published by
   6 * the Free Software Foundation; either version 2 of the License, or
   7 * (at your option) any later version.
   8 *
   9 * This program is distributed in the hope that it will be useful,
  10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12 * GNU General Public License for more details.
  13 *
  14 * You should have received a copy of the GNU General Public License
  15 * along with this program; if not, write to the Free Software
  16 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  17 *
  18 * Copyright (C) IBM Corporation, 2001
  19 *
  20 * Authors: Dipankar Sarma <dipankar@in.ibm.com>
  21 *          Manfred Spraul <manfred@colorfullife.com>
  22 * 
  23 * Based on the original work by Paul McKenney <paulmck@us.ibm.com>
  24 * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
  25 * Papers:
  26 * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
  27 * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
  28 *
  29 * For detailed explanation of Read-Copy Update mechanism see -
  30 *              http://lse.sourceforge.net/locking/rcupdate.html
  31 *
  32 */
  33#include <linux/types.h>
  34#include <linux/kernel.h>
  35#include <linux/init.h>
  36#include <linux/spinlock.h>
  37#include <linux/smp.h>
  38#include <linux/interrupt.h>
  39#include <linux/sched.h>
  40#include <asm/atomic.h>
  41#include <asm/bitops.h>
  42#include <linux/module.h>
  43#include <linux/completion.h>
  44#include <linux/moduleparam.h>
  45#include <linux/percpu.h>
  46#include <linux/notifier.h>
  47#include <linux/rcupdate.h>
  48#include <linux/cpu.h>
  49
  50/* Definition for rcupdate control block. */
  51struct rcu_ctrlblk rcu_ctrlblk = 
  52        { .cur = -300, .completed = -300 , .lock = SEQCNT_ZERO };
  53struct rcu_ctrlblk rcu_bh_ctrlblk =
  54        { .cur = -300, .completed = -300 , .lock = SEQCNT_ZERO };
  55
  56/* Bookkeeping of the progress of the grace period */
  57struct rcu_state {
  58        spinlock_t      lock; /* Guard this struct and writes to rcu_ctrlblk */
  59        cpumask_t       cpumask; /* CPUs that need to switch in order    */
  60                                      /* for current batch to proceed.        */
  61};
  62
  63struct rcu_state rcu_state ____cacheline_maxaligned_in_smp =
  64          {.lock = SPIN_LOCK_UNLOCKED, .cpumask = CPU_MASK_NONE };
  65struct rcu_state rcu_bh_state ____cacheline_maxaligned_in_smp =
  66          {.lock = SPIN_LOCK_UNLOCKED, .cpumask = CPU_MASK_NONE };
  67
  68DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
  69DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };
  70
  71/* Fake initialization required by compiler */
  72static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
  73static int maxbatch = 10;
  74
  75/**
  76 * call_rcu - Queue an RCU callback for invocation after a grace period.
  77 * @head: structure to be used for queueing the RCU updates.
  78 * @func: actual update function to be invoked after the grace period
  79 *
  80 * The update function will be invoked some time after a full grace
  81 * period elapses, in other words after all currently executing RCU
  82 * read-side critical sections have completed.  RCU read-side critical
  83 * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
  84 * and may be nested.
  85 */
  86void fastcall call_rcu(struct rcu_head *head,
  87                                void (*func)(struct rcu_head *rcu))
  88{
  89        unsigned long flags;
  90        struct rcu_data *rdp;
  91
  92        head->func = func;
  93        head->next = NULL;
  94        local_irq_save(flags);
  95        rdp = &__get_cpu_var(rcu_data);
  96        *rdp->nxttail = head;
  97        rdp->nxttail = &head->next;
  98        local_irq_restore(flags);
  99}
 100
 101/**
 102 * call_rcu_bh - Queue an RCU for invocation after a quicker grace period.
 103 * @head: structure to be used for queueing the RCU updates.
 104 * @func: actual update function to be invoked after the grace period
 105 *
 106 * The update function will be invoked some time after a full grace
 107 * period elapses, in other words after all currently executing RCU
 108 * read-side critical sections have completed. call_rcu_bh() assumes
 109 * that the read-side critical sections end on completion of a softirq
 110 * handler. This means that read-side critical sections in process
 111 * context must not be interrupted by softirqs. This interface is to be
 112 * used when most of the read-side critical sections are in softirq context.
 113 * RCU read-side critical sections are delimited by rcu_read_lock() and
 114 * rcu_read_unlock(), * if in interrupt context or rcu_read_lock_bh()
 115 * and rcu_read_unlock_bh(), if in process context. These may be nested.
 116 */
 117void fastcall call_rcu_bh(struct rcu_head *head,
 118                                void (*func)(struct rcu_head *rcu))
 119{
 120        unsigned long flags;
 121        struct rcu_data *rdp;
 122
 123        head->func = func;
 124        head->next = NULL;
 125        local_irq_save(flags);
 126        rdp = &__get_cpu_var(rcu_bh_data);
 127        *rdp->nxttail = head;
 128        rdp->nxttail = &head->next;
 129        local_irq_restore(flags);
 130}
 131
 132/*
 133 * Invoke the completed RCU callbacks. They are expected to be in
 134 * a per-cpu list.
 135 */
 136static void rcu_do_batch(struct rcu_data *rdp)
 137{
 138        struct rcu_head *next, *list;
 139        int count = 0;
 140
 141        list = rdp->donelist;
 142        while (list) {
 143                next = rdp->donelist = list->next;
 144                list->func(list);
 145                list = next;
 146                if (++count >= maxbatch)
 147                        break;
 148        }
 149        if (!rdp->donelist)
 150                rdp->donetail = &rdp->donelist;
 151        else
 152                tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
 153}
 154
 155/*
 156 * Grace period handling:
 157 * The grace period handling consists out of two steps:
 158 * - A new grace period is started.
 159 *   This is done by rcu_start_batch. The start is not broadcasted to
 160 *   all cpus, they must pick this up by comparing rcp->cur with
 161 *   rdp->quiescbatch. All cpus are recorded  in the
 162 *   rcu_state.cpumask bitmap.
 163 * - All cpus must go through a quiescent state.
 164 *   Since the start of the grace period is not broadcasted, at least two
 165 *   calls to rcu_check_quiescent_state are required:
 166 *   The first call just notices that a new grace period is running. The
 167 *   following calls check if there was a quiescent state since the beginning
 168 *   of the grace period. If so, it updates rcu_state.cpumask. If
 169 *   the bitmap is empty, then the grace period is completed.
 170 *   rcu_check_quiescent_state calls rcu_start_batch(0) to start the next grace
 171 *   period (if necessary).
 172 */
 173/*
 174 * Register a new batch of callbacks, and start it up if there is currently no
 175 * active batch and the batch to be registered has not already occurred.
 176 * Caller must hold rcu_state.lock.
 177 */
 178static void rcu_start_batch(struct rcu_ctrlblk *rcp, struct rcu_state *rsp,
 179                                int next_pending)
 180{
 181        if (next_pending)
 182                rcp->next_pending = 1;
 183
 184        if (rcp->next_pending &&
 185                        rcp->completed == rcp->cur) {
 186                /* Can't change, since spin lock held. */
 187                cpus_andnot(rsp->cpumask, cpu_online_map, nohz_cpu_mask);
 188                write_seqcount_begin(&rcp->lock);
 189                rcp->next_pending = 0;
 190                rcp->cur++;
 191                write_seqcount_end(&rcp->lock);
 192        }
 193}
 194
 195/*
 196 * cpu went through a quiescent state since the beginning of the grace period.
 197 * Clear it from the cpu mask and complete the grace period if it was the last
 198 * cpu. Start another grace period if someone has further entries pending
 199 */
 200static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp, struct rcu_state *rsp)
 201{
 202        cpu_clear(cpu, rsp->cpumask);
 203        if (cpus_empty(rsp->cpumask)) {
 204                /* batch completed ! */
 205                rcp->completed = rcp->cur;
 206                rcu_start_batch(rcp, rsp, 0);
 207        }
 208}
 209
 210/*
 211 * Check if the cpu has gone through a quiescent state (say context
 212 * switch). If so and if it already hasn't done so in this RCU
 213 * quiescent cycle, then indicate that it has done so.
 214 */
 215static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
 216                        struct rcu_state *rsp, struct rcu_data *rdp)
 217{
 218        if (rdp->quiescbatch != rcp->cur) {
 219                /* new grace period: record qsctr value. */
 220                rdp->qs_pending = 1;
 221                rdp->last_qsctr = rdp->qsctr;
 222                rdp->quiescbatch = rcp->cur;
 223                return;
 224        }
 225
 226        /* Grace period already completed for this cpu?
 227         * qs_pending is checked instead of the actual bitmap to avoid
 228         * cacheline trashing.
 229         */
 230        if (!rdp->qs_pending)
 231                return;
 232
 233        /* 
 234         * Races with local timer interrupt - in the worst case
 235         * we may miss one quiescent state of that CPU. That is
 236         * tolerable. So no need to disable interrupts.
 237         */
 238        if (rdp->qsctr == rdp->last_qsctr)
 239                return;
 240        rdp->qs_pending = 0;
 241
 242        spin_lock(&rsp->lock);
 243        /*
 244         * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
 245         * during cpu startup. Ignore the quiescent state.
 246         */
 247        if (likely(rdp->quiescbatch == rcp->cur))
 248                cpu_quiet(rdp->cpu, rcp, rsp);
 249
 250        spin_unlock(&rsp->lock);
 251}
 252
 253
 254#ifdef CONFIG_HOTPLUG_CPU
 255
 256/* warning! helper for rcu_offline_cpu. do not use elsewhere without reviewing
 257 * locking requirements, the list it's pulling from has to belong to a cpu
 258 * which is dead and hence not processing interrupts.
 259 */
 260static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
 261                                struct rcu_head **tail)
 262{
 263        local_irq_disable();
 264        *this_rdp->nxttail = list;
 265        if (list)
 266                this_rdp->nxttail = tail;
 267        local_irq_enable();
 268}
 269
 270static void __rcu_offline_cpu(struct rcu_data *this_rdp,
 271        struct rcu_ctrlblk *rcp, struct rcu_state *rsp, struct rcu_data *rdp)
 272{
 273        /* if the cpu going offline owns the grace period
 274         * we can block indefinitely waiting for it, so flush
 275         * it here
 276         */
 277        spin_lock_bh(&rsp->lock);
 278        if (rcp->cur != rcp->completed)
 279                cpu_quiet(rdp->cpu, rcp, rsp);
 280        spin_unlock_bh(&rsp->lock);
 281        rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
 282        rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);
 283
 284}
 285static void rcu_offline_cpu(int cpu)
 286{
 287        struct rcu_data *this_rdp = &get_cpu_var(rcu_data);
 288        struct rcu_data *this_bh_rdp = &get_cpu_var(rcu_bh_data);
 289
 290        __rcu_offline_cpu(this_rdp, &rcu_ctrlblk, &rcu_state,
 291                                        &per_cpu(rcu_data, cpu));
 292        __rcu_offline_cpu(this_bh_rdp, &rcu_bh_ctrlblk, &rcu_bh_state,
 293                                        &per_cpu(rcu_bh_data, cpu));
 294        put_cpu_var(rcu_data);
 295        put_cpu_var(rcu_bh_data);
 296        tasklet_kill_immediate(&per_cpu(rcu_tasklet, cpu), cpu);
 297}
 298
 299#else
 300
 301static void rcu_offline_cpu(int cpu)
 302{
 303}
 304
 305#endif
 306
 307/*
 308 * This does the RCU processing work from tasklet context. 
 309 */
 310static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
 311                        struct rcu_state *rsp, struct rcu_data *rdp)
 312{
 313        if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
 314                *rdp->donetail = rdp->curlist;
 315                rdp->donetail = rdp->curtail;
 316                rdp->curlist = NULL;
 317                rdp->curtail = &rdp->curlist;
 318        }
 319
 320        local_irq_disable();
 321        if (rdp->nxtlist && !rdp->curlist) {
 322                int next_pending, seq;
 323
 324                rdp->curlist = rdp->nxtlist;
 325                rdp->curtail = rdp->nxttail;
 326                rdp->nxtlist = NULL;
 327                rdp->nxttail = &rdp->nxtlist;
 328                local_irq_enable();
 329
 330                /*
 331                 * start the next batch of callbacks
 332                 */
 333                do {
 334                        seq = read_seqcount_begin(&rcp->lock);
 335                        /* determine batch number */
 336                        rdp->batch = rcp->cur + 1;
 337                        next_pending = rcp->next_pending;
 338                } while (read_seqcount_retry(&rcp->lock, seq));
 339
 340                if (!next_pending) {
 341                        /* and start it/schedule start if it's a new batch */
 342                        spin_lock(&rsp->lock);
 343                        rcu_start_batch(rcp, rsp, 1);
 344                        spin_unlock(&rsp->lock);
 345                }
 346        } else {
 347                local_irq_enable();
 348        }
 349        rcu_check_quiescent_state(rcp, rsp, rdp);
 350        if (rdp->donelist)
 351                rcu_do_batch(rdp);
 352}
 353
 354static void rcu_process_callbacks(unsigned long unused)
 355{
 356        __rcu_process_callbacks(&rcu_ctrlblk, &rcu_state,
 357                                &__get_cpu_var(rcu_data));
 358        __rcu_process_callbacks(&rcu_bh_ctrlblk, &rcu_bh_state,
 359                                &__get_cpu_var(rcu_bh_data));
 360}
 361
 362void rcu_check_callbacks(int cpu, int user)
 363{
 364        if (user || 
 365            (idle_cpu(cpu) && !in_softirq() && 
 366                                hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
 367                rcu_qsctr_inc(cpu);
 368                rcu_bh_qsctr_inc(cpu);
 369        } else if (!in_softirq())
 370                rcu_bh_qsctr_inc(cpu);
 371        tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
 372}
 373
 374static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
 375                                                struct rcu_data *rdp)
 376{
 377        memset(rdp, 0, sizeof(*rdp));
 378        rdp->curtail = &rdp->curlist;
 379        rdp->nxttail = &rdp->nxtlist;
 380        rdp->donetail = &rdp->donelist;
 381        rdp->quiescbatch = rcp->completed;
 382        rdp->qs_pending = 0;
 383        rdp->cpu = cpu;
 384}
 385
 386static void __devinit rcu_online_cpu(int cpu)
 387{
 388        struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
 389        struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
 390
 391        rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
 392        rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
 393        tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
 394}
 395
 396static int __devinit rcu_cpu_notify(struct notifier_block *self, 
 397                                unsigned long action, void *hcpu)
 398{
 399        long cpu = (long)hcpu;
 400        switch (action) {
 401        case CPU_UP_PREPARE:
 402                rcu_online_cpu(cpu);
 403                break;
 404        case CPU_DEAD:
 405                rcu_offline_cpu(cpu);
 406                break;
 407        default:
 408                break;
 409        }
 410        return NOTIFY_OK;
 411}
 412
 413static struct notifier_block __devinitdata rcu_nb = {
 414        .notifier_call  = rcu_cpu_notify,
 415};
 416
 417/*
 418 * Initializes rcu mechanism.  Assumed to be called early.
 419 * That is before local timer(SMP) or jiffie timer (uniproc) is setup.
 420 * Note that rcu_qsctr and friends are implicitly
 421 * initialized due to the choice of ``0'' for RCU_CTR_INVALID.
 422 */
 423void __init rcu_init(void)
 424{
 425        rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
 426                        (void *)(long)smp_processor_id());
 427        /* Register notifier for non-boot CPUs */
 428        register_cpu_notifier(&rcu_nb);
 429}
 430
 431struct rcu_synchronize {
 432        struct rcu_head head;
 433        struct completion completion;
 434};
 435
 436/* Because of FASTCALL declaration of complete, we use this wrapper */
 437static void wakeme_after_rcu(struct rcu_head  *head)
 438{
 439        struct rcu_synchronize *rcu;
 440
 441        rcu = container_of(head, struct rcu_synchronize, head);
 442        complete(&rcu->completion);
 443}
 444
 445/**
 446 * synchronize_kernel - wait until a grace period has elapsed.
 447 *
 448 * Control will return to the caller some time after a full grace
 449 * period has elapsed, in other words after all currently executing RCU
 450 * read-side critical sections have completed.  RCU read-side critical
 451 * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
 452 * and may be nested.
 453 */
 454void synchronize_kernel(void)
 455{
 456        struct rcu_synchronize rcu;
 457
 458        init_completion(&rcu.completion);
 459        /* Will wake me after RCU finished */
 460        call_rcu(&rcu.head, wakeme_after_rcu);
 461
 462        /* Wait for it */
 463        wait_for_completion(&rcu.completion);
 464}
 465
 466module_param(maxbatch, int, 0);
 467EXPORT_SYMBOL(call_rcu);
 468EXPORT_SYMBOL(call_rcu_bh);
 469EXPORT_SYMBOL(synchronize_kernel);
 470