
Custom Data Source Upload & Processing System

✅ Core Infrastructure Complete: CSV upload, data processing, versioning, and connection management are fully implemented and ready for production.

Phase 1: Data Source Version Management (Frontend Focus)

  • 1. Data Source Version Hooks: Create useDataSourceVersions and useDataSourceVersion query hooks.
  • 2. Version History UI: Add version list component with status, dates, and metrics summary.
  • 3. Version Selection UI: Allow users to select specific versions for grid targeting.
  • 4. Version Comparison UI: Show differences between data source versions (metrics, cell counts).

Phase 2: Connection OAuth & Live Data Setup

  • 5. OAuth Flow Infrastructure: Implement Google OAuth redirect handling and token storage.
  • 6. Connection Setup UI: Build platform-specific connection configuration forms.
  • 7. Connection Status Dashboard: Show sync status, last update, and error handling for live connections.
  • 8. Connection Testing: Add “Test Connection” functionality with real-time validation.
  • 9. SyncConnectionWorkflow: Implement workflow for scheduled data fetching from third-party APIs.
  • 10. Platform API Integrations: Complete Google Ads, Mailchimp, and Stripe API clients in Connection Actor.
  • 11. ProcessLiveDataSourceWorkflow: Create workflow for processing live data into versioned data sources.
  • 12. Sync Scheduling: Implement configurable sync frequency (daily, weekly, manual).
  • 13. Grid Auto-Update Logic: Automatically create pending grid versions when data sources update.
  • 14. Data Change Notifications: Alert users when live data sources have significant changes.
  • 15. Sync Error Handling: Implement retry logic, error notifications, and connection health monitoring.
  • 16. Performance Optimization: Add data source preview limits and large dataset handling.
  • 17. Deployment Configuration: Verify all workflows and actors are properly configured in wrangler.jsonc.
  • 18. Connection Migration Tools: Build tools to migrate existing connections to new flexible schema.
  • 19. Documentation & Testing: Complete API documentation and end-to-end testing.
  • 20. Monitoring & Observability: Add metrics, alerts, and dashboards for data source operations.

The following core features are production-ready:

  • ✅ CSV upload and processing with drag-and-drop UI
  • ✅ Data source and grid versioning with full API support
  • ✅ Workspace-scoped connection management
  • ✅ H3 cell aggregation and KV storage
  • ✅ Grid version management with auto-update settings
  • ✅ Connection Actor infrastructure with historical data storage


Custom Data Source Upload & Processing System


This document outlines the functional and technical requirements for enabling users to add their own data sources to the HoneyGrid for use in digital advertising location targeting filters. This feature expands the targeting filters available within the grid system with user-provided metrics (e.g., customer counts, revenue, leads, conversions, ROAS by cell). The data can come from a static file uploaded by the user or from a live third-party source that is updated periodically.

The connection architecture has been updated to use a flexible, workspace-scoped approach:

  • Connections moved to WorkspaceDO: There is no longer a global connection database; all connections are stored in workspace-specific databases
  • Flexible auth storage: JSON fields support OAuth2, API keys, webhooks, and other auth mechanisms without schema changes
  • Nullable connection_id: Data sources carry a connection_id field where null = CSV upload and non-null = a live connection
  • Platform-agnostic design: New third-party platforms don’t require schema migrations

The flexible schema supports multiple authentication mechanisms:

OAuth2 (Google Ads, Facebook, etc.)

{
  "auth_type": "oauth2",
  "auth_data": {
    "access_token": "ya29.a0...",
    "refresh_token": "1//04...",
    "expires_at": "2024-01-15T10:00:00Z",
    "scopes": ["https://www.googleapis.com/auth/adwords"]
  },
  "platform_data": {
    "account_id": "987-654-3210",
    "account_name": "Client Campaign Account"
  }
}

API Key (Mailchimp, Stripe, etc.)

{
  "auth_type": "api_key",
  "auth_data": {
    "api_key": "sk_live_...",
    "server_prefix": "us19"
  }
}
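
On the TypeScript side these payloads could be modeled as a discriminated union; the type names below are assumptions, and the fields simply mirror the JSON examples above:

// Sketch: discriminated union mirroring the JSON examples above (type names are assumptions).
type OAuth2Auth = {
  auth_type: 'oauth2'
  auth_data: {
    access_token: string
    refresh_token: string
    expires_at: string // ISO 8601
    scopes: string[]
  }
  platform_data?: {
    account_id: string
    account_name?: string
  }
}

type ApiKeyAuth = {
  auth_type: 'api_key'
  auth_data: {
    api_key: string
    server_prefix?: string // e.g. Mailchimp datacenter prefix
  }
}

export type ConnectionAuth = OAuth2Auth | ApiKeyAuth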

Users now choose connection type first when creating data sources:

  1. Select “CSV Upload” for static files (sets connection_id to null)
  2. Select existing third-party connection for live data sources
  3. Configure data-specific settings based on the chosen connection type
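
For illustration, the two flows might produce creation payloads like the following sketch (field names follow the CreateDataSourceValidator defined later in this document; the values are invented):

// Sketch: payloads matching the CreateDataSourceValidator shape defined later in this document.
const csvDataSource = {
  name: 'Q1 Customer List',
  type: 'csv' as const,
  connection_id: null, // null = static CSV upload
}

const liveDataSource = {
  name: 'Google Ads Conversions',
  type: 'google_ads' as const,
  connection_id: 42, // references an existing workspace-scoped connection
}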

The aggregation phase transforms the raw CSV rows into a single, version-scoped JSON blob keyed by H3 cell. Processing is linear with the number of rows and all per-row operations are O(1).

  1. Every column in the uploaded dataset except the address columns (address1, address2, city, state, zip – plus common aliases) is treated as a metric that will be summed.

  2. Efficient H3 Cell Lookup (row ➜ h3 cell)


    To avoid array scans we build an indexed map once and reuse it during the metrics scan:

    a. Attach a query_id – while converting CSV rows to Address objects set query_id = rowIndex.

    b. Bulk geocode – call getH3AddressBulk(addressBatch, H3_RESOLUTION); each result includes the originating inputAddress.query_id.

    c. Map construction

    const idToCell = new Map<number, string>()
    for (const result of geocoded) {
      if (!result.error) idToCell.set(result.inputAddress.query_id!, result.h3Cell)
    }

    d. Lookup during aggregation – const cell = idToCell.get(rowIndex) ?? 'unknown'.

  3. For each row:
    • Retrieve its H3 cell via the map above.
    • For every metric column, sum the value into aggregated[cell][metric].
    • Round any decimal inputs with Math.round() to prevent floating-point drift.

  4. Regardless of the supplied columns, increment aggregated[cell].customerCount by 1 per row.

  5. After processing, write the aggregated object to KV:
    datasource:{dataSourceId}/version/{versionId}.

Example structure:

{
  "8a2a1072b59ffff": {
    "customerCount": 42,
    "revenue": 123456,
    "conversions": 319
  },
  "8a2a1072b597fff": {
    "customerCount": 9,
    "revenue": 7200,
    "conversions": 22
  }
}
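
A minimal sketch of the aggregation pass described in steps 1–5, assuming rows are plain string records and using an invented ADDRESS_COLUMNS helper for the address/alias check:

// Sketch of the aggregation pass (ADDRESS_COLUMNS and the function signature are assumptions).
const ADDRESS_COLUMNS = new Set(['address1', 'address2', 'city', 'state', 'zip'])

function aggregateRows(
  rows: Record<string, string>[],
  idToCell: Map<number, string>
): Record<string, Record<string, number>> {
  const aggregated: Record<string, Record<string, number>> = {}
  rows.forEach((row, rowIndex) => {
    // Look up the row's H3 cell via the map built during bulk geocoding.
    const cell = idToCell.get(rowIndex) ?? 'unknown'
    const cellMetrics = (aggregated[cell] ??= {})
    for (const [column, raw] of Object.entries(row)) {
      if (ADDRESS_COLUMNS.has(column.toLowerCase())) continue
      const value = Number(raw)
      if (Number.isNaN(value)) continue
      // Round decimal inputs so the running sums stay integral and avoid floating-point drift.
      cellMetrics[column] = (cellMetrics[column] ?? 0) + Math.round(value)
    }
    // Every row contributes one customer regardless of the supplied columns.
    cellMetrics.customerCount = (cellMetrics.customerCount ?? 0) + 1
  })
  return aggregated
}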

This implementation plan uses Cloudflare Workflows as the primary engine for data processing. This approach simplifies the overall system by removing the need for a dedicated state-managing Durable Object and its internal database, relying instead on the inherent state management and resiliency features of Workflows.

  • Allow users to upload CSV data files with address and metric columns.
  • Allow users to connect to live data sources from third parties (e.g., Google Ads).
  • Geocode addresses from the data source into H3 cells.
  • Aggregate user-defined metrics by H3 cell.
  • Store the aggregated data in Workers KV.
  • Create a data_sources table in the WorkspaceDO schema to store metadata for each data source.
  • Make the aggregated data available as filters in the user’s targeting grids.
  • Support versioning of data sources to track changes over time.
  • Enable grid versioning to manage targeting configurations over time.
  • Provide automatic grid updates based on data source changes.

Data sources come in two types, with different versioning behaviors:

  • Single-version data sources (static CSV uploads)
    • Each CSV upload creates a new data source with one version
    • No additional versions can be added
    • Immutable once processed
  • Multi-version data sources (dynamic, connection-backed)
    • Support multiple versions as data changes over time
    • New versions created on data refresh or manual update
    • Track version history and allow version comparison
    • Users can select specific versions for targeting
  • Rules common to all data sources
    • All data sources use the same schema and storage pattern
    • Each data source has at least one version
    • Each version maintains its own:
      • Processing status and metadata
      • Aggregated data in KV (datasource:{dataSourceId}/version/{versionId})
    • The parent data source tracks its current active version
    • New versions can only be created for dynamic data sources
    • All versions go through the same processing workflow
    • Users can select specific versions when configuring grid targeting
    • The current version is used by default in grids

Grid versions provide a way to track and manage changes to targeting configurations over time, particularly as data sources are updated.

Each grid version contains:

  • Targeted areas (radius, zip code, county, carrier routes)
  • Targeting filters:
    • Demographics
    • Custom metrics
    • Manual inclusion filters
    • Manual exclusion filters
  • Data source references (pinned to specific versions)
  • Every grid starts with an initial version marked as “active”
  • Only one “pending” version can exist at a time
    • Represents the grid with latest versions of all data sources
    • Shows in UI as “new grid version available”
    • Updates automatically when data sources change
    • Users can promote pending version to active at any time
  • The parent Grid has an auto_update flag (boolean) that persists across versions
    • When enabled, pending version is automatically promoted when data sources change
  • Version cleanup
    • Versions are automatically deleted after 2 years of inactivity
    • Activity is defined as:
      • Making a version active
      • Updating a version’s configuration
    • Activity resets the 2-year retention clock
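
As a sketch of the promotion rule above (updatePendingGridVersion and promoteVersionToActive appear later in this document's GridDO method list; getAutoUpdate and the return shape are hypothetical):

// Sketch of the decision GridDO makes when a data source changes.
async function onDataSourceChanged(gridDO: {
  updatePendingGridVersion(): Promise<{ pendingVersionId: number }>
  promoteVersionToActive(versionId: number): Promise<void>
  getAutoUpdate(): Promise<boolean>
}): Promise<void> {
  // Refresh (or create) the single pending version that tracks the latest data source versions.
  const { pendingVersionId } = await gridDO.updatePendingGridVersion()
  // With auto_update enabled the pending version is promoted immediately; otherwise the UI
  // surfaces it as "new grid version available" until the user promotes it.
  if (await gridDO.getAutoUpdate()) {
    await gridDO.promoteVersionToActive(pendingVersionId)
  }
}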

The architecture is centered around a CreateDataSourceVersionWorkflow. This workflow orchestrates the entire lifecycle of processing a data source file, from ingestion to notifying the grids. The workflow communicates directly with the WorkspaceDO via RPC (Remote Procedure Calls) to update status and retrieve information, eliminating the need for API callbacks or internal fetch handlers.

sequenceDiagram
participant UI
participant API (Hono)
participant R2
participant WorkspaceDO
participant CreateDataSourceVersionWorkflow
participant GeoCodingService
participant KV
participant GridDO
UI->>+API (Hono): 1. Upload CSV
API (Hono)->>+R2: 2. Stream file to storage
R2-->>-API (Hono): File path
API (Hono)->>+WorkspaceDO: 3. Create DataSource & Version records (RPC Call)
WorkspaceDO-->>-API (Hono): dataSourceId, versionId
API (Hono)->>+CreateDataSourceVersionWorkflow: 4. Start workflow(workspaceId, dataSourceId, versionId, r2Path)
CreateDataSourceVersionWorkflow-->>-API (Hono): workflowId
API (Hono)->>+WorkspaceDO: 5. Update Version with workflowId (RPC Call)
deactivate WorkspaceDO
activate CreateDataSourceVersionWorkflow
CreateDataSourceVersionWorkflow->>+R2: 6. Read CSV data
R2-->>-CreateDataSourceVersionWorkflow: Raw data rows
deactivate R2
CreateDataSourceVersionWorkflow->>+GeoCodingService: 7. Geocode addresses (RPC Call)
GeoCodingService-->>-CreateDataSourceVersionWorkflow: Enriched data with H3 cells
deactivate GeoCodingService
CreateDataSourceVersionWorkflow->>CreateDataSourceVersionWorkflow: 8. Aggregate metrics by H3 cell (sum custom metrics & customerCount)
CreateDataSourceVersionWorkflow->>+KV: 9. Store aggregated data with version key
deactivate KV
CreateDataSourceVersionWorkflow->>+WorkspaceDO: 10. getGridIdsForDataSource() (RPC Call)
WorkspaceDO-->>-CreateDataSourceVersionWorkflow: [gridId1, gridId2]
loop For each gridId
CreateDataSourceVersionWorkflow->>+GridDO: 11. updatePendingGridVersion() (RPC Call)
deactivate GridDO
end
alt On Success
CreateDataSourceVersionWorkflow->>+WorkspaceDO: 12a. updateVersionStatus('ready') (RPC Call)
else On Failure
CreateDataSourceVersionWorkflow->>+WorkspaceDO: 12b. updateVersionStatus('failed') (RPC Call)
end
deactivate CreateDataSourceVersionWorkflow
deactivate WorkspaceDO
deactivate API (Hono)

The schema has been updated to support the unified connection model with nullable connection_id and flexible connection storage.

  • File to Modify: packages/honeygrid-types/schema/WorkspaceDOSchema.ts
  • Action: Add the Connections table and update DataSources with nullable connection_id. Move version-specific fields to DataSourceVersions.
packages/honeygrid-types/schema/WorkspaceDOSchema.ts
// New flexible Connections table
export const Connections = sqliteTable('connections', {
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
workspace_id: text('workspace_id').notNull(),
name: text('name').notNull(),
platform: text('platform').notNull(), // 'google_ads', 'mailchimp', 'stripe', etc.
auth_type: text('auth_type', {
enum: ['oauth2', 'api_key', 'basic_auth', 'jwt', 'webhook_secret'],
}).notNull(),
status: text('status', {
enum: ['pending', 'active', 'inactive', 'expired', 'error'],
}).notNull(),
// Flexible auth storage (encrypted JSON)
auth_data: text('auth_data', { mode: 'json' }).notNull(),
platform_data: text('platform_data', { mode: 'json' }),
settings: text('settings', { mode: 'json' }),
// Audit fields
created_by: text('created_by').notNull(),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()),
last_synced_at: text('last_synced_at'),
expires_at: text('expires_at'),
})
// Updated DataSources - clean separation of concerns
export const DataSources = sqliteTable('data_sources', {
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
workspace_id: text('workspace_id').notNull(),
name: text('name').notNull(),
type: text('type', { enum: ['csv', 'google_ads'] }).notNull(),
status: text('status', { enum: ['processing', 'failed', 'ready', 'pending'] }).notNull(),
status_message: text('status_message'),
total_cells: integer('total_cells').default(0).notNull(),
file_path: text('file_path'),
connection_id: integer('connection_id').references(() => Connections.id), // Nullable - null = CSV
current_version_id: integer('current_version_id'),
workflow_id: text('workflow_id'),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()),
completed_at: text('completed_at'),
})
// Version-specific fields moved here
export const DataSourceVersions = sqliteTable('data_source_versions', {
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
data_source_id: integer('data_source_id')
.notNull()
.references(() => DataSources.id),
version_number: integer('version_number').notNull(),
file_path: text('file_path'),
status: text('status', { enum: ['processing', 'failed', 'ready', 'pending'] })
.notNull()
.default('pending'),
status_message: text('status_message'),
total_cells: integer('total_cells').default(0).notNull(),
workflow_id: text('workflow_id'),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()),
completed_at: text('completed_at'),
metadata: text('metadata', { mode: 'json' }),
})
// Grid versioning support
export const GridVersions = sqliteTable('grid_versions', {
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
grid_id: integer('grid_id')
.notNull()
.references(() => Grids.id),
name: text('name').notNull(),
version_number: integer('version_number').notNull(),
status: text('status', { enum: ['active', 'pending', 'archived'] }).notNull(),
households: integer('households'),
targeted_households: integer('targeted_households'),
targeting_type: text('targeting_type'),
targeting_coverage: integer('targeting_coverage'),
targeted_areas: text('targeted_areas', { mode: 'json' }),
targeting_filters: text('targeting_filters', { mode: 'json' }),
manual_inclusion_filters: text('manual_inclusion_filters', { mode: 'json' }),
manual_exclusion_filters: text('manual_exclusion_filters', { mode: 'json' }),
last_used_at: text('last_used_at').notNull().$defaultFn(() => new Date().toISOString()),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()),
})
// JOIN TABLE grid_version_data_sources
export const GridVersionDataSources = sqliteTable(
'grid_version_data_sources',
{
id: integer('id').primaryKey({ autoIncrement: true }),
grid_version_id: integer('grid_version_id')
.notNull()
.references(() => GridVersions.id),
data_source_id: integer('data_source_id')
.notNull()
.references(() => DataSources.id),
data_source_version_id: integer('data_source_version_id')
.notNull()
.references(() => DataSourceVersions.id),
},
(t) => [index('gvd_idx').on(t.grid_version_id, t.data_source_id)]
)
// Updated Grids table
export const Grids = sqliteTable('grids', {
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
name: text('name').notNull(),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()),
status: text('status').notNull(),
auto_update: integer('auto_update', { mode: 'boolean' }).notNull().default(false),
current_version_id: integer('current_version_id'),
pending_version_id: integer('pending_version_id'),
})

The API will handle the full lifecycle of a data source resource.

  • File to Create: apps/api/src/routes/dataSources.ts
  • Actions: Implement the following RESTful endpoints:
    • POST /api/data-sources: Creates a new DataSource record in the WorkspaceDO, streams the file to R2, and starts the CreateDataSourceVersionWorkflow. It will save the workflowId back to the record.
    • GET /api/data-sources: Retrieves a list of all data sources for the workspace.
    • GET /api/data-sources/:id: Retrieves the details for a single data source.
    • GET /api/data-sources/:id/data: Retrieves the processed data for a data source from the KV cache.
    • DELETE /api/data-sources/:id: Deletes a data source.
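
A sketch of the POST handler under several assumptions: an R2_BUCKET binding, the WORKSPACES Durable Object namespace, a CREATE_DATA_SOURCE_VERSION_WORKFLOW binding, auth middleware that sets workspaceId, and the argument/return shapes of createDataSource and updateDataSourceWorkflowId (none of these are confirmed by this document):

// Sketch of POST /api/data-sources (binding names and RPC shapes are assumptions).
import { Hono } from 'hono'
import type { WorkspaceDO } from '../durable-objects/WorkspaceDO'

type Env = {
  R2_BUCKET: R2Bucket
  WORKSPACES: DurableObjectNamespace<WorkspaceDO>
  CREATE_DATA_SOURCE_VERSION_WORKFLOW: Workflow
}

const dataSources = new Hono<{ Bindings: Env; Variables: { workspaceId: string } }>()

dataSources.post('/api/data-sources', async (c) => {
  const form = await c.req.formData()
  const file = form.get('file') as File
  const name = String(form.get('name') ?? 'Untitled data source')

  // 1. Stream the uploaded CSV into R2.
  const r2Path = `uploads/${crypto.randomUUID()}.csv`
  await c.env.R2_BUCKET.put(r2Path, file.stream())

  // 2. Create the DataSource and initial DataSourceVersion records via WorkspaceDO RPC.
  const workspaceId = c.get('workspaceId') // assumed to be populated by auth middleware
  const workspace = c.env.WORKSPACES.get(c.env.WORKSPACES.idFromName(workspaceId))
  const { dataSourceId, versionId } = await workspace.createDataSource({
    name,
    type: 'csv',
    connection_id: null,
    file_path: r2Path,
  })

  // 3. Start the processing workflow and persist its id back onto the version record.
  const instance = await c.env.CREATE_DATA_SOURCE_VERSION_WORKFLOW.create({
    params: { workspaceId, dataSourceId, versionId, r2Path },
  })
  await workspace.updateDataSourceWorkflowId({ dataSourceId, versionId, workflowId: instance.id })

  return c.json({ dataSourceId, versionId, workflowId: instance.id }, 201)
})

export default dataSources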

To minimize client round-trips, callers can opt in to embedding related records using an include= query parameter.

Examples:

GET /api/grids?include=currentVersion
GET /api/data-sources?include=currentVersion

Multiple expansions can be requested with a comma-separated list:

GET /api/grids?include=currentVersion,pendingVersion

When include is omitted (or empty), the API returns slim parent rows only.

Future expansions will follow the same pattern (include=dataSourceVersions, include=latestStats, …).
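
One possible way to parse and whitelist the include parameter inside a route handler (the helper and constant names are assumptions):

// Sketch: parse include= into a validated set of expansions.
const ALLOWED_EXPANSIONS = new Set(['currentVersion', 'pendingVersion'])

function parseInclude(url: URL): Set<string> {
  const raw = url.searchParams.get('include') ?? ''
  return new Set(
    raw
      .split(',')
      .map((part) => part.trim())
      .filter((part) => ALLOWED_EXPANSIONS.has(part))
  )
}

// Usage inside a handler:
// const expansions = parseInclude(new URL(c.req.url))
// if (expansions.has('currentVersion')) { /* join in the current version row */ }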

3. Data Processing Workflow (/workflows/create-data-source-version.ts)


This workflow becomes the heart of the processing logic, using typed RPC stubs to communicate with Durable Objects.

  • File: apps/api/src/workflows/create-data-source-version.ts (Implemented)
  • Action: Define the workflow class, its parameters, and the run method containing the processing steps.
apps/api/src/workflows/create-data-source-version.ts
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers'
import type { WorkspaceDO } from '../durable-objects/WorkspaceDO' // Assumes type export

// Define the payload the workflow expects
type ProcessDataSourceParams = {
  dataSourceId: number
  workspaceId: string
  r2Path: string
}

// Define the Env to include the typed WorkspaceDO binding
interface Env {
  WORKSPACES: DurableObjectNamespace<WorkspaceDO>
  // ... other bindings
}

export class ProcessDataSourceWorkflow extends WorkflowEntrypoint<Env, ProcessDataSourceParams> {
  // Helper to get the WorkspaceDO stub
  private getWorkspaceDO(workspaceId: string): DurableObjectStub<WorkspaceDO> {
    const doId = this.env.WORKSPACES.idFromName(workspaceId)
    return this.env.WORKSPACES.get(doId)
  }

  async run(event: WorkflowEvent<ProcessDataSourceParams>, step: WorkflowStep) {
    const { dataSourceId, workspaceId, r2Path } = event.payload
    const workspaceDO = this.getWorkspaceDO(workspaceId)
    try {
      // Step 1: Ingest data from R2
      const rawData = await step.do('ingest-csv-from-r2', async () => {
        /* ... */ return []
      })
      // Step 2: Geocode addresses
      const geocodedData = await step.do('geocode-addresses', async () => {
        /* ... */ return []
      })
      // Step 3: Aggregate metrics by H3 cell
      const aggregatedData = await step.do('aggregate-metrics', async () => {
        /* ... */ return {}
      })
      // Step 4: Cache aggregated data in Workers KV
      await step.do('cache-in-kv', async () => {
        await this.env.DATA_SOURCES_KV.put(
          `datasource:${dataSourceId}`,
          JSON.stringify(aggregatedData)
        )
      })
      // Step 5: Notify associated GridDOs
      await step.do('notify-grids', async () => {
        const gridIds = await workspaceDO.getGridIdsForDataSource(dataSourceId)
        // For each grid ID, get the GridDO stub and call `dataSourcesUpdated()`
      })
      // Step 6: Update status to 'ready' via RPC
      await step.do('set-status-ready', () =>
        workspaceDO.updateDataSourceStatus({ dataSourceId, status: 'ready' })
      )
    } catch (error) {
      // If any step fails, update status to 'failed' via RPC
      const errorMessage = error instanceof Error ? error.message : 'An unknown error occurred'
      await step.do('set-status-failed', () =>
        workspaceDO.updateDataSourceStatus({
          dataSourceId,
          status: 'failed',
          message: errorMessage,
        })
      )
      // Re-throw the error to ensure the workflow itself is marked as failed.
      throw error
    }
  }
}

To support this workflow-driven process, the WorkspaceDO must expose a clear, public RPC interface for other services to call.

  • File to Modify: apps/api/src/durable-objects/WorkspaceDO.ts
  • Actions: Add the following public methods to the WorkspaceDO class.
    • updateDataSourceStatus(args: { dataSourceId: number, status: 'ready' | 'failed', message?: string }): Updates the record in its internal database.
    • getGridIdsForDataSource(dataSourceId: number): Promise<string[]>: Returns a list of gridIds that use a specific data source.
    • createDataSource(...): A method to create the initial data source record.
    • updateDataSourceWorkflowId(...): A method to associate the workflow instance with the data source record.
  • File to Modify: apps/api/src/durable-objects/GridDO.ts
  • Action: Ensure a public dataSourcesUpdated(dataSourceIds: string[]) method exists. This method will fetch the corresponding data from Workers KV and re-materialize the grid’s internal cell data.
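
A minimal sketch of the first two WorkspaceDO methods, assuming the DO exposes a Drizzle handle over its SQLite storage and that data source ids are the numeric primary keys from the schema above; the free-function form is illustrative only:

// Sketch of the two RPC helpers (table names follow this document's schema).
import { eq } from 'drizzle-orm'
import type { DrizzleSqliteDODatabase } from 'drizzle-orm/durable-sqlite'
import { DataSources, Grids, GridVersionDataSources, GridVersions } from 'honeygrid-types/schema/WorkspaceDOSchema'

export async function updateDataSourceStatus(
  db: DrizzleSqliteDODatabase<any>,
  args: { dataSourceId: number; status: 'ready' | 'failed'; message?: string }
): Promise<void> {
  await db
    .update(DataSources)
    .set({ status: args.status, status_message: args.message ?? null })
    .where(eq(DataSources.id, args.dataSourceId))
}

export async function getGridIdsForDataSource(
  db: DrizzleSqliteDODatabase<any>,
  dataSourceId: number
): Promise<string[]> {
  // Grids reference data sources through the grid_version_data_sources join table.
  const rows = await db
    .select({ uuid: Grids.uuid })
    .from(GridVersionDataSources)
    .innerJoin(GridVersions, eq(GridVersionDataSources.grid_version_id, GridVersions.id))
    .innerJoin(Grids, eq(GridVersions.grid_id, Grids.id))
    .where(eq(GridVersionDataSources.data_source_id, dataSourceId))
  return [...new Set(rows.map((row) => row.uuid))]
}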

  • Task: Create Tanstack Query hooks for interacting with the new API.
  • Files to Create:
    • apps/web-app/src/queries/data-sources.ts
  • Actions:
    1. Create useDataSources, useDataSource, useCreateDataSource, and useDeleteDataSource hooks.
    2. Implement the multipart upload logic, likely in a dedicated function called by a mutation.
  • Testing:
    • Unit tests for the query hooks, mocking the fetch API to simulate server responses and verify correct state management (isLoading, isError, data).
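
A sketch of two of these hooks, assuming plain fetch calls against the endpoints above and TanStack Query v5; error handling and typing are simplified:

// Sketch: apps/web-app/src/queries/data-sources.ts (hook internals are assumptions).
import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query'
import type { DataSource } from 'honeygrid-types/validators-and-types/DataSourceTypes'

export function useDataSources() {
  return useQuery({
    queryKey: ['data-sources'],
    queryFn: async (): Promise<DataSource[]> => {
      const res = await fetch('/api/data-sources')
      if (!res.ok) throw new Error('Failed to load data sources')
      return res.json()
    },
  })
}

export function useDeleteDataSource() {
  const queryClient = useQueryClient()
  return useMutation({
    mutationFn: async (id: number) => {
      const res = await fetch(`/api/data-sources/${id}`, { method: 'DELETE' })
      if (!res.ok) throw new Error('Failed to delete data source')
    },
    // Refresh the list after a successful delete.
    onSuccess: () => queryClient.invalidateQueries({ queryKey: ['data-sources'] }),
  })
}
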
  • Task: Create the file dropzone component for CSV uploads.
  • Files to Create:
    • apps/web-app/src/components/DataSources/CreateDataSourceDialog.tsx
    • apps/web-app/src/components/DataSources/FileUploadDropzone.tsx
  • Actions:
    1. The dialog will orchestrate the creation flow, allowing the user to name the data source.
    2. Implement the FileUploadDropzone based on Flowbite/shadcn examples. This component will handle the client-side multipart upload to R2.
    3. Upon successful upload, the useCreateDataSource mutation will be called to kick off the backend processing workflow.
  • Testing:
    • Component tests to verify file selection, drag-and-drop, and that the upload function is called correctly on user interaction.
  • Task: Develop the primary interface for users to view and manage their data sources.
  • Files to Create:
    • apps/web-app/src/routes/data-sources.tsx (New route)
    • apps/web-app/src/components/DataSources/DataSourcesList.tsx
    • apps/web-app/src/components/DataSources/DataSourceCard.tsx
    • apps/web-app/src/components/DataSources/DataSourceDetails.tsx
    • apps/web-app/src/components/DataSources/DataSourceDataTable.tsx
  • Actions:
    1. Use the existing SidePanel layout for the page structure.
    2. DataSourcesList: Fetch and display a list of sources using the DataSourceCard. The card should show Name, Status, Last Updated, and Type.
    3. DataSourceDetails: Show the selected source’s metadata. This view should contain a DataSourceDataTable.
    4. DataSourceDataTable: Use tanstack-table and shadcn/ui Table components to create a reusable, paginated, and sortable data table. This table will display the processed data for the data source by fetching it from a new API endpoint that reads from the KV cache (GET /api/data-sources/:id/data).
  • Testing:
    • Write Storybook stories or Vitest component tests for each new UI component to ensure correct rendering with mock data.

File: packages/honeygrid-types/schema/WorkspaceDOSchema.ts

export const DataSourceVersions = sqliteTable('data_source_versions', {
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
data_source_id: integer('data_source_id')
.notNull()
.references(() => DataSources.id),
version_number: integer('version_number').notNull(),
file_path: text('file_path'),
status: text('status', { enum: ['processing', 'failed', 'ready', 'pending'] })
.notNull()
.default('pending'),
status_message: text('status_message'),
total_cells: integer('total_cells').default(0).notNull(),
workflow_id: text('workflow_id'),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()),
completed_at: text('completed_at'),
metadata: text('metadata', { mode: 'json' }),
})
export const DataSources = sqliteTable('data_sources', {
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
workspace_id: text('workspace_id').notNull(),
name: text('name').notNull(),
type: text('type', { enum: ['csv', 'google_ads'] }).notNull(),
status: text('status', { enum: ['processing', 'failed', 'ready', 'pending'] }).notNull(),
status_message: text('status_message'),
total_cells: integer('total_cells').default(0).notNull(),
file_path: text('file_path'),
connection_id: integer('connection_id').references(() => Connections.id),
current_version_id: integer('current_version_id'),
workflow_id: text('workflow_id'),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()),
completed_at: text('completed_at'),
})

File: packages/honeygrid-types/validators-and-types/DataSourceTypes.ts

export const DataSourceTypeValidator = z.enum(['csv', 'google_ads'])
export const DataSourceStatusValidator = z.enum(['processing', 'failed', 'ready', 'pending'])
export const BaseDataSourceValidator = z.object({
id: z.number(),
uuid: z.string(),
workspace_id: z.string(),
name: z.string(),
type: DataSourceTypeValidator,
status: DataSourceStatusValidator,
status_message: z.string().nullable(),
total_cells: z.number().int().default(0),
file_path: z.string().nullable(),
connection_id: z.number().nullable(),
current_version_id: z.number().nullable(),
workflow_id: z.string().nullable(),
created_at: z.iso.datetime(),
updated_at: z.iso.datetime(),
completed_at: z.string().nullable(),
})
export const DataSourceVersionValidator = z.object({
id: z.number(),
uuid: z.string(),
data_source_id: z.number(),
version_number: z.number().int(),
file_path: z.string().nullable(),
status: DataSourceStatusValidator,
status_message: z.string().nullable(),
total_cells: z.number().int().default(0),
workflow_id: z.string().nullable(),
created_at: z.iso.datetime(),
updated_at: z.iso.datetime(),
completed_at: z.iso.datetime().nullable(),
metadata: z.record(z.string(), z.unknown()).nullable(),
})
export const CreateDataSourceValidator = z.object({
name: z.string(),
type: DataSourceTypeValidator,
connection_id: z.number().nullable(),
})
export type DataSource = z.infer<typeof BaseDataSourceValidator>
export type DataSourceVersion = z.infer<typeof DataSourceVersionValidator>
export type CreateDataSource = z.infer<typeof CreateDataSourceValidator>

File: packages/honeygrid-types/schema/WorkspaceDOSchema.ts

export const GridVersionStatusValidator = z.enum(['active', 'pending', 'archived'])
export const GridVersions = sqliteTable('grid_versions', {
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
grid_id: integer('grid_id')
.notNull()
.references(() => Grids.id),
name: text('name').notNull(),
version_number: integer('version_number').notNull(),
status: text('status', { enum: ['active', 'pending', 'archived'] }).notNull(),
households: integer('households'),
targeted_households: integer('targeted_households'),
targeting_type: text('targeting_type'),
targeting_coverage: integer('targeting_coverage'),
targeted_areas: text('targeted_areas', { mode: 'json' }),
targeting_filters: text('targeting_filters', { mode: 'json' }),
manual_inclusion_filters: text('manual_inclusion_filters', { mode: 'json' }),
manual_exclusion_filters: text('manual_exclusion_filters', { mode: 'json' }),
last_used_at: text('last_used_at').notNull().$defaultFn(() => new Date().toISOString()),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()),
})
export const GridVersionDataSources = sqliteTable(
'grid_version_data_sources',
{
id: integer('id').primaryKey({ autoIncrement: true }),
grid_version_id: integer('grid_version_id')
.notNull()
.references(() => GridVersions.id),
data_source_id: integer('data_source_id')
.notNull()
.references(() => DataSources.id),
data_source_version_id: integer('data_source_version_id')
.notNull()
.references(() => DataSourceVersions.id),
},
(t) => [index('gvd_idx').on(t.grid_version_id, t.data_source_id)]
)
// Modify existing Grids table
export const Grids = sqliteTable('grids', {
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
name: text('name').notNull(),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()),
status: text('status').notNull(),
auto_update: integer('auto_update', { mode: 'boolean' }).notNull().default(false),
current_version_id: integer('current_version_id'),
pending_version_id: integer('pending_version_id'),
})

File: packages/honeygrid-types/validators-and-types/GridTypes.ts

export const GridVersionStatusValidator = z.enum(['active', 'pending', 'archived'])
export const GridVersionValidator = z.object({
id: z.number(),
uuid: z.string(),
grid_id: z.number(),
name: z.string(),
version_number: z.number().int(),
status: GridVersionStatusValidator,
households: z.number().nullable(),
targeted_households: z.number().nullable(),
targeting_type: z.string().nullable(),
targeting_coverage: z.number().nullable(),
targeted_areas: TargetedAreasValidator,
targeting_filters: TargetingFilterValidator,
manual_inclusion_filters: z.array(z.string()),
manual_exclusion_filters: z.array(z.string()),
last_used_at: z.string(),
created_at: z.iso.datetime(),
updated_at: z.iso.datetime(),
})
export const GridVersionDataSourceValidator = z.object({
id: z.number(),
grid_version_id: z.number(),
data_source_id: z.number(),
data_source_version_id: z.number(),
})
// Update existing GridValidator
export const GridValidator = z.object({
id: z.number(),
uuid: z.string(),
name: z.string(),
created_at: z.iso.datetime(),
updated_at: z.iso.datetime(),
status: z.string(),
auto_update: z.boolean(),
current_version_id: z.number().nullable(),
pending_version_id: z.number().nullable(),
})
export type GridVersion = z.infer<typeof GridVersionValidator>
export type GridVersionStatus = z.infer<typeof GridVersionStatusValidator>
export type GridVersionDataSource = z.infer<typeof GridVersionDataSourceValidator>

File: apps/api/src/durable-objects/WorkspaceDO.ts

// New methods to add:
- createDataSourceVersion(dataSourceId: number, data: Partial<typeof DataSourceVersions.$inferSelect>)
- updateDataSourceVersion(versionId: number, data: Partial<typeof DataSourceVersions.$inferSelect>)
- getDataSourceVersions(dataSourceId: number)
- getDataSourceVersion(versionId: number)

File: apps/api/src/routes/dataSources.ts

// New endpoints to add:
- POST /api/data-sources/:id/versions - Create new version
- GET /api/data-sources/:id/versions - List versions
- GET /api/data-sources/:id/versions/:versionId - Get version details
- GET /api/data-sources/:id/versions/:versionId/data - Get version data

File: apps/api/src/workflows/create-data-source-version.ts

// Update KV key format:
const kvKey = `datasource:${dataSourceId}/version/${versionId}`

File: apps/api/src/durable-objects/GridDO.ts

// New methods to add:
- createGridVersion(data: Partial<typeof GridVersions.$inferSelect>)
- updateGridVersion(versionId: number, data: Partial<typeof GridVersions.$inferSelect>)
- getGridVersions()
- getGridVersion(versionId: number)
- promoteVersionToActive(versionId: number)
- updatePendingGridVersion()
- cleanupOldVersions() // Deletes versions not used in 2 years
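
A sketch of promoteVersionToActive under the schema above; the free-function form, the Drizzle handle parameter, and the handling of the parent Grid pointers are assumptions:

// Sketch: promote a grid version to active (parent Grid pointer update omitted).
import { and, eq } from 'drizzle-orm'
import type { DrizzleSqliteDODatabase } from 'drizzle-orm/durable-sqlite'
import { GridVersions } from 'honeygrid-types/schema/WorkspaceDOSchema'

export async function promoteVersionToActive(
  db: DrizzleSqliteDODatabase<any>,
  gridId: number,
  versionId: number
): Promise<void> {
  // Archive whichever version is currently active for this grid.
  await db
    .update(GridVersions)
    .set({ status: 'archived' })
    .where(and(eq(GridVersions.grid_id, gridId), eq(GridVersions.status, 'active')))
  // Activate the requested version and refresh its activity timestamp (resets the 2-year retention clock).
  await db
    .update(GridVersions)
    .set({ status: 'active', last_used_at: new Date().toISOString() })
    .where(eq(GridVersions.id, versionId))
  // The parent Grid's current_version_id / pending_version_id would also be updated here (omitted).
}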

File: apps/api/src/durable-objects/GridDO.ts

// Update GridState type:
export type GridState = {
gridId: string
workspaceId: string
currentVersionId: number | null
pendingVersionId: number | null
autoUpdate: boolean
}

File: apps/api/src/routes/grids.ts

// New endpoints to add:
- POST /api/grids/:id/versions - Create new version
- GET /api/grids/:id/versions - List versions
- GET /api/grids/:id/versions/:versionId - Get version details
- POST /api/grids/:id/versions/:versionId/promote - Promote version to active
- PUT /api/grids/:id/auto-update - Toggle auto-update setting

File: apps/api/src/workflows/create-data-source-version.ts

// Update workflow to:
1. Create/update data source version
2. Store processed data with version-specific KV key
3. Notify grids to update their pending versions

File: apps/api/src/workflows/cleanup-grid-versions-workflow.ts

// New workflow to:
1. Query for grid versions not used in 730 days (2 years)
2. Delete old versions and their associated data
3. Run on a daily schedule
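
The retention query might look like the following sketch; the function name and the exemption of the active version are assumptions beyond the 730-day cutoff stated above:

// Sketch: delete grid versions whose last_used_at is older than 730 days.
import { and, lt, ne } from 'drizzle-orm'
import type { DrizzleSqliteDODatabase } from 'drizzle-orm/durable-sqlite'
import { GridVersions } from 'honeygrid-types/schema/WorkspaceDOSchema'

const TWO_YEARS_MS = 730 * 24 * 60 * 60 * 1000

export async function deleteStaleGridVersions(db: DrizzleSqliteDODatabase<any>): Promise<void> {
  const cutoff = new Date(Date.now() - TWO_YEARS_MS).toISOString()
  // ISO-8601 strings compare lexicographically, so a plain string comparison works here.
  // The active version is never deleted, regardless of age.
  await db
    .delete(GridVersions)
    .where(and(lt(GridVersions.last_used_at, cutoff), ne(GridVersions.status, 'active')))
}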

Dynamic data sources (like Google Ads) require active management to keep their data current. This uses a hybrid approach with Connection Actors for data storage and scheduling, while keeping connection metadata in WorkspaceDO for clean joins and permissions.

Connection Metadata in WorkspaceDO:

  • Connection auth credentials, settings, and permissions
  • References to data sources and grids
  • Connection health status and last sync timestamps
  • Enables clean SQL joins across workspace data

Connection Actors for Data Management:

  • Scheduled data fetching using Durable Object alarms
  • Historical data storage in Actor’s SQLite database
  • Daily snapshots for aggregation queries (“12 month conversions”)
  • RPC methods for data retrieval and management
sequenceDiagram
participant ConnectionActor
participant ThirdPartyAPI
participant WorkspaceDO
participant ProcessLiveDataSourceWorkflow
participant KV
participant GridDO
ConnectionActor->>ConnectionActor: 1. Daily alarm fires (scheduled)
ConnectionActor->>+WorkspaceDO: 2. getConnectionAuth(connectionId) (RPC)
WorkspaceDO-->>-ConnectionActor: Auth credentials & settings
ConnectionActor->>+ThirdPartyAPI: 3. Fetch latest data (using auth)
ThirdPartyAPI-->>-ConnectionActor: Raw data
ConnectionActor->>ConnectionActor: 4. Store daily snapshot in local SQLite
ConnectionActor->>+WorkspaceDO: 5. getDataSourcesForConnection(connectionId) (RPC)
WorkspaceDO-->>-ConnectionActor: [dataSourceId1, dataSourceId2]
loop For each dataSourceId
ConnectionActor->>+ProcessLiveDataSourceWorkflow: 6. Start workflow(dataSourceId, connectionId)
end
activate ProcessLiveDataSourceWorkflow
ProcessLiveDataSourceWorkflow->>+ConnectionActor: 7. getLatestSnapshot() (RPC)
ConnectionActor-->>-ProcessLiveDataSourceWorkflow: Processed data
ProcessLiveDataSourceWorkflow->>+WorkspaceDO: 8. Create new DataSourceVersion (RPC)
WorkspaceDO-->>-ProcessLiveDataSourceWorkflow: versionId
ProcessLiveDataSourceWorkflow->>+KV: 9. Store aggregated data with version key
ProcessLiveDataSourceWorkflow->>+WorkspaceDO: 10. getGridIdsForDataSource()
WorkspaceDO-->>-ProcessLiveDataSourceWorkflow: [gridId1, gridId2]
loop For each gridId
ProcessLiveDataSourceWorkflow->>+GridDO: 11. updatePendingGridVersion()
end
ProcessLiveDataSourceWorkflow->>+WorkspaceDO: 12. updateVersionStatus('ready')
ProcessLiveDataSourceWorkflow->>+WorkspaceDO: 13. updateConnectionSyncStatus(connectionId, 'synced')
deactivate ProcessLiveDataSourceWorkflow

✅ Status: Core Connection Actor is implemented with basic sync functionality. Platform-specific integrations are partially implemented.

Using the @cloudflare/actors library with a composition-based approach for platform-specific functionality. This approach uses a single ConnectionActor class that handles all platforms through strategy pattern methods, avoiding the operational complexity of multiple Durable Object classes.

File: apps/api/src/durable-objects/ConnectionActor.ts

import { Actor, handler, Persist } from '@cloudflare/actors'
import { SyncSnapshots, ErrorLogs, SyncHistory } from 'honeygrid-types/schema/ConnectionActorSchema'
import { and, count, desc, eq, gte, lte, sum } from 'drizzle-orm'
import { drizzle } from 'drizzle-orm/durable-sqlite'
import { migrate } from 'drizzle-orm/durable-sqlite/migrator'
// @ts-ignore
import migrations from '../../ConnectionActordrizzleMigrations/migrations.js'
import type { DrizzleSqliteDODatabase } from 'drizzle-orm/durable-sqlite'
/**
* Connection Actor manages live data sources with daily snapshots and historical aggregation
* Each actor instance represents one connection and handles:
* - Scheduled data fetching using alarms
* - Historical data storage in SQLite
* - Daily snapshots for aggregation queries
* - RPC methods for data retrieval and management
*/
export class ConnectionActor extends Actor<Env> {
#db: DrizzleSqliteDODatabase<any>
#connectionId: string
// Configuration stored using @Persist decorator
@Persist
public syncFrequencyHours: number = 24
@Persist
public syncEnabled: boolean = true
@Persist
public lastSuccessfulSync: string | null = null
@Persist
public nextScheduledSync: string | null = null
@Persist
public platformSettings: Record<string, any> = {}
constructor(ctx?: any, env?: Env) {
super(ctx, env)
this.#connectionId = this.identifier || 'unknown'
this.#db = drizzle(this.ctx.storage, {
schema: {
SyncSnapshots,
SyncHistory,
ErrorLogs,
},
})
// Ensure database is migrated before processing requests
this.ctx.blockConcurrencyWhile(async () => {
await this.migrateDb()
})
}
/**
* Migrates the Connection Actor database using Drizzle migrations
*/
private async migrateDb() {
try {
await migrate(this.#db, migrations)
} catch (error) {
console.error(`Error migrating connection actor database for ${this.#connectionId}:`, error)
throw error
}
}
/**
* RPC method to get connection status
*/
public async getConnectionStatus(): Promise<{
connection_id: string
last_snapshot: string | null
status: string
total_snapshots: number
}> {
const [lastSnapshot] = await this.#db
.select({ last_snapshot: SyncSnapshots.snapshot_date })
.from(SyncSnapshots)
.orderBy(desc(SyncSnapshots.snapshot_date))
.limit(1)
const [totalCount] = await this.#db.select({ count: count() }).from(SyncSnapshots)
return {
connection_id: this.#connectionId,
last_snapshot: lastSnapshot?.last_snapshot || null,
status: 'active',
total_snapshots: totalCount?.count || 0,
}
}
/**
* Initialize connection configuration and schedule first sync
*/
public async initializeConnection(syncFrequencyHours: number = 24): Promise<void> {
console.log(`Initializing connection actor: ${this.#connectionId}`)
// Set configuration using @Persist properties
this.syncFrequencyHours = syncFrequencyHours
this.syncEnabled = true
this.nextScheduledSync = new Date(
Date.now() + syncFrequencyHours * 60 * 60 * 1000
).toISOString()
// Schedule first sync
this.alarms.schedule(60, 'performDailySync', []) // Start in 1 minute for immediate testing
}
/**
* Scheduled method called by daily alarm to fetch and store data
*/
public async performDailySync(): Promise<void> {
console.log(`Daily sync triggered for connection: ${this.#connectionId}`)
const syncId = `${this.#connectionId}-${Date.now()}`
const syncStart = new Date().toISOString()
const today = new Date().toISOString().split('T')[0]
try {
// Record sync start
await this.#db.insert(SyncHistory).values({
id: syncId,
sync_date: today,
status: 'in_progress',
started_at: syncStart,
})
// Get connection auth from WorkspaceDO
const workspaceDO = this.getWorkspaceDO()
const authData = await workspaceDO.getConnectionAuth(this.#connectionId)
// Fetch data from third-party API
const rawData = await this.fetchFromThirdParty(authData)
// Process and store daily snapshot
const recordsProcessed = await this.storeDailySnapshot(rawData, today)
// Update sync history with success
await this.#db
.update(SyncHistory)
.set({
status: 'success',
records_processed: recordsProcessed,
completed_at: new Date().toISOString(),
sync_duration_ms: Date.now() - new Date(syncStart).getTime(),
})
.where(eq(SyncHistory.id, syncId))
// Trigger data source version creation workflows
const dataSourceIds = await workspaceDO.getDataSourcesForConnection(this.#connectionId)
await this.triggerDataSourceUpdates(dataSourceIds)
// Update sync status in WorkspaceDO
await workspaceDO.updateConnectionSyncStatus(this.#connectionId, 'active')
// Update connection config with successful sync
this.lastSuccessfulSync = new Date().toISOString()
this.nextScheduledSync = new Date(Date.now() + 24 * 60 * 60 * 1000).toISOString() // Next day
// Schedule next sync (24 hours)
this.alarms.schedule(24 * 60 * 60, 'performDailySync', [])
} catch (error) {
console.error(`Sync failed for connection ${this.#connectionId}:`, error)
// Update sync history with failure
await this.#db
.update(SyncHistory)
.set({
status: 'failed',
error_message: (error as Error).message,
completed_at: new Date().toISOString(),
sync_duration_ms: Date.now() - new Date(syncStart).getTime(),
})
.where(eq(SyncHistory.id, syncId))
// Log detailed error
await this.#db.insert(ErrorLogs).values({
id: `${syncId}-error`,
sync_history_id: syncId,
error_type: 'sync_failed',
error_message: (error as Error).message,
error_details: { stack: (error as Error).stack },
})
// Update WorkspaceDO with error status
const workspaceDO = this.getWorkspaceDO()
await workspaceDO.updateConnectionSyncStatus(this.#connectionId, 'error')
// Retry in 1 hour on failure
this.alarms.schedule(60 * 60, 'performDailySync', [])
}
}
/**
* RPC method to get latest snapshot data for workflow processing
*/
public async getLatestSnapshot(): Promise<Record<string, Record<string, number>>> {
const today = new Date().toISOString().split('T')[0]
const results = await this.#db
.select({
h3_cell: SyncSnapshots.h3_cell,
metric_name: SyncSnapshots.metric_name,
metric_value: SyncSnapshots.metric_value,
})
.from(SyncSnapshots)
.where(eq(SyncSnapshots.snapshot_date, today))
// Transform to nested object: { h3Cell: { metricName: value } }
const data: Record<string, Record<string, number>> = {}
for (const row of results) {
if (!data[row.h3_cell]) data[row.h3_cell] = {}
data[row.h3_cell][row.metric_name] = row.metric_value
}
return data
}
/**
* RPC method to get historical aggregated data for specific metrics and time ranges
*/
public async getHistoricalAggregation(
h3Cell: string,
metric: string,
startDate: string,
endDate: string
): Promise<number> {
const [result] = await this.#db
.select({ total: sum(SyncSnapshots.metric_value) })
.from(SyncSnapshots)
.where(
and(
eq(SyncSnapshots.h3_cell, h3Cell),
eq(SyncSnapshots.metric_name, metric),
gte(SyncSnapshots.snapshot_date, startDate),
lte(SyncSnapshots.snapshot_date, endDate)
)
)
return Number(result?.total) || 0
}
/**
* Store daily snapshot data in local SQLite using Drizzle ORM
*/
private async storeDailySnapshot(rawData: any, snapshotDate: string): Promise<number> {
// Process raw data into H3 cell aggregations
const aggregatedData = await this.processRawDataToH3(rawData)
let recordsProcessed = 0
// Store each cell's metrics
for (const [h3Cell, metrics] of Object.entries(aggregatedData)) {
for (const [metricName, value] of Object.entries(metrics as Record<string, number>)) {
await this.#db
.insert(SyncSnapshots)
.values({
id: `${h3Cell}-${metricName}-${snapshotDate}`,
h3_cell: h3Cell,
metric_name: metricName,
metric_value: value,
snapshot_date: snapshotDate,
raw_data: rawData, // Store original response for debugging
})
.onConflictDoUpdate({
target: [SyncSnapshots.h3_cell, SyncSnapshots.metric_name, SyncSnapshots.snapshot_date],
set: { metric_value: value, raw_data: rawData },
})
recordsProcessed++
}
}
return recordsProcessed
}
/**
* Process raw API data into H3 cell aggregations
* This method uses composition to handle different platforms
*/
private async processRawDataToH3(rawData: any): Promise<Record<string, Record<string, number>>> {
switch (rawData.platform) {
case 'google_ads':
return this.processGoogleAdsData(rawData)
case 'mailchimp':
return this.processMailchimpData(rawData)
case 'stripe':
return this.processStripeData(rawData)
default:
console.warn(`processRawDataToH3 not implemented for platform ${rawData.platform}`)
return this.processDefaultMockData(rawData)
}
}
/**
* Google Ads specific data processing
*/
private async processGoogleAdsData(
rawData: any
): Promise<Record<string, Record<string, number>>> {
const aggregated: Record<string, Record<string, number>> = {}
// Process Google Ads location reports
for (const location of rawData.location_data || []) {
if (location.geographic_view?.postal_code) {
// Convert postal code to H3 cell
const h3Cell = await this.env.GEOCODING_SERVICE.zipCodeToH3(
location.geographic_view.postal_code
)
if (!aggregated[h3Cell]) aggregated[h3Cell] = {}
aggregated[h3Cell].conversions =
(aggregated[h3Cell].conversions || 0) + (location.metrics.conversions || 0)
aggregated[h3Cell].impressions =
(aggregated[h3Cell].impressions || 0) + (location.metrics.impressions || 0)
aggregated[h3Cell].clicks =
(aggregated[h3Cell].clicks || 0) + (location.metrics.clicks || 0)
aggregated[h3Cell].spend =
(aggregated[h3Cell].spend || 0) + (location.metrics.cost_micros / 1000000 || 0)
}
}
return aggregated
}
/**
* Mailchimp specific data processing
*/
private async processMailchimpData(
rawData: any
): Promise<Record<string, Record<string, number>>> {
const aggregated: Record<string, Record<string, number>> = {}
// Process Mailchimp member data with addresses
for (const member of rawData.members || []) {
if (member.merge_fields?.ADDRESS) {
const address = `${member.merge_fields.ADDRESS.addr1}, ${member.merge_fields.ADDRESS.city}, ${member.merge_fields.ADDRESS.state} ${member.merge_fields.ADDRESS.zip}`
// Convert address to H3 cell
const h3Cell = await this.env.GEOCODING_SERVICE.addressToH3(address)
if (!aggregated[h3Cell]) aggregated[h3Cell] = {}
aggregated[h3Cell].subscribers = (aggregated[h3Cell].subscribers || 0) + 1
aggregated[h3Cell].opens =
(aggregated[h3Cell].opens || 0) + (member.stats?.avg_open_rate || 0)
aggregated[h3Cell].clicks =
(aggregated[h3Cell].clicks || 0) + (member.stats?.avg_click_rate || 0)
}
}
return aggregated
}
/**
* Stripe specific data processing
*/
private async processStripeData(rawData: any): Promise<Record<string, Record<string, number>>> {
const aggregated: Record<string, Record<string, number>> = {}
// Process Stripe customer/charge data with billing addresses
for (const charge of rawData.charges || []) {
if (charge.billing_details?.address) {
const address = `${charge.billing_details.address.line1}, ${charge.billing_details.address.city}, ${charge.billing_details.address.state} ${charge.billing_details.address.postal_code}`
// Convert address to H3 cell
const h3Cell = await this.env.GEOCODING_SERVICE.addressToH3(address)
if (!aggregated[h3Cell]) aggregated[h3Cell] = {}
aggregated[h3Cell].revenue = (aggregated[h3Cell].revenue || 0) + charge.amount / 100 // Convert cents to dollars
aggregated[h3Cell].transactions = (aggregated[h3Cell].transactions || 0) + 1
}
}
return aggregated
}
/**
* Default mock data processing for testing and unsupported platforms
*/
private async processDefaultMockData(
rawData: any
): Promise<Record<string, Record<string, number>>> {
// Return mock data to demonstrate the structure
// In a real implementation, this would:
// 1. Extract addresses from rawData based on platform format
// 2. Use GeoCodingService to convert to H3 cells
// 3. Aggregate metrics by cell
console.warn(`processRawDataToH3 not implemented for connection ${this.#connectionId}`)
return {
'8a2a1072b59ffff': {
conversions: 42,
impressions: 1500,
clicks: 89,
spend: 125.5,
},
'8a2a1072b5affff': {
conversions: 18,
impressions: 890,
clicks: 45,
spend: 67.25,
},
}
}
/**
* Fetch data from third-party API using stored auth credentials
* This method uses composition to handle different platforms
*/
private async fetchFromThirdParty(authData: any): Promise<any> {
switch (authData.platform) {
case 'google_ads':
return this.fetchFromGoogleAds(authData)
case 'mailchimp':
return this.fetchFromMailchimp(authData)
case 'stripe':
return this.fetchFromStripe(authData)
default:
console.warn(`fetchFromThirdParty not implemented for platform ${authData.platform}`)
return this.fetchMockData(authData)
}
}
/**
* Google Ads API integration
*/
private async fetchFromGoogleAds(authData: any): Promise<any> {
// Initialize Google Ads API client
const adsApi = new GoogleAds({
client_id: authData.auth_data.client_id,
client_secret: authData.auth_data.client_secret,
refresh_token: authData.auth_data.refresh_token,
})
const accountId = authData.platform_data.account_id
try {
// Fetch campaign data
const campaigns = await adsApi.campaigns.list({ customer_id: accountId })
// Fetch geographic performance report
const locationReports = await adsApi.reports.generate({
customer_id: accountId,
query: `
SELECT
campaign.id,
geographic_view.location_type,
geographic_view.postal_code,
metrics.impressions,
metrics.conversions,
metrics.clicks,
metrics.cost_micros
FROM geographic_view
WHERE segments.date DURING TODAY
AND geographic_view.location_type = 'POSTAL_CODE'
`,
})
return {
platform: 'google_ads',
account_id: accountId,
campaigns: campaigns.data,
location_data: locationReports.data,
fetched_at: new Date().toISOString(),
}
} catch (error) {
console.error(`Google Ads API error for connection ${this.#connectionId}:`, error)
throw new Error(`Google Ads fetch failed: ${(error as Error).message}`)
}
}
/**
* Mailchimp API integration
*/
private async fetchFromMailchimp(authData: any): Promise<any> {
const mailchimp = new MailchimpApi(authData.auth_data.api_key)
const serverPrefix = authData.auth_data.server_prefix
try {
// Fetch all lists
const lists = await mailchimp.lists.getAllLists()
// Fetch member data with addresses for all lists
const allMembers = []
for (const list of lists.lists) {
const members = await mailchimp.lists.getListMembersInfo(list.id, {
fields: ['members.email_address', 'members.merge_fields', 'members.stats'],
count: 1000, // Adjust based on needs
})
allMembers.push(...members.members)
}
return {
platform: 'mailchimp',
server_prefix: serverPrefix,
lists: lists.lists,
members: allMembers,
fetched_at: new Date().toISOString(),
}
} catch (error) {
console.error(`Mailchimp API error for connection ${this.#connectionId}:`, error)
throw new Error(`Mailchimp fetch failed: ${(error as Error).message}`)
}
}
/**
* Stripe API integration
*/
private async fetchFromStripe(authData: any): Promise<any> {
const stripe = new Stripe(authData.auth_data.secret_key, {
apiVersion: '2023-10-16',
})
try {
// Fetch recent charges with billing addresses
const charges = await stripe.charges.list({
limit: 1000,
created: {
gte: Math.floor((Date.now() - 24 * 60 * 60 * 1000) / 1000), // Last 24 hours
},
expand: ['data.customer'],
})
return {
platform: 'stripe',
charges: charges.data,
fetched_at: new Date().toISOString(),
}
} catch (error) {
console.error(`Stripe API error for connection ${this.#connectionId}:`, error)
throw new Error(`Stripe fetch failed: ${(error as Error).message}`)
}
}
/**
* Mock data for testing and unsupported platforms
*/
private async fetchMockData(authData: any): Promise<any> {
// For now, return mock data to demonstrate the flow
// In a real implementation, this would:
// 1. Use authData.auth_data for API credentials
// 2. Make authenticated requests to platform APIs
// 3. Handle pagination and rate limiting
// 4. Return structured data for processing
console.warn(`fetchFromThirdParty not implemented for connection ${this.#connectionId}`)
return {
platform: 'google_ads',
account_id: authData.platform_data?.account_id || 'demo-account',
campaigns: [
{
id: 'campaign-1',
name: 'Demo Campaign',
locations: [
{ address: '123 Main St, San Francisco, CA', conversions: 10, impressions: 500 },
{ address: '456 Oak Ave, San Jose, CA', conversions: 8, impressions: 350 },
],
},
],
}
}
/**
* Get WorkspaceDO instance for this connection
*/
private getWorkspaceDO() {
// Extract workspace ID from connection identifier
// Assuming format "workspaceId:connectionId"
const workspaceId = this.#connectionId.split(':')[0]
const doId = this.env.WORKSPACE_DO.idFromName(workspaceId)
return this.env.WORKSPACE_DO.get(doId)
}
/**
* Trigger data source version creation workflows
*/
private async triggerDataSourceUpdates(dataSourceIds: string[]): Promise<void> {
for (const dataSourceId of dataSourceIds) {
try {
await this.env.CREATE_DATA_SOURCE_VERSION_WORKFLOW.create({
id: `${dataSourceId}-${Date.now()}`,
params: { dataSourceId, connectionId: this.#connectionId },
})
console.log(`Triggered workflow for data source: ${dataSourceId}`)
} catch (error) {
console.error(`Failed to trigger workflow for data source ${dataSourceId}:`, error)
}
}
}
}
export default handler(ConnectionActor)

The composition approach provides several advantages:

  1. Single Deployment: Only one Durable Object binding needed in wrangler.jsonc
  2. Dynamic Platform Support: Platform type determined at runtime from connection metadata
  3. Shared Infrastructure: Common alarm scheduling, database operations, and error handling
  4. Easy Extension: Adding new platforms requires only new methods, no deployment changes
  5. Unified Monitoring: Single metrics namespace with platform tags

The system uses multiple Cloudflare Workflows to orchestrate data processing and synchronization. These workflows provide built-in observability, retry logic, and durability for complex multi-step operations.

1. SyncConnectionWorkflow - Primary Data Sync Orchestrator


⚠️ Note: This workflow is not yet implemented. The following is a proposed design for future development.

This workflow is triggered by Connection Actors and orchestrates the entire sync process with platform-specific steps.

File: apps/api/src/workflows/SyncConnectionWorkflow.ts

import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers'
/**
* Parameters for the sync workflow
*/
export type SyncConnectionParams = {
connectionId: number
workspaceId: string
platform: string
triggeredBy: 'scheduled' | 'manual' | 'retry'
}
/**
* Main workflow for synchronizing data from third-party connections
* Called by Connection Actors on their scheduled alarms
*/
export class SyncConnectionWorkflow extends WorkflowEntrypoint<Env, SyncConnectionParams> {
// Helper to get the Connection Actor stub
private getConnectionActor(connectionId: number, workspaceId: string): DurableObjectStub {
const actorId = `${workspaceId}:${connectionId}`
const doId = this.env.CONNECTION_ACTORS.idFromName(actorId)
return this.env.CONNECTION_ACTORS.get(doId)
}
// Helper to get the WorkspaceDO stub
private getWorkspaceDO(workspaceId: string): DurableObjectStub {
const id = this.env.WORKSPACE_DO.idFromName(workspaceId)
return this.env.WORKSPACE_DO.get(id)
}
async run(event: WorkflowEvent<SyncConnectionParams>, step: WorkflowStep) {
const { connectionId, workspaceId, platform, triggeredBy } = event.payload
console.log(`Starting sync workflow for connection ${connectionId} (${platform})`)
// Get actor and workspace stubs
const actor = this.getConnectionActor(connectionId, workspaceId)
const workspace = this.getWorkspaceDO(workspaceId)
try {
// Step 1: Validate connection and get auth data
const authData = await step.do(
'validate-connection',
{
retries: { limit: 2, delay: '5 seconds' },
timeout: '30 seconds',
},
async () => {
return await workspace.getConnectionAuth(connectionId)
}
)
// Step 2: Fetch data from platform (platform-specific with longer timeout for API calls)
const rawData = await step.do(
'fetch-platform-data',
{
retries: { limit: 3, delay: '30 seconds', backoff: 'exponential' },
timeout: '5 minutes',
},
async () => {
// The actor handles platform-specific logic
return await actor.fetchFromThirdParty(authData)
}
)
// Step 3: Process and store snapshot data in Connection Actor
const recordsProcessed = await step.do(
'store-snapshot',
{
retries: { limit: 2, delay: '10 seconds' },
timeout: '2 minutes',
},
async () => {
return await actor.processAndStoreSnapshot(rawData)
}
)
console.log(`Processed ${recordsProcessed} records for connection ${connectionId}`)
// Step 4: Get associated data sources for this connection
const dataSourceIds = await step.do('get-data-sources', async () => {
return await workspace.getDataSourcesForConnection(connectionId)
})
// Step 5: Trigger data source version creation for each data source
if (dataSourceIds.length > 0) {
await step.do(
'trigger-data-source-updates',
{
retries: { limit: 2, delay: '5 seconds' },
},
async () => {
const promises = dataSourceIds.map((dataSourceId) =>
this.env.CREATE_DATA_SOURCE_VERSION_WORKFLOW.create({
id: `${dataSourceId}-${Date.now()}`,
params: {
dataSourceId,
workspaceId,
connectionId,
versionId: null, // Will be created by the workflow
r2Path: null, // Live data doesn't use R2
},
})
)
return await Promise.all(promises)
}
)
console.log(`Triggered ${dataSourceIds.length} data source version workflows`)
}
// Step 6: Update connection sync status
await step.do('update-sync-status', async () => {
await workspace.updateConnectionSyncStatus(connectionId, 'active')
await actor.updateSyncSuccess(recordsProcessed)
})
// Step 7: Schedule next sync (handled by actor's alarm system)
await step.do('schedule-next-sync', async () => {
const syncFrequency = await actor.getSyncFrequency()
await actor.scheduleNextSync(syncFrequency)
})
console.log(`Sync workflow completed successfully for connection ${connectionId}`)
} catch (error) {
console.error(`Sync workflow failed for connection ${connectionId}:`, error)
// Log detailed error information
await step.do('handle-sync-error', async () => {
await workspace.updateConnectionSyncStatus(connectionId, 'error')
await actor.logSyncError(error as Error)
})
// Schedule retry with exponential backoff
const retryDelay = triggeredBy === 'retry' ? '4 hours' : '1 hour'
await step.sleep('retry-delay', retryDelay)
// Trigger retry workflow
await step.do('schedule-retry', async () => {
return await this.env.SYNC_CONNECTION_WORKFLOW.create({
id: `${connectionId}-retry-${Date.now()}`,
params: {
...event.payload,
triggeredBy: 'retry',
},
})
})
// Re-throw to mark workflow as failed
throw error
}
}
}
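
From the actor side, starting this workflow is a single binding call. A minimal sketch, assuming the actor has its workspaceId, connectionId, and platform on hand (the helper name is illustrative; the params shape matches SyncConnectionParams above):

// Illustrative: start SyncConnectionWorkflow from a Connection Actor's alarm handler.
async function startScheduledSync(
  env: Env,
  conn: { connectionId: number; workspaceId: string; platform: string }
) {
  return env.SYNC_CONNECTION_WORKFLOW.create({
    // unique instance id per run
    id: `${conn.workspaceId}:${conn.connectionId}-${Date.now()}`,
    params: { ...conn, triggeredBy: 'scheduled' as const },
  })
}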

2. ProcessLiveDataSourceWorkflow - Connection Data Processing

⚠️ Note: This workflow is not yet implemented. The following is a proposed design for future development.

This workflow processes snapshot data from Connection Actors and creates new data source versions.

File: apps/api/src/workflows/ProcessLiveDataSourceWorkflow.ts

import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers'
/**
* Parameters for processing live data source
*/
export type ProcessLiveDataSourceParams = {
dataSourceId: number
workspaceId: string
connectionId: number
}
/**
* Workflow for processing live data from Connection Actors into versioned data sources
*/
export class ProcessLiveDataSourceWorkflow extends WorkflowEntrypoint<
Env,
ProcessLiveDataSourceParams
> {
// Helper to get the Connection Actor stub
private getConnectionActor(connectionId: number, workspaceId: string): DurableObjectStub {
const actorId = `${workspaceId}:${connectionId}`
const doId = this.env.CONNECTION_ACTORS.idFromName(actorId)
return this.env.CONNECTION_ACTORS.get(doId)
}
// Helper to get the WorkspaceDO stub
private getWorkspaceDO(workspaceId: string): DurableObjectStub {
const id = this.env.WORKSPACE_DO.idFromName(workspaceId)
return this.env.WORKSPACE_DO.get(id)
}
async run(event: WorkflowEvent<ProcessLiveDataSourceParams>, step: WorkflowStep) {
const { dataSourceId, workspaceId, connectionId } = event.payload
console.log(`Processing live data source ${dataSourceId} from connection ${connectionId}`)
const workspace = this.getWorkspaceDO(workspaceId)
const actor = this.getConnectionActor(connectionId, workspaceId)
// Created in step 1; used by later steps and by the failure handler in the catch block
let version!: { id: number }
try {
// Step 1: Create new data source version
version = await step.do('create-version', async () => {
return await workspace.createDataSourceVersion(dataSourceId, {
status: 'processing',
workflow_id: event.workflowId,
})
})
console.log(`Created version ${version.id} for data source ${dataSourceId}`)
// Step 2: Get latest processed data from Connection Actor
const aggregatedData = await step.do(
'get-snapshot-data',
{
retries: { limit: 3, delay: '10 seconds' },
timeout: '1 minute',
},
async () => {
return await actor.getLatestSnapshot()
}
)
// Step 3: Store aggregated data in KV with version-specific key
await step.do(
'store-in-kv',
{
retries: { limit: 3, delay: '5 seconds' },
timeout: '30 seconds',
},
async () => {
const kvKey = `datasource:${dataSourceId}/version/${version.id}`
await this.env.DATA_SOURCES_KV.put(kvKey, JSON.stringify(aggregatedData))
console.log(`Stored data in KV with key: ${kvKey}`)
}
)
// Step 4: Update version status and metadata
await step.do('update-version-status', async () => {
const cellCount = Object.keys(aggregatedData).length
return await workspace.updateDataSourceVersion(version.id, {
status: 'ready',
total_cells: cellCount,
completed_at: new Date().toISOString(),
metadata: {
connection_id: connectionId,
platform: await actor.getPlatform(),
sync_timestamp: new Date().toISOString(),
},
})
})
// Step 5: Update parent data source to point to new version
await step.do('update-current-version', async () => {
return await workspace.updateDataSource(dataSourceId, {
current_version_id: version.id,
updated_at: new Date().toISOString(),
})
})
// Step 6: Notify grids that depend on this data source
const gridIds = await step.do('get-affected-grids', async () => {
return await workspace.getGridIdsForDataSource(dataSourceId)
})
if (gridIds.length > 0) {
await step.do(
'update-grid-versions',
{
retries: { limit: 2, delay: '5 seconds' },
},
async () => {
// For each grid, update its pending version with the new data source version
const promises = gridIds.map(async (gridId) => {
const gridDO = this.env.GRID_DO.get(this.env.GRID_DO.idFromName(gridId))
return await gridDO.updatePendingVersion({
dataSourceId,
versionId: version.id,
})
})
return await Promise.all(promises)
}
)
console.log(`Updated ${gridIds.length} grid versions with new data source version`)
}
console.log(`Successfully processed live data source ${dataSourceId}`)
} catch (error) {
console.error(`Failed to process live data source ${dataSourceId}:`, error)
// Update version status to failed
await step.do('mark-version-failed', async () => {
// `version` is only set if step 1 completed
if (version) {
await workspace.updateDataSourceVersion(version.id, {
status: 'failed',
completed_at: new Date().toISOString(),
metadata: {
error: (error as Error).message,
failed_at: new Date().toISOString(),
},
})
}
})
// Re-throw to mark workflow as failed
throw error
}
}
}

3. Enhanced CreateDataSourceVersionWorkflow - Unified CSV & Live Data Processing

Updated to handle both CSV uploads and live data from connections.

File: apps/api/src/workflows/create-data-source-version.ts (Updated)

import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers'
import { parse } from 'csv-parse/sync'
import { Workspace } from '../durable-objects/WorkspaceDO'
import { H3_RESOLUTION } from '../utils/h3-utils'
import type { Address } from 'honeygrid-types'
/**
* Params supplied when the workflow is started.
*/
export type CreateDataSourceVersionParams = {
dataSourceId: number
workspaceId: string
versionId?: number | null // Pre-created version ID (for live data)
r2Path?: string | null // R2 path for CSV files (null for live data)
connectionId?: number | null // Connection ID for live data (null for CSV)
}
/**
* Enhanced workflow that processes both CSV uploads and live connection data
*/
export class CreateDataSourceVersionWorkflow extends WorkflowEntrypoint<
Env,
CreateDataSourceVersionParams
> {
// Helper to get the WorkspaceDO stub
private getWorkspaceDO(workspaceId: string): DurableObjectStub<Workspace> {
const id = this.env.WORKSPACE_DO.idFromName(workspaceId)
return this.env.WORKSPACE_DO.get(id)
}
// Helper to get the Connection Actor stub (for live data)
private getConnectionActor(connectionId: number, workspaceId: string): DurableObjectStub {
const actorId = `${workspaceId}:${connectionId}`
const doId = this.env.CONNECTION_ACTORS.idFromName(actorId)
return this.env.CONNECTION_ACTORS.get(doId)
}
async run(event: WorkflowEvent<CreateDataSourceVersionParams>, step: WorkflowStep) {
const { dataSourceId, workspaceId, versionId, r2Path, connectionId } = event.payload
console.log(`Processing data source ${dataSourceId} - Type: ${r2Path ? 'CSV' : 'Live Data'}`)
const workspace = this.getWorkspaceDO(workspaceId)
// Determine processing type
const isLiveData = !r2Path && connectionId != null
const isCsvUpload = !!r2Path && !connectionId
if (!isLiveData && !isCsvUpload) {
throw new Error('Must specify either r2Path (CSV) or connectionId (live data)')
}
// Created or fetched in step 1; used by later steps and by the failure handler below
let version!: { id: number }
try {
// Step 1: Create or get the version record
version = await step.do('setup-version', async () => {
if (versionId) {
// Version already created (live data flow)
return await workspace.getDataSourceVersion(versionId)
} else {
// Create new version (CSV upload flow)
return await workspace.createDataSourceVersion(dataSourceId, {
status: 'processing',
workflow_id: event.workflowId,
file_path: r2Path,
})
}
})
console.log(`Processing version ${version.id} for data source ${dataSourceId}`)
let aggregatedData: Record<string, Record<string, number>>
if (isCsvUpload) {
// CSV Processing Path
aggregatedData = await step.do('process-csv', async () => {
// Step 1a: Ingest data from R2
const object = await this.env.CUSTOM_DATA_SOURCES.get(r2Path!)
if (!object) throw new Error(`File not found in R2: ${r2Path}`)
const csvText = await object.text()
const rawData: Record<string, any>[] = parse(csvText, {
columns: true,
skip_empty_lines: true,
})
console.log(`Parsed ${rawData.length} rows from CSV`)
// Step 1b: Process CSV data through geocoding and aggregation
return await this.processCsvToH3(rawData)
})
} else {
// Live Data Processing Path
aggregatedData = await step.do('get-live-data', async () => {
const actor = this.getConnectionActor(connectionId!, workspaceId)
return await actor.getLatestSnapshot()
})
}
// Step 2: Store aggregated data in KV (common for both paths)
await step.do('store-in-kv', async () => {
const kvKey = `datasource:${dataSourceId}/version/${version.id}`
await this.env.DATA_SOURCES_KV.put(kvKey, JSON.stringify(aggregatedData))
console.log(`Stored aggregated data in KV: ${kvKey}`)
})
// Step 3: Update version metadata
await step.do('update-version-metadata', async () => {
const cellCount = Object.keys(aggregatedData).length
return await workspace.updateDataSourceVersion(version.id, {
status: 'ready',
total_cells: cellCount,
completed_at: new Date().toISOString(),
metadata: {
processing_type: isLiveData ? 'live_data' : 'csv_upload',
connection_id: connectionId,
cells_processed: cellCount,
metrics_included: this.getMetricNames(aggregatedData),
},
})
})
// Step 4: Update parent data source
await step.do('update-data-source', async () => {
return await workspace.updateDataSource(dataSourceId, {
current_version_id: version.id,
updated_at: new Date().toISOString(),
})
})
// Step 5: Notify grids using this data source
const gridIds = await step.do('get-affected-grids', async () => {
return await workspace.getGridIdsForDataSource(dataSourceId)
})
if (gridIds.length > 0) {
await step.do('update-grids', async () => {
const promises = gridIds.map(async (gridId) => {
const gridDO = this.env.GRID_DO.get(this.env.GRID_DO.idFromName(gridId))
return await gridDO.dataSourceUpdated(dataSourceId, version.id)
})
return await Promise.all(promises)
})
console.log(`Updated ${gridIds.length} grids with new data source version`)
}
console.log(`Successfully processed data source version ${version.id}`)
} catch (error) {
console.error(`Failed to process data source ${dataSourceId}:`, error)
// Update version status to failed
await step.do('mark-failed', async () => {
// Prefer the version created/fetched in step 1; fall back to the pre-created versionId
const failedVersionId = version?.id ?? versionId
if (failedVersionId) {
await workspace.updateDataSourceVersion(failedVersionId, {
status: 'failed',
completed_at: new Date().toISOString(),
metadata: {
error: (error as Error).message,
failed_at: new Date().toISOString(),
},
})
}
})
// Re-throw to mark workflow as failed
throw error
}
}
/**
* Process CSV data into H3 aggregations (existing CSV logic)
*/
private async processCsvToH3(
rawData: Record<string, any>[]
): Promise<Record<string, Record<string, number>>> {
// Existing CSV processing logic from the original workflow
// Convert addresses to H3 cells and aggregate metrics
// This would use the GeoCodingService to batch geocode addresses
// and then aggregate metrics by H3 cell
// For now, return mock structure - implement full logic as needed
return {
'8a2a1072b59ffff': {
customerCount: rawData.length,
// ... other aggregated metrics
},
}
}
/**
* Extract metric names from aggregated data
*/
private getMetricNames(aggregatedData: Record<string, Record<string, number>>): string[] {
const metricSet = new Set<string>()
Object.values(aggregatedData).forEach((cell) => {
Object.keys(cell).forEach((metric) => metricSet.add(metric))
})
return Array.from(metricSet)
}
}
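
The processCsvToH3 stub above ultimately reduces geocoded rows to per-cell metric totals. A minimal sketch of that aggregation step, assuming rows have already been geocoded (GeocodedRow is a hypothetical shape; in the real flow it would come from the GeoCodingService, and H3_RESOLUTION is the project constant imported above):

import { latLngToCell } from 'h3-js'
import { H3_RESOLUTION } from '../utils/h3-utils'

// Hypothetical shape for a row after geocoding.
type GeocodedRow = { lat: number; lng: number; metrics: Record<string, number> }

// Sum each metric per H3 cell at the configured resolution.
function aggregateByH3(rows: GeocodedRow[]): Record<string, Record<string, number>> {
  const cells: Record<string, Record<string, number>> = {}
  for (const row of rows) {
    const cell = latLngToCell(row.lat, row.lng, H3_RESOLUTION)
    const bucket = (cells[cell] ??= {})
    for (const [metric, value] of Object.entries(row.metrics)) {
      bucket[metric] = (bucket[metric] ?? 0) + value
    }
  }
  return cells
}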
Running these operations as Cloudflare Workflows provides:

  1. Observability: Every data processing operation is visible in the Workflows dashboard
  2. Reliability: Built-in retries with exponential backoff for transient failures
  3. Durability: Workflows survive worker restarts and continue from last completed step
  4. Error Handling: Comprehensive error logging and graceful failure recovery
  5. Scalability: Each workflow instance handles one data source, enabling parallel processing
  6. Version Control: Clear audit trail of when and how each data source version was created
Workflow triggering relationships (a CSV-upload trigger sketch follows this list):

  • Connection Actors → SyncConnectionWorkflow → CreateDataSourceVersionWorkflow
  • API Routes → CreateDataSourceVersionWorkflow (for CSV uploads)
  • All Workflows → GridDO updates for downstream grid version management
  • Error cases → WorkspaceDO status updates and retry scheduling
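
For example, the CSV-upload path can start the workflow directly once the file lands in R2. A minimal sketch (the helper name and r2Path value are illustrative; the params match CreateDataSourceVersionParams above):

// Illustrative API-route trigger: process a freshly uploaded CSV into a new version.
export async function triggerCsvProcessing(
  env: Env,
  workspaceId: string,
  dataSourceId: number,
  r2Path: string // key of the uploaded file in the CUSTOM_DATA_SOURCES bucket
) {
  return env.CREATE_DATA_SOURCE_VERSION_WORKFLOW.create({
    id: `${dataSourceId}-${Date.now()}`,
    params: { dataSourceId, workspaceId, r2Path, versionId: null, connectionId: null },
  })
}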

File: apps/api/src/durable-objects/WorkspaceDO.ts - Add connection helper methods:

export class Workspace extends DurableObject<Env> {
// ... existing methods ...
/**
* Get connection authentication data for a Connection Actor
*/
async getConnectionAuth(connectionId: number): Promise<any> {
const [connection] = await this.db
.select()
.from(Connections)
.where(eq(Connections.id, connectionId))
.limit(1)
if (!connection) throw new Error(`Connection ${connectionId} not found`)
return {
auth_data: connection.auth_data,
platform_data: connection.platform_data,
settings: connection.settings,
}
}
/**
* Get Connection Actor instance for a connection
*/
getConnectionActor(connectionId: number) {
const actorId = `${this.#id}:${connectionId}`
const doId = this.env.CONNECTION_ACTORS.idFromName(actorId)
return this.env.CONNECTION_ACTORS.get(doId)
}
/**
* Initialize a new connection with its Actor
*/
async initializeConnectionActor(connectionId: number): Promise<void> {
const actor = this.getConnectionActor(connectionId)
// Initialize connection config and schedule first sync
await actor.performDailySync()
}
}
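
The sync workflows above also call updateConnectionSyncStatus on the WorkspaceDO. A minimal sketch of that helper in the same partial-snippet style; the status and last_synced_at columns are assumptions about the workspace Connections table, not the actual schema:

/**
 * Sketch: update a connection's sync status after a workflow run
 * (column names status / last_synced_at are assumptions)
 */
async updateConnectionSyncStatus(connectionId: number, status: 'active' | 'error'): Promise<void> {
  await this.db
    .update(Connections)
    .set({ status, last_synced_at: new Date().toISOString() })
    .where(eq(Connections.id, connectionId))
}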
The Connection Actor provides:

  1. Scheduled Data Fetching: Built-in alarm system for reliable daily syncing (a minimal alarm sketch follows this list)
  2. Historical Data Storage: SQLite database for daily snapshots and aggregation queries
  3. Isolated State Management: Each connection has its own persistent state
  4. RPC Integration: Clean integration with WorkspaceDO and processing workflows
  5. Resilient Error Handling: Automatic retry scheduling on failures
  6. Aggregation Queries: Support for “12 month conversions” type queries per H3 cell
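
A minimal sketch of the alarm-based scheduling in item 1, using the plain Durable Object alarm API. The actual actor is built on @cloudflare/actors, whose scheduling helpers may differ; scheduleNextSync mirrors the method name used by the workflows above:

// Sketch only: daily/weekly sync scheduling with Durable Object alarms.
export class ConnectionActorSketch {
  constructor(private ctx: DurableObjectState, private env: Env) {}

  // Called after a successful sync to queue the next run.
  async scheduleNextSync(frequency: 'daily' | 'weekly'): Promise<void> {
    const day = 24 * 60 * 60 * 1000
    await this.ctx.storage.setAlarm(Date.now() + (frequency === 'weekly' ? 7 * day : day))
  }

  // Fires when the alarm elapses; trigger the sync workflow, then re-arm.
  async alarm(): Promise<void> {
    // ...create a SyncConnectionWorkflow instance here...
    await this.scheduleNextSync('daily')
  }
}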

Install Cloudflare Actors:

pnpm add @cloudflare/actors

Generate Connection Actor Migrations:

cd apps/api
pnpm drizzle-kit generate --config=connectionActor.drizzle.config.ts
pnpm drizzle-kit push --config=connectionActor.drizzle.config.ts

Update apps/api/wrangler.jsonc:

{
"durable_objects": {
"bindings": [
{
"name": "CONNECTION_ACTORS",
"class_name": "ConnectionActor",
},
// ... existing bindings
],
},
"migrations": [
{
"tag": "v2",
"new_sqlite_classes": ["ConnectionActor"],
},
],
}
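
The workflow bindings used by the code above (SYNC_CONNECTION_WORKFLOW, CREATE_DATA_SOURCE_VERSION_WORKFLOW) also need entries in wrangler.jsonc. A sketch; the workflow names themselves are illustrative:

{
"workflows": [
{
"name": "sync-connection",
"binding": "SYNC_CONNECTION_WORKFLOW",
"class_name": "SyncConnectionWorkflow",
},
{
"name": "create-data-source-version",
"binding": "CREATE_DATA_SOURCE_VERSION_WORKFLOW",
"class_name": "CreateDataSourceVersionWorkflow",
},
],
}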

Schema Management:

The Connection Actor uses Drizzle ORM for schema management with migrations stored in ConnectionActordrizzleMigrations/. The schema is defined in packages/honeygrid-types/schema/ConnectionActorSchema.ts and includes:

  • SyncSnapshots: Historical data aggregated by H3 cell and date
  • SyncHistory: Sync attempt tracking with success/failure status
  • ConnectionConfig: Platform settings and sync configuration
  • ErrorLogs: Detailed error logging for debugging sync issues

packages/honeygrid-types/schema/ConnectionActorSchema.ts
/**
* Periodic snapshots of metrics aggregated by H3 cell
*/
export const SyncSnapshots = sqliteTable(
'sync_snapshots',
{
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
h3_cell: text('h3_cell').notNull(),
metric_name: text('metric_name').notNull(),
metric_value: integer('metric_value').notNull(),
snapshot_date: text('snapshot_date').notNull(),
raw_data: text('raw_data', { mode: 'json' }),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
},
(t) => [unique('uniq_snapshot').on(t.h3_cell, t.metric_name, t.snapshot_date)]
)
/** Sync history records */
export const SyncHistory = sqliteTable('sync_history', {
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
sync_date: text('sync_date').notNull(),
status: text('status', { enum: ['success', 'failed', 'in_progress'] }).notNull(),
records_processed: integer('records_processed').default(0).notNull(),
error_message: text('error_message'),
started_at: text('started_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
completed_at: text('completed_at'),
sync_duration_ms: integer('sync_duration_ms'),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
})
/** Detailed error logs */
export const ErrorLogs = sqliteTable('error_logs', {
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
sync_history_id: integer('sync_history_id')
.notNull()
.references(() => SyncHistory.id),
error_type: text('error_type', {
enum: ['sync_failed', 'auth_failed', 'api_error', 'processing_error', 'network_error'],
}).notNull(),
error_message: text('error_message').notNull(),
error_details: text('error_details', { mode: 'json' }),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
})
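
The table list earlier also mentions ConnectionConfig, which this excerpt omits. A minimal sketch following the same conventions; all column names here are assumptions:

/** Platform settings and sync configuration (sketch; columns are assumptions) */
export const ConnectionConfig = sqliteTable('connection_config', {
id: integer('id').primaryKey({ autoIncrement: true }),
platform: text('platform').notNull(),
sync_frequency: text('sync_frequency', { enum: ['daily', 'weekly', 'manual'] })
.default('daily')
.notNull(),
settings: text('settings', { mode: 'json' }),
updated_at: text('updated_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
})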

The global D1 database schema is defined in packages/honeygrid-types/schema/HoneyGridD1Schema.ts and manages workspace-level data:

packages/honeygrid-types/schema/HoneyGridD1Schema.ts
export const Workspaces = table(
'Workspaces',
{
account_id: text(),
background_image: text(),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
id: integer('id').primaryKey({ autoIncrement: true }).notNull(),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
name: text().notNull(),
status: text().notNull(),
campaign_count: integer().default(0),
connection_count: integer().default(0),
grid_count: integer().default(0),
landing_page_count: integer().default(0),
location_count: integer().default(0),
},
(t) => [check('status_check', sql`${t.status} IN ('active', 'archived')`)]
)
export const WorkspacePermissions = table(
'WorkspacePermissions',
{
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
workspace_id: integer('workspace_id')
.notNull()
.references(() => Workspaces.id),
user_id: text(),
email: text().notNull(),
role: text().notNull(),
invitation_status: text('invitation_status').notNull().default('pending'),
created_at: text('created_at')
.$defaultFn(() => new Date().toISOString())
.notNull(),
},
(t) => [
check('role_check', sql`${t.role} IN ('owner', 'editor', 'viewer')`),
index('workspace_permissions_email_idx').on(t.email),
index('workspace_permissions_workspace_email_idx').on(t.workspace_id, t.email),
]
)
export const Users = table('Users', {
id: integer('id').primaryKey({ autoIncrement: true }),
uuid: text('uuid')
.unique()
.$defaultFn(() => generateUuid())
.notNull(),
first_name: text('first_name'),
last_name: text('last_name'),
email: text().notNull().unique(),
created_date: text('created_date').$defaultFn(() => new Date().toISOString()),
})

Historical Aggregation Query:

// Get 12 months of conversions for a specific H3 cell
const connectionActor = env.CONNECTION_ACTORS.get(env.CONNECTION_ACTORS.idFromName(connectionId))
const totalConversions = await connectionActor.getHistoricalAggregation(
'8a2a1072b59ffff', // H3 cell
'conversions', // metric name
'2023-01-01', // start date
'2023-12-31' // end date
)
// Returns: 1,247 (total conversions for that cell over 12 months)
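
Under the hood, getHistoricalAggregation can be a single SUM over SyncSnapshots rows in the date range. A minimal sketch of the method inside the Connection Actor, assuming the Drizzle schema above and the actor's this.db drizzle handle (the import path is illustrative):

import { and, between, eq, sum } from 'drizzle-orm'
import { SyncSnapshots } from 'honeygrid-types/schema/ConnectionActorSchema'

/**
 * Sketch of getHistoricalAggregation: total one metric for one H3 cell
 * across an inclusive date range.
 */
async getHistoricalAggregation(
  h3Cell: string,
  metricName: string,
  startDate: string,
  endDate: string
): Promise<number> {
  const [row] = await this.db
    .select({ total: sum(SyncSnapshots.metric_value) })
    .from(SyncSnapshots)
    .where(
      and(
        eq(SyncSnapshots.h3_cell, h3Cell),
        eq(SyncSnapshots.metric_name, metricName),
        between(SyncSnapshots.snapshot_date, startDate, endDate)
      )
    )
  // drizzle's sum() comes back as a string (or null) on SQLite
  return Number(row?.total ?? 0)
}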

Connection Status Check:

const status = await connectionActor.getConnectionStatus()
// Returns: {
// connection_id: "conn_123",
// last_snapshot: "2024-01-15",
// status: "active",
// total_snapshots: 365
// }

Latest Data for Workflow Processing:

const latestData = await connectionActor.getLatestSnapshot()
// Returns: {
// "8a2a1072b59ffff": { "conversions": 42, "impressions": 1500 },
// "8a2a1072b5affff": { "conversions": 18, "impressions": 890 }
// }