前言
在网络编程领域,UDP(User Datagram Protocol,用户数据报协议)作为一种无连接的传输层协议,以其高效、低延迟的特性在实时性要求高的应用场景中占据重要地位。与TCP协议相比,UDP不需要建立连接,不保证数据包的顺序和可靠性,但正是这种"轻量级"特性使其在视频流、在线游戏、DNS查询等领域得到广泛应用。
本文将深入探讨如何从零开始构建一个完整的UDP通信系统,涵盖服务器端、客户端的设计与实现,包括套接字编程的核心概念、关键系统调用、错误处理机制以及实际应用中的注意事项。通过本文的学习,读者不仅能够掌握UDP网络编程的基本技能,还能深入理解网络通信的底层原理。
本文实现的UDP通信系统具有以下特点:
- 完整的服务器/客户端架构
- 详细的错误处理和日志记录
- 可配置的服务器参数
- 跨平台兼容性考虑
- 丰富的代码示例和详细注释
一、UDP服务器udpserver.hpp
1.1 基本框架设计
UDP服务器的设计需要遵循模块化、可扩展的原则。我们将服务器封装为一个类,包含初始化、运行和清理等基本功能。
#ifndef udpserver_hpp
#define udpserver_hpp
#include <iostream>
#include <string>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <thread>
#include <vector>
#include <memory>
#include <atomic>
#include <functional>
#include "log.hpp"
/**
 * Blocking, single-socket IPv4 UDP server.
 *
 * Intended call sequence: construct, optionally adjust the configuration with
 * the set*() methods, call init() to create/configure/bind the socket, then
 * call run() to enter the receive loop.
 *
 * The class is a polymorphic base: init(), run(), stop() and processdata()
 * are virtual and so is the destructor, so a derived server can be used and
 * destroyed through a udpserver pointer. State and helpers are protected so
 * that a derived class providing its own receive loop can reach them.
 */
class udpserver {
protected:
    int port_;                        // UDP port the server binds to
    int sockfd_ = -1;                 // socket descriptor; negative means "no socket" (run() checks this)
    std::atomic<bool> is_running_{false};  // receive-loop flag, polled by run()
    struct sockaddr_in server_addr_;  // local address passed to bind()
    struct sockaddr_in client_addr_;  // sender of the most recent datagram
    socklen_t client_addr_len_;       // in/out length argument for recvfrom()

    // Configuration
    size_t buffer_size_;              // size of the receive buffer allocated by run()
    int timeout_sec_;                 // receive timeout, seconds part
    int timeout_usec_;                // receive timeout, microseconds part
    bool reuse_addr_;                 // whether to request address reuse on the socket

public:
    /// @param port UDP port to bind (default 8080).
    explicit udpserver(int port = 8080);

    /// Virtual because derived servers are owned through udpserver pointers.
    virtual ~udpserver();

    // The object owns a socket descriptor: copying is disabled.
    udpserver(const udpserver&) = delete;
    udpserver& operator=(const udpserver&) = delete;

    /// Creates, configures and binds the socket. @return false on failure.
    virtual bool init();

    /// Runs the receive loop until is_running_ becomes false or a fatal error occurs.
    virtual void run();

    /// Requests the server to stop (defined outside this header).
    virtual void stop();

    // Configuration setters; call them before init()/run() so the values are picked up.
    void setbuffersize(size_t size) { buffer_size_ = size; }
    void settimeout(int sec, int usec = 0) {
        timeout_sec_ = sec;
        timeout_usec_ = usec;
    }
    void setreuseaddr(bool reuse) { reuse_addr_ = reuse; }

protected:
    /// Creates the UDP socket and stores it in sockfd_.
    bool createsocket();

    /// Binds sockfd_ to port_ on all interfaces.
    bool bindaddress();

    /// Applies reuse/timeout/buffer options to sockfd_.
    bool setsocketoptions();

    /// Hook invoked for every received datagram; override to change the protocol.
    virtual void processdata(const char* data, ssize_t len,
                             const struct sockaddr_in& client_addr);

    /// Sends len bytes of data to client_addr. @return false if sendto fails.
    bool sendresponse(const char* data, ssize_t len,
                      const struct sockaddr_in& client_addr);

    /// Releases the server's resources (defined outside this header).
    void cleanup();
};
#endif // udpserver_hpp
1.2 初始化函数init详解
初始化函数是服务器启动的第一步,它负责套接字创建、地址绑定和选项设置等关键操作。
/**
 * Prepares the server for run(): sets up logging, creates the socket, applies
 * the socket options and binds the address, in that order.
 *
 * @return true when every step succeeded; false otherwise. On failure after
 *         the socket was created, the descriptor is closed and sockfd_ is
 *         reset to -1 so later code never sees (or closes) a stale descriptor.
 */
bool udpserver::init() {
    // 1. Logging first, so the remaining steps can report their progress.
    logger::instance().init("udp_server.log", loglevel::info);
    log_info("starting udp server initialization...");

    // 2. Create the socket.
    if (!createsocket()) {
        log_error("failed to create socket");
        return false;
    }

    // 3. Apply socket options.
    if (!setsocketoptions()) {
        log_error("failed to set socket options");
        close(sockfd_);
        sockfd_ = -1;  // mark as closed; run() treats a negative fd as "not initialized"
        return false;
    }

    // 4. Bind the local address.
    if (!bindaddress()) {
        log_error("failed to bind address");
        close(sockfd_);
        sockfd_ = -1;
        return false;
    }

    // 5. Reset the per-datagram client address bookkeeping.
    memset(&client_addr_, 0, sizeof(client_addr_));
    client_addr_len_ = sizeof(client_addr_);

    log_info("udp server initialized successfully on port %d", port_);
    log_info("buffer size: %zu bytes", buffer_size_);
    log_info("timeout: %d seconds %d microseconds", timeout_sec_, timeout_usec_);
    return true;
}
/**
 * Opens an IPv4 datagram socket and stores the descriptor in sockfd_.
 *
 * @return true if socket() returned a non-negative descriptor, false otherwise
 *         (the errno text is logged).
 */
bool udpserver::createsocket() {
    // af_inet selects IPv4, sock_dgram selects UDP.
    sockfd_ = socket(af_inet, sock_dgram, 0);

    const bool opened = !(sockfd_ < 0);
    if (!opened) {
        log_error("socket creation failed: %s", strerror(errno));
        return false;
    }

    log_debug("socket created successfully, fd: %d", sockfd_);
    return true;
}
/**
 * Applies the configured options to sockfd_: address reuse (if enabled), the
 * receive timeout (if non-zero) and 1 MiB send/receive buffers.
 *
 * Every option is best-effort: a failing setsockopt() is logged as a warning
 * and does not abort initialization.
 *
 * @return always true.
 */
bool udpserver::setsocketoptions() {
    const int enable = 1;

    // Address reuse avoids "address already in use" when restarting quickly.
    if (reuse_addr_) {
        if (setsockopt(sockfd_, sol_socket, so_reuseaddr,
                       &enable, sizeof(enable)) < 0) {
            log_warn("failed to set so_reuseaddr: %s", strerror(errno));
            // Not fatal: the server can still run without it.
        } else {
            log_debug("so_reuseaddr set successfully");
        }
    }

    // Receive timeout.
    if (timeout_sec_ > 0 || timeout_usec_ > 0) {
        struct timeval tv;
        // Fold whole seconds out of the microsecond part: setsockopt rejects
        // a tv_usec of one million or more.
        tv.tv_sec = timeout_sec_ + timeout_usec_ / 1000000;
        tv.tv_usec = timeout_usec_ % 1000000;
        if (setsockopt(sockfd_, sol_socket, so_rcvtimeo,
                       &tv, sizeof(tv)) < 0) {
            log_warn("failed to set receive timeout: %s", strerror(errno));
        } else {
            // The field types of timeval vary between platforms; cast so the
            // arguments always match the %ld conversions.
            log_debug("receive timeout set to %ld.%06ld seconds",
                      static_cast<long>(tv.tv_sec),
                      static_cast<long>(tv.tv_usec));
        }
    }

    // Send buffer: 1 MiB.
    const int send_buf_size = 1024 * 1024;
    if (setsockopt(sockfd_, sol_socket, so_sndbuf,
                   &send_buf_size, sizeof(send_buf_size)) < 0) {
        log_warn("failed to set send buffer size: %s", strerror(errno));
    }

    // Receive buffer: 1 MiB.
    const int recv_buf_size = 1024 * 1024;
    if (setsockopt(sockfd_, sol_socket, so_rcvbuf,
                   &recv_buf_size, sizeof(recv_buf_size)) < 0) {
        log_warn("failed to set receive buffer size: %s", strerror(errno));
    }

    return true;
}
bool udpserver::bindaddress() {
// 初始化服务器地址结构
memset(&server_addr_, 0, sizeof(server_addr_));
// 设置地址族为ipv4
server_addr_.sin_family = af_inet;
// 设置端口,使用htons进行字节序转换
server_addr_.sin_port = htons(port_);
// 设置ip地址为inaddr_any,表示监听所有网络接口
server_addr_.sin_addr.s_addr = htonl(inaddr_any);
// 绑定套接字到指定地址和端口
if (bind(sockfd_, (struct sockaddr*)&server_addr_,
sizeof(server_addr_)) < 0) {
log_error("bind failed on port %d: %s", port_, strerror(errno));
return false;
}
// 获取实际绑定的地址信息
struct sockaddr_in actual_addr;
socklen_t actual_len = sizeof(actual_addr);
if (getsockname(sockfd_, (struct sockaddr*)&actual_addr, &actual_len) == 0) {
char ip_str[inet_addrstrlen];
inet_ntop(af_inet, &actual_addr.sin_addr, ip_str, sizeof(ip_str));
log_info("server bound to %s:%d", ip_str, ntohs(actual_addr.sin_port));
}
return true;
}
1.3 关键系统调用详解
1.inet_addr函数
inet_addr函数用于将点分十进制表示的IPv4地址转换为网络字节序的32位整数。它在出错时返回INADDR_NONE,而该值与合法地址255.255.255.255的转换结果相同,因此无法区分这两种情况。虽然本文代码中使用的是inet_pton(更安全的版本),但理解inet_addr仍然很重要。
// inet_addr的使用示例
const char* ip_str = "192.168.1.100";
in_addr_t addr = inet_addr(ip_str);
if (addr == inaddr_none) {
log_error("invalid ip address: %s", ip_str);
} else {
log_debug("ip %s converted to network byte order: 0x%08x",
ip_str, addr);
// 转换回点分十进制格式
struct in_addr addr_struct;
addr_struct.s_addr = addr;
char* ip_str_back = inet_ntoa(addr_struct);
log_debug("converted back to string: %s", ip_str_back);
}
// 现代推荐使用inet_pton(更安全,支持ipv6)
struct sockaddr_in addr;
if (inet_pton(af_inet, ip_str, &addr.sin_addr) <= 0) {
log_error("invalid ip address format: %s", ip_str);
}
2.bzero和memset函数
bzero是BSD系统中用于将内存区域清零的函数,而memset是标准C库函数,功能更通用。
// bzero的使用(传统方式) struct sockaddr_in addr; bzero(&addr, sizeof(addr)); // 将整个结构体清零 // memset的等效用法 memset(&addr, 0, sizeof(addr)); // 更标准的做法 // memset的更多用途 char buffer[1024]; // 全部设置为0 memset(buffer, 0, sizeof(buffer)); // 全部设置为特定值 memset(buffer, 'a', sizeof(buffer)); // 部分设置 memset(buffer, 0, 100); // 只清空前100字节 // 性能比较:对于大内存块,memset通常经过优化,性能更好
1.4 服务器运行函数run
run函数是服务器的核心,负责循环接收客户端请求并处理。
/**
 * Receive loop of the server.
 *
 * Repeatedly waits for one datagram, null-terminates it, logs the sender and
 * hands it to processdata(). The loop ends when is_running_ becomes false or
 * recvfrom() fails with an error other than timeout/interruption; cleanup()
 * is called afterwards.
 *
 * Returns immediately (without calling cleanup()) when the socket has not
 * been initialized or when the configured buffer is too small to hold at
 * least one payload byte plus the terminating null.
 */
void udpserver::run() {
    if (sockfd_ < 0) {
        log_error("cannot run server: socket not initialized");
        return;
    }
    // One byte is reserved for the terminator below, so a buffer smaller than
    // 2 would make `buffer.size() - 1` wrap around or leave no room for data.
    if (buffer_size_ < 2) {
        log_error("cannot run server: buffer size %zu is too small", buffer_size_);
        return;
    }

    is_running_ = true;
    log_info("udp server started, waiting for connections...");

    // Receive buffer, allocated once and reused for every datagram.
    std::vector<char> buffer(buffer_size_);

    while (is_running_) {
        // Reset the sender address before each receive.
        memset(&client_addr_, 0, sizeof(client_addr_));
        client_addr_len_ = sizeof(client_addr_);

        ssize_t recv_len = recvfrom(sockfd_, buffer.data(), buffer.size() - 1,
                                    0, (struct sockaddr*)&client_addr_,
                                    &client_addr_len_);
        if (recv_len < 0) {
            if (errno == eagain || errno == ewouldblock) {
                // Receive timeout: loop again so is_running_ is re-checked.
                continue;
            } else if (errno == eintr) {
                // Interrupted by a signal: loop again so is_running_ is re-checked.
                log_debug("recvfrom interrupted by signal");
                continue;
            } else {
                log_error("recvfrom failed: %s", strerror(errno));
                break;
            }
        } else if (recv_len == 0) {
            // For UDP a return value of 0 is a zero-length datagram.
            log_debug("received empty datagram");
            continue;
        }

        // Terminate the payload so it can be logged with %s.
        buffer[recv_len] = '\0';

        // Sender information for the log.
        char client_ip[inet_addrstrlen];
        inet_ntop(af_inet, &client_addr_.sin_addr,
                  client_ip, sizeof(client_ip));
        uint16_t client_port = ntohs(client_addr_.sin_port);
        log_debug("received %zd bytes from %s:%d",
                  recv_len, client_ip, client_port);
        log_debug("data: %s", buffer.data());

        processdata(buffer.data(), recv_len, client_addr_);
    }

    log_info("udp server stopped");
    cleanup();
}
void udpserver::processdata(const char* data, ssize_t len,
const struct sockaddr_in& client_addr) {
// 默认实现:原样返回数据(echo服务器)
log_debug("processing %zd bytes of data", len);
// 构造响应
std::string response = "server received: ";
response.append(data, len);
// 发送响应
if (!sendresponse(response.c_str(), response.length(), client_addr)) {
log_error("failed to send response to client");
}
}
bool udpserver::sendresponse(const char* data, ssize_t len,
const struct sockaddr_in& client_addr) {
if (len <= 0) {
log_warn("attempting to send empty data");
return true; // 空数据发送"成功"
}
// 发送数据
ssize_t sent_len = sendto(sockfd_, data, len, 0,
(const struct sockaddr*)&client_addr,
sizeof(client_addr));
if (sent_len < 0) {
log_error("sendto failed: %s", strerror(errno));
return false;
}
if (sent_len != len) {
log_warn("partial send: %zd of %zd bytes sent", sent_len, len);
}
// 获取客户端信息用于日志
char client_ip[inet_addrstrlen];
inet_ntop(af_inet, &client_addr.sin_addr, client_ip, sizeof(client_ip));
uint16_t client_port = ntohs(client_addr.sin_port);
log_debug("sent %zd bytes to %s:%d", sent_len, client_ip, client_port);
return true;
}
1.5 recvfrom和sendto函数深度解析
1.recvfrom函数
recvfrom是UDP接收数据的核心函数,它不仅可以接收数据,还能获取发送者的地址信息。
/**
* recvfrom函数原型:
* ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
* struct sockaddr *src_addr, socklen_t *addrlen);
*
* 参数说明:
* - sockfd: 套接字描述符
* - buf: 接收缓冲区
* - len: 缓冲区大小
* - flags: 标志位,常用值:
* * 0: 默认行为
* * msg_waitall: 等待所有数据(对udp通常无效)
* * msg_dontwait: 非阻塞模式
* * msg_peek: 查看数据但不从缓冲区移除
* - src_addr: 发送方地址(输出参数)
* - addrlen: 地址长度(输入输出参数)
*
* 返回值:
* - 成功:接收到的字节数
* - 失败:-1,设置errno
* - 连接关闭(tcp)或空数据包(udp):0
*/
// recvfrom的完整示例
void receiveexample(int sockfd) {
struct sockaddr_in client_addr;
socklen_t addr_len = sizeof(client_addr);
char buffer[4096];
// 设置接收超时
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
setsockopt(sockfd, sol_socket, so_rcvtimeo, &tv, sizeof(tv));
// 接收数据
ssize_t recv_len = recvfrom(sockfd, buffer, sizeof(buffer) - 1,
msg_dontwait, // 非阻塞模式
(struct sockaddr*)&client_addr, &addr_len);
if (recv_len > 0) {
buffer[recv_len] = '\0';
// 获取客户端信息
char ip_str[inet_addrstrlen];
inet_ntop(af_inet, &client_addr.sin_addr, ip_str, sizeof(ip_str));
uint16_t port = ntohs(client_addr.sin_port);
log_info("received from %s:%d: %s", ip_str, port, buffer);
// 处理不同的消息类型
processmessage(buffer, recv_len, client_addr);
} else if (recv_len == 0) {
log_debug("received empty datagram");
} else {
// 错误处理
if (errno == eagain || errno == ewouldblock) {
log_debug("no data available (non-blocking)");
} else if (errno == eintr) {
log_debug("interrupted by signal");
} else {
log_error("receive error: %s", strerror(errno));
}
}
}
/**
 * Minimal text protocol dispatcher.
 *
 * Recognised requests (matched on the leading bytes of the payload):
 *   "ping"        -> replies "pong"
 *   "time"        -> replies with the ctime() text of the current time
 *   "echo <text>" -> replies with <text> (at least one byte after the space)
 * Anything else is answered with "unknown command: " followed by the payload.
 *
 * @param data        received payload
 * @param len         number of payload bytes
 * @param client_addr address the reply is sent to
 */
void processmessage(const char* data, ssize_t len,
                    const struct sockaddr_in& client_addr) {
    if (len >= 4 && strncmp(data, "ping", 4) == 0) {
        log_debug("received ping request");
        sendresponse("pong", 4, client_addr);
    } else if (len >= 4 && strncmp(data, "time", 4) == 0) {
        time_t now = time(nullptr);
        // ctime() may return a null pointer on failure; never build a
        // std::string from it in that case.
        const char* stamp = ctime(&now);
        std::string time_str = (stamp != nullptr) ? stamp : "";
        sendresponse(time_str.c_str(), time_str.length(), client_addr);
    } else if (len > 5 && strncmp(data, "echo ", 5) == 0) {
        // The prefix "echo " is 5 bytes, so any payload longer than that
        // carries text to echo (a one-character message is valid too).
        sendresponse(data + 5, len - 5, client_addr);
    } else {
        std::string response = "unknown command: ";
        response.append(data, len);
        sendresponse(response.c_str(), response.length(), client_addr);
    }
}
2.sendto函数
sendto是UDP发送数据的核心函数,用于向指定地址发送数据报。
/**
* sendto函数原型:
* ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
* const struct sockaddr *dest_addr, socklen_t addrlen);
*
* 参数说明:
* - sockfd: 套接字描述符
* - buf: 发送缓冲区
* - len: 要发送的数据长度
* - flags: 标志位,常用值:
* * 0: 默认行为
* * msg_dontwait: 非阻塞模式
* * msg_confirm: 确认路由有效(linux特有)
* * msg_more: 还有更多数据要发送
* - dest_addr: 目标地址
* - addrlen: 地址长度
*
* 返回值:
* - 成功:发送的字节数(对UDP而言,数据报要么整体发送,要么失败,不会只发送一部分)
* - 失败:-1,设置errno
*/
// sendto的完整示例
bool senddata(int sockfd, const void* data, size_t len,
const struct sockaddr_in& dest_addr) {
if (len == 0) {
log_warn("attempting to send zero-length data");
return true;
}
// 检查数据包大小(udp最大约64kb,实际建议小于1500字节避免分片)
if (len > 65507) { // 65535 - 20(ip头) - 8(udp头)
log_error("datagram too large: %zu bytes (max: 65507)", len);
return false;
}
if (len > 1400) {
log_warn("large datagram: %zu bytes (may be fragmented)", len);
}
// 分块发送大数据(如果需要)
const size_t max_chunk = 1400; // 避免ip分片的推荐大小
size_t total_sent = 0;
while (total_sent < len) {
size_t chunk_size = std::min(max_chunk, len - total_sent);
const char* chunk_start = static_cast<const char*>(data) + total_sent;
ssize_t sent = sendto(sockfd, chunk_start, chunk_size, 0,
(const struct sockaddr*)&dest_addr,
sizeof(dest_addr));
if (sent < 0) {
log_error("failed to send chunk: %s (sent %zu/%zu bytes)",
strerror(errno), total_sent, len);
return false;
}
total_sent += sent;
// 添加小延迟避免拥塞
if (chunk_size == max_chunk && total_sent < len) {
usleep(1000); // 1ms延迟
}
}
log_debug("successfully sent %zu bytes to %s:%d",
total_sent,
inet_ntoa(dest_addr.sin_addr),
ntohs(dest_addr.sin_port));
return true;
}
/**
 * Demonstrates sending several kinds of payload through senddata(): a text
 * string, a binary structure, a JSON document and a numbered message series.
 *
 * @param sockfd    an open datagram socket
 * @param dest_addr destination address
 */
void sendvariousmessages(int sockfd, const struct sockaddr_in& dest_addr) {
    // 1. Plain text.
    const char* text = "hello, udp server!";
    senddata(sockfd, text, strlen(text), dest_addr);

    // 2. Binary data.
    struct binarydata {
        uint32_t magic;
        uint16_t version;
        uint8_t type;
        uint8_t data[256];
    };
    binarydata binary_msg;
    // Zero the whole object first: sizeof() includes any padding the compiler
    // adds, and those bytes would otherwise go out on the wire uninitialized.
    memset(&binary_msg, 0, sizeof(binary_msg));
    binary_msg.magic = htonl(0xdeadbeef);
    binary_msg.version = htons(1);
    binary_msg.type = 0x42;
    memset(binary_msg.data, 0xaa, sizeof(binary_msg.data));
    senddata(sockfd, &binary_msg, sizeof(binary_msg), dest_addr);

    // 3. Structured data (JSON), assembled from two raw string literals
    //    around the current timestamp.
    std::string json_msg = R"({
"command": "update",
"timestamp": )" + std::to_string(time(nullptr)) + R"(,
"data": {"temperature": 23.5, "humidity": 65.2}
})";
    senddata(sockfd, json_msg.c_str(), json_msg.length(), dest_addr);

    // 4. Ten numbered messages, 100 ms apart.
    for (int i = 0; i < 10; i++) {
        std::string seq_msg = "message #" + std::to_string(i);
        senddata(sockfd, seq_msg.c_str(), seq_msg.length(), dest_addr);
        usleep(100000);  // 100 ms
    }
}
1.6 高级功能:多线程处理和连接管理
对于高性能UDP服务器,我们需要考虑多线程处理和客户端连接管理。
/**
 * udpserver extension that receives on the calling thread and processes
 * datagrams on a pool of worker threads.
 *
 * run() is the producer: it pushes every received datagram (payload copy plus
 * sender address) onto task_queue_. The workers are the consumers: each pops
 * one task at a time and handles it in processtask().
 *
 * Worker lifetime is tied to run(): workers are started after is_running_ has
 * been set and are joined before run() returns, after draining the queue.
 *
 * This class needs the base class to declare init()/run() virtual and to make
 * sockfd_, port_, buffer_size_, is_running_, sendresponse() and cleanup()
 * accessible to derived classes.
 */
class advancedudpserver : public udpserver {
private:
    std::vector<std::thread> worker_threads_;
    std::atomic<int> thread_count_;  // number of workers currently inside workerthread()
    int max_workers_;                // number of workers started by run()

    // Work queue shared between run() and the workers; guarded by queue_mutex_.
    std::queue<std::pair<std::vector<char>, sockaddr_in>> task_queue_;
    std::mutex queue_mutex_;
    std::condition_variable queue_cv_;

public:
    /// @param port        UDP port to bind (default 8080)
    /// @param max_workers number of worker threads (default 4)
    advancedudpserver(int port = 8080, int max_workers = 4)
        : udpserver(port), thread_count_(0), max_workers_(max_workers) {}

    /// Stops the server and joins any worker that is still running, so no
    /// joinable std::thread is ever destroyed.
    ~advancedudpserver() {
        stop();
        is_running_ = false;
        joinworkers();
    }

    /// Initializes the underlying socket; the workers are started by run().
    bool init() override {
        return udpserver::init();
    }

    /// Receive loop: queues every datagram for the workers until the server is
    /// stopped or a fatal receive error occurs, then shuts the pool down.
    void run() override {
        if (sockfd_ < 0) {
            log_error("socket not initialized");
            return;
        }
        // One byte is reserved for a terminator, so the buffer must hold at
        // least 2 bytes; this also keeps `buffer.size() - 1` from wrapping.
        if (buffer_size_ < 2) {
            log_error("buffer size %zu is too small", buffer_size_);
            return;
        }

        // The flag must be set before the workers start: their wait predicate
        // treats "not running and queue empty" as the signal to exit.
        is_running_ = true;
        for (int i = 0; i < max_workers_; i++) {
            worker_threads_.emplace_back(&advancedudpserver::workerthread, this, i);
        }
        log_info("started %d worker threads", max_workers_);
        log_info("advanced udp server started on port %d", port_);

        std::vector<char> buffer(buffer_size_);
        while (is_running_) {
            struct sockaddr_in client_addr;
            socklen_t addr_len = sizeof(client_addr);

            ssize_t recv_len = recvfrom(sockfd_, buffer.data(),
                                        buffer.size() - 1, 0,
                                        (struct sockaddr*)&client_addr,
                                        &addr_len);
            if (recv_len < 0) {
                if (errno == eagain || errno == ewouldblock) {
                    continue;  // timeout: re-check is_running_
                } else if (errno == eintr) {
                    continue;  // interrupted: re-check is_running_
                } else {
                    log_error("receive error: %s", strerror(errno));
                    break;
                }
            }

            if (recv_len > 0) {
                buffer[recv_len] = '\0';

                // Queue a copy of the payload; read the queue length while the
                // lock is still held so the statistic is not a data race.
                size_t pending = 0;
                {
                    std::lock_guard<std::mutex> lock(queue_mutex_);
                    task_queue_.emplace(
                        std::vector<char>(buffer.begin(), buffer.begin() + recv_len),
                        client_addr);
                    pending = task_queue_.size();
                }
                queue_cv_.notify_one();

                if (pending > 10) {
                    log_warn("task queue size: %zu", pending);
                }
            }
        }

        // The loop can also end through `break` on a fatal error, with the
        // flag still set; clear it so the workers are allowed to exit.
        is_running_ = false;
        joinworkers();

        log_info("advanced udp server stopped");
        cleanup();
    }

private:
    /// Wakes every worker and waits for all of them to finish.
    void joinworkers() {
        // Acquire and release the mutex before notifying: is_running_ is
        // changed without the lock, so this guarantees that a worker which
        // has evaluated the predicate but not yet blocked cannot miss the
        // wake-up.
        { std::lock_guard<std::mutex> lock(queue_mutex_); }
        queue_cv_.notify_all();

        for (auto& worker : worker_threads_) {
            if (worker.joinable()) {
                worker.join();
            }
        }
        worker_threads_.clear();
    }

    /// Worker body: processes queued tasks until the server has stopped and
    /// the queue is empty.
    void workerthread(int thread_id) {
        thread_count_++;
        log_debug("worker thread %d started", thread_id);

        for (;;) {
            std::pair<std::vector<char>, sockaddr_in> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                queue_cv_.wait(lock, [this]() {
                    return !task_queue_.empty() || !is_running_;
                });
                // Woken with nothing queued means the server is stopping.
                if (task_queue_.empty()) {
                    break;
                }
                task = std::move(task_queue_.front());
                task_queue_.pop();
            }
            // Handle the task outside the lock so workers run in parallel.
            processtask(task.first, task.second, thread_id);
        }

        thread_count_--;
        log_debug("worker thread %d stopped", thread_id);
    }

    /// Handles one datagram: logs it, waits 10 ms to simulate work and replies
    /// with "thread <id> processed: <payload>".
    void processtask(const std::vector<char>& data,
                     const sockaddr_in& client_addr,
                     int thread_id) {
        char client_ip[inet_addrstrlen];
        inet_ntop(af_inet, &client_addr.sin_addr,
                  client_ip, sizeof(client_ip));
        uint16_t client_port = ntohs(client_addr.sin_port);
        log_debug("thread %d processing %zu bytes from %s:%d",
                  thread_id, data.size(), client_ip, client_port);

        // Simulated processing time.
        std::this_thread::sleep_for(std::chrono::milliseconds(10));

        std::string response = "thread " + std::to_string(thread_id) +
                               " processed: " + std::string(data.begin(), data.end());
        sendresponse(response.c_str(), response.length(), client_addr);
    }
};
/**
 * Tracks the clients that have sent datagrams, keyed by "ip:port".
 *
 * For every client it records the address, the time of the last datagram and
 * packet/byte counters. All public methods lock clients_mutex_ around every
 * access to the client table.
 */
class connectionmanager {
private:
    struct clientinfo {
        sockaddr_in address;     // client address as received
        time_t last_activity;    // time of the most recent datagram
        uint64_t packet_count;   // datagrams seen from this client
        uint64_t total_bytes;    // bytes seen from this client

        explicit clientinfo(const sockaddr_in& addr)
            : address(addr), last_activity(time(nullptr)),
              packet_count(0), total_bytes(0) {}
    };

    std::unordered_map<std::string, clientinfo> clients_;
    mutable std::mutex clients_mutex_;  // mutable: getstats() is const but must lock
    time_t cleanup_interval_;           // inactivity threshold in seconds

public:
    /// @param cleanup_interval seconds of inactivity after which a client is
    ///        removed by cleanupinactiveclients() (default 300 = 5 minutes)
    connectionmanager(time_t cleanup_interval = 300)
        : cleanup_interval_(cleanup_interval) {}

    /// Records one datagram of `bytes` bytes from `addr`, registering the
    /// client first if it has not been seen before.
    void updateclient(const sockaddr_in& addr, size_t bytes) {
        const std::string key = getclientkey(addr);
        std::lock_guard<std::mutex> lock(clients_mutex_);

        // emplace leaves an existing entry untouched and reports whether a
        // new one was inserted, so a single lookup covers both cases.
        auto result = clients_.emplace(key, clientinfo(addr));
        clientinfo& info = result.first->second;
        if (result.second) {
            char ip_str[inet_addrstrlen];
            inet_ntop(af_inet, &addr.sin_addr, ip_str, sizeof(ip_str));
            log_info("new client connected: %s:%d",
                     ip_str, ntohs(addr.sin_port));
        }

        info.last_activity = time(nullptr);
        info.packet_count++;
        info.total_bytes += bytes;
    }

    /// Removes every client whose last datagram is older than the cleanup
    /// interval and logs its final statistics.
    void cleanupinactiveclients() {
        const time_t now = time(nullptr);
        size_t removed = 0;
        {
            std::lock_guard<std::mutex> lock(clients_mutex_);
            for (auto it = clients_.begin(); it != clients_.end();) {
                const clientinfo& client = it->second;
                if (now - client.last_activity > cleanup_interval_) {
                    char ip_str[inet_addrstrlen];
                    inet_ntop(af_inet, &client.address.sin_addr,
                              ip_str, sizeof(ip_str));
                    // uint64_t is printed through unsigned long long so the
                    // conversion matches on every platform.
                    log_info("client %s:%d disconnected (inactive). "
                             "packets: %llu, bytes: %llu",
                             ip_str, ntohs(client.address.sin_port),
                             static_cast<unsigned long long>(client.packet_count),
                             static_cast<unsigned long long>(client.total_bytes));
                    it = clients_.erase(it);  // erase returns the next valid iterator
                    ++removed;
                } else {
                    ++it;
                }
            }
        }
        if (removed > 0) {
            log_info("cleaned up %zu inactive clients", removed);
        }
    }

    /// @return a multi-line text report: the client count followed by one
    ///         line per client.
    std::string getstats() const {
        std::lock_guard<std::mutex> lock(clients_mutex_);
        std::stringstream ss;
        ss << "active clients: " << clients_.size() << "\n";
        for (const auto& pair : clients_) {
            char ip_str[inet_addrstrlen];
            inet_ntop(af_inet, &pair.second.address.sin_addr,
                      ip_str, sizeof(ip_str));
            ss << ip_str << ":" << ntohs(pair.second.address.sin_port)
               << " - packets: " << pair.second.packet_count
               << ", bytes: " << pair.second.total_bytes
               << ", last activity: "
               << (time(nullptr) - pair.second.last_activity)
               << " seconds ago\n";
        }
        return ss.str();
    }

private:
    /// Builds the table key "a.b.c.d:port" for an address. Uses inet_ntop with
    /// a local buffer because this runs before the mutex is taken and may be
    /// called from several threads at once.
    std::string getclientkey(const sockaddr_in& addr) const {
        char ip_str[inet_addrstrlen] = {0};
        inet_ntop(af_inet, &addr.sin_addr, ip_str, sizeof(ip_str));
        std::stringstream ss;
        ss << ip_str << ":" << ntohs(addr.sin_port);
        return ss.str();
    }
};
二、main.cc实现
主程序负责初始化服务器并处理命令行参数。
#include <unistd.h>
#include <csignal>
#include <cstdlib>
#include <iostream>
#include <limits>
#include <memory>
#include "udpserver.hpp"
#include "advancedudpserver.hpp"
// Global server pointer so the signal handler can reach the running server.
std::unique_ptr<udpserver> g_server;

/**
 * Signal handler: announces the signal and asks the server to stop.
 *
 * The message is assembled by hand and emitted with write(), because stream
 * I/O (std::cout) is not async-signal-safe and may deadlock or corrupt state
 * when the signal interrupts another stream operation.
 *
 * NOTE(review): this assumes udpserver::stop() itself is safe to call from a
 * signal handler — confirm against its definition.
 */
void signalhandler(int signal) {
    char message[64];
    size_t used = 0;

    const char prefix[] = "\nreceived signal ";
    for (size_t i = 0; i + 1 < sizeof(prefix); ++i) {
        message[used++] = prefix[i];
    }

    // Decimal rendering of the signal number (digits come out reversed).
    unsigned int magnitude = static_cast<unsigned int>(signal);
    if (signal < 0) {
        message[used++] = '-';
        magnitude = 0u - magnitude;
    }
    char digits[16];
    size_t digit_count = 0;
    do {
        digits[digit_count++] = static_cast<char>('0' + magnitude % 10);
        magnitude /= 10;
    } while (magnitude != 0);
    while (digit_count > 0) {
        message[used++] = digits[--digit_count];
    }

    const char suffix[] = ", shutting down...\n";
    for (size_t i = 0; i + 1 < sizeof(suffix); ++i) {
        message[used++] = suffix[i];
    }

    // File descriptor 1 is standard output; nothing useful can be done here
    // if the write fails.
    const ssize_t written = write(1, message, used);
    (void)written;

    if (g_server) {
        g_server->stop();
    }
}
// 显示使用帮助
/**
 * Prints the command-line help text to standard output.
 *
 * @param program_name name shown in the usage line and in the examples
 */
void showusage(const char* program_name) {
    std::ostream& out = std::cout;

    // Banner and usage line.
    out << "udp server v1.0\n\n"
        << "usage: " << program_name << " [options]\n\n";

    // Option summary.
    out << "options:\n"
        << " -p, --port port server port (default: 8080)\n"
        << " -b, --buffer size buffer size in bytes (default: 4096)\n"
        << " -t, --timeout sec receive timeout in seconds (default: 5)\n"
        << " -w, --workers num number of worker threads (default: 1)\n"
        << " -a, --advanced use advanced server with thread pool\n"
        << " -h, --help show this help message\n";

    // Example invocations.
    out << "\nexamples:\n"
        << " " << program_name << " -p 9000 -b 8192\n"
        << " " << program_name << " --port 8080 --workers 4 --advanced\n";
}
// 解析命令行参数
/// Settings collected from the command line; every field carries the default
/// that applies when the corresponding option is absent.
struct serverconfig {
    int port{8080};            ///< UDP port to bind
    size_t buffer_size{4096};  ///< receive buffer size in bytes
    int timeout_sec{5};        ///< receive timeout, seconds part
    int timeout_usec{0};       ///< receive timeout, microseconds part
    int workers{1};            ///< worker thread count
    bool advanced{false};      ///< use the thread-pool server
    bool reuse_addr{true};     ///< request address reuse on the socket
};
// Parses `text` as a complete base-10 integer within [lo, hi].
// Returns false for empty input, trailing garbage or an out-of-range value.
static bool parseintinrange(const char* text, long lo, long hi, long& out) {
    char* end = nullptr;
    const long value = std::strtol(text, &end, 10);
    if (end == text || *end != '\0') {
        return false;  // no digits at all, or characters left over
    }
    if (value < lo || value > hi) {
        return false;  // also rejects the saturated result strtol yields on overflow
    }
    out = value;
    return true;
}

/**
 * Builds a serverconfig from the command line.
 *
 * Terminates the process with exit(1) on an unknown option, a missing option
 * value or an invalid value, and with exit(0) after printing the help text.
 *
 * @param argc argument count as passed to main
 * @param argv argument vector as passed to main
 * @return the parsed configuration; options that were not given keep their
 *         serverconfig defaults
 */
serverconfig parsearguments(int argc, char* argv[]) {
    serverconfig config;

    // Returns the value following option `i`, or exits if there is none.
    auto requirevalue = [&](int& i, const std::string& option) -> const char* {
        if (i + 1 >= argc) {
            std::cerr << "error: option '" << option << "' requires a value" << std::endl;
            exit(1);
        }
        return argv[++i];
    };

    for (int i = 1; i < argc; i++) {
        const std::string arg = argv[i];
        long value = 0;

        if (arg == "-p" || arg == "--port") {
            if (!parseintinrange(requirevalue(i, arg), 1, 65535, value)) {
                std::cerr << "error: port must be between 1 and 65535" << std::endl;
                exit(1);
            }
            config.port = static_cast<int>(value);
        } else if (arg == "-b" || arg == "--buffer") {
            if (!parseintinrange(requirevalue(i, arg), 1024, 65536, value)) {
                std::cerr << "error: buffer size must be between 1024 and 65536" << std::endl;
                exit(1);
            }
            config.buffer_size = static_cast<size_t>(value);
        } else if (arg == "-t" || arg == "--timeout") {
            if (!parseintinrange(requirevalue(i, arg), 0,
                                 std::numeric_limits<int>::max(), value)) {
                std::cerr << "error: timeout must be non-negative" << std::endl;
                exit(1);
            }
            config.timeout_sec = static_cast<int>(value);
        } else if (arg == "-w" || arg == "--workers") {
            if (!parseintinrange(requirevalue(i, arg), 1, 32, value)) {
                std::cerr << "error: number of workers must be between 1 and 32" << std::endl;
                exit(1);
            }
            config.workers = static_cast<int>(value);
        } else if (arg == "-a" || arg == "--advanced") {
            config.advanced = true;
        } else if (arg == "-h" || arg == "--help") {
            showusage(argv[0]);
            exit(0);
        } else if (arg == "--no-reuse") {
            config.reuse_addr = false;
        } else {
            std::cerr << "error: unknown option '" << arg << "'" << std::endl;
            showusage(argv[0]);
            exit(1);
        }
    }
    return config;
}
/**
 * Program entry point: parses the command line, installs the signal handlers,
 * creates and configures the requested server type, then runs it.
 *
 * @return 0 after a normal shutdown, 1 if initialization fails or an
 *         exception escapes.
 */
int main(int argc, char* argv[]) {
    serverconfig config = parsearguments(argc, argv);

    // Install the handlers with sigaction and without the SA_RESTART flag:
    // a blocking recvfrom must fail with EINTR when a signal arrives so the
    // receive loop can notice the stop request even when no receive timeout
    // is configured. Plain signal() may install restarting handlers.
    struct sigaction action = {};
    action.sa_handler = signalhandler;
    sigemptyset(&action.sa_mask);
    action.sa_flags = 0;
    sigaction(sigint, &action, nullptr);
    sigaction(sigterm, &action, nullptr);

    try {
        std::cout << "=== udp server starting ===\n";
        std::cout << "port: " << config.port << "\n";
        std::cout << "buffer size: " << config.buffer_size << " bytes\n";
        std::cout << "timeout: " << config.timeout_sec << " seconds\n";
        std::cout << "workers: " << config.workers << "\n";
        std::cout << "mode: " << (config.advanced ? "advanced" : "basic") << "\n";
        std::cout << "===========================\n\n";

        // Create the server instance.
        if (config.advanced) {
            g_server = std::make_unique<advancedudpserver>(config.port, config.workers);
        } else {
            g_server = std::make_unique<udpserver>(config.port);
        }

        // Apply the configuration before init() so the socket options use it.
        g_server->setbuffersize(config.buffer_size);
        g_server->settimeout(config.timeout_sec, config.timeout_usec);
        g_server->setreuseaddr(config.reuse_addr);

        if (!g_server->init()) {
            std::cerr << "failed to initialize server" << std::endl;
            return 1;
        }
        std::cout << "server initialized successfully\n";
        std::cout << "press ctrl+c to stop the server\n\n";

        // Blocks until the server stops.
        g_server->run();
    } catch (const std::exception& e) {
        std::cerr << "exception: " << e.what() << std::endl;
        return 1;
    } catch (...) {
        std::cerr << "unknown exception occurred" << std::endl;
        return 1;
    }

    std::cout << "\nserver stopped gracefully" << std::endl;
    return 0;
}
/**
 * Loopback throughput test: starts an advancedudpserver on `port` in a
 * background thread, sends up to 10000 datagrams of 1024 bytes to it from a
 * local client socket and prints timing statistics.
 *
 * @param port UDP port used by the temporary test server
 */
void runperformancetest(int port) {
    std::cout << "\n=== performance test ===\n";

    // Test server: 4 workers, 64 KiB buffer, 1 s receive timeout.
    auto test_server = std::make_unique<advancedudpserver>(port, 4);
    test_server->setbuffersize(65536);
    test_server->settimeout(1, 0);
    if (!test_server->init()) {
        std::cerr << "failed to initialize test server" << std::endl;
        return;
    }

    // Run the server in the background.
    std::thread server_thread([&test_server]() {
        test_server->run();
    });

    // Every exit path below must go through this: destroying a std::thread
    // that is still joinable terminates the program.
    auto stopserver = [&test_server, &server_thread]() {
        test_server->stop();
        if (server_thread.joinable()) {
            server_thread.join();
        }
    };

    // Give the server time to start.
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // Test client.
    int client_sock = socket(af_inet, sock_dgram, 0);
    if (client_sock < 0) {
        std::cerr << "failed to create test client socket" << std::endl;
        stopserver();
        return;
    }
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = af_inet;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = htonl(inaddr_loopback);

    // Test parameters.
    const int num_packets = 10000;
    const int packet_size = 1024;
    std::vector<char> test_data(packet_size, 'x');
    int packets_sent = 0;  // datagrams actually handed to sendto()

    auto start_time = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < num_packets; i++) {
        // Stamp the sequence number into the first bytes of the payload.
        memcpy(test_data.data(), &i, sizeof(i));
        ssize_t sent = sendto(client_sock, test_data.data(), packet_size, 0,
                              (struct sockaddr*)&server_addr, sizeof(server_addr));
        if (sent != packet_size) {
            std::cerr << "failed to send packet " << i << std::endl;
            break;
        }
        ++packets_sent;

        // Progress line every 1000 packets.
        if ((i + 1) % 1000 == 0) {
            std::cout << "sent " << (i + 1) << " packets..." << std::endl;
        }
        // Small pause to avoid flooding the socket.
        usleep(10);
    }
    auto end_time = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
        end_time - start_time);

    close(client_sock);
    stopserver();

    // Use at least 1 ms as the divisor so a very fast run cannot divide by zero.
    const double elapsed_ms = duration.count() > 0
                                  ? static_cast<double>(duration.count())
                                  : 1.0;
    const double total_bytes = static_cast<double>(packets_sent) * packet_size;

    // Results are based on the packets that were really sent, which can be
    // fewer than num_packets when the loop stopped on a send failure.
    std::cout << "\nperformance test results:\n";
    std::cout << "packets sent: " << packets_sent << "\n";
    std::cout << "packet size: " << packet_size << " bytes\n";
    std::cout << "total data: "
              << (total_bytes / 1024.0 / 1024.0)
              << " mb\n";
    std::cout << "total time: " << duration.count() << " ms\n";
    std::cout << "throughput: "
              << (total_bytes * 8.0 / elapsed_ms / 1000.0)
              << " mbps\n";
    std::cout << "packets per second: "
              << (packets_sent * 1000.0 / elapsed_ms)
              << "\n";
}
三、UDP客户端udpclient.cc
3.1 基本框架设计
UDP客户端的设计需要简洁高效,支持多种操作模式。
#ifndef udpclient_h
#define udpclient_h
#include <iostream>
#include <string>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <vector>
#include <chrono>
#include <thread>
#include <atomic>
#include <memory>
#include <iomanip>
/**
 * UDP client bound to one server address.
 *
 * Offers plain send/receive, a request/response helper, an interactive
 * console mode and a performance-test mode, and keeps running totals of the
 * packets and bytes sent and received.
 */
class udpclient {
private:
    // --- connection ---------------------------------------------------------
    int sockfd_;                       // socket descriptor
    struct sockaddr_in server_addr_;   // server address used as the send destination
    std::string server_ip_;            // server IP as text
    int server_port_;                  // server port

    // --- state --------------------------------------------------------------
    std::atomic<bool> is_connected_;
    std::atomic<bool> is_running_;

    // --- statistics ---------------------------------------------------------
    uint64_t packets_sent_;
    uint64_t packets_received_;
    uint64_t bytes_sent_;
    uint64_t bytes_received_;

public:
    /// @param ip   server IPv4 address in dotted-decimal form
    /// @param port server UDP port
    udpclient(const std::string& ip = "127.0.0.1", int port = 8080);

    ~udpclient();

    /// Creates and configures the socket. @return false on failure.
    bool init();

    /// Checks that the server answers a test message. @return true if it does.
    bool connect();

    /// Sends the contents of `data` to the server.
    bool send(const std::string& data);

    /// Sends `len` raw bytes starting at `data` to the server.
    bool send(const void* data, size_t len);

    /// Waits for one datagram and stores its payload in `data`.
    /// @param timeout_ms receive timeout in milliseconds
    bool receive(std::string& data, int timeout_ms = 5000);

    /// Sends `send_data`, then waits for one datagram into `recv_data`.
    bool sendandreceive(const std::string& send_data,
                        std::string& recv_data,
                        int timeout_ms = 5000);

    /// Runs the interactive console mode.
    void runinteractive();

    /// Runs the performance-test mode.
    void runperformancetest(int num_packets = 1000,
                            int packet_size = 1024);

    /// Copies the current statistics into the output parameters.
    void getstats(uint64_t& sent_packets, uint64_t& received_packets,
                  uint64_t& sent_bytes, uint64_t& received_bytes) const;

    /// Resets the statistics.
    void resetstats();

    /// Disconnects from the server.
    void disconnect();

private:
    /// Creates the UDP socket and stores it in sockfd_.
    bool createsocket();

    /// Applies timeout/broadcast/buffer options to sockfd_.
    bool setsocketoptions();

    /// Prints the client status.
    void printstatus() const;

    /// Prints the interactive-mode help text.
    void showhelp() const;
};
#endif // udpclient_h
3.2 创建套接字和连接
#include "udpclient.h"
/**
 * Stores the server endpoint and prepares server_addr_ for later sends.
 * No socket is opened here; sockfd_ starts at -1.
 *
 * @param ip   server IPv4 address in dotted-decimal form; if it cannot be
 *             parsed an error is printed and the address field stays zeroed
 * @param port server UDP port
 */
udpclient::udpclient(const std::string& ip, int port)
    // Listed in declaration order, which is the order members are really
    // initialized in regardless of how the list is written.
    : sockfd_(-1),
      server_ip_(ip), server_port_(port),
      is_connected_(false), is_running_(false),
      packets_sent_(0), packets_received_(0),
      bytes_sent_(0), bytes_received_(0) {
    // Build the server address: IPv4, port in network byte order.
    memset(&server_addr_, 0, sizeof(server_addr_));
    server_addr_.sin_family = af_inet;
    server_addr_.sin_port = htons(static_cast<uint16_t>(server_port_));

    // Convert the textual IP address.
    if (inet_pton(af_inet, server_ip_.c_str(), &server_addr_.sin_addr) <= 0) {
        std::cerr << "invalid ip address: " << server_ip_ << std::endl;
    }
}
/// Delegates all teardown to disconnect().
udpclient::~udpclient()
{
    this->disconnect();
}
/**
 * Creates the client socket and applies its options.
 *
 * @return true on success. If option setup fails the socket is closed and
 *         sockfd_ is reset to -1, so later calls report "socket not
 *         initialized" instead of using (or closing again) a dead descriptor.
 */
bool udpclient::init() {
    // Create the socket.
    if (!createsocket()) {
        std::cerr << "failed to create socket" << std::endl;
        return false;
    }

    // Apply socket options.
    if (!setsocketoptions()) {
        std::cerr << "failed to set socket options" << std::endl;
        close(sockfd_);
        sockfd_ = -1;  // the other methods treat a negative fd as "no socket"
        return false;
    }

    std::cout << "udp client initialized" << std::endl;
    std::cout << "server: " << server_ip_ << ":" << server_port_ << std::endl;
    return true;
}
/**
 * Opens an IPv4 datagram socket and stores the descriptor in sockfd_.
 *
 * @return true if socket() succeeded, false otherwise (the errno text is
 *         printed to standard error).
 */
bool udpclient::createsocket() {
    sockfd_ = socket(af_inet, sock_dgram, 0);

    const bool failed = sockfd_ < 0;
    if (failed) {
        std::cerr << "socket creation failed: " << strerror(errno) << std::endl;
        return false;
    }

    std::cout << "socket created successfully (fd: " << sockfd_ << ")" << std::endl;
    return true;
}
/**
 * Configures sockfd_: a 5 second receive timeout (mandatory), broadcast
 * permission and 1 MiB receive/send buffers (each best-effort).
 *
 * @return false only if the receive timeout cannot be set; failures of the
 *         other options are printed as warnings.
 */
bool udpclient::setsocketoptions() {
    // Receive timeout: the only option whose failure aborts setup.
    struct timeval recv_timeout;
    recv_timeout.tv_sec = 5;
    recv_timeout.tv_usec = 0;
    const int timeout_rc = setsockopt(sockfd_, sol_socket, so_rcvtimeo,
                                      &recv_timeout, sizeof(recv_timeout));
    if (timeout_rc < 0) {
        std::cerr << "failed to set receive timeout: " << strerror(errno) << std::endl;
        return false;
    }

    // Allow broadcast sends (in case they are needed).
    int broadcast_on = 1;
    const int broadcast_rc = setsockopt(sockfd_, sol_socket, so_broadcast,
                                        &broadcast_on, sizeof(broadcast_on));
    if (broadcast_rc < 0) {
        std::cerr << "warning: failed to enable broadcast: " << strerror(errno) << std::endl;
    }

    // Kernel buffers: 1 MiB each way.
    int kernel_buf_bytes = 1024 * 1024;
    const int rcvbuf_rc = setsockopt(sockfd_, sol_socket, so_rcvbuf,
                                     &kernel_buf_bytes, sizeof(kernel_buf_bytes));
    if (rcvbuf_rc < 0) {
        std::cerr << "warning: failed to set receive buffer: " << strerror(errno) << std::endl;
    }
    const int sndbuf_rc = setsockopt(sockfd_, sol_socket, so_sndbuf,
                                     &kernel_buf_bytes, sizeof(kernel_buf_bytes));
    if (sndbuf_rc < 0) {
        std::cerr << "warning: failed to set send buffer: " << strerror(errno) << std::endl;
    }

    return true;
}
/**
 * Reachability check. UDP has no connections, so this sends the test message
 * "connect_test" and waits up to 3000 ms for any reply.
 *
 * @return true (and is_connected_ set) if a reply arrived; false if the
 *         socket is not initialized or the exchange failed.
 */
bool udpclient::connect() {
    if (sockfd_ < 0) {
        std::cerr << "socket not initialized" << std::endl;
        return false;
    }

    const std::string probe = "connect_test";
    std::string reply;
    const bool answered = sendandreceive(probe, reply, 3000);
    if (!answered) {
        std::cerr << "failed to connect to server" << std::endl;
        return false;
    }

    std::cout << "successfully connected to server" << std::endl;
    std::cout << "server response: " << reply << std::endl;
    is_connected_ = true;
    return true;
}
3.3 发送和接收数据
/// Convenience overload: forwards the string's bytes to the raw-buffer overload.
bool udpclient::send(const std::string& data) {
    const char* bytes = data.c_str();
    const size_t count = data.length();
    return send(bytes, count);
}
/**
 * Sends one datagram with `len` bytes from `data` to the configured server
 * and updates the send statistics.
 *
 * @return false if the socket is not initialized, `len` is 0 or larger than
 *         65507, or sendto() fails; true otherwise (a short send is reported
 *         on standard error but still counts as success).
 */
bool udpclient::send(const void* data, size_t len) {
    // Preconditions.
    if (sockfd_ < 0) {
        std::cerr << "socket not initialized" << std::endl;
        return false;
    }
    if (len == 0) {
        std::cerr << "attempting to send empty data" << std::endl;
        return false;
    }
    if (len > 65507) {
        std::cerr << "data too large: " << len << " bytes (max: 65507)" << std::endl;
        return false;
    }

    // Transmit.
    struct sockaddr* destination = reinterpret_cast<struct sockaddr*>(&server_addr_);
    const ssize_t written = sendto(sockfd_, data, len, 0,
                                   destination, sizeof(server_addr_));
    if (written < 0) {
        std::cerr << "send failed: " << strerror(errno) << std::endl;
        return false;
    }
    if (static_cast<size_t>(written) != len) {
        std::cerr << "partial send: " << written << " of " << len << " bytes" << std::endl;
    }

    // Bookkeeping.
    ++packets_sent_;
    bytes_sent_ += written;
    std::cout << "sent " << written << " bytes to "
              << server_ip_ << ":" << server_port_ << std::endl;
    return true;
}
bool udpclient::receive(std::string& data, int timeout_ms) {
if (sockfd_ < 0) {
std::cerr << "socket not initialized" << std::endl;
return false;
}
// 设置接收超时
if (timeout_ms > 0) {
struct timeval tv;
tv.tv_sec = timeout_ms / 1000;
tv.tv_usec = (timeout_ms % 1000) * 1000;
setsockopt(sockfd_, sol_socket, so_rcvtimeo, &tv, sizeof(tv));
}
// 接收缓冲区
char buffer[65536];
struct sockaddr_in from_addr;
socklen_t addr_len = sizeof(from_addr);
// 接收数据
ssize_t recv_len = recvfrom(sockfd_, buffer, sizeof(buffer) - 1, 0,
(struct sockaddr*)&from_addr, &addr_len);
if (recv_len < 0) {
if (errno == eagain || errno == ewouldblock) {
std::cout << "receive timeout" << std::endl;
} else {
std::cerr << "receive failed: " << strerror(errno) << std::endl;
}
return false;
}
// 确保字符串以null结尾
buffer[recv_len] = '\0';
data.assign(buffer, recv_len);
// 获取发送者信息
char from_ip[inet_addrstrlen];
inet_ntop(af_inet, &from_addr.sin_addr, from_ip, sizeof(from_ip));
uint16_t from_port = ntohs(from_addr.sin_port);
// 更新统计信息
packets_received_++;
bytes_received_ += recv_len;
std::cout << "received " << recv_len << " bytes from "
<< from_ip << ":" << from_port << std::endl;
return true;
}
/**
 * Request/response helper: sends `send_data`, then waits for one datagram.
 *
 * @param send_data  payload to send
 * @param recv_data  receives the reply payload
 * @param timeout_ms receive timeout in milliseconds
 * @return true only if both the send and the receive succeeded; the receive
 *         is not attempted when the send fails.
 */
bool udpclient::sendandreceive(const std::string& send_data,
                               std::string& recv_data,
                               int timeout_ms) {
    // Short-circuit evaluation skips the receive after a failed send.
    return send(send_data) && receive(recv_data, timeout_ms);
}
3.4 交互模式和性能测试
/**
 * Runs the interactive command prompt until the user quits, standard input
 * ends, or is_running_ is cleared.
 *
 * Connects first if the client is not connected yet. The commands help,
 * status, stats, reset, ping, time, echo, file and perftest are handled here;
 * any other non-empty line is sent verbatim and the reply is printed.
 */
void udpclient::runinteractive() {
    if (!is_connected_ && !connect()) {
        std::cerr << "cannot start interactive mode: not connected" << std::endl;
        return;
    }

    is_running_ = true;
    std::cout << "\n=== udp client interactive mode ===\n";
    std::cout << "type 'help' for commands, 'quit' to exit\n\n";

    std::string input;
    while (is_running_) {
        std::cout << "udp> ";
        // Stop on EOF or a stream error; otherwise the loop would spin forever
        // re-printing the prompt once stdin is closed.
        if (!std::getline(std::cin, input)) {
            std::cout << "\ninput closed, exiting..." << std::endl;
            break;
        }
        if (input.empty()) {
            continue;
        }

        // A prefix command matches only as the bare word or followed by a
        // space, so e.g. "echoes" is sent as a plain message.
        const bool is_echo = (input == "echo") || (input.compare(0, 5, "echo ") == 0);
        const bool is_file = (input == "file") || (input.compare(0, 5, "file ") == 0);

        if (input == "quit" || input == "exit") {
            std::cout << "exiting..." << std::endl;
            break;
        } else if (input == "help") {
            showhelp();
        } else if (input == "status") {
            printstatus();
        } else if (input == "stats") {
            std::cout << "\n=== statistics ===\n";
            std::cout << "packets sent: " << packets_sent_ << "\n";
            std::cout << "packets received: " << packets_received_ << "\n";
            std::cout << "bytes sent: " << bytes_sent_ << "\n";
            std::cout << "bytes received: " << bytes_received_ << "\n";
            if (packets_sent_ > 0) {
                std::cout << "average sent size: "
                          << (bytes_sent_ / packets_sent_) << " bytes\n";
            }
            if (packets_received_ > 0) {
                std::cout << "average received size: "
                          << (bytes_received_ / packets_received_) << " bytes\n";
            }
        } else if (input == "reset") {
            resetstats();
            std::cout << "statistics reset" << std::endl;
        } else if (input == "ping") {
            std::string response;
            if (sendandreceive("ping", response)) {
                std::cout << "server response: " << response << std::endl;
            }
        } else if (input == "time") {
            std::string response;
            if (sendandreceive("time", response)) {
                std::cout << "server time: " << response;
            }
        } else if (is_echo) {
            if (input.length() > 5) {
                const std::string echo_data = input.substr(5);
                std::string response;
                if (sendandreceive("echo " + echo_data, response)) {
                    std::cout << "echo: " << response << std::endl;
                }
            } else {
                std::cout << "usage: echo <message>" << std::endl;
            }
        } else if (is_file) {
            // Simulated file transfer: no real file is read.
            const std::string filename = input.length() > 5 ? input.substr(5) : "test.txt";
            std::cout << "simulating file transfer: " << filename << std::endl;

            // Build synthetic file content.
            std::string file_content;
            for (int i = 0; i < 100; i++) {
                file_content += "line " + std::to_string(i + 1) + ": this is test data\n";
            }

            // Send the content in fixed-size chunks, each prefixed with its number.
            const size_t chunk_size = 1024;
            size_t total_sent = 0;
            int chunk_num = 1;
            for (size_t i = 0; i < file_content.length(); i += chunk_size) {
                const size_t chunk_len = std::min(chunk_size, file_content.length() - i);
                const std::string chunk = file_content.substr(i, chunk_len);
                const std::string chunk_with_header =
                    "file_chunk " + std::to_string(chunk_num) + " " + chunk;
                if (send(chunk_with_header)) {
                    total_sent += chunk_len;
                    std::cout << "sent chunk " << chunk_num
                              << " (" << chunk_len << " bytes)" << std::endl;
                    chunk_num++;
                    usleep(10000);  // 10 ms pause between chunks
                } else {
                    std::cerr << "failed to send chunk " << chunk_num << std::endl;
                    break;
                }
            }
            std::cout << "file transfer complete: " << total_sent << " bytes sent" << std::endl;
        } else if (input == "perftest") {
            runperformancetest();
        } else {
            // Default: send the raw line and print the reply.
            std::string response;
            if (sendandreceive(input, response)) {
                std::cout << "response: " << response << std::endl;
            }
        }
    }
}
/**
 * Sends a burst of fixed-size packets at roughly 1000 packets/second, then
 * drains replies until a 2-second receive timeout and prints send rate,
 * throughput and estimated loss.
 *
 * Each packet starts with a 64-bit sequence number followed by a 64-bit
 * nanosecond timestamp, both in host byte order.
 *
 * @param num_packets  Number of packets to send; must be > 0.
 * @param packet_size  Size of each packet in bytes; must be at least 16 so the
 *                     sequence/timestamp header fits.
 */
void udpclient::runperformancetest(int num_packets, int packet_size) {
    const size_t header_size = 2 * sizeof(uint64_t);
    if (num_packets <= 0 || packet_size < static_cast<int>(header_size)) {
        std::cerr << "invalid performance test parameters: need num_packets > 0 and "
                  << "packet_size >= " << header_size << " bytes" << std::endl;
        return;
    }

    std::cout << "\n=== performance test ===\n";
    std::cout << "packets: " << num_packets << "\n";
    std::cout << "packet size: " << packet_size << " bytes\n";
    // Compute in double so large counts cannot overflow int.
    std::cout << "total data: "
              << (static_cast<double>(num_packets) * packet_size / 1024.0 / 1024.0)
              << " mb\n\n";

    // Payload filler; the header is overwritten for every packet.
    std::vector<char> test_data(packet_size, 'x');

    const auto start_time = std::chrono::high_resolution_clock::now();

    int successful_sends = 0;
    for (int i = 0; i < num_packets; i++) {
        const uint64_t seq = i;
        const auto timestamp = std::chrono::high_resolution_clock::now();
        const uint64_t timestamp_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
            timestamp.time_since_epoch()).count();

        // Stamp the sequence number and send time at the start of the packet.
        memcpy(test_data.data(), &seq, sizeof(seq));
        memcpy(test_data.data() + sizeof(seq), &timestamp_ns, sizeof(timestamp_ns));

        if (send(test_data.data(), packet_size)) {
            successful_sends++;
        }
        // Progress report every 100 packets.
        if ((i + 1) % 100 == 0) {
            std::cout << "sent " << (i + 1) << " packets..." << std::endl;
        }
        // Pace the sender at about 1000 packets/second.
        usleep(1000);
    }

    const auto end_time = std::chrono::high_resolution_clock::now();
    const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
        end_time - start_time);

    // Drain whatever replies arrive; the loop ends on the first timeout or error.
    std::cout << "\nwaiting for responses..." << std::endl;
    int responses_received = 0;
    const auto receive_start = std::chrono::high_resolution_clock::now();

    struct timeval tv;
    tv.tv_sec = 2;
    tv.tv_usec = 0;
    if (setsockopt(sockfd_, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
        std::cerr << "failed to set receive timeout: " << strerror(errno) << std::endl;
    } else {
        char buffer[65536];
        while (true) {
            const ssize_t recv_len = recvfrom(sockfd_, buffer, sizeof(buffer), 0,
                                              nullptr, nullptr);
            if (recv_len <= 0) {
                break;  // timeout or error
            }
            // A reply's leading 8 bytes carry the sequence number, should
            // per-packet round-trip times be wanted later.
            responses_received++;
        }
    }

    const auto receive_end = std::chrono::high_resolution_clock::now();
    const auto receive_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
        receive_end - receive_start);

    // Report.
    std::cout << "\n=== test results ===\n";
    std::cout << "packets sent: " << successful_sends << "/" << num_packets << "\n";
    std::cout << "responses received: " << responses_received << "\n";
    std::cout << "send duration: " << duration.count() << " ms\n";
    std::cout << "receive duration: " << receive_duration.count() << " ms\n";

    if (duration.count() > 0) {
        const double send_rate = (successful_sends * 1000.0) / duration.count();
        const double throughput =
            (static_cast<double>(successful_sends) * packet_size * 8.0) /
            (duration.count() * 1000.0);  // mbps
        std::cout << "send rate: " << send_rate << " packets/second\n";
        std::cout << "throughput: " << throughput << " mbps\n";
    }
    if (responses_received > 0 && receive_duration.count() > 0) {
        const double response_rate =
            (responses_received * 1000.0) / receive_duration.count();
        std::cout << "response rate: " << response_rate << " packets/second\n";
    }
    if (successful_sends > 0) {
        const double loss_rate =
            (1.0 - (responses_received / static_cast<double>(successful_sends))) * 100.0;
        // Restore stream formatting afterwards so later output is unaffected.
        const std::ios::fmtflags saved_flags = std::cout.flags();
        const std::streamsize saved_precision = std::cout.precision();
        std::cout << "packet loss rate: " << std::fixed << std::setprecision(2)
                  << loss_rate << "%\n";
        std::cout.flags(saved_flags);
        std::cout.precision(saved_precision);
    }
}
/**
 * Prints the list of interactive-mode commands to standard output.
 */
void udpclient::showhelp() const {
    // One entry per output line, in display order.
    static const char* const help_lines[] = {
        "\navailable commands:\n",
        " help show this help message\n",
        " quit, exit exit the client\n",
        " status show connection status\n",
        " stats show statistics\n",
        " reset reset statistics\n",
        " ping send ping to server\n",
        " time get server time\n",
        " echo <message> echo message to server\n",
        " file [name] simulate file transfer\n",
        " perftest run performance test\n",
        " <any text> send custom message\n",
    };
    for (const char* text : help_lines) {
        std::cout << text;
    }
}
/**
 * Prints the configured server endpoint and the socket / connected / running
 * flags to standard output.
 */
void udpclient::printstatus() const {
    const char* socket_state = (sockfd_ >= 0) ? "ok" : "not initialized";
    const char* connected_state = is_connected_ ? "yes" : "no";
    const char* running_state = is_running_ ? "yes" : "no";

    std::cout << "\n=== client status ===\n"
              << "server: " << server_ip_ << ":" << server_port_ << "\n"
              << "socket: " << socket_state << "\n"
              << "connected: " << connected_state << "\n"
              << "running: " << running_state << "\n";
}
/**
 * Copies the current traffic counters into the caller's variables.
 *
 * @param sent_packets      Receives the number of packets sent.
 * @param received_packets  Receives the number of packets received.
 * @param sent_bytes        Receives the number of bytes sent.
 * @param received_bytes    Receives the number of bytes received.
 */
void udpclient::getstats(uint64_t& sent_packets, uint64_t& received_packets,
                         uint64_t& sent_bytes, uint64_t& received_bytes) const {
    // Outgoing counters.
    sent_packets = packets_sent_;
    sent_bytes = bytes_sent_;
    // Incoming counters.
    received_packets = packets_received_;
    received_bytes = bytes_received_;
}
/**
 * Zeroes all four traffic counters (packets and bytes, sent and received).
 */
void udpclient::resetstats() {
    // Outgoing counters.
    packets_sent_ = 0;
    bytes_sent_ = 0;
    // Incoming counters.
    packets_received_ = 0;
    bytes_received_ = 0;
}
/**
 * Tears down the client: if a socket is open, sends a "disconnect" message
 * (its result is ignored), closes the socket and marks it invalid. The
 * connected and running flags are cleared in every case.
 */
void udpclient::disconnect() {
    const bool socket_open = (sockfd_ >= 0);
    if (socket_open) {
        // Best-effort notification to the server before closing.
        std::string disconnect_msg = "disconnect";
        send(disconnect_msg);

        close(sockfd_);
        sockfd_ = -1;
        std::cout << "disconnected from server" << std::endl;
    }

    is_connected_ = false;
    is_running_ = false;
}
// Client entry point.
//
// Options:
//   -s, --server ip     server ip address (default 127.0.0.1)
//   -p, --port port     server port (default 8080)
//   -t, --test          run the performance test instead of interactive mode
//   -i, --interactive   run in interactive mode (the default)
//   -h, --help          print usage and exit
//
// All options are parsed before any mode starts, so "-t -s 10.0.0.1" and
// "-s 10.0.0.1 -t" behave the same. Returns 0 on success and 1 on an invalid
// port or a failed init/connect.
int main(int argc, char* argv[]) {
    std::string server_ip = "127.0.0.1";
    int server_port = 8080;
    bool run_test = false;

    // Parse command-line arguments.
    for (int i = 1; i < argc; i++) {
        const std::string arg = argv[i];
        if (arg == "-s" || arg == "--server") {
            if (i + 1 < argc) {
                server_ip = argv[++i];
            }
        } else if (arg == "-p" || arg == "--port") {
            if (i + 1 < argc) {
                server_port = std::atoi(argv[++i]);
            }
        } else if (arg == "-h" || arg == "--help") {
            std::cout << "udp client usage:\n";
            std::cout << " -s, --server ip server ip address (default: 127.0.0.1)\n";
            std::cout << " -p, --port port server port (default: 8080)\n";
            std::cout << " -t, --test run performance test\n";
            std::cout << " -i, --interactive run in interactive mode\n";
            std::cout << " -h, --help show this help\n";
            return 0;
        } else if (arg == "-t" || arg == "--test") {
            run_test = true;
        } else if (arg == "-i" || arg == "--interactive") {
            run_test = false;  // interactive mode is the default
        }
    }

    // atoi yields 0 for non-numeric input, which this range check also rejects.
    if (server_port <= 0 || server_port > 65535) {
        std::cerr << "invalid port: " << server_port << std::endl;
        return 1;
    }

    std::cout << "=== udp client ===\n";
    std::cout << "connecting to " << server_ip << ":" << server_port << "\n\n";

    udpclient client(server_ip, server_port);
    if (!client.init()) {
        std::cerr << "failed to initialize client" << std::endl;
        return 1;
    }
    if (!client.connect()) {
        std::cerr << "failed to connect to server" << std::endl;
        return 1;
    }

    if (run_test) {
        client.runperformancetest();
        return 0;
    }

    client.runinteractive();
    std::cout << "\nclient terminated" << std::endl;
    return 0;
}
四、测试
4.1 单元测试
// testudpserver.cpp
#include <gtest/gtest.h>
#include <thread>
#include <chrono>
#include "udpserver.hpp"
#include "udpclient.h"
class udpservertest : public ::testing::test {
protected:
void setup() override {
// 启动测试服务器
test_port_ = 9999;
server_ = std::make_unique<udpserver>(test_port_);
assert_true(server_->init());
// 在后台线程运行服务器
server_thread_ = std::thread([this]() {
server_->run();
});
// 等待服务器启动
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
void teardown() override {
if (server_) {
server_->stop();
}
if (server_thread_.joinable()) {
server_thread_.join();
}
}
int test_port_;
std::unique_ptr<udpserver> server_;
std::thread server_thread_;
};
// A single request must be answered with a reply that contains both the
// "server received" marker and the original payload.
TEST_F(udpservertest, basicecho) {
    udpclient client("127.0.0.1", test_port_);
    ASSERT_TRUE(client.init());

    const std::string send_data = "hello, server!";
    std::string recv_data;
    EXPECT_TRUE(client.sendandreceive(send_data, recv_data));
    EXPECT_NE(recv_data.find("server received"), std::string::npos);
    EXPECT_NE(recv_data.find(send_data), std::string::npos);
}
// Five clients send concurrently; each must get a reply within 3 seconds.
TEST_F(udpservertest, multipleclients) {
    const int num_clients = 5;
    std::vector<std::unique_ptr<udpclient>> clients;
    std::vector<std::thread> client_threads;

    for (int i = 0; i < num_clients; i++) {
        auto client = std::make_unique<udpclient>("127.0.0.1", test_port_);
        ASSERT_TRUE(client->init());
        clients.push_back(std::move(client));
    }

    // Send from all clients concurrently. `clients` is not modified while the
    // threads run, so capturing it by reference is safe.
    for (int i = 0; i < num_clients; i++) {
        client_threads.emplace_back([&clients, i]() {
            const std::string send_data = "message from client " + std::to_string(i);
            std::string recv_data;
            EXPECT_TRUE(clients[i]->sendandreceive(send_data, recv_data, 3000));
        });
    }

    // Wait for every sender to finish.
    for (auto& thread : client_threads) {
        thread.join();
    }
}
// A 50,000-byte datagram (below the 64 KB UDP limit) must round-trip, and the
// reply is expected to be longer than the request.
TEST_F(udpservertest, largedata) {
    udpclient client("127.0.0.1", test_port_);
    ASSERT_TRUE(client.init());

    const std::string large_data(50000, 'x');
    std::string recv_data;
    EXPECT_TRUE(client.sendandreceive(large_data, recv_data, 5000));
    EXPECT_GT(recv_data.size(), large_data.size());
}
// Sends 100 request/reply exchanges of 1400 bytes and requires more than 90%
// of them to succeed.
TEST_F(udpservertest, performance) {
    udpclient client("127.0.0.1", test_port_);
    ASSERT_TRUE(client.init());

    const int num_packets = 100;
    const int packet_size = 1400;  // small enough to avoid IP fragmentation on typical MTUs

    const auto start_time = std::chrono::high_resolution_clock::now();
    int success_count = 0;
    for (int i = 0; i < num_packets; i++) {
        const std::string data(packet_size, 'a' + (i % 26));
        std::string response;
        if (client.sendandreceive(data, response, 1000)) {
            success_count++;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    const auto end_time = std::chrono::high_resolution_clock::now();
    const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
        end_time - start_time);

    std::cout << "\nperformance test results:\n";
    std::cout << "successful exchanges: " << success_count << "/" << num_packets << "\n";
    std::cout << "total time: " << duration.count() << " ms\n";
    // Skip the average when nothing succeeded, to avoid dividing by zero.
    // The elapsed time includes the 10 ms pause per iteration.
    if (success_count > 0) {
        std::cout << "average rtt: "
                  << (duration.count() / static_cast<double>(success_count)) << " ms\n";
    }

    EXPECT_GT(success_count, num_packets * 0.9);  // more than 90% success
}
int main(int argc, char **argv) {
::testing::initgoogletest(&argc, argv);
return run_all_tests();
}
4.2 集成测试
// integrationtest.cpp
#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
#include "advancedudpserver.hpp"
#include "udpclient.h"
// Integration harness: owns an advancedudpserver running on a background
// thread for the lifetime of the object and drives it with udpclient
// instances from several threads.
class integrationtest {
private:
    std::unique_ptr<advancedudpserver> server_;
    std::thread server_thread_;
    int server_port_;

public:
    // Starts the server on `port` with `workers` worker threads.
    // Throws std::runtime_error if the server fails to initialize.
    integrationtest(int port = 8888, int workers = 4)
        : server_port_(port) {
        server_ = std::make_unique<advancedudpserver>(port, workers);
        server_->setbuffersize(65536);
        server_->settimeout(1, 0);
        if (!server_->init()) {
            throw std::runtime_error("failed to initialize server");
        }

        // Run the server loop in the background.
        server_thread_ = std::thread([this]() {
            server_->run();
        });

        // Give the server time to start.
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        std::cout << "test server started on port " << port << std::endl;
    }

    // Stops the server and joins its thread.
    ~integrationtest() {
        if (server_) {
            server_->stop();
        }
        if (server_thread_.joinable()) {
            server_thread_.join();
        }
        std::cout << "test server stopped" << std::endl;
    }

    // Runs `num_clients` concurrent clients, each doing `messages_per_client`
    // request/reply exchanges, and prints pass/fail against a 95% success
    // threshold.
    void runloadtest(int num_clients, int messages_per_client) {
        std::cout << "\n=== load test ===\n";
        std::cout << "clients: " << num_clients << "\n";
        std::cout << "messages per client: " << messages_per_client << "\n";

        std::vector<std::thread> client_threads;
        std::atomic<int> total_success{0};
        std::atomic<int> total_failures{0};

        const auto start_time = std::chrono::high_resolution_clock::now();

        // One thread per client.
        for (int client_id = 0; client_id < num_clients; client_id++) {
            client_threads.emplace_back([this, client_id, messages_per_client,
                                         &total_success, &total_failures]() {
                udpclient client("127.0.0.1", server_port_);
                if (!client.init()) {
                    // Count every message of an unusable client as failed.
                    total_failures += messages_per_client;
                    return;
                }

                int local_success = 0;
                int local_failures = 0;
                for (int msg_num = 0; msg_num < messages_per_client; msg_num++) {
                    const std::string message = "client" + std::to_string(client_id) +
                                                "_msg" + std::to_string(msg_num);
                    std::string response;
                    if (client.sendandreceive(message, response, 1000)) {
                        local_success++;
                        // The reply is expected to contain the original message.
                        if (response.find(message) == std::string::npos) {
                            std::cerr << "warning: invalid response from server" << std::endl;
                        }
                    } else {
                        local_failures++;
                    }
                    // Small pause to avoid congestion.
                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
                }

                total_success += local_success;
                total_failures += local_failures;
                std::cout << "client " << client_id << ": "
                          << local_success << "/" << messages_per_client
                          << " successful" << std::endl;
            });
        }

        // Wait for all clients.
        for (auto& thread : client_threads) {
            thread.join();
        }

        const auto end_time = std::chrono::high_resolution_clock::now();
        const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
            end_time - start_time);

        // Snapshot the counters once so every line reports the same numbers.
        const int total_messages = num_clients * messages_per_client;
        const int succeeded = total_success.load();
        const int failed = total_failures.load();
        // Guard against 0/0 when no messages were requested.
        const double success_rate =
            total_messages > 0 ? succeeded * 100.0 / total_messages : 0.0;

        std::cout << "\n=== load test results ===\n";
        std::cout << "total messages: " << total_messages << "\n";
        std::cout << "successful: " << succeeded << "\n";
        std::cout << "failed: " << failed << "\n";
        std::cout << "success rate: " << success_rate << "%\n";
        std::cout << "total time: " << duration.count() << " ms\n";
        if (duration.count() > 0) {
            std::cout << "throughput: "
                      << (succeeded * 1000.0 / duration.count())
                      << " messages/second\n";
        }

        // Require at least a 95% success rate.
        if (success_rate >= 95.0) {
            std::cout << "\n✓ load test passed" << std::endl;
        } else {
            std::cout << "\n✗ load test failed (success rate below 95%)" << std::endl;
        }
    }

    // Runs 10 clients that each send 1000 messages of 1000 bytes with almost
    // no pacing, then prints the bytes moved and the resulting throughput.
    void runstresstest() {
        std::cout << "\n=== stress test ===\n";
        const int num_clients = 10;
        const int messages_per_client = 1000;
        const int message_size = 1000;  // bytes per message

        std::vector<std::thread> client_threads;
        std::atomic<uint64_t> total_bytes_sent{0};
        std::atomic<uint64_t> total_bytes_received{0};

        const auto start_time = std::chrono::high_resolution_clock::now();

        for (int i = 0; i < num_clients; i++) {
            client_threads.emplace_back([this, i, &total_bytes_sent, &total_bytes_received]() {
                udpclient client("127.0.0.1", server_port_);
                if (!client.init()) {
                    return;
                }

                // Per-client payload filled with a client-specific letter.
                std::string large_message(message_size, static_cast<char>('a' + (i % 26)));
                uint64_t client_bytes_sent = 0;
                uint64_t client_bytes_received = 0;

                for (int j = 0; j < messages_per_client; j++) {
                    // Vary the first byte so consecutive messages differ.
                    large_message[0] = static_cast<char>('0' + (j % 10));
                    std::string response;
                    if (client.sendandreceive(large_message, response, 500)) {
                        client_bytes_sent += large_message.size();
                        client_bytes_received += response.size();
                    }
                    // Only a very short pause every 100 messages, to keep pressure high.
                    if (j % 100 == 0) {
                        std::this_thread::sleep_for(std::chrono::microseconds(100));
                    }
                }

                total_bytes_sent += client_bytes_sent;
                total_bytes_received += client_bytes_received;
                std::cout << "stress client " << i << " completed" << std::endl;
            });
        }

        for (auto& thread : client_threads) {
            thread.join();
        }

        const auto end_time = std::chrono::high_resolution_clock::now();
        const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
            end_time - start_time);

        const uint64_t bytes_sent = total_bytes_sent.load();
        const uint64_t bytes_received = total_bytes_received.load();

        std::cout << "\n=== stress test results ===\n";
        std::cout << "total data sent: "
                  << (bytes_sent / 1024.0 / 1024.0) << " mb\n";
        std::cout << "total data received: "
                  << (bytes_received / 1024.0 / 1024.0) << " mb\n";
        std::cout << "total time: " << duration.count() << " ms\n";

        if (duration.count() > 0) {
            const double send_throughput = (bytes_sent * 8.0) /
                                           (duration.count() * 1000.0);  // mbps
            const double receive_throughput = (bytes_received * 8.0) /
                                              (duration.count() * 1000.0);  // mbps
            std::cout << "send throughput: " << send_throughput << " mbps\n";
            std::cout << "receive throughput: " << receive_throughput << " mbps\n";
            std::cout << "total throughput: " << (send_throughput + receive_throughput)
                      << " mbps\n";
        }

        if (bytes_sent > 0) {
            std::cout << "\n✓ stress test completed successfully" << std::endl;
        }
    }
};
int main() {
try {
std::cout << "=== udp system integration test ===\n\n";
integrationtest test(8888, 4);
// 运行负载测试
test.runloadtest(5, 100);
// 运行压力测试
test.runstresstest();
std::cout << "\n=== all tests completed ===\n";
} catch (const std::exception& e) {
std::cerr << "test failed: " << e.what() << std::endl;
return 1;
}
return 0;
}
4.3 网络测试工具
// networktesttool.cpp
#include <iostream>
#include <iomanip>
#include <vector>
#include <map>
#include <cmath>
#include "udpclient.h"
// Command-line measurement helper: runs latency and bandwidth tests against a
// UDP server using udpclient.
class networktesttool {
private:
    std::string server_ip_;
    int server_port_;

public:
    networktesttool(const std::string& ip, int port)
        : server_ip_(ip), server_port_(port) {}

    // Sends `num_packets` ping messages one at a time (100 ms apart), measures
    // each round trip, and prints min/max/average/stddev, percentiles and a
    // histogram of the observed latencies.
    void runlatencytest(int num_packets = 100) {
        std::cout << "\n=== latency test ===\n";
        std::cout << "server: " << server_ip_ << ":" << server_port_ << "\n";
        std::cout << "packets: " << num_packets << "\n\n";

        udpclient client(server_ip_, server_port_);
        if (!client.init()) {
            std::cerr << "failed to initialize client" << std::endl;
            return;
        }

        std::vector<double> latencies;
        int successful_packets = 0;

        for (int i = 0; i < num_packets; i++) {
            // Embed the send timestamp in the message.
            const auto send_time = std::chrono::high_resolution_clock::now();
            const uint64_t send_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                send_time.time_since_epoch()).count();
            const std::string message = "ping_" + std::to_string(i) + "_" +
                                        std::to_string(send_ns);

            std::string response;
            if (client.sendandreceive(message, response, 1000)) {
                const auto recv_time = std::chrono::high_resolution_clock::now();
                const uint64_t recv_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                    recv_time.time_since_epoch()).count();

                // Round-trip time in milliseconds.
                const double rtt_ns = static_cast<double>(recv_ns - send_ns);
                const double rtt_ms = rtt_ns / 1000000.0;
                latencies.push_back(rtt_ms);
                successful_packets++;

                if ((i + 1) % 10 == 0) {
                    std::cout << "sent " << (i + 1) << " packets..." << std::endl;
                }
            } else {
                std::cout << "packet " << i << " lost" << std::endl;
            }

            // Pause between probes to avoid congestion.
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }

        if (latencies.empty()) {
            return;
        }

        // Basic statistics.
        double sum = 0;
        double min_latency = latencies[0];
        double max_latency = latencies[0];
        for (double latency : latencies) {
            sum += latency;
            min_latency = std::min(min_latency, latency);
            max_latency = std::max(max_latency, latency);
        }
        const double average = sum / latencies.size();

        // Population standard deviation.
        double variance = 0;
        for (double latency : latencies) {
            variance += (latency - average) * (latency - average);
        }
        variance /= latencies.size();
        const double stddev = std::sqrt(variance);

        // Percentiles from the sorted samples; the index is clamped so it can
        // never run past the last element.
        std::sort(latencies.begin(), latencies.end());
        auto percentile = [&latencies](double fraction) -> double {
            size_t index = static_cast<size_t>(latencies.size() * fraction);
            if (index >= latencies.size()) {
                index = latencies.size() - 1;
            }
            return latencies[index];
        };
        const double p50 = percentile(0.5);
        const double p90 = percentile(0.9);
        const double p95 = percentile(0.95);
        const double p99 = percentile(0.99);

        // Report.
        std::cout << "\n=== latency test results ===\n";
        std::cout << "packets sent: " << num_packets << "\n";
        std::cout << "packets received: " << successful_packets << "\n";
        std::cout << "packet loss: "
                  << std::fixed << std::setprecision(2)
                  << ((num_packets - successful_packets) * 100.0 / num_packets)
                  << "%\n";
        std::cout << "\nlatency statistics (ms):\n";
        std::cout << " minimum: " << std::fixed << std::setprecision(3)
                  << min_latency << "\n";
        std::cout << " maximum: " << max_latency << "\n";
        std::cout << " average: " << average << "\n";
        std::cout << " std dev: " << stddev << "\n";
        std::cout << " 50th percentile: " << p50 << "\n";
        std::cout << " 90th percentile: " << p90 << "\n";
        std::cout << " 95th percentile: " << p95 << "\n";
        std::cout << " 99th percentile: " << p99 << "\n";

        displayhistogram(latencies);
    }

    // Prints a 10-bin text histogram of `data` (values in milliseconds). When
    // all samples are equal the bins would have zero width, so a single row
    // is printed instead.
    void displayhistogram(const std::vector<double>& data) {
        if (data.empty()) return;

        const double min_val = *std::min_element(data.begin(), data.end());
        const double max_val = *std::max_element(data.begin(), data.end());
        const int num_bins = 10;
        const double bin_width = (max_val - min_val) / num_bins;

        std::cout << "\nlatency distribution:\n";

        // Zero-width bins would make (value - min) / width a 0/0 division and
        // produce an invalid bin index.
        if (!(bin_width > 0.0)) {
            std::cout << std::fixed << std::setprecision(1)
                      << " " << std::setw(6) << min_val
                      << " - " << std::setw(6) << max_val << " ms: "
                      << std::string(50, '#')
                      << " (" << data.size() << ")\n";
            return;
        }

        std::vector<int> bins(num_bins, 0);
        for (double value : data) {
            int bin_index = static_cast<int>((value - min_val) / bin_width);
            // Clamp: the maximum lands on num_bins, and rounding could push an
            // index slightly outside the valid range.
            if (bin_index < 0) bin_index = 0;
            if (bin_index >= num_bins) bin_index = num_bins - 1;
            bins[bin_index]++;
        }

        for (int i = 0; i < num_bins; i++) {
            const double bin_start = min_val + i * bin_width;
            const double bin_end = bin_start + bin_width;
            std::cout << std::fixed << std::setprecision(1)
                      << " " << std::setw(6) << bin_start
                      << " - " << std::setw(6) << bin_end << " ms: "
                      << std::string(bins[i] * 50 / data.size(), '#')
                      << " (" << bins[i] << ")\n";
        }
    }

    // Sends `packet_size`-byte packets for `duration_sec` seconds, polling for
    // a reply (10 ms timeout) after each send, then prints the send rate,
    // bandwidth and estimated loss.
    void runbandwidthtest(int duration_sec = 10, int packet_size = 1400) {
        std::cout << "\n=== bandwidth test ===\n";
        std::cout << "duration: " << duration_sec << " seconds\n";
        std::cout << "packet size: " << packet_size << " bytes\n\n";

        udpclient client(server_ip_, server_port_);
        if (!client.init()) {
            std::cerr << "failed to initialize client" << std::endl;
            return;
        }

        std::vector<char> packet_data(packet_size, 'b');

        const auto start_time = std::chrono::steady_clock::now();
        const auto end_time = start_time + std::chrono::seconds(duration_sec);

        uint64_t total_packets = 0;
        uint64_t total_bytes = 0;
        uint64_t successful_responses = 0;

        std::cout << "testing bandwidth...\n";
        while (std::chrono::steady_clock::now() < end_time) {
            if (client.send(packet_data.data(), packet_size)) {
                total_packets++;
                total_bytes += packet_size;
            }

            // Poll for a reply with a 10 ms timeout.
            std::string response;
            if (client.receive(response, 10)) {
                successful_responses++;
            }

            // Pacing delay between sends.
            std::this_thread::sleep_for(std::chrono::microseconds(900));
        }

        const auto actual_end = std::chrono::steady_clock::now();
        const auto actual_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
            actual_end - start_time);

        // Report.
        std::cout << "\n=== bandwidth test results ===\n";
        std::cout << "actual duration: " << actual_duration.count() << " ms\n";
        std::cout << "packets sent: " << total_packets << "\n";
        std::cout << "total data sent: "
                  << (total_bytes / 1024.0 / 1024.0) << " mb\n";
        std::cout << "responses received: " << successful_responses << "\n";

        if (actual_duration.count() > 0) {
            const double packets_per_sec = total_packets * 1000.0 / actual_duration.count();
            const double bandwidth_mbps = (total_bytes * 8.0) /
                                          (actual_duration.count() * 1000.0);
            std::cout << "send rate: " << packets_per_sec << " packets/second\n";
            std::cout << "bandwidth: " << bandwidth_mbps << " mbps\n";
            std::cout << "response rate: "
                      << (successful_responses * 1000.0 / actual_duration.count())
                      << " packets/second\n";
        }

        if (total_packets > 0) {
            const double loss_rate =
                (1.0 - (successful_responses / static_cast<double>(total_packets))) * 100.0;
            std::cout << "estimated loss rate: "
                      << std::fixed << std::setprecision(2) << loss_rate << "%\n";
        }
    }
};
// Network test tool entry point.
//
// Options: -s/--server ip, -p/--port port, -t/--test (latency|bandwidth),
// -h/--help. Defaults: 127.0.0.1, 8080, latency. Returns 1 for an unknown
// test type, otherwise 0.
int main(int argc, char* argv[]) {
    std::string server_ip = "127.0.0.1";
    int server_port = 8080;
    std::string test_type = "latency";

    int i = 1;
    // Returns the argument following the current option and consumes it, or
    // nullptr when the option is the last argument.
    auto take_value = [&]() -> const char* {
        return (i + 1 < argc) ? argv[++i] : nullptr;
    };

    for (; i < argc; i++) {
        const std::string arg = argv[i];
        if (arg == "-s" || arg == "--server") {
            if (const char* value = take_value()) server_ip = value;
        } else if (arg == "-p" || arg == "--port") {
            if (const char* value = take_value()) server_port = std::atoi(value);
        } else if (arg == "-t" || arg == "--test") {
            if (const char* value = take_value()) test_type = value;
        } else if (arg == "-h" || arg == "--help") {
            std::cout << "network test tool\n\n"
                      << "usage: " << argv[0] << " [options]\n\n"
                      << "options:\n"
                      << " -s, --server ip server ip address\n"
                      << " -p, --port port server port\n"
                      << " -t, --test type test type (latency|bandwidth)\n"
                      << " -h, --help show this help\n";
            return 0;
        }
    }

    networktesttool tester(server_ip, server_port);

    if (test_type == "latency") {
        tester.runlatencytest();
        return 0;
    }
    if (test_type == "bandwidth") {
        tester.runbandwidthtest();
        return 0;
    }

    std::cerr << "unknown test type: " << test_type << std::endl;
    std::cerr << "available types: latency, bandwidth" << std::endl;
    return 1;
}
五、源代码
5.1 log.hpp - 日志系统
#ifndef log_hpp
#define log_hpp
#include <chrono>
#include <cstdlib>
#include <ctime>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
// Log severity levels, ordered from least to most severe. logger::log()
// compares levels with <, so the declaration order matters.
enum class loglevel {
    debug,
    info,
    warn,
    error,
    fatal
};

// Process-wide logger singleton. Writes timestamped lines to the console
// and/or an append-mode log file; all public methods take an internal mutex.
class logger {
private:
    // Declared in the original interface; instance() does not use it.
    static std::shared_ptr<logger> instance_;
    std::ofstream log_file_;
    loglevel min_level_;
    std::mutex log_mutex_;
    bool console_output_;

    // Private: obtain the logger through instance().
    logger() : min_level_(loglevel::info), console_output_(true) {}

public:
    // Non-copyable.
    logger(const logger&) = delete;
    logger& operator=(const logger&) = delete;

    // Returns the singleton, created on first use.
    static logger& instance() {
        static logger the_logger;
        return the_logger;
    }

    // Configures the logger.
    //   filename   log file opened in append mode; empty leaves the current
    //              file (if any) unchanged
    //   min_level  messages below this level are dropped
    //   console    whether to also write to stdout/stderr
    // Returns false if the file cannot be opened.
    bool init(const std::string& filename = "",
              loglevel min_level = loglevel::info,
              bool console = true) {
        std::lock_guard<std::mutex> lock(log_mutex_);
        min_level_ = min_level;
        console_output_ = console;
        if (!filename.empty()) {
            // open() on an already-open stream fails and leaves it unusable,
            // so close and reset the state first to allow re-initialization.
            if (log_file_.is_open()) {
                log_file_.close();
            }
            log_file_.clear();
            log_file_.open(filename, std::ios::app);
            if (!log_file_.is_open()) {
                std::cerr << "failed to open log file: " << filename << std::endl;
                return false;
            }
        }
        return true;
    }

    // Sets the minimum level that will be logged.
    void setlevel(loglevel level) {
        std::lock_guard<std::mutex> lock(log_mutex_);
        min_level_ = level;
    }

    // Enables or disables console output.
    void enableconsole(bool enable) {
        std::lock_guard<std::mutex> lock(log_mutex_);
        console_output_ = enable;
    }

    // Writes one log line: "[YYYY-MM-DD HH:MM:SS.mmm] [level] [file:line] message".
    // The [file:line] part is omitted when `file` is null. A fatal message
    // terminates the process with EXIT_FAILURE after it has been written.
    void log(loglevel level, const std::string& message,
             const char* file = nullptr, int line = 0) {
        {
            std::lock_guard<std::mutex> lock(log_mutex_);
            // Read the threshold under the lock so it cannot race with setlevel().
            if (level < min_level_) {
                return;
            }

            // Current wall-clock time with millisecond resolution.
            const auto now = std::chrono::system_clock::now();
            const std::time_t now_time = std::chrono::system_clock::to_time_t(now);
            const auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                now.time_since_epoch()) % 1000;

            // std::localtime uses a shared static buffer, so it is called
            // while holding the logger mutex.
            char time_buffer[80] = "";
            const std::tm* tm_info = std::localtime(&now_time);
            if (tm_info != nullptr) {
                std::strftime(time_buffer, sizeof(time_buffer),
                              "%Y-%m-%d %H:%M:%S", tm_info);
            }

            const char* level_str = "";
            switch (level) {
                case loglevel::debug: level_str = "debug"; break;
                case loglevel::info:  level_str = "info";  break;
                case loglevel::warn:  level_str = "warn";  break;
                case loglevel::error: level_str = "error"; break;
                case loglevel::fatal: level_str = "fatal"; break;
            }

            // Assemble the line.
            std::stringstream ss;
            ss << "[" << time_buffer << "."
               << std::setfill('0') << std::setw(3) << now_ms.count() << "] "
               << "[" << level_str << "] ";
            if (file != nullptr) {
                ss << "[" << file << ":" << line << "] ";
            }
            ss << message;
            const std::string log_message = ss.str();

            // Console: warnings and above go to stderr, the rest to stdout.
            if (console_output_) {
                std::ostream& stream = (level >= loglevel::warn) ? std::cerr : std::cout;
                stream << log_message << std::endl;
                if (level == loglevel::fatal) {
                    stream << "fatal error, terminating..." << std::endl;
                }
            }

            // File output, flushed after every line.
            if (log_file_.is_open()) {
                log_file_ << log_message << std::endl;
                log_file_.flush();
                if (level == loglevel::fatal) {
                    log_file_ << "fatal error, terminating..." << std::endl;
                    log_file_.flush();
                }
            }
        }

        // Exit only after the lock is released: std::exit runs static
        // destructors, and ~logger() locks the same mutex in close().
        if (level == loglevel::fatal) {
            std::exit(EXIT_FAILURE);
        }
    }

    // Closes the log file if one is open.
    void close() {
        std::lock_guard<std::mutex> lock(log_mutex_);
        if (log_file_.is_open()) {
            log_file_.close();
        }
    }

    ~logger() {
        close();
    }
};
// Logging convenience macros: forward the message to the logger singleton
// together with the call site's file name and line number.
#define log_debug(msg) logger::instance().log(loglevel::debug, msg, __FILE__, __LINE__)
#define log_info(msg) logger::instance().log(loglevel::info, msg, __FILE__, __LINE__)
#define log_warn(msg) logger::instance().log(loglevel::warn, msg, __FILE__, __LINE__)
#define log_error(msg) logger::instance().log(loglevel::error, msg, __FILE__, __LINE__)
#define log_fatal(msg) logger::instance().log(loglevel::fatal, msg, __FILE__, __LINE__)
#endif // log_hpp
5.2 makefile - 构建系统
# makefile for udp network system
# compiler and flags
cxx = g++
cxxflags = -std=c++11 -wall -wextra -o2 -pthread
debug_flags = -g -ddebug
release_flags = -o3 -dndebug
# directories
src_dir = src
obj_dir = obj
bin_dir = bin
inc_dir = include
# source files
server_srcs = $(src_dir)/udpserver.cpp $(src_dir)/main.cpp $(src_dir)/log.cpp
client_srcs = $(src_dir)/udpclient.cpp
test_srcs = $(src_dir)/testudpserver.cpp
integration_srcs = $(src_dir)/integrationtest.cpp
nettest_srcs = $(src_dir)/networktesttool.cpp
# object files
server_objs = $(patsubst $(src_dir)/%.cpp,$(obj_dir)/%.o,$(server_srcs))
client_objs = $(patsubst $(src_dir)/%.cpp,$(obj_dir)/%.o,$(client_srcs))
test_objs = $(patsubst $(src_dir)/%.cpp,$(obj_dir)/%.o,$(test_srcs))
integration_objs = $(patsubst $(src_dir)/%.cpp,$(obj_dir)/%.o,$(integration_srcs))
nettest_objs = $(patsubst $(src_dir)/%.cpp,$(obj_dir)/%.o,$(nettest_srcs))
# executables
server_exe = $(bin_dir)/udp_server
client_exe = $(bin_dir)/udp_client
test_exe = $(bin_dir)/test_server
integration_exe = $(bin_dir)/integration_test
nettest_exe = $(bin_dir)/network_test
# include paths
includes = -i$(inc_dir)
# libraries
libs = -lpthread
test_libs = $(libs) -lgtest -lgtest_main
# default target
all: directories server client
# create directories
directories:
@mkdir -p $(obj_dir) $(bin_dir)
# server build
server: $(server_exe)
$(server_exe): $(server_objs)
$(cxx) $(cxxflags) $(includes) $^ -o $@ $(libs)
# client build
client: $(client_exe)
$(client_exe): $(client_objs)
$(cxx) $(cxxflags) $(includes) $^ -o $@ $(libs)
# test build
test: $(test_exe)
$(test_exe): $(test_objs)
$(cxx) $(cxxflags) $(includes) $^ -o $@ $(test_libs)
# integration test build
integration: $(integration_exe)
$(integration_exe): $(integration_objs)
$(cxx) $(cxxflags) $(includes) $^ -o $@ $(libs)
# network test tool build
nettest: $(nettest_exe)
$(nettest_exe): $(nettest_objs)
$(cxx) $(cxxflags) $(includes) $^ -o $@ $(libs)
# compile source files
$(obj_dir)/%.o: $(src_dir)/%.cpp
$(cxx) $(cxxflags) $(includes) -c $< -o $@
# debug build
debug: cxxflags += $(debug_flags)
debug: all
# release build
release: cxxflags += $(release_flags)
release: all
# static analysis with cppcheck
check:
cppcheck --enable=all --suppress=missingincludesystem $(src_dir) $(inc_dir)
# run tests
run-test: test
$(test_exe)
run-integration: integration
$(integration_exe)
# clean build files
clean:
rm -rf $(obj_dir) $(bin_dir)
rm -f *.log
# install system-wide (requires root)
install: release
cp $(server_exe) /usr/local/bin/udp_server
cp $(client_exe) /usr/local/bin/udp_client
chmod +x /usr/local/bin/udp_server /usr/local/bin/udp_client
# uninstall
uninstall:
rm -f /usr/local/bin/udp_server /usr/local/bin/udp_client
# run server
run-server: server
$(server_exe) -p 8080
# run client
run-client: client
$(client_exe) -s 127.0.0.1 -p 8080
# run network test
run-nettest: nettest
$(nettest_exe) -s 127.0.0.1 -p 8080 -t latency
# generate documentation
doc:
doxygen doxyfile
# Print the list of available targets. The `test` entry says "build" only,
# because that target links the test binary and `run-test` executes it.
help:
	@echo "available targets:"
	@echo " all - build server and client (default)"
	@echo " server - build server only"
	@echo " client - build client only"
	@echo " test - build unit tests"
	@echo " integration - build integration tests"
	@echo " nettest - build network test tool"
	@echo " debug - build with debug flags"
	@echo " release - build with release flags"
	@echo " check - run static analysis"
	@echo " run-test - run unit tests"
	@echo " run-integration - run integration tests"
	@echo " clean - remove build files"
	@echo " install - install system-wide"
	@echo " uninstall - uninstall"
	@echo " run-server - run server on port 8080"
	@echo " run-client - run client connecting to localhost:8080"
	@echo " run-nettest - run network latency test"
	@echo " doc - generate documentation"
	@echo " help - show this help"
# Targets that do not produce a file of the same name. The special target
# name is case-sensitive and must be spelled .PHONY.
.PHONY: all directories server client test integration nettest debug release \
        check run-test run-integration clean install uninstall run-server \
        run-client run-nettest doc help
5.3 完整的udpserver.hpp
#ifndef udpserver_hpp
#define udpserver_hpp
#include <iostream>
#include <string>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <thread>
#include <vector>
#include <memory>
#include <atomic>
#include <functional>
#include <queue>
#include <mutex>
#include <condition_variable>
#include "log.hpp"
/// Base UDP server.
///
/// Declares the socket state, the tunable parameters and the
/// init / run / stop life cycle. Every life-cycle step is virtual so that
/// derived servers can replace it. Instances cannot be copied.
class udpserver {
protected:
    // ---- socket state ----
    int port_;                       // server port
    int sockfd_;                     // socket descriptor
    std::atomic<bool> is_running_;   // running state of the server
    sockaddr_in server_addr_;        // server address structure
    sockaddr_in client_addr_;        // client address structure
    socklen_t client_addr_len_;      // length of the client address

    // ---- configuration parameters ----
    size_t buffer_size_;             // buffer size
    int timeout_sec_;                // receive timeout, seconds part
    int timeout_usec_;               // receive timeout, microseconds part
    bool reuse_addr_;                // whether address reuse is requested

public:
    /// @param port port number stored for this server (8080 if omitted).
    explicit udpserver(int port = 8080);

    virtual ~udpserver();

    // Copying is disabled.
    udpserver(const udpserver&) = delete;
    udpserver& operator=(const udpserver&) = delete;

    /// Initializes the server. @return true on success.
    virtual bool init();

    /// Runs the server.
    virtual void run();

    /// Stops the server.
    virtual void stop();

    // ---- configuration setters ----
    void setbuffersize(size_t size)
    {
        buffer_size_ = size;
    }

    void settimeout(int sec, int usec = 0)
    {
        timeout_sec_ = sec;
        timeout_usec_ = usec;
    }

    void setreuseaddr(bool reuse)
    {
        reuse_addr_ = reuse;
    }

    // ---- read-only accessors ----
    int getport() const
    {
        return port_;
    }

    bool isrunning() const
    {
        return is_running_.load();
    }

protected:
    /// Creates the socket. @return true on success.
    virtual bool createsocket();

    /// Binds the address. @return true on success.
    virtual bool bindaddress();

    /// Applies the socket options. @return true on success.
    virtual bool setsocketoptions();

    /// Handles one received datagram.
    /// @param data        received bytes
    /// @param len         number of bytes in @p data
    /// @param client_addr address of the sender
    virtual void processdata(const char* data, ssize_t len,
                             const sockaddr_in& client_addr);

    /// Sends a response to @p client_addr. @return true on success.
    virtual bool sendresponse(const char* data, ssize_t len,
                              const sockaddr_in& client_addr);

    /// Releases the resources held by the server.
    virtual void cleanup();
};
/// Advanced UDP server (with a thread pool).
///
/// Extends udpserver with worker threads and a queue of received
/// datagrams. The member functions are defined outside this header.
class advancedudpserver : public udpserver {
private:
    std::vector<std::thread> worker_threads_;
    std::atomic<int> thread_count_;
    int max_workers_;

    /// One queued datagram: payload, sender address and the time at which
    /// the task object was created.
    struct task {
        std::vector<char> data;
        struct sockaddr_in client_addr;
        time_t receive_time;

        task(const std::vector<char>& d, const struct sockaddr_in& addr)
            : data(d), client_addr(addr), receive_time(time(nullptr)) {}
    };

    // Work queue and its synchronization primitives.
    std::queue<task> task_queue_;
    // mutable so that const accessors such as getqueuesize() can lock it.
    mutable std::mutex queue_mutex_;
    std::condition_variable queue_cv_;
    std::atomic<bool> workers_running_;

public:
    /// @param port        port number passed to the server
    /// @param max_workers worker thread limit (4 if omitted)
    advancedudpserver(int port = 8080, int max_workers = 4);
    ~advancedudpserver() override;

    bool init() override;
    void run() override;
    void stop() override;

    // ---- thread pool status ----

    /// Returns the current value of the worker counter.
    int getactiveworkers() const { return thread_count_; }

    /// Returns the number of queued tasks.
    ///
    /// The queue is read under queue_mutex_: std::queue is not safe to
    /// read while another thread modifies it, and the queue is presumably
    /// filled and drained by other threads (their code is not shown here).
    size_t getqueuesize() const {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        return task_queue_.size();
    }

private:
    /// Entry point of one worker thread.
    void workerthread(int thread_id);

    /// Handles one queued task on behalf of worker @p thread_id.
    void processtask(const task& task, int thread_id);

    // Overrides of base-class hooks.
    bool setsocketoptions() override;
    void processdata(const char* data, ssize_t len,
                     const struct sockaddr_in& client_addr) override;

    /// Task scheduling: queues one received datagram.
    void addtask(const std::vector<char>& data,
                 const struct sockaddr_in& client_addr);
};
#endif // udpserver_hpp
5.4 完整的main.cpp
#include <fcntl.h>
#include <getopt.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <cerrno>
#include <climits>
#include <csignal>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <string>
#include "udpserver.hpp"
#include "advancedudpserver.hpp"
// Global server pointer, used by the signal handler.
std::unique_ptr<udpserver> g_server;

/// Signal handler: prints a shutdown notice and asks the server to stop.
///
/// Stream I/O (std::cout) is not async-signal-safe, so the notice is
/// assembled by hand in a stack buffer and emitted with one write(2) call.
/// The bytes written are the same as before:
/// "\nreceived signal <n>, shutting down...\n".
///
/// NOTE(review): stop() is virtual and defined elsewhere; confirm that the
/// implementations only perform async-signal-safe work.
///
/// @param signal number of the delivered signal
void signalhandler(int signal) {
    static const char head[] = "\nreceived signal ";
    static const char tail[] = ", shutting down...\n";

    char message[sizeof(head) + sizeof(tail) + 16];
    std::size_t length = 0;

    for (std::size_t i = 0; i + 1 < sizeof(head); ++i) {
        message[length++] = head[i];
    }

    // Decimal rendering of the signal number, least significant digit first.
    unsigned int magnitude = static_cast<unsigned int>(signal);
    if (signal < 0) {
        message[length++] = '-';
        magnitude = 0u - magnitude;
    }
    char digits[12];
    std::size_t digit_count = 0;
    do {
        digits[digit_count++] = static_cast<char>('0' + magnitude % 10u);
        magnitude /= 10u;
    } while (magnitude != 0u);
    while (digit_count > 0) {
        message[length++] = digits[--digit_count];
    }

    for (std::size_t i = 0; i + 1 < sizeof(tail); ++i) {
        message[length++] = tail[i];
    }

    // Best effort: there is nothing useful to do if the write fails.
    const ssize_t written = write(STDOUT_FILENO, message, length);
    (void)written;

    if (g_server) {
        g_server->stop();
    }
}
/// Prints the usage help to standard output.
///
/// The build stamp uses the predefined macros __DATE__ and __TIME__, which
/// exist only in this upper-case spelling.
///
/// @param program_name name shown in the usage line and in the examples
///                     (normally argv[0])
void showusage(const char* program_name) {
    std::cout << "udp server v1.0 - high performance udp server implementation\n"
              << "build date: " << __DATE__ << " " << __TIME__ << "\n\n"
              << "usage: " << program_name << " [options]\n\n"
              << "options:\n"
              << " -p, --port port server port (default: 8080)\n"
              << " -b, --buffer size buffer size in bytes (default: 4096)\n"
              << " -t, --timeout sec receive timeout in seconds (default: 5)\n"
              << " -w, --workers num number of worker threads (default: 1)\n"
              << " -a, --advanced use advanced server with thread pool\n"
              << " -r, --no-reuse disable address reuse\n"
              << " -v, --verbose enable verbose logging\n"
              << " -d, --daemon run as daemon\n"
              << " -c, --config file load configuration from file\n"
              << " -h, --help show this help message\n"
              << "\nexamples:\n"
              << " " << program_name << " -p 9000 -b 8192\n"
              << " " << program_name << " --port 8080 --workers 4 --advanced\n"
              << " " << program_name << " --daemon --config /etc/udp-server.conf\n";
}
/// Server configuration. Every field carries the default that applies when
/// the corresponding command-line option is not given.
struct serverconfig {
    // Network settings.
    int port{8080};
    size_t buffer_size{4096};
    int timeout_sec{5};
    int timeout_usec{0};

    // Server mode.
    int workers{1};
    bool advanced{false};
    bool reuse_addr{true};
    bool daemon{false};
    bool verbose{false};

    // Files and logging.
    std::string config_file{};
    std::string log_file{"udp_server.log"};
    loglevel log_level{loglevel::info};
};
/// Parses @p text as a base-10 integer and checks it against a range.
///
/// Unlike atoi, this rejects empty input, trailing garbage ("80abc") and
/// values that overflow long. On any failure it prints @p error_message to
/// standard error and terminates the process with EXIT_FAILURE.
///
/// @return the parsed value, guaranteed to be in [min_value, max_value]
static long parseintinrange(const char* text, long min_value, long max_value,
                            const char* error_message) {
    errno = 0;
    char* end = nullptr;
    const long value = std::strtol(text, &end, 10);
    const bool malformed = (end == text) || (*end != '\0') || (errno == ERANGE);
    if (malformed || value < min_value || value > max_value) {
        std::cerr << error_message << std::endl;
        std::exit(EXIT_FAILURE);
    }
    return value;
}

/// Parses the command-line arguments.
///
/// Invalid numeric values and unknown options terminate the process with
/// EXIT_FAILURE; -h/--help prints the usage text and terminates with
/// EXIT_SUCCESS.
///
/// @param argc argument count as received by main
/// @param argv argument vector as received by main
/// @return the configuration, with defaults for every option not given
serverconfig parsearguments(int argc, char* argv[]) {
    serverconfig config;

    static const struct option long_options[] = {
        {"port", required_argument, nullptr, 'p'},
        {"buffer", required_argument, nullptr, 'b'},
        {"timeout", required_argument, nullptr, 't'},
        {"workers", required_argument, nullptr, 'w'},
        {"advanced", no_argument, nullptr, 'a'},
        {"no-reuse", no_argument, nullptr, 'r'},
        {"verbose", no_argument, nullptr, 'v'},
        {"daemon", no_argument, nullptr, 'd'},
        {"config", required_argument, nullptr, 'c'},
        {"help", no_argument, nullptr, 'h'},
        {nullptr, 0, nullptr, 0}
    };

    int opt;
    while ((opt = getopt_long(argc, argv, "p:b:t:w:arvdc:h",
                              long_options, nullptr)) != -1) {
        switch (opt) {
        case 'p':
            config.port = static_cast<int>(parseintinrange(
                optarg, 1, 65535,
                "error: port must be between 1 and 65535"));
            break;
        case 'b':
            config.buffer_size = static_cast<size_t>(parseintinrange(
                optarg, 1024, 65536,
                "error: buffer size must be between 1024 and 65536"));
            break;
        case 't':
            config.timeout_sec = static_cast<int>(parseintinrange(
                optarg, 0, INT_MAX,
                "error: timeout must be non-negative"));
            break;
        case 'w':
            config.workers = static_cast<int>(parseintinrange(
                optarg, 1, 32,
                "error: number of workers must be between 1 and 32"));
            break;
        case 'a':
            config.advanced = true;
            break;
        case 'r':
            config.reuse_addr = false;
            break;
        case 'v':
            config.verbose = true;
            config.log_level = loglevel::debug;
            break;
        case 'd':
            config.daemon = true;
            break;
        case 'c':
            // Only the path is recorded; loading settings from the file
            // could be added here.
            config.config_file = optarg;
            break;
        case 'h':
            showusage(argv[0]);
            std::exit(EXIT_SUCCESS);
        default:
            std::cerr << "error: unknown option" << std::endl;
            showusage(argv[0]);
            std::exit(EXIT_FAILURE);
        }
    }
    return config;
}
/// Turns the calling process into a daemon.
///
/// Steps: fork and let the parent exit, clear the umask, start a new
/// session, change the working directory to "/", and point the three
/// standard descriptors at /dev/null. Any failure is reported on standard
/// error and terminates the process with EXIT_FAILURE.
void daemonize() {
    const pid_t pid = fork();
    if (pid < 0) {
        std::cerr << "failed to fork daemon: " << std::strerror(errno) << std::endl;
        std::exit(EXIT_FAILURE);
    }
    if (pid > 0) {
        // The parent process exits; only the child carries on.
        std::exit(EXIT_SUCCESS);
    }

    umask(0);

    if (setsid() < 0) {
        std::cerr << "failed to create new session: " << std::strerror(errno) << std::endl;
        std::exit(EXIT_FAILURE);
    }

    if (chdir("/") < 0) {
        std::cerr << "failed to change directory: " << std::strerror(errno) << std::endl;
        std::exit(EXIT_FAILURE);
    }

    // Redirect the standard descriptors to /dev/null. dup2 targets each
    // descriptor number explicitly, so the result does not depend on which
    // descriptor number open() happens to return, and failures are detected.
    const int null_fd = open("/dev/null", O_RDWR);
    if (null_fd < 0) {
        std::cerr << "failed to open /dev/null: " << std::strerror(errno) << std::endl;
        std::exit(EXIT_FAILURE);
    }
    const int standard_fds[] = {STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO};
    for (const int fd : standard_fds) {
        if (dup2(null_fd, fd) < 0) {
            std::cerr << "failed to redirect standard stream: " << std::strerror(errno) << std::endl;
            std::exit(EXIT_FAILURE);
        }
    }
    // Keep null_fd only when it is itself one of the standard descriptors.
    if (null_fd > STDERR_FILENO) {
        close(null_fd);
    }
}
/// Program entry point: parses the options, optionally daemonizes, installs
/// the signal handlers, then creates, configures, initializes and runs the
/// server.
///
/// @return EXIT_SUCCESS after a normal shutdown; EXIT_FAILURE when
///         initialization fails or an exception escapes the server
int main(int argc, char* argv[]) {
    // Parse the command-line arguments.
    serverconfig config = parsearguments(argc, argv);

    // Turn into a daemon if requested.
    // NOTE(review): daemonize() changes the working directory to "/", so the
    // relative default log_file is presumably resolved against "/" in daemon
    // mode; confirm how the logger opens the file.
    if (config.daemon) {
        daemonize();
    }

    // Register the signal handlers. The signal names and SIG_IGN are
    // upper-case macros from <csignal>.
    signal(SIGINT, signalhandler);
    signal(SIGTERM, signalhandler);
    signal(SIGPIPE, SIG_IGN);  // ignore broken-pipe signals

    try {
        if (!config.daemon) {
            std::cout << "=== udp server starting ===\n";
            std::cout << "version: 1.0\n";
            std::cout << "port: " << config.port << "\n";
            std::cout << "buffer size: " << config.buffer_size << " bytes\n";
            std::cout << "timeout: " << config.timeout_sec << " seconds\n";
            std::cout << "workers: " << config.workers << "\n";
            std::cout << "mode: " << (config.advanced ? "advanced (thread pool)" : "basic") << "\n";
            std::cout << "log file: " << config.log_file << "\n";
            std::cout << "log level: " << (config.verbose ? "debug" : "info") << "\n";
            std::cout << "===========================\n\n";
        }

        // Initialize the logging system.
        logger::instance().init(config.log_file, config.log_level, !config.daemon);
        log_info("udp server starting...");
        log_info("configuration: port=%d, buffer=%zu, timeout=%d, workers=%d, mode=%s",
                 config.port, config.buffer_size, config.timeout_sec,
                 config.workers, config.advanced ? "advanced" : "basic");

        // Create the server instance.
        if (config.advanced) {
            g_server = std::make_unique<advancedudpserver>(config.port, config.workers);
        } else {
            g_server = std::make_unique<udpserver>(config.port);
        }

        // Configure the server.
        g_server->setbuffersize(config.buffer_size);
        g_server->settimeout(config.timeout_sec, config.timeout_usec);
        g_server->setreuseaddr(config.reuse_addr);

        // Initialize the server.
        if (!g_server->init()) {
            log_fatal("failed to initialize server");
            return EXIT_FAILURE;
        }
        log_info("server initialized successfully");
        if (!config.daemon) {
            std::cout << "server initialized successfully\n";
            std::cout << "press ctrl+c to stop the server\n\n";
        }

        // Run the server; this call returns once the server has stopped.
        g_server->run();
    } catch (const std::exception& e) {
        log_error("exception: %s", e.what());
        if (!config.daemon) {
            std::cerr << "exception: " << e.what() << std::endl;
        }
        return EXIT_FAILURE;
    } catch (...) {
        log_error("unknown exception occurred");
        if (!config.daemon) {
            std::cerr << "unknown exception occurred" << std::endl;
        }
        return EXIT_FAILURE;
    }

    log_info("server stopped gracefully");
    if (!config.daemon) {
        std::cout << "\nserver stopped gracefully" << std::endl;
    }
    return EXIT_SUCCESS;
}
总结
通过本文的详细讲解和代码实现,我们完成了一个完整的UDP网络通信系统的设计与实现。这个系统具有以下特点和优势:
1. 系统架构特点
模块化设计:
- 服务器和客户端分离,职责明确
- 日志系统独立,便于维护和扩展
- 配置系统灵活,支持命令行和配置文件
高性能设计:
- 支持多线程处理,充分利用多核CPU
- 智能缓冲区管理,避免内存碎片
- 异步I/O操作,减少等待时间
可靠性保障:
- 完善的错误处理和异常恢复机制
- 连接状态监控和自动清理
- 详细的日志记录,便于问题排查
2. 关键技术点
套接字编程核心:
- 深入理解了socket()、bind()、recvfrom()、sendto()等系统调用
- 掌握了地址转换函数如inet_pton()、inet_ntop()的使用
- 理解了字节序转换的重要性
并发处理:
- 多线程编程的最佳实践
- 线程安全的队列实现
- 条件变量的正确使用
网络优化:
- 缓冲区大小的优化配置
- 超时机制的合理设置
- 数据包分片和重组处理
3. 实际应用价值
教育意义:
- 完整的网络编程教学示例
- 良好的编码规范和架构设计示范
- 详细的注释和文档说明
实用价值:
- 可直接用于实际项目的网络通信模块
- 提供了性能测试和监控工具
- 支持多种运行模式和配置选项
扩展性:
- 易于添加新的协议支持
- 支持插件式功能扩展
- 良好的接口设计,便于二次开发
4. 性能优化建议
服务器端优化:
- 使用epoll或kqueue等I/O多路复用技术处理更多并发连接
- 实现连接池减少连接建立开销
- 使用内存池技术减少内存分配开销
客户端优化:
- 实现请求合并,减少网络包数量
- 添加压缩支持,减少数据传输量
- 实现智能重传机制,提高可靠性
网络优化:
- 支持IPv6双栈
- 添加QUIC协议支持
- 实现流量控制和拥塞避免算法
5. 安全考虑
基础安全:
- 输入验证和边界检查
- 缓冲区溢出防护
- 资源限制和配额管理
高级安全:
- 支持TLS/DTLS加密传输
- 实现身份验证和授权机制
- 添加DoS攻击防护
6. 未来发展方向
功能增强:
- 添加web管理界面
- 支持集群部署
- 实现负载均衡
性能提升:
- 支持rdma高速网络
- 添加gpu加速支持
- 实现零拷贝技术
生态系统:
- 提供多种语言sdk
- 支持云原生部署
- 集成监控告警系统
通过本系统的实现,读者不仅能够掌握UDP网络编程的核心技术,还能够学习到软件工程中的良好实践,包括模块化设计、错误处理、性能优化、测试策略等。这个系统可以作为学习网络编程的绝佳范例,也可以作为实际项目的基础框架进行扩展和优化。
网络编程是一个既深又广的领域,本文只是抛砖引玉。希望读者能够在此基础上继续探索,深入研究网络协议的各个层面,从应用层到底层实现,不断积累经验,最终成为网络编程的专家。
以上就是基于c++的udp网络通信系统设计与实现详解的详细内容,更多关于c++ udp网络通信的资料请关注代码网其它相关文章!
发表评论