Source code for pyhydrophone.soundtrap

#!/usr/bin/python
from pyhydrophone.hydrophone import Hydrophone

import os
import zipfile
import numpy as np
import pandas as pd
import soundfile as sf
from datetime import datetime
import xml.etree.ElementTree as ET
import requests
import pathlib


class SoundTrap(Hydrophone):
    """
    SoundTrap acoustic recorder.

    Parameters
    ----------
    name: str
        Name of the acoustic recorder
    model: str or int
        Model of the acoustic recorder
    serial_number : str or int
        Serial number of the acoustic recorder. It has to match the one in the calibration file
    sensitivity : float
        Sensitivity of the acoustic recorder in db. If None, it is requested from the OceanInstruments
        calibration web service using the serial number (and the model, if the serial number is ambiguous)
    gain_type : str
        'High' or 'Low', depending on the settings of the recorder
    string_format : string
        Format of the datetime string present in the filename
    calibration_file : string or Path
        File where the frequency dependent sensitivity values for the calibration are
    """

    def __init__(self, name, model, serial_number, sensitivity=None, gain_type='High',
                 string_format="%y%m%d%H%M%S", calibration_file=None, **kwargs):
        if sensitivity is None:
            sensitivity = self._request_sensitivity(model, serial_number, gain_type)
        super().__init__(name, model, serial_number=serial_number, sensitivity=sensitivity, preamp_gain=0.0,
                         Vpp=2.0, string_format=string_format, calibration_file=calibration_file, **kwargs)

    @staticmethod
    def _request_sensitivity(model, serial_number, gain_type):
        """
        Request the sensitivity of a device from the OceanInstruments calibration web service

        Parameters
        ----------
        model : str or int
            Model of the recorder. Only used when several devices share the serial number
        serial_number : str or int
            Serial number of the recorder
        gain_type : str
            'High' or 'Low'

        Returns
        -------
        The sensitivity (float): the negated 'highFreq' or 'lowFreq' value of the first calibration entry

        Raises
        ------
        AttributeError
            If several devices share the serial number and `model` matches none of them
        Exception
            If the gain type is unknown, the web service can not be queried or the device is not found
        """
        gain_fields = {'High': 'highFreq', 'Low': 'lowFreq'}
        # Validate before doing any network request
        if gain_type not in gain_fields:
            raise Exception('Gain type %s is not implemented!' % gain_type)

        try:
            query = 'http://oceaninstruments.azurewebsites.net/api/Devices/Search/%s' % serial_number
            devices = requests.get(query).json()
            if not devices:
                raise Exception('Serial number %s is not in the OceanInstruments database!' % serial_number)

            if len(devices) > 1:
                models_available = {}
                for device in devices:
                    if device['serialNo'] == str(serial_number):
                        models_available[device['modelName']] = device['deviceId']
                if model not in models_available:
                    raise AttributeError('There are multiple instruments with serial number %s. Set the model '
                                         'parameter to match the model specified in the SoundTrap calibration '
                                         'webpage to chose the correct one: %s'
                                         % (serial_number, models_available.keys()))
                device_id = models_available[model]
            else:
                # Ignore the model name if there is only one serial number
                device_id = devices[0]['deviceId']

            query = 'http://oceaninstruments.azurewebsites.net/api/Calibrations/Device/%s' % device_id
            calibrations = requests.get(query).json()
        except (ConnectionError, requests.exceptions.RequestException) as err:
            # requests raises its own exception hierarchy, which the builtin ConnectionError does not cover
            raise Exception('Could not retrieve the sensitivity of serial number %s from the OceanInstruments '
                            'database!' % serial_number) from err

        if not calibrations:
            raise Exception('Serial number %s has no calibration in the OceanInstruments database!'
                            % serial_number)
        return -calibrations[0][gain_fields[gain_type]]

    @staticmethod
    def read_file_specs(xmlfile_path, last_gain, date_format='%Y-%m-%dT%H:%M:%S'):
        """
        Read the specs of the recording from the XML file and return them as a dictionary

        Parameters
        ----------
        xmlfile_path : string or path
            Path to the xml file
        last_gain : str
            Last gain type. 'High' or 'Low', depending on the settings of the recorder.
            Only used when the START state of the file is not 'NEW'
        date_format : string
            Format of the datetime in the .log.xml file

        Returns
        -------
        Dictionary with the keys 'type_start', 'temp', 'fs', 'st_gain', 'start_time' and 'stop_time'
        """
        tree = ET.parse(xmlfile_path)
        type_start = tree.find('EVENT/START').get('STATE')

        # Metadata collected (the stored value is divided by 100)
        temp = float(tree.find('EVENT/TEMPERATURE').text) / 100

        # WavFileHandler information: merge the attributes of all the entries, later ones win
        sampling_attr = {}
        for wfh in tree.findall('PROC_EVENT/WavFileHandler'):
            sampling_attr.update(wfh.attrib)

        # Info about the sampling
        fs = float(tree.find('CFG/FS').text)

        # Setup information. Read SoundTrap gain ('HIGH' or 'LOW')
        if type_start == 'NEW':
            st_gain = tree.find('EVENT/AUDIO').get('Gain')
        else:
            if last_gain is None:
                print('Unknown gain if it is reopened and the last gain is not passed!')
            st_gain = last_gain

        start_time = datetime.strptime(sampling_attr['SamplingStartTimeLocal'], date_format)
        stop_time = datetime.strptime(sampling_attr['SamplingStopTimeLocal'], date_format)

        return {'type_start': type_start, 'temp': temp, 'fs': fs, 'st_gain': st_gain,
                'start_time': start_time, 'stop_time': stop_time}

    def get_name_datetime(self, file_name):
        """
        Get the date and time of recording from the name of the file.
        The datetime string is expected to be the second dot-separated field of the name; its parsing is
        delegated to the parent class.

        Parameters
        ----------
        file_name : string
            File name (not path) of the file
        """
        name = file_name.split('.')
        date_string = name[1]
        date = super().get_name_datetime(date_string)
        return date

    @staticmethod
    def get_xml_utc_datetime(file_path):
        """
        Get the UTC datetime from the .log.xml file that goes with a wav file

        Parameters
        ----------
        file_path : str or Path
            Path to the wav file. The xml file is looked up by replacing '.wav' with '.log.xml'

        Returns
        -------
        datetime of the first WavFileHandler entry with a SamplingStartTimeUTC attribute, None if there is none
        """
        if isinstance(file_path, str):
            xml_name = file_path.replace('.wav', '.log.xml')
        else:
            xml_name = file_path.parent.joinpath(file_path.name.replace('.wav', '.log.xml'))
        tree = ET.parse(xml_name)
        for wfh in tree.findall('PROC_EVENT/WavFileHandler'):
            if 'SamplingStartTimeUTC' in wfh.attrib:
                # strptime only takes positional arguments and needs the attribute string itself
                return datetime.strptime(wfh.attrib['SamplingStartTimeUTC'], '%Y-%m-%dT%H:%M:%S')
        return None

    def get_new_name(self, filename, new_date):
        """
        Replace the datetime with the appropriate one

        Parameters
        ----------
        filename : string
            File name (not path) of the file
        new_date : datetime object
            New datetime to be replaced in the filename
        """
        old_date = self.get_name_datetime(filename)
        old_date_name = datetime.strftime(old_date, "%y%m%d%H%M%S")
        new_date_name = datetime.strftime(new_date, "%y%m%d%H%M%S")
        new_filename = filename.replace(old_date_name, new_date_name)
        return new_filename

    def test_calibration(self, signal):
        """
        Test the calibration of the soundtrap
        """
        # TO BE IMPLEMENTED
class SoundTrapHF(SoundTrap):
    """
    SoundTrap HF reader: SoundTrap with access to the click detector output (.bcl, .dwv and .log.xml files)
    """

    def __init__(self, name, model, serial_number, sensitivity=None, gain_type='High',
                 string_format="%y%m%d%H%M%S", calibration_file=None, **kwargs):
        """
        Init a SoundTrap HF reader

        Parameters
        ----------
        name: str
            Name of the acoustic recorder
        model: str or int
            Model of the acoustic recorder
        serial_number : str or int
            Serial number of the acoustic recorder. It has to match the one in the calibration file
        sensitivity : float
            Sensitivity of the acoustic recorder in db. If None, the parent class looks it up
        gain_type : str
            'High' or 'Low', depending on the settings of the recorder
        string_format : string
            Format of the datetime string present in the filename
        calibration_file : string or Path
            File where the frequency dependent sensitivity values for the calibration are
        """
        super().__init__(name, model, serial_number, sensitivity, gain_type, string_format,
                         calibration_file=calibration_file, **kwargs)

    def read_HFfolder(self, main_folder_path, zip_mode=False, include_dirs=False):
        """
        Read all the clicks in all the folders.
        If zip_mode is True and include_dirs is True, only the INSIDE folders can be zipped inside a
        non-zipped folder. If only one zip folder is to be analyzed, then set include_dirs to False.

        Parameters
        ----------
        main_folder_path: str or Path
            Folder containing all the files and/or subfolders to be extracted
        zip_mode : bool
            Set to True if the folders are zipped
        include_dirs : bool
            Set to True if folder needs to be analyzed recursively

        Returns
        -------
        A DataFrame with all the clicks of all the folders (empty if nothing was found)
        """
        if not isinstance(main_folder_path, pathlib.Path):
            main_folder_path = pathlib.Path(main_folder_path)

        # Collect one DataFrame per file and concatenate once at the end
        frames = []
        if not zip_mode:
            # '**/*.wav' is the recursive pattern: it also matches the wav files of the top folder
            glob_str = '**/*.wav' if include_dirs else '*.wav'
            for file_name in main_folder_path.glob(glob_str):
                frames.append(self.read_HFclicks_file(file_name, zip_mode))
        elif include_dirs:
            for zipped_dir in main_folder_path.glob('*'):
                frames.append(self.read_HFfolder(zipped_dir, zip_mode=zip_mode, include_dirs=False))
        else:
            with zipfile.ZipFile(main_folder_path, 'r', allowZip64=True) as folder_path:
                zip_path = pathlib.Path(folder_path.filename)
                wav_names = [member for member in folder_path.namelist() if member.split('.')[-1] == 'wav']
            for wav_name in wav_names:
                frames.append(self.read_HFclicks_file(zip_path.joinpath(wav_name), zip_mode))

        if not frames:
            return pd.DataFrame()
        return pd.concat(frames)

    def read_HFclicks_file(self, wavfile_path, zip_mode=False):
        """
        Read all the clicks that go with one wav file. The .bcl, .dwv and .log.xml files are expected next
        to the wav file, with the same base name.

        Parameters
        ----------
        wavfile_path: str or Path
            Wav file path. In zip mode, its parent has to be the path of the zip file
        zip_mode : boolean
            Set to True if the folders are zipped

        Returns
        -------
        A DataFrame with all the clicks, including a 'filename' column. If one of the files is not found
        (non-zipped mode), a message is printed and an empty DataFrame is returned
        """
        if not isinstance(wavfile_path, pathlib.Path):
            wavfile_path = pathlib.Path(wavfile_path)

        bcl_name = wavfile_path.name.replace('.wav', '.bcl')
        dwv_name = wavfile_path.name.replace('.wav', '.dwv')
        xml_name = wavfile_path.name.replace('.wav', '.log.xml')

        if zip_mode:
            # Everything is read inside the with block, so the archive and its members can be closed after
            with zipfile.ZipFile(wavfile_path.parent, 'r', allowZip64=True) as zip_file, \
                    zip_file.open(bcl_name) as bcl_file, \
                    zip_file.open(dwv_name) as dwv_file, \
                    zip_file.open(xml_name) as xml_file:
                file_clicks = self._read_HFclicks(bcl_file, dwv_file, xml_file)
            # NOTE(review): the zipped branch stores the .bcl name while the non-zipped one stores the
            # .dwv path. Kept as it was, confirm which one is intended
            file_clicks['filename'] = str(wavfile_path.parent.joinpath(bcl_name))
            return file_clicks

        bcl_path = os.path.join(wavfile_path.parent, bcl_name)
        dwv_path = os.path.join(wavfile_path.parent, dwv_name)
        xml_path = os.path.join(wavfile_path.parent, xml_name)
        try:
            file_clicks = self._read_HFclicks(bcl_path, dwv_path, xml_path)
        except FileNotFoundError:
            print(dwv_path, 'has some problem and can not be read')
            # Nothing could be read: return an empty table instead of failing on an undefined variable
            return pd.DataFrame()

        file_clicks['filename'] = str(dwv_path)
        return file_clicks

    def _read_HFclicks(self, bcl_path, dwv_path, xml_path):
        """
        Read the clicks of one soundtrap file

        Parameters
        ----------
        bcl_path : str, Path or file object
            Path to the bcl file
        dwv_path : str, Path or file object
            Path to the dwv file
        xml_path : str, Path or file object
            Path to the .log.xml file

        Returns
        -------
        A DataFrame with all the parameters from the bcl file + the columns 'wave', 'start_sample',
        'end_sample', 'duration', 'fs' and 'datetime'
        """
        # Read the wav file with all the clicks
        with sf.SoundFile(dwv_path, 'r') as sound_file:
            # click_len has to be checked automatically
            click_len = self.read_HFparams(xml_path=xml_path)

            # Read the info of clicks: keep only the rows with report 'D' and state 1
            clicks_info = pd.read_csv(bcl_path)
            selected = (clicks_info['report'] == 'D') & (clicks_info['state'] == 1)
            # Copy so the new columns are added to an independent table
            clicks_info = clicks_info[selected].copy()

            waves = [block.astype(float) for block in sound_file.blocks(blocksize=click_len)]
            fs = sound_file.samplerate

        print(dwv_path, 'bcl:', len(clicks_info), 'dwv:', len(waves))
        if len(waves) < len(clicks_info):
            # Cut the clicks info if there are not enough snippets. The cut has to be positional: after the
            # filtering the index labels are not consecutive anymore
            clicks_info = clicks_info.iloc[:len(waves)].copy()

        clicks_info['wave'] = waves[0:len(clicks_info)]
        clicks_info['start_sample'] = np.arange(len(clicks_info)) * click_len
        clicks_info['end_sample'] = clicks_info['start_sample'] + click_len
        clicks_info['duration'] = click_len
        clicks_info['fs'] = fs

        # rtime is used as seconds and mticks as microseconds
        clicks_info['datetime'] = pd.to_datetime(clicks_info['rtime'] + clicks_info['mticks'] / 1e6, unit='s')

        return clicks_info.reset_index(drop=True)

    @staticmethod
    def read_HFparams(xml_path):
        """
        Return the length of the clips

        Parameters
        ----------
        xml_path : string or Path
            Path to the .log.xml file

        Returns
        -------
        Clip length in samples (int), computed as CFG/PREDET + CFG/POSTDET
        """
        tree = ET.parse(xml_path)
        # blank_len = int(tree.find('CFG/BLANKING').text)
        clip_len = int(tree.find('CFG/PREDET').text) + int(tree.find('CFG/POSTDET').text)
        return clip_len