#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import networkx as nx
from networkx.algorithms.approximation.steinertree import steiner_tree
import random
import time
import multiprocessing
from functools import partial
from contextlib import contextmanager
from shapely.geometry import Point
import matplotlib.pyplot as plt
from dhd.logs import log
from dhd.utils import get_list_transpose, normalized_inner_product
from dhd.exceptions import IncompatibilityError, NoConnectionError
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
[docs]def remove_no_connection_terminals(terminals):
"""
Remove terminals which cannot be connected to the district heating network.
Parameters
----------
terminals: DataFrame
Dataframe of the terminals to connect to the graph.
Returns
-------
DataFrame
Updated *terminals* dataframe.
"""
terminals = terminals.copy()
count = 0
for idx, terminal in terminals.iterrows():
if len(terminal["_id"]) == 0:
count += 1
terminals.drop(index=idx, inplace=True)
text = "{} terminals without connection removed."
log.info(text.format(count))
return terminals
[docs]def parameters_compatibility(n, n1, n2, mutation_rate):
"""
Assert that the input parameters are compatible with eachother.
Parameters
----------
n: int
Population size.
n1: int
Elite population size.
n2: int
Parent population size.
mutation_rate: float
Rate of genes mutation. Must belong to the open interval (0,1).
Return
------
bool
True if the parameters are compatible.
"""
assert n1 <= n, (
"The elites population must me smaller than " "the whole population."
)
assert n2 <= n, (
"The parents population must me smaller than " "the whole population."
)
assert 0 <= mutation_rate <= 1, (
"The mutation rate must belong " "to the open interval (0,1)."
)
return True
[docs]def dataframe_compatibility(vertices, terminals):
"""
Check the compatibility of the input dataframes.
Parameters
----------
vertices: DataFrame
Dataframe of the street graph plus the connection nodes.
terminals: DataFrame
Dataframe of the terminals to connect to the graph.
Returns
-------
bool
True if the dataframes are compatible.
"""
index = set(vertices.idA).union(set(vertices.idB))
for idx, terminal in terminals.iterrows():
for id in terminal["_id"]:
if not id in index:
text = (
"The terminal connection {} doesn't belong to the "
"vertices nodes."
)
raise IncompatibilityError(text.format(terminal.name))
return True
[docs]def connect_terminals(vertices, terminals, genes):
"""
Add the connections from *terminals* associated to the given *genes* to the
graph *vertices*.
Parameters
----------
vertices : DataFrame
Dataframe of the street graph plus the connection nodes.
terminals : DataFrame
Dataframe of the terminals to connect to the graph.
genes : list
List of terminal single connections of the considered configuration.
Returns
-------
DataFrame
Updated *vertices* dataframe.
"""
tst_vertices = vertices.copy()
n = len(tst_vertices)
for i, a in enumerate(genes):
terminal = terminals.iloc[i]
tst_vertices.at[n + i, "idA"] = terminal.name
tst_vertices.at[n + i, "idB"] = terminal["_id"][a]
tst_vertices.at[n + i, "geometry"] = terminal["_geometry"][a]
tst_vertices.at[n + i, "weight"] = terminal["_weight"][a]
return tst_vertices
[docs]def get_terminal_steiner_tree(vertices, terminals):
"""
Compute the Steiner Tree (TST by construction) associated to the updated
graph *vertices* (with connection edges) of the considered configuration.
Parameters
----------
vertices: DataFrame
Dataframe of the street graph plus the connection nodes concatenated with a single connection at each terminal.
terminals: DataFrame
Dataframe of the terminals to connect to the graph.
Returns
-------
Graph
Approximate Terminal Steiner Tree of the considered single connections
configuration.
"""
G = nx.from_pandas_edgelist(
vertices, source="idA", target="idB", edge_attr=list(vertices.columns)
)
T = steiner_tree(G, terminal_nodes=list(terminals.index), weight="weight")
return T
[docs]def get_tree_weight(individual, args):
"""
Find the approximate Steiner Tree associated to the genes of the given
*individual* and compute its weight.
Parameters
----------
individual: tuple = float, list
Individual weight and genes. The weight has not yet been computed and is
None.
args: tuple = Dataframe, DataFrame
Dataframes *vertices* and *terminals*.
Returns
-------
tuple = float, list
Individual with the updated weight associated to its genes.
"""
vertices, terminals = args
weigth, genes = individual
# add rows associated to the connections
tst_vertices = connect_terminals(vertices, terminals, genes)
# compute Steiner tree
T = get_terminal_steiner_tree(tst_vertices, terminals)
# graph -> DataFrame
df = nx.to_pandas_edgelist(T)
# get total weight
weight = sum(list(df["weight"]))
individual = (weight, genes)
return individual
[docs]def get_configuration_number_exponent(allele_number):
"""
Compute the approximate *log10* of the total number of connection
configurations.
Parameters
----------
allele_number: list
List of the number of possible connections at each terminal.
Returns
-------
int
Integer approximation of the *log10* of the total number of connection
configurations.
"""
configuration_number_exponent = 0
for A in allele_number:
if A == 0:
raise NoConnectionError("At least one terminal has no connection.")
else:
configuration_number_exponent += np.log10(A)
return int(configuration_number_exponent)
[docs]def set_allele_number(terminals):
"""
Counts the number of possible connection at each terminal.
Parameters
----------
terminals: DataFrame
Dataframe of the terminals to connect to the graph.
Returns
-------
list
List of the number of possible connections at each terminal.
"""
allele_number = [len(terminal["_id"]) for idx, terminal in terminals.iterrows()]
configuration_number_exponent = get_configuration_number_exponent(allele_number)
text = (
"Discrete vector space of dimension d={} and total number of "
"configuration 10^{} initialized."
)
log.info(text.format(len(allele_number), configuration_number_exponent))
return allele_number
[docs]def init_individual_genes(allele_number):
"""
Generate a random genes list.
Parameters
----------
allele_number: list
List of the number of possible connections at each terminal.
Returns
-------
list
List of terminal single connections.
"""
genes = [random.randint(0, A - 1) for A in allele_number]
return genes
[docs]def init_population_genes(n, allele_number):
"""
Generate random genes for the *n* individuals of the population according to
the possible connections at each terminal *allele_number*.
Parameters
----------
n: int
Population size.
allele_number: list
List of the number of possible connections at each terminal.
Returns
-------
list
List of *n* individual tuples (weight=None, genes).
"""
population = [(None, init_individual_genes(allele_number)) for k in range(n)]
return population
[docs]def get_statistics(population):
"""
Get population weights mean and standard deviation.
Parameters
----------
population: list
List of individual tuples (weight, genes).
Returns
-------
float
Mean of the population weights.
float
Standard deviation of the population weigths.
"""
p = 1 / len(population)
M1 = sum([p * weight for weight, genes in population])
M2 = sum([p * weight ** 2 for weight, genes in population])
V = M2 - M1 ** 2
if V < 0:
text = "Negative variance ({}) of the population weights."
log.debug(text.format(V))
return M1, np.sqrt(V)
[docs]def individual_mutation(individual, allele_number, mutation_rate, inbred=False):
"""
Genes mutation of the given *individual*.
Each gene is randomly mutated with probability *mutation_rate*.
Parameters
----------
individual: tuple = float, list
Tuple of the individual weight and genes.
allele_number: list
List of the number of possible connections at each terminal.
mutation_rate: float
Rate of genes mutation. Must belong to the open interval (0,1).
inbred: bool, optional
True if the two parents are identical. Default is False
Returns
-------
tuple = float, list
Individual with weight reset to None and mutated genes.
"""
# get weight and genes
weight, genes = individual
genes_ = genes.copy()
count = 0
for i, A in enumerate(allele_number):
x = random.random()
if x <= mutation_rate and A > 1:
allele_set = set(range(A)) - set([genes_[i]])
genes_[i] = random.sample(allele_set, 1)[0]
count += 1
if count == 0 and inbred:
log.info("Inbred procreation lead to identical individual.")
return (None, genes_)
[docs]def select_random_parents(parents):
"""
Randomly select two parents from the parent population.
If the two parents are identical the procreation is classified as 'inbred'.
Parameters
----------
parents: list
Parents population.
Returns
-------
tuple = float, list
First parent individual.
tuple = float, list
second parent individual.
bool
True if the two parents are identical.
"""
# loop to encourage diversity mating
n = 10
for i in range(n):
mum, dad = random.sample(parents, 2)
if not mum[0] == dad[0]:
break
# check inbred procreation
if i == n - 1:
inbred = True
else:
inbred = False
return mum, dad, inbred
[docs]def procreate(mum, dad):
"""
Gene mixing of the two parent individuals.
Each gene of the children is either the one of its dad or its mum, with equal
probability.
Parameters
----------
mum: tuple = float, list
First parent individual.
dad: tuple = float, list
Second parent individual.
Returns
-------
tuple = float, list
Child individual with weight=None and a mixing of its parents genes.
"""
child = (None, [])
for i in range(len(mum[1])):
x = random.randint(0, 1)
if x == 0:
child[1].append(mum[1][i])
else:
child[1].append(dad[1][i])
return child
[docs]def reproduction(parents, allele_number, mutation_rate):
"""
Production of a new individual out of the set of parents.
Two parents are first selected, then mated and finally the child is mutated.
Parameters
----------
parents: list
Parents population.
allele_number: list
List of the number of possible connections at each terminal.
mutation_rate: float
Rate of genes mutation. Must belong to the open interval (0,1).
Returns
-------
tuple = float, list
Child individual with weight=None and a mixing of its parents genes
followed by a mutation.
"""
mum, dad, inbred = select_random_parents(parents)
child = procreate(mum, dad)
child = individual_mutation(child, allele_number, mutation_rate, inbred)
return child
[docs]def get_next_generation(elites, parents, n1, n, allele_number, mutation_rate):
"""
Generate the next generation from the *elites* and *parents* populations.
Parameters
----------
elites: list
Elites population.
parents: list
Parents population.
n1: int
Elites population size.
n: int
Population size.
allele_number: list
List of the number of possible connections at each terminal.
mutation_rate: float
Rate of genes mutation. Must belong to the open interval (0,1).
Returns
-------
list
List of individuals constituting the new generation.
"""
elite_mutations = [
individual_mutation(elite, allele_number, mutation_rate) for elite in elites
]
children = [
reproduction(parents, allele_number, mutation_rate) for i in range(n - n1)
]
population = elite_mutations + children
return population
[docs]def select_elites_and_parents(population, elites, n1, n2):
"""
Select the new elites and parents from the whole *population* and old
*elites*.
Parameters
----------
population: list
List of individuals constituting the population.
elites: list
Elites population.
n1: int
Elites population size.
n2: int
Parents population size.
Returns
-------
list
New elites population.
list
New parents population.
"""
noitalupop = get_list_transpose(population)
if elites is None:
sample = noitalupop
else:
setile = get_list_transpose(elites)
sample = [noitalupop[0] + setile[0], noitalupop[1] + setile[1]]
# sort the weight and get their index
index = sorted(range(len(sample[0])), key=lambda k: sample[0][k])
# sort the individuals
elites = [(sample[0][i], sample[1][i]) for i in index[0:n1]]
parents = [(sample[0][i], sample[1][i]) for i in index[0:n2]]
return elites, parents
[docs]def init_evolution_dataframe(N, save_population):
"""
Initilization of the dataframe to save the population evolution.
Parameters
----------
N: int
Number of generation.
save_population: bool
True if each generation population must be saved.
Returns
-------
DataFrame
Dataframe of the population evolution.
"""
if save_population:
columns = ["weight", "mean", "stddev", "genes", "population"]
else:
columns = ["weight", "mean", "stddev", "genes"]
evolution = pd.DataFrame(columns=columns, index=range(N))
return evolution
[docs]def save_generation(i, evolution, population, elites, save_population):
"""
Update the *evolution* dataframe with the informations on the current
generation ; in-place.
Parameters
----------
i: int
Generation number.
evolution: DataFrame
Dataframe of the population evolution.
population: list
List of individuals constituting the population.
elites: list
Elites population.
save_population: bool
True if the each generation population must be saved.
"""
elites_ = elites.copy()
population_ = population.copy()
mean, stddev = get_statistics(population)
evolution.at[i, "weight"] = elites_[0][0]
evolution.at[i, "mean"] = mean
evolution.at[i, "stddev"] = stddev
evolution.at[i, "genes"] = elites_[0][1]
if save_population:
evolution.at[i, "population"] = population_
[docs]def run_evolution(
vertices,
terminals,
N,
n=64,
n1=8,
n2=32,
mutation_rate=0.1,
save_population=False,
pool_number=6,
):
"""
Evolves a sample of *n* configurations of single terminal connections seeking
for the lowest Terminal Steiner Tree (TST) weight.
The algorithm swipes the space of single connections
.. math::
V = \otimes_{i=0}^{d-1} \:\: \{0,\cdots,A_i-1\}
namely the discrete space of dimension d = *# of terminals* with each direction
spanned by the integers between 0 and A_i, with A_i = *# of
possible connection to terminal i*.
A set a *n* vectors is first chosen randomly and then evolved through the
configuration space generation after generation. At each generation, the *n2*
best individuals (*parents*) reproduce amongst themselves as couples. These
*n-n1* children as well as the *n1* best individual (*elites*) of the
previous generation then undergo a gene mutation of parameter *mutation_rate*.
Parameters
----------
vertices: DataFrame
Dataframe of the street graph plus the connection nodes.
terminals: DataFrame
Dataframe of the terminals to connect to the graph.
N: int
Number of generation.
n: int
Population size.
n1: int
Elite population size.
n2: int
Parent population size.
mutation_rate: float
Rate of genes mutation. Must belong to the open interval (0,1).
save_population: bool
True if each generation population must be saved.
pool_number: int, optional
Number of processes working in parallel. Default is 6.
Returns
-------
evolution: DataFrame
Dataframe of the population evolution with the following structure:
* INDEX: generation integers from 0 to *N*-1.
* COLUMNS:
- 'weight': weight of the best individual,
- 'mean': mean weight of the population,
- 'stddev': standard deviation of the population weights,
- 'genes': genes of the best individual,
- 'population': whole population weights and genes (only if
*save_population* is True).
"""
assert parameters_compatibility(n, n1, n2, mutation_rate)
terminals = remove_no_connection_terminals(terminals)
assert dataframe_compatibility(vertices, terminals)
# list the possible connection number of each leaf
allele_number = set_allele_number(terminals)
# generate initial random vector
population = init_population_genes(n, allele_number)
# initialize the storing lists
evolution = init_evolution_dataframe(N, save_population)
args = (vertices, terminals)
elites = None
# evolution
for i in range(N):
tic = time.time()
# multiprocessing computation of weights of the Steiner trees associated
# to each vector of the current generation
with poolcontext(processes=pool_number) as pool:
population = pool.map(partial(get_tree_weight, args=args), population)
# select elites and parents by sorting the population
elites, parents = select_elites_and_parents(population, elites, n1, n2)
save_generation(i, evolution, population, elites, save_population)
# prepare next generation
population = get_next_generation(
elites, parents, n1, n, allele_number, mutation_rate
)
tac = time.time()
text = "generation {} / {} completed in {} seconds. Current best weight: {}"
log.info(text.format(i + 1, N, round(tac - tic, 2), evolution.at[i, "weight"]))
return evolution
[docs]def get_best_individual(evolution):
"""
Select the best connections configuration of the whole evolution.
Parameters
----------
evolution: DataFrame
Dataframe of the population evolution.
Returns
-------
List of the genes (connection configuration) of the best individual.
"""
N = len(evolution)
genes = evolution.loc[N - 1, "genes"]
return genes
[docs]def get_best_terminal_steiner_tree(vertices, terminals, evolution):
"""
Find the TST dataframe with the smallest weight from those computed during
the evolution.
Parameters
----------
vertices: DataFrame
Dataframe of the street graph plus the connection nodes.
terminals: DataFrame
Dataframe of the terminals to connect to the graph.
evolution: DataFrame
Dataframe of the population evolution.
Returns
-------
tst: DataFrame
Dataframe of the best computed TST with the following structure:
* INDEX: Integers.
* COLUMNS:
- 'idA': first vertex end node index,
- 'idB': second vertex end node index,
- 'pA': coordinates (shapely Point) of the node 'idA',
- 'pB': coordinates (shapely Point) of the node 'idB',
- 'weight': vertex weight,
- 'geometry': vertex geometry.
"""
genes = get_best_individual(evolution)
tst_terminals = remove_no_connection_terminals(terminals)
tst_vertices = connect_terminals(vertices, tst_terminals, genes)
tst = get_terminal_steiner_tree(tst_vertices, tst_terminals)
tst = nx.to_pandas_edgelist(tst, source="idA", target="idB")
set_nodes_coordinates(tst)
tst.drop(columns=["xA", "yA", "xB", "yB", "idE"], inplace=True)
return tst
[docs]def set_nodes_coordinates(tst):
"""
Find all TST nodes coordinates and save them as shapely Points ; in-place.
The key 'pA' ('pB') correspond to the first (last) shapely Point of the edge
geometry.
Parameters
----------
tst: DataFrame
DataFrame of the TST edges.
"""
tst["pA"] = None
tst["pB"] = None
for i, edge in tst.iterrows():
line = edge["geometry"].xy
pA = Point(line[0][0], line[1][0])
pB = Point(line[0][-1], line[1][-1])
tst.at[i, "pA"] = pA
tst.at[i, "pB"] = pB
[docs]def get_genes_variation(evolution):
"""
Measure the "distance" between each following pair of best genes.
The distance is defined as the cosine of the angle between the two considered
gene vectors. The closer the returned number is to one, the less variation.
Parameters
----------
evolution: DataFrame
Dataframe of the population evolution.
Returns
-------
list
List of number between 0 and 1 representing the distance between each
following genes pairs.
"""
genes = evolution["genes"]
n = len(genes)
genes_variation = list()
for i in range(1, n):
gene1 = np.array(genes[i - 1])
gene2 = np.array(genes[i])
d = normalized_inner_product(gene1, gene2)
genes_variation.append(d)
return genes_variation
[docs]def show_evolution_statistics(evolution):
"""
Plot the evolution of the best weight, the mean weight, the standard
deviation and the genes variations.
The gene variation is defined as the cosine of the angle between the best
gene of a generation and the best gene of its previous generation. The number
lies between 0 and 1. The closer iit is to 0, the more mutation there is.
Parameters
----------
evolution: DataFrame
Dataframe of the population evolution.
"""
genes_variation = get_genes_variation(evolution)
fig, ax = plt.subplots(3, figsize=(9, 12))
ax[0].plot(
evolution["weight"], marker="o", Linestyle="--", color="C0", label="best weight"
)
ax[0].plot(
evolution["mean"], marker="o", Linestyle="--", color="C1", label="mean weight"
)
ax[0].set_xlabel("generation number", fontsize=14)
ax[0].set_ylabel("TST weight", fontsize=14)
ax[0].legend(loc="best", fontsize=14)
ax[1].plot(evolution["stddev"], marker="o", Linestyle="--", color="C0")
ax[1].set_xlabel("generation number", fontsize=14)
ax[1].set_ylabel("TST standard deviation", fontsize=14)
ax[2].plot(genes_variation, marker="o", Linestyle="--", color="C0")
ax[2].set_xlabel("generation number", fontsize=14)
ax[2].set_ylabel("best gene variation", fontsize=14)
plt.show()