Computers

Multithreading in Java

Here's a simple framework that I wrote for multithreading an application.

 

ThreadManager class

package com.cssathya.examples.thread;

import java.util.Observable;
import java.util.Observer;


/**
 * Base class for a manager that keeps at most a fixed number of worker
 * threads running at any one time.
 *
 * <p>Sub-classes supply the work by implementing {@link #createWorker()}.
 * {@link #start()} keeps requesting workers, running each on its own thread,
 * until {@link #stop()} is called, and then waits for every running worker
 * to report completion through {@link #update(Observable, Object)}.
 *
 * <p>{@link #start()} is expected to be called from a single thread.
 *
 * @author Sathya.Srinivasan
 */
public abstract class ThreadManager implements Observer {
    /** Guards {@link #activeThreads}; per instance so managers do not wake each other. */
    private final Object lock = new Object();
    private int maxThreads;
    private int activeThreads = 0;
    /** Volatile because {@link #stop()} may be called from another thread. */
    private volatile boolean stopProcessing;

    /**
     * Sets the maximum number of threads to use.
     *
     * @param maxThreads Maximum threads to use
     */
    public void setMaxThreads(int maxThreads) {
        this.maxThreads = maxThreads;
    }

    /**
     * Starts this program. Blocks until {@link #stop()} has been called and
     * all started workers have completed.
     *
     * @throws IllegalStateException if the maximum thread count is not positive
     */
    public void start() {
        if (maxThreads <= 0) {
            throw new IllegalStateException("Max. Threads not set.");
        }

        boolean interrupted = false;
        try {
            while (!stopProcessing) {
                // Block until a slot is free. Only this thread increments the
                // counter, so the slot cannot be taken before we use it below.
                interrupted |= this.awaitActiveBelow(maxThreads);

                ThreadWorker worker = this.createWorker();
                worker.addObserver(this);
                synchronized (lock) {
                    activeThreads++;
                }
                new Thread(worker).start();
            }

            // Wait till all threads complete their jobs.
            interrupted |= this.awaitActiveBelow(1);
        } finally {
            if (interrupted) {
                // Waiting is not abandoned on interrupt, but the caller must
                // still be able to see that an interrupt was requested.
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("All threads completed.");
    }

    /**
     * Stop processing records
     */
    public void stop() {
        stopProcessing = true;
    }

    /**
     * Delegate method that needs to be implemented by the real manager.
     *
     * @return The worker class that the sub-classed manager wants to interact with.
     */
    public abstract ThreadWorker createWorker();

    /**
     * Waits until fewer than {@code limit} workers are active.
     *
     * <p>The counter is tested and the wait is entered while holding the same
     * lock that {@link #update(Observable, Object)} takes, so a completion
     * that happens between the test and the wait cannot be missed. The test
     * is repeated after every wake-up to tolerate spurious wake-ups.
     *
     * @param limit Exclusive upper bound on the active worker count.
     * @return {@code true} if the thread was interrupted while waiting.
     */
    private boolean awaitActiveBelow(int limit) {
        boolean interrupted = false;
        synchronized (lock) {
            while (activeThreads >= limit) {
                try {
                    System.out.println("Waiting for threads to free up.");
                    lock.wait();
                } catch (InterruptedException e) {
                    System.err.println("Interrupted while waiting for lock.");
                    interrupted = true;
                }
            }
        }
        return interrupted;
    }

    /**
     * Notification method for registered observers. Called by a worker when
     * it has finished; releases that worker's slot and wakes the manager.
     *
     * @param observable The object being observed.
     * @param object The object sent by the observed object.
     */
    public void update(Observable observable, Object object) {
        synchronized (lock) {
            activeThreads--;
            System.out.println("Active Threads : " + activeThreads);
            lock.notifyAll();
        }
    }
}

ThreadWorker class

package com.cssathya.examples.thread;

import java.util.Observable;


/**
 * Base class for a unit of work that can be run on its own thread and that
 * notifies its registered observers once the work has ended.
 *
 * @author Sathya.Srinivasan
 */
public abstract class ThreadWorker extends Observable implements Runnable {
    /**
     * Over-ridden run() method from the Runnable class. Runs
     * {@link #process()} and then notifies all observers.
     *
     * <p>Observers are notified even when {@link #process()} throws, so that
     * anything counting running workers is not left waiting forever. The
     * exception itself still propagates to the caller.
     */
    @Override
    public void run() {
        try {
            this.process();
        } finally {
            this.setChanged();
            this.notifyObservers();
        }
    }

    /**
     * Method that must be implemented by the actual worker.
     */
    public abstract void process();
}


NumberManager class - A sample 'manager' class that generates random numbers

package com.cssathya.examples.thread;

/**
 * @author Sathya Srinivasan
 * @version 1.0
 */
public class NumberManager extends ThreadManager {
    private int maxRecords;
    private int recordsProcessed;

    /**
     * Sets maximum number of dummy records to generate.
     *
     * @param maxRecords Maximum dummy records to generate.
     */
    private void setRecordsToGenerate(int maxRecords) {
        this.maxRecords = maxRecords;
    }

    /**
     * Creates a worker thread.
     */
    public ThreadWorker createWorker() {
        NumberWorker worker = new NumberWorker();
        worker.set((int) (Math.random() * 100));
        if(++recordsProcessed == maxRecords) {
            super.stop();
        }
        return worker;
    }
}

NumberWorker class - A sample 'worker' class that prints numbers

package com.cssathya.examples.thread;

/**
 * @author Sathya Srinivasan
 * @version 1.0
 */
public class NumberWorker extends ThreadWorker {
	private int number;

	/**
	 * Sets some number to be displayed.
	 *
	 * @param number Number to be displayed.
	 */
	public void set(int number) {
		this.number = number;
	}

    public void process() {
	System.out.println("Printing number [" + number +
	        "] from thread [" + Thread.currentThread().getName() + "]");
	try {
		// Sleep anywhere from 0 - 2 seconds. 		
Thread
.sleep((int) (Math.random() * 3000)); } catch(InterruptedException e) { System.err.println("Thread [" + Thread.currentThread().getName() + "] was interrupted while sleeping."); } } }

Binding Application

package com.cssathya.examples.thread;

/**
 * Entry point that wires up a {@link NumberManager} and runs it.
 *
 * @author Sathya Srinivasan
 * @version 1.0
 */
public class Application {
    /**
     * Main method for this program. Runs 10 records on at most 5 threads.
     *
     * @param args Command line arguments. None needed.
     */
    public static void main(String[] args) {
        // Declared as NumberManager, not ThreadManager, because
        // setRecordsToGenerate is defined on the sub-class only.
        NumberManager mgr = new NumberManager();
        mgr.setMaxThreads(5);
        mgr.setRecordsToGenerate(10);
        mgr.start();
    }
}

Output

Printing number [79] from thread [Thread-1]
Waiting for threads to free up.
Printing number [95] from thread [Thread-0]
Printing number [6] from thread [Thread-2]
Printing number [33] from thread [Thread-4]
Printing number [62] from thread [Thread-3]
Active Threads : 4
Waiting for threads to free up.
Printing number [32] from thread [Thread-5]
Active Threads : 4
Waiting for threads to free up.
Printing number [65] from thread [Thread-6]
Active Threads : 4
Waiting for threads to free up.
Printing number [47] from thread [Thread-7]
Active Threads : 4
Waiting for threads to free up.
Printing number [98] from thread [Thread-8]
Active Threads : 4
Waiting for threads to free up.
Printing number [10] from thread [Thread-9]
Active Threads : 4
Waiting for threads to free up.
Active Threads : 3
Waiting for threads to free up.
Active Threads : 2
Waiting for threads to free up.
Active Threads : 1
Waiting for threads to free up.
Active Threads : 0
All threads completed.