Source code for openalea.caribu.CaribuScene

# -*- python -*-
#
#       Copyright 2015 INRIA - CIRAD - INRA
#
#       Distributed under the Cecill-C License.
#       See accompanying file LICENSE.txt or copy at
#           http://www.cecill.info/licences/Licence_CeCILL-C_V1-en.html
#
#       WebSite : https://github.com/openalea-incubator/caribu
#
# ==============================================================================
""" This module defines CaribuScene and CaribuSceneError classes."""

import os
import numpy
from itertools import groupby, chain
from math import sqrt
from numbers import Number
from pathlib import Path

from openalea.plantgl.all import Scene as pglScene, Viewer

from openalea.caribu.file_adaptor import read_can, read_light, read_pattern, \
    read_opt, build_materials
from openalea.caribu.plantgl_adaptor import scene_to_cscene, mtg_to_cscene
from openalea.caribu.caribu import raycasting, radiosity, mixed_radiosity, \
    x_raycasting, x_radiosity, x_mixed_radiosity, opt_string_and_labels, \
    triangles_string, pattern_string, write_scene
from openalea.caribu.display import jet_colors, generate_scene, nan_to_zero
from openalea.caribu.caribu_shell import vperiodise, Path
from functools import reduce
from openalea.caribu.caributriangleset import AbstractCaribuTriangleSet, CaribuTriangleSet 

import tempfile

def _agregate(values, indices, fun=sum):
    """ performs aggregation of outputs along indices """
    ag = {}
    for key, group in groupby(sorted(zip(indices, values), key=lambda x: x[0]),
                              lambda x: x[0]):
        vals = [elt[1] for elt in group]
        try:
            ag[key] = fun(vals)
        except TypeError:
            ag[key] = vals[0]
    return ag


def _convert(output, conv_unit):
    """ convert caribu output to meter/meter_square
    """
    # To DO ? filtering eabs_max / zero + occultations ?
    if conv_unit == 1:
        return output
    else:
        meter_square = conv_unit ** 2
        output['area'] = [area * meter_square for area in output['area']]
        for k in ['Eabs', 'Ei', 'Ei_sup', 'Ei_inf']:
            output[k] = [nrj / meter_square for nrj in output[k]]
        if 'sensors' in output:
            output['sensors']['area'] = [area * meter_square for area in output['sensors']['area']]
            for k in ['Ei', 'Ei0']:
                output['sensors'][k] = [nrj / meter_square for nrj in output['sensors'][k]]
        return output


def _wsum(nrj_area):
    nrj, area = list(zip(*nrj_area))
    area_tot = sum(area)
    if area_tot == 0:
        return 0
    else:
        return sum([e * a if a > 0 else 0 for e, a in nrj_area]) / area_tot

[docs] def is_mtg_like(obj): return all( hasattr(obj, attr) for attr in ("_scale", "_complex", "_components", "_parent", "_children", "_root") )
[docs] def domain_mesh(domain, z=0., subdiv=1): """ Create a triangle mesh covering a domain at height z Args: domain: (tuple of float) : a (xmin, ymin, xmax, ymax) tuple defining the extend of a square domain z: the altitude of the mesh subdiv: the number of subdivision of the mesh (not functional) Returns: a list of triangles. A triangle is a list of 3-tuples points coordinates """ xmin, ymin, xmax, ymax = domain if subdiv > 1: raise UserWarning('subdivision of mesh not yet implemented') # TODO: copy code from cpp/canopy_io.cpp, function parse_can a = (xmin, ymin, z) b = (xmax, ymin, z) c = (xmin, ymax, z) d = (xmax, ymax, z) return [(a, b, c), (b, d, c)]
[docs] class CaribuScene: """A class interface to Caribu algorithms""" default_material = (0.06, 0.07) default_soil_reflectance = 0.15 default_light = (1, (0, 0, -1)) default_band = 'default_band' units = {'mm': 0.001, 'cm': 0.01, 'dm': 0.1, 'm': 1, 'dam': 10, 'hm': 100, 'km': 1000} def __init__(self, scene=None, light=None, pattern=None, opt=None, soil_reflectance=None, soil_mesh=None, z_soil=None, scene_unit='m', debug = False, filecache = True): """ Initialise a CaribuScene Args: scene (dict): a {primitive_id: [triangles,]} dict.A triangle is a list of 3-tuples points coordinates Alternatively, scene can be a *.can file or a mtg with 'geometry' property or a plantGL scene. For the later case, shape.id are used as primitive_id. light (list): a list of (Energy, (vx, vy, vz)) tuples defining light sources Alternatively, a *.light file If None (default), a unit energy vertical light is used. Energy unit should be given per square-meter (m-2) pattern (tuple): 2D Coordinates of the domain bounding the scene for its replication. (xmin, ymin, xmax, ymax) scene is not bounded along z axis. Alternatively a *.8 file. if None (default), scene is not repeated opt (dict): a {band_name: {primitive_id: material}} dict of dict or a {band_name: material} dict of tuples. In the second form the material is used for all primitives. A material is a 1-, 2- or 4-tuple depending on its optical behavior. A 1-tuple encode an opaque material characterised by its reflectance A 2-tuple encode a symmetric translucent material defined by a reflectance and a transmittance A 4-tuple encode an asymmetric translucent material defined the reflectance and transmittance of the upper and lower side respectively Alternatively, a list of band_name.opt files (require scene to be given as a *.can file) If None (default), all primitive are associated to the default material of the class. soil_reflectance (dict): a {band_name: reflectance} dict. If None (default) class default soil reflectance is used for all bands If *.opt files are provided, the values in opt files are used in priority soil_mesh (int): a flag triggering for the creation of a soil mesh in the scene during computations If None (default) or -1, no soil is added If an int (n), a soil is added to the scene, with n subdivisions z_soil (float): the altitude of the soil. If None (default), the soil is placed at the bottom of the scene bounding box scene_unit (str): the unit of length used for scene coordinate and for pattern (should be one of class.units default) By default, scene_unit is considered to be 'm' (meter). Returns: A CaribuScene instance Note: File format specifications (*.can, *.light, *.8, *.opt) can be found in data/CanestraDoc.pdf """ self.lightfile = None self.debug = debug if scene_unit not in self.units: raise ValueError('unrecognised scene unit: ' + scene_unit) self.conv_unit = self.units[scene_unit] self.scene = None if scene is not None: if isinstance(scene, dict): elt = scene[list(scene.keys())[0]] try: assert isinstance(elt, list) assert isinstance(elt[0], list) assert isinstance(elt[0][0], tuple) except: raise ValueError('Unrecognised scene format') self.scene = CaribuTriangleSet(scene) elif isinstance(scene, str): self.scene = CaribuTriangleSet(read_can(scene)) elif is_mtg_like(scene): self.scene = CaribuTriangleSet(mtg_to_cscene(scene)) elif isinstance(scene, pglScene): self.scene = CaribuTriangleSet(scene_to_cscene(scene)) elif isinstance(scene, AbstractCaribuTriangleSet): self.scene = scene else: raise ValueError('Unrecognised scene format') self.light = [self.default_light] if light is not None: self.setLight(light) self.pattern = None if pattern is not None: if isinstance(pattern, tuple): if len(pattern) == 2: pattern = sum(pattern, ()) if len(pattern) != 4: raise ValueError('Unrecognised pattern format') self.pattern = pattern elif isinstance(pattern, str): self.pattern = read_pattern(pattern) else: raise ValueError('Unrecognised pattern format') self.material = None if opt is None: if soil_reflectance is None: self.soil_reflectance = { self.default_band: self.default_soil_reflectance} bands = [self.default_band] else: self.soil_reflectance = soil_reflectance bands = list(soil_reflectance.keys()) if self.scene is not None: self.material = {} for band in bands: self.material[band] = {pid: self.default_material for pid in self.scene.keys()} else: if isinstance(opt, list): if not isinstance(opt[0], str): raise ValueError('Unrecognised opt format') if not isinstance(scene, str): raise ValueError( 'un-compatible inputs types: opt file and scene not a can file') if self.scene is not None: self.material = {} self.soil_reflectance = {} for path in opt: band = os.path.basename(path).split('.')[0] n, ro_soil, po = read_opt(path) self.material[band] = build_materials(list(self.scene.keys()), po, ro_soil) self.soil_reflectance[band] = ro_soil elif isinstance(opt, dict): elt = opt[list(opt.keys())[0]] if not isinstance(elt, dict): if isinstance(elt, tuple): self.material = {} if self.scene is not None: for band in opt: self.material[band] = {pid: opt[band] for pid in self.scene.keys()} else: raise ValueError('Unrecognised opt format') else: self.material = opt if soil_reflectance is None: self.soil_reflectance = {band: self.default_soil_reflectance for band in self.material} else: if isinstance(soil_reflectance, dict): if not len(soil_reflectance) == len(opt): raise ValueError( 'the number of bands for optical properties and soil reflectance should match') self.soil_reflectance = soil_reflectance else: raise ValueError('Unrecognised soil_reflectance format') else: raise ValueError('Unrecognised opt format') self.soil = None self.soil_label = 'soil' if self.scene is not None: ids = self.scene.allids() if ids is not None: if isinstance(ids[0], Number): self.soil_label = -1 if soil_mesh is not None: if soil_mesh != -1: if self.pattern is None: raise ValueError( 'Adding a soil needs the scene domain to be defined') if z_soil is None: if self.scene is None: z_soil = 0 else: z_soil = self.scene.getZmin() self.soil = domain_mesh(self.pattern, z_soil, soil_mesh) self.tempdir = Path(' ') # allow testing existence in __del__ if filecache: self.tempdir = tempfile.mkdtemp() if not debug else './caribuscene_'+str(id(self)) self.tempdir = Path(self.tempdir) self.canfile = None self.optfile = None def __del__(self): if self.tempdir.exists(): import shutil shutil.rmtree(self.tempdir)
[docs] def triangle_areas(self, convert=True): """ compute mean area of elementary triangles in the scene If convert is true, area is expressed in meter (scene unit otherwise)""" areas = self.scene.triangle_areas() if convert: areas *= self.conv_unit**2 return areas
[docs] def setLight(self, light): if isinstance(light, list): elt = light[0] try: assert isinstance(elt, tuple) assert isinstance(elt[1], tuple) except: raise ValueError('Unrecognised light format') self.light = light self.lightfile = None elif isinstance(light, str): self.light = read_light(light) else: raise ValueError('Unrecognised light format')
[docs] def bbox(self): """ Scene bounding box opposite corner points Returns: two tuples: (xmin, ymin, zmin), (xmax, ymax, zmax) """ return self.scene.getBoundingBox()
[docs] def auto_screen(self, screen_resolution): pix = screen_resolution * self.conv_unit (xmin, ymin, zmin), (xmax, ymax, zmax) = self.bbox() ldiag = numpy.sqrt( (xmax - xmin) ** 2 + (ymax - ymin) ** 2 + (zmax - zmin) ** 2) return max(2, int(ldiag / pix))
[docs] def plot(self, a_property=None, minval=None, maxval=None, gamma=None, display=True): """ Args: a_property: {dict of float or dict of list of float} : a dict of values, each key being a scene primitive index. minval: (float) minimal value at lower bound of color range if None (default), minimal value of property is used maxval: (float) maximal value at upper bound of color range if None (default), maximal value of property is used gamma (float): exponent of the normalised values if None (default), na gamma transform is applied display: (bool) : should the scene be displayed ? (default True) Returns: A plantGL scene """ if a_property is None: color_property = None values = None else: values = list(a_property.values()) if isinstance(values[0], list): values = list(chain.from_iterable(values)) values = nan_to_zero(values) if minval is None: minval = min(values) if maxval is None: maxval = max(values) if gamma is None: gamma = 1 norm = 1 if minval != maxval: norm = maxval - minval values = [((x - minval) / float(norm))**gamma for x in values] colors = jet_colors(values, 0, 1) color_property = {} for k, v in a_property.items(): if isinstance(v, list): color_property[k] = [] for i in range(len(v)): color_property[k].append(colors.pop(0)) else: color_property[k] = [colors.pop(0)] * self.scene.getNumberOfTriangles(k) scene = self.scene.generate_scene(color_property) if display: Viewer.display(scene) return scene, values
[docs] def getIncidentEnergy(self): """ Compute energy of emission of light sources. Qi is the total horizontal irradiance emitted by sources (m-2) Qem is the sum of the normal-to-the-sources irradiance emitted by sources (m-2) Einc is the total incident energy received on the domain """ Qi, Qem, Einc = None, None, None def _costheta(vect): vx, vy, vz = vect norme = sqrt(vx ** 2 + vy ** 2 + vz ** 2) return abs(vz / norme) if self.light is not None: nrj, direction = list(zip(*self.light)) Qi = sum(nrj) costheta = list(map(_costheta, direction)) Qem = sum([x[0] / x[1] for x in zip(nrj, costheta)]) if self.pattern is not None: xmin, ymin, xmax, ymax = self.pattern d_area = abs((xmax - xmin) * (ymax - ymin)) * self.conv_unit**2 Einc = Qi * d_area return Qi, Qem, Einc
[docs] def getSoilEnergy(self): """ Compute energy received on soil. """ Qi, Einc = None, None if self.soil is None: raise ValueError('A soil should be added to allow SoilEnargy Estimation') res = self.soil_aggregated if res is None: raise ValueError('Caribu should have been called to allow SoilEnargy Estimation') # hack, TODO : take care of bands Qi = list(res.values())[0]['Ei'] if self.pattern is not None: xmin, ymin, xmax, ymax = self.pattern d_area = abs((xmax - xmin) * (ymax - ymin)) * self.conv_unit**2 Einc = Qi * d_area return Qi, Einc
[docs] def as_primitive(self): """ Transform scene and materials into simpler python objects Returns: """ triangles, groups, materials, bands, albedo = None, None, None, None, None if self.scene is not None: triangles = self.scene.allvalues(copied=True) #reduce(lambda x, y: x + y, self.scene.values()) groups = self.scene.allids() if self.soil is not None: triangles += self.soil groups = groups + [self.soil_label] * len(self.soil) bands = list(self.material.keys()) if len(bands) == 1: materials = self.scene.repeat_for_triangles([ self.material[bands[0]][pid] for pid in self.scene.keys()]) albedo = self.soil_reflectance[bands[0]] if self.soil is not None: materials = materials + [(albedo,)] * len(self.soil) else: materials = {} for band in bands: mat = self.scene.repeat_for_triangles([self.material[band][pid] for pid in self.scene.keys()]) materials[band] = mat if self.soil is not None: materials = materials + [(self.soil_reflectance[ band],)] * len(self.soil) albedo = self.soil_reflectance return triangles, groups, materials, bands, albedo
[docs] def run(self, direct=True, infinite=False, d_sphere=0.5, layers=10, height=None, screen_size=1536, screen_resolution=None, sensors=None, split_face=False, simplify=False): """ Compute illumination using the appropriate caribu algorithm Args: direct: (bool) Whether only first order interception is to be computed Default is True (no rediffusions) infinite: (bool) Whether the scene should be considered as infinite Default is False (non-infinite canopy) d_sphere: (float) the diameter (m) of the sphere defining the close neighbourhood of mixed radiosity algorithm if d_sphere = 0, direct + pure layer algorithm is used layers: (int) the number of horizontal layers for estimating far contributions height: (float) the height of the canopy (m). if None (default), the maximal height of the scene is used. screen_size: (int) size of the screen_size x screen_size square projection screen (pixels) screen_resolution: (float) real world size (meter) of a pixel of the projection screen. If None(default), screen_size is used. sensors: (dict of list of list of tuples) a {sensor_id: [triangle,...]} dict defining the virtual sensors each triangle is a list of tuple defining the coordinates of its vertices split_face: (bool) Whether results of incidence on individual faces of triangle should be output. Default is False simplify: (bool) Whether results per band should be simplified to a {result_name: property} dict in the case of a monochromatic simulation Returns: - raw (dict of dict) a {band_name: {result_name: property}} dict of dict. Except for result_name='sensors', each property is a {primitive_id: [values,]} dict containing results for individual triangles of the primitive - aggregated (dict of dict) : a {band_name: {result_name: property}} Except for result_name='sensors', each property is a {primitive_id: value} dict containing aggregated results for each primitive result_name are : - area (float): the individual areas (m2) - Eabs (float): the surfacic density of energy absorbed (m-2) - Ei (float): the surfacic density of energy incoming (m-2) additionally, if split_face is True: - Ei_inf (float): the surfacic density of energy incoming on the inferior face (m-2) - Ei_sup (float): the surfacic density of energy incoming on the superior face (m-2) - sensors (dict of dict): area, surfacic density of incoming direct energy and surfacic density of incoming total energy of sensors grouped by id, if any """ raw, aggregated = {}, {} self.soil_raw, self.soil_aggregated = {}, {} results = ['Eabs', 'Ei', 'area'] if split_face: results.extend(['Ei_inf', 'Ei_sup']) # convert lights to scene_unit lights = self.light if self.conv_unit != 1: lights = [(e * self.conv_unit ** 2, vect) for e, vect in self.light] if self.scene is not None: if self.debug: print ('Prepare scene', len(self.light)) triangles = self.scene.allvalues(copied=True) groups = self.scene.allids() if self.debug: print ('done') if self.soil is not None: triangles += self.soil groups = groups + [self.soil_label] * len(self.soil) bands = list(self.material.keys()) if len(bands) == 1: if not hasattr(self,'materialvalues') : materials = self.scene.repeat_for_triangles([ self.material[bands[0]][pid] for pid in self.scene.keys()]) albedo = self.soil_reflectance[bands[0]] if self.soil is not None: materials = materials + [(albedo,)] * len(self.soil) self.materialvalues = materials if self.tempdir != '': self.canfile = self.tempdir / 'cscene.can' self.optfile = self.tempdir / 'band0.opt' write_scene(triangles, materials, canfile = self.canfile, optfile = self.optfile) else: # self.materialvalues is a cache for the computation of the material list materials = self.materialvalues albedo = self.soil_reflectance[bands[0]] algos = {'raycasting': raycasting, 'radiosity': radiosity, 'mixed_radiosity': mixed_radiosity} else: materials = {} if not hasattr(self,'materialvalues') : for band in bands: mat = self.scene.repeat_for_triangles([self.material[band][pid] for pid in self.scene.keys()]) materials[band] = mat if self.soil is not None: materials = materials + [(self.soil_reflectance[ band],)] * len(self.soil) albedo = self.soil_reflectance else: materials = self.materialvalues albedo = self.soil_reflectance algos = {'raycasting': x_raycasting, 'radiosity': x_radiosity, 'mixed_radiosity': x_mixed_radiosity} if not direct and infinite: # mixed radiosity will be used if d_sphere < 0: raise ValueError( 'calling radiosity should be done using direct=False and infinite=False') d_sphere /= self.conv_unit if height is None: height = self.scene.getZmax() else: height /= self.conv_unit if infinite and self.pattern is None: raise ValueError( 'infinite canopy illumination needs a pattern to be defined') if screen_resolution is not None: screen_size = self.auto_screen(screen_resolution) print('adjusted projection screen size: ' + str(screen_size)) if sensors is not None: sensors_id = reduce(lambda x, y: x + y, [[k] * len(v) for k, v in sensors.items()], []) sensors = reduce(lambda x, y: x + y, list(sensors.values()), []) if not direct and infinite: # mixed radiosity out = algos['mixed_radiosity'](triangles, materials, lights=lights, domain=self.pattern, soil_reflectance=albedo, diameter=d_sphere, layers=layers, height=height, screen_size=screen_size, sensors=sensors, debug = self.debug) elif not direct: # pure radiosity out = algos['radiosity'](triangles, materials, lights=lights, screen_size=screen_size, sensors=sensors, debug = self.debug) else: # ray_casting if infinite: out = algos['raycasting'](triangles, materials, lights=lights, domain=self.pattern, screen_size=screen_size, sensors=sensors, debug = self.debug) else: out = algos['raycasting'](triangles, materials, lights=lights, domain=None, screen_size=screen_size, sensors=sensors, debug = self.debug, canfile = self.canfile, optfile = self.optfile) if len(bands) == 1: out = {bands[0]: out} for band in bands: output = _convert(out[band], self.conv_unit) raw[band] = {} aggregated[band] = {} if 'sensors' in output: sensors = output.pop('sensors') raw[band]['sensors'] = {} aggregated[band]['sensors'] = {} for k in ('Ei', 'Ei0', 'area'): raw[band]['sensors'][k] = _agregate(sensors[k], sensors_id, list) if k == 'area': aggregated[band]['sensors'][k] = _agregate(sensors[k], sensors_id, sum) else: aggregated[band]['sensors'][k] = _agregate( zip(sensors[k], sensors['area']), sensors_id, _wsum) for k in results: raw[band][k] = _agregate(output[k], groups, list) if k == 'area': aggregated[band][k] = _agregate(output[k], groups, sum) else: aggregated[band][k] = _agregate( zip(output[k], output['area']), groups, _wsum) if self.soil is not None: self.soil_raw[band] = {k: raw[band][k].pop(self.soil_label) for k in results} self.soil_aggregated[band] = { k: aggregated[band][k].pop(self.soil_label) for k in results} if simplify and len(bands) == 1: raw = raw[bands[0]] aggregated = aggregated[bands[0]] return raw, aggregated
[docs] def runPeriodise(self): """ Call periodise and modify position of triangle in the scene to fit inside pattern""" triangles, groups, materials, bands, albedo = self.as_primitive() o_string, labels = opt_string_and_labels(materials) can_string = triangles_string(triangles, labels) pat_string = pattern_string(self.pattern) newscene = vperiodise(can_string, pat_string) infile = newscene.split('\n') cscene = {} for line in infile: line = line.strip() if not line: continue if line.startswith('#'): continue fields = line.split() label = fields[2] if label not in cscene: cscene[label] = [] coords = list(map(float, fields[-9:])) cscene[label].append(list(map(tuple, [coords[:3], coords[3:6], coords[6:]]))) # should be changed to take into account the new structure self.scene = cscene return self