Micrometer custom metric 생성하기
개요
Micrometer provides a simple facade over the instrumentation clients for the most popular observability systems, allowing you to instrument your JVM-based application code without vendor lock-in. Think SLF4J, but for application observability! Data recorded by Micrometer are intended to be used to observe, alert, and react to the current/recent operational state of your environment.
업무를 하며 워크플로우 전환 후 사용자들이 메트릭을 필요로 하였다.
기존 Hive Oozie Workflow는 process per task로 동작하지만 Conductor는 thread per task로 동작한다.
고객들은 Worker가 Task 실행 중인 스레드가 몇 개 인지 확인이 필요하다고 요구사항으로 내놓았다.
Conductor server - worker 로 동작하는데 worker는 우리가 직접 개발해야하는 인터페이스 형태로 제공된다.
결과적으로 모니터링도 직접 구축해야한다는 얘기가 된다.
그래서 Conductor server에서는 어떻게 Metric 모니터링을 하고 있을까? 라는 생각이 들었다.
조사 결과 Micrometer Registry를 Spectator Registry로 등록하여 Metric을 수집하고 있다.
Wrapped Class와 유사한 개념으로 사용한다.
Spectator는 netflix에서 만든 Data Collector이다.
Spectator는 Prometheus를 지원하지 않는다.
https://github.com/Netflix/spectator/tree/main/spectator-reg-micrometer
https://github.com/Netflix/spectator/issues/516
그래서 Conductor server와는 다르게 Micrometer를 사용해보려고한다.
https://github.com/micrometer-metrics/micrometer/tree/main/implementations
micrometer 핵심 개념에 대해서 살펴보자.
개념
- Registry: Meter Set을 관리하는 장소. SimpleMeterRegistry, CompositeMeterRegistry, GlobalMeterRegistry 등이 있다.
// Simple: 각 미터들을 메모리 저장. 미터셋을 다른 시스템으로 보내지 않음.
MeterRegistry registry = new SimpleMeterRegistry();
// Composite: 여러개의 모니터링 시스템에 메트릭을 보내야하는 경우 사용함
CompositeMeterRegistry composite = new CompositeMeterRegistry();
Counter compositeCounter = composite.counter("counter");
compositeCounter.increment();
Registry registry = Spectator.globalRegistry()
composite.add(registry);
SimpleMeterRegistry simple = new SimpleMeterRegistry();
composite.add(simple);
compositeCounter.increment();
//Global
Metrics.counter("feature.2", "type", type).increment();
Metrics.addRegistry(new SimpleMeterRegistry());
- Meter: 어플리케이션의 측정 값들의 Set을 모으기 위해 사용하는 것. 중요한 요소로는 Counter, Timer, Gauge 등이 있다.
기본적인 Naming 규칙은 "."으로 사용한다.
더 나아가 MeterFilter를 사용하고 싶으면 Document 참고하자.
// Custom Nameing 규칙 등록하고 싶을 때
registry.config().namingConvention(myCustomNamingConvention);
// 등록 시 각 Tool에서 나오는 이름
registry.timer("http.server.requests");
Prometheus - http_server_requests_duration_seconds
Atlas - httpServerRequests
Graphite - http.server.requests
InfluxDB - http_server_requests
// Tag로 Metric 그룹화 하기
registry.counter("database.calls", "db", "users")
registry.counter("http.requests", "uri", "/api/users")
registry.config().commonTags(Arrays.asList(Tag.of("stack", "prod"), Tag.of("region", "us-east-1"))); // equivalently
- Counter: 이벤트를 측정 할 때 사용한다.
예를 들어 inser, remove 시 이벤트를 측정하는데 사용한다.
또한 일정 시간 간격 당 카운팅되는 속도를 보고 이상감지에도 이용할 수 있다.
이렇게 만들어진 Meter를 Visualization Tool로 Emitting할 수 있다.
// Counter 등록
Counter counter = Counter
.builder("counter")
.register(registry);
// Counter 사용
requestList.add(element);
insertCounter.increment();
- Timer: 이벤트가 소요되는 시간을 측정한다.
갑자기 일정시간 걸리다 많은 시간이 소요되는 Task를 확인하는 경우 유용하다.
어노테이션 기반 @Timed 사용할 수 있다.
// Runnable, Callabel 등이 사용가능.
Timer timer = Metrics.globalRegistry.timer("sink.task.timer");
timer.record(() -> taskLogic(task, taskResult));
// @Timed Annotation 사용.
@Service
public class ExampleService {
@Timed
public void sync() {
// @Timed will record the execution time of this method,
// from the start and until it exits normally or exceptionally.
}
@Async
@Timed
public CompletableFuture<?> async() {
// @Timed will record the execution time of this method,
// from the start and until the returned CompletableFuture
// completes normally or exceptionally.
return CompletableFuture.supplyAsync(...);
}
}
- Guage: Gauge는 특정 시점에 thread 갯수나 heap memory 사용량이 필요할 때 사용된다.
사용할 Thread 갯수는 아래와 같이 AtomicInteger로 표현할 수 있다.
Time 형식의 추적을 도와주는 TimeGauge, 다양한 필드를 커스텀하게 Tagging 해줄 수 있는 MultiGauge도 사용할 수 있다.
// Gauge 등록
AtomicInteger numOfSinkTaskThread = Metrics.globalRegistry.gauge("sink.task.thread.count", new AtomicInteger(0));
// 값 조정.
numOfSinkTaskThread.incrementAndGet();
numOfSinkTaskThread.decrementAndGet();
// TimeGauge
AtomicInteger msTimeGauge = new AtomicInteger(4000);
AtomicInteger usTimeGauge = new AtomicInteger(4000);
TimeGauge.builder("my.gauge", msTimeGauge, TimeUnit.MILLISECONDS, AtomicInteger::get).register(registry);
TimeGauge.builder("my.other.gauge", usTimeGauge, TimeUnit.MICROSECONDS, AtomicInteger::get).register(registry);
//Multi-gauge
MultiGauge statuses = MultiGauge.builder("statuses")
.tag("job", "dirty")
.description("The number of widgets in various statuses")
.baseUnit("widgets")
.register(registry);
statuses.register(
resultSet.stream()
.map(result -> Row.of(Tags.of("status", result.getAsString("status")), result.getAsInt("count")))
.collect(toList())
);
Conductor 사용 사례
주요 클래스는 Monitor, WorkflowMonitor이다.
Monitor는 위에서 설명한 Timer, Gauge, Counter Wrapped Class로 이해하자.
WorkflowMonitor는 2분 Schedule Job을 이용하여 Metric를 Gauge를 통하여 기록하고 있다.
* Counter
public static void error(String className, String methodName) {
}
public static void recordCounter(String name, long count, String... additionalTags) {
}
public static void recordTaskPollError(String taskType, String domain, String exception) {
}
public static void recordTaskPoll(String taskType) {
}
public static void recordTaskPollCount(String taskType, String domain, int count) {
}
public static void recordTaskTimeout(String taskType) {
}
public static void recordTaskResponseTimeout(String taskType) {
}
public static void recordWorkflowTermination(
String workflowType, WorkflowModel.Status status, String ownerApp) {
}
public static void recordWorkflowStartSuccess(
String workflowType, String version, String ownerApp) {
}
public static void recordWorkflowStartError(String workflowType, String ownerApp) {
}
public static void recordUpdateConflict(
String taskType, String workflowType, WorkflowModel.Status status) {
}
public static void recordUpdateConflict(
String taskType, String workflowType, TaskModel.Status status) {
}
public static void recordTaskUpdateError(String taskType, String workflowType) {
}
public static void recordTaskExtendLeaseError(String taskType, String workflowType) {
}
public static void recordTaskQueueOpError(String taskType, String workflowType) {
}
public static void recordEventQueueMessagesProcessed(
String queueType, String queueName, int count) {
}
public static void recordObservableQMessageReceivedErrors(String queueType) {
}
public static void recordEventQueueMessagesHandled(String queueType, String queueName) {
}
public static void recordEventQueueMessagesError(String queueType, String queueName) {
}
public static void recordEventExecutionSuccess(String event, String handler, String action) {
}
public static void recordEventExecutionError(
String event, String handler, String action, String exceptionClazz) {
}
public static void recordEventActionError(String action, String entityName, String event) {
}
public static void recordDaoRequests(
String dao, String action, String taskType, String workflowType) {
}
public static void recordDaoEventRequests(String dao, String action, String event) {
}
public static void recordExternalPayloadStorageUsage(
String name, String operation, String payloadType) {
}
public static void recordDaoError(String dao, String action) {
}
public static void recordAckTaskError(String taskType) {
}
public static void recordDiscardedIndexingCount(String queueType) {
}
public static void recordAcquireLockUnsuccessful() {
}
public static void recordAcquireLockFailure(String exceptionClassName) {
}
public static void recordWorkflowArchived(String workflowType, WorkflowModel.Status status) {
}
public static void recordDiscardedArchivalCount() {
}
public static void recordSystemTaskWorkerPollingLimited(String queueName) {
}
public static void recordQueueMessageRepushFromRepairService(String queueName) {
}
* Timer
public static void recordQueueWaitTime(String taskType, long queueWaitTime) {
}
public static void recordTaskExecutionTime(
String taskType, long duration, boolean includesRetries, TaskModel.Status status) {
}
public static void recordWorkflowDecisionTime(long duration) {
}
public static void recordWorkflowCompletion(
String workflowType, long duration, String ownerApp) {
}
public static void recordUnackTime(String workflowType, long duration) {
}
* Gauge
public static void recordGauge(String name, long count) {
}
public static void recordQueueDepth(String taskType, long size, String ownerApp) {
}
public static void recordTaskInProgress(String taskType, long size, String ownerApp) {
}
public static void recordRunningWorkflows(long count, String name, String ownerApp) {
}
public static void recordTaskPendingTime(String taskType, String workflowType, long duration) {
}
public static void recordTaskRateLimited(String taskDefName, int limit) {
}
public static void recordTaskConcurrentExecutionLimited(String taskDefName, int limit) {
}
public static void recordDaoPayloadSize(
String dao, String action, String taskType, String workflowType, int size) {
}
public static void recordWorkerQueueSize(String queueType, int val) {
}
public static void recordArchivalDelayQueueSize(int val) {
}
public static void recordEventQueuePollSize(String queueType, int val) {
}
public static void recordTaskExecLogSize(int val) {
}
실습
의존성 추가하자.
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.12.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
# For Using Probe Api
management.endpoint.prometheus.enabled=true
management.endpoints.web.exposure.include=metrics, prometheus, info, health
Worker 코드에 Gauge 적용해보려한다.
@Slf4j
@Component
public class SinkWorker extends AbstractTemplateWorker {
private AtomicInteger numOfSinkTaskThread =
Metrics.globalRegistry.gauge("sink.task.thread.count", new AtomicInteger(0));
@Override
protected void taskLogic(Task task, TaskResult taskResult) {
log.info("Sink Worker Started");
numOfSinkTaskThread.incrementAndGet();
// Task 5초 걸린다고 가정.
sleep(5000);
...
log.info("Sink Worker Completed");
numOfSinkTaskThread.decrementAndGet();
}
}
"sink_task_thread_count"라는 이름으로 Metric이 나올 것이다.
sink-task는 한 번에 10개까지 병렬 실행이 가능하고 11개 부터는 Queue에서 대기하고 있다가 이전 task 들이 끝나면 Consume해오는 방식이다. 아래 Workflow는 Fork-Join 방식으로 동시에 10개 실행시키고 1개는 나중에 실행하도록 만들어놓은 Workflow이다.
http://localhost:8082 접속해보자.(8080은 conductor-server가 실행 중)
아래 Type의 Metric을 찾아볼 수 있다.(현재 0개)
...
# HELP sink_task_thread_count
# TYPE sink_task_thread_count gauge
sink_task_thread_count 0.0
...
Workflow 실행하고 새로고침해보자.
...
# HELP sink_task_thread_count
# TYPE sink_task_thread_count gauge
sink_task_thread_count 10.0
...
10개가 실행되어지고 Thread 갯수 변경을 감지할 수 있다. 5초 뒤 1개가 된다.
모든 Task작업이 끝난 후에는 0개가 되어야한다.
Prometheus와 Grafana로 확인해보자.
연동 방법은 다른 블로그 참고해서 구축해보자.
Grafana Polling time이 15초이므로 하나의 Task당 20초 정도 걸린다고 수정하고 측정해보자.
참고
https://docs.micrometer.io/micrometer/reference/
https://github.com/Netflix/spectator
GitHub - Netflix/spectator: Client library for collecting metrics.
Client library for collecting metrics. Contribute to Netflix/spectator development by creating an account on GitHub.
github.com