Python statistics 模块 median() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 statistics.median()。
def get_median_problems_solved_per_user(eligible=True, scoring=True, user_breakdown=None):
    """Return the median number of problems solved across all users.

    Args:
        eligible: Restrict to eligible teams when fetching the breakdown.
        scoring: When True, only users with at least one solve are counted.
        user_breakdown: Optional precomputed ``{tid: {uid: stats}}`` mapping;
            fetched via ``get_team_member_solve_stats(eligible)`` when None.

    Returns:
        The median solve count over the selected users.

    Raises:
        statistics.StatisticsError: If no users match the filter.
    """
    if user_breakdown is None:
        user_breakdown = get_team_member_solve_stats(eligible)
    solves = []
    # Keys (tid/uid) are unused, so iterate values directly.
    for breakdown in user_breakdown.values():
        for ubreakdown in breakdown.values():
            # Missing or incomplete per-user stats count as zero solves.
            solved = ubreakdown.get('correct', 0) if ubreakdown else 0
            if solved > 0 or not scoring:
                solves.append(solved)
    return statistics.median(solves)
def MEDIAN(df, n, price='Close'):
    """Rolling median (middle value) of a price series.

    Args:
        df: Mapping of column name to an indexable sequence of prices.
        n: Window length in rows.
        price: Column to read (default 'Close').

    Returns:
        A list the same length as ``df[price]``; entry i is the median of
        the n values ending at i, or NaN while fewer than n values exist.
    """
    series = df[price]  # hoist the column lookup out of the loop
    return [
        float('NaN') if i + 1 < n else statistics.median(series[i + 1 - n:i + 1])
        for i in range(len(series))
    ]
def MEDIAN_LOW(df, n, price='Close'):
    """Rolling low median of a price series.

    Bug fix: the window length ``n`` was referenced in the body but missing
    from the signature (every call raised NameError); it is now a parameter,
    matching the sibling MEDIAN().

    Args:
        df: Mapping of column name to an indexable sequence of prices.
        n: Window length in rows.
        price: Column to read (default 'Close').

    Returns:
        A list the same length as ``df[price]``; entry i is median_low of
        the n values ending at i, or NaN while fewer than n values exist.
    """
    series = df[price]
    return [
        float('NaN') if i + 1 < n else statistics.median_low(series[i + 1 - n:i + 1])
        for i in range(len(series))
    ]
def MEDIAN_HIGH(df, n, price='Close'):
    """Rolling high median of a price series.

    Bug fix: the window length ``n`` was referenced in the body but missing
    from the signature (every call raised NameError); it is now a parameter,
    matching the sibling MEDIAN().

    Args:
        df: Mapping of column name to an indexable sequence of prices.
        n: Window length in rows.
        price: Column to read (default 'Close').

    Returns:
        A list the same length as ``df[price]``; entry i is median_high of
        the n values ending at i, or NaN while fewer than n values exist.
    """
    series = df[price]
    return [
        float('NaN') if i + 1 < n else statistics.median_high(series[i + 1 - n:i + 1])
        for i in range(len(series))
    ]
def get_median_problems_solved_per_user(eligible=True, scoring=True, user_breakdown=None):
    """Return the median number of problems solved across all users.

    NOTE(review): this snippet was truncated during extraction (the
    signature was merged with the inner loop header); reconstructed from
    the identical function earlier in this file.

    Args:
        eligible: Restrict to eligible teams when fetching the breakdown.
        scoring: When True, only users with at least one solve are counted.
        user_breakdown: Optional precomputed ``{tid: {uid: stats}}`` mapping;
            fetched via ``get_team_member_solve_stats(eligible)`` when None.

    Returns:
        The median solve count over the selected users.
    """
    if user_breakdown is None:
        user_breakdown = get_team_member_solve_stats(eligible)
    solves = []
    for breakdown in user_breakdown.values():
        for ubreakdown in breakdown.values():
            # Missing or incomplete per-user stats count as zero solves.
            solved = ubreakdown.get('correct', 0) if ubreakdown else 0
            if solved > 0 or not scoring:
                solves.append(solved)
    return statistics.median(solves)
def get_median_problems_solved_per_user(eligible=True, scoring=True, user_breakdown=None):
    """Return the median number of problems solved across all users.

    NOTE(review): this snippet was truncated during extraction (the
    signature was merged with the inner loop header); reconstructed from
    the identical function earlier in this file.

    Args:
        eligible: Restrict to eligible teams when fetching the breakdown.
        scoring: When True, only users with at least one solve are counted.
        user_breakdown: Optional precomputed ``{tid: {uid: stats}}`` mapping;
            fetched via ``get_team_member_solve_stats(eligible)`` when None.

    Returns:
        The median solve count over the selected users.
    """
    if user_breakdown is None:
        user_breakdown = get_team_member_solve_stats(eligible)
    solves = []
    for breakdown in user_breakdown.values():
        for ubreakdown in breakdown.values():
            # Missing or incomplete per-user stats count as zero solves.
            solved = ubreakdown.get('correct', 0) if ubreakdown else 0
            if solved > 0 or not scoring:
                solves.append(solved)
    return statistics.median(solves)
def printWinSizeSummary(neighborTL):
    '''Print the median, mean, and standard deviation of window sizes (bp).

    neighborTL is a list indexed by gene whose values are neighbor-gene
    tuples; each window size comes from calcWinSize (which reads the
    module-level geneNames and geneInfoD).
    '''
    winL = [calcWinSize(neighborT, geneNames, geneInfoD) for neighborT in neighborTL]
    print(" median", round(statistics.median(winL)))
    print(" mean", round(statistics.mean(winL)))
    print(" stdev", round(statistics.stdev(winL)))
## Modifications for core functionality (they require changing functions, so we keep them here).
def evaluate_and_update_max_score(self, t, episodes):
    """Evaluate the agent, record the stats row, and update the best model
    whenever the mean evaluation score improves. Returns the mean score."""
    eval_stats = eval_performance(
        self.env, self.agent, self.n_runs,
        max_episode_len=self.max_episode_len, explorer=self.explorer,
        logger=self.logger)
    elapsed = time.time() - self.start_time
    custom_values = tuple(tup[1] for tup in self.agent.get_statistics())
    mean_score = eval_stats['mean']
    stats_row = (t, episodes, elapsed, mean_score,
                 eval_stats['median'], eval_stats['stdev'],
                 eval_stats['max'], eval_stats['min']) + custom_values
    record_stats(self.outdir, stats_row)
    if mean_score > self.max_score:
        update_best_model(self.agent, self.outdir, self.max_score, mean_score,
                          logger=self.logger)
        self.max_score = mean_score
    return mean_score
def evaluate_and_update_max_score(self, episodes, env, agent):
    """Evaluate `agent` on `env`, record stats, and (under the shared lock)
    update the best model when the mean score improves.

    NOTE(review): this snippet lost lines during extraction — the original
    assignment was the self-referential ``values = (t, values)`` with an
    undefined ``t``. The stats tuple and record_stats() call below are
    reconstructed from the sibling evaluator above; confirm the exact
    fields (and that ``self.outdir`` exists) against the original project.
    """
    eval_stats = eval_performance(
        env, agent,
        logger=self.logger)
    elapsed = time.time() - self.start_time
    custom_values = tuple(tup[1] for tup in agent.get_statistics())
    mean = eval_stats['mean']
    values = (episodes,
              elapsed,
              mean,
              eval_stats['median'],
              eval_stats['stdev'],
              eval_stats['max'],
              eval_stats['min']) + custom_values
    record_stats(self.outdir, values)
    # _max_score is shared across processes; guard the compare-and-set.
    with self._max_score.get_lock():
        if mean > self._max_score.value:
            update_best_model(
                agent, self._max_score.value,
                logger=self.logger)
            self._max_score.value = mean
    return mean
def runPutTest(testDataPath, testDatarangeStart, testDatarangeEnd, f):
    """Time f.putObject() for each test object and log total/median/stdev/mean.

    Args:
        testDataPath: Directory containing numbered test-data files.
        testDatarangeStart/testDatarangeEnd: Object id range (half-open).
        f: Storage backend exposing putObject(obj, key).
    """
    log.debug('running put tests...')
    timeStart = time.perf_counter()
    times = [time.perf_counter()]
    for i in range(testDatarangeStart, testDatarangeEnd):
        # (removed leftover debug print(i); progress belongs in the log)
        thisPath = '%s/%i' % (testDataPath, i)
        o = loadTestData(thisPath)
        f.putObject(o, str(i))
        times.append(time.perf_counter())
    timeEnd = time.perf_counter()
    # Compute the per-object deltas once instead of three times.
    deltas = calculatetimedeltas(times)
    log.warning('RESULT (PUT): total test runtime: %s seconds,mean per object: %s' % (
        timeEnd - timeStart, ((timeEnd - timeStart) / testDatarangeEnd)))
    log.critical('RESULT (PUT): median result: %s ' % statistics.median(deltas))
    log.critical('RESULT (PUT): standard deviation result: %s ' % statistics.stdev(deltas))
    log.critical('RESULT (PUT): mean result: %s ' % statistics.mean(deltas))
    # log.critical('RESULT (PUT): individual times: %s ' % deltas)
def runGetTest(testDataPath, testDatarangeStart, testDatarangeEnd, f):
    """Time f.getobject() for each test object and log total/median/stdev/mean.

    Bug fix: testDatarangeStart/testDatarangeEnd were used in the body but
    missing from the signature (NameError); the truncated summary log line
    is reconstructed to match runPutTest.
    """
    log.debug('running get tests...')
    timeStart = time.perf_counter()
    times = [time.perf_counter()]
    for i in range(testDatarangeStart, testDatarangeEnd):
        thisPath = '%s/%i' % (testDataPath, i)
        o = f.getobject(str(i))
        saveTestData(o, thisPath)
        times.append(time.perf_counter())
    timeEnd = time.perf_counter()
    # Compute the per-object deltas once instead of three times.
    deltas = calculatetimedeltas(times)
    log.critical('RESULT (GET): total test runtime: %s seconds,mean per object: %s' % (
        timeEnd - timeStart, ((timeEnd - timeStart) / testDatarangeEnd)))
    log.critical('RESULT (GET): median result: %s ' % statistics.median(deltas))
    log.critical('RESULT (GET): standard deviation result: %s ' % statistics.stdev(deltas))
    log.critical('RESULT (GET): mean result: %s ' % statistics.mean(deltas))
    # log.critical('RESULT (GET): individual times: %s ' % deltas)
def runDeleteTest(testDatarangeStart, testDatarangeEnd, f):
    """Time f.deleteObject() for each test object and log total/median/stdev/mean.

    Bug fix: testDatarangeEnd was used in the body but missing from the
    signature (NameError); the truncated summary log line is reconstructed
    to match runPutTest.
    """
    log.debug('running delete tests...')
    timeStart = time.perf_counter()
    times = [time.perf_counter()]
    for i in range(testDatarangeStart, testDatarangeEnd):
        f.deleteObject(str(i))
        times.append(time.perf_counter())
    timeEnd = time.perf_counter()
    # Compute the per-object deltas once instead of three times.
    deltas = calculatetimedeltas(times)
    log.critical('RESULT (DELETE): total test runtime: %s seconds,mean per object: %s' % (
        timeEnd - timeStart, ((timeEnd - timeStart) / testDatarangeEnd)))
    log.critical('RESULT (DELETE): median result: %s ' % statistics.median(deltas))
    log.critical('RESULT (DELETE): standard deviation result: %s ' % statistics.stdev(deltas))
    log.critical('RESULT (DELETE): mean result: %s ' % statistics.mean(deltas))
    # log.critical('RESULT (DELETE): individual times: %s ' % deltas)
###############################################################################
###############################################################################
def eval_performance(rom, p_func, n_runs):
    """Play `n_runs` ALE episodes with policy p_func and return
    (mean, median, stdev) of the episode scores."""
    assert n_runs > 1, 'Computing stdev requires at least two runs'
    scores = []
    for run_idx in range(n_runs):
        env = ale.ALE(rom, treat_life_lost_as_terminal=False)
        episode_reward = 0
        while not env.is_terminal:
            state_var = chainer.Variable(np.expand_dims(dqn_phi(env.state), 0))
            pout = p_func(state_var)
            action = pout.action_indices[0]
            episode_reward += env.receive_action(action)
        scores.append(episode_reward)
        print('test_{}:'.format(run_idx), episode_reward)
    return (statistics.mean(scores),
            statistics.median(scores),
            statistics.stdev(scores))
def eval_performance(process_idx, make_env, model, phi, n_runs):
    """Play `n_runs` evaluation episodes with `model` and return
    (mean, median, stdev) of the episode scores.

    NOTE(review): this snippet was truncated during extraction — the
    signature was merged with the assert line and the final print/returns
    were garbled; reconstructed to match the sibling eval_performance above.
    """
    assert n_runs > 1, 'Computing stdev requires at least two runs'
    scores = []
    for i in range(n_runs):
        model.reset_state()
        env = make_env(process_idx, test=True)
        obs = env.reset()
        done = False
        test_r = 0
        while not done:
            s = chainer.Variable(np.expand_dims(phi(obs), 0))
            pout, _ = model.pi_and_v(s)
            a = pout.action_indices[0]
            obs, r, done, info = env.step(a)
            test_r += r
        scores.append(test_r)
        print('test_{}:'.format(i), test_r)
    mean = statistics.mean(scores)
    median = statistics.median(scores)
    stdev = statistics.stdev(scores)
    return mean, median, stdev
def math_stats_calculations(point_map):
    """Return (mean, median, stdev, variance) of the teams' points, each
    rounded to two places and converted to a string. Reads the module-level
    team_array for the set of teams."""
    point_array = [point_map[team] for team in team_array]
    mean_val = str(round(statistics.mean(point_array), 2))
    median_val = str(round(statistics.median(point_array), 2))
    stdev_val = str(round(statistics.stdev(point_array), 2))
    var_val = str(round(statistics.variance(point_array), 2))
    return (mean_val, median_val, stdev_val, var_val)
# Calls my function
def test_odd_number_repeated(self):
# Test median.grouped with repeated median values.
data = [12, 13, 14, 15, 15]
assert len(data)%2 == 1
self.assertEqual(self.func(data), 14)
#---
data = [12, 13.875)
#---
data = [5, 10, 20, 25, 30]
assert len(data)%2 == 1
self.assertEqual(self.func(data, 5), 19.375)
#---
data = [16, 18, 22, 24, 26, 28]
assert len(data)%2 == 1
self.assertApproxEqual(self.func(data, 2), 20.66666667, tol=1e-8)
def test_even_number_repeated(self):
# Test median.grouped with repeated median values.
data = [5, 30]
assert len(data)%2 == 0
self.assertApproxEqual(self.func(data, 19.16666667, tol=1e-8)
#---
data = [2, 3, 4, 5]
assert len(data)%2 == 0
self.assertApproxEqual(self.func(data), 3.83333333, 5, 6, 6]
assert len(data)%2 == 0
self.assertEqual(self.func(data), 4.5)
#---
data = [3, 4.75)
def temp_stat(temps):
    """Print the sample list followed by its mean, median, standard
    deviation, and variance (requires at least two values)."""
    import statistics as stats
    print(temps)
    for label, fn in (("Mean: ", stats.mean),
                      ("Median: ", stats.median),
                      ("Standard deviation: ", stats.stdev),
                      ("Variance: ", stats.variance)):
        print(label, fn(temps))
def temp_stat(temps):
    """Print the mean, median, standard deviation, variance, and mode of
    temps.

    NOTE(review): the middle of this snippet was truncated during
    extraction (the docstring was merged with the variance call); the
    print sequence is reconstructed from the sibling temp_stat above.
    mode() is wrapped separately because it raises StatisticsError when
    no unique mode exists (on Python < 3.8).
    """
    import statistics
    print(temps)
    print("Mean: ", statistics.mean(temps))
    print("Median: ", statistics.median(temps))
    print("Standard deviation: ", statistics.stdev(temps))
    print("Variance: ", statistics.variance(temps))
    try:
        print("Mode: ", statistics.mode(temps))
    except statistics.StatisticsError as e:
        print("Mode error: ", e)
def test_odd_number_repeated(self):
# Test median.grouped with repeated median values.
data = [12, tol=1e-8)
def test_even_number_repeated(self):
# Test median.grouped with repeated median values.
data = [5, 4.75)
def test():
    """Tests the statistical functions.

    Raises:
        AssertionError if a test fails.

    NOTE(review): the original assertions compared ``mean(x) - c <= 1e-6``
    without abs(), which passes whenever the statistic is merely *smaller*
    than the expected value; abs() restores the intended tolerance check.
    The data lists were truncated during extraction and are reconstructed
    to be consistent with the asserted means/medians/modes.
    """
    testlist0 = [1, 2, 3, 4, 5]
    testlist1 = [1, 2, 3, 4, 5, 6]
    testlist2 = [2, 2, 3, 4, 4, 6]
    testlist3 = [2, 2, 3, 4, 5, 6, 7]
    assert abs(mean(testlist0) - 3) <= 1e-6, mean(testlist0)
    assert abs(mean(testlist1) - 3.5) <= 1e-6, mean(testlist1)
    assert abs(mean(testlist2) - 21 / 6) <= 1e-6, mean(testlist2)
    assert abs(mean(testlist3) - 29 / 7) <= 1e-6, mean(testlist3)
    assert median(testlist0) == 3, median(testlist0)
    assert abs(median(testlist1) - 3.5) <= 1e-6, median(testlist1)
    assert abs(median(testlist2) - 3.5) <= 1e-6, median(testlist2)
    assert median(testlist3) == 4, median(testlist3)
    assert mode(testlist3) == 2, mode(testlist3)
def gsr_response(stream_id: uuid, start_time: datetime, end_time: datetime, label_attachment: str, label_off: str,
                 CC_obj: CerebralCortex, config: dict) -> str:
    """
    This method analyzes galvanic skin response to label a window as improper attachment or sensor-off-body
    :param stream_id: UUID
    :param start_time:
    :param end_time:
    :param label_attachment: label returned when the median GSR is below the
        improper-attachment threshold
    :param label_off: label returned when the median GSR is above the
        off-body threshold
    :param CC_obj:
    :param config: expects config["attachment_marker"]["improper_attachment"]
        and config["attachment_marker"]["gsr_off_body"] thresholds
    :return: string label, or None when the median falls between thresholds
    """
    datapoints = CC_obj.get_datastream(stream_id, start_time=start_time, end_time=end_time, data_type=DataSet.COMPLETE)
    vals = [dp.sample for dp in datapoints]
    # Bug fix: statistics has no array(); median() takes the sequence
    # directly. Compute it once rather than once per branch.
    med = stat.median(vals)
    if med < config["attachment_marker"]["improper_attachment"]:
        return label_attachment
    if med > config["attachment_marker"]["gsr_off_body"]:
        return label_off
    # Between the two thresholds neither label applies (the original fell
    # through to an implicit None as well).
    return None
def outlier_detection(window_data: list) -> list:
    """
    removes outliers from a list
    This algorithm is modified version of Chauvenet's_criterion (https://en.wikipedia.org/wiki/Chauvenet's_criterion)

    Bug fix: the deviation from the median must be computed before taking
    abs(); the original ``abs(float(val.sample)) - median`` compared the
    magnitude of the raw value, so large *negative* outliers could be kept
    and small negative values dropped.

    :param window_data: datapoints exposing a numeric ``.sample``
    :return: list of float samples within one stdev of the median
    :raises ValueError: if window_data is empty
    :raises statistics.StatisticsError: if fewer than two datapoints
    """
    if not window_data:
        raise ValueError("List is empty.")
    vals = [float(dp.sample) for dp in window_data]
    median = stat.median(vals)
    standard_deviation = stat.stdev(vals)
    return [v for v in vals if abs(v - median) < standard_deviation]
def graphRampTime(deltas, nocontribs, graphtitle, xtitle, filename):
    """Build a plotly histogram of ramp-up times and return it as an HTML div.

    deltas holds per-contributor day counts; nocontribs (contributors who
    never did this) is used only for the percentage line. filename is
    currently unused.
    """
    pct_did = len(deltas) / (len(deltas) + len(nocontribs)) * 100
    axis_label = (xtitle +
                  '<br>Mean: ' + '{:.2f}'.format(statistics.mean(deltas)) + ' days,' +
                  'Median: ' + '{:.2f}'.format(statistics.median(deltas)) + ' days' +
                  '<br>Number of contributors who did this: ' +
                  '{:,g}'.format(len(deltas)) +
                  '<br>Percentage of contributors who did this: ' +
                  '{:.2f}'.format(pct_did) + '%')
    histogram = Histogram(x=deltas)
    chart_layout = Layout(title=graphtitle,
                          yaxis=dict(title='Number of contributors'),
                          xaxis=dict(title=axis_label))
    fig = figure(data=[histogram], layout=chart_layout)
    return offline.plot(fig, show_link=False, include_plotlyjs=False, output_type='div')
# FIXME Maybe look for the word 'bot' in the user description?
def sync_gain_sliders(self):
    """Set the RF/IF/baseband gain spin boxes to the middle entry of each
    gain list in the selected device's configuration.

    The identical lookup/set/emit sequence was repeated three times; it is
    now a single loop. ``int(median(range(n)))`` selects the middle index
    (lower middle for even-length lists), exactly as before.
    """
    conf = self.get_config_for_selected_device()
    prefix = self.rx_tx_prefix
    for suffix, spin_box in (("rf_gain", self.ui.spinBoxGain),
                             ("if_gain", self.ui.spinBoxIFGain),
                             ("baseband_gain", self.ui.spinBoxBasebandGain)):
        key = prefix + suffix
        if key in conf:
            gain = conf[key][int(median(range(len(conf[key]))))]
            spin_box.setValue(gain)
            spin_box.valueChanged.emit(gain)
def eval_performance(rom, p_func, n_runs):
    """Play `n_runs` ALE episodes with policy p_func and return
    (mean, median, stdev) of the episode scores.

    NOTE(review): this snippet was truncated during extraction — the
    signature and loop header were merged, the body used a Python 2
    print statement, and the computed median was dropped from the return.
    Reconstructed to match the sibling eval_performance above.
    """
    assert n_runs > 1, 'Computing stdev requires at least two runs'
    scores = []
    for i in range(n_runs):
        env = ale.ALE(rom, treat_life_lost_as_terminal=False)
        test_r = 0
        while not env.is_terminal:
            s = util.dqn_phi(env.state)
            pout = p_func(s)
            a = util.categorical_sample(pout)
            test_r += env.receive_action(a)
        scores.append(test_r)
        print('test_', i, ':', test_r)
    mean = statistics.mean(scores)
    median = statistics.median(scores)
    stdev = statistics.stdev(scores)
    return mean, median, stdev
def _print(self):
    """Print statistics and other informational text.

    Bug fix: the template has seven %-placeholders but the value tuple
    omitted ``median`` (six values), so %-formatting raised
    'not enough arguments for format string'; median is now included in
    its slot between mean and the high/low pair.
    """
    mean = statistics.mean(self.prices)
    median = statistics.median(self.prices)
    stdev = statistics.stdev(self.prices)
    high = mean + stdev
    low = mean - stdev
    print(dedent('''\
    Sourced %d prices in %.3f seconds
    Mean:\t$%.2f
    Median:\t$%.2f
    Hi/Lo:\t$%.2f/$%.2f
    StDev:\t%.2f
    ''' % (len(self.prices), self.duration,
           mean, median, high, low, stdev)))
def get_stats(self, metrics, lang=UNSPECIFIED_TRANSLATION, limit=100):
    """Extend the parent's stats with mean/median/stdev/mode of the
    flattened numeric dataset; any statistic that cannot be computed is
    left as the '*' placeholder.
    """
    stats = super(NumField, self).get_stats(metrics, lang, limit)
    stats.update({
        'median': '*',
        'mean': '*',
        'mode': '*',
        'stdev': '*'
    })
    # Flatten once instead of once per statistic — cheaper, and correct
    # even if flatten_dataset returns a one-shot iterator.
    dataset = list(self.flatten_dataset(metrics))
    try:
        # require a non empty dataset
        stats['mean'] = statistics.mean(dataset)
        stats['median'] = statistics.median(dataset)
        # requires at least 2 values in the dataset
        stats['stdev'] = statistics.stdev(dataset, xbar=stats['mean'])
        # requires a non empty dataset and a unique mode
        stats['mode'] = statistics.mode(dataset)
    except statistics.StatisticsError:
        pass
    return stats
def getMedWeight(graph, node1, node2):
    """Return the median edge weight over the edges incident to node1 and
    node2 in `graph` (adjacency dict: node -> [(neighbor, weight), ...]).

    Weights equal to the 1.1 sentinel are replaced by 1, as in the
    original. Returns 0 when neither node has any edges.
    """
    weights = []
    # Both endpoints get the identical substitution rule; loop instead of
    # duplicating the block.
    for node in (node1, node2):
        for _neighbor, weight in graph[node]:
            weights.append(weight if weight != 1.1 else 1)
    if not weights:
        return 0
    return statistics.median(weights)
def average(numbers, type='mean'):
    """Return an aggregate of `numbers` selected by `type`: 'mean', 'mode',
    'median', 'min', 'max', or 'range' (max - min). Case-insensitive.

    (`type` keeps its original name for backward compatibility even though
    it shadows the builtin.)

    Raises:
        RuntimeError: If `numbers` is not a usable numeric sequence, or if
            `type` names an unknown operation.
    """
    import statistics
    type = type.lower()
    try:
        # Validation probe only: confirms `numbers` is a non-empty numeric
        # sequence. Narrowed from a bare except so real bugs still surface.
        statistics.mean(numbers)
    except (statistics.StatisticsError, TypeError):
        raise RuntimeError('An Error Has Occured: List Not Specified (0018)')
    # Dispatch table instead of an if/elif chain.
    operations = {
        'mean': statistics.mean,
        'mode': statistics.mode,
        'median': statistics.median,
        'min': min,
        'max': max,
        'range': lambda ns: max(ns) - min(ns),
    }
    if type not in operations:
        # Throw A Runtime Error
        raise RuntimeError('An Error Has Occured: You Entered An Invalid Operation (0003)')
    return operations[type](numbers)
def async_update(self):
    """Get the latest data and updates the states.

    Fix: the constant name was case-mangled to ``STATE_UNKNowN`` during
    extraction ('now' -> 'Now'); restored to Home Assistant's
    STATE_UNKNOWN.
    """
    if not self.is_binary:
        try:
            self.mean = round(statistics.mean(self.states), 2)
            self.median = round(statistics.median(self.states), 2)
            self.stdev = round(statistics.stdev(self.states), 2)
            self.variance = round(statistics.variance(self.states), 2)
        except statistics.StatisticsError as err:
            _LOGGER.warning(err)
            self.mean = self.median = STATE_UNKNOWN
            self.stdev = self.variance = STATE_UNKNOWN
    if self.states:
        self.total = round(sum(self.states), 2)
        self.min = min(self.states)
        self.max = max(self.states)
    else:
        self.min = self.max = self.total = STATE_UNKNOWN
def get_median_problems_solved(eligible=True, scoring=True):
    """Median number of solved problems per team.

    Args:
        eligible: When True, ineligible teams are excluded.
        scoring: When True, teams with zero solves are excluded.

    Raises:
        statistics.StatisticsError: If no teams match the filter.
    """
    teams = api.team.get_all_teams(show_ineligible=(not eligible))
    # Fetch each team's solve count once instead of twice per team.
    counts = [len(api.problem.get_solved_pids(tid=t['tid'])) for t in teams]
    return statistics.median([c for c in counts if not scoring or c > 0])
def print_stat(msg, times_taken):
    """Print the mean, median, and stdev of times_taken, prefixed by msg."""
    summary = '{}: mean {:.2f} secs,median {:.2f} secs,stdev {:.2f}'.format(
        msg, mean(times_taken), median(times_taken), stdev(times_taken))
    print(summary)
def get_median_problems_solved(eligible=True, scoring=True):
    """Median number of solved problems per team.

    Args:
        eligible: When True, ineligible teams are excluded.
        scoring: When True, teams with zero solves are excluded.

    Raises:
        statistics.StatisticsError: If no teams match the filter.
    """
    teams = api.team.get_all_teams(show_ineligible=(not eligible))
    # Fetch each team's solve count once instead of twice per team.
    counts = [len(api.problem.get_solved_pids(tid=t['tid'])) for t in teams]
    return statistics.median([c for c in counts if not scoring or c > 0])
def get_median_problems_solved(eligible=True, scoring=True):
    """Median number of solved problems per team.

    Args:
        eligible: When True, ineligible teams are excluded.
        scoring: When True, teams with zero solves are excluded.

    Raises:
        statistics.StatisticsError: If no teams match the filter.
    """
    teams = api.team.get_all_teams(show_ineligible=(not eligible))
    # Fetch each team's solve count once instead of twice per team.
    counts = [len(api.problem.get_solved_pids(tid=t['tid'])) for t in teams]
    return statistics.median([c for c in counts if not scoring or c > 0])
def getIslandPositions(familyL, geneInfoD, strainNum2StrD, grID, mrcaNum, strain):
    '''Given a list of families (from a single island in a single strain),
    return (chrom, median gene midpoint, island min start, island max end).

    familyL: [(family, [gene, ...]), ...]
    geneInfoD: gene -> (commonName, locusTag, descrip, chrom, start, end, strand)
    '''
    chromL = []
    islandMin = float('inf')
    islandMax = -float('inf')
    geneMidpointL = []
    for fam, geneL in familyL:
        for gene in geneL:
            commonName, locusTag, descrip, chrom, start, end, strand = geneInfoD[gene]
            chromL.append(chrom)
            start = int(start)
            end = int(end)
            if start < islandMin:
                islandMin = start
            if end > islandMax:
                islandMax = end
            # Bug fix: the midpoint of [start, end] is (start+end)/2; the
            # original (end-start)/2 was the half-length, not a coordinate.
            geneMidpointL.append(int((start + end) / 2))
    # sanity check: all entries in chromL should be same
    if not all((c == chromL[0] for c in chromL)):
        # NOTE(review): grID was unused; the warning likely lost it during
        # extraction — confirm wording against the original project.
        print("Genes in island", grID, "at mrca", strainNum2StrD[mrcaNum], "in strain", strain, "are not all on the same chromosome.", file=sys.stderr)
    islandMedianMidpoint = statistics.median(geneMidpointL)
    return chrom, islandMedianMidpoint, islandMin, islandMax
def eval_performance(env, agent, n_runs, max_episode_len=None,
                     explorer=None, logger=None):
    """Run multiple evaluation episodes and return statistics.

    NOTE(review): the scraped signature and run_evaluation_episodes() call
    had lost the ``agent`` and ``n_runs`` arguments even though the
    docstring documents them; reconstructed — confirm against the original
    project.

    Args:
        env (Environment): Environment used for evaluation
        agent (Agent): Agent to evaluate.
        n_runs (int): Number of evaluation runs.
        max_episode_len (int or None): If specified,episodes longer than this
            value will be truncated.
        explorer (Explorer): If specified,the given Explorer will be used for
            selecting actions.
        logger (Logger or None): If specified,the given Logger object will be
            used for logging results. If not specified,the default logger of
            this module will be used.
    Returns:
        Dict of statistics.
    """
    scores = run_evaluation_episodes(
        env, agent, n_runs,
        max_episode_len=max_episode_len,
        explorer=explorer,
        logger=logger)
    stats = dict(
        mean=statistics.mean(scores),
        median=statistics.median(scores),
        # stdev needs two samples; report 0.0 for a single run.
        stdev=statistics.stdev(scores) if n_runs >= 2 else 0.0,
        max=np.max(scores),
        min=np.min(scores))
    return stats
def _avg_time_spent(history, value):
    """Maintain a rolling window of the last 100 values and return
    ([next_index, window_list], median_of_window).

    `history` is the [index, window_list] pair returned by the previous
    call. Bug fix: list.insert() always grows the list, so once the index
    wrapped past 100 the window expanded without bound (and stale values
    were never evicted); existing slots are now overwritten in place.
    """
    index, avg_list = history
    if index >= 100:
        index = 0
    if index < len(avg_list):
        avg_list[index] = value  # overwrite the oldest slot
    else:
        avg_list.append(value)   # still filling the window
    index += 1
    final_avg = statistics.median(avg_list)
    return [index, avg_list], final_avg
def get_edge_Trade_size(self, side: OrderSide, order_type: OrderType, seconds_ago: int, edge_type: EdgeType,
group_by_period: Optional[int] = None) -> float:
    """Return the quantity to quote at the book edge for `side`.

    Selection by edge_type: `best` uses 0.0, `mean` the average trade size,
    `median` the median trade size, `custom` the average trade size again.
    Any non-None quantity is divided by 10 before being returned; an
    unknown edge_type yields None.

    NOTE(review): the median/custom branches pass fewer arguments than the
    mean branch (order_type / seconds_ago are dropped) — possibly lines
    lost in extraction; confirm against the helper signatures.
    """
    qty = None
    if edge_type == EdgeType.best:
        qty = 0.
    elif edge_type == EdgeType.mean:
        qty = self.get_average_Trade_size(side, order_type, seconds_ago, group_by_period)
    elif edge_type == EdgeType.median:
        # NOTE(review): omits order_type/seconds_ago — verify intended.
        qty = self.get_median_Trade_size(side, group_by_period)
    elif edge_type == EdgeType.custom:
        # NOTE(review): omits order_type/seconds_ago — verify intended.
        qty = self.get_average_Trade_size(side, group_by_period)
    if qty is not None:
        # Edge sizing uses a tenth of the reference quantity.
        qty = qty / 10.
    return qty
def get_consensus_time(hg, x) -> datetime:
    """Return the median timestamp (as a datetime) over the events that fix
    the consensus time of `x`; the Unix epoch when there are none."""
    events = get_events_for_consensus_time(hg, x)
    stamps = [int(time.mktime(dateutil.parser.parse(e.time).timetuple()))
              for e in events]
    median_stamp = int(median(stamps)) if stamps else 0
    return datetime.fromtimestamp(median_stamp)
def get_events_for_consensus_time(hg, x) -> Set[Event]:
    """
    "set of each event z such that z is a self-ancestor of a round r unique famous witness,
    and x is an ancestor of z but not of the self-parent of z"
    :param hg: The hashgraph
    :param x: The event for which we want to calculate the median timestamp
    :return:
    """
    result = set()
    # For all famous round r witnesses
    r = x.round_received
    for witness_id in hg.witnesses[r].values():
        witness = hg.lookup_table[witness_id]
        if witness.is_famous != Fame.TRUE:
            # Only unique famous witnesses contribute a timestamp.
            continue
        # Go through the self ancestors
        z = hg.lookup_table[witness.parents.self_parent]
        if z.parents.self_parent is not None:
            z_self_parent = hg.lookup_table[z.parents.self_parent]
            # Walk down the witness's self-ancestor chain until z is the
            # earliest self-ancestor that sees x while its self-parent does
            # not — i.e. the first event on this chain to "learn of" x.
            while not event_can_see_event(hg, z, x) or event_can_see_event(hg, z_self_parent, x):
                z = hg.lookup_table[z.parents.self_parent]
                if z.parents.self_parent is None:  # Special case for the first event - this is not described in the paper
                    break
                else:
                    z_self_parent = hg.lookup_table[z.parents.self_parent]
            result.add(z)
        else:  # Special case for the first event - this is not described in the paper
            result.add(z)
    return result
def update_stats(self, refresh=conf.STAT_REFRESH, med=median, count=conf.GRID[0] * conf.GRID[1]):
    """Recompute the worker/spawn statistics strings and reschedule this
    method on the event loop every `refresh` seconds.

    Fixes: the 'Visits per worker' and 'Speed' templates had only two
    placeholders for three arguments — str.format silently ignores extras,
    so the max value was displayed in the med slot and the real median was
    dropped. 'KNown'/'unkNown' were case-mangled during extraction
    ('now' -> 'Now') and are restored.
    """
    visits = []
    seen_per_worker = []
    after_spawns = []
    speeds = []
    for w in self.workers:
        after_spawns.append(w.after_spawn)
        seen_per_worker.append(w.total_seen)
        visits.append(w.visits)
        speeds.append(w.speed)
    self.stats = (
        'Seen per worker: min {},max {},med {:.0f}\n'
        'Visits per worker: min {},max {},med {:.0f}\n'
        'Visit delay: min {:.1f},max {:.1f},med {:.1f}\n'
        'Speed: min {:.1f},max {:.1f},med {:.1f}\n'
        'Extra accounts: {},CAPTCHAs needed: {}\n'
    ).format(
        min(seen_per_worker), max(seen_per_worker), med(seen_per_worker),
        min(visits), max(visits), med(visits),
        min(after_spawns), max(after_spawns), med(after_spawns),
        min(speeds), max(speeds), med(speeds),
        self.extra_queue.qsize(), self.captcha_queue.qsize()
    )
    self.sighting_cache_size = len(SIGHTING_CACHE.store)
    self.mystery_cache_size = len(MYSTERY_CACHE.store)
    self.update_coroutines_count()
    self.counts = (
        'Known spawns: {},unknown: {},more: {}\n'
        '{} workers,{} coroutines\n'
        'sightings cache: {},mystery cache: {},DB queue: {}\n'
    ).format(
        # NOTE(review): attribute was scraped as `unkNown`; presumably
        # `unknown` in the original project — verify.
        len(spawns), len(spawns.unknown), spawns.cells_count,
        count, self.coroutines_count,
        len(SIGHTING_CACHE), len(MYSTERY_CACHE), len(db_proc)
    )
    LOOP.call_later(refresh, self.update_stats)
def format_stats(series, d, f=0):
    """Format the mean, median, and stdev of `series` with field width `d`
    and `f` decimal places.

    NOTE(review): the statistic symbols in the template were destroyed by a
    bad encoding ('?' / '?²' in the scrape); restored here as x̄ (mean),
    x̃ (median) and σ (stdev) — confirm against the original project.
    """
    mean = statistics.mean(series)
    med = statistics.median(series)
    std = statistics.stdev(series)
    return f"x\u0304 {mean:{d}.{f}f} x\u0303 {med:{d}.{f}f} \u03c3 {std:{d}.{f}f}"