Custom Data Source Upload & Processing System
Task Checklist
✅ Core Infrastructure Complete: CSV upload, data processing, versioning, and connection management are fully implemented and ready for production.
Phase 1: Data Source Version Management (Frontend Focus)
- 1. Data Source Version Hooks: Create `useDataSourceVersions` and `useDataSourceVersion` query hooks.
- 2. Version History UI: Add version list component with status, dates, and metrics summary.
- 3. Version Selection UI: Allow users to select specific versions for grid targeting.
- 4. Version Comparison UI: Show differences between data source versions (metrics, cell counts).
Phase 2: Connection OAuth & Live Data Setup
- 5. OAuth Flow Infrastructure: Implement Google OAuth redirect handling and token storage.
- 6. Connection Setup UI: Build platform-specific connection configuration forms.
- 7. Connection Status Dashboard: Show sync status, last update, and error handling for live connections.
- 8. Connection Testing: Add “Test Connection” functionality with real-time validation.
Phase 3: Live Data Sync Workflows
- 9. SyncConnectionWorkflow: Implement workflow for scheduled data fetching from third-party APIs.
- 10. Platform API Integrations: Complete Google Ads, Mailchimp, and Stripe API clients in Connection Actor.
- 11. ProcessLiveDataSourceWorkflow: Create workflow for processing live data into versioned data sources.
- 12. Sync Scheduling: Implement configurable sync frequency (daily, weekly, manual).
Phase 4: Advanced Features & Automation
- 13. Grid Auto-Update Logic: Automatically create pending grid versions when data sources update.
- 14. Data Change Notifications: Alert users when live data sources have significant changes.
- 15. Sync Error Handling: Implement retry logic, error notifications, and connection health monitoring.
- 16. Performance Optimization: Add data source preview limits and large dataset handling.
Phase 5: Production Readiness
- 17. Deployment Configuration: Verify all workflows and actors are properly configured in `wrangler.jsonc`.
- 18. Connection Migration Tools: Build tools to migrate existing connections to the new flexible schema.
- 19. Documentation & Testing: Complete API documentation and end-to-end testing.
- 20. Monitoring & Observability: Add metrics, alerts, and dashboards for data source operations.
✅ Completed Foundation
The following core features are production-ready:
- ✅ CSV upload and processing with drag-and-drop UI
- ✅ Data source and grid versioning with full API support
- ✅ Workspace-scoped connection management
- ✅ H3 cell aggregation and KV storage
- ✅ Grid version management with auto-update settings
- ✅ Connection Actor infrastructure with historical data storage
Custom Data Source Upload & Processing System
This document outlines the functional and technical requirements for enabling users to add their own data sources to the HoneyGrid for use in digital advertising location targeting filters. This feature expands targeting filters within the grid system using user-provided metrics (e.g., customer counts, revenue, leads, conversions, ROAS by cell). The sources of this data can be a static file uploaded by the user or a live data source from a third party that is updated periodically.
Connection Architecture Updates
Unified Connection Model
The connection architecture has been updated to use a flexible, workspace-scoped approach:
- Connections moved to WorkspaceDO: No more global connection database - all connections stored in workspace-specific databases
- Flexible auth storage: JSON fields support OAuth2, API keys, webhooks, and other auth mechanisms without schema changes
- Nullable connection_id: Data sources have a `connection_id` field where `null` = CSV upload and non-null = live connection
- Platform-agnostic design: New third-party platforms don't require schema migrations
Connection Types
The flexible schema supports multiple authentication mechanisms:
OAuth2 (Google Ads, Facebook, etc.)
{
  "auth_type": "oauth2",
  "auth_data": {
    "access_token": "ya29.a0...",
    "refresh_token": "1//04...",
    "expires_at": "2024-01-15T10:00:00Z",
    "scopes": ["https://www.googleapis.com/auth/adwords"]
  },
  "platform_data": {
    "account_id": "987-654-3210",
    "account_name": "Client Campaign Account"
  }
}

API Key (Mailchimp, Stripe, etc.)
{
  "auth_type": "api_key",
  "auth_data": {
    "api_key": "sk_live_...",
    "server_prefix": "us19"
  }
}

Data Source Creation Flow
Users now choose connection type first when creating data sources:
- Select “CSV Upload” for static files (sets `connection_id` to `null`)
- Select an existing third-party connection for live data sources
- Configure data-specific settings based on the chosen connection type
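For illustration, the request payloads for the two paths might look like this (a sketch; the field names follow the `CreateDataSourceValidator` defined later in this document, and the import path is an assumption):

```ts
// Hypothetical create payloads: a null connection_id marks a CSV upload, a non-null id marks a live source.
import type { CreateDataSource } from 'honeygrid-types/validators-and-types/DataSourceTypes'

const csvUpload: CreateDataSource = {
  name: 'Q1 Customer List',
  type: 'csv',
  connection_id: null, // static file, no connection
}

const liveSource: CreateDataSource = {
  name: 'Google Ads Client Campaign Account',
  type: 'google_ads',
  connection_id: 42, // id of an existing workspace connection
}
```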
Metrics Aggregation Rules
The aggregation phase transforms the raw CSV rows into a single, version-scoped JSON blob keyed by H3 cell. Processing is linear with the number of rows and all per-row operations are O(1).
- Metric Detection

  Every column in the uploaded dataset except the address columns (`address1`, `address2`, `city`, `state`, `zip`, plus common aliases) is treated as a metric that will be summed.
- Efficient H3 Cell Lookup (row ➜ h3 cell)

  To avoid array scans we build an indexed map once and reuse it during the metrics scan:

  a. Attach a `query_id` – while converting CSV rows to `Address` objects, set `query_id = rowIndex`.
  b. Bulk geocode – call `getH3AddressBulk(addressBatch, H3_RESOLUTION)`; each result includes the originating `inputAddress.query_id`.
  c. Map construction:

     const idToCell = new Map<number, string>()
     for (const result of geocoded) {
       if (!result.error) idToCell.set(result.inputAddress.query_id!, result.h3Cell)
     }

  d. Lookup during aggregation – `const cell = idToCell.get(rowIndex) ?? 'unknown'`.
- Aggregation Logic

  For each row:
  • Retrieve its H3 cell via the map above.
  • For every metric column, sum into `aggregated[cell][metric]`.
  • Round any decimal inputs with `Math.round()` to prevent FP drift.
- Implicit `customerCount` Metric

  Regardless of the supplied columns, increment `aggregated[cell].customerCount` by 1 per row.
Section titled “Storage”After processing, write the aggregated object to KV:
datasource:{dataSourceId}/version/{versionId}.
Example structure:
{ "8a2a1072b59ffff": { "customerCount": 42, "revenue": 123456, "conversions": 319 }, "8a2a1072b597fff": { "customerCount": 9, "revenue": 7200, "conversions": 22 }}This implementation plan uses Cloudflare Workflows as the primary engine for data processing. This approach simplifies the overall system by removing the need for a dedicated state-managing Durable Object and its internal database, relying instead on the inherent state management and resiliency features of Workflows.
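Putting the Metrics Aggregation Rules above together, the per-row work amounts to roughly the following (a minimal sketch; it assumes parsed CSV rows and the `idToCell` map from the lookup step, and omits address-column alias handling):

```ts
// Minimal sketch of the aggregation pass described above.
const ADDRESS_COLUMNS = new Set(['address1', 'address2', 'city', 'state', 'zip']) // plus common aliases

function aggregateRows(
  rows: Record<string, string>[],
  idToCell: Map<number, string>
): Record<string, Record<string, number>> {
  const aggregated: Record<string, Record<string, number>> = {}

  rows.forEach((row, rowIndex) => {
    const cell = idToCell.get(rowIndex) ?? 'unknown'
    const bucket = (aggregated[cell] ??= {})

    // Every non-address column is treated as a summable metric
    for (const [column, raw] of Object.entries(row)) {
      if (ADDRESS_COLUMNS.has(column)) continue
      const value = Number(raw)
      if (Number.isNaN(value)) continue
      bucket[column] = Math.round((bucket[column] ?? 0) + value) // round to prevent FP drift
    }

    // Implicit customerCount: one per source row
    bucket.customerCount = (bucket.customerCount ?? 0) + 1
  })

  return aggregated
}
```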
- Allow users to upload CSV data files with address and metric columns.
- Allow users to connect to live data sources from third parties (e.g., Google Ads).
- Geocode addresses from the data source into H3 cells.
- Aggregate user-defined metrics by H3 cell.
- Store the aggregated data in Workers KV.
- Create a `data_sources` table in the `WorkspaceDO` schema to store metadata for each data source.
- Make the aggregated data available as filters in the user's targeting grids.
- Support versioning of data sources to track changes over time.
- Enable grid versioning to manage targeting configurations over time.
- Provide automatic grid updates based on data source changes.
Data Source Versioning
Data sources come in two types, with different versioning behaviors:
Static Data Sources (CSV)
- Single-version data sources
- Each CSV upload creates a new data source with one version
- No additional versions can be added
- Immutable once processed
Dynamic Data Sources (e.g., Google Ads)
- Support multiple versions as data changes over time
- New versions created on data refresh or manual update
- Track version history and allow version comparison
- Users can select specific versions for targeting
Version Storage
- All data sources use the same schema and storage pattern
- Each data source has at least one version
- Each version maintains its own:
  - Processing status and metadata
  - Aggregated data in KV (`datasource:{dataSourceId}/version/{versionId}`; see the sketch below)
- The parent data source tracks its current active version
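For reference, reading one version's aggregated data back out of KV might look like this (a sketch, assuming the `DATA_SOURCES_KV` binding used elsewhere in this document):

```ts
// Sketch: load a specific version's aggregated cell data from Workers KV.
// Key format matches the storage pattern above: datasource:{dataSourceId}/version/{versionId}
type AggregatedCells = Record<string, Record<string, number>>

async function loadVersionData(
  env: { DATA_SOURCES_KV: KVNamespace },
  dataSourceId: number,
  versionId: number
): Promise<AggregatedCells | null> {
  const key = `datasource:${dataSourceId}/version/${versionId}`
  return await env.DATA_SOURCES_KV.get<AggregatedCells>(key, 'json')
}
```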
Version Management
- New versions can only be created for dynamic data sources
- All versions go through the same processing workflow
- Users can select specific versions when configuring grid targeting
- The current version is used by default in grids
Grid Versioning
Grid versions provide a way to track and manage changes to targeting configurations over time, particularly as data sources are updated.
Version Components
Each grid version contains:
- Targeted areas (radius, zip code, county, carrier routes)
- Targeting filters:
- Demographics
- Custom metrics
- Manual inclusion filters
- Manual exclusion filters
- Data source references (pinned to specific versions)
Version Management
- Every grid starts with an initial version marked as “active”
- Only one “pending” version can exist at a time
  - Represents the grid with latest versions of all data sources
  - Shows in UI as “new grid version available”
  - Updates automatically when data sources change
- Users can promote pending version to active at any time
- The parent Grid has an `auto_update` flag (boolean) that persists across versions (see the sketch after this list)
  - When enabled, pending version is automatically promoted when data sources change
- Version cleanup
  - Versions are automatically deleted after 2 years of inactivity
  - Activity is defined as:
    - Making a version active
    - Updating a version's configuration
  - Activity resets the 2-year retention clock
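A rough sketch of how the auto-update decision could be applied when a data source changes (the helper names are hypothetical; only the `auto_update` flag and the pending/active version concepts come from the rules above):

```ts
// Sketch: refresh the single pending version, then auto-promote it if the parent grid opted in.
type GridRecord = {
  auto_update: boolean
  current_version_id: number | null
  pending_version_id: number | null
}

async function onDataSourceChanged(
  grid: GridRecord,
  deps: {
    upsertPendingVersion: () => Promise<number>      // hypothetical: creates or refreshes the pending version
    promoteVersionToActive: (versionId: number) => Promise<void> // hypothetical promotion helper
  }
) {
  // Only one pending version may exist at a time, so this creates or updates it in place.
  const pendingVersionId = await deps.upsertPendingVersion()

  if (grid.auto_update) {
    // With auto-update enabled, the pending version is promoted immediately.
    await deps.promoteVersionToActive(pendingVersionId)
  }
  // Otherwise the UI surfaces "new grid version available" until a user promotes it.
}
```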
Proposed Architecture & Flow
The architecture is centered around a CreateDataSourceVersionWorkflow. This workflow orchestrates the entire lifecycle of processing a data source file, from ingestion to notifying the grids. The workflow communicates directly with the WorkspaceDO via RPC (Remote Procedure Calls) to update status and retrieve information, eliminating the need for API callbacks or internal fetch handlers.
sequenceDiagram
  participant UI
  participant API (Hono)
  participant R2
  participant WorkspaceDO
  participant CreateDataSourceVersionWorkflow
  participant GeoCodingService
  participant KV
  participant GridDO

  UI->>+API (Hono): 1. Upload CSV
  API (Hono)->>+R2: 2. Stream file to storage
  R2-->>-API (Hono): File path
  API (Hono)->>+WorkspaceDO: 3. Create DataSource & Version records (RPC Call)
  WorkspaceDO-->>-API (Hono): dataSourceId, versionId
  API (Hono)->>+CreateDataSourceVersionWorkflow: 4. Start workflow(workspaceId, dataSourceId, versionId, r2Path)
  CreateDataSourceVersionWorkflow-->>-API (Hono): workflowId
  API (Hono)->>+WorkspaceDO: 5. Update Version with workflowId (RPC Call)
  deactivate WorkspaceDO

  activate CreateDataSourceVersionWorkflow
  CreateDataSourceVersionWorkflow->>+R2: 6. Read CSV data
  R2-->>-CreateDataSourceVersionWorkflow: Raw data rows
  deactivate R2
  CreateDataSourceVersionWorkflow->>+GeoCodingService: 7. Geocode addresses (RPC Call)
  GeoCodingService-->>-CreateDataSourceVersionWorkflow: Enriched data with H3 cells
  deactivate GeoCodingService
  CreateDataSourceVersionWorkflow->>CreateDataSourceVersionWorkflow: 8. Aggregate metrics by H3 cell (sum custom metrics & customerCount)
  CreateDataSourceVersionWorkflow->>+KV: 9. Store aggregated data with version key
  deactivate KV

  CreateDataSourceVersionWorkflow->>+WorkspaceDO: 10. getGridIdsForDataSource() (RPC Call)
  WorkspaceDO-->>-CreateDataSourceVersionWorkflow: [gridId1, gridId2]

  loop For each gridId
    CreateDataSourceVersionWorkflow->>+GridDO: 11. updatePendingGridVersion() (RPC Call)
    deactivate GridDO
  end

  alt On Success
    CreateDataSourceVersionWorkflow->>+WorkspaceDO: 12a. updateVersionStatus('ready') (RPC Call)
  else On Failure
    CreateDataSourceVersionWorkflow->>+WorkspaceDO: 12b. updateVersionStatus('failed') (RPC Call)
  end
  deactivate CreateDataSourceVersionWorkflow
  deactivate WorkspaceDO
  deactivate API (Hono)

Implementation Plan
Section titled “Implementation Plan”1. Updated Schema (WorkspaceDOSchema.ts)
The schema has been updated to support the unified connection model with nullable connection_id and flexible connection storage.
- File to Modify: `packages/honeygrid-types/schema/WorkspaceDOSchema.ts`
- Action: Add the `Connections` table and update `DataSources` with a nullable `connection_id`. Move version-specific fields to `DataSourceVersions`.
// New flexible Connections tableexport const Connections = sqliteTable('connections', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), workspace_id: text('workspace_id').notNull(), name: text('name').notNull(), platform: text('platform').notNull(), // 'google_ads', 'mailchimp', 'stripe', etc. auth_type: text('auth_type', { enum: ['oauth2', 'api_key', 'basic_auth', 'jwt', 'webhook_secret'], }).notNull(), status: text('status', { enum: ['pending', 'active', 'inactive', 'expired', 'error'], }).notNull(),
// Flexible auth storage (encrypted JSON) auth_data: text('auth_data', { mode: 'json' }).notNull(), platform_data: text('platform_data', { mode: 'json' }), settings: text('settings', { mode: 'json' }),
// Audit fields created_by: text('created_by').notNull(), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(), updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()), last_synced_at: text('last_synced_at'), expires_at: text('expires_at'),})
// Updated DataSources - clean separation of concernsexport const DataSources = sqliteTable('data_sources', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), workspace_id: text('workspace_id').notNull(), name: text('name').notNull(), type: text('type', { enum: ['csv', 'google_ads'] }).notNull(), status: text('status', { enum: ['processing', 'failed', 'ready', 'pending'] }).notNull(), status_message: text('status_message'), total_cells: integer('total_cells').default(0).notNull(), file_path: text('file_path'), connection_id: integer('connection_id').references(() => Connections.id), // Nullable - null = CSV current_version_id: integer('current_version_id'), workflow_id: text('workflow_id'), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(), updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()), completed_at: text('completed_at'),})
// Version-specific fields moved hereexport const DataSourceVersions = sqliteTable('data_source_versions', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), data_source_id: integer('data_source_id') .notNull() .references(() => DataSources.id), version_number: integer('version_number').notNull(), file_path: text('file_path'), status: text('status', { enum: ['processing', 'failed', 'ready', 'pending'] }) .notNull() .default('pending'), status_message: text('status_message'), total_cells: integer('total_cells').default(0).notNull(), workflow_id: text('workflow_id'), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(), updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()), completed_at: text('completed_at'), metadata: text('metadata', { mode: 'json' }),})
// Grid versioning supportexport const GridVersions = sqliteTable('grid_versions', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), grid_id: integer('grid_id') .notNull() .references(() => Grids.id), name: text('name').notNull(), version_number: integer('version_number').notNull(), status: text('status', { enum: ['active', 'pending', 'archived'] }).notNull(), households: integer('households'), targeted_households: integer('targeted_households'), targeting_type: text('targeting_type'), targeting_coverage: integer('targeting_coverage'), targeted_areas: text('targeted_areas', { mode: 'json' }), targeting_filters: text('targeting_filters', { mode: 'json' }), manual_inclusion_filters: text('manual_inclusion_filters', { mode: 'json' }), manual_exclusion_filters: text('manual_exclusion_filters', { mode: 'json' }), last_used_at: text('last_used_at').notNull().default(new Date().toISOString()), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(), updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()),})
// JOIN TABLE grid_version_data_sourcesexport const GridVersionDataSources = sqliteTable( 'grid_version_data_sources', { id: integer('id').primaryKey({ autoIncrement: true }), grid_version_id: integer('grid_version_id') .notNull() .references(() => GridVersions.id), data_source_id: integer('data_source_id') .notNull() .references(() => DataSources.id), data_source_version_id: integer('data_source_version_id') .notNull() .references(() => DataSourceVersions.id), }, (t) => [index('gvd_idx').on(t.grid_version_id, t.data_source_id)])
// Updated Grids table
export const Grids = sqliteTable('grids', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), name: text('name').notNull(), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(), updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()), status: text('status').notNull(), auto_update: integer('auto_update', { mode: 'boolean' }).notNull().default(false), current_version_id: integer('current_version_id'), pending_version_id: integer('pending_version_id'),})

2. API Endpoints (/routes/dataSources.ts)
The API will handle the full lifecycle of a data source resource.
- File to Create: `apps/api/src/routes/dataSources.ts`
- Actions: Implement the following RESTful endpoints:
  - `POST /api/data-sources`: Creates a new `DataSource` record in the `WorkspaceDO`, streams the file to R2, and starts the `CreateDataSourceVersionWorkflow`. It will save the `workflowId` back to the record.
  - `GET /api/data-sources`: Retrieves a list of all data sources for the workspace.
  - `GET /api/data-sources/:id`: Retrieves the details for a single data source.
  - `GET /api/data-sources/:id/data`: Retrieves the processed data for a data source from the KV cache.
  - `DELETE /api/data-sources/:id`: Deletes a data source.
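A sketch of the upload endpoint is shown below. Binding names such as `DATA_SOURCE_FILES` and `WORKSPACE_DO` are placeholders, the types come from @cloudflare/workers-types, and the RPC method names follow the WorkspaceDO interface described in section 4 below; their exact signatures are assumptions.

```ts
// Sketch: create the DataSource record, stream the CSV to R2, then start the workflow.
import { Hono } from 'hono'
import type { WorkspaceDO } from '../durable-objects/WorkspaceDO'

interface Env {
  WORKSPACE_DO: DurableObjectNamespace<WorkspaceDO>
  DATA_SOURCE_FILES: R2Bucket
  CREATE_DATA_SOURCE_VERSION_WORKFLOW: Workflow
}

const dataSources = new Hono<{ Bindings: Env; Variables: { workspaceId: string } }>()

dataSources.post('/', async (c) => {
  const workspaceId = c.get('workspaceId')
  const form = await c.req.formData()
  const file = form.get('file') as File
  const name = String(form.get('name') ?? 'Untitled data source')

  // 1. Create the DataSource + initial version via WorkspaceDO RPC
  const workspace = c.env.WORKSPACE_DO.get(c.env.WORKSPACE_DO.idFromName(workspaceId))
  const { dataSourceId, versionId } = await workspace.createDataSource({ name, type: 'csv', connection_id: null })

  // 2. Stream the file to R2
  const r2Path = `uploads/${workspaceId}/${dataSourceId}/${versionId}.csv`
  await c.env.DATA_SOURCE_FILES.put(r2Path, file.stream())

  // 3. Start the processing workflow and save its id back to the record
  const instance = await c.env.CREATE_DATA_SOURCE_VERSION_WORKFLOW.create({
    params: { workspaceId, dataSourceId, versionId, r2Path },
  })
  await workspace.updateDataSourceWorkflowId({ dataSourceId, workflowId: instance.id })

  return c.json({ dataSourceId, versionId, workflowId: instance.id }, 201)
})

export default dataSources
```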
Current-Version Hydration Pattern
To minimize client round-trips, callers can opt in to embedding related records using an `include=` query param.
Examples:
GET /api/grids?include=currentVersion
GET /api/data-sources?include=currentVersion

Multiple expansions can be requested with a comma-separated list:
GET /api/grids?include=currentVersion,pendingVersion

When `include` is omitted (or empty), the API returns slim parent rows only.
Future expansions will follow the same pattern (include=dataSourceVersions, include=latestStats, …).
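One way a route handler might honor the parameter (a sketch; `listGrids` and `hydrateCurrentVersion` are hypothetical helpers):

```ts
import { Hono, type Context } from 'hono'

// Hypothetical helpers: return slim parent rows / attach the current version to one grid.
declare function listGrids(c: Context): Promise<Record<string, unknown>[]>
declare function hydrateCurrentVersion(c: Context, grid: Record<string, unknown>): Promise<void>

const app = new Hono()

app.get('/api/grids', async (c) => {
  // Parse ?include= into a set of requested expansions (empty set when omitted).
  const include = new Set(
    (c.req.query('include') ?? '')
      .split(',')
      .map((part) => part.trim())
      .filter(Boolean)
  )

  const grids = await listGrids(c)

  if (include.has('currentVersion')) {
    await Promise.all(grids.map((grid) => hydrateCurrentVersion(c, grid)))
  }

  return c.json(grids)
})
```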
3. Data Processing Workflow (/workflows/create-data-source-version.ts)
This workflow becomes the heart of the processing logic, using typed RPC stubs to communicate with Durable Objects.
- File: `apps/api/src/workflows/create-data-source-version.ts` ✅ Implemented
- Action: Define the workflow class, its parameters, and the `run` method containing the processing steps.
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers'
import type { WorkspaceDO } from '../durable-objects/WorkspaceDO' // Assumes type export

// Define the payload the workflow expects
type ProcessDataSourceParams = {
  dataSourceId: number
  workspaceId: string
  r2Path: string
}

// Define the Env to include the typed WorkspaceDO binding
interface Env {
  WORKSPACES: DurableObjectNamespace<WorkspaceDO>
  // ... other bindings
}

export class ProcessDataSourceWorkflow extends WorkflowEntrypoint<Env, ProcessDataSourceParams> {
  // Helper to get the WorkspaceDO stub
  private getWorkspaceDO(workspaceId: string): DurableObjectStub<WorkspaceDO> {
    const doId = this.env.WORKSPACES.idFromName(workspaceId)
    return this.env.WORKSPACES.get(doId)
  }

  async run(event: WorkflowEvent<ProcessDataSourceParams>, step: WorkflowStep) {
    const { dataSourceId, workspaceId, r2Path } = event.payload
    const workspaceDO = this.getWorkspaceDO(workspaceId)

    try {
      // Step 1: Ingest data from R2
      const rawData = await step.do('ingest-csv-from-r2', async () => { /* ... */ return [] })

      // Step 2: Geocode addresses
      const geocodedData = await step.do('geocode-addresses', async () => { /* ... */ return [] })

      // Step 3: Aggregate metrics by H3 cell
      const aggregatedData = await step.do('aggregate-metrics', async () => { /* ... */ return {} })

      // Step 4: Cache aggregated data in Workers KV
      await step.do('cache-in-kv', async () => {
        await this.env.DATA_SOURCES_KV.put(`datasource:${dataSourceId}`, JSON.stringify(aggregatedData))
      })

      // Step 5: Notify associated GridDOs
      await step.do('notify-grids', async () => {
        const gridIds = await workspaceDO.getGridIdsForDataSource(dataSourceId)
        // For each grid ID, get the GridDO stub and call `dataSourcesUpdated()`
      })

      // Step 6: Update status to 'ready' via RPC
      await step.do('set-status-ready', () =>
        workspaceDO.updateDataSourceStatus({ dataSourceId, status: 'ready' })
      )
    } catch (error) {
      // If any step fails, update status to 'failed' via RPC
      const errorMessage = error instanceof Error ? error.message : 'An unknown error occurred'
      await step.do('set-status-failed', () =>
        workspaceDO.updateDataSourceStatus({
          dataSourceId,
          status: 'failed',
          message: errorMessage,
        })
      )

      // Re-throw the error to ensure the workflow itself is marked as failed.
      throw error
    }
  }
}

4. WorkspaceDO RPC Interface
To support this workflow-driven process, the WorkspaceDO must expose a clear, public RPC interface for other services to call.
- File to Modify: `apps/api/src/durable-objects/WorkspaceDO.ts`
- Actions: Add the following public methods to the `WorkspaceDO` class:
  - `updateDataSourceStatus(args: { dataSourceId: string, status: 'ready' | 'failed', message?: string })`: Updates the record in its internal database.
  - `getGridIdsForDataSource(dataSourceId: string): Promise<string[]>`: Returns a list of `gridId`s that use a specific data source.
  - `createDataSource(...)`: A method to create the initial data source record.
  - `updateDataSourceWorkflowId(...)`: A method to associate the workflow instance with the data source record.
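As a rough sketch of the first two methods (assuming Drizzle over the DO's SQLite database, as in the Connection Actor code later in this document; `this.db`, the import locations, and the use of numeric ids are assumptions):

```ts
// Inside the WorkspaceDO class — sketch only.
public async updateDataSourceStatus(args: {
  dataSourceId: number
  status: 'ready' | 'failed'
  message?: string
}): Promise<void> {
  await this.db
    .update(DataSources)
    .set({ status: args.status, status_message: args.message ?? null })
    .where(eq(DataSources.id, args.dataSourceId))
}

public async getGridIdsForDataSource(dataSourceId: number): Promise<number[]> {
  // Grids are linked to data sources through the grid_version_data_sources join table.
  const rows = await this.db
    .selectDistinct({ gridId: GridVersions.grid_id })
    .from(GridVersionDataSources)
    .innerJoin(GridVersions, eq(GridVersionDataSources.grid_version_id, GridVersions.id))
    .where(eq(GridVersionDataSources.data_source_id, dataSourceId))

  return rows.map((row) => row.gridId)
}
```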
5. GridDO RPC Interface
- File to Modify: `apps/api/src/durable-objects/GridDO.ts`
- Action: Ensure a public `dataSourcesUpdated(dataSourceIds: string[])` method exists. This method will fetch the corresponding data from Workers KV and re-materialize the grid's internal cell data.
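A minimal sketch of what that method could look like (the version lookup and merge helpers are hypothetical; the KV key format and `DATA_SOURCES_KV` binding come from earlier sections):

```ts
// Inside the GridDO class — sketch only.
public async dataSourcesUpdated(dataSourceIds: string[]): Promise<void> {
  for (const dataSourceId of dataSourceIds) {
    // Hypothetical: resolve which version this grid should read (pinned or current).
    const versionId = await this.getTargetVersionId(dataSourceId)
    if (versionId == null) continue

    const key = `datasource:${dataSourceId}/version/${versionId}`
    const cells = await this.env.DATA_SOURCES_KV.get<Record<string, Record<string, number>>>(key, 'json')
    if (!cells) continue

    // Hypothetical: fold the refreshed metrics back into the grid's internal cell data.
    this.mergeCellMetrics(dataSourceId, cells)
  }
}
```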
Frontend Implementation Plan
1. API Client & State Management
- Task: Create Tanstack Query hooks for interacting with the new API.
- Files to Create:
  - `apps/web-app/src/queries/data-sources.ts`
- Actions:
  - Create `useDataSources`, `useDataSource`, `useCreateDataSource`, and `useDeleteDataSource` hooks.
  - Implement the multipart upload logic, likely in a dedicated function called by a mutation.
- Testing:
  - Unit tests for the query hooks, mocking the `fetch` API to simulate server responses and verify correct state management (`isLoading`, `isError`, `data`).
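A sketch of two of these hooks (TanStack Query v5 object syntax assumed; response typing and error handling are simplified):

```ts
// Sketch of the list + create hooks for apps/web-app/src/queries/data-sources.ts
import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query'

export function useDataSources() {
  return useQuery({
    queryKey: ['data-sources'],
    queryFn: async () => {
      const res = await fetch('/api/data-sources')
      if (!res.ok) throw new Error('Failed to load data sources')
      return res.json()
    },
  })
}

export function useCreateDataSource() {
  const queryClient = useQueryClient()
  return useMutation({
    mutationFn: async (form: FormData) => {
      const res = await fetch('/api/data-sources', { method: 'POST', body: form })
      if (!res.ok) throw new Error('Failed to create data source')
      return res.json()
    },
    // Refresh the list once the upload has been accepted.
    onSuccess: () => queryClient.invalidateQueries({ queryKey: ['data-sources'] }),
  })
}
```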
2. File Upload UI
- Task: Create the file dropzone component for CSV uploads.
- Files to Create:
  - `apps/web-app/src/components/DataSources/CreateDataSourceDialog.tsx`
  - `apps/web-app/src/components/DataSources/FileUploadDropzone.tsx`
- Actions:
  - The dialog will orchestrate the creation flow, allowing the user to name the data source.
  - Implement the `FileUploadDropzone` based on Flowbite/shadcn examples. This component will handle the client-side multipart upload to R2.
  - Upon successful upload, the `useCreateDataSource` mutation will be called to kick off the backend processing workflow.
- Testing:
- Component tests to verify file selection, drag-and-drop, and that the upload function is called correctly on user interaction.
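For illustration, the dropzone's accept handler might simply package the file and hand it to the mutation (a sketch; whether the file goes through this endpoint or is uploaded directly to R2 first is still an open implementation detail):

```ts
// Sketch: hand the dropped CSV to the create mutation, which starts backend processing.
function handleFilesAccepted(
  files: File[],
  name: string,
  createDataSource: { mutate: (form: FormData) => void }
) {
  const [file] = files
  if (!file) return

  const form = new FormData()
  form.set('name', name)
  form.set('file', file)

  createDataSource.mutate(form) // POST /api/data-sources
}
```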
3. Data Source Management UI
- Task: Develop the primary interface for users to view and manage their data sources.
- Files to Create:
  - `apps/web-app/src/routes/data-sources.tsx` (New route)
  - `apps/web-app/src/components/DataSources/DataSourcesList.tsx`
  - `apps/web-app/src/components/DataSources/DataSourceCard.tsx`
  - `apps/web-app/src/components/DataSources/DataSourceDetails.tsx`
  - `apps/web-app/src/components/DataSources/DataSourceDataTable.tsx`
- Actions:
  - Use the existing `SidePanel` layout for the page structure.
  - `DataSourcesList`: Fetch and display a list of sources using the `DataSourceCard`. The card should show Name, Status, Last Updated, and Type.
  - `DataSourceDetails`: Show the selected source's metadata. This view should contain a `DataSourceDataTable`.
  - `DataSourceDataTable`: Use `tanstack-table` and `shadcn/ui` `Table` components to create a reusable, paginated, and sortable data table. This table will display the processed data for the data source by fetching it from a new API endpoint that reads from the KV cache (`GET /api/data-sources/:id/data`).
- Testing:
- Write Storybook stories or Vitest component tests for each new UI component to ensure correct rendering with mock data.
Implementation Plan Details
Phase 1: Schema Changes
1. Data Source Versioning Schema & Types
File: packages/honeygrid-types/schema/WorkspaceDOSchema.ts
export const DataSourceVersions = sqliteTable('data_source_versions', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), data_source_id: integer('data_source_id') .notNull() .references(() => DataSources.id), version_number: integer('version_number').notNull(), file_path: text('file_path'), status: text('status', { enum: ['processing', 'failed', 'ready', 'pending'] }) .notNull() .default('pending'), status_message: text('status_message'), total_cells: integer('total_cells').default(0).notNull(), workflow_id: text('workflow_id'), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(), updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()), completed_at: text('completed_at'), metadata: text('metadata', { mode: 'json' }),})
export const DataSources = sqliteTable('data_sources', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), workspace_id: text('workspace_id').notNull(), name: text('name').notNull(), type: text('type', { enum: ['csv', 'google_ads'] }).notNull(), status: text('status', { enum: ['processing', 'failed', 'ready', 'pending'] }).notNull(), status_message: text('status_message'), total_cells: integer('total_cells').default(0).notNull(), file_path: text('file_path'), connection_id: integer('connection_id').references(() => Connections.id), current_version_id: integer('current_version_id'), workflow_id: text('workflow_id'), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(), updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()), completed_at: text('completed_at'),})

File: packages/honeygrid-types/validators-and-types/DataSourceTypes.ts
export const DataSourceTypeValidator = z.enum(['csv', 'google_ads'])export const DataSourceStatusValidator = z.enum(['processing', 'failed', 'ready', 'pending'])
export const BaseDataSourceValidator = z.object({ id: z.number(), uuid: z.string(), workspace_id: z.string(), name: z.string(), type: DataSourceTypeValidator, status: DataSourceStatusValidator, status_message: z.string().nullable(), total_cells: z.number().int().default(0), file_path: z.string().nullable(), connection_id: z.number().nullable(), current_version_id: z.number().nullable(), workflow_id: z.string().nullable(), created_at: z.iso.datetime(), updated_at: z.iso.datetime(), completed_at: z.string().nullable(),})
export const DataSourceVersionValidator = z.object({ id: z.number(), uuid: z.string(), data_source_id: z.number(), version_number: z.number().int(), file_path: z.string().nullable(), status: DataSourceStatusValidator, status_message: z.string().nullable(), total_cells: z.number().int().default(0), workflow_id: z.string().nullable(), created_at: z.iso.datetime(), updated_at: z.iso.datetime(), completed_at: z.iso.datetime().nullable(), metadata: z.record(z.string(), z.unknown()).nullable(),})
export const CreateDataSourceValidator = z.object({ name: z.string(), type: DataSourceTypeValidator, connection_id: z.number().nullable(),})
export type DataSource = z.infer<typeof BaseDataSourceValidator>
export type DataSourceVersion = z.infer<typeof DataSourceVersionValidator>
export type CreateDataSource = z.infer<typeof CreateDataSourceValidator>

2. Grid Versioning Schema & Types
File: packages/honeygrid-types/schema/WorkspaceDOSchema.ts
export const GridVersionStatusValidator = z.enum(['active', 'pending', 'archived'])
export const GridVersions = sqliteTable('grid_versions', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), grid_id: integer('grid_id') .notNull() .references(() => Grids.id), name: text('name').notNull(), version_number: integer('version_number').notNull(), status: text('status', { enum: ['active', 'pending', 'archived'] }).notNull(), households: integer('households'), targeted_households: integer('targeted_households'), targeting_type: text('targeting_type'), targeting_coverage: integer('targeting_coverage'), targeted_areas: text('targeted_areas', { mode: 'json' }), targeting_filters: text('targeting_filters', { mode: 'json' }), manual_inclusion_filters: text('manual_inclusion_filters', { mode: 'json' }), manual_exclusion_filters: text('manual_exclusion_filters', { mode: 'json' }), last_used_at: text('last_used_at').notNull().default(new Date().toISOString()), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(), updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()),})
export const GridVersionDataSources = sqliteTable( 'grid_version_data_sources', { id: integer('id').primaryKey({ autoIncrement: true }), grid_version_id: integer('grid_version_id') .notNull() .references(() => GridVersions.id), data_source_id: integer('data_source_id') .notNull() .references(() => DataSources.id), data_source_version_id: integer('data_source_version_id') .notNull() .references(() => DataSourceVersions.id), }, (t) => [index('gvd_idx').on(t.grid_version_id, t.data_source_id)])
// Modify existing Grids table
export const Grids = sqliteTable('grids', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), name: text('name').notNull(), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(), updated_at: text('updated_at').$onUpdateFn(() => new Date().toISOString()), status: text('status').notNull(), auto_update: integer('auto_update', { mode: 'boolean' }).notNull().default(false), current_version_id: integer('current_version_id'), pending_version_id: integer('pending_version_id'),})

File: packages/honeygrid-types/validators-and-types/GridTypes.ts
export const GridVersionStatusValidator = z.enum(['active', 'pending', 'archived'])
export const GridVersionValidator = z.object({ id: z.number(), uuid: z.string(), grid_id: z.number(), name: z.string(), version_number: z.number().int(), status: GridVersionStatusValidator, households: z.number().nullable(), targeted_households: z.number().nullable(), targeting_type: z.string().nullable(), targeting_coverage: z.number().nullable(), targeted_areas: TargetedAreasValidator, targeting_filters: TargetingFilterValidator, manual_inclusion_filters: z.array(z.string()), manual_exclusion_filters: z.array(z.string()), last_used_at: z.string(), created_at: z.iso.datetime(), updated_at: z.iso.datetime(),})
export const GridVersionDataSourceValidator = z.object({ id: z.number(), grid_version_id: z.number(), data_source_id: z.number(), data_source_version_id: z.number(),})
// Update existing GridValidatorexport const GridValidator = z.object({ id: z.number(), uuid: z.string(), name: z.string(), created_at: z.iso.datetime(), updated_at: z.iso.datetime(), status: z.string(), auto_update: z.boolean(), current_version_id: z.number().nullable(), pending_version_id: z.number().nullable(),})
export type GridVersion = z.infer<typeof GridVersionValidator>
export type GridVersionStatus = z.infer<typeof GridVersionStatusValidator>
export type GridVersionDataSource = z.infer<typeof GridVersionDataSourceValidator>

Phase 2: Data Source Versioning
1. Update WorkspaceDO RPC Interface
File: apps/api/src/durable-objects/WorkspaceDO.ts
// New methods to add:
- createDataSourceVersion(dataSourceId: number, data: Partial<typeof DataSourceVersions.$inferSelect>)
- updateDataSourceVersion(versionId: number, data: Partial<typeof DataSourceVersions.$inferSelect>)
- getDataSourceVersions(dataSourceId: number)
- getDataSourceVersion(versionId: number)

2. Update Data Source Routes
File: apps/api/src/routes/dataSources.ts
// New endpoints to add:
- POST /api/data-sources/:id/versions - Create new version
- GET /api/data-sources/:id/versions - List versions
- GET /api/data-sources/:id/versions/:versionId - Get version details
- GET /api/data-sources/:id/versions/:versionId/data - Get version data

3. Update KV Storage Pattern
File: apps/api/src/workflows/create-data-source-version.ts
// Update KV key format:
const kvKey = `datasource:${dataSourceId}/version/${versionId}`

Phase 3: Grid Versioning
1. Update GridDO Implementation
File: apps/api/src/durable-objects/GridDO.ts
// New methods to add:
- createGridVersion(data: Partial<typeof GridVersions.$inferSelect>)
- updateGridVersion(versionId: number, data: Partial<typeof GridVersions.$inferSelect>)
- getGridVersions()
- getGridVersion(versionId: number)
- promoteVersionToActive(versionId: number)
- updatePendingGridVersion()
- cleanupOldVersions() // Deletes versions not used in 2 years

2. Update Grid State Management
File: apps/api/src/durable-objects/GridDO.ts
// Update GridState type:
export type GridState = {
  gridId: string
  workspaceId: string
  currentVersionId: number | null
  pendingVersionId: number | null
  autoUpdate: boolean
}

3. Add Grid Version Routes
File: apps/api/src/routes/grids.ts
// New endpoints to add:
- POST /api/grids/:id/versions - Create new version
- GET /api/grids/:id/versions - List versions
- GET /api/grids/:id/versions/:versionId - Get version details
- POST /api/grids/:id/versions/:versionId/promote - Promote version to active
- PUT /api/grids/:id/auto-update - Toggle auto-update setting

Phase 4: Data Processing Updates
1. Update Workflow Implementation
File: apps/api/src/workflows/create-data-source-version.ts
// Update workflow to:
1. Create/update data source version
2. Store processed data with version-specific KV key
3. Notify grids to update their pending versions

2. Add Version Cleanup Job
File: apps/api/src/workflows/cleanup-grid-versions-workflow.ts
// New workflow to:
1. Query for grid versions not used in 730 days (2 years)
2. Delete old versions and their associated data
3. Run on a daily schedule

Live Data Source Management
Dynamic data sources (like Google Ads) require active management to keep their data current. The system uses a hybrid approach: Connection Actors handle data storage and scheduling, while connection metadata stays in WorkspaceDO for clean joins and permissions.
Hybrid Architecture
Connection Metadata in WorkspaceDO:
- Connection auth credentials, settings, and permissions
- References to data sources and grids
- Connection health status and last sync timestamps
- Enables clean SQL joins across workspace data
Connection Actors for Data Management:
- Scheduled data fetching using Durable Object alarms
- Historical data storage in Actor’s SQLite database
- Daily snapshots for aggregation queries (“12 month conversions”)
- RPC methods for data retrieval and management
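For example, a "12 month conversions" style query could lean on the actor's historical snapshots via the `getHistoricalAggregation` RPC shown later in this document (the binding name and identifier format are taken from the surrounding code; the untyped stub cast is for brevity):

```ts
// Sketch: sum the last 12 months of a metric for one H3 cell via the Connection Actor.
interface Env {
  CONNECTION_ACTORS: DurableObjectNamespace
}

async function twelveMonthConversions(env: Env, workspaceId: string, connectionId: number, h3Cell: string) {
  const actorId = `${workspaceId}:${connectionId}` // identifier format used elsewhere in this document
  const actor = env.CONNECTION_ACTORS.get(env.CONNECTION_ACTORS.idFromName(actorId)) as any

  const end = new Date()
  const start = new Date(end)
  start.setFullYear(end.getFullYear() - 1)

  // Snapshot dates are stored as YYYY-MM-DD
  return actor.getHistoricalAggregation(
    h3Cell,
    'conversions',
    start.toISOString().split('T')[0],
    end.toISOString().split('T')[0]
  )
}
```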
Live Data Update Flow
sequenceDiagram
  participant ConnectionActor
  participant ThirdPartyAPI
  participant WorkspaceDO
  participant ProcessLiveDataSourceWorkflow
  participant KV
  participant GridDO

  ConnectionActor->>ConnectionActor: 1. Daily alarm fires (scheduled)
  ConnectionActor->>+WorkspaceDO: 2. getConnectionAuth(connectionId) (RPC)
  WorkspaceDO-->>-ConnectionActor: Auth credentials & settings
  ConnectionActor->>+ThirdPartyAPI: 3. Fetch latest data (using auth)
  ThirdPartyAPI-->>-ConnectionActor: Raw data

  ConnectionActor->>ConnectionActor: 4. Store daily snapshot in local SQLite
  ConnectionActor->>+WorkspaceDO: 5. getDataSourcesForConnection(connectionId) (RPC)
  WorkspaceDO-->>-ConnectionActor: [dataSourceId1, dataSourceId2]

  loop For each dataSourceId
    ConnectionActor->>+ProcessLiveDataSourceWorkflow: 6. Start workflow(dataSourceId, connectionId)
  end

  activate ProcessLiveDataSourceWorkflow
  ProcessLiveDataSourceWorkflow->>+ConnectionActor: 7. getLatestSnapshot() (RPC)
  ConnectionActor-->>-ProcessLiveDataSourceWorkflow: Processed data
  ProcessLiveDataSourceWorkflow->>+WorkspaceDO: 8. Create new DataSourceVersion (RPC)
  WorkspaceDO-->>-ProcessLiveDataSourceWorkflow: versionId
  ProcessLiveDataSourceWorkflow->>+KV: 9. Store aggregated data with version key

  ProcessLiveDataSourceWorkflow->>+WorkspaceDO: 10. getGridIdsForDataSource()
  WorkspaceDO-->>-ProcessLiveDataSourceWorkflow: [gridId1, gridId2]

  loop For each gridId
    ProcessLiveDataSourceWorkflow->>+GridDO: 11. updatePendingGridVersion()
  end

  ProcessLiveDataSourceWorkflow->>+WorkspaceDO: 12. updateVersionStatus('ready')
  ProcessLiveDataSourceWorkflow->>+WorkspaceDO: 13. updateConnectionSyncStatus(connectionId, 'synced')
  deactivate ProcessLiveDataSourceWorkflow

Connection Actor Implementation
✅ Status: Core Connection Actor is implemented with basic sync functionality. Platform-specific integrations are partially implemented.
The Connection Actor uses the @cloudflare/actors library with a composition-based approach for platform-specific functionality: a single ConnectionActor class handles all platforms through strategy-pattern methods, avoiding the operational complexity of multiple Durable Object classes.
File: apps/api/src/durable-objects/ConnectionActor.ts
import { Actor, handler, Persist } from '@cloudflare/actors'import { SyncSnapshots, ErrorLogs, SyncHistory } from 'honeygrid-types/schema/ConnectionActorSchema'import { and, count, desc, eq, gte, lte, sum } from 'drizzle-orm'import { drizzle } from 'drizzle-orm/durable-sqlite'import { migrate } from 'drizzle-orm/durable-sqlite/migrator'
// @ts-ignoreimport migrations from '../../ConnectionActordrizzleMigrations/migrations.js'
import type { DrizzleSqliteDODatabase } from 'drizzle-orm/durable-sqlite'
/** * Connection Actor manages live data sources with daily snapshots and historical aggregation * Each actor instance represents one connection and handles: * - Scheduled data fetching using alarms * - Historical data storage in SQLite * - Daily snapshots for aggregation queries * - RPC methods for data retrieval and management */export class ConnectionActor extends Actor<Env> { #db: DrizzleSqliteDODatabase<any> #connectionId: string
// Configuration stored using @Persist decorator @Persist public syncFrequencyHours: number = 24
@Persist public syncEnabled: boolean = true
@Persist public lastSuccessfulSync: string | null = null
@Persist public nextScheduledSync: string | null = null
@Persist public platformSettings: Record<string, any> = {}
constructor(ctx?: any, env?: Env) { super(ctx, env) this.#connectionId = this.identifier || 'unknown' this.#db = drizzle(this.ctx.storage, { schema: { SyncSnapshots, SyncHistory, ErrorLogs, }, })
// Ensure database is migrated before processing requests this.ctx.blockConcurrencyWhile(async () => { await this.migrateDb() }) }
/** * Migrates the Connection Actor database using Drizzle migrations */ private async migrateDb() { try { await migrate(this.#db, migrations) } catch (error) { console.error(`Error migrating connection actor database for ${this.#connectionId}:`, error) throw error } }
/** * RPC method to get connection status */ public async getConnectionStatus(): Promise<{ connection_id: string last_snapshot: string | null status: string total_snapshots: number }> { const [lastSnapshot] = await this.#db .select({ last_snapshot: SyncSnapshots.snapshot_date }) .from(SyncSnapshots) .orderBy(desc(SyncSnapshots.snapshot_date)) .limit(1)
const [totalCount] = await this.#db.select({ count: count() }).from(SyncSnapshots)
return { connection_id: this.#connectionId, last_snapshot: lastSnapshot?.last_snapshot || null, status: 'active', total_snapshots: totalCount?.count || 0, } }
/** * Initialize connection configuration and schedule first sync */ public async initializeConnection(syncFrequencyHours: number = 24): Promise<void> { console.log(`Initializing connection actor: ${this.#connectionId}`)
// Set configuration using @Persist properties this.syncFrequencyHours = syncFrequencyHours this.syncEnabled = true this.nextScheduledSync = new Date( Date.now() + syncFrequencyHours * 60 * 60 * 1000 ).toISOString()
// Schedule first sync this.alarms.schedule(60, 'performDailySync', []) // Start in 1 minute for immediate testing }
/** * Scheduled method called by daily alarm to fetch and store data */ public async performDailySync(): Promise<void> { console.log(`Daily sync triggered for connection: ${this.#connectionId}`)
const syncId = `${this.#connectionId}-${Date.now()}` const syncStart = new Date().toISOString() const today = new Date().toISOString().split('T')[0]
try { // Record sync start await this.#db.insert(SyncHistory).values({ id: syncId, sync_date: today, status: 'in_progress', started_at: syncStart, })
// Get connection auth from WorkspaceDO const workspaceDO = this.getWorkspaceDO() const authData = await workspaceDO.getConnectionAuth(this.#connectionId)
// Fetch data from third-party API const rawData = await this.fetchFromThirdParty(authData)
// Process and store daily snapshot const recordsProcessed = await this.storeDailySnapshot(rawData, today)
// Update sync history with success await this.#db .update(SyncHistory) .set({ status: 'success', records_processed: recordsProcessed, completed_at: new Date().toISOString(), sync_duration_ms: Date.now() - new Date(syncStart).getTime(), }) .where(eq(SyncHistory.id, syncId))
// Trigger data source version creation workflows const dataSourceIds = await workspaceDO.getDataSourcesForConnection(this.#connectionId) await this.triggerDataSourceUpdates(dataSourceIds)
// Update sync status in WorkspaceDO await workspaceDO.updateConnectionSyncStatus(this.#connectionId, 'active')
// Update connection config with successful sync this.lastSuccessfulSync = new Date().toISOString() this.nextScheduledSync = new Date(Date.now() + 24 * 60 * 60 * 1000).toISOString() // Next day
// Schedule next sync (24 hours) this.alarms.schedule(24 * 60 * 60, 'performDailySync', []) } catch (error) { console.error(`Sync failed for connection ${this.#connectionId}:`, error)
// Update sync history with failure await this.#db .update(SyncHistory) .set({ status: 'failed', error_message: (error as Error).message, completed_at: new Date().toISOString(), sync_duration_ms: Date.now() - new Date(syncStart).getTime(), }) .where(eq(SyncHistory.id, syncId))
// Log detailed error await this.#db.insert(ErrorLogs).values({ id: `${syncId}-error`, sync_history_id: syncId, error_type: 'sync_failed', error_message: (error as Error).message, error_details: { stack: (error as Error).stack }, })
// Update WorkspaceDO with error status const workspaceDO = this.getWorkspaceDO() await workspaceDO.updateConnectionSyncStatus(this.#connectionId, 'error')
// Retry in 1 hour on failure this.alarms.schedule(60 * 60, 'performDailySync', []) } }
/** * RPC method to get latest snapshot data for workflow processing */ public async getLatestSnapshot(): Promise<Record<string, Record<string, number>>> { const today = new Date().toISOString().split('T')[0]
const results = await this.#db .select({ h3_cell: SyncSnapshots.h3_cell, metric_name: SyncSnapshots.metric_name, metric_value: SyncSnapshots.metric_value, }) .from(SyncSnapshots) .where(eq(SyncSnapshots.snapshot_date, today))
// Transform to nested object: { h3Cell: { metricName: value } } const data: Record<string, Record<string, number>> = {} for (const row of results) { if (!data[row.h3_cell]) data[row.h3_cell] = {} data[row.h3_cell][row.metric_name] = row.metric_value }
return data }
/** * RPC method to get historical aggregated data for specific metrics and time ranges */ public async getHistoricalAggregation( h3Cell: string, metric: string, startDate: string, endDate: string ): Promise<number> { const [result] = await this.#db .select({ total: sum(SyncSnapshots.metric_value) }) .from(SyncSnapshots) .where( and( eq(SyncSnapshots.h3_cell, h3Cell), eq(SyncSnapshots.metric_name, metric), gte(SyncSnapshots.snapshot_date, startDate), lte(SyncSnapshots.snapshot_date, endDate) ) )
return Number(result?.total) || 0 }
/** * Store daily snapshot data in local SQLite using Drizzle ORM */ private async storeDailySnapshot(rawData: any, snapshotDate: string): Promise<number> { // Process raw data into H3 cell aggregations const aggregatedData = await this.processRawDataToH3(rawData) let recordsProcessed = 0
// Store each cell's metrics for (const [h3Cell, metrics] of Object.entries(aggregatedData)) { for (const [metricName, value] of Object.entries(metrics as Record<string, number>)) { await this.#db .insert(SyncSnapshots) .values({ id: `${h3Cell}-${metricName}-${snapshotDate}`, h3_cell: h3Cell, metric_name: metricName, metric_value: value, snapshot_date: snapshotDate, raw_data: rawData, // Store original response for debugging }) .onConflictDoUpdate({ target: [SyncSnapshots.h3_cell, SyncSnapshots.metric_name, SyncSnapshots.snapshot_date], set: { metric_value: value, raw_data: rawData }, })
recordsProcessed++ } }
return recordsProcessed }
/** * Process raw API data into H3 cell aggregations * This method uses composition to handle different platforms */ private async processRawDataToH3(rawData: any): Promise<Record<string, Record<string, number>>> { switch (rawData.platform) { case 'google_ads': return this.processGoogleAdsData(rawData) case 'mailchimp': return this.processMailchimpData(rawData) case 'stripe': return this.processStripeData(rawData) default: console.warn(`processRawDataToH3 not implemented for platform ${rawData.platform}`) return this.processDefaultMockData(rawData) } }
/** * Google Ads specific data processing */ private async processGoogleAdsData( rawData: any ): Promise<Record<string, Record<string, number>>> { const aggregated: Record<string, Record<string, number>> = {}
// Process Google Ads location reports for (const location of rawData.location_data || []) { if (location.geographic_view?.postal_code) { // Convert postal code to H3 cell const h3Cell = await this.env.GEOCODING_SERVICE.zipCodeToH3( location.geographic_view.postal_code )
if (!aggregated[h3Cell]) aggregated[h3Cell] = {} aggregated[h3Cell].conversions = (aggregated[h3Cell].conversions || 0) + (location.metrics.conversions || 0) aggregated[h3Cell].impressions = (aggregated[h3Cell].impressions || 0) + (location.metrics.impressions || 0) aggregated[h3Cell].clicks = (aggregated[h3Cell].clicks || 0) + (location.metrics.clicks || 0) aggregated[h3Cell].spend = (aggregated[h3Cell].spend || 0) + (location.metrics.cost_micros / 1000000 || 0) } }
return aggregated }
/** * Mailchimp specific data processing */ private async processMailchimpData( rawData: any ): Promise<Record<string, Record<string, number>>> { const aggregated: Record<string, Record<string, number>> = {}
// Process Mailchimp member data with addresses for (const member of rawData.members || []) { if (member.merge_fields?.ADDRESS) { const address = `${member.merge_fields.ADDRESS.addr1}, ${member.merge_fields.ADDRESS.city}, ${member.merge_fields.ADDRESS.state} ${member.merge_fields.ADDRESS.zip}`
// Convert address to H3 cell const h3Cell = await this.env.GEOCODING_SERVICE.addressToH3(address)
if (!aggregated[h3Cell]) aggregated[h3Cell] = {} aggregated[h3Cell].subscribers = (aggregated[h3Cell].subscribers || 0) + 1 aggregated[h3Cell].opens = (aggregated[h3Cell].opens || 0) + (member.stats?.avg_open_rate || 0) aggregated[h3Cell].clicks = (aggregated[h3Cell].clicks || 0) + (member.stats?.avg_click_rate || 0) } }
return aggregated }
/** * Stripe specific data processing */ private async processStripeData(rawData: any): Promise<Record<string, Record<string, number>>> { const aggregated: Record<string, Record<string, number>> = {}
// Process Stripe customer/charge data with billing addresses for (const charge of rawData.charges || []) { if (charge.billing_details?.address) { const address = `${charge.billing_details.address.line1}, ${charge.billing_details.address.city}, ${charge.billing_details.address.state} ${charge.billing_details.address.postal_code}`
// Convert address to H3 cell const h3Cell = await this.env.GEOCODING_SERVICE.addressToH3(address)
if (!aggregated[h3Cell]) aggregated[h3Cell] = {} aggregated[h3Cell].revenue = (aggregated[h3Cell].revenue || 0) + charge.amount / 100 // Convert cents to dollars aggregated[h3Cell].transactions = (aggregated[h3Cell].transactions || 0) + 1 } }
return aggregated }
/** * Default mock data processing for testing and unsupported platforms */ private async processDefaultMockData( rawData: any ): Promise<Record<string, Record<string, number>>> { // Return mock data to demonstrate the structure // In a real implementation, this would: // 1. Extract addresses from rawData based on platform format // 2. Use GeoCodingService to convert to H3 cells // 3. Aggregate metrics by cell
console.warn(`processRawDataToH3 not implemented for connection ${this.#connectionId}`)
return { '8a2a1072b59ffff': { conversions: 42, impressions: 1500, clicks: 89, spend: 125.5, }, '8a2a1072b5affff': { conversions: 18, impressions: 890, clicks: 45, spend: 67.25, }, } }
/** * Fetch data from third-party API using stored auth credentials * This method uses composition to handle different platforms */ private async fetchFromThirdParty(authData: any): Promise<any> { switch (authData.platform) { case 'google_ads': return this.fetchFromGoogleAds(authData) case 'mailchimp': return this.fetchFromMailchimp(authData) case 'stripe': return this.fetchFromStripe(authData) default: console.warn(`fetchFromThirdParty not implemented for platform ${authData.platform}`) return this.fetchMockData(authData) } }
/** * Google Ads API integration */ private async fetchFromGoogleAds(authData: any): Promise<any> { // Initialize Google Ads API client const adsApi = new GoogleAds({ client_id: authData.auth_data.client_id, client_secret: authData.auth_data.client_secret, refresh_token: authData.auth_data.refresh_token, })
const accountId = authData.platform_data.account_id
try { // Fetch campaign data const campaigns = await adsApi.campaigns.list({ customer_id: accountId })
// Fetch geographic performance report const locationReports = await adsApi.reports.generate({ customer_id: accountId, query: ` SELECT campaign.id, geographic_view.location_type, geographic_view.postal_code, metrics.impressions, metrics.conversions, metrics.clicks, metrics.cost_micros FROM geographic_view WHERE segments.date DURING TODAY AND geographic_view.location_type = 'POSTAL_CODE' `, })
return { platform: 'google_ads', account_id: accountId, campaigns: campaigns.data, location_data: locationReports.data, fetched_at: new Date().toISOString(), } } catch (error) { console.error(`Google Ads API error for connection ${this.#connectionId}:`, error) throw new Error(`Google Ads fetch failed: ${(error as Error).message}`) } }
/** * Mailchimp API integration */ private async fetchFromMailchimp(authData: any): Promise<any> { const mailchimp = new MailchimpApi(authData.auth_data.api_key) const serverPrefix = authData.auth_data.server_prefix
try { // Fetch all lists const lists = await mailchimp.lists.getAllLists()
// Fetch member data with addresses for all lists const allMembers = [] for (const list of lists.lists) { const members = await mailchimp.lists.getListMembersInfo(list.id, { fields: ['members.email_address', 'members.merge_fields', 'members.stats'], count: 1000, // Adjust based on needs }) allMembers.push(...members.members) }
return { platform: 'mailchimp', server_prefix: serverPrefix, lists: lists.lists, members: allMembers, fetched_at: new Date().toISOString(), } } catch (error) { console.error(`Mailchimp API error for connection ${this.#connectionId}:`, error) throw new Error(`Mailchimp fetch failed: ${(error as Error).message}`) } }
/** * Stripe API integration */ private async fetchFromStripe(authData: any): Promise<any> { const stripe = new Stripe(authData.auth_data.secret_key, { apiVersion: '2023-10-16', })
try { // Fetch recent charges with billing addresses const charges = await stripe.charges.list({ limit: 1000, created: { gte: Math.floor((Date.now() - 24 * 60 * 60 * 1000) / 1000), // Last 24 hours }, expand: ['data.customer'], })
return { platform: 'stripe', charges: charges.data, fetched_at: new Date().toISOString(), } } catch (error) { console.error(`Stripe API error for connection ${this.#connectionId}:`, error) throw new Error(`Stripe fetch failed: ${(error as Error).message}`) } }
/** * Mock data for testing and unsupported platforms */ private async fetchMockData(authData: any): Promise<any> { // For now, return mock data to demonstrate the flow // In a real implementation, this would: // 1. Use authData.auth_data for API credentials // 2. Make authenticated requests to platform APIs // 3. Handle pagination and rate limiting // 4. Return structured data for processing
console.warn(`fetchFromThirdParty not implemented for connection ${this.#connectionId}`)
return { platform: 'google_ads', account_id: authData.platform_data?.account_id || 'demo-account', campaigns: [ { id: 'campaign-1', name: 'Demo Campaign', locations: [ { address: '123 Main St, San Francisco, CA', conversions: 10, impressions: 500 }, { address: '456 Oak Ave, San Jose, CA', conversions: 8, impressions: 350 }, ], }, ], } }
/** * Get WorkspaceDO instance for this connection */ private getWorkspaceDO() { // Extract workspace ID from connection identifier // Assuming format "workspaceId:connectionId" const workspaceId = this.#connectionId.split(':')[0] const doId = this.env.WORKSPACE_DO.idFromName(workspaceId) return this.env.WORKSPACE_DO.get(doId) }
/** * Trigger data source version creation workflows */ private async triggerDataSourceUpdates(dataSourceIds: string[]): Promise<void> { for (const dataSourceId of dataSourceIds) { try { await this.env.CREATE_DATA_SOURCE_VERSION_WORKFLOW.create({ id: `${dataSourceId}-${Date.now()}`, params: { dataSourceId, connectionId: this.#connectionId }, }) console.log(`Triggered workflow for data source: ${dataSourceId}`) } catch (error) { console.error(`Failed to trigger workflow for data source ${dataSourceId}:`, error) } } }}
export default handler(ConnectionActor)
Platform-Specific Benefits
Section titled “Platform-Specific Benefits”The composition approach provides several advantages:
- Single Deployment: Only one Durable Object binding needed in wrangler.jsonc
- Dynamic Platform Support: Platform type determined at runtime from connection metadata (see the dispatch sketch after this list)
- Shared Infrastructure: Common alarm scheduling, database operations, and error handling
- Easy Extension: Adding new platforms requires only new methods, no deployment changes
- Unified Monitoring: Single metrics namespace with platform tags
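The runtime dispatch described above can live in a single method on the actor. If fetchFromThirdParty is not already defined elsewhere in the codebase, a minimal sketch (assuming the connection's auth/config data carries a `platform` string, and reusing the platform-specific fetch methods shown earlier) could look like this:

```typescript
// Sketch only: route a fetch to the right platform method at runtime.
// Assumes `authData.platform` identifies the connected platform; unknown
// platforms fall back to the mock fetcher shown above.
private async fetchFromThirdParty(authData: any): Promise<any> {
  switch (authData.platform) {
    case 'google_ads':
      return this.fetchFromGoogleAds(authData)
    case 'mailchimp':
      return this.fetchFromMailchimp(authData)
    case 'stripe':
      return this.fetchFromStripe(authData)
    default:
      return this.fetchMockData(authData)
  }
}
```

Because the platform is resolved from connection data rather than from the binding, supporting a new platform only requires another `case` and its fetch method.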
Workflow Implementation
Section titled “Workflow Implementation”The system uses multiple Cloudflare Workflows to orchestrate data processing and synchronization. These workflows provide built-in observability, retry logic, and durability for complex multi-step operations.
1. SyncConnectionWorkflow - Primary Data Sync Orchestrator
Section titled “1. SyncConnectionWorkflow - Primary Data Sync Orchestrator”⚠️ Note: This workflow is not yet implemented. The following is a proposed design for future development.
This workflow is triggered by Connection Actors and orchestrates the entire sync process with platform-specific steps.
File: apps/api/src/workflows/SyncConnectionWorkflow.ts
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers'
/** * Parameters for the sync workflow */export type SyncConnectionParams = { connectionId: number workspaceId: string platform: string triggeredBy: 'scheduled' | 'manual' | 'retry'}
/** * Main workflow for synchronizing data from third-party connections * Called by Connection Actors on their scheduled alarms */export class SyncConnectionWorkflow extends WorkflowEntrypoint<Env, SyncConnectionParams> { // Helper to get the Connection Actor stub private getConnectionActor(connectionId: number, workspaceId: string): DurableObjectStub { const actorId = `${workspaceId}:${connectionId}` const doId = this.env.CONNECTION_ACTORS.idFromName(actorId) return this.env.CONNECTION_ACTORS.get(doId) }
// Helper to get the WorkspaceDO stub private getWorkspaceDO(workspaceId: string): DurableObjectStub { const id = this.env.WORKSPACE_DO.idFromName(workspaceId) return this.env.WORKSPACE_DO.get(id) }
async run(event: WorkflowEvent<SyncConnectionParams>, step: WorkflowStep) { const { connectionId, workspaceId, platform, triggeredBy } = event.payload
console.log(`Starting sync workflow for connection ${connectionId} (${platform})`)
// Get actor and workspace stubs const actor = this.getConnectionActor(connectionId, workspaceId) const workspace = this.getWorkspaceDO(workspaceId)
try { // Step 1: Validate connection and get auth data const authData = await step.do( 'validate-connection', { retries: { limit: 2, delay: '5 seconds' }, timeout: '30 seconds', }, async () => { return await workspace.getConnectionAuth(connectionId) } )
// Step 2: Fetch data from platform (platform-specific with longer timeout for API calls) const rawData = await step.do( 'fetch-platform-data', { retries: { limit: 3, delay: '30 seconds', backoff: 'exponential' }, timeout: '5 minutes', }, async () => { // The actor handles platform-specific logic return await actor.fetchFromThirdParty(authData) } )
// Step 3: Process and store snapshot data in Connection Actor const recordsProcessed = await step.do( 'store-snapshot', { retries: { limit: 2, delay: '10 seconds' }, timeout: '2 minutes', }, async () => { return await actor.processAndStoreSnapshot(rawData) } )
console.log(`Processed ${recordsProcessed} records for connection ${connectionId}`)
// Step 4: Get associated data sources for this connection const dataSourceIds = await step.do('get-data-sources', async () => { return await workspace.getDataSourcesForConnection(connectionId) })
// Step 5: Trigger data source version creation for each data source if (dataSourceIds.length > 0) { await step.do( 'trigger-data-source-updates', { retries: { limit: 2, delay: '5 seconds' }, }, async () => { const promises = dataSourceIds.map((dataSourceId) => this.env.CREATE_DATA_SOURCE_VERSION_WORKFLOW.create({ id: `${dataSourceId}-${Date.now()}`, params: { dataSourceId, workspaceId, connectionId, versionId: null, // Will be created by the workflow r2Path: null, // Live data doesn't use R2 }, }) ) return await Promise.all(promises) } )
console.log(`Triggered ${dataSourceIds.length} data source version workflows`) }
// Step 6: Update connection sync status await step.do('update-sync-status', async () => { await workspace.updateConnectionSyncStatus(connectionId, 'active') await actor.updateSyncSuccess(recordsProcessed) })
// Step 7: Schedule next sync (handled by actor's alarm system) await step.do('schedule-next-sync', async () => { const syncFrequency = await actor.getSyncFrequency() await actor.scheduleNextSync(syncFrequency) })
console.log(`Sync workflow completed successfully for connection ${connectionId}`) } catch (error) { console.error(`Sync workflow failed for connection ${connectionId}:`, error)
// Log detailed error information await step.do('handle-sync-error', async () => { await workspace.updateConnectionSyncStatus(connectionId, 'error') await actor.logSyncError(error as Error) })
// Schedule retry with exponential backoff const retryDelay = triggeredBy === 'retry' ? '4 hours' : '1 hour' await step.sleep('retry-delay', retryDelay)
// Trigger retry workflow await step.do('schedule-retry', async () => { return await this.env.SYNC_CONNECTION_WORKFLOW.create({ id: `${connectionId}-retry-${Date.now()}`, params: { ...event.payload, triggeredBy: 'retry', }, }) })
// Re-throw to mark workflow as failed throw error } }}
2. ProcessLiveDataSourceWorkflow - Connection Data Processing
Section titled “2. ProcessLiveDataSourceWorkflow - Connection Data Processing”⚠️ Note: This workflow is not yet implemented. The following is a proposed design for future development.
This workflow processes snapshot data from Connection Actors and creates new data source versions.
File: apps/api/src/workflows/ProcessLiveDataSourceWorkflow.ts
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers'
/** * Parameters for processing live data source */export type ProcessLiveDataSourceParams = { dataSourceId: number workspaceId: string connectionId: number}
/** * Workflow for processing live data from Connection Actors into versioned data sources */export class ProcessLiveDataSourceWorkflow extends WorkflowEntrypoint< Env, ProcessLiveDataSourceParams> { // Helper to get the Connection Actor stub private getConnectionActor(connectionId: number, workspaceId: string): DurableObjectStub { const actorId = `${workspaceId}:${connectionId}` const doId = this.env.CONNECTION_ACTORS.idFromName(actorId) return this.env.CONNECTION_ACTORS.get(doId) }
// Helper to get the WorkspaceDO stub private getWorkspaceDO(workspaceId: string): DurableObjectStub { const id = this.env.WORKSPACE_DO.idFromName(workspaceId) return this.env.WORKSPACE_DO.get(id) }
async run(event: WorkflowEvent<ProcessLiveDataSourceParams>, step: WorkflowStep) { const { dataSourceId, workspaceId, connectionId } = event.payload
console.log(`Processing live data source ${dataSourceId} from connection ${connectionId}`)
const workspace = this.getWorkspaceDO(workspaceId) const actor = this.getConnectionActor(connectionId, workspaceId)
try { // Step 1: Create new data source version const version = await step.do('create-version', async () => { return await workspace.createDataSourceVersion(dataSourceId, { status: 'processing', workflow_id: event.workflowId, }) })
console.log(`Created version ${version.id} for data source ${dataSourceId}`)
// Step 2: Get latest processed data from Connection Actor const aggregatedData = await step.do( 'get-snapshot-data', { retries: { limit: 3, delay: '10 seconds' }, timeout: '1 minute', }, async () => { return await actor.getLatestSnapshot() } )
// Step 3: Store aggregated data in KV with version-specific key await step.do( 'store-in-kv', { retries: { limit: 3, delay: '5 seconds' }, timeout: '30 seconds', }, async () => { const kvKey = `datasource:${dataSourceId}/version/${version.id}` await this.env.DATA_SOURCES_KV.put(kvKey, JSON.stringify(aggregatedData)) console.log(`Stored data in KV with key: ${kvKey}`) } )
// Step 4: Update version status and metadata await step.do('update-version-status', async () => { const cellCount = Object.keys(aggregatedData).length return await workspace.updateDataSourceVersion(version.id, { status: 'ready', total_cells: cellCount, completed_at: new Date().toISOString(), metadata: { connection_id: connectionId, platform: await actor.getPlatform(), sync_timestamp: new Date().toISOString(), }, }) })
// Step 5: Update parent data source to point to new version await step.do('update-current-version', async () => { return await workspace.updateDataSource(dataSourceId, { current_version_id: version.id, updated_at: new Date().toISOString(), }) })
// Step 6: Notify grids that depend on this data source const gridIds = await step.do('get-affected-grids', async () => { return await workspace.getGridIdsForDataSource(dataSourceId) })
if (gridIds.length > 0) { await step.do( 'update-grid-versions', { retries: { limit: 2, delay: '5 seconds' }, }, async () => { // For each grid, update its pending version with the new data source version const promises = gridIds.map(async (gridId) => { const gridDO = this.env.GRID_DO.get(this.env.GRID_DO.idFromName(gridId)) return await gridDO.updatePendingVersion({ dataSourceId, versionId: version.id, }) }) return await Promise.all(promises) } )
console.log(`Updated ${gridIds.length} grid versions with new data source version`) }
console.log(`Successfully processed live data source ${dataSourceId}`) } catch (error) { console.error(`Failed to process live data source ${dataSourceId}:`, error)
// Update version status to failed await step.do('mark-version-failed', async () => { const version = await workspace.getDataSourceVersion(dataSourceId) if (version) { await workspace.updateDataSourceVersion(version.id, { status: 'failed', completed_at: new Date().toISOString(), metadata: { error: (error as Error).message, failed_at: new Date().toISOString(), }, }) } })
// Re-throw to mark workflow as failed throw error } }}
3. Enhanced CreateDataSourceVersionWorkflow - Unified CSV & Live Data Processing
Section titled “3. Enhanced CreateDataSourceVersionWorkflow - Unified CSV & Live Data Processing”Updated to handle both CSV uploads and live data from connections.
File: apps/api/src/workflows/create-data-source-version.ts (Updated)
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers'import { parse } from 'csv-parse/sync'
import { Workspace } from '../durable-objects/WorkspaceDO'import { H3_RESOLUTION } from '../utils/h3-utils'
import type { Address } from 'honeygrid-types'
/** * Params supplied when the workflow is started. */export type CreateDataSourceVersionParams = { dataSourceId: number workspaceId: string versionId?: number | null // Pre-created version ID (for live data) r2Path?: string | null // R2 path for CSV files (null for live data) connectionId?: number | null // Connection ID for live data (null for CSV)}
/** * Enhanced workflow that processes both CSV uploads and live connection data */export class CreateDataSourceVersionWorkflow extends WorkflowEntrypoint< Env, CreateDataSourceVersionParams> { // Helper to get the WorkspaceDO stub private getWorkspaceDO(workspaceId: string): DurableObjectStub<Workspace> { const id = this.env.WORKSPACE_DO.idFromName(workspaceId) return this.env.WORKSPACE_DO.get(id) }
// Helper to get the Connection Actor stub (for live data) private getConnectionActor(connectionId: number, workspaceId: string): DurableObjectStub { const actorId = `${workspaceId}:${connectionId}` const doId = this.env.CONNECTION_ACTORS.idFromName(actorId) return this.env.CONNECTION_ACTORS.get(doId) }
async run(event: WorkflowEvent<CreateDataSourceVersionParams>, step: WorkflowStep) { const { dataSourceId, workspaceId, versionId, r2Path, connectionId } = event.payload
console.log(`Processing data source ${dataSourceId} - Type: ${r2Path ? 'CSV' : 'Live Data'}`)
const workspace = this.getWorkspaceDO(workspaceId)
// Determine processing type const isLiveData = !r2Path && connectionId const isCsvUpload = !!r2Path && !connectionId
if (!isLiveData && !isCsvUpload) { throw new Error('Must specify either r2Path (CSV) or connectionId (live data)') }
try { // Step 1: Create or get the version record const version = await step.do('setup-version', async () => { if (versionId) { // Version already created (live data flow) return await workspace.getDataSourceVersion(versionId) } else { // Create new version (CSV upload flow) return await workspace.createDataSourceVersion(dataSourceId, { status: 'processing', workflow_id: event.workflowId, file_path: r2Path, }) } })
console.log(`Processing version ${version.id} for data source ${dataSourceId}`)
let aggregatedData: Record<string, Record<string, number>>
if (isCsvUpload) { // CSV Processing Path aggregatedData = await step.do('process-csv', async () => { // Step 1a: Ingest data from R2 const object = await this.env.CUSTOM_DATA_SOURCES.get(r2Path!) if (!object) throw new Error(`File not found in R2: ${r2Path}`)
const csvText = await object.text() const rawData: Record<string, any>[] = parse(csvText, { columns: true, skip_empty_lines: true, })
console.log(`Parsed ${rawData.length} rows from CSV`)
// Step 1b: Process CSV data through geocoding and aggregation return await this.processCsvToH3(rawData) }) } else { // Live Data Processing Path aggregatedData = await step.do('get-live-data', async () => { const actor = this.getConnectionActor(connectionId!, workspaceId) return await actor.getLatestSnapshot() }) }
// Step 2: Store aggregated data in KV (common for both paths) await step.do('store-in-kv', async () => { const kvKey = `datasource:${dataSourceId}/version/${version.id}` await this.env.DATA_SOURCES_KV.put(kvKey, JSON.stringify(aggregatedData)) console.log(`Stored aggregated data in KV: ${kvKey}`) })
// Step 3: Update version metadata await step.do('update-version-metadata', async () => { const cellCount = Object.keys(aggregatedData).length return await workspace.updateDataSourceVersion(version.id, { status: 'ready', total_cells: cellCount, completed_at: new Date().toISOString(), metadata: { processing_type: isLiveData ? 'live_data' : 'csv_upload', connection_id: connectionId, cells_processed: cellCount, metrics_included: this.getMetricNames(aggregatedData), }, }) })
// Step 4: Update parent data source await step.do('update-data-source', async () => { return await workspace.updateDataSource(dataSourceId, { current_version_id: version.id, updated_at: new Date().toISOString(), }) })
// Step 5: Notify grids using this data source const gridIds = await step.do('get-affected-grids', async () => { return await workspace.getGridIdsForDataSource(dataSourceId) })
if (gridIds.length > 0) { await step.do('update-grids', async () => { const promises = gridIds.map(async (gridId) => { const gridDO = this.env.GRID_DO.get(this.env.GRID_DO.idFromName(gridId)) return await gridDO.dataSourceUpdated(dataSourceId, version.id) }) return await Promise.all(promises) })
console.log(`Updated ${gridIds.length} grids with new data source version`) }
console.log(`Successfully processed data source version ${version.id}`) } catch (error) { console.error(`Failed to process data source ${dataSourceId}:`, error)
// Update version status to failed await step.do('mark-failed', async () => { if (versionId) { await workspace.updateDataSourceVersion(versionId, { status: 'failed', completed_at: new Date().toISOString(), metadata: { error: (error as Error).message, failed_at: new Date().toISOString(), }, }) } })
// Re-throw to mark workflow as failed throw error } }
/** * Process CSV data into H3 aggregations (existing CSV logic) */ private async processCsvToH3( rawData: Record<string, any>[] ): Promise<Record<string, Record<string, number>>> { // Existing CSV processing logic from the original workflow // Convert addresses to H3 cells and aggregate metrics
// This would use the GeoCodingService to batch geocode addresses // and then aggregate metrics by H3 cell
// For now, return mock structure - implement full logic as needed return { '8a2a1072b59ffff': { customerCount: rawData.length, // ... other aggregated metrics }, } }
/** * Extract metric names from aggregated data */ private getMetricNames(aggregatedData: Record<string, Record<string, number>>): string[] { const metricSet = new Set<string>() Object.values(aggregatedData).forEach((cell) => { Object.keys(cell).forEach((metric) => metricSet.add(metric)) }) return Array.from(metricSet) }}
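The processCsvToH3 helper above still returns a mock structure. As a rough sketch of the intended geocode-and-aggregate step (not the final implementation), each row could be geocoded, mapped to an H3 cell, and have its numeric columns summed per cell. The `geocode` parameter below is a hypothetical stand-in for the GeoCodingService mentioned above, while `latLngToCell` and `H3_RESOLUTION` come from h3-js and the existing h3-utils module.

```typescript
import { latLngToCell } from 'h3-js'

import { H3_RESOLUTION } from '../utils/h3-utils'

type Geocoder = (address: string) => Promise<{ lat: number; lng: number } | null>

// Sketch: geocode each row, map it to an H3 cell, and sum numeric columns.
async function aggregateRowsToH3(
  rawData: Record<string, any>[],
  geocode: Geocoder
): Promise<Record<string, Record<string, number>>> {
  const aggregated: Record<string, Record<string, number>> = {}

  for (const row of rawData) {
    const coords = await geocode(row.address)
    if (!coords) continue // skip rows that fail to geocode

    const cell = latLngToCell(coords.lat, coords.lng, H3_RESOLUTION)
    const cellMetrics = (aggregated[cell] ??= {})

    // Sum every numeric column on the row into the cell's metrics
    for (const [key, value] of Object.entries(row)) {
      const numeric = Number(value)
      if (key === 'address' || Number.isNaN(numeric)) continue
      cellMetrics[key] = (cellMetrics[key] ?? 0) + numeric
    }
  }

  return aggregated
}
```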
Workflow Benefits
Section titled “Workflow Benefits”- Observability: Every data processing operation is visible in the Workflows dashboard (a status-check sketch follows this list)
- Reliability: Built-in retries with exponential backoff for transient failures
- Durability: Workflows survive worker restarts and continue from last completed step
- Error Handling: Comprehensive error logging and graceful failure recovery
- Scalability: Each workflow instance handles one data source, enabling parallel processing
- Version Control: Clear audit trail of when and how each data source version was created
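To make the observability point concrete, a workflow instance can be fetched and inspected through its binding from any Worker. The sketch below assumes the SYNC_CONNECTION_WORKFLOW binding used earlier and an instance ID recorded when the workflow was created.

```typescript
// Minimal sketch: look up a sync workflow instance and report its state.
async function checkSyncStatus(env: Env, instanceId: string) {
  const instance = await env.SYNC_CONNECTION_WORKFLOW.get(instanceId)
  const status = await instance.status()
  // status contains the current state (e.g. running, complete, errored)
  console.log(`Sync workflow ${instanceId}:`, status)
  return status
}
```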
Workflow Integration Points
Section titled “Workflow Integration Points”- Connection Actors → SyncConnectionWorkflow → CreateDataSourceVersionWorkflow
- API Routes → CreateDataSourceVersionWorkflow (for CSV uploads; see the sketch after this list)
- All Workflows → GridDO updates for downstream grid version management
- Error cases → WorkspaceDO status updates and retry scheduling
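As a sketch of the API-route integration point (not the actual upload route), a handler could start the enhanced workflow right after writing a CSV to R2. The helper below is illustrative, but its params follow the CreateDataSourceVersionParams type defined above.

```typescript
// Illustrative only: start CSV processing after the file is in R2.
// connectionId stays null because CSV uploads are not tied to a live connection.
async function startCsvProcessing(
  env: Env,
  dataSourceId: number,
  workspaceId: string,
  r2Path: string
): Promise<string> {
  const instance = await env.CREATE_DATA_SOURCE_VERSION_WORKFLOW.create({
    id: `${dataSourceId}-${Date.now()}`,
    params: { dataSourceId, workspaceId, r2Path, connectionId: null },
  })
  return instance.id
}
```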
File: apps/api/src/durable-objects/WorkspaceDO.ts - Add connection helper methods:
export class Workspace extends DurableObject<Env> { // ... existing methods ...
/** * Get connection authentication data for a Connection Actor */ async getConnectionAuth(connectionId: number): Promise<any> { const [connection] = await this.db .select() .from(Connections) .where(eq(Connections.id, connectionId)) .limit(1)
if (!connection) throw new Error(`Connection ${connectionId} not found`)
return { auth_data: connection.auth_data, platform_data: connection.platform_data, settings: connection.settings, } }
/** * Get Connection Actor instance for a connection */ getConnectionActor(connectionId: number) { const actorId = `${this.#id}:${connectionId}` const doId = this.env.CONNECTION_ACTORS.idFromName(actorId) return this.env.CONNECTION_ACTORS.get(doId) }
/** * Initialize a new connection with its Actor */ async initializeConnectionActor(connectionId: number): Promise<void> { const actor = this.getConnectionActor(connectionId) // Initialize connection config and schedule first sync await actor.performDailySync() }}
Benefits of Connection Actor Approach
Section titled “Benefits of Connection Actor Approach”- Scheduled Data Fetching: Built-in alarm system for reliable daily syncing
- Historical Data Storage: SQLite database for daily snapshots and aggregation queries
- Isolated State Management: Each connection has its own persistent state
- RPC Integration: Clean integration with WorkspaceDO and processing workflows
- Resilient Error Handling: Automatic retry scheduling on failures
- Aggregation Queries: Support for “12 month conversions” style queries per H3 cell (see the sketch after this list)
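To illustrate the aggregation-query capability, the getHistoricalAggregation method used in the Usage Examples at the end of this document could be implemented as a range sum over the SyncSnapshots table defined below. This is a sketch under that assumption, not the final method; the schema import path and the `db` handle shape may differ.

```typescript
import { and, eq, gte, lte, sql } from 'drizzle-orm'

// Schema location per this document; the exact import path may differ.
import { SyncSnapshots } from 'honeygrid-types/schema/ConnectionActorSchema'

/**
 * Sketch: sum a metric for one H3 cell across a snapshot date range.
 * `db` is the actor's Drizzle instance over its SQLite storage.
 */
async function getHistoricalAggregation(
  db: any,
  h3Cell: string,
  metricName: string,
  startDate: string,
  endDate: string
): Promise<number> {
  const [row] = await db
    .select({
      total: sql<number>`coalesce(sum(${SyncSnapshots.metric_value}), 0)`,
    })
    .from(SyncSnapshots)
    .where(
      and(
        eq(SyncSnapshots.h3_cell, h3Cell),
        eq(SyncSnapshots.metric_name, metricName),
        gte(SyncSnapshots.snapshot_date, startDate),
        lte(SyncSnapshots.snapshot_date, endDate)
      )
    )

  return row?.total ?? 0
}
```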
Setup Requirements
Section titled “Setup Requirements”Install Cloudflare Actors:
pnpm add @cloudflare/actors
Generate Connection Actor Migrations:
cd apps/api
pnpm drizzle-kit generate --config=connectionActor.drizzle.config.ts
pnpm drizzle-kit push --config=connectionActor.drizzle.config.ts
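The connectionActor.drizzle.config.ts referenced by these commands is not shown in this document. A minimal sketch, assuming drizzle-kit's Durable Object SQLite driver and the schema and migration paths described under Schema Management below, might be:

```typescript
import { defineConfig } from 'drizzle-kit'

// Sketch only: driver and paths are assumptions based on this document.
export default defineConfig({
  dialect: 'sqlite',
  driver: 'durable-sqlite',
  schema: '../../packages/honeygrid-types/schema/ConnectionActorSchema.ts',
  out: './ConnectionActordrizzleMigrations',
})
```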
{ "durable_objects": { "bindings": [ { "name": "CONNECTION_ACTORS", "class_name": "ConnectionActor", }, // ... existing bindings ], }, "migrations": [ { "tag": "v2", "new_sqlite_classes": ["ConnectionActor"], }, ],}Schema Management:
The Connection Actor uses Drizzle ORM for schema management with migrations stored in ConnectionActordrizzleMigrations/. The schema is defined in packages/honeygrid-types/schema/ConnectionActorSchema.ts and includes:
- SyncSnapshots: Historical data aggregated by H3 cell and date
- SyncHistory: Sync attempt tracking with success/failure status
- ConnectionConfig: Platform settings and sync configuration
- ErrorLogs: Detailed error logging for debugging sync issues
The key table definitions:
/** * Periodic snapshots of metrics aggregated by H3 cell */export const SyncSnapshots = sqliteTable( 'sync_snapshots', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), h3_cell: text('h3_cell').notNull(), metric_name: text('metric_name').notNull(), metric_value: integer('metric_value').notNull(), snapshot_date: text('snapshot_date').notNull(), raw_data: text('raw_data', { mode: 'json' }), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(), }, (t) => [unique('uniq_snapshot').on(t.h3_cell, t.metric_name, t.snapshot_date)])
/** Sync history records */export const SyncHistory = sqliteTable('sync_history', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), sync_date: text('sync_date').notNull(), status: text('status', { enum: ['success', 'failed', 'in_progress'] }).notNull(), records_processed: integer('records_processed').default(0).notNull(), error_message: text('error_message'), started_at: text('started_at') .$defaultFn(() => new Date().toISOString()) .notNull(), completed_at: text('completed_at'), sync_duration_ms: integer('sync_duration_ms'), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(),})
/** Detailed error logs */export const ErrorLogs = sqliteTable('error_logs', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), sync_history_id: integer('sync_history_id') .notNull() .references(() => SyncHistory.id), error_type: text('error_type', { enum: ['sync_failed', 'auth_failed', 'api_error', 'processing_error', 'network_error'], }).notNull(), error_message: text('error_message').notNull(), error_details: text('error_details', { mode: 'json' }), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(),})
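As an illustration of how the processAndStoreSnapshot step referenced in the sync workflow could persist daily metrics, the sketch below upserts one row per (h3_cell, metric_name, snapshot_date) using the unique constraint defined on SyncSnapshots. The `db` handle, import path, and function shape are assumptions; the aggregated-data shape matches getLatestSnapshot's output shown in the Usage Examples.

```typescript
// Schema location per this document; the exact import path may differ.
import { SyncSnapshots } from 'honeygrid-types/schema/ConnectionActorSchema'

/**
 * Sketch: persist one day's aggregated metrics, keyed by H3 cell.
 * `aggregated` has the shape { [h3Cell]: { [metricName]: value } }.
 */
async function storeSnapshot(
  db: any,
  aggregated: Record<string, Record<string, number>>,
  snapshotDate: string
): Promise<number> {
  let recordsProcessed = 0

  for (const [h3Cell, metrics] of Object.entries(aggregated)) {
    for (const [metricName, metricValue] of Object.entries(metrics)) {
      await db
        .insert(SyncSnapshots)
        .values({
          h3_cell: h3Cell,
          metric_name: metricName,
          metric_value: metricValue,
          snapshot_date: snapshotDate,
        })
        // Re-running a sync for the same day overwrites the prior value
        .onConflictDoUpdate({
          target: [SyncSnapshots.h3_cell, SyncSnapshots.metric_name, SyncSnapshots.snapshot_date],
          set: { metric_value: metricValue },
        })
      recordsProcessed++
    }
  }

  return recordsProcessed
}
```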
HoneyGridD1 Schema
Section titled “HoneyGridD1 Schema”The global D1 database schema is defined in packages/honeygrid-types/schema/HoneyGridD1Schema.ts and manages workspace-level data:
export const Workspaces = table( 'Workspaces', { account_id: text(), background_image: text(), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(), id: integer('id').primaryKey({ autoIncrement: true }).notNull(), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), name: text().notNull(), status: text().notNull(), campaign_count: integer().default(0), connection_count: integer().default(0), grid_count: integer().default(0), landing_page_count: integer().default(0), location_count: integer().default(0), }, (t) => [check('status_check', sql`${t.status} IN ('active', 'archived')`)])
export const WorkspacePermissions = table( 'WorkspacePermissions', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), workspace_id: integer('workspace_id') .notNull() .references(() => Workspaces.id), user_id: text(), email: text().notNull(), role: text().notNull(), invitation_status: text('invitation_status').notNull().default('pending'), created_at: text('created_at') .$defaultFn(() => new Date().toISOString()) .notNull(), }, (t) => [ check('role_check', sql`${t.role} IN ('owner', 'editor', 'viewer')`), index('workspace_permissions_email_idx').on(t.email), index('workspace_permissions_workspace_email_idx').on(t.workspace_id, t.email), ])
export const Users = table('Users', { id: integer('id').primaryKey({ autoIncrement: true }), uuid: text('uuid') .unique() .$defaultFn(() => generateUuid()) .notNull(), first_name: text('first_name'), last_name: text('last_name'), email: text().notNull().unique(), created_date: text('created_date').$defaultFn(() => new Date().toISOString()),})
Usage Examples
Section titled “Usage Examples”Historical Aggregation Query:
// Get 12 months of conversions for a specific H3 cell
// Actor ID follows the "workspaceId:connectionId" convention used above
const connectionActor = env.CONNECTION_ACTORS.get(
  env.CONNECTION_ACTORS.idFromName(`${workspaceId}:${connectionId}`)
)
const totalConversions = await connectionActor.getHistoricalAggregation(
  '8a2a1072b59ffff', // H3 cell
  'conversions', // metric name
  '2023-01-01', // start date
  '2023-12-31' // end date
)
// Returns: 1,247 (total conversions for that cell over 12 months)
Connection Status Check:
const status = await connectionActor.getConnectionStatus()
// Returns: {
//   connection_id: "conn_123",
//   last_snapshot: "2024-01-15",
//   status: "active",
//   total_snapshots: 365
// }
Latest Data for Workflow Processing:
const latestData = await connectionActor.getLatestSnapshot()
// Returns: {
//   "8a2a1072b59ffff": { "conversions": 42, "impressions": 1500 },
//   "8a2a1072b5affff": { "conversions": 18, "impressions": 890 }
// }