Ausgangspunkt und Zielsetzung

Im folgenden Guide werden wir das im letzten Guide angelegte, noch leere Modul mit Leben füllen. Name und Namespace wurden zwischenzeitlich angepasst, um ein realistisches CSV-Modul zeigen zu können. Das Ziel hierbei ist es ein Modul zu entwickeln, welches CSV-Dateien importieren kann. Hierbei wird das Modul direkt aus CSV-Zeilen Produkte erzeugen. Im Gesamtkontext synQup ist die Verkettung einzelner Module oft tragfähiger als die Entwicklung einzelner, monolithischer Module. Beispielsweise könnte das vorgestellte Beispiel auch folgendermaßen entwickelt werden:

  • ein Modul, welches beliebige CSV-Daten in MongoDB-Dokumente importiert
  • ein zweites Modul, welches aus beliebigen MongoDB-Documents unter Zuhilfenahme einer konfigurierbaren Mapping-Tabelle zu Produkten transformiert

Die erhöhte Flexibilität ist offensichtlich: beispielsweise können die Dokumente nach dem ersten Schritt noch von weiteren Modulen bearbeitet werden, um die Produkte beispielsweise um weitere Attribute zu erweitern. Genau so könnte das zweite Modul aber auch Daten verarbeiten, die von einem anderen Modul (beispielsweise einem generischen XML-Modul) importiert wurden.

Im vorliegenden Guide wird dennoch ein einfaches Modul entwickelt, welches Produkte direkt aus CSV-Daten anlegt. Die Abstimmung zwischen Konfigurier-, Wiederverwendbarkeit auf der einen und Entwicklungszeit auf der anderen Seite ist immer projektabhängig zu treffen.

Modul-Namespaces

Module in synQup unterteilen sich wie bereits vorgestellt in 3 Namespaces: input, transformations und output. Eine Aufteilung kann bei Modulen, die mehrere Richtungen unterstützen durchaus sinnvoll sein: erwartet man also, dass zu dem Input-Modul für ein bestimmtes ERP (also dem Modul, welches Daten einliest) auch noch ein Output-Modul (also ein Modul, welches Daten zum ERP schreibt) entwickelt wird, macht es Sinn, dies auch in Namespaces im Modulpaket abzubilden. Ebenfalls ermöglicht diese Konvention ein einfaches Erkennen von Fehlkonfigurationen (beispielsweise Eingabemodule in Ausgabeschritten).

Da wir im folgenden Beispiel ein CSV-Importmodul entwickeln möchten, legen wir unter src/ einen Ordner Input an. In diesen Ordner wird nun auch die StartMessage unseres Moduls abgelegt. Die Datei StartCsvInputMessage.php sieht beispielsweise so aus:

<?php
namespace Synqup\Modules\CsvModuleBundle\Input;

class StartCsvInputMessage 
{

}

StartMessages und ProgressDefinition

Selbstverständlich benötigen Module und Core einen Übergabepunkt, an dem der Core das Modul startet und den Kontrollfluss abgibt. Hierfür werden sogenannte StartMessages genutzt. Dieser Name ist ein abstrakter Name, der als solcher in der Programmierung nicht auftaucht, aber dennoch hilfreich für das Verständnis ist. In der konkreten Implementierung müssen alle Klassen, die vom Core gestartet werden können, das Interface ProgressDefinitionProvidingMessage implementieren.

Erweitern wir unsere Klasse also um die Implementierung des Interfaces und legen die Methoden als Stubs an:

<?php
namespace Synqup\Modules\CsvModuleBundle\Input;

use App\Connector\Flow\ProcessManagement\Progress\Definition\SubsectionProgressDefinition;
use App\Connector\Flow\ProcessManagement\Queue\ControlMessages\ProgressDefinitionProvidingMessage;
use App\Connector\Flow\ProcessManagement\Queue\ModuleJobDispatchContextAwareMessage;

class StartCsvInputMessage extends ModuleJobDispatchContextAwareMessage implements ProgressDefinitionProvidingMessage
{

    public static function getProgressDefinition(): SubsectionProgressDefinition
    {
        
    }
}

Des Weiteren wurde eine extends-Anweisung für die Klasse ModuleJobDispatchContextAwareMessage hinzugefügt. Dieser Kontext wird später noch weiter erläutert.

ProgressDefinition

Das Konzept des Progresses wurde in der Einführung zu den Konzepten hinter synQup schon kurz umrissen. Die ProgressDefinition ist diese "Blaupause", aus der dann die Baumstruktur hinter den Fortschrittstabellen erzeugt wird. Das Anlegen der ProgressDefinition als ersten Schritt der Modulentwicklung ist aber auch sinnvoll, um die Planung hinter dem Modul zu festigen.

Die ProgressDefinition wird statisch vom Modul an den Core übergeben und kann nicht zustandsbehaftet sein. Modelliert wird die ProgressDefinition über ein assoziatives Array beliebiger Tiefe.

Die Aufgaben die unser CSV-Modul erledigen soll:

  • Initialisierung
  • Bestimmen der vorhandenen Dateien (wie viele?)
  • Einlesen der einzelnen Dateien und Verarbeitung zu Produkten

Das entspricht einem linearen Verlauf. Wie legt man nun eine ProgressDefinition an? Das Ausgangsobjekt hierfür ist immer eine SubsectionProgressDefinition. Eine Subsection bezeichnet immer einen Unterschritt des gesamten Fortschrittsbaums eines Flows. Durch die Eigenschaft children an der SubsectionProgressDefinition lassen sich Baumstrukturen abbilden. Da das Interface die Rückgabe einer einzelnen Definition vorgibt und unser Beispiel linear abläuft, sieht unsere Methode getProgressDefinition nun so aus:

    public static function getProgressDefinition(): SubsectionProgressDefinition
    {
        return new SubsectionProgressDefinition('input', 'csv-input', 0, [
           new SubsectionProgressDefinition('input','sanity-checks', 0),
           new SubsectionProgressDefinition('input', 'read-file', 1),
           new SubsectionProgressDefinition('input', 'create-product-batch', 2)
        ]);
    }

Durch die Baumstruktur ergibt sich auch gleich eine konkatenierbare Adressierungsmöglichkeit für diese Progress-Items: durch das Verketten der sogenannten Subsection-Identifier ergibt sich für unseren ersten Prozessschritt also der Ausdruck input.csv-input.sanity-checks. Die Aufgaben der Schritte stellen sich wie folgt dar:

  • Sanity Check: Vorbedingungen prüfen (beispeilsweise API-Erreichbarkeit oder in unserem Fall das Vorhandensein von Dateien)
  • Read File: Einlesen einer einzelnen Datei (parallelisierbar!)
  • Create Product Batch: Erstellen einer Teilmenge der zu erzeugenden Produkte aus der CSV-Datei

Diese Teilschritte lassen sich im Modul mit Messages und dem zentralen Message-Bus abbilden. Nach der StartMessage geht der Kontrollfluss an das Modul über (s.o.). Hierbei werden dann die grundlegenden Informationen über Fortschrittseinträge ermittelt und gesetzt sowie die weitere Verarbeitung orchestriert. Zunächst einmal sollten wir dennoch unser Modul auf Funktion überprüfen.

Konfiguration: Flows

Wie bereits in der Einführung zu den grundlegenden Konzepten besprochen, werden einzelne Modulkonfigurationen innerhalb sogenannter Flows zusammengefasst. Um unser Modul testen zu können, benötigen wir also einen Flow. Diesen können wir einfach auf Datenbankebene anlegen. Die Tabelle Flow besteht aus einem Namen, dem vollqualifizierten Klassennamen einer Prozessdefinition und einer JSON-basierten flexiblen Konfiguration. Die Prozessdefinition ist im Standard meist die Klasse App\Connector\Flow\ProcessDefinition\Standard\StandardProcessDefinition. Die JSON-Konfiguration hält Parameter, welche für den gesamten Flow (also alle Module gleichermaßen oder den Core) relevant sind. Da wir diese nicht benötigen, sieht die Tabellenzeile für unseren Testflow sehr einfach aus:

Flow-Tabelle

Konfiguration: JobDispatcherMapping

Möchten wir in diesem Flow nun unser neu angelegtes Modul nutzen benötigen wir dafür einen weiteren Eintrag in der Tabelle job_dispatcher_mapping (JDM). In dieser Tabelle werden alle Module eines bestimmten Flows, ihre Zuordnung zu Prozessschritten, deren Konfiguration sowie Abhängigkeiten untereinander gesteuert. Die Module werden hierbei über den vollqualifizierten Klassennamen ihrer StartMessage identifiziert. Die Konfiguration kann wieder mit willkürlichem JSON befüllt werden. Die Konfiguration wird dem Modul beim Starten zur Verfügung gestellt. Hierdurch lassen sich beispielsweise API-Zugangsdaten oder genutzte Dateisystemabstraktionen konfigurieren. Ebenfalls gibt es eine Ersetzungslogik: bestimmte Platzhalter im JSON, wie beispielsweise %LAST_EXECUTION_CREATED_AT%, werden vom Core ersetzt, bevor die Konfiguration den Modulen zur Verfügung gestellt wird.

Das JDM für unser CSV-Modul sieht beispielsweise so aus: JobDispatcherMapping-Tabelle

Hello, World!

Um unsere Konfiguration sowie die bisher getätigten Schritte zu testen, bietet es sich an, einen ersten Handler für die StartMessage anzulegen und zu prüfen, ob der Core das angelegte Modul finden und aufrufen kann. Hierzu legen wir im Ordner Input eine weitere Klasse an, die die erhaltene StartMessage dann verarbeitet. Als simple Konvention benennen wir in diesem Modul alle Handler-Klassen mit den Namen der Message, erweitert um "Handler". Die fertige Klasse:

<?php
namespace Synqup\Modules\CsvModuleBundle\Input;

use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class StartCsvInputMessageHandler implements MessageHandlerInterface
{
    public function __invoke(StartCsvInputMessage $message) {
        die('Hello World' . PHP_EOL);
    }
}

Zu beachten ist hierbei:

  • Die Klasse implementiert das Symfony-eigene MessageHandlerInterface. Dieses schreibt keine Methoden vor, sondern existiert lediglich als Markierung für das Framework
  • Die magische __invoke-Methode stellt das Mapping auf die zu empfangenden Nachrichten her (in diesem Fall Nachrichten vom Typ StartCsvInputMessage)

Wurden alle Schritte bis hier hin richtig gemacht, lässt sich das Modul jetzt starten. Hierzu muss zunächst ein Prozess gestartet werden, welcher die Nachrichten abarbeitet. Dieser sogenannte Worker-Prozess lässt sich für die lokale Entwicklung einfach per CLI starten. Der folgende Befehl funktioniert innerhalb des Containers:

root@f4490118644b:/var/www/html# php bin/console messenger:consume pm job_high job_mid job_low


 [OK] Consuming messages from transports "pm, job_high, job_mid, job_low".


 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

 // Quit the worker with CONTROL-C.

 // Re-run the command with a -vv option to see logs about consumed messages.

Worker Started!

Um den Flow nun zu starten, wird ein zweiter Terminal-Tab benötigt. Ebenfalls innerhalb des Containers lässt sich hierfür dann der folgende Befehl nutzen:

root@f4490118644b:/var/www/html# php bin/console synqup:flow:start 1 --force-removal
Creating flow execution…
Created flow execution via service, see worker for output.

beobachten wir nun die Ausgabe in unserem zweiten Tab:

Hello World
root@f4490118644b:/var/www/html#

Zu sehen ist also, dass der Code geladen wurde und das Modul korrekt registriert ist.

Implementierung StartMessageHandler

Der StartMessageHandler, welcher aktuell den Prozess mit "Hello World" beendet, ist also unser Einstiegspunkt. Diesem fallen folgende Aufgaben zu:

  • Setzen der initialen Fortschrittswerte ("0/X Datensätze verarbeitet")
  • Prüfen d. Vorhandenseins der Dateien (sanity check)
  • Erzeugen weiterer Nachrichten

Zu beachten ist, dass der StartMessageHandler generell keine langwierigen Aufgaben erledigen sollte. Dies liegt darin begründet, dass StartMessageHandler vom Core als hochpriorisierte Nachrichten behandelt werden. API-Aufrufe oder andere zeitkritische Anforderungen sollten besser in einer späteren Nachricht behandelt werden, bei der die Priorität niedriger ist.

Progress

Der erste Schritt wäre also die Initialisierung der Fortschrittswerte. Hierfür stellt der Core sogenannte Traits bereit. Die Nutzung von Traits entbindet den Modulentwickler von aufwändigen Symfony-Containerkonfigurationen und bietet einfache Zugriffsmethoden auf häufig verwendete Core-Funktionen. Traits sind kleine "Funktionsbausteine": die beschriebenen Methoden und Eigenschaften werden in Klienten des Traits übernommen und stehen danach zur Verfügung, als hätte man diese in der Klientenklasse implementiert. Die use-Deklaration eines Traits folgt im Klasseninneren, also dort wo auch eigene Methoden und Eigenschaften definiert werden.

class StartCsvInputMessageHandler implements MessageHandlerInterface
{
    use UpdatesProgress;

Das Setzen einzelner Progress-Items passiert dann über die Methode updateProgress unter Zuhilfenahme des bereits vorgestellten progress paths, also die Punkt-verknüpfte Aneinanderreihung der Subsection-Identifier (bspw.: input.csv-input.sanity-checks).
Schauen wir uns nun die Signatur der vom Trait hinzugefügten updateProgress-Methode an, fällt auf, dass diese einen Kontext benötigt. Was ist dieser Kontext und wofür wird er benötigt?

Kontext

Der übergebene Kontext (ausgeschrieben: ModuleJobDispatchContext) beinhaltet die bereits angesprochene Modulkonfiguration sowie Informationen zur aktuellen Ausführung. Diese Informationen sind regelmäßig wichtig, um dem Core die Nachverfolgung einzelner Modulausführung bzw. deren Verlauf zu ermöglichen. Die Aktualisierung der Progress-Items beispielsweise benötigt diesen Kontext um JobDispatcherMapping und Flow-Execution zuzuordnen, da das gleiche Modul auch in mehreren Flows und/oder JobDispatcherMappings gleichzeitig laufen könnte. Auch für kontextbasiertes Logging ist es unerlässlich, diesen Kontext immer und wieder durchzuschleifen. Er sollte innerhalb des Moduls an sich durch jede Message weitergegeben werden, weswegen es die oben erweiterte Basisklasse ModuleJobDispatchContextAwareMessage gibt. Diese lässt sich auch im eigenen Modul als einfache Basisklasse mit Konstruktorparameter für den Kontext nutzen.

Implementieren wir also die Initialisierung der Progress-Items:

    public function __invoke(StartCsvInputMessage $message) {
        $context = $message->getModuleJobDispatchContext();
        $this->updateProgress($context, 'input.csv-input', 0, 3);
        $this->updateProgress($context, 'input.csv-input.sanity-checks', 0, 1);
        $this->updateProgress($context, 'input.csv-input.read-file', 0, 1);
        $this->updateProgress($context, 'input.csv-input.create-product-batch', 0, 1);
    }

Schnell fällt hier auf, dass die angegebenen Werte so keinen Sinn machen: ohne die Information darüber, wieviele Dateien es gibt, lässt sich der total progress nicht bestimmen. Woher wissen wir eigentlich, wieviele Dateien es gibt? Wo liegen diese?

Erweiterung der Konfiguration

Um im Modul keinen hart-codierten Pfad zu benutzen und die Wiederverwendbarkeit zu erhöhen, bietet es sich an, den Pfad, in dem nach Dateien gesucht werden soll, konfigurierbar zu machen. Wir haben bereits gelernt, dass die JSON-Config des JobDispatcherMappings (also der konkreten Modulkonfiguration für einen bestimmten Flow) willkürliches JSON akzeptiert. Erweitern wir also die Spalte configuration der Tabelle job_dispatcher_mapping, um eine Information des Pfads:

Achtung: in Modulen, welche nicht nur zu Ausbildungszwecken entstehen, sollte die Dateisystemabstraktion genutzt werden.

{
  "csvPath": "/var/www/html/csvs"
}

Erweitern wir nun also unsere invoke-Methode um Funktionen um die Dateien zu zählen.

    public function __invoke(StartCsvInputMessage $message) {
        $context = $message->getModuleJobDispatchContext();
        
        $path = $context->getConfig()['csvPath'] ?? '/var/www/html/csv_files_fallback';
        $csvFiles = glob(rtrim($path, '/') . '/*.csv');
        $csvCount = count($csvFiles);
        
        $this->updateProgress($context, 'input.csv-input', 0, 3);
        $this->updateProgress($context, 'input.csv-input.sanity-checks', 1, 1);
        $this->updateProgress($context, 'input.csv-input.read-file', 0, $csvCount);
        
        if (!$csvCount) {
            $this->updateProgress($context, 'input.csv-input.create-product-batch', 1, 1);
        } else {
            // @TODO: implement Message Dispatch
        }
    }

Zu beachten: die Funktion zum Setzen des Batch-Counts wurde entfernt. Auch wird der Progress des Sanity-Checks hier direkt gesetzt. Werden keine CSV-Dateien gefunden, werden alle Progress-Items als fertig (processed count = total count) markiert. Hierdurch würde unsere FlowExecution bereits beendet, da das einzige Modul fertig ist.

Implementieren wir nun also eine 2. Message, die die gefundenen CSV-Dateien einliest. Hierfür legen wir zwei Ordner unter "Input" an: Jobs und Jobs/ReadFile. Legen wir nun die neue Nachricht, die ReadFileMessage an:

<?php
namespace Synqup\Modules\CsvModuleBundle\Input\Jobs\ReadFile;

use App\Connector\Context\ModuleJobDispatchContext;
use App\Connector\Flow\ProcessDefinition\Standard\Messages\TagInterfaces\TagPriorityMidInterface;
use App\Connector\Flow\ProcessManagement\Queue\ModuleJobDispatchContextAwareMessage;

class ReadFileMessage extends ModuleJobDispatchContextAwareMessage
{
    private string $filePath;

    /**
     * ReadFileMessage constructor.
     * @param string $filePath
     */
    public function __construct(string $filePath, ModuleJobDispatchContext $moduleJobDispatchContext)
    {
        $this->filePath = $filePath;
        parent::__construct($moduleJobDispatchContext);
    }

    /**
     * @return string
     */
    public function getFilePath(): string
    {
        return $this->filePath;
    }
}

An dieser Klasse lässt sich gut zeigen, dass Nachrichten auch für den Datenaustausch genutzt werden können: natürlich sollten die Datenfragmente möglichst klein gehalten werden (Steuerdaten, keine Nutzdaten!). Dennoch lassen sich somit die Belange einzelner Nachrichten wunderbar kapseln bzw. parametrisieren.

Diese Message muss nun in Abhängigkeit der gefundenen Dateien versendet werden. Erweitern wir hierzu den Konstruktor des StartCsvInputMessageHandler und holen uns aus Symfonys Service-Container das MessageBusInterface, mit dem wir unsere eigenen Nachrichten verschicken können.

    use UpdatesProgress;
    
    private MessageBusInterface $messageBus;

    public function __construct(MessageBusInterface $messageBus) {
        $this->messageBus = $messageBus;
    }
    
    public function __invoke(StartCsvInputMessage $message) {

Erweitern wir als Nächstes die invoke-Methode, um die ReadFileMessage für jede gefundene Datei abzusenden:

    $this->updateProgress($context, 'input.csv-input.read-file', 0, $csvCount);
    
    foreach ($csvFiles as $filePath) {
        $path = realpath($filePath);
        $this->messageBus->dispatch(new ReadFileMessage($path, $context));
    }

Implementierung Einlesen der Dateien

Ergänzen wir nun im ReadFile-Ordner einen weiteren MessageHandler für unsere ReadFileMessage, den ReadFileMessageHandler:

<?php

namespace Synqup\Modules\CsvModuleBundle\Input\Jobs\ReadFile;

use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class ReadFileMessageHandler implements MessageHandlerInterface
{
    private MessageBusInterface $messageBus;

    public function __construct(MessageBusInterface $messageBus) {
        $this->messageBus = $messageBus;
    }    
    public function __invoke(ReadFileMessage $message) {
        
    }
}

Die Dateien sollen beim Einlesen in kleine Teile, sogenannte "Batches" aufgeteilt werden. Hierdurch kann die Verarbeitungsgeschwindigkeit auf Multiprozessorsystemen erhöht werden. In unserem Beispiel wählen wir eine batch size von 50, also 50 Produkte je Teilschritt. Dies lässt sich beispielsweise so implementieren:

    public function __invoke(ReadFileMessage $message) {
        $filePath = $message->getFilePath();
        $handle = fopen($filePath, "r");
        
        $batch = [];
        if ($handle) {
            while (($line = fgets($handle)) !== false) {
                $batch[] = $line;
                if (count($batch) === 50) {
                    // Batch built, @TODO: dispatch message
                    
                    $batch = [];
                }
            }
            // Remainder Batch, @TODO: dispatch message

            fclose($handle);
        } else {
            throw new RuntimeException(sprintf('Cannot read passed file %s', $filePath));
        }
    }

Beachten wir nun auch das Progress-Handling: das Einlesen einer Datei ist dann fertig, wenn das Datei-Handling abgeschlossen ist. Den Progress für read-file können wir danach also erhöhen. Dennoch haben wir den Total-Progress der Batches noch nicht bestimmen können, da wir im StartMessageHandler keine Info darüber haben, wieviele Zeilen eine Datei beinhaltet. Ebenso macht es keinen Sinn, alle Dateien einmal einzulesen, nur um die Zeileninformation (und damit den _total count_der zu erzeugenden Batches) zu erhalten. Bisher haben wir die updatesProgress Methode nur genutzt, um absolute Werte zu setzen. Durch die dynamischen Parameter (Strings oder Integer) lässt sich jedoch auch ein Inkrement/Dekrement von processed count und total count umsetzen. Das sieht beispielsweise so aus:

$this->updateProgress($context, 'input.csv-input.create-product-batch', '+0', '+1');

hierbei wird nur der total count erhöht. Dies macht für die Batches Sinn, da der verarbeitende Message-Handler der Batches den processed count setzen sollte. Die Anweisung '+0' im processed count ist deshalb nötig, da der Wert nicht bekannt ist: eventuell wurde ja bereits eine Batch verarbeitet, bis in den ReadFileMessageHandler(n) die letzte Datei eingelesen wurde.

Oder auch für die Inkrementierung der verarbeiteten Dateien:

$this->updateProgress($context, 'input.csv-input.create-product-batch', '+1');

vervollständigen wir das Beispiel also um neue Nachrichten und erzeugen diese.

Input\Jobs\ImportBatch\ImportBatchMessage

<?php

namespace Synqup\Modules\CsvModuleBundle\Input\Jobs\ImportBatch;

use App\Connector\Context\ModuleJobDispatchContext;
use App\Connector\Flow\ProcessManagement\Queue\ModuleJobDispatchContextAwareMessage;

class ImportBatchMessage extends ModuleJobDispatchContextAwareMessage
{
    private array $batches;

    public function __construct(array $batches, ModuleJobDispatchContext $context)
    {
        $this->batches = $batches;
        parent::__construct($context);
    }

    /**
     * @return array
     */
    public function getBatches(): array
    {
        return $this->batches;
    }
}

ReadFileMessageHandler::invoke

    public function __invoke(ReadFileMessage $message) {
        $filePath = $message->getFilePath();
        $handle = fopen($filePath, "r");
        $context = $message->getModuleJobDispatchContext();

        $batch = [];
        if ($handle) {
            while (($line = fgets($handle)) !== false) {
                $batch[] = $line;
                if (count($batch) === 50) {
                    $this->messageBus->dispatch(new ImportBatchMessage($batch, $context));
                    $this->updateProgress($context, 'input.csv-input.create-product-batch', '+0', '+1');
                    $batch = [];
                }
            }
            if (!empty($batch)) {
                // Increase total progress only, since we have no info about parallel processing of messages
                $this->updateProgress($context, 'input.csv-input.create-product-batch', '+0', '+1');
                $this->messageBus->dispatch(new ImportBatchMessage($batch, $context));
            }

            $this->updateProgress($context, 'input.csv-input.read-file', '+1');
            fclose($handle);
        } else {
            throw new RuntimeException(sprintf('Cannot read passed file %s', $filePath));
        }
    }

Implementierung Produkterstellung

Zuletzt müssen aus den CSV-Datensätzen nur noch Produkte erzeugt werden. Betrachten wir einen Beispieldatensatz:

Kleine blaue Gartenschere,92729,Diese kleine blaue Gartenschere ermöglicht Ihnen genaue Schnitte,100

Die Kopfzeilendefinition unserer CSV-Datei sieht also so aus:

Artikelname,Artikelnummer,Beschreibung,Preis

synQup wird mit einem eigenen Datenmodell ausgeliefert (Transferschema/Transfermodell), welches Abbildungen für 99% und Erweiterungen für das letzte Prozent der E-Commerce Anwendungsfälle bietet. Selbstverständlich lassen sich auch Produkte großer Komplexität abbilden. Das Datenmodell liegt in den Paketen synqup/common und synqup/commerce zur Ansicht bereit. Im folgenden Nutzen wir nur einen sehr kleinen Ausschnitt hiervon.

Der Message Handler hierfür (ImportBatchMessageHandler) ist im Wesentlichen selbsterklärend:

<?php
namespace Synqup\Modules\CsvModuleBundle\Input\Jobs\ImportBatch;

use App\Connector\Flow\ProcessManagement\Progress\Traits\UpdatesProgress;
use Doctrine\ODM\MongoDB\DocumentManager;
use Elio\CommerceBundle\Document\Product\GeneralInformation;
use Elio\CommerceBundle\Document\Product\Price\SellingPrice;
use Elio\CommerceBundle\Document\Product\Product;
use Elio\CommerceBundle\Document\Product\ProductIdentifiers;
use Elio\CommonBundle\Definition\Locale;
use Elio\CommonBundle\Document\Translation\TranslationCollection;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class ImportBatchMessageHandler implements MessageHandlerInterface
{
    use UpdatesProgress;

    private DocumentManager $documentManager;

    public function __construct(DocumentManager $documentManager) {

        $this->documentManager = $documentManager;
    }

    public function __invoke(ImportBatchMessage $message) {
        $context = $message->getModuleJobDispatchContext();
        foreach ($message->getBatches() as $currentRow) {
            $row = str_getcsv($currentRow);
            $product = new Product($row[1]);
            $product->setProductIdentifiers(new ProductIdentifiers($row[1]));
            $product->setActive(true);
            $product->setGeneralInformation(new GeneralInformation(
                TranslationCollection::create(
                    [Locale::de_DE, $row[0]],
                ),
                TranslationCollection::create(
                    [Locale::de_DE, $row[2]],
                )
            ));
            $product->setDefaultPrice(new SellingPrice($row[3]));
            $this->documentManager->persist($product);
        }
        $this->documentManager->flush();
        $this->updateProgress($context, 'input.csv-input.create-product-batch', '+1');
    }
}

Zu beachten ist lediglich: der Aufruf von flush() am DocumentManager sollte außerhalb der Schleife erfolgen, in der einzelne Elemente mit persist() zum Speichern in der DB vorgemerkt werden. Des Weiteren zeigt sich, dass einige Attribute übersetzbar sind und mit TranslationCollections befüllt werden müssen.