Python & Hadoop - пример
Primary tabs
ПРИМЕЧАНИЕ: В пример ниже хадуп установлен в папку: /usr/local/hadoop
Итак - можно использовать разные языки программирования - и конечно же java, но тем не менее оказывается интересным попробовать что-то "скриптовое" и "менее зависимое".
Как вы уже угадали - сейчас мы рассмотрим простой пример использвания Python в Hadoop.
Для начала создадим несколько файлов - например в директории:
/home/hduser/python/wordcount/
1) mapper.py
#!/usr/bin/env python import sys # читаем из стандартного входа for line in sys.stdin: # для каждой посткпающейе строки # удаляем пробелы в начале и конце строки line = line.strip() # разбиваем строчку на слова words = line.split() # наращиваем счётчики for word in words: # пишем результат в стандартный поток вывода # - выход мэппера будем входом # редуктора reducer.py # # разделям табом слово и назначаем ему число вхождений 1 print '%s\t%s' % (word, 1)
2) reducer.py
#!/usr/bin/env python from operator import itemgetter import sys current_word = None current_count = 0 word = None # input comes from STDIN for line in sys.stdin: # удаляем проблемы в начале и конце строчки line = line.strip() # разбиваем каждую по символу таба # чтобы получить ключ и значение (число вхождений) word, count = line.split('\t', 1) # пытаемся перевести строку в число (число вхождений) try: count = int(count) except ValueError: # если перевести не получилось # то просто игнорируем эту строку и continue # продолжаем выполнение # Следующий блок отработает только потому, # что хадуп сначала отсортирует значения по ключу # а только потом пошлёт их нашему редуктору if current_word == word: current_count += count else: if current_word: # записывает результат в стандартный поток вывода # опять же разделяя значения табом. print '%s\t%s' % (current_word, current_count) current_count = count current_word = word # не забудем напечатать и последнее слово (если оно есть) if current_word == word: print '%s\t%s' % (current_word, current_count)
Сделаем оба эти файла исполняемыми.
И протестируем их:
echo "foo foo quux labs foo bar quux" | /home/hduser/python/wordcount/mapper.py
мы должны получить ответ в виде:
foo 1 foo 1 quux 1 labs 1 foo 1 bar 1 quux 1
Теперь выполним такую команду (одна строка):
echo "foo foo quux labs foo bar quux" | /home/hduser/python/wordcount/mapper.py | sort -k1,1 | /home/hduser/python/wordcount/reducer.py
В примере выше мы перенаправляем потоки ввода-вывода - и после выхода мэппера сортируем строки. а затем уже отправляем их редуктору.
Получим ответ:
bar 1 foo 3 labs 1 quux 2
Можно протестировать код и на каком-нибудь файле - выполнив командной строке следующую команду с указанием пути к файлу (я буду использовать те файлы, что остались у меня от примера на яве) :
cat /home/hduser/javacode/wordcount/input/text1 | /home/hduser/python/wordcount/mapper.py
вывод тот же -список слов, рядом с каждым - единичка.
Базовое тестирование нашего примеры мы завершили =)
Запуск Питона под управлением Hadoop
Прежде чем запускаться следует добавить файлы в HDFS - ну и конечно - вообще установить хадуп.
Теперь мы попробуем запустить наш код используя уже имеющийся в релизе хадупа 2.2.0 jar файл (далее одна строка) :
bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -file /home/hduser/python/wordcount/mapper.py -mapper "python /home/hduser/python/wordcount/mapper.py" -file /home/hduser/python/wordcount/reducer.py -reducer "python /home/hduser/python/wordcount/reducer.py" -input /user/hduser/myinput/* -output /user/hduser/pyoutput
ну и заберём результат в локальную папку:
hadoop fs -copyToLocal /user/hduser/pyoutput /home/hduser/python/wordcount/out
Полюбуемся результатами и усовершенствуем код мэппера и редуктора таким образом.
Источники:
- Log in to post comments
- 10357 reads