Skip to main content
ALTWORX Dokumentace
Přepnout tmavý/světlý/automatický režim Přepnout tmavý/světlý/automatický režim Přepnout tmavý/světlý/automatický režim Zpět na domovskou stránku

Správa běžících normalizérů

  • Zastavení a spuštění nové normalizér pipeline
  • Smazání pipeline.

Toto dnes funguje dost jinak, popsat operace v Admin UI a případně i vhodné položky Admin UI.

How to stop, start and delete a pipeline

Stop pipeline

Normalizer Pipelines are managed by ALTWORX. You don’t need to start them manually, they are started automatically based on the configuration.

The simplest way to stop a pipeline is to disable it. First, locate the configuration of the pipeline, then add attribute disabled: true. This will make the pipeline disabled.

def get_pipeline(...) do
    %{
        disabled: true,
        # rest of the pipeline configuration
    }
end

Remember that after changing a configuration you need to apply it, see changing configuration for details.

Stopped pipeline no longer consumes any data and doesn’t update the resulting Runtime topic. Scenarios consuming the topic will be stuck, waiting for new data, until the pipeline is restarted.

Start pipeline

When starting a pipeline one must first find out why it’s stopped. Pipeline status will be the indication in this case, it can be found in the Admin UI or in the developer console via API Normalizer.inspect_pipelines(). Stopped pipelines can be in the following states

  • :ignored - pipeline failed to start because the input topic doesn’t exist. Starting such pipeline is not possible until the topic is created.
  • :disabled - pipeline was disabled via configuration. Such pipeline can be started by changing :disabled to false or removing it altogether from the configuration.

Remember that you need to apply configuration if you change it, see changing configuration for details.

When a pipeline is started it will start consuming data from where it finished last time.

Delete pipeline

If you no longer wish to have a pipeline present in the system you can also delete it. To delete it just remove it from the configuration. Note that deleting a pipeline isn’t too different from stopping it. Deleting a pipeline doesn’t have any effect on the data, if you wish to delete the data too must do so separately. Also, if you don’t delete the data, you can add the pipeline back to the configuration at any time and it will start where it left off last time it was running.

How to manage topics in Kafka

As was mentioned above, Pipelines are not tied to the data they process or generate. Both Raw and Runtime topics live independently of Acceptors or Normalizer Pipelines. Data in Kafka Topics must be managed separately. Typically you’d not have to do this often, usually it only concerns creating new and deleting old unused topics.

Create a topic

Creating a topic is usually not needed. Acceptors usually create their own topics, Normalizer Pipelines do that exclusively. I.e. you can create a topic for an Acceptor, if it is required, but you shouldn’t create topics for Normalizer Pipelines.

Topic can be created either via ALTWORX or via independent tools directly via Kafka. In ALTWORX you can use the following API in a developer console.

iex(1)> Normalizer.create_topics(["my_new_topic", "my other new topic"], %{"message.timestamp.type" => "LogAppendTime"})
:ok

Via Kafka a topic can be created even when ALTWORX is not running. First open an interactive shell inside the Kafka container.

docker exec -it aw_kafka_1 bash

then you can use script kafka-topics.sh to create new topic.

kafka-topics.sh --zookeeper zookeeper:2181 --topic my_new_topic --create \
                --config "message.timestamp.type=LogAppendTime" \
                --partitions 1 --replication-factor 1

Note that partitions and replication factor must always be set to 1. ALTWORX configures this automatically but it is required when creating the topic directly via Kafka. Other configuration (message.timestamp.type in the examples above) is up to you. Follow the instructions and documentation, as sometimes specific parameters are required.

Delete a topic

Deleting a topic is an operation which is irreversible. Runtime topics can usually be recomputed, but RAW topics might be very hard or impossible to reconstruct. Make sure you are really OK with losing the data forever! If you are not sure, make a dump and back it up first. Topic can be deleted either via ALTWORX developer console or directly via Kafka. In ALTWORX run the following API

iex(1)> Normalizer.delete_topics(["first_topic_to_delete", "second_topic_to_delete"])
:ok

Note there is no confirmation, the topic is deleted immediately.

In Kafka you can use the script kafka-topics.sh.

kafka-topics.sh --zookeeper zookeeper:2181 --topic topic_to_delete --delete

Note this also deletes the topic immediately without a confirmation.

How to recompute a topic

Recomputing a topic is a process of restarting the normalization from the beginning.

Recomputation can be useful if you need to change the normalization but don’t want to bump Runtime Topic Version. You can just drop the old data and run the new Normalizer Pipeline from the beginning. Recomputation can also be used when you need to change Raw data after they have been normalized. Changing a message that already has been normalized doesn’t have any effect on the Runtime topic, therefore, you need to recompute the Runtime topic in order to propagate the changes there.

The process consists of the following.

  1. Stop Normalizer Pipeline. The pipeline must not be running during the process, be sure to disable it first. You can use the quicker temporary workflow - change the configuration directly on the host and running Normalizer.reload_config(), as described in changing configuration section.

  2. Delete the Runtime topics. Normalizer creates multiple topics for each pipeline, you should delete them all to ensure all old data is gone. They all share the same prefix, e.g. N5_rt_Topic_1, N5_rt_Topic_1_error, N5_rt_Topic_1_jitter.

    iex(1)> Normalizer.delete_topics(["N5_rt_Topic_1", "N5_rt_Topic_1_error", "N5_rt_Topic_1_jitter"])
    :ok
    
  3. Re-enable the Normalizer Pipeline. The pipeline will start from the beginning, because no Runtime topics exist. It may take some time until the recomputation finishes.