Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Add support for automatically joining new OmniPaxos nodes #2478

Draft
wants to merge 25 commits into
base: main
Choose a base branch
from

Conversation

tillrohrmann
Copy link
Contributor

This PR contains a first variant of an embedded metadata store based on OmniPaxos. The metadata store has a durable log and supports reconfiguration. The metadata store awaits provisioning. Once provisioned a single Restate node that runs the metadata store role acts as the metadata store cluster. As more nodes join the cluster, those that run the metadata store role will try to join the metadata store cluster by requesting a reconfiguration. Once enough metadata store nodes have joined one should be able to kill floor((n - 1)/2) nodes and things should continue working.

If you want to try things out, you can spawn three Restate server's with the configuration files you find here. Then you need to provision the cluster via

restatectl cluster provision --num-partitions 2 --bifrost-provider replicated --replication-property 2 --yes

After a short while you should see all nodes running the partition processors. If you see the log line Run as active metadata store node on every node, then the metadata store cluster should contain all nodes and you should be able to kill a single random node at your will.

Internally, the OmniPaxosMetadataStore works the following way:

  1. Check whether we have an OmniPaxosConfiguration persisted to disk. If yes, then start as an active metadata store
  2. Check whether a NodesConfiguration is known. If yes, then this indicates a prior provisioning. Start as passive metadata store
  3. Await the provision signal to create the initial NodesConfiguration and start as active metadata store

When being a passive metadata store, then try to join an existing cluster by randomly asking any node that runs the metadata store node role. Joining a cluster entails a reconfiguration and sending of the whole log to the new joiner.

An active metadata store runs the OmniPaxos library as well as processing metadata store requests and join requests. A metadata store only reacts to requests if it is the leader. This is a simplification to avoid hanging requests because they are never committed.

The draft PR has still many rough edges. The main goal is to verify the overall direction and discuss the design decisions (some of them being quite questionable). I've tried to highlight the parts that reviewers should take a closer look at. The best way to look at the PR is probably to take a look at the full result of the OmniPaxosMetadataStore struct in the crates/metadata-store/src/omnipaxos/store.rs.

Some of the missing features are (non-exhaustive):

  • Trimming of the log
  • Snapshotting of the KvMemoryStorage
  • Integrating the OmniPaxosMetadataStore into the MetadataStoreState lifecycle
  • Let MetadataStoreClient select addresses based on MetadataStoreState::Active.
  • Observability
  • Improving log messages
  • Fine tuning timeouts
  • Hardening
  • Testing

This commit makes it configurable which metadata will be run
by the Node when starting the Restate server.
This commit adds the skeleton of the Raft metadata store. At the moment
only a single node with memory storage is supported.

This fixes restatedev#1785.
The raft metadata store does not accept new proposals if there is
no known leader. In this situation, request failed with an internal
ProposalDropped error. This commit changes the behavior so that a
ProposalDropped error will be translated into an unavailable Tonic
status. That way, the request will get automatically retried.
This commit adds RocksDbStorage which implements raft::Storage.
The RocksDbStorage is a durable storage implementation which is
used by the RaftMetadataStore to store the raft state durably.

This fixes restatedev#1791.
The OmniPaxos metadata store stores its state in memory.
This commit introduces the ability to specify multiple addresses for the
metadata store endpoint. On error, the GrpcMetadataStoreClient randomly
switches to another endpoint. Moreover, this commit makes the OmniPaxosMetadataStore
only accept requests if it is the leader. Additionally, it fails all pending
callbacks if it loses leadership to avoid hanging requests if the request
was not decided.
The Restate version enables OmniPaxos to run with a single peer.
Comment on lines 86 to 89

// todo we seem to have a race condition between this call and the provision step which might
// write a different logs configuration
// self.bifrost.admin().init_metadata().await?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This problem should already exist in the current main. The problem is that we first provision the NodesConfiguration and then try to write the Logs with the configured LogsConfiguration. If now the node succeeds at joining the cluster and reaches first the point where it starts the BifrostService, then it can happen that this line writes the initial Logs which does not respect the LogsConfiguration that is specified in the provision command.

rpc ConnectTo(stream NetworkMessage) returns (stream NetworkMessage);

// Try to join an existing metadata store cluster
rpc JoinCluster(JoinClusterRequest) returns (JoinClusterResponse);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need to properly document the contract and which status codes are returned in which cases.

Comment on lines +105 to +107
if response.as_ref().is_err_and(|err| err.is_network_error()) {
self.choose_different_endpoint();
}
Copy link
Contributor Author

@tillrohrmann tillrohrmann Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Poor man's solution to handle multiple endpoints. Requests that are rejected by non-leaders return an Unavailable status. That's why this + the retry policy will help us to find the actual leader by retrying. This also helps with dealing with metadata store nodes that are down. Very simplistic solution.

Comment on lines +220 to +224
// This is potentially dangerous because we might have provisioned before on a
// different node but delivering of the response failed.
// todo harden by choosing oneself if one runs the metadata store. If not, then pick
// a single node to reach out to.
self.choose_different_endpoint();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dangerous because we might end up provisioning different metadata store nodes.

Comment on lines +122 to +145
// Not really happy about making the `KvMemoryStorage` aware of the NodesConfiguration. I
// couldn't find a better way to let a restarting metadata store know about the latest
// addresses of its peers which it reads from the NodesConfiguration. An alternative could
// be to not support changing addresses. Changing addresses will also only be possible as
// long as we maintain a quorum of running nodes. Otherwise, the nodes might not find each
// other to form quorum.
if let Some(metadata_writer) = self.metadata_writer.as_mut() {
if key == NODES_CONFIG_KEY {
let mut data = self
.kv_entries
.get(&key)
.expect("to be present")
.value
.as_ref();
match StorageCodec::decode::<NodesConfiguration, _>(&mut data) {
Ok(nodes_configuration) => {
metadata_writer.submit(Arc::new(nodes_configuration));
}
Err(err) => {
debug!("Failed deserializing NodesConfiguration: {err}");
}
}
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is better to not rely on the NodesConfiguration to learn about the other nodes' addresses and instead require that their addresses don't change because in the general case this cannot work (stopping all nodes and restarting them with different addresses).

… on NodesConfiguration

With this change, nodes can change their addresses as long as a quorum of running metadata
store nodes keeps running so that the NodesConfiguration can be updated. It is a bit questionable
how much worth this feature is. However, w/o it, metadata store nodes behave slightly differently
than normal nodes (worker, log store, etc.) as they cannot change their address.
Using the generational node id as the OmniPaxos NodeId is a bit problematic because
it will get bumped on every restart. If we fail while joining an existing OmniPaxos
cluster and before persisting the configuration, we will try joining with a different
id. If we allow higher generations to be accepted, then this is problematic because we
can't distinguish between this situation and the loss of the disk. If we are strict
about the id equality, then we might get the system stuck because the restarted node
cannot join the OmniPaxos cluster which might expect its previous generation to be
present. Therefore, this commit introduces the StorageId which is a random number
stored in the RocksDbStorage. Whenever we try to join an OmniPaxos cluster we send
this StorageId along with the PlainNodeId. Based on the StorageId we can detect if
a node lost its disk because it would generate a new one.
@tillrohrmann tillrohrmann force-pushed the omnipaxos-reconfiguration branch from 332157b to 9364667 Compare January 9, 2025 15:27
Comment on lines +122 to +134
// make sure that the storage is initialized with a storage id to be able to detect disk losses
let storage_id = if let Some(storage_id) = rocksdb_storage
.get_storage_id()
.map_err(|err| BuildError::InitStorage(err.to_string()))?
{
storage_id
} else {
let storage_id = random();
rocksdb_storage
.set_storage_id(storage_id)
.map_err(|err| BuildError::InitStorage(err.to_string()))?;
storage_id
};
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The durable storage_id together with the node's PlainNodeId constitute the MemberId which is used to check whether a node is part of a configuration or not. In case of a disk loss we will generate a new StorageId which requires a reconfiguration for a node to rejoin an omni paxos cluster.

Comment on lines +189 to +200
// Try to read a persisted nodes configuration in order to learn about the addresses of our
// potential peers and the metadata store states.
if let Some(nodes_configuration) = self
.rocksdb_storage
.get_nodes_configuration()
.map_err(|err| BuildError::InitStorage(err.to_string()))?
{
metadata_writer
.update(Arc::new(nodes_configuration))
.await?
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the information like what are the node's addresses or what is my MetadataStoreState are stored in the NodesConfiguration. Since we cannot guarantee that we can join a cluster w/o it (e.g. when restarting all nodes at the same time or when failing over during a reconfiguration where we haven't received the omnipaxos configuration yet), we persist it ourselves.

Comment on lines +268 to +272
TaskCenter::spawn_unmanaged(TaskKind::Background, "provision-responder", async move {
while let Some(request) = provision_rx.recv().await {
let _ = request.result_tx.send(Ok(false));
}
})?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reject all future provision requests.

}

node_config.current_generation.as_plain()
} else {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case is needed if another node received the provision signal (e.g. a non metadata store node) and tries to provision the metadata store with a NodesConfiguration that does not contain the metadata store node.

Comment on lines +499 to +500
// todo remove additional indirection from Arc
connection_manager.store(Some(Arc::new(new_connection_manager)));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ConnectionManager needs to know our own metadata peer id to validate that connections are set up correctly. Since we only know what our id will be once we have joined an OmniPaxos cluster, we need to instantiate it lazily. This is not super beautiful :-(

Comment on lines +575 to +577
Ok(()) = nodes_config_watch.changed() => {
self.update_node_addresses(&Metadata::with_current(|m| m.nodes_config_ref()));
},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to support changing node/peer addresses that are communicated via the NodesConfiguration.

Comment on lines +641 to +647
// we lost leadership :-( notify callers that their requests might not get committed
// because we don't know whether the leader will start with the same log as we have.
self.kv_storage
.fail_callbacks(|| RequestError::Unavailable("lost leadership".into()));
self.fail_join_callbacks(|| JoinClusterError::NotLeader);
} else if !previous_is_leader && self.is_leader {
debug!(configuration_id = %self.cluster_config.configuration_id, "Won leadership");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One problem I had was that OmniPaxos does not tell you what is the lsn for a given proposal. Therefore, it is hard to track whether a proposal made it or was replaced by a different proposal (e.g. because of a leadership change and a different log suffix). That's why I am quite defensive here to avoid hanging requests.

The correlation between request and log entries is currently done by having an explicit ulid that is part of a request entry.

Comment on lines +771 to +775
Self::reset_storage_for_new_configuration(
&mut rocksdb_storage,
&omni_paxos_configuration,
last_decided_index,
)?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stop signs are not part of the actual log. Therefore, we need to manually reset the decided index and clear some fields in the RocksDbStorage in order to reuse the same storage for the next configuration.

Comment on lines +798 to +807
// remember the latest configuration we have seen for future checks
Self::reset_storage_for_new_configuration(
&mut rocksdb_storage,
&OmniPaxosConfiguration {
own_member_id: self.own_member_id,
cluster_config: stop_sign.next_config,
members: metadata.members,
},
last_decided_index,
)?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be helpful if this node is removed from the next configuration but some other nodes ask this node whether it knows which nodes are part of the new configuration and what the log is.

Comment on lines +1088 to +1089
// todo only try joining if MetadataStoreState::Candidate
let mut join_cluster = std::pin::pin!(Self::join_cluster(None, storage_id));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A potential follow-up is to use the MetadataStoreState stored in the NodesConfiguration to control which nodes should join a metadata store cluster or not. That could give us a declarative way to define the nodes that should run an active metadata store if they succeed at joining the cluster.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relying on the NodesConfiguration to store metadata store relevant information (e.g. the MetadataStoreState and addresses) makes things considerably harder because of the circular dependency (we need the metadata store to write/read the NodesConfiguration but it also controls the behavior of the metadata store). This becomes especially problematic in the case where we fail during a reconfiguration w/o having received the log yet. Especially, if this node is now needed for the metadata store to become available there is no easy way to obtain the NodesConfiguration by following the normal join flow.

Comment on lines +1130 to +1140
// We cannot assume that we have already joined the cluster and obtained our generational node id.
// That's why we need to retrieve the plain node id based on our name from the latest known
// NodesConfiguration. If the node has no assigned plain node id, then it first needs to obtain
// it by following the regular join path before it can join the metadata store cluster.
let my_node_id = if let Some(node_config) =
nodes_config.find_node_by_name(Configuration::pinned().common.node_name())
{
node_config.current_generation.as_plain()
} else {
bail!("The node with name '{}' has not obtained a node id yet. W/o the node id, it cannot join the metadata store cluster.", Configuration::pinned().common.node_name());
};
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example where we need a persisted NodesConfiguration to remember our PlainNodeId if we fail during reconfiguration but then are needed for the availability of the metadata store.

Comment on lines +1143 to +1149
let active_metadata_store_node = nodes_config.iter().filter_map(|(node, config)| {
if config.has_role(Role::MetadataStore) && node != my_node_id {
Some(node)
} else {
None
}
}).choose(&mut thread_rng()).ok_or(anyhow::anyhow!("No other metadata store present in the cluster. This indicates a misconfiguration."))?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should consider the MetadataStoreState::Active in the future.

Comment on lines +1176 to +1181
// once the log grows beyond the configured grpc max message size (by default 4 MB) this
// will no longer work :-( If we shared the snapshot we still have the same problem once the
// snapshot grows beyond 4 MB. Then we need a separate channel (e.g. object store) or
// support for chunked transfer.
let log_prefix = flexbuffers::from_slice(response.log_prefix.as_ref())?;
let omni_paxos_configuration =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bound to break eventually.

@tillrohrmann
Copy link
Contributor Author

fyi @pcholakov for the future testing efforts.

The provision grpc call translates into a put command because the
local metadata store does not need to be provisioned.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant