Starting point and objective

In this guide, we bring the still-empty module created in the previous guide to life. Its name and namespace have been adjusted in the meantime so that a realistic CSV module can be shown. The goal is to develop a module that imports CSV files; here, the module creates products directly from CSV lines. In the broader context of synQup, chaining individual modules is often more viable than developing monolithic ones. For example, the module presented here could also be developed as follows:

  • a module that imports arbitrary CSV data into MongoDB documents
  • a second module that transforms arbitrary MongoDB documents into products using a configurable mapping table

The increased flexibility is obvious: after the first step, further modules can still edit the documents, for example to add further attributes to the products. Likewise, the second module could process data imported by another module (for example, a generic XML module).
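To make the chained alternative more concrete, here is a minimal sketch in plain PHP in which the two modules are reduced to functions. csvToDocuments(), documentsToProducts() and the mapping table are hypothetical; documents and products are plain arrays rather than MongoDB documents or synQup transfer-model objects.

```php
<?php
// Hypothetical sketch: two chained "modules" reduced to plain functions.
// Neither function is part of synQup; documents are plain arrays here.

/**
 * Stage 1: turn CSV lines into generic documents keyed by the header columns.
 * @param string[] $lines CSV lines, the first one being the header
 */
function csvToDocuments(array $lines): array
{
    $header = str_getcsv(array_shift($lines), ',', '"', '');
    $documents = [];
    foreach ($lines as $line) {
        $documents[] = array_combine($header, str_getcsv($line, ',', '"', ''));
    }
    return $documents;
}

/**
 * Stage 2: map generic documents to products using a mapping table
 * (source column => target field). Missing columns become null.
 */
function documentsToProducts(array $documents, array $mapping): array
{
    $products = [];
    foreach ($documents as $document) {
        $product = [];
        foreach ($mapping as $sourceColumn => $targetField) {
            $product[$targetField] = $document[$sourceColumn] ?? null;
        }
        $products[] = $product;
    }
    return $products;
}

$documents = csvToDocuments([
    'Artikelname,Artikelnummer,Preis',
    'Gartenschere,92729,100',
]);
$products = documentsToProducts($documents, [
    'Artikelnummer' => 'identifier',
    'Artikelname' => 'name',
]);
print_r($products);
```

Because the mapping table is plain data, the second stage could be reused for documents that came from a different source, which is the flexibility described above.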

Nevertheless, this guide develops a simple module that creates products directly from CSV data. The balance between configurability and reusability on the one hand and development time on the other must always be struck on a project-by-project basis.

Module namespaces

Modules in synQup are divided into three namespaces: input, transformations and output. This division is particularly useful for modules that support multiple directions: if, in addition to the input module for a specific ERP (i.e. the module that reads data from it), there is also expected to be an output module (i.e. a module that writes data to the ERP), it makes sense to reflect this with namespaces in the module package. This convention also makes misconfigurations easy to detect (for example, input modules in output steps).

Since we want to develop a CSV import module in this example, we create a folder Input under src/. The StartMessage of our module is stored in this folder as well. At this stage, the file StartCsvInputMessage.php looks like this:

<?php
namespace Synqup\Modules\CsvModuleBundle\Input;

class StartCsvInputMessage 
{

}

StartMessages and ProgressDefinition

Modules and the core need a handover point where the core starts the module and hands over the control flow. So-called StartMessages serve this purpose. The term is conceptual: it does not appear as a class or interface name in the code, but it helps with understanding. In the concrete implementation, every class that can be started by the core must implement the ProgressDefinitionProvidingMessage interface.

So let's extend our class to implement the interface and add the required method as a stub:

<?php
namespace Synqup\Modules\CsvModuleBundle\Input;

use App\Connector\Flow\ProcessManagement\Progress\Definition\SubsectionProgressDefinition;
use App\Connector\Flow\ProcessManagement\Queue\ControlMessages\ProgressDefinitionProvidingMessage;
use App\Connector\Flow\ProcessManagement\Queue\ModuleJobDispatchContextAwareMessage;

class StartCsvInputMessage extends ModuleJobDispatchContextAwareMessage implements ProgressDefinitionProvidingMessage
{

    public static function getProgressDefinition(): SubsectionProgressDefinition
    {
        
    }
}

In addition, the class now extends ModuleJobDispatchContextAwareMessage. The context this refers to is explained in more detail later.

ProgressDefinition

The concept of progress was briefly outlined in the introduction to the concepts behind synQup. The ProgressDefinition is the "blueprint" from which the tree structure behind the progress tables is generated. Creating the ProgressDefinition as the first step of module development also helps to consolidate the planning of the module.

The ProgressDefinition is passed statically from the module to the core and therefore cannot be stateful. It is modeled as a tree structure of arbitrary depth.

Our CSV module should perform the following tasks:

  • Initialization
  • Determining which files exist (and how many)
  • Reading individual files and processing them into products

This corresponds to a linear sequence of steps. How do you create a ProgressDefinition? The root object is always a SubsectionProgressDefinition. A subsection always denotes a substep of the entire progress tree of a flow, and tree structures can be built via the children property of SubsectionProgressDefinition. Since the interface requires a single definition to be returned and our example runs linearly, our getProgressDefinition method now looks like this:

    public static function getProgressDefinition(): SubsectionProgressDefinition
    {
        return new SubsectionProgressDefinition('input', 'csv-input', 0, [
            new SubsectionProgressDefinition('input', 'sanity-checks', 0),
            new SubsectionProgressDefinition('input', 'read-file', 1),
            new SubsectionProgressDefinition('input', 'create-product-batch', 2)
        ]);
    }

The tree structure also makes the progress items addressable by path: by concatenating the so-called subsection identifiers with dots, our first process step is addressed as input.csv-input.sanity-checks. The steps have the following tasks:

  • Sanity Check: check preconditions (e.g. API reachability or, in our case, the existence of files)
  • Read File: read a single file (parallelizable!)
  • Create Product Batch: create a subset (batch) of the products contained in the CSV file
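The path addressing can be illustrated with a small sketch. It models the definition as nested arrays instead of SubsectionProgressDefinition objects and treats 'input' as the root segment, matching the paths used in this guide; progressPaths() is a hypothetical helper, not part of the core.

```php
<?php
// Hypothetical sketch: a progress definition as nested arrays (not the real
// SubsectionProgressDefinition class) and a function that derives the
// dot-separated paths that updateProgress() expects.

/**
 * @param array $node ['id' => string, 'children' => array (optional)]
 * @return string[] all paths in depth-first order
 */
function progressPaths(array $node, string $prefix = ''): array
{
    $path = $prefix === '' ? $node['id'] : $prefix . '.' . $node['id'];
    $paths = [$path];
    foreach ($node['children'] ?? [] as $child) {
        $paths = array_merge($paths, progressPaths($child, $path));
    }
    return $paths;
}

$definition = [
    'id' => 'input',
    'children' => [[
        'id' => 'csv-input',
        'children' => [
            ['id' => 'sanity-checks'],
            ['id' => 'read-file'],
            ['id' => 'create-product-batch'],
        ],
    ]],
];

$paths = progressPaths($definition);
echo implode(PHP_EOL, $paths), PHP_EOL;
```

The last three entries are exactly the paths used with updateProgress later in this guide.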

These substeps can be mapped in the module using messages and the central message bus. Once the StartMessage has been received, the control flow passes to the module (see above). The module then determines and sets the basic progress information and orchestrates further processing. Before implementing this, however, we should first check that the module works at all.

Configuration: Flows

As already discussed in the introduction to the basic concepts, individual module configurations are grouped within so-called flows. So, to test our module, we need a flow, which we can simply create at the database level. The Flow table consists of a name, the fully qualified class name of a process definition, and a flexible JSON-based configuration. In a standard setup, the process definition is usually the class App\Connector\Flow\ProcessDefinition\Standard\StandardProcessDefinition. The JSON configuration holds parameters that are relevant for the entire flow (i.e. for all modules equally, or for the core). Since we do not need any, the table row for our test flow looks very simple:

[Figure: Flow table]

Configuration: JobDispatcherMapping

To use our newly created module in this flow, we need an additional entry in the table job_dispatcher_mapping (JDM). This table controls all modules of a specific flow, their assignment to process steps, their configuration and their interdependencies. Modules are identified by the fully qualified class name of their StartMessage. As with the flow, the configuration can contain arbitrary JSON, and it is made available to the module at startup. It can be used, for example, to configure API credentials or the file system abstractions to be used. There is also placeholder replacement: certain placeholders in the JSON, such as %LAST_EXECUTION_CREATED_AT%, are replaced by the core before the configuration is made available to the modules.

For example, the JDM entry for our CSV module looks like this: [Figure: JobDispatcherMapping table]
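The placeholder replacement described above can be pictured roughly as follows. This is a hypothetical sketch, not the core's implementation: replacePlaceholders(), the filter.since key and the timestamp value are made up for illustration.

```php
<?php
// Hypothetical sketch of placeholder replacement in a decoded JSON module
// configuration. It illustrates the idea only; it is not synQup core code.

function replacePlaceholders(array $config, array $values): array
{
    foreach ($config as $key => $value) {
        if (is_array($value)) {
            // Recurse into nested JSON objects and arrays
            $config[$key] = replacePlaceholders($value, $values);
        } elseif (is_string($value)) {
            // strtr() replaces all known placeholders in a single pass
            $config[$key] = strtr($value, $values);
        }
    }
    return $config;
}

// "filter.since" is a made-up key, the timestamp a made-up value
$json = '{"csvPath": "/var/www/html/csvs", "filter": {"since": "%LAST_EXECUTION_CREATED_AT%"}}';
$config = replacePlaceholders(
    json_decode($json, true),
    ['%LAST_EXECUTION_CREATED_AT%' => '2021-01-01T00:00:00+00:00']
);
echo $config['filter']['since'], PHP_EOL; // 2021-01-01T00:00:00+00:00
```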

Hello, World!

To test our configuration and the steps taken so far, it is a good idea to create an initial handler for the StartMessage and check whether the core can find and call the new module. For this, we create another class in the Input folder, which will process the received StartMessage. As a simple convention, every handler class in this module is named after its message with the suffix "Handler". The finished class:

<?php
namespace Synqup\Modules\CsvModuleBundle\Input;

use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class StartCsvInputMessageHandler implements MessageHandlerInterface
{
    public function __invoke(StartCsvInputMessage $message) {
        die('Hello World' . PHP_EOL);
    }
}

It should be noted here:

  • The class implements Symfony's MessageHandlerInterface. This interface does not prescribe any methods; it only serves as a marker for the framework.
  • The type hint of the parameter of the magic __invoke method determines which messages the handler receives (in this case, messages of type StartCsvInputMessage).
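The mapping via the type hint relies on reflection. The following hypothetical sketch shows how a framework can read the message class from a handler's __invoke() signature; Symfony Messenger uses a comparable approach when it registers handlers, but this is not its code, and handledMessageClass() is made up.

```php
<?php
// Hypothetical sketch: determine the handled message class from the type
// hint of a handler's __invoke() parameter using PHP's Reflection API.

class StartCsvInputMessage {}

class StartCsvInputMessageHandler
{
    public function __invoke(StartCsvInputMessage $message): string
    {
        return 'Hello World';
    }
}

function handledMessageClass(object $handler): string
{
    $method = new ReflectionMethod($handler, '__invoke');
    $type = $method->getParameters()[0]->getType();
    if (!$type instanceof ReflectionNamedType) {
        throw new LogicException('__invoke() needs a single class type hint');
    }
    return $type->getName();
}

echo handledMessageClass(new StartCsvInputMessageHandler()), PHP_EOL; // StartCsvInputMessage
```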

If all steps have been done correctly up to this point, the module can now be started. First, a process must be running that consumes the messages. For local development, this so-called worker process can simply be started via the CLI. The following command works inside the container:

root@f4490118644b:/var/www/html# php bin/console messenger:consume pm job_high job_mid job_low


 [OK] Consuming messages from transports "pm, job_high, job_mid, job_low".


 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

 // Quit the worker with CONTROL-C.

 // Re-run the command with a -vv option to see logs about consumed messages.

Worker Started!

To start the flow, a second terminal tab is needed. The following command also runs inside the container:

root@f4490118644b:/var/www/html# php bin/console synqup:flow:start 1 --force-removal
Creating flow execution…
Created flow execution via service, see worker for output.

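In the first tab, where the worker is running, we now observe the following output (die() also terminates the worker process, which is why the shell prompt reappears):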

Hello World
root@f4490118644b:/var/www/html#

So you can see that the code has been loaded and the module is registered correctly.

StartMessageHandler implementation

The StartMessageHandler, which currently ends the process with "Hello World", is therefore our entry point. It has the following tasks:

  • Setting the initial progress values ("0/X records processed")
  • Checking that the files exist (sanity check)
  • Dispatching further messages

Note that the StartMessageHandler should generally not perform lengthy tasks, because the core treats StartMessages as high-priority messages. API calls or other time-consuming requests are better handled in a later message with a lower priority.

Progress

The first step is to initialize the progress values. For this purpose, the core provides so-called traits. Traits spare the module developer complex Symfony container configuration and provide simple access methods to frequently used core functions. Traits are small "function modules": their methods and properties are copied into the classes that use the trait and are then available as if they had been implemented in those classes. A trait is included with a use statement inside the class body, i.e. where the class's own methods and properties are defined.

use App\Connector\Flow\ProcessManagement\Progress\Traits\UpdatesProgress;

class StartCsvInputMessageHandler implements MessageHandlerInterface
{
    use UpdatesProgress;
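For readers new to traits, here is a self-contained example of the mechanism in plain PHP. The trait LogsMessages is invented for illustration and is unrelated to UpdatesProgress; it only shows that the trait's property and methods become part of the class that uses it.

```php
<?php
// Hypothetical example trait (not a synQup trait): its property and methods
// behave as if they were declared directly in the using class.

trait LogsMessages
{
    private array $log = [];

    public function log(string $message): void
    {
        // static::class resolves to the class that uses the trait
        $this->log[] = static::class . ': ' . $message;
    }

    public function getLog(): array
    {
        return $this->log;
    }
}

class ExampleHandler
{
    use LogsMessages; // declared inside the class body

    public function __invoke(): void
    {
        $this->log('handled');
    }
}

$handler = new ExampleHandler();
$handler();
echo $handler->getLog()[0], PHP_EOL; // ExampleHandler: handled
```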

Individual progress items are set with the updateProgress method, using the progress paths introduced above, i.e. the dot-separated string of subsection identifiers (e.g. input.csv-input.sanity-checks).
If we look at the signature of the updateProgress method provided by the trait, we notice that it requires a context. What is this context, and what is it needed for?

Context

The context passed here (in full: ModuleJobDispatchContext) contains the module configuration mentioned above and information about the current execution. The core regularly needs this information to track individual module executions and their progress. Updating progress items, for example, requires the context to identify the JobDispatcherMapping and the flow execution, because the same module could run in multiple flows and/or JobDispatcherMappings at the same time. Passing this context along consistently is also essential for context-based logging. Within the module, every message should therefore carry the context, which is why the base class ModuleJobDispatchContextAwareMessage extended above exists. You can also use it in your own module as a simple base class that takes the context as a constructor parameter.
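The pattern behind ModuleJobDispatchContextAwareMessage can be sketched in plain PHP. JobContext, ContextAwareMessage and the two message classes are hypothetical stand-ins; the real ModuleJobDispatchContext has its own, different API.

```php
<?php
// Hypothetical stand-ins: every message carries the context it was created
// in, and follow-up messages are created with that same context.

final class JobContext
{
    private int $flowExecutionId;
    private array $config;

    public function __construct(int $flowExecutionId, array $config)
    {
        $this->flowExecutionId = $flowExecutionId;
        $this->config = $config;
    }

    public function getFlowExecutionId(): int { return $this->flowExecutionId; }
    public function getConfig(): array { return $this->config; }
}

abstract class ContextAwareMessage
{
    private JobContext $context;

    public function __construct(JobContext $context)
    {
        $this->context = $context;
    }

    public function getContext(): JobContext
    {
        return $this->context;
    }
}

final class StartMessage extends ContextAwareMessage {}

final class FollowUpMessage extends ContextAwareMessage
{
    private string $filePath;

    public function __construct(string $filePath, JobContext $context)
    {
        $this->filePath = $filePath;
        parent::__construct($context);
    }

    public function getFilePath(): string { return $this->filePath; }
}

// A handler hands the context it received to every message it creates,
// so progress updates and log entries belong to the same execution.
$start = new StartMessage(new JobContext(42, ['csvPath' => '/tmp']));
$followUp = new FollowUpMessage('a.csv', $start->getContext());
echo $followUp->getContext()->getFlowExecutionId(), PHP_EOL; // 42
```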

So let's implement the initialization of the progress items:

    public function __invoke(StartCsvInputMessage $message) {
        $context = $message->getModuleJobDispatchContext();
        $this->updateProgress($context, 'input.csv-input', 0, 3);
        $this->updateProgress($context, 'input.csv-input.sanity-checks', 0, 1);
        $this->updateProgress($context, 'input.csv-input.read-file', 0, 1);
        $this->updateProgress($context, 'input.csv-input.create-product-batch', 0, 1);
    }

It quickly becomes apparent that the given values do not make sense: without the information about how many files there are, the total progress cannot be determined. How do we actually know how many files there are? Where are they located?

Configuration extension

To avoid a hard-coded path in the module and to increase reusability, it is a good idea to make the directory in which files are searched configurable. We have already learned that the configuration of the JobDispatcherMapping (i.e. the concrete module configuration for a given flow) accepts arbitrary JSON. So let's extend the configuration column of the job_dispatcher_mapping table with the path:

{
  "csvPath": "/var/www/html/csvs"
}

Warning: modules that are not built purely for educational purposes should use the file system abstraction instead of accessing paths directly.

Now let's extend the __invoke method to count the files:

    public function __invoke(StartCsvInputMessage $message) {
        $context = $message->getModuleJobDispatchContext();
        
        $path = $context->getConfig()['csvPath'] ?? '/var/www/html/csv_files_fallback';
        // glob() may return false on error; normalize to an empty array so count() works
        $csvFiles = glob(rtrim($path, '/') . '/*.csv') ?: [];
        $csvCount = count($csvFiles);
        
        $this->updateProgress($context, 'input.csv-input', 0, 3);
        $this->updateProgress($context, 'input.csv-input.sanity-checks', 1, 1);
        $this->updateProgress($context, 'input.csv-input.read-file', 0, $csvCount);
        
        if (!$csvCount) {
            $this->updateProgress($context, 'input.csv-input.create-product-batch', 1, 1);
        } else {
            // @TODO: implement Message Dispatch
        }
    }

Note: the call that set the batch count has been removed, and the progress of the sanity check is now set directly. If no CSV files are found, all progress items are marked as finished (processed count = total count). In that case our flow execution would already end here, since the only module has finished.
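The file discovery can also be extracted into a small function, which makes it easy to test without the core. findCsvFiles() is a hypothetical helper; it assumes the same csvPath key and fallback directory as the handler.

```php
<?php
// Hypothetical helper mirroring the handler's file discovery.

function findCsvFiles(array $config, string $fallback): array
{
    $path = $config['csvPath'] ?? $fallback;
    // glob() returns false on some errors; normalize to an empty array.
    // Without GLOB_NOSORT, the result is sorted alphabetically.
    return glob(rtrim($path, '/') . '/*.csv') ?: [];
}

// Usage: a temporary directory with two CSV files and one other file
$dir = sys_get_temp_dir() . '/csv-demo-' . uniqid();
mkdir($dir);
touch($dir . '/a.csv');
touch($dir . '/b.csv');
touch($dir . '/notes.txt');

$files = findCsvFiles(['csvPath' => $dir . '/'], '/var/www/html/csv_files_fallback');
echo count($files), PHP_EOL; // 2

// Clean up the temporary files
array_map('unlink', glob($dir . '/*') ?: []);
rmdir($dir);
```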

Next, let's implement a second message that reads in the CSV files found. For this, we create two folders under "Input": Jobs and Jobs/ReadFile. Now we create the new message, the ReadFileMessage:

<?php
namespace Synqup\Modules\CsvModuleBundle\Input\Jobs\ReadFile;

use App\Connector\Context\ModuleJobDispatchContext;
use App\Connector\Flow\ProcessDefinition\Standard\Messages\TagInterfaces\TagPriorityMidInterface;
use App\Connector\Flow\ProcessManagement\Queue\ModuleJobDispatchContextAwareMessage;

class ReadFileMessage extends ModuleJobDispatchContextAwareMessage
{
    private string $filePath;

    /**
     * ReadFileMessage constructor.
     * @param string $filePath
     * @param ModuleJobDispatchContext $moduleJobDispatchContext
     */
    public function __construct(string $filePath, ModuleJobDispatchContext $moduleJobDispatchContext)
    {
        $this->filePath = $filePath;
        parent::__construct($moduleJobDispatchContext);
    }

    /**
     * @return string
     */
    public function getFilePath(): string
    {
        return $this->filePath;
    }
}

This class is a good example of how messages can also be used to exchange data. The data fragments should, of course, be kept as small as possible (control data, not payload data!). Still, this is an elegant way to encapsulate and parameterize the concern of an individual message.

This message must now be dispatched for each file found. For this, we add a constructor to the StartCsvInputMessageHandler and have Symfony's service container inject the MessageBusInterface, with which we can dispatch our own messages.

    // requires "use Symfony\Component\Messenger\MessageBusInterface;" at the top of the file
    use UpdatesProgress;
    
    private MessageBusInterface $messageBus;

    public function __construct(MessageBusInterface $messageBus) {
        $this->messageBus = $messageBus;
    }
    
    public function __invoke(StartCsvInputMessage $message) {

Next, let's extend the __invoke method to dispatch a ReadFileMessage for each file found:

    $this->updateProgress($context, 'input.csv-input.read-file', 0, $csvCount);
    
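    // requires "use Synqup\Modules\CsvModuleBundle\Input\Jobs\ReadFile\ReadFileMessage;"
    foreach ($csvFiles as $filePath) {
        // realpath() returns false on failure; fall back to the glob() result,
        // the ReadFileMessageHandler reports files it cannot open
        $absolutePath = realpath($filePath) ?: $filePath;
        $this->messageBus->dispatch(new ReadFileMessage($absolutePath, $context));
    }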

Implementation: reading the files

Now let's add another MessageHandler in the ReadFile folder for our ReadFileMessage, the ReadFileMessageHandler:

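<?php

namespace Synqup\Modules\CsvModuleBundle\Input\Jobs\ReadFile;

use App\Connector\Flow\ProcessManagement\Progress\Traits\UpdatesProgress;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBusInterface;
// ImportBatchMessage is created in a later step of this guide
use Synqup\Modules\CsvModuleBundle\Input\Jobs\ImportBatch\ImportBatchMessage;

class ReadFileMessageHandler implements MessageHandlerInterface
{
    // provides updateProgress(), used for the progress handling below
    use UpdatesProgress;

    private MessageBusInterface $messageBus;

    public function __construct(MessageBusInterface $messageBus) {
        $this->messageBus = $messageBus;
    }

    public function __invoke(ReadFileMessage $message) {
        
    }
}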

When the files are read in, they are to be divided into small parts, so-called "batches". Since batches can be processed by several workers in parallel, this can increase processing speed on multiprocessor systems. In our example, we choose a batch size of 50, i.e. 50 products per substep. One possible implementation:

    public function __invoke(ReadFileMessage $message) {
        $filePath = $message->getFilePath();
        $handle = fopen($filePath, "r");
        
        $batch = [];
        if ($handle) {
            while (($line = fgets($handle)) !== false) {
                $batch[] = $line;
                if (count($batch) === 50) {
                    // Batch built, @TODO: dispatch message
                    
                    $batch = [];
                }
            }
            // Remainder Batch, @TODO: dispatch message

            fclose($handle);
        } else {
            throw new \RuntimeException(sprintf('Cannot read passed file %s', $filePath));
        }
    }
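One way to test the batching logic in isolation is to move it into a generator. readBatches() is a hypothetical helper, not part of synQup; the handler would then dispatch one message per yielded batch.

```php
<?php
// Hypothetical helper: the batching loop from the handler as a generator.

/**
 * @param resource $handle an open, readable stream
 * @return Generator yields arrays of at most $batchSize lines
 */
function readBatches($handle, int $batchSize = 50): Generator
{
    $batch = [];
    while (($line = fgets($handle)) !== false) {
        $batch[] = $line;
        if (count($batch) === $batchSize) {
            yield $batch;
            $batch = [];
        }
    }
    if ($batch !== []) {
        yield $batch; // remainder batch
    }
}

// Usage: 120 lines in an in-memory stream give batches of 50, 50 and 20 lines
$handle = fopen('php://memory', 'r+');
for ($i = 1; $i <= 120; $i++) {
    fwrite($handle, "Product $i,$i,Description,10\n");
}
rewind($handle);

$sizes = [];
foreach (readBatches($handle) as $batch) {
    $sizes[] = count($batch);
}
fclose($handle);
echo implode(', ', $sizes), PHP_EOL; // 50, 50, 20
```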

Now let's turn to progress handling: reading a file is finished once the file has been processed completely, so the read-file progress can be incremented afterwards. However, we have not yet been able to determine the total number of batches, because the StartMessageHandler has no information about how many lines a file contains. Nor does it make sense to read every file once in advance just to obtain the line counts (and thus the total count of batches to be created). So far, we have used the updateProgress method only to set absolute values. However, its count parameters also accept strings, which express an increment or decrement of the processed count and total count. For example:

$this->updateProgress($context, 'input.csv-input.create-product-batch', '+0', '+1');

Here, only the total count is increased. This makes sense for the batches, because the message handler that processes the batches is responsible for the processed count. '+0' is passed as the processed count because its current value is unknown at this point: by the time the ReadFileMessageHandler has read a file to the end, some batches may already have been processed, and setting an absolute value would overwrite that progress.

The same mechanism can be used to increment the number of processed files:

$this->updateProgress($context, 'input.csv-input.read-file', '+1');
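The semantics of these relative values can be illustrated with a small sketch. applyProgressValue() is hypothetical and only models the behavior described above (integers set a value, '+n' and '-n' change it); the core's actual implementation may differ.

```php
<?php
// Hypothetical model of relative progress values; not synQup core code.

function applyProgressValue(int $current, $value): int
{
    if (is_int($value)) {
        return $value; // absolute value
    }
    if (!is_string($value) || !preg_match('/^([+-])(\d+)$/', $value, $m)) {
        throw new InvalidArgumentException('Invalid progress value');
    }
    return $m[1] === '+' ? $current + (int) $m[2] : $current - (int) $m[2];
}

// File A announces a batch, the batch is processed, then file B announces one.
$processed = 0;
$total = 0;
$total = applyProgressValue($total, '+1');         // file A: '+0', '+1'
$processed = applyProgressValue($processed, '+1'); // batch 1 finished
$processed = applyProgressValue($processed, '+0'); // file B: '+0', '+1'
$total = applyProgressValue($total, '+1');
echo "$processed/$total", PHP_EOL; // 1/2
```

Had file B set the processed count absolutely to 0 instead of passing '+0', the finished batch would have been lost.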

Let's complete the example by creating the new message and dispatching it.

Input\Jobs\ImportBatch\ImportBatchMessage

<?php

namespace Synqup\Modules\CsvModuleBundle\Input\Jobs\ImportBatch;

use App\Connector\Context\ModuleJobDispatchContext;
use App\Connector\Flow\ProcessManagement\Queue\ModuleJobDispatchContextAwareMessage;

class ImportBatchMessage extends ModuleJobDispatchContextAwareMessage
{
    private array $batches;

    public function __construct(array $batches, ModuleJobDispatchContext $context)
    {
        $this->batches = $batches;
        parent::__construct($context);
    }

    /**
     * @return array
     */
    public function getBatches(): array
    {
        return $this->batches;
    }
}

ReadFileMessageHandler::__invoke

    public function __invoke(ReadFileMessage $message) {
        $filePath = $message->getFilePath();
        $handle = fopen($filePath, "r");
        $context = $message->getModuleJobDispatchContext();

        $batch = [];
        if ($handle) {
            while (($line = fgets($handle)) !== false) {
                $batch[] = $line;
                if (count($batch) === 50) {
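                    // Count the batch (total +1) before dispatching it, so a fast worker
                    // cannot finish it before it has been counted
                    $this->updateProgress($context, 'input.csv-input.create-product-batch', '+0', '+1');
                    $this->messageBus->dispatch(new ImportBatchMessage($batch, $context));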
                    $batch = [];
                }
            }
            if (!empty($batch)) {
                // Increase total progress only, since we have no info about parallel processing of messages
                $this->updateProgress($context, 'input.csv-input.create-product-batch', '+0', '+1');
                $this->messageBus->dispatch(new ImportBatchMessage($batch, $context));
            }

            $this->updateProgress($context, 'input.csv-input.read-file', '+1');
            fclose($handle);
        } else {
            throw new \RuntimeException(sprintf('Cannot read passed file %s', $filePath));
        }
    }

Implementation: product creation

Finally, products need to be created from the CSV records. Let's look at an example record:

Kleine blaue Gartenschere,92729,Diese kleine blaue Gartenschere ermöglicht Ihnen genaue Schnitte,100

The columns of our CSV file are therefore defined as follows (article name, article number, description, price):

Artikelname,Artikelnummer,Beschreibung,Preis
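The handlers in this guide import every line of a file, so if the CSV files actually begin with this header row, it would have to be skipped. A minimal sketch of one alternative, assuming the header is present: read it once and combine each data row with it, so fields can be accessed by column name instead of index. mapRows() is a hypothetical helper.

```php
<?php
// Hypothetical helper: map CSV lines to associative arrays using the header row.

function mapRows(array $lines): array
{
    // Strip line endings left by fgets() before parsing
    $header = str_getcsv(rtrim(array_shift($lines), "\r\n"), ',', '"', '');
    $rows = [];
    foreach ($lines as $line) {
        $line = rtrim($line, "\r\n");
        if (trim($line) === '') {
            continue; // skip empty lines
        }
        $rows[] = array_combine($header, str_getcsv($line, ',', '"', ''));
    }
    return $rows;
}

$rows = mapRows([
    "Artikelname,Artikelnummer,Beschreibung,Preis\n",
    "Kleine blaue Gartenschere,92729,Diese kleine blaue Gartenschere ermöglicht Ihnen genaue Schnitte,100\n",
    "\n",
]);
echo $rows[0]['Artikelnummer'], PHP_EOL; // 92729
```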

synQup comes with its own data model (transfer schema/transfer model), which covers the vast majority of e-commerce use cases out of the box and can be extended for the remainder. Highly complex products can be mapped as well. The data model can be viewed in the packages synqup/common and synqup/commerce. In the following, we use only a very small part of it.

The message handler for this (ImportBatchMessageHandler) is essentially self-explanatory:

<?php
namespace Synqup\Modules\CsvModuleBundle\Input\Jobs\ImportBatch;

use App\Connector\Flow\ProcessManagement\Progress\Traits\UpdatesProgress;
use Doctrine\ODM\MongoDB\DocumentManager;
use Elio\CommerceBundle\Document\Product\GeneralInformation;
use Elio\CommerceBundle\Document\Product\Price\SellingPrice;
use Elio\CommerceBundle\Document\Product\Product;
use Elio\CommerceBundle\Document\Product\ProductIdentifiers;
use Elio\CommonBundle\Definition\Locale;
use Elio\CommonBundle\Document\Translation\TranslationCollection;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class ImportBatchMessageHandler implements MessageHandlerInterface
{
    use UpdatesProgress;

    private DocumentManager $documentManager;

    public function __construct(DocumentManager $documentManager) {

        $this->documentManager = $documentManager;
    }

    public function __invoke(ImportBatchMessage $message) {
        $context = $message->getModuleJobDispatchContext();
        foreach ($message->getBatches() as $currentRow) {
            $row = str_getcsv($currentRow);
            $product = new Product($row[1]);
            $product->setProductIdentifiers(new ProductIdentifiers($row[1]));
            $product->setActive(true);
            $product->setGeneralInformation(new GeneralInformation(
                TranslationCollection::create(
                    [Locale::de_DE, $row[0]],
                ),
                TranslationCollection::create(
                    [Locale::de_DE, $row[2]],
                )
            ));
            $product->setDefaultPrice(new SellingPrice($row[3]));
            $this->documentManager->persist($product);
        }
        $this->documentManager->flush();
        $this->updateProgress($context, 'input.csv-input.create-product-batch', '+1');
    }
}

Two things are worth noting. First, flush() on the DocumentManager should be called outside the loop in which the individual documents are marked for saving with persist(), so that the whole batch is written to the database in one go instead of once per product. Second, some attributes are translatable and have to be filled with TranslationCollections.