Cluster state refresher #2429

Closed
wants to merge 3 commits

Conversation

@muhamadazmy (Contributor) commented on Dec 16, 2024

Cluster state refresher

Summary:
Simple ping mechanism to collect and maintain a local
view of cluster liveness state


Stack created with Sapling. Best reviewed with ReviewStack.
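
To make the summary concrete, here is a very rough, self-contained sketch of the idea under hypothetical names (the real implementation is the ClusterStateRefresher reviewed below): peers that have not been heard from within a few heartbeat intervals are considered dead.

use std::collections::BTreeMap;
use std::time::{Duration, Instant};

// Hypothetical, simplified stand-in for the PR's NodeTracker.
#[derive(Default)]
struct PeerTracker {
    last_seen: Option<Instant>,
}

// Derive a liveness view from the last time each peer was heard from.
// The 3x-heartbeat threshold is an arbitrary choice for this sketch.
fn liveness_view(
    peers: &BTreeMap<u64, PeerTracker>,
    heartbeat_interval: Duration,
) -> BTreeMap<u64, bool> {
    let threshold = heartbeat_interval * 3;
    peers
        .iter()
        .map(|(id, tracker)| {
            let alive = tracker
                .last_seen
                .map(|seen| seen.elapsed() < threshold)
                .unwrap_or(false);
            (*id, alive)
        })
        .collect()
}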

@muhamadazmy mentioned this pull request on Dec 16, 2024
@muhamadazmy force-pushed the pr2429 branch 6 times, most recently from fae0c62 to 0e90b2e on December 17, 2024 at 13:18
@muhamadazmy force-pushed the pr2429 branch 7 times, most recently from 835d7da to 1bf7793 on December 20, 2024 at 13:13
@muhamadazmy changed the title from "Cluster state gossiping" to "Cluster state refresher" on Dec 20, 2024
@muhamadazmy force-pushed the pr2429 branch 2 times, most recently from 3f976c4 to 9490ed2 on December 20, 2024 at 14:57
Commits:

Summary:
Deprecate the old cluster state that included information about
partition state. A new ClusterState object is introduced that only has
liveness information.

Summary:
Types used by nodes to share cluster state.

Summary:
Simple ping mechanism to collect and maintain a local
view of cluster liveness state.
networking: Networking<T>,
nodes: BTreeMap<PlainNodeId, NodeTracker>,
heartbeat_interval: Duration,
cluster_state_watch_tx: watch::Sender<Arc<ClusterState>>,
Contributor:
Do you imagine situations where users of ClusterState would be interested in waiting for a state change of the entire ClusterState, or is it more likely that they'd be interested in a certain node?

And with respect to the latter, how can this be achieved with this design?

Contributor (author):
It's actually a pretty common pattern here to wait for a certain state of the entire cluster or of a single node. While this can be accomplished by waiting on changes of the cluster state in a state machine, we could make the experience a little better by providing a wrapper on top of the watch that makes it easier to wait on more complex conditions.
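
For illustration, such a wrapper could look roughly like this. This is only a sketch: is_alive is a hypothetical accessor on the ClusterState introduced in this PR, and tokio's watch::Receiver::wait_for does the actual waiting.

use std::sync::Arc;
use tokio::sync::watch;

// Sketch: resolve once the latest published ClusterState reports the given
// node as alive. ClusterState and PlainNodeId are the types from this PR;
// is_alive(PlainNodeId) -> bool is assumed for the example.
async fn wait_for_node_alive(
    rx: &mut watch::Receiver<Arc<ClusterState>>,
    node: PlainNodeId,
) -> Result<(), watch::error::RecvError> {
    rx.wait_for(|state| state.is_alive(node)).await?;
    Ok(())
}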

Contributor:
Do you have an example use-case?

last_attempt_at: Option<MillisSinceEpoch>,
}

pub struct ClusterStateRefresher<T> {
Contributor:
It seems that this is what we traditionally call a "FailureDetector"; perhaps we can call it that to avoid confusion?

Contributor (author):
Sounds better :)

Comment on lines +27 to +28
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodePong {}
Contributor:
What do you imagine will be in the response, and do we actually need a response?

Contributor (author):
I see your point :) Pongs are definitely not needed as their own message type and can be dropped. The failure detector still treats it as a ping anyway.

Comment on lines +108 to +110
_ = &mut cancelled => {
break;
}
Contributor:
todo: this is possibly a good place to inform peers that you are shutting down?

Comment on lines +75 to +81
tokio::select! {
result = cluster_state_refresher.run() => {
result
}
_ = cancelled => {
Ok(())
}
Contributor:
doesn't the refresher itself monitor the cancellation token?

Contributor (author):
Yes, true. This is a mistake.

networking,
nodes: BTreeMap::default(),
heartbeat_interval: config.common.heartbeat_interval.into(),
cluster_state_watch_tx: watch::Sender::new(Arc::new(ClusterState::empty())),
Contributor:
Is there a reason why ClusterState is in an Arc?

Contributor (author):
Borrowed values from a watch hold a read lock, which means you should only borrow for very short periods of time and never across await points. Hence I think using an Arc is safer: we can cheaply clone the cluster state and release the borrow, and then pass this state snapshot around or use it across await points.
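
As a small illustration of that pattern (a sketch, assuming only the ClusterState type from this PR and tokio's watch channel):

use std::sync::Arc;
use tokio::sync::watch;

// Clone the Arc out of the watch borrow; the internal read lock is released
// as soon as the temporary borrow is dropped, and the returned snapshot is
// cheap to pass around or hold across await points.
fn cluster_state_snapshot(rx: &watch::Receiver<Arc<ClusterState>>) -> Arc<ClusterState> {
    Arc::clone(&rx.borrow())
}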

Comment on lines +267 to +276
async fn on_pong(&mut self, mut msg: Incoming<NodePong>) -> Result<(), ShutdownError> {
msg.follow_from_sender();

trace!("Handling pong response");

let tracker = self.nodes.entry(msg.peer().as_plain()).or_default();
tracker.seen = Some(SeenState::new(msg.peer()));

Ok(())
}
Contributor:
I'm really not sure if pong is needed. Nodes ping other nodes, and each node makes its own view based on the pings it has received.

Contributor (author):
Yes, makes sense!
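
For illustration, with NodePong dropped, the handler above could shrink to a ping-only variant along these lines (a sketch mirroring the on_pong code in the diff; NodePing is assumed here to be the ping message type used by the refresher):

// Sketch: the incoming ping itself is the liveness signal, so we only record
// when we last heard from the peer and send nothing back.
async fn on_ping(&mut self, mut msg: Incoming<NodePing>) -> Result<(), ShutdownError> {
    msg.follow_from_sender();

    trace!("Handling ping");

    let tracker = self.nodes.entry(msg.peer().as_plain()).or_default();
    tracker.seen = Some(SeenState::new(msg.peer()));

    Ok(())
}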


pub struct BaseRole {
processor_manager_handle: Option<ProcessorsManagerHandle>,
incoming_node_state: MessageStream<GetNodeState>,
processors_state_request_stream: MessageStream<GetPartitionsProcessorsState>,
Contributor:
Should this move to PPM?

Contributor:
If so, do we still need BaseRole or should we remove it?

Contributor (author), @muhamadazmy, Dec 23, 2024:
It will eventually be deleted, but right now it still handles GetPartitionsProcessorsState messages needed for CC operation.

@muhamadazmy deleted the pr2429 branch on December 26, 2024 at 14:23