diff --git a/src/AzManagers.jl b/src/AzManagers.jl
index ba85bc12..f52a04fe 100644
--- a/src/AzManagers.jl
+++ b/src/AzManagers.jl
@@ -130,6 +130,7 @@ mutable struct AzManager <: ClusterManager
     scalesets::Vector{ScaleSet}
     pending_up::Channel{TCPSocket}
     pending_down::Dict{ScaleSet,Vector{String}}
+    vm_failure_count::Int
     port::UInt16
     server::Sockets.TCPServer
     task_add::Task
@@ -152,6 +153,7 @@ function azmanager!(session, nretry, verbose)
     _manager.port,_manager.server = listenany(getipaddr(), 9000)
     _manager.pending_up = Channel{TCPSocket}(32)
     _manager.pending_down = Dict{ScaleSet,Vector{Int}}()
+    _manager.vm_failure_count = 0
     _manager.scalesets = Vector{ScaleSet}[]
     _manager.task_add = @async add_pending_connections()
     _manager.task_process = @async process_pending_connections()
@@ -177,6 +179,7 @@ function scaleset_monitor()
             sleep(10)
             delete_empty_scalesets()
             delete_pending_down_vms()
+            heal()
         end
     catch e
         @error "scaleset monitor error:"
@@ -186,6 +189,7 @@ end
 
 scalesets(manager::AzManager) = isdefined(manager, :scalesets) ? manager.scalesets : ScaleSet[]
 pending_down(manager::AzManager) = isdefined(manager, :pending_down) ? manager.pending_down : Dict{ScaleSet,Vector{String}}()
+vm_failure_count(manager::AzManager) = isdefined(manager, :vm_failure_count) ? manager.vm_failure_count : 0
 
 function delete_empty_scalesets()
     manager = azmanager()
@@ -216,6 +220,72 @@ function delete_pending_down_vms()
     end
 end
 
+"""
+    heal()
+
+Replace scale-set VMs that did not join the cluster.
+
+Does nothing while `vm_failure_count(manager)` is zero. Otherwise every VM that
+`scaleset_listvms` returns for a managed scale set, and whose name no worker in
+`Distributed.PGRP.workers` carries in its `userdata`, is counted as an orphan. If the
+number of orphans does not exceed the failure count, the orphans are deleted, the same
+number of VMs is requested again, and the failure count is reduced by that number.
+"""
+function heal()
+    manager = azmanager()
+    _vm_failure_count = vm_failure_count(manager)
+    if _vm_failure_count > 0
+        @info "there are vm failures, count=$_vm_failure_count"
+
+        # Collect the userdata of live workers. `config` can still be unassigned on a
+        # freshly created worker and `userdata` is `nothing` unless a manager set it.
+        active_vms = Any[]
+        for worker in Distributed.PGRP.workers
+            (isa(worker, Distributed.Worker) && isdefined(worker, :config)) || continue
+            userdata = worker.config.userdata
+            isa(userdata, AbstractDict) && !isempty(userdata) && push!(active_vms, userdata)
+        end
+        orphan_vms = Dict{ScaleSet,Vector{Dict}}()
+
+        norphans = 0
+        for scaleset in scalesets(manager)
+            vms = scaleset_listvms(manager, scaleset.subscriptionid, scaleset.resourcegroup,
+                scaleset.scalesetname, manager.nretry, manager.verbose)
+            # `filter`, not `filter!`: `active_vms` must stay intact for the next scale set.
+            _active_vms = filter(active_vms) do vm
+                get(vm, "subscriptionid", nothing) == scaleset.subscriptionid &&
+                    get(vm, "resourcegroup", nothing) == scaleset.resourcegroup &&
+                    get(vm, "scalesetname", nothing) == scaleset.scalesetname
+            end
+            active_names = Set(vm["name"] for vm in _active_vms)
+            orphan_vms[scaleset] = filter!(vm->vm["name"] ∉ active_names, vms)
+            norphans += length(orphan_vms[scaleset])
+        end
+
+        @info "number of orphan vms=$norphans"
+        # NOTE(review): a VM that is still booting is indistinguishable from an orphan here;
+        # this bound presumably exists to avoid deleting such VMs -- confirm.
+        if norphans <= _vm_failure_count
+            @info "number of orphans is less than or equal to the number of failures, recovering..."
+            # Iterate over the snapshot taken above so a scale set added in the meantime
+            # cannot cause a missing-key error.
+            for (scaleset, orphans) in orphan_vms
+                ids = [orphan_vm["instanceid"] for orphan_vm in orphans]
+                # Nothing to replace in this scale set: skip the Azure round trips.
+                isempty(ids) && continue
+                delete_vms(manager, scaleset.subscriptionid, scaleset.resourcegroup,
+                    scaleset.scalesetname, ids, manager.nretry, manager.verbose)
+                add_vms(manager, scaleset.subscriptionid, scaleset.resourcegroup,
+                    scaleset.scalesetname, length(ids), manager.nretry, manager.verbose)
+                manager.vm_failure_count -= length(ids)
+                if manager.vm_failure_count < 0
+                    @error "something went wrong"
+                end
+            end
+        end
+    end
+end
+
 function delete_scalesets()
     manager = azmanager()
     _scalesets = scalesets(manager)
@@ -232,6 +302,7 @@ function add_pending_connections()
             push!(manager.pending_up, s)
         end
     catch
+        manager.vm_failure_count += 1
         @error "AzManagers, error adding pending connection"
         for (exc, bt) in Base.catch_stack()
             showerror(stderr, exc, bt)
@@ -1293,6 +1364,32 @@ function delete_vms(manager::AzManager, subscriptionid, resourcegroup, scalesetn
         json(body))
 end
 
+"""
+    add_vms(manager, subscriptionid, resourcegroup, scalesetname, δ, nretry, verbose)
+
+Change the capacity of the scale set `scalesetname` by `δ` instances.
+
+The scale-set resource is fetched with a GET request, `sku.capacity` of the parsed body is
+incremented by `δ`, and the modified body is sent back with a PUT request. Both requests
+are issued through `@retry nretry azrequest(...)`.
+"""
+function add_vms(manager::AzManager, subscriptionid, resourcegroup, scalesetname, δ, nretry, verbose)
+    r = @retry nretry azrequest(
+        "GET",
+        verbose,
+        "https://management.azure.com/subscriptions/$subscriptionid/resourceGroups/$resourcegroup/providers/Microsoft.Compute/virtualMachineScaleSets/$scalesetname?api-version=2020-06-01",
+        ["Authorization"=>"Bearer $(token(manager.session))"])
+    body = JSON.parse(String(r.body))
+    body["sku"]["capacity"] += δ
+
+    @retry nretry azrequest(
+        "PUT",
+        verbose,
+        "https://management.azure.com/subscriptions/$subscriptionid/resourceGroups/$resourcegroup/providers/Microsoft.Compute/virtualMachineScaleSets/$scalesetname?api-version=2020-06-01",
+        ["Authorization"=>"Bearer $(token(manager.session))", "Content-Type"=>"application/json"],
+        json(body))
+end
+
 #
 # detached service and REST API
 #