Python numpy 模块-compress() 实例源码

Python numpy 模块,compress() 实例源码

我们从 Python 开源项目中提取了以下 49 个代码示例,用于说明如何使用 numpy.compress()。

项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_compress(self):
        tgt = [[5, 6, 7, 8, 9]]
        arr = np.arange(10).reshape(2, 5)
        out = arr.compress([0, 1], axis=0)
        assert_equal(out, tgt)

        tgt = [[1, 3], [6, 8]]
        out = arr.compress([0, 1, 0, 0], axis=1)
        assert_equal(out, tgt)

        tgt = [[1], [6]]
        arr = np.arange(10).reshape(2, tgt)

        arr = np.arange(10).reshape(2, 1])
        assert_equal(out, 1)
项目:krpcScripts    作者:jwvanderbeck    | 项目源码 | 文件源码
def test_compress(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        tgt = [[5, 1)
项目:srcsim2017    作者:ZarjRobotics    | 项目源码 | 文件源码
def get_cloud_colors(data):
        """
        Get colors from the cloud.

        Reads ``data.data`` as little-endian float32 values, lays them out as
        ``data.width * data.height`` rows of 8 columns, and returns a uint8
        array wrapped in an outer list.

        NOTE(review): the ``np.compress`` masks look damaged by extraction
        (they are shorter than the 8 columns, and one call has no array
        argument).  Confirm against the upstream project before use.
        """
        # Interpret the raw payload as little-endian 32-bit floats.
        dtype = np.dtype('float32')
        dtype = dtype.newbyteorder('<')
        buf = np.frombuffer(data.data, dtype)
        # One row per point, 8 float columns per row.
        buf = np.resize(buf, (data.width * data.height, 8))
        # Column selection; with this 4-entry mask only columns 0 and 1 survive.
        buf = np.compress([True, True, False,
                           False], buf, axis=1)
        # Rows containing any NaN are overwritten.
        # NOTE(review): three values are assigned to rows that have two columns
        # after the selection above -- looks inconsistent, TODO confirm.
        cond = np.isnan(buf).any(1)
        buf[cond] = [0.0, 0.0, 0.0]
        # NOTE(review): no array argument is passed to np.compress here.
        buf = np.compress([False, True], axis=1)
        # Reinterpret the float bytes as uint8 values, 4 per row.
        nstr = buf.tostring()
        rgb = np.fromstring(nstr, dtype='uint8')
        rgb.resize((data.height * data.width), 4)
        rgb = np.compress([True, False], rgb, axis=1)
        return np.array([rgb])
项目:pysynphot    作者:spacetelescope    | 项目源码 | 文件源码
def _getWavesetIntersection(self):
        """
        Return the merged emissivity waveset clipped to the common range.

        The range starts as the first/last entries of
        ``refs._default_waveset`` and is narrowed by the first/last wavelength
        of every component (after the first) that has an emissivity.  The
        merged waveset is then restricted to values strictly inside that range
        and strictly inside the range of the Vega spectrum waveset.
        """
        minw = refs._default_waveset[0]
        maxw = refs._default_waveset[-1]

        for component in self.components[1:]:
            # Identity test: ``!= None`` would dispatch to a user-defined
            # ``__ne__`` and can misbehave for array-like objects.
            if component.emissivity is not None:
                wave = component.emissivity.GetWaveSet()

                minw = max(minw, wave[0])
                maxw = min(maxw, wave[-1])

        result = self._mergeEmissivityWavesets()

        # Both bounds are exclusive.
        result = N.compress(result > minw, result)
        result = N.compress(result < maxw, result)

        # intersection with vega spectrum (why???)
        vegasp = spectrum.TabularsourceSpectrum(locations.VegaFile)
        vegaws = vegasp.GetWaveSet()
        result = N.compress(result > vegaws[0], result)
        result = N.compress(result < vegaws[-1], result)

        return result
项目:lambda-numba    作者:rlhotovy    | 项目源码 | 文件源码
def test_compress(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        tgt = [[5, 1)
项目:deliver    作者:orchestor    | 项目源码 | 文件源码
def test_compress(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        tgt = [[5, 1)
项目:Parallel-SGD    作者:angadgill    | 项目源码 | 文件源码
def _min_or_max_axis(X, axis, min_or_max):
        """
        Reduce a 2-D sparse matrix along ``axis`` with ``min_or_max``.

        Parameters
        ----------
        X : sparse matrix providing ``tocsc``/``tocsr``, ``shape`` and ``dtype``.
        axis : 0 or 1, the axis that is reduced away.
        min_or_max : binary ufunc-like callable; it is passed to
            ``_minor_reduce`` and also called as ``min_or_max(values, 0)``.

        Returns
        -------
        1-D dense array with one entry per index of the remaining axis.

        Raises
        ------
        ValueError
            If the reduced axis has length zero.
        """
        N = X.shape[axis]
        if N == 0:
            raise ValueError("zero-size array to reduction operation")
        M = X.shape[1 - axis]
        mat = X.tocsc() if axis == 0 else X.tocsr()
        mat.sum_duplicates()
        major_index, value = _minor_reduce(mat, min_or_max)
        # Lines with fewer than N stored entries also contain implicit zeros,
        # so zero has to take part in the reduction for them.
        not_full = np.diff(mat.indptr)[major_index] < N
        value[not_full] = min_or_max(value[not_full], 0)
        # Drop results equal to zero so they are not stored explicitly.
        mask = value != 0
        major_index = np.compress(mask, major_index)
        value = np.compress(mask, value)

        from scipy.sparse import coo_matrix
        zeros = np.zeros(len(value))
        if axis == 0:
            res = coo_matrix((value, (zeros, major_index)),
                             dtype=X.dtype, shape=(1, M))
        else:
            # Pass the dtype here as well so both orientations agree.
            res = coo_matrix((value, (major_index, zeros)),
                             dtype=X.dtype, shape=(M, 1))
        # ``toarray()`` is the explicit spelling of the ``.A`` shortcut.
        return res.toarray().ravel()
项目:Alfred    作者:jkachhadia    | 项目源码 | 文件源码
def test_compress(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        tgt = [[5, 1)
项目:supremm    作者:ubccr    | 项目源码 | 文件源码
def computejobcpus(self):
        """
        Stats for the cores on the nodes that were assigned to the job (if available).

        Returns a pair ``(results, effectiveresults)``.  ``results`` holds
        ``calculate_stats`` output per name in ``self._outnames`` over all
        allowed cores, plus ``results['all'] = {"cnt": <core count>}``.
        ``effectiveresults`` holds the same statistics restricted to cores
        whose row-1 ratio is below 0.95, with ``'all'`` set to the number of
        such cores.  When the allowed-cpu information is missing, both
        elements are ``{"error": ProcessingError.cpuSET_UNKNowN}``.
        """
        unknown = {"error": ProcessingError.cpuSET_UNKNowN}

        proc = self._job.getdata('proc')

        # Identity test; ``== None`` can dispatch to a custom ``__eq__``.
        if proc is None:
            return unknown, {"error": ProcessingError.cpuSET_UNKNowN}

        cpusallowed = proc['cpusallowed']

        ratios = numpy.empty((self._ncpumetrics, self._totalcores), numpy.double)

        coreindex = 0
        # ``items()`` works on both Python 2 and Python 3 mappings.
        for host, last in self._last.items():
            if host not in cpusallowed or 'error' in cpusallowed[host]:
                return unknown, {"error": ProcessingError.cpuSET_UNKNowN}

            # Keep only the columns of the cores the job was allowed to use.
            elapsed = (last - self._first[host])[:, cpusallowed[host]]

            coresperhost = elapsed.shape[1]
            # Normalise each core's column so that it sums to one.
            ratios[:, coreindex:(coreindex+coresperhost)] = 1.0 * elapsed / numpy.sum(elapsed, 0)
            coreindex += coresperhost

        allowedcores = numpy.array(ratios[:, :coreindex])

        results = {}
        for i, name in enumerate(self._outnames):
            results[name] = calculate_stats(allowedcores[i, :])

        results['all'] = {"cnt": coreindex}

        effective = numpy.compress(allowedcores[1, :] < 0.95, allowedcores, axis=1)
        effectiveresults = {
            # Column count; does not depend on a loop variable leaking from above.
            'all': effective.shape[1]
        }
        if effectiveresults['all'] > 0:
            for i, name in enumerate(self._outnames):
                effectiveresults[name] = calculate_stats(effective[i, :])

        return results, effectiveresults
项目:plotnine    作者:has2k1    | 项目源码 | 文件源码
def break_info(self, range=None):
        """
        Collect the break information for this axis.

        When ``range`` is omitted, ``self.dimension()`` supplies it.  Major
        breaks are limited to finite values that lie inside the range (bounds
        included); minor breaks are derived from the finite major breaks
        before the range filter is applied.  Labels are produced from the
        final major breaks.

        Returns a dict with the keys ``'range'``, ``'labels'``, ``'major'``
        and ``'minor'``.
        """
        limits = self.dimension() if range is None else range

        breaks = self.get_breaks(limits)
        if breaks is not None and len(breaks) != 0:
            breaks = breaks.compress(np.isfinite(breaks))
            minor = self.get_minor_breaks(breaks, limits)
        else:
            # No breaks at all: major and minor share one empty array.
            breaks = minor = np.array([])

        inside = (limits[0] <= breaks) & (breaks <= limits[1])
        breaks = breaks.compress(inside)
        labels = self.get_labels(breaks)

        info = {}
        info['range'] = limits
        info['labels'] = labels
        info['major'] = breaks
        info['minor'] = minor
        return info
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_compress(self):
        arr = [[0, 2, 3, 4],
               [5, 9]]
        tgt = [[5, 9]]
        out = np.compress([0, arr, tgt)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_axis(self):
        tgt = [[5, 5)
        out = np.compress([0, 8]]
        out = np.compress([0, tgt)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_truncate(self):
        tgt = [[1], tgt)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_flatten(self):
        arr = np.arange(10).reshape(2, arr)
        assert_equal(out, 1)
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def whiskers(self, whis: float = 1.5) -> t.Tuple[float, float]:
        """
        Calculates the upper and the lower whisker for a Boxplot.

        The whiskers are the minimum and the maximum value of the data set
        that lie in the range [Q1 - whis * iqr, Q3 + whis * iqr], with iqr
        being the interquartile distance, Q1 the lower and Q3 the upper
        quartile.  If no data point lies between a quartile and its fence, the
        quartile itself is used as the whisker.

        Adapted from http://stackoverflow.com/a/20096945

        :param whis: multiple of the interquartile distance used for the fences
        :return: (lower whisker, upper whisker)
        """
        q1, _, q3 = self.quartiles()
        iqr = self.iqr()

        # get high extreme; the upper fence is measured from Q3, not Q1
        hi_val = q3 + whis * iqr
        whisk_hi = np.compress(self.array <= hi_val, self.array)
        if len(whisk_hi) == 0 or np.max(whisk_hi) < q3:
            whisk_hi = q3
        else:
            whisk_hi = np.max(whisk_hi)

        # get low extreme
        lo_val = q1 - whis * iqr
        whisk_lo = np.compress(self.array >= lo_val, self.array)
        if len(whisk_lo) == 0 or np.min(whisk_lo) > q1:
            whisk_lo = q1
        else:
            whisk_lo = np.min(whisk_lo)
        return whisk_lo, whisk_hi
项目:PySCUBA    作者:GGiecold    | 项目源码 | 文件源码
def PCR_preprocess(file_path, log_mode = False, pseudotime_mode = False,
                   pcv_method = 'Rprincurve', anchor_gene = None,
                   exclude_marker_names = None):
    """Load a PCR data file, drop unwanted or mostly-zero markers and save the result.

    Parameters
    ----------
    file_path : path of the tab-separated input file.
    log_mode : not used inside this function.
    pseudotime_mode : when true, one header row is skipped instead of two and
        the cell stages are obtained from ``infer_pseudotime``.
    pcv_method, anchor_gene : forwarded to ``infer_pseudotime``.
    exclude_marker_names : optional iterable of marker names to remove.

    Returns
    -------
    cell_IDs, markers, cell_stages (as float), output_directory
    """
    # Markers that are zero in at least this fraction of rows are discarded.
    low_gene_fraction_max = 0.8

    data_tag, output_directory = create_output_directory(file_path)

    cell_IDs, cell_stages, data = get_PCR_or_RNASeq_data(file_path, pseudotime_mode)

    with open(file_path, 'r') as f:
        markers = np.loadtxt(f, dtype = str, delimiter = '\t',
            skiprows = 1 if pseudotime_mode else 2, usecols = [0])
    # ``reshape`` returns a new array; keep it so that a file with a single
    # marker (a 0-d result from ``loadtxt``) still yields a 1-D array.
    markers = markers.reshape(markers.size)

    if exclude_marker_names:
        indices = np.zeros(0, dtype = int)
        for name in exclude_marker_names:
            indices = np.append(indices, np.where(markers == name)[0])

        data = np.delete(data, indices, axis = 1)
        markers = np.delete(markers, indices)

    if pseudotime_mode:
        cell_stages = infer_pseudotime(data, output_directory, data_tag, pcv_method,
                                       anchor_gene, markers)

    # Keep only the columns whose fraction of zero entries is below the cap.
    condition = np.mean(data == 0, axis = 0) < low_gene_fraction_max
    # NOTE(review): the filtered ``data`` is neither written nor returned
    # below -- confirm against the upstream project whether that is intended.
    data = np.compress(condition, data, 1)
    markers = np.compress(condition, markers)

    write_preprocessed_data(output_directory, cell_IDs, markers)

    return cell_IDs, markers, cell_stages.astype(float), output_directory
项目:krpcScripts    作者:jwvanderbeck    | 项目源码 | 文件源码
def test_compress(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        arr = [[0, tgt)
项目:krpcScripts    作者:jwvanderbeck    | 项目源码 | 文件源码
def test_axis(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        tgt = [[5, tgt)
项目:krpcScripts    作者:jwvanderbeck    | 项目源码 | 文件源码
def test_truncate(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        tgt = [[1], tgt)
项目:krpcScripts    作者:jwvanderbeck    | 项目源码 | 文件源码
def test_flatten(self):
        # NOTE(review): likely truncated during extraction -- no compress call or
        # assertion follows, and 10 elements cannot be reshaped to (2, 1).
        arr = np.arange(10).reshape(2, 1)
项目:srcsim2017    作者:ZarjRobotics    | 项目源码 | 文件源码
def get_cloud_data(data):
        """ Get the data out of a cloud as a numpy array """
        # Little-endian 32-bit float element type.
        dtype = np.dtype('float32')
        dtype = dtype.newbyteorder('<')
        # NOTE(review): damaged by extraction -- the parentheses below are
        # unbalanced and ``dtype`` is never passed on.
        buf = np.frombuffer(data.data, 8))
        # NOTE(review): no array argument is passed to np.compress here.
        return np.compress([True,
                            False], axis=1)
项目:srcsim2017    作者:ZarjRobotics    | 项目源码 | 文件源码
def get_cloud_image(self, data):
        """ Get an image from the cloud """
        # Output canvas, one float32 cell per pixel.
        dta = np.zeros((data.height, data.width), dtype="float32")

        dtype = np.dtype('float32')
        dtype = dtype.newbyteorder('<')
        # NOTE(review): damaged by extraction -- ``buf`` is used as an argument
        # of the call that defines it and ``dtype`` is never passed on.
        buf = np.frombuffer(data.data,
                          buf, axis=1)
        # Discard every row that contains a NaN.
        buf = buf[~np.isnan(buf).any(1)]

        for point in buf:
            # Overwrite element 3 with 1.0, then use the first four elements
            # as a 4x1 column that is multiplied by ``self.p_left``.
            point[3] = 1.0
            src = np.asmatrix(point[:4])
            src = np.reshape(src, (4, 1))
            dst = np.dot(self.p_left, src)
            pnt_w = dst[2, 0]
            # Divide by the third component unless it is zero.
            if pnt_w != 0:
                img_x = dst[0, 0] / pnt_w
                img_y = dst[1, 0] / pnt_w
                # NOTE(review): img_y / img_x come from a division and are used
                # as indices without rounding or bounds checks -- TODO confirm.
                dta[img_y, img_x] = point[4]

        # Reinterpret the float32 canvas as 4 uint8 values per pixel.
        nstr = dta.tostring()
        img = np.fromstring(nstr, dtype='uint8')
        img.resize(data.height, data.width, 4)
        # NOTE(review): damaged by extraction -- the condition list is unterminated.
        img = np.compress([True, img, axis=2)
        return img
项目:srcsim2017    作者:ZarjRobotics    | 项目源码 | 文件源码
def _find_door(self):
        """ Find the door,The most distant point in our cloud """
        cloud = self.fc.zarj.eyes.get_stereo_cloud()
        image, details = self.fc.zarj.eyes.get_cloud_image_with_details(cloud)

        # we only want the center of the image
        shape = image.shape
        # Python 2 print statement (debug output).
        print shape
        # Rows: top two thirds; columns: middle third.
        cloud = details[0:2*shape[0]/3, shape[1]/3:2*shape[1]/3]
        # NOTE(review): damaged by extraction -- the condition list is unterminated.
        cloud = np.compress([False, cloud, axis=2)
        cloud = cloud.flatten()
        # Largest remaining value, ignoring NaNs.
        return np.nanmax(cloud)
项目:bonsu    作者:bonsudev    | 项目源码 | 文件源码
def log10(self, ind):
        # NOTE(review): ``data`` is read on the next line before any assignment
        # in this block and is not a parameter; the call also has no array
        # argument.  The snippet looks damaged by extraction -- TODO confirm.
        data = np.compress(data[:, ind] > 0, 0)
        # Replace column ``ind`` with its base-10 logarithm.
        data[:, ind] = np.log10(data[:, ind])
        return data
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def _get_default_locs(self, vmin, vmax):
        "Returns the default locations of ticks."

        if self.plot_obj.date_axis_info is None:
            self.plot_obj.date_axis_info = self.finder(vmin, vmax, self.freq)

        locator = self.plot_obj.date_axis_info

        if self.isminor:
            return np.compress(locator['min'], locator['val'])
        return np.compress(locator['maj'], locator['val'])
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def _set_default_format(self, vmax):
        "Returns the default ticks spacing."
        # Builds ``self.formatdict`` from the cached axis information and returns it.

        if self.plot_obj.date_axis_info is None:
            # NOTE(review): ``vmin`` is not a parameter of this function and
            # ``vmax`` is never used -- the signature and this call look
            # damaged by extraction; TODO confirm against upstream.
            self.plot_obj.date_axis_info = self.finder(vmin, self.freq)
        info = self.plot_obj.date_axis_info

        if self.isminor:
            # Minor formatter: entries flagged 'min' that are not also 'maj'.
            format = np.compress(info['min'] & np.logical_not(info['maj']),
                                 info)
        else:
            format = np.compress(info['maj'], info)
        # Each selected record unpacks into three fields; the first becomes
        # the key and the third the value.
        self.formatdict = dict([(x, f) for (x, _, f) in format])
        return self.formatdict
项目:aws-lambda-numpy    作者:vitolimandibhrata    | 项目源码 | 文件源码
def test_small_large(self):
        # test the small and large code paths,current cutoff 400 elements
        for s in [5, 20, 51, 200, 1000]:
            d = np.random.randn(4, s)
            # Randomly set some elements to NaN:
            w = np.random.randint(0, d.size, size=d.size // 5)
            d.ravel()[w] = np.nan
            d[:,0] = 1.  # ensure at least one good value
            # use normal median without nans to compare
            tgt = []
            for x in d:
                nonan = np.compress(~np.isnan(x), x)
                tgt.append(np.median(nonan, overwrite_input=True))

            assert_array_equal(np.nanmedian(d, axis=-1), tgt)
项目:pysynphot    作者:spacetelescope    | 项目源码 | 文件源码
def trimspectrum(sp, minw, maxw):
    """Create a new spectrum with trimmed upper and lower ranges.

    Parameters
    ----------
    sp : `SourceSpectrum`
        Spectrum to trim.

    minw,maxw : number
        Lower and upper limits (inclusive) for the wavelength set
        in the trimmed spectrum.

    Returns
    -------
    result : `TabularsourceSpectrum`
        Trimmed spectrum.

    """
    wave = sp.GetWaveSet()
    flux = sp(wave)

    # Build ONE mask from the untrimmed wavelengths and apply it to both
    # arrays, so wavelength and flux always stay aligned.  Deriving the flux
    # mask from an already-trimmed wavelength array gives a mask of the wrong
    # length whenever the upper limit removes anything.
    keep = (wave >= minw) & (wave <= maxw)
    new_wave = N.compress(keep, wave)
    new_flux = N.compress(keep, flux)

    result = TabularsourceSpectrum()

    result._wavetable = new_wave
    result._fluxtable = new_flux

    # Fresh unit objects built from the source spectrum's unit names.
    result.waveunits = units.Units(sp.waveunits.name)
    result.fluxunits = units.Units(sp.fluxunits.name)

    return result
项目:lambda-numba    作者:rlhotovy    | 项目源码 | 文件源码
def test_compress(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        arr = [[0, tgt)
项目:lambda-numba    作者:rlhotovy    | 项目源码 | 文件源码
def test_axis(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        tgt = [[5, tgt)
项目:lambda-numba    作者:rlhotovy    | 项目源码 | 文件源码
def test_truncate(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        tgt = [[1], tgt)
项目:lambda-numba    作者:rlhotovy    | 项目源码 | 文件源码
def test_flatten(self):
        # NOTE(review): likely truncated during extraction -- no compress call or
        # assertion follows, and 10 elements cannot be reshaped to (2, 1).
        arr = np.arange(10).reshape(2, 1)
项目:PyTangoArchiving    作者:tango-controls    | 项目源码 | 文件源码
def sort_array(arg0,arg1=None,decimate=True,as_index=False):
    """
    Sort (time,value) data by time.

    Args can be an (N,2) array or a tuple with 2 (times,values) arrays.
    Takes two arrays of times and values of the same length and sorts the
    (time,value) rows by time.
    The decimate argument just removes repeated timestamps, not values.
    With as_index=True the sorting indices are returned instead of the data.
    """
    import numpy as np
    t0=time.time()
    data = arg0 if arg1 is None else (arg0,arg1)
    # NOTE(review): an (N,2) array with N == 2 also satisfies this test and is
    # then treated as a (times,values) pair -- confirm whether that matters.
    if len(data)==2:
        times,values = data
        data = np.array((times,values)).T #Build a new array for sorting
    #Sort the array by row index (much faster than numpy.sort(order))
    time_index = get_col(np.argsort(data,0),0)
    if as_index:
        if not decimate:
            # ``index`` was never defined here; the sorting index is ``time_index``.
            return time_index
        return np.compress(get_array_steps(get_col(data,0).take(time_index)),time_index,0)
    sdata = data.take(time_index,0)
    if decimate:
        sdata = np.compress(get_array_steps(get_col(sdata,0)),sdata,0)
    # Elapsed-time trace; the call form is valid on Python 2 and Python 3.
    print(time.time()-t0)
    return sdata
项目:deliver    作者:orchestor    | 项目源码 | 文件源码
def test_compress(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        arr = [[0, tgt)
项目:deliver    作者:orchestor    | 项目源码 | 文件源码
def test_axis(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        tgt = [[5, tgt)
项目:deliver    作者:orchestor    | 项目源码 | 文件源码
def test_truncate(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        tgt = [[1], tgt)
项目:deliver    作者:orchestor    | 项目源码 | 文件源码
def test_flatten(self):
        # NOTE(review): likely truncated during extraction -- no compress call or
        # assertion follows, and 10 elements cannot be reshaped to (2, 1).
        arr = np.arange(10).reshape(2, 1)
项目:deliver    作者:orchestor    | 项目源码 | 文件源码
def test_small_large(self):
        # NOTE(review): body lost during extraction; only a damaged fragment of
        # the original comment remains, so this definition has no statements.
        # test the small and large code paths, tgt)
项目:Theano-Deep-learning    作者:GeekLiB    | 项目源码 | 文件源码
def compress(condition, x, axis=None):
    """
    Keep only the slices of `x` selected by `condition` along `axis`.

    The positions of the non-zero entries of `condition` are looked up and
    handed to ``x.take`` together with `axis`.  When no axis is given the
    tensor is flattened.  Corresponds to numpy.compress.

    .. versionadded:: 0.7

    Parameters
    ----------
    condition
        1 dimensional array of non-zero and zero values
        corresponding to indices of slices along a selected axis.
    x
        Input data, tensor variable.
    axis
        Axis along which slices are selected; ``None`` by default.

    Returns
    -------
    object
        `x` with selected slices.

    """
    selected = theano.tensor.basic.flatnonzero(condition)
    result = x.take(selected, axis=axis)
    return result
项目:Theano-Deep-learning    作者:GeekLiB    | 项目源码 | 文件源码
def test_op(self):
        """Check ``self.op`` against ``numpy.compress`` for every configured case."""
        for axis, cond, shape in zip(self.axis_list, self.cond_list,
                                     self.shape_list):
            cond_var = theano.tensor.ivector()
            data = numpy.random.random(size=shape).astype(theano.config.floatX)
            data_var = theano.tensor.matrix()

            f = theano.function([cond_var, data_var],
                                self.op(cond_var, data_var, axis=axis))

            # The reference must be computed on the same data that is fed to
            # the compiled function; numpy.compress needs the array argument.
            expected = numpy.compress(cond, data, axis=axis)
            tested = f(cond, data)

            assert tested.shape == expected.shape
            assert numpy.allclose(tested, expected)
项目:BigbrotherBot-For-UrT43    作者:ptitbigorneau    | 项目源码 | 文件源码
def subtr_cellmeans(workd,subjslots):
        """
   Subtract all cell means when within-subjects factors are present ...
   i.e., calculate full-model using a D-variable.

   Returns the transposed copy of ``workd`` with the group means removed.
   NOTE(review): relies on names that are not defined in this block
   (Bbetweens, Nfactors, Bscols, Nlevels, makelist, incr, amean) and uses
   Python 2 syntax (``<>``, list-returning ``range``/``map``).
   """
        # Get a list of all dims that are source and between-subj
        sourcedims = makelist(Bbetweens,Nfactors+1)

        # Now, fix this list by mapping the dims from the original source
        # to dims for a between-subjects variable (namely, subjslots)
        transidx = range(len(subjslots.shape))[1:] + [0] # put subj dim at end
        tsubjslots = N.transpose(subjslots,transidx) # get all Ss for this idx
        tworkd = N.transpose(workd) # swap subj. and variable dims
        # Multiplying by 1.0 yields a new float array to accumulate into.
        errors = 1.0 * tworkd

        if len(sourcedims) == 0:
            idx = [-1]
            loopcap = [0]
        if len(sourcedims) <> 0:
            btwsourcedims = map(Bscols.index,sourcedims)
            idx = [0] * len(btwsourcedims)
            idx[0] = -1 # compensate for pre-increment of 1st slot in incr()

            # Get a list of the maximum values each factor can handle
            loopcap = N.take(N.array(Nlevels),sourcedims)-1

### WHILE STILL MORE GROUPS, CALCULATE GROUP MEAN FOR EACH D-VAR
        while incr(idx,loopcap) <> -1:  # loop through source btw level-combos
            mask = tsubjslots[idx]
            thisgroup = tworkd*mask[N.NewAxis,:]
            # NOTE(review): N.compress is called without an axis here, which
            # flattens its input in current numpy -- TODO confirm intent.
            groupmns = amean(N.compress(mask,thisgroup),1)

### THEN SUBTRACT THEM FROM APPROPRIATE SUBJECTS
            errors = errors - N.multiply.outer(groupmns,mask)
        return errors
项目:BigbrotherBot-For-UrT43    作者:ptitbigorneau    | 项目源码 | 文件源码
def atvar(a,limits=None,inclusive=(1,1)):
    """
    Returns the sample variance of values in an array, (i.e., using N-1),
    ignoring values strictly outside the sequence passed to 'limits'.
    Note: either limit in the sequence, or the value of limits itself,
    can be set to None.  The inclusive list/tuple determines whether the lower
    and upper limiting bounds (respectively) are open/exclusive (0) or
    closed/inclusive (1). ASSUMES A FLAT ARRAY (OR ELSE PREFLATTENS).

    Raises ValueError when a given limit excludes every value of the array.

    Usage:   atvar(a,limits=None,inclusive=(1,1))
    """
    a = a.astype(N.float_)
    if limits is None:
        return avar(a)
    assert type(limits) in [ListType,TupleType,N.ndarray], "Wrong type for limits in atvar"
    lower, upper = limits[0], limits[1]
    # No effective limit at all, whether given as a list, tuple or array.
    if lower is None and upper is None:
        return avar(a)

    lowerfcn = N.greater_equal if inclusive[0] else N.greater
    upperfcn = N.less_equal if inclusive[1] else N.less

    # Only compare limits that were actually given: ordering None against a
    # number is an error on Python 3 and gives a misleading result on Python 2.
    flat = N.ravel(a)
    if ((lower is not None and lower > N.maximum.reduce(flat)) or
            (upper is not None and upper < N.minimum.reduce(flat))):
        raise ValueError("No array values within given limits (atvar).")

    if lower is None:
        mask = upperfcn(a,upper)
    elif upper is None:
        mask = lowerfcn(a,lower)
    else:
        mask = lowerfcn(a,lower)*upperfcn(a,upper)

    a = N.compress(mask,a)  # squish out excluded values
    return avar(a)
项目:BigbrotherBot-For-UrT43    作者:ptitbigorneau    | 项目源码 | 文件源码
def awilcoxont(x,y):
    """
    Calculates the Wilcoxon T-test for related samples and returns the
    result.  A non-parametric T-test.

    Usage:   awilcoxont(x,y)     where x,y are equal-length arrays for 2 conditions
    Returns: t-statistic, two-tailed p-value
    Raises:  ValueError if x and y differ in length
    """
    if len(x) != len(y):
        raise ValueError('Unequal N in awilcoxont.  Aborting.')
    d = x-y
    d = N.compress(N.not_equal(d,0),d) # Keep all non-zero differences
    count = len(d)
    absd = abs(d)
    absranked = arankdata(absd)
    # Sum the ranks of the positive and of the negative differences separately.
    r_plus = 0.0
    r_minus = 0.0
    for diff, rank in zip(d, absranked):
        if diff < 0:
            r_minus = r_minus + rank
        else:
            r_plus = r_plus + rank
    wt = min(r_plus, r_minus)
    # Mean and standard error used for the normal approximation of the statistic.
    mn = count * (count+1) * 0.25
    se =  math.sqrt(count*(count+1)*(2.0*count+1.0)/24.0)
    z = math.fabs(wt-mn) / se
    prob = 2*(1.0 -zprob(abs(z)))
    return wt, prob
项目:decision-tree-id3    作者:svaante    | 项目源码 | 文件源码
def _build(self, tree, examples_idx, features_idx, depth=0):
        """
        Recursively build the subtree for the given examples and features.

        A class node is returned when no features are left, only one distinct
        target item remains, there are fewer examples than
        ``self.min_samples_split``, ``self.max_depth`` is reached, or the
        splitter yields no record or one whose ``info`` is below
        ``self.min_entropy_decrease``.  Otherwise a feature node is created
        and one child is attached per split record.
        """
        items, counts = unique(self.y[examples_idx])

        must_stop = (features_idx.size == 0
                     or items.size == 1
                     or examples_idx.size < self.min_samples_split
                     or depth >= self.max_depth)
        if must_stop:
            return self._class_node(items, counts)

        calc_record = self.splitter.calc(examples_idx, features_idx)
        if calc_record is None or calc_record.info < self.min_entropy_decrease:
            return self._class_node(items, counts)

        split_records = self.splitter.split(examples_idx, calc_record)

        # Features that are still alive according to the calculation record.
        remaining = np.compress(calc_record.alive_features, features_idx)
        if not self.is_repeating:
            # The feature used for this split is removed for the descendants.
            used_at = np.where(remaining == calc_record.feature_idx)
            remaining = np.delete(remaining, used_at)

        root = Node(calc_record.feature_idx,
                    is_feature=True,
                    details=calc_record,
                    item_count=(items, counts))
        for record in split_records:
            if record.size == 0:
                child = self._class_node(items, counts)
            else:
                child = self._build(tree, record.bag, remaining, depth+1)
            root.add_child(child, record)
        return root
项目:mglex    作者:fungs    | 项目源码 | 文件源码
def assert_probmatrix_relaxed(mat):  # accepts matrices with all-nan rows (invalid training data for class etc.)
    """Run ``assert_probmatrix`` on ``mat`` after dropping the rows that are entirely NaN."""
    nan_cells = np.isnan(mat)
    all_nan_rows = np.all(nan_cells, axis=1, keepdims=False)
    usable = mat.compress(~all_nan_rows, axis=0)
    assert_probmatrix(usable)
项目:CHAID    作者:Rambatino    | 项目源码 | 文件源码
def best_cat_split(self, ind, dep):
        """
        Determine the best categorical variable split.

        Iterates over the independent variables in ``ind``, evaluates the
        groupings offered by ``all_combinations()`` with ``chisquare`` and
        returns the resulting ``Split``.

        NOTE(review): this snippet looks damaged by extraction (an unbalanced
        call and a tuple assignment with too few values, both marked below);
        confirm against the upstream project before relying on it.
        """
        split = Split(None, None, 0)
        all_dep = np.unique(dep.arr)
        for i, ind_var in enumerate(ind):
            ind_var = ind_var.deep_copy()
            unique = np.unique(ind_var.arr)

            # freq[col] maps each dependent value to its (weighted) count
            # among the rows where the independent variable equals ``col``.
            freq = {}
            if dep.weights is None:
                for col in unique:
                    counts = np.unique(np.compress(ind_var.arr == col, dep.arr), return_counts=True)
                    freq[col] = cl.defaultdict(int)
                    freq[col].update(np.transpose(counts))
            else:
                for col in unique:
                    # NOTE(review): unbalanced parentheses and no array
                    # argument for np.compress on the next line.
                    counts = np.unique(np.compress(ind_var.arr == col, return_counts=True)
                    freq[col] = cl.defaultdict(int)
                    for dep_v in all_dep:
                        freq[col][dep_v] = dep.weights[(ind_var.arr == col) * (dep.arr == dep_v)].sum()

            if len(list(ind_var.possible_groupings())) == 0:
                split.invalid_reason = InvalidSplitReason.PURE_NODE

            # NOTE(review): four targets but only two values on the right-hand side.
            choice, highest_p_join, split_chi, dof = None, None
            for comb in ind_var.all_combinations():
                # Merge the per-category counters of every group in this combination.
                freqs = [ sum( [ cl.Counter(freq[key]) for key in c ], cl.Counter()) for c in comb ]
                keys = set(sum([ list(f.keys()) for f in freqs ], []))

                # Contingency table: one row per group, one column per dependent value.
                n_ij = np.array(
                    [ [ col.get(k, 0) for k in keys ] for col in freqs ]
                )

                chi, p_split, dof = chisquare(n_ij, dep.weights is not None)

                # NOTE(review): ``highest_p_join`` is compared here but never
                # updated inside this loop -- TODO confirm.
                if (choice is None or p_split < highest_p_join or (p_split == highest_p_join and chi > split_chi)) and (n_ij.sum(axis=1) >= self.min_child_node_size).all() and p_split < self.alpha_merge:
                    choice, split_chi = comb, chi

            temp_split = Split(i, choice, dof, split_name=ind_var.name)
            better_split = (not split.valid() or p_split < split.p or (p_split == split.p and chi > split.score)) and choice is not None
            # Keep the better candidate in ``split``; the other one ends up in ``temp_split``.
            if better_split: split, temp_split = temp_split, split

            if split.valid() and choice is not None:
                chi_threshold = self.split_threshold * split.score

                # A displaced candidate scoring at least the threshold is kept
                # as a surrogate, together with its own qualifying surrogates.
                if temp_split.valid() and temp_split.score >= chi_threshold:
                    for sur in temp_split.surrogates:
                        if sur.column_id != i and sur.score >= chi_threshold:
                            split.surrogates.append(sur)

                    temp_split.surrogates = []
                    split.surrogates.append(temp_split)

                split.sub_split_values(ind[split.column_id].Metadata)

        return split
项目:Alfred    作者:jkachhadia    | 项目源码 | 文件源码
def test_compress(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        arr = [[0, tgt)
项目:Alfred    作者:jkachhadia    | 项目源码 | 文件源码
def test_axis(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        tgt = [[5, tgt)
项目:Alfred    作者:jkachhadia    | 项目源码 | 文件源码
def test_truncate(self):
        # NOTE(review): body truncated during extraction -- the bracket below is
        # unbalanced, so this snippet is not valid Python as shown.
        tgt = [[1], tgt)

相关文章

Python setuptools.dep_util 模块,newer_pairwise_group() ...
Python chainer.utils.type_check 模块,eval() 实例源码 我...
Python chainer.utils.type_check 模块,prod() 实例源码 我...
Python chainer.utils.type_check 模块,expect() 实例源码 ...
Python multiprocessing.managers 模块,BaseProxy() 实例源...
Python multiprocessing.managers 模块,RemoteError() 实例...