오픈소스 프로젝트

Conductor Error Report - Join 이후 Task 실행이 안되는 현상

HOONY_612 2024. 4. 24. 14:03
반응형

현상

간헐적으로 Join 노드 이후 다음 Task 실행이 안되는 현상이 발생.

 

 

1차 시도(Zookeeper -> LocalOnly)

Conductor 에서는 분산 환경에서 Lock 선택을 위한 Interface 를 제공합니다.

- Lock: LocalOnlyLock, NoopLock, RedisLock, ZookeeperLock

- application.properties 으로 타입 조정 가능

#disable locking during workflow execution
conductor.app.workflow-execution-lock-enabled=true
conductor.workflow-execution-lock.type=local_only

 

발생 에러 공통점은 Lock 획득 실패한 이후 Join“COMPLETED” 상태가 되었고 더 이상 SystemTaskWorker 에 의해 실행되지 않았습니다.

 
23:35:43.418 [system-task-worker-4] DEBUG com.netflix.conductor.core.execution.AsyncSystemTaskExecutor - TaskModel{taskType='JOIN', status=COMPLETED, inputData={joinOn=[n_13, n_12]}, referenceTaskName='join_14', retryCount=0, seq=11, correlationId='null', pollCount=4, taskDefName='JOIN', scheduledTime=1712586902536, startTime=1712586902476, endTime=1712586941452, updateTime=1712586926966, startDelayInSeconds=0, retriedTaskId='null', retried=false, executed=false, callbackFromWorker=true, responseTimeoutSeconds=0, workflowInstanceId='127c9254-684c-41b7-8130-2391ba0fdfc5', workflowType='crazy-nested-fork-join-spark', taskId='c704a472-a3df-409f-8223-0372b1bae63a', reasonForIncompletion='null', callbackAfterSeconds=1, workerId='null', outputData={n_13={query=set spark.ndap.query.invoker=WORKFLOW
..
23:35:43.924 [system-task-worker-4] DEBUG com.netflix.conductor.service.ExecutionLockService - Thread 72 failed to acquire lock to lockId 127c9254-684c-41b7-8130-2391ba0fdfc5.
...
???

 

Lock 문제 가능성을 생각했습니다.

현재 Conductor-server 1개 이므로 Zookeeper Lock -> LocalOnly Lock 으로 변경하였습니다.

그러나 동일 문제 발생하였습니다.

 

 

2차 시도(Lock Acquire Time 변경)

Lock 을 얻기 위한 Blocking Time 이 너무 짧아서 그런가? 라고 생각했습니다.

별도 Thread 에서 Lock 을 얻기 위해서 기다리는 시간인 timeToTry 를 조정했습니다.

기존 500ms 대기 후 failed to acquire lock ~ 로그를 출력합니다.

대기 시간(timeToTry)을 2초로 늘리고 테스트 진행했습니다

/** 
  * Acquires a re-entrant lock on lockId with provided leaseTime duration. Blocks for timeToTry
  * duration before giving up
  *
  * @param lockId resource to lock on
  * @param timeToTry blocks up to timeToTry duration in attempt to acquire the lock
  * @param leaseTime Lock lease expiration duration.
  * @param unit time unit
  * @return true, if successfully acquired
  */
boolean acquireLock(String lockId, long timeToTry, long leaseTime, TimeUnit unit);
/**
  * The time (in milliseconds) for which the thread will block in an attempt to acquire the lock.
  */
private Duration lockTimeToTry = Duration.ofMillis(500);
conductor.app.lock-time-to-try=2000

 

기존: 23:35:43.418 → 23:35:43.924 : 500ms

변경 후: 10:36:44.223 -> 10:36:46:353 : 2000ms

 

2초 후 로그가 찍히는 건 확인했습니다.

결과는 동일현상 발생하였습니다.

 

3차 시도(Conductor 측 문의 하기)

해결하기 어려워 Conductor 측에 문의하였습니다.

Conductor 측에서 저희 상황을 보고 분석해보겠다는 답변이 왔습니다.

 

2일 후 아래 PR 이 머지 돼서 이것을 저희 코드에 적용 했지만 동일 현상 발생하였습니다.

https://github.com/conductor-oss/conductor/pull/120

 

Mark join task synchronous by v1r3n · Pull Request #120 · conductor-oss/conductor

Pull Request type Bugfix Feature Refactoring (no functional changes, no api changes) Build related changes WHOSUSING.md Other (please describe): NOTE: Please remember to run ./gradlew spotl...

github.com

 

4차 시도(Lock 획득 실패 시 Join 상태 변경(COMPLETED -> IN_PROGRESS))

만약 JOIN 노드가 "COMPLETED" 되고 아래의 decide 메서드 호출 실패 시 다시 “IN_PROGRESS" 만들고 queue 에 넣는 방식을 해야할 것 같아서 PR 을 하였습니다.

....
} finally {
    executionDAOFacade.updateTask(task);
    if (shouldRemoveTaskFromQueue) {
        queueDAO.remove(queueName, task.getTaskId());
        LOGGER.debug("{} removed from queue: {}", task, queueName);
    }
    // if the current task execution has completed, then the workflow needs to be evaluated
    if (hasTaskExecutionCompleted) {
        workflowExecutor.decide(workflowId);
    }
}
...

 

그러나 Conductor 측 대답은 "만약 Lock 얻기를 실패했다면 WorkflowSweeper 가 그 역할을 주기적으로 체크하여 다음 Task 를 SCHEDULED 상태로 만들어 줄 것이다. 만약 그렇지 않다면 WorkflowSweeper 가 문제이다." 라는 답변이 왔습니다.

 

<WorkflowReconciler>

@Scheduled(
        fixedDelayString = "${conductor.sweep-frequency.millis:500}",
        initialDelayString = "${conductor.sweep-frequency.millis:500}")
public void pollAndSweep() {

 

5차 시도(WorkflowReconciler 변경)

 

WorkflowReconciler 역할은 RUNNING 중인 워크플로우 Evaluation, Timeout 체크해주는 등을 담당합니다.

condcutor-server 는 WorkflowReconciler 내부의 WorkflowRepairService 를 미사용 중이라 사용할 수 있도록 변경하였습니다.

@Autowired
public WorkflowSweeper(
        WorkflowExecutor workflowExecutor,
        Optional<WorkflowRepairService> workflowRepairService,
        ConductorProperties properties,
        QueueDAO queueDAO,
        ExecutionDAOFacade executionDAOFacade) {
    this.properties = properties;
    this.queueDAO = queueDAO;
    this.workflowExecutor = workflowExecutor;
    this.executionDAOFacade = executionDAOFacade;
    this.workflowRepairService = workflowRepairService.orElse(null);
    LOGGER.info("WorkflowSweeper initialized.");
}
.....
/**
 * A helper service that tries to keep ExecutionDAO and QueueDAO in sync, based on the task or
 * workflow state.
 *
 * <p>This service expects that the underlying Queueing layer implements {@link
 * QueueDAO#containsMessage(String, String)} method. This can be controlled with <code>
 * conductor.workflow-repair-service.enabled</code> property.
 */
@Service
@ConditionalOnProperty(name = "conductor.workflow-repair-service.enabled", havingValue = "true")
public class WorkflowRepairService {
}

 

추가 속성입니다.

# By default with dynomite, we want the repair service enabled
conductor.workflow-repair-service.enabled=true

 

동일 현상 발생합니다.

 

6차 시도(Conductor community 사용자들의 경험을 바탕으로 개선)

아래 그림과 같이 여러 사용자들이 어려움을 겪고 자기만의 방식으로 해결해가고 있습니다.

Task 상태 업데이트 시 decide 를 필수로 호출하는 방식, ES7 Migration 방식 등이 있습니다.

저희는 Schedule 방식으로 RUNNING 중인 Workflow 대하여 decide 를 필수적으로 1분마다 호출 할 수 있도록 하였습니다.

 

결과는 문제 해결하였습니다.

 

결과

사용자가 많은 오픈소스에서도 큰 결함을 가질 수 있다는 점을 알았습니다.

그러나 이러한 결함을 이길 정도로 기능이 좋거나 레거시를 건드리기엔 위험 부담이 커 계속 사용하고 있다고 생각합니다.

점점 많은 사용자가 모이고 점진적으로 개선이 되면 위 문제도 본질적으로 해결될 것이라 믿습니다.

 

반응형