diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index dd155a3044..ec93354121 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -41,10 +41,13 @@ typedef std::map ChannelToIdMap; class SubChannel : public SocketUser { public: ChannelBase* chan; + ChannelOwnership ownership; // internal channel is deleted after the fake Socket is SetFailed void BeforeRecycle(Socket*) { - delete chan; + if (ownership == OWNS_CHANNEL) { + delete chan; + } delete this; } @@ -83,7 +86,8 @@ class ChannelBalancer : public SharedLoadBalancer { ChannelBalancer() {} ~ChannelBalancer(); int Init(const char* lb_name); - int AddChannel(ChannelBase* sub_channel, const std::string& tag, + int AddChannel(ChannelBase* sub_channel, + const SelectiveChannel::SubChannelOptions& subopt, SelectiveChannel::ChannelHandle* handle); void RemoveAndDestroyChannel(const SelectiveChannel::ChannelHandle& handle); int SelectChannel(const LoadBalancer::SelectIn& in, SelectOut* out); @@ -168,7 +172,8 @@ int ChannelBalancer::Init(const char* lb_name) { return SharedLoadBalancer::Init(lb_name); } -int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag, +int ChannelBalancer::AddChannel(ChannelBase* sub_channel, + const SelectiveChannel::SubChannelOptions& subopt, SelectiveChannel::ChannelHandle* handle) { if (NULL == sub_channel) { LOG(ERROR) << "Parameter[sub_channel] is NULL"; @@ -185,6 +190,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag return -1; } sub_chan->chan = sub_channel; + sub_chan->ownership = subopt.ownership; SocketId sock_id; SocketOptions options; options.user = sub_chan; @@ -206,7 +212,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag << sock_id << " is disabled"; return -1; } - if (!AddServer(ServerId(sock_id, tag))) { + if (!AddServer(ServerId(sock_id, subopt.tag))) { LOG(ERROR) << "Duplicated sub_channel=" << sub_channel; // sub_chan will be deleted when the socket is recycled. ptr->SetFailed(); @@ -215,10 +221,10 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag return -1; } // The health-check-related reference has been held on created. - _chan_map[sub_channel]= ptr.get(); + _chan_map[sub_channel] = ptr.get(); if (handle) { handle->id = sock_id; - handle->tag = tag; + handle->tag = subopt.tag; } return 0; } @@ -532,12 +538,7 @@ bool SelectiveChannel::initialized() const { } int SelectiveChannel::AddChannel(ChannelBase* sub_channel, - ChannelHandle* handle) { - return AddChannel(sub_channel, "", handle); -} - -int SelectiveChannel::AddChannel(ChannelBase* sub_channel, - const std::string& tag, + const SubChannelOptions& option, ChannelHandle* handle) { schan::ChannelBalancer* lb = static_cast(_chan._lb.get()); @@ -545,7 +546,7 @@ int SelectiveChannel::AddChannel(ChannelBase* sub_channel, LOG(ERROR) << "You must call Init() to initialize a SelectiveChannel"; return -1; } - return lb->AddChannel(sub_channel, tag, handle); + return lb->AddChannel(sub_channel, option, handle); } void SelectiveChannel::RemoveAndDestroyChannel(const ChannelHandle& handle) { diff --git a/src/brpc/selective_channel.h b/src/brpc/selective_channel.h index 6c0af1da9c..fd8fb9cf9d 100644 --- a/src/brpc/selective_channel.h +++ b/src/brpc/selective_channel.h @@ -56,6 +56,11 @@ class SelectiveChannel : public ChannelBase/*non-copyable*/ { std::string tag; }; + struct SubChannelOptions { + std::string tag; + ChannelOwnership ownership = OWNS_CHANNEL; + }; + SelectiveChannel(); ~SelectiveChannel(); @@ -69,8 +74,16 @@ class SelectiveChannel : public ChannelBase/*non-copyable*/ { // On success, handle is set with the key for removal. // NOTE: Different from pchan, schan can add channels at any time. // Returns 0 on success, -1 otherwise. - int AddChannel(ChannelBase* sub_channel, ChannelHandle* handle); - int AddChannel(ChannelBase* sub_channel, const std::string& tag, ChannelHandle* handle); + int AddChannel(ChannelBase* sub_channel, ChannelHandle* handle) { + return AddChannel(sub_channel, SubChannelOptions(), handle); + } + int AddChannel(ChannelBase* sub_channel, const std::string& tag, ChannelHandle* handle) { + SubChannelOptions option; + option.tag = tag; + return AddChannel(sub_channel, option, handle); + } + int AddChannel(ChannelBase* sub_channel, const SubChannelOptions& option, + ChannelHandle* handle); // Remove and destroy the sub_channel associated with `handle'. void RemoveAndDestroyChannel(const ChannelHandle& handle);