Source code for jupyter_analysis_tools.binning

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Overview
========
1D rebinning.
Should take input file, read, rebin and write.
Rebins to log bins.
"""

# __author__ = "Brian R. Pauw"
# __contact__ = "brian@stack.nl"
# __license__ = "GPLv3+"
# __date__ = "2015/01/09"
# __status__ = "beta"

import argparse
import itertools
import os
import sys

import numpy as np
import pandas
from numpy import argsort, log10, reshape, shape, size, sqrt, zeros


[docs] def argparser(): parser = argparse.ArgumentParser( description=""" Re-binning function, reads three-column ASCII input files, and outputs re-binned three-column ASCII files""" ) # binning options parser.add_argument("-n", "--numBins", type=int, default=50, help="Number of bins to use") parser.add_argument( "-q", "--qMin", type=float, default=0.0, help="Minimum Q to clip from original data", ) parser.add_argument( "-Q", "--qMax", type=float, default=np.inf, help="Minimum Q to clip from original data", ) parser.add_argument( "-e", "--minE", type=float, default=0.01, help="Minimum error is at least this times intensity value.", ) parser.add_argument( "-s", "--scaling", type=str, action="store", default="logarithmic", help="q-axis scaling for binning, can be linear or logarithmic", ) # csv / datafile options parser.add_argument( "-d", "--delimiter", type=str, action="store", default=",", help="Delimiter in original file. '\\t' is tab. (with quotes)", ) parser.add_argument( "-H", "--headerLines", type=int, default=0, help="Number of header lines to skip", ) parser.add_argument( "-D", "--outputDelimiter", type=str, action="store", default=None, help="Delimiter in final file (defaults to input delimiter)", ) parser.add_argument( "-c", "--cleanEmpty", action="store_true", default=True, help="Removes empty bins before writing", ) parser.add_argument( "-i", "--iScale", type=float, default=1.0, help="Intensity (and error) scaled by this factor on output.", ) # program options parser.add_argument( "-v", "--verbose", action="store_true", help="Be verbose about the steps", ) parser.add_argument( "-t", "--test", action="store_true", help="Do not save output files, test run only", ) parser.add_argument( "-N", "--noBin", action="store_true", help="Do not bin, just input -> output (for translation and scaling)", ) parser.add_argument( "fnames", nargs="*", metavar="FILENAME", action="store", help="One or more data files to rebin", ) # show help if no files were provided, no arguments at all args = parser.parse_args() if len(args.fnames): return args parser.print_help(sys.stderr) sys.exit(1)
[docs] class reBin(object): """all kinds of binning-related functions""" # set defaults for file reading: pandasArgs = { "skipinitialspace": True, "skip_blank_lines": True, "engine": "python", "header": None, } # set defaults for kwargs, in case this is not called from command line: reBinArgs = { "delimiter": ";", "outputDelimiter": ";", "headerLines": 0, "fnames": "", "verbose": False, "qMin": -np.inf, "qMax": np.inf, "numBins": 100, "scaling": "logarithmic", "cleanEmpty": False, "minE": 0.01, "noBin": False, }
[docs] def __init__(self, **kwargs): # process defaults: for kw in self.reBinArgs: setattr(self, kw, self.reBinArgs[kw]) # process kwargs: if "verbose" in kwargs: self.verbose = kwargs.pop("verbose") for kw in kwargs: if self.verbose: print("Processing input argument {}: {}".format(kw, kwargs[kw])) setattr(self, kw, kwargs[kw]) # process delimiter options # decode no longer necessary in python 3 if sys.version_info <= (3, 0): self.delimiter = self.delimiter.decode("string-escape") if self.outputDelimiter is None: self.outputDelimiter = self.delimiter else: if sys.version_info <= (3, 0): self.outputDelimiter = self.outputDelimiter.decode("string-escape") self.pandasArgs.update({"delimiter": self.delimiter, "skiprows": self.headerLines}) # process files individually: for filename in self.fnames: self.readFile(filename) self.validate() self.defineBinEdges() self.binning1D() if self.cleanEmpty: # removes bins with no intensity or error self.cleanup() if not self.test: # generate output file name ofname = self.outputFilename(filename) # write binned data to file name self.writeFile(ofname)
def cleanup(self): # removes unwanted bin values # cannot use lists, because: # http://unspecified.wordpress.com/2009/02/12/thou-shalt-not-modify-a-list-during-iteration validi = True ^ np.isnan(self.IBin) validi[np.argwhere(self.binMask > 0)] = False self.QBin = self.QBin[validi] self.IBin = self.IBin[validi] self.EBin = self.EBin[validi] self.QEBin = self.QEBin[validi] if self.verbose: print("valid bins: {} of {}".format(validi.sum(), len(validi)))
[docs] def outputFilename(self, filename): """returns an output filename based on the input filename""" of = filename.strip() # split at extension ob, oe = of.rsplit(".", 1) # add rebin tag and reassemble ofname = "{}_reBin.{}".format(ob, oe) if self.verbose: print("output filename: {}".format(ofname)) return ofname
def readFile(self, filename): if self.verbose: print("reading file: {} with settings: {}".format(filename, self.pandasArgs)) dval = pandas.read_csv(filename, **self.pandasArgs).values assert isinstance(dval, np.ndarray) # no problems reading? assert size(dval, axis=1) >= 3 # Q, I and E can be extracted if self.verbose: print("data read: {}".format(dval)) self.Q = np.float32(dval[:, 0]) self.I = np.float32(dval[:, 1]) self.E = np.maximum(self.minE * self.I, np.float32(dval[:, 2])) numChanged = (self.minE * self.I > dval[:, 2]).sum() if self.verbose: print( "Minimum uncertainty set for {} out of {} ({} %) datapoints".format( numChanged, size(self.Q), 100.0 * numChanged / size(self.Q) ) ) # writer modified from imp2/modules/Write1D def writeFile(self, ofname, hstrs=None, append=False): sep = self.outputDelimiter # scale if necessary iterData = itertools.zip_longest( self.QBin, self.IBin * float(self.iScale), self.EBin * float(self.iScale), ) def writeLine(filename, line=None, append=True): if append: openarg = "a" else: openarg = "w" with open(filename, openarg) as fh: if isinstance(line, str): fh.write(line) else: # iterable object containing multiple lines fh.writelines(line) # truncate file if exists (i.e. discard) if os.path.exists(ofname) and (not append): os.remove(ofname) # write header and data: if hstrs is not None: writeLine(ofname, hstrs) # store in file moreData = True while moreData: try: # generate formatted datastring containing column data wstr = sep.join(["{}".format(k) for k in next(iterData)]) + "\n" except StopIteration: # end of data reached moreData = False break writeLine(ofname, wstr)
[docs] def validate(self): """Applies limits to the data""" mask = zeros(shape(self.Q), dtype="bool") # appy integration limits: iind = np.array(((self.Q < self.qMin) + (self.Q > self.qMax)), dtype=bool) mask[iind] = True # define binning limits (qmin, qmax) = ( np.abs(self.Q[True ^ mask]).min(), np.abs(self.Q[True ^ mask]).max(), ) self.iqMin = np.maximum(qmin, self.qMin) self.iqMax = np.minimum(qmax, self.qMax) self.Q = self.Q[True ^ mask] self.I = self.I[True ^ mask] self.E = self.E[True ^ mask] if self.verbose: print( "data Q-range: {}, integration Q-range: {}, masked: {} of {} ({}%)".format( (self.Q.min(), self.Q.max()), (self.iqMin, self.iqMax), mask.sum(), self.Q.size, mask.sum() / self.Q.size, ) )
[docs] def defineBinEdges(self): """defines binning edges""" # define bin edges if self.scaling.lower() in ("linear", "lin"): qEdges = np.linspace(self.iqMin, self.iqMax, self.numBins + 1) else: qEdges = np.logspace(log10(self.iqMin), log10(self.iqMax), self.numBins + 1) self.qEdges = qEdges if self.verbose: print("Bin edges used: {}".format(self.qEdges))
[docs] def binning1D(self, qError=None): """An unweighted binning routine. imp-version of binning, taking q-bin edges in which binning takes place, and calculates the mean q uncertainty in the bin as well from the relative Q uncertainties provided. The intensities are sorted across bins of equal size. If provided error is empty, the standard deviation of the intensities in the bins is computed. """ # no binning requested, just input -> output if self.noBin: self.QBin = self.Q.copy() self.IBin = self.I.copy() self.EBin = self.E.copy() self.QEBin = np.zeros(np.shape(self.I)) return # set values: q = self.Q.copy() intensity = self.I.copy() error = self.E.copy() numBins = self.numBins qEdges = self.qEdges # flatten q, intensity and error q = reshape(q, size(q)) intensity = reshape(intensity, size(intensity)) # sort q, let intensity and error follow sort sortInd = argsort(q, axis=None) q = q[sortInd] intensity = intensity[sortInd] # initialise storage: numBins = len(qEdges) - 1 ibin = zeros(numBins) qbin = zeros(numBins) sdbin = zeros(numBins) sebin = zeros(numBins) qebin = zeros(numBins) binMask = zeros(numBins) # set one for masked bin values if error is not None: error = reshape(error, size(error)) error = error[sortInd] if qError is not None: qError = reshape(qError, size(qError)) qError = qError[sortInd] # now we can fill the bins for bini in range(numBins): # limit ourselves to only the bits we're interested in: limMask = (q >= qEdges[bini]) & (q <= qEdges[bini + 1]) iToBin = intensity[limMask] # sum the intensities in one bin and normalize by number of pixels if limMask.sum() == 0: # no pixels in bin (ibin[bini], sebin[bini], qebin[bini], qbin[bini]) = ( None, None, None, None, ) binMask[bini] = 1 continue elif limMask.sum() == 1: ibin[bini] = iToBin.mean() qbin[bini] = q[limMask].mean() if error is not None: sebin[bini] = error[limMask] if qError is not None: qebin[bini] = qError[limMask] else: ibin[bini] = iToBin.mean() qbin[bini] = q[limMask].mean() if error is not None: sebin[bini] = np.sqrt((error[limMask] ** 2).sum()) / limMask.sum() # now we deal with the Errors: # calculate the standard deviation of the intensity in the bin # according to the definition of sample-standard deviation sdbin = iToBin.std(ddof=1) # what we want is to have the "standard error of the mean": sdbin = sdbin / sqrt(1.0 * np.size(iToBin)) # maximum between standard error and Poisson statistics sebin[bini] = np.maximum(sebin[bini], sdbin) # qebin is the mean error of the q-values in the bin, should # probably be superseded by the bin width qe = 0.0 if qError is not None: qe = np.sqrt((qError[limMask] ** 2).sum()) # SSTD of q in bin: qs = np.std(q[limMask], ddof=1) # sample standard deviation qebin[bini] = np.maximum(qe, qs) self.QBin = qbin.copy() self.IBin = ibin.copy() self.EBin = sebin.copy() self.QEBin = qebin.copy() self.binMask = binMask.copy() if self.verbose: print("qbin: {}".format(qbin)) print("ibin: {}".format(ibin)) print("sebin: {}".format(sebin)) print("qebin: {}".format(qebin)) print("binMask: {}".format(binMask))
if __name__ == "__main__": # process input arguments adict = argparser() # transmogrify into kwargs object adict = vars(adict) # run the reBin program reBin(**adict) # vim: set ts=4 sts=4 sw=4 tw=0: