Skip to content

Commit cc99379

Browse files
author
echo724
committed
feat: add get_roles
1 parent 11e303d commit cc99379

File tree

6 files changed

+36
-5
lines changed

6 files changed

+36
-5
lines changed

duva-client/src/controller.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ impl<T> ClientController<T> {
4848
| Config { .. }
4949
| Info
5050
| ClusterForget { .. }
51-
| Role
5251
| ReplicaOf { .. }
5352
| ClusterInfo => match query_io {
5453
| QueryIO::Null => Response::Null,
@@ -130,7 +129,7 @@ impl<T> ClientController<T> {
130129
}
131130
Response::Array(keys)
132131
},
133-
| ClusterNodes => {
132+
| Role | ClusterNodes => {
134133
let QueryIO::Array(value) = query_io else {
135134
return Response::FormatError;
136135
};

duva/src/domains/cluster_actors/command.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ pub enum ClientMessage {
6262
LeaderReqConsensus(ConsensusRequest),
6363
ClusterNodes(Callback<Vec<PeerState>>),
6464
GetRole(Callback<ReplicationRole>),
65+
GetRoles(Callback<Vec<(PeerIdentifier, ReplicationRole)>>),
6566
SubscribeToTopologyChange(Callback<tokio::sync::broadcast::Receiver<Topology>>),
6667
ClusterMeet(PeerIdentifier, LazyOption, Callback<anyhow::Result<()>>),
6768
GetTopology(Callback<Topology>),

duva/src/domains/cluster_actors/service.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::domains::cluster_actors::ConnectionMessage;
66
use crate::domains::cluster_actors::SchedulerMessage;
77
use crate::domains::operation_logs::interfaces::TWriteAheadLog;
88
use crate::domains::peers::PeerMessage;
9+
use std::iter;
910

1011
use crate::prelude::PeerIdentifier;
1112
use crate::res_err;
@@ -109,6 +110,20 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
109110
| GetRole(callback) => {
110111
callback.send(self.replication.role.clone());
111112
},
113+
| GetRoles(callback) => {
114+
let followers = self
115+
.members
116+
.iter()
117+
.filter(|(_, peer_state)| peer_state.is_replica(&self.replication.replid))
118+
.map(|(peer_id, peer_state)| (peer_id.clone(), peer_state.role().clone()));
119+
120+
let roles =
121+
iter::once((self.replication.self_identifier(), self.replication.role.clone()))
122+
.chain(followers)
123+
.collect();
124+
125+
callback.send(roles);
126+
},
112127
| SubscribeToTopologyChange(callback) => {
113128
callback.send(self.node_change_broadcast.subscribe());
114129
},

duva/src/presentation/clients/controller.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,15 @@ impl ClientController {
143143
QueryIO::SimpleString("OK".into())
144144
},
145145
| ClientAction::Role => {
146-
let role = self.cluster_communication_manager.route_get_role();
147-
QueryIO::SimpleString(role.await?.to_string().into())
146+
let roles = self.cluster_communication_manager.route_get_roles().await?;
147+
QueryIO::Array(
148+
roles
149+
.into_iter()
150+
.map(|(peer_id, role)| {
151+
QueryIO::BulkString(format!("{}:{}", peer_id, role).into())
152+
})
153+
.collect(),
154+
)
148155
},
149156
| ClientAction::Ttl { key } => {
150157
QueryIO::SimpleString(self.cache_manager.route_ttl(key).await?.into())

duva/src/presentation/clusters/communication_manager.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,14 @@ impl ClusterCommunicationManager {
106106
Ok(rx.await?)
107107
}
108108

109+
pub(crate) async fn route_get_roles(
110+
&self,
111+
) -> anyhow::Result<Vec<(PeerIdentifier, ReplicationRole)>> {
112+
let (tx, rx) = Callback::create();
113+
self.send(ClientMessage::GetRoles(tx)).await?;
114+
Ok(rx.await?)
115+
}
116+
109117
pub(crate) async fn route_get_role(&self) -> anyhow::Result<ReplicationRole> {
110118
let (tx, rx) = Callback::create();
111119
self.send(ClientMessage::GetRole(tx)).await?;

duva/tests/common.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ pub fn spawn_server_process(env: &ServerEnv) -> anyhow::Result<TestProcessChild>
135135
continue;
136136
}
137137

138-
if role_res.is_empty() || (role_res != "leader" && role_res != "follower") {
138+
if role_res.is_empty() || (role_res.contains("leader") && role_res.contains("follower"))
139+
{
139140
continue;
140141
}
141142

0 commit comments

Comments
 (0)