Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added dev_jeanvitola/Hadoop/Environment Setup.pdf
Binary file not shown.
125 changes: 125 additions & 0 deletions dev_jeanvitola/Mapreduce_Opt/Mapreduce_meanTime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import pandas as pd
from functools import reduce
import xml.etree.ElementTree as ET
import os
from collections import Counter
import datetime
import numpy
import time


#functions open files XML´s
def read_xml(file):
    """Parse the XML document at *file* and return its root element."""
    return ET.parse(file).getroot()

#Files in to chunks
def chunckify(file, chunks):
    """Yield successive slices of *file*, each at most *chunks* items long."""
    offset = 0
    total = len(file)
    while offset < total:
        yield file[offset:offset + chunks]
        offset += chunks



# Get Score by "PostTypeId == 1" Question

def score(file):
    """Extract (Id, Score, CreationDate) from a question element.

    Returns the tuple only when the element's PostTypeId is '1' (question);
    every other post type yields None, which the caller's Counter tallies.
    """
    attrs = file.attrib
    if attrs['PostTypeId'] != '1':
        return None
    created = datetime.datetime.strptime(attrs["CreationDate"], '%Y-%m-%dT%H:%M:%S.%f')
    return attrs['Id'], int(attrs['Score']), created

# Get score by "PostTypeId == 2" Answer

def score_2(file):
    """Extract (ParentId, CreationDate) from an answer element.

    Returns the tuple only when the element's PostTypeId is '2' (answer);
    every other post type yields None, which the caller's Counter tallies.
    """
    attrs = file.attrib
    if attrs['PostTypeId'] != '2':
        return None
    created = datetime.datetime.strptime(attrs["CreationDate"], '%Y-%m-%dT%H:%M:%S.%f')
    return attrs['ParentId'], created

#Functions Map() ==1
def map_post(data):
    """Apply score() to every element of one chunk and tally the results.

    Non-question rows all map to None, so the returned Counter carries one
    None bucket alongside the (id, score, date) tuples.
    """
    return Counter(map(score, data))


#Functions Map() ==2
def map_post_2(data):
    """Apply score_2() to every element of one chunk and tally the results.

    Non-answer rows all map to None, so the returned Counter carries one
    None bucket alongside the (parent_id, date) tuples.
    """
    return Counter(map(score_2, data))

# Merge function
def merge_date(D1, D2):
    """Fold counter D2 into D1 in place and return the mutated D1."""
    D1.update(D2)
    return D1

#Reducer questions
def reduce_date(iter):
    """Combine per-chunk question counters into a top-100 score DataFrame.

    *iter*: iterable of Counters whose keys are (post_id, score,
    creation_datetime) tuples from map_post, plus a None key counting every
    non-question row.

    Returns a DataFrame with columns count, post_id, post_score and
    change_datetime, sorted by post_score descending.

    BUG FIX: the original took most_common(101) and dropped row 0 by
    position, assuming the None bucket is always the single most common
    entry; if no None key existed it silently discarded a real post. The
    None bucket is now removed explicitly before ranking.
    """
    combined = Counter()
    for chunk_counts in iter:
        combined.update(chunk_counts)
    # Remove the non-question bucket explicitly (no-op when absent).
    combined.pop(None, None)
    top100 = combined.most_common(100)
    df = pd.DataFrame(top100, columns=['date', 'count'])
    # Unpack the key tuple into proper columns.
    df['post_id'] = df['date'].apply(lambda key: key[0])
    df['post_score'] = df['date'].apply(lambda key: key[1])
    df['change_datetime'] = df['date'].apply(lambda key: key[2])
    # Rank by score, best first.
    df = df.sort_values(by=['post_score'], ascending=False)
    df.drop(['date'], axis=1, inplace=True)
    return df

#Reducer Answer
def reduce_date_2(iter):
    """Combine per-chunk answer counters into a top-100 DataFrame.

    *iter*: iterable of Counters whose keys are (parent_id,
    creation_datetime) tuples from map_post_2, plus a None key counting
    every non-answer row.

    Returns a DataFrame with columns count, post_id and change_datetime.

    BUG FIX: the original took most_common(101) and dropped row 0 by
    position, assuming the None bucket is always the single most common
    entry; if no None key existed it silently discarded a real post. The
    None bucket is now removed explicitly before ranking.
    """
    combined = Counter()
    for chunk_counts in iter:
        combined.update(chunk_counts)
    # Remove the non-answer bucket explicitly (no-op when absent).
    combined.pop(None, None)
    top100 = combined.most_common(100)
    df2 = pd.DataFrame(top100, columns=['date', 'count'])
    # Unpack the key tuple into proper columns.
    df2['post_id'] = df2['date'].apply(lambda key: key[0])
    df2['change_datetime'] = df2['date'].apply(lambda key: key[1])
    df2.drop(['date'], axis=1, inplace=True)
    return df2

# join df and df2
def join_df(df, df2):
    """Join questions (*df*) and answers (*df2*) on post_id and report the
    mean question-to-answer delay.

    Both frames must carry a 'change_datetime' column; after the merge the
    question timestamp becomes change_datetime_x and the answer timestamp
    change_datetime_y.

    Returns the mean delay rounded to whole hours (also printed).

    BUG FIX: the original ended with `return join_df`, handing back the
    function object itself instead of the computed value, so main() printed
    a function repr.
    """
    df_join = pd.merge(df, df2, on='post_id', how='inner')
    # Answer time minus question time, in seconds.
    df_join['time_diff'] = df_join['change_datetime_y'] - df_join['change_datetime_x']
    df_join['time_diff'] = df_join['time_diff'].dt.total_seconds()
    mean_time_diff = df_join['time_diff'].mean()
    # Convert the mean from seconds to whole hours.
    mean_time_diff_hours = round(mean_time_diff / 3600)
    print(f"from the ranking of the 0-100 data by score,\n the average response time is {mean_time_diff_hours} hours")
    return mean_time_diff_hours

# create function main
def main():
    """Run the question/answer MapReduce pipeline over posts.xml and time it."""
    start = time.time()

    # Questions (PostTypeId == 1): map in chunks of 100, then reduce.
    question_chunks = chunckify(read_xml("posts.xml"), 100)
    question_counts = list(map(map_post, question_chunks))
    df = reduce_date(question_counts)

    # Answers (PostTypeId == 2): map in chunks of 50, then reduce.
    answer_chunks = chunckify(read_xml("posts.xml"), 50)
    answer_counts = list(map(map_post_2, answer_chunks))
    df2 = reduce_date_2(answer_counts)

    print(join_df(df, df2))

    end = time.time()
    print("Execution time: ", end - start)


if __name__ == '__main__':
    main()


91 changes: 91 additions & 0 deletions dev_jeanvitola/Mapreduce_Opt/Mapreduce_top10.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import pandas as pd
import time
from functools import reduce
import xml.etree.ElementTree as ET
import os
from collections import Counter
import datetime

# functions open files XML´s

# Module-load timestamp; main() reads this global to report total elapsed time.
start = time.time()
def read_xml(file):
    """Open the XML file at *file* and return the document's root element."""
    tree = ET.parse(file)
    return tree.getroot()

# Files in to chunks


def chunckify(file, chunks):
    """Yield *file* split into consecutive pieces of at most *chunks* items."""
    for offset in range(0, len(file), chunks):
        yield file[offset:offset + chunks]

# Convert to datetime and get attrib[date]


def date_post(alkemy):
    """Return the element's CreationDate reduced to a 'YYYY-MM-DD' string."""
    parsed = datetime.datetime.strptime(
        alkemy.attrib["CreationDate"], '%Y-%m-%dT%H:%M:%S.%f')
    return parsed.strftime('%Y-%m-%d')

# Functions Map()


def map_post(data):
    """Count how many posts in this chunk fall on each calendar day."""
    return Counter(date_post(element) for element in data)


# Reduce Function
"""
reduce(D1 --> D2 )"key" + sum "values"
Retorna una lista de los n elementos mas comunes y sus conteos, del mas común al menos común
"""


def merge_date(D1, D2):
    """Accumulate counter D2 into D1 in place; returns the mutated D1."""
    D1.update(D2)
    return D1


def reduce_date(iter):
    """Merge per-chunk date counters, print the 10 busiest posting dates and
    persist them to top10_date.csv.

    *iter*: iterable of Counters mapping 'YYYY-MM-DD' strings to post counts.

    Returns the top-10 DataFrame (columns: date, count).

    BUG FIX: the original built the DataFrame but never returned it, so
    callers had no way to reach the result.
    """
    totals = Counter()
    for partial in iter:
        totals.update(partial)
    top10 = totals.most_common(10)
    df = pd.DataFrame(top10, columns=['date', 'count'])
    # default=0 keeps the report from crashing when there is no data at all.
    longest = max((len(word) for word, count in top10), default=0)
    for word, count in top10:
        print('{word:<{len}}: {count:5}'.format(
            len=longest + 1,
            word=word,
            count=count)
        )
    print("by : Jeanvitola")
    # save df to csv
    df.to_csv("top10_date.csv", index=False)
    return df

#create main function


def main():
    """Run the date-count MapReduce over posts.xml and report elapsed time.

    Elapsed time is measured from the module-level `start` timestamp, so it
    includes import time as well.
    """
    root = read_xml("posts.xml")
    chunks = chunckify(root, 50)
    mapped = list(map(map_post, chunks))
    reduce_date(mapped)
    end = time.time()
    print("Execution time: ", end - start)


if __name__ == '__main__':
    main()



116 changes: 116 additions & 0 deletions dev_jeanvitola/Mapreduce_Opt/Mapreduce_viewAnswer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import pandas as pd
from functools import reduce
import xml.etree.ElementTree as ET
import os
from collections import Counter
import datetime
import numpy
#functions open files XML´s
def read_xml(file):
    """Parse *file* as XML and hand back the root element of the document."""
    parsed = ET.parse(file)
    return parsed.getroot()

#Files in to chunks
def chunckify(file, chunks):
    """Yield consecutive slices of *file*, each holding up to *chunks* items."""
    position = 0
    while position < len(file):
        yield file[position:position + chunks]
        position += chunks

# relation between in viewcount and answerscount

#Viewcounts

def view_count(file):
    """Return the ViewCount of a question element as int, or None otherwise.

    None is returned both for non-question rows (PostTypeId != '1') and for
    questions with a missing or non-numeric ViewCount attribute; the
    caller's Counter aggregates those misses under a single None key.

    BUG FIX: the original executed `return views` even when `views` was
    never assigned (non-question rows, or when the bare `except:` swallowed
    a missing ViewCount with a no-op `None` expression), raising
    UnboundLocalError at runtime.
    """
    if file.attrib['PostTypeId'] != '1':
        return None
    try:
        return int(file.attrib['ViewCount'])
    except (KeyError, ValueError):
        return None


#AnswerCount

def get_post_views(data):
    """Return the element's AnswerCount as int, or None when unavailable.

    Any failure (missing attribute, non-numeric value) maps to None so the
    caller's Counter can tally the misses in one bucket.
    """
    try:
        return int(data.attrib['AnswerCount'])
    except Exception:
        return None

#Mappers
#Functions Map() == Views
def map_post(data):
    """Tally question ViewCount values for one chunk (misses under None)."""
    return Counter(map(view_count, data))


#Functions Map() == Answers
def mapper(data):
    """Tally AnswerCount values for one chunk (misses under None)."""
    return Counter(get_post_views(element) for element in data)

# Merge function
def merge_date(D1, D2):
    """Add counter D2's counts into D1 in place and return the mutated D1."""
    D1.update(D2)
    return D1

#create function reduce for viewcount and answercount
#ViewCounts
def reduce_date(iter):
    """Merge the per-chunk ViewCount counters into one frequency DataFrame.

    Returns a DataFrame with the integer view-count value in column
    'ViewCounts' and its frequency in column 0, after removing the None
    bucket (non-questions / missing ViewCount) and the zero-views bucket.
    """
    totals = reduce(merge_date, iter)
    frame = pd.DataFrame.from_dict(totals, orient='index')
    frame.reset_index(inplace=True)
    # Drop the None bucket before the int cast, which would otherwise fail.
    frame.dropna(inplace=True)
    frame['index'] = frame['index'].astype(int)
    frame.rename(columns={'index': 'ViewCounts'}, inplace=True)
    # Posts that were never viewed carry no signal for this analysis.
    frame = frame[frame.ViewCounts != 0]
    return frame

def reduce_date_2(iter):
    """Merge the per-chunk AnswerCount counters into one frequency DataFrame.

    Returns a DataFrame with the integer answer-count value in column
    'counter_views' and its frequency in column 'views', with the None
    bucket (missing AnswerCount) removed.
    """
    totals = reduce(merge_date, iter)
    frame = pd.DataFrame.from_dict(totals, orient='index')
    frame.reset_index(inplace=True)
    # Drop the None bucket before the int cast, which would otherwise fail.
    frame.dropna(inplace=True)
    frame.rename(columns={'index': 'counter_views', 0: 'views'}, inplace=True)
    frame['counter_views'] = frame['counter_views'].astype(int)
    return frame

#create function union df1 and df2
def union_df(df1,df2):
    """Outer-merge the view-count and answer-count frequency tables on 'ViewCounts'.

    NOTE(review): reduce_date_2() names its columns 'counter_views' and
    'views' — df2 has no 'ViewCounts' column, so this merge raises KeyError
    as written. Confirm the intended join key (left_on/right_on?). The
    function is currently never called from main().
    """
    df_union = pd.merge(df1,df2,on='ViewCounts',how='outer')
    df_union.dropna(inplace=True)
    df_union['counter_views']=df_union['counter_views'].astype(int)
    return df_union

#create function main
def main():
    """Build view-count and answer-count frequency tables from posts.xml.

    The view-count table (df1) is computed but not printed; only the
    answer-count table (df2) is shown.
    """
    root = read_xml("posts.xml")
    # ViewCount frequencies (questions only).
    view_maps = list(map(map_post, chunckify(root, 50)))
    df1 = reduce_date(view_maps)
    # AnswerCount frequencies.
    answer_maps = list(map(mapper, chunckify(root, 50)))
    df2 = reduce_date_2(answer_maps)
    print(df2)


# create function to create a new column with the relation between views and answers
if __name__ == '__main__':
    main()

Binary file added dev_jeanvitola/Mapreduce_Opt/Mean_time_after.PNG
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added dev_jeanvitola/Mapreduce_Opt/Mean_time_before.PNG
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
31 changes: 31 additions & 0 deletions dev_jeanvitola/Test_function/Mapreduce_Walter102.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from wl_map_reduce import *

import pytest


# Path to the Stack Overflow posts dump shared by every test below.
data= "posts.xml"

def test_mapper():
    """mapper() must turn the raw posts file into a list."""
    result = mapper(data)
    assert isinstance(result, list)


def test_shuffle_sort():
    """shuffle_sort() must keep the mapper output as a list."""
    mapped = mapper(data)
    sorted_output = shuffle_sort(mapped)
    assert isinstance(sorted_output, list)


def test_reduce():
    """reduce() (from wl_map_reduce's star import) must return a list."""
    mapped = mapper(data)
    grouped = shuffle_sort(mapped)
    reduced = reduce(grouped)
    assert isinstance(reduced, list)


def test_savetoCSV():
    # NOTE(review): the assertion below is broken and will always error —
    # `data_frame` is not defined anywhere in this module (NameError), and
    # isinstance() requires a type or tuple of types as its second argument,
    # not the result of a to_csv(...) call. Decide what savetoCSV is meant
    # to return and assert against that instead.
    lista = mapper(data)
    lista2 = shuffle_sort(lista)
    lista3 = reduce(lista2)
    data2 = lista3
    filename = "top10tags.csv"
    fields = ["TAG", "COUNT"]
    type_data = 1
    assert isinstance(savetoCSV(data2,filename,fields,type_data), data_frame.to_csv(index=False))
Loading