From patchwork Thu Dec 15 18:55:25 2022 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Mahipal Challa X-Patchwork-Id: 120955 X-Patchwork-Delegate: gakhil@marvell.com Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id 68351A0543; Thu, 15 Dec 2022 19:55:35 +0100 (CET) Received: from mails.dpdk.org (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id 493EB40695; Thu, 15 Dec 2022 19:55:35 +0100 (CET) Received: from mx0b-0016f401.pphosted.com (mx0a-0016f401.pphosted.com [67.231.148.174]) by mails.dpdk.org (Postfix) with ESMTP id DEC9140685 for ; Thu, 15 Dec 2022 19:55:33 +0100 (CET) Received: from pps.filterd (m0045849.ppops.net [127.0.0.1]) by mx0a-0016f401.pphosted.com (8.17.1.19/8.17.1.19) with ESMTP id 2BFHluui004927 for ; Thu, 15 Dec 2022 10:55:33 -0800 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=marvell.com; h=from : to : cc : subject : date : message-id : mime-version : content-transfer-encoding : content-type; s=pfpt0220; bh=in2d7aK5Ehv5qPPz/eksCBlNoH9Zdv7bLe9oq/nbiq8=; b=HFgS4FMjVNTIN4ciWYwf5inzZ6r+Gzhbmvi/Gt6rGT/zXMZDinPkJ3FOhPyTjxKVNuMM VlOgcBiLDyOhyAmaUReohvD6bE9gucop/XKW1xPrIQUddt+3xAmwAc8vNY/ZzMdnaWGg Hk9bzmJS/n4AcesHwG/Lo2//r/9w/4T5ZCVme9huqClIbA5AsyKYyYAeYxgUD9pjotf9 KPKziwXyeDBtvEhATw0MdYdBjtmmnDs9HMM4wWLUJoKLUo6A2sf5LDdLhgdGrrfVdECo OCT022AUCNg7+iEsJBVjOq9hDKME6C6+zN6To04IYXjFFDv8CbGJzO+4/IXR6UDWgadn dQ== Received: from dc5-exch01.marvell.com ([199.233.59.181]) by mx0a-0016f401.pphosted.com (PPS) with ESMTPS id 3mfws5cwkb-1 (version=TLSv1.2 cipher=ECDHE-RSA-AES256-SHA384 bits=256 verify=NOT) for ; Thu, 15 Dec 2022 10:55:32 -0800 Received: from DC5-EXCH02.marvell.com (10.69.176.39) by DC5-EXCH01.marvell.com (10.69.176.38) with Microsoft SMTP Server (TLS) id 15.0.1497.42; Thu, 15 Dec 2022 10:55:31 -0800 Received: from maili.marvell.com (10.69.176.80) by DC5-EXCH02.marvell.com (10.69.176.39) with Microsoft SMTP Server id 15.0.1497.42 via Frontend Transport; Thu, 15 Dec 2022 10:55:31 -0800 Received: from mahipal-244.marvell.com (unknown [10.29.20.28]) by maili.marvell.com (Postfix) with ESMTP id 58CA25B6933; Thu, 15 Dec 2022 10:55:29 -0800 (PST) From: Mahipal Challa To: CC: , , , Subject: [dpdk-dev] [PATCH v1 1/1] compress/octeontx: async burst mode feature support Date: Fri, 16 Dec 2022 00:25:25 +0530 Message-ID: <20221215185525.2301440-1-mchalla@marvell.com> X-Mailer: git-send-email 2.25.1 MIME-Version: 1.0 X-Proofpoint-GUID: 8I6aQz3adApKnFKj6mMyLDc-f0MThQpN X-Proofpoint-ORIG-GUID: 8I6aQz3adApKnFKj6mMyLDc-f0MThQpN X-Proofpoint-Virus-Version: vendor=baseguard engine=ICAP:2.0.205,Aquarius:18.0.923,Hydra:6.0.545,FMLib:17.11.122.1 definitions=2022-12-15_11,2022-12-15_02,2022-06-22_01 X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Async burst mode feature support is added, improves the single thread compression/decompression throughput. Signed-off-by: Mahipal Challa --- drivers/compress/octeontx/include/zip_regs.h | 1 + drivers/compress/octeontx/otx_zip.h | 37 +-- drivers/compress/octeontx/otx_zip_pmd.c | 258 ++++++++++--------- 3 files changed, 160 insertions(+), 136 deletions(-) diff --git a/drivers/compress/octeontx/include/zip_regs.h b/drivers/compress/octeontx/include/zip_regs.h index a7fcccc055..a07483b045 100644 --- a/drivers/compress/octeontx/include/zip_regs.h +++ b/drivers/compress/octeontx/include/zip_regs.h @@ -687,6 +687,7 @@ union zip_zptr_s { #define ZIP_COMP_E_TIMEOUT (0xc) #define ZIP_COMP_E_INSTR_ERR (0xd) #define ZIP_COMP_E_HCTX_ERR (0xe) +#define ZIP_COMP_E_PTR_ERR (0xf) #define ZIP_COMP_E_STOP (3) /** diff --git a/drivers/compress/octeontx/otx_zip.h b/drivers/compress/octeontx/otx_zip.h index cdef8cc6cb..ee14deb03d 100644 --- a/drivers/compress/octeontx/otx_zip.h +++ b/drivers/compress/octeontx/otx_zip.h @@ -55,6 +55,10 @@ extern int octtx_zip_logtype_driver; ZIP_MAX_NCBP_SIZE)/* ~8072ull */ #define ZIP_BUF_SIZE 256 +#define ZIP_BURST_SIZE 64 + +#define ZIP_MAXSEG_SIZE 59460 +#define ZIP_EXTRABUF_SIZE 4096 #define ZIP_SGPTR_ALIGN 16 #define ZIP_CMDQ_ALIGN 128 @@ -67,7 +71,7 @@ extern int octtx_zip_logtype_driver; ((_align) * (((x) + (_align) - 1) / (_align))) /**< ZIP PMD device name */ -#define COMPRESSDEV_NAME_ZIP_PMD compress_octeonx +#define COMPRESSDEV_NAME_ZIP_PMD compress_octeontx #define ZIP_PMD_LOG(level, fmt, args...) \ rte_log(RTE_LOG_ ## level, \ @@ -95,18 +99,18 @@ struct zip_stream; struct zipvf_qp; /* Algorithm handler function prototype */ -typedef int (*comp_func_t)(struct rte_comp_op *op, - struct zipvf_qp *qp, struct zip_stream *zstrm); +typedef int (*comp_func_t)(struct rte_comp_op *op, struct zipvf_qp *qp, + struct zip_stream *zstrm, int num); /** * ZIP private stream structure */ struct zip_stream { - union zip_inst_s *inst; + union zip_inst_s *inst[ZIP_BURST_SIZE]; /* zip instruction pointer */ comp_func_t func; /* function to process comp operation */ - void *bufs[MAX_BUFS_PER_STREAM]; + void *bufs[MAX_BUFS_PER_STREAM * ZIP_BURST_SIZE]; } __rte_cache_aligned; @@ -162,11 +166,10 @@ struct zip_vf { static inline void -zipvf_prepare_in_buf(struct zip_stream *zstrm, struct rte_comp_op *op) +zipvf_prepare_in_buf(union zip_inst_s *inst, struct rte_comp_op *op) { uint32_t offset, inlen; struct rte_mbuf *m_src; - union zip_inst_s *inst = zstrm->inst; inlen = op->src.length; offset = op->src.offset; @@ -180,11 +183,10 @@ zipvf_prepare_in_buf(struct zip_stream *zstrm, struct rte_comp_op *op) } static inline void -zipvf_prepare_out_buf(struct zip_stream *zstrm, struct rte_comp_op *op) +zipvf_prepare_out_buf(union zip_inst_s *inst, struct rte_comp_op *op) { uint32_t offset; struct rte_mbuf *m_dst; - union zip_inst_s *inst = zstrm->inst; offset = op->dst.offset; m_dst = op->m_dst; @@ -195,14 +197,15 @@ zipvf_prepare_out_buf(struct zip_stream *zstrm, struct rte_comp_op *op) rte_pktmbuf_iova_offset(m_dst, offset); inst->s.totaloutputlength = rte_pktmbuf_pkt_len(m_dst) - op->dst.offset; + if (inst->s.totaloutputlength == ZIP_MAXSEG_SIZE) + inst->s.totaloutputlength += ZIP_EXTRABUF_SIZE; /* DSTOP */ + inst->s.out_ptr_ctl.s.length = inst->s.totaloutputlength; } static inline void -zipvf_prepare_cmd_stateless(struct rte_comp_op *op, struct zip_stream *zstrm) +zipvf_prepare_cmd_stateless(struct rte_comp_op *op, union zip_inst_s *inst) { - union zip_inst_s *inst = zstrm->inst; - /* set flush flag to always 1*/ inst->s.ef = 1; @@ -215,8 +218,8 @@ zipvf_prepare_cmd_stateless(struct rte_comp_op *op, struct zip_stream *zstrm) inst->s.adlercrc32 = op->input_chksum; /* Prepare gather buffers */ - zipvf_prepare_in_buf(zstrm, op); - zipvf_prepare_out_buf(zstrm, op); + zipvf_prepare_in_buf(inst, op); + zipvf_prepare_out_buf(inst, op); } #ifdef ZIP_DBG @@ -224,6 +227,7 @@ static inline void zip_dump_instruction(void *inst) { union zip_inst_s *cmd83 = (union zip_inst_s *)inst; + printf("####### START ########\n"); printf("doneint:%d totaloutputlength:%d\n", cmd83->s.doneint, cmd83->s.totaloutputlength); @@ -265,9 +269,8 @@ void zipvf_push_command(struct zipvf_qp *qp, union zip_inst_s *zcmd); int -zip_process_op(struct rte_comp_op *op, - struct zipvf_qp *qp, - struct zip_stream *zstrm); +zip_process_op(struct rte_comp_op *op, struct zipvf_qp *qp, + struct zip_stream *zstrm, int num); uint64_t zip_reg_read64(uint8_t *hw_addr, uint64_t offset); diff --git a/drivers/compress/octeontx/otx_zip_pmd.c b/drivers/compress/octeontx/otx_zip_pmd.c index dff188e223..2d856b19bb 100644 --- a/drivers/compress/octeontx/otx_zip_pmd.c +++ b/drivers/compress/octeontx/otx_zip_pmd.c @@ -32,10 +32,8 @@ static const struct rte_compressdev_capabilities * Reset session to default state for next set of stateless operation */ static inline void -reset_stream(struct zip_stream *z_stream) +reset_stream(union zip_inst_s *inst) { - union zip_inst_s *inst = (union zip_inst_s *)(z_stream->inst); - inst->s.bf = 1; inst->s.ef = 0; } @@ -43,12 +41,11 @@ reset_stream(struct zip_stream *z_stream) int zip_process_op(struct rte_comp_op *op, struct zipvf_qp *qp, - struct zip_stream *zstrm) + struct zip_stream *zstrm, int num) { - union zip_inst_s *inst = zstrm->inst; + union zip_inst_s *inst = zstrm->inst[num]; volatile union zip_zres_s *zresult = NULL; - if ((op->m_src->nb_segs > 1) || (op->m_dst->nb_segs > 1) || (op->src.offset > rte_pktmbuf_pkt_len(op->m_src)) || (op->dst.offset > rte_pktmbuf_pkt_len(op->m_dst))) { @@ -57,9 +54,9 @@ zip_process_op(struct rte_comp_op *op, return 0; } - zipvf_prepare_cmd_stateless(op, zstrm); + zipvf_prepare_cmd_stateless(op, inst); - zresult = (union zip_zres_s *)zstrm->bufs[RES_BUF]; + zresult = (union zip_zres_s *)zstrm->bufs[RES_BUF + num]; zresult->s.compcode = 0; #ifdef ZIP_DBG @@ -69,43 +66,6 @@ zip_process_op(struct rte_comp_op *op, /* Submit zip command */ zipvf_push_command(qp, (void *)inst); - /* Check and Process results in sync mode */ - do { - } while (!zresult->s.compcode); - - if (zresult->s.compcode == ZIP_COMP_E_SUCCESS) { - op->status = RTE_COMP_OP_STATUS_SUCCESS; - } else { - /* FATAL error cannot do anything */ - ZIP_PMD_ERR("operation failed with error code:%d\n", - zresult->s.compcode); - if (zresult->s.compcode == ZIP_COMP_E_DSTOP) - op->status = RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED; - else - op->status = RTE_COMP_OP_STATUS_ERROR; - } - -#ifdef ZIP_DBG - ZIP_PMD_INFO("written %d\n", zresult->s.totalbyteswritten); -#endif - - /* Update op stats */ - switch (op->status) { - case RTE_COMP_OP_STATUS_SUCCESS: - op->consumed = zresult->s.totalbytesread; - /* Fall-through */ - case RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED: - op->produced = zresult->s.totalbyteswritten; - break; - default: - ZIP_PMD_ERR("stats not updated for status:%d\n", - op->status); - break; - } - /* zstream is reset irrespective of result */ - reset_stream(zstrm); - - zresult->s.compcode = ZIP_COMP_E_NOTDONE; return 0; } @@ -115,86 +75,88 @@ zip_set_stream_parameters(struct rte_compressdev *dev, const struct rte_comp_xform *xform, struct zip_stream *z_stream) { - int ret; - union zip_inst_s *inst; struct zip_vf *vf = (struct zip_vf *)dev->data->dev_private; + union zip_inst_s *inst; void *res; + int ret, i; /* Allocate resources required by a stream */ ret = rte_mempool_get_bulk(vf->zip_mp, - z_stream->bufs, MAX_BUFS_PER_STREAM); + z_stream->bufs, (MAX_BUFS_PER_STREAM * ZIP_BURST_SIZE)); if (ret < 0) return -1; - /* get one command buffer from pool and set up */ - inst = (union zip_inst_s *)z_stream->bufs[CMD_BUF]; - res = z_stream->bufs[RES_BUF]; - - memset(inst->u, 0, sizeof(inst->u)); - - /* set bf for only first ops of stream */ - inst->s.bf = 1; - - if (xform->type == RTE_COMP_COMPRESS) { - inst->s.op = ZIP_OP_E_COMP; + for (i = 0; i < ZIP_BURST_SIZE; i++) { + /* get one command buffer from pool and set up */ + inst = (union zip_inst_s *)z_stream->bufs[(CMD_BUF * ZIP_BURST_SIZE) + i]; + res = z_stream->bufs[(RES_BUF * ZIP_BURST_SIZE) + i]; + memset(inst->u, 0, sizeof(inst->u)); + + /* set bf for only first ops of stream */ + inst->s.bf = 1; + + if (xform->type == RTE_COMP_COMPRESS) { + inst->s.op = ZIP_OP_E_COMP; + + switch (xform->compress.deflate.huffman) { + case RTE_COMP_HUFFMAN_DEFAULT: + inst->s.cc = ZIP_CC_DEFAULT; + break; + case RTE_COMP_HUFFMAN_FIXED: + inst->s.cc = ZIP_CC_FIXED_HUFF; + break; + case RTE_COMP_HUFFMAN_DYNAMIC: + inst->s.cc = ZIP_CC_DYN_HUFF; + break; + default: + ret = -1; + goto err; + } + + switch (xform->compress.level) { + case RTE_COMP_LEVEL_MIN: + inst->s.ss = ZIP_COMP_E_LEVEL_MIN; + break; + case RTE_COMP_LEVEL_MAX: + inst->s.ss = ZIP_COMP_E_LEVEL_MAX; + break; + case RTE_COMP_LEVEL_NONE: + ZIP_PMD_ERR("Compression level not supported"); + ret = -1; + goto err; + default: + /* for any value between min and max , choose + * PMD default. + */ + inst->s.ss = ZIP_COMP_E_LEVEL_MED; /** PMD default **/ + break; + } + } else if (xform->type == RTE_COMP_DECOMPRESS) { + inst->s.op = ZIP_OP_E_DECOMP; + /* from HRM, + * For DEFLATE decompression, [CC] must be 0x0. + * For decompression, [SS] must be 0x0 + */ + inst->s.cc = 0; + /* Speed bit should not be set for decompression */ + inst->s.ss = 0; + /* decompression context is supported only for STATEFUL + * operations. Currently we support STATELESS ONLY so + * skip setting of ctx pointer + */ - switch (xform->compress.deflate.huffman) { - case RTE_COMP_HUFFMAN_DEFAULT: - inst->s.cc = ZIP_CC_DEFAULT; - break; - case RTE_COMP_HUFFMAN_FIXED: - inst->s.cc = ZIP_CC_FIXED_HUFF; - break; - case RTE_COMP_HUFFMAN_DYNAMIC: - inst->s.cc = ZIP_CC_DYN_HUFF; - break; - default: + } else { + ZIP_PMD_ERR("\nxform type not supported"); ret = -1; goto err; } - switch (xform->compress.level) { - case RTE_COMP_LEVEL_MIN: - inst->s.ss = ZIP_COMP_E_LEVEL_MIN; - break; - case RTE_COMP_LEVEL_MAX: - inst->s.ss = ZIP_COMP_E_LEVEL_MAX; - break; - case RTE_COMP_LEVEL_NONE: - ZIP_PMD_ERR("Compression level not supported"); - ret = -1; - goto err; - default: - /* for any value between min and max , choose - * PMD default. - */ - inst->s.ss = ZIP_COMP_E_LEVEL_MED; /** PMD default **/ - break; - } - } else if (xform->type == RTE_COMP_DECOMPRESS) { - inst->s.op = ZIP_OP_E_DECOMP; - /* from HRM, - * For DEFLATE decompression, [CC] must be 0x0. - * For decompression, [SS] must be 0x0 - */ - inst->s.cc = 0; - /* Speed bit should not be set for decompression */ - inst->s.ss = 0; - /* decompression context is supported only for STATEFUL - * operations. Currently we support STATELESS ONLY so - * skip setting of ctx pointer - */ + inst->s.res_ptr_addr.s.addr = rte_mempool_virt2iova(res); + inst->s.res_ptr_ctl.s.length = 0; - } else { - ZIP_PMD_ERR("\nxform type not supported"); - ret = -1; - goto err; + z_stream->inst[i] = inst; } - inst->s.res_ptr_addr.s.addr = rte_mempool_virt2iova(res); - inst->s.res_ptr_ctl.s.length = 0; - - z_stream->inst = inst; z_stream->func = zip_process_op; return 0; @@ -202,7 +164,7 @@ zip_set_stream_parameters(struct rte_compressdev *dev, err: rte_mempool_put_bulk(vf->zip_mp, (void *)&(z_stream->bufs[0]), - MAX_BUFS_PER_STREAM); + (MAX_BUFS_PER_STREAM * ZIP_BURST_SIZE)); return ret; } @@ -235,7 +197,7 @@ zip_pmd_config(struct rte_compressdev *dev, /** TBD Should we use the per core object cache for stream resources */ zip_buf_mp = rte_mempool_create( res_pool, - nb_streams * MAX_BUFS_PER_STREAM, + (nb_streams * MAX_BUFS_PER_STREAM * ZIP_BURST_SIZE), ZIP_BUF_SIZE, 0, 0, @@ -453,6 +415,7 @@ zip_pmd_stream_create(struct rte_compressdev *dev, return ret; } *stream = strm; + return 0; } @@ -470,7 +433,7 @@ zip_pmd_stream_free(struct rte_compressdev *dev, void *stream) /* Free resources back to pool */ rte_mempool_put_bulk(vf->zip_mp, (void *)&(z_stream->bufs[0]), - MAX_BUFS_PER_STREAM); + (MAX_BUFS_PER_STREAM * ZIP_BURST_SIZE)); /* Zero out the whole structure */ memset(stream, 0, sizeof(struct zip_stream)); @@ -481,12 +444,12 @@ zip_pmd_stream_free(struct rte_compressdev *dev, void *stream) static uint16_t -zip_pmd_enqueue_burst_sync(void *queue_pair, +zip_pmd_enqueue_burst(void *queue_pair, struct rte_comp_op **ops, uint16_t nb_ops) { struct zipvf_qp *qp = queue_pair; - struct rte_comp_op *op; struct zip_stream *zstrm; + struct rte_comp_op *op; int i, ret = 0; uint16_t enqd = 0; @@ -501,7 +464,7 @@ zip_pmd_enqueue_burst_sync(void *queue_pair, if (unlikely(zstrm == NULL)) op->status = RTE_COMP_OP_STATUS_INVALID_ARGS; else - ret = zstrm->func(op, qp, zstrm); + ret = zstrm->func(op, qp, zstrm, i); } /* Whatever is out of op, put it into completion queue with @@ -518,21 +481,78 @@ zip_pmd_enqueue_burst_sync(void *queue_pair, enqd++; } } + +#ifdef ZIP_DBG + ZIP_PMD_INFO("ops_enqd[nb_ops:%d]:%d\n", nb_ops, enqd); +#endif return enqd; } static uint16_t -zip_pmd_dequeue_burst_sync(void *queue_pair, +zip_pmd_dequeue_burst(void *queue_pair, struct rte_comp_op **ops, uint16_t nb_ops) { + volatile union zip_zres_s *zresult = NULL; struct zipvf_qp *qp = queue_pair; unsigned int nb_dequeued = 0; + struct zip_stream *zstrm; + struct rte_comp_op *op; + unsigned int i; nb_dequeued = rte_ring_dequeue_burst(qp->processed_pkts, - (void **)ops, nb_ops, NULL); + (void **)ops, nb_ops, NULL); qp->qp_stats.dequeued_count += nb_dequeued; + /* Dequeue all the submitted ops */ + for (i = 0; i < nb_dequeued; i++) { + op = ops[i]; + /* process stateless ops */ + zstrm = (struct zip_stream *)op->private_xform; + zresult = (union zip_zres_s *)zstrm->bufs[RES_BUF + i]; + + /* Check and Process results in sync mode */ + do { + } while (!zresult->s.compcode); + + if (zresult->s.compcode == ZIP_COMP_E_SUCCESS) { + op->status = RTE_COMP_OP_STATUS_SUCCESS; + } else { + /* FATAL error cannot do anything */ + ZIP_PMD_ERR("operation failed with error code:%d\n", + zresult->s.compcode); + if (zresult->s.compcode == ZIP_COMP_E_DSTOP) + op->status = RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED; + else + op->status = RTE_COMP_OP_STATUS_ERROR; + } + + #ifdef ZIP_DBG + ZIP_PMD_INFO("written %d\n", zresult->s.totalbyteswritten); + #endif + + /* Update op stats */ + switch (op->status) { + case RTE_COMP_OP_STATUS_SUCCESS: + op->consumed = zresult->s.totalbytesread; + /* Fall-through */ + case RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED: + op->produced = zresult->s.totalbyteswritten; + break; + default: + ZIP_PMD_ERR("stats not updated for status:%d\n", + op->status); + break; + } + + /* zstream is reset irrespective of result */ + reset_stream(zstrm->inst[i]); + zresult->s.compcode = ZIP_COMP_E_NOTDONE; + } + +#ifdef ZIP_DBG + ZIP_PMD_INFO("ops_deqd[nb_ops:%d]: %d\n", nb_ops, nb_dequeued); +#endif return nb_dequeued; } @@ -597,8 +617,8 @@ zip_pci_probe(struct rte_pci_driver *pci_drv __rte_unused, compressdev->dev_ops = &octtx_zip_pmd_ops; /* register rx/tx burst functions for data path */ - compressdev->dequeue_burst = zip_pmd_dequeue_burst_sync; - compressdev->enqueue_burst = zip_pmd_enqueue_burst_sync; + compressdev->dequeue_burst = zip_pmd_dequeue_burst; + compressdev->enqueue_burst = zip_pmd_enqueue_burst; compressdev->feature_flags = RTE_COMPDEV_FF_HW_ACCELERATED; return ret; }