Conductor Error Report - Join 이후 Task 실행이 안되는 현상

HOONY_612 2024. 4. 24. 14:03


간헐적으로 Join 노드 이후 다음 Task 실행이 안되는 현상이 발생.



1차 시도(Zookeeper -> LocalOnly)

Conductor 에서는 분산 환경에서 Lock 선택을 위한 Interface 를 제공합니다.

- Lock: LocalOnlyLock, NoopLock, RedisLock, ZookeeperLock

- application.properties 으로 타입 조정 가능

#disable locking during workflow execution


발생 에러 공통점은 Lock 획득 실패한 이후 Join“COMPLETED” 상태가 되었고 더 이상 SystemTaskWorker 에 의해 실행되지 않았습니다.

23:35:43.418 [system-task-worker-4] DEBUG com.netflix.conductor.core.execution.AsyncSystemTaskExecutor - TaskModel{taskType='JOIN', status=COMPLETED, inputData={joinOn=[n_13, n_12]}, referenceTaskName='join_14', retryCount=0, seq=11, correlationId='null', pollCount=4, taskDefName='JOIN', scheduledTime=1712586902536, startTime=1712586902476, endTime=1712586941452, updateTime=1712586926966, startDelayInSeconds=0, retriedTaskId='null', retried=false, executed=false, callbackFromWorker=true, responseTimeoutSeconds=0, workflowInstanceId='127c9254-684c-41b7-8130-2391ba0fdfc5', workflowType='crazy-nested-fork-join-spark', taskId='c704a472-a3df-409f-8223-0372b1bae63a', reasonForIncompletion='null', callbackAfterSeconds=1, workerId='null', outputData={n_13={query=set spark.ndap.query.invoker=WORKFLOW
23:35:43.924 [system-task-worker-4] DEBUG com.netflix.conductor.service.ExecutionLockService - Thread 72 failed to acquire lock to lockId 127c9254-684c-41b7-8130-2391ba0fdfc5.


Lock 문제 가능성을 생각했습니다.

현재 Conductor-server 1개 이므로 Zookeeper Lock -> LocalOnly Lock 으로 변경하였습니다.

그러나 동일 문제 발생하였습니다.



2차 시도(Lock Acquire Time 변경)

Lock 을 얻기 위한 Blocking Time 이 너무 짧아서 그런가? 라고 생각했습니다.

별도 Thread 에서 Lock 을 얻기 위해서 기다리는 시간인 timeToTry 를 조정했습니다.

기존 500ms 대기 후 failed to acquire lock ~ 로그를 출력합니다.

대기 시간(timeToTry)을 2초로 늘리고 테스트 진행했습니다

  * Acquires a re-entrant lock on lockId with provided leaseTime duration. Blocks for timeToTry
  * duration before giving up
  * @param lockId resource to lock on
  * @param timeToTry blocks up to timeToTry duration in attempt to acquire the lock
  * @param leaseTime Lock lease expiration duration.
  * @param unit time unit
  * @return true, if successfully acquired
boolean acquireLock(String lockId, long timeToTry, long leaseTime, TimeUnit unit);
  * The time (in milliseconds) for which the thread will block in an attempt to acquire the lock.
private Duration lockTimeToTry = Duration.ofMillis(500);


기존: 23:35:43.418 → 23:35:43.924 : 500ms

변경 후: 10:36:44.223 -> 10:36:46:353 : 2000ms


2초 후 로그가 찍히는 건 확인했습니다.

결과는 동일현상 발생하였습니다.


3차 시도(Conductor 측 문의 하기)

해결하기 어려워 Conductor 측에 문의하였습니다.

Conductor 측에서 저희 상황을 보고 분석해보겠다는 답변이 왔습니다.


2일 후 아래 PR 이 머지 돼서 이것을 저희 코드에 적용 했지만 동일 현상 발생하였습니다.



Mark join task synchronous by v1r3n · Pull Request #120 · conductor-oss/conductor

Pull Request type Bugfix Feature Refactoring (no functional changes, no api changes) Build related changes WHOSUSING.md Other (please describe): NOTE: Please remember to run ./gradlew spotl...



4차 시도(Lock 획득 실패 시 Join 상태 변경(COMPLETED -> IN_PROGRESS))

만약 JOIN 노드가 "COMPLETED" 되고 아래의 decide 메서드 호출 실패 시 다시 “IN_PROGRESS" 만들고 queue 에 넣는 방식을 해야할 것 같아서 PR 을 하였습니다.

} finally {
    if (shouldRemoveTaskFromQueue) {
        queueDAO.remove(queueName, task.getTaskId());
        LOGGER.debug("{} removed from queue: {}", task, queueName);
    // if the current task execution has completed, then the workflow needs to be evaluated
    if (hasTaskExecutionCompleted) {


그러나 Conductor 측 대답은 "만약 Lock 얻기를 실패했다면 WorkflowSweeper 가 그 역할을 주기적으로 체크하여 다음 Task 를 SCHEDULED 상태로 만들어 줄 것이다. 만약 그렇지 않다면 WorkflowSweeper 가 문제이다." 라는 답변이 왔습니다.



        fixedDelayString = "${conductor.sweep-frequency.millis:500}",
        initialDelayString = "${conductor.sweep-frequency.millis:500}")
public void pollAndSweep() {


5차 시도(WorkflowReconciler 변경)


WorkflowReconciler 역할은 RUNNING 중인 워크플로우 Evaluation, Timeout 체크해주는 등을 담당합니다.

condcutor-server 는 WorkflowReconciler 내부의 WorkflowRepairService 를 미사용 중이라 사용할 수 있도록 변경하였습니다.

public WorkflowSweeper(
        WorkflowExecutor workflowExecutor,
        Optional<WorkflowRepairService> workflowRepairService,
        ConductorProperties properties,
        QueueDAO queueDAO,
        ExecutionDAOFacade executionDAOFacade) {
    this.properties = properties;
    this.queueDAO = queueDAO;
    this.workflowExecutor = workflowExecutor;
    this.executionDAOFacade = executionDAOFacade;
    this.workflowRepairService = workflowRepairService.orElse(null);
    LOGGER.info("WorkflowSweeper initialized.");
 * A helper service that tries to keep ExecutionDAO and QueueDAO in sync, based on the task or
 * workflow state.
 * <p>This service expects that the underlying Queueing layer implements {@link
 * QueueDAO#containsMessage(String, String)} method. This can be controlled with <code>
 * conductor.workflow-repair-service.enabled</code> property.
@ConditionalOnProperty(name = "conductor.workflow-repair-service.enabled", havingValue = "true")
public class WorkflowRepairService {


추가 속성입니다.

# By default with dynomite, we want the repair service enabled


동일 현상 발생합니다.


6차 시도(Conductor community 사용자들의 경험을 바탕으로 개선)

아래 그림과 같이 여러 사용자들이 어려움을 겪고 자기만의 방식으로 해결해가고 있습니다.

Task 상태 업데이트 시 decide 를 필수로 호출하는 방식, ES7 Migration 방식 등이 있습니다.

저희는 Schedule 방식으로 RUNNING 중인 Workflow 대하여 decide 를 필수적으로 1분마다 호출 할 수 있도록 하였습니다.


결과는 문제 해결하였습니다.



사용자가 많은 오픈소스에서도 큰 결함을 가질 수 있다는 점을 알았습니다.

그러나 이러한 결함을 이길 정도로 기능이 좋거나 레거시를 건드리기엔 위험 부담이 커 계속 사용하고 있다고 생각합니다.

점점 많은 사용자가 모이고 점진적으로 개선이 되면 위 문제도 본질적으로 해결될 것이라 믿습니다.

