-
Notifications
You must be signed in to change notification settings - Fork 899
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix workers not restarting when auth/endpoints are changed. #22269
base: master
Are you sure you want to change the base?
Changes from all commits
2c4ff02
8ad2e53
e250c81
6a43cb9
2016808
6f5a865
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -176,18 +176,41 @@ def self.create_from_params(params, endpoints, authentications) | |
|
||
def edit_with_params(params, endpoints, authentications) | ||
tap do |ems| | ||
endpoints_changed = false | ||
authentications_changed = false | ||
|
||
endpoint_changes = {} | ||
authentication_changes = {} | ||
|
||
transaction do | ||
# Remove endpoints/attributes that are not arriving in the arguments above | ||
ems.endpoints.where.not(:role => nil).where.not(:role => endpoints.map { |ep| ep['role'] }).delete_all | ||
ems.authentications.where.not(:authtype => nil).where.not(:authtype => authentications.map { |au| au['authtype'] }).delete_all | ||
endpoints_to_delete = ems.endpoints.where.not(:role => nil).where.not(:role => endpoints.map { |ep| ep['role'] }) | ||
authentications_to_delete = ems.authentications.where.not(:authtype => nil).where.not(:authtype => authentications.map { |au| au['authtype'] }) | ||
|
||
endpoints_changed ||= endpoints_to_delete.delete_all > 0 | ||
authentications_changed ||= authentications_to_delete.delete_all > 0 | ||
|
||
ems.assign_attributes(params) | ||
ems.endpoints = endpoints.map(&method(:assign_nested_endpoint)) | ||
ems.authentications = authentications.map(&method(:assign_nested_authentication)) | ||
ems.endpoints = endpoints.map { |ep| assign_nested_endpoint(ep) } | ||
ems.authentications = authentications.map { |auth| assign_nested_authentication(auth) } | ||
|
||
endpoint_changes = ems.endpoints.select(&:changed?).to_h do |ep| | ||
[ep.role.to_sym, ep.changes] | ||
end | ||
|
||
authentication_changes = ems.authentications.select(&:changed?).to_h do |auth| | ||
[auth.authtype.to_sym, auth.changes] | ||
end | ||
|
||
endpoints_changed ||= endpoint_changes.present? | ||
authentications_changed ||= authentication_changes.present? | ||
|
||
ems.provider.save! if ems.provider.present? && ems.provider.changed? | ||
ems.save! | ||
end | ||
|
||
after_update_endpoints(endpoint_changes) if endpoints_changed | ||
after_update_authentication(authentication_changes) if authentications_changed | ||
end | ||
end | ||
|
||
|
@@ -834,8 +857,12 @@ def perf_capture_enabled? | |
# Some workers hold open a connection to the provider and thus do not | ||
# automatically pick up authentication changes. These workers have to be | ||
# restarted manually for the new credentials to be used. | ||
def after_update_authentication | ||
stop_event_monitor_queue_on_credential_change | ||
def after_update_authentication(changes) | ||
stop_event_monitor_queue_on_credential_change(changes) | ||
end | ||
|
||
def after_update_endpoints(changes) | ||
stop_event_monitor_queue_on_change(changes) | ||
end | ||
|
||
################################### | ||
|
@@ -847,6 +874,14 @@ def self.event_monitor_class | |
end | ||
delegate :event_monitor_class, :to => :class | ||
|
||
def endpoint_role_for_events | ||
:default | ||
end | ||
|
||
def authtype_for_events | ||
default_authentication_type | ||
end | ||
|
||
def event_monitor | ||
return if event_monitor_class.nil? | ||
event_monitor_class.find_by_ems(self).first | ||
|
@@ -874,15 +909,15 @@ def stop_event_monitor_queue | |
) | ||
end | ||
|
||
def stop_event_monitor_queue_on_change | ||
if event_monitor_class && !self.new_record? && default_endpoint.changed.include_any?("hostname", "ipaddress") | ||
def stop_event_monitor_queue_on_change(changes) | ||
if event_monitor_class && !new_record? && changes[endpoint_role_for_events]&.keys&.include_any?("hostname", "ipaddress") | ||
_log.info("EMS: [#{name}], Hostname or IP address has changed, stopping Event Monitor. It will be restarted by the WorkerMonitor.") | ||
stop_event_monitor_queue | ||
end | ||
end | ||
|
||
def stop_event_monitor_queue_on_credential_change | ||
if event_monitor_class && !self.new_record? && self.credentials_changed? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NOTE I didn't want to rewrite the Also that method only checked I would like to be able to specify e.g. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is that something you want to do as a followup or for this PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I should be able to get that working in this PR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added |
||
def stop_event_monitor_queue_on_credential_change(changes) | ||
if event_monitor_class && !new_record? && changes[authtype_for_events].present? | ||
_log.info("EMS: [#{name}], Credentials have changed, stopping Event Monitor. It will be restarted by the WorkerMonitor.") | ||
stop_event_monitor_queue | ||
end | ||
|
@@ -908,6 +943,14 @@ def self.refresh_worker_class | |
end | ||
delegate :refresh_worker_class, :to => :class | ||
|
||
def endpoint_role_for_refresh | ||
:default | ||
end | ||
|
||
def authtype_for_refresh | ||
default_authentication_type | ||
end | ||
|
||
def refresh_worker | ||
return if refresh_worker_class.nil? | ||
|
||
|
@@ -938,15 +981,15 @@ def stop_refresh_worker_queue | |
) | ||
end | ||
|
||
def stop_refresh_worker_queue_on_change | ||
if refresh_worker_class && !self.new_record? && default_endpoint.changed.include_any?("hostname", "ipaddress") | ||
def stop_refresh_worker_queue_on_change(changes) | ||
if refresh_worker_class && !new_record? && changes["default"]&.keys&.include_any?("hostname", "ipaddress") | ||
_log.info("EMS: [#{name}], Hostname or IP address has changed, stopping Refresh Worker. It will be restarted by the WorkerMonitor.") | ||
stop_refresh_worker_queue | ||
end | ||
end | ||
|
||
def stop_refresh_worker_queue_on_credential_change | ||
if refresh_worker_class && !self.new_record? && self.credentials_changed? | ||
def stop_refresh_worker_queue_on_credential_change(changes) | ||
if refresh_worker_class && !new_record? && changes[authtype_for_refresh].present? | ||
_log.info("EMS: [#{name}], Credentials have changed, stopping Refresh Worker. It will be restarted by the WorkerMonitor.") | ||
stop_refresh_worker_queue | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NOTE I'm not sure how necessary it is to differentiate between endpoints and authentications changing. Currently as far as I can tell we only use this information to restart workers and we need to do that when either endpoints or authentications change.