diff --git a/dev_rf/big_data/logs/logs.log b/dev_rf/big_data/logs/logs.log deleted file mode 100644 index de170e5..0000000 --- a/dev_rf/big_data/logs/logs.log +++ /dev/null @@ -1,5 +0,0 @@ -"08/10/2022" - INFO - root - Starting the main module... -"08/10/2022" - INFO - root - Starting the mapper module... -"08/10/2022" - INFO - root - Starting the shuffler module... -"08/10/2022" - INFO - root - Starting the reducer module... -"08/10/2022" - INFO - root - Took 1.3 seconds to finish the tasks diff --git a/dev_rf/big_data/big_data_logging.py b/dev_rf/big_data/with_chunks/big_data_logging.py similarity index 100% rename from dev_rf/big_data/big_data_logging.py rename to dev_rf/big_data/with_chunks/big_data_logging.py diff --git a/dev_rf/big_data/with_chunks/chunks.py b/dev_rf/big_data/with_chunks/chunks.py new file mode 100644 index 0000000..59c516b --- /dev/null +++ b/dev_rf/big_data/with_chunks/chunks.py @@ -0,0 +1,50 @@ +import xml.etree.ElementTree as ET +from big_data_logging import configured_logger + +logger = configured_logger() + + +def chunks(list, n=10): + """Yield n number of striped chunks from list. + Parameters + ---------- + list : list + List that must be chunked + n : int + Number of chunks + + Returns + ------- + yield : generator + Generator with the different chunks + + """ + + for i in range(0, n): + yield list[i::n] + + +def generate_chunks(): + """Function that creates a generator with the different chunks + + Returns + ------- + generator : generator + returns a generator with the different chunks of the xml file + """ + + logger.info("Starting the chunks module...") + + # Load and parse the posts.xml file + tree = ET.parse("./112010 Meta Stack Overflow/posts.xml") + # Get the root of the xml + root = tree.getroot() + + list_to_chunk = [] + + # Loop into each row element + for child in root: + # Get the attributes of each row element + list_to_chunk.append(child.attrib) + + return chunks(list_to_chunk) diff --git a/dev_rf/big_data/logger.cfg b/dev_rf/big_data/with_chunks/logger.cfg similarity index 100% rename from dev_rf/big_data/logger.cfg rename to dev_rf/big_data/with_chunks/logger.cfg diff --git a/dev_rf/big_data/with_chunks/logs/logs.log b/dev_rf/big_data/with_chunks/logs/logs.log new file mode 100644 index 0000000..b3eb164 --- /dev/null +++ b/dev_rf/big_data/with_chunks/logs/logs.log @@ -0,0 +1,33 @@ +"12/10/2022" - INFO - root - Starting the main module... +"12/10/2022" - INFO - root - Starting the chunks module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Took 11.4 seconds to finish the tasks +"12/10/2022" - INFO - root - Starting the shuffler module... +"12/10/2022" - INFO - root - Starting the reducer module... +"12/10/2022" - INFO - root - Took 11.5 seconds to finish the tasks +"12/10/2022" - INFO - root - Starting the main module... +"12/10/2022" - INFO - root - Starting the chunks module... +"12/10/2022" - INFO - root - Took 0.9 seconds to finish the tasks +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Took 10.9 seconds to finish the tasks +"12/10/2022" - INFO - root - Starting the shuffler module... +"12/10/2022" - INFO - root - Starting the reducer module... +"12/10/2022" - INFO - root - Took 11.0 seconds to finish the tasks diff --git a/dev_rf/big_data/with_chunks/logs/logs.log.2022-10-12_19 b/dev_rf/big_data/with_chunks/logs/logs.log.2022-10-12_19 new file mode 100644 index 0000000..9752eb3 --- /dev/null +++ b/dev_rf/big_data/with_chunks/logs/logs.log.2022-10-12_19 @@ -0,0 +1,15 @@ +"12/10/2022" - INFO - root - Starting the main module... +"12/10/2022" - INFO - root - Starting the chunks module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the shuffler module... +"12/10/2022" - INFO - root - Starting the reducer module... +"12/10/2022" - INFO - root - Took 10.8 seconds to finish the tasks diff --git a/dev_rf/big_data/with_chunks/main.py b/dev_rf/big_data/with_chunks/main.py new file mode 100644 index 0000000..aab70a4 --- /dev/null +++ b/dev_rf/big_data/with_chunks/main.py @@ -0,0 +1,64 @@ +import time + +from chunks import generate_chunks +from mapper import mapper +from shuffler import shuffler +from reducer import reducer +from big_data_logging import configured_logger +from save_into_file import save_into_file + +logger = configured_logger() + + +def main(): + """Main function that calls the mapper, shuffler and reducer functions and then save to csv the results""" + logger.info("Starting the main module...") + + start_time = time.time() + + # Create chunks + chunks = generate_chunks() + + duration = time.time() - start_time + logger.info(f"Took {round(duration,1)} seconds to finish the tasks") + + # Initialize lists for the results + post_views_list = [] + mapped_tags_list = [] + score_answertime_list = [] + + for chunk in chunks: + # Use the mapper function to map the different tasks + post_views, mapped_tags, score_answertime = mapper(chunk) + + # Append result to list of results + post_views_list.append(post_views) + mapped_tags_list.append(mapped_tags) + score_answertime_list.append(score_answertime) + + duration = time.time() - start_time + logger.info(f"Took {round(duration,1)} seconds to finish the tasks") + + # Make the lists of lists only one list + post_views_list = sum(post_views_list, []) + mapped_tags_list = sum(mapped_tags_list, []) + score_answertime_list = sum(score_answertime_list, []) + + # Use the shuffler function to shuffle the mapped tasks + post_views_list, mapped_tags_list, score_answertime_list = shuffler( + post_views_list, mapped_tags_list, score_answertime_list + ) + + # Use the reducer function to reduce the shuffled tasksS + top10_post_views, tags_reduced, average_answer_time = reducer( + post_views_list, mapped_tags_list, score_answertime_list + ) + + save_into_file(top10_post_views, tags_reduced, average_answer_time) + + duration = time.time() - start_time + logger.info(f"Took {round(duration,1)} seconds to finish the tasks") + + +if __name__ == "__main__": + main() diff --git a/dev_rf/big_data/with_chunks/mapper.py b/dev_rf/big_data/with_chunks/mapper.py new file mode 100644 index 0000000..b48ea80 --- /dev/null +++ b/dev_rf/big_data/with_chunks/mapper.py @@ -0,0 +1,107 @@ +import xml.etree.ElementTree as ET +import datetime +from big_data_logging import configured_logger + +logger = configured_logger() + + +def get_answer_dict(): + """Function to get the answers ids with the creation dates + + Parameters + ---------- + root : object + Element Tree root object + + Returns + ------- + answer_dict : dict + Dictionary that has as key the answer_id and as value the creation_date + + """ + # Load and parse the posts.xml file + tree = ET.parse("./112010 Meta Stack Overflow/posts.xml") + # Get the root of the xml + root = tree.getroot() + # Initialize variable + answer_dict = {} + + # Loop into each row to get the answer_id and creation_date + for child in root: + dict = child.attrib + + # PostTypeId == 2 means that it is an answer. PostTypeId == 1 means is a question. + if dict["PostTypeId"] == "2": + answer_dict[dict["Id"]] = dict["CreationDate"] + + return answer_dict + + +def mapper(chunk): + """Function used to map the 3 required tasks: + - 1 - Top 10 posts views + - 2 - Top 10 words in tags + - 3 - Score and answer time + + Returns + ------- + post_views : list + List of dicts. Each dict has key 'Id' and 'ViewCount'. + mapped_tags : list + List of dicts. Each dict has as key and tag and a value 1. + score_answertime : list + List of dicts. Each dict has key 'Score' and 'ResponseTime' in hours. + + """ + logger.info("Starting the mapper module...") + + # Initialize variables + post_views = [] + mapped_tags = [] + score_answertime = [] + + # Get the answer dict. key=answer_id, value=CreationDate + answer_dict = get_answer_dict() + + # Loop into each row element + for dict in chunk: + + # 1 - Top 10 posts views + # Append to the list the post_id and the view_count of each post + post_views.append({"Id": dict["Id"], "ViewCount": int(dict["ViewCount"])}) + + # 2 - Top 10 words in tags + # If the post has a tag replace the <> and split to get the different words. + try: + tags = dict["Tags"].replace("<", " ").replace(">", " ").strip().split() + # Map each individual tag + for tag in tags: + mapped_tags.append({tag: 1}) + except: + # If the post hasn't a tag then continue + continue + + # 3 - Score and answer time + # If the post is a question + if dict["PostTypeId"] == "1": + # Get question score and creation_time + post_score = int(dict["Score"]) + post_creation_time = datetime.datetime.fromisoformat(dict["CreationDate"]) + try: + # Some posts haven't an accepted answer, so they will be skipped as they do not have an AcceptedAnswerId + + # Get the accepted_answer_id + accepted_answer_id = dict["AcceptedAnswerId"] + + # With the accepted_answer_id go to the answer_dict and take the creation_date value and transform it to datetime + accepted_answer_time = datetime.datetime.fromisoformat(answer_dict[accepted_answer_id]) + + # Calculate response time from question creation to accepted answer creation (in hours) + response_time = round((accepted_answer_time - post_creation_time).seconds / 3600, 2) + + # Append the score and response time to a list of dicts + score_answertime.append({"Score": post_score, "ResponseTime": response_time}) + except: + continue + + return post_views, mapped_tags, score_answertime diff --git a/dev_rf/big_data/reducer.py b/dev_rf/big_data/with_chunks/reducer.py similarity index 100% rename from dev_rf/big_data/reducer.py rename to dev_rf/big_data/with_chunks/reducer.py diff --git a/dev_rf/big_data/with_chunks/results/AverageAnswerTime.csv b/dev_rf/big_data/with_chunks/results/AverageAnswerTime.csv new file mode 100644 index 0000000..ea1b963 --- /dev/null +++ b/dev_rf/big_data/with_chunks/results/AverageAnswerTime.csv @@ -0,0 +1,2 @@ +Average time to get an accepted answer (hours) +5.9158 diff --git a/dev_rf/big_data/results/MostUsedTags.csv b/dev_rf/big_data/with_chunks/results/MostUsedTags.csv similarity index 100% rename from dev_rf/big_data/results/MostUsedTags.csv rename to dev_rf/big_data/with_chunks/results/MostUsedTags.csv diff --git a/dev_rf/big_data/results/Top10MostViewedPosts.csv b/dev_rf/big_data/with_chunks/results/Top10MostViewedPosts.csv similarity index 100% rename from dev_rf/big_data/results/Top10MostViewedPosts.csv rename to dev_rf/big_data/with_chunks/results/Top10MostViewedPosts.csv diff --git a/dev_rf/big_data/with_chunks/save_into_file.py b/dev_rf/big_data/with_chunks/save_into_file.py new file mode 100644 index 0000000..1e3e163 --- /dev/null +++ b/dev_rf/big_data/with_chunks/save_into_file.py @@ -0,0 +1,25 @@ +import os +import pandas as pd + + +def save_into_file(top10_post_views, tags_reduced, average_answer_time): + # Create resultss folder if it doesnt exist + cwd = os.getcwd() + + try: + if not os.path.exists(cwd + "/results"): + os.makedirs(cwd + "/results") + except: + print("Folder cannot be created") + + # Save to a csv the Top 10 most viewed posts + top10_post_views_df = pd.DataFrame(top10_post_views) + top10_post_views_df.to_csv("./results/Top10MostViewedPosts.csv", index=False) + + # Save to a csv the Top 10 tags + tags_reduced_df = pd.DataFrame(tags_reduced, columns=["Tag", "Count"]) + tags_reduced_df.to_csv("./results/MostUsedTags.csv", index=False) + + # Save to a csv the Average time to get an accepted answer + average_answer_time_serie = pd.Series(average_answer_time, name="Average time to get an accepted answer (hours)") + average_answer_time_serie.to_csv("./results/AverageAnswerTime.csv", index=False) diff --git a/dev_rf/big_data/shuffler.py b/dev_rf/big_data/with_chunks/shuffler.py similarity index 100% rename from dev_rf/big_data/shuffler.py rename to dev_rf/big_data/with_chunks/shuffler.py diff --git a/dev_rf/big_data/without_chunks/big_data_logging.py b/dev_rf/big_data/without_chunks/big_data_logging.py new file mode 100644 index 0000000..730ed31 --- /dev/null +++ b/dev_rf/big_data/without_chunks/big_data_logging.py @@ -0,0 +1,29 @@ +import logging +from logging import config +import os + + +def configured_logger(): + """This function is going to setup the ogger using the logger.cfg file. + The logger has 2 functions: + 1 - Display the logging messages in the console + 2 - Save the files to a log file every week (Every Sunday)""" + + # Get current working directory + cwd = os.getcwd() + + # Create logs folder if it does not exist + try: + if not os.path.exists(cwd + "/logs"): + os.makedirs(cwd + "/logs") + except: + print("Folder cannot be created") + + # Load the logger.cfg file + # cwd + "big_data/logger.cfg" + config.fileConfig("logger.cfg") + + # Create logger with the configuration + logger = logging.getLogger("root") + + return logger diff --git a/dev_rf/big_data/without_chunks/logger.cfg b/dev_rf/big_data/without_chunks/logger.cfg new file mode 100644 index 0000000..42da32e --- /dev/null +++ b/dev_rf/big_data/without_chunks/logger.cfg @@ -0,0 +1,35 @@ +[loggers] +keys=root + +[handlers] +keys=consoleHandler, fileHandler + +[formatters] +keys=myFormatter + +[logger_root] +level=INFO +handlers=consoleHandler, fileHandler + +# Handler to display in console +[handler_consoleHandler] +class=StreamHandler +level=INFO +formatter=myFormatter +args=(sys.stdout,) + +#Handler to save the log in files +[handler_fileHandler] +class=handlers.TimedRotatingFileHandler +level=INFO +formatter=myFormatter +# Create a new log every Sunday and have 1 log file as backup +when='W6' +interval=1 +backupCount=1 +args=("logs/logs.log",) + +# Formatter +[formatter_myFormatter] +format=%(asctime)s - %(levelname)s - %(name)s - %(message)s +datefmt="%d/%m/%Y" \ No newline at end of file diff --git a/dev_rf/big_data/without_chunks/logs/logs.log b/dev_rf/big_data/without_chunks/logs/logs.log new file mode 100644 index 0000000..6b66577 --- /dev/null +++ b/dev_rf/big_data/without_chunks/logs/logs.log @@ -0,0 +1,5 @@ +"12/10/2022" - INFO - root - Starting the main module... +"12/10/2022" - INFO - root - Starting the mapper module... +"12/10/2022" - INFO - root - Starting the shuffler module... +"12/10/2022" - INFO - root - Starting the reducer module... +"12/10/2022" - INFO - root - Took 1.5 seconds to finish the tasks diff --git a/dev_rf/big_data/main.py b/dev_rf/big_data/without_chunks/main.py similarity index 100% rename from dev_rf/big_data/main.py rename to dev_rf/big_data/without_chunks/main.py diff --git a/dev_rf/big_data/mapper.py b/dev_rf/big_data/without_chunks/mapper.py similarity index 98% rename from dev_rf/big_data/mapper.py rename to dev_rf/big_data/without_chunks/mapper.py index 1012cd8..f49d9e1 100644 --- a/dev_rf/big_data/mapper.py +++ b/dev_rf/big_data/without_chunks/mapper.py @@ -20,8 +20,6 @@ def get_answer_dict(root): """ - logger.info("Starting the mapper module...") - # Initialize variable answer_dict = {} @@ -52,7 +50,8 @@ def mapper(): List of dicts. Each dict has key 'Score' and 'ResponseTime' in hours. """ - # ./112010 Meta Stack Overflow/posts.xml + logger.info("Starting the mapper module...") + # Load and parse the posts.xml file tree = ET.parse("./112010 Meta Stack Overflow/posts.xml") # Get the root of the xml diff --git a/dev_rf/big_data/without_chunks/reducer.py b/dev_rf/big_data/without_chunks/reducer.py new file mode 100644 index 0000000..0117093 --- /dev/null +++ b/dev_rf/big_data/without_chunks/reducer.py @@ -0,0 +1,60 @@ +import collections +from statistics import mean +from big_data_logging import configured_logger + +logger = configured_logger() + + +def reducer(post_views, mapped_tags, score_answertime): + """Function to reduce the results that come from shuffler + + Parameters + ------- + post_views : list + List of dicts. Each dict has key 'Id' and 'ViewCount'. It is sorted + mapped_tags : list + List of dicts. Each dict has as key and tag and a value 1. It is sorted + score_answertime : list + List of dicts. Each dict has key 'Score' and 'ResponseTime' in hours. It is sorted + + Returns + ------- + top10_post_views : list + List of dicts. Only the Top 10 + mapped_tags : list + List of dicts. Only the Top 10 + score_answertime : float + Result of the average time to get an accepted answer of the top 200 questions by highest score (hour) + + """ + + logger.info("Starting the reducer module...") + + # Get the 10 most viewed posts + top10_post_views = post_views[0:10] + + # Use a Counter to reduce the tags + counter = collections.Counter() + for d in mapped_tags: + counter.update(d) + + # Transform the counter object to a dictionary + tags_reduced = {} + for key, value in counter.items(): + tags_reduced[key] = value + + # Sort the reduced tags and get the top 10 + tags_reduced = sorted(tags_reduced.items(), key=lambda x: x[1], reverse=True)[0:10] + + # Get the top 200 answers by score + score_answertime = score_answertime[0:200] + + # Get the top 200 answers response_time + answer_times = [] + for dict in score_answertime: + answer_times.append(dict["ResponseTime"]) + + # Calculate the average response time + average_answer_time = mean(answer_times) + + return top10_post_views, tags_reduced, average_answer_time diff --git a/dev_rf/big_data/results/AverageAnswerTime.csv b/dev_rf/big_data/without_chunks/results/AverageAnswerTime.csv similarity index 100% rename from dev_rf/big_data/results/AverageAnswerTime.csv rename to dev_rf/big_data/without_chunks/results/AverageAnswerTime.csv diff --git a/dev_rf/big_data/without_chunks/results/MostUsedTags.csv b/dev_rf/big_data/without_chunks/results/MostUsedTags.csv new file mode 100644 index 0000000..206fc7d --- /dev/null +++ b/dev_rf/big_data/without_chunks/results/MostUsedTags.csv @@ -0,0 +1,11 @@ +Tag,Count +discussion,5791 +feature-request,4590 +support,2984 +bug,2648 +status-completed,1836 +stackoverflow,1527 +tags,973 +reputation,945 +questions,762 +area51,692 diff --git a/dev_rf/big_data/without_chunks/results/Top10MostViewedPosts.csv b/dev_rf/big_data/without_chunks/results/Top10MostViewedPosts.csv new file mode 100644 index 0000000..26e7b4f --- /dev/null +++ b/dev_rf/big_data/without_chunks/results/Top10MostViewedPosts.csv @@ -0,0 +1,11 @@ +Id,ViewCount +28625,33344 +37328,28372 +31913,26601 +9134,20536 +1777,19695 +2267,15180 +7931,12584 +61142,9918 +20420,8903 +53346,8619 diff --git a/dev_rf/big_data/without_chunks/shuffler.py b/dev_rf/big_data/without_chunks/shuffler.py new file mode 100644 index 0000000..62d46a4 --- /dev/null +++ b/dev_rf/big_data/without_chunks/shuffler.py @@ -0,0 +1,31 @@ +from big_data_logging import configured_logger + +logger = configured_logger() + + +def shuffler(post_views, mapped_tags, score_answertime): + """Function to shuffle the results that come from mapper + + Parameters and Returns (sorted) + ------- + post_views : list + List of dicts. Each dict has key 'Id' and 'ViewCount'. + mapped_tags : list + List of dicts. Each dict has as key and tag and a value 1. + score_answertime : list + List of dicts. Each dict has key 'Score' and 'ResponseTime' in hours. + + """ + + logger.info("Starting the shuffler module...") + + # Sort post_views from higher views to lower views + post_views = sorted(post_views, key=lambda x: x["ViewCount"], reverse=True) + + # Sort word_tags by alphabetical order + mapped_tags = sorted(mapped_tags, key=lambda d: list(d.keys())) + + # Sort score_answertime from higher score to lower score + score_answertime = sorted(score_answertime, key=lambda x: x["Score"], reverse=True) + + return post_views, mapped_tags, score_answertime diff --git a/dev_rf/testing/logger/logger_config.cfg b/dev_rf/testing/logger/logger_config.cfg new file mode 100644 index 0000000..8e1a3b5 --- /dev/null +++ b/dev_rf/testing/logger/logger_config.cfg @@ -0,0 +1,40 @@ +# Archivo de configuracion para un logger +[loggers] +keys = root + +[handlers] +keys = consoleHandler, timedRotatingFileHandler + +[formatters] +keys = logerFormatter, timedRotatingFormatter + +[logger_root] +level = INFO +handlers = consoleHandler, timedRotatingFileHandler +qualname = root + +# Parametros para el logger en formato de consola +[handler_consoleHandler] +class = StreamHandler +level = INFO +formatter = logerFormatter +args = (sys.stdout) + +# Parametros para el logger en el archivo .log +[handler_timedRotatingFileHandler] +class = handlers.TimedRotatingFileHandler +level = INFO +formatter = timedRotatingFormatter +# El parametro 'W0' indica que se logeara cada 7 dias los lunes. +args=('./dev_jb/logs/logs.log', 'W0', 1, 1) + +# Formato del mensaje para la consola +[formatter_logerFormatter] +format = %(asctime)s:%(levelname)s:%(message)s +datefmt = "%d/%m/%Y" + +# Formato de mensaje para el archivo .log +[formatter_timedRotatingFormatter] +format = %(asctime)s:%(levelname)s:%(message)s +datefmt = "%d/%m/%Y" +class = logging.Formatter \ No newline at end of file diff --git a/dev_rf/testing/logger_cfg.py b/dev_rf/testing/logger_cfg.py new file mode 100644 index 0000000..e21856b --- /dev/null +++ b/dev_rf/testing/logger_cfg.py @@ -0,0 +1,22 @@ +# Modulos # +from pathlib import Path +import logging +from logging import config + +# Path donde se encuentra el archivo de configuracion .cfg +path_file = 'D:\Alkemy\OT302-python\dev_jb\big_data\logger' + +# Nombre del archivo de configuracion .cfg +filename_cfg = "logger_config.cfg" + + +def logger_comfig(): + # Genera el path de configuracion + log_config_path = Path(path_file, filename_cfg) + + # Configura el logger + config.fileConfig(log_config_path) + logger = logging.getLogger('log') + + # Devuelve el logger + return logger \ No newline at end of file diff --git a/dev_rf/testing/mapreduce_big_data.py b/dev_rf/testing/mapreduce_big_data.py new file mode 100644 index 0000000..bbbb305 --- /dev/null +++ b/dev_rf/testing/mapreduce_big_data.py @@ -0,0 +1,188 @@ +# Modulos # +import xml.etree.ElementTree as ET +import pandas as pd +import numpy as np +import matplotlib.pyplot as plt +from datetime import datetime, timedelta +from pathlib import Path +from collections import defaultdict + +# Variables a utilizar +# file_path = r"/Users/jeremy/Code/big_data/Stack Overflow 11-2010/112010 Meta Stack Overflow" +file_path = r"/Users/rodri/OneDrive/Documentos/Data Science/Courses/Alkemy/112010 Meta Stack Overflow" +file_name = "posts.xml" +file_name2 = "comments.xml" + +""" +Funcion: mapea el archivo .xml dejando una lista para cada consigna con los datos listos para hacer un reduce +Args: + file_path (str, obligatorio): path donde se encuentran almacendaos los archivos + file_name (str, obligatorio): nombre del archivo post.xml + file_name2 (str, obligatorio): nombre del archivo comments.xml +Return: + list_tags: lista con todos los tags de los post con respuestas aceptadas + list_relation: lista de 2 coordenadas con los score y cantidad de palabras por pregunta de un post + list_answer_delay: lista de 3 coordenadas con el id del post, la fercha de creacion del post y la fecha de creacion + del primer comentario +""" + + +def mapper(file_path: str, file_name: str, file_name2: str): + """ + Funcion: mapea el archivo comments.xml devolviendo un diccionario con el id del post y la fecha de careacion de todos los + comentarios relacionado al post + Args: + file_path (str, obligatorio): path donde se encuentran almacendaos los archivos + file_name (str, obligatorio): nombre del archivo comments.xml + Return: + list_end_time: lista de 2 coordenadas con el id del post y la fechad ecereacion del comentario + """ + + def mapper_comments(file_path: str, file_name: str): + # Genera el path donde se encuentra el archivo comments.xml + xml_file = Path(file_path, file_name) + + # Crea el objeto tree + tree = ET.parse(xml_file) + + # Genera el elemento root + root = tree.getroot() + + # Lista que se utilizara en el proceso + list_end_time = [] + + # Itera sobre las filas del .xml para obtener los datos del mappeo + for row in root.iter("row"): + # Genera el objeto de treae la columna deseada + cursor_row = row.attrib + + # Guarda el postId + post_id = cursor_row.get("PostId") + + # Guarda el tiempo de creacion del comentario + end_time = cursor_row.get("CreationDate") + + # Genera el array de 2 coordenadas con el postId y el tiempo de creacion del comentario + list_end_time.append((post_id, end_time)) + + # Devuele la lista + return list_end_time + + """ + Funcion: deja una lista con el primer comentario de cada id de post + Args: + list_mapped (list, obligatorio)): lista mapeada obteniada de la funcion de mapeo + Return: + list_comment: devuelve la lista con el id del post relacionado al comentatio y la fecha del primer comentario + """ + + def reduce_comments(list_mapped: list): + # Crea los diccionarios a utilizar + arrange = defaultdict(list) + list_comment = defaultdict(list) + + # Iterea sobre la lista recibida para genrar una diccionario con key unica referida al id + # y los calores de todos los creation time referidos a ese id + for id, time in list_mapped: + arrange[id].append(time) + + # Itera sobre le diccionario previamente cereado para encontrar el valor minimo de cada creation time y + # gurdarlo en un nuveo diccionario + for id, times in arrange.items(): + list_comment[id].append(min(times)) + + # Devuelve el listado final + return list_comment + + # Genera el path donde se encuentra el archivo posts.xml + xml_file = Path(file_path, file_name) + + # Crea el objeto tree + tree = ET.parse(xml_file) + + # Genera el elemento root + root = tree.getroot() + + # Listas y diccionarios que se utilizaran en el proceso + list_tags = [] + list_relation = [] + list_answer_delay = [] + list_start_time = {} + aux_answer_delay = defaultdict(list) + + # Itera sobre las filas del .xml para obtener los datos del mappeo + for row in root.iter("row"): + # Genera el objeto de treae la columna deseada + cursor_row = row.attrib + + # Accede solo a los posts que sean de tipo 'question' + if cursor_row.get("PostTypeId") == "1": + + # Cuenta la contidad de palabras que hay en el texto del post + body_w_count = len(cursor_row.get("Body").replace("<", "").replace(">", "").strip().split(" ")) + + # Guarda el score de post + post_score = cursor_row.get("Score") + + # Crea un array de 2 coordenadas por posicion que almacena los datos para relacionar el score con + # la cantidad de palabras en el post + list_relation.append([int(post_score), body_w_count]) + + # Gurda la fecha de creacion del post + ceration_time = cursor_row.get("CreationDate") + + # Revisa que el post tenga una respuesta aceptada + if cursor_row.get("AcceptedAnswerId") != None: + # Itera sobre los tags un post para agregarlos a una lista total + for tag in cursor_row.get("Tags").replace("<", "").replace(">", " ").strip().split(" "): + list_tags.append(tag) + + # Toma las columnas que son Answer + else: + # Guarda el id del post + question_id = cursor_row.get("Id") + # Gurda el el tiempo de creacion del post en una lista referenciado por su id de post + list_start_time[question_id] = ceration_time + + # Obtiene los datos de tiempo de creacion del primer comentario refereido a cada id + aux_answer_delay = reduce_comments(mapper_comments(file_path, file_name2)) + + # Guarda los id que no tienen un post asignado + to_delete = {x: aux_answer_delay[x] for x in set(aux_answer_delay) - set(list_start_time)} + + # Elimina los comentarios que no tengan un post al que ser referidos + for key in to_delete.keys(): + aux_answer_delay.pop(key) + + # Genera un a lista con el id, la fecha de creacion del post y del comentario + for id, values in aux_answer_delay.items(): + list_answer_delay.append([key, values[0], list_start_time[id]]) + + # Devueleve las listas con los datos finales mapeados + return list_tags, list_relation, list_answer_delay + + +# Desempaqueta las listas +list_tags, list_relation, list_answer_delay = mapper(file_path, file_name, file_name2) + +# Top 10 tags con mayores respuestas acetadas +print(pd.Series(list_tags).value_counts().head(10)) +print("\n") + +# Correlacion entre cantidad de palabras en un post y su score +df_relation = pd.DataFrame(data=np.array(list_relation), columns=["score", "body"]) +print(df_relation.corr()) +print("\n") + +# Grafico de la relacion +plt.scatter(x=df_relation["score"], y=df_relation["body"]) + +# Demora de respuesta promedio en posts +df_answer_delay = pd.DataFrame(data=np.array(list_answer_delay), columns=["id", "end_time", "start_time"]) +df_answer_delay["end_time"] = df_answer_delay["end_time"].apply(lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f")) +df_answer_delay["start_time"] = df_answer_delay["start_time"].apply( + lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f") +) +df_answer_delay["diff_time"] = df_answer_delay["end_time"] - df_answer_delay["start_time"] +avg_answer_time = np.average(df_answer_delay["diff_time"].dt.total_seconds()) +print(str(timedelta(seconds=avg_answer_time))) diff --git a/dev_rf/testing/resultados_map_reduce.jpeg b/dev_rf/testing/resultados_map_reduce.jpeg new file mode 100644 index 0000000..9cb251f Binary files /dev/null and b/dev_rf/testing/resultados_map_reduce.jpeg differ diff --git a/dev_rf/testing/testing.py b/dev_rf/testing/testing.py new file mode 100644 index 0000000..c7987b0 --- /dev/null +++ b/dev_rf/testing/testing.py @@ -0,0 +1,83 @@ +# This file is going to be used to test Jeremy's mapreduce script. +# The objectives of the MapReduce are: +# - Top 10 tags con mayores respuestas aceptadas +# - Relación entre cantidad de palabras en un post y su puntaje +# - Demora de respuesta promedio en posts + +# Tests have been done to what can be tested. The final results are outside a function +# so they cannot be tested in a separate file.. + +import datetime + +# Import all functions from map_reduce +from mapreduce_big_data import mapper + +# path and file names +file_path = r"/Users/rodri/OneDrive/Documentos/Data Science/Courses/Alkemy/112010 Meta Stack Overflow" +file_name = "posts.xml" +file_name2 = "comments.xml" + + +def test_mapper_types(): + """According to Jeremy's documentation the mapper function returns 3 lists. + + This Function tests that the mapper function returns the 3 lists. + If the assertion fails it is going to raise an AssertionError with the corresponding message""" + + list_tags, list_relation, list_answer_delay = mapper( + file_path=file_path, file_name=file_name, file_name2=file_name2 + ) + assert isinstance(list_tags, list), "A list is expected for the tags" + assert isinstance(list_relation, list), "A list is expected for the relation" + assert isinstance(list_answer_delay, list), "A list is expected for the answer delay" + + +def test_correct_elements_into_lists(): + """According to Jeremy's documentation the lists that are returned from the mapper function + have the following characteristics: + - list_tags: list of str with the different tags + - list_relation: list with 2 coordinates [score, post_quantity_of_words] + - list_answer_delay: list with 3 coordinates [post_id, creation_date_post, creation_date_first_comment] + + This function tests that the lists have the characteristics that are mentioned above + """ + list_tags, list_relation, list_answer_delay = mapper( + file_path=file_path, file_name=file_name, file_name2=file_name2 + ) + + # Assert that all tags are of type string + assert all(map(lambda x: isinstance(x, str), list_tags)), "Not all tags are of type str" + + # Assert that all list_relation elements are a list with 2 integers + for coordinates in list_relation: + # Assert that coordinates is a list + assert isinstance(coordinates, list), "Not all coordinates are lists" + # Assert that it has 2 elements + assert len(coordinates) == 2, "There are coordinates with more than 2 elements" + + # Assert that every coordinate is an integer + for coordinate in coordinates: + assert isinstance(coordinate, int), "Coordinates elements are not of type int" + + # Assert that all list_answer_delay have 3 elements and that post_id can be transformed to int + # and that creation_date_post, creation_date_first_comment can be transformed to datetime + for coordinates in list_answer_delay: + # Assert that coordinates is a list + assert isinstance(coordinates, list), "Not all coordinates are lists" + # Assert that it has 3 elements + assert len(coordinates) == 3, "There are coordinates with more than 3 elements" + + # Assert for every coordinate + + # post_id can be transformed to int + assert isinstance(int(coordinates[0]), int) + # creation_date_post can be transformed to a datetime.datetime object + assert isinstance(datetime.datetime.fromisoformat(coordinates[1]), datetime.datetime) + # creation_date_first_comment can be transformed to a datetime.datetime object + assert isinstance(datetime.datetime.fromisoformat(coordinates[2]), datetime.datetime) + + +test_mapper_types() +test_correct_elements_into_lists() + +# All test pass