Queue processors and job schedulers

Pega Platform™ supports various options for background processing. You can use queue processors, job schedulers, standard and advanced agents, Service-Level Agreements (SLAs), wait shapes, and listeners to design background processing in your application.

Note: Use job scheduler and queue processor Rules instead of agents for better scalability, ease of use, and faster background processing.

Queue processor

A queue processor Rule is an internal background process that you configure for queue management and asynchronous message processing. Apply standard queue processor Rules for straightforward queue management or low-throughput scenarios. For higher-scaling throughput and customized or delayed processing of messages, apply dedicated queue processor Rules.

Pega Platform provides numerous default standard queue processors. The system triggers pzStandardProcessor internally to queue messages when you select Standard in the Queue-For-Processing method or the Run in Background shape. This queue processor is an immediate type, so you cannot delay the processing.

By using a queue processor Rule, you can set up specific operations that occur in the background. Pega Platform provides built-in features for handling errors, queuing, and dequeuing, and can conditionally commit when you use a queue processor. All queue processors are Rule-resolved against the context specified in the System Runtime Context.

When setting up the Queue-For-Processing method in an activity or the Run in Background shape in a Stage, you can specify an alternate Access Group. The activity that the queue processor runs can also change the Access Group. An example is the pzInitiateTestSuiteRun activity that the pzInitiateTestSuiteRun queue processor runs.

If you define a delayed queue processor, specify the processing date and time when you queue the message through the Queue-For-Processing method or the Run in Background smart shape.

Queues are multi-threaded and shared across all nodes. Each queue processor can process messages across six partitions. Using multiple queue processors on separate nodes to process the items in a queue can also improve throughput.

For example, if you want to send a Notification, you can use the pyProcessNotification queue processor, as shown in the following figure. Whenever users queue a message to a queue processor, the system writes the message to the Kafka topic or, depending on the availability of the stream node, to the database. The Data Flow work object that corresponds to the queue processor selects the message from a partition in the Kafka topic and then pushes it to the queue processor activity, which processes the message and sends the Notification. A job scheduler runs periodically to check whether the stream node is running. When the stream node is running, the system pushes all the delayed messages to the Kafka topic.

Diagram of a process notification flow.
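Under the hood, this queuing and consumption follows the standard Kafka producer and consumer pattern. The following Java sketch is a conceptual illustration only, using the plain Apache Kafka client rather than any Pega internal classes; the topic name, message payload, and consumer group are invented for the example.

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NotificationQueueSketch {

        public static void main(String[] args) {
            // Producer side: publish a message to a topic. Kafka assigns the message
            // to one of the topic's partitions (messages with the same key land in
            // the same partition, which preserves their relative order).
            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "localhost:9092");
            producerProps.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                producer.send(new ProducerRecord<>("notification-topic", "CASE-123",
                        "{\"recipient\":\"user@example.com\",\"channel\":\"Email\"}"));
            }

            // Consumer side: a subscriber (in Pega, the generated Data Flow) reads
            // messages from the partitions it owns and hands each one to the
            // processing logic, which here simply prints the message.
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "localhost:9092");
            consumerProps.put("group.id", "notification-processors");
            consumerProps.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(List.of("notification-topic"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }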

Performance has two dimensions: time to process a message and total message throughput. You can improve performance by using the following actions:

  • Optimize the activity to reduce the time to process a message. The time to process a message depends on the amount of work that the processing activity performs.
  • Enhance total message throughput:  
    • Scale out by increasing the number of processing nodes. This option applies only to on-premises deployments. For Pega Cloud® Services, you require a larger sandbox.
    • Scale up by increasing the number of threads for each node.

When you run a queue processor, the system creates a topic that corresponds to that queue processor in the Kafka server. Based on the number of partitions mentioned in the server.properties file, the system creates the same number of folders in the tomcat\Kafka-data folder.
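For reference, the partition count and data directory of a stock Kafka broker are controlled by entries such as the following in server.properties; the values shown here are illustrative, not Pega-specific defaults.

    # Number of partitions created for each new topic
    num.partitions=6

    # Directory (or directories) where the broker stores partition data,
    # one folder per topic partition
    log.dirs=/opt/tomcat/kafka-data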

At least one stream node is necessary for the system to queue messages to the Kafka Server. If you do not define a stream node in a cluster, the system queues items to the database and then processes these items when a stream node is available. 

The queue processor Rule automatically generates a stream Data Set and a corresponding Data Flow. The stream Data Set sends items to and receives items from the Stream Service. The Data Flow manages the subscription of items to ensure item processing. You can view Data Flows that correspond with the queue processors in the system on the Queue processors landing page in Admin Studio.

There are three main database tables that actively participate in queue processor processing: 

  • pr_sys_delayed_queue: Holds queue processor items that are delayed for reasons such as an artificial delay declared by the producer or a retry delay according to the retry strategy.
  • pr_sys_msg_qp_brokenitems: Holds queue processor items that the system cannot process and considers broken.
  • pr_data_qp_run_partition: Holds queue processor partition metadata, such as the partition offset, the timestamp of the last processed message, and the timestamp of the last enqueued message.

There are also components that do not take an active part in item processing but are crucial for the system and administrators to keep queue processors running safely and continuously. These components are pzQueueProcessorMaintenance, pzDelayedQueueProcessorSchedule, and DelayedItemsDataFlowService.

The pzQueueProcessorMaintenance job scheduler runs every two minutes to: 

  • Calculate the count of items in the Processed in Last Hour column that is displayed on the Queue processors landing page in Admin Studio.
  • Try to restart the failed queue processors.
  • Monitor and generate PEGA0134 and PEGA0137 alerts if required.

The pzDelayedQueueProcessorSchedule job scheduler and the DelayedItemsDataFlowService Data Flow are responsible for the processing of the delayed queue processor items and handling retries.

Default queue processors

Pega Platform provides three default queue processors:

  • pyProcessNotification
  • pzStandardProcessor 
  • pyFTSIncrementalIndexer

pyProcessNotification

The pyProcessNotification queue processor sends notifications to customers and runs the pxNotify activity to determine the list of recipients, the message, and the Channel. The possible Channels include an Email Channel, a Gadget notification, or a Push notification.

pzStandardProcessor

You can use the pzStandardProcessor queue processor for standard asynchronous processing in the following scenarios:

  • Processing does not require high throughput, or processing can tolerate slight delays.
  • Default and standard queue behaviors are acceptable.

This queue processor is useful for tasks such as submitting each status change to an external system. You can use pzStandardProcessor to run bulk processes in the background. When the queue processor resolves all the items from the queue, you receive a notification about the number of successful and failed attempts.

pyFTSIncrementalIndexer

The pyFTSIncrementalIndexer queue processor performs incremental indexing in the background. This queue processor posts Rule, Data, and Work objects into the search subsystem as soon as you create or change them, which keeps search data current and closely aligned with the content of the database.

Pega Platform contains search and reporting service features for use on dedicated backing service nodes. Backing nodes are supporting nodes that are separate from the Pega Platform service nodes.

The system indexes data by using the following queue processors:

  • pySASBatchIndexClassesProcessor
  • pySASBatchIndexProcessor
  • pySASIncrementalIndexer

System Runtime Context

Pega Platform performs Rule resolution on all queue processors and job schedulers against the context specified in the System Runtime Context (SRC). Pega Platform adds applications to the SRC by default, which you can update as required. Queue processors use the SRC for Rule resolution during platform (node) startup and other background processing initialization. Throughout run time, queue processors resolve Rules by using the context of the message, including the Access Group and other identifiers.

Understanding these two phases of rule resolution clarifies how queue processors operate and assists in troubleshooting. 

The SRC includes queue processors for all applications that have the Include in Background Processing checkbox enabled on the Application Definition landing page, as well as applications that you manually add to the SRC on the System Runtime Context page.

Pega Platform triggers reinitialization whenever the SRC changes because it might be necessary to initialize different queue processors than the ones currently running.

Rule resolution at run time for queue processors

During run time, the context for queue processors is that of the message producer; a producer is a process that queues messages. Although the queue processor itself is instantiated during initialization, when the producer queues a message at run time, it includes the current operator, Access Group, application, and other context identifiers with the message payload.
 
When the consumer prepares to process the message, it reads the payload and the additional context. The system uses this context to initialize the requestor during message processing. The system processes the message in the context of the message producer, not in the context configured in the SRC. The consumer processes the message by invoking an activity and passing the data from the message as a page to that activity.
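Conceptually, the producer context travels with the message, and the consumer restores that context before invoking the processing activity. The following plain-Java sketch illustrates the idea only; the class and method names are invented and do not correspond to Pega's internal API.

    import java.util.Map;

    // Illustrative envelope: the business payload plus the producer's context identifiers.
    record QueueMessage(String operatorId,
                        String accessGroup,
                        String application,
                        Map<String, Object> payload) { }

    class ConsumerSketch {

        // Read the envelope, initialize a requestor with the producer's context,
        // and only then invoke the processing activity with the payload as a page.
        void process(QueueMessage message) {
            initializeRequestorContext(message.operatorId(), message.accessGroup(),
                    message.application());
            invokeProcessingActivity(message.payload());
        }

        private void initializeRequestorContext(String operatorId, String accessGroup,
                String application) {
            // Placeholder: in Pega Platform, requestor initialization happens
            // automatically when the consumer picks up the message.
            System.out.printf("Processing as %s (%s / %s)%n", operatorId, accessGroup, application);
        }

        private void invokeProcessingActivity(Map<String, Object> payload) {
            // Placeholder for the queue processor activity call.
            System.out.println("Invoking activity with page: " + payload);
        }
    }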

Job scheduler

A job scheduler is a background server process that runs activities periodically. In a multi-node cluster, job scheduler Rules can run on multiple nodes. These Rules replace advanced agents for better performance, monitoring, and diagnostics. You can use job scheduler Rules to perform jobs that run at a particular time, such as system cleanup jobs. They can be associated with a node type and run on any node or all nodes. They can run multiple times a day, daily, weekly, monthly, or yearly.  

Use a job scheduler Rule when a recurring task does not require queuing. Unlike queue processors, the job scheduler must decide which records to process and establish the Step page context of each record before working on that record. For example, if you need to generate statistics every midnight for reporting purposes, the output of a report definition can determine the list of items to process. The job scheduler then operates on each item in the list. You can configure a job scheduler to run on startup (only when All associated nodes is selected), on a periodic basis (such as daily, weekly, monthly, or yearly), or on a recurring basis (such as multiple times a day).
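In plain Java, this pattern looks roughly like the following sketch. It is a conceptual illustration only; fetchOverdueCaseIds and processCase are invented names that stand in for the report definition output and the per-record processing, and a real job scheduler Rule expresses its schedule on the Rule form rather than in code.

    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class NightlyStatisticsJobSketch {

        public static void main(String[] args) {
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            // Run once a day; the job scheduler Rule form replaces this scheduling code.
            scheduler.scheduleAtFixedRate(NightlyStatisticsJobSketch::run, 0, 1, TimeUnit.DAYS);
        }

        static void run() {
            // Unlike a queue processor, the job itself decides which records to process...
            List<String> caseIds = fetchOverdueCaseIds();

            // ...and establishes the context for each record before working on it.
            for (String caseId : caseIds) {
                processCase(caseId);
            }
        }

        static List<String> fetchOverdueCaseIds() {
            // Stands in for the output of a report definition.
            return List.of("CASE-1001", "CASE-1002");
        }

        static void processCase(String caseId) {
            System.out.println("Generating statistics for " + caseId);
        }
    }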

Job schedulers are Rule-resolved against the context specified in the System Runtime Context. If an activity requires a specific context, you can select Specify Access Group to provide the Access Group. For example, you can specify a Manager Access Group if the job scheduler requires a Manager role.

If an activity requires the System Runtime Context (that is, the same context for both job scheduler and activity resolution), select Use System Runtime Context on the job scheduler Rule form. A job scheduler can run on one or more nodes in a cluster or on a specific node in a cluster. To run multiple job schedulers simultaneously, configure the number of Threads for the job scheduler Thread Pool by modifying the prconfig.xml file. The default value is 5, and the number of Threads should equal the number of job schedulers that run simultaneously.
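As a sketch only, a prconfig.xml entry takes the usual <env> form shown below. The setting name here is a placeholder, not a verified setting; check the prconfig reference for your Pega Platform version for the exact name to use.

    <!-- Placeholder setting name: verify the exact name in the prconfig
         reference for your Pega Platform version before using it. -->
    <env name="initialization/jobSchedulerThreadPoolSize" value="10" />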
 
Unlike queue processors, job schedulers must decide whether a record requires a lock and whether to issue a commit for records that are updated with Obj-Save. If a job scheduler creates a Case, or opens a Case with a lock and causes it to move to a new Assignment or complete its Life Cycle, it does not have to issue a commit.

Default job schedulers

Pega Platform provides three default Job Schedulers that can be useful in your application:

  • Node cleaner
  • Cluster and database cleaner
  • Persist node and cluster state

Node cleaner

The node cleaner cleans up expired locks and outdated module version reports.

By default, the node cleaner job scheduler (pyNodeCleaner) runs the Code-pzNodeCleaner activity on all the nodes in the cluster.

Cluster and database cleaner

By default, the cluster and database job scheduler (pyClusterAndDBCleaner) runs the Code-.pzClusterAndDBCleaner activity on only one node in the cluster, once every 24 hours for housekeeping tasks. This job purges the following items:

  • Older records from log tables
  • Requestors that have been idle for 48 hours
  • Passivation data for expired requestors (Clipboard cleanup)
  • Expired locks
  • Cluster state data that is older than 90 days

Persist node and cluster state

The pyPersistNodeState job scheduler saves the node state on node startup. The pyPersistClusterState job scheduler saves cluster state data daily.

The pyClusterAndDBCleaner job scheduler purges cluster state data that is older than 90 days.
