需求分析
我们的目标是实现以下功能:
- 服务器能够启动多个客户端进程,客户端数量可配置
- 每个客户端在不同的端口上监听
- 服务器能够连接到所有客户端
- 服务器能够同时向所有客户端发送消息
- 客户端接收到消息后发送确认
- 程序能够优雅地处理终止信号
系统架构
整个系统由两个主要组件组成:
客户端程序(client):
- 在指定端口上监听连接
- 接收服务器发送的消息
- 发送确认消息给服务器
服务器程序(server):
- 从配置文件读取设置
- 启动多个客户端进程
- 连接到每个客户端
- 向所有客户端发送消息
- 接收客户端的确认消息
实现细节
客户端实现(client.cpp)
客户端程序需要创建一个socket,在指定端口上监听连接,接收服务器发送的消息,并发送确认。
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#define buffer_size 1024
#define default_port 8888
bool running = true;
void signalhandler(int signum) {
std::cout << "interrupt signal (" << signum << ") received.\n";
running = false;
}
int main(int argc, char* argv[]) {
// 注册信号处理函数,用于优雅关闭
signal(sigint, signalhandler);
signal(sigterm, signalhandler);
// 解析命令行参数
int port = default_port;
if (argc > 1) {
port = std::stoi(argv[1]);
}
// 客户端id(进程id)
pid_t pid = getpid();
std::cout << "client started with pid: " << pid << std::endl;
// 创建socket
int clientsocket = socket(af_inet, sock_stream, 0);
if (clientsocket < 0) {
std::cerr << "error creating socket" << std::endl;
return 1;
}
// 设置服务器地址
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_family = af_inet;
serveraddr.sin_port = htons(port);
serveraddr.sin_addr.s_addr = inaddr_any;
// 绑定socket到地址
if (bind(clientsocket, (struct sockaddr*)&serveraddr, sizeof(serveraddr)) < 0) {
std::cerr << "error binding socket to port " << port << std::endl;
close(clientsocket);
return 1;
}
// 监听连接
if (listen(clientsocket, 5) < 0) {
std::cerr << "error listening on socket" << std::endl;
close(clientsocket);
return 1;
}
std::cout << "client listening on port " << port << std::endl;
// 接受服务器连接
struct sockaddr_in serverconnaddr;
socklen_t serverlen = sizeof(serverconnaddr);
int serverconnection = accept(clientsocket, (struct sockaddr*)&serverconnaddr, &serverlen);
if (serverconnection < 0) {
std::cerr << "error accepting connection" << std::endl;
close(clientsocket);
return 1;
}
std::cout << "connected to server at "
<< inet_ntoa(serverconnaddr.sin_addr)
<< ":" << ntohs(serverconnaddr.sin_port)
<< std::endl;
// 接收服务器消息
char buffer[buffer_size];
while (running) {
memset(buffer, 0, buffer_size);
int bytesreceived = recv(serverconnection, buffer, buffer_size - 1, 0);
if (bytesreceived > 0) {
std::cout << "message received: " << buffer << std::endl;
// 发送确认
std::string ack = "client " + std::to_string(pid) + " received message";
send(serverconnection, ack.c_str(), ack.length(), 0);
}
else if (bytesreceived == 0) {
std::cout << "server disconnected" << std::endl;
break;
}
else {
if (errno != eintr) { // 忽略信号中断
std::cerr << "error receiving data: " << strerror(errno) << std::endl;
break;
}
}
}
// 清理资源
close(serverconnection);
close(clientsocket);
std::cout << "client terminated" << std::endl;
return 0;
}
服务器实现(server.cpp)
服务器程序需要启动多个客户端进程,连接到每个客户端,并向所有客户端发送消息。
#include <iostream>
#include <vector>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <thread>
#include <chrono>
#include <fstream>
#include <sstream>
#include <signal.h>
#include <sys/wait.h>
#include <mutex>
#include <condition_variable>
#define buffer_size 1024
#define default_base_port 8888
#define default_num_clients 3
#define config_file "server_config.txt"
std::mutex mtx;
std::condition_variable cv;
bool allclientsconnected = false;
int connectedclients = 0;
int totalclients = 0;
std::vector<pid_t> clientpids;
std::vector<int> clientsockets;
// 从配置文件读取配置
bool readconfig(int& numclients, int& baseport) {
std::ifstream configfile(config_file);
if (!configfile.is_open()) {
std::cout << "config file not found, using defaults" << std::endl;
return false;
}
std::string line;
while (std::getline(configfile, line)) {
std::istringstream iss(line);
std::string key;
if (std::getline(iss, key, '=')) {
std::string value;
if (std::getline(iss, value)) {
if (key == "num_clients") {
numclients = std::stoi(value);
} else if (key == "base_port") {
baseport = std::stoi(value);
}
}
}
}
configfile.close();
return true;
}
// 写入默认配置到文件
void writedefaultconfig() {
std::ofstream configfile(config_file);
if (configfile.is_open()) {
configfile << "num_clients=" << default_num_clients << std::endl;
configfile << "base_port=" << default_base_port << std::endl;
configfile.close();
std::cout << "created default configuration file" << std::endl;
} else {
std::cerr << "unable to create configuration file" << std::endl;
}
}
// 启动客户端进程
pid_t launchclient(int port) {
pid_t pid = fork();
if (pid == 0) {
// 子进程
std::string portstr = std::to_string(port);
execl("./client", "client", portstr.c_str(), nullptr);
// 如果execl返回,说明出错了
std::cerr << "error launching client: " << strerror(errno) << std::endl;
exit(1);
} else if (pid < 0) {
// fork失败
std::cerr << "fork failed: " << strerror(errno) << std::endl;
return -1;
}
// 父进程
return pid;
}
// 连接到客户端
bool connecttoclient(int& clientsocket, int port) {
clientsocket = socket(af_inet, sock_stream, 0);
if (clientsocket < 0) {
std::cerr << "error creating socket for client on port " << port << std::endl;
return false;
}
struct sockaddr_in clientaddr;
memset(&clientaddr, 0, sizeof(clientaddr));
clientaddr.sin_family = af_inet;
clientaddr.sin_port = htons(port);
clientaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
// 尝试连接,带重试
int retries = 10;
while (retries > 0) {
if (connect(clientsocket, (struct sockaddr*)&clientaddr, sizeof(clientaddr)) == 0) {
std::cout << "connected to client on port " << port << std::endl;
return true;
}
std::cout << "connection attempt failed, retrying in 1 second..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
retries--;
}
std::cerr << "failed to connect to client on port " << port << std::endl;
close(clientsocket);
return false;
}
// 连接客户端的线程函数
void clientconnectionthread(int port, int clientindex) {
int clientsocket;
if (connecttoclient(clientsocket, port)) {
std::lock_guard<std::mutex> lock(mtx);
clientsockets[clientindex] = clientsocket;
connectedclients++;
if (connectedclients == totalclients) {
allclientsconnected = true;
cv.notify_one();
}
}
}
// 向所有已连接的客户端发送消息
void sendtoallclients(const std::string& message) {
for (int socket : clientsockets) {
if (socket > 0) {
send(socket, message.c_str(), message.length(), 0);
// 接收确认
char buffer[buffer_size];
memset(buffer, 0, buffer_size);
int bytesreceived = recv(socket, buffer, buffer_size - 1, 0);
if (bytesreceived > 0) {
std::cout << "acknowledgment: " << buffer << std::endl;
}
}
}
}
// 清理资源
void cleanup() {
// 关闭所有客户端socket
for (int socket : clientsockets) {
if (socket > 0) {
close(socket);
}
}
// 终止所有客户端进程
for (pid_t pid : clientpids) {
if (pid > 0) {
kill(pid, sigterm);
waitpid(pid, nullptr, 0);
}
}
}
// 信号处理函数
void signalhandler(int signum) {
std::cout << "interrupt signal (" << signum << ") received.\n";
cleanup();
exit(signum);
}
int main() {
// 注册信号处理函数
signal(sigint, signalhandler);
signal(sigterm, signalhandler);
// 读取配置
int numclients = default_num_clients;
int baseport = default_base_port;
if (!readconfig(numclients, baseport)) {
writedefaultconfig();
}
std::cout << "starting server with " << numclients << " clients, base port: " << baseport << std::endl;
// 初始化客户端向量
totalclients = numclients;
clientpids.resize(numclients, -1);
clientsockets.resize(numclients, -1);
// 启动客户端进程
for (int i = 0; i < numclients; i++) {
int port = baseport + i;
pid_t pid = launchclient(port);
if (pid > 0) {
clientpids[i] = pid;
std::cout << "launched client " << i + 1 << " with pid " << pid << " on port " << port << std::endl;
} else {
std::cerr << "failed to launch client " << i + 1 << std::endl;
}
}
// 给客户端启动的时间
std::cout << "waiting for clients to start..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
// 连接到客户端
std::vector<std::thread> connectionthreads;
for (int i = 0; i < numclients; i++) {
if (clientpids[i] > 0) {
int port = baseport + i;
connectionthreads.push_back(std::thread(clientconnectionthread, port, i));
}
}
// 等待所有连接建立
{
std::unique_lock<std::mutex> lock(mtx);
if (!allclientsconnected) {
std::cout << "waiting for all clients to connect..." << std::endl;
cv.wait(lock, []{ return allclientsconnected; });
}
}
// 等待所有连接线程结束
for (auto& thread : connectionthreads) {
thread.join();
}
std::cout << "all clients connected. ready to send messages." << std::endl;
// 主循环
std::string message;
while (true) {
std::cout << "enter message to send to all clients (or 'exit' to quit): ";
std::getline(std::cin, message);
if (message == "exit") {
break;
}
std::cout << "sending message to all clients..." << std::endl;
sendtoallclients(message);
}
// 清理
cleanup();
std::cout << "server terminated" << std::endl;
return 0;
}
编译和运行
为了方便编译,我们可以创建一个makefile:
cc = g++ cflags = -std=c++11 -wall -pthread ldflags = -pthread all: client server client: client.cpp $(cc) $(cflags) -o client client.cpp $(ldflags) server: server.cpp $(cc) $(cflags) -o server server.cpp $(ldflags) clean: rm -f client server server_config.txt .phony: all clean
编译和运行的步骤:
# 编译 make # 运行服务器 ./server
代码解析
端口号传递机制
服务器如何将端口号传递给客户端是本系统的一个关键点。整个过程如下:
- 服务器为每个客户端分配一个唯一的端口号(基础端口号 + 索引)
- 服务器通过
fork()创建子进程 - 子进程通过
execl()执行客户端程序,将端口号作为命令行参数传递 - 客户端程序解析命令行参数获取端口号
- 客户端使用该端口号创建和绑定socket
- 服务器知道每个客户端的端口号,并使用这些端口号连接到客户端
信号处理
程序使用信号处理机制来实现优雅关闭:
signal(sigint, signalhandler); signal(sigterm, signalhandler);
这两行代码注册了信号处理函数,当程序接收到sigint(通常是按ctrl+c)或sigterm(通常是系统发送的终止信号)时,会调用signalhandler函数。在这个函数中,程序会清理资源并正常退出。
多线程连接
服务器使用多线程来并行连接到所有客户端:
std::vector<std::thread> connectionthreads;
for (int i = 0; i < numclients; i++) {
if (clientpids[i] > 0) {
int port = baseport + i;
connectionthreads.push_back(std::thread(clientconnectionthread, port, i));
}
}
这样可以同时尝试连接到所有客户端,而不是一个接一个地连接,提高了效率。
条件变量同步
服务器使用条件变量来等待所有客户端连接完成:
{
std::unique_lock<std::mutex> lock(mtx);
if (!allclientsconnected) {
std::cout << "waiting for all clients to connect..." << std::endl;
cv.wait(lock, []{ return allclientsconnected; });
}
}
当所有客户端都连接成功后,allclientsconnected变量会被设置为true,条件变量会通知主线程继续执行。
总结
本文介绍了如何在linux环境下实现一个服务器程序,该程序能够启动多个客户端进程,并通过socket与这些客户端进行通信。主要特点包括:
- 可配置的客户端数量
- 动态端口分配
- 并行连接
- 广播消息
- 确认机制
- 优雅关闭
这个示例展示了多进程、socket通信、多线程和同步机制的综合应用,可以作为网络编程的参考实现。
进一步改进
这个示例还可以进一步改进,例如:
- 添加错误恢复机制
- 实现客户端自动重连
- 添加消息队列
- 实现更复杂的通信协议
- 添加安全机制(如tls加密)
以上就是linux环境下实现多进程socket通信功能的详细内容,更多关于linux多进程socket通信的资料请关注代码网其它相关文章!
发表评论