1. Flume 사용자 가이드
    1. 개요
    2. 소개
      1. 아키텍처
      2. 신뢰성(Reliability)
        1. E2E(End-to-End)
        2. SoF(Store on Failure)
        3. BE(Best Effort)
      3. 확장성(Scalability)
      4. 운영가능성(Manageability)
      5. 확장성(Extensibility)
  2. 독립 모드로 Flume 설치하기
    1. 미리 설치해 둘 것들
    2. 설치하기
    3. 실행하기
  3. 의사 분산 모드로 Flume 설치하기
    1. 실행하기
    2. 설정하기
    3. 로그 생성 및 수집하기
    4. 결론
  4. 데코레이터 예제코드
    1. 메타데이터 추가 데코레이터
    2. 인코딩 변환 데코레이터
  5. Flume 에이전트 성능 측정 결과

Flume 사용자 가이드

개요

플럼(Flume)는 분산 환경에서 대량의 로그 데이터를 효과적으로 수집하여, 합친 후 다른 곳으로 전송할 수 있는 신뢰할 수 있는 서비스다.
플럼은 단순하며 유연한 스트리밍 데이터 플로우(streaming data flow) 아키텍처를 기반으로 한다.
또한 플럼은 장애에 쉽게 대처 가능하며, 로그 유실에 대한 신뢰 수준을 상황에 맞게 변경할 수 있을뿐만 아니라, 장애 발생시 다양한 복구 메커니즘을 제공한다.
게다가 실시간으로 로그를 분석하는 어플리케이션을 개발할 수 있도록, 간단하며 확장 가능한 데이터 모델을 사용한다.

소개

클러스터에 있는 모든 장치로부터 로그 파일들을 수집한 후, 하둡 분산 파일 시스템(HDFS)과 같은 중앙 저장소에 저장하는 로깅 시스템을 구축해야 할 때 플럼은 가장 제격이다.

플럼은 아래의 핵심 목표를 만족시키도록 만들어졌다.

  • 신뢰성(Reliability)
  • 확장성(Scalability)
  • 운영가능성(Manageability)
  • 확장성(Extensibility)

이번 장에서는 플럼의 아키텍처에 대해 가장 추상적인 관점에서 살펴보며, 위의 4가지 목표를 어떻게 만족시켰는지 설명하려고 한다.

아키텍처

플럼의 아키텍처는 단순하며, 튼튼하고, 유연하다.
플럼 아키텍처는 스트림 지향의 데이터 플로우를 기반으로 한다.

데이터 플로우(data flow)

란 하나의 데이터 스트림이 생성지에서 목표지로 전달되어 처리되는 방식을 뜻한다.
데이터 플로우는 이벤트를 전송하고 수집하는 일련의 논리 노드(logical node)들로 구성된다.
여러 논리 노드들을 서로 순차적으로 연결하여 데이터 플로우를 구성한다.
논리 노드가 서로 연결되는 방식을 논리 노드 설정(logical node's configuration)이라고 부른다.

이러한 논리 노드들은 모두 플럼 마스터(Flume Master)에서 관리한다.
플럼 마스터란 플럼이 설치된 물리 노드(physical node)와 논리 노드에 대한 정보를 유지하고 있는 별도의 서비스다.
플럼 마스터가 설정을 논리 노드에 할당하며, 사용자가 설정을 바꾼 경우 변경된 사항을 모든 논리 노드에 알려주는 역할을 한다.
각 논리노드는 차례대로 플럼 마스터와 주기적으로 통신하여(heartbeat), 모니터링 관련 정보를 서로 공유하며 자신과 관련된 설정정보가 변경되었는지 확인한다.

위 그림에서 다수의 어플리케이션 서버에서 로그 데이터를 수집하기 위해 플럼을 배치하는 전형적인 아키텍처를 볼 수 있다.
플럼 아키텍처는 다수의 논리 노드(logical node)로 구성되며, 각 논리 노드는 3개의 티어로 분류된다.
첫 번째 티어는 에이전트(agent) 티어다.
에이전트 노드는 일반적으로 로그를 생성하는 머신에 설치되며, 플럼과 통신하는 데이터의 첫 번째 지점에 해당한다.
에이전트 노드는 데이터를 다음 티어인 컬렉터 노드로 전송한다.
컬렉터 노드는 각각의 데이터 흐름을 모은 후, 스토리지 티어로 다시 전송한다.

예를 들면 에이전트는 syslog 데이터를 리스닝하거나 웹서버나 하둡 JobTracker와 같은 서비스에서 생성되는 로그를 모니터링하고 있는 머신일 수 있다.
에이전트는 데이터 스트림을 생성하여 컬렉터로 전송한다.
그러면 컬렉터는 데이터 스트림을 합친 후, HDFS와 같은 스토리지 티어에 효과적으로 저장할 수 있게 된다.

논리 노드는 매우 유연하도록 추상화되었다. 각 논리 노드는 모두 source와 sink, 두개의 요소로 구성된다.

source

는 논리 노드가 수집할 데이터에 대한 설정이며, sink는 수집한 데이터를 전송할 대상에 대한 설정이다.
따라서 각 논리 노드는 source와 sink가 어떻게 설정되느냐에 따라 달라진다.

뿐만 아니라 각 논리노드를 서로 연결할 수도 있다.

source와 sink는 모두 decorator를 통해 추가적으로 설정할 수 있다.

decorator

는 전송할 데이터에 대해 간단한 처리작업이 필요할 때 사용한다.
앞의 예제의 경우, 컬렉터와 에이전트는 모두 동일한 소프트웨어로 실행한다.
런타임에 플럼 마스터가 각 논리 노드에 설정 정보를 할당한다.
즉 각 노드의 설정과 관련된 정보는 모두 런타임에 만들어지므로, 관련된 자바 프로세스를 재기동하거나 해당 장비에 로그인할 필요 없이
플럼 서비스가 기동중이라면 언제라도 설정 정보를 갱신할 수 있다.

source와 sink, 그리고 decorator(선택적으로 적용)는 매우 강력한 기본 요소다.
플럼은 이러한 아키텍처를 사용해서, 각 데이터 플로우별로 속성을 지정할 수 있다.
또한 이벤트의 메타데이터를 처리하거나, 데이터 플로우에 새로운 이벤트를 생성할 수도 있다.
또한 논리 노드는 데이터를 여러 개의 논리 노드로 전송할 수도 있다{color:red}(fan-out){color}.

이를 통해 다수의 데이터 흐름을 만들 수 있으며, 각 하위 흐름은 서로 다르게 설정할 수도 있다.
예를 들면 하나의 데이터 플로우에서는 수집 경로를 통해 데이터를 영속적인 저장소에 저장할 수도 있으며,
이와 동시에 다른 데이터 플루우에서는 해당 데이터를 또다른 시스템으로 전송하여 간단한 분석작업을 할수도 있다.

chaining

에이전트가 이벤트를 전송하던 컬렉터 서버가 다운된 경우, 자동으로 설정된 다음 컬렉터 서버로 이벤트를 전송하는 기능을 체이닝(chaining)이라고 부른다.

논리 노드와 물리 노드

논리 노드와 물리 노드를 반드시 구분할 수 있어야 한다.
물리 노드는 단일 머신의 단일 JVM 인스턴스에서 실행된 단일 자바 프로스세스에 해당한다.
일반적으로 각 장비마다 하나의 물리 노드만 존재한다.
물리 노드는 논리 노드의 컨테이너 역할을 하며, 각 논리 노드를 서로 결합해서 데이터 흐름을 만들어낼 수 있다.
각 물리 노드는 다수의 논리 노드를 호스팅하는 역할을 하며, 장비의 자원을 각 논리 노드에 할당하는 일을 중재한다.
따라서 앞의 예제에서 프로세스들을 논리적으로 분리했지만, 이들 논리 노드는 모두 하나의 물리 노드에서 실행될 수도 있다.
플럼을 사용하면 데이터에 대한 계산 작업을 수행하거나 전송할 위치를 기술할 수 있는 유연성을 제공한다.
이 가이드의 나머지 부분에서는 별다른 언급이 없는 한 논리 노드에 대한 부분을 설명한다.

신뢰성(Reliability)

신뢰성이란 장애가 나더라도 로그를 유실없이 전송할 수 있는 능력을 뜻하며, 프럼이 제공하는 핵심적인 특징이다.
대규모의 분산 시스템이라면 부분적인 장애가 나기 쉽상이다.
장비가 물리적으로 고장나거나, 네트워크 대역폭, 메모리 같은 자원이 꽉 차거나, 또는 소프트웨어가 다운되거나 느리게 실행될 수도 있다.
플럼은 fault-tolerance를 핵심 설계 원칙으로 내세우며, 여러 구성요소가 다운되더라도 로그를 계속해서 수집할 수 있다.

플럼은 에이저트 노드가 실행되는 상황이라면, 어떠한 경우에도 에이전트 노드가 수집한 로그를 결국에는 데이터 플로우의 종착역인 컬렉터 노드로 전송한다는 점을 보장한다.
즉, 데이터가 목표점에 도달한다는 사실을 보장한다.

하지만 이처럼 신뢰성을 보장하려면 자원이 많이 들며, 실제 필요로 하는 신뢰성은 이보다 더 낮은 경우가 많다.
따라서 플럼은 사용자가 데이터 플로우별로 신뢰 수준(level of reliability)을 설정할 수 있는 기능을 제공한다.
플럼에서는 세가지 종류의 신뢰도를 제공한다.

  • E2E(End-to-End)
  • SoF(Store on Failure)
  • BE(Best Effort)
E2E(End-to-End)

플럼에서 이벤트를 수집한 이상, 마지막 목적지까지 이벤트를 전송한다는 점을 보장한다. 단 이벤트를 수집하는 에이전트는 실행중이어야 한다.
이러한 신뢰도로 설정된 경우 에이전트는 머저 이벤트를 디스크의 write-ahead log(WAL)에 기록한다.
따라서 에이전트가 다운되어 재시작되더라도, 어디까지 이벤트를 전송했는지 알 수 있게 된다.
이벤트가 성공적으로 데이터 플로우의 끝지점까지 전송되면, 이벤트를 전송한 에이전트로 ack이 반송된다.
이를 통해 디스크에 기록된 이벤트가 더이상 전송할 필요가 없다는 사실을 판단한다.
이정도의 신뢰도를 설정하게 되면, 에이전트에 장애가 나는 대다수의 상황에도 끄떡없다.

SoF(Store on Failure)

SoF 레벨에서는 한 홉(hop)까지만의 ack을 요구한다.
따라서 이벤트를 전송하는 노드가 장애를 인지하게 되면, 데이터를 로컬 디스크에 기록해두고, 다음 노드가 복구될 때까지 이벤트를 저장하거나 또 다른 노드로 전송하게 된다.
이 레벨에서는 복합적인 장애가 발생하게 되면 데이터가 유실될 수도 있다.
SoF 레벨은 DFO(Disk Fail Over) 라고도 부른다.

BE(Best Effort)

BE 레벨에서는 다음 홉에 데이터를 전송하며, ack을 받거나 재시도하는 처리를 전혀 하지 않는다.
따라서 노드가 다운되면, 전송중이거나 전송받는 중이었던 데이터는 유실될 수 있다.
BE 레벨이 신뢰도가 가장 낮은 레벨이지만, 가장 가볍다.

확장성(Scalability)

노드 추가 및 제거가 용이하다.

운영가능성(Manageability)

운영이 간편하다.

확장성(Extensibility)

Decorator 등을 활용해서 새로운 기능을 쉽게 추가할 수 있다.

독립 모드로 Flume 설치하기

우분투 10.4 버전에 Flume을 설치해보도록 하자.
설치하는 방식에는 RPM, 데비안 패키지, tarball 등의 방식이 있다.
여기에서는 tarball을 직접 풀어서 설치하도록 하겠다.
하지만 실제 운영할 장비라면 다양한 기능이 지원되는 rpm이나 데비안 패지지를 통한 설치가 권장된다.

미리 설치해 둘 것들

  • Oracle 버전의 JRE 1.6 이상이 설치되어 있어야 한다. 여기에서는 32bit용 JRE를 설치했다.
    유닉스 계열의 경우 64bit 머신이라면 64bit용 JRE를 설치해도 괜찮은 것 같다.
    단 윈도우 계열의 경우 머신이 64bit더라도 32bit용 JRE를 설치할 것을 권장하고 있다.
    bq."We require using the 32-bit JVM even on 64 bit machines because of lower memory footprint."

설치하기

클라우데라 GitHub에서 Flume tarball을 다운로드한다.
여기에서는 현재 최신 버전인 flume-distribution-0.9.4-bin.tar.gz을 다운로드 했다.
원하는 위치에 tarball을 푼다. 여기에서는 flume/ 하위에 flume-distribution-0.9.4/로 압축을 풀었다.

실행하기

여기에서는 단일 머신에서 노드와 마스터를 모두 실행한 후, 제대로 실행되었는지 dump 명령어를 실행해서 확인한다.
먼저 마스터를 실행한다.

bin/flume master

마스터가 실행 된 후, 새로운 터미널을 열어서 노드를 실행한다.

bin/flume node_nowatch

노드가 실행되면, 새로운 터미널을 열어서 dump 명령어를 실행해서 확인해 본다.

bin/flume dump console

그러면 콘솔에 텍스트를 입력하고, 입력한 텍스트가 다시 콘솔에 나오는 것을 확인할 수 있다.

또는 웹 화면으로도 실행 중인 마스터와 노드의 정보를 확인할 수 있다.
마스터에 대한 웹은 http://localhost:35871로 접속한다. 노드에 대한 웹은 http://localhost:35862/로 접속한다.
하나의 머신에 여러 노드가 실행되고 있다면, 포트 번호를 1 증가시켜 접속해볼 수 있다.

의사 분산 모드로 Flume 설치하기

여기에서는 한 머신에서 의사 분산 모드로 Flume을 실행해서, sink와 source를 사용하는 방법을 알아본다.
Flume 설치 가이드에 따라서 Flume이 설치되어 있어야 한다.

실행하기

한 머신에서 마스터, 에이전트, 컬렉터를 실행한다. 먼저 마스터를 실행한다.

bin/flume master

마스터가 실행이 완료되면 Flume Master 웹에 접속할 수 있다.
아래 그림에서 보다시피 아직 노드를 실행하지 않았으므로, 모든 정보가 다 비어 있는 상태다.

이제 동일한 머신에서 에이전트를 실행한다. 여기에서 실행한다는 의미는 물리적 노드(physical node)를 실행한다는 뜻이다.
에이전트의 물리적인 노드 이름은 "agent_a"로 주자.

bin/flume node_nowatch -n agent_a

에이전트가 실행되면, 아래와 같이 노드가 추가되었음을 확인할 수 있다.

아직 source와 sink에 대한 설정을 하지 않았으므로, 상태는 IDLE다.
물리 노드를 실행하면, 물리 노드와 이름이 같은 논리노드가 디폴트로 실행된다.
이러한 논리노드를 기본 논리 노드(Primary Logical Node)라고 부른다.

마지막으로 컬렉터를 실행한다.
에이전트의 물리적인 노드 이름은 "collector_x"로 주자.

bin/flume node_nowatch -n collector_x

에이전트와 마찬가지로, Flume Master 웹 화면에서 컬렉터가 등록되었음을 확인할 수 있다.

설정하기

여기에서는 단순한 흐름을 따라 로그를 수집하려고 한다.
에이전트는 콘솔로부터 데이터를 입력받아, 컬렉터로 전송한다.
컬렉터는 에이전트로부터 전송받은 로그를 다시 콘솔로 출력한다.

노드에 대한 설정은 Flume Master 웹 화면의 "config" 메뉴에서 실행한다.
"Configure multiple nodes" 밑의 텍스트 영역에 아래의 명령어를 입력한 후, "Submit query' 버튼을 누른다.
agent_a : console | agentSink("localhost", 35853);
collector_x : collectorSource(35853) | console;

Flume Master 웹 화면의 메인으로 돌아오면 아래와 같이, Command history 영역에서 명령어가 성공했음을 알 수 있다.
또한, Node configuration에 보면 source와 sink가 제대로 설정되었음을 확인할 수 있다.
그리고 Node status 영역에서는 각 노드의 상태가 "ACTIVE" 상태로 바뀌었음을 볼 수 있다.

로그 생성 및 수집하기

에이전트 노드를 실행한 콘솔로 가서, 아무런 텍스트를 입력한 후 엔터키를 누른다.
그리고 나서 컬렉터 노드를 실행한 콘솔로 가 보면, 에이전트에서 전송한 로그가 콘솔에 출력됨을 확인할 수 있다.

결론

여기에서는 단일 노드에서 마스터 / 컬렉터 /에이전트 노드를 모두 실행했다.
실제 환경이라면 마스터는 별도의 마스터 서버에서, 컬렉터는 로그를 수집할 서버에, 에이전트는 로그를 생성할 서버(웹서버 등)에 설치해서 실행할 것이다.
여기에서는 기본 논리 노드에 sink | source 정보를 바로 설정했다.
이와는 달리, 설정 정보만을 담고 있는 별도의 논리 노드를 만든 후, 해당 논리 노드를 물리 노드에 매핑(map 명령어를 이용해서)하는 방식도 있다.

데코레이터 예제코드

메타데이터 추가 데코레이터

DataTypeTaggingDecorator.java


public class DataTypeTaggingDecorator <S extends EventSink> extends EventSinkDecorator<S> {
  static final Logger LOG = LoggerFactory.getLogger(DataTypeTaggingDecorator.class);
  private String type;
  private byte[] typeBytes;
  private String cid;
  private byte[] cidBytes;
  private String ip;
  private byte[] ipBytes;
  private String domain;
  private byte[] domainBytes;
  
  public DataTypeTaggingDecorator(S s, String... argv) {
    super(s);
    
    this.type = argv[0];
    this.typeBytes = type.getBytes();
    this.cid = argv[1];
    this.cidBytes = cid.getBytes();
    this.ip = argv[2];
    this.ipBytes = ip.getBytes();
    if( argv.length > 3 ) {
    	domain = argv[3];
    	domainBytes = domain.getBytes();
    }
  }
  
  @Override
  public void append(Event e) throws IOException, InterruptedException {
    e.set("type", typeBytes);
    e.set("cid", cidBytes);
    e.set("aip", ipBytes);
    e.set("domain", domainBytes);
//    e.set("seq", (ip+"_"+(seqNum++)).getBytes());
    super.append(e);
  }
  
  public static SinkDecoBuilder builder() {
    return new SinkDecoBuilder() {
      @Override
      public EventSinkDecorator<EventSink> build(Context context, String... argv) {
        if (argv.length < 3) {
        	throw new IllegalArgumentException("usage: dataTypeTaggingDeco(<type name>,<cid>,<aip>[,domain] )");
        }
        return new DataTypeTaggingDecorator<EventSink>(null, argv);
      }
    };
  }
  
  public static List<Pair<String, SinkFactory.SinkDecoBuilder>> getDecoratorBuilders() {
    return Arrays.asList(new Pair<String, SinkFactory.SinkDecoBuilder>("dataTypeTaggingDeco", builder()));
  }

}

아래는 위의 데코레이터를 사용해서 설정하는 명령어 샘플이다.

인코딩 변환 데코레이터

EncodingConvertDecorator.java


package com.samsung.sds.beaver.flumeplugin.decorator;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.util.Pair;

public class EncodingConvertDecorator <S extends EventSink> extends EventSinkDecorator<S> {
  static final Logger LOG = LoggerFactory.getLogger(EncodingConvertDecorator.class);
  private String sourceEnc;
  private byte[] sourceEncBytes;
  private String targetEnc;
  private byte[] targetEncBytes;
  
  public EncodingConvertDecorator(S s, String... argv) {
    super(s);
    
    this.sourceEnc = argv[0];
    this.sourceEncBytes = sourceEnc.getBytes();
    this.targetEnc = "UTF-8";
    if( argv.length > 1 ) {
    	this.targetEnc = argv[1];
    }
    this.targetEncBytes = targetEnc.getBytes();
  }
  
  @Override
  public void append(Event e) throws IOException, InterruptedException {
  	e.set("sourceEnc", sourceEncBytes);
    e.set("targetEnc", targetEncBytes);
    e = new EventImpl((new String(e.getBody(),sourceEnc)).getBytes(targetEnc), e.getTimestamp()
    																							, e.getPriority(), e.getNanos(), e.getHost(), e.getAttrs());
    super.append(e);
  }
  
  public static SinkDecoBuilder builder() {
    return new SinkDecoBuilder() {
      @Override
      public EventSinkDecorator<EventSink> build(Context context, String... argv) {
        if (argv.length < 1) {
        	throw new IllegalArgumentException("usage: encCvtDeco(sourceEncoding[, targetEncoding=\"UTF-8\"]))");
        }
        return new EncodingConvertDecorator<EventSink>(null, argv);
      }
    };
  }
  
  public static List<Pair<String, SinkFactory.SinkDecoBuilder>> getDecoratorBuilders() {
    return Arrays.asList(new Pair<String, SinkFactory.SinkDecoBuilder>("encCvtDeco", builder()));
  }

}


Flume 에이전트 성능 측정 결과

첨부 PDF 참조