首页 服务器系统 Linux

Linux线程池

什么是池?

池是一组资源的集合,这组资源在服务器启动之初就被创建并初始化

开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取

服务器处理完一个客户连接后,可以把相关的资源放回池中


为什么要创建线程池?

当有客户端连接时,创建耗时为:线程时间+处理时间+销毁线程时间

(耗时T1+T2+T3)

若需要大量的线程来完成任务,且完成任务的时间比较短,这样花费在T1和T3上面的时间比较多,效率低。

因此,提前创建好一定数量的线程,可以避免花费大量时间在T1和T3上,提高效率

当提前创建的线程不够用怎么办?

创建线程池之前,可以先设置线程池的属性

min_thr_num//最小线程数

max_thr_num//最大线程数

default_thr_num//添加或删除线程的步长

live_thr_num;//当前线程数

busy_thr_num//忙碌线程数

可以设置线程池的最大线程数和最小线程数

设置步长的意思是,当默认创建的线程数量不够时,一次性拓展一定数目线程(步长),当达到最大线程数目的时候就无法拓展。或者当空闲线程数量过多时,一次性销毁一定数量线程,当到最小线程数时,无法删除

工作的机制

首先服务器维护两个条件变量(图中没有画出互斥锁),其中一个维护线程池和服务器任务队列(队列不为空),另外一个维护服务器任务队列和客户端(队列不为满)

先看线程池这一块

当任务队列当中没有任务时,线程池堵塞在条件变量上,等待任务

当有任务进来时,条件变量发信号或者广播,唤醒线程,此时对任务队列而言属于共享资源,需要使用互斥量,避免资源冲突

从生产者消费模型上看,服务器为生产者,线程池为消费者

服务器和客户端

同理当任务队列满的时候,客户端阻塞等待服务器连接

当队列不为满时,条件变量发信号或者广播,通知客户端进行链接,也需要使用互斥量操作任务队列

从生产者消费模型上看,服务器为消费者,客户端为生产


代码实现如下

代码中摘除了网络基础模块,自定义模拟客户端连接

main.c

#include "threadpool.h"

#include <unistd.h>

#include <stdio.h>

#include <stdlib.h>

void* mytask(void *arg)

{

printf("thread 0x%x is working on task %d\n", (int)pthread_self(), *(int*)arg);

sleep(1);

free(arg);

return NULL;

}

int main(void)

{

threadpool_t *pool;

threadpool_init(&pool, 3);

int i;

for (i=0; i<10; i++)

{

int *arg = (int *)malloc(sizeof(int));

*arg = i;

threadpool_add_task(pool, mytask, arg);

}

//sleep(15);

threadpool_destroy(pool);

return 0;

}

threadpool.h

#ifndef _THREAD_POOL_H_

#define _THREAD_POOL_H_

#include "condition.h"

// 任务结构体,将任务放入队列由线程池中的线程来执行

typedef struct task

{

void *(*run)(void *arg);// 任务回调函数

void *arg;// 回调函数参数

struct task *next;// 链表队列

} task_t;

// 线程池结构体

typedef struct threadpool

{

condition_t ready;//任务准备就绪或者线程池销毁通知

task_t *first;//任务队列头指针

task_t *last;//任务队列为指针

int counter;//线程池中当前线程数

int idle;//线程池中当前正在等待任务的线程数

int max_threads;//线程池中最大允许的线程数

int quit;//销毁线程池的时候置1

} threadpool_t;

// 初始化线程池

void threadpool_init(threadpool_t **pool, int threads);

// 往线程池中添加任务

void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);

// 销毁线程池

void threadpool_destroy(threadpool_t *pool);

#endif /* _THREAD_POOL_H_ */

threadpool.c

#include "threadpool.h"

#include <stdlib.h>

#include <stdio.h>

#include <string.h>

#include <errno.h>

#include <time.h>

#include <unistd.h>

void *thread_routine(void *arg)

{

struct timespec abstime;

threadpool_t *pool = (threadpool_t *)arg;

printf("thread 0x%x is starting\n", (int)pthread_self());

//1. 设置为自分离线程

pthread_detach(pthread_self());

//3. 进入轮训工作模式,如果不退出且队列不为空则一直工作

while(1)

{

//1. 先去看下有没有任务,有任务则处理任务,没有再wait

condition_lock(&pool->ready);

printf("thread 0x%x is working\n", (int)pthread_self());

if(pool->first != NULL)

{

task_t *t = pool->first; //取出第一个任务

pool->first = t->next; //修改队列头

condition_unlock(&pool->ready); //先解锁,提高效率

//处理任务

t->run(t->arg);

free(t);

continue; //既然本次有任务,可能下次还有任务,则继续查看是否有任务

}

else

{

condition_unlock(&pool->ready);

}

if(pool->quit)

{

break;

}

//2. 如果没有任务,则等待

printf("thread 0x%x is waiting\n", (int)pthread_self());

while(1)

{

clock_gettime(CLOCK_REALTIME, &abstime);

abstime.tv_sec += 2; //延时2s

//condition_wait(&pool->ready);

condition_lock(&pool->ready);

pool->idle++;

int status = condition_timedwait(&pool->ready, &abstime);

condition_unlock(&pool->ready);

if (status != ETIMEDOUT || pool->quit)

{

printf("thread 0x%x get signal\n", (int)pthread_self());

break;

}

else

{

printf("thread 0x%x is wait timed out\n", (int)pthread_self());

if(pool->counter >= 3)

{

goto THREAD_EXIT;

}

}

}

}

THREAD_EXIT:

printf("thread 0x%x is exit\n", (int)pthread_self());

condition_lock(&pool->ready);

pool->counter--;

condition_unlock(&pool->ready);

pthread_exit(NULL); //退出线程

}

void threadpool_init(threadpool_t **pool, int threads)

{

//1. 初始化基本的线程池参数

int i;

threadpool_t *newpool = malloc(sizeof(threadpool_t));

*pool = newpool;

newpool->max_threads = threads;

newpool->quit = 0;

newpool->idle = 0; //??

newpool->first = NULL;

newpool->last = NULL;

newpool->counter = 0;

condition_init(&newpool->ready);

//2. 默认有线程数,则在初始化的时候同时初始化N个线程

#if 1

for(i= 0; i < threads; i++)

{

pthread_t tid;

if(pthread_create(&tid, NULL, thread_routine, newpool) == 0)//where is task?

{

condition_lock(&newpool->ready);

newpool->counter++;

condition_unlock(&newpool->ready);

}

}

#endif

}

void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)

{

if(pool->quit)

return;

//1. 生成任务包

task_t *task = malloc(sizeof(task_t));

task->run = run;

task->arg = arg;

//2. 加入task队列, 先上锁,再添加,再解锁

printf("Add new task %p ! \n", task);

condition_lock(&pool->ready);

if(pool->last == NULL)

{

pool->last = task; //队列头

pool->first = pool->last; //初始化头

}

else

{

pool->last->next = task; // add

pool->last = task;

}

//3. 计算一下线程数是否满足任务处理速度,不满足则创建一批

if(pool->counter < pool->max_threads && pool->idle <= 0)

{

//??线程创建策略,根据实际环境选择

//策略1: 固定增长,每次增长??

//策略2: 指数增长,每次翻倍?? 也就是创建 pool->counter

//策略3: 线下增长,每次+1

// 策略4: 根据任务数量增长

pthread_t tid;

if(pthread_create(&tid, NULL, thread_routine, pool) == 0) //where is task?

{

pool->counter++;

}

}

//4. 通知线程去取任务处理

if(pool->idle > 0)

{

condition_signal(&pool->ready); //唤醒一个线程去处理任务

}

//5. 解锁

condition_unlock(&pool->ready);

}

void threadpool_destroy(threadpool_t *pool)

{

//1. 设置退出条件

pool->quit = 1;

//2. 等待所有线程退出

while(pool->counter > 0)

{

//3. 广播,通知所有线程退出

condition_lock(&pool->ready);

condition_broadcast(&pool->ready); //唤醒所有线程退出

condition_unlock(&pool->ready);

sleep(1);

}

//4. 销毁线程池对象

free(pool);

}

condition.h

#ifndef _CONDITION_H_

#define _CONDITION_H_

#include <pthread.h>

typedef struct condition

{

pthread_mutex_t pmutex;

pthread_cond_t pcond;

} condition_t;

int condition_init(condition_t *cond);

int condition_lock(condition_t *cond);

int condition_unlock(condition_t *cond);

int condition_wait(condition_t *cond);

int condition_timedwait(condition_t *cond, const struct timespec *abstime);

int condition_signal(condition_t *cond);

int condition_broadcast(condition_t *cond);

int condition_destroy(condition_t *cond);

#endif /* _CONDITION_H_ */

condition.c

#include "condition.h"

int condition_init(condition_t *cond)

{

int status;

if ((status = pthread_mutex_init(&cond->pmutex, NULL)))

return status;

if ((status = pthread_cond_init(&cond->pcond, NULL)))

return status;

return 0;

}

int condition_lock(condition_t *cond)

{

return pthread_mutex_lock(&cond->pmutex);

}

int condition_unlock(condition_t *cond)

{

return pthread_mutex_unlock(&cond->pmutex);

}

int condition_wait(condition_t *cond)

{

return pthread_cond_wait(&cond->pcond, &cond->pmutex);

}

int condition_timedwait(condition_t *cond, const struct timespec *abstime)

{

return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);

}

int condition_signal(condition_t *cond)

{

return pthread_cond_signal(&cond->pcond);

}

int condition_broadcast(condition_t* cond)

{

return pthread_cond_broadcast(&cond->pcond);

}

int condition_destroy(condition_t* cond)

{

int status;

if ((status = pthread_mutex_destroy(&cond->pmutex)))

return status;

if ((status = pthread_cond_destroy(&cond->pcond)))

return status;

return 0;

}

Makefile

.PHONY:clean

CC=gcc

CFLAGS=-Wall -g

ALL=main

all:$(ALL)

OBJS=threadpool.o main.o condition.o

.c.o:

$(CC) $(CFLAGS) -c $<

main:$(OBJS)

$(CC) $(CFLAGS) $^ -o $@ -lpthread -lrt

clean:

rm -f $(ALL) *.o

相关推荐