ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • RxJava Worker 핵심 정리 — 스케줄러와 스레드 제어 제대로 알기
    카테고리 없음 2026. 4. 8. 23:55

    Worker란 무엇인가?

    RxJava에서 Worker는 Scheduler의 내부 추상 클래스로, 특정 스레드 또는 스레드 풀 위에서 작업을 예약하고 실행하는 단위입니다.
    Scheduler가 "어느 스레드에서 실행할지"를 결정하는 정책이라면, Worker는 그 정책을 실제로 수행하는 실행 주체입니다.
    직접 Worker를 사용하는 경우는 커스텀 Scheduler를 구현하거나, Scheduler 내부 동작을 세밀하게 제어할 때 주로 등장합니다.

    Scheduler와 Worker의 관계

    Scheduler는 createWorker() 메서드를 통해 Worker 인스턴스를 생성합니다.
    하나의 Scheduler에서 여러 Worker를 만들 수 있으며, 각 Worker는 독립된 작업 큐를 갖습니다.
    Worker는 Disposable을 구현하므로 생명주기 관리가 가능하고, dispose() 호출 시 예약된 작업이 모두 취소됩니다.

    Scheduler scheduler = Schedulers.io();
    Scheduler.Worker worker = scheduler.createWorker();
    
    worker.schedule(() -> {
        System.out.println("작업 실행: " + Thread.currentThread().getName());
    });
    
    // 사용 후 반드시 해제
    worker.dispose();

    위 코드처럼 Worker를 직접 생성해 작업을 예약할 수 있습니다.
    schedule() 메서드는 즉시 실행을 예약하며, 지연 실행과 주기 실행도 지원합니다.
    Worker는 일회성 사용 후 반드시 dispose()를 호출해 리소스를 해제해야 합니다.

    Worker의 주요 메서드

    Worker는 세 가지 핵심 메서드를 제공합니다.
    각 메서드는 작업 예약 방식에 차이가 있으므로 상황에 맞게 선택해야 합니다.

    • schedule(Runnable run) — 즉시 실행 예약
    • schedule(Runnable run, long delay, TimeUnit unit) — 지정 시간 후 실행
    • schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit) — 초기 지연 후 주기적 반복 실행
    Scheduler.Worker worker = Schedulers.computation().createWorker();
    
    // 2초 후 1번 실행
    worker.schedule(() -> System.out.println("delayed task"), 2, TimeUnit.SECONDS);
    
    // 1초 후 시작해서 3초마다 반복 실행
    Disposable periodic = worker.schedulePeriodically(
        () -> System.out.println("periodic task"),
        1, 3, TimeUnit.SECONDS
    );
    
    // 반복 작업 취소
    periodic.dispose();

    schedulePeriodically()의 반환값은 Disposable이므로 반복 작업만 개별적으로 취소할 수 있습니다.
    Worker 전체를 dispose()하면 해당 Worker에 예약된 모든 작업이 일괄 취소됩니다.

    RxJava 기본 제공 Scheduler 별 Worker 특성

    RxJava는 용도에 따라 다양한 Scheduler를 제공하며, 각각 Worker 동작 방식이 다릅니다.

    Scheduler Worker 특성 적합한 작업
    Schedulers.io() 필요에 따라 스레드 생성, 캐싱 재사용 네트워크, 파일 I/O
    Schedulers.computation() CPU 코어 수만큼 고정 스레드 풀 CPU 집약적 연산
    Schedulers.newThread() schedule 마다 새 스레드 생성 독립적 단발성 작업
    Schedulers.single() 단일 스레드로 직렬 처리 순서 보장이 필요한 작업
    AndroidSchedulers.mainThread() Android 메인 루퍼 사용 UI 업데이트

    Schedulers.io()computation()은 내부적으로 스레드 풀을 공유하므로, Worker를 여러 개 생성해도 실제 스레드 수는 제한됩니다.
    newThread()는 Worker 생성마다 새 스레드를 만들기 때문에 빈번하게 사용하면 오버헤드가 커집니다.

    커스텀 Scheduler 구현 시 Worker 활용

    표준 Scheduler가 요구사항을 충족하지 못할 때 커스텀 Scheduler를 구현합니다.
    이때 반드시 Scheduler를 상속하고 createWorker() 메서드를 오버라이드해야 합니다.

    public class SingleThreadScheduler extends Scheduler {
    
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
    
        @Override
        public Worker createWorker() {
            return new SingleThreadWorker(executor);
        }
    
        static class SingleThreadWorker extends Worker {
            private final ExecutorService executor;
            private final CompositeDisposable tasks = new CompositeDisposable();
    
            SingleThreadWorker(ExecutorService executor) {
                this.executor = executor;
            }
    
            @Override
            public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
                if (isDisposed()) return Disposables.disposed();
    
                ScheduledRunnable task = new ScheduledRunnable(run, tasks);
                tasks.add(task);
    
                if (delay <= 0) {
                    executor.execute(task);
                } else {
                    // 지연 실행은 별도 처리 필요
                }
                return task;
            }
    
            @Override
            public void dispose() {
                tasks.dispose();
            }
    
            @Override
            public boolean isDisposed() {
                return tasks.isDisposed();
            }
        }
    }

    커스텀 Worker 구현 시 dispose()isDisposed() 메서드를 반드시 올바르게 구현해야 메모리 누수를 방지할 수 있습니다.
    CompositeDisposable을 활용하면 예약된 모든 작업을 한 번에 관리할 수 있어 편리합니다.

    Worker 사용 시 주의사항

    Worker를 사용할 때 가장 흔히 발생하는 문제는 dispose() 누락으로 인한 리소스 누수입니다.
    특히 Android 환경에서 Activity나 Fragment가 종료될 때 Worker를 해제하지 않으면 백그라운드 작업이 계속 실행되어 크래시로 이어질 수 있습니다.

    • Worker는 재사용하지 않는다 — dispose() 후 동일 Worker에 작업을 예약하면 무시됨
    • isDisposed() 확인 후 작업 예약 — 이미 해제된 Worker에 접근하면 NullPointerException 발생 가능
    • 주기 작업은 반환된 Disposable로 관리 — Worker 전체 해제 전에 개별 취소가 필요한 경우 사용
    // 안전한 패턴
    if (!worker.isDisposed()) {
        worker.schedule(() -> { /* 작업 */ });
    }
    
    // try-finally로 반드시 해제 보장
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        worker.schedule(() -> doWork());
    } finally {
        worker.dispose();
    }

    마무리

    RxJava Worker는 Scheduler의 실행 단위로, 스레드 수준의 세밀한 제어가 필요할 때 직접 활용합니다.
    일반적인 observeOn() / subscribeOn() 사용만으로도 대부분의 비동기 처리가 가능하지만, 커스텀 Scheduler 구현이나 지연·주기 실행 제어가 필요한 상황에서 Worker의 진가가 발휘됩니다.
    Worker 사용 후 dispose()를 통한 명시적 해제를 습관화하면 메모리 누수 없이 안정적인 비동기 코드를 작성할 수 있습니다.

Designed by Tistory.