This repository has been archived on 2021-11-25. You can view files and clone it, but cannot push or open issues or pull requests.
KChatWSServer/KChatWSServer.cpp
2018-09-07 16:09:54 +08:00

290 lines
10 KiB
C++
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "gsock.h"
#include "gsock_helper.h"
#include <string>
#include <iostream>
#include <sstream>
#include <thread>
#include <vector>
#include <algorithm>
#include <mutex>
#include <condition_variable>
#include <queue>
#include "websocket.h"
using namespace std;
// Message lock: guards msgbus. While holding this lock it is permitted to go on and acquire m_sockvec.
mutex m_msgbus;
// Queue of pending outgoing messages. Producers append under m_msgbus; ChatWorker consumes it.
vector<string> msgbus;
// Socket lock: guards sockvec. Note: to hold both locks, the message-queue lock (m_msgbus) must be acquired first.
mutex m_sockvec;
// Registered clients as (socket pointer, nickname) pairs.
vector<pair<sock*, string>> sockvec;
// Notified after messages are appended to msgbus; ChatWorker waits on it.
condition_variable cond_msg;
void ChatWorker()
{
unique_lock<mutex> ulk(m_msgbus);
while (true)
{
cond_msg.wait(ulk);
unique_lock<mutex> ulsk(m_sockvec);
for (auto iter = sockvec.begin(); iter != sockvec.end(); )
{
bool bad = false;
for (const auto& msg : msgbus)
{
if (SendMsg(*(iter->first), msg) <= 0)
{
bad = true;
break;
}
}
if (bad)
{
delete iter->first;
iter = sockvec.erase(iter);
}
else ++iter;
}
msgbus.clear();
}
}
// Returns a copy of str in which every '<' and every '>' has been replaced
// by a single space. All other characters are kept unchanged, so the result
// always has the same length as the input.
std::string MessageFilter(const std::string& str)
{
    std::string sanitized(str);
    std::replace_if(sanitized.begin(), sanitized.end(),
                    [](char ch) { return ch == '<' || ch == '>'; },
                    ' ');
    return sanitized;
}
// Sends the complete online list to one client: first "#*Lclr", then one
// "#*Ladd <nickname>" per entry currently in sockvec.
// Precondition: the caller must already hold the m_sockvec lock.
// Returns 0 on success, or -1 as soon as any send fails. On failure the
// sock has NOT been released; that is left to the caller.
int SendOnlineList_L(sock* ps)
{
    if (SendMsg(*ps, "#*Lclr") <= 0)
    {
        return -1;
    }

    // Replay every registered nickname to the client.
    for (const auto& entry : sockvec)
    {
        if (SendMsg(*ps, string("#*Ladd ") + entry.second) <= 0)
        {
            return -1;
        }
    }
    return 0;
}
// Announces a newly joined client.
// Precondition: the caller must already hold the m_sockvec lock.
//   ps       - socket of the client that just joined (already in sockvec).
//   thisName - nickname of that client.
// Every other client receives an incremental "#*Ladd <thisName>"; a client
// whose send fails is deleted and removed from sockvec. The new client itself
// receives the full online list via SendOnlineList_L.
// The socket ps is never deleted or removed here: the caller keeps using it
// after this function returns, so freeing it would leave the caller with a
// dangling pointer.
void NoticeJoin_L(sock* ps, const string& thisName)
{
    // Same text for every recipient, so build it once.
    const string joinNotice = string("#*Ladd ") + thisName;

    for (auto iter = sockvec.begin(); iter != sockvec.end();)
    {
        if (iter->first == ps)
        {
            // The new client gets the whole list instead of an increment.
            // On failure the entry is left in place; the owning worker will
            // notice the broken connection and clean up on its own.
            if (SendOnlineList_L(ps) < 0)
            {
                cout << "Failed to send online list to " << ps << endl;
            }
            ++iter;
            continue;
        }

        // Existing client: incremental notification.
        if (SendMsg(*(iter->first), joinNotice) <= 0)
        {
            // NOTE(review): the Worker thread serving this socket may still be
            // using it when it is freed here - confirm this is safe.
            delete iter->first;
            iter = sockvec.erase(iter);
        }
        else
        {
            ++iter;
        }
    }
}
// 1: 消息是command而且已经完成处理
// 0: 消息不是command
// -1: 消息符合command格式,但不是已知的command
int DoCommand(const std::string& str,std::string& thisName)
{
if (str.size() < 6)
{
return 0;
}
string head = str.substr(0, 6);
if (head == "*#nick" && str.size() >= 8) // 昵称变更
{
string cmd1 = "#*Ldel " + thisName;
string cmd2 = "#*Ladd " + str.substr(7);
{
unique_lock<mutex> ulk(m_msgbus);
msgbus.push_back(cmd1);
msgbus.push_back(cmd2);
cond_msg.notify_all();
}
cout << "Nickname changed from " << thisName << " to " << str.substr(7) << endl;
thisName = str.substr(7);
return 1;
}
// 未知命令
else if (str.substr(0, 2) == "*#") // 但是符合命令起始符
{
return -1;
}
else
{
return 0;
}
}
// Per-connection thread body. Receives a heap-allocated sock and is
// responsible for deleting it on every path where it has not been handed
// over to (and freed through) sockvec.
// Flow: handshake -> read the first message ("*#newj" or "*#name <name>") ->
// register in sockvec and announce the join -> relay incoming messages to
// msgbus until a read fails -> unregister and announce the departure.
void Worker(sock* ps)
{
    sock& s = *ps;
    if (Handshake(s) < 0)
    {
        cout << "Failed to finish Handshake on " << ps << endl;
        // The socket was never registered, so nobody else will free it.
        delete ps;
        return;
    }
    string thisName;
    {
        string firstmsg;
        int xret = ReadMsg(s, firstmsg);
        if (xret <= 0 || firstmsg.size() < 6) // read failed, or too short to be a command
        {
            delete ps;
            return;
        }
        string firstcmd = firstmsg.substr(0, 6);
        if (firstcmd == "*#newj")
        {
            thisName = "NoName";
        }
        else if (firstcmd == "*#name")
        {
            if (firstmsg.size() < 8) // command too short to carry a name
            {
                delete ps;
                return;
            }
            // The name is broadcast to every client, so it gets the same
            // filtering as regular messages and nickname changes.
            thisName = MessageFilter(firstmsg.substr(7));
        }
        else
        {
            // Illegal user or command.
            delete ps;
            return;
        }
    }
    // From here on the user is legitimate: add it to the client list and
    // start the join notifications.
    bool registered = false;
    {
        unique_lock<mutex> xlk(m_sockvec);
        sockvec.push_back(make_pair(ps, thisName));
        NoticeJoin_L(ps, thisName);
        // NoticeJoin_L may remove (and free) sockets from sockvec. If our own
        // entry is gone, the socket must not be touched again.
        registered = any_of(sockvec.begin(), sockvec.end(),
            [ps](const pair<sock*, string>& entry) { return entry.first == ps; });
    }
    string data;
    if (registered)
    {
        while (true)
        {
            int ret = ReadMsg(s, data);
            cout << "ReadMsg: " << ret << endl;
            cout << data << endl;
            if (ret <= 0) break;
            data = MessageFilter(data);
            // Commands (handled or unknown) are never broadcast as chat text.
            if (DoCommand(data, thisName) != 0)
            {
                continue;
            }
            unique_lock<mutex> ulk(m_msgbus);
            msgbus.push_back(data);
            cond_msg.notify_all();
        }
    }
    // Reached when reading stopped (or the entry was already removed).
    bool found = false;
    {
        // Look for this worker's socket; if it is still listed, remove it.
        unique_lock<mutex> ulk(m_sockvec);
        for (auto iter = sockvec.begin(); iter != sockvec.end(); ++iter)
        {
            if (iter->first == ps)
            {
                sockvec.erase(iter);
                found = true;
                break;
            }
        }
    }
    // If it was still listed, this worker frees it. Otherwise whoever removed
    // the entry after a failed send has already freed the socket.
    if (found)
    {
        delete ps;
    }
    // Queue the departure messages.
    {
        data = "#*Ldel ";
        data.append(thisName);
        string xdata(thisName);
        xdata.append(" disconnected.");
        unique_lock<mutex> ulk(m_msgbus);
        msgbus.push_back(data);
        msgbus.push_back(xdata);
        cond_msg.notify_all();
    }
}
int main()
{
serversock t;
if (t.bind(59505) < 0 || t.listen(10) < 0)
{
cout << "Failed to bind or listen." << endl;
}
thread tdc(ChatWorker);
tdc.detach();
while (true)
{
sock* ps = new sock;
sock& s = *ps;
int ret = t.accept(s);
if (ret < 0)
{
cout << "Failed to accept. Stop." << endl;
break;
}
cout << "Accepted: " << ps << endl;
thread td(Worker, ps);
td.detach();
}
return 0;
}