hproselet/hprose_conn.hpp

268 lines
11 KiB
C++
Raw Normal View History

2022-12-15 02:05:06 +00:00
#ifndef _HPROSE_SVC_HPP_
#define _HPROSE_SVC_HPP_
#include "hprose_srv.hpp"
#include "hprose_cli.hpp"
#include "buffers.hpp"
#include "tcp.hpp"
#include "udp.hpp"
#include "worker.hpp"
class HproseDataHandler {
public:
using data_handler_cb = std::function<void(uint32_t dup_id, const unsigned char* data, size_t datalen)>;
const unsigned char* HandleData(const unsigned char* data, size_t datalen) {
auto begin = data;
auto left = datalen;
while(left >= 4) {
uint32_t length = ((uint32_t)begin[0] << 24) | ((uint32_t)begin[1] << 16) | ((uint32_t)begin[2] << 8) | ((uint32_t)begin[3]);
bool duplex = (length & 0x80000000) != 0;
length &= 0x7fffffff;
uint32_t offset = duplex ? 8 : 4;
if(left < offset + length)
return begin;
uint32_t dup_id = duplex ? *(uint32_t*)&begin[4] : 0;
if(data_handler)
data_handler(dup_id, &begin[offset], length);
begin += offset + length;
left -= offset + length;
}
return begin;
}
void WriteHeader(uint32_t dup_id, unsigned char* begin, size_t length) {
if(dup_id) {
length |= 0x80000000;
*(uint32_t*)&begin[4] = dup_id;
}
begin[0] = (unsigned char)((length >> 24) & 0xff);
begin[1] = (unsigned char)((length >> 16) & 0xff);
begin[2] = (unsigned char)((length >> 8) & 0xff);
begin[3] = (unsigned char)(length & 0xff);
}
void SetDataHandler(const data_handler_cb& cb) { data_handler = cb; }
protected:
data_handler_cb data_handler;
};
class HproseSessionTCP : public HproseDataHandler, public TCPSession<HproseSessionTCP> {
public:
using TCPSession<HproseSessionTCP>::TCPSession;
void PushData(const unsigned char* data, size_t length) {
if(!pending_data.empty()) {
pending_data.insert(pending_data.end(), data, data + length);
auto end = HandleData(pending_data.data(), pending_data.size());
auto left = pending_data.data() + pending_data.size() - end;
if(left == 0) {
pending_data.clear();
} else {
memmove(pending_data.data(), end, left);
pending_data.resize(left);
}
} else {
auto end = HandleData(data, length);
auto left = data + length - end;
if(left != 0)
pending_data.assign(end, end + left);
}
}
void SendPacket(uint32_t dup_id, StaticBuffer& buf, size_t length) {
WriteHeader(dup_id, buf.Data(), length);
this->SendData(std::move(buf), (length & 0x7fffffff) + (dup_id ? 8 : 4));
}
protected:
std::vector<unsigned char> pending_data;
};
template<typename OBJECT>
class HproseServerConnectorTCP {
public:
HproseServerConnectorTCP(Worker* wk, uint16_t pt) : worker(wk), port(pt) {}
void Start() {
auto srv = std::make_shared<TCPServer<HproseSessionTCP>>(*worker, port);
srv->SetConnectedCallback([this](tcp::socket& sock) -> std::shared_ptr<HproseSessionTCP> {
tcp::socket handler_socket(handler_workers[worker_index], tcp::v4(), sock.release());
auto sess = std::make_shared<HproseSessionTCP>(std::move(handler_socket));
sess->SetReceivedCallback([](HproseSessionTCP* sess, const unsigned char* data, size_t length) {
sess->PushData(data, length);
}).SetDataHandler([this, idx = worker_index, sess, object = obj_alloc()](uint32_t dup_id, const unsigned char* data, size_t datalen) {
static const int32_t buflen = 4096;
auto reader = HproseReader(data, datalen);
auto buf = buff_mgr[idx].AllocStaticBuffer(buflen);
auto begin = buf.Data();
HproseWriter writer(begin + (dup_id ? 8 : 4), buflen - (dup_id ? 8 : 4));
handler(object.get(), reader, writer);
writer.WriteTag(Tags::End);
sess->SendPacket(dup_id, buf, writer.Length());
});
worker_index = (worker_index + 1) % 4;
return sess;
}).Start();
for(uint32_t i = 0; i < 4; ++i) {
std::thread([this, i](){
auto work = asio::make_work_guard(handler_workers[i]);
handler_workers[i].run();
}).detach();
}
}
void SetObjectAllocator(const std::function<std::shared_ptr<OBJECT>()>& alloc) { obj_alloc = alloc; }
void SetPacketHandler(const std::function<void(OBJECT*, HproseReader&, HproseWriter&)>& cb) { handler = cb; }
protected:
Worker* worker;
uint16_t port;
uint32_t worker_index = 0;
TimerdWorker handler_workers[4];
BufferMgr buff_mgr[4];
std::function<std::shared_ptr<OBJECT>()> obj_alloc;
std::function<void(OBJECT*, HproseReader&, HproseWriter&)> handler;
};
class HproseClientConnectorTCP {
public:
HproseClientConnectorTCP(Worker* wk, BufferMgr* mgr, const std::string& addr, uint16_t pt) : worker(wk), buff_mgr(mgr), address(addr), port(pt) {}
void SendRequest(uint32_t id, std::vector<HproseRPCInfo>& rpcs) {
auto srv = std::make_shared<TCPClient<HproseSessionTCP>>(*worker);
srv->SetConnectedCallback([this, &rpcs, id](tcp::socket& sock) -> std::shared_ptr<HproseSessionTCP> {
auto sess = std::make_shared<HproseSessionTCP>(std::move(sock));
sess->SetReceivedCallback([](HproseSessionTCP* sess, const unsigned char* data, size_t length) {
sess->PushData(data, length);
}).SetDataHandler([this, sess](uint32_t dup_id, const unsigned char* data, size_t datalen) {
auto reader = HproseReader(data, datalen);
if(handler)
handler(dup_id, reader);
sess->Close();
});
DoSendRequests(sess.get(), rpcs, id);
return sess;
}).Connect(address.c_str(), port, 30, [this, id](const std::string& msg) {
if(ehandler)
ehandler(id, msg);
});
}
void SendRequestNoReturn(uint32_t id, std::shared_ptr<std::vector<HproseRPCInfo>> rpcs) {
auto srv = std::make_shared<TCPClient<HproseSessionTCP>>(*worker);
srv->SetConnectedCallback([this, rpcs, id](tcp::socket& sock) -> std::shared_ptr<HproseSessionTCP> {
auto sess = std::make_shared<HproseSessionTCP>(std::move(sock));
DoSendRequests(sess.get(), *rpcs, id);
sess->Close();
return sess;
}).Connect(address.c_str(), port, 30, nullptr);
}
void DoSendRequests(HproseSessionTCP* sess, const std::vector<HproseRPCInfo>& infos, uint32_t id) {
static const int32_t buflen = 4096;
auto buf = buff_mgr->AllocStaticBuffer(buflen);
auto begin = buf.Data();
HproseWriter writer(begin + (id ? 8 : 4), buflen - (id ? 8 : 4));
for(auto& inf : infos)
inf.WriteRequest(writer);
writer.WriteTag(Tags::End);
sess->SendPacket(id, buf, writer.Length());
}
void SetPacketHandler(const std::function<void(uint32_t, HproseReader&)>& cb) { handler = cb; }
void SetErrorHandler(const std::function<void(uint32_t, const std::string&)>& cb) { ehandler = cb; }
protected:
Worker* worker;
BufferMgr* buff_mgr;
std::string address;
uint16_t port;
std::function<void(uint32_t, HproseReader&)> handler;
std::function<void(uint32_t, const std::string&)> ehandler;
};
template<typename OBJECT>
class HproseServerConnectorUDP : public HproseDataHandler {
public:
HproseServerConnectorUDP(Worker* wk, BufferMgr* mgr, uint16_t pt) : worker(wk), buff_mgr(mgr), port(pt) {}
void Start() {
auto srv = std::make_shared<UDPPeer>(*worker, port);
srv->SetReceivedCallback([this](const udp_receiver& r, const udp_sender& s, size_t available) {
auto rbuf = buff_mgr->AllocStaticBuffer(available);
auto recv_len = r(rbuf, available);
SetDataHandler([this, &s, obj = obj_alloc()](uint32_t dup_id, const unsigned char* data, size_t datalen) {
auto reader = HproseReader(data, datalen);
auto buf = buff_mgr->AllocStaticBuffer(4096);
auto begin = buf.Data();
HproseWriter writer(begin + (dup_id ? 8 : 4), 4096 - (dup_id ? 8 : 4));
handler(obj.get(), reader, writer);
writer.WriteTag(Tags::End);
WriteHeader(dup_id, buf.Data(), writer.Length());
s(buf, writer.Length() + (dup_id ? 8 : 4));
});
HandleData(rbuf.Data(), recv_len);
}).StartReceive();
}
void SetObjectAllocator(const std::function<std::shared_ptr<OBJECT>()>& alloc) { obj_alloc = alloc; }
void SetPacketHandler(const std::function<void(OBJECT*, HproseReader&, HproseWriter&)>& cb) { handler = cb; }
protected:
Worker* worker;
BufferMgr* buff_mgr;
uint16_t port;
std::function<std::shared_ptr<OBJECT>()> obj_alloc;
std::function<void(OBJECT*, HproseReader&, HproseWriter&)> handler;
};
class HproseClientConnectorUDP : public HproseDataHandler {
public:
HproseClientConnectorUDP(Worker* wk, BufferMgr* mgr, const std::string& addr, uint16_t pt) : worker(wk), buff_mgr(mgr), address(addr), port(pt) {}
void SendRequest(uint32_t id, std::vector<HproseRPCInfo>& rpcs) {
auto peer = std::make_shared<UDPPeer>(*worker);
DoSendRequests(peer.get(), rpcs, id);
peer->SetReceivedCallback([this, p = peer.get()](const udp_receiver& r, const udp_sender& s, size_t available) {
auto rbuf = buff_mgr->AllocStaticBuffer(available);
auto recv_len = r(rbuf, available);
SetDataHandler([this](uint32_t dup_id, const unsigned char* data, size_t datalen) {
auto reader = HproseReader(data, datalen);
if(handler)
handler(dup_id, reader);
});
HandleData(rbuf.Data(), recv_len);
p->StopReceive();
});
peer->StartReceive();
}
void SendRequestNoReturn(uint32_t id, std::shared_ptr<std::vector<HproseRPCInfo>> rpcs) {
auto peer = std::make_shared<UDPPeer>(*worker);
DoSendRequests(peer.get(), *rpcs, id);
}
void DoSendRequests(UDPPeer* peer, const std::vector<HproseRPCInfo>& infos, uint32_t id) {
static const int32_t buflen = 4096;
auto buf = buff_mgr->AllocStaticBuffer(buflen);
auto begin = buf.Data();
HproseWriter writer(begin + (id ? 8 : 4), buflen - (id ? 8 : 4));
for(auto& inf : infos)
inf.WriteRequest(writer);
writer.WriteTag(Tags::End);
WriteHeader(id, buf.Data(), writer.Length());
peer->SendTo(address.c_str(), port, buf, writer.Length() + (id ? 8 : 4));
}
void SetPacketHandler(const std::function<void(uint32_t, HproseReader&)>& cb) { handler = cb; }
void SetErrorHandler(const std::function<void(uint32_t, const std::string&)>& cb) { ehandler = cb; }
protected:
Worker* worker;
BufferMgr* buff_mgr;
std::string address;
uint16_t port;
std::function<void(uint32_t, HproseReader&)> handler;
std::function<void(uint32_t, const std::string&)> ehandler;
};
#endif