/* kernel/workqueue.c */
/*
* The per-CPU workqueue (if single thread, we always use the first
* possible cpu).
*
* The sequence counters are for flush_scheduled_work(). It wants to wait
* until all currently-scheduled works are completed, but it doesn't
* want to be livelocked by new, incoming ones. So it waits until
* remove_sequence is >= the insert_sequence which pertained when
* flush_scheduled_work() was called.
*/
// Per-CPU part of a workqueue: one of these is used for each CPU.
struct cpu_workqueue_struct {
// Lock protecting this structure.
spinlock_t lock;
// Sequence number of the next work item to be run.
long remove_sequence; /* Least-recently added (next to run) */
// Sequence number for the next insertion; incremented in __queue_work().
long insert_sequence; /* Next to add */
// List of queued work items (work_struct entries are appended at the tail).
struct list_head worklist;
// Wait queue that is woken when new work has been queued.
wait_queue_head_t more_work;
// Wait queue that flush_cpu_workqueue() sleeps on until work has completed.
wait_queue_head_t work_done;
// Presumably the workqueue_struct this per-CPU queue belongs to -- confirm
// against the code that initialises it (not shown here).
struct workqueue_struct *wq;
// Task that services this queue; compared with "current" when flushing.
struct task_struct *thread;
int run_depth; /* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;
/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
// The workqueue object handed to callers of queue_work() and friends.
struct workqueue_struct {
// Per-CPU queue data; accessed with per_cpu_ptr() and released with free_percpu().
struct cpu_workqueue_struct *cpu_wq;
// Name of the workqueue.
const char *name;
// List linkage; removed with list_del() on destroy for multi-threaded queues.
struct list_head list; /* Empty if single thread */
};
4.2 释放工作队列
/**
 * destroy_workqueue - flush a workqueue, stop its threads and free it
 * @wq: workqueue to tear down
 *
 * The queue is flushed first, so work that is pending when this is called
 * gets to run. The per-CPU thread(s) are then cleaned up under
 * workqueue_mutex and the per-CPU data and @wq itself are freed.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
	int cpu;

	/* Wait for already-queued work to finish. */
	flush_workqueue(wq);

	/* Hold the mutex so CPUs do not appear or vanish while we clean up. */
	mutex_lock(&workqueue_mutex);
	if (!is_single_threaded(wq)) {
		/* One thread per online CPU, plus our entry on the list. */
		for_each_online_cpu(cpu)
			cleanup_workqueue_thread(wq, cpu);
		list_del(&wq->list);
	} else {
		/* Single-threaded queues only have a thread on singlethread_cpu. */
		cleanup_workqueue_thread(wq, singlethread_cpu);
	}
	mutex_unlock(&workqueue_mutex);

	/* Release the per-CPU queue data, then the workqueue object. */
	free_percpu(wq->cpu_wq);
	kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);
/**
 * flush_workqueue - wait until work already queued on @wq has completed
 * @wq: workqueue to flush
 *
 * For every per-CPU queue involved, flush_cpu_workqueue() samples the
 * current insert_sequence and sleeps until remove_sequence has caught up
 * with it. Work queued before the call is therefore waited for, while
 * work queued afterwards cannot livelock the caller.
 *
 * The work is not run here (except when called from the queue's own
 * thread); the helper threads do it and we wait for them.
 * This is typically used in driver shutdown handlers.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
	int cpu;

	/* This function may sleep. */
	might_sleep();

	if (is_single_threaded(wq)) {
		/* Single-threaded queues always use singlethread_cpu's area. */
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
		return;
	}

	/* Flush the queue of every online CPU under workqueue_mutex. */
	mutex_lock(&workqueue_mutex);
	for_each_online_cpu(cpu)
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
	mutex_unlock(&workqueue_mutex);
}
EXPORT_SYMBOL_GPL(flush_workqueue);
flush_workqueue的核心处理函数为flush_cpu_workqueue:
/*
 * Wait until every work item that was queued on @cwq before this call has
 * been removed from it. If the caller is the queue's own thread, the queue
 * is run directly instead of waiting.
 */
static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
if (cwq->thread == current) {
// The caller is this queue's own thread; sleeping here would wait on itself.
/*
* Probably keventd trying to flush its own queue. So simply run
* it by hand rather than deadlocking.
*/
// Run the queued work directly.
run_workqueue(cwq);
} else {
// Wait-queue entry for the current task.
DEFINE_WAIT(wait);
long sequence_needed;
// Take the queue lock with interrupts disabled.
spin_lock_irq(&cwq->lock);
// Snapshot the insert sequence; work queued after this point is not waited for.
sequence_needed = cwq->insert_sequence;
// Loop while items queued before the snapshot are still outstanding.
// The subtraction form keeps the comparison valid across counter wrap-around.
while (sequence_needed - cwq->remove_sequence > 0) {
// Unfinished work remains: register on the work_done wait queue
// before dropping the lock.
prepare_to_wait(&cwq->work_done, &wait,
TASK_UNINTERRUPTIBLE);
// Drop the lock while sleeping.
spin_unlock_irq(&cwq->lock);
// Sleep; expected to be woken by wake_up(&cwq->work_done) (issued elsewhere).
schedule();
// Retake the lock before re-checking the sequence numbers.
spin_lock_irq(&cwq->lock);
}
// Done waiting: remove ourselves from the work_done wait queue.
finish_wait(&cwq->work_done, &wait);
spin_unlock_irq(&cwq->lock);
}
}
4.3.1 立即调度
/*
 * Entry point for other code to get a work item run: the work is handed
 * to a workqueue, which schedules it.
 */
/**
 * schedule_work - queue a work item on the kernel-global workqueue
 * @work: job to be done
 *
 * Passes @work to queue_work() on keventd_wq and returns its result.
 */
int fastcall schedule_work(struct work_struct *work)
{
	int newly_queued;

	newly_queued = queue_work(keventd_wq, work);
	return newly_queued;
}
EXPORT_SYMBOL(schedule_work);
/**
 * queue_work - put a work item on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if the pending bit of @work was already set (it is already
 * queued), 1 if it was queued by this call.
 *
 * The work goes onto the queue of the submitting CPU (or of
 * singlethread_cpu for single-threaded workqueues), but there is no
 * guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int queued = 0;
	int target_cpu = get_cpu();

	/* Pending bit already set: the work is queued, nothing to do. */
	if (test_and_set_bit(0, &work->pending))
		goto out;

	/* Single-threaded workqueues always use the same CPU's queue. */
	if (unlikely(is_single_threaded(wq)))
		target_cpu = singlethread_cpu;

	/* A work item that was not pending must not be on any list. */
	BUG_ON(!list_empty(&work->entry));

	/* Do the actual queueing on the chosen per-CPU queue. */
	__queue_work(per_cpu_ptr(wq->cpu_wq, target_cpu), work);
	queued = 1;
out:
	put_cpu();
	return queued;
}
EXPORT_SYMBOL_GPL(queue_work);
/* Preempt must be disabled. */
/*
 * Append @work to @cwq's work list under cwq->lock, bump the insert
 * sequence and wake whoever is waiting on cwq->more_work.
 */
static void __queue_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
{
unsigned long flags;
// Take the queue lock, saving the interrupt state in flags.
spin_lock_irqsave(&cwq->lock, flags);
// Record which per-CPU queue the work is on.
work->wq_data = cwq;
// Append the work to the tail of the work list.
list_add_tail(&work->entry, &cwq->worklist);
// Count the insertion; flush_cpu_workqueue() samples this value.
cwq->insert_sequence++;
// Wake waiters on more_work (presumably the worker thread) to process it.
wake_up(&cwq->more_work);
spin_unlock_irqrestore(&cwq->lock, flags);
}
4.3.2 延迟调度
4.3.2.1 schedule_delayed_work
/**
 * schedule_delayed_work - queue work on the global workqueue after a delay
 * @work: job to be done
 * @delay: number of jiffies to wait
 *
 * Passes @work to queue_delayed_work() on keventd_wq, so the work is only
 * put on the kernel-global workqueue once @delay has elapsed. Returns the
 * result of queue_delayed_work().
 */
int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
{
	int newly_queued;

	newly_queued = queue_delayed_work(keventd_wq, work, delay);
	return newly_queued;
}
EXPORT_SYMBOL(schedule_delayed_work);
/**
 * queue_delayed_work - queue work on a workqueue after a delay
 * @wq: workqueue to use
 * @work: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Arms the timer embedded in @work; when it expires,
 * delayed_work_timer_fn() puts the work on @wq.
 *
 * Returns 0 if the pending bit of @work was already set, 1 if the timer
 * was armed by this call.
 */
int fastcall queue_delayed_work(struct workqueue_struct *wq,
			struct work_struct *work, unsigned long delay)
{
	/* The delay is implemented with the work item's own timer. */
	struct timer_list *delay_timer = &work->timer;

	/* Pending bit already set: the work is queued or its timer is armed. */
	if (test_and_set_bit(0, &work->pending))
		return 0;

	/* A non-pending work item must have neither an active timer ... */
	BUG_ON(timer_pending(delay_timer));
	/* ... nor be linked on a work list. */
	BUG_ON(!list_empty(&work->entry));

	/* wq_data holds the workqueue for now; the timer function reads it. */
	work->wq_data = wq;

	/* Set up the timer and start it; queueing happens on expiry. */
	delay_timer->expires = jiffies + delay;
	delay_timer->data = (unsigned long)work;
	delay_timer->function = delayed_work_timer_fn;
	add_timer(delay_timer);

	return 1;
}
EXPORT_SYMBOL_GPL(queue_delayed_work);
/*
 * Timer callback installed by queue_delayed_work() and
 * queue_delayed_work_on(). @__data is the work_struct pointer stored in
 * timer->data; its wq_data field still holds the target workqueue.
 */
static void delayed_work_timer_fn(unsigned long __data)
{
	struct work_struct *expired = (struct work_struct *)__data;
	struct workqueue_struct *target_wq = expired->wq_data;
	/* Default to the CPU the timer callback is running on. */
	int target_cpu = smp_processor_id();

	/* Single-threaded workqueues always use the same CPU's queue. */
	if (unlikely(is_single_threaded(target_wq)))
		target_cpu = singlethread_cpu;

	/* Put the work on the queue; note this runs from the timer callback. */
	__queue_work(per_cpu_ptr(target_wq->cpu_wq, target_cpu), expired);
}
4.3.2.2 schedule_delayed_work_on
指定CPU的延迟调度工作结构: 和schedule_delayed_work相比增加了一个CPU参数, 并且定时器通过add_timer_on()挂到指定的CPU上(而不是add_timer()), 其余处理相同
/**
 * schedule_delayed_work_on - delayed work on the global workqueue, on a CPU
 * @cpu: cpu to use
 * @work: job to be done
 * @delay: number of jiffies to wait
 *
 * Passes @work to queue_delayed_work_on() for keventd_wq, so the work is
 * put on the kernel-global workqueue on @cpu once @delay has elapsed.
 * Returns the result of queue_delayed_work_on().
 */
int schedule_delayed_work_on(int cpu,
			struct work_struct *work, unsigned long delay)
{
	int newly_queued;

	newly_queued = queue_delayed_work_on(cpu, keventd_wq, work, delay);
	return newly_queued;
}
/**
 * queue_delayed_work_on - queue work on a specific CPU after a delay
 * @cpu: CPU number the timer is added on
 * @wq: workqueue to use
 * @work: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Same as queue_delayed_work(), except that the timer embedded in @work
 * is started with add_timer_on() for @cpu.
 *
 * Returns 0 if the pending bit of @work was already set, 1 if the timer
 * was armed by this call.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			struct work_struct *work, unsigned long delay)
{
	struct timer_list *delay_timer = &work->timer;

	/* Pending bit already set: the work is queued or its timer is armed. */
	if (test_and_set_bit(0, &work->pending))
		return 0;

	/* A non-pending work item has no active timer and is on no list. */
	BUG_ON(timer_pending(delay_timer));
	BUG_ON(!list_empty(&work->entry));

	/* wq_data holds the workqueue for now; the timer function reads it. */
	work->wq_data = wq;

	/* Set up the timer and start it on the requested CPU. */
	delay_timer->expires = jiffies + delay;
	delay_timer->data = (unsigned long)work;
	delay_timer->function = delayed_work_timer_fn;
	add_timer_on(delay_timer, cpu);

	return 1;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);