from datetime import datetime
import logging
import pandas as pd
from tqdm import tqdm
import warnings
from retentioneering.utils import queries
logger = logging.getLogger()
def download_table(client, dataset_id, table_id):
"""
    Download a table from BigQuery.
:param client: bigquery client
:param dataset_id: target dataset id
:param table_id: target table id
:type client: bigquery.Client()
:type dataset_id: str
:type table_id: str
:return: pd.DataFrame
"""
target_table = client.dataset(dataset_id).table(table_id)
table = client.get_table(target_table)
rows = client.list_rows(target_table, selected_fields=table.schema)
data = rows.to_dataframe()
return data
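# Illustrative usage of download_table (a sketch; the project id, dataset and table
# names below are placeholders, and running it requires Google Cloud credentials):
#
#     from google.cloud import bigquery
#     client = bigquery.Client(project='my-project')
#     events = download_table(client, 'analytics_dataset', 'events_table')
#     print(events.shape)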
def run_query(client, query, job_config=None, group_name=None, return_dataframe=True, return_only_query=False,
hide_progress_bar=False, progress_bar_min_interval=4, **params):
"""
    Run a query in BigQuery and download the results.
    :param client: BigQuery client
    :param query: query to run (may be a format string with named placeholders)
    :param job_config: BigQuery client job config
    :param group_name: if given, add a new column 'group_name' filled with this value
    :param return_dataframe: if True, return the data as a pd.DataFrame, otherwise as a list
    :param return_only_query: return only the formatted query string without running it
    :param hide_progress_bar: hide the tqdm progress bar
    :param progress_bar_min_interval: minimum update interval of the tqdm progress bar, in seconds
    :param **params: keyword arguments passed to query.format
:type client: bigquery.Client()
:type query: str
:type job_config: bigquery.QueryJobConfig()
:type group_name: str or None
:type return_dataframe: bool
:type return_only_query: bool
:type hide_progress_bar: bool
:type progress_bar_min_interval: int
:type **params: keywords
    :return: pd.DataFrame or list, or the query string if return_only_query is True
"""
start = datetime.now()
query = query.format(**params)
if return_only_query:
return query
query_results = client.query(query, job_config=job_config)
rows = query_results.result()
total_time = datetime.now() - start
    logger.info('Query complete in {} seconds\n'.format(total_time.total_seconds()))
    # job_config (and its destination table) is optional, so only look it up when present.
    dest_table = client.get_table(job_config.destination) if job_config is not None and job_config.destination else None
    result = []
    for row in tqdm(rows, mininterval=progress_bar_min_interval,
                    total=dest_table.num_rows if dest_table is not None else rows.total_rows,
                    disable=hide_progress_bar):
items = list(row.values())
if group_name is not None:
items += [group_name]
result.append(items)
if return_dataframe:
        schema = dest_table.schema if dest_table is not None else rows.schema
        col_names = [col.name for col in schema]
if group_name is not None:
col_names += ['group_name']
result = pd.DataFrame(result, columns=col_names)
return result
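# Illustrative usage of run_query (a sketch; the project, dataset and table names are
# placeholders). The query may contain str.format placeholders filled from **params:
#
#     from google.cloud import bigquery
#     client = bigquery.Client()
#     job_config = bigquery.QueryJobConfig()
#     job_config.destination = client.dataset('tmp_dataset').table('tmp_table')
#     job_config.write_disposition = 'WRITE_TRUNCATE'
#     df = run_query(client,
#                    'SELECT * FROM `{project}.{dataset}.events` LIMIT {limit}',
#                    job_config=job_config,
#                    project='my-project', dataset='analytics', limit=1000)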
def _prepare_event_filter_query(event_names=None, table_with_events=None):
if table_with_events is None:
if event_names is None:
return ''
is_in = len(event_names) > 1
filter_query = '\'' + '\',\''.join(event_names) + '\''
if is_in:
filter_query = 'WHERE event_name IN (' + filter_query + ')'
else:
filter_query = 'WHERE event_name = ' + filter_query
else:
filter_query = "WHERE event_name IN (SELECT string_field_0 FROM {})".format(table_with_events)
return filter_query
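# The helper above only builds the WHERE clause injected into the query template,
# for example (the table name in the last call is a placeholder):
#
#     _prepare_event_filter_query(['purchase'])
#     # -> "WHERE event_name = 'purchase'"
#     _prepare_event_filter_query(['login', 'purchase'])
#     # -> "WHERE event_name IN ('login','purchase')"
#     _prepare_event_filter_query(table_with_events='my_dataset.event_list')
#     # -> "WHERE event_name IN (SELECT string_field_0 FROM my_dataset.event_list)"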
def _prepare_app_version_filter_query(app_version=None, is_not_first=True):
if app_version is None:
return ''
app_version_filter = "app_info.version = '{}'".format(app_version)
if is_not_first:
app_version_filter = 'AND ' + app_version_filter
else:
app_version_filter = 'WHERE ' + app_version_filter
return app_version_filter
def _prepare_drop_duplicates_flag(drop_duplicates=None):
if drop_duplicates is None:
return ''
res = ", ROW_NUMBER() OVER (PARTITION BY {}) AS row_n".format(','.join(['tbl1.' + col for col in drop_duplicates]))
return res
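# Likewise, these helpers only produce SQL fragments, for example:
#
#     _prepare_app_version_filter_query('1.2.0')
#     # -> "AND app_info.version = '1.2.0'"
#     _prepare_app_version_filter_query('1.2.0', is_not_first=False)
#     # -> "WHERE app_info.version = '1.2.0'"
#     _prepare_drop_duplicates_flag(['user_pseudo_id', 'event_name', 'event_timestamp'])
#     # -> ", ROW_NUMBER() OVER (PARTITION BY tbl1.user_pseudo_id,tbl1.event_name,tbl1.event_timestamp) AS row_n"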
def download_events_multi(client, job_config, settings=None, return_only_query=False, **kwargs):
"""
    Generate queries from the settings dict, run them in BigQuery and download the results.
    :param client: BigQuery client
    :param job_config: BigQuery client job config
    :param settings: settings dict
    :param return_only_query: return only the query strings for all queries without running them
    :param **kwargs: keyword arguments passed to download_events
:type client: bigquery.Client()
:type job_config: bigquery.QueryJobConfig()
:type settings: dict
:type return_only_query: bool
:type **kwargs: keywords
    :return: pd.DataFrame, or a list of query strings if return_only_query is True
"""
df = pd.DataFrame()
res = []
if settings is not None:
for settings_name, settings_config in settings['sql'].items():
if return_only_query:
res.append(download_events(client=client, job_config=job_config, return_dataframe=True,
settings=settings_config, group_name=settings_name,
return_only_query=return_only_query, **kwargs))
else:
job_config.write_disposition = "WRITE_TRUNCATE"
job_config.destination = client.dataset(settings_config['destination_table']['dataset']) \
.table(settings_config['destination_table']['table'])
                # DataFrame.append is deprecated in recent pandas, so concatenate instead.
                df = pd.concat(
                    [df, download_events(client=client, job_config=job_config, return_dataframe=True,
                                         settings=settings_config, group_name=settings_name, **kwargs)],
                    sort=False)
if return_only_query:
return res
return df
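# A sketch of the settings structure download_events_multi expects (names and dates are
# placeholders); each entry under 'sql' is passed to download_events as its settings dict
# and must name a destination table for the query results:
#
#     settings = {
#         'sql': {
#             'group_A': {
#                 'destination_table': {'dataset': 'tmp_dataset', 'table': 'group_a_events'},
#                 'user_filters': {'date_start': '20190101', 'date_finish': '20190131',
#                                  'event_names': ['first_open'], 'limit': 1000},
#                 'event_filters': {'date_start': '20190101', 'date_finish': '20190228'},
#             },
#         },
#     }
#     df = download_events_multi(client, job_config, settings=settings)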
def download_events(client, job_config, user_filter_event_names=None, user_filter_event_table=None, dates_users=None,
users_app_version=None, event_filter_event_names=None, event_filter_event_table=None,
dates_events=None, events_app_version=None, count_events=None, use_last_events=False,
random_user_limit=None, random_seed=None, settings=None, group_name=None, drop_duplicates=None,
return_dataframe=True, return_only_query=False, hide_progress_bar=False,
progress_bar_min_interval=4):
"""
    Generate an events query from the given filters (or a settings dict), run it in BigQuery and download the results.
    :param client: BigQuery client
    :param job_config: BigQuery client job config
    :param user_filter_event_names: filter on events used for user selection
    :param user_filter_event_table: name of a table with event names used for user selection
    :param dates_users: first and last dates of the first user appearance
    :param users_app_version: select only users with this app version
    :param event_filter_event_names: select only users with these events
    :param event_filter_event_table: name of a table with event names used for event selection
    :param dates_events: first and last dates of the event selection period
    :param events_app_version: app version filter for the event table
    :param count_events: number of events taken from the event table for each user
    :param use_last_events: if True, use the last events before the target event; otherwise use the first events after it
    :param random_user_limit: number of randomly selected users
    :param random_seed: random seed
    :param settings: settings dict; if given, it overrides the filter arguments above
    :param group_name: add a new column 'group_name' with this value
    :param drop_duplicates: list of columns in the BigQuery table used to drop duplicates
    :param return_dataframe: if True, return the data as a pd.DataFrame, otherwise as a list
    :param return_only_query: return only the formatted query string without running it
    :param hide_progress_bar: hide the tqdm progress bar
    :param progress_bar_min_interval: minimum update interval of the tqdm progress bar, in seconds
:type client: bigquery.Client()
:type job_config: bigquery.QueryJobConfig()
:type user_filter_event_names: list
:type user_filter_event_table: str
:type dates_users: tuple or list
:type users_app_version: str
:type event_filter_event_names: list
:type event_filter_event_table: str
:type dates_events: tuple or list
:type events_app_version: str
:type count_events: int
:type use_last_events: bool
:type random_user_limit: int
:type random_seed: int
:type settings: dict
:type group_name: str
:type drop_duplicates: list
:type return_dataframe: bool
:type return_only_query: bool
:type hide_progress_bar: bool
:type progress_bar_min_interval: int
    :return: pd.DataFrame or list, or the query string if return_only_query is True
"""
if settings is not None:
user_filter_event_names = settings['user_filters'].get('event_names')
users_app_version = settings['user_filters'].get('app_version')
dates_users = (settings['user_filters']['date_start'], settings['user_filters']['date_finish'])
event_filter_event_names = settings['event_filters'].get('event_names')
events_app_version = settings['event_filters'].get('app_version')
dates_events = (settings['event_filters']['date_start'], settings['event_filters']['date_finish'])
count_events = settings['event_filters'].get('count_events', None)
use_last_events = settings['event_filters'].get('use_last_events', False)
random_user_limit = settings['user_filters'].get('limit')
drop_duplicates = settings.get('drop_duplicates_events', None)
if random_user_limit:
if 'random_seed' not in settings['user_filters']:
settings['user_filters']['random_seed'] = 42
random_seed = settings['user_filters']['random_seed']
else:
assert all(v is not None for v in [dates_events, dates_users])
    # Build the SQL fragments that get substituted into the query template below.
    users_event_filter = _prepare_event_filter_query(user_filter_event_names, user_filter_event_table)
    # A non-empty event filter means the app version condition is not the first clause, so it uses AND instead of WHERE.
    users_app_version_filter = _prepare_app_version_filter_query(users_app_version, is_not_first=bool(users_event_filter))
    events_filter = _prepare_event_filter_query(event_filter_event_names, event_filter_event_table)
    events_app_version_filter = _prepare_app_version_filter_query(events_app_version, is_not_first=bool(events_filter))
    duplicate_event_flag = _prepare_drop_duplicates_flag(drop_duplicates)
new_users_query = queries.query_with_params.format(
rank_sort_type='DESC' if use_last_events else '',
date_events_first=dates_events[0],
date_events_last=dates_events[1],
date_users_events_first=dates_users[0],
date_users_events_last=dates_users[1],
events_filter=events_filter if events_filter else '',
events_app_version_filter=events_app_version_filter,
min_event_timestamp=", MIN(event_timestamp) AS min_event_timestamp" if use_last_events else '',
users_event_filter=users_event_filter if users_event_filter else '',
users_app_version_filter=users_app_version_filter,
delete_events_after_target="WHERE tbl1.event_timestamp <= tbl2.min_event_timestamp" if use_last_events else '',
duplicate_event_flag=duplicate_event_flag,
duplicate_event_delete='WHERE row_n = 1' if drop_duplicates is not None else '',
count_events_filter='{} event_rank < {}'.format(
'AND' if drop_duplicates is not None else 'WHERE',
count_events) if count_events else '',
rand_select_start='SELECT user_pseudo_id, RAND({}) AS random_value FROM ('.format(
random_seed if random_seed else '') if random_user_limit else '',
rand_select_end='ORDER BY user_pseudo_id) ORDER BY random_value LIMIT {}'.format(
random_user_limit) if random_user_limit else ''
)
result = run_query(client=client, query=new_users_query, job_config=job_config, group_name=group_name,
return_dataframe=return_dataframe, return_only_query=return_only_query,
hide_progress_bar=hide_progress_bar, progress_bar_min_interval=progress_bar_min_interval)
return result
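# Illustrative direct call (a sketch; event names and dates are placeholders). With
# return_only_query=True the generated SQL can be inspected before anything is run:
#
#     query = download_events(client, job_config,
#                             user_filter_event_names=['first_open'],
#                             dates_users=('20190101', '20190131'),
#                             event_filter_event_names=['purchase'],
#                             dates_events=('20190101', '20190228'),
#                             count_events=100,
#                             return_only_query=True)
#     print(query)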