 doc/guides/rel_notes/release_24_07.rst |   7
 drivers/event/dsw/dsw_evdev.c          |   8
 drivers/event/dsw/dsw_evdev.h          |   7
 drivers/event/dsw/dsw_event.c          | 405
 4 files changed, 254 insertions(+), 173 deletions(-)
diff --git a/doc/guides/rel_notes/release_24_07.rst b/doc/guides/rel_notes/release_24_07.rst
index d67fbd2371..554965b401 100644
--- a/doc/guides/rel_notes/release_24_07.rst
+++ b/doc/guides/rel_notes/release_24_07.rst
@@ -99,6 +99,13 @@ New Features
 
   * Added SSE/NEON vector datapath.
 
+* **Updated the DSW event device.**
+
+  * Added support for ``RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE``,
+    allowing applications to take on new tasks without having completed
+    (released) the previous event batch. This in turn facilitates DSW
+    use alongside high-latency look-aside hardware accelerators.
+
 
 Removed Items
 -------------
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index ab0420b549..0dea1091e3 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -23,15 +23,20 @@ dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
     struct rte_event_ring *in_ring;
     struct rte_ring *ctl_in_ring;
     char ring_name[RTE_RING_NAMESIZE];
+    bool implicit_release;
 
     port = &dsw->ports[port_id];
 
+    implicit_release =
+        !(conf->event_port_cfg & RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);
+
     *port = (struct dsw_port) {
         .id = port_id,
         .dsw = dsw,
         .dequeue_depth = conf->dequeue_depth,
         .enqueue_depth = conf->enqueue_depth,
-        .new_event_threshold = conf->new_event_threshold
+        .new_event_threshold = conf->new_event_threshold,
+        .implicit_release = implicit_release
     };
 
     snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id,
@@ -222,6 +227,7 @@ dsw_info_get(struct rte_eventdev *dev __rte_unused,
         RTE_EVENT_DEV_CAP_ATOMIC |
         RTE_EVENT_DEV_CAP_PARALLEL |
         RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED|
+        RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE|
         RTE_EVENT_DEV_CAP_NONSEQ_MODE|
         RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT|
         RTE_EVENT_DEV_CAP_CARRY_FLOW_ID
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index 2018306265..c9bf4f8b6b 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -127,6 +127,7 @@ struct dsw_queue_flow {
 
 enum dsw_migration_state {
     DSW_MIGRATION_STATE_IDLE,
+    DSW_MIGRATION_STATE_FINISH_PENDING,
     DSW_MIGRATION_STATE_PAUSING,
     DSW_MIGRATION_STATE_UNPAUSING
 };
@@ -148,6 +149,8 @@ struct __rte_cache_aligned dsw_port {
 
     int32_t new_event_threshold;
 
+    bool implicit_release;
+
     uint16_t pending_releases;
 
     uint16_t next_parallel_flow_id;
@@ -255,8 +258,8 @@ struct dsw_evdev {
     alignas(RTE_CACHE_LINE_SIZE) RTE_ATOMIC(int32_t) credits_on_loan;
 };
 
-#define DSW_CTL_PAUS_REQ (0)
-#define DSW_CTL_UNPAUS_REQ (1)
+#define DSW_CTL_PAUSE_REQ (0)
+#define DSW_CTL_UNPAUSE_REQ (1)
 #define DSW_CTL_CFM (2)
 
 struct __rte_aligned(4) dsw_ctl_msg {
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index ca2b8e1032..33f741990f 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -275,7 +275,7 @@ dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
 
 static void
 dsw_port_remove_paused_flow(struct dsw_port *port,
-                            struct dsw_queue_flow *target_qf)
+                            const struct dsw_queue_flow *target_qf)
 {
     uint16_t i;
 
@@ -302,6 +302,7 @@ dsw_port_remove_paused_flow(struct dsw_port *port,
     DSW_LOG_DP_PORT(ERR, port->id,
                     "Failed to unpause queue_id %d flow_hash %d.\n",
                     target_qf->queue_id, target_qf->flow_hash);
+    RTE_VERIFY(0);
 }
 
 static void
@@ -602,7 +603,7 @@ dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
         return;
 
     /* The rings are dimensioned to fit all in-flight events (even
-     * on a single ring), so looping will work.
+     * on a single ring).
      */
     rte_event_ring_enqueue_bulk(dest_port->in_ring, buffer,
                                 *buffer_len, NULL);
@@ -791,6 +792,7 @@ dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
 
     if (port->emigration_targets_len == 0) {
         port->migration_state = DSW_MIGRATION_STATE_IDLE;
+        port->emigration_targets_len = 0;
         port->seen_events_len = 0;
     }
 }
@@ -844,27 +846,35 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
 
     DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
 
+    /* For simplicity, postpone migration if there are still
+     * events to consume in the in_buffer (from the last
+     * emigration).
+     */
+    if (source_port->in_buffer_len > 0) {
+        DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
+                        "events in the input buffer.\n");
+        return;
+    }
+
+    if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
+        DSW_LOG_DP_PORT(DEBUG, source_port->id,
+                        "Emigration already in progress.\n");
+        return;
+    }
+
     if (seen_events_len < DSW_MAX_EVENTS_RECORDED) {
         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Not enough events "
                         "are recorded to allow for a migration.\n");
         return;
     }
 
-    /* A flow migration cannot be initiated if there are paused
-     * events, since some/all of those events may be have been
-     * produced as a result of processing the flow(s) selected for
-     * migration. Moving such a flow would potentially introduced
-     * reordering, since processing the migrated flow on the
-     * receiving flow may commence before the to-be-enqueued-to
-     * flows are unpaused, leading to paused events on the second
-     * port as well, destined for the same paused flow(s). When
-     * those flows are unpaused, the resulting events are
-     * delivered the owning port in an undefined order.
+    /* Postpone considering migration if paused events exist, since
+     * such events may prevent the migration procedure from completing,
+     * leading to wasted CPU cycles (e.g., sorting queue flows).
      */
     if (source_port->paused_events_len > 0) {
-        DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are "
-                        "events in the paus buffer.\n");
+        DSW_LOG_DP_PORT(DEBUG, source_port->id, "Paused events on "
+                        "port. Postponing any migrations.\n");
         return;
     }
@@ -874,23 +884,7 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
      */
     source_port->next_emigration = now +
         source_port->migration_interval / 2 +
-        rte_rand() % source_port->migration_interval;
-
-    if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
-        DSW_LOG_DP_PORT(DEBUG, source_port->id,
-                        "Emigration already in progress.\n");
-        return;
-    }
-
-    /* For simplicity, avoid migration in the unlikely case there
-     * is still events to consume in the in_buffer (from the last
-     * emigration).
-     */
-    if (source_port->in_buffer_len > 0) {
-        DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
-                        "events in the input buffer.\n");
-        return;
-    }
+        rte_rand_max(source_port->migration_interval);
 
     source_port_load =
         rte_atomic_load_explicit(&source_port->load,
@@ -936,7 +930,7 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
     if (source_port->emigration_targets_len == 0)
         return;
 
-    source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
+    source_port->migration_state = DSW_MIGRATION_STATE_FINISH_PENDING;
     source_port->emigration_start = rte_get_timer_cycles();
 
     /* No need to go through the whole pause procedure for
@@ -944,24 +938,73 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
      * be maintained.
      */
     dsw_port_move_parallel_flows(dsw, source_port);
+}
 
-    /* All flows were on PARALLEL queues.
-     */
-    if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
-        return;
+static void
+dsw_port_abort_migration(struct dsw_port *source_port)
+{
+    RTE_ASSERT(source_port->in_buffer_start == 0);
+    RTE_ASSERT(source_port->in_buffer_len == 0);
 
-    /* There might be 'loopback' events already scheduled in the
-     * output buffers.
+    /* Putting the stashed events in the in_buffer makes sure they
+     * are processed before any events on the in_ring, to avoid
+     * reordering.
      */
-    dsw_port_flush_out_buffers(dsw, source_port);
+    rte_memcpy(source_port->in_buffer, source_port->emigrating_events,
+               source_port->emigrating_events_len *
               sizeof(struct rte_event));
+    source_port->in_buffer_len = source_port->emigrating_events_len;
+    source_port->emigrating_events_len = 0;
+
+    source_port->emigration_targets_len = 0;
+
+    source_port->migration_state = DSW_MIGRATION_STATE_IDLE;
+}
+
+static void
+dsw_port_continue_emigration(struct dsw_evdev *dsw,
+                             struct dsw_port *source_port)
+{
+    /* A flow migration cannot be completed if there are paused
+     * events, since some or all of those events may have been
+     * produced as a result of processing the flow(s) selected for
+     * migration. Moving such a flow would potentially introduce
+     * reordering, since processing the migrated flow on the
+     * receiving port may commence before the to-be-enqueued-to
+     * flows are unpaused, leading to paused events on the second
+     * port as well, destined for the same paused flow(s). When
+     * those flows are unpaused, the resulting events are
+     * delivered to the owning port in an undefined order.
+     *
+     * Waiting for the events to be unpaused could lead to a
+     * deadlock, where two ports are both waiting for the other to
+     * unpause.
+     */
+    if (source_port->paused_events_len > 0) {
+        DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are events in "
+                        "the pause buffer. Aborting migration.\n");
+        dsw_port_abort_migration(source_port);
+        return;
+    }
 
     dsw_port_add_paused_flows(source_port,
                               source_port->emigration_target_qfs,
                               source_port->emigration_targets_len);
 
-    dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
+    dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUSE_REQ,
                            source_port->emigration_target_qfs,
                            source_port->emigration_targets_len);
     source_port->cfm_cnt = 0;
+
+    source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
+}
+
+static void
+dsw_port_try_finish_pending(struct dsw_evdev *dsw, struct dsw_port *source_port)
+{
+    if (unlikely(source_port->migration_state ==
+                 DSW_MIGRATION_STATE_FINISH_PENDING &&
+                 source_port->pending_releases == 0))
+        dsw_port_continue_emigration(dsw, source_port);
 }
 
 static void
@@ -982,8 +1025,6 @@ dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
 
     dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
 
-    rte_smp_rmb();
-
     dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
 
     for (i = 0; i < qfs_len; i++) {
@@ -1008,45 +1049,40 @@ dsw_port_buffer_in_buffer(struct dsw_port *port,
 }
 
 static void
-dsw_port_forward_emigrated_event(struct dsw_evdev *dsw,
-                                 struct dsw_port *source_port,
-                                 struct rte_event *event)
+dsw_port_stash_migrating_event(struct dsw_port *port,
+                               const struct rte_event *event)
+{
+    port->emigrating_events[port->emigrating_events_len] = *event;
+    port->emigrating_events_len++;
+}
+
+static void
+dsw_port_stash_any_migrating_events(struct dsw_port *port,
+                                    struct rte_event *events,
+                                    uint16_t *num)
 {
     uint16_t i;
+    uint16_t offset = 0;
 
-    for (i = 0; i < source_port->emigration_targets_len; i++) {
-        struct dsw_queue_flow *qf =
-            &source_port->emigration_target_qfs[i];
-        uint8_t dest_port_id =
-            source_port->emigration_target_port_ids[i];
-        struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+    for (i = 0; i < *num; i++) {
+        uint16_t flow_hash;
+        struct rte_event *in_event = &events[i];
 
-        if (event->queue_id == qf->queue_id &&
-            dsw_flow_id_hash(event->flow_id) == qf->flow_hash) {
-            /* No need to care about bursting forwarded
-             * events (to the destination port's in_ring),
-             * since migration doesn't happen very often,
-             * and also the majority of the dequeued
-             * events will likely *not* be forwarded.
-             */
-            while (rte_event_ring_enqueue_burst(dest_port->in_ring,
-                                                event, 1,
-                                                NULL) != 1)
-                rte_pause();
-            return;
+        flow_hash = dsw_flow_id_hash(in_event->flow_id);
+
+        if (unlikely(dsw_port_is_flow_migrating(port,
+                                                in_event->queue_id,
+                                                flow_hash))) {
+            dsw_port_stash_migrating_event(port, in_event);
+            offset++;
+        } else if (offset > 0) {
+            struct rte_event *out_event = &events[i - offset];
+            rte_memcpy(out_event, in_event,
+                       sizeof(struct rte_event));
         }
     }
 
-    /* Event did not belong to the emigrated flows */
-    dsw_port_buffer_in_buffer(source_port, event);
-}
-
-static void
-dsw_port_stash_migrating_event(struct dsw_port *port,
-                               const struct rte_event *event)
-{
-    port->emigrating_events[port->emigrating_events_len] = *event;
-    port->emigrating_events_len++;
+    *num -= offset;
 }
 
 #define DRAIN_DEQUEUE_BURST_SIZE (32)
 
@@ -1054,28 +1090,21 @@ dsw_port_stash_migrating_event(struct dsw_port *port,
 static void
 dsw_port_drain_in_ring(struct dsw_port *source_port)
 {
-    uint16_t num_events;
-    uint16_t dequeued;
-
-    /* Control ring message should been seen before the ring count
-     * is read on the port's in_ring.
-     */
-    rte_smp_rmb();
-
-    num_events = rte_event_ring_count(source_port->in_ring);
-
-    for (dequeued = 0; dequeued < num_events; ) {
-        uint16_t burst_size = RTE_MIN(DRAIN_DEQUEUE_BURST_SIZE,
-                                      num_events - dequeued);
-        struct rte_event events[burst_size];
-        uint16_t len;
+    for (;;) {
+        struct rte_event events[DRAIN_DEQUEUE_BURST_SIZE];
+        uint16_t n;
         uint16_t i;
+        uint16_t available;
 
-        len = rte_event_ring_dequeue_burst(source_port->in_ring,
-                                           events, burst_size,
-                                           NULL);
+        n = rte_event_ring_dequeue_burst(source_port->in_ring,
+                                         events,
+                                         DRAIN_DEQUEUE_BURST_SIZE,
+                                         &available);
 
-        for (i = 0; i < len; i++) {
+        if (n == 0 && available == 0)
+            break;
+
+        for (i = 0; i < n; i++) {
             struct rte_event *event = &events[i];
             uint16_t flow_hash;
 
@@ -1089,9 +1118,41 @@ dsw_port_drain_in_ring(struct dsw_port *source_port)
             else
                 dsw_port_buffer_in_buffer(source_port, event);
         }
+    }
+}
 
-        dequeued += len;
+static void
+dsw_port_forward_emigrated_event(struct dsw_evdev *dsw,
+                                 struct dsw_port *source_port,
+                                 struct rte_event *event)
+{
+    uint16_t i;
+
+    for (i = 0; i < source_port->emigration_targets_len; i++) {
+        struct dsw_queue_flow *qf =
+            &source_port->emigration_target_qfs[i];
+        uint8_t dest_port_id =
+            source_port->emigration_target_port_ids[i];
+        struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+
+        if (event->queue_id == qf->queue_id &&
+            dsw_flow_id_hash(event->flow_id) == qf->flow_hash) {
+            /* No need to care about bursting forwarded
+             * events (to the destination port's in_ring),
+             * since migration doesn't happen very often,
+             * and also the majority of the dequeued
+             * events will likely *not* be forwarded.
+             */
+            while (rte_event_ring_enqueue_burst(dest_port->in_ring,
+                                                event, 1,
+                                                NULL) != 1)
+                rte_pause();
+            return;
+        }
     }
+
+    /* Event did not belong to the emigrated flows */
+    dsw_port_buffer_in_buffer(source_port, event);
 }
 
 static void
@@ -1114,6 +1175,9 @@ dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
 {
     uint8_t i;
 
+    /* There may be events lingering in the output buffer from
+     * before the pause took effect.
+     */
     dsw_port_flush_out_buffers(dsw, source_port);
 
     for (i = 0; i < source_port->emigration_targets_len; i++) {
@@ -1137,12 +1201,21 @@ dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
 
     dsw_port_flush_no_longer_paused_events(dsw, source_port);
 
+    /* Processing migrating flows during migration may have
+     * produced events to paused flows, including the flows which
+     * were being migrated. Flushing the output buffers before
+     * unpausing the flows on other ports ensures that such events
+     * are seen *before* any events produced by processing the
+     * migrating flows on the new port.
+     */
+    dsw_port_flush_out_buffers(dsw, source_port);
+
     /* Flow table update and migration destination port's enqueues
      * must be seen before the control message.
      */
     rte_smp_wmb();
 
-    dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
+    dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUSE_REQ,
                            source_port->emigration_target_qfs,
                            source_port->emigration_targets_len);
     source_port->cfm_cnt = 0;
@@ -1154,7 +1227,7 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
 {
     port->cfm_cnt++;
 
-    if (port->cfm_cnt == (dsw->num_ports-1)) {
+    if (port->cfm_cnt == (dsw->num_ports - 1)) {
         switch (port->migration_state) {
         case DSW_MIGRATION_STATE_PAUSING:
             dsw_port_move_emigrating_flows(dsw, port);
@@ -1164,7 +1237,7 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
                                     RTE_SCHED_TYPE_ATOMIC);
             break;
         default:
-            RTE_ASSERT(0);
+            RTE_VERIFY(0);
             break;
         }
     }
@@ -1177,12 +1250,12 @@ dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
 
     if (dsw_port_ctl_dequeue(port, &msg) == 0) {
         switch (msg.type) {
-        case DSW_CTL_PAUS_REQ:
+        case DSW_CTL_PAUSE_REQ:
             dsw_port_handle_pause_flows(dsw, port,
                                         msg.originating_port_id,
                                         msg.qfs, msg.qfs_len);
             break;
-        case DSW_CTL_UNPAUS_REQ:
+        case DSW_CTL_UNPAUSE_REQ:
             dsw_port_handle_unpause_flows(dsw, port,
                                           msg.originating_port_id,
                                           msg.qfs, msg.qfs_len);
@@ -1203,19 +1276,18 @@ dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
 static void
 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
 {
-    /* For simplicity (in the migration logic), avoid all
-     * background processing in case event processing is in
-     * progress.
-     */
-    if (port->pending_releases > 0)
-        return;
-
     /* Polling the control ring is relatively inexpensive, and
      * polling it often helps bringing down migration latency, so
      * do this for every iteration.
      */
     dsw_port_ctl_process(dsw, port);
 
+    /* Always check if a migration is waiting for pending releases
+     * to arrive, to minimize the amount of time dequeuing events
+     * from the port is disabled.
+     */
+    dsw_port_try_finish_pending(dsw, port);
+
     /* To avoid considering migration and flushing output buffers
      * on every dequeue/enqueue call, the scheduler only performs
      * such 'background' tasks every nth
@@ -1260,8 +1332,8 @@ static __rte_always_inline uint16_t
 dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
                                 const struct rte_event events[],
                                 uint16_t events_len, bool op_types_known,
-                                uint16_t num_new, uint16_t num_release,
-                                uint16_t num_non_release)
+                                uint16_t num_new, uint16_t num_forward,
+                                uint16_t num_release)
 {
     struct dsw_evdev *dsw = source_port->dsw;
     bool enough_credits;
@@ -1295,14 +1367,14 @@ dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
     if (!op_types_known)
         for (i = 0; i < events_len; i++) {
             switch (events[i].op) {
-            case RTE_EVENT_OP_RELEASE:
-                num_release++;
-                break;
             case RTE_EVENT_OP_NEW:
                 num_new++;
-                /* Falls through. */
-            default:
-                num_non_release++;
+                break;
+            case RTE_EVENT_OP_FORWARD:
+                num_forward++;
+                break;
+            case RTE_EVENT_OP_RELEASE:
+                num_release++;
                 break;
             }
         }
@@ -1318,15 +1390,20 @@ dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
                  source_port->new_event_threshold))
         return 0;
 
-    enough_credits = dsw_port_acquire_credits(dsw, source_port,
-                                              num_non_release);
+    enough_credits = dsw_port_acquire_credits(dsw, source_port, num_new);
     if (unlikely(!enough_credits))
         return 0;
 
-    source_port->pending_releases -= num_release;
+    dsw_port_return_credits(dsw, source_port, num_release);
+
+    /* This may seem harsh, but it's important for an application
+     * to get early feedback for cases where it fails to stick to
+     * the API contract.
+     */
+    RTE_VERIFY(num_forward + num_release <= source_port->pending_releases);
+    source_port->pending_releases -= (num_forward + num_release);
 
-    dsw_port_enqueue_stats(source_port, num_new,
-                           num_non_release-num_new, num_release);
+    dsw_port_enqueue_stats(source_port, num_new, num_forward, num_release);
 
     for (i = 0; i < events_len; i++) {
         const struct rte_event *event = &events[i];
@@ -1338,9 +1415,9 @@ dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
     }
 
     DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
-                    "accepted.\n", num_non_release);
+                    "accepted.\n", num_new + num_forward);
 
-    return (num_non_release + num_release);
+    return (num_new + num_forward + num_release);
 }
 
 uint16_t
@@ -1367,7 +1444,7 @@ dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
 
     return dsw_event_enqueue_burst_generic(source_port, events,
                                            events_len, true, events_len,
-                                           0, events_len);
+                                           0, 0);
 }
 
 uint16_t
@@ -1380,8 +1457,8 @@ dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
         events_len = source_port->enqueue_depth;
 
     return dsw_event_enqueue_burst_generic(source_port, events,
-                                           events_len, true, 0, 0,
-                                           events_len);
+                                           events_len, true, 0,
+                                           events_len, 0);
 }
 
 uint16_t
@@ -1435,8 +1512,17 @@ static uint16_t
 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
                        uint16_t num)
 {
-    if (unlikely(port->in_buffer_len > 0)) {
-        uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
+    enum dsw_migration_state state = port->migration_state;
+    uint16_t dequeued;
+
+    if (unlikely(state == DSW_MIGRATION_STATE_FINISH_PENDING))
+        /* Do not produce new items of work - only finish
+         * outstanding (unreleased) events, to allow the
+         * migration procedure to continue.
+         */
+        dequeued = 0;
+    else if (unlikely(port->in_buffer_len > 0)) {
+        dequeued = RTE_MIN(num, port->in_buffer_len);
 
         rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
                    dequeued * sizeof(struct rte_event));
@@ -1446,43 +1532,24 @@ dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
 
         if (port->in_buffer_len == 0)
             port->in_buffer_start = 0;
-
-        return dequeued;
+    } else {
+        dequeued = rte_event_ring_dequeue_burst(port->in_ring,
+                                                events, num, NULL);
+
+        /* Stash incoming events belonging to migrating flows,
+         * to avoid having to deal with forwarded events to
+         * flows which are also in the process of being
+         * migrated. A failure to do so leads to reordering,
+         * since paused events on the source port may be
+         * flushed after paused events on the migration
+         * destination port.
+         */
+        if (unlikely(state == DSW_MIGRATION_STATE_PAUSING))
+            dsw_port_stash_any_migrating_events(port, events,
+                                                &dequeued);
     }
 
-    return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
-}
-
-static void
-dsw_port_stash_migrating_events(struct dsw_port *port,
-                                struct rte_event *events, uint16_t *num)
-{
-    uint16_t i;
-
-    /* The assumption here - performance-wise - is that events
-     * belonging to migrating flows are relatively rare.
-     */
-    for (i = 0; i < (*num); ) {
-        struct rte_event *event = &events[i];
-        uint16_t flow_hash;
-
-        flow_hash = dsw_flow_id_hash(event->flow_id);
-
-        if (unlikely(dsw_port_is_flow_migrating(port, event->queue_id,
-                                                flow_hash))) {
-            uint16_t left;
-
-            dsw_port_stash_migrating_event(port, event);
-
-            (*num)--;
-            left = *num - i;
-
-            if (left > 0)
-                memmove(event, event + 1,
-                        left * sizeof(struct rte_event));
-        } else
-            i++;
-    }
+    return dequeued;
 }
 
 uint16_t
@@ -1493,7 +1560,12 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
     struct dsw_evdev *dsw = source_port->dsw;
     uint16_t dequeued;
 
-    source_port->pending_releases = 0;
+    if (source_port->implicit_release) {
+        dsw_port_return_credits(dsw, port,
+                                source_port->pending_releases);
+
+        source_port->pending_releases = 0;
+    }
 
     dsw_port_bg_process(dsw, source_port);
 
@@ -1502,12 +1574,7 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
 
     dequeued = dsw_port_dequeue_burst(source_port, events, num);
 
-    if (unlikely(source_port->migration_state ==
-                 DSW_MIGRATION_STATE_PAUSING))
-        dsw_port_stash_migrating_events(source_port, events,
-                                        &dequeued);
-
-    source_port->pending_releases = dequeued;
+    source_port->pending_releases += dequeued;
 
     dsw_port_load_record(source_port, dequeued);
 
@@ -1517,8 +1584,6 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
     DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
                     dequeued);
 
-    dsw_port_return_credits(dsw, source_port, dequeued);
-
     /* One potential optimization one might think of is to
      * add a migration state (prior to 'pausing'), and
      * only record seen events when the port is in this
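
On the application side, the port-setup hunk above derives implicit_release from the RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL flag. A minimal sketch of how an application might opt in to explicit-release mode; the device and port IDs are examples and error handling is reduced to rte_panic():

#include <rte_debug.h>
#include <rte_eventdev.h>

static void
setup_explicit_release_port(uint8_t dev_id, uint8_t port_id)
{
    struct rte_event_dev_info info;
    struct rte_event_port_conf conf;

    rte_event_dev_info_get(dev_id, &info);

    /* Only disable implicit release on devices which, like DSW after
     * this patch, advertise the capability.
     */
    if (!(info.event_dev_cap & RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE))
        rte_panic("Device lacks explicit release support\n");

    if (rte_event_port_default_conf_get(dev_id, port_id, &conf) != 0)
        rte_panic("Unable to retrieve default port configuration\n");

    conf.event_port_cfg |= RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL;

    if (rte_event_port_setup(dev_id, port_id, &conf) != 0)
        rte_panic("Port setup failed\n");
}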
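With implicit release disabled, every dequeued event must eventually come back to the port as either an RTE_EVENT_OP_FORWARD or an RTE_EVENT_OP_RELEASE operation; the RTE_VERIFY() added in dsw_event_enqueue_burst_generic() rejects enqueues that would exceed the port's pending_releases count. A hedged sketch of a worker loop honouring that contract, where process_event() is a hypothetical application hook:

#include <stdbool.h>
#include <rte_common.h>
#include <rte_eventdev.h>

/* Hypothetical application hook: returns true if the event should be
 * forwarded to its next queue, false if its flow context can simply be
 * released. Queue/flow updates for forwarded events are omitted.
 */
extern bool process_event(struct rte_event *ev);

static void
worker_loop(uint8_t dev_id, uint8_t port_id)
{
    struct rte_event burst[32];

    for (;;) {
        uint16_t n = rte_event_dequeue_burst(dev_id, port_id, burst,
                                             RTE_DIM(burst), 0);
        uint16_t i;
        uint16_t sent = 0;

        /* Each dequeued event adds to the port's pending releases and
         * must be returned as exactly one FORWARD or RELEASE operation.
         */
        for (i = 0; i < n; i++)
            burst[i].op = process_event(&burst[i]) ?
                RTE_EVENT_OP_FORWARD : RTE_EVENT_OP_RELEASE;

        while (sent < n)
            sent += rte_event_enqueue_burst(dev_id, port_id,
                                            burst + sent, n - sent);
    }
}

Note also the changed credit accounting in the same hunk: credits are now acquired only for OP_NEW events and handed back as OP_RELEASE events arrive. For example, a burst of 8 NEW, 20 FORWARD and 4 RELEASE events acquires 8 credits, returns 4, and reduces pending_releases by 24.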
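The new DSW_MIGRATION_STATE_FINISH_PENDING state exists because a migration may now be decided upon while the application still holds unreleased events; the port stops handing out new events in that state and only advances to PAUSING once pending_releases reaches zero (see dsw_port_try_finish_pending() and dsw_port_dequeue_burst() above). A condensed view of the state progression, simplified from the patch rather than copied from the driver:

#include <stdbool.h>
#include <stdint.h>

/* Condensed view of a DSW port's emigration states after this patch
 * (names shortened; not the driver's actual definitions).
 */
enum migration_state {
    STATE_IDLE,           /* no migration in progress */
    STATE_FINISH_PENDING, /* targets picked; waiting for the application
                           * to forward or release all outstanding events */
    STATE_PAUSING,        /* pause requests broadcast; events of migrating
                           * flows are stashed on dequeue */
    STATE_UNPAUSING       /* flows moved; waiting for unpause confirmations */
};

/* Gate checked from the background-processing path: only start pausing
 * once the application holds no unreleased events.
 */
static inline bool
may_start_pausing(enum migration_state state, uint16_t pending_releases)
{
    return state == STATE_FINISH_PENDING && pending_releases == 0;
}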
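dsw_port_stash_any_migrating_events() replaces the earlier per-match memmove with a single stable in-place compaction pass over the dequeued burst. The same pattern in isolation, as a generic sketch; compact_keep() and its callback are hypothetical, not driver code:

#include <stdbool.h>
#include <stdint.h>

/* Stable in-place partition: items rejected by keep() are appended to
 * 'stash', the remaining items are packed toward the front of 'items'
 * with their relative order preserved. One pass, no per-removal memmove.
 * Returns the number of items kept.
 */
static uint16_t
compact_keep(uint32_t *items, uint16_t num, bool (*keep)(uint32_t item),
             uint32_t *stash, uint16_t *stash_len)
{
    uint16_t i;
    uint16_t offset = 0;

    for (i = 0; i < num; i++) {
        if (!keep(items[i])) {
            stash[(*stash_len)++] = items[i];
            offset++;
        } else if (offset > 0) {
            items[i - offset] = items[i];
        }
    }

    return num - offset;
}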
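Finally, dsw_port_drain_in_ring() no longer samples rte_event_ring_count() once up front; it keeps dequeuing until a burst returns no events and the ring reports none available. A small stand-alone sketch of that drain pattern, with consume() as a hypothetical callback:

#include <rte_common.h>
#include <rte_event_ring.h>

static void
drain_event_ring(struct rte_event_ring *ring,
                 void (*consume)(const struct rte_event *ev))
{
    for (;;) {
        struct rte_event burst[32];
        uint16_t available;
        uint16_t n;
        uint16_t i;

        n = rte_event_ring_dequeue_burst(ring, burst, RTE_DIM(burst),
                                         &available);
        /* Stop only when this burst was empty and nothing is left. */
        if (n == 0 && available == 0)
            break;

        for (i = 0; i < n; i++)
            consume(&burst[i]);
    }
}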