add PushCallbackDemo

pull/256/head
ruanshudong 2022-09-07 14:52:15 +08:00
parent 0793be0881
commit 15a440aece
22 changed files with 1771 additions and 1 deletions

View File

@ -11,6 +11,7 @@ endif()
add_subdirectory(CustomDemo)
add_subdirectory(AuthDemo)
add_subdirectory(PushCallbackDemo)
if(TARS_SSL)
add_subdirectory(SSLDemo)
@ -46,6 +47,13 @@ if(WIN32)
COMMAND ../examples/scripts/run-auth.bat ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} ${WORKING_DIRECTORY}
COMMENT "call run auth")
add_custom_target(run-push-callback
WORKING_DIRECTORY ${WORKING_DIRECTORY}
DEPENDS PushCallbackServer PushCallbackClient
USES_TERMINAL
COMMAND ../examples/scripts/run-push-callback.bat ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} ${WORKING_DIRECTORY}
COMMENT "call run auth")
add_custom_target(run-udp
WORKING_DIRECTORY ${WORKING_DIRECTORY}
DEPENDS UdpServer UdpClient
@ -149,6 +157,12 @@ else(WIN32)
COMMAND sh ../examples/scripts/run-auth.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} ${WORKING_DIRECTORY}
COMMENT "call run auth")
add_custom_target(run-push-callback
WORKING_DIRECTORY ${WORKING_DIRECTORY}
DEPENDS PushCallbackServer PushCallbackClient
COMMAND sh ../examples/scripts/run-push-callback.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} ${WORKING_DIRECTORY}
COMMENT "call run push callback")
if(TARS_HTTP2)
add_custom_target(run-http2
WORKING_DIRECTORY ${WORKING_DIRECTORY}

View File

@ -0,0 +1,5 @@
include_directories(Server)
add_subdirectory(Server)
add_subdirectory(Client)

View File

@ -0,0 +1 @@
build_tars_server("PushCallbackClient" "PushCallbackServer")

View File

@ -0,0 +1,35 @@
<tars>
<application>
<client>
#tarsregistry locator
locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890
#max invoke timeout
sync-invoke-timeout = 5000
#refresh endpoint interval
refresh-endpoint-interval = 10000
#stat obj
stat = tars.tarsstat.StatObj
#max send queue length limit
sendqueuelimit = 100000
#async queue length limit
asyncqueuecap = 100000
#async callback thread num
asyncthread = 3
#net thread
netthread = 1
#merge net and sync thread
mergenetasync = 0
#module name
modulename = TestApp.AuthClient
<TestApp.AuthServer.AuthObj>
#auth access key
accesskey = tars-test-user
#auth secret key
secretkey = 123456
</TestApp.AuthServer.AuthObj>
</client>
</application>
</tars>

View File

@ -0,0 +1,80 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#include <iostream>
#include "servant/Communicator.h"
#include "Hello.h"
#include "Push.h"
#include "util/tc_option.h"
using namespace std;
using namespace tars;
using namespace TestApp;
Communicator* _comm;
static string helloObj = "TestApp.PushCallbackServer.PushObj@tcp -h 127.0.0.1 -p 9316";
int g_count = 0;
class PushCallbackImp : public PushPrxCallback
{
public:
virtual void callback_pushMsg(tars::Int32 ret, const std::string& sRsp)
{
LOG_CONSOLE_DEBUG << ret << ", " << sRsp << endl;
++g_count;
}
};
int main(int argc, char *argv[])
{
try
{
if (argc < 1)
{
cout << "Usage:" << argv[0] << "--config=conf " << endl;
return 0;
}
TC_Option option;
option.decode(argc, argv);
_comm = new Communicator();
TC_Config conf;
conf.parseFile(option.getValue("config"));
_comm->setProperty(conf);
HelloPrx pPrx = _comm->stringToProxy<HelloPrx>(helloObj);
pPrx->tars_set_push_callback(new PushCallbackImp());
pPrx->registerPush();
while(g_count < 10)
{
TC_Common::sleep(1);
}
}
catch(exception &ex)
{
cout << ex.what() << endl;
}
cout << "main return." << endl;
return 0;
}

View File

@ -0,0 +1 @@
build_tars_server("PushCallbackServer" "")

View File

@ -0,0 +1,565 @@
// **********************************************************************
// This file was generated by a TARS parser!
// TARS version 3.0.12.
// **********************************************************************
#ifndef __HELLO_H_
#define __HELLO_H_
#include <map>
#include <string>
#include <vector>
#include "tup/Tars.h"
#include "tup/TarsJson.h"
using namespace std;
#include "servant/ServantProxy.h"
#include "servant/Servant.h"
#include "promise/promise.h"
#include "servant/Application.h"
namespace TestApp
{
/* callback of async proxy for client */
class HelloPrxCallback: public tars::ServantProxyCallback
{
public:
virtual ~HelloPrxCallback(){}
virtual void callback_registerPush(tars::Int32 ret)
{ throw std::runtime_error("callback_registerPush() override incorrect."); }
virtual void callback_registerPush_exception(tars::Int32 ret)
{ throw std::runtime_error("callback_registerPush_exception() override incorrect."); }
public:
virtual const map<std::string, std::string> & getResponseContext() const
{
CallbackThreadData * pCbtd = CallbackThreadData::getData();
assert(pCbtd != NULL);
if(!pCbtd->getContextValid())
{
throw TC_Exception("cann't get response context");
}
return pCbtd->getResponseContext();
}
public:
virtual int onDispatch(tars::ReqMessagePtr _msg_)
{
static ::std::string __Hello_all[]=
{
"registerPush"
};
auto it = _msg_->response->status.find("TARS_FUNC");
pair<string*, string*> r = equal_range(__Hello_all, __Hello_all+1, (it==_msg_->response->status.end())?_msg_->request.sFuncName:it->second);
if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
switch(r.first - __Hello_all)
{
case 0:
{
if (_msg_->response->iRet != tars::TARSSERVERSUCCESS)
{
callback_registerPush_exception(_msg_->response->iRet);
return _msg_->response->iRet;
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(_msg_->response->sBuffer);
tars::Int32 _ret;
_is.read(_ret, 0, true);
ServantProxyThreadData *_pSptd_ = ServantProxyThreadData::getData();
if (_pSptd_ && _pSptd_->_traceCall)
{
string _trace_param_;
int _trace_param_flag_ = _pSptd_->needTraceParam(ServantProxyThreadData::TraceContext::EST_CR, _is.size());
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_p_->value[""] = tars::JsonOutput::writeJson(_ret);
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_CR), TRACE_ANNOTATION_CR, "", ServerConfig::Application + "." + ServerConfig::ServerName, "registerPush", 0, _trace_param_, "");
}
CallbackThreadData * pCbtd = CallbackThreadData::getData();
assert(pCbtd != NULL);
pCbtd->setResponseContext(_msg_->response->context);
callback_registerPush(_ret);
pCbtd->delResponseContext();
return tars::TARSSERVERSUCCESS;
}
}
return tars::TARSSERVERNOFUNCERR;
}
};
typedef tars::TC_AutoPtr<HelloPrxCallback> HelloPrxCallbackPtr;
//callback of promise async proxy for client
class HelloPrxCallbackPromise: public tars::ServantProxyCallback
{
public:
virtual ~HelloPrxCallbackPromise(){}
public:
struct PromiseregisterPush: virtual public TC_HandleBase
{
public:
tars::Int32 _ret;
map<std::string, std::string> _mRspContext;
};
typedef tars::TC_AutoPtr< HelloPrxCallbackPromise::PromiseregisterPush > PromiseregisterPushPtr;
HelloPrxCallbackPromise(const tars::Promise< HelloPrxCallbackPromise::PromiseregisterPushPtr > &promise)
: _promise_registerPush(promise)
{}
virtual void callback_registerPush(const HelloPrxCallbackPromise::PromiseregisterPushPtr &ptr)
{
_promise_registerPush.setValue(ptr);
}
virtual void callback_registerPush_exception(tars::Int32 ret)
{
std::string str("");
str += "Function:registerPush_exception|Ret:";
str += TC_Common::tostr(ret);
_promise_registerPush.setException(tars::copyException(str, ret));
}
protected:
tars::Promise< HelloPrxCallbackPromise::PromiseregisterPushPtr > _promise_registerPush;
public:
virtual int onDispatch(tars::ReqMessagePtr _msg_)
{
static ::std::string __Hello_all[]=
{
"registerPush"
};
pair<string*, string*> r = equal_range(__Hello_all, __Hello_all+1, string(_msg_->request.sFuncName));
if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
switch(r.first - __Hello_all)
{
case 0:
{
if (_msg_->response->iRet != tars::TARSSERVERSUCCESS)
{
callback_registerPush_exception(_msg_->response->iRet);
return _msg_->response->iRet;
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(_msg_->response->sBuffer);
HelloPrxCallbackPromise::PromiseregisterPushPtr ptr = new HelloPrxCallbackPromise::PromiseregisterPush();
try
{
_is.read(ptr->_ret, 0, true);
}
catch(std::exception &ex)
{
callback_registerPush_exception(tars::TARSCLIENTDECODEERR);
return tars::TARSCLIENTDECODEERR;
}
catch(...)
{
callback_registerPush_exception(tars::TARSCLIENTDECODEERR);
return tars::TARSCLIENTDECODEERR;
}
ptr->_mRspContext = _msg_->response->context;
callback_registerPush(ptr);
return tars::TARSSERVERSUCCESS;
}
}
return tars::TARSSERVERNOFUNCERR;
}
};
typedef tars::TC_AutoPtr<HelloPrxCallbackPromise> HelloPrxCallbackPromisePtr;
/* callback of coroutine async proxy for client */
class HelloCoroPrxCallback: public HelloPrxCallback
{
public:
virtual ~HelloCoroPrxCallback(){}
public:
virtual const map<std::string, std::string> & getResponseContext() const { return _mRspContext; }
virtual void setResponseContext(const map<std::string, std::string> &mContext) { _mRspContext = mContext; }
public:
int onDispatch(tars::ReqMessagePtr _msg_)
{
static ::std::string __Hello_all[]=
{
"registerPush"
};
pair<string*, string*> r = equal_range(__Hello_all, __Hello_all+1, string(_msg_->request.sFuncName));
if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
switch(r.first - __Hello_all)
{
case 0:
{
if (_msg_->response->iRet != tars::TARSSERVERSUCCESS)
{
callback_registerPush_exception(_msg_->response->iRet);
return _msg_->response->iRet;
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(_msg_->response->sBuffer);
try
{
tars::Int32 _ret;
_is.read(_ret, 0, true);
setResponseContext(_msg_->response->context);
callback_registerPush(_ret);
}
catch(std::exception &ex)
{
callback_registerPush_exception(tars::TARSCLIENTDECODEERR);
return tars::TARSCLIENTDECODEERR;
}
catch(...)
{
callback_registerPush_exception(tars::TARSCLIENTDECODEERR);
return tars::TARSCLIENTDECODEERR;
}
return tars::TARSSERVERSUCCESS;
}
}
return tars::TARSSERVERNOFUNCERR;
}
protected:
map<std::string, std::string> _mRspContext;
};
typedef tars::TC_AutoPtr<HelloCoroPrxCallback> HelloCoroPrxCallbackPtr;
/* proxy for client */
class HelloProxy : public tars::ServantProxy
{
public:
typedef map<string, string> TARS_CONTEXT;
tars::Int32 registerPush(const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseContext = NULL)
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
ServantProxyThreadData *_pSptd_ = ServantProxyThreadData::getData();
if (_pSptd_ && _pSptd_->_traceCall)
{
_pSptd_->newSpan();
string _trace_param_;
int _trace_param_flag_ = _pSptd_->needTraceParam(ServantProxyThreadData::TraceContext::EST_CS, _os.getLength());
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_CS), TRACE_ANNOTATION_CS, ServerConfig::Application + "." + ServerConfig::ServerName, tars_name(), "registerPush", 0, _trace_param_, "");
}
std::map<string, string> _mStatus;
shared_ptr<tars::ResponsePacket> rep = tars_invoke(tars::TARSNORMAL,"registerPush", _os, context, _mStatus);
if(pResponseContext)
{
pResponseContext->swap(rep->context);
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(rep->sBuffer);
tars::Int32 _ret;
_is.read(_ret, 0, true);
if (_pSptd_ && _pSptd_->_traceCall)
{
string _trace_param_;
int _trace_param_flag_ = _pSptd_->needTraceParam(ServantProxyThreadData::TraceContext::EST_CR, _is.size());
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_p_->value[""] = tars::JsonOutput::writeJson(_ret);
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_CR), TRACE_ANNOTATION_CR, ServerConfig::Application + "." + ServerConfig::ServerName, tars_name(), "registerPush", 0, _trace_param_, "");
}
return _ret;
}
void async_registerPush(HelloPrxCallbackPtr callback,const map<string, string>& context = TARS_CONTEXT())
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
std::map<string, string> _mStatus;
ServantProxyThreadData *_pSptd_ = ServantProxyThreadData::getData();
if (_pSptd_ && _pSptd_->_traceCall)
{
_pSptd_->newSpan();
string _trace_param_;
int _trace_param_flag_ = _pSptd_->needTraceParam(ServantProxyThreadData::TraceContext::EST_CS, _os.getLength());
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_CS), TRACE_ANNOTATION_CS, ServerConfig::Application + "." + ServerConfig::ServerName, tars_name(), "registerPush", 0, _trace_param_, "");
}
tars_invoke_async(tars::TARSNORMAL,"registerPush", _os, context, _mStatus, callback);
}
tars::Future< HelloPrxCallbackPromise::PromiseregisterPushPtr > promise_async_registerPush(const map<string, string>& context)
{
tars::Promise< HelloPrxCallbackPromise::PromiseregisterPushPtr > promise;
HelloPrxCallbackPromisePtr callback = new HelloPrxCallbackPromise(promise);
tars::TarsOutputStream<tars::BufferWriterVector> _os;
std::map<string, string> _mStatus;
tars_invoke_async(tars::TARSNORMAL,"registerPush", _os, context, _mStatus, callback);
return promise.getFuture();
}
void coro_registerPush(HelloCoroPrxCallbackPtr callback,const map<string, string>& context = TARS_CONTEXT())
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
std::map<string, string> _mStatus;
tars_invoke_async(tars::TARSNORMAL,"registerPush", _os, context, _mStatus, callback, true);
}
HelloProxy* tars_hash(uint32_t key)
{
return (HelloProxy*)ServantProxy::tars_hash(key);
}
HelloProxy* tars_consistent_hash(uint32_t key)
{
return (HelloProxy*)ServantProxy::tars_consistent_hash(key);
}
HelloProxy* tars_open_trace(bool traceParam = false)
{
return (HelloProxy*)ServantProxy::tars_open_trace(traceParam);
}
HelloProxy* tars_set_timeout(int msecond)
{
return (HelloProxy*)ServantProxy::tars_set_timeout(msecond);
}
static const char* tars_prxname() { return "HelloProxy"; }
};
typedef tars::TC_AutoPtr<HelloProxy> HelloPrx;
/* servant for server */
class Hello : public tars::Servant
{
public:
virtual ~Hello(){}
virtual tars::Int32 registerPush(tars::TarsCurrentPtr _current_) = 0;
static void async_response_registerPush(tars::TarsCurrentPtr _current_, tars::Int32 _ret)
{
size_t _rsp_len_ = 0;
if (_current_->getRequestVersion() == TUPVERSION )
{
UniAttribute<tars::BufferWriterVector, tars::BufferReader> _tarsAttr_;
_tarsAttr_.setVersion(_current_->getRequestVersion());
_tarsAttr_.put("", _ret);
_tarsAttr_.put("tars_ret", _ret);
vector<char> sTupResponseBuffer;
_tarsAttr_.encode(sTupResponseBuffer);
_current_->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer);
_rsp_len_ = sTupResponseBuffer.size();
}
else if (_current_->getRequestVersion() == JSONVERSION)
{
tars::JsonValueObjPtr _p = new tars::JsonValueObj();
_p->value["tars_ret"] = tars::JsonOutput::writeJson(_ret);
vector<char> sJsonResponseBuffer;
tars::TC_Json::writeValue(_p, sJsonResponseBuffer);
_current_->sendResponse(tars::TARSSERVERSUCCESS, sJsonResponseBuffer);
_rsp_len_ = sJsonResponseBuffer.size();
}
else
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(_ret, 0);
_rsp_len_ = _os.getLength();
_current_->sendResponse(tars::TARSSERVERSUCCESS, _os);
}
if (_current_->isTraced())
{
string _trace_param_;
int _trace_param_flag_ = ServantProxyThreadData::needTraceParam(ServantProxyThreadData::TraceContext::EST_SS, _current_->getTraceKey(), _rsp_len_);
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_p_->value[""] = tars::JsonOutput::writeJson(_ret);
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_current_->getTraceKey(), TRACE_ANNOTATION_SS, "", ServerConfig::Application + "." + ServerConfig::ServerName, "registerPush", 0, _trace_param_, "");
}
}
static void async_response_push_registerPush(tars::CurrentPtr _current_, tars::Int32 _ret, const map<string, string> &_context = tars::Current::TARS_STATUS())
{
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(_ret, 0);
_current_->sendPushResponse( tars::TARSSERVERSUCCESS ,"registerPush", _os, _context);
}
}
public:
int onDispatch(tars::TarsCurrentPtr _current, vector<char> &_sResponseBuffer)
{
static ::std::string __TestApp__Hello_all[]=
{
"registerPush"
};
pair<string*, string*> r = equal_range(__TestApp__Hello_all, __TestApp__Hello_all+1, _current->getFuncName());
if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
switch(r.first - __TestApp__Hello_all)
{
case 0:
{
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(_current->getRequestBuffer());
if (_current->getRequestVersion() == TUPVERSION)
{
UniAttribute<tars::BufferWriterVector, tars::BufferReader> _tarsAttr_;
_tarsAttr_.setVersion(_current->getRequestVersion());
_tarsAttr_.decode(_current->getRequestBuffer());
}
else if (_current->getRequestVersion() == JSONVERSION)
{
tars::JsonValueObjPtr _jsonPtr = tars::JsonValueObjPtr::dynamicCast(tars::TC_Json::getValue(_current->getRequestBuffer()));
}
else
{
}
ServantProxyThreadData *_pSptd_ = ServantProxyThreadData::getData();
if (_pSptd_ && _pSptd_->_traceCall)
{
string _trace_param_;
int _trace_param_flag_ = _pSptd_->needTraceParam(ServantProxyThreadData::TraceContext::EST_SR, _is.size());
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_SR), TRACE_ANNOTATION_SR, "", ServerConfig::Application + "." + ServerConfig::ServerName, "registerPush", 0, _trace_param_, "");
}
tars::Int32 _ret = registerPush(_current);
if(_current->isResponse())
{
if (_current->getRequestVersion() == TUPVERSION)
{
UniAttribute<tars::BufferWriterVector, tars::BufferReader> _tarsAttr_;
_tarsAttr_.setVersion(_current->getRequestVersion());
_tarsAttr_.put("", _ret);
_tarsAttr_.put("tars_ret", _ret);
_tarsAttr_.encode(_sResponseBuffer);
}
else if (_current->getRequestVersion() == JSONVERSION)
{
tars::JsonValueObjPtr _p = new tars::JsonValueObj();
_p->value["tars_ret"] = tars::JsonOutput::writeJson(_ret);
tars::TC_Json::writeValue(_p, _sResponseBuffer);
}
else
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(_ret, 0);
_os.swap(_sResponseBuffer);
}
if (_pSptd_ && _pSptd_->_traceCall)
{
string _trace_param_;
int _trace_param_flag_ = _pSptd_->needTraceParam(ServantProxyThreadData::TraceContext::EST_SS, _sResponseBuffer.size());
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_p_->value[""] = tars::JsonOutput::writeJson(_ret);
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_SS), TRACE_ANNOTATION_SS, "", ServerConfig::Application + "." + ServerConfig::ServerName, "registerPush", 0, _trace_param_, "");
}
}
else if(_pSptd_ && _pSptd_->_traceCall)
{
_current->setTrace(_pSptd_->_traceCall, _pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_SS));
}
return tars::TARSSERVERSUCCESS;
}
}
return tars::TARSSERVERNOFUNCERR;
}
};
}
#endif

View File

@ -0,0 +1,25 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
module TestApp
{
interface Hello
{
int registerPush();
};
};

View File

@ -0,0 +1,47 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#include "HelloImp.h"
#include "servant/Application.h"
#include "HelloServer.h"
using namespace std;
//////////////////////////////////////////////////////
void HelloImp::initialize()
{
//initialize servant here:
//...
}
//////////////////////////////////////////////////////
void HelloImp::destroy()
{
//destroy servant here:
//...
}
int HelloImp::doClose(tars::TarsCurrentPtr current)
{
g_app._pushThread.delCurrent(current);
}
int HelloImp::registerPush(tars::TarsCurrentPtr current)
{
g_app._pushThread.addCurrent(current);
return 0;
}

View File

@ -0,0 +1,58 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#ifndef _SSLImp_H_
#define _SSLImp_H_
#include "servant/Application.h"
#include "Hello.h"
/**
*
*
*/
class HelloImp : public TestApp::Hello
{
public:
/**
*
*/
virtual ~HelloImp() {}
/**
*
*/
virtual void initialize();
/**
*
*/
virtual void destroy();
/**
*
* @param current
* @return
*/
virtual int doClose(tars::TarsCurrentPtr current);
/**
* push
*/
virtual int registerPush(tars::TarsCurrentPtr current);
};
/////////////////////////////////////////////////////
#endif

View File

@ -0,0 +1,63 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#include "HelloServer.h"
#include "HelloImp.h"
using namespace std;
HelloServer g_app;
/////////////////////////////////////////////////////////////////
void
HelloServer::initialize()
{
//initialize application here:
//...
addServant<HelloImp>(ServerConfig::Application + "." + ServerConfig::ServerName + ".PushObj");
_pushThread.start();
}
/////////////////////////////////////////////////////////////////
void
HelloServer::destroyApp()
{
//destroy application here:
//...
_pushThread.terminate();
_pushThread.join();
}
/////////////////////////////////////////////////////////////////
int
main(int argc, char* argv[])
{
try
{
g_app.main(argc, argv);
g_app.waitForShutdown();
}
catch (std::exception& e)
{
LOG_CONSOLE_DEBUG << "std::exception error:" << e.what() << std::endl;
}
catch (...)
{
LOG_CONSOLE_DEBUG << "unknown exception." << std::endl;
}
return -1;
}
/////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,53 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#ifndef _HelloServer_H_
#define _HelloServer_H_
#include <iostream>
#include "servant/Application.h"
#include "PushThread.h"
using namespace tars;
/**
*
**/
class HelloServer : public Application
{
public:
/**
*
**/
virtual ~HelloServer() {};
/**
*
**/
virtual void initialize();
/**
*
**/
virtual void destroyApp();
PushThread _pushThread;
};
extern HelloServer g_app;
////////////////////////////////////////////
#endif

View File

@ -0,0 +1,590 @@
// **********************************************************************
// This file was generated by a TARS parser!
// TARS version 3.0.12.
// **********************************************************************
#ifndef __PUSH_H_
#define __PUSH_H_
#include <map>
#include <string>
#include <vector>
#include "tup/Tars.h"
#include "tup/TarsJson.h"
using namespace std;
#include "servant/ServantProxy.h"
#include "servant/Servant.h"
#include "promise/promise.h"
#include "servant/Application.h"
namespace TestApp
{
/* callback of async proxy for client */
class PushPrxCallback: public tars::ServantProxyCallback
{
public:
virtual ~PushPrxCallback(){}
virtual void callback_pushMsg(tars::Int32 ret, const std::string& sRsp)
{ throw std::runtime_error("callback_pushMsg() override incorrect."); }
virtual void callback_pushMsg_exception(tars::Int32 ret)
{ throw std::runtime_error("callback_pushMsg_exception() override incorrect."); }
public:
virtual const map<std::string, std::string> & getResponseContext() const
{
CallbackThreadData * pCbtd = CallbackThreadData::getData();
assert(pCbtd != NULL);
if(!pCbtd->getContextValid())
{
throw TC_Exception("cann't get response context");
}
return pCbtd->getResponseContext();
}
public:
virtual int onDispatch(tars::ReqMessagePtr _msg_)
{
static ::std::string __Push_all[]=
{
"pushMsg"
};
auto it = _msg_->response->status.find("TARS_FUNC");
pair<string*, string*> r = equal_range(__Push_all, __Push_all+1, (it==_msg_->response->status.end())?_msg_->request.sFuncName:it->second);
if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
switch(r.first - __Push_all)
{
case 0:
{
if (_msg_->response->iRet != tars::TARSSERVERSUCCESS)
{
callback_pushMsg_exception(_msg_->response->iRet);
return _msg_->response->iRet;
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(_msg_->response->sBuffer);
tars::Int32 _ret;
_is.read(_ret, 0, true);
std::string sRsp;
_is.read(sRsp, 1, true);
ServantProxyThreadData *_pSptd_ = ServantProxyThreadData::getData();
if (_pSptd_ && _pSptd_->_traceCall)
{
string _trace_param_;
int _trace_param_flag_ = _pSptd_->needTraceParam(ServantProxyThreadData::TraceContext::EST_CR, _is.size());
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_p_->value[""] = tars::JsonOutput::writeJson(_ret);
_p_->value["sRsp"] = tars::JsonOutput::writeJson(sRsp);
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_CR), TRACE_ANNOTATION_CR, "", ServerConfig::Application + "." + ServerConfig::ServerName, "pushMsg", 0, _trace_param_, "");
}
CallbackThreadData * pCbtd = CallbackThreadData::getData();
assert(pCbtd != NULL);
pCbtd->setResponseContext(_msg_->response->context);
callback_pushMsg(_ret, sRsp);
pCbtd->delResponseContext();
return tars::TARSSERVERSUCCESS;
}
}
return tars::TARSSERVERNOFUNCERR;
}
};
typedef tars::TC_AutoPtr<PushPrxCallback> PushPrxCallbackPtr;
//callback of promise async proxy for client
class PushPrxCallbackPromise: public tars::ServantProxyCallback
{
public:
virtual ~PushPrxCallbackPromise(){}
public:
struct PromisepushMsg: virtual public TC_HandleBase
{
public:
tars::Int32 _ret;
std::string sRsp;
map<std::string, std::string> _mRspContext;
};
typedef tars::TC_AutoPtr< PushPrxCallbackPromise::PromisepushMsg > PromisepushMsgPtr;
PushPrxCallbackPromise(const tars::Promise< PushPrxCallbackPromise::PromisepushMsgPtr > &promise)
: _promise_pushMsg(promise)
{}
virtual void callback_pushMsg(const PushPrxCallbackPromise::PromisepushMsgPtr &ptr)
{
_promise_pushMsg.setValue(ptr);
}
virtual void callback_pushMsg_exception(tars::Int32 ret)
{
std::string str("");
str += "Function:pushMsg_exception|Ret:";
str += TC_Common::tostr(ret);
_promise_pushMsg.setException(tars::copyException(str, ret));
}
protected:
tars::Promise< PushPrxCallbackPromise::PromisepushMsgPtr > _promise_pushMsg;
public:
virtual int onDispatch(tars::ReqMessagePtr _msg_)
{
static ::std::string __Push_all[]=
{
"pushMsg"
};
pair<string*, string*> r = equal_range(__Push_all, __Push_all+1, string(_msg_->request.sFuncName));
if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
switch(r.first - __Push_all)
{
case 0:
{
if (_msg_->response->iRet != tars::TARSSERVERSUCCESS)
{
callback_pushMsg_exception(_msg_->response->iRet);
return _msg_->response->iRet;
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(_msg_->response->sBuffer);
PushPrxCallbackPromise::PromisepushMsgPtr ptr = new PushPrxCallbackPromise::PromisepushMsg();
try
{
_is.read(ptr->_ret, 0, true);
_is.read(ptr->sRsp, 1, true);
}
catch(std::exception &ex)
{
callback_pushMsg_exception(tars::TARSCLIENTDECODEERR);
return tars::TARSCLIENTDECODEERR;
}
catch(...)
{
callback_pushMsg_exception(tars::TARSCLIENTDECODEERR);
return tars::TARSCLIENTDECODEERR;
}
ptr->_mRspContext = _msg_->response->context;
callback_pushMsg(ptr);
return tars::TARSSERVERSUCCESS;
}
}
return tars::TARSSERVERNOFUNCERR;
}
};
typedef tars::TC_AutoPtr<PushPrxCallbackPromise> PushPrxCallbackPromisePtr;
/* callback of coroutine async proxy for client */
class PushCoroPrxCallback: public PushPrxCallback
{
public:
virtual ~PushCoroPrxCallback(){}
public:
virtual const map<std::string, std::string> & getResponseContext() const { return _mRspContext; }
virtual void setResponseContext(const map<std::string, std::string> &mContext) { _mRspContext = mContext; }
public:
int onDispatch(tars::ReqMessagePtr _msg_)
{
static ::std::string __Push_all[]=
{
"pushMsg"
};
pair<string*, string*> r = equal_range(__Push_all, __Push_all+1, string(_msg_->request.sFuncName));
if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
switch(r.first - __Push_all)
{
case 0:
{
if (_msg_->response->iRet != tars::TARSSERVERSUCCESS)
{
callback_pushMsg_exception(_msg_->response->iRet);
return _msg_->response->iRet;
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(_msg_->response->sBuffer);
try
{
tars::Int32 _ret;
_is.read(_ret, 0, true);
std::string sRsp;
_is.read(sRsp, 1, true);
setResponseContext(_msg_->response->context);
callback_pushMsg(_ret, sRsp);
}
catch(std::exception &ex)
{
callback_pushMsg_exception(tars::TARSCLIENTDECODEERR);
return tars::TARSCLIENTDECODEERR;
}
catch(...)
{
callback_pushMsg_exception(tars::TARSCLIENTDECODEERR);
return tars::TARSCLIENTDECODEERR;
}
return tars::TARSSERVERSUCCESS;
}
}
return tars::TARSSERVERNOFUNCERR;
}
protected:
map<std::string, std::string> _mRspContext;
};
typedef tars::TC_AutoPtr<PushCoroPrxCallback> PushCoroPrxCallbackPtr;
/* proxy for client */
class PushProxy : public tars::ServantProxy
{
public:
typedef map<string, string> TARS_CONTEXT;
tars::Int32 pushMsg(std::string &sRsp,const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseContext = NULL)
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(sRsp, 1);
ServantProxyThreadData *_pSptd_ = ServantProxyThreadData::getData();
if (_pSptd_ && _pSptd_->_traceCall)
{
_pSptd_->newSpan();
string _trace_param_;
int _trace_param_flag_ = _pSptd_->needTraceParam(ServantProxyThreadData::TraceContext::EST_CS, _os.getLength());
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_CS), TRACE_ANNOTATION_CS, ServerConfig::Application + "." + ServerConfig::ServerName, tars_name(), "pushMsg", 0, _trace_param_, "");
}
std::map<string, string> _mStatus;
shared_ptr<tars::ResponsePacket> rep = tars_invoke(tars::TARSNORMAL,"pushMsg", _os, context, _mStatus);
if(pResponseContext)
{
pResponseContext->swap(rep->context);
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(rep->sBuffer);
tars::Int32 _ret;
_is.read(_ret, 0, true);
_is.read(sRsp, 1, true);
if (_pSptd_ && _pSptd_->_traceCall)
{
string _trace_param_;
int _trace_param_flag_ = _pSptd_->needTraceParam(ServantProxyThreadData::TraceContext::EST_CR, _is.size());
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_p_->value[""] = tars::JsonOutput::writeJson(_ret);
_p_->value["sRsp"] = tars::JsonOutput::writeJson(sRsp);
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_CR), TRACE_ANNOTATION_CR, ServerConfig::Application + "." + ServerConfig::ServerName, tars_name(), "pushMsg", 0, _trace_param_, "");
}
return _ret;
}
void async_pushMsg(PushPrxCallbackPtr callback,const map<string, string>& context = TARS_CONTEXT())
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
std::map<string, string> _mStatus;
ServantProxyThreadData *_pSptd_ = ServantProxyThreadData::getData();
if (_pSptd_ && _pSptd_->_traceCall)
{
_pSptd_->newSpan();
string _trace_param_;
int _trace_param_flag_ = _pSptd_->needTraceParam(ServantProxyThreadData::TraceContext::EST_CS, _os.getLength());
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_CS), TRACE_ANNOTATION_CS, ServerConfig::Application + "." + ServerConfig::ServerName, tars_name(), "pushMsg", 0, _trace_param_, "");
}
tars_invoke_async(tars::TARSNORMAL,"pushMsg", _os, context, _mStatus, callback);
}
tars::Future< PushPrxCallbackPromise::PromisepushMsgPtr > promise_async_pushMsg(const map<string, string>& context)
{
tars::Promise< PushPrxCallbackPromise::PromisepushMsgPtr > promise;
PushPrxCallbackPromisePtr callback = new PushPrxCallbackPromise(promise);
tars::TarsOutputStream<tars::BufferWriterVector> _os;
std::map<string, string> _mStatus;
tars_invoke_async(tars::TARSNORMAL,"pushMsg", _os, context, _mStatus, callback);
return promise.getFuture();
}
void coro_pushMsg(PushCoroPrxCallbackPtr callback,const map<string, string>& context = TARS_CONTEXT())
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
std::map<string, string> _mStatus;
tars_invoke_async(tars::TARSNORMAL,"pushMsg", _os, context, _mStatus, callback, true);
}
PushProxy* tars_hash(uint32_t key)
{
return (PushProxy*)ServantProxy::tars_hash(key);
}
PushProxy* tars_consistent_hash(uint32_t key)
{
return (PushProxy*)ServantProxy::tars_consistent_hash(key);
}
PushProxy* tars_open_trace(bool traceParam = false)
{
return (PushProxy*)ServantProxy::tars_open_trace(traceParam);
}
PushProxy* tars_set_timeout(int msecond)
{
return (PushProxy*)ServantProxy::tars_set_timeout(msecond);
}
static const char* tars_prxname() { return "PushProxy"; }
};
typedef tars::TC_AutoPtr<PushProxy> PushPrx;
/* servant for server */
class Push : public tars::Servant
{
public:
virtual ~Push(){}
virtual tars::Int32 pushMsg(std::string &sRsp,tars::TarsCurrentPtr _current_) = 0;
static void async_response_pushMsg(tars::TarsCurrentPtr _current_, tars::Int32 _ret, const std::string &sRsp)
{
size_t _rsp_len_ = 0;
if (_current_->getRequestVersion() == TUPVERSION )
{
UniAttribute<tars::BufferWriterVector, tars::BufferReader> _tarsAttr_;
_tarsAttr_.setVersion(_current_->getRequestVersion());
_tarsAttr_.put("", _ret);
_tarsAttr_.put("tars_ret", _ret);
_tarsAttr_.put("sRsp", sRsp);
vector<char> sTupResponseBuffer;
_tarsAttr_.encode(sTupResponseBuffer);
_current_->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer);
_rsp_len_ = sTupResponseBuffer.size();
}
else if (_current_->getRequestVersion() == JSONVERSION)
{
tars::JsonValueObjPtr _p = new tars::JsonValueObj();
_p->value["sRsp"] = tars::JsonOutput::writeJson(sRsp);
_p->value["tars_ret"] = tars::JsonOutput::writeJson(_ret);
vector<char> sJsonResponseBuffer;
tars::TC_Json::writeValue(_p, sJsonResponseBuffer);
_current_->sendResponse(tars::TARSSERVERSUCCESS, sJsonResponseBuffer);
_rsp_len_ = sJsonResponseBuffer.size();
}
else
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(_ret, 0);
_os.write(sRsp, 1);
_rsp_len_ = _os.getLength();
_current_->sendResponse(tars::TARSSERVERSUCCESS, _os);
}
if (_current_->isTraced())
{
string _trace_param_;
int _trace_param_flag_ = ServantProxyThreadData::needTraceParam(ServantProxyThreadData::TraceContext::EST_SS, _current_->getTraceKey(), _rsp_len_);
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_p_->value[""] = tars::JsonOutput::writeJson(_ret);
_p_->value["sRsp"] = tars::JsonOutput::writeJson(sRsp);
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_current_->getTraceKey(), TRACE_ANNOTATION_SS, "", ServerConfig::Application + "." + ServerConfig::ServerName, "pushMsg", 0, _trace_param_, "");
}
}
static void async_response_push_pushMsg(tars::CurrentPtr _current_, tars::Int32 _ret, const std::string &sRsp, const map<string, string> &_context = tars::Current::TARS_STATUS())
{
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(_ret, 0);
_os.write(sRsp, 1);
_current_->sendPushResponse( tars::TARSSERVERSUCCESS ,"pushMsg", _os, _context);
}
}
public:
int onDispatch(tars::TarsCurrentPtr _current, vector<char> &_sResponseBuffer)
{
static ::std::string __TestApp__Push_all[]=
{
"pushMsg"
};
pair<string*, string*> r = equal_range(__TestApp__Push_all, __TestApp__Push_all+1, _current->getFuncName());
if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
switch(r.first - __TestApp__Push_all)
{
case 0:
{
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(_current->getRequestBuffer());
std::string sRsp;
if (_current->getRequestVersion() == TUPVERSION)
{
UniAttribute<tars::BufferWriterVector, tars::BufferReader> _tarsAttr_;
_tarsAttr_.setVersion(_current->getRequestVersion());
_tarsAttr_.decode(_current->getRequestBuffer());
_tarsAttr_.getByDefault("sRsp", sRsp, sRsp);
}
else if (_current->getRequestVersion() == JSONVERSION)
{
tars::JsonValueObjPtr _jsonPtr = tars::JsonValueObjPtr::dynamicCast(tars::TC_Json::getValue(_current->getRequestBuffer()));
tars::JsonInput::readJson(sRsp, _jsonPtr->value["sRsp"], false);
}
else
{
_is.read(sRsp, 1, false);
}
ServantProxyThreadData *_pSptd_ = ServantProxyThreadData::getData();
if (_pSptd_ && _pSptd_->_traceCall)
{
string _trace_param_;
int _trace_param_flag_ = _pSptd_->needTraceParam(ServantProxyThreadData::TraceContext::EST_SR, _is.size());
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_SR), TRACE_ANNOTATION_SR, "", ServerConfig::Application + "." + ServerConfig::ServerName, "pushMsg", 0, _trace_param_, "");
}
tars::Int32 _ret = pushMsg(sRsp, _current);
if(_current->isResponse())
{
if (_current->getRequestVersion() == TUPVERSION)
{
UniAttribute<tars::BufferWriterVector, tars::BufferReader> _tarsAttr_;
_tarsAttr_.setVersion(_current->getRequestVersion());
_tarsAttr_.put("", _ret);
_tarsAttr_.put("tars_ret", _ret);
_tarsAttr_.put("sRsp", sRsp);
_tarsAttr_.encode(_sResponseBuffer);
}
else if (_current->getRequestVersion() == JSONVERSION)
{
tars::JsonValueObjPtr _p = new tars::JsonValueObj();
_p->value["sRsp"] = tars::JsonOutput::writeJson(sRsp);
_p->value["tars_ret"] = tars::JsonOutput::writeJson(_ret);
tars::TC_Json::writeValue(_p, _sResponseBuffer);
}
else
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(_ret, 0);
_os.write(sRsp, 1);
_os.swap(_sResponseBuffer);
}
if (_pSptd_ && _pSptd_->_traceCall)
{
string _trace_param_;
int _trace_param_flag_ = _pSptd_->needTraceParam(ServantProxyThreadData::TraceContext::EST_SS, _sResponseBuffer.size());
if (ServantProxyThreadData::TraceContext::ENP_NORMAL == _trace_param_flag_)
{
tars::JsonValueObjPtr _p_ = new tars::JsonValueObj();
_p_->value[""] = tars::JsonOutput::writeJson(_ret);
_p_->value["sRsp"] = tars::JsonOutput::writeJson(sRsp);
_trace_param_ = tars::TC_Json::writeValue(_p_);
}
else if(ServantProxyThreadData::TraceContext::ENP_OVERMAXLEN == _trace_param_flag_)
{
_trace_param_ = "{\"trace_param_over_max_len\":true}";
}
TARS_TRACE(_pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_SS), TRACE_ANNOTATION_SS, "", ServerConfig::Application + "." + ServerConfig::ServerName, "pushMsg", 0, _trace_param_, "");
}
}
else if(_pSptd_ && _pSptd_->_traceCall)
{
_current->setTrace(_pSptd_->_traceCall, _pSptd_->getTraceKey(ServantProxyThreadData::TraceContext::EST_SS));
}
return tars::TARSSERVERSUCCESS;
}
}
return tars::TARSSERVERNOFUNCERR;
}
};
}
#endif

View File

@ -0,0 +1,25 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
module TestApp
{
interface Push
{
int pushMsg(out string sRsp);
};
};

View File

@ -0,0 +1,44 @@
//
// Created by jarod on 2022/9/7.
//
#include "PushThread.h"
#include "Push.h"
void PushThread::terminate()
{
std::lock_guard<std::mutex> lock(_mutex);
_terminate = true;
_cond.notify_one();
}
void PushThread::addCurrent(CurrentPtr &current)
{
std::lock_guard<std::mutex> lock(_mutex);
_currents[current->getUId()] = current;
}
void PushThread::delCurrent(CurrentPtr &current)
{
std::lock_guard<std::mutex> lock(_mutex);
_currents.erase(current->getUId());
}
void PushThread::run()
{
while(!_terminate)
{
std::unique_lock<std::mutex> lock(_mutex);
for(auto it : _currents)
{
TestApp::Push::async_response_push_pushMsg(it.second, 0, "push message");
}
_cond.wait_for(lock, std::chrono::milliseconds(1000));
}
}

View File

@ -0,0 +1,32 @@
//
// Created by jarod on 2022/9/7.
//
#ifndef FRAMEWORK_PUSHTHREAD_H
#define FRAMEWORK_PUSHTHREAD_H
#include <mutex>
#include "util/tc_thread.h"
#include "servant/Application.h"
using namespace tars;
class PushThread : public TC_Thread
{
public:
void addCurrent(CurrentPtr &current);
void delCurrent(CurrentPtr &current);
void terminate();
protected:
virtual void run();
protected:
bool _terminate = false;
std::mutex _mutex;
std::condition_variable _cond;
map<int, CurrentPtr> _currents;
};
#endif //FRAMEWORK_PUSHTHREAD_H

View File

@ -0,0 +1,72 @@
<tars>
<application>
<client>
#tarsregistry locator
locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890
#max invoke timeout
sync-invoke-timeout = 5000
#refresh endpoint interval
refresh-endpoint-interval = 10000
#stat obj
stat = tars.tarsstat.StatObj
#max send queue length limit
sendqueuelimit = 100000
#async queue length limit
asyncqueuecap = 100000
#async callback thread num
asyncthread = 3
#net thread
netthread = 1
#merge net and sync thread
mergenetasync = 0
#module name
modulename = TestApp.PushCallbackClient
</client>
<server>
#not cout
closecout = 0
#app name
app = TestApp
#server name
server = PushCallbackServer
#path
basepath = ./
datapath = ./
#log path
logpath = ./
#merge net and imp thread
mergenetimp = 0
#local ip, for tarsnode
# local = tcp -h 127.0.0.1 -p 15001 -t 10000
#tarsnode
# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000
#config obj
# config = tars.tarsconfig.ConfigObj
#notify obj
# notify = tars.tarsconfig.NotifyObj
#log obj
# log = tars.tarslog.LogObj
<TestApp.PushCallbackServer.PushObjAdapter>
#ip:port:timeout
endpoint = tcp -h 127.0.0.1 -p 9316 -t 10000
#allow ip
allow =
#max connection num
maxconns = 4096
#imp thread num
threads = 5
#servant
servant = TestApp.PushCallbackServer.PushObj
#queue capacity
queuecap = 1000000
#tars protocol
protocol = tars
</TestApp.PushCallbackServer.PushObjAdapter>
</server>
</application>
</tars>

View File

@ -33,6 +33,10 @@ void HelloImp::destroy()
//...
}
int HelloImp::doClose(tars::TarsCurrentPtr current)
{
return 0;
}
int HelloImp::testHello(const std::string &sReq, std::string &sRsp, tars::TarsCurrentPtr current)
{
// TLOGDEBUG("HelloImp::testHellosReq:"<<sReq<<endl);

View File

@ -42,6 +42,7 @@ public:
*/
virtual void destroy();
virtual int doClose(tars::TarsCurrentPtr current);
/**
*
*/

View File

@ -0,0 +1,29 @@
echo "run-push-callback.bat"
set EXE_PATH=%1
set SRC_PATH=%2\\..
echo %EXE_PATH% %SRC_PATH%
taskkill /im PushCallbackServer.exe /t /f
timeout /T 1
echo "start server: %EXE_PATH%/PushCallbackServer.exe --config=%SRC_PATH%/examples/PushCallbackDemo/Server/config.conf"
start /b %EXE_PATH%\\PushCallbackServer.exe --config=%SRC_PATH%\\examples\\PushCallbackDemo\\Server\\config.conf
timeout /T 3
echo "client: ${EXE_PATH}/PushCallbackClient.exe"
%EXE_PATH%\\PushCallbackClient.exe
timeout /T 1
taskkill /im PushCallbackServer.exe /t /f

View File

@ -0,0 +1,27 @@
#!/bin/bash
echo "run-push-callback.sh"
EXE_PATH=$1
SRC_PATH=$2/..
echo ${EXE_PATH} ${SRC_PATH}
killall -9 PushCallbackServer
sleep 1
echo "start server: ${EXE_PATH}/PushCallbackServer --config=${SRC_PATH}/examples/PushCallbackDemo/Server/config.conf &"
${EXE_PATH}/PushCallbackServer --config=${SRC_PATH}/examples/PushCallbackDemo/Server/config.conf &
sleep 1
echo "client: ${EXE_PATH}/PushCallbackClient"
${EXE_PATH}/PushCallbackClient --config=${SRC_PATH}/examples/PushCallbackDemo/Client/config.conf
sleep 1
killall -9 PushCallbackServer

View File

@ -610,7 +610,6 @@ bool TC_EpollServer::Connection::handleOutputImp(const shared_ptr<TC_Epoller::Ep
bool TC_EpollServer::Connection::handleInputImp(const shared_ptr<TC_Epoller::EpollInfo> &data)
{
// LOG_CONSOLE_DEBUG << endl;
TC_EpollServer::NetThread *netThread = (TC_EpollServer::NetThread *)data->cookie();
try