Multiple Producers and Consumers Problem using Java Threads

The Producer-Consumer problem with multiple producers and consumers involves managing a shared resource (like a buffer or queue) where producers add items to the resource and consumers remove them.

To handle multiple producers and consumers, we need to ensure thread-safe access to the shared resource and coordinate the producers and consumers properly.

MultiProducerConsumer.java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
public class MultiProducerConsumer {
    private static final int CAPACITY = 5;
    private final Queue<Integer> queue = new LinkedList<>();
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
 
    // Producer
    class Producer implements Runnable {
        @Override
        public void run() {
            int value = 0;
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == CAPACITY) {
                        notFull.await();
                    }
                    queue.offer(value);
                    System.out.println(Thread.currentThread().getName() + " produced " + value);
                    value++;
                    notEmpty.signalAll();
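                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop this thread;
                    // without the return, the while (true) loop would keep running.
                    Thread.currentThread().interrupt();
                    return;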
                } finally {
                    lock.unlock();
                }
            }
        }
    }
 
    // Consumer
    class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (queue.isEmpty()) {
                        notEmpty.await();
                    }
                    int value = queue.poll();
                    System.out.println(Thread.currentThread().getName() + " consumed " + value);
                    notFull.signalAll();
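                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop this thread;
                    // without the return, the while (true) loop would keep running.
                    Thread.currentThread().interrupt();
                    return;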
                } finally {
                    lock.unlock();
                }
            }
        }
    }
 
    public static void main(String[] args) {
        MultiProducerConsumer pc = new MultiProducerConsumer();
 
        // Starting multiple producers and consumers
        new Thread(pc.new Producer(), "Producer1").start();
        new Thread(pc.new Producer(), "Producer2").start();
        new Thread(pc.new Consumer(), "Consumer1").start();
        new Thread(pc.new Consumer(), "Consumer2").start();
    }
}

Shared Resource (Queue): The queue is the resource shared between producers and consumers. It holds the items that producers have added and consumers have not yet removed. A LinkedList is unbounded, so the limit of CAPACITY (5) items is enforced by the producers, which check queue.size() before adding.

Locks and Conditions: A ReentrantLock (lock) ensures mutual exclusion when accessing the shared queue. notEmpty and notFull are Condition objects created from this lock. Producers wait on notFull while the queue is full, and consumers wait on notEmpty while the queue is empty. A call to await() releases the lock while the thread waits and reacquires it before returning.
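The lock and its two conditions can also be packaged into a small reusable class. The following is a minimal sketch and not part of MultiProducerConsumer.java: the class name BoundedBuffer and its put, take and size methods are hypothetical names chosen for illustration, and ArrayDeque stands in for the LinkedList used in the listing.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper class (an assumption for illustration, not from the listing).
class BoundedBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    BoundedBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Blocks while the buffer is full, then appends the item.
    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();      // releases the lock while waiting
            }
            items.add(item);
            notEmpty.signalAll();     // wake consumers waiting for data
        } finally {
            lock.unlock();
        }
    }

    // Blocks while the buffer is empty, then removes the oldest item.
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.remove();
            notFull.signalAll();      // wake producers waiting for space
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Returns the current number of buffered items.
    int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }
}
```

With a class like this, a producer would only call put and a consumer would only call take, so neither needs to touch the lock directly.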

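Producer Class Implementation: The Producer class implements Runnable and adds new items to the queue. While the queue is at CAPACITY, the producer thread waits (notFull.await()) until a consumer removes an item. The check sits in a while loop rather than an if because a woken thread must test the condition again: another producer may have taken the free slot first, and await() can also return spuriously. Once an item is added, the producer signals (notEmpty.signalAll()) to wake consumer threads waiting for data. Each producer keeps its own value counter, so different producers emit the same sequence of numbers.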

Consumer Class Implementation: The Consumer class also implements Runnable and removes items from the queue. While the queue is empty, the consumer waits (notEmpty.await()) until a producer adds an item. After removing an item, it signals (notFull.signalAll()) to wake producer threads waiting for free space in the queue.

Thread Synchronization: Both the producer and the consumer call lock.lock() before touching the queue and lock.unlock() in a finally block. Only one thread can therefore read or modify the queue at a time, and the lock is released even if an exception is thrown.
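The lock()/try/finally pattern can be seen in isolation with a simpler shared value than a queue. The sketch below assumes a hypothetical LockedCounter class and countWithThreads helper (neither appears in the original listing). Several threads increment one counter, and because every increment runs under the lock, no update should be lost.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class (an assumption for illustration).
class LockedCounter {
    private final Lock lock = new ReentrantLock();
    private int count = 0;   // guarded by lock

    void increment() {
        lock.lock();
        try {
            count++;         // read-modify-write done by one thread at a time
        } finally {
            lock.unlock();   // always released, even if the body throws
        }
    }

    int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // Starts the given number of threads, waits for them, and returns the final count.
    static int countWithThreads(int threads, int incrementsPerThread) {
        LockedCounter counter = new LockedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return counter.get();
    }
}
```

Calling LockedCounter.countWithThreads(4, 10000) should return 40000 as long as the calling thread is not interrupted. Removing the lock from increment would make lost updates possible.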

Scalability: The implementation works with any number of producers and consumers; adding more means starting more threads of each type. All of them contend for the same lock, so only one thread accesses the queue at any moment.
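As a rough illustration of scaling the thread counts, the sketch below starts a configurable number of producers and consumers around the same lock-and-conditions scheme. The class name ScaledRun, the run method and the taken counter are assumptions made for this example. Unlike the listing, each producer creates a fixed number of items so that the run terminates, and at least one consumer is assumed whenever items are produced.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class (an assumption for illustration).
class ScaledRun {
    private static final int CAPACITY = 5;
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private final int total;   // items all producers create together
    private int taken = 0;     // items removed so far; guarded by lock

    private ScaledRun(int total) {
        this.total = total;
    }

    private void produce(int items) {
        for (int value = 0; value < items; value++) {
            lock.lock();
            try {
                while (queue.size() == CAPACITY) {
                    notFull.await();
                }
                queue.offer(value);
                notEmpty.signalAll();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
        }
    }

    private void consume() {
        while (true) {
            lock.lock();
            try {
                while (queue.isEmpty() && taken < total) {
                    notEmpty.await();
                }
                if (queue.isEmpty()) {
                    return;                   // every item has been taken
                }
                queue.poll();
                taken++;
                notFull.signalAll();
                if (taken == total) {
                    notEmpty.signalAll();     // release consumers that are still waiting
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
        }
    }

    // Runs the given number of threads to completion and returns how many items were consumed.
    static int run(int producers, int consumers, int itemsPerProducer) {
        ScaledRun r = new ScaledRun(producers * itemsPerProducer);
        List<Thread> threads = new ArrayList<>();
        for (int i = 1; i <= producers; i++) {
            threads.add(new Thread(() -> r.produce(itemsPerProducer), "Producer" + i));
        }
        for (int i = 1; i <= consumers; i++) {
            threads.add(new Thread(r::consume, "Consumer" + i));
        }
        threads.forEach(Thread::start);
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        r.lock.lock();
        try {
            return r.taken;
        } finally {
            r.lock.unlock();
        }
    }
}
```

For example, ScaledRun.run(3, 2, 100) starts three producers and two consumers and should report 300 consumed items once all threads have finished.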

Main Method: The main method creates and starts two producer threads and two consumer threads, which then produce and consume concurrently. Both run loops are while (true) and nothing in main stops the threads, so the program runs until the process is terminated.
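One possible way to end such a program in an orderly fashion is to interrupt the worker threads and wait for them to finish. The following is only a sketch under stated assumptions: StoppableDemo and runThenStop are hypothetical names, the loops test the interrupt flag instead of using while (true), and printing is omitted to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class (an assumption for illustration).
class StoppableDemo {
    private static final int CAPACITY = 5;
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    private void produce() {
        int value = 0;
        while (!Thread.currentThread().isInterrupted()) {
            lock.lock();
            try {
                while (queue.size() == CAPACITY) {
                    notFull.await();
                }
                queue.offer(value++);
                notEmpty.signalAll();
            } catch (InterruptedException e) {
                // Restore the flag; the loop condition then ends the thread.
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }
    }

    private void consume() {
        while (!Thread.currentThread().isInterrupted()) {
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    notEmpty.await();
                }
                queue.poll();
                notFull.signalAll();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }
    }

    // Lets the workers run for runMillis, interrupts them, and reports whether all have stopped.
    static boolean runThenStop(int producers, int consumers, long runMillis) {
        StoppableDemo demo = new StoppableDemo();
        List<Thread> threads = new ArrayList<>();
        for (int i = 1; i <= producers; i++) {
            threads.add(new Thread(demo::produce, "Producer" + i));
        }
        for (int i = 1; i <= consumers; i++) {
            threads.add(new Thread(demo::consume, "Consumer" + i));
        }
        threads.forEach(Thread::start);
        try {
            Thread.sleep(runMillis);
            threads.forEach(Thread::interrupt);
            for (Thread t : threads) {
                t.join(1000);   // wait up to one second per thread
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            threads.forEach(Thread::interrupt);   // still ask the workers to stop
        }
        return threads.stream().noneMatch(Thread::isAlive);
    }
}
```

A thread blocked in await() receives an InterruptedException when interrupted, and a thread that is currently running sees the flag at the next loop check, so both kinds of worker leave their loops.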

This implementation of the multi-producer, multi-consumer problem shows a classic way to coordinate threads around a shared resource in Java: one lock provides mutual exclusion, and two condition variables let producers and consumers wait for and signal changes in the queue's state.

java/threads/multiple-producers-and-consumers.txt · Last modified: 2024/01/17 03:46 by odefta