STREAMS Programming Guide     Oracle Solaris 11.1 Information Library

Data Flow Control

To support the STREAMS flow control mechanism, a module that uses a service procedure must call canputnext(9F) before calling putnext(9F), and must use appropriate values for its high-water and low-water marks. A module with a service procedure is responsible for managing flow control. A module without a service procedure does not need to take any flow control action.

The high-water (q_hiwat) and low-water (q_lowat) values of a queue limit the amount of data that can be placed on the queue. These limits prevent depletion of buffers in the buffer pool. Flow control is advisory and can be bypassed. It is managed by the high-water and low-water marks and regulated by the utility routines getq(9F), putq(9F), putbq(9F), insq(9F), rmvq(9F), and canputnext(9F).

In normal operation, flow control works as follows:

A driver sends data to a module using putnext(9F), and the module's put procedure queues the data using putq(9F). Calling putq(9F) enables the service procedure, which is then run at some indeterminate time in the future. When the service procedure runs, it retrieves the data by calling getq(9F).
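The deferred hand-off described above can be modeled in user space. The following is only a sketch, not kernel code: the struct and the toy_* functions are hypothetical names invented for illustration, and the "scheduler" is reduced to a single function call.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical user-space model; none of these names are STREAMS APIs. */
struct toy_sched_queue {
    int  pending;    /* messages waiting on the queue */
    bool enabled;    /* service procedure is scheduled to run */
    int  processed;  /* messages handled by the service procedure */
};

/* Models the put procedure: queue the message (as putq(9F) would)
 * and mark the service procedure as enabled. Nothing is processed yet. */
static void toy_put(struct toy_sched_queue *q)
{
    q->pending++;
    q->enabled = true;
}

/* Models one scheduler pass at some later time. The service procedure
 * runs only if it was enabled, and drains the queue as getq(9F) would.
 * Returns true if the service procedure ran. */
static bool toy_run_scheduler(struct toy_sched_queue *q)
{
    if (!q->enabled)
        return false;
    q->enabled = false;
    while (q->pending > 0) {
        assert(q->pending > 0);
        q->pending--;
        q->processed++;
    }
    return true;
}
```

In this model, calling toy_put() twice leaves two messages pending and none processed until toy_run_scheduler() is called, which mirrors the separation between the put procedure and the service procedure.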

If the module cannot process data at the rate at which the driver is sending the data, the following happens:

When a message is queued, putq(9F) increments q_count by the size of the message and compares the result with the high-water mark (q_hiwat) of the queue. If the count reaches q_hiwat, putq(9F) sets the internal QFULL indicator for the queue. Messages from upstream (for a write-side queue) or from downstream (for a read-side queue) are then held back, because canputnext(9F) returns FALSE until the queue count drops below q_lowat. getq(9F) decrements the queue count. If the resulting count is below q_lowat, getq(9F) back-enables any blocked queue, which causes that queue's service procedure to be called. Flow control does not prevent a queue from exceeding q_hiwat: the count can pass the high-water mark before canputnext(9F) detects QFULL and the flow is stopped.
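The high-water and low-water bookkeeping can be illustrated with a small user-space model. This is a sketch under stated assumptions, not the kernel implementation: the struct and the toy_* functions are hypothetical, and the model back-enables whenever the queue was full, which simplifies whatever conditions the kernel actually checks.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of a queue's flow-control state. Only the ideas of
 * q_count, q_hiwat, q_lowat, and QFULL come from the text above. */
struct toy_queue {
    size_t count;        /* models q_count: bytes currently queued */
    size_t hiwat;        /* models q_hiwat */
    size_t lowat;        /* models q_lowat */
    bool   full;         /* models the QFULL indicator */
    int    backenables;  /* times a blocked sender was re-enabled */
};

/* Models what a sender learns from canputnext(9F). */
static bool toy_canput(const struct toy_queue *q)
{
    return !q->full;
}

/* Models putq(9F): the message is always accepted, then the count is
 * compared with the high-water mark. The count may overshoot hiwat. */
static void toy_putq(struct toy_queue *q, size_t msg_size)
{
    q->count += msg_size;
    if (q->count >= q->hiwat)
        q->full = true;
}

/* Models getq(9F): remove msg_size bytes and, if the queue was full and
 * the count is now below the low-water mark, clear the full indicator
 * and record a back-enable. */
static void toy_getq(struct toy_queue *q, size_t msg_size)
{
    assert(msg_size <= q->count);
    q->count -= msg_size;
    if (q->full && q->count < q->lowat) {
        q->full = false;
        q->backenables++;
    }
}
```

With hiwat set to 100 and lowat set to 40, two 60-byte messages leave the count at 120, above the high-water mark, and the queue stays marked full until the count falls below 40. This is the overshoot and hysteresis behavior the paragraph above describes.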

The next example shows flow control in a line discipline module. Example 10-6 shows a read-side line discipline module and a write-side line discipline module. Note that the read side is the same as the write side but without the M_IOCTL processing.

Example 10-6 Read-side Line Discipline Module

#include <sys/types.h>
#include <sys/stream.h>

/* read side line discipline module flow control */
static mblk_t *read_canon(mblk_t *);

static int
ld_read_srv(
    queue_t *q)                            /* pointer to read queue */
{
    mblk_t *mp;                            /* original message */
    mblk_t *bp;                            /* canonicalized message */

    while ((mp = getq(q)) != NULL) {
            switch (mp->b_datap->db_type) { /* type of msg */
            case M_DATA:     /* data message */
                if (canputnext(q)) {
                        bp = read_canon(mp);
                        putnext(q, bp);
                } else {
                        putbq(q, mp); /* put message back in queue */
                        return (0);
                }
                break;

            default:
                if (mp->b_datap->db_type >= QPCTL)
                        putnext(q, mp);         /* high-priority message */
                else { /* ordinary message */
                        if (canputnext(q))
                                 putnext(q, mp);
                        else {
                                 putbq(q, mp);
                                 return (0);
                        }
                }
                break;
            }
    }
    return (0);
}

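/* write side line discipline module flow control */
/* prototypes below are assumed; the helpers are defined elsewhere */
static mblk_t *write_canon(mblk_t *);
static void ld_ioctl(queue_t *, mblk_t *);
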
static int
ld_write_srv(
    queue_t *q)                                /* pointer to write queue */
{
    mblk_t *mp;                                /* original message */
    mblk_t *bp;                                /* canonicalized message */

    while ((mp = getq(q)) != NULL) {
            switch (mp->b_datap->db_type) { /* type of msg */
            case M_DATA:             /* data message */
                if (canputnext(q)) {
                        bp = write_canon(mp);
                        putnext(q, bp);
                } else {
                        putbq(q, mp);
                        return (0);
                }
                break;

            case M_IOCTL:
                ld_ioctl(q, mp);
                break;

            default:
                if (mp->b_datap->db_type >= QPCTL)
                        putnext(q, mp);        /* high priority message */
                else {                         /* ordinary message */
                        if (canputnext(q))        
                                putnext(q, mp);
                        else {
                                putbq(q, mp);
                                return (0);
                        }
                }
                break;
            }
    }
    return (0);
}
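
The service procedures above share one pattern: take a message with getq(9F), forward it only if canputnext(9F) allows, and otherwise return it to the head of the queue with putbq(9F) and stop. The following user-space sketch models that pattern so it can be exercised outside the kernel. All names are hypothetical: messages are plain integers, and the downstream queue is reduced to a counter (room) of messages it can still accept.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_QMAX 8

/* Hypothetical fixed-size ring buffer of integer "messages". */
struct toy_msgq {
    int    msgs[TOY_QMAX];
    size_t head;  /* index of the oldest message */
    size_t len;   /* number of queued messages */
};

/* Models putq(9F): append at the tail. Returns false if the ring is full. */
static bool toy_put_tail(struct toy_msgq *q, int m)
{
    if (q->len == TOY_QMAX)
        return false;
    q->msgs[(q->head + q->len) % TOY_QMAX] = m;
    q->len++;
    return true;
}

/* Models getq(9F): remove the oldest message. Returns false if empty. */
static bool toy_get_head(struct toy_msgq *q, int *m)
{
    if (q->len == 0)
        return false;
    *m = q->msgs[q->head];
    q->head = (q->head + 1) % TOY_QMAX;
    q->len--;
    return true;
}

/* Models putbq(9F): return a message to the head of the queue. */
static void toy_put_back(struct toy_msgq *q, int m)
{
    assert(q->len < TOY_QMAX);
    q->head = (q->head + TOY_QMAX - 1) % TOY_QMAX;
    q->msgs[q->head] = m;
    q->len++;
}

/* Models the service loop: forward messages into out[] while downstream
 * has room. When room runs out, put the message back and stop, so that
 * message order is preserved. Returns the number of messages forwarded.
 * out[] must hold at least TOY_QMAX entries. */
static size_t toy_service(struct toy_msgq *q, size_t *room, int *out)
{
    size_t sent = 0;
    int m;

    while (toy_get_head(q, &m)) {
        if (*room == 0) {        /* canputnext(9F) would fail here */
            toy_put_back(q, m);
            break;
        }
        out[sent++] = m;         /* stands in for putnext(9F) */
        (*room)--;
    }
    return sent;
}
```

Putting the message back at the head rather than the tail is what keeps the stream in order: when the service procedure is later back-enabled, it resumes with the same message it could not forward.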