Mapnik Generative AI workflow: Processing¶
Alexander Dunkel, Madalina Gugulica, Institute of Cartography, TU Dresden
Mapnik rendering based on stable diffusion generative AI and social media data.
This notebook is a continuation of the previous notebook (01_mapnik_generativeai.html).
Prepare environment¶
Load base dependencies:
import os, sys
import re
import random
import shutil
import warnings
import requests
import numpy as np
import geopandas as gp
import pandas as pd
import matplotlib.pyplot as plt
import rasterio as rio
from pathlib import Path
from rasterio.plot import show
Temporarily install the rembg package:
!../py/modules/base/pkginstall.sh "rembg"
Import every cell from the previous notebook, except those tagged with active-ipynb. This will make all variables and methods from the previous notebook available in the current runtime, so we can continue where we left off.
module_path = str(Path.cwd().parents[0] / "py")
if module_path not in sys.path:
sys.path.append(module_path)
from modules.base import raster
from _02_generativeai import *
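For reference, such a tag-filtered module export can be produced with nbconvert's TagRemovePreprocessor. This is a sketch under assumed filenames; the repository's actual conversion step may differ:
from pathlib import Path
import nbformat
from nbconvert import PythonExporter
from traitlets.config import Config

c = Config()
# drop all cells tagged "active-ipynb" from the export
c.TagRemovePreprocessor.remove_cell_tags = {"active-ipynb"}
c.TagRemovePreprocessor.enabled = True
c.PythonExporter.preprocessors = ["nbconvert.preprocessors.TagRemovePreprocessor"]

nb = nbformat.read("02_generativeai.ipynb", as_version=4)  # hypothetical filename
body, _ = PythonExporter(config=c).from_notebook_node(nb)
Path(module_path, "_02_generativeai.py").write_text(body)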
Symlink font folder
!rm /fonts && ln -s {TMP}/fonts /fonts
Symlink the background removal model:
!ln -s {OUTPUT}/isnet-general-use.onnx /root/.u2net/isnet-general-use.onnx 2> /dev/null
Create new directories
dlist = [
(OUTPUT / "images_gis"),
(OUTPUT / "img2img"),
(INPUT / "cluster_img"),
(INPUT / "cluster_img_tags"),
]
for folder in dlist:
folder.mkdir(exist_ok=True, parents=True)
Activate autoreload of changed python files:
%load_ext autoreload
%autoreload 2
Parameters¶
APIURL = "http://127.0.0.1:7861"
BASE_PROMPT_POS: str = \
"white background,simple outline,masterpiece,best quality,high quality," \
"<lora:Japanese_style_Minimalist_Line_Illustrations:0.2>"
BASE_PROMPT_NEG: str = \
"(bad-artist:1),(worst quality, low quality:1.4),lowres,bad anatomy,bad hands," \
"((text)),(watermark),error,missing fingers,extra digit,fewer digits,cropped,worst quality," \
"low quality,normal quality,((username)),blurry,(extra limbs),bad-artist-anime," \
"(three hands:1.6),(three legs:1.2),(more than two hands:1.4),(more than two legs,:1.2)," \
"label,(isometric), (square)"
Set global SD-settings
payload = {
"CLIP_stop_at_last_layers": 1,
"sd_vae":"vae-ft-mse-840000-ema-pruned.safetensors",
"sd_model_checkpoint":"hellofunnycity_V14.safetensors",
}
requests.post(url=f'{APIURL}/sdapi/v1/options', json=payload)
Have a look at our per-job basis settings, loaded from the last notebook:
SD_CONFIG
For this notebook, increase steps to 28
SD_CONFIG["steps"] = 28
Test image generation for tags and emoji¶
The next step is to process social media metadata (tags, emoji) in descending importance (cluster size), generate images for clusters, and place the images on the map at the center of gravity of each cluster shape, as reported by the tagmaps package.
Test API for selected tags¶
PROMPT = "(Grosser Garten, Palais, Nature)"
output_name = "test_image_palais_default"
KWARGS = {
"prompt": concat_prompt(PROMPT),
"negative_prompt": BASE_PROMPT_NEG,
"save_name": output_name,
"sd_config": SD_CONFIG,
"show": False
}
DKWARGS = {
"resize":(350, 350),
"figsize":(22, 60),
}
if not (OUTPUT / "images" / f'{output_name}.png').exists():
generate(**KWARGS)
imgs = list((OUTPUT / "images").glob(f'{output_name}*'))
tools.image_grid(imgs, **DKWARGS)
We have to think about a way to better incorporate these square images into the map. What if we add A thought bubble of to our prompt?
def generate_samples(
prompt: str, save_name: str, kwargs=KWARGS, output=OUTPUT,
dkwargs=DKWARGS, print_prompt: bool = None, rembg: bool = None):
"""Generate and show 4 sample images for prompt"""
kwargs["prompt"] = concat_prompt(prompt)
if print_prompt:
print(kwargs["prompt"][:50])
kwargs["save_name"] = save_name
if not (output / "images" / f'{kwargs["save_name"]}.png').exists():
if rembg:
generate_rembg(**kwargs)
else:
generate(**kwargs)
imgs = list((output / "images").glob(f'{kwargs["save_name"]}*'))
tools.image_grid(imgs, **dkwargs)
generate_samples("(thought bubble of Grosser Garten, Palais, Nature)", save_name="test_image_palais_bubble")
Or maybe an icon?
generate_samples("(A map icon of Grosser Garten, Palais, Nature)", save_name="test_image_palais_icon")
Let's keep A map icon of as the pre-prompt.
Some more tests for other tags and terms
generate_samples("(A map icon of Botanischergarten), flower, grün, 🌵 🌿 🌱", save_name="test_image_botan_icon")
generate_samples("(A map icon of Gläsernemanufaktur), volkswagen, building", save_name="test_image_vw_icon")
generate_samples("(A map icon of zoo), zoodresden, animals", save_name="test_image_zoo_icon")
generate_samples("(A map icon of fussball stadion), dynamo, stadion", save_name="test_image_fussball_icon")
generate_samples("(people 🏃), activity", save_name="test_image_running_activity")
Enough tests. Now we can collect the tag and emoji clusters and move on to batch generation.
Process clustered data¶
The overall workflow looks like this (see the sketch after this list):
- Find all clusters above a weight of x
- Walk through clusters, get cluster centroid
- Select all other cluster-shapes that can be found at this location
- Concat prompt based on ascending importance
- Generate image, remove background, save
- Create Mapnik Stylesheet to place images as either symbols or raster images
- Render map
- (Adjust parameters and repeat, until map quality is acceptable)
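Expressed as a single loop, this could look like the sketch below, once the cluster shapefile is loaded. This is a sketch only; MIN_WEIGHT is a hypothetical cutoff, and concat_prompt, generate_rembg, SD_CONFIG and BASE_PROMPT_NEG come from the previous notebook:
MIN_WEIGHT = 100
clusters = gdf[gdf["Weights"] > MIN_WEIGHT]
for ix, row in clusters.iterrows():
    # all other cluster shapes found at this location
    local = clusters[clusters.intersects(row.geometry)]
    # concat prompt, most important (largest) cluster first
    terms = local.sort_values("Weights", ascending=False)["ImpTag"].tolist()
    # generate the image and remove its background;
    # placement happens later via the Mapnik stylesheet
    generate_rembg(
        prompt=concat_prompt(", ".join(terms)),
        negative_prompt=BASE_PROMPT_NEG,
        save_name=f"cluster_{ix:03d}",
        sd_config=SD_CONFIG, show=False)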
data_src = Path(INPUT / "shapefiles_gg" / "allTagCluster.shp")
gdf = gp.read_file(data_src, encoding='utf-8')
CRS_PROJ = gdf.crs
def sel_cluster(gdf: gp.GeoDataFrame, min_weight: int) -> gp.GeoSeries:
    """Return GeoSeries of cluster geometries above min_weight"""
    return gdf[gdf["Weights"] > min_weight]["geometry"]
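For example (hypothetical threshold):
# how many clusters exceed a weight of 300?
len(sel_cluster(gdf, min_weight=300))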
OUTPUT_MAPS = TMP / "bg"
Reproject raster
%%time
raster.reproject_raster(
raster_in=f"{OUTPUT_MAPS}/grossergarten_carto_17.tif",
raster_out=f"{OUTPUT_MAPS}/grossergarten_carto_17_proj.tif",
dst_crs=f'epsg:{CRS_PROJ.to_epsg()}')
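For reference, a minimal sketch of what such a reprojection helper typically does with rasterio.warp (an assumption about modules.base.raster; the actual implementation may differ):
from rasterio.warp import calculate_default_transform, reproject, Resampling

def reproject_raster_sketch(raster_in, raster_out, dst_crs):
    """Warp a raster to dst_crs, band by band"""
    with rio.open(raster_in) as src:
        transform, width, height = calculate_default_transform(
            src.crs, dst_crs, src.width, src.height, *src.bounds)
        profile = src.profile.copy()
        profile.update(crs=dst_crs, transform=transform, width=width, height=height)
        with rio.open(raster_out, "w", **profile) as dst:
            for band in range(1, src.count + 1):
                reproject(
                    source=rio.band(src, band),
                    destination=rio.band(dst, band),
                    src_transform=src.transform, src_crs=src.crs,
                    dst_transform=transform, dst_crs=dst_crs,
                    resampling=Resampling.nearest)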
basemap = rio.open(f"{OUTPUT_MAPS}/grossergarten_carto_17_proj.tif")
bbox_map = gdf.total_bounds.squeeze()
minx, miny = bbox_map[0], bbox_map[1]
maxx, maxy = bbox_map[2], bbox_map[3]
x_lim=(minx, maxx)
y_lim=(miny, maxy)
Plot all cluster shapes
fig, ax = plt.subplots(figsize=(10, 10))
rio.plot.show(basemap, ax=ax)
gdf.plot(ax=ax, facecolor='none', edgecolor='red', linewidth=0.1)
ax.set_xlim(*x_lim)
ax.set_ylim(*y_lim)
ax.set_axis_off()
Plot only cluster shapes above a certain weight
cluster_sel = gdf[gdf["Weights"]>300]
def plot_clustermap(cluster_sel: gp.GeoDataFrame, basemap: rio.DatasetReader, label: bool = None):
"""Plot a map with clusters, basemap, and cluster labels"""
if label is None:
label = True
fig, ax = plt.subplots(figsize=(7, 10))
rio.plot.show(basemap, ax=ax)
cmap=plt.get_cmap('Paired')
cluster_sel.plot(ax=ax, facecolor='none', cmap=cmap, linewidth=1)
if label:
tools.annotate_locations_fit(
gdf=cluster_sel, ax=ax,
text_col="ImpTag", arrowstyle='-', arrow_col='black', fontsize=10,
font_path="/fonts/seguisym.ttf")
ax.set_xlim(*x_lim)
ax.set_ylim(*y_lim)
ax.set_axis_off()
with warnings.catch_warnings():
# Ignore emoji "Variation-Selector" not found in font
warnings.filterwarnings("ignore", category=UserWarning)
plt.show()
plot_clustermap(cluster_sel=cluster_sel, basemap=basemap)
There are several clusters visible. On the upper left, we can see the Dynamo Dresden stadium. Several tag and emoji cluster shapes can be found in this area. There is also a big shape covering the Großer Garten. Two smaller shapes hover over the Dresden Zoo and the Gläserne Manufaktur.
Process Emoji¶
We start with processing emoji. This seems like the easier part, since emoji are already highly abstracted concepts that can convey many meanings in a simplified form.
Some emoji, however, are very generic and used in arbitrary contexts. We use a broad positive filter list with 693 emoji (out of about 2000 available) to focus on specific activity and environment emoji.
emoji_filter_list = pd.read_csv(
INPUT / 'SelectionList_EmojiLandscapePlanning.txt', header=None, names=["emoji"], encoding="utf-8", on_bad_lines='skip')
emoji_filter_list = emoji_filter_list.set_index("emoji").index
print(emoji_filter_list[:20])
cluster_sel = gdf[(gdf["Weights"]>100) & (gdf["emoji"]==1) & (gdf["ImpTag"].isin(emoji_filter_list))].copy()
plot_clustermap(cluster_sel=cluster_sel, basemap=basemap)
We can see four spatial groups of emoji clusters, the football stadium (upper left), the Zoo (below), the botanical garden (upper group) and the Junge Garde (lower right), an outdoor music venue.
Concat emoji based on cluster group/spatial intersection¶
# spatial self-join: match each cluster with all clusters it intersects
intersects = cluster_sel.sjoin(cluster_sel[["geometry"]], how="left", predicate="intersects").reset_index()
# dissolve by the joined index with aggfunc="min": each connected set of
# intersecting clusters receives the smallest member index as its group id
cluster_groups = intersects.dissolve("index_right", aggfunc="min")
Join back the group IDs:
cluster_sel["group"] = cluster_groups["index"]
cluster_lists = cluster_sel.groupby("group")["ImpTag"].apply(list)
cluster_lists
Generate images for cluster-groups¶
emoji_cluster_1 = list(cluster_sel[cluster_sel["group"]==1]["ImpTag"])
emoji_cluster_1
Generate sample images for clusters
for ix, cluster_list in enumerate(cluster_lists):
print(cluster_list)
generate_samples(
f"A map icon of happy ({cluster_list[0]}), {''.join(cluster_list[1:])}", save_name=f"emoji_{ix:03d}", rembg=True)
Test placement on map in rasterio
Get bounds of cluster group 1
bounds = cluster_sel[cluster_sel["group"]==1]["geometry"].total_bounds
in_img = OUTPUT / "images" / "000.png"
out_img = OUTPUT / "images_gis" / "000_geo.png"
Convert to GeoPng to place cluster on the map
raster.georeference_raster(
raster_in=in_img,
raster_out=out_img, bbox=bounds, crs_out=CRS_PROJ)
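What such a georeferencing step does, in a minimal rasterio sketch (an assumption about modules.base.raster; written as GeoTiff here for simplicity, while the helper produces a GeoPng):
from rasterio.transform import from_bounds

def georeference_sketch(raster_in, raster_out, bbox, crs_out):
    """Assign a transform computed from bbox to an otherwise plain image"""
    with rio.open(raster_in) as src:
        data = src.read()
        profile = src.profile.copy()
        profile.update(
            driver="GTiff", crs=crs_out,
            transform=from_bounds(*bbox, src.width, src.height))
    with rio.open(raster_out, "w", **profile) as dst:
        dst.write(data)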
Preview in rasterio
cluster_raster = rio.open(out_img)
fig, ax = plt.subplots(figsize=(4, 4))
rio.plot.show(basemap, ax=ax)
rio.plot.show(cluster_raster, ax=ax, alpha=0.5)
ax.set_axis_off()
TODO: Display image with alpha channel in rio.
Conclusion: placement as raster is too complicated. Instead, we will use the symbol point renderer to place and scale symbols with Mapnik.
Create shapefile and Mapnik stylesheet¶
In order to place multiple images on the map, we create a shapefile with the cluster_groups features as points. For each point, we add a column with the reference to its generated image and a scale, to emphasize weights.
TODO: Maybe use GroupSymbolizer?
Use Geopandas to write gdf to point shapefile
df = cluster_lists.to_frame()
df.reset_index(inplace=True)
df.head()
Prepare conversion to geodataframe
Two options here:
- use the centroid for symbol placement
- or use the dissolved geometry [x]
def cluster_id(row):
return row.name
def centroid_geom(row, cluster_sel):
return cluster_sel[cluster_sel["group"] == row.group]["geometry"].to_frame().dissolve().centroid
def bounds_geom(row, cluster_sel):
return cluster_sel[cluster_sel["group"] == row.group]["geometry"].to_frame().dissolve().geometry
df["cluster_id"] = df.apply(cluster_id, axis=1)
df["geometry"] = df.apply(bounds_geom, axis=1, cluster_sel=cluster_sel)
df["ImpTag"] = df.ImpTag.map(' '.join)
df.head(6)
gdf = gp.GeoDataFrame(df, crs=CRS_PROJ, geometry=df.geometry)
gdf.to_file(filename=INPUT / 'shapefiles_gg' / 'gen_img.shp', driver="ESRI Shapefile")
Prepare Mapnik Plot¶
Copy generated images to input path for mapnik
def _copy_generated_tomapnik(
    input_path: Path = OUTPUT / "images", output_path: Path = INPUT / "cluster_img", batch: int = None, emoji: bool = None):
    """Copy files from image gen folder to mapnik plot folder, rename to standard"""
    if batch is None:
        batch = 0
    cluster_img = []
    emoji_pre = ""
    if emoji:
        emoji_pre = "emoji_"
    for fname in input_path.glob(f"{emoji_pre}*.png"):
        if batch == 0:
            # e.g. emoji_000.png
            if re.match(rf"{emoji_pre}[0-9][0-9][0-9]\.png", fname.name):
                cluster_img.append(fname)
        else:
            # e.g. 000_01.png for batch 1
            if re.match(rf"[0-9][0-9][0-9]_{batch:02}\.png", fname.name):
                cluster_img.append(fname)
    for file in cluster_img:
        shutil.copy(file, output_path / file.name.replace(emoji_pre, "").replace(f"_{batch:02}", ""))
    print(f'Copied {len(cluster_img)} files.')
_copy_generated_tomapnik(emoji=True)
output_name = "tagmap_production_cluster_gg_emoji.png"
stylesheet = "tagmap_production_testraster_points_gg_emoji.xml"
%%time
!/usr/bin/python3 -m mapnik_cli \
--stylesheet_name {stylesheet} \
--output_name {output_name} \
--map_dimensiony_x 1000 \
--map_dimensiony_y 1000 \
--input_path {INPUT} \
--output_path {OUTPUT}
display.Image(f'{OUTPUT}/{output_name}')
Process Tags¶
For processing tags, there are several additional challenges (compare the image below):
- some clusters have a large number of tags (e.g. the Dynamo Dresden stadium, on the left)
- some clusters have diverging concepts (e.g. rammsteinlive and football at the same location)
- one cluster covers a large area (Großer Garten), which includes other smaller clusters (Junge Garde, Botanischer Garten)
The workflow below tries to solve these issues:
- first, select the largest cluster, area-wise; this will be our "background" prompt ("Großer Garten") that we can add to all smaller clusters in the area
- select only a number of tags from clusters with many tags (e.g. Dynamo Dresden Stadium)
- try to recursively select clusters that cover different areas, so that we get an even coverage, filling gaps in the map
- (identify similarity of concepts based on NLP/BART/Cosine Similarity and generate separate images for different concepts)
Parameter:
CLUSTER_WEIGHT_CUTOFF = 10
gdf = gp.read_file(INPUT / "shapefiles_gg" / "allTagCluster.shp", encoding='utf-8')
cluster_sel = gdf[(gdf["Weights"]>CLUSTER_WEIGHT_CUTOFF) & (gdf["emoji"]==0)].copy()
Get preview (limit to >100 weights)
plot_clustermap(cluster_sel=cluster_sel[cluster_sel["Weights"]>100], basemap=basemap)
Get preview (limit to <=100 weights)
plot_clustermap(cluster_sel=cluster_sel[cluster_sel["Weights"]<=100], basemap=basemap, label=False)
ToDo: Spatial separation of clusters¶
As is visible, many cluster shapes are concentrated in a few dense areas, overlapping each other. For our map, we want the maximum possible coverage without overlapping symbols. For this, we first dissolve all cluster shapes into a single MultiPolygon and then separate the areas that do not touch (see the sketch below).
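A sketch of that separation with geopandas, not yet wired into the workflow (hence the ToDo):
# dissolve everything into one (Multi)Polygon, then split disjoint parts
dissolved = gdf[["geometry"]].dissolve()
separated = dissolved.explode(index_parts=False).reset_index(drop=True)
len(separated)  # number of spatially separate areas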
gdf["area"] = gdf.geometry.area / 1000
gdf["area"].max()
gdf["area"].min()
import mapclassify as mc
def get_scheme_breaks(series_nan: pd.Series, scheme: str = None):
    """Classify series of values

    Notes: some classification schemes (e.g. HeadTailBreaks)
    do not support specifying the number of classes returned;
    construct optional kwargs with k == number of classes
    """
optional_kwargs = {"k":9}
if scheme is None:
scheme = "NaturalBreaks"
if scheme == "HeadTailBreaks":
optional_kwargs = {}
scheme_breaks = mc.classify(
y=np.abs(series_nan.values), scheme=scheme, **optional_kwargs)
return scheme_breaks
breaks = get_scheme_breaks(gdf["area"], scheme="HeadTailBreaks")
bins = np.flip(breaks.bins[:-1])
bins
gdf["area"].max()
gdf[(gdf["area"]>=bins[0])]
cmap = tools.get_cmap(len(bins), 'Paired')
SUBPLOTS = len(bins)
fig, axes = plt.subplots(nrows=int(round(SUBPLOTS/4)), ncols=4, figsize=(8, 4))
for ix, ax in enumerate(axes.reshape(-1)):
if ix >= SUBPLOTS:
break
if ix >= len(bins)-1:
mask = (gdf["area"]<=bins[ix])
else:
mask = (gdf["area"]>=bins[ix+1]) & (gdf["area"]<=bins[ix])
gdf[mask].plot(facecolor='none', edgecolor=cmap(ix), ax=ax)
ax.set_axis_off()
Spatial Cluster Merge¶
- Select top-cluster based on area/coverage/percentage
- Go through each level and select clusters based on distinct areas;
- Exclude previous cluster areas (except top-level) from the follow-up levels
- until all levels are processed and a maximum coverage is achieved.
Process Top-Cluster
Calculate the cluster area (in 1,000 m²):
cluster_sel["area"] = cluster_sel.area * 0.001
cluster_sel.sort_values("area", ascending=False).head()
Our whole area is:
total_area = cluster_sel["geometry"].to_frame().dissolve().area[0] * 0.001
print(total_area)
Calculate percentage for all cluster areas of the total area, and filter all clusters above a certain percentage.
cluster_sel["percs"] = cluster_sel.sort_values("area", ascending=False)["area"] / (total_area/100)
cluster_sel.sort_values("area", ascending=False)["percs"][:10]
We can see there is a gap between the fourth cluster (garden) and the fifth (zoo). We use 20% as the cutoff value.
top_cluster_mask = cluster_sel["percs"] >= 20
top_cluster_mask
plot_clustermap(cluster_sel=cluster_sel[top_cluster_mask], basemap=basemap)
Get Cluster Groups
Below, we use a simple approach to best-coverage, by first selecting the top cluster, and then selecting a limited number of non-intersecting cluster areas afterwards.
def get_cluster_groups(gdf: gp.GeoDataFrame) -> gp.GeoSeries:
    """Get cluster groups based on spatial self-intersection,
    and return list of tags/emoji sorted by ascending importance

    Note: also writes the group ids to the global cluster_sel,
    which is used for geometry matching further below.
    """
    intersects = gdf.sjoin(
        gdf[["geometry"]], how="left", predicate="intersects"
    ).reset_index()
    cluster_groups = intersects.dissolve("index_right", aggfunc="min")
    # index alignment: rows outside gdf receive NaN
    # and drop out of the groupby below
    cluster_sel["group"] = cluster_groups["index"]
    cluster_lists = cluster_sel.groupby("group")["ImpTag"].apply(list)
    return cluster_lists
top_cluster_group = get_cluster_groups(cluster_sel[cluster_sel["percs"] >= 20])
top_cluster_geom = cluster_sel[cluster_sel["percs"] >= 20]["geometry"].to_frame().dissolve().geometry[0]
top_cluster_group
other_cluster_groups = get_cluster_groups(cluster_sel[cluster_sel["percs"] < 20])
other_cluster_groups
Concat the two series
cluster_groups = pd.concat([top_cluster_group, other_cluster_groups])
cluster_groups
Semantic Cluster Split¶
Sometimes terms and emoji that refer to very different thematic contexts appear in the same cluster group. We want to split these clusters up, to generate separate images for the different contexts.
Below, we use a fixed threshold (0.3) for the cosine angle of the dot product of term and emoji vectors to separate these different meanings into different cluster groups, allowing for the generation of different icons for semantically distinct meanings.
We first need to temporarily install an additional dependency in our environment: Gensim - Topic Modelling for Humans
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
!../py/modules/base/pkginstall.sh "gensim"
import gensim
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics.pairwise import cosine_similarity
Data¶
A training dataset was produced based on all geosocial media data in Dresden.
Load the pretrained Word2Vec model.
model_path = INPUT / "language_model" / "word2vec_25012021.model"
word2vec_model = gensim.models.Word2Vec.load(str(model_path))
Test the computation of cos_similarity matrix for a list of words¶
Compute Word2Vec embeddings for each word in the list
word_list = ['dynamo', 'stadion', 'rammstein']
word_vectors = []
for word in word_list:
try:
word_vector = word2vec_model.wv[word]
word_vectors.append(word_vector)
except KeyError:
print(f"Warning: '{word}' is not in the vocabulary.")
# Calculate cosine similarity matrix between word vectors
cosine_similarity_matrix = cosine_similarity(word_vectors, word_vectors)
cosine_similarity_matrix
Visualize Matrix¶
Define a list of words:
word_list = ['dynamo', 'rammstein', 'stadion','fußball','football']
word_vectors = []
for word in word_list:
try:
word_vector = word2vec_model.wv[word]
word_vectors.append(word_vector)
except KeyError:
print(f"Warning: '{word}' is not in the vocabulary.")
Calculate cosine similarity using Scikit-Learn's cosine_similarity
cosine_similarity_matrix = cosine_similarity(word_vectors, word_vectors)
Create a heatmap to visualize the cosine similarity matrix
- Define a custom colormap (you can choose or create your own)
custom_cmap = sns.color_palette("coolwarm", as_cmap=True)
- Create a heatmap with the custom colormap
plt.figure(figsize=(10, 8))
heatmap = sns.heatmap(cosine_similarity_matrix, annot=True, xticklabels=word_list, yticklabels=word_list, cmap=custom_cmap)
plt.title("Cosine Similarity between Words")
plt.show()
Outlier Detection¶
We use the sklearn.cluster AgglomerativeClustering method for hierarchical clustering to detect the main cluster containing the majority of words, representing the main semantic field. It's a bottom-up approach which begins with each data point as its own cluster and then iteratively merges the closest pairs of clusters (in our case, the pair with the minimum average distance between each observation of the two sets) into a single cluster, until a stopping criterion is met (the threshold for the cosine similarity measure).
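To illustrate the threshold on a toy similarity matrix (illustrative values only, not model output; newer scikit-learn releases name the parameter metric instead of affinity):
import numpy as np
from sklearn.cluster import AgglomerativeClustering

# toy cosine similarities: words 0-2 are related, word 3 is an outlier
sim = np.array([
    [1.0, 0.8, 0.7, 0.1],
    [0.8, 1.0, 0.6, 0.2],
    [0.7, 0.6, 1.0, 0.1],
    [0.1, 0.2, 0.1, 1.0]])
# cosine_threshold=0.3 translates to distance_threshold=0.7 on 1-sim
toy = AgglomerativeClustering(
    n_clusters=None, affinity='precomputed', linkage='average',
    distance_threshold=1-0.3).fit(1-sim)
print(toy.labels_)  # word 3 ends up in its own cluster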
def find_semantic_outliers(word_list, cosine_threshold=0.3, printwarnings: bool = None):
"""Find semantic outliers in a list of words based on cosine_threshold"""
if printwarnings is None:
printwarnings = False
# Compute Word2Vec embeddings for each word in the list
word_vectors = []
for word in word_list:
try:
word_vector = word2vec_model.wv[word]
word_vectors.append(word_vector)
except KeyError:
if printwarnings:
print(f"Warning: '{word}' is not in the vocabulary.")
# Calculate cosine similarity matrix between word vectors
cosine_similarity_matrix = cosine_similarity(
word_vectors, word_vectors)
# Perform agglomerative clustering
clustering = AgglomerativeClustering(
n_clusters=None, affinity='precomputed', linkage='average',
distance_threshold=1-cosine_threshold).fit(1-cosine_similarity_matrix)
# Find the cluster with the most words
unique_clusters, cluster_counts = np.unique(
clustering.labels_, return_counts=True)
main_cluster = unique_clusters[np.argmax(cluster_counts)]
# Identify potential outliers in other clusters
outliers = [
word_list[i] for i, label in enumerate(clustering.labels_)
if label != main_cluster]
return outliers
Test with list of lists of words
w_list = [['garten', 'garden', 'park'], ['dynamo', 'rammstein','stadion','fussball','football'], ['zoo', 'zoodresden', 'animals'],
['volkswagen', 'manufaktur', 'gläsernemanufaktur'], ['jungegarde', 'annenmaykantereidt', 'jungegardedresden'],
['palais', 'palaisteich', 'palace'], ['exhibition', 'skatepark', 'ausstellung'], ['brunnen', 'mosaik'],
['carolaschlösschen','schwan','nature','love', 'afterwork'], ['parkeisenbahndresden', 'train'],
['oldtown', 'town', 'prague'], ['breakfast', 'milchmädchen', 'frühstück'],
['estancia', 'steak']]
Iterate through the list of lists and detect semantic outliers for each sublist
for idx, word_sublist in enumerate(w_list):
print(f"List {idx + 1}: {word_sublist}")
if len(word_sublist) < 3:
print("Not enough words to perform clustering")
else:
outliers = find_semantic_outliers(word_sublist, printwarnings=True)
print("Outliers:", outliers)
print()
Apply outlier detection to split subject prompts¶
For the actual application to our social media derived lists, we use the first 7 terms for outlier detection. Otherwise, there is too much noise from unrelated words used by a small minority of users.
N_TERMS: int = 7 # fixed limit of input terms for the outlier detection function
C_THRESHOLD: float = 0.3 # fixed cosine threshold
cluster_groups_semantic = cluster_groups.copy()
def split_semantic_outliers(
cluster_groups: pd.Series, n_terms: int = N_TERMS,
c_threshold = C_THRESHOLD, report: bool = None) -> pd.Series:
"""
Split semantic outliers into separate cluster groups;
return new updated Series that contains both
"""
d = {}
for idx, cluster_group in cluster_groups.items():
if report:
print(f"List {idx + 1}: {cluster_group[:n_terms]}")
if len(cluster_group) < 3:
d[idx] = cluster_group
continue
outliers = find_semantic_outliers(cluster_group[:n_terms], cosine_threshold=c_threshold)
if not outliers:
d[idx] = cluster_group
continue
        words_excl_outlier = [term for term in cluster_group if term not in outliers]
        d[idx] = words_excl_outlier
        # provide new idx as offset,
        # so we can later derive the matching cluster geom
        d[idx+10000] = outliers[:n_terms]
if report:
print("Outliers:", outliers[:n_terms])
series = pd.Series(d)
series.rename_axis('group', inplace=True)
series.rename("ImpTag", inplace=True)
return series
cluster_groups_semantic = split_semantic_outliers(cluster_groups=cluster_groups_semantic, report=False)
cluster_groups_semantic
Prompt Preparation¶
There are some final cleanup & preparation steps necessary before image generation.
- Some tags repeat at lower cluster groups (e.g. "großergarten"); we want to remove these, to make space for more specific terms
- Limit to the first n items in each list (n=3). Stable Diffusion will not be able to process much more context variety in a single prompt.
def pop_recursive(cluster_groups: pd.Series, lim_terms: int = 3) -> pd.Series:
    """Remove recursive terms that repeat at lower levels; return new Series

    Further, limit the list of terms per cluster to lim_terms items (default: 3)
    """
terms = set()
d = {}
for idx, cluster_group in cluster_groups.items():
# if len(cluster_group) == 1:
# continue
new_words = [term for term in cluster_group if not term in terms]
terms.update(set(new_words))
if len(new_words) > 0:
d[idx] = new_words[:lim_terms]
series = pd.Series(d)
series.rename_axis('group', inplace=True)
series.rename("ImpTag", inplace=True)
# series.sort_index(inplace=True)
return series
cleaned_groups = pop_recursive(cluster_groups_semantic)
Pop two clusters from a wrongly georeferenced Instagram place:
cleaned_groups.pop(10861.0)
cleaned_groups.pop(10250.0)
Merge two wrongly split clusters:
cleaned_groups[195.0] = cleaned_groups[195.0] + cleaned_groups[10195.0]
cleaned_groups.pop(10195.0)
This is the list of prompts prepared for Stable Diffusion. Every group ID > 10000 is a collection of semantic outliers. These are scaled smaller on the map, due to lower prevalence.
cleaned_groups
from shapely.geometry.point import Point
from IPython.display import display as ipydisplay
def get_scale(geom_series: pd.Series, min_scale: float = 0.2, max_scale: float = 0.4) -> np.ndarray:
    """Get scale for Mapnik symbol placement from cluster areas

    1. Take the minimum cluster area
    2. Take the maximum cluster area
    3. Interpolate all values to scales between min_scale and max_scale
    """
areas = geom_series.area
series_max = areas.max()
series_min = areas.min()
series_interp = np.interp(
areas, (series_min, series_max), (min_scale, max_scale))
# format for Mapnik and return
# return [f'{x:.2},{x:.2}' for x in series_interp]
return series_interp
def offset_points(points: List[Point]):
"""Try to minimize overlap by offsetting points a limited number of times
TODO: Not yet implemented; ideally look into adjustText and how this is
solved with bioframe.core.arrops.overlap_intervals()
"""
ipydisplay(points)
ipydisplay(type(points[0]))
for pt in points:
distance_between_pts = points[0].distance(pt)
print(distance_between_pts)
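A greedy push-apart sketch of what an automated version could look like (hypothetical min_dist, and simpler than the adjustText/bioframe approaches mentioned in the TODO):
def offset_points_greedy(points: List[Point], min_dist: float = 300.0,
                         iterations: int = 10) -> List[Point]:
    """Iteratively push apart point pairs closer than min_dist (map units)"""
    pts = list(points)
    for _ in range(iterations):
        moved = False
        for i in range(len(pts)):
            for j in range(i + 1, len(pts)):
                d = pts[i].distance(pts[j])
                if 0 < d < min_dist:
                    # push pts[j] away from pts[i] along the connecting vector
                    f = (min_dist - d) / d
                    pts[j] = Point(
                        pts[j].x + (pts[j].x - pts[i].x) * f,
                        pts[j].y + (pts[j].y - pts[i].y) * f)
                    moved = True
        if not moved:
            break
    return pts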
def offset_points_manual(points: List[Point]):
    """Manually offset selected points (in map units) to reduce symbol overlap"""
    # garden, top cluster geom
    points[0] = Point(points[0].x-50, points[0].y+100)
# stadium
points[1] = Point(points[1].x-500, points[1].y)
# rammstein
points[2] = Point(points[2].x-750, points[2].y+250)
# zoo
points[3] = Point(points[3].x-300, points[3].y)
# vw
points[4] = Point(points[4].x-150, points[4].y+100)
# botanical garden
points[5] = Point(points[5].x+100, points[5].y+150)
# palais
points[7] = Point(points[7].x, points[7].y)
# palace
points[8] = Point(points[8].x, points[8].y+100)
# robotron
points[9] = Point(points[9].x+100, points[9].y-100)
# skatepark
points[10] = Point(points[10].x-150, points[10].y)
# schwan
points[12] = Point(points[12].x+100, points[12].y)
# train
points[13] = Point(points[13].x, points[13].y-300)
# train
points[14] = Point(points[14].x, points[14].y+300)
# old town
points[15] = Point(points[15].x-150, points[15].y+150)
# hotel
points[17] = Point(points[17].x+200, points[17].y)
return points
def bounds_geom_match(row, cluster_sel):
geom = cluster_sel[cluster_sel["group"] == row.group]["geometry"]
if row.group >= 10000:
geom = cluster_sel[cluster_sel["group"] == row.group-10000]["geometry"]
return geom.to_frame().dissolve().geometry
def create_clustergroups_shape(
    cluster_series: pd.Series, top_cluster_geom, cluster_gdf: gp.GeoDataFrame = cluster_sel,
    output_folder: Path = None, crs_proj: str = CRS_PROJ, input: Path = INPUT):
"""Prepare cluster shapefile for Mapnik, store to output_folder"""
if output_folder is None:
output_folder = input / 'shapefiles_gg'
df = cluster_series.to_frame()
df.reset_index(inplace=True)
df["cluster_id"] = df.apply(cluster_id, axis=1)
# retrieve cluster geometry
df["geometry"] = df.apply(bounds_geom_match, axis=1, cluster_sel=cluster_gdf)
    # update top cluster geom
    df.loc[0, "geometry"] = top_cluster_geom
df["ImpTag"] = df.ImpTag.map(' '.join)
gdf = gp.GeoDataFrame(df, crs=crs_proj, geometry=df.geometry)
gdf["scale"] = get_scale(gdf.geometry)
# use point for symbol placement, as Mapnik will assume centroid of polygons anyway
gdf["geometry"] = [geom.centroid for geom in df["geometry"]]
gdf["geometry"] = offset_points_manual(gdf["geometry"])
gdf.to_file(filename = output_folder / 'gen_img_tags.shp', driver="ESRI Shapefile", encoding='utf-8')
create_clustergroups_shape(cluster_series=cleaned_groups, top_cluster_geom=top_cluster_geom)
Generate images for clusters¶
Note: To re-generate images, delete them first in output/images.
%%time
for ix, cluster_list in enumerate(cleaned_groups):
# pre_prompt = random.choice(["A map icon", "An icon", "A thought bubble"])
pre_prompt = "A map icon"
generate_samples(
# f"{pre_prompt} of ({', '.join(cluster_list[0:2])}), {''.join(cluster_list[2:])}",
f"{pre_prompt} of ({', '.join(cluster_list)})",
save_name=f"{ix:03d}", print_prompt=True)
Map generation¶
Use batch n=0-3 to select a different image batch for map generation.
_copy_generated_tomapnik(output_path = INPUT / "cluster_img_tags", batch=1)
from rembg import remove, new_session
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
SESSION = new_session(
model_name=MODEL_NAME,
alpha_matting=True,
alpha_matting_foreground_threshold=700,
alpha_matting_background_threshold=1,
alpha_matting_erode_size=10)
def remove_background(img: Path, session=SESSION):
"""Remove background from image and overwrite"""
input = Image.open(img)
output = remove(input, session=session)
output.save(img)
for file in (INPUT / "cluster_img_tags").glob('*.png'):
remove_background(file)
Render map
output_name = "tagmap_production_cluster_gg.png"
stylesheet = "tagmap_production_testraster_points_gg.xml"
%%time
!/usr/bin/python3 -m mapnik_cli \
--stylesheet_name {stylesheet} \
--output_name {output_name} \
--map_dimensiony_x 2000 \
--map_dimensiony_y 2000 \
--input_path {INPUT} \
--output_path {OUTPUT}
display.Image(f'{OUTPUT}/{output_name}')
img2img¶
The last test is to use a final img2img pass to merge the overlaid images with the background and produce a combined image, reducing the overlay effect of the icons.
def img2img(
text, image_path, steps: int = 50, denoising_strength: float = 0.05,
api: str = APIURL, output=OUTPUT / "img2img"):
api_url = f"{api}/sdapi/v1/img2img"
with open(image_path, 'rb') as file:
image_data = file.read()
encoded_image = base64.b64encode(image_data).decode('utf-8')
payload = {
"init_images": [encoded_image],
'prompt' : text,
"steps": steps,
"denoising_strength": denoising_strength
}
response = requests.post(api_url, json=payload)
name = 'GENimg2img_'
for i in range(random.randint(15, 25)):
name += random.choice('QAZXfrSWEDCVFRTqazxswgbnhyujmkiolpGBNHYUJedcvtMKIOLP')
print(name)
if response.status_code == 200:
response_data = response.json()
encoded_result = response_data["images"][0]
result_data = base64.b64decode(encoded_result)
output_path = output / f'{name}.jpg'
with open(output_path, 'wb') as file:
file.write(result_data)
return name
name = img2img("A tourist city map with points of interests, wimmelbild", image_path=OUTPUT / output_name)
display.Image(OUTPUT / "img2img" / f'{name}.jpg')
The result here is not convincing. Even using a very small denoising_strength of 0.05 produces a map with distorted icons.
One solution could be to tile the image and use ControlNet, together with an Upscaler, to produce a more fine-grained result. We cannot currently use this through the API, as ControlNet and Upscaler are extensions, and these are not available through the /sdapi endpoint. Try this in the native webui.
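A sketch of the tiling step with PIL (tile size is a hypothetical choice; the per-tile ControlNet/Upscaler pass itself would still have to happen in the webui):
from PIL import Image

def tile_image(img_path: Path, tile_size: int = 512) -> List[Image.Image]:
    """Cut an image into tile_size x tile_size tiles (edge tiles may be smaller)"""
    img = Image.open(img_path)
    tiles = []
    for top in range(0, img.height, tile_size):
        for left in range(0, img.width, tile_size):
            box = (left, top,
                   min(left + tile_size, img.width),
                   min(top + tile_size, img.height))
            tiles.append(img.crop(box))
    return tiles

tiles = tile_image(OUTPUT / output_name)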
Create Release File¶
Create a release file that contains ipynb notebooks, HTML, figures, svg and python converted files.
Make sure that 7z is available (apt-get install p7zip-full).
!cd .. && git config --system --add safe.directory '*' \
&& RELEASE_VERSION=$(git describe --tags --abbrev=0) \
&& 7z a -tzip -mx=9 output/release_$RELEASE_VERSION.zip \
input/* md/* py/* output/* resources/* notebooks/*.ipynb \
CHANGELOG.md README.md jupytext.toml nbconvert.tpl conf.json pyproject.toml \
-xr!/__pycache__ -xr!.ipynb_checkpoints \
-y > /dev/null
Create notebook HTML¶
!jupyter nbconvert --to html_toc \
--output-dir=../resources/html/ ./03_map_processing.ipynb \
--template=../nbconvert.tpl \
--ExtractOutputPreprocessor.enabled=False >&- 2>&-