opentelemetry_sdk/logs/
batch_log_processor.rs

1//! # OpenTelemetry Batch Log Processor
2//! The `BatchLogProcessor` is one implementation of the `LogProcessor` interface.
3//!
4//! It buffers log records and sends them to the exporter
5//! in batches. This processor is designed for **production use** in high-throughput
6//! applications and reduces the overhead of frequent exports by using a background
7//! thread for batch processing.
8//!
9//! ## Diagram
10//!
11//! ```ascii
12//!   +-----+---------------+   +-----------------------+   +-------------------+
13//!   |     |               |   |                       |   |                   |
14//!   | SDK | Logger.emit() +---> (Batch)LogProcessor   +--->  (OTLPExporter)   |
15//!   +-----+---------------+   +-----------------------+   +-------------------+
16//! ```
17
18use crate::error::{OTelSdkError, OTelSdkResult};
19use crate::logs::log_processor::LogProcessor;
20use crate::{
21    logs::{LogBatch, LogExporter, SdkLogRecord},
22    Resource,
23};
24use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
25
26use opentelemetry::{otel_debug, otel_error, otel_warn, Context, InstrumentationScope};
27
28use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
29use std::{cmp::min, env, sync::Mutex};
30use std::{
31    fmt::{self, Debug, Formatter},
32    str::FromStr,
33    sync::Arc,
34    thread,
35    time::Duration,
36    time::Instant,
37};
38
/// Name of the environment variable that configures the delay interval
/// between two consecutive exports.
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
/// Default delay interval between two consecutive exports (1 second).
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: Duration = Duration::from_millis(1_000);
/// Name of the environment variable that configures the maximum allowed time
/// to export data. Only compiled in with the async-runtime batch processor
/// feature.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT";
/// Default maximum allowed time to export data (30 seconds).
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_millis(30_000);
/// Name of the environment variable that configures the maximum queue size.
pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE";
/// Default maximum queue size, in log records.
pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Name of the environment variable that configures the maximum batch size.
/// The batch size must be less than or equal to `OTEL_BLRP_MAX_QUEUE_SIZE`.
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size, in log records.
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
57
/// Control messages sent from application threads to the batch log
/// processor's worker thread over the control channel.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    /// This is ONLY sent when the number of log records in the data channel
    /// has reached `max_export_batch_size`. The payload is the shared
    /// "export message already sent" flag; the worker resets it to `false`
    /// when it picks the message up.
    ExportLog(Arc<AtomicBool>),
    /// ForceFlush flushes the current buffer to the exporter. The worker
    /// sends the export result back through the enclosed sender.
    ForceFlush(mpsc::SyncSender<OTelSdkResult>),
    /// Shut down the worker thread, push all logs in buffer to the exporter.
    /// The worker sends the final export result back through the enclosed
    /// sender before exiting.
    Shutdown(mpsc::SyncSender<OTelSdkResult>),
    /// Set the resource for the exporter.
    SetResource(Arc<Resource>),
}
71
/// A single item on the data channel: a heap-allocated pair of a log record
/// and the instrumentation scope it was emitted with.
type LogsData = Box<(SdkLogRecord, InstrumentationScope)>;
73
/// The `BatchLogProcessor` collects finished logs in a buffer and exports them
/// in batches to the configured `LogExporter`. This processor is ideal for
/// high-throughput environments, as it minimizes the overhead of exporting logs
/// individually. It uses a **dedicated background thread** to manage and export logs
/// asynchronously, ensuring that the application's main execution flow is not blocked.
///
/// This processor supports the following configurations:
/// - **Queue size**: Maximum number of log records that can be buffered.
/// - **Batch size**: Maximum number of log records to include in a single export.
/// - **Scheduled delay**: Frequency at which the batch is exported.
///
/// When using this processor with the OTLP Exporter, the following exporter
/// features are supported:
/// - `grpc-tonic`: Requires `LoggerProvider` to be created within a tokio runtime.
/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
///
/// In other words, other clients like `reqwest` and `hyper` are not supported.
///
/// `BatchLogProcessor` buffers logs in memory and exports them in batches. An
/// export is triggered when `max_export_batch_size` is reached or every
/// `scheduled_delay`. Users can explicitly trigger an export using
/// the `force_flush` method. Shutdown also triggers an export of all buffered
/// logs and is recommended to be called before the application exits to ensure
/// all buffered logs are exported.
///
/// **Warning**: When using tokio's current-thread runtime, `shutdown()`, which
/// is a blocking call, should not be called from your main thread. This can
/// cause a deadlock. Instead, call `shutdown()` from a separate thread or use
/// tokio's `spawn_blocking`.
///
///
/// ### Using a BatchLogProcessor:
///
/// ```rust
/// use opentelemetry_sdk::logs::{BatchLogProcessor, BatchConfigBuilder, SdkLoggerProvider};
/// use opentelemetry::global;
/// use std::time::Duration;
/// use opentelemetry_sdk::logs::InMemoryLogExporter;
///
/// let exporter = InMemoryLogExporter::default(); // Replace with an actual exporter
/// let processor = BatchLogProcessor::builder(exporter)
///     .with_batch_config(
///         BatchConfigBuilder::default()
///             .with_max_queue_size(2048)
///             .with_max_export_batch_size(512)
///             .with_scheduled_delay(Duration::from_secs(5))
///             .build(),
///     )
///     .build();
///
/// let provider = SdkLoggerProvider::builder()
///     .with_log_processor(processor)
///     .build();
/// ```
pub struct BatchLogProcessor {
    // Data channel to store log records and instrumentation scopes.
    logs_sender: SyncSender<LogsData>,
    // Control channel to store control messages for the worker thread.
    message_sender: SyncSender<BatchMessage>,
    // Join handle of the worker thread; taken and joined during shutdown.
    handle: Mutex<Option<thread::JoinHandle<()>>>,
    // How long `force_flush` waits for the worker thread's reply.
    forceflush_timeout: Duration,
    // Set to true once an `ExportLog` control message has been queued; the
    // worker thread resets it to false when it processes that message.
    export_log_message_sent: Arc<AtomicBool>,
    // Number of records pushed to the data channel and not yet exported.
    // Incremented by `emit`, decremented by the worker thread after an export.
    current_batch_size: Arc<AtomicUsize>,
    // Threshold of `current_batch_size` at which `emit` requests an export.
    max_export_batch_size: usize,

    // Track dropped logs - we'll log this at shutdown
    dropped_logs_count: AtomicUsize,

    // Track the maximum queue size that was configured for this processor
    max_queue_size: usize,
}
143
144impl Debug for BatchLogProcessor {
145    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
146        f.debug_struct("BatchLogProcessor")
147            .field("message_sender", &self.message_sender)
148            .finish()
149    }
150}
151
152impl LogProcessor for BatchLogProcessor {
153    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
154        let result = self
155            .logs_sender
156            .try_send(Box::new((record.clone(), instrumentation.clone())));
157
158        // match for result and handle each separately
159        match result {
160            Ok(_) => {
161                // Successfully sent the log record to the data channel.
162                // Increment the current batch size and check if it has reached
163                // the max export batch size.
164                if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
165                    >= self.max_export_batch_size
166                {
167                    // Check if the a control message for exporting logs is
168                    // already sent to the worker thread. If not, send a control
169                    // message to export logs. `export_log_message_sent` is set
170                    // to false ONLY when the worker thread has processed the
171                    // control message.
172
173                    if !self.export_log_message_sent.load(Ordering::Relaxed) {
174                        // This is a cost-efficient check as atomic load
175                        // operations do not require exclusive access to cache
176                        // line. Perform atomic swap to
177                        // `export_log_message_sent` ONLY when the atomic load
178                        // operation above returns false. Atomic
179                        // swap/compare_exchange operations require exclusive
180                        // access to cache line on most processor architectures.
181                        // We could have used compare_exchange as well here, but
182                        // it's more verbose than swap.
183                        if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
184                            match self.message_sender.try_send(BatchMessage::ExportLog(
185                                self.export_log_message_sent.clone(),
186                            )) {
187                                Ok(_) => {
188                                    // Control message sent successfully.
189                                }
190                                Err(_err) => {
191                                    // TODO: Log error If the control message
192                                    // could not be sent, reset the
193                                    // `export_log_message_sent` flag.
194                                    self.export_log_message_sent.store(false, Ordering::Relaxed);
195                                }
196                            }
197                        }
198                    }
199                }
200            }
201            Err(mpsc::TrySendError::Full(_)) => {
202                // Increment dropped logs count. The first time we have to drop
203                // a log, emit a warning.
204                if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
205                    otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
206                        message = "BatchLogProcessor dropped a LogRecord due to queue full. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
207                }
208            }
209            Err(mpsc::TrySendError::Disconnected(_)) => {
210                // Given background thread is the only receiver, and it's
211                // disconnected, it indicates the thread is shutdown
212                otel_warn!(
213                    name: "BatchLogProcessor.Emit.AfterShutdown",
214                    message = "Logs are being emitted even after Shutdown. This indicates incorrect lifecycle management of OTelLoggerProvider in application. Logs will not be exported."
215                );
216            }
217        }
218    }
219
220    fn force_flush(&self) -> OTelSdkResult {
221        let (sender, receiver) = mpsc::sync_channel(1);
222        match self
223            .message_sender
224            .try_send(BatchMessage::ForceFlush(sender))
225        {
226            Ok(_) => receiver
227                .recv_timeout(self.forceflush_timeout)
228                .map_err(|err| {
229                    if err == RecvTimeoutError::Timeout {
230                        OTelSdkError::Timeout(self.forceflush_timeout)
231                    } else {
232                        OTelSdkError::InternalFailure(format!("{}", err))
233                    }
234                })?,
235            Err(mpsc::TrySendError::Full(_)) => {
236                // If the control message could not be sent, emit a warning.
237                otel_debug!(
238                    name: "BatchLogProcessor.ForceFlush.ControlChannelFull",
239                    message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call."
240                );
241                Err(OTelSdkError::InternalFailure("ForceFlush cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
242            }
243            Err(mpsc::TrySendError::Disconnected(_)) => {
244                // Given background thread is the only receiver, and it's
245                // disconnected, it indicates the thread is shutdown
246                otel_debug!(
247                    name: "BatchLogProcessor.ForceFlush.AlreadyShutdown",
248                    message = "ForceFlush invoked after Shutdown. This will not perform Flush and indicates a incorrect lifecycle management in Application."
249                );
250
251                Err(OTelSdkError::AlreadyShutdown)
252            }
253        }
254    }
255
256    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
257        let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
258        let max_queue_size = self.max_queue_size;
259        if dropped_logs > 0 {
260            otel_warn!(
261                name: "BatchLogProcessor.LogsDropped",
262                dropped_logs_count = dropped_logs,
263                max_queue_size = max_queue_size,
264                message = "Logs were dropped due to a queue being full. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
265            );
266        }
267
268        let (sender, receiver) = mpsc::sync_channel(1);
269        match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
270            Ok(_) => {
271                receiver
272                    .recv_timeout(timeout)
273                    .map(|_| {
274                        // join the background thread after receiving back the
275                        // shutdown signal
276                        if let Some(handle) = self.handle.lock().unwrap().take() {
277                            handle.join().unwrap();
278                        }
279                        OTelSdkResult::Ok(())
280                    })
281                    .map_err(|err| match err {
282                        RecvTimeoutError::Timeout => {
283                            otel_error!(
284                                name: "BatchLogProcessor.Shutdown.Timeout",
285                                message = "BatchLogProcessor shutdown timing out."
286                            );
287                            OTelSdkError::Timeout(timeout)
288                        }
289                        _ => {
290                            otel_error!(
291                                name: "BatchLogProcessor.Shutdown.Error",
292                                error = format!("{}", err)
293                            );
294                            OTelSdkError::InternalFailure(format!("{}", err))
295                        }
296                    })?
297            }
298            Err(mpsc::TrySendError::Full(_)) => {
299                // If the control message could not be sent, emit a warning.
300                otel_debug!(
301                    name: "BatchLogProcessor.Shutdown.ControlChannelFull",
302                    message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call."
303                );
304                Err(OTelSdkError::InternalFailure("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
305            }
306            Err(mpsc::TrySendError::Disconnected(_)) => {
307                // Given background thread is the only receiver, and it's
308                // disconnected, it indicates the thread is shutdown
309                otel_debug!(
310                    name: "BatchLogProcessor.Shutdown.AlreadyShutdown",
311                    message = "Shutdown is being invoked more than once. This is noop, but indicates a potential issue in the application's lifecycle management."
312                );
313
314                Err(OTelSdkError::AlreadyShutdown)
315            }
316        }
317    }
318
319    fn set_resource(&mut self, resource: &Resource) {
320        let resource = Arc::new(resource.clone());
321        let _ = self
322            .message_sender
323            .try_send(BatchMessage::SetResource(resource));
324    }
325}
326
327impl BatchLogProcessor {
328    pub(crate) fn new<E>(mut exporter: E, config: BatchConfig) -> Self
329    where
330        E: LogExporter + Send + Sync + 'static,
331    {
332        let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size);
333        let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
334        let max_queue_size = config.max_queue_size;
335        let max_export_batch_size = config.max_export_batch_size;
336        let current_batch_size = Arc::new(AtomicUsize::new(0));
337        let current_batch_size_for_thread = current_batch_size.clone();
338
339        let handle = thread::Builder::new()
340            .name("OpenTelemetry.Logs.BatchProcessor".to_string())
341            .spawn(move || {
342                let _suppress_guard = Context::enter_telemetry_suppressed_scope();
343                otel_debug!(
344                    name: "BatchLogProcessor.ThreadStarted",
345                    interval_in_millisecs = config.scheduled_delay.as_millis(),
346                    max_export_batch_size = config.max_export_batch_size,
347                    max_queue_size = max_queue_size,
348                );
349                let mut last_export_time = Instant::now();
350                let mut logs = Vec::with_capacity(config.max_export_batch_size);
351                let current_batch_size = current_batch_size_for_thread;
352
353                // This method gets up to `max_export_batch_size` amount of logs from the channel and exports them.
354                // It returns the result of the export operation.
355                // It expects the logs vec to be empty when it's called.
356                #[inline]
357                fn get_logs_and_export<E>(
358                    logs_receiver: &mpsc::Receiver<LogsData>,
359                    exporter: &E,
360                    logs: &mut Vec<LogsData>,
361                    last_export_time: &mut Instant,
362                    current_batch_size: &AtomicUsize,
363                    config: &BatchConfig,
364                ) -> OTelSdkResult
365                where
366                    E: LogExporter + Send + Sync + 'static,
367                {
368                    let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting logs.
369                    let mut result = OTelSdkResult::Ok(());
370                    let mut total_exported_logs: usize = 0;
371
372                    while target > 0 && total_exported_logs < target {
373                        // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
374                        while let Ok(log) = logs_receiver.try_recv() {
375                            logs.push(log);
376                            if logs.len() == config.max_export_batch_size {
377                                break;
378                            }
379                        }
380
381                        let count_of_logs = logs.len(); // Count of logs that will be exported
382                        total_exported_logs += count_of_logs;
383
384                        result = export_batch_sync(exporter, logs, last_export_time); // This method clears the logs vec after exporting
385
386                        current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
387                    }
388                    result
389                }
390
391                loop {
392                    let remaining_time = config
393                        .scheduled_delay
394                        .checked_sub(last_export_time.elapsed())
395                        .unwrap_or(config.scheduled_delay);
396
397                    match message_receiver.recv_timeout(remaining_time) {
398                        Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
399                            // Reset the export log message sent flag now it has has been processed.
400                            export_log_message_sent.store(false, Ordering::Relaxed);
401
402                            otel_debug!(
403                                name: "BatchLogProcessor.ExportingDueToBatchSize",
404                            );
405
406                            let _ = get_logs_and_export(
407                                &logs_receiver,
408                                &exporter,
409                                &mut logs,
410                                &mut last_export_time,
411                                &current_batch_size,
412                                &config,
413                            );
414                        }
415                        Ok(BatchMessage::ForceFlush(sender)) => {
416                            otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
417                            let result = get_logs_and_export(
418                                &logs_receiver,
419                                &exporter,
420                                &mut logs,
421                                &mut last_export_time,
422                                &current_batch_size,
423                                &config,
424                            );
425                            let _ = sender.send(result);
426                        }
427                        Ok(BatchMessage::Shutdown(sender)) => {
428                            otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
429                            let result = get_logs_and_export(
430                                &logs_receiver,
431                                &exporter,
432                                &mut logs,
433                                &mut last_export_time,
434                                &current_batch_size,
435                                &config,
436                            );
437                            let _ = exporter.shutdown();
438                            let _ = sender.send(result);
439
440                            otel_debug!(
441                                name: "BatchLogProcessor.ThreadExiting",
442                                reason = "ShutdownRequested"
443                            );
444                            //
445                            // break out the loop and return from the current background thread.
446                            //
447                            break;
448                        }
449                        Ok(BatchMessage::SetResource(resource)) => {
450                            exporter.set_resource(&resource);
451                        }
452                        Err(RecvTimeoutError::Timeout) => {
453                            otel_debug!(
454                                name: "BatchLogProcessor.ExportingDueToTimer",
455                            );
456
457                            let _ = get_logs_and_export(
458                                &logs_receiver,
459                                &exporter,
460                                &mut logs,
461                                &mut last_export_time,
462                                &current_batch_size,
463                                &config,
464                            );
465                        }
466                        Err(RecvTimeoutError::Disconnected) => {
467                            // Channel disconnected, only thing to do is break
468                            // out (i.e exit the thread)
469                            otel_debug!(
470                                name: "BatchLogProcessor.ThreadExiting",
471                                reason = "MessageSenderDisconnected"
472                            );
473                            break;
474                        }
475                    }
476                }
477                otel_debug!(
478                    name: "BatchLogProcessor.ThreadStopped"
479                );
480            })
481            .expect("Thread spawn failed."); //TODO: Handle thread spawn failure
482
483        // Return batch processor with link to worker
484        BatchLogProcessor {
485            logs_sender,
486            message_sender,
487            handle: Mutex::new(Some(handle)),
488            forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
489            dropped_logs_count: AtomicUsize::new(0),
490            max_queue_size,
491            export_log_message_sent: Arc::new(AtomicBool::new(false)),
492            current_batch_size,
493            max_export_batch_size,
494        }
495    }
496
497    /// Create a new batch processor builder
498    pub fn builder<E>(exporter: E) -> BatchLogProcessorBuilder<E>
499    where
500        E: LogExporter,
501    {
502        BatchLogProcessorBuilder {
503            exporter,
504            config: Default::default(),
505        }
506    }
507}
508
509#[allow(clippy::vec_box)]
510fn export_batch_sync<E>(
511    exporter: &E,
512    batch: &mut Vec<Box<(SdkLogRecord, InstrumentationScope)>>,
513    last_export_time: &mut Instant,
514) -> OTelSdkResult
515where
516    E: LogExporter + ?Sized,
517{
518    *last_export_time = Instant::now();
519
520    if batch.is_empty() {
521        return OTelSdkResult::Ok(());
522    }
523
524    let export = exporter.export(LogBatch::new_with_owned_data(batch.as_slice()));
525    let export_result = futures_executor::block_on(export);
526
527    // Clear the batch vec after exporting
528    batch.clear();
529
530    match export_result {
531        Ok(_) => OTelSdkResult::Ok(()),
532        Err(err) => {
533            otel_error!(
534                name: "BatchLogProcessor.ExportError",
535                error = format!("{}", err)
536            );
537            OTelSdkResult::Err(err)
538        }
539    }
540}
541
/// A builder for creating [`BatchLogProcessor`] instances.
///
/// Holds the exporter the processor will own and the batch configuration it
/// will be created with.
#[derive(Debug)]
pub struct BatchLogProcessorBuilder<E> {
    // Exporter handed to the processor when `build` is called.
    exporter: E,
    // Batch configuration handed to the processor when `build` is called.
    config: BatchConfig,
}
550
551impl<E> BatchLogProcessorBuilder<E>
552where
553    E: LogExporter + 'static,
554{
555    /// Set the BatchConfig for [`BatchLogProcessorBuilder`]
556    pub fn with_batch_config(self, config: BatchConfig) -> Self {
557        BatchLogProcessorBuilder { config, ..self }
558    }
559
560    /// Build a batch processor
561    pub fn build(self) -> BatchLogProcessor {
562        BatchLogProcessor::new(self.exporter, self.config)
563    }
564}
565
/// Batch log processor configuration.
/// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`].
#[derive(Debug)]
#[allow(dead_code)]
pub struct BatchConfig {
    /// The maximum queue size to buffer logs for delayed processing. If the
    /// queue gets full it drops the logs. The default value is 2048.
    pub(crate) max_queue_size: usize,

    /// The delay interval between two consecutive processing of batches.
    /// The default value is 1 second.
    pub(crate) scheduled_delay: Duration,

    /// The maximum number of logs to process in a single batch. If there are
    /// more than one batch worth of logs then it processes multiple batches
    /// of logs one batch after the other without any delay. The default value
    /// is 512.
    pub(crate) max_export_batch_size: usize,

    /// The maximum duration to export a batch of data.
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    pub(crate) max_export_timeout: Duration,
}
589
590impl Default for BatchConfig {
591    fn default() -> Self {
592        BatchConfigBuilder::default().build()
593    }
594}
595
/// A builder for creating [`BatchConfig`] instances.
#[derive(Debug)]
pub struct BatchConfigBuilder {
    // Maximum number of log records buffered for delayed processing.
    max_queue_size: usize,
    // Delay between two consecutive processing of batches.
    scheduled_delay: Duration,
    // Requested maximum number of log records per export; `build` caps it at
    // `max_queue_size`.
    max_export_batch_size: usize,
    // Maximum duration to export a batch of data.
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    max_export_timeout: Duration,
}
605
606impl Default for BatchConfigBuilder {
607    /// Create a new [`BatchConfigBuilder`] initialized with default batch config values as per the specs.
608    /// The values are overridden by environment variables if set.
609    /// The supported environment variables are:
610    /// * `OTEL_BLRP_MAX_QUEUE_SIZE`
611    /// * `OTEL_BLRP_SCHEDULE_DELAY`
612    /// * `OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
613    /// * `OTEL_BLRP_EXPORT_TIMEOUT`
614    ///
615    /// Note: Programmatic configuration overrides any value set via the environment variable.
616    fn default() -> Self {
617        BatchConfigBuilder {
618            max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
619            scheduled_delay: OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
620            max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
621            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
622            max_export_timeout: OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT,
623        }
624        .init_from_env_vars()
625    }
626}
627
628impl BatchConfigBuilder {
629    /// Set max_queue_size for [`BatchConfigBuilder`].
630    /// It's the maximum queue size to buffer logs for delayed processing.
631    /// If the queue gets full it will drop the logs.
632    /// The default value is 2048.
633    ///
634    /// Corresponding environment variable: `OTEL_BLRP_MAX_QUEUE_SIZE`.
635    ///
636    /// Note: Programmatically setting this will override any value set via the environment variable.
637    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
638        self.max_queue_size = max_queue_size;
639        self
640    }
641
642    /// Set scheduled_delay for [`BatchConfigBuilder`].
643    /// It's the delay interval in milliseconds between two consecutive processing of batches.
644    /// The default value is 1000 milliseconds.
645    ///
646    /// Corresponding environment variable: `OTEL_BLRP_SCHEDULE_DELAY`.
647    ///
648    /// Note: Programmatically setting this will override any value set via the environment variable.
649    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
650        self.scheduled_delay = scheduled_delay;
651        self
652    }
653
654    /// Set max_export_timeout for [`BatchConfigBuilder`].
655    /// It's the maximum duration to export a batch of data.
656    /// The default value is 30000 milliseconds.
657    ///
658    /// Corresponding environment variable: `OTEL_BLRP_EXPORT_TIMEOUT`.
659    ///
660    /// Note: Programmatically setting this will override any value set via the environment variable.
661    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
662    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
663        self.max_export_timeout = max_export_timeout;
664        self
665    }
666
667    /// Set max_export_batch_size for [`BatchConfigBuilder`].
668    /// It's the maximum number of logs to process in a single batch. If there are
669    /// more than one batch worth of logs then it processes multiple batches
670    /// of logs one batch after the other without any delay.
671    /// The default value is 512.
672    ///
673    /// Corresponding environment variable: `OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`.
674    ///
675    /// Note: Programmatically setting this will override any value set via the environment variable.
676    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
677        self.max_export_batch_size = max_export_batch_size;
678        self
679    }
680
681    /// Builds a `BatchConfig` enforcing the following invariants:
682    /// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
683    pub fn build(self) -> BatchConfig {
684        // max export batch size must be less or equal to max queue size.
685        // we set max export batch size to max queue size if it's larger than max queue size.
686        let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
687
688        BatchConfig {
689            max_queue_size: self.max_queue_size,
690            scheduled_delay: self.scheduled_delay,
691            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
692            max_export_timeout: self.max_export_timeout,
693            max_export_batch_size,
694        }
695    }
696
697    fn init_from_env_vars(mut self) -> Self {
698        if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE)
699            .ok()
700            .and_then(|queue_size| usize::from_str(&queue_size).ok())
701        {
702            self.max_queue_size = max_queue_size;
703        }
704
705        if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE)
706            .ok()
707            .and_then(|batch_size| usize::from_str(&batch_size).ok())
708        {
709            self.max_export_batch_size = max_export_batch_size;
710        }
711
712        if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY)
713            .ok()
714            .and_then(|delay| u64::from_str(&delay).ok())
715        {
716            self.scheduled_delay = Duration::from_millis(scheduled_delay);
717        }
718
719        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
720        if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
721            .ok()
722            .and_then(|s| u64::from_str(&s).ok())
723        {
724            self.max_export_timeout = Duration::from_millis(max_export_timeout);
725        }
726
727        self
728    }
729}
730
731#[cfg(all(test, feature = "testing", feature = "logs"))]
732mod tests {
733    use super::{
734        BatchConfig, BatchConfigBuilder, BatchLogProcessor, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
735        OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE,
736        OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY,
737        OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
738    };
739    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
740    use super::{OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT};
741    use crate::logs::log_processor::tests::MockLogExporter;
742    use crate::logs::SdkLogRecord;
743    use crate::{
744        logs::{InMemoryLogExporter, InMemoryLogExporterBuilder, LogProcessor, SdkLoggerProvider},
745        Resource,
746    };
747    use opentelemetry::InstrumentationScope;
748    use opentelemetry::KeyValue;
749    use std::sync::{Arc, Mutex};
750    use std::time::Duration;
751
752    #[test]
753    fn test_default_const_values() {
754        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
755        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT.as_millis(), 1_000);
756        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
757        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
758        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
759        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT.as_millis(), 30_000);
760        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
761        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
762        assert_eq!(
763            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
764            "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
765        );
766        assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
767    }
768
769    #[test]
770    fn test_default_batch_config_adheres_to_specification() {
771        // The following environment variables are expected to be unset so that their default values are used.
772        let env_vars = vec![
773            OTEL_BLRP_SCHEDULE_DELAY,
774            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
775            OTEL_BLRP_EXPORT_TIMEOUT,
776            OTEL_BLRP_MAX_QUEUE_SIZE,
777            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
778        ];
779
780        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
781
782        assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT);
783        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
784        assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT);
785        assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
786        assert_eq!(
787            config.max_export_batch_size,
788            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
789        );
790    }
791
792    #[test]
793    fn test_code_based_config_overrides_env_vars() {
794        let env_vars = vec![
795            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
796            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
797            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
798        ];
799
800        temp_env::with_vars(env_vars, || {
801            let config = BatchConfigBuilder::default()
802                .with_max_queue_size(2048)
803                .with_scheduled_delay(Duration::from_millis(1000))
804                .with_max_export_batch_size(512)
805                .build();
806
807            assert_eq!(config.scheduled_delay, Duration::from_millis(1000));
808            assert_eq!(config.max_queue_size, 2048);
809            assert_eq!(config.max_export_batch_size, 512);
810        });
811    }
812
813    #[test]
814    fn test_batch_config_configurable_by_env_vars() {
815        let env_vars = vec![
816            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
817            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
818            (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
819            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
820            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
821        ];
822
823        let config = temp_env::with_vars(env_vars, BatchConfig::default);
824
825        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
826        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
827        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
828        assert_eq!(config.max_queue_size, 4096);
829        assert_eq!(config.max_export_batch_size, 1024);
830    }
831
832    #[test]
833    fn test_batch_config_max_export_batch_size_validation() {
834        let env_vars = vec![
835            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")),
836            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
837        ];
838
839        let config = temp_env::with_vars(env_vars, BatchConfig::default);
840
841        assert_eq!(config.max_queue_size, 256);
842        assert_eq!(config.max_export_batch_size, 256);
843        assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT);
844        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
845        assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT);
846    }
847
848    #[test]
849    fn test_batch_config_with_fields() {
850        let batch_builder = BatchConfigBuilder::default()
851            .with_max_export_batch_size(1)
852            .with_scheduled_delay(Duration::from_millis(2))
853            .with_max_queue_size(4);
854
855        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
856        let batch_builder = batch_builder.with_max_export_timeout(Duration::from_millis(3));
857        let batch = batch_builder.build();
858
859        assert_eq!(batch.max_export_batch_size, 1);
860        assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
861        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
862        assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
863        assert_eq!(batch.max_queue_size, 4);
864    }
865
866    #[test]
867    fn test_build_batch_log_processor_builder() {
868        let mut env_vars = vec![
869            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
870            (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
871            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
872            (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
873        ];
874        temp_env::with_vars(env_vars.clone(), || {
875            let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
876
877            assert_eq!(builder.config.max_export_batch_size, 500);
878            assert_eq!(
879                builder.config.scheduled_delay,
880                OTEL_BLRP_SCHEDULE_DELAY_DEFAULT
881            );
882            assert_eq!(
883                builder.config.max_queue_size,
884                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
885            );
886
887            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
888            assert_eq!(
889                builder.config.max_export_timeout,
890                Duration::from_millis(2046)
891            );
892        });
893
894        env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
895
896        temp_env::with_vars(env_vars, || {
897            let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
898            assert_eq!(builder.config.max_export_batch_size, 120);
899            assert_eq!(builder.config.max_queue_size, 120);
900        });
901    }
902
903    #[test]
904    fn test_build_batch_log_processor_builder_with_custom_config() {
905        let expected = BatchConfigBuilder::default()
906            .with_max_export_batch_size(1)
907            .with_scheduled_delay(Duration::from_millis(2))
908            .with_max_queue_size(4)
909            .build();
910
911        let builder =
912            BatchLogProcessor::builder(InMemoryLogExporter::default()).with_batch_config(expected);
913
914        let actual = &builder.config;
915        assert_eq!(actual.max_export_batch_size, 1);
916        assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
917        assert_eq!(actual.max_queue_size, 4);
918    }
919
920    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
921    async fn test_set_resource_batch_processor() {
922        let exporter = MockLogExporter {
923            resource: Arc::new(Mutex::new(None)),
924        };
925        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
926        let provider = SdkLoggerProvider::builder()
927            .with_log_processor(processor)
928            .with_resource(
929                Resource::builder_empty()
930                    .with_attributes([
931                        KeyValue::new("k1", "v1"),
932                        KeyValue::new("k2", "v3"),
933                        KeyValue::new("k3", "v3"),
934                        KeyValue::new("k4", "v4"),
935                        KeyValue::new("k5", "v5"),
936                    ])
937                    .build(),
938            )
939            .build();
940
941        provider.force_flush().unwrap();
942
943        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
944        let _ = provider.shutdown();
945    }
946
947    #[tokio::test(flavor = "multi_thread")]
948    async fn test_batch_shutdown() {
949        // assert we will receive an error
950        // setup
951        let exporter = InMemoryLogExporterBuilder::default()
952            .keep_records_on_shutdown()
953            .build();
954        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
955
956        let mut record = SdkLogRecord::new();
957        let instrumentation = InstrumentationScope::default();
958
959        processor.emit(&mut record, &instrumentation);
960        processor.force_flush().unwrap();
961        processor.shutdown().unwrap();
962        // todo: expect to see errors here. How should we assert this?
963        processor.emit(&mut record, &instrumentation);
964        assert_eq!(1, exporter.get_emitted_logs().unwrap().len());
965        assert!(exporter.is_shutdown_called());
966    }
967
968    #[tokio::test(flavor = "current_thread")]
969    async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() {
970        let exporter = InMemoryLogExporterBuilder::default().build();
971        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
972
973        processor.shutdown().unwrap();
974    }
975
976    #[tokio::test(flavor = "current_thread")]
977    async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
978        let exporter = InMemoryLogExporterBuilder::default().build();
979        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
980        processor.shutdown().unwrap();
981    }
982
983    #[tokio::test(flavor = "multi_thread")]
984    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() {
985        let exporter = InMemoryLogExporterBuilder::default().build();
986        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
987        processor.shutdown().unwrap();
988    }
989
990    #[tokio::test(flavor = "multi_thread")]
991    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
992        let exporter = InMemoryLogExporterBuilder::default().build();
993        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
994        processor.shutdown().unwrap();
995    }
996}