Alexander Dunkel, Institute of Cartography, TU Dresden
Chi visualization of temporal patterns for ephemeral events. This notebook is a continuation of a previous publication.
Data sources used: Flickr and iNaturalist posts, aggregated as monthly HLL data (postcount, usercount).
import sys, os
import math
import numpy as np
import pandas as pd
import psycopg2
import statsmodels.api as sm
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import seaborn as sns
from matplotlib.axes import Axes
from matplotlib import cm
from typing import Tuple
from pathlib import Path
from python_hll.hll import HLL
from python_hll.util import NumberUtil
module_path = str(Path.cwd().parents[0] / "py")
if module_path not in sys.path:
sys.path.append(module_path)
from modules.base import tools, hll
OUTPUT = Path.cwd().parents[0] / "out" # output directory for figures (etc.)
WORK_DIR = Path.cwd().parents[0] / "tmp" # Working directory
(OUTPUT / "figures").mkdir(exist_ok=True)
(OUTPUT / "svg").mkdir(exist_ok=True)
WORK_DIR.mkdir(exist_ok=True)
%load_ext autoreload
%autoreload 2
Load the data from the CSVs generated in the previous notebook. The data is stored as aggregate HLL data (postcount, usercount) for each month.
FLICKR_ALL = OUTPUT / "flickr_all_months.csv"
INATURALIST_ALL = OUTPUT / "inaturalist_all_months.csv"
%%time
data_files = {
"FLICKR_ALL":FLICKR_ALL,
"INATURALIST_ALL":INATURALIST_ALL,
}
tools.display_file_stats(data_files)
pd.read_csv(FLICKR_ALL, nrows=10)
# set connection variables
DB_USER = "hlluser"
DB_PASS = os.getenv('READONLY_USER_PASSWORD')
DB_HOST = "hllworkerdb"
DB_PORT = "5432"
DB_NAME = "hllworkerdb"
Connect to an empty Postgres database running the HLL extension:
DB_CONN = psycopg2.connect(
    host=DB_HOST,
    port=DB_PORT,
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASS)
# open a read-only session; only SELECTs are needed here
DB_CONN.set_session(readonly=True)
DB_CALC = tools.DbConn(DB_CONN)
CUR_HLL = DB_CONN.cursor()
Test the connection to the database and the HLL extension:
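A minimal sanity check (a sketch; hll_empty() and hll_cardinality() are provided by the postgres-hll extension):
CUR_HLL.execute("SELECT hll_cardinality(hll_empty())::int;")
print(CUR_HLL.fetchone())  # an empty HLL set has cardinality 0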
Define additional functions for reading and formatting CSVs as pd.DataFrame:
from datetime import datetime
def read_csv_datetime(csv: Path) -> pd.DataFrame:
    """Read CSV and parse a datetime index (months)

    First CSV column: Year
    Second CSV column: Month
    """
    date_cols = ["year", "month"]
    # with the default keep_date_col=False, the year/month
    # source columns are consumed into the parsed index
    df = pd.read_csv(
        csv, index_col='datetime',
        parse_dates={'datetime': date_cols},
        date_format='%Y %m')
    return df
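A quick check that the index is parsed as intended:
# the result should be a DatetimeIndex with one entry per month
read_csv_datetime(FLICKR_ALL).index[:3]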
def append_cardinality_df(df: pd.DataFrame, hll_col: str = "post_hll", cardinality_col: str = 'postcount_est') -> pd.DataFrame:
"""Calculate cardinality from HLL and append to extra column in df"""
df[cardinality_col] = df.apply(
lambda x: hll.cardinality_hll(
x[hll_col], CUR_HLL),
axis=1)
df.drop(columns=[hll_col], inplace=True)
return df
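For reference, a sketch of what the per-row cardinality estimation boils down to; the actual helper hll.cardinality_hll() in modules/base/hll.py may differ in detail:
# hypothetical direct query against the HLL worker database;
# hll_cardinality() and the ::hll cast come with the postgres-hll extension
sample_hll = pd.read_csv(FLICKR_ALL, nrows=1)["post_hll"][0]
CUR_HLL.execute("SELECT hll_cardinality(%s::hll)::int;", (sample_hll,))
print(CUR_HLL.fetchone()[0])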
def filter_fill_time(
        df: pd.DataFrame, min_year: int,
        max_year: int, val_col: str = "postcount_est",
        min_month: int = 1, max_month: int = 1) -> pd.DataFrame:
    """Filter time values between min and max year and fill missing months with 0"""
    min_date = pd.Timestamp(f'{min_year}-{min_month}-01')
    max_date = pd.Timestamp(f'{max_year}-{max_month}-01')
    # clip by start and end date
    if min_date not in df.index:
        df.loc[min_date, val_col] = 0
    if max_date not in df.index:
        df.loc[max_date, val_col] = 0
    df.sort_index(inplace=True)
    # mask min and max time
    time_mask = ((df.index >= min_date) & (df.index <= max_date))
    # fill missing months with 0;
    # resampling also sets the day to the last day of each month
    series = df.loc[time_mask, val_col].resample('M').sum().fillna(0)
    return series.to_frame()
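To illustrate the padding behaviour, a minimal sketch with synthetic data (df_demo is hypothetical and not part of the datasets used here):
df_demo = pd.DataFrame(
    {"postcount_est": [5, 3]},
    index=pd.to_datetime(["2007-03-01", "2007-07-01"]))
# months between 2007-01 and 2008-01 without observations appear as 0
filter_fill_time(df_demo, 2007, 2008)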
Select the dataset to process below.
Apply the filter and pre-processing functions to all data sets:
def process_dataset(
dataset: Path, metric: str = None,
min_year: int = None, max_year: int = None) -> pd.DataFrame:
"""Apply temporal filter/pre-processing to all data sets."""
if metric is None:
metric = 'post_hll'
if metric == 'post_hll':
cardinality_col = 'postcount_est'
else:
cardinality_col = 'usercount_est'
if min_year is None:
min_year = 2007
if max_year is None:
max_year = 2022
df_post = read_csv_datetime(dataset)
df_post = append_cardinality_df(df_post, metric, cardinality_col)
return filter_fill_time(df_post, min_year, max_year, cardinality_col)
%%time
df_post = process_dataset(FLICKR_ALL)
df_post.head(5)
%%time
df_user = process_dataset(FLICKR_ALL, metric='user_hll')
df_user.head(5)
Define the plot functions.
def bar_plot_time(
df: pd.DataFrame, ax: Axes, color: str,
label: str, val_col: str = "postcount_est") -> Axes:
"""Matplotlib Barplot with time axis formatting"""
ax = df.set_index(
df.index.map(lambda s: s.strftime('%Y'))).plot.bar(
ax=ax, y=val_col, color=color, width=1.0,
label=label, edgecolor="white", linewidth=0.5, alpha=0.6)
return ax
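Note that plot.bar() treats the index as categorical, which is why the DatetimeIndex is first mapped to year strings; plot_time() below then shows only every 12th tick (one per year).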
def plot_time(
        df: pd.DataFrame, title: str, color, filename: str = None,
        output: Path = OUTPUT, legend: str = "Postcount", val_col: str = None,
        trend: bool = None, seasonal: bool = None, residual: bool = None):
    """Create a time plot from a dataframe, with optional decomposition overlays"""
fig, ax = plt.subplots()
fig.set_size_inches(15.7, 4.27)
ylabel = f'{legend} (estimate)'
if val_col is None:
val_col = f'{legend.lower()}_est'
ax = bar_plot_time(
df=df, ax=ax, color=color, val_col=val_col, label=legend)
# x axis ticker formatting
tick_loc = mticker.MultipleLocator(12)
ax.xaxis.set_major_locator(tick_loc)
ax.tick_params(axis='x', rotation=45)
ax.yaxis.set_major_formatter(mticker.StrMethodFormatter('{x:,.0f}'))
ax.set(xlabel="Month", ylabel=ylabel)
ax.spines["left"].set_linewidth(0.25)
ax.spines["bottom"].set_linewidth(0.25)
ax.spines["top"].set_linewidth(0)
ax.spines["right"].set_linewidth(0)
ax.yaxis.set_tick_params(width=0.5)
# remove legend
ax.get_legend().remove()
ax.set_title(title)
if any([trend, seasonal, residual]):
# seasonal decompose
decomposition = sm.tsa.seasonal_decompose(
df[val_col], model='additive')
# plot trend part only
if trend:
plt.plot(list(decomposition.trend), color='black',
label="Trend", linewidth=3, alpha=0.8)
if seasonal:
plt.plot(list(decomposition.seasonal), color='black', linestyle='dotted',
label="Seasonal", linewidth=1, alpha=0.8)
if residual:
plt.plot(list(decomposition.resid), color='black', linestyle='dashed',
label="Residual", linewidth=1, alpha=0.8)
# store figure to file
if filename:
fig.savefig(
output / "figures" / f"{filename}.png", dpi=300, format='PNG',
bbox_inches='tight', pad_inches=1, facecolor="white")
# also save as svg
fig.savefig(
output / "svg" / f"{filename}.svg", format='svg',
bbox_inches='tight', pad_inches=1, facecolor="white")
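Note: sm.tsa.seasonal_decompose() infers the seasonal period from the (inferred) frequency of the DatetimeIndex; since filter_fill_time() resamples the data to monthly frequency, the decomposition works without specifying a period.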
def load_and_plot(dataset: Path, metric: str = None, src_ref: str = "flickr", colors: cm.colors.ListedColormap = None):
"""Load data and plot"""
if metric is None:
metric = 'post_hll'
if metric == 'post_hll':
metric_label = 'postcount'
else:
metric_label = 'usercount'
if colors is None:
colors = sns.color_palette("vlag", as_cmap=True, n_colors=2)
colors = colors([1.0])
df = process_dataset(dataset, metric=metric)
plot_time(
df, legend=metric_label.capitalize(), color=colors,
title=f'{src_ref.capitalize()} {metric_label} over time [absolute estimates]',
filename=f"temporal_{metric_label}_{src_ref}_absolute", trend=True)
colors = sns.color_palette("vlag", as_cmap=True, n_colors=2)
load_and_plot(FLICKR_ALL, src_ref="flickr", colors=colors([1.0]))
Plot the Flickr user count:
load_and_plot(FLICKR_ALL, src_ref="flickr", metric="user_hll", colors=colors([0.0]))
Repeat for the iNaturalist data:
load_and_plot(INATURALIST_ALL, src_ref="inaturalist", colors=colors([1.0]))
load_and_plot(INATURALIST_ALL, src_ref="inaturalist", metric="user_hll", colors=colors([0.0]))
First, define whether to study usercount or postcount
# METRIC = 'user'
METRIC = 'post'
source_zip="https://opara.zih.tu-dresden.de/xmlui/bitstream/handle/123456789/5793/S10.zip?sequence=1&isAllowed=y"
FLICKR_SUNRISE = WORK_DIR / "flickr-sunrise-months.csv"
FLICKR_SUNSET = WORK_DIR / "flickr-sunset-months.csv"
if not FLICKR_SUNRISE.exists():
tools.get_zip_extract(
uri=source_zip, filename="S10.zip", output_path=WORK_DIR,
filter_files=["flickr-sunrise-months.csv", "flickr-sunset-months.csv"])
df_sunrise = read_csv_datetime(FLICKR_SUNRISE)
df_sunrise = append_cardinality_df(df_sunrise, f'{METRIC}_hll', f'{METRIC}count_est')
df_sunrise = filter_fill_time(df_sunrise, 2007, 2018, f'{METRIC}count_est')
plot_time(
df_sunrise, legend=f"{METRIC.capitalize()}count", color=colors([0.0]),
title=f'Flickr {METRIC}count over time for sunrise related posts',
filename=f"temporal_{METRIC}count_flickr_sunrise", trend=True)
At the time of writing the sunset-sunrise paper, the data had only been collected up to 2018. Still, the graph above shows the same seasonal patterns as the graph for all Flickr photographs. Below, we normalize the sunrise graph using the chi value.
For this, the dataframe for all posts over time must also be limited to 2018.
if METRIC == 'post':
df_expected = df_post
else:
df_expected = df_user
df_expected = filter_fill_time(df_expected, 2007, 2018, f'{METRIC}count_est')
This is adapted from notebook three of the original publication.
First, define the input parameters:
DOF = 1  # degrees of freedom
CHI_CRIT_VAL = 3.84  # critical chi-square value for a significance level of 0.05 at 1 DOF
CHI_COLUMN: str = f"{METRIC}count_est"
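As a quick cross-check of the critical value (a sketch; assumes scipy is available, which this notebook does not otherwise import):
from scipy.stats import chi2
# 95th percentile of the chi-square distribution with 1 degree of freedom
print(chi2.ppf(0.95, df=1))  # ≈ 3.8415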
def calc_norm(
        df_expected: pd.DataFrame,
        df_observed: pd.DataFrame,
        chi_column: str = CHI_COLUMN) -> float:
"""Fetch the number of data points for the observed and
expected dataset by the relevant column
and calculate the normalisation value
"""
v_expected = df_expected[chi_column].sum()
v_observed = df_observed[chi_column].sum()
norm_val = (v_expected / v_observed)
return norm_val
norm_val = calc_norm(df_expected, df_sunrise)
print(norm_val)
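The normalisation factor scales the observed counts to the overall volume, $norm = \frac{\sum{n_{expected}}}{\sum{n_{observed}}}$, so a month in which the topic has the same share as in the full dataset yields a chi value of zero.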
rename_expected = {
'postcount_est':'postcount_est_expected',
'usercount_est':'usercount_est_expected',
}
df_expected.rename(
columns=rename_expected,
inplace=True)
merge_cols = [f'{METRIC}count_est']
df_expected_observed = df_expected.merge(
df_sunrise[merge_cols],
left_index=True, right_index=True)
Preview
df_expected_observed.head()
def chi_calc(x_observed: float, x_expected: float, x_normalized: float) -> pd.Series:
"""Apply chi calculation based on observed (normalized) and expected value"""
value_observed_normalised = x_observed * x_normalized
a = value_observed_normalised - x_expected
b = math.sqrt(x_expected)
# native division with division by zero protection
chi_value = a / b if b else 0
return chi_value
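Written out, the signed chi value per month is
$$\chi = \frac{n_{observed} \cdot norm - n_{expected}}{\sqrt{n_{expected}}}$$
where the sign indicates whether the topic is over- or underrepresented in that month relative to the expected (all-posts) baseline.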
def apply_chi_calc(
        df: pd.DataFrame, norm_val: float,
        chi_column: str = CHI_COLUMN, chi_crit_val: float = CHI_CRIT_VAL):
    """Calculate chi values from the observed and expected columns
    and append chi_value and significant columns to df in place"""
# lambda: apply function chi_calc() to each item
df['chi_value'] = df.apply(
lambda x: chi_calc(
x[chi_column],
x[f'{chi_column}_expected'],
norm_val),
axis=1)
# add significant column, default False
df['significant'] = False
# calculate significance for both negative and positive chi_values
df.loc[np.abs(df['chi_value'])>chi_crit_val, 'significant'] = True
Apply the calculation:
%%time
apply_chi_calc(
df=df_expected_observed,
norm_val=norm_val)
df_expected_observed.head()
plot_time(
df_expected_observed, legend="Signed Chi", val_col="chi_value", color=colors([0.0]),
title=f'Flickr Chi for "Sunrise"-related {METRIC}count over time',
filename=f"temporal_chi_flickr_{METRIC}count_sunrise", trend=True)
Repeat the same steps for the observation of sunset posts worldwide:
df_sunset = read_csv_datetime(FLICKR_SUNSET)
df_sunset = append_cardinality_df(df_sunset, f'{METRIC}_hll', f'{METRIC}count_est')
df_sunset = filter_fill_time(df_sunset, 2007, 2018, f'{METRIC}count_est')
plot_time(
df_sunset, legend=f"{METRIC.capitalize()}count", color=colors([1.0]),
title=f'Flickr {METRIC}count over time for sunset related posts',
filename=f"temporal_{METRIC}count_flickr_sunset", trend=True)
# temporarily rename the expected column back so that calc_norm() finds it
norm_val = calc_norm(
    df_expected.rename(columns={f'{METRIC}count_est_expected': f'{METRIC}count_est'}, inplace=False),
    df_sunset)
print(norm_val)
merge_cols = [f'{METRIC}count_est']
df_expected_observed = df_expected.merge(
df_sunset[merge_cols],
left_index=True, right_index=True)
%%time
apply_chi_calc(
df=df_expected_observed,
norm_val=norm_val)
plot_time(
df_expected_observed, legend="Signed Chi", val_col="chi_value", color=colors([1.0]),
title=f'Flickr Chi for "Sunset"-related {METRIC}count over time',
filename=f"temporal_chi_flickr_{METRIC}count_sunset", trend=True)
Select topic parameter below
# topic = "milvusmilvus"
topic = "bloom"
FLICKR_SUBQUERY = OUTPUT / f"flickr_{topic}_months.csv"
%%time
df_post = read_csv_datetime(FLICKR_ALL)
df_post = append_cardinality_df(df_post, 'post_hll', 'postcount_est')
df_post = filter_fill_time(df_post, 2007, 2020, 'postcount_est', max_month=8)
df_expected = df_post
df_subquery = read_csv_datetime(FLICKR_SUBQUERY)
df_subquery = append_cardinality_df(df_subquery, f'{METRIC}_hll', f'{METRIC}count_est')
df_subquery = filter_fill_time(df_subquery, 2007, 2020, f'{METRIC}count_est', max_month=8)
plot_time(
df_subquery, legend=f"{METRIC.capitalize()}count", color=colors([1.0]),
title=f'Flickr {METRIC}count over time for {topic.capitalize()} related posts',
filename=f"temporal_{METRIC}count_flickr_{topic}", trend=True)
df_expected.rename(columns={f'{METRIC}count_est_expected':f'{METRIC}count_est'}, inplace=True)
norm_val = calc_norm(
df_expected,
df_subquery)
print(norm_val)
df_expected.rename(columns={f'{METRIC}count_est':f'{METRIC}count_est_expected'}, inplace=True)
df_expected.head()
merge_cols = [f'{METRIC}count_est']
df_expected_observed = df_expected.merge(
df_subquery[merge_cols],
left_index=True, right_index=True)
%%time
apply_chi_calc(
df=df_expected_observed,
norm_val=norm_val)
plot_time(
df_expected_observed, legend="Signed Chi", val_col="chi_value", color=colors([1.0]),
title=f'Flickr Chi for "{topic.capitalize()}"-related {METRIC}count over time',
filename=f"temporal_chi_flickr_{METRIC}count_{topic}", trend=True, seasonal=False, residual=False)
!jupyter nbconvert --to html_toc \
--output-dir=../resources/html/ ./01_temporal_chi.ipynb \
--template=../nbconvert.tpl \
--ExtractOutputPreprocessor.enabled=False >&- 2>&-