Source Code Analysis of Wait Queues in the Linux Kernel

A wait queue is simply a list of processes, all of which are waiting for a specific event.

In Linux, a wait queue is managed by a wait queue head, a structure of type wait_queue_head_t defined in <linux/wait.h>.

To define and initialize a wait queue head statically:

DECLARE_WAIT_QUEUE_HEAD(name);

Or, using the dynamic method:

wait_queue_head_t  my_queue;
init_waitqueue_head(&my_queue);
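
In a driver, the wait queue head typically either lives in a per-device structure and is initialized dynamically, or is declared globally with the static macro. A minimal sketch (my_dev, my_global_wq and my_dev_alloc are hypothetical names, not part of the kernel API):

#include <linux/wait.h>
#include <linux/slab.h>

/* hypothetical per-device structure embedding a wait queue head */
struct my_dev {
    wait_queue_head_t inq;      /* readers sleep here */
    int data_ready;             /* the condition they wait for */
};

/* a global queue, defined and initialized statically */
static DECLARE_WAIT_QUEUE_HEAD(my_global_wq);

static struct my_dev *my_dev_alloc(void)
{
    struct my_dev *dev = kzalloc(sizeof(*dev), GFP_KERNEL);

    if (dev)
        init_waitqueue_head(&dev->inq);     /* dynamic initialization */
    return dev;
}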

When a process sleeps, it does so expecting that some condition will become true in the future. When a sleeping process is woken up, it must check again that the condition it was waiting for is indeed true.

The simplest way to sleep in the Linux kernel is the wait_event family of macros; the interruptible variant is:

wait_event_interruptible(queue, condition)  /* queue is the wait queue head, passed by value rather than by pointer; condition is a boolean expression */

The other half of the picture is waking up: some other thread of execution (another process, or an interrupt handler, for example) has to perform the wakeup for us:

wake_up_interruptible(wait_queue_head_t *queue);
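
Putting the two halves together, the usual pattern looks roughly like the sketch below; my_wq, flag, my_read and data_arrived are hypothetical names used only for illustration:

#include <linux/wait.h>
#include <linux/fs.h>
#include <linux/sched.h>

static DECLARE_WAIT_QUEUE_HEAD(my_wq);
static int flag;

/* the sleeping side, e.g. a driver's read() method */
static ssize_t my_read(struct file *filp, char __user *buf,
                       size_t count, loff_t *ppos)
{
    /* sleep until flag becomes non-zero; the macro returns non-zero
     * if the sleep was interrupted by a signal */
    if (wait_event_interruptible(my_wq, flag != 0))
        return -ERESTARTSYS;
    flag = 0;
    /* ... copy data to user space ... */
    return count;
}

/* the waking side, e.g. an interrupt handler or another process */
static void data_arrived(void)
{
    flag = 1;
    wake_up_interruptible(&my_wq);
}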

In <linux/wait.h>, wait_queue_head_t is defined as follows:

 
struct __wait_queue_head {
    spinlock_t lock;               /* spinlock protecting the queue */
    struct list_head task_list;    /* list of waiting entries (wait_queue_t) */
};
typedef struct __wait_queue_head wait_queue_head_t;
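
The dynamic initializer init_waitqueue_head() just sets up these two fields. Roughly (the exact definition differs slightly between kernel versions, for example when lockdep is enabled):

void init_waitqueue_head(wait_queue_head_t *q)
{
    spin_lock_init(&q->lock);           /* lock starts out unlocked */
    INIT_LIST_HEAD(&q->task_list);      /* empty circular list */
}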

wait_queue_t, the type of an individual entry on the queue, is defined as follows:

 
typedef struct __wait_queue wait_queue_t;
typedef int (*wait_queue_func_t)(wait_queue_t *wait, unsigned mode, int sync, void *key);
int default_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key);

struct __wait_queue {
    unsigned int flags;             /* WQ_FLAG_EXCLUSIVE marks an exclusive waiter */
#define WQ_FLAG_EXCLUSIVE    0x01
    void *private;                  /* normally the task_struct of the sleeping process */
    wait_queue_func_t func;         /* callback invoked at wakeup time */
    struct list_head task_list;     /* list node linked into the head's task_list */
};
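
These fields are easiest to understand from the classic, fully manual way of sleeping, which the higher-level macros automate. A sketch, reusing the hypothetical my_wq and flag from the earlier example:

static void sleep_manually(void)
{
    /* private = current, func = default_wake_function */
    DECLARE_WAITQUEUE(wait, current);

    add_wait_queue(&my_wq, &wait);      /* link task_list into my_wq */
    for (;;) {
        set_current_state(TASK_INTERRUPTIBLE);
        if (flag != 0)                  /* check the condition after queueing */
            break;
        if (signal_pending(current))
            break;
        schedule();                     /* give up the CPU until woken */
    }
    set_current_state(TASK_RUNNING);
    remove_wait_queue(&my_wq, &wait);
}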
 

The wait queue head wait_queue_head_t is initialized as follows:

 
#define __WAIT_QUEUE_HEAD_INITIALIZER(name) {                \
    .lock        = __SPIN_LOCK_UNLOCKED(name.lock),        \
    .task_list    = { &(name).task_list, &(name).task_list } }

#define DECLARE_WAIT_QUEUE_HEAD(name) \
    wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name)
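
The task_list initializer above is just the open-coded form of an empty list: both pointers point back at the node itself. It is equivalent to LIST_HEAD_INIT from <linux/list.h>:

#define LIST_HEAD_INIT(name) { &(name), &(name) }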

A wait queue entry wait_queue_t is initialized as follows:

 
#define __WAITQUEUE_INITIALIZER(name, tsk) {                \
    .private    = tsk,                        \
    .func        = default_wake_function,            \
    .task_list    = { NULL, NULL } }

#define DECLARE_WAITQUEUE(name, tsk)                    \
    wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)
#define DEFINE_WAIT(name)                        \
    wait_queue_t name = {                        \
        .private    = current,            /* the current process's descriptor */    \
        .func        = autoremove_wake_function,        \
        .task_list    = LIST_HEAD_INIT((name).task_list),    \
    }
int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    int ret = default_wake_function(wait, mode, sync, key);

    if (ret)
        list_del_init(&wait->task_list);
    return ret;
}
int default_wake_function(wait_queue_t *curr, unsigned mode, int sync,
              void *key)
{
    return try_to_wake_up(curr->private, mode, sync);
}
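
DECLARE_WAITQUEUE and DEFINE_WAIT also have dynamic counterparts; in kernels of this vintage they look roughly like the following (field names as in the struct shown earlier):

static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
{
    q->flags = 0;
    q->private = p;                     /* wake this task ... */
    q->func = default_wake_function;    /* ... with the default policy */
}

static inline void init_waitqueue_func_entry(wait_queue_t *q, wait_queue_func_t func)
{
    q->flags = 0;
    q->private = NULL;
    q->func = func;                     /* arbitrary callback run at wakeup time */
}

default_wake_function() is only a thin wrapper around try_to_wake_up(), which does the real work of putting the task back on a run queue: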
/***
 * try_to_wake_up - wake up a thread
 * @p: the to-be-woken-up thread
 * @state: the mask of task states that can be woken
 * @sync: do a synchronous wakeup?
 *
 * Put it on the run-queue if it's not already there. The "current"
 * thread is always on the run-queue (except when the actual
 * re-schedule is in progress), and as such you're allowed to do
 * the simpler "current->state = TASK_RUNNING" to mark yourself
 * runnable without the overhead of this.
 *
 * returns failure only if the task is already active.
 */
static int try_to_wake_up(struct task_struct *p, unsigned int state, int sync)
{
    int cpu, orig_cpu, this_cpu, success = 0;
    unsigned long flags;
    long old_state;
    struct rq *rq;

    if (!sched_feat(SYNC_WAKEUPS))
        sync = 0;

#ifdef CONFIG_SMP
    if (sched_feat(LB_WAKEUP_UPDATE)) {
        struct sched_domain *sd;

        this_cpu = raw_smp_processor_id();
        cpu = task_cpu(p);

        for_each_domain(this_cpu, sd) {
            if (cpumask_test_cpu(cpu, sched_domain_span(sd))) {
                update_shares(sd);
                break;
            }
        }
    }
#endif

    smp_wmb();
    rq = task_rq_lock(p, &flags);
    update_rq_clock(rq);
    old_state = p->state;
    if (!(old_state & state))
        goto out;

    if (p->se.on_rq)
        goto out_running;

    cpu = task_cpu(p);
    orig_cpu = cpu;
    this_cpu = smp_processor_id();

#ifdef CONFIG_SMP
    if (unlikely(task_running(rq, p)))
        goto out_activate;

    cpu = p->sched_class->select_task_rq(p, sync);
    if (cpu != orig_cpu) {
        set_task_cpu(p, cpu);
        task_rq_unlock(rq, &flags);
        /* might preempt at this point */
        rq = task_rq_lock(p, &flags);
        old_state = p->state;
        if (!(old_state & state))
            goto out;
        if (p->se.on_rq)
            goto out_running;

        this_cpu = smp_processor_id();
        cpu = task_cpu(p);
    }

#ifdef CONFIG_SCHEDSTATS
    schedstat_inc(rq, ttwu_count);
    if (cpu == this_cpu)
        schedstat_inc(rq, ttwu_local);
    else {
        struct sched_domain *sd;
        for_each_domain(this_cpu, sd) {
            if (cpumask_test_cpu(cpu, sched_domain_span(sd))) {
                schedstat_inc(sd, ttwu_wake_remote);
                break;
            }
        }
    }
#endif /* CONFIG_SCHEDSTATS */

out_activate:
#endif /* CONFIG_SMP */
    schedstat_inc(p, se.nr_wakeups);
    if (sync)
        schedstat_inc(p, se.nr_wakeups_sync);
    if (orig_cpu != cpu)
        schedstat_inc(p, se.nr_wakeups_migrate);
    if (cpu == this_cpu)
        schedstat_inc(p, se.nr_wakeups_local);
    else
        schedstat_inc(p, se.nr_wakeups_remote);
    activate_task(rq, p, 1);
    success = 1;

out_running:
    trace_sched_wakeup(rq, p, success);
    check_preempt_curr(rq, p, sync);

    p->state = TASK_RUNNING;
#ifdef CONFIG_SMP
    if (p->sched_class->task_wake_up)
        p->sched_class->task_wake_up(rq, p);
#endif
out:
    current->se.last_wakeup = current->se.sum_exec_runtime;

    task_rq_unlock(rq, &flags);

    return success;
}

The wait_event_interruptible macro is defined as follows:

 
#define wait_event_interruptible(wq, condition)                \
({                                    \
    int __ret = 0;                            \
    if (!(condition))                        \
        __wait_event_interruptible(wq, condition, __ret);    \
    __ret;                                \
})
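
Note that the condition is tested once up front, so a caller never sleeps if the condition is already true, and the whole expression evaluates to 0 on success or -ERESTARTSYS if the sleep was interrupted by a signal. For comparison, the non-interruptible wait_event() is built the same way but sleeps in TASK_UNINTERRUPTIBLE and has no return value; a rough sketch of its inner macro in the same kernel generation:

#define __wait_event(wq, condition)                         \
do {                                                        \
    DEFINE_WAIT(__wait);                                    \
                                                            \
    for (;;) {                                              \
        prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE);\
        if (condition)                                      \
            break;                                          \
        schedule();                                         \
    }                                                       \
    finish_wait(&wq, &__wait);                              \
} while (0)

The interruptible variant does its real work in __wait_event_interruptible: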
 
 
#define __wait_event_interruptible(wq, condition, ret)            \
do {                                    \
    DEFINE_WAIT(__wait);                        \
                                    \
    for (;;) {                            \
        prepare_to_wait(&wq, &__wait, TASK_INTERRUPTIBLE);    \
        if (condition)                        \
            break;                        \
        if (!signal_pending(current)) {                \
            schedule();                    \
            continue;                    \
        }                            \
        ret = -ERESTARTSYS;                    \
        break;                            \
    }                                \
    finish_wait(&wq, &__wait);                    \
} while (0)
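
The same prepare_to_wait()/finish_wait() pair can also be used directly by driver code that needs finer control over the sleep (for example to drop a lock before calling schedule()). A minimal sketch, again reusing the hypothetical my_wq and flag:

static void sleep_with_prepare_to_wait(void)
{
    DEFINE_WAIT(wait);                  /* func = autoremove_wake_function */

    for (;;) {
        /* queue the entry and mark the task interruptible */
        prepare_to_wait(&my_wq, &wait, TASK_INTERRUPTIBLE);
        if (flag != 0)
            break;
        if (signal_pending(current))
            break;
        schedule();
    }
    finish_wait(&my_wq, &wait);         /* back to TASK_RUNNING, off the queue */
}

prepare_to_wait() and finish_wait() themselves are defined as follows: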
 
void prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
    unsigned long flags;

    wait->flags &= ~WQ_FLAG_EXCLUSIVE;
    spin_lock_irqsave(&q->lock, flags);
    if (list_empty(&wait->task_list))
        __add_wait_queue(q, wait);
    set_current_state(state);
    spin_unlock_irqrestore(&q->lock, flags);
}
 
static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
{
    list_add(&new->task_list, &head->task_list);
}
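
Note that list_add() inserts the new entry at the head of the list. Exclusive waiters (those with WQ_FLAG_EXCLUSIVE set, queued by prepare_to_wait_exclusive()) are instead added at the tail, via the companion helper below, so that they always sit behind all non-exclusive waiters:

static inline void __add_wait_queue_tail(wait_queue_head_t *head,
                                         wait_queue_t *new)
{
    list_add_tail(&new->task_list, &head->task_list);
}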
 
void finish_wait(wait_queue_head_t *q, wait_queue_t *wait)
{
    unsigned long flags;

    __set_current_state(TASK_RUNNING);
    /*
     * We can check for list emptiness outside the lock
     * IFF:
     * - we use the "careful" check that verifies both
     * the next and prev pointers, so that there cannot
     * be any half-pending updates in progress on other
     * CPU's that we haven't seen yet (and that might
     * still change the stack area.
     * and
     * - all other users take the lock (ie we can only
     * have _one_ other CPU that looks at or modifies
     * the list).
     */
    if (!list_empty_careful(&wait->task_list)) {
        spin_lock_irqsave(&q->lock, flags);
        list_del_init(&wait->task_list);
        spin_unlock_irqrestore(&q->lock, flags);
    }
}
 

The wake_up_interruptible(wait_queue_head_t *queue) macro is defined as follows:

 
#define wake_up_interruptible(x)    __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL)
 
 
void __wake_up(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, void *key)
{
    unsigned long flags;

    spin_lock_irqsave(&q->lock, flags);
    __wake_up_common(q, mode, nr_exclusive, 0, key);
    spin_unlock_irqrestore(&q->lock, flags);
}
 
void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, int sync, void *key)
{
    wait_queue_t *curr, *next;

    list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
        unsigned flags = curr->flags;

        if (curr->func(curr, mode, sync, key) &&
                (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
            break;
    }
}
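
__wake_up_common() walks the list and invokes each entry's func callback (default_wake_function or autoremove_wake_function above); it stops early only once nr_exclusive entries carrying WQ_FLAG_EXCLUSIVE have been successfully woken. With nr_exclusive == 1, as used by wake_up_interruptible(), every non-exclusive waiter is woken but at most one exclusive waiter. The rest of the wake_up family is built on the same __wake_up(); roughly (exact task-state masks vary by kernel version):

#define wake_up(x)                       __wake_up(x, TASK_NORMAL, 1, NULL)
#define wake_up_nr(x, nr)                __wake_up(x, TASK_NORMAL, nr, NULL)
#define wake_up_all(x)                   __wake_up(x, TASK_NORMAL, 0, NULL)
#define wake_up_interruptible_nr(x, nr)  __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL)
#define wake_up_interruptible_all(x)     __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)

Passing 0 for nr_exclusive wakes all exclusive waiters as well.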
