268 lines
11 KiB
C++
268 lines
11 KiB
C++
#ifndef _HPROSE_SVC_HPP_
|
|
#define _HPROSE_SVC_HPP_
|
|
|
|
#include "hprose_srv.hpp"
|
|
#include "hprose_cli.hpp"
|
|
#include "buffers.hpp"
|
|
#include "tcp.hpp"
|
|
#include "udp.hpp"
|
|
#include "worker.hpp"
|
|
|
|
class HproseDataHandler {
|
|
public:
|
|
using data_handler_cb = std::function<void(uint32_t dup_id, const unsigned char* data, size_t datalen)>;
|
|
|
|
const unsigned char* HandleData(const unsigned char* data, size_t datalen) {
|
|
auto begin = data;
|
|
auto left = datalen;
|
|
while(left >= 4) {
|
|
uint32_t length = ((uint32_t)begin[0] << 24) | ((uint32_t)begin[1] << 16) | ((uint32_t)begin[2] << 8) | ((uint32_t)begin[3]);
|
|
bool duplex = (length & 0x80000000) != 0;
|
|
length &= 0x7fffffff;
|
|
uint32_t offset = duplex ? 8 : 4;
|
|
if(left < offset + length)
|
|
return begin;
|
|
uint32_t dup_id = duplex ? *(uint32_t*)&begin[4] : 0;
|
|
if(data_handler)
|
|
data_handler(dup_id, &begin[offset], length);
|
|
begin += offset + length;
|
|
left -= offset + length;
|
|
}
|
|
return begin;
|
|
}
|
|
|
|
void WriteHeader(uint32_t dup_id, unsigned char* begin, size_t length) {
|
|
if(dup_id) {
|
|
length |= 0x80000000;
|
|
*(uint32_t*)&begin[4] = dup_id;
|
|
}
|
|
begin[0] = (unsigned char)((length >> 24) & 0xff);
|
|
begin[1] = (unsigned char)((length >> 16) & 0xff);
|
|
begin[2] = (unsigned char)((length >> 8) & 0xff);
|
|
begin[3] = (unsigned char)(length & 0xff);
|
|
}
|
|
|
|
void SetDataHandler(const data_handler_cb& cb) { data_handler = cb; }
|
|
|
|
protected:
|
|
data_handler_cb data_handler;
|
|
};
|
|
|
|
class HproseSessionTCP : public HproseDataHandler, public TCPSession<HproseSessionTCP> {
|
|
public:
|
|
using TCPSession<HproseSessionTCP>::TCPSession;
|
|
|
|
void PushData(const unsigned char* data, size_t length) {
|
|
if(!pending_data.empty()) {
|
|
pending_data.insert(pending_data.end(), data, data + length);
|
|
auto end = HandleData(pending_data.data(), pending_data.size());
|
|
auto left = pending_data.data() + pending_data.size() - end;
|
|
if(left == 0) {
|
|
pending_data.clear();
|
|
} else {
|
|
memmove(pending_data.data(), end, left);
|
|
pending_data.resize(left);
|
|
}
|
|
} else {
|
|
auto end = HandleData(data, length);
|
|
auto left = data + length - end;
|
|
if(left != 0)
|
|
pending_data.assign(end, end + left);
|
|
}
|
|
}
|
|
|
|
void SendPacket(uint32_t dup_id, StaticBuffer& buf, size_t length) {
|
|
WriteHeader(dup_id, buf.Data(), length);
|
|
this->SendData(std::move(buf), (length & 0x7fffffff) + (dup_id ? 8 : 4));
|
|
}
|
|
|
|
protected:
|
|
std::vector<unsigned char> pending_data;
|
|
};
|
|
|
|
template<typename OBJECT>
|
|
class HproseServerConnectorTCP {
|
|
public:
|
|
HproseServerConnectorTCP(Worker* wk, uint16_t pt) : worker(wk), port(pt) {}
|
|
void Start() {
|
|
auto srv = std::make_shared<TCPServer<HproseSessionTCP>>(*worker, port);
|
|
srv->SetConnectedCallback([this](tcp::socket& sock) -> std::shared_ptr<HproseSessionTCP> {
|
|
tcp::socket handler_socket(handler_workers[worker_index], tcp::v4(), sock.release());
|
|
auto sess = std::make_shared<HproseSessionTCP>(std::move(handler_socket));
|
|
sess->SetReceivedCallback([](HproseSessionTCP* sess, const unsigned char* data, size_t length) {
|
|
sess->PushData(data, length);
|
|
}).SetDataHandler([this, idx = worker_index, sess, object = obj_alloc()](uint32_t dup_id, const unsigned char* data, size_t datalen) {
|
|
static const int32_t buflen = 4096;
|
|
auto reader = HproseReader(data, datalen);
|
|
auto buf = buff_mgr[idx].AllocStaticBuffer(buflen);
|
|
auto begin = buf.Data();
|
|
HproseWriter writer(begin + (dup_id ? 8 : 4), buflen - (dup_id ? 8 : 4));
|
|
handler(object.get(), reader, writer);
|
|
writer.WriteTag(Tags::End);
|
|
sess->SendPacket(dup_id, buf, writer.Length());
|
|
});
|
|
worker_index = (worker_index + 1) % 4;
|
|
return sess;
|
|
}).Start();
|
|
for(uint32_t i = 0; i < 4; ++i) {
|
|
std::thread([this, i](){
|
|
auto work = asio::make_work_guard(handler_workers[i]);
|
|
handler_workers[i].run();
|
|
}).detach();
|
|
}
|
|
}
|
|
|
|
void SetObjectAllocator(const std::function<std::shared_ptr<OBJECT>()>& alloc) { obj_alloc = alloc; }
|
|
void SetPacketHandler(const std::function<void(OBJECT*, HproseReader&, HproseWriter&)>& cb) { handler = cb; }
|
|
|
|
protected:
|
|
Worker* worker;
|
|
uint16_t port;
|
|
uint32_t worker_index = 0;
|
|
TimerdWorker handler_workers[4];
|
|
BufferMgr buff_mgr[4];
|
|
std::function<std::shared_ptr<OBJECT>()> obj_alloc;
|
|
std::function<void(OBJECT*, HproseReader&, HproseWriter&)> handler;
|
|
};
|
|
|
|
class HproseClientConnectorTCP {
|
|
public:
|
|
HproseClientConnectorTCP(Worker* wk, BufferMgr* mgr, const std::string& addr, uint16_t pt) : worker(wk), buff_mgr(mgr), address(addr), port(pt) {}
|
|
void SendRequest(uint32_t id, std::vector<HproseRPCInfo>& rpcs) {
|
|
auto srv = std::make_shared<TCPClient<HproseSessionTCP>>(*worker);
|
|
srv->SetConnectedCallback([this, &rpcs, id](tcp::socket& sock) -> std::shared_ptr<HproseSessionTCP> {
|
|
auto sess = std::make_shared<HproseSessionTCP>(std::move(sock));
|
|
sess->SetReceivedCallback([](HproseSessionTCP* sess, const unsigned char* data, size_t length) {
|
|
sess->PushData(data, length);
|
|
}).SetDataHandler([this, sess](uint32_t dup_id, const unsigned char* data, size_t datalen) {
|
|
auto reader = HproseReader(data, datalen);
|
|
if(handler)
|
|
handler(dup_id, reader);
|
|
sess->Close();
|
|
});
|
|
DoSendRequests(sess.get(), rpcs, id);
|
|
return sess;
|
|
}).Connect(address.c_str(), port, 30, [this, id](const std::string& msg) {
|
|
if(ehandler)
|
|
ehandler(id, msg);
|
|
});
|
|
}
|
|
|
|
void SendRequestNoReturn(uint32_t id, std::shared_ptr<std::vector<HproseRPCInfo>> rpcs) {
|
|
auto srv = std::make_shared<TCPClient<HproseSessionTCP>>(*worker);
|
|
srv->SetConnectedCallback([this, rpcs, id](tcp::socket& sock) -> std::shared_ptr<HproseSessionTCP> {
|
|
auto sess = std::make_shared<HproseSessionTCP>(std::move(sock));
|
|
DoSendRequests(sess.get(), *rpcs, id);
|
|
sess->Close();
|
|
return sess;
|
|
}).Connect(address.c_str(), port, 30, nullptr);
|
|
}
|
|
|
|
void DoSendRequests(HproseSessionTCP* sess, const std::vector<HproseRPCInfo>& infos, uint32_t id) {
|
|
static const int32_t buflen = 4096;
|
|
auto buf = buff_mgr->AllocStaticBuffer(buflen);
|
|
auto begin = buf.Data();
|
|
HproseWriter writer(begin + (id ? 8 : 4), buflen - (id ? 8 : 4));
|
|
for(auto& inf : infos)
|
|
inf.WriteRequest(writer);
|
|
writer.WriteTag(Tags::End);
|
|
sess->SendPacket(id, buf, writer.Length());
|
|
}
|
|
|
|
void SetPacketHandler(const std::function<void(uint32_t, HproseReader&)>& cb) { handler = cb; }
|
|
void SetErrorHandler(const std::function<void(uint32_t, const std::string&)>& cb) { ehandler = cb; }
|
|
|
|
protected:
|
|
Worker* worker;
|
|
BufferMgr* buff_mgr;
|
|
std::string address;
|
|
uint16_t port;
|
|
std::function<void(uint32_t, HproseReader&)> handler;
|
|
std::function<void(uint32_t, const std::string&)> ehandler;
|
|
};
|
|
|
|
template<typename OBJECT>
|
|
class HproseServerConnectorUDP : public HproseDataHandler {
|
|
public:
|
|
HproseServerConnectorUDP(Worker* wk, BufferMgr* mgr, uint16_t pt) : worker(wk), buff_mgr(mgr), port(pt) {}
|
|
void Start() {
|
|
auto srv = std::make_shared<UDPPeer>(*worker, port);
|
|
srv->SetReceivedCallback([this](const udp_receiver& r, const udp_sender& s, size_t available) {
|
|
auto rbuf = buff_mgr->AllocStaticBuffer(available);
|
|
auto recv_len = r(rbuf, available);
|
|
SetDataHandler([this, &s, obj = obj_alloc()](uint32_t dup_id, const unsigned char* data, size_t datalen) {
|
|
auto reader = HproseReader(data, datalen);
|
|
auto buf = buff_mgr->AllocStaticBuffer(4096);
|
|
auto begin = buf.Data();
|
|
HproseWriter writer(begin + (dup_id ? 8 : 4), 4096 - (dup_id ? 8 : 4));
|
|
handler(obj.get(), reader, writer);
|
|
writer.WriteTag(Tags::End);
|
|
WriteHeader(dup_id, buf.Data(), writer.Length());
|
|
s(buf, writer.Length() + (dup_id ? 8 : 4));
|
|
});
|
|
HandleData(rbuf.Data(), recv_len);
|
|
}).StartReceive();
|
|
}
|
|
|
|
void SetObjectAllocator(const std::function<std::shared_ptr<OBJECT>()>& alloc) { obj_alloc = alloc; }
|
|
void SetPacketHandler(const std::function<void(OBJECT*, HproseReader&, HproseWriter&)>& cb) { handler = cb; }
|
|
|
|
protected:
|
|
Worker* worker;
|
|
BufferMgr* buff_mgr;
|
|
uint16_t port;
|
|
std::function<std::shared_ptr<OBJECT>()> obj_alloc;
|
|
std::function<void(OBJECT*, HproseReader&, HproseWriter&)> handler;
|
|
};
|
|
|
|
class HproseClientConnectorUDP : public HproseDataHandler {
|
|
public:
|
|
HproseClientConnectorUDP(Worker* wk, BufferMgr* mgr, const std::string& addr, uint16_t pt) : worker(wk), buff_mgr(mgr), address(addr), port(pt) {}
|
|
void SendRequest(uint32_t id, std::vector<HproseRPCInfo>& rpcs) {
|
|
auto peer = std::make_shared<UDPPeer>(*worker);
|
|
DoSendRequests(peer.get(), rpcs, id);
|
|
peer->SetReceivedCallback([this, p = peer.get()](const udp_receiver& r, const udp_sender& s, size_t available) {
|
|
auto rbuf = buff_mgr->AllocStaticBuffer(available);
|
|
auto recv_len = r(rbuf, available);
|
|
SetDataHandler([this](uint32_t dup_id, const unsigned char* data, size_t datalen) {
|
|
auto reader = HproseReader(data, datalen);
|
|
if(handler)
|
|
handler(dup_id, reader);
|
|
});
|
|
HandleData(rbuf.Data(), recv_len);
|
|
p->StopReceive();
|
|
});
|
|
peer->StartReceive();
|
|
}
|
|
|
|
void SendRequestNoReturn(uint32_t id, std::shared_ptr<std::vector<HproseRPCInfo>> rpcs) {
|
|
auto peer = std::make_shared<UDPPeer>(*worker);
|
|
DoSendRequests(peer.get(), *rpcs, id);
|
|
}
|
|
|
|
void DoSendRequests(UDPPeer* peer, const std::vector<HproseRPCInfo>& infos, uint32_t id) {
|
|
static const int32_t buflen = 4096;
|
|
auto buf = buff_mgr->AllocStaticBuffer(buflen);
|
|
auto begin = buf.Data();
|
|
HproseWriter writer(begin + (id ? 8 : 4), buflen - (id ? 8 : 4));
|
|
for(auto& inf : infos)
|
|
inf.WriteRequest(writer);
|
|
writer.WriteTag(Tags::End);
|
|
WriteHeader(id, buf.Data(), writer.Length());
|
|
peer->SendTo(address.c_str(), port, buf, writer.Length() + (id ? 8 : 4));
|
|
}
|
|
|
|
void SetPacketHandler(const std::function<void(uint32_t, HproseReader&)>& cb) { handler = cb; }
|
|
void SetErrorHandler(const std::function<void(uint32_t, const std::string&)>& cb) { ehandler = cb; }
|
|
|
|
protected:
|
|
Worker* worker;
|
|
BufferMgr* buff_mgr;
|
|
std::string address;
|
|
uint16_t port;
|
|
std::function<void(uint32_t, HproseReader&)> handler;
|
|
std::function<void(uint32_t, const std::string&)> ehandler;
|
|
};
|
|
|
|
#endif
|