Python & Hadoop - пример
Primary tabs
ПРИМЕЧАНИЕ: В пример ниже хадуп установлен в папку: /usr/local/hadoop
Итак - можно использовать разные языки программирования - и конечно же java, но тем не менее оказывается интересным попробовать что-то "скриптовое" и "менее зависимое".
Как вы уже угадали - сейчас мы рассмотрим простой пример использвания Python в Hadoop.
Для начала создадим несколько файлов - например в директории:
/home/hduser/python/wordcount/
1) mapper.py
#!/usr/bin/env python
import sys
# читаем из стандартного входа
for line in sys.stdin: # для каждой посткпающейе строки
# удаляем пробелы в начале и конце строки
line = line.strip()
# разбиваем строчку на слова
words = line.split()
# наращиваем счётчики
for word in words:
# пишем результат в стандартный поток вывода
# - выход мэппера будем входом
# редуктора reducer.py
#
# разделям табом слово и назначаем ему число вхождений 1
print '%s\t%s' % (word, 1)
2) reducer.py
#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
# удаляем проблемы в начале и конце строчки
line = line.strip()
# разбиваем каждую по символу таба
# чтобы получить ключ и значение (число вхождений)
word, count = line.split('\t', 1)
# пытаемся перевести строку в число (число вхождений)
try:
count = int(count)
except ValueError:
# если перевести не получилось
# то просто игнорируем эту строку и
continue # продолжаем выполнение
# Следующий блок отработает только потому,
# что хадуп сначала отсортирует значения по ключу
# а только потом пошлёт их нашему редуктору
if current_word == word:
current_count += count
else:
if current_word:
# записывает результат в стандартный поток вывода
# опять же разделяя значения табом.
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# не забудем напечатать и последнее слово (если оно есть)
if current_word == word:
print '%s\t%s' % (current_word, current_count)
Сделаем оба эти файла исполняемыми.
И протестируем их:
echo "foo foo quux labs foo bar quux" | /home/hduser/python/wordcount/mapper.py
мы должны получить ответ в виде:
foo 1 foo 1 quux 1 labs 1 foo 1 bar 1 quux 1
Теперь выполним такую команду (одна строка):
echo "foo foo quux labs foo bar quux" | /home/hduser/python/wordcount/mapper.py | sort -k1,1 | /home/hduser/python/wordcount/reducer.py
В примере выше мы перенаправляем потоки ввода-вывода - и после выхода мэппера сортируем строки. а затем уже отправляем их редуктору.
Получим ответ:
bar 1 foo 3 labs 1 quux 2
Можно протестировать код и на каком-нибудь файле - выполнив командной строке следующую команду с указанием пути к файлу (я буду использовать те файлы, что остались у меня от примера на яве) :
cat /home/hduser/javacode/wordcount/input/text1 | /home/hduser/python/wordcount/mapper.py
вывод тот же -список слов, рядом с каждым - единичка.
Базовое тестирование нашего примеры мы завершили =)
Запуск Питона под управлением Hadoop
Прежде чем запускаться следует добавить файлы в HDFS - ну и конечно - вообще установить хадуп.
Теперь мы попробуем запустить наш код используя уже имеющийся в релизе хадупа 2.2.0 jar файл (далее одна строка) :
bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -file /home/hduser/python/wordcount/mapper.py -mapper "python /home/hduser/python/wordcount/mapper.py" -file /home/hduser/python/wordcount/reducer.py -reducer "python /home/hduser/python/wordcount/reducer.py" -input /user/hduser/myinput/* -output /user/hduser/pyoutput
ну и заберём результат в локальную папку:
hadoop fs -copyToLocal /user/hduser/pyoutput /home/hduser/python/wordcount/out
Полюбуемся результатами и усовершенствуем код мэппера и редуктора таким образом.
Источники:
- Log in to post comments
- 11330 reads