@@ -32,7 +32,7 @@ import numpy as np
 from . import __version__, schema, sigmf_hash, validate
 from .archive import SigMFArchive, SIGMF_DATASET_EXT, SIGMF_METADATA_EXT, SIGMF_ARCHIVE_EXT
 from .utils import dict_merge, insert_sorted_dict_list
-from .error import SigMFFileError
+from .error import SigMFFileError, SigMFAccessError
 
 
 class SigMFFile():
@@ -59,6 +59,7 @@ class SigMFFile():
     VERSION_KEY = "core:version"
     DATATYPE_KEY = "core:datatype"
     FREQUENCY_KEY = "core:frequency"
+    HEADER_BYTES_KEY = "core:header_bytes"
     FLO_KEY = "core:freq_lower_edge"
     FHI_KEY = "core:freq_upper_edge"
     SAMPLE_RATE_KEY = "core:sample_rate"
@@ -91,7 +92,7 @@ class SigMFFile():
         GEOLOCATION_KEY, HASH_KEY, HW_KEY, LICENSE_KEY, META_DOI_KEY, METADATA_ONLY_KEY, NUM_CHANNELS_KEY, RECORDER_KEY,
         SAMPLE_RATE_KEY, START_OFFSET_KEY, TRAILING_BYTES_KEY, VERSION_KEY
     ]
-    VALID_CAPTURE_KEYS = [DATETIME_KEY, FREQUENCY_KEY, GLOBAL_INDEX_KEY, START_INDEX_KEY]
+    VALID_CAPTURE_KEYS = [DATETIME_KEY, FREQUENCY_KEY, HEADER_BYTES_KEY, GLOBAL_INDEX_KEY, START_INDEX_KEY]
     VALID_ANNOTATION_KEYS = [
         COMMENT_KEY, FHI_KEY, FLO_KEY, GENERATOR_KEY, LABEL_KEY, LAT_KEY, LENGTH_INDEX_KEY, LON_KEY, START_INDEX_KEY
     ]
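
For context: `core:header_bytes` (the new `HEADER_BYTES_KEY`) records a count of non-sample bytes that precede a capture's samples in the dataset file. A hypothetical capture segment using the new key might look like this (values are illustrative, not part of this change):

```python
capture_segment = {
    "core:sample_start": 0,    # START_INDEX_KEY
    "core:header_bytes": 32,   # HEADER_BYTES_KEY: 32 non-sample bytes precede this capture's samples
    "core:frequency": 915e6,   # FREQUENCY_KEY
}
```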
@@ -134,6 +135,27 @@ class SigMFFile():
         '''Returns integer number of channels if present, otherwise 1'''
         return self.get_global_field(self.NUM_CHANNELS_KEY, 1)
 
+    def _is_conforming_dataset(self):
+        """
+        Returns `True` if the dataset conforms to SigMF, `False` otherwise.
+
+        The dataset is non-conforming if the datafile contains non-sample
+        bytes, i.e. it only conforms if the global `trailing_bytes` field is
+        zero or unset and every capture's `header_bytes` field is zero or
+        unset. Because we do not necessarily know the filename there is no
+        way to verify that the meta/data filename roots match, but this will
+        also check that a data file exists.
+        """
+        if self.get_global_field(self.TRAILING_BYTES_KEY, 0):
+            return False
+        for capture in self.get_captures():
+            # check for any non-zero `header_bytes` fields in captures segments
+            if capture.get(self.HEADER_BYTES_KEY, 0):
+                return False
+        if not path.isfile(self.data_file):
+            return False
+        # if we get here, the file exists and is conforming
+        return True
+
     def _validate_dict_in_section(self, entries, section_key):
         """
         Checks a dictionary for validity.
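
In other words, a dataset is conforming exactly when the data file holds nothing but samples. A standalone sketch of the same rule, assuming plain dicts for the global section and captures list:

```python
from os import path

def is_conforming(global_info, captures, data_file):
    # no trailing bytes, no per-capture header bytes,
    # and the data file must exist on disk
    if global_info.get("core:trailing_bytes", 0):
        return False
    if any(capture.get("core:header_bytes", 0) for capture in captures):
        return False
    return path.isfile(data_file)
```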
@@ -234,6 +256,41 @@ class SigMFFile():
             cap_info = dict_merge(cap_info, capture)
         return cap_info
 
+    def get_capture_start(self, index):
+        """
+        Returns the start sample index of a given capture; raises
+        SigMFAccessError if this field is missing.
+        """
+        start = self.get_captures()[index].get(self.START_INDEX_KEY)
+        if start is None:
+            raise SigMFAccessError("Capture {} does not have required {} key".format(index, self.START_INDEX_KEY))
+        return start
+
+    def get_capture_byte_boundarys(self, index):
+        """
+        Returns a tuple of the file byte range in a dataset of a given SigMF
+        capture of the form [start, stop). This function works on either
+        compliant or noncompliant SigMF Recordings.
+        """
+        if index >= len(self.get_captures()):
+            raise SigMFAccessError("Invalid captures index {} (only {} captures in Recording)".format(index, len(self.get_captures())))
+
+        start_byte = 0
+        prev_start_sample = 0
+        for ii, capture in enumerate(self.get_captures()):
+            # account for header bytes and samples belonging to preceding captures
+            start_byte += capture.get(self.HEADER_BYTES_KEY, 0)
+            start_byte += (self.get_capture_start(ii) - prev_start_sample) * self.get_sample_size() * self.get_num_channels()
+            prev_start_sample = self.get_capture_start(ii)
+            if ii >= index:
+                break
+
+        end_byte = start_byte
+        if index == len(self.get_captures()) - 1:  # last capture, so the data is the rest of the file
+            end_byte = path.getsize(self.data_file) - self.get_global_field(self.TRAILING_BYTES_KEY, 0)
+        else:
+            end_byte += (self.get_capture_start(index + 1) - self.get_capture_start(index)) * self.get_sample_size() * self.get_num_channels()
+        return (start_byte, end_byte)
+
     def add_annotation(self, start_index, length, metadata=None):
         """
         Insert annotation
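
To make the byte arithmetic concrete, here is a worked example under assumed values: two captures of cf32 samples (8 bytes per sample), one channel, the second capture starting at sample 1000 behind a 32-byte header:

```python
sample_size = 8    # bytes per cf32 sample
num_channels = 1
# capture 0 starts at sample 0; capture 1 starts at sample 1000
# and is preceded by 32 header bytes

# boundaries of capture 1, following the loop above:
start_byte = 0
start_byte += 0                                        # capture 0 header bytes
start_byte += (0 - 0) * sample_size * num_channels     # samples before capture 0
start_byte += 32                                       # capture 1 header bytes
start_byte += (1000 - 0) * sample_size * num_channels  # capture 0 samples
assert start_byte == 8032
# capture 1 is the last capture, so end_byte is the file size
# minus core:trailing_bytes
```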
@@ -293,7 +350,8 @@ class SigMFFile():
             else:
                 sample_count = 0
         else:
-            file_size = path.getsize(self.data_file) - self.get_global_field(self.TRAILING_BYTES_KEY, 0)  # in bytes
+            header_bytes = sum([c.get(self.HEADER_BYTES_KEY, 0) for c in self.get_captures()])
+            file_size = path.getsize(self.data_file) - self.get_global_field(self.TRAILING_BYTES_KEY, 0) - header_bytes  # bytes
             sample_size = self.get_sample_size()  # size of a sample in bytes
             num_channels = self.get_num_channels()
             sample_count = file_size // sample_size // num_channels
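
The new subtraction matters for the sample count. A worked example under assumed values: a 16416-byte file of cf32 samples (8 bytes each), one channel, a single 32-byte capture header, no trailing bytes:

```python
header_bytes = 32                     # summed over all captures
file_size = 16416 - 0 - header_bytes  # 16384 bytes of actual samples
sample_count = file_size // 8 // 1    # file_size // sample_size // num_channels
assert sample_count == 2048
```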
@@ -434,6 +492,29 @@ class SigMFFile():
             with open(fns['meta_fn'], 'w') as fp:
                 self.dump(fp, pretty=pretty)
 
+    def read_samples_in_capture(self, index=0, autoscale=True):
+        '''
+        Reads samples from the specified captures segment in its entirety.
+
+        Parameters
+        ----------
+        index : int, default 0
+            Captures segment to read samples from.
+        autoscale : bool, default True
+            If dataset is in a fixed-point representation, scale samples from (min, max) to (-1.0, 1.0)
+
+        Returns
+        -------
+        data : ndarray
+            Samples are returned as an array of float or complex, with number of dimensions equal to NUM_CHANNELS_KEY.
+        '''
+        cb = self.get_capture_byte_boundarys(index)
+        if (cb[1] - cb[0]) % (self.get_sample_size() * self.get_num_channels()):
+            warnings.warn(f'Capture `{index}` in `{self.data_file}` does not contain '
+                          'an integer number of samples across channels. It may be invalid.')
+
+        return self._read_datafile(cb[0], (cb[1] - cb[0]) // self.get_sample_size(), autoscale, False)
+
     def read_samples(self, start_index=0, count=-1, autoscale=True, raw_components=False):
         '''
         Reads the specified number of samples starting at the specified index from the associated data file.
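
Typical use of the new per-capture reader might look as follows; this sketch assumes the library's `sigmffile.fromfile` loader and an illustrative filename:

```python
from sigmf import sigmffile

# read each capture segment of a recording on its own; a warning is
# emitted if a capture's byte range is not a whole number of samples
signal = sigmffile.fromfile('example.sigmf-meta')
for index in range(len(signal.get_captures())):
    samples = signal.read_samples_in_capture(index=index, autoscale=True)
    print(index, samples.shape)
```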
@@ -465,23 +546,30 @@ class SigMFFile():
                 raise SigMFFileError("Cannot read samples from a metadata only distribution.")
             else:
                 raise SigMFFileError("No signal data file has been associated with the metadata.")
+        first_byte = start_index * self.get_sample_size() * self.get_num_channels()
+        if not self._is_conforming_dataset():
+            warnings.warn('Recording dataset appears non-compliant, resulting data may be erroneous')
+        return self._read_datafile(first_byte, count * self.get_num_channels(), autoscale, False)
+
+    def _read_datafile(self, first_byte, nitems, autoscale, raw_components):
+        '''
+        Internal function for reading samples from the datafile.
+        '''
         dtype = dtype_info(self.get_global_field(self.DATATYPE_KEY))
         is_complex_data = dtype['is_complex']
         is_fixedpoint_data = dtype['is_fixedpoint']
         is_unsigned_data = dtype['is_unsigned']
         data_type_in = dtype['sample_dtype']
         component_type_in = dtype['component_dtype']
         sample_size = dtype['sample_size']
         component_size = dtype['component_size']
 
         data_type_out = np.dtype("f4") if not is_complex_data else np.dtype("f4, f4")
         num_channels = self.get_num_channels()
 
         fp = open(self.data_file, "rb")
-        fp.seek(start_index * sample_size * num_channels, 0)
-        data = np.fromfile(fp, dtype=data_type_in, count=count * num_channels)
+        fp.seek(first_byte, 0)
+        data = np.fromfile(fp, dtype=data_type_in, count=nitems)
         if num_channels != 1:
             # return reshaped view for num_channels
             # first dimension will be double size if `is_complex_data`
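
The reshape referenced in the comment can be sketched with plain numpy; a minimal illustration, assuming two interleaved real-valued channels:

```python
import numpy as np

num_channels = 2
flat = np.arange(8, dtype='f4')        # interleaved ch0, ch1, ch0, ch1, ...
data = flat.reshape(-1, num_channels)  # shape (4, 2); a view, not a copy
```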