Clase ProcessPoolExecutor en Python

Requisito previo: multiprocesamiento

Permite el paralelismo del código y el lenguaje Python tiene dos formas de lograrlo: la primera es a través del módulo de multiprocesamiento y la segunda es a través del módulo de subprocesos múltiples. Desde Python 3.2 en adelante, se introdujo una nueva clase llamada ProcessPoolExecutor en python en concurrent. El módulo de futuros para administrar y crear procesos de manera eficiente. Pero espere, si Python ya tenía un módulo de multiprocesamiento incorporado, ¿por qué se introdujo un nuevo módulo? Déjame responder esto primero.

  • Generar un nuevo proceso sobre la marcha no es un problema cuando la cantidad de procesos es menor, pero se vuelve realmente engorroso administrar los procesos si se trata de muchos procesos. Aparte de esto, es computacionalmente ineficiente crear tantos procesos que conducirán a una disminución en el rendimiento. Un enfoque para mantener el rendimiento es crear e instanciar un grupo de Procesos inactivos de antemano y reutilizar los Procesos de este grupo hasta que se agoten todos los Procesos. De esta manera se reduce la sobrecarga de crear nuevos procesos.
  • Además, el grupo realiza un seguimiento y administra el ciclo de vida del proceso y los programa en nombre del programador, lo que hace que el código sea mucho más simple y menos defectuoso.

Sintaxis:

concurrent.futures.ProcessPoolExecutor(max_workers=Ninguno, mp_context=”, initializer=Ninguno, initargs=())

Parámetros:

  • max_workers : es el número de procesos, también conocido como el tamaño del grupo. Si el valor es Ninguno, en Windows se crean 61 procesos de forma predeterminada, incluso si la cantidad de núcleos disponibles es mayor.
  • mp_context : es el contexto de multiprocesamiento, si no hay ninguno o está vacío, se utiliza el contexto de multiprocesamiento predeterminado. Permite al usuario controlar el método de inicio.
  • initializer : el inicializador toma un invocable que se invoca al inicio de cada proceso de trabajo.
  • initargs : es una tupla de argumentos pasados ​​al inicializador.

Métodos ProcessPoolExecutor: la clase ProcessPoolExecutor expone los siguientes métodos para ejecutar Process de forma asíncrona. A continuación se proporciona una explicación detallada.

  • enviar (fn, *args, **kwargs): ejecuta un método invocable o un método y devuelve un objeto Future que representa el estado de ejecución del método.
  • map(fn, *iterables, timeout=None, chunksize=1): Mapea el método y los iterables juntos inmediatamente y generará una excepción concurrente. futures.TimeoutError si no lo hace dentro del límite de tiempo de espera.
    • Si los iterables son muy grandes, tener un tamaño de fragmento mayor que 1 puede mejorar el rendimiento al usar ProcessPoolExecutor.
  • apagado (esperar = Verdadero, *, cancelar_futuros = Falso):
    • Le indica al ejecutor que libere todos los recursos cuando los futuros terminen de ejecutarse.
    • Debe llamarse antes del método executor.submit() y executor.map(), de lo contrario arrojaría RuntimeError.
    • wait=True hace que el método no regrese hasta que se complete la ejecución de todos los subprocesos y se liberen los recursos.
    • cancel_futures=True, entonces el ejecutor cancelará todos los subprocesos futuros que aún no se han iniciado.
  • Cancelar(): intenta cancelar la llamada, si la llamada no se puede cancelar, devuelve falso, de lo contrario, verdadero.
  • cancelled(): devuelve True si se cancela la llamada.
  • running(): devuelve True si el proceso se está ejecutando y no se puede cancelar.
  • done(): devuelve True si el proceso ha terminado de ejecutarse.
  • result(timeout=None): devuelve el valor devuelto por el proceso, si el proceso aún está en ejecución, espera el tiempo de espera especificado; de lo contrario, genera un TimeoutError, si se especifica Ninguno, esperará eternamente a que finalice el proceso.
  • add_done_callback(fn): adjunta una función de devolución de llamada que se llama cuando el proceso finaliza su ejecución.

Ejemplo 1

El siguiente código demuestra el uso de ProcessPoolExecutor, tenga en cuenta que, a diferencia del módulo de multiprocesamiento, no tenemos que llamar explícitamente usando un bucle, mantener un seguimiento del proceso usando una lista o esperar el proceso usando unirse para la sincronización, o liberar los recursos después el proceso está terminado, todo lo toma bajo el capó el propio constructor, lo que hace que el código sea compacto y libre de errores.

Python3

from concurrent.futures import ProcessPoolExecutor
from time import sleep
 
values = [3,4,5,6]
def cube(x):
    print(f'Cube of {x}:{x*x*x}')
 
 
if __name__ == '__main__':
    result =[]
    with ProcessPoolExecutor(max_workers=5) as exe:
        exe.submit(cube,2)
         
        # Maps the method 'cube' with a iterable
        result = exe.map(cube,values)
     
    for r in result:
      print(r)

Producción:

Cube of 2:8
Cube of 3:27
Cube of 6:216
Cube of 4:64
Cube of 5:125

Ejemplo 2

El siguiente código está obteniendo imágenes a través de Internet al realizar una solicitud HTTP, estoy usando la biblioteca de requests para lo mismo. La primera sección del código realiza una llamada individual a la API y, por ejemplo, la descarga es lenta, mientras que la segunda sección del código realiza una solicitud paralela utilizando varios procesos para obtener la API.

Puede probar todos los parámetros discutidos anteriormente para ver cómo ajusta la aceleración, por ejemplo, si hago un grupo de procesos de 6 en lugar de 3, la aceleración es más significativa.

Python3

import requests
import time
import os
import concurrent.futures
  
img_urls = [
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623210949/download21.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211125/d11.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211655/d31.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212213/d4.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212607/d5.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623235904/d6.jpg',
]
  
t1 = time.time()
print("Downloading images with single process")
def download_image(img_url):
    img_bytes = requests.get(img_url).content
    print("Downloading..")
      
for img in img_urls:
    download_image(img)
  
t2 = time.time()
print(f'Single Process Code Took :{t2-t1} seconds')
print('*'*50)
  
t1 = time.time()
print("Downloading images with Multiprocess")
  
def download_image(img_url):
    img_bytes = requests.get(img_url).content
    print(f"[Process ID]:{os.getpid()} Downloading..")
  
with concurrent.futures.ProcessPoolExecutor(3) as exe:
    exe.map(download_image, img_urls)
  
t2 = time.time()
print(f'Multiprocess Code Took:{t2-t1} seconds')

Producción:

Downloading images with single process
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Single Process Code Took :1.2382981777191162 seconds
**************************************************
Downloading images with Multiprocess
[Process ID]:118741 Downloading..
[Process ID]:118742 Downloading..
[Process ID]:118740 Downloading..
[Process ID]:118741 Downloading..
[Process ID]:118742 Downloading..
[Process ID]:118740 Downloading..
Multiprocess Code Took:0.8398590087890625 seconds

Publicación traducida automáticamente

Artículo escrito por arun singh y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *