import timeit
from collections import deque
import numpy as np
from ghx.aggregated_loads import AggregatedLoadFixed
from ghx.base import BaseGHXClass
from ghx.constants import ConstantClass
from ghx.my_print import PrintClass
[docs]class GHXArrayFixedAggBlocks(BaseGHXClass):
"""
GHXArrayFixedAggBlocks is the class object that holds the information that defines a ground heat exchanger array.
This could be a single borehole, or a field with an arbitrary number of boreholes at arbitrary locations.
"""
def __init__(self, json_data, loads_path, output_path, print_output=True):
"""
Constructor for the class.
"""
PrintClass(print_output, output_path)
# init base class
BaseGHXClass.__init__(self, json_data, loads_path,
output_path, print_output)
errors_found = False
try:
self.min_hourly_history = json_data['Simulation Configuration']['Min Hourly History']
except: # pragma: no cover
PrintClass.my_print(
"....'Min Hourly History' key not found", 'warn')
errors_found = True
try:
self.agg_load_intervals = json_data['Simulation Configuration']['Intervals']
except: # pragma: no cover
PrintClass.my_print("....'Intervals' key not found", 'warn')
errors_found = True
if not errors_found:
# success
PrintClass.my_print("Simulation successfully initialized")
else: # pragma: no cover
PrintClass.fatal_error(message="Error initializing GHXArrayFixedAggBlocks")
# class data
# set load aggregation intervals
self.set_load_aggregation()
# set first aggregated load, which is zero. Need this for later
self.agg_load_objects.append(AggregatedLoadFixed([0], 0, 1, True))
self.g_func_hourly = deque()
self.hourly_loads = deque()
[docs] def set_load_aggregation(self):
"""
Sets the load aggregation intervals based on the type specified by the user.
Intervals must be integer multiples.
"""
if self.aggregation_type == "Fixed":
pass
elif self.aggregation_type == "None":
self.agg_loads_flag = False
self.agg_load_intervals = [
ConstantClass.hours_in_year * self.sim_years]
self.min_hourly_history = 0
[docs] def aggregate_load(self):
"""
Creates aggregated load object
"""
if len(self.agg_load_intervals) > 1:
self.collapse_aggregate_loads()
prev_sim_hour = self.agg_load_objects[-1].last_sim_hour
agg_loads = []
for i in range(self.agg_load_intervals[0]):
agg_loads.append(self.hourly_loads[i])
self.agg_load_objects.append(AggregatedLoadFixed(
agg_loads, prev_sim_hour, len(agg_loads)))
[docs] def collapse_aggregate_loads(self):
"""
Collapses aggregated loads
"""
agg_load_objects_update = []
i = 0
while i < len(self.agg_load_objects):
if i == 0: # keep '0' time object
agg_load_objects_update.append(self.agg_load_objects[i])
i += 1
continue
# already max agg interval
elif len(self.agg_load_objects[i].loads) == self.agg_load_intervals[-1]:
agg_load_objects_update.append(self.agg_load_objects[i])
i += 1
continue
else:
k = len(self.agg_load_intervals) - 1
while k >= 0:
temp_objs = []
agg_int = self.agg_load_intervals[k]
for j in range(i, len(self.agg_load_objects)):
if len(self.agg_load_objects[j].loads) == agg_int:
temp_objs.append(self.agg_load_objects[j])
else:
continue
num_objs = len(temp_objs)
i += num_objs
if num_objs > 0:
if num_objs * agg_int >= self.agg_load_intervals[k + 1]:
agg_load_objects_update.append(
self.merge_agg_load_objs(temp_objs))
else:
for l in range(len(temp_objs)):
agg_load_objects_update.append(temp_objs[l])
k -= 1
self.agg_load_objects = agg_load_objects_update
[docs] def merge_agg_load_objs(self, obj_list):
"""
Merges AggregatedLoad objects into a single AggregatedLoad object
:return: merged AggregatedLoad object
"""
loads = []
min_hour = ConstantClass.hours_in_year * self.sim_years
max_hour = 0
for this_obj in obj_list:
for i in range(len(this_obj.loads)):
loads.append(this_obj.loads[i])
if min_hour > this_obj.first_sim_hour:
min_hour = this_obj.first_sim_hour
if max_hour < this_obj.last_sim_hour:
max_hour = this_obj.last_sim_hour
return AggregatedLoadFixed(loads, min_hour, len(loads))
[docs] def simulate(self):
"""
More docs to come...
"""
PrintClass.my_print("Beginning simulation")
# calculate g-functions if not present
if not self.g_func_present:
PrintClass.my_print("G-functions not present", 'warn')
self.calc_g_func()
# pre-load hourly g-functions
for hour in range(self.agg_load_intervals[0] + self.min_hourly_history):
ln_t_ts = np.log((hour + 1) * 3600 / self.ts)
self.g_func_hourly.append(self.g_func(ln_t_ts))
# set aggregate load container max length
len_hourly_loads = self.min_hourly_history + self.agg_load_intervals[0]
self.hourly_loads = deque(
[0] * len_hourly_loads, maxlen=len_hourly_loads)
agg_hour = 0
sim_hour = 0
for year in range(self.sim_years):
for month in range(ConstantClass.months_in_year):
PrintClass.my_print("....Year/Month: %d/%d" %
(year + 1, month + 1))
for hour in range(ConstantClass.hours_in_month):
agg_hour += 1
sim_hour += 1
# get raw hourly load and append to hourly list
curr_index = month * ConstantClass.hours_in_month + hour
self.hourly_loads.append(self.sim_loads[curr_index])
curr_flow_rate = self.total_flow_rate[curr_index]
# update borehole flow rate
self.borehole.pipe.fluid.update_fluid_state(
new_flow_rate=curr_flow_rate)
# calculate borehole resistance
self.borehole.calc_bh_resistance()
# calculate borehole temp
# hourly effects
temp_bh_hourly = []
temp_mft_hourly = []
start_hourly = len(self.hourly_loads) - 1
end_hourly = start_hourly - agg_hour
g_func_index = -1
for i in range(start_hourly, end_hourly, -1):
g_func_index += 1
q_curr = self.hourly_loads[i]
q_prev = self.hourly_loads[i - 1]
g = self.g_func_hourly[g_func_index]
# calculate average bh temp
delta_q = (q_curr - q_prev) / \
(2 * np.pi * self.borehole.soil.conductivity *
self.total_bh_length)
temp_bh_hourly.append(delta_q * g)
# calculate mean fluid temp
g_rb = g + self.borehole.resist_bh
if g_rb < 0:
g = -self.borehole.resist_bh * 2 * np.pi * self.borehole.soil.conductivity
g_rb = g + self.borehole.resist_bh
temp_mft_hourly.append(delta_q * g_rb)
# aggregated load effects
temp_bh_agg = []
temp_mft_agg = []
if self.agg_loads_flag:
for i in range(len(self.agg_load_objects)):
if i == 0:
continue
curr_obj = self.agg_load_objects[i]
prev_obj = self.agg_load_objects[i - 1]
t_agg = sim_hour - curr_obj.time()
ln_t_ts = np.log(t_agg * 3600 / self.ts)
g = self.g_func(ln_t_ts)
# calculate the average borehole temp
delta_q = (curr_obj.q - prev_obj.q) / (
2 * np.pi * self.borehole.soil.conductivity * self.total_bh_length)
temp_bh_agg.append(delta_q * g)
# calculate the mean fluid temp
g_rb = g + self.borehole.resist_bh
if g_rb < 0:
g = -self.borehole.resist_bh * 2 * np.pi * self.borehole.soil.conductivity
g_rb = g + self.borehole.resist_bh
temp_mft_agg.append(delta_q * g_rb)
# aggregate load
if agg_hour == self.agg_load_intervals[0] + self.min_hourly_history - 1:
# this has one extra value for comparative purposes
# need to get rid of it here
self.hourly_loads.popleft()
# create new aggregated load object
self.aggregate_load()
# reset aggregation hour to '0'
agg_hour -= self.agg_load_intervals[0]
# final bh temp
self.temp_bh.append(
self.borehole.soil.undisturbed_temp + sum(temp_bh_hourly) + sum(temp_bh_agg))
# final mean fluid temp
self.temp_mft.append(
self.borehole.soil.undisturbed_temp + sum(temp_mft_hourly) + sum(temp_mft_agg))
# update borehole temperature
self.borehole.pipe.fluid.update_fluid_state(new_temp=self.temp_mft[-1])
self.generate_output_reports()
PrintClass.my_print("Simulation complete", "success")
PrintClass.my_print("Simulation time: %0.3f sec" %
(timeit.default_timer() - self.timer_start))
PrintClass.write_log_file()