Python pandas 模块,NaT 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 pandas.NaT(NaT 是一个单例对象,不可调用)。
def assert_timestamp_equal(left, right, compare_nat_equal=True, msg=""):
    """
    Assert that two pandas Timestamp objects are the same.

    Parameters
    ----------
    left, right : pd.Timestamp
        The values to compare.
    compare_nat_equal : bool, optional
        Whether to consider `NaT` values equal. Defaults to True.
    msg : str, optional
        A message to forward to `pd.util.testing.assert_equal`.
    """
    # NaT does not compare equal to itself, so the both-NaT case is decided
    # by identity before delegating to the equality assertion.
    if compare_nat_equal and left is pd.NaT and right is pd.NaT:
        return
    # Both operands must be forwarded; passing only `left` compared nothing.
    return pd.util.testing.assert_equal(left, right, msg=msg)
def to_series(tuples):
    """Transform a list of ``(date, count)`` tuples into a pandas Series
    indexed by date.

    Pairs whose date is ``pd.NaT`` or whose count is ``None`` are dropped.

    Returns
    -------
    pd.Series or None
        The counts indexed by their timestamps and sorted by index, or
        ``None`` when no usable pair remains.
    """
    cleaned = [
        pair for pair in tuples
        if not (pair[0] is pd.NaT or pair[1] is None)
    ]
    if not cleaned:
        return None
    # Transpose [(a1, b1), (a2, b2), ...] into (a1, a2, ...), (b1, b2, ...).
    # Unpacking (rather than subscripting) works even when zip() returns a
    # one-shot iterator, as it does on Python 3.
    timestamps, counts = zip(*cleaned)
    # Create the series with a sorted index.
    return pd.Series(counts, index=timestamps).sort_index()
# In[ ]:
def last_date_in_output_for_sid(self, sid):
    """
    Return the last date already written to the output for ``sid``.

    Parameters:
    -----------
    sid : int
        Asset identifier.

    Returns:
    --------
    out : pd.Timestamp
        The midnight of the last date written in to the output for the
        given sid, or ``pd.NaT`` when the sizes file is missing or the
        container is empty.
    """
    sizes_path = "{0}/close/Meta/sizes".format(self.sidpath(sid))
    if not os.path.exists(sizes_path):
        return pd.NaT
    with open(sizes_path, mode='r') as f:
        data = json.load(f)
    # Floor division keeps num_days an int; true division would produce a
    # float, which cannot be used as a positional index below.
    num_days = data['shape'][0] // self._minutes_per_day
    if num_days == 0:
        # empty container
        return pd.NaT
    return self._Trading_days[num_days - 1]
def df_type_to_str(i):
    '''
    Convert a pandas/numpy scalar into a simple built-in datatype.

    Booleans, integers and finite floats are returned as ``bool``, ``int``
    and ``float``; NaN and ``pd.NaT`` become the string ``'NaN'``;
    infinities, tuples, lists and anything else are returned via ``str()``;
    ``bytes`` are decoded as UTF-8.
    '''
    if isinstance(i, np.bool_):
        return bool(i)
    if isinstance(i, np.int_):
        return int(i)
    # ``np.float`` was only an alias of the built-in ``float`` and has been
    # removed from NumPy, so the built-in is tested directly.
    if isinstance(i, float):
        if np.isnan(i):
            return 'NaN'
        if np.isinf(i):
            return str(i)
        return float(i)
    if isinstance(i, np.uint):
        return int(i)
    if type(i) == bytes:
        return i.decode('UTF-8')
    if isinstance(i, (tuple, list)):
        return str(i)
    if i is pd.NaT:  # not identified as a float null
        return 'NaN'
    return str(i)
def _infer_fill_value(val):
    """
    Pick the missing-value marker that matches ``val``.

    ``val`` may be a scalar, ndarray or list-like. For datetime-like input
    a 0-d ``'NaT'`` array of the matching dtype is returned so that block
    construction gets a correctly typed element; otherwise ``np.nan``.
    """
    values = val if is_list_like(val) else [val]
    values = np.array(values, copy=False)

    if is_datetimelike(values):
        return np.array('NaT', dtype=values.dtype)

    if is_object_dtype(values.dtype):
        # Object arrays may still hold datetimes/timedeltas; inspect them.
        inferred = lib.infer_dtype(_ensure_object(values))
        if inferred in ('datetime', 'datetime64'):
            return np.array('NaT', dtype=_NS_DTYPE)
        if inferred in ('timedelta', 'timedelta64'):
            return np.array('NaT', dtype=_TD_DTYPE)

    return np.nan
def test_NaT_methods(self):
    """Check how datetime-style methods behave when called on NaT."""
    # GH 9513
    # Method names must match the real attribute names exactly: the
    # hasattr() guards below silently skip any name NaT does not have.
    raise_methods = ['astimezone', 'combine', 'ctime', 'dst',
                     'fromordinal', 'fromtimestamp', 'isocalendar',
                     'strftime', 'strptime', 'time', 'timestamp',
                     'timetuple', 'timetz', 'toordinal', 'tzname',
                     'utcfromtimestamp', 'utcnow', 'utcoffset',
                     'utctimetuple']
    nat_methods = ['date', 'now', 'replace', 'to_datetime', 'today']
    nan_methods = ['weekday', 'isoweekday']

    for method in raise_methods:
        if hasattr(NaT, method):
            self.assertRaises(ValueError, getattr(NaT, method))

    for method in nan_methods:
        if hasattr(NaT, method):
            self.assertTrue(np.isnan(getattr(NaT, method)()))

    for method in nat_methods:
        if hasattr(NaT, method):
            self.assertIs(getattr(NaT, method)(), NaT)

    # GH 12300
    self.assertEqual(NaT.isoformat(), 'NaT')
def test_pickle(self):
    """NaT and indexes containing it survive a pickle round trip."""
    # GH4606
    restored_nat = self.round_trip_pickle(NaT)
    self.assertTrue(restored_nat is NaT)

    original = pd.to_datetime(['2013-01-01', NaT, '2014-01-06'])
    restored = self.round_trip_pickle(original)
    self.assertTrue(restored[0] == original[0])
    self.assertTrue(restored[1] is NaT)
    self.assertTrue(restored[2] == original[2])

    # GH11002
    # don't infer freq
    weekly = date_range('1750-1-1', '2050-1-1', freq='7D')
    weekly_restored = self.round_trip_pickle(weekly)
    tm.assert_index_equal(weekly, weekly_restored)
def test_timestamp_equality(self):
    """Elementwise ``==``/``!=`` on a Series holding a Timestamp and NaT."""
    # GH 11034
    s = Series([Timestamp('2000-01-29 01:59:00'), 'NaT'])

    result = s != s
    assert_series_equal(result, Series([False, True]))
    result = s != s[0]
    assert_series_equal(result, Series([False, True]))
    result = s != s[1]
    assert_series_equal(result, Series([True, True]))

    # Each ``==`` expectation is the elementwise negation of the matching
    # ``!=`` expectation above.
    result = s == s
    assert_series_equal(result, Series([True, False]))
    result = s == s[0]
    assert_series_equal(result, Series([True, False]))
    result = s == s[1]
    assert_series_equal(result, Series([False, False]))
def test_asobject_tolist(self):
    """``asobject`` and ``tolist`` box values, with and without NaT."""
    # ``Timedelta`` (pandas) parses strings such as '1 days';
    # ``timedelta`` (datetime) only accepts keyword/number arguments.
    cases = [
        (timedelta_range(start='1 days', periods=4, freq='D', name='idx'),
         [Timedelta('1 days'), Timedelta('2 days'),
          Timedelta('3 days'), Timedelta('4 days')]),
        (TimedeltaIndex([timedelta(days=1), timedelta(days=2), pd.NaT,
                         timedelta(days=4)], name='idx'),
         [Timedelta('1 days'), Timedelta('2 days'), pd.NaT,
          Timedelta('4 days')]),
    ]
    for idx, expected_list in cases:
        expected = pd.Index(expected_list, dtype=object, name='idx')
        result = idx.asobject
        self.assertTrue(isinstance(result, Index))
        self.assertEqual(result.dtype, object)
        self.assertTrue(result.equals(expected))
        self.assertEqual(result.name, expected.name)
        self.assertEqual(idx.tolist(), expected_list)
def test_dti_tdi_numeric_ops(self):
    """Arithmetic between DatetimeIndex and TimedeltaIndex propagates NaT."""
    # These are normally union/diff set-like ops
    # tdi needs three entries to line up with the three-period dti; the
    # middle one is NaT, which is what yields the NaT in each result.
    tdi = TimedeltaIndex(['1 days', pd.NaT, '2 days'], name='foo')
    dti = date_range('20130101', periods=3, name='bar')

    result = tdi - tdi
    expected = TimedeltaIndex(['0 days', pd.NaT, '0 days'], name='foo')
    tm.assert_index_equal(result, expected)

    result = tdi + tdi
    expected = TimedeltaIndex(['2 days', pd.NaT, '4 days'], name='foo')
    tm.assert_index_equal(result, expected)

    result = dti - tdi  # name will be reset
    expected = DatetimeIndex(['20121231', pd.NaT, '20130101'])
    tm.assert_index_equal(result, expected)
def test_nat_fields(self):
    """Every datetime field of a NaT timestamp reads as NaN."""
    # GH 10050
    ts = Timestamp('NaT')
    field_names = (
        'year', 'month', 'day', 'hour', 'minute', 'second',
        'microsecond', 'nanosecond', 'dayofweek', 'quarter',
        'dayofyear', 'week', 'daysinmonth', 'days_in_month',
    )
    for field_name in field_names:
        self.assertTrue(np.isnan(getattr(ts, field_name)))
def test_period_cons_nat(self):
    """Constructing a Period from NaT keeps the NaT ordinal and the freq."""
    p = Period('NaT', freq='M')
    self.assertEqual(p.ordinal, tslib.iNaT)
    self.assertEqual(p.freq, 'M')
    self.assertEqual((p + 1).ordinal, tslib.iNaT)

    p = Period('nat', freq='W-SUN')
    self.assertEqual(p.ordinal, tslib.iNaT)
    self.assertEqual(p.freq, 'W-SUN')
    self.assertEqual((p + 1).ordinal, tslib.iNaT)

    p = Period(tslib.iNaT, freq='D')
    self.assertEqual(p.ordinal, tslib.iNaT)
    self.assertEqual(p.freq, 'D')
    self.assertEqual((p + 1).ordinal, tslib.iNaT)

    p = Period(tslib.iNaT, freq='3D')
    self.assertEqual(p.ordinal, tslib.iNaT)
    self.assertEqual(p.freq, offsets.Day(3))
    self.assertEqual(p.freqstr, '3D')
    self.assertEqual((p + 1).ordinal, tslib.iNaT)

    # A NaT period still requires an explicit frequency.
    self.assertRaises(ValueError, Period, 'NaT')
def test_to_timestamp_pi_nat(self):
    """PeriodIndex <-> DatetimeIndex conversion keeps NaT and the name."""
    # GH 7228
    index = PeriodIndex(['NaT', '2011-01', '2011-02'], freq='M',
                        name='idx')

    result = index.to_timestamp('D')
    expected = DatetimeIndex([pd.NaT, datetime(2011, 1, 1),
                              datetime(2011, 2, 1)], name='idx')
    self.assertTrue(result.equals(expected))
    self.assertEqual(result.name, 'idx')

    result2 = result.to_period(freq='M')
    self.assertTrue(result2.equals(index))
    self.assertEqual(result2.name, 'idx')

    # Same periods as ``index`` but with a 3-month multiple frequency.
    result3 = result.to_period(freq='3M')
    exp = PeriodIndex(['NaT', '2011-01', '2011-02'], freq='3M', name='idx')
    self.assert_index_equal(result3, exp)
    self.assertEqual(result3.freqstr, '3M')

    msg = ('Frequency must be positive, because it'
           ' represents span: -2A')
    with tm.assertRaisesRegexp(ValueError, msg):
        result.to_period(freq='-2A')
def test_to_timedelta_invalid(self):
    """Invalid input raises by default and becomes NaT with ``coerce``."""
    # these will error
    self.assertRaises(ValueError, lambda: to_timedelta([1, 2], unit='foo'))
    self.assertRaises(ValueError, lambda: to_timedelta(1, unit='foo'))

    # time not supported ATM
    self.assertRaises(ValueError, lambda: to_timedelta(time(second=1)))
    self.assertTrue(to_timedelta(
        time(second=1), errors='coerce') is pd.NaT)

    self.assertRaises(ValueError, lambda: to_timedelta(['foo', 'bar']))
    tm.assert_index_equal(TimedeltaIndex([pd.NaT, pd.NaT]),
                          to_timedelta(['foo', 'bar'], errors='coerce'))

    # The unparseable middle entry is coerced to NaT, so the expected index
    # has three elements as well.
    tm.assert_index_equal(TimedeltaIndex(['1 day', pd.NaT, '1 min']),
                          to_timedelta(['1 day', 'bar', '1 min'],
                                       errors='coerce'))
def test_apply_to_timedelta(self):
    """Smoke test: ``to_timedelta`` direct and via ``Series.apply``."""
    nat_delta = pd.to_timedelta('NaT')
    inputs = (
        ['00:00:01', '00:00:02'],
        ['00:00:01', np.nan, nat_delta],
    )
    for strings in inputs:
        # The two results are only computed, not compared: apply on a
        # Series does not yet give the correct dtype.
        pd.to_timedelta(strings)
        Series(strings).apply(pd.to_timedelta)
def test_isin_nan(self):
# Checks Index.isin() against NaN-like probes (np.nan, pd.NaT, float('nan')).
# NOTE(review): this snippet lost tokens during extraction and does not
# parse as written; restore it from the upstream test before use.
tm.assert_numpy_array_equal(
Index(['a', np.nan]).isin([np.nan]), [False, True])
tm.assert_numpy_array_equal(
# NOTE(review): the next line appears to fuse two separate assertions
# (the pd.NaT probe and the float('nan') probe) into one.
Index(['a', pd.NaT]).isin([pd.NaT]), np.nan]).isin([float('nan')]), False])
tm.assert_numpy_array_equal(
# NOTE(review): the expected list is truncated here (only `False])` is left).
Index(['a', np.nan]).isin([pd.NaT]), False])
# Float64Index overrides isin, so must be checked separately
# NOTE(review): both Float64Index calls below are truncated; the index
# contents, the isin() probe and the expected values cannot be recovered
# from this text alone.
tm.assert_numpy_array_equal(
Float64Index([1.0, True])
tm.assert_numpy_array_equal(
Float64Index([1.0, True])
def test_fillna_period(self):
    """``PeriodIndex.fillna`` fills NaT and validates the fill frequency."""
    # GH 11343
    # The middle entry is the missing value that fillna() replaces below.
    idx = pd.PeriodIndex(
        ['2011-01-01 09:00', pd.NaT, '2011-01-01 11:00'], freq='H')

    exp = pd.PeriodIndex(
        ['2011-01-01 09:00', '2011-01-01 10:00', '2011-01-01 11:00'
         ], freq='H')
    self.assert_index_equal(
        idx.fillna(pd.Period('2011-01-01 10:00', freq='H')), exp)

    # A non-period fill value falls back to an object Index.
    exp = pd.Index([pd.Period('2011-01-01 09:00', freq='H'), 'x',
                    pd.Period('2011-01-01 11:00', freq='H')], dtype=object)
    self.assert_index_equal(idx.fillna('x'), exp)

    with tm.assertRaisesRegexp(
            ValueError,
            'Input has different freq=D from PeriodIndex\\(freq=H\\)'):
        idx.fillna(pd.Period('2011-01-01', freq='D'))
def test_cummin_timedelta64(self):
# Checks Series.cummin() on timedelta64 data containing NaT, with
# skipna=True and skipna=False.
# NOTE(review): list elements appear to have been dropped during
# extraction: `s` has five entries while the first `expected` has four and
# the second has one, so neither comparison can pass as written. Restore
# the inputs and expectations from the upstream test before use.
s = pd.Series(pd.to_timedelta(['NaT',
'2 min',
'NaT',
'1 min',
'3 min', ]))
expected = pd.Series(pd.to_timedelta(['NaT',
'2 min',
'NaT',
'1 min', ]))
result = s.cummin(skipna=True)
self.assert_series_equal(expected, result)
expected = pd.Series(pd.to_timedelta(['NaT', ]))
result = s.cummin(skipna=False)
self.assert_series_equal(expected, result)
def test_ops_consistency_on_empty(self):
    """Reductions over empty Series give consistent results (GH 7869)."""
    # float: sum is zero, mean/median are missing
    float_sum = Series(dtype=float).sum()
    self.assertEqual(float_sum, 0)

    float_mean = Series(dtype=float).mean()
    self.assertTrue(isnull(float_mean))

    float_median = Series(dtype=float).median()
    self.assertTrue(isnull(float_median))

    # timedelta64[ns]: sum is a zero delta, mean/median are NaT
    td_sum = Series(dtype='m8[ns]').sum()
    self.assertEqual(td_sum, timedelta(0))

    td_mean = Series(dtype='m8[ns]').mean()
    self.assertTrue(td_mean is pd.NaT)

    td_median = Series(dtype='m8[ns]').median()
    self.assertTrue(td_median is pd.NaT)
def test_datetime64_tz_dropna(self):
    """``dropna`` removes NaT from naive and tz-aware datetime Series."""
    # DatetimeBlock
    # NaT sits at positions 1 and 3, so the surviving labels are 0 and 2.
    s = Series([Timestamp('2011-01-01 10:00'), pd.NaT,
                Timestamp('2011-01-03 10:00'), pd.NaT])
    result = s.dropna()
    expected = Series([Timestamp('2011-01-01 10:00'),
                       Timestamp('2011-01-03 10:00')], index=[0, 2])
    self.assert_series_equal(result, expected)

    # DatetimeBlockTZ
    idx = pd.DatetimeIndex(['2011-01-01 10:00', pd.NaT,
                            '2011-01-03 10:00', pd.NaT],
                           tz='Asia/Tokyo')
    s = pd.Series(idx)
    self.assertEqual(s.dtype, 'datetime64[ns, Asia/Tokyo]')
    result = s.dropna()
    expected = Series([Timestamp('2011-01-01 10:00', tz='Asia/Tokyo'),
                       Timestamp('2011-01-03 10:00', tz='Asia/Tokyo')],
                      index=[0, 2])
    self.assertEqual(result.dtype, 'datetime64[ns, Asia/Tokyo]')
    self.assert_series_equal(result, expected)
def test_valid_dt_with_missing_values(self):
    """``.dt`` accessors on a datetime Series with one missing entry."""
    from datetime import date, time

    # GH 8689
    s = Series(date_range('20130101', periods=5, freq='D'))
    s.iloc[2] = pd.NaT

    for attr in ['microsecond', 'nanosecond', 'second', 'minute', 'hour',
                 'day']:
        expected = getattr(s.dt, attr).copy()
        expected.iloc[2] = np.nan
        result = getattr(s.dt, attr)
        tm.assert_series_equal(result, expected)

    # Five daily values starting 2013-01-01, with position 2 missing.
    result = s.dt.date
    expected = Series(
        [date(2013, 1, 1), date(2013, 1, 2), np.nan, date(2013, 1, 4),
         date(2013, 1, 5)], dtype='object')
    tm.assert_series_equal(result, expected)

    result = s.dt.time
    expected = Series(
        [time(0), time(0), np.nan, time(0), time(0)], dtype='object')
    tm.assert_series_equal(result, expected)
def test_first_last_max_min_on_time_data(self):
    """Grouped max/min/first/last ignore NaT rows (GH 10295).

    The aggregates over the full frame must equal those over the frame
    with the missing datetime rows removed up front.
    """
    from datetime import timedelta as td

    raw_dates = [nan, '2015-07-24 10:10', '2015-07-25 11:11',
                 '2015-07-23 12:12', nan]
    raw_deltas = [nan, td(days=1), td(days=2), td(days=3), nan]
    df_test = DataFrame({'dt': raw_dates, 'td': raw_deltas})
    df_test.dt = pd.to_datetime(df_test.dt)
    df_test['group'] = 'A'
    df_ref = df_test[df_test.dt.notnull()]

    grouped_test = df_test.groupby('group')
    grouped_ref = df_ref.groupby('group')

    for reduction in ('max', 'min', 'first', 'last'):
        assert_frame_equal(getattr(grouped_ref, reduction)(),
                           getattr(grouped_test, reduction)())
def test_datetimeindex(self):
    """String output of a datetime-indexed Series shows dates and NaT."""
    dates = date_range('20130102', periods=6)
    series = Series(1, index=dates)
    rendered = series.to_string()
    self.assertTrue('2013-01-02' in rendered)

    # nat in index
    with_nat = Series(2, index=[Timestamp('20130111'), NaT])
    series = with_nat.append(series)
    rendered = series.to_string()
    self.assertTrue('NaT' in rendered)

    # nat in summary
    rendered = str(with_nat.index)
    self.assertTrue('NaT' in rendered)
def test_timestamp_compare(self):
    """Comparing a frame with a Timestamp works from either side."""
    # make sure we can compare Timestamps on the right AND left hand side
    # GH4982
    df = DataFrame({'dates1': date_range('20010101', periods=10),
                    'dates2': date_range('20010102', periods=10),
                    'intcol': np.random.randint(1000000000, size=10),
                    'floatcol': np.random.randn(10),
                    'stringcol': list(tm.rands(10))})
    df.loc[np.random.rand(len(df)) > 0.5, 'dates2'] = pd.NaT

    # Maps each operator to its mirror image, used when operands swap sides.
    ops = {'gt': 'lt', 'lt': 'gt', 'ge': 'le', 'le': 'ge', 'eq': 'eq',
           'ne': 'ne'}
    for left, right in ops.items():
        left_f = getattr(operator, left)
        right_f = getattr(operator, right)

        # no nats
        expected = left_f(df, Timestamp('20010109'))
        result = right_f(Timestamp('20010109'), df)
        assert_frame_equal(result, expected)

        # nats
        expected = left_f(df, Timestamp('nat'))
        result = right_f(Timestamp('nat'), df)
        assert_frame_equal(result, expected)
def test_v12_compat(self):
    """JSON fixtures named ``*_v012.json`` still load into the same frame."""
    values = [
        [1.56808523, 0.65727391, 1.81021139, -0.17251653],
        [-0.2550111, -0.08072427, -0.03202878, -0.17581665],
        [1.51493992, 0.11805825, 1.629455, -1.31506612],
        [-0.02765498, 0.44679743, 0.33192641, -0.27885413],
        [0.05951614, -2.69652057, 1.28163262, 0.34703478],
    ]
    frame = DataFrame(values,
                      columns=['A', 'B', 'C', 'D'],
                      index=pd.date_range('2000-01-03', '2000-01-07'))
    frame['date'] = pd.Timestamp('19920106 18:21:32.12')
    frame.ix[3, 'date'] = pd.Timestamp('20130101')
    frame['modified'] = frame['date']
    frame.ix[1, 'modified'] = pd.NaT

    epoch_path = os.path.join(self.dirpath, 'tsframe_v012.json')
    from_epoch = pd.read_json(epoch_path)
    assert_frame_equal(frame, from_epoch)

    # The ISO fixture is compared without the 'modified' column.
    frame_iso = frame.drop(['modified'], axis=1)
    iso_path = os.path.join(self.dirpath, 'tsframe_iso_v012.json')
    from_iso = pd.read_json(iso_path)
    assert_frame_equal(frame_iso, from_iso)
def test_date_format_frame(self):
    """ISO date output of a frame round-trips for every date unit."""
    df = self.tsframe.copy()

    def test_w_date(date, date_unit=None):
        # Round-trip a 'date' column (with two NaT rows) through ISO JSON.
        df['date'] = Timestamp(date)
        df.ix[1, 'date'] = pd.NaT
        df.ix[5, 'date'] = pd.NaT
        options = {'date_format': 'iso'}
        if date_unit:
            options['date_unit'] = date_unit
        payload = df.to_json(**options)
        result = read_json(payload)
        assert_frame_equal(result, df)

    test_w_date('20130101 20:43:42.123')
    unit_cases = (
        ('20130101 20:43:42', 's'),
        ('20130101 20:43:42.123', 'ms'),
        ('20130101 20:43:42.123456', 'us'),
        ('20130101 20:43:42.123456789', 'ns'),
    )
    for stamp, unit in unit_cases:
        test_w_date(stamp, date_unit=unit)

    self.assertRaises(ValueError, df.to_json, date_format='iso',
                      date_unit='foo')
def test_date_format_series(self):
    """ISO date output of a Series round-trips through ``read_json``."""

    def test_w_date(date, date_unit=None):
        # Round-trip a constant-timestamp Series (with two NaT entries).
        ts = Series(Timestamp(date), index=self.ts.index)
        ts.ix[1] = pd.NaT
        ts.ix[5] = pd.NaT
        options = {'date_format': 'iso'}
        if date_unit:
            options['date_unit'] = date_unit
        payload = ts.to_json(**options)
        result = read_json(payload, typ='series')
        assert_series_equal(result, ts)

    test_w_date('20130101 20:43:42.123')
    test_w_date('20130101 20:43:42', date_unit='ns')

    series = Series(Timestamp('20130101 20:43:42.123'), index=self.ts.index)
    self.assertRaises(ValueError, series.to_json,
                      date_unit='foo')
def test_date_unit(self):
    """Epoch dates round-trip with a forced and with a detected unit."""
    frame = self.tsframe.copy()
    frame['date'] = Timestamp('20130101 20:43:42')
    overrides = (
        (1, Timestamp('19710101 20:43:42')),
        (2, Timestamp('21460101 20:43:42')),
        (4, pd.NaT),
    )
    for row, value in overrides:
        frame.ix[row, 'date'] = value

    for unit in ('s', 'ms', 'us', 'ns'):
        payload = frame.to_json(date_format='epoch', date_unit=unit)

        # First force the date unit, then let read_json detect it.
        for read_unit in (unit, None):
            result = read_json(payload, date_unit=read_unit)
            assert_frame_equal(result, frame)
def zipline_splits_and_dividends(symbol_map):
    """Build per-asset split and dividend frames.

    Parameters
    ----------
    symbol_map : mapping
        Maps each sid to the code used as key in the raw data returned by
        ``load_splits_and_dividends()``. Anything exposing ``.items()``
        yielding ``(sid, code)`` pairs works.

    Returns
    -------
    (list of pd.DataFrame, list of pd.DataFrame)
        One frame per asset with splits and one per asset with dividends,
        each tagged with a ``sid`` column.
    """
    raw_splits, raw_dividends = load_splits_and_dividends()
    splits = []
    dividends = []
    # .items() works for dicts on Python 3 and for pandas Series, whereas
    # .iteritems() is unavailable on Python 3 dicts and in pandas >= 2.0.
    for sid, code in symbol_map.items():
        if code in raw_splits:
            split = pd.DataFrame(data=raw_splits[code])
            split['sid'] = sid
            split.index = split['effective_date'] = pd.DatetimeIndex(split['effective_date'])
            splits.append(split)
        if code in raw_dividends:
            dividend = pd.DataFrame(data=raw_dividends[code])
            dividend['sid'] = sid
            # These three date columns are filled with NaT placeholders.
            dividend['record_date'] = dividend['declared_date'] = dividend['pay_date'] = pd.NaT
            dividend.index = dividend['ex_date'] = pd.DatetimeIndex(dividend['ex_date'])
            dividends.append(dividend)
    return splits, dividends
def _display_dimensions(self, dimensions, operations):
    """Build the display metadata for the requested dimensions.

    Returns an ``OrderedDict`` keyed by dimension key. Each entry holds the
    dimension's label, its display options (plus blank labels for ``NaT``
    and ``NaN``) when the dimension defines them, and the name of its
    display field when one is set.
    """
    keys = [utils.slice_first(dimension) for dimension in dimensions]

    result = OrderedDict()
    for key in keys:
        dimension = self.slicer.dimensions[key]
        entry = {'label': dimension.label}

        if hasattr(dimension, 'display_options'):
            options = {opt.key: opt.label
                       for opt in dimension.display_options}
            # Missing values are displayed as empty strings.
            options.update({pd.NaT: '', np.nan: ''})
            entry['display_options'] = options

        if hasattr(dimension, 'display_field') and dimension.display_field:
            entry['display_field'] = '%s_display' % dimension.key

        result[key] = entry

    return result
def test_categorical_dimension(self):
    """A categorical dimension exposes its options plus blank NaN/NaT."""
    display_schema = self.test_slicer.manager.display_schema(
        metrics=['foo'],
        dimensions=['locale'],
    )

    locale_options = {
        'us': 'United States', 'de': 'Germany', np.nan: '', pd.NaT: ''
    }
    expected = {
        'metrics': {'foo': {'label': 'foo', 'axis': 0}},
        'dimensions': {
            'locale': {'label': 'Locale', 'display_options': locale_options},
        },
        'references': {},
    }
    self.assertDictEqual(expected, display_schema)
def test_multiple_metrics_and_dimensions(self):
    """Display schema for two metrics across four dimensions."""
    display_schema = self.test_slicer.manager.display_schema(
        metrics=['foo', 'bar'],
        dimensions=[('date', DatetimeDimension.month), ('clicks', 50, 100), 'locale', 'account'],
    )

    self.assertDictEqual(
        {
            'metrics': {
                'foo': {'label': 'foo', 'axis': 0},
                'bar': {'label': 'FizBuz', 'axis': 1},
            },
            'dimensions': {
                'date': {'label': 'date'},
                'clicks': {'label': 'My Clicks'},
                # Same locale options as in test_categorical_dimension.
                'locale': {'label': 'Locale', 'display_options': {
                    'us': 'United States', 'de': 'Germany', np.nan: '', pd.NaT: ''
                }},
                'account': {'label': 'Account', 'display_field': 'account_display'},
            },
            'references': {},
        },
        display_schema
    )
def _make_time(timearr):
    """Return a :class:`datetime.datetime` object for the array of characters.

    Args:

        timearr (:class:`numpy.ndarray`): An array of characters.

    Returns:

        :class:`datetime.datetime`: A datetime object, or
        ``numpy.datetime64("NaT")`` when the characters do not form a
        ``YYYY-MM-DD_HH:MM:SS`` timestamp.

    """
    try:
        # Seconds are "%S"; a lowercase "%s" is not a valid strptime
        # directive and would make every call fall through to NaT.
        return dt.datetime.strptime("".join(npbytes_to_str(timearr)),
                                    "%Y-%m-%d_%H:%M:%S")
    except ValueError:
        return np.datetime64("NaT")
def test_date_breaks():
    """``date_breaks`` on datetime limits, numpy datetimes and NaT."""
    # cpython
    # datetime() requires year, month and day.
    x = [datetime(year, 1, 1) for year in [2010, 2026, 2015]]
    limits = min(x), max(x)

    breaks = date_breaks('5 Years')
    years = [d.year for d in breaks(limits)]
    npt.assert_array_equal(
        years, [2010, 2015, 2020, 2025, 2030])

    breaks = date_breaks('10 Years')
    years = [d.year for d in breaks(limits)]
    npt.assert_array_equal(years, [2010, 2020, 2030])

    # numpy
    x = [np.datetime64(i*10, 'D') for i in range(1, 10)]
    breaks = date_breaks('10 Years')
    limits = min(x), max(x)
    with pytest.raises(AttributeError):
        breaks(limits)

    # NaT
    limits = np.datetime64('NaT'), datetime(2017, 1, 1)
    breaks = date_breaks('10 Years')
    assert len(breaks(limits)) == 0
def automatic_events(self, timestamp):
    """
    Update the current time of the Blotter, triggering all scheduled events
    between previous clock time and new clock time such as interest
    charges, margin charges, PnL calculations and PnL sweeps. See
    create_events() for more information on the type of events.

    Parameters
    ----------
    timestamp: pandas.Timestamp
        Time to update clock to and trigger internal events up until
    """
    current_time = self._holdings.timestamp
    # first event so there is nothing automatic that needs to be done
    if current_time is pd.NaT:
        return

    actions = self._get_actions(current_time, timestamp, self._actions)
    # .items() is provided by both dicts and pandas Series; .iteritems()
    # is unavailable in pandas >= 2.0 and on Python 3 dicts.
    for ts, action in actions.items():
        events = self.create_events(ts, action)
        self.dispatch_events(events)
def pad(self, sid, date):
    """
    Fill sid container with empty data through the specified date.

    e.g. if the date is two days after the last date in the sid's existing
    output, 2 x `minute_per_day` worth of zeros will be added to the
    output.

    Parameters:
    -----------
    sid : int
        The asset identifier for the data being written.
    date : datetime-like
        The date used to calculate how many slots to be pad.
        The padding is done through the date, i.e. after the padding is
        done the `last_date_in_output_for_sid` will be equal to `date`
    """
    table = self._ensure_ctable(sid)

    last_date = self.last_date_in_output_for_sid(sid)

    tds = self._Trading_days

    if date <= last_date or date < tds[0]:
        # No need to pad.
        return

    # NaT never compares equal to anything (including itself), so the
    # "no data yet" case has to be detected by identity.
    if last_date is pd.NaT:
        # If there is no data, determine how many days to add so that
        # desired days are written to the correct slots.
        days_to_zerofill = tds[tds.slice_indexer(end=date)]
    else:
        days_to_zerofill = tds[tds.slice_indexer(
            start=last_date + tds.freq,
            end=date)]

    self._zerofill(table, len(days_to_zerofill))

    new_last_date = self.last_date_in_output_for_sid(sid)
    assert new_last_date == date, "new_last_date={0} != date={1}".format(
        new_last_date, date)
def __init__(self,
             window,
             items,
             sids,
             cap_multiple=2,
             dtype=np.float64,
             initial_dates=None):
    """Set up the axes, the date buffer and the value buffer.

    ``initial_dates``, when given, must contain exactly ``window`` entries;
    otherwise a ``ValueError`` is raised.
    """
    self._pos = window
    self._window = window

    self.items = _ensure_index(items)
    self.minor_axis = _ensure_index(sids)

    self.cap_multiple = cap_multiple
    self.dtype = dtype

    if initial_dates is None:
        date_buf = np.empty(self.cap, dtype='M8[ns]') * pd.NaT
    elif len(initial_dates) != window:
        raise ValueError('initial_dates must be of length window')
    else:
        # Append uninitialised slots after the supplied dates.
        tail = np.empty(
            window * (cap_multiple - 1),
            dtype='datetime64[ns]',
        )
        date_buf = np.hstack((initial_dates, tail))
    self.date_buf = date_buf

    self.buffer = self._create_buffer()
def _update_dividends(self, asset_id, raw_data):
divs = raw_data.ex_dividend
df = pd.DataFrame({'amount': divs[divs != 0]})
df.index.name = 'ex_date'
df.reset_index(inplace=True)
df['sid'] = asset_id
# we do not have this data in the WIKI dataset
df['record_date'] = df['declared_date'] = df['pay_date'] = pd.NaT
self.dividends.append(df)
def last_date_in_output_for_sid(self, sid):
    """
    Look up the last session already written for ``sid``.

    Parameters
    ----------
    sid : int
        Asset identifier.

    Returns
    -------
    out : pd.Timestamp
        The midnight of the last date written in to the output for the
        given sid; ``pd.NaT`` if the sizes file does not exist or the
        container is empty.
    """
    sizes_path = "{0}/close/Meta/sizes".format(self.sidpath(sid))
    if not os.path.exists(sizes_path):
        return pd.NaT

    with open(sizes_path, mode='r') as sizes_file:
        data = json.loads(sizes_file.read())

    # Integer division so the result can be used as a positional index.
    written_rows = data['shape'][0]
    num_days = written_rows // self._minutes_per_day
    if num_days == 0:
        # empty container
        return pd.NaT
    return self._session_labels[num_days - 1]
def get_last_Traded_dt(self, asset, dt):
    """
    Find the latest minute on or before ``dt`` in which ``asset`` traded.

    The roll finder selected by ``asset.roll_style`` resolves the contract
    for ``dt``; the lookup is then delegated to the bar reader. Returns
    ``pd.NaT`` when no contract is found.

    Parameters
    ----------
    asset : catalyst.asset.Asset
        The asset for which to get the last traded minute.
    dt : pd.Timestamp
        The minute at which to start searching for the last traded minute.

    Returns
    -------
    last_Traded : pd.Timestamp
        The dt of the last trade for the given asset, using the input
        dt as a vantage point.
    """
    roll_finder = self._roll_finders[asset.roll_style]
    contract_sid = roll_finder.get_contract_center(
        asset.root_symbol, dt, asset.offset)
    if contract_sid is None:
        return pd.NaT

    contract = roll_finder.asset_finder.retrieve_asset(contract_sid)
    return self._bar_reader.get_last_Traded_dt(contract, dt)
def _get_daily_spot_value(self, column, dt):
# Returns the daily value of `column` as of `dt`, read from the 'daily'
# pricing reader:
#   - "last_Traded": the reader's last traded dt, or pd.NaT when it is null;
#   - a column in OHLCV_FIELDS: the reader's value for `dt`, or np.nan when
#     the reader raises NoDataOnDate (no forward fill);
#   - "price": the most recent non-null close found by stepping back one
#     calendar day at a time, or np.nan once the reader raises NoDataOnDate.
# Any other column falls through and returns None.
# NOTE(review): `asset` is used throughout but is neither a parameter nor
# assigned in this block, so these branches raise NameError unless a global
# of that name exists. The signature presumably lost an `asset` parameter;
# confirm against callers before changing it.
reader = self._get_pricing_reader('daily')
if column == "last_Traded":
last_Traded_dt = reader.get_last_Traded_dt(asset, dt)
if isnull(last_Traded_dt):
return pd.NaT
else:
return last_Traded_dt
elif column in OHLCV_FIELDS:
# don't forward fill
try:
return reader.get_value(asset, dt, column)
except NoDataOnDate:
return np.nan
elif column == "price":
found_dt = dt
while True:
try:
value = reader.get_value(
asset, found_dt, "close"
)
if not isnull(value):
if dt == found_dt:
# The close belongs to the requested day, so it is returned as-is.
return value
else:
# adjust if needed
# NOTE(review): the argument list of this call may be incomplete
# (neither `found_dt` nor `dt` is passed); verify against
# get_adjusted_value's signature.
return self.get_adjusted_value(
asset, "minute",
spot_value=value
)
else:
# Null close: step back one calendar day and retry.
found_dt -= self.Trading_calendar.day
except NoDataOnDate:
return np.nan
def assert_same(self, val1, val2):
    """Assert equality, treating NaT == NaT and NaN == NaN as matches."""
    try:
        self.assertEqual(val1, val2)
    except AssertionError:
        # Missing values never compare equal, so check them by kind.
        if val1 is pd.NaT:
            self.assertTrue(val2 is pd.NaT)
            return
        if not np.isnan(val1):
            raise
        self.assertTrue(np.isnan(val2))
def test_day_before_assets_Trading(self):
    """No asset is tradable, stale or priced the day before data begins."""
    # use the day before self.bcolz_daily_bar_days[0]
    # NOTE(review): `prevIoUs_session_label` looks like a mangled spelling
    # of the calendar method name; verify against the calendar class.
    prior_session = self.Trading_calendar.prevIoUs_session_label(
        self.equity_daily_bar_days[0]
    )
    minute = self.get_last_minute_of_session(prior_session)

    bar_data = self.create_bardata(
        simulation_dt_func=lambda: minute,
    )
    self.check_internal_consistency(bar_data)

    for check in (bar_data.can_Trade, bar_data.is_stale):
        for asset in (self.ASSET1, self.ASSET2):
            self.assertFalse(check(asset))

    for field in ALL_FIELDS:
        for asset in self.ASSETS:
            asset_value = bar_data.current(asset, field)

            if field in OHLCP:
                self.assertTrue(np.isnan(asset_value))
            elif field == "volume":
                self.assertEqual(0, asset_value)
            elif field == "last_Traded":
                self.assertTrue(asset_value is pd.NaT)
def test_semi_active_day(self):
    """On the first bar day only ASSET1 has data; ASSET2 reads as empty."""
    # on self.equity_daily_bar_days[0], only asset1 has data
    bar_data = self.create_bardata(
        simulation_dt_func=lambda: self.get_last_minute_of_session(
            self.equity_daily_bar_days[0]
        ),
    )
    self.check_internal_consistency(bar_data)

    self.assertTrue(bar_data.can_Trade(self.ASSET1))
    self.assertFalse(bar_data.can_Trade(self.ASSET2))

    # because there is real data
    self.assertFalse(bar_data.is_stale(self.ASSET1))

    # because there has never been a Trade bar yet
    self.assertFalse(bar_data.is_stale(self.ASSET2))

    # Every expectation is compared against bar_data.current() for ASSET1.
    expected_asset1 = (
        ("open", 3),
        ("high", 4),
        ("low", 1),
        ("close", 2),
        ("volume", 200),
        ("price", 2),
    )
    for field, value in expected_asset1:
        self.assertEqual(value, bar_data.current(self.ASSET1, field))
    self.assertEqual(self.equity_daily_bar_days[0],
                     bar_data.current(self.ASSET1, "last_Traded"))

    for field in OHLCP:
        self.assertTrue(np.isnan(bar_data.current(self.ASSET2, field)),
                        field)

    self.assertEqual(0, bar_data.current(self.ASSET2, "volume"))
    self.assertTrue(
        bar_data.current(self.ASSET2, "last_Traded") is pd.NaT
    )
def test_should_properly_handle_null_timestamp(self):
    """A NULL timestamp returned by the query is read back as NaT."""
    query = 'SELECT TIMESTAMP(NULL) AS null_timestamp'
    result = gbq.read_gbq(query, project_id=_get_project_id(),
                          private_key=_get_private_key_path())
    expected = DataFrame({'null_timestamp': [NaT]})
    tm.assert_frame_equal(result, expected)