run-all succ except http

2.0
jarodruan 2020-02-12 16:50:16 +08:00
parent 7c359b7281
commit 5229509b66
42 changed files with 342 additions and 346 deletions

View File

@ -13,7 +13,7 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: install
run: brew install bison flex
run: brew install bison flex cmake; cmake --version
- name: git
run: git submodule update --init --recursive
- name: configure

View File

@ -15,7 +15,7 @@ jobs:
- name: git
run: git submodule update --init --recursive
- name: configure
run: mkdir build;cd build;cmake .. -A X64
run: mkdir build;cd build; cmake --version; cmake .. -A X64
- name: make
run: cd build; cmake --build . --config release --target run-all

View File

@ -13,7 +13,7 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: install
run: sudo apt-get install -y bison flex
run: sudo apt-get install -y bison flex cmake; cmake --version
- name: git
run: git submodule update --init --recursive
- name: configure

View File

@ -13,7 +13,7 @@ add_definitions(-DTARS_HTTP2=${TARS_HTTP2})
set(TARS_PROTOBUF 0)
add_definitions(-DTARS_PROTOBUF=${TARS_PROTOBUF})
set(TARS_ZLIB 0)
add_definitions(-TARS_ZLIB=${TARS_ZLIB})
add_definitions(-DTARS_ZLIB=${TARS_ZLIB})
# list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake/modules/")
set(CMAKE_VERBOSE_MAKEFILE off)

View File

@ -20,14 +20,14 @@
#merge net and sync thread
mergenetasync = 0
#module name
modulename = Test.AServer
modulename = TestApp.AServer
</client>
<server>
#not cout
closecout = 0
#app name
app = Test
app = TestApp
#server name
server = AServer
#path
@ -59,7 +59,7 @@
#imp thread num
threads = 5
#servant
servant = Test.AServer.AServantObj
servant = TestApp.AServer.AServantObj
#queue capacity
queuecap = 1000000
#tars protocol

View File

@ -28,7 +28,7 @@ void BServantImp::initialize()
{
//initialize servant here:
//...
_pPrx = Application::getCommunicator()->stringToProxy<AServantPrx>("Test.AServer.AServantObj@tcp -h 127.0.0.1 -p 9000 -t 10000");
_pPrx = Application::getCommunicator()->stringToProxy<AServantPrx>("TestApp.AServer.AServantObj@tcp -h 127.0.0.1 -p 9000 -t 10000");
}
//////////////////////////////////////////////////////
void BServantImp::destroy()

View File

@ -20,14 +20,14 @@
#merge net and sync thread
mergenetasync = 0
#module name
modulename = Test.BServer
modulename = TestApp.BServer
</client>
<server>
#not cout
closecout = 0
#app name
app = Test
app = TestApp
#server name
server = BServer
#path
@ -61,7 +61,7 @@
#imp thread num
threads = 5
#servant
servant = Test.BServer.BServantObj
servant = TestApp.BServer.BServantObj
#queue capacity
queuecap = 1000000
#tars protocol

View File

@ -41,7 +41,7 @@ Test1::Test1()
{
// _comm.setProperty("locator", "tars.tarsregistry.QueryObj@tcp -h 10.208.139.242 -p 17890 -t 10000");
// _comm.setProperty("stat", "tars.tarsstat.StatObj");
_prx = _comm.stringToProxy<BServantPrx>("Test.BServer.BServantObj@tcp -h 127.0.0.1 -p 9100");
_prx = _comm.stringToProxy<BServantPrx>("TestApp.BServer.BServantObj@tcp -h 127.0.0.1 -p 9100");
}
Test1::~Test1()

View File

@ -43,7 +43,7 @@ TestCoroutine::TestCoroutine(int iNum)
: _num(iNum)
{
// _comm.setProperty("locator", "tars.tarsregistry.QueryObj@tcp -h 10.208.139.242 -p 17890 -t 10000");
_prx = _comm.stringToProxy<BServantPrx>("Test.BServer.BServantObj@tcp -h 127.0.0.1 -p 9100");
_prx = _comm.stringToProxy<BServantPrx>("TestApp.BServer.BServantObj@tcp -h 127.0.0.1 -p 9100");
// _comm.stringToProxy(_sObj, _prx);
}

View File

@ -78,7 +78,7 @@ TestCoroutine::TestCoroutine(int iNum)
: _num(iNum)
{
// _comm.setProperty("locator", "tars.tarsregistry.QueryObj@tcp -h 10.208.139.242 -p 17890 -t 10000");
_prx = _comm.stringToProxy<BServantPrx>("Test.BServer.BServantObj@tcp -h 127.0.0.1 -p 9100");
_prx = _comm.stringToProxy<BServantPrx>("TestApp.BServer.BServantObj@tcp -h 127.0.0.1 -p 9100");
// _comm.stringToProxy(_sObj, _prx);
}

View File

@ -6,6 +6,24 @@ using namespace std;
using namespace tars;
Communicator* _comm;
static string sObjName = "TestApp.CustomServer.CustomServantObj@tcp -h 127.0.0.1 -t 60000 -p 9400";
struct Param
{
int count;
string call;
int thread;
int buffersize;
int netthread;
ServantPrx servantPrx;
};
Param param;
std::atomic<int> callback_count(0);
//The response packet decoding function decodes the data received from the server according to the specific format and resolves it to theResponsePacket
static TC_NetWorkBuffer::PACKET_TYPE customResponse(TC_NetWorkBuffer &in, ResponsePacket& rsp)
{
@ -85,9 +103,10 @@ public:
}
else
{
cout << "succ:" << msg->response->sBuffer.data() << endl;
// cout << "succ:" << string(msg->response->sBuffer.data(), msg->response->sBuffer.size()) << endl;
}
++callback_count;
return msg->response->iRet;
}
@ -95,40 +114,151 @@ public:
typedef tars::TC_AutoPtr<CustomCallBack> CustomCallBackPtr;
int main(int argc,char**argv)
void syncCall(int c)
{
try
{
Communicator comm;
string sObjName = "TestApp.CustomServer.CustomServantObj@tcp -h 127.0.0.1 -t 60000 -p 9400";
string buffer(param.buffersize, 'a');
ServantPrx prx = comm.stringToProxy<ServantPrx>(sObjName);
int64_t t = TC_Common::now2us();
//发起远程调用
for (int i = 0; i < c; ++i)
{
string r;
ProxyProtocol prot;
prot.requestFunc = customRequest;
prot.responseFunc = customResponse;
try
{
ResponsePacket rsp;
param.servantPrx->rpc_call(param.servantPrx->tars_gen_requestid(), "doCustomFunc", buffer.c_str(), buffer.length(), rsp);
}
catch(exception& e)
{
cout << "exception:" << e.what() << endl;
}
++callback_count;
}
prx->tars_set_protocol(prot);
string buf = "helloword";
ResponsePacket rsp;
prx->rpc_call(prx->tars_gen_requestid(), "doCustomFunc", buf.c_str(), buf.length(), rsp);
cout << "sync response buffer:" << rsp.sBuffer.data() << endl;
CustomCallBackPtr cb = new CustomCallBack();
prx->rpc_call_async(prx->tars_gen_requestid(), "doCustomFunc", buf.c_str(), buf.length(), cb);
TC_Common::sleep(2000);
}
catch(std::exception&e)
{
cerr<<"std::exception:"<<e.what()<<endl;
}
catch(...)
{
cerr<<"unknown exception"<<endl;
}
return 0;
int64_t cost = TC_Common::now2us() - t;
cout << "syncCall total:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
}
void asyncCall(int c)
{
int64_t t = TC_Common::now2us();
string buffer(param.buffersize, 'a');
//发起远程调用
for (int i = 0; i < c; ++i)
{
try
{
CustomCallBackPtr cb = new CustomCallBack();
param.servantPrx->rpc_call_async(param.servantPrx->tars_gen_requestid(), "doCustomFunc", buffer.c_str(), buffer.length(), cb);
}
catch(exception& e)
{
cout << "exception:" << e.what() << endl;
}
}
int64_t cost = TC_Common::now2us() - t;
cout << "asyncCall send:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
}
int main(int argc, char *argv[])
{
try
{
if (argc < 6)
{
cout << "Usage:" << argv[0] << "--count=1000 --call=[sync|async] --thread=1 --buffersize=1000 --netthread=1" << endl;
return 0;
}
TC_Option option;
option.decode(argc, argv);
param.count = TC_Common::strto<int>(option.getValue("count"));
if(param.count <= 0) param.count = 1000;
param.buffersize = TC_Common::strto<int>(option.getValue("buffersize"));
if(param.buffersize <= 0) param.buffersize = 1000;
param.call = option.getValue("call");
if(param.call.empty()) param.call = "sync";
param.thread = TC_Common::strto<int>(option.getValue("thread"));
if(param.thread <= 0) param.thread = 1;
param.netthread = TC_Common::strto<int>(option.getValue("netthread"));
if(param.netthread <= 0) param.netthread = 1;
_comm = new Communicator();
_comm->setProperty("sendqueuelimit", "1000000");
_comm->setProperty("asyncqueuecap", "1000000");
_comm->setProperty("netthread", TC_Common::tostr(param.netthread));
param.servantPrx = _comm->stringToProxy<ServantPrx>(sObjName);
// TarsRollLogger::getInstance()->logger()->setLogLevel(6);
ProxyProtocol prot;
prot.requestFunc = customRequest;
prot.responseFunc = customResponse;
param.servantPrx->tars_set_protocol(prot);
param.servantPrx->tars_connect_timeout(5000);
param.servantPrx->tars_async_timeout(60*1000);
int64_t start = TC_Common::now2us();
std::function<void(int)> func;
if (param.call == "sync")
{
func = syncCall;
}
else if (param.call == "async")
{
func = asyncCall;
}
else
{
cout << "no func, exits" << endl;
exit(0);
}
vector<std::thread*> vt;
for(int i = 0 ; i< param.thread; i++)
{
vt.push_back(new std::thread(func, param.count));
}
std::thread print([&]{while(callback_count != param.count * param.thread) {
cout << param.call << ": ----------finish count:" << callback_count << endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
};});
for(size_t i = 0 ; i< vt.size(); i++)
{
vt[i]->join();
delete vt[i];
}
cout << "(pid:" << std::this_thread::get_id() << ")"
<< "(count:" << param.count << ")"
<< "(use ms:" << (TC_Common::now2us() - start)/1000 << ")"
<< endl;
while(callback_count != param.count * param.thread) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
print.join();
cout << "----------finish count:" << callback_count << endl;
}
catch(exception &ex)
{
cout << ex.what() << endl;
}
cout << "main return." << endl;
return 0;
}

View File

@ -24,7 +24,7 @@ int CustomServantImp::doRequest(tars::TarsCurrentPtr current, vector<char>& resp
const vector<char>& request = current->getRequestBuffer();
response = request;
cout << "doRequest: requestId:" << current->getRequestId() << ", funcName:" << current->getFuncName() << endl;
// cout << "doRequest: requestId:" << current->getRequestId() << ", funcName:" << current->getFuncName() << endl;
return 0;
}

View File

@ -27,7 +27,7 @@ using namespace tars;
Communicator* _comm;
static string http2Obj = "Test.HttpServer.http2Obj@tcp -h 127.0.0.1 -p 8082";
static string http2Obj = "TestApp.Http2Server.http2Obj@tcp -h 127.0.0.1 -p 8082";
struct Param
{

View File

@ -1,50 +0,0 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#include "HttpImp.h"
#include "servant/Application.h"
using namespace std;
//////////////////////////////////////////////////////
void HttpImp::initialize()
{
//initialize servant here:
//...
}
//////////////////////////////////////////////////////
void HttpImp::destroy()
{
//destroy servant here:
//...
}
int HttpImp::doRequest(TarsCurrentPtr current, vector<char> &buffer)
{
TC_HttpRequest request;
vector<char> v = current->getRequestBuffer();
string sBuf;
sBuf.assign(&v[0],v.size());
request.decode(sBuf);
TC_HttpResponse rsp;
string s="hello";
rsp.setResponse(s.c_str(),s.size());
rsp.encode(buffer);
return 0;
}

View File

@ -1,51 +0,0 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#ifndef _HttpImp_H_
#define _HttpImp_H_
#include "servant/Application.h"
/**
*
*
*/
class HttpImp : public Servant
{
public:
/**
*
*/
virtual ~HttpImp() {}
/**
*
*/
virtual void initialize();
/**
*
*/
virtual void destroy();
/**
*
*/
int doRequest(TarsCurrentPtr current, vector<char> &buffer);
};
/////////////////////////////////////////////////////
#endif

View File

@ -15,7 +15,6 @@
*/
#include "HttpServer.h"
#include "HttpImp.h"
#include "Http2Imp.h"
#include "util/tc_http2.h"
@ -46,9 +45,7 @@ HttpServer::initialize()
//initialize application here:
//...
addServant<HttpImp>(ServerConfig::Application + "." + ServerConfig::ServerName + ".HttpObj");
addServant<Http2Imp>(ServerConfig::Application + "." + ServerConfig::ServerName + ".Http2Obj");
addServantProtocol(ServerConfig::Application + "." + ServerConfig::ServerName + ".HttpObj",&TC_NetWorkBuffer::parseHttp);
addServantProtocol(ServerConfig::Application + "." + ServerConfig::ServerName + ".Http2Obj", &parseHttp2);
}
/////////////////////////////////////////////////////////////////

View File

@ -21,14 +21,14 @@
#合并回调线程和网络线程(以网络线程个数为准)
mergenetasync = 0
#模块名称
modulename = Test.HttpServer
modulename = TestApp.HttpServer
</client>
#定义所有绑定的IP
<server>
closecout = 0
#应用名称
app = Test
app = TestApp
#服务名称
server = HttpServer
#服务的数据目录,可执行文件,配置文件等
@ -55,7 +55,7 @@
allow =
maxconns = 4096
threads = 5
servant = Test.HttpServer.HttpObj
servant = TestApp.HttpServer.HttpObj
queuecap = 1000000
protocol = not-tars
</HttpAdapter>
@ -65,7 +65,7 @@
allow =
maxconns = 4096
threads = 5
servant = Test.HttpServer.Http2Obj
servant = TestApp.HttpServer.Http2Obj
queuecap = 1000000
protocol = not-tars
</Http2Adapter>

View File

@ -31,7 +31,7 @@ using namespace tup;
Communicator* _comm;
static string httpObj = "Test.HttpServer.httpObj@tcp -h 127.0.0.1 -p 8081";
static string httpObj = "TestApp.HttpServer.httpObj@tcp -h 127.0.0.1 -p 8081";
struct Param
{
@ -62,8 +62,7 @@ void httpCall(int excut_num)
{
TC_HttpResponse stHttpRsp;
// iRet = doRequest(stHttpReq, tcpClient1, stHttpRsp, 3000);
iRet = stHttpReq.doRequest(stHttpRsp, 3000);
iRet = stHttpReq.doRequest(stHttpRsp, 3000);
if (iRet != 0)
{
@ -125,8 +124,7 @@ void syncRpc(int c)
try
{
param.servantPrx->http_call("GET", "/", header, "helloworld", rheader, rbody);
param.servantPrx->http1_call("GET", "/", header, "helloworld", rheader, rbody);
}
catch(exception& e)
{
@ -162,7 +160,7 @@ int main(int argc, char *argv[])
_comm = new Communicator();
// TarsRollLogger::getInstance()->logger()->setLogLevel(6);
// TarsRollLogger::getInstance()->logger()->setLogLevel(6);
_comm->setProperty("sendqueuelimit", "1000000");
_comm->setProperty("asyncqueuecap", "1000000");

View File

@ -21,14 +21,14 @@
#合并回调线程和网络线程(以网络线程个数为准)
mergenetasync = 0
#模块名称
modulename = Test.HttpServer
modulename = TestApp.HttpServer
</client>
#定义所有绑定的IP
<server>
closecout = 0
#应用名称
app = Test
app = TestApp
#服务名称
server = HttpServer
#服务的数据目录,可执行文件,配置文件等
@ -55,7 +55,7 @@
allow =
maxconns = 4096
threads = 5
servant = Test.HttpServer.HttpObj
servant = TestApp.HttpServer.HttpObj
queuecap = 1000000
protocol = not-tars
</HttpAdapter>

View File

@ -300,6 +300,8 @@ int main(int argc, char *argv[])
_comm->setProperty("netthread", TC_Common::tostr(param.netthread));
// TarsRollLogger::getInstance()->logger()->setLogLevel(6);
param.pPrx = _comm->stringToProxy<HelloPrx>(helloObj);
param.pPrx->tars_connect_timeout(5000);

View File

@ -1,16 +0,0 @@
#-----------------------------------------------------------------------
APP := TestApp
TARGET := TestHelloServer
CONFIG :=
STRIP_FLAG:= N
INCLUDE +=
LIB +=
#-----------------------------------------------------------------------
include /home/tarsproto/TestApp/HelloServer/HelloServer.mk
include /usr/local/tars/cpp/makefile/makefile.tars
#-----------------------------------------------------------------------

View File

@ -1,16 +0,0 @@
#-----------------------------------------------------------------------
APP := TestApp
TARGET := HelloServer
CONFIG :=
STRIP_FLAG:= N
INCLUDE +=
LIB +=
#-----------------------------------------------------------------------
include /usr/local/tars/cpp/makefile/makefile.tars
#-----------------------------------------------------------------------

View File

@ -38,7 +38,7 @@ private:
Test1::Test1(const string &sStr)
{
// _comm.setProperty("locator","tars.tarsregistry.QueryObj@tcp -h 172.27.194.147 -p 17890 -t 50000");
_comm.setProperty("locator","tars.tarsregistry.QueryObj@tcp -h 10.120.129.226 -p 17890 -t 10000");
// _comm.setProperty("locator","tars.tarsregistry.QueryObj@tcp -h 10.120.129.226 -p 17890 -t 10000");
_comm.setProperty("stat", "tars.tarsstat.StatObj");
_comm.stringToProxy(sStr, prx);
}

View File

@ -1,16 +0,0 @@
#-----------------------------------------------------------------------
APP := Test
TARGET := TarsStressClient
CONFIG :=
STRIP_FLAG:= N
INCLUDE +=
LIB +=
#-----------------------------------------------------------------------
include /home/tarsproto/Test/TarsStressServer/TarsStressServer.mk
include /usr/local/tars/cpp/makefile/makefile.tars
#-----------------------------------------------------------------------

View File

@ -1,16 +0,0 @@
#-----------------------------------------------------------------------
APP := Test
TARGET := TarsStressServer
CONFIG :=
STRIP_FLAG:= N
TARS2CPP_FLAG:=
INCLUDE +=
LIB +=
#-----------------------------------------------------------------------
include /usr/local/tars/cpp/makefile/makefile.tars
#-----------------------------------------------------------------------

View File

@ -15,7 +15,8 @@ sleep 3
echo "client: .\\bin\\Release\\CustomClient.exe"
.\\bin\\Release\\CustomClient.exe 5
.\\bin\\Release\\CustomClient.exe --count=10000 --thread=2 --call=sync --netthread=2 --buffersize=100
.\\bin\\Release\\CustomClient.exe --count=10000 --thread=2 --call=async --netthread=2 --buffersize=100
killall -9 CustomServer.exe

View File

@ -15,7 +15,9 @@ sleep 2
echo "client: ./bin/CustomClient"
./bin/CustomClient 5
./bin/CustomClient --count=10000 --thread=2 --call=sync --netthread=1 --buffersize=100
./bin/CustomClient --count=10000 --thread=2 --call=async --netthread=1 --buffersize=100
killall -2 CustomServer

View File

@ -86,6 +86,13 @@ AdapterProxy::~AdapterProxy()
{
}
//AdapterProxy *AdapterProxy::clone()
//{
// AdapterProxy *adapterProxy = new AdapterProxy(_objectProxy, _endpoint, _communicator);
// adapterProxy->checkActive(true);
// return adapterProxy;
//}
string AdapterProxy::getSlaveName(const string& sSlaveName)
{
string::size_type pos = sSlaveName.find(".");
@ -164,7 +171,9 @@ int AdapterProxy::invoke(ReqMessage * msg)
msg->sReqData->setBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request, _trans.get()));
//链表是空的, 则直接发送当前这条数据, 如果链表非空或者发送失败了, 则放到队列中, 等待下次发送
// TLOGERROR("[TARS][AdapterProxy::invoke insert timeout queue fail, queue size:" << _timeoutQueue->size() << ", id:" << msg->request.iRequestId << ", obj:" <<_objectProxy->name() << ", desc:" << _endpoint.desc() <<endl);
//链表是空的, 则直接发送当前这条数据, 如果链表非空或者发送失败了, 则放到队列中, 等待下次发送
if(_timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData) != Transceiver::eRetError)
{
TLOGTARS("[TARS][AdapterProxy::invoke push (send) objname:" << _objectProxy->name() << ",desc:" << _endpoint.desc() << ",id:" << msg->request.iRequestId << endl);
@ -185,7 +194,7 @@ int AdapterProxy::invoke(ReqMessage * msg)
bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
if(!bFlag)
{
TLOGERROR("[TARS][AdapterProxy::invoke fail1 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ",id:" << msg->request.iRequestId << ", objname:" <<_objectProxy->name() << ",desc:" << _endpoint.desc() <<endl);
TLOGERROR("[TARS][AdapterProxy::invoke fail1: insert timeout queue fail, queue size:" << _timeoutQueue->size() << ", id:" << msg->request.iRequestId << ", obj:" <<_objectProxy->name() << ", desc:" << _endpoint.desc() <<endl);
msg->eStatus = ReqMessage::REQ_EXC;
finishInvoke(msg);
@ -201,7 +210,7 @@ int AdapterProxy::invoke(ReqMessage * msg)
bool bFlag = _timeoutQueue->push(msg,msg->request.iRequestId, msg->request.iTimeout+msg->iBeginTime, false);
if(!bFlag)
{
TLOGERROR("[TARS][AdapterProxy::invoke fail2 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << "," <<_objectProxy->name() << ", " << _endpoint.desc() <<endl);
TLOGERROR("[TARS][AdapterProxy::invoke fail2: insert timeout queue fail, queue size:" << _timeoutQueue->size() << ", id:" << msg->request.iRequestId << ", obj:" <<_objectProxy->name() << ", desc:" << _endpoint.desc() <<endl);
msg->eStatus = ReqMessage::REQ_EXC;
@ -221,11 +230,6 @@ int AdapterProxy::invoke(ReqMessage * msg)
void AdapterProxy::doInvoke()
{
if(_timeoutQueue->sendListEmpty())
{
return ;
}
while(!_timeoutQueue->sendListEmpty())
{
ReqMessage * msg = NULL;
@ -480,7 +484,6 @@ void AdapterProxy::setInactive()
TLOGTARS("[TARS][AdapterProxy::setInactive objname:" << _objectProxy->name() << ",desc:" << _endpoint.desc() << ",inactive" << endl);
}
// void AdapterProxy::finishInvoke(ResponsePacket & rsp)
void AdapterProxy::finishInvoke(shared_ptr<ResponsePacket> & rsp)
{
TLOGTARS("[TARS][AdapterProxy::finishInvoke(ResponsePacket) objname:" << _objectProxy->name() << ",desc:" << _endpoint.desc()

View File

@ -65,7 +65,7 @@ vector<char> ProxyProtocol::tarsRequest(RequestPacket& request, Transceiver *)
vector<char> ProxyProtocol::http1Request(tars::RequestPacket& request, Transceiver *trans)
{
request.iRequestId = trans->getAdapterProxy()->getId();
request.iRequestId = trans->getAdapterProxy()->getId();
TC_HttpRequest httpRequest;

View File

@ -216,21 +216,6 @@ void QueryEpBase::setObjName(const string & sObjName)
}
}
// bool isRealEndpoint(const string & s, const string & s1)
// {
// if (s1.empty())
// return true;
// const string delim = " \t\n\r";
// string::size_type beg;
// string::size_type end = 0;
// beg = s1.find_first_not_of(delim, end);
// if (s1[beg] != 't' && s1[beg] != 'u' && s1[beg] != 's')
// return false;
// return true;
// }
vector<string> QueryEpBase::sepEndpoint(const string& sEndpoints)
{
vector<string> vEndpoints;
@ -809,6 +794,7 @@ void EndpointManager::doNotify()
bool EndpointManager::selectAdapterProxy(ReqMessage * msg,AdapterProxy * & pAdapterProxy)
{
pAdapterProxy = NULL;
//刷新主控
refreshReg(E_DEFAULT,"");
@ -1086,8 +1072,7 @@ AdapterProxy* EndpointManager::getConHashProxyForWeight(int64_t hashCode, bool b
{
return thisHash[hash];
}
if(!thisHash[hash]->isConnTimeout() &&
!thisHash[hash]->isConnExc())
if(!thisHash[hash]->isConnTimeout() && !thisHash[hash]->isConnExc())
{
conn.push_back(thisHash[hash]);
}

View File

@ -167,7 +167,9 @@ void ObjectProxy::invoke(ReqMessage * msg)
return ;
}
msg->adapter = pAdapterProxy;
// pAdapterProxy = pAdapterProxy->clone();
msg->adapter = pAdapterProxy;
pAdapterProxy->invoke(msg);
}
@ -200,16 +202,14 @@ void ObjectProxy::doInvoke()
return;
}
// pAdapterProxy = pAdapterProxy->clone();
msg->adapter = pAdapterProxy;
pAdapterProxy->invoke(msg);
}
}
// const vector<AdapterProxy*> & ObjectProxy::getAdapters() const
// {
// return _endpointManger->getAdapters();
// }
void ObjectProxy::doInvokeException(ReqMessage * msg)
{
// TLOGTARS("[TARS][ObjectProxy::doInvokeException, objname:" << _name << "]" << endl);

View File

@ -829,12 +829,43 @@ void ServantProxy::rpc_call_async(uint32_t iRequestId,
invoke(msg, bCoro);
}
void ServantProxy::http_call(const std::string& method,
const std::string& uri,
const std::map<std::string, std::string>& headers,
const std::string& body,
std::map<std::string, std::string>& rheaders,
std::string& rbody)
void ServantProxy::http1_call(const std::string& method,
const std::string& uri,
const std::map<std::string, std::string>& headers,
const std::string& body,
std::map<std::string, std::string>& rheaders,
std::string& rbody)
{
ReqMessage* msg = new ReqMessage();
msg->init(ReqMessage::SYNC_CALL, NULL, "");
msg->bFromRpc = true;
msg->request.sServantName = uri;
msg->request.sFuncName = method;
msg->request.iRequestId = this->tars_gen_requestid();
// 使用下面两个字段保存头部和包体
msg->request.context = headers;
msg->request.sBuffer.assign(body.begin(), body.end());
invoke(msg);
rheaders.swap(msg->response->status);
rbody.assign(msg->response->sBuffer.begin(), msg->response->sBuffer.end());
delete msg;
msg = NULL;
}
void ServantProxy::http2_call(const std::string& method,
const std::string& uri,
const std::map<std::string, std::string>& headers,
const std::string& body,
std::map<std::string, std::string>& rheaders,
std::string& rbody)
{
ReqMessage* msg = new ReqMessage();
@ -857,9 +888,9 @@ void ServantProxy::http_call(const std::string& method,
msg = NULL;
}
void ServantProxy::http_call_async(const std::map<std::string, std::string>& headers,
const std::string& body,
const HttpCallbackPtr &cb)
void ServantProxy::http2_call_async(const std::map<std::string, std::string>& headers,
const std::string& body,
const HttpCallbackPtr &cb)
{
ReqMessage * msg = new ReqMessage();

View File

@ -310,8 +310,6 @@ void TarsCurrent::sendResponse(int iRet, const vector<char> &buffer, const map<
//先预留4个字节长度
os.writeBuf((const char *)&iHeaderLen, sizeof(iHeaderLen));
// TarsOutputStream<BufferWriter> os;
if (_request.iVersion != TUPVERSION)
{
ResponsePacket response;
@ -378,9 +376,9 @@ void TarsCurrent::sendResponse(int iRet, const vector<char> &buffer, const map<
response.writeTo(os);
}
assert(send->buffer()->length() >= 4);
assert(os.getLength() >= 4);
iHeaderLen = htonl((int)(send->buffer()->length()));
iHeaderLen = htonl((int)(os.getLength()));
memcpy(os.getByteBuffer().data(), (const char *)&iHeaderLen, sizeof(iHeaderLen));

View File

@ -355,8 +355,9 @@ int Transceiver::doRequest()
int Transceiver::sendRequest(const shared_ptr<TC_NetWorkBuffer::Buffer> &buff, bool forceSend)
{
//空数据 直接返回成功
if(buff->empty())
return eRetOk;
if(buff->empty()) {
return eRetOk;
}
if(eConnected != _connStatus)
{
@ -376,14 +377,16 @@ int Transceiver::sendRequest(const shared_ptr<TC_NetWorkBuffer::Buffer> &buff, b
//buf不为空, 表示之前的数据还没发送完, 直接返回失败
//等buffer可写了,epoll会通知写事件
if(!_sendBuffer.empty())
return eRetError;
if(!_sendBuffer.empty()) {
return eRetError;
}
int iRet = this->send(buff->buffer(), (uint32_t)buff->length(), 0);
//失败,直接返回
if(iRet<0)
return eRetError;
if(iRet<0) {
return eRetError;
}
//没有全部发送完,写buffer 返回成功
if(iRet < (int)buff->length())
@ -446,8 +449,6 @@ int TcpTransceiver::doResponse()
int recvCount = 0;
shared_ptr<ResponsePacket> rsp = std::make_shared<ResponsePacket>();
do
{
char buff[BUFFER_SIZE] = {0x00};
@ -463,7 +464,9 @@ int TcpTransceiver::doResponse()
TC_NetWorkBuffer::PACKET_TYPE ret;
do
{
ret = _adapterProxy->getObjProxy()->getProxyProtocol().responseFunc(_recvBuffer, *rsp.get());
shared_ptr<ResponsePacket> rsp = std::make_shared<ResponsePacket>();
ret = _adapterProxy->getObjProxy()->getProxyProtocol().responseFunc(_recvBuffer, *rsp.get());
if (ret == TC_NetWorkBuffer::PACKET_ERR) {
TLOGERROR( "[TARS][tcp doResponse," << _adapterProxy->getObjProxy()->name() << ",fd:" << _fd << "," << _ep.desc() << ",tcp recv decode error" << endl);
@ -472,15 +475,13 @@ int TcpTransceiver::doResponse()
}
else if (ret == TC_NetWorkBuffer::PACKET_FULL) {
_adapterProxy->finishInvoke(rsp);
rsp = std::make_shared<ResponsePacket>();
}
else {
break;
}
}
while (ret == TC_NetWorkBuffer::PACKET_FULL);
while (ret == TC_NetWorkBuffer::PACKET_FULL && !_recvBuffer.empty());
//接收的数据小于buffer大小, 内核会再次通知你
if(iRet < BUFFER_SIZE)
@ -650,9 +651,9 @@ int TcpTransceiver::send(const void* buf, uint32_t len, uint32_t flag)
if (iRet < 0 && !TC_Socket::isPending())
{
TLOGTARS("[TARS][tcp send," << _adapterProxy->getObjProxy()->name() << ",fd:" << _fd << "," << _ep.desc()
<< ",fail! errno:" << TC_Exception::getSystemCode() << ","
<< TC_Exception::parseError(TC_Exception::getSystemCode()) << ",close]" << endl);
TLOGTARS("[TARS][tcp send," << _adapterProxy->getObjProxy()->name() << ", fd:" << _fd << "," << _ep.desc()
<< ", fail! errno:" << TC_Exception::getSystemCode() << ", "
<< TC_Exception::parseError(TC_Exception::getSystemCode()) << ", close]" << endl);
close();
@ -665,8 +666,8 @@ int TcpTransceiver::send(const void* buf, uint32_t len, uint32_t flag)
_adapterProxy->getObjProxy()->getCommunicatorEpoll()->modFd(_fd, &_fdInfo, EPOLLIN | EPOLLOUT);
}
#endif
TLOGTARS("[TARS][tcp send," << _adapterProxy->getObjProxy()->name() << ",fd:" << _fd << ","
<< _ep.desc() << ",len:" << iRet << "]" << endl);
TLOGTARS("[TARS][tcp send," << _adapterProxy->getObjProxy()->name() << ", fd:" << _fd << ","
<< _ep.desc() << ", len:" << iRet << "]" << endl);
return iRet;
}

@ -1 +1 @@
Subproject commit 2e3c1abd10ad801f74f7673f54f7d45b7962318f
Subproject commit 7c22d46777d76c8c08c2161f7cb4d9f0f45991d7

View File

@ -50,6 +50,12 @@ public:
*/
~AdapterProxy();
/**
* clone
* @return
*/
// AdapterProxy *clone();
/**
* server
*/
@ -65,7 +71,6 @@ public:
* server
*/
void finishInvoke(shared_ptr<ResponsePacket> &rsp);
// void finishInvoke(ResponsePacket &rsp);
/**
* ,false
@ -73,7 +78,6 @@ public:
* @onlyCheck:
* @return bool
*/
// bool checkActive(bool bForceConnect = false, bool onlyCheck = false);
bool checkActive(bool bForceConnect = false);
/**
@ -89,7 +93,6 @@ public:
/**
* stat
*/
// void doStat(map<StatMicMsgHead, StatMicMsgBody> & mStatMicMsg);
void mergeStat(map<StatMicMsgHead, StatMicMsgBody> & mStatMicMsg);
/**
*

View File

@ -247,27 +247,6 @@ protected:
*/
ObjectProxyFactory * _objectProxyFactory;
// /*
// * 异步线程数组
// */
// //异步线程(跨通信器共享)
// vector<AsyncProcThread*> _asyncThread;//[MAX_THREAD_NUM];
// /*
// * 异步队列的统计上报的对象
// */
// PropertyReport * _reportAsyncQueue;
// /*
// * 异步线程数目
// */
// size_t _asyncThreadNum;
/*
* 线seq
*/
// size_t _asyncSeq;
/*
* 线id
*/

View File

@ -619,21 +619,32 @@ public:
const ServantProxyCallbackPtr& callback,
bool bCoro = false);
/**
* http1
*/
void http1_call(const std::string& method,
const std::string& uri,
const std::map<std::string, std::string>& headers,
const std::string& body,
std::map<std::string, std::string>& rheaders,
std::string& rbody);
/**
* http1 or 2
* http2
*/
void http_call(const std::string& method,
const std::string& uri,
const std::map<std::string, std::string>& headers,
const std::string& body,
std::map<std::string, std::string>& rheaders,
std::string& rbody);
void http2_call(const std::string& method,
const std::string& uri,
const std::map<std::string, std::string>& headers,
const std::string& body,
std::map<std::string, std::string>& rheaders,
std::string& rbody);
/**
* http2
*/
void http_call_async(const std::map<std::string, std::string>& headers,
const std::string& body,
const HttpCallbackPtr &cb);
void http2_call_async(const std::map<std::string, std::string>& headers,
const std::string& body,
const HttpCallbackPtr &cb);
/**
* RequestPacketcontext

View File

@ -131,11 +131,13 @@ public:
class SendContext
{
public:
SendContext(const shared_ptr<RecvContext> &context, char cmd) : _context(context), _cmd(cmd) {}
SendContext(const shared_ptr<RecvContext> &context, char cmd) : _context(context), _cmd(cmd)
{
_sbuffer = std::make_shared<TC_NetWorkBuffer::Buffer>();
}
const shared_ptr<RecvContext> &getRecvContext() { return _context; }
const shared_ptr<TC_NetWorkBuffer::Buffer> & buffer() { return _sbuffer; }
// const vector<char> &buffer() const { return _sbuffer; }
char cmd() const { return _cmd; }
uint32_t uid() const { return _context->uid(); }
int fd() const { return _context->fd(); }
@ -179,7 +181,7 @@ public:
shared_ptr<SendContext> createSendContext() { return std::make_shared<SendContext>(shared_from_this(), 's'); }
shared_ptr<SendContext> createCloseContext() { return std::make_shared<SendContext>(shared_from_this(), 'c'); }
int64_t _recvUS;
// int64_t _recvUS;
protected:
uint32_t _uid; /**连接标示*/

View File

@ -61,8 +61,16 @@ public:
void swap(vector<char> &buff, size_t pos = 0)
{
if(_pos != 0)
{
buff.resize(length());
memcpy(&buff[0], buffer(), length());
}
else
{
buff.swap(_buffer);
}
_pos = pos;
buff.swap(_buffer);
}
void clear()
@ -99,6 +107,8 @@ public:
size_t length() const { return _buffer.size() - _pos; }
size_t pos() const { return _pos; }
void add(uint32_t ret)
{
_pos += ret;
@ -335,7 +345,7 @@ public:
protected:
void getBuffers(char *buffer, size_t length) const;
size_t getBuffers(char *buffer, size_t length) const;
template<typename T>
T getValue() const

View File

@ -910,10 +910,8 @@ int TC_EpollServer::Connection::recvTcp()
{
int recvCount = 0;
static int totalRecv = 0;
while (true)
{
// vector<char> buffer(BUFFER_SIZE);
char buffer[BUFFER_SIZE] = {0x00};
int iBytesReceived = _sock.recv((void *)buffer, BUFFER_SIZE);
@ -940,7 +938,7 @@ int TC_EpollServer::Connection::recvTcp()
}
else
{
totalRecv += iBytesReceived;
// totalRecv += iBytesReceived;
_recvBuffer.addBuffer(buffer, iBytesReceived);
//字符串太长时, 强制解析协议

View File

@ -86,7 +86,7 @@ const char * TC_NetWorkBuffer::mergeBuffers()
return NULL;
}
void TC_NetWorkBuffer::getBuffers(char *buffer, size_t length) const
size_t TC_NetWorkBuffer::getBuffers(char *buffer, size_t length) const
{
assert(length <= getBufferLength());
@ -95,7 +95,7 @@ void TC_NetWorkBuffer::getBuffers(char *buffer, size_t length) const
size_t left = length;
size_t pos = 0;
while(it != _bufferList.end() || left == 0)
while(it != _bufferList.end() && left != 0)
{
size_t len = std::min(left, (*it)->length());
@ -106,6 +106,8 @@ void TC_NetWorkBuffer::getBuffers(char *buffer, size_t length) const
++it;
}
return pos;
}
string TC_NetWorkBuffer::getBuffersString() const
@ -121,9 +123,17 @@ string TC_NetWorkBuffer::getBuffersString() const
vector<char> TC_NetWorkBuffer::getBuffers() const
{
vector<char> buffer;
buffer.resize(_length);
getBuffers(&buffer[0], _length);
if(_bufferList.size() == 1)
{
(*_bufferList.begin())->swap(buffer);
}
else
{
buffer.resize(_length);
getBuffers(&buffer[0], _length);
}
return buffer;
}
@ -140,7 +150,7 @@ bool TC_NetWorkBuffer::getHeader(size_t len, std::string &buffer) const
return true;
}
buffer.reserve(len);
buffer.resize(len);
getBuffers(&buffer[0], len);
//
@ -183,7 +193,7 @@ bool TC_NetWorkBuffer::getHeader(size_t len, std::vector<char> &buffer) const
return true;
}
buffer.reserve(len);
buffer.resize(len);
getBuffers(&buffer[0], len);
//
@ -306,12 +316,12 @@ TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::checkHttp()
TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseHttp(TC_NetWorkBuffer&in, vector<char> &out)
{
cout << "parseHttp" << endl;
TC_NetWorkBuffer::PACKET_TYPE b = in.checkHttp();
if (b == PACKET_FULL)
{
out = in.getBuffers();
in.clearBuffers();
}