Java

Java - ExecutorService 병렬처리 알아보기 (Feat. Lucene OpenSource)

성만이 2020. 1. 31. 00:27

1. ExecutorService Interface

ExecutorService란 java.util.concurrent 패키지에서 제공하는 인터페이스로서, Executor 인터페이스를 상속받습니다.

ExecutorService가 속한 패키지명을 보면, 짐작하시듯 병렬처리에 관련된 인터페이스입니다.

 

그렇다면 ExecutorService는 어떤 기능을 하며, Executor에서 무엇이 확장되었을까요?

아래의 다이아그램을 보도록 하겠습니다.

Executor & ExecutorService Class Diagram

Executor는 execute 메소드를 통해 Runnable의 객체 command를 새로운 스레드에서 실행할 수 있도록 되어 있습니다.

Runnable 인터페이스의 한계로 Executor를 사용하면 반환값 없이 실행만 가능합니다.

 

ExecutorService는 오버로딩(overloading)된 submit 메소드를 사용하여 Callable 인터페이스를 인자로 하여 결과값을 받아 활용할 수 있습니다.

또한 shutdown, shutdownNow 메소드를 통해 병렬처리 작업 중인 Task 리소스들을 안전하게 관리할 수 있습니다.

 

ExecutorService는 작업 처리를 위해 필요한 리소스들을 기본적으로 내부에 갖고 있도록 설계가 되어있습니다.

아래의 그림과 같이 작업을 공급받아 BlockingQueue에 적재하고, 동시에 ThreadPool의 개수 만큼 병렬처리로 작업을 수행합니다.

ExecutorService 구조

 

작업 수행에 있어서는, Callable 혹은 Runnable을 구현함으로써 수행할 작업을 정의하고, Executor 인터페이스를 구현한 클래스에서 execute, submit 메소드를 통해 병렬처리로 정의한 작업을 실행할 수 있습니다. 

추가로, CompletionService 인터페이스를 구현하여 submit하여 작업이 완료되어 반환된 결과값을 사용할 수 있습니다.

 

이러한 Executor는 Producer/Consumer 패턴을 구현할 때 활용하여 쉽게 구현할 수 있게 해줍니다.

Producer에서 작업을 정의하여 작업에 필요한 리소스와 함께 Consumer로 공급하는 역할을 하고 Consumer인 Executor에서는 작업을 수행하는 소비자의 역할을 합니다.

 

2. 구현

ExecutorService를 사용해 소문자의 영문 텍스트를 대문자로 치환하는 간단한 예제를 구현하도록 하겠습니다.

소스코드는 Lucene에서 공개된 소스코드를 사용하여 ExecutorService를 구현하겠습니다.

 

Lucene Open SourceApache에서 지원하고 있는 검색엔진 라이브러리로 오픈소스 프로젝트입니다. 

ElasticSearch 등의 검색엔진 소프트웨어의 기반이 되는 오픈소스 입니다.

 

3개의 클래스를 작성합니다.

- ExecutionHelper : ExecutorService를 구현한 객체를 가지며, 작업 완료 후 결과값을 쉽게 사용할 수 있도록 돕는 클래스

- NamedThreadFactory : Thread를 생성하는 Factory

- Searcher : ExecutionHelper에 작업을 공급하는 Producer

 

ExecutionHelper

Iterator와 Iterable 인터페이스를 구현함으로써 작업이 완료된 CompletionService를 쉽게 사용할 수 있게 합니다.

Executor 객체를 만드는 createExecutor 메소드, 작업을 병렬처리로 실행하는 submit 메소드, 마지막으로 안전하게 종료시키는 shutdownExecutorService 메소드를 구현합니다.

package test.excutor_service;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutionHelper<T> implements Iterator<T>, Iterable<T> {
	private final CompletionService<T> service;
	private int numTasks;
	
	public ExecutionHelper(final Executor executor) {
		this.service = new ExecutorCompletionService<>(executor);
	}

	@Override
	public boolean hasNext() {
		return numTasks > 0;
	}

	@Override
	public T next() {
		if (!this.hasNext()) 
			throw new NoSuchElementException("next() is called but hasNext() returned false");
		try {
			return service.take().get();
		} catch (InterruptedException e) {
			throw new ThreadInterruptedException(e);
		} catch (ExecutionException e) {
			throw new RuntimeException(e);
		} finally {
			--numTasks;
		}
	}
	
	public Iterator<T> iterator() {
		return this;
	}
	
	public void submit(Callable<T> task) {
		this.service.submit(task);
		++numTasks;
	}
	
	public static ExecutorService createExecutor(int poolSize, String FactoryName) {
		ExecutorService executor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS
				, new LinkedBlockingQueue<Runnable>()
				, new NamedThreadFactory(FactoryName));
		return executor;
	}
	
	public static void shutdownExecutorService(ExecutorService es) {
		if (es != null) {
			try {
				es.shutdown();
				es.awaitTermination(1, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				System.err.println("Could not properly shutdown executor service.");
				e.printStackTrace(System.err);
			}
		}
	}
}

 

NamedThreadFactory

스레드 생성을 관리하는 Factory로 생성될 스레드의 이름과 상태를 설정합니다.

package test.excutor_service;

import java.util.Locale;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {

	private static final AtomicInteger THREAD_POOL_NUMBER = new AtomicInteger(1);
	private final ThreadGroup group;
	private final AtomicInteger threadNumber = new AtomicInteger(1);
	private static final String NAME_PATTERN = "%s-%d-thread";
	private final String threadNamePrefix;
	
	public NamedThreadFactory(String threadNamePrefix) {
		final SecurityManager s = System.getSecurityManager();
		group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
		this.threadNamePrefix = String.format(Locale.ROOT, NAME_PATTERN,
				checkPrefix(threadNamePrefix), THREAD_POOL_NUMBER.getAndIncrement());
	}

	private static String checkPrefix(String prefix) {
		return prefix == null || prefix.length() == 0 ? "Lucene" : prefix;
	}
	
	@Override
	public Thread newThread(Runnable r) {
		final Thread t = new Thread(group, r, String.format(Locale.ROOT, "%s-%d",
				threadNamePrefix, threadNumber.getAndIncrement()), 0);
		t.setDaemon(false);
		t.setPriority(Thread.NORM_PRIORITY);
		return t;
	}
}

 

Searcher

위의 ExecutionHelper를 사용해 Executor를 생성하고, 작업을 정의하여 작업을 공급합니다.

search 메소드의 마지막에서는 생성한 Executor를 shutdown 시킴으로써 안전하게 종료시킵니다.

package test.excutor_service;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

public class Searcher {
	public void search(String[] texts) {

		ExecutorService service = ExecutionHelper.createExecutor(texts.length, "test");
		try {
			final ExecutionHelper<String> runner = new ExecutionHelper<String>(service);

			Random random = new Random();

			for (String text : texts) {
				int randomSec = random.nextInt(3);
				runner.submit(new Callable<String>() {
					@Override
					public String call() throws Exception {
						// 멀티스레드 동작을 쉽게 확인하기 위해 스레드 sleep 사용
						Thread.sleep(randomSec * 1000);
						return text + " 지연 : " + (randomSec * 1000) + " => " + text.toUpperCase();
					}
				});
			}
			
            // 위의 Task가 완료되어 CompletionService에 들어오면,
            // 반환값을 꺼내 사용
			// 1. Iterator
			// while (runner.hasNext()) {
			// String converted = runner.next();
			// System.out.println(converted);
			// }

			// 2.Iterable
			for (String converted : runner) {
				System.out.println(converted);
			}

		} finally {
			ExecutionHelper.shutdownExecutorService(service);
		}
	}

	public static void main(String[] args) {
		Searcher searcher = new Searcher();
		searcher.search(new String[] { "a", "b", "c", "d" });
	}
}

 

3. 참고

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html

 

Executor (Java Platform SE 8 )

An object that executes submitted Runnable tasks. This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explici

docs.oracle.com

https://lucene.apache.org/core/

 

Apache Lucene - Apache Lucene Core

Apache Lucene Core Apache LuceneTM is a high-performance, full-featured text search engine library written entirely in Java. It is a technology suitable for nearly any application that requires full-text search, especially cross-platform. Apache Lucene is

lucene.apache.org

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html

 

ExecutorService (Java Platform SE 8 )

An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks. An ExecutorService can be shut down, which will cause it to reject new tasks. Two different methods are p

docs.oracle.com