Source code for mbs.io

import os
import re
import io
import numpy as np
import datetime
import zipfile
import gzip
import warnings
from contextlib import contextmanager
from collections import OrderedDict
from collections.abc import Iterable

def mbs_timestamp(s):
    """Parse a day-first, 24-hour timestamp such as ``"31/12/2020   23:59"``.

    Surrounding whitespace is stripped first. Raises ``ValueError`` when the
    text does not match the format.
    """
    return datetime.datetime.strptime(s.strip(), "%d/%m/%Y   %H:%M")


def mbs_timestamp2(s):
    """Parse a month-first, 12-hour timestamp such as ``"12/31/2020   11:59 PM"``.

    Surrounding whitespace is stripped first. Raises ``ValueError`` when the
    text does not match the format.
    """
    return datetime.datetime.strptime(s.strip(), "%m/%d/%Y   %I:%M %p")


def info_timestamp(s):
    """Parse a day-first timestamp with seconds such as ``"31/12/2020 23:59:59"``.

    Surrounding whitespace is stripped first. Raises ``ValueError`` when the
    text does not match the format.
    """
    return datetime.datetime.strptime(s.strip(), "%d/%m/%Y %H:%M:%S")


# One millisecond, expressed as a timedelta.
frame_unit = datetime.timedelta(seconds=0.001)

# "<anything><5-digit number>_<5-digit region>.txt".  The dot is escaped so that
# only a literal ".txt" extension is accepted (an unescaped "." would match any
# character, e.g. "00001_00002xtxt").
fname_re = re.compile(r'.*(?P<number>\d{5})_(?P<region>\d{5})\.txt')

# "<name>: <value> (<unit>)" where the parenthesised unit is optional.
# Group 1 is the name, group 2 the value text, group 4 the unit (or None).
info_re = re.compile(r"^([^:]+):\s*([^(]+)\s*(\(([^)]+)\))?")


def is_mbs_filename(path):
    """Return True when the final component of *path* matches ``fname_re``.

    Only the base name is tested, so any leading directories are ignored.
    """
    base = os.path.basename(path)
    return fname_re.fullmatch(base) is not None
def mbs_boolean(s):
    """Convert the strings ``'yes'`` / ``'no'`` (any letter case) to a bool.

    Raises ``ValueError`` for any other text.
    """
    flag = {'yes': True, 'no': False}.get(s.lower())
    if flag is None:
        raise ValueError
    return flag
@contextmanager
def load(fname, zip_fname=None, mode='rt'):
    """Context manager that opens regular files, gzipped files and files
    contained within zip archives (e.g. archived measurements).

    Parameters
    ----------
    fname : str
        Path of the file to open or, when *zip_fname* is given, the name of
        the member inside that archive. A name ending in ``.gz`` is opened
        with :func:`gzip.open` (only when *zip_fname* is None).
    zip_fname : str, optional
        Path of a zip archive that contains *fname*.
    mode : str
        Mode string as for :func:`open`. For archive members only the ``r`` /
        ``w`` characters are forwarded to ``ZipFile.open``; unless the mode
        contains ``b`` the member is wrapped in an ``io.TextIOWrapper``.

    Yields
    ------
    The opened file object; it is closed when the ``with`` block exits.

    Raises
    ------
    IOError
        If *zip_fname* has no member called *fname* (the original
        ``KeyError`` is attached as ``__cause__``).
    """
    if zip_fname is None:
        open_func = gzip.open if fname.endswith('.gz') else open
        with open_func(fname, mode) as f:
            yield f
        return

    with zipfile.ZipFile(zip_fname) as zip_f:
        try:
            member = zip_f.getinfo(fname)
        except KeyError as err:
            raise IOError(f'{zip_fname} does not contain {fname}') from err
        # ZipFile.open() understands only 'r' / 'w', so drop 't' / 'b'.
        zip_mode = ''.join(m for m in mode if m in ('r', 'w'))
        with zip_f.open(member, zip_mode) as raw:
            if 'b' in mode:
                yield raw
            else:
                # r -> rt as normal open()
                with io.TextIOWrapper(raw) as text:
                    yield text
# from https://stackoverflow.com/a/2437645/
class LimitedSizeDict(OrderedDict):
    """Ordered dictionary that evicts its oldest entries beyond ``size_limit``.

    Used as the measurement cache. ``size_limit`` is taken from the keyword
    arguments of the constructor; ``None`` (the default) means unbounded.
    """
    # todo: separate caches and sizes for metadata and data

    def __init__(self, *args, **kwds):
        # size_limit has to exist before the base initialiser runs, because
        # filling the dict goes through __setitem__ below.
        self.size_limit = kwds.pop("size_limit", None)
        super().__init__(*args, **kwds)
        self._check_size_limit()

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self._check_size_limit()

    def _check_size_limit(self):
        """Drop entries in insertion order until the size limit is respected."""
        limit = self.size_limit
        if limit is None:
            return
        while len(self) > limit:
            retired, _ = self.popitem(last=False)
            print(f'INFO: Retiring spectrum <{retired}> from IO cache')
def parse_lines(lines, metadata_only=False):
    """Parse the lines of a spectrum text file into data and metadata.

    Lines before a line starting with ``DATA:`` are tab-separated
    ``name<TAB>value`` metadata pairs; lines after it are whitespace-separated
    numeric rows.

    Parameters
    ----------
    lines : iterable of str
        The lines of the file (e.g. an open text file).
    metadata_only : bool
        If True, stop at the ``DATA:`` marker (or at the end of the input when
        there is no marker) and return only the metadata.

    Returns
    -------
    metadata : OrderedDict
        Only this is returned when *metadata_only* is True. Values are
        converted to int, float, datetime or bool where possible and are left
        as stripped strings otherwise.
    (data, metadata) : tuple
        Returned otherwise. ``data`` is a ``uint32`` array; when the rows have
        a different number of columns than ``metadata['NoS']``, the first
        column is checked against the energy scale and dropped.

    Raises
    ------
    ValueError
        If a non-blank metadata line contains no tab separator.
    """
    in_data = False
    data = []
    metadata = OrderedDict()
    for line in lines:
        if in_data:
            fields = line.split()
            # Blank lines (e.g. a trailing newline at the end of the file)
            # would otherwise produce an empty, ragged row.
            if fields:
                data.append([float(x) for x in fields])
            continue

        if line.startswith('DATA:'):
            if metadata_only:
                return metadata
            in_data = True
            continue

        # fix for old files
        if line.startswith('TIMESTAMP:') and not line.startswith('TIMESTAMP:\t'):
            line = line.replace('TIMESTAMP:', 'TIMESTAMP:\t', 1)

        name, sep, val = line.partition('\t')
        if not sep:
            if not line.strip():
                continue  # blank line between metadata entries
            raise ValueError(
                f'Malformed metadata line (no tab separator): {line!r}')
        val = val.strip()
        if not name and not val:
            continue

        # Try the converters in order and keep the first that accepts the text;
        # every one of them signals rejection of a string with ValueError.
        for T in (int, float, mbs_timestamp, mbs_timestamp2, mbs_boolean):
            try:
                val = T(val)
                break
            except ValueError:
                continue

        if name in metadata and metadata[name] != val:
            warnings.warn(
                f'Duplicate field {name} in metadata, '
                f'overwriting previous value {metadata[name]} with {val}')
        metadata[name] = val

    if metadata_only:
        # The input had no DATA: marker; the metadata is still complete.
        return metadata

    n_columns = len(data[0])
    if metadata['NoS'] != n_columns:
        # The rows carry an extra leading energy column.
        assert metadata['NoS'] == n_columns - 1 or n_columns == 2  # resolved or integrated mode
        e_scale = np.linspace(metadata["Start K.E."],
                              metadata["End K.E."] - metadata['Step Size'],
                              len(data))
        assert np.allclose(e_scale, np.array(data)[:, 0])
        return np.array(data, dtype='uint32')[:, 1:], metadata
    return np.array(data, dtype='uint32'), metadata
# Module-wide cache of parse results, keyed by (fname, metadata_only, zip_fname).
# Each entry stores the result together with the modification time it was read at.
io_cache = LimitedSizeDict(size_limit=128)


def parse_data(fname, metadata_only=False, zip_fname=None):
    """Parse a file through ``parse_lines``, reusing ``io_cache`` when possible.

    A cached result is returned only if the modification time of the file on
    disk (the zip archive when *zip_fname* is given, otherwise *fname*) still
    equals the one recorded when the entry was stored; otherwise the file is
    parsed again and the cache entry is replaced.
    """
    cache_key = (fname, metadata_only, zip_fname)
    source = zip_fname or fname
    mtime = os.path.getmtime(source)

    entry = io_cache.get(cache_key)
    if entry is not None:
        cached_rv, cached_mtime = entry
        if mtime == cached_mtime:
            return cached_rv
        print('File {} changed on disk, reloading...'.format(source))

    with load(fname, zip_fname) as f:
        rv = parse_lines(f, metadata_only=metadata_only)
    io_cache[cache_key] = (rv, mtime)
    return rv
def parse_info(fname, zip_fname=None):
    """Parse an info file of ``name: value (unit)`` lines.

    Parameters
    ----------
    fname : str
        Path of the file, or member name inside *zip_fname*.
    zip_fname : str, optional
        Path of a zip archive containing *fname*.

    Returns
    -------
    OrderedDict
        Maps each quantity name to a ``(value, unit)`` tuple. ``value`` is
        converted to int, float or datetime where possible and is otherwise
        the whitespace-stripped text; ``unit`` is None when the line has no
        parenthesised unit.

    Blank lines are skipped. Non-blank lines that do not match ``info_re``
    are skipped with a warning instead of failing on a ``None`` match.
    """
    info = OrderedDict()
    with load(fname, zip_fname) as f:
        for line in f:
            if not line.strip():
                continue
            match = info_re.match(line)
            if match is None:
                warnings.warn(f'Skipping unparseable info line: {line!r}')
                continue
            quantity, value, unit = match.group(1, 2, 4)
            # The value group also captures the whitespace before the unit
            # (or the trailing newline), so trim it before converting.
            value = value.strip()
            for T in (int, float, info_timestamp):
                try:
                    value = T(value)
                    break
                except ValueError:
                    continue
            info[quantity] = (value, unit)
    return info
class MBSFilePathGenerator:
    """Build paths of the form ``<prefix><number:05d>_<region:05d>.txt``.

    Parameters
    ----------
    prefix : str
        Literal text preceding the five-digit file number.
    directory : str, optional
        Directory containing the files; defaults to the current directory.
    zip_fname : str, optional
        Stored on the instance as ``zip_fname``; not used by ``__call__``.
    """

    def __init__(self, prefix, directory=None, zip_fname=None):
        self.prefix = prefix
        self.directory = directory or ""
        self.zip_fname = zip_fname

    def __call__(self, number, region=None):
        """Return the path(s) for *number* and *region*.

        * If *number* (or, failing that, *region*) is iterable, a list with
          one result per element is returned.
        * If *region* is given, the path is built without touching the disk.
        * If *region* is None, the directory is searched for every region of
          *number*: a single match is returned as a string, several matches
          as a sorted list.

        Raises
        ------
        FileNotFoundError
            If *region* is None and no matching file exists.
        """
        if isinstance(number, Iterable):
            return [self(n, region) for n in number]
        if isinstance(region, Iterable):
            return [self(number, r) for r in region]

        if region is not None:
            fname = "{}{:05d}_{:05d}.txt".format(self.prefix, number, region)
            return os.path.join(self.directory, fname)

        # The prefix is literal text, so escape it (and the extension dot)
        # before using it in a regular expression.
        num_re = re.compile(
            r'{}{:05d}_\d{{5}}\.txt'.format(re.escape(self.prefix), number))
        # os.listdir() returns entries in arbitrary order; sort for a
        # reproducible result.
        paths = sorted(p for p in os.listdir(self.directory or '.')
                       if num_re.fullmatch(p))
        if self.directory:
            paths = [os.path.join(self.directory, p) for p in paths]
        if not paths:
            raise FileNotFoundError(
                'No files found for {}{:05d}_#####.txt'.format(self.prefix, number))
        if len(paths) == 1:
            return paths[0]
        return paths