Solución de productor y consumidor usando BlockingQueue en Java Thread

El problema productor-consumidor es un problema de sincronización que surge cuando uno o más subprocesos generan datos, colocándolos en un búfer y, simultáneamente, uno o más subprocesos consumen datos del mismo búfer. 

Si lo hace, puede provocar una condición de carrera en la que los subprocesos compiten entre sí para completar su tarea. En este escenario, nada les impide ingresar métodos al mismo tiempo y producir resultados erróneos. 

Además, debido a la falta de comunicación entre subprocesos , el consumidor puede intentar eliminar un elemento aunque el búfer esté vacío. De manera similar, el productor puede intentar agregar un elemento cuando el búfer está lleno. 

Soluciones posibles:

  1. Verificar el tamaño del búfer antes de quitarlo y agregarlo parece una solución. Los sistemas productor-consumidor generalmente usan un ciclo while infinito. Verificar el tamaño de cada iteración del ciclo sería ineficiente. Además, no se puede garantizar la seguridad de subprocesos. Por lo tanto, esta solución no se utiliza.
  2. Los métodos wait() y notificar() se pueden usar para establecer una comunicación entre subprocesos.
  3. BlockingQueue es una alternativa segura para subprocesos menos compleja para esperar() y notificar(). Este método se analiza en este artículo.

Cola de bloqueo:

La interfaz BlockingQueue es parte del paquete java.util.concurrent. 

  • Si un subproceso productor intenta poner un elemento en una BlockingQueue completa, se bloquea y permanece bloqueado hasta que un consumidor elimina un elemento.
  • De manera similar, si un subproceso de consumidor intenta tomar un elemento de un BlockingQueue vacío, se bloquea y permanece bloqueado hasta que un productor agrega un elemento.

BlockingQueue tiene dos métodos principales, es decir, put() y take()

poner()

void put(E e) lanza InterruptedException

e es el elemento que se agregará
InterruptedException se lanza si el subproceso se interrumpe mientras espera

tomar()

E take() lanza InterruptedException

devuelve el encabezado de la cola
InterruptedException se lanza si el subproceso se interrumpe mientras espera

BlockingQueue también tiene métodos add(E e) y remove() . Pero estos métodos no deben usarse para problemas productor-consumidor porque:

  • add lanzará IllegalStateException si la cola está llena.
  • remove devuelve un valor booleano, pero se devolverá un elemento.

Implementación de BlockingQueue

Dado que BlockingQueue es una interfaz, no podemos crear su objeto. En su lugar, podemos crear objetos de una de las clases que implementan BlockingQueue. Para esta demostración, se utilizará ArrayBlockingQueue.

ArrayBlockingQueue

  • Como sugiere el nombre, ArrayBlockingQueue usa la estructura de datos de la array como un búfer.
  • Dado que es una array, su capacidad se fija después de la declaración.
  • Proporciona equidad como una opción. Esto significa que los subprocesos tienen acceso al búfer por orden de llegada. La equidad está desactivada de forma predeterminada. Se puede activar colocando el valor booleano verdadero dentro del constructor.

Solución Producción-Consumidor

Ahora que entendemos qué es BlockingQueue y su uso. Apliquemos este conocimiento para resolver el problema productor-consumidor. Podemos dividir este problema en dos subproblemas creando una clase separada para productor y consumidor por conveniencia. El productor y el consumidor se verán afectados por diferentes subprocesos, pero compartirán un búfer BlockingQueue común.

Productor: como sugiere el nombre, la clase de productor producirá datos. En nuestro caso, la clase productora está produciendo números en el rango [1,4]. Colocará estos datos en el búfer BlockingQueue.

Java

// Java program to demonstrate producer code
  
// Implement Runnable since object
// of this class will be executed by
// a separate thread
class producer implements Runnable {
  
    BlockingQueue<Integer> obj;
  
    public producer(BlockingQueue<Integer> obj)
    {
        // accept an ArrayBlockingQueue object from
        // constructor
        this.obj = obj;
    }
  
    @Override public void run()
    {
          
         // Produce numbers in the range [1,4]
         // and put them in the buffer
        for (int i = 1; i <= 4; i++) {
            try {
                obj.put(i);
                System.out.println("Produced " + i);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Consumidor: El Consumidor tomará datos del búfer BlockingQueue. En nuestro caso, estos datos simplemente se imprimirán.

Java

// Java program to demonstrate consumer code
  
// Implement Runnable since object
// of this class will be executed by
// a separate thread
class consumer implements Runnable {
  
    BlockingQueue<Integer> obj;
  
    // Initialize taken to -1
    // to indicate that no number
    // has been taken so far.
    int taken = -1;
  
    public consumer(BlockingQueue<Integer> obj)
    {
        // accept an ArrayBlockingQueue object from
        // constructor
        this.obj = obj;
    }
  
    @Override public void run()
    {
  
        // Take numbers from the buffer and
        // print them, if the last number taken
        // is 4 then stop
        while (taken != 4) {
            try {
                taken = obj.take();
                System.out.println("Consumed " + taken);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Ahora vamos a crear un objeto de búfer ArrayBlockingQueue, un subproceso para cada productor y consumidor, y ejecutar la solución.

Solución del problema productor-consumidor:

Java

// Java Program to demonstrate producer consumer
// problem solution
  
// Import the BlockingQueue interface and
// ArrayBlockingQueue class
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
  
// Create a Main class for execution
public class Main {
    public static void main(String[] args)
    {
  
        // Create an ArrayBlockingQueue object with capacity
        // 4
        BlockingQueue<Integer> bqueue
            = new ArrayBlockingQueue<Integer>(4);
  
        // Create 1 object each for producer
        // and consumer and pass them the common
        // buffer created above
        producer p1 = new producer(bqueue);
        consumer c1 = new consumer(bqueue);
  
        // Create 1 thread each for producer
        // and consumer and pass them their
        // respective objects.
        Thread pThread = new Thread(p1);
        Thread cThread = new Thread(c1);
  
        // Start both threads
        pThread.start();
        cThread.start();
    }
}
  
class producer implements Runnable {
  
    BlockingQueue<Integer> obj;
  
    public producer(BlockingQueue<Integer> obj)
    {
        this.obj = obj;
    }
  
    @Override public void run()
    {
        for (int i = 1; i <= 4; i++) {
            try {
                obj.put(i);
                System.out.println("Produced " + i);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  
class consumer implements Runnable {
  
    BlockingQueue<Integer> obj;
  
    int taken = -1;
  
    public consumer(BlockingQueue<Integer> obj)
    {
        this.obj = obj;
    }
  
    @Override public void run()
    {
        while (taken != 4) {
            try {
                taken = obj.take();
                System.out.println("Consumed " + taken);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Producción

Produced 1
Produced 2
Produced 3
Produced 4
Consumed 1
Consumed 2
Consumed 3
Consumed 4

 

Nota: 

  • El programa anterior puede dar diferentes órdenes de producción y consumo con cada ejecución. Pero cabe destacar que todos los números producidos se consumirán y no habrá problemas de comunicación entre subprocesos.
  • Elemento veneno: Este elemento señala el final de la actividad de Producción-Consumo, en el ejemplo anterior, 4 es el elemento veneno.
  • En caso de que no se conozca el número de elementos de antemano, se puede utilizar LinkedBlockingQueue .

Publicación traducida automáticamente

Artículo escrito por sanatr y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *