from datetime import datetime
import json
import networkx as nx
import numpy as np
import os
import pandas as pd
import seaborn as sns
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.cluster import KMeans, DBSCAN
from sklearn.neighbors import NearestNeighbors
import warnings
def _check_folder(settings):
    """Ensures `settings['export_folder']` points to an existing directory; if it is not set, creates ./experiments/<timestamp> and saves the settings there."""
    selected_dir = settings.get('export_folder')
if selected_dir:
if not os.path.isdir(selected_dir):
            raise ValueError("The selected export directory does not exist ({}). "
                             "Please choose another directory or create this one.".format(selected_dir))
return settings
else:
if not os.path.isdir('./experiments/'):
os.mkdir('./experiments/')
settings['export_folder'] = './experiments/{}'.format(datetime.now())
os.mkdir(settings['export_folder'])
with open(os.path.join(settings['export_folder'], 'settings_{}.json'.format(datetime.now())), 'w') as f:
json.dump(settings, f)
return settings
def _str_agg(x):
    """Joins an iterable of event names into a single space-separated string."""
    return ' '.join(x)
def prepare_dataset(df, target_events, event_filter=None, n_start_events=None):
"""
    Prepares data for classifier inference
    :param df: data from BQ or your own (clickstream). Should have at least three columns: `event_name`,
        `event_timestamp` and `user_pseudo_id`
    :param target_events: names of events that signal the target
        (e.g. for prediction of lost users it will be `lost`)
    :param event_filter: list of events to exclude from the analysis
    :param n_start_events: number of events to keep from the start of each user's trajectory
    :type df: pd.DataFrame
    :type target_events: Union[list, None]
    :type event_filter: list or other iterable
    :type n_start_events: int
    :return: prepared data for inference (each user's events glued into one trajectory)
    :rtype: pd.DataFrame
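
    Example (illustrative sketch; the clickstream values below are hypothetical)::

        clickstream = pd.DataFrame({
            'user_pseudo_id': ['u1', 'u1', 'u1', 'u2', 'u2'],
            'event_name': ['main', 'feed', 'lost', 'main', 'passed'],
            'event_timestamp': [1, 2, 3, 1, 2],
        })
        train = prepare_dataset(clickstream, target_events=['lost', 'passed'])
        # train.event_name -> 'main feed' and 'main'; train.target -> True for both users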
"""
if event_filter is not None:
df = df[~df.event_name.isin(event_filter)]
df = df.sort_values('event_timestamp')
train = df.groupby('user_pseudo_id').event_name.agg(_str_agg)
train = train.reset_index()
if target_events or n_start_events:
train.event_name = train.event_name.apply(lambda x: x.split())
if target_events:
train['target'] = train.event_name.apply(lambda x: x[-1] in target_events)
train.event_name = train.event_name.apply(lambda x: x[:-1])
if n_start_events:
train.event_name = train.event_name.apply(lambda x: ' '.join(x[:n_start_events]))
else:
train.event_name = train.event_name.apply(lambda x: ' '.join(x))
return train
def get_agg(df, agg_type):
"""
    Creates aggregates (weights) of the time spent on graph edges
    :param df: clickstream with `event_name`, `next_event` and `time_to_next_event` columns
        (e.g. the output of retentioneering.analysis.get_shift)
    :param agg_type: type of aggregate, should be written in the form `'name' + '_' + aggregate type`
        (e.g. `trans_count`, where `trans` is the name and `count` is the aggregate type).
        Aggregate types can be: max, min, mean, median, std, count. For a longer list, please
        check the pd.DataFrame.groupby().agg() documentation
    :type df: pd.DataFrame
    :type agg_type: str
    :return: table with aggregates for the edges of the graph
    :rtype: pd.DataFrame
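
    Example (illustrative sketch; `clickstream` is a hypothetical DataFrame as described above)::

        shifted = get_shift(clickstream)              # adds next_event / time_to_next_event
        trans_mean = get_agg(shifted, 'trans_mean')   # mean time for every (event_name, next_event) pair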
"""
agg = df.groupby(['event_name', 'next_event'], as_index=False)
agg = agg['time_to_next_event'].agg(agg_type.split('_')[1])
agg.columns = ['event_name', 'next_event', agg_type]
return agg
def get_shift(df):
"""
    Creates `next_event` and `time_to_next_event` columns by pairing each event with the
    following event of the same user
    :param df: data from BQ or your own (clickstream). Should have at least three columns: `event_name`,
        `event_timestamp` and `user_pseudo_id`
    :type df: pd.DataFrame
    :return: source table with the additional columns (each user's last event is dropped,
        as it has no successor)
    :rtype: pd.DataFrame
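
    Example (illustrative sketch; `clickstream` is a hypothetical DataFrame as described above)::

        shifted = get_shift(clickstream)
        # every row now also holds the user's next event and the time until it;
        # each user's final event is dropped because it has no successor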
"""
df = df.sort_values(['user_pseudo_id', 'event_timestamp']) \
.reset_index(drop=True)
shift = df.groupby('user_pseudo_id').shift(-1)
df['next_event'] = shift.event_name
df['time_to_next_event'] = (shift.event_timestamp - df.event_timestamp)
df = df[df.next_event.notnull()]
return df
def get_all_agg(df, agg_list):
"""
    Creates aggregates (weights) of the time spent on graph edges for each aggregate in agg_list
    :param df: data from BQ or your own (clickstream). Should have at least three columns: `event_name`,
        `event_timestamp` and `user_pseudo_id`
    :param agg_list: list of needed aggregates; each aggregate should be written in the form
        `'name' + '_' + aggregate type` (e.g. `trans_count`, where `trans` is the name and `count`
        is the aggregate type). Aggregate types can be: max, min, mean, median, std, count.
        For a longer list, please check the pd.DataFrame.groupby().agg() documentation
    :type df: pd.DataFrame
    :type agg_list: List[str]
    :return: table with aggregates for the edges of the graph
    :rtype: pd.DataFrame
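
    Example (illustrative sketch; `clickstream` is a hypothetical DataFrame as described above)::

        edges = get_all_agg(clickstream, ['trans_count', 'time_mean'])
        # one row per (event_name, next_event) pair with `trans_count` and `time_mean` columns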
"""
if 'next_event' not in df.columns:
df = get_shift(df)
df_result = get_agg(df, agg_list[0])
for agg in agg_list[1:]:
df_result[agg] = get_agg(df, agg)[agg]
return df_result
def get_adjacency(df, adj_type):
"""
    Creates a graph adjacency matrix from the table with edge aggregates
    :param df: table with aggregates (from retentioneering.analysis.get_all_agg function)
    :param adj_type: name of the column used to weight the graph edges (column name from df)
    :type df: pd.DataFrame
    :type adj_type: str
    :return: adjacency matrix
    :rtype: pd.DataFrame
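
    Example (illustrative sketch; `clickstream` and the event names are hypothetical)::

        edges = get_all_agg(clickstream, ['trans_count'])
        adj = get_adjacency(edges, 'trans_count')
        # adj.loc['main', 'feed'] is the number of observed main -> feed transitions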
"""
df = df.copy()
event_set = set(df.event_name.unique())
event_set.update(df.next_event.unique())
event_num = len(event_set)
event_to_id = dict(zip(event_set, np.arange(event_num)))
df.event_name = df.event_name.apply(event_to_id.get)
df.next_event = df.next_event.apply(event_to_id.get)
if 'count' in adj_type:
adj = np.zeros((event_num, event_num))
else:
adj = -np.ones((event_num, event_num))
for i in df.iterrows():
adj[int(i[1].event_name), int(i[1].next_event)] = i[1][adj_type]
names = sorted(event_to_id, key=event_to_id.get)
adj = pd.DataFrame(adj, columns=names, index=names)
adj = adj.round(2)
return adj
def get_accums(agg, name, max_rank):
"""
    Creates accumulator variables (cumulative frequency of an event over steps)
    :param agg: counts of events by step, with columns `event_rank`, `event_name` and `freq`
    :param name: name of the event to accumulate
    :param max_rank: number of steps in the pivot
    :type agg: pd.DataFrame
    :type name: str
    :type max_rank: int
    :return: accumulator variable
    :rtype: pd.DataFrame
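
    Example (illustrative sketch; `agg` holds the per-step event counts built in get_desc_table)::

        acc = get_accums(agg, 'lost', max_rank=10)
        # cumulative frequency of `lost` over steps, labelled 'Accumulated Lost'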
"""
lost = pd.DataFrame([[0, 0]], columns=['event_rank', 'freq']) \
.append(agg.loc[agg.event_name == name, ['event_rank', 'freq']])
lost['event_rank'] += 1
lost = lost.sort_values('event_rank')
missed_rows = []
k = 0
for i, row in enumerate(lost.itertuples()):
for j in range(row.event_rank - (i + 1) - k):
missed_rows.append([i + k + 1, 0])
k += 1
lost = lost.append(pd.DataFrame(missed_rows, columns=['event_rank', 'freq']))
lost = lost.sort_values('event_rank')
lost.freq = lost.freq.cumsum()
while lost['event_rank'].max() < max_rank:
lost = lost.append(
pd.DataFrame([[lost['event_rank'].iloc[-1] + 1, lost.freq.iloc[-1]]], columns=['event_rank', 'freq']),
sort=True
)
lost['event_name'] = 'Accumulated ' + name.capitalize()
return lost
def get_desc_table(df, settings, target_event_list=list(['lost', 'passed']), max_steps=None, plot=True, plot_name=None):
"""
Builds distribution of events over steps
:param df: data from BQ or your own (clickstream). Should have at least three columns: `event_name`,
`event_timestamp` and `user_pseudo_id`
:param settings: experiment config (can be empty dict here)
:param target_event_list: list of target events
    :param max_steps: maximum number of steps (event ranks) to include in the table
    :param plot: if True, a heatmap of the table is plotted and saved
    :param plot_name: name suffix for the saved plot file
:type df: pd.DataFrame
:type settings: dict
:type target_event_list: list
:type max_steps: int
:type plot: bool
:type plot_name: str
:return: Pivot table with distribution of events over steps
:rtype: pd.DataFrame
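
    Example (illustrative sketch; assumes a clickstream with `lost` / `passed` terminal events)::

        piv = get_desc_table(clickstream, settings={}, max_steps=10, plot=False)
        # piv.loc['feed', 3] is the share of users whose 3rd event was `feed` (event name is hypothetical)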
"""
# create ranks and count
df = df.sort_values(['user_pseudo_id', 'event_timestamp']).copy()
df['event_rank'] = 1
df['event_rank'] = df.groupby('user_pseudo_id')['event_rank'].cumsum()
if max_steps:
df = df.loc[df['event_rank'] <= max_steps, :]
agg = df.groupby(['event_rank', 'event_name'], as_index=False)['user_pseudo_id'].count()
agg.columns = ['event_rank', 'event_name', 'freq']
tot_cnt = agg[agg['event_rank'] == 1].freq.sum()
# add accumulated rows
max_rank = agg['event_rank'].max() + 1
for i in target_event_list:
agg = agg.append(get_accums(agg, i, max_rank), sort=True)
# build pivot
agg['freq'] = agg['freq'] / tot_cnt
piv = agg.pivot(index='event_name', columns='event_rank', values='freq').fillna(0)
piv.columns.name = None
piv.index.name = None
piv = piv.round(2)
if max_steps:
piv = piv.T[piv.columns <= max_steps].T
if plot:
# create folder for experiment if doesn't exists
settings = _check_folder(settings)
export_folder = settings['export_folder']
sns.mpl.pyplot.figure(figsize=(20, 10))
heatmap = sns.heatmap(piv, annot=True, cmap="YlGnBu")
if plot_name:
filename = os.path.join(export_folder, 'desc_table_{}.png'.format(plot_name))
else:
filename = os.path.join(export_folder, 'desc_table_{}.png'.format(datetime.now()))
heatmap.get_figure().savefig(filename)
return piv
def get_diff(df_old, df_new, settings, precalc=False, plot=True, plot_name=None):
"""
    Computes the difference in event distribution between two versions (groups)
    :param df_old: raw clickstream or a precalculated desc table for the previous version
    :param df_new: raw clickstream or a precalculated desc table for the new version
:param settings: experiment config (can be empty dict here)
    :param precalc: if True, df_old and df_new are treated as already-built desc tables
    :param plot: if True, a heatmap of the differences is plotted and saved
    :param plot_name: name suffix for the saved plot file
:type df_old: pd.DataFrame
:type df_new: pd.DataFrame
:type settings: dict
:type precalc: bool
:type plot: bool
:type plot_name: str
:return: Table of differences between two versions
:rtype: pd.DataFrame
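
    Example (illustrative sketch; `old_clickstream` and `new_clickstream` are hypothetical DataFrames)::

        diff = get_diff(old_clickstream, new_clickstream, settings={}, plot=False)
        # positive values mean the event became more frequent at that step in the new version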
"""
if precalc:
desc_new = df_new
desc_old = df_old
else:
desc_old = get_desc_table(df_old, settings, plot=False)
desc_new = get_desc_table(df_new, settings, plot=False)
old_id = set(desc_old.index)
new_id = set(desc_new.index)
if old_id != new_id:
for idx in new_id - old_id:
row = pd.Series([0] * desc_old.shape[1], name=idx)
row.index += 1
desc_old = desc_old.append(row, sort=True)
for idx in old_id - new_id:
row = pd.Series([0] * desc_new.shape[1], name=idx)
row.index += 1
desc_new = desc_new.append(row, sort=True)
max_old = desc_old.shape[1]
max_new = desc_new.shape[1]
    # pad the shorter table by repeating its last column so both cover the same number of steps
    if max_old < max_new:
        for i in range(max_old + 1, max_new + 1):
            desc_old[i] = desc_old[i - 1]
    elif max_old > max_new:
        for i in range(max_new + 1, max_old + 1):
            desc_new[i] = desc_new[i - 1]
diff = desc_new - desc_old
diff = diff.sort_index(axis=1)
if plot:
settings = _check_folder(settings)
export_folder = settings['export_folder']
sns.mpl.pyplot.figure(figsize=(20, 10))
heatmap = sns.heatmap(diff, annot=True, cmap="YlGnBu")
if plot_name:
filename = os.path.join(export_folder, 'desc_table_{}.png'.format(plot_name))
else:
filename = os.path.join(export_folder, 'desc_table_{}.png'.format(datetime.now()))
heatmap.get_figure().savefig(filename)
return diff
def plot_graph_python(df_agg, agg_type, settings, layout=nx.random_layout, plot_name=None):
"""
Visualize trajectories from aggregated tables (with python)
:param df_agg: table with aggregates (from retentioneering.analysis.get_all_agg function)
    :param agg_type: name of the column used to weight the graph edges (column name from df_agg)
    :param settings: experiment config (can be empty dict here)
    :param layout: networkx layout function used to position the graph nodes
:param plot_name: name of file with graph plot
:type df_agg: pd.DataFrame
:type agg_type: str
:type settings: dict
:type layout: func
:type plot_name: str
:return: None
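
    Example (illustrative sketch; note that this function is deprecated)::

        edges = get_all_agg(clickstream, ['trans_count'])
        plot_graph_python(edges, 'trans_count', settings={})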
"""
    warnings.warn('Please use retentioneering.visualization.plot.plot_graph instead', DeprecationWarning)
edges = df_agg.loc[:, ['event_name', 'next_event', agg_type]]
G = nx.DiGraph()
G.add_weighted_edges_from(edges.values)
width = [G.get_edge_data(i, j)['weight'] for i, j in G.edges()]
width = np.array(width)
if len(np.unique(width)) != 1:
width = (width - width.min()) / (np.mean(width) - width.min())
width *= 2
width = np.where(width > 15, 15, width)
width = np.where(width < 2, 2, width)
else:
width = width * 3 / max(width)
pos = layout(G, seed=2)
f = sns.mpl.pyplot.figure(figsize=(20, 10))
nx.draw_networkx_edges(G, pos, edge_color='b', alpha=0.2, width=width)
nx.draw_networkx_nodes(G, pos, node_color='b', alpha=0.3)
pos = {k: [pos[k][0], pos[k][1] + 0.03] for k in pos.keys()}
nx.draw_networkx_labels(G, pos, node_color='b', font_size=16)
sns.mpl.pyplot.axis('off')
settings = _check_folder(settings)
export_folder = settings['export_folder']
if plot_name:
filename = os.path.join(export_folder, 'graphvis_{}.png'.format(plot_name))
else:
filename = os.path.join(export_folder, 'graphvis_{}.png'.format(datetime.now()))
f.savefig(filename)
def plot_frequency_map(df, settings, target_events=list(['lost', 'passed']), plot_name=None):
"""
    Plots frequency histogram and heatmap of users' event counts
:param df: data from BQ or your own (clickstream). Should have at least three columns: `event_name`,
`event_timestamp` and `user_pseudo_id`
:param settings: experiment config (can be empty dict here)
    :param target_events: names of events that signal the target
        (e.g. for prediction of lost users it will be `lost`)
    :param plot_name: name suffix for the saved plot files
:return: table with counts of events for users
:type df: pd.DataFrame
:type settings: dict
:type target_events: List[str]
:type plot_name: str
:rtype: pd.DataFrame
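
    Example (illustrative sketch; assumes the clickstream contains the default `lost` / `passed` events)::

        countmap = plot_frequency_map(clickstream, settings={})
        # countmap: one row per user, one column per non-target event, values are event counts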
"""
users = df.user_pseudo_id[df.event_name.isin(target_events)].unique()
df = df[df.user_pseudo_id.isin(users)]
data = prepare_dataset(df, '')
cv = CountVectorizer()
x = cv.fit_transform(data.event_name.values).todense()
cols = cv.inverse_transform(np.ones(df.event_name.nunique() - len(target_events)))[0]
x = pd.DataFrame(x, columns=cols, index=data.user_pseudo_id)
nodes_hist = df.groupby('event_name',
as_index=False).event_timestamp.count().sort_values('event_timestamp',
ascending=False)
nodes_hist.event_name = nodes_hist.event_name.apply(lambda x: x.lower())
sorted_cols = nodes_hist.event_name[~nodes_hist.event_name.isin(target_events)].values
x = x.loc[:, sorted_cols]
sns.mpl.pyplot.figure(figsize=[8, 5])
bar = sns.barplot(nodes_hist.event_name.values, nodes_hist.event_timestamp.values, palette='YlGnBu')
bar.set_xticklabels(bar.get_xticklabels(), rotation=90)
settings = _check_folder(settings)
export_folder = settings['export_folder']
if plot_name:
barname = os.path.join(export_folder, 'bar_{}.png'.format(plot_name))
heatname = os.path.join(export_folder, 'countmap_{}.png'.format(plot_name))
else:
barname = os.path.join(export_folder, 'bar_{}.png'.format(datetime.now()))
heatname = os.path.join(export_folder, 'countmap_{}.png'.format(datetime.now()))
bar.get_figure().savefig(barname)
sns.mpl.pyplot.figure(figsize=[10, 15])
heatmap = sns.heatmap(x.values, cmap="YlGnBu")
heatmap.get_figure().savefig(heatname)
return x
def plot_clusters(data, countmap, target_events=list(['lost', 'passed']), n_clusters=None, plot_cnt=2, width=10,
height=5):
"""
    Plots pie charts with the distribution of target events across the largest clusters
:param data: data from BQ or your own (clickstream). Should have at least three columns: `event_name`,
`event_timestamp` and `user_pseudo_id`
:param countmap: result of retentioneering.analysis.utils.plot_frequency_map
    :param target_events: names of events that signal the target
        (e.g. for prediction of lost users it will be `lost`)
:param n_clusters: supposed number of clusters
:param plot_cnt: number of plots for output
:param width: width of plot
:param height: height of plot
:type data: pd.DataFrame
:type countmap: pd.DataFrame
:type target_events: List[str]
:type n_clusters: int
:type plot_cnt: int
:type width: float
:type height: float
:return: None
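
    Example (illustrative sketch; `countmap` as returned by plot_frequency_map)::

        countmap = plot_frequency_map(clickstream, settings={})
        plot_clusters(clickstream, countmap, n_clusters=4, plot_cnt=2)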
"""
if n_clusters:
clusterer = KMeans(n_clusters=n_clusters)
else:
nn = NearestNeighbors(metric='cosine')
nn.fit(countmap.values)
dists = nn.kneighbors(countmap.values, 2)[0][:, 1]
eps = np.percentile(dists, 99)
clusterer = DBSCAN(eps=eps, metric='cosine')
clusters = clusterer.fit_predict(countmap)
cl = pd.DataFrame(clusters, columns=['cluster'])
cl['c'] = 1
main_classes = cl.groupby('cluster',
as_index=False).count().sort_values('c',
ascending=False).cluster.iloc[:plot_cnt].values
groups = []
for i in main_classes:
groups.append(countmap.index[clusters == i].values)
sizes = []
for group in groups:
tmp = data[data.user_pseudo_id.isin(group)]
sz = []
for event in target_events:
sz.append(tmp[tmp.event_name == event].user_pseudo_id.nunique())
sizes.append(sz)
fig, ax = sns.mpl.pyplot.subplots(1 if plot_cnt <= 2 else (plot_cnt // 2 + 1), 2)
fig.set_size_inches(width, height)
    for i, j in enumerate(sizes):
        if plot_cnt <= 2:
            ax[i].pie(j, labels=target_events, autopct='%1.1f%%')
            ax[i].set_title('Class {}'.format(i))
        else:
            ax[i // 2][i % 2].pie(j, labels=target_events, autopct='%1.1f%%')
            ax[i // 2][i % 2].set_title('Class {}'.format(i))
def filter_welcome(df):
"""
Filter for truncated welcome visualization
    :param df: data from BQ or your own (clickstream). Should have at least three columns: `event_name`,
        `event_timestamp` and `user_pseudo_id`
    :type df: pd.DataFrame
    :return: dataset filtered to the relevant user events, with `passed` and `lost` markers added
:rtype: pd.DataFrame
"""
passed = df[df.event_name.apply(lambda x: str(x).split('_')[0] in [u'newFlight', u'feed', u'tabbar', u'myFlights'])]
    myflights = df[(df.event_name == 'screen_view') & (df.event_params_value_string_value == 'myFlights')]
    myflights = myflights.append(passed, ignore_index=False, sort=False)
    myflights = myflights.sort_values(['user_pseudo_id', 'event_timestamp'])
    passed = myflights.groupby('user_pseudo_id').head(1)
passed.event_name = 'passed'
lost = df[~df.user_pseudo_id.isin(passed.user_pseudo_id)].groupby('user_pseudo_id').tail(1).copy()
lost.event_name = 'lost'
lost.event_timestamp -= 1
df = df.append(lost.append(passed, sort=False), sort=False)
return prepare_prunned(df)
def prepare_prunned(df):
"""
    Prepares the truncated welcome-flow events for visualization
    :param df: data from BQ or your own (clickstream). Should have at least three columns: `event_name`,
        `event_timestamp` and `user_pseudo_id`
    :type df: pd.DataFrame
    :return: dataset filtered to the relevant user events, with welcome screens mapped to short names
:rtype: pd.DataFrame
"""
welcome2event = {
'1': "wel",
'2': "push",
'3': "sub",
'4': "wallet",
'5': "import",
'6': "cal"
}
rename_dict = {
'inactive_users': 'inactive',
'new_users': 'newUsers',
'lost': 'lostUser'
}
wel = df[df['event_name'] == 'welcome_see_screen'].copy()
wel.event_name = wel.event_params_value_string_value.apply(welcome2event.get)
wel = wel[wel.event_name.notnull()]
wel = wel.append(df[df.event_name.isin(['lost', 'passed'])], ignore_index=True, sort=False)
wel = wel.sort_values(['user_pseudo_id', 'event_timestamp']).reset_index(drop=True)
first = wel.drop_duplicates('user_pseudo_id', keep='first')
users = first.user_pseudo_id[first.event_name.isin(['wel', 'passed'])].values
first = first[first.user_pseudo_id.isin(users)]
wel = wel[wel.user_pseudo_id.isin(users)]
first.event_name = 'new_users'
first.event_timestamp -= 1
wel = wel.append(first, ignore_index=True, sort=False)
for i in set(wel.event_name.unique()):
if i not in rename_dict:
rename_dict.update({i: i})
wel.event_name = wel.event_name.apply(rename_dict.get)
return wel