Skip to content

Commit

Permalink
Merge pull request #4299 from c-po/radius-smoketest
Browse files Browse the repository at this point in the history
T7038: T7039: fix broken RADIUS IPv6 source address and add smoketests
  • Loading branch information
c-po authored Jan 13, 2025
2 parents e858172 + 21b2541 commit 99d0c7a
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 82 deletions.
4 changes: 2 additions & 2 deletions data/templates/login/pam_radius_auth.conf.j2
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
{% if address | is_ipv4 %}
{% set source_address.ipv4 = address %}
{% elif address | is_ipv6 %}
{% set source_address.ipv6 = "[" + address + "]" %}
{% set source_address.ipv6 = address %}
{% endif %}
{% endfor %}
{% endif %}
Expand All @@ -21,7 +21,7 @@
{% if server | is_ipv4 %}
{{ server }}:{{ options.port }} {{ "%-25s" | format(options.key) }} {{ "%-10s" | format(options.timeout) }} {{ source_address.ipv4 if source_address.ipv4 is vyos_defined }}
{% else %}
[{{ server }}]:{{ options.port }} {{ "%-25s" | format(options.key) }} {{ "%-10s" | format(options.timeout) }} {{ source_address.ipv6 if source_address.ipv6 is vyos_defined }}
{{ server | bracketize_ipv6 }}:{{ options.port }} {{ "%-25s" | format(options.key) }} {{ "%-10s" | format(options.timeout) }} {{ source_address.ipv6 if source_address.ipv6 is vyos_defined }}
{% endif %}
{% endfor %}
{% endif %}
Expand Down
6 changes: 6 additions & 0 deletions debian/vyos-1x-smoketest.postinst
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,9 @@ TACPLUS_PATH="/usr/share/vyos/tacplus-alpine.tar"
if [[ ! -f $TACPLUS_PATH ]]; then
skopeo copy --additional-tag "$TACPLUS_TAG" "docker://$TACPLUS_TAG" "docker-archive:/$TACPLUS_PATH"
fi

RADIUS_TAG="docker.io/dchidell/radius-web:latest"
RADIUS_PATH="/usr/share/vyos/radius-latest.tar"
if [[ ! -f $RADIUS_PATH ]]; then
skopeo copy --additional-tag "$RADIUS_TAG" "docker://$RADIUS_TAG" "docker-archive:/$RADIUS_PATH"
fi
210 changes: 130 additions & 80 deletions smoketest/scripts/cli/test_system_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,19 @@
from pwd import getpwall

from vyos.configsession import ConfigSessionError
from vyos.configquery import ConfigTreeQuery
from vyos.utils.auth import get_current_user
from vyos.utils.process import cmd
from vyos.utils.process import process_named_running
from vyos.utils.file import read_file
from vyos.utils.file import write_file
from vyos.template import inc_ip
from vyos.template import is_ipv6
from vyos.xml_ref import default_value

base_path = ['system', 'login']
users = ['vyos1', 'vyos-roxx123', 'VyOS-123_super.Nice']

SSH_PROCESS_NAME = 'sshd'
ssh_test_command = '/opt/vyatta/bin/vyatta-op-cmd-wrapper show version'

ssh_pubkey = """
AAAAB3NzaC1yc2EAAAADAQABAAABgQD0NuhUOEtMIKnUVFIHoFatqX/c4mjerXyF
Expand All @@ -57,7 +59,6 @@

tac_image = 'docker.io/lfkeitel/tacacs_plus:alpine'
tac_image_path = '/usr/share/vyos/tacplus-alpine.tar'

TAC_PLUS_TMPL_SRC = """
id = spawnd {
debug redirect = /dev/stdout
Expand Down Expand Up @@ -100,6 +101,25 @@
member = admin
}
}
"""

radius_image = 'docker.io/dchidell/radius-web:latest'
radius_image_path = '/usr/share/vyos/radius-latest.tar'
RADIUS_CLIENTS_TMPL_SRC = """
client SMOKETEST {
secret = {{ radius_key }}
nastype = other
ipaddr = {{ source_address }}
}
"""
RADIUS_USERS_TMPL_SRC = """
# User configuration
{{ username }} Cleartext-Password := "{{ password }}"
Service-Type = NAS-Prompt-User,
Cisco-AVPair = "shell:priv-lvl=15"
"""

class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
Expand All @@ -112,16 +132,36 @@ def setUpClass(cls):
cls.cli_delete(cls, base_path + ['radius'])
cls.cli_delete(cls, base_path + ['tacacs'])

# Load image for smoketest provided in vyos-1x-smoketest
# Load images for smoketest provided in vyos-1x-smoketest
if not os.path.exists(tac_image_path):
cls.fail(cls, f'{tac_image} image not available')
cmd(f'sudo podman load -i {tac_image_path}')

if not os.path.exists(radius_image_path):
cls.fail(cls, f'{radius_image} image not available')
cmd(f'sudo podman load -i {radius_image_path}')

cls.ssh_test_command_result = cls.op_mode(cls, ['show', 'version'])

# Dynamically start SSH service if it's not running
config = ConfigTreeQuery()
cls.is_sshd_pre_test = config.exists(['service', 'sshd'])
if not cls.is_sshd_pre_test:
# Start SSH service
cls.cli_set(cls, ['service', 'ssh'])

@classmethod
def tearDownClass(cls):
# Stop SSH service - if it was not running before starting the test
if not cls.is_sshd_pre_test:
cls.cli_set(cls, ['service', 'ssh'])
cls.cli_commit(cls)

super(TestSystemLogin, cls).tearDownClass()
# Cleanup podman image

# Cleanup container images
cmd(f'sudo podman image rm -f {tac_image}')
cmd(f'sudo podman image rm -f {radius_image}')

def tearDown(self):
# Delete individual users from configuration
Expand Down Expand Up @@ -152,9 +192,6 @@ def test_add_linux_system_user(self):
self.cli_delete(base_path + ['user', system_user])

def test_system_login_user(self):
# Check if user can be created and we can SSH to localhost
self.cli_set(['service', 'ssh', 'port', '22'])

for user in users:
name = f'VyOS Roxx {user}'
home_dir = f'/tmp/smoketest/{user}'
Expand Down Expand Up @@ -240,71 +277,71 @@ def test_radius_kernel_features(self):
self.assertIn(f'{option}=y', kernel_config)

def test_system_login_radius_ipv4(self):
# Verify generated RADIUS configuration files

radius_key = 'VyOSsecretVyOS'
radius_server = '172.16.100.10'
radius_source = '127.0.0.1'
radius_port = '2000'
radius_timeout = '1'

self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key])
self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port])
self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout])
self.cli_set(base_path + ['radius', 'source-address', radius_source])
self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
radius_servers = ['100.64.0.4', '100.64.0.5']
radius_source = '100.64.0.1'
self._system_login_radius_test_helper(radius_servers, radius_source)

# check validate() - Only one IPv4 source-address supported
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])

self.cli_commit()
def test_system_login_radius_ipv6(self):
radius_servers = ['2001:db8::4', '2001:db8::5']
radius_source = '2001:db8::1'
self._system_login_radius_test_helper(radius_servers, radius_source)

# this file must be read with higher permissions
pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf')
tmp = re.findall(r'\n?{}:{}\s+{}\s+{}\s+{}'.format(radius_server,
radius_port, radius_key, radius_timeout,
radius_source), pam_radius_auth_conf)
self.assertTrue(tmp)
def _system_login_radius_test_helper(self, radius_servers: list, radius_source: str):
# Verify generated RADIUS configuration files
radius_key = ''.join(secrets.choice(string.ascii_letters + string.digits) for i in range(10))

# required, static options
self.assertIn('priv-lvl 15', pam_radius_auth_conf)
self.assertIn('mapped_priv_user radius_priv_user', pam_radius_auth_conf)
default_port = default_value(base_path + ['radius', 'server', radius_servers[0], 'port'])
default_timeout = default_value(base_path + ['radius', 'server', radius_servers[0], 'timeout'])

# PAM
pam_common_account = read_file('/etc/pam.d/common-account')
self.assertIn('pam_radius_auth.so', pam_common_account)
dummy_if = 'dum12760'

pam_common_auth = read_file('/etc/pam.d/common-auth')
self.assertIn('pam_radius_auth.so', pam_common_auth)
# Load container image for FreeRADIUS server
radius_config = '/tmp/smoketest-radius-server'
radius_container_path = ['container', 'name', 'radius-1']

pam_common_session = read_file('/etc/pam.d/common-session')
self.assertIn('pam_radius_auth.so', pam_common_session)

pam_common_session_noninteractive = read_file('/etc/pam.d/common-session-noninteractive')
self.assertIn('pam_radius_auth.so', pam_common_session_noninteractive)
# Generate random string with 10 digits
username = 'radius-admin'
password = ''.join(secrets.choice(string.ascii_letters + string.digits) for i in range(10))
radius_source_mask = '32'
if is_ipv6(radius_source):
radius_source_mask = '128'
radius_test_user = {
'username' : username,
'password' : password,
'radius_key' : radius_key,
'source_address' : f'{radius_source}/{radius_source_mask}'
}

# NSS
nsswitch_conf = read_file('/etc/nsswitch.conf')
tmp = re.findall(r'passwd:\s+mapuid\s+files\s+mapname', nsswitch_conf)
self.assertTrue(tmp)
tmpl = jinja2.Template(RADIUS_CLIENTS_TMPL_SRC)
write_file(f'{radius_config}/clients.cfg', tmpl.render(radius_test_user))

tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf)
self.assertTrue(tmp)
tmpl = jinja2.Template(RADIUS_USERS_TMPL_SRC)
write_file(f'{radius_config}/users', tmpl.render(radius_test_user))

def test_system_login_radius_ipv6(self):
# Verify generated RADIUS configuration files
# Start tac_plus container
self.cli_set(radius_container_path + ['allow-host-networks'])
self.cli_set(radius_container_path + ['image', radius_image])
self.cli_set(radius_container_path + ['volume', 'clients', 'destination', '/etc/raddb/clients.conf'])
self.cli_set(radius_container_path + ['volume', 'clients', 'mode', 'ro'])
self.cli_set(radius_container_path + ['volume', 'clients', 'source', f'{radius_config}/clients.cfg'])
self.cli_set(radius_container_path + ['volume', 'users', 'destination', '/etc/raddb/users'])
self.cli_set(radius_container_path + ['volume', 'users', 'mode', 'ro'])
self.cli_set(radius_container_path + ['volume', 'users', 'source', f'{radius_config}/users'])

radius_key = 'VyOS-VyOS'
radius_server = '2001:db8::1'
radius_source = '::1'
radius_port = '4000'
radius_timeout = '4'
# Start container
self.cli_commit()

self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key])
self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port])
self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout])
# Deinfine RADIUS servers
for radius_server in radius_servers:
# Use this system as "remote" RADIUS server
dummy_address_mask = '32'
if is_ipv6(radius_server):
dummy_address_mask = '128'
self.cli_set(['interfaces', 'dummy', dummy_if, 'address', f'{radius_server}/{dummy_address_mask}'])
self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key])

# Define RADIUS traffic source address
self.cli_set(['interfaces', 'dummy', dummy_if, 'address', f'{radius_source}/{radius_source_mask}'])
self.cli_set(base_path + ['radius', 'source-address', radius_source])
self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])

Expand All @@ -317,10 +354,13 @@ def test_system_login_radius_ipv6(self):

# this file must be read with higher permissions
pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf')
tmp = re.findall(r'\n?\[{}\]:{}\s+{}\s+{}\s+\[{}\]'.format(radius_server,
radius_port, radius_key, radius_timeout,
radius_source), pam_radius_auth_conf)
self.assertTrue(tmp)

for radius_server in radius_servers:
if is_ipv6(radius_server):
# it is essential to escape the [] brackets when searching with a regex
radius_server = rf'\[{radius_server}\]'
tmp = re.findall(rf'\n?{radius_server}:{default_port}\s+{radius_key}\s+{default_timeout}\s+{radius_source}', pam_radius_auth_conf)
self.assertTrue(tmp)

# required, static options
self.assertIn('priv-lvl 15', pam_radius_auth_conf)
Expand All @@ -347,6 +387,27 @@ def test_system_login_radius_ipv6(self):
tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf)
self.assertTrue(tmp)

# Login with proper credentials
out, err = self.ssh_send_cmd(ssh_test_command, username, password)
# verify login
self.assertFalse(err)
self.assertEqual(out, self.ssh_test_command_result)

# Login with invalid credentials
with self.assertRaises(paramiko.ssh_exception.AuthenticationException):
_, _ = self.ssh_send_cmd(ssh_test_command, username, f'{password}1')

# Remove RADIUS configuration
self.cli_delete(base_path + ['radius'])
# Remove RADIUS container
self.cli_delete(radius_container_path)
# Remove dummy interface
self.cli_delete(['interfaces', 'dummy', dummy_if])
self.cli_commit()

# Remove rendered tac_plus daemon configuration
shutil.rmtree(radius_config)

def test_system_login_max_login_session(self):
max_logins = '2'
timeout = '600'
Expand Down Expand Up @@ -390,12 +451,6 @@ def test_system_login_tacacs(self):
tmpl = jinja2.Template(TAC_PLUS_TMPL_SRC)
write_file(f'{tac_plus_config}/tac_plus.cfg', tmpl.render(tac_test_user))

# Check if SSH service is running
ssh_running = process_named_running(SSH_PROCESS_NAME)
if not ssh_running:
# Start SSH service
self.cli_set(['service', 'ssh'])

# Start tac_plus container
self.cli_set(tac_container_path + ['allow-host-networks'])
self.cli_set(tac_container_path + ['image', tac_image])
Expand Down Expand Up @@ -450,15 +505,14 @@ def test_system_login_tacacs(self):
self.assertIn(f'server={server}', nss_tacacs_conf)

# Login with proper credentials
test_command = 'uname -a'
out, err = self.ssh_send_cmd(test_command, username, password)
out, err = self.ssh_send_cmd(ssh_test_command, username, password)
# verify login
self.assertFalse(err)
self.assertEqual(out, cmd(test_command))
self.assertEqual(out, self.ssh_test_command_result)

# Login with invalid credentials
with self.assertRaises(paramiko.ssh_exception.AuthenticationException):
_, _ = self.ssh_send_cmd(test_command, username, f'{password}1')
_, _ = self.ssh_send_cmd(ssh_test_command, username, f'{password}1')

# Remove TACACS configuration
self.cli_delete(base_path + ['tacacs'])
Expand All @@ -471,10 +525,6 @@ def test_system_login_tacacs(self):
# Remove rendered tac_plus daemon configuration
shutil.rmtree(tac_plus_config)

# Stop SSH service if it was not running before
if not ssh_running:
self.cli_delete(['service', 'ssh'])

def test_delete_current_user(self):
current_user = get_current_user()

Expand Down

0 comments on commit 99d0c7a

Please sign in to comment.