Java parallelism


Creative Commons License
This -Java parallelism- tutorial is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License
Preamble
Headlines
From Java 5, significant modifications occur in the way “parallel programming” (i.e., concurrency, multithreading…) can be managed, including the use of the original synchronized and volatile keywords.

Operating System (OS) level

Example
String[] command = {"/usr/local/bin/node", "--version"}; // macOS-specific!
try {
    Process p = Runtime.getRuntime().exec(command); // Heavyweight process with probable incompatibilities across operating systems like Windows, macOS, LINUX, etc.
    if (p.waitFor() == 0) { // Blocking! Better solution required, e.g., look at https://www.javaworld.com/article/2071275/when-runtime-exec---won-t.html
        try (java.io.BufferedReader input = new java.io.BufferedReader(new java.io.InputStreamReader(p.getInputStream()))) {
            String content;
            while ((content = input.readLine()) != null) System.out.println("Out: " + content); // 'Out: v11.8.0' is displayed...
        }
    } else {
        try (java.io.BufferedReader input = new java.io.BufferedReader(new java.io.InputStreamReader(p.getErrorStream()))) {
            String content;
            while ((content = input.readLine()) != null) System.err.println("Err: " + content);
        }
    }
} catch (InterruptedException | java.io.IOException ieioe) {
    System.err.println("Runtime.getRuntime().exec(command): " + ieioe.getClass().getSimpleName() + ": " + ieioe.getMessage());
}

Java Virtual Machine (JVM) level

Example
private boolean _stop = false;
private Thread _t1; // Lightweight process
…
_t1 = new Thread() {
    public void run() {
        while(! _stop) { /* Something to do… */ }
    }
};
…
_t1.start(); // 'Thread' objects are NEVER restartable!
…
_stop = true; // Never used '_t1.stop();'
_t1 = null; // For garbage collector

synchronized keyword

Synchronized method

Rule(s)
  • A synchronized method is by definition unstoppable. In other words, a thread running a synchronized method cannot be stopped for the benefit of another concurrent thread.
Example
public class Illustration_of_synchronized {

    static private int _I = 0; // 'static' is for convenience and simplification...

    static { // Static initializer is for convenience and simplification...
        Thread plusplus = new Thread("plusplus") {
            @Override
            public void run() {
                assert (Thread.currentThread() == this);
//                System.out.println(Thread.currentThread().getName()); // 'plusplus'
                _Plus();
                _Plus();
            }
        };
        assert (plusplus.getState() == Thread.State.NEW);

        Thread minus = new Thread("minus") {
            @Override
            public void run() {
                assert (Thread.currentThread() == this);
//                System.out.println(Thread.currentThread().getName()); // "minus"
                _Minus();
            }
        };
        assert (minus.getState() == Thread.State.NEW);

        plusplus.start(); // 0 -> 1 -> 2 *OR* -1 -> 0 -> 1
        assert (plusplus.getState() == Thread.State.RUNNABLE || plusplus.getState() == Thread.State.TERMINATED);
        minus.start(); // 2 -> 1 *OR* 0 -> -1
        assert (minus.getState() == Thread.State.RUNNABLE || minus.getState() == Thread.State.TERMINATED);
        /**
         * Caution: execution can be -plusplus ; minus- *OR* -minus ; plusplus-
         */
    }

    static synchronized private void _Plus() {
        _I++;
    }

    static synchronized private void _Minus() {
        _I--;
    }

    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName()); // 'main'
        System.out.println("Due to 'plusplus' and 'minus' are BOTH unstoppable, result cannot be anything but '1': " + _I);
    }
}

Synchronized block

Rule(s)
  • A synchronized block allows the delimitation of unstoppable code pieces. In this case, synchronized then requires the object to be synchronized while a synchronized method acts on this. Synchronized blocks cannot cope with primitive types… From Java 5, synchronized benefits from being replaced by the java.util.concurrent.locks.Lock interface.
Example
Temperature t = new Temperature();
synchronized(t) { // Mutex acquisition on 't'
    // Synchronized code…
} // Mutex release on 't'

“Parallel programming”-based support in java.lang.Object

Rule(s)
Example
public class Simple_dead_lock {

    private int _i = 0;

    synchronized public void f() {
        System.err.println("Start of 'f()'");
        _i++;
        _g();
        System.err.println("Value of '_i': " + _i);
        notify();
    }

    synchronized private void _g() {
        System.err.println("Start of '_g()'");
        try {
            wait(5000); // Dead lock occurs for 5 sec.!
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
        System.err.println("End of 'wait()'");
    }

    public static void main(String[] args) {
        Simple_dead_lock sdl = new Simple_dead_lock();
        sdl.f();
    }
}

volatile keyword

Rule(s)
Example
public class Illustration_of_volatile {

    private int _bads = 0;
    private int _goods = 0;
    volatile private boolean _stop = false; // Important issue: '_stop' is used in read mode only, which is a smart use of 'volatile'!

    /**
     * The game is avoiding the following inconsistent compositions:
     * "FrançoisSarkozy" and "NicolasHollande"
     */
// 1. Executing the program "as is" leads to some bad compositions...
// 2. Uncomment 'volatile' for '_name', does it work? Unfortunately no, some bad compositions persist...
// 3. Use 'java.util.concurrent.atomic.AtomicReference' instead... Does it work? Fortunately yes!
    /*volatile*/ private String _name = "?";
//    java.util.concurrent.atomic.AtomicReference<String> _name = new java.util.concurrent.atomic.AtomicReference("?");
    private Thread _t1, _t2, _t3;

    private void set(String given_name, String surname) {
        /**
         * Executions 1 & 2
         */
        _name = given_name;
        try {
            Thread.sleep(1); // Ficticious delay so that 'set' lasts and then may be "actually" interrupted...
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
        _name += surname;
        /**
         * Execution 3
         */
//        _name.set(given_name + surname); // To be used with 'java.util.concurrent.atomic.AtomicReference<String> _name = new java.util.concurrent.atomic.AtomicReference("?");'
    }

    /**
     * If not 'synchronized' then 'check' can be interrupted by '_t1' and '_t2',
     * but, anyway, bad compositions may occur...
     */
    /*synchronized*/ private void check() {
        if (_name.equals("Fran" + (char) 0x00E7 + "ois" + "Sarkozy") || _name.equals("Nicolas" + "Hollande")) {
            _bads++;
        } else {
            _goods++;
        }
    }

    public Illustration_of_volatile() {
        _t1 = new Thread() {
            public void run() {
                while (!_stop) {
                    set("Fran" + (char) 0x00E7 + "ois", "Hollande");
                }
            }
        };
        _t2 = new Thread() {
            public void run() {
                while (!_stop) {
                    set("Nicolas", "Sarkozy");
                }
            }
        };
        _t3 = new Thread() {
            public void run() {
                while (!_stop) {
                    check();
                }
            }
        };
    }

    public void start() {
        _t1.start();
        _t2.start();
        _t3.start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
        _stop = true; // Let think about this: while all of the three threads are stopped at the same time?
        _t1 = null;
        _t2 = null;
        _t3 = null;
    }

    public static void main(String[] args) {
        Illustration_of_volatile iov = new Illustration_of_volatile();
        iov.start();
        System.out.println("Bad compositions: " + iov._bads);
        System.out.println("Good compositions: " + iov._goods);
    }
}
Resource(s)
From Java 5, the java.util.concurrent package revisits the way “parallel programming” may be performed in Java.
Rule(s)
Example
public class Illustration_of_Lock_and_Condition {

    private final java.util.LinkedList<String> _data = new java.util.LinkedList();
    private java.nio.file.Path _file;
    // https://docs.oracle.com/javase/10/docs/api/java/util/concurrent/locks/Lock.html
    private final java.util.concurrent.locks.Lock _lock = new java.util.concurrent.locks.ReentrantLock(/* Exercise 1: */ /* true */);
    // https://docs.oracle.com/javase/10/docs/api/java/util/concurrent/locks/Condition.html
    private final java.util.concurrent.locks.Condition _condition = _lock.newCondition();
    private boolean _stop = false;

    /* Exercise 1: */ /* private int _count = 0; */

    public Illustration_of_Lock_and_Condition(String data) {
        assert (data != null && data.length() > 0);
        try {
            _file = new java.io.File("my_file").toPath();
            java.nio.file.Files.deleteIfExists(_file);
            java.nio.file.Files.write(_file, data.getBytes());
        } catch (java.io.IOException ioe) {
            System.err.println(this.getClass().getSimpleName() + ": " + ioe.getMessage());
            Runtime.getRuntime().exit(-1); // For simplicity only, i.e., didactical program
        }
    }

    private void _loading() {
        try {
            java.util.List<String> lines = java.nio.file.Files.readAllLines(_file, java.nio.charset.Charset.defaultCharset());
            for (String line : lines) {
                _data.add(line);
                try {
                    Thread.sleep(1); // Ficticious delay so that 'for' loop lasts and then may be artificially paused...
                } catch (InterruptedException ie) {
                    System.err.println(this.getClass().getSimpleName() + ": " + ie.getMessage());
                    Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program
                }
//                System.out.println(line + " loaded...");
            }
        } catch (java.io.IOException ioe) {
            System.err.println(this.getClass().getSimpleName() + ": " + ioe.getMessage());
            Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program
        } finally {
            _stop = true;
        }
//        System.out.println("'_loading' is ending...");
    }

    private void _processing() {
        while (!_stop || !_data.isEmpty()) {
            _lock.lock(); // Thread becomes dormant while awaiting the lock's aquisition...
            try {
                _data.addFirst(_data.removeFirst().toUpperCase());
                _condition.signal();
                /* Exercise 1: */ /* System.out.println(_data.getFirst() + " processed..." + _count++); */
                try {
                    Thread.sleep(1); // Ficticious delay so that processing lasts and then may be artificially paused...
                } catch (InterruptedException ie) {
                    System.err.println(this.getClass().getSimpleName() + ": " + ie.getMessage());
                    Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program
                }
            } catch (java.util.NoSuchElementException nsee) {
                // 'removeFirst' may occur while the list is empty... It doesn't matter for a didactical program, but improvements are required for a professional program!
            } finally {
                _lock.unlock();
            }
        }
//        System.out.println("'_processing' is ending...");
    }

    private void _unloading() {
        while (!_stop || !_data.isEmpty()) {
            _lock.lock(); // Thread becomes dormant while awaiting the lock's aquisition...
            try {
                // "Spurious" wakeup unmanaged:
                _condition.await(); // Lock is atomically released...
                System.out.println(_data.removeFirst() + " unloaded...");
            } catch (InterruptedException ie) {
                System.err.println(this.getClass().getSimpleName() + ": " + ie.getMessage());
                Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program
            } catch (java.util.NoSuchElementException nsee) {
                // 'removeFirst' may occur while the list is empty... It doesn't matter for a didactical program, but improvements are required for a professional program!
            } finally {
                _lock.unlock(); // If 'await' fails then the lock is released...
            }
        }
//        System.out.println("'_unloading' is ending...");
    }

    public static void main(String[] args) {
        final Illustration_of_Lock_and_Condition iolac = new Illustration_of_Lock_and_Condition("f\nr\na\nn\nc\nk\n \nb\na\nr\nb\ni\ne\nr");
        new Thread("unloading") {
            @Override
            public void run() { // 'this' is the executing thread!
                iolac._unloading();
            }
        }.start();
        new Thread("processing") {
            @Override
            public void run() { // 'this' is the executing thread!
                iolac._processing();
            }
        }.start();
        new Thread("loading") {
            @Override
            public void run() { // 'this' is the executing thread!
                iolac._loading();
            }
        }.start();
    }
}
Exercise 1
The program looks pretty slow. This is confirmed in uncommenting /* Exercise 1: */ /* private int _count = 0; */ and /* Exercise 1: */ /* System.out.println(_data.getFirst() + " processed..." + _count++); */.
In fact, the processing thread performs many useless changes (≈ _count) to the first element of _data. This occurs because the unloading thread is the most waiting thread: data (i.e., simple strings) can be unloaded only when processed (i.e., changed to upper case). The java.util.concurrent.locks.ReentrantLock class then comes with a “fairness” parameter in the constructor: true means that the most waiting threads have priority. By uncommenting /* Exercise 1: */ /* true */, the program goes faster!
Resource(s)
From Java 5, the java.util.concurrent.ExecutorService interface aims at making more intuitive the use of threads. From Java 7, java.util.concurrent.ForkJoinPool (here…) as subtype of java.util.concurrent.ExecutorService enhances synchronization capabilities (difference is in particular discussed here…).
Example
// File system "Observer" pattern (from Java 7):
java.nio.file.WatchService ws = java.nio.file.FileSystems.getDefault().newWatchService();
/** 'java.nio.file.Path' interface extends 'java.nio.file.Watchable' interface */
// 'database' directory is observed by a daemon for any file creation:
java.nio.file.FileSystems.getDefault().getPath("database").register(ws, java.nio.file.StandardWatchEventKinds.ENTRY_CREATE);
…
// Watch service waits for just created file in 'database':
final java.util.concurrent.Callable<String> c = () -> { // Override 'call' method...
    // 'take' is a blocking statement...
    java.nio.file.WatchEvent<?> we = ws.take().pollEvents().get(0);
    return ((java.nio.file.WatchEvent<java.nio.file.Path>) we).context().toString(); // Return file name of just created file...
};
// Task 'c' is executed as a thread to prevent blocking:
java.util.concurrent.Future<String> f = java.util.concurrent.Executors.newSingleThreadExecutor().submit(c);
try { // Simply get result:
    String file_name_of_just_created_file = f.get();
} catch (java.util.concurrent.ExecutionException | InterruptedException eeie) {
    …
}
Resource(s)
Java performance benchmarking (here…).