Python pandas 模块,infer_freq() 实例源码
我们从Python开源项目中,提取了以下6个代码示例,用于说明如何使用pandas.infer_freq()。
def get_approximate_frequency(trace):
    """Infer the frequency of ``trace.data.index`` with ``pd.infer_freq``.

    Inference is first attempted on the whole index. If that yields no
    frequency, it is retried on two consecutive groups of five entries,
    which tolerates a single irregular interval such as a DST change.

    Parameters
    ----------
    trace : object
        Object exposing a ``data`` attribute. ``data`` is either ``None``
        (a placeholder) or has an ``index`` accepted by ``pd.infer_freq``.

    Returns
    -------
    freq : str or None
        The inferred frequency, or ``None`` when ``trace.data`` is ``None``,
        when the index has too few points, or when no attempt succeeds.
    """
    if trace.data is None:
        # ``Logger.warn`` is a deprecated alias of ``Logger.warning``.
        logger.warning(
            "Could not determine frequency:"
            " {} is placeholder instance."
            .format(trace)
        )
        return None

    def _log_success(freq):
        logger.debug(
            "Determined frequency of '{}' for {}."
            .format(freq, trace)
        )

    try:
        freq = pd.infer_freq(trace.data.index)
    except ValueError:  # too few data points
        logger.error("Could not determine frequency - too few points.")
        return None
    else:
        if freq is not None:
            _log_success(freq)
            return freq

    # freq is None - maybe because of a DST change (23/25 hours)?
    # strategy: try two groups of 5 dates
    for i in range(0, 9, 5):
        try:
            freq = pd.infer_freq(trace.data.index[i:i + 5])
        except ValueError:
            # This group is too short to infer from; try the next one.
            pass
        else:
            if freq is not None:
                _log_success(freq)
                return freq

    logger.warning("Could not determine frequency - no dominant frequency.")
    return None
def indexed_temperatures(self, index, unit, allow_mixed_frequency=False):
    ''' Return average temperatures over the given index.

    Parameters
    ----------
    index : pandas.DatetimeIndex
        Index over which to supply average temperatures.
        The :code:`index` should be given as either an hourly ('H') or
        daily ('D') frequency.
    unit : str, {"degF", "degC"}
        Target temperature unit for returned temperature series.
    allow_mixed_frequency : bool, optional
        If True, an index whose frequency is neither daily nor hourly is
        handled by the mixed-frequency code path instead of raising.

    Returns
    -------
    temperatures : pandas.Series with DatetimeIndex
        Average temperatures over series indexed by :code:`index`.

    Raises
    ------
    ValueError
        If the frequency is neither 'D' nor 'H' and
        :code:`allow_mixed_frequency` is False.
    '''
    # An empty index needs no weather data at all.
    if index.shape == (0,):
        return pd.Series([], index=index, dtype=float)

    self._verify_index_presence(index)  # fetches weather data if needed

    # Prefer the frequency declared on the index; otherwise infer it.
    if index.freq is not None:
        freq = index.freq
    else:
        try:
            freq = pd.infer_freq(index)
        except ValueError:
            # Inference failed; treat the frequency as unknown.
            freq = None

    if freq == 'D':
        return self._daily_indexed_temperatures(index, unit)
    elif freq == 'H':
        return self._hourly_indexed_temperatures(index, unit)
    elif allow_mixed_frequency:
        return self._mixed_frequency_indexed_temperatures(index, unit)
    else:
        message = 'DatetimeIndex with unknown frequency not supported.'
        raise ValueError(message)
def test_constructor_from_series(self):
    """Check that Index/DatetimeIndex can be built from a Series of dates.

    Covers construction without a frequency, construction with an explicit
    ``freq='MS'`` (GH 6273), and frequency inference on an object-dtype
    column of date strings (GH 6274).
    """
    expected = DatetimeIndex([Timestamp('20110101'), Timestamp('20120101'),
                              Timestamp('20130101')])
    s = Series([Timestamp('20110101'), Timestamp('20120101'),
                Timestamp('20130101')])
    result = Index(s)
    self.assertTrue(result.equals(expected))
    result = DatetimeIndex(s)
    self.assertTrue(result.equals(expected))

    # Five consecutive month starts, shared by the checks below.
    month_starts = ['1-1-1990', '2-1-1990', '3-1-1990',
                    '4-1-1990', '5-1-1990']

    # GH 6273
    # create from a series, passing a freq
    s = Series(pd.to_datetime(month_starts))
    result = DatetimeIndex(s, freq='MS')
    expected = DatetimeIndex(month_starts, freq='MS')
    self.assertTrue(result.equals(expected))

    # The frame has five rows, so the column needs five values.
    df = pd.DataFrame(np.random.rand(5, 3))
    df['date'] = month_starts
    result = DatetimeIndex(df['date'], freq='MS')
    self.assertTrue(result.equals(expected))
    # Building the index must leave the source column as plain strings.
    self.assertEqual(df['date'].dtype, object)

    exp = pd.Series(month_starts, name='date')
    self.assert_series_equal(df['date'], exp)

    # GH 6274
    # infer freq of same
    result = pd.infer_freq(df['date'])
    self.assertEqual(result, 'MS')
def data_freq(time_series):
    """
    Determine frequency of given time series

    The frequency declared on the index is used when it is set; otherwise
    the frequency is inferred from the index values.

    Args:
        time_series (Series): Series with datetime index

    Returns:
        string: frequency specifier, or None when no frequency is declared
        and none can be inferred
    """
    index = time_series.index
    # The index may lack a ``freq`` attribute, or ``freq`` may be None.
    # Only those lookups are guarded, so an error raised by the inference
    # itself is not masked.
    declared = getattr(getattr(index, "freq", None), "freqstr", None)
    if declared:
        return declared
    return pd.infer_freq(index)
def shift_dates(self,h):
    """ Auxiliary function for creating dates for forecasts

    Works on a copy of ``self.index`` with the first ``self.max_lag``
    entries dropped. A DatetimeIndex is shifted forward ``h`` times by the
    spacing of its last two entries, so its last ``h`` entries lie beyond
    the original data. An integer index or a plain list is instead
    extended by ``h`` new entries.

    Parameters
    ----------
    h : int
        How many steps to forecast

    Returns
    ----------
    A transformed date_index object
    """
    # Frequency aliases handled with second resolution. Hour, minute and
    # second aliases are listed in both their older ('H', 'T', 'S') and
    # newer ('h', 'min', 's') pandas spellings.
    # NOTE(review): 'M' is kept from the original list, but pandas uses it
    # for month-end, where the sub-day remainder is 0 and nothing shifts.
    # Confirm whether minutes were intended.
    sub_daily = ('H', 'M', 'S', 'h', 's', 'T', 'min')

    date_index = copy.deepcopy(self.index)
    date_index = date_index[self.max_lag:]

    if self.is_pandas is True:

        if isinstance(date_index, pd.core.indexes.datetimes.DatetimeIndex):

            if pd.infer_freq(date_index) in sub_daily:

                for t in range(h):
                    step = date_index[-1] - date_index[-2]
                    # The amount must be passed by keyword: a positional
                    # argument is ``n``, a number of whole days.
                    date_index += pd.DateOffset(seconds=step.seconds)

            else: # Assume higher frequency (configured for days)

                for t in range(h):
                    step = date_index[-1] - date_index[-2]
                    # Positional ``n``: shift by this many days.
                    date_index += pd.DateOffset(step.days)

        # NOTE(review): Int64Index was removed in pandas 2.0, so this lookup
        # fails there for any non-datetime index - confirm supported versions.
        elif isinstance(date_index, pd.core.indexes.numeric.Int64Index):

            for i in range(h):
                values = date_index.values
                # Continue the index using the spacing of its last two values.
                new_value = values[-1] + (values[-1] - values[-2])
                date_index = pd.Int64Index(np.append(values, new_value))

    else:

        # Non-pandas data: the index is extended in place, one unit per step.
        for t in range(h):
            date_index.append(date_index[-1] + 1)

    return date_index
def read_knmi(fname, variables='RD'):
    """This method can be used to import KNMI data.

    Parameters
    ----------
    fname: str
        Filename and path of the file handed to ``KnmiStation.fromfile``.
    variables: str, iterable of str or None, optional
        Name(s) of the variable(s) to import. If None, every key of the
        station's ``variables`` mapping is imported. Default is 'RD'.

    Returns
    -------
    ts: Pastas TimeSeries or list of TimeSeries
        One TimeSeries per station code and variable. When exactly one
        series was created it is returned on its own, otherwise a list is
        returned.

    Raises
    ------
    ValueError
        If a requested variable is not a key of the imported data.

    """
    knmi = KnmiStation.fromfile(fname)
    if variables is None:
        variables = knmi.variables.keys()
    if isinstance(variables, str):
        variables = [variables]

    stn_codes = knmi.data['STN'].unique()

    ts = []
    for code in stn_codes:
        for variable in variables:
            if variable not in knmi.data.keys():
                raise ValueError(
                    "variable %s is not in this dataset. Please use one of "
                    "the following keys: %s" % (variable, knmi.data.keys()))

            series = knmi.data.loc[knmi.data['STN'] == code, variable]
            # get rid of the hours when data is daily
            if pd.infer_freq(series.index) == 'D':
                series.index = series.index.normalize()

            metadata = {}
            if knmi.stations is not None and not knmi.stations.empty:
                station = knmi.stations.loc[str(code), :]
                metadata['x'] = station.LON_east
                metadata['y'] = station.LAT_north
                metadata['z'] = station.ALT_m
                metadata['projection'] = 'epsg:4326'
                stationname = station.NAME
            else:
                # Without station details the code itself serves as the name.
                stationname = str(code)
            metadata['description'] = knmi.variables[variable]

            if variable in ('RD', 'RH'):
                kind = 'prec'
            elif variable == 'EV24':
                kind = 'evap'
            else:
                kind = None

            # The keyword name is kept exactly as the original call spelled it.
            ts.append(TimeSeries(series, name=variable + stationname,
                                 Metadata=metadata, kind=kind))

    if len(ts) == 1:
        ts = ts[0]
    return ts