Python & Hadoop - пример

ПРИМЕЧАНИЕ: В пример ниже хадуп установлен в папку: /usr/local/hadoop

Итак - можно использовать разные языки программирования - и конечно же java, но тем не менее оказывается интересным попробовать что-то "скриптовое" и "менее зависимое".

Как вы уже угадали - сейчас мы рассмотрим простой пример использвания Python в Hadoop.

Для начала создадим несколько файлов - например в директории:

/home/hduser/python/wordcount/

1) mapper.py

#!/usr/bin/env python

import sys

# читаем из стандартного входа
for line in sys.stdin: # для каждой посткпающейе строки
    # удаляем пробелы в начале и конце строки
    line = line.strip()
    # разбиваем строчку на слова
    words = line.split()
    # наращиваем счётчики
    for word in words:
        #  пишем результат в стандартный поток вывода
        # - выход мэппера будем входом
        #   редуктора reducer.py 
        #
        # разделям табом слово и назначаем ему число вхождений 1
        print '%s\t%s' % (word, 1)

2) reducer.py

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # удаляем проблемы в начале и конце строчки
    line = line.strip()

    # разбиваем каждую по символу таба
    # чтобы получить ключ и значение (число вхождений)
    word, count = line.split('\t', 1)

    # пытаемся перевести строку в число (число вхождений)
    try:
        count = int(count)
    except ValueError:
        # если перевести не получилось
        # то просто игнорируем эту строку и
        continue # продолжаем выполнение

# Следующий блок отработает только потому, 
# что хадуп  сначала отсортирует значения по ключу
#  а только потом пошлёт их нашему редуктору 
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # записывает результат в стандартный поток вывода
            # опять же разделяя значения табом.
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# не забудем напечатать и последнее слово (если оно есть)
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

Сделаем оба эти файла исполняемыми.

И протестируем их:

echo "foo foo quux labs foo bar quux" | /home/hduser/python/wordcount/mapper.py

мы должны получить ответ в виде:

foo	1
foo	1
quux	1
labs	1
foo	1
bar	1
quux	1

Теперь выполним такую команду (одна строка):

echo "foo foo quux labs foo bar quux" | /home/hduser/python/wordcount/mapper.py | sort -k1,1 | /home/hduser/python/wordcount/reducer.py

В примере выше мы перенаправляем потоки ввода-вывода - и после выхода мэппера сортируем строки. а затем уже отправляем их редуктору.
Получим ответ:

bar	1
foo	3
labs	1
quux	2

Можно протестировать код и на каком-нибудь файле - выполнив командной строке следующую команду с указанием пути к файлу (я буду использовать те файлы, что остались у меня от примера на яве) :

cat /home/hduser/javacode/wordcount/input/text1 |  /home/hduser/python/wordcount/mapper.py

вывод тот же -список слов, рядом с каждым - единичка.
Базовое тестирование нашего примеры мы завершили =)

Запуск Питона под управлением Hadoop

Прежде чем запускаться следует добавить файлы в HDFS - ну и конечно - вообще установить хадуп.

Теперь мы попробуем запустить наш код используя уже имеющийся в релизе хадупа 2.2.0 jar файл (далее одна строка) :

bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -file /home/hduser/python/wordcount/mapper.py -mapper "python /home/hduser/python/wordcount/mapper.py" -file /home/hduser/python/wordcount/reducer.py -reducer "python /home/hduser/python/wordcount/reducer.py" -input /user/hduser/myinput/* -output /user/hduser/pyoutput

ну и заберём результат в локальную папку:

hadoop fs -copyToLocal /user/hduser/pyoutput  /home/hduser/python/wordcount/out

Полюбуемся результатами и усовершенствуем код мэппера и редуктора таким образом.

Источники:

Key Words for FKN + antitotal forum (CS VSU):