diff --git a/broker-core/src/main/java/org/eclipse/kapua/broker/core/message/system/DefaultSystemMessageCreator.java b/broker-core/src/main/java/org/eclipse/kapua/broker/core/message/system/DefaultSystemMessageCreator.java index 687a28d40c3..a6b6e698809 100644 --- a/broker-core/src/main/java/org/eclipse/kapua/broker/core/message/system/DefaultSystemMessageCreator.java +++ b/broker-core/src/main/java/org/eclipse/kapua/broker/core/message/system/DefaultSystemMessageCreator.java @@ -11,8 +11,12 @@ *******************************************************************************/ package org.eclipse.kapua.broker.core.message.system; +import java.util.Map; + import org.eclipse.kapua.broker.core.plugin.KapuaConnectionContext; +import com.google.common.base.Splitter; + /** * Default system message creator * @@ -20,23 +24,31 @@ */ public class DefaultSystemMessageCreator implements SystemMessageCreator { - private static final String CONNECT_MESSAGE_TEMPLATE = "Device: [%s] - connected by user: [%s]"; - private static final String DISCONNECT_MESSAGE_TEMPLATE = "Device: [%s] - disconnected by user: [%s]"; + private static final String FIELD_SEPARATOR = ",,"; + private static final String PAIR_SEPARATOR = ","; + private static final String DEVICE_ID_KEY = "DeviceId"; + private static final String EVENT_KEY = "Event"; + private static final String USERNAME_KEY = "Username"; @Override public String createMessage(SystemMessageType systemMessageType, KapuaConnectionContext kbc) { - switch (systemMessageType) { - case CONNECT: - return String.format(CONNECT_MESSAGE_TEMPLATE, - kbc.getClientId(), - kbc.getUserName()); - case DISCONNECT: - return String.format(DISCONNECT_MESSAGE_TEMPLATE, - kbc.getClientId(), - kbc.getUserName()); - default: - return ""; - } + StringBuilder builder = new StringBuilder(); + builder.append(EVENT_KEY).append(PAIR_SEPARATOR).append(systemMessageType.name()); + builder.append(FIELD_SEPARATOR).append(DEVICE_ID_KEY).append(PAIR_SEPARATOR).append(kbc.getClientId()); + builder.append(FIELD_SEPARATOR).append(USERNAME_KEY).append(PAIR_SEPARATOR).append(kbc.getUserName()); + return builder.toString(); + } + + public Map convertFrom(String message) { + return Splitter.on(FIELD_SEPARATOR).withKeyValueSeparator(PAIR_SEPARATOR).split(message); + } + + public String getDeviceId(Map map) { + return map.get(DEVICE_ID_KEY); + } + + public String getUsername(Map map) { + return map.get(USERNAME_KEY); } } diff --git a/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaConnectionContext.java b/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaConnectionContext.java index fc6e869d6e6..fae12a4a505 100644 --- a/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaConnectionContext.java +++ b/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaConnectionContext.java @@ -71,9 +71,11 @@ public KapuaConnectionContext(String brokerId, ConnectionInfo info) { } } - public KapuaConnectionContext(String brokerId, KapuaPrincipal kapuaPrincipal, ConnectionInfo info, String fullClientIdPattern) { + public KapuaConnectionContext(String brokerId, String brokerIpOrHostName, KapuaPrincipal kapuaPrincipal, String accountName, ConnectionInfo info, String fullClientIdPattern) { authDestinations = new ArrayList<>(); this.brokerId = brokerId; + this.brokerIpOrHostName = brokerIpOrHostName; + this.accountName = accountName; userName = info.getUserName(); clientId = kapuaPrincipal.getClientId(); scopeId = kapuaPrincipal.getAccountId(); diff --git a/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityBrokerFilter.java b/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityBrokerFilter.java index 6e57af1f42c..ca6c86cb9c1 100644 --- a/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityBrokerFilter.java +++ b/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityBrokerFilter.java @@ -97,7 +97,6 @@ public class KapuaSecurityBrokerFilter extends BrokerFilter { protected static final Logger logger = LoggerFactory.getLogger(KapuaSecurityBrokerFilter.class); - protected static final List VT_DURABLE_PREFIX = ImmutableList.of("Consumer.{0}:AT_LEAST_ONCE.{1}", "Consumer.{0}:EXACTLY_ONCE.{1}"); protected static final String VT_CONSUMER_PREFIX = "Consumer"; @@ -484,7 +483,19 @@ public void removeConnection(ConnectionContext context, ConnectionInfo info, Thr try { KapuaSecurityContext kapuaSecurityContext = getKapuaSecurityContext(context); KapuaPrincipal kapuaPrincipal = ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()); - kcc = new KapuaConnectionContext(brokerIdResolver.getBrokerId(this), kapuaPrincipal, info, MULTI_ACCOUNT_CLIENT_ID); + //get account name + final Account account; + try { + account = KapuaSecurityUtils.doPrivileged(() -> accountService.find(kapuaPrincipal.getAccountId())); + } catch (Exception e) { + // to preserve the original exception message (if possible) + if (e instanceof AuthenticationException) { + throw (AuthenticationException) e; + } else { + throw new ShiroException("Error while find account!", e); + } + } + kcc = new KapuaConnectionContext(brokerIdResolver.getBrokerId(this), brokerIpResolver.getBrokerIpOrHostName(), kapuaPrincipal, account.getName(), info, MULTI_ACCOUNT_CLIENT_ID); kcc.updateOldConnectionId(CONNECTION_MAP.get(kcc.getFullClientId())); // TODO fix the kapua session when run as feature will be implemented KapuaSecurityUtils.setSession(new KapuaSession(kapuaPrincipal)); diff --git a/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AdminAuthenticationLogic.java b/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AdminAuthenticationLogic.java index 1faea195e30..3d007ef9838 100644 --- a/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AdminAuthenticationLogic.java +++ b/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AdminAuthenticationLogic.java @@ -18,6 +18,7 @@ import org.eclipse.kapua.KapuaException; import org.eclipse.kapua.broker.core.plugin.Acl; import org.eclipse.kapua.broker.core.plugin.KapuaConnectionContext; +import org.eclipse.kapua.broker.core.plugin.KapuaDuplicateClientIdException; /** * Admin profile authentication logic implementation @@ -41,7 +42,31 @@ public List connect(KapuaConnectionContext kcc) throws Kapua } @Override - public void disconnect(KapuaConnectionContext kcc, Throwable error) { + public boolean disconnect(KapuaConnectionContext kcc, Throwable error) { + boolean stealingLinkDetected = false; + logger.debug("Old connection id: {} - new connection id: {} - error: {} - error cause: {}", kcc.getOldConnectionId(), kcc.getConnectionId(), error, (error!=null ? error.getCause() : "null"), error); + if (kcc.getOldConnectionId() != null) { + stealingLinkDetected = !kcc.getOldConnectionId().equals(kcc.getConnectionId()); + } + else { + logger.error("Cannot find connection id for client id {} on connection map. Correct connection id is {} - IP: {}", + kcc.getClientId(), + kcc.getConnectionId(), + kcc.getClientIp()); + } + if (!stealingLinkDetected && (error instanceof KapuaDuplicateClientIdException || (error!=null && error.getCause() instanceof KapuaDuplicateClientIdException))) { + logger.warn("Detected Stealing link for cliend id {} - account id {} - last connection id was {} - current connection id is {} - IP: {}", + kcc.getClientId(), + kcc.getScopeId(), + kcc.getOldConnectionId(), + kcc.getConnectionId(), + kcc.getClientIp()); + stealingLinkDetected = true; + } + if (stealingLinkDetected) { + loginMetric.getAdminStealingLinkDisconnect().inc(); + } + return !stealingLinkDetected; } protected List buildAuthorizationMap(KapuaConnectionContext kcc) { diff --git a/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AuthenticationLogic.java b/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AuthenticationLogic.java index 2683611320c..e12cbeefe58 100644 --- a/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AuthenticationLogic.java +++ b/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AuthenticationLogic.java @@ -101,8 +101,10 @@ public abstract List connect(KapuaConnectionContext kcc) throws Kapua } @Override - public void disconnect(KapuaConnectionContext kcc, Throwable error) { + public boolean disconnect(KapuaConnectionContext kcc, Throwable error) { boolean stealingLinkDetected = false; - logger.debug("Old connection id: {} - new connection id: {} - error: {} - error cause: {}", kcc.getOldConnectionId(), kcc.getConnectionId(), error, (error!=null ? error.getCause() : "NULL"), error); + boolean deviceOwnedByTheCurrentNode = true; + logger.debug("Old connection id: {} - new connection id: {} - error: {} - error cause: {}", kcc.getOldConnectionId(), kcc.getConnectionId(), error, (error!=null ? error.getCause() : "null"), error); if (kcc.getOldConnectionId() != null) { stealingLinkDetected = !kcc.getOldConnectionId().equals(kcc.getConnectionId()); - } else { + } + else { logger.error("Cannot find connection id for client id {} on connection map. Correct connection id is {} - IP: {}", kcc.getClientId(), kcc.getConnectionId(), kcc.getClientIp()); } - if (stealingLinkDetected) { - loginMetric.getStealingLinkDisconnect().inc(); - // stealing link detected, skip info + if (!stealingLinkDetected && (error instanceof KapuaDuplicateClientIdException || (error!=null && error.getCause() instanceof KapuaDuplicateClientIdException))) { + stealingLinkDetected = true; logger.warn("Detected Stealing link for cliend id {} - account id {} - last connection id was {} - current connection id is {} - IP: {} - No disconnection info will be added!", kcc.getClientId(), kcc.getScopeId(), kcc.getOldConnectionId(), kcc.getConnectionId(), kcc.getClientIp()); - } else { + } + if (stealingLinkDetected) { + loginMetric.getStealingLinkDisconnect().inc(); + logger.debug("Skip device connection status update since is coming from a stealing link condition. Client id: {} - Connection id: {}", + kcc.getClientId(), + kcc.getConnectionId()); + } + else { + // update device connection (if the disconnection wasn't caused by a stealing link) DeviceConnection deviceConnection; try { deviceConnection = KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.findByClientId(kcc.getScopeId(), kcc.getClientId())); } catch (Exception e) { - throw new ShiroException("Error while looking for device connection on updating the device!", e); + throw new ShiroException("Error while looking for device connection on updating the device status!", e); } + // the device connection must be not null if (deviceConnection != null) { - // the device connection must be not null - // update device connection (if the disconnection wasn't caused by a stealing link) - if (error instanceof KapuaDuplicateClientIdException || (error!=null && error.getCause() instanceof KapuaDuplicateClientIdException)) { - logger.debug("Skip device connection status update since is coming from a stealing link condition. Client id: {} - Connection id: {}", + if (kcc.getBrokerIpOrHostName() == null) { + logger.warn("Broker Ip or host name is not correctly set! Please check the configuration!"); + } + else if (!kcc.getBrokerIpOrHostName().equals(deviceConnection.getServerIp())) { + //the device is connected to a different node so skip to update the status! + deviceOwnedByTheCurrentNode = false; + logger.warn("Detected disconnection for client connected to another node: cliend id {} - account id {} - last connection id was {} - current connection id is {} - IP: {} - No disconnection info will be added!", kcc.getClientId(), - kcc.getConnectionId()); - } else { + kcc.getScopeId(), + kcc.getOldConnectionId(), + kcc.getConnectionId(), + kcc.getClientIp()); + } + if(deviceOwnedByTheCurrentNode) { deviceConnection.setStatus(error == null ? DeviceConnectionStatus.DISCONNECTED : DeviceConnectionStatus.MISSING); try { KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.update(deviceConnection)); @@ -143,6 +160,7 @@ public void disconnect(KapuaConnectionContext kcc, Throwable error) { } } } + return !stealingLinkDetected && deviceOwnedByTheCurrentNode; } @Override diff --git a/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/LoginMetric.java b/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/LoginMetric.java index c7f6e321778..af29aeafda1 100644 --- a/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/LoginMetric.java +++ b/broker-core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/LoginMetric.java @@ -28,6 +28,7 @@ public class LoginMetric { private Counter normalUserAttempt; private Counter stealingLinkConnect; private Counter stealingLinkDisconnect; + private Counter adminStealingLinkDisconnect; protected Counter remoteStealingLinkDisconnect; private Timer addConnectionTime; private Timer normalUserTime; @@ -54,6 +55,7 @@ private LoginMetric() { normalUserAttempt = metricsService.getCounter("security", "login", "normal", "count"); stealingLinkConnect = metricsService.getCounter("security", "login", "stealing_link", "connect", "count"); stealingLinkDisconnect = metricsService.getCounter("security", "login", "stealing_link", "disconnect", "count"); + adminStealingLinkDisconnect = metricsService.getCounter("security", "login", "admin_stealing_link", "disconnect", "count"); remoteStealingLinkDisconnect = metricsService.getCounter("security", "login", "remote_stealing_link", "disconnect", "count"); // login time addConnectionTime = metricsService.getTimer("security", "login", "add_connection", "time", "s"); @@ -103,6 +105,14 @@ public Counter getRemoteStealingLinkDisconnect() { return remoteStealingLinkDisconnect; } + public Counter getAdminStealingLinkDisconnect() { + return adminStealingLinkDisconnect; + } + + public void setAdminStealingLinkDisconnect(Counter adminStealingLinkDisconnect) { + this.adminStealingLinkDisconnect = adminStealingLinkDisconnect; + } + public Timer getAddConnectionTime() { return addConnectionTime; }