Source code for edge_pydb

from ._astropy_init import __version__
from . import beam_sample, conversion, fitsextract, plotting, util
from astropy.table import Table as _Table
from astropy.table import join as _join


'''
Definition of the EdgeTable class.  An EdgeTable is a regular Astropy table read
from a file in either ECSV or HDF5 format.  For an HDF5 file, the path within the
file must be specified; otherwise a list of the available paths is printed.
The keyword 'cols' allows a subset of the available columns to be read in.
'''


class EdgeTable(_Table):
    """
    Definition of the EdgeTable class.  An EdgeTable is a regular Astropy
    table read from a file in either ECSV or HDF5 format.

    Parameters
    ----------
    file : str
        Name of the file.  This is generally NOT a full path, as the package
        maintains a list of available tables.  The name should end with
        .csv or .hdf5.  Use 'list' to provide a listing of available tables.
    path : str
        Name of the path within the file, for HDF5 files only.  If not
        given, a list of available paths is output.
    cols : list of str
        A list of columns to read in.  Other columns are discarded.
    data, masked, names, dtype, meta, copy, rows, copy_indices, **kwargs
        Accepted so the signature mirrors the Astropy ``Table`` constructor.
        Only ``masked`` is forwarded to the base class; the others are not
        used by this constructor.

    Attributes
    ----------
    table : astropy.table.Table
        The table most recently read or joined.  Only set once a file has
        actually been loaded.
    srcfile : str
        The ``file`` argument given at construction.
    path : str
        The ``path`` argument given at construction.
    joined : list of tuple
        One ``(srcfile, join_type)`` entry per call to `join`.
    """

    def __init__(self, file='', path='', cols=None, data=None, masked=None,
                 names=None, dtype=None, meta=None, copy=True, rows=None,
                 copy_indices=True, **kwargs):
        super().__init__(masked=masked)
        if file:
            if file == 'list':
                print("Choose from the following files to read:")
                util.listfiles(printing=True)
            elif file.endswith('csv'):
                self.read(file)
            elif path:
                self.read(file, path)
            else:
                # no path specified with hdf5 file
                print('Paths in',file,':\n', util.getPath(file))
            # The 'list' and missing-path branches above load nothing, so
            # only trim columns when a table is really there.
            loaded = getattr(self, 'table', None)
            if cols and loaded is not None:
                loaded.keep_columns(cols)
                self.__dict__.update(loaded.__dict__)
        self.srcfile = file
        self.path = path
        self.joined = []

    def read(self, file, path=''):
        """
        Load a table from `file` and make it the content of this object.

        Parameters
        ----------
        file : str
            Name of the file, resolved through ``util.fetch``.  Names ending
            in 'csv' are read as ECSV, falling back to plain CSV when the
            ECSV reader raises ``ValueError``.  Any other name is read with
            the ``path`` keyword.
        path : str
            Path within the file for non-CSV files.  If empty, the available
            paths are printed and nothing is loaded.
        """
        # Same suffix test as __init__, so an HDF5 file whose name merely
        # contains "csv" is not sent to the ASCII readers.
        if file.endswith('csv'):
            try:
                self.table = _Table.read(util.fetch(file),
                                         format='ascii.ecsv')
            except ValueError:
                self.table = _Table.read(util.fetch(file),
                                         format='ascii.csv')
        elif path:
            self.table = _Table.read(util.fetch(file), path=path)
        else:
            # Nothing can be read without a path; report the choices the
            # same way the constructor does and leave the state untouched.
            print('Paths in',file,':\n', util.getPath(file))
            return
        # Adopt the loaded table's internals as this object's own.
        self.__dict__.update(self.table.__dict__)

    def join(self, table, join_type='inner', keys=None):
        """
        Join another table onto this one, replacing ``self.table``.

        Parameters
        ----------
        table : EdgeTable or astropy.table.Table
            The table to join with.
        join_type : str
            Passed through to ``astropy.table.join``.
        keys : str or list of str, optional
            Column name(s) to join on.  When given, any of 'Name', 'ix' and
            'iy' that are present in both tables are added to the join keys.
            When None, it is passed to ``astropy.table.join`` unchanged.
            The caller's list is never modified.

        Raises
        ------
        TypeError
            If `table` is neither an EdgeTable nor an Astropy table.
        """
        # Validate the argument first so an unsupported type gets the
        # intended error rather than failing on a missing attribute.
        if isinstance(table, self.__class__):
            other = table.table
        elif isinstance(table, _Table):
            other = table
        else:
            raise TypeError('cannot merge the two tables, the second table '
                            'data type is neither EdgeTable nor astropy '
                            'table.')

        if keys is not None:
            # Build a private list: appending to the caller's list would
            # leak the extra keys back out, and a bare string cannot be
            # appended to at all.
            keys = [keys] if isinstance(keys, str) else list(keys)
            # check for Name, ix or iy in both tables
            for key in ('Name', 'ix', 'iy'):
                if (key in self.colnames and key in table.colnames
                        and key not in keys):
                    keys.append(key)

        self.table = _join(self.table, other, join_type=join_type, keys=keys)
        # A plain Astropy table is not given a srcfile by this class;
        # record an empty name for it.
        self.joined.append((getattr(table, 'srcfile', ''), join_type))
        # update the data
        self.__dict__.update(self.table.__dict__)