Multiprocesamiento en Python | Conjunto 2 (Comunicación entre procesos)

Multiprocesamiento en Python | Conjunto 1
Estos artículos analizan el concepto de compartir datos y pasar mensajes entre procesos mientras se usa el módulo de multiprocesamiento en Python.
En el multiprocesamiento, cualquier proceso recién creado hará lo siguiente:

  • ejecutar de forma independiente
  • tienen su propio espacio de memoria.

Considere el siguiente programa para entender este concepto:

import multiprocessing
  
# empty list with global scope
result = []
  
def square_list(mylist):
    """
    function to square a given list
    """
    global result
    # append squares of mylist to global list result
    for num in mylist:
        result.append(num * num)
    # print global list result
    print("Result(in process p1): {}".format(result))
  
if __name__ == "__main__":
    # input list
    mylist = [1,2,3,4]
  
    # creating new process
    p1 = multiprocessing.Process(target=square_list, args=(mylist,))
    # starting process
    p1.start()
    # wait until process is finished
    p1.join()
  
    # print global result list
    print("Result(in main program): {}".format(result))
Result(in process p1): [1, 4, 9, 16]
Result(in main program): []

En el ejemplo anterior, intentamos imprimir el contenido del resultado de la lista global en dos lugares:

  • En la función square_list . Dado que el proceso p1 llama a esta función , la lista de resultados cambia solo en el espacio de memoria del proceso p1 .
  • Después de completar el proceso p1 en el programa principal. Dado que el programa principal es ejecutado por un proceso diferente, su espacio de memoria aún contiene la lista de resultados vacía .

El diagrama que se muestra a continuación aclara este concepto:

Compartir datos entre procesos

  1. Memoria compartida: el módulo de multiprocesamiento proporciona objetos Array y Value para compartir datos entre procesos.
    • Array: una array de ctypes asignada desde la memoria compartida .
    • Valor: un objeto ctypes asignado desde la memoria compartida .

    A continuación se muestra un ejemplo simple que muestra el uso de Array y Value para compartir datos entre procesos.

    import multiprocessing
      
    def square_list(mylist, result, square_sum):
        """
        function to square a given list
        """
        # append squares of mylist to result array
        for idx, num in enumerate(mylist):
            result[idx] = num * num
      
        # square_sum value
        square_sum.value = sum(result)
      
        # print result Array
        print("Result(in process p1): {}".format(result[:]))
      
        # print square_sum Value
        print("Sum of squares(in process p1): {}".format(square_sum.value))
      
    if __name__ == "__main__":
        # input list
        mylist = [1,2,3,4]
      
        # creating Array of int data type with space for 4 integers
        result = multiprocessing.Array('i', 4)
      
        # creating Value of int data type
        square_sum = multiprocessing.Value('i')
      
        # creating new process
        p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
      
        # starting process
        p1.start()
      
        # wait until the process is finished
        p1.join()
      
        # print result array
        print("Result(in main program): {}".format(result[:]))
      
        # print square_sum Value
        print("Sum of squares(in main program): {}".format(square_sum.value))
    Result(in process p1): [1, 4, 9, 16]
    Sum of squares(in process p1): 30
    Result(in main program): [1, 4, 9, 16]
    Sum of squares(in main program): 30
    

    Tratemos de entender el código anterior línea por línea:

    • En primer lugar, creamos un resultado de array como este:
        result = multiprocessing.Array('i', 4)
      
      • El primer argumento es el tipo de datos . ‘i’ representa un número entero, mientras que ‘d’ representa un tipo de datos flotante.
      • El segundo argumento es el tamaño de la array. Aquí, creamos una array de 4 elementos.

      Del mismo modo, creamos un Value square_sum como este:

        square_sum = multiprocessing.Value('i')
      

      Aquí, solo necesitamos especificar el tipo de datos. Al valor se le puede dar un valor inicial (digamos 10) como este:

        square_sum = multiprocessing.Value('i', 10)
      
    • En segundo lugar, pasamos result y square_sum como argumentos al crear el objeto Process .
        p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
      
    • Los elementos de la array de resultados reciben un valor especificando el índice del elemento de la array.
        for idx, num in enumerate(mylist):
            result[idx] = num * num
      

      square_sum recibe un valor usando su atributo de valor :

        square_sum.value = sum(result)
      
    • Para imprimir los elementos de la array de resultados , usamos result[:] para imprimir la array completa.
        print("Result(in process p1): {}".format(result[:]))
      

      El valor de square_sum simplemente se imprime como:

        print("Sum of squares(in process p1): {}".format(square_sum.value))
      

    Aquí hay un diagrama que muestra cómo los procesos comparten Array y Value object:

  2. Proceso del servidor: cada vez que se inicia un programa de python, también se inicia un proceso del servidor . A partir de ahí, cada vez que se necesita un nuevo proceso, el proceso principal se conecta al servidor y le solicita que bifurque un nuevo proceso.
    Un proceso de servidor puede contener objetos de Python y permite que otros procesos los manipulen mediante proxies. El módulo de
    multiprocesamiento proporciona una clase Manager que controla un proceso de servidor. Por lo tanto, los gerentes brindan una forma de crear datos que se pueden compartir entre diferentes procesos.

    Los administradores de procesos del servidor son más flexibles que el uso de objetos de memoria compartida porque se pueden crear para admitir tipos de objetos arbitrarios como listas, diccionarios, cola, valor, array, etc. Además, los procesos en diferentes computadoras pueden compartir un solo administrador a través de una red. . Sin embargo, son más lentos que usar la memoria compartida.

    Considere el ejemplo dado a continuación:

    import multiprocessing
      
    def print_records(records):
        """
        function to print record(tuples) in records(list)
        """
        for record in records:
            print("Name: {0}\nScore: {1}\n".format(record[0], record[1]))
      
    def insert_record(record, records):
        """
        function to add a new record to records(list)
        """
        records.append(record)
        print("New record added!\n")
      
    if __name__ == '__main__':
        with multiprocessing.Manager() as manager:
            # creating a list in server process memory
            records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin',9)])
            # new record to be inserted in records
            new_record = ('Jeff', 8)
      
            # creating new processes
            p1 = multiprocessing.Process(target=insert_record, args=(new_record, records))
            p2 = multiprocessing.Process(target=print_records, args=(records,))
      
            # running process p1 to insert new record
            p1.start()
            p1.join()
      
            # running process p2 to print records
            p2.start()
            p2.join()
    New record added!
    
    Name: Sam
    Score: 10
    
    Name: Adam
    Score: 9
    
    Name: Kevin
    Score: 9
    
    Name: Jeff
    Score: 8
    

    Tratemos de entender el fragmento de código anterior:

    • En primer lugar, creamos un objeto administrador usando:
        with multiprocessing.Manager() as manager:
      

      Todas las líneas debajo del bloque de declaraciones están bajo el alcance del objeto administrador .

    • Luego, creamos una lista de registros en la memoria de proceso del servidor usando:
        records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin',9)])
      

      Del mismo modo, puede crear un diccionario como método manager.dict .

    • Finalmente, creamos los procesos p1 (para insertar un nuevo registro en la lista de registros ) y p2 (para imprimir registros ) y los ejecutamos mientras pasamos registros como uno de los argumentos.

    El concepto de proceso del servidor se representa en el diagrama que se muestra a continuación:

Comunicación entre procesos

El uso efectivo de múltiples procesos generalmente requiere cierta comunicación entre ellos, de modo que el trabajo se pueda dividir y los resultados se puedan agregar.
el multiprocesamiento admite dos tipos de canales de comunicación entre procesos:

  • Cola
  • Tubo
  1. Cola: una forma sencilla de comunicarse entre procesos con multiprocesamiento es usar una Cola para pasar mensajes de un lado a otro. Cualquier objeto de Python puede pasar a través de una cola.
    Nota: La clase multiprocessing.Queue es casi un clon de queue.Queue .
    Considere el programa de ejemplo dado a continuación:

    import multiprocessing
      
    def square_list(mylist, q):
        """
        function to square a given list
        """
        # append squares of mylist to queue
        for num in mylist:
            q.put(num * num)
      
    def print_queue(q):
        """
        function to print queue elements
        """
        print("Queue elements:")
        while not q.empty():
            print(q.get())
        print("Queue is now empty!")
      
    if __name__ == "__main__":
        # input list
        mylist = [1,2,3,4]
      
        # creating multiprocessing Queue
        q = multiprocessing.Queue()
      
        # creating new processes
        p1 = multiprocessing.Process(target=square_list, args=(mylist, q))
        p2 = multiprocessing.Process(target=print_queue, args=(q,))
      
        # running process p1 to square list
        p1.start()
        p1.join()
      
        # running process p2 to get queue elements
        p2.start()
        p2.join()
    Queue elements:
    1
    4
    9
    16
    Queue is now empty!
    

    Tratemos de entender el código anterior paso a paso:

    • En primer lugar, creamos una cola de multiprocesamiento usando:
        q = multiprocessing.Queue()
      
    • Luego pasamos la cola vacía q a la función square_list a través del proceso p1 . Los elementos se insertan en la cola mediante el método put .
        q.put(num * num)
      
    • Para imprimir los elementos de la cola, usamos el método get hasta que la cola no esté vacía.
        while not q.empty():
            print(q.get())
      

    A continuación se muestra un diagrama simple que representa las operaciones en la cola:

  2. Tuberías: una tubería solo puede tener dos extremos. Por lo tanto, se prefiere a la cola cuando solo se requiere comunicación bidireccional.

    El módulo de multiprocesamiento proporciona la función Pipe() que devuelve un par de objetos de conexión conectados por una tubería. Los dos objetos de conexión devueltos por Pipe() representan los dos extremos de la tubería. Cada objeto de conexión tiene métodos send() y recv() (entre otros).
    Considere el programa dado a continuación:

    import multiprocessing
      
    def sender(conn, msgs):
        """
        function to send messages to other end of pipe
        """
        for msg in msgs:
            conn.send(msg)
            print("Sent the message: {}".format(msg))
        conn.close()
      
    def receiver(conn):
        """
        function to print the messages received from other
        end of pipe
        """
        while 1:
            msg = conn.recv()
            if msg == "END":
                break
            print("Received the message: {}".format(msg))
      
    if __name__ == "__main__":
        # messages to be sent
        msgs = ["hello", "hey", "hru?", "END"]
      
        # creating a pipe
        parent_conn, child_conn = multiprocessing.Pipe()
      
        # creating new processes
        p1 = multiprocessing.Process(target=sender, args=(parent_conn,msgs))
        p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
      
        # running processes
        p1.start()
        p2.start()
      
        # wait until processes finish
        p1.join()
        p2.join()
    Sent the message: hello
    Sent the message: hey
    Sent the message: hru?
    Received the message: hello
    Sent the message: END
    Received the message: hey
    Received the message: hru?
    

    Tratemos de entender el código anterior:

    • Se creó una tubería simplemente usando:
        parent_conn, child_conn = multiprocessing.Pipe()
      

      La función devolvió dos objetos de conexión para los dos extremos de la tubería.

    • El mensaje se envía de un extremo de la tubería a otro mediante el método de envío .
        conn.send(msg)
      
    • Para recibir cualquier mensaje en un extremo de una tubería, usamos el método recv .
        msg = conn.recv()
      
    • En el programa anterior, enviamos una lista de mensajes de un extremo a otro. En el otro extremo, leemos los mensajes hasta que recibimos el mensaje «FIN».

    Considere el siguiente diagrama que muestra la relación b/w tubería y procesos:

Nota: Los datos en una tubería pueden corromperse si dos procesos (o subprocesos) intentan leer o escribir en el mismo extremo de la tubería al mismo tiempo. Por supuesto, no hay riesgo de corrupción de los procesos que utilizan diferentes extremos de la tubería al mismo tiempo. También tenga en cuenta que las colas realizan una sincronización adecuada entre procesos, a expensas de una mayor complejidad. Por lo tanto, se dice que las colas son seguras para subprocesos y procesos.

Próximo:

Este artículo es una contribución de Nikhil Kumar . Si te gusta GeeksforGeeks y te gustaría contribuir, también puedes escribir un artículo usando write.geeksforgeeks.org o enviar tu artículo por correo a review-team@geeksforgeeks.org. Vea su artículo que aparece en la página principal de GeeksforGeeks y ayude a otros Geeks.

Escriba comentarios si encuentra algo incorrecto o si desea compartir más información sobre el tema tratado anteriormente.

Publicación traducida automáticamente

Artículo escrito por GeeksforGeeks-1 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *