ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Conductor Error Report - Join 이후 Task 실행이 안되는 현상
    오픈소스 프로젝트 2024. 4. 24. 14:03
    반응형

    현상

    간헐적으로 Join 노드 이후 다음 Task 실행이 안되는 현상이 발생.

     

     

    1차 시도(Zookeeper -> LocalOnly)

    Conductor 에서는 분산 환경에서 Lock 선택을 위한 Interface 를 제공합니다.

    - Lock: LocalOnlyLock, NoopLock, RedisLock, ZookeeperLock

    - application.properties 으로 타입 조정 가능

    #disable locking during workflow execution
    conductor.app.workflow-execution-lock-enabled=true
    conductor.workflow-execution-lock.type=local_only

     

    발생 에러 공통점은 Lock 획득 실패한 이후 Join“COMPLETED” 상태가 되었고 더 이상 SystemTaskWorker 에 의해 실행되지 않았습니다.

     
    23:35:43.418 [system-task-worker-4] DEBUG com.netflix.conductor.core.execution.AsyncSystemTaskExecutor - TaskModel{taskType='JOIN', status=COMPLETED, inputData={joinOn=[n_13, n_12]}, referenceTaskName='join_14', retryCount=0, seq=11, correlationId='null', pollCount=4, taskDefName='JOIN', scheduledTime=1712586902536, startTime=1712586902476, endTime=1712586941452, updateTime=1712586926966, startDelayInSeconds=0, retriedTaskId='null', retried=false, executed=false, callbackFromWorker=true, responseTimeoutSeconds=0, workflowInstanceId='127c9254-684c-41b7-8130-2391ba0fdfc5', workflowType='crazy-nested-fork-join-spark', taskId='c704a472-a3df-409f-8223-0372b1bae63a', reasonForIncompletion='null', callbackAfterSeconds=1, workerId='null', outputData={n_13={query=set spark.ndap.query.invoker=WORKFLOW
    ..
    23:35:43.924 [system-task-worker-4] DEBUG com.netflix.conductor.service.ExecutionLockService - Thread 72 failed to acquire lock to lockId 127c9254-684c-41b7-8130-2391ba0fdfc5.
    ...
    ???

     

    Lock 문제 가능성을 생각했습니다.

    현재 Conductor-server 1개 이므로 Zookeeper Lock -> LocalOnly Lock 으로 변경하였습니다.

    그러나 동일 문제 발생하였습니다.

     

     

    2차 시도(Lock Acquire Time 변경)

    Lock 을 얻기 위한 Blocking Time 이 너무 짧아서 그런가? 라고 생각했습니다.

    별도 Thread 에서 Lock 을 얻기 위해서 기다리는 시간인 timeToTry 를 조정했습니다.

    기존 500ms 대기 후 failed to acquire lock ~ 로그를 출력합니다.

    대기 시간(timeToTry)을 2초로 늘리고 테스트 진행했습니다

    /** 
      * Acquires a re-entrant lock on lockId with provided leaseTime duration. Blocks for timeToTry
      * duration before giving up
      *
      * @param lockId resource to lock on
      * @param timeToTry blocks up to timeToTry duration in attempt to acquire the lock
      * @param leaseTime Lock lease expiration duration.
      * @param unit time unit
      * @return true, if successfully acquired
      */
    boolean acquireLock(String lockId, long timeToTry, long leaseTime, TimeUnit unit);
    /**
      * The time (in milliseconds) for which the thread will block in an attempt to acquire the lock.
      */
    private Duration lockTimeToTry = Duration.ofMillis(500);
    conductor.app.lock-time-to-try=2000

     

    기존: 23:35:43.418 → 23:35:43.924 : 500ms

    변경 후: 10:36:44.223 -> 10:36:46:353 : 2000ms

     

    2초 후 로그가 찍히는 건 확인했습니다.

    결과는 동일현상 발생하였습니다.

     

    3차 시도(Conductor 측 문의 하기)

    해결하기 어려워 Conductor 측에 문의하였습니다.

    Conductor 측에서 저희 상황을 보고 분석해보겠다는 답변이 왔습니다.

     

    2일 후 아래 PR 이 머지 돼서 이것을 저희 코드에 적용 했지만 동일 현상 발생하였습니다.

    https://github.com/conductor-oss/conductor/pull/120

     

    Mark join task synchronous by v1r3n · Pull Request #120 · conductor-oss/conductor

    Pull Request type Bugfix Feature Refactoring (no functional changes, no api changes) Build related changes WHOSUSING.md Other (please describe): NOTE: Please remember to run ./gradlew spotl...

    github.com

     

    4차 시도(Lock 획득 실패 시 Join 상태 변경(COMPLETED -> IN_PROGRESS))

    만약 JOIN 노드가 "COMPLETED" 되고 아래의 decide 메서드 호출 실패 시 다시 “IN_PROGRESS" 만들고 queue 에 넣는 방식을 해야할 것 같아서 PR 을 하였습니다.

    ....
    } finally {
        executionDAOFacade.updateTask(task);
        if (shouldRemoveTaskFromQueue) {
            queueDAO.remove(queueName, task.getTaskId());
            LOGGER.debug("{} removed from queue: {}", task, queueName);
        }
        // if the current task execution has completed, then the workflow needs to be evaluated
        if (hasTaskExecutionCompleted) {
            workflowExecutor.decide(workflowId);
        }
    }
    ...

     

    그러나 Conductor 측 대답은 "만약 Lock 얻기를 실패했다면 WorkflowSweeper 가 그 역할을 주기적으로 체크하여 다음 Task 를 SCHEDULED 상태로 만들어 줄 것이다. 만약 그렇지 않다면 WorkflowSweeper 가 문제이다." 라는 답변이 왔습니다.

     

    <WorkflowReconciler>

    @Scheduled(
            fixedDelayString = "${conductor.sweep-frequency.millis:500}",
            initialDelayString = "${conductor.sweep-frequency.millis:500}")
    public void pollAndSweep() {

     

    5차 시도(WorkflowReconciler 변경)

     

    WorkflowReconciler 역할은 RUNNING 중인 워크플로우 Evaluation, Timeout 체크해주는 등을 담당합니다.

    condcutor-server 는 WorkflowReconciler 내부의 WorkflowRepairService 를 미사용 중이라 사용할 수 있도록 변경하였습니다.

    @Autowired
    public WorkflowSweeper(
            WorkflowExecutor workflowExecutor,
            Optional<WorkflowRepairService> workflowRepairService,
            ConductorProperties properties,
            QueueDAO queueDAO,
            ExecutionDAOFacade executionDAOFacade) {
        this.properties = properties;
        this.queueDAO = queueDAO;
        this.workflowExecutor = workflowExecutor;
        this.executionDAOFacade = executionDAOFacade;
        this.workflowRepairService = workflowRepairService.orElse(null);
        LOGGER.info("WorkflowSweeper initialized.");
    }
    .....
    /**
     * A helper service that tries to keep ExecutionDAO and QueueDAO in sync, based on the task or
     * workflow state.
     *
     * <p>This service expects that the underlying Queueing layer implements {@link
     * QueueDAO#containsMessage(String, String)} method. This can be controlled with <code>
     * conductor.workflow-repair-service.enabled</code> property.
     */
    @Service
    @ConditionalOnProperty(name = "conductor.workflow-repair-service.enabled", havingValue = "true")
    public class WorkflowRepairService {
    }

     

    추가 속성입니다.

    # By default with dynomite, we want the repair service enabled
    conductor.workflow-repair-service.enabled=true

     

    동일 현상 발생합니다.

     

    6차 시도(Conductor community 사용자들의 경험을 바탕으로 개선)

    아래 그림과 같이 여러 사용자들이 어려움을 겪고 자기만의 방식으로 해결해가고 있습니다.

    Task 상태 업데이트 시 decide 를 필수로 호출하는 방식, ES7 Migration 방식 등이 있습니다.

    저희는 Schedule 방식으로 RUNNING 중인 Workflow 대하여 decide 를 필수적으로 1분마다 호출 할 수 있도록 하였습니다.

     

    결과는 문제 해결하였습니다.

     

    결과

    사용자가 많은 오픈소스에서도 큰 결함을 가질 수 있다는 점을 알았습니다.

    그러나 이러한 결함을 이길 정도로 기능이 좋거나 레거시를 건드리기엔 위험 부담이 커 계속 사용하고 있다고 생각합니다.

    점점 많은 사용자가 모이고 점진적으로 개선이 되면 위 문제도 본질적으로 해결될 것이라 믿습니다.

     

    반응형

    댓글

Designed by Tistory.