Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added dev_ar/bigdata/hadoop/hadoop-streaming-2.7.1.jar
Binary file not shown.
42 changes: 42 additions & 0 deletions dev_ar/bigdata/hadoop/hadoop_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/python
# -*-coding:utf-8 -*

import sys
import xml.etree.ElementTree as ET
from datetime import datetime

def _mapper_task_3(item):
"""
Aux function that returns single tuple with id and timedelta using CreationDate and LastActivityDate
For using in reduce optimized method
Args:
item (dict from root object): dictionary get from getroot method of and xlm file
Returns:
tuple: tuple with Id and Timedelta set in days
"""
if item.get('PostTypeId') == '1':
aux_delta = datetime.fromisoformat(item.get('LastActivityDate')) - datetime.fromisoformat(item.get('CreationDate'))
aux_tuple = (item.get('Id'), aux_delta.days)
return aux_tuple

def _mapper_task_2(item):
"""
Aux Function that returns a sinle tuple with answers and score
Args:
item (dict): dictionary get from getroot method of and xlm file

Returns:
tuple: tuple with answers and scores
"""
answers = 0 if item.get('AnswerCount') is None else int(item.get('AnswerCount', 0))
scores = int(item.get('Score', 0))
return (answers, scores)

# Parse the XML document streamed on stdin and emit one
# "answers score" pair per post for the reducer (task 2).
root = ET.parse(sys.stdin).getroot()

for element in root:
    pair = _mapper_task_2(element)
    if pair is not None:
        print('{} {}'.format(pair[0], pair[1]))
24 changes: 24 additions & 0 deletions dev_ar/bigdata/hadoop/hadoop_reducer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/python
# -*-coding:utf-8 -*

import sys
import re
# from operator import itemgetter

# Sum the "answers score" pairs emitted by the mapper and print the
# overall answers-to-score ratio.
num = 0
den = 0
for key_value in sys.stdin:
    # Split on whitespace instead of the digits-only regex: the regex
    # dropped the sign of negative scores and raised IndexError on
    # blank or malformed lines.
    parts = key_value.split()
    if len(parts) < 2:
        continue
    num += int(parts[0])
    den += int(parts[1])

# Guard against empty input / zero denominator instead of crashing
# with ZeroDivisionError.
if den != 0:
    print(float(num) / float(den))
37 changes: 37 additions & 0 deletions dev_ar/bigdata/hadoop/paso_a_paso.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
***************
Paso a paso Hadoop

[Bajar y correr la imagen de Hadoop en Docker]
docker pull sequenceiq/hadoop-docker:2.7.1
docker run -it sequenceiq/hadoop-docker:2.7.1 /etc/bootstrap.sh -bash

[Desde el bash dentro del container de docker de hadoop, crear carpeta]
mkdir extras
cd extras/
docker ps [para conocer el CONTAINER ID]

[Desde otra terminal copiar los archivos]
docker cp hadoop_mapper.py [CONTAINER ID]:extras/hadoop_mapper.py
docker cp hadoop_reducer.py [CONTAINER ID]:extras/hadoop_reducer.py
docker cp posts.xml [CONTAINER ID]:extras/posts.xml
docker cp hadoop-streaming-2.7.1.jar [CONTAINER ID]:extras/hadoop-streaming-2.7.1.jar
[Se puede descargar desde https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-streaming/2.7.1/]

[Desde la terminal del container docker de hadoop, copiar los archivos al sistema de archivos distribuido]
/usr/local/hadoop/bin/hadoop fs -mkdir -p /extras/in
/usr/local/hadoop/bin/hdfs dfs -put ./* /extras/in

[Cambiar los permisos]
chmod a+x hadoop_mapper.py hadoop_reducer.py

[Por último probar distintas posibilidades con los archivos.py]
/usr/local/hadoop/bin/hadoop jar hadoop-streaming-2.7.1.jar -mapper hadoop_mapper.py -reducer hadoop_reducer.py -input /extras/in/posts.xml -output extras/in/output_001

/usr/local/hadoop/bin/hadoop jar hadoop-streaming-2.7.1.jar -mapper extras/in/hadoop_mapper.py -reducer extras/in/hadoop_reducer.py -input /extras/in/posts.xml -output extras/in/output_002

/usr/local/hadoop/bin/hadoop jar hadoop-streaming-2.7.1.jar -mapper 'python hadoop_mapper.py' -reducer 'python hadoop_reducer.py' -input /extras/in/posts.xml -output extras/in/output_003

/usr/local/hadoop/bin/hadoop jar hadoop-streaming-2.7.1.jar -files hadoop_mapper.py,hadoop_reducer.py -mapper 'python hadoop_mapper.py' -reducer 'python hadoop_reducer.py' -input /extras/in/posts.xml -output extras/in/output_004



56,977 changes: 56,977 additions & 0 deletions dev_ar/bigdata/hadoop/posts.xml

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions dev_ar/bigdata/hadoop_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/python3
# -*-coding:utf-8 -*

import sys
import xml.etree.ElementTree as ET
from datetime import datetime

def _mapper_task_3(item):
"""
Aux function that returns single tuple with id and timedelta using CreationDate and LastActivityDate
For using in reduce optimized method
Args:
item (dict from root object): dictionary get from getroot method of and xlm file
Returns:
tuple: tuple with Id and Timedelta set in days
"""
if item.get('PostTypeId') == '1':
aux_delta = datetime.fromisoformat(item.get('LastActivityDate')) - datetime.fromisoformat(item.get('CreationDate'))
aux_tuple = (item.get('Id'), aux_delta.days)
return aux_tuple

# Read the XML document from stdin and print "<Id> <days>" for every
# question post (task 3); non-question nodes map to None and are skipped.
document = ET.parse(sys.stdin)

for element in document.getroot():
    mapped = _mapper_task_3(element)
    if mapped is not None:
        print('{} {}'.format(mapped[0], mapped[1]))
14 changes: 14 additions & 0 deletions dev_ar/bigdata/hadoop_reducer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/python3
# -*-coding:utf-8 -*

import sys
import re
from operator import itemgetter

# Collect the "<Id> <days>" pairs from the mapper and print the ten
# questions that stayed active the longest.
aux_list = []
for key_value in sys.stdin:
    parts = key_value.split()
    # Skip blank/malformed lines instead of raising IndexError later.
    if len(parts) < 2:
        continue
    aux_list.append(parts)

# Sort numerically: the original itemgetter(1) key compared the day
# counts as strings ('9' > '100'), producing a wrong top ten.
aux_list.sort(key=lambda pair: int(pair[1]), reverse=True)
print(aux_list[0:10])
4 changes: 2 additions & 2 deletions dev_ar/bigdata/mapreduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ def _mapper_task_3(item):
tuple: tuple with Id and Timedelta set in days
"""
if item.get('PostTypeId') == '1':
aux_delta = datetime.fromisoformat(item['LastActivityDate']) - datetime.fromisoformat(item['CreationDate'])
aux_tuple = (item['Id'], aux_delta.days)
aux_delta = datetime.fromisoformat(item.get('LastActivityDate')) - datetime.fromisoformat(item.get('CreationDate'))
aux_tuple = (item.get('Id'), aux_delta.days)
return aux_tuple

def reducer_task_3(
Expand Down
117 changes: 117 additions & 0 deletions dev_ar/testing/testing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Script for testing Walter MapReduce Functions from B Group
# Tickets
# Top 10 Tag without accepted answers
# Word count vs views ratio
# Average Score vs most favourites ratio

# Modules
import pytest
import wl_map_reduce
import pandas as pd

# Testing Class for later Inheritance
class Test_MapReduce:
    """
    Test class for the mapreduce functions in wl_map_reduce.py.

    Uses pytest parametrize decorators to loop the checks over the
    results of mapper, shuffle_sort and reduce, focusing on result
    types, lengths and one fully-checked final result.
    """
    # Path of the posts XML dump used as the common input.
    xml_file_path = '../bigdata/raw_data/posts.xml'
    # NOTE(review): these three calls run at class-definition
    # (collection) time and parse the whole XML file; consider pytest
    # fixtures if the startup cost becomes a problem.
    # Mapper function call and definition
    mapper_result = wl_map_reduce.mapper(xml_file_path)
    # Shuffle function call and definition
    shuffle_sort_result = wl_map_reduce.shuffle_sort(mapper_result)
    # Reduce function call and definition
    reduce_result = wl_map_reduce.reduce(shuffle_sort_result)

    # Parametrize results and expected element types for the three
    # mapreduce stages.
    @pytest.mark.parametrize(
        'result, idx, types', [
            # Tests for wl_map_reduce.mapper function results
            (mapper_result, 0, list),
            (mapper_result, 1, list),
            (mapper_result, 2, list),
            # Tests for wl_map_reduce.shuffle_sort function results
            (shuffle_sort_result, 0, list),
            (shuffle_sort_result, 1, list),
            (shuffle_sort_result, 2, list),
            # Tests for wl_map_reduce.reduce function results
            (reduce_result, 0, list),
            (reduce_result, 1, list),
            (reduce_result, 2, pd.Series)
        ]
    )
    def test_general_types(self, result, idx, types):
        """
        Test the type of each element of every mapreduce result.

        Renamed from ``testing_general_types``: pytest's default
        ``python_functions = 'test_*'`` pattern does not match the old
        name, so the test was silently never collected.

        Args:
            result: function call result under test
            idx (int): index into the list returned by the function
            types (type): expected type of result[idx]
        """
        assert isinstance(result[idx], types)

    # Parametrize the top-level results of all three mapreduce
    # functions. Each should be a list (of lists).
    @pytest.mark.parametrize(
        'result', [
            mapper_result, shuffle_sort_result, reduce_result
        ]
    )
    def test_types(self, result):
        """
        Test that each mapreduce function returns a list of lists.

        Args:
            result (list): list of lists with mapreduce results
        """
        assert isinstance(result, list)

    # Parametrize length results; every stage should return exactly
    # three items (one per ticket).
    @pytest.mark.parametrize(
        'result, length', [
            (mapper_result, 3),
            (shuffle_sort_result, 3),
            (reduce_result, 3)
        ]
    )
    def test_general_length(self, result, length):
        """
        Test the length of each mapreduce function's result list.

        Renamed from ``testing_general_length`` so pytest collects it
        (the old name did not match the ``test_*`` pattern).

        Args:
            result (list): each mapreduce function call result
            length (int): expected length, 3 in all cases
        """
        assert len(result) == length

    # Parametrize one fully-checked final result.
    # Index 0 is the first ticket's result in the list of lists, as the
    # function was set up. Results for tickets 1 or 2 can be added as
    # extra "(idx, [RESULT])" tuples below.
    @pytest.mark.parametrize(
        'idx2, results', [
            (0, [('discussion', 2916), ('feature-request', 2815), ('bug', 1396), ('support', 1261), ('stackoverflow', 848), ('status-completed', 647), ('tags', 524), ('reputation', 427), ('area51', 372), ('questions', 354)])
        ]
    )
    def test_general_results(self, idx2, results):
        """
        Test the final result of one of the tickets against a known
        value (other results are too long to check here).

        Renamed from ``testing_general_results`` so pytest collects it.

        Args:
            idx2 (int): index of the mapreduce function list result
            results (list of tuples): expected, pre-checked result
        """
        assert wl_map_reduce.reduce(wl_map_reduce.shuffle_sort(wl_map_reduce.mapper(self.xml_file_path)))[idx2] == results
Loading