Source code for ghx.base

import os
import timeit
from collections import deque

import numpy as np
import simplejson as json

from ghx.borehole import BoreholeClass
from ghx.my_print import PrintClass


[docs]class BaseGHXClass: """ Base class for GHXArray """ def __init__(self, json_data, loads_path, output_path, print_output=True): """ Class constructor """ self.timer_start = timeit.default_timer() errors_found = False self.output_path = output_path if not os.path.exists(self.output_path): os.makedirs(self.output_path) with open(os.path.join(self.output_path, 'in.json'), 'w') as outfile: json.dump(json_data, outfile, indent=4, sort_keys=True) # load data into data structs PrintClass.my_print("....Loading GHX data") try: self.name = json_data['Name'] except: # pragma: no cover PrintClass.my_print("....'Name' key not found", 'warn') errors_found = True try: self.sim_years = json_data['Simulation Configuration']['Simulation Years'] except: # pragma: no cover PrintClass.my_print("....'Simulation Years' key not found", 'warn') errors_found = True try: self.aggregation_type = json_data['Simulation Configuration']['Aggregation Type'] except: # pragma: no cover PrintClass.my_print("....'Aggregation Type' key not found", 'warn') errors_found = True try: self.g_func_lntts = [] self.g_func_val = [] for pair in json_data['G-func Pairs']: self.g_func_lntts.append(pair[0]) self.g_func_val.append(pair[1]) self.g_func_present = True except: # pragma: no cover PrintClass.my_print("....'G-func Pairs' key not found", 'warn') self.g_func_present = False self.total_bh_length = 0 self.ghx_list = [] ghx_dict_list = [] for json_data_bh in json_data['GHXs']: ghx_dict_list.append(json_data_bh) this_bh = BoreholeClass(json_data_bh, print_output) self.total_bh_length += this_bh.depth self.ghx_list.append(this_bh) self.borehole = BoreholeClass( self.merge_dicts(ghx_dict_list), print_output) try: PrintClass.my_print("....Importing flow rates and loads") load_pairs = np.genfromtxt( loads_path, delimiter=',', skip_header=1) self.sim_hours = [] self.sim_loads = [] self.total_flow_rate = [] for pair in load_pairs: self.sim_hours.append(pair[0]) self.sim_loads.append(pair[1]) self.total_flow_rate.append(pair[2]) except: # pragma: no cover PrintClass.fatal_error(message="Error importing loads") if errors_found: # pragma: no cover PrintClass.fatal_error(message="Error initializing BaseGHXClass") self.ts = self.calc_ts() self.temp_bh = deque() self.temp_mft = deque() self.agg_load_objects = [] self.agg_loads_flag = True
[docs] def merge_dicts(self, list_of_dicts): """ Merges two-level dictionaries into a single identical dictionary. For non-int/floats arguments, the first item in the list will remain in place. For int/float arguments, the mean value of the parent dictionaries will calculated. """ z = {} for this_dict in list_of_dicts: for key in this_dict.keys(): if key not in z: z[key] = this_dict[key] continue try: for sub_key in this_dict[key].keys(): if isinstance(this_dict[key][sub_key], (float, int)): z[key][sub_key] += this_dict[key][sub_key] else: z[key][sub_key] = this_dict[key][sub_key] except: if isinstance(this_dict[key], (float, int)): z[key] += this_dict[key] num_dicts = len(list_of_dicts) for key in z.keys(): try: for sub_key in z[key].keys(): if isinstance(z[key][sub_key], (float, int)): z[key][sub_key] /= num_dicts except: if isinstance(z[key], int) or isinstance(z[key], float): z[key] /= num_dicts return z
[docs] def calc_ts(self): """ Calculates non-dimensional time. """ try: ts = self.borehole.depth ** 2 / (9 * self.borehole.soil.thermal_diffusivity) return ts except: # pragma: no cover PrintClass.fatal_error( message="Error calculating simulation time scale \"ts\"")
[docs] def calc_g_func(self): """ Calculate g-functions for given ground heat exchangers. """ try: PrintClass.my_print("Calculating g-functions") self.g_func_present = True PrintClass.my_print("....Success") except: # pragma: no cover PrintClass.fatal_error(message="Error calculating g-functions")
[docs] def g_func(self, ln_t_ts): """ Interpolates to the correct g-function value """ num = len(self.g_func_lntts) lower_index = 0 upper_index = num - 1 if ln_t_ts < self.g_func_lntts[lower_index]: # if value is below range, extrapolate down return ((ln_t_ts - self.g_func_lntts[lower_index]) / ( self.g_func_lntts[lower_index + 1] - self.g_func_lntts[lower_index])) * ( self.g_func_val[lower_index + 1] - self.g_func_val[lower_index]) + self.g_func_val[lower_index] elif ln_t_ts > self.g_func_lntts[upper_index]: # if value is above range, extrapolate up return ((ln_t_ts - self.g_func_lntts[upper_index]) / ( self.g_func_lntts[upper_index - 1] - self.g_func_lntts[upper_index])) * ( self.g_func_val[upper_index - 1] - self.g_func_val[upper_index]) + self.g_func_val[upper_index] else: # value is in range return np.interp(ln_t_ts, self.g_func_lntts, self.g_func_val)
[docs] def generate_output_reports(self): # pragma: no cover """ Generates output results """ try: PrintClass.my_print("Writing output results") cwd = os.getcwd() path_to_output_dir = os.path.join(cwd, self.output_path) if not os.path.exists(path_to_output_dir): os.makedirs(path_to_output_dir) # open files out_file = open(os.path.join(path_to_output_dir, "GHX.csv"), 'w') # write headers out_file.write("Hour, BH Temp [C], MFT [C]\n") for i in range(len(self.temp_bh)): out_file.write("%d, %0.4f, %0.4f\n" % (i + 1, self.temp_bh[i], self.temp_mft[i])) # close file out_file.close() PrintClass.my_print("....Success") except: # pragma: no cover PrintClass.fatal_error(message="Error writing output results")