스파크는 잘 짜면 빠르고 못 짜면 느리다. 맥주가 미지근하면 얼음을 넣으세연~
데이터는 이렇다.
(u, [s, f])
그런데 s는 set으로 합칠 것이고 f는 list로 합칠 것이다. 게다가 f는 이미 리스트이다. 예를 들어 내가 돌아다니며 한 짓을 다 때려넣었다고 치자.
(신진철, [영통, [술, 도박, 게임]])
자, 이제 원하는 결론은 이렇다.
(신진철, [set(['영통', '판교']), [술, 당구, 스타크, 술, 당구, 스타크, 술, 술, 술])
나중에 애그리게이션 퉁치면 이런 결론을 얻는다. 신진철은 영통과 판교를 찍고 다니고 술, 당구, 스타크 순으로 시간을 소모한다.
combineByKey를 쓰면 이렇다. 영어 매뉴얼이나 스칼라 예제를 보면 잘 안 와닿는다.
combineByKey(
원하는 결과의 스키마 초기화,
키를 처음 만났을 때 초기화 데이터에 더하기,
키끼리 모을때 밸류를 어떻게 애그리게이션 할래)
그러면 함수는 이렇게 된다.
...
return (uid, [seg, fs]) # 매퍼의 리턴형이다.
def d1(a):
return [set([a[0]]), a[1]]
def d2(a, b):
a[0].add(b[0]) # value [seg, fs]가 들어오니까 add를 쓴다.
a[1].extend(b[1])
return a
def d3(a, b):
a[0] |= b[0] # [set(seg), list(fs)] 가 들어오니까 |를 쓴다.
a[1].extend(b[1])
return a
if 1:
data_src0 = sc.textFile('lal2_multi_train')\
.map(map_f1).combineByKey(d1, d2, d3)
...
하.. 이제 이해가 좀 가네..