Files
my-chat_-server/transmite/source/transmite_server.hpp
2025-10-13 18:34:48 +08:00

235 lines
10 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Message transmit (forwarding) sub-service implementation
#include <brpc/server.h>
#include <butil/logging.h>
#include "etcd.hpp" // 服务注册模块封装
#include "logger.hpp" // 日志模块封装
#include "rabbitmq.hpp"
#include "channel.hpp"
#include "utils.hpp"
#include "mysql_chat_session_member.hpp"
#include "base.pb.h" // protobuf框架代码
#include "user.pb.h" // protobuf框架代码
#include "transmite.pb.h" // protobuf框架代码
namespace bite_im{
// RPC implementation of MsgTransmitService.
//
// For every incoming NewMessageReq it:
//   1. asks the user sub-service for the sender's UserInfo,
//   2. assembles a MessageInfo (new message id, session id, timestamp,
//      sender, content),
//   3. looks up the members of the chat session,
//   4. publishes the serialized message to the message queue,
//   5. returns the message together with the member id list.
class TransmiteServiceImpl : public bite_im::MsgTransmitService {
public:
    // user_service_name: name used to pick a channel to the user sub-service.
    // channels:          channel manager queried through choose().
    // mysql_client:      database handle used to build the session-member table accessor.
    // exchange_name / routing_key: passed to MQClient::publish for each message.
    // mq_client:         message queue client used for publishing.
    TransmiteServiceImpl(const std::string &user_service_name,
                         const ServiceManager::ptr &channels,
                         const std::shared_ptr<odb::core::database> &mysql_client,
                         const std::string &exchange_name,
                         const std::string &routing_key,
                         const MQClient::ptr &mq_client):
        _user_service_name(user_service_name),
        _mm_channels(channels),
        _mysql_session_member_table(std::make_shared<ChatSessionMemeberTable>(mysql_client)),
        _exchange_name(exchange_name),
        _routing_key(routing_key),
        _mq_client(mq_client){}
    ~TransmiteServiceImpl(){}

    // Builds the message to be forwarded and reports who should receive it.
    // On any failure the response carries success=false and an error message;
    // on success it carries the assembled message and the target id list.
    void GetTransmitTarget(google::protobuf::RpcController* controller,
                           const ::bite_im::NewMessageReq* request,
                           ::bite_im::GetTransmitTargetRsp* response,
                           ::google::protobuf::Closure* done) override {
        // Runs `done` when this function returns, on every path.
        brpc::ClosureGuard rpc_guard(done);
        // Fills the response for a failed request. Only `response` is needed,
        // so nothing else is captured.
        auto err_response = [response](const std::string &rid,
                                       const std::string &errmsg) -> void {
            response->set_request_id(rid);
            response->set_success(false);
            response->set_errmsg(errmsg);
        };
        // Key fields of the request: request id, sender id, chat session id, content.
        // `request` outlives this call, so references are sufficient.
        const std::string &rid = request->request_id();
        const std::string &uid = request->user_id();
        const std::string &chat_ssid = request->chat_session_id();
        const MessageContent &content = request->message();

        // Fetch the sender's user info from the user sub-service.
        auto channel = _mm_channels->choose(_user_service_name);
        if (!channel) {
            LOG_ERROR("{}-{} 没有可供访问的用户子服务节点!", rid, _user_service_name);
            return err_response(rid, "没有可供访问的用户子服务节点!");
        }
        UserService_Stub stub(channel.get());
        GetUserInfoReq req;
        GetUserInfoRsp rsp;
        req.set_request_id(rid);
        req.set_user_id(uid);
        brpc::Controller cntl;
        // A null closure makes this a synchronous call.
        stub.GetUserInfo(&cntl, &req, &rsp, nullptr);
        if (cntl.Failed()) {
            // Transport-level failure: the controller holds the reason.
            LOG_ERROR("{} - 用户子服务调用失败:{}", rid, cntl.ErrorText());
            return err_response(rid, "用户子服务调用失败!");
        }
        if (!rsp.success()) {
            // The RPC itself went through but the user service rejected the
            // request; cntl.ErrorText() has no transport error in this case.
            LOG_ERROR("{} - 用户子服务返回失败响应,用户ID:{}", rid, uid);
            return err_response(rid, "用户子服务调用失败!");
        }

        // Assemble the message: id, owning session, creation time, sender, content.
        MessageInfo message;
        message.set_message_id(uuid());
        message.set_chat_session_id(chat_ssid);
        message.set_timestamp(time(nullptr));
        message.mutable_sender()->CopyFrom(rsp.user_info());
        message.mutable_message()->CopyFrom(content);

        // Members of the chat session are the forwarding targets.
        auto target_list = _mysql_session_member_table->members(chat_ssid);

        // Publish the message to the queue so the storage sub-service can persist it.
        bool ret = _mq_client->publish(_exchange_name, message.SerializeAsString(), _routing_key);
        if (ret == false) {
            // Log the MQ destination; the brpc controller above is unrelated
            // to this failure and must not be reported here.
            LOG_ERROR("{} - 持久化消息发布失败:exchange={} routing_key={}",
                      rid, _exchange_name, _routing_key);
            return err_response(rid, "持久化消息发布失败:!");
        }

        // Build the success response.
        response->set_request_id(rid);
        response->set_success(true);
        response->mutable_message()->CopyFrom(message);
        for (const auto &id : target_list) {
            response->add_target_id_list(id);
        }
    }
private:
    // User sub-service access: service name and channel manager.
    std::string _user_service_name;
    ServiceManager::ptr _mm_channels;
    // Accessor for the chat-session member table.
    ChatSessionMemeberTable::ptr _mysql_session_member_table;
    // Message queue publishing parameters and client handle.
    std::string _exchange_name;
    std::string _routing_key;
    MQClient::ptr _mq_client;
};
// Holds the already-constructed components of the transmit server (database
// client, discovery client, registry client, RPC server) and runs the RPC
// server until it is asked to quit.
class TransmiteServer {
public:
    using ptr = std::shared_ptr<TransmiteServer>;

    // All handles are shared with the caller; this object only keeps a
    // reference to each of them for as long as it lives.
    // discovery_client is taken by const reference like its siblings so no
    // extra shared_ptr copy is made at the call boundary.
    TransmiteServer(
        const std::shared_ptr<odb::core::database> &mysql_client,
        const Discovery::ptr &discovery_client,
        const Registry::ptr &registry_client,
        const std::shared_ptr<brpc::Server> &server):
        _service_discoverer(discovery_client),
        _registry_client(registry_client),
        _mysql_client(mysql_client),
        _rpc_server(server){}
    ~TransmiteServer(){}

    // Blocks the calling thread in brpc::Server::RunUntilAskedToQuit().
    void start() {
        _rpc_server->RunUntilAskedToQuit();
    }
private:
    Discovery::ptr _service_discoverer;                  // service discovery client
    Registry::ptr _registry_client;                      // service registration client
    std::shared_ptr<odb::core::database> _mysql_client;  // MySQL database client
    std::shared_ptr<brpc::Server> _rpc_server;           // RPC server being run
};
// Step-by-step builder for TransmiteServer.
//
// make_rpc_server() requires make_mq_object(), make_discovery_object() and
// make_mysql_object() to have been called first; build() requires
// make_discovery_object(), make_registry_object() and make_rpc_server().
// A missing prerequisite is logged and the process is aborted.
class TransmiteServerBuilder {
public:
    // Creates the MySQL (ODB) client through ODBFactory::create.
    void make_mysql_object(
        const std::string &user,
        const std::string &pswd,
        const std::string &host,
        const std::string &db,
        const std::string &cset,
        int port,
        int conn_pool_count) {
        _mysql_client = ODBFactory::create(user, pswd, host, db, cset, port, conn_pool_count);
    }

    // Creates the channel manager and the service discovery client, and
    // declares the user sub-service as a service the channel manager tracks.
    void make_discovery_object(const std::string &reg_host,
                               const std::string &base_service_name,
                               const std::string &user_service_name) {
        _user_service_name = user_service_name;
        _mm_channels = std::make_shared<ServiceManager>();
        _mm_channels->declared(user_service_name);
        LOG_DEBUG("设置用户子服务为需添加管理的子服务:{}", user_service_name);
        // Bind the shared_ptr itself rather than a raw pointer: the callbacks
        // are handed to Discovery, and the built TransmiteServer keeps the
        // Discovery alive without holding the ServiceManager directly. Sharing
        // ownership guarantees the manager is still valid whenever a callback
        // is invoked.
        auto put_cb = std::bind(&ServiceManager::onServiceOnline, _mm_channels,
                                std::placeholders::_1, std::placeholders::_2);
        auto del_cb = std::bind(&ServiceManager::onServiceOffline, _mm_channels,
                                std::placeholders::_1, std::placeholders::_2);
        _service_discoverer = std::make_shared<Discovery>(reg_host, base_service_name, put_cb, del_cb);
    }

    // Creates the service registration client and registers this service
    // under service_name with access address access_host.
    void make_registry_object(const std::string &reg_host,
                              const std::string &service_name,
                              const std::string &access_host) {
        _registry_client = std::make_shared<Registry>(reg_host);
        _registry_client->registry(service_name, access_host);
    }

    // Creates the RabbitMQ client and declares exchange, queue and binding.
    // binding_key is also remembered as the routing key used when publishing.
    void make_mq_object(const std::string &user,
                        const std::string &passwd,
                        const std::string &host,
                        const std::string &exchange_name,
                        const std::string &queue_name,
                        const std::string &binding_key) {
        _routing_key = binding_key;
        _exchange_name = exchange_name;
        _mq_client = std::make_shared<MQClient>(user, passwd, host);
        _mq_client->declareComponents(exchange_name, queue_name, binding_key);
    }

    // Creates the brpc server, installs the transmit service and starts
    // listening on `port`.
    // timeout:     assigned to ServerOptions::idle_timeout_sec.
    // num_threads: assigned to ServerOptions::num_threads.
    void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads) {
        if (!_mq_client) {
            LOG_ERROR("还未初始化消息队列客户端模块!");
            abort();
        }
        if (!_mm_channels) {
            LOG_ERROR("还未初始化信道管理模块!");
            abort();
        }
        if (!_mysql_client) {
            LOG_ERROR("还未初始化Mysql数据库模块");
            abort();
        }
        _rpc_server = std::make_shared<brpc::Server>();
        // Ownership of the service object is handed to the server
        // (SERVER_OWNS_SERVICE), so it is intentionally not wrapped in a
        // smart pointer here.
        TransmiteServiceImpl *transmite_service = new TransmiteServiceImpl(
            _user_service_name, _mm_channels, _mysql_client, _exchange_name, _routing_key, _mq_client);
        int ret = _rpc_server->AddService(transmite_service,
                                          brpc::ServiceOwnership::SERVER_OWNS_SERVICE);
        // brpc reports success as 0; treat any other value as failure.
        if (ret != 0) {
            LOG_ERROR("添加Rpc服务失败");
            abort();
        }
        brpc::ServerOptions options;
        options.idle_timeout_sec = timeout;
        options.num_threads = num_threads;
        ret = _rpc_server->Start(port, &options);
        if (ret != 0) {
            LOG_ERROR("服务启动失败!");
            abort();
        }
    }

    // Assembles the TransmiteServer from the previously created components.
    TransmiteServer::ptr build() {
        if (!_service_discoverer) {
            LOG_ERROR("还未初始化服务发现模块!");
            abort();
        }
        if (!_registry_client) {
            LOG_ERROR("还未初始化服务注册模块!");
            abort();
        }
        if (!_rpc_server) {
            LOG_ERROR("还未初始化RPC服务器模块");
            abort();
        }
        return std::make_shared<TransmiteServer>(
            _mysql_client, _service_discoverer, _registry_client, _rpc_server);
    }
private:
    std::string _user_service_name;                      // name of the user sub-service
    ServiceManager::ptr _mm_channels;                    // channel manager
    Discovery::ptr _service_discoverer;                  // service discovery client
    std::string _routing_key;                            // MQ routing key (the binding key)
    std::string _exchange_name;                          // MQ exchange name
    MQClient::ptr _mq_client;                            // message queue client
    Registry::ptr _registry_client;                      // service registration client
    std::shared_ptr<odb::core::database> _mysql_client;  // MySQL database client
    std::shared_ptr<brpc::Server> _rpc_server;           // RPC server
};
}