# Source code for pyhydrophone.rtsys

#!/usr/bin/python
from pyhydrophone.hydrophone import Hydrophone

from datetime import datetime
import struct
import numpy as np
import zipfile
import os
import pathlib

try:
    import matplotlib.pyplot as plt
    import pandas as pd
except ModuleNotFoundError:
    pass


class RTSys(Hydrophone):
    """
    Init an instance of RTSys

    Parameters
    ----------
    name: str
        Name of the acoustic recorder
    model: str or int
        Model of the acoustic recorder
    serial_number : str or int
        Serial number of the acoustic recorder
    sensitivity : float
        Sensitivity of the acoustic recorder in dB
    preamp_gain : float
        Gain of the preamplifier in dB
    Vpp : float
        Voltage peak to peak in volts
    mode: string
        Can be 'lowpower' or 'broadband'
    channel: string
        Channel to process, 'A', 'B', 'C' or 'D'
    string_format : string
        Format of the datetime string present in the filename
    calibration_file : string or Path
        File where the frequency dependent sensitivity values for the calibration are
    """

    # Layout of the RTSys configuration chunk. 'start' and 'end' are byte offsets into the buffer that
    # read_header reads starting at file offset 36; 'format' is a struct format character.
    # NOTE(review): 'channel' (23-24) overlaps the last byte of 'epoch_time_recording' (16-24), and
    # 'serial_number' is declared as 'f' but is decoded as text because n != 1 - confirm against the
    # RTSys header specification.
    _HEADER_CONF = {
        'conf': {'format': 'c', 'start': 0, 'end': 4, 'n': 4},
        'conf_size': {'format': 'I', 'start': 4, 'end': 8, 'n': 1},
        'conf_version': {'format': 'I', 'start': 8, 'end': 12, 'n': 1},
        'epoch_time_recording': {'format': 'd', 'start': 16, 'end': 24, 'n': 1},
        'channel': {'format': 'c', 'start': 23, 'end': 24, 'n': 1},
        'sr': {'format': 'f', 'start': 28, 'end': 32, 'n': 1},
        'hydrophone_sensitivity_A': {'format': 'f', 'start': 32, 'end': 36, 'n': 1},
        'hydrophone_sensitivity_B': {'format': 'f', 'start': 36, 'end': 40, 'n': 1},
        'hydrophone_sensitivity_C': {'format': 'f', 'start': 40, 'end': 44, 'n': 1},
        'hydrophone_sensitivity_D': {'format': 'f', 'start': 44, 'end': 48, 'n': 1},
        'hydrophone_amplification_A': {'format': 'f', 'start': 48, 'end': 52, 'n': 1},
        'hydrophone_amplification_B': {'format': 'f', 'start': 52, 'end': 56, 'n': 1},
        'hydrophone_amplification_C': {'format': 'f', 'start': 56, 'end': 60, 'n': 1},
        'hydrophone_amplification_D': {'format': 'f', 'start': 60, 'end': 64, 'n': 1},
        'correction_factor_A': {'format': 'f', 'start': 64, 'end': 68, 'n': 1},
        'correction_factor_B': {'format': 'f', 'start': 68, 'end': 72, 'n': 1},
        'correction_factor_C': {'format': 'f', 'start': 72, 'end': 76, 'n': 1},
        'correction_factor_D': {'format': 'f', 'start': 76, 'end': 80, 'n': 1},
        'serial_number': {'format': 'f', 'start': 80, 'end': 96, 'n': 4},
        'active_channels': {'format': 'c', 'start': 100, 'end': 104, 'n': 4},
    }

    def __init__(self, name, model, serial_number, sensitivity, preamp_gain, Vpp, mode, channel='A',
                 string_format="%Y-%m-%d_%H-%M-%S", calibration_file=None):
        super().__init__(name, model, serial_number, sensitivity, preamp_gain, Vpp, string_format,
                         calibration_file, val='sensitivity', sep=';', freq_col_id=0, val_col_id=1,
                         start_data_id=0)
        self.cal_freq = 250
        self.cal_value = 114
        self.mode = mode
        self.channel = channel

    def get_name_datetime(self, file_name):
        """
        Get the date and time of recording from the name of the file

        Everything after the first underscore of the file name (extension removed) is passed
        to the parent class as the datetime string.

        Parameters
        ----------
        file_name : string
            File name (not path) of the file
        """
        name = file_name.split('.')[0]
        # When there is no underscore, find() returns -1 and the whole name is used
        start_timestamp = name.find('_') + 1
        date_string = name[start_timestamp:]
        return super().get_name_datetime(date_string)

    def get_new_name(self, filename, new_date):
        """
        Replace the datetime with the appropriate one

        Parameters
        ----------
        filename : string
            File name (not path) of the file
        new_date : datetime object
            New datetime to be replaced in the filename

        Returns
        -------
        The file name with the old datetime string replaced by the new one
        """
        old_date = self.get_name_datetime(filename)
        old_date_name = old_date.strftime(self.string_format)
        new_date_name = new_date.strftime(self.string_format)
        return filename.replace(old_date_name, new_date_name)

    @staticmethod
    def _iter_board_files(mission_folder_path):
        """
        Yield every .txt file below mission_folder_path (recursively) whose name contains 'board'

        Parameters
        ----------
        mission_folder_path : str or Path
        """
        mission_folder_path = pathlib.Path(mission_folder_path)
        for txt_file in mission_folder_path.glob('**/*.txt'):
            if 'board' in txt_file.name:
                yield txt_file

    @staticmethod
    def _parse_board_file(board_file_path):
        """
        Read a ';'-separated board file into a DataFrame

        The first five columns are read as 'id', 'T', 'V', 'I' and 'P'. The 'T:', 'V:', 'I:' and 'P:'
        prefixes are stripped and the values converted to float. Two columns are added:
        'timestamp' (from 'id' without its '@:' prefix, read as seconds) and 'dP'
        (seconds elapsed since the previous row multiplied by 'P').

        Parameters
        ----------
        board_file_path : str or Path

        Returns
        -------
        pandas DataFrame with columns id, T, V, I, P, timestamp and dP
        """
        board_info = pd.read_csv(board_file_path, delimiter=';', names=['id', 'T', 'V', 'I', 'P'],
                                 usecols=[0, 1, 2, 3, 4])
        for col in ('T', 'V', 'I', 'P'):
            board_info[col] = board_info[col].str.replace(col + ':', '')
            # An empty value blanks the whole row (every column), not only this field
            board_info.loc[board_info[col] == ''] = np.nan
            board_info[col] = board_info[col].astype(float)

        board_info['timestamp'] = pd.to_datetime(board_info['id'].str.replace('@:', '').astype(float), unit='s')
        board_info['dP'] = board_info['timestamp'].diff().dt.total_seconds() * board_info['P']
        return board_info

    def plot_consumption(self, board_file_path):
        """
        Plot the consumption evolution from the board_file_path

        Parameters
        ----------
        board_file_path : str or Path
        """
        board_info = self._parse_board_file(board_file_path)
        board_info.plot(y=['V', 'P'])
        plt.show()

    def plot_consumption_total_mission(self, mission_folder_path, ax=None, show=True):
        """
        Plot 'V' and 'P' against time for all the board files of a mission

        Parameters
        ----------
        mission_folder_path : str or Path
            Folder searched recursively for .txt files with 'board' in their name
        ax : matplotlib Axes, optional
            Axes to plot on. A new figure is created when None
        show : bool
            Set to True to call plt.show() after plotting
        """
        # Concatenate once instead of growing the DataFrame inside the loop
        boards = [self._parse_board_file(board_file_i) for board_file_i in
                  self._iter_board_files(mission_folder_path)]
        total_board = pd.concat(boards) if boards else pd.DataFrame()

        if ax is None:
            _, ax = plt.subplots()
        total_board.plot(x='timestamp', y=['V', 'P'], secondary_y='P', ax=ax)
        if show:
            plt.show()

    def compute_consumption(self, board_file_path):
        """
        Calculate the total energy consumption of the file

        Parameters
        ----------
        board_file_path : str or Path

        Returns
        -------
        Total consumption in the file: the sum of the 'dP' column divided by 3600
        """
        board_info = self._parse_board_file(board_file_path)
        return board_info['dP'].sum() / 3600

    def compute_consumption_total_mission(self, mission_folder_path):
        """
        Calculate the total energy consumption of the mission

        Parameters
        ----------
        mission_folder_path : str or Path
            Folder searched recursively for .txt files with 'board' in their name

        Returns
        -------
        Total consumption in the mission (sum of compute_consumption over all the board files)
        """
        total_consumption = 0
        for board_file_i in self._iter_board_files(mission_folder_path):
            total_consumption += self.compute_consumption(board_file_i)
        return total_consumption

    @staticmethod
    def _read_config_buffer(f):
        """
        Read the configuration chunk from an open binary file object

        The chunk size is the unsigned int stored at byte 40; that many bytes are then read
        starting at byte 36.

        Parameters
        ----------
        f : binary file object supporting seek and read
        """
        f.seek(40)
        config_size = struct.unpack('I', f.read(4))[0]
        f.seek(36)
        return f.read(config_size)

    @staticmethod
    def read_header(file_path, zip_mode=False):
        """
        Return the parameters of the .wav file's header as a dictionary

        Parameters
        ----------
        file_path: Path or string
            Path to the .wav file to read the header from
        zip_mode: bool
            True if the file is inside a zip archive. The archive path is everything up to and
            including the first '.zip' of file_path, and the rest is the member name

        Returns
        -------
        extra_header: dictionary with all the parameters of the configuration provided by RTSys
        """
        if zip_mode:
            # str() so that Path objects are accepted as well
            file_path = str(file_path)
            path_zip = file_path.split('.zip')[0] + '.zip'
            # Zip member names always use forward slashes
            file_zip = os.path.relpath(file_path, start=path_zip).replace('\\', '/')
            with zipfile.ZipFile(path_zip, 'r') as zip_folder:
                with zip_folder.open(file_zip) as f:
                    header_buffer = RTSys._read_config_buffer(f)
        else:
            with open(file_path, 'rb') as f:
                header_buffer = RTSys._read_config_buffer(f)

        extra_header = {}
        for chunk_name, chunk_format in RTSys._HEADER_CONF.items():
            chunk = header_buffer[chunk_format['start']:chunk_format['end']]
            if chunk_format['n'] == 1 and chunk_format['format'] != 'c':
                extra_header[chunk_name] = struct.unpack(chunk_format['format'], chunk)[0]
            else:
                # Character fields and multi-element fields are kept as decoded text
                extra_header[chunk_name] = bytes.decode(chunk)
        return extra_header

    def update_metadata(self, file_path, zip_mode=False):
        """
        Creates a new RTSys object from an already existing one but updating the metadata from the file header.
        The "mode", "channel", "string_format" and "calibration_file" parameters stay the same.

        Parameters
        ----------
        file_path: str or Path
            path to the wav file recorded with RTSys with a correct header
        zip_mode: bool
            True if file is zipped, otherwise false

        Returns
        -------
        New RTSys object
        """
        header = self.read_header(file_path, zip_mode)
        name, model, serial_number, sens, ampl = self.meta_from_header(header)
        # Keep the channel: the values above were read for self.channel, not for the default 'A'
        return RTSys(name=name, model=model, serial_number=serial_number, sensitivity=sens, preamp_gain=ampl,
                     Vpp=5.0, mode=self.mode, channel=self.channel, string_format=self.string_format,
                     calibration_file=self.calibration_file)

    def meta_from_header(self, header, channel=None):
        """
        Extract the metadata of one channel from a header dictionary

        Parameters
        ----------
        header : dict
            Header as returned by read_header
        channel : string, optional
            Channel to read the values for. Defaults to self.channel

        Returns
        -------
        name, model, serial_number, sens, ampl
            name is always 'RTSys' and model is always None. In 'lowpower' mode ampl is the fixed value
            20 * log10(5 / sqrt(2)); otherwise it is computed from the amplification and the
            correction factor of the channel
        """
        if channel is None:
            channel = self.channel

        sens = header['hydrophone_sensitivity_%s' % channel]
        name = 'RTSys'
        model = None
        serial_number = header['serial_number']
        if self.mode == 'lowpower':
            ampl = 20 * np.log10(5 / np.sqrt(2))
        else:
            ampl = 20 * np.log10((1 / (header['hydrophone_amplification_%s' % channel] *
                                       header['correction_factor_%s' % channel])))
        return name, model, serial_number, sens, ampl

    def one_rtsys_per_channel_from_header(self, file_path, zip_mode=False):
        """
        Create one RTSys object per active channel listed in the file header

        Parameters
        ----------
        file_path: str or Path
            path to the wav file recorded with RTSys
        zip_mode: bool
            True if file is zipped, otherwise false

        Returns
        -------
        A single RTSys object if exactly one channel is active, otherwise a list of RTSys objects
        (empty when no channel is active)
        """
        extra_header = self.read_header(file_path, zip_mode)
        # Unused channel slots are stored as null characters
        active_channels = [channel_i for channel_i in extra_header['active_channels'][:4]
                           if channel_i != '\x00']

        rtsys_list = []
        for channel in active_channels:
            # Read the values of this channel, not those of self.channel
            name, model, serial_number, sens, ampl = self.meta_from_header(extra_header, channel=channel)
            rtsys_list.append(RTSys(name=name, model=model, serial_number=serial_number, sensitivity=sens,
                                    preamp_gain=ampl, Vpp=5.0, mode=self.mode, channel=channel,
                                    string_format=self.string_format))

        if len(rtsys_list) == 1:
            return rtsys_list[0]
        return rtsys_list

    def calibrate(self, file_path, zip_mode=False):
        """
        Update sensitivity, preamp_gain and Vpp of this object in place from the file header

        Parameters
        ----------
        file_path: str or Path
            path to the wav file recorded with RTSys
        zip_mode: bool
            True if file is zipped, otherwise false
        """
        header = self.read_header(file_path, zip_mode)
        _, _, _, sens, ampl = self.meta_from_header(header)
        self.sensitivity = sens
        self.preamp_gain = ampl
        self.Vpp = 5.0

    def get_freq_cal(self, val='sensitivity', sep=';', freq_col_id=0, val_col_id=1, start_data_id=0):
        """
        Call the parent get_freq_cal with the RTSys defaults (';' separator)

        Parameters
        ----------
        val : string
            NOTE(review): accepted but not forwarded to the parent - confirm whether the parent
            get_freq_cal takes it
        sep : string
        freq_col_id : int
        val_col_id : int
        start_data_id : int
        """
        super().get_freq_cal(sep=sep, freq_col_id=freq_col_id, val_col_id=val_col_id,
                             start_data_id=start_data_id)