Многопоточная сортировка с использованием пула потоков на Java

В данном посте будет рассказано, как реализовать сортировку на Java c использованием ExecutorService. Общая суть сортировки в следующем:

  1. Массив разбивается на части
  2. Каждая часть массива сортируется
  3. Идем по упорядоченным массивам, сливаем их в один

Здесь применяются идеи сортировки слиянием, но массив разбивается только на две части (рекурсия не используется).

Для слияния можно использовать следующую функцию:

public static String[] merge( String[] leftPart, String[] rightPart ) {
        int cursorLeft = 0, cursorRight = 0, counter = 0;
        String[] merged = new String[leftPart.length + rightPart.length];
        while ( cursorLeft < leftPart.length && cursorRight < rightPart.length ) {
            if (leftPart[cursorLeft].compareTo(rightPart[cursorRight] ) < 0 ) {
                merged[counter] = leftPart[cursorLeft];
                cursorLeft+=1;
            } else {
                merged[counter] = rightPart[cursorRight];
                cursorRight+=1;
            }
            counter++;
        }
        if ( cursorLeft < leftPart.length ) {
            System.arraycopy( leftPart, cursorLeft, merged, counter, merged.length - counter );
        }
        if ( cursorRight < rightPart.length ) {
            System.arraycopy( rightPart, cursorRight, merged, counter, merged.length - counter );
        }
        return merged;
    }

Код функции слияния взят отсюда.

Суть слияния в следующем: в начале указатели находятся на первом элементе для обоих массивов. Далее значения элементов, соответствующих позициям указателя сравниваются и указатель для меньшего элемента сдвигается на следующий элемент, сам элемент добавляется в результирующий массив. Цикл продолжается до тех пор, пока не дойдем до конца одного из массивов, тогда оставшаяся часть второго массива будет скопирована в конец результирующего массива. Таким образом, на выходе получаем отсортированный массив.

Также был создан класс для многопоточной сортировки, в нем был создан метод run, который выполняется, когда к объекту типа Thread применяется метод start(). В нашем случае за это будет отвечать executorService. Приведем здесь код класса merge, объекты которого и будут создаваться для реализации многопоточной сортировки:


public class Merger implements Runnable{
    private String[] unsorted, sorted;
    public Merger(String[] unsorted) {
        this.unsorted = unsorted;
    }

    public void run() {
        int middle;
        String[] left, right;
        // array is sorted
        if ( unsorted.length <= 1 ) {
            sorted = unsorted;
        } else {
            //
            middle = unsorted.length / 2;
            left = new String[middle];
            right = new String[unsorted.length - middle];
            //split array on two
            System.arraycopy(unsorted, 0, left, 0, middle);
            System.arraycopy(unsorted, middle, right, 0, unsorted.length - middle);
            SimpleMerger leftSort = new SimpleMerger(left);
            SimpleMerger rightSort = new SimpleMerger(right);
            leftSort.sort();
            rightSort.sort();
            //sort and merge
            sorted = SimpleMerger.merge(leftSort.getSorted(), rightSort.getSorted());
         }
        }
    public String[] getSorted() {
        return sorted;
    }
}

Для сортировки частей массива была использована встроенная в java сортировка. Далее приведен код для сортировки с помощью пула потоков. Проводятся замеры времени для многопоточного и обычного варианта (спойлер: многопоточная дает ускорение только на большом объеме данных):


public static void main(String[] args) throws Exception {
        int arrSize = 1_000_000_0;
        String[] unsorted = new String[arrSize];
        Random randomizer = new Random();

        for ( int i = 0; i < arrSize; i++ ) {
            unsorted[i] = Integer.toString(randomizer.nextInt( 100_000_0 ));
        }

        List<Future> futures = new ArrayList<>();
        int processorCount = Runtime.getRuntime().availableProcessors();
        int batchSize = arrSize/processorCount;
        long startTime = System.currentTimeMillis();
        // create ExecutorService
        final ExecutorService executorService = Executors
                .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        ArrayList<Merger> mergers = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            String[] part = new String[batchSize];
            System.arraycopy( unsorted, i*batchSize, part, 0, batchSize );
            // create merger
            Merger merger = new Merger(part);

            futures.add(executorService.submit(merger));
            //add merger to list to get result in future
            mergers.add(merger);
        }
        for (Future<Double> future : futures) {
            future.get();
        }
        executorService.shutdown();
        int j = 0;
        // array to get result
        String[] mergered = new String[arrSize];
        // sequential merge of all part of array
        for (Merger merger:mergers){
            if (j == 0) {
                mergered = merger.getSorted();
                j+=1;
            }
        else{
                String[] part = merger.getSorted();
                mergered = SimpleMerger.merge( mergered, part);
            }
   }
        long timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("Program execution time is " + timeSpent + " milliseconds");
        if (arrSize < 100) {System.out.print(Arrays.toString(mergered));}
        startTime = System.currentTimeMillis();
        Arrays.sort(unsorted);
        timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("n Program (non parallel )execution time is " + timeSpent + " milliseconds");
    }

В начале основной функции массив заполняется произвольными строками, которые содержат числа от 0 до 10000000. В качестве количества потоков берется количество доступное на устройстве. Переменная batchSize отвечает за размерность массивов для сортировки в параллельном режиме. Затем создается executorService с фиксированным количеством потоков.

Для каждого потока создается свой объект класса merge, затем этот ставим задачу по сортировке в очередь на выполнение. С помощью future ждем, пока все посчитается, собираем все отсортированные части массива и сливаем их по очереди в результирующий массив. Останавливаем executorService и можем посмотреть на временные затраты последовательного и параллельного варианта реализации.

Код лежит здесь

Специально для сайта ITWORLD.UZ. Новость взята с сайта Хабр