Queue processors and job schedulers
Pega Platform™ supports various options for background processing. You can use queue processors, job schedulers, standard and advanced agents, Service-Level Agreements (SLAs), wait shapes, and listeners to design background processing in your application.
Note: Use job scheduler and queue processor Rules instead of agents for better scalability, ease of use, and faster background processing.
Queue Processor
A queue processor Rule is an internal background process that you configure to queue and process items asynchronously in real time. Apply standard queue processor Rules for straightforward queue management or low-throughput scenarios. For higher throughput and for customized or delayed processing of messages, apply dedicated queue processor Rules.
Pega Platform provides numerous default standard queue processors. When you select Standard in the Queue-For-Processing method or the Run in Background shape, the system internally triggers pzStandardProcessor to queue the messages. This queue processor is an immediate type, and you cannot delay its processing.
By using a queue processor Rule, you can set up specific operations that occur in the background. Pega Platform provides built-in features for handling errors, queuing, and dequeuing, and it can conditionally commit when using a queue processor. All queue processors are Rule-resolved against the context specified in the System Runtime Context.
When setting up the Queue-For-Processing method in an activity or the Run in Background shape in a stage, you can specify an alternate Access Group. The activity that the queue processor runs can also change the Access Group. An example is the pzInitiateTestSuiteRun activity that the pzInitiateTestSuiteRun queue processor runs.
If you define a delayed queue processor, specify the date and time for processing when you call the queue processor through the Queue-For-Processing method or the Run in Background smart shape.
Queue processing requires a Kafka server, and a queue processor Rule requires at least one stream node to run. Without a stream node, messages cannot be queued to or retrieved from the Kafka server. A Pega node works as a stream node when you start it with the Stream node type; a node with the Universal node type also works as a stream node. If you do not define a stream node in the cluster, items are queued to the database and then processed when a stream node becomes available.
Queues are multi-threaded and shared across all nodes. Each queue processor can process messages across six Kafka partitions. Using multiple queue processors on separate nodes to process the items in a queue can also improve throughput.
For example, if you want to send a notification, you can use the @baseclass.pxNotify activity, which uses the pyProcessNotification queue processor. If a stream node is available, the system queues the message to a Kafka topic. If a stream node is not available, the system stores the message in the database. A job scheduler runs periodically to check whether a stream node is running; when it finds one, the job scheduler queues all such delayed messages to the Kafka topic. The Data Flow work object that corresponds to the queue processor selects the message from a partition in the Kafka topic and then pushes it to the queue processor activity, which processes the message and sends the notification.
Performance has two dimensions: time to process a message and total message throughput. You can improve performance by using the following actions:
- Optimize the activity to reduce the time to process a message. The time to process a message depends on the amount of work that the processing activity performs.
- Enhance total message throughput:
- Scale out by increasing the number of processing nodes. This option applies only to on-premises environments. For Pega Cloud® Services, you require a larger sandbox.
- Scale up by increasing the number of threads for each node.
When you run a queue processor, the system creates a topic that corresponds to that queue processor in the Kafka server. Based on the number of partitions mentioned in the server.properties file, the system creates the same number of folders in the tomcat\Kafka-data folder.
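For reference, the following is a minimal sketch of the relevant Kafka broker setting in server.properties. The value of six partitions matches the partition count mentioned earlier in this article, but the exact file location and value in your installation may differ.

```properties
# server.properties (Kafka broker configuration)
# Default number of partitions that the broker creates for each new topic,
# including the topics that correspond to queue processors.
num.partitions=6
```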
At least one stream node is necessary for the system to queue messages to the Kafka Server. If you do not define a stream node in a cluster, the system queues items to the database and then processes these items when a stream node is available.
The queue processor Rule automatically generates a stream Data Set and a corresponding Data Flow. The stream Data Set sends items to and receives items from the Stream service. The Data Flow manages the subscription of items to ensure item processing. You can view the Data Flows that correspond to the queue processors in the system on the Queue processors landing page in Admin Studio.
There are three main database tables that actively participate in queue processor processing:
- pr_sys_delayed_queue: Holds queue processor items that are delayed for reasons such as an artificial delay declared by the producer or a retry delay according to the retry strategy.
- pr_sys_msg_qp_brokenitems: Holds queue processor items that the system cannot process and considers broken.
- pr_data_qp_run_partition: Holds queue processor partition metadata such as partition offset, timestamp of last processed message, or timestamp of last enqueued message.
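As an example of how an administrator might inspect one of these tables, the following is a minimal SQL sketch that counts broken items. The schema name pegadata is an assumption; substitute the data schema that your environment uses.

```sql
-- Count the queue processor items that the system considers broken.
-- The schema name "pegadata" is an assumption; use your own data schema.
SELECT COUNT(*) AS broken_item_count
FROM pegadata.pr_sys_msg_qp_brokenitems;
```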
There are also components that do not take an active part in item processing but are crucial for preserving the safe and continuous operation of queue processors. These components are pzQueueProcessorMaintenance, pzDelayedQueueProcessorSchedule, and DelayedItemsDataFlowService.
The pzQueueProcessorMaintenance job scheduler runs every two minutes to:
- Calculate the count of items in the Processed in Last Hour column that is displayed on the Queue processor landing page in Admin Studio.
- Try to restart the failed queue processors.
- Monitor and generate PEGA0134 and PEGA0137 alerts if required.
The pzDelayedQueueProcessorSchedule job scheduler and the DelayedItemsDataFlowService Data Flow are responsible for the processing of the delayed queue processor items and handling retries.
Default queue processors
Pega Platform provides three default queue processors:
- pyProcessNotification
- pzStandardProcessor
- pyFTSIncrementalIndexer
pyProcessNotification
The pyProcessNotification queue processor sends notifications to customers and runs the pxNotify activity to calculate the list of recipients, the message, and the channel. The possible channels include an email channel, a gadget notification, or a push notification.
pzStandardProcessor
You can use the pzStandardProcessor queue processor for standard asynchronous processing in the following scenarios:
- Processing does not require high throughput, or processing resources can have slight delays.
- Default and standard queue behaviors are acceptable.
This queue processor is useful for tasks such as submitting each status change to an external system. You can use pzStandardProcessor to run bulk processes in the background. When the queue processor resolves all the items from the queue, you receive a notification about the number of successful and failed attempts.
pyFTSIncrementalIndexer
The pyFTSIncrementalIndexer queue processor performs incremental indexing in the background. This queue processor posts Rule, Data, and Work objects into the search subsystem as soon as you create or change them, which keeps search data current so that it closely reflects the content of the database.
Pega Platform contains search and reporting service features for use on dedicated backing service nodes. Backing nodes are supporting nodes that are separate from the Pega Platform service nodes.
The system indexes data by using the following queue processors:
- pySASBatchIndexClassesProcessor
- pySASBatchIndexProcessor
- pySASIncrementalIndexer
System Runtime Context
Pega Platform performs Rule resolution on all queue processors and job schedulers against the context specified in the System Runtime Context (SRC). Pega Platform adds applications to the SRC by default, and you can update the SRC as required. Queue processors use the SRC for Rule resolution during platform (node) startup and other background-processing initialization. The SRC includes the queue processors of every application that has the "Include in background processing" flag enabled on the Application Definition page, as well as applications that you manually add on the System Runtime Context page.
Initialization Phase:
- During initialization, Pega Platform discovers the correct queue processors to initialize using SRC.
- It converts the list of applications from SRC into a tree of application rulesets and creates a flattened list of rulesets.
- Pega Platform iterates through this list by ruleset, opening and initiating each queue processor.
- If a queue processor with a given name is already instantiated, it skips the initialization (allowing for overrides).
Runtime Behavior:
- At runtime, the context for queue processors is always that of the message producer. The producer publishes messages (events) to a queue processor rule by using the Queue-for-Processing method in an activity or by using the Run in Background SmartShape from a flow rule in your application.
- The queue processor has already been instantiated during initialization.
- When the producer queues a message, it passes the current operator, access group, application, and other context identifiers with the message payload.
- The consumer processes the message using the context of the message producer, not the context configured in SRC. For queue processors, the consumer is the data flow work object, which uses the Stream data set as the source. The object subscribes to the Kafka topic and forwards the messages to the queue processor activity, which is the destination.
Understanding these two phases of rule resolution clarifies how queue processors operate and assists in troubleshooting.
Pega Platform triggers reinitialization whenever the SRC changes because it might be necessary to initialize different queue processors than the ones currently running.
Rule resolution at run time for queue processors
During run time, the context for queue processors is that of the message producer. The queue processor undergoes instantiation during initialization. When the producer queues a message at run time, it includes the current operator, Access Group, application, and other context identifiers with the message payload. (A producer refers to a process that queues messages.)
When the consumer prepares to process the message, it reads the payload and the additional context. The system uses this to initialize the requestor during message processing. The message is processed in the context of the message producer, not the context configured in SRC. The consumer processes the message by invoking an activity and passing the data from the message as a page into that activity.
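To make the idea concrete, the following is a purely hypothetical sketch of the kind of information that travels with a queued message: a payload page plus the producer's context identifiers. The field names and values are invented for illustration and do not represent the actual Pega message format.

```json
{
  "_comment": "Hypothetical illustration only; not the actual Pega message format",
  "payload": { "pxObjClass": "MyOrg-MyApp-Work-Order", "pyID": "O-1001" },
  "producerContext": {
    "operator": "user@myorg.com",
    "accessGroup": "MyApp:Users",
    "application": "MyApp 01.01.01"
  }
}
```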
Job scheduler
A job scheduler is a background server process that runs activities periodically. In a multi-node cluster, job scheduler Rules can run on multiple nodes. These Rules replace advanced agents for better performance, monitoring, and diagnostics. You can use job scheduler Rules to perform jobs that run at a particular time, such as system cleanup jobs. They can be associated with a node type and run on any node or all nodes. They can run multiple times a day, daily, weekly, monthly, or yearly.
Use a job scheduler Rule when a recurring task has no requirement to queue individual items. Unlike queue processors, the job scheduler must decide which records to process and establish the Step page context for each record before working on that record. For example, if you need to generate statistics every midnight for reporting purposes, the output of a report definition can determine the list of items to process, and the job scheduler then operates on each item in the list. You can configure a job scheduler to run on startup (only when All associated nodes is selected), on a periodic basis (such as daily, weekly, monthly, or yearly), or on a recurring basis (such as multiple times a day).
The job schedulers are rule-resolved against the context in the System Runtime Context. If an activity requires a specific context, you can select Specify Access Group to provide the Access Group. For example, you can specify a Manager Access Group if the job scheduler requires a Manager role.
If an activity requires the System Runtime Context (that is, the same context for both job scheduler and activity resolution), select Use System Runtime Context on the job scheduler Rule form. A job scheduler can run on one or more nodes in a cluster or on a specific node in a cluster. To run multiple job schedulers simultaneously, configure the number of threads for the job scheduler thread pool by modifying the prconfig.xml file. The default value is 5, and the number of threads should equal the number of job schedulers that run simultaneously.
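As a sketch of that setting, the following prconfig.xml entry increases the thread pool size to 10. The setting name jobscheduler/threadpoolsize is an assumption based on common prconfig naming, so confirm the exact name for your Pega Platform version before applying it.

```xml
<!-- prconfig.xml: job scheduler thread pool size (setting name is an assumption) -->
<!-- Set the value to match the number of job schedulers that run simultaneously. -->
<env name="jobscheduler/threadpoolsize" value="10" />
```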
Unlike queue processors, job schedulers must decide whether a record requires a lock and whether to commit records that they update with Obj-Save. If a job scheduler creates a Case, or opens a Case with a lock and causes it to move to a new Assignment or complete its Life Cycle, it does not have to issue a commit.
Default job schedulers
Pega Platform provides three default Job Schedulers that can be useful in your application:
- Node cleaner
- Cluster and database cleaner
- Persist node and cluster state
Node cleaner
The node cleaner cleans up expired locks and outdated module version reports.
By default, the node cleaner job scheduler (pyNodeCleaner) runs the Code-.pzNodeCleaner activity on all the nodes in the cluster.
Cluster and database cleaner
By default, the cluster and database job scheduler (pyClusterAndDBCleaner) runs the Code-.pzClusterAndDBCleaner activity on only one node in the cluster, once every 24 hours for housekeeping tasks. This job purges the following items:
- Older records (instances) from log tables
- Any requestors that have had no activity for at least 48 hours
- Passivation data for expired requestors
- Rows from the pc_events database table, according to the EventsRetentionPeriod dynamic system setting (the default retention period is 90 days)
- Any nodes that have not responded to the pulse for 30 days
- Old usage data if usage tracking is enabled
- Cluster state data that is older than 90 days
Persist node and cluster state
pyPersistNodeState saves the node state on node startup. The system saves cluster state data daily by using the pyPersistClusterState job scheduler.