Paralelismo en Python-El runner que era demasiado amable.
Azure ofrece la posibilidad de enviar comandos y scripts (de Powershell si son hosts Windows) a sus VM utilizando la extensión Run Command. Esto es algo que podemos aprovechar para reducir el toil y administrar de manera cómoda grandes cantidades de servidores. Uno de los proyectos en los que participo es el desarrollo de una herramienta Python in-house que, entre otras cosas, permite paralelizar esta funcion enviando el mismo comando o script a varias VMs.
Durante el desarrollo me he encargado de esta parte de la herramienta desde su versión inicial. Pero no estaba contento con ella, la primera versión era demasiado amable: Esperaba a que todos las instancias de los scripts lanzados en paralelo terminasen sin dar ningún contexto o resultado intermedio, incluso si una se quedaba bloqueada o era increíblemente lenta. Tocaba debugear.
Veamos un poco cómo está organizado el programa (el código ha sido truncado y cualquier módulo referenciado debe asumirse como correctamente importado)
The polite Runner
class Runner:
"""
Class to manage the launch of scripts in parallel.
ASCYNCHRONOUS.
"""
def __init__(targetvmlist):
[...]
def runScripts(self):
"""
Run the script against all the VMs specified during object instantiation.
"""
with ThreadPoolExecutor(max_workers=30) as executor:
futures = []
for vm in self.targetvmlist:
#Instantiate a PowerShellScript object per VM to launch in. Sumbit all to the executor
pwshS = PowerShellScript(vm)
futures.append(executor.submit(pwshS.run))
execution_outputs=[]
#When execution is complete, return output
for future in as_completed(futures):
execution_outputs.append(future.result())
return(execution_outputs)
PowershellScript
class PowerShellScript:
"""
Represents a powershell script object
"""
def __init__([...]):
[...]
def run(self):
"""
Runs a powershell script in a certain VM.
"""
# Commands are prepared and readied here
[...]
ran_script = subprocess.run(command_with_parameters, capture_output=True)
#Simplified, I actually prepare an output dict
[...]
return ran_script.stdout
- La clase PowershellScript utiliza
subprocess.run
para lanzar el comando de AzCLI. Es el método que se paraleliza. - La clase Runner utiliza ThreadPoolExecutor como context manager para paralelizar la ejecución, basada en el Ejemplo de ThreadPoolExecutor en los docs oficiales. Los objetos de scripts a enviar (su método
.run
) son puestos en cola por ThreadPoolExecutor y representados como Futures. Un Future es, en una ejecución paralela, un concepto abstracto que representa la función que se ha enviado para ejecutar. - Según la documentación de ThreadPoolExecutor, cada función encolada utilizará su propio thread. Así que para n scripts de Powershell tendremos n threads.
- Los Futures terminan la ejecución tan rápido como es posible y devuelven su resultado a ThreadPoolExecutor usando
as_completed
. Esta aproximación async tiene sentido y permite que, si la primera función en la cola tarda más, otras puedan terminar igualmente e ir saliendo para devolver output. - La primera versión es increíblemente naive, sin manejo de errores ni timeouts de ningún tipo. Un script lento puede bloquear todo el programa incluso si el resto de threads terminan perfectamente. Y esto ocurría a menudo.
Así que mi primer paso, después de leer más sobre concurrent.futures, fue agregar un timeout a as_completed
, que lanza una TimeoutError exception si un thread permanece en ejecución más tiempo que el número de segundos especificado.
Less polite Runner
class Runner:
"""
Class to manage the launch of scripts in parallel.
ASCYNCHRONOUS.
"""
def __init__(targetvmlist):
[...]
def runScripts(self):
"""
Run the script against all the VMs specified during object instantiation.
"""
with ThreadPoolExecutor(max_workers=30) as executor:
futures = []
for vm in self.targetvmlist:
#Instantiate a PowerShellScript object per VM to launch in. Sumbit all to the executor
pwshS = PowerShellScript(vm)
futures.append(executor.submit(pwshS.run))
execution_outputs=[]
#When execution is complete, return output
try:
for future in as_completed(futures, timeout=240):
execution_outputs.append(future.result())
except TimeoutError as error:
execution_outputs.append(f"TIMEOUT ERROR")
finally:
return(execution_outputs)
Esto es un buen principio, ¿verdad?. Sí pero no. Runner aún era extremadamente amable: Si un thread permanecía más de 240 segundos en ejecución, lanzaba la excepción pero no terminaba la ejecución hasta que todos los threads terminasen. No devolvía el control a la función que llamaba a Runner. Un vistazo rápido por los docs me hizo pensar que podría usar future.cancel()
para matar el thread. Pero sólo pueden cancelarse threads que estén puestos en cola pero no hayan empezado aún a ejecutar, y si uno de mis threads estaban lanzando TimeoutError desde luego estaban en ejecución.
Cancelar un thread que ha empezado a ejecutarse es complicado, según mi limitada experiencia. Fue entonces cuando se me ocurrió que hacer las cosas bien y de manera sencilla es normalmente lo mejor. En ese momento tengo:
- Una instancia de la clase Runner que usa ThreadPoolExecutor para ejecutar en paralelo funciones que reciba, en este caso representando ejecuciones de un script remoto en Powershell usando subprocess.
- Eso, esencialmente, significa que tengo x threads con subprocess preparados para ejecutar con un ThreadPoolExecutor.
- Cuando uno de esos threads encuentra un problema y permanece en ejecución, todo el programa se bloquea esperando por el. Mi Runner no deja a nadie atrás.
- Esto se debe a que ThreadPoolExecutor, y por ende mi Runner, aún es demasiado amable. Incluso lanzando TimeoutError y haciendo catch de esa exception, el thread sigue en ejecución. Cuando el resto han terminado, el programa sigue esperando por el thread que llegó al timeout y aún no ha terminado de ejecutar. Esto es por diseño de ThreadPoolExecutor.
En este punto podía elegir hacer catch del timeout, continuar y no esperar por todos los resultados, pero el thread seguiría corriendo y ThreadPoolExecutor seguiría esperando. No me gustaba demasiado la idea.
También me planteé intentar forzar a ThreadPoolExecutor a detenerse y matar cualquier thread que aún no hubiera terminado, pero esto también me parecía drástico, contraintuitivo. Seguro que había una solución más simple, más correcta. ¿Qué estaba bloqueado o podía ejecutarse más lento de lo esperado? No era ningún proceso abstracto sobre el que no tenía control. Era una instancia de mi clase PowerShellScript, que por dentro usaba subprocess
. La manera más sencilla de enfocar esto no era obligar a ThreadPoolExecutor a preocuparse y controlar lo que hiciera ese objeto que ejecutaba el script usando subprocess
, era obligar a dichos objetos a comportarse debidamente. Tenía que agregar un timeout a subprocess
para que no forzase al resto del programa a esperar indefinidamente.
Less narcissitic PowershellScript
class PowerShellScript:
"""
Represents a powershell script object
"""
def __init__([...]):
[...]
def run(self):
"""
Runs a powershell script in a certain VM.
"""
# Commands are prepared and readied here
[...]
ran_script = subprocess.run(command_with_parameters, capture_output=True, timeout=220)
#Simplified, I actually prepare an output dict
[...]
return ran_script.stdout
Ahora, si un script tarda demasiado, es un problema que se gestiona en la clase que lo representa, y el timeout ocurre ahí. Pensé que estaba teniendo un problema con la ejecución en paralelo, pero creo que estaba teniendo un problema de acoplamiento. Obligar a una clase a gestionar un timeout y una ejecución que nunca debería haber sido su problema.
Como dije, estoy aprendiendo paralelismo en Python y otros conceptos intermedios (me resisto a llamar a nada de lo que estoy aprendiendo aún como “avanzado”), pero por el camino no dejo de darme cuenta de que los conceptos básicos nunca se tienen lo suficientemente afianzados.