Knowledge Guide

Problem 14: Advanced Synchronization in Multi-Buffered Master-Worker Thread Pools

Overview

In this problem, multiple master threads produce data into a set of buffers, and multiple worker threads consume it. Each master thread is dedicated to its own buffer, while a worker may consume from any buffer that currently holds data. This keeps workers busy: a worker does not sit idle just because one particular buffer is empty. The main challenge is synchronizing access correctly across several buffers so that no race conditions occur. The pseudocode uses a lock and two condition variables per buffer, in the style of pthread condition variables; the Java solution uses a lock and two semaphores per buffer.

Pseudocode

Initialize:
    BUFFER[...]            // Array of buffers.
    BUFFER_LOCK[...]       // Array of locks, one per buffer.
    BUFFER_NOT_EMPTY[...]  // Array of condition variables signaling buffer not empty.
    BUFFER_NOT_FULL[...]   // Array of condition variables signaling buffer not full.

Function MASTER_THREAD(thread_id):
    While True:
        data = GenerateData()
        AcquireLock(BUFFER_LOCK[thread_id])
        While BUFFER[thread_id] is Full:
            // Waiting releases BUFFER_LOCK[thread_id] and reacquires it on wake-up.
            WaitForCondition(BUFFER_NOT_FULL[thread_id])
        AddToBuffer(BUFFER[thread_id], data)
        SignalCondition(BUFFER_NOT_EMPTY[thread_id])
        ReleaseLock(BUFFER_LOCK[thread_id])

Function WORKER_THREAD():
    While True:
        chosen_buffer = None

        // Pass 1: scan for a non-empty buffer without waiting.
        For i from 0 to length(BUFFER) - 1:
            AcquireLock(BUFFER_LOCK[i])
            If BUFFER[i] is Not Empty:
                chosen_buffer = i
                Break                      // BUFFER_LOCK[i] stays held.
            Else:
                ReleaseLock(BUFFER_LOCK[i])

        // Pass 2: nothing found, so wait on the buffers in turn.
        If chosen_buffer is None:
            For i from 0 to length(BUFFER) - 1:
                AcquireLock(BUFFER_LOCK[i])
                If BUFFER[i] is Empty:
                    WaitForCondition(BUFFER_NOT_EMPTY[i])
                If BUFFER[i] is Not Empty:     // Re-check after waking up.
                    chosen_buffer = i
                    Break                  // BUFFER_LOCK[i] stays held.
                ReleaseLock(BUFFER_LOCK[i])

        If chosen_buffer is None:
            Continue                       // Still nothing to consume; scan again.

        data = RemoveFromBuffer(BUFFER[chosen_buffer])
        SignalCondition(BUFFER_NOT_FULL[chosen_buffer])
        ReleaseLock(BUFFER_LOCK[chosen_buffer])
        ProcessData(data)

Algorithm Walkthrough

The walkthrough below follows the algorithm step by step, with emphasis on the synchronization at each stage:

Initialization

  1. Buffers: Initialize five empty buffers to hold integers.
  2. Mutexes and Condition Variables: Initialize mutexes and condition variables for each buffer.

Execution Starts

  1. Start Master Threads: Launch five master threads, each responsible for one buffer.
  2. Start Worker Threads: Launch two worker threads responsible for consuming data from any buffer.

Master Thread Operation (Repeatedly for each Master)

  1. Generate Data: Each master thread generates a random number.
  2. Lock Buffer: Acquires a lock on the mutex associated with its buffer.
  3. Check if Buffer is Full:
    • If the buffer is full, wait on the “buffer not full” condition variable.
    • If not, proceed to the next step.
  4. Add Data: Add the generated number to the buffer.
  5. Signal Workers: Signal “buffer not empty” condition variable to wake up worker threads if any are waiting.
  6. Unlock Buffer: Release the lock on the mutex.
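
The master steps above map closely onto a lock with two condition variables. The following is a minimal sketch for a single buffer, assuming Java's ReentrantLock and Condition in place of pthread primitives. The class ConditionBuffer and its methods put, take, and size are hypothetical names introduced here for illustration; they are not part of the solution code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper for illustration; not part of the original solution.
class ConditionBuffer {
  private final Deque<Integer> items = new ArrayDeque<>();
  private final int capacity;
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notEmpty = lock.newCondition(); // "buffer not empty"
  private final Condition notFull = lock.newCondition();  // "buffer not full"

  ConditionBuffer(int capacity) {
    this.capacity = capacity;
  }

  // Master side: walkthrough steps 2 to 6.
  void put(int data) throws InterruptedException {
    lock.lock(); // Step 2: lock the buffer
    try {
      while (items.size() == capacity) { // Step 3: re-check after every wake-up
        notFull.await(); // Releases the lock while waiting, reacquires before returning
      }
      items.addLast(data); // Step 4: add data
      notEmpty.signal();   // Step 5: wake one waiting consumer, if any
    } finally {
      lock.unlock();       // Step 6: unlock
    }
  }

  // Consumer side: blocks while the buffer is empty, then removes the oldest item.
  int take() throws InterruptedException {
    lock.lock();
    try {
      while (items.isEmpty()) {
        notEmpty.await();
      }
      int data = items.removeFirst();
      notFull.signal(); // Wake a master waiting for free space, if any
      return data;
    } finally {
      lock.unlock();
    }
  }

  int size() {
    lock.lock();
    try {
      return items.size();
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ConditionBuffer buffer = new ConditionBuffer(2);
    buffer.put(7);
    buffer.put(9);
    System.out.println(buffer.take()); // 7 (oldest item first)
    System.out.println(buffer.size()); // 1
  }
}
```

The wait sits inside a while loop rather than an if because a thread can wake up and find the condition false again, for example when another thread took the free slot first.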

Worker Thread Operation (Repeatedly for each Worker)

  1. Choose a Buffer: Each worker thread goes through the buffers to find non-empty ones.
  2. Lock Buffer: Acquires a lock on the current buffer’s mutex.
  3. Check if Buffer is Empty:
    • If the buffer is not empty, proceed to the next step.
    • If empty, release the lock and check the next buffer.
  4. Consume Data: Remove an item from the buffer.
  5. Signal Master: Signal “buffer not full” condition variable to wake up master threads if any are waiting.
  6. Unlock Buffer: Release the lock on the mutex.
  7. Process Data: (This part is assumed and not explicitly shown in the code)
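
The scan in steps 1 to 4 and step 6 can be sketched on its own as follows. This is an illustration under assumptions, not the solution's code: the class BufferScanner, its add helper, and the start parameter are hypothetical. The start parameter lets a caller rotate where the scan begins so that the lowest-numbered buffer is not always drained first. Signaling the master (step 5) is left out because these sketch buffers are unbounded.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the worker's buffer scan; names are illustrative.
class BufferScanner {
  private final List<Deque<Integer>> buffers = new ArrayList<>();
  private final List<Lock> locks = new ArrayList<>();

  BufferScanner(int numBuffers) {
    for (int i = 0; i < numBuffers; i++) {
      buffers.add(new ArrayDeque<>());
      locks.add(new ReentrantLock());
    }
  }

  // Producer-side helper, used here only to populate a buffer.
  void add(int bufferId, int value) {
    Lock lock = locks.get(bufferId);
    lock.lock();
    try {
      buffers.get(bufferId).addLast(value);
    } finally {
      lock.unlock();
    }
  }

  // One pass over all buffers, beginning at index `start`.
  // Returns {bufferIndex, value}, or null if every buffer was empty.
  int[] scanOnce(int start) {
    int n = buffers.size();
    for (int k = 0; k < n; k++) {
      int i = (start + k) % n;
      Lock lock = locks.get(i);
      lock.lock(); // Step 2: lock the current buffer
      try {
        Integer value = buffers.get(i).pollFirst(); // Steps 3 and 4: null means empty
        if (value != null) {
          return new int[] {i, value};
        }
      } finally {
        lock.unlock(); // Step 6: always release before moving on
      }
    }
    return null;
  }

  public static void main(String[] args) {
    BufferScanner scanner = new BufferScanner(3);
    scanner.add(2, 42);
    int[] hit = scanner.scanOnce(0);
    System.out.println("buffer " + hit[0] + " gave " + hit[1]);
    System.out.println(scanner.scanOnce(0) == null); // all buffers are empty now
  }
}
```

Only one buffer lock is held at any time, so two workers scanning concurrently cannot deadlock on each other.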

Synchronization Aspects Highlight

Main Thread Sleep and Exit

  1. Main Thread Sleep: The main thread sleeps for 1 second (Thread.sleep(1000) in the Java code), allowing the worker and master threads to do some work.
  2. Exit: After sleeping, the main thread sets stop = true, which lets every master and worker thread leave its loop. The main thread then joins all of them before exiting.
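
The shutdown pattern described in these two steps can be shown in isolation. This is a minimal sketch, assuming a single background thread and a volatile flag; the class StopFlagDemo, the method runFor, and the iterations counter are hypothetical names used only for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of the stop-flag shutdown pattern; not part of the solution.
class StopFlagDemo {
  // volatile makes the main thread's write visible to the worker's loop check
  private static volatile boolean stop = false;
  private static final AtomicInteger iterations = new AtomicInteger();

  // Runs one background thread for roughly `millis` ms, then stops and joins it.
  // Returns how many loop iterations the thread completed.
  static int runFor(long millis) throws InterruptedException {
    stop = false;
    iterations.set(0);

    Thread worker = new Thread(() -> {
      while (!stop) {
        iterations.incrementAndGet();
        try {
          Thread.sleep(10); // Simulated work
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    });

    worker.start();
    Thread.sleep(millis); // Let the thread do some work
    stop = true;          // Ask the thread to leave its loop
    worker.join();        // Wait until it has actually finished
    return iterations.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("iterations: " + runFor(200));
  }
}
```

A flag like this is only checked at the top of the loop, so a thread that is blocked inside a wait or an acquire will not notice it until that call returns.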

This walkthrough illustrates the algorithm's flow in a multi-buffer producer-consumer scenario, with a strong focus on synchronization to ensure safe and efficient multi-threaded operation.

Flow Chart
java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Solution {

  private static final int NUM_BUFFERS = 5;
  private static final int BUFFER_SIZE = 10;

  // Buffers for each master thread
  private static List<Integer>[] buffers = new ArrayList[NUM_BUFFERS];

  // Locks and semaphores for synchronization
  private static Lock[] bufferLocks = new ReentrantLock[NUM_BUFFERS];
  private static Semaphore[] bufferNotEmpty = new Semaphore[NUM_BUFFERS];
  private static Semaphore[] bufferNotFull = new Semaphore[NUM_BUFFERS];

  private static volatile boolean stop = false;

  public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < NUM_BUFFERS; i++) {
      buffers[i] = new ArrayList<>(BUFFER_SIZE);
      bufferLocks[i] = new ReentrantLock();
      bufferNotEmpty[i] = new Semaphore(0); // Start empty
      bufferNotFull[i] = new Semaphore(BUFFER_SIZE); // Can hold BUFFER_SIZE elements
    }

    // Start master threads
    List<Thread> masterThreads = new ArrayList<>();
    for (int i = 0; i < NUM_BUFFERS; i++) {
      final int id = i;
      Thread masterThread = new Thread(() -> masterThread(id));
      masterThreads.add(masterThread);
      masterThread.start();
    }

    // Start worker threads
    Thread worker1 = new Thread(Solution::workerThread);
    Thread worker2 = new Thread(Solution::workerThread);
    worker1.start();
    worker2.start();

    // Let the simulation run for a while
    Thread.sleep(1000);
    stop = true; // Stop the simulation

    // Wait for all threads to finish
    for (Thread master : masterThreads) {
      master.join();
    }
    worker1.join();
    worker2.join();
  }

  // Function executed by each master thread
  private static void masterThread(int threadId) {
    Random rand = new Random();
    while (!stop) {
      int data = rand.nextInt(100); // Generate random data

      try {
        bufferNotFull[threadId].acquire(); // Wait if the buffer is full
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return; // Interrupted while waiting: no permit or lock is held, so just exit
      }

      bufferLocks[threadId].lock(); // Lock the buffer
      try {
        buffers[threadId].add(data); // Add data to the buffer
        System.out.println("Master " + threadId + " produced data " + data);
      } finally {
        bufferLocks[threadId].unlock(); // Release the lock (only reached if it was acquired)
      }
      bufferNotEmpty[threadId].release(); // Notify that buffer has data

      try {
        Thread.sleep(100); // Simulate production delay
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  // Function executed by worker threads
  private static void workerThread() {
    while (!stop) {
      int chosenBuffer = -1;
      int data = -1;

      for (int i = 0; i < NUM_BUFFERS; i++) {
        // Claim one item without blocking; a missing permit means the buffer is empty
        if (!bufferNotEmpty[i].tryAcquire()) {
          continue;
        }
        bufferLocks[i].lock(); // Lock the buffer (blocks until the lock is free)
        try {
          chosenBuffer = i;
          data = buffers[i].remove(buffers[i].size() - 1); // Consume data
        } finally {
          bufferLocks[i].unlock(); // Release the lock
        }
        break;
      }

      if (chosenBuffer != -1) {
        System.out.println(
          "Worker consumed data " + data + " from buffer " + chosenBuffer
        );
        bufferNotFull[chosenBuffer].release(); // Notify that the buffer is not full
      } else {
        Thread.yield(); // Yield if no data found
      }
    }
  }
}