Python statistics 模块，pstdev() 实例源码
我们从 Python 开源项目中提取了以下 20 个代码示例，用于说明如何使用 statistics.pstdev()。
def PSTDEV(df, n, price='Close', mu=None):
    """
    Rolling population standard deviation of one column.

    For every position in ``df[price]`` the result holds the population
    standard deviation of the ``n`` entries ending at that position.
    Positions that do not yet have ``n`` entries behind them yield NaN.

    :param df: object indexable by ``price`` whose entry supports ``len()`` and slicing
    :param n: window length
    :param price: key of the column to process
    :param mu: optional known mean, passed straight to ``statistics.pstdev``
    :return: list with one value per entry of ``df[price]``
    """
    rolling = []
    for stop in range(1, len(df[price]) + 1):
        if stop < n:
            # Window not filled yet.
            value = float('NaN')
        else:
            value = statistics.pstdev(df[price][stop - n:stop], mu)
        rolling.append(value)
    return rolling
def diagnosticity(evaluations):
    """Return the diagnosticity of a piece of evidence given its evaluations against a set of hypotheses.

    :param evaluations: an iterable of iterables of Eval for a piece of evidence
    :return: population standard deviation of the per-hypothesis consensus
        values, or 0.0 when ``statistics.pstdev`` cannot be computed (no values)
    """
    # The "diagnosticity" needs to capture how well the evidence separates/distinguishes the hypotheses. If we don't
    # show a preference between consistent/inconsistent, STDDEV captures this intuition OK. However, in the future,
    # we may want to favor evidence for which hypotheses are inconsistent. Additionally, we may want to calculate
    # "marginal diagnosticity" which takes into account the rest of the evidence.
    # (1) calculate the consensus for each hypothesis
    # (2) map N/A to neutral because N/A doesn't help determine consistency of the evidence
    # (3) calculate the population standard deviation of the evidence. It's more reasonable to consider the set of
    #     hypotheses at a given time to be the population of hypotheses than as a "sample" (although it doesn't matter
    #     much because we're comparing across hypothesis sets of the same size)
    na_neutral = map(mean_na_neutral_Vote, evaluations)  # pylint: disable=bad-builtin
    try:
        # Drop missing consensus values with an explicit identity test.
        # ``filter(None.__ne__, ...)`` depended on ``NotImplemented`` being
        # truthy, which is deprecated and raises TypeError on newer Pythons.
        # The list is built inside the ``try`` so errors raised while the lazy
        # ``map`` is consumed are handled exactly as before.
        consensus = [value for value in na_neutral if value is not None]
        return statistics.pstdev(consensus)
    except statistics.StatisticsError:
        return 0.0
def pstdev(self):
    """Return the population standard deviation of ``self.price``."""
    prices = self.price
    return statistics.pstdev(prices)
# ?????
def calculator(test, arg, n_runs, only_result,workpath=os.getcwd()):
    """Run a benchmark command repeatedly and print summary statistics.

    The shell command ``arg`` is executed ``n_runs`` times inside
    ``workpath``. Afterwards every line of ``Benchmark.txt`` in that
    directory is parsed as a float, the mean / median / population standard
    deviation are printed with ``click.secho``, and the file is removed.

    :param test: benchmark name, used only in the printed report
    :param arg: command line handed to the shell (``shell=True``); it must
        come from a trusted source
    :param n_runs: number of times the command is executed
    :param only_result: when true, the command's stdout/stderr are captured
        (and discarded) instead of being shown
    :param workpath: working directory; note the default is evaluated once,
        when the function is defined
    """
    for _ in range(n_runs):
        if only_result:
            process = subprocess.Popen(
                arg,
                cwd=workpath,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            # communicate() drains both pipes while waiting. A bare wait()
            # can deadlock once the child fills an OS pipe buffer.
            process.communicate()
        else:
            process = subprocess.Popen(arg, cwd=workpath, shell=True)
            process.wait()

    benchmark_file = os.path.join(workpath, "Benchmark.txt")
    with open(benchmark_file) as f:
        w = [float(line) for line in f]

    click.secho(
        "\nThe results of the benchmark are (all speeds in items/sec) : \n",
        bold=True)
    click.secho(
        "\nTest = '{0}' Iterations = '{1}'\n".format(test, n_runs),
        bold=True)
    click.secho(
        "\nMean : {0} Median : {1} Std Dev : {2}\n".format(
            statistics.mean(w),
            statistics.median(w),
            statistics.pstdev(w)),
        bold=True)

    os.remove(benchmark_file)
def getstdev(l, mean):
    """
    Get the population standard deviation of the values in the list,
    measured around the supplied mean, or NaN if the list is empty.
    """
    if len(l) == 0:
        return float('nan')
    return statistics.pstdev(l, mean)
def stdev(arr):
    """
    Compute the (population) standard deviation of ``arr``.

    On Python 3 the standard library is used. On older interpreters NumPy is
    tried, and ``0.`` is returned if it cannot be imported.
    """
    if sys.version_info < (3, 0):
        # Legacy interpreter: depends on NumPy.
        try:
            import numpy
            return numpy.std(arr, axis=0)
        except ImportError:
            return 0.
    import statistics
    return statistics.pstdev(arr)
def main():
    """Print a tour of the ``statistics`` functions on small integer samples."""
    base = range(6)
    for fn in (stats.mean, stats.median, stats.median_low,
               stats.median_high, stats.median_grouped):
        print(fn(base))

    # mode() may reject data without a single most common value; show the
    # error text instead of aborting.
    try:
        print(stats.mode(base))
    except Exception as err:
        print(err)

    sample = list(base) + [3]
    for fn in (stats.mode, stats.pstdev, stats.stdev,
               stats.pvariance, stats.variance):
        print(fn(sample))
def __init__(self, dout_pin, pd_sck_pin, gain_channel_A=128, select_channel='A'):
    """Initialise driver state, configure the two GPIO pins and apply channel/gain.

    :param dout_pin: pin number of the data line (set up as an input)
    :param pd_sck_pin: pin number of the clock line (set up as an output)
    :param gain_channel_A: value forwarded to ``set_gain_A``
    :param select_channel: value forwarded to ``select_channel``
    :raises TypeError: if either pin argument is not an ``int``
    """
    if not (isinstance(dout_pin, int) and isinstance(pd_sck_pin, int)):
        raise TypeError('dout_pin and pd_sck_pin have to be pin numbers.\nI have got dout_pin: '
                        + str(dout_pin)
                        + ' and pd_sck_pin: ' + str(pd_sck_pin) + '\n')

    # Pin numbers.
    self._pd_sck = pd_sck_pin
    self._dout = dout_pin

    # Gain, offsets and last raw readings all start at zero.
    self._gain_channel_A = 0
    self._offset_A_128 = self._offset_A_64 = self._offset_B = 0
    self._last_raw_data_A_128 = self._last_raw_data_A_64 = self._last_raw_data_B = 0

    # Channel bookkeeping starts out empty.
    self._wanted_channel = ''
    self._current_channel = ''

    # Scale ratios start at 1.
    self._scale_ratio_A_128 = self._scale_ratio_A_64 = self._scale_ratio_B = 1

    self._debug_mode = False
    self._pstdev_filter = True  # the pstdev filter is ON by default

    GPIO.setmode(GPIO.BCM)              # BCM pin numbering
    GPIO.setup(self._pd_sck, GPIO.OUT)  # clock pin: output only
    GPIO.setup(self._dout, GPIO.IN)     # data pin: input only

    self.select_channel(select_channel)
    self.set_gain_A(gain_channel_A)
############################################################
# select_channel function evaluates if the desired channel #
# is valid and then sets the _wanted_channel. #
# If returns True then OK #
# INPUTS: channel ('A'|'B') #
# OUTPUTS: BOOL #
############################################################
def get_current_channel(self):
    """Return the value stored in ``self._current_channel``."""
    channel = self._current_channel
    return channel
############################################################
# get pstdev filter status returns True if turned on. #
# INPUTS: none #
# OUTPUTS: INT #
############################################################
def test_stdev(self):
    """Check that ``Stat(...).stdev`` equals ``statistics.pstdev`` of the same data."""
    summary = Stat(self.data)
    print(summary)
    actual = summary.stdev
    expected = statistics.pstdev(self.data)
    assert actual == expected
def __init__(self, data):
    """Store the mean, median and population standard deviation of ``data``."""
    summary = (
        ('mean', st.mean),
        ('median', st.median),
        ('stdev', st.pstdev),
    )
    for attribute, compute in summary:
        setattr(self, attribute, compute(data))
def pstdev(text):
    """
    Finds the population standard deviation of a space-separated list of numbers.

    Example::

        /pstdev 33 54 43 65 43 62
    """
    numbers = parse_numeric_list(text)
    deviation = statistics.pstdev(numbers)
    return format_output(deviation)
def setup():
    """Register every statistics command with ``commands``, in a fixed order."""
    handlers = (
        mean,
        median,
        median_low,
        median_high,
        median_grouped,
        mode,
        pstdev,
        pvariance,
        stdev,
        variance,
    )
    for handler in handlers:
        commands.add(handler)
def discover_benches(publish_or_subscribe):
    """Yield summary tuples for benchmark result files in the current directory.

    Only files whose name starts with ``publish_or_subscribe + '-'`` are read.
    Within a file, lines beginning with a digit are split on whitespace and
    their second field is taken as a float sample.

    :param publish_or_subscribe: file-name prefix selecting which results to read
    :return: generator of ``(name, size, mean, pstdev)`` where ``name`` is the
        middle part of the file name and ``size`` the integer after the last ``-``
    """
    prefix = publish_or_subscribe + '-'
    for filename in os.listdir(os.curdir):
        if filename.startswith(prefix):
            with open(filename) as handle:
                rows = [line.split() for line in handle if line[0].isdigit()]
            samples = [float(row[1]) for row in rows]
            average = s.mean(samples)
            deviation = s.pstdev(samples)
            pieces = filename.split('-')
            yield ('-'.join(pieces[1:-1]), int(pieces[-1]), average, deviation)
def print_summary_statistics(stats):
    """Print one line of rounded summary metrics per benchmark.

    The columns follow the printed header: min, mean, population standard
    deviation, median and max. Each metric is rounded to the nearest integer.
    Benchmarks without samples get a row of zeros, one per column.

    :param stats: mapping of benchmark name to a sequence of numeric samples
    """
    from statistics import mean, median, pstdev

    # One function per header column, in header order. ``median`` used to be
    # missing here, so the values under "median" and "max" were misaligned.
    columns = [min, mean, pstdev, median, max]
    print("benchmark min mean sd median max")
    for benchmark, samples in stats.items():
        if samples:
            metrics = [int(round(fn(samples), 0)) for fn in columns]
        else:
            metrics = [0] * len(columns)
        print(benchmark, *metrics)
def uploadresult(test, w):
    """Send the mean and population standard deviation of ``w`` to Codespeed.

    The latest commit is looked up through ``get_latest_commit``, a result
    record is form-encoded and sent to ``CODESPEED_URL + 'result/add/'``, and
    the server's reply is printed.

    :param test: benchmark name stored in the record
    :param w: numeric samples to summarise
    """
    commit = get_latest_commit('scrapy', 'scrapy')
    data = {
        'commitid': commit['html_url'].rsplit('/', 1)[-1],
        'branch': 'default',  # Always use default for trunk/master/tip
        'project': 'Scrapy-Bench',
        'executable': 'bench.py',
        'benchmark': test,
        'environment': get_env(),
        'result_value': statistics.mean(w),
        # Optional fields below ('max' and 'min' are also accepted but are
        # not sent).
        'revision_date': current_date,  # default: from VCS integration or current date
        'result_date': current_date,  # default: current date
        'std_dev': statistics.pstdev(w),  # default: blank
    }
    params = urllib.urlencode(data).encode("utf-8")

    print("Saving result for executable %s,revision %s,benchmark %s" % (
        data['executable'], data['commitid'], data['benchmark']))

    connection = urllib2.urlopen(CODESPEED_URL + 'result/add/', params)
    response = connection.read()
    connection.close()

    print("Server (%s) response: %s\n" % (CODESPEED_URL, response))
# Benchmark runner variant that can also upload its results: after the float
# values have been collected into `w`, `codespeedinfo.uploadresult(test, w)` is
# called when `upload_result` is true, and Benchmark.txt in `workpath` is removed.
# NOTE(review): this copy of the function is incomplete. The `subprocess.Popen(`
# call opened below is never closed; its argument list runs straight into the
# tail of what looks like `with open(os.path.join(workpath, "Benchmark.txt")) as f:`.
# The missing middle presumably matches the other `calculator` definition in
# this file -- TODO restore it from the upstream source before using this code.
# NOTE: the `workpath` default is evaluated once, when the function is defined.
def calculator(
test,
arg,
n_runs,
only_result,
upload_result=False,
workpath=os.getcwd()):
w = []
for x in range(n_runs):
if only_result:
process = subprocess.Popen(
arg, "Benchmark.txt")) as f:
for line in f.readlines():
# every line of the results file is expected to parse as a float
w.append(float(line))
click.secho(
"\nThe results of the benchmark are (all speeds in items/sec) : \n",
bold=True)
if upload_result:
codespeedinfo.uploadresult(test, w)
os.remove(os.path.join(workpath, "Benchmark.txt"))
def get_raw_data_mean(self, times=1):
    """Read the sensor ``times`` times and return the mean raw value.

    With more than two readings and the pstdev filter enabled, the spread of
    the readings is checked first. If the population standard deviation is at
    most 100 the plain mean is returned. Otherwise only readings strictly
    within one standard deviation of the mean are averaged. If no reading
    survives that filter, the unfiltered mean is returned.

    The returned value is also recorded through ``_save_last_raw_data``,
    together with the channel and gain that were active when the call began.

    :param times: number of readings to take, 1 to 99 inclusive
    :return: mean of the (possibly filtered) readings
    :raises ValueError: if ``times`` is outside the range 1..99
    """
    if not 0 < times < 100:
        raise ValueError('function "get_raw_data_mean" parameter "times" has to be in range 1 up to 99.\n I have got: '
                         + str(times))

    # Snapshot channel and gain before reading, for saving the result later.
    backup_channel = self._current_channel
    backup_gain = self._gain_channel_A

    data_list = [self._read() for _ in range(times)]
    data_mean = stat.mean(data_list)

    if times <= 2 or not self._pstdev_filter:
        # Too few readings to filter, or filtering disabled.
        self._save_last_raw_data(backup_channel, backup_gain, data_mean)
        return data_mean

    data_pstdev = stat.pstdev(data_list)
    if data_pstdev <= 100:
        # Spread is small enough: the plain mean is good.
        self._save_last_raw_data(backup_channel, backup_gain, data_mean)
        return data_mean

    # Keep only readings strictly inside mean +/- one pstdev.
    min_num = data_mean - data_pstdev
    max_num = data_mean + data_pstdev
    filtered_data = [num for num in data_list if min_num < num < max_num]

    if not filtered_data:
        # Every reading sits on or outside the band (e.g. two tight clusters
        # far apart). Averaging an empty list would raise, so fall back to
        # the unfiltered mean.
        self._save_last_raw_data(backup_channel, backup_gain, data_mean)
        return data_mean

    if self._debug_mode:
        print('data_list: ' + str(data_list))
        print('filtered_data lsit: ' + str(filtered_data))
        print('pstdev data: ' + str(data_pstdev))
        print('pstdev filtered data: ' + str(stat.pstdev(filtered_data)))
        print('mean data_list: ' + str(stat.mean(data_list)))
        print('mean filtered_data: ' + str(stat.mean(filtered_data)))

    f_data_mean = stat.mean(filtered_data)
    # Pass the gain on every save. Two of the three calls used to omit it,
    # which shifted the mean into the gain argument.
    self._save_last_raw_data(backup_channel, backup_gain, f_data_mean)
    return f_data_mean
############################################################
# get_data_mean returns average value of readings minus #
# offset for the particular channel which was read. #
# If return False something is wrong. Try debug mode. #
# INPUTS: times # how many times to read data. Default 1 #
# OUTPUTS: INT | BOOL #
############################################################
def get_normalized_scores(entries, args):
    """Generate smoothed scores based on the mean and stdev of each day's times.

    For every date, each user's time is converted to the number of population
    standard deviations it lies below that day's mean (faster is higher),
    clipped to +/-1.5. Negative times are treated as failures and get a fixed
    penalty score. The per-day scores are then blended over time, per user,
    with weight ``1 - args.smooth`` on the newest score.

    :param entries: iterable of objects exposing ``seconds``, ``date`` and
        ``userid``; a ``seconds`` of ``None`` is ignored
    :param args: object exposing ``smooth``, the weight kept from the
        previous running score
    :return: tuple ``(weighted_scores, ticker, formatter)`` where
        ``weighted_scores[user][date]`` is the smoothed score and the other
        two are matplotlib tick helpers
    """
    # failures come with a heavier ranking penalty
    MAX_SCORE = 1.5
    FAILURE_PENALTY = -2

    def mk_score(mean, t, stdev):
        if t < 0:
            return FAILURE_PENALTY
        if stdev == 0:
            return 0
        score = (mean - t) / stdev
        return np.clip(score, -MAX_SCORE, MAX_SCORE)

    # Raw seconds are used as-is (a log transform was tried and left disabled).
    times_by_date = defaultdict(dict)
    for e in entries:
        times_by_date[e.date][e.userid] = e.seconds

    sorted_dates = sorted(times_by_date.keys())

    # scores are the stdev away from mean of that day
    scores = {}
    for date, user_times in times_by_date.items():
        times = [t for t in user_times.values() if t is not None]
        # make failures 1 minute worse than the worst time
        failure_time = max(times, default=0) + 60
        times = [t if t >= 0 else failure_time for t in times]

        # Drop outliers lying more than one stdev outside the interquartile
        # range before computing the day's reference mean and stdev.
        q1, q3 = np.percentile(times, [25, 75])
        stdev = statistics.pstdev(times)
        o1, o3 = q1 - stdev, q3 + stdev
        times = [t for t in times if o1 <= t <= o3]

        mean = statistics.mean(times)
        stdev = statistics.pstdev(times, mean)

        # mk_score needs the user's own time ``t``. It used to be called
        # with only (mean, stdev), which raised TypeError.
        scores[date] = {
            userid: mk_score(mean, t, stdev)
            for userid, t in user_times.items()
            if t is not None
        }

    new_score_weight = 1 - args.smooth

    running = {}
    weighted_scores = defaultdict(dict)
    for date in sorted_dates:
        for user, score in scores[date].items():
            old_score = running.get(user)
            if old_score is None:
                # First score seen for this user: nothing to blend with.
                new_score = score
            else:
                new_score = score * new_score_weight + old_score * (1 - new_score_weight)
            weighted_scores[user][date] = running[user] = new_score

    ticker = matplotlib.ticker.MultipleLocator(base=0.25)
    # The keyword is ``useOffset``; the misspelled ``uSEOffset`` is rejected.
    formatter = matplotlib.ticker.ScalarFormatter(useOffset=False)

    return weighted_scores, ticker, formatter
# Compute, for every remaining feature, one "concentration" value per symbol
# from population standard deviations, print the result and pickle it to
# 'varStandarDevs.txt'.
# NOTE(review): this is Python 2 code (print statements; `reduce` is used as a
# builtin unless it is imported elsewhere in the file).
# tagClassification is indexed by symbol label, and each entry is indexed and
# iterated as a sequence of sample objects. Labels ending in '1', '2' or '3'
# are collected into three groups (l1, l2, l3) below.
def findConcentrationAux(tagClassification):
# vars() hands back the instance's own __dict__, so the `del` statements below
# also remove these attributes from the first 'a1' sample itself.
# NOTE(review): confirm that this side effect is intended.
featList=vars(tagClassification['a1'][0])
del featList['tE']
del featList['tag']
del featList['bBox']
del featList['center']
# Independent deep copy: removing entries here does not touch the sample.
# feats2Rec is the set of features checked for NaN components further down.
feats2Rec=copy.deepcopy(featList)
del feats2Rec['Coord']
del feats2Rec['Style']
del feats2Rec['coG']
# Group the symbol labels by their last character.
l1=[symbol for symbol in tagClassification if symbol[-1]=='1']
l2=[symbol for symbol in tagClassification if symbol[-1]=='2']
l3=[symbol for symbol in tagClassification if symbol[-1]=='3']
standevs={}
# For each feature in feats2Rec and each sample: if any component of that
# feature is NaN, call computeFeatures() on the sample. The `new` flag limits
# this to one call per sample and feature.
for fe in feats2Rec:
for llet in tagClassification:
for i in range(len(tagClassification[llet])):
new=True
for j in range(len(vars(tagClassification[llet][i])[fe])):
if math.isnan(vars(tagClassification[llet][i])[fe][j]) and new:
tagClassification[llet][i].computeFeatures()
new=False
# Replace textual Style labels by a single numeric pair so that Style can be
# processed like the other two-component features below.
for symbol in tagClassification:
for i in range(len(tagClassification[symbol])):
if tagClassification[symbol][i].Style=='diagonal':
tagClassification[symbol][i].Style=[[1,1]]
elif tagClassification[symbol][i].Style=='horizontal':
tagClassification[symbol][i].Style=[[1,2]]
elif tagClassification[symbol][i].Style=='vertical':
tagClassification[symbol][i].Style=[[2,1]]
elif tagClassification[symbol][i].Style=='closed':
tagClassification[symbol][i].Style=[[2,2]]
for feature in featList:
print feature
if feature=='Coord' or feature=='Style' or feature=='coG':
# Two-component features. For each component (index 0 and 1): take the
# symbol's coefficient of variation (pstdev/mean) at every feature position,
# sum them with nansum (NaNs ignored) and divide by the number of positions.
# That value is divided by the average, over the three label groups, of the
# same quantity computed on the pooled samples of each group. The two ratios
# are combined as sqrt(r0**2 + r1**2). The keys 'a1', '+2' and 'A3' are
# hard-coded to obtain the position count for groups l1, l2 and l3.
# NOTE(review): the expression is split across two physical lines inside its
# brackets; keep them adjacent.
standevs[feature]=[math.sqrt((((float(float(np.nansum([statistics.pstdev([vars(mostra)[feature][i][0] for mostra in tagClassification[symbol]])/np.mean([vars(mostra)[feature][i][0] for mostra in tagClassification[symbol]]) for i in range(len(vars(tagClassification[symbol][0])[feature]))]))/len(vars(tagClassification[symbol][0])[feature])))/(float((np.nansum([statistics.pstdev(reduce(operator.add,[[vars(mostra)[feature][j][0] for mostra in tagClassification[altSymbol]] for altSymbol in l1]))/np.mean(reduce(operator.add,[[vars(mostra)[feature][j][0] for mostra in tagClassification[altSymbol]] for altSymbol in l1])) for j in range(len(vars(tagClassification['a1'][0])[feature]))])/len(vars(tagClassification[symbol][0])[feature]))+(np.nansum([statistics.pstdev(reduce(operator.add,[[vars(mostra)[feature][j][0] for mostra in tagClassification[altSymbol]] for altSymbol in l2]))/np.mean(reduce(operator.add,[[vars(mostra)[feature][j][0] for mostra in tagClassification[altSymbol]] for altSymbol in l2])) for j in range(len(vars(tagClassification['+2'][0])[feature]))])/len(vars(tagClassification[symbol][0])[feature]))+(np.nansum([statistics.pstdev(reduce(operator.add,[[vars(mostra)[feature][j][0] for mostra in tagClassification[altSymbol]] for altSymbol in l3]))/np.mean(reduce(operator.add,[[vars(mostra)[feature][j][0] for mostra in tagClassification[altSymbol]] for altSymbol in l3])) for j in range(len(vars(tagClassification['A3'][0])[feature]))])/len(vars(tagClassification[symbol][0])[feature])))/3))**2)+(((float(float(np.nansum([statistics.pstdev([vars(mostra)[feature][i][1] for mostra in tagClassification[symbol]])/np.mean([vars(mostra)[feature][i][1] for mostra in tagClassification[symbol]]) for i in range(len(vars(tagClassification[symbol][0])[feature]))]))/len(vars(tagClassification[symbol][0])[feature])))/(float((np.nansum([statistics.pstdev(reduce(operator.add,[[vars(mostra)[feature][j][1] for mostra in tagClassification[altSymbol]] for altSymbol in 
l1]))/np.mean(reduce(operator.add,[[vars(mostra)[feature][j][1] for mostra in tagClassification[altSymbol]] for altSymbol in l1])) for j in range(len(vars(tagClassification['a1'][0])[feature]))])/len(vars(tagClassification[symbol][0])[feature]))+(np.nansum([statistics.pstdev(reduce(operator.add,[[vars(mostra)[feature][j][1] for mostra in tagClassification[altSymbol]] for altSymbol in l2]))/np.mean(reduce(operator.add,[[vars(mostra)[feature][j][1] for mostra in tagClassification[altSymbol]] for altSymbol in l2])) for j in range(len(vars(tagClassification['+2'][0])[feature]))])/len(vars(tagClassification[symbol][0])[feature]))+(np.nansum([statistics.pstdev(reduce(operator.add,[[vars(mostra)[feature][j][1] for mostra in tagClassification[altSymbol]] for altSymbol in l3]))/np.mean(reduce(operator.add,[[vars(mostra)[feature][j][1] for mostra in tagClassification[altSymbol]] for altSymbol in l3])) for j in range(len(vars(tagClassification['A3'][0])[feature]))])/len(vars(tagClassification[symbol][0])[feature])))/3))**2)) for symbol in tagClassification]
else:
# Single-component features: the same ratio as above (the symbol's mean
# coefficient of variation divided by the three-group average), without the
# square-root combination.
standevs[feature]=[((float(float(np.nansum([statistics.pstdev([vars(mostra)[feature][i] for mostra in tagClassification[symbol]])/np.mean([vars(mostra)[feature][i] for mostra in tagClassification[symbol]]) for i in range(len(vars(tagClassification[symbol][0])[feature]))]))/len(vars(tagClassification[symbol][0])[feature])))/(float((np.nansum([statistics.pstdev(reduce(operator.add,[[vars(mostra)[feature][j] for mostra in tagClassification[altSymbol]] for altSymbol in l1]))/np.mean(reduce(operator.add,[[vars(mostra)[feature][j] for mostra in tagClassification[altSymbol]] for altSymbol in l1])) for j in range(len(vars(tagClassification['a1'][0])[feature]))])/len(vars(tagClassification[symbol][0])[feature]))+(np.nansum([statistics.pstdev(reduce(operator.add,[[vars(mostra)[feature][j] for mostra in tagClassification[altSymbol]] for altSymbol in l2]))/np.mean(reduce(operator.add,[[vars(mostra)[feature][j] for mostra in tagClassification[altSymbol]] for altSymbol in l2])) for j in range(len(vars(tagClassification['+2'][0])[feature]))])/len(vars(tagClassification[symbol][0])[feature]))+(np.nansum([statistics.pstdev(reduce(operator.add,[[vars(mostra)[feature][j] for mostra in tagClassification[altSymbol]] for altSymbol in l3]))/np.mean(reduce(operator.add,[[vars(mostra)[feature][j] for mostra in tagClassification[altSymbol]] for altSymbol in l3])) for j in range(len(vars(tagClassification['A3'][0])[feature]))])/len(vars(tagClassification[symbol][0])[feature])))/3)) for symbol in tagClassification]
print standevs
# Persist the result, replacing any previous file.
if os.path.isfile('varStandarDevs.txt'):
os.remove('varStandarDevs.txt')
# Binary mode, as used for the pickle stream.
f = open('varStandarDevs.txt','wb')
pickle.dump(standevs,f)
f.close()
#PonderateByConcentration: returns the weighting of the features according to the standard deviations that it loads from the system