This repository has been archived on 2021-11-25. You can view files and clone it, but cannot push or open issues/pull-requests.
KChatWSServer/KChatWSServer.cpp

290 lines
10 KiB
C++
Raw Permalink Blame History

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

#include "gsock.h"
#include "gsock_helper.h"
#include <string>
#include <iostream>
#include <sstream>
#include <thread>
#include <vector>
#include <algorithm>
#include <mutex>
#include <queue>
#include "websocket.h"
using namespace std;
// 消息锁. 持有这个锁的时候可以继续获取m_sockvec
mutex m_msgbus;
vector<string> msgbus;
// socket锁. 注意如果若想获取两个锁必须先获取消息队列锁
mutex m_sockvec;
vector<pair<sock*, string>> sockvec;
condition_variable cond_msg;
void ChatWorker()
{
unique_lock<mutex> ulk(m_msgbus);
while (true)
{
cond_msg.wait(ulk);
unique_lock<mutex> ulsk(m_sockvec);
for (auto iter = sockvec.begin(); iter != sockvec.end(); )
{
bool bad = false;
for (const auto& msg : msgbus)
{
if (SendMsg(*(iter->first), msg) <= 0)
{
bad = true;
break;
}
}
if (bad)
{
delete iter->first;
iter = sockvec.erase(iter);
}
else ++iter;
}
msgbus.clear();
}
}
std::string MessageFilter(const std::string& str)
{
std::string output;
for (const auto& c : str)
{
if (c == '<' || c == '>') output.push_back(' ');
else output.push_back(c);
}
return output;
}
// 调用此函数之前必须已经持有m_sockvec锁
// 返回-1表明消息发送错误.但sock尚未被释放.
int SendOnlineList_L(sock* ps)
{
if (SendMsg(*ps, "#*Lclr") <= 0)
{
return -1;
}
else
{
// 发送在线列表
for (auto xter = sockvec.cbegin(); xter != sockvec.cend(); ++xter)
{
if (SendMsg(*ps, string("#*Ladd ") + xter->second) <= 0)
{
return -1;
}
}
return 0;
}
}
// 调用此函数之前必须已经持有m_sockvec锁
void NoticeJoin_L(sock* ps, const string& thisName)
{
for (auto iter = sockvec.begin(); iter != sockvec.end();)
{
if (iter->first != ps)
{
// 不是新加入的用户进行增量通知
if (SendMsg(*(iter->first), string("#*Ladd ") + thisName) <= 0)
{
delete iter->first;
iter = sockvec.erase(iter);
}
else
{
++iter;
}
}
else
{
// 如果是新加入的用户则发送在线列表
if (SendOnlineList_L(ps) < 0)
{
delete ps;
iter = sockvec.erase(iter);
}
else
{
++iter;
}
}
}
}
// 1: 消息是command而且已经完成处理
// 0: 消息不是command
// -1: 消息符合command格式,但不是已知的command
int DoCommand(const std::string& str,std::string& thisName)
{
if (str.size() < 6)
{
return 0;
}
string head = str.substr(0, 6);
if (head == "*#nick" && str.size() >= 8) // 昵称变更
{
string cmd1 = "#*Ldel " + thisName;
string cmd2 = "#*Ladd " + str.substr(7);
{
unique_lock<mutex> ulk(m_msgbus);
msgbus.push_back(cmd1);
msgbus.push_back(cmd2);
cond_msg.notify_all();
}
cout << "Nickname changed from " << thisName << " to " << str.substr(7) << endl;
thisName = str.substr(7);
return 1;
}
// 未知命令
else if (str.substr(0, 2) == "*#") // 但是符合命令起始符
{
return -1;
}
else
{
return 0;
}
}
void Worker(sock* ps)
{
sock& s = *ps;
if (Handshake(s) < 0)
{
cout << "Failed to finish Handshake on " << ps << endl;
return;
}
string thisName;
{
string firstmsg;
int xret = ReadMsg(s, firstmsg);
if (xret <= 0 || firstmsg.size() < 6) // 读不出来或者命令太短了
{
delete ps;
return;
}
string firstcmd = firstmsg.substr(0, 6);
if (firstcmd == "*#newj")
{
thisName = "NoName";
}
else if (firstcmd == "*#name")
{
if (firstmsg.size() < 8) // 命令太短了
{
delete ps;
return;
}
else
{
thisName = firstmsg.substr(7);
}
}
else {
// 非法的用户或指令
delete ps;
return;
}
}
// 至此处,用户为合法用户. 加入到客户端列表中并且启动通知.
{
unique_lock<mutex> xlk(m_sockvec);
sockvec.push_back(make_pair(ps, thisName));
NoticeJoin_L(ps, thisName);
}
string data;
while (true)
{
int ret = ReadMsg(s, data);
cout << "ReadMsg: " << ret << endl;
cout << data << endl;
if (ret <= 0) break;
data = MessageFilter(data);
if (DoCommand(data, thisName) != 0)
{
continue;
}
unique_lock<mutex> ulk(m_msgbus);
msgbus.push_back(data);
cond_msg.notify_all();
}
// 由read导致的退出
bool found = false;
{
// 查找当前worker的socket. 找到的话从队列中移除
unique_lock<mutex> ulk(m_sockvec);
for (auto iter = sockvec.begin(); iter != sockvec.end(); ++iter)
{
if (iter->first == ps)
{
sockvec.erase(iter);
found = true;
break;
}
}
}
// 如果找到,则由自己释放资源.(否则由ChatWorker释放这个socket,即因为send出错导致的退出)
if (found)
{
delete ps;
}
// 推送退出消息到队列中
{
data = "#*Ldel ";
data.append(thisName);
string xdata(thisName);
xdata.append(" disconnected.");
unique_lock<mutex> ulk(m_msgbus);
msgbus.push_back(data);
msgbus.push_back(xdata);
cond_msg.notify_all();
}
}
int main()
{
serversock t;
if (t.bind(59505) < 0 || t.listen(10) < 0)
{
cout << "Failed to bind or listen." << endl;
}
thread tdc(ChatWorker);
tdc.detach();
while (true)
{
sock* ps = new sock;
sock& s = *ps;
int ret = t.accept(s);
if (ret < 0)
{
cout << "Failed to accept. Stop." << endl;
break;
}
cout << "Accepted: " << ps << endl;
thread td(Worker, ps);
td.detach();
}
return 0;
}