-
Conductor Error Report - Join 이후 Task 실행이 안되는 현상오픈소스 프로젝트 2024. 4. 24. 14:03반응형
현상
간헐적으로 Join 노드 이후 다음 Task 실행이 안되는 현상이 발생.
1차 시도(Zookeeper -> LocalOnly)
Conductor 에서는 분산 환경에서 Lock 선택을 위한 Interface 를 제공합니다.
- Lock: LocalOnlyLock, NoopLock, RedisLock, ZookeeperLock
- application.properties 으로 타입 조정 가능
#disable locking during workflow execution conductor.app.workflow-execution-lock-enabled=true conductor.workflow-execution-lock.type=local_only
발생 에러 공통점은 Lock 획득 실패한 이후 Join 은 “COMPLETED” 상태가 되었고 더 이상 SystemTaskWorker 에 의해 실행되지 않았습니다.
23:35:43.418 [system-task-worker-4] DEBUG com.netflix.conductor.core.execution.AsyncSystemTaskExecutor - TaskModel{taskType='JOIN', status=COMPLETED, inputData={joinOn=[n_13, n_12]}, referenceTaskName='join_14', retryCount=0, seq=11, correlationId='null', pollCount=4, taskDefName='JOIN', scheduledTime=1712586902536, startTime=1712586902476, endTime=1712586941452, updateTime=1712586926966, startDelayInSeconds=0, retriedTaskId='null', retried=false, executed=false, callbackFromWorker=true, responseTimeoutSeconds=0, workflowInstanceId='127c9254-684c-41b7-8130-2391ba0fdfc5', workflowType='crazy-nested-fork-join-spark', taskId='c704a472-a3df-409f-8223-0372b1bae63a', reasonForIncompletion='null', callbackAfterSeconds=1, workerId='null', outputData={n_13={query=set spark.ndap.query.invoker=WORKFLOW .. 23:35:43.924 [system-task-worker-4] DEBUG com.netflix.conductor.service.ExecutionLockService - Thread 72 failed to acquire lock to lockId 127c9254-684c-41b7-8130-2391ba0fdfc5. ... ???
Lock 문제 가능성을 생각했습니다.
현재 Conductor-server 1개 이므로 Zookeeper Lock -> LocalOnly Lock 으로 변경하였습니다.
그러나 동일 문제 발생하였습니다.
2차 시도(Lock Acquire Time 변경)
Lock 을 얻기 위한 Blocking Time 이 너무 짧아서 그런가? 라고 생각했습니다.
별도 Thread 에서 Lock 을 얻기 위해서 기다리는 시간인 timeToTry 를 조정했습니다.
기존 500ms 대기 후 failed to acquire lock ~ 로그를 출력합니다.
대기 시간(timeToTry)을 2초로 늘리고 테스트 진행했습니다
/** * Acquires a re-entrant lock on lockId with provided leaseTime duration. Blocks for timeToTry * duration before giving up * * @param lockId resource to lock on * @param timeToTry blocks up to timeToTry duration in attempt to acquire the lock * @param leaseTime Lock lease expiration duration. * @param unit time unit * @return true, if successfully acquired */ boolean acquireLock(String lockId, long timeToTry, long leaseTime, TimeUnit unit);
/** * The time (in milliseconds) for which the thread will block in an attempt to acquire the lock. */ private Duration lockTimeToTry = Duration.ofMillis(500);
conductor.app.lock-time-to-try=2000
기존: 23:35:43.418 → 23:35:43.924 : 500ms
변경 후: 10:36:44.223 -> 10:36:46:353 : 2000ms
2초 후 로그가 찍히는 건 확인했습니다.
결과는 동일현상 발생하였습니다.
3차 시도(Conductor 측 문의 하기)
해결하기 어려워 Conductor 측에 문의하였습니다.
Conductor 측에서 저희 상황을 보고 분석해보겠다는 답변이 왔습니다.
2일 후 아래 PR 이 머지 돼서 이것을 저희 코드에 적용 했지만 동일 현상 발생하였습니다.
https://github.com/conductor-oss/conductor/pull/120
Mark join task synchronous by v1r3n · Pull Request #120 · conductor-oss/conductor
Pull Request type Bugfix Feature Refactoring (no functional changes, no api changes) Build related changes WHOSUSING.md Other (please describe): NOTE: Please remember to run ./gradlew spotl...
github.com
4차 시도(Lock 획득 실패 시 Join 상태 변경(COMPLETED -> IN_PROGRESS))
만약 JOIN 노드가 "COMPLETED" 되고 아래의 decide 메서드 호출 실패 시 다시 “IN_PROGRESS" 만들고 queue 에 넣는 방식을 해야할 것 같아서 PR 을 하였습니다.
.... } finally { executionDAOFacade.updateTask(task); if (shouldRemoveTaskFromQueue) { queueDAO.remove(queueName, task.getTaskId()); LOGGER.debug("{} removed from queue: {}", task, queueName); } // if the current task execution has completed, then the workflow needs to be evaluated if (hasTaskExecutionCompleted) { workflowExecutor.decide(workflowId); } } ...
그러나 Conductor 측 대답은 "만약 Lock 얻기를 실패했다면 WorkflowSweeper 가 그 역할을 주기적으로 체크하여 다음 Task 를 SCHEDULED 상태로 만들어 줄 것이다. 만약 그렇지 않다면 WorkflowSweeper 가 문제이다." 라는 답변이 왔습니다.
<WorkflowReconciler>
@Scheduled( fixedDelayString = "${conductor.sweep-frequency.millis:500}", initialDelayString = "${conductor.sweep-frequency.millis:500}") public void pollAndSweep() {
5차 시도(WorkflowReconciler 변경)
WorkflowReconciler 역할은 RUNNING 중인 워크플로우 Evaluation, Timeout 체크해주는 등을 담당합니다.
condcutor-server 는 WorkflowReconciler 내부의 WorkflowRepairService 를 미사용 중이라 사용할 수 있도록 변경하였습니다.
@Autowired public WorkflowSweeper( WorkflowExecutor workflowExecutor, Optional<WorkflowRepairService> workflowRepairService, ConductorProperties properties, QueueDAO queueDAO, ExecutionDAOFacade executionDAOFacade) { this.properties = properties; this.queueDAO = queueDAO; this.workflowExecutor = workflowExecutor; this.executionDAOFacade = executionDAOFacade; this.workflowRepairService = workflowRepairService.orElse(null); LOGGER.info("WorkflowSweeper initialized."); } ..... /** * A helper service that tries to keep ExecutionDAO and QueueDAO in sync, based on the task or * workflow state. * * <p>This service expects that the underlying Queueing layer implements {@link * QueueDAO#containsMessage(String, String)} method. This can be controlled with <code> * conductor.workflow-repair-service.enabled</code> property. */ @Service @ConditionalOnProperty(name = "conductor.workflow-repair-service.enabled", havingValue = "true") public class WorkflowRepairService { }
추가 속성입니다.
# By default with dynomite, we want the repair service enabled conductor.workflow-repair-service.enabled=true
동일 현상 발생합니다.
6차 시도(Conductor community 사용자들의 경험을 바탕으로 개선)
아래 그림과 같이 여러 사용자들이 어려움을 겪고 자기만의 방식으로 해결해가고 있습니다.
Task 상태 업데이트 시 decide 를 필수로 호출하는 방식, ES7 Migration 방식 등이 있습니다.
저희는 Schedule 방식으로 RUNNING 중인 Workflow 대하여 decide 를 필수적으로 1분마다 호출 할 수 있도록 하였습니다.
결과는 문제 해결하였습니다.
결과
사용자가 많은 오픈소스에서도 큰 결함을 가질 수 있다는 점을 알았습니다.
그러나 이러한 결함을 이길 정도로 기능이 좋거나 레거시를 건드리기엔 위험 부담이 커 계속 사용하고 있다고 생각합니다.
점점 많은 사용자가 모이고 점진적으로 개선이 되면 위 문제도 본질적으로 해결될 것이라 믿습니다.
반응형'오픈소스 프로젝트' 카테고리의 다른 글
오픈소스에 커스텀 코드가 들어갈 때 어떻게 관리해야할까? (0) 2024.01.18