import numpy as np
import scipy.sparse
import scipy
import os
from . import models_functions as mof
from . import solver_functions as sof
from . import network_functions as nef
from . import ensemble_generator as eg
[docs]class UndirectedGraph:
"""Undirected Graph instance can be initialised with adjacency
matrix, edgelist, degrees sequence or strengths sequence.
:param adjacency: Adjacency matrix, defaults to None.
:type adjacency: numpy.ndarray, list or scipy.sparse_matrix, optional
:param edgelist: edgelist, defaults to None.
:type edgelist: numpy.ndarray, list, optional
:param degree_sequence: degrees sequence, defaults to None.
:type degree_sequence: numpy.ndarray, optional
:param strength_sequence: strengths sequence, defaults to None.
:type strength_sequence: numpy.ndarray, optional
"""
def __init__(
self,
adjacency=None,
edgelist=None,
degree_sequence=None,
strength_sequence=None,
):
self.n_nodes = None
self.n_edges = None
self.adjacency = None
self.is_sparse = False
self.edgelist = None
self.dseq = None
self.strength_sequence = None
self.nodes_dict = None
self.is_initialized = False
self.is_randomized = False
self.is_weighted = False
self._initialize_graph(
adjacency=adjacency,
edgelist=edgelist,
degree_sequence=degree_sequence,
strength_sequence=strength_sequence,
)
self.avg_mat = None
self.initial_guess = None
# Reduced problem parameters
self.is_reduced = False
self.r_dseq = None
self.r_n = None
self.r_invert_dseq = None
self.r_dim = None
self.r_multiplicity = None
# Problem solutions
self.x = None
self.beta = None
# reduced solutions
self.r_x = None
# Problem (reduced) residuals
self.residuals = None
self.final_result = None
self.r_beta = None
self.nz_index = None
self.rnz_n = None
# model
self.x0 = None
self.error = None
self.error_degree = None
self.relative_error_degree = None
self.error_strength = None
self.relative_error_strength = None
self.full_return = False
self.last_model = None
# function
self.args = None
def _initialize_graph(
self,
adjacency=None,
edgelist=None,
degree_sequence=None,
strength_sequence=None,
):
# Here we can put controls over the type of input. For instance, if the graph is directed,
# i.e. adjacency matrix is asymmetric, the class to use must be the DiGraph,
# or if the graph is weighted (edgelist contains triplets or matrix is not binary) or bipartite
if adjacency is not None:
if not isinstance(
adjacency, (list, np.ndarray)
) and not scipy.sparse.isspmatrix(adjacency):
raise TypeError(
"The adjacency matrix must be passed as a list or"
"numpy array or scipy sparse matrix."
)
elif adjacency.size > 0:
if np.sum(adjacency < 0):
raise TypeError(
"The adjacency matrix entries must be positive."
)
if isinstance(
adjacency, list
):
self.adjacency = np.array(adjacency)
elif isinstance(adjacency, np.ndarray):
self.adjacency = adjacency
else:
self.adjacency = adjacency
self.is_sparse = True
if np.sum(adjacency) == np.sum(adjacency > 0):
self.dseq = nef.degree(adjacency).astype(np.float64)
else:
self.dseq = nef.degree(adjacency).astype(np.float64)
self.strength_sequence = nef.strength(adjacency).astype(
np.float64
)
self.nz_index = np.nonzero(self.strength_sequence)[0]
self.is_weighted = True
self.n_nodes = len(self.dseq)
self.n_edges = np.sum(self.dseq)/2
self.is_initialized = True
elif edgelist is not None:
if not isinstance(edgelist, (list, np.ndarray)):
raise TypeError(
"The edgelist must be passed as a list or numpy array."
)
elif len(edgelist) > 0:
if len(edgelist[0]) > 3:
raise ValueError(
"This is not an edgelist. An edgelist must be a list"
"or array of couples of nodes with optional weights."
"Is this an adjacency matrix?"
)
elif len(edgelist[0]) == 2:
(
self.edgelist,
self.dseq,
self.nodes_dict,
) = nef.edgelist_from_edgelist_undirected(edgelist)
else:
(
self.edgelist,
self.dseq,
self.strength_sequence,
self.nodes_dict,
) = nef.edgelist_from_edgelist_undirected(edgelist)
self.n_nodes = len(self.dseq)
self.n_edges = np.sum(self.dseq)/2
self.is_initialized = True
elif degree_sequence is not None:
if not isinstance(degree_sequence, (list, np.ndarray)):
raise TypeError(
"The degree sequence must be passed as a list or"
"numpy array."
)
elif len(degree_sequence) > 0:
try:
int(degree_sequence[0])
except:
raise TypeError(
"The degree sequence must contain numeric values."
)
if (np.array(degree_sequence) < 0).sum() > 0:
raise ValueError("A degree cannot be negative.")
else:
self.n_nodes = int(len(degree_sequence))
self.dseq = degree_sequence.astype(np.float64)
self.n_edges = np.sum(self.dseq)/2
self.is_initialized = True
if strength_sequence is not None:
if not isinstance(strength_sequence, (list, np.ndarray)):
raise TypeError(
"The strength sequence must be passed as a"
"list or numpy array."
)
elif len(strength_sequence):
try:
int(strength_sequence[0])
except:
raise TypeError(
"The strength sequence must contain numeric"
"values."
)
if (np.array(strength_sequence) < 0).sum() > 0:
raise ValueError("A strength cannot be negative.")
else:
if len(strength_sequence) != len(degree_sequence):
raise ValueError(
"Degrees and strengths arrays must have"
"same length."
)
self.n_nodes = int(len(strength_sequence))
self.strength_sequence = strength_sequence.astype(
np.float64
)
self.nz_index = np.nonzero(self.strength_sequence)[
0
]
self.is_weighted = True
self.is_initialized = True
elif strength_sequence is not None:
if not isinstance(strength_sequence, (list, np.ndarray)):
raise TypeError(
"The strength sequence must be passed as a list"
"or numpy array."
)
elif len(strength_sequence):
try:
int(strength_sequence[0])
except:
raise TypeError(
"The strength sequence must contain numeric values."
)
if (np.array(strength_sequence) < 0).sum() > 0:
raise ValueError("A strength cannot be negative.")
else:
self.n_nodes = int(len(strength_sequence))
self.strength_sequence = strength_sequence
self.nz_index = np.nonzero(self.strength_sequence)[0]
self.is_weighted = True
self.is_initialized = True
[docs] def set_adjacency_matrix(self, adjacency):
"""Initialises graph given the adjacency matrix.
:param adjacency: ajdacency matrix.
:type adjacency: numpy.ndarray, list, scipy.sparse_matrix
"""
if self.is_initialized:
print(
"Graph already contains edges or has a degree sequence."
"Use clean_edges() first."
)
else:
self._initialize_graph(adjacency=adjacency)
[docs] def set_edgelist(self, edgelist):
"""Initialises graph given the edgelist.
:param edgelist: edgelist.
:type edgelist: numpy.ndarray, list
"""
if self.is_initialized:
print(
"Graph already contains edges or has a degree sequence."
"Use clean_edges() first."
)
else:
self._initialize_graph(edgelist=edgelist)
[docs] def set_degree_sequences(self, degree_sequence):
"""Initialises graph given the degrees sequence.
:param degree_sequence: degrees sequence.
:type degree_sequence: numpy.ndarray
"""
if self.is_initialized:
print(
"Graph already contains edges or has a degree sequence."
"Use clean_edges() first."
)
else:
self._initialize_graph(degree_sequence=degree_sequence)
[docs] def clean_edges(self):
"""Deletes all the initialiased attributes.
"""
self.adjacency = None
self.edgelist = None
self.deg_seq = None
self.is_initialized = False
def _solve_problem(
self,
initial_guess=None,
model="cm",
method="quasinewton",
max_steps=100,
tol=1e-8,
eps=1e-8,
full_return=False,
verbose=False,
linsearch=True,
regularise=True,
):
self.last_model = model
self.full_return = full_return
self.initial_guess = initial_guess
self.regularise = regularise
self._initialize_problem(self.last_model, method)
x0 = self.x0
sol = sof.solver(
x0,
fun=self.fun,
fun_jac=self.fun_jac,
step_fun=self.step_fun,
linsearch_fun=self.fun_linsearch,
hessian_regulariser=self.hessian_regulariser,
tol=tol,
eps=eps,
max_steps=max_steps,
method=method,
verbose=verbose,
regularise=regularise,
full_return=full_return,
linsearch=linsearch,
)
self._set_solved_problem(sol)
def _set_solved_problem_cm(self, solution):
if self.full_return:
self.r_xy = solution[0]
self.comput_time = solution[1]
self.n_steps = solution[2]
self.norm_seq = solution[3]
self.diff_seq = solution[4]
self.alfa_seq = solution[5]
else:
self.r_xy = solution
self.r_x = self.r_xy
if self.last_model == "cm":
self.x = self.r_x[self.r_invert_dseq]
elif self.last_model == "cm_exp":
self.x = np.exp(-self.r_x[self.r_invert_dseq])
def _set_solved_problem(self, solution):
model = self.last_model
if model in ["cm", "cm_exp"]:
self._set_solved_problem_cm(solution)
elif model in ["ecm", "ecm_exp"]:
self._set_solved_problem_ecm(solution)
elif model in ["crema", "crema-sparse"]:
self._set_solved_problem_crema_undirected(solution)
[docs] def degree_reduction(self):
"""
Carries out degree reduction for UBCM.
The graph should be initialized.
"""
self.r_dseq, self.r_index_dseq, self.r_invert_dseq, self.r_multiplicity = np.unique(
self.dseq,
return_index=True,
return_inverse=True,
return_counts=True,
axis=0,
)
self.rnz_n = self.r_dseq.size
self.is_reduced = True
def _set_initial_guess(self, model):
if model in ["cm", "cm_exp"]:
self._set_initial_guess_cm()
elif model in ["ecm", "ecm_exp"]:
self._set_initial_guess_ecm()
elif model in ["crema", "crema-sparse"]:
self._set_initial_guess_crema_undirected()
def _set_initial_guess_cm(self):
if ~self.is_reduced:
self.degree_reduction()
if isinstance(self.initial_guess, np.ndarray):
self.r_x = self.initial_guess[self.r_index_dseq]
elif isinstance(self.initial_guess, str):
if self.initial_guess == "degrees_minor":
self.r_x = self.r_dseq / (
np.sqrt(self.n_edges) + 1
) # This +1 increases the stability of the solutions.
elif self.initial_guess == "random":
self.r_x = np.random.rand(self.rnz_n).astype(np.float64)
elif self.initial_guess == "uniform":
self.r_x = 0.5 * np.ones(
self.rnz_n, dtype=np.float64
) # All probabilities will be 1/2 initially
elif self.initial_guess == "degrees":
self.r_x = self.r_dseq.astype(np.float64)
elif self.initial_guess == "chung_lu":
self.r_x = self.r_dseq.astype(np.float64)/(2*self.n_edges)
else:
raise ValueError(
'{} is not an available initial guess'.format(
self.initial_guess
)
)
else:
raise TypeError('initial_guess must be str or numpy.ndarray')
self.r_x[self.r_dseq == 0] = 0
if isinstance(self.initial_guess, str):
if self.last_model == "cm":
self.x0 = self.r_x
elif self.last_model == "cm_exp":
self.r_x[self.r_x != 0] = -np.log(self.r_x[self.r_x != 0])
self.x0 = self.r_x
elif isinstance(self.initial_guess, np.ndarray):
self.x0 = self.r_x
def _set_initial_guess_crema_undirected(self):
if isinstance(self.initial_guess, np.ndarray):
self.beta = self.initial_guess
elif isinstance(self.initial_guess, str):
if self.initial_guess == "strengths":
self.beta = (self.strength_sequence > 0).astype(
float
) / self.strength_sequence.sum()
elif self.initial_guess == "strengths_minor":
self.beta = (self.strength_sequence > 0).astype(float) / (
self.strength_sequence + 1
)
elif self.initial_guess == "random":
self.beta = np.random.rand(self.n_nodes).astype(np.float64)
else:
raise ValueError(
'{} is not an available initial guess'.format(
self.initial_guess
)
)
else:
raise TypeError('initial_guess must be str or numpy.ndarray')
self.beta[self.strength_sequence == 0] = 0
self.x0 = self.beta
def _set_initial_guess_ecm(self):
if isinstance(self.initial_guess, np.ndarray):
self.x = self.initial_guess[:self.n_nodes]
self.y = self.initial_guess[self.n_nodes:]
elif isinstance(self.initial_guess, str):
if self.initial_guess == "strengths":
self.x = self.dseq.astype(float) / (
self.n_edges + 1
) # This +1 increases the stability of the solutions.
self.y = (
self.strength_sequence.astype(float)
/ self.strength_sequence.sum()
)
elif self.initial_guess == "strengths_minor":
self.x = np.ones_like(self.dseq, dtype=np.float64) / (
self.dseq + 1
)
self.y = np.ones_like(self.strength_sequence, dtype=np.float64) / (
self.strength_sequence + 1
)
elif self.initial_guess == "random":
self.x = np.random.rand(self.n_nodes).astype(np.float64)
self.y = np.random.rand(self.n_nodes).astype(np.float64)
elif self.initial_guess == "uniform":
self.x = 0.001 * np.ones(self.n_nodes, dtype=np.float64)
self.y = 0.001 * np.ones(self.n_nodes, dtype=np.float64)
else:
raise ValueError(
'{} is not an available initial guess'.format(
self.initial_guess
)
)
else:
raise TypeError('initial_guess must be str or numpy.ndarray')
self.x[self.dseq == 0] = 0
self.y[self.strength_sequence == 0] = 0
if isinstance(self.initial_guess, str):
if self.last_model == "ecm":
self.x0 = np.concatenate((self.x, self.y))
elif self.last_model == "ecm_exp":
self.x[self.x != 0] = -np.log(self.x[self.x != 0])
self.y[self.y != 0] = -np.log(self.y[self.y != 0])
self.x0 = np.concatenate((self.x, self.y))
elif isinstance(self.initial_guess, np.ndarray):
self.x0 = np.concatenate((self.x, self.y))
# DA SISTEMARE
def _solution_error(self):
if self.last_model in ["cm", "cm_exp", "crema", "crema-sparse"]:
if self.x is not None:
ex_k = mof.expected_degree_cm(self.x)
# print(k, ex_k)
self.expected_dseq = ex_k
# error output
self.error_degree = np.linalg.norm(ex_k - self.dseq, ord=np.inf)
self.relative_error_degree = np.linalg.norm((ex_k - self.dseq)/(self.dseq + np.exp(-100)), ord=np.inf)
self.error = self.error_degree
if self.beta is not None:
if self.is_sparse:
ex_s = mof.expected_strength_crema_undirected_sparse(
self.beta, self.adjacency_crema
)
else:
ex_s = mof.expected_strength_crema_undirected(
self.beta, self.adjacency_crema
)
self.expected_stregth_seq = ex_s
# error output
self.error_strength = np.linalg.norm(
ex_s - self.strength_sequence, ord=np.inf
)
self.relative_error_strength = np.max(
abs(
(ex_s - self.strength_sequence) / (self.strength_sequence + np.exp(-100))
)
)
if self.adjacency_given:
self.error = self.error_strength
else:
self.error = max(self.error_strength, self.error_degree)
# potremmo strutturarlo così per evitare ridondanze
elif self.last_model in ["ecm", "ecm_exp"]:
sol = np.concatenate((self.x, self.y))
ex = mof.expected_ecm(sol)
k = np.concatenate((self.dseq, self.strength_sequence))
self.expected_dseq = ex[: self.n_nodes]
self.expected_strength_seq = ex[self.n_nodes:]
# error output
self.error_degree = np.linalg.norm(self.expected_dseq -
self.dseq, ord=np.inf)
self.error_strength = np.linalg.norm(self.expected_strength_seq -
self.strength_sequence,
ord=np.inf)
self.relative_error_strength = max(
abs(
(self.strength_sequence -
self.expected_strength_seq)/self.strength_sequence
)
)
self.relative_error_degree = max(
abs(
(self.dseq - self.expected_dseq)/self.dseq
)
)
self.error = max(self.error_strength, self.error_degree)
def _set_args(self, model):
if model in ["crema", "crema-sparse"]:
self.args = (
self.strength_sequence,
self.adjacency_crema,
self.nz_index,
)
elif model in ["cm", "cm_exp"]:
self.args = (self.r_dseq, self.r_multiplicity)
elif model in ["ecm", "ecm_exp"]:
self.args = (self.dseq, self.strength_sequence)
def _initialize_problem(self, model, method):
self._set_initial_guess(model)
self._set_args(model)
mod_met = "-"
mod_met = mod_met.join([model, method])
d_fun = {
"cm-newton": lambda x: -mof.loglikelihood_prime_cm(x, self.args),
"cm-quasinewton": lambda x: -mof.loglikelihood_prime_cm(x, self.args),
"cm-fixed-point": lambda x: mof.iterative_cm(x, self.args),
"crema-newton": lambda x: -mof.loglikelihood_prime_crema_undirected(
x, self.args
),
"crema-quasinewton": lambda x: -mof.loglikelihood_prime_crema_undirected(
x, self.args
),
"crema-fixed-point": lambda x: -mof.iterative_crema_undirected(x, self.args),
"ecm-newton": lambda x: -mof.loglikelihood_prime_ecm(x, self.args),
"ecm-quasinewton": lambda x: -mof.loglikelihood_prime_ecm(
x, self.args
),
"ecm-fixed-point": lambda x: mof.iterative_ecm(x, self.args),
"crema-sparse-newton": lambda x: -mof.loglikelihood_prime_crema_undirected_sparse(
x, self.args
),
"crema-sparse-quasinewton": lambda x: -mof.loglikelihood_prime_crema_undirected_sparse(
x, self.args
),
"crema-sparse-fixed-point": lambda x: -mof.iterative_crema_undirected_sparse(
x, self.args
),
"cm_exp-newton": lambda x: -mof.loglikelihood_prime_cm_exp(
x, self.args
),
"cm_exp-quasinewton": lambda x: -mof.loglikelihood_prime_cm_exp(
x, self.args
),
"cm_exp-fixed-point": lambda x: mof.iterative_cm_exp(x, self.args),
"ecm_exp-newton": lambda x: -mof.loglikelihood_prime_ecm_exp(
x, self.args
),
"ecm_exp-quasinewton": lambda x: -mof.loglikelihood_prime_ecm_exp(
x, self.args
),
"ecm_exp-fixed-point": lambda x: mof.iterative_ecm_exp(x, self.args),
}
d_fun_jac = {
"cm-newton": lambda x: -mof.loglikelihood_hessian_cm(x, self.args),
"cm-quasinewton": lambda x: -mof.loglikelihood_hessian_diag_cm(
x, self.args
),
"cm-fixed-point": None,
"crema-newton": lambda x: -mof.loglikelihood_hessian_crema_undirected(
x, self.args
),
"crema-quasinewton": lambda x: -mof.loglikelihood_hessian_diag_crema_undirected(
x, self.args
),
"crema-fixed-point": None,
"ecm-newton": lambda x: -mof.loglikelihood_hessian_ecm(x, self.args),
"ecm-quasinewton": lambda x: -mof.loglikelihood_hessian_diag_ecm(
x, self.args
),
"ecm-fixed-point": None,
"crema-sparse-newton": lambda x: -mof.loglikelihood_hessian_crema_undirected(
x, self.args
),
"crema-sparse-quasinewton": lambda x: -mof.loglikelihood_hessian_diag_crema_undirected_sparse(
x, self.args
),
"crema-sparse-fixed-point": None,
"cm_exp-newton": lambda x: -mof.loglikelihood_hessian_cm_exp(
x, self.args
),
"cm_exp-quasinewton": lambda x: -mof.loglikelihood_hessian_diag_cm_exp(
x, self.args
),
"cm_exp-fixed-point": None,
"ecm_exp-newton": lambda x: -mof.loglikelihood_hessian_ecm_exp(
x, self.args
),
"ecm_exp-quasinewton": lambda x: -mof.loglikelihood_hessian_diag_ecm_exp(
x, self.args
),
"ecm_exp-fixed-point": None,
}
d_fun_stop = {
"cm-newton": lambda x: -mof.loglikelihood_cm(x, self.args),
"cm-quasinewton": lambda x: -mof.loglikelihood_cm(x, self.args),
"cm-fixed-point": lambda x: -mof.loglikelihood_cm(x, self.args),
"crema-newton": lambda x: -mof.loglikelihood_crema_undirected(x, self.args),
"crema-quasinewton": lambda x: -mof.loglikelihood_crema_undirected(
x, self.args
),
"crema-fixed-point": lambda x: -mof.loglikelihood_crema_undirected(
x, self.args
),
"ecm-newton": lambda x: -mof.loglikelihood_ecm(x, self.args),
"ecm-quasinewton": lambda x: -mof.loglikelihood_ecm(x, self.args),
"ecm-fixed-point": lambda x: -mof.loglikelihood_ecm(x, self.args),
"crema-sparse-newton": lambda x: -mof.loglikelihood_crema_undirected_sparse(
x, self.args
),
"crema-sparse-quasinewton": lambda x: -mof.loglikelihood_crema_undirected_sparse(
x, self.args
),
"crema-sparse-fixed-point": lambda x: -mof.loglikelihood_crema_undirected_sparse(
x, self.args
),
"cm_exp-newton": lambda x: -mof.loglikelihood_cm_exp(x, self.args),
"cm_exp-quasinewton": lambda x: -mof.loglikelihood_cm_exp(
x, self.args
),
"cm_exp-fixed-point": lambda x: -mof.loglikelihood_cm_exp(
x, self.args
),
"ecm_exp-newton": lambda x: -mof.loglikelihood_ecm_exp(x, self.args),
"ecm_exp-quasinewton": lambda x: -mof.loglikelihood_ecm_exp(
x, self.args
),
"ecm_exp-fixed-point": lambda x: -mof.loglikelihood_ecm_exp(
x, self.args
),
}
try:
self.fun = d_fun[mod_met]
self.fun_jac = d_fun_jac[mod_met]
self.step_fun = d_fun_stop[mod_met]
except:
raise ValueError(
'Method must be "newton","quasi-newton", or "fixed-point".'
)
d_pmatrix = {
"cm": mof.pmatrix_cm,
"cm_exp": mof.pmatrix_cm,
}
if model in ["cm", "cm_exp"]:
self.args_p = (self.n_nodes, np.nonzero(self.dseq)[0])
self.fun_pmatrix = lambda x: d_pmatrix[model](x, self.args_p)
args_lin = {
"cm": (mof.loglikelihood_cm, self.args),
"crema": (mof.loglikelihood_crema_undirected, self.args),
"crema-sparse": (mof.loglikelihood_crema_undirected_sparse, self.args),
"ecm": (mof.loglikelihood_ecm, self.args),
"cm_exp": (mof.loglikelihood_cm_exp, self.args),
"ecm_exp": (mof.loglikelihood_ecm_exp, self.args),
}
self.args_lins = args_lin[model]
lins_fun = {
"cm-newton": lambda x: mof.linsearch_fun_CM(x, self.args_lins),
"cm-quasinewton": lambda x: mof.linsearch_fun_CM(x, self.args_lins),
"cm-fixed-point": lambda x: mof.linsearch_fun_CM_fixed(x),
"crema-newton": lambda x: mof.linsearch_fun_crema_undirected(x, self.args_lins),
"crema-quasinewton": lambda x: mof.linsearch_fun_crema_undirected(x, self.args_lins),
"crema-fixed-point": lambda x: mof.linsearch_fun_crema_undirected_fixed(x),
"crema-sparse-newton": lambda x: mof.linsearch_fun_crema_undirected(x, self.args_lins),
"crema-sparse-quasinewton": lambda x: mof.linsearch_fun_crema_undirected(x, self.args_lins),
"crema-sparse-fixed-point": lambda x: mof.linsearch_fun_crema_undirected_fixed(x),
"ecm-newton": lambda x: mof.linsearch_fun_ECM(x, self.args_lins),
"ecm-quasinewton": lambda x: mof.linsearch_fun_ECM(x, self.args_lins),
"ecm-fixed-point": lambda x: mof.linsearch_fun_ECM_fixed(x),
"cm_exp-newton": lambda x: mof.linsearch_fun_CM_exp(x, self.args_lins),
"cm_exp-quasinewton": lambda x: mof.linsearch_fun_CM_exp(x, self.args_lins),
"cm_exp-fixed-point": lambda x: mof.linsearch_fun_CM_exp_fixed(x),
"ecm_exp-newton": lambda x: mof.linsearch_fun_ECM_exp(x, self.args_lins),
"ecm_exp-quasinewton": lambda x: mof.linsearch_fun_ECM_exp(x, self.args_lins),
"ecm_exp-fixed-point": lambda x: mof.linsearch_fun_ECM_exp_fixed(x)
}
self.fun_linsearch = lins_fun[mod_met]
hess_reg = {
"cm": sof.matrix_regulariser_function_eigen_based,
"cm_exp": sof.matrix_regulariser_function,
"ecm": sof.matrix_regulariser_function_eigen_based,
"ecm_exp": sof.matrix_regulariser_function,
"crema": sof.matrix_regulariser_function,
"crema-sparse": sof.matrix_regulariser_function,
}
self.hessian_regulariser = hess_reg[model]
if isinstance(self.regularise, str):
if self.regularise == "eigenvalues":
self.hessian_regulariser = sof.matrix_regulariser_function_eigen_based
elif self.regularise == "identity":
self.hessian_regulariser = hessian_regulariser_function
def _solve_problem_crema_undirected(
self,
initial_guess=None,
model="crema",
adjacency="cm",
method="quasinewton",
method_adjacency="newton",
initial_guess_adjacency="random",
max_steps=100,
tol=1e-8,
eps=1e-8,
full_return=False,
verbose=False,
linsearch=True,
regularise=True,
):
if model == "crema-sparse":
self.is_sparse = True
else:
self.is_sparse = False
if not isinstance(adjacency, (list, np.ndarray, str)) and (
not scipy.sparse.isspmatrix(adjacency)
):
raise ValueError("adjacency must be a matrix or a method")
elif isinstance(adjacency, str):
# aggiungere check sul modello passato per l'adjacency matrix
self._solve_problem(
initial_guess=initial_guess_adjacency,
model=adjacency,
method=method_adjacency,
max_steps=max_steps,
tol=tol,
eps=eps,
full_return=full_return,
verbose=verbose,
linsearch=linsearch,
regularise=regularise
)
if self.is_sparse:
self.adjacency_crema = (self.x,)
self.adjacency_given = False
else:
pmatrix = self.fun_pmatrix(self.x)
raw_ind, col_ind = np.nonzero(np.triu(pmatrix))
raw_ind = raw_ind.astype(np.int64)
col_ind = col_ind.astype(np.int64)
weigths_value = pmatrix[raw_ind, col_ind]
self.adjacency_crema = (raw_ind, col_ind, weigths_value)
self.is_sparse = False
self.adjacency_given = False
elif isinstance(adjacency, list):
adjacency = np.array(adjacency).astype(np.float64)
raw_ind, col_ind = np.nonzero(np.triu(adjacency))
raw_ind = raw_ind.astype(np.int64)
col_ind = col_ind.astype(np.int64)
weigths_value = adjacency[raw_ind, col_ind]
self.adjacency_crema = (raw_ind, col_ind, weigths_value)
self.is_sparse = False
self.adjacency_given = True
elif isinstance(adjacency, np.ndarray):
adjacency = adjacency.astype(np.float64)
raw_ind, col_ind = np.nonzero(np.triu(adjacency))
raw_ind = raw_ind.astype(np.int64)
col_ind = col_ind.astype(np.int64)
weigths_value = adjacency[raw_ind, col_ind]
self.adjacency_crema = (raw_ind, col_ind, weigths_value)
self.is_sparse = False
self.adjacency_given = True
elif scipy.sparse.isspmatrix(adjacency):
raw_ind, col_ind = scipy.sparse.triu(adjacency).nonzero()
raw_ind = raw_ind.astype(np.int64)
col_ind = col_ind.astype(np.int64)
weigths_value = (adjacency[raw_ind, col_ind].A1).astype(np.float64)
self.adjacency_crema = (raw_ind, col_ind, weigths_value)
self.is_sparse = False
self.adjacency_given = True
if self.is_sparse:
self.last_model = "crema-sparse"
else:
self.last_model = model
linsearch = linsearch
regularise = regularise
self.regularise = regularise
self.full_return = full_return
self.initial_guess = initial_guess
self._initialize_problem(self.last_model, method)
x0 = self.x0
sol = sof.solver(
x0,
fun=self.fun,
fun_jac=self.fun_jac,
step_fun=self.step_fun,
linsearch_fun=self.fun_linsearch,
hessian_regulariser=self.hessian_regulariser,
tol=tol,
eps=eps,
max_steps=max_steps,
method=method,
verbose=verbose,
regularise=regularise,
linsearch=linsearch,
full_return=full_return,
)
self._set_solved_problem(sol)
def _set_solved_problem_crema_undirected(self, solution):
if self.full_return:
self.beta = solution[0]
self.comput_time_crema = solution[1]
self.n_steps_crema = solution[2]
self.norm_seq_crema = solution[3]
self.diff_seq_crema = solution[4]
self.alfa_seq_crema = solution[5]
else:
self.beta = solution
def _set_solved_problem_ecm(self, solution):
if self.full_return:
self.r_xy = solution[0]
self.comput_time = solution[1]
self.n_steps = solution[2]
self.norm_seq = solution[3]
self.diff_seq = solution[4]
self.alfa_seq = solution[5]
else:
self.r_xy = solution
if self.last_model == "ecm":
self.x = self.r_xy[: self.n_nodes]
self.y = self.r_xy[self.n_nodes:]
elif self.last_model == "ecm_exp":
self.x = np.exp(-self.r_xy[:self.n_nodes])
self.y = np.exp(-self.r_xy[self.n_nodes:])
[docs] def ensemble_sampler(self, n, cpu_n=1, output_dir="sample/", seed=42):
"""The function sample a given number of graphs in the ensemble
generated from the last model solved. Each grpah is an edgelist
written in the output directory as `.txt` file.
The function is parallelised and can run on multiple cpus.
:param n: Number of graphs to sample.
:type n: int
:param cpu_n: Number of cpus to use, defaults to 1.
:type cpu_n: int, optional
:param output_dir: Name of the output directory, defaults to "sample/".
:type output_dir: str, optional
:param seed: Random seed, defaults to 42.
:type seed: int, optional
:raises ValueError: [description]
"""
# al momento funziona solo sull'ultimo problema risolto
# create the output directory
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# compute the sample
# seed specification
np.random.seed(seed)
s = [np.random.randint(0, 1000000) for i in range(n)]
if self.last_model in ["cm", "cm_exp"]:
iter_files = iter(
output_dir + "{}.txt".format(i) for i in range(n))
i = 0
for item in iter_files:
eg.ensemble_sampler_cm_graph(
outfile_name=item,
x=self.x,
cpu_n=cpu_n,
seed=s[i])
i += 1
elif self.last_model in ["ecm", "ecm_exp"]:
iter_files = iter(
output_dir + "{}.txt".format(i) for i in range(n))
i = 0
for item in iter_files:
eg.ensemble_sampler_ecm_graph(
outfile_name=item,
x=self.x,
y=self.y,
cpu_n=cpu_n,
seed=s[i])
i += 1
elif self.last_model in ["crema"]:
if self.adjacency_given:
# deterministic adj matrix
iter_files = iter(
output_dir + "{}.txt".format(i) for i in range(n))
i = 0
for item in iter_files:
eg.ensemble_sampler_crema_ecm_det_graph(
outfile_name=item,
beta=self.beta,
adj=self.adjacency_crema,
cpu_n=cpu_n,
seed=s[i])
i += 1
else:
# probabilistic adj matrix
iter_files = iter(
output_dir + "{}.txt".format(i) for i in range(n))
i = 0
for item in iter_files:
eg.ensemble_sampler_crema_ecm_prob_graph(
outfile_name=item,
beta=self.beta,
adj=self.adjacency_crema,
cpu_n=cpu_n,
seed=s[i])
i += 1
elif self.last_model in ["crema-sparse"]:
if not self.adjacency_given:
# probabilistic adj matrix
iter_files = iter(
output_dir + "{}.txt".format(i) for i in range(n))
i = 0
for item in iter_files:
eg.ensemble_sampler_crema_sparse_ecm_prob_graph(
outfile_name=item,
beta=self.beta,
adj=self.adjacency_crema,
cpu_n=cpu_n,
seed=s[i])
i += 1
else:
raise ValueError("insert a model")
[docs]class DirectedGraph:
"""Directed graph instance can be initialised with
adjacency matrix, edgelist, degree sequence or strengths sequence.
:param adjacency: Adjacency matrix, defaults to None.
:type adjacency: numpy.ndarray, list, scipy.sparse_matrix, optional
:param edgelist: edgelist, defaults to None.
:type edgelist: numpy.ndarray, list, optional
:param degree_sequence: degrees sequence, defaults to None.
:type degree_sequence: numpy.ndarray, optional
:param strength_sequence: strengths sequence, defaults to None.
:type strength_sequence: numpy.ndarray, optional
"""
def __init__(
self,
adjacency=None,
edgelist=None,
degree_sequence=None,
strength_sequence=None,
):
self.n_nodes = None
self.n_edges = None
self.adjacency = None
self.is_sparse = False
self.edgelist = None
self.dseq = None
self.dseq_out = None
self.dseq_in = None
self.out_strength = None
self.in_strength = None
self.nz_index_sout = None
self.nz_index_sin = None
self.nodes_dict = None
self.is_initialized = False
self.is_randomized = False
self.is_weighted = False
self._initialize_graph(
adjacency=adjacency,
edgelist=edgelist,
degree_sequence=degree_sequence,
strength_sequence=strength_sequence,
)
self.avg_mat = None
self.initial_guess = None
# Reduced problem parameters
self.is_reduced = False
self.r_dseq = None
self.r_dseq_out = None
self.r_dseq_in = None
self.r_n_out = None
self.r_n_in = None
self.r_invert_dseq = None
self.r_invert_dseq_out = None
self.r_invert_dseq_in = None
self.r_dim = None
self.r_multiplicity = None
# Problem solutions
self.x = None
self.y = None
self.xy = None
self.b_out = None
self.b_in = None
# reduced solutions
self.r_x = None
self.r_y = None
self.r_xy = None
# Problem (reduced) residuals
self.residuals = None
self.final_result = None
self.r_beta = None
# non-zero indices
self.nz_index_out = None
self.rnz_dseq_out = None
self.nz_index_in = None
self.rnz_dseq_in = None
# model
self.x0 = None
self.error = None
self.error_degree = None
self.relative_error_degree = None
self.error_strength = None
self.relative_error_strength = None
self.full_return = False
self.last_model = None
# functen
self.args = None
def _initialize_graph(
self,
adjacency=None,
edgelist=None,
degree_sequence=None,
strength_sequence=None,
):
if adjacency is not None:
if not isinstance(
adjacency, (list, np.ndarray)
) and not scipy.sparse.isspmatrix(adjacency):
raise TypeError(
("The adjacency matrix must be passed as a list or numpy"
" array or scipy sparse matrix.")
)
elif adjacency.size > 0:
if np.sum(adjacency < 0):
raise TypeError(
"The adjacency matrix entries must be positive."
)
if isinstance(
adjacency, list
):
# Cast it to a numpy array: if it is given as a list
# it should not be too large
self.adjacency = np.array(adjacency)
elif isinstance(adjacency, np.ndarray):
self.adjacency = adjacency
else:
self.adjacency = adjacency
self.is_sparse = True
if np.sum(adjacency) == np.sum(adjacency > 0):
self.dseq_in = nef.in_degree(adjacency)
self.dseq_out = nef.out_degree(adjacency)
else:
self.dseq_in = nef.in_degree(adjacency)
self.dseq_out = nef.out_degree(adjacency)
self.in_strength = nef.in_strength(adjacency).astype(
np.float64
)
self.out_strength = nef.out_strength(adjacency).astype(
np.float64
)
self.nz_index_sout = np.nonzero(self.out_strength)[0]
self.nz_index_sin = np.nonzero(self.in_strength)[0]
self.is_weighted = True
self.n_nodes = len(self.dseq_out)
self.n_edges = np.sum(self.dseq_out)
self.is_initialized = True
elif edgelist is not None:
if not isinstance(edgelist, (list, np.ndarray)):
raise TypeError(
"The edgelist must be passed as a list or numpy array."
)
elif len(edgelist) > 0:
if len(edgelist[0]) > 3:
raise ValueError(
("This is not an edgelist. An edgelist must be a list"
" or array of couples of nodes with optional weights."
" Is this an adjacency matrix?")
)
elif len(edgelist[0]) == 2:
(
self.edgelist,
self.dseq_out,
self.dseq_in,
self.nodes_dict,
) = nef.edgelist_from_edgelist_directed(edgelist)
else:
(
self.edgelist,
self.dseq_out,
self.dseq_in,
self.out_strength,
self.in_strength,
self.nodes_dict,
) = nef.edgelist_from_edgelist_directed(edgelist)
self.n_nodes = len(self.dseq_out)
self.n_edges = np.sum(self.dseq_out)
self.is_initialized = True
elif degree_sequence is not None:
if not isinstance(degree_sequence, (list, np.ndarray)):
raise TypeError(
("The degree sequence must be passed as a list"
" or numpy array.")
)
elif len(degree_sequence) > 0:
try:
int(degree_sequence[0])
except: # TODO: bare exception
raise TypeError(
"The degree sequence must contain numeric values."
)
if (np.array(degree_sequence) < 0).sum() > 0:
raise ValueError("A degree cannot be negative.")
else:
if len(degree_sequence) % 2 != 0:
raise ValueError(
"Strength-in/out arrays must have same length."
)
self.n_nodes = int(len(degree_sequence) / 2)
self.dseq_out = degree_sequence[: self.n_nodes]
self.dseq_in = degree_sequence[self.n_nodes:]
self.n_edges = np.sum(self.dseq_out)
self.is_initialized = True
if strength_sequence is not None:
if not isinstance(strength_sequence, (list, np.ndarray)):
raise TypeError(
("The strength sequence must be passed as a"
" list or numpy array.")
)
elif len(strength_sequence):
try:
int(strength_sequence[0])
except: # TODO: bare exception to check
raise TypeError(
("The strength sequence must contain"
" numeric values.")
)
if (np.array(strength_sequence) < 0).sum() > 0:
raise ValueError("A strength cannot be negative.")
else:
if len(strength_sequence) % 2 != 0:
raise ValueError(
("Strength-in/out arrays must have"
" same length.")
)
self.n_nodes = int(len(strength_sequence) / 2)
self.out_strength = strength_sequence[
: self.n_nodes
]
self.in_strength = strength_sequence[
self.n_nodes:
]
self.nz_index_sout = np.nonzero(self.out_strength)[
0
]
self.nz_index_sin = np.nonzero(self.in_strength)[0]
self.is_weighted = True
self.is_initialized = True
elif strength_sequence is not None:
if not isinstance(strength_sequence, (list, np.ndarray)):
raise TypeError(
("The strength sequence must be passed as a list or"
" numpy array.")
)
elif len(strength_sequence):
try:
int(strength_sequence[0])
except: # TODO: bare exception
raise TypeError(
"The strength sequence must contain numeric values."
)
if (np.array(strength_sequence) < 0).sum() > 0:
raise ValueError("A strength cannot be negative.")
else:
if len(strength_sequence) % 2 != 0:
raise ValueError(
"Strength-in/out arrays must have same length."
)
self.n_nodes = int(len(strength_sequence) / 2)
self.out_strength = strength_sequence[: self.n_nodes]
self.in_strength = strength_sequence[self.n_nodes:]
self.nz_index_sout = np.nonzero(self.out_strength)[0]
self.nz_index_sin = np.nonzero(self.in_strength)[0]
self.is_weighted = True
self.is_initialized = True
[docs] def set_adjacency_matrix(self, adjacency):
"""Initializes a graph from the adjacency matrix.
:param adjacency: Adjacency matrix.
:type adjacency: numpy.ndarray, list, scipy.sparse_matrix
"""
if self.is_initialized:
print(
("Graph already contains edges or has a degree sequence."
" Use clean_edges() first.")
)
else:
self._initialize_graph(adjacency=adjacency)
[docs] def set_edgelist(self, edgelist):
"""Initializes a graph from the edgelist.
:param adjacency: Edgelist
:type adjacency: numpy.ndarray, list
"""
if self.is_initialized:
print(
("Graph already contains edges or has a degree sequence."
" Use clean_edges() first.")
)
else:
self._initialize_graph(edgelist=edgelist)
[docs] def set_degree_sequences(self, degree_sequence):
"""Initializes graph from the degrees sequence.
:param adjacency: Degrees sequence
:type adjacency: numpy.ndarray
"""
if self.is_initialized:
print(
("Graph already contains edges or has a degree sequence."
" Use clean_edges() first.")
)
else:
self._initialize_graph(degree_sequence=degree_sequence)
[docs] def clean_edges(self):
"""Deletes all initialized graph attributes
"""
self.adjacency = None
self.edgelist = None
self.deg_seq = None
self.is_initialized = False
def _solve_problem(
self,
initial_guess=None, # TODO:aggiungere un default a initial guess
model="dcm",
method="quasinewton",
max_steps=100,
tol=1e-8,
eps=1e-8,
full_return=False,
verbose=False,
linsearch=True,
regularise=True,
regularise_eps=1e-3,
):
self.last_model = model
self.full_return = full_return
self.initial_guess = initial_guess
self.regularise = regularise
self._initialize_problem(model, method)
x0 = self.x0
sol = sof.solver(
x0,
fun=self.fun,
fun_jac=self.fun_jac,
step_fun=self.step_fun,
linsearch_fun=self.fun_linsearch,
hessian_regulariser=self.hessian_regulariser,
tol=tol,
eps=eps,
max_steps=max_steps,
method=method,
verbose=verbose,
regularise=self.regularise,
regularise_eps=regularise_eps,
full_return=full_return,
linsearch=linsearch,
)
self._set_solved_problem(sol)
def _set_solved_problem_dcm(self, solution):
if self.full_return:
self.r_xy = solution[0]
self.comput_time = solution[1]
self.n_steps = solution[2]
self.norm_seq = solution[3]
self.diff_seq = solution[4]
self.alfa_seq = solution[5]
else:
self.r_xy = solution
self.r_x = self.r_xy[: self.rnz_n_out]
self.r_y = self.r_xy[self.rnz_n_out:]
self.x = self.r_x[self.r_invert_dseq]
self.y = self.r_y[self.r_invert_dseq]
def _set_solved_problem_dcm_exp(self, solution):
if self.full_return:
# conversion from theta to x
self.r_xy = np.exp(-solution[0])
self.comput_time = solution[1]
self.n_steps = solution[2]
self.norm_seq = solution[3]
self.diff_seq = solution[4]
self.alfa_seq = solution[5]
else:
# conversion from theta to x
self.r_xy = np.exp(-solution)
self.r_x = self.r_xy[: self.rnz_n_out]
self.r_y = self.r_xy[self.rnz_n_out:]
self.x = self.r_x[self.r_invert_dseq]
self.y = self.r_y[self.r_invert_dseq]
def _set_solved_problem_decm(self, solution):
if self.full_return:
self.r_xy = solution[0]
self.comput_time = solution[1]
self.n_steps = solution[2]
self.norm_seq = solution[3]
self.diff_seq = solution[4]
self.alfa_seq = solution[5]
else:
self.r_xy = solution
self.x = self.r_xy[: self.n_nodes]
self.y = self.r_xy[self.n_nodes: 2 * self.n_nodes]
self.b_out = self.r_xy[2 * self.n_nodes: 3 * self.n_nodes]
self.b_in = self.r_xy[3 * self.n_nodes:]
def _set_solved_problem_decm_exp(self, solution):
if self.full_return:
# conversion from theta to x
self.r_xy = np.exp(-solution[0])
self.comput_time = solution[1]
self.n_steps = solution[2]
self.norm_seq = solution[3]
self.diff_seq = solution[4]
self.alfa_seq = solution[5]
else:
# conversion from theta to x
self.r_xy = np.exp(-solution)
self.x = self.r_xy[: self.n_nodes]
self.y = self.r_xy[self.n_nodes: 2 * self.n_nodes]
self.b_out = self.r_xy[2 * self.n_nodes: 3 * self.n_nodes]
self.b_in = self.r_xy[3 * self.n_nodes:]
def _set_solved_problem(self, solution):
model = self.last_model
if model in ["dcm"]:
self._set_solved_problem_dcm(solution)
if model in ["dcm_exp"]:
self._set_solved_problem_dcm_exp(solution)
elif model in ["decm"]:
self._set_solved_problem_decm(solution)
elif model in ["decm_exp"]:
self._set_solved_problem_decm_exp(solution)
elif model in ["crema", "crema-sparse"]:
self._set_solved_problem_crema_directed(solution)
[docs] def degree_reduction(self):
"""Carries out degree reduction for DBCM.
The graph should be initialized.
"""
self.dseq = np.array(list(zip(self.dseq_out, self.dseq_in)))
(
self.r_dseq,
self.r_index_dseq,
self.r_invert_dseq,
self.r_multiplicity
) = np.unique(
self.dseq,
return_index=True,
return_inverse=True,
return_counts=True,
axis=0,
)
self.rnz_dseq_out = self.r_dseq[:, 0]
self.rnz_dseq_in = self.r_dseq[:, 1]
self.nz_index_out = np.nonzero(self.rnz_dseq_out)[0]
self.nz_index_in = np.nonzero(self.rnz_dseq_in)[0]
self.rnz_n_out = self.rnz_dseq_out.size
self.rnz_n_in = self.rnz_dseq_in.size
self.rnz_dim = self.rnz_n_out + self.rnz_n_in
self.is_reduced = True
def _set_initial_guess(self, model):
if model in ["dcm"]:
self._set_initial_guess_dcm()
if model in ["dcm_exp"]:
self._set_initial_guess_dcm_exp()
elif model in ["decm"]:
self._set_initial_guess_decm()
elif model in ["decm_exp"]:
self._set_initial_guess_decm_exp()
elif model in ["crema", "crema-sparse"]:
self._set_initial_guess_crema_directed()
def _set_initial_guess_dcm(self):
# The preselected initial guess works best usually.
# The suggestion is, if this does not work,
# trying with random initial conditions several times.
# If you want to customize the initial guess,
# remember that the code starts with a reduced number
# of rows and columns.
# remember if you insert your choice as initial choice,
# it should be numpy.ndarray
if ~self.is_reduced:
self.degree_reduction()
if isinstance(self.initial_guess, np.ndarray):
# we reduce the full x0, it's not very honest
# but it's better to ask to provide an already reduced x0
self.r_x = self.initial_guess[:self.n_nodes][self.r_index_dseq]
self.r_y = self.initial_guess[self.n_nodes:][self.r_index_dseq]
elif isinstance(self.initial_guess, str):
if self.initial_guess == 'degrees_minor':
# This +1 increases the stability of the solutions.
self.r_x = self.rnz_dseq_out / (np.sqrt(self.n_edges) + 1)
self.r_y = self.rnz_dseq_in / (np.sqrt(self.n_edges) + 1)
elif self.initial_guess == "random":
self.r_x = np.random.rand(self.rnz_n_out).astype(np.float64)
self.r_y = np.random.rand(self.rnz_n_in).astype(np.float64)
elif self.initial_guess == "uniform":
# All probabilities will be 1/2 initially
self.r_x = 0.5 * np.ones(self.rnz_n_out, dtype=np.float64)
self.r_y = 0.5 * np.ones(self.rnz_n_in, dtype=np.float64)
elif self.initial_guess == "degrees":
self.r_x = self.rnz_dseq_out.astype(np.float64)
self.r_y = self.rnz_dseq_in.astype(np.float64)
elif self.initial_guess == "chung_lu":
self.r_x = self.rnz_dseq_out.astype(np.float64) / \
(2*self.n_edges)
self.r_y = self.rnz_dseq_in.astype(np.float64)/(2*self.n_edges)
else:
raise ValueError(
'{} is not an available initial guess'.format(
self.initial_guess
)
)
else:
raise TypeError('initial_guess must be str or numpy.ndarray')
self.r_x[self.rnz_dseq_out == 0] = 0
self.r_y[self.rnz_dseq_in == 0] = 0
self.x0 = np.concatenate((self.r_x, self.r_y))
def _set_initial_guess_dcm_exp(self):
# The preselected initial guess works best usually.
# The suggestion is, if this does not work,
# trying with random initial conditions several times.
# If you want to customize the initial guess, remember that the code
# starts with a reduced number of rows and columns.
if ~self.is_reduced:
self.degree_reduction()
if isinstance(self.initial_guess, np.ndarray):
# we reduce the full x0, it's not very honest
# but it's better to ask to provide an already reduced x0
self.r_x = self.initial_guess[:self.n_nodes][self.r_index_dseq]
self.r_y = self.initial_guess[self.n_nodes:][self.r_index_dseq]
elif isinstance(self.initial_guess, str):
if self.initial_guess == 'degrees_minor':
self.r_x = self.rnz_dseq_out / (
np.sqrt(self.n_edges) + 1
) # This +1 increases the stability of the solutions.
self.r_y = self.rnz_dseq_in / (np.sqrt(self.n_edges) + 1)
elif self.initial_guess == "random":
self.r_x = np.random.rand(self.rnz_n_out).astype(np.float64)
self.r_y = np.random.rand(self.rnz_n_in).astype(np.float64)
elif self.initial_guess == "uniform":
self.r_x = 0.5 * np.ones(
self.rnz_n_out, dtype=np.float64
) # All probabilities will be 1/2 initially
self.r_y = 0.5 * np.ones(self.rnz_n_in, dtype=np.float64)
elif self.initial_guess == "degrees":
self.r_x = self.rnz_dseq_out.astype(np.float64)
self.r_y = self.rnz_dseq_in.astype(np.float64)
elif self.initial_guess == "chung_lu":
self.r_x = self.rnz_dseq_out.astype(np.float64) / \
(2*self.n_edges)
self.r_y = self.rnz_dseq_in.astype(np.float64) / \
(2*self.n_edges)
else:
raise ValueError(
'{} is not an available initial guess'.format(
self.initial_guess
)
)
else:
raise TypeError('initial_guess must be str or numpy.ndarray')
not_zero_ind_x = self.r_x != 0
self.r_x[not_zero_ind_x] = -np.log(self.r_x[not_zero_ind_x])
self.r_x[self.rnz_dseq_out == 0] = 1e3
not_zero_ind_y = self.r_y != 0
self.r_y[not_zero_ind_y] = -np.log(self.r_y[not_zero_ind_y])
self.r_y[self.rnz_dseq_in == 0] = 1e3
self.x0 = np.concatenate((self.r_x, self.r_y))
def _set_initial_guess_crema_directed(self):
# The preselected initial guess works best usually.
# The suggestion is, if this does not work,
# trying with random initial conditions several times.
# If you want to customize the initial guess, remember that
# the code starts with a reduced number of rows and columns.
# TODO: mettere un self.is_weighted bool
if isinstance(self.initial_guess, np.ndarray):
self.b_out = self.initial_guess[:self.n_nodes]
self.b_in = self.initial_guess[self.n_nodes:]
elif isinstance(self.initial_guess, str):
if self.initial_guess == "strengths":
self.b_out = (self.out_strength > 0).astype(
float
) / self.out_strength.sum()
self.b_in = (self.in_strength > 0).astype(
float
) / self.in_strength.sum()
elif self.initial_guess == "strengths_minor":
# This +1 increases the stability of the solutions.
self.b_out = (self.out_strength > 0).astype(float) / (
self.out_strength + 1
)
self.b_in = (self.in_strength > 0).astype(float) / (
self.in_strength + 1
)
elif self.initial_guess == "random":
self.b_out = np.random.rand(self.n_nodes).astype(np.float64)
self.b_in = np.random.rand(self.n_nodes).astype(np.float64)
else:
raise ValueError(
'{} is not an available initial guess'.format(
self.initial_guess
)
)
else:
raise TypeError('initial_guess must be str or numpy.ndarray')
self.b_out[self.out_strength == 0] = 0
self.b_in[self.in_strength == 0] = 0
self.x0 = np.concatenate((self.b_out, self.b_in))
def _set_initial_guess_decm(self):
# The preselected initial guess works best usually.
# The suggestion is, if this does not work,
# trying with random initial conditions several times.
# If you want to customize the initial guess,
# remember that the code starts with a reduced number
# of rows and columns.
if isinstance(self.initial_guess, np.ndarray):
self.x = self.initial_guess[:self.n_nodes]
self.y = self.initial_guess[self.n_nodes:2*self.n_nodes]
self.b_out = self.initial_guess[2*self.n_nodes:3*self.n_nodes]
self.b_in = self.initial_guess[3*self.n_nodes:]
elif isinstance(self.initial_guess, str):
if self.initial_guess == 'strengths':
self.x = self.dseq_out.astype(float) / (self.n_edges + 1)
self.y = self.dseq_in.astype(float) / (self.n_edges + 1)
self.b_out = (
self.out_strength.astype(float) / self.out_strength.sum()
) # This +1 increases the stability of the solutions.
self.b_in = self.in_strength.astype(float) /\
self.in_strength.sum()
elif self.initial_guess == "strengths_minor":
self.x = np.ones_like(self.dseq_out) / (self.dseq_out + 1)
self.y = np.ones_like(self.dseq_in) / (self.dseq_in + 1)
self.b_out = np.ones_like(self.out_strength) / (
self.out_strength + 1
)
self.b_in = np.ones_like(self.in_strength) /\
(self.in_strength + 1)
elif self.initial_guess == "random":
self.x = np.random.rand(self.n_nodes).astype(np.float64)
self.y = np.random.rand(self.n_nodes).astype(np.float64)
self.b_out = np.random.rand(self.n_nodes).astype(np.float64)
self.b_in = np.random.rand(self.n_nodes).astype(np.float64)
elif self.initial_guess == "uniform":
# All probabilities will be 0.9 initially
self.x = 0.9 * np.ones(self.n_nodes, dtype=np.float64)
self.y = 0.9 * np.ones(self.n_nodes, dtype=np.float64)
self.b_out = 0.9 * np.ones(self.n_nodes, dtype=np.float64)
self.b_in = 0.9 * np.ones(self.n_nodes, dtype=np.float64)
else:
raise ValueError(
'{} is not an available initial guess'.format(
self.initial_guess
)
)
else:
raise TypeError('initial_guess must be str or numpy.ndarray')
self.x[self.dseq_out == 0] = 0
self.y[self.dseq_in == 0] = 0
self.b_out[self.out_strength == 0] = 0
self.b_in[self.in_strength == 0] = 0
self.x0 = np.concatenate((self.x, self.y, self.b_out, self.b_in))
def _set_initial_guess_decm_exp(self):
# The preselected initial guess works best usually.
# The suggestion is, if this does not work,
# trying with random initial conditions several times.
# If you want to customize the initial guess, remember that
# the code starts with a reduced number of rows and columns.
if isinstance(self.initial_guess, np.ndarray):
self.x = self.initial_guess[:self.n_nodes]
self.y = self.initial_guess[self.n_nodes:2*self.n_nodes]
self.b_out = self.initial_guess[2*self.n_nodes:3*self.n_nodes]
self.b_in = self.initial_guess[3*self.n_nodes:]
elif isinstance(self.initial_guess, str):
if self.initial_guess == "strengths":
self.x = self.dseq_out.astype(float) / (self.n_edges + 1)
self.y = self.dseq_in.astype(float) / (self.n_edges + 1)
self.b_out = (
self.out_strength.astype(float) / self.out_strength.sum()
) # This +1 increases the stability of the solutions.
self.b_in = self.in_strength.astype(float) /\
self.in_strength.sum()
elif self.initial_guess == "strengths_minor":
self.x = np.ones_like(self.dseq_out) / (self.dseq_out + 1)
self.y = np.ones_like(self.dseq_in) / (self.dseq_in + 1)
self.b_out = np.ones_like(self.out_strength) / (
self.out_strength + 1
)
self.b_in = np.ones_like(self.in_strength) /\
(self.in_strength + 1)
elif self.initial_guess == "random":
self.x = np.random.rand(self.n_nodes).astype(np.float64)
self.y = np.random.rand(self.n_nodes).astype(np.float64)
self.b_out = np.random.rand(self.n_nodes).astype(np.float64)
self.b_in = np.random.rand(self.n_nodes).astype(np.float64)
elif self.initial_guess == "uniform":
self.x = 0.1 * np.ones(
self.n_nodes, dtype=np.float64
) # All probabilities will be 1/2 initially
self.y = 0.1 * np.ones(self.n_nodes, dtype=np.float64)
self.b_out = 0.1 * np.ones(self.n_nodes, dtype=np.float64)
self.b_in = 0.1 * np.ones(self.n_nodes, dtype=np.float64)
else:
raise ValueError(
'{} is not an available initial guess'.format(
self.initial_guess
)
)
else:
raise TypeError('initial_guess must be str or numpy.ndarray')
not_zero_ind_x = self.x != 0
self.x[not_zero_ind_x] = -np.log(self.x[not_zero_ind_x])
not_zero_ind_y = self.y != 0
self.y[not_zero_ind_y] = -np.log(self.y[not_zero_ind_y])
not_zero_ind_b_out = self.b_out != 0
self.b_out[not_zero_ind_b_out] = -np.log(
self.b_out[not_zero_ind_b_out])
not_zero_ind_b_in = self.b_in != 0
self.b_in[not_zero_ind_b_in] = -np.log(self.b_in[not_zero_ind_b_in])
self.x[self.dseq_out == 0] = 1e3
self.y[self.dseq_in == 0] = 1e3
self.b_out[self.out_strength == 0] = 1e3
self.b_in[self.in_strength == 0] = 1e3
self.x0 = np.concatenate((self.x, self.y, self.b_out, self.b_in))
def _solution_error(self):
if self.last_model in ["dcm_exp", "dcm", "crema", "crema-sparse"]:
if (self.x is not None) and (self.y is not None):
sol = np.concatenate((self.x, self.y))
ex_k_out = mof.expected_out_degree_dcm(sol)
ex_k_in = mof.expected_in_degree_dcm(sol)
ex_k = np.concatenate((ex_k_out, ex_k_in))
k = np.concatenate((self.dseq_out, self.dseq_in))
# print(k, ex_k)
self.expected_dseq = ex_k
# error output
self.error_degree = np.linalg.norm(ex_k - k, ord=np.inf)
self.relative_error_degree = np.linalg.norm(
(ex_k - k) / (k + + np.exp(-100)),
ord=np.inf
)
self.error = self.error_degree
if (self.b_out is not None) and (self.b_in is not None):
sol = np.concatenate([self.b_out, self.b_in])
if self.is_sparse:
ex_s_out = mof.expected_out_strength_crema_directed_sparse(
sol, self.adjacency_crema
)
ex_s_in = mof.expected_in_stregth_crema_directed_sparse(
sol, self.adjacency_crema
)
else:
ex_s_out = mof.expected_out_strength_crema_directed(
sol, self.adjacency_crema
)
ex_s_in = mof.expected_in_stregth_crema_directed(
sol, self.adjacency_crema
)
ex_s = np.concatenate([ex_s_out, ex_s_in])
s = np.concatenate([self.out_strength, self.in_strength])
self.expected_stregth_seq = ex_s
# error output
self.error_strength = np.linalg.norm(ex_s - s, ord=np.inf)
self.relative_error_strength = np.max(
abs(
(ex_s - s) / (s + np.exp(-100))
)
)
if self.adjacency_given:
self.error = self.error_strength
else:
self.error = max(self.error_strength, self.error_degree)
# potremmo strutturarlo così per evitare ridondanze
elif self.last_model in ["decm", "decm_exp"]:
sol = np.concatenate((self.x, self.y, self.b_out, self.b_in))
ex = mof.expected_decm(sol)
k = np.concatenate(
(
self.dseq_out,
self.dseq_in,
self.out_strength,
self.in_strength,
)
)
self.expected_dseq = ex[: 2 * self.n_nodes]
self.expected_strength_seq = ex[2 * self.n_nodes:]
# error putput
self.error_degree = max(
abs(
(
np.concatenate((self.dseq_out, self.dseq_in))
- self.expected_dseq
)
)
)
self.error_strength = max(
abs(
np.concatenate((self.out_strength, self.in_strength))
- self.expected_strength_seq
)
)
self.relative_error_strength = max(
abs(
(np.concatenate((self.out_strength, self.in_strength))
- self.expected_strength_seq)
/ np.concatenate((self.out_strength, self.in_strength)
+ np.exp(-100))
)
)
self.relative_error_degree = max(
abs(
(np.concatenate((self.dseq_out, self.dseq_in))
- self.expected_dseq)
/ np.concatenate((self.dseq_out, self.dseq_in)
+ np.exp(-100))
)
)
self.error = np.linalg.norm(ex - k, ord=np.inf)
def _set_args(self, model):
if model in ["crema", "crema-sparse"]:
self.args = (
self.out_strength,
self.in_strength,
self.adjacency_crema,
self.nz_index_sout,
self.nz_index_sin,
)
elif model in ["dcm", "dcm_exp"]:
self.args = (
self.rnz_dseq_out,
self.rnz_dseq_in,
self.nz_index_out,
self.nz_index_in,
self.r_multiplicity,
)
elif model in ["decm", "decm_exp"]:
self.args = (
self.dseq_out,
self.dseq_in,
self.out_strength,
self.in_strength,
)
def _initialize_problem(self, model, method):
self._set_initial_guess(model)
self._set_args(model)
mod_met = "-"
mod_met = mod_met.join([model, method])
d_fun = {
"dcm-newton": lambda x: -mof.loglikelihood_prime_dcm(x, self.args),
"dcm-quasinewton": lambda x: -mof.loglikelihood_prime_dcm(
x,
self.args
),
"dcm-fixed-point": lambda x: mof.iterative_dcm(x, self.args),
"dcm_exp-newton": lambda x: -mof.loglikelihood_prime_dcm_exp(
x,
self.args
),
"dcm_exp-quasinewton": lambda x: -mof.loglikelihood_prime_dcm_exp(
x,
self.args
),
"dcm_exp-fixed-point": lambda x: mof.iterative_dcm_exp(x, self.args),
"crema-newton": lambda x: -mof.loglikelihood_prime_crema_directed(
x,
self.args
),
"crema-quasinewton": lambda x: -mof.loglikelihood_prime_crema_directed(
x,
self.args
),
"crema-fixed-point": lambda x: -mof.iterative_crema_directed(x, self.args),
"decm-newton": lambda x: -mof.loglikelihood_prime_decm(x, self.args),
"decm-quasinewton": lambda x: -mof.loglikelihood_prime_decm(
x,
self.args
),
"decm-fixed-point": lambda x: mof.iterative_decm(x, self.args),
"decm_exp-newton": lambda x: -mof.loglikelihood_prime_decm_exp(
x,
self.args
),
"decm_exp-quasinewton": lambda x: -mof.loglikelihood_prime_decm_exp(
x,
self.args
),
"decm_exp-fixed-point": lambda x: mof.iterative_decm_exp(x, self.args),
"crema-sparse-newton": lambda x: -mof.loglikelihood_prime_crema_directed_sparse(
x,
self.args
),
"crema-sparse-quasinewton": lambda x:
-mof.loglikelihood_prime_crema_directed_sparse(
x,
self.args
),
"crema-sparse-fixed-point": lambda x: -mof.iterative_crema_directed_sparse(
x,
self.args
),
}
d_fun_jac = {
"dcm-newton": lambda x: -mof.loglikelihood_hessian_dcm(x, self.args),
"dcm-quasinewton": lambda x: -mof.loglikelihood_hessian_diag_dcm(
x,
self.args
),
"dcm-fixed-point": None,
"dcm_exp-newton": lambda x: -mof.loglikelihood_hessian_dcm_exp(
x,
self.args
),
"dcm_exp-quasinewton": lambda x:
-mof.loglikelihood_hessian_diag_dcm_exp(
x,
self.args
),
"dcm_exp-fixed-point": None,
"crema-newton": lambda x: -mof.loglikelihood_hessian_crema_directed(
x,
self.args
),
"crema-quasinewton": lambda x: -mof.loglikelihood_hessian_diag_crema_directed(
x,
self.args
),
"crema-fixed-point": None,
"decm-newton": lambda x: -mof.loglikelihood_hessian_decm(x, self.args),
"decm-quasinewton": lambda x: -mof.loglikelihood_hessian_diag_decm(
x,
self.args
),
"decm-fixed-point": None,
"decm_exp-newton": lambda x: -mof.loglikelihood_hessian_decm_exp(
x,
self.args
),
"decm_exp-quasinewton": lambda x:
-mof.loglikelihood_hessian_diag_decm_exp(
x,
self.args
),
"decm_exp-fixed-point": None,
"crema-sparse-newton": lambda x: -mof.loglikelihood_hessian_crema_directed(
x,
self.args
),
"crema-sparse-quasinewton": lambda x:
-mof.loglikelihood_hessian_diag_crema_directed_sparse(
x,
self.args
),
"crema-sparse-fixed-point": None,
}
d_fun_step = {
"dcm-newton": lambda x: -mof.loglikelihood_dcm(x, self.args),
"dcm-quasinewton": lambda x: -mof.loglikelihood_dcm(x, self.args),
"dcm-fixed-point": lambda x: -mof.loglikelihood_dcm(x, self.args),
"dcm_exp-newton": lambda x: -mof.loglikelihood_dcm_exp(x, self.args),
"dcm_exp-quasinewton": lambda x: -mof.loglikelihood_dcm_exp(
x,
self.args
),
"dcm_exp-fixed-point": lambda x: -mof.loglikelihood_dcm_exp(
x,
self.args
),
"crema-newton": lambda x: -mof.loglikelihood_crema_directed(x, self.args),
"crema-quasinewton": lambda x: -mof.loglikelihood_crema_directed(
x,
self.args
),
"crema-fixed-point": lambda x: -mof.loglikelihood_crema_directed(
x,
self.args
),
"decm-newton": lambda x: -mof.loglikelihood_decm(x, self.args),
"decm-quasinewton": lambda x: -mof.loglikelihood_decm(x, self.args),
"decm-fixed-point": lambda x: -mof.loglikelihood_decm(x, self.args),
"decm_exp-newton": lambda x: -mof.loglikelihood_decm_exp(x, self.args),
"decm_exp-quasinewton": lambda x: -mof.loglikelihood_decm_exp(
x,
self.args
),
"decm_exp-fixed-point": lambda x: -mof.loglikelihood_decm_exp(
x,
self.args
),
"crema-sparse-newton": lambda x: -mof.loglikelihood_crema_directed_sparse(
x,
self.args
),
"crema-sparse-quasinewton": lambda x: -mof.loglikelihood_crema_directed_sparse(
x,
self.args
),
"crema-sparse-fixed-point": lambda x: -mof.loglikelihood_crema_directed_sparse(
x,
self.args
),
}
try:
self.fun = d_fun[mod_met]
self.fun_jac = d_fun_jac[mod_met]
self.step_fun = d_fun_step[mod_met]
except: # TODO: remove bare excpets
raise ValueError(
'Method must be "newton","quasinewton", or "fixed-point".'
)
# TODO: mancano metodi
d_pmatrix = {"dcm": mof.pmatrix_dcm, "dcm_exp": mof.pmatrix_dcm}
# Così basta aggiungere il decm e funziona tutto
if model in ["dcm", "dcm_exp"]:
self.args_p = (
self.n_nodes,
np.nonzero(self.dseq_out)[0],
np.nonzero(self.dseq_in)[0],
)
self.fun_pmatrix = lambda x: d_pmatrix[model](x, self.args_p)
args_lin = {
"dcm": (mof.loglikelihood_dcm, self.args),
"crema": (mof.loglikelihood_crema_directed, self.args),
"crema-sparse": (mof.loglikelihood_crema_directed_sparse, self.args),
"decm": (mof.loglikelihood_decm, self.args),
"dcm_exp": (mof.loglikelihood_dcm_exp, self.args),
"decm_exp": (mof.loglikelihood_decm_exp, self.args),
}
self.args_lins = args_lin[model]
lins_fun = {
"dcm-newton": lambda x: mof.linsearch_fun_DCM(x, self.args_lins),
"dcm-quasinewton": lambda x: mof.linsearch_fun_DCM(x, self.args_lins),
"dcm-fixed-point": lambda x: mof.linsearch_fun_DCM_fixed(x),
"dcm_exp-newton": lambda x: mof.linsearch_fun_DCM_exp(
x,
self.args_lins),
"dcm_exp-quasinewton": lambda x: mof.linsearch_fun_DCM_exp(
x,
self.args_lins),
"dcm_exp-fixed-point": lambda x: mof.linsearch_fun_DCM_exp_fixed(x),
"crema-newton": lambda x: mof.linsearch_fun_crema_directed(x, self.args_lins),
"crema-quasinewton": lambda x: mof.linsearch_fun_crema_directed(
x,
self.args_lins),
"crema-fixed-point": lambda x: mof.linsearch_fun_crema_directed_fixed(x),
"crema-sparse-newton": lambda x: mof.linsearch_fun_crema_directed(
x,
self.args_lins),
"crema-sparse-quasinewton": lambda x: mof.linsearch_fun_crema_directed(
x,
self.args_lins),
"crema-sparse-fixed-point": lambda x: mof.linsearch_fun_crema_directed_fixed(
x),
"decm-newton": lambda x: mof.linsearch_fun_DECM(
x,
self.args_lins),
"decm-quasinewton": lambda x: mof.linsearch_fun_DECM(
x,
self.args_lins),
"decm-fixed-point": lambda x: mof.linsearch_fun_DECM_fixed(x),
"decm_exp-newton": lambda x: mof.linsearch_fun_DECM_exp(
x,
self.args_lins),
"decm_exp-quasinewton": lambda x: mof.linsearch_fun_DECM_exp(
x,
self.args_lins),
"decm_exp-fixed-point": lambda x: mof.linsearch_fun_DECM_exp_fixed(x),
}
self.fun_linsearch = lins_fun[mod_met]
hess_reg = {
"dcm": sof.matrix_regulariser_function_eigen_based,
"dcm_exp": sof.matrix_regulariser_function,
"decm": sof.matrix_regulariser_function_eigen_based,
"decm_exp": sof.matrix_regulariser_function,
"crema": sof.matrix_regulariser_function,
"crema-sparse": sof.matrix_regulariser_function,
}
self.hessian_regulariser = hess_reg[model]
if isinstance(self.regularise, str):
if self.regularise == "eigenvalues":
self.hessian_regulariser = \
sof.matrix_regulariser_function_eigen_based
elif self.regularise == "identity":
self.hessian_regulariser = sof.matrix_regulariser_function
def _solve_problem_crema_directed(
self,
initial_guess=None,
model="crema",
adjacency="dcm",
method="quasinewton",
method_adjacency="newton",
initial_guess_adjacency="random",
max_steps=100,
tol=1e-8,
eps=1e-8,
full_return=False,
verbose=False,
linsearch=True,
regularise=True,
regularise_eps=1e-3,
):
if model == "crema-sparse":
self.is_sparse = True
else:
self.is_sparse = False
if not isinstance(adjacency, (list, np.ndarray, str)) and (
not scipy.sparse.isspmatrix(adjacency)
):
raise ValueError("adjacency must be a matrix or a method")
elif isinstance(adjacency, str):
self._solve_problem(
initial_guess=initial_guess_adjacency,
model=adjacency,
method=method_adjacency,
max_steps=max_steps,
tol=tol,
eps=eps,
full_return=full_return,
verbose=verbose,
)
if self.is_sparse:
self.adjacency_crema = (self.x, self.y)
self.adjacency_given = False
else:
pmatrix = self.fun_pmatrix(np.concatenate([self.x, self.y]))
raw_ind, col_ind = np.nonzero(pmatrix)
raw_ind = raw_ind.astype(np.int64)
col_ind = col_ind.astype(np.int64)
weigths_value = pmatrix[raw_ind, col_ind]
self.adjacency_crema = (raw_ind, col_ind, weigths_value)
self.is_sparse = False
self.adjacency_given = False
elif isinstance(adjacency, list):
adjacency = np.array(adjacency).astype(np.float64)
raw_ind, col_ind = np.nonzero(adjacency)
raw_ind = raw_ind.astype(np.int64)
col_ind = col_ind.astype(np.int64)
weigths_value = adjacency[raw_ind, col_ind]
self.adjacency_crema = (raw_ind, col_ind, weigths_value)
self.is_sparse = False
self.adjacency_given = True
elif isinstance(adjacency, np.ndarray):
adjacency = adjacency.astype(np.float64)
raw_ind, col_ind = np.nonzero(adjacency)
raw_ind = raw_ind.astype(np.int64)
col_ind = col_ind.astype(np.int64)
weigths_value = adjacency[raw_ind, col_ind]
self.adjacency_crema = (raw_ind, col_ind, weigths_value)
self.is_sparse = False
self.adjacency_given = True
elif scipy.sparse.isspmatrix(adjacency):
raw_ind, col_ind = adjacency.nonzero()
raw_ind = raw_ind.astype(np.int64)
col_ind = col_ind.astype(np.int64)
weigths_value = (adjacency[raw_ind, col_ind].A1).astype(np.float64)
self.adjacency_crema = (raw_ind, col_ind, weigths_value)
self.is_sparse = False
self.adjacency_given = True
if self.is_sparse:
self.last_model = "crema-sparse"
else:
self.last_model = model
self.regularise = regularise
self.full_return = full_return
self.initial_guess = initial_guess
self._initialize_problem(self.last_model, method)
x0 = self.x0
sol = sof.solver(
x0,
fun=self.fun,
fun_jac=self.fun_jac,
step_fun=self.step_fun,
linsearch_fun=self.fun_linsearch,
hessian_regulariser=self.hessian_regulariser,
tol=tol,
eps=eps,
max_steps=max_steps,
method=method,
verbose=verbose,
regularise=regularise,
regularise_eps=regularise_eps,
linsearch=linsearch,
full_return=full_return,
)
self._set_solved_problem_crema_directed(sol)
def _set_solved_problem_crema_directed(self, solution):
if self.full_return:
self.b_out = solution[0][: self.n_nodes]
self.b_in = solution[0][self.n_nodes:]
self.comput_time_crema = solution[1]
self.n_steps_crema = solution[2]
self.norm_seq_crema = solution[3]
self.diff_seq_crema = solution[4]
self.alfa_seq_crema = solution[5]
else:
self.b_out = solution[: self.n_nodes]
self.b_in = solution[self.n_nodes:]
[docs] def ensemble_sampler(self, n, cpu_n=1, output_dir="sample/", seed=42):
"""The function sample a given number of graphs in the ensemble
generated from the last model solved. Each grpah is an edgelist
written in the output directory as `.txt` file.
The function is parallelised and can run on multiple cpus.
:param n: Number of graphs to sample.
:type n: int
:param cpu_n: Number of cpus to use, defaults to 1.
:type cpu_n: int, optional
:param output_dir: Name of the output directory, defaults to "sample/".
:type output_dir: str, optional
:param seed: Random seed, defaults to 42.
:type seed: int, optional
:raises ValueError: [description]
"""
# al momento funziona solo sull'ultimo problema risolto
# unico input possibile e' la cartella dove salvare i samples
# ed il numero di samples
# create the output directory
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# compute the sample
# seed specification
np.random.seed(seed)
s = [np.random.randint(0, 1000000) for i in range(n)]
if self.last_model in ["dcm", "dcm_exp"]:
iter_files = iter(
output_dir + "{}.txt".format(i) for i in range(n))
i = 0
for item in iter_files:
eg.ensemble_sampler_dcm_graph(
outfile_name=item,
x=self.x,
y=self.y,
cpu_n=cpu_n,
seed=s[i])
i += 1
elif self.last_model in ["decm", "decm_exp"]:
iter_files = iter(
output_dir + "{}.txt".format(i) for i in range(n))
i = 0
for item in iter_files:
eg.ensemble_sampler_decm_graph(
outfile_name=item,
a_out=self.x,
a_in=self.y,
b_out=self.b_out,
b_in=self.b_in,
cpu_n=cpu_n,
seed=s[i])
i += 1
elif self.last_model in ["crema"]:
if self.adjacency_given:
# deterministic adj matrix
iter_files = iter(
output_dir + "{}.txt".format(i) for i in range(n))
i = 0
for item in iter_files:
eg.ensemble_sampler_crema_decm_det_graph(
outfile_name=item,
beta=(self.b_out, self.b_in),
adj=self.adjacency_crema,
cpu_n=cpu_n,
seed=s[i])
i += 1
else:
# probabilistic adj matrix
iter_files = iter(
output_dir + "{}.txt".format(i) for i in range(n))
i = 0
for item in iter_files:
eg.ensemble_sampler_crema_decm_prob_graph(
outfile_name=item,
beta=(self.b_out, self.b_in),
adj=self.adjacency_crema,
cpu_n=cpu_n,
seed=s[i])
i += 1
elif self.last_model in ["crema-sparse"]:
if not self.adjacency_given:
# probabilistic adj matrix
iter_files = iter(
output_dir + "{}.txt".format(i) for i in range(n))
i = 0
for item in iter_files:
eg.ensemble_sampler_crema_sparse_decm_prob_graph(
outfile_name=item,
beta=(self.b_out, self.b_in),
adj=self.adjacency_crema,
cpu_n=cpu_n,
seed=s[i])
i += 1
else:
raise ValueError("insert a model")