summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--config/common_base1
-rw-r--r--config/rte_config.h1
-rw-r--r--doc/guides/prog_guide/event_ethernet_rx_adapter.rst24
-rw-r--r--lib/librte_eventdev/Makefile9
-rw-r--r--lib/librte_eventdev/meson.build9
-rw-r--r--lib/librte_eventdev/rte_event_eth_rx_adapter.c940
-rw-r--r--lib/librte_eventdev/rte_event_eth_rx_adapter.h5
7 files changed, 958 insertions, 31 deletions
diff --git a/config/common_base b/config/common_base
index e4241db..c305a77 100644
--- a/config/common_base
+++ b/config/common_base
@@ -575,6 +575,7 @@ CONFIG_RTE_LIBRTE_EVENTDEV_DEBUG=n
CONFIG_RTE_EVENT_MAX_DEVS=16
CONFIG_RTE_EVENT_MAX_QUEUES_PER_DEV=64
CONFIG_RTE_EVENT_TIMER_ADAPTER_NUM_MAX=32
+CONFIG_RTE_EVENT_ETH_INTR_RING_SIZE=1024
CONFIG_RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE=32
#
diff --git a/config/rte_config.h b/config/rte_config.h
index 8a3432a..28f04b4 100644
--- a/config/rte_config.h
+++ b/config/rte_config.h
@@ -64,6 +64,7 @@
#define RTE_EVENT_MAX_DEVS 16
#define RTE_EVENT_MAX_QUEUES_PER_DEV 64
#define RTE_EVENT_TIMER_ADAPTER_NUM_MAX 32
+#define RTE_EVENT_ETH_INTR_RING_SIZE 1024
#define RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE 32
/* rawdev defines */
diff --git a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
index 319e4f0..810dfc9 100644
--- a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
+++ b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
@@ -144,3 +144,27 @@ enqueued event counts are a sum of the counts from the eventdev PMD callbacks
if the callback is supported, and the counts maintained by the service function,
if one exists. The service function also maintains a count of cycles for which
it was not able to enqueue to the event device.
+
+Interrupt Based Rx Queues
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The service core function is typically set up to poll ethernet Rx queues for
+packets. Certain queues may have low packet rates and it would be more
+efficient to enable the Rx queue interrupt and read packets after receiving
+the interrupt.
+
+The servicing_weight member of struct rte_event_eth_rx_adapter_queue_conf
+is applicable when the adapter uses a service core function. The application
+has to enable Rx queue interrupts when configuring the ethernet device
+using the ``rte_eth_dev_configure()`` function and then use a servicing_weight
+of zero when addding the Rx queue to the adapter.
+
+The adapter creates a thread blocked on the interrupt, on an interrupt this
+thread enqueues the port id and the queue id to a ring buffer. The adapter
+service function dequeues the port id and queue id from the ring buffer,
+invokes the ``rte_eth_rx_burst()`` to receive packets on the queue and
+converts the received packets to events in the same manner as packets
+received on a polled Rx queue. The interrupt thread is affinitized to the same
+CPUs as the lcores of the Rx adapter service function, if the Rx adapter
+service function has not been mapped to any lcores, the interrupt thread
+is mapped to the master lcore.
diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
index b3e2546..47f599a 100644
--- a/lib/librte_eventdev/Makefile
+++ b/lib/librte_eventdev/Makefile
@@ -8,14 +8,19 @@ include $(RTE_SDK)/mk/rte.vars.mk
LIB = librte_eventdev.a
# library version
-LIBABIVER := 4
+LIBABIVER := 5
# build flags
CFLAGS += -DALLOW_EXPERIMENTAL_API
CFLAGS += -O3
CFLAGS += $(WERROR_FLAGS)
+ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
+CFLAGS += -DLINUX
+else
+CFLAGS += -DBSD
+endif
LDLIBS += -lrte_eal -lrte_ring -lrte_ethdev -lrte_hash -lrte_mempool -lrte_timer
-LDLIBS += -lrte_mbuf -lrte_cryptodev
+LDLIBS += -lrte_mbuf -lrte_cryptodev -lpthread
# library source files
SRCS-y += rte_eventdev.c
diff --git a/lib/librte_eventdev/meson.build b/lib/librte_eventdev/meson.build
index bd138bd..3cbaf29 100644
--- a/lib/librte_eventdev/meson.build
+++ b/lib/librte_eventdev/meson.build
@@ -1,8 +1,15 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2017 Intel Corporation
-version = 4
+version = 5
allow_experimental_apis = true
+
+if host_machine.system() == 'linux'
+ cflags += '-DLINUX'
+else
+ cflags += '-DBSD'
+endif
+
sources = files('rte_eventdev.c',
'rte_event_ring.c',
'rte_event_eth_rx_adapter.c',
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index 8fe037f..42dd7f8 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -2,6 +2,11 @@
* Copyright(c) 2017 Intel Corporation.
* All rights reserved.
*/
+#if defined(LINUX)
+#include <sys/epoll.h>
+#endif
+#include <unistd.h>
+
#include <rte_cycles.h>
#include <rte_common.h>
#include <rte_dev.h>
@@ -11,6 +16,7 @@
#include <rte_malloc.h>
#include <rte_service_component.h>
#include <rte_thash.h>
+#include <rte_interrupts.h>
#include "rte_eventdev.h"
#include "rte_eventdev_pmd.h"
@@ -24,6 +30,22 @@
#define ETH_RX_ADAPTER_MEM_NAME_LEN 32
#define RSS_KEY_SIZE 40
+/* value written to intr thread pipe to signal thread exit */
+#define ETH_BRIDGE_INTR_THREAD_EXIT 1
+/* Sentinel value to detect initialized file handle */
+#define INIT_FD -1
+
+/*
+ * Used to store port and queue ID of interrupting Rx queue
+ */
+union queue_data {
+ RTE_STD_C11
+ void *ptr;
+ struct {
+ uint16_t port;
+ uint16_t queue;
+ };
+};
/*
* There is an instance of this struct per polled Rx queue added to the
@@ -75,6 +97,30 @@ struct rte_event_eth_rx_adapter {
uint16_t enq_block_count;
/* Block start ts */
uint64_t rx_enq_block_start_ts;
+ /* epoll fd used to wait for Rx interrupts */
+ int epd;
+ /* Num of interrupt driven interrupt queues */
+ uint32_t num_rx_intr;
+ /* Used to send <dev id, queue id> of interrupting Rx queues from
+ * the interrupt thread to the Rx thread
+ */
+ struct rte_ring *intr_ring;
+ /* Rx Queue data (dev id, queue id) for the last non-empty
+ * queue polled
+ */
+ union queue_data qd;
+ /* queue_data is valid */
+ int qd_valid;
+ /* Interrupt ring lock, synchronizes Rx thread
+ * and interrupt thread
+ */
+ rte_spinlock_t intr_ring_lock;
+ /* event array passed to rte_poll_wait */
+ struct rte_epoll_event *epoll_events;
+ /* Count of interrupt vectors in use */
+ uint32_t num_intr_vec;
+ /* Thread blocked on Rx interrupts */
+ pthread_t rx_intr_thread;
/* Configuration callback for rte_service configuration */
rte_event_eth_rx_adapter_conf_cb conf_cb;
/* Configuration callback argument */
@@ -93,6 +139,8 @@ struct rte_event_eth_rx_adapter {
uint32_t service_id;
/* Adapter started flag */
uint8_t rxa_started;
+ /* Adapter ID */
+ uint8_t id;
} __rte_cache_aligned;
/* Per eth device */
@@ -111,19 +159,40 @@ struct eth_device_info {
uint8_t dev_rx_started;
/* Number of queues added for this device */
uint16_t nb_dev_queues;
- /* If nb_rx_poll > 0, the start callback will
+ /* Number of poll based queues
+ * If nb_rx_poll > 0, the start callback will
* be invoked if not already invoked
*/
uint16_t nb_rx_poll;
+ /* Number of interrupt based queues
+ * If nb_rx_intr > 0, the start callback will
+ * be invoked if not already invoked.
+ */
+ uint16_t nb_rx_intr;
+ /* Number of queues that use the shared interrupt */
+ uint16_t nb_shared_intr;
/* sum(wrr(q)) for all queues within the device
* useful when deleting all device queues
*/
uint32_t wrr_len;
+ /* Intr based queue index to start polling from, this is used
+ * if the number of shared interrupts is non-zero
+ */
+ uint16_t next_q_idx;
+ /* Intr based queue indices */
+ uint16_t *intr_queue;
+ /* device generates per Rx queue interrupt for queue index
+ * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
+ */
+ int multi_intr_cap;
+ /* shared interrupt enabled */
+ int shared_intr_enabled;
};
/* Per Rx queue */
struct eth_rx_queue_info {
int queue_enabled; /* True if added */
+ int intr_enabled;
uint16_t wt; /* Polling weight */
uint8_t event_queue_id; /* Event queue to enqueue packets to */
uint8_t sched_type; /* Sched type for events */
@@ -150,7 +219,7 @@ rxa_validate_id(uint8_t id)
static inline int
rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
{
- return rx_adapter->num_rx_polled;
+ return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
}
/* Greatest common divisor */
@@ -195,6 +264,32 @@ rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
}
static inline int
+rxa_shared_intr(struct eth_device_info *dev_info,
+ int rx_queue_id)
+{
+ int multi_intr_cap;
+
+ if (dev_info->dev->intr_handle == NULL)
+ return 0;
+
+ multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
+ return !multi_intr_cap ||
+ rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
+}
+
+static inline int
+rxa_intr_queue(struct eth_device_info *dev_info,
+ int rx_queue_id)
+{
+ struct eth_rx_queue_info *queue_info;
+
+ queue_info = &dev_info->rx_queue[rx_queue_id];
+ return dev_info->rx_queue &&
+ !dev_info->internal_event_port &&
+ queue_info->queue_enabled && queue_info->wt == 0;
+}
+
+static inline int
rxa_polled_queue(struct eth_device_info *dev_info,
int rx_queue_id)
{
@@ -206,6 +301,95 @@ rxa_polled_queue(struct eth_device_info *dev_info,
queue_info->queue_enabled && queue_info->wt != 0;
}
+/* Calculate change in number of vectors after Rx queue ID is add/deleted */
+static int
+rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
+{
+ uint16_t i;
+ int n, s;
+ uint16_t nbq;
+
+ nbq = dev_info->dev->data->nb_rx_queues;
+ n = 0; /* non shared count */
+ s = 0; /* shared count */
+
+ if (rx_queue_id == -1) {
+ for (i = 0; i < nbq; i++) {
+ if (!rxa_shared_intr(dev_info, i))
+ n += add ? !rxa_intr_queue(dev_info, i) :
+ rxa_intr_queue(dev_info, i);
+ else
+ s += add ? !rxa_intr_queue(dev_info, i) :
+ rxa_intr_queue(dev_info, i);
+ }
+
+ if (s > 0) {
+ if ((add && dev_info->nb_shared_intr == 0) ||
+ (!add && dev_info->nb_shared_intr))
+ n += 1;
+ }
+ } else {
+ if (!rxa_shared_intr(dev_info, rx_queue_id))
+ n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
+ rxa_intr_queue(dev_info, rx_queue_id);
+ else
+ n = add ? !dev_info->nb_shared_intr :
+ dev_info->nb_shared_intr == 1;
+ }
+
+ return add ? n : -n;
+}
+
+/* Calculate nb_rx_intr after deleting interrupt mode rx queues
+ */
+static void
+rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
+ struct eth_device_info *dev_info,
+ int rx_queue_id,
+ uint32_t *nb_rx_intr)
+{
+ uint32_t intr_diff;
+
+ if (rx_queue_id == -1)
+ intr_diff = dev_info->nb_rx_intr;
+ else
+ intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
+
+ *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
+}
+
+/* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
+ * interrupt queues could currently be poll mode Rx queues
+ */
+static void
+rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
+ struct eth_device_info *dev_info,
+ int rx_queue_id,
+ uint32_t *nb_rx_poll,
+ uint32_t *nb_rx_intr,
+ uint32_t *nb_wrr)
+{
+ uint32_t intr_diff;
+ uint32_t poll_diff;
+ uint32_t wrr_len_diff;
+
+ if (rx_queue_id == -1) {
+ intr_diff = dev_info->dev->data->nb_rx_queues -
+ dev_info->nb_rx_intr;
+ poll_diff = dev_info->nb_rx_poll;
+ wrr_len_diff = dev_info->wrr_len;
+ } else {
+ intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
+ poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
+ wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
+ 0;
+ }
+
+ *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
+ *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
+ *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
+}
+
/* Calculate size of the eth_rx_poll and wrr_sched arrays
* after deleting poll mode rx queues
*/
@@ -240,17 +424,21 @@ rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
int rx_queue_id,
uint16_t wt,
uint32_t *nb_rx_poll,
+ uint32_t *nb_rx_intr,
uint32_t *nb_wrr)
{
+ uint32_t intr_diff;
uint32_t poll_diff;
uint32_t wrr_len_diff;
if (rx_queue_id == -1) {
+ intr_diff = dev_info->nb_rx_intr;
poll_diff = dev_info->dev->data->nb_rx_queues -
dev_info->nb_rx_poll;
wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
- dev_info->wrr_len;
} else {
+ intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
wt - dev_info->rx_queue[rx_queue_id].wt :
@@ -258,6 +446,7 @@ rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
}
*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
+ *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
}
@@ -268,10 +457,15 @@ rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
int rx_queue_id,
uint16_t wt,
uint32_t *nb_rx_poll,
+ uint32_t *nb_rx_intr,
uint32_t *nb_wrr)
{
- rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
- wt, nb_rx_poll, nb_wrr);
+ if (wt != 0)
+ rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
+ wt, nb_rx_poll, nb_rx_intr, nb_wrr);
+ else
+ rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
+ nb_rx_poll, nb_rx_intr, nb_wrr);
}
/* Calculate nb_rx_* after deleting rx_queue_id */
@@ -280,10 +474,13 @@ rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
struct eth_device_info *dev_info,
int rx_queue_id,
uint32_t *nb_rx_poll,
+ uint32_t *nb_rx_intr,
uint32_t *nb_wrr)
{
rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
nb_wrr);
+ rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
+ nb_rx_intr);
}
/*
@@ -622,7 +819,8 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
uint16_t port_id,
uint16_t queue_id,
uint32_t rx_count,
- uint32_t max_rx)
+ uint32_t max_rx,
+ int *rxq_empty)
{
struct rte_mbuf *mbufs[BATCH_SIZE];
struct rte_eth_event_enqueue_buffer *buf =
@@ -632,6 +830,8 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
uint16_t n;
uint32_t nb_rx = 0;
+ if (rxq_empty)
+ *rxq_empty = 0;
/* Don't do a batch dequeue from the rx queue if there isn't
* enough space in the enqueue buffer.
*/
@@ -641,8 +841,11 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
stats->rx_poll_count++;
n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
- if (unlikely(!n))
+ if (unlikely(!n)) {
+ if (rxq_empty)
+ *rxq_empty = 1;
break;
+ }
rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
nb_rx += n;
if (rx_count + nb_rx > max_rx)
@@ -655,6 +858,228 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
return nb_rx;
}
+static inline void
+rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
+ void *data)
+{
+ uint16_t port_id;
+ uint16_t queue;
+ int err;
+ union queue_data qd;
+ struct eth_device_info *dev_info;
+ struct eth_rx_queue_info *queue_info;
+ int *intr_enabled;
+
+ qd.ptr = data;
+ port_id = qd.port;
+ queue = qd.queue;
+
+ dev_info = &rx_adapter->eth_devices[port_id];
+ queue_info = &dev_info->rx_queue[queue];
+ rte_spinlock_lock(&rx_adapter->intr_ring_lock);
+ if (rxa_shared_intr(dev_info, queue))
+ intr_enabled = &dev_info->shared_intr_enabled;
+ else
+ intr_enabled = &queue_info->intr_enabled;
+
+ if (*intr_enabled) {
+ *intr_enabled = 0;
+ err = rte_ring_enqueue(rx_adapter->intr_ring, data);
+ /* Entry should always be available.
+ * The ring size equals the maximum number of interrupt
+ * vectors supported (an interrupt vector is shared in
+ * case of shared interrupts)
+ */
+ if (err)
+ RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
+ " to ring: %s", strerror(err));
+ else
+ rte_eth_dev_rx_intr_disable(port_id, queue);
+ }
+ rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
+}
+
+static int
+rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
+ uint32_t num_intr_vec)
+{
+ if (rx_adapter->num_intr_vec + num_intr_vec >
+ RTE_EVENT_ETH_INTR_RING_SIZE) {
+ RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
+ " %d needed %d limit %d", rx_adapter->num_intr_vec,
+ num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
+ return -ENOSPC;
+ }
+
+ return 0;
+}
+
+/* Delete entries for (dev, queue) from the interrupt ring */
+static void
+rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
+ struct eth_device_info *dev_info,
+ uint16_t rx_queue_id)
+{
+ int i, n;
+ union queue_data qd;
+
+ rte_spinlock_lock(&rx_adapter->intr_ring_lock);
+
+ n = rte_ring_count(rx_adapter->intr_ring);
+ for (i = 0; i < n; i++) {
+ rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
+ if (!rxa_shared_intr(dev_info, rx_queue_id)) {
+ if (qd.port == dev_info->dev->data->port_id &&
+ qd.queue == rx_queue_id)
+ continue;
+ } else {
+ if (qd.port == dev_info->dev->data->port_id)
+ continue;
+ }
+ rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
+ }
+
+ rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
+}
+
+/* pthread callback handling interrupt mode receive queues
+ * After receiving an Rx interrupt, it enqueues the port id and queue id of the
+ * interrupting queue to the adapter's ring buffer for interrupt events.
+ * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
+ * the adapter service function.
+ */
+static void *
+rxa_intr_thread(void *arg)
+{
+ struct rte_event_eth_rx_adapter *rx_adapter = arg;
+ struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
+ int n, i;
+
+ while (1) {
+ n = rte_epoll_wait(rx_adapter->epd, epoll_events,
+ RTE_EVENT_ETH_INTR_RING_SIZE, -1);
+ if (unlikely(n < 0))
+ RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
+ n);
+ for (i = 0; i < n; i++) {
+ rxa_intr_ring_enqueue(rx_adapter,
+ epoll_events[i].epdata.data);
+ }
+ }
+
+ return NULL;
+}
+
+/* Dequeue <port, q> from interrupt ring and enqueue received
+ * mbufs to eventdev
+ */
+static inline uint32_t
+rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+ uint32_t n;
+ uint32_t nb_rx = 0;
+ int rxq_empty;
+ struct rte_eth_event_enqueue_buffer *buf;
+ rte_spinlock_t *ring_lock;
+ uint8_t max_done = 0;
+
+ if (rx_adapter->num_rx_intr == 0)
+ return 0;
+
+ if (rte_ring_count(rx_adapter->intr_ring) == 0
+ && !rx_adapter->qd_valid)
+ return 0;
+
+ buf = &rx_adapter->event_enqueue_buffer;
+ ring_lock = &rx_adapter->intr_ring_lock;
+
+ if (buf->count >= BATCH_SIZE)
+ rxa_flush_event_buffer(rx_adapter);
+
+ while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+ struct eth_device_info *dev_info;
+ uint16_t port;
+ uint16_t queue;
+ union queue_data qd = rx_adapter->qd;
+ int err;
+
+ if (!rx_adapter->qd_valid) {
+ struct eth_rx_queue_info *queue_info;
+
+ rte_spinlock_lock(ring_lock);
+ err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
+ if (err) {
+ rte_spinlock_unlock(ring_lock);
+ break;
+ }
+
+ port = qd.port;
+ queue = qd.queue;
+ rx_adapter->qd = qd;
+ rx_adapter->qd_valid = 1;
+ dev_info = &rx_adapter->eth_devices[port];
+ if (rxa_shared_intr(dev_info, queue))
+ dev_info->shared_intr_enabled = 1;
+ else {
+ queue_info = &dev_info->rx_queue[queue];
+ queue_info->intr_enabled = 1;
+ }
+ rte_eth_dev_rx_intr_enable(port, queue);
+ rte_spinlock_unlock(ring_lock);
+ } else {
+ port = qd.port;
+ queue = qd.queue;
+
+ dev_info = &rx_adapter->eth_devices[port];
+ }
+
+ if (rxa_shared_intr(dev_info, queue)) {
+ uint16_t i;
+ uint16_t nb_queues;
+
+ nb_queues = dev_info->dev->data->nb_rx_queues;
+ n = 0;
+ for (i = dev_info->next_q_idx; i < nb_queues; i++) {
+ uint8_t enq_buffer_full;
+
+ if (!rxa_intr_queue(dev_info, i))
+ continue;
+ n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
+ rx_adapter->max_nb_rx,
+ &rxq_empty);
+ nb_rx += n;
+
+ enq_buffer_full = !rxq_empty && n == 0;
+ max_done = nb_rx > rx_adapter->max_nb_rx;
+
+ if (enq_buffer_full || max_done) {
+ dev_info->next_q_idx = i;
+ goto done;
+ }
+ }
+
+ rx_adapter->qd_valid = 0;
+
+ /* Reinitialize for next interrupt */
+ dev_info->next_q_idx = dev_info->multi_intr_cap ?
+ RTE_MAX_RXTX_INTR_VEC_ID - 1 :
+ 0;
+ } else {
+ n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
+ rx_adapter->max_nb_rx,
+ &rxq_empty);
+ rx_adapter->qd_valid = !rxq_empty;
+ nb_rx += n;
+ if (nb_rx > rx_adapter->max_nb_rx)
+ break;
+ }
+ }
+
+done:
+ rx_adapter->stats.rx_intr_packets += nb_rx;
+ return nb_rx;
+}
+
/*
* Polls receive queues added to the event adapter and enqueues received
* packets to the event device.
@@ -668,7 +1093,7 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
* the hypervisor's switching layer where adjustments can be made to deal with
* it.
*/
-static inline void
+static inline uint32_t
rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
{
uint32_t num_queue;
@@ -676,7 +1101,6 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
struct rte_eth_event_enqueue_buffer *buf;
uint32_t wrr_pos;
uint32_t max_nb_rx;
- struct rte_event_eth_rx_adapter_stats *stats;
wrr_pos = rx_adapter->wrr_pos;
max_nb_rx = rx_adapter->max_nb_rx;
@@ -696,10 +1120,11 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
rxa_flush_event_buffer(rx_adapter);
if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
rx_adapter->wrr_pos = wrr_pos;
- break;
+ return nb_rx;
}
- nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx);
+ nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
+ NULL);
if (nb_rx > max_nb_rx) {
rx_adapter->wrr_pos =
(wrr_pos + 1) % rx_adapter->wrr_len;
@@ -709,14 +1134,14 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
if (++wrr_pos == rx_adapter->wrr_len)
wrr_pos = 0;
}
-
- stats->rx_packets += nb_rx;
+ return nb_rx;
}
static int
rxa_service_func(void *args)
{
struct rte_event_eth_rx_adapter *rx_adapter = args;
+ struct rte_event_eth_rx_adapter_stats *stats;
if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
return 0;
@@ -724,7 +1149,10 @@ rxa_service_func(void *args)
return 0;
rte_spinlock_unlock(&rx_adapter->rx_lock);
}
- rxa_poll(rx_adapter);
+
+ stats = &rx_adapter->stats;
+ stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
+ stats->rx_packets += rxa_poll(rx_adapter);
rte_spinlock_unlock(&rx_adapter->rx_lock);
return 0;
}
@@ -809,6 +1237,350 @@ rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
}
static int
+rxa_epoll_create1(void)
+{
+#if defined(LINUX)
+ int fd;
+ fd = epoll_create1(EPOLL_CLOEXEC);
+ return fd < 0 ? -errno : fd;
+#elif defined(BSD)
+ return -ENOTSUP;
+#endif
+}
+
+static int
+rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+ if (rx_adapter->epd != INIT_FD)
+ return 0;
+
+ rx_adapter->epd = rxa_epoll_create1();
+ if (rx_adapter->epd < 0) {
+ int err = rx_adapter->epd;
+ rx_adapter->epd = INIT_FD;
+ RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
+ return err;
+ }
+
+ return 0;
+}
+
+static int
+rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+ int err;
+ char thread_name[RTE_MAX_THREAD_NAME_LEN];
+
+ if (rx_adapter->intr_ring)
+ return 0;
+
+ rx_adapter->intr_ring = rte_ring_create("intr_ring",
+ RTE_EVENT_ETH_INTR_RING_SIZE,
+ rte_socket_id(), 0);
+ if (!rx_adapter->intr_ring)
+ return -ENOMEM;
+
+ rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
+ RTE_EVENT_ETH_INTR_RING_SIZE *
+ sizeof(struct rte_epoll_event),
+ RTE_CACHE_LINE_SIZE,
+ rx_adapter->socket_id);
+ if (!rx_adapter->epoll_events) {
+ err = -ENOMEM;
+ goto error;
+ }
+
+ rte_spinlock_init(&rx_adapter->intr_ring_lock);
+
+ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
+ "rx-intr-thread-%d", rx_adapter->id);
+
+ err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
+ NULL, rxa_intr_thread, rx_adapter);
+ if (!err) {
+ rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
+ return 0;
+ }
+
+ RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
+error:
+ rte_ring_free(rx_adapter->intr_ring);
+ rx_adapter->intr_ring = NULL;
+ rx_adapter->epoll_events = NULL;
+ return err;
+}
+
+static int
+rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+ int err;
+
+ err = pthread_cancel(rx_adapter->rx_intr_thread);
+ if (err)
+ RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
+ err);
+
+ err = pthread_join(rx_adapter->rx_intr_thread, NULL);
+ if (err)
+ RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
+
+ rte_free(rx_adapter->epoll_events);
+ rte_ring_free(rx_adapter->intr_ring);
+ rx_adapter->intr_ring = NULL;
+ rx_adapter->epoll_events = NULL;
+ return 0;
+}
+
+static int
+rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+ int ret;
+
+ if (rx_adapter->num_rx_intr == 0)
+ return 0;
+
+ ret = rxa_destroy_intr_thread(rx_adapter);
+ if (ret)
+ return ret;
+
+ close(rx_adapter->epd);
+ rx_adapter->epd = INIT_FD;
+
+ return ret;
+}
+
+static int
+rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
+ struct eth_device_info *dev_info,
+ uint16_t rx_queue_id)
+{
+ int err;
+ uint16_t eth_dev_id = dev_info->dev->data->port_id;
+ int sintr = rxa_shared_intr(dev_info, rx_queue_id);
+
+ err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
+ if (err) {
+ RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
+ rx_queue_id);
+ return err;
+ }
+
+ err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
+ rx_adapter->epd,
+ RTE_INTR_EVENT_DEL,
+ 0);
+ if (err)
+ RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
+
+ if (sintr)
+ dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
+ else
+ dev_info->shared_intr_enabled = 0;
+ return err;
+}
+
+static int
+rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+ struct eth_device_info *dev_info,
+ int rx_queue_id)
+{
+ int err;
+ int i;
+ int s;
+
+ if (dev_info->nb_rx_intr == 0)
+ return 0;
+
+ err = 0;
+ if (rx_queue_id == -1) {
+ s = dev_info->nb_shared_intr;
+ for (i = 0; i < dev_info->nb_rx_intr; i++) {
+ int sintr;
+ uint16_t q;
+
+ q = dev_info->intr_queue[i];
+ sintr = rxa_shared_intr(dev_info, q);
+ s -= sintr;
+
+ if (!sintr || s == 0) {
+
+ err = rxa_disable_intr(rx_adapter, dev_info,
+ q);
+ if (err)
+ return err;
+ rxa_intr_ring_del_entries(rx_adapter, dev_info,
+ q);
+ }
+ }
+ } else {
+ if (!rxa_intr_queue(dev_info, rx_queue_id))
+ return 0;
+ if (!rxa_shared_intr(dev_info, rx_queue_id) ||
+ dev_info->nb_shared_intr == 1) {
+ err = rxa_disable_intr(rx_adapter, dev_info,
+ rx_queue_id);
+ if (err)
+ return err;
+ rxa_intr_ring_del_entries(rx_adapter, dev_info,
+ rx_queue_id);
+ }
+
+ for (i = 0; i < dev_info->nb_rx_intr; i++) {
+ if (dev_info->intr_queue[i] == rx_queue_id) {
+ for (; i < dev_info->nb_rx_intr - 1; i++)
+ dev_info->intr_queue[i] =
+ dev_info->intr_queue[i + 1];
+ break;
+ }
+ }
+ }
+
+ return err;
+}
+
+static int
+rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
+ struct eth_device_info *dev_info,
+ uint16_t rx_queue_id)
+{
+ int err, err1;
+ uint16_t eth_dev_id = dev_info->dev->data->port_id;
+ union queue_data qd;
+ int init_fd;
+ uint16_t *intr_queue;
+ int sintr = rxa_shared_intr(dev_info, rx_queue_id);
+
+ if (rxa_intr_queue(dev_info, rx_queue_id))
+ return 0;
+
+ intr_queue = dev_info->intr_queue;
+ if (dev_info->intr_queue == NULL) {
+ size_t len =
+ dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
+ dev_info->intr_queue =
+ rte_zmalloc_socket(
+ rx_adapter->mem_name,
+ len,
+ 0,
+ rx_adapter->socket_id);
+ if (dev_info->intr_queue == NULL)
+ return -ENOMEM;
+ }
+
+ init_fd = rx_adapter->epd;
+ err = rxa_init_epd(rx_adapter);
+ if (err)
+ goto err_free_queue;
+
+ qd.port = eth_dev_id;
+ qd.queue = rx_queue_id;
+
+ err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
+ rx_adapter->epd,
+ RTE_INTR_EVENT_ADD,
+ qd.ptr);
+ if (err) {
+ RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
+ " Rx Queue %u err %d", rx_queue_id, err);
+ goto err_del_fd;
+ }
+
+ err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
+ if (err) {
+ RTE_EDEV_LOG_ERR("Could not enable interrupt for"
+ " Rx Queue %u err %d", rx_queue_id, err);
+
+ goto err_del_event;
+ }
+
+ err = rxa_create_intr_thread(rx_adapter);
+ if (!err) {
+ if (sintr)
+ dev_info->shared_intr_enabled = 1;
+ else
+ dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
+ return 0;
+ }
+
+
+ err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
+ if (err)
+ RTE_EDEV_LOG_ERR("Could not disable interrupt for"
+ " Rx Queue %u err %d", rx_queue_id, err);
+err_del_event:
+ err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
+ rx_adapter->epd,
+ RTE_INTR_EVENT_DEL,
+ 0);
+ if (err1) {
+ RTE_EDEV_LOG_ERR("Could not delete event for"
+ " Rx Queue %u err %d", rx_queue_id, err1);
+ }
+err_del_fd:
+ if (init_fd == INIT_FD) {
+ close(rx_adapter->epd);
+ rx_adapter->epd = -1;
+ }
+err_free_queue:
+ if (intr_queue == NULL)
+ rte_free(dev_info->intr_queue);
+
+ return err;
+}
+
+static int
+rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+ struct eth_device_info *dev_info,
+ int rx_queue_id)
+
+{
+ int i, j, err;
+ int si = -1;
+ int shared_done = (dev_info->nb_shared_intr > 0);
+
+ if (rx_queue_id != -1) {
+ if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
+ return 0;
+ return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
+ }
+
+ err = 0;
+ for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
+
+ if (rxa_shared_intr(dev_info, i) && shared_done)
+ continue;
+
+ err = rxa_config_intr(rx_adapter, dev_info, i);
+
+ shared_done = err == 0 && rxa_shared_intr(dev_info, i);
+ if (shared_done) {
+ si = i;
+ dev_info->shared_intr_enabled = 1;
+ }
+ if (err)
+ break;
+ }
+
+ if (err == 0)
+ return 0;
+
+ shared_done = (dev_info->nb_shared_intr > 0);
+ for (j = 0; j < i; j++) {
+ if (rxa_intr_queue(dev_info, j))
+ continue;
+ if (rxa_shared_intr(dev_info, j) && si != j)
+ continue;
+ err = rxa_disable_intr(rx_adapter, dev_info, j);
+ if (err)
+ break;
+
+ }
+
+ return err;
+}
+
+
+static int
rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
{
int ret;
@@ -843,6 +1615,7 @@ rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
rx_adapter->service_inited = 1;
+ rx_adapter->epd = INIT_FD;
return 0;
err_done:
@@ -886,6 +1659,9 @@ rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
int32_t rx_queue_id)
{
int pollq;
+ int intrq;
+ int sintrq;
+
if (rx_adapter->nb_queues == 0)
return;
@@ -901,9 +1677,14 @@ rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
}
pollq = rxa_polled_queue(dev_info, rx_queue_id);
+ intrq = rxa_intr_queue(dev_info, rx_queue_id);
+ sintrq = rxa_shared_intr(dev_info, rx_queue_id);
rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
rx_adapter->num_rx_polled -= pollq;
dev_info->nb_rx_poll -= pollq;
+ rx_adapter->num_rx_intr -= intrq;
+ dev_info->nb_rx_intr -= intrq;
+ dev_info->nb_shared_intr -= intrq && sintrq;
}
static void
@@ -915,6 +1696,8 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
struct eth_rx_queue_info *queue_info;
const struct rte_event *ev = &conf->ev;
int pollq;
+ int intrq;
+ int sintrq;
if (rx_queue_id == -1) {
uint16_t nb_rx_queues;
@@ -927,6 +1710,8 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
}
pollq = rxa_polled_queue(dev_info, rx_queue_id);
+ intrq = rxa_intr_queue(dev_info, rx_queue_id);
+ sintrq = rxa_shared_intr(dev_info, rx_queue_id);
queue_info = &dev_info->rx_queue[rx_queue_id];
queue_info->event_queue_id = ev->queue_id;
@@ -944,6 +1729,24 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
if (rxa_polled_queue(dev_info, rx_queue_id)) {
rx_adapter->num_rx_polled += !pollq;
dev_info->nb_rx_poll += !pollq;
+ rx_adapter->num_rx_intr -= intrq;
+ dev_info->nb_rx_intr -= intrq;
+ dev_info->nb_shared_intr -= intrq && sintrq;
+ }
+
+ if (rxa_intr_queue(dev_info, rx_queue_id)) {
+ rx_adapter->num_rx_polled -= pollq;
+ dev_info->nb_rx_poll -= pollq;
+ rx_adapter->num_rx_intr += !intrq;
+ dev_info->nb_rx_intr += !intrq;
+ dev_info->nb_shared_intr += !intrq && sintrq;
+ if (dev_info->nb_shared_intr == 1) {
+ if (dev_info->multi_intr_cap)
+ dev_info->next_q_idx =
+ RTE_MAX_RXTX_INTR_VEC_ID - 1;
+ else
+ dev_info->next_q_idx = 0;
+ }
}
}
@@ -960,24 +1763,24 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
uint32_t *rx_wrr;
uint16_t nb_rx_queues;
uint32_t nb_rx_poll, nb_wrr;
+ uint32_t nb_rx_intr;
+ int num_intr_vec;
+ uint16_t wt;
if (queue_conf->servicing_weight == 0) {
-
struct rte_eth_dev_data *data = dev_info->dev->data;
- if (data->dev_conf.intr_conf.rxq) {
- RTE_EDEV_LOG_ERR("Interrupt driven queues"
- " not supported");
- return -ENOTSUP;
- }
- temp_conf = *queue_conf;
- /* If Rx interrupts are disabled set wt = 1 */
- temp_conf.servicing_weight = 1;
+ temp_conf = *queue_conf;
+ if (!data->dev_conf.intr_conf.rxq) {
+ /* If Rx interrupts are disabled set wt = 1 */
+ temp_conf.servicing_weight = 1;
+ }
queue_conf = &temp_conf;
}
nb_rx_queues = dev_info->dev->data->nb_rx_queues;
rx_queue = dev_info->rx_queue;
+ wt = queue_conf->servicing_weight;
if (dev_info->rx_queue == NULL) {
dev_info->rx_queue =
@@ -993,13 +1796,65 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
queue_conf->servicing_weight,
- &nb_rx_poll, &nb_wrr);
+ &nb_rx_poll, &nb_rx_intr, &nb_wrr);
+
+ if (dev_info->dev->intr_handle)
+ dev_info->multi_intr_cap =
+ rte_intr_cap_multiple(dev_info->dev->intr_handle);
ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
&rx_poll, &rx_wrr);
if (ret)
goto err_free_rxqueue;
+ if (wt == 0) {
+ num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
+
+ ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
+ if (ret)
+ goto err_free_rxqueue;
+
+ ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
+ if (ret)
+ goto err_free_rxqueue;
+ } else {
+
+ num_intr_vec = 0;
+ if (rx_adapter->num_rx_intr > nb_rx_intr) {
+ num_intr_vec = rxa_nb_intr_vect(dev_info,
+ rx_queue_id, 0);
+ /* interrupt based queues are being converted to
+ * poll mode queues, delete the interrupt configuration
+ * for those.
+ */
+ ret = rxa_del_intr_queue(rx_adapter,
+ dev_info, rx_queue_id);
+ if (ret)
+ goto err_free_rxqueue;
+ }
+ }
+
+ if (nb_rx_intr == 0) {
+ ret = rxa_free_intr_resources(rx_adapter);
+ if (ret)
+ goto err_free_rxqueue;
+ }
+
+ if (wt == 0) {
+ uint16_t i;
+
+ if (rx_queue_id == -1) {
+ for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
+ dev_info->intr_queue[i] = i;
+ } else {
+ if (!rxa_intr_queue(dev_info, rx_queue_id))
+ dev_info->intr_queue[nb_rx_intr - 1] =
+ rx_queue_id;
+ }
+ }
+
+
+
rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
@@ -1009,6 +1864,7 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
rx_adapter->eth_rx_poll = rx_poll;
rx_adapter->wrr_sched = rx_wrr;
rx_adapter->wrr_len = nb_wrr;
+ rx_adapter->num_intr_vec += num_intr_vec;
return 0;
err_free_rxqueue:
@@ -1119,6 +1975,7 @@ rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
rx_adapter->socket_id = socket_id;
rx_adapter->conf_cb = conf_cb;
rx_adapter->conf_arg = conf_arg;
+ rx_adapter->id = id;
strcpy(rx_adapter->mem_name, mem_name);
rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
/* FIXME: incompatible with hotplug */
@@ -1302,8 +2159,10 @@ rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
uint32_t cap;
uint32_t nb_rx_poll = 0;
uint32_t nb_wrr = 0;
+ uint32_t nb_rx_intr;
struct eth_rx_poll_entry *rx_poll = NULL;
uint32_t *rx_wrr = NULL;
+ int num_intr_vec;
RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -1346,29 +2205,59 @@ rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
}
} else {
rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
- &nb_rx_poll, &nb_wrr);
+ &nb_rx_poll, &nb_rx_intr, &nb_wrr);
+
ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
&rx_poll, &rx_wrr);
if (ret)
return ret;
rte_spinlock_lock(&rx_adapter->rx_lock);
+
+ num_intr_vec = 0;
+ if (rx_adapter->num_rx_intr > nb_rx_intr) {
+
+ num_intr_vec = rxa_nb_intr_vect(dev_info,
+ rx_queue_id, 0);
+ ret = rxa_del_intr_queue(rx_adapter, dev_info,
+ rx_queue_id);
+ if (ret)
+ goto unlock_ret;
+ }
+
+ if (nb_rx_intr == 0) {
+ ret = rxa_free_intr_resources(rx_adapter);
+ if (ret)
+ goto unlock_ret;
+ }
+
rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
rte_free(rx_adapter->eth_rx_poll);
rte_free(rx_adapter->wrr_sched);
+ if (nb_rx_intr == 0) {
+ rte_free(dev_info->intr_queue);
+ dev_info->intr_queue = NULL;
+ }
+
rx_adapter->eth_rx_poll = rx_poll;
- rx_adapter->num_rx_polled = nb_rx_poll;
rx_adapter->wrr_sched = rx_wrr;
rx_adapter->wrr_len = nb_wrr;
+ rx_adapter->num_intr_vec += num_intr_vec;
if (dev_info->nb_dev_queues == 0) {
rte_free(dev_info->rx_queue);
dev_info->rx_queue = NULL;
}
+unlock_ret:
rte_spinlock_unlock(&rx_adapter->rx_lock);
+ if (ret) {
+ rte_free(rx_poll);
+ rte_free(rx_wrr);
+ return ret;
+ }
rte_service_component_runstate_set(rx_adapter->service_id,
rxa_sw_adapter_queue_count(rx_adapter));
@@ -1377,7 +2266,6 @@ rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
return ret;
}
-
int
rte_event_eth_rx_adapter_start(uint8_t id)
{
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.h b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
index 307b2b5..97f25e9 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.h
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
@@ -64,8 +64,7 @@
* the service function ID of the adapter in this case.
*
* Note:
- * 1) Interrupt driven receive queues are currently unimplemented.
- * 2) Devices created after an instance of rte_event_eth_rx_adapter_create
+ * 1) Devices created after an instance of rte_event_eth_rx_adapter_create
* should be added to a new instance of the rx adapter.
*/
@@ -199,6 +198,8 @@ struct rte_event_eth_rx_adapter_stats {
* block cycles can be used to compute the percentage of
* cycles the service is blocked by the event device.
*/
+ uint64_t rx_intr_packets;
+ /**< Received packet count for interrupt mode Rx queues */
};
/**