Skip to content

Commit

Permalink
MCOL-5696: FoundationDB cluster start and reconfiguration.
Browse files Browse the repository at this point in the history
[add] new file handlers/foundation_db.py and FDBHandler class inside
[add] new process dispatcher foundation.py (FDBDispatcher)
[add] FDB constants to constants.py
[fix] minor naming issue in systemd.py
[add] get_txn_handler function to helpers.py
[add] new cmapi config section Txn_handler with parameter name inside
[add] fdb node add/remove while adding/removing node to cluster
[add] include_all_nodes, set_coordinators, exclude_node methods to FDBHandler class
[add] include_node method in FDBHandler
[add] method to remove a node from the FDB cluster
  • Loading branch information
mariadb-AlanMologorsky committed Aug 23, 2024
1 parent e294162 commit a2a3d12
Show file tree
Hide file tree
Showing 11 changed files with 512 additions and 6 deletions.
5 changes: 4 additions & 1 deletion cmapi/cmapi_server/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from cmapi_server.managers.application import AppManager
from cmapi_server.managers.process import MCSProcessManager
from cmapi_server.managers.certificate import CertificateManager
from cmapi_server.managers.transaction import TransactionManager
from failover.node_monitor import NodeMonitor
from mcs_node_control.models.dbrm_socket import SOCK_TIMEOUT, DBRMSocketHandler
from mcs_node_control.models.node_config import NodeConfig
Expand Down Expand Up @@ -160,11 +161,13 @@ def stop(self):
cfg_parser
)
MCSProcessManager.detect(dispatcher_name, dispatcher_path)
TransactionManager.handler = helpers.get_txn_handler(cfg_parser)
if TransactionManager.internal_hadler_used():
TxnBackgroundThread(cherrypy.engine, app).subscribe()
# If we don't have auto_failover flag in the config turn it ON by default.
turn_on_failover = cfg_parser.getboolean(
'application', 'auto_failover', fallback=True
)
TxnBackgroundThread(cherrypy.engine, app).subscribe()
# subscribe FailoverBackgroundThread plugin code to bus channels
# code below not starting "real" failover background thread
FailoverBackgroundThread(cherrypy.engine, turn_on_failover).subscribe()
Expand Down
6 changes: 6 additions & 0 deletions cmapi/cmapi_server/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,9 @@ class ProgInfo(NamedTuple):
IFLAG = os.path.join(MCS_ETC_PATH, 'container-initialized')
LIBJEMALLOC_DEFAULT_PATH = os.path.join(MCS_DATA_PATH, 'libjemalloc.so.2')
MCS_LOG_PATH = '/var/log/mariadb/columnstore'


# FoundationDB constants
# Directory holding the FoundationDB configuration files; the two paths below
# are built relative to it.
FDB_ETC_PATH = '/etc/foundationdb/'
# Path of the FoundationDB config file (foundationdb.conf).
FDB_CONFIG_PATH = os.path.join(FDB_ETC_PATH, 'foundationdb.conf')
# Path of the FoundationDB cluster file (fdb.cluster).
FDB_CLUSTER_CONFIG_PATH = os.path.join(FDB_ETC_PATH, 'fdb.cluster')
5 changes: 4 additions & 1 deletion cmapi/cmapi_server/controllers/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,12 +859,15 @@ def put_add_node(self):
request_body = request.json
node = request_body.get('node', None)
config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
fdb_config_data = request_body.get('fdb_config_data', None)

if node is None:
raise_422_error(module_logger, func_name, 'missing node argument')

try:
response = ClusterHandler.add_node(node, config)
response = ClusterHandler.add_node(
node, config, fdb_config_data=fdb_config_data
)
except CMAPIBasicError as err:
raise_422_error(module_logger, func_name, err.message)

Expand Down
5 changes: 4 additions & 1 deletion cmapi/cmapi_server/handlers/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def process_shutdown():
@staticmethod
def add_node(
node: str, config: str = DEFAULT_MCS_CONF_PATH,
fdb_config_data: Optional[str] = None,
logger: logging.Logger = logging.getLogger('cmapi_server')
) -> dict:
"""Method to add node to MCS CLuster.
Expand All @@ -205,6 +206,8 @@ def add_node(
:param config: columnstore xml config file path,
defaults to DEFAULT_MCS_CONF_PATH
:type config: str, optional
:param fdb_config: fdb config data, defaults to None
:type fdb_config: str, optional
:param logger: logger, defaults to logging.getLogger('cmapi_server')
:type logger: logging.Logger, optional
:raises CMAPIBasicError: on exception while starting transaction
Expand Down Expand Up @@ -238,7 +241,7 @@ def add_node(
try:
add_node(
node, input_config_filename=config,
output_config_filename=config
output_config_filename=config, fdb_config_data=fdb_config_data,
)
if not get_dbroots(node, config):
add_dbroot(
Expand Down
293 changes: 293 additions & 0 deletions cmapi/cmapi_server/handlers/foundation_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
import json
import logging
import re
import socket
from os import replace
from typing import Tuple, Optional

from cmapi_server.constants import (
FDB_CONFIG_PATH, FDB_CLUSTER_CONFIG_PATH,
)
from cmapi_server.process_dispatchers.foundation import FDBDispatcher
from cmapi_server.exceptions import CMAPIBasicError


class FDBHandler:
    """Static helpers to configure and manage the local FoundationDB node.

    The class keeps no state: every method is a ``staticmethod`` that either
    reads/writes the FoundationDB config files or runs ``fdbcli`` through
    ``FDBDispatcher.exec_command``.
    """

    # Redundancy modes accepted by ``change_cluster_redundancy``.
    REDUNDANCY_MODES = ('single', 'double', 'triple', 'three_data_hall')
    # Port appended to every explicit coordinator address.
    COORDINATOR_PORT = 4500

    @staticmethod
    def read_config(filename: str) -> str:
        """Read config file.

        :param filename: filename
        :type filename: str
        :return: config string
        :rtype: str
        """
        with open(filename, encoding='utf-8') as fdb_file:
            return fdb_file.read()

    @staticmethod
    def read_fdb_config() -> str:
        """Read FoundationDB config file.

        :return: FoundationDB config file data
        :rtype: str
        """
        return FDBHandler.read_config(FDB_CONFIG_PATH)

    @staticmethod
    def read_fdb_cluster_config() -> str:
        """Read FoundationDB cluster config.

        :return: FoundationDB cluster config file data
        :rtype: str
        """
        return FDBHandler.read_config(FDB_CLUSTER_CONFIG_PATH)

    @staticmethod
    def write_config(filename: str, data: str) -> None:
        """Write config data to file, replacing the old file atomically.

        :param filename: filename to write
        :type filename: str
        :param data: data to write
        :type data: str
        """
        # The temporary file must live next to the target: os.replace is only
        # atomic (and only guaranteed to succeed) when source and destination
        # are on the same filesystem. A name relative to the current working
        # directory gives no such guarantee.
        tmp_filename = f'{filename}.cmapi.tmp'
        with open(tmp_filename, 'w', encoding='utf-8') as fdb_file:
            fdb_file.write(data)
        replace(tmp_filename, filename)

    @staticmethod
    def write_fdb_config(data: str) -> None:
        """Write data to FoundationDB config file.

        :param data: data to write into FoundationDB config file
        :type data: str
        """
        FDBHandler.write_config(FDB_CONFIG_PATH, data)

    @staticmethod
    def write_fdb_cluster_config(data: str) -> None:
        """Write data to FoundationDB cluster config file.

        :param data: data to write into FoundationDB cluster config file
        :type data: str
        """
        FDBHandler.write_config(FDB_CLUSTER_CONFIG_PATH, data)

    @staticmethod
    def get_node_ipaddress() -> str:
        """Get FoundationDB node ip address.

        The address is the local end of a UDP socket connected to
        ``www.foundationdb.org:80``.

        :raises Exception: re-raises whatever error prevented the lookup
        :return: FoundationDB node ip address
        :rtype: str
        """
        try:
            # ``with`` closes the socket on both the success and error path.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                sock.connect(('www.foundationdb.org', 80))
                return sock.getsockname()[0]
        except Exception:
            logging.error(
                'Could not determine node IP address.',
                exc_info=True
            )
            # TODO: try to handle it? For eg switch to internal logic w\o FDB.
            raise

    @staticmethod
    def _make_cluster_string_public(
        cluster_str: str, ip_addr: str, make_tls: bool = False
    ) -> str:
        """Validate a cluster string and point its coordinators to ip_addr.

        :param cluster_str: single-line content of the FDB cluster file
        :type cluster_str: str
        :param ip_addr: address that replaces every 127.0.0.1 coordinator
        :type ip_addr: str
        :param make_tls: append ``:tls`` to every coordinator,
                         defaults to False
        :type make_tls: bool, optional
        :raises CMAPIBasicError: if the cluster string is malformed
        :raises CMAPIBasicError: if some coordinator is not at 127.0.0.1
        :return: rewritten cluster string
        :rtype: str
        """
        if not re.match(
            r'^[a-zA-Z0-9_]*:[a-zA-Z0-9]*@'
            r'([0-9\.]*:[0-9]*(:tls)?,)*[0-9\.]*:[0-9]*(:tls)?$',
            cluster_str
        ):
            raise CMAPIBasicError('FDB cluster file is not valid')

        if not re.match(
            r'^.*@(127\.0\.0\.1:[0-9]*(:tls)?,)*127\.0\.0\.1:[0-9]*(:tls)?$',
            cluster_str
        ):
            raise CMAPIBasicError(
                'Cannot modify FDB cluster file whose coordinators are not at '
                'address 127.0.0.1'
            )

        # str.replace returns a new string; the result has to be kept,
        # otherwise the file would be rewritten with the loopback address.
        cluster_str = cluster_str.replace('127.0.0.1', ip_addr)

        if make_tls:
            # Tag every coordinator followed by a comma, then the last one.
            cluster_str = re.sub(r'([0-9]),', r'\1:tls,', cluster_str)
            if not cluster_str.endswith(':tls'):
                cluster_str += ':tls'

        return cluster_str

    @staticmethod
    def make_public(make_tls: bool = False) -> Tuple[str, bool]:
        """Make FoundationDB node externally accessed.

        This method is a rewrited make_public.py from original FoundationDB
        repo. It replaces the 127.0.0.1 coordinators in the cluster file with
        the node ip address and writes the cluster file back.

        :param make_tls: use TLS, defaults to False
        :type make_tls: bool, optional
        :raises CMAPIBasicError: if FoundationDB cluster file is invalid
        :raises CMAPIBasicError: if some coordinator address in the cluster
                                 file is not 127.0.0.1
        :return: ip address and use_tls flag
        :rtype: Tuple[str, bool]
        """
        ip_addr = FDBHandler.get_node_ipaddress()
        fdb_cluster_conf = FDBHandler.read_fdb_cluster_config()

        # A valid cluster file holds exactly one non-empty line.
        non_empty_lines = [
            line.strip()
            for line in fdb_cluster_conf.split('\n')
            if line.strip()
        ]
        if len(non_empty_lines) != 1:
            # TODO: try to handle it?
            raise CMAPIBasicError('FDB cluster file is not valid')

        cluster_str = FDBHandler._make_cluster_string_public(
            non_empty_lines[0], ip_addr, make_tls
        )
        FDBHandler.write_fdb_cluster_config(cluster_str)

        return ip_addr, ':tls' in cluster_str

    @staticmethod
    def get_status() -> dict:
        """Get FoundationDB status in json format.

        :return: dict with all FoundationDB status details
        :rtype: dict
        """
        cmd = 'fdbcli --exec "status json"'
        # NOTE(review): exec_command presumably returns the command output as
        # text, hence json.loads (json.load expects a file-like object).
        # The success flag is not checked here, same as before.
        _, output = FDBDispatcher.exec_command(cmd)
        return json.loads(output)

    @staticmethod
    def get_machines_count() -> int:
        """Get machines in FoundationDB cluster count.

        :return: machines count
        :rtype: int
        """
        return len(FDBHandler.get_status()['cluster']['machines'])

    @staticmethod
    def change_cluster_redundancy(mode: str) -> bool:
        """Change FoundationDB cluster redundancy mode.

        :param mode: FoundationDB cluster redundancy mode, one of
                     ``REDUNDANCY_MODES``
        :type mode: str
        :return: True if success, False if mode is unknown or command failed
        :rtype: bool
        """
        if mode not in FDBHandler.REDUNDANCY_MODES:
            logging.error(
                f'FDB cluster redundancy mode is wrong: {mode}. Keep old.'
            )
            # Honour the declared bool return type instead of returning None.
            return False
        cmd = f'fdbcli --exec "configure {mode}"'
        success, _ = FDBDispatcher.exec_command(cmd)
        return success

    @staticmethod
    def set_coordinators(
        nodes_ips: Optional[list] = None, auto: bool = True
    ) -> bool:
        """Set FDB cluster coordinators.

        It sets coordinators ips or `auto`. Explicit addresses take priority
        over `auto`. If `auto` used it will add all available nodes to
        coordinators, so if one coordinators is down, cluster still stay
        healthy.

        :param nodes_ips: ip addresses of the coordinators, defaults to None
        :type nodes_ips: Optional[list], optional
        :param auto: let fdbcli choose coordinators when no addresses are
                     given, defaults to True
        :type auto: bool, optional
        :return: True if success, False if there is nothing to do or the
                 command failed
        :rtype: bool
        """
        if nodes_ips:
            # fdbcli expects coordinator addresses separated by spaces.
            coordinators_string = ' '.join(
                f'{addr}:{FDBHandler.COORDINATOR_PORT}' for addr in nodes_ips
            )
        elif auto:
            coordinators_string = 'auto'
        else:
            # do nothing
            logging.warning(
                'No IP address provided to set coordinators, '
                'and auto is False. Nothing to do'
            )
            return False
        cmd = f'fdbcli --exec "coordinators {coordinators_string}"'
        success, _ = FDBDispatcher.exec_command(cmd)
        return success

    @staticmethod
    def include_all_nodes() -> bool:
        """Invoke command 'include all' in fdbcli.

        Command includes all available machines in a cluster. Mandatory if node
        added after it was removed from a cluster.

        :return: True if success
        :rtype: bool
        """
        cmd = 'fdbcli --exec "include all"'
        success, _ = FDBDispatcher.exec_command(cmd)
        return success

    @staticmethod
    def exclude_node() -> bool:
        """Exclude current machine from FoundationDB cluster.

        Method invokes command 'exclude <IP>'.

        :return: True if success
        :rtype: bool
        """
        ip_addr = FDBHandler.get_node_ipaddress()
        cmd = f'fdbcli --exec "exclude {ip_addr}"'
        success, _ = FDBDispatcher.exec_command(cmd)
        return success

    @staticmethod
    def _redundancy_mode_for(nodes_count: int) -> str:
        """Pick the redundancy mode used for a cluster of nodes_count nodes.

        :param nodes_count: expected number of machines in the cluster
        :type nodes_count: int
        :return: 'triple' for 5+ nodes, 'double' for 3-4 nodes, else 'single'
        :rtype: str
        """
        if nodes_count >= 5:
            return 'triple'
        if nodes_count >= 3:
            return 'double'
        return 'single'

    @staticmethod
    def add_to_cluster(cluster_config_data: str):
        """Add current machine to FoundationDB cluster using cluster conf data.

        :param cluster_config_data: FoundationDb cluster config data
        :type cluster_config_data: str
        """
        FDBDispatcher.start()
        FDBHandler.write_fdb_cluster_config(cluster_config_data)
        # Called on the class, like every other FDBDispatcher call here.
        FDBDispatcher.restart()
        new_nodes_count = FDBHandler.get_machines_count() + 1
        FDBHandler.change_cluster_redundancy(
            FDBHandler._redundancy_mode_for(new_nodes_count)
        )
        # TODO: add error handler
        FDBHandler.include_all_nodes()
        FDBHandler.set_coordinators(auto=True)

    @staticmethod
    def remove_from_cluster():
        """Remove current machine from FoundationDB cluster."""
        new_nodes_count = FDBHandler.get_machines_count() - 1
        FDBHandler.change_cluster_redundancy(
            FDBHandler._redundancy_mode_for(new_nodes_count)
        )
        # this operation could take a while depending on data size stored in
        # FDB. May be it could be replaced with the same command with `failed`
        # flag. Data loss? TODO: Have to be tested.
        FDBHandler.exclude_node()
        # exclude node from coordinators
        FDBHandler.set_coordinators(auto=True)
        # TODO: set single node cluster file
        FDBDispatcher.restart()
14 changes: 14 additions & 0 deletions cmapi/cmapi_server/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,20 @@ def get_dispatcher_name_and_path(
return dispatcher_name, dispatcher_path


def get_txn_handler(config_parser: configparser.ConfigParser) -> str:
    """Return the transaction handler name stored in the cmapi conf file.

    The value is taken from option ``name`` of section ``Txn_handler``;
    ``'cmapi'`` is used when the option is missing. The value is passed
    through ``dequote`` before it is returned.

    :param config_parser: cmapi conf file parser
    :type config_parser: configparser.ConfigParser
    :return: transaction handler name
    :rtype: str
    """
    raw_name = config_parser.get('Txn_handler', 'name', fallback='cmapi')
    return dequote(raw_name)


def build_url(
base_url: str, query_params: dict, scheme: str = 'https',
path: str = '', params: str = '', fragment: str = '',
Expand Down
Loading

0 comments on commit a2a3d12

Please sign in to comment.