quantify module

consecutive_minutes(fasts)

Create a time series of consecutive minutes fasted (a cumulative count of minutes within each fast) from a time series of fasting status.

Credit to George Pipis for inspiring the solution. You can find George's solution at https://predictivehacks.com/count-the-consecutive-events-in-python/.

Examples:

Input  = [0,1,1,1,0,1,1]
Output = [0,1,2,3,0,1,2]

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| fasts | Series | pandas Series of fasting status with 1 minute frequency: 1 = fasting (yes), 0 = not fasting (no). | required |

Returns: A time series of consecutive minutes fasted.

Source code in fasting/quantify.py
def consecutive_minutes(fasts: pd.Series) -> pd.Series:
    """
    Create a time series of consecutive minutes fasted (a cumulative count of
    minutes within each fast) from a time series of fasting status.

    Credit to George Pipis for inspiration of the solution.
    You can find George's solution [here](https://predictivehacks.com/count-the-consecutive-events-in-python/)

    Example:
        Input =  [0,1,1,1,0,1,1]
        Output = [0,1,2,3,0,1,2]

    Args:
        fasts: pandas Series of fasting status with 1 minute frequency.
                    - Yes (i.e. fasting) as 1.
                    - No (i.e. not fasting) as 0.
    Returns: A time series of consecutive minutes fasted.
    """

    if not validate_continuous_fasts(fasts):
        raise Exception('Continuous log is invalid. Check error raised by validate_continuous_fasts().')

    # Label each run of identical values, then count minutes within the run (1-based)
    consecutive_mins = fasts.groupby((fasts != fasts.shift()).cumsum()).cumcount() + 1
    consecutive_mins[fasts == 0] = 0  # Zero out minutes where no fast is active
    return consecutive_mins
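
Example usage (a minimal sketch; assumes the module imports as fasting.quantify):

import pandas as pd
from fasting.quantify import consecutive_minutes

idx = pd.date_range('2021-03-01 00:00', periods=7, freq='T')
fasts = pd.Series([0, 1, 1, 1, 0, 1, 1], index=idx)
print(consecutive_minutes(fasts).tolist())  # [0, 1, 2, 3, 0, 1, 2]

The groupby key, (fasts != fasts.shift()).cumsum(), increments at every status change, so each run of identical values gets its own group and cumcount() numbers the minutes within it.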

continuous_fasts(fasts, start_col='start_dt', end_col='end_dt')

Create a continuous time series of fasting status (0 ~ no or 1 ~ yes), with a datetime index at a frequency of 1 minute, from a DataFrame of individual events (start datetime and end datetime).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| fasts | DataFrame | DataFrame of discrete logs with start and end datetime columns. | required |
| start_col | str | Name of column representing fasting start datetimes. | 'start_dt' |
| end_col | str | Name of column representing fasting end datetimes. | 'end_dt' |

Returns: A pandas Series of event status at 1 minute frequency: 1 (i.e. fasting), 0 (i.e. not fasting).

Source code in fasting/quantify.py
def continuous_fasts(fasts: pd.DataFrame, start_col: str = 'start_dt', end_col: str = 'end_dt') -> pd.Series:
    """
    Create a continuous time series of fasting status (0 ~ no or 1 ~ yes),
    with a datetime index at a frequency of 1 minute, from a DataFrame of
    individual events (start datetime and end datetime).

    Args:
        fasts: DataFrame of discrete logs with start and end datetime columns.
        start_col: Name of column representing fasting start datetimes.
        end_col: Name of column representing fasting end datetimes.

    Returns: A pandas Series of event status at 1 minute frequency.
                - Yes (i.e. fasting) as 1.
                - No (i.e. not fasting) as 0.

    """
    if not validate_discrete_fasts(fasts, start_col, end_col):
        raise Exception('Discrete log is invalid. Check error raised by validate_discrete_fasts().')

    # Sort by start_dt (oldest to newest)
    fasts = fasts.sort_values(by=start_col, ascending=True, ignore_index=True)

    # Create continuous log
    start = fasts[start_col].iloc[0]  # First timestamp: start_dt of first fast
    end = fasts[end_col].iloc[-1]  # Last timestamp: end_dt of last fast
    time_range = pd.date_range(start=start, end=end, freq='1T')
    log = pd.Series(0, index=time_range)  # Initialize continuous log to 0
    for index, row in fasts.iterrows():  # Set fasting value to 1 for timestamps between fast start and end
        start = row[start_col]
        end = row[end_col]
        log[start:end] = 1

    return log
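
Example usage (a minimal sketch; column names match the defaults and the import path is assumed):

import pandas as pd
from fasting.quantify import continuous_fasts

fasts = pd.DataFrame({
    'start_dt': pd.to_datetime(['2021-03-01 20:00', '2021-03-02 20:00']),
    'end_dt': pd.to_datetime(['2021-03-02 12:00', '2021-03-03 12:00']),
})
log = continuous_fasts(fasts)
print(log.sum())  # total fasting minutes across both fasts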

daily_cumulative_hours(fasts)

Calculate the daily cumulative hours fasted from a pandas Series of fasting status with 1 minute frequency.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| fasts | Series | pandas Series of fasting status with 1 minute frequency: 1 = fasting (yes), 0 = not fasting (no). | required |

Returns: The daily cumulative hours fasted as a pandas Series.

Source code in fasting/quantify.py
def daily_cumulative_hours(fasts: pd.Series) -> pd.Series:
    """
    Calculate the daily cumulative hours fasted from a pandas Series of fasting status with 1 minute frequency.
    Args:
        fasts: pandas Series of fasting status with 1 minute frequency.
                    - Yes (i.e. fasting) as 1.
                    - No (i.e. not fasting) as 0.
    Returns: The daily cumulative hours fasted as a pandas Series.

    """
    if not validate_continuous_fasts(fasts):
        raise Exception('Continuous log is invalid. Check error raised by validate_continuous_fasts().')

    minutes_per_hour = 60
    cumulative_mins = fasts.resample('1D').sum()
    cumulative_hrs = cumulative_mins / minutes_per_hour
    return cumulative_hrs
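
Example usage (a minimal sketch with one 16 hour fast spanning midnight; import path assumed):

import pandas as pd
from fasting.quantify import daily_cumulative_hours

idx = pd.date_range('2021-03-01 00:00', periods=2 * 24 * 60, freq='T')
fasts = pd.Series(0, index=idx)
fasts['2021-03-01 20:00':'2021-03-02 12:00'] = 1
print(daily_cumulative_hours(fasts))  # ~4 hours on day 1, ~12 hours on day 2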

daily_max_consecutive_hours(fasts)

Calculate the maximum daily consecutive hours fasted from a pandas Series of fasting status with 1 minute frequency.

There are potentially 2 fasts occurring in a single day (one ends and another starts). The maximum consecutive hours fasted can include hours carried over from the previous day.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| fasts | Series | pandas Series of fasting status with 1 minute frequency: 1 = fasting (yes), 0 = not fasting (no). | required |

Returns: The daily maximum consecutive hours fasted as a pandas Series.

Source code in fasting/quantify.py
def daily_max_consecutive_hours(fasts: pd.Series) -> pd.Series:
    """
    Calculate the maximum daily consecutive hours fasted from a pandas Series of fasting status with 1 minute frequency.

    There are potentially 2 fasts occurring in a single day (one ends and another starts).
    The maximum consecutive hours fasted can include hours carried over from the previous day.


    Args:
        fasts: pandas Series of fasting status with 1 minute frequency.
                    - Yes (i.e. fasting) as 1.
                    - No (i.e. not fasting) as 0.
    Returns: The daily maximum consecutive hours fasted as a pandas Series.

    """

    if not validate_continuous_fasts(fasts):
        raise Exception('Continuous log is invalid. Check error raised by validate_continuous_fasts().')

    minutes_per_hour = 60
    consecutive_mins = consecutive_minutes(fasts)
    daily_maximum_mins = consecutive_mins.resample('1D').max()
    daily_maximum_hrs = daily_maximum_mins / minutes_per_hour
    return daily_maximum_hrs
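
The carry-over behavior is easiest to see with the same midnight-spanning fast (a sketch; import path assumed): day 2's maximum includes the minutes accumulated on day 1.

import pandas as pd
from fasting.quantify import daily_max_consecutive_hours

idx = pd.date_range('2021-03-01 00:00', periods=2 * 24 * 60, freq='T')
fasts = pd.Series(0, index=idx)
fasts['2021-03-01 20:00':'2021-03-02 12:00'] = 1
print(daily_max_consecutive_hours(fasts))  # ~4 hours on day 1, ~16 hours on day 2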

validate_continuous_fasts(fasts)

Validate a continuous log of fasts for use by other module functions.

Validations:

- Frequency of the series index is 1 minute ('T')
- Value at each time step is either 0 or 1 (0 ~ not fasting, 1 ~ fasting), with no extraneous or NaN values

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| fasts | Series | Series of continuous logs with a datetime index at a 1 minute frequency and values of 0 or 1. | required |

Returns: True if the fasts series is valid.

Source code in fasting/quantify.py
def validate_continuous_fasts(fasts: pd.Series) -> bool:
    """
    Validate a continuous log of fasts for use by other module functions.
    Validations:
        - Frequency of series index is 1 minute ('T')
        - Value at each time step is either 0 or 1 (0 ~ not fasting, 1 ~ fasting), no extraneous or NaN values

    Args:
        fasts: Series of continuous logs with a datetime index at a 1 minute frequency and values of 0 or 1.

    Returns: True if the fasts series is valid.
    """

    # Validate frequency of index is 1 minute ('T')
    freq = pd.infer_freq(fasts.index)
    if freq != 'T':
        raise ValueError(f"""
                        Frequency of the continuous fast must be: 'T' (1 minute).
                        Frequency of fasts series input: {freq}.
                        """)

    # Validate values only contain 0 or 1
    if not fasts.isin([0, 1]).all():
        unexpected_values = fasts[((fasts != 0) & (fasts != 1))]
        raise ValueError(f"""
                        Continuous fast (input to fasts) must contain only values of 0 or 1.
                        Check fasts for extraneous or NaN values:
                        {unexpected_values}
                        """)

    return True
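
Both failure modes raise ValueError rather than returning False, so the `if not validate_...` guards in the other functions rely on the exception, not the return value. A minimal sketch (import path assumed):

import pandas as pd
from fasting.quantify import validate_continuous_fasts

idx = pd.date_range('2021-03-01', periods=5, freq='T')
print(validate_continuous_fasts(pd.Series([0, 1, 1, 0, 1], index=idx)))  # True

try:
    validate_continuous_fasts(pd.Series([0, 1, 2, 0, 1], index=idx))
except ValueError as err:
    print(err)  # reports the extraneous value at its timestamp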

validate_discrete_fasts(fasts, start_col='start_dt', end_col='end_dt')

Validate a discrete log of fasts for use by other module functions. Discrete logs should have a start and end datetime for each fast.

Validations:

- Each fast has a start and end datetime (start_col and end_col cannot contain missing values)
- Start datetimes are before end datetimes for each fast
- Fasts do not overlap

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| fasts | DataFrame | DataFrame of discrete logs with start and end datetime columns. | required |
| start_col | str | Name of column representing fasting start datetimes. | 'start_dt' |
| end_col | str | Name of column representing fasting end datetimes. | 'end_dt' |

Returns: True if the discrete fasts DataFrame is valid.

Source code in fasting/quantify.py
def validate_discrete_fasts(fasts: pd.DataFrame, start_col: str = 'start_dt', end_col: str = 'end_dt') -> bool:
    """
    Validate a discrete log of fasts for use by other module functions.
    Discrete logs should have a start and end datetime for each fast.
    Validations:
        - Each fast has a start and end datetime (start_col and end_col cannot contain missing values)
        - Start datetimes are before end datetimes for each fast
        - Fasts do not overlap.

    Args:
        fasts: DataFrame of discrete logs with start and end datetime columns.
        start_col: Name of column representing fasting start datetimes.
        end_col: Name of column representing fasting end datetimes.

    Returns: True if the discrete fasts DataFrame is valid.
    """

    fasts = fasts.copy()
    fasts.sort_values(by=start_col, ascending=True, inplace=True, ignore_index=True)

    # TODO validate fasts[start_col] and fasts[end_col] data types

    # Validate no missing start or end datetimes
    start_end = fasts[[start_col, end_col]]
    if start_end.isnull().values.any():
        nan_rows = start_end.isnull().any(axis=1)
        nan_rows_indexes = nan_rows[nan_rows].index
        nan_fasts = fasts.iloc[nan_rows_indexes, :]
        raise ValueError(f"""
                        Discrete logs must contain start and end datetimes.
                        Check columns '{start_col}' and '{end_col}' for missing values:
                        {nan_fasts}
                        """)

    # validate start_dt < end_dt
    start_end_mismatch = fasts[start_col] > fasts[end_col]  # Is start datetime AFTER end datetime?
    if start_end_mismatch.any():  # If any mismatch
        conflicting_logs = fasts[start_end_mismatch]  # Subset fasts with start and end mismatch
        raise ValueError(f"""
                        Start datetime must be before associated end datetime.
                        The following fasts have start and end datetime conflicts:
                        {conflicting_logs}
                        """)

    # Validate no overlapping fasts
    intervals = pd.IntervalIndex.from_arrays(left=fasts[start_col], right=fasts[end_col])
    if intervals.is_overlapping:
        # get overlapping fasts
        previous_end_dt = fasts[end_col].shift(1).dropna()
        overlapping = previous_end_dt > fasts[start_col][1:]
        true_overlap = overlapping[overlapping]
        overlapping_fasts = fasts.iloc[true_overlap.index]
        raise ValueError(f"""
                        Overlapping fasts found in DataFrame.
                        The following fasts overlap with previous fast:
                        {overlapping_fasts}
                        """)

    return True
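
Overlap detection in action (a sketch; import path assumed): the second fast starts before the first one ends.

import pandas as pd
from fasting.quantify import validate_discrete_fasts

fasts = pd.DataFrame({
    'start_dt': pd.to_datetime(['2021-03-01 20:00', '2021-03-02 10:00']),
    'end_dt': pd.to_datetime(['2021-03-02 12:00', '2021-03-03 08:00']),
})
try:
    validate_discrete_fasts(fasts)
except ValueError as err:
    print(err)  # lists the fast that overlaps its predecessor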

zero_fasts(zero_log_file)

Load a log export from Zero Fasting and return the start and end datetimes of each fast. The DataFrame is reordered chronologically, oldest to newest, before being returned.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| zero_log_file | | File path of log export. | required |

Returns: pandas DataFrame with the start ('start_dt') and end ('end_dt') datetimes of each completed fast.

Source code in fasting/quantify.py
def zero_fasts(zero_log_file) -> pd.DataFrame:
    """
    Load a log export from Zero Fasting and return the start and end datetimes of each fast.
    DataFrame is reordered chronologically, oldest to newest, before being returned.
    Args:
        zero_log_file: File path of log export.

    Returns: pandas DataFrame with 'start_dt' and 'end_dt' columns for each completed fast.
    """
    # Read in log as a csv
    expected_cols = ['Date', 'Start', 'End', 'Hours', 'Night Eating']
    dtypes = {'End': str, 'Hours': float, 'Night Eating': float}
    fasts = pd.read_csv(zero_log_file,
                        header=0,
                        parse_dates=[['Date', 'Start']],
                        dtype=dtypes,
                        usecols=expected_cols)

    # Clean up DataFrame
    fasts.dropna(subset=['Hours'], inplace=True)  # Drop incomplete fasts (Hours will be NA if incomplete)
    fasts.rename(columns={'Date_Start': 'start_dt'}, inplace=True)  # Rename date parsed column
    fasts = fasts.iloc[::-1].reset_index(drop=True)  # Order by oldest to newest

    # Get end datetime of each fast and add to fasts DataFrame as a new column: 'end_dt'
    end_times = pd.to_datetime(fasts.End).dt.strftime('%H:%M:%S')
    end_time_deltas = pd.to_timedelta(end_times)
    fast_durations = pd.to_timedelta(fasts.Hours, 'H')
    end_dates = (fasts.start_dt + fast_durations).dt.date
    end_dt = pd.to_datetime(end_dates)
    end_dt += end_time_deltas
    fasts['end_dt'] = end_dt

    # Return just the start and end datetimes of each completed fast, in chronological order (oldest to newest)
    return fasts[['start_dt', 'end_dt']]
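
An end-to-end sketch. The CSV below is only an approximation of a real Zero export (actual date and time formats may differ); it matches the columns the loader expects and feeds the result through the rest of the pipeline:

import io
import pandas as pd
from fasting.quantify import zero_fasts, continuous_fasts, daily_cumulative_hours

csv = io.StringIO(
    'Date,Start,End,Hours,Night Eating\n'
    '2021-03-02,20:00,12:00,16.0,0\n'
    '2021-03-01,20:00,12:00,16.0,0\n'
)
fasts = zero_fasts(csv)        # start_dt / end_dt per completed fast, oldest first
log = continuous_fasts(fasts)  # minute-level 0/1 series
print(daily_cumulative_hours(log))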

Last update: 2021-03-03