Browse Source

new i/o methods for annotations & samples; +python3

The following squashed commit was approved for public release by **The
Aerospace Corporation** on 2019-05-07. It is covered software release
request #SW19-0024. Commits made by the *Digital Communications
Implementation Department*.

New features
- method to read & write SigMF metadata files
- method to read available captures
- method to read samples from captures
- method to get annotations
- method to count samples
- python3 module compatibility
- examples added to README

Fixes
- python3 relative path fixes
- bug in test_validation
- bug in conftest that used old SigMF datatype (f32)
- additional test in test_sigmffile
- made compatible with docker containers
- can now write multiple annotations for same sample start index

Pylint code cleanup
- removed unused imports
- fixed indentation
- fixed snake_case and too short variables
- fixed missing error definition

Other changes
- Remaining core namespace keys were added as constants to SigMFFile class.
- Sample count is automatically determined and set as a class data member
  when a data file is present.
pull/106/head
Teque5 2 years ago
parent
commit
c7a5779170
  1. 12
      .gitignore
  2. 52
      README.md
  3. 3
      example_metadata.py
  4. 11
      setup.py
  5. 23
      sigmf/__init__.py
  6. 8
      sigmf/archive.py
  7. 5
      sigmf/sigmf_hash.py
  8. 276
      sigmf/sigmffile.py
  9. 28
      sigmf/utils.py
  10. 4
      sigmf/version.py
  11. 20
      tests/conftest.py
  12. 59
      tests/test_archive.py
  13. 22
      tests/test_sigmffile.py
  14. 11
      tests/test_validation.py
  15. 6
      tests/testdata.py

12
.gitignore

@ -1,3 +1,15 @@
# temp files
*.swp
*.pyc
.cache
# setuptools related
build/*
.eggs/*
SigMF.egg-info/*
# pytest & coverage related
.coverage
pytest.xml
coverage.xml
htmlcov/*

52
README.md

@ -39,6 +39,58 @@ maintained for posterity.
Anyone is welcome to get involved - indeed, the more people involved in the
discussions, the more useful the standard is likely to be.
## Installation
After cloning, simply run the setup script for a static installation.
```
python setup.py install
```
Alternatively, install the module in developer mode if you plan to experiment
with your own changes.
```
python setup.py develop
```
## Usage example
#### Load a SigMF dataset; read its annotation, metadata, and samples
```python
from sigmf import SigMFFile, sigmffile
# Load a dataset
sigmf_filename = 'datasets/my_dataset.sigmf-meta' # extension is optional
signal = sigmffile.fromfile(sigmf_filename)
# Get some metadata and all annotations
sample_rate = signal.get_global_field(SigMFFile.SAMPLE_RATE_KEY)
sample_count = signal.sample_count
signal_duration = sample_count / sample_rate
annotations = signal.get_annotations()
# Iterate over annotations
for annotation_idx, annotation in enumerate(annotations):
annotation_start_idx = annotation[SigMFFile.START_INDEX_KEY]
annotation_length = annotation[SigMFFile.LENGTH_INDEX_KEY]
annotation_comment = annotation.get(SigMFFile.COMMENT_KEY,
"[annotation {}]".format(annotation_idx))
# Get capture info associated with the start of annotation
capture = signal.get_capture_info(annotation_start_idx)
freq_center = capture.get(SigMFFile.FREQUENCY_KEY, 0)
freq_min = freq_center - 0.5*sample_rate
freq_max = freq_center + 0.5*sample_rate
# Get frequency edges of annotation (default to edges of capture)
freq_start = annotation.get(SigMFFile.FLO_KEY, freq_min)
freq_stop = annotation.get(SigMFFile.FHI_KEY, freq_max)
# Get the samples corresponding to annotation
samples = signal.read_samples(annotation_start_idx, annotation_length)
```
## Frequently Asked Questions
#### Is this a GNU Radio effort?

3
example_metadata.py

@ -1,12 +1,13 @@
#
# Warning: this is not strict JSON, this is python to allow inline comment
#
from sigmf import __version__
{
"global": {
"core:datatype": "cf32_le", # The datatype of the recording (here, little-endian complex 32-bit float)
"core:sample_rate": 10000000, # The sample rate of the recording (10 MHz, here).
"core:version": "0.0.1", # Version of the SigMF spec used.
"core:version": __version__, # Version of the SigMF spec used.
"core:description": "An example metadafile for a SigMF recording."
},
"captures": [

11
setup.py

@ -1,7 +1,5 @@
from setuptools import setup
import sigmf
import os
shortdesc = "Signal Metadata Format Specification"
longdesc = """
@ -10,12 +8,15 @@ sets of recorded digital signal samples with metadata written in JSON.
SigMF can be used to describe general information about a collection
of samples, the characteristics of the system that generated the
samples, and features of the signal itself.
"""
# exec version.py to get __version__ (version.py is the single source of the version)
version_file = os.path.join(os.path.dirname(__file__), 'sigmf', 'version.py')
exec(open(version_file).read())
setup(
name='SigMF',
version=sigmf.__version__,
version=__version__,
description=shortdesc,
long_description=longdesc,
url='https://github.com/gnuradio/SigMF',

23
sigmf/__init__.py

@ -18,13 +18,18 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__version__ = "0.0.1"
# Use version.py to get the version
# Never define in the __init__.py and import it in setup.py because you can't
# import sigmf in setup.py because you won't have the dependencies yet.
# https://packaging.python.org/guides/single-sourcing-package-version/
import archive
import error
import schema
import sigmffile
import validate
import utils
from archive import SigMFArchive
from sigmffile import SigMFFile
from .version import __version__
from .archive import SigMFArchive
from .sigmffile import SigMFFile
from . import archive
from . import error
from . import schema
from . import sigmffile
from . import validate
from . import utils

8
sigmf/archive.py

@ -75,7 +75,7 @@ class SigMFArchive(object):
archive_name = self._get_archive_name()
sigmf_fileobj = self._get_output_fileobj()
sigmf_archive = tarfile.TarFile(mode="w",
sigmf_archive = tarfile.TarFile(mode="w",
fileobj=sigmf_fileobj,
format=tarfile.PAX_FORMAT)
tmpdir = tempfile.mkdtemp()
@ -150,11 +150,11 @@ class SigMFArchive(object):
fileobj = self._get_open_fileobj()
except:
if self.fileobj:
e = "fileobj {!r} is not byte-writable".format(self.fileobj)
err = "fileobj {!r} is not byte-writable".format(self.fileobj)
else:
e = "can't open {!r} for writing".format(self.name)
err = "can't open {!r} for writing".format(self.name)
raise error.SigMFFileError(e)
raise error.SigMFFileError(err)
return fileobj

5
sigmf/sigmf_hash.py

@ -28,8 +28,7 @@ def calculate_sha512(filename):
Returns sha512 of filename
"""
the_hash = hashlib.sha512()
with open(filename, "rb") as f:
for buff in iter(lambda: f.read(4096), b""):
with open(filename, "rb") as handle:
for buff in iter(lambda: handle.read(4096), b""):
the_hash.update(buff)
return the_hash.hexdigest()

276
sigmf/sigmffile.py

@ -23,11 +23,14 @@ import json
import tarfile
import tempfile
from os import path
import warnings
from six import iteritems
import numpy as np
from . import __version__, schema, sigmf_hash, validate
from .archive import SigMFArchive, SIGMF_DATASET_EXT, SIGMF_METADATA_EXT
from .archive import SigMFArchive, SIGMF_DATASET_EXT, SIGMF_METADATA_EXT, SIGMF_ARCHIVE_EXT
from .utils import dict_merge, insert_sorted_dict_list
from .error import SigMFFileError
class SigMFFile(object):
@ -45,6 +48,24 @@ class SigMFFile(object):
START_OFFSET_KEY = "core:offset"
HASH_KEY = "core:sha512"
VERSION_KEY = "core:version"
DATATYPE_KEY = "core:datatype"
FREQUENCY_KEY = "core:frequency"
FLO_KEY = "core:freq_lower_edge"
FHI_KEY = "core:freq_upper_edge"
SAMPLE_RATE_KEY = "core:sample_rate"
COMMENT_KEY = "core:comment"
DESCRIPTION_KEY = "core:description"
AUTHOR_KEY = "core:author"
META_DOI_KEY = "core:meta-doi"
DATA_DOI_KEY = "core:data-doi"
GENERATOR_KEY = "core:generator"
RECORDER_KEY = "core:recorder"
LICENSE_KEY = "core:license"
HW_KEY = "core:hw"
EXTENSIONS_KEY = "core:extensions"
DATETIME_KEY = "core:datetime"
LAT_KEY = "core:latitude"
LON_KEY = "core:longitude"
GLOBAL_KEY = "global"
CAPTURE_KEY = "captures"
ANNOTATION_KEY = "annotations"
@ -70,6 +91,7 @@ class SigMFFile(object):
self.data_file = data_file
if self.data_file:
self.calculate_hash()
self._count_samples()
def __str__(self):
return self.dumps()
@ -89,9 +111,9 @@ class SigMFFile(object):
Throws if not.
"""
schema_section = self.get_schema()[section_key]
for k, v in iteritems(entries):
for key, value in iteritems(entries):
validate.validate_key_throw(
v, schema_section.get(k, {}), schema_section, k
value, schema_section.get(key, {}), schema_section, key
)
def get_schema(self):
@ -160,6 +182,14 @@ class SigMFFile(object):
self.START_INDEX_KEY,
)
def get_captures(self):
"""
Returns a list of dictionaries representing all captures.
"""
return [
x for x in self._metadata.get(self.CAPTURE_KEY, [])
]
def get_capture_info(self, index):
"""
Returns a dictionary containing all the capture information at sample
@ -189,19 +219,55 @@ class SigMFFile(object):
self._metadata.get(self.ANNOTATION_KEY, []),
metadata,
self.START_INDEX_KEY,
force_insertion=True
)
def get_annotations(self, index):
def get_annotations(self, index=None):
"""
Returns a list of dictionaries.
Every dictionary contains one annotation for the sample at 'index'.
If no index is specified, all annotations are returned.
Keyword arguments:
index -- the criteria for selecting annotations; this sample index must be contained in each annotation that is returned
"""
return [
x for x in self._metadata.get(self.ANNOTATION_KEY, [])
if x[self.START_INDEX_KEY] <= index
and x[self.START_INDEX_KEY] + x[self.LENGTH_INDEX_KEY] > index
if index is None or (x[self.START_INDEX_KEY] <= index
and x[self.START_INDEX_KEY] + x[self.LENGTH_INDEX_KEY] > index)
]
def get_sample_size(self):
"""
Determines the size of a sample, in bytes, from the datatype of this set.
For complex data, a 'sample' includes both the real and imaginary part.
"""
return dtype_info(self.get_global_field(self.DATATYPE_KEY))['sample_size']
def _count_samples(self):
"""
Count, set, and return the total number of samples in the data file.
If there is no data file but there are annotations, use the end index
of the final annotation instead. If there are no annotations, use 0.
For complex data, a 'sample' includes both the real and imaginary part.
"""
annotations = self.get_annotations()
if self.data_file is None:
if len(annotations) > 0:
sample_count = annotations[-1][self.START_INDEX_KEY] + annotations[-1][self.LENGTH_INDEX_KEY]
else:
sample_count = 0
else:
file_size = path.getsize(self.data_file)
sample_size = self.get_sample_size()
sample_count = file_size // sample_size
if file_size % sample_size != 0:
warnings.warn("File '{}' does not contain an integral number of sample. It might not be valid data.".format(self.data_file))
if len(annotations) > 0 and annotations[-1][self.START_INDEX_KEY] + annotations[-1][self.LENGTH_INDEX_KEY] > sample_count:
warnings.warn("File '{}' ends before the final annotation in the corresponding SigMF metadata.".format(self.data_file))
self.sample_count = sample_count
return sample_count
def calculate_hash(self):
"""
Calculates the hash of the data file and adds it to the global section.
@ -212,9 +278,10 @@ class SigMFFile(object):
def set_data_file(self, data_file):
"""
Set the datafile path and recalculate the hash. Return the hash string.
Set the datafile path, then recalculate the hash and sample count. Return the hash string.
"""
self.data_file = data_file
self._count_samples()
return self.calculate_hash()
def validate(self):
@ -238,6 +305,7 @@ class SigMFFile(object):
json.dump(
self._metadata,
filep,
sort_keys=True if pretty else False,
indent=4 if pretty else None,
separators=(',', ': ') if pretty else None,
)
@ -264,29 +332,126 @@ class SigMFFile(object):
archive = SigMFArchive(self, name, fileobj)
return archive.path
def tofile(self, file_path, pretty=False, toarchive=False):
"""
Dump contents to file.
"""
fns = get_sigmf_filenames(file_path)
if toarchive:
self.archive(fns['archive_fn'])
else:
with open(fns['meta_fn'], 'w') as fp:
self.dump(fp, pretty=pretty)
def read_samples(self, start_index=0, count=1, autoscale=True, raw_components=False):
"""
Reads the specified number of samples starting at the specified index
from the associated data file.
Samples are returned as a NumPy array of type np.float32 (if real data)
or np.complex64.
Keyword arguments:
start_index -- starting sample index from which to read
count -- number of samples to read
autoscale -- if dataset is in a fixed-point representation, scale samples from (min, max) to (-1.0, 1.0)
raw_components -- if True, read and return the sample components (individual I and Q for complex, samples for real) with no conversions
"""
if count < 1:
raise IOError("Number of samples must be greater than zero.")
if start_index + count > self.sample_count:
raise IOError("Cannot read beyond EOF.")
if self.data_file is None:
raise SigMFFileError("No signal data file has been associated with the metadata.")
dtype = dtype_info(self.get_global_field(self.DATATYPE_KEY))
is_complex_data = dtype['is_complex']
is_fixedpoint_data = dtype['is_fixedpoint']
is_unsigned_data = dtype['is_unsigned']
data_type_in = dtype['sample_dtype']
component_type_in = dtype['component_dtype']
sample_size = dtype['sample_size']
component_size = dtype['component_size']
data_type_out = np.dtype("f4") if not is_complex_data else np.dtype("f4,f4")
fp = open(self.data_file, "rb")
fp.seek(start_index * sample_size, 0)
data = np.fromfile(fp, dtype=data_type_in, count=count)
if not raw_components:
data = data.astype(data_type_out)
if autoscale and is_fixedpoint_data:
data = data.view(np.dtype("f4"))
if is_unsigned_data:
data -= 2**(component_size*8-1)
data *= 2**-(component_size*8-1)
data = data.view(data_type_out)
if is_complex_data:
data = data.view(np.complex64)
else:
data = data.view(component_type_in)
def get_default_metadata(schema):
"""Return the minimal metadata that will pass the validator."""
def get_default_dict(keys_dict):
" Return a dict with all default values from keys_dict "
return {
key: desc.get("default")
for key, desc in iteritems(keys_dict)
if "default" in desc
}
fp.close()
return data
def default_category_data(cat_type, defaults):
" Return a valid data type for a category "
return {
'dict': lambda x: x,
'dict_list': lambda x: [x] if x else [],
}[cat_type](defaults)
def dtype_info(datatype):
"""
Parses a datatype string conforming to the SigMF spec and returns a dict
of values describing the format.
return {
category: default_category_data(desc["type"], get_default_dict(desc["keys"]))
for category, desc in iteritems(schema)
}
Keyword arguments:
datatype -- a SigMF-compliant datatype string
"""
output_info = {}
dtype = datatype.lower()
is_unsigned_data = "u" in datatype
is_complex_data = "c" in datatype
is_fixedpoint_data = "f" not in datatype
dtype = datatype.lower().split("_")
byte_order = ""
if len(dtype) == 2:
if dtype[1][0] == "l":
byte_order = "<"
elif dtype[1][0] == "b":
byte_order = ">"
else:
raise SigMFFileError("Unrecognized endianness specifier: '{}'".format(dtype[1]))
dtype = dtype[0]
if "32" in dtype:
sample_size = 4
elif "16" in dtype:
sample_size = 2
elif "8" in dtype:
sample_size = 1
else:
raise SigMFFileError("Unrecognized datatype: '{}'".format(dtype))
component_size = sample_size
if is_complex_data:
sample_size *= 2
sample_size = int(sample_size)
data_type_str = byte_order
data_type_str += "f" if not is_fixedpoint_data else "u" if is_unsigned_data else "i"
data_type_str += str(component_size)
if is_complex_data:
data_type_str = ','.join((data_type_str, data_type_str))
data_type_in = np.dtype(data_type_str)
output_info['sample_dtype'] = data_type_in
output_info['component_dtype'] = data_type_in['f0'] if is_complex_data else data_type_in
output_info['sample_size'] = sample_size
output_info['component_size'] = component_size
output_info['is_complex'] = is_complex_data
output_info['is_unsigned'] = is_unsigned_data
output_info['is_fixedpoint'] = is_fixedpoint_data
return output_info
def fromarchive(archive_path, dir=None):
"""Extract an archive and return a SigMFFile.
@ -320,3 +485,62 @@ def fromarchive(archive_path, dir=None):
archive.close()
return SigMFFile(metadata=metadata, data_file=data_file)
def fromfile(filename):
"""
Creates and returns a SigMFFile instance with metadata loaded from the specified file.
The filename may be that of either a sigmf-meta file, a sigmf-data file, or a sigmf archive.
Keyword arguments:
filename -- the SigMF filename
"""
fns = get_sigmf_filenames(filename)
meta_fn = fns['meta_fn']
data_fn = fns['data_fn']
archive_fn = fns['archive_fn']
if (filename.lower().endswith(SIGMF_ARCHIVE_EXT) or not path.isfile(meta_fn)) and path.isfile(archive_fn):
return fromarchive(archive_fn)
if not path.isfile(data_fn):
data_fn = None
meta_fp = open(meta_fn, "rb")
bytestream_reader = codecs.getreader("utf-8")
mdfile_reader = bytestream_reader(meta_fp)
metadata = json.load(mdfile_reader)
meta_fp.close()
return SigMFFile(metadata=metadata, data_file=data_fn)
def get_sigmf_filenames(filename):
"""
Safely returns a set of SigMF file paths given an input filename.
Returned as dict with 'data_fn', 'meta_fn', and 'archive_fn' as keys.
Keyword arguments:
filename -- the SigMF filename
"""
filename = path.splitext(filename)[0]
return {'data_fn': filename+SIGMF_DATASET_EXT, 'meta_fn': filename+SIGMF_METADATA_EXT, 'archive_fn': filename+SIGMF_ARCHIVE_EXT}
def get_default_metadata(schema):
"""Return the minimal metadata that will pass the validator."""
def get_default_dict(keys_dict):
" Return a dict with all default values from keys_dict "
return {
key: desc.get("default")
for key, desc in iteritems(keys_dict)
if "default" in desc
}
def default_category_data(cat_type, defaults):
" Return a valid data type for a category "
return {
'dict': lambda x: x,
'dict_list': lambda x: [x] if x else [],
}[cat_type](defaults)
return {
category: default_category_data(desc["type"], get_default_dict(desc["keys"]))
for category, desc in iteritems(schema)
}

28
sigmf/utils.py

@ -34,33 +34,35 @@ def get_sigmf_iso8601_datetime_now():
return datetime.isoformat(datetime.utcnow()) + 'Z'
def parse_iso8601_datetime(d):
return datetime.strptime(d, SIGMF_DATETIME_ISO8601_FMT)
def parse_iso8601_datetime(datestr):
return datetime.strptime(datestr, SIGMF_DATETIME_ISO8601_FMT)
def dict_merge(a, b):
def dict_merge(a_dict, b_dict):
"""
Recursively merge b into a. b[k] will overwrite a[k] if it exists.
Recursively merge b_dict into a_dict. b_dict[key] will overwrite a_dict[key] if it exists.
"""
if not isinstance(b, dict):
return b
result = deepcopy(a)
for k, v in iteritems(b):
if k in result and isinstance(result[k], dict):
result[k] = dict_merge(result[k], v)
if not isinstance(b_dict, dict):
return b_dict
result = deepcopy(a_dict)
for key, value in iteritems(b_dict):
if key in result and isinstance(result[key], dict):
result[key] = dict_merge(result[key], value)
else:
result[k] = deepcopy(v)
result[key] = deepcopy(value)
return result
def insert_sorted_dict_list(dict_list, new_entry, key):
def insert_sorted_dict_list(dict_list, new_entry, key, force_insertion=False):
"""
Insert new_entry (which must be a dict) into a sorted list of other dicts.
If force_insertion is True, new_entry will NOT overwrite an existing entry
with the same key.
Returns the new list, which is still sorted.
"""
for index, entry in enumerate(dict_list):
if not entry:
continue
if entry[key] == new_entry[key]:
if entry[key] == new_entry[key] and not force_insertion:
dict_list[index] = dict_merge(entry, new_entry)
return dict_list
if entry[key] > new_entry[key]:

4
sigmf/version.py

@ -0,0 +1,4 @@
'''
This is the only place SigMF version is defined.
'''
__version__ = '0.0.2'

20
tests/conftest.py

@ -29,17 +29,17 @@ from .testdata import TEST_FLOAT32_DATA, TEST_METADATA
@pytest.yield_fixture
def test_data_file():
with tempfile.NamedTemporaryFile() as t:
TEST_FLOAT32_DATA.tofile(t.name)
yield t
with tempfile.NamedTemporaryFile() as temp:
TEST_FLOAT32_DATA.tofile(temp.name)
yield temp
@pytest.fixture
def test_sigmffile(test_data_file):
f = SigMFFile()
f.set_global_field("core:datatype", "f32")
f.add_annotation(start_index=0, length=len(TEST_FLOAT32_DATA))
f.add_capture(start_index=0)
f.set_data_file(test_data_file.name)
assert f._metadata == TEST_METADATA
return f
sigf = SigMFFile()
sigf.set_global_field("core:datatype", "rf32_le")
sigf.add_annotation(start_index=0, length=len(TEST_FLOAT32_DATA))
sigf.add_capture(start_index=0)
sigf.set_data_file(test_data_file.name)
assert sigf._metadata == TEST_METADATA
return sigf

59
tests/test_archive.py

@ -1,6 +1,5 @@
import codecs
import json
import os
import tarfile
import tempfile
from os import path
@ -22,32 +21,32 @@ def create_test_archive(test_sigmffile, tmpfile):
def test_without_data_file_throws_fileerror(test_sigmffile):
test_sigmffile.data_file = None
with tempfile.NamedTemporaryFile() as t:
with tempfile.NamedTemporaryFile() as temp:
with pytest.raises(error.SigMFFileError):
test_sigmffile.archive(name=t.name)
test_sigmffile.archive(name=temp.name)
def test_invalid_md_throws_validationerror(test_sigmffile):
del test_sigmffile._metadata["global"]["core:datatype"] # required field
with tempfile.NamedTemporaryFile() as t:
with tempfile.NamedTemporaryFile() as temp:
with pytest.raises(error.SigMFValidationError):
test_sigmffile.archive(name=t.name)
test_sigmffile.archive(name=temp.name)
def test_name_wrong_extension_throws_fileerror(test_sigmffile):
with tempfile.NamedTemporaryFile() as t:
with tempfile.NamedTemporaryFile() as temp:
with pytest.raises(error.SigMFFileError):
test_sigmffile.archive(name=t.name + ".zip")
test_sigmffile.archive(name=temp.name + ".zip")
def test_fileobj_extension_ignored(test_sigmffile):
with tempfile.NamedTemporaryFile(suffix=".tar") as t:
test_sigmffile.archive(fileobj=t)
with tempfile.NamedTemporaryFile(suffix=".tar") as temp:
test_sigmffile.archive(fileobj=temp)
def test_name_used_in_fileobj(test_sigmffile):
with tempfile.NamedTemporaryFile() as t:
sigmf_archive = test_sigmffile.archive(name="testarchive", fileobj=t)
with tempfile.NamedTemporaryFile() as temp:
sigmf_archive = test_sigmffile.archive(name="testarchive", fileobj=temp)
sigmf_tarfile = tarfile.open(sigmf_archive, mode="r")
basedir, file1, file2 = sigmf_tarfile.getmembers()
assert basedir.name == "testarchive"
@ -61,26 +60,28 @@ def test_name_used_in_fileobj(test_sigmffile):
def test_fileobj_not_closed(test_sigmffile):
with tempfile.NamedTemporaryFile() as t:
test_sigmffile.archive(fileobj=t)
assert not t.file.closed
with tempfile.NamedTemporaryFile() as temp:
test_sigmffile.archive(fileobj=temp)
assert not temp.file.closed
def test_unwritable_fileobj_throws_fileerror(test_sigmffile):
with tempfile.NamedTemporaryFile(mode="rb") as t:
with tempfile.NamedTemporaryFile(mode="rb") as temp:
with pytest.raises(error.SigMFFileError):
test_sigmffile.archive(fileobj=t)
test_sigmffile.archive(fileobj=temp)
def test_unwritable_name_throws_fileerror(test_sigmffile):
unwritable_file = "/root/unwritable.sigmf" # assumes root is unwritable
# Cannot assume /root/ is unwritable (e.g. Docker environment)
# so use invalid filename
unwritable_file = '/bad_name/'
with pytest.raises(error.SigMFFileError):
test_sigmffile.archive(name=unwritable_file)
def test_tarfile_layout(test_sigmffile):
with tempfile.NamedTemporaryFile() as t:
sigmf_tarfile = create_test_archive(test_sigmffile, t)
with tempfile.NamedTemporaryFile() as temp:
sigmf_tarfile = create_test_archive(test_sigmffile, temp)
basedir, file1, file2 = sigmf_tarfile.getmembers()
assert tarfile.TarInfo.isdir(basedir)
assert tarfile.TarInfo.isfile(file1)
@ -88,11 +89,11 @@ def test_tarfile_layout(test_sigmffile):
def test_tarfile_names_and_extensions(test_sigmffile):
with tempfile.NamedTemporaryFile() as t:
sigmf_tarfile = create_test_archive(test_sigmffile, t)
with tempfile.NamedTemporaryFile() as temp:
sigmf_tarfile = create_test_archive(test_sigmffile, temp)
basedir, file1, file2 = sigmf_tarfile.getmembers()
archive_name = basedir.name
assert archive_name == path.split(t.name)[-1]
assert archive_name == path.split(temp.name)[-1]
file_extensions = {SIGMF_DATASET_EXT, SIGMF_METADATA_EXT}
file1_name, file1_ext = path.splitext(file1.name)
@ -107,8 +108,8 @@ def test_tarfile_names_and_extensions(test_sigmffile):
def test_tarfile_persmissions(test_sigmffile):
with tempfile.NamedTemporaryFile() as t:
sigmf_tarfile = create_test_archive(test_sigmffile, t)
with tempfile.NamedTemporaryFile() as temp:
sigmf_tarfile = create_test_archive(test_sigmffile, temp)
basedir, file1, file2 = sigmf_tarfile.getmembers()
assert basedir.mode == 0o755
assert file1.mode == 0o644
@ -116,8 +117,8 @@ def test_tarfile_persmissions(test_sigmffile):
def test_contents(test_sigmffile):
with tempfile.NamedTemporaryFile() as t:
sigmf_tarfile = create_test_archive(test_sigmffile, t)
with tempfile.NamedTemporaryFile() as temp:
sigmf_tarfile = create_test_archive(test_sigmffile, temp)
basedir, file1, file2 = sigmf_tarfile.getmembers()
if file1.name.endswith(SIGMF_METADATA_EXT):
mdfile = file1
@ -133,12 +134,12 @@ def test_contents(test_sigmffile):
datfile_reader = sigmf_tarfile.extractfile(datfile)
# calling `fileno` on `tarfile.ExFileObject` throws error (?), but
# np.fromfile requires it, so we need this extra step
data = np.fromstring(datfile_reader.read(), dtype=np.float32)
data = np.frombuffer(datfile_reader.read(), dtype=np.float32)
assert np.array_equal(data, TEST_FLOAT32_DATA)
def test_tarfile_type(test_sigmffile):
with tempfile.NamedTemporaryFile() as t:
sigmf_tarfile = create_test_archive(test_sigmffile, t)
with tempfile.NamedTemporaryFile() as temp:
sigmf_tarfile = create_test_archive(test_sigmffile, temp)
assert sigmf_tarfile.format == tarfile.PAX_FORMAT

22
tests/test_sigmffile.py

@ -52,20 +52,20 @@ def test_default_constructor():
def test_set_non_required_global_field():
f = SigMFFile()
f.set_global_field('this_is:not_in_the_schema', None)
sigf = SigMFFile()
sigf.set_global_field('this_is:not_in_the_schema', None)
def test_add_capture():
f = SigMFFile()
f.add_capture(start_index=0, metadata={})
sigf = SigMFFile()
sigf.add_capture(start_index=0, metadata={})
def test_add_annotation():
f = SigMFFile()
f.add_capture(start_index=0)
m = {"latitude": 40.0, "longitude": -105.0}
f.add_annotation(start_index=0, length=128, metadata=m)
sigf = SigMFFile()
sigf.add_capture(start_index=0)
meta = {"latitude": 40.0, "longitude": -105.0}
sigf.add_annotation(start_index=0, length=128, metadata=meta)
def test_fromarchive(test_sigmffile):
@ -84,6 +84,6 @@ def test_fromarchive(test_sigmffile):
def test_add_multiple_captures_and_annotations():
f = SigMFFile()
for i in range(3):
simulate_capture(f, i, 1024)
sigf = SigMFFile()
for idx in range(3):
simulate_capture(sigf, idx, 1024)

11
tests/test_validation.py

@ -18,18 +18,15 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import pytest
from sigmf.error import SigMFValidationError
from sigmf.sigmffile import SigMFFile
from sigmf import __version__
MD_VALID = """
{
"global": {
"core:datatype": "cf32",
"core:offset": 0,
"core:version": "0.0.1",
"core:version": "X.X.X",
"core:license": "CC0",
"core:date": "foo",
"core:url": "foo",
@ -59,6 +56,7 @@ MD_VALID = """
]
}
"""
MD_VALID = MD_VALID.replace("X.X.X", __version__)
MD_INVALID_SEQUENCE_CAP = """
{
@ -76,6 +74,7 @@ MD_INVALID_SEQUENCE_CAP = """
"annotations": [
{
"core:sample_start": 100000,
"core:sample_count": 120000,
"core:comment": "stuff"
}
]
@ -95,10 +94,12 @@ MD_INVALID_SEQUENCE_ANN = """
"annotations": [
{
"core:sample_start": 2,
"core:sample_count": 120000,
"core:comment": "stuff"
},
{
"core:sample_start": 1,
"core:sample_count": 120000,
"core:comment": "stuff"
}
]

6
tests/testdata.py

@ -22,7 +22,7 @@
import numpy as np
from sigmf import __version__
TEST_FLOAT32_DATA = np.arange(16, dtype=np.float32)
@ -30,8 +30,8 @@ TEST_METADATA = {
'annotations': [{'core:sample_count': 16, 'core:sample_start': 0}],
'captures': [{'core:sample_start': 0}],
'global': {
'core:datatype': 'f32',
'core:datatype': 'rf32_le',
'core:sha512': 'f4984219b318894fa7144519185d1ae81ea721c6113243a52b51e444512a39d74cf41a4cec3c5d000bd7277cc71232c04d7a946717497e18619bdbe94bfeadd6',
'core:version': '0.0.1'
'core:version': __version__
}
}
Loading…
Cancel
Save