Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Data Sync Tools

This document provides a collection of specialized data synchronization tools. Instead of one monolithic sync system, these modular tools can be combined as needed.

Overview

Data synchronization is split into focused, reusable tools:

| Tool (file) | Purpose | Scope |
|-------------|---------|-------|
| `crm-sync.bas` | CRM to/from internal database | Bidirectional customer data |
| `erp-sync.bas` | ERP system integration | Orders, inventory, accounting |
| `inventory-sync.bas` | Real-time inventory updates | Stock levels across systems |
| `user-sync.bas` | User/employee directory sync | HR systems, Active Directory |
| `conflict-resolver.bas` | Handle sync conflicts | Automated or manual resolution |
| `sync-monitor.bas` | Monitor sync health | Alerts and dashboards |

Tool 1: CRM Sync

Bidirectional synchronization with CRM systems (Salesforce, HubSpot, etc.).

' crm-sync.bas
' Bidirectional CRM synchronization tool - inbound half.
'
' Flow: verify the webhook signature, log the event, park the change as a
' conflict when the local row is newer, map the CRM payload onto the local
' schema, apply it, then record the sync status.
' Every exit path fills `result` before the script ends.

WEBHOOK "crm-inbound"

' Fields read from the webhook payload
event = body.event
record_type = body.type
record_id = body.id
data = body.data
source_timestamp = body.timestamp

' Validate webhook signature
signature = headers.x_webhook_signature
secret = GET BOT MEMORY "crm_webhook_secret"
IF NOT VERIFY_SIGNATURE(body, signature, secret) THEN
    WITH result = NEW OBJECT
        .status = 401
        .error = "Invalid signature"
    END WITH
    EXIT
END IF

' One timestamp for the whole run. crm-outbound.bas skips rows whose
' synced_at equals updated_at, so both fields must receive the same value;
' two separate NOW() calls are not guaranteed to match.
sync_time = NOW()

' Log incoming sync event
WITH sync_event = NEW OBJECT
    .direction = "inbound"
    .source = "crm"
    .event = event
    .record_type = record_type
    .record_id = record_id
    .timestamp = source_timestamp
    .received_at = sync_time
END WITH
INSERT "sync_events", sync_event

' Check for conflicts before processing
existing = FIND "local_data", "external_id=" + record_id
IF existing.updated_at > source_timestamp THEN
    ' Local data is newer - create conflict record.
    ' record_type and source are stored because conflict-resolver.bas
    ' reads conflict.record_type and conflict.source.
    WITH conflict = NEW OBJECT
        .record_id = record_id
        .record_type = record_type
        .source = "crm"
        .local_timestamp = existing.updated_at
        .remote_timestamp = source_timestamp
        .local_data = existing
        .remote_data = data
        .status = "pending"
        .created_at = sync_time
    END WITH
    INSERT "sync_conflicts", conflict
    
    ' NOTE(review): assumes INSERT fills conflict.id - confirm
    WITH result = NEW OBJECT
        .status = "conflict"
        .conflict_id = conflict.id
    END WITH
    EXIT
END IF

' Transform CRM data to local format
SELECT CASE record_type
    CASE "contact"
        WITH local_record = NEW OBJECT
            .external_id = record_id
            .external_source = "crm"
            .first_name = data.firstName
            .last_name = data.lastName
            .email = data.email
            .phone = data.phone
            .company = data.account.name
            .title = data.title
            .source = data.leadSource
            .status = data.status
            .updated_at = sync_time
            .synced_at = sync_time
        END WITH
        table_name = "contacts"
        
    CASE "account"
        WITH local_record = NEW OBJECT
            .external_id = record_id
            .external_source = "crm"
            .company_name = data.name
            .industry = data.industry
            .website = data.website
            .annual_revenue = data.annualRevenue
            .employee_count = data.numberOfEmployees
            .billing_address = data.billingAddress
            .updated_at = sync_time
            .synced_at = sync_time
        END WITH
        table_name = "accounts"
        
    CASE "opportunity"
        WITH local_record = NEW OBJECT
            .external_id = record_id
            .external_source = "crm"
            .name = data.name
            .account_id = data.accountId
            .amount = data.amount
            .stage = data.stageName
            .probability = data.probability
            .close_date = data.closeDate
            .updated_at = sync_time
            .synced_at = sync_time
        END WITH
        table_name = "opportunities"
        
    CASE ELSE
        WITH result = NEW OBJECT
            .status = 400
            .error = "Unknown record type: " + record_type
        END WITH
        EXIT
END SELECT

' Apply changes based on event type
SELECT CASE event
    CASE "created"
        INSERT table_name, local_record
        
    CASE "updated"
        UPDATE table_name, "external_id=" + record_id, local_record
        
    CASE "deleted"
        ' Soft delete: the row is kept and only marked as deleted
        WITH soft_delete = NEW OBJECT
            .deleted_at = sync_time
            .deleted_from = "crm"
        END WITH
        UPDATE table_name, "external_id=" + record_id, soft_delete
        
    CASE ELSE
        ' Nothing was applied, so do not fall through and report "synced"
        WITH result = NEW OBJECT
            .status = 400
            .error = "Unknown event: " + event
        END WITH
        EXIT
END SELECT

' Update sync status (keyed by record type and CRM record id)
WITH sync_status = NEW OBJECT
    .record_id = record_id
    .record_type = record_type
    .last_sync = sync_time
    .sync_direction = "inbound"
    .status = "success"
END WITH
SAVE "sync_status", record_type + "_" + record_id, sync_status

WITH result = NEW OBJECT
    .status = "synced"
    .record_id = record_id
    .direction = "inbound"
END WITH

CRM Outbound Sync

' crm-outbound.bas
' Push local changes to CRM
'
' Runs on INSERT and UPDATE of "contacts". Creates the contact in the CRM
' when the row has no external_id yet, otherwise updates the existing CRM
' record, then writes the sync markers back and logs the outbound event.

ON "contacts", "INSERT,UPDATE"

record = trigger.new_data

' Skip if this update came from CRM (prevent loops)
IF record.external_source = "crm" AND record.synced_at = record.updated_at THEN
    EXIT
END IF

' Check if record exists in CRM
IF record.external_id = "" THEN
    ' New record - create in CRM
    operation = "create"
    endpoint = "/api/contacts"
ELSE
    ' Existing record - update in CRM
    operation = "update"
    endpoint = "/api/contacts/" + record.external_id
END IF

' Transform to CRM format
WITH crm_data = NEW OBJECT
    .firstName = record.first_name
    .lastName = record.last_name
    .email = record.email
    .phone = record.phone
    .title = record.title
    .leadSource = record.source
END WITH

' Send to CRM
crm_api_key = GET BOT MEMORY "crm_api_key"
SET HEADER "Authorization", "Bearer " + crm_api_key
SET HEADER "Content-Type", "application/json"

IF operation = "create" THEN
    response = POST "https://api.crm.com" + endpoint, crm_data
    ' The CRM assigns the id; the local row does not have one yet
    external_id = response.id
ELSE
    response = PUT "https://api.crm.com" + endpoint, crm_data
    external_id = record.external_id
END IF

' Write the sync markers back to the local row.
' This UPDATE targets "contacts" and is itself an UPDATE on the watched
' table, so the row is written in exactly the shape the loop guard above
' skips: external_source = "crm" and synced_at = updated_at.
' An empty id (create returned none) is not stored, so the row stays
' eligible for another create attempt.
IF external_id <> "" THEN
    sync_time = NOW()
    WITH sync_update = NEW OBJECT
        .external_id = external_id
        .external_source = "crm"
        .updated_at = sync_time
        .synced_at = sync_time
    END WITH
    UPDATE "contacts", "id=" + record.id, sync_update
END IF

' Log outbound sync. external_id comes from the CRM response for creates;
' record.external_id is still empty in that case.
WITH sync_event = NEW OBJECT
    .direction = "outbound"
    .destination = "crm"
    .event = operation
    .record_type = "contact"
    .record_id = record.id
    .external_id = external_id
    .timestamp = NOW()
    .response_status = response.status
END WITH
INSERT "sync_events", sync_event

Tool 2: ERP Sync

Integration with ERP systems for orders, inventory, and accounting.

' erp-sync.bas
' ERP system synchronization tool
'
' Authenticates the "erp-webhook" request with a shared key and routes the
' payload to the handler SUB for its entity type.

WEBHOOK "erp-webhook"

event_type = body.eventType
entity = body.entity
entity_id = body.entityId
payload = body.payload

' Authenticate request.
' An unset key is rejected explicitly: otherwise an empty stored key would
' compare equal to a missing x_api_key header and let the request through.
api_key = headers.x_api_key
expected_key = GET BOT MEMORY "erp_webhook_key"
IF expected_key = "" OR api_key <> expected_key THEN
    WITH result = NEW OBJECT
        .status = 401
        .error = "Unauthorized"
    END WITH
    EXIT
END IF

' Route to appropriate handler
SELECT CASE entity
    CASE "salesOrder"
        CALL process_sales_order(event_type, entity_id, payload)
        
    CASE "purchaseOrder"
        CALL process_purchase_order(event_type, entity_id, payload)
        
    CASE "invoice"
        CALL process_invoice(event_type, entity_id, payload)
        
    CASE "inventory"
        ' Inventory updates carry no event type; entity_id is the SKU
        CALL process_inventory_update(entity_id, payload)
        
    CASE "shipment"
        CALL process_shipment(event_type, entity_id, payload)
        
    CASE ELSE
        ' No handler ran, so do not report the entity as processed
        WITH result = NEW OBJECT
            .status = 400
            .error = "Unknown entity: " + entity
        END WITH
        EXIT
END SELECT

WITH result = NEW OBJECT
    .status = "processed"
    .entity = entity
    .entity_id = entity_id
END WITH

' --- Sub-procedures ---

' Mirrors an ERP sales order into the local "orders" and "order_lines" tables.
'   event_type - "created" inserts the order and its line items and mails the
'                sales team; "updated" rewrites the order and mails the
'                customer when the status changed. Other values do nothing.
'   order_id   - ERP order identifier, stored as erp_order_id
'   data       - order payload from the ERP webhook
SUB process_sales_order(event_type, order_id, data)
    WITH order = NEW OBJECT
        .erp_order_id = order_id
        .order_number = data.orderNumber
        .customer_id = data.customerId
        .order_date = data.orderDate
        .ship_date = data.requestedShipDate
        .status = data.status
        .subtotal = data.subtotal
        .tax = data.taxAmount
        .shipping = data.shippingAmount
        .total = data.total
        .currency = data.currency
        .updated_at = NOW()
    END WITH
    
    IF event_type = "created" THEN
        INSERT "orders", order
        
        ' Create line items
        FOR EACH item IN data.lineItems
            WITH line = NEW OBJECT
                .order_id = order_id
                .sku = item.sku
                .description = item.description
                .quantity = item.quantity
                .unit_price = item.unitPrice
                .discount = item.discount
                .total = item.lineTotal
            END WITH
            INSERT "order_lines", line
        NEXT item
        
        ' Notify sales team
        SEND MAIL "sales@company.com", "New Order: " + data.orderNumber, "Order total: $" + data.total
        
    ELSE IF event_type = "updated" THEN
        ' Read the stored order BEFORE overwriting it. Reading it after the
        ' UPDATE would return the new status and the comparison below could
        ' never detect a change.
        old_order = FIND "orders", "erp_order_id=" + order_id
        
        UPDATE "orders", "erp_order_id=" + order_id, order
        
        ' Check for status changes
        IF old_order.status <> data.status THEN
            ' Notify customer of status change
            customer = FIND "customers", "id=" + data.customerId
            SEND MAIL customer.email, "Order Update: " + data.orderNumber, "Your order status is now: " + data.status
        END IF
    END IF
END SUB

' Stores the ERP stock figures for one SKU and raises a low-stock alert
' when the available quantity is below the product's reorder point.
'   sku  - product SKU, also used as the key of the "inventory" record
'   data - inventory payload from the ERP webhook
SUB process_inventory_update(sku, data)
    available_qty = data.qtyAvailable
    
    WITH stock_row = NEW OBJECT
        .sku = sku
        .quantity_on_hand = data.qtyOnHand
        .quantity_available = available_qty
        .quantity_reserved = data.qtyReserved
        .quantity_on_order = data.qtyOnOrder
        .warehouse = data.warehouse
        .bin_location = data.binLocation
        .last_count_date = data.lastCountDate
        .updated_at = NOW()
    END WITH
    SAVE "inventory", sku, stock_row
    
    ' Low-stock check against the product master data
    product = FIND "products", "sku=" + sku
    threshold = product.reorder_point
    IF available_qty < threshold THEN
        WITH low_stock = NEW OBJECT
            .sku = sku
            .product_name = product.name
            .current_qty = available_qty
            .reorder_point = threshold
            .reorder_qty = product.reorder_quantity
            .created_at = NOW()
        END WITH
        INSERT "stock_alerts", low_stock
        
        mail_subject = "Low Stock Alert: " + sku
        mail_body = "Product " + product.name + " is below reorder point. Current: " + available_qty + ", Reorder at: " + threshold
        SEND MAIL "purchasing@company.com", mail_subject, mail_body
    END IF
END SUB

' Mirrors an ERP shipment into the local "shipments" table.
'   event_type  - "created" inserts the shipment and mails the customer;
'                 "updated" rewrites it and marks the order delivered when
'                 the shipment status is "delivered"
'   shipment_id - ERP shipment identifier, stored as erp_shipment_id
'   data        - shipment payload from the ERP webhook
SUB process_shipment(event_type, shipment_id, data)
    WITH shipment_row = NEW OBJECT
        .erp_shipment_id = shipment_id
        .order_id = data.orderId
        .carrier = data.carrier
        .tracking_number = data.trackingNumber
        .ship_date = data.shipDate
        .estimated_delivery = data.estimatedDelivery
        .status = data.status
        .updated_at = NOW()
    END WITH
    
    IF event_type = "created" THEN
        INSERT "shipments", shipment_row
        
        ' Look up who to notify: shipment -> order -> customer
        parent_order = FIND "orders", "erp_order_id=" + data.orderId
        buyer = FIND "customers", "id=" + parent_order.customer_id
        
        tracking_url = "https://track.carrier.com/" + data.trackingNumber
        
        ' Build the notification text piece by piece
        mail_body = "Good news! Your order " + parent_order.order_number + " has shipped.\n\nTracking: " + data.trackingNumber
        mail_body = mail_body + "\nCarrier: " + data.carrier
        mail_body = mail_body + "\nEstimated Delivery: " + data.estimatedDelivery
        mail_body = mail_body + "\n\nTrack your package: " + tracking_url
        
        SEND MAIL buyer.email, "Your Order Has Shipped!", mail_body
        
    ELSE IF event_type = "updated" THEN
        UPDATE "shipments", "erp_shipment_id=" + shipment_id, shipment_row
        
        IF data.status = "delivered" THEN
            ' Propagate the delivery to the order itself
            WITH delivered_fields = NEW OBJECT
                .status = "delivered"
                .delivered_at = NOW()
            END WITH
            UPDATE "orders", "erp_order_id=" + data.orderId, delivered_fields
        END IF
    END IF
END SUB

Tool 3: Inventory Sync

Real-time inventory synchronization across multiple systems.

' inventory-sync.bas
' Real-time inventory synchronization
'
' Applies one stock adjustment received on the "inventory-update" webhook,
' logs it, pushes the new quantity to every other system and posts Slack
' alerts when stock crosses the reorder point or reaches zero.
' NOTE(review): unlike the CRM and ERP webhooks, this one performs no
' authentication of the caller - confirm whether that is intended.

WEBHOOK "inventory-update"

source = body.source
sku = body.sku
warehouse = body.warehouse
adjustment_type = body.type
quantity = body.quantity
reason = body.reason
reference = body.reference

' Get current inventory
current = FIND "inventory", "sku=" + sku + " AND warehouse=" + warehouse

' Calculate new quantity based on adjustment type.
' "adjustment" and "count" carry an absolute quantity; the other types
' carry a delta that is added to or subtracted from the quantity on hand.
SELECT CASE adjustment_type
    CASE "receipt"
        new_qty = current.quantity_on_hand + quantity
        
    CASE "shipment"
        new_qty = current.quantity_on_hand - quantity
        
    CASE "adjustment"
        new_qty = quantity
        
    CASE "transfer_out"
        new_qty = current.quantity_on_hand - quantity
        
    CASE "transfer_in"
        new_qty = current.quantity_on_hand + quantity
        
    CASE "count"
        new_qty = quantity
        
    CASE ELSE
        ' Without this branch new_qty would be unset and the code below
        ' would write and broadcast an undefined quantity
        WITH result = NEW OBJECT
            .status = 400
            .error = "Unknown adjustment type: " + adjustment_type
        END WITH
        EXIT
END SELECT

' Validate quantity
IF new_qty < 0 THEN
    WITH result = NEW OBJECT
        .status = 400
        .error = "Inventory cannot be negative"
        .current_qty = current.quantity_on_hand
        .attempted_qty = new_qty
    END WITH
    EXIT
END IF

' Update local inventory
WITH inv_update = NEW OBJECT
    .quantity_on_hand = new_qty
    .updated_at = NOW()
    .last_adjustment_type = adjustment_type
    .last_adjustment_source = source
END WITH
UPDATE "inventory", "sku=" + sku + " AND warehouse=" + warehouse, inv_update

' Log the transaction.
' quantity_change is the signed difference actually applied, so that
' quantity_before + quantity_change = quantity_after for every adjustment
' type (the raw payload quantity is an absolute value for "adjustment" and
' "count", and unsigned for shipments and outbound transfers).
WITH transaction = NEW OBJECT
    .sku = sku
    .warehouse = warehouse
    .adjustment_type = adjustment_type
    .quantity_before = current.quantity_on_hand
    .quantity_change = new_qty - current.quantity_on_hand
    .quantity_after = new_qty
    .reason = reason
    .reference = reference
    .source = source
    .created_at = NOW()
END WITH
INSERT "inventory_transactions", transaction

' Sync to other systems based on source (the originating system is skipped)
systems_to_sync = ["erp", "ecommerce", "pos", "wms"]

FOR EACH system IN systems_to_sync
    IF system <> source THEN
        CALL sync_inventory_to_system(system, sku, warehouse, new_qty)
    END IF
NEXT system

' Check for alerts
product = FIND "products", "sku=" + sku
IF new_qty <= product.reorder_point AND current.quantity_on_hand > product.reorder_point THEN
    ' Just crossed below reorder point
    WITH alert_msg = NEW OBJECT
        .text = "⚠️ *Low Stock Alert*\n\nSKU: " + sku + "\nProduct: " + product.name + "\nWarehouse: " + warehouse + "\nCurrent Qty: " + new_qty + "\nReorder Point: " + product.reorder_point
    END WITH
    POST "https://hooks.slack.com/services/xxx", alert_msg
END IF

IF new_qty = 0 THEN
    ' Out of stock
    WITH alert_msg = NEW OBJECT
        .text = "🚨 *Out of Stock*\n\nSKU: " + sku + "\nProduct: " + product.name + "\nWarehouse: " + warehouse
    END WITH
    POST "https://hooks.slack.com/services/xxx", alert_msg
    
    ' Disable on e-commerce
    CALL disable_product_ecommerce(sku)
END IF

WITH result = NEW OBJECT
    .status = "synced"
    .sku = sku
    .warehouse = warehouse
    .new_quantity = new_qty
END WITH

' --- Helper procedures ---

' Pushes one SKU's quantity to a single downstream system and records the
' push in "inventory_sync_log".
'   system    - "erp", "ecommerce", "pos" or "wms"; other values send nothing
'               but are still logged
'   sku       - product SKU
'   warehouse - warehouse / location code
'   qty       - quantity to publish
SUB sync_inventory_to_system(system, sku, warehouse, qty)
    SELECT CASE system
        CASE "erp"
            erp_key = GET BOT MEMORY "erp_api_key"
            SET HEADER "Authorization", "Bearer " + erp_key
            WITH payload = NEW OBJECT
                .sku = sku
                .warehouseCode = warehouse
                .qtyOnHand = qty
            END WITH
            PUT "https://erp.company.com/api/inventory/" + sku, payload
            
        CASE "ecommerce"
            ecom_key = GET BOT MEMORY "ecom_api_key"
            SET HEADER "Authorization", "Bearer " + ecom_key
            WITH payload = NEW OBJECT
                .inventory_quantity = qty
            END WITH
            PUT "https://api.shopify.com/products/" + sku + "/inventory", payload
            
        CASE "pos"
            ' The POS API takes its key in X-API-Key, not as a bearer token
            pos_key = GET BOT MEMORY "pos_api_key"
            SET HEADER "X-API-Key", pos_key
            WITH payload = NEW OBJECT
                .item_id = sku
                .quantity = qty
                .location_id = warehouse
            END WITH
            POST "https://api.pos.com/inventory/update", payload
            
        CASE "wms"
            wms_key = GET BOT MEMORY "wms_api_key"
            SET HEADER "Authorization", "Bearer " + wms_key
            WITH payload = NEW OBJECT
                .sku = sku
                .location = warehouse
                .qty = qty
            END WITH
            PUT "https://wms.company.com/api/inventory", payload
    END SELECT
    
    ' Record the push; written regardless of which branch ran
    WITH log_entry = NEW OBJECT
        .system = system
        .sku = sku
        .warehouse = warehouse
        .quantity = qty
        .synced_at = NOW()
    END WITH
    INSERT "inventory_sync_log", log_entry
END SUB

' Marks a product as unavailable on the e-commerce platform.
'   sku - product SKU, used as the product path segment of the request URL
SUB disable_product_ecommerce(sku)
    ecom_key = GET BOT MEMORY "ecom_api_key"
    SET HEADER "Authorization", "Bearer " + ecom_key
    
    WITH unavailable = NEW OBJECT
        .available = FALSE
        .inventory_policy = "deny"
    END WITH
    
    product_url = "https://api.shopify.com/products/" + sku
    PUT product_url, unavailable
END SUB

Tool 4: Conflict Resolver

Handle and resolve synchronization conflicts.

' conflict-resolver.bas
' Automated and manual sync conflict resolution
'
' Every 15 minutes: take up to 50 of the oldest pending conflicts, try the
' automatic rules on each one, and escalate those that cannot be resolved.

SET SCHEDULE "resolve-conflicts", "*/15 * * * *"

' Oldest pending conflicts first, capped at 50 per run
conflicts = FIND "sync_conflicts", "status=pending ORDER BY created_at ASC LIMIT 50"

FOR EACH entry IN conflicts
    resolution = CALL attempt_auto_resolve(entry)
    
    IF NOT resolution.resolved THEN
        ' No rule applied - hand over to a human, but only once
        IF entry.escalated_at = "" THEN
            CALL escalate_conflict(entry)
            
            WITH escalated_fields = NEW OBJECT
                .status = "escalated"
                .escalated_at = NOW()
            END WITH
            UPDATE "sync_conflicts", "id=" + entry.id, escalated_fields
        END IF
    ELSE
        ' A rule picked a winner - apply it, then close the conflict
        CALL apply_resolution(entry, resolution)
        
        WITH resolved_fields = NEW OBJECT
            .status = "resolved"
            .resolution_type = "automatic"
            .resolution_details = resolution.details
            .resolved_at = NOW()
        END WITH
        UPDATE "sync_conflicts", "id=" + entry.id, resolved_fields
    END IF
NEXT entry

' --- Functions ---

' Tries three rules in order to settle a conflict without human input.
'   conflict - row from "sync_conflicts"
' Returns an object with:
'   .resolved - TRUE when a rule applied
'   .winner   - "local", "remote" or "merge" ("" when unresolved)
'   .details  - human-readable explanation of the decision
FUNCTION attempt_auto_resolve(conflict)
    WITH result = NEW OBJECT
        .resolved = FALSE
        .winner = ""
        .details = ""
    END WITH
    
    ' Rule 1: timestamps more than 60 seconds apart - the newer side wins
    time_diff = DATEDIFF(conflict.local_timestamp, conflict.remote_timestamp, "second")
    gap_seconds = ABS(time_diff)
    IF gap_seconds > 60 THEN
        result.resolved = TRUE
        IF conflict.local_timestamp > conflict.remote_timestamp THEN
            result.winner = "local"
            result.details = "Local data is " + gap_seconds + " seconds newer"
        ELSE
            result.winner = "remote"
            result.details = "Remote data is " + gap_seconds + " seconds newer"
        END IF
        RETURN result
    END IF
    
    ' Rule 2: both sides changed different fields - merge them
    local_changes = CALL get_changed_fields(conflict.original_data, conflict.local_data)
    remote_changes = CALL get_changed_fields(conflict.original_data, conflict.remote_data)
    
    ' NOTE(review): INSTR looks like a substring search, so a field name
    ' contained in another one may count as a collision - confirm against
    ' what get_changed_fields returns
    fields_collide = FALSE
    FOR EACH changed_field IN local_changes
        IF INSTR(remote_changes, changed_field) > 0 THEN
            fields_collide = TRUE
            EXIT FOR
        END IF
    NEXT changed_field
    
    IF NOT fields_collide THEN
        result.resolved = TRUE
        result.winner = "merge"
        result.details = "Field-level merge: local changed [" + local_changes + "], remote changed [" + remote_changes + "]"
        RETURN result
    END IF
    
    ' Rule 3: a configured priority source decides; when one is configured
    ' the conflict is always resolved, one way or the other
    preferred = GET BOT MEMORY "sync_priority_source"
    IF preferred <> "" THEN
        result.resolved = TRUE
        IF conflict.source = preferred THEN
            result.winner = "remote"
            result.details = "Priority source rule: " + preferred + " wins"
        ELSE
            result.winner = "local"
            result.details = "Non-priority source: local wins"
        END IF
        RETURN result
    END IF
    
    ' No rule applied - leave .resolved FALSE so the caller escalates
    result.details = "Manual resolution required: same fields modified within 60 seconds"
    RETURN result
END FUNCTION

' Carries out the decision made by attempt_auto_resolve.
'   conflict   - row from "sync_conflicts"
'   resolution - object whose .winner is "local", "remote" or "merge";
'                any other value does nothing
' NOTE(review): conflict.record_type is used as the table name and
' conflict.record_id is matched against "id" - verify both against how the
' conflict rows are written (crm-sync.bas stores the CRM record id and
' addresses local rows by external_id).
SUB apply_resolution(conflict, resolution)
    winner = resolution.winner
    row_filter = "id=" + conflict.record_id
    
    IF winner = "local" THEN
        ' Local copy wins - push it to the remote system
        CALL sync_to_remote(conflict.record_type, conflict.record_id, conflict.local_data)
        
    ELSE IF winner = "remote" THEN
        ' Remote copy wins - overwrite the local row
        UPDATE conflict.record_type, row_filter, conflict.remote_data
        
    ELSE IF winner = "merge" THEN
        ' Combine both change sets, store locally, then push the result
        merged_data = CALL merge_records(conflict.original_data, conflict.local_data, conflict.remote_data)
        UPDATE conflict.record_type, row_filter, merged_data
        CALL sync_to_remote(conflict.record_type, conflict.record_id, merged_data)
    END IF
END SUB

' Asks a human to resolve a conflict: mails the data admin and posts a
' message to Slack, both linking to the conflict's admin page.
'   conflict - row from "sync_conflicts"
SUB escalate_conflict(conflict)
    ' Summaries of both versions, produced by the summarize_data helper
    local_summary = CALL summarize_data(conflict.local_data)
    remote_summary = CALL summarize_data(conflict.remote_data)
    
    WITH notification = NEW OBJECT
        .conflict_id = conflict.id
        .record_type = conflict.record_type
        .record_id = conflict.record_id
        .local_timestamp = conflict.local_timestamp
        .remote_timestamp = conflict.remote_timestamp
        .local_summary = local_summary
        .remote_summary = remote_summary
    END WITH
    
    ' Shared "<type> #<id>" label used in the mail and the Slack message
    record_label = conflict.record_type + " #" + conflict.record_id
    
    ' Mail to the data admin
    email_subject = "Sync Conflict: " + record_label
    email_body = "A sync conflict requires manual resolution.\n\n" + "Record: " + record_label + "\n"
    email_body = email_body + "Local changes: " + notification.local_summary + "\n" + "Remote changes: " + notification.remote_summary + "\n\n"
    email_body = email_body + "Please review at: https://admin.company.com/conflicts/" + conflict.id
    SEND MAIL "data-admin@company.com", email_subject, email_body
    
    ' Slack message with a review link
    WITH slack_msg = NEW OBJECT
        .text = "⚠️ *Sync Conflict Requires Review*\n\nRecord: " + record_label + "\n<https://admin.company.com/conflicts/" + conflict.id + "|Review Now>"
    END WITH
    POST "https://hooks.slack.com/services/xxx", slack_msg
END SUB

Tool 5: Sync Monitor

Monitor sync health and generate alerts.

' sync-monitor.bas
' Data sync health monitoring
'
' Every 5 minutes: measure sync lag and recent errors per integration,
' check the conflict and queue backlogs, store a health report, alert on
' degraded/unhealthy status and publish the dashboard record.
' Overall status only ever escalates: healthy -> degraded -> unhealthy.

SET SCHEDULE "sync-health-check", "*/5 * * * *"

' Check sync lag for each integration
integrations = ["crm", "erp", "ecommerce", "wms"]

WITH health_report = NEW OBJECT
    .timestamp = NOW()
    .status = "healthy"
    .issues = []
END WITH

' Per-integration results, published on the dashboard at the end
integration_statuses = []

FOR EACH integration IN integrations
    ' Get latest sync event
    latest = FIND "sync_events", "source=" + integration + " OR destination=" + integration + " ORDER BY timestamp DESC LIMIT 1"
    
    lag_minutes = DATEDIFF(latest.timestamp, NOW(), "minute")
    
    WITH integration_status = NEW OBJECT
        .name = integration
        .last_sync = latest.timestamp
        .lag_minutes = lag_minutes
        .status = "ok"
    END WITH
    
    ' Check for concerning lag (per-integration limit, 30 minutes if unset)
    max_lag = GET BOT MEMORY "max_sync_lag_" + integration
    IF max_lag = "" THEN max_lag = 30 END IF
    
    IF lag_minutes > max_lag THEN
        integration_status.status = "warning"
        ' Do not downgrade an "unhealthy" verdict set by an earlier integration
        IF health_report.status <> "unhealthy" THEN
            health_report.status = "degraded"
        END IF
        
        WITH issue = NEW OBJECT
            .integration = integration
            .type = "sync_lag"
            .message = integration + " sync lag: " + lag_minutes + " minutes (max: " + max_lag + ")"
        END WITH
        health_report.issues.ADD(issue)
    END IF
    
    ' Check for recent errors
    recent_errors = FIND "sync_events", "source=" + integration + " AND status='error' AND timestamp > DATEADD(NOW(), -1, 'hour')"
    error_count = UBOUND(recent_errors)
    
    IF error_count > 5 THEN
        integration_status.status = "error"
        health_report.status = "unhealthy"
        
        WITH issue = NEW OBJECT
            .integration = integration
            .type = "high_error_rate"
            .message = integration + " has " + error_count + " errors in the last hour"
        END WITH
        health_report.issues.ADD(issue)
    END IF
    
    integration_status.error_count_1h = error_count
    
    ' Keep the finished status; the dashboard reads this list
    integration_statuses.ADD(integration_status)
NEXT integration

' Check pending conflicts
pending_conflicts = AGGREGATE "COUNT", "sync_conflicts", "status='pending'"
escalated_conflicts = AGGREGATE "COUNT", "sync_conflicts", "status='escalated'"

IF pending_conflicts > 100 THEN
    IF health_report.status <> "unhealthy" THEN
        health_report.status = "degraded"
    END IF
    WITH issue = NEW OBJECT
        .type = "pending_conflicts"
        .message = pending_conflicts + " sync conflicts pending resolution"
    END WITH
    health_report.issues.ADD(issue)
END IF

' Check queue depth
queue_depth = AGGREGATE "COUNT", "sync_queue", "status='pending'"
IF queue_depth > 1000 THEN
    IF health_report.status <> "unhealthy" THEN
        health_report.status = "degraded"
    END IF
    WITH issue = NEW OBJECT
        .type = "queue_backlog"
        .message = "Sync queue backlog: " + queue_depth + " items"
    END WITH
    health_report.issues.ADD(issue)
END IF

' Store health report
INSERT "sync_health_reports", health_report

' Alert if unhealthy
IF health_report.status = "unhealthy" THEN
    alert_message = "🚨 *Data Sync Unhealthy*\n\n"
    FOR EACH issue IN health_report.issues
        alert_message = alert_message + "• " + issue.message + "\n"
    NEXT issue
    
    ' Slack alert
    WITH slack_alert = NEW OBJECT
        .text = alert_message
        .channel = "#ops-alerts"
    END WITH
    POST "https://hooks.slack.com/services/xxx", slack_alert
    
    ' PagerDuty for critical
    WITH pagerduty = NEW OBJECT
        .routing_key = GET BOT MEMORY "pagerduty_key"
        .event_action = "trigger"
        .payload.summary = "Data sync system unhealthy"
        .payload.severity = "critical"
        .payload.source = "sync-monitor"
    END WITH
    POST "https://events.pagerduty.com/v2/enqueue", pagerduty
    
ELSE IF health_report.status = "degraded" THEN
    ' Degraded goes to Slack only; no page
    alert_message = "⚠️ *Data Sync Degraded*\n\n"
    FOR EACH issue IN health_report.issues
        alert_message = alert_message + "• " + issue.message + "\n"
    NEXT issue
    
    WITH slack_alert = NEW OBJECT
        .text = alert_message
        .channel = "#ops-alerts"
    END WITH
    POST "https://hooks.slack.com/services/xxx", slack_alert
END IF

' Generate dashboard data
WITH dashboard = NEW OBJECT
    .timestamp = NOW()
    .overall_status = health_report.status
    .integrations = integration_statuses
    .pending_conflicts = pending_conflicts
    .escalated_conflicts = escalated_conflicts
    .queue_depth = queue_depth
    .events_last_hour = AGGREGATE "COUNT", "sync_events", "timestamp > DATEADD(NOW(), -1, 'hour')"
    .errors_last_hour = AGGREGATE "COUNT", "sync_events", "status='error' AND timestamp > DATEADD(NOW(), -1, 'hour')"
END WITH

' Stored under the fixed key "current", so each run replaces the last
SAVE "sync_dashboard", "current", dashboard

Tool 6: Bulk Sync

Initial data load and bulk synchronization.

' bulk-sync.bas
' Bulk data synchronization tool
'
' Triggered through the "bulk-sync" webhook. Pages through the source
' system's records, transforms each one and sends it to the target system,
' tracking progress in "sync_jobs" and per-record failures in "sync_errors".
' build_source_url, transform_record and send_to_target are not defined in
' this script.
' NOTE(review): this webhook performs no caller authentication, unlike the
' CRM and ERP webhooks - confirm whether that is intended.

WEBHOOK "bulk-sync"

' Request parameters; batch_size and offset are optional
source_system = body.source
target_system = body.target
entity_type = body.entity
batch_size = body.batch_size
offset = body.offset

' Defaults: 100 records per page, starting at the first record
IF batch_size = "" THEN batch_size = 100 END IF
IF offset = "" THEN offset = 0 END IF

' Create sync job
' The id is derived from the current time at one-second resolution
job_id = "SYNC-" + FORMAT(NOW(), "YYYYMMDDHHmmss")

WITH job = NEW OBJECT
    .id = job_id
    .source = source_system
    .target = target_system
    .entity_type = entity_type
    .status = "running"
    .total_records = 0
    .processed_records = 0
    .error_count = 0
    .started_at = NOW()
END WITH
INSERT "sync_jobs", job

' Fetch data from source
' The API key is looked up under "<source_system>_api_key".
' NOTE(review): the header is set once, before the loop; presumably
' send_to_target sets its own headers - verify it does not replace this
' one for later page fetches.
SET HEADER "Authorization", "Bearer " + GET BOT MEMORY source_system + "_api_key"

has_more = TRUE
total_processed = 0
total_errors = 0

' Page through the source until it reports no more data or returns an
' empty page
WHILE has_more
    source_url = CALL build_source_url(source_system, entity_type, batch_size, offset)
    response = GET source_url
    
    records = response.data
    has_more = response.has_more
    
    ' An empty page ends the loop even if the source claimed more data
    IF UBOUND(records) = 0 THEN
        has_more = FALSE
    ELSE
        FOR EACH record IN records
            ' Transform record
            transformed = CALL transform_record(record, source_system, target_system, entity_type)
            
            ' Send to target
            success = CALL send_to_target(target_system, entity_type, transformed)
            
            IF success THEN
                total_processed = total_processed + 1
            ELSE
                total_errors = total_errors + 1
                
                ' Log error (the job continues with the next record)
                WITH error_log = NEW OBJECT
                    .job_id = job_id
                    .record_id = record.id
                    .error = "Failed to sync to " + target_system
                    .created_at = NOW()
                END WITH
                INSERT "sync_errors", error_log
            END IF
            
            ' Update progress every 100 records (successes and failures combined)
            IF (total_processed + total_errors) MOD 100 = 0 THEN
                WITH progress = NEW OBJECT
                    .processed_records = total_processed
                    .error_count = total_errors
                    .updated_at = NOW()
                END WITH
                UPDATE "sync_jobs", "id=" + job_id, progress
            END IF
        NEXT record
        
        ' Advance to the next page
        offset = offset + batch_size
    END IF
WEND

' Finalize job
' The status is set to "completed" regardless of how many records failed;
' failures are visible through error_count
WITH final_update = NEW OBJECT
    .status = "completed"
    .total_records = total_processed + total_errors
    .processed_records = total_processed
    .error_count = total_errors
    .completed_at = NOW()
END WITH
UPDATE "sync_jobs", "id=" + job_id, final_update

' Send completion notification
completion_msg = "Bulk sync completed\n\n"
completion_msg = completion_msg + "Job ID: " + job_id + "\n"
completion_msg = completion_msg + "Source: " + source_system + "\n"
completion_msg = completion_msg + "Target: " + target_system + "\n"
completion_msg = completion_msg + "Entity: " + entity_type + "\n"
completion_msg = completion_msg + "Processed: " + total_processed + "\n"
completion_msg = completion_msg + "Errors: " + total_errors

SEND MAIL "data-admin@company.com", "Bulk Sync Complete: " + job_id, completion_msg

' Summary of the finished job
WITH result = NEW OBJECT
    .status = "completed"
    .job_id = job_id
    .processed = total_processed
    .errors = total_errors
END WITH

Configuration

Store sync configuration in bot memory:

' Configure sync settings
' Every key below is read by one of the sync scripts on this page.

' Outbound API credentials
SET BOT MEMORY "crm_api_key", "your-crm-api-key"
SET BOT MEMORY "erp_api_key", "your-erp-api-key"
SET BOT MEMORY "ecom_api_key", "your-ecommerce-api-key"
SET BOT MEMORY "pos_api_key", "your-pos-api-key"
SET BOT MEMORY "wms_api_key", "your-wms-api-key"

' Inbound webhook authentication (crm-sync.bas, erp-sync.bas)
SET BOT MEMORY "crm_webhook_secret", "your-crm-webhook-secret"
SET BOT MEMORY "erp_webhook_key", "your-erp-webhook-key"

' Alerting (sync-monitor.bas)
SET BOT MEMORY "pagerduty_key", "your-pagerduty-routing-key"

' Maximum sync lag in minutes; integrations without a value use 30
SET BOT MEMORY "max_sync_lag_crm", "30"
SET BOT MEMORY "max_sync_lag_erp", "15"

' Source that wins unresolved conflicts (conflict-resolver.bas, rule 3)
SET BOT MEMORY "sync_priority_source", "erp"

See Also