# Spaces: Runtime error
import json
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from dotenv import dotenv_values
from keras.layers import Input, Dense, Flatten
from keras.models import Model
from sklearn.metrics import mean_squared_error
from sklearn.metrics import precision_recall_fscore_support
from sklearn.model_selection import train_test_split
# from tensorflow.python.ops.confusion_matrix import confusion_matrix

from Database import Database
class Autoencoder:
    """Dense autoencoder used as a reconstruction-error classifier.

    The model is fitted on ``embedding_pos`` vectors only. At evaluation time
    a sample whose reconstruction error is above the chosen threshold is
    predicted as class 0; every other sample is predicted as class 1.
    """

    def __get_autoencoder(self, input_dim) -> "Model":
        """Build and compile a 128-64-128 dense autoencoder.

        Args:
            input_dim: Length of each input vector; also the output width.

        Returns:
            A Keras model compiled with the Adam optimizer and MSE loss.
        """
        input_layer = Input(shape=(input_dim,))

        # Encoder layers
        encoder = Flatten()(input_layer)
        encoder = Dense(128, activation='relu')(encoder)
        encoder = Dense(64, activation='relu')(encoder)

        # Decoder layers
        decoder = Dense(128, activation='relu')(encoder)
        # NOTE(review): a sigmoid output is bounded to (0, 1); confirm the
        # stored embeddings are scaled into that range.
        decoder = Dense(input_dim, activation='sigmoid')(decoder)

        # Autoencoder model
        autoencoder = Model(inputs=input_layer, outputs=decoder)
        autoencoder.compile(optimizer='adam', loss='mse')
        return autoencoder

    def __print_summary(self, model: "Model"):
        """Print the layer summary of ``model``."""
        # summary() prints by itself and returns None, so it must not be
        # wrapped in print().
        model.summary()

    def __fit_autoencoder(self, epochs, batch_size, model: "Model", train_var, valid_var=None):
        """Fit ``model`` to reproduce ``train_var``.

        Args:
            epochs: Number of training epochs.
            batch_size: Mini-batch size.
            model: Compiled model to train.
            train_var: Training samples, used as both input and target.
            valid_var: Optional validation samples, used as both input and
                target when given.

        Returns:
            Tuple ``(history, model)`` with the object returned by ``fit``
            and the trained model.
        """
        validation_data = None if valid_var is None else (valid_var, valid_var)
        history = model.fit(
            train_var,
            train_var,
            validation_data=validation_data,
            epochs=epochs,
            batch_size=batch_size,
        )
        return history, model

    def __split_train_test_val(self, data):
        """Split ``data`` into train, validation and test parts.

        20% of the data becomes the test part; 10% of the remainder becomes
        the validation part. Both splits use ``random_state=42``.

        Returns:
            Tuple ``(train_array, valid_array, test_array)``.
        """
        train_array, test_array = train_test_split(data, test_size=0.2, random_state=42)
        train_array, valid_array = train_test_split(train_array, test_size=0.1, random_state=42)
        return train_array, valid_array, test_array

    def __compute_metrics(self, conf_matrix):
        """Compute precision, recall and F1 from a 2x2 confusion matrix.

        Args:
            conf_matrix: Indexable as ``conf_matrix[true][predicted]``, with
                index 1 being the positive class.

        Returns:
            Tuple ``(precision, recall, f1)``. A metric whose denominator is
            zero is reported as ``0.0`` instead of raising or yielding NaN.
        """
        true_pos = conf_matrix[1][1]
        false_pos = conf_matrix[0][1]
        false_neg = conf_matrix[1][0]

        predicted_pos = true_pos + false_pos
        actual_pos = true_pos + false_neg
        precision = true_pos / predicted_pos if predicted_pos else 0.0
        # Original behaviour: dump the matrix whenever precision is exactly 1.
        if precision == 1:
            print(conf_matrix)
        recall = true_pos / actual_pos if actual_pos else 0.0

        pr_sum = precision + recall
        f1 = (2 * precision * recall) / pr_sum if pr_sum else 0.0
        return precision, recall, f1

    def __find_optimal_modified(self, error_df: pd.DataFrame, steps=50):
        """Score the midpoint between the smallest and largest error.

        Args:
            error_df: Frame with ``Reconstruction_error`` and ``True_class``
                columns.
            steps: Unused; kept so the signature matches ``__find_optimal``.

        Returns:
            Tuple ``(threshold, precision, recall, f1)`` with macro-averaged
            metrics.
        """
        errors = error_df["Reconstruction_error"]
        optimal_threshold = (errors.min() + errors.max()) / 2
        # Error above the threshold -> class 0, otherwise class 1.
        y_pred = np.where(errors.values > optimal_threshold, 0, 1)
        precision, recall, f1, _ = precision_recall_fscore_support(
            error_df.True_class, y_pred, average='macro')
        return optimal_threshold, precision, recall, f1

    def __find_optimal(self, error_df: pd.DataFrame, steps=50):
        """Search for the threshold with the highest binary F1.

        ``steps`` evenly spaced candidates between the smallest and largest
        reconstruction error (both included) are tried, so the search adapts
        to the actual error range instead of using a fixed increment.

        Args:
            error_df: Frame with ``Reconstruction_error`` and ``True_class``
                columns.
            steps: Number of candidate thresholds to evaluate (at least 1).

        Returns:
            Tuple ``(threshold, precision, recall, f1)`` for the best
            candidate. If no candidate has F1 above 0, the smallest error is
            returned with all metrics equal to 0.
        """
        errors = error_df["Reconstruction_error"]
        min_error, max_error = errors.min(), errors.max()
        error_values = errors.values

        optimal_threshold = min_error
        max_f1 = 0
        max_pr = 0
        max_re = 0
        for threshold in np.linspace(min_error, max_error, num=max(int(steps), 1)):
            # Error above the threshold -> class 0, otherwise class 1.
            y_pred = np.where(error_values > threshold, 0, 1)
            precision, recall, f1, _ = precision_recall_fscore_support(
                error_df.True_class, y_pred, average='binary')
            if f1 > max_f1:
                max_f1 = f1
                optimal_threshold = threshold
                max_pr = precision
                max_re = recall
        print(f"Result optimal_threshold={optimal_threshold}, max_precision={max_pr}, max_recall={max_re}, max_f1={max_f1}")
        return optimal_threshold, max_pr, max_re, max_f1

    def __split_by_percent(self, data, percent):
        """Split ``data`` with ``percent`` of it going to the second part.

        Args:
            data: Sequence to split.
            percent: Fraction passed to ``train_test_split`` as ``test_size``.

        Returns:
            The ``[first_part, second_part]`` list from ``train_test_split``
            (``random_state=42``).
        """
        return train_test_split(data, test_size=percent, random_state=42)

    def train_autoencoder(self, epochs=25, batch_size=32):
        """Train on positive embeddings, evaluate, and write the results.

        Loads "Extract Method" documents from the collection named by
        ``COLLECTION_NAME`` in ``.env``, trains on 70% of the positive
        embeddings, and evaluates on the remaining positives (label 1) plus
        30% of the negatives (label 0). The model, the metrics and the loss
        curve are written to ``./results``.

        Args:
            epochs: Number of training epochs.
            batch_size: Mini-batch size.

        Raises:
            ValueError: If the query returns no documents.
        """
        # 768 is the input width; the original comment labels it GraphCodeBERT.
        autoencoder = self.__get_autoencoder(768)
        self.__print_summary(autoencoder)

        # DB
        db = Database(dotenv_values(".env")['COLLECTION_NAME'])
        pos_emb_list, neg_emb_list = [], []
        for doc in db.find_docs({"refactoring_type": "Extract Method"}):
            pos_emb_list.append(doc['embedding_pos'])
            neg_emb_list.append(doc['embedding_neg'])
        if not pos_emb_list:
            raise ValueError('No "Extract Method" documents found; nothing to train on.')

        pos_emb_list_train, pos_emb_list_test = self.__split_by_percent(pos_emb_list, 0.3)
        _, neg_emb_list_test = self.__split_by_percent(neg_emb_list, 0.3)
        x_train = np.array(pos_emb_list_train)
        x_test = np.array(pos_emb_list_test + neg_emb_list_test)
        y_test = np.array([1] * len(pos_emb_list_test) + [0] * len(neg_emb_list_test))

        history, trained_model = self.__fit_autoencoder(epochs, batch_size, autoencoder, x_train)

        # Every output below lands in ./results, so make sure it exists.
        os.makedirs('./results', exist_ok=True)
        trained_model.save('./results/autoencoder_' + str(epochs) + '.hdf5')

        # Test
        test_predict = trained_model.predict(x_test)
        mse = np.mean(np.power(x_test - test_predict, 2), axis=1)
        error_df = pd.DataFrame({'Reconstruction_error': mse,
                                 'True_class': y_test})
        print("Max: ", error_df["Reconstruction_error"].max())
        print("Min: ", error_df["Reconstruction_error"].min())

        optimal_threshold, precision, recall, f1 = self.__find_optimal_modified(error_df, 100)
        print(f"Result optimal_threshold={optimal_threshold}, max_precision={precision}, max_recall={recall}, max_f1={f1}")

        # Convert to built-in floats so json.dump accepts any NumPy scalar type.
        metrics = {
            "Threshold": float(optimal_threshold),
            "Precision": float(precision),
            "Recall": float(recall),
            "F1": float(f1),
        }
        with open('./results/metrics.json', 'w') as fp:
            json.dump(metrics, fp)

        plt.plot(history.history['loss'])
        plt.savefig("./results/training_graph.png")
# Script entry point: build the trainer and run train_autoencoder once.
if __name__ == "__main__":
    trainer = Autoencoder()
    trainer.train_autoencoder()