first commit

This commit is contained in:
2026-05-21 08:40:24 -04:00
commit b084545275
711 changed files with 3659856 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
Metadata-Version: 2.4
Name: milestone_attribution_dataset_module
Version: 0.1.0
Summary: Internal PASBDC graph making scripts used to generate figures for the milestone attribution analysis.

View File

@@ -0,0 +1,9 @@
pyproject.toml
milestone_attribution_dataset_module/__init__.py
milestone_attribution_dataset_module/make_funding_data.py
milestone_attribution_dataset_module/make_nbs_data.py
milestone_attribution_dataset_module/shared.py
milestone_attribution_dataset_module.egg-info/PKG-INFO
milestone_attribution_dataset_module.egg-info/SOURCES.txt
milestone_attribution_dataset_module.egg-info/dependency_links.txt
milestone_attribution_dataset_module.egg-info/top_level.txt

View File

@@ -0,0 +1 @@
milestone_attribution_dataset_module

View File

@@ -0,0 +1,10 @@
from .make_funding_data import sanitize_funding_data
from .make_nbs_data import sanitize_nbs_data
from .shared import tag_documentation_level
# Public API of the package: the three names imported above are re-exported
# so callers can import them directly from the package root.
__all__ = [
'tag_documentation_level',
'sanitize_funding_data',
'sanitize_nbs_data'
]

View File

@@ -0,0 +1,81 @@
# Date Created: 12/29/25
# Author: Vincent Allen
# PURPOSE:
# This file tags an exported capital funding milestones dataset with documentation levels based on whether there is
# a value in the Affirmation or Attribution Statement column as well as a non-blank value in the Attribution Source column.
# This script takes in the following filter's data:
# https://pasbdc.neoserra.com/activity/list/20?__formid=20&remove=&savename=&sort=DATE&sortdir=DESC&expr=&field_1=DATE&opt_auto_1=pfy&field_2=F_CENTER_ID&opt_2=-1&field_3=IS_REPORTABLE&field_4=&sortdir=DESC
import pandas as pd
import numpy as np
import argparse
import os
import json
import sys
from pasbdc_data_cleaning import clean_center_name # pyright:ignore
from .shared import tag_documentation_level
from constants_module import NEOSERRA_COLUMNS, OUT_COLUMNS
def sanitize_funding_data(
    df: pd.DataFrame,
    col_neo_attribution_source: str,
    col_neo_affirmation: str,
    col_out_documentation_level: str,
    col_neo_center: str,
):
    """Prepare an exported capital funding milestone dataset for reporting.

    Steps, in order:
      1. Run ``clean_center_name`` on the input frame (its return value is
         ignored, so it presumably edits ``df`` in place -- TODO confirm).
      2. Discard rows whose attribution source is exactly
         "Not attributed to center".
      3. Add the documentation-level column via ``tag_documentation_level``.
      4. Replace missing attribution sources with the text "Blank".
      5. Sort the rows by center.

    Args:
        df: Raw milestone export.
        col_neo_attribution_source: Name of the attribution source column.
        col_neo_affirmation: Name of the affirmation column.
        col_out_documentation_level: Name of the column to create.
        col_neo_center: Name of the center column used for sorting.

    Returns:
        A new, sorted DataFrame containing the tagged rows.
    """
    clean_center_name(df)

    # Work on a copy of the attributable rows so tagging never writes into a
    # slice of the caller's frame.
    attributable_rows = df[col_neo_attribution_source] != "Not attributed to center"
    tagged = tag_documentation_level(
        df[attributable_rows].copy(),
        col_neo_attribution_source=col_neo_attribution_source,
        col_neo_affirmation=col_neo_affirmation,
        col_out_documentation_level=col_out_documentation_level,
    )

    # Tagging has to see real missing values, so the "Blank" placeholder is
    # only filled in afterwards.
    source_as_text = tagged[col_neo_attribution_source].fillna("Blank").astype(str)
    tagged[col_neo_attribution_source] = source_as_text

    return tagged.sort_values(by=col_neo_center)
if __name__ == "__main__":
    # Command-line entry point: read a funding milestone CSV export, sanitize
    # and tag it, then write the result to a new CSV file.
    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--file",
                        type=str,
                        required=True,
                        help="The data file to de-duplicate and tag with documentation levels")
    parser.add_argument("-o", "--out",
                        type=str,
                        required=True,
                        help="The csv file to write the de-duplicated data to")
    parser.add_argument("-m", "--mapping",
                        type=str,
                        required=False,
                        default="",
                        help="The path to a JSON file with all of the columns in the DEFAULT_CONFIG of this script which can be used to override column name mappings")
    args = parser.parse_args()

    # Optional column-name overrides are applied to both column registries
    # before any column name is read from them.
    if args.mapping:
        NEOSERRA_COLUMNS.apply_json_mapping(args.mapping)
        OUT_COLUMNS.apply_json_mapping(args.mapping)

    original_df = pd.read_csv(args.file)
    original_df = sanitize_funding_data(
        original_df,
        col_neo_attribution_source=NEOSERRA_COLUMNS.milestone_attribution_source,
        col_neo_affirmation=NEOSERRA_COLUMNS.milestone_affirmation,
        col_out_documentation_level=OUT_COLUMNS.milestone_documentation_level,
        col_neo_center=NEOSERRA_COLUMNS.center
    )

    # Write first, report afterwards: a failed write must not be preceded by
    # a success message.
    original_df.to_csv(args.out, index=False)
    print("DONE!")

View File

@@ -0,0 +1,454 @@
# Date Created: 12/31/25
# Author: Vincent Allen
# PURPOSE:
# This file tags an exported new business starts milestones dataset with documentation levels based on whether there is
# a value in the Affirmation or Attribution Statement column as well as a non-blank value in the Attribution Source column.
# This script is meant to work with the data from this neoserra filter:
# https://pasbdc.neoserra.com/activity/list/7?__formid=7&remove=&savename=&sort=DATE&sortdir=DESC&expr=&field_1=DATE&opt_auto_1=&opt_a_1=10%2F1%2F2024&opt_b_1=9%2F30%2F2025&field_2=F_CENTER_ID&opt_2=-1&field_3=TYPE&opt_not_3=on&opt_3=&field_4=CLI_TYPE2&opt_4=&opt_4=AC&opt_4=IC&field_5=IS_REPORTABLE&field_6=TYPE&opt_6=&opt_6=%2BBSI&field_7=&sortdir=DESC
import pandas as pd
import re
import numpy as np
import argparse
#from tabulate import tabulate
import json
import sys
from pasbdc_data_cleaning import clean_center_name # pyright:ignore
from .shared import tag_documentation_level
from constants_module import NEOSERRA_COLUMNS, OUT_COLUMNS
'''
def filter_valid_milestone(
group: pd.DataFrame,
col_neo_center: str = "Center",
col_neo_client_id: str = "Client ID",
col_neo_milestone_date: str = "Milestone Date",
col_neo_attribution_source: str = "Attribution Source",
col_neo_attribution_date: str = "Attribution Date",
col_neo_affirmation: str = "Affirmation",
col_neo_milestone_type: str = "Milestone Type",
business_start_impact_val: str = "Business Start Impact",
business_established_val: str = "Business Established",
log_filtering_file: str = "",
col_out_documentation_level: str = "Documentation Level",
):
log_file = None
blank_vals = ["", " ", "NaN", "nan", np.nan]
if log_filtering_file:
log_file_drop = open(log_filtering_file + "drop.txt", 'a')
log_file_keep = open(log_filtering_file + "keep.txt", 'a')
if group.shape[0] == 1:
should_return = False
milestone_type = group[col_neo_milestone_type].iloc[0]
# Only one entry for this client, check if it can be counted
if milestone_type == business_established_val:
log_file_drop.write(
f"Dropped milestone: \n{tabulate(group, headers='keys', tablefmt='grid', showindex=False)}\n")
#DROPPED_CLIENTS.append(str(group[col_neo_client_id].unique()[0]))
# Drop the milestone
should_return = False
# print(f"Dropped milestone: {group}")
elif milestone_type == business_start_impact_val:
# Keep the milestone
log_file_keep.write(
f"Kept milestone: \n{tabulate(group, headers='keys', tablefmt='grid', showindex=False)}\n")
#KEPT_CLIENTS.append(str(group[col_neo_client_id].unique()[0]))
should_return = True
if log_file_drop is not None:
log_file_drop.close()
log_file_keep.close()
if should_return:
return group
else:
return None
else:
should_return = False
has_impact_milestone = (group[col_neo_milestone_type] == business_start_impact_val).any()
combined_milestone = None
if has_impact_milestone:
# We have a valid business start impact
should_return = True
log_file_keep.write(
f"Kept milestone: \n{tabulate(group, headers='keys', tablefmt='grid', showindex=False)}\n")
#KEPT_CLIENTS.append(str(group[col_neo_client_id].unique()[0]))
affirmation_concat_logic = lambda x: ', '.join(x.astype(str)[~x.isin(blank_vals)].unique())
attribution_source_concat_logic = lambda x: ', '.join(x.astype(str)[~x.isin(blank_vals)].unique())
milestone_type_concat_logic = lambda x: ', '.join(x.astype(str)[~x.isin(blank_vals)].unique())
milestone_date_concat_logic = lambda x: ', '.join(x.astype(str)[~x.isin(blank_vals)].unique())
attribution_date_concat_logic = lambda x: ', '.join(x.astype(str)[~x.isin(blank_vals)].unique())
combined_milestone = pd.DataFrame({
col_neo_client_id: [group[col_neo_client_id].iloc[0]],
col_neo_center: [group[col_neo_center].iloc[0]],
col_neo_milestone_type: [milestone_type_concat_logic(group[col_neo_milestone_type])],
col_neo_affirmation: [affirmation_concat_logic(group[col_neo_affirmation])],
col_neo_attribution_source: [attribution_source_concat_logic(group[col_neo_attribution_source])],
col_neo_milestone_date: [milestone_date_concat_logic(group[col_neo_milestone_date])],
col_neo_attribution_date: [attribution_date_concat_logic(group[col_neo_attribution_date])]
})
else:
# print(f"Dropped milestone: {group}")
log_file_drop.write(
f"Dropped milestone: \n{tabulate(group, headers='keys', tablefmt='grid', showindex=False)}\n")
#DROPPED_CLIENTS.append(str(group[col_neo_client_id].unique()[0]))
should_return = False
if log_file is not None:
log_file_drop.close()
log_file_keep.close()
if should_return:
return combined_milestone
else:
return None
'''
'''
def tag_documentation_level(
nbs_data: pd.DataFrame,
col_neo_attribution_source: str = "Attribution Source",
col_neo_affirmation: str = "Affirmation",
col_neo_attribution_statement: str = "Attribution Statement",
col_out_documentation_level: str = "Documentation Level"
):
# 1. Define your search lists
no_doc_terms = ['Requested on eCenter']
documented_terms = [
'eCenter',
'Email from Client',
'Director Confirmed through Session Note Review',
'Impact & Outcomes Form'
]
# 2. Create Regex patterns
# We join terms with '|' (OR). We use re.escape to handle symbols like '&' or '.' safely.
no_doc_pattern = '|'.join([re.escape(x) for x in no_doc_terms])
documented_pattern = '|'.join([re.escape(x) for x in documented_terms])
# 3. Define Conditions
# Check if string contains "Requested on eCenter" OR is null/empty
no_doc_condition = (
nbs_data[col_neo_attribution_source].str.contains(no_doc_pattern, case=False, na=False) |
nbs_data[col_neo_attribution_source].isin(['', 'NaN', np.nan])
)
# Note: For wrong_doc, we usually want to ensure it DOES NOT contain the "empty" markers.
# We keep .isin() here because checking for specific "empty" strings is usually exact.
wrong_doc_condition = (
~nbs_data[col_neo_attribution_source].isin(['', 'NaN', np.nan])
) & (
nbs_data[col_neo_affirmation].isin(['', 'NaN', np.nan])
) & (
nbs_data[col_neo_attribution_statement].isin(['', 'NaN', np.nan])
)
# Check if source contains any of the documented terms
documented_condition = (
nbs_data[col_neo_attribution_source].str.contains(documented_pattern, case=False, na=False)
& (
~nbs_data[col_neo_attribution_statement].isin(['', 'NaN', np.nan])
|
~nbs_data[col_neo_affirmation].isin(['', 'NaN', np.nan])
)
)
choices = [
'Not Documented',
'Affirmation Missing',
'Documented'
]
nbs_data[col_out_documentation_level] = np.select(
condlist=[no_doc_condition, wrong_doc_condition, documented_condition],
choicelist=choices,
default='Not Determined'
)
return nbs_data
'''
'''
def tag_documentation_level(
nbs_data: pd.DataFrame,
col_neo_attribution_source:str="Attribution Source",
col_neo_affirmation:str="Affirmation",
col_neo_attribution_statement:str="Attribution Statement",
col_out_documentation_level:str="Documentation Level"
):
no_doc_condition = (
nbs_data[col_neo_attribution_source].isin(['Requested on eCenter', '', 'NaN', np.nan])
)
wrong_doc_condition = (
~nbs_data[col_neo_attribution_source].isin(['', 'NaN', np.nan])
) & (
nbs_data[col_neo_affirmation].isin(['', 'NaN', np.nan])
) & (
nbs_data[col_neo_attribution_statement].isin(['', 'NaN', np.nan])
)
documented_condition = (
nbs_data[col_neo_attribution_source].isin(['eCenter', 'Email from Client', 'Director Confirmed through Session Note Review', 'Impact & Outcomes Form'])
& (
~nbs_data[col_neo_attribution_statement].isin(['', 'NaN', np.nan])
|
~nbs_data[col_neo_affirmation].isin(['', 'NaN', np.nan])
)
)
choices = [
'Not Documented',
'Affirmation Missing',
'Documented'
]
nbs_data[col_out_documentation_level] = np.select(
condlist=[no_doc_condition, wrong_doc_condition, documented_condition],
choicelist=choices,
default='Not Determined'
)
'''
def filter_valid_milestone(
    group: pd.DataFrame,
    col_neo_center: str = "Center",
    col_neo_client_id: str = "Client ID",
    col_neo_milestone_date: str = "Milestone Date",
    col_neo_attribution_source: str = "Attribution Source",
    col_neo_attribution_date: str = "Attribution Date",
    col_neo_affirmation: str = "Affirmation",
    col_neo_milestone_type: str = "Milestone Type",
    col_neo_reportable: str = "Reportable?",
    business_start_impact_val: str = "Business Start Impact",
    business_established_val: str = "Business Established",
    log_filtering_file: str = "",
):
    """Reduce all milestone rows of one client to at most one countable row.

    A client is kept only when at least one of its rows is a business start
    impact milestone.

    * One-row group: the group itself is returned unchanged (all original
      columns) when the row is an impact milestone, otherwise ``None``.
    * Multi-row group: when any row is an impact milestone, a new one-row
      DataFrame is returned that holds only the columns named by the
      ``col_neo_*`` parameters; otherwise ``None``.

    In the combined row, milestone type, milestone date and attribution date
    are the comma-joined unique non-blank values across *all* rows, while
    affirmation and attribution source are taken from the impact rows only so
    that documentation recorded on other milestone types (for example
    business established) is not credited to the impact milestone. The
    reportable value is the maximum over all rows.

    Args:
        group: Rows belonging to a single client.
        col_neo_center: Center column name.
        col_neo_client_id: Client id column name.
        col_neo_milestone_date: Milestone date column name.
        col_neo_attribution_source: Attribution source column name.
        col_neo_attribution_date: Attribution date column name.
        col_neo_affirmation: Affirmation column name.
        col_neo_milestone_type: Milestone type column name.
        col_neo_reportable: Reportable flag column name.
        business_start_impact_val: Milestone type value that makes a client
            countable.
        business_established_val: Milestone type value that is never counted
            on its own.
        log_filtering_file: Currently ignored; retained so existing callers
            that pass it keep working.

    Returns:
        A DataFrame (see above) or ``None`` when the client is dropped.
    """
    # Values treated as "no data" when joining a column into one string.
    blank_vals = ["", " ", "NaN", "nan", np.nan]

    def _join_unique(values: pd.Series) -> str:
        # Comma-join the distinct non-blank entries, preserving first-seen order.
        return ', '.join(values.astype(str)[~values.isin(blank_vals)].unique())

    milestone_types = group[col_neo_milestone_type]

    if group.shape[0] == 1:
        # Only one entry for this client: keep it only if it is an impact row.
        milestone_type = milestone_types.iloc[0]
        is_established = milestone_type == business_established_val
        is_impact = milestone_type == business_start_impact_val
        # The established check takes precedence, exactly as in the original
        # if/elif ordering.
        if is_impact and not is_established:
            return group
        return None

    impact_rows = milestone_types == business_start_impact_val
    if not impact_rows.any():
        # No business start impact among this client's milestones: drop it.
        return None

    # Attribution and affirmation data come from the impact milestones only.
    impact_group = group[impact_rows]

    return pd.DataFrame({
        col_neo_client_id: [group[col_neo_client_id].iloc[0]],
        col_neo_center: [group[col_neo_center].iloc[0]],
        col_neo_milestone_type: [_join_unique(milestone_types)],
        col_neo_affirmation: [_join_unique(impact_group[col_neo_affirmation])],
        col_neo_attribution_source: [_join_unique(impact_group[col_neo_attribution_source])],
        col_neo_milestone_date: [_join_unique(group[col_neo_milestone_date])],
        col_neo_attribution_date: [_join_unique(group[col_neo_attribution_date])],
        col_neo_reportable: [group[col_neo_reportable].max(skipna=True)],
    })
def sanitize_nbs_data(
    df: pd.DataFrame,
    col_neo_center: str,
    col_neo_client_id: str,
    col_neo_milestone_date: str,
    col_neo_attribution_date: str,
    col_neo_attribution_source: str,
    col_neo_affirmation: str,
    col_neo_milestone_type: str,
    col_out_documentation_level: str,
    col_neo_reportable: str,
    business_start_impact_val: str,
    business_established_val: str,
) -> pd.DataFrame:
    """Collapse a new-business-start milestone export to one tagged row per client.

    Steps, in order:
      1. Run ``clean_center_name`` on the input frame (its return value is
         ignored, so it presumably edits ``df`` in place -- TODO confirm).
      2. Strip whitespace from the attribution source column (written back to
         the caller's frame) and drop rows equal to "Not attributed to center".
      3. Pass each client's rows through ``filter_valid_milestone`` and keep
         the non-``None`` results.
      4. Add the documentation-level column via ``tag_documentation_level``.
      5. Replace blank attribution sources with "Blank" and sort by center.

    Args:
        df: Raw milestone export.
        col_neo_center: Center column name (also the final sort key).
        col_neo_client_id: Client id column name; rows are grouped by it.
        col_neo_milestone_date: Milestone date column name.
        col_neo_attribution_date: Attribution date column name.
        col_neo_attribution_source: Attribution source column name.
        col_neo_affirmation: Affirmation column name.
        col_neo_milestone_type: Milestone type column name.
        col_out_documentation_level: Name of the column to create.
        col_neo_reportable: Reportable flag column name.
        business_start_impact_val: Milestone type value that is counted.
        business_established_val: Milestone type value that is not counted on
            its own.

    Returns:
        The tagged, sorted DataFrame. When no row survives filtering, an empty
        DataFrame with the input columns plus the documentation-level column.
    """
    # Turn the neoserra center names into the correct ones for visualization
    clean_center_name(df)

    # Normalise the attribution source before comparing so stray whitespace
    # does not let unattributable rows slip through.
    df[col_neo_attribution_source] = df[col_neo_attribution_source].str.strip()  # pyright:ignore
    df = df[df[col_neo_attribution_source] != "Not attributed to center"]

    # Shape of the result when nothing survives filtering.
    empty_result = pd.DataFrame(columns=df.columns.tolist() + [col_out_documentation_level])
    if df.empty:
        return empty_result

    # Collapse duplicate milestones per client. Groups are iterated explicitly
    # instead of using groupby.apply(include_groups=True), which newer pandas
    # releases deprecate; the filter therefore still receives the client id
    # column, and each surviving piece keeps its own row index.
    kept_milestones = []
    for _, client_rows in df.groupby(col_neo_client_id):
        milestone = filter_valid_milestone(
            client_rows,
            col_neo_center=col_neo_center,
            col_neo_client_id=col_neo_client_id,
            col_neo_milestone_date=col_neo_milestone_date,
            col_neo_attribution_source=col_neo_attribution_source,
            col_neo_attribution_date=col_neo_attribution_date,
            col_neo_affirmation=col_neo_affirmation,
            col_neo_milestone_type=col_neo_milestone_type,
            col_neo_reportable=col_neo_reportable,
            business_start_impact_val=business_start_impact_val,
            business_established_val=business_established_val,
        )
        if milestone is not None:
            kept_milestones.append(milestone)

    # Every client was filtered out.
    if not kept_milestones:
        return empty_result

    df_clean = pd.concat(kept_milestones)

    # Tag the documentation level
    df_clean = tag_documentation_level(
        df_clean,
        col_neo_attribution_source=col_neo_attribution_source,
        col_neo_affirmation=col_neo_affirmation,
        col_out_documentation_level=col_out_documentation_level
    )
    # Can be uncommented to drop any not determined documentation levels. It's good to keep this in as it will show you
    # if something may have slipped through the cracks.
    # df_clean = df_clean[df_clean[col_out_documentation_level] != "Not Determined"]

    # Fill in any blanks to allow proper visualization of nan values
    df_clean[col_neo_attribution_source] = df_clean[col_neo_attribution_source].fillna("Blank").astype(str).replace(['', ' ', 'nan', 'NaN'], "Blank")  # pyright:ignore
    df_clean = df_clean.sort_values(by=col_neo_center)
    return df_clean  # pyright:ignore
if __name__ == "__main__":
    # Command-line entry point: read a new-business-start milestone CSV export,
    # collapse and tag it, then write the result to a new CSV file.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "-f", "--file",
        type=str,
        required=True,
        help="The data file to de-duplicate and tag with documentation levels",
    )
    cli.add_argument(
        "-o", "--out",
        type=str,
        required=True,
        help="The csv file to write the de-duplicated data to",
    )
    cli.add_argument(
        "-m", "--mapping",
        type=str,
        required=False,
        default="",
        help="The path to a JSON file with all of the columns in the DEFAULT_CONFIG of this script which can be used to override column name mappings",
    )
    options = cli.parse_args()

    # Column-name overrides must be applied before any column name is read
    # from the registries below.
    if options.mapping:
        NEOSERRA_COLUMNS.apply_json_mapping(options.mapping)
        OUT_COLUMNS.apply_json_mapping(options.mapping)

    # Load the export, parsing both date columns while reading.
    date_columns = [NEOSERRA_COLUMNS.milestone_date, NEOSERRA_COLUMNS.attribution_date]
    raw_milestones = pd.read_csv(options.file, parse_dates=date_columns)

    # Collapse duplicates and tag documentation levels.
    tagged_milestones = sanitize_nbs_data(
        raw_milestones,
        col_neo_center=NEOSERRA_COLUMNS.center,
        col_neo_client_id=NEOSERRA_COLUMNS.client_id,
        col_neo_milestone_date=NEOSERRA_COLUMNS.milestone_date,
        col_neo_attribution_date=NEOSERRA_COLUMNS.attribution_date,
        col_neo_attribution_source=NEOSERRA_COLUMNS.milestone_attribution_source,
        col_neo_affirmation=NEOSERRA_COLUMNS.milestone_affirmation,
        col_neo_milestone_type=NEOSERRA_COLUMNS.milestone_type_name,
        col_out_documentation_level=OUT_COLUMNS.milestone_documentation_level,
        col_neo_reportable=NEOSERRA_COLUMNS.reportable,
        business_start_impact_val=NEOSERRA_COLUMNS.business_start_impact_val,
        business_established_val=NEOSERRA_COLUMNS.business_established_val,
    )

    # Persist the result, then report completion.
    tagged_milestones.to_csv(options.out, index=False)
    print("DONE!")

View File

@@ -0,0 +1,76 @@
import pandas as pd
import re
import numpy as np
def tag_documentation_level(
    nbs_data: pd.DataFrame,
    col_neo_attribution_source: str = "Attribution Source",
    col_neo_affirmation: str = "Affirmation",
    col_out_documentation_level: str = "Documentation Level"
):
    """Classify every row by how well its milestone attribution is documented.

    The result is written to ``col_out_documentation_level`` on ``nbs_data``
    itself (the frame is modified in place and also returned). Each row gets
    the first label whose condition holds:

    1. ``"Documented"`` -- the attribution source names a documentable
       channel and the affirmation is populated.
    2. ``"Affirmation Missing"`` -- the attribution source names a
       documentable channel but the affirmation is blank.
    3. ``"Not Documented"`` -- the attribution source contains a phrase that
       does not count as documentation, or is blank.
    4. ``"Not Determined"`` -- none of the above.

    Phrases that do not count as documentation are removed from the source
    text before looking for documentable channels, so that
    "Requested on eCenter" is not mistaken for "eCenter". All phrase matching
    is case-insensitive.

    Args:
        nbs_data: Milestone rows to tag.
        col_neo_attribution_source: Attribution source column name.
        col_neo_affirmation: Affirmation column name.
        col_out_documentation_level: Name of the column to create/overwrite.

    Returns:
        ``nbs_data`` with the documentation-level column set.
    """
    # Terms for the attribution source that individually constitute no documentation
    no_doc_terms = [
        'Director Confirmed through Session Note Review',
        'Requested on eCenter'
    ]
    # Terms that, even beside something in no_doc_terms, constitute a documentable attribution source
    documented_terms = [
        'eCenter',
        'Email from Client',
        'Impact & Outcomes Form',
        'Quarterly Assessment Survey'
    ]
    no_doc_pattern = '|'.join([re.escape(x) for x in no_doc_terms])
    documented_pattern = '|'.join([re.escape(x) for x in documented_terms])

    def _is_populated(series: pd.Series) -> pd.Series:
        # A cell counts as populated when it is not missing and its text is
        # neither empty/whitespace nor a stringified NaN.
        as_text = series.astype(str).str.strip()
        return series.notna() & ~as_text.isin(['', 'nan', 'NaN'])

    # Work on a text view of the source column so the .str accessor is valid
    # even when the column is entirely blank (float NaN) or otherwise non-string.
    source_text = nbs_data[col_neo_attribution_source].astype(str)

    # Strip the no-documentation phrases with the same case-insensitivity used
    # for matching below; otherwise a differently-cased "requested on ecenter"
    # would survive and falsely match the "eCenter" term.
    clean_source = source_text.str.replace(no_doc_pattern, '', case=False, regex=True)

    has_affirmation = _is_populated(nbs_data[col_neo_affirmation])
    has_raw_source = _is_populated(nbs_data[col_neo_attribution_source])
    # Does the cleaned source still mention a documentable channel?
    has_valid_term = clean_source.str.contains(documented_pattern, case=False, na=False)

    # PRIORITY 1: documentable channel with an affirmation.
    documented_condition = has_valid_term & has_affirmation
    # PRIORITY 2: documentable channel but the affirmation is missing.
    wrong_doc_condition = has_valid_term & (~has_affirmation)
    # PRIORITY 3: a no-documentation phrase is present, or the source is blank.
    # Rows that also had a documentable channel were already claimed by
    # priority 1 or 2, because np.select takes the first matching condition.
    no_doc_condition = (
        source_text.str.contains(no_doc_pattern, case=False, na=False) |
        (~has_raw_source)
    )

    choices = [
        'Documented',           # Priority 1
        'Affirmation Missing',  # Priority 2
        'Not Documented'        # Priority 3
    ]
    # Apply the conditions across the dataset, creating the new column
    nbs_data[col_out_documentation_level] = np.select(
        condlist=[documented_condition, wrong_doc_condition, no_doc_condition],
        choicelist=choices,
        default='Not Determined'
    )
    return nbs_data

View File

@@ -0,0 +1,12 @@
# dataset/pyproject.toml
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "milestone_attribution_dataset_module"
version = "0.1.0"
description = "Internal PASBDC graph making scripts used to generate figures for the milestone attribution analysis."
[tool.setuptools]
packages = ["milestone_attribution_dataset_module"]