C++网络编程学习:服务端多线程分离业务处理高负载

  • 使用的语言为C/C++
  • 源码支持的平台为:Windows / Linux

一、思路与准备

之前的服务端思路大概是如下的:

1
2
3
4
5
6
7
8
9
10
1.建立socket
2.绑定端口IP
3.监听端口
while(true)
{
4.使用select函数获取存在待监听事件的socket
5.如果有新的连接则与新的客户端连接
6.如果有待监听事件,则对其进行处理(接受与发送)
}
7.关闭socket

  但是,这样的架构在select处理事件较多时,很容易效率低下。对于这类问题,我们可以引入生产者与消费者模式,来处理此类并发问题。

  主线程为生产者线程,用来处理新客户端加入事件,把新客户端分配至消费者线程中。消费者线程便是我们建立的新线程,专门用来处理客户端发送的报文。这样就实现了事件处理的分离,从而使服务端处理效率更高。当过多客户端同时涌入时,可以更快的建立连接(因为有专门的线程用来处理这一事件);而当客户端发送报文频率很快时,多线程处理报文也会大大提高效率。

  • 大致改进思路如下,红色的为此次需要加入的核心,黑色为原本架构
    核心思路图

  所以我们首先需要新建一个线程类,用来封装关于消费者线程的内容,从而建立多线程架构。随后,在本次的改进中,我决定加入计时器用来统计数据以及显示数据,主要需要统计的数据为:当前客户端连接数量、数据包的每秒接收数量。同时,我也对报文类型进行了分离,把报文类型放在单独的头文件里,这样既方便更改也方便引用。

二、代码的改进

1.新建子线程类

  • 首先是新建线程类CellServer,其中包含的基础方法以及相关变量如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//线程类
class CellServer
{
public:
//构造
CellServer(SOCKET sock = INVALID_SOCKET);
//析构
~CellServer();
//关闭socket
void CloseSocket();
//判断是否工作中
bool IsRun();
//查询是否有待处理消息
bool OnRun();
//接收数据
int RecvData(ClientSocket *t_client);
//响应数据
void NetMsg(DataHeader *head,SOCKET temp_socket);
//增加客户端
void addClient(ClientSocket* client);
//启动线程
void Start();
//获取该线程内客户端数量
int GetClientCount()

private:
//缓冲区相关
char *_Recv_buf;//接收缓冲区
//socket相关
SOCKET _sock;
//正式客户队列
std::vector<ClientSocket*> _clients;//储存客户端
//客户缓冲区队列
std::vector<ClientSocket*> _clientsBuf;
std::mutex _mutex;//锁
//线程
std::thread* _pThread;

public:
std::atomic_int _recvCount;//接收包的数量
};

  • 大致处理思路如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    线程外:
    Start() 首先调用该方法启动线程

    新客户端加入:
    GetClientCount() 首先主线程使用这个方法获取各个线程内客户端数量
    //这个添加客户端的方法内涉及到临界区,需要上锁
    addClient(ClientSocket* client) 主线程找到客户端数量最少的线程,使用该线程添加客户端至缓冲队列

    线程内:
    OnRun()//运行线程
    {
    while(IsRun())//判断是否工作中
    {
    1.将缓冲队列中的客户数据加入正式队列
    2.正式客户队列为空的话,continue本次循环
    3.select选择出待处理事件,错误的话就关闭所有连接CloseSocket()
    4.对待处理事件进行接收RecvData(),接收包的数量加一,随后处理NetMsg()
    }
    }

2.客户端主线程类的更改

  由于我们处理事件都改为在子线程中,所以首先主线程中是不需要处理报文消息了,所以类中接收消息和处理消息的方法都可以删除了。同时我们加入Start方法用来启动子线程,加入time4msg方法用来显示子线程中的客户端数量、每秒收包数等数据。

  • 主线程类TcpServer,更改后如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class TcpServer : INetEvent
{
public:
//构造
TcpServer();
//析构
~TcpServer();
//初始化socket 返回1为正常
int InitSocket();
//绑定IP/端口
int Bind(const char* ip,unsigned short port);
//监听端口
int Listen(int n);
//接受连接
int Accept();
//添加客户端至服务端
void AddClientToServer(ClientSocket* pClient);
//线程启动
void Start();
//关闭socket
void CloseSocket();
//判断是否工作中
bool IsRun();
//查询是否有待处理消息
bool OnRun();
//显示各线程数据信息
void time4msg();

private:
//socket相关
SOCKET _sock;
std::vector<ClientSocket*> _clients;//储存客户端
std::vector<CellServer*> _cellServers;//子线程们
//计时器
mytimer _time;
};

  • 大致处理思路如下:计时器相关请点这里
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    调用TcpServer封装类建立服务端的流程:
    1.InitSocket() 建立一个socket
    2.Bind(const char* ip,unsigned short port) 绑定端口和IP
    3.Listen(int n) 监听
    4.Start() 线程启动
    while(5.IsRun()) 主线程循环
    {
    6.OnRun() 开始select选择处理事件
    }
    7.CloseSocket() 关闭socket

    主线程内:
    OnRun()
    {
    time4msg()显示数据
    select选择出新客户端加入事件
    如果有新客户端加入,调用Accept()接受连接
    Accept()连接成功后,调用AddClientToServer(ClientSocket* pClient)分配客户端到子线程中
    }

    AddClientToServer()内:
    首先调用子线程的GetClientCount()方法获取各条子线程中的客户端数量
    随后调用子线程的addClient(ClientSocket* client)方法,把新客户端添加至客户端最少的线程中

    time4msg()内:
    首先GetSecond()获取计时器的计时
    如果大于一秒,就统计客户端的情况:子线程内_recvCount为收包数,主线程内_clients.size()获取客户端数量
    显示后UpDate()重置计时器,并且重置收包数,从而达到统计每秒收包数的作用

3.引入接口,实现子线程向主线程通信

  通过前两步的实现,多线程服务端已经初步完成,接下来需要进行一些完善。
  我们很容易可以发现,子线程对象是在主线程Start()方法被创建的,随后被加入容器_cellServers储存。这就导致主线程中可以调用子线程类中的方法与成员变量,但是子线程中却无法调用主线程的方法与成员变量。从而导致当子线程中有客户端退出时,主线程无法了解。
  对于这种情况,我们可以创建一个接口,让主线程类继承这个接口,子线程即可通过这个接口调用主线程中的特定方法。

  • 接口类INetEvent如下:
    1
    2
    3
    4
    5
    6
    7
    class INetEvent
    {
    public:
    //有客户端退出
    virtual void OnLeave(ClientSocket* pClient) = 0;
    private:
    };
  • 主线程类与子线程类中的相关实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
1.首先是主线程类继承该接口:
class TcpServer : INetEvent

2.随后实现接口中的虚方法:
//客户端退出
void OnLeave(ClientSocket* pClient)
{
//找到退出的客户端
for(int n=0; n<_clients.size(); n++)
{
if(_clients[n] == pClient)
{
auto iter = _clients.begin() + n;
if(iter != _clients.end())
{
_clients.erase(iter);//移除
}
}
}
}
即可实现调用该方法,移除客户端容器中指定客户端

3.随后在子线程类中添加私有成员变量:
private:
INetEvent* _pNetEvent;
创建接口对象

4.创建方法,让接口对象指向主线程类
void setEventObj(INetEvent* event)
{
_pNetEvent = event;
}
event传进去主线程即可,接口对象即指向主线程

5.主线程创建、启动子线程类时,调用该方法,传入自身this
子线程对象->setEventObj(this);

6.随后即可通过子线程调用主线程的OnLeave()方法删除客户端
_pNetEvent->OnLeave(要删除的客户端);

三、详细代码实现

1.计时器头文件 mytimer.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#ifndef MY_TIMER_H_
#define MY_TIMER_H_

#include<chrono>

class mytimer
{
private:
std::chrono::steady_clock::time_point _begin;//起始时间
std::chrono::steady_clock::time_point _end;//终止时间
public:
mytimer()
{
_begin = std::chrono::steady_clock::time_point();
_end = std::chrono::steady_clock::time_point();
}

virtual ~mytimer(){};

//调用update时,使起始时间等于当前时间
void UpDate()
{
_begin = std::chrono::steady_clock::now();
}

//调用getsecond方法时,经过的时间为当前时间减去之前统计过的起始时间。
double GetSecond()
{
_end = std::chrono::steady_clock::now();
//使用duration类型变量进行时间的储存 duration_cast是类型转换方法
std::chrono::duration<double> temp = std::chrono::duration_cast<std::chrono::duration<double>>(_end - _begin);
return temp.count();//count() 获取当前时间的计数数量
}

};

#endif

2.命令头文件 CMD.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
//枚举类型记录命令 
enum cmd
{
CMD_LOGIN,//登录
CMD_LOGINRESULT,//登录结果
CMD_LOGOUT,//登出
CMD_LOGOUTRESULT,//登出结果
CMD_NEW_USER_JOIN,//新用户登入
CMD_ERROR//错误
};
//定义数据包头
struct DataHeader
{
short cmd;//命令
short date_length;//数据的长短
};
//包1 登录 传输账号与密码
struct Login : public DataHeader
{
Login()//初始化包头
{
this->cmd = CMD_LOGIN;
this->date_length = sizeof(Login);
}
char UserName[32];//用户名
char PassWord[32];//密码
};
//包2 登录结果 传输结果
struct LoginResult : public DataHeader
{
LoginResult()//初始化包头
{
this->cmd = CMD_LOGINRESULT;
this->date_length = sizeof(LoginResult);
}
int Result;
};
//包3 登出 传输用户名
struct Logout : public DataHeader
{
Logout()//初始化包头
{
this->cmd = CMD_LOGOUT;
this->date_length = sizeof(Logout);
}
char UserName[32];//用户名
};
//包4 登出结果 传输结果
struct LogoutResult : public DataHeader
{
LogoutResult()//初始化包头
{
this->cmd = CMD_LOGOUTRESULT;
this->date_length = sizeof(LogoutResult);
}
int Result;
};
//包5 新用户登入 传输通告
struct NewUserJoin : public DataHeader
{
NewUserJoin()//初始化包头
{
this->cmd = CMD_NEW_USER_JOIN;
this->date_length = sizeof(NewUserJoin);
}
char UserName[32];//用户名
};

3.服务端头文件 TcpServer.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
#ifndef _TcpServer_hpp_
#define _TcpServer_hpp_

#ifdef _WIN32
#define FD_SETSIZE 10240
#define WIN32_LEAN_AND_MEAN
#include<winSock2.h>
#include<windows.h>
#pragma comment(lib,"ws2_32.lib")//链接此动态链接库 windows特有
#else
#include<arpa/inet.h>//selcet
#include<unistd.h>//uni std
#include<string.h>

#define SOCKET int
#define INVALID_SOCKET (SOCKET)(~0)
#define SOCKET_ERROR (-1)
#endif

#include"CMD.h"//命令
#include"mytimer.hpp"//计时器
#include<stdio.h>
#include<vector>
#include<thread>
#include<mutex>
#include<atomic>

//缓冲区大小
#ifndef RECV_BUFFER_SIZE
#define RECV_BUFFER_SIZE 4096
#endif

//线程数量
#define _THREAD_COUNT 4

//客户端类
class ClientSocket
{
public:
//构造
ClientSocket(SOCKET sockfd = INVALID_SOCKET)
{
_sockfd = sockfd;
//缓冲区相关
_Msg_buf = new char[RECV_BUFFER_SIZE*10];
_Len_buf = 0;
}
//析构
virtual ~ClientSocket()
{
delete[] _Msg_buf;
}

//获取socket
SOCKET GetSockfd()
{
return _sockfd;
}

//获取缓冲区
char* MsgBuf()
{
return _Msg_buf;
}

//获取缓冲区尾部变量
int GetLen()
{
return _Len_buf;
}

//设置缓冲区尾巴变量
void SetLen(int len)
{
_Len_buf = len;
}

private:
SOCKET _sockfd;
//缓冲区相关
char *_Msg_buf;//消息缓冲区
int _Len_buf;//缓冲区数据尾部变量
};

//事件接口
class INetEvent
{
public:
//有客户端退出
virtual void OnLeave(ClientSocket* pClient) = 0;
private:
};

//线程类
class CellServer
{
public:
//构造
CellServer(SOCKET sock = INVALID_SOCKET)
{
_sock = sock;
_pThread = nullptr;
_pNetEvent = nullptr;
_recvCount = 0;
//缓冲区相关
_Recv_buf = new char[RECV_BUFFER_SIZE];
}
//析构
~CellServer()
{
delete[] _Recv_buf;
//关闭socket
CloseSocket();
_sock = INVALID_SOCKET;
}

//处理事件
void setEventObj(INetEvent* event)
{
_pNetEvent = event;
}

//关闭socket
void CloseSocket()
{
if(INVALID_SOCKET != _sock)
{
#ifdef _WIN32
//关闭客户端socket
for(int n=0; n<_clients.size(); ++n)
{
closesocket(_clients[n]->GetSockfd());
delete _clients[n];
}
//关闭socket
closesocket(_sock);
//清除windows socket 环境
WSACleanup();
#else
//关闭客户端socket
for(int n=0; n<_clients.size(); ++n)
{
close(_clients[n]->GetSockfd());
delete _clients[n];
}
//关闭socket/LINUX
close(_sock);
#endif
_sock = INVALID_SOCKET;
_clients.clear();
}
}


//判断是否工作中
bool IsRun()
{
return _sock != INVALID_SOCKET;
}

//查询是否有待处理消息
bool OnRun()
{
while(IsRun())
{
//将缓冲队列中的客户数据加入正式队列
if(_clientsBuf.size() > 0)
{
std::lock_guard<std::mutex> lock(_mutex);//上锁
for(auto client :_clientsBuf)
{
_clients.push_back(client);
}
_clientsBuf.clear();
}
//如果没有需要处理的客户端就跳过
if(_clients.empty())
{
std::chrono::milliseconds t(1);//休眠一毫秒
std::this_thread::sleep_for(t);
continue;
}
fd_set fdRead;//建立集合
FD_ZERO(&fdRead);//清空集合
SOCKET maxSock = _clients[0]->GetSockfd();//最大socket
//把连接的客户端 放入read集合
for(int n=_clients.size()-1; n>=0; --n)
{
FD_SET(_clients[n]->GetSockfd(),&fdRead);
if(maxSock < _clients[n]->GetSockfd())
{
maxSock = _clients[n]->GetSockfd();
}
}
//select函数筛选select
int ret = select(maxSock+1,&fdRead,0,0,0);
if(ret<0)
{
printf("select任务结束\n");
CloseSocket();
return false;
}
//遍历所有socket 查看是否有待处理事件
for(int n=0; n<_clients.size(); ++n)
{
if(FD_ISSET(_clients[n]->GetSockfd(),&fdRead))
{
if(-1 == RecvData(_clients[n]))//处理请求 客户端退出的话
{
std::vector<ClientSocket*>::iterator iter = _clients.begin()+n;//找到退出客户端的地址
if(iter != _clients.end())//如果是合理值
{
if(_pNetEvent)//主线程中删除客户端
{
_pNetEvent->OnLeave(_clients[n]);
}
delete _clients[n];
_clients.erase(iter);//移除
}
}
}
}
//printf("空闲时间处理其他业务\n");
}
}

//接收数据
int RecvData(ClientSocket *t_client)//处理数据
{
_recvCount++;//收包数量加一
//接收客户端发送的数据
int buf_len = recv(t_client->GetSockfd(), _Recv_buf, RECV_BUFFER_SIZE, 0);
if(buf_len<=0)
{
printf("客户端已退出\n");
return -1;
}

//将接收缓冲区的数据拷贝到消息缓冲区
memcpy(t_client->MsgBuf() + t_client->GetLen(), _Recv_buf, buf_len);
//消息缓冲区的数据末尾后移
t_client->SetLen(t_client->GetLen() + buf_len);
//判断消息缓冲区的数据长度是否大于等于包头长度
while(t_client->GetLen() >= sizeof(DataHeader))//处理粘包问题
{
//选出包头数据
DataHeader* header = (DataHeader*)t_client->MsgBuf();
//判断消息缓冲区内数据长度是否大于等于报文长度 避免少包问题
if(t_client->GetLen() >= header->date_length)
{
//计算出消息缓冲区内剩余未处理数据的长度
int size = t_client->GetLen() - header->date_length;
//响应数据
NetMsg(header,t_client->GetSockfd());
//将消息缓冲区剩余未处理的数据前移
memcpy(t_client->MsgBuf(), t_client->MsgBuf() + header->date_length, size);
//消息缓冲区的数据末尾前移
t_client->SetLen(size);
}
else
{
//消息缓冲区数据不足
break;
}
}
return 0;
}

//响应数据
void NetMsg(DataHeader *head,SOCKET temp_socket)
{

//printf("接收到包头,命令:%d,数据长度:%d\n",head->cmd,head->date_length);
switch(head->cmd)
{
case CMD_LOGIN://登录 接收登录包体
{
Login *login = (Login*)head;
/*
进行判断操作
*/
//printf("%s已登录\n密码:%s\n",login->UserName,login->PassWord);
LoginResult *result = new LoginResult;
result->Result = 1;
//SendData(result,temp_socket);
}
break;
case CMD_LOGOUT://登出 接收登出包体
{
Logout *logout = (Logout*)head;
/*
进行判断操作
*/
//printf("%s已登出\n",logout->UserName);
LogoutResult *result = new LogoutResult();
result->Result = 1;
//SendData(result,temp_socket);
}
break;
default://错误
{
head->cmd = CMD_ERROR;
head->date_length = 0;
//SendData(head,temp_socket);
}
break;
}
}

//增加客户端
void addClient(ClientSocket* client)
{
std::lock_guard<std::mutex> lock(_mutex);
//_mutex.lock();
_clientsBuf.push_back(client);
//_mutex.unlock();
}

//启动线程
void Start()
{
_pThread = new std::thread(std::mem_fun(&CellServer::OnRun),this);

}

//获取该线程内客户端数量
int GetClientCount()
{
return _clients.size() + _clientsBuf.size();
}

private:
//缓冲区相关
char *_Recv_buf;//接收缓冲区
//socket相关
SOCKET _sock;
//正式客户队列
std::vector<ClientSocket*> _clients;//储存客户端
//客户缓冲区
std::vector<ClientSocket*> _clientsBuf;
std::mutex _mutex;//锁
//线程
std::thread* _pThread;
//退出事件接口
INetEvent* _pNetEvent;

public:
std::atomic_int _recvCount;//接收包的数量
};

//服务端类
class TcpServer : INetEvent
{
public:
//构造
TcpServer()
{
_sock = INVALID_SOCKET;
}

//析构
virtual ~TcpServer()
{
//关闭socket
CloseSocket();
}

//初始化socket 返回1为正常
int InitSocket()
{
#ifdef _WIN32
//启动windows socket 2,x环境
WORD ver = MAKEWORD(2,2);
WSADATA dat;
if(0 != WSAStartup(ver,&dat))
{
return -1;//-1为环境错误
}
#endif
//创建socket
if(INVALID_SOCKET != _sock)
{
printf("<Socket=%d>关闭连接\n",_sock);
CloseSocket();//如果之前有连接 就关闭连接
}
_sock = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
if(INVALID_SOCKET == _sock)
{
return 0;//0为socket创建错误
}
return 1;
}

//绑定IP/端口
int Bind(const char* ip,unsigned short port)
{
//如果为无效套接字 则初始化
if(INVALID_SOCKET == _sock)
{
InitSocket();
}
//绑定网络端口和IP地址
sockaddr_in _myaddr = {};
_myaddr.sin_family = AF_INET;//IPV4
_myaddr.sin_port = htons(port);//端口
#ifdef _WIN32
if(ip)//ip为空则监听所有网卡
{
_myaddr.sin_addr.S_un.S_addr = inet_addr(ip);//IP
}
else
{
_myaddr.sin_addr.S_un.S_addr = INADDR_ANY;//IP
}
#else
if(ip)//ip为空则监听所有网卡
{
_myaddr.sin_addr.s_addr = inet_addr(ip);//IP
}
else
{
_myaddr.sin_addr.s_addr = INADDR_ANY;//IP
}
#endif
if(SOCKET_ERROR == bind(_sock,(sockaddr*)&_myaddr,sizeof(sockaddr_in)))//socket (强制转换)sockaddr结构体 结构体大小
{
printf("绑定失败\n");
return 0;
}
else
{
printf("绑定成功\n绑定端口为%d\n",port);
return 1;
}
}

//监听端口
int Listen(int n)
{
//如果为无效套接字 则提示
if(INVALID_SOCKET == _sock)
{
printf("请先初始化套接字并绑定IP端口\n");
return 0;
}
//监听网络端口
if(SOCKET_ERROR == listen(_sock,n))//最大连接队列
{
printf("监听失败\n");
return 0;
}
else
{
printf("监听成功\n");
return 1;
}
}

//接受连接
int Accept()
{
//等待接收客户端连接
sockaddr_in clientAddr = {};//新建sockadd结构体接收客户端数据
int addr_len = sizeof(sockaddr_in);//获取sockadd结构体长度
SOCKET temp_socket = INVALID_SOCKET;//声明客户端套接字
#ifdef _WIN32
temp_socket = accept(_sock,(sockaddr*)&clientAddr,&addr_len);//自身套接字 客户端结构体 结构体大小
#else
temp_socket = accept(_sock,(sockaddr*)&clientAddr,(socklen_t*)&addr_len);//自身套接字 客户端结构体 结构体大小
#endif
if(INVALID_SOCKET == temp_socket)//接收失败
{
printf("<Socket=%d>错误,接受到无效客户端SOCKET\n",temp_socket);
return 0;
}
else
{
//printf("新客户端加入 count: %d\nIP地址为:%s \n", _clients.size(), inet_ntoa(clientAddr.sin_addr));
//群发所有客户端 通知新用户登录
//NewUserJoin *user_join = new NewUserJoin();
//strcpy(user_join->UserName,inet_ntoa(clientAddr.sin_addr));
//SendDataToAll(user_join);
//将新的客户端加入动态数组
AddClientToServer(new ClientSocket(temp_socket));
return 1;
}
}

//添加客户端至服务端
void AddClientToServer(ClientSocket* pClient)
{
_clients.push_back(pClient);
//找出客户端最少的线程 然后加入
auto pMinServer = _cellServers[0];
for(auto pCellServer : _cellServers)
{
if(pMinServer->GetClientCount() > pCellServer->GetClientCount())
{
pMinServer = pCellServer;
}
}
pMinServer->addClient(pClient);
}

//线程启动
void Start()
{
for(int n=0; n<_THREAD_COUNT; n++)
{
//线程加入容器
auto ser = new CellServer(_sock);
_cellServers.push_back(ser);
ser->setEventObj(this);
ser->Start();
}
}

//关闭socket
void CloseSocket()
{
if(INVALID_SOCKET != _sock)
{
#ifdef _WIN32
//关闭客户端socket
for(int n=0; n<_clients.size(); ++n)
{
closesocket(_clients[n]->GetSockfd());
delete _clients[n];
}
//关闭socket
closesocket(_sock);
//清除windows socket 环境
WSACleanup();
#else
//关闭客户端socket
for(int n=0; n<_clients.size(); ++n)
{
close(_clients[n]->GetSockfd());
delete _clients[n];
}
//关闭socket/LINUX
close(_sock);
#endif
_sock = INVALID_SOCKET;
_clients.clear();
}
}


//判断是否工作中
bool IsRun()
{
return _sock != INVALID_SOCKET;
}

//查询是否有待处理消息
bool OnRun()
{
if(IsRun())
{
time4msg();//查看各线程数据信息
fd_set fdRead;//建立集合
//fd_set fdWrite;
//fd_set fdExcept;
FD_ZERO(&fdRead);//清空集合
//FD_ZERO(&fdWrite);
//FD_ZERO(&fdExcept);
FD_SET(_sock,&fdRead);//放入集合
//FD_SET(_sock,&fdWrite);
//FD_SET(_sock,&fdExcept);
timeval s_t = {0,0};//select最大响应时间

//select函数筛选select
int ret = select(_sock+1,&fdRead,0,0,&s_t);
if(ret<0)
{
printf("select任务结束\n");
CloseSocket();
return false;
}
if(FD_ISSET(_sock,&fdRead))//获取是否有新socket连接
{
FD_CLR(_sock,&fdRead);//清理
Accept();//连接
}
return true;
}
return false;
}

//显示各线程数据信息
void time4msg()
{
auto t1 = _time.GetSecond();
if(1.0 <= t1)
{
int recvCount = 0;
for(auto ser: _cellServers)
{
recvCount += ser->_recvCount;
ser->_recvCount = 0;
}
//时间间隔 本机socket连接序号 客户端数量 每秒收包数
printf("time<%lf>,socket<%d>,clients<%d>,recvCount<%d>\n", t1, _sock, _clients.size(),(int)(recvCount/t1));
_time.UpDate();
}
}

//发送数据
int SendData(DataHeader *head,SOCKET temp_socket)
{
if(IsRun() && head)
{
send(temp_socket,(const char*)head,head->date_length,0);
return 1;
}
return 0;
}

//向所有人发送数据
void SendDataToAll(DataHeader *head)
{
for(int n=0;n<_clients.size();++n)
{
SendData(head, _clients[n]->GetSockfd());
}
}

//客户端退出
void OnLeave(ClientSocket* pClient)
{
//找到退出的客户端
for(int n=0; n<_clients.size(); n++)
{
if(_clients[n] == pClient)
{
auto iter = _clients.begin() + n;
if(iter != _clients.end())
{
_clients.erase(iter);//移除
}
}
}
}

private:
//socket相关
SOCKET _sock;
std::vector<ClientSocket*> _clients;//储存客户端
std::vector<CellServer*> _cellServers;//线程处理
//计时器
mytimer _time;
};

#endif

4.服务端样例代码 server.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include"TcpServer.hpp"

int main()
{
printf("Welcome\n");

//建立tcp对象
TcpServer *tcp1 = new TcpServer();
//建立一个socket
tcp1->InitSocket();
//绑定端口和IP
tcp1->Bind(NULL,8888);
//监听
tcp1->Listen(5);
//线程启动
tcp1->Start();
//循环
while(tcp1->IsRun())
{
tcp1->OnRun();
}
//关闭
tcp1->CloseSocket();

printf("任务结束,程序已退出");
getchar();

return 0;
}