commit dbb49afc6a07028e204b820f0c80bdcec3630efb Author: Argon Date: Thu Dec 15 10:05:06 2022 +0800 first commit diff --git a/buffers.hpp b/buffers.hpp new file mode 100644 index 0000000..d6a6380 --- /dev/null +++ b/buffers.hpp @@ -0,0 +1,67 @@ +#ifndef _BUFFERS_HPP_ +#define _BUFFERS_HPP_ + +#include + +#include "objpool.hpp" + +template +struct MemBlock { unsigned char mem[BUFSIZE]; }; + +class StaticBuffer { +public: + StaticBuffer(unsigned char* buf, uint32_t sz, std::function cb) : buffer(buf), size(sz), release_cb(cb) {}; + StaticBuffer(const StaticBuffer& sb) = delete; + StaticBuffer(StaticBuffer&& sb) { + buffer = sb.buffer; + sb.buffer = nullptr; + size = sb.size; + sb.size = 0; + release_cb = std::move(sb.release_cb); + } + ~StaticBuffer() { + if(release_cb) + release_cb(); + } + unsigned char* Data() { return buffer; } + uint32_t Capacity() { return size; } + +protected: + unsigned char* buffer = nullptr; + uint32_t size = 0; + std::function release_cb; +}; + +class BufferMgr { +public: + StaticBuffer AllocStaticBuffer(uint32_t size) { + if(size <= 128) { + auto memblk = pool1.Alloc(); + return StaticBuffer(memblk->mem, 128, [this, memblk]() { + pool1.Recycle(memblk); + }); + } else if(size <= 1024) { + auto memblk = pool2.Alloc(); + return StaticBuffer(memblk->mem, 1024, [this, memblk]() { + pool2.Recycle(memblk); + }); + } else if(size <= 4096) { + auto memblk = pool3.Alloc(); + return StaticBuffer(memblk->mem, 4096, [this, memblk]() { + pool3.Recycle(memblk); + }); + } else { + auto memblk = new unsigned char[size]; + return StaticBuffer(memblk, size, [memblk]() { + delete[] memblk; + }); + } + } + +protected: + ObjectPoolNoLock> pool1; + ObjectPoolNoLock> pool2; + ObjectPoolNoLock> pool3; +}; + +#endif diff --git a/hprose_cli.hpp b/hprose_cli.hpp new file mode 100644 index 0000000..213c4e7 --- /dev/null +++ b/hprose_cli.hpp @@ -0,0 +1,201 @@ +#ifndef _HPROSE_CLI_HPP_ +#define _HPROSE_CLI_HPP_ + +#include +#include +#include + +#include "hprose_ed.hpp" +#include "hprose_types.hpp" + +template +struct HproseRPCCallback { + template + void CallImpl(HproseReader& reader, T rcb, const std::function& arg_ref) { + auto vals = HproseValueHelperTuple().FromReader(reader); + arg_ref(); + CallImplMultiArg(rcb, vals, std::make_index_sequence{}); + } + + template + void CallImplMultiArg(T rcb, std::tuple& vals, std::index_sequence) { + rcb(std::get(vals)...); + } +}; + +template +struct HproseRPCCallback { + template + void CallImpl(HproseReader& reader, T rcb, const std::function& arg_ref) { + auto val = HproseValueHelper().FromReader(reader); + arg_ref(); + rcb(val); + } +}; + +template<> +struct HproseRPCCallback { + template + void CallImpl(HproseReader& reader, T rcb, const std::function& arg_ref) { + reader.SkipValue(); + arg_ref(); + rcb(); + } +}; + +template +struct HproseResultArg { + using type = void; +}; + +template +struct HproseResultArg> { + using type = HproseRPCCallback; +}; + +template +struct HproseResultArg> { + using type = HproseRPCCallback>...>; +}; + +class HproseRPCInfo { +public: + HproseRPCInfo(const std::string& name) : method(name) {} + + template + HproseRPCInfo& PushArgs(ARGS... args) { + arg_cb = [args = std::make_tuple(args...)](HproseWriter& writer) { + HproseValueHelper>().ToWriter(args, writer); + }; + return *this; + } + + template + HproseRPCInfo& PushRefArgs(ARGS&... args) { + ref_cb = [argrefs = std::tie(std::forward(args)...)](HproseReader& reader) mutable { + argrefs = std::move(HproseValueHelperTuple().FromReader(reader)); + }; + ref = true; + return PushArgs(std::forward(args)...); + } + + template + HproseRPCInfo& Expect(T rcb) { + res_cb = [rcb](HproseReader& reader, const std::function& arg_ref) { + using rpc_arg_type = HproseResultArg::type>; + typename rpc_arg_type::type().CallImpl(reader, rcb, arg_ref); + }; + return *this; + } + + HproseRPCInfo& Catch(std::function ecb) { + err_cb = ecb; + return *this; + } + + void WriteRequest(HproseWriter& writer) const { + writer.WriteTag(Tags::Call); + HproseValueHelperString().ToWriter(method, writer); + if(arg_cb) + arg_cb(writer); + if(ref) + writer.WriteTag(Tags::True); + } + + void Return(HproseReader& reader) { + auto tag = reader.ReadTag(); + if(tag == Tags::Error) { + auto msg = HproseValueHelperString().FromReader(reader); + if(err_cb) + err_cb(msg); + } else { + auto cb = [this, &reader]() { + if(ref && ref_cb && reader.CheckTag(Tags::Argument, true)) + ref_cb(reader); + }; + if(res_cb) + res_cb(reader, cb); + else { + reader.SkipValue(); + cb(); + } + } + } + + void Error(const std::string& msg) { if(err_cb) err_cb(msg); } + +protected: + bool ref = false; + std::string method; + std::function)> res_cb; + std::function ref_cb; + std::function arg_cb; + std::function err_cb; +}; + +class HproseClient { +public: + template + void UseConnector(SVC& svc) { + apply_handler = [&svc, this](uint32_t id, std::shared_ptr> rpcs) { + if(rpcs == nullptr) { + auto& rpcs = results[id]; + svc.SendRequest(id, rpcs); + } else + svc.SendRequestNoReturn(id, rpcs); + }; + svc.SetPacketHandler([this](uint32_t id, HproseReader& reader) { + auto iter = results.find(id); + if(iter != results.end()) { + for(auto& inf : iter->second) + inf.Return(reader); + } + results.erase(iter); + }); + svc.SetErrorHandler([this](uint32_t id, const std::string& msg) { + auto& infos = results[id]; + for(auto& inf : infos) + inf.Error(msg); + results.erase(id); + }); + } + + HproseRPCInfo& Call(const std::string& name) { + auto pre_multiid = multiid; + multiid = 0; + auto& ret = MultiCall(name); + MultiApply(); + multiid = pre_multiid; + return ret; + } + + HproseRPCInfo& MultiCall(const std::string& name) { + if(multiid == 0) + multiid = callid++; + auto& infos = results[multiid]; + infos.emplace_back(name); + return infos.back(); + } + + void MultiApply() { + if(multiid == 0) + return; + apply_handler(multiid, nullptr); + multiid = 0; + } + + HproseRPCInfo& CallNoReturn(const std::string& name) { + auto rpcinfos = std::make_shared>(); + rpcinfos->emplace_back(name); + apply_handler(callid++, rpcinfos); + return rpcinfos->at(0); + } + +protected: + uint32_t callid = 1; + uint32_t multiid = 0; + std::function>)> apply_handler; + std::unordered_map> results; +}; + +#endif diff --git a/hprose_conn.hpp b/hprose_conn.hpp new file mode 100644 index 0000000..b6429b4 --- /dev/null +++ b/hprose_conn.hpp @@ -0,0 +1,267 @@ +#ifndef _HPROSE_SVC_HPP_ +#define _HPROSE_SVC_HPP_ + +#include "hprose_srv.hpp" +#include "hprose_cli.hpp" +#include "buffers.hpp" +#include "tcp.hpp" +#include "udp.hpp" +#include "worker.hpp" + +class HproseDataHandler { +public: + using data_handler_cb = std::function; + + const unsigned char* HandleData(const unsigned char* data, size_t datalen) { + auto begin = data; + auto left = datalen; + while(left >= 4) { + uint32_t length = ((uint32_t)begin[0] << 24) | ((uint32_t)begin[1] << 16) | ((uint32_t)begin[2] << 8) | ((uint32_t)begin[3]); + bool duplex = (length & 0x80000000) != 0; + length &= 0x7fffffff; + uint32_t offset = duplex ? 8 : 4; + if(left < offset + length) + return begin; + uint32_t dup_id = duplex ? *(uint32_t*)&begin[4] : 0; + if(data_handler) + data_handler(dup_id, &begin[offset], length); + begin += offset + length; + left -= offset + length; + } + return begin; + } + + void WriteHeader(uint32_t dup_id, unsigned char* begin, size_t length) { + if(dup_id) { + length |= 0x80000000; + *(uint32_t*)&begin[4] = dup_id; + } + begin[0] = (unsigned char)((length >> 24) & 0xff); + begin[1] = (unsigned char)((length >> 16) & 0xff); + begin[2] = (unsigned char)((length >> 8) & 0xff); + begin[3] = (unsigned char)(length & 0xff); + } + + void SetDataHandler(const data_handler_cb& cb) { data_handler = cb; } + +protected: + data_handler_cb data_handler; +}; + +class HproseSessionTCP : public HproseDataHandler, public TCPSession { +public: + using TCPSession::TCPSession; + + void PushData(const unsigned char* data, size_t length) { + if(!pending_data.empty()) { + pending_data.insert(pending_data.end(), data, data + length); + auto end = HandleData(pending_data.data(), pending_data.size()); + auto left = pending_data.data() + pending_data.size() - end; + if(left == 0) { + pending_data.clear(); + } else { + memmove(pending_data.data(), end, left); + pending_data.resize(left); + } + } else { + auto end = HandleData(data, length); + auto left = data + length - end; + if(left != 0) + pending_data.assign(end, end + left); + } + } + + void SendPacket(uint32_t dup_id, StaticBuffer& buf, size_t length) { + WriteHeader(dup_id, buf.Data(), length); + this->SendData(std::move(buf), (length & 0x7fffffff) + (dup_id ? 8 : 4)); + } + +protected: + std::vector pending_data; +}; + +template +class HproseServerConnectorTCP { +public: + HproseServerConnectorTCP(Worker* wk, uint16_t pt) : worker(wk), port(pt) {} + void Start() { + auto srv = std::make_shared>(*worker, port); + srv->SetConnectedCallback([this](tcp::socket& sock) -> std::shared_ptr { + tcp::socket handler_socket(handler_workers[worker_index], tcp::v4(), sock.release()); + auto sess = std::make_shared(std::move(handler_socket)); + sess->SetReceivedCallback([](HproseSessionTCP* sess, const unsigned char* data, size_t length) { + sess->PushData(data, length); + }).SetDataHandler([this, idx = worker_index, sess, object = obj_alloc()](uint32_t dup_id, const unsigned char* data, size_t datalen) { + static const int32_t buflen = 4096; + auto reader = HproseReader(data, datalen); + auto buf = buff_mgr[idx].AllocStaticBuffer(buflen); + auto begin = buf.Data(); + HproseWriter writer(begin + (dup_id ? 8 : 4), buflen - (dup_id ? 8 : 4)); + handler(object.get(), reader, writer); + writer.WriteTag(Tags::End); + sess->SendPacket(dup_id, buf, writer.Length()); + }); + worker_index = (worker_index + 1) % 4; + return sess; + }).Start(); + for(uint32_t i = 0; i < 4; ++i) { + std::thread([this, i](){ + auto work = asio::make_work_guard(handler_workers[i]); + handler_workers[i].run(); + }).detach(); + } + } + + void SetObjectAllocator(const std::function()>& alloc) { obj_alloc = alloc; } + void SetPacketHandler(const std::function& cb) { handler = cb; } + +protected: + Worker* worker; + uint16_t port; + uint32_t worker_index = 0; + TimerdWorker handler_workers[4]; + BufferMgr buff_mgr[4]; + std::function()> obj_alloc; + std::function handler; +}; + +class HproseClientConnectorTCP { +public: + HproseClientConnectorTCP(Worker* wk, BufferMgr* mgr, const std::string& addr, uint16_t pt) : worker(wk), buff_mgr(mgr), address(addr), port(pt) {} + void SendRequest(uint32_t id, std::vector& rpcs) { + auto srv = std::make_shared>(*worker); + srv->SetConnectedCallback([this, &rpcs, id](tcp::socket& sock) -> std::shared_ptr { + auto sess = std::make_shared(std::move(sock)); + sess->SetReceivedCallback([](HproseSessionTCP* sess, const unsigned char* data, size_t length) { + sess->PushData(data, length); + }).SetDataHandler([this, sess](uint32_t dup_id, const unsigned char* data, size_t datalen) { + auto reader = HproseReader(data, datalen); + if(handler) + handler(dup_id, reader); + sess->Close(); + }); + DoSendRequests(sess.get(), rpcs, id); + return sess; + }).Connect(address.c_str(), port, 30, [this, id](const std::string& msg) { + if(ehandler) + ehandler(id, msg); + }); + } + + void SendRequestNoReturn(uint32_t id, std::shared_ptr> rpcs) { + auto srv = std::make_shared>(*worker); + srv->SetConnectedCallback([this, rpcs, id](tcp::socket& sock) -> std::shared_ptr { + auto sess = std::make_shared(std::move(sock)); + DoSendRequests(sess.get(), *rpcs, id); + sess->Close(); + return sess; + }).Connect(address.c_str(), port, 30, nullptr); + } + + void DoSendRequests(HproseSessionTCP* sess, const std::vector& infos, uint32_t id) { + static const int32_t buflen = 4096; + auto buf = buff_mgr->AllocStaticBuffer(buflen); + auto begin = buf.Data(); + HproseWriter writer(begin + (id ? 8 : 4), buflen - (id ? 8 : 4)); + for(auto& inf : infos) + inf.WriteRequest(writer); + writer.WriteTag(Tags::End); + sess->SendPacket(id, buf, writer.Length()); + } + + void SetPacketHandler(const std::function& cb) { handler = cb; } + void SetErrorHandler(const std::function& cb) { ehandler = cb; } + +protected: + Worker* worker; + BufferMgr* buff_mgr; + std::string address; + uint16_t port; + std::function handler; + std::function ehandler; +}; + +template +class HproseServerConnectorUDP : public HproseDataHandler { +public: + HproseServerConnectorUDP(Worker* wk, BufferMgr* mgr, uint16_t pt) : worker(wk), buff_mgr(mgr), port(pt) {} + void Start() { + auto srv = std::make_shared(*worker, port); + srv->SetReceivedCallback([this](const udp_receiver& r, const udp_sender& s, size_t available) { + auto rbuf = buff_mgr->AllocStaticBuffer(available); + auto recv_len = r(rbuf, available); + SetDataHandler([this, &s, obj = obj_alloc()](uint32_t dup_id, const unsigned char* data, size_t datalen) { + auto reader = HproseReader(data, datalen); + auto buf = buff_mgr->AllocStaticBuffer(4096); + auto begin = buf.Data(); + HproseWriter writer(begin + (dup_id ? 8 : 4), 4096 - (dup_id ? 8 : 4)); + handler(obj.get(), reader, writer); + writer.WriteTag(Tags::End); + WriteHeader(dup_id, buf.Data(), writer.Length()); + s(buf, writer.Length() + (dup_id ? 8 : 4)); + }); + HandleData(rbuf.Data(), recv_len); + }).StartReceive(); + } + + void SetObjectAllocator(const std::function()>& alloc) { obj_alloc = alloc; } + void SetPacketHandler(const std::function& cb) { handler = cb; } + +protected: + Worker* worker; + BufferMgr* buff_mgr; + uint16_t port; + std::function()> obj_alloc; + std::function handler; +}; + +class HproseClientConnectorUDP : public HproseDataHandler { +public: + HproseClientConnectorUDP(Worker* wk, BufferMgr* mgr, const std::string& addr, uint16_t pt) : worker(wk), buff_mgr(mgr), address(addr), port(pt) {} + void SendRequest(uint32_t id, std::vector& rpcs) { + auto peer = std::make_shared(*worker); + DoSendRequests(peer.get(), rpcs, id); + peer->SetReceivedCallback([this, p = peer.get()](const udp_receiver& r, const udp_sender& s, size_t available) { + auto rbuf = buff_mgr->AllocStaticBuffer(available); + auto recv_len = r(rbuf, available); + SetDataHandler([this](uint32_t dup_id, const unsigned char* data, size_t datalen) { + auto reader = HproseReader(data, datalen); + if(handler) + handler(dup_id, reader); + }); + HandleData(rbuf.Data(), recv_len); + p->StopReceive(); + }); + peer->StartReceive(); + } + + void SendRequestNoReturn(uint32_t id, std::shared_ptr> rpcs) { + auto peer = std::make_shared(*worker); + DoSendRequests(peer.get(), *rpcs, id); + } + + void DoSendRequests(UDPPeer* peer, const std::vector& infos, uint32_t id) { + static const int32_t buflen = 4096; + auto buf = buff_mgr->AllocStaticBuffer(buflen); + auto begin = buf.Data(); + HproseWriter writer(begin + (id ? 8 : 4), buflen - (id ? 8 : 4)); + for(auto& inf : infos) + inf.WriteRequest(writer); + writer.WriteTag(Tags::End); + WriteHeader(id, buf.Data(), writer.Length()); + peer->SendTo(address.c_str(), port, buf, writer.Length() + (id ? 8 : 4)); + } + + void SetPacketHandler(const std::function& cb) { handler = cb; } + void SetErrorHandler(const std::function& cb) { ehandler = cb; } + +protected: + Worker* worker; + BufferMgr* buff_mgr; + std::string address; + uint16_t port; + std::function handler; + std::function ehandler; +}; + +#endif diff --git a/hprose_ed.hpp b/hprose_ed.hpp new file mode 100644 index 0000000..d9ecd8d --- /dev/null +++ b/hprose_ed.hpp @@ -0,0 +1,271 @@ +#ifndef _HPROSE_ED_HPP_ +#define _HPROSE_ED_HPP_ + +#include +#include + +#include "hprose_tags.hpp" + +class HproseReader { +public: + HproseReader(const unsigned char* ptr, size_t len) : begin(ptr), end(ptr + len), current(ptr) {} + HproseReader(const unsigned char* ptr, const unsigned char* end) : begin(ptr), end(end), current(ptr) {} + + unsigned char ReadByte() { + if(current >= end) + return 0; + return *current++; + } + + Tags ReadTag() { + return static_cast(ReadByte()); + } + + bool CheckTag(Tags tag, bool skip) { + if(current >= end) + return false; + if(static_cast(*current) != tag) + return false; + if(skip) + current++; + return true; + } + + void Skip(uint32_t len) { + current += len; + if(current >= end) + current = end; + } + + std::string ReadUntilAsString(Tags tag) { + auto start = current; + while(current != end && *current != static_cast(tag)) + current++; + if(current == end) { + return std::string(start, current); + } else { + return std::string(start, current++); + } + } + + std::string ReadBytesAsString(size_t length) { + auto start = current; + current += length; + if(current > end) + current = end; + return std::string(start, current); + } + + std::string ReadUTF8String(size_t length) { + auto start = current; + for(size_t i = 0; i < length && current < end; ++i) { + if(*current < 0x80) { // 0xxxxxxx + current++; + } else if(*current < 0xd0) { // 110xxxxx 10xxxxxx + if(current + 2 > end) + break; + current += 2; + } else if(*current < 0xf0) { // 1110xxxx 10xxxxxx 10xxxxxx + if(current + 3 > end) + break; + current += 3; + } else { // 11110xxx 10xxxxx 10xxxxxx 10xxxxxx + if(current + 4 > end) + break; + current += 4; + } + } + auto ptr = current; + CheckTag(Tags::Quote, true); + return std::string(start, ptr); + } + + int64_t ReadInt64Raw(Tags end) { + int64_t value = 0; + auto b = ReadByte(); + if(static_cast(b) == end) + return value; + bool neg = false; + if(static_cast(b) == Tags::Neg) { + neg = true; + b = ReadByte(); + } else if(static_cast(b) == Tags::Pos) { + b = ReadByte(); + } + while(static_cast(b) != end) { + value = value * 10 + int64_t(b - '0'); + b = ReadByte(); + } + return neg ? -value : value; + } + + void SkipValue() { + SkipValue(ReadTag()); + } + + void SkipValue(Tags tag) { + switch(tag) { + case Tags::Call: + case Tags::Result: + case Tags::Error: + case Tags::End: + case Tags::Argument: { + current--; + return; + } + case Tags::Null: + case Tags::Empty: + case Tags::Num0: + case Tags::Num1: + case Tags::Num2: + case Tags::Num3: + case Tags::Num4: + case Tags::Num5: + case Tags::Num6: + case Tags::Num7: + case Tags::Num8: + case Tags::Num9: + case Tags::False: + case Tags::True: + case Tags::NaN: return; + case Tags::Infinity: { + ReadTag(); + return; + } + case Tags::Integer: + case Tags::Long: { + ReadInt64Raw(Tags::Semicolon); + return; + } + case Tags::Double: { + ReadUntilAsString(Tags::Semicolon); + return; + } + case Tags::UTF8Char: { + ReadUTF8String(1); + return; + } + case Tags::String: { + ReadUTF8String(ReadInt64Raw(Tags::Quote)); + return; + } + case Tags::Bytes: { + ReadBytesAsString(ReadInt64Raw(Tags::Quote)); + return; + } + case Tags::List: { + int64_t count = ReadInt64Raw(Tags::Openbrace); + for(int64_t i = 0; i < count; ++i) + SkipValue(); + ReadTag(); // Tags::Closebrace + return; + } + case Tags::Map: { + int64_t count = ReadInt64Raw(Tags::Openbrace) * 2; + for(int64_t i = 0; i < count; ++i) + SkipValue(); + ReadTag(); // Tags::Closebrace + return; + } + case Tags::Date: { + Skip(8); // year-4 m-2 d-2 + if(!CheckTag(Tags::Time, true)) + return; + } + case Tags::Time: { + Skip(6); // h-2 m-2 s-2 + if(CheckTag(Tags::Point, true)) { + // nano seconds 1-9digits + while(current < end && *current >= '0' && *current <= '9') + current++; + } + CheckTag(Tags::UTC, true); + return; + } + case Tags::GUID: { + Skip(38); // '"' + 36 bytes string + '"' + return; + } + case Tags::Class: { + SkipValue(Tags::String); // name + SkipValue(Tags::List); // list + if(!CheckTag(Tags::Object, true)) + return; + } + case Tags::Object: { + SkipValue(); // index + if(CheckTag(Tags::Openbrace, true)) { + do { + SkipValue(); + } while(!CheckTag(Tags::Closebrace, true)); + } + } + default: { + return; + } + } + } + + void PushFieldRefs(std::vector fields) { fieldRefs.emplace_back(std::move(fields)); } + const std::vector& GetFieldRefs(int32_t idx) { return fieldRefs[idx]; } + +protected: + const unsigned char* begin; + const unsigned char* end; + const unsigned char* current; + std::vector> fieldRefs; +}; + +class HproseWriter { +public: + HproseWriter(unsigned char* ptr, size_t len) : begin(ptr), end(ptr + len), current(ptr) {} + void WriteByte(unsigned char byte) { + *current++ = byte; + } + void WriteBytes(unsigned char* bytes, size_t len) { + memcpy(current, bytes, len); + current += len; + } + void WriteTag(Tags tag) { + *current++ = static_cast(tag); + } + void WriteLength(uint32_t length) { + auto len = sprintf((char*)current, "%u", length); + current += len; + } + void WriteInteger(int64_t val) { + auto len = sprintf((char*)current, "%ld", val); + current += len; + } + void WriteDouble(double val) { + auto len = sprintf((char*)current, "%lf", val); + current += len; + } + void WriteString(const std::string& val) { + memcpy(current, val.data(), val.length()); + current += val.length(); + } + + size_t Length() { return current - begin; } + + int32_t GetStructFieldsIndex(const std::string& name) { + auto iter = fieldIds.find(name); + if(iter == fieldIds.end()) + return -1; + return iter->second; + } + + int32_t PushFieldRefs(const std::string& name) { + int32_t index = fieldIds.size(); + fieldIds[name] = index; + return index; + } + +protected: + const unsigned char* begin; + const unsigned char* end; + unsigned char* current; + std::map fieldIds; +}; + +#endif diff --git a/hprose_srv.hpp b/hprose_srv.hpp new file mode 100644 index 0000000..2a8053d --- /dev/null +++ b/hprose_srv.hpp @@ -0,0 +1,168 @@ +#ifndef _HPROSE_SRV_HPP_ +#define _HPROSE_SRV_HPP_ + +#include +#include + +#include "hprose_ed.hpp" +#include "hprose_types.hpp" + +template +struct Invoker { + template + bool operator() (OBJECT* objs, HANDLER&& handler, HproseWriter& w, ARGS&... args) const { + return HproseValueHelper().ToWriterWithResultTag(handler(std::forward(args)...), w); + } +}; + +template<> +struct Invoker { + template + bool operator() (OBJECT* objs, HANDLER&& handler, HproseWriter& w, ARGS&... args) const { + handler(std::forward(args)...); + return HproseValueHelperNull().ToWriterWithResultTag(w); + } +}; + +template +struct InvokerObj { + template + bool operator() (OBJECT* objs, HANDLER&& handler, HproseWriter& w, ARGS&... args) const { + return HproseValueHelper().ToWriterWithResultTag((*objs.*handler)(std::forward(args)...), w); + } +}; + +template<> +struct InvokerObj { + template + bool operator() (OBJECT* objs, HANDLER&& handler, HproseWriter& w, ARGS&... args) const { + (*objs.*handler)(std::forward(args)...); + return HproseValueHelperNull().ToWriterWithResultTag(w); + } +}; + +template +struct HandlerType { + using class_type = void; + using arg_helper = std::tuple<>; + using arg_type = std::tuple<>; + using invoke_type = void; + static constexpr int32_t arg_count = 0; +}; + +template +struct HandlerType> { + using class_type = void; + using arg_type = std::tuple>...>; + using invoke_type = Invoker; + using call_type = std::function; + static constexpr int32_t arg_count = sizeof...(ARGS); +}; + +template +struct HandlerType { + using class_type = CLASS; + using arg_type = std::tuple>...>; + using invoke_type = InvokerObj; + using call_type = RET(CLASS::*)(ARGS...); + static constexpr int32_t arg_count = sizeof...(ARGS); +}; + +template +struct HandlerType { + using class_type = CLASS; + using arg_type = std::tuple>...>; + using invoke_type = InvokerObj; + using call_type = RET(CLASS::*)(ARGS...) const; + static constexpr int32_t arg_count = sizeof...(ARGS); +}; + +template +struct HproseInvoker { + virtual void Call(OBJECT* obj, HproseReader& r, HproseWriter& w) = 0; + virtual ~HproseInvoker() {}; +}; + +template +class HProseCallType : public HproseInvoker::type>::class_type> { +public: + using handler_type = HandlerType::type>; + using args_type = typename handler_type::arg_type; + + HProseCallType(HANDLER h, const std::string& name) : handler(h), method(name) {} + + virtual void Call(typename handler_type::class_type* obj, HproseReader& r, HproseWriter& w) { + args_type args; + if(r.CheckTag(Tags::List, false)) + args = HproseValueHelper().FromReader(r); + bool ref = r.CheckTag(Tags::True, true); + bool has_error = CallImpl(obj, args, w, std::make_index_sequence{}); + if(ref && handler_type::arg_count > 0 && !has_error) { + w.WriteTag(Tags::Argument); + HproseValueHelper().ToWriter(args, w); + } + } + + template + bool CallImpl(typename handler_type::class_type* obj, args_type& args, HproseWriter& w, std::index_sequence) { + return typename handler_type::invoke_type()(obj, handler, w, std::get(args)...); + } + +protected: + HANDLER handler; + std::string method; +}; + +template +std::unique_ptr> MakeHproseCallType(F f, const std::string& name) { + return std::unique_ptr>(new HProseCallType(f, name)); +} + +template +class HproseServer { +public: + template + void Register(const char* name, HANDLER func) { + auto invoker = std::unique_ptr>(MakeHproseCallType(func, name)); + rpc_calls[name] = std::move(invoker); + } + + void HandleRequests(OBJECT* sess, HproseReader& reader, HproseWriter& writer) { + auto tag = reader.ReadTag(); + while(tag == Tags::Call) { + auto method = HproseValueHelperString().FromReader(reader); + auto iter = rpc_calls.find(method); + if(iter == rpc_calls.end()) { + HproseError err(std::move(std::string("Error: \"").append(method).append("\" not found."))); + HproseValueHelperError().ToWriterWithResultTag(err, writer); + reader.SkipValue(); + reader.SkipValue(); + tag = reader.ReadTag(); + continue; + } + iter->second->Call(sess, reader, writer); + tag = reader.ReadTag(); + } + } + + template + void UseConnector(SVC& svc) { + svc.SetObjectAllocator([this]() -> std::shared_ptr { + if(obj_allocator) + return obj_allocator(); + return nullptr; + }); + svc.SetPacketHandler([this](OBJECT* obj, HproseReader& reader, HproseWriter& writer) { + this->HandleRequests(obj, reader, writer); + }); + svc.Start(); + } + + void SetObjectAllocator(const std::function()>& cb) { obj_allocator = cb; } + +protected: + std::function()> obj_allocator; + std::unordered_map>> rpc_calls; +}; + +#endif diff --git a/hprose_tags.hpp b/hprose_tags.hpp new file mode 100644 index 0000000..8c6d6fc --- /dev/null +++ b/hprose_tags.hpp @@ -0,0 +1,56 @@ +#ifndef _HPROSE_TAGS_HPP_ +#define _HPROSE_TAGS_HPP_ + +enum class Tags : unsigned char { + TagError = 0, + // Serialize Type + Integer = 'i', + Long = 'l', + Double = 'd', + Null = 'n', + Empty = 'e', + True = 't', + False = 'f', + NaN = 'N', + Infinity = 'I', + Date = 'D', + Time = 'T', + UTC = 'Z', + Bytes = 'b', + UTF8Char = 'u', + String = 's', + GUID = 'g', + List = 'a', + Map = 'm', + Class = 'c', + Object = 'o', + Ref = 'r', + // Serialize Marks + Pos = '+', + Neg = '-', + Semicolon = ';', + Openbrace = '{', + Closebrace = '}', + Quote = '"', + Point = '.', + // Protocol s + Functions = 'F', + Call = 'C', + Result = 'R', + Argument = 'A', + Error = 'E', + End = 'z', + // Numbers + Num0 = '0', + Num1 = '1', + Num2 = '2', + Num3 = '3', + Num4 = '4', + Num5 = '5', + Num6 = '6', + Num7 = '7', + Num8 = '8', + Num9 = '9', +}; + +#endif diff --git a/hprose_types.hpp b/hprose_types.hpp new file mode 100644 index 0000000..18a3f98 --- /dev/null +++ b/hprose_types.hpp @@ -0,0 +1,696 @@ +#ifndef _HPROSE_TYPES_HPP_ +#define _HPROSE_TYPES_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "hprose_ed.hpp" + +template +struct StdFuncType { using type = void; }; +template +struct StdFuncType { using type = std::function; }; +template +struct StdFuncType { using type = std::function; }; + +template +struct FunctionType { using type = typename StdFuncType::type; }; +template +struct FunctionType { using type = std::function; }; +template +struct FunctionType { using type = RET(CLASS::*)(ARGS...); }; +template +struct FunctionType { using type = RET(CLASS::*)(ARGS...) const; }; + +template +struct IndexOfTypeHelper { + static constexpr int32_t value = std::conditional_t::value, std::integral_constant, IndexOfTypeHelper>::value; +}; +template +struct IndexOfTypeHelper { + static constexpr int32_t value = std::conditional_t::value, std::integral_constant, std::integral_constant>::value; +}; +template +struct IndexOfType { static constexpr int32_t value = IndexOfTypeHelper<0, T, ARGS...>::value; }; +template +struct IndexOfType { static constexpr int32_t value = -1; }; + +class HproseError { +public: + HproseError() : is_null(true) {} + HproseError(const std::string& msg) : is_null(false), errmsg(msg) {} + HproseError(std::string&& msg) : is_null(false), errmsg(std::move(msg)) {} + bool IsNull() const { return is_null; } + const std::string& Message() const { return errmsg; } + operator bool () { return !is_null; } + +protected: + bool is_null; + std::string errmsg; +}; + +struct HproseValueHelperError { + HproseError FromReader(HproseReader& reader) { + reader.CheckTag(Tags::Null, true); + return HproseError(); + } + bool ToWriterWithResultTag(const HproseError& err, HproseWriter& writer) { + if(err.IsNull()) + writer.WriteTag(Tags::Result); + else + writer.WriteTag(Tags::Error); + ToWriter(err, writer); + return !err.IsNull(); + } + void ToWriter(const HproseError& err, HproseWriter& writer) { + if(err.IsNull()) + writer.WriteTag(Tags::Null); + else { + const std::string& errmsg = err.Message(); + writer.WriteTag(Tags::String); + writer.WriteLength(errmsg.length()); + writer.WriteTag(Tags::Quote); + writer.WriteString(errmsg); + writer.WriteTag(Tags::Quote); + } + } +}; + +struct HproseValueHelperNull { + bool ToWriterWithResultTag(HproseWriter& writer) { + writer.WriteTag(Tags::Result); + writer.WriteTag(Tags::Null); + return false; + } + void ToWriter(HproseWriter& writer) { + writer.WriteTag(Tags::Null); + } +}; + +struct HproseValueHelperInteger { + int64_t FromReader(HproseReader& r) { + auto tag = r.ReadTag(); + switch(tag) { + case Tags::Null: return 0; + case Tags::Empty: return 0; + case Tags::Num0: return 0; + case Tags::Num1: return 1; + case Tags::Num2: return 2; + case Tags::Num3: return 3; + case Tags::Num4: return 4; + case Tags::Num5: return 5; + case Tags::Num6: return 6; + case Tags::Num7: return 7; + case Tags::Num8: return 8; + case Tags::Num9: return 9; + case Tags::False: return 0; + case Tags::True: return 1; + case Tags::NaN: return 0; + case Tags::Infinity: { + r.ReadTag(); + return 0; + } + case Tags::Integer: + case Tags::Long: { // golang use big.int to decode Long + return r.ReadInt64Raw(Tags::Semicolon); + } + case Tags::Double: { + return int64_t(strtod(r.ReadUntilAsString(Tags::Semicolon).data(), nullptr)); + } + case Tags::UTF8Char: { + return strtoll(r.ReadUTF8String(1).data(), nullptr, 10); + } + case Tags::String: { + return strtoll(r.ReadUTF8String(r.ReadInt64Raw(Tags::Quote)).data(), nullptr, 10); + } + case Tags::Bytes: { + return strtoll(r.ReadBytesAsString(r.ReadInt64Raw(Tags::Quote)).data(), nullptr, 10); + } + default: { + r.SkipValue(tag); + return 0; + } + } + } + bool ToWriterWithResultTag(int64_t value, HproseWriter& writer) { + writer.WriteTag(Tags::Result); + ToWriter(value, writer); + return false; + } + void ToWriter(int64_t value, HproseWriter& writer) { + if(value >= 0 && value <= 9) { + writer.WriteByte(value + '0'); + return; + } + if(value >= -0xffffffff && value <= 0xffffffff) + writer.WriteTag(Tags::Integer); + else + writer.WriteTag(Tags::Long); + writer.WriteInteger(value); + writer.WriteTag(Tags::Semicolon); + } +}; + +struct HproseValueHelperBool { + bool FromReader(HproseReader& r) { + auto tag = r.ReadTag(); + switch(tag) { + case Tags::Null: + case Tags::Empty: + case Tags::Num0: return false; + case Tags::Num1: + case Tags::Num2: + case Tags::Num3: + case Tags::Num4: + case Tags::Num5: + case Tags::Num6: + case Tags::Num7: + case Tags::Num8: + case Tags::Num9: return true; + case Tags::False: return false; + case Tags::True: return true; + case Tags::NaN: return false; + case Tags::Infinity: { + r.ReadTag(); + return true; + } + case Tags::Integer: + case Tags::Long: { + return r.ReadInt64Raw(Tags::Semicolon) != 0; + } + case Tags::Double: { + return strtod(r.ReadUntilAsString(Tags::Semicolon).data(), nullptr) != 0.0; + } + case Tags::UTF8Char: { + return r.ReadUTF8String(1) != "0"; + } + case Tags::String: { + return CheckString(r.ReadUTF8String(r.ReadInt64Raw(Tags::Quote))); + } + case Tags::Bytes: { + return CheckString(r.ReadBytesAsString(r.ReadInt64Raw(Tags::Quote))); + } + default: { + r.SkipValue(tag); + return false; + } + } + } + bool CheckString(const std::string& str) { + return str != ""; + } + bool ToWriterWithResultTag(bool value, HproseWriter& writer) { + writer.WriteTag(Tags::Result); + ToWriter(value, writer); + return false; + } + void ToWriter(bool value, HproseWriter& writer) { + writer.WriteTag(value ? Tags::True : Tags::False); + } +}; + +struct HproseValueHelperDouble { + double FromReader(HproseReader& r) { + auto tag = r.ReadTag(); + switch(tag) { + case Tags::Null: return 0.0; + case Tags::Empty: return 0.0; + case Tags::Num0: return 0.0; + case Tags::Num1: return 1.0; + case Tags::Num2: return 2.0; + case Tags::Num3: return 3.0; + case Tags::Num4: return 4.0; + case Tags::Num5: return 5.0; + case Tags::Num6: return 6.0; + case Tags::Num7: return 7.0; + case Tags::Num8: return 8.0; + case Tags::Num9: return 9.0; + case Tags::False: return 0.0; + case Tags::True: return 1.0; + case Tags::NaN: return std::numeric_limits::quiet_NaN(); + case Tags::Infinity: { + auto t = r.ReadTag(); + return (t == Tags::Pos) ? std::numeric_limits::infinity() : -std::numeric_limits::infinity(); + } + case Tags::Integer: + case Tags::Long: { + return double(r.ReadInt64Raw(Tags::Semicolon)); + } + case Tags::Double: { + return strtod(r.ReadUntilAsString(Tags::Semicolon).data(), nullptr); + } + case Tags::UTF8Char: { + return strtod(r.ReadUTF8String(1).data(), nullptr); + } + case Tags::String: { + return strtod(r.ReadUTF8String(r.ReadInt64Raw(Tags::Quote)).data(), nullptr); + } + case Tags::Bytes: { + return strtod(r.ReadBytesAsString(r.ReadInt64Raw(Tags::Quote)).data(), nullptr); + } + default: { + r.SkipValue(tag); + return 0.0; + } + } + } + bool ToWriterWithResultTag(double value, HproseWriter& writer) { + writer.WriteTag(Tags::Result); + ToWriter(value, writer); + return false; + } + void ToWriter(double value, HproseWriter& writer) { + if(std::isnan(value)) { + writer.WriteTag(Tags::NaN); + return; + } + if(std::isinf(value)) { + writer.WriteTag(Tags::Infinity); + writer.WriteTag(value > 0 ? Tags::Pos : Tags::Neg); + return; + } + writer.WriteTag(Tags::Double); + writer.WriteDouble(value); + writer.WriteTag(Tags::Semicolon); + } +}; + +struct HproseValueHelperString { + std::string FromReader(HproseReader& r) { + auto tag = r.ReadTag(); + switch(tag) { + case Tags::Null: return ""; + case Tags::Empty: return ""; + case Tags::Num0: return "0"; + case Tags::Num1: return "1"; + case Tags::Num2: return "2"; + case Tags::Num3: return "3"; + case Tags::Num4: return "4"; + case Tags::Num5: return "5"; + case Tags::Num6: return "6"; + case Tags::Num7: return "7"; + case Tags::Num8: return "8"; + case Tags::Num9: return "9"; + case Tags::False: return "false"; + case Tags::True: return "true"; + case Tags::NaN: return "nan"; + case Tags::Infinity: { + auto t = r.ReadTag(); + return (t == Tags::Pos) ? "+inf" : "-inf"; + } + case Tags::Integer: + case Tags::Long: + case Tags::Double: { + return r.ReadUntilAsString(Tags::Semicolon); + } + case Tags::UTF8Char: { + return r.ReadUTF8String(1); + } + case Tags::String: { + return r.ReadUTF8String(r.ReadInt64Raw(Tags::Quote)); + } + case Tags::Bytes: { + return r.ReadBytesAsString(r.ReadInt64Raw(Tags::Quote)); + } + default: { + r.SkipValue(tag); + return ""; + } + } + } + bool ToWriterWithResultTag(const std::string& value, HproseWriter& writer) { + writer.WriteTag(Tags::Result); + ToWriter(value, writer); + return false; + } + void ToWriter(const std::string& value, HproseWriter& writer) { + if(value.empty()) { + writer.WriteTag(Tags::Empty); + return; + } + if(value.length() == 1) { + writer.WriteTag(Tags::UTF8Char); + writer.WriteByte(value[0]); + return; + } + writer.WriteTag(Tags::String); + writer.WriteLength(value.length()); + writer.WriteTag(Tags::Quote); + writer.WriteString(value); + writer.WriteTag(Tags::Quote); + } + + std::string FromReaderRaw(HproseReader& r) { + return r.ReadUTF8String(r.ReadInt64Raw(Tags::Quote)); + } + + void ToWriterRaw(const std::string& value, HproseWriter& writer, bool with_tag) { + if(with_tag) + writer.WriteTag(Tags::String); + writer.WriteLength(value.length()); + writer.WriteTag(Tags::Quote); + writer.WriteString(value); + writer.WriteTag(Tags::Quote); + } + +}; + +template +struct HproseValueHelperStruct { + T FromReader(HproseReader& reader) { + T val; + auto tag = reader.ReadTag(); + switch(tag) { + case Tags::Class: { + HproseValueHelperString().FromReaderRaw(reader); + int32_t count = reader.ReadInt64Raw(Tags::Openbrace); + std::vector fields; + for(int32_t i = 0; i < count; ++i) + fields.emplace_back(HproseValueHelperString().FromReader(reader)); + reader.PushFieldRefs(std::move(fields)); + reader.CheckTag(Tags::Closebrace, true); + reader.CheckTag(Tags::Object, true); + } + case Tags::Object: { + int32_t index = reader.ReadInt64Raw(Tags::Openbrace); + auto& fields = reader.GetFieldRefs(index); + auto& refmap = val.GetFieldMaps(); + for(size_t i = 0; i < fields.size(); ++i) + refmap.ReadField((uintptr_t)&val, fields[i], reader); + reader.CheckTag(Tags::Closebrace, true); + return val; + } + default: { + reader.SkipValue(tag); + return val; + } + } + } + bool ToWriterWithResultTag(const T& val, HproseWriter& writer) { + writer.WriteTag(Tags::Result); + val.GetFieldMaps().ToWriter((uintptr_t)&val, writer); + return false; + } + void ToWriter(const T& val, HproseWriter& writer) { + val.GetFieldMaps().ToWriter((uintptr_t)&val, writer); + } +}; + +template struct HproseValueHelperImpl { using type = HproseValueHelperStruct; }; +template<> struct HproseValueHelperImpl { using type = HproseValueHelperInteger; }; +template<> struct HproseValueHelperImpl { using type = HproseValueHelperError; }; +template<> struct HproseValueHelperImpl { using type = HproseValueHelperBool; }; +template<> struct HproseValueHelperImpl { using type = HproseValueHelperDouble; }; +template<> struct HproseValueHelperImpl { using type = HproseValueHelperDouble; }; +template<> struct HproseValueHelperImpl { using type = HproseValueHelperString; }; +template<> struct HproseValueHelperImpl { using type = HproseValueHelperString; }; +template<> struct HproseValueHelperImpl { using type = HproseValueHelperString; }; + +template +using HproseValueHelper = typename HproseValueHelperImpl>>::type; + +template +struct HproseValueHelperSlice { + std::vector FromReader(HproseReader& reader) { + std::vector ret; + auto tag = reader.ReadTag(); + switch(tag) { + case Tags::List: { + HproseValueHelper vh; + int64_t count = reader.ReadInt64Raw(Tags::Openbrace); + for(int64_t i = 0; i < count; ++i) + ret.push_back(vh.FromReader(reader)); + reader.CheckTag(Tags::Closebrace, true); + return ret; + } + default: { + reader.SkipValue(tag); + return ret; + } + } + } + bool ToWriterWithResultTag(const std::vector& value, HproseWriter& writer) { + writer.WriteTag(Tags::Result); + ToWriter(value, writer); + return false; + } + void ToWriter(const std::vector& values, HproseWriter& writer) { + HproseValueHelper vh; + writer.WriteTag(Tags::List); + writer.WriteLength(values.size()); + writer.WriteTag(Tags::Openbrace); + for(auto& v : values) + vh.ToWriter(v, writer); + writer.WriteTag(Tags::Closebrace); + } +}; + +template +struct HproseValueHelperMap { + MAP FromReader(HproseReader& reader) { + MAP ret; + auto tag = reader.ReadTag(); + switch(tag) { + case Tags::Map: { + HproseValueHelper kvh; + HproseValueHelper vvh; + int64_t count = reader.ReadInt64Raw(Tags::Openbrace); + for(int64_t i = 0; i < count; ++i) { + auto k = kvh.FromReader(reader); + auto v = vvh.FromReader(reader); + ret[k] = v; + } + reader.CheckTag(Tags::Closebrace, true); + return ret; + } + default: { + reader.SkipValue(tag); + return ret; + } + } + } + bool ToWriterWithResultTag(const MAP& value, HproseWriter& writer) { + writer.WriteTag(Tags::Result); + ToWriter(value, writer); + return false; + } + void ToWriter(const MAP& value, HproseWriter& writer) { + HproseValueHelper kvh; + HproseValueHelper vvh; + writer.WriteTag(Tags::Map); + writer.WriteLength(value.size()); + writer.WriteTag(Tags::Openbrace); + for(auto& iter : value) { + kvh.ToWriter(iter.first, writer); + vvh.ToWriter(iter.second, writer); + } + writer.WriteTag(Tags::Closebrace); + } +}; + +template +struct ErrorChecker { + template + bool IsError(T& val) { + return !std::get(val).IsNull(); + } + template + void WriteError(T& val, HproseWriter& writer) { + HproseValueHelperError().ToWriterWithResultTag(std::get(val), writer); + } +}; + +template<> +struct ErrorChecker<-1> { + template + bool IsError(T& val) { return false; } + template + void WriteError(T& val, HproseWriter& writer) {} +}; + +template +struct HproseValueHelperTuple { + using helper_type = std::tuple...>; + + std::tuple FromReader(HproseReader& reader) { + std::tuple ret; + auto tag = reader.ReadTag(); + switch(tag) { + case Tags::List: { + helper_type vhs; + int64_t count = reader.ReadInt64Raw(Tags::Openbrace); + FromReaderImpl(ret, vhs, reader, count, std::make_index_sequence{}); + for(int32_t i = sizeof...(ARGS); i < count; ++i) + reader.SkipValue(); + reader.CheckTag(Tags::Closebrace, true); + return ret; + } + default: { + reader.SkipValue(tag); + return ret; + } + } + } + void FromReaderImpl(std::tuple& value, helper_type& vhs, HproseReader& reader, int64_t count, std::index_sequence<>) {} + + template + void FromReaderImpl(std::tuple& value, helper_type& vhs, HproseReader& reader, int64_t count, std::index_sequence) { + if(I < (size_t)count) { + std::get(value) = std::get(vhs).FromReader(reader); + FromReaderImpl(value, vhs, reader, count, std::index_sequence{}); + } + } + + bool ToWriterWithResultTag(const std::tuple& value, HproseWriter& writer) { + ErrorChecker::value> checker; + bool has_error = checker.IsError(value); + if(has_error) { + checker.WriteError(value, writer); + } else { + writer.WriteTag(Tags::Result); + ToWriter(value, writer); + } + return has_error; + } + + void ToWriter(const std::tuple& value, HproseWriter& writer) { + helper_type vhs; + writer.WriteTag(Tags::List); + writer.WriteLength(sizeof...(ARGS)); + writer.WriteTag(Tags::Openbrace); + ToWriterImpl(value, vhs, writer, std::make_index_sequence{}); + writer.WriteTag(Tags::Closebrace); + } + + void ToWriterImpl(const std::tuple& value, helper_type& vhs, HproseWriter& writer, std::index_sequence<>) {} + + template + void ToWriterImpl(const std::tuple& value, helper_type& vhs, HproseWriter& writer, std::index_sequence) { + std::get(vhs).ToWriter(std::get(value), writer); + ToWriterImpl(value, vhs, writer, std::index_sequence{}); + } +}; + +template +struct HproseValueHelperImpl> { using type = HproseValueHelperSlice; }; +template +struct HproseValueHelperImpl> { using type = HproseValueHelperMap>; }; +template +struct HproseValueHelperImpl> { using type = HproseValueHelperMap>; }; +template +struct HproseValueHelperImpl> { using type = HproseValueHelperTuple; }; + +struct HproseReflector { + virtual void FromReader(uintptr_t addr, HproseReader& r) = 0; + virtual void ToWriter(uintptr_t addr, HproseWriter& w) = 0; + virtual const std::string& GetName() = 0; + virtual ~HproseReflector() {} +}; + +template +struct HproseReflectType : public HproseReflector { + HproseReflectType(uintptr_t off, const std::string& n) : offset(off), name(n) {} + virtual void FromReader(uintptr_t addr, HproseReader& r) { + T* ptr = (T*)(addr + offset); + *ptr = HproseValueHelper().FromReader(r); + } + virtual void ToWriter(uintptr_t addr, HproseWriter& w) { + T* ptr = (T*)(addr + offset); + HproseValueHelper().ToWriter(*ptr, w); + } + virtual const std::string& GetName() { + return name; + } + uintptr_t offset; + std::string name; +}; + +struct HproseReflectMap { + bool IsInited() { return inited; } + + void Init(const std::string& name) { + struct_name = name; + inited = true; + } + + template + void RegisterField(T* t, const std::string& fname) { + auto reflector = new HproseReflectType((uintptr_t)t, fname); + fields[fname] = std::unique_ptr(reflector); + } + + void ReadField(uintptr_t struct_ptr, const std::string& name, HproseReader& reader) { + auto iter = fields.find(name); + if(iter == fields.end()) { + reader.SkipValue(); + return; + } + iter->second->FromReader(struct_ptr, reader); + } + + void ToWriter(uintptr_t struct_ptr, HproseWriter& writer) const { + auto idx = writer.GetStructFieldsIndex(struct_name); + if(idx == -1) { + idx = writer.PushFieldRefs(struct_name); + WriteFields(writer); + } + writer.WriteTag(Tags::Object); + HproseValueHelperInteger().ToWriter(idx, writer); + writer.WriteTag(Tags::Openbrace); + for(auto& iter : fields) + iter.second->ToWriter(struct_ptr, writer); + writer.WriteTag(Tags::Closebrace); + } + + void WriteFields(HproseWriter& writer) const { + writer.WriteTag(Tags::Class); + HproseValueHelperString().ToWriterRaw(struct_name, writer, false); + HproseValueHelperInteger().ToWriter(fields.size(), writer); + writer.WriteTag(Tags::Openbrace); + for(auto& iter : fields) + HproseValueHelperString().ToWriterRaw(iter.first, writer, true); + writer.WriteTag(Tags::Closebrace); + } + + bool inited = false; + std::string struct_name; + std::map> fields; +}; + +#define FE_1(FUN, FIELD) FUN(FIELD) +#define FE_2(FUN, FIELD, ...) FUN(FIELD)FE_1(FUN, __VA_ARGS__) +#define FE_3(FUN, FIELD, ...) FUN(FIELD)FE_2(FUN, __VA_ARGS__) +#define FE_4(FUN, FIELD, ...) FUN(FIELD)FE_3(FUN, __VA_ARGS__) +#define FE_5(FUN, FIELD, ...) FUN(FIELD)FE_4(FUN, __VA_ARGS__) +#define FE_6(FUN, FIELD, ...) FUN(FIELD)FE_5(FUN, __VA_ARGS__) +#define FE_7(FUN, FIELD, ...) FUN(FIELD)FE_6(FUN, __VA_ARGS__) +#define FE_8(FUN, FIELD, ...) FUN(FIELD)FE_7(FUN, __VA_ARGS__) + +#define GET_MACRO(_1,_2,_3,_4,_5,_6,_7,_8,NAME,...) NAME +#define FOR_EACH(action,...) GET_MACRO(__VA_ARGS__,FE_8,FE_7,FE_6,FE_5,FE_4,FE_3,FE_2,FE_1)(action,__VA_ARGS__) + +#define PUSH_FIELD(FIELD) ref_map.RegisterField(&ptr->FIELD, #FIELD); + +#define HPROSE_REFLECT(struct_type, ...) \ + public:\ + static HproseReflectMap& GetFieldMaps() {\ + static HproseReflectMap ref_map;\ + if(ref_map.IsInited())\ + return ref_map;\ + static std::atomic inited(false);\ + bool expected = false;\ + if(inited.compare_exchange_strong(expected, true)) {\ + struct_type* ptr = nullptr;\ + FOR_EACH(PUSH_FIELD, __VA_ARGS__)\ + ref_map.Init(#struct_type);\ + } else {\ + while(!ref_map.IsInited());\ + }\ + return ref_map;\ + } + +#endif diff --git a/main.cpp b/main.cpp new file mode 100644 index 0000000..54b0b5a --- /dev/null +++ b/main.cpp @@ -0,0 +1,116 @@ +#include +#include + +#include +#include "timer.hpp" +#include "tcp.hpp" + +#include "hprose_types.hpp" +#include "hprose_srv.hpp" +#include "hprose_cli.hpp" +#include "hprose_conn.hpp" +#include "worker.hpp" + +struct PushData { + int result; + int uid; + std::vector stars; + HPROSE_REFLECT(PushData, result, uid, stars) +}; + +struct HandlerObject { + std::string Hello(std::string& str) { + return std::string("Hello ").append(str).append("!"); + } + + std::tuple Test(const std::tuple& a, int b) { + std::cout << "Test called." << std::endl; + return std::make_tuple(std::get<0>(a) * b, std::get<1>(a) * b, HproseError("Error!!!")); + } + + std::tuple Swap(int& a, int& b) { + a += 100; + b += 123; + return std::make_tuple(a, b, HproseError()); + } + + std::map Maptest(const std::unordered_map& m) { + std::map ret; + for(auto& iter : m) { + ret[iter.first] = iter.second + 100; + } + return ret; + } + + PushData Pushs(PushData& data) { + return data; + } + +}; + +int main() { + std::thread t1([]() { + TimerdWorker worker; + HproseServerConnectorTCP connector(&worker, 12345); + HproseServer srv; + auto obj = std::make_shared(); + srv.SetObjectAllocator([&obj]() -> auto { + return obj; + }); + srv.UseConnector(connector); + srv.Register("hello", &HandlerObject::Hello); + srv.Register("test", &HandlerObject::Test); + srv.Register("swap", &HandlerObject::Swap); + srv.Register("maptest", &HandlerObject::Maptest); + srv.Register("Pushs", &HandlerObject::Pushs); + asio::signal_set signals(worker, SIGINT, SIGTERM); + signals.async_wait([&worker](const asio::error_code& error, int signal_number) { + std::cout << "Interrupted." << std::endl; + worker.stop(); + }); + worker.run(); + }); + // std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // std::thread t2([]() { + // TimerdWorker worker; + // BufferMgr bufmgr; + // HproseClientConnectorTCP connector(&worker, &bufmgr, "127.0.0.1", 12345); + // HproseClient client; + // client.UseConnector(connector); + // client.MultiCall("hello").PushArgs("world").Expect([](std::string& ret) { + // std::cout << ret << std::endl; + // }).Catch([](const std::string& msg) { + // std::cout << msg << std::endl; + // }); + // client.MultiCall("test").PushArgs(std::make_tuple(123, 678), 10).Expect([](int a, int b, HproseError err) { + // std::cout << a << " " << b << " " << err.IsNull() << std::endl; + // }).Catch([](const std::string& msg) { + // std::cout << msg << std::endl; + // }); + // int a = 100; + // int b = 200; + // client.MultiCall("swap").PushRefArgs(a, b).Expect([&a, &b]() { + // std::cout << a << b << std::endl; + // }).Catch([](const std::string& msg) { + // std::cout << msg << std::endl; + // }); + // std::map mp{{1, 100}, {2, 400}, {3, 900}}; + // client.MultiCall("maptest").PushArgs(mp).Expect([](const std::map& ret) { + // for(auto& iter : ret) { + // std::cout << "===" << iter.first << iter.second << std::endl; + // } + // }).Catch([](const std::string& msg) { + // std::cout << msg << std::endl; + // }); + // client.MultiApply(); + // client.Call("sttest").PushArgs(testst{1, "hello", 3.0, innst{1, 2}}).Expect([](testst ret) { + // std::cout << ret.a << ret.b << ret.c << std::endl; + // }).Catch([](const std::string& msg) { + // std::cout << msg << std::endl; + // }); + // worker.run(); + // }); + t1.join(); + // t2.join(); + return 0; +} diff --git a/makefile b/makefile new file mode 100644 index 0000000..ca1cc46 --- /dev/null +++ b/makefile @@ -0,0 +1,28 @@ + +INCLUDE_PATH=./include +LIBRARY_PATH=./lib +LIBS=pthread +CFLAGS=-Wall +CXXFLAGS=--std=c++14 -Wall -O2 -g + +cobjs = temp/bit.o +cppobjs = temp/main.o + +.PHONY: all prep clean +all: prep main + +prep: + @mkdir -p temp + +main: $(cppobjs) + $(CXX) $(cppobjs) -L$(LIBRARY_PATH) -l$(LIBS) -o main + +temp/bit.o: bit.c + $(CC) $(CFLAGS) $< -I$(INCLUDE_PATH) -c -o $@ + +temp/main.o: main.cpp hprose_tags.hpp hprose_srv.hpp hprose_cli.hpp hprose_conn.hpp hprose_ed.hpp hprose_types.hpp tcp.hpp udp.hpp worker.hpp timer.hpp buffers.hpp objpool.hpp + $(CXX) $(CXXFLAGS) $< -I$(INCLUDE_PATH) -c -o $@ + +clean: + @rm -f main + @rm -rf temp diff --git a/objpool.hpp b/objpool.hpp new file mode 100644 index 0000000..a4dcf5d --- /dev/null +++ b/objpool.hpp @@ -0,0 +1,75 @@ +#ifndef _OBJECT_POOL_HPP_ +#define _OBJECT_POOL_HPP_ + +#include +#include + +template +class ObjectPool { +protected: + struct ObjectWrapper { + OBJECT real_obj; + ObjectWrapper* next; + }; +public: + ~ObjectPool() { + for(auto& block : mem_blocks) { + delete[] block; + } + } + + template + OBJECT* Alloc(ARGS... args) { + std::lock_guard locker(mutex); + if(free_blocks == nullptr) { + auto mem_block = new uint8_t[sizeof(ObjectWrapper) * alloc_size]; + mem_blocks.push_back(mem_block); + ObjectWrapper* first = reinterpret_cast(mem_block); + free_blocks = first + 1; + free_blocks->next = reinterpret_cast(alloc_size - 2); + alloc_size *= 2; + if(alloc_size > 0x10000) + alloc_size = 0x10000; + return new(&first->real_obj) OBJECT(std::forward(args)...); + } else { + OBJECT* object = nullptr; + uintptr_t left_count = reinterpret_cast(free_blocks->next); + if(left_count <= 0x10000 && left_count > 0) { + object = &free_blocks->real_obj; + free_blocks++; + free_blocks->next = reinterpret_cast(left_count - 1); + } else { + object = &free_blocks->real_obj; + if(left_count < 1) + free_blocks = nullptr; + else + free_blocks = free_blocks->next; + } + return new(object) OBJECT(std::forward(args)...); + } + } + + void Recycle(OBJECT* object) { + std::lock_guard locker(mutex); + object->~OBJECT(); + ObjectWrapper* wrapper = reinterpret_cast(object); + wrapper->next = free_blocks; + free_blocks = wrapper; + } + +protected: + uint32_t alloc_size = INIT_ALLOC_SIZE; + ObjectWrapper* free_blocks = nullptr; + std::list mem_blocks; + MUTEX_TYPE mutex; +}; + +struct LockLess { + void lock() {} + void unlock() {} +}; + +template +using ObjectPoolNoLock = ObjectPool; + +#endif diff --git a/tcp.hpp b/tcp.hpp new file mode 100644 index 0000000..54bcfb1 --- /dev/null +++ b/tcp.hpp @@ -0,0 +1,143 @@ +#ifndef _TCP_SERVER_HPP_ +#define _TCP_SERVER_HPP_ + +#ifndef ASIO_STANDALONE +#define ASIO_STANDALONE +#endif +#include + +#include + +#include "buffers.hpp" + +using asio::ip::tcp; + +template +using tcp_cb_connected = std::function(tcp::socket&)>; +template +using tcp_cb_received = std::function; +template +using tcp_cb_disconnected = std::function; + +template +class TCPSession : public std::enable_shared_from_this> { +public: + TCPSession(tcp::socket sock) : socket(std::move(sock)) { + static_assert(std::is_base_of, SessionType>::value , "SessionType should derive from TCPSession"); + } + + void ReadData() { + socket.async_read_some(asio::buffer(read_buffer, 1536), [this, self = this->shared_from_this()](std::error_code ec, std::size_t length) { + if (!ec) { + if(received) + received(static_cast(this), read_buffer, length); + ReadData(); + } else { + if(disconnected) + disconnected(static_cast(this)); + } + }); + } + + void SendData(StaticBuffer&& buffer, size_t length) { + auto abuf = asio::buffer(buffer.Data(), length); + socket.async_write_some(abuf, [buf = std::move(buffer)](std::error_code ec, size_t length) {}); + } + + void Close() { + socket.shutdown(tcp::socket::shutdown_both); + socket.close(); + } + + tcp::socket& UnderlyingSocket() { return socket; } + + SessionType& SetReceivedCallback(const tcp_cb_received& cb) { received = cb; return static_cast(*this);} + SessionType& SetDisconnectedCallback(const tcp_cb_disconnected& cb) { disconnected = cb; return static_cast(*this); } + +private: + tcp::socket socket; + tcp_cb_received received; + tcp_cb_disconnected disconnected; + unsigned char read_buffer[1536]; +}; + +template +class TCPServer : public std::enable_shared_from_this> { +public: + TCPServer(asio::io_context& ctx, short port) : acceptor(ctx, tcp::endpoint(tcp::v4(), port)) {} + + void Start() { + if(started) + return; + started = true; + _Accept(); + } + + void Stop() { + if(!started) + return; + started = false; + acceptor.close(); + } + + TCPServer& SetConnectedCallback(const tcp_cb_connected& cb) { connected = cb; return *this; } + +protected: + void _Accept() { + acceptor.async_accept([this, self = this->shared_from_this()](std::error_code ec, tcp::socket socket) { + if (!ec) { + if(connected) { + auto session = connected(socket); + if(session != nullptr) + session->ReadData(); + } + } + if(started) + _Accept(); + }); + } + + bool started = false; + tcp_cb_connected connected; + tcp::acceptor acceptor; +}; + +using connect_fail_cb = std::function; + +template +class TCPClient : public std::enable_shared_from_this> { +public: + TCPClient(asio::io_context& ctx) : socket(ctx) {} + void Connect(const char* ipaddr, short port, uint32_t timeout_time, connect_fail_cb cb = nullptr) { + tcp::endpoint endpoint(asio::ip::make_address_v4(ipaddr), port); + auto timeout = std::make_unique(socket.get_io_context(), asio::chrono::milliseconds(timeout_time)); + timeout->async_wait([this, cb](std::error_code ec) { + if(!ec) { + if(cb) + cb(ec.message()); + socket.cancel(); + } + }); + socket.async_connect(endpoint, [this, cb, timeout = std::move(timeout), self = this->shared_from_this()](std::error_code ec) { + if (!ec) { + if(connected) { + auto session = connected(socket); + if(session != nullptr) + session->ReadData(); + } + } else { + if(cb) + cb(ec.message()); + } + timeout->cancel(); + }); + } + + TCPClient& SetConnectedCallback(tcp_cb_connected cb) { connected = cb; return *this; } + +protected: + tcp::socket socket; + tcp_cb_connected connected; +}; + +#endif diff --git a/timer.hpp b/timer.hpp new file mode 100644 index 0000000..eb75c39 --- /dev/null +++ b/timer.hpp @@ -0,0 +1,46 @@ +#ifndef _TIMER_HPP_ +#define _TIMER_HPP_ + +#include + +#ifndef ASIO_STANDALONE +#define ASIO_STANDALONE +#endif +#include + +using timer_callback = std::function; + +class Timer : public std::enable_shared_from_this { +public: + Timer(asio::io_context& ctx, const timer_callback& cb, uint32_t wait, uint32_t interval) : + callback(cb), + interval(interval), + timer(ctx, asio::chrono::milliseconds(wait)) {} + + void Begin() { + timer.async_wait([this, self = shared_from_this()](const asio::error_code& ec) { + if(ec) + return; + if(callback) + callback(); + if(interval > 0) + std::make_shared(timer.get_io_context(), callback, interval, interval)->Begin(); + }); + } + void Cancel() { + timer.cancel(); + } + +protected: + timer_callback callback; + uint32_t interval; + asio::steady_timer timer; +}; + + +class TimerService { +public: + virtual std::weak_ptr AddTimer(const timer_callback& cb, uint32_t wait, uint32_t interval) = 0; +}; + +#endif diff --git a/udp.hpp b/udp.hpp new file mode 100644 index 0000000..16b26e1 --- /dev/null +++ b/udp.hpp @@ -0,0 +1,73 @@ +#ifndef _UDP_HPP_ +#define _UDP_HPP_ + +#ifndef ASIO_STANDALONE +#define ASIO_STANDALONE +#endif +#include + +#include + +#include "buffers.hpp" + +using asio::ip::udp; + +using udp_receiver = std::function; +using udp_sender = std::function; +using udp_cb_received = std::function; + +class UDPPeer : public std::enable_shared_from_this { +public: + UDPPeer(asio::io_context& ctx, short port) : socket(ctx, udp::endpoint(udp::v4(), port)) {} + UDPPeer(asio::io_context& ctx) : socket(ctx, udp::v4()) {} + + void StartReceive() { + if(started) + return; + started = true; + _ReceiveFrom(); + } + + void StopReceive() { + if(!started) + return; + started = false; + socket.cancel(); + } + + void SendTo(const char* addr, uint16_t port, StaticBuffer& buf, size_t length) { + udp::endpoint remote_peer(asio::ip::make_address_v4(addr), port); + auto abuf = asio::buffer(buf.Data(), length); + socket.async_send_to(abuf, remote_peer, [obuf = std::move(buf)](std::error_code ec, size_t){}); + } + + UDPPeer& SetReceivedCallback(udp_cb_received cb) { callback = cb; return *this; } + +protected: + void _ReceiveFrom() { + socket.async_wait(udp::socket::wait_read, [this, self = this->shared_from_this()](std::error_code ec){ + if(!ec) { + size_t reserve_size = socket.available(); + if(callback) { + auto receiver = [this](StaticBuffer& buf, size_t length) -> size_t { + return socket.receive_from(asio::buffer(buf.Data(), length), remote); + }; + auto sender = [this](StaticBuffer& buf, size_t length) { + auto abuf = asio::buffer(buf.Data(), length); + socket.async_send_to(abuf, remote, [obuf = std::move(buf)](std::error_code ec, size_t){}); + }; + callback(receiver, sender, reserve_size); + } + } + if(started) + _ReceiveFrom(); + }); + } + + bool started = false; + udp_cb_received callback; + udp::socket socket; + udp::endpoint remote; +}; + +#endif diff --git a/worker.hpp b/worker.hpp new file mode 100644 index 0000000..cd5353f --- /dev/null +++ b/worker.hpp @@ -0,0 +1,24 @@ +#ifndef _WORKER_HPP_ +#define _WORKER_HPP_ + +#ifndef ASIO_STANDALONE +#define ASIO_STANDALONE +#endif +#include + +#include "timer.hpp" + +class Worker : public asio::io_context { + +}; + +class TimerdWorker : public Worker , public TimerService { +public: + std::weak_ptr AddTimer(const timer_callback& cb, uint32_t wait, uint32_t interval) { + auto timer = std::make_shared(*this, cb, wait, interval); + timer->Begin(); + return std::weak_ptr(timer); + } +}; + +#endif