diff --git a/dev_ar/dags/functions/dag_etl_utn_untref.py b/dev_ar/dags/functions/dag_etl_utn_untref.py deleted file mode 100644 index 20789cb..0000000 --- a/dev_ar/dags/functions/dag_etl_utn_untref.py +++ /dev/null @@ -1,58 +0,0 @@ -# Datetime modules -from datetime import datetime, timedelta - -# DAG Object -from airflow import DAG - -# Airflow Operators -from airflow.operators.dummy import DummyOperator - -# Arguments pass on each operator -default_args = { - 'owner' : 'dev_ar', - 'retries' : 1, - 'retry_delay' : timedelta(minutes = 1) -} - -# DAG -with DAG( - dag_id = 'dag_etl_utn_untref', - default_args = default_args, - description = 'ETL Consulta a UTN/UNTREF para carga en S3', - start_date = datetime(2022, 9, 16), - # Use datetime.timedelta also can be used crontab - schedule_interval = timedelta(hours = 1), - catchup = False -) as dag: - ## Initial Task - #initial_op = DummyOperator( - # task_id = 'initial_operation' - # #DummyOperator just in case any init proceess needed - #) - - # Extract task - # Operator to perform sql queries on each university - sql_queries = DummyOperator( - task_id = 'sql_queries', - #Add retries arg at operator level - #Will retry 5 times just in the sql queries and not other tasks - retries = 5 - #To be replaced with PythonOperator to make SQL queries - ) - - # Transform task - # Operator to transform data using pandas - transform_pandas = DummyOperator( - task_id = 'transform_pandas' - #Replace with PythonOperator to import pandas for data transformation - ) - - # Load task - # Operator to load transformed data into AWS S3 - load_S3 = DummyOperator( - task_id = 'load_s3' - # - ) - - #Graph structure - sql_queries >> transform_pandas >> load_S3 \ No newline at end of file diff --git a/dev_ar/dags/scripts/uni_tres_de_febrero.sql b/dev_ar/dags/scripts/uni_tres_de_febrero.sql deleted file mode 100644 index aef73df..0000000 --- a/dev_ar/dags/scripts/uni_tres_de_febrero.sql +++ /dev/null @@ -1,22 +0,0 @@ --- Universidad Tres de Febrero --- All columns renamed [university, careers, inscription_date, first_name, last_name, gender, age, postal_code, location, email] --- inscription_date in date format --- first_name and last_name already splitted --- age and location raw data from db - -SELECT - untref.universidad AS university, - untref.careers as careers, - TO_DATE(untref.fecha_de_inscripcion, 'DD/Mon/YY') AS inscription_date, - SPLIT_PART(REGEXP_REPLACE(untref.names, 'mrs\._|mr\._|dr\._|', ''), '_', 1) AS first_name, - SPLIT_PART(REGEXP_REPLACE(untref.names, 'mrs\._|mr\._|dr\._|', ''), '_', 2) AS last_name, - untref.sexo AS gender, - untref.birth_dates AS age, - untref.codigo_postal AS postal_code, - untref.direcciones AS location, - untref.correos_electronicos AS email -FROM - palermo_tres_de_febrero untref -WHERE - untref.universidad = 'universidad_nacional_de_tres_de_febrero' - AND TO_DATE(untref.fecha_de_inscripcion, 'DD/Mon/YY') BETWEEN '2020-09-01' AND '2021-02-01'; \ No newline at end of file diff --git a/dev_ar/dags/scripts/uni_utn.sql b/dev_ar/dags/scripts/uni_utn.sql deleted file mode 100644 index c8d4ead..0000000 --- a/dev_ar/dags/scripts/uni_utn.sql +++ /dev/null @@ -1,22 +0,0 @@ --- Universidad Tecnológica Nacional --- All columns renamed --- inscription_date in date format --- first_name and last_name already splitted --- age, postal_code and location raw data from db - -SELECT - utn.university AS university, - utn.trabajo AS careers, - TO_DATE(utn.inscription_date, 'YYYY/MM/DD') AS inscription_date, - SPLIT_PART(REGEXP_REPLACE(utn.nombre, 'mrs\. |mr\. |dr\. ', ''), ' ', 1) AS first_name, - SPLIT_PART(REGEXP_REPLACE(utn.nombre, 'mrs\. |mr\. |dr\. ', ''), ' ', 2) AS last_name, - utn.sexo AS gender, - utn.birth_date AS age, - utn.location AS postal_code, - utn.direccion AS location, - utn.email AS email -FROM - jujuy_utn utn -WHERE - utn.university = 'universidad tecnológica nacional' - AND TO_DATE(utn.inscription_date, 'YYYY/MM/DD') BETWEEN '2020-09-01' AND '2021-02-01'; \ No newline at end of file diff --git a/dev_bjm/dags/dag_universidades_p.py b/dev_bjm/dags/dag_universidades_p.py deleted file mode 100644 index d2d7cb7..0000000 --- a/dev_bjm/dags/dag_universidades_p.py +++ /dev/null @@ -1 +0,0 @@ -from airflow import DAG diff --git a/dev_bjm/dags/scripts/file.txt b/dev_bjm/dags/scripts/file.txt deleted file mode 100644 index e69de29..0000000 diff --git a/dev_jb/big_data/testing/__pycache__/mapper_rf.cpython-39.pyc b/dev_jb/big_data/testing/__pycache__/mapper_rf.cpython-39.pyc new file mode 100644 index 0000000..e1dbfaf Binary files /dev/null and b/dev_jb/big_data/testing/__pycache__/mapper_rf.cpython-39.pyc differ diff --git a/dev_jb/big_data/testing/__pycache__/mapreduce_testing.cpython-39-pytest-7.1.3.pyc b/dev_jb/big_data/testing/__pycache__/mapreduce_testing.cpython-39-pytest-7.1.3.pyc new file mode 100644 index 0000000..ff4d186 Binary files /dev/null and b/dev_jb/big_data/testing/__pycache__/mapreduce_testing.cpython-39-pytest-7.1.3.pyc differ diff --git a/dev_jb/big_data/testing/__pycache__/reduce_rf.cpython-39.pyc b/dev_jb/big_data/testing/__pycache__/reduce_rf.cpython-39.pyc new file mode 100644 index 0000000..e9d170c Binary files /dev/null and b/dev_jb/big_data/testing/__pycache__/reduce_rf.cpython-39.pyc differ diff --git a/dev_jb/big_data/testing/mapper_rf.py b/dev_jb/big_data/testing/mapper_rf.py new file mode 100644 index 0000000..0d1f6d2 --- /dev/null +++ b/dev_jb/big_data/testing/mapper_rf.py @@ -0,0 +1,101 @@ +import xml.etree.ElementTree as ET +import datetime + +def mapper(): + """Function used to map the 3 required tasks: + - 1 - Top 10 posts views + - 2 - Top 10 words in tags + - 3 - Score and answer time + Returns + ------- + post_views : list + List of dicts. Each dict has key 'Id' and 'ViewCount'. + mapped_tags : list + List of dicts. Each dict has as key and tag and a value 1. + score_answertime : list + List of dicts. Each dict has key 'Score' and 'ResponseTime' in hours. + """ + + def get_answer_dict(root): + """Function to get the answers ids with the creation dates + Parameters + ---------- + root : object + Element Tree root object + Returns + ------- + answer_dict : dict + Dictionary that has as key the answer_id and as value the creation_date + """ + + # Initialize variable + answer_dict = {} + + # Loop into each row to get the answer_id and creation_date + for child in root: + dict = child.attrib + + # PostTypeId == 2 means that it is an answer. PostTypeId == 1 means is a question. + if dict["PostTypeId"] == "2": + answer_dict[dict["Id"]] = dict["CreationDate"] + + return answer_dict + + # ./112010 Meta Stack Overflow/posts.xml + # Load and parse the posts.xml file + tree = ET.parse(r"D:\Alkemy\dev_jb\big_data\112010 Stack Overflow\posts.xml") + # Get the root of the xml + root = tree.getroot() + + # Initialize variables + post_views = [] + mapped_tags = [] + score_answertime = [] + + # Get the answer dict. key=answer_id, value=CreationDate + answer_dict = get_answer_dict(root) + + # Loop into each row element + for child in root: + # Get the attributes of each row element + dict = child.attrib + + # 1 - Top 10 posts views + # Append to the list the post_id and the view_count of each post + post_views.append({"Id": dict["Id"], "ViewCount": int(dict["ViewCount"])}) + + # 2 - Top 10 words in tags + # If the post has a tag replace the <> and split to get the different words. + try: + tags = dict["Tags"].replace("<", " ").replace(">", " ").strip().split() + # Map each individual tag + for tag in tags: + mapped_tags.append({tag: 1}) + except: + # If the post hasn't a tag then continue + continue + + # 3 - Score and answer time + # If the post is a question + if dict["PostTypeId"] == "1": + # Get question score and creation_time + post_score = int(dict["Score"]) + post_creation_time = datetime.datetime.fromisoformat(dict["CreationDate"]) + try: + # Some posts haven't an accepted answer, so they will be skipped as they do not have an AcceptedAnswerId + + # Get the accepted_answer_id + accepted_answer_id = dict["AcceptedAnswerId"] + + # With the accepted_answer_id go to the answer_dict and take the creation_date value and transform it to datetime + accepted_answer_time = datetime.datetime.fromisoformat(answer_dict[accepted_answer_id]) + + # Calculate response time from question creation to accepted answer creation (in hours) + response_time = round((accepted_answer_time - post_creation_time).seconds / 3600, 2) + + # Append the score and response time to a list of dicts + score_answertime.append({"Score": post_score, "ResponseTime": response_time}) + except: + continue + + return post_views, mapped_tags, score_answertime \ No newline at end of file diff --git a/dev_jb/big_data/testing/mapreduce_testing.py b/dev_jb/big_data/testing/mapreduce_testing.py new file mode 100644 index 0000000..840c0b8 --- /dev/null +++ b/dev_jb/big_data/testing/mapreduce_testing.py @@ -0,0 +1,84 @@ +# Modulos # +import pytest +from mapper_rf import mapper +from reduce_rf import reducer +""" +Testing mapreduce de archivo .xml realizado por Rodrigo Fuentes +""" +# Se genera las varaibles a testear +post_views, mapped_tags, score_answertime = mapper() +top10_post_views, tags_reduced, average_answer_time = reducer(post_views, mapped_tags, score_answertime) + +""" +Verifica la cantiadad de registros mapeados por la funcion mapper para el top 10 post mas vistos + +Resultado: passed +""" +def test_mapreduce_post_views( +): + assert len(post_views) == 3675849 + +""" +Verifica la cantiadad de registros mapeados por la funcion mapper para el top 10 palabras mas nombradas en los post por tag + +Resultado: passed +""" +def test_mapreduce_mapped_tags( +): + assert len(mapped_tags) == 3044145 + +""" +Verifica la cantiadad de registros mapeados por la funcion mapper para el tiempo de respuestas con score entre 200 y 300 puntos + +Resultado: passed +""" +def test_mapreduce_score_answertime( +): + assert len(score_answertime) == 652089 + +""" +Verifica si el output del reducer coincide con lo esperado para el top 10 post mas vistos + +Resultado: passed +""" +def test_mapreduce_top10_post_view( +): + assert top10_post_views == [{'Id': '4', 'ViewCount': 5534}, + {'Id': '6', 'ViewCount': 1175}, + {'Id': '7', 'ViewCount': 0}, + {'Id': '8', 'ViewCount': 736}, + {'Id': '9', 'ViewCount': 24779}, + {'Id': '11', 'ViewCount': 11628}, + {'Id': '12', 'ViewCount': 0}, + {'Id': '13', 'ViewCount': 8601}, + {'Id': '14', 'ViewCount': 9906}, + {'Id': '16', 'ViewCount': 9390}] + + +""" +Verifica si el output del reducer coincide con lo esperado para el top 10 palabras mas nombradas en los post por tag + +Resultado: passed +""" +def test_mapreduce_tags( +): + assert tags_reduced == [('c#', 118980), + ('java', 75037), + ('php', 65739), + ('javascript', 57210), + ('.net', 56348), + ('iphone', 54736), + ('asp.net', 52655), + ('jquery', 47927), + ('c++', 45909), + ('python', 38083)] + +""" +Verifica si el output del reducer coincide con lo esperado para el tiempo de respuestas con score entre 200 y 300 puntos + +Resultado: passed +""" +def test_mapreduce_average_answer_time( +): + assert average_answer_time == 4.64395 + \ No newline at end of file diff --git a/dev_jb/big_data/testing/reduce_rf.py b/dev_jb/big_data/testing/reduce_rf.py new file mode 100644 index 0000000..552ff21 --- /dev/null +++ b/dev_jb/big_data/testing/reduce_rf.py @@ -0,0 +1,51 @@ +import collections +from statistics import mean + +def reducer(post_views, mapped_tags, score_answertime): + """Function to reduce the results that come from shuffler + Parameters + ------- + post_views : list + List of dicts. Each dict has key 'Id' and 'ViewCount'. It is sorted + mapped_tags : list + List of dicts. Each dict has as key and tag and a value 1. It is sorted + score_answertime : list + List of dicts. Each dict has key 'Score' and 'ResponseTime' in hours. It is sorted + Returns + ------- + top10_post_views : list + List of dicts. Only the Top 10 + mapped_tags : list + List of dicts. Only the Top 10 + score_answertime : float + Result of the average time to get an accepted answer of the top 200 questions by highest score (hour) + """ + + # Get the 10 most viewed posts + top10_post_views = post_views[0:10] + + # Use a Counter to reduce the tags + counter = collections.Counter() + for d in mapped_tags: + counter.update(d) + + # Transform the counter object to a dictionary + tags_reduced = {} + for key, value in counter.items(): + tags_reduced[key] = value + + # Sort the reduced tags and get the top 10 + tags_reduced = sorted(tags_reduced.items(), key=lambda x: x[1], reverse=True)[0:10] + + # Get the top 200 answers by score + score_answertime = score_answertime[0:200] + + # Get the top 200 answers response_time + answer_times = [] + for dict in score_answertime: + answer_times.append(dict["ResponseTime"]) + + # Calculate the average response time + average_answer_time = mean(answer_times) + + return top10_post_views, tags_reduced, average_answer_time \ No newline at end of file diff --git a/dev_rf/dags/scripts/sql_univ_kennedy.sql b/dev_rf/dags/scripts/sql_univ_kennedy.sql deleted file mode 100644 index 496be90..0000000 --- a/dev_rf/dags/scripts/sql_univ_kennedy.sql +++ /dev/null @@ -1,18 +0,0 @@ -/* Things that must be corrected with Python - - name: It has to be splited into first_name and last_name. If there is any dr, ms, etc has to be quited. - - age: If the age is negative 100 must be added. Check if under 18 or above 70. - - location: The Excel file with the postal_codes and locations has to be used to add the location to the table. -*/ - -SELECT -universidades AS university, -carreras AS career, -CAST (fechas_de_inscripcion AS DATE) AS inscription_date, -nombres AS name, -sexo AS gender, -(CURRENT_DATE - TO_DATE(fechas_nacimiento, 'YY-Mon-DD'))/365 AS age, -codigos_postales AS postal_code, -emails AS email -FROM uba_kenedy -WHERE universidades = 'universidad-j.-f.-kennedy' AND -CAST (fechas_de_inscripcion AS DATE) BETWEEN '2020-09-01' AND '2021-02-01' \ No newline at end of file diff --git a/dev_rf/dags/scripts/sql_univ_latinoamericana.sql b/dev_rf/dags/scripts/sql_univ_latinoamericana.sql deleted file mode 100644 index e936cf8..0000000 --- a/dev_rf/dags/scripts/sql_univ_latinoamericana.sql +++ /dev/null @@ -1,18 +0,0 @@ -/* Things that must be corrected with Python - - name: It has to be splited into first_name and last_name. If there is any dr, ms, etc has to be quited. - - age: Check if under 18 or above 70. - - postal_code: The Excel file with the postal_codes and locations has to be used to add the postal_code to the table. -*/ - -SELECT -universities AS university, -careers AS career, -TO_DATE(inscription_dates, 'DD-MM-YYYY') AS inscription_date, -names AS name, -sexo AS gender, -(CURRENT_DATE - TO_DATE(birth_dates, 'DD-MM-YYYY'))/365 AS age, -locations AS location, -emails AS email -FROM lat_sociales_cine -WHERE universities = '-FACULTAD-LATINOAMERICANA-DE-CIENCIAS-SOCIALES' AND -TO_DATE(inscription_dates, 'DD-MM-YYYY') BETWEEN '2020-09-01' AND '2021-02-01' \ No newline at end of file diff --git a/dev_wl/dags/wl_univerB_2_dags.py b/dev_wl/dags/wl_univerB_2_dags.py deleted file mode 100644 index e2d319f..0000000 --- a/dev_wl/dags/wl_univerB_2_dags.py +++ /dev/null @@ -1,41 +0,0 @@ -from datetime import datetime, timedelta -from airflow import DAG -from airflow.operators.bash import BashOperator - -extract_load_data = DAG( - dag_id='wl_univerB_2_dags', - default_args={ - 'dependes_on_past': False, - 'retry_delay': timedelta(hours= 1), - 'retries': 5, - }, - description= 'Make an ETL for two different universities', - schedule_interval= timedelta(days= 1), - start_date= datetime(2022, 10, 1), - catchup= False, - tags=['OT302-22'], -) - - - -''' -Documentar los operators que se deberían utilizar a futuro, -teniendo en cuenta que se va a hacer dos consultas SQL (una para cada universidad), -se van a procesar los datos con pandas y se van a cargar los datos en S3. - -Set the tasks: - -extract_data_load_csv = BashOperator( - task_id="extract_data_load_csv", - bash_command="python3 /airflow-docker/OT302-python/dev_wl/functions/sql_to_csv.py ", - dag= extract_load_data - ) - -Setting up Dependencies: - -task_1.set_upstream(task_2) - - if __name__ == "__main__": - dag.cli() - -''' \ No newline at end of file diff --git a/dev_wl/dags/wl_univerB_dags.py b/dev_wl/dags/wl_univerB_dags.py deleted file mode 100644 index b1fbe4b..0000000 --- a/dev_wl/dags/wl_univerB_dags.py +++ /dev/null @@ -1,41 +0,0 @@ -from datetime import datetime, timedelta -from airflow import DAG -from airflow.operators.bash import BashOperator - -extract_load_data = DAG( - dag_id='wl_univerB_dags', - default_args={ - 'dependes_on_past': False, - 'retry_delay': timedelta(hours= 1) - }, - description= 'Make an ETL for two different universities', - schedule_interval= timedelta(days= 1), - start_date= datetime(2022, 10, 1), - catchup= False, - tags=['OT302-30'], -) - - - -''' -Documentar los operators que se deberían utilizar a futuro, -teniendo en cuenta que se va a hacer dos consultas SQL (una para cada universidad), -se van a procesar los datos con pandas y se van a cargar los datos en S3. - -Set the tasks: - -extract_data_load_csv = BashOperator( - task_id="extract_data_load_csv", - bash_command="python3 /airflow-docker/OT302-python/dev_wl/functions/sql_to_csv.py ", - dag= extract_load_data - ) - -Setting up Dependencies: - -task_1.set_upstream(task_2) - - if __name__ == "__main__": - dag.cli() - -''' - \ No newline at end of file diff --git a/dev_wl/functions/__init__.py b/dev_wl/functions/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/dev_wl/functions/configure_logger.py b/dev_wl/functions/configure_logger.py deleted file mode 100644 index 6e1c7b7..0000000 --- a/dev_wl/functions/configure_logger.py +++ /dev/null @@ -1,41 +0,0 @@ -from pathlib import Path -import logging -from typing import Optional - -""" -This function should be called when you want to configure logging. -Called: 'configure_logger( path, university_name)'. -You must instantiate the log record with 'logger = logging.getLogger()' to be used. -logger.info('message') -logger.warning('message') -logger.error('message') - -""" - - -def configure_logger( - log_path: Optional[str] = None, university: Optional[str] = "comahue" -) -> None: - """The function sets the log file to an INFO level - - Args: - log_path (Optional[str], optional): Path where the log file is created and written. Defaults to None. - university (Optional[str], optional): The two possible values are 'comahue' and 'salvador'. Defaults to 'comahue'. - """ - - if university == "comahue": - filename = "comahue_etl.log" - else: - filename = "salvador_etl.log" - - log_path_file = Path(log_path / filename) if log_path is not None else filename - - format_string = f"%(asctime)s - %(filename)s - %(message)s" - - logging.basicConfig( - filname=log_path_file, - level=logging.INFO, - format=format_string, - datefmt="%Y-%m-%d", - filemode="w", - ) diff --git a/dev_wl/scripts/wl_comahue.sql b/dev_wl/scripts/wl_comahue.sql deleted file mode 100644 index 598941a..0000000 --- a/dev_wl/scripts/wl_comahue.sql +++ /dev/null @@ -1,14 +0,0 @@ -SELECT - universidad AS university, - carrera AS career, - CAST(fecha_de_inscripcion AS DATE) AS inscription_date, - "name" AS name, - sexo AS gender, - CAST(fecha_nacimiento AS DATE) AS date_birthday, - CAST(fc.codigo_postal AS INTEGER) AS postal_code, - correo_electronico AS email -FROM - flores_comahue fc -WHERE - universidad LIKE 'UNIV. NACIONAL DEL COMAHUE' - AND fecha_de_inscripcion BETWEEN '2020-09-01' AND '2021-02-01' \ No newline at end of file diff --git a/dev_wl/scripts/wl_salvador.sql b/dev_wl/scripts/wl_salvador.sql deleted file mode 100644 index 50f7c53..0000000 --- a/dev_wl/scripts/wl_salvador.sql +++ /dev/null @@ -1,14 +0,0 @@ -SELECT - universidad AS university, - carrera AS career, - CAST(fecha_de_inscripcion AS DATE) AS inscription_date, - nombre, - sexo AS gender, - CAST(fecha_nacimiento AS DATE) AS date_birthday, - localidad AS locations, - email -FROM - salvador_villa_maria svm -WHERE - universidad LIKE 'UNIVERSIDAD_DEL_SALVADOR' - AND CAST(fecha_de_inscripcion AS DATE) BETWEEN '2020-09-01' AND '2021-02-01' \ No newline at end of file