回顾一下之前做的轻量级高并发网络服务器项目
项目部署在VMware的乌班图系统上,运行效果如下:
终端启动:
浏览器访问:
系统流程
主线程用epoll这种多路复用机制监听客户端连接,当有read或者write事件发出请求时,将请求加入到线程池,线程池中的一个线程争抢到锁,开始执行任务,任务包括解析请求与生成响应,包括对信号的序列化与反序列化,最终发布的静态资源为js页面,这里简单让ai写了一个电子木鱼的效果并进行一些润色,系统适配Linux。
线程池类
head-only threadpool class
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <pthread.h>
#include <cstdio>
#include <list>
#include <exception>
#include "locker.h"
using namespace std;
// 线程池类
template<typename T>
class threadpool
{
private:
// 线程数量
int m_thread_number;
// 线程池数组
pthread_t * m_threadpool;
// 请求队列中最多允许的等待处理请求数量
int m_maxrequestnum;
// 请求队列
list<T* > m_workqueue;
// 互斥锁
locker m_queuelocker;
// 信号量 用来判断是否有任务需要处理
sem m_queuestat;
// 是否结束线程
bool m_stop;
private:
static void* worker(void*);
void run();
public:
threadpool(int thread_number = 8, int max_request = 10000);
~threadpool();
// 添加任务
bool append(T* request);
};
/**
* @brief 构造线程池并创建工作线程
* @param thread_number 线程池中线程数量(默认 8)
* @param max_request 请求队列最大容量(默认 10000)
* @throws exception 构造参数不合法或线程创建/分离失败时抛出
*/
template <typename T>
threadpool<T> :: threadpool(int thread_number, int max_request):
m_thread_number(thread_number), m_maxrequestnum(max_request),
m_threadpool(NULL), m_stop(false){
if ((thread_number <= 0) || (max_request <= 0))
throw exception();
m_threadpool = new pthread_t[m_thread_number];
if (!m_threadpool) throw exception();
// 创建thread_num 个线程,并设置为detach
for (int i = 0; i < thread_number; i++){
printf("create %d thread\n", i);
// worker 必须为static, this 传参
if (pthread_create(m_threadpool + i, NULL, worker, this) != 0){
delete [] m_threadpool;
throw exception();
}
if (pthread_detach(m_threadpool[i])){
delete [] m_threadpool;
throw exception();
}
}
}
/**
* @brief 析构函数,清理线程数组并标记停止
* @note 这里没有等待线程结束(线程创建时为 detached),需要确保其他资源可安全释放
*/
template <typename T>
threadpool<T> :: ~threadpool(){
delete [] m_threadpool;
m_stop = true;
}
/**
* @brief 向线程池的任务队列添加一个任务指针
* @param request 待处理任务(T*)
* @return 添加成功返回 true;队列已满返回 false
*/
template <typename T>
bool threadpool<T> :: append(T* request){
m_queuelocker.lock();
// 超出最大数量范围
if (m_workqueue.size() > m_maxrequestnum){
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();
return true;
}
/**
* @brief 工作线程入口函数(static),用于 pthread_create 的回调
* @param arg 传入的 threadpool 对象指针
* @return 返回传入指针(或者 nullptr)
*/
template <typename T>
void * threadpool<T> :: worker(void *arg){
threadpool* pool = (threadpool *)arg;
pool->run();
return pool;
}
/**
* @brief 工作线程的实际循环体,等待信号量并从队列中取出任务执行
* @note 持续运行直到 m_stop 被设置为 true
*/
template <typename T>
void threadpool<T> :: run(){
while(!m_stop){
m_queuestat.wait();
m_queuelocker.lock();
if (m_workqueue.empty()){
m_queuelocker.unlock();
continue;
}
T* request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if (!request) continue;
request->process();
}
}
#endif
线程同步封装类
threadsemaphore class
#ifndef LOCKER_H
#define LOCKER_H
#include <pthread.h>
#include <exception>
#include <semaphore.h>
using namespace std;
// 线程同步封装,方便使用
// 互斥锁类
class locker{
public:
locker(){
if (pthread_mutex_init(&m_mutex, NULL) != 0){
throw exception();
}
}
~locker(){
pthread_mutex_destroy(&m_mutex);
}
//上锁
bool lock(){
return pthread_mutex_lock(&m_mutex) == 0;
}
//解锁
bool unlock(){
return pthread_mutex_unlock(&m_mutex) == 0;
}
//返回互斥量
pthread_mutex_t * get(){
return &m_mutex;
}
private:
pthread_mutex_t m_mutex;
};
// 条件变量类
class cond{
public:
cond(){
if (pthread_cond_init(&m_cond, NULL) != 0){
throw exception();
}
}
~cond(){
pthread_cond_destroy(&m_cond);
}
bool wait(pthread_mutex_t * mutex){
return pthread_cond_wait(&m_cond, mutex) == 0;
}
bool timewait(pthread_mutex_t * mutex, struct timespec t){
return pthread_cond_timedwait(&m_cond, mutex, &t) == 0;
}
bool signal(pthread_mutex_t * mutex){
return pthread_cond_signal(&m_cond) == 0;
}
bool broadcast(){
return pthread_cond_broadcast(&m_cond) == 0;
}
private:
pthread_cond_t m_cond;
};
// 信号量类
class sem{
public:
sem(){
if (sem_init(&m_sem, 0, 0) != 0)
throw exception();
}
sem(int num){
if (sem_init(&m_sem, 0, num) != 0)
throw exception();
}
~sem(){
sem_destroy(&m_sem);
}
// 等待信号量
bool wait(){
return sem_wait(&m_sem) == 0;
}
// 增加信号量
bool post(){
return sem_post(&m_sem) == 0;
}
private:
sem_t m_sem;
};
#endif
Http解析类
httpparse cpp
#include "http_conn.h"
// 模块职责与连接生命周期说明:
// - http_conn 对象代表一个客户端连接,包含读写缓冲、解析状态机、与文件映射状态。
// - 生命周期概述:init(sockfd) -> 主线程触发 EPOLLIN -> http_conn::read() 读取所有可读数据 -> 将对象交给线程池处理(process) ->
// 线程池线程中执行 process_read()(主状态机 + 从状态机解析请求),若得到完整请求则 process_write() 填充响应并设置 m_iv,
// 最终通过 modifyfd 将该 fd 的 epoll 事件改为 EPOLLOUT 触发主线程写出;写完后根据 Connection 决定是否 keep-alive 或关闭连接。
// - epoll 与 EPOLLONESHOT:每次将 fd 注册为 EPOLLONESHOT,处理线程在完成一次读取/写入后必须调用 modifyfd 恢复事件,
// 以保证不会有多个线程同时处理同一 fd(避免并发竞态)。
//
// 解析/响应关键点:
// - 主状态机:CHECK_STATE_REQUESTLINE -> CHECK_STATE_HEADER -> CHECK_STATE_CONTENT;parse_line() 用于按 CRLF 分割行。
// - 对静态文件请求:do_request 使用 stat 检查并用 mmap 映射文件到内存,response 通过 writev 发送头部与映射区两段内存。
// - 非阻塞读写:read() 循环 recv 直到返回 EAGAIN/EWOULDBLOCK;write() 使用 writev,在 EAGAIN 时修改为 EPOLLOUT 等待下一次可写。
int http_conn :: m_epollfd = -1; // 所有socket上事件注册到同一个epoll
int http_conn :: m_user_count = 0; //统计用户数量
// 定义HTTP响应的一些状态信息
const char* ok_200_title = "OK";
const char* error_400_title = "Bad Request";
const char* error_400_form = "Your request has bad syntax or is inherently impossible to satisfy.\n";
const char* error_403_title = "Forbidden";
const char* error_403_form = "You do not have permission to get file from this server.\n";
const char* error_404_title = "Not Found";
const char* error_404_form = "The requested file was not found on this server.\n";
const char* error_500_title = "Internal Error";
const char* error_500_form = "There was an unusual problem serving the requested file.\n";
const char* doc_root = "./resources";
/**
* @brief 将文件描述符设置为非阻塞模式
* @param fd 要设置的文件描述符
* @return 返回原有的文件状态标志(old_flag),调用者可用于恢复
*/
int setnonblocking(int fd){
int old_flag = fcntl(fd, F_GETFL);
int new_flag = old_flag | O_NONBLOCK;
fcntl(fd, F_SETFL, new_flag);
return old_flag;
}
/**
* @brief 将 fd 添加到 epoll 实例并设置触发模式与非阻塞
* @param epollfd epoll 实例的文件描述符
* @param fd 要添加的文件描述符
* @param one_shot 是否使用 EPOLLONESHOT(防止并发处理同一连接)
*/
void addfd(int epollfd, int fd, bool one_shot){
epoll_event event;
event.data.fd = fd;
// 此处修改触发模式
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP; // ET
//event.events = EPOLLIN | EPOLLRDHUP; // LT
if (one_shot){
event.events |= EPOLLONESHOT;
}
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
// 设置文件描述符非阻塞
setnonblocking(fd);
}
/**
* @brief 从 epoll 中删除 fd 并关闭该描述符
* @param epollfd epoll 实例的文件描述符
* @param fd 要删除并关闭的文件描述符
*/
void removefd(int epollfd, int fd){
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0);
close(fd);
}
/**
* @brief 修改 epoll 中 fd 的事件(并重置 EPOLLONESHOT)
* @param epollfd epoll 实例的文件描述符
* @param fd 目标文件描述符
* @param ev 要设置的事件掩码(如 EPOLLIN/EPOLLOUT)
*/
void modifyfd(int epollfd, int fd ,int ev){
epoll_event event;
event.data.fd = fd;
event.events = ev | EPOLLONESHOT | EPOLLRDHUP | EPOLLET ; // 同步修改触发模式
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}
/**
* @brief 初始化一个新的客户端连接(绑定 socket 到 http_conn 对象)
* @param sockfd 客户端连接的 socket 文件描述符
* @param addr 客户端地址信息
* @note 会将 socket 添加到 epoll,设置为 EPOLLONESHOT,并更新连接计数
*/
void http_conn:: init(int sockfd, const sockaddr_in &addr){
m_sockfd = sockfd;
m_address = addr;
// 设置端口复用
int reuse = 1;
setsockopt(m_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
// 添加到epoll事件组中
addfd(m_epollfd, m_sockfd, true);
m_user_count++;
init();
}
/**
* @brief 重置连接解析/读写状态,准备处理新请求
* @note 该函数不会关闭 socket,仅重置状态机与缓冲区
*/
void http_conn :: init(){
m_check_state = CHECK_STATE_REQUESTLINE; // 初始状态为检查请求行
m_linger = false; // 默认不保持链接 Connection : keep-alive保持连接
m_method = GET; // 默认请求方式为GET
m_url = 0;
m_version = 0;
m_content_length = 0;
m_host = 0;
m_start_line = 0;
m_checked_index = 0;
m_read_idx = 0;
m_write_idx = 0;
bzero(m_read_buf, READ_BUFFER_SIZE);
bzero(m_write_buf, READ_BUFFER_SIZE);
bzero(m_real_file, FILENAME_LEN);
}
/**
* @brief 关闭当前连接并从 epoll 中移除
* @note 会将 m_sockfd 置 -1 并减少全局连接计数
*/
void http_conn :: closeconn(){
if (m_sockfd != -1){
removefd(m_epollfd, m_sockfd);
m_sockfd = -1;
m_user_count--; // 关闭一个连接,客户总数量减1
}
}
/**
* @brief 非阻塞读取数据,直到缓冲区满或没有更多数据
* @return 若成功读取(或暂时无更多数据)返回 true;发生错误或对端关闭返回 false
*/
bool http_conn ::read(){
if (m_read_idx >= READ_BUFFER_SIZE) return false;
// 读取到的字节
int bytes_read = 0;
while (true){
bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0);
if (bytes_read == -1){
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 没有数据
break;
}
return false;
}else if (bytes_read == 0) return false;
m_read_idx += bytes_read;
}
//printf("read: %s\n", m_read_buf);
return true;
}
/**
* @brief 向客户端写出准备好的 HTTP 响应,使用 writev 分散写
* @return 发送完成并保持连接返回 true;发送完成但需要关闭连接返回 false;出错返回 false
* @note 在 EAGAIN 情况下会将事件修改为 EPOLLOUT 并返回 true(等待下一次可写)
*/
bool http_conn::write()
{
int temp = 0;
int bytes_have_send = 0; // 已经发送的字节
int bytes_to_send = m_write_idx;// 将要发送的字节 (m_write_idx)写缓冲区中待发送的字节数
if ( bytes_to_send == 0 ) {
// 将要发送的字节为0,这一次响应结束。
modifyfd( m_epollfd, m_sockfd, EPOLLIN );
init();
return true;
}
while(1) {
// 分散写
temp = writev(m_sockfd, m_iv, m_iv_count);
if ( temp <= -1 ) {
// 如果TCP写缓冲没有空间,则等待下一轮EPOLLOUT事件,虽然在此期间,
// 服务器无法立即接收到同一客户的下一个请求,但可以保证连接的完整性。
if( errno == EAGAIN ) {
modifyfd( m_epollfd, m_sockfd, EPOLLOUT );
return true;
}
unmap();
return false;
}
bytes_to_send -= temp;
bytes_have_send += temp;
if ( bytes_to_send <= bytes_have_send ) {
// 发送HTTP响应成功,根据HTTP请求中的Connection字段决定是否立即关闭连接
unmap();
if(m_linger) {
init();
modifyfd( m_epollfd, m_sockfd, EPOLLIN );
return true;
} else {
modifyfd( m_epollfd, m_sockfd, EPOLLIN );
return false;
}
}
}
}
/**
* @brief 线程池中工作线程调用的入口,处理该连接的请求
* @note 包含解析请求、生成响应并修改 epoll 事件以触发写出
*/
void http_conn :: process(){
// 解析HTTP请求
HTTP_CODE read_ret = process_read();
if (read_ret == NO_REQUEST){
modifyfd(m_epollfd, m_sockfd, EPOLLIN);
return;
}
// 生成响应
bool write_ret = process_write( read_ret );
if ( !write_ret ) {
closeconn();
}
modifyfd( m_epollfd, m_sockfd, EPOLLOUT);
}
/**
* @brief 主状态机:解析 HTTP 请求(请求行、头部、内容)
* @return 解析结果枚举(NO_REQUEST, GET_REQUEST, BAD_REQUEST ...)
* @note 使用 parse_line 拆分行,循环直到无法继续解析
*/
http_conn :: HTTP_CODE http_conn :: process_read(){
LINE_STATUS line_status = LINE_OK;
HTTP_CODE ret = NO_REQUEST;
char *text = 0;
while (((m_check_state == CHECK_STATE_CONTENT) && (line_status == LINE_OK))
|| (line_status = parse_line()) == LINE_OK){
// 解析到了一行完整的数据 或 解析到请求体,也是完整的数据
// 获取一行数据
text = get_line();
m_start_line = m_checked_index;
printf("got 1 http line:%s\n", text);
switch (m_check_state)
{
case CHECK_STATE_REQUESTLINE:
{
ret = parse_request_line(text);
if(ret == BAD_REQUEST){
return BAD_REQUEST;
}
break;
}
case CHECK_STATE_HEADER:
/* code */
{
ret = parse_headers(text);
if(ret == BAD_REQUEST){
return BAD_REQUEST;
}else if (ret == GET_REQUEST){
return do_request(); // 解析具体信息
}
break;
}
case CHECK_STATE_CONTENT:
/* code */
{
ret = parse_content(text);
if (ret == GET_REQUEST){
return do_request();
}
line_status = LINE_OPEN;
break;
}
default:
{
return INTERNAL_ERROR;
}
}
}
return NO_REQUEST;
}
/**
* @brief 解析请求行,取得请求方法、URL、HTTP 版本
* @param text 请求行字符串(以 '\0' 结尾)
* @return 解析结果枚举(NO_REQUEST 表示继续解析,BAD_REQUEST 表示格式错误)
*/
http_conn :: HTTP_CODE http_conn :: parse_request_line(char *text){
// GET /index.html HTTP/1.1
m_url = strpbrk(text , " \t");
if (!m_url){
return BAD_REQUEST;
}
// GET\0/index.html HTTP/1.1
*m_url++ = '\0'; // 先用原值赋\0, 后++,
// GET\0 字符串结束符截断
char* method = text;
if (strcasecmp(method, "GET") == 0){
m_method = GET;
}else {
return BAD_REQUEST;
}
// /index.html HTTP/1.1
m_version = strpbrk(m_url, " \t");
if (!m_version){
return BAD_REQUEST;
}
// /index.html\0HTTP/1.1
*m_version++ = '\0';
if (strcasecmp(m_version, "HTTP/1.1") != 0){
return BAD_REQUEST;
}
// http://192.168.1.1:10000/index.html
if (strncasecmp(m_url, "http://", 7) == 0){
m_url += 7; // 192.168.1.1:10000/index.html
m_url = strchr(m_url, '/'); // /index.html
}
if (!m_url || m_url[0] != '/'){
return BAD_REQUEST;
}
m_check_state = CHECK_STATE_HEADER; // 改变主状态机状态
return NO_REQUEST;
}
/**
* @brief 解析请求头部字段(逐行调用)
* @param text 当前头部行(以 '\0' 结尾),空行表示头部结束
* @return GET_REQUEST 表示头部解析完成且无消息体;NO_REQUEST 表示继续解析;BAD_REQUEST 表示错误
*/
http_conn :: HTTP_CODE http_conn :: parse_headers(char *text){
// 遇到空行,表示头部字段解析完毕
if( text[0] == '\0' ) {
// 如果HTTP请求有消息体,则还需要读取m_content_length字节的消息体,
// 状态机转移到CHECK_STATE_CONTENT状态
if ( m_content_length != 0 ) {
m_check_state = CHECK_STATE_CONTENT;
return NO_REQUEST;
}
// 否则说明已经得到了一个完整的HTTP请求
return GET_REQUEST;
} else if ( strncasecmp( text, "Connection:", 11 ) == 0 ) {
// 处理Connection 头部字段 Connection: keep-alive
text += 11;
text += strspn( text, " \t" );
if ( strcasecmp( text, "keep-alive" ) == 0 ) {
m_linger = true;
}
} else if ( strncasecmp( text, "Content-Length:", 15 ) == 0 ) {
// 处理Content-Length头部字段
text += 15;
text += strspn( text, " \t" );
m_content_length = atol(text);
} else if ( strncasecmp( text, "Host:", 5 ) == 0 ) {
// 处理Host头部字段
text += 5;
text += strspn( text, " \t" );
m_host = text;
} else {
printf( "oops! unknow header %s\n", text );
}
return NO_REQUEST;
}
/**
* @brief 检查请求体是否已全部读入(不做具体解析)
* @param text 指向请求体(起始位置)
* @return 若读取完毕返回 GET_REQUEST,否则返回 NO_REQUEST
*/
http_conn::HTTP_CODE http_conn::parse_content( char* text ) {
if ( m_read_idx >= ( m_content_length + m_checked_index ) )
{
text[ m_content_length ] = '\0';
return GET_REQUEST;
}
return NO_REQUEST;
}
/**
* @brief 按行解析读缓冲区,判断是否遇到 CRLF 结束
* @return LINE_OK / LINE_BAD / LINE_OPEN
*/
http_conn :: LINE_STATUS http_conn :: parse_line(){
char temp;
for (; m_checked_index < m_read_idx; ++ m_checked_index){
temp = m_read_buf[m_checked_index];
if(temp == '\r'){
if((m_checked_index + 1) == m_read_idx) return LINE_OPEN;
else if (m_read_buf[m_checked_index + 1]== '\n') {
m_read_buf[m_checked_index++] = '\0';
m_read_buf[m_checked_index++] = '\0';
return LINE_OK;
}
return LINE_BAD;
}else if (temp == '\n'){
if((m_checked_index > 1) && (m_read_buf[m_checked_index-1] == '\r')){
m_read_buf[m_checked_index-1] = '\0';
m_read_buf[m_checked_index++] = '\0';
return LINE_OK;
}
return LINE_BAD;
}
}
return LINE_OPEN;
}
/**
* @brief 根据解析后的 URL 在文件系统中查找目标文件并 mmap 映射
* @return FILE_REQUEST 表示文件请求成功,否则返回相应错误类型(NO_RESOURCE/FORBIDDEN_REQUEST 等)
*/
http_conn::HTTP_CODE http_conn::do_request()
{
// "/home/miao/TinyWebserver/resources"
strcpy( m_real_file, doc_root );
int len = strlen( doc_root );
strncpy( m_real_file + len, m_url, FILENAME_LEN - len - 1 );
// 获取m_real_file文件的相关的状态信息,-1失败,0成功
if ( stat( m_real_file, &m_file_stat ) < 0 ) {
return NO_RESOURCE;
}
// 判断访问权限
if ( ! ( m_file_stat.st_mode & S_IROTH ) ) {
return FORBIDDEN_REQUEST;
}
// 判断是否是目录
if ( S_ISDIR( m_file_stat.st_mode ) ) {
return BAD_REQUEST;
}
// 以只读方式打开文件
int fd = open( m_real_file, O_RDONLY );
// 创建内存映射
m_file_address = ( char* )mmap( 0, m_file_stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0 );
close( fd );
return FILE_REQUEST;
}
/**
* @brief 解除对文件映射的映射并清理 m_file_address
*/
void http_conn::unmap() {
if( m_file_address )
{
munmap( m_file_address, m_file_stat.st_size );
m_file_address = 0;
}
}
/**
* @brief 向写缓冲追加格式化响应文本(可变参数)
* @param format printf 风格的格式字符串
* @return 成功返回 true,写入超限返回 false
*/
bool http_conn::add_response( const char* format, ... ) {
if( m_write_idx >= WRITE_BUFFER_SIZE ) {
return false;
}
va_list arg_list;
va_start( arg_list, format );
int len = vsnprintf( m_write_buf + m_write_idx, WRITE_BUFFER_SIZE - 1 - m_write_idx, format, arg_list );
if( len >= ( WRITE_BUFFER_SIZE - 1 - m_write_idx ) ) {
return false;
}
m_write_idx += len;
va_end( arg_list );
return true;
}
/**
* @brief 添加状态行,比如 "HTTP/1.1 200 OK"
*/
bool http_conn::add_status_line( int status, const char* title ) {
return add_response( "%s %d %s\r\n", "HTTP/1.1", status, title );
}
/**
* @brief 添加通用响应头(Content-Length/Content-Type/Connection/空行)
* @param content_len 正文长度(字节)
* @return 无(内部通过 add_response 填充)
*/
bool http_conn::add_headers(int content_len) {
add_content_length(content_len);
add_content_type();
add_linger();
add_blank_line();
}
/**
* @brief 添加 Content-Length 头
* @param content_len 正文长度(字节)
*/
bool http_conn::add_content_length(int content_len) {
return add_response( "Content-Length: %d\r\n", content_len );
}
/**
* @brief 添加 Connection 头(keep-alive 或 close)
*/
bool http_conn::add_linger()
{
return add_response( "Connection: %s\r\n", ( m_linger == true ) ? "keep-alive" : "close" );
}
/**
* @brief 添加响应空行,结束 header 部分
*/
bool http_conn::add_blank_line()
{
return add_response( "%s", "\r\n" );
}
/**
* @brief 添加响应正文字符串到写缓冲
* @param content 要添加的正文字符串
*/
bool http_conn::add_content( const char* content )
{
return add_response( "%s", content );
}
/**
* @brief 添加 Content-Type 头(目前固定为 text/html)
*/
bool http_conn::add_content_type() {
return add_response("Content-Type:%s\r\n", "text/html");
}
/**
* @brief 根据解析结果构建要发送的响应内容到写缓冲 / iov
* @param ret HTTP_CODE 解析结果
* @return 成功返回 true,失败返回 false
*/
bool http_conn::process_write(HTTP_CODE ret) {
switch (ret)
{
case INTERNAL_ERROR:
add_status_line( 500, error_500_title );
add_headers( strlen( error_500_form ) );
if ( ! add_content( error_500_form ) ) {
return false;
}
break;
case BAD_REQUEST:
add_status_line( 400, error_400_title );
add_headers( strlen( error_400_form ) );
if ( ! add_content( error_400_form ) ) {
return false;
}
break;
case NO_RESOURCE:
add_status_line( 404, error_404_title );
add_headers( strlen( error_404_form ) );
if ( ! add_content( error_404_form ) ) {
return false;
}
break;
case FORBIDDEN_REQUEST:
add_status_line( 403, error_403_title );
add_headers(strlen( error_403_form));
if ( ! add_content( error_403_form ) ) {
return false;
}
break;
case FILE_REQUEST:
add_status_line(200, ok_200_title );
add_headers(m_file_stat.st_size);
m_iv[ 0 ].iov_base = m_write_buf;
m_iv[ 0 ].iov_len = m_write_idx;
m_iv[ 1 ].iov_base = m_file_address;
m_iv[ 1 ].iov_len = m_file_stat.st_size;
m_iv_count = 2;
return true;
default:
return false;
}
m_iv[ 0 ].iov_base = m_write_buf;
m_iv[ 0 ].iov_len = m_write_idx;
m_iv_count = 1;
return true;
}
headfile
#ifndef HTTP_CONNECTION_H
#define HTTP_CONNECTION_H
#include <sys/epoll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <stdarg.h>
#include <sys/uio.h>
class http_conn
{
public:
static int m_epollfd; // 所有socket上事件注册到同一个epoll
static int m_user_count; //统计用户数量
static const int READ_BUFFER_SIZE = 2048; // 读缓冲区大小
static const int WRITE_BUFFER_SIZE = 1024; // 写缓冲区大小
static const int FILENAME_LEN = 200; // 文件名的最大长度
// HTTP请求方法,这里只支持GET
enum METHOD {GET = 0, POST, HEAD, PUT, DELETE, TRACE, OPTIONS, CONNECT};
/*
解析客户端请求时,主状态机的状态
CHECK_STATE_REQUESTLINE:当前正在分析请求行
CHECK_STATE_HEADER:当前正在分析头部字段
CHECK_STATE_CONTENT:当前正在解析请求体
*/
enum CHECK_STATE { CHECK_STATE_REQUESTLINE = 0, CHECK_STATE_HEADER, CHECK_STATE_CONTENT };
/*
服务器处理HTTP请求的可能结果,报文解析的结果
NO_REQUEST : 请求不完整,需要继续读取客户数据
GET_REQUEST : 表示获得了一个完成的客户请求
BAD_REQUEST : 表示客户请求语法错误
NO_RESOURCE : 表示服务器没有资源
FORBIDDEN_REQUEST : 表示客户对资源没有足够的访问权限
FILE_REQUEST : 文件请求,获取文件成功
INTERNAL_ERROR : 表示服务器内部错误
CLOSED_CONNECTION : 表示客户端已经关闭连接了
*/
enum HTTP_CODE { NO_REQUEST, GET_REQUEST, BAD_REQUEST, NO_RESOURCE, FORBIDDEN_REQUEST, FILE_REQUEST, INTERNAL_ERROR, CLOSED_CONNECTION };
// 从状态机的三种可能状态,即行的读取状态,分别表示
// 1.读取到一个完整的行 2.行出错 3.行数据尚且不完整
enum LINE_STATUS { LINE_OK = 0, LINE_BAD, LINE_OPEN };
public:
/// @brief 处理客户端请求
void process();
/// @brief 初始化新接受的连接
void init(int sockfd, const sockaddr_in &addr);
/// @brief 关闭连接
void closeconn();
/// @brief 非阻塞读
bool read();
/// @brief 非阻塞写
bool write();
public:
/// @brief 解析HTTP请求
HTTP_CODE process_read();
/// @brief 解析请求首行
HTTP_CODE parse_request_line(char *text);
/// @brief 解析请求头
HTTP_CODE parse_headers(char *text);
/// @brief 解析请求体
HTTP_CODE parse_content(char *text);
/// @brief 解析请求一行
LINE_STATUS parse_line();
// 填充HTTP应答
bool process_write( HTTP_CODE ret );
public:
// 这一组函数被process_write调用以填充HTTP应答
bool add_response( const char* format, ... );
bool add_content( const char* content );
bool add_content_type();
bool add_status_line( int status, const char* title );
bool add_headers( int content_length );
bool add_content_length( int content_length );
bool add_linger();
bool add_blank_line();
private:
/* data */
int m_sockfd; // 该用户端连接的socket
sockaddr_in m_address; // 通信的socket地址
char m_read_buf[READ_BUFFER_SIZE];// 读缓冲区
int m_read_idx; // 标识读缓冲区中已经读入的客户端数据的最后一个字节的下一个位置
int m_checked_index; // 当前正在分析的字符在缓冲区的位置
int m_start_line; // 当前正在解析的行的起始位置
CHECK_STATE m_check_state; // 主状态机当前所处的状态
char* m_url; // 请求地址
char* m_version; // 请求协议
METHOD m_method; //请求方法
char* m_host; // 主机名
bool m_linger; //判断HTTP请求是否保持连接
int m_content_length; // HTTP请求的消息总长度
char m_real_file[ FILENAME_LEN ]; // 客户请求的目标文件的完整路径,其内容等于 doc_root + m_url, doc_root是网站根目录
char m_write_buf[ WRITE_BUFFER_SIZE ]; // 写缓冲区
int m_write_idx; // 写缓冲区中待发送的字节数
char* m_file_address; // 客户请求的目标文件被mmap到内存中的起始位置
struct stat m_file_stat; // 目标文件的状态。通过它我们可以判断文件是否存在、是否为目录、是否可读,并获取文件大小等信息
struct iovec m_iv[2]; // 采用writev来执行写操作,所以定义下面两个成员,其中m_iv_count表示被写内存块的数量。
int m_iv_count;
void init(); // 连接解析状态初始化
void unmap(); // 对内存映射区执行munmap操作
char* get_line(){ return m_read_buf + m_start_line;}
HTTP_CODE do_request();
};
#endif
启动
main.cpp
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <signal.h>
#include "threadpool.h"
#include "locker.h"
#include "http_conn.h"
// 服务器总体工作逻辑说明:
// 1. 主线程完成初始化:忽略 SIGPIPE、创建线程池、分配 clients 数组、创建监听 socket 并加入 epoll。
// 2. 使用 epoll_wait 循环等待事件:
// - 如果发生监听 socket 的可读事件,accept 新连接,设置 socket 非阻塞,注册到 epoll(使用 EPOLLONESHOT + ET)并把连接对象初始化到 users[]。
// - 对客户端 fd 的 EPOLLIN 事件:主线程调用 http_conn::read() 非阻塞读入全部数据,若读完则将该 http_conn 对象放入线程池队列处理(pool->append)。
// - 线程池中工作线程取出任务后调用 http_conn::process():解析请求(状态机 parse),生成响应(process_write),并通过修改 epoll 事件触发写事件。
// - 对客户端 fd 的 EPOLLOUT 事件:主线程调用 http_conn::write(),使用 writev 发送响应(包含头部缓冲区和 mmap 的文件),完成后根据 Connection 决定是否继续保持连接或关闭。
// 3. 关键点说明:
// - 使用 EPOLLONESHOT 保证同一连接不会被多个线程并发处理,处理完成后需手动通过 modifyfd 恢复 EPOLLONESHOT 以再次接收事件。
// - 使用非阻塞 I/O + 边沿触发(EPOLLET),read() 需要循环读取直到 EAGAIN/EWOULDBLOCK。
// - 使用 mmap 将文件映射到内存并通过 writev 发送,发送完成需 munmap。
// - 线程池负责耗时的请求解析与响应准备,主线程负责 I/O 事件分发与短小的非阻塞读写触发调度。
#define MAX_FD 65535 //最大文件描述符个数
#define MAX_EVENT_NUM 10000 //一次最大监听事件个数
// 添加信号捕捉
void addsig(int sig, void(handler)(int)){
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = handler;
sigfillset(&sa.sa_mask);
sigaction(sig, &sa, NULL);
}
// 添加文件描述符到epoll中
extern void addfd(int epollfd, int fd, bool one_shot);
// 从epoll中删除文件描述符
extern void removefd(int epollfd, int fd);
// 修改文件描述符
extern void modifyfd(int epollfd, int fd ,int ev);
int main(int argc, char* argv[]){
if (argc <= 1){
printf("按照如下格式运行: %s port num needed\n", basename(argv[0]));
exit(-1);
}
int port = atoi(argv[1]);
// 对SIGPIE处理
addsig(SIGPIPE, SIG_IGN);
// 初始化线程池
threadpool<http_conn>* pool = NULL;
try
{
pool = new threadpool<http_conn>;
}
catch(const std::exception & e)
{
std::cerr << e.what() << std::endl;
exit(-1);
}
// 创建数组用于保存所有客户端信息
http_conn *users = new http_conn[MAX_FD];
// 创建套接字
// IPv4 TCP
int lfd = socket(PF_INET, SOCK_STREAM, 0);
if (lfd == -1){
perror("socket create");
exit(-1);
}
// 设置端口复用
int reuse = 1;
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
struct sockaddr_in address;
address.sin_port = htons(port);
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
// bind
int ret = bind(lfd, (struct sockaddr*) &address, sizeof(address));
if (ret == -1){
perror("bind");
exit(-1);
}
// 监听
ret = listen(lfd, 5);
if (ret == -1){
perror("listen");
exit(-1);
}
// 创建epoll对象,事件数组,添加监听事件
epoll_event events[MAX_EVENT_NUM];
int epollfd = epoll_create(5);
addfd(epollfd, lfd, false);
http_conn::m_epollfd = epollfd;
// 主线程检测
while(true){
int num = epoll_wait(epollfd, events, MAX_EVENT_NUM, -1);
if( (num < 0) && (errno != EINTR)){
printf("epoll failed\n");
break;
}
// 循环遍历事件数组
for (int i = 0; i < num; i++){
int sockfd = events[i].data.fd;
// 有客户端连接情况
if (sockfd == lfd){
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int connfd = accept(lfd, (struct sockaddr*)&clientaddr, &len);
if (http_conn::m_user_count >= MAX_FD){
// 目前连接数已满
// 给客户端写一个信息:服务器正忙
close(connfd);
continue;
}
// 将connfd客户数据初始化后放入用户信息数组
users[connfd].init(connfd, clientaddr);
}
// 有客户端错误事件情况
else if (events[i].events & (EPOLLRDHUP | EPOLLHUP| EPOLLERR))
{
users[sockfd].closeconn();
}
// 有客户端读事件情况
else if (events[i].events & EPOLLIN){
if (users[sockfd].read()){
// 一次性把所有数据读完
pool->append(users + sockfd);
}else {
users[sockfd].closeconn();
}
}
else if (events[i].events & EPOLLOUT){
if (users[sockfd].write()){
// 一次性把所有数据写完
}else {
users[sockfd].closeconn();
}
}
}
}
close(epollfd);
close(lfd);
delete[] users;
delete pool;
return 0;
}
压测
压测工具:webbench 1.5
指令:./webbench -c 5000 -t 10 http://localh
ost:8080/index.html
结果:
Webbench - Simple Web Benchmark 1.5
Copyright (c) Radim Kolar 1997-2004, GPL Open Source Software.
Benchmarking: GET http://localhost:8080/index.html
5000 clients, running 10 sec.
Speed=2732598 pages/min, 7240987 bytes/sec.
Requests: 455431 susceed, 2 failed.
QPS 45,543,竟然还不错呢
不过并发数量再高点就不行了
经典永流传