Alexander Dunkel, Institute of Cartography, TU Dresden
Chi visualization of temporal patterns for ephemeral events. This notebook is a continuation of a previous publication.
Data sources used: Flickr and iNaturalist metadata, aggregated as HLL data per month.
import sys, os
import math
import numpy as np
import pandas as pd
import psycopg2
import statsmodels.api as sm
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import seaborn as sns
from matplotlib.axes import Axes
from matplotlib import cm
from typing import Tuple
from pathlib import Path
from python_hll.hll import HLL
from python_hll.util import NumberUtil
module_path = str(Path.cwd().parents[0] / "py")
if module_path not in sys.path:
sys.path.append(module_path)
from modules.base import tools, hll
OUTPUT = Path.cwd().parents[0] / "out" # output directory for figures (etc.)
WORK_DIR = Path.cwd().parents[0] / "tmp" # Working directory
(OUTPUT / "figures").mkdir(exist_ok=True)
(OUTPUT / "svg").mkdir(exist_ok=True)
WORK_DIR.mkdir(exist_ok=True)
%load_ext autoreload
%autoreload 2
Select "M" for monthly aggregation or "Y" for yearly aggregation:
AGG_BASE = "M"
First, define whether to study usercount or postcount
# METRIC = 'user'
METRIC = 'post'
Load the data from CSV, generated in the previous notebook. Data is stored as aggregate HLL data (postcount, usercount) for each month.
FLICKR_ALL = OUTPUT / "flickr_all_months.csv"
INATURALIST_ALL = OUTPUT / "inaturalist_all_months.csv"
%%time
data_files = {
"FLICKR_ALL":FLICKR_ALL,
"INATURALIST_ALL":INATURALIST_ALL,
}
tools.display_file_stats(data_files)
pd.read_csv(FLICKR_ALL, nrows=10)
# set connection variables
DB_USER = "hlluser"
DB_PASS = os.getenv('READONLY_USER_PASSWORD')
DB_HOST = "hllworkerdb"
DB_PORT = "5432"
DB_NAME = "hllworkerdb"
Connect to the empty Postgres database running the HLL extension:
DB_CONN = psycopg2.connect(
    host=DB_HOST,
    port=DB_PORT,
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASS
)
DB_CONN.set_session(readonly=True)
DB_CALC = tools.DbConn(DB_CONN)
CUR_HLL = DB_CONN.cursor()
Test the connection:
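A minimal check, assuming the postgresql-hll extension is installed in the connected database: the cardinality of an empty HLL set should be 0.

CUR_HLL.execute("SELECT hll_cardinality(hll_empty());")
print(CUR_HLL.fetchone()[0])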
Define additional functions for reading and formatting CSV as pd.DataFrame
from datetime import datetime
def read_csv_datetime(csv: Path) -> pd.DataFrame:
    """Read CSV and parse datetime index (months)
    First CSV column: Year
    Second CSV column: Month
    """
    date_cols = ["year", "month"]
    df = pd.read_csv(
        csv, index_col='datetime',
        parse_dates={'datetime': date_cols},
        date_format='%Y %m',
        keep_date_col=False)  # boolean; the string 'False' would be truthy
    return df
def append_cardinality_df(
        df: pd.DataFrame, hll_col: str = "post_hll",
        cardinality_col: str = 'postcount_est') -> pd.DataFrame:
    """Calculate cardinality from HLL and append as extra column to df"""
    df[cardinality_col] = df.apply(
        lambda x: hll.cardinality_hll(
            x[hll_col], CUR_HLL),
        axis=1)
    df.drop(columns=[hll_col], inplace=True)
    return df
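For reference, a hypothetical stand-in for hll.cardinality_hll() from modules.base (the actual helper may differ): the hex-encoded HLL string is sent to Postgres, cast to the hll type, and the extension returns the estimated cardinality.

def cardinality_hll_sketch(hll_hex: str, cur) -> int:
    """Estimate the cardinality of a single hex-encoded HLL string (sketch only)."""
    # cast the text representation to the hll type; hll_cardinality()
    # returns a float that is truncated to an integer count here
    cur.execute("SELECT hll_cardinality(%s::hll);", (hll_hex,))
    return int(cur.fetchone()[0])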
def filter_fill_time(
df: pd.DataFrame, min_year: int,
max_year: int, val_col: str = "postcount_est",
min_month: str = "01", max_month: str = "01", agg_base: str = None,
agg_method = None):
"""Filter time values between min - max year and fill missing values"""
max_day = "01"
if agg_base is None:
agg_base = "M"
elif agg_base == "Y":
max_month = "12"
max_day = "31"
min_date = pd.Timestamp(f'{min_year}-{min_month}-01')
max_date = pd.Timestamp(f'{max_year}-{max_month}-{max_day}')
# clip by start and end date
    if min_date not in df.index:
        df.loc[min_date, val_col] = 0
    if max_date not in df.index:
        df.loc[max_date, val_col] = 0
df.sort_index(inplace=True)
# mask min and max time
time_mask = ((df.index >= min_date) & (df.index <= max_date))
resampled = df.loc[time_mask][val_col].resample(agg_base)
if agg_method is None:
series = resampled.sum()
elif agg_method == "count":
series = resampled.count()
# fill missing months with 0
# this will also set the day to max of month
return series.fillna(0).to_frame()
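A quick sanity check of filter_fill_time() with synthetic data (hypothetical values): months missing between two observations should be filled with 0 after monthly aggregation.

_df_toy = pd.DataFrame(
    {"postcount_est": [5, 7]},
    index=pd.to_datetime(["2010-01-01", "2010-04-01"]))
# Feb, Mar and May are filled with 0; Jun is the explicit max_date
filter_fill_time(_df_toy, 2010, 2010, max_month="06", agg_base="M")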
Select the dataset to process below; process_dataset() applies the functions defined above to all data sets.
def process_dataset(
dataset: Path, metric: str = None,
min_year: int = None, max_year: int = None, agg_base: str = None) -> pd.DataFrame:
"""Apply temporal filter/pre-processing to all data sets."""
if metric is None:
metric = 'post_hll'
if metric == 'post_hll':
cardinality_col = 'postcount_est'
else:
cardinality_col = 'usercount_est'
if min_year is None:
min_year = 2007
if max_year is None:
max_year = 2022
df_post = read_csv_datetime(dataset)
df_post = append_cardinality_df(df_post, metric, cardinality_col)
return filter_fill_time(df_post, min_year, max_year, cardinality_col, agg_base=agg_base)
%%time
df_post = process_dataset(FLICKR_ALL, agg_base=AGG_BASE)
df_post.head(5)
%%time
df_user = process_dataset(FLICKR_ALL, metric='user_hll', agg_base=AGG_BASE)
df_user.head(5)
Define plot function.
def bar_plot_time(
df: pd.DataFrame, ax: Axes, color: str,
label: str, val_col: str = "postcount_est") -> Axes:
"""Matplotlib Barplot with time axis formatting"""
ax = df.set_index(
df.index.map(lambda s: s.strftime('%Y'))).plot.bar(
ax=ax, y=val_col, color=color, width=1.0,
label=label, edgecolor="white", linewidth=0.5, alpha=0.6)
return ax
def plot_time(
        df: pd.DataFrame, title, color, filename=None,
        output=OUTPUT, legend: str = "Postcount", val_col: str = None,
        trend: bool = None, seasonal: bool = None, residual: bool = None,
        agg_base: str = None):
    """Create dataframe time plot"""
x_ticks_every = 12
fig_x = 15.7
fig_y = 4.27
font_mod = False
x_label = "Month"
linewidth = 3
    if agg_base == "Y":
x_ticks_every = 1
fig_x = 3
fig_y = 1.5
font_mod = True
x_label = "Year"
linewidth = 1
fig, ax = plt.subplots()
fig.set_size_inches(fig_x, fig_y)
ylabel = f'{legend}'
if val_col is None:
val_col = f'{legend.lower()}_est'
ax = bar_plot_time(
df=df, ax=ax, color=color, val_col=val_col, label=legend)
# x axis ticker formatting
tick_loc = mticker.MultipleLocator(x_ticks_every)
ax.xaxis.set_major_locator(tick_loc)
ax.tick_params(axis='x', rotation=45, length=0)
ax.yaxis.set_major_formatter(mticker.StrMethodFormatter('{x:,.0f}'))
ax.set(xlabel=x_label, ylabel=ylabel)
ax.spines["left"].set_linewidth(0.25)
ax.spines["bottom"].set_linewidth(0.25)
ax.spines["top"].set_linewidth(0)
ax.spines["right"].set_linewidth(0)
ax.yaxis.set_tick_params(width=0.5)
# remove legend
ax.get_legend().remove()
ax.set_title(title)
ax.set_xlim(-0.5,len(df)-0.5)
if font_mod:
for item in (
[ax.xaxis.label, ax.title, ax.yaxis.label] +
ax.get_xticklabels() + ax.get_yticklabels()):
item.set_fontsize(8)
if any([trend, seasonal, residual]):
# seasonal decompose
decomposition = sm.tsa.seasonal_decompose(
df[val_col], model='additive')
# plot trend part only
if trend:
plt.plot(list(decomposition.trend), color='black',
label="Trend", linewidth=linewidth, alpha=0.8)
if seasonal:
plt.plot(list(decomposition.seasonal), color='black', linestyle='dotted',
label="Seasonal", linewidth=1, alpha=0.8)
if residual:
plt.plot(list(decomposition.resid), color='black', linestyle='dashed',
label="Residual", linewidth=1, alpha=0.8)
# trend.plot(ax=ax)
# store figure to file
if filename:
fig.savefig(
output / "figures" / f"{filename}.png", dpi=300, format='PNG',
bbox_inches='tight', pad_inches=1, facecolor="white")
# also save as svg
fig.savefig(
output / "svg" / f"{filename}.svg", format='svg',
bbox_inches='tight', pad_inches=1, facecolor="white")
def load_and_plot(
dataset: Path, metric: str = None, src_ref: str = "flickr", colors: cm.colors.ListedColormap = None,
agg_base: str = None, trend: bool = None, return_df: bool = None):
"""Load data and plot"""
if metric is None:
metric = 'post_hll'
if metric == 'post_hll':
metric_label = 'postcount'
else:
metric_label = 'usercount'
if colors is None:
colors = sns.color_palette("vlag", as_cmap=True, n_colors=2)
colors = colors([1.0])
df = process_dataset(dataset, metric=metric, agg_base=agg_base)
plot_time(
df, legend=metric_label.capitalize(), color=colors,
title=f'{src_ref.capitalize()} {metric_label} over time',
filename=f"temporal_{metric_label}_{src_ref}_absolute", trend=trend, agg_base=agg_base)
if return_df:
return df
colors = sns.color_palette("vlag", as_cmap=True, n_colors=2)
load_and_plot(FLICKR_ALL, src_ref=f"flickr_{AGG_BASE}", colors=colors([1.0]), agg_base=AGG_BASE, trend=False)
Plot Flickr usercount
load_and_plot(FLICKR_ALL, src_ref=f"flickr_{AGG_BASE}", metric="user_hll", colors=colors([0.0]), agg_base=AGG_BASE, trend=False)
Repeat for iNaturalist data
load_and_plot(INATURALIST_ALL, src_ref=f"inaturalist_{AGG_BASE}", colors=colors([1.0]), agg_base=AGG_BASE, trend=False)
load_and_plot(INATURALIST_ALL, src_ref=f"inaturalist_{AGG_BASE}", metric="user_hll", colors=colors([0.0]), agg_base=AGG_BASE, trend=False)
source_zip = "https://opara.zih.tu-dresden.de/xmlui/bitstream/handle/123456789/5793/S10.zip?sequence=1&isAllowed=y"
FLICKR_SUNRISE = WORK_DIR / "flickr-sunrise-months.csv"
FLICKR_SUNSET = WORK_DIR / "flickr-sunset-months.csv"
if not FLICKR_SUNRISE.exists():
tools.get_zip_extract(
uri=source_zip, filename="S10.zip", output_path=WORK_DIR,
filter_files=["flickr-sunrise-months.csv", "flickr-sunset-months.csv"])
df_sunrise = read_csv_datetime(FLICKR_SUNRISE)
df_sunrise = append_cardinality_df(df_sunrise, f'{METRIC}_hll', f'{METRIC}count_est')
df_sunrise = filter_fill_time(df_sunrise, 2007, 2018, f'{METRIC}count_est', agg_base=AGG_BASE)
plot_time(
df_sunrise, legend=f"{METRIC.capitalize()}count", color=colors([0.0]),
title=f'Flickr {METRIC}count over time for sunrise related posts',
filename=f"temporal_{METRIC}count_flickr_sunrise_{AGG_BASE}", trend=False, agg_base=AGG_BASE)
At the time of writing the sunset-sunrise paper, data had only been collected up to 2018.
Still, the graph above shows the same seasonal patterns as the graph for all Flickr photographs. Below, we normalize the sunrise graph using the chi value.
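As a reminder, the signed chi value per time bin is calculated as in the original publication:

$$\chi = \frac{O \cdot n - E}{\sqrt{E}}, \qquad n = \frac{\sum E}{\sum O}$$

where $O$ is the observed value (e.g. the sunrise postcount per month), $E$ is the expected value (the total postcount per month), and $n$ is the normalisation factor computed in calc_norm() below. A bin is flagged as significant if $|\chi| > 3.84$, the critical chi-square value for $p = 0.05$ at one degree of freedom.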
Limit the dataframe for all posts over time to 2018, too.
if METRIC == 'post':
df_expected = df_post
else:
df_expected = df_user
df_expected = filter_fill_time(df_expected, 2007, 2018, f'{METRIC}count_est', agg_base=AGG_BASE)
This is adapted from notebook three of the original publication.
First, define the input parameters:
DOF = 1  # degrees of freedom
CHI_CRIT_VAL = 3.84  # critical chi-square value for p = 0.05 and DOF = 1
CHI_COLUMN: str = f"{METRIC}count_est"
def calc_norm(
df_expected: pd.DataFrame,
df_observed: pd.DataFrame,
chi_column: str = CHI_COLUMN):
"""Fetch the number of data points for the observed and
expected dataset by the relevant column
and calculate the normalisation value
"""
v_expected = df_expected[chi_column].sum()
v_observed = df_observed[chi_column].sum()
norm_val = (v_expected / v_observed)
return norm_val
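A toy example with hypothetical numbers: if the expected dataset totals 1,000,000 posts and the observed dataset totals 10,000 posts, the normalisation factor is 100.

_expected_toy = pd.DataFrame({CHI_COLUMN: [600_000, 400_000]})
_observed_toy = pd.DataFrame({CHI_COLUMN: [6_000, 4_000]})
calc_norm(_expected_toy, _observed_toy)  # -> 100.0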
norm_val = calc_norm(df_expected, df_sunrise)
print(norm_val)
rename_expected = {
'postcount_est':'postcount_est_expected',
'usercount_est':'usercount_est_expected',
}
df_expected.rename(
columns=rename_expected,
inplace=True)
merge_cols = [f'{METRIC}count_est']
df_expected_observed = df_expected.merge(
df_sunrise[merge_cols],
left_index=True, right_index=True)
Preview
df_expected_observed.head()
def chi_calc(x_observed: float, x_expected: float, x_normalized: float) -> pd.Series:
"""Apply chi calculation based on observed (normalized) and expected value"""
value_observed_normalised = x_observed * x_normalized
a = value_observed_normalised - x_expected
b = math.sqrt(x_expected)
# native division with division by zero protection
chi_value = a / b if b else 0
return chi_value
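For illustration, with hypothetical numbers: 50 observed posts scaled by a normalisation factor of 100 against 4,000 expected posts yield a clearly significant positive chi value.

chi_calc(x_observed=50, x_expected=4_000, x_normalized=100)  # (5000-4000)/sqrt(4000) ≈ 15.81 > 3.84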
def apply_chi_calc(
        df: pd.DataFrame, norm_val: float,
        chi_column: str = CHI_COLUMN, chi_crit_val: float = CHI_CRIT_VAL):
    """Calculate chi values from the expected and observed columns
    and append the results to df (modifies df in place)"""
# lambda: apply function chi_calc() to each item
df['chi_value'] = df.apply(
lambda x: chi_calc(
x[chi_column],
x[f'{chi_column}_expected'],
norm_val),
axis=1)
# add significant column, default False
df['significant'] = False
# calculate significance for both negative and positive chi_values
df.loc[np.abs(df['chi_value'])>chi_crit_val, 'significant'] = True
df_expected_observed
Apply calculation
%%time
apply_chi_calc(
df=df_expected_observed,
norm_val=norm_val)
df_expected_observed.head()
plot_time(
df_expected_observed, legend="Signed Chi", val_col="chi_value", color=colors([0.0]),
title=f'Flickr Chi for "Sunrise"-related {METRIC}count over time',
filename=f"temporal_chi_flickr_{METRIC}count_sunrise_{AGG_BASE}", trend=False, agg_base=AGG_BASE)
Repeat the same steps for the observation of sunset posts worldwide:
df_sunset = read_csv_datetime(FLICKR_SUNSET)
df_sunset = append_cardinality_df(df_sunset, f'{METRIC}_hll', f'{METRIC}count_est')
df_sunset = filter_fill_time(df_sunset, 2007, 2018, f'{METRIC}count_est', agg_base=AGG_BASE)
plot_time(
df_sunset, legend=f"{METRIC.capitalize()}count", color=colors([1.0]),
title=f'Flickr {METRIC}count over time for sunset related posts',
filename=f"temporal_{METRIC}count_flickr_sunset_{AGG_BASE}", trend=False, agg_base=AGG_BASE)
norm_val = calc_norm(
df_expected.rename(columns={f'{METRIC}count_est_expected':f'{METRIC}count_est'}, inplace=False),
df_sunset)
print(norm_val)
merge_cols = [f'{METRIC}count_est']
df_expected_observed = df_expected.merge(
df_sunset[merge_cols],
left_index=True, right_index=True)
%%time
apply_chi_calc(
df=df_expected_observed,
norm_val=norm_val)
plot_time(
df_expected_observed, legend="Signed Chi", val_col="chi_value", color=colors([1.0]),
title=f'Flickr Chi for "Sunset"-related {METRIC}count over time',
filename=f"temporal_chi_flickr_{METRIC}count_sunset_{AGG_BASE}", trend=False, agg_base=AGG_BASE)
Select the topic parameter below:
topic = "milvusmilvus"
# topic = "bloom"
FLICKR_SUBQUERY = OUTPUT / f"flickr_{topic}_months.csv"
%%time
df_post = read_csv_datetime(FLICKR_ALL)
df_post = append_cardinality_df(df_post, 'post_hll', 'postcount_est')
df_post = filter_fill_time(df_post, 2007, 2020, 'postcount_est', max_month="08", agg_base=AGG_BASE)
df_expected = df_post
df_subquery = read_csv_datetime(FLICKR_SUBQUERY)
df_subquery = append_cardinality_df(df_subquery, f'{METRIC}_hll', f'{METRIC}count_est')
df_subquery = filter_fill_time(df_subquery, 2007, 2020, f'{METRIC}count_est', max_month="08", agg_base=AGG_BASE)
plot_time(
df_subquery, legend=f"{METRIC.capitalize()}count", color=colors([1.0]),
title=f'Flickr {METRIC}count over time for {topic.capitalize()} related posts',
filename=f"temporal_{METRIC}count_flickr_{topic}_{AGG_BASE}", trend=False, agg_base=AGG_BASE)
df_expected.rename(columns={f'{METRIC}count_est_expected':f'{METRIC}count_est'}, inplace=True)
norm_val = calc_norm(
df_expected,
df_subquery)
print(norm_val)
df_expected.rename(columns={f'{METRIC}count_est':f'{METRIC}count_est_expected'}, inplace=True)
df_expected.head()
merge_cols = [f'{METRIC}count_est']
df_expected_observed = df_expected.merge(
df_subquery[merge_cols],
left_index=True, right_index=True)
%%time
apply_chi_calc(
df=df_expected_observed,
norm_val=norm_val)
plot_time(
df_expected_observed, legend="Signed Chi", val_col="chi_value", color=colors([1.0]),
title=f'Flickr Chi for "{topic.capitalize()}"-related {METRIC}count over time',
filename=f"temporal_chi_flickr_{METRIC}count_{topic}_{AGG_BASE}", trend=False, seasonal=False, residual=False, agg_base=AGG_BASE)
src = Path.cwd().parents[0] / "00_data" / "milvus" / "observations-350501.csv"
df = pd.read_csv(
    src,
    index_col='datetime',
    parse_dates={'datetime': ["observed_on"]},
    date_format='%Y-%m-%d',
    keep_date_col=False,  # boolean; the string 'False' would be truthy
    usecols=["id", "observed_on"])
df.head()
df_milvus = filter_fill_time(
df, 2007, 2022, val_col="id", agg_base=AGG_BASE, agg_method="count")
df_milvus.rename(columns={"id": "observations"}, inplace=True)
metric_label="observations"
src_ref="iNaturalist Milvus milvus"
plot_time(
df_milvus, legend=metric_label.capitalize(), color=colors([1.0]),
    title=f'{src_ref} {metric_label} over time', val_col=metric_label,
filename=f"temporal_iNaturalist_milvusmilvus_absolute", trend=False, agg_base=AGG_BASE)
df_expected = load_and_plot(
INATURALIST_ALL, src_ref=f"inaturalist_{AGG_BASE}", colors=colors([1.0]), agg_base=AGG_BASE, trend=False, return_df=True)
df_expected.head()
norm_val = calc_norm(
df_expected.rename(columns={f'{METRIC}count_est':f'observations'}, inplace=False),
df_milvus, chi_column = "observations")
print(norm_val)
df_expected.rename(columns={f'{METRIC}count_est':f'observations_expected'}, inplace=True)
df_expected_observed = df_expected.merge(
    df_milvus["observations"],
    left_index=True, right_index=True)
%%time
apply_chi_calc(
df=df_expected_observed,
norm_val=norm_val, chi_column="observations")
plot_time(
df_expected_observed, legend="Signed Chi", val_col="chi_value", color=colors([1.0]),
    title='iNaturalist Chi for "Milvus milvus"-related observations over time',
filename=f"temporal_chi_inaturalist_observations_milvus_{AGG_BASE}", trend=False, seasonal=False, residual=False, agg_base=AGG_BASE)
!jupyter nbconvert --to html_toc \
--output-dir=../resources/html/ ./01_temporal_chi.ipynb \
--output 01_temporal_chi_{AGG_BASE.lower()} \
--template=../nbconvert.tpl \
--ExtractOutputPreprocessor.enabled=False >&- 2>&-