From 8c9992017e145a8d8ff98eb4ce888fc690727263 Mon Sep 17 00:00:00 2001 From: Sam Kaplan Date: Wed, 27 Jan 2021 15:01:47 +0000 Subject: [PATCH 1/2] heal the cluster if there are failures --- src/AzManagers.jl | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/AzManagers.jl b/src/AzManagers.jl index ba85bc12..2c231fd1 100644 --- a/src/AzManagers.jl +++ b/src/AzManagers.jl @@ -130,6 +130,7 @@ mutable struct AzManager <: ClusterManager scalesets::Vector{ScaleSet} pending_up::Channel{TCPSocket} pending_down::Dict{ScaleSet,Vector{String}} + vm_failure_count::Int port::UInt16 server::Sockets.TCPServer task_add::Task @@ -152,6 +153,7 @@ function azmanager!(session, nretry, verbose) _manager.port,_manager.server = listenany(getipaddr(), 9000) _manager.pending_up = Channel{TCPSocket}(32) _manager.pending_down = Dict{ScaleSet,Vector{Int}}() + _manager.vm_failure_count = 0 _manager.scalesets = Vector{ScaleSet}[] _manager.task_add = @async add_pending_connections() _manager.task_process = @async process_pending_connections() @@ -177,6 +179,7 @@ function scaleset_monitor() sleep(10) delete_empty_scalesets() delete_pending_down_vms() + heal() end catch e @error "scaleset monitor error:" @@ -186,6 +189,7 @@ end scalesets(manager::AzManager) = isdefined(manager, :scalesets) ? manager.scalesets : ScaleSet[] pending_down(manager::AzManager) = isdefined(manager, :pending_down) ? manager.pending_down : Dict{ScaleSet,Vector{String}}() +vm_failure_count(manager::AzManager) = isdefined(manager, :vm_failure_count) ? manager.vm_failure_count : 0 function delete_empty_scalesets() manager = azmanager() @@ -216,6 +220,38 @@ function delete_pending_down_vms() end end +function heal() + manager = azmanager() + _vm_failure_count = vm_failure_count(manager) + if _vm_failure_count > 0 + @info "there are vm failures, count=$_vm_failure_count" + active_vms = filter!(!isempty, [isa(worker, Distributed.Worker) ? worker.config.userdata : Dict() for worker in Distributed.PGRP.workers]) + orphan_vms = Dict{ScaleSet,Vector{Dict}}() + + norphans = 0 + for scaleset in scalesets(manager) + vms = scaleset_listvms(manager, scaleset.subscriptionid, scaleset.resourcegroup, scaleset.scalesetname, manager.nretry, manager.verbose) + _active_vms = filter!(vm->(vm["subscriptionid"]==scaleset.subscriptionid && vm["resourcegroup"]==scaleset.resourcegroup && vm["scalesetname"]==scaleset.scalesetname), active_vms) + orphan_vms[scaleset] = filter!(vm->vm["name"] ∉ [_active_vm["name"] for _active_vm in _active_vms], vms) + norphans += length(orphan_vms[scaleset]) + end + + @info "number of orphan vms=$norphans" + if norphans <= _vm_failure_count + @info "number of orphans is less than or equal to the number of failures, recovering..." + for scaleset in scalesets(manager) + ids = [orphan_vm["instanceid"] for orphan_vm in orphan_vms[scaleset]] + delete_vms(manager, scaleset.subscriptionid, scaleset.resourcegroup, scaleset.scalesetname, ids, manager.nretry, manager.verbose) + add_vms(manager, scaleset.subscriptionid, scaleset.resourcegroup, scaleset.scalesetname, length(ids), manager.nretry, manager.verbose) + manager.vm_failure_count -= length(ids) + if manager.vm_failure_count < 0 + @error "something went wrong" + end + end + end + end +end + function delete_scalesets() manager = azmanager() _scalesets = scalesets(manager) @@ -232,6 +268,7 @@ function add_pending_connections() push!(manager.pending_up, s) end catch + manager.vm_failures += 1 @error "AzManagers, error adding pending connection" for (exc, bt) in Base.catch_stack() showerror(stderr, exc, bt) From e1d0ed36946d1f491e1ef80616d256d43b1470a8 Mon Sep 17 00:00:00 2001 From: Sam Kaplan Date: Mon, 1 Feb 2021 16:36:34 +0000 Subject: [PATCH 2/2] fixup --- src/AzManagers.jl | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/AzManagers.jl b/src/AzManagers.jl index 2c231fd1..f52a04fe 100644 --- a/src/AzManagers.jl +++ b/src/AzManagers.jl @@ -1330,6 +1330,23 @@ function delete_vms(manager::AzManager, subscriptionid, resourcegroup, scalesetn json(body)) end +function add_vms(manager::AzManager, subscriptionid, resourcegroup, scalesetname, δ, nretry, verbose) + r = @retry nretry azrequest( + "GET", + verbose, + "https://management.azure.com/subscriptions/$subscriptionid/resourceGroups/$resourcegroup/providers/Microsoft.Compute/virtualMachineScaleSets/$scalesetname?api-version=2020-06-01", + ["Authorization"=>"Bearer $(token(manager.session))"]) + body = JSON.parse(String(r.body)) + body["sku"]["capacity"] += δ + + @retry nretry azrequest( + "PUT", + verbose, + "https://management.azure.com/subscriptions/$subscriptionid/resourceGroups/$resourcegroup/providers/Microsoft.Compute/virtualMachineScaleSets/$scalesetname?api-version=2020-06-01", + ["Authorization"=>"Bearer $(token(manager.session))", "Content-Type"=>"application/json"], + json(body)) +end + # # detached service and REST API #