# NOTE(review): SOURCE is a collapsed `git diff` that adds three standalone
# scripts.  This region is reconstructed as properly formatted Python; each
# original file is delimited by a banner comment.  Binary-file diff entries
# (PNG screenshots) carried no code and are dropped.  Because the files are
# concatenated into one span, later same-named helpers shadow earlier ones;
# each banner-delimited section is self-contained as its own script.

# === file: dev_jeanvitola/Mapreduce_Opt/Mapreduce_meanTime.py ===
import pandas as pd
from functools import reduce
import xml.etree.ElementTree as ET
import os
from collections import Counter
import datetime
import numpy
import time


def read_xml(file):
    """Parse an XML file and return its root element."""
    tree = ET.parse(file)
    return tree.getroot()


def chunckify(file, chunks):
    """Yield successive slices of *file* of length *chunks* (last may be shorter)."""
    for i in range(0, len(file), chunks):
        yield file[i:i + chunks]


def score(file):
    """For a question row (PostTypeId == '1') return (id, score, creation datetime).

    Returns None for any other post type; callers count results with Counter
    and drop the large None bucket afterwards.
    """
    if file.attrib['PostTypeId'] == '1':
        post_id = file.attrib['Id']
        post_score = int(file.attrib['Score'])
        created = datetime.datetime.strptime(
            file.attrib["CreationDate"], '%Y-%m-%dT%H:%M:%S.%f')
        return post_id, post_score, created


def score_2(file):
    """For an answer row (PostTypeId == '2') return (parent id, creation datetime)."""
    if file.attrib['PostTypeId'] == '2':
        parent_id = file.attrib['ParentId']
        created = datetime.datetime.strptime(
            file.attrib["CreationDate"], '%Y-%m-%dT%H:%M:%S.%f')
        return parent_id, created


def map_post(data):
    """Map step for questions: count (id, score, date) tuples in one chunk."""
    return Counter(map(score, data))


def map_post_2(data):
    """Map step for answers: count (parent id, date) tuples in one chunk."""
    return Counter(map(score_2, data))


def merge_date(D1, D2):
    """Reduce step: fold counter D2 into D1 (in place) and return D1."""
    D1.update(D2)
    return D1


def reduce_date(iter):
    """Reduce question counters into a DataFrame of the top-100 questions by score."""
    reduce_post = reduce(merge_date, iter)
    # most_common(101): slot 0 is presumably the None bucket produced by
    # non-question rows in score(); it is dropped below, leaving 100 rows.
    # TODO(review): confirm None is always the most common key.
    top100 = reduce_post.most_common(101)
    df = pd.DataFrame(top100, columns=['date', 'count'])
    df.drop(df.index[0], inplace=True)
    # Unpack the (post_id, post_score, change_datetime) key tuple into columns.
    df['post_id'] = df['date'].apply(lambda x: x[0])
    df['post_score'] = df['date'].apply(lambda x: x[1])
    df = df.sort_values(by=['post_score'], ascending=False)
    df['change_datetime'] = df['date'].apply(lambda x: x[2])
    df.drop(['date'], axis=1, inplace=True)
    return df


def reduce_date_2(iter):
    """Reduce answer counters into a DataFrame of the top-100 answers."""
    reduce_post = reduce(merge_date, iter)
    top100 = reduce_post.most_common(101)
    df2 = pd.DataFrame(top100, columns=['date', 'count'])
    # Drop the None bucket from non-answer rows (see reduce_date).
    df2.drop(df2.index[0], inplace=True)
    # Unpack the (post_id, change_datetime) key tuple into columns.
    df2['post_id'] = df2['date'].apply(lambda x: x[0])
    df2['change_datetime'] = df2['date'].apply(lambda x: x[1])
    df2.drop(['date'], axis=1, inplace=True)
    return df2


def join_df(df, df2):
    """Join question and answer frames on post_id, print the mean answer delay.

    Returns the joined DataFrame with a 'time_diff' column in seconds.
    BUGFIX(review): the original returned the function object `join_df`
    instead of the computed frame, so main() printed a function repr.
    """
    df_join = pd.merge(df, df2, on='post_id', how='inner')
    df_join['time_diff'] = df_join['change_datetime_y'] - df_join['change_datetime_x']
    df_join['time_diff'] = df_join['time_diff'].dt.total_seconds()
    mean_time_diff = df_join['time_diff'].mean()
    # Convert the mean from seconds to whole hours for the report line.
    mean_time_diff_hours = round(mean_time_diff / 3600)
    print(f"from the ranking of the 0-100 data by score,\n the average response time is {mean_time_diff_hours} hours")
    return df_join


def main():
    """Time the question/answer pipelines over posts.xml and print the join."""
    start = time.time()
    read_file = read_xml("posts.xml")
    chunky_data = chunckify(read_file, 100)
    Map_data = list(map(map_post, chunky_data))
    df = reduce_date(Map_data)
    # Answers: re-parse so the Element tree is iterated from the start again.
    read_file = read_xml("posts.xml")
    chunky_data = chunckify(read_file, 50)
    Map_data_2 = list(map(map_post_2, chunky_data))
    df2 = reduce_date_2(Map_data_2)
    print(join_df(df, df2))
    end = time.time()
    print("Execution time: ", end - start)


if __name__ == '__main__':
    main()


# === file: dev_jeanvitola/Mapreduce_Opt/Mapreduce_top10.py ===
def date_post(alkemy):
    """Return the element's CreationDate reformatted as '%Y-%m-%d'."""
    return datetime.datetime.strptime(
        alkemy.attrib["CreationDate"], '%Y-%m-%dT%H:%M:%S.%f').strftime('%Y-%m-%d')


def map_post(data):
    """Map step: count posts per creation date within one chunk."""
    return Counter(map(date_post, data))


def reduce_date(iter):
    """Reduce the per-chunk date counters, print the top-10 table, save it as CSV.

    reduce() folds the counters key-by-key (summing values);
    most_common(10) returns the ten most frequent dates, descending.
    """
    reduce_post = reduce(merge_date, iter)
    top10 = reduce_post.most_common(10)
    df = pd.DataFrame(top10, columns=['date', 'count'])
    # Pad the date column to the longest entry so counts line up.
    longest = max(len(word) for word, count in top10)
    for word, count in top10:
        print('{word:<{len}}: {count:5}'.format(
            len=longest + 1,
            word=word,
            count=count)
        )
    print("by : Jeanvitola")
    df.to_csv("top10_date.csv", index=False)


def main():
    """Run the full map/reduce pipeline over posts.xml and time it."""
    # BUGFIX(review): the original started the timer at module import, so
    # the printed "Execution time" included import overhead; start it here.
    start = time.time()
    root = read_xml("posts.xml")
    chunks = chunckify(root, 50)
    Map_data = list(map(map_post, chunks))
    reduce_date(Map_data)
    end = time.time()
    print("Execution time: ", end - start)


if __name__ == '__main__':
    main()


# === file: dev_jeanvitola/Mapreduce_Opt/Mapreduce_viewAnswer.py ===
def view_count(file):
    """Return ViewCount for a question row, or None when absent or non-question.

    BUGFIX(review): the original left `views` unassigned on a missing
    'ViewCount' attribute (`except: None` neither assigns nor returns),
    raising NameError at the subsequent `return views`.
    """
    if file.attrib['PostTypeId'] == '1':
        try:
            return int(file.attrib['ViewCount'])
        except (KeyError, ValueError):
            return None
    return None


def get_post_views(data):
    """Return AnswerCount for a row, or None when absent or unparsable."""
    try:
        return int(data.attrib['AnswerCount'])
    except (KeyError, ValueError):
        return None


def map_post(data):
    """Map step: count the distinct ViewCount values within one chunk."""
    return Counter(map(view_count, data))


def mapper(data):
    """Map step: count the distinct AnswerCount values within one chunk."""
    return Counter(map(get_post_views, data))


def merge_date(D1, D2):
    """Reduce step: fold counter D2 into D1 (in place) and return D1."""
    D1.update(D2)
    return D1


def reduce_date(iter):
    """Reduce ViewCount counters into a DataFrame (ViewCounts, frequency)."""
    reduce_post = reduce(merge_date, iter)
    df = pd.DataFrame.from_dict(reduce_post, orient='index')
    df.reset_index(inplace=True)
    # Drop the NaN row coming from the None bucket, then make keys integer.
    df.dropna(inplace=True)
    df['index'] = df['index'].astype(int)
    df.rename(columns={'index': 'ViewCounts'}, inplace=True)
    # Posts with zero views carry no signal for the relation; drop them.
    df = df[df.ViewCounts != 0]
    return df


def reduce_date_2(iter):
    """Reduce AnswerCount counters into a DataFrame (counter_views, views)."""
    reduce_post = reduce(merge_date, iter)
    df2 = pd.DataFrame.from_dict(reduce_post, orient='index')
    df2.reset_index(inplace=True)
    df2.dropna(inplace=True)
    df2.rename(columns={'index': 'counter_views', 0: 'views'}, inplace=True)
    df2['counter_views'] = df2['counter_views'].astype(int)
    return df2


def union_df(df1, df2):
    """Outer-join the ViewCount and AnswerCount distributions on the counted value.

    BUGFIX(review): the original merged both frames on 'ViewCounts', but df2
    only has 'counter_views'/'views' columns, raising KeyError; join on the
    two columns explicitly instead.
    """
    df_union = pd.merge(df1, df2, left_on='ViewCounts',
                        right_on='counter_views', how='outer')
    df_union.dropna(inplace=True)
    df_union['counter_views'] = df_union['counter_views'].astype(int)
    return df_union


def main():
    """Build both distributions from posts.xml and print the answer-count one.

    NOTE(review): df1 is computed but never printed or joined here; union_df
    exists for the relation but is not called — presumably work in progress.
    """
    read_file = read_xml("posts.xml")
    chunky_data = chunckify(read_file, 50)
    Map_data = list(map(map_post, chunky_data))
    df1 = reduce_date(Map_data)
    # AnswerCount distribution over a fresh chunk generator of the same tree.
    chunky_data = chunckify(read_file, 50)
    body_views = list(map(mapper, chunky_data))
    df2 = reduce_date_2(body_views)
    print(df2)


if __name__ == '__main__':
    main()
# NOTE(review): reconstructed from the collapsed diff.  These are pytest
# suites for the project modules `wl_map_reduce` and `wl_mapreduce_op`,
# which are NOT part of this diff — the suites can only run next to those
# modules.  The "docstring" variants duplicate the plain suites with the
# documentation moved INTO the functions as real docstrings (the originals
# left them as floating module-level strings, which attach to nothing) and
# translated to English.  Binary-file diff entries (PNGs) are dropped.

# === file: dev_jeanvitola/Test_function/Mapreduce_Walter102.py ===
from wl_map_reduce import *

import os

import pytest


data = "posts.xml"


def test_mapper():
    assert isinstance(mapper(data), list)


def test_shuffle_sort():
    lista = mapper(data)
    assert isinstance(shuffle_sort(lista), list)


def test_reduce():
    lista = mapper(data)
    lista2 = shuffle_sort(lista)
    assert isinstance(reduce(lista2), list)


def test_savetoCSV():
    # BUGFIX(review): the original asserted
    # isinstance(savetoCSV(...), data_frame.to_csv(index=False)), which
    # raises NameError (`data_frame` is undefined) and passes a string —
    # not a type — to isinstance.  Assert on the observable effect
    # instead: the report file exists after the call.
    lista = mapper(data)
    lista2 = shuffle_sort(lista)
    lista3 = reduce(lista2)
    filename = "top10tags.csv"
    fields = ["TAG", "COUNT"]
    type_data = 1
    savetoCSV(lista3, filename, fields, type_data)
    assert os.path.exists(filename)


# === file: dev_jeanvitola/Test_function/Mapreduce_Walter102_Docstring.py ===
def test_mapper():
    """mapper() maps the raw data and returns one list per exercise.

    input: an .xml file path; a Tree object is built with xml.etree.
    output: iterates the tags of interest, collecting each into a list,
    so the expected result is a list of lists.
    """
    assert isinstance(mapper(data), list)


def test_shuffle_sort():
    """shuffle_sort() arranges and orders the mapped lists.

    input: the list of lists produced by mapper().
    output: a list containing a DataFrame with the merge of the
    characters of interest.
    """
    lista = mapper(data)
    assert isinstance(shuffle_sort(lista), list)


def test_reduce():
    """reduce() folds a reducer over every element, yielding one value.

    input: the 3-element list returned by the mapper/shuffle stage.
    output: a list containing two lists and a pandas Series.
    """
    lista = mapper(data)
    lista2 = shuffle_sort(lista)
    assert isinstance(reduce(lista2), list)


def test_savetoCSV():
    """savetoCSV() writes the reduced data to a CSV report.

    input:
        data (list | pandas.Series): the reduced data to write.
        filename (str): name of the report file.
        fields (list): column names for the report.
        type_data (int): 1 if the data is a list, 2 if a pandas Series.
    output: a .csv file with the characteristics above.

    BUGFIX(review): same NameError/isinstance misuse as in
    Mapreduce_Walter102.py; assert the file was written instead.
    """
    lista = mapper(data)
    lista2 = shuffle_sort(lista)
    lista3 = reduce(lista2)
    filename = "top10tags.csv"
    fields = ["TAG", "COUNT"]
    type_data = 1
    savetoCSV(lista3, filename, fields, type_data)
    assert os.path.exists(filename)


# === file: dev_jeanvitola/Test_function/Mapreduce_walter_110.py ===
from wl_mapreduce_op import *
import pytest


post = "posts.xml"


def test_chunkify():
    assert isinstance(chunkify(xmlfile=post, number_of_chunks=16), list)


def test_mapper():
    lista = chunkify(xmlfile=post, number_of_chunks=16)
    assert isinstance(mapper(chunk=lista), list)


def test_shuffler():
    lista = chunkify(xmlfile=post, number_of_chunks=16)
    lista2 = mapper(chunk=lista)
    assert isinstance(shuffler(mapper=lista2), list)


def test_reduce():
    # NOTE(review): isinstance(x, object) is always True, so this only
    # checks that reduce() runs without raising — TODO assert a real type.
    lista = chunkify(xmlfile=post, number_of_chunks=16)
    lista2 = mapper(chunk=lista)
    lista3 = shuffler(mapper=lista2)
    assert isinstance(reduce(lista=lista3), object)


def test_savetoCSV():
    lista = chunkify(xmlfile=post, number_of_chunks=16)
    lista2 = mapper(chunk=lista)
    lista3 = shuffler(mapper=lista2)
    lista4 = reduce(lista=lista3)
    assert isinstance(savetoCSV(data=lista4, filename="OP_top10tags.csv",
                                fields=["TAG", "COUNT"]), list)


# === file: dev_jeanvitola/Test_function/Mapreduce_walter_110_Docstring.py ===
def test_chunkify():
    """chunkify() parses an .xml file into an object and partitions it.

    The rows are filtered with PostTypeId == 1 and no accepted answer
    (AcceptedAnswerId == None), then split into chunks.

    input: xmlfile — an .xml file, here posts.xml;
           number_of_chunks — number of partitions, 16 by default.
    output: a list containing the chunk objects.
    """
    assert isinstance(chunkify(xmlfile=post, number_of_chunks=16), list)


def test_mapper():
    """mapper() maps the data and returns one list per exercise.

    input: the chunk list of tags from chunkify().
    output: a list of tag lists (a list of lists).
    """
    lista = chunkify(xmlfile=post, number_of_chunks=16)
    assert isinstance(mapper(chunk=lista), list)


def test_shuffler():
    """shuffler() takes mapper()'s output (list of tag lists).

    output: a single list grouping all the tag elements together.
    """
    lista = chunkify(xmlfile=post, number_of_chunks=16)
    lista2 = mapper(chunk=lista)
    assert isinstance(shuffler(mapper=lista2), list)


def test_reduce():
    """reduce() folds a reducer over every element, yielding one value.

    input: the list of tag occurrences.
    output: an object holding the top-10 tag counts.
    NOTE(review): isinstance(x, object) is always True — see the plain
    suite; this only checks that reduce() runs without raising.
    """
    lista = chunkify(xmlfile=post, number_of_chunks=16)
    lista2 = mapper(chunk=lista)
    lista3 = shuffler(mapper=lista2)
    assert isinstance(reduce(lista=lista3), object)


def test_savetoCSV():
    """savetoCSV() writes the reduced data to a CSV report.

    input:
        data (list | pandas.Series): the reduced data to write.
        filename (str): name of the CSV file.
        fields (list): column names for the report.
    output: a .csv file with the characteristics above.
    """
    lista = chunkify(xmlfile=post, number_of_chunks=16)
    lista2 = mapper(chunk=lista)
    lista3 = shuffler(mapper=lista2)
    lista4 = reduce(lista=lista3)
    assert isinstance(savetoCSV(data=lista4, filename="OP_top10tags.csv",
                                fields=["TAG", "COUNT"]), list)