Alexander Dunkel, Institute of Cartography, TU Dresden
Abstract: Preparation of data for the visualization of temporal patterns of ephemeral events.
To run this notebook, you have two options as a starting point:
1. Use the latest conda environment_default.yml from our CartoLab docker container.
2. Clone the repository and edit your .env value to point to the repository where this notebook can be found, e.g.:
git clone https://gitlab.vgiscience.de/lbsn/tools/jupyterlab.git
cd jupyterlab
cp .env.example .env
nano .env
## Enter:
# JUPYTER_NOTEBOOKS=~/notebooks/ephemeral_events
# TAG=v0.12.3
docker network create lbsn-network
docker-compose pull && docker-compose up -d
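If the containers started correctly, the running services can be listed with:
docker-compose ps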
Load dependencies:
import os, sys
from pathlib import Path
import psycopg2
import geopandas as gp
import pandas as pd
import matplotlib.pyplot as plt
from typing import List, Tuple, Dict, Optional
from IPython.display import clear_output, display, HTML
To reduce the code shown in this notebook, some helper methods are made available in a separate file.
Load the helper module from ../py/modules/base/tools.py:
module_path = str(Path.cwd().parents[0] / "py")
if module_path not in sys.path:
sys.path.append(module_path)
from modules.base import tools
Activate autoreload of changed python files:
%load_ext autoreload
%autoreload 2
Define initial parameters that affect processing:
WORK_DIR = Path.cwd().parents[0] / "tmp" # Working directory
OUTPUT = Path.cwd().parents[0] / "out" # Define path to output directory (figures etc.)
for folder in [WORK_DIR, OUTPUT]:
folder.mkdir(exist_ok=True)
Load environment variables from the .env dotfile:
from dotenv import load_dotenv
load_dotenv(
Path.cwd().parents[0] / '.env', override=True)
DB_NAME_RAWDB = os.getenv("DB_NAME_RAWDB") # lbsn-rawdb name
DB_HOST_RAWDB = os.getenv("DB_HOST_RAWDB") # lbsn-rawdb host
db_user = "postgres"
db_pass = os.getenv('POSTGRES_PASSWORD')
db_host = "lbsn-hlldb"
db_port = "5432"
db_name = "hlldb"
db_connection_hll = psycopg2.connect(
host=db_host,
port=db_port,
dbname=db_name,
user=db_user,
password=db_pass
)
db_conn_hll = tools.DbConn(db_connection_hll)
cur_hll = db_connection_hll.cursor()
cur_hll.execute("SELECT 1;")
print(cur_hll.statusmessage)
Simplify query access:
db_conn = tools.DbConn(db_connection_hll)
db_conn.query("SELECT 1;")
If any query results in an error, execute db_connection_hll.rollback() once, to reset the cursor.
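A minimal pattern for this (a sketch; psycopg2 raises subclasses of psycopg2.Error for failed statements):
try:
    db_conn.query("SELECT 1;")
except psycopg2.Error:
    # reset the aborted transaction, so subsequent statements can run
    db_connection_hll.rollback()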
Create a new schema called mviews and update the Postgres search_path to include the new schema:
sql_query = """
CREATE SCHEMA IF NOT EXISTS mviews;
ALTER DATABASE hlldb
SET search_path = "$user",
social,
spatial,
temporal,
topical,
interlinkage,
extensions,
mviews;"""
Since the above query will not return any result, we'll directly use the psycopg2 cursor object:
cur = db_connection_hll.cursor()
cur.execute(sql_query)
print(cur.statusmessage)
By using a Foreign Table, this step will establish the connection between hlldb and rawdb.
On hlldb, install postgres_fdw extension:
sql_query = """
CREATE EXTENSION IF NOT EXISTS postgres_fdw SCHEMA extensions;
"""
cur_hll.execute(sql_query)
print(cur_hll.statusmessage)
Check if foreign table has been imported already:
result = tools.check_table_exists(db_conn_hll, 'flickr_all_reduced')
print(result)
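For reference, such an existence check can also be expressed directly against the Postgres information_schema (a sketch of what the helper presumably does; the actual implementation is in tools.py):
db_conn_hll.query("""
    SELECT EXISTS (
        SELECT FROM information_schema.tables
        WHERE table_schema = 'mviews'
        AND table_name = 'flickr_all_reduced');
    """)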
Conditionally load the password. This only needs to be done once, if the server hasn't been added before:
if not result:
import getpass
USER_KEY = getpass.getpass()
Create Foreign Server connection to rawdb, on hlldb:
if not result:
    sql_query = f"""
    CREATE SERVER IF NOT EXISTS lbsnraw
        FOREIGN DATA WRAPPER postgres_fdw
        OPTIONS (
            host '{DB_HOST_RAWDB}',
            dbname '{DB_NAME_RAWDB}',
            port '5432',
            keepalives '1',
            keepalives_idle '30',
            keepalives_interval '10',
            keepalives_count '5',
            fetch_size '500000');
    CREATE USER MAPPING IF NOT EXISTS for postgres
        SERVER lbsnraw
        OPTIONS (user 'lbsn_reader', password '{USER_KEY}');
    """
    cur_hll.execute(sql_query)
    print(cur_hll.statusmessage)
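To verify that the foreign server exists, the Postgres system catalog can be queried (optional check):
db_conn.query("SELECT srvname FROM pg_foreign_server;")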
Import the foreign table definition on the hlldb:
sql_query = f"""
IMPORT FOREIGN SCHEMA mviews
LIMIT TO (
flickr_all_reduced)
FROM SERVER lbsnraw
INTO mviews;
"""
# only import table
# if it hasn't been imported already
if not result:
cur_hll.execute(sql_query)
print(cur_hll.statusmessage)
Test:
db_conn.query("SELECT * FROM mviews.flickr_all_reduced LIMIT 10;")
Commit changes to hlldb
db_connection_hll.commit()
HyperLogLog parameters
The HyperLogLog extension for Postgres from Citus that we're using here contains several tweaks to optimize performance, which can affect the sensitivity of the data.
From a privacy perspective, for example, it is recommended to disable explicit mode.
What is explicit mode? When explicit mode is active, full IDs will be stored for small sets. In our case, any coordinates frequented by few users (outliers) would store full user and post IDs.
To disable explicit mode:
db_conn_hll.query("SELECT hll_set_defaults(11, 5, 0, 1);")
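According to the postgresql-hll documentation, hll_set_defaults() takes the parameters (log2m, regwidth, expthresh, sparseon): 11 and 5 keep the default register layout, expthresh = 0 disables the explicit representation, and sparseon = 1 keeps the sparse representation enabled.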
def materialized_view_hll(
        table_name_src: str, table_name_dest: str, schema: Optional[str] = None,
        additional_cols: Optional[List[str]] = None) -> str:
    """Returns raw SQL for creating a materialized view with HLL aggregates"""
    if schema is None:
        schema = 'mviews'
    if additional_cols is None:
        # avoid iterating over None below
        additional_cols = []
    add_cols = ''.join([f",{col}" for col in additional_cols])
    return f"""
        CREATE MATERIALIZED VIEW {schema}.{table_name_dest} AS
            SELECT
                month,
                year,
                hll_add_agg((hll_hash_text(post_guid))) AS "post_hll",
                hll_add_agg((hll_hash_text(user_guid))) AS "user_hll"
                {add_cols}
            FROM {schema}.{table_name_src}
            GROUP BY year, month{add_cols}
            ORDER BY year ASC, month ASC;
        """
%%time
sql_query = materialized_view_hll(
table_name_src="flickr_all_reduced", table_name_dest="flickr_all_months")
cur_hll.execute(sql_query)
print(cur_hll.statusmessage)
Test:
db_conn.query("SELECT * FROM mviews.flickr_all_months LIMIT 10;")
db_connection_hll.commit()
Save the HLL data to CSV. The following records are available from table mviews.flickr_all_months:
sql_query = f"""
SELECT year,
month,
post_hll,
user_hll
FROM mviews.flickr_all_months;
"""
df = db_conn.query(sql_query)
# use type int instead of float
for col in ["year", "month"]:
df[col] = df[col].astype(int)
df.head()
usecols = ["year", "month", "post_hll", "user_hll"]
df.to_csv(
OUTPUT / "flickr_all_months.csv",
mode='w', columns=usecols,
index=False, header=True)
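As an optional sanity check, read the CSV back to confirm columns and row count:
df_check = pd.read_csv(OUTPUT / "flickr_all_months.csv")
df_check.head()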
Two thematic queries have been collected to classify subsets of Flickr photos:
1. Milvus milvus related imagery, based on the terms 'red_kite', 'redkite', 'milvusmilvus', 'milvus_milvus', 'milvus'
2. Bloom related imagery, based on the terms 'bloom', 'blossom', 'flower'
In addition, the expected data ("all") for the Milvus range (Europe) boundary from iNaturalist is processed.
Select the parameters below:
# topic = "flickr_milvusmilvus"
# topic_ref = "flickr_milvusmilvus_reduced"
# topic = "flickr_bloom"
# topic_ref = "flickr_bloom"
topic = "milvus_range_inat_all"
topic_ref = "milvus_range_inat"
Process topic_ref:
result = tools.check_table_exists(db_conn_hll, topic_ref)
print(result)
sql_query = f"""
IMPORT FOREIGN SCHEMA mviews
LIMIT TO (
{topic_ref})
FROM SERVER lbsnraw
INTO mviews;
"""
# only import table
# if it hasn't been imported already
if not result:
cur_hll.execute(sql_query)
print(cur_hll.statusmessage)
db_conn.query(f"SELECT * FROM mviews.{topic_ref} LIMIT 10;")
%%time
sql_query = materialized_view_hll(
table_name_src=topic_ref, table_name_dest=f"{topic}_months")
cur_hll.execute(sql_query)
print(cur_hll.statusmessage)
db_connection_hll.commit()
sql_query = f"""
SELECT year,
month,
post_hll,
user_hll
FROM mviews.{topic}_months;
"""
df = db_conn.query(sql_query)
# use type int instead of float
for col in ["year", "month"]:
df[col] = df[col].astype(int)
usecols = ["year", "month", "post_hll", "user_hll"]
df.to_csv(
OUTPUT / f"{topic}_months.csv",
mode='w', columns=usecols,
index=False, header=True)
Check if foreign table has been imported already:
table_ref = 'inaturalist_all_reduced'
result = tools.check_table_exists(db_conn_hll, table_ref)
result
Import the foreign table definition on the hlldb:
sql_query = f"""
IMPORT FOREIGN SCHEMA mviews
LIMIT TO (
{table_ref})
FROM SERVER lbsnraw
INTO mviews;
"""
# only import table
# if it hasn't been imported already
if not result:
cur_hll.execute(sql_query)
print(cur_hll.statusmessage)
db_conn.query(f"SELECT * FROM mviews.{table_ref} LIMIT 10;")
db_connection_hll.commit()
table_ref_dest = "inaturalist_all_months"
%%time
sql_query = materialized_view_hll(
table_name_src=table_ref, table_name_dest=table_ref_dest)
cur_hll.execute(sql_query)
print(cur_hll.statusmessage)
db_connection_hll.commit()
sql_query = f"""
SELECT year,
month,
post_hll,
user_hll
FROM mviews.{table_ref_dest};
"""
df = db_conn.query(sql_query)
# use type int instead of float
time_cols = ["year", "month"]
# drop where time cols are invalid
df.dropna(subset=time_cols, inplace=True)
# turn float to int
for col in time_cols:
df[col] = df[col].astype(int)
# we can also remove any rows where the year is < 2007
df.drop(df[df['year'] < 2007].index, inplace = True)
df
usecols = ["year", "month", "post_hll", "user_hll"]
df.to_csv(
OUTPUT / f"{table_ref_dest}.csv",
mode='w', columns=usecols,
index=False, header=True)
Difference below: the materialized view additionally includes origin_id, to distinguish the data sources.
table_ref = 'milvus_focus_flickr_inat'
result = tools.check_table_exists(db_conn_hll, table_ref)
result
Import the foreign table definition on the hlldb:
sql_query = f"""
IMPORT FOREIGN SCHEMA mviews
LIMIT TO (
{table_ref})
FROM SERVER lbsnraw
INTO mviews;
"""
# only import table
# if it hasn't been imported already
if not result:
cur_hll.execute(sql_query)
print(cur_hll.statusmessage)
db_conn.query(f"SELECT * FROM mviews.{table_ref} LIMIT 10;")
db_connection_hll.commit()
table_ref_dest = "milvus_focus_flickr_inat_all_months"
%%time
sql_query = materialized_view_hll(
table_name_src=table_ref, table_name_dest=table_ref_dest, additional_cols=['origin_id'])
cur_hll.execute(sql_query)
print(cur_hll.statusmessage)
db_connection_hll.commit()
sql_query = f"""
SELECT year,
month,
origin_id,
post_hll,
user_hll
FROM mviews.{table_ref_dest};
"""
df = db_conn.query(sql_query)
# use type int instead of float
time_cols = ["year", "month"]
# drop where time cols are invalid
df.dropna(subset=time_cols, inplace=True)
# turn float to int
for col in time_cols:
df[col] = df[col].astype(int)
# we can also remove any rows where the year is outside the range 2007-2022
df.drop(df[(df['year'] < 2007) | (df['year'] > 2022)].index, inplace = True)
df
usecols = ["year", "month", "origin_id", "post_hll", "user_hll"]
df.to_csv(
OUTPUT / f"{table_ref_dest}.csv",
mode='w', columns=usecols,
index=False, header=True)
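Optionally, read the exported file back and confirm that both data sources are present, by grouping on origin_id:
df_check = pd.read_csv(OUTPUT / f"{table_ref_dest}.csv")
df_check.groupby("origin_id").size()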
!jupyter nbconvert --to html_toc \
--output-dir=../resources/html/ ./00_raw_hll_conversion.ipynb \
--template=../nbconvert.tpl \
--ExtractOutputPreprocessor.enabled=False >&- 2>&-