gimmesilver's blog

Agbird.egloos.com

포토로그



In Mapper Combiner 프로그래밍

 MapReduce 프로그램에서는 mapper에서 reducer로 전달되는 데이터 양을 줄이는 것이 매우 중요한 성능 최적화 요소이다. 그리고 이를 위해 하둡에서는 Combiner라는 인터페이스를 제공한다. Combiner는 쉽게 말해 local reducer 이다. 즉, 각 mapper 마다 자신이 처리한 데이터에 대해서 reducer 작업을 수행한 후 그 결과만 reducer 에 넘김으로써 전달할 양을 줄이는 것이다. 가령 reducer에서 처리할 일이 합산(sum) 이라면 mapper마다 자신이 처리한 데이터에 대해서 먼저 key 별 합산을 한 결과만 넘겨 주는 것이다.

 그렇다고 combiner를 아무 때나 쓸 수 있는 것은 아니고 sum, min, max 같은 추이적인(transitive) 작업에서는 이 combiner를 쓸 수 있지만 join 같은 연산에서는 사용할 수 없다. 심지어 counting 의 경우에도 combiner에서 couting을 하게 되면 reducer는 sum 작업을 해야 한다.

 어쨌든 이런 저런 주의가 필요하긴 하지만 쓸 수만 있다면 combiner는 아주 좋은 최적화 기법이다. 그런데 Data-Intensive Text Processing with MapReduce(http://www.morganclaypool.com/doi/abs/10.2200/S00274ED1V01Y201006HLT007?journalCode=hlt) 라는 책을 보면 In-Mapper Combiner 라는 기법이 나온다. 간단히 설명하자면 mapper에서 키별로 reducing 결과를 계속 메모리에 누적해서 가지고 있다가 mapper가 종료되는 순간(혹은 버퍼가 꽉차면) 계산한 결과를 한꺼번에 결과로 쓰는 방법이다. 코드로 설명하자면 아래와 같다.
class Mapper {
buffer

init() {
buffer = HashMap.new
}

map(key, data) {
elements = process(data)
for each element {
....
check_and_put(buffer, k2, v2)
}
}

check_and_put(buffer, k2, v2) {
if buffer.full {
for each k2 in buffer.keys {
emit(k2, buffer[k2])
}
}
}

close() {
for each k2 in buffer.keys {
emit(k2, buffer[k2])
}
}
}

 원래 Combiner는 Mapper에서 결과를 로컬 디스크에 정렬해서 쓰면 그 데이터를 읽어서 처리하는 방식인데 이 In-Mapper Combiner를 사용하면 디스크에 쓰고/읽는 작업을 생략할 수 있어서 더 빠르게 combining 을 수행할 수 있다. 이 기법을 주장한 저자의 벤치 마크 결과는 아래와 같다. 


 위 그래프에서 IMC 라고 되어 있는 게 In-Mapper Combiner 를 사용한 결과이다. 

 내가 이 내용을 접했을 때 떠오른 생각은 이 기법을 Cascading에 적용해 보자는 것이었다. Cascading은 하둡에서 복잡한 MapReduce 작업을 할 때 사용하는 라이브러리이다. mapper와 reducer 라는 개념 대신에 pipe 라는 것을 사용해서 여러 개의 map/reduce 작업을 pipe로 연결해서 하나의 프로그램으로 표현하며 데이터도 key/value 쌍이 아닌 좀 더 복잡한 스키마의 데이터를 처리할 수 있도록 tuple 단위로 관리한다. 때문에 여러 단계의 복잡한 작업을 프로그래밍할 때 편리하다.

 그런데 Cascading이 가진 큰 단점 중에 하나가 Combiner를 지원하지 않는다는 것이다. 메일링 리스트에서 밝힌 cascading 개발자의 설명에 의하면 기존의 map/reduce 모델을 변형하다보니 combiner를 명세하기가 까다로워졌기 때문이다. 어쨌든 지금까지 이점이 다소 아쉬웠는데 in mapper combiner 기법을 이용하면 combiner 역할을 하는 general map function 을 만들 수 있다는 생각이 들었고 그래서 Aggregator(Cascading에서 사용하는 reducer용 interface) 를 인자로 받아 combining을 해주는 Function(Cascading에서 사용하는 mapper용 interface) 을 하나 만들었다.

 기존에 만들어 놓은 Cascading job 중 적절한 것 몇 개에 적용해서 테스트를 해보니 확실히 네트워크 I/O 도 수십 분의 1로 줄었고 속도도 몇 배 빨라졌다. 그래서 '야 좋다~ 테스크 코드 추가해서 contribution 이나 할까'  뭐 이러고 있는데 며칠 전 Cascading 홈페이지에 공지(http://www.cascading.org/2010/12/cascading-12-now-available.html)가 하나 떴다.

 여기서 'Composable map-side partial aggregations (AggregateBy)' 라는 부분이 눈에 띄는데 내용인 즉슨 in mapper combiner 가 기능에 추가되었다는 뜻이다. 

제길슨...

덧글

  • 처로 2010/12/06 11:28 # 삭제 답글

    Hive에도 map-side aggregation이 있어서 왠만한 transitive operation을 깔끔하게 처리할 수 있어요. 다만 key들의 분포에 따라 mapper가 필요한 메모리 양이 달라서 가끔 memory 부족 에러를 내긴 하는데.. (물론 runtime option으로 조정 가능)

    확실히 MR이 hadoop에 의해 대중화되어 여기저기 쓰이면서 구글 논문에 나온 전형적인 map/reduce task가 아닌 다양한 task 패턴들이 요구되었고 요즘은 그 패턴들이 구현되어서 쓸만한 시점이 되가는 것 같아요. 패턴에 따라 아예 분산처리 framework를 새로 짜는 거도 방법이 되겠으나, 비용대비 효과를 생각해봐야 할 정도로 hadoop 기반 프로젝트들이 많이 생기는 요즘이라 이래저래 고민이 되지 않을 수 없는 시기임 ㅠ
댓글 입력 영역