¿Cómo usar ThreadPoolExecutor en Python3?

Requisito previo: subprocesamiento múltiple

La creación de subprocesos permite el paralelismo del código y el lenguaje Python tiene dos formas de lograrlo: la primera es a través del módulo de multiprocesamiento y la segunda es a través del módulo de subprocesos múltiples. Los subprocesos múltiples son adecuados para acelerar las tareas vinculadas a E/S, como realizar una solicitud web, operaciones de base de datos o leer/escribir en un archivo. En contraste con esto, las tareas intensivas de CPU, como las tareas de cálculo matemático, se benefician más utilizando el multiprocesamiento. Esto sucede debido a GIL (Global Interpreter Lock).

Desde Python 3.2 en adelante, se introdujo una nueva clase llamada ThreadPoolExecutor en Python en el módulo concurrent.futures para administrar y crear hilos de manera eficiente. Pero espere, si Python ya tenía un módulo de subprocesos incorporado, ¿por qué se introdujo un nuevo módulo? Déjame responder esto primero.

  • Generar nuevos subprocesos sobre la marcha no es un problema cuando la cantidad de subprocesos es menor, pero se vuelve realmente engorroso administrar los subprocesos si se trata de muchos subprocesos. Aparte de esto, es computacionalmente ineficiente crear tantos subprocesos que conducirán a una disminución en el rendimiento. Un enfoque para mantener el rendimiento es crear e instanciar un grupo de subprocesos inactivos de antemano y reutilizar los subprocesos de este grupo hasta que se agoten todos los subprocesos. De esta manera se reduce la sobrecarga de crear nuevos hilos.
  • Además, el grupo realiza un seguimiento y administra el ciclo de vida de los subprocesos y los programa en nombre del programador, lo que hace que el código sea mucho más simple y con menos errores.

Sintaxis: concurrent.futures.ThreadPoolExecutor(max_workers=Ninguno, thread_name_prefix=”, initializer=Ninguno, initargs=())

Parámetros:

  • max_workers : es una cantidad de subprocesos, también conocido como tamaño del grupo. Desde 3.8 en adelante, el valor predeterminado es min (32, os.cpu_count() + 4). De estos 5 subprocesos se conservan para tareas enlazadas de E/S.
  • thread_name_prefix : thread_name_prefix se agregó desde python 3.6 en adelante para dar nombres a los hilos para facilitar la depuración.
  • inicializador: el inicializador toma un invocable que se invoca al inicio de cada subproceso de trabajo.
  • initargs: Es una tupla de argumentos pasados ​​al inicializador.

Métodos ThreadPoolExecutor:

La clase ThreadPoolExecutor expone tres métodos para ejecutar subprocesos de forma asíncrona. A continuación se proporciona una explicación detallada.

  1. enviar (fn, *args, **kwargs): ejecuta un método invocable o un método y devuelve un objeto Future que representa el estado de ejecución del método.
  2. mapa (fn, * iterables, tiempo de espera = ninguno, tamaño de fragmento = 1): 
    • Mapea el método y los iterables juntos de inmediato y generará una excepción concurrente. futures.TimeoutError si no lo hace dentro del límite de tiempo de espera.
    • Si los iterables son muy grandes, tener un tamaño de fragmento mayor que 1 puede mejorar el rendimiento al usar ProcessPoolExecutor pero con ThreadPoolExecutor no tiene esa ventaja, es decir, se puede dejar en su valor predeterminado.
  3. apagar (esperar = Verdadero, *, cancelar_futuros = Falso): 
    • Le indica al ejecutor que libere todos los recursos cuando los futuros terminen de ejecutarse.
    • Debe llamarse antes del método executor.submit() y executor.map(), de lo contrario arrojaría RuntimeError.
    • wait=True hace que el método no regrese hasta que se complete la ejecución de todos los subprocesos y se liberen los recursos.
    • cancel_futures=True, entonces el ejecutor cancelará todos los subprocesos futuros que aún no se han iniciado.

Ejemplo 1:

El siguiente código demuestra el uso de ThreadPoolExecutor, tenga en cuenta que, a diferencia del módulo de subprocesamiento, no tenemos que llamar explícitamente usando un bucle, mantener un seguimiento del subproceso usando una lista o esperar subprocesos usando unirse para la sincronización, o liberar los recursos después de los subprocesos. están terminados, todo lo toma bajo el capó el propio constructor, lo que hace que el código sea compacto y libre de errores.

Python3

from concurrent.futures import ThreadPoolExecutor
from time import sleep
 
values = [3,4,5,6]
 
def cube(x):
    print(f'Cube of {x}:{x*x*x}')
 
 
if __name__ == '__main__':
    result =[]
    with ThreadPoolExecutor(max_workers=5) as exe:
        exe.submit(cube,2)
         
        # Maps the method 'cube' with a list of values.
        result = exe.map(cube,values)
     
    for r in result:
      print(r)

Producción:

Output: 
Cube of 2:8
Cube of 3:27
Cube of 4:64
Cube of 5:125
Cube of 6:216

Ejemplo 2:

El siguiente código está obteniendo imágenes a través de Internet al realizar una solicitud HTTP, estoy usando la biblioteca de requests para lo mismo. La primera sección del código realiza una llamada uno a uno a la API y, por ejemplo, la descarga es lenta, mientras que la segunda sección del código realiza una solicitud paralela mediante subprocesos para obtener la API.

Puede probar todos los parámetros discutidos anteriormente para ver cómo ajusta la aceleración, por ejemplo, si hago un grupo de subprocesos de 6 en lugar de 3, la aceleración es más significativa.

Python3

import requests
import time
import concurrent.futures
  
img_urls = [
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623210949/download21.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211125/d11.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211655/d31.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212213/d4.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212607/d5.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623235904/d6.jpg',
]
  
t1 = time.perf_counter()
def download_image(img_url):
    img_bytes = requests.get(img_url).content
    print("Downloading..")
  
# Download images 1 by 1 => slow
for img in img_urls:
  download_image(img)
t2 = time.perf_counter()
print(f'Single Threaded Code Took :{t2 - t1} seconds')
  
print('*'*50)
  
t1 = time.perf_counter()
def download_image(img_url):
    img_bytes = requests.get(img_url).content
    print("Downloading..")
  
# Fetching images concurrently thus speeds up the download.
with concurrent.futures.ThreadPoolExecutor(3) as executor:
    executor.map(download_image, img_urls)
  
t2 = time.perf_counter()
print(f'MultiThreaded Code Took:{t2 - t1} seconds')

Producción:

Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Single Threaded Code Took :2.5529379630024778 seconds
**************************************************
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
MultiThreaded Code Took:0.5221083430078579 seconds

Publicación traducida automáticamente

Artículo escrito por arun singh y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *