1use crate::error::{OTelSdkError, OTelSdkResult};
19use crate::logs::log_processor::LogProcessor;
20use crate::{
21 logs::{LogBatch, LogExporter, SdkLogRecord},
22 Resource,
23};
24use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
25
26use opentelemetry::{otel_debug, otel_error, otel_warn, Context, InstrumentationScope};
27
28use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
29use std::{cmp::min, env, sync::Mutex};
30use std::{
31 fmt::{self, Debug, Formatter},
32 str::FromStr,
33 sync::Arc,
34 thread,
35 time::Duration,
36 time::Instant,
37};
38
/// Environment variable controlling the delay interval between two consecutive exports.
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
/// Default delay interval between two consecutive exports.
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: Duration = Duration::from_millis(1_000);
/// Environment variable controlling the maximum allowed time to export data.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT";
/// Default maximum allowed time to export data.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_millis(30_000);
/// Environment variable controlling the maximum queue size.
pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE";
/// Default maximum queue size.
pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Environment variable controlling the maximum batch size; effective value is
/// clamped to the max queue size (see `BatchConfigBuilder::build`).
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size.
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
57
/// Control messages sent from `BatchLogProcessor` methods to the background
/// worker thread over the bounded control channel. Log data itself travels on
/// a separate data channel.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    /// Ask the worker to export now because the buffered count reached the
    /// batch threshold. The worker resets the shared flag to `false` when it
    /// picks this message up, re-arming `emit`'s send-once gate.
    ExportLog(Arc<AtomicBool>),
    /// Flush everything currently buffered; the export result is sent back on
    /// the enclosed channel.
    ForceFlush(mpsc::SyncSender<OTelSdkResult>),
    /// Flush, shut the exporter down, reply with the result, and exit the
    /// worker thread.
    Shutdown(mpsc::SyncSender<OTelSdkResult>),
    /// Forward a new `Resource` to the exporter.
    SetResource(Arc<Resource>),
}
71
/// A (record, scope) pair as sent over the data channel; boxed so each channel
/// slot stays pointer-sized.
type LogsData = Box<(SdkLogRecord, InstrumentationScope)>;
73
/// A log processor that buffers finished log records in a bounded queue and
/// exports them in batches from a dedicated background thread.
pub struct BatchLogProcessor {
    /// Data channel: carries cloned log records to the worker thread.
    logs_sender: SyncSender<LogsData>,
    /// Control channel: carries flush/shutdown/export/resource messages.
    message_sender: SyncSender<BatchMessage>,
    /// Worker thread handle; taken and joined during shutdown.
    handle: Mutex<Option<thread::JoinHandle<()>>>,
    /// How long `force_flush` waits for the worker's reply.
    forceflush_timeout: Duration,
    /// True while an `ExportLog` control message is in flight, so `emit` sends
    /// at most one such message per threshold crossing.
    export_log_message_sent: Arc<AtomicBool>,
    /// Number of records currently buffered in the data channel.
    current_batch_size: Arc<AtomicUsize>,
    /// Threshold at which `emit` nudges the worker to export early.
    max_export_batch_size: usize,

    /// Total records dropped because the queue was full; reported at shutdown.
    dropped_logs_count: AtomicUsize,

    /// Configured queue capacity, kept for the shutdown-time drop report.
    max_queue_size: usize,
}
143
144impl Debug for BatchLogProcessor {
145 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
146 f.debug_struct("BatchLogProcessor")
147 .field("message_sender", &self.message_sender)
148 .finish()
149 }
150}
151
impl LogProcessor for BatchLogProcessor {
    /// Clones the record and hands it to the worker via the bounded data
    /// channel. Never blocks: when the queue is full the record is dropped and
    /// counted instead.
    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
        let result = self
            .logs_sender
            .try_send(Box::new((record.clone(), instrumentation.clone())));

        match result {
            Ok(_) => {
                // Record queued. If the buffered count just reached the batch
                // threshold, nudge the worker to export now rather than wait
                // for the scheduled-delay timer.
                if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
                    >= self.max_export_batch_size
                {
                    // Cheap relaxed load first: only attempt the (exclusive
                    // cache-line) swap when no export message appears to be in
                    // flight. The worker stores `false` when it handles the
                    // message, re-arming this gate.
                    if !self.export_log_message_sent.load(Ordering::Relaxed) {
                        // The swap is the authoritative check: only the thread
                        // that flips false -> true gets to send the message,
                        // so at most one ExportLog is ever in flight.
                        if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
                            match self.message_sender.try_send(BatchMessage::ExportLog(
                                self.export_log_message_sent.clone(),
                            )) {
                                Ok(_) => {
                                    // Sent; the worker resets the flag.
                                }
                                Err(_err) => {
                                    // Control channel full or disconnected:
                                    // undo the flag so a later emit can retry.
                                    self.export_log_message_sent.store(false, Ordering::Relaxed);
                                }
                            }
                        }
                    }
                }
            }
            Err(mpsc::TrySendError::Full(_)) => {
                // Queue full: drop the record. Warn only on the first drop to
                // avoid flooding; the exact total is reported at shutdown.
                if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
                    otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
                        message = "BatchLogProcessor dropped a LogRecord due to queue full. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
                }
            }
            Err(mpsc::TrySendError::Disconnected(_)) => {
                // Worker thread is gone, i.e. shutdown already happened.
                otel_warn!(
                    name: "BatchLogProcessor.Emit.AfterShutdown",
                    message = "Logs are being emitted even after Shutdown. This indicates incorrect lifecycle management of OTelLoggerProvider in application. Logs will not be exported."
                );
            }
        }
    }

    /// Asks the worker to export everything buffered right now and waits up to
    /// `forceflush_timeout` for its reply.
    fn force_flush(&self) -> OTelSdkResult {
        // Rendezvous channel for the worker's reply.
        let (sender, receiver) = mpsc::sync_channel(1);
        match self
            .message_sender
            .try_send(BatchMessage::ForceFlush(sender))
        {
            // `?` unwraps the outer recv result; the inner OTelSdkResult from
            // the worker is returned as-is.
            Ok(_) => receiver
                .recv_timeout(self.forceflush_timeout)
                .map_err(|err| {
                    if err == RecvTimeoutError::Timeout {
                        OTelSdkError::Timeout(self.forceflush_timeout)
                    } else {
                        OTelSdkError::InternalFailure(format!("{}", err))
                    }
                })?,
            Err(mpsc::TrySendError::Full(_)) => {
                otel_debug!(
                    name: "BatchLogProcessor.ForceFlush.ControlChannelFull",
                    message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call."
                );
                Err(OTelSdkError::InternalFailure("ForceFlush cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
            }
            Err(mpsc::TrySendError::Disconnected(_)) => {
                otel_debug!(
                    name: "BatchLogProcessor.ForceFlush.AlreadyShutdown",
                    message = "ForceFlush invoked after Shutdown. This will not perform Flush and indicates a incorrect lifecycle management in Application."
                );

                Err(OTelSdkError::AlreadyShutdown)
            }
        }
    }

    /// Reports any dropped-record total, asks the worker to flush and shut the
    /// exporter down, waits up to `timeout` for the reply, then joins the
    /// worker thread.
    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
        let max_queue_size = self.max_queue_size;
        if dropped_logs > 0 {
            otel_warn!(
                name: "BatchLogProcessor.LogsDropped",
                dropped_logs_count = dropped_logs,
                max_queue_size = max_queue_size,
                message = "Logs were dropped due to a queue being full. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
            );
        }

        let (sender, receiver) = mpsc::sync_channel(1);
        match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
            Ok(_) => {
                receiver
                    .recv_timeout(timeout)
                    .map(|_| {
                        // Join only after the worker confirmed shutdown, so
                        // this cannot block on in-flight exports beyond the
                        // reply above. `take()` makes a second shutdown a noop.
                        if let Some(handle) = self.handle.lock().unwrap().take() {
                            handle.join().unwrap();
                        }
                        OTelSdkResult::Ok(())
                    })
                    .map_err(|err| match err {
                        RecvTimeoutError::Timeout => {
                            otel_error!(
                                name: "BatchLogProcessor.Shutdown.Timeout",
                                message = "BatchLogProcessor shutdown timing out."
                            );
                            OTelSdkError::Timeout(timeout)
                        }
                        _ => {
                            otel_error!(
                                name: "BatchLogProcessor.Shutdown.Error",
                                error = format!("{}", err)
                            );
                            OTelSdkError::InternalFailure(format!("{}", err))
                        }
                    })?
            }
            Err(mpsc::TrySendError::Full(_)) => {
                otel_debug!(
                    name: "BatchLogProcessor.Shutdown.ControlChannelFull",
                    message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call."
                );
                Err(OTelSdkError::InternalFailure("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
            }
            Err(mpsc::TrySendError::Disconnected(_)) => {
                otel_debug!(
                    name: "BatchLogProcessor.Shutdown.AlreadyShutdown",
                    message = "Shutdown is being invoked more than once. This is noop, but indicates a potential issue in the application's lifecycle management."
                );

                Err(OTelSdkError::AlreadyShutdown)
            }
        }
    }

    /// Forwards the resource to the exporter via the worker thread.
    /// Best-effort: a full or closed control channel is silently ignored.
    fn set_resource(&mut self, resource: &Resource) {
        let resource = Arc::new(resource.clone());
        let _ = self
            .message_sender
            .try_send(BatchMessage::SetResource(resource));
    }
}
326
327impl BatchLogProcessor {
328 pub(crate) fn new<E>(mut exporter: E, config: BatchConfig) -> Self
329 where
330 E: LogExporter + Send + Sync + 'static,
331 {
332 let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size);
333 let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); let max_queue_size = config.max_queue_size;
335 let max_export_batch_size = config.max_export_batch_size;
336 let current_batch_size = Arc::new(AtomicUsize::new(0));
337 let current_batch_size_for_thread = current_batch_size.clone();
338
339 let handle = thread::Builder::new()
340 .name("OpenTelemetry.Logs.BatchProcessor".to_string())
341 .spawn(move || {
342 let _suppress_guard = Context::enter_telemetry_suppressed_scope();
343 otel_debug!(
344 name: "BatchLogProcessor.ThreadStarted",
345 interval_in_millisecs = config.scheduled_delay.as_millis(),
346 max_export_batch_size = config.max_export_batch_size,
347 max_queue_size = max_queue_size,
348 );
349 let mut last_export_time = Instant::now();
350 let mut logs = Vec::with_capacity(config.max_export_batch_size);
351 let current_batch_size = current_batch_size_for_thread;
352
353 #[inline]
357 fn get_logs_and_export<E>(
358 logs_receiver: &mpsc::Receiver<LogsData>,
359 exporter: &E,
360 logs: &mut Vec<LogsData>,
361 last_export_time: &mut Instant,
362 current_batch_size: &AtomicUsize,
363 config: &BatchConfig,
364 ) -> OTelSdkResult
365 where
366 E: LogExporter + Send + Sync + 'static,
367 {
368 let target = current_batch_size.load(Ordering::Relaxed); let mut result = OTelSdkResult::Ok(());
370 let mut total_exported_logs: usize = 0;
371
372 while target > 0 && total_exported_logs < target {
373 while let Ok(log) = logs_receiver.try_recv() {
375 logs.push(log);
376 if logs.len() == config.max_export_batch_size {
377 break;
378 }
379 }
380
381 let count_of_logs = logs.len(); total_exported_logs += count_of_logs;
383
384 result = export_batch_sync(exporter, logs, last_export_time); current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
387 }
388 result
389 }
390
391 loop {
392 let remaining_time = config
393 .scheduled_delay
394 .checked_sub(last_export_time.elapsed())
395 .unwrap_or(config.scheduled_delay);
396
397 match message_receiver.recv_timeout(remaining_time) {
398 Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
399 export_log_message_sent.store(false, Ordering::Relaxed);
401
402 otel_debug!(
403 name: "BatchLogProcessor.ExportingDueToBatchSize",
404 );
405
406 let _ = get_logs_and_export(
407 &logs_receiver,
408 &exporter,
409 &mut logs,
410 &mut last_export_time,
411 ¤t_batch_size,
412 &config,
413 );
414 }
415 Ok(BatchMessage::ForceFlush(sender)) => {
416 otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
417 let result = get_logs_and_export(
418 &logs_receiver,
419 &exporter,
420 &mut logs,
421 &mut last_export_time,
422 ¤t_batch_size,
423 &config,
424 );
425 let _ = sender.send(result);
426 }
427 Ok(BatchMessage::Shutdown(sender)) => {
428 otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
429 let result = get_logs_and_export(
430 &logs_receiver,
431 &exporter,
432 &mut logs,
433 &mut last_export_time,
434 ¤t_batch_size,
435 &config,
436 );
437 let _ = exporter.shutdown();
438 let _ = sender.send(result);
439
440 otel_debug!(
441 name: "BatchLogProcessor.ThreadExiting",
442 reason = "ShutdownRequested"
443 );
444 break;
448 }
449 Ok(BatchMessage::SetResource(resource)) => {
450 exporter.set_resource(&resource);
451 }
452 Err(RecvTimeoutError::Timeout) => {
453 otel_debug!(
454 name: "BatchLogProcessor.ExportingDueToTimer",
455 );
456
457 let _ = get_logs_and_export(
458 &logs_receiver,
459 &exporter,
460 &mut logs,
461 &mut last_export_time,
462 ¤t_batch_size,
463 &config,
464 );
465 }
466 Err(RecvTimeoutError::Disconnected) => {
467 otel_debug!(
470 name: "BatchLogProcessor.ThreadExiting",
471 reason = "MessageSenderDisconnected"
472 );
473 break;
474 }
475 }
476 }
477 otel_debug!(
478 name: "BatchLogProcessor.ThreadStopped"
479 );
480 })
481 .expect("Thread spawn failed."); BatchLogProcessor {
485 logs_sender,
486 message_sender,
487 handle: Mutex::new(Some(handle)),
488 forceflush_timeout: Duration::from_secs(5), dropped_logs_count: AtomicUsize::new(0),
490 max_queue_size,
491 export_log_message_sent: Arc::new(AtomicBool::new(false)),
492 current_batch_size,
493 max_export_batch_size,
494 }
495 }
496
497 pub fn builder<E>(exporter: E) -> BatchLogProcessorBuilder<E>
499 where
500 E: LogExporter,
501 {
502 BatchLogProcessorBuilder {
503 exporter,
504 config: Default::default(),
505 }
506 }
507}
508
509#[allow(clippy::vec_box)]
510fn export_batch_sync<E>(
511 exporter: &E,
512 batch: &mut Vec<Box<(SdkLogRecord, InstrumentationScope)>>,
513 last_export_time: &mut Instant,
514) -> OTelSdkResult
515where
516 E: LogExporter + ?Sized,
517{
518 *last_export_time = Instant::now();
519
520 if batch.is_empty() {
521 return OTelSdkResult::Ok(());
522 }
523
524 let export = exporter.export(LogBatch::new_with_owned_data(batch.as_slice()));
525 let export_result = futures_executor::block_on(export);
526
527 batch.clear();
529
530 match export_result {
531 Ok(_) => OTelSdkResult::Ok(()),
532 Err(err) => {
533 otel_error!(
534 name: "BatchLogProcessor.ExportError",
535 error = format!("{}", err)
536 );
537 OTelSdkResult::Err(err)
538 }
539 }
540}
541
/// A builder for creating [`BatchLogProcessor`] instances.
#[derive(Debug)]
pub struct BatchLogProcessorBuilder<E> {
    /// Exporter the finished processor will own.
    exporter: E,
    /// Batch configuration; defaults come from `BatchConfig::default()`.
    config: BatchConfig,
}
550
551impl<E> BatchLogProcessorBuilder<E>
552where
553 E: LogExporter + 'static,
554{
555 pub fn with_batch_config(self, config: BatchConfig) -> Self {
557 BatchLogProcessorBuilder { config, ..self }
558 }
559
560 pub fn build(self) -> BatchLogProcessor {
562 BatchLogProcessor::new(self.exporter, self.config)
563 }
564}
565
/// Batch log processor configuration.
/// Use [`BatchConfigBuilder`] to construct an instance.
#[derive(Debug)]
#[allow(dead_code)]
pub struct BatchConfig {
    /// Capacity of the data channel buffering logs for delayed processing;
    /// records emitted while it is full are dropped. Default 2 048.
    pub(crate) max_queue_size: usize,

    /// Delay interval between two consecutive timer-driven exports.
    /// Default 1 second.
    pub(crate) scheduled_delay: Duration,

    /// Maximum number of records per export call. Clamped to
    /// `max_queue_size` by `BatchConfigBuilder::build`. Default 512.
    pub(crate) max_export_batch_size: usize,

    /// Maximum duration allowed for exporting a batch (async-runtime
    /// processor only).
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    pub(crate) max_export_timeout: Duration,
}
589
590impl Default for BatchConfig {
591 fn default() -> Self {
592 BatchConfigBuilder::default().build()
593 }
594}
595
/// A builder for creating [`BatchConfig`] instances.
#[derive(Debug)]
pub struct BatchConfigBuilder {
    // Fields mirror `BatchConfig`; see that type for semantics and defaults.
    max_queue_size: usize,
    scheduled_delay: Duration,
    max_export_batch_size: usize,
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    max_export_timeout: Duration,
}
605
606impl Default for BatchConfigBuilder {
607 fn default() -> Self {
617 BatchConfigBuilder {
618 max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
619 scheduled_delay: OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
620 max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
621 #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
622 max_export_timeout: OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT,
623 }
624 .init_from_env_vars()
625 }
626}
627
628impl BatchConfigBuilder {
629 pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
638 self.max_queue_size = max_queue_size;
639 self
640 }
641
642 pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
650 self.scheduled_delay = scheduled_delay;
651 self
652 }
653
654 #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
662 pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
663 self.max_export_timeout = max_export_timeout;
664 self
665 }
666
667 pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
677 self.max_export_batch_size = max_export_batch_size;
678 self
679 }
680
681 pub fn build(self) -> BatchConfig {
684 let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
687
688 BatchConfig {
689 max_queue_size: self.max_queue_size,
690 scheduled_delay: self.scheduled_delay,
691 #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
692 max_export_timeout: self.max_export_timeout,
693 max_export_batch_size,
694 }
695 }
696
697 fn init_from_env_vars(mut self) -> Self {
698 if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE)
699 .ok()
700 .and_then(|queue_size| usize::from_str(&queue_size).ok())
701 {
702 self.max_queue_size = max_queue_size;
703 }
704
705 if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE)
706 .ok()
707 .and_then(|batch_size| usize::from_str(&batch_size).ok())
708 {
709 self.max_export_batch_size = max_export_batch_size;
710 }
711
712 if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY)
713 .ok()
714 .and_then(|delay| u64::from_str(&delay).ok())
715 {
716 self.scheduled_delay = Duration::from_millis(scheduled_delay);
717 }
718
719 #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
720 if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
721 .ok()
722 .and_then(|s| u64::from_str(&s).ok())
723 {
724 self.max_export_timeout = Duration::from_millis(max_export_timeout);
725 }
726
727 self
728 }
729}
730
#[cfg(all(test, feature = "testing", feature = "logs"))]
mod tests {
    use super::{
        BatchConfig, BatchConfigBuilder, BatchLogProcessor, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
        OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE,
        OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY,
        OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
    };
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    use super::{OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT};
    use crate::logs::log_processor::tests::MockLogExporter;
    use crate::logs::SdkLogRecord;
    use crate::{
        logs::{InMemoryLogExporter, InMemoryLogExporterBuilder, LogProcessor, SdkLoggerProvider},
        Resource,
    };
    use opentelemetry::InstrumentationScope;
    use opentelemetry::KeyValue;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    // Guards against accidental changes to the env-var names and defaults.
    #[test]
    fn test_default_const_values() {
        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT.as_millis(), 1_000);
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT.as_millis(), 30_000);
        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
        assert_eq!(
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
            "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
        );
        assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
    }

    // With every OTEL_BLRP_* variable unset, the default config must match
    // the specification defaults.
    #[test]
    fn test_default_batch_config_adheres_to_specification() {
        // The test cannot be run in parallel with other tests that set env
        // vars; temp_env serializes access for us.
        let env_vars = vec![
            OTEL_BLRP_SCHEDULE_DELAY,
            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            OTEL_BLRP_EXPORT_TIMEOUT,
            OTEL_BLRP_MAX_QUEUE_SIZE,
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
        ];

        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);

        assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT);
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT);
        assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
        assert_eq!(
            config.max_export_batch_size,
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
        );
    }

    // Builder setters take precedence over environment variables.
    #[test]
    fn test_code_based_config_overrides_env_vars() {
        let env_vars = vec![
            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        temp_env::with_vars(env_vars, || {
            let config = BatchConfigBuilder::default()
                .with_max_queue_size(2048)
                .with_scheduled_delay(Duration::from_millis(1000))
                .with_max_export_batch_size(512)
                .build();

            assert_eq!(config.scheduled_delay, Duration::from_millis(1000));
            assert_eq!(config.max_queue_size, 2048);
            assert_eq!(config.max_export_batch_size, 512);
        });
    }

    // Environment variables alone configure the default BatchConfig.
    #[test]
    fn test_batch_config_configurable_by_env_vars() {
        let env_vars = vec![
            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
        assert_eq!(config.max_queue_size, 4096);
        assert_eq!(config.max_export_batch_size, 1024);
    }

    // max_export_batch_size is clamped down to max_queue_size at build time.
    #[test]
    fn test_batch_config_max_export_batch_size_validation() {
        let env_vars = vec![
            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")),
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.max_queue_size, 256);
        assert_eq!(config.max_export_batch_size, 256);
        assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT);
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT);
    }

    // Each builder setter round-trips into the built config.
    #[test]
    fn test_batch_config_with_fields() {
        let batch_builder = BatchConfigBuilder::default()
            .with_max_export_batch_size(1)
            .with_scheduled_delay(Duration::from_millis(2))
            .with_max_queue_size(4);

        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        let batch_builder = batch_builder.with_max_export_timeout(Duration::from_millis(3));
        let batch = batch_builder.build();

        assert_eq!(batch.max_export_batch_size, 1);
        assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
        assert_eq!(batch.max_queue_size, 4);
    }

    // A malformed env value (non-numeric delay) falls back to the default,
    // and a small queue size clamps the batch size (second temp_env block).
    #[test]
    fn test_build_batch_log_processor_builder() {
        let mut env_vars = vec![
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
            (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
        ];
        temp_env::with_vars(env_vars.clone(), || {
            let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());

            assert_eq!(builder.config.max_export_batch_size, 500);
            assert_eq!(
                builder.config.scheduled_delay,
                OTEL_BLRP_SCHEDULE_DELAY_DEFAULT
            );
            assert_eq!(
                builder.config.max_queue_size,
                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
            );

            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            assert_eq!(
                builder.config.max_export_timeout,
                Duration::from_millis(2046)
            );
        });

        env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));

        temp_env::with_vars(env_vars, || {
            let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
            assert_eq!(builder.config.max_export_batch_size, 120);
            assert_eq!(builder.config.max_queue_size, 120);
        });
    }

    // with_batch_config replaces the builder's default configuration.
    #[test]
    fn test_build_batch_log_processor_builder_with_custom_config() {
        let expected = BatchConfigBuilder::default()
            .with_max_export_batch_size(1)
            .with_scheduled_delay(Duration::from_millis(2))
            .with_max_queue_size(4)
            .build();

        let builder =
            BatchLogProcessor::builder(InMemoryLogExporter::default()).with_batch_config(expected);

        let actual = &builder.config;
        assert_eq!(actual.max_export_batch_size, 1);
        assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
        assert_eq!(actual.max_queue_size, 4);
    }

    // The provider's resource must reach the exporter via the SetResource
    // control message (observable after force_flush).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_set_resource_batch_processor() {
        let exporter = MockLogExporter {
            resource: Arc::new(Mutex::new(None)),
        };
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
        let provider = SdkLoggerProvider::builder()
            .with_log_processor(processor)
            .with_resource(
                Resource::builder_empty()
                    .with_attributes([
                        KeyValue::new("k1", "v1"),
                        KeyValue::new("k2", "v3"),
                        KeyValue::new("k3", "v3"),
                        KeyValue::new("k4", "v4"),
                        KeyValue::new("k5", "v5"),
                    ])
                    .build(),
            )
            .build();

        provider.force_flush().unwrap();

        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
        let _ = provider.shutdown();
    }

    // Shutdown flushes buffered records, calls exporter.shutdown(), and a
    // post-shutdown emit is dropped (still exactly one exported record).
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_shutdown() {
        let exporter = InMemoryLogExporterBuilder::default()
            .keep_records_on_shutdown()
            .build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());

        let mut record = SdkLogRecord::new();
        let instrumentation = InstrumentationScope::default();

        processor.emit(&mut record, &instrumentation);
        processor.force_flush().unwrap();
        processor.shutdown().unwrap();
        // Emit after shutdown must be silently dropped, not exported.
        processor.emit(&mut record, &instrumentation);
        assert_eq!(1, exporter.get_emitted_logs().unwrap().len());
        assert!(exporter.is_shutdown_called());
    }

    // The dedicated-thread processor must shut down cleanly regardless of the
    // tokio runtime flavor hosting the caller (no async runtime is used
    // internally, so none of these should deadlock).
    #[tokio::test(flavor = "current_thread")]
    async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());

        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
        processor.shutdown().unwrap();
    }
}