Skip to content

Commit

Permalink
fix disconnect event issue (#2219)
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <riccardo.modanese@eurotech.com>
  • Loading branch information
riccardomodanese authored and Coduz committed Dec 18, 2018
1 parent 200b831 commit 0869254
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,44 @@
*******************************************************************************/
package org.eclipse.kapua.broker.core.message.system;

import java.util.Map;

import org.eclipse.kapua.broker.core.plugin.KapuaConnectionContext;

import com.google.common.base.Splitter;

/**
* Default system message creator
*
* @since 1.0
*/
public class DefaultSystemMessageCreator implements SystemMessageCreator {

private static final String CONNECT_MESSAGE_TEMPLATE = "Device: [%s] - connected by user: [%s]";
private static final String DISCONNECT_MESSAGE_TEMPLATE = "Device: [%s] - disconnected by user: [%s]";
private static final String FIELD_SEPARATOR = ",,";
private static final String PAIR_SEPARATOR = ",";
private static final String DEVICE_ID_KEY = "DeviceId";
private static final String EVENT_KEY = "Event";
private static final String USERNAME_KEY = "Username";

@Override
public String createMessage(SystemMessageType systemMessageType, KapuaConnectionContext kbc) {
switch (systemMessageType) {
case CONNECT:
return String.format(CONNECT_MESSAGE_TEMPLATE,
kbc.getClientId(),
kbc.getUserName());
case DISCONNECT:
return String.format(DISCONNECT_MESSAGE_TEMPLATE,
kbc.getClientId(),
kbc.getUserName());
default:
return "";
}
StringBuilder builder = new StringBuilder();
builder.append(EVENT_KEY).append(PAIR_SEPARATOR).append(systemMessageType.name());
builder.append(FIELD_SEPARATOR).append(DEVICE_ID_KEY).append(PAIR_SEPARATOR).append(kbc.getClientId());
builder.append(FIELD_SEPARATOR).append(USERNAME_KEY).append(PAIR_SEPARATOR).append(kbc.getUserName());
return builder.toString();
}

public Map<String, String> convertFrom(String message) {
return Splitter.on(FIELD_SEPARATOR).withKeyValueSeparator(PAIR_SEPARATOR).split(message);
}

public String getDeviceId(Map<String, String> map) {
return map.get(DEVICE_ID_KEY);
}

public String getUsername(Map<String, String> map) {
return map.get(USERNAME_KEY);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@ public KapuaConnectionContext(String brokerId, ConnectionInfo info) {
}
}

public KapuaConnectionContext(String brokerId, KapuaPrincipal kapuaPrincipal, ConnectionInfo info, String fullClientIdPattern) {
public KapuaConnectionContext(String brokerId, String brokerIpOrHostName, KapuaPrincipal kapuaPrincipal, String accountName, ConnectionInfo info, String fullClientIdPattern) {
authDestinations = new ArrayList<>();
this.brokerId = brokerId;
this.brokerIpOrHostName = brokerIpOrHostName;
this.accountName = accountName;
userName = info.getUserName();
clientId = kapuaPrincipal.getClientId();
scopeId = kapuaPrincipal.getAccountId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
public class KapuaSecurityBrokerFilter extends BrokerFilter {

protected static final Logger logger = LoggerFactory.getLogger(KapuaSecurityBrokerFilter.class);

protected static final List<String> VT_DURABLE_PREFIX = ImmutableList.of("Consumer.{0}:AT_LEAST_ONCE.{1}", "Consumer.{0}:EXACTLY_ONCE.{1}");
protected static final String VT_CONSUMER_PREFIX = "Consumer";

Expand Down Expand Up @@ -484,7 +483,19 @@ public void removeConnection(ConnectionContext context, ConnectionInfo info, Thr
try {
KapuaSecurityContext kapuaSecurityContext = getKapuaSecurityContext(context);
KapuaPrincipal kapuaPrincipal = ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal());
kcc = new KapuaConnectionContext(brokerIdResolver.getBrokerId(this), kapuaPrincipal, info, MULTI_ACCOUNT_CLIENT_ID);
//get account name
final Account account;
try {
account = KapuaSecurityUtils.doPrivileged(() -> accountService.find(kapuaPrincipal.getAccountId()));
} catch (Exception e) {
// to preserve the original exception message (if possible)
if (e instanceof AuthenticationException) {
throw (AuthenticationException) e;
} else {
throw new ShiroException("Error while find account!", e);
}
}
kcc = new KapuaConnectionContext(brokerIdResolver.getBrokerId(this), brokerIpResolver.getBrokerIpOrHostName(), kapuaPrincipal, account.getName(), info, MULTI_ACCOUNT_CLIENT_ID);
kcc.updateOldConnectionId(CONNECTION_MAP.get(kcc.getFullClientId()));
// TODO fix the kapua session when run as feature will be implemented
KapuaSecurityUtils.setSession(new KapuaSession(kapuaPrincipal));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.broker.core.plugin.Acl;
import org.eclipse.kapua.broker.core.plugin.KapuaConnectionContext;
import org.eclipse.kapua.broker.core.plugin.KapuaDuplicateClientIdException;

/**
* Admin profile authentication logic implementation
Expand All @@ -41,7 +42,31 @@ public List<AuthorizationEntry> connect(KapuaConnectionContext kcc) throws Kapua
}

@Override
public void disconnect(KapuaConnectionContext kcc, Throwable error) {
public boolean disconnect(KapuaConnectionContext kcc, Throwable error) {
boolean stealingLinkDetected = false;
logger.debug("Old connection id: {} - new connection id: {} - error: {} - error cause: {}", kcc.getOldConnectionId(), kcc.getConnectionId(), error, (error!=null ? error.getCause() : "null"), error);
if (kcc.getOldConnectionId() != null) {
stealingLinkDetected = !kcc.getOldConnectionId().equals(kcc.getConnectionId());
}
else {
logger.error("Cannot find connection id for client id {} on connection map. Correct connection id is {} - IP: {}",
kcc.getClientId(),
kcc.getConnectionId(),
kcc.getClientIp());
}
if (!stealingLinkDetected && (error instanceof KapuaDuplicateClientIdException || (error!=null && error.getCause() instanceof KapuaDuplicateClientIdException))) {
logger.warn("Detected Stealing link for cliend id {} - account id {} - last connection id was {} - current connection id is {} - IP: {}",
kcc.getClientId(),
kcc.getScopeId(),
kcc.getOldConnectionId(),
kcc.getConnectionId(),
kcc.getClientIp());
stealingLinkDetected = true;
}
if (stealingLinkDetected) {
loginMetric.getAdminStealingLinkDisconnect().inc();
}
return !stealingLinkDetected;
}

protected List<AuthorizationEntry> buildAuthorizationMap(KapuaConnectionContext kcc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ public abstract List<org.eclipse.kapua.broker.core.plugin.authentication.Authori
*
* @param kcc
* @param error
* @return true send disconnect message (if the disconnection is a clean disconnection)
* false don't send disconnect message (the disconnection is caused by a stealing link or the device is currently connected to another node)
*/
public abstract void disconnect(KapuaConnectionContext kcc, Throwable error);
public abstract boolean disconnect(KapuaConnectionContext kcc, Throwable error);

/**
* @param kcc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,51 +96,45 @@ public void disconnect(KapuaConnectionContext kcc, Throwable error) {
adminAuthenticationLogic.disconnect(kcc, error);
} else {
clientMetric.getDisconnectionClient().inc();
userAuthenticationLogic.disconnect(kcc, error);
sendDisconnectMessage(kcc);
if (userAuthenticationLogic.disconnect(kcc, error)) {
sendDisconnectMessage(kcc);
}
}
}

@Override
public void sendConnectMessage(KapuaConnectionContext kcc) {
Context loginSendLogingUpdateMsgTimeContex = loginMetric.getSendLoginUpdateMsgTime().time();
String message = systemMessageCreator.createMessage(SystemMessageType.CONNECT, kcc);
JmsAssistantProducerWrapper producerWrapper = null;
try {
producerWrapper = JmsAssistantProducerPool.getIOnstance(DESTINATIONS.NO_DESTINATION).borrowObject();
producerWrapper.send(String.format((String) options.get(Authenticator.ADDRESS_CONNECT_PATTERN_KEY),
SystemSetting.getInstance().getMessageClassifier(), kcc.getAccountName(), kcc.getClientId()),
message,
kcc);
} catch (Exception e) {
logger.error("Exception sending the connect message: {}", e.getMessage(), e);
} finally {
if (producerWrapper != null) {
JmsAssistantProducerPool.getIOnstance(DESTINATIONS.NO_DESTINATION).returnObject(producerWrapper);
}
}
loginSendLogingUpdateMsgTimeContex.stop();
sendMessage(kcc, Authenticator.ADDRESS_CONNECT_PATTERN_KEY, SystemMessageType.CONNECT);
}

@Override
public void sendDisconnectMessage(KapuaConnectionContext kcc) {
Context loginSendLogingUpdateMsgTimeContex = loginMetric.getSendLoginUpdateMsgTime().time();
String message = systemMessageCreator.createMessage(SystemMessageType.DISCONNECT, kcc);
JmsAssistantProducerWrapper producerWrapper = null;
try {
producerWrapper = JmsAssistantProducerPool.getIOnstance(DESTINATIONS.NO_DESTINATION).borrowObject();
producerWrapper.send(String.format((String) options.get(Authenticator.ADDRESS_DISCONNECT_PATTERN_KEY),
SystemSetting.getInstance().getMessageClassifier(), kcc.getAccountName(), kcc.getClientId()),
message,
kcc);
} catch (Exception e) {
logger.error("Exception sending the connect message: {}", e.getMessage(), e);
} finally {
if (producerWrapper != null) {
JmsAssistantProducerPool.getIOnstance(DESTINATIONS.NO_DESTINATION).returnObject(producerWrapper);
sendMessage(kcc, Authenticator.ADDRESS_DISCONNECT_PATTERN_KEY, SystemMessageType.DISCONNECT);
}

private void sendMessage(KapuaConnectionContext kcc, String messageAddressPattern, SystemMessageType systemMessageType) {
if (systemMessageType != null) {
Context loginSendLogingUpdateMsgTimeContex = loginMetric.getSendLoginUpdateMsgTime().time();
String message = systemMessageCreator.createMessage(systemMessageType, kcc);
JmsAssistantProducerWrapper producerWrapper = null;
try {
producerWrapper = JmsAssistantProducerPool.getIOnstance(DESTINATIONS.NO_DESTINATION).borrowObject();
producerWrapper.send(String.format((String) options.get(messageAddressPattern),
SystemSetting.getInstance().getMessageClassifier(), kcc.getAccountName(), kcc.getClientId()),
message,
kcc);
} catch (Exception e) {
logger.error("Exception sending the {} message: {}", systemMessageType.name().toLowerCase(), e.getMessage(), e);
} finally {
if (producerWrapper != null) {
JmsAssistantProducerPool.getIOnstance(DESTINATIONS.NO_DESTINATION).returnObject(producerWrapper);
}
}
loginSendLogingUpdateMsgTimeContex.stop();
}
else {
logger.warn("Cannot send system message for address pattern {} since the system message type is null!", messageAddressPattern);
}
loginSendLogingUpdateMsgTimeContex.stop();
}

protected boolean isAdminUser(KapuaConnectionContext kcc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,41 +99,58 @@ public List<AuthorizationEntry> connect(KapuaConnectionContext kcc) throws Kapua
}

@Override
public void disconnect(KapuaConnectionContext kcc, Throwable error) {
public boolean disconnect(KapuaConnectionContext kcc, Throwable error) {
boolean stealingLinkDetected = false;
logger.debug("Old connection id: {} - new connection id: {} - error: {} - error cause: {}", kcc.getOldConnectionId(), kcc.getConnectionId(), error, (error!=null ? error.getCause() : "NULL"), error);
boolean deviceOwnedByTheCurrentNode = true;
logger.debug("Old connection id: {} - new connection id: {} - error: {} - error cause: {}", kcc.getOldConnectionId(), kcc.getConnectionId(), error, (error!=null ? error.getCause() : "null"), error);
if (kcc.getOldConnectionId() != null) {
stealingLinkDetected = !kcc.getOldConnectionId().equals(kcc.getConnectionId());
} else {
}
else {
logger.error("Cannot find connection id for client id {} on connection map. Correct connection id is {} - IP: {}",
kcc.getClientId(),
kcc.getConnectionId(),
kcc.getClientIp());
}
if (stealingLinkDetected) {
loginMetric.getStealingLinkDisconnect().inc();
// stealing link detected, skip info
if (!stealingLinkDetected && (error instanceof KapuaDuplicateClientIdException || (error!=null && error.getCause() instanceof KapuaDuplicateClientIdException))) {
stealingLinkDetected = true;
logger.warn("Detected Stealing link for cliend id {} - account id {} - last connection id was {} - current connection id is {} - IP: {} - No disconnection info will be added!",
kcc.getClientId(),
kcc.getScopeId(),
kcc.getOldConnectionId(),
kcc.getConnectionId(),
kcc.getClientIp());
} else {
}
if (stealingLinkDetected) {
loginMetric.getStealingLinkDisconnect().inc();
logger.debug("Skip device connection status update since is coming from a stealing link condition. Client id: {} - Connection id: {}",
kcc.getClientId(),
kcc.getConnectionId());
}
else {
// update device connection (if the disconnection wasn't caused by a stealing link)
DeviceConnection deviceConnection;
try {
deviceConnection = KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.findByClientId(kcc.getScopeId(), kcc.getClientId()));
} catch (Exception e) {
throw new ShiroException("Error while looking for device connection on updating the device!", e);
throw new ShiroException("Error while looking for device connection on updating the device status!", e);
}
// the device connection must be not null
if (deviceConnection != null) {
// the device connection must be not null
// update device connection (if the disconnection wasn't caused by a stealing link)
if (error instanceof KapuaDuplicateClientIdException || (error!=null && error.getCause() instanceof KapuaDuplicateClientIdException)) {
logger.debug("Skip device connection status update since is coming from a stealing link condition. Client id: {} - Connection id: {}",
if (kcc.getBrokerIpOrHostName() == null) {
logger.warn("Broker Ip or host name is not correctly set! Please check the configuration!");
}
else if (!kcc.getBrokerIpOrHostName().equals(deviceConnection.getServerIp())) {
//the device is connected to a different node so skip to update the status!
deviceOwnedByTheCurrentNode = false;
logger.warn("Detected disconnection for client connected to another node: cliend id {} - account id {} - last connection id was {} - current connection id is {} - IP: {} - No disconnection info will be added!",
kcc.getClientId(),
kcc.getConnectionId());
} else {
kcc.getScopeId(),
kcc.getOldConnectionId(),
kcc.getConnectionId(),
kcc.getClientIp());
}
if(deviceOwnedByTheCurrentNode) {
deviceConnection.setStatus(error == null ? DeviceConnectionStatus.DISCONNECTED : DeviceConnectionStatus.MISSING);
try {
KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.update(deviceConnection));
Expand All @@ -143,6 +160,7 @@ public void disconnect(KapuaConnectionContext kcc, Throwable error) {
}
}
}
return !stealingLinkDetected && deviceOwnedByTheCurrentNode;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class LoginMetric {
private Counter normalUserAttempt;
private Counter stealingLinkConnect;
private Counter stealingLinkDisconnect;
private Counter adminStealingLinkDisconnect;
protected Counter remoteStealingLinkDisconnect;
private Timer addConnectionTime;
private Timer normalUserTime;
Expand All @@ -54,6 +55,7 @@ private LoginMetric() {
normalUserAttempt = metricsService.getCounter("security", "login", "normal", "count");
stealingLinkConnect = metricsService.getCounter("security", "login", "stealing_link", "connect", "count");
stealingLinkDisconnect = metricsService.getCounter("security", "login", "stealing_link", "disconnect", "count");
adminStealingLinkDisconnect = metricsService.getCounter("security", "login", "admin_stealing_link", "disconnect", "count");
remoteStealingLinkDisconnect = metricsService.getCounter("security", "login", "remote_stealing_link", "disconnect", "count");
// login time
addConnectionTime = metricsService.getTimer("security", "login", "add_connection", "time", "s");
Expand Down Expand Up @@ -103,6 +105,14 @@ public Counter getRemoteStealingLinkDisconnect() {
return remoteStealingLinkDisconnect;
}

public Counter getAdminStealingLinkDisconnect() {
return adminStealingLinkDisconnect;
}

public void setAdminStealingLinkDisconnect(Counter adminStealingLinkDisconnect) {
this.adminStealingLinkDisconnect = adminStealingLinkDisconnect;
}

public Timer getAddConnectionTime() {
return addConnectionTime;
}
Expand Down

0 comments on commit 0869254

Please sign in to comment.