이번에는 자바5 부터 멀티 쓰레드 기반의 동시성 프로그래밍을 위해 추가된 Executor, ExecutorService, ScheduledExecutorService와 Callable, Future를 살펴보도록 하겠습니다.
1. Callable과 Future 인터페이스에 대한 이해 및 사용법
[ Thread와 Runnable의 단점 및 한계 ]
Thread와 Runnable을 직접 사용하는 방식은 다음과 같은 한계점이 있습니.
- 지나치게 저수준의 API(쓰레드의 생성)에 의존함
- 값의 반환이 불가능
- 매번 쓰레드 생성과 종료하는 오버헤드가 발생
- 쓰레드들의 관리가 어려움
먼저 쓰레드를 어떻게 만드는지는 애플리케이션 개발자의 관심과는 거리가 먼데, Thread와 Runnable를 통한 쓰레드의 생성과 실행은 너무 저수준의 API를 필요로 합니다. 그리고 쓰레드의 작업이 끝난 후 결과를 반환받는 것도 불가능합니다. 또한 쓰레드를 사용하려면 항상 새롭게 쓰레드를 생성해야 하는데, 이는 비용이 많이 드는 작업이며 직접 쓰레드를 만드는 만큼 관리 역시 어렵습니다. 그래서 Java는 쓰레드를 위한 기술들을 꾸준히 발전시키고 있는데, 먼저 Java5에서 결과를 반환하도록 추가된 Callable과 Future를 알아보도록 합시다.
[ Callable 인터페이스 ]
기존의 Runnable 인터페이스는 결과를 반환할 수 없다는 한계점이 있었습니다. 반환값을 얻으려면 공용 메모리나 파이프 등을 사용해야 했는데, 이러한 작업은 상당히 번거롭게 합니다. 그래서 Runnable의 발전된형태로써, Java5에 함께 추가된 제네릭을 사용해 결과를 받을 수 있는 Callable이 추가되었습니다.
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
[ Future 인터페이스 ]
Callable 인터페이스의 구현체인 작업(Task)은 가용 가능한 쓰레드가 없어서 실행이 미뤄질 수 있고, 작업 시간이 오래 거릴 수도 있습니다. 그래서 실행 결과를 바로 받지 못하고 미래의 어느 시점에 얻을 수 있는데, 미래에 완료된 Callable의 반환 값을 구하기 위해 사용되는 것이 Future입니다. 즉, Future는 비동기 작업을 갖고 있어 미래에 실행 결과를 얻도록 도와줍니다. 이를 위해 비동기 작업의 현재 상태를 확인하고, 기다리며, 결과를 얻는 방법 등을 제공합니다. Future 인터페이스는 다음과 같은데, 각각의 메소드들에 대해 살펴보도록 합시다.
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- get
- 블로킹 방식으로 결과를 가져옴
- 타임아웃 설정 가능
- isDone, isCancelled
- isDone은 작업의 완료 여부, isCancelled는 작업의 취소 여부를 반환함
- 완료 여부를 boolean으로 반환
- cancle
- 작업을 취소시키며, 취소 여부를 boolean으로 반환함
- cancle 후에 isDone()는 항상 true를 반환함
cancle의 파라미터로는 boolean 값을 전달할 수 있는데, ture를 전달하면 쓰레드를 interrupt 시켜 InterruptException을 발생시키고 false를 전달하면 진행중인 작업이 끝날때까지 대기합니다. cancle은 작업이 이미 정상적으로 완료되어 취소할 수 없는 경우에는 false를, 그렇지 않으면 true를 반환합니다. 그 외에도 작업이 이미 취소되었거나 취소가 불가능한 경우에도 false가 반환될 수 있습니다.
아래의 코드는 3초가 걸리는 작업의 결과를 얻기 위해 get을 호출하고 있습니다. get은 결과를 기다리는 블로킹 요청이므로 아래의 실행은 적어도 3초가 걸리며, 작업이 끝나고 isDone이 ture가 되면 아래의 실행은 종료가 됩니다.
@Test
void future() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws InterruptedException {
Thread.sleep(3000L);
return "Thread: " + Thread.currentThread().getName();
}
};
// It takes 3 seconds by blocking(블로킹에 의해 3초 걸림)
Future<String> future = executorService.submit(callable);
System.out.println(future.get());
executorService.shutdown();
}
2. Executors, Executor, ExecutorService등에 대한 이해 및 사용법
Java5에는 쓰레드의 생성과 관리를 위한 쓰레드 풀을 위한 기능들도 추가되었는데, 이번에는 쓰레드 풀을 위한 Executor, ExecutorService, ScheduledExecutorService와 쓰레드 풀 생성을 도와주는 팩토리 클래스인 Executor까지 살펴보도록 합시다.
[ Executor 인터페이스 ]
동시에 여러 요청을 처리해야 하는 경우에 매번 쓰레드를 만드는 것은 비효율적입니다. 그래서 쓰레드를 미리 만들어두고 재사용하기 위한 쓰레드 풀(Thread Pool)이 등장하게 되었는데, Excutor 인터페이스는 쓰레드 풀의 구현을 위한 인터페이스 입니다. 이러한 Executor 인터페이스를 간단히 정리하면 다음과 같습니다.
- 등록된 작업(Runnable)을 실행하기 위한 인터페이스
- 작업 등록과 작업 실행 중에서 작업 실행만을 책임짐
쓰레드는 크게 작업의 등록과 실행으로 나누어집니다. 그중에서도 Executor 인터페이슨느 분리 원칙(Interface Segregation Principle)에 맞게 등록된 작업을 실행하는 책임만 갖습니다. 그래서 전달받은 작업(Runnable)을 실행하는 메소드만 가지고 있습니다.
public interface Executor {
void execute(Runnable command);
}
Executor 인터페이스는 개발자들이 해당 작업의 실행과 쓰레드의 사용 및 스케줄링 등으로부터 벗어날 수 있도록 도와줍니다. 단순히 전달받은 Runnable 작업을 사용하는 코드를 Executor로 구현하면 다음과 같습니다.
@Test
void executorRun(){
final Runnable runnable = () -> System.out.println("Thread: " + Thread.currentThread().getName());
Executor executor = new Executor();
executor.execute(runnalbe);
}
static class RunExecutor implements Executor {
@Override
public void execute(final Runnable command){
command.run();
}
}
하지만 위와 같은 코드는 단순히 객체의 메소드를 호출하는 것이므로, 새로운 쓰레드가 아닌 메인 쓰레드에서 실행이 됩니다. 만약 위의 코드를 새로운 쓰레드에서 실행시키면 Excutor의 execute 메소드를 다음과 같이 수정하면 됩니다.
@Test
void executorRun() {
final Runnable runnable = () -> System.out.println("Thread: " + Thread.currentThread().getName());
Executor executor = new StartExecutor();
executor.execute(runnable);
}
static class StartExecutor implements Executor {
@Override
public void execute(final Runnable command) {
new Thread(command).start();
}
}
[ ExecutorService 인터페이스 ]
ExecutorService는 작업(Runnable, Callable)등록을 위한 인터페이스 입니다. ExecutorService는 Executor를 상속받아서 작업 등록 뿐만 아니라 실행을 위한 책임도 갖습니다. 그래서 쓰레드 풀은 기본적으로 Executor Service 인터페이스를 구현합니다. 대표적으로 ThreadPoolExecutor가 ExecutorService의 구현체인데, ThreadPoolExecutor 내부에 있는 블로킹 큐에 작업을 등록해줍니다.
위와 같이 크기가 2인 쓰레드 풀이 있다고 합시다. 각각의 쓰레드는 작업들을 할당받아 처리하는데, 만약 사용가능한 쓰레드가 없다면 작업은 큐에 대기하게 됩니다. 그러다가 쓰레드가 작업을 끝내면 다음 작업을 할당받게 되는 것입니다.
이러한 ExecutorService가 제공하는 퍼블릭 메소드들은 다음과 같이 분류가 가능합니다.
- 라이프사이클 관리를 위한 기능들
- 비동기 작업을 위한 기능들
라이프사이클 관리를 위한 기능들
ExecutorService는 Executor의 상태 확인과 작업 종료 등 라이프사이클 관리를 위한 메소드들을 제공하고 있습니다.
- shutdown
- 새로운 작업들을 더 이상 받아들이지 않음
- 호출 전에 제출된 작업들은 그대로 실행이 끝나고 종료됨(Graceful Shutdown)
- shutdownNow
- shutdown 기능에 더해 이미 제출된 작업들을 인터럽트 시킴
- 실행을 위해 대기중인 작업 목록(List<Runnable>을 반환함)
- isShutdown
- Executor의 shutdown여부를 반환함
- isTerminated
- shutdown 실행 후 모든 작업의 종료 여부를 반환함
- awaitTermination
- shutdown 실행 후, 지정한 시간 동안 모든 작업이 종료될 때 까지 대기함
- 지정한 시간 내에 모든 작업이 종료되었는지 여부를 반환
ExecutorService를 만들어 작업을 실행해보면, shutdown이 호출되기 전까지 계속해서 다음 작업을 대기하게 됩니다. 그러므로 작업이 완료되었다면 반드시 shutdown을 명시적으로 호출해주어야 합니다.
@Test
void shutdown() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Runnable runnable = () -> System.out.println("Thread: " + Thread.currentThread().getName());
executorService.execute(runnable);
// shutdown 호출
executorService.shutdown();
// shutdown 호출 이후에는 새로운 작업들을 받을 수 없음, 에러 발생
RejectedExecutionException result = assertThrows(RejectedExecutionException.class, () -> executorService.execute(runnable));
assertThat(result).isInstanceOf(RejectedExecutionException.class);
}
만약 작업 실행 후에 shutdown을 해주지 않으면 다음과 같이 프로세스가 끝나지 않고, 계속해서 다음 작업을 기다리게 됩니다. 다음의 메인 메소드를 실행해보면 애플리케이션이 끝나지 않음을 확인할 수 있습니다.
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Runnable runnable = () -> System.out.println("Thread: " + Thread.currentThread().getName());
executorService.execute(runnable);
executorService.shutdown();
}
shutdown과 shutdownNow 시에 중요한 것은, 만약 실행중인 작업들에서 인터럽트 여부에 따른 처리 코드가 없다면 계속 실행된다는 것입니다. 그러므로 필요하다면 다음과 같이 인터럽트 시에 추가적인 조치를 구현해야 합니다.
@Test
void shutdownNow() throws InterruptedException {
Runnable runnable = () -> {
System.out.println("Start");
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("Interrupted");
break;
}
}
System.out.println("End");
};
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(runnable);
executorService.shutdownNow();
Thread.sleep(1000L);
}
비동기 작업을 위한 기능들
ExecutorService는 Runnable과 Callable 작업을 사용하기 위한 메소드를 제공합니다. 동시에 여러 작업들을 실행시키는 메소드도 제공하고 있는데, 비동기 작업의 진행을 추적할 수 있도록 Future를 반홥합니다. 반환된 Future들은 모두 실행된것이므로 반환된 isDone은 ture입니다. 하지만 작업들은 정상적으로 종료되었을 수도 있고, 예외에 의해 종료되었을 수도 있으므로 항상 성공한 것은 아닙니다. 이러한 ExecutorService가 갖는 비동기 작업을 위한 메소드들을 정리하면 다음과 같습니다.
- submit
- 실행할 작업들을 추가하고, 작업의 상태와 결과를 포함하는 Future를 반환함
- Future의 get을 호출하면 성공적으로 작업이 완료된 후 결과를 얻을 수 있음
- invokeAll
- 모든 결과가 나올 때 까지 대기하는 블로킹 방식의 요청
- 동시에 주어진 작업들을 모두 실행하고, 전부 끝나면 각각의 상태와 결과를 갖는 List<Future>을 반환함
- invokeAny
- 가장 빨리 실행된 결과가 나올 때 까지 대기하는 블로킹 방식의 요청
- 동시에 주어진 작업들을 모두 실행하고, 가장 빨리 완료된 하나의 결과를 Future로 반환받음
ExecutorService의 구현체로는 AbstractExecutorService가 있는데, ExecutorService의 메소드들(submit,invokeAll, invokeAny)에 대한 기본 구현들을 제공합니다. invokeAll은 최대 쓰레드 풀의 크기만큼 작업을 동시에 실행시킵니다. 그러므로 쓰레드가 충분하다면 동시에 실행되는 작업들 중에서 가장 오래 걸리는 작업만큼 시간이 소요됩니다. 하지만 만약 쓰레드가 부족하다면 대기되는 작업들이 발생하므로 가장 오래 걸리는 작업의 시간에 더해 추가 시간이 필요합니다.
@Test
void invokeAll() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Instant start = Instant.now();
Callable<String> hello = () -> {
Thread.sleep(1000L);
final String result = "Hello";
System.out.println("result = " + result);
return result;
};
Callable<String> mang = () -> {
Thread.sleep(4000L);
final String result = "Java";
System.out.println("result = " + result);
return result;
};
Callable<String> kyu = () -> {
Thread.sleep(2000L);
final String result = "kyu";
System.out.println("result = " + result);
return result;
};
List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, mang, kyu));
for(Future<String> f : futures) {
System.out.println(f.get());
}
System.out.println("time = " + Duration.between(start, Instant.now()).getSeconds());
executorService.shutdown();
}
invokeAny는 가장 빨리 끝난 작업 결과만을 구하므로, 동시에 실행한 작업들 중에서 가장 짧게 걸리는 작업만큼 시간이 걸립니다. 또한 가장 빠르게 처리된 작업 외의 나머지 작업들은 완료되지 않았으므로 cancel 처리되며, 작업이 진행되는 동안 작업들이 수정되면 결과가 정의되지 않습니다.
@Test
void invokeAny() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Instant start = Instant.now();
Callable<String> hello = () -> {
Thread.sleep(1000L);
final String result = "Hello";
System.out.println("result = " + result);
return result;
};
Callable<String> lo = () -> {
Thread.sleep(4000L);
final String result = "Java";
System.out.println("result =" + result);
return result;
};
Callable<String> cal = () -> {
Thread.sleep(2000L);
final String result = "cal";
System.out.println("result = " + result);
return result;
};
String result = executorService.invokeAny(Arrays.asList(hello, lo, cal));
System.out.println("result = " + result + " time = " + Duration.between(start, Instant.now()).getSeconds());
executorService.shutdown();
}
[ ScheduledExecutorService 인터페이스 ]
ScheduledExecutorService 는 ExecutorService를 상속받는 인터페이스로써, 특정 시간 이후에 또는 주기적으로 작업을 실행시키는 메소드가 추가되었습니다. 그래서 특정 시간대에 작업을 실행하거나 주기적으로 작업을 실행하고 싶을때 등에 사용할 수 있습니다.
- schedule
- 특정시간(delay) 이후에 작업을 실행시킴
- scheduleAtFixedRate
- 특정시간(delay) 이후 처음 작업을 실행시킴
- 작업이 실행되고 특정 시간마다 작업을 실행시킴
- scheduleWithFixedDelay
- 특정 시간(delay)이후 처음 작업을 실행시킴
- 작업이 완료되고 특정 시간이 지나면 작업을 실행시킴
가장 먼저 scheduleAtFixedRate를 살펴보도록 합시다. 예를 들어 1초가 걸리는 작업이 있고, 실행 주기(rate)를 2초로 설정했다고 합시다. schedulteAtFixedRate는 특정 식나마다 작업을 반복시키는 것이므로, 해당 작업은 다음과 같이 실행될 것입니다.
- 0:00 작업 실행
- 0:01 작업 종료
- 1초 대기(0:00초에 실행되어 rate가 2초이므로, 0:02초에 실행되어야 함)
- 0:02 작업 실행
- 0:03 작업 종료
- 1초 대기( 0:02초에 실행되어 rate가 2초이므로, 0:04초에 실행되어야 함)
- 0:04 작업 실행
- 0:05 작업 종료
scheduleWithFixedDelay는 조금 다르게 실행됩니다. 예를 들어 1초가 걸리는 작업이 있고, 실행 텀 (delay)를 2초로 설정했다고 합시다. scheduleWithFizedDelay는 이전 작업이 끝나고 나서 특정 시간 이후에 작업을 실행하므로, 실행 결과는 다음과 같습니다.
- 0:00 작업 실행
- 0:01 작업 종료
- 2초 delay
- 0:03 작업 실행
- 0:04 작업 종료
- 2초 delay
- 0:06 작업 실행
- 0:07 작업 종료
[ Excutors ]
앞서 살펴본 Executor, ExecutorService, ScheduledExecutorService는 쓰레드 풀을 위한 인터페이스입니다. 직접 쓰레드를 다루는 것은 번거로우므로, 이를 도와주는 팩토리 클래스인 Executors가 등장하게 되었습니다. Executors는 고수준 (High-Level)의 동시성 프로그래밍 모델로써 Executor, ExecutorService 또는 ScheduledExecutorService를 구현한 쓰레드 풀을 손쉽게 생성해줍니다.
- newFixedThreadPool
- 고정된 쓰레드 개수를 갖는 쓰레드풀을 생성함
- ExecutorService 인터페이스를 구현한 ThreadPoolExecutor 객체가 생성됨
- newCachedThreadPool
- 필요할때 필요한 만큼의 쓰레드 풀 생성함
- 이미 생성된 쓰레드가 있다면 이를 재활용 할 수 있음
- newScheduledThreadPool
- 일정 시간 뒤 혹은 주기적으로 실행되어야 하는 작업을 위한 쓰레드 풀을 생성함
- ScheduledExecutorService 인터페이스를 구현한 ScheduledThreadPoolExecutor 객체가 생성됨
- newSingleThreadExecutor, newSingleThreadScheduledExecutor
- 1개의 쓰레드만을 갖는 쓰레드 풀을 생성함
- 각각 newFixedThreadPool와 newScheduledThreadPool에 1개의 쓰레만을 생성하도록 한것임
Executors를 통해 쓰레드의 개수 및 종류를 정할 수 있으며, 이를 통해 쓰레드 생성과 실행 및 관리가 매우 용이해집니다. 하지만 쓰레드 풀을 생성 시에는 주의해야 합니다. 만약 newFixedThreadPool을 사용해 2개의 쓰레드를 갖는 쓰레드 풀을 생성했는데, 3개의 작업을 동시에 실행시킨다면 1개의 ㅈ가업은 실행되지 못합니다. 그러다가 쓰레드가 작업을 끝내고 반환되어 가용가능한 쓰레드가 생기면 남은 작업이 실행됩니다.
Java5에서 좋은 기능들이 많이 추가되었지만 Java5에 등장한 Future는 결과를 얻으려면 블로킹 방식으로 대기를 해야한다는 단점이 있습니다. Future에 처리 결과에 대한 콜백을 정의하면 이러한 문제를 쉽게 해결할 수 있는데, 이를 보완하여 Java8에 추가된것이 바로 CompletableFuture입니다.
관련 포스팅
1. https://jolocal.tistory.com/21
[Java] Thread와 Runnable에 대한 이해 및 사용법
자바 초기부터 멀티 쓰레드 기반의 동시성 프로그래밍을 위해 만들어졌던 Thread와 Runnable를 살펴보도록 하겠습니다. 1. Thread와 Runnable에 대한 이해 및 사용법 [ 쓰레드와 자바의 멀티 쓰레드] 쓰레
jolocal.tistory.com
2. https://jolocal.tistory.com/20
[Java] CompletableFuture에 대한 이해 및 사용법
1. CompletableFuture에 대한 이해 [ Future의 단점 및 한계 ] Java5에 Future가 추가되면서 비동기 작업에 대한 겨로가값을 반환 받을 수 있게 되었습니다. 하지만 Future는 다음과 같은 한계점이 있습니다.
jolocal.tistory.com
'Java' 카테고리의 다른 글
[Java] Java NIO의 Tomcat에서의 동작 (0) | 2023.12.28 |
---|---|
[Java] 동기/비동기 & NonBlocking Blocking과 Spring의 관계 (1) | 2023.12.28 |
[Java] Thread와 Runnable에 대한 이해 및 사용법 (0) | 2023.12.13 |
[Java] CompletableFuture에 대한 이해 및 사용법 (0) | 2023.12.13 |
[Java] try-with-resources란? (1) | 2023.12.07 |