Domain: Product
Scenario: Product Lifecycle (Create → Sync → Add to Post → Checkout)
1. Business Goal
A Product is the sellable inventory unit. Katana supports four product sources:
- Pear products: native to the Katana platform (stored with platform=KATANA)
- Shopify products: synced from connected Shopify stores
- AliExpress products: imported from AliExpress
- Event tickets: products linked to an Event entity
2. Trigger & Entry
| API | Method | Description | Request Body |
|---|---|---|---|
| /merchant/product | POST | Create Pear product | { title, bodyHtml, variants[], images[], ... } |
| /merchant/product/:productId | PUT | Update product | ProductUpdateDto |
| /merchant/product/sync/:productId | PATCH | Sync product (force) | - |
| /merchant/product/connect/shop | PUT | Connect Shopify shop | { shopDomain, accessToken } |
| /v2/products | POST | Create product V2 (strategy) | { platform, listingType, ... } |
| /merchant/product/duplicate/:productId | POST | Duplicate product | - |
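To make the first row concrete, here is a minimal sketch of building a create request. Only `title`, `bodyHtml`, `variants` and `images` come from the table; the variant and image fields are borrowed from the Prisma schema in this document, and `buildCreateProductRequest`, the base URL, and the "at least one variant" check are assumptions made for illustration.

```typescript
// Hypothetical request body. The real DTO may carry more fields ("...").
interface CreatePearProductBody {
  title: string;
  bodyHtml: string;
  variants: { title: string; price: string; inventoryQuantity: number }[];
  images: { src: string; position: number }[];
}

interface HttpRequest {
  method: "POST";
  url: string;
  headers: { [name: string]: string };
  body: string;
}

// Builds (but does not send) the request, so it can be handed to any HTTP client.
function buildCreateProductRequest(
  baseUrl: string,
  body: CreatePearProductBody
): HttpRequest {
  if (body.title.trim() === "") {
    throw new Error("title is required");
  }
  // Assumption: a product needs at least one variant to be sellable.
  if (body.variants.length === 0) {
    throw new Error("at least one variant is required");
  }
  return {
    method: "POST",
    // Strip trailing slashes so the path is not doubled up.
    url: baseUrl.replace(/\/+$/, "") + "/merchant/product",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(body),
  };
}

const createRequest = buildCreateProductRequest("https://api.example.test/", {
  title: "Canvas Tote",
  bodyHtml: "<p>Heavy cotton tote bag</p>",
  variants: [{ title: "Default", price: "19.99", inventoryQuantity: 25 }],
  images: [{ src: "https://cdn.example.test/tote.png", position: 1 }],
});
```

Authentication headers are omitted because the document does not describe them.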
3. Data Flow (Create + Shopify Sync)
sequenceDiagram
    participant Merchant as Merchant
    participant API as ProductController
    participant Service as MerchantProductsService
    participant DB as PostgreSQL
    participant Queue as BullMQ
    participant Shopify as Shopify API
    Note over Merchant: SCENARIO 1: Create Pear Product
    Merchant->>API: POST /merchant/product
    API->>Service: saveProductForPear(dto)
    Note over Service: 1. Validate & transform
    Service->>Service: validateProductInput()
    Service->>Service: transformImages()
    Note over Service: 2. Create Product record
    Service->>DB: INSERT Product (platform=KATANA)
    DB-->>Service: product.id
    Note over Service: 3. Create ProductVariant records
    Service->>DB: INSERT ProductVariant[] (productId)
    Note over Service: 4. Create ProductImage records
    Service->>DB: INSERT ProductImage[]
    Note over Service: 5. Update inventory
    Service->>DB: UPDATE Product (inventoryQuantity = SUM(variants))
    Service-->>Merchant: { product, variants }
    Note over Merchant: SCENARIO 2: Shopify Sync
    Merchant->>API: PUT /merchant/product/connect/shop
    API->>Queue: ImportProductsQueue.add(importShopifyProducts)
    Queue->>Shopify: GET /admin/api/products
    loop For each page
        Shopify-->>Queue: products[]
        Queue->>Service: upsertProduct(shopifyProduct)
        Service->>DB: INSERT/UPDATE Product (remote_id, platform=SHOPIFY)
        Service->>DB: INSERT/UPDATE ProductVariant[]
    end
    Queue->>Queue: shopifyInventoryItemPublisher.upsetInventoryItemByIds()
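Step 5 of Scenario 1 sets the product-level inventory to the sum of its variants. A minimal sketch of that aggregation follows; `sumVariantInventory` is a hypothetical helper name, and the real service may compute the sum in SQL instead.

```typescript
interface VariantStock {
  inventoryQuantity: number;
}

// Product.inventoryQuantity = SUM(variant.inventoryQuantity).
// An empty variant list yields 0, matching the column default in the schema.
function sumVariantInventory(variants: VariantStock[]): number {
  return variants.reduce((total, v) => total + v.inventoryQuantity, 0);
}

const totalInventory = sumVariantInventory([
  { inventoryQuantity: 3 },
  { inventoryQuantity: 0 },
  { inventoryQuantity: 7 },
]);
```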
4. DB Operations (Chronological)
| Step | Table | Action | PK/FK Touched | Notes |
|---|---|---|---|---|
| 1 | Product | INSERT | id (PK), merchantId (FK), eventId (FK) | platform, status, priceDisplay |
| 2 | ProductVariant | INSERT | id (PK), productId (FK) | price, inventoryQuantity |
| 3 | ProductImage | INSERT | id (PK), productId (FK) | src, position |
| 4 | ProductOption | INSERT (if multi-variant) | id (PK), productId (FK) | name, values |
| 5 | PromoterProduct | INSERT (optional) | id (PK), merchantProductId (FK), promoterId (FK) | For promoter-specific products |
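The conditional steps in the table can be sketched as a small planning function. This is illustrative only: `planProductInserts` is a hypothetical helper, and reading "multi-variant" as "more than one variant" is an assumption.

```typescript
type TableName =
  | "Product"
  | "ProductVariant"
  | "ProductImage"
  | "ProductOption"
  | "PromoterProduct";

interface NewProductInput {
  variantCount: number;
  promoterId?: string; // set only for promoter-specific products
}

interface PlannedInsert {
  step: number;
  table: TableName;
}

// Returns the inserts in the chronological order listed in the table.
function planProductInserts(input: NewProductInput): PlannedInsert[] {
  const plan: PlannedInsert[] = [
    { step: 1, table: "Product" },
    { step: 2, table: "ProductVariant" },
    { step: 3, table: "ProductImage" },
  ];
  // Assumption: "multi-variant" means more than one variant.
  if (input.variantCount > 1) {
    plan.push({ step: 4, table: "ProductOption" });
  }
  if (input.promoterId !== undefined) {
    plan.push({ step: 5, table: "PromoterProduct" });
  }
  return plan;
}

const singleVariantPlan = planProductInserts({ variantCount: 1 });
const promoterPlan = planProductInserts({ variantCount: 3, promoterId: "promoter-1" });
```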
5. Shopify Sync Logic
Import Products Queue:
// Queue job payload
interface ImportProductsJob {
  merchantId: string;
  shopDomain: string;
  accessToken: string;
  progressId?: string; // For progress tracking
}
// Service method
async importShopifyProducts(job: ImportProductsJob) {
  // 1. Fetch products (up to 250 per request; repeated for each page)
  const products = await shopifyApi.products.list({ limit: 250 });
  // 2. Upsert each product, collecting variant IDs for step 3
  const variantIds: string[] = [];
  for (const shopifyProduct of products) {
    await this.upsertProduct({
      remote_id: shopifyProduct.id,
      platform: 'SHOPIFY',
      merchantId: job.merchantId,
      title: shopifyProduct.title,
      bodyHtml: shopifyProduct.body_html,
      variants: shopifyProduct.variants,
      images: shopifyProduct.images,
      // ... map other fields
    });
    // Assumption: the inventory sync is keyed by the Shopify variant IDs
    variantIds.push(...shopifyProduct.variants.map((v) => String(v.id)));
  }
  // 3. Sync inventory items
  await this.shopifyInventoryItemPublisher.upsetInventoryItemByIds(variantIds);
}
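The "for each page" loop from the data flow is not spelled out in the snippet above. A minimal sketch of a cursor-driven loop is shown below; the `Page` shape, `FetchPage` type, and `forEachPagedItem` are hypothetical and stand in for whatever Shopify client the service actually uses.

```typescript
// Hypothetical page shape: `nextCursor` is undefined on the last page.
interface Page<T> {
  items: T[];
  nextCursor?: string;
}

type FetchPage<T> = (cursor: string | undefined) => Promise<Page<T>>;

// Walks every page in order, passes each item to `handle`,
// and returns how many items were handled.
async function forEachPagedItem<T>(
  fetchPage: FetchPage<T>,
  handle: (item: T) => Promise<void>
): Promise<number> {
  let cursor: string | undefined = undefined;
  let handled = 0;
  do {
    const page: Page<T> = await fetchPage(cursor);
    for (const item of page.items) {
      await handle(item);
      handled += 1;
    }
    cursor = page.nextCursor;
  } while (cursor !== undefined);
  return handled;
}

// In-memory stand-in for the remote API: two pages, three products.
const fakePages: { [cursor: string]: Page<string> } = {
  start: { items: ["p1", "p2"], nextCursor: "page2" },
  page2: { items: ["p3"] },
};
const fakeFetch: FetchPage<string> = (cursor) =>
  Promise.resolve(fakePages[cursor === undefined ? "start" : cursor]);
```

In the import job, `handle` would wrap `upsertProduct`, so each page is persisted before the next one is requested.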
Field Mapping (Shopify → Product):
{
  remote_id: shopifyProduct.id,
  platform: 'SHOPIFY',
  title: shopifyProduct.title,
  bodyHtml: shopifyProduct.body_html,
  productType: shopifyProduct.product_type,
  vendor: shopifyProduct.vendor,
  status: shopifyProduct.status === 'active' ? 'ACTIVE' : 'DRAFT',
  tags: shopifyProduct.tags,
  variants: shopifyProduct.variants.map(v => ({
    remote_id: v.id,
    price: v.price,
    inventoryQuantity: v.inventory_quantity,
    // ...
  })),
  images: shopifyProduct.images.map(i => ({
    src: i.src,
    position: i.position,
    // ...
  }))
}
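The mapping above can be written as a typed, standalone function. This is a sketch under assumptions: the `ShopifyProduct` interface lists only the fields used here, `mapShopifyProduct` is a hypothetical name, and converting numeric Shopify IDs with `String(...)` is assumed because `remote_id` is a `String?` column in the Prisma schema.

```typescript
// Subset of the Shopify payload used by the mapping (assumed shapes).
interface ShopifyVariant { id: number; price: string; inventory_quantity: number; }
interface ShopifyImage { src: string; position: number; }
interface ShopifyProduct {
  id: number;
  title: string;
  body_html: string;
  product_type: string;
  vendor: string;
  status: string;
  tags: string;
  variants: ShopifyVariant[];
  images: ShopifyImage[];
}

interface MappedProduct {
  remote_id: string;
  platform: "SHOPIFY";
  title: string;
  bodyHtml: string;
  productType: string;
  vendor: string;
  status: "ACTIVE" | "DRAFT";
  tags: string;
  variants: { remote_id: string; price: string; inventoryQuantity: number }[];
  images: { src: string; position: number }[];
}

function mapShopifyProduct(p: ShopifyProduct): MappedProduct {
  return {
    remote_id: String(p.id),
    platform: "SHOPIFY",
    title: p.title,
    bodyHtml: p.body_html,
    productType: p.product_type,
    vendor: p.vendor,
    // Same rule as the mapping above: anything that is not 'active' becomes DRAFT.
    status: p.status === "active" ? "ACTIVE" : "DRAFT",
    tags: p.tags,
    variants: p.variants.map((v) => ({
      remote_id: String(v.id),
      price: v.price,
      inventoryQuantity: v.inventory_quantity,
    })),
    images: p.images.map((i) => ({ src: i.src, position: i.position })),
  };
}

const mappedExample = mapShopifyProduct({
  id: 123,
  title: "Canvas Tote",
  body_html: "<p>Tote</p>",
  product_type: "Bags",
  vendor: "Acme",
  status: "archived",
  tags: "bags, cotton",
  variants: [{ id: 456, price: "19.99", inventory_quantity: 4 }],
  images: [{ src: "https://cdn.example.test/tote.png", position: 1 }],
});
```

Note that under this rule a Shopify product with status `archived` maps to `DRAFT`, not to the `ARCHIVED` value of `ProductStatus`.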
6. Product → Post Relationship
When a product is added to a post:
// Post.relatedProducts structure
interface PostRelatedProduct {
  merchantProductId: string; // The Product.id
  variants: PostRelatedProductVariant[];
  syncWithCatalogPrice: boolean; // Auto-sync price changes
  toReplaceMerchantProductIds: string[]; // Replacement logic
  isPurchaseQuantityLimited: boolean;
  purchaseQuantityLimit: number;
  isFreeShipping: boolean;
  hasDependentProducts: boolean;
  dependentProductIds: string[];
  isRecommendation: boolean;
}
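As an illustration of how the `isPurchaseQuantityLimited` / `purchaseQuantityLimit` pair might be applied at checkout, here is a small sketch. The semantics (the limit is ignored when the flag is off, and requests above the limit are clamped rather than rejected) are assumptions, and `allowedQuantity` is a hypothetical helper.

```typescript
interface PurchaseLimitFields {
  isPurchaseQuantityLimited: boolean;
  purchaseQuantityLimit: number;
}

// Returns the quantity a buyer may actually purchase for one related product.
function allowedQuantity(requested: number, rule: PurchaseLimitFields): number {
  if (requested <= 0) {
    return 0;
  }
  // Assumption: the limit value is only meaningful when the flag is on.
  if (!rule.isPurchaseQuantityLimited) {
    return requested;
  }
  return Math.min(requested, rule.purchaseQuantityLimit);
}

const limited = allowedQuantity(5, { isPurchaseQuantityLimited: true, purchaseQuantityLimit: 2 });
const unlimited = allowedQuantity(5, { isPurchaseQuantityLimited: false, purchaseQuantityLimit: 2 });
```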
7. Event/Queue Messages
| Message | Producer → Consumer | Payload Snippet |
|---|---|---|
| product.created | ProductModule → Queue | { productId, merchantId, platform } |
| product.updated | ProductModule → Queue | { productId, changes } |
| product.sync | ProductModule → ImportProductsQueue | { merchantId, shopDomain } |
| inventory.updated | ShopifyWebhook → ProductModule | { variantId, quantity } |
| product.added-to-post | PostModule → ProductModule | { postId, productIds[] } |
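One way to keep consumers in step with this table is a discriminated union over the message names. The sketch below assumes each message is wrapped as `{ type, payload }`, which the document does not specify; the payload field types and the strings returned by `routeMessage` are placeholders, not the real handlers.

```typescript
type ProductMessage =
  | { type: "product.created"; payload: { productId: string; merchantId: string; platform: string } }
  | { type: "product.updated"; payload: { productId: string; changes: { [field: string]: unknown } } }
  | { type: "product.sync"; payload: { merchantId: string; shopDomain: string } }
  | { type: "inventory.updated"; payload: { variantId: string; quantity: number } }
  | { type: "product.added-to-post"; payload: { postId: string; productIds: string[] } };

// The switch narrows `msg.payload` per case. The `never` assignment in the
// default branch makes the compiler flag any message type added later
// without a matching case.
function routeMessage(msg: ProductMessage): string {
  switch (msg.type) {
    case "product.created":
      return "created " + msg.payload.productId + " on " + msg.payload.platform;
    case "product.updated":
      return "updated " + msg.payload.productId;
    case "product.sync":
      return "sync shop " + msg.payload.shopDomain;
    case "inventory.updated":
      return "set variant " + msg.payload.variantId + " to " + msg.payload.quantity;
    case "product.added-to-post":
      return "linked " + msg.payload.productIds.length + " product(s) to post " + msg.payload.postId;
    default: {
      const unhandled: never = msg;
      return unhandled;
    }
  }
}

const routed = routeMessage({
  type: "inventory.updated",
  payload: { variantId: "v1", quantity: 4 },
});
```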
8. File Map (Top 5 Must-Read)
| File | Purpose |
|---|---|
| src/products/merchant-products.controller.ts | HTTP endpoints |
| src/products/merchant-products.service.ts | Core business logic |
| src/products/import-products.subscriber.ts | Queue processor for Shopify sync |
| src/product-v2/product-v2.service.ts | V2 service (strategy pattern) |
| src/database/schema.prisma | Product/Variant schema |
9. DB Schema Snippet (Prisma)
model Product {
  id String @id @default(uuid()) @db.Uuid
  remote_id String?
  platform MerchantPlatform?
  merchantId String @map("merchant_id") @db.Uuid
  eventId String? @map("event_id") @db.Uuid
  // Basic info
  title String
  bodyHtml String @map("body_html")
  bodyText String @default("") @map("body_text")
  handle String?
  productType String @map("product_type")
  vendor String
  status ProductStatus @default(ACTIVE)
  externalStatus ProductStatus @default(ACTIVE) @map("external_status")
  // Media
  images ProductImage[]
  imageCount Int @default(0) @map("image_count")
  coverImage Json? @map("cover_image")
  // Variants & Options
  variants ProductVariant[]
  options ProductOptions[]
  // Inventory
  inventoryQuantity Int @default(0) @map("inventory_quantity")
  inventoryQuantityOriginal Int @default(0) @map("inventory_quantity_original")
  soldQuantity Int @default(0) @map("sold_quantity")
  // Pricing
  priceDisplay Float @default(0.0) @map("price_display")
  priceMin Float @default(0.0) @map("price_min")
  priceMax Float @default(0.0) @map("price_max")
  priceImportedMin Float? @default(0.0) @map("price_imported_min")
  priceImportedMax Float? @default(0.0) @map("price_imported_max")
  // Shipping
  shippingType ShippingType @default(INHERIT_SHIPPING) @map("shipping_type")
  additionalShippingFee Float @default(0.0) @map("additional_shipping_fee")
  deliveryTime Int[] @default([0, 5]) @map("delivery_time")
  returnPolicyApplied Boolean @default(true) @map("return_policy_applied")
  // Commission
  commissionRate Float? @map("commission_rate")
  // Features
  isFeatured Boolean @default(false) @map("is_featured")
  featuredScore Int?
  isAvailable Boolean @default(true) @map("is_available")
  listingType ProductType @default(REGULAR) @map("listing_type")
  // Event ticket specific
  deliveryMethod ProductDeliveryMethod? @map("delivery_method")
  autoFulfill Boolean @default(false) @map("auto_fulfill")
  isMultipleDaysPassEnabled Boolean @default(false) @map("is_multiple_days_pass_enabled")
  multipleDaysPass Json? @map("multiple_days_pass")
  // Sync
  syncAt DateTime? @map("sync_at") @db.Timestamptz()
  followExternalStatus Boolean @default(true) @map("follow_external_status")
  // Soft delete
  deletedAt DateTime? @map("deleted_at")
  delisted Boolean @default(false)
  excludedByImportingTag Boolean @default(false) @map("excluded_by_importing_tag")
  // Relations
  event Event? @relation(fields: [eventId], references: [id])
  PromoterProduct PromoterProduct[]
  Post Post[] // Via relatedProducts JSON
  createdAt DateTime @default(now()) @map("created_at")
  updatedAt DateTime @updatedAt @map("updated_at")
  @@unique([remote_id, merchantId])
  @@index([deletedAt, status])
  @@index([merchantId])
  @@index([eventId])
}
model ProductVariant {
  id String @id @default(uuid()) @db.Uuid
  remote_id String? @unique
  productId String @map("product_id") @db.Uuid
  product Product @relation(fields: [productId], references: [id], onDelete: Cascade)
  // Variant info
  title String
  option Json // { name: "Color", value: "Red" }
  position Int
  sku String?
  // Inventory
  inventoryQuantity Int @map("inventory_quantity")
  inventoryQuantityOriginal Int @default(0) @map("inventory_quantity_original")
  inventoryPolicy ProductVariantPolicy @default(DENY) @map("inventory_policy")
  soldQuantity Int @default(0) @map("sold_quantity")
  // Pricing
  price String
  priceAnchor String @default("0.00") @map("price_anchor")
  priceImported String? @default("0.00") @map("price_imported")
  compareAtPrice String? @map("compare_at_price")
  fees Float? @default(0.0) @map("fees")
  transactionFee Json @default("{}") @map("transaction_fee")
  // Ticket specific
  ticketPrice Float? @default(0.0) @map("ticket_price")
  // Quantity constraints
  isMinPurchaseQuantityEnabled Boolean @default(false) @map("is_min_purchase_quantity_enabled")
  minPurchaseQuantity Int? @map("min_purchase_quantity")
  isMaxPurchaseQuantityEnabled Boolean @default(false) @map("is_max_purchase_quantity_enabled")
  maxPurchaseQuantity Int? @map("max_purchase_quantity")
  isPackSizeEnabled Boolean @default(false) @map("is_pack_size_enabled")
  packSize Int? @map("pack_size")
  // Media
  imageId String? @map("image_id") @db.Uuid
  imageIds String[] @map("image_ids")
  createdAt DateTime @default(now()) @map("created_at")
  updatedAt DateTime @updatedAt @map("updated_at")
  @@index([productId])
  @@index([remote_id])
}
model ProductImage {
  id String @id @default(uuid()) @db.Uuid
  productId String @map("product_id") @db.Uuid
  src String
  position Int
  Product Product @relation(fields: [productId], references: [id], onDelete: Cascade)
  @@index([productId])
}
enum MerchantPlatform {
  SHOPIFY
  ALIEXPRESS
  KATANA
}
enum ProductStatus {
  ACTIVE
  DRAFT
  ARCHIVED
  DELETED
}
enum ProductType {
  REGULAR
  TICKET
}
enum ShippingType {
  INHERIT_SHIPPING
  CUSTOM_SHIPPING
  FREE_SHIPPING
}
enum ProductDeliveryMethod {
  SHIPPING
  PICKUP
  DIGITAL
}
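One detail of the schema worth noting: `ProductVariant.price` is a `String`, while `Product.priceMin` and `Product.priceMax` are `Float`. The sketch below shows one way the product-level range could be derived from variant prices. Whether the service actually computes it this way is an assumption, and `priceRange` is a hypothetical helper.

```typescript
interface VariantPrice {
  price: string; // stored as a string on ProductVariant
}

interface PriceRange {
  priceMin: number;
  priceMax: number;
}

// Parses each variant price and returns the min/max as numbers.
// Unparseable prices are skipped; with no usable prices the range falls back
// to 0.0, matching the column defaults on Product.
function priceRange(variants: VariantPrice[]): PriceRange {
  const prices = variants
    .map((v) => Number(v.price))
    .filter((n) => isFinite(n));
  if (prices.length === 0) {
    return { priceMin: 0, priceMax: 0 };
  }
  return {
    priceMin: Math.min(...prices),
    priceMax: Math.max(...prices),
  };
}

const exampleRange = priceRange([
  { price: "19.99" },
  { price: "5.00" },
  { price: "12.50" },
]);
```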
10. Product → PromoterProduct
When a promoter adds a product to their collection:
sequenceDiagram
    participant Promoter as Promoter
    participant API as PromoterController
    participant Service as PromoterProductService
    participant DB as PostgreSQL
    Promoter->>API: POST /promoter/product
    API->>Service: create(promoterId, merchantProductId)
    Note over Service: 1. Check if already exists
    Service->>DB: SELECT PromoterProduct WHERE promoterId AND merchantProductId
    alt exists and deletedAt is not null
        Service->>DB: UPDATE PromoterProduct (deletedAt=null)
    else not exists
        Note over Service: 2. Create PromoterProduct
        Service->>DB: INSERT PromoterProduct (promoterId, merchantProductId)
        Note over Service: 3. Clone PromoterProductVariant
        Service->>DB: SELECT ProductVariant WHERE productId
        loop for each variant
            Service->>DB: INSERT PromoterProductVariant
        end
        Note over Service: 4. Clone ProductImage
        Service->>DB: INSERT PromoterProductImage[]
    end
    Service-->>Promoter: PromoterProduct
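The restore-or-create branch in the diagram can be sketched with an in-memory store. This is illustrative only: `InMemoryPromoterProducts` and its row shape are hypothetical, image cloning is omitted, and returning an existing active row unchanged is an assumption because the diagram does not cover that case.

```typescript
interface PromoterProductRow {
  id: string;
  promoterId: string;
  merchantProductId: string;
  deletedAt: Date | null;
  clonedVariantIds: string[];
}

class InMemoryPromoterProducts {
  private rows: PromoterProductRow[] = [];

  add(promoterId: string, merchantProductId: string, variantIds: string[]): PromoterProductRow {
    // 1. Check if a row already exists for this promoter/product pair.
    const existing = this.rows.filter(
      (r) => r.promoterId === promoterId && r.merchantProductId === merchantProductId
    )[0];
    if (existing !== undefined) {
      // Soft-deleted rows are restored; active rows are returned as-is (assumption).
      existing.deletedAt = null;
      return existing;
    }
    // 2-3. Create the row and clone the merchant product's variants.
    const row: PromoterProductRow = {
      id: "pp-" + (this.rows.length + 1),
      promoterId: promoterId,
      merchantProductId: merchantProductId,
      deletedAt: null,
      clonedVariantIds: variantIds.slice(),
    };
    this.rows.push(row);
    return row;
  }

  softDelete(id: string): void {
    for (const r of this.rows) {
      if (r.id === id) {
        r.deletedAt = new Date();
      }
    }
  }

  count(): number {
    return this.rows.length;
  }
}

const promoterStore = new InMemoryPromoterProducts();
const firstAdd = promoterStore.add("promoter-1", "product-1", ["v1", "v2"]);
promoterStore.softDelete(firstAdd.id);
const restoredAdd = promoterStore.add("promoter-1", "product-1", ["v1", "v2"]);
```

Restoring instead of inserting keeps a single row per promoter/product pair, so re-adding a product never duplicates its cloned variants.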