دليل تقني

البنية المدفوعة بالأحداث عملياً: شو اللي فعلاً يخرب

أنماط بنية مدفوعة بالأحداث من الإنتاج الفعلي. عواصف الأحداث، حلقات المزامنة الثنائية، dead letters، مخازن الـ idempotency، والاختيار بين Kafka وRabbitMQ وBullMQ وSymfony Messenger.

23 فبراير 202618 دقيقة للقراءةفريق هندسة أورنتس

عاصفة الأحداث اللي ما حدا يتوقعها

البنية المدفوعة بالأحداث شكلها نظيف على اللوح الأبيض. الخدمة A تطلق حدث. الخدمة B تستهلكه. ربط فضفاض. توسع مستقل. رسومات حلوة.

بعدين تنشر على الإنتاج. عملية حفظ منتج واحدة تطلق 8 مشتركين بالأحداث. كل مشترك يرسل 3 رسائل غير متزامنة. كل معالج رسالة يحمّل المنتج، يعدّل حقل، ويحفظ. كل عملية حفظ تطلق 8 مشتركين من جديد. خلال ثواني، طابور الرسائل عندك فيه آلاف الرسائل لتحديث منتج واحد، الـ workers مشبعين، وقاعدة البيانات تحت ضغط أقفال.

شغّلنا أربع أنظمة مدفوعة بالأحداث مختلفة بالإنتاج. كل واحد استخدم message broker مختلف. كل واحد كان عنده أنماط فشل مختلفة. هالمقال يغطي شو اللي فعلاً يخرب وكيف تصلحه. للسياق الأوسع عن كيف نتعامل مع بنية الأنظمة، هداك الدليل يغطي منهجيتنا.

أنماط الفشل الأربعة

كل نظام مدفوع بالأحداث بالنهاية بيواجه هالأنماط:

النمطشو بيصيرالخطورة
عاصفة أحداثإجراء واحد يتسلسل لآلاف الرسائلحرجة (ممكن تشبع البنية التحتية)
حلقة مزامنة ثنائيةالنظام A يزامن لـ B، و B يزامن رجوعاً لـ A، حلقة لا نهائيةحرجة (نمو رسائل لا نهائي)
معالجة مكررةنفس الرسالة تتعالج مرتين، تنشئ بيانات مكررةعالية (سلامة البيانات)
تراكم dead letterالرسائل الفاشلة تتكدس بدون استراتيجية حلمتوسطة (دين تشغيلي)

1. عواصف الأحداث

النمط الأكثر شيوعاً والأكثر خطورة. يصير لما معالجات الأحداث تطلق أحداث جديدة، وهالأحداث تطلق معالجات أكثر.

حفظ منتج
  -> EventSubscriber A يرسل MessageA
  -> EventSubscriber B يرسل MessageB
  -> EventSubscriber C يرسل MessageC
  -> ...8 مشتركين بالمجمل

WorkerA يعالج MessageA
  -> يعدّل المنتج، يستدعي save()
  -> حفظ المنتج يطلق 8 مشتركين مرة ثانية
  -> كل واحد يرسل رسائل أكثر

WorkerB يعالج MessageB (نفس النمط)
  -> 8 رسائل إضافية

النتيجة: 1 حفظ -> 8 رسائل -> 8 حفظ -> 64 رسالة -> ...

الحل: EventSubscriberSupervisor. تحكم على مستوى العملية يوقف مشتركي الأحداث أثناء حفظ الـ worker.

// معالج Worker: وقّف المشتركين قبل الحفظ
async function handleGenerateThumbnail(message: ThumbnailMessage) {
    const product = await productService.findById(message.productId);

    subscriberSupervisor.disableAll();
    try {
        product.thumbnail = await generateThumbnail(product);
        await product.save(); // ما في مشتركين يشتغلون، ما في رسائل جديدة تنرسل
    } finally {
        subscriberSupervisor.enableAll();
    }
}

الترتيب مهم. الحفظ لازم يصير داخل النطاق المعطّل. إذا أعدت تفعيل المشتركين قبل الحفظ، الحفظ يطلق التسلسل.

لنظرة أعمق على كيف نطبق هالنمط تحديداً بـ Pimcore، شوف دليل تصميم سير عمل Pimcore.

2. حلقات المزامنة الثنائية

لما نظامين يزامنون بيانات بشكل ثنائي الاتجاه، كل تحديث من A يطلق مزامنة لـ B، واللي تطلق مزامنة رجوعاً لـ A. بدون منع الحلقات، هالشي يشتغل للأبد.

النظام A (التجارة)          النظام B (العمليات)
     │                           │
     │  عميل تحدّث               │
     │ ────────────────────────▶ │
     │                           │  المزامنة وصلت، حفظ العميل
     │                           │  حدث تحديث العميل يطلق
     │  المزامنة وصلت            │
     │ ◀──────────────────────── │
     │  حفظ العميل               │
     │  حدث تحديث العميل         │
     │ ────────────────────────▶ │
     │           ...للأبد...     │

الحل: تتبع المصدر. كل رسالة تحمل header باسم x-source يحدد النظام المُصدر. لما نظام يستقبل رسالة من نفسه (عبر النظام الثاني)، يتجاهلها.

// لما ننشر رسالة مزامنة
async function publishCustomerSync(customer: Customer, source: string) {
    await messageQueue.publish('customer.updated', {
        customerId: customer.id,
        data: customer,
        source: source,  // "commerce" أو "operations"
    });
}

// لما نستهلك رسالة مزامنة
async function handleCustomerSync(message: CustomerSyncMessage) {
    // تجاهل الرسائل اللي مصدرها هالنظام
    if (message.source === THIS_SYSTEM_ID) {
        logger.debug('Ignoring self-originated sync', { customerId: message.customerId });
        return; // ACK الرسالة، ما نعالجها
    }

    const customer = await customerService.update(message.customerId, message.data);

    // لما نحفظ، نحدد المصدر عشان الأحداث اللاحقة تحمله
    await publishCustomerSync(customer, THIS_SYSTEM_ID);
}

نهج بديل: إزالة التكرار بهاش المحتوى. حسّب هاش لمحتوى الرسالة وتحقق إذا نفس الهاش تعالج مؤخراً. إذا البيانات ما تغيرت، المزامنة بدون تأثير.

3. المعالجة المكررة

أعطال الشبكة، إعادة تشغيل الـ workers، وضمانات التسليم at-least-once تعني إنه نفس الرسالة ممكن توصل أكثر من مرة. بدون idempotency، بتحصل على طلبات مكررة، إيميلات مكررة، سجلات قاعدة بيانات مكررة.

الحل: مخازن idempotency مع إزالة تكرار بمفتاح عمل تجاري.

// مخزن Idempotency بمفتاح عمل تجاري
class IdempotencyStore {
    async checkAndAcquire(key: string, scope: string): Promise<boolean> {
        try {
            await this.db.insert('idempotency_keys', {
                key,
                scope,
                status: 'PROCESSING',
                created_at: new Date(),
                expires_at: new Date(Date.now() + 24 * 60 * 60 * 1000), // 24 ساعة TTL
            });
            return true; // تم الحجز، تابع المعالجة
        } catch (error) {
            if (isDuplicateKeyError(error)) {
                return false; // تمت المعالجة سابقاً، تخطى
            }
            throw error;
        }
    }

    async complete(key: string, scope: string): Promise<void> {
        await this.db.update('idempotency_keys',
            { status: 'COMPLETED', completed_at: new Date() },
            { key, scope }
        );
    }
}

// الاستخدام بالـ worker
async function handleNotification(message: NotificationMessage) {
    const dedupeKey = `${message.recipientEmail}:${message.category}:${message.entityRef}:${todayBucket()}`;

    const acquired = await idempotencyStore.checkAndAcquire(dedupeKey, 'notification');
    if (!acquired) {
        logger.info('Duplicate notification skipped', { dedupeKey });
        return; // ACK الرسالة
    }

    try {
        await emailService.send(message);
        await idempotencyStore.complete(dedupeKey, 'notification');
    } catch (error) {
        await idempotencyStore.fail(dedupeKey, 'notification');
        throw error; // NACK، خلي الطابور يعيد المحاولة
    }
}

مفتاح إزالة التكرار لازم يكون ذو معنى تجاري. للإشعارات: المستلم + الفئة + الكيان + فترة اليوم (يمنع إرسال نفس الإشعار مرتين بنفس اليوم لكن يسمح فيه اليوم التالي). للطلبات: المستأجر + المنتج + التاريخ. للاستيرادات: معرف السجل المصدر + معرف دفعة الاستيراد.

4. التعامل مع Dead Letters

الرسائل اللي تفشل بشكل متكرر بتنتهي بطابور dead letter. أغلب الفرق تعمل الـ DLQ وبعدين تتجاهله. الـ dead letters تتراكم. بعد أشهر، حدا يكتشف آلاف الرسائل غير المعالجة.

// معالج dead letter مع تصنيف
async function processDeadLetter(message: DeadLetterMessage) {
    const failureType = classifyFailure(message.error);

    switch (failureType) {
        case 'TRANSIENT':
            // انقطاع شبكة، خدمة معطلة مؤقتاً
            // أعد الإدخال بتأخير أسي
            await requeue(message, { delay: calculateBackoff(message.retryCount) });
            break;

        case 'PERMANENT':
            // بيانات غير صالحة، عدم تطابق schema، مخالفة قاعدة عمل
            // سجّل، نبّه، أرشف. لا تعيد المحاولة.
            await archiveDeadLetter(message);
            await alertOps('Permanent failure in dead letter', message);
            break;

        case 'POISON':
            // الرسالة نفسها تسبب انهيارات (محتوى مشوّه)
            // أرشف فوراً، لا تعيد المحاولة أبداً
            await archiveDeadLetter(message);
            await alertOps('Poison message detected', message);
            break;
    }
}

function classifyFailure(error: string): 'TRANSIENT' | 'PERMANENT' | 'POISON' {
    if (error.includes('ECONNREFUSED') || error.includes('timeout')) return 'TRANSIENT';
    if (error.includes('SyntaxError') || error.includes('Cannot parse')) return 'POISON';
    return 'PERMANENT';
}

الفكرة الأساسية: مش كل dead letters متشابهة. الأعطال المؤقتة لازم تنعاد. الأعطال الدائمة لازم تتأرشف وتُحقق. الرسائل السامة لازم تنحجر فوراً.

لكيف نتعامل مع أنماط الفشل بأنظمة الذكاء الاصطناعي تحديداً، شوف دليل أنماط فشل الذكاء الاصطناعي.

اختيار Message Broker

استخدمنا أربع brokers مختلفين بالإنتاج. كل واحد عنده نقطة قوة واضحة.

الـ Brokerالأفضل لـمش مناسب لـتعقيد النشر
Kafkaبث أحداث عالي الإنتاجية، event sourcing، إعادة التشغيلطوابير مهام بسيطة، أنظمة منخفضة الحجمعالي (ZooKeeper/KRaft، أقسام، مجموعات مستهلكين)
RabbitMQالتوجيه، dead letters، طوابير الأولوية، طوبولوجيات معقدةإنتاجية عالية جداً (ملايين/ثانية)، إعادة تشغيل الأحداثمتوسط (تجميع، سياسات HA)
BullMQ (Redis)طوابير مهام، مهام مؤجلة، تحديد المعدل، إعدادات بسيطةتوجيه معقد، إعادة تشغيل رسائل، أنماط متعددة المستهلكينمنخفض (بس Redis)
Symfony Messengerتطبيقات PHP/Symfony، Pimcore، إرسال غير متزامن بسيطبيئات غير PHP، ميزات broker معقدةمنخفض (يستخدم Doctrine أو AMQP أو Redis كنقل)

متى تستخدم Kafka

  • تحتاج إعادة تشغيل الأحداث (المستهلكين يقدرون يقرأون من أي offset)
  • عندك مجموعات مستهلكين متعددة تعالج نفس الأحداث بشكل مختلف
  • الإنتاجية تتجاوز 100 ألف رسالة بالثانية
  • عم تعمل event sourcing أو تبني pipeline لالتقاط تغييرات البيانات
  • تحتاج ترتيب مضمون داخل القسم

متى تستخدم RabbitMQ

  • تحتاج توجيه معقد (topic exchanges، توجيه بالـ headers)
  • طوابير dead letter مع سياسات إعادة محاولة تلقائية مهمة
  • أولوية الرسائل مهمة (رسائل عاجلة تتعالج أولاً)
  • تحتاج أنماط طلب-رد (RPC عبر الرسائل)
  • نظامك فيه خدمات متعددة بلغات مختلفة

متى تستخدم BullMQ

  • بيئتك Node.js/TypeScript (مثل Vendure، NestJS)
  • جدولة مهام مع تأخير وأنماط cron
  • تحديد المعدل حسب الطابور أو نوع المهمة
  • عندك Redis مسبقاً للكاش والجلسات
  • نظامك بحجم صغير لمتوسط (أقل من 10 آلاف رسالة/ثانية)

متى تستخدم Symfony Messenger

  • بيئتك PHP/Symfony/Pimcore
  • تحتاج إرسال غير متزامن بسيط لمهام الخلفية
  • بنية الـ worker تتبع اتفاقيات Symfony (مثل supervisord)
  • مرونة النقل (تحويل بين Doctrine وAMQP وRedis بدون تغيير الكود)

لمشاريع Pimcore، نستخدم Symfony Messenger مع نقل RabbitMQ. لمشاريع Vendure، نستخدم BullMQ. لمنصات استيعاب بيانات عالية الإنتاجية، نستخدم Kafka. اختيار الـ broker يتبع البيئة، مش العكس.

هالأنماط مهمة بكل مشاريع البرمجيات المخصصة وهندسة البيانات اللي نشتغل عليها. لو بتبني أنظمة سحابية، اختيار الـ broker بيأثر مباشرة على بنية النشر.

ترتيب الرسائل

أغلب الفرق تفترض إنه ترتيب الرسائل مهم. عادةً مش كذلك.

السيناريوالترتيب مطلوب؟ليش
إرسال إيميل إشعارلاالإيميلات بطبيعتها غير مرتبة
توليد صورة مصغرةلاآخر نسخة تكسب
تحديث فهرس البحثلاآخر حالة تكسب (اتساق تدريجي)
معالجة خطوات الدفعنعمالشحن لازم يصير قبل الالتقاط
انتقالات حالة الطلبنعمما تقدر تشحن قبل تأكيد الدفع
إعادة تشغيل event sourcingنعمالأحداث لازم تنعاد بترتيب سببي

لما الترتيب مهم، استخدم طوابير FIFO (مثل SQS FIFO، أقسام Kafka مفتاحية بمعرف الكيان، RabbitMQ بمستهلك واحد). لما مش مهم، المعالجة المتوازية أسرع وأبسط.

// Kafka: ترتيب داخل القسم (مفتاح بمعرف المنتج)
await producer.send({
    topic: 'product-updates',
    messages: [{
        key: productId,  // كل تحديثات نفس المنتج تروح لنفس القسم
        value: JSON.stringify(event),
    }],
});

// BullMQ: ما في حاجة للترتيب بالصور المصغرة
await thumbnailQueue.add('generate', { productId }, {
    removeOnComplete: true,
    attempts: 3,
    backoff: { type: 'exponential', delay: 5000 },
});

استراتيجيات إعادة المحاولة

مش كل إعادات المحاولة متساوية. الاستراتيجية تعتمد على نوع الفشل.

الاستراتيجيةمتى تستخدمهامثال
إعادة فوريةخلل مؤقت (حالة سباق)فشل قفل تفاؤلي
تأخير أسيخدمة معطلة مؤقتاًانتهاء مهلة API خارجي
تأخير ثابتتحديد معدلAPI يرجع 429
بدون إعادةفشل دائممحتوى غير صالح، مخالفة قاعدة عمل
// إعداد إعادة المحاولة بـ BullMQ
const queue = new Queue('notifications', {
    defaultJobOptions: {
        attempts: 5,
        backoff: {
            type: 'exponential',
            delay: 60000, // 1 دقيقة، 2 دقيقة، 4 دقائق، 8 دقائق، 16 دقيقة
        },
        removeOnComplete: { count: 1000 },
        removeOnFail: false, // احتفظ فيها لتحليل dead letter
    },
});

بعد ما تخلص كل المحاولات، الرسالة تنتقل لطابور dead letter. لا تعيد المحاولة للأبد. حط عدد محاولات أقصى وتعامل مع الفشل بشكل صريح.

مراقبة الأنظمة المدفوعة بالأحداث

أصعب جزء بالأنظمة المدفوعة بالأحداث هو التنقيح. لما شي يفشل، الخطأ ممكن يكون بعد 5 خدمات و12 رسالة من السبب الأصلي.

معرفات الارتباط (Correlation IDs)

كل رسالة تحمل correlation ID يربطها بالإجراء الأصلي:

// طلب API الأصلي يولّد correlation ID
const correlationId = generateUUID();

// كل رسالة بالسلسلة تحمله
await queue.add('process-order', {
    orderId: order.id,
    correlationId,
});

// الـ Workers يمررونه للرسائل اللاحقة
async function handleProcessOrder(job) {
    const { orderId, correlationId } = job.data;

    // ... معالجة الطلب ...

    // الرسائل اللاحقة تحمل نفس الـ correlation ID
    await notificationQueue.add('send-confirmation', {
        orderId,
        correlationId, // نفس المعرف، قابل للتتبع رجوعاً للطلب الأصلي
    });
}

لما تنقّح، استعلم عن كل السجلات والأحداث بالـ correlation ID عشان تشوف السلسلة الكاملة. لأنماط التتبع الموزع أكثر، شوف دليل مراقبة الذكاء الاصطناعي. ونهجنا بالـ منهجية يوضح كيف ندمج هالممارسات بمشاريعنا.

مقاييس صحة الطابور

المقياسصحيتحذيرحرج
عمق الطابور< 100100-1000> 1000
وقت المعالجة (p95)< 5 ثواني5-30 ثانية> 30 ثانية
معدل الفشل< 1%1-5%> 5%
عدد dead letter01-10> 10
تأخر المستهلك (Kafka)< 10001K-10K> 10K

نبّه على اتجاهات عمق الطابور، مش القيم المطلقة. عمق طابور 500 مستقر من ساعات طبيعي. عمق طابور 50 كان 0 قبل ساعة وعم يزيد هو مشكلة.

هالنوع من المراقبة حرج بـ خدمات الاستشارات عندنا، ونطبق نفس المبادئ على أنظمة الذكاء الاصطناعي اللي نبنيها.

الأخطاء الشائعة

  1. ما في تحكم بالمشتركين أثناء حفظ الـ worker. المصدر الأكبر لعواصف الأحداث. حفظ الـ worker لازم ما يطلق نفس مشتركي الأحداث اللي يرسلون شغل للـ workers.

  2. ما في تتبع مصدر بالمزامنة الثنائية. بدون headers الـ x-source، المزامنة الثنائية بين نظامين بتدور للأبد.

  3. استخدام معرف الرسالة للـ idempotency. معرفات الرسائل تتغير بإعادة التسليم ببعض الـ brokers. استخدم مفاتيح عمل تجارية (معرف الكيان + الإجراء + فترة زمنية) بدلاً.

  4. إعادة محاولة لا نهائية. حط عدد محاولات أقصى. بعد الاستنفاد، حوّل لطابور dead letter. لا تعيد المحاولة للأبد.

  5. تجاهل طوابير dead letter. الـ dead letters هي دين تشغيلي. صنفها (مؤقتة، دائمة، سامة) وتعامل مع كل نوع بشكل مختلف.

  6. افتراض ترتيب الرسائل. أغلب أحمال العمل ما تحتاج ترتيب. المعالجة المتوازية أسرع. استخدم FIFO بس لما انتقالات الحالة أو الترتيب السببي يتطلبه.

  7. نفس استراتيجية إعادة المحاولة لكل الأعطال. الانقطاع يحتاج تأخير أسي. المحتوى غير الصالح يحتاج صفر محاولات. تحديد المعدل يحتاج تأخير ثابت.

  8. ما في correlation IDs. بدونها، تنقيح سلسلة فشل عبر 5 خدمات مستحيل.

لتجنب هالأخطاء بمشاريعك، دليلنا عن هندسة البرمجيات يغطي الممارسات الأساسية. واستكشف حالات الاستخدام الفعلية عندنا لأمثلة تطبيقية.

النقاط الرئيسية

  • عواصف الأحداث هي أخطر نمط فشل. تسلسل حفظ واحد غير متحكم فيه ممكن يشبع كل بنيتك التحتية. الـ EventSubscriberSupervisor مش اختياري.

  • المزامنة الثنائية تحتاج تتبع مصدر. كل رسالة تحمل معرف النظام المُصدر. لما تستقبل رسالة من نفسك (عبر النظام الثاني)، تجاهلها.

  • الـ Idempotency يستخدم مفاتيح عمل تجارية، مش معرفات الرسائل. المستلم + الفئة + الكيان + فترة اليوم للإشعارات. المستأجر + المنتج + التاريخ للطلبات. المفتاح لازم يكون ذو معنى تجاري.

  • الـ Dead letters تحتاج تصنيف. الأعطال المؤقتة تنعاد. الأعطال الدائمة تتأرشف. الرسائل السامة تنحجر. لا تتجاهل طابور dead letter أبداً.

  • الـ Broker يتبع البيئة. Kafka للبث عالي الإنتاجية، RabbitMQ للتوجيه المعقد، BullMQ لطوابير مهام Node.js، Symfony Messenger لـ PHP. لا تختار broker وبعدين تبني حوله.

  • أغلب أحمال العمل ما تحتاج ترتيب. المعالجة المتوازية أبسط وأسرع. استخدم FIFO بس لما انتقالات الحالة تتطلبه.

نطبق هالأنماط بكل مشاريعنا من البرمجيات المخصصة وهندسة البيانات والنشر السحابي. إذا عم تبني نظام مدفوع بالأحداث أو تنقّح واحد بمشاكل، تواصل مع فريقنا أو اطلب عرض سعر.

المواضيع المغطاة

البنية المدفوعة بالأحداثطوابير الرسائل إنتاجevent sourcingCQRS إنتاجRabbitMQ مقابل KafkaBullMQعواصف الأحداثidempotencydead letter queueمزامنة ثنائية الاتجاه

جاهز لبناء أنظمة ذكاء اصطناعي جاهزة للإنتاج؟

فريقنا متخصص في بناء أنظمة ذكاء اصطناعي جاهزة للإنتاج. خلينا نحكي كيف نقدر نساعد.

ابدأ محادثة