-rw-r--r--  doc/guides/rel_notes/release_24_07.rst    7
-rw-r--r--  drivers/event/dsw/dsw_evdev.c             8
-rw-r--r--  drivers/event/dsw/dsw_evdev.h             7
-rw-r--r--  drivers/event/dsw/dsw_event.c           405
4 files changed, 254 insertions, 173 deletions
diff --git a/doc/guides/rel_notes/release_24_07.rst b/doc/guides/rel_notes/release_24_07.rst
index d67fbd2371..554965b401 100644
--- a/doc/guides/rel_notes/release_24_07.rst
+++ b/doc/guides/rel_notes/release_24_07.rst
@@ -99,6 +99,13 @@ New Features
* Added SSE/NEON vector datapath.
+* **Updated the DSW event device.**
+
+ * Added support for ``RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE``,
+ allowing applications to take on new tasks without having completed
+ (released) the previous event batch. This in turn facilitates DSW
+ use alongside high-latency look-aside hardware accelerators.
+
Removed Items
-------------
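
A minimal application-side sketch (not part of this patch) of how the new capability is used: the port is configured without implicit release, and the application later hands back each dequeued event with an explicit RTE_EVENT_OP_RELEASE (or RTE_EVENT_OP_FORWARD) enqueue. Standard eventdev API; dev_id and port_id are assumed to refer to an already-configured device.

#include <rte_eventdev.h>

static void
setup_explicit_release_port(uint8_t dev_id, uint8_t port_id)
{
	struct rte_event_port_conf conf;

	/* Start from the device defaults, then opt out of
	 * implicit release.
	 */
	rte_event_port_default_conf_get(dev_id, port_id, &conf);
	conf.event_port_cfg |= RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL;
	rte_event_port_setup(dev_id, port_id, &conf);
}

static void
worker_iteration(uint8_t dev_id, uint8_t port_id)
{
	struct rte_event ev;

	if (rte_event_dequeue_burst(dev_id, port_id, &ev, 1, 0) == 0)
		return;

	/* The event may now be handed to a look-aside accelerator,
	 * and further events dequeued meanwhile. Once processing
	 * completes, the event must be explicitly released (or
	 * forwarded).
	 */
	ev.op = RTE_EVENT_OP_RELEASE;
	rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
}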
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index ab0420b549..0dea1091e3 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -23,15 +23,20 @@ dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
struct rte_event_ring *in_ring;
struct rte_ring *ctl_in_ring;
char ring_name[RTE_RING_NAMESIZE];
+ bool implicit_release;
port = &dsw->ports[port_id];
+ implicit_release =
+ !(conf->event_port_cfg & RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);
+
*port = (struct dsw_port) {
.id = port_id,
.dsw = dsw,
.dequeue_depth = conf->dequeue_depth,
.enqueue_depth = conf->enqueue_depth,
- .new_event_threshold = conf->new_event_threshold
+ .new_event_threshold = conf->new_event_threshold,
+ .implicit_release = implicit_release
};
snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id,
@@ -222,6 +227,7 @@ dsw_info_get(struct rte_eventdev *dev __rte_unused,
RTE_EVENT_DEV_CAP_ATOMIC |
RTE_EVENT_DEV_CAP_PARALLEL |
RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED|
+ RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE|
RTE_EVENT_DEV_CAP_NONSEQ_MODE|
RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT|
RTE_EVENT_DEV_CAP_CARRY_FLOW_ID
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index 2018306265..c9bf4f8b6b 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -127,6 +127,7 @@ struct dsw_queue_flow {
enum dsw_migration_state {
DSW_MIGRATION_STATE_IDLE,
+ DSW_MIGRATION_STATE_FINISH_PENDING,
DSW_MIGRATION_STATE_PAUSING,
DSW_MIGRATION_STATE_UNPAUSING
};
@@ -148,6 +149,8 @@ struct __rte_cache_aligned dsw_port {
int32_t new_event_threshold;
+ bool implicit_release;
+
uint16_t pending_releases;
uint16_t next_parallel_flow_id;
@@ -255,8 +258,8 @@ struct dsw_evdev {
alignas(RTE_CACHE_LINE_SIZE) RTE_ATOMIC(int32_t) credits_on_loan;
};
-#define DSW_CTL_PAUS_REQ (0)
-#define DSW_CTL_UNPAUS_REQ (1)
+#define DSW_CTL_PAUSE_REQ (0)
+#define DSW_CTL_UNPAUSE_REQ (1)
#define DSW_CTL_CFM (2)
struct __rte_aligned(4) dsw_ctl_msg {
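
For orientation, the per-port migration life cycle after this change, sketched from the state transitions implemented in dsw_event.c:

/*
 * IDLE -> FINISH_PENDING     Emigration targets have been selected;
 *                            the port stops dequeuing new events until
 *                            all outstanding releases have arrived.
 * FINISH_PENDING -> PAUSING  pending_releases reached zero; the target
 *                            flows are marked paused and
 *                            DSW_CTL_PAUSE_REQ is broadcast.
 * FINISH_PENDING -> IDLE     Migration aborted, since paused events
 *                            were present on the port.
 * PAUSING -> UNPAUSING       All confirmations received; flows are
 *                            moved and DSW_CTL_UNPAUSE_REQ broadcast.
 * UNPAUSING -> IDLE          All confirmations received; the migration
 *                            is complete.
 */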
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index ca2b8e1032..33f741990f 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -275,7 +275,7 @@ dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
static void
dsw_port_remove_paused_flow(struct dsw_port *port,
- struct dsw_queue_flow *target_qf)
+ const struct dsw_queue_flow *target_qf)
{
uint16_t i;
@@ -302,6 +302,7 @@ dsw_port_remove_paused_flow(struct dsw_port *port,
DSW_LOG_DP_PORT(ERR, port->id,
"Failed to unpause queue_id %d flow_hash %d.\n",
target_qf->queue_id, target_qf->flow_hash);
+ RTE_VERIFY(0);
}
static void
@@ -602,7 +603,7 @@ dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
return;
/* The rings are dimensioned to fit all in-flight events (even
- * on a single ring), so looping will work.
+ * on a single ring).
*/
rte_event_ring_enqueue_bulk(dest_port->in_ring, buffer, *buffer_len,
NULL);
@@ -791,6 +792,7 @@ dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
if (port->emigration_targets_len == 0) {
port->migration_state = DSW_MIGRATION_STATE_IDLE;
+ port->emigration_targets_len = 0;
port->seen_events_len = 0;
}
}
@@ -844,27 +846,35 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
+ /* For simplicity, postpone migration if there are still
+ * events to consume in the in_buffer (from the last
+ * emigration).
+ */
+ if (source_port->in_buffer_len > 0) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
+ "events in the input buffer.\n");
+ return;
+ }
+
+ if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id,
+ "Emigration already in progress.\n");
+ return;
+ }
+
if (seen_events_len < DSW_MAX_EVENTS_RECORDED) {
DSW_LOG_DP_PORT(DEBUG, source_port->id, "Not enough events "
"are recorded to allow for a migration.\n");
return;
}
- /* A flow migration cannot be initiated if there are paused
- * events, since some/all of those events may be have been
- * produced as a result of processing the flow(s) selected for
- * migration. Moving such a flow would potentially introduced
- * reordering, since processing the migrated flow on the
- * receiving flow may commence before the to-be-enqueued-to
- * flows are unpaused, leading to paused events on the second
- * port as well, destined for the same paused flow(s). When
- * those flows are unpaused, the resulting events are
- * delivered the owning port in an undefined order.
+ /* Postpone considering migration in case paused events exist, since
+ * such events may prevent the migration procedure from completing,
+ * leading to wasted CPU cycles (e.g., sorting queue flows).
*/
if (source_port->paused_events_len > 0) {
- DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are "
- "events in the paus buffer.\n");
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Paused events on "
+ "port. Postponing any migrations.\n");
return;
}
@@ -874,23 +884,7 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
*/
source_port->next_emigration = now +
source_port->migration_interval / 2 +
- rte_rand() % source_port->migration_interval;
-
- if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
- DSW_LOG_DP_PORT(DEBUG, source_port->id,
- "Emigration already in progress.\n");
- return;
- }
-
- /* For simplicity, avoid migration in the unlikely case there
- * is still events to consume in the in_buffer (from the last
- * emigration).
- */
- if (source_port->in_buffer_len > 0) {
- DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
- "events in the input buffer.\n");
- return;
- }
+ rte_rand_max(source_port->migration_interval);
source_port_load =
rte_atomic_load_explicit(&source_port->load,
@@ -936,7 +930,7 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
if (source_port->emigration_targets_len == 0)
return;
- source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
+ source_port->migration_state = DSW_MIGRATION_STATE_FINISH_PENDING;
source_port->emigration_start = rte_get_timer_cycles();
/* No need to go through the whole pause procedure for
@@ -944,24 +938,73 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
* be maintained.
*/
dsw_port_move_parallel_flows(dsw, source_port);
+}
- /* All flows were on PARALLEL queues. */
- if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
- return;
+static void
+dsw_port_abort_migration(struct dsw_port *source_port)
+{
+ RTE_ASSERT(source_port->in_buffer_start == 0);
+ RTE_ASSERT(source_port->in_buffer_len == 0);
- /* There might be 'loopback' events already scheduled in the
- * output buffers.
+ /* Putting the stashed events in the in_buffer makes sure they
+ * are processed before any events on the in_ring, to avoid
+ * reordering.
*/
- dsw_port_flush_out_buffers(dsw, source_port);
+ rte_memcpy(source_port->in_buffer, source_port->emigrating_events,
+ source_port->emigrating_events_len * sizeof(struct rte_event));
+ source_port->in_buffer_len = source_port->emigrating_events_len;
+ source_port->emigrating_events_len = 0;
+
+ source_port->emigration_targets_len = 0;
+
+ source_port->migration_state = DSW_MIGRATION_STATE_IDLE;
+}
+
+static void
+dsw_port_continue_emigration(struct dsw_evdev *dsw,
+ struct dsw_port *source_port)
+{
+ /* A flow migration cannot be completed if there are paused
+ * events, since some/all of those events may have been
+ * produced as a result of processing the flow(s) selected for
+ * migration. Moving such a flow would potentially introduce
+ * reordering, since processing the migrated flow on the
+ * receiving port may commence before the to-be-enqueued-to
+ * flows are unpaused, leading to paused events on the second
+ * port as well, destined for the same paused flow(s). When
+ * those flows are unpaused, the resulting events are
+ * delivered to the owning port in an undefined order.
+ *
+ * Waiting for the events to be unpaused could lead to a
+ * deadlock, where two ports are both waiting for the other to
+ * unpause.
+ */
+ if (source_port->paused_events_len > 0) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are events in "
+ "the pause buffer. Aborting migration.\n");
+ dsw_port_abort_migration(source_port);
+ return;
+ }
dsw_port_add_paused_flows(source_port,
source_port->emigration_target_qfs,
source_port->emigration_targets_len);
- dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
+ dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUSE_REQ,
source_port->emigration_target_qfs,
source_port->emigration_targets_len);
source_port->cfm_cnt = 0;
+
+ source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
+}
+
+static void
+dsw_port_try_finish_pending(struct dsw_evdev *dsw, struct dsw_port *source_port)
+{
+ if (unlikely(source_port->migration_state ==
+ DSW_MIGRATION_STATE_FINISH_PENDING &&
+ source_port->pending_releases == 0))
+ dsw_port_continue_emigration(dsw, source_port);
}
static void
@@ -982,8 +1025,6 @@ dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
- rte_smp_rmb();
-
dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
for (i = 0; i < qfs_len; i++) {
@@ -1008,45 +1049,40 @@ dsw_port_buffer_in_buffer(struct dsw_port *port,
}
static void
-dsw_port_forward_emigrated_event(struct dsw_evdev *dsw,
- struct dsw_port *source_port,
- struct rte_event *event)
+dsw_port_stash_migrating_event(struct dsw_port *port,
+ const struct rte_event *event)
+{
+ port->emigrating_events[port->emigrating_events_len] = *event;
+ port->emigrating_events_len++;
+}
+
+static void
+dsw_port_stash_any_migrating_events(struct dsw_port *port,
+ struct rte_event *events,
+ uint16_t *num)
{
uint16_t i;
+ uint16_t offset = 0;
- for (i = 0; i < source_port->emigration_targets_len; i++) {
- struct dsw_queue_flow *qf =
- &source_port->emigration_target_qfs[i];
- uint8_t dest_port_id =
- source_port->emigration_target_port_ids[i];
- struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+ for (i = 0; i < *num; i++) {
+ uint16_t flow_hash;
+ struct rte_event *in_event = &events[i];
- if (event->queue_id == qf->queue_id &&
- dsw_flow_id_hash(event->flow_id) == qf->flow_hash) {
- /* No need to care about bursting forwarded
- * events (to the destination port's in_ring),
- * since migration doesn't happen very often,
- * and also the majority of the dequeued
- * events will likely *not* be forwarded.
- */
- while (rte_event_ring_enqueue_burst(dest_port->in_ring,
- event, 1,
- NULL) != 1)
- rte_pause();
- return;
+ flow_hash = dsw_flow_id_hash(in_event->flow_id);
+
+ if (unlikely(dsw_port_is_flow_migrating(port,
+ in_event->queue_id,
+ flow_hash))) {
+ dsw_port_stash_migrating_event(port, in_event);
+ offset++;
+ } else if (offset > 0) {
+ struct rte_event *out_event = &events[i - offset];
+ rte_memcpy(out_event, in_event,
+ sizeof(struct rte_event));
}
}
- /* Event did not belong to the emigrated flows */
- dsw_port_buffer_in_buffer(source_port, event);
-}
-
-static void
-dsw_port_stash_migrating_event(struct dsw_port *port,
- const struct rte_event *event)
-{
- port->emigrating_events[port->emigrating_events_len] = *event;
- port->emigrating_events_len++;
+ *num -= offset;
}
#define DRAIN_DEQUEUE_BURST_SIZE (32)
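
The in-place compaction in dsw_port_stash_any_migrating_events above preserves the relative order of the surviving events. A self-contained toy version of the same pattern (not driver code), with integers standing in for events:

#include <stdbool.h>
#include <stdint.h>

/* Stable in-place partition: elements matching the predicate are
 * appended to 'stash'; the remaining elements are shifted left by the
 * running offset. Returns the new length of 'arr'.
 */
static uint16_t
compact(int *arr, uint16_t num, bool (*match)(int),
	int *stash, uint16_t *stash_len)
{
	uint16_t i;
	uint16_t offset = 0;

	for (i = 0; i < num; i++) {
		if (match(arr[i])) {
			stash[(*stash_len)++] = arr[i];
			offset++;
		} else if (offset > 0) {
			arr[i - offset] = arr[i];
		}
	}

	return num - offset;
}

With input {1, 2, 3, 4} and a predicate matching even numbers, arr is left holding {1, 3} and stash receives {2, 4}.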
@@ -1054,28 +1090,21 @@ dsw_port_stash_migrating_event(struct dsw_port *port,
static void
dsw_port_drain_in_ring(struct dsw_port *source_port)
{
- uint16_t num_events;
- uint16_t dequeued;
-
- /* Control ring message should been seen before the ring count
- * is read on the port's in_ring.
- */
- rte_smp_rmb();
-
- num_events = rte_event_ring_count(source_port->in_ring);
-
- for (dequeued = 0; dequeued < num_events; ) {
- uint16_t burst_size = RTE_MIN(DRAIN_DEQUEUE_BURST_SIZE,
- num_events - dequeued);
- struct rte_event events[burst_size];
- uint16_t len;
+ for (;;) {
+ struct rte_event events[DRAIN_DEQUEUE_BURST_SIZE];
+ uint16_t n;
uint16_t i;
+ uint16_t available;
- len = rte_event_ring_dequeue_burst(source_port->in_ring,
- events, burst_size,
- NULL);
+ n = rte_event_ring_dequeue_burst(source_port->in_ring,
+ events,
+ DRAIN_DEQUEUE_BURST_SIZE,
+ &available);
- for (i = 0; i < len; i++) {
+ if (n == 0 && available == 0)
+ break;
+
+ for (i = 0; i < n; i++) {
struct rte_event *event = &events[i];
uint16_t flow_hash;
@@ -1089,9 +1118,41 @@ dsw_port_drain_in_ring(struct dsw_port *source_port)
else
dsw_port_buffer_in_buffer(source_port, event);
}
+ }
+}
- dequeued += len;
+static void
+dsw_port_forward_emigrated_event(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ struct rte_event *event)
+{
+ uint16_t i;
+
+ for (i = 0; i < source_port->emigration_targets_len; i++) {
+ struct dsw_queue_flow *qf =
+ &source_port->emigration_target_qfs[i];
+ uint8_t dest_port_id =
+ source_port->emigration_target_port_ids[i];
+ struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+
+ if (event->queue_id == qf->queue_id &&
+ dsw_flow_id_hash(event->flow_id) == qf->flow_hash) {
+ /* No need to care about bursting forwarded
+ * events (to the destination port's in_ring),
+ * since migration doesn't happen very often,
+ * and also the majority of the dequeued
+ * events will likely *not* be forwarded.
+ */
+ while (rte_event_ring_enqueue_burst(dest_port->in_ring,
+ event, 1,
+ NULL) != 1)
+ rte_pause();
+ return;
+ }
}
+
+ /* Event did not belong to the emigrated flows */
+ dsw_port_buffer_in_buffer(source_port, event);
}
static void
@@ -1114,6 +1175,9 @@ dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
{
uint8_t i;
+ /* There may be events lingering in the output buffer from
+ * before the pause took effect.
+ */
dsw_port_flush_out_buffers(dsw, source_port);
for (i = 0; i < source_port->emigration_targets_len; i++) {
@@ -1137,12 +1201,21 @@ dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
dsw_port_flush_no_longer_paused_events(dsw, source_port);
+ /* Processing migrating flows during migration may have
+ * produced events destined for paused flows, including the
+ * flows being migrated. Flushing the output buffers before
+ * unpausing the flows on other ports ensures that such events
+ * are seen *before* any events produced by processing the
+ * migrating flows on the new port.
+ */
+ dsw_port_flush_out_buffers(dsw, source_port);
+
/* Flow table update and migration destination port's enqueues
* must be seen before the control message.
*/
rte_smp_wmb();
- dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
+ dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUSE_REQ,
source_port->emigration_target_qfs,
source_port->emigration_targets_len);
source_port->cfm_cnt = 0;
@@ -1154,7 +1227,7 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
{
port->cfm_cnt++;
- if (port->cfm_cnt == (dsw->num_ports-1)) {
+ if (port->cfm_cnt == (dsw->num_ports - 1)) {
switch (port->migration_state) {
case DSW_MIGRATION_STATE_PAUSING:
dsw_port_move_emigrating_flows(dsw, port);
@@ -1164,7 +1237,7 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
RTE_SCHED_TYPE_ATOMIC);
break;
default:
- RTE_ASSERT(0);
+ RTE_VERIFY(0);
break;
}
}
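
The confirmation counting above gates both phases of the migration handshake. As a sketch, one emigration round on a device with N ports (protocol as implemented in this file):

/*
 *  source port                          each of the other N - 1 ports
 *  -----------                          -----------------------------
 *  broadcast DSW_CTL_PAUSE_REQ    --->  pause the listed queue flows,
 *                                       reply with DSW_CTL_CFM
 *  after N - 1 CFMs: move flows,
 *  broadcast DSW_CTL_UNPAUSE_REQ  --->  unpause the listed queue
 *                                       flows, reply with DSW_CTL_CFM
 *  after N - 1 CFMs: unpause local
 *  flows; migration complete
 */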
@@ -1177,12 +1250,12 @@ dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
if (dsw_port_ctl_dequeue(port, &msg) == 0) {
switch (msg.type) {
- case DSW_CTL_PAUS_REQ:
+ case DSW_CTL_PAUSE_REQ:
dsw_port_handle_pause_flows(dsw, port,
msg.originating_port_id,
msg.qfs, msg.qfs_len);
break;
- case DSW_CTL_UNPAUS_REQ:
+ case DSW_CTL_UNPAUSE_REQ:
dsw_port_handle_unpause_flows(dsw, port,
msg.originating_port_id,
msg.qfs, msg.qfs_len);
@@ -1203,19 +1276,18 @@ dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
static void
dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
{
- /* For simplicity (in the migration logic), avoid all
- * background processing in case event processing is in
- * progress.
- */
- if (port->pending_releases > 0)
- return;
-
/* Polling the control ring is relatively inexpensive, and
* polling it often helps bringing down migration latency, so
* do this for every iteration.
*/
dsw_port_ctl_process(dsw, port);
+ /* Always check if a migration is waiting for pending releases
+ * to arrive, to minimize the amount of time dequeuing events
+ * from the port is disabled.
+ */
+ dsw_port_try_finish_pending(dsw, port);
+
/* To avoid considering migration and flushing output buffers
* on every dequeue/enqueue call, the scheduler only performs
* such 'background' tasks every nth
@@ -1260,8 +1332,8 @@ static __rte_always_inline uint16_t
dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
const struct rte_event events[],
uint16_t events_len, bool op_types_known,
- uint16_t num_new, uint16_t num_release,
- uint16_t num_non_release)
+ uint16_t num_new, uint16_t num_forward,
+ uint16_t num_release)
{
struct dsw_evdev *dsw = source_port->dsw;
bool enough_credits;
@@ -1295,14 +1367,14 @@ dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
if (!op_types_known)
for (i = 0; i < events_len; i++) {
switch (events[i].op) {
- case RTE_EVENT_OP_RELEASE:
- num_release++;
- break;
case RTE_EVENT_OP_NEW:
num_new++;
- /* Falls through. */
- default:
- num_non_release++;
+ break;
+ case RTE_EVENT_OP_FORWARD:
+ num_forward++;
+ break;
+ case RTE_EVENT_OP_RELEASE:
+ num_release++;
break;
}
}
@@ -1318,15 +1390,20 @@ dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
source_port->new_event_threshold))
return 0;
- enough_credits = dsw_port_acquire_credits(dsw, source_port,
- num_non_release);
+ enough_credits = dsw_port_acquire_credits(dsw, source_port, num_new);
if (unlikely(!enough_credits))
return 0;
- source_port->pending_releases -= num_release;
+ dsw_port_return_credits(dsw, source_port, num_release);
+
+ /* This may seem harsh, but it's important for an application
+ * to get early feedback for cases where it fails to stick to
+ * the API contract.
+ */
+ RTE_VERIFY(num_forward + num_release <= source_port->pending_releases);
+ source_port->pending_releases -= (num_forward + num_release);
- dsw_port_enqueue_stats(source_port, num_new,
- num_non_release-num_new, num_release);
+ dsw_port_enqueue_stats(source_port, num_new, num_forward, num_release);
for (i = 0; i < events_len; i++) {
const struct rte_event *event = &events[i];
@@ -1338,9 +1415,9 @@ dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
}
DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
- "accepted.\n", num_non_release);
+ "accepted.\n", num_new + num_forward);
- return (num_non_release + num_release);
+ return (num_new + num_forward + num_release);
}
uint16_t
@@ -1367,7 +1444,7 @@ dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
return dsw_event_enqueue_burst_generic(source_port, events,
events_len, true, events_len,
- 0, events_len);
+ 0, 0);
}
uint16_t
@@ -1380,8 +1457,8 @@ dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
events_len = source_port->enqueue_depth;
return dsw_event_enqueue_burst_generic(source_port, events,
- events_len, true, 0, 0,
- events_len);
+ events_len, true, 0,
+ events_len, 0);
}
uint16_t
@@ -1435,8 +1512,17 @@ static uint16_t
dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
uint16_t num)
{
- if (unlikely(port->in_buffer_len > 0)) {
- uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
+ enum dsw_migration_state state = port->migration_state;
+ uint16_t dequeued;
+
+ if (unlikely(state == DSW_MIGRATION_STATE_FINISH_PENDING))
+ /* Do not produce new items of work - only finish
+ * outstanding (unreleased) events, to allow the
+ * migration procedure to continue.
+ */
+ dequeued = 0;
+ else if (unlikely(port->in_buffer_len > 0)) {
+ dequeued = RTE_MIN(num, port->in_buffer_len);
rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
dequeued * sizeof(struct rte_event));
@@ -1446,43 +1532,24 @@ dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
if (port->in_buffer_len == 0)
port->in_buffer_start = 0;
-
- return dequeued;
+ } else {
+ dequeued = rte_event_ring_dequeue_burst(port->in_ring,
+ events, num, NULL);
+
+ /* Stash incoming events belonging to migrating flows,
+ * to avoid having to deal with forwarded events to
+ * flows which are also in the process of being
+ * migrated. A failure to do so leads to reordering,
+ * since paused events on the source port may be
+ * flushed after paused events on the migration
+ * destination port.
+ */
+ if (unlikely(state == DSW_MIGRATION_STATE_PAUSING))
+ dsw_port_stash_any_migrating_events(port, events,
+ &dequeued);
}
- return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
-}
-
-static void
-dsw_port_stash_migrating_events(struct dsw_port *port,
- struct rte_event *events, uint16_t *num)
-{
- uint16_t i;
-
- /* The assumption here - performance-wise - is that events
- * belonging to migrating flows are relatively rare.
- */
- for (i = 0; i < (*num); ) {
- struct rte_event *event = &events[i];
- uint16_t flow_hash;
-
- flow_hash = dsw_flow_id_hash(event->flow_id);
-
- if (unlikely(dsw_port_is_flow_migrating(port, event->queue_id,
- flow_hash))) {
- uint16_t left;
-
- dsw_port_stash_migrating_event(port, event);
-
- (*num)--;
- left = *num - i;
-
- if (left > 0)
- memmove(event, event + 1,
- left * sizeof(struct rte_event));
- } else
- i++;
- }
+ return dequeued;
}
uint16_t
@@ -1493,7 +1560,12 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
struct dsw_evdev *dsw = source_port->dsw;
uint16_t dequeued;
- source_port->pending_releases = 0;
+ if (source_port->implicit_release) {
+ dsw_port_return_credits(dsw, port,
+ source_port->pending_releases);
+
+ source_port->pending_releases = 0;
+ }
dsw_port_bg_process(dsw, source_port);
@@ -1502,12 +1574,7 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
dequeued = dsw_port_dequeue_burst(source_port, events, num);
- if (unlikely(source_port->migration_state ==
- DSW_MIGRATION_STATE_PAUSING))
- dsw_port_stash_migrating_events(source_port, events,
- &dequeued);
-
- source_port->pending_releases = dequeued;
+ source_port->pending_releases += dequeued;
dsw_port_load_record(source_port, dequeued);
@@ -1517,8 +1584,6 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
dequeued);
- dsw_port_return_credits(dsw, source_port, dequeued);
-
/* One potential optimization one might think of is to
* add a migration state (prior to 'pausing'), and
* only record seen events when the port is in this
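
To summarize the reworked accounting across the enqueue and dequeue paths above (a condensed sketch of the logic, not the driver code verbatim):

/*
 * dequeue: if implicit_release, credits for any still-unreleased
 *          events are returned and pending_releases is zeroed;
 *          otherwise both are carried over. pending_releases is then
 *          incremented by the number of events dequeued.
 *
 * enqueue: credits are acquired for RTE_EVENT_OP_NEW events only, and
 *          returned for RTE_EVENT_OP_RELEASE events. pending_releases
 *          is decremented by the number of FORWARD and RELEASE events,
 *          with RTE_VERIFY() guarding against API-contract violations.
 */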