Conductor Error Report - Join 이후 Task 실행이 안되는 현상
현상
간헐적으로 Join 노드 이후 다음 Task 실행이 안되는 현상이 발생.
1차 시도(Zookeeper -> LocalOnly)
Conductor 에서는 분산 환경에서 Lock 선택을 위한 Interface 를 제공합니다.
- Lock: LocalOnlyLock, NoopLock, RedisLock, ZookeeperLock
- application.properties 으로 타입 조정 가능
#disable locking during workflow execution
conductor.app.workflow-execution-lock-enabled=true
conductor.workflow-execution-lock.type=local_only
발생 에러 공통점은 Lock 획득 실패한 이후 Join 은 “COMPLETED” 상태가 되었고 더 이상 SystemTaskWorker 에 의해 실행되지 않았습니다.
23:35:43.418 [system-task-worker-4] DEBUG com.netflix.conductor.core.execution.AsyncSystemTaskExecutor - TaskModel{taskType='JOIN', status=COMPLETED, inputData={joinOn=[n_13, n_12]}, referenceTaskName='join_14', retryCount=0, seq=11, correlationId='null', pollCount=4, taskDefName='JOIN', scheduledTime=1712586902536, startTime=1712586902476, endTime=1712586941452, updateTime=1712586926966, startDelayInSeconds=0, retriedTaskId='null', retried=false, executed=false, callbackFromWorker=true, responseTimeoutSeconds=0, workflowInstanceId='127c9254-684c-41b7-8130-2391ba0fdfc5', workflowType='crazy-nested-fork-join-spark', taskId='c704a472-a3df-409f-8223-0372b1bae63a', reasonForIncompletion='null', callbackAfterSeconds=1, workerId='null', outputData={n_13={query=set spark.ndap.query.invoker=WORKFLOW
..
23:35:43.924 [system-task-worker-4] DEBUG com.netflix.conductor.service.ExecutionLockService - Thread 72 failed to acquire lock to lockId 127c9254-684c-41b7-8130-2391ba0fdfc5.
...
???
Lock 문제 가능성을 생각했습니다.
현재 Conductor-server 1개 이므로 Zookeeper Lock -> LocalOnly Lock 으로 변경하였습니다.
그러나 동일 문제 발생하였습니다.
2차 시도(Lock Acquire Time 변경)
Lock 을 얻기 위한 Blocking Time 이 너무 짧아서 그런가? 라고 생각했습니다.
별도 Thread 에서 Lock 을 얻기 위해서 기다리는 시간인 timeToTry 를 조정했습니다.
기존 500ms 대기 후 failed to acquire lock ~ 로그를 출력합니다.
대기 시간(timeToTry)을 2초로 늘리고 테스트 진행했습니다
/**
* Acquires a re-entrant lock on lockId with provided leaseTime duration. Blocks for timeToTry
* duration before giving up
*
* @param lockId resource to lock on
* @param timeToTry blocks up to timeToTry duration in attempt to acquire the lock
* @param leaseTime Lock lease expiration duration.
* @param unit time unit
* @return true, if successfully acquired
*/
boolean acquireLock(String lockId, long timeToTry, long leaseTime, TimeUnit unit);
/**
* The time (in milliseconds) for which the thread will block in an attempt to acquire the lock.
*/
private Duration lockTimeToTry = Duration.ofMillis(500);
conductor.app.lock-time-to-try=2000
기존: 23:35:43.418 → 23:35:43.924 : 500ms
변경 후: 10:36:44.223 -> 10:36:46:353 : 2000ms
2초 후 로그가 찍히는 건 확인했습니다.
결과는 동일현상 발생하였습니다.
3차 시도(Conductor 측 문의 하기)
해결하기 어려워 Conductor 측에 문의하였습니다.
Conductor 측에서 저희 상황을 보고 분석해보겠다는 답변이 왔습니다.

2일 후 아래 PR 이 머지 돼서 이것을 저희 코드에 적용 했지만 동일 현상 발생하였습니다.
https://github.com/conductor-oss/conductor/pull/120
Mark join task synchronous by v1r3n · Pull Request #120 · conductor-oss/conductor
Pull Request type Bugfix Feature Refactoring (no functional changes, no api changes) Build related changes WHOSUSING.md Other (please describe): NOTE: Please remember to run ./gradlew spotl...
github.com
4차 시도(Lock 획득 실패 시 Join 상태 변경(COMPLETED -> IN_PROGRESS))
만약 JOIN 노드가 "COMPLETED" 되고 아래의 decide 메서드 호출 실패 시 다시 “IN_PROGRESS" 만들고 queue 에 넣는 방식을 해야할 것 같아서 PR 을 하였습니다.
....
} finally {
executionDAOFacade.updateTask(task);
if (shouldRemoveTaskFromQueue) {
queueDAO.remove(queueName, task.getTaskId());
LOGGER.debug("{} removed from queue: {}", task, queueName);
}
// if the current task execution has completed, then the workflow needs to be evaluated
if (hasTaskExecutionCompleted) {
workflowExecutor.decide(workflowId);
}
}
...
그러나 Conductor 측 대답은 "만약 Lock 얻기를 실패했다면 WorkflowSweeper 가 그 역할을 주기적으로 체크하여 다음 Task 를 SCHEDULED 상태로 만들어 줄 것이다. 만약 그렇지 않다면 WorkflowSweeper 가 문제이다." 라는 답변이 왔습니다.

<WorkflowReconciler>
@Scheduled(
fixedDelayString = "${conductor.sweep-frequency.millis:500}",
initialDelayString = "${conductor.sweep-frequency.millis:500}")
public void pollAndSweep() {
5차 시도(WorkflowReconciler 변경)
WorkflowReconciler 역할은 RUNNING 중인 워크플로우 Evaluation, Timeout 체크해주는 등을 담당합니다.
condcutor-server 는 WorkflowReconciler 내부의 WorkflowRepairService 를 미사용 중이라 사용할 수 있도록 변경하였습니다.
@Autowired
public WorkflowSweeper(
WorkflowExecutor workflowExecutor,
Optional<WorkflowRepairService> workflowRepairService,
ConductorProperties properties,
QueueDAO queueDAO,
ExecutionDAOFacade executionDAOFacade) {
this.properties = properties;
this.queueDAO = queueDAO;
this.workflowExecutor = workflowExecutor;
this.executionDAOFacade = executionDAOFacade;
this.workflowRepairService = workflowRepairService.orElse(null);
LOGGER.info("WorkflowSweeper initialized.");
}
.....
/**
* A helper service that tries to keep ExecutionDAO and QueueDAO in sync, based on the task or
* workflow state.
*
* <p>This service expects that the underlying Queueing layer implements {@link
* QueueDAO#containsMessage(String, String)} method. This can be controlled with <code>
* conductor.workflow-repair-service.enabled</code> property.
*/
@Service
@ConditionalOnProperty(name = "conductor.workflow-repair-service.enabled", havingValue = "true")
public class WorkflowRepairService {
}
추가 속성입니다.
# By default with dynomite, we want the repair service enabled
conductor.workflow-repair-service.enabled=true
동일 현상 발생합니다.
6차 시도(Conductor community 사용자들의 경험을 바탕으로 개선)
아래 그림과 같이 여러 사용자들이 어려움을 겪고 자기만의 방식으로 해결해가고 있습니다.
Task 상태 업데이트 시 decide 를 필수로 호출하는 방식, ES7 Migration 방식 등이 있습니다.
저희는 Schedule 방식으로 RUNNING 중인 Workflow 대하여 decide 를 필수적으로 1분마다 호출 할 수 있도록 하였습니다.
결과는 문제 해결하였습니다.
결과
사용자가 많은 오픈소스에서도 큰 결함을 가질 수 있다는 점을 알았습니다.
그러나 이러한 결함을 이길 정도로 기능이 좋거나 레거시를 건드리기엔 위험 부담이 커 계속 사용하고 있다고 생각합니다.
점점 많은 사용자가 모이고 점진적으로 개선이 되면 위 문제도 본질적으로 해결될 것이라 믿습니다.