|
| 1 | +# vim: tabstop=4 shiftwidth=4 softtabstop=4 |
| 2 | + |
| 3 | +# Copyright 2011 OpenStack LLC. |
| 4 | +# All Rights Reserved. |
| 5 | +# |
| 6 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 7 | +# not use this file except in compliance with the License. You may obtain |
| 8 | +# a copy of the License at |
| 9 | +# |
| 10 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | +# |
| 12 | +# Unless required by applicable law or agreed to in writing, software |
| 13 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 14 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 15 | +# License for the specific language governing permissions and limitations |
| 16 | +# under the License. |
| 17 | + |
| 18 | +""" |
| 19 | +Time related utilities and helper functions. |
| 20 | +""" |
| 21 | + |
| 22 | +import calendar |
| 23 | +import datetime |
| 24 | + |
| 25 | +import iso8601 |
| 26 | + |
| 27 | + |
| 28 | +TIME_FORMAT = "%Y-%m-%dT%H:%M:%S" |
| 29 | +PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" |
| 30 | + |
| 31 | + |
| 32 | +def isotime(at=None): |
| 33 | + """Stringify time in ISO 8601 format""" |
| 34 | + if not at: |
| 35 | + at = utcnow() |
| 36 | + str = at.strftime(TIME_FORMAT) |
| 37 | + tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC' |
| 38 | + str += ('Z' if tz == 'UTC' else tz) |
| 39 | + return str |
| 40 | + |
| 41 | + |
| 42 | +def parse_isotime(timestr): |
| 43 | + """Parse time from ISO 8601 format""" |
| 44 | + try: |
| 45 | + return iso8601.parse_date(timestr) |
| 46 | + except iso8601.ParseError as e: |
| 47 | + raise ValueError(e.message) |
| 48 | + except TypeError as e: |
| 49 | + raise ValueError(e.message) |
| 50 | + |
| 51 | + |
| 52 | +def strtime(at=None, fmt=PERFECT_TIME_FORMAT): |
| 53 | + """Returns formatted utcnow.""" |
| 54 | + if not at: |
| 55 | + at = utcnow() |
| 56 | + return at.strftime(fmt) |
| 57 | + |
| 58 | + |
| 59 | +def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT): |
| 60 | + """Turn a formatted time back into a datetime.""" |
| 61 | + return datetime.datetime.strptime(timestr, fmt) |
| 62 | + |
| 63 | + |
| 64 | +def normalize_time(timestamp): |
| 65 | + """Normalize time in arbitrary timezone to UTC""" |
| 66 | + offset = timestamp.utcoffset() |
| 67 | + return timestamp.replace(tzinfo=None) - offset if offset else timestamp |
| 68 | + |
| 69 | + |
| 70 | +def is_older_than(before, seconds): |
| 71 | + """Return True if before is older than seconds.""" |
| 72 | + return utcnow() - before > datetime.timedelta(seconds=seconds) |
| 73 | + |
| 74 | + |
| 75 | +def utcnow_ts(): |
| 76 | + """Timestamp version of our utcnow function.""" |
| 77 | + return calendar.timegm(utcnow().timetuple()) |
| 78 | + |
| 79 | + |
| 80 | +def utcnow(): |
| 81 | + """Overridable version of utils.utcnow.""" |
| 82 | + if utcnow.override_time: |
| 83 | + return utcnow.override_time |
| 84 | + return datetime.datetime.utcnow() |
| 85 | + |
| 86 | + |
| 87 | +utcnow.override_time = None |
| 88 | + |
| 89 | + |
| 90 | +def set_time_override(override_time=datetime.datetime.utcnow()): |
| 91 | + """Override utils.utcnow to return a constant time.""" |
| 92 | + utcnow.override_time = override_time |
| 93 | + |
| 94 | + |
| 95 | +def advance_time_delta(timedelta): |
| 96 | + """Advance overridden time using a datetime.timedelta.""" |
| 97 | + assert(not utcnow.override_time is None) |
| 98 | + utcnow.override_time += timedelta |
| 99 | + |
| 100 | + |
| 101 | +def advance_time_seconds(seconds): |
| 102 | + """Advance overridden time by seconds.""" |
| 103 | + advance_time_delta(datetime.timedelta(0, seconds)) |
| 104 | + |
| 105 | + |
| 106 | +def clear_time_override(): |
| 107 | + """Remove the overridden time.""" |
| 108 | + utcnow.override_time = None |
| 109 | + |
| 110 | + |
| 111 | +def marshall_now(now=None): |
| 112 | + """Make an rpc-safe datetime with microseconds. |
| 113 | +
|
| 114 | + Note: tzinfo is stripped, but not required for relative times.""" |
| 115 | + if not now: |
| 116 | + now = utcnow() |
| 117 | + return dict(day=now.day, month=now.month, year=now.year, hour=now.hour, |
| 118 | + minute=now.minute, second=now.second, |
| 119 | + microsecond=now.microsecond) |
| 120 | + |
| 121 | + |
| 122 | +def unmarshall_time(tyme): |
| 123 | + """Unmarshall a datetime dict.""" |
| 124 | + return datetime.datetime(day=tyme['day'], month=tyme['month'], |
| 125 | + year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'], |
| 126 | + second=tyme['second'], microsecond=tyme['microsecond']) |
0 commit comments