Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 0 additions & 58 deletions dev_ar/dags/functions/dag_etl_utn_untref.py

This file was deleted.

22 changes: 0 additions & 22 deletions dev_ar/dags/scripts/uni_tres_de_febrero.sql

This file was deleted.

22 changes: 0 additions & 22 deletions dev_ar/dags/scripts/uni_utn.sql

This file was deleted.

1 change: 0 additions & 1 deletion dev_bjm/dags/dag_universidades_p.py

This file was deleted.

Empty file removed dev_bjm/dags/scripts/file.txt
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
101 changes: 101 additions & 0 deletions dev_jb/big_data/testing/mapper_rf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import xml.etree.ElementTree as ET
import datetime

def mapper():
"""Function used to map the 3 required tasks:
- 1 - Top 10 posts views
- 2 - Top 10 words in tags
- 3 - Score and answer time
Returns
-------
post_views : list
List of dicts. Each dict has key 'Id' and 'ViewCount'.
mapped_tags : list
List of dicts. Each dict has as key and tag and a value 1.
score_answertime : list
List of dicts. Each dict has key 'Score' and 'ResponseTime' in hours.
"""

def get_answer_dict(root):
"""Function to get the answers ids with the creation dates
Parameters
----------
root : object
Element Tree root object
Returns
-------
answer_dict : dict
Dictionary that has as key the answer_id and as value the creation_date
"""

# Initialize variable
answer_dict = {}

# Loop into each row to get the answer_id and creation_date
for child in root:
dict = child.attrib

# PostTypeId == 2 means that it is an answer. PostTypeId == 1 means is a question.
if dict["PostTypeId"] == "2":
answer_dict[dict["Id"]] = dict["CreationDate"]

return answer_dict

# ./112010 Meta Stack Overflow/posts.xml
# Load and parse the posts.xml file
tree = ET.parse(r"D:\Alkemy\dev_jb\big_data\112010 Stack Overflow\posts.xml")
# Get the root of the xml
root = tree.getroot()

# Initialize variables
post_views = []
mapped_tags = []
score_answertime = []

# Get the answer dict. key=answer_id, value=CreationDate
answer_dict = get_answer_dict(root)

# Loop into each row element
for child in root:
# Get the attributes of each row element
dict = child.attrib

# 1 - Top 10 posts views
# Append to the list the post_id and the view_count of each post
post_views.append({"Id": dict["Id"], "ViewCount": int(dict["ViewCount"])})

# 2 - Top 10 words in tags
# If the post has a tag replace the <> and split to get the different words.
try:
tags = dict["Tags"].replace("<", " ").replace(">", " ").strip().split()
# Map each individual tag
for tag in tags:
mapped_tags.append({tag: 1})
except:
# If the post hasn't a tag then continue
continue

# 3 - Score and answer time
# If the post is a question
if dict["PostTypeId"] == "1":
# Get question score and creation_time
post_score = int(dict["Score"])
post_creation_time = datetime.datetime.fromisoformat(dict["CreationDate"])
try:
# Some posts haven't an accepted answer, so they will be skipped as they do not have an AcceptedAnswerId

# Get the accepted_answer_id
accepted_answer_id = dict["AcceptedAnswerId"]

# With the accepted_answer_id go to the answer_dict and take the creation_date value and transform it to datetime
accepted_answer_time = datetime.datetime.fromisoformat(answer_dict[accepted_answer_id])

# Calculate response time from question creation to accepted answer creation (in hours)
response_time = round((accepted_answer_time - post_creation_time).seconds / 3600, 2)

# Append the score and response time to a list of dicts
score_answertime.append({"Score": post_score, "ResponseTime": response_time})
except:
continue

return post_views, mapped_tags, score_answertime
84 changes: 84 additions & 0 deletions dev_jb/big_data/testing/mapreduce_testing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Modulos #
import pytest
from mapper_rf import mapper
from reduce_rf import reducer
"""
Testing mapreduce de archivo .xml realizado por Rodrigo Fuentes
"""
# Se genera las varaibles a testear
post_views, mapped_tags, score_answertime = mapper()
top10_post_views, tags_reduced, average_answer_time = reducer(post_views, mapped_tags, score_answertime)

"""
Verifica la cantiadad de registros mapeados por la funcion mapper para el top 10 post mas vistos

Resultado: passed
"""
def test_mapreduce_post_views(
):
assert len(post_views) == 3675849

"""
Verifica la cantiadad de registros mapeados por la funcion mapper para el top 10 palabras mas nombradas en los post por tag

Resultado: passed
"""
def test_mapreduce_mapped_tags(
):
assert len(mapped_tags) == 3044145

"""
Verifica la cantiadad de registros mapeados por la funcion mapper para el tiempo de respuestas con score entre 200 y 300 puntos

Resultado: passed
"""
def test_mapreduce_score_answertime(
):
assert len(score_answertime) == 652089

"""
Verifica si el output del reducer coincide con lo esperado para el top 10 post mas vistos

Resultado: passed
"""
def test_mapreduce_top10_post_view(
):
assert top10_post_views == [{'Id': '4', 'ViewCount': 5534},
{'Id': '6', 'ViewCount': 1175},
{'Id': '7', 'ViewCount': 0},
{'Id': '8', 'ViewCount': 736},
{'Id': '9', 'ViewCount': 24779},
{'Id': '11', 'ViewCount': 11628},
{'Id': '12', 'ViewCount': 0},
{'Id': '13', 'ViewCount': 8601},
{'Id': '14', 'ViewCount': 9906},
{'Id': '16', 'ViewCount': 9390}]


"""
Verifica si el output del reducer coincide con lo esperado para el top 10 palabras mas nombradas en los post por tag

Resultado: passed
"""
def test_mapreduce_tags(
):
assert tags_reduced == [('c#', 118980),
('java', 75037),
('php', 65739),
('javascript', 57210),
('.net', 56348),
('iphone', 54736),
('asp.net', 52655),
('jquery', 47927),
('c++', 45909),
('python', 38083)]

"""
Verifica si el output del reducer coincide con lo esperado para el tiempo de respuestas con score entre 200 y 300 puntos

Resultado: passed
"""
def test_mapreduce_average_answer_time(
):
assert average_answer_time == 4.64395

51 changes: 51 additions & 0 deletions dev_jb/big_data/testing/reduce_rf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import collections
from statistics import mean

def reducer(post_views, mapped_tags, score_answertime):
"""Function to reduce the results that come from shuffler
Parameters
-------
post_views : list
List of dicts. Each dict has key 'Id' and 'ViewCount'. It is sorted
mapped_tags : list
List of dicts. Each dict has as key and tag and a value 1. It is sorted
score_answertime : list
List of dicts. Each dict has key 'Score' and 'ResponseTime' in hours. It is sorted
Returns
-------
top10_post_views : list
List of dicts. Only the Top 10
mapped_tags : list
List of dicts. Only the Top 10
score_answertime : float
Result of the average time to get an accepted answer of the top 200 questions by highest score (hour)
"""

# Get the 10 most viewed posts
top10_post_views = post_views[0:10]

# Use a Counter to reduce the tags
counter = collections.Counter()
for d in mapped_tags:
counter.update(d)

# Transform the counter object to a dictionary
tags_reduced = {}
for key, value in counter.items():
tags_reduced[key] = value

# Sort the reduced tags and get the top 10
tags_reduced = sorted(tags_reduced.items(), key=lambda x: x[1], reverse=True)[0:10]

# Get the top 200 answers by score
score_answertime = score_answertime[0:200]

# Get the top 200 answers response_time
answer_times = []
for dict in score_answertime:
answer_times.append(dict["ResponseTime"])

# Calculate the average response time
average_answer_time = mean(answer_times)

return top10_post_views, tags_reduced, average_answer_time
18 changes: 0 additions & 18 deletions dev_rf/dags/scripts/sql_univ_kennedy.sql

This file was deleted.

18 changes: 0 additions & 18 deletions dev_rf/dags/scripts/sql_univ_latinoamericana.sql

This file was deleted.

Loading